发送数据全流程
1、发送的线程(在声音设备端直接获取声卡数据)
先看发送的线程 ca_thread_func,这里我们以alsa_dev为例
/*
 * Audio capture thread entry point (ALSA backend).
 *
 * First tries to switch the calling thread to SCHED_RR at the maximum
 * priority (failure is only logged). Then, until stream->quit is set or the
 * capture callback returns an error, it repeatedly reads nframes frames from
 * the capture PCM into the stream's capture buffer, wraps the buffer in a
 * pjmedia_frame and hands it to stream->ca_cb.
 *
 * arg      Pointer to the struct alsa_stream to capture from.
 *
 * Returns PJ_SUCCESS in all cases.
 */
static int ca_thread_func (void *arg)
{
    struct alsa_stream* stream = (struct alsa_stream*) arg;
    snd_pcm_t* pcm             = stream->ca_pcm;
    int size                   = stream->ca_buf_size;
    snd_pcm_uframes_t nframes  = stream->ca_frames;
    void* user_data            = stream->user_data;
    char* buf                  = stream->ca_buf;
    pj_timestamp tstamp;
    int result;
    struct sched_param param;
    pthread_t* thid;

    /* Try to run the capture thread with the highest round-robin priority. */
    thid = (pthread_t*) pj_thread_get_os_handle (pj_thread_this());
    param.sched_priority = sched_get_priority_max (SCHED_RR);
    PJ_LOG (5,(THIS_FILE, "ca_thread_func(%u): Set thread priority "
                          "for audio capture thread.",
                          (unsigned)syscall(SYS_gettid)));
    result = pthread_setschedparam (*thid, SCHED_RR, &param);
    if (result) {
        /* Not fatal: keep capturing with the current priority. */
        if (result == EPERM)
            PJ_LOG (5,(THIS_FILE, "Unable to increase thread priority, "
                                  "root access needed."));
        else
            PJ_LOG (5,(THIS_FILE, "Unable to increase thread priority, "
                                  "error: %d",
                                  result));
    }

    pj_bzero (buf, size);
    tstamp.u64 = 0;

    TRACE_((THIS_FILE, "ca_thread_func(%u): Started",
            (unsigned)syscall(SYS_gettid)));

    snd_pcm_prepare (pcm);

    while (!stream->quit) {
        pjmedia_frame frame;

        /* Clear the buffer before every read. */
        pj_bzero (buf, size);
        result = snd_pcm_readi (pcm, buf, nframes);
        if (result == -EPIPE) {
            /* Overrun: re-prepare the PCM and retry the read. */
            PJ_LOG (4,(THIS_FILE, "ca_thread_func: overrun!"));
            snd_pcm_prepare (pcm);
            continue;
        } else if (result < 0) {
            /* Other read errors are only logged; the frame is still
             * passed to the callback below.
             */
            PJ_LOG (4,(THIS_FILE, "ca_thread_func: error reading data!"));
        }
        if (stream->quit)
            break;

        frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
        frame.buf = (void*) buf;
        frame.size = size;
        frame.timestamp.u64 = tstamp.u64;
        frame.bit_info = 0;

        result = stream->ca_cb (user_data, &frame);
        if (result != PJ_SUCCESS || stream->quit)
            break;

        /* The timestamp advances by the number of frames per read. */
        tstamp.u64 += nframes;
    }

    snd_pcm_drop(pcm);
    TRACE_((THIS_FILE, "ca_thread_func: Stopped"));

    return PJ_SUCCESS;
}
该线程的主体部分在不停的while循环,我们来看一次循环的内容:
调用 snd_pcm_readi (pcm, buf, nframes);
读声卡的数据到buf中。然后将读出的数据和一系列属性包装成一个frame。然后调用stream->ca_cb (user_data, &frame);
在初始化(pjmedia_aud_stream_create->f->op->create_stream)中我们知道stream->ca_cb 我们设置为 rec_cb
接下来我们来看rec_cb
在sound_port.c中.
另外我们需要知道ca_cb
中的user_data
到底是什么 这可以看我们之前初始化流程的分析:pjmedia_snd_port_create-》pjmedia_snd_port_create2-》start_sound_device-》pjmedia_aud_stream_create-》f->op->create_stream-》alsa_factory_create_stream
到pjmedia_aud_stream_create这步我们看出user_data是我们创建的snd_port
2、rec_cb (音频流端)
/*
 * Capture callback: invoked by the sound recorder each time a captured
 * frame is ready.
 *
 * user_data    The pjmedia_snd_port that owns the sound stream.
 * frame        The captured frame.
 *
 * Always returns PJ_SUCCESS.
 */
static pj_status_t rec_cb(void *user_data, pjmedia_frame *frame)
{
    pjmedia_snd_port *snd_port = (pjmedia_snd_port*) user_data;
    pjmedia_port *port;

    /* Update the capture clock source with this frame's timestamp. */
    pjmedia_clock_src_update(&snd_port->cap_clocksrc, &frame->timestamp);

    /* Optional preview callback. */
    if (snd_port->on_rec_frame) {
        (*snd_port->on_rec_frame)(snd_port->user_data, frame);
    }

    port = snd_port->port;
    if (port != NULL) {
        /* Run echo cancellation on the captured samples when it is
         * configured and not suspended.
         */
        if (snd_port->ec_state && !snd_port->ec_suspended) {
            pjmedia_echo_capture(snd_port->ec_state,
                                 (pj_int16_t*) frame->buf, 0);
        }

        /* Hand the frame to the connected downstream port. */
        pjmedia_port_put_frame(port, frame);
    }

    return PJ_SUCCESS;
}
首先看snd_port->on_rec_frame
这个是可选项,这里没有使用
其次就是最重要的调用pjmedia_port_put_frame,看一下这里的port参数,port = snd_port->port;
是pjmedia_snd_port类型snd_port中的pjmedia_port类型属性,snd_port->port是在pjmedia_snd_port_connect
中初始化的先看一下pjmedia_snd_port_connect
/*
 * Connect a media port to the sound port.
 *
 * The port is accepted only when its audio format (clock rate, samples per
 * frame, channel count and bits per sample) matches the sound port's
 * configuration; otherwise the matching PJMEDIA_ENC* error is returned and
 * the sound port is left unchanged.
 */
PJ_DEF(pj_status_t) pjmedia_snd_port_connect( pjmedia_snd_port *snd_port,
                                              pjmedia_port *port)
{
    pjmedia_audio_format_detail *afd;
    pj_status_t status = PJ_SUCCESS;

    PJ_ASSERT_RETURN(snd_port && port, PJ_EINVAL);

    afd = pjmedia_format_get_audio_format_detail(&port->info.fmt, PJ_TRUE);

    /* Compare each format property in turn; the first mismatch wins. */
    if (afd->clock_rate != snd_port->clock_rate) {
        status = PJMEDIA_ENCCLOCKRATE;
    } else if (PJMEDIA_AFD_SPF(afd) != snd_port->samples_per_frame) {
        status = PJMEDIA_ENCSAMPLESPFRAME;
    } else if (afd->channel_count != snd_port->channel_count) {
        status = PJMEDIA_ENCCHANNEL;
    } else if (afd->bits_per_sample != snd_port->bits_per_sample) {
        status = PJMEDIA_ENCBITS;
    } else {
        /* Everything matches: remember the port. */
        snd_port->port = port;
    }

    return status;
}
注意这里的参数pjmedia_port *port
来自pjmedia_stream stream的port,这个port还有一点特殊即port->port_data.pdata; 其实是指向stream,即stream中有port,port也有办法指向stream,这在put_frame中会遇到。
再回来看pjmedia_port_put_frame。该函数其实是port->put_frame回调的封装,会直接调用port->put_frame;这个回调函数的初始化在pjmedia_stream_create
完成将port->put_frame 初始化为 stream->port.put_frame = &put_frame;,所以接下来我们看put_frame
/**
 * Put a frame to the port (and subsequent downstream ports).
 *
 * Thin wrapper around the port's put_frame callback. Returns PJ_EINVALIDOP
 * when the port has no put_frame callback, otherwise the callback's result.
 */
PJ_DEF(pj_status_t) pjmedia_port_put_frame( pjmedia_port *port,
                                            pjmedia_frame *frame )
{
    PJ_ASSERT_RETURN(port && frame, PJ_EINVAL);

    if (!port->put_frame) {
        return PJ_EINVALIDOP;
    }

    return port->put_frame(port, frame);
}
3、put_frame (port端)
/**
 * put_frame()
 *
 * Called by the upstream component when it has a PCM frame to transmit.
 * Prepares the frame (silence substitution, VAD re-enable, optional
 * rebuffering) and forwards it to put_frame_imp(), which is where the
 * frame is actually processed.
 *
 * Returns the status of put_frame_imp(). On the rebuffering path, where
 * put_frame_imp() may run several times, the last non-success status is
 * returned, or PJ_SUCCESS if every call succeeded.
 */
static pj_status_t put_frame( pjmedia_port *port,
                              pjmedia_frame *frame )
{
    pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
    unsigned samples_per_frame = stream->enc_samples_per_pkt;
    pjmedia_frame tmp_zero_frame;
    pjmedia_frame tmp_rebuffer_frame;
    pj_status_t status = PJ_SUCCESS;
    int more;

    /* https://github.com/pjsip/pjproject/issues/56:
     * a PJMEDIA_FRAME_TYPE_NONE input is replaced by an audio frame with a
     * NULL buffer so that the encoder can decide whether or not to
     * transmit a silence frame.
     */
    if (frame->type == PJMEDIA_FRAME_TYPE_NONE) {
        pj_memcpy(&tmp_zero_frame, frame, sizeof(pjmedia_frame));
        tmp_zero_frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
        tmp_zero_frame.size = samples_per_frame * 2;
        tmp_zero_frame.buf = NULL;
        frame = &tmp_zero_frame;
    }

    /* VAD may have been temporarily disabled at creation; restore the
     * configured setting once enough has been transmitted since then.
     */
    if (stream->vad_enabled != stream->codec_param.setting.vad &&
        (stream->tx_duration - stream->ts_vad_disabled) >
            PJMEDIA_PIA_SRATE(&stream->port.info) *
            PJMEDIA_STREAM_VAD_SUSPEND_MSEC / 1000)
    {
        stream->codec_param.setting.vad = stream->vad_enabled;
        pjmedia_codec_modify(stream->codec, &stream->codec_param);
        PJ_LOG(4,(stream->port.info.name.ptr,"VAD re-enabled"));
    }

    /* No encoding buffer: the frame goes straight through. */
    if (stream->enc_buf == NULL) {
        return put_frame_imp(port, frame);
    }

    /* Encoder ptime differs from decoder ptime: pass the frame through the
     * encoding buffer via rebuffer(). Work on a copy because the frame is
     * modified along the way.
     */
    pj_memcpy(&tmp_rebuffer_frame, frame, sizeof(pjmedia_frame));

    do {
        pj_status_t st;

        rebuffer(stream, &tmp_rebuffer_frame);

        st = put_frame_imp(port, &tmp_rebuffer_frame);
        if (st != PJ_SUCCESS) {
            status = st;
        }

        /* While a full packet is still buffered, go around again with a
         * frame of type NONE so rebuffer() only drains the buffer.
         */
        more = (stream->enc_buf_count >= stream->enc_samples_per_pkt);
        if (more) {
            tmp_rebuffer_frame.type = PJMEDIA_FRAME_TYPE_NONE;
        }
    } while (more);

    return status;
}
重点看一下put_frame_imp
对frame的处理,函数比较长,我们来分段看一下
-
函数开始时,从
port
参数中获取了指向pjmedia_stream
结构的指针stream
,并且从stream
中获取了编码通道enc
。pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; pjmedia_channel *channel = stream->enc;
-
如果流启用了保活机制,函数会检查距离上次发送数据包的时间间隔,如果超过了指定的保活间隔,会发送一个保活数据包。
-
然后,函数会根据帧的类型计算帧中的样本数
ts_len
,并根据是否存在特定的编码器问题来确定 RTP 时间戳的长度rtp_ts_len
。/* Number of samples in the frame */ if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO) ts_len = ((unsigned)frame->size >> 1) / stream->codec_param.info.channel_cnt; else if (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED) ts_len = PJMEDIA_PIA_SPF(&stream->port.info) / PJMEDIA_PIA_CCNT(&stream->port.info); else ts_len = 0; #if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) /* Handle special case for audio codec with RTP timestamp inconsistence * e.g: G722, MPEG audio. */ if (stream->has_g722_mpeg_bug) rtp_ts_len = stream->rtp_tx_ts_len_per_pkt; else rtp_ts_len = ts_len; #else rtp_ts_len = ts_len; #endif
-
如果编码通道被暂停,函数会更新 RTP 会话的时间戳,并在需要时发送 RTCP SR/RR 报告。
/* Don't do anything if stream is paused, except updating RTP timestamp */ if (channel->paused) { stream->enc_buf_pos = stream->enc_buf_count = 0; /* Update RTP session's timestamp. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, 0, 0, 0, rtp_ts_len, NULL, NULL); /* Update RTCP stats with last RTP timestamp. */ stream->rtcp.stat.rtp_tx_last_ts = pj_ntohl(channel->rtp.out_hdr.ts); /* Check if now is the time to transmit RTCP SR/RR report. * We only do this when the decoder is paused, * because otherwise check_tx_rtcp() will be handled by on_rx_rtp(). */ if (stream->dec->paused) { check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts)); } return PJ_SUCCESS; }
-
接着,函数会增加传输时长,初始化输出帧缓冲区,并检查是否有 DTMF 数字在队列中,如果有则发送数字,否则对音频帧进行编码。
if (stream->tx_dtmf_count) { int first=0, last=0; create_dtmf_payload(stream, &frame_out, 0, &first, &last); /* Encapsulate into RTP packet. Note that: * - RTP marker should be set on the beginning of a new event * - RTP timestamp is constant for the same packet. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, first, (int)frame_out.size, (first ? rtp_ts_len : 0), (const void**)&rtphdr, &rtphdrlen); if (last) { /* This is the last packet for the event. * Increment the RTP timestamp of the RTP session, for next * RTP packets. */ inc_timestamp = stream->dtmf_duration + ((DTMF_EBIT_RETRANSMIT_CNT-1) * stream->rtp_tx_ts_len_per_pkt) - rtp_ts_len; } }
-
如果音频帧的缓冲区为空,则发送一段静音,保持 NAT 绑定。
else if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO && frame->buf == NULL && stream->port.info.fmt.id == PJMEDIA_FORMAT_L16 && (stream->dir & PJMEDIA_DIR_ENCODING)) { pjmedia_frame silence_frame; pj_bzero(&silence_frame, sizeof(silence_frame)); silence_frame.buf = stream->zero_frame; silence_frame.size = stream->enc_samples_per_pkt * 2; silence_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; silence_frame.timestamp.u32.lo = pj_ntohl(stream->enc->rtp.out_hdr.ts); /* Encode! */ status = pjmedia_codec_encode( stream->codec, &silence_frame, channel->out_pkt_size - sizeof(pjmedia_rtp_hdr), &frame_out); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "Codec encode() error")); return status; } /* Encapsulate. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, channel->pt, 0, (int)frame_out.size, rtp_ts_len, (const void**)&rtphdr, &rtphdrlen); }
-
如果音频帧不为空,则对音频帧进行编码,并将 RTP 头封装到输出包中。
else if ((frame->type == PJMEDIA_FRAME_TYPE_AUDIO && frame->buf != NULL) || (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED)) { /* Encode! */ status = pjmedia_codec_encode( stream->codec, frame, channel->out_pkt_size - sizeof(pjmedia_rtp_hdr), &frame_out); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "Codec encode() error")); return status; } /* Encapsulate. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, channel->pt, 0, (int)frame_out.size, rtp_ts_len, (const void**)&rtphdr, &rtphdrlen); }
-
最后,函数会根据当前是否正在流式传输来设置 RTP 标记位,并将 RTP 包发送到传输层。
/* Copy RTP header to the beginning of packet */ pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); /* Special case for DTMF: timestamp remains constant for * the same event, and is only updated after a complete event * has been transmitted. */ if (inc_timestamp) { pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, 0, 0, inc_timestamp, NULL, NULL); } /* Set RTP marker bit if currently not streaming */ if (stream->is_streaming == PJ_FALSE) { pjmedia_rtp_hdr *rtp = (pjmedia_rtp_hdr*) channel->out_pkt; rtp->m = 1; PJ_LOG(5,(stream->port.info.name.ptr,"Start talksprut..")); } stream->is_streaming = PJ_TRUE; /* Send the RTP packet to the transport. */ status = pjmedia_transport_send_rtp(stream->transport, channel->out_pkt, frame_out.size + sizeof(pjmedia_rtp_hdr));
-
在发送 RTP 包之后,函数会更新一些统计信息,并且如果启用了保活机制,则记录最后发送数据包的时间。
我们来仔细梳理一下rtp包发送的流程
(1)先设置frame_out.buf 对应的偏置 ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr);
(2)调用pjmedia_codec_encode,对frame编码,结果输出到frame_out中;此时frame_out.buf中得到rtp payload,也就是说((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr)处已存放rtp payload
(3)添加rtp头部:调用pjmedia_rtp_encode_rtp,得到channel->rtp.out_hdr,并将头部拷贝至(char*)channel->out_pkt处,至此channel->out_pkt存放的即为编码后的rtp包
(4)Send the RTP packet to the transport. 调用pjmedia_transport_send_rtp
我们就主要看pjmedia_transport_send_rtp了,其实就是op->send_rtp 的封装,由初始化可知send_rtp为transport_send_rtp
/*
 * Send an RTP packet through the transport by forwarding the call to the
 * send_rtp entry of the transport's operation table.
 *
 * tp       The media transport.
 * pkt      The packet to send.
 * size     Size of the packet in bytes.
 *
 * Returns whatever the transport's send_rtp operation returns.
 */
PJ_INLINE(pj_status_t) pjmedia_transport_send_rtp(pjmedia_transport *tp,
                                                  const void *pkt,
                                                  pj_size_t size)
{
    return tp->op->send_rtp(tp, pkt, size);
}
4、transport_send_rtp
该函数内部直接调用pj_ioqueue_sendto发送(见下方代码)
/*
 * Called by application to send RTP packet.
 *
 * The packet is copied into the current slot of udp->rtp_pending_write and
 * sent with pj_ioqueue_sendto(); the slot index then advances round-robin.
 *
 * tp       The UDP media transport (struct transport_udp).
 * pkt      The RTP packet to send.
 * size     Packet size in bytes; checked against PJMEDIA_MAX_MTU
 *          (PJ_ETOOBIG).
 *
 * Returns PJ_SUCCESS when the packet was sent or queued, when the transport
 * has not been started, or when the packet was dropped by the loss
 * simulation; PJ_EBUSY when the current slot still has a pending write;
 * otherwise the error returned by pj_ioqueue_sendto().
 */
static pj_status_t transport_send_rtp( pjmedia_transport *tp,
                                       const void *pkt,
                                       pj_size_t size)
{
    struct transport_udp *udp = (struct transport_udp*)tp;
    pj_ssize_t sent;
    unsigned id;
    struct pending_write *pw;
    pj_status_t status;

    /* Must be attached */
    //PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP);

    /* Check that the size is supported */
    PJ_ASSERT_RETURN(size <= PJMEDIA_MAX_MTU, PJ_ETOOBIG);

    if (!udp->started) {
        return PJ_SUCCESS;
    }

    /* Simulate packet lost on TX direction.
     * pj_rand() % 100 yields 0..99, so a strict '<' drops exactly
     * tx_drop_pct percent of the packets ('<=' would drop one percent
     * more than requested).
     */
    if (udp->tx_drop_pct) {
        if ((pj_rand() % 100) < (int)udp->tx_drop_pct) {
            PJ_LOG(5,(udp->base.name,
                      "TX RTP packet dropped because of pkt lost "
                      "simulation"));
            return PJ_SUCCESS;
        }
    }

    id = udp->rtp_write_op_id;
    pw = &udp->rtp_pending_write[id];

    if (pw->is_pending) {
        /* There is still currently pending operation for this buffer. */
        PJ_LOG(4,(udp->base.name, "Too many pending write operations"));
        return PJ_EBUSY;
    }
    pw->is_pending = PJ_TRUE;

    /* We need to copy packet to our buffer because when the
     * operation is pending, caller might write something else
     * to the original buffer.
     */
    pj_memcpy(pw->buffer, pkt, size);

    sent = size;
    status = pj_ioqueue_sendto( udp->rtp_key,
                                &udp->rtp_pending_write[id].op_key,
                                pw->buffer, &sent, 0,
                                &udp->rem_rtp_addr,
                                udp->addr_len);

    if (status != PJ_EPENDING) {
        /* Send operation has completed immediately. Clear the flag. */
        pw->is_pending = PJ_FALSE;
    }

    /* Move on to the next slot, wrapping around at the end of the array. */
    udp->rtp_write_op_id = (udp->rtp_write_op_id + 1) %
                           PJ_ARRAY_SIZE(udp->rtp_pending_write);

    /* A queued (pending) send counts as success for the caller. */
    if (status==PJ_SUCCESS || status==PJ_EPENDING)
        return PJ_SUCCESS;

    return status;
}
udp->rtp_write_op_id是当前write操作可用的id;udp->rtp_pending_write是pending write对象数组,其中udp->rtp_pending_write[udp->rtp_write_op_id]的is_pending用于指示该位置是否还有pending的write操作:如果有pending,则返回PJ_EBUSY;没有的话,将该id处置为pending:pw->is_pending = PJ_TRUE;
调用pj_ioqueue_sendto 最后需要将id+1
5、pj_ioqueue_sendto
参数key来自 udp->rtp_key,在 pj_ioqueue_register_sock2中初始化,绑定到rtp socket
参数op_key来自 transport_udp::udp->rtp_pending_write[id].op_key,在transport_media_start中设置为空
如果key对应的write_list为空,则直接调用pj_sock_sendto发送
if (pj_list_empty(&key->write_list)) {
/*
* See if data can be sent immediately.
*/
sent = *length;
status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
}
否则初始化write_operation:: write_op 主要需要将要发送的rtp数据保存在write_op中,然后挂在key对应的writelist队列上
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
write_op->buf = (char*)data;
write_op->size = *length;
write_op->written = 0;
write_op->flags = flags;
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
write_op->rmt_addrlen = addrlen;
pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->write_list, write_op);
调用ioqueue_add_to_set进而调用ioqueue_add_to_set2
/*
 * Add the requested event types to the epoll event mask of a key.
 *
 * Each requested event type is mapped to its epoll flag
 * (READABLE -> EPOLLIN, WRITEABLE -> EPOLLOUT, EXCEPTION -> EPOLLERR) and
 * OR-ed into the key's current mask. The epoll set is updated only when
 * the resulting mask differs from the current one.
 */
static void ioqueue_add_to_set2(pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                unsigned event_types )
{
    const pj_uint32_t current = key->ev.events;
    pj_uint32_t events = current;

    if (event_types & READABLE_EVENT) {
        events |= EPOLLIN;
    }
    if (event_types & WRITEABLE_EVENT) {
        events |= EPOLLOUT;
    }
    if (event_types & EXCEPTION_EVENT) {
        events |= EPOLLERR;
    }

    /* Nothing new requested: leave the epoll set untouched. */
    if (events == current) {
        return;
    }

    update_epoll_event_set(ioqueue, key, events);
}
根据 event_types 设置 events 调用 update_epoll_event_set 这里event_types是WRITEABLE_EVENT
/*
 * Store a new event mask in the key and apply it to the ioqueue's epoll
 * instance. A failure of the final epoll_ctl call is logged, not returned.
 */
static void update_epoll_event_set(pj_ioqueue_t *ioqueue,
                                   pj_ioqueue_key_t *key,
                                   pj_uint32_t events)
{
    int op = EPOLL_CTL_MOD;
    int rc;

    /* From epoll_ctl(2):
     *   EPOLLEXCLUSIVE may be used only in an EPOLL_CTL_ADD operation;
     *   attempts to employ it with EPOLL_CTL_MOD yield an error.
     * So a key currently registered with EPOLLEXCLUSIVE is removed first
     * and then re-added with the new mask. The result of the removal is
     * not checked.
     */
    if (key->ev.events & EPOLLEXCLUSIVE) {
        (void) os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &key->ev);
        op = EPOLL_CTL_ADD;
    }

    key->ev.events = events;
    rc = os_epoll_ctl(ioqueue->epfd, op, key->fd, &key->ev);

    if (rc != 0) {
        pj_status_t status = pj_get_os_error();
        PJ_PERROR(1,(THIS_FILE, status,
                     "epol_ctl(MOD) error (events=0x%x)", events));
    }
}
这里通过EPOLL_CTL_MOD修改对rtp socket监测的事件,加入EPOLLOUT,使其在socket可写时触发
6、ioqueue_epoll参与
这里不一定是上述追踪的rtp包
当pj_ioqueue_poll工作线程调用os_epoll_wait,发现所监测的EPOLLOUT写事件触发时,会调用ioqueue_dispatch_write_event执行写操作。在ioqueue_dispatch_write_event中,先看key的write_list上有没有pending的write_op;有的话,从write_list中取出,根据write_op确定要写入的数据及其大小,调用pj_sock_send函数将数据写入socket(Transmit data to the socket)。最后调用on_write_complete并传入write_op;该回调函数已在pj_ioqueue_register_sock2时设置为on_rtp_data_sent
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
(*h->cb.on_write_complete)(h,
(pj_ioqueue_op_key_t*)write_op,
write_op->written);
}
/*
 * Write-completion callback for the RTP socket.
 *
 * Finds the entry of udp->rtp_pending_write whose op_key is the one that
 * just completed and clears its is_pending flag. Does nothing if no entry
 * matches. bytes_sent is not used.
 */
static void on_rtp_data_sent(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_sent)
{
    struct transport_udp *udp;
    unsigned slot = 0;

    PJ_UNUSED_ARG(bytes_sent);

    /* The ioqueue key's user data is the UDP transport. */
    udp = (struct transport_udp*) pj_ioqueue_get_user_data(key);

    while (slot < PJ_ARRAY_SIZE(udp->rtp_pending_write)) {
        if (op_key == &udp->rtp_pending_write[slot].op_key) {
            udp->rtp_pending_write[slot].is_pending = PJ_FALSE;
            return;
        }
        ++slot;
    }
}
遍历transport_udp udp中的rtp_pending_write 找到与目标 op_key一致的位置,将is_pending置为false完成
附:相互指向关系
stream.user_data-》snd_port,
snd_port.port-》port
port->port_data.pdata;-》stream
stream->transport-》pjmedia_transport