发送数据全流程

1、发送的线程(在声音设备端直接获取声卡数据)

先看发送的线程 ca_thread_func,这里我们以alsa_dev为例

/*
 * Capture thread entry point for an ALSA stream.
 *
 * arg is the struct alsa_stream that owns the capture PCM handle, the
 * capture buffer and the capture callback (ca_cb). The thread keeps
 * reading nframes frames from the PCM into the buffer, wraps the buffer
 * in a pjmedia_frame and passes it to ca_cb, until stream->quit becomes
 * true or the callback returns something other than PJ_SUCCESS.
 *
 * Always returns PJ_SUCCESS.
 */
static int ca_thread_func (void *arg)
{
    struct alsa_stream* stream = (struct alsa_stream*) arg;
    snd_pcm_t* pcm             = stream->ca_pcm;
    int size                   = stream->ca_buf_size;
    snd_pcm_uframes_t nframes  = stream->ca_frames;
    void* user_data            = stream->user_data;
    char* buf                  = stream->ca_buf;
    pj_timestamp tstamp;
    int result;
    struct sched_param param;
    pthread_t* thid;

    /* Try to switch this thread to SCHED_RR at the maximum priority.
     * A failure is only logged; capturing continues either way.
     */
    thid = (pthread_t*) pj_thread_get_os_handle (pj_thread_this());
    param.sched_priority = sched_get_priority_max (SCHED_RR);
    PJ_LOG (5,(THIS_FILE, "ca_thread_func(%u): Set thread priority "
                          "for audio capture thread.",
                          (unsigned)syscall(SYS_gettid)));
    result = pthread_setschedparam (*thid, SCHED_RR, &param);
    if (result) {
        if (result == EPERM)
            PJ_LOG (5,(THIS_FILE, "Unable to increase thread priority, "
                                  "root access needed."));
        else
            PJ_LOG (5,(THIS_FILE, "Unable to increase thread priority, "
                                  "error: %d",
                                  result));
    }

    pj_bzero (buf, size);
    tstamp.u64 = 0;

    TRACE_((THIS_FILE, "ca_thread_func(%u): Started",
            (unsigned)syscall(SYS_gettid)));

    snd_pcm_prepare (pcm);

    while (!stream->quit) {
        pjmedia_frame frame;

        /* Clear the buffer before every read. */
        pj_bzero (buf, size);
        result = snd_pcm_readi (pcm, buf, nframes);
        if (result == -EPIPE) {
            /* Overrun: re-prepare the PCM and retry without delivering
             * a frame or advancing the timestamp.
             */
            PJ_LOG (4,(THIS_FILE, "ca_thread_func: overrun!"));
            snd_pcm_prepare (pcm);
            continue;
        } else if (result < 0) {
            /* NOTE(review): any other read error is only logged; the
             * buffer is still handed to the callback below and the loop
             * keeps running.
             */
            PJ_LOG (4,(THIS_FILE, "ca_thread_func: error reading data!"));
        }
        if (stream->quit)
            break;

        /* The frame always reports the full buffer size, regardless of
         * how many frames the read call returned.
         */
        frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
        frame.buf = (void*) buf;
        frame.size = size;
        frame.timestamp.u64 = tstamp.u64;
        frame.bit_info = 0;

        /* Deliver the captured frame; stop on callback failure or quit. */
        result = stream->ca_cb (user_data, &frame);
        if (result != PJ_SUCCESS || stream->quit)
            break;

        /* The timestamp advances by the number of frames requested. */
        tstamp.u64 += nframes;
    }
    snd_pcm_drop(pcm);
    TRACE_((THIS_FILE, "ca_thread_func: Stopped"));

    return PJ_SUCCESS;
}

该线程的主体部分在不停的while循环,我们来看一次循环的内容:

调用 snd_pcm_readi (pcm, buf, nframes); 从声卡读取数据到buf中。然后将读出的数据和一系列属性包装成一个frame,再调用stream->ca_cb (user_data, &frame);

在初始化(pjmedia_aud_stream_create->f->op->create_stream)中我们知道stream->ca_cb 我们设置为 rec_cb 接下来我们来看rec_cb 在sound_port.c中.

另外我们需要知道ca_cb 中的user_data 到底是什么 这可以看我们之前初始化流程的分析:pjmedia_snd_port_create-》pjmedia_snd_port_create2-》start_sound_device-》pjmedia_aud_stream_create-》f->op->create_stream-》alsa_factory_create_stream

到pjmedia_aud_stream_create这步我们看出user_data是我们创建的snd_port

2、rec_cb (音频流端)

/*
 * Capture-side callback of the sound port, invoked by the sound recorder
 * for every captured frame.
 *
 * user_data is the pjmedia_snd_port that owns the stream. The capture
 * clock source is updated from the frame timestamp, the optional
 * on_rec_frame preview callback is invoked, and, when a downstream port
 * is connected, the frame is passed through echo capture (if an echo
 * canceller is present and not suspended) and then put to that port.
 *
 * Always returns PJ_SUCCESS; the result of pjmedia_port_put_frame() is
 * not propagated.
 */
static pj_status_t rec_cb(void *user_data, pjmedia_frame *frame)
{
    pjmedia_snd_port *snd_port = (pjmedia_snd_port*) user_data;
    pjmedia_port *downstream;

    pjmedia_clock_src_update(&snd_port->cap_clocksrc, &frame->timestamp);

    /* Preview hook, called before the connected port is looked up */
    if (snd_port->on_rec_frame != NULL) {
        snd_port->on_rec_frame(snd_port->user_data, frame);
    }

    downstream = snd_port->port;
    if (downstream != NULL) {
        /* Run echo cancellation on the captured samples first */
        if (snd_port->ec_state && !snd_port->ec_suspended) {
            pjmedia_echo_capture(snd_port->ec_state,
                                 (pj_int16_t*) frame->buf, 0);
        }

        pjmedia_port_put_frame(downstream, frame);
    }

    return PJ_SUCCESS;
}

首先看snd_port->on_rec_frame这个是可选项,这里没有使用

其次就是最重要的调用pjmedia_port_put_frame,看一下这里的port参数,port = snd_port->port; 是pjmedia_snd_port类型snd_port中的pjmedia_port类型属性,snd_port->port是在pjmedia_snd_port_connect中初始化的先看一下pjmedia_snd_port_connect

/*
 * Connect a port.
 *
 * Attaches a media port to the sound port so that captured frames are put
 * to it. The port is accepted only if its audio format matches the sound
 * device configuration.
 *
 * Returns:
 *  - PJ_SUCCESS when the port has been stored in snd_port->port;
 *  - PJ_EINVAL when an argument is NULL or the port format carries no
 *    audio format detail;
 *  - PJMEDIA_ENCCLOCKRATE, PJMEDIA_ENCSAMPLESPFRAME, PJMEDIA_ENCCHANNEL
 *    or PJMEDIA_ENCBITS when the corresponding setting differs.
 */
PJ_DEF(pj_status_t) pjmedia_snd_port_connect( pjmedia_snd_port *snd_port,
                                              pjmedia_port *port)
{
    pjmedia_audio_format_detail *afd;

    PJ_ASSERT_RETURN(snd_port && port, PJ_EINVAL);

    afd = pjmedia_format_get_audio_format_detail(&port->info.fmt, PJ_TRUE);

    /* The getter returns NULL for a non-audio format; when assertions are
     * compiled out nothing else stops us from dereferencing it below.
     */
    if (afd == NULL)
        return PJ_EINVAL;

    /* Check that port has the same configuration as the sound device
     * port.
     */
    if (afd->clock_rate != snd_port->clock_rate)
        return PJMEDIA_ENCCLOCKRATE;

    if (PJMEDIA_AFD_SPF(afd) != snd_port->samples_per_frame)
        return PJMEDIA_ENCSAMPLESPFRAME;

    if (afd->channel_count != snd_port->channel_count)
        return PJMEDIA_ENCCHANNEL;

    if (afd->bits_per_sample != snd_port->bits_per_sample)
        return PJMEDIA_ENCBITS;

    /* Port is okay. */
    snd_port->port = port;
    return PJ_SUCCESS;
}

注意这里的参数pjmedia_port *port来自pjmedia_stream stream的port,这个port还有一点特殊即port->port_data.pdata; 其实是指向stream,即stream中有port,port也有办法指向stream,这在put_frame中会遇到。

再回来看pjmedia_port_put_frame。该函数其实是port->put_frame callback的封装,会直接调用port->put_frame。这个回调函数在pjmedia_stream_create中完成初始化:stream->port.put_frame = &put_frame;,所以接下来我们看put_frame


/**
 * Hand a frame to a media port by dispatching to the port's put_frame
 * callback.
 *
 * Returns whatever the callback returns, PJ_EINVALIDOP when the port has
 * no put_frame callback, or PJ_EINVAL (via PJ_ASSERT_RETURN) when either
 * argument is NULL.
 */
PJ_DEF(pj_status_t) pjmedia_port_put_frame( pjmedia_port *port,
                                            pjmedia_frame *frame )
{
    PJ_ASSERT_RETURN(port && frame, PJ_EINVAL);

    /* A port without a put_frame callback cannot accept frames */
    if (!port->put_frame)
        return PJ_EINVALIDOP;

    return port->put_frame(port, frame);
}

3、put_frame (port端)

/**
 * put_frame()
 *
 * This callback is called by upstream component when it has PCM frame
 * to transmit. This function encodes the PCM frame, pack it into
 * RTP packet, and transmit to peer.
 *
 * The owning pjmedia_stream is recovered from port->port_data.pdata.
 * The actual encode/packetize/send work is delegated to put_frame_imp();
 * this wrapper only normalizes the input frame, re-enables VAD when due,
 * and drives the rebuffering loop when an encoding buffer is in use.
 *
 * Returns the status of put_frame_imp(). In the rebuffering path it
 * returns the last non-success status seen in the loop, or PJ_SUCCESS if
 * every call succeeded.
 */
static pj_status_t put_frame( pjmedia_port *port,
                              pjmedia_frame *frame )
{
    pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
    pjmedia_frame tmp_zero_frame;
    unsigned samples_per_frame;

    samples_per_frame = stream->enc_samples_per_pkt;

    /* https://github.com/pjsip/pjproject/issues/56:
     *  when input is PJMEDIA_FRAME_TYPE_NONE, feed zero PCM frame
     *  instead so that encoder can decide whether or not to transmit
     *  silence frame.
     */
    if (frame->type == PJMEDIA_FRAME_TYPE_NONE) {
        /* Work on a local copy so the caller's frame is left untouched. */
        pj_memcpy(&tmp_zero_frame, frame, sizeof(pjmedia_frame));
        frame = &tmp_zero_frame;

        /* An AUDIO frame with a NULL buffer; the size is in bytes and the
         * factor 2 presumably reflects 16-bit samples -- TODO confirm.
         */
        tmp_zero_frame.buf = NULL;
        tmp_zero_frame.size = samples_per_frame * 2;
        tmp_zero_frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
    }

    /* If VAD is temporarily disabled during creation, enable it
     * after transmitting for VAD_SUSPEND_SEC seconds.
     */
    if (stream->vad_enabled != stream->codec_param.setting.vad &&
        (stream->tx_duration - stream->ts_vad_disabled) >
           PJMEDIA_PIA_SRATE(&stream->port.info) *
          PJMEDIA_STREAM_VAD_SUSPEND_MSEC / 1000)
    {
        stream->codec_param.setting.vad = stream->vad_enabled;
        pjmedia_codec_modify(stream->codec, &stream->codec_param);
        PJ_LOG(4,(stream->port.info.name.ptr,"VAD re-enabled"));
    }


    /* If encoder has different ptime than decoder, then the frame must
     * be passed through the encoding buffer via rebuffer() function.
     */
    if (stream->enc_buf != NULL) {
        pjmedia_frame tmp_rebuffer_frame;
        pj_status_t status = PJ_SUCCESS;

        /* Copy original frame to temporary frame since we need
         * to modify it.
         */
        pj_memcpy(&tmp_rebuffer_frame, frame, sizeof(pjmedia_frame));

        /* Loop while we have full frame in enc_buffer */
        for (;;) {
            pj_status_t st;

            /* Run rebuffer() */
            rebuffer(stream, &tmp_rebuffer_frame);

            /* Process this frame */
            st = put_frame_imp(port, &tmp_rebuffer_frame);
            /* Remember the failure but keep draining the buffer. */
            if (st != PJ_SUCCESS)
                status = st;

            /* If we still have full frame in the buffer, re-run
             * rebuffer() with NULL frame.
             */
            if (stream->enc_buf_count >= stream->enc_samples_per_pkt) {

                /* Marking the frame as NONE is how the "NULL frame" is
                 * signalled to the next rebuffer() call.
                 */
                tmp_rebuffer_frame.type = PJMEDIA_FRAME_TYPE_NONE;

            } else {

                /* Otherwise break */
                break;
            }
        }

        return status;

    } else {
        /* No encoding buffer: process the frame directly. */
        return put_frame_imp(port, frame);
    }
}

重点看一下put_frame_imp 对frame的处理,函数比较长,我们来分段看一下

  1. 函数开始时,从 port 参数中获取了指向 pjmedia_stream 结构的指针 stream,并且从 stream 中获取了编码通道 enc

        pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
        pjmedia_channel *channel = stream->enc;
    
  2. 如果流启用了保活机制,函数会检查距离上次发送数据包的时间间隔,如果超过了指定的保活间隔,会发送一个保活数据包。

  3. 然后,函数会根据帧的类型计算帧中的样本数 ts_len,并根据是否存在特定的编码器问题来确定 RTP 时间戳的长度 rtp_ts_len

        /* Number of samples in the frame */
        if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO)
            ts_len = ((unsigned)frame->size >> 1) /
                     stream->codec_param.info.channel_cnt;
        else if (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED)
            ts_len = PJMEDIA_PIA_SPF(&stream->port.info) /
                     PJMEDIA_PIA_CCNT(&stream->port.info);
        else
            ts_len = 0;
    
    #if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
        /* Handle special case for audio codec with RTP timestamp inconsistence
         * e.g: G722, MPEG audio.
         */
        if (stream->has_g722_mpeg_bug)
            rtp_ts_len = stream->rtp_tx_ts_len_per_pkt;
        else
            rtp_ts_len = ts_len;
    #else
        rtp_ts_len = ts_len;
    #endif
    
  4. 如果编码通道被暂停,函数会更新 RTP 会话的时间戳,并在需要时发送 RTCP SR/RR 报告。

        /* Don't do anything if stream is paused, except updating RTP timestamp */
        if (channel->paused) {
            stream->enc_buf_pos = stream->enc_buf_count = 0;
    
            /* Update RTP session's timestamp. */
            status = pjmedia_rtp_encode_rtp( &channel->rtp, 0, 0, 0, rtp_ts_len,
                                             NULL, NULL);
    
            /* Update RTCP stats with last RTP timestamp. */
            stream->rtcp.stat.rtp_tx_last_ts = pj_ntohl(channel->rtp.out_hdr.ts);
    
            /* Check if now is the time to transmit RTCP SR/RR report.
             * We only do this when the decoder is paused,
             * because otherwise check_tx_rtcp() will be handled by on_rx_rtp().
             */
            if (stream->dec->paused) {
                check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts));
            }
    
            return PJ_SUCCESS;
        }
    
  5. 接着,函数会增加传输时长,初始化输出帧缓冲区,并检查是否有 DTMF 数字在队列中,如果有则发送数字,否则对音频帧进行编码。

    	  if (stream->tx_dtmf_count) {
            int first=0, last=0;
    
            create_dtmf_payload(stream, &frame_out, 0, &first, &last);
    
            /* Encapsulate into RTP packet. Note that:
             *  - RTP marker should be set on the beginning of a new event
             *  - RTP timestamp is constant for the same packet.
             */
            status = pjmedia_rtp_encode_rtp( &channel->rtp,
                                             stream->tx_event_pt, first,
                                             (int)frame_out.size,
                                             (first ? rtp_ts_len : 0),
                                             (const void**)&rtphdr,
                                             &rtphdrlen);
    
            if (last) {
                /* This is the last packet for the event.
                 * Increment the RTP timestamp of the RTP session, for next
                 * RTP packets.
                 */
                inc_timestamp = stream->dtmf_duration +
                                ((DTMF_EBIT_RETRANSMIT_CNT-1) *
                                 stream->rtp_tx_ts_len_per_pkt)
                                - rtp_ts_len;
            }
    
        } 
    
  6. 如果音频帧的缓冲区为空,则发送一段静音,保持 NAT 绑定。

    	else if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO &&
                   frame->buf == NULL &&
                   stream->port.info.fmt.id == PJMEDIA_FORMAT_L16 &&
                   (stream->dir & PJMEDIA_DIR_ENCODING))
        {
            pjmedia_frame silence_frame;
    
            pj_bzero(&silence_frame, sizeof(silence_frame));
            silence_frame.buf = stream->zero_frame;
            silence_frame.size = stream->enc_samples_per_pkt * 2;
            silence_frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
            silence_frame.timestamp.u32.lo = pj_ntohl(stream->enc->rtp.out_hdr.ts);
    
            /* Encode! */
            status = pjmedia_codec_encode( stream->codec, &silence_frame,
                                           channel->out_pkt_size -
                                           sizeof(pjmedia_rtp_hdr),
                                           &frame_out);
            if (status != PJ_SUCCESS) {
                LOGERR_((stream->port.info.name.ptr, status,
                        "Codec encode() error"));
                return status;
            }
    
            /* Encapsulate. */
            status = pjmedia_rtp_encode_rtp( &channel->rtp,
                                             channel->pt, 0,
                                             (int)frame_out.size, rtp_ts_len,
                                             (const void**)&rtphdr,
                                             &rtphdrlen);
    
        
    } 
    
  7. 如果音频帧不为空,则对音频帧进行编码,并将 RTP 头封装到输出包中。

    else if ((frame->type == PJMEDIA_FRAME_TYPE_AUDIO &&
                    frame->buf != NULL) ||
                   (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED))
        {
            /* Encode! */
            status = pjmedia_codec_encode( stream->codec, frame,
                                           channel->out_pkt_size -
                                           sizeof(pjmedia_rtp_hdr),
                                           &frame_out);
            if (status != PJ_SUCCESS) {
                LOGERR_((stream->port.info.name.ptr, status,
                        "Codec encode() error"));
                return status;
            }
    
            /* Encapsulate. */
            status = pjmedia_rtp_encode_rtp( &channel->rtp,
                                             channel->pt, 0,
                                             (int)frame_out.size, rtp_ts_len,
                                             (const void**)&rtphdr,
                                             &rtphdrlen);
    
        } 
    
  8. 最后,函数会根据当前是否正在流式传输来设置 RTP 标记位,并将 RTP 包发送到传输层。

    
        /* Copy RTP header to the beginning of packet */
        pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr));
    
        /* Special case for DTMF: timestamp remains constant for
         * the same event, and is only updated after a complete event
         * has been transmitted.
         */
        if (inc_timestamp) {
            pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, 0,
                                    0, inc_timestamp, NULL, NULL);
        }
    
        /* Set RTP marker bit if currently not streaming */
        if (stream->is_streaming == PJ_FALSE) {
            pjmedia_rtp_hdr *rtp = (pjmedia_rtp_hdr*) channel->out_pkt;
    
            rtp->m = 1;
            PJ_LOG(5,(stream->port.info.name.ptr,"Start talksprut.."));
        }
    
        stream->is_streaming = PJ_TRUE;
    
        /* Send the RTP packet to the transport. */
        status = pjmedia_transport_send_rtp(stream->transport, channel->out_pkt,
                                            frame_out.size +
                                                sizeof(pjmedia_rtp_hdr));
    
  9. 在发送 RTP 包之后,函数会更新一些统计信息,并且如果启用了保活机制,则记录最后发送数据包的时间。

我们来仔细梳理一下rtp包发送的流程

(1)先设置frame_out.buf 对应的偏置 ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr);

(2)调用pjmedia_codec_encode,对frame编码,结果输出到frame_out中,此时frame_out.buf获取到rtp payload,相应地((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr)处获取到rtp payload

(3)添加rtp头部:调用pjmedia_rtp_encode_rtp生成头部(即channel->rtp.out_hdr),并将头部拷贝至(char*)channel->out_pkt处,至此channel->out_pkt存放的即为编码后的rtp包

(4)Send the RTP packet to the transport. 调用pjmedia_transport_send_rtp

我们就主要看pjmedia_transport_send_rtp了,其实就是op->send_rtp 的封装,由初始化可知send_rtp为transport_send_rtp

/*
 * Send an RTP packet through a media transport.
 *
 * Thin wrapper that forwards to the send_rtp operation in the transport's
 * operation table (tp->op) and returns its status unchanged.
 */
PJ_INLINE(pj_status_t) pjmedia_transport_send_rtp(pjmedia_transport *tp,
                                                  const void *pkt,
                                                  pj_size_t size)
{
    return tp->op->send_rtp(tp, pkt, size);
}

4、transport_send_rtp

该函数先将数据包拷贝到pending write缓冲区,再直接调用pj_ioqueue_sendto发送

/*
 * Called by application to send RTP packet.
 *
 * The packet is copied into the next slot of the rtp_pending_write ring
 * and handed to pj_ioqueue_sendto() for the remote RTP address.
 *
 * Returns:
 *  - PJ_SUCCESS when the packet was sent or queued, when the transport
 *    has not been started yet (the packet is silently discarded), or
 *    when the packet was discarded by the TX loss simulation;
 *  - PJ_ETOOBIG (via PJ_ASSERT_RETURN) when size exceeds PJMEDIA_MAX_MTU;
 *  - PJ_EBUSY when the next ring slot still has a pending write;
 *  - any other status reported by pj_ioqueue_sendto().
 */
static pj_status_t transport_send_rtp( pjmedia_transport *tp,
                                       const void *pkt,
                                       pj_size_t size)
{
    struct transport_udp *udp = (struct transport_udp*)tp;
    pj_ssize_t sent;
    unsigned id;
    struct pending_write *pw;
    pj_status_t status;

    /* Must be attached */
    //PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP);

    /* Check that the size is supported */
    PJ_ASSERT_RETURN(size <= PJMEDIA_MAX_MTU, PJ_ETOOBIG);

    if (!udp->started) {
        return PJ_SUCCESS;
    }

    /* Simulate packet lost on TX direction.
     * pj_rand() % 100 yields 0..99, so a strict '<' drops exactly
     * tx_drop_pct percent of the packets ('<=' would drop one percent
     * more than requested).
     */
    if (udp->tx_drop_pct) {
        if ((pj_rand() % 100) < (int)udp->tx_drop_pct) {
            PJ_LOG(5,(udp->base.name, 
                      "TX RTP packet dropped because of pkt lost "
                      "simulation"));
            return PJ_SUCCESS;
        }
    }


    /* Pick the next slot of the pending-write ring. */
    id = udp->rtp_write_op_id;
    pw = &udp->rtp_pending_write[id];
    if (pw->is_pending) {
        /* There is still currently pending operation for this buffer. */
        PJ_LOG(4,(udp->base.name, "Too many pending write operations"));
        return PJ_EBUSY;
    }
    pw->is_pending = PJ_TRUE;

    /* We need to copy packet to our buffer because when the
     * operation is pending, caller might write something else
     * to the original buffer.
     */
    pj_memcpy(pw->buffer, pkt, size);

    sent = size;
    status = pj_ioqueue_sendto( udp->rtp_key, 
                                &pw->op_key,
                                pw->buffer, &sent, 0,
                                &udp->rem_rtp_addr, 
                                udp->addr_len);

    if (status != PJ_EPENDING) {
        /* Send operation has completed immediately. Clear the flag. */
        pw->is_pending = PJ_FALSE;
    }

    /* Advance to the next slot, wrapping around at the end of the ring. */
    udp->rtp_write_op_id = (udp->rtp_write_op_id + 1) %
                           PJ_ARRAY_SIZE(udp->rtp_pending_write);

    /* A queued (pending) send counts as success for the caller. */
    if (status==PJ_SUCCESS || status==PJ_EPENDING)
        return PJ_SUCCESS;

    return status;
}

udp->rtp_write_op_id 是当前write操作可用的id;udp->rtp_pending_write 是pending write对象数组,其中 udp->rtp_pending_write[udp->rtp_write_op_id].is_pending 指示该位置是否还有pending的write操作。如果有pending则直接返回PJ_EBUSY;没有的话将该id处置为pending(pw->is_pending = PJ_TRUE;),拷贝数据后调用pj_ioqueue_sendto,最后将id加1并对数组长度取模

5、pj_ioqueue_sendto

参数key来自 udp->rtp_key 在 pj_ioqueue_register_sock2中初始化,绑定到rtp socket

参数op_key来自 transport_udp::udp->rtp_pending_write[id].op_key,在transport_media_start中设置为空

如果key对应的write_list为空(没有排队等待的写操作),直接调用pj_sock_sendto尝试立即发送

	 if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        
    }

否则初始化write_operation:: write_op 主要需要将要发送的rtp数据保存在write_op中,然后挂在key对应的writelist队列上

		write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;
    
    pj_ioqueue_lock_key(key);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_ioqueue_unlock_key(key);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->write_list, write_op);

调用ioqueue_add_to_set进而调用ioqueue_add_to_set2

/*
 * Add the requested event types to the epoll event mask of a key.
 *
 * READABLE_EVENT, WRITEABLE_EVENT and EXCEPTION_EVENT bits in event_types
 * are translated to EPOLLIN, EPOLLOUT and EPOLLERR respectively and OR-ed
 * into the key's current mask. update_epoll_event_set() is called only
 * when the resulting mask differs from the current one.
 */
static void ioqueue_add_to_set2(pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                unsigned event_types )
{
    pj_uint32_t wanted = key->ev.events;

    wanted |= (event_types & READABLE_EVENT)  ? EPOLLIN  : 0;
    wanted |= (event_types & WRITEABLE_EVENT) ? EPOLLOUT : 0;
    wanted |= (event_types & EXCEPTION_EVENT) ? EPOLLERR : 0;

    /* Nothing new to monitor: leave the epoll registration alone */
    if (wanted == key->ev.events)
        return;

    update_epoll_event_set(ioqueue, key, wanted);
}

根据 event_types 设置 events 调用 update_epoll_event_set 这里event_types是WRITEABLE_EVENT

/*
 * Store a new event mask in key->ev.events and push it to the epoll
 * instance of the ioqueue.
 *
 * Keys registered with EPOLLEXCLUSIVE are removed and re-added; all other
 * keys are updated with EPOLL_CTL_MOD. A failing epoll call is only
 * logged; the function returns nothing.
 */
static void update_epoll_event_set(pj_ioqueue_t *ioqueue,
                                   pj_ioqueue_key_t *key,
                                   pj_uint32_t events)
{
    int rc;
    /* From epoll_ctl(2):
     * EPOLLEXCLUSIVE may be used only in an EPOLL_CTL_ADD operation;
     * attempts to employ it with EPOLL_CTL_MOD yield an error.
     */
    if (key->ev.events & EPOLLEXCLUSIVE) {
        /* NOTE(review): the result of the DEL call is overwritten by the
         * ADD call below, so only an ADD failure is reported.
         */
        rc = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &key->ev);
        key->ev.events = events;
        rc = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, key->fd, &key->ev);
    } else {
        key->ev.events = events;
        rc = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &key->ev);
    }

    if (rc != 0) {
        /* The message text says "MOD" for both the MOD and the DEL+ADD
         * path.
         */
        pj_status_t status = pj_get_os_error();
        PJ_PERROR(1,(THIS_FILE, status,
                     "epol_ctl(MOD) error (events=0x%x)", events));
    }
}

这里通过EPOLL_CTL_MOD修改对rtp socket监测的事件,在原有事件上加入EPOLLOUT,使socket可写时触发

6、ioqueue_epoll参与

这里不一定是上述追踪的rtp包

当pj_ioqueue_poll工作线程调用os_epoll_wait,发现所监测的EPOLLOUT写事件触发时,调用ioqueue_dispatch_write_event执行写操作。在ioqueue_dispatch_write_event中,先看key的write_list上有没有pending的write_op,有的话从write_list取出,根据write_op确定要写入的数据及大小,调用pj_sock_send函数(Transmit data to the socket.)将数据写出,最后调用on_write_complete。该回调函数已在pj_ioqueue_register_sock2时设置为on_rtp_data_sent,调用时传入write_op

if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h, 
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }
/*
 * Write-completion callback for the RTP socket.
 *
 * Looks up the pending-write slot whose op_key is the one that just
 * completed and clears its is_pending flag so the slot can be reused.
 * bytes_sent is not used. Nothing happens if no slot matches.
 */
static void on_rtp_data_sent(pj_ioqueue_key_t *key, 
                             pj_ioqueue_op_key_t *op_key, 
                             pj_ssize_t bytes_sent)
{
    struct transport_udp *udp =
        (struct transport_udp*) pj_ioqueue_get_user_data(key);
    unsigned slot = 0;

    PJ_UNUSED_ARG(bytes_sent);

    while (slot < PJ_ARRAY_SIZE(udp->rtp_pending_write)) {
        if (&udp->rtp_pending_write[slot].op_key == op_key) {
            /* Found the completed operation: release its slot */
            udp->rtp_pending_write[slot].is_pending = PJ_FALSE;
            return;
        }
        ++slot;
    }
}

遍历transport_udp udp中的rtp_pending_write 找到与目标 op_key一致的位置,将is_pending置为false完成

附:相互指向关系

stream.user_data-》snd_port,

snd_port.port-》port

port->port_data.pdata;-》stream

stream->transport-》pjmedia_transport