golang实现mediasoup的tcp服务及channel通道
tcp模块
定义相关类
- Client:表示客户端连接,包含网络连接conn、指向服务器的指针Server和Channel指针c。
- server:表示TCP服务器,包含服务器地址address、TLS配置config以及三个回调函数: - onNewClientCallback:新客户端连接时调用。- onClientConnectionClosed:客户端连接关闭时调用。- onNewMessage:客户端接收新消息时调用。
客户端相关接口
- Client.listen():客户端监听方法,读取连接数据,调用onNewMessage回调。
- Client.Send(message string):发送文本消息给客户端。
- Client.SendBytes(b []byte):发送字节数据给客户端。
- Client.Conn():获取客户端的网络连接。
- Client.Close():关闭客户端的网络连接。
服务器相关接口
- server.OnNewClient(callback func(c *Client)):设置新客户端连接的回调。
- server.OnClientConnectionClosed(callback func(c *Client, err error)):设置客户端连接关闭的回调。
- server.OnNewMessage(callback func(c *Client, message []byte, size int)):设置客户端接收新消息的回调。
- server.Listen():启动网络服务器,监听连接。
服务器初始化
- NewTcpServer(address string) *server:创建新的TCP服务器实例,不使用TLS。
- NewTcpServerWithTLS(address, certFile, keyFile string) *server:创建带有TLS功能的TCP服务器实例。
服务器启动流程
- 使用NewTcpServer或NewTcpServerWithTLS创建服务器实例。
- 设置回调函数,响应新客户端连接、客户端连接关闭和接收新消息事件。
- 调用server.Listen()开始监听连接。
TLS支持
- 如果需要TLS,使用NewTcpServerWithTLS函数,提供证书和密钥文件路径。
参考demo
package MediasoupLib
import("bufio""time""crypto/tls""log""net")// Client holds info about connectiontype Client struct{
conn net.Conn
Server *server
c *Channel
}// TCP servertype server struct{
address string// Address to open connection: localhost:9999
config *tls.Config
onNewClientCallback func(c *Client)
onClientConnectionClosed func(c *Client, err error)
onNewMessage func(c *Client, message []byte, size int)}// Read client data from channelfunc(c *Client)listen(){
fmt.Printf("tcp client listen() ")
c.Server.onNewClientCallback(c)
reader := bufio.NewReader(c.conn)for{
recv :=make([]byte,1500)//MTU 1500
size, err := reader.Read(recv)if err !=nil{
c.conn.Close()
c.Server.onClientConnectionClosed(c, err)
fmt.Printf("tcp client close! %s", err.Error())return}if size ==0{
time.Sleep(time.Millisecond *250)
fmt.Printf("tcp client recv size=0")continue}
recv = recv[0:size]
c.Server.onNewMessage(c, recv, size)}}// Send text message to clientfunc(c *Client)Send(message string)error{return c.SendBytes([]byte(message))}// Send bytes to clientfunc(c *Client)SendBytes(b []byte)error{_, err := c.conn.Write(b)if err !=nil{
c.conn.Close()
c.Server.onClientConnectionClosed(c, err)}return err
}func(c *Client)Conn() net.Conn {return c.conn
}func(c *Client)Close()error{return c.conn.Close()}// Called right after server starts listening new clientfunc(s *server)OnNewClient(callback func(c *Client)){
s.onNewClientCallback = callback
}// Called right after connection closedfunc(s *server)OnClientConnectionClosed(callback func(c *Client, err error)){
s.onClientConnectionClosed = callback
}// Called when Client receives new messagefunc(s *server)OnNewMessage(callback func(c *Client, message []byte, size int)){
s.onNewMessage = callback
}// Listen starts network serverfunc(s *server)Listen(){var listener net.Listener
var err errorif s.config ==nil{
listener, err = net.Listen("tcp", s.address)}else{
listener, err = tls.Listen("tcp", s.address, s.config)}if err !=nil{
fmt.Printf("Error starting TCP server.\r\n", err)}defer listener.Close()for{
conn, err := listener.Accept()if err !=nil{
fmt.Printf("tcpserver listner Accept error:%s", err.Error())}
client :=&Client{
conn: conn,
Server: s,}go client.listen()}}// Creates new tcp server instancefuncNewTcpServer(address string)*server {
fmt.Printf("Creating server with address %s", address)
server :=&server{
address: address,}
server.OnNewClient(func(c *Client){
c.c =NewChannel(c)})
server.OnNewMessage(func(c *Client, message []byte, size int){
c.c.onData(c, message, size)})
server.OnClientConnectionClosed(func(c *Client, err error){
c.c.Close()
fmt.Printf("OnClientConnectionClosed err = %s", err.Error())})return server
}funcNewTcpServerWithTLS(address, certFile, keyFile string)*server {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)if err !=nil{
fmt.Printf("Error loading certificate files. Unable to create TCP server with TLS functionality.\r\n", err)}
config :=&tls.Config{
Certificates:[]tls.Certificate{cert},}
server :=NewTcpServer(address)
server.config = config
return server
}
channel模块
ChannelListener接口定义:
- ChannelListener:定义了一个接口,包含两个方法OnChannelEvent和OnChannelStringEvent,用于监听通道事件。
Channel结构体:
- 包含字段如MediasoupClient(指向Client的指针)、PendingSent(一个同步映射,用于存储待发送的数据)、LastBinaryNotification、ID、Pid、udpAddress、udpPort、queue和messageQueue(两个循环队列)、Num和Listeners(一个映射,存储监听器)。
- 包含一个互斥锁mutex,用于并发控制。
Channel的接口:
- NewChannel:构造函数,创建并返回一个新的Channel实例。
- AddListener和RemoveListener:用于添加和移除监听器。
- processMessage:处理接收到的消息。
- onData:处理接收到的数据。
- handle:一个循环,从队列中取出项目并处理。
- handleMessage:处理消息队列中的消息。
- process:根据消息类型进行不同的处理。
- Close:关闭通道,清理资源。
- Request:发送请求并返回一个通道用于接收响应。
- SetUdp:设置UDP地址和端口。
并发处理:
- 使用sync.Map和sync.RWMutex来处理并发,确保数据的一致性和线程安全。
循环队列:
- 使用MeetGo.CycleQueue作为循环队列,用于存储消息和数据。
参考demo
import("encoding/json""fmt""strconv""sync""time""strings")//###########SendRequest begin############/var REQUEST_TIMEOUT =30000type SendRequest struct{
ID string
Method string
Internal map[string]interface{}
Data map[string]interface{}}//async chan SendReponsetype SendReponse struct{
ID int
TargetId int
Event string
Accepted bool
Rejected bool
Internal map[string]interface{}
Data map[string]interface{}
Reason string
Binary bool}type AsyncSingal struct{
Async chan SendReponse
}###########SendRequest End############/////###########CycleQueue begin############//type CycleQueue struct{
data []interface{}//存储空间
front int//前指针,前指针负责弹出数据移动
rear int//尾指针,后指针负责添加数据移动capint//设置切片最大容量}funcNewCycleQueue(capint)*CycleQueue {return&CycleQueue{
data:make([]interface{},cap),cap:cap,
front:0,
rear:0,}}//入队操作//判断队列是否队满,队满则不允许添加数据func(q *CycleQueue)Push(data interface{})bool{//check queue is fullif(q.rear+1)%q.cap== q.front {//队列已满时,不执行入队操作returnfalse}
q.data[q.rear]= data //将元素放入队列尾部
q.rear =(q.rear +1)% q.cap//尾部元素指向下一个空间位置,取模运算保证了索引不越界(余数一定小于除数)returntrue}//出队操作//需要考虑: 队队为空没有数据返回了func(q *CycleQueue)Pop()interface{}{if q.rear == q.front {returnnil}
data := q.data[q.front]
q.data[q.front]=nil
q.front =(q.front +1)% q.capreturn data
}//因为是循环队列, 后指针减去前指针 加上最大值, 然后与最大值 取余func(q *CycleQueue)QueueLength()int{return(q.rear - q.front + q.cap)% q.cap}func(q *CycleQueue)FindDataByRequestId(requestId string)string{for i :=0; i < q.QueueLength(); i++{if strings.Count(q.data[i].(string), requestId)==1{
emitData := q.data[i].(string)
q.data =append(q.data[:i], q.data[i+1:]...)return emitData
}}return""}///###########CycleQueue############import("encoding/json""fmt""strconv""sync""time"
MeetGo "vrv.meeting.server/MeetGo")const NS_MAX_SIZE int=655350var messageBuffer =make([]byte, NS_MAX_SIZE)var messageIndex =0type ChannelListener interface{OnChannelEvent(string,map[string]interface{})OnChannelStringEvent(string,string)}type Channel struct{
MediasoupClient *Client
PendingSent sync.Map
LastBinaryNotification interface{}
ID int
Pid int
udpAddress string
udpPort int
queue CycleQueue
messageQueue CycleQueue
Num int
Listeners map[string]ChannelListener
mutex sync.RWMutex
}funcNewChannel(tcpClient *Client)*Channel {
channel :=new(Channel)
channel.MediasoupClient = tcpClient
channel.queue = MeetGo.NewCycleQueue(1000)
channel.messageQueue = MeetGo.NewCycleQueue(10000)
channel.Num =0
channel.Listeners =make(map[string]ChannelListener,100)go channel.handle()go channel.handleMessage()return channel
}func(channel *Channel)AddListener(id string, listener ChannelListener){
channel.mutex.Lock()
channel.Listeners[id]= listener
channel.mutex.Unlock()}func(channel *Channel)RemoveListener(id string){
channel.mutex.Lock()delete(channel.Listeners, id)
channel.mutex.Unlock()}func(channel *Channel)processMessage(message string){
jsonMessage :=make(map[string]interface{})
err := json.Unmarshal([]byte(message),&jsonMessage)if err !=nil{
MeetGo.Log.Error("Channel processMessage error:%s", err.Error())return}if jsonMessage["registId"]!=nil{
MeetGo.Log.Debug("client registId succeeded [id:%s]", jsonMessage["registId"].(string))
channel.ID,_= strconv.Atoi(jsonMessage["registId"].(string))
channel.Pid =int(jsonMessage["pid"].(float64))
Global_Worker.OnMediasoupWorkerOnline(channel.ID, channel, jsonMessage["registId"].(string))}elseif jsonMessage["id"]!=nil{
idd :=int(jsonMessage["id"].(float64))
value, ret := channel.PendingSent.Load(idd)if!ret {
fmt.Printf("received Response does not match any sent Request")return}
channel.PendingSent.Delete(idd)
asyncReponse := value.(*MeetGo.AsyncSingal)if jsonMessage["accepted"]!=nil&& jsonMessage["accepted"].(bool){
MeetGo.Log.Debug("request succeeded [id:%d]",int(jsonMessage["id"].(float64)))
sendReponse := MeetGo.SendReponse{
ID: idd,
Accepted: jsonMessage["accepted"].(bool),
Data: jsonMessage["data"].(interface{}).(map[string]interface{}),}
asyncReponse.Async <- sendReponse
}else{
MeetGo.Log.Debug("request failed [id:%d, reason: %s]",int(jsonMessage["id"].(float64)), jsonMessage["reason"].(string))
sendReponse := MeetGo.SendReponse{
ID:int(jsonMessage["id"].(float64)),
Reason: jsonMessage["reason"].(string),}
asyncReponse.Async <- sendReponse
}}elseif jsonMessage["targetId"]!=nil&& jsonMessage["event"]!=nil{if jsonMessage["binary"]!=nil{
channel.LastBinaryNotification = jsonMessage
return}elseif jsonMessage["data"]!=nil{
listenerKey := fmt.Sprintf("%d",int(jsonMessage["targetId"].(float64)))
channel.mutex.RLock()
listener := channel.Listeners[listenerKey]
channel.mutex.RUnlock()if listener !=nil{
listener.OnChannelEvent(jsonMessage["event"].(string), jsonMessage["data"].(map[string]interface{}))}}else{
data :=make(map[string]interface{})
listenerKey := fmt.Sprintf("%d",int(jsonMessage["targetId"].(float64)))
channel.mutex.RLock()
listener := channel.Listeners[listenerKey]
channel.mutex.RUnlock()if listener !=nil{
listener.OnChannelEvent(jsonMessage["event"].(string), data)}}}else{
fmt.Printf("received message is not a Response nor a Notification")return}}func(channel *Channel)onData(client *Client, message []byte, size int){for{
ret := channel.messageQueue.Push(message)if ret {break}else{
time.Sleep(40* time.Millisecond)}}}func(channel *Channel)handle(){for{
item := channel.queue.Pop()if item ==nil{
time.Sleep(40* time.Millisecond)continue}
channel.process(item)
time.Sleep(1* time.Millisecond)}}func(channel *Channel)handleMessage(){
ns := NetString{bufLen:0, length:0, state:0}for{
item := channel.messageQueue.Pop()if item ==nil{
time.Sleep(40* time.Millisecond)continue}
message := item.([]byte)var nsPayloads [][]byte
err := ns.NsUnmarshal(message,&nsPayloads)if err !=nil{
fmt.Printf("Channel handleMessage nsPayload error %s", err.Error())return}for_, nsPayload :=range nsPayloads {
channel.queue.Push(nsPayload)}
time.Sleep(1* time.Millisecond)}}func(channel *Channel)process(data interface{}){
nsPayload := data.([]byte)if channel.LastBinaryNotification ==nil{switch nsPayload[0]{// 123 = '{' (a Channel JSON messsage).case123:
channel.processMessage(string(nsPayload))break// 68 = 'D' (a debug log).case68:
fmt.Printf(string(nsPayload))break// 87 = 'W' (a warning log).case87:
fmt.Printf(string(nsPayload))break// 69 = 'E' (an error log).case69:
fmt.Printf(string(nsPayload))breakdefault:
fmt.Printf("unexpected data: %s",string(nsPayload))}}else{
msg := channel.LastBinaryNotification
channel.LastBinaryNotification =nil
jsonMsg :=make(map[string]interface{})
err := json.Unmarshal([]byte(msg.(string)),&jsonMsg)if err !=nil{panic(err)}
listenerKey := fmt.Sprintf("%d",int(jsonMsg["targetId"].(float64)))
channel.mutex.RLock()
listener := channel.Listeners[listenerKey]
channel.mutex.RUnlock()if listener !=nil{
listener.OnChannelStringEvent(jsonMsg["event"].(string), jsonMsg["data"].(string))}}}func(channel *Channel)Close(){
channel.PendingSent.Range(func(k, v interface{})bool{
channel.PendingSent.Delete(k)returntrue})
registId := strconv.Itoa(channel.ID)
Global_Worker.OnMediasoupWorkerOffline(registId)
time.Sleep(time.Millisecond *250)//?
fmt.Printf("channel.MediasoupClient.Close() ")
channel.MediasoupClient.Close()}func(c *Channel)Request(method string, internal,
data map[string]interface{})(chan MeetGo.SendReponse,int64){
id :=RandomNumberGenerator(10000000,99999999)
fmt.Printf("MediasoupLib Channel [method:%s, id:%d]", method, id)
request := MeetGo.RequestJson{
ID: id,
Method: method,
Internal: internal,
Data: data,}
requestJson := request.Encode()
requestSend :=nsEncode(requestJson)
fmt.Printf("___requestSend : %s", requestSend)
sendReponse :=new(MeetGo.AsyncSingal)
sendReponse.Async =make(chan MeetGo.SendReponse)if sendReponse !=nil{
c.PendingSent.Store(int(id), sendReponse)}defer c.MediasoupClient.Send(requestSend)return sendReponse.Async, id
}func(channel *Channel)SetUdp(udpAddress string, udpPort int){
channel.udpAddress = udpAddress
channel.udpPort = udpPort
}
tips
更多关于mediasoup的文章可以进入我的专栏查看
http://t.csdnimg.cn/3UQeL
本文转载自: https://blog.csdn.net/LittleBoy_IT/article/details/139220967
版权归原作者 littleboy_webrtc 所有, 如有侵权,请联系我们删除。
版权归原作者 littleboy_webrtc 所有, 如有侵权,请联系我们删除。