From 5bacde37a28c0aac2fb7d6fbbb799331c58f55e7 Mon Sep 17 00:00:00 2001 From: Ben Ford <bford@digium.com> Date: Thu, 10 May 2018 13:11:06 -0500 Subject: [PATCH] res_rtp_asterisk: Add support for sending NACK requests. Support has been added for receiving a NACK request and handling it. Now, Asterisk can detect when a NACK request should be sent and knows how to construct one based on the packets we've received from the remote end. A buffer has been added that will store out of order packets until we receive the packet we are expecting. Then, these packets are handled like normal and frames are queued to the core like normal. Asterisk knows which packets to request in the NACK request using a vector which stores the sequence numbers of the packets we are currently missing. If a missing packet is received, cycle through the buffer until we reach another packet we have not received yet. If the buffer reaches a certain size, send a NACK request. If the buffer reaches its max size, queue all frames to the core and wipe the buffer and vector. According to RFC3711, the NACK request must be sent out in a compound packet. All compound packets must start with a sender or receiver report, so some work was done to refactor the current sender / receiver code to allow it to be used without having to also include sdes information and automatically send the report. Also added additional functionality to ast_data_buffer, along with some testing. For more information, refer to the wiki page: https://wiki.asterisk.org/wiki/display/AST/WebRTC+User+Experience+Improvements ASTERISK-27810 #close Change-Id: Idab644b08a1593659c92cda64132ccc203fe991d --- include/asterisk/data_buffer.h | 29 + main/data_buffer.c | 54 ++ res/res_rtp_asterisk.c | 1197 ++++++++++++++++++++++---------- tests/test_data_buffer.c | 32 + 4 files changed, 931 insertions(+), 381 deletions(-) diff --git a/include/asterisk/data_buffer.h b/include/asterisk/data_buffer.h index dacbaa5e4e..66aad2722a 100644 --- a/include/asterisk/data_buffer.h +++ b/include/asterisk/data_buffer.h @@ -110,6 +110,35 @@ int ast_data_buffer_put(struct ast_data_buffer *buffer, size_t pos, void *payloa */ void *ast_data_buffer_get(const struct ast_data_buffer *buffer, size_t pos); +/*! + * \brief Remove a data payload from the data buffer + * + * \param buffer The data buffer + * \param pos The position of the data payload + * + * \retval non-NULL success + * \retval NULL failure + * + * \note This DOES remove the data payload from the data buffer. It does not free it, though. + * + * \since 15.5.0 + */ +void *ast_data_buffer_remove(struct ast_data_buffer *buffer, size_t pos); + +/*! + * \brief Remove the first payload from the data buffer + * + * \param buffer The data buffer + * + * \retval non-NULL success + * \retval NULL failure + * + * \note This DOES remove the data payload from the data buffer. + * + * \since 15.5.0 + */ +void *ast_data_buffer_remove_head(struct ast_data_buffer *buffer); + /*! * \brief Free a data buffer (and all held data payloads) * diff --git a/main/data_buffer.c b/main/data_buffer.c index ccbffd22da..cfc323c680 100644 --- a/main/data_buffer.c +++ b/main/data_buffer.c @@ -281,6 +281,60 @@ void *ast_data_buffer_get(const struct ast_data_buffer *buffer, size_t pos) return NULL; } +static void data_buffer_free_buffer_payload(struct ast_data_buffer *buffer, + struct data_buffer_payload_entry *buffer_payload) +{ + buffer_payload->payload = NULL; + buffer->count--; + + if (buffer->cache_count < CACHED_PAYLOAD_MAX + && buffer->cache_count < (buffer->max - buffer->count)) { + AST_LIST_INSERT_TAIL(&buffer->cached_payloads, buffer_payload, list); + buffer->cache_count++; + } else { + ast_free(buffer_payload); + } +} + +void *ast_data_buffer_remove(struct ast_data_buffer *buffer, size_t pos) +{ + struct data_buffer_payload_entry *buffer_payload; + + ast_assert(buffer != NULL); + + AST_LIST_TRAVERSE_SAFE_BEGIN(&buffer->payloads, buffer_payload, list) { + if (buffer_payload->pos == pos) { + void *payload = buffer_payload->payload; + + AST_LIST_REMOVE_CURRENT(list); + data_buffer_free_buffer_payload(buffer, buffer_payload); + + return payload; + } + } + AST_LIST_TRAVERSE_SAFE_END; + + return NULL; +} + +void *ast_data_buffer_remove_head(struct ast_data_buffer *buffer) +{ + ast_assert(buffer != NULL); + + if (buffer->count > 0) { + struct data_buffer_payload_entry *buffer_payload; + void *payload; + + buffer_payload = AST_LIST_REMOVE_HEAD(&buffer->payloads, list); + payload = buffer_payload->payload; + data_buffer_free_buffer_payload(buffer, buffer_payload); + + return payload; + } + + return NULL; +} + void ast_data_buffer_free(struct ast_data_buffer *buffer) { struct data_buffer_payload_entry *buffer_payload; diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c index f6e26d6ae5..36b9cb9a41 100644 --- a/res/res_rtp_asterisk.c +++ b/res/res_rtp_asterisk.c @@ -103,7 +103,8 @@ #define TURN_STATE_WAIT_TIME 2000 -#define DEFAULT_RTP_BUFFER_SIZE 250 +#define DEFAULT_RTP_SEND_BUFFER_SIZE 250 +#define DEFAULT_RTP_RECV_BUFFER_SIZE 20 /*! Full INTRA-frame Request / Fast Update Request (From RFC2032) */ #define RTCP_PT_FUR 192 @@ -323,6 +324,8 @@ struct ast_rtp { unsigned int lastotexttimestamp; unsigned int lasteventseqn; int lastrxseqno; /*!< Last received sequence number, from the network */ + int expectedrxseqno; /*!< Next expected sequence number, from the network */ + AST_VECTOR(, int) missing_seqno; /*!< A vector of sequence numbers we never received */ int expectedseqno; /*!< Next expected sequence number, from the core */ unsigned short seedrxseqno; /*!< What sequence number did they start with?*/ unsigned int seedrxts; /*!< What RTP timestamp did they start with? */ @@ -388,6 +391,7 @@ struct ast_rtp { struct rtp_red *red; struct ast_data_buffer *send_buffer; /*!< Buffer for storing sent packets for retransmission */ + struct ast_data_buffer *recv_buffer; /*!< Buffer for storing received packets for retransmission */ #ifdef HAVE_PJPROJECT ast_cond_t cond; /*!< ICE/TURN condition for signaling */ @@ -2771,6 +2775,18 @@ static int dtls_srtp_setup(struct ast_rtp *rtp, struct ast_srtp *srtp, struct as } #endif +/*! \brief Helper function to compare an elem in a vector by value */ +static int compare_by_value(int elem, int value) +{ + return elem - value; +} + +/*! \brief Helper function to find an elem in a vector by value */ +static int find_by_value(int elem, int value) +{ + return elem == value; +} + static int rtcp_mux(struct ast_rtp *rtp, const unsigned char *packet) { uint8_t version; @@ -2941,11 +2957,6 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s } #endif - if ((*in & 0xC0) && res_srtp && srtp && res_srtp->unprotect( - srtp, buf, &len, rtcp || rtcp_mux(rtp, buf)) < 0) { - return -1; - } - return len; } @@ -3629,6 +3640,7 @@ static int ast_rtp_new(struct ast_rtp_instance *instance, rtp->ssrc = ast_random(); ast_uuid_generate_str(rtp->cname, sizeof(rtp->cname)); rtp->seqno = ast_random() & 0x7fff; + rtp->expectedrxseqno = -1; rtp->expectedseqno = -1; rtp->sched = sched; ast_sockaddr_copy(&rtp->bind_address, addr); @@ -3713,10 +3725,16 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance) ast_data_buffer_free(rtp->send_buffer); } + /* Destroy the recv buffer if it was being used */ + if (rtp->recv_buffer) { + ast_data_buffer_free(rtp->recv_buffer); + } + ao2_cleanup(rtp->lasttxformat); ao2_cleanup(rtp->lastrxformat); ao2_cleanup(rtp->f.subclass.format); AST_VECTOR_FREE(&rtp->ssrc_mapping); + AST_VECTOR_FREE(&rtp->missing_seqno); /* Finally destroy ourselves */ ast_free(rtp); @@ -3983,6 +4001,9 @@ static void ast_rtp_change_source(struct ast_rtp_instance *instance) rtp->ssrc = ssrc; + /* Since the source is changing, we don't know what sequence number to expect next */ + rtp->expectedrxseqno = -1; + return; } @@ -4078,39 +4099,24 @@ static void calculate_lost_packet_statistics(struct ast_rtp *rtp, rtp->rtcp->rxlost_count++; } -/*! - * \brief Send RTCP SR or RR report - * - * \pre instance is locked - */ -static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) +static int ast_rtcp_generate_report(struct ast_rtp_instance *instance, unsigned char *rtcpheader, + struct ast_rtp_rtcp_report *rtcp_report, int *sr) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); - RAII_VAR(struct ast_json *, message_blob, NULL, ast_json_unref); - int res; int len = 0; - uint16_t sdes_packet_len_bytes, sdes_packet_len_rounded; struct timeval now; unsigned int now_lsw; unsigned int now_msw; - unsigned char *rtcpheader; unsigned int lost_packets; int fraction_lost; struct timeval dlsr = { 0, }; - unsigned char bdata[AST_UUID_STR_LEN + 128] = ""; /* More than enough */ - int rate = rtp_get_rate(rtp->f.subclass.format); - int ice; - struct ast_sockaddr remote_address = { { 0, } }; struct ast_rtp_rtcp_report_block *report_block = NULL; - RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, - ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0), - ao2_cleanup); if (!rtp || !rtp->rtcp) { return 0; } - if (ast_sockaddr_isnull(&rtp->rtcp->them)) { /* This'll stop rtcp for this rtp session */ + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { /* This'll stop rtcp for this rtp session */ /* RTCP was stopped. */ return 0; } @@ -4119,14 +4125,16 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) return 1; } + *sr = rtp->txcount > rtp->rtcp->lastsrtxcount ? 1 : 0; + /* Compute statistics */ calculate_lost_packet_statistics(rtp, &lost_packets, &fraction_lost); gettimeofday(&now, NULL); rtcp_report->reception_report_count = rtp->themssrc_valid ? 1 : 0; rtcp_report->ssrc = rtp->ssrc; - rtcp_report->type = sr ? RTCP_PT_SR : RTCP_PT_RR; - if (sr) { + rtcp_report->type = *sr ? RTCP_PT_SR : RTCP_PT_RR; + if (*sr) { rtcp_report->sender_information.ntp_timestamp = now; rtcp_report->sender_information.rtp_timestamp = rtp->lastts; rtcp_report->sender_information.packet_count = rtp->txcount; @@ -4144,7 +4152,7 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) report_block->lost_count.fraction = (fraction_lost & 0xff); report_block->lost_count.packets = (lost_packets & 0xffffff); report_block->highest_seq_no = (rtp->cycles | (rtp->lastrxseqno & 0xffff)); - report_block->ia_jitter = (unsigned int)(rtp->rxjitter * rate); + report_block->ia_jitter = (unsigned int)(rtp->rxjitter * rtp_get_rate(rtp->f.subclass.format)); report_block->lsr = rtp->rtcp->themrxlsr; /* If we haven't received an SR report, DLSR should be 0 */ if (!ast_tvzero(rtp->rtcp->rxlsr)) { @@ -4153,11 +4161,10 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) } } timeval2ntp(rtcp_report->sender_information.ntp_timestamp, &now_msw, &now_lsw); - rtcpheader = bdata; put_unaligned_uint32(rtcpheader + 4, htonl(rtcp_report->ssrc)); /* Our SSRC */ len += 8; - if (sr) { - put_unaligned_uint32(rtcpheader + len, htonl(now_msw)); /* now, MSW. gettimeofday() + SEC_BETWEEN_1900_AND_1970*/ + if (*sr) { + put_unaligned_uint32(rtcpheader + len, htonl(now_msw)); /* now, MSW. gettimeofday() + SEC_BETWEEN_1900_AND_1970 */ put_unaligned_uint32(rtcpheader + len + 4, htonl(now_lsw)); /* now, LSW */ put_unaligned_uint32(rtcpheader + len + 8, htonl(rtcp_report->sender_information.rtp_timestamp)); put_unaligned_uint32(rtcpheader + len + 12, htonl(rtcp_report->sender_information.packet_count)); @@ -4175,53 +4182,32 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) } put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (rtcp_report->reception_report_count << 24) - | ((sr ? RTCP_PT_SR : RTCP_PT_RR) << 16) | ((len/4)-1))); - - sdes_packet_len_bytes = - 4 + /* RTCP Header */ - 4 + /* SSRC */ - 1 + /* Type (CNAME) */ - 1 + /* Text Length */ - AST_UUID_STR_LEN /* Text and NULL terminator */ - ; - - /* Round to 32 bit boundary */ - sdes_packet_len_rounded = (sdes_packet_len_bytes + 3) & ~0x3; + | ((*sr ? RTCP_PT_SR : RTCP_PT_RR) << 16) | ((len/4)-1))); - put_unaligned_uint32(rtcpheader + len, htonl((2 << 30) | (1 << 24) | (RTCP_PT_SDES << 16) | ((sdes_packet_len_rounded / 4) - 1))); - put_unaligned_uint32(rtcpheader + len + 4, htonl(rtcp_report->ssrc)); - rtcpheader[len + 8] = 0x01; /* CNAME */ - rtcpheader[len + 9] = AST_UUID_STR_LEN - 1; /* Number of bytes of text */ - memcpy(rtcpheader + len + 10, rtp->cname, AST_UUID_STR_LEN); - len += 10 + AST_UUID_STR_LEN; + return len; +} - /* Padding - Note that we don't set the padded bit on the packet. From - * RFC 3550 Section 6.5: - * - * No length octet follows the null item type octet, but additional null - * octets MUST be included if needed to pad until the next 32-bit - * boundary. Note that this padding is separate from that indicated by - * the P bit in the RTCP header. - * - * These bytes will already be zeroed out during array initialization. - */ - len += (sdes_packet_len_rounded - sdes_packet_len_bytes); +static int ast_rtcp_calculate_sr_rr_statistics(struct ast_rtp_instance *instance, + struct ast_rtp_rtcp_report *rtcp_report, struct ast_sockaddr remote_address, int ice, int sr) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct ast_rtp_rtcp_report_block *report_block = NULL; + RAII_VAR(struct ast_json *, message_blob, NULL, ast_json_unref); - if (rtp->bundled) { - ast_rtp_instance_get_remote_address(instance, &remote_address); - } else { - ast_sockaddr_copy(&remote_address, &rtp->rtcp->them); + if (!rtp || !rtp->rtcp) { + return 0; } - res = rtcp_sendto(instance, (unsigned int *)rtcpheader, len, 0, &remote_address, &ice); - if (res < 0) { - ast_log(LOG_ERROR, "RTCP %s transmission error to %s, rtcp halted %s\n", - sr ? "SR" : "RR", - ast_sockaddr_stringify(&rtp->rtcp->them), - strerror(errno)); + + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { return 0; } - /* Update RTCP SR/RR statistics */ + if (!rtcp_report) { + return -1; + } + + report_block = rtcp_report->report_block[0]; + if (sr) { rtp->rtcp->txlsr = rtcp_report->sender_information.ntp_timestamp; rtp->rtcp->sr_count++; @@ -4248,7 +4234,7 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) ast_verbose(" Fraction lost: %d\n", report_block->lost_count.fraction); ast_verbose(" Cumulative loss: %u\n", report_block->lost_count.packets); ast_verbose(" Highest seq no: %u\n", report_block->highest_seq_no); - ast_verbose(" IA jitter: %.4f\n", (double)report_block->ia_jitter / rate); + ast_verbose(" IA jitter: %.4f\n", (double)report_block->ia_jitter / rtp_get_rate(rtp->f.subclass.format)); ast_verbose(" Their last SR: %u\n", report_block->lsr); ast_verbose(" DLSR: %4.4f (sec)\n\n", (double)(report_block->dlsr / 65536.0)); } @@ -4258,9 +4244,121 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) "to", ast_sockaddr_stringify(&remote_address), "from", rtp->rtcp->local_addr_str); ast_rtp_publish_rtcp_message(instance, ast_rtp_rtcp_sent_type(), - rtcp_report, - message_blob); - return res; + rtcp_report, message_blob); + + return 1; +} + +static int ast_rtcp_generate_sdes(struct ast_rtp_instance *instance, unsigned char *rtcpheader, + struct ast_rtp_rtcp_report *rtcp_report) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + int len = 0; + uint16_t sdes_packet_len_bytes; + uint16_t sdes_packet_len_rounded; + + if (!rtp || !rtp->rtcp) { + return 0; + } + + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { + return 0; + } + + if (!rtcp_report) { + return -1; + } + + sdes_packet_len_bytes = + 4 + /* RTCP Header */ + 4 + /* SSRC */ + 1 + /* Type (CNAME) */ + 1 + /* Text Length */ + AST_UUID_STR_LEN /* Text and NULL terminator */ + ; + + /* Round to 32 bit boundary */ + sdes_packet_len_rounded = (sdes_packet_len_bytes + 3) & ~0x3; + + put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (1 << 24) | (RTCP_PT_SDES << 16) | ((sdes_packet_len_rounded / 4) - 1))); + put_unaligned_uint32(rtcpheader + 4, htonl(rtcp_report->ssrc)); + rtcpheader[8] = 0x01; /* CNAME */ + rtcpheader[9] = AST_UUID_STR_LEN - 1; /* Number of bytes of text */ + memcpy(rtcpheader + 10, rtp->cname, AST_UUID_STR_LEN); + len += 10 + AST_UUID_STR_LEN; + + /* Padding - Note that we don't set the padded bit on the packet. From + * RFC 3550 Section 6.5: + * + * No length octet follows the null item type octet, but additional null + * octets MUST be included if needd to pad until the next 32-bit + * boundary. Note that this padding is separate from that indicated by + * the P bit in the RTCP header. + * + * These bytes will already be zeroed out during array initialization. + */ + len += (sdes_packet_len_rounded - sdes_packet_len_bytes); + + return len; +} + +static int ast_rtcp_generate_nack(struct ast_rtp_instance *instance, unsigned char *rtcpheader) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + int packet_len; + int blp_index; + int current_seqno; + int seqno; + unsigned int fci; + + if (!rtp || !rtp->rtcp) { + return 0; + } + + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { + return 0; + } + + current_seqno = rtp->expectedrxseqno; + seqno = rtp->lastrxseqno; + packet_len = 12; /* The header length is 12 (version line, packet source SSRC, media source SSRC) */ + + /* Get the missing sequence numbers for the FCI section of the NACK request */ + for (blp_index = 0, fci = 0; current_seqno < seqno; current_seqno++, blp_index++) { + int *missing_seqno; + + missing_seqno = AST_VECTOR_GET_CMP(&rtp->missing_seqno, current_seqno, + find_by_value); + + if (!missing_seqno) { + continue; + } + + /* We hit the max blp size, reset */ + if (blp_index >= 17) { + put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); + fci = 0; + blp_index = 0; + packet_len += 4; + } + + if (blp_index == 0) { + fci |= (current_seqno << 16); + } else { + fci |= (1 << (blp_index - 1)); + } + } + + put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); + packet_len += 4; + + /* Length MUST be 2+n, where n is the number of NACKs. Same as length in words minus 1 */ + put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (AST_RTP_RTCP_FMT_NACK << 24) + | (AST_RTP_RTCP_RTPFB << 16) | ((packet_len / 4) - 1))); + put_unaligned_uint32(rtcpheader + 4, htonl(rtp->ssrc)); + put_unaligned_uint32(rtcpheader + 8, htonl(rtp->themssrc)); + + return packet_len; } /*! @@ -4276,6 +4374,15 @@ static int ast_rtcp_write(const void *data) struct ast_rtp_instance *instance = (struct ast_rtp_instance *) data; struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); int res; + int sr = 0; + int packet_len = 0; + int ice; + struct ast_sockaddr remote_address = { { 0, } }; + unsigned char *rtcpheader; + unsigned char bdata[AST_UUID_STR_LEN + 128] = ""; /* More than enough */ + RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, + ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0), + ao2_cleanup); if (!rtp || !rtp->rtcp || rtp->rtcp->schedid == -1) { ao2_ref(instance, -1); @@ -4283,13 +4390,44 @@ static int ast_rtcp_write(const void *data) } ao2_lock(instance); - if (rtp->txcount > rtp->rtcp->lastsrtxcount) { - /* Send an SR */ - res = ast_rtcp_write_report(instance, 1); + rtcpheader = bdata; + + res = ast_rtcp_generate_report(instance, rtcpheader, rtcp_report, &sr); + + if (res == 0 || res == 1) { + ast_debug(1, "Failed to add %s report to RTCP packet!\n", sr ? "SR" : "RR"); + goto cleanup; + } + + packet_len += res; + + res = ast_rtcp_generate_sdes(instance, rtcpheader + packet_len, rtcp_report); + + if (res == 0 || res == 1) { + ast_debug(1, "Failed to add SDES to RTCP packet!\n"); + goto cleanup; + } + + packet_len += res; + + if (rtp->bundled) { + ast_rtp_instance_get_remote_address(instance, &remote_address); + } else { + ast_sockaddr_copy(&remote_address, &rtp->rtcp->them); + } + + res = rtcp_sendto(instance, (unsigned int *)rtcpheader, packet_len, 0, &remote_address, &ice); + if (res < 0) { + ast_log(LOG_ERROR, "RTCP %s transmission error to %s, rtcp halted %s\n", + sr ? "SR" : "RR", + ast_sockaddr_stringify(&rtp->rtcp->them), + strerror(errno)); + res = 0; } else { - /* Send an RR */ - res = ast_rtcp_write_report(instance, 0); + ast_rtcp_calculate_sr_rr_statistics(instance, rtcp_report, remote_address, ice, sr); } + +cleanup: ao2_unlock(instance); if (!res) { @@ -4872,7 +5010,7 @@ static struct ast_frame *create_dtmf_frame(struct ast_rtp_instance *instance, en return &rtp->f; } -static void process_dtmf_rfc2833(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, struct ast_sockaddr *addr, int payloadtype, int mark, struct frame_list *frames) +static void process_dtmf_rfc2833(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, int payloadtype, int mark, struct frame_list *frames) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); struct ast_sockaddr remote_address = { {0,} }; @@ -5007,7 +5145,7 @@ static void process_dtmf_rfc2833(struct ast_rtp_instance *instance, unsigned cha return; } -static struct ast_frame *process_dtmf_cisco(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, struct ast_sockaddr *addr, int payloadtype, int mark) +static struct ast_frame *process_dtmf_cisco(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, int payloadtype, int mark) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); unsigned int event, flags, power; @@ -5087,7 +5225,7 @@ static struct ast_frame *process_dtmf_cisco(struct ast_rtp_instance *instance, u return f; } -static struct ast_frame *process_cn_rfc3389(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, struct ast_sockaddr *addr, int payloadtype, int mark) +static struct ast_frame *process_cn_rfc3389(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, int payloadtype, int mark) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); @@ -5438,10 +5576,12 @@ static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned #define RTCP_FB_REMB_BLOCK_WORD_LENGTH 4 #define RTCP_FB_NACK_BLOCK_WORD_LENGTH 2 -static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, const unsigned char *rtcpdata, size_t size, struct ast_sockaddr *addr) +static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, struct ast_srtp *srtp, + const unsigned char *rtcpdata, size_t size, struct ast_sockaddr *addr) { struct ast_rtp_instance *transport = instance; struct ast_rtp *transport_rtp = ast_rtp_instance_get_data(instance); + int len = size; unsigned int *rtcpheader = (unsigned int *)(rtcpdata); unsigned int packetwords; unsigned int position; @@ -5451,10 +5591,16 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c struct ast_rtp_rtcp_report_block *report_block; struct ast_frame *f = &ast_null_frame; - packetwords = size / 4; + /* If this is encrypted then decrypt the payload */ + if ((*rtcpheader & 0xC0) && res_srtp && srtp && res_srtp->unprotect( + srtp, rtcpheader, &len, 1) < 0) { + return &ast_null_frame; + } + + packetwords = len / 4; - ast_debug(1, "Got RTCP report of %zu bytes from %s\n", - size, ast_sockaddr_stringify(addr)); + ast_debug(1, "Got RTCP report of %d bytes from %s\n", + len, ast_sockaddr_stringify(addr)); /* * Validate the RTCP packet according to an adapted and slightly @@ -5899,6 +6045,7 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c static struct ast_frame *ast_rtcp_read(struct ast_rtp_instance *instance) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance, 1); struct ast_sockaddr addr; unsigned char rtcpdata[8192 + AST_FRIENDLY_OFFSET]; unsigned char *read_area = rtcpdata + AST_FRIENDLY_OFFSET; @@ -5950,7 +6097,7 @@ static struct ast_frame *ast_rtcp_read(struct ast_rtp_instance *instance) return &ast_null_frame; } - return ast_rtcp_interpret(instance, read_area, res, &addr); + return ast_rtcp_interpret(instance, srtp, read_area, res, &addr); } /*! \pre instance is locked */ @@ -6123,78 +6270,362 @@ static void rtp_instance_unlock(struct ast_rtp_instance *instance) } } -/*! \pre instance is locked */ -static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtcp) +static struct ast_frame *ast_rtp_interpret(struct ast_rtp_instance *instance, struct ast_srtp *srtp, + const struct ast_sockaddr *remote_address, unsigned char *read_area, int length, int prev_seqno) { + unsigned int *rtpheader = (unsigned int*)(read_area); struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); struct ast_rtp_instance *instance1; - RAII_VAR(struct ast_rtp_instance *, child, NULL, rtp_instance_unlock); - struct ast_sockaddr addr; - int res, hdrlen = 12, version, payloadtype, padding, mark, ext, cc, prev_seqno; - unsigned char *read_area = rtp->rawdata + AST_FRIENDLY_OFFSET; - size_t read_area_size = sizeof(rtp->rawdata) - AST_FRIENDLY_OFFSET; - unsigned int *rtpheader = (unsigned int*)(read_area), seqno, ssrc, timestamp; + int res = length, hdrlen = 12, seqno, timestamp, payloadtype, padding, mark, ext, cc; RAII_VAR(struct ast_rtp_payload_type *, payload, NULL, ao2_cleanup); - struct ast_sockaddr remote_address = { {0,} }; struct frame_list frames; - /* If this is actually RTCP let's hop on over and handle it */ - if (rtcp) { - if (rtp->rtcp && rtp->rtcp->type == AST_RTP_INSTANCE_RTCP_STANDARD) { - return ast_rtcp_read(instance); - } + /* If this payload is encrypted then decrypt it using the given SRTP instance */ + if ((*read_area & 0xC0) && res_srtp && srtp && res_srtp->unprotect( + srtp, read_area, &res, 0) < 0) { return &ast_null_frame; } - /* Actually read in the data from the socket */ - if ((res = rtp_recvfrom(instance, read_area, read_area_size, 0, - &addr)) < 0) { - if (res == RTP_DTLS_ESTABLISHED) { - rtp->f.frametype = AST_FRAME_CONTROL; - rtp->f.subclass.integer = AST_CONTROL_SRCCHANGE; - return &rtp->f; - } - - ast_assert(errno != EBADF); - if (errno != EAGAIN) { - ast_log(LOG_WARNING, "RTP Read error: %s. Hanging up.\n", - (errno) ? strerror(errno) : "Unspecified"); - return NULL; - } - return &ast_null_frame; + /* If we are currently sending DTMF to the remote party send a continuation packet */ + if (rtp->sending_digit) { + ast_rtp_dtmf_continuation(instance); } - /* If this was handled by the ICE session don't do anything */ - if (!res) { - return &ast_null_frame; + /* Pull out the various other fields we will need */ + seqno = ntohl(rtpheader[0]); + payloadtype = (seqno & 0x7f0000) >> 16; + padding = seqno & (1 << 29); + mark = seqno & (1 << 23); + ext = seqno & (1 << 28); + cc = (seqno & 0xF000000) >> 24; + seqno &= 0xffff; + timestamp = ntohl(rtpheader[1]); + + AST_LIST_HEAD_INIT_NOLOCK(&frames); + + /* Remove any padding bytes that may be present */ + if (padding) { + res -= read_area[res - 1]; } - /* This could be a multiplexed RTCP packet. If so, be sure to interpret it correctly */ - if (rtcp_mux(rtp, read_area)) { - return ast_rtcp_interpret(instance, read_area, res, &addr); + /* Skip over any CSRC fields */ + if (cc) { + hdrlen += cc * 4; } - /* Make sure the data that was read in is actually enough to make up an RTP packet */ - if (res < hdrlen) { - /* If this is a keepalive containing only nulls, don't bother with a warning */ - int i; - for (i = 0; i < res; ++i) { - if (read_area[i] != '\0') { - ast_log(LOG_WARNING, "RTP Read too short\n"); - return &ast_null_frame; + /* Look for any RTP extensions, currently we do not support any */ + if (ext) { + hdrlen += (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2; + hdrlen += 4; + if (DEBUG_ATLEAST(1)) { + unsigned int profile; + profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16; + if (profile == 0x505a) { + ast_log(LOG_DEBUG, "Found Zfone extension in RTP stream - zrtp - not supported.\n"); + } else if (profile != 0xbede) { + /* SDP negotiated RTP extensions can not currently be output in logging */ + ast_log(LOG_DEBUG, "Found unknown RTP Extensions %x\n", profile); } } - return &ast_null_frame; } - /* Get fields and verify this is an RTP packet */ - seqno = ntohl(rtpheader[0]); - - ast_rtp_instance_get_remote_address(instance, &remote_address); + /* Make sure after we potentially mucked with the header length that it is once again valid */ + if (res < hdrlen) { + ast_log(LOG_WARNING, "RTP Read too short (%d, expecting %d\n", res, hdrlen); + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } - if (!(version = (seqno & 0xC0000000) >> 30)) { - struct sockaddr_in addr_tmp; - struct ast_sockaddr addr_v4; + rtp->rxcount++; + rtp->rxoctetcount += (res - hdrlen); + if (rtp->rxcount == 1) { + rtp->seedrxseqno = seqno; + } + + /* Do not schedule RR if RTCP isn't run */ + if (rtp->rtcp && !ast_sockaddr_isnull(&rtp->rtcp->them) && rtp->rtcp->schedid < 0) { + /* Schedule transmission of Receiver Report */ + ao2_ref(instance, +1); + rtp->rtcp->schedid = ast_sched_add(rtp->sched, ast_rtcp_calc_interval(rtp), ast_rtcp_write, instance); + if (rtp->rtcp->schedid < 0) { + ao2_ref(instance, -1); + ast_log(LOG_WARNING, "scheduling RTCP transmission failed.\n"); + } + } + if ((int)rtp->lastrxseqno - (int)seqno > 100) /* if so it would indicate that the sender cycled; allow for misordering */ + rtp->cycles += RTP_SEQ_MOD; + + /* If we are directly bridged to another instance send the audio directly out, + * but only after updating core information about the received traffic so that + * outgoing RTCP reflects it. + */ + instance1 = ast_rtp_instance_get_bridged(instance); + if (instance1 + && !bridge_p2p_rtp_write(instance, instance1, rtpheader, res, hdrlen)) { + struct timeval rxtime; + struct ast_frame *f; + + /* Update statistics for jitter so they are correct in RTCP */ + calc_rxstamp(&rxtime, rtp, timestamp, mark); + + /* When doing P2P we don't need to raise any frames about SSRC change to the core */ + while ((f = AST_LIST_REMOVE_HEAD(&frames, frame_list)) != NULL) { + ast_frfree(f); + } + + return &ast_null_frame; + } + + payload = ast_rtp_codecs_get_payload(ast_rtp_instance_get_codecs(instance), payloadtype); + if (!payload) { + /* Unknown payload type. */ + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } + + /* If the payload is not actually an Asterisk one but a special one pass it off to the respective handler */ + if (!payload->asterisk_format) { + struct ast_frame *f = NULL; + if (payload->rtp_code == AST_RTP_DTMF) { + /* process_dtmf_rfc2833 may need to return multiple frames. We do this + * by passing the pointer to the frame list to it so that the method + * can append frames to the list as needed. + */ + process_dtmf_rfc2833(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, payloadtype, mark, &frames); + } else if (payload->rtp_code == AST_RTP_CISCO_DTMF) { + f = process_dtmf_cisco(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, payloadtype, mark); + } else if (payload->rtp_code == AST_RTP_CN) { + f = process_cn_rfc3389(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, payloadtype, mark); + } else { + ast_log(LOG_NOTICE, "Unknown RTP codec %d received from '%s'\n", + payloadtype, + ast_sockaddr_stringify(remote_address)); + } + + if (f) { + AST_LIST_INSERT_TAIL(&frames, f, frame_list); + } + /* Even if no frame was returned by one of the above methods, + * we may have a frame to return in our frame list + */ + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } + + ao2_replace(rtp->lastrxformat, payload->format); + ao2_replace(rtp->f.subclass.format, payload->format); + switch (ast_format_get_type(rtp->f.subclass.format)) { + case AST_MEDIA_TYPE_AUDIO: + rtp->f.frametype = AST_FRAME_VOICE; + break; + case AST_MEDIA_TYPE_VIDEO: + rtp->f.frametype = AST_FRAME_VIDEO; + break; + case AST_MEDIA_TYPE_TEXT: + rtp->f.frametype = AST_FRAME_TEXT; + break; + case AST_MEDIA_TYPE_IMAGE: + /* Fall through */ + default: + ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", + ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); + return &ast_null_frame; + } + rtp->rxseqno = seqno; + + if (rtp->dtmf_timeout && rtp->dtmf_timeout < timestamp) { + rtp->dtmf_timeout = 0; + + if (rtp->resp) { + struct ast_frame *f; + f = create_dtmf_frame(instance, AST_FRAME_DTMF_END, 0); + f->len = ast_tvdiff_ms(ast_samp2tv(rtp->dtmf_duration, rtp_get_rate(f->subclass.format)), ast_tv(0, 0)); + rtp->resp = 0; + rtp->dtmf_timeout = rtp->dtmf_duration = 0; + AST_LIST_INSERT_TAIL(&frames, f, frame_list); + return AST_LIST_FIRST(&frames); + } + } + + rtp->lastrxts = timestamp; + + rtp->f.src = "RTP"; + rtp->f.mallocd = 0; + rtp->f.datalen = res - hdrlen; + rtp->f.data.ptr = read_area + hdrlen; + rtp->f.offset = hdrlen + AST_FRIENDLY_OFFSET; + ast_set_flag(&rtp->f, AST_FRFLAG_HAS_SEQUENCE_NUMBER); + rtp->f.seqno = seqno; + rtp->f.stream_num = rtp->stream_num; + + if ((ast_format_cmp(rtp->f.subclass.format, ast_format_t140) == AST_FORMAT_CMP_EQUAL) + && ((int)seqno - (prev_seqno + 1) > 0) + && ((int)seqno - (prev_seqno + 1) < 10)) { + unsigned char *data = rtp->f.data.ptr; + + memmove(rtp->f.data.ptr+3, rtp->f.data.ptr, rtp->f.datalen); + rtp->f.datalen +=3; + *data++ = 0xEF; + *data++ = 0xBF; + *data = 0xBD; + } + + if (ast_format_cmp(rtp->f.subclass.format, ast_format_t140_red) == AST_FORMAT_CMP_EQUAL) { + unsigned char *data = rtp->f.data.ptr; + unsigned char *header_end; + int num_generations; + int header_length; + int len; + int diff =(int)seqno - (prev_seqno+1); /* if diff = 0, no drop*/ + int x; + + ao2_replace(rtp->f.subclass.format, ast_format_t140); + header_end = memchr(data, ((*data) & 0x7f), rtp->f.datalen); + if (header_end == NULL) { + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } + header_end++; + + header_length = header_end - data; + num_generations = header_length / 4; + len = header_length; + + if (!diff) { + for (x = 0; x < num_generations; x++) + len += data[x * 4 + 3]; + + if (!(rtp->f.datalen - len)) + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + + rtp->f.data.ptr += len; + rtp->f.datalen -= len; + } else if (diff > num_generations && diff < 10) { + len -= 3; + rtp->f.data.ptr += len; + rtp->f.datalen -= len; + + data = rtp->f.data.ptr; + *data++ = 0xEF; + *data++ = 0xBF; + *data = 0xBD; + } else { + for ( x = 0; x < num_generations - diff; x++) + len += data[x * 4 + 3]; + + rtp->f.data.ptr += len; + rtp->f.datalen -= len; + } + } + + if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_AUDIO) { + rtp->f.samples = ast_codec_samples_count(&rtp->f); + if (ast_format_cache_is_slinear(rtp->f.subclass.format)) { + ast_frame_byteswap_be(&rtp->f); + } + calc_rxstamp(&rtp->f.delivery, rtp, timestamp, mark); + /* Add timing data to let ast_generic_bridge() put the frame into a jitterbuf */ + ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); + rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); + rtp->f.len = rtp->f.samples / ((ast_format_get_sample_rate(rtp->f.subclass.format) / 1000)); + } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_VIDEO) { + /* Video -- samples is # of samples vs. 90000 */ + if (!rtp->lastividtimestamp) + rtp->lastividtimestamp = timestamp; + ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); + rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); + rtp->f.samples = timestamp - rtp->lastividtimestamp; + rtp->lastividtimestamp = timestamp; + rtp->f.delivery.tv_sec = 0; + rtp->f.delivery.tv_usec = 0; + /* Pass the RTP marker bit as bit */ + rtp->f.subclass.frame_ending = mark ? 1 : 0; + } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_TEXT) { + /* TEXT -- samples is # of samples vs. 1000 */ + if (!rtp->lastitexttimestamp) + rtp->lastitexttimestamp = timestamp; + rtp->f.samples = timestamp - rtp->lastitexttimestamp; + rtp->lastitexttimestamp = timestamp; + rtp->f.delivery.tv_sec = 0; + rtp->f.delivery.tv_usec = 0; + } else { + ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", + ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); + return &ast_null_frame; + } + + AST_LIST_INSERT_TAIL(&frames, &rtp->f, frame_list); + return AST_LIST_FIRST(&frames); +} + +/*! \pre instance is locked */ +static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtcp) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct ast_srtp *srtp; + RAII_VAR(struct ast_rtp_instance *, child, NULL, rtp_instance_unlock); + struct ast_sockaddr addr; + int res, hdrlen = 12, version, payloadtype, mark; + unsigned char *read_area = rtp->rawdata + AST_FRIENDLY_OFFSET; + size_t read_area_size = sizeof(rtp->rawdata) - AST_FRIENDLY_OFFSET; + unsigned int *rtpheader = (unsigned int*)(read_area), seqno, ssrc, timestamp, prev_seqno; + struct ast_sockaddr remote_address = { {0,} }; + struct frame_list frames; + struct ast_frame *frame; + + /* If this is actually RTCP let's hop on over and handle it */ + if (rtcp) { + if (rtp->rtcp && rtp->rtcp->type == AST_RTP_INSTANCE_RTCP_STANDARD) { + return ast_rtcp_read(instance); + } + return &ast_null_frame; + } + + /* Actually read in the data from the socket */ + if ((res = rtp_recvfrom(instance, read_area, read_area_size, 0, + &addr)) < 0) { + if (res == RTP_DTLS_ESTABLISHED) { + rtp->f.frametype = AST_FRAME_CONTROL; + rtp->f.subclass.integer = AST_CONTROL_SRCCHANGE; + return &rtp->f; + } + + ast_assert(errno != EBADF); + if (errno != EAGAIN) { + ast_log(LOG_WARNING, "RTP Read error: %s. Hanging up.\n", + (errno) ? strerror(errno) : "Unspecified"); + return NULL; + } + return &ast_null_frame; + } + + /* If this was handled by the ICE session don't do anything */ + if (!res) { + return &ast_null_frame; + } + + /* This could be a multiplexed RTCP packet. If so, be sure to interpret it correctly */ + if (rtcp_mux(rtp, read_area)) { + return ast_rtcp_interpret(instance, ast_rtp_instance_get_srtp(instance, 1), read_area, res, &addr); + } + + /* Make sure the data that was read in is actually enough to make up an RTP packet */ + if (res < hdrlen) { + /* If this is a keepalive containing only nulls, don't bother with a warning */ + int i; + for (i = 0; i < res; ++i) { + if (read_area[i] != '\0') { + ast_log(LOG_WARNING, "RTP Read too short\n"); + return &ast_null_frame; + } + } + return &ast_null_frame; + } + + /* Get fields and verify this is an RTP packet */ + seqno = ntohl(rtpheader[0]); + + ast_rtp_instance_get_remote_address(instance, &remote_address); + + if (!(version = (seqno & 0xC0000000) >> 30)) { + struct sockaddr_in addr_tmp; + struct ast_sockaddr addr_v4; if (ast_sockaddr_is_ipv4(&addr)) { ast_sockaddr_to_sin(&addr, &addr_tmp); } else if (ast_sockaddr_ipv4_mapped(&addr, &addr_v4)) { @@ -6222,6 +6653,9 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc /* We use the SSRC to determine what RTP instance this packet is actually for */ ssrc = ntohl(rtpheader[2]); + /* We use the SRTP data from the provided instance that it came in on, not the child */ + srtp = ast_rtp_instance_get_srtp(instance, 0); + /* Determine the appropriate instance for this */ child = rtp_find_instance_by_packet_source_ssrc(instance, rtp, ssrc); if (!child) { @@ -6397,20 +6831,18 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc } } - /* If we are currently sending DTMF to the remote party send a continuation packet */ - if (rtp->sending_digit) { - ast_rtp_dtmf_continuation(instance); - } - /* Pull out the various other fields we will need */ payloadtype = (seqno & 0x7f0000) >> 16; - padding = seqno & (1 << 29); mark = seqno & (1 << 23); - ext = seqno & (1 << 28); - cc = (seqno & 0xF000000) >> 24; seqno &= 0xffff; timestamp = ntohl(rtpheader[1]); + if (rtp_debug_test_addr(&addr)) { + ast_verbose("Got RTP packet from %s (type %-2.2d, seq %-6.6u, ts %-6.6u, len %-6.6d)\n", + ast_sockaddr_stringify(&addr), + payloadtype, seqno, timestamp, res - hdrlen); + } + AST_LIST_HEAD_INIT_NOLOCK(&frames); /* Only non-bundled instances can change/learn the remote's SSRC implicitly. */ @@ -6433,280 +6865,280 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc AST_LIST_INSERT_TAIL(&frames, f, frame_list); rtp->seedrxseqno = 0; - rtp->rxcount = 0; - rtp->rxoctetcount = 0; - rtp->cycles = 0; - rtp->lastrxseqno = 0; - rtp->last_seqno = 0; - rtp->last_end_timestamp = 0; - if (rtp->rtcp) { - rtp->rtcp->expected_prior = 0; - rtp->rtcp->received_prior = 0; - } - } - - rtp->themssrc = ssrc; /* Record their SSRC to put in future RR */ - rtp->themssrc_valid = 1; - } - - /* Remove any padding bytes that may be present */ - if (padding) { - res -= read_area[res - 1]; - } - - /* Skip over any CSRC fields */ - if (cc) { - hdrlen += cc * 4; - } - - /* Look for any RTP extensions, currently we do not support any */ - if (ext) { - hdrlen += (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2; - hdrlen += 4; - if (DEBUG_ATLEAST(1)) { - unsigned int profile; - profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16; - if (profile == 0x505a) { - ast_log(LOG_DEBUG, "Found Zfone extension in RTP stream - zrtp - not supported.\n"); - } else { - ast_log(LOG_DEBUG, "Found unknown RTP Extensions %x\n", profile); + rtp->rxcount = 0; + rtp->rxoctetcount = 0; + rtp->cycles = 0; + rtp->lastrxseqno = 0; + rtp->last_seqno = 0; + rtp->last_end_timestamp = 0; + if (rtp->rtcp) { + rtp->rtcp->expected_prior = 0; + rtp->rtcp->received_prior = 0; } } - } - /* Make sure after we potentially mucked with the header length that it is once again valid */ - if (res < hdrlen) { - ast_log(LOG_WARNING, "RTP Read too short (%d, expecting %d\n", res, hdrlen); - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + rtp->themssrc = ssrc; /* Record their SSRC to put in future RR */ + rtp->themssrc_valid = 1; } - rtp->rxcount++; - rtp->rxoctetcount += (res - hdrlen); - if (rtp->rxcount == 1) { - rtp->seedrxseqno = seqno; - } + prev_seqno = rtp->lastrxseqno; + rtp->lastrxseqno = seqno; - /* Do not schedule RR if RTCP isn't run */ - if (rtp->rtcp && !ast_sockaddr_isnull(&rtp->rtcp->them) && rtp->rtcp->schedid < 0) { - /* Schedule transmission of Receiver Report */ - ao2_ref(instance, +1); - rtp->rtcp->schedid = ast_sched_add(rtp->sched, ast_rtcp_calc_interval(rtp), ast_rtcp_write, instance); - if (rtp->rtcp->schedid < 0) { - ao2_ref(instance, -1); - ast_log(LOG_WARNING, "scheduling RTCP transmission failed.\n"); + if (!rtp->recv_buffer) { + /* If there is no receive buffer then we can pass back the frame directly */ + return ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); + } else if (rtp->expectedrxseqno == -1 || seqno == rtp->expectedrxseqno) { + rtp->expectedrxseqno = seqno + 1; + + /* If there are no buffered packets that will be placed after this frame then we can + * return it directly without duplicating it. + */ + if (!ast_data_buffer_count(rtp->recv_buffer)) { + return ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); } - } - if ((int)rtp->lastrxseqno - (int)seqno > 100) /* if so it would indicate that the sender cycled; allow for misordering */ - rtp->cycles += RTP_SEQ_MOD; - prev_seqno = rtp->lastrxseqno; - rtp->lastrxseqno = seqno; + if (!AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value, + AST_VECTOR_ELEM_CLEANUP_NOOP)) { + ast_debug(2, "Packet with sequence number '%d' on RTP instance '%p' is no longer missing\n", + seqno, instance); + } + /* If we don't have the next packet after this we can directly return the frame, as there is no + * chance it will be overwritten. + */ + if (!ast_data_buffer_get(rtp->recv_buffer, seqno + 1)) { + return ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); + } - /* If we are directly bridged to another instance send the audio directly out, - * but only after updating core information about the received traffic so that - * outgoing RTCP reflects it. - */ - instance1 = ast_rtp_instance_get_bridged(instance); - if (instance1 - && !bridge_p2p_rtp_write(instance, instance1, rtpheader, res, hdrlen)) { - struct timeval rxtime; - struct ast_frame *f; + /* Otherwise we need to dupe the frame so that the potential processing of frames placed after + * it do not overwrite the data. You may be thinking that we could just add the current packet + * to the head of the frames list and avoid having to duplicate it but this would result in out + * of order packet processing by libsrtp which we are trying to avoid. + */ + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, seqno - 1)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + } - /* Update statistics for jitter so they are correct in RTCP */ - calc_rxstamp(&rxtime, rtp, timestamp, mark); + /* Add any additional packets that we have buffered and that are available */ + while (ast_data_buffer_count(rtp->recv_buffer)) { + struct ast_rtp_rtcp_nack_payload *payload; - /* When doing P2P we don't need to raise any frames about SSRC change to the core */ - while ((f = AST_LIST_REMOVE_HEAD(&frames, frame_list)) != NULL) { - ast_frfree(f); + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_remove(rtp->recv_buffer, rtp->expectedrxseqno); + if (!payload) { + break; + } + + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, rtp->expectedrxseqno - 1)); + ast_free(payload); + + if (!frame) { + /* If this packet can't be interpeted due to being out of memory we return what we have and assume + * that we will determine it is a missing packet later and NACK for it. + */ + return AST_LIST_FIRST(&frames); + } + + ast_debug(2, "Pulled buffered packet with sequence number '%d' to additionally return on RTP instance '%p'\n", + frame->seqno, instance); + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + rtp->expectedrxseqno++; } - return &ast_null_frame; - } + return AST_LIST_FIRST(&frames); + } else if ((abs(seqno - rtp->expectedrxseqno) > 100) || + ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer)) { + int inserted = 0; - if (rtp_debug_test_addr(&addr)) { - ast_verbose("Got RTP packet from %s (type %-2.2d, seq %-6.6u, ts %-6.6u, len %-6.6d)\n", - ast_sockaddr_stringify(&addr), - payloadtype, seqno, timestamp,res - hdrlen); - } + /* We have a large number of outstanding buffered packets or we've jumped far ahead in time. + * To compensate we dump what we have in the buffer and place the current packet in a logical + * spot. In the case of video we also require a full frame to give the decoding side a fighting + * chance. + */ - payload = ast_rtp_codecs_get_payload(ast_rtp_instance_get_codecs(instance), payloadtype); - if (!payload) { - /* Unknown payload type. */ - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; - } + if (rtp->rtp_source_learn.stream_type == AST_MEDIA_TYPE_VIDEO) { + ast_debug(2, "Source on RTP instance '%p' has wild gap or packet loss, sending FIR\n", + instance); + rtp_write_rtcp_fir(instance, rtp, &remote_address); + } - /* If the payload is not actually an Asterisk one but a special one pass it off to the respective handler */ - if (!payload->asterisk_format) { - struct ast_frame *f = NULL; - if (payload->rtp_code == AST_RTP_DTMF) { - /* process_dtmf_rfc2833 may need to return multiple frames. We do this - * by passing the pointer to the frame list to it so that the method - * can append frames to the list as needed. + while (ast_data_buffer_count(rtp->recv_buffer)) { + struct ast_rtp_rtcp_nack_payload *payload; + + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_remove_head(rtp->recv_buffer); + if (!payload) { + continue; + } + + /* Even when dumping the receive buffer we do our best to order things, so we ensure that the + * packet we just received is processed in the correct order, so see if we need to insert it now. */ - process_dtmf_rfc2833(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, &addr, payloadtype, mark, &frames); - } else if (payload->rtp_code == AST_RTP_CISCO_DTMF) { - f = process_dtmf_cisco(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, &addr, payloadtype, mark); - } else if (payload->rtp_code == AST_RTP_CN) { - f = process_cn_rfc3389(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, &addr, payloadtype, mark); - } else { - ast_log(LOG_NOTICE, "Unknown RTP codec %d received from '%s'\n", - payloadtype, - ast_sockaddr_stringify(&remote_address)); + if (!inserted) { + int buffer_seqno; + + buffer_seqno = ntohl(payload->buf[0]) & 0xffff; + if (seqno < buffer_seqno) { + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + rtp->expectedrxseqno = seqno + 1; + prev_seqno = seqno; + ast_debug(2, "Inserted just received packet with sequence number '%d' in correct order on RTP instance '%p'\n", + seqno, instance); + } + inserted = 1; + } + } + + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, prev_seqno)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + prev_seqno = frame->seqno; + ast_debug(2, "Emptying queue and returning packet with sequence number '%d' from RTP instance '%p'\n", + frame->seqno, instance); + } + + ast_free(payload); } - if (f) { - AST_LIST_INSERT_TAIL(&frames, f, frame_list); + if (!inserted) { + /* This current packet goes after them, and we assume that packets going forward will follow + * that new sequence number increment. It is okay for this to not be duplicated as it is guaranteed + * to be the last packet processed right now and it is also guaranteed that it will always return + * non-NULL. + */ + frame = ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + rtp->expectedrxseqno = seqno + 1; + + ast_debug(2, "Adding just received packet with sequence number '%d' to end of dumped queue on RTP instance '%p'\n", + seqno, instance); } - /* Even if no frame was returned by one of the above methods, - * we may have a frame to return in our frame list + + /* As there is such a large gap we don't want to flood the order side with missing packets, so we + * give up and start anew. */ - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; - } + AST_VECTOR_RESET(&rtp->missing_seqno, AST_VECTOR_ELEM_CLEANUP_NOOP); - ao2_replace(rtp->lastrxformat, payload->format); - ao2_replace(rtp->f.subclass.format, payload->format); - switch (ast_format_get_type(rtp->f.subclass.format)) { - case AST_MEDIA_TYPE_AUDIO: - rtp->f.frametype = AST_FRAME_VOICE; - break; - case AST_MEDIA_TYPE_VIDEO: - rtp->f.frametype = AST_FRAME_VIDEO; - break; - case AST_MEDIA_TYPE_TEXT: - rtp->f.frametype = AST_FRAME_TEXT; - break; - case AST_MEDIA_TYPE_IMAGE: - /* Fall through */ - default: - ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", - ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); + return AST_LIST_FIRST(&frames); + } else if (seqno < rtp->expectedrxseqno) { + /* If this is a packet from the past then we have received a duplicate packet, so just drop it */ + ast_debug(2, "Received an old packet with sequence number '%d' on RTP instance '%p', dropping it\n", + seqno, instance); return &ast_null_frame; - } - rtp->rxseqno = seqno; - - if (rtp->dtmf_timeout && rtp->dtmf_timeout < timestamp) { - rtp->dtmf_timeout = 0; - - if (rtp->resp) { - struct ast_frame *f; - f = create_dtmf_frame(instance, AST_FRAME_DTMF_END, 0); - f->len = ast_tvdiff_ms(ast_samp2tv(rtp->dtmf_duration, rtp_get_rate(f->subclass.format)), ast_tv(0, 0)); - rtp->resp = 0; - rtp->dtmf_timeout = rtp->dtmf_duration = 0; - AST_LIST_INSERT_TAIL(&frames, f, frame_list); - return AST_LIST_FIRST(&frames); + } else if (ast_data_buffer_get(rtp->recv_buffer, seqno)) { + /* If this is a packet we already have buffered then it is a duplicate, so just drop it */ + ast_debug(2, "Received a duplicate transmission of packet with sequence number '%d' on RTP instance '%p', dropping it\n", + seqno, instance); + return &ast_null_frame; + } else { + /* This is an out of order packet from the future */ + struct ast_rtp_rtcp_nack_payload *payload; + int difference; + + ast_debug(2, "Received an out of order packet with sequence number '%d' from the future on RTP instance '%p'\n", + seqno, instance); + + payload = ast_malloc(sizeof(*payload) + res); + if (!payload) { + /* If the payload can't be allocated then we can't defer this packet right now. + * Instead of dumping what we have we pretend we lost this packet. It will then + * get NACKed later or the existing buffer will be returned entirely. Well, we may + * try since we're seemingly out of memory. It's a bad situation all around and + * packets are likely to get lost anyway. + */ + return &ast_null_frame; } - } - rtp->lastrxts = timestamp; + payload->size = res; + memcpy(payload->buf, rtpheader, res); + ast_data_buffer_put(rtp->recv_buffer, seqno, payload); + AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value, + AST_VECTOR_ELEM_CLEANUP_NOOP); - rtp->f.src = "RTP"; - rtp->f.mallocd = 0; - rtp->f.datalen = res - hdrlen; - rtp->f.data.ptr = read_area + hdrlen; - rtp->f.offset = hdrlen + AST_FRIENDLY_OFFSET; - ast_set_flag(&rtp->f, AST_FRFLAG_HAS_SEQUENCE_NUMBER); - rtp->f.seqno = seqno; - rtp->f.stream_num = rtp->stream_num; + difference = seqno - (prev_seqno + 1); + while (difference > 0) { + /* We don't want missing sequence number duplicates. If, for some reason, + * packets are really out of order, we could end up in this scenario: + * + * We are expecting sequence number 100 + * We receive sequence number 105 + * Sequence numbers 100 through 104 get added to the vector + * We receive sequence number 101 (this section is skipped) + * We receive sequence number 103 + * Sequence number 102 is added to the vector + * + * This will prevent the duplicate from being added. + */ + if (AST_VECTOR_GET_CMP(&rtp->missing_seqno, seqno - difference, + find_by_value)) { + difference--; + continue; + } - if ((ast_format_cmp(rtp->f.subclass.format, ast_format_t140) == AST_FORMAT_CMP_EQUAL) - && ((int)seqno - (prev_seqno + 1) > 0) - && ((int)seqno - (prev_seqno + 1) < 10)) { - unsigned char *data = rtp->f.data.ptr; + ast_debug(2, "Added missing sequence number '%d' to RTP instance '%p'\n", + seqno - difference, instance); + AST_VECTOR_ADD_SORTED(&rtp->missing_seqno, seqno - difference, + compare_by_value); + difference--; + } - memmove(rtp->f.data.ptr+3, rtp->f.data.ptr, rtp->f.datalen); - rtp->f.datalen +=3; - *data++ = 0xEF; - *data++ = 0xBF; - *data = 0xBD; - } + /* When our data buffer is half full we assume that the packets aren't just out of order but + * have actually been lost. To get them back we construct and send a NACK causing the sender to + * retransmit them. + */ + if (ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer) / 2) { + int packet_len = 0; + int res = 0; + int ice; + int sr; + size_t data_size = AST_UUID_STR_LEN + 128 + (seqno - rtp->expectedrxseqno) / 17; + RAII_VAR(unsigned char *, rtcpheader, NULL, ast_free_ptr); + RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, + ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0), + ao2_cleanup); + + rtcpheader = ast_malloc(sizeof(*rtcpheader) + data_size); + if (!rtcpheader) { + ast_debug(1, "Failed to allocate memory for NACK\n"); + return &ast_null_frame; + } - if (ast_format_cmp(rtp->f.subclass.format, ast_format_t140_red) == AST_FORMAT_CMP_EQUAL) { - unsigned char *data = rtp->f.data.ptr; - unsigned char *header_end; - int num_generations; - int header_length; - int len; - int diff =(int)seqno - (prev_seqno+1); /* if diff = 0, no drop*/ - int x; + memset(rtcpheader, 0, data_size); - ao2_replace(rtp->f.subclass.format, ast_format_t140); - header_end = memchr(data, ((*data) & 0x7f), rtp->f.datalen); - if (header_end == NULL) { - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; - } - header_end++; + res = ast_rtcp_generate_report(instance, rtcpheader, rtcp_report, &sr); - header_length = header_end - data; - num_generations = header_length / 4; - len = header_length; + if (res == 0 || res == 1) { + ast_debug(1, "Failed to add %s report to NACK, stopping here\n", sr ? "SR" : "RR"); + return &ast_null_frame; + } - if (!diff) { - for (x = 0; x < num_generations; x++) - len += data[x * 4 + 3]; + packet_len += res; - if (!(rtp->f.datalen - len)) - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + res = ast_rtcp_generate_nack(instance, rtcpheader + packet_len); - rtp->f.data.ptr += len; - rtp->f.datalen -= len; - } else if (diff > num_generations && diff < 10) { - len -= 3; - rtp->f.data.ptr += len; - rtp->f.datalen -= len; + if (res == 0) { + ast_debug(1, "Failed to construct NACK, stopping here\n"); + return &ast_null_frame; + } - data = rtp->f.data.ptr; - *data++ = 0xEF; - *data++ = 0xBF; - *data = 0xBD; - } else { - for ( x = 0; x < num_generations - diff; x++) - len += data[x * 4 + 3]; + packet_len += res; - rtp->f.data.ptr += len; - rtp->f.datalen -= len; - } - } + res = rtcp_sendto(instance, rtcpheader, packet_len, 0, &remote_address, &ice); + if (res < 0) { + ast_debug(1, "Failed to send NACK request out\n"); + } else { + /* Update RTCP SR/RR statistics */ + ast_rtcp_calculate_sr_rr_statistics(instance, rtcp_report, remote_address, ice, sr); + } - if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_AUDIO) { - rtp->f.samples = ast_codec_samples_count(&rtp->f); - if (ast_format_cache_is_slinear(rtp->f.subclass.format)) { - ast_frame_byteswap_be(&rtp->f); + ast_debug(2, "Sending a NACK request on RTP instance '%p' to get missing packets\n", instance); } - calc_rxstamp(&rtp->f.delivery, rtp, timestamp, mark); - /* Add timing data to let ast_generic_bridge() put the frame into a jitterbuf */ - ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); - rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); - rtp->f.len = rtp->f.samples / ((ast_format_get_sample_rate(rtp->f.subclass.format) / 1000)); - } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_VIDEO) { - /* Video -- samples is # of samples vs. 90000 */ - if (!rtp->lastividtimestamp) - rtp->lastividtimestamp = timestamp; - ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); - rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); - rtp->f.samples = timestamp - rtp->lastividtimestamp; - rtp->lastividtimestamp = timestamp; - rtp->f.delivery.tv_sec = 0; - rtp->f.delivery.tv_usec = 0; - /* Pass the RTP marker bit as bit */ - rtp->f.subclass.frame_ending = mark ? 1 : 0; - } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_TEXT) { - /* TEXT -- samples is # of samples vs. 1000 */ - if (!rtp->lastitexttimestamp) - rtp->lastitexttimestamp = timestamp; - rtp->f.samples = timestamp - rtp->lastitexttimestamp; - rtp->lastitexttimestamp = timestamp; - rtp->f.delivery.tv_sec = 0; - rtp->f.delivery.tv_usec = 0; - } else { - ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", - ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); - return &ast_null_frame;; + + return &ast_null_frame; } - AST_LIST_INSERT_TAIL(&frames, &rtp->f, frame_list); - return AST_LIST_FIRST(&frames); + return &ast_null_frame; } /*! \pre instance is locked */ @@ -6857,7 +7289,10 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro } else if (property == AST_RTP_PROPERTY_ASYMMETRIC_CODEC) { rtp->asymmetric_codec = value; } else if (property == AST_RTP_PROPERTY_RETRANS_SEND) { - rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_BUFFER_SIZE); + rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_SEND_BUFFER_SIZE); + } else if (property == AST_RTP_PROPERTY_RETRANS_RECV) { + rtp->recv_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_RECV_BUFFER_SIZE); + AST_VECTOR_INIT(&rtp->missing_seqno, 0); } } diff --git a/tests/test_data_buffer.c b/tests/test_data_buffer.c index 11fdc7b6f5..93c2c0612a 100644 --- a/tests/test_data_buffer.c +++ b/tests/test_data_buffer.c @@ -217,6 +217,7 @@ AST_TEST_DEFINE(buffer_resize) AST_TEST_DEFINE(buffer_nominal) { RAII_VAR(struct ast_data_buffer *, buffer, NULL, ast_data_buffer_free_wrapper); + RAII_VAR(struct mock_payload *, removed_payload, NULL, ast_free_ptr); struct mock_payload *payload; struct mock_payload *fetched_payload; int ret; @@ -247,6 +248,9 @@ AST_TEST_DEFINE(buffer_nominal) "Failed to allocate memory for payload %d", i); ret = ast_data_buffer_put(buffer, i, payload); + if (ret) { + ast_free(payload); + } ast_test_validate(test, ret == 0, "Failed to add payload %d to buffer", i); @@ -268,7 +272,11 @@ AST_TEST_DEFINE(buffer_nominal) ast_test_validate(test, payload != NULL, "Failed to allocate memory for payload %d", i + BUFFER_MAX_NOMINAL); + payload->id = i; ret = ast_data_buffer_put(buffer, i + BUFFER_MAX_NOMINAL, payload); + if (ret) { + ast_free(payload); + } ast_test_validate(test, ret == 0, "Failed to add payload %d to buffer", i + BUFFER_MAX_NOMINAL); @@ -289,6 +297,30 @@ AST_TEST_DEFINE(buffer_nominal) "Failed to get payload at position %d during second loop", i + BUFFER_MAX_NOMINAL); } + removed_payload = (struct mock_payload *)ast_data_buffer_remove_head(buffer); + + ast_test_validate(test, removed_payload != NULL, + "Failed to get the payload at the HEAD of the buffer"); + + ast_test_validate(test, ast_data_buffer_count(buffer) == BUFFER_MAX_NOMINAL - 1, + "Removing payload from HEAD of buffer did not decrease buffer size"); + + ast_test_validate(test, removed_payload->id == 1, + "Removing payload from HEAD of buffer did not return expected payload"); + + ast_free(removed_payload); + + removed_payload = (struct mock_payload *)ast_data_buffer_remove(buffer, BUFFER_MAX_NOMINAL * 2); + + ast_test_validate(test, removed_payload != NULL, + "Failed to get payload at position %d from buffer", BUFFER_MAX_NOMINAL * 2); + + ast_test_validate(test, ast_data_buffer_count(buffer) == BUFFER_MAX_NOMINAL - 2, + "Removing payload from buffer did not decrease buffer size"); + + ast_test_validate(test, removed_payload->id == BUFFER_MAX_NOMINAL, + "Removing payload from buffer did not return expected payload"); + return AST_TEST_PASS; } -- GitLab