pjmedia总述
转载:pjmedia_HYQ458941968的博客-CSDN博客
从对象关系来看:
一次session,多个endpoint参与,彼此发送stream,每个stream两个channel分别收发。音频设备port对象采集播放声音,transport最终发送接收信号。
1、 pjmedia_endpt,代表一个媒体端点,端点可以理解为一个节点,可以是服务器或者客户端,一个设备一般只会有唯一一个端点,而且在初始化的时候创建。
2、pjmedia_session,代表一次会话,一个会话可以只有两个,也可以有多个参与者会议。
3、pjmedia_stream,代表一个流,在于一方进行通讯时,一般可以有音频流、视频流。
4、pjmedia_channel,只有一个方向的媒体流,也就是说,对应音频流,要发送通道和接收通道两个channel。
以上是范围从大到小的媒体对象,下面介绍其它对象。
5、pjmedia_transport,传输对象,封装了所有从网络接收数据和发送数据到网络的操作,可以是UDP、SRTP、ICE等传输方式。
6、pjmedia_snd_port,音频设备对象,最终的音频裸流都要在设备上播放,从麦克风采集,此对象封装所有设备操作。
从数据流来看:
左边为最底层,音频设备的播放和采集,通过回调接口,调用stream.c生产或消化数据,最后通过transport传输接口进行发送接收。当然,其中还夹杂着前后处理、编解码、抖动缓冲区等,后面的章节会对各个对象及相关的音频处理进行描述。
最后来分析一下实例代码simpleua.c中对pjmedia的使用方法。
初始化
在main函数中,前半部分主要是对sip的初始化,对pjmedia的初始化大概从358行开始
1、创建媒体端点endpoint
static pjmedia_endpt *g_med_endpt; /* Media endpoint. */
#if PJ_HAS_THREADS
status = pjmedia_endpt_create(&cp.factory, NULL, 1, &g_med_endpt);
#else
status = pjmedia_endpt_create(&cp.factory,
pjsip_endpt_get_ioqueue(g_endpt),
0, &g_med_endpt);
#endif
这里我们默认看有多线程的流程。
2、创建udp网络接口transport
static pjmedia_transport *g_med_transport[MAX_MEDIA_CNT];
/*
* Create media transport used to send/receive RTP/RTCP socket.
* One media transport is needed for each call. Application may
* opt to re-use the same media transport for subsequent calls.
*/
for (i = 0; i < PJ_ARRAY_SIZE(g_med_transport); ++i) {
status = pjmedia_transport_udp_create3(g_med_endpt, AF, NULL, NULL,
RTP_PORT + i*2, 0,
&g_med_transport[i]);
可以看出,虽然初始化还没有建立媒体通讯,但是预先创建了若干传输对象。
3、loop处理sip事件
/* Loop until one call is completed */
for (;!g_complete;) {
pj_time_val timeout = {0, 10};
pjsip_endpt_handle_events(g_endpt, &timeout);
}
初始化的最后是一个循环等待处理sip对象的操作。
建立媒体通讯
当sip,sdp协商成功后,则要开始媒体通讯,发生在回调函数call_on_media_update
1、创建并启动流对象stream
static pjmedia_stream *g_med_stream; /* Call's audio stream. */
/* Create stream info based on the media audio SDP. */
status = pjmedia_stream_info_from_sdp(&stream_info, inv->dlg->pool,
g_med_endpt,
local_sdp, remote_sdp, 0);
/* Create new audio media stream, passing the stream info, and also the
* media socket that we created earlier.
*/
status = pjmedia_stream_create(g_med_endpt, inv->dlg->pool, &stream_info,
g_med_transport[0], NULL, &g_med_stream);
/* Start the audio stream */
status = pjmedia_stream_start(g_med_stream);
这个实例并没有创建session,而是直接创建流对象。先从sip协商的sdp中获取媒体信息pjmedia_stream_info_from_sdp,然后根据这些信息创建流对象pjmedia_stream_create,并紧接着启动流pjmedia_stream_start。这里只有音频流,视频流暂不纳入分析范围。
2、启动网络传输transport
/* Start the UDP media transport */
pjmedia_transport_media_start(g_med_transport[0], 0, 0, 0, 0);
前面讲到,初始化的时候预创建了若干传输对象,但是并没有启动,等到协商成功后,才启动网络传输。
3、创建音频设备对象Sound_device
static pjmedia_snd_port *g_snd_port; /* Sound device. */
/* Get the media port interface of the audio stream.
* Media port interface is basicly a struct containing get_frame() and
* put_frame() function. With this media port interface, we can attach
* the port interface to conference bridge, or directly to a sound
* player/recorder device.
*/
pjmedia_stream_get_port(g_med_stream, &media_port);
/* Create sound port */
pjmedia_snd_port_create(inv->pool,
PJMEDIA_AUD_DEFAULT_CAPTURE_DEV,
PJMEDIA_AUD_DEFAULT_PLAYBACK_DEV,
PJMEDIA_PIA_SRATE(&media_port->info),/* clock rate */
PJMEDIA_PIA_CCNT(&media_port->info),/* channel count */
PJMEDIA_PIA_SPF(&media_port->info), /* samples per frame*/
PJMEDIA_PIA_BITS(&media_port->info),/* bits per sample */
0,
&g_snd_port);
status = pjmedia_snd_port_connect(g_snd_port, media_port);
这里先从流对象取出媒体接口pjmedia_stream_get_port,其中的媒体接口包含了数据回调,这些后面分析数据流再重点讲,然后创建pjmedia_snd_port_create并启动pjmedia_snd_port_connect音频设备对象。
结束销毁
/* Destroy audio ports. Destroy the audio port first
* before the stream since the audio port has threads
* that get/put frames to the stream.
*/
if (g_snd_port)
pjmedia_snd_port_destroy(g_snd_port);
/* Destroy streams */
if (g_med_stream)
pjmedia_stream_destroy(g_med_stream);
/* Destroy media transports */
for (i = 0; i < MAX_MEDIA_CNT; ++i) {
if (g_med_transport[i])
pjmedia_transport_close(g_med_transport[i]);
}
/* Deinit pjmedia endpoint */
if (g_med_endpt)
pjmedia_endpt_destroy(g_med_endpt);
/* Deinit pjsip endpoint */
if (g_endpt)
pjsip_endpt_destroy(g_endpt);
/* Release pool */
if (pool)
pj_pool_release(pool);
当主线程退出loop时,则销毁所有创建的对象。
pjmedia_endpt
simpleua.c在进行媒体相关初始化时,首先创建媒体端点,看看媒体端点的数据结构和创建流程。
#if PJ_HAS_THREADS
status = pjmedia_endpt_create(&cp.factory, NULL, 1, &g_med_endpt);
#else
status = pjmedia_endpt_create(&cp.factory,
pjsip_endpt_get_ioqueue(g_endpt),
0, &g_med_endpt);
#endif
pjmedia_endpt结构体
/** Concrete declaration of media endpoint. */
struct pjmedia_endpt
{
/** Pool. */
pj_pool_t *pool;
/** Pool factory. */
pj_pool_factory *pf;
/** Codec manager. */
pjmedia_codec_mgr codec_mgr;
/** IOqueue instance. */
pj_ioqueue_t *ioqueue;
/** Do we own the ioqueue? */
pj_bool_t own_ioqueue;
/** Number of threads. */
unsigned thread_cnt;
/** IOqueue polling thread, if any. */
pj_thread_t *thread[MAX_THREADS];
/** To signal polling thread to quit. */
pj_bool_t quit_flag;
/** Is telephone-event enable */
pj_bool_t has_telephone_event;
/** List of exit callback. */
exit_cb exit_cb_list;
};
ioqueue:用于绑定socket,然后异步接收数据;
thread[]:线程数组,存储工作线程。
codec_mgr:codec的管理者
exit_cb_list:退出时的回调链表
quit_flag:置位则工作线程都退出
创建端点
/**
* Initialize and get the instance of media endpoint.
*/
PJ_DEF(pj_status_t) pjmedia_endpt_create2(pj_pool_factory *pf,
pj_ioqueue_t *ioqueue,
unsigned worker_cnt,
pjmedia_endpt **p_endpt)
{
pj_pool_t *pool;
pjmedia_endpt *endpt;
unsigned i;
pj_status_t status;
pool = pj_pool_create(pf, "med-ept", 512, 512, NULL);
if (!pool)
return PJ_ENOMEM;
endpt = PJ_POOL_ZALLOC_T(pool, struct pjmedia_endpt);
endpt->pool = pool;
endpt->pf = pf;
endpt->ioqueue = ioqueue;
endpt->thread_cnt = worker_cnt;
endpt->has_telephone_event = PJ_TRUE;
/* Init codec manager. */
status = pjmedia_codec_mgr_init(&endpt->codec_mgr, endpt->pf);
if (status != PJ_SUCCESS)
goto on_error;
/* Initialize exit callback list. */
pj_list_init(&endpt->exit_cb_list);
/* Create ioqueue if none is specified. */
if (endpt->ioqueue == NULL) {
endpt->own_ioqueue = PJ_TRUE;
status = pj_ioqueue_create( endpt->pool, PJ_IOQUEUE_MAX_HANDLES,
&endpt->ioqueue);
if (status != PJ_SUCCESS)
goto on_error;
}
/* Create worker threads if asked. */
for (i=0; i<worker_cnt; ++i) {
status = pj_thread_create( endpt->pool, "media", &worker_proc,
endpt, 0, 0, &endpt->thread[i]);
if (status != PJ_SUCCESS)
goto on_error;
}
*p_endpt = endpt;
return PJ_SUCCESS;
on_error:
/* Destroy threads */
for (i=0; i<endpt->thread_cnt; ++i) {
if (endpt->thread[i]) {
pj_thread_destroy(endpt->thread[i]);
}
}
/* Destroy internal ioqueue */
if (endpt->ioqueue && endpt->own_ioqueue)
pj_ioqueue_destroy(endpt->ioqueue);
pjmedia_codec_mgr_destroy(&endpt->codec_mgr);
//pjmedia_aud_subsys_shutdown();
pj_pool_release(pool);
return status;
}
1、创建内存池med-ept
2、申请媒体端点结构体内存,并把外部的ioqueue地址存到结构体 endpt->ioqueue = ioqueue;
3、初始化编码器manager pjmedia_codec_mgr_init(&endpt->codec_mgr, endpt->pf)
4、如果初入的ioqueue为空,则创建一个新的ioqueue
5、创建工作线程组,线程函数是worker_proc
工作线程
/**
* Worker thread proc.
*/
static int PJ_THREAD_FUNC worker_proc(void *arg)
{
pjmedia_endpt *endpt = (pjmedia_endpt*) arg;
while (!endpt->quit_flag) {
pj_time_val timeout = { 0, 500 };
pj_ioqueue_poll(endpt->ioqueue, &timeout);
}
return 0;
}
工作线程就是每500ms调用一次pj_ioqueue_pool。刚创建媒体端点时,ioqueue还没有注册socket,所以不会监控到数据。
pjmedia_transport
媒体传输封装了网络收发细节,pjmedia_transport可以是udp、srtp、ice等,这里以udp为例。
结构体pjmedia_transport
/**
* This structure declares media transport. A media transport is called
* by the stream to transmit a packet, and will notify stream when
* incoming packet is arrived.
*/
struct pjmedia_transport
{
/** Transport name (for logging purpose). */
char name[PJ_MAX_OBJ_NAME];
/** Transport type. */
pjmedia_transport_type type;
/** Transport's "virtual" function table. */
pjmedia_transport_op *op;
/** Application/user data */
void *user_data;
};
type:传输类型,上面讲过,这里以udp为例。
op:操作集,每种传输类型实现了同一组接口。
user_data:应用层用户数据
pjmedia_transport_op
操作集是核心,这里列举重要的一些函数。pjmedia_transport_op
/**
* This structure describes the operations for the stream transport.
*/
struct pjmedia_transport_op
{
/**
* This function is called by the stream when the transport is about
* to be used by the stream for the first time, and it tells the transport
* about remote RTP address to send the packet and some callbacks to be
* called for incoming packets. This function exists for backwards
* compatibility. Transports should implement attach2 instead.
*
* Application should call #pjmedia_transport_attach() instead of
* calling this function directly.
*/
pj_status_t (*attach)(pjmedia_transport *tp,
void *user_data,
const pj_sockaddr_t *rem_addr,
const pj_sockaddr_t *rem_rtcp,
unsigned addr_len,
void (*rtp_cb)(void *user_data,
void *pkt,
pj_ssize_t size),
void (*rtcp_cb)(void *user_data,
void *pkt,
pj_ssize_t size));
/**
* This function is called by the stream to send RTP packet using the
* transport.
*
* Application should call #pjmedia_transport_send_rtp() instead of
* calling this function directly.
*/
pj_status_t (*send_rtp)(pjmedia_transport *tp,
const void *pkt,
pj_size_t size);
/**
* Prepare the transport for a new media session.
*
* Application should call #pjmedia_transport_media_create() instead of
* calling this function directly.
*/
pj_status_t (*media_create)(pjmedia_transport *tp,
pj_pool_t *sdp_pool,
unsigned options,
const pjmedia_sdp_session *remote_sdp,
unsigned media_index);
/**
* This function is called by application to start the transport
* based on local and remote SDP.
*
* Application should call #pjmedia_transport_media_start() instead of
* calling this function directly.
*/
pj_status_t (*media_start) (pjmedia_transport *tp,
pj_pool_t *tmp_pool,
const pjmedia_sdp_session *sdp_local,
const pjmedia_sdp_session *sdp_remote,
unsigned media_index);
/**
* This function is called by application to stop the transport.
*
* Application should call #pjmedia_transport_media_stop() instead of
* calling this function directly.
*/
pj_status_t (*media_stop) (pjmedia_transport *tp);
/**
* This function can be called to destroy this transport.
*
* Application should call #pjmedia_transport_close() instead of
* calling this function directly.
*/
pj_status_t (*destroy)(pjmedia_transport *tp);
/**
* This function is called by the stream when the transport is about
* to be used by the stream for the first time, and it tells the transport
* about remote RTP address to send the packet and some callbacks to be
* called for incoming packets.
*
* Application should call #pjmedia_transport_attach2() instead of
* calling this function directly.
*/
pj_status_t (*attach2)(pjmedia_transport *tp,
pjmedia_transport_attach_param *att_param);
};
这里主要看attach2,这个函数传入rtp和rtcp的回调函数指针,当从网络收到数据时,会通过该回调通知。
创建udp媒体传输
在simpleua.c初始化时,创建完媒体端点pjmedia_endpt后,还会预先创建好udp媒体传输
/*
* Create media transport used to send/receive RTP/RTCP socket.
* One media transport is needed for each call. Application may
* opt to re-use the same media transport for subsequent calls.
*/
for (i = 0; i < PJ_ARRAY_SIZE(g_med_transport); ++i) {
status = pjmedia_transport_udp_create3(g_med_endpt, AF, NULL, NULL,
RTP_PORT + i*2, 0,
&g_med_transport[i]);
pjmedia_transport_udp_create最终调用pjmedia_transport_udp_create3,这个函数先创建rtp和rtcp两个socket,然后调用pjmedia_transport_udp_attach。
调用流:
pjmedia_transport_udp_create3、pjmedia_transport_udp_attach、pj_ioqueue_register_sock2
pjmedia_transport_udp_create3
/**
* Create UDP stream transport.
*/
PJ_DEF(pj_status_t) pjmedia_transport_udp_create3(pjmedia_endpt *endpt,
int af,
const char *name,
const pj_str_t *addr,
int port,
unsigned options,
pjmedia_transport **p_tp)
{
pjmedia_sock_info si;
pj_status_t status;
/* Sanity check */
PJ_ASSERT_RETURN(endpt && port && p_tp, PJ_EINVAL);
pj_bzero(&si, sizeof(pjmedia_sock_info));
si.rtp_sock = si.rtcp_sock = PJ_INVALID_SOCKET;
/* Create RTP socket */
status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &si.rtp_sock);
if (status != PJ_SUCCESS)
goto on_error;
/* Bind RTP socket */
status = pj_sockaddr_init(af, &si.rtp_addr_name, addr, (pj_uint16_t)port);
if (status != PJ_SUCCESS)
goto on_error;
status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,
pj_sockaddr_get_len(&si.rtp_addr_name));
if (status != PJ_SUCCESS)
goto on_error;
/* Create RTCP socket */
status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &si.rtcp_sock);
if (status != PJ_SUCCESS)
goto on_error;
/* Bind RTCP socket */
status = pj_sockaddr_init(af, &si.rtcp_addr_name, addr,
(pj_uint16_t)(port+1));
if (status != PJ_SUCCESS)
goto on_error;
status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name,
pj_sockaddr_get_len(&si.rtcp_addr_name));
if (status != PJ_SUCCESS)
goto on_error;
/* Create UDP transport by attaching socket info */
return pjmedia_transport_udp_attach( endpt, name, &si, options, p_tp);
on_error:
if (si.rtp_sock != PJ_INVALID_SOCKET)
pj_sock_close(si.rtp_sock);
if (si.rtcp_sock != PJ_INVALID_SOCKET)
pj_sock_close(si.rtcp_sock);
return status;
}
Create & Bind RTP & RTCP socket、Create UDP transport by attaching socket info
transport_udp
struct transport_udp
{
pjmedia_transport base; /**< Base transport. */
pj_pool_t *pool; /**< Memory pool */
unsigned options; /**< Transport options. */
unsigned media_options; /**< Transport media options. */
void *user_data; /**< Only valid when attached */
//pj_bool_t attached; /**< Has attachment? */
pj_bool_t started; /**< Has started? */
pj_sockaddr rem_rtp_addr; /**< Remote RTP address */
pj_sockaddr rem_rtcp_addr; /**< Remote RTCP address */
int addr_len; /**< Length of addresses. */
void (*rtp_cb)( void*, /**< To report incoming RTP. */
void*,
pj_ssize_t);
void (*rtp_cb2)(pjmedia_tp_cb_param*); /**< To report incoming RTP. */
void (*rtcp_cb)( void*, /**< To report incoming RTCP. */
void*,
pj_ssize_t);
unsigned tx_drop_pct; /**< Percent of tx pkts to drop. */
unsigned rx_drop_pct; /**< Percent of rx pkts to drop. */
pj_ioqueue_t *ioqueue; /**< Ioqueue instance. */
pj_sock_t rtp_sock; /**< RTP socket */
pj_sockaddr rtp_addr_name; /**< Published RTP address. */
pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */
pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */
unsigned rtp_write_op_id;/**< Next write_op to use */
pending_write rtp_pending_write[MAX_PENDING]; /**< Pending write */
pj_sockaddr rtp_src_addr; /**< Actual packet src addr. */
int rtp_addrlen; /**< Address length. */
char rtp_pkt[RTP_LEN];/**< Incoming RTP packet buffer */
pj_bool_t enable_rtcp_mux;/**< Enable RTP & RTCP multiplexing?*/
pj_bool_t use_rtcp_mux; /**< Use RTP & RTCP multiplexing? */
pj_sock_t rtcp_sock; /**< RTCP socket */
pj_sockaddr rtcp_addr_name; /**< Published RTCP address. */
pj_sockaddr rtcp_src_addr; /**< Actual source RTCP address. */
unsigned rtcp_src_cnt; /**< How many pkt from this addr. */
int rtcp_addr_len; /**< Length of RTCP src address. */
pj_ioqueue_key_t *rtcp_key; /**< RTCP socket key in ioqueue */
pj_ioqueue_op_key_t rtcp_read_op; /**< Pending read operation */
pj_ioqueue_op_key_t rtcp_write_op; /**< Pending write operation */
char rtcp_pkt[RTCP_LEN];/**< Incoming RTCP packet buffer */
};
pjmedia_transport_udp_attach
/**
* Create UDP stream transport from existing socket info.
*/
PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
const char *name,
const pjmedia_sock_info *si,
unsigned options,
pjmedia_transport **p_tp)
{
struct transport_udp *tp;
pj_pool_t *pool;
pj_ioqueue_t *ioqueue;
pj_ioqueue_callback rtp_cb, rtcp_cb;
pj_grp_lock_t *grp_lock;
pj_status_t status;
/* Sanity check */
PJ_ASSERT_RETURN(endpt && si && p_tp, PJ_EINVAL);
/* Get ioqueue instance */
ioqueue = pjmedia_endpt_get_ioqueue(endpt);
if (name==NULL)
name = "udp%p";
/* Create transport structure */
pool = pjmedia_endpt_create_pool(endpt, name, 512, 512);
if (!pool)
return PJ_ENOMEM;
tp = PJ_POOL_ZALLOC_T(pool, struct transport_udp);
tp->pool = pool;
tp->options = options;
pj_memcpy(tp->base.name, pool->obj_name, PJ_MAX_OBJ_NAME);
tp->base.op = &transport_udp_op;
tp->base.type = PJMEDIA_TRANSPORT_TYPE_UDP;
/* Copy socket infos */
tp->rtp_sock = si->rtp_sock;
tp->rtp_addr_name = si->rtp_addr_name;
tp->rtcp_sock = si->rtcp_sock;
tp->rtcp_addr_name = si->rtcp_addr_name;
/* If address is 0.0.0.0, use host's IP address */
if (!pj_sockaddr_has_addr(&tp->rtp_addr_name)) {
pj_sockaddr hostip;
status = pj_gethostip(tp->rtp_addr_name.addr.sa_family, &hostip);
if (status != PJ_SUCCESS)
goto on_error;
pj_memcpy(pj_sockaddr_get_addr(&tp->rtp_addr_name),
pj_sockaddr_get_addr(&hostip),
pj_sockaddr_get_addr_len(&hostip));
}
/* Same with RTCP */
if (!pj_sockaddr_has_addr(&tp->rtcp_addr_name)) {
pj_memcpy(pj_sockaddr_get_addr(&tp->rtcp_addr_name),
pj_sockaddr_get_addr(&tp->rtp_addr_name),
pj_sockaddr_get_addr_len(&tp->rtp_addr_name));
}
/* Create group lock */
status = pj_grp_lock_create(pool, NULL, &grp_lock);
if (status != PJ_SUCCESS)
goto on_error;
pj_grp_lock_add_ref(grp_lock);
tp->base.grp_lock = grp_lock;
/* Setup RTP socket with the ioqueue */
pj_bzero(&rtp_cb, sizeof(rtp_cb));
rtp_cb.on_read_complete = &on_rx_rtp;
rtp_cb.on_write_complete = &on_rtp_data_sent;
status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtp_sock, grp_lock,
tp, &rtp_cb, &tp->rtp_key);
if (status != PJ_SUCCESS)
goto on_error;
/* Disallow concurrency so that detach() and destroy() are
* synchronized with the callback.
*
* Note that we still need this even after group lock is added to
* maintain the above behavior.
*/
status = pj_ioqueue_set_concurrency(tp->rtp_key, PJ_FALSE);
if (status != PJ_SUCCESS)
goto on_error;
/* Setup RTCP socket with ioqueue */
pj_bzero(&rtcp_cb, sizeof(rtcp_cb));
rtcp_cb.on_read_complete = &on_rx_rtcp;
status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtcp_sock, grp_lock,
tp, &rtcp_cb, &tp->rtcp_key);
if (status != PJ_SUCCESS)
goto on_error;
status = pj_ioqueue_set_concurrency(tp->rtcp_key, PJ_FALSE);
if (status != PJ_SUCCESS)
goto on_error;
tp->ioqueue = ioqueue;
/* Done */
*p_tp = &tp->base;
return PJ_SUCCESS;
on_error:
transport_destroy(&tp->base);
return status;
}
创建transport_udp PJ_POOL_ZALLOC_T,Copy socket infos 到transport_udp、 Setup RTP/RTCP socket with the ioqueue(设置两个回调函数on_rx_rtp、on_rtp_data_sent、on_rx_rtcp)。
udp_attach先申请UDP媒体传输结构体transport_udp *tp的内存,注意,此结构体包含了媒体传输pjmedia_transport和一些回调,但是这些回调还没有设置。其中的操作集指向transport_udp_op。接着把socket注册到媒体端点中的io队列,io队列的读完成回调是on_rx_rtp。从这里可以知道,从网络读到数据时,会调用transport_udp.c中的on_rx_rtp,而在这个回调里,会再调用transport_udp中的回调rtp_cb和rtp_cb2,而这两个回调,创建的时候还没有设置,要等到调用操作集的attach才会设置。
注意,这里有两个attach的地方,一个是创建的时候,调用pjmedia_transport_udp_attach,这个attach会把socket注册到ioqueue,同时ioqueue的读完成回调为transport_udp.c中的on_rx_rtp。
pj_ioqueue_register_sock2
注册一个套接字到I/O队列框架。当一个套接字注册到IO队列时,它可以被修改为使用非阻塞IO。如果被修改了,就不能保证在套接字取消注册后会恢复这种修改。
- pool – To allocate the resource for the specified handle, which must be valid until the handle/key is unregistered from I/O Queue.
- ioque – The I/O Queue.
- sock – The socket.
- user_data – User data to be associated with the key, which can be retrieved later.
- cb – Callback to be called when I/O opertion completes.
- key – Pointer to receive the key to be associated with this socket. Subsequent I/O queue operation will need this key.
第二个attach是操作集的attach,看看上面提到的操作集
static pjmedia_transport_op transport_udp_op =
{
&transport_get_info,
&transport_attach,
&transport_detach,
&transport_send_rtp,
&transport_send_rtcp,
&transport_send_rtcp2,
&transport_media_create,
&transport_encode_sdp,
&transport_media_start,
&transport_media_stop,
&transport_simulate_lost,
&transport_destroy,
&transport_attach2
};
第二个attach是transport_attach/transport_attach2,这个attach会再传入rtp_cb和rtcp_cb,而这两个回调,会被on_rx_rtp调用,所以这里有两个回调。总结数据流方向,从网络收到数据,最后会进入attach传入的rtp_cb。但是这个rtp_cb什么时候设置,设置的是谁,这个是在 stream 中实现,下一篇再说。