0


深入浅出WebRTC—Pacer

平滑发包(Pacer)是 WebRTC 实现高质量实时通信不可或缺的一部分。在视频通信中,单帧视频可能包含大量的数据,如果未经控制地立即发送,可能瞬间对网络造成巨大压力。Pacer 能够根据网络条件动态调整发送速率,确保数据包以均匀且可控的速度发送,避免突发的大批量数据造成网络拥塞和数据包丢失。这样可以提升传输的稳定性,减少延迟和抖动,从而改善视频和音频的流畅度与质量。

1. 总体架构

1.1. 静态结构

1)TaskQueuePacedSender

TaskQueuePacedSender 是 PacingController 的包装器,其大部分接口直接透传到 PacingController。同时,TaskQueuePacedSender 是平滑发包的驱动器,内部使用 TaskQueue 驱动不断循环发包。

2)PacingController

PacingController 是平滑发包的控制器,用来实现指定速率的平滑发包,包含较复杂的控制逻辑,其内部使用 PrioritizedPacketQueue 缓存待发送报文。

**3)BitRateProber **

BitRateProber 和带宽探测相关。其接受带宽探测任务,使用平滑发包机制,按照带宽探测任务要求,控制带宽探测发包速率、发包时长、发包数量等参数。

1.2. 调用流程

下图展示的是一个典型的发包调用流程,包含一长两短三条路径,这三条路径配合实现平滑发包和带宽探测功能。

1)长路径是由 TaskQueueBase 驱动将报文插入发送队列,然后调用 NextSendTime 和 ProcessPackets 发送到期该发送的报文,然后创建一个调度任务 Post 到 TaskQueueBase 驱动循环报文发送。

2)其中一条短路径,是外部模块调用 EnqueuePackets 发送报文,TaskQueuePacedSender 将其封装成一个任务,调用 PostTask 转换到内部线程执行。

3)另外一条短路径,是外部模块调用 CreateProbeCluster 向 BitrateProber 创建带宽探测任务。

1.3. 逻辑架构

下图展示的是平滑发包逻辑架构。

1)平滑发包以 TaskQueue 进行驱动,它接受 Controller 和 BitrateProber 的控制,从 packet_queue_ 抓取报文经 PacketSender 发送到网络。

2)Controller 主要控制发包时间和发包数量,即什么时候发包以及发多少包,其控制逻辑受多个方面影响,比如外部设置的发包间隔、平滑发送码率、网络缓冲区状态、发包队列长度以及平均排队时长等。

3)BitrateProber 接受创建的带宽探测任务,它可以控制 TaskQueue 优先发送探测报文,来实现带宽探测功能。探测报文可以是媒体报文,也可以是 Padding 报文,Padding 报文需要调用 PacketSender 接口生成。

WebRTC 平滑发包是一个典型的“发送-等待”模型,如下图所示。基本逻辑是,将发送时间切分成一段一段的时间片(时间片长度不一定相等),在每个时间片的开始按照指定码率发送一定数量的报文,等待网络管道排空,然后继续下一轮发送,循环往复。这样做可以尽量保持比较均匀的发送码率,不会对网络造成冲击,同时,可以获得尽可能低延迟。

以上发包模型最关键的是计算每个时间片发送多少数据。最简单的思路就是使用固定时间片长度,发送固定数量的报文。WebRTC 最新代码已经没有使用固定周期发包模式了,采用的是动态发包周期,原因可能是固定发包周期无法满足不同发包要求:

1)带宽探测需要使用更小的发送时间间隔,以实现更准确的带宽探测。

2)高优先级报文需要立即发送,不能等待平滑发送时间片。

3)如果平滑发送速率太大,要适当调低发送时间间隔,否则有可能导致网络缓冲区溢出。

4)没有报文发送的时候,发送 keep-alive 的时间间隔不需要像发送媒体报文那么小。

动态周期发包模式引入 media_debt_ 变量来控制发包节奏,如下图所示。media_debt_ 可以认为是存在于网络缓冲区中的报文数量的一个计算值而非测量值。每次发送报文都会增加 media_debt_,增加的大小等于发送报文的大小;每经过一段时间,都会减少 media_debt_,减少的大小等于adjusted_media_rate * delta_t。当 media_debt_ 为0时,认为网络管道已经排空。

1.4. 报文类型

平滑发包会发送三类报文,分别是媒体报文、保活(keep-alive)报文和探测报文。如果基于Payload Type划分,可以分为 Audio、Video、RTX、FEC、Padding 五种类型的报文。

Keep-alive 报文属于 Padding 报文;探测报文可能是 Padding 报文,也可能是 RTX 报文。Audio、Video、RTX、FEC 都属于媒体报文。

2. TaskQueuePacedSender

2.1. 静态结构

2.1.1. 重要属性

1)pacing_controller_

执行具体平滑发包控制,包括什么时候发包,发多少包。

2)task_queue_

单线程驱动 pacing_controller_ 循环发包。

3)packet_size_

一个指数加权平均算法,用于获取平滑后的报文大小。平滑后的报文大小主要用来计算保持窗口大小。

2.1.2. 重要方法

1)EnqueuePackets

外部模块调用此接口发送报文。

2)CreateProbeClusters

创建带宽探测簇,内部调用 PacingController 对应方法。

3)SetCongested

设置链路拥塞状态,内部调用 PacingController 对应方法。

4)SetPacingRates

设置平滑发送速率,内部调用 PacingController 对应方法。

5)SetSendBurstInterval

设置平滑发包间隔,内部调用 PacingController 对应方法。

6)SetQueueTimeLimit

设置报文最大排队时间,内部调用 PacingController 对应方法。

2.2. 源码分析

2.2.1. EnqueuePackets

创建一个任务 Post 到内部线程,循环遍历所有要发送的报文,调用 PacingController 接口将报文插入发送队列。报文入队列完毕后,可能有报文需要发送,立即触发一次发包处理。

void TaskQueuePacedSender::EnqueuePackets(
  std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
    task_queue_->PostTask(
      SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
        // 循环遍历所有报文
        for (auto& packet : packets) {
          // 计算报文大小:header + payload + padding
          size_t packet_size = packet->payload_size() + packet->padding_size();
          if (include_overhead_) {
            packet_size += packet->headers_size();
          }
          // 计算平滑后的报文大小:y(k) = 0.9 * y(k-1) + 0.1 * sample
          packet_size_.Apply(1, packet_size);
          // 调用 PacingController::EnqueuePacket
          pacing_controller_.EnqueuePacket(std::move(packet));
        }
        // 插入报文后立即触发一次处理
        MaybeProcessPackets(Timestamp::MinusInfinity());
      }));
}

2.2.2. MaybeProcessPackets

先检测是否有报文需要发送,如果有则循环发送这个时间片需要发送的报文,然后获取下一次报文发送时间,创建一个新的调度任务,继续下一轮报文发送,形成一个发包循环。

MaybeProcessPackets 除了自循环外,如果执行了可能导致 PacingController 内部状态变化的操作,也会被调用来触发报文发送,包括 CreateProbeClusters、SetCongested、SetPacingRates 等操作。

除了主干流程,还有几个处理细节值得分析:

1)early_execute_margin

对于探测报文,允许提前 1ms 发送和提前 1ms 调度,代码中没有明确说明这么做的原因。理论上,探测发包是按照探测目标码率进行调度,没必要特殊处理。猜测可能的原因是定时器精度存在误差,而带宽探测对发包速率要求比较高,提前发送和调度可以有效保证探测发送速率。比如本来是第 100ms 回调 MaybeProcessPackets,结果是 99ms 回调,发现未到发包时间,需要重新创建一个调度任务,如果是探测发包,索性提前发送。

2)hold_back_window

对于非探测发包,会设置一个小于 5ms 的保持窗口,两次发包时间间隔不能低于保持窗口。保持窗口受 pacing_rate 影响,pacing_rate 越高则窗口越小。由于 send_burst_interval_ 已经可以用来控制发包间隔,这里附加的发包间隔控制,更像是一个额外保护。

3)scheduled_process_time

这个参数用来区分 MaybeProcessPackets 是自循环调度还是其他调度,对于自循环调度,scheduled_process_time 是一个有效值,其他调度这个值是 Timestamp::MinusInfinity。如果是自循环调度,但是 next_process_time_ 已经改变,说明在此之前发生了其他调用,当前调度已经过时,没必要继续执行。

void TaskQueuePacedSender::MaybeProcessPackets(Timestamp scheduled_process_time) {
    ...

    // 获取下次发送时间
  Timestamp next_send_time = pacing_controller_.NextSendTime();
  const Timestamp now = clock_->CurrentTime();

    // 获取提前发送间隔(探测为什么要加1ms的允许提前发送?)
  TimeDelta early_execute_margin = pacing_controller_.IsProbing()
          ? PacingController::kMaxEarlyProbeProcessing : TimeDelta::Zero();

    // 循环发送所有需要发送的报文
  while (next_send_time <= now + early_execute_margin) {
        // 执行发送
    pacing_controller_.ProcessPackets();
        // 获取下次发送时间
    next_send_time = pacing_controller_.NextSendTime();
        // 获取提前发送间隔
    early_execute_margin = pacing_controller_.IsProbing()
                ? PacingController::kMaxEarlyProbeProcessing : TimeDelta::Zero();
  }

    // 如果 scheduled_process_time 有值,说明是 TaskQueuePacedSender 循环调度
  if (scheduled_process_time.IsFinite()) {
        // 新任务修改了next_process_time_,此任务没必要再往下执行了
    if (scheduled_process_time != next_process_time_) {
      return;
    }
        // 匹配到任务时间戳,重置表示任务已经执行,没有待执行任务了
    next_process_time_ = Timestamp::MinusInfinity();
  }

  // hold_back_window 用来避免非探测状态下发送间隔太小
  TimeDelta hold_back_window = TimeDelta::Zero();
  if (!pacing_controller_.IsProbing()) {
        // 保持窗口初始化为 5ms
    hold_back_window = max_hold_back_window_;
        // 获取平滑速率
    DataRate pacing_rate = pacing_controller_.pacing_rate();        
    if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
        !pacing_rate.IsZero() &&
        packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
            // 计算发送一个报文需要多长时间
      TimeDelta avg_packet_send_time =
        DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
            // 取发送 3 个报文所需时间与 5ms 之间的更小值
      hold_back_window = std::min(hold_back_window,
        avg_packet_send_time * max_hold_back_window_in_packets_);
    }
  }

  // 计算下次发送时间,发送间隔不能小于 hold_back_window
  TimeDelta time_to_next_process =
      std::max(hold_back_window, next_send_time - now - early_execute_margin);
  next_send_time = now + time_to_next_process;

  // 尝试启动一个新任务
  if (next_process_time_.IsMinusInfinity() || // 没有待执行任务
      next_process_time_ > next_send_time) {  // 存在待执行任务,但执行时间更靠后
    task_queue_->PostDelayedHighPrecisionTask(
        SafeTask(safety_.flag(),
            [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
        time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
        // 更新下一次处理时间
    next_process_time_ = next_send_time;
  }
}

3. PacingController

3.1. 静态结构

3.1.1. 重要属性

1)packet_sender_

发送报文的对象,生成 padding 报文也是靠它。

2)prober_

带宽探测控制对象,基于平滑发送机制执行带宽探测任务。

3)packet_queue_

优先级队列,报文发送前会先插入此队列,能够保证高优先级报文先发送。

3.1.2. 重要方法

1)EnqueuePacket

将报文插入发送队列。

2)CreateProbeClusters

创建探测任务,内部调用 BitrateProber 方法。

3)SetCongested

设置拥塞状态,拥塞状态下需要停止发包,但需要继续发送 keep-alive 报文。

4)SetPacingRates

设置平滑发包速率。

5)SetSendBurstInterval

设置平滑发包时间间隔,内部可能根据状态和反馈调整发包间隔。

6)SetQueueTimeLimit

设置报文最大排队时长,如果发现按照当前发包速率,报文在队列中的排队时长会超过此设置值,则会提高发包速率。

7)NextSendTime

获取下次发包时间,TaskQueuePacedSender 调用。

8)ProcessPackets

执行发包流程,TaskQueuePacedSender 调用。

3.2. 源码分析

3.2.1. EnqueuePacket

此方法的核心逻辑是将报文插入到优先级队列,但在插入前和插入后有做一些必要处理。

1)keyframe_flushing_

如果配置了关键帧刷新,当收到关键帧的第一个报文,会检查当前队列中是否有其他关键帧报文,如果有的话,需要将之前关键帧报文全部移除,包括 RTX 报文。因为,正常情况两个关键帧相隔时间较长,如果新来的关键帧在队列中能看到上一个关键帧,说明发生网络阻塞,再发送老的关键帧会引起更大的延迟。

2)队列为空处理

如果发现队列为空,说明当前已经没有要发送的报文,立即更新 budget,提前消耗 media_dbet_,使得下次 TaskQueuePacedSender 调用 NextSendTime 时能够尽快发送报文。

3)MaybeUpdateMediaRateDueToLongQueue

报文入队列后,队列长度增加,检查是否需要调整发送码率以尽快发送队列中的报文。具体参考 MaybeUpdateMediaRateDueToLongQueue 源码分析。

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  // 为了尽快输出新的关键帧,清空该流中当前待处理的所有数据包。
    if (keyframe_flushing_ && // 如果配置了关键帧刷新
      packet->packet_type() == RtpPacketMediaType::kVideo && // 视频报文
      packet->is_key_frame() && // 属于关键帧报文
          packet->is_first_packet_of_frame() && // 关键帧的第一个报文
      !packet_queue_.HasKeyframePackets(packet->Ssrc())) { // 当前队列没有关键帧报文
        // 先清空媒体报文
    packet_queue_.RemovePacketsForSsrc(packet->Ssrc());
        // 再清空关联的 RTX 报文,如果有的话
    absl::optional<uint32_t> rtx_ssrc =
        packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());
    if (rtx_ssrc) {
      packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);
    }
  }

    // prober 可能在等待一个足够大的报文来启动带宽探测任务
  prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));

    // 获取当前时间
  const Timestamp now = CurrentTime();

    // 当前队列为空,来了一个新的报文,希望能够尽快处理
  if (packet_queue_.Empty()) {
    Timestamp target_process_time = now;
    Timestamp next_send_time = NextSendTime();
    if (next_send_time.IsFinite()) {
      target_process_time = std::min(now, next_send_time);
    }
    UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
  }

    // 报文入队列
  packet_queue_.Push(now, std::move(packet));
  seen_first_packet_ = true;

    // 队列长度增加了,检查是否需要改变平滑码率
  MaybeUpdateMediaRateDueToLongQueue(now);
}

3.2.2. MaybeUpdateMediaRateDueToLongQueue

正常情况,真实发送码率等于设置的平滑发送码率。但如果要考虑超长报文队列的影响,则需要在报文队列超长时适当调高发送码率,以排空报文队列,防止队列不断累积而导致报文被丢弃。

【注意】这里可能会导致真实发送码率大于设置的平滑发送码率。

void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
  // 更新为pacing rate
  adjusted_media_rate_ = pacing_rate_;

  // 如果不考虑排空超长队列,则直接使用设置的pacing rate
  if (!drain_large_queues_) {
    return;
  }

  DataSize queue_size_data = QueueSizeData();
  if (queue_size_data > DataSize::Zero()) {
    // 更新队列中报文排队时长
    packet_queue_.UpdateAverageQueueTime(now);
    
    // 计算当前报文已排队平均时长与配置的最大排队时长之间的差值
    TimeDelta avg_time_left =
        std::max(TimeDelta::Millis(1),
                 queue_time_limit_ - packet_queue_.AverageQueueTime());
    
    // 在剩余排队时长中将队列中所有报文都发送出去所需的最小码率(排队超时会导致报文丢弃)
    DataRate min_rate_needed = queue_size_data / avg_time_left;
    
    // 如果基于报文排队时长限制计算出来的码率大于带宽评估设置的码率,则更新调整后码率
    if (min_rate_needed > pacing_rate_) {
      adjusted_media_rate_ = min_rate_needed;
    }
  }
}

3.2.3. NextSendTime

计算 next_send_time 之前,先计算排空时间 drain_time,即使用当前 adjusted_media_rate_ 来排空 media_debt_ 需要多少时间,然后将 drain_time 与 send_burst_interval 进行比较。

如果 send_burst_interval > drain_time,如下图所示。表示按照当前的发送速率和发送时间间隔,在下次发送时间到来前,网络换缓冲区会被提前排干,这样会导致网络空闲一段时间,应该立即发送报文,因此将 next_send_time 设置为 last_process_time_,表示发送的报文数量不足,需要立即补发报文。

如果 send_burst_interval < drain_time,如下图所示。表示按照当前的发送速率和发送时间间隔,会导致网络缓冲区积压,这样会增大延时,继续积压可能会导致拥塞。所以应该等待足够多的时间让网络缓冲区排空后再继续发送报文。next_send_time 设置为 last_process_time_ + drain_time。

Timestamp PacingController::NextSendTime() const {
  const Timestamp now = CurrentTime();
  Timestamp next_send_time = Timestamp::PlusInfinity();

  // 已经暂停,但还需要发送keep-alive报文
  if (paused_) {
    return last_send_time_ + kPausedProcessInterval; // 500ms
  }

  // 带宽探测优先级更高,如果当前正在探测,由 BitrateProber 控制发送时间间隔
  if (prober_.is_probing() && !probing_send_failure_) {
    Timestamp probe_time = prober_.NextProbeTime(now);
    if (!probe_time.IsPlusInfinity()) {
      return probe_time.IsMinusInfinity() ? now : probe_time;
    }
  }

  // 音频报文和重传报文不做平滑,立即发送
  Timestamp unpaced_send_time = NextUnpacedSendTime();
  if (unpaced_send_time.IsFinite()) {
    return unpaced_send_time;
  }

  // 处于拥塞状态或者队列中未层插入任何报文,也要发送keep-alive报文
  if (congested_ || !seen_first_packet_) {
    return last_send_time_ + kCongestedPacketInterval; // 500ms
  }

  // 有待发送媒体报文,且媒体发送码率大于 0
  if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
    // 计算按照当前速率排空media_debt_需要多长时间
    TimeDelta drain_time = media_debt_ / adjusted_media_rate_;

    // send_burst_interval = 40ms
    // 确保突发的数据包总数不超过kMaxBurstSize,以免在高码率下填满 socket 缓冲区。
    // 换种说法,如果码率太高的话,发送时间间隔要相应调小一些。
    TimeDelta send_burst_interval =
        std::min(send_burst_interval_, kMaxBurstSize / adjusted_media_rate_);

    // send_burst_interval >  drain_time:说明网络缓冲区欠载,可以立即发送报文
    // send_burst_interval <= drain_time: 说明缓冲区过载或稳定,尽量保持网络缓冲区稳定
    next_send_time = last_process_time_ +
        ((send_burst_interval > drain_time) ? TimeDelta::Zero() : drain_time);
    
  // 无待发送媒体报文,且 padding 码率大于 0
  } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
    // 计算排空时间(取媒体数据和 Padding 数据排空的最大值)
    TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_,
                                    padding_debt_ / padding_rate_);

    if (drain_time.IsZero() && (!media_debt_.IsZero() || !padding_debt_.IsZero())) {
      // 有非零的 debt,但排空时间为 0,取最小的非零时间增量
      drain_time = TimeDelta::Micros(1); // 1us
    }
    next_send_time = last_process_time_ + drain_time;
  } else {
    // Nothing to do.
    next_send_time = last_process_time_ + kPausedProcessInterval;
  }

  if (send_padding_if_silent_) {
    next_send_time =
        std::min(next_send_time, last_send_time_ + kPausedProcessInterval);
  }

  return next_send_time;
}

3.2.4. ProcessPackets

此方法的主要逻辑是循环发送报文。对于带宽探测,发送指定大小的报文后停止;对于非带宽探测,根据 NextSendTime 退出。几个处理细节需要关注:

1)GetPendingPacket

带宽探测是先用媒体报文来探测,如果媒体报文发送完了,则会调用 PacketRouter::GeneratePadding 获取 padding 报文来填充探测码率。

2)UpdateTimeAndGetElapsed

代码中使用 target_send_time 来更新 media_dbet_而不是用 now 来更新 media_dbet_。在每一轮发送循环中,会调用 UpdateBudgetWithElapsedTime 来消耗 media_dbet_,从最终结果看,效果是一样的。 (具体原因待分析)

3)circuit_breaker_threshold_

这是一个保护阀,避免出现异常情况,进入死循环。

void PacingController::ProcessPackets() {
  ...

  TimeDelta early_execute_margin =
      prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();

  // 获取发送时间
  target_send_time = NextSendTime();
  if (now + early_execute_margin < target_send_time) {
    // 虽然未到发送时间,但还是要处理 dbet
    UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));
    return;
  }

  // 到了要发送的时间,使用 target_send_time 来更新 dbet,而不是 now
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
  if (elapsed_time > TimeDelta::Zero()) {
    UpdateBudgetWithElapsedTime(elapsed_time);
  }

  PacedPacketInfo pacing_info;
  DataSize recommended_probe_size = DataSize::Zero();
  bool is_probing = prober_.is_probing();

  // 如果正在探测中,则先发送探测报文
  if (is_probing) {
    pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
    if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
      // 一轮探测至少要发送数据量
      recommended_probe_size = prober_.RecommendedMinProbeSize();
    } else {
      // No valid probe cluster returned, probe might have timed out.
      is_probing = false;
    }
  }

  DataSize data_sent = DataSize::Zero();
  int iteration = 0;
  int packets_sent = 0;
  int padding_packets_generated = 0;

  // circuit_breaker_threshold_ 是一个保护阀
  for (; iteration < circuit_breaker_threshold_; ++iteration) {
    // 从队列中获取待发送媒体报文
    std::unique_ptr<RtpPacketToSend> rtp_packet =
        GetPendingPacket(pacing_info, target_send_time, now);
    if (rtp_packet == nullptr) {
      // 没有待发送媒体报文,则发送 Padding 报文
      ...
    } else { // 获取到待发送媒体报文
      const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
      DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
                                             rtp_packet->padding_size());

      if (include_overhead_) {
        packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
                       transport_overhead_per_packet_;
      }

      // pacing_info 会与被发送的报文一起被记录下来
      packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);

      // 如果有 FEC 报文,则将 FEC 报文入队列
      for (auto& packet : packet_sender_->FetchFec()) {
        EnqueuePacket(std::move(packet));
      }

      // 更新统计数据
      data_sent += packet_size;
      ++packets_sent;

      // 发送完成,更新发送时间和 media_debt_
      OnPacketSent(packet_type, packet_size, now);

      if (is_probing) {
        // 累加已发送数据
        pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
        // 发送数据以达到设定目标,退出循环
        if (data_sent >= recommended_probe_size) {
          break;
        }
      }

      // 获取下一次发送时间
      target_send_time = NextSendTime();

      // 还未到下一次发送时间
      if (target_send_time > now) {
        // 如果是媒体报文,直接退出
        if (!is_probing) {
          break;
        }
        // 探测报文继续发送
        target_send_time = now;
      }

      // 更新 debt
      UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));
    }
  }

  if (iteration >= circuit_breaker_threshold_) {
    last_send_time_ = now;
    last_process_time_ = now;
    return;
  }

  // 如果发送的是探测报文,则需要更新探测器的状态
  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime(), data_sent);
    }
  }

  MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
}

4. PrioritizedPacketQueue

PrioritizedPacketQueue 用来存放待发送报文,支持按照优先级存取,以保证高优先级报文能够更早发送出去。比如重传报文应该尽快发送而不是在队列尾部老老实实的排队,否则可能黄花菜都凉了。

4.1. 静态结构

4.1.1. 重要属性

1)streams_

streams_ 是一个 unordered_map,key 是 SSRC,value 是一个 StreamQueue。StreamQueue 是一个包含 5 个元素的 deque 数组,数组下标即为优先级,deque 中保存的对应优先级的报文。

2)streams_by_prio_

streams_by_prio_ 也是一个包含 5 个元素的 deque 数组,数组下标也表示优先级,但是 deque 中保存的是 StreamQueue 指针,表示 StreamQueue 存在对应优先级的报文。

3)top_active_prio_level_

top_active_prio_level_是一个 int 类型数据,指向当前存在数据的最高优先级(数值越小优先级越高)。

4.1.2. 重要方法

1)Push

向队列中插入一个报文,内部会根据报文的 ssrc 和预先定义的优先级插入合适的地方。

2)Pop

从队列中取出一个报文,内部会获取最高优先级的报文返回。

4.2. 优先级定义

报文优先级定义如下,数字越小优先级越高(参考函数GetPriorityForType)。

0 - Audio
1 - Audio RTX
2 - Video RTX
3 - Video and FEC
4 - Padding

4.3. 数据结构

下图是 PrioritizedPacketQueue 的一个典型示例,streams_ 保存了两个 SSRC 的报文,SSRC1 没有 P1 和 P3 两个优先级的报文,SSRC2 没有 P0 和 P4 两个优先级的报文。streams_by_prio_中,只有 P2 优先级保存了 A、B 两个StreamQueue 的指针,其他优先级都只有一个 StreamQueue 指针。当前最高优先级报文为 P0,所以 top_active_prio_level_ 指向 P0。

Push操作

SSRC1 没有 P1 优先级的报文,所以在 streams_by_prio_ 的优先级 P1 没有保存 A 的指针,假设现在向 SSRC1 插入一个 P1 优先级的报文,则需要在 streams_by_prio_ 的 P1 优先级加上 A 的指针,根据先来后到规则,A 排在 B 之后。此时 top_active_prio_level_ 仍然指向 P0。

Pop操作

SSRC1 的 P0 优先级存在一个报文,streams_by_prio_ 中的 P0 队列中保存了 A 的指针。假设现在要取一个报文去发送,根据 top_active_prio_level_ 指向的优先级,应该从 SSRC1 的 P0 队列 pop 一个报文。pop以后,A 的 P0 队列为空,此时,要从 streams_by_prio_ 的 P0 队列中将 A 移除,这样 streams_by_prio_ 的 P0 队列也空了,top_active_prio_level_ 指向当前最高优先级 P1。

5. BitrateProber

带宽探测不能无所顾忌的发包,需要复用平滑发送机制。BitrateProber 负责带宽探测的管理和控制,它接收创建的带宽探测任务,按照任务目标和要求执行探测动作,并控制探测的生命周期。

5.1. 静态结构

5.1.1. 重要属性

1)probing_state_

带宽探测状态:{ kDisabled, kInactive, kActive },kActive 表示正在探测中。

2)clusters_

用来保存待执行的探测任务,这些探测任务会从头开始一个一个顺序执行,默认最大探测任务数为 5。

3)next_probe_time_

下一次发送探测包时间,用来控制探测发包的频率。

4)config_

探测发包相关配置。

struct BitrateProberConfig {
  FieldTrialParameter<TimeDelta> min_probe_delta; // 默认 2ms
  FieldTrialParameter<TimeDelta> max_probe_delay; // 默认 10ms
  FieldTrialParameter<DataSize> min_packet_size;  // 默认 200Bytes
};

5.1.2. 重要方法

1)OnIncomingPacket

BitrateProber 会监听所有插入发送队列的报文,当发现有足够大的发送报文时,才会启动带宽探测任务。代码注释中说这样能够得到更加准确的探测结果,具体原因未知。

2)CreateProbeClusters

创建带宽探测任务,探测任务会被插入任务队列等待执行。

3)NextProbeTime

获取下一次发送探测包的时间。

4)RecommendedMinProbeSize

获取一轮发送探测包大小。

5)ProbeSent

通知一轮探测报文发送完毕,内部会更新下一次探测时间,并判断探测任务是否已经完成。

5.2. 数据结构

5.2.1. ProbeClusterConfig

创建探测任务传入的参数,指定了探测码率、探测时长和探测次数(一个 burst 算一次)

struct ProbeClusterConfig {
  // 探测任务创建时间
  Timestamp at_time = Timestamp::PlusInfinity();
  // 要求的探测速率
  DataRate target_data_rate = DataRate::Zero();
  // 要求的探测时长(默认为 15ms,由 ProbeController 设置)
  TimeDelta target_duration = TimeDelta::Zero();
  // 探测次数(默认为 5,由 ProbeController 设置)
  int32_t target_probe_count = 0;
  // 探测任务 ID,由 ProbeController 分配
  int32_t id = 0;
};

5.2.2. ProbeCluster

BitrateProber 将 ProbeClusterConfig 转化为内部的 ProbeCluster 数据结构。BitrateProber 不再使用 target_duration,将其转换为 probe_cluster_min_bytes。

struct PacedPacketInfo {
  static constexpr int kNotAProbe = -1;
  // 来自 ProbeClusterConfig::target_data_rate
  DataRate send_bitrate = DataRate::BitsPerSec(0);
  // 来自 ProbeClusterConfig::id
  int probe_cluster_id = kNotAProbe;
  // 来自 ProbeClusterConfig::target_probe_count
  int probe_cluster_min_probes = -1;
  // 来自 ProbeClusterConfig::target_data_rate * ProbeClusterConfig::target_duration
  int probe_cluster_min_bytes = -1;
  // 发送的总字节数
  int probe_cluster_bytes_sent = 0;
};

struct ProbeCluster {
  PacedPacketInfo pace_info;
  // 已发送了几轮探测报文
  int sent_probes = 0;
  // 已经发送的探测报文数据量
  int ProbeCluster = 0;
  // 来自 ProbeClusterConfig::at_time
  Timestamp requested_at = Timestamp::MinusInfinity();
  // 开始发送探测报文时间
  Timestamp started_at = Timestamp::MinusInfinity();
  // 这个字段目前没用到
  int retries = 0;
};

PacedPacketInfo::probe_cluster_bytes_sent 是 ProbeCluster::ProbeCluster 的一个拷贝。

absl::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {
    ...

  PacedPacketInfo info = clusters_.front().pace_info;

  // 将 ProbeCluster::sent_bytes 拷贝到 PacedPacketInfo::probe_cluster_bytes_sent
  info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
  return info;
}

5.3. 生命周期

5.3.1. 探测开始

调用 CreateProbeClusters 创建探测任务,探测任务并不会立即执行,BitrateProber 会等待一个足够大的报文来启动探测任务。

void BitrateProber::OnIncomingPacket(DataSize packet_size) {
  if (ReadyToSetActiveState(packet_size)) {
    next_probe_time_ = Timestamp::MinusInfinity();
    probing_state_ = ProbingState::kActive;
  }
}

bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
  if (clusters_.empty()) {
    return false;
  }
  switch (probing_state_) {
    case ProbingState::kDisabled: // 已经关闭
    case ProbingState::kActive:   // 已经处于active状态
      return false;
    case ProbingState::kInactive:
      // min_packet_size = 200B
      return packet_size >=
        std::min(RecommendedMinProbeSize(), config_.min_packet_size.Get());
  }
}

5.3.2. 报文发送

1)什么时间发送

与媒体报文的发送控制逻辑不一样,探测报文的发送控制不需要考虑网络管道的拥塞、发送队列的积压等情况,严格按照探测任务要求的速率发送数据。

Timestamp BitrateProber::CalculateNextProbeTime(const ProbeCluster& cluster) const 
{
    ...

  DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
  DataRate send_bitrate = cluster.pace_info.send_bitrate;

  // 这里计算距离开始发送报文的相对时间,尽力保证目标探测速率
  TimeDelta delta = sent_bytes / send_bitrate;
  return cluster.started_at + delta;
}

每次发送完数据后,相对探测任务发送起点,使用探测码率重新计算 NextProbeTime,尽量保证发送码率符合探测任务要求。

2)发送多少数据

发送探测报文前,会获取这一轮探测需要发送数据量大小:recommended_probe_size。

if (is_probing) {
  pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
  // probe_cluster_id 是否有效
  if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
    recommended_probe_size = prober_.RecommendedMinProbeSize();
  } else {
    is_probing = false;
  }
}

recommended_probe_size = 设置的探测目标速率 x 配置的每轮探测最小时间间隔,带宽探测使用的发送时间间隔与媒体报文发送时间间隔 send_burst_interval_ 不一样,默认 2ms 的发送间隔感觉挺恐怖。

DataSize BitrateProber::RecommendedMinProbeSize() const {
  if (clusters_.empty()) {
    return DataSize::Zero();
  }
  // send_bitrate 是探测目标码率
  DataRate send_rate = clusters_.front().pace_info.send_bitrate;
  // 使用配置值:min_probe_delta = 2ms
  return send_rate * config_.min_probe_delta;
}

PacingController 保证会在一个 burst 发送指定数量的数据。

if (is_probing) {
  pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
  // 如果是带宽探测,至少发送这么多数据才能退出发送循环。
  if (data_sent >= recommended_probe_size) {
    break;
  }
}

5.3.3. 探测结束

PacingController 每轮发送完以后,会调用 BitrateProber::ProbeSent,更新总共发送了多少数据和发送了多少轮。当达到任务设置的发送数据大小和发送次数要求,探测结束,cluster 被从队列中移除。

void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
  ...

  ProbeCluster* cluster = &clusters_.front();

  // 第一次发送
  if (cluster->sent_probes == 0) {
    cluster->started_at = now;
  }

  // 累加发送的字节数和探测的次数
  cluster->sent_bytes += size.bytes<int>();
  cluster->sent_probes += 1;

  // 根据要求的探测速率计算下一次探测时间
  next_probe_time_ = CalculateNextProbeTime(*cluster);

  // Cluster已经完成,发送数据和发送次数都需要达成目标
  if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
      cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
    clusters_.pop();
  }

  // 没有探测任务了,更新探测状态
  if (clusters_.empty()) {
    probing_state_ = ProbingState::kInactive;
  }

  ...
}

6. 总结

本文详细分析了 WebRTC 平滑发送模块的整体框架和实现原理,并对重要的数据结构和逻辑进行了深入剖析。平滑发送模块设计的非常灵活,采用动态发包周期和漏桶控制机制,能够满足媒体报文发送、带宽探测、高优先级报文优先发送等多种发送要求。不过,相比于固定发包周期,这种设计实现起来更加复杂性,大家在借鉴的时候,要充分评估、小心谨慎。


本文转载自: https://blog.csdn.net/zhh157/article/details/140627672
版权归原作者 old-six-programmer 所有, 如有侵权,请联系我们删除。

“深入浅出WebRTC—Pacer”的评论:

还没有评论