0


golang实现mediasoup的tcp服务及channel通道

golang实现mediasoup的tcp服务及channel通道

tcp模块

定义相关类

  • Client:表示客户端连接,包含网络连接conn、指向服务器的指针Server和Channel指针c。
  • server:表示TCP服务器,包含服务器地址address、TLS配置config以及三个回调函数: - onNewClientCallback:新客户端连接时调用。- onClientConnectionClosed:客户端连接关闭时调用。- onNewMessage:客户端接收新消息时调用。

客户端相关接口

  • Client.listen():客户端监听方法,读取连接数据,调用onNewMessage回调。
  • Client.Send(message string):发送文本消息给客户端。
  • Client.SendBytes(b []byte):发送字节数据给客户端。
  • Client.Conn():获取客户端的网络连接。
  • Client.Close():关闭客户端的网络连接。

服务器相关接口

  • server.OnNewClient(callback func(c *Client)):设置新客户端连接的回调。
  • server.OnClientConnectionClosed(callback func(c *Client, err error)):设置客户端连接关闭的回调。
  • server.OnNewMessage(callback func(c *Client, message []byte, size int)):设置客户端接收新消息的回调。
  • server.Listen():启动网络服务器,监听连接。

服务器初始化

  • NewTcpServer(address string) *server:创建新的TCP服务器实例,不使用TLS。
  • NewTcpServerWithTLS(address, certFile, keyFile string) *server:创建带有TLS功能的TCP服务器实例。

服务器启动流程

  1. 使用NewTcpServer或NewTcpServerWithTLS创建服务器实例。
  2. 设置回调函数,响应新客户端连接、客户端连接关闭和接收新消息事件。
  3. 调用server.Listen()开始监听连接。

TLS支持

  • 如果需要TLS,使用NewTcpServerWithTLS函数,提供证书和密钥文件路径。

参考demo

package MediasoupLib

import("bufio""time""crypto/tls""log""net")// Client holds info about connectiontype Client struct{
    conn net.Conn
    Server *server
    c *Channel
}// TCP servertype server struct{
    address string// Address to open connection: localhost:9999
    config *tls.Config
    onNewClientCallback func(c *Client)
    onClientConnectionClosed func(c *Client, err error)
    onNewMessage func(c *Client, message []byte, size int)}// Read client data from channelfunc(c *Client)listen(){
    fmt.Printf("tcp client listen() ")
    c.Server.onNewClientCallback(c)

    reader := bufio.NewReader(c.conn)for{
        recv :=make([]byte,1500)//MTU 1500
        size, err := reader.Read(recv)if err !=nil{
            c.conn.Close()
            c.Server.onClientConnectionClosed(c, err)
            fmt.Printf("tcp client close! %s", err.Error())return}if size ==0{
            time.Sleep(time.Millisecond *250)
            fmt.Printf("tcp client recv size=0")continue}
        
        recv = recv[0:size]
        c.Server.onNewMessage(c, recv, size)}}// Send text message to clientfunc(c *Client)Send(message string)error{return c.SendBytes([]byte(message))}// Send bytes to clientfunc(c *Client)SendBytes(b []byte)error{_, err := c.conn.Write(b)if err !=nil{
        c.conn.Close()
        c.Server.onClientConnectionClosed(c, err)}return err
}func(c *Client)Conn() net.Conn {return c.conn
}func(c *Client)Close()error{return c.conn.Close()}// Called right after server starts listening new clientfunc(s *server)OnNewClient(callback func(c *Client)){
    s.onNewClientCallback = callback
}// Called right after connection closedfunc(s *server)OnClientConnectionClosed(callback func(c *Client, err error)){
    s.onClientConnectionClosed = callback
}// Called when Client receives new messagefunc(s *server)OnNewMessage(callback func(c *Client, message []byte, size int)){
    s.onNewMessage = callback
}// Listen starts network serverfunc(s *server)Listen(){var listener net.Listener
    var err errorif s.config ==nil{
        listener, err = net.Listen("tcp", s.address)}else{
        listener, err = tls.Listen("tcp", s.address, s.config)}if err !=nil{
        fmt.Printf("Error starting TCP server.\r\n", err)}defer listener.Close()for{

        conn, err := listener.Accept()if err !=nil{
            fmt.Printf("tcpserver listner Accept error:%s", err.Error())}

        client :=&Client{
            conn: conn,
            Server: s,}go client.listen()}}// Creates new tcp server instancefuncNewTcpServer(address string)*server {
    fmt.Printf("Creating server with address %s", address)
    server :=&server{
        address: address,}

    server.OnNewClient(func(c *Client){
        c.c =NewChannel(c)})

    server.OnNewMessage(func(c *Client, message []byte, size int){
        c.c.onData(c, message, size)})

    server.OnClientConnectionClosed(func(c *Client, err error){
        c.c.Close()
        fmt.Printf("OnClientConnectionClosed   err = %s", err.Error())})return server
}funcNewTcpServerWithTLS(address, certFile, keyFile string)*server {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)if err !=nil{

        fmt.Printf("Error loading certificate files. Unable to create TCP server with TLS functionality.\r\n", err)}

    config :=&tls.Config{

        Certificates:[]tls.Certificate{cert},}

    server :=NewTcpServer(address)
    server.config = config
    return server

}

channel模块

ChannelListener接口定义:

  • ChannelListener:定义了一个接口,包含两个方法OnChannelEvent和OnChannelStringEvent,用于监听通道事件。

Channel结构体:

  • 包含字段如MediasoupClient(指向Client的指针)、PendingSent(一个同步映射,用于存储待发送的数据)、LastBinaryNotification、ID、Pid、udpAddress、udpPort、queue和messageQueue(两个循环队列)、Num和Listeners(一个映射,存储监听器)。
  • 包含一个互斥锁mutex,用于并发控制。

Channel的接口:

  • NewChannel:构造函数,创建并返回一个新的Channel实例。
  • AddListener和RemoveListener:用于添加和移除监听器。
  • processMessage:处理接收到的消息。
  • onData:处理接收到的数据。
  • handle:一个循环,从队列中取出项目并处理。
  • handleMessage:处理消息队列中的消息。
  • process:根据消息类型进行不同的处理。
  • Close:关闭通道,清理资源。
  • Request:发送请求并返回一个通道用于接收响应。
  • SetUdp:设置UDP地址和端口。

并发处理:

  • 使用sync.Map和sync.RWMutex来处理并发,确保数据的一致性和线程安全。

循环队列:

  • 使用MeetGo.CycleQueue作为循环队列,用于存储消息和数据。

参考demo

import("encoding/json""fmt""strconv""sync""time""strings")//###########SendRequest begin############/var REQUEST_TIMEOUT =30000type SendRequest struct{
    ID       string
    Method   string
    Internal map[string]interface{}
    Data     map[string]interface{}}//async chan SendReponsetype SendReponse struct{
    ID       int
    TargetId int
    Event    string
    Accepted bool
    Rejected bool
    Internal map[string]interface{}
    Data     map[string]interface{}
    Reason   string
    Binary   bool}type AsyncSingal struct{
    Async chan SendReponse
}###########SendRequest End############/////###########CycleQueue begin############//type CycleQueue struct{
    data  []interface{}//存储空间
    front int//前指针,前指针负责弹出数据移动
    rear  int//尾指针,后指针负责添加数据移动capint//设置切片最大容量}funcNewCycleQueue(capint)*CycleQueue {return&CycleQueue{
        data:make([]interface{},cap),cap:cap,
        front:0,
        rear:0,}}//入队操作//判断队列是否队满,队满则不允许添加数据func(q *CycleQueue)Push(data interface{})bool{//check queue is fullif(q.rear+1)%q.cap== q.front {//队列已满时,不执行入队操作returnfalse}
    q.data[q.rear]= data         //将元素放入队列尾部
    q.rear =(q.rear +1)% q.cap//尾部元素指向下一个空间位置,取模运算保证了索引不越界(余数一定小于除数)returntrue}//出队操作//需要考虑: 队队为空没有数据返回了func(q *CycleQueue)Pop()interface{}{if q.rear == q.front {returnnil}
    data := q.data[q.front]
    q.data[q.front]=nil
    q.front =(q.front +1)% q.capreturn data
}//因为是循环队列, 后指针减去前指针 加上最大值, 然后与最大值 取余func(q *CycleQueue)QueueLength()int{return(q.rear - q.front + q.cap)% q.cap}func(q *CycleQueue)FindDataByRequestId(requestId string)string{for i :=0; i < q.QueueLength(); i++{if strings.Count(q.data[i].(string), requestId)==1{
            emitData := q.data[i].(string)
            q.data =append(q.data[:i], q.data[i+1:]...)return emitData
        }}return""}///###########CycleQueue############import("encoding/json""fmt""strconv""sync""time"

    MeetGo "vrv.meeting.server/MeetGo")const NS_MAX_SIZE int=655350var messageBuffer =make([]byte, NS_MAX_SIZE)var messageIndex =0type ChannelListener interface{OnChannelEvent(string,map[string]interface{})OnChannelStringEvent(string,string)}type Channel struct{
    MediasoupClient        *Client
    PendingSent            sync.Map
    LastBinaryNotification interface{}
    ID                     int
    Pid                    int
    udpAddress             string
    udpPort                int
    queue                  CycleQueue
    messageQueue          CycleQueue
    Num                    int
    Listeners              map[string]ChannelListener
    mutex                  sync.RWMutex
}funcNewChannel(tcpClient *Client)*Channel {
    channel :=new(Channel)
    channel.MediasoupClient = tcpClient
    channel.queue = MeetGo.NewCycleQueue(1000)
    channel.messageQueue = MeetGo.NewCycleQueue(10000)
    channel.Num =0
    channel.Listeners =make(map[string]ChannelListener,100)go channel.handle()go channel.handleMessage()return channel
}func(channel *Channel)AddListener(id string, listener ChannelListener){
    channel.mutex.Lock()
    channel.Listeners[id]= listener
    channel.mutex.Unlock()}func(channel *Channel)RemoveListener(id string){
    channel.mutex.Lock()delete(channel.Listeners, id)
    channel.mutex.Unlock()}func(channel *Channel)processMessage(message string){
    jsonMessage :=make(map[string]interface{})
    err := json.Unmarshal([]byte(message),&jsonMessage)if err !=nil{
        MeetGo.Log.Error("Channel processMessage error:%s", err.Error())return}if jsonMessage["registId"]!=nil{
        MeetGo.Log.Debug("client registId succeeded [id:%s]", jsonMessage["registId"].(string))
        channel.ID,_= strconv.Atoi(jsonMessage["registId"].(string))
        channel.Pid =int(jsonMessage["pid"].(float64))
        Global_Worker.OnMediasoupWorkerOnline(channel.ID, channel, jsonMessage["registId"].(string))}elseif jsonMessage["id"]!=nil{
        idd :=int(jsonMessage["id"].(float64))
        value, ret := channel.PendingSent.Load(idd)if!ret {
            fmt.Printf("received Response does not match any sent Request")return}
        channel.PendingSent.Delete(idd)
        asyncReponse := value.(*MeetGo.AsyncSingal)if jsonMessage["accepted"]!=nil&& jsonMessage["accepted"].(bool){
            MeetGo.Log.Debug("request succeeded [id:%d]",int(jsonMessage["id"].(float64)))
            sendReponse := MeetGo.SendReponse{
                ID:       idd,
                Accepted: jsonMessage["accepted"].(bool),
                Data:     jsonMessage["data"].(interface{}).(map[string]interface{}),}
            asyncReponse.Async <- sendReponse
        }else{
            MeetGo.Log.Debug("request failed [id:%d, reason: %s]",int(jsonMessage["id"].(float64)), jsonMessage["reason"].(string))
            sendReponse := MeetGo.SendReponse{
                ID:int(jsonMessage["id"].(float64)),
                Reason: jsonMessage["reason"].(string),}
            asyncReponse.Async <- sendReponse
        }}elseif jsonMessage["targetId"]!=nil&& jsonMessage["event"]!=nil{if jsonMessage["binary"]!=nil{
            channel.LastBinaryNotification = jsonMessage
            return}elseif jsonMessage["data"]!=nil{
            listenerKey := fmt.Sprintf("%d",int(jsonMessage["targetId"].(float64)))
            channel.mutex.RLock()
            listener := channel.Listeners[listenerKey]
            channel.mutex.RUnlock()if listener !=nil{
                listener.OnChannelEvent(jsonMessage["event"].(string), jsonMessage["data"].(map[string]interface{}))}}else{
            data :=make(map[string]interface{})
            listenerKey := fmt.Sprintf("%d",int(jsonMessage["targetId"].(float64)))
            channel.mutex.RLock()
            listener := channel.Listeners[listenerKey]
            channel.mutex.RUnlock()if listener !=nil{
                listener.OnChannelEvent(jsonMessage["event"].(string), data)}}}else{
        fmt.Printf("received message is not a Response nor a Notification")return}}func(channel *Channel)onData(client *Client, message []byte, size int){for{
        ret := channel.messageQueue.Push(message)if ret {break}else{
            time.Sleep(40* time.Millisecond)}}}func(channel *Channel)handle(){for{
        item := channel.queue.Pop()if item ==nil{
            time.Sleep(40* time.Millisecond)continue}
        channel.process(item)
        time.Sleep(1* time.Millisecond)}}func(channel *Channel)handleMessage(){
    ns := NetString{bufLen:0, length:0, state:0}for{
        item := channel.messageQueue.Pop()if item ==nil{
            time.Sleep(40* time.Millisecond)continue}
        message := item.([]byte)var nsPayloads [][]byte
        err := ns.NsUnmarshal(message,&nsPayloads)if err !=nil{
            fmt.Printf("Channel handleMessage nsPayload error %s", err.Error())return}for_, nsPayload :=range nsPayloads {
            channel.queue.Push(nsPayload)}
        time.Sleep(1* time.Millisecond)}}func(channel *Channel)process(data interface{}){
    nsPayload := data.([]byte)if channel.LastBinaryNotification ==nil{switch nsPayload[0]{// 123 = '{' (a Channel JSON messsage).case123:
            channel.processMessage(string(nsPayload))break// 68 = 'D' (a debug log).case68:
            fmt.Printf(string(nsPayload))break// 87 = 'W' (a warning log).case87:
            fmt.Printf(string(nsPayload))break// 69 = 'E' (an error log).case69:
            fmt.Printf(string(nsPayload))breakdefault:
            fmt.Printf("unexpected data: %s",string(nsPayload))}}else{
        msg := channel.LastBinaryNotification
        channel.LastBinaryNotification =nil
        jsonMsg :=make(map[string]interface{})
        err := json.Unmarshal([]byte(msg.(string)),&jsonMsg)if err !=nil{panic(err)}
        listenerKey := fmt.Sprintf("%d",int(jsonMsg["targetId"].(float64)))
        channel.mutex.RLock()
        listener := channel.Listeners[listenerKey]
        channel.mutex.RUnlock()if listener !=nil{
            listener.OnChannelStringEvent(jsonMsg["event"].(string), jsonMsg["data"].(string))}}}func(channel *Channel)Close(){
    channel.PendingSent.Range(func(k, v interface{})bool{
        channel.PendingSent.Delete(k)returntrue})
    registId := strconv.Itoa(channel.ID)
    Global_Worker.OnMediasoupWorkerOffline(registId)
    time.Sleep(time.Millisecond *250)//?
    fmt.Printf("channel.MediasoupClient.Close() ")
    channel.MediasoupClient.Close()}func(c *Channel)Request(method string, internal,
    data map[string]interface{})(chan MeetGo.SendReponse,int64){

    id :=RandomNumberGenerator(10000000,99999999)
    fmt.Printf("MediasoupLib Channel [method:%s, id:%d]", method, id)

    request := MeetGo.RequestJson{
        ID:       id,
        Method:   method,
        Internal: internal,
        Data:     data,}
    requestJson := request.Encode()
    requestSend :=nsEncode(requestJson)
    fmt.Printf("___requestSend : %s", requestSend)
    sendReponse :=new(MeetGo.AsyncSingal)
    sendReponse.Async =make(chan MeetGo.SendReponse)if sendReponse !=nil{
        c.PendingSent.Store(int(id), sendReponse)}defer c.MediasoupClient.Send(requestSend)return sendReponse.Async, id
}func(channel *Channel)SetUdp(udpAddress string, udpPort int){
    channel.udpAddress = udpAddress
    channel.udpPort = udpPort
}

tips

更多关于mediasoup的文章可以进入我的专栏查看
http://t.csdnimg.cn/3UQeL


本文转载自: https://blog.csdn.net/LittleBoy_IT/article/details/139220967
版权归原作者 littleboy_webrtc 所有, 如有侵权,请联系我们删除。

“golang实现mediasoup的tcp服务及channel通道”的评论:

还没有评论