当前位置: 首页 > news >正文

SRS流媒体服务器(8)源码分析之rtc/rtmp互相转码详解

1)测试环境配置要点

RTC服务器配置

rtc_server {enabled on;listen 8000;  # UDP端口candidate $CANDIDATE;  # 服务器IP
}

VHOST RTC配置

vhost __defaultVhost__ {rtc {enabled on;rtmp_to_rtc on;  # 启用RTMP到RTC转换rtc_to_rtmp off;}
}

2) 流媒体转换基础知识

一问:rtmp和webrtc协议怎么转换?

需解析rtmp协议,得到aac/h264裸流。对aac进行转opus,对h264过滤b帧封装成rtp。因为在实时通信webrtc一般不用b帧,会带来延迟。

img

二问:webrtc播放基本的逻辑是怎么样的

1.推rtmp流 /live/livestream

2.使用web播放对应的码流。 /live/livestream

三问:SRS流媒体服务器RTMPtoRTC转换技术实现

  1. RTMP基于TCP协议,使用长连接传输,而WebRTC基于UDP协议,采用P2P或服务器中介的连接方式。协议差异处理:
  • 传输协议: RTMP基于TCP,WebRTC基于UDP
  • 媒体格式: 音频从AAC转换为OPUS
  • 封装格式: 从RTMP标签转换为RTP包
  • 音频转码优化
// 音频转码关键逻辑
SrsAudioTransCoder* transcoder = new SrsAudioTransCoder();
transcoder->transcode(aac_data, opus_data);
  1. 视频处理策略
  • B帧过滤:WebRTC不支持B帧
  • NALU合并:可配置是否合并多个NALU
  • 分片处理:大于MTU的NALU使用FUA分片
  1. 性能优化
  • 协程架构:基于ST (State Threads)协程库
  • 内存管理:避免频繁内存分配
  • 缓存机制:缓存关键媒体头信息

3) SRS流媒体服务器转换架构设计

​ SRS的RTMPtoRTC转换通过精心设计的桥接器架构实现,核心是SrsRtcFromRtmpBridger类。这个桥接器实现了ISrsLiveSourceBridger接口,负责将RTMP的音视频数据包转封装成RTC需要的数据包格式。

核心组件与类分析

核心组件类关系:

  1. 连接层 (Connection Layer)
  • SrsRtmpConn: 处理RTMP客户端连接和推流请求
  • SrsRtcConnection: 管理WebRTC客户端连接
  1. 桥接层 (Bridge Layer)
  • SrsRtcFromRtmpBridger: 核心转换桥接器
    • 实现ISrsLiveSourceBridger接口
    • 负责RTMP到RTC的协议转换
    • 包含音频转码和RTP封装逻辑
  1. 源管理层 (Source Management)
  • SrsLiveSource: 管理RTMP直播源
  • SrsRtcSource: 管理RTC流源
  1. 格式处理层 (Format Processing)
  • SrsRtmpFormat: 解析RTMP音视频标签
  • SrsAudioTransCoder: AAC到OPUS音频转码

关键类的职责分工:

SrsRtmpConn类的acquire_publish方法中,系统首先判断是否开启了RTC功能。如果条件满足,会根据RTMP推流的请求信息创建SrsRtcSource对象,然后创建SrsRtcFromRtmpBridger桥接对象进行协议转换。

SRS流媒体服务器RTMPtoRTC转换架构图在这里插入图片描述

关键转换流程

初始化阶段:

  1. RTMP客户端发起推流请求
  2. SrsRtmpConn::acquire_publish()处理请求
  3. 检查RTC服务是否启用
  4. 创建SrsRtcSource实例
  5. 创建SrsRtcFromRtmpBridger桥接器
  6. 将桥接器注册到SrsLiveSource

媒体处理阶段:

  1. RTMP流数据通过SrsLiveSource::on_audio/on_video传递

  2. 桥接器通过SrsRtmpFormat解析RTMP标签

  3. 音频处理:

    • 过滤非AAC编码数据
    • 为AAC数据添加ADTS头
    • 通过SrsAudioTransCoder转码为OPUS
  4. 视频处理:

    • 缓存H.264序列头
    • 过滤B帧(WebRTC不支持)
    • 为IDR帧打包SPS/PPS数据
    • 根据配置合并或分离NALU

RTP封装阶段细节:

  1. 将处理后的媒体数据封装为RTP包
  2. 单个NALU对应一个RTP包,或使用FUA分片
  3. 将RTP包投递到SrsRtcSource

4) SRS服务器WebRTC连接流程分析

SRS WebRTC连接的两个阶段

webrtc初始化阶段是媒体协商和地址协商。地址协商(ICE过程 )首先地址收集:本地 内网穿透 NAT探测(stun包进行)turn服务器地址,连接检查,提名。媒体协商,收集本地支持媒体编码类型,然后就是通过sdp交换。完成媒体协商和地址协商。随后媒体数据处理阶段!

srs服务器基于http来进行初始化阶段,使用udp来做媒体数据处理阶段。通过http 请求处理推拉流资源初始化,并通过ice-ufrag索引保存资源,返回给网页请求者。后续请求者携带ice-ufrag标识找到udp通道实现媒体处理阶段

初始化阶段(信令阶段)

用户描述中关于HTTP用于初始化阶段的说法是正确的。SRS确实使用HTTP API来处理WebRTC的信令交换和资源初始化:

  1. HTTP请求处理:客户端(浏览器)通过HTTP POST请求向SRS服务器发送包含SDP offer的信息
  2. 资源分配:SRS服务器接收到请求后,会为该连接分配资源并生成对应的SDP answer
  3. ice-ufrag标识:SRS确实会使用ice-ufrag作为标识符来索引和保存WebRTC会话资源
    1. 唯一标识:每个WebRTC连接都有唯一的ice-ufrag标识,媒体阶段的UDP包中包含ice-ufrag,使服务器能够将数据包路由到正确的会话。
  4. ICE-Lite模式:SRS采用ICE-Lite模式,只需响应客户端的连接检查请求
    1. 候选地址:SRS通常只提供一个host类型的候选地址(服务器的IP和端口)
    2. 连接建立:客户端发送STUN绑定请求,SRS验证ice-ufrag和ice-pwd后建立连接
  5. RTC UDP传输:媒体数据通过UDP通道传输(默认端口8000)

webrtc媒体处理阶段

SRS实现RTC标准流程逻辑:

  • 先进行ICE(STUN)连接建立
    • 首次ICE地址协商收到STUN包,后续udp连接会执行DTLS-SRTP过程,生成会话密钥。启动推拉流协程!
  • 再进行DTLS握手,建立安全SRTP通道
    • DTLS-SRTP是WebRTC中常用的安全方案,先通过DTLS握手交换密钥,再用SRTP加密音视频流。
  • 根据SSRC找到对应流对象,运行协程模式异步处理媒体
    • SrsRtcPublishStream/SrsRtcPlayStream,基于协程的单线程模型处理媒体流,协程负责异步接收和处理RTP包,保证高效和低延迟。
  • 最后调用on_audio()和on_video()处理媒体数据
    • 音频处理
      • 通过SrsRtmpFormat解析AAC音频数据。
      • 过滤非AAC编码的音频包。
      • 对AAC音频数据添加ADTS头。
      • 使用SrsAudioTranscoder将AAC转码为WebRTC支持的Opus格式。
    • 视频处理
      • 解析H.264视频数据,识别NALU类型。
      • 过滤B帧(WebRTC不支持B帧)。
      • 对关键帧(IDR帧)进行SPS/PPS的打包。
      • 支持NALU合并(STAP-A)或分片(FU-A)处理。
  • RTP封装
    • 将处理后的音视频样本封装成RTP包。
    • 支持单NALU RTP包、STAP-A打包多个NALU、FU-A分片大NALU。
    • RTP包中包含必要的序列号、时间戳等信息,符合WebRTC标准。
  • RTP包发送与分发
    • RTP包通过SrsRtcSourceon_rtp()接口发送到WebRTC客户端。
    • SRS作为SFU(Selective Forwarding Unit),负责将RTP包转发给对应的WebRTC观看端。

sdp协商 → dtls_srtp过程 → 协程开启 → 接收RTMP音视频数据 → 解析并过滤 → 音频转码AAC到Opus → 视频NALU处理和过滤 → RTP封装 → (rtc拉流)通过WebRTC发送RTP包

5) SRS服务器WebRTC阶段源码分析

SRS RTMPtoRTC 初始化阶段(信令阶段)

srs服务启动会调用SrsRtcServer::listen_api 被注册到 /rtc/v1/play/ 路径。当客户端向 /rtc/v1/play/ 发送 HTTP POST 请求时,HTTP 服务器会调用已注册的处理器的 SrsGoApiRtcPlay::do_serve_http。该方法处理 WebRTC 播放请求,解析 SDP offer,创建 WebRTC 会话,并返回 SDP answer!

/*** 注册RTC服务的HTTP API处理器* * 该函数负责为RTC服务器注册各种HTTP API处理器,包括:* - 播放接口(/rtc/v1/play/)* - 发布接口(/rtc/v1/publish/)* - WHIP协议接口(/rtc/v1/whip/)* - WHIP播放接口(/rtc/v1/whip-play/)* - 模拟器模式下的NACK接口(/rtc/v1/nack/)* * @return 返回srs_error_t错误码,srs_success表示成功,否则为失败* @remark 注意HTTP API多路复用器目前是从SRS实例获取,未来需要改为从混合管理器获取*/
srs_error_t SrsRtcServer::listen_api()
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 从SRS混合服务器实例中获取HTTP API多路复用器// TODO: FIXME: Fetch api from hybrid manager, not from SRS.SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();// 注册RTC播放处理器,处理/rtc/v1/play/路径的请求if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {// 注册失败时包装错误信息并返回return srs_error_wrap(err, "handle play");}// 注册RTC发布处理器,处理/rtc/v1/publish/路径的请求if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {// 注册失败时包装错误信息并返回return srs_error_wrap(err, "handle publish");}// 注册RTC WHIP处理器,处理/rtc/v1/whip/路径的请求// 虽然WHIP主要是发布协议,但也可用于播放// Generally, WHIP is a publishing protocol, but it can be also used as playing.if ((err = http_api_mux->handle("/rtc/v1/whip/", new SrsGoApiRtcWhip(this))) != srs_success) {// 注册失败时包装错误信息并返回return srs_error_wrap(err, "handle whip");}// 创建另一个WHIP挂载点,支持使用与发布相同的查询字符串进行播放// We create another mount, to support play with the same query string as publish.if ((err = http_api_mux->handle("/rtc/v1/whip-play/", new SrsGoApiRtcWhip(this))) != srs_success) {// 注册失败时包装错误信息并返回return srs_error_wrap(err, "handle whip play");}#ifdef SRS_SIMULATOR// 在模拟器模式下,注册RTC NACK处理器if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) {// 注册失败时包装错误信息并返回return srs_error_wrap(err, "handle nack");}
#endif// 返回处理结果return err;
}/*** 处理HTTP路由模式的注册** 注册指定的URL模式(pattern)和对应的HTTP处理器(handler),支持以下特性:* 1. 检查空模式或空处理器* 2. 防止重复注册显式匹配的模式* 3. 自动处理虚拟主机路径(vhost)* 4. 为以斜杠结尾的模式自动创建隐式重定向** @param pattern 要注册的URL模式,如"/api/v1/"* @param handler 对应的HTTP请求处理器* @return 成功返回srs_success,失败返回错误码* @remark 如果模式以斜杠结尾,会自动为非斜杠版本创建302重定向* @see SrsHttpMuxEntry 路由条目数据结构* @see ISrsHttpHandler HTTP处理器接口*/
srs_error_t SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler *handler)
{// 确保处理器不为空srs_assert(handler);// 检查URL模式是否为空,如果为空则返回错误if (pattern.empty()){return srs_error_new(ERROR_HTTP_PATTERN_EMPTY, "empty pattern");}// 检查是否已存在显式匹配的相同模式if (entries.find(pattern) != entries.end()){SrsHttpMuxEntry *exists = entries[pattern];if (exists->explicit_match){return srs_error_new(ERROR_HTTP_PATTERN_DUPLICATED, "pattern=%s exists", pattern.c_str());}}// 处理虚拟主机路径,提取虚拟主机名并注册到vhosts映射std::string vhost = pattern;if (pattern.at(0) != '/'){if (pattern.find("/") != string::npos){vhost = pattern.substr(0, pattern.find("/"));}vhosts[vhost] = handler;}// 创建新的多路复用器入口并注册到entries映射if (true){SrsHttpMuxEntry *entry = new SrsHttpMuxEntry();entry->explicit_match = true;entry->handler = handler;entry->pattern = pattern;entry->handler->entry = entry;// 如果已存在相同模式的入口,先释放它if (entries.find(pattern) != entries.end()){SrsHttpMuxEntry *exists = entries[pattern];srs_freep(exists);}entries[pattern] = entry; // entries[/live/livestream.flv] = entry}// 帮助性行为:// 如果模式以"/"结尾,为去掉末尾"/"的路径添加隐式永久重定向// 这可以被显式注册覆盖// Helpful behavior:// If pattern is /tree/, insert an implicit permanent redirect for /tree.// It can be overridden by an explicit registration.if (pattern != "/" && !pattern.empty() && pattern.at(pattern.length() - 1) == '/'){// 生成不带末尾斜杠的模式std::string rpattern = pattern.substr(0, pattern.length() - 1);SrsHttpMuxEntry *entry = NULL;// 释放已存在的隐式入口// free the exists implicit entryif (entries.find(rpattern) != entries.end()){entry = entries[rpattern];}// 如果没有显式匹配的入口,创建隐式重定向// create implicit redirect.if (!entry || !entry->explicit_match){srs_freep(entry);// 创建新的入口,重定向到带斜杠的路径entry = new SrsHttpMuxEntry();entry->explicit_match = false;entry->handler = new SrsHttpRedirectHandler(pattern, SRS_CONSTS_HTTP_Found);entry->pattern = pattern;entry->handler->entry = entry;// 注册到不带末尾斜杠的路径entries[rpattern] = entry;}}// 返回成功return srs_success;
}

创建session,其中session就包含了SrsRtcConnection,底层又创建了SrsRtcPlayStream。即创建会话(连接和音视频轨道)并返回sdp

/*** 处理RTC播放的HTTP请求** @param w HTTP响应写入器* @param r HTTP请求消息* @return 返回错误码,成功返回srs_success** *  Request:POST /rtc/v1/play/{"sdp":"offer...", "streamurl":"webrtc://r.ossrs.net/live/livestream","api":'http...", "clientip":"..."}
Response:{"sdp":"answer...", "sid":"..."}
// @see https://github.com/rtcdn/rtcdn-draft* @remark 若处理失败会返回400错误码并记录警告日志*/
srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r)
{srs_error_t err = srs_success;SrsJsonObject* res = SrsJsonAny::object();SrsAutoFree(SrsJsonObject, res);if ((err = do_serve_http(w, r, res)) != srs_success) { //收集参数srs_warn("RTC error %s", srs_error_desc(err).c_str()); srs_freep(err);return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);}return srs_api_response(w, r, res->dumps()); // 返回json
}/*** 处理RTC播放请求的HTTP服务函数** @param w HTTP响应写入器* @param r HTTP请求消息* @param res 用于构建响应的JSON对象* @return 错误码,成功返回srs_success*/
srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, SrsJsonObject *res)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 对于每个RTC会话,我们使用短期HTTP连接// For each RTC session, we use short-term HTTP connection.SrsHttpHeader* hdr = w->header();hdr->set("Connection", "Close");// 解析请求体中的JSON对象// Parse req, the request json object, from body.SrsJsonObject* req = NULL;SrsAutoFree(SrsJsonObject, req);if (true) {// 读取整个请求体string req_json;if ((err = r->body_read_all(req_json)) != srs_success) {return srs_error_wrap(err, "read body");}// 将请求体解析为JSON对象SrsJsonAny* json = SrsJsonAny::loads(req_json);if (!json || !json->is_object()) {return srs_error_new(ERROR_RTC_API_BODY, "invalid body %s", req_json.c_str());}req = json->to_object();}// 从请求对象中获取参数// Fetch params from req object.SrsJsonAny* prop = NULL;if ((prop = req->ensure_property_string("sdp")) == NULL) {return srs_error_wrap(err, "not sdp");}string remote_sdp_str = prop->to_str();// 获取流URLif ((prop = req->ensure_property_string("streamurl")) == NULL) {return srs_error_wrap(err, "not streamurl");}string streamurl = prop->to_str();// 获取客户端IP地址string clientip;if ((prop = req->ensure_property_string("clientip")) != NULL) {clientip = prop->to_str();}if (clientip.empty()) {// 如果未提供客户端IP,则使用连接的远程IPclientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();// 使用代理传递的原始IP覆盖// Overwrite by ip from proxy.        string oip = srs_get_original_ip(r);if (!oip.empty()) {clientip = oip;}}// 获取API地址string api;if ((prop = req->ensure_property_string("api")) != NULL) {api = prop->to_str();}// 获取事务ID todo 客户端唯一标识符string tid;if ((prop = req->ensure_property_string("tid")) != NULL) {tid = prop->to_str();}// 创建RTC用户配置对象// The RTC user config object.SrsRtcUserConfig ruc;ruc.req_->ip = clientip;ruc.api_ = api;// 解析RTMP URL,提取tcUrl和streamsrs_parse_rtmp_url(streamurl, ruc.req_->tcUrl, ruc.req_->stream);// 发现tc_url的各个组成部分srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,ruc.req_->app, ruc.req_->stream, ruc.req_->port, ruc.req_->param);// 发现虚拟主机,从配置中解析虚拟主机// discovery vhost, resolve the vhost from configSrsConfDirective* parsed_vhost = _srs_config->get_vhost(ruc.req_->vhost);if (parsed_vhost) {ruc.req_->vhost = parsed_vhost->arg0();}// 获取客户端指定的服务器候选地址(EIP) /eip=192.168.1.100&candidate=192.168.1.100 否则使用本地condidation设置// For client to specifies the candidate(EIP) of server.string eip = r->query_get("eip");if (eip.empty()) {eip = r->query_get("candidate");}// 获取编解码器设置string codec = r->query_get("codec");// 获取客户端指定的SRTP加密设置// For client to specifies whether encrypt by SRTP.string srtp = r->query_get("encrypt");// 获取DTLS设置string dtls = r->query_get("dtls");// 打印RTC播放请求的详细信息srs_trace("RTC play %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s, srtp=%s, dtls=%s",streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(),ruc.req_->stream.c_str(), remote_sdp_str.length(),eip.c_str(), codec.c_str(), srtp.c_str(), dtls.c_str());// 设置用户配置的参数ruc.eip_ = eip;ruc.codec_ = codec;ruc.publish_ = false;ruc.dtls_ = (dtls != "false");// 设置SRTP加密选项if (srtp.empty()) {ruc.srtp_ = _srs_config->get_rtc_server_encrypt();} else {ruc.srtp_ = (srtp != "false");}// 解析远程SDP字符串// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.ruc.remote_sdp_str_ = remote_sdp_str;if ((err = ruc.remote_sdp_.parse(remote_sdp_str)) != srs_success) {return srs_error_wrap(err, "parse sdp failed: %s", remote_sdp_str.c_str());}// 检查远程SDP是否合法if ((err = check_remote_sdp(ruc.remote_sdp_)) != srs_success) {return srs_error_wrap(err, "remote sdp check failed");}// 触发HTTP钩子通知播放事件if ((err = http_hooks_on_play(ruc.req_)) != srs_success) {return srs_error_wrap(err, "RTC: http_hooks_on_play");}// 处理HTTP请求,创建会话并生成本地SDPif ((err = serve_http(w, r, &ruc)) != srs_success) {return srs_error_wrap(err, "serve");}// 设置响应的JSON字段res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));// 将本地SDP和会话ID添加到响应中  answer返回 json由外部发送// TODO: add candidates in response json?res->set("sdp", SrsJsonAny::str(ruc.local_sdp_str_.c_str()));res->set("sessionid", SrsJsonAny::str(ruc.session_id_.c_str()));return err;
}

创建RTC会话,生成并编码本地SDP,作为answer。

/*** 处理RTC播放请求的HTTP服务函数** @param w HTTP响应写入器* @param r HTTP请求消息* @param ruc RTC用户配置* @return 错误码,成功返回srs_success** 功能:* 1. 检查RTC服务是否启用* 2. 检查RTC流是否活跃* 3. 创建RTC会话* 4. 生成并编码本地SDP* 5. 记录会话相关信息** 错误处理:* - 当RTC服务未启用时返回ERROR_RTC_DISABLED* - 创建会话或编码SDP失败时返回对应错误*/
srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, SrsRtcUserConfig *ruc)
{// 初始化错误对象为成功状态srs_error_t err = srs_success;// 创建本地SDP对象,用于生成应答SrsSdp local_sdp;// 配置SDP和会话的参数// 从配置文件中获取DTLS角色(客户端/服务端)local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(ruc->req_->vhost);// 从配置文件中获取DTLS版本local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(ruc->req_->vhost);// 检查RTC功能是否启用// 获取全局RTC服务器是否启用bool server_enabled = _srs_config->get_rtc_server_enabled();// 获取特定虚拟主机的RTC功能是否启用bool rtc_enabled = _srs_config->get_rtc_enabled(ruc->req_->vhost);// 如果全局启用但虚拟主机禁用,记录警告日志if (server_enabled && !rtc_enabled) {srs_warn("RTC disabled in vhost %s", ruc->req_->vhost.c_str());}// 如果全局或虚拟主机任一禁用,返回错误if (!server_enabled || !rtc_enabled) {return srs_error_new(ERROR_RTC_DISABLED, "Disabled server=%d, rtc=%d, vhost=%s",server_enabled, rtc_enabled, ruc->req_->vhost.c_str());}// 检查RTC流是否活跃(是否已有发布者)bool is_rtc_stream_active = false;// 使用代码块限制变量作用域if (true) {// 获取RTC源对象SrsRtcSource* source = _srs_rtc_sources->fetch(ruc->req_);// 如果源存在且不能发布(已被占用),则流是活跃的is_rtc_stream_active = (source && !source->can_publish());}// 处理RTMP到RTC的转换场景// 如果RTC流不活跃且配置禁止了RTMP转RTC,则检查RTMP源// 参考GitHub issue: https://github.com/ossrs/srs/issues/2728if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) {// 获取RTMP源对象SrsLiveSource* rtmp = _srs_sources->fetch(ruc->req_);// 如果RTMP源存在且活跃,返回错误(禁止RTMP转RTC)if (rtmp && !rtmp->inactive()) {return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str());}}// TODO: FIXME: 当服务器启用但虚拟主机禁用时,应报告错误// 创建RTC会话SrsRtcConnection* session = NULL;// 调用RTC服务器创建会话,传入用户配置和本地SDPif ((err = server_->create_session(ruc, local_sdp, &session)) != srs_success) {return srs_error_wrap(err, "create session, dtls=%u, srtp=%u, eip=%s", ruc->dtls_, ruc->srtp_, ruc->eip_.c_str());}// 创建字符串输出流,用于编码SDPostringstream os;// 将本地SDP编码为字符串if ((err = local_sdp.encode(os)) != srs_success) {return srs_error_wrap(err, "encode sdp");}// 获取编码后的SDP字符串string local_sdp_str = os.str();// 将SDP中的\r\n替换为\\r\\n,以便在JSON中使用string local_sdp_escaped = srs_string_replace(local_sdp_str.c_str(), "\r\n", "\\r\\n");// 保存本地SDP字符串到用户配置ruc->local_sdp_str_ = local_sdp_str;// 保存会话ID(用户名)到用户配置ruc->session_id_ = session->username();// 记录RTC会话信息的跟踪日志srs_trace("RTC username=%s, dtls=%u, srtp=%u, offer=%dB, answer=%dB", session->username().c_str(),ruc->dtls_, ruc->srtp_, ruc->remote_sdp_str_.length(), local_sdp_escaped.length());// 记录远程SDP(客户端提供的offer)srs_trace("RTC remote offer: %s", srs_string_replace(ruc->remote_sdp_str_.c_str(), "\r\n", "\\r\\n").c_str());// 记录本地SDP(服务器生成的answer)srs_trace("RTC local answer: %s", local_sdp_escaped.c_str());// 返回处理结果return err;
}

初始化SrsRtcServer的会话资源,创建SrsRtcConnection,SrsRtcSource。

/*** 创建RTC会话并初始化相关资源** @param ruc RTC用户配置,包含请求信息和发布/播放标志* @param local_sdp 用于存储生成的本地SDP信息* @param psession 输出参数,返回创建的RTC连接会话对象* @return 返回错误码,srs_success表示成功** @remark 如果是发布流,会检查源是否已被占用* @remark 创建失败时会自动释放会话对象* @warning 调用者需要确保psession指向有效的指针地址*/
srs_error_t SrsRtcServer::create_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 获取当前上下文ID,用于跟踪整个会话的生命周期SrsContextId cid = _srs_context->get_id();// 获取请求对象的引用SrsRequest* req = ruc->req_;// 获取或创建对应的RTC源SrsRtcSource* source = NULL;if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {return srs_error_wrap(err, "create source");}// 如果是发布流,检查源是否已被占用if (ruc->publish_ && !source->can_publish()) {return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str());}// 创建RTC连接对象// TODO: FIXME: add do_create_session to error process.SrsRtcConnection* session = new SrsRtcConnection(this, cid);if ((err = do_create_session(ruc, local_sdp, session)) != srs_success) {// 创建会话失败时,释放会话对象并返回错误srs_freep(session);return srs_error_wrap(err, "create session");}// 通过输出参数返回创建的会话对象*psession = session;return err;
}

创建srsRtcPlayStream/SrsRtcPublishStream,设置媒体协商和ICE协商相关参数,并编码成sdp格式。

/*** 创建RTC会话并初始化SDP协商** 根据用户配置创建RTC会话,处理SDP协商过程,包括:* 1. 根据发布/播放类型添加对应的处理器* 2. 设置音视频轨道状态* 3. 生成ICE凭证和会话标识* 4. 设置SDP的ICE和DTLS参数* 5. 添加候选地址* 6. 初始化会话并添加到管理器** @param ruc 用户配置,包含请求、远程SDP等信息* @param local_sdp 本地SDP对象,用于设置协商参数* @param session RTC连接会话对象* @return 错误码,srs_success表示成功*/
srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 获取请求对象SrsRequest* req = ruc->req_;// 根据会话类型(发布或播放)添加相应的处理器// 首先为SDP协商添加发布者或播放器,获取媒体信息if (ruc->publish_) {// 如果是发布流,添加发布者if ((err = session->add_publisher(ruc, local_sdp)) != srs_success) {return srs_error_wrap(err, "add publisher");}} else {// 如果是播放流,添加播放器 new srsRtcPlayStreamif ((err = session->add_player(ruc, local_sdp)) != srs_success) {return srs_error_wrap(err, "add player");}}// 默认所有音视频轨道都是非活跃的,需要启用它们session->set_all_tracks_status(req->get_stream_url(), ruc->publish_, true);// 生成本地ICE密码,用于STUN协商std::string local_pwd = srs_random_str(32);std::string local_ufrag = "";// 生成用户名,实际上是ICE协商的标识符// TODO: FIXME: 变量名可优化,这里并非真正的用户名std::string username = "";while (true) {// 生成随机的本地ufraglocal_ufrag = srs_random_str(8);// 组合本地和远程ufrag作为会话的唯一标识username = local_ufrag + ":" + ruc->remote_sdp_.get_ice_ufrag();// 确保生成的标识符在系统中唯一if (!_srs_rtc_manager->find_by_name(username)) {break;}}// 设置本地SDP的ICE参数local_sdp.set_ice_ufrag(local_ufrag);local_sdp.set_ice_pwd(local_pwd);// 设置指纹算法和指纹local_sdp.set_fingerprint_algo("sha-256");local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());// 允许模拟服务器的外部IP,添加候选地址if (true) {// 获取RTC服务器监听端口int listen_port = _srs_config->get_rtc_server_listen();// 发现可能的候选地址set<string> candidates = discover_candidates(ruc);for (set<string>::iterator it = candidates.begin(); it != candidates.end(); ++it) {// 解析每个候选地址和端口string hostname; int port = listen_port;srs_parse_hostport(*it, hostname, port);// 添加到本地SDP中local_sdp.add_candidate(hostname, port, "host");}// 打印使用的候选地址vector<string> v = vector<string>(candidates.begin(), candidates.end());srs_trace("RTC: Use candidates %s", srs_join_vector_string(v, ", ").c_str());}// 根据配置设置协商的DTLS参数local_sdp.session_negotiate_ = local_sdp.session_config_;// 根据远程SDP设置协商的DTLS角色if (ruc->remote_sdp_.get_dtls_role() == "active") {// 如果远程是active,那么本地设为passivelocal_sdp.session_negotiate_.dtls_role = "passive";} else if (ruc->remote_sdp_.get_dtls_role() == "passive") {// 如果远程是passive,那么本地设为activelocal_sdp.session_negotiate_.dtls_role = "active";} else if (ruc->remote_sdp_.get_dtls_role() == "actpass") {// 如果远程是actpass,使用本地配置的角色local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role;} else {// 默认情况下,在应答中使用passive角色// 参考RFC4145#section-4.1local_sdp.session_negotiate_.dtls_role = "passive";}// 在本地SDP中设置协商后的DTLS角色local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role);// 设置会话的远程SDPsession->set_remote_sdp(ruc->remote_sdp_);// 设置会话的本地SDP(必须在初始化会话对象前完成)session->set_local_sdp(local_sdp);// 设置会话状态为等待STUNsession->set_state(WAITING_STUN);// 在设置本地SDP之后初始化会话if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) {return srs_error_wrap(err, "init");}// 使用用户名将会话添加到管理器中 username "x42rorx0:GcpM"_srs_rtc_manager->add_with_name(username, session);// 返回处理结果return err;
}

设置媒体协商相关参数,并创建推拉流实例。

/*** 为RTC连接添加播放器** @param ruc RTC用户配置,包含请求信息和远程SDP* @param local_sdp 用于存储生成的本地SDP* @return 成功返回srs_success,失败返回错误码** @remark 该函数会执行以下操作:* 1. 检查并执行劫持器的before_play回调* 2. 协商播放能力并建立播放订阅关系* 3. 生成本地SDP描述* 4. 创建播放器实例** @note 当前仅支持单个音频轨道,多个视频轨道*/
srs_error_t SrsRtcConnection::add_player(SrsRtcUserConfig *ruc, SrsSdp &local_sdp)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 获取请求对象,包含流信息等SrsRequest* req = ruc->req_;// 如果存在RTC劫持器,先执行before_play回调,允许外部自定义处理if (_srs_rtc_hijacker) {if ((err = _srs_rtc_hijacker->on_before_play(this, req)) != srs_success) {// 劫持器回调失败则直接返回错误return srs_error_wrap(err, "before play");}}// 用于存储协商后的播放轨道关系,key为ssrc,value为轨道描述std::map<uint32_t, SrsRtcTrackDescription*> play_sub_relations;// 协商播放能力,获取可用的音视频轨道if ((err = negotiate_play_capability(ruc, play_sub_relations)) != srs_success) {// 协商失败则返回错误return srs_error_wrap(err, "play negotiate");}// 如果没有协商出任何可播放的轨道,返回错误if (!play_sub_relations.size()) {return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no play relations");}// 创建流描述对象,用于后续生成本地SDPSrsRtcSourceDescription* stream_desc = new SrsRtcSourceDescription();// 自动释放stream_desc,防止内存泄漏SrsAutoFree(SrsRtcSourceDescription, stream_desc);// 遍历所有协商出来的轨道,填充到流描述中std::map<uint32_t, SrsRtcTrackDescription*>::iterator it = play_sub_relations.begin();while (it != play_sub_relations.end()) {SrsRtcTrackDescription* track_desc = it->second;// 只支持一个音频轨道,找到第一个音频轨道后赋值// TODO: FIXME: 目前只支持一个音频轨道if (track_desc->type_ == "audio" && !stream_desc->audio_track_desc_) {stream_desc->audio_track_desc_ = track_desc->copy();}// 支持多个视频轨道,全部加入到描述中if (track_desc->type_ == "video") {stream_desc->video_track_descs_.push_back(track_desc->copy());}++it;}// 根据流描述和协商结果生成本地SDP,供客户端应答if ((err = generate_play_local_sdp(req, local_sdp, stream_desc, ruc->remote_sdp_.is_unified(), ruc->audio_before_video_)) != srs_success) {// 生成SDP失败则返回错误return srs_error_wrap(err, "generate local sdp");}// 创建播放器实例,并建立订阅关系if ((err = create_player(req, play_sub_relations)) != srs_success) {// 创建播放器失败则返回错误return srs_error_wrap(err, "create player");}// 返回处理结果,成功则为srs_successreturn err;
}

创建播放器实例,并建立订阅关系

/*** 创建RTC播放器并初始化** @param req 播放请求对象,包含流URL等信息* @param sub_relations 轨道描述映射表,key为轨道ID,value为轨道描述对象* @return 错误码,成功返回srs_success** @remark 如果相同流URL的播放器已存在,则直接返回成功* @remark 会检查SSRC是否重复,包括主SSRC、FEC SSRC和RTX SSRC* @remark 如果DTLS已建立(ESTABLISHED状态),会立即启动播放器* @remark 支持TWCC(Transport Wide Congestion Control)扩展* @warning 不支持reload功能(代码中有TODO标记)*/
srs_error_t SrsRtcConnection::create_player(SrsRequest *req, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 如果已存在相同流URL的播放器,则直接返回成功,避免重复创建if(players_.end() != players_.find(req->get_stream_url())) {return err;}// 创建新的RTC播放流对象,传入当前连接和上下文IDSrsRtcPlayStream* player = new SrsRtcPlayStream(this, _srs_context->get_id());// 调用初始化函数,传入请求和轨道映射关系if ((err = player->initialize(req, sub_relations)) != srs_success) {// 初始化失败则释放player对象并返回错误srs_freep(player);return srs_error_wrap(err, "SrsRtcPlayStream init");}// 插入到players_映射表中,key为流URLplayers_.insert(make_pair(req->get_stream_url(), player));// 为快速搜索创建SSRC与播放器之间的映射for(map<uint32_t, SrsRtcTrackDescription*>::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) {SrsRtcTrackDescription* track_desc = it->second;map<uint32_t, SrsRtcPlayStream*>::iterator it_player = players_ssrc_map_.find(track_desc->ssrc_);if((players_ssrc_map_.end() != it_player) && (player != it_player->second)) {return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate ssrc %d, track id: %s",track_desc->ssrc_, track_desc->id_.c_str());}players_ssrc_map_[track_desc->ssrc_] = player;if(0 != track_desc->fec_ssrc_) {if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->fec_ssrc_)) {return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate fec ssrc %d, track id: %s",track_desc->fec_ssrc_, track_desc->id_.c_str());}players_ssrc_map_[track_desc->fec_ssrc_] = player;}if(0 != track_desc->rtx_ssrc_) {if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->rtx_ssrc_)) {return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate rtx ssrc %d, track id: %s",track_desc->rtx_ssrc_, track_desc->id_.c_str());}players_ssrc_map_[track_desc->rtx_ssrc_] = player;}}// TODO: FIXME: Support reload.// The TWCC ID is the ext-map ID in local SDP, and we set to enable GCC.// Whatever the ext-map, we will disable GCC when config disable it.int twcc_id = 0;if (true) {std::map<uint32_t, SrsRtcTrackDescription*>::iterator it = sub_relations.begin();while (it != sub_relations.end()) {if (it->second->type_ == "video") {SrsRtcTrackDescription* track = it->second;twcc_id = track->get_rtp_extension_id(kTWCCExt);}++it;}}srs_trace("RTC connection player gcc=%d", twcc_id);// 如果DTLS已建立,则立即启动播放器,保证后续流程正常if(ESTABLISHED == state_) {if(srs_success != (err = player->start())) {return srs_error_wrap(err, "start player");}}// 返回处理结果,成功则为srs_successreturn err;
}

轨道创建

/*** 初始化RTC播放流** @param req 请求对象,会被复制保存* @param sub_relations 轨道描述映射表,key为SSRC,value为轨道描述* @return 错误码,成功返回srs_success** 功能说明:* 1. 复制请求对象并保存* 2. 获取或创建RTC源* 3. 根据轨道类型创建音频/视频发送轨道* 4. 配置NACK参数并应用到所有轨道* 5. 返回初始化结果*/
srs_error_t SrsRtcPlayStream::initialize(SrsRequest *req, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations)
{srs_error_t err = srs_success;req_ = req->copy();if ((err = _srs_rtc_sources->fetch_or_create(req_, &source_)) != srs_success) {return srs_error_wrap(err, "rtc fetch source failed");}for (map<uint32_t, SrsRtcTrackDescription*>::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) {uint32_t ssrc = it->first;SrsRtcTrackDescription* desc = it->second;if (desc->type_ == "audio") {SrsRtcAudioSendTrack* track = new SrsRtcAudioSendTrack(session_, desc);audio_tracks_.insert(make_pair(ssrc, track));}if (desc->type_ == "video") {SrsRtcVideoSendTrack* track = new SrsRtcVideoSendTrack(session_, desc);video_tracks_.insert(make_pair(ssrc, track));}}// TODO: FIXME: Support reload.nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost);nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req->vhost);srs_trace("RTC player nack=%d, nnc=%d", nack_enabled_, nack_no_copy_);// Setup tracks.for (map<uint32_t, SrsRtcAudioSendTrack*>::iterator it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) {SrsRtcAudioSendTrack* track = it->second;track->set_nack_no_copy(nack_no_copy_);}for (map<uint32_t, SrsRtcVideoSendTrack*>::iterator it = video_tracks_.begin(); it != video_tracks_.end(); ++it) {SrsRtcVideoSendTrack* track = it->second;track->set_nack_no_copy(nack_no_copy_);}return err;
}

调用栈

音频轨道

(gdb) bt
#0  SrsRtcSendTrack::SrsRtcSendTrack (this=0x5555561f2bd0, session=0x55555611b520, track_desc=0x5555561f2000, is_audio=true)at src/app/srs_app_rtc_source.cpp:2584
#1  0x0000555555833757 in SrsRtcAudioSendTrack::SrsRtcAudioSendTrack (this=0x5555561f2bd0, session=0x55555611b520, track_desc=0x5555561f2000)at src/app/srs_app_rtc_source.cpp:2721
#2  0x00005555557ddc1b in SrsRtcPlayStream::initialize (this=0x555556145d80, req=0x55555615ab60, sub_relations=...) at src/app/srs_app_rtc_conn.cpp:482
#3  0x00005555557f1a1f in SrsRtcConnection::create_player (this=0x55555611b520, req=0x55555615ab60, sub_relations=...) at src/app/srs_app_rtc_conn.cpp:3603
#4  0x00005555557e74ec in SrsRtcConnection::add_player (this=0x55555611b520, ruc=0x5555560e5760, local_sdp=...) at src/app/srs_app_rtc_conn.cpp:2065
#5  0x0000555555822587 in SrsRtcServer::do_create_session (this=0x555555ed7080, ruc=0x5555560e5760, local_sdp=..., session=0x55555611b520)at src/app/srs_app_rtc_server.cpp:569
#6  0x0000555555822395 in SrsRtcServer::create_session (this=0x555555ed7080, ruc=0x5555560e5760, local_sdp=..., psession=0x5555560e4f60)at src/app/srs_app_rtc_server.cpp:547
#7  0x000055555583cc1e in SrsGoApiRtcPlay::serve_http (this=0x555555fa5b20, w=0x5555560e5e80, r=0x55555613e300, ruc=0x5555560e5760)at src/app/srs_app_rtc_api.cpp:227
#8  0x000055555583bd6e in SrsGoApiRtcPlay::do_serve_http (this=0x555555fa5b20, w=0x5555560e5e80, r=0x55555613e300, res=0x55555611bdb0)at src/app/srs_app_rtc_api.cpp:175
#9  0x000055555583aa58 in SrsGoApiRtcPlay::serve_http (this=0x555555fa5b20, w=0x5555560e5e80, r=0x55555613e300) at src/app/srs_app_rtc_api.cpp:49
#10 0x00005555556a123f in SrsHttpServeMux::serve_http (this=0x555555ed4130, w=0x5555560e5e80, r=0x55555613e300) at src/protocol/srs_http_stack.cpp:727
--Type <RET> for more, q to quit, c to continue without paging--
#11 0x00005555556a2088 in SrsHttpCorsMux::serve_http (this=0x555556143bc0, w=0x5555560e5e80, r=0x55555613e300) at src/protocol/srs_http_stack.cpp:875
#12 0x0000555555779ccb in SrsHttpConn::process_request (this=0x555556141b60, w=0x5555560e5e80, r=0x55555613e300, rid=1) at src/app/srs_app_http_conn.cpp:233
#13 0x00005555557798f6 in SrsHttpConn::process_requests (this=0x555556141b60, preq=0x5555560e5f58) at src/app/srs_app_http_conn.cpp:206
#14 0x0000555555779479 in SrsHttpConn::do_cycle (this=0x555556141b60) at src/app/srs_app_http_conn.cpp:160
#15 0x0000555555778e48 in SrsHttpConn::cycle (this=0x555556141b60) at src/app/srs_app_http_conn.cpp:105
#16 0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x5555560cd430) at src/app/srs_app_st.cpp:272

视频轨道

(gdb) bt
#0  SrsRtcSendTrack::SrsRtcSendTrack (this=0x5555561d9b00, session=0x55555611b4e0, track_desc=0x55555615b730, is_audio=false)at src/app/srs_app_rtc_source.cpp:2584
#1  0x00005555558339e3 in SrsRtcVideoSendTrack::SrsRtcVideoSendTrack (this=0x5555561d9b00, session=0x55555611b4e0, track_desc=0x55555615b730)at src/app/srs_app_rtc_source.cpp:2771
#2  0x00005555557ddcb4 in SrsRtcPlayStream::initialize (this=0x555556040fc0, req=0x555556143c00, sub_relations=...) at src/app/srs_app_rtc_conn.cpp:487
#3  0x00005555557f1a1f in SrsRtcConnection::create_player (this=0x55555611b4e0, req=0x555556143c00, sub_relations=...) at src/app/srs_app_rtc_conn.cpp:3603
#4  0x00005555557e74ec in SrsRtcConnection::add_player (this=0x55555611b4e0, ruc=0x5555561d0aa0, local_sdp=...) at src/app/srs_app_rtc_conn.cpp:2065
#5  0x0000555555822587 in SrsRtcServer::do_create_session (this=0x555555ed7080, ruc=0x5555561d0aa0, local_sdp=..., session=0x55555611b4e0)at src/app/srs_app_rtc_server.cpp:569
#6  0x0000555555822395 in SrsRtcServer::create_session (this=0x555555ed7080, ruc=0x5555561d0aa0, local_sdp=..., psession=0x5555561d02a0)at src/app/srs_app_rtc_server.cpp:547
#7  0x000055555583cc1e in SrsGoApiRtcPlay::serve_http (this=0x555555fa5b20, w=0x5555561d11c0, r=0x55555613e300, ruc=0x5555561d0aa0)at src/app/srs_app_rtc_api.cpp:227
#8  0x000055555583bd6e in SrsGoApiRtcPlay::do_serve_http (this=0x555555fa5b20, w=0x5555561d11c0, r=0x55555613e300, res=0x55555603d730)at src/app/srs_app_rtc_api.cpp:175
#9  0x000055555583aa58 in SrsGoApiRtcPlay::serve_http (this=0x555555fa5b20, w=0x5555561d11c0, r=0x55555613e300) at src/app/srs_app_rtc_api.cpp:49
#10 0x00005555556a123f in SrsHttpServeMux::serve_http (this=0x555555ed4130, w=0x5555561d11c0, r=0x55555613e300) at src/protocol/srs_http_stack.cpp:727
--Type <RET> for more, q to quit, c to continue without paging--
#11 0x00005555556a2088 in SrsHttpCorsMux::serve_http (this=0x5555561f2970, w=0x5555561d11c0, r=0x55555613e300) at src/protocol/srs_http_stack.cpp:875
#12 0x0000555555779ccb in SrsHttpConn::process_request (this=0x55555610aab0, w=0x5555561d11c0, r=0x55555613e300, rid=1) at src/app/srs_app_http_conn.cpp:233
#13 0x00005555557798f6 in SrsHttpConn::process_requests (this=0x55555610aab0, preq=0x5555561d1298) at src/app/srs_app_http_conn.cpp:206
#14 0x0000555555779479 in SrsHttpConn::do_cycle (this=0x55555610aab0) at src/app/srs_app_http_conn.cpp:160
#15 0x0000555555778e48 in SrsHttpConn::cycle (this=0x55555610aab0) at src/app/srs_app_http_conn.cpp:105
#16 0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x55555613e820) at src/app/srs_app_st.cpp:272
#17 0x0000555555721bc6 in SrsFastCoroutine::pfn (arg=0x55555613e820) at src/app/srs_app_st.cpp:287

SRS RTMPtoRTC(媒体处理阶段)

核心入口:SrsRtcServer::on_udp_packet 处理UDP数据包,支持多种协议类型(RTP/RTCP/STUN/DTLS)处理。DTLS-SRTP流程:

  • WebRTC通过DTLS握手交换SRTP主密钥和盐值(Master Key/Salt)
  • 生成SRTP会话密钥:key = HMAC-SHA1(master_key, label || session_id)
  • 媒体流使用SRTP加密,控制信道(如SCTP)直接由DTLS加密。
/*** 处理UDP数据包,支持多种协议类型(RTP/RTCP/STUN/DTLS)** @param skt UDP套接字对象,包含接收到的数据包* @return 错误码,srs_success表示成功** 功能说明:* 1. 根据数据包类型(RTP/RTCP/STUN/DTLS)进行分发处理* 2. 维护会话活跃状态* 3. 支持劫持器优先处理RTCP包* 4. 处理ICE协议(STUN)的连接建立和NAT穿越* 5. 统计各类协议数据包数量** 注意事项:* - 必须保证会话存在才能处理RTP/RTCP/DTLS数据包* - STUN包可能在会话建立前到达(ICE trickle情况)* - 性能优化:RTP处理不自动切换上下文*/
srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket *skt)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 默认会话为NULL,根据数据包内容找到对应的会话SrsRtcConnection* session = NULL;// 获取数据包内容和大小char* data = skt->data(); int size = skt->size();// 判断数据包类型:是否为RTP或RTCP数据包bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)data, size);// 如果是RTP或RTCP,进一步判断是否为RTCP数据包bool is_rtcp = srs_is_rtcp((uint8_t*)data, size);// 获取套接字的快速IDuint64_t fast_id = skt->fast_id();// 先尝试用快速ID查找会话,如果没找到再用对端ID查找if (fast_id) {session = (SrsRtcConnection*)_srs_rtc_manager->find_by_fast_id(fast_id);}if (!session) {string peer_id = skt->peer_id();session = (SrsRtcConnection*)_srs_rtc_manager->find_by_id(peer_id);}// 如果找到会话,更新会话活跃时间if (session) {// 任何数据包都会刷新会话活跃状态session->alive();}// 通知劫持器处理UDP数据包,优先处理RTCP包if (hijacker && is_rtp_or_rtcp && is_rtcp) {bool consumed = false;if (session) {session->switch_to_context();}if ((err = hijacker->on_udp_packet(skt, session, &consumed)) != srs_success) {return srs_error_wrap(err, "hijack consumed=%u", consumed);}// 如果劫持器已处理数据包,直接返回if (consumed) {return err;}}// 处理STUN数据包(ICE协议),处理连接建立和NAT穿越if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) {// 更新STUN数据包计数++_srs_pps_rstuns->sugar;string peer_id = skt->peer_id();// 解码STUN数据包SrsStunPacket ping;if ((err = ping.decode(data, size)) != srs_success) {return srs_error_wrap(err, "decode stun packet failed");}// 如果会话为空,尝试通过用户名查找会话if (!session) {session = find_session_by_username(ping.get_username());}if (session) {session->switch_to_context();}// 记录STUN日志,包括连接信息和ICE协商状态srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d",peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());// 处理ICE trickle(增量候选者)情况// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.if (!session) {return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64,ping.get_username().c_str(), peer_id.c_str(), fast_id);}// 调用会话处理STUN数据包return session->on_stun(skt, &ping);}// 处理DTLS、RTCP或RTP数据包,这些数据包不支持对端地址变更if (!session) {string peer_id = skt->peer_id();return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id);}// 处理RTP数据包(媒体数据)// 注意:除非出错,否则不切换到会话上下文,这是为了提高性能if (is_rtp_or_rtcp && !is_rtcp) {// 更新RTP数据包计数++_srs_pps_rrtps->sugar;// 调用会话处理RTP数据包err = session->on_rtp(data, size);if (err != srs_success) {session->switch_to_context();}return err;}// 切换到会话上下文session->switch_to_context();// 处理RTCP数据包(控制协议)if (is_rtp_or_rtcp && is_rtcp) {// 更新RTCP数据包计数++_srs_pps_rrtcps->sugar;// 调用会话处理RTCP数据包return session->on_rtcp(data, size);}// 处理DTLS数据包(安全传输)if (srs_is_dtls((uint8_t*)data, size)) {// 更新DTLS数据包计数++_srs_pps_rstuns->sugar;// 调用会话处理DTLS数据包,这将触发后续的握手流程return session->on_dtls(data, size);}
}

dtls握手

srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data)
{// 将DTLS数据包转发给传输层处理// transport_可能是SrsSecurityTransport或其他传输实现return transport_->on_dtls(data, nb_data); //SrsSecurityTransport::on_dtls
}
srs_error_t SrsSecurityTransport::on_dtls(char* data, int nb_data)
{// 将DTLS数据包转发给DTLS实现(dtls_)进行处理// dtls_可能是SrsDtlsImpl的某个实现类,如SrsDtlsServerImplreturn dtls_->on_dtls(data, nb_data);
}
srs_error_t SrsDtls::on_dtls(char* data, int nb_data)
{return impl->on_dtls(data, nb_data);
}
srs_error_t SrsDtlsImpl::on_dtls(char* data, int nb_data)
{srs_error_t err = srs_success;if ((err = do_on_dtls(data, nb_data)) != srs_success) {return srs_error_wrap(err, "on_dtls size=%u, data=[%s]", nb_data,srs_string_dumps_hex(data, nb_data, 32).c_str());}return err;
}/*** 处理DTLS协议数据包** 该函数负责处理接收到的DTLS数据包,包括握手和应用数据。* 1. 重置BIO输入输出缓冲区* 2. 将接收到的数据写入BIO* 3. 执行DTLS握手流程(即使握手已完成也可能需要重传)* 4. 从BIO中读取并处理应用数据** @param data 接收到的DTLS数据包指针* @param nb_data 数据包长度* @return 返回处理结果,成功返回srs_success,失败返回错误对象** @remark 函数内部会处理握手重传和应用数据,最多循环1024次读取BIO中的数据* @see do_handshake() 用于执行DTLS握手流程* @see callback_->write_dtls_data() 用于发送DTLS数据* @see callback_->on_dtls_application_data() 用于处理应用数据*/
srs_error_t SrsDtlsImpl::do_on_dtls(char *data, int nb_data)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 如果握手已完成,打印日志,可能是重传或应用数据if (handshake_done_for_us) {srs_info("DTLS: After done, got %d bytes", nb_data);}// 重置BIO(OpenSSL的I/O抽象)int r0 = 0;// TODO: FIXME: Why reset it before writing?if ((r0 = BIO_reset(bio_in)) != 1) {return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset r0=%d", r0);}if ((r0 = BIO_reset(bio_out)) != 1) {return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset r0=%d", r0);}// 记录DTLS数据包详情state_trace((uint8_t*)data, nb_data, true, r0, SSL_ERROR_NONE, false);// 将收到的数据写入BIOif ((r0 = BIO_write(bio_in, data, nb_data)) <= 0) {// TODO: 0或-1可能是阻塞,应使用BIO_should_retry检查return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write r0=%d", r0);}// 执行握手操作,即使握手已完成也要执行// 因为最后一个DTLS包可能丢失,客户端需要我们重传if ((err = do_handshake()) != srs_success) {return srs_error_wrap(err, "do handshake");}// 如果BIO中还有数据,继续读取,让SSL消费这些数据// 限制最大循环次数,避免死循环for (int i = 0; i < 1024 && BIO_ctrl_pending(bio_in) > 0; i++) {char buf[8092];int r0 = SSL_read(dtls, buf, sizeof(buf));int r1 = SSL_get_error(dtls, r0);// 如果读取失败if (r0 <= 0) {// SSL_ERROR_ZERO_RETURN表示TLS/SSL连接已关闭// 如果是安全关闭(收到关闭警告),就不再读取if (r1 != SSL_ERROR_WANT_READ && r1 != SSL_ERROR_WANT_WRITE) {break;}// 内存中有数据,但SSL_read无法读取,通常是握手数据uint8_t* data = NULL;int size = BIO_get_mem_data(bio_out, (char**)&data);// 记录SSL原始数据state_trace((uint8_t*)data, size, false, r0, r1, false);// 如果有数据需要发送出去if (size > 0 && (err = callback_->write_dtls_data(data, size)) != srs_success) {return srs_error_wrap(err, "dtls send size=%u, data=[%s]", size,srs_string_dumps_hex((char*)data, size, 32).c_str());}continue;}// 记录读取到的DTLS应用数据srs_trace("DTLS: read r0=%d, r1=%d, padding=%d, done=%d, data=[%s]",r0, r1, BIO_ctrl_pending(bio_in), handshake_done_for_us, srs_string_dumps_hex(buf, r0, 32).c_str());// 处理DTLS应用数据if ((err = callback_->on_dtls_application_data(buf, r0)) != srs_success) {return srs_error_wrap(err, "on DTLS data, done=%d, r1=%d, size=%u, data=[%s]", handshake_done_for_us,r1, r0, srs_string_dumps_hex(buf, r0, 32).c_str());}}// 返回处理结果return err;
}

执行DTLS握手过程核心,握手完成后启动所有的发布者和播放者协程运行,接收数据处理。

/*** 执行DTLS握手过程。** 该函数负责处理DTLS握手协议,包括:* 1. 检查是否已完成握手,避免重复处理* 2. 执行SSL握手操作并处理可能出现的错误* 3. 获取握手过程中需要发送的数据* 4. 处理握手完成后的回调通知** @return 返回错误码,srs_success表示成功,否则为错误对象* @remark 握手可能会多次完成,函数内部会处理这种情况* @see on_final_out_data 处理最终输出数据的回调* @see callback_->write_dtls_data 发送DTLS数据的回调* @see on_handshake_done 握手完成后的通知回调*/
srs_error_t SrsDtlsImpl::do_handshake()
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 如果握手已完成,忽略握手包// 如果需要重传握手包,应使用SSL_read处理if (handshake_done_for_us) {return err;}// 执行握手并获取结果int r0 = SSL_do_handshake(dtls);int r1 = SSL_get_error(dtls, r0);// 致命SSL错误,例如对端是DTLS 1.0而我们是DTLS 1.2时没有可用的加密套件if (r0 < 0 && (r1 != SSL_ERROR_NONE && r1 != SSL_ERROR_WANT_READ && r1 != SSL_ERROR_WANT_WRITE)) {return srs_error_new(ERROR_RTC_DTLS, "handshake r0=%d, r1=%d", r0, r1);}// 握手成功,注意可能会完成多次if (r1 == SSL_ERROR_NONE) {handshake_done_for_us = true;}// 获取要发送给对端的数据uint8_t* data = NULL;int size = BIO_get_mem_data(bio_out, (char**)&data);// 记录SSL原始数据state_trace((uint8_t*)data, size, false, r0, r1, false);// 在发送前回调处理最终输出数据if ((err = on_final_out_data(data, size)) != srs_success) {return srs_error_wrap(err, "handle");}// 如果有数据需要发送if (size > 0 && (err = callback_->write_dtls_data(data, size)) != srs_success) {return srs_error_wrap(err, "dtls send size=%u, data=[%s]", size,srs_string_dumps_hex((char*)data, size, 32).c_str());}// 如果握手完成,通知上层if (handshake_done_for_us) {if (((err = on_handshake_done()) != srs_success)) {return srs_error_wrap(err, "done");}}// 返回处理结果return err;
}srs_error_t SrsDtlsServerImpl::on_handshake_done()
{srs_error_t err = srs_success;// Notify connection the DTLS is done.if (((err = callback_->on_dtls_handshake_done()) != srs_success)) {return srs_error_wrap(err, "dtls done");}return err;
}
/*** 安全传输层处理DTLS握手完成事件* * @return 错误码,成功返回srs_success* * 当DTLS握手完成时,初始化SRTP并通知会话握手已完成* 是RTP媒体安全传输(SRTP)的启动点*/
srs_error_t SrsSecurityTransport::on_dtls_handshake_done()
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 如果已经处理过握手完成事件,避免重复处理if (handshake_done) {return err;}// 标记握手已完成handshake_done = true;// TODO: FIXME: 添加DTLS耗时统计// 记录DTLS握手完成日志srs_trace("RTC: DTLS handshake done.");// 初始化SRTP(安全实时传输协议)if ((err = srtp_initialize()) != srs_success) {return srs_error_wrap(err, "srtp init");}// 通知会话连接已建立return session_->on_connection_established();
}
/*** WebRTC连接建立后的处理函数** 该函数在DTLS握手成功后被调用,用于启动所有的发布者和播放者* * @return 错误码,成功返回srs_success** @remark 即使重复收到DTLS完成包(如ARQ重传),也会忽略* @remark 会通知所有的发布者和播放者开始流传输*/
srs_error_t SrsRtcConnection::on_connection_established()
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 如果连接正在销毁中,则忽略此事件if (disposing_) {return err;}// 如果DTLS已完成的数据包收到多次(如ARQ重传),则忽略if(ESTABLISHED == state_) {return err;}// 将连接状态设置为已建立state_ = ESTABLISHED;// 打印日志,显示发布者和播放者数量,以及会话超时时间srs_trace("RTC: session pub=%u, sub=%u, to=%dms connection established", publishers_.size(), players_.size(),srsu2msi(session_timeout));// 启动所有发布者for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {string url = it->first;SrsRtcPublishStream* publisher = it->second;// 打印发布URL已建立的日志srs_trace("RTC: Publisher url=%s established", url.c_str());// 启动发布流,如果失败则返回错误if ((err = publisher->start()) != srs_success) {return srs_error_wrap(err, "start publish");}}// 启动所有播放者for(map<string, SrsRtcPlayStream*>::iterator it = players_.begin(); it != players_.end(); ++it) {string url = it->first;SrsRtcPlayStream* player = it->second;// 打印订阅URL已建立的日志srs_trace("RTC: Subscriber url=%s established", url.c_str());// 启动播放流,如果失败则返回错误if ((err = player->start()) != srs_success) {return srs_error_wrap(err, "start play");}}// 如果存在劫持器,通知DTLS完成事件if (hijacker_) {if ((err = hijacker_->on_dtls_done()) != srs_success) {return srs_error_wrap(err, "hijack on dtls done");}}// 返回处理结果return err;
}

创建并初始化RTC消费者 → 接收RTMP音视频数据 → 解析并过滤 → 音频转码AAC到Opus → 视频NALU处理和过滤 → RTP封装 → 通过WebRTC发送RTP包

/*** RTC播放流的周期处理函数** 主要功能:* 1. 创建并初始化RTC消费者* 2. 从源中dump数据给消费者* 3. 配置实时和消息等待参数* 4. 启动消费循环,不断拉取RTP数据包并发送** @return 返回错误码,srs_success表示成功* @remark 包含TODO项需要后续完善:* - SPS/PPS帧处理* - 性能耗时统计* - 退出事件检查** 注意:* - 使用自动内存管理(SrsAutoFree)确保资源释放* - 支持RTC劫持器扩展点(_srs_rtc_hijacker)* - 使用错误精简打印(SrsErrorPithyPrint)控制错误日志频率*/
srs_error_t SrsRtcPlayStream::cycle()
{srs_error_t err = srs_success;SrsRtcSource* source = source_;SrsRtcConsumer* consumer = NULL;SrsAutoFree(SrsRtcConsumer, consumer);if ((err = source->create_consumer(consumer)) != srs_success) {return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());}srs_assert(consumer);consumer->set_handler(this);// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.if ((err = source->consumer_dumps(consumer)) != srs_success) {return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());}realtime = _srs_config->get_realtime_enabled(req_->vhost, true);mw_msgs = _srs_config->get_mw_msgs(req_->vhost, realtime, true);// TODO: FIXME: Add cost in ms.SrsContextId cid = source->source_id();srs_trace("RTC: start play url=%s, source_id=%s/%s, realtime=%d, mw_msgs=%d", req_->get_stream_url().c_str(),cid.c_str(), source->pre_source_id().c_str(), realtime, mw_msgs);SrsErrorPithyPrint* epp = new SrsErrorPithyPrint();SrsAutoFree(SrsErrorPithyPrint, epp);if (_srs_rtc_hijacker) {if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, req_, consumer)) != srs_success) {return srs_error_wrap(err, "on start consuming");}}while (true) {if ((err = trd_->pull()) != srs_success) {return srs_error_wrap(err, "rtc sender thread");}// Wait for amount of packets.SrsRtpPacket* pkt = NULL;consumer->dump_packet(&pkt);if (!pkt) {// TODO: FIXME: We should check the quit event.consumer->wait(mw_msgs);continue;}// Send-out the RTP packet and do cleanup// @remark Note that the pkt might be set to NULL.if ((err = send_packet(pkt)) != srs_success) {uint32_t nn = 0;if (epp->can_print(err, &nn)) {srs_warn("play send packets=%u, nn=%u/%u, err: %s", 1, epp->nn_count, nn, srs_error_desc(err).c_str());}srs_freep(err);}// Free the packet.// @remark Note that the pkt might be set to NULL.srs_freep(pkt);}
}

音频转码opus

当RTMP推流启用RTC桥接时,转码在SrsRtcRtpBuilder::transcode方法中进行。

vhost __defaultVhost__ {rtc {enabled on;rtmp_to_rtc on;	 # 核心转换 SrsRtcFromRtmpBridger::transcodertc_to_rtmp off; # 核心转换 SrsRtmpFromRtcBridger::transcode_audio}
}

这个转码过程将AAC音频转换为Opus格式,并立即通过bridge_->on_rtp(pkt.get())发送到RTC源。

SRS 的 RTMP→WebRTC 转码(AAC→Opus)发生在推流阶段,因为 SRS 将转码器(SrsRtcFromRtmpBridger)注册在 RTMP publish路径上,接收每一路 RTMP 包 时就立即解包、转码并打包为 RTP,保证最低的端到端延迟与持续的 RTP 输出。这种设计将转码融入到 RTMP 消息处理路径中,一旦推流就持续将数据转为符合 WebRTC 的 RTP 流,无需等到有客户端订阅再临时转码

若仅用 RTMP 拉流(不启用 WebRTC),应关闭 rtmp_to_rtc 配置,以免浪费转码资源。

rtmp_to_rtc off;

rtmp_to_rtc数据流向是:

  1. RTMP推流SrsLiveSource (RTMP源)
  2. SrsLiveSourceSrsRtcFromRtmpBridger (RTMP到RTC桥接器)
  3. SrsRtcFromRtmpBridgerSrsRtcSource (RTC源)
  4. SrsRtcSourceRTC consumers (WebRTC拉流客户端)

源码分析:

根据配置文件,是否创建SrsRtcFromRtmpBridger对象
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)if (rtc) {SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc);if ((err = bridger->initialize(req)) != srs_success) {srs_freep(bridger);return srs_error_wrap(err, "bridger init");}source->set_bridger(bridger);}
#endif
srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg){// 如果存在桥接器,则将音频消息传递给桥接器处理// bridger_是SrsLiveSource的 与下文不一样!if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) {// 如果桥接处理失败,包装错误信息并返回return srs_error_wrap(err, "bridger consume audio");}
}srs_error_t SrsRtcFromRtmpBridger::on_audio(SrsSharedPtrMessage* msg)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 检查是否启用了RTMP到RTC的转换功能if (!rtmp_to_rtc) {// 如果未启用转换,直接返回成功return err;}// 使用格式解析器处理音频消息,解析音频编码信息// TODO: FIXME: Support parsing OPUS for RTC.if ((err = format->on_audio(msg)) != srs_success) {// 如果解析失败,返回包装后的错误信息return srs_error_wrap(err, "format consume audio");}// 检查是否成功解析出音频编码器信息// 如果没有acodec,说明编码器未解析或为未知编码器// @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474if (!format->acodec) {// 没有音频编码器信息,直接返回成功return err;}// 获取音频编码器ID,目前只支持AAC和MP3编码// ts support audio codec: aac/mp3SrsAudioCodecId acodec = format->acodec->id;// 检查是否为支持的音频编码格式if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) {// 不支持的编码格式,直接返回成功return err;}// 目前只处理AAC编码的音频数据// When drop aac audio packet, never transcode.if (acodec != SrsAudioCodecIdAAC) {// 非AAC编码,直接返回成功return err;}// 确保音频数据存在,忽略序列头// ignore sequence headersrs_assert(format->audio);// 声明ADTS音频数据指针和大小变量char* adts_audio = NULL;int nn_adts_audio = 0;// 为AAC原始数据添加ADTS头部信息// TODO: FIXME: Reserve 7 bytes header when create shared message.if ((err = aac_raw_append_adts_header(msg, format, &adts_audio, &nn_adts_audio)) != srs_success) {// 添加ADTS头部失败,返回错误信息return srs_error_wrap(err, "aac append header");}// 检查ADTS音频数据是否成功创建if (!adts_audio) {// 没有ADTS音频数据,直接返回成功return err;}// 创建音频帧对象,用于后续转码处理SrsAudioFrame aac;// 设置音频帧的解码时间戳aac.dts = format->audio->dts;// 设置音频帧的组合时间戳aac.cts = format->audio->cts;// 将ADTS音频数据添加到音频帧中if ((err = aac.add_sample(adts_audio, nn_adts_audio)) == srs_success) {// 如果添加成功,进行AAC到OPUS的转码处理// If OK, transcode the AAC to Opus and consume it.err = transcode(&aac);}// 释放ADTS音频数据内存srs_freepa(adts_audio);// 返回处理结果return err;
}
/*** 打包Opus音频帧为RTP数据包** @param audio 输入的音频帧,包含音频数据和相关信息* @param pkt 输出的RTP数据包,将被填充音频数据和头部信息* @return 返回错误码,srs_success表示成功** 该函数负责将Opus音频帧打包成RTP数据包,设置RTP头部信息包括:* - 负载类型(payload_type)* - SSRC标识* - 帧类型(音频)* - 标记位(marker)* - 序列号(sequence)* - 时间戳(基于dts计算)* 并将音频数据包装为RTP原始负载(raw payload)*/
srs_error_t SrsRtcFromRtmpBridger::package_opus(SrsAudioFrame *audio, SrsRtpPacket *pkt)
{srs_error_t err = srs_success;pkt->header.set_payload_type(audio_payload_type_);pkt->header.set_ssrc(audio_ssrc);pkt->frame_type = SrsFrameTypeAudio;pkt->header.set_marker(true);pkt->header.set_sequence(audio_sequence++);pkt->header.set_timestamp(audio->dts * 48);SrsRtpRawPayload* raw = new SrsRtpRawPayload();pkt->set_payload(raw, SrsRtspPacketPayloadTypeRaw);srs_assert(audio->nb_samples == 1);raw->payload = pkt->wrap(audio->samples[0].bytes, audio->samples[0].size);raw->nn_payload = audio->samples[0].size;return err;
}/*** 处理接收到的RTP包,并将其分发给所有消费者和桥接器* * @param pkt 接收到的RTP数据包* @return 错误码,成功返回srs_success* * @remark 该函数负责将RTP包分发给所有订阅的消费者* @remark 如果启用了桥接功能,还会将RTP包转发给桥接器处理* @warning 如果分发过程中任一消费者或桥接器失败,会立即返回错误*/
srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket *pkt)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 检查服务器是否处于高负载状态,如果是则丢弃数据包// If circuit-breaker is dying, drop packet.if (_srs_circuit_breaker->hybrid_dying_water_level()) {// 记录丢弃的数据包数量,用于统计_srs_pps_aloss2->sugar += (int64_t)consumers.size();// 直接返回成功,不进行后续处理return err;}// 遍历所有消费者,将RTP包分发给每个消费者for (int i = 0; i < (int)consumers.size(); i++) {// 获取当前消费者对象SrsRtcConsumer* consumer = consumers.at(i);// 复制RTP包并加入消费者的队列中if ((err = consumer->enqueue(pkt->copy())) != srs_success) {// 如果分发失败,返回包装后的错误信息return srs_error_wrap(err, "consume message");}}//bridger_是SrsRtcSource的成员 //rtc_to_rtmp on;时进入,将rtc转成rtmp,if (bridger_ && (err = bridger_->on_rtp(pkt)) != srs_success) {// 如果桥接处理失败,返回包装后的错误信息return srs_error_wrap(err, "bridger consume message");}// 返回处理结果return err;
}

rtc_to_rtmp逻辑

当WebRTC推流时,数据流向是这样的:

  1. WebRTC推流SrsRtcSource (RTC源)
  2. SrsRtcSourceSrsFrameToRtmpBridge (RTC到RTMP桥接器)
  3. SrsFrameToRtmpBridgeSrsLiveSource (RTMP源)
/*** rtc推流 rtmp拉流场景。* 将RTC音频数据转码为RTMP格式并发送** @param pkt RTP音频包,包含需要转码的音频数据* @return 返回错误码,srs_success表示成功** 处理流程:* 1. 首次音频时发送AAC头信息* 2. 将RTP音频数据转码为AAC格式* 3. 将转码后的音频数据打包为RTMP格式并发送** 注意:* - 会维护is_first_audio状态标识* - 负责释放转码过程中生成的帧内存*/
srs_error_t SrsRtmpFromRtcBridger::transcode_audio(SrsRtpPacket *pkt)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 获取音频同步时间戳,用于RTMP包// to common message.uint32_t ts = pkt->get_avsync_time();// 如果是第一个音频包,需要先发送AAC序列头if (is_first_audio) {// 获取AAC编码器的序列头数据int header_len = 0;uint8_t* header = NULL;// 从编码器获取AAC序列头codec_->aac_codec_header(&header, &header_len);// 创建RTMP消息对象SrsCommonMessage out_rtmp;// 将AAC序列头打包成RTMP音频包packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio);// 将AAC序列头发送到RTMP源if ((err = source_->on_audio(&out_rtmp)) != srs_success) {// 发送失败,返回包装后的错误信息return srs_error_wrap(err, "source on audio");}// 标记已发送序列头,后续不再发送is_first_audio = false;}// 创建输出音频帧容器std::vector<SrsAudioFrame *> out_pkts;// 获取RTP包中的原始音频负载数据SrsRtpRawPayload *payload = dynamic_cast<SrsRtpRawPayload *>(pkt->payload());// 创建音频帧对象,用于转码输入SrsAudioFrame frame;// 将RTP负载数据添加到音频帧frame.add_sample(payload->payload, payload->nn_payload);// 设置音频帧的时间戳信息frame.dts = ts;frame.cts = 0;// 调用编码器进行OPUS到AAC的转码err = codec_->transcode(&frame, out_pkts);// 如果转码失败,直接返回错误if (err != srs_success) {return err;}// 遍历所有转码后的AAC音频帧for (std::vector<SrsAudioFrame *>::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) {// 创建RTMP消息对象SrsCommonMessage out_rtmp;// 设置RTMP消息的时间戳out_rtmp.header.timestamp = (*it)->dts;// 将AAC音频帧打包成RTMP音频包packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio);// 将AAC音频帧发送到RTMP源 SrsLiveSource::on_audio(SrsCommonMessage *shared_audio)if ((err = source_->on_audio(&out_rtmp)) != srs_success) {// 发送失败,包装错误信息并跳出循环err = srs_error_wrap(err, "source on audio");break;}}// 释放转码后的音频帧内存codec_->free_frames(out_pkts);// 返回处理结果return err;
}

当推流rtmp走Bridge 转码逻辑SrsRtcSource是如何解决循环转码的?

推rtmp流(SrsLiveSource.bridger_这个变量初始化)进行转码后的rtp包,分发逻辑:

向所有RTC consumers分发时,这些是WebRTC拉流客户端,

向bridger分发:这是用于转码到其他协议(如RTMP)的桥接器。(推rtmp流不会进入,这是用于rtc推流SrsRtcSource时会初始化bridger_才会进行转码)

所以不会循环转码

调用栈

dtls协议握手

(gdb) bt
#0  SrsRtcConnection::on_connection_established (this=0x5555562c8580) at src/app/srs_app_rtc_conn.cpp:2310
#1  0x00005555557dbbad in SrsSecurityTransport::on_dtls_handshake_done (this=0x55555609ca70) at src/app/srs_app_rtc_conn.cpp:150
#2  0x0000555555812171 in SrsDtlsServerImpl::on_handshake_done (this=0x55555617a790) at src/app/srs_app_rtc_dtls.cpp:940
#3  0x00005555558108a7 in SrsDtlsImpl::do_handshake (this=0x55555617a790) at src/app/srs_app_rtc_dtls.cpp:607
#4  0x000055555581012f in SrsDtlsImpl::do_on_dtls (this=0x55555617a790, data=0x555556011810 "\026\376", <incomplete sequence \375>, nb_data=580)at src/app/srs_app_rtc_dtls.cpp:516
#5  0x000055555580fe6b in SrsDtlsImpl::on_dtls (this=0x55555617a790, data=0x555556011810 "\026\376", <incomplete sequence \375>, nb_data=580)at src/app/srs_app_rtc_dtls.cpp:479
#6  0x0000555555812665 in SrsDtls::on_dtls (this=0x5555562efa50, data=0x555556011810 "\026\376", <incomplete sequence \375>, nb_data=580)at src/app/srs_app_rtc_dtls.cpp:1035
#7  0x00005555557db9c1 in SrsSecurityTransport::on_dtls (this=0x55555609ca70, data=0x555556011810 "\026\376", <incomplete sequence \375>, nb_data=580)at src/app/srs_app_rtc_conn.cpp:126
#8  0x00005555557e7c03 in SrsRtcConnection::on_dtls (this=0x5555562c8580, data=0x555556011810 "\026\376", <incomplete sequence \375>, nb_data=580)at src/app/srs_app_rtc_conn.cpp:2132
#9  0x0000555555821be0 in SrsRtcServer::on_udp_packet (this=0x555555ed7080, skt=0x7ffff75a7b70) at src/app/srs_app_rtc_server.cpp:489
#10 0x00005555557b4ef2 in SrsUdpMuxListener::cycle (this=0x555555f202b0) at src/app/srs_app_listener.cpp:620
#11 0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x555555ffe150) at src/app/srs_app_st.cpp:272
rtmp_to_rtc 推流音频转opus
(gdb) bt
#0  SrsAudioTranscoder::transcode (this=0x555555f20ba0, in_pkt=0x7ffff7542190, out_pkts=...)at src/app/srs_app_rtc_codec.cpp:158
#1  0x000055555582a1aa in SrsRtcFromRtmpBridger::transcode (this=0x55555603d940, audio=0x7ffff7542190)at src/app/srs_app_rtc_source.cpp:872
#2  0x000055555582a0a2 in SrsRtcFromRtmpBridger::on_audio (this=0x55555603d940, msg=0x7ffff7543aa0)at src/app/srs_app_rtc_source.cpp:859
#3  0x00005555556fc902 in SrsLiveSource::on_audio_imp (this=0x555556040b80, msg=0x7ffff7543aa0)at src/app/srs_app_source.cpp:2222
#4  0x00005555556fc4c8 in SrsLiveSource::on_audio (this=0x555556040b80, shared_audio=0x55555608db10) at src/app/srs_app_source.cpp:2177《SrsLiveSource》#5  0x00005555556ee231 in SrsRtmpConn::process_publish_message (this=0x555556021790, source=0x555556040b80, msg=0x55555608db10)at src/app/srs_app_rtmp_conn.cpp:1057
#6  0x00005555556ee09d in SrsRtmpConn::handle_publish_message (this=0x555556021790, source=0x555556040b80, msg=0x55555608db10)at src/app/srs_app_rtmp_conn.cpp:1036
#7  0x00005555557a12c1 in SrsPublishRecvThread::consume (this=0x55555603bb40, msg=0x55555608db10)at src/app/srs_app_recv_thread.cpp:373
#8  0x00005555557a021b in SrsRecvThread::do_cycle (this=0x55555603bb60)
--Type <RET> for more, q to quit, c to continue without paging--at src/app/srs_app_recv_thread.cpp:131
#9  0x00005555557a0068 in SrsRecvThread::cycle (this=0x55555603bb60)at src/app/srs_app_recv_thread.cpp:100
#10 0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x55555608d8d0)at src/app/srs_app_st.cpp:272
#11 0x0000555555721bc6 in SrsFastCoroutine::pfn (arg=0x55555608d8d0)at src/app/srs_app_st.cpp:287
#12 0x00005555558485e0 in _st_thread_main () at sched.c:363
#13 0x0000555555848e99 in st_thread_create (start=0x55555608d4b0, arg=0x55555603b860, joinable=21845, stk_size=1443084375)at sched.c:694
rtc_to_rtmp opus转aac

可见rtc推流独特之处是通过轨道调用上来的,因为通过媒体协商了。

(gdb) bt
#0  SrsRtmpFromRtcBridger::transcode_audio (this=0x555556047030, pkt=0x555556171270) at src/app/srs_app_rtc_source.cpp:1364
#1  0x000055555582bf87 in SrsRtmpFromRtcBridger::on_rtp (this=0x555556047030, pkt=0x555556171270)at src/app/srs_app_rtc_source.cpp:1349
#2  0x000055555582902b in SrsRtcSource::on_rtp (this=0x555555f24fd0, pkt=0x555556171270) at src/app/srs_app_rtc_source.cpp:632
#3  0x000055555583245a in SrsRtcAudioRecvTrack::on_rtp (this=0x555556042fa0, source=0x555555f24fd0, pkt=0x555556171270)at src/app/srs_app_rtc_source.cpp:2466
#4  0x00005555557e40ca in SrsRtcPublishStream::do_on_rtp_plaintext (this=0x5555560425a0, pkt=@0x7ffff75a76e8: 0x555556171270, buf=0x7ffff75a7700) at src/app/srs_app_rtc_conn.cpp:1457
#5  0x00005555557e3edb in SrsRtcPublishStream::on_rtp_plaintext (this=0x5555560425a0, 
--Type <RET> for more, q to quit, c to continue without paging--plaintext=0x5555560116f0 "\220ocq\t4\331\300\332\356\025\265\276", <incomplete sequence \336>, nb_plaintext=88)at src/app/srs_app_rtc_conn.cpp:1430
#6  0x00005555557e3b93 in SrsRtcPublishStream::on_rtp (this=0x5555560425a0, data=0x5555560116f0 "\220ocq\t4\331\300\332\356\025\265\276", <incomplete sequence \336>, nb_data=98) at src/app/srs_app_rtc_conn.cpp:1397
#7  0x00005555557e8a1c in SrsRtcConnection::on_rtp (this=0x55555603a560, data=0x5555560116f0 "\220ocq\t4\331\300\332\356\025\265\276", <incomplete sequence \336>, nb_data=98) at src/app/srs_app_rtc_conn.cpp:2283
#8  0x0000555555821afc in SrsRtcServer::on_udp_packet (this=0x555555ed7080, skt=0x7ffff75a7b70)at src/app/srs_app_rtc_server.cpp:473《SrsRtcServer》#9  0x00005555557b4ef2 in SrsUdpMuxListener::cycle (this=0x555555f202b0)
--Type <RET> for more, q to quit, c to continue without paging--at src/app/srs_app_listener.cpp:620
#10 0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x555555ffe030)at src/app/srs_app_st.cpp:272
#11 0x0000555555721bc6 in SrsFastCoroutine::pfn (arg=0x555555ffe030)at src/app/srs_app_st.cpp:287
#12 0x00005555558485e0 in _st_thread_main () at sched.c:363
#13 0x0000555555848e99 in st_thread_create (start=0x7ffff7add154 <malloc+116>, arg=0x5b0000006e, joinable=119, stk_size=124) at sched.c:694
#14 0x000055555572259e in SrsFileLog::info (this=0x12010, tag=0xffffffffffffff90 <error: Cannot access memory at address 0xffffffffffffff90>, context_id=<error reading variable: Cannot access memory at address 0x12000>, fmt=0x7ffff7c2fb80 "") at src/app/srs_app_log.cpp:118
#15 0x0000555555ed6450 in ?? ()
--Type <RET> for more, q to quit, c to continue without paging--
#16 0x00007ffff7adeb95 in calloc () from /lib/x86_64-linux-gnu/libc.so.6
#17 0x0000555555849708 in st_cond_new () at sync.c:157
#18 0x0000555555848eba in st_thread_create (start=0x555555721ba2 <SrsFastCoroutine::pfn(void*)>, arg=0x555555ffe980, joinable=1, stk_size=65536) at sched.c:702
#19 0x00005555557215f4 in SrsFastCoroutine::start (this=0x555555ffe980)at src/app/srs_app_st.cpp:180
#20 0x0000555555721130 in SrsSTCoroutine::start (this=0x555555ffe6b0)at src/app/srs_app_st.cpp:95
#21 0x00005555556db148 in SrsResourceManager::start (this=0x555555ebf110) at src/app/srs_app_conn.cpp:72
#22 0x0000555555f24f90 in ?? ()
#23 0x00007fffffffd9e0 in ?? ()
#24 0x0000555555721d71 in SrsWaitGroup::wait (this=0x7fffffffda20)at src/app/srs_app_st.cpp:328
--Type <RET> for more, q to quit, c to continue without paging--
#25 0x00005555557d80cf in SrsHybridServer::run (this=0x555555ebe9d0)at src/app/srs_app_hybrid.cpp:281
#26 0x0000555555847917 in run_hybrid_server ()at src/main/srs_main_server.cpp:477
#27 0x00005555558473da in run_directly_or_daemon ()at src/main/srs_main_server.cpp:407
#28 0x0000555555845c2d in do_main (argc=3, argv=0x7fffffffdf68)at src/main/srs_main_server.cpp:198
#29 0x0000555555845eb2 in main (argc=3, argv=0x7fffffffdf68)at src/main/srs_main_server.cpp:207

音频发送

(gdb) bt
#0  SrsUdpMuxSocket::sendto (this=0x5555562eaa60, data=0x5555562ed800, size=146, timeout=0) at src/app/srs_app_listener.cpp:339
#1  0x00005555557eaced in SrsRtcConnection::do_send_packet (this=0x5555562ecff0, pkt=0x5555562bd280) at src/app/srs_app_rtc_conn.cpp:2703
#2  0x0000555555833950 in SrsRtcAudioSendTrack::on_rtp (this=0x5555562e0410, pkt=0x5555562bd280) at src/app/srs_app_rtc_source.cpp:2753
#3  0x00005555557df880 in SrsRtcPlayStream::send_packet (this=0x5555562e0610, pkt=@0x5555561ed838: 0x5555562bd280) at src/app/srs_app_rtc_conn.cpp:738
#4  0x00005555557df245 in SrsRtcPlayStream::cycle (this=0x5555562e0610) at src/app/srs_app_rtc_conn.cpp:673
#5  0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x5555562e4340) at src/app/srs_app_st.cpp:272
#6  0x0000555555721bc6 in SrsFastCoroutine::pfn (arg=0x5555562e4340) at src/app/srs_app_st.cpp:287

视频发送

(gdb) bt
#0  SrsUdpMuxSocket::sendto (this=0x5555560c8950, data=0x5555560586d0, size=547, timeout=0) at src/app/srs_app_listener.cpp:339
#1  0x00005555557eaced in SrsRtcConnection::do_send_packet (this=0x5555560b7c60, pkt=0x55555609efe0) at src/app/srs_app_rtc_conn.cpp:2703
#2  0x0000555555833bdc in SrsRtcVideoSendTrack::on_rtp (this=0x555556390d30, pkt=0x55555609efe0) at src/app/srs_app_rtc_source.cpp:2803
#3  0x00005555557df880 in SrsRtcPlayStream::send_packet (this=0x55555615b850, pkt=@0x5555561d1278: 0x55555609efe0) at src/app/srs_app_rtc_conn.cpp:738
#4  0x00005555557df245 in SrsRtcPlayStream::cycle (this=0x55555615b850) at src/app/srs_app_rtc_conn.cpp:673
#5  0x0000555555721b22 in SrsFastCoroutine::cycle (this=0x555556040ee0) at src/app/srs_app_st.cpp:272
#6  0x0000555555721bc6 in SrsFastCoroutine::pfn (arg=0x555556040ee0) at src/app/srs_app_st.cpp:287

其余知识

WebRTC over TCP支持

值得注意的是,从SRS 5.0版本开始,SRS也支持通过TCP传输WebRTC媒体数据:

  1. TCP传输:除了传统的UDP传输外,SRS现在也支持通过TCP传输WebRTC媒体数据
  2. 端口复用:WebRTC over TCP可以与HTTP API和HTTP流共用同一个TCP端口
  3. 配置方式:可以通过设置SRS_RTC_SERVER_TCP_ENABLED=onSRS_RTC_SERVER_PROTOCOL=tcp启用

测试webrtc推流问题

bug:WebRTC推流必须是HTTPS或者localhost:HttpsRequiredError Please use HTTPS or localhost to publish, read https://github.com/ossrs/srs/issues/2762#issuecomment-983147576

WebRTC推流确实必须在HTTPS或localhost环境下进行,这是现代浏览器的安全策略要求。

解决:

  1. 通过 http://localhost:8080/players/rtc_publisher.html 访问
  2. 推流链接使用localhost会存在问题,应该使用虚拟机IP:webrtc://192.168.126.129/live/livestream
http://www.lqws.cn/news/566803.html

相关文章:

  • 数据可视化 - 单子图
  • 第10章 数组和指针
  • 左神算法之螺旋打印
  • SQL Server从入门到项目实践(超值版)读书笔记 19
  • 从GPTs到Real智能体:目前常见的几种创建智能体方式
  • spring:BeanPostProcessor后置处理器介绍
  • 小米路由器 AX3000T自定义子网掩码
  • Mybatis多条件查询设置参数的三种方法
  • stm32hal模块驱动(1)hpdl1414驱动
  • Vue的watch函数实现
  • 华为云 Flexus+DeepSeek 征文|华为云 Flexus 云服务 Dify-LLM 平台深度部署指南:从基础搭建到高可用实践
  • 智能制造——解读西门子数字化工厂规划报告(三年实施计划)【附全文阅读】
  • 机器学习在智能供应链中的应用:需求预测与库存优化
  • 大事件项目记录12-文章管理接口开发-总
  • 设计模式之适配器模式
  • OpenCV读取照片和可视化详解和代码示例
  • MySQL 安装使用教程
  • Java垃圾收集机制Test
  • PL-SLAM: Real-Time Monocular Visual SLAM with Points and Lines
  • Ai工具分享(2):Vscode+Cline无限免费的使用教程
  • XWPFDocument导出word文件
  • Linux中《动/静态库原理》
  • Redis缓存击穿深度解析:从现象到实战的完整解决方案
  • github上传代码步骤(http)
  • Cesium快速入门到精通系列教程十二:Cesium1.74中环绕地球生成​​经线环​​
  • Javaweb - 7 xml
  • 【智能协同云图库】智能协同云图库第三弹:基于腾讯云 COS 对象存储—开发图片模块
  • 日常 AI 工具汇总
  • Oracle 递归 + Decode + 分组函数实现复杂树形统计进阶(第二课)
  • 深入剖析 Linux 内核网络核心:sock.c 源码解析