本文最后更新于:7 个月前
前言
经过三周的沉寂,我终于迎来了个人博客上全新一篇的契机
今天,我终于有机会探讨 Spring Boot 集成 WebSocket,并将其记录在博客中
无比兴奋的我,废话不多说,期待与读者们分享这一激动人心的主题
正文
WebSocket
项目场景
最近在开发新的网站:MemoryChat,计划由伙伴匹配系统从移动端实现改为PC端,并提供博文分享、组队交流等功能
其中最为核心的,就是实现用户间的实时通信
实现步骤(Version1.0)
这里仅展示核心业务代码,详细的业务流程会在后续讲解
后端
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
1 2 3 4 5 6 7
| @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| package com.memory.usercenter.common;
import com.google.gson.Gson; import com.memory.usercenter.model.DTO.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.net.http.WebSocket; import java.util.concurrent.CopyOnWriteArraySet;
@Slf4j @ServerEndpoint("/websocket/{sid}") @Component public class WebSocketServer { private static RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
@Resource public void setRedisTemplate(RedisTemplate redisTemplate) { WebSocketServer.redisTemplate = redisTemplate; }
private static int onlineCount = 0; private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); private Session session; private String sid = "";
@OnOpen public void onOpen(Session session, @PathParam("sid") String sid) { this.session = session; webSocketSet.add(this); this.sid = sid; addOnlineCount(); try { sendMessage("conn_success"); log.info("有新窗口开始监听:" + sid + ",当前在线人数为:" + getOnlineCount()); } catch (IOException e) { log.error("websocket IO Exception"); } }
@OnClose public void onClose() { webSocketSet.remove(this); subOnlineCount(); log.info("释放的sid为:" + sid); log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
@OnMessage public void onMessage(String message, Session session) { log.info("收到来自窗口" + sid + "的信息:" + message); Gson gson = new Gson(); Message newMessage = gson.fromJson(message, Message.class);
Long senderId = newMessage.getSenderId(); Long receiverId = newMessage.getReceiverId(); redisTemplate.opsForValue().set("1", "2");
for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } }
@OnError public void onError(Session session, Throwable error) { log.error("发生错误"); error.printStackTrace(); }
public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); }
public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException { log.info("推送消息到窗口" + sid + ",推送内容:" + message);
for (WebSocketServer item : webSocketSet) { try { if (sid == null) {
} else if (item.sid.equals(sid)) { item.sendMessage(message); } } catch (IOException e) { continue; } } }
public static synchronized int getOnlineCount() { return onlineCount; }
public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; }
public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; }
public static CopyOnWriteArraySet<WebSocketServer> getWebSocketSet() { return webSocketSet; } }
|
前端
我前端是使用Vue语法,并且引入了Ant Design Vue组件(2023/09/13晚)
1 2 3 4 5 6 7 8 9 10 11 12 13
| <template> <div> Welcome <br/> <input id="text" value="这是一条测试消息"/> <a-button @click="sendMessage">发送消息</a-button> <hr/> <a-button @click="openSocket()">连接socket</a-button> <a-button @click="closeWebSocket()">关闭WebSocket连接</a-button> <hr/> <div id="message">{{ retMes }}</div> </div> </template>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| <script setup> import {ref} from "vue"; import currentUser from "@/model/currentUser";
const retMes = ref(""); const currentUserId = currentUser.value.id; let socket;
const initMessage = { "senderId": "", "receiverId": "", "content": "", "sendTime": "" } const message = ref(initMessage)
function openSocket() { if (typeof WebSocket == "undefined") { console.log("您的浏览器不支持WebSocket"); } else { console.log("您的浏览器支持WebSocket"); const socketUrl = `ws://localhost:8081/api/websocket/${currentUserId}`; console.log(socketUrl); console.log(currentUser); console.log(currentUserId); if (socket != null) { socket.close(); socket = null; } socket = new WebSocket(socketUrl); socket.onopen = function () { console.log("websocket已打开"); setMessage("websocket已打开"); }; socket.onmessage = function (msg) { console.log(msg.data); setMessage("服务端回应: " + msg.data); }; socket.onclose = function () { console.log("websocket已关闭"); setMessage("websocket已关闭"); }; socket.onerror = function () { console.log("websocket发生了错误"); }; } }
function sendMessage() { if (typeof WebSocket == "undefined") { console.log("您的浏览器不支持WebSocket"); } else { console.log("您的浏览器支持WebSocket");
message.value = JSON.stringify({ senderId: currentUserId, receiverId: "1657284893320364034", content: "你好", sendTime: new Date() });
if (socket == null) { retMes.value += "服务未开启" return; } socket.send(message.value); setMessage(message.value); } }
function setMessage(message) { retMes.value += message + "\n"; }
function closeWebSocket() { socket.close(); }
window.onbeforeunload = function () { closeWebSocket(); }; </script>
|
快速测试
服务器接收到消息,记录连接的用户id,处理消息,并转发消息到指定用户id
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@OnMessage public void onMessage(String message, Session session) { log.info("收到来自窗口" + sid + "的信息:" + message); Gson gson = new Gson(); Message newMessage = gson.fromJson(message, Message.class);
Long senderId = newMessage.getSenderId(); Long receiverId = newMessage.getReceiverId(); ValueOperations<String, Object> ops = redisTemplate.opsForValue(); ops.set("memory:user:message:" + senderId, message, 30, TimeUnit.MINUTES); ops.set("memory:user:message:" + receiverId, message, 30, TimeUnit.MINUTES); for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } }
|
前端获取到服务器转发的消息,展示在页面(2023/09/13晚)
实时双向通信(Version2.0)
业务流程梳理
实现双方实时通信的基础是什么?宏观的来讲,当然是通信双方之间建立一条连接,用来收发数据
- 如何建立这条连接呢?客户端之间建立连接一定是要把服务器作为中转站,让服务器负责连接两端
建立连接之后,双方就可以发送消息了,服务器会帮我们搞定消息从哪里来、发送到哪里去
- 这些消息的内容一定得包含这些字段:发送者、接收者、消息内容、发送时间等
- 这样,服务器就能知晓这些消息的来源和目的地了,能够准确地将消息转发到接收者
收发过程中的这么多消息,应该存放在哪里呢?存放在数据库,我们这里建议使用Redis存储
- 这样,聊天产生的消息都会存进Redis中
- 当用户打开聊天界面时,加载双方的聊天记录
- 当用户发送消息时,服务器负责精确转发给对方
- 用户成功发送消息有反馈,用户接收到消息也有反馈
这就是实时双向通信的整个业务流程(2023/09/25早)
做好简单配置
导入 WebSocket 依赖坐标:(2023/09/30午)
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
1 2 3 4 5 6 7
| @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
|
向服务端发送消息
1 2 3 4 5 6 7
| <div> <a-input v-model:value="mesInput" placeholder="Basic usage" size="large" style="width: 90%" showCount :maxlength="300"/> <a-button @click="sendMessage" type="primary" size="large" style="margin-left: 20px"> 发送消息 </a-button> </div>
|
1 2
| const mesInput = ref("");
|
1 2
| openSocket(currentUserId);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| function openSocket(Id) { if (typeof WebSocket == "undefined") { console.log("您的浏览器不支持WebSocket"); } else { console.log("您的浏览器支持WebSocket");
const socketUrl = `ws://localhost:8081/api/websocket/${Id}`; if (socket != null) { socket.close(); socket = null; }
socket = new WebSocket(socketUrl); ........................ }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@OnOpen public void onOpen(Session session, @PathParam("sid") String sid) { this.session = session; webSocketSet.add(this); this.sid = sid; addOnlineCount(); try { sendMessage(new Gson().toJson("连接成功")); log.info("有新窗口开始监听:" + sid + ",当前在线人数为:" + getOnlineCount()); } catch (IOException e) { log.error("websocket IO Exception"); } }
|
点击发送消息,发送消息至指定端口,并实时更新聊天记录:(2023/09/30午)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| function sendMessage() { if (typeof WebSocket == "undefined") { console.log("您的浏览器不支持WebSocket"); } else { console.log("您的浏览器支持WebSocket");
if (isMesInputEmpty.value) { message.warning("发送消息不能为空") return; } sendMsg.value = JSON.stringify({ senderId: currentUserId, receiverId: chatUser.value, content: mesInput.value, sendTime: new Date() }); if (socket == null) { retMes.value += "服务未开启" return; } socket.send(sendMsg.value); getMesList(chatUser.value); message.success("成功发送一条消息") mesInput.value = ""; } }
|
服务端转发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Slf4j @ServerEndpoint("/websocket/{sid}") @Component public class WebSocketServer {
@OnMessage public void onMessage(String message) throws IOException { log.info("收到来自窗口" + sid + "的信息:" + message); boolean saveMessage = saveMessage(message); if (!saveMessage) { throw new BusinessException(ErrorCode.UPDATE_ERROR_REDIS, "用户信息存放失败"); } msgForward(message, webSocketSet, sid); } }
|
接收到客户端消息,需要进行如下操作:
1 2 3 4 5 6 7
| private static RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); @Resource public void setRedisTemplate(RedisTemplate redisTemplate) { WebSocketServer.redisTemplate = redisTemplate; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public boolean saveMessage(String message) { String msgKey = "memory:user:message:"; ................................... try { HashOperations<String, Object, Object> opsForHash = redisTemplate.opsForHash(); opsForHash.put(senderMsgKey, generateMessageId(), message); opsForHash.put(receiverMsgKey, generateMessageId(), message); long expireTime = 2; redisTemplate.expire(senderMsgKey, expireTime, TimeUnit.HOURS); redisTemplate.expire(receiverMsgKey, expireTime, TimeUnit.HOURS); } ................................... }
|
查看消息内容,判断消息来源 senderId,确认转发对象 receiverId,实现精确转发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
public void msgForward(String message, CopyOnWriteArraySet<WebSocketServer> webSocketSet, String sid) { for (WebSocketServer item : webSocketSet) { try { if (item.sid.equals(sid)) { item.sendMessage(message); } else { item.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } } }
|
客户端监听
客户端监听到服务端主动发送的消息,判断该消息的接收者
1 2 3 4 5 6 7 8 9 10 11 12
| socket.onmessage = function (msg) { getMesList(chatUser.value); receiveMsg.value = JSON.parse(msg.data) if (currentUserId === receiveMsg.value.receiverId) { openNotification() getMesList(chatUser.value); } };
|
编码实现重难点
封装消息内容
1 2 3 4 5 6 7 8 9
| @Data public class Message { private Long senderId; private Long receiverId; private String content;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date sendTime; }
|
消息存放方式
1 2 3 4 5 6 7 8 9 10
|
private String generateMessageId() { UUID messageId = UUID.randomUUID(); return messageId.toString(); }
|
存放消息,每个用户下都保存了他发送过的消息记录:
发送者id即为该用户id
接收者id、消息内容、发送时间
1 2 3 4 5 6 7
| HashOperations<String, Object, Object> opsForHash = redisTemplate.opsForHash(); opsForHash.put(senderMsgKey, generateMessageId(), message); opsForHash.put(receiverMsgKey, generateMessageId(), message); long expireTime = 48; redisTemplate.expire(senderMsgKey, expireTime, TimeUnit.HOURS); redisTemplate.expire(receiverMsgKey, expireTime, TimeUnit.HOURS);
|
消息存放完成后,服务器再再转发 message 到客户端,所有客户端都会获取到消息提示
- 仅需判断消息的 receiverId是否为自己的id,选择是否接收该消息,并实时同步聊天记录
实时同步聊天记录
- 判断出消息的接收者为自己后,才会会进行下一步操作:实时获取最新的聊天记录** (2023/09/30午)
- 那么获取聊天记录的业务逻辑又应该是怎样的呢?
获取聊天记录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
@Override public List<Message> listMessage(Long senderId, Long receiverId, HttpServletRequest request) { List<Message> message1 = getMessage(senderId, receiverId); List<Message> message2 = getMessage(receiverId, senderId); List<Message> messageList = new ArrayList<>(); messageList.addAll(message1); messageList.addAll(message2);
messageList.sort(new Comparator<Message>() { @Override public int compare(Message o1, Message o2) { return o1.getSendTime().compareTo(o2.getSendTime()); } });
return messageList; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public List<Message> getMessage(Long senderId, Long receiverId) { Map<Object, Object> mesEntriesJson = redisTemplate.opsForHash().entries(USER_CHAT_MESSAGE + senderId);
Gson gson = new Gson(); String jsonString = gson.toJson(mesEntriesJson); Map<Object, Object> mesEntries = gson.fromJson(jsonString, new TypeToken<Map<Object, Object>>() { }.getType()); if (mesEntries == null) return null;
ArrayList<Message> messageList = new ArrayList<>(); Collection<Object> mesValuesJson = mesEntries.values(); for (Object mesValueJson : mesValuesJson) { String mesValuesJsonStr = (String) mesValueJson; Message message = gson.fromJson(mesValuesJsonStr, Message.class); if (message.getReceiverId().equals(receiverId)) { messageList.add(message); } }
return messageList; }
|
实现点击进入对应好友聊天窗口
- 在好友私聊页面,我们点击私聊,就会携带该该好友id,跳转至聊天大厅
1 2 3
| <template #actions> <a-button size="large" type="primary" ghost @click="goToTab(item)">私聊</a-button> </template>
|
1 2 3 4 5 6 7 8 9 10
| const goToTab = (item: any) => { router.push({ name: "chat", path: "/chat", query: { chatTabName: item.id, } }) }
|
- 在聊天大厅处,我们随时监听聊天窗口 activeKey 的值的变化:(2023/09/30午)
1 2 3 4
| watch(activeKey, (value) => { localStorage.setItem('activeKey', value); });
|
1 2 3 4 5 6
| if (chatTabName.value === undefined) { activeKey.value = localStorage.getItem('activeKey') } else { activeKey.value = chatTabName.value.chatTabName; }
|
同时也保证了在刷新页面后,仍然选中之前选中的 tab页聊天窗口(2023/09/30午)
如果是从用户中心或者博文社区直接进入聊天大厅,则默认选中文件传输助手聊天窗口,极大提升用户体验
1 2
| const activeKey = ref(currentUserId);
|
1 2 3 4 5 6
| <!--文件传输助手--> <a-tab-pane v-model:activeKey="activeKey" :key="currentUserId" tab="文件传输助手" @click="handleTabChange"> <div class="msgWindow"> <h1>收发文件,您的得力助手</h1> </div>
|
踩坑记录
RedisTemplate注入失败
这绝对是我踩过最令人抓狂的坑了,入上述代码所示,在@onMessage方法中获取到用户发来的消息后,服务器需要处理消息
我的处理方法是:以发送者 senderId 和接收者 receiverId 为key,分别将消息存储到 Redis 中,设置过期时间
这里就遇到了这个问题:RedisTemplate注入失败
我的初始代码如下:
1 2
| @Resource private RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
|
1 2 3 4
| // 存储信息到Redis 30min ValueOperations<String, Object> ops = redisTemplate.opsForValue(); ops.set("memory:user:message:" + senderId, message, 30, TimeUnit.MINUTES); ops.set("memory:user:message:" + receiverId, message, 30, TimeUnit.MINUTES);
|
看起来是很正常的依赖注入,但是却报了如下错误:(2023/09/13晚)
1 2 3 4 5
| 本质原因:spring管理的都是单例(singleton),和 websocket(多对象)相冲突。
详细解释:项目启动时初始化,会初始化 websocket (非用户连接的),spring 同时会为其注入 mapper,该对象的 mapper不是 null,被成功注入。但是,由于 spring 默认管理的是单例,所以只会注入一次 mapper。当新用户进入聊天时,系统又会创建一个新的 websocket 对象,这时矛盾出现了:spring 管理的都是单例,不会给第二个 websocket 对象注入 mapper,所以导致只要是用户连接创建的 websocket 对象,都不能再注入了。
像 controller 里面有 service, service 里面有 dao。因为 controller,service ,dao 都有是单例,所以注入时不会报 null。但是 websocket 不是单例,所以使用spring注入一次后,后面的对象就不会再注入了,会报null。
|
1 2 3 4 5 6 7
| private static RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
@Resource public void setRedisTemplate(RedisTemplate redisTemplate) { WebSocketServer.redisTemplate = redisTemplate; }
|
可以看到,本地 Redis 中已经成功地存储了用户消息,redisTemplate 注入成功,问题解决:(2023/09/13晚)
总结