0


go 实现websocket以及详细设计流程过程,确保通俗易懂

websocket简介:

WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 协议在 2011 年由 IETF 标准化为 RFC 6455,后由 RFC 7936 补充规范。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

理解各种协议和通信层、套接字的含义

IP:网络层协议;(高速公路)

TCP和UDP:传输层协议;(卡车)

HTTP:应用层协议;(货物)。HTTP(超文本传输协议)是建立在TCP协议之上的一种应用。HTTP连接最显著的特点是客户端发送的每次请求都需要服务器回送响应,在请求结束后,会主动释放连接。从建立连接到关闭连接的过程称为“一次连接”。

SOCKET:套接字,TCP/IP网络的API。(港口码头/车站)Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。

Websocket:同HTTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP之上的,解决了服务器与客户端全双工通信的问题,包含两部分:一部分是“握手”,一部分是“数据传输”。握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。

*注:什么是单工、半双工、全工通信?

数据只能单向传送为单工;
数据能双向传送但不能同时双向传送称为半双工;
数据能够同时双向传送则称为全双工。

上面是简单的接受了websocket情况以及和其他协议的区别及联系,在做之前,还是要了解下这块,对后期实战有帮助。

websocket开源选择:

在go语言中,websocket组件比较多的,可以到go仓库搜索下:

今天以仓库使用最多一个开源框架(gorilla)进行实战落地,以及讲解整个过程细节

websocket (github.com/gorilla/websocket)

上面介绍完整体开源情况。

业务背景:

  1. 1、数据实时推送到前端进行图形化显示
  2. 2、报警数据需要实时推送各端,例如webcliet、安卓、ios等等其他客户端
  3. 我前几年一直从事的是java,可以看看我的播客,基本上与java有关,go也是最近两三周学习的,我其他播客有具体说明,因为公司业务需要,所有就简单的学了一下go语言,作为刚接触go不久的技术人员,如何面对新技术进行探索和落地的,大家可以跟着我的思路进行学习模仿,这样后期学习稍微比较快一些。

实战:

  1. 1、在实战之前,我们先看看官网使用说明:
  2. ![](https://i-blog.csdnimg.cn/direct/1cfa041721a84f7e82333f8c42ba2857.png)

这是最简单的,方式,没有其他业务,我们如何进行最佳实现呢?还是要看官网:

传送门:websocket/examples/chat at main · gorilla/websocket · GitHub

具体源码不说了,但是里面有几个重要点已经出现了,也是我们要学习的思想,这也是为什么要看开源代码的原因,要学习他们的思想和编码技巧。

稍微解释下Hub结构体里的字段:

  1. // Registered clients.
  2. // 这是保存客户端连接的信息,map,从这里可以看出来,所有的客户端都要保存到这里,这时可以想到,后期保存到redis里,从这里进行扩展即可。
  3. clients map[*Client]bool
  4. // Inbound messages from the clients.
  5. // 这个就是广播数据了,但是demo了用了字节链,目的是了并行执行,提高效率,字节目的是接收所有情况的数据
  6. broadcast chan []byte
  7. // Register requests from the clients.
  8. // 这个比较好理解了,是新客户端进行连接时触发的链条,为什么走链,也是为了并行执行,也就是异步执行
  9. register chan *Client
  10. // Unregister requests from clients.
  11. //这个就是取消注册,也就是关闭客户端连接
  12. unregister chan *Client

其实这几个字段已经把我们的框架整体搭建起来了,clients负责存储客户端,broadcast负责服务端发送给客户端数据的,register负责用来监听新建连接,unregister负责关闭客户端连接的

其实websocket也就是干这个事情的,例如websocket服务端收集所有客户端,然后根据需要进行发送消息给客户,然后就是关闭,大致流程就是这样的。

在看客户端怎么进行封装的:

这个就是针对上面的Hub进行组装结构体实例的,这里就不介绍了,本次实战也是根据这个来的,大家看懂这个基本上后续其他开源的websocket都没啥大问题。

接下来真正进行项目实战:

1、第一步创建websocket的客户端管理结构体:

  1. // clientManager
  2. // @Description: 客户端管理者
  3. type clientManager struct {
  4. //客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可
  5. //这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的key
  6. clients map[*client]bool
  7. //广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理
  8. broadcast chan model.BusinessDataWrapper
  9. //客户端注册链;用chan进行异步处理
  10. register chan *client
  11. //客户端关闭链;用chan进行异步处理
  12. unregister chan *client
  13. }

结构体首字母小写,目的不用暴露出去,用于内部使用即可;每个字段不解释了,上面注释已经写好了,其实和官网是一样的。

2、客户端结构体:

  1. // client
  2. // @Description: 客户端信息
  3. type client struct {
  4. //每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制
  5. key string
  6. //客户端连接对象
  7. socket *websocket.Conn
  8. //数据发送链
  9. send chan []byte
  10. }

3、编写启动方法,其实官网写的很像,稍微进行重构,更加符合当前项目

  1. func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {
  2. for {
  3. select {
  4. case client := <-m.register: //进行连接
  5. m.clients[client] = true
  6. msg := "有一个新连接出现,已连接成功,客户端key:" + client.key
  7. log.Info(ctx, msg)
  8. //发送数据,这块可以不用发送,等后续进行注释即可
  9. //m.send([]byte(msg), client)
  10. case client := <-m.unregister: //关闭连接,进行释放资源
  11. if _, ok := m.clients[client]; ok {
  12. close(client.send)
  13. delete(m.clients, client)
  14. msg := "客户端key:" + client.key + ",已关闭"
  15. log.Info(ctx, msg)
  16. //m.send([]byte(msg), client)
  17. }
  18. case businessDataWrapper := <-m.broadcast: //广播数据
  19. for client := range m.clients {
  20. //这里其实就是发送数据了,
  21. //进行转换成json字符串
  22. //businessDataJsonStr, _ := json.Marshal(businessDataWrapper)
  23. //broadCastSend(m, client, businessDataJsonStr)
  24. //进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息
  25. callBackFunc(client, m, businessDataWrapper)
  26. }
  27. }
  28. }
  29. }
  30. // 广播进行发送数据
  31. func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {
  32. select {
  33. case client.send <- businessDataJsonByte:
  34. default:
  35. fmt.Println("关闭连接了,,,,,")
  36. close(client.send)
  37. delete(manager.clients, client)
  38. }
  39. }

4、某一个客户端群发给其他客户端,排除自己客户端

  1. //发送数据,这快类似群发消息
  2. func (m *clientManager) send(message []byte, ignore *client) {
  3. for client := range m.clients {
  4. //ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了
  5. if client != ignore {
  6. //将数据写入到通道链
  7. client.send <- message
  8. }
  9. }
  10. }

5、推送数据,第四步是写入到发送通道里,还没真正发送,这块就是真正推送到客户端机制:

  1. func (c *client) write(manager clientManager) {
  2. defer func() {
  3. manager.unregister <- c
  4. c.socket.Close()
  5. log.Info(ctx, c.key, "客户端进行关闭")
  6. }()
  7. for {
  8. select {
  9. //如果客户端有数据要进行写出去
  10. case message, ok := <-c.send:
  11. if !ok {
  12. c.socket.WriteMessage(websocket.CloseMessage, []byte{})
  13. log.Info(ctx, c.key, "发送关闭提示")
  14. return
  15. }
  16. //这里才是真正的把数据推送到客户端
  17. err := c.socket.WriteMessage(websocket.TextMessage, message)
  18. if err != nil {
  19. manager.unregister <- c
  20. c.socket.Close()
  21. log.Info(ctx, c.key, "数据写入失败,进行关闭!")
  22. break
  23. }
  24. }
  25. }
  26. }

6、接收客户端发送的数据

  1. func (c *client) read(manager clientManager) {
  2. defer func() {
  3. manager.unregister <- c
  4. c.socket.Close()
  5. log.Info(ctx, c.key, "客户端进行关闭")
  6. }()
  7. for {
  8. _, message, err := c.socket.ReadMessage()
  9. if err != nil {
  10. manager.unregister <- c
  11. c.socket.Close()
  12. log.Info(ctx, c.key, "读数据出现异常,直接关闭。")
  13. break
  14. }
  15. //后期可以注释掉
  16. log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))
  17. //读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可
  18. }
  19. }

7、提供创建客户端管理函数

  1. // 创建客户端管理器
  2. func newClientManager() *clientManager {
  3. return &clientManager{
  4. //广播数据,model.BusinessDataWrapper是我具体业务数据,可以换成[]byte接收
  5. broadcast: make(chan model.BusinessDataWrapper),
  6. register: make(chan *client),
  7. unregister: make(chan *client),
  8. clients: make(map[*client]bool),
  9. }
  10. }

以上就是整体的封装处理,可以看到和官方的demo很像,只是结合了一些业务场景而已,其他的都一样的。

接下来进行和业务进行集成:

1、创建管理器,上面也说了有两个业务场景, 一个是原始数据推送 ,另一个是报警数据推送,所以创建两个管理器出来:

  1. // 原始ws客户端管理器
  2. var rowDataManagerNew = newClientManager()
  3. // 报警数据ws客户端管理器
  4. var alarmDataManagerNew = newClientManager()

2、我们注册路由上,通过上面也能推断,需要指定两个路由路径

  1. // 初始化websocket协议配置
  2. var upgrader = websocket.Upgrader{
  3. ReadBufferSize: 1024,
  4. WriteBufferSize: 1024,
  5. CheckOrigin: func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
  6. }
  7. // registerRawDataClientConn
  8. //
  9. // @Author zhaosy
  10. // @Description: 注册原始数据客户端连接
  11. // @date 2024-07-16 18:12:19
  12. func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {
  13. if lang.IsEmpty(businessType) {
  14. io.WriteString(w, "businessType 不能为空")
  15. }
  16. //生成客户端
  17. conn, err := upgrader.Upgrade(w, r, nil)
  18. if err != nil {
  19. log.Error(ctx, err.Error(), err)
  20. io.WriteString(w, "这是一个websocket连接,不是API.")
  21. return
  22. }
  23. clientId := guid.S()
  24. //这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈
  25. key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)
  26. //初始化客户端对象
  27. client := &client{
  28. key: key,
  29. socket: conn,
  30. send: make(chan []byte),
  31. }
  32. rowDataManagerNew.register <- client
  33. //开启读
  34. go client.read(*rowDataManagerNew)
  35. //开起写
  36. go client.write(*rowDataManagerNew)
  37. }
  38. // registerAlarmClient
  39. //
  40. // @Author zhaosy
  41. // @Description: 注册报警客户端连接
  42. // @date 2024-07-16 19:40:52
  43. func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {
  44. //生成客户端
  45. conn, err := upgrader.Upgrade(w, r, nil)
  46. if err != nil {
  47. log.Error(ctx, err.Error(), err)
  48. io.WriteString(w, "这是一个websocket,不是API.")
  49. return
  50. }
  51. clientId := guid.S()
  52. //这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈
  53. key := websocketAlarmCachePrefix(clientId)
  54. //初始化客户端对象
  55. client := &client{
  56. key: key,
  57. socket: conn,
  58. send: make(chan []byte),
  59. }
  60. alarmDataManagerNew.register <- client
  61. //开启读
  62. go client.read(*alarmDataManagerNew)
  63. //开启写
  64. go client.write(*alarmDataManagerNew)
  65. }

上面看到

  1. w http.ResponseWriter, r *http.Request 这两个参数应该就知道怎么做了吧,直接绑定到路由路由,也是官网那种方式

这是注册到go的http路由上了,后续通过path路径进行访问即可。

3、如何与我们的业务数据进行绑定?

还需要提供包函数出去

  1. // SendRowDataBusinessData
  2. //
  3. // @Author zhaosy
  4. // @Description: 接收业务数据进行推送到websocket
  5. // @date 2024-07-16 18:19:46
  6. func SendRowDataBusinessData(data model.BusinessData) {
  7. rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
  8. }
  9. // SendAlarmBusinessData
  10. //
  11. // @Author zhaosy
  12. // @Description: 接收报警数据,推送到websocket客户端,这是我项目的业务,大家换成string即可
  13. // @date 2024-07-16 19:42:13
  14. func SendAlarmBusinessData(data model.AlarmBusinessData) {
  15. alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
  16. }

仅供参考。

这里是业务数据推送websocket的入口:

4、最后一步是管理器要启动了,启动前,大家知道我写了 回调函数,要进行实现下,具体业务了,所以大家参考即可:

  1. func init() {
  2. //启动原始数据websocket
  3. go rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
  4. if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送
  5. businessData := businessDataWrapper.BusinessData
  6. if businessData.BusinessId == "" {
  7. //进行转换成json字符串
  8. businessDataJsonStr, _ := json.Marshal(businessData)
  9. //广播所有客户端
  10. broadCastSend(manager, c, businessDataJsonStr)
  11. return true
  12. }
  13. //找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
  14. if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {
  15. //进行转换成json字符串
  16. businessDataJsonStr, _ := json.Marshal(businessData)
  17. //广播指定客户端
  18. broadCastSend(manager, c, businessDataJsonStr)
  19. return true
  20. }
  21. return false
  22. }
  23. return false
  24. })
  25. //启动报警数据推送
  26. go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
  27. // 报警数据,推送
  28. if consts.ONE == businessDataWrapper.DataType {
  29. alarmBusinessData := businessDataWrapper.AlarmBusinessData
  30. if alarmBusinessData.BusinessId != "" {
  31. //找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
  32. if strings.Contains(c.key, websocketAlarmCachePrefix("")) {
  33. //进行转换成json字符串
  34. alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)
  35. //广播指定客户端
  36. broadCastSend(manager, c, alarmBusinessDataJsonStr)
  37. return true
  38. }
  39. }
  40. }
  41. return false
  42. })
  43. }

这样就可以了,启动整体就没问题了,我这边用的是goframe框架,所以我单独提供了这两个:

  1. func NewWs() *webSocket {
  2. return &webSocket{}
  3. }
  4. type webSocket struct {
  5. }
  6. // RawDataWSHandle
  7. //
  8. // @Author zhaosy
  9. // @Description: 原始数据websocket
  10. // @date 2024-07-16 15:00:04
  11. func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {
  12. //获取参数
  13. businessType := r.Get("businessType")
  14. businessId := r.Get("businessId")
  15. userName := r.Get("userName")
  16. registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
  17. }
  18. // AlarmWSHandle
  19. //
  20. // @Author zhaosy
  21. // @Description: 报警websocket处理器
  22. // @date 2024-07-16 19:43:57
  23. func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {
  24. registerAlarmClientConn(r.Response.BufferWriter, r.Request)
  25. }

在goframe里cmd里进行绑定:

  1. //websocket--原始数据websocket推送
  2. s.BindHandler("/ws/rowdata/{businessType}/{businessId}/{userName}", websocket.NewWs().RawDataWSHandle)
  3. //websocket--报警数据websocket推送
  4. s.BindHandler("/ws/alarm", websocket.NewWs().AlarmWSHandle)

5、进行测试:

启动正常,日志也输出来了,进行测试

上面是连接正常,

通过业务数据进行测试:

以上就是本次研究的结果,整体上go的websocket比较简单,后面有机会,会重新进行重构,重构单独封装可以随意使用。

发一个整的代码:

  1. // Package websocket
  2. // @Author zhaosy
  3. // @Date 2024/7/16 下午2:52:00
  4. // @Desc websocket相关
  5. package websocket
  6. import (
  7. "context"
  8. "encoding/json"
  9. "fmt"
  10. "github.com/gogf/gf/v2/frame/g"
  11. "github.com/gogf/gf/v2/net/ghttp"
  12. "github.com/gogf/gf/v2/util/guid"
  13. "github.com/gorilla/websocket"
  14. "io"
  15. "net/http"
  16. "skynet/internal/consts"
  17. "skynet/internal/model"
  18. "skynet/utility/lang"
  19. "strings"
  20. )
  21. var (
  22. ctx = context.TODO()
  23. log = g.Log()
  24. )
  25. func init() {
  26. //启动原始数据websocket
  27. go rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
  28. if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送
  29. businessData := businessDataWrapper.BusinessData
  30. if businessData.BusinessId == "" {
  31. //进行转换成json字符串
  32. businessDataJsonStr, _ := json.Marshal(businessData)
  33. //广播所有客户端
  34. broadCastSend(manager, c, businessDataJsonStr)
  35. return true
  36. }
  37. //找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
  38. if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {
  39. //进行转换成json字符串
  40. businessDataJsonStr, _ := json.Marshal(businessData)
  41. //广播指定客户端
  42. broadCastSend(manager, c, businessDataJsonStr)
  43. return true
  44. }
  45. return false
  46. }
  47. return false
  48. })
  49. //启动报警数据推送
  50. go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
  51. // 报警数据,推送
  52. if consts.ONE == businessDataWrapper.DataType {
  53. alarmBusinessData := businessDataWrapper.AlarmBusinessData
  54. if alarmBusinessData.BusinessId != "" {
  55. //找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
  56. if strings.Contains(c.key, websocketAlarmCachePrefix("")) {
  57. //进行转换成json字符串
  58. alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)
  59. //广播指定客户端
  60. broadCastSend(manager, c, alarmBusinessDataJsonStr)
  61. return true
  62. }
  63. }
  64. }
  65. return false
  66. })
  67. }
  68. func NewWs() *webSocket {
  69. return &webSocket{}
  70. }
  71. // websocketRowDataCachePrefix
  72. //
  73. // @Author zhaosy
  74. // @Description: 原始数据缓存前缀
  75. // @date 2024-07-16 18:15:55
  76. func websocketRowDataCachePrefix(businessType, businessId, userName, clientId string) string {
  77. key := "websocket:rowdata"
  78. if lang.IsNotEmpty(businessType) {
  79. key = key + ":" + businessType
  80. if lang.IsNotEmpty(businessId) {
  81. key = key + ":" + businessId
  82. if lang.IsNotEmpty(userName) {
  83. key = key + ":" + userName
  84. }
  85. }
  86. }
  87. if lang.IsNotEmpty(clientId) {
  88. key = key + ":" + clientId
  89. }
  90. return key
  91. }
  92. // websocketAlarmCachePrefix
  93. //
  94. // @Author zhaosy
  95. // @Description: 报警数据前缀
  96. // @date 2024-07-16 19:36:24
  97. func websocketAlarmCachePrefix(clientId string) string {
  98. //后期要加组织机构,有权限控制这块,需要进行处理,暂时先不去处理
  99. key := "websocket:alarm"
  100. if lang.IsNotEmpty(clientId) {
  101. key = key + ":" + clientId
  102. }
  103. return key
  104. }
  105. type webSocket struct {
  106. }
  107. // RawDataWSHandle
  108. //
  109. // @Author zhaosy
  110. // @Description: 原始数据websocket
  111. // @date 2024-07-16 15:00:04
  112. func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {
  113. //获取参数
  114. businessType := r.Get("businessType")
  115. businessId := r.Get("businessId")
  116. userName := r.Get("userName")
  117. registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
  118. }
  119. // AlarmWSHandle
  120. //
  121. // @Author zhaosy
  122. // @Description: 报警websocket处理器
  123. // @date 2024-07-16 19:43:57
  124. func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {
  125. registerAlarmClientConn(r.Response.BufferWriter, r.Request)
  126. }
  127. // SendRowDataBusinessData
  128. //
  129. // @Author zhaosy
  130. // @Description: 接收业务数据进行推送到websocket
  131. // @date 2024-07-16 18:19:46
  132. func SendRowDataBusinessData(data model.BusinessData) {
  133. rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
  134. }
  135. // SendAlarmBusinessData
  136. //
  137. // @Author zhaosy
  138. // @Description: 接收报警数据,推送到websocket客户端
  139. // @date 2024-07-16 19:42:13
  140. func SendAlarmBusinessData(data model.AlarmBusinessData) {
  141. alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
  142. }
  143. // 初始化websocket协议配置
  144. var upgrader = websocket.Upgrader{
  145. ReadBufferSize: 1024,
  146. WriteBufferSize: 1024,
  147. CheckOrigin: func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
  148. }
  149. // registerRawDataClientConn
  150. //
  151. // @Author zhaosy
  152. // @Description: 注册原始数据客户端连接
  153. // @date 2024-07-16 18:12:19
  154. func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {
  155. if lang.IsEmpty(businessType) {
  156. io.WriteString(w, "businessType 不能为空")
  157. }
  158. //生成客户端
  159. conn, err := upgrader.Upgrade(w, r, nil)
  160. if err != nil {
  161. log.Error(ctx, err.Error(), err)
  162. io.WriteString(w, "这是一个websocket连接,不是API.")
  163. return
  164. }
  165. clientId := guid.S()
  166. key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)
  167. //初始化客户端对象
  168. client := &client{
  169. key: key,
  170. socket: conn,
  171. send: make(chan []byte),
  172. }
  173. rowDataManagerNew.register <- client
  174. //开启读
  175. go client.read(*rowDataManagerNew)
  176. //开起写
  177. go client.write(*rowDataManagerNew)
  178. }
  179. // registerAlarmClient
  180. //
  181. // @Author zhaosy
  182. // @Description: 注册报警客户端连接
  183. // @date 2024-07-16 19:40:52
  184. func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {
  185. //生成客户端
  186. conn, err := upgrader.Upgrade(w, r, nil)
  187. if err != nil {
  188. log.Error(ctx, err.Error(), err)
  189. io.WriteString(w, "这是一个websocket,不是网站.")
  190. return
  191. }
  192. clientId := guid.S()
  193. key := websocketAlarmCachePrefix(clientId)
  194. //初始化客户端对象
  195. client := &client{
  196. key: key,
  197. socket: conn,
  198. send: make(chan []byte),
  199. }
  200. alarmDataManagerNew.register <- client
  201. //开启读
  202. go client.read(*alarmDataManagerNew)
  203. //开启写
  204. go client.write(*alarmDataManagerNew)
  205. }
  206. // 原始ws客户端管理器
  207. var rowDataManagerNew = newClientManager()
  208. // 报警数据ws客户端管理器
  209. var alarmDataManagerNew = newClientManager()
  210. // **********************************以下是websocket进行封装,可以直接使用******************************************
  211. // 创建客户端管理器
  212. func newClientManager() *clientManager {
  213. return &clientManager{
  214. broadcast: make(chan model.BusinessDataWrapper),
  215. register: make(chan *client),
  216. unregister: make(chan *client),
  217. clients: make(map[*client]bool),
  218. }
  219. }
  220. // clientManager
  221. // @Description: 客户端管理者
  222. type clientManager struct {
  223. //客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可
  224. //这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的key
  225. clients map[*client]bool
  226. //广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理
  227. broadcast chan model.BusinessDataWrapper
  228. //客户端注册链;用chan进行异步处理
  229. register chan *client
  230. //客户端关闭链;用chan进行异步处理
  231. unregister chan *client
  232. }
  233. // client
  234. // @Description: 客户端信息
  235. type client struct {
  236. //每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制
  237. key string
  238. //客户端连接对象
  239. socket *websocket.Conn
  240. //数据发送链
  241. send chan []byte
  242. }
  243. // start
  244. //
  245. // @Author zhaosy
  246. // @Description: websocket启动
  247. // @date 2024-07-17 10:55:28
  248. func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {
  249. for {
  250. select {
  251. case client := <-m.register: //进行连接
  252. m.clients[client] = true
  253. msg := "有一个新连接出现,已连接成功,客户端key:" + client.key
  254. log.Info(ctx, msg)
  255. //发送数据,这块可以不用发送,等后续进行注释即可
  256. //m.send([]byte(msg), client)
  257. case client := <-m.unregister: //关闭连接,进行释放资源
  258. if _, ok := m.clients[client]; ok {
  259. close(client.send)
  260. delete(m.clients, client)
  261. msg := "客户端key:" + client.key + ",已关闭"
  262. log.Info(ctx, msg)
  263. //m.send([]byte(msg), client)
  264. }
  265. case businessDataWrapper := <-m.broadcast: //广播数据
  266. for client := range m.clients {
  267. //这里其实就是发送数据了,
  268. //进行转换成json字符串
  269. //businessDataJsonStr, _ := json.Marshal(businessDataWrapper)
  270. //broadCastSend(m, client, businessDataJsonStr)
  271. //进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息
  272. callBackFunc(client, m, businessDataWrapper)
  273. }
  274. }
  275. }
  276. }
  277. // 广播进行发送数据
  278. func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {
  279. select {
  280. case client.send <- businessDataJsonByte:
  281. default:
  282. fmt.Println("关闭连接了,,,,,")
  283. close(client.send)
  284. delete(manager.clients, client)
  285. }
  286. }
  287. // send
  288. //
  289. // @Author zhaosy
  290. // @Description: 这快类似群发消息
  291. // @date 2024-07-16 16:40:37
  292. func (m *clientManager) send(message []byte, ignore *client) {
  293. for client := range m.clients {
  294. //ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了
  295. if client != ignore {
  296. //将数据写入到通道链
  297. client.send <- message
  298. }
  299. }
  300. }
  301. func (c *client) read(manager clientManager) {
  302. defer func() {
  303. manager.unregister <- c
  304. c.socket.Close()
  305. log.Info(ctx, c.key, "客户端进行关闭")
  306. }()
  307. for {
  308. _, message, err := c.socket.ReadMessage()
  309. if err != nil {
  310. manager.unregister <- c
  311. c.socket.Close()
  312. log.Info(ctx, c.key, "读数据出现异常,直接关闭。")
  313. break
  314. }
  315. //后期可以注释掉
  316. log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))
  317. //读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可
  318. }
  319. }
  320. // write
  321. //
  322. // @Author zhaosy
  323. // @Description: 写入数据
  324. // @date 2024-07-16 16:52:47
  325. func (c *client) write(manager clientManager) {
  326. defer func() {
  327. manager.unregister <- c
  328. c.socket.Close()
  329. log.Info(ctx, c.key, "客户端进行关闭")
  330. }()
  331. for {
  332. select {
  333. //如果客户端有数据要进行写出去
  334. case message, ok := <-c.send:
  335. if !ok {
  336. c.socket.WriteMessage(websocket.CloseMessage, []byte{})
  337. log.Info(ctx, c.key, "发送关闭提示")
  338. return
  339. }
  340. //这里才是真正的把数据推送到客户端
  341. err := c.socket.WriteMessage(websocket.TextMessage, message)
  342. if err != nil {
  343. manager.unregister <- c
  344. c.socket.Close()
  345. log.Info(ctx, c.key, "数据写入失败,进行关闭!")
  346. break
  347. }
  348. }
  349. }
  350. }

ok。结束


本文转载自: https://blog.csdn.net/u011410254/article/details/140489357
版权归原作者 云游遍天下 所有, 如有侵权,请联系我们删除。

“go 实现websocket以及详细设计流程过程,确保通俗易懂”的评论:

还没有评论