接收数据全流程
1、接受的线程(在声音设备端直接向声卡写入数据)
先看发送的线程 pb_thread_func,这里我们以alsa_dev为例
static int pb_thread_func (void *arg)
{
struct alsa_stream* stream = (struct alsa_stream*) arg;
snd_pcm_t* pcm = stream->pb_pcm;
int size = stream->pb_buf_size;
snd_pcm_uframes_t nframes = stream->pb_frames;
void* user_data = stream->user_data;
char* buf = stream->pb_buf;
pj_timestamp tstamp;
int result;
pj_bzero (buf, size);
tstamp.u64 = 0;
TRACE_((THIS_FILE, "pb_thread_func(%u): Started",
(unsigned)syscall(SYS_gettid)));
snd_pcm_prepare (pcm);
while (!stream->quit) {
pjmedia_frame frame;
frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
frame.buf = buf;
frame.size = size;
frame.timestamp.u64 = tstamp.u64;
frame.bit_info = 0;
result = stream->pb_cb (user_data, &frame);
if (result != PJ_SUCCESS || stream->quit)
break;
if (frame.type != PJMEDIA_FRAME_TYPE_AUDIO)
pj_bzero (buf, size);
result = snd_pcm_writei (pcm, buf, nframes);
if (result == -EPIPE) {
PJ_LOG (4,(THIS_FILE, "pb_thread_func: underrun!"));
snd_pcm_prepare (pcm);
} else if (result < 0) {
PJ_LOG (4,(THIS_FILE, "pb_thread_func: error writing data!"));
}
tstamp.u64 += nframes;
}
snd_pcm_drop(pcm);
TRACE_((THIS_FILE, "pb_thread_func: Stopped"));
return PJ_SUCCESS;
}
该线程的主体部分在不停的while循环,我们来看一次循环的内容:
调用 stream->pb_cb (user_data, &frame);
其中user_data 依然是我们创建的snd_port
,frame存放最终接收的一个frame即Frame to store samples.,需要先对frame的类型等信息进行设置。
调用 snd_pcm_writei (pcm, buf, nframes);
将数据写入声卡
2、play_cb (音频流端)
pjmedia/src/pjmedia/sound_port.c
/*
* The callback called by sound player when it needs more samples to be
* played.
*/
static pj_status_t play_cb(void *user_data, pjmedia_frame *frame)
{
pjmedia_snd_port *snd_port = (pjmedia_snd_port*) user_data;
pjmedia_port *port;
const unsigned required_size = (unsigned)frame->size;
pj_status_t status;
pjmedia_clock_src_update(&snd_port->play_clocksrc, &frame->timestamp);
port = snd_port->port;
if (port == NULL)
goto no_frame;
status = pjmedia_port_get_frame(port, frame);
if (status != PJ_SUCCESS)
goto no_frame;
if (frame->type != PJMEDIA_FRAME_TYPE_AUDIO)
goto no_frame;
/* Must supply the required samples */
pj_assert(frame->size == required_size);
if (snd_port->ec_state) {
if (snd_port->ec_suspended) {
snd_port->ec_suspended = PJ_FALSE;
//pjmedia_echo_state_reset(snd_port->ec_state);
PJ_LOG(4,(THIS_FILE, "EC activated"));
}
snd_port->ec_suspend_count = 0;
pjmedia_echo_playback(snd_port->ec_state, (pj_int16_t*)frame->buf);
}
/* Invoke preview callback */
if (snd_port->on_play_frame)
(*snd_port->on_play_frame)(snd_port->user_data, frame);
return PJ_SUCCESS;
no_frame:
frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
frame->size = required_size;
pj_bzero(frame->buf, frame->size);
if (snd_port->ec_state && !snd_port->ec_suspended) {
++snd_port->ec_suspend_count;
if (snd_port->ec_suspend_count > snd_port->ec_suspend_limit) {
snd_port->ec_suspended = PJ_TRUE;
PJ_LOG(4,(THIS_FILE, "EC suspended because of inactivity"));
}
if (snd_port->ec_state) {
/* To maintain correct delay in EC */
pjmedia_echo_playback(snd_port->ec_state, (pj_int16_t*)frame->buf);
}
}
/* Invoke preview callback */
if (snd_port->on_play_frame)
(*snd_port->on_play_frame)(snd_port->user_data, frame);
return PJ_SUCCESS;
}
首先看snd_port->on_play_frame
这个是可选项,这里没有使用
其次就是最重要的调用pjmedia_port_get_frame,看一下这里的port参数,port = snd_port->port;
是pjmedia_snd_port类型snd_port中的pjmedia_port类型属性,snd_port->port是在pjmedia_snd_port_connect
中初始化的先看一下pjmedia_snd_port_connect
/*
* Connect a port.
*/
PJ_DEF(pj_status_t) pjmedia_snd_port_connect( pjmedia_snd_port *snd_port,
pjmedia_port *port)
{
pjmedia_audio_format_detail *afd;
PJ_ASSERT_RETURN(snd_port && port, PJ_EINVAL);
afd = pjmedia_format_get_audio_format_detail(&port->info.fmt, PJ_TRUE);
/* Check that port has the same configuration as the sound device
* port.
*/
if (afd->clock_rate != snd_port->clock_rate)
return PJMEDIA_ENCCLOCKRATE;
if (PJMEDIA_AFD_SPF(afd) != snd_port->samples_per_frame)
return PJMEDIA_ENCSAMPLESPFRAME;
if (afd->channel_count != snd_port->channel_count)
return PJMEDIA_ENCCHANNEL;
if (afd->bits_per_sample != snd_port->bits_per_sample)
return PJMEDIA_ENCBITS;
/* Port is okay. */
snd_port->port = port;
return PJ_SUCCESS;
}
注意这里的参数pjmedia_port *port
来自pjmedia_stream stream的port,这个port还有一点特殊即port->port_data.pdata; 其实是指向stream,即stream中有port,port也有办法指向stream,这在put_frame中会遇到。
<::>再回来看pjmedia_port_put_frame该函数其实是port->put_frame callback的封装,会直接调用port->put_frame,这个回调函数的初始化在pjmedia_stream_create
完成将port->put_frame 初始化为 stream->get_frame = &get_frame;,所以接下来我们看put_frame
/**
* Get a frame from the port (and subsequent downstream ports).
*/
PJ_DEF(pj_status_t) pjmedia_port_get_frame( pjmedia_port *port,
pjmedia_frame *frame )
{
PJ_ASSERT_RETURN(port && frame, PJ_EINVAL);
if (port->get_frame)
return port->get_frame(port, frame);
else {
frame->type = PJMEDIA_FRAME_TYPE_NONE;
return PJ_EINVALIDOP;
}
}
3、get_frame (port端)
pjmedia/src/pjmedia/stream.c
-
首先,函数从
port
中获取与当前流相关联的stream
和channel
。pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; pjmedia_channel *channel = stream->dec;
-
然后,函数检查通道是否处于暂停状态,如果是,则设置帧类型为
PJMEDIA_FRAME_TYPE_NONE
,表示没有帧可用,并返回PJ_SUCCESS
。/* Return no frame is channel is paused */ if (channel->paused) { frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; }
-
接着,函数检查是否处于软启动计数状态。如果是,它首先检查软启动计数是否为
PJMEDIA_STREAM_SOFT_START
,如果是,则重置抖动缓冲区。然后递减软启动计数,并返回PJ_SUCCESS
。if (stream->soft_start_cnt) { if (stream->soft_start_cnt == PJMEDIA_STREAM_SOFT_START) { PJ_LOG(4,(stream->port.info.name.ptr, "Resetting jitter buffer in stream playback start")); pj_mutex_lock( stream->jb_mutex ); pjmedia_jbuf_reset(stream->jb); pj_mutex_unlock( stream->jb_mutex ); } --stream->soft_start_cnt; frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; }
-
接下来,函数从抖动缓冲区中获取帧并解码,直到获得足够的帧以满足编解码器的 ptime 要求。
-
函数首先锁定抖动缓冲区的互斥锁。
pj_mutex_lock( stream->jb_mutex );
-
接着,函数计算所需的样本数,并根据解码器的 ptime 要求计算每帧的样本数。
samples_required = PJMEDIA_PIA_SPF(&stream->port.info); samples_per_frame = stream->dec_ptime * stream->codec_param.info.clock_rate * stream->codec_param.info.channel_cnt / stream->dec_ptime_denum / 1000; p_out_samp = (pj_int16_t*) frame->buf;
-
然后,函数循环获取帧,直到获得足够的样本数。在每次循环中,它会尝试从抖动缓冲区获取帧pjmedia_jbuf_get_frame2放在channel->out_pkt,并根据获取的帧类型进行不同的处理。
for (samples_count=0; samples_count < samples_required;) { char frame_type; pj_size_t frame_size = channel->out_pkt_size; pj_uint32_t bit_info; if (stream->dec_buf && stream->dec_buf_pos < stream->dec_buf_count) { unsigned nsamples_req = samples_required - samples_count; unsigned nsamples_avail = stream->dec_buf_count - stream->dec_buf_pos; unsigned nsamples_copy = PJ_MIN(nsamples_req, nsamples_avail); pjmedia_copy_samples(p_out_samp + samples_count, stream->dec_buf + stream->dec_buf_pos, nsamples_copy); samples_count += nsamples_copy; stream->dec_buf_pos += nsamples_copy; continue; } /* Get frame from jitter buffer. */ pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size, &frame_type, &bit_info);
-
如果帧类型为
PJMEDIA_JB_MISSING_FRAME
,表示丢失帧,则尝试激活 PLC 进行丢帧处理。如果 PLC 激活成功,则填充丢失的样本,并增加 PLC 计数。 -
如果帧类型为
PJMEDIA_JB_ZERO_EMPTY_FRAME
,表示抖动缓冲区为空。函数会尝试激活 PLC 进行丢帧处理,然后填充零样本,以平滑淡出。 -
如果帧类型为
PJMEDIA_JB_ZERO_PREFETCH_FRAME
,表示抖动缓冲区正在预取数据。函数会尝试激活 PLC 进行丢帧处理,然后填充零样本。 -
如果帧类型为
PJMEDIA_JB_NORMAL_FRAME
,表示获得了正常的帧。函数会解码帧pjmedia_codec_decode 得到frame_out并将其放入播放缓冲区即传入的参数frame中。/* Got "NORMAL" frame from jitter buffer */ pjmedia_frame frame_in, frame_out; pj_bool_t use_dec_buf = PJ_FALSE; stream->plc_cnt = 0; /* Decode */ frame_in.buf = channel->out_pkt; frame_in.size = frame_size; frame_in.bit_info = bit_info; frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; /* ignored */ frame_out.buf = p_out_samp + samples_count; frame_out.size = frame->size - samples_count*BYTES_PER_SAMPLE; if (stream->dec_buf && bit_info * sizeof(pj_int16_t) > frame_out.size) { stream->dec_buf_pos = 0; stream->dec_buf_count = bit_info; use_dec_buf = PJ_TRUE; frame_out.buf = stream->dec_buf; frame_out.size = stream->dec_buf_size; } status = pjmedia_codec_decode( stream->codec, &frame_in, (unsigned)frame_out.size, &frame_out); if (status != 0) { LOGERR_((port->info.name.ptr, status, "codec decode() error")); if (use_dec_buf) { pjmedia_zero_samples(stream->dec_buf, stream->dec_buf_count); } else { pjmedia_zero_samples(p_out_samp + samples_count, samples_per_frame); } } else if (use_dec_buf) { stream->dec_buf_count = (unsigned)frame_out.size / sizeof(pj_int16_t); } if (stream->jb_last_frm != frame_type) { /* Report changing frame type event */ PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer starts returning normal frames " "(after %d empty/lost)", stream->jb_last_frm_cnt)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } if (!use_dec_buf) samples_count += samples_per_frame;
-
-
-
最后,函数解锁抖动缓冲区的互斥锁,并根据获取的样本数设置帧类型和大小,并返回
PJ_SUCCESS
。
(1)pjmedia_jbuf_get_frame3
/*
* Get frame from jitter buffer.
*/
PJ_DEF(void) pjmedia_jbuf_get_frame3(pjmedia_jbuf *jb,
void *frame,
pj_size_t *size,
char *p_frame_type,
pj_uint32_t *bit_info,
pj_uint32_t *ts,
int *seq)
{
if (jb->jb_prefetching) {
/* Can't return frame because jitter buffer is filling up
* minimum prefetch.
*/
//pj_bzero(frame, jb->jb_frame_size);
*p_frame_type = PJMEDIA_JB_ZERO_PREFETCH_FRAME;
if (size)
*size = 0;
TRACE__((jb->jb_name.ptr, "GET prefetch_cnt=%d/%d",
jb_framelist_eff_size(&jb->jb_framelist), jb->jb_prefetch));
jb->jb_empty++;
} else {
pjmedia_jb_frame_type ftype = PJMEDIA_JB_NORMAL_FRAME;
pj_bool_t res;
/* Try to retrieve a frame from frame list */
res = jb_framelist_get(&jb->jb_framelist, frame, size, &ftype,
bit_info, ts, seq);
if (res) {
/* We've successfully retrieved a frame from the frame list, but
* the frame could be a blank frame!
*/
if (ftype == PJMEDIA_JB_NORMAL_FRAME) {
*p_frame_type = PJMEDIA_JB_NORMAL_FRAME;
} else {
*p_frame_type = PJMEDIA_JB_MISSING_FRAME;
jb->jb_lost++;
}
/* Store delay history at the first GET */
if (jb->jb_last_op == JB_OP_PUT) {
unsigned cur_size;
/* We've just retrieved one frame, so add one to cur_size */
cur_size = jb_framelist_eff_size(&jb->jb_framelist) + 1;
pj_math_stat_update(&jb->jb_delay,
cur_size * jb->jb_frame_ptime /
jb->jb_frame_ptime_denum);
}
} else {
/* Jitter buffer is empty */
if (jb->jb_prefetch)
jb->jb_prefetching = PJ_TRUE;
//pj_bzero(frame, jb->jb_frame_size);
*p_frame_type = PJMEDIA_JB_ZERO_EMPTY_FRAME;
if (size)
*size = 0;
jb->jb_empty++;
}
}
jb->jb_level++;
jbuf_update(jb, JB_OP_GET);
}
- 首先,函数检查抖动缓冲区是否正在预取数据。如果是,则表示抖动缓冲区正在填充,此时无法返回帧,因此将帧类型设置为
PJMEDIA_JB_ZERO_PREFETCH_FRAME
,并将帧大小设置为 0。 - 如果抖动缓冲区不在预取状态,则尝试从帧列表中获取帧。如果成功获取到帧,则将帧类型设置为
PJMEDIA_JB_NORMAL_FRAME
,表示正常帧。如果获取到的是空白帧,则将帧类型设置为PJMEDIA_JB_MISSING_FRAME
,并增加抖动缓冲区丢失帧的计数。 - 如果无法从帧列表中获取帧,表示抖动缓冲区为空。如果抖动缓冲区允许预取,则将预取状态设置为
PJ_TRUE
。然后,将帧类型设置为PJMEDIA_JB_ZERO_EMPTY_FRAME
,并将帧大小设置为 0。 - 最后,函数更新抖动缓冲区的级别,并调用
jbuf_update
函数更新抖动缓冲区的操作。
4、ioqueue_epoll参与
这里不一定是上述追踪的rtp包
当pj_ioqueue_poll工作线程 调用os_epoll_wait 发现监测的读触发,调用ioqueue_dispatch_read_event写操作,在ioqueue_dispatch_read_event中先看key read_list上有没有pending_read,有的话,从read_list取出,根据read_list的read_op确定读入大小,pj_sock_recvfrom接受数据的函数,将数据读入到read_op中,最后调用on_read_complete,回调函数已在pj_ioqueue_register_sock2时设置过,传入read_op为 on_rx_rtp
(*h->cb.on_read_complete)(h,
(pj_ioqueue_op_key_t*)read_op,
bytes_read);
(1)on_rx_rtp 读取操作
on_rx_rtp是被下面调用
(*h->cb.on_read_complete)(h,
(pj_ioqueue_op_key_t*)read_op,
bytes_read);
这里有一个很有意思的问题,就是read_op 在on_rx_rtp中竟然没有使用,下面来分析一下原因
udp = (struct transport_udp*) pj_ioqueue_get_user_data(key);
我们的read_op是从readlist中取出的,readlist对于读操作添加是依靠pj_ioqueue_recvfrom函数,在data is not immediately available时将read_op加入readlist
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;
pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
这里read_op->buf = buffer;的buffer,来自udp->rtp_pkt,相当于直接写入了rtp_pkt,所以不用read_op了。
on_rx_rtp是一个while循环,条件如下status来自pj_ioqueue_recvfrom的结果
status != PJ_EPENDING && status != PJ_ECANCELLED &&
udp->started
a. call_rtp_cb
在while循环里,先执行call_rtp_cb,设置pjmedia_tp_cb_param param;,调用(*cb2)(¶m);cb2由transport_attach2-》tp_attach设置为stream.c ::on_rx_rtp。注意param.pkt = udp->rtp_pkt;,这里rtp_pkt其实就是ioqueue_dispatch_read_event中read_op->buf中读到的数据rtp包
/* Call RTP cb. */
static void call_rtp_cb(struct transport_udp *udp, pj_ssize_t bytes_read,
pj_bool_t *rem_switch)
{
void (*cb)(void*,void*,pj_ssize_t);
void (*cb2)(pjmedia_tp_cb_param*);
void *user_data;
cb = udp->rtp_cb;
cb2 = udp->rtp_cb2;
user_data = udp->user_data;
if (cb2) {
pjmedia_tp_cb_param param;
param.user_data = user_data;
param.pkt = udp->rtp_pkt;
param.size = bytes_read;
param.src_addr = &udp->rtp_src_addr;
param.rem_switch = PJ_FALSE;
(*cb2)(¶m);
if (rem_switch)
*rem_switch = param.rem_switch;
} else if (cb) {
(*cb)(user_data, udp->rtp_pkt, bytes_read);
}
}
param.user_data = user_data; 注意这个user_data,是pjmedia_stream *stream
b. pj_ioqueue_recvfrom
/*
* pj_ioqueue_recvfrom()
*
* Start asynchronous recvfrom() from the socket.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
unsigned flags,
pj_sockaddr_t *addr,
int *addrlen)
{
struct read_operation *read_op;
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
/* Check if key is closing. */
if (IS_CLOSING(key))
return PJ_ECANCELLED;
read_op = (struct read_operation*)op_key;
PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
read_op->op = PJ_IOQUEUE_OP_NONE;
/* Try to see if there's data immediately available.
*/
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
pj_status_t status;
pj_ssize_t size;
size = *length;
status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
addr, addrlen);
if (status == PJ_SUCCESS) {
/* Yes! Data is available! */
*length = size;
return PJ_SUCCESS;
} else {
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
return status;
}
}
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
/*
* No data is immediately available.
* Must schedule asynchronous operation to the ioqueue.
*/
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;
pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
接下来是调用pj_ioqueue_recvfrom,至于为什么明明ioqueue_dispatch_read_event已经读取了数据,此时还在读取数据,是因为可能有新的rtp包到达,pj_ioqueue_recvfrom查看有没有到达的包,如果有就调用pj_sock_recvfrom继续读读到udp->rtp_pkt,如果没有加到readlist中,返回PJ_EPENDING,结束on_rx_rtp中的while循环。
(2)on_rx_rtp::cb2
Stream.c中的回调 tp_attach中设置该回调
该函数处理接收到的rtp包, 解析成payload和head
Put "good" packet to jitter buffer,需要先把payload解析成frame,再把frame放入jitter buffer
附:相互指向关系
stream.user_data-》snd_port,
snd_port.port-》port
port->port_data.pdata-》stream
stream->transport-》pjmedia_transport