Transport
媒体传输封装了网络收发细节,pjmedia_transport可以是udp、srtp、ice等,这里以udp为例。
相关结构体
结构体pjmedia_transport
/**
 * Base object shared by the media transport implementations (UDP, SRTP,
 * ICE, ...). A media transport is called by the stream to transmit a
 * packet, and notifies the stream when an incoming packet has arrived.
 *
 * Concrete transports embed this structure as their first member (see
 * struct transport_udp, whose first field is `base`).
 */
struct pjmedia_transport
{
    /** Transport name (for logging purpose). */
    char                     name[PJ_MAX_OBJ_NAME];

    /** Transport type. */
    pjmedia_transport_type   type;

    /** Transport's "virtual" function table. */
    pjmedia_transport_op    *op;

    /** Application/user data. */
    void                    *user_data;

    /**
     * Group lock attached to the transport. pjmedia_transport_udp_attach()
     * stores the lock it creates here (tp->base.grp_lock) and registers the
     * RTP and RTCP sockets to the ioqueue with the same lock.
     *
     * NOTE(review): spelled with the struct tag so the declaration does not
     * depend on the pj_grp_lock_t typedef being visible; this assumes
     * pj_grp_lock_t is a typedef of struct pj_grp_lock_t -- confirm against
     * the PJLIB headers in use.
     */
    struct pj_grp_lock_t    *grp_lock;
};
type:传输类型,上面讲过,这里以udp为例。
op:操作集,每种传输类型实现了同一组接口。
user_data:应用层用户数据
pjmedia_transport_op
操作集是核心,这里只列举pjmedia_transport_op中比较重要的一些函数。
/**
 * Operation table ("virtual functions") of a stream transport. Each
 * transport type provides its own implementation of these entries.
 *
 * NOTE(review): this listing shows only a subset of the members; the
 * transport_udp_op initializer in this file supplies 13 positional
 * entries, so the complete structure has more members than shown here.
 */
struct pjmedia_transport_op
{
    /**
     * Legacy attach entry, kept for backwards compatibility; transports
     * should implement attach2 instead.
     *
     * Called by the stream when it is about to use the transport for the
     * first time. It passes the remote RTP address to send packets to and
     * the callbacks to invoke for incoming packets.
     *
     * Applications should call #pjmedia_transport_attach() rather than
     * invoking this pointer directly.
     */
    pj_status_t (*attach)(pjmedia_transport *tp,
                          void *user_data,
                          const pj_sockaddr_t *rem_addr,
                          const pj_sockaddr_t *rem_rtcp,
                          unsigned addr_len,
                          void (*rtp_cb)(void *user_data,
                                         void *pkt,
                                         pj_ssize_t size),
                          void (*rtcp_cb)(void *user_data,
                                          void *pkt,
                                          pj_ssize_t size));

    /**
     * Called by the stream to send an RTP packet through the transport.
     *
     * Applications should call #pjmedia_transport_send_rtp() rather than
     * invoking this pointer directly.
     */
    pj_status_t (*send_rtp)(pjmedia_transport *tp,
                            const void *pkt,
                            pj_size_t size);

    /**
     * Prepare the transport for a new media session.
     *
     * Applications should call #pjmedia_transport_media_create() rather
     * than invoking this pointer directly.
     */
    pj_status_t (*media_create)(pjmedia_transport *tp,
                                pj_pool_t *sdp_pool,
                                unsigned options,
                                const pjmedia_sdp_session *remote_sdp,
                                unsigned media_index);

    /**
     * Called by the application to start the transport, based on the
     * local and remote SDP.
     *
     * Applications should call #pjmedia_transport_media_start() rather
     * than invoking this pointer directly.
     */
    pj_status_t (*media_start) (pjmedia_transport *tp,
                                pj_pool_t *tmp_pool,
                                const pjmedia_sdp_session *sdp_local,
                                const pjmedia_sdp_session *sdp_remote,
                                unsigned media_index);

    /**
     * Called by the application to stop the transport.
     *
     * Applications should call #pjmedia_transport_media_stop() rather
     * than invoking this pointer directly.
     */
    pj_status_t (*media_stop) (pjmedia_transport *tp);

    /**
     * Destroy this transport.
     *
     * Applications should call #pjmedia_transport_close() rather than
     * invoking this pointer directly.
     */
    pj_status_t (*destroy)(pjmedia_transport *tp);

    /**
     * Attach entry taking all of its arguments in one parameter
     * structure (see pjmedia_transport_attach_param).
     *
     * Called by the stream when it is about to use the transport for the
     * first time. It passes the remote RTP address to send packets to and
     * the callbacks to invoke for incoming packets.
     *
     * Applications should call #pjmedia_transport_attach2() rather than
     * invoking this pointer directly.
     */
    pj_status_t (*attach2)(pjmedia_transport *tp,
                           pjmedia_transport_attach_param *att_param);
};
op的默认初始化方法
/*
 * Operation table of the UDP media transport.
 * pjmedia_transport_udp_attach() installs it into tp->base.op.
 *
 * The entries are positional, so their order must follow the member
 * order of struct pjmedia_transport_op.
 */
static pjmedia_transport_op transport_udp_op =
{
    transport_get_info,
    transport_attach,
    transport_detach,
    transport_send_rtp,
    transport_send_rtcp,
    transport_send_rtcp2,
    transport_media_create,
    transport_encode_sdp,
    transport_media_start,
    transport_media_stop,
    transport_simulate_lost,
    transport_destroy,
    transport_attach2
};
这里主要看attach2,这个函数传入rtp和rtcp的回调函数指针,当从网络收到数据时,会通过该回调通知。
pjmedia_transport_attach_param
/**
 * Argument bundle passed to #pjmedia_transport_attach2(): it carries the
 * remote addresses and the callbacks that report incoming packets.
 */
struct pjmedia_transport_attach_param
{
    /** The media stream. */
    void *stream;

    /**
     * Stream type: audio (PJMEDIA_TYPE_AUDIO) or video
     * (PJMEDIA_TYPE_VIDEO).
     */
    pjmedia_type media_type;

    /** Remote RTP address that RTP packets are sent to. */
    pj_sockaddr rem_addr;

    /**
     * Optional remote RTCP address. When it is not supplied (zero
     * address), the RTCP address is calculated from the RTP address,
     * i.e. RTP port plus one.
     */
    pj_sockaddr rem_rtcp;

    /** Length of the remote address. */
    unsigned addr_len;

    /** Arbitrary user data handed back when the callbacks are called. */
    void *user_data;

    /** Called when an RTP packet is received on the transport. */
    void (*rtp_cb)(void *user_data, void *pkt, pj_ssize_t size);

    /** Called when an RTCP packet is received on the transport. */
    void (*rtcp_cb)(void *user_data, void *pkt, pj_ssize_t size);

    /**
     * Called when an RTP packet is received on the transport; this
     * variant receives its arguments through a pjmedia_tp_cb_param.
     */
    void (*rtp_cb2)(pjmedia_tp_cb_param *param);
};
transport_udp
/*
 * UDP media transport instance. The generic pjmedia_transport is embedded
 * as the first member, followed by the attachment state, the ioqueue, and
 * the per-socket (RTP / RTCP) bookkeeping.
 */
struct transport_udp
{
    /* ---- Generic part ---- */
    pjmedia_transport    base;            /**< Base transport.               */
    pj_pool_t           *pool;            /**< Memory pool.                  */
    unsigned             options;         /**< Transport options.            */
    unsigned             media_options;   /**< Transport media options.      */

    /* ---- Attachment state ---- */
    void                *user_data;       /**< Only valid when attached.     */
    pj_bool_t            started;         /**< Has started?                  */
    pj_sockaddr          rem_rtp_addr;    /**< Remote RTP address.           */
    pj_sockaddr          rem_rtcp_addr;   /**< Remote RTCP address.          */
    int                  addr_len;        /**< Length of addresses.          */

    /** Reports an incoming RTP packet. */
    void               (*rtp_cb)(void *user_data, void *pkt, pj_ssize_t size);
    /** Reports an incoming RTP packet (pjmedia_tp_cb_param variant). */
    void               (*rtp_cb2)(pjmedia_tp_cb_param *param);
    /** Reports an incoming RTCP packet. */
    void               (*rtcp_cb)(void *user_data, void *pkt, pj_ssize_t size);

    /* ---- Packet-loss simulation ---- */
    unsigned             tx_drop_pct;     /**< Percent of tx pkts to drop.   */
    unsigned             rx_drop_pct;     /**< Percent of rx pkts to drop.   */

    /* ---- I/O ---- */
    pj_ioqueue_t        *ioqueue;         /**< Ioqueue instance.             */

    /* ---- RTP socket ---- */
    pj_sock_t            rtp_sock;        /**< RTP socket.                   */
    pj_sockaddr          rtp_addr_name;   /**< Published RTP address.        */
    pj_ioqueue_key_t    *rtp_key;         /**< RTP socket key in ioqueue.    */
    pj_ioqueue_op_key_t  rtp_read_op;     /**< Pending read operation.       */
    unsigned             rtp_write_op_id; /**< Next write_op to use.         */
    pending_write        rtp_pending_write[MAX_PENDING]; /**< Pending write. */
    pj_sockaddr          rtp_src_addr;    /**< Actual packet src addr.       */
    int                  rtp_addrlen;     /**< Address length.               */
    char                 rtp_pkt[RTP_LEN];/**< Incoming RTP packet buffer.   */

    /* ---- RTP/RTCP multiplexing ---- */
    pj_bool_t            enable_rtcp_mux; /**< Enable RTP & RTCP multiplexing? */
    pj_bool_t            use_rtcp_mux;    /**< Use RTP & RTCP multiplexing?    */

    /* ---- RTCP socket ---- */
    pj_sock_t            rtcp_sock;       /**< RTCP socket.                  */
    pj_sockaddr          rtcp_addr_name;  /**< Published RTCP address.       */
    pj_sockaddr          rtcp_src_addr;   /**< Actual source RTCP address.   */
    unsigned             rtcp_src_cnt;    /**< How many pkt from this addr.  */
    int                  rtcp_addr_len;   /**< Length of RTCP src address.   */
    pj_ioqueue_key_t    *rtcp_key;        /**< RTCP socket key in ioqueue.   */
    pj_ioqueue_op_key_t  rtcp_read_op;    /**< Pending read operation.       */
    pj_ioqueue_op_key_t  rtcp_write_op;   /**< Pending write operation.      */
    char                 rtcp_pkt[RTCP_LEN];/**< Incoming RTCP packet buffer. */
};
pj_ioqueue_callback
/**
 * Set of completion callbacks that the I/O queue invokes when an
 * asynchronous operation on a registered key finishes.
 */
typedef struct pj_ioqueue_callback
{
    /**
     * Invoked when #pj_ioqueue_recv or #pj_ioqueue_recvfrom completes.
     *
     * @param key        The key.
     * @param op_key     Operation key.
     * @param bytes_read Number of bytes read when >= 0. A negative value
     *                   carries the error code; the pj_status_t is
     *                   obtained by negating it (status = -bytes_read).
     */
    void (*on_read_complete)(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read);

    /**
     * Invoked when #pj_ioqueue_send or #pj_ioqueue_sendto completes.
     *
     * @param key        The key.
     * @param op_key     Operation key.
     * @param bytes_sent Number of bytes written when >= 0. A negative
     *                   value carries the error code; the pj_status_t is
     *                   obtained by negating it (status = -bytes_sent).
     */
    void (*on_write_complete)(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_sent);

    /**
     * Invoked when #pj_ioqueue_accept completes.
     *
     * @param key     The key.
     * @param op_key  Operation key.
     * @param sock    Newly connected socket.
     * @param status  Zero if the operation completed successfully.
     */
    void (*on_accept_complete)(pj_ioqueue_key_t *key,
                               pj_ioqueue_op_key_t *op_key,
                               pj_sock_t sock,
                               pj_status_t status);

    /**
     * Invoked when #pj_ioqueue_connect completes.
     *
     * @param key     The key.
     * @param status  PJ_SUCCESS if the operation completed successfully.
     */
    void (*on_connect_complete)(pj_ioqueue_key_t *key,
                                pj_status_t status);
} pj_ioqueue_callback;
创建udp media transport
在simpleua.c初始化时(main函数中),创建完媒体端点pjmedia_endpt后,还会预先创建好udp媒体传输,这一步发生在处理sdp和创建invite session之前。
/*
* Create media transport used to send/receive RTP/RTCP socket.
* One media transport is needed for each call. Application may
* opt to re-use the same media transport for subsequent calls.
*/
for (i = 0; i < PJ_ARRAY_SIZE(g_med_transport); ++i) {
status = pjmedia_transport_udp_create3(g_med_endpt, AF, NULL, NULL,
RTP_PORT + i*2, 0,
&g_med_transport[i]);
pjmedia_transport_udp_create最终调用pjmedia_transport_udp_create3,这个函数先创建rtp和rtcp两个socket,然后调用pjmedia_transport_udp_attach。
transport_udp_create调用流
pjmedia_transport_udp_create3 (Create & Bind RTP & RTCP socket)->pjmedia_transport_udp_attach(创建transport_udp,初始化socket infos,pj_ioqueue_register_sock2:注册socket到 I/O queue同时设置回调函数,以便异步接收消息)
pjmedia_transport_udp_create3
/**
 * Create UDP stream transport.
 *
 * Opens and binds two datagram sockets -- RTP on @a port and RTCP on
 * @a port + 1 -- and then passes them to pjmedia_transport_udp_attach(),
 * which creates the transport object.
 *
 * @param endpt    Media endpoint; must not be NULL.
 * @param af       Address family, passed to pj_sock_socket() and
 *                 pj_sockaddr_init().
 * @param name     Transport name, forwarded to
 *                 pjmedia_transport_udp_attach() (which substitutes a
 *                 default when it is NULL).
 * @param addr     Address to bind both sockets to, forwarded to
 *                 pj_sockaddr_init().
 * @param port     RTP port. Must be in the range 1..65534 so that both
 *                 @a port and @a port + 1 fit in a 16-bit port number.
 * @param options  Transport options, forwarded to
 *                 pjmedia_transport_udp_attach().
 * @param p_tp     Receives the created transport; must not be NULL.
 *
 * @return PJ_EINVAL on invalid arguments; the failing status when a
 *         socket cannot be created, initialised or bound (any socket
 *         opened so far is closed first); otherwise the result of
 *         pjmedia_transport_udp_attach().
 */
PJ_DEF(pj_status_t) pjmedia_transport_udp_create3(pjmedia_endpt *endpt,
                                                  int af,
                                                  const char *name,
                                                  const pj_str_t *addr,
                                                  int port,
                                                  unsigned options,
                                                  pjmedia_transport **p_tp)
{
    pjmedia_sock_info si;
    pj_status_t status;

    /* Sanity check */
    PJ_ASSERT_RETURN(endpt && port && p_tp, PJ_EINVAL);

    /* Both ports are cast to pj_uint16_t below. Reject values that would
     * be silently truncated (negative or > 65535), and 65535 itself,
     * because the RTCP port (port + 1) would then wrap around to 0.
     */
    if (port < 1 || port > 65534)
        return PJ_EINVAL;

    pj_bzero(&si, sizeof(pjmedia_sock_info));
    si.rtp_sock = si.rtcp_sock = PJ_INVALID_SOCKET;

    /* Create RTP socket */
    status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &si.rtp_sock);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Bind RTP socket */
    status = pj_sockaddr_init(af, &si.rtp_addr_name, addr, (pj_uint16_t)port);
    if (status != PJ_SUCCESS)
        goto on_error;

    status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,
                          pj_sockaddr_get_len(&si.rtp_addr_name));
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Create RTCP socket */
    status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &si.rtcp_sock);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Bind RTCP socket to the port right after the RTP port */
    status = pj_sockaddr_init(af, &si.rtcp_addr_name, addr,
                              (pj_uint16_t)(port+1));
    if (status != PJ_SUCCESS)
        goto on_error;

    status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name,
                          pj_sockaddr_get_len(&si.rtcp_addr_name));
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Create UDP transport by attaching socket info */
    return pjmedia_transport_udp_attach( endpt, name, &si, options, p_tp);

on_error:
    /* Close whichever sockets were opened before the failure. */
    if (si.rtp_sock != PJ_INVALID_SOCKET)
        pj_sock_close(si.rtp_sock);
    if (si.rtcp_sock != PJ_INVALID_SOCKET)
        pj_sock_close(si.rtcp_sock);
    return status;
}
Create & Bind RTP & RTCP socket、Create UDP transport by attaching socket info
pjmedia_transport_udp_attach
/**
 * Create UDP stream transport from existing socket info.
 *
 * Allocates a struct transport_udp from a pool of its own, copies the
 * sockets and published addresses from @a si, creates a group lock, and
 * registers both sockets to the endpoint's ioqueue. The RTP key gets
 * on_rx_rtp / on_rtp_data_sent as read / write completion callbacks; the
 * RTCP key gets on_rx_rtcp as read completion callback.
 *
 * @param endpt    Media endpoint; must not be NULL.
 * @param name     Pool/transport name; "udp%p" is used when NULL.
 * @param si       Socket info to take the sockets and addresses from;
 *                 must not be NULL.
 * @param options  Transport options, stored in the transport.
 * @param p_tp     Receives the base transport pointer on success; must
 *                 not be NULL.
 *
 * @return PJ_SUCCESS on success, PJ_EINVAL on invalid arguments,
 *         PJ_ENOMEM when the pool cannot be created, or the status of
 *         the failing step (the transport is passed to
 *         transport_destroy() in that case).
 */
PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
                                                  const char *name,
                                                  const pjmedia_sock_info *si,
                                                  unsigned options,
                                                  pjmedia_transport **p_tp)
{
    struct transport_udp *udp;
    pj_pool_t *tp_pool;
    pj_ioqueue_t *ioq;
    pj_ioqueue_callback rtp_handlers, rtcp_handlers;
    pj_grp_lock_t *lock;
    pj_status_t status;

    /* Sanity check */
    PJ_ASSERT_RETURN(endpt && si && p_tp, PJ_EINVAL);

    /* The sockets are registered to the endpoint's ioqueue */
    ioq = pjmedia_endpt_get_ioqueue(endpt);

    if (!name)
        name = "udp%p";

    /* Allocate the transport from a pool of its own */
    tp_pool = pjmedia_endpt_create_pool(endpt, name, 512, 512);
    if (tp_pool == NULL)
        return PJ_ENOMEM;

    udp = PJ_POOL_ZALLOC_T(tp_pool, struct transport_udp);
    udp->pool = tp_pool;
    udp->options = options;
    pj_memcpy(udp->base.name, tp_pool->obj_name, PJ_MAX_OBJ_NAME);
    udp->base.op = &transport_udp_op;
    udp->base.type = PJMEDIA_TRANSPORT_TYPE_UDP;

    /* Take over the sockets and their published addresses */
    udp->rtp_sock = si->rtp_sock;
    udp->rtp_addr_name = si->rtp_addr_name;
    udp->rtcp_sock = si->rtcp_sock;
    udp->rtcp_addr_name = si->rtcp_addr_name;

    /* No address in the published RTP name (e.g. 0.0.0.0):
     * substitute the host's IP address.
     */
    if (!pj_sockaddr_has_addr(&udp->rtp_addr_name)) {
        pj_sockaddr host_addr;

        status = pj_gethostip(udp->rtp_addr_name.addr.sa_family, &host_addr);
        if (status != PJ_SUCCESS)
            goto on_error;

        pj_memcpy(pj_sockaddr_get_addr(&udp->rtp_addr_name),
                  pj_sockaddr_get_addr(&host_addr),
                  pj_sockaddr_get_addr_len(&host_addr));
    }

    /* Likewise for RTCP: reuse the address part of the RTP name */
    if (!pj_sockaddr_has_addr(&udp->rtcp_addr_name)) {
        pj_memcpy(pj_sockaddr_get_addr(&udp->rtcp_addr_name),
                  pj_sockaddr_get_addr(&udp->rtp_addr_name),
                  pj_sockaddr_get_addr_len(&udp->rtp_addr_name));
    }

    /* Create the group lock, take a reference, and publish it */
    status = pj_grp_lock_create(tp_pool, NULL, &lock);
    if (status != PJ_SUCCESS)
        goto on_error;

    pj_grp_lock_add_ref(lock);
    udp->base.grp_lock = lock;

    /* Register the RTP socket to the ioqueue */
    pj_bzero(&rtp_handlers, sizeof(rtp_handlers));
    rtp_handlers.on_read_complete = &on_rx_rtp;
    rtp_handlers.on_write_complete = &on_rtp_data_sent;

    status = pj_ioqueue_register_sock2(tp_pool, ioq, udp->rtp_sock, lock,
                                       udp, &rtp_handlers, &udp->rtp_key);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Disallow concurrency so that detach() and destroy() are
     * synchronized with the callback.
     *
     * This is still needed after the group lock was added, to keep
     * that behavior.
     */
    status = pj_ioqueue_set_concurrency(udp->rtp_key, PJ_FALSE);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Register the RTCP socket to the ioqueue */
    pj_bzero(&rtcp_handlers, sizeof(rtcp_handlers));
    rtcp_handlers.on_read_complete = &on_rx_rtcp;

    status = pj_ioqueue_register_sock2(tp_pool, ioq, udp->rtcp_sock, lock,
                                       udp, &rtcp_handlers, &udp->rtcp_key);
    if (status != PJ_SUCCESS)
        goto on_error;

    status = pj_ioqueue_set_concurrency(udp->rtcp_key, PJ_FALSE);
    if (status != PJ_SUCCESS)
        goto on_error;

    udp->ioqueue = ioq;

    /* Done */
    *p_tp = &udp->base;
    return PJ_SUCCESS;

on_error:
    /* Hand the partially initialised transport to transport_destroy() */
    transport_destroy(&udp->base);
    return status;
}
创建transport_udp :PJ_POOL_ZALLOC_T,Copy socket infos 到transport_udp、 Setup RTP/RTCP socket with the ioqueue(设置回调函数on_rx_rtp、on_rtp_data_sent、on_rx_rtcp)。
udp_attach先申请UDP媒体传输结构体transport_udp *tp的内存,注意,此结构体包含了媒体传输pjmedia_transport和一些回调,但是这些回调还没有设置。其中的操作集指向transport_udp_op。接着把socket注册到媒体端点中的io队列,io队列的读完成回调是on_rx_rtp。从这里可以知道,从网络读到数据时,会调用transport_udp.c中的on_rx_rtp,而在这个回调里,会再调用transport_udp中的回调rtp_cb和rtp_cb2,而这两个回调,创建的时候还没有设置,要等到调用操作集的attach才会设置。
注意,这里有两个attach的地方,一个是创建的时候,调用pjmedia_transport_udp_attach,这个attach会把socket注册到ioqueue,同时ioqueue的读完成回调为transport_udp.c中的on_rx_rtp。
pj_ioqueue_register_sock2
注册一个套接字到I/O队列框架。当一个套接字注册到IO队列时,它可以被修改为使用非阻塞IO。如果被修改了,就不能保证在套接字取消注册后会恢复这种修改。
- pool – To allocate the resource for the specified handle, which must be valid until the handle/key is unregistered from I/O Queue.
- ioque – The I/O Queue.
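- sock – The socket.
- grp_lock – The group lock to be associated with the key (pjmedia_transport_udp_attach passes the same lock for both the RTP and the RTCP socket).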
- user_data – User data to be associated with the key, which can be retrieved later.
- cb – Callback to be called when I/O operation completes.
- key – Pointer to receive the key to be associated with this socket. Subsequent I/O queue operation will need this key.
第二个attach是transport_attach/transport_attach2,这个attach会再传入rtp_cb和rtcp_cb,而这两个回调,会被on_rx_rtp调用,所以这里有两个回调。总结数据流方向,从网络收到数据,最后会进入attach传入的rtp_cb。但是这个rtp_cb什么时候设置,设置的是谁,这个是在 stream 中实现,下一篇再说。
pjmedia_transport_attach2调用流
pjmedia_transport_attach2->transport_attach2->tp_attach
tp_attach最重要就是设置了pjmedia_transport 的cb2 为on_rx_request,还有Copy remote RTP address