refactor: 重构sse消息推送,封装sseManger

This commit is contained in:
czhqwer
2025-03-30 14:03:42 +08:00
parent 1982a450a0
commit a38bfe0a69
9 changed files with 52 additions and 128 deletions

View File

@@ -154,7 +154,7 @@ public class FileController {
}
enableShare = enable;
Map<String, Object> data = MapUtil.of("enable", enable);
sseService.notifySystemEvent(new NotifyMessage("enableShare", data));
sseService.notification(new NotifyMessage("enableShare", data));
return Result.success();
}

View File

@@ -18,16 +18,13 @@ public class SseController {
@Resource
private ISseService sseService;
@GetMapping("/subscribeSharedFiles")
public SseEmitter subscribeSharedFiles(HttpServletRequest request) {
/**
* 通用订阅
*/
@GetMapping("/subscribe")
public SseEmitter subscribe(HttpServletRequest request) {
String remoteAddr = request.getRemoteAddr();
return sseService.subscribeSharedFiles(remoteAddr);
}
@GetMapping("/subscribeSystemEvent")
public SseEmitter subscribeSystemEvent(HttpServletRequest request) {
String remoteAddr = request.getRemoteAddr();
return sseService.subscribeSystemEvent(remoteAddr);
return sseService.subscribe(remoteAddr);
}

View File

@@ -6,23 +6,13 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
public interface ISseService {
/**
* 订阅共享文件更新
* 通用订阅
*/
SseEmitter subscribeSharedFiles(String clientId);
SseEmitter subscribe(String remoteAddr);
/**
* 订阅系统事件
* 通用通知
*/
SseEmitter subscribeSystemEvent(String clientId);
/**
* 通知共享文件更新
*/
void notifySharedFileUpdate(String message);
/**
* 通知系统事件
*/
void notifySystemEvent(NotifyMessage message);
void notification(NotifyMessage message);
}

View File

@@ -58,7 +58,7 @@ public class AuthServiceImpl implements IAuthService {
data.put("password", password);
NotifyMessage message = new NotifyMessage("setPassword", data);
sseService.notifySystemEvent(message);
sseService.notification(message);
}
@Override

View File

@@ -1,6 +1,7 @@
package cn.czh.service.impl;
import cn.czh.base.BusinessException;
import cn.czh.dto.NotifyMessage;
import cn.czh.entity.SharedFile;
import cn.czh.entity.UploadFile;
import cn.czh.mapper.ShareFileMapper;
@@ -76,14 +77,22 @@ public class FileServiceImpl implements IFileService {
sharedFile.setFileIdentifier(identifier);
sharedFile.setCreateTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
shareFileMapper.insert(sharedFile);
sseService.notifySharedFileUpdate("add");
HashMap<String, Object> map = new HashMap<>();
map.put("fileIdentifier", identifier);
map.put("action", "add");
NotifyMessage message = new NotifyMessage("sharedFileUpdate", map);
sseService.notification(message);
}
}
@Override
public void removeSharedFile(String fileIdentifier) {
shareFileMapper.delete(Wrappers.lambdaQuery(SharedFile.class).eq(SharedFile::getFileIdentifier, fileIdentifier));
sseService.notifySharedFileUpdate("remove");
HashMap<String, Object> map = new HashMap<>();
map.put("fileIdentifier", fileIdentifier);
map.put("action", "remote");
NotifyMessage message = new NotifyMessage("sharedFileUpdate", map);
sseService.notification(message);
}
@Override

View File

@@ -16,41 +16,23 @@ import java.util.concurrent.ConcurrentHashMap;
@Service
public class SseServiceImpl implements ISseService {
private final Map<String, SseEmitter> sharedFilesEmitters = new ConcurrentHashMap<>();
private final Map<String, SseEmitter> systemEventEmitters = new ConcurrentHashMap<>();
private final Map<String, SseEmitter> clientEmitters = new ConcurrentHashMap<>();
@Override
public SseEmitter subscribeSharedFiles(String clientId) {
SseEmitter emitter = new SseEmitter(0L); // 设置超时时间
sharedFilesEmitters.put(clientId, emitter);
public SseEmitter subscribe(String remoteAddr) {
SseEmitter emitter = new SseEmitter(0L);
clientEmitters.put(remoteAddr, emitter);
emitter.onCompletion(() -> sharedFilesEmitters.remove(clientId));
emitter.onTimeout(() -> sharedFilesEmitters.remove(clientId));
return emitter;
}
@Override
public SseEmitter subscribeSystemEvent(String clientId) {
SseEmitter emitter = new SseEmitter(0L); // 设置超时时间
systemEventEmitters.put(clientId, emitter);
emitter.onCompletion(() -> systemEventEmitters.remove(clientId));
emitter.onTimeout(() -> systemEventEmitters.remove(clientId));
emitter.onCompletion(() -> clientEmitters.remove(remoteAddr));
emitter.onTimeout(() -> clientEmitters.remove(remoteAddr));
return emitter;
}
@Async
@Override
public void notifySharedFileUpdate(String message) {
sharedFilesEmitters.forEach((clientId, emitter) ->
sendNotification(clientId, emitter, "sharedFileUpdate", message, sharedFilesEmitters));
}
@Async
@Override
public void notifySystemEvent(NotifyMessage message) {
systemEventEmitters.forEach((clientId, emitter) ->
sendNotification(clientId, emitter, "systemEvent", JSONUtil.toJsonStr(message), systemEventEmitters));
public void notification(NotifyMessage message) {
clientEmitters.forEach((clientId, emitter) ->
sendNotification(clientId, emitter, message.getType(), JSONUtil.toJsonStr(message), clientEmitters));
}
private void sendNotification(String clientId, SseEmitter emitter, String event, String message, Map<String, SseEmitter> emittersMap) {

View File

@@ -51,6 +51,7 @@
</template>
<script>
import sseManager from '@/utils/sse';
import { getPassword, setPassword } from '@/utils/api';
export default {
@@ -64,10 +65,16 @@ export default {
tempPassword: '',
inputPassword: '',
showPasswordDialog: false,
mainUser: false,
};
},
mounted() {
this.getPassword();
sseManager.subscribe('setPassword', () => {
if (!this.mainUser) {
this.getPassword();
}
});
},
methods: {
async getPassword() {
@@ -76,7 +83,7 @@ export default {
const dataStr = res.data.toString();
const password = dataStr.slice(0, -1);
const mainUser = dataStr.slice(-1) === '1';
this.mainUser = mainUser;
this.tempPassword = password;
if (password) {
this.tempEnablePassword = true;

View File

@@ -68,7 +68,8 @@
</template>
<script>
import { getSharedFiles, enableShare, getShareStatus, shareAddress, unShareFile, subscribeSharedFiles, subscribeSystemEvent } from '@/utils/api';
import sseManager from '@/utils/sse'
import { getSharedFiles, enableShare, getShareStatus, shareAddress, unShareFile } from '@/utils/api';
export default {
name: 'ShareFile',
@@ -80,8 +81,6 @@ export default {
fileList: [],
enableShare: false,
isAdmin: false,
sharedFilesEventSource: null,
systemEventSource: null,
};
},
mounted() {
@@ -89,26 +88,22 @@ export default {
this.getShareStatus();
this.getShareAddress();
this.sharedFilesEventSource = subscribeSharedFiles(() => {
sseManager.subscribe('sharedFileUpdate', () => {
this.fetchFiles();
});
this.systemEventSource = subscribeSystemEvent((event) => {
if (event.type === 'enableShare') {
this.fileList = [];
this.enableShare = event.data.enable;
this.fetchFiles();
}
sseManager.subscribe('enableShare', (event) => {
this.fileList = [];
this.enableShare = event.enable;
this.fetchFiles();
});
},
beforeDestroy() {
if (this.sharedFilesEventSource) {
this.sharedFilesEventSource.close();
}
if (this.systemEventSource) {
this.systemEventSource.close();
}
sseManager.unsubscribe('sharedFileUpdate');
sseManager.unsubscribe('enableShare');
},
methods: {
async getShareAddress() {
try {

View File

@@ -1,4 +1,4 @@
import { http, baseUrl, httpExtra } from './http';
import { http, httpExtra } from './http';
/**
* 获取上传进度
@@ -271,60 +271,6 @@ const setPassword = (password) => {
return http.post("/config/setPassword", formData);
}
/**
* 订阅共享文件更新
* @param {Function} callback 文件更新的回调函数
* @returns {EventSource} 返回 EventSource 实例
*/
const subscribeSharedFiles = (callback) => {
const eventSource = new EventSource(`${baseUrl}/sse/subscribeSharedFiles`);
eventSource.addEventListener('sharedFileUpdate', (event) => {
callback(event.data);
});
eventSource.onerror = (error) => {
console.error("SSE连接异常:", error);
if (eventSource.readyState === EventSource.CLOSED) {
console.log("连接已关闭");
}
};
return eventSource;
};
/**
* 订阅系统事件更新
* @param {Function} callback 文件更新的回调函数
* @returns {EventSource} 返回 EventSource 实例
*/
const subscribeSystemEvent = (callback) => {
const eventSource = new EventSource(`${baseUrl}/sse/subscribeSystemEvent`);
eventSource.addEventListener('systemEvent', (event) => {
try {
const data = JSON.parse(event.data);
callback(data);
} catch (e) {
console.error('JSON解析失败:', e, '原始数据:', event.data);
}
});
eventSource.onmessage = (event) => {
console.log('[SSE] Received default message:', event.data);
};
eventSource.onerror = (error) => {
console.error("SSE连接异常:", error);
if (eventSource.readyState === EventSource.CLOSED) {
console.log("连接已关闭");
}
};
return eventSource;
}
export {
getUploadProgress,
createMultipartUpload,
@@ -345,7 +291,5 @@ export {
setPassword,
addSharedFile,
deleteFile,
subscribeSharedFiles,
subscribeSystemEvent,
httpExtra
};