ioqueue_epoll
pj_ioqueue_t
The overall structure of the ioqueue is pj_ioqueue_t; the epoll-based implementation lives in ioqueue_epoll.c. Listed below are the related DECLARE_COMMON_IOQUEUE macro, pj_ioqueue_cfg, and pj_ioqueue_t.
```c
#define DECLARE_COMMON_IOQUEUE          \
    pj_lock_t      *lock;               \
    pj_bool_t       auto_delete_lock;   \
    pj_ioqueue_cfg  cfg;
```
```c
/**
 * Additional settings that can be given during ioqueue creation. Application
 * MUST initialize this structure with #pj_ioqueue_cfg_default().
 */
typedef struct pj_ioqueue_cfg
{
    /**
     * Specify flags to control e.g. how events are handled when epoll backend
     * is used on Linux. The values are combination of pj_ioqueue_epoll_flag.
     * The default value is PJ_IOQUEUE_DEFAULT_EPOLL_FLAGS, which by default
     * is set to PJ_IOQUEUE_EPOLL_AUTO. This setting will be ignored for other
     * ioqueue backends.
     */
    unsigned epoll_flags;

    /**
     * Default concurrency for the handles registered to this ioqueue. Setting
     * this to non-zero enables a handle to process more than one operations
     * at the same time using different threads. Default is
     * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. This setting is equivalent to
     * calling pj_ioqueue_set_default_concurrency() after creating the ioqueue.
     */
    pj_bool_t default_concurrency;
} pj_ioqueue_cfg;
```
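As the comment says, the structure must be initialized with pj_ioqueue_cfg_default() before use. A minimal usage sketch: create_my_ioqueue is an illustrative helper (not from the source), and pj_ioqueue_create2 is the create variant that accepts a cfg:

```c
#include <pjlib.h>

/* Sketch: create an ioqueue with an explicit cfg (pool setup omitted;
 * the max_fd of 64 is an arbitrary value for the example). */
static pj_status_t create_my_ioqueue(pj_pool_t *pool, pj_ioqueue_t **p_ioq)
{
    pj_ioqueue_cfg cfg;

    pj_ioqueue_cfg_default(&cfg);            /* mandatory initialization */
    cfg.epoll_flags = PJ_IOQUEUE_EPOLL_AUTO; /* ignored by non-epoll backends */

    return pj_ioqueue_create2(pool, 64, &cfg, p_ioq);
}
```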
```c
/*
 * This describes the I/O queue.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    //pj_ioqueue_key_t  hlist;
    pj_ioqueue_key_t    active_list;
    int                 epfd;
    //struct epoll_event *events;
    //struct queue *queue;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};
```
There are three lists of pj_ioqueue_key_t: active_list, closing_list, and free_list.
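A key moves between these lists over its lifetime: it starts on free_list, moves to active_list when pj_ioqueue_register_sock2 registers a socket, is parked on closing_list when unregistered (with PJ_IOQUEUE_HAS_SAFE_UNREG enabled), and returns to free_list once it is safe to reuse. A rough sketch of the transitions, assuming pjlib's intrusive list API (pj_list_erase and pj_list_push_back are real pjlib calls; the surrounding logic is heavily simplified):

```c
/* Simplified key lifecycle (locking and reference counting omitted). */
pj_ioqueue_key_t *key;

/* register: grab a recycled key from free_list, then activate it */
key = ioqueue->free_list.next;   /* lists are circular; head is a sentinel */
pj_list_erase(key);
/* ... ioqueue_init_key(), epoll_ctl(ADD) ... */
pj_list_push_back(&ioqueue->active_list, key);

/* unregister: park the key until no other thread can still touch it */
pj_list_erase(key);
pj_list_push_back(&ioqueue->closing_list, key);

/* later, in poll, once the ref count drops to zero: recycle */
pj_list_erase(key);
pj_list_push_back(&ioqueue->free_list, key);
```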
pj_ioqueue_key_t
```c
#define DECLARE_COMMON_KEY                          \
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);   \
    pj_ioqueue_t            *ioqueue;               \
    pj_grp_lock_t           *grp_lock;              \
    pj_lock_t               *lock;                  \
    pj_bool_t                inside_callback;       \
    pj_bool_t                destroy_requested;     \
    pj_bool_t                allow_concurrent;      \
    pj_sock_t                fd;                    \
    int                      fd_type;               \
    void                    *user_data;             \
    pj_ioqueue_callback      cb;                    \
    int                      connecting;            \
    struct read_operation    read_list;             \
    struct write_operation   write_list;            \
    struct accept_operation  accept_list;           \
    UNREG_FIELDS

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
    struct epoll_event ev;
};
```
pj_ioqueue_key_t contains an epoll_event, which is the standard Linux epoll structure:
epoll_event
```c
typedef union epoll_data {
    void        *ptr;
    int          fd;
    __uint32_t   u32;
    __uint64_t   u64;
} epoll_data_t;  /* data associated with the fd that triggered the event */

struct epoll_event {
    __uint32_t   events;  /* epoll events */
    epoll_data_t data;    /* user data variable */
};
```
pj_ioqueue_callback
Next, the callbacks invoked when an I/O operation completes:
```c
/**
 * This structure describes the callbacks to be called when I/O operation
 * completes.
 */
typedef struct pj_ioqueue_callback
{
    /**
     * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom
     * completes.
     *
     * @param key           The key.
     * @param op_key        Operation key.
     * @param bytes_read    >= 0 to indicate the amount of data read,
     *                      otherwise negative value containing the error
     *                      code. To obtain the pj_status_t error code, use
     *                      (pj_status_t code = -bytes_read).
     */
    void (*on_read_complete)(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read);

    /**
     * This callback is called when #pj_ioqueue_send or #pj_ioqueue_sendto
     * completes.
     *
     * @param key           The key.
     * @param op_key        Operation key.
     * @param bytes_sent    >= 0 to indicate the amount of data written,
     *                      otherwise negative value containing the error
     *                      code. To obtain the pj_status_t error code, use
     *                      (pj_status_t code = -bytes_sent).
     */
    void (*on_write_complete)(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_sent);

    /**
     * This callback is called when #pj_ioqueue_accept completes.
     *
     * @param key           The key.
     * @param op_key        Operation key.
     * @param sock          Newly connected socket.
     * @param status        Zero if the operation completes successfully.
     */
    void (*on_accept_complete)(pj_ioqueue_key_t *key,
                               pj_ioqueue_op_key_t *op_key,
                               pj_sock_t sock,
                               pj_status_t status);

    /**
     * This callback is called when #pj_ioqueue_connect completes.
     *
     * @param key           The key.
     * @param status        PJ_SUCCESS if the operation completes
     *                      successfully.
     */
    void (*on_connect_complete)(pj_ioqueue_key_t *key,
                                pj_status_t status);
} pj_ioqueue_callback;
```
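A transport typically fills in only the callbacks it needs. A sketch of how the rtp_cb later passed to pj_ioqueue_register_sock2 might be set up (modeled on transport_udp.c; only on_read_complete is shown here):

```c
/* Forward declaration of the read-completion handler. */
static void on_rx_rtp(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key,
                      pj_ssize_t bytes_read);

pj_ioqueue_callback rtp_cb;

/* Zero first so the unused callback slots stay NULL. */
pj_bzero(&rtp_cb, sizeof(rtp_cb));
rtp_cb.on_read_complete = &on_rx_rtp;
```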
pj_ioqueue_op_key_t
```c
typedef struct pj_ioqueue_op_key_t
{
    void *internal__[32];   /**< Internal I/O Queue data.   */
    void *activesock_data;  /**< Active socket data.        */
    void *user_data;        /**< Application data.          */
} pj_ioqueue_op_key_t;
```
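Before its first use, an op_key should be zero-initialized; pjlib provides pj_ioqueue_op_key_init for that. A short sketch (my_ctx is an illustrative application pointer, not from the source):

```c
pj_ioqueue_op_key_t op_key;

pj_ioqueue_op_key_init(&op_key, sizeof(op_key)); /* zeroes internal__ etc. */
op_key.user_data = my_ctx;                       /* optional app context   */
```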
Initialization
pj_ioqueue_create2
pj_ioqueue_create (a thin wrapper that simply calls pj_ioqueue_create2):
```c
status = pj_ioqueue_create(endpt->pool, PJSIP_MAX_TRANSPORTS, &endpt->ioqueue);
if (status != PJ_SUCCESS) {
    goto on_error;
}
```
It allocates the ioqueue and initializes ioqueue->lock, ioqueue->auto_delete_lock, ioqueue->cfg (pj_ioqueue_cfg), ioqueue->max, ioqueue->count, ioqueue->cfg.epoll_flags (the epoll type), ioqueue->ref_cnt_mutex, ioqueue->free_list (for max_fd keys it initializes key->ref_count and key->lock, then adds each key to the free list), ioqueue->closing_list, and ioqueue->epfd (the epoll fd, obtained via epoll_create).
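A condensed sketch of those steps, following the struct fields above (not the literal source: error handling, lock creation for the ioqueue itself, and several details are omitted; os_epoll_create is ioqueue_epoll.c's thin wrapper around epoll_create):

```c
/* Condensed from ioqueue_epoll.c's create path; illustrative only. */
pj_size_t i;

ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
if (cfg)
    pj_memcpy(&ioqueue->cfg, cfg, sizeof(*cfg));
else
    pj_ioqueue_cfg_default(&ioqueue->cfg);
ioqueue->max = (unsigned)max_fd;
ioqueue->count = 0;
pj_list_init(&ioqueue->active_list);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Pre-allocate all keys onto the free list. */
pj_mutex_create_simple(pool, "ioqueue", &ioqueue->ref_cnt_mutex);
pj_list_init(&ioqueue->free_list);
pj_list_init(&ioqueue->closing_list);
for (i = 0; i < max_fd; ++i) {
    pj_ioqueue_key_t *key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
    key->ref_count = 0;
    pj_lock_create_recursive_mutex(pool, "ioqkey", &key->lock);
    pj_list_push_back(&ioqueue->free_list, key);
}
#endif

ioqueue->epfd = os_epoll_create(max_fd);  /* wraps epoll_create() */
```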
ioqueue_init_key
```c
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     pj_grp_lock_t *grp_lock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
```
Initializes the key's fields: key->ioqueue, key->fd, key->user_data (here it is the transport_udp), key->read_list, key->write_list, key->accept_list, key->connecting = 0, key->cb (the callbacks), key->closing, key->allow_concurrent (see pj_ioqueue_set_concurrency), key->fd_type, and key->grp_lock.
pj_ioqueue_register_sock2
Call chain: pjmedia_transport_udp_create3 -> pjmedia_transport_udp_attach -> pj_ioqueue_register_sock2.
os_ioctl sets the socket to non-blocking.
A key is taken from the ioqueue's free_list and set up with ioqueue_init_key; key->ev (an epoll_event whose data.ptr points back to the key itself) is then initialized, and os_epoll_ctl registers the fd and its event mask so the socket's events are monitored.
```c
PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              pj_grp_lock_t *grp_lock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
```
```c
status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtp_sock, grp_lock,
                                   tp, &rtp_cb, &tp->rtp_key);
```
Note that pjmedia_transport_udp_attach passes &tp->rtp_key when calling pj_ioqueue_register_sock2; when the call returns, the key bound to the socket has been stored in rtp_key. A condensed sketch of the registration path follows.
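The core of the registration, condensed from the description above (status checks and locking omitted; os_ioctl and os_epoll_ctl are ioqueue_epoll.c's thin wrappers around ioctl and epoll_ctl, and the exact event flags are an assumption here):

```c
/* Condensed registration path; illustrative only. */
pj_uint32_t value = 1;
os_ioctl(sock, FIONBIO, &value);          /* set socket non-blocking */

key = ioqueue->free_list.next;            /* take a key from the free list */
pj_list_erase(key);

ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);

key->ev.events   = EPOLLIN | EPOLLERR;    /* interest set; actual flags vary */
key->ev.data.ptr = key;                   /* lets poll map event -> key */
os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, key->fd, &key->ev);

pj_list_push_back(&ioqueue->active_list, key);
*p_key = key;
```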
ioqueue read/write flow
Full read flow
pj_ioqueue_poll detects an event -> ioqueue_dispatch_read_event -> on_read_complete (for the RTP transport this is on_rx_rtp, covered below).
pj_ioqueue_poll
Once events fire, the work centers on os_epoll_wait followed by dispatching the callbacks.
Typically a dedicated worker thread loops forever, polling so that epoll_wait can pick up pending RTP read/write events:
```c
/* Worker thread loop: wait up to 10 ms per iteration for I/O events. */
pj_time_val timeout = {0, 10};
while (1) {
    pj_ioqueue_poll(ioqueue, &timeout);
}
```
pj_ioqueue_poll in detail: it first calls os_epoll_wait, which fills the events array; it then walks all events and, for read/write events, appends entries to a queue with the following structure:
```c
struct queue
{
    pj_ioqueue_key_t        *key;
    enum ioqueue_event_type  event_type;
};

enum ioqueue_event_type
{
    NO_EVENT,
    READABLE_EVENT  = 1,
    WRITEABLE_EVENT = 2,
    EXCEPTION_EVENT = 4,
};
```
Finally it walks the queue and runs the handler matching each event type (a condensed sketch of the whole two-pass loop follows this list):
READABLE_EVENT: ioqueue_dispatch_read_event
WRITEABLE_EVENT: ioqueue_dispatch_write_event
EXCEPTION_EVENT: ioqueue_dispatch_exception_event
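Putting the two passes together, a condensed sketch of pj_ioqueue_poll's body (a sketch only: bookkeeping, locking, and the closing-list scan are omitted, and the real code guards some cases this sketch skips):

```c
/* Condensed two-pass structure of pj_ioqueue_poll; illustrative only. */
struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
int i, count, processed = 0;
int msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

count = os_epoll_wait(ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);

/* Pass 1: translate epoll events into (key, event_type) pairs. */
for (i = 0; i < count; ++i) {
    pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)events[i].data.ptr;
    if (events[i].events & EPOLLIN) {
        queue[processed].key = h;
        queue[processed].event_type = READABLE_EVENT;
        ++processed;
    }
    if (events[i].events & EPOLLOUT) {
        queue[processed].key = h;
        queue[processed].event_type = WRITEABLE_EVENT;
        ++processed;
    }
    if (events[i].events & EPOLLERR) {
        queue[processed].key = h;
        queue[processed].event_type = EXCEPTION_EVENT;
        ++processed;
    }
}

/* Pass 2: dispatch outside the epoll scan. */
for (i = 0; i < processed; ++i) {
    switch (queue[i].event_type) {
    case READABLE_EVENT:
        ioqueue_dispatch_read_event(ioqueue, queue[i].key);
        break;
    case WRITEABLE_EVENT:
        ioqueue_dispatch_write_event(ioqueue, queue[i].key);
        break;
    case EXCEPTION_EVENT:
        ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
        break;
    default:
        break;
    }
}
```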
ioqueue_dispatch_read_event (the read path)
First, look at read_operation:
```c
struct read_operation
{
    PJ_DECL_LIST_MEMBER(struct read_operation);
    pj_ioqueue_operation_e  op;
    void                   *buf;
    pj_size_t               size;
    unsigned                flags;
    pj_sockaddr_t          *rmt_addr;
    int                    *rmt_addrlen;
};
```
```c
/**
 * Types of pending I/O Queue operation. This enumeration is only used
 * internally within the ioqueue.
 */
typedef enum pj_ioqueue_operation_e
{
    PJ_IOQUEUE_OP_NONE      = 0,    /**< No operation.          */
    PJ_IOQUEUE_OP_READ      = 1,    /**< read() operation.      */
    PJ_IOQUEUE_OP_RECV      = 2,    /**< recv() operation.      */
    PJ_IOQUEUE_OP_RECV_FROM = 4,    /**< recvfrom() operation.  */
    PJ_IOQUEUE_OP_WRITE     = 8,    /**< write() operation.     */
    PJ_IOQUEUE_OP_SEND      = 16,   /**< send() operation.      */
    PJ_IOQUEUE_OP_SEND_TO   = 32,   /**< sendto() operation.    */
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
    PJ_IOQUEUE_OP_ACCEPT    = 64,   /**< accept() operation.    */
    PJ_IOQUEUE_OP_CONNECT   = 128   /**< connect() operation.   */
#endif  /* PJ_HAS_TCP */
} pj_ioqueue_operation_e;
```
The read_operation entries hang off the key's struct read_operation read_list; key_has_pending_read checks whether any read is pending.
The flow: if the key's read_list holds a pending read, the read_op is popped off the list, its size field determines how much to read, pj_sock_recvfrom receives the data into read_op's buffer, and finally on_read_complete (the callback registered back in pj_ioqueue_register_sock2) is invoked with the read_op (the whole branch is sketched after this snippet):
```c
(*h->cb.on_read_complete)(h,
                          (pj_ioqueue_op_key_t*)read_op,
                          bytes_read);
```
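A condensed view of that read branch (a sketch, not the literal source: locking, concurrency handling, and the recv/recvfrom distinction are abbreviated; the error-to-negative mapping follows the callback doc above):

```c
/* Condensed read-completion path inside ioqueue_dispatch_read_event. */
if (key_has_pending_read(h)) {
    struct read_operation *read_op;
    pj_ssize_t bytes_read;
    pj_status_t rc;

    /* Pop the oldest pending read. */
    read_op = h->read_list.next;
    pj_list_erase(read_op);

    bytes_read = read_op->size;
    rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                          read_op->flags,
                          read_op->rmt_addr, read_op->rmt_addrlen);
    if (rc != PJ_SUCCESS)
        bytes_read = -rc;   /* callback recovers it as -bytes_read */

    if (h->cb.on_read_complete && !IS_CLOSING(h)) {
        (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
                                  bytes_read);
    }
}
```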
ioqueue_dispatch_write_event (the write path)
Similar to ioqueue_dispatch_read_event. First, write_operation:
```c
struct write_operation
{
    PJ_DECL_LIST_MEMBER(struct write_operation);
    pj_ioqueue_operation_e  op;
    char                   *buf;
    pj_size_t               size;
    pj_ssize_t              written;
    unsigned                flags;
    pj_sockaddr_in          rmt_addr;
    int                     rmt_addrlen;
};
```
The write_operation entries hang off the key's struct write_operation write_list; key_has_pending_write checks whether any write is pending.
The flow: if the key's write_list holds a pending write, the write_op is taken from the list, its fields determine the size and the data to write, pj_sock_send transmits the data to the socket, and finally on_write_complete (again registered in pj_ioqueue_register_sock2) is invoked with the write_op (sketched after this snippet):
```c
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
    (*h->cb.on_write_complete)(h,
                               (pj_ioqueue_op_key_t*)write_op,
                               write_op->written);
}
```
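And the corresponding condensed write path. A sketch under the same caveats: the sendto variant is shown, while the real code picks pj_sock_send or pj_sock_sendto based on write_op->op; partial-write tracking via write_op->written is kept since that is the value the completion callback reports:

```c
/* Condensed write-completion path inside ioqueue_dispatch_write_event. */
if (key_has_pending_write(h)) {
    struct write_operation *write_op;
    pj_ssize_t sent;
    pj_status_t rc;

    write_op = h->write_list.next;

    sent = write_op->size - write_op->written;
    rc = pj_sock_sendto(h->fd, write_op->buf + write_op->written, &sent,
                        write_op->flags,
                        &write_op->rmt_addr, write_op->rmt_addrlen);
    if (rc == PJ_SUCCESS)
        write_op->written += sent;

    /* Only report completion once the whole buffer is out. */
    if (write_op->written >= (pj_ssize_t)write_op->size) {
        pj_list_erase(write_op);
        if (h->cb.on_write_complete && !IS_CLOSING(h)) {
            (*h->cb.on_write_complete)(h, (pj_ioqueue_op_key_t*)write_op,
                                       write_op->written);
        }
    }
}
```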
on_rx_rtp (the read callback)
on_rx_rtp is invoked by:
```c
(*h->cb.on_read_complete)(h,
                          (pj_ioqueue_op_key_t*)read_op,
                          bytes_read);
```
There is an interesting detail here: read_op is never actually used inside on_rx_rtp. Let's look at why.
```c
udp = (struct transport_udp*) pj_ioqueue_get_user_data(key);
```
The read_op was taken from the read_list, and read operations get onto the read_list via pj_ioqueue_recvfrom, which enqueues the read_op when data is not immediately available:
```c
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;

pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
 * in multithreaded app. If we add bad handle to the set it will
 * corrupt the ioqueue set. See #913
 */
if (IS_CLOSING(key)) {
    pj_ioqueue_unlock_key(key);
    return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
```
The buffer assigned in read_op->buf = buffer comes from udp->rtp_pkt, so the incoming data lands directly in rtp_pkt; that is why on_rx_rtp has no need for read_op itself.
on_rx_rtp is a while loop whose condition is the following, where status is the result of pj_ioqueue_recvfrom:

```c
status != PJ_EPENDING && status != PJ_ECANCELLED &&
udp->started
```
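Condensed, the loop looks like this. A sketch with field names (rtp_read_op, rtp_pkt, rtp_src_addr, rtp_addrlen) modeled on transport_udp.c, with declarations and error handling trimmed; call_rtp_cb is introduced next:

```c
/* Condensed on_rx_rtp loop: deliver the current packet, then try to
 * read the next one; PJ_EPENDING ends the loop. */
do {
    call_rtp_cb(udp, bytes_read, &rem_switch);

    bytes_read = sizeof(udp->rtp_pkt);
    udp->rtp_addrlen = sizeof(udp->rtp_src_addr);
    status = pj_ioqueue_recvfrom(udp->rtp_key, &udp->rtp_read_op,
                                 udp->rtp_pkt, &bytes_read, 0,
                                 &udp->rtp_src_addr, &udp->rtp_addrlen);
} while (status != PJ_EPENDING && status != PJ_ECANCELLED && udp->started);
```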
call_rtp_cb
Inside the while loop, call_rtp_cb runs first: it fills a pjmedia_tp_cb_param param and calls (*cb2)(&param). cb2 is set by transport_attach2 -> tp_attach to on_rx_rtp in stream.c. Note param.pkt = udp->rtp_pkt: this rtp_pkt is exactly the RTP packet that ioqueue_dispatch_read_event read into read_op->buf.
```c
/* Call RTP cb. */
static void call_rtp_cb(struct transport_udp *udp, pj_ssize_t bytes_read,
                        pj_bool_t *rem_switch)
{
    void (*cb)(void*,void*,pj_ssize_t);
    void (*cb2)(pjmedia_tp_cb_param*);
    void *user_data;

    cb = udp->rtp_cb;
    cb2 = udp->rtp_cb2;
    user_data = udp->user_data;

    if (cb2) {
        pjmedia_tp_cb_param param;

        param.user_data = user_data;
        param.pkt = udp->rtp_pkt;
        param.size = bytes_read;
        param.src_addr = &udp->rtp_src_addr;
        param.rem_switch = PJ_FALSE;
        (*cb2)(&param);
        if (rem_switch)
            *rem_switch = param.rem_switch;
    } else if (cb) {
        (*cb)(user_data, udp->rtp_pkt, bytes_read);
    }
}
```
Note the user_data in param.user_data = user_data: it is the pjmedia_stream *stream.
pj_ioqueue_recvfrom
Next the loop calls pj_ioqueue_recvfrom. Why read again when ioqueue_dispatch_read_event has already read the data? Because new RTP packets may have arrived in the meantime. pj_ioqueue_recvfrom checks for them: if a packet is available it calls pj_sock_recvfrom and reads it into udp->rtp_pkt; if not, it adds the read_op to the read_list and returns PJ_EPENDING, which ends the while loop in on_rx_rtp.
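The shape of pj_ioqueue_recvfrom is therefore: try the socket right away, and only queue the read_op (the snippet shown earlier) when the read would block. A condensed sketch; the would-block test shown is an assumption modeled on the style of ioqueue_common_abs.c:

```c
/* Condensed pj_ioqueue_recvfrom: fast path first, queue on would-block. */
status = pj_sock_recvfrom(key->fd, buffer, length, flags, addr, addrlen);
if (status == PJ_SUCCESS) {
    return PJ_SUCCESS;                  /* a packet was already waiting */
} else if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
    return status;                      /* hard error: report immediately */
}

/* Would block: park a read_operation on key->read_list (snippet above)
 * and arm epoll for READABLE_EVENT; the caller gets PJ_EPENDING. */
return PJ_EPENDING;
```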
on_rx_rtp::cb2
This is the callback in stream.c, set up in tp_attach. It processes a received RTP packet, parsing it into header and payload.
To put a "good" packet into the jitter buffer, the payload must first be parsed into frames, and the frames are then placed into the jitter buffer.