前言
在本次寒假软件竞赛中,我们决定为系统添加用户间的实时聊天功能。经过综合考量性能、开发难度以及用户体验等因素,我们最终选择基于 WebSocket 协议来实现这一功能。
为什么是 WebSocket?
WebSocket 是一种基于 TCP 的通信协议,它通过建立一个单一的、持久的连接,实现了客户端与服务器之间的全双工实时通信。相较于传统的 HTTP 协议,WebSocket 在实时通信场景下具有显著优势:
实时双向通信:WebSocket 允许服务器主动向客户端推送消息,而不仅仅是等待客户端的请求。这消除了传统 HTTP 轮询(Polling)方式带来的延迟,使得消息能够即时送达。
高效资源利用:WebSocket 使用单一的、长时间保持的连接,避免了 HTTP 协议每次请求都需要建立和断开连接的开销。这能减少不必要的握手过程,降低服务器的资源消耗,并显著节省带宽。
灵活格式支持:WebSocket 协议本身不对消息格式进行限制,开发者可以根据实际需求选择合适的数据格式,例如 JSON、XML 或 Protobuf 等。
项目实践
为了在 Golang 中处理 WebSocket 协议的请求,我们首先需要引入一个强大的第三方库 gorilla/websocket
。这个库为我们提供了便捷的 WebSocket 操作接口,极大地简化了开发过程。
1
| go get github.com/gorilla/websocket
|
安装完成后,我们需要初始化一个 Upgrader
实例。它的作用是将 HTTP 请求升级为 WebSocket 连接。通过配置缓冲区的大小,我们可以对数据传输的性能进行调优。CheckOrigin
函数则用于跨域安全检查,这里我们简单地允许所有来源,但在实际生产环境中,请务必进行严格的来源验证。
1 2 3 4 5 6 7
| var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(_ *http.Request) bool { return true }, }
|
然后,为了便于管理 WebSocket 连接,我们可以创建一个结构体作为连接管理器。它使用 map
来存储每个用户的连接信息,其中键是用户 ID,值是对应的 WebSocket 连接对象。为了保证多协程并发访问的安全性,我们引入了 sync.RWMutex
读写锁。此外,msgChannel
作为消息队列,用于异步处理消息,而 stop
变量则用于控制服务的优雅停止。
1 2 3 4 5 6
| type ConnectionManager struct { connections map[uint]*websocket.Conn msgChannel chan models.Message mutex sync.RWMutex stop atomic.Bool }
|
ConnectionManager
提供了两个核心方法:registerConnection
和 unregisterConnection
。前者用于注册新的 WebSocket 连接,并从数据库中检索该用户的历史消息,将其推送给客户端,以便用户能够立即看到之前的聊天记录。后者则负责移除已断开的连接,确保连接管理器中只保留有效的连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (cm *ConnectionManager) registerConnection(conn *websocket.Conn, uid uint) { messages, err := messageService.GetMessagesByUser(uid) if err != nil { zap.L().Warn("Error getting messages", zap.Error(err)) }
cm.mutex.Lock() cm.connections[uid] = conn cm.mutex.Unlock() for _, msg := range messages { if err := conn.WriteJSON(msg); err != nil { zap.L().Warn("Error sending history message", zap.Error(err)) } } }
func (cm *ConnectionManager) unregisterConnection(uid uint) { cm.mutex.Lock() delete(cm.connections, uid) cm.mutex.Unlock() }
|
接着,我们编写 handleMessage
方法来进行消息处理,该方法首先将消息保存到数据库,确保数据的持久性,然后将消息推送给发送者和接受者。如果消息发送失败,则记录错误日志并关闭、删除连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (cm *ConnectionManager) handleMessage(message *models.Message) { if err := messageService.CreateMessage(message); err != nil { zap.L().Warn("Error saving message to database", zap.Error(err)) return }
cm.mutex.RLock() receiverConn, exists := cm.connections[message.Receiver] senderConn, senderExists := cm.connections[message.Sender] cm.mutex.RUnlock()
if exists { if err := receiverConn.WriteJSON(message); err != nil { zap.L().Warn("Error writing message", zap.Error(err)) _ = receiverConn.Close() cm.unregisterConnection(message.Receiver) } }
if senderExists { if err := senderConn.WriteJSON(message); err != nil { zap.L().Warn("Error writing message", zap.Error(err)) _ = senderConn.Close() cm.unregisterConnection(message.Sender) } } }
|
最后,我们编写一个 WebSocketController
来处理用户请求。在连接建立后,它会注册连接,并通过一个循环不断读取客户端发送的消息。读取到的消息会被反序列化为 models.Message
结构体,并放入消息队列中进行异步处理。如果读取消息时发生错误,则关闭连接,并从连接管理器中移除该连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func WebSocketController(c *gin.Context) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { response.AbortWithException(c, apiException.WebSocketError, err) return } defer conn.Close()
cm.registerConnection(conn, uid) for { msgType, msg, err := conn.ReadMessage() if err != nil { cm.unregisterConnection(uid) break }
if msgType == websocket.TextMessage { var message models.Message if err := json.Unmarshal(msg, &message); err != nil { zap.L().Warn("Error unmarshaling message", zap.Error(err)) continue } cm.msgChannel <- message } } }
|
至此,一个超级简单的 WebSocket 服务端就完成了。
总结
这次对 WebSocket 的折腾十分有趣,特别是部署完后亲身试用时,一下就被那极低的延迟所惊艳(你知道的,用 HTTP 协议的话很难做到如此低的延迟)。
当然,由于只是花两天时间草草搓的系统,所以项目中仍有许多可以改进的地方,如支持图片的发送,引入 asynq
等第三方库来完善消息队列等,更多的探索,相信未来会有机会尝试的。
最后,由于篇幅所限,本文删去了许多非核心代码,如果你想参考我的实现,可以直接翻阅原项目代码,希望能对你有所帮助。