mediasoup Source Code Analysis (3): Channel Creation and Signaling Interaction

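Overview

The earlier article "Implementing mediasoup's TCP service and channel in golang" already covered how the signaling service creates its TCP service and channel. This article explains how the channel is created on the C++ side of mediasoup, and how the signaling service and the mediasoup service interact.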

Cross-functional diagram

[Image: c92bb199ca71184775fa8bea149e201.png]

Business flowchart

[Image: image.png]

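Data is sent in two ways:
A request from the application layer ends up wrapped in a Request object, which carries the request's "id". Because the Request object also holds the Channel::UnixStreamSocket object, the handler can call Request::Accept() directly to report the result back to the application-layer process.
The Worker process can also send messages to the application-layer process on its own initiative, through Notifier::Emit(). The Notifier class holds a Channel::UnixStreamSocket, so it simply calls Channel::UnixStreamSocket::Send() to send the message. All of Notifier's data members and member functions are static, so Channel::Notifier::Emit() can be called from anywhere to send a message.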
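The two sending paths can be sketched in a few lines. This is only an illustration under stated assumptions, not the mediasoup implementation: FakeChannel is a hypothetical stand-in for Channel::UnixStreamSocket that records messages instead of writing to a socket, and the JSON field names in the messages are illustrative.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for Channel::UnixStreamSocket: it only records
// what would be written to the socket.
class FakeChannel
{
public:
    void Send(const std::string& message) { sent.push_back(message); }

    std::vector<std::string> sent;
};

// Reply path: the request keeps a pointer to the channel it arrived on,
// so Accept() can answer without any extra lookup.
class Request
{
public:
    Request(FakeChannel* channel, uint32_t id) : channel(channel), id(id) {}

    void Accept()
    {
        assert(!replied && "a request must be answered only once");
        replied = true;
        // The reply echoes the request id so the caller can match it.
        channel->Send("{\"id\":" + std::to_string(id) + ",\"accepted\":true}");
    }

private:
    FakeChannel* channel;
    uint32_t id;
    bool replied{ false };
};

// Notification path: every member is static, so any code can call
// Notifier::Emit() once ClassInit() has stored the channel.
class Notifier
{
public:
    static void ClassInit(FakeChannel* ch) { channel = ch; }

    static void Emit(const std::string& targetId, const std::string& event)
    {
        assert(channel != nullptr && "ClassInit() must be called first");
        channel->Send("{\"targetId\":\"" + targetId + "\",\"event\":\"" + event + "\"}");
    }

private:
    static FakeChannel* channel;
};

FakeChannel* Notifier::channel = nullptr;
```

In this sketch a reply always needs a Request instance, while a notification needs nothing but the class name, which is the practical consequence of making Notifier fully static.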

Code walkthrough

1. Channel creation

    int main(int argc, char* argv[])
    {
        // Ensure we are called by our Node library.
        if (argc == 1)
        {
            std::cerr << "ERROR: you don't seem to be my real father" << std::endl;

            std::_Exit(EXIT_FAILURE);
        }

        std::string id = std::string(argv[1]);
        std::string ip = std::string(argv[2]);
        int port       = atoi(argv[3]);
        int iperfPort  = atoi(argv[4]);

        // Initialize libuv stuff (we need it for the Channel).
        DepLibUV::ClassInit();

        //..........some code omitted..............

        // Set the Channel socket (this will be handled and deleted by the Worker).
        printf("new Channel to %s:%d\n", ip.c_str(), port);
        auto* channel = new Channel::UnixStreamSocket(ip, port);

        //..........some code omitted..............

        try
        {
            // Run the Worker.
            Worker worker(id, channel);

            // Worker ended.
            destroy();
            exitSuccess();
        }
        catch (const MediaSoupError& error)
        {
            MS_ERROR_STD("failure exit: %s", error.what());

            destroy();
            exitWithError();
        }
    }
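Note that main() only rejects the case argc == 1, yet it goes on to read argv[1] through argv[4]. A stricter check could look like the sketch below. WorkerArgs, parsePort and parseArgs are hypothetical names that do not appear in the source, and the sketch assumes that both port and iperfPort are ordinary ports in the range 1 to 65535.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical container for the four positional arguments read in main().
struct WorkerArgs
{
    std::string id;
    std::string ip;
    int port{ 0 };
    int iperfPort{ 0 };
};

// Parses a decimal port in [1, 65535]. Returns false on empty input,
// trailing garbage, or an out-of-range value (atoi() would hide all three).
bool parsePort(const char* text, int& out)
{
    char* end  = nullptr;
    long value = std::strtol(text, &end, 10);

    if (end == text || *end != '\0' || value < 1 || value > 65535)
        return false;

    out = static_cast<int>(value);
    return true;
}

// Succeeds only if argv holds the program name plus at least four arguments.
bool parseArgs(int argc, const char* const argv[], WorkerArgs& out)
{
    if (argc < 5)
        return false;

    WorkerArgs parsed;
    parsed.id = argv[1];
    parsed.ip = argv[2];

    if (!parsePort(argv[3], parsed.port) || !parsePort(argv[4], parsed.iperfPort))
        return false;

    out = parsed;
    return true;
}
```

With a helper like this, main() could exit with the same error message whenever parseArgs() returns false, instead of dereferencing missing arguments.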

The UnixStreamSocket constructor

    UnixStreamSocket::UnixStreamSocket(const std::string& ip, int port)
      : ::UnixStreamSocket::UnixStreamSocket(ip, port, MaxSize)
    {
        MS_TRACE_STD();

        // Create the JSON reader.
        {
            Json::CharReaderBuilder builder;
            Json::Value settings = Json::nullValue;
            Json::Value invalidSettings;

            builder.strictMode(&settings);

            MS_ASSERT(builder.validate(&invalidSettings), "invalid Json::CharReaderBuilder");

            this->jsonReader = builder.newCharReader();
        }

        // Create the JSON writer.
        {
            Json::StreamWriterBuilder builder;
            Json::Value invalidSettings;

            builder["commentStyle"]            = "None";
            builder["indentation"]             = "";
            builder["enableYAMLCompatibility"] = false;
            builder["dropNullPlaceholders"]    = false;

            MS_ASSERT(builder.validate(&invalidSettings), "invalid Json::StreamWriterBuilder");

            this->jsonWriter = builder.newStreamWriter();
        }
    }

The base-class constructor it delegates to lives in handles\UnixStreamSocket.cpp:

    UnixStreamSocket::UnixStreamSocket(const std::string& ip, int port, size_t bufferSize)
      : bufferSize(bufferSize)
    {
        printf("::UnixStreamSocket::UnixStreamSocket\n");
        MS_TRACE_STD();

        int err;

        this->uvHandle       = new uv_tcp_t;
        this->uvHandle->data = (void*)this;

        err = uv_tcp_init(DepLibUV::GetLoop(), this->uvHandle);
        if (err != 0)
        {
            delete this->uvHandle;
            this->uvHandle = nullptr;

            printf("uv_tcp_init() failed: %s\n", uv_strerror(err));
            MS_THROW_ERROR_STD("uv_tcp_init() failed: %s", uv_strerror(err));
        }

        struct sockaddr_in dest;
        uv_ip4_addr(ip.c_str(), port, &dest);

        this->connect = new uv_connect_t;

        printf("will connect to %s:%d\n", ip.c_str(), port);
        err = uv_tcp_connect(this->connect, this->uvHandle, (const struct sockaddr*)&dest, onConnect);
        if (err != 0)
        {
            delete this->uvHandle;
            this->uvHandle = nullptr;

            printf("uv_tcp_connect() failed: %s\n", uv_strerror(err));
            MS_THROW_ERROR_STD("uv_tcp_connect() failed: %s", uv_strerror(err));
        }

        // Start reading.
        err = uv_read_start(
          reinterpret_cast<uv_stream_t*>(this->uvHandle),
          static_cast<uv_alloc_cb>(onAlloc),
          static_cast<uv_read_cb>(onRead));
        if (err != 0)
        {
            uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));

            MS_THROW_ERROR_STD("uv_read_start() failed: %s", uv_strerror(err));
        }

        // NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb().
    }

The uv_read_start call in the code above registers onRead as the read callback:

    err = uv_read_start(
      reinterpret_cast<uv_stream_t*>(this->uvHandle),
      static_cast<uv_alloc_cb>(onAlloc),
      static_cast<uv_read_cb>(onRead));

Now jump into onRead:

    inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
    {
        auto* socket = static_cast<UnixStreamSocket*>(handle->data);

        if (socket == nullptr)
            return;

        socket->OnUvRead(nread, buf);
    }
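The constructor stores this in uvHandle->data, and onRead casts it back to reach the object. That is the usual way to route a C callback into a C++ member function. Below is a minimal sketch of the pattern that does not depend on libuv: FakeHandle, ReadCallback, Socket and deliver are hypothetical names used only for illustration.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for a libuv handle: a C struct with a user data slot.
struct FakeHandle
{
    void* data;
};

// C-style callback type, playing the role that uv_read_cb plays above.
using ReadCallback = void (*)(FakeHandle* handle, std::ptrdiff_t nread, const char* buf);

class Socket
{
public:
    // Store `this` in the handle so the static callback can find the object.
    Socket() { handle.data = static_cast<void*>(this); }

    // The handle points back at this object, so copying would be unsafe.
    Socket(const Socket&)            = delete;
    Socket& operator=(const Socket&) = delete;

    void OnRead(std::ptrdiff_t nread, const char* buf)
    {
        if (nread > 0)
            received.append(buf, static_cast<std::size_t>(nread));
    }

    FakeHandle handle;
    std::string received;
};

// Trampoline: a plain function with the C signature that recovers the
// object from handle->data and forwards to the member function.
inline void onRead(FakeHandle* handle, std::ptrdiff_t nread, const char* buf)
{
    auto* socket = static_cast<Socket*>(handle->data);

    if (socket == nullptr)
        return;

    socket->OnRead(nread, buf);
}

// Plays the role of the event loop: it knows only the handle and the callback.
void deliver(FakeHandle* handle, ReadCallback cb, const std::string& chunk)
{
    cb(handle, static_cast<std::ptrdiff_t>(chunk.size()), chunk.data());
}
```

The null check in the trampoline mirrors the one in onRead above: a handle whose data slot is empty is silently ignored.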

OnUvRead in turn calls UserOnUnixStreamRead:

    void UnixStreamSocket::UserOnUnixStreamRead()
    {
        MS_TRACE_STD();

        // Be ready to parse more than a single message in a single TCP chunk.
        while (true)
        {
            if (IsClosed())
                return;

            size_t readLen  = this->bufferDataLen - this->msgStart;
            char* jsonStart = nullptr;
            size_t jsonLen;
            int nsRet = netstring_read(
              reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &jsonStart, &jsonLen);

            //.............some code omitted..............

            // If here it means that jsonStart points to the beginning of a JSON string
            // with jsonLen bytes length, so recalculate readLen.
            readLen =
              reinterpret_cast<const uint8_t*>(jsonStart) - (this->buffer + this->msgStart) + jsonLen + 1;

            Json::Value json;
            std::string jsonParseError;

            if (this->jsonReader->parse(
                  (const char*)jsonStart, (const char*)jsonStart + jsonLen, &json, &jsonParseError))
            {
                Channel::Request* request = nullptr;

                try
                {
                    request = new Channel::Request(this, json);
                }
                catch (const MediaSoupError& error)
                {
                    MS_ERROR_STD("discarding wrong Channel request");
                }

                if (request != nullptr)
                {
                    // Notify the listener.
                    this->listener->OnChannelRequest(this, request);

                    // Delete the Request.
                    delete request;
                }
            }

            //.............some code omitted.................
        }
    }
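The loop above relies on netstring framing, in which each message is sent as "<length>:<payload>,". The following sketch shows how several messages can be pulled out of one buffer and why readLen ends with "+ 1" (the trailing comma). readNetstrings is a hypothetical helper written for this illustration; it is not the netstring_read function used by mediasoup, and it simply stops on malformed input where real code would report an error.

```cpp
#include <cctype>
#include <cstddef>
#include <string>
#include <vector>

// Extracts every complete netstring from `buffer`, starting at `msgStart`.
// On return, `msgStart` points just past the last complete message, so a
// later call can resume once more bytes have been appended to the buffer.
std::vector<std::string> readNetstrings(const std::string& buffer, std::size_t& msgStart)
{
    std::vector<std::string> messages;

    while (msgStart < buffer.size())
    {
        // Parse the decimal length prefix (at most 9 digits, to avoid overflow).
        std::size_t pos = msgStart;
        std::size_t len = 0;

        while (pos < buffer.size() && pos - msgStart < 9 &&
               std::isdigit(static_cast<unsigned char>(buffer[pos])))
        {
            len = len * 10 + static_cast<std::size_t>(buffer[pos] - '0');
            ++pos;
        }

        if (pos == buffer.size())
            break; // Length prefix not complete yet: wait for more data.

        if (pos == msgStart || buffer[pos] != ':')
            break; // Malformed prefix: this sketch just stops parsing.

        const std::size_t payloadStart = pos + 1;

        // The payload and its trailing ',' must both be buffered.
        if (buffer.size() - payloadStart < len + 1)
            break;

        if (buffer[payloadStart + len] != ',')
            break; // Malformed terminator.

        messages.push_back(buffer.substr(payloadStart, len));

        // Same arithmetic as readLen above: prefix + payload + 1 for the ','.
        const std::size_t readLen = (payloadStart - msgStart) + len + 1;
        msgStart += readLen;
    }

    return messages;
}
```

For instance, a buffer holding one complete message followed by a partial one yields only the first message, and the second is returned by a later call once its trailing comma has arrived.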

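Channel creation is now complete. From here, execution jumps to the OnChannelRequest interface in worker.cpp: mediasoup listens for signaling on the channel and dispatches each request according to request->methodId, running different business logic for each value.
The possible request->methodId values are as follows: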

    std::unordered_map<std::string, Request::MethodId> Request::string2MethodId =
    {
        { "worker.dump",                       Request::MethodId::WORKER_DUMP                          },
        { "worker.updateSettings",             Request::MethodId::WORKER_UPDATE_SETTINGS               },
        { "worker.createRouter",               Request::MethodId::WORKER_CREATE_ROUTER                 },
        { "router.close",                      Request::MethodId::ROUTER_CLOSE                         },
        { "router.dump",                       Request::MethodId::ROUTER_DUMP                          },
        { "router.createWebRtcTransport",      Request::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT       },
        { "router.createPlainRtpTransport",    Request::MethodId::ROUTER_CREATE_PLAIN_RTP_TRANSPORT    },
        { "router.createProducer",             Request::MethodId::ROUTER_CREATE_PRODUCER               },
        { "router.createConsumer",             Request::MethodId::ROUTER_CREATE_CONSUMER               },
        { "router.setAudioLevelsEvent",        Request::MethodId::ROUTER_SET_AUDIO_LEVELS_EVENT        },
        { "transport.close",                   Request::MethodId::TRANSPORT_CLOSE                      },
        { "transport.dump",                    Request::MethodId::TRANSPORT_DUMP                       },
        { "transport.getStats",                Request::MethodId::TRANSPORT_GET_STATS                  },
        { "transport.setRemoteDtlsParameters", Request::MethodId::TRANSPORT_SET_REMOTE_DTLS_PARAMETERS },
        { "transport.setRemoteParameters",     Request::MethodId::TRANSPORT_SET_REMOTE_PARAMETERS      },
        { "transport.setMaxBitrate",           Request::MethodId::TRANSPORT_SET_MAX_BITRATE            },
        { "transport.changeUfragPwd",          Request::MethodId::TRANSPORT_CHANGE_UFRAG_PWD           },
        { "transport.startMirroring",          Request::MethodId::TRANSPORT_START_MIRRORING            },
        { "transport.stopMirroring",           Request::MethodId::TRANSPORT_STOP_MIRRORING             },
        { "producer.close",                    Request::MethodId::PRODUCER_CLOSE                       },
        { "producer.dump",                     Request::MethodId::PRODUCER_DUMP                        },
        { "producer.getStats",                 Request::MethodId::PRODUCER_GET_STATS                   },
        { "producer.pause",                    Request::MethodId::PRODUCER_PAUSE                       },
        { "producer.resume",                   Request::MethodId::PRODUCER_RESUME                      },
        { "producer.setPreferredProfile",      Request::MethodId::PRODUCER_SET_PREFERRED_PROFILE       },
        { "consumer.close",                    Request::MethodId::CONSUMER_CLOSE                       },
        { "consumer.dump",                     Request::MethodId::CONSUMER_DUMP                        },
        { "consumer.getStats",                 Request::MethodId::CONSUMER_GET_STATS                   },
        { "consumer.enable",                   Request::MethodId::CONSUMER_ENABLE                      },
        { "consumer.pause",                    Request::MethodId::CONSUMER_PAUSE                       },
        { "consumer.resume",                   Request::MethodId::CONSUMER_RESUME                      },
        { "consumer.setPreferredProfile",      Request::MethodId::CONSUMER_SET_PREFERRED_PROFILE       },
        { "consumer.setEncodingPreferences",   Request::MethodId::CONSUMER_SET_ENCODING_PREFERENCES    },
        { "consumer.requestKeyFrame",          Request::MethodId::CONSUMER_REQUEST_KEY_FRAME           }
    };
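The table maps a method name from the JSON request to an enum value, which a handler can then switch on. The sketch below illustrates that lookup-then-dispatch idea with three of the entries. How Worker::OnChannelRequest actually dispatches is not shown in this article, so lookupMethod, handleRequest and the returned strings are assumptions made purely for illustration.

```cpp
#include <string>
#include <unordered_map>

// A small subset of the method ids listed above.
enum class MethodId
{
    WORKER_DUMP,
    WORKER_CREATE_ROUTER,
    ROUTER_CLOSE
};

static const std::unordered_map<std::string, MethodId> string2MethodId = {
    { "worker.dump",         MethodId::WORKER_DUMP          },
    { "worker.createRouter", MethodId::WORKER_CREATE_ROUTER },
    { "router.close",        MethodId::ROUTER_CLOSE         }
};

// Translates the textual method name into its enum value.
// Returns false when the name is not in the table.
bool lookupMethod(const std::string& method, MethodId& out)
{
    auto it = string2MethodId.find(method);

    if (it == string2MethodId.end())
        return false;

    out = it->second;
    return true;
}

// Dispatches on the enum; each case stands in for real business logic.
std::string handleRequest(const std::string& method)
{
    MethodId id;

    if (!lookupMethod(method, id))
        return "rejected: unknown method '" + method + "'";

    switch (id)
    {
        case MethodId::WORKER_DUMP:
            return "dump worker state";
        case MethodId::WORKER_CREATE_ROUTER:
            return "create a router";
        case MethodId::ROUTER_CLOSE:
            return "close the router";
    }

    return "rejected: unhandled method";
}
```

Resolving the string once and switching on an enum keeps the string comparison in a single place and lets the compiler warn about enum values that a switch forgets to handle.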

The next chapter covers how mediasoup pushes signaling return values and other notifications back to the signaling service. Stay tuned!


This article is reposted from: https://blog.csdn.net/LittleBoy_IT/article/details/139757489
Copyright belongs to the original author, littleboy_webrtc. In case of infringement, please contact us for removal.
