接收数据全流程
1、接受的线程(在声音设备端直接向声卡写入数据)
先看发送的线程 pb_thread_func,这里我们以alsa_dev为例
static int pb_thread_func (void *arg)
{
    struct alsa_stream* stream = (struct alsa_stream*) arg;
    snd_pcm_t* pcm             = stream->pb_pcm;
    int size                   = stream->pb_buf_size;
    snd_pcm_uframes_t nframes  = stream->pb_frames;
    void* user_data            = stream->user_data;
    char* buf                  = stream->pb_buf;
    pj_timestamp tstamp;
    int result;
    pj_bzero (buf, size);
    tstamp.u64 = 0;
    TRACE_((THIS_FILE, "pb_thread_func(%u): Started",
            (unsigned)syscall(SYS_gettid)));
    snd_pcm_prepare (pcm);
    while (!stream->quit) {
        pjmedia_frame frame;
        frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
        frame.buf = buf;
        frame.size = size;
        frame.timestamp.u64 = tstamp.u64;
        frame.bit_info = 0;
        result = stream->pb_cb (user_data, &frame);
        if (result != PJ_SUCCESS || stream->quit)
            break;
        if (frame.type != PJMEDIA_FRAME_TYPE_AUDIO)
            pj_bzero (buf, size);
        result = snd_pcm_writei (pcm, buf, nframes);
        if (result == -EPIPE) {
            PJ_LOG (4,(THIS_FILE, "pb_thread_func: underrun!"));
            snd_pcm_prepare (pcm);
        } else if (result < 0) {
            PJ_LOG (4,(THIS_FILE, "pb_thread_func: error writing data!"));
        }
        tstamp.u64 += nframes;
    }
    snd_pcm_drop(pcm);
    TRACE_((THIS_FILE, "pb_thread_func: Stopped"));
    return PJ_SUCCESS;
}
该线程的主体部分在不停的while循环,我们来看一次循环的内容:
调用 stream->pb_cb (user_data, &frame);  其中user_data 依然是我们创建的snd_port,frame存放最终接收的一个frame即Frame to store samples.,需要先对frame的类型等信息进行设置。
调用 snd_pcm_writei (pcm, buf, nframes); 将数据写入声卡
2、play_cb (音频流端)
pjmedia/src/pjmedia/sound_port.c
/*
 * The callback called by sound player when it needs more samples to be
 * played.
 */
static pj_status_t play_cb(void *user_data, pjmedia_frame *frame)
{
    pjmedia_snd_port *snd_port = (pjmedia_snd_port*) user_data;
    pjmedia_port *port;
    const unsigned required_size = (unsigned)frame->size;
    pj_status_t status;
    pjmedia_clock_src_update(&snd_port->play_clocksrc, &frame->timestamp);
    port = snd_port->port;
    if (port == NULL)
        goto no_frame;
    status = pjmedia_port_get_frame(port, frame);
    if (status != PJ_SUCCESS)
        goto no_frame;
    if (frame->type != PJMEDIA_FRAME_TYPE_AUDIO)
        goto no_frame;
    /* Must supply the required samples */
    pj_assert(frame->size == required_size);
    if (snd_port->ec_state) {
        if (snd_port->ec_suspended) {
            snd_port->ec_suspended = PJ_FALSE;
            //pjmedia_echo_state_reset(snd_port->ec_state);
            PJ_LOG(4,(THIS_FILE, "EC activated"));
        }
        snd_port->ec_suspend_count = 0;
        pjmedia_echo_playback(snd_port->ec_state, (pj_int16_t*)frame->buf);
    }
    /* Invoke preview callback */
    if (snd_port->on_play_frame)
        (*snd_port->on_play_frame)(snd_port->user_data, frame);
    return PJ_SUCCESS;
no_frame:
    frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
    frame->size = required_size;
    pj_bzero(frame->buf, frame->size);
    if (snd_port->ec_state && !snd_port->ec_suspended) {
        ++snd_port->ec_suspend_count;
        if (snd_port->ec_suspend_count > snd_port->ec_suspend_limit) {
            snd_port->ec_suspended = PJ_TRUE;
            PJ_LOG(4,(THIS_FILE, "EC suspended because of inactivity"));
        }
        if (snd_port->ec_state) {
            /* To maintain correct delay in EC */
            pjmedia_echo_playback(snd_port->ec_state, (pj_int16_t*)frame->buf);
        }
    }
    /* Invoke preview callback */
    if (snd_port->on_play_frame)
        (*snd_port->on_play_frame)(snd_port->user_data, frame);
    return PJ_SUCCESS;
}
首先看snd_port->on_play_frame这个是可选项,这里没有使用
其次就是最重要的调用pjmedia_port_get_frame,看一下这里的port参数,port = snd_port->port;  是pjmedia_snd_port类型snd_port中的pjmedia_port类型属性,snd_port->port是在pjmedia_snd_port_connect中初始化的先看一下pjmedia_snd_port_connect
/*
 * Connect a port.
 */
PJ_DEF(pj_status_t) pjmedia_snd_port_connect( pjmedia_snd_port *snd_port,
                                              pjmedia_port *port)
{
    pjmedia_audio_format_detail *afd;
    PJ_ASSERT_RETURN(snd_port && port, PJ_EINVAL);
    afd = pjmedia_format_get_audio_format_detail(&port->info.fmt, PJ_TRUE);
    /* Check that port has the same configuration as the sound device
     * port.
     */
    if (afd->clock_rate != snd_port->clock_rate)
        return PJMEDIA_ENCCLOCKRATE;
    if (PJMEDIA_AFD_SPF(afd) != snd_port->samples_per_frame)
        return PJMEDIA_ENCSAMPLESPFRAME;
    if (afd->channel_count != snd_port->channel_count)
        return PJMEDIA_ENCCHANNEL;
    if (afd->bits_per_sample != snd_port->bits_per_sample)
        return PJMEDIA_ENCBITS;
    /* Port is okay. */
    snd_port->port = port;
    return PJ_SUCCESS;
}
注意这里的参数pjmedia_port *port来自pjmedia_stream stream的port,这个port还有一点特殊即port->port_data.pdata; 其实是指向stream,即stream中有port,port也有办法指向stream,这在put_frame中会遇到。
<::>再回来看pjmedia_port_put_frame该函数其实是port->put_frame callback的封装,会直接调用port->put_frame,这个回调函数的初始化在pjmedia_stream_create 完成将port->put_frame 初始化为 stream->get_frame = &get_frame;,所以接下来我们看put_frame
/**
 * Get a frame from the port (and subsequent downstream ports).
 */
PJ_DEF(pj_status_t) pjmedia_port_get_frame( pjmedia_port *port,
                                            pjmedia_frame *frame )
{
    PJ_ASSERT_RETURN(port && frame, PJ_EINVAL);
    if (port->get_frame)
        return port->get_frame(port, frame);
    else {
        frame->type = PJMEDIA_FRAME_TYPE_NONE;
        return PJ_EINVALIDOP;
    }
}
3、get_frame (port端)
pjmedia/src/pjmedia/stream.c
- 
首先,函数从 port中获取与当前流相关联的stream和channel。pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; pjmedia_channel *channel = stream->dec;
- 
然后,函数检查通道是否处于暂停状态,如果是,则设置帧类型为 PJMEDIA_FRAME_TYPE_NONE,表示没有帧可用,并返回PJ_SUCCESS。/* Return no frame is channel is paused */ if (channel->paused) { frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; }
- 
接着,函数检查是否处于软启动计数状态。如果是,它首先检查软启动计数是否为 PJMEDIA_STREAM_SOFT_START,如果是,则重置抖动缓冲区。然后递减软启动计数,并返回PJ_SUCCESS。if (stream->soft_start_cnt) { if (stream->soft_start_cnt == PJMEDIA_STREAM_SOFT_START) { PJ_LOG(4,(stream->port.info.name.ptr, "Resetting jitter buffer in stream playback start")); pj_mutex_lock( stream->jb_mutex ); pjmedia_jbuf_reset(stream->jb); pj_mutex_unlock( stream->jb_mutex ); } --stream->soft_start_cnt; frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; }
- 
接下来,函数从抖动缓冲区中获取帧并解码,直到获得足够的帧以满足编解码器的 ptime 要求。 - 
函数首先锁定抖动缓冲区的互斥锁。 pj_mutex_lock( stream->jb_mutex );
- 
接着,函数计算所需的样本数,并根据解码器的 ptime 要求计算每帧的样本数。 samples_required = PJMEDIA_PIA_SPF(&stream->port.info); samples_per_frame = stream->dec_ptime * stream->codec_param.info.clock_rate * stream->codec_param.info.channel_cnt / stream->dec_ptime_denum / 1000; p_out_samp = (pj_int16_t*) frame->buf;
- 
然后,函数循环获取帧,直到获得足够的样本数。在每次循环中,它会尝试从抖动缓冲区获取帧pjmedia_jbuf_get_frame2放在channel->out_pkt,并根据获取的帧类型进行不同的处理。 for (samples_count=0; samples_count < samples_required;) { char frame_type; pj_size_t frame_size = channel->out_pkt_size; pj_uint32_t bit_info; if (stream->dec_buf && stream->dec_buf_pos < stream->dec_buf_count) { unsigned nsamples_req = samples_required - samples_count; unsigned nsamples_avail = stream->dec_buf_count - stream->dec_buf_pos; unsigned nsamples_copy = PJ_MIN(nsamples_req, nsamples_avail); pjmedia_copy_samples(p_out_samp + samples_count, stream->dec_buf + stream->dec_buf_pos, nsamples_copy); samples_count += nsamples_copy; stream->dec_buf_pos += nsamples_copy; continue; } /* Get frame from jitter buffer. */ pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size, &frame_type, &bit_info);- 
如果帧类型为 PJMEDIA_JB_MISSING_FRAME,表示丢失帧,则尝试激活 PLC 进行丢帧处理。如果 PLC 激活成功,则填充丢失的样本,并增加 PLC 计数。
- 
如果帧类型为 PJMEDIA_JB_ZERO_EMPTY_FRAME,表示抖动缓冲区为空。函数会尝试激活 PLC 进行丢帧处理,然后填充零样本,以平滑淡出。
- 
如果帧类型为 PJMEDIA_JB_ZERO_PREFETCH_FRAME,表示抖动缓冲区正在预取数据。函数会尝试激活 PLC 进行丢帧处理,然后填充零样本。
- 
如果帧类型为 PJMEDIA_JB_NORMAL_FRAME,表示获得了正常的帧。函数会解码帧pjmedia_codec_decode 得到frame_out并将其放入播放缓冲区即传入的参数frame中。/* Got "NORMAL" frame from jitter buffer */ pjmedia_frame frame_in, frame_out; pj_bool_t use_dec_buf = PJ_FALSE; stream->plc_cnt = 0; /* Decode */ frame_in.buf = channel->out_pkt; frame_in.size = frame_size; frame_in.bit_info = bit_info; frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; /* ignored */ frame_out.buf = p_out_samp + samples_count; frame_out.size = frame->size - samples_count*BYTES_PER_SAMPLE; if (stream->dec_buf && bit_info * sizeof(pj_int16_t) > frame_out.size) { stream->dec_buf_pos = 0; stream->dec_buf_count = bit_info; use_dec_buf = PJ_TRUE; frame_out.buf = stream->dec_buf; frame_out.size = stream->dec_buf_size; } status = pjmedia_codec_decode( stream->codec, &frame_in, (unsigned)frame_out.size, &frame_out); if (status != 0) { LOGERR_((port->info.name.ptr, status, "codec decode() error")); if (use_dec_buf) { pjmedia_zero_samples(stream->dec_buf, stream->dec_buf_count); } else { pjmedia_zero_samples(p_out_samp + samples_count, samples_per_frame); } } else if (use_dec_buf) { stream->dec_buf_count = (unsigned)frame_out.size / sizeof(pj_int16_t); } if (stream->jb_last_frm != frame_type) { /* Report changing frame type event */ PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer starts returning normal frames " "(after %d empty/lost)", stream->jb_last_frm_cnt)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } if (!use_dec_buf) samples_count += samples_per_frame;
 
- 
 
- 
- 
最后,函数解锁抖动缓冲区的互斥锁,并根据获取的样本数设置帧类型和大小,并返回 PJ_SUCCESS。
(1)pjmedia_jbuf_get_frame3
/*
 * Get frame from jitter buffer.
 */
PJ_DEF(void) pjmedia_jbuf_get_frame3(pjmedia_jbuf *jb,
                                     void *frame,
                                     pj_size_t *size,
                                     char *p_frame_type,
                                     pj_uint32_t *bit_info,
                                     pj_uint32_t *ts,
                                     int *seq)
{
    if (jb->jb_prefetching) {
        /* Can't return frame because jitter buffer is filling up
         * minimum prefetch.
         */
        //pj_bzero(frame, jb->jb_frame_size);
        *p_frame_type = PJMEDIA_JB_ZERO_PREFETCH_FRAME;
        if (size)
            *size = 0;
        TRACE__((jb->jb_name.ptr, "GET prefetch_cnt=%d/%d",
                 jb_framelist_eff_size(&jb->jb_framelist), jb->jb_prefetch));
        jb->jb_empty++;
    } else {
        pjmedia_jb_frame_type ftype = PJMEDIA_JB_NORMAL_FRAME;
        pj_bool_t res;
        /* Try to retrieve a frame from frame list */
        res = jb_framelist_get(&jb->jb_framelist, frame, size, &ftype,
                               bit_info, ts, seq);
        if (res) {
            /* We've successfully retrieved a frame from the frame list, but
             * the frame could be a blank frame!
             */
            if (ftype == PJMEDIA_JB_NORMAL_FRAME) {
                *p_frame_type = PJMEDIA_JB_NORMAL_FRAME;
            } else {
                *p_frame_type = PJMEDIA_JB_MISSING_FRAME;
                jb->jb_lost++;
            }
            /* Store delay history at the first GET */
            if (jb->jb_last_op == JB_OP_PUT) {
                unsigned cur_size;
                /* We've just retrieved one frame, so add one to cur_size */
                cur_size = jb_framelist_eff_size(&jb->jb_framelist) + 1;
                pj_math_stat_update(&jb->jb_delay,
                                    cur_size * jb->jb_frame_ptime /
                                    jb->jb_frame_ptime_denum);
            }
        } else {
            /* Jitter buffer is empty */
            if (jb->jb_prefetch)
                jb->jb_prefetching = PJ_TRUE;
            //pj_bzero(frame, jb->jb_frame_size);
            *p_frame_type = PJMEDIA_JB_ZERO_EMPTY_FRAME;
            if (size)
                *size = 0;
            jb->jb_empty++;
        }
    }
    jb->jb_level++;
    jbuf_update(jb, JB_OP_GET);
}
- 首先,函数检查抖动缓冲区是否正在预取数据。如果是,则表示抖动缓冲区正在填充,此时无法返回帧,因此将帧类型设置为 PJMEDIA_JB_ZERO_PREFETCH_FRAME,并将帧大小设置为 0。
- 如果抖动缓冲区不在预取状态,则尝试从帧列表中获取帧。如果成功获取到帧,则将帧类型设置为 PJMEDIA_JB_NORMAL_FRAME,表示正常帧。如果获取到的是空白帧,则将帧类型设置为PJMEDIA_JB_MISSING_FRAME,并增加抖动缓冲区丢失帧的计数。
- 如果无法从帧列表中获取帧,表示抖动缓冲区为空。如果抖动缓冲区允许预取,则将预取状态设置为 PJ_TRUE。然后,将帧类型设置为PJMEDIA_JB_ZERO_EMPTY_FRAME,并将帧大小设置为 0。
- 最后,函数更新抖动缓冲区的级别,并调用 jbuf_update函数更新抖动缓冲区的操作。
4、ioqueue_epoll参与
这里不一定是上述追踪的rtp包
当pj_ioqueue_poll工作线程 调用os_epoll_wait 发现监测的读触发,调用ioqueue_dispatch_read_event写操作,在ioqueue_dispatch_read_event中先看key read_list上有没有pending_read,有的话,从read_list取出,根据read_list的read_op确定读入大小,pj_sock_recvfrom接受数据的函数,将数据读入到read_op中,最后调用on_read_complete,回调函数已在pj_ioqueue_register_sock2时设置过,传入read_op为 on_rx_rtp
(*h->cb.on_read_complete)(h, 
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
(1)on_rx_rtp 读取操作
on_rx_rtp是被下面调用
(*h->cb.on_read_complete)(h, 
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
这里有一个很有意思的问题,就是read_op 在on_rx_rtp中竟然没有使用,下面来分析一下原因
udp = (struct transport_udp*) pj_ioqueue_get_user_data(key);
我们的read_op是从readlist中取出的,readlist对于读操作添加是依靠pj_ioqueue_recvfrom函数,在data is not immediately available时将read_op加入readlist
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;
pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
 * in multithreaded app. If we add bad handle to the set it will
 * corrupt the ioqueue set. See #913
 */
if (IS_CLOSING(key)) {
    pj_ioqueue_unlock_key(key);
    return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
这里read_op->buf = buffer;的buffer,来自udp->rtp_pkt,相当于直接写入了rtp_pkt,所以不用read_op了。
on_rx_rtp是一个while循环,条件如下status来自pj_ioqueue_recvfrom的结果
status != PJ_EPENDING && status != PJ_ECANCELLED &&
             udp->started
a. call_rtp_cb
在while循环里,先执行call_rtp_cb,设置pjmedia_tp_cb_param param;,调用(*cb2)(¶m);cb2由transport_attach2-》tp_attach设置为stream.c ::on_rx_rtp。注意param.pkt = udp->rtp_pkt;,这里rtp_pkt其实就是ioqueue_dispatch_read_event中read_op->buf中读到的数据rtp包
/* Call RTP cb. */
static void call_rtp_cb(struct transport_udp *udp, pj_ssize_t bytes_read, 
                        pj_bool_t *rem_switch)
{
    void (*cb)(void*,void*,pj_ssize_t);
    void (*cb2)(pjmedia_tp_cb_param*);
    void *user_data;
    cb = udp->rtp_cb;
    cb2 = udp->rtp_cb2;
    user_data = udp->user_data;
    if (cb2) {
        pjmedia_tp_cb_param param;
        param.user_data = user_data;
        param.pkt = udp->rtp_pkt;
        param.size = bytes_read;
        param.src_addr = &udp->rtp_src_addr;
        param.rem_switch = PJ_FALSE;
        (*cb2)(¶m);
        if (rem_switch)
            *rem_switch = param.rem_switch;
    } else if (cb) {
        (*cb)(user_data, udp->rtp_pkt, bytes_read);
    }
}
param.user_data = user_data; 注意这个user_data,是pjmedia_stream *stream
b. pj_ioqueue_recvfrom
/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();
    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;
    read_op = (struct read_operation*)op_key;
    PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
    read_op->op = PJ_IOQUEUE_OP_NONE;
    /* Try to see if there's data immediately available. 
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;
        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;
    pj_ioqueue_lock_key(key);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_ioqueue_unlock_key(key);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_ioqueue_unlock_key(key);
    return PJ_EPENDING;
}
接下来是调用pj_ioqueue_recvfrom,至于为什么明明ioqueue_dispatch_read_event已经读取了数据,此时还在读取数据,是因为可能有新的rtp包到达,pj_ioqueue_recvfrom查看有没有到达的包,如果有就调用pj_sock_recvfrom继续读读到udp->rtp_pkt,如果没有加到readlist中,返回PJ_EPENDING,结束on_rx_rtp中的while循环。
(2)on_rx_rtp::cb2
Stream.c中的回调 tp_attach中设置该回调
该函数处理接收到的rtp包, 解析成payload和head
Put "good" packet to jitter buffer,需要先把payload解析成frame,再把frame放入jitter buffer
附:相互指向关系
stream.user_data-》snd_port,
snd_port.port-》port
port->port_data.pdata-》stream
stream->transport-》pjmedia_transport