接收数据全流程

1、接受的线程(在声音设备端直接向声卡写入数据)

先看发送的线程 pb_thread_func,这里我们以alsa_dev为例

static int pb_thread_func (void *arg)
{
    struct alsa_stream* stream = (struct alsa_stream*) arg;
    snd_pcm_t* pcm             = stream->pb_pcm;
    int size                   = stream->pb_buf_size;
    snd_pcm_uframes_t nframes  = stream->pb_frames;
    void* user_data            = stream->user_data;
    char* buf                  = stream->pb_buf;
    pj_timestamp tstamp;
    int result;

    pj_bzero (buf, size);
    tstamp.u64 = 0;

    TRACE_((THIS_FILE, "pb_thread_func(%u): Started",
            (unsigned)syscall(SYS_gettid)));

    snd_pcm_prepare (pcm);

    while (!stream->quit) {
        pjmedia_frame frame;

        frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
        frame.buf = buf;
        frame.size = size;
        frame.timestamp.u64 = tstamp.u64;
        frame.bit_info = 0;

        result = stream->pb_cb (user_data, &frame);
        if (result != PJ_SUCCESS || stream->quit)
            break;

        if (frame.type != PJMEDIA_FRAME_TYPE_AUDIO)
            pj_bzero (buf, size);

        result = snd_pcm_writei (pcm, buf, nframes);
        if (result == -EPIPE) {
            PJ_LOG (4,(THIS_FILE, "pb_thread_func: underrun!"));
            snd_pcm_prepare (pcm);
        } else if (result < 0) {
            PJ_LOG (4,(THIS_FILE, "pb_thread_func: error writing data!"));
        }

        tstamp.u64 += nframes;
    }

    snd_pcm_drop(pcm);
    TRACE_((THIS_FILE, "pb_thread_func: Stopped"));
    return PJ_SUCCESS;
}

该线程的主体部分在不停的while循环,我们来看一次循环的内容:

调用 stream->pb_cb (user_data, &frame); 其中user_data 依然是我们创建的snd_port,frame存放最终接收的一个frame即Frame to store samples.,需要先对frame的类型等信息进行设置。

调用 snd_pcm_writei (pcm, buf, nframes); 将数据写入声卡

2、play_cb (音频流端)

pjmedia/src/pjmedia/sound_port.c

/*
 * The callback called by sound player when it needs more samples to be
 * played.
 */
static pj_status_t play_cb(void *user_data, pjmedia_frame *frame)
{
    pjmedia_snd_port *snd_port = (pjmedia_snd_port*) user_data;
    pjmedia_port *port;
    const unsigned required_size = (unsigned)frame->size;
    pj_status_t status;

    pjmedia_clock_src_update(&snd_port->play_clocksrc, &frame->timestamp);

    port = snd_port->port;
    if (port == NULL)
        goto no_frame;

    status = pjmedia_port_get_frame(port, frame);
    if (status != PJ_SUCCESS)
        goto no_frame;

    if (frame->type != PJMEDIA_FRAME_TYPE_AUDIO)
        goto no_frame;

    /* Must supply the required samples */
    pj_assert(frame->size == required_size);

    if (snd_port->ec_state) {
        if (snd_port->ec_suspended) {
            snd_port->ec_suspended = PJ_FALSE;
            //pjmedia_echo_state_reset(snd_port->ec_state);
            PJ_LOG(4,(THIS_FILE, "EC activated"));
        }
        snd_port->ec_suspend_count = 0;
        pjmedia_echo_playback(snd_port->ec_state, (pj_int16_t*)frame->buf);
    }

    /* Invoke preview callback */
    if (snd_port->on_play_frame)
        (*snd_port->on_play_frame)(snd_port->user_data, frame);

    return PJ_SUCCESS;

no_frame:
    frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
    frame->size = required_size;
    pj_bzero(frame->buf, frame->size);

    if (snd_port->ec_state && !snd_port->ec_suspended) {
        ++snd_port->ec_suspend_count;
        if (snd_port->ec_suspend_count > snd_port->ec_suspend_limit) {
            snd_port->ec_suspended = PJ_TRUE;
            PJ_LOG(4,(THIS_FILE, "EC suspended because of inactivity"));
        }
        if (snd_port->ec_state) {
            /* To maintain correct delay in EC */
            pjmedia_echo_playback(snd_port->ec_state, (pj_int16_t*)frame->buf);
        }
    }

    /* Invoke preview callback */
    if (snd_port->on_play_frame)
        (*snd_port->on_play_frame)(snd_port->user_data, frame);

    return PJ_SUCCESS;
}

首先看snd_port->on_play_frame这个是可选项,这里没有使用

其次就是最重要的调用pjmedia_port_get_frame,看一下这里的port参数,port = snd_port->port; 是pjmedia_snd_port类型snd_port中的pjmedia_port类型属性,snd_port->port是在pjmedia_snd_port_connect中初始化的先看一下pjmedia_snd_port_connect

/*
 * Connect a port.
 */
PJ_DEF(pj_status_t) pjmedia_snd_port_connect( pjmedia_snd_port *snd_port,
                                              pjmedia_port *port)
{
    pjmedia_audio_format_detail *afd;

    PJ_ASSERT_RETURN(snd_port && port, PJ_EINVAL);

    afd = pjmedia_format_get_audio_format_detail(&port->info.fmt, PJ_TRUE);

    /* Check that port has the same configuration as the sound device
     * port.
     */
    if (afd->clock_rate != snd_port->clock_rate)
        return PJMEDIA_ENCCLOCKRATE;

    if (PJMEDIA_AFD_SPF(afd) != snd_port->samples_per_frame)
        return PJMEDIA_ENCSAMPLESPFRAME;

    if (afd->channel_count != snd_port->channel_count)
        return PJMEDIA_ENCCHANNEL;

    if (afd->bits_per_sample != snd_port->bits_per_sample)
        return PJMEDIA_ENCBITS;

    /* Port is okay. */
    snd_port->port = port;
    return PJ_SUCCESS;
}

注意这里的参数pjmedia_port *port来自pjmedia_stream stream的port,这个port还有一点特殊即port->port_data.pdata; 其实是指向stream,即stream中有port,port也有办法指向stream,这在put_frame中会遇到。

<::>再回来看pjmedia_port_put_frame该函数其实是port->put_frame callback的封装,会直接调用port->put_frame,这个回调函数的初始化在pjmedia_stream_create 完成将port->put_frame 初始化为 stream->get_frame = &get_frame;,所以接下来我们看put_frame

/**
 * Get a frame from the port (and subsequent downstream ports).
 */
PJ_DEF(pj_status_t) pjmedia_port_get_frame( pjmedia_port *port,
                                            pjmedia_frame *frame )
{
    PJ_ASSERT_RETURN(port && frame, PJ_EINVAL);

    if (port->get_frame)
        return port->get_frame(port, frame);
    else {
        frame->type = PJMEDIA_FRAME_TYPE_NONE;
        return PJ_EINVALIDOP;
    }
}

3、get_frame (port端)

pjmedia/src/pjmedia/stream.c

  1. 首先,函数从 port 中获取与当前流相关联的 streamchannel

        pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
        pjmedia_channel *channel = stream->dec;
    
  2. 然后,函数检查通道是否处于暂停状态,如果是,则设置帧类型为 PJMEDIA_FRAME_TYPE_NONE,表示没有帧可用,并返回 PJ_SUCCESS

        /* Return no frame is channel is paused */
        if (channel->paused) {
            frame->type = PJMEDIA_FRAME_TYPE_NONE;
            return PJ_SUCCESS;
        }
    
  3. 接着,函数检查是否处于软启动计数状态。如果是,它首先检查软启动计数是否为 PJMEDIA_STREAM_SOFT_START,如果是,则重置抖动缓冲区。然后递减软启动计数,并返回 PJ_SUCCESS

        if (stream->soft_start_cnt) {
            if (stream->soft_start_cnt == PJMEDIA_STREAM_SOFT_START) {
                PJ_LOG(4,(stream->port.info.name.ptr,
                          "Resetting jitter buffer in stream playback start"));
                pj_mutex_lock( stream->jb_mutex );
                pjmedia_jbuf_reset(stream->jb);
                pj_mutex_unlock( stream->jb_mutex );
            }
            --stream->soft_start_cnt;
            frame->type = PJMEDIA_FRAME_TYPE_NONE;
            return PJ_SUCCESS;
        }
    
  4. 接下来,函数从抖动缓冲区中获取帧并解码,直到获得足够的帧以满足编解码器的 ptime 要求。

    • 函数首先锁定抖动缓冲区的互斥锁。

      pj_mutex_lock( stream->jb_mutex );
      
    • 接着,函数计算所需的样本数,并根据解码器的 ptime 要求计算每帧的样本数。

          samples_required = PJMEDIA_PIA_SPF(&stream->port.info);
          samples_per_frame = stream->dec_ptime *
                              stream->codec_param.info.clock_rate *
                              stream->codec_param.info.channel_cnt /
                              stream->dec_ptime_denum /
                              1000;
          p_out_samp = (pj_int16_t*) frame->buf;
      
    • 然后,函数循环获取帧,直到获得足够的样本数。在每次循环中,它会尝试从抖动缓冲区获取帧pjmedia_jbuf_get_frame2放在channel->out_pkt,并根据获取的帧类型进行不同的处理。

          for (samples_count=0; samples_count < samples_required;) {
              char frame_type;
              pj_size_t frame_size = channel->out_pkt_size;
              pj_uint32_t bit_info;
      
              if (stream->dec_buf && stream->dec_buf_pos < stream->dec_buf_count) {
                  unsigned nsamples_req = samples_required - samples_count;
                  unsigned nsamples_avail = stream->dec_buf_count -
                                            stream->dec_buf_pos;
                  unsigned nsamples_copy = PJ_MIN(nsamples_req, nsamples_avail);
      
                  pjmedia_copy_samples(p_out_samp + samples_count,
                                       stream->dec_buf + stream->dec_buf_pos,
                                       nsamples_copy);
                  samples_count += nsamples_copy;
                  stream->dec_buf_pos += nsamples_copy;
                  continue;
              }
      
              /* Get frame from jitter buffer. */
              pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size,
                                      &frame_type, &bit_info);
      
      • 如果帧类型为 PJMEDIA_JB_MISSING_FRAME,表示丢失帧,则尝试激活 PLC 进行丢帧处理。如果 PLC 激活成功,则填充丢失的样本,并增加 PLC 计数。

      • 如果帧类型为 PJMEDIA_JB_ZERO_EMPTY_FRAME,表示抖动缓冲区为空。函数会尝试激活 PLC 进行丢帧处理,然后填充零样本,以平滑淡出。

      • 如果帧类型为 PJMEDIA_JB_ZERO_PREFETCH_FRAME,表示抖动缓冲区正在预取数据。函数会尝试激活 PLC 进行丢帧处理,然后填充零样本。

      • 如果帧类型为 PJMEDIA_JB_NORMAL_FRAME,表示获得了正常的帧。函数会解码帧pjmedia_codec_decode 得到frame_out并将其放入播放缓冲区即传入的参数frame中。

          /* Got "NORMAL" frame from jitter buffer */
          pjmedia_frame frame_in, frame_out;
          pj_bool_t use_dec_buf = PJ_FALSE;
        
          stream->plc_cnt = 0;
        
          /* Decode */
          frame_in.buf = channel->out_pkt;
          frame_in.size = frame_size;
          frame_in.bit_info = bit_info;
          frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO;  /* ignored */
        
          frame_out.buf = p_out_samp + samples_count;
          frame_out.size = frame->size - samples_count*BYTES_PER_SAMPLE;
          if (stream->dec_buf &&
              bit_info * sizeof(pj_int16_t) > frame_out.size)
          {
              stream->dec_buf_pos = 0;
              stream->dec_buf_count = bit_info;
        
              use_dec_buf = PJ_TRUE;
              frame_out.buf = stream->dec_buf;
              frame_out.size = stream->dec_buf_size;
          }
        
          status = pjmedia_codec_decode( stream->codec, &frame_in,
                                         (unsigned)frame_out.size,
                                         &frame_out);
          if (status != 0) {
              LOGERR_((port->info.name.ptr, status,
                       "codec decode() error"));
        
              if (use_dec_buf) {
                  pjmedia_zero_samples(stream->dec_buf,
                                       stream->dec_buf_count);
              } else {
                  pjmedia_zero_samples(p_out_samp + samples_count,
                                       samples_per_frame);
              }
          } else if (use_dec_buf) {
              stream->dec_buf_count = (unsigned)frame_out.size /
                                      sizeof(pj_int16_t);
          }
        
          if (stream->jb_last_frm != frame_type) {
              /* Report changing frame type event */
              PJ_LOG(5,(stream->port.info.name.ptr,
                        "Jitter buffer starts returning normal frames "
                        "(after %d empty/lost)",
                        stream->jb_last_frm_cnt));
        
              stream->jb_last_frm = frame_type;
              stream->jb_last_frm_cnt = 1;
          } else {
              stream->jb_last_frm_cnt++;
          }
          if (!use_dec_buf)
              samples_count += samples_per_frame;
        
        
  5. 最后,函数解锁抖动缓冲区的互斥锁,并根据获取的样本数设置帧类型和大小,并返回 PJ_SUCCESS

(1)pjmedia_jbuf_get_frame3

/*
 * Get frame from jitter buffer.
 */
PJ_DEF(void) pjmedia_jbuf_get_frame3(pjmedia_jbuf *jb,
                                     void *frame,
                                     pj_size_t *size,
                                     char *p_frame_type,
                                     pj_uint32_t *bit_info,
                                     pj_uint32_t *ts,
                                     int *seq)
{
    if (jb->jb_prefetching) {

        /* Can't return frame because jitter buffer is filling up
         * minimum prefetch.
         */

        //pj_bzero(frame, jb->jb_frame_size);
        *p_frame_type = PJMEDIA_JB_ZERO_PREFETCH_FRAME;
        if (size)
            *size = 0;

        TRACE__((jb->jb_name.ptr, "GET prefetch_cnt=%d/%d",
                 jb_framelist_eff_size(&jb->jb_framelist), jb->jb_prefetch));

        jb->jb_empty++;

    } else {

        pjmedia_jb_frame_type ftype = PJMEDIA_JB_NORMAL_FRAME;
        pj_bool_t res;

        /* Try to retrieve a frame from frame list */
        res = jb_framelist_get(&jb->jb_framelist, frame, size, &ftype,
                               bit_info, ts, seq);
        if (res) {
            /* We've successfully retrieved a frame from the frame list, but
             * the frame could be a blank frame!
             */
            if (ftype == PJMEDIA_JB_NORMAL_FRAME) {
                *p_frame_type = PJMEDIA_JB_NORMAL_FRAME;
            } else {
                *p_frame_type = PJMEDIA_JB_MISSING_FRAME;
                jb->jb_lost++;
            }

            /* Store delay history at the first GET */
            if (jb->jb_last_op == JB_OP_PUT) {
                unsigned cur_size;

                /* We've just retrieved one frame, so add one to cur_size */
                cur_size = jb_framelist_eff_size(&jb->jb_framelist) + 1;
                pj_math_stat_update(&jb->jb_delay,
                                    cur_size * jb->jb_frame_ptime /
                                    jb->jb_frame_ptime_denum);
            }
        } else {
            /* Jitter buffer is empty */
            if (jb->jb_prefetch)
                jb->jb_prefetching = PJ_TRUE;

            //pj_bzero(frame, jb->jb_frame_size);
            *p_frame_type = PJMEDIA_JB_ZERO_EMPTY_FRAME;
            if (size)
                *size = 0;

            jb->jb_empty++;
        }
    }

    jb->jb_level++;
    jbuf_update(jb, JB_OP_GET);
}

  1. 首先,函数检查抖动缓冲区是否正在预取数据。如果是,则表示抖动缓冲区正在填充,此时无法返回帧,因此将帧类型设置为 PJMEDIA_JB_ZERO_PREFETCH_FRAME,并将帧大小设置为 0。
  2. 如果抖动缓冲区不在预取状态,则尝试从帧列表中获取帧。如果成功获取到帧,则将帧类型设置为 PJMEDIA_JB_NORMAL_FRAME,表示正常帧。如果获取到的是空白帧,则将帧类型设置为 PJMEDIA_JB_MISSING_FRAME,并增加抖动缓冲区丢失帧的计数。
  3. 如果无法从帧列表中获取帧,表示抖动缓冲区为空。如果抖动缓冲区允许预取,则将预取状态设置为 PJ_TRUE。然后,将帧类型设置为 PJMEDIA_JB_ZERO_EMPTY_FRAME,并将帧大小设置为 0。
  4. 最后,函数更新抖动缓冲区的级别,并调用 jbuf_update 函数更新抖动缓冲区的操作。

4、ioqueue_epoll参与

这里不一定是上述追踪的rtp包

当pj_ioqueue_poll工作线程 调用os_epoll_wait 发现监测的读触发,调用ioqueue_dispatch_read_event写操作,在ioqueue_dispatch_read_event中先看key read_list上有没有pending_read,有的话,从read_list取出,根据read_list的read_op确定读入大小,pj_sock_recvfrom接受数据的函数,将数据读入到read_op中,最后调用on_read_complete,回调函数已在pj_ioqueue_register_sock2时设置过,传入read_op为 on_rx_rtp

(*h->cb.on_read_complete)(h, 
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);

(1)on_rx_rtp 读取操作

on_rx_rtp是被下面调用

(*h->cb.on_read_complete)(h, 
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);

这里有一个很有意思的问题,就是read_op 在on_rx_rtp中竟然没有使用,下面来分析一下原因

udp = (struct transport_udp*) pj_ioqueue_get_user_data(key);

我们的read_op是从readlist中取出的,readlist对于读操作添加是依靠pj_ioqueue_recvfrom函数,在data is not immediately available时将read_op加入readlist

read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;

pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
 * in multithreaded app. If we add bad handle to the set it will
 * corrupt the ioqueue set. See #913
 */
if (IS_CLOSING(key)) {
    pj_ioqueue_unlock_key(key);
    return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);

这里read_op->buf = buffer;的buffer,来自udp->rtp_pkt,相当于直接写入了rtp_pkt,所以不用read_op了。

on_rx_rtp是一个while循环,条件如下status来自pj_ioqueue_recvfrom的结果

status != PJ_EPENDING && status != PJ_ECANCELLED &&
             udp->started

a. call_rtp_cb

在while循环里,先执行call_rtp_cb,设置pjmedia_tp_cb_param param;,调用(*cb2)(&param);cb2由transport_attach2-》tp_attach设置为stream.c ::on_rx_rtp。注意param.pkt = udp->rtp_pkt;,这里rtp_pkt其实就是ioqueue_dispatch_read_event中read_op->buf中读到的数据rtp包

/* Call RTP cb. */
static void call_rtp_cb(struct transport_udp *udp, pj_ssize_t bytes_read, 
                        pj_bool_t *rem_switch)
{
    void (*cb)(void*,void*,pj_ssize_t);
    void (*cb2)(pjmedia_tp_cb_param*);
    void *user_data;

    cb = udp->rtp_cb;
    cb2 = udp->rtp_cb2;
    user_data = udp->user_data;

    if (cb2) {
        pjmedia_tp_cb_param param;

        param.user_data = user_data;
        param.pkt = udp->rtp_pkt;
        param.size = bytes_read;
        param.src_addr = &udp->rtp_src_addr;
        param.rem_switch = PJ_FALSE;
        (*cb2)(&param);
        if (rem_switch)
            *rem_switch = param.rem_switch;
    } else if (cb) {
        (*cb)(user_data, udp->rtp_pkt, bytes_read);
    }
}

param.user_data = user_data; 注意这个user_data,是pjmedia_stream *stream

b. pj_ioqueue_recvfrom

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available. 
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_ioqueue_lock_key(key);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_ioqueue_unlock_key(key);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_ioqueue_unlock_key(key);

    return PJ_EPENDING;
}

接下来是调用pj_ioqueue_recvfrom,至于为什么明明ioqueue_dispatch_read_event已经读取了数据,此时还在读取数据,是因为可能有新的rtp包到达,pj_ioqueue_recvfrom查看有没有到达的包,如果有就调用pj_sock_recvfrom继续读读到udp->rtp_pkt,如果没有加到readlist中,返回PJ_EPENDING,结束on_rx_rtp中的while循环。

(2)on_rx_rtp::cb2

Stream.c中的回调 tp_attach中设置该回调

该函数处理接收到的rtp包, 解析成payload和head

Put "good" packet to jitter buffer,需要先把payload解析成frame,再把frame放入jitter buffer

附:相互指向关系

stream.user_data-》snd_port,

snd_port.port-》port

port->port_data.pdata-》stream

stream->transport-》pjmedia_transport