diff --git a/include/asterisk/data_buffer.h b/include/asterisk/data_buffer.h index dacbaa5e4eeaa965f1daa88093d3827ae05c1725..66aad2722abe02e50904102ebbd3470c4b6d2ecb 100644 --- a/include/asterisk/data_buffer.h +++ b/include/asterisk/data_buffer.h @@ -110,6 +110,35 @@ int ast_data_buffer_put(struct ast_data_buffer *buffer, size_t pos, void *payloa */ void *ast_data_buffer_get(const struct ast_data_buffer *buffer, size_t pos); +/*! + * \brief Remove a data payload from the data buffer + * + * \param buffer The data buffer + * \param pos The position of the data payload + * + * \retval non-NULL success + * \retval NULL failure + * + * \note This DOES remove the data payload from the data buffer. It does not free it, though. + * + * \since 15.5.0 + */ +void *ast_data_buffer_remove(struct ast_data_buffer *buffer, size_t pos); + +/*! + * \brief Remove the first payload from the data buffer + * + * \param buffer The data buffer + * + * \retval non-NULL success + * \retval NULL failure + * + * \note This DOES remove the data payload from the data buffer. + * + * \since 15.5.0 + */ +void *ast_data_buffer_remove_head(struct ast_data_buffer *buffer); + /*! * \brief Free a data buffer (and all held data payloads) * diff --git a/main/data_buffer.c b/main/data_buffer.c index ccbffd22dae62528ee7ffdc350bd1cf8ad50f213..cfc323c680d5ac8d5266501e952471aced937c4b 100644 --- a/main/data_buffer.c +++ b/main/data_buffer.c @@ -281,6 +281,60 @@ void *ast_data_buffer_get(const struct ast_data_buffer *buffer, size_t pos) return NULL; } +static void data_buffer_free_buffer_payload(struct ast_data_buffer *buffer, + struct data_buffer_payload_entry *buffer_payload) +{ + buffer_payload->payload = NULL; + buffer->count--; + + if (buffer->cache_count < CACHED_PAYLOAD_MAX + && buffer->cache_count < (buffer->max - buffer->count)) { + AST_LIST_INSERT_TAIL(&buffer->cached_payloads, buffer_payload, list); + buffer->cache_count++; + } else { + ast_free(buffer_payload); + } +} + +void *ast_data_buffer_remove(struct ast_data_buffer *buffer, size_t pos) +{ + struct data_buffer_payload_entry *buffer_payload; + + ast_assert(buffer != NULL); + + AST_LIST_TRAVERSE_SAFE_BEGIN(&buffer->payloads, buffer_payload, list) { + if (buffer_payload->pos == pos) { + void *payload = buffer_payload->payload; + + AST_LIST_REMOVE_CURRENT(list); + data_buffer_free_buffer_payload(buffer, buffer_payload); + + return payload; + } + } + AST_LIST_TRAVERSE_SAFE_END; + + return NULL; +} + +void *ast_data_buffer_remove_head(struct ast_data_buffer *buffer) +{ + ast_assert(buffer != NULL); + + if (buffer->count > 0) { + struct data_buffer_payload_entry *buffer_payload; + void *payload; + + buffer_payload = AST_LIST_REMOVE_HEAD(&buffer->payloads, list); + payload = buffer_payload->payload; + data_buffer_free_buffer_payload(buffer, buffer_payload); + + return payload; + } + + return NULL; +} + void ast_data_buffer_free(struct ast_data_buffer *buffer) { struct data_buffer_payload_entry *buffer_payload; diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c index f6e26d6ae55a51516a5814beedca59593422f601..36b9cb9a41a277d49d58b0c86bb8c2d8d0c23432 100644 --- a/res/res_rtp_asterisk.c +++ b/res/res_rtp_asterisk.c @@ -103,7 +103,8 @@ #define TURN_STATE_WAIT_TIME 2000 -#define DEFAULT_RTP_BUFFER_SIZE 250 +#define DEFAULT_RTP_SEND_BUFFER_SIZE 250 +#define DEFAULT_RTP_RECV_BUFFER_SIZE 20 /*! Full INTRA-frame Request / Fast Update Request (From RFC2032) */ #define RTCP_PT_FUR 192 @@ -323,6 +324,8 @@ struct ast_rtp { unsigned int lastotexttimestamp; unsigned int lasteventseqn; int lastrxseqno; /*!< Last received sequence number, from the network */ + int expectedrxseqno; /*!< Next expected sequence number, from the network */ + AST_VECTOR(, int) missing_seqno; /*!< A vector of sequence numbers we never received */ int expectedseqno; /*!< Next expected sequence number, from the core */ unsigned short seedrxseqno; /*!< What sequence number did they start with?*/ unsigned int seedrxts; /*!< What RTP timestamp did they start with? */ @@ -388,6 +391,7 @@ struct ast_rtp { struct rtp_red *red; struct ast_data_buffer *send_buffer; /*!< Buffer for storing sent packets for retransmission */ + struct ast_data_buffer *recv_buffer; /*!< Buffer for storing received packets for retransmission */ #ifdef HAVE_PJPROJECT ast_cond_t cond; /*!< ICE/TURN condition for signaling */ @@ -2771,6 +2775,18 @@ static int dtls_srtp_setup(struct ast_rtp *rtp, struct ast_srtp *srtp, struct as } #endif +/*! \brief Helper function to compare an elem in a vector by value */ +static int compare_by_value(int elem, int value) +{ + return elem - value; +} + +/*! \brief Helper function to find an elem in a vector by value */ +static int find_by_value(int elem, int value) +{ + return elem == value; +} + static int rtcp_mux(struct ast_rtp *rtp, const unsigned char *packet) { uint8_t version; @@ -2941,11 +2957,6 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s } #endif - if ((*in & 0xC0) && res_srtp && srtp && res_srtp->unprotect( - srtp, buf, &len, rtcp || rtcp_mux(rtp, buf)) < 0) { - return -1; - } - return len; } @@ -3629,6 +3640,7 @@ static int ast_rtp_new(struct ast_rtp_instance *instance, rtp->ssrc = ast_random(); ast_uuid_generate_str(rtp->cname, sizeof(rtp->cname)); rtp->seqno = ast_random() & 0x7fff; + rtp->expectedrxseqno = -1; rtp->expectedseqno = -1; rtp->sched = sched; ast_sockaddr_copy(&rtp->bind_address, addr); @@ -3713,10 +3725,16 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance) ast_data_buffer_free(rtp->send_buffer); } + /* Destroy the recv buffer if it was being used */ + if (rtp->recv_buffer) { + ast_data_buffer_free(rtp->recv_buffer); + } + ao2_cleanup(rtp->lasttxformat); ao2_cleanup(rtp->lastrxformat); ao2_cleanup(rtp->f.subclass.format); AST_VECTOR_FREE(&rtp->ssrc_mapping); + AST_VECTOR_FREE(&rtp->missing_seqno); /* Finally destroy ourselves */ ast_free(rtp); @@ -3983,6 +4001,9 @@ static void ast_rtp_change_source(struct ast_rtp_instance *instance) rtp->ssrc = ssrc; + /* Since the source is changing, we don't know what sequence number to expect next */ + rtp->expectedrxseqno = -1; + return; } @@ -4078,39 +4099,24 @@ static void calculate_lost_packet_statistics(struct ast_rtp *rtp, rtp->rtcp->rxlost_count++; } -/*! - * \brief Send RTCP SR or RR report - * - * \pre instance is locked - */ -static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) +static int ast_rtcp_generate_report(struct ast_rtp_instance *instance, unsigned char *rtcpheader, + struct ast_rtp_rtcp_report *rtcp_report, int *sr) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); - RAII_VAR(struct ast_json *, message_blob, NULL, ast_json_unref); - int res; int len = 0; - uint16_t sdes_packet_len_bytes, sdes_packet_len_rounded; struct timeval now; unsigned int now_lsw; unsigned int now_msw; - unsigned char *rtcpheader; unsigned int lost_packets; int fraction_lost; struct timeval dlsr = { 0, }; - unsigned char bdata[AST_UUID_STR_LEN + 128] = ""; /* More than enough */ - int rate = rtp_get_rate(rtp->f.subclass.format); - int ice; - struct ast_sockaddr remote_address = { { 0, } }; struct ast_rtp_rtcp_report_block *report_block = NULL; - RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, - ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0), - ao2_cleanup); if (!rtp || !rtp->rtcp) { return 0; } - if (ast_sockaddr_isnull(&rtp->rtcp->them)) { /* This'll stop rtcp for this rtp session */ + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { /* This'll stop rtcp for this rtp session */ /* RTCP was stopped. */ return 0; } @@ -4119,14 +4125,16 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) return 1; } + *sr = rtp->txcount > rtp->rtcp->lastsrtxcount ? 1 : 0; + /* Compute statistics */ calculate_lost_packet_statistics(rtp, &lost_packets, &fraction_lost); gettimeofday(&now, NULL); rtcp_report->reception_report_count = rtp->themssrc_valid ? 1 : 0; rtcp_report->ssrc = rtp->ssrc; - rtcp_report->type = sr ? RTCP_PT_SR : RTCP_PT_RR; - if (sr) { + rtcp_report->type = *sr ? RTCP_PT_SR : RTCP_PT_RR; + if (*sr) { rtcp_report->sender_information.ntp_timestamp = now; rtcp_report->sender_information.rtp_timestamp = rtp->lastts; rtcp_report->sender_information.packet_count = rtp->txcount; @@ -4144,7 +4152,7 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) report_block->lost_count.fraction = (fraction_lost & 0xff); report_block->lost_count.packets = (lost_packets & 0xffffff); report_block->highest_seq_no = (rtp->cycles | (rtp->lastrxseqno & 0xffff)); - report_block->ia_jitter = (unsigned int)(rtp->rxjitter * rate); + report_block->ia_jitter = (unsigned int)(rtp->rxjitter * rtp_get_rate(rtp->f.subclass.format)); report_block->lsr = rtp->rtcp->themrxlsr; /* If we haven't received an SR report, DLSR should be 0 */ if (!ast_tvzero(rtp->rtcp->rxlsr)) { @@ -4153,11 +4161,10 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) } } timeval2ntp(rtcp_report->sender_information.ntp_timestamp, &now_msw, &now_lsw); - rtcpheader = bdata; put_unaligned_uint32(rtcpheader + 4, htonl(rtcp_report->ssrc)); /* Our SSRC */ len += 8; - if (sr) { - put_unaligned_uint32(rtcpheader + len, htonl(now_msw)); /* now, MSW. gettimeofday() + SEC_BETWEEN_1900_AND_1970*/ + if (*sr) { + put_unaligned_uint32(rtcpheader + len, htonl(now_msw)); /* now, MSW. gettimeofday() + SEC_BETWEEN_1900_AND_1970 */ put_unaligned_uint32(rtcpheader + len + 4, htonl(now_lsw)); /* now, LSW */ put_unaligned_uint32(rtcpheader + len + 8, htonl(rtcp_report->sender_information.rtp_timestamp)); put_unaligned_uint32(rtcpheader + len + 12, htonl(rtcp_report->sender_information.packet_count)); @@ -4175,53 +4182,32 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) } put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (rtcp_report->reception_report_count << 24) - | ((sr ? RTCP_PT_SR : RTCP_PT_RR) << 16) | ((len/4)-1))); - - sdes_packet_len_bytes = - 4 + /* RTCP Header */ - 4 + /* SSRC */ - 1 + /* Type (CNAME) */ - 1 + /* Text Length */ - AST_UUID_STR_LEN /* Text and NULL terminator */ - ; - - /* Round to 32 bit boundary */ - sdes_packet_len_rounded = (sdes_packet_len_bytes + 3) & ~0x3; + | ((*sr ? RTCP_PT_SR : RTCP_PT_RR) << 16) | ((len/4)-1))); - put_unaligned_uint32(rtcpheader + len, htonl((2 << 30) | (1 << 24) | (RTCP_PT_SDES << 16) | ((sdes_packet_len_rounded / 4) - 1))); - put_unaligned_uint32(rtcpheader + len + 4, htonl(rtcp_report->ssrc)); - rtcpheader[len + 8] = 0x01; /* CNAME */ - rtcpheader[len + 9] = AST_UUID_STR_LEN - 1; /* Number of bytes of text */ - memcpy(rtcpheader + len + 10, rtp->cname, AST_UUID_STR_LEN); - len += 10 + AST_UUID_STR_LEN; + return len; +} - /* Padding - Note that we don't set the padded bit on the packet. From - * RFC 3550 Section 6.5: - * - * No length octet follows the null item type octet, but additional null - * octets MUST be included if needed to pad until the next 32-bit - * boundary. Note that this padding is separate from that indicated by - * the P bit in the RTCP header. - * - * These bytes will already be zeroed out during array initialization. - */ - len += (sdes_packet_len_rounded - sdes_packet_len_bytes); +static int ast_rtcp_calculate_sr_rr_statistics(struct ast_rtp_instance *instance, + struct ast_rtp_rtcp_report *rtcp_report, struct ast_sockaddr remote_address, int ice, int sr) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct ast_rtp_rtcp_report_block *report_block = NULL; + RAII_VAR(struct ast_json *, message_blob, NULL, ast_json_unref); - if (rtp->bundled) { - ast_rtp_instance_get_remote_address(instance, &remote_address); - } else { - ast_sockaddr_copy(&remote_address, &rtp->rtcp->them); + if (!rtp || !rtp->rtcp) { + return 0; } - res = rtcp_sendto(instance, (unsigned int *)rtcpheader, len, 0, &remote_address, &ice); - if (res < 0) { - ast_log(LOG_ERROR, "RTCP %s transmission error to %s, rtcp halted %s\n", - sr ? "SR" : "RR", - ast_sockaddr_stringify(&rtp->rtcp->them), - strerror(errno)); + + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { return 0; } - /* Update RTCP SR/RR statistics */ + if (!rtcp_report) { + return -1; + } + + report_block = rtcp_report->report_block[0]; + if (sr) { rtp->rtcp->txlsr = rtcp_report->sender_information.ntp_timestamp; rtp->rtcp->sr_count++; @@ -4248,7 +4234,7 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) ast_verbose(" Fraction lost: %d\n", report_block->lost_count.fraction); ast_verbose(" Cumulative loss: %u\n", report_block->lost_count.packets); ast_verbose(" Highest seq no: %u\n", report_block->highest_seq_no); - ast_verbose(" IA jitter: %.4f\n", (double)report_block->ia_jitter / rate); + ast_verbose(" IA jitter: %.4f\n", (double)report_block->ia_jitter / rtp_get_rate(rtp->f.subclass.format)); ast_verbose(" Their last SR: %u\n", report_block->lsr); ast_verbose(" DLSR: %4.4f (sec)\n\n", (double)(report_block->dlsr / 65536.0)); } @@ -4258,9 +4244,121 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) "to", ast_sockaddr_stringify(&remote_address), "from", rtp->rtcp->local_addr_str); ast_rtp_publish_rtcp_message(instance, ast_rtp_rtcp_sent_type(), - rtcp_report, - message_blob); - return res; + rtcp_report, message_blob); + + return 1; +} + +static int ast_rtcp_generate_sdes(struct ast_rtp_instance *instance, unsigned char *rtcpheader, + struct ast_rtp_rtcp_report *rtcp_report) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + int len = 0; + uint16_t sdes_packet_len_bytes; + uint16_t sdes_packet_len_rounded; + + if (!rtp || !rtp->rtcp) { + return 0; + } + + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { + return 0; + } + + if (!rtcp_report) { + return -1; + } + + sdes_packet_len_bytes = + 4 + /* RTCP Header */ + 4 + /* SSRC */ + 1 + /* Type (CNAME) */ + 1 + /* Text Length */ + AST_UUID_STR_LEN /* Text and NULL terminator */ + ; + + /* Round to 32 bit boundary */ + sdes_packet_len_rounded = (sdes_packet_len_bytes + 3) & ~0x3; + + put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (1 << 24) | (RTCP_PT_SDES << 16) | ((sdes_packet_len_rounded / 4) - 1))); + put_unaligned_uint32(rtcpheader + 4, htonl(rtcp_report->ssrc)); + rtcpheader[8] = 0x01; /* CNAME */ + rtcpheader[9] = AST_UUID_STR_LEN - 1; /* Number of bytes of text */ + memcpy(rtcpheader + 10, rtp->cname, AST_UUID_STR_LEN); + len += 10 + AST_UUID_STR_LEN; + + /* Padding - Note that we don't set the padded bit on the packet. From + * RFC 3550 Section 6.5: + * + * No length octet follows the null item type octet, but additional null + * octets MUST be included if needd to pad until the next 32-bit + * boundary. Note that this padding is separate from that indicated by + * the P bit in the RTCP header. + * + * These bytes will already be zeroed out during array initialization. + */ + len += (sdes_packet_len_rounded - sdes_packet_len_bytes); + + return len; +} + +static int ast_rtcp_generate_nack(struct ast_rtp_instance *instance, unsigned char *rtcpheader) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + int packet_len; + int blp_index; + int current_seqno; + int seqno; + unsigned int fci; + + if (!rtp || !rtp->rtcp) { + return 0; + } + + if (ast_sockaddr_isnull(&rtp->rtcp->them)) { + return 0; + } + + current_seqno = rtp->expectedrxseqno; + seqno = rtp->lastrxseqno; + packet_len = 12; /* The header length is 12 (version line, packet source SSRC, media source SSRC) */ + + /* Get the missing sequence numbers for the FCI section of the NACK request */ + for (blp_index = 0, fci = 0; current_seqno < seqno; current_seqno++, blp_index++) { + int *missing_seqno; + + missing_seqno = AST_VECTOR_GET_CMP(&rtp->missing_seqno, current_seqno, + find_by_value); + + if (!missing_seqno) { + continue; + } + + /* We hit the max blp size, reset */ + if (blp_index >= 17) { + put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); + fci = 0; + blp_index = 0; + packet_len += 4; + } + + if (blp_index == 0) { + fci |= (current_seqno << 16); + } else { + fci |= (1 << (blp_index - 1)); + } + } + + put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); + packet_len += 4; + + /* Length MUST be 2+n, where n is the number of NACKs. Same as length in words minus 1 */ + put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (AST_RTP_RTCP_FMT_NACK << 24) + | (AST_RTP_RTCP_RTPFB << 16) | ((packet_len / 4) - 1))); + put_unaligned_uint32(rtcpheader + 4, htonl(rtp->ssrc)); + put_unaligned_uint32(rtcpheader + 8, htonl(rtp->themssrc)); + + return packet_len; } /*! @@ -4276,6 +4374,15 @@ static int ast_rtcp_write(const void *data) struct ast_rtp_instance *instance = (struct ast_rtp_instance *) data; struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); int res; + int sr = 0; + int packet_len = 0; + int ice; + struct ast_sockaddr remote_address = { { 0, } }; + unsigned char *rtcpheader; + unsigned char bdata[AST_UUID_STR_LEN + 128] = ""; /* More than enough */ + RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, + ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0), + ao2_cleanup); if (!rtp || !rtp->rtcp || rtp->rtcp->schedid == -1) { ao2_ref(instance, -1); @@ -4283,13 +4390,44 @@ static int ast_rtcp_write(const void *data) } ao2_lock(instance); - if (rtp->txcount > rtp->rtcp->lastsrtxcount) { - /* Send an SR */ - res = ast_rtcp_write_report(instance, 1); + rtcpheader = bdata; + + res = ast_rtcp_generate_report(instance, rtcpheader, rtcp_report, &sr); + + if (res == 0 || res == 1) { + ast_debug(1, "Failed to add %s report to RTCP packet!\n", sr ? "SR" : "RR"); + goto cleanup; + } + + packet_len += res; + + res = ast_rtcp_generate_sdes(instance, rtcpheader + packet_len, rtcp_report); + + if (res == 0 || res == 1) { + ast_debug(1, "Failed to add SDES to RTCP packet!\n"); + goto cleanup; + } + + packet_len += res; + + if (rtp->bundled) { + ast_rtp_instance_get_remote_address(instance, &remote_address); + } else { + ast_sockaddr_copy(&remote_address, &rtp->rtcp->them); + } + + res = rtcp_sendto(instance, (unsigned int *)rtcpheader, packet_len, 0, &remote_address, &ice); + if (res < 0) { + ast_log(LOG_ERROR, "RTCP %s transmission error to %s, rtcp halted %s\n", + sr ? "SR" : "RR", + ast_sockaddr_stringify(&rtp->rtcp->them), + strerror(errno)); + res = 0; } else { - /* Send an RR */ - res = ast_rtcp_write_report(instance, 0); + ast_rtcp_calculate_sr_rr_statistics(instance, rtcp_report, remote_address, ice, sr); } + +cleanup: ao2_unlock(instance); if (!res) { @@ -4872,7 +5010,7 @@ static struct ast_frame *create_dtmf_frame(struct ast_rtp_instance *instance, en return &rtp->f; } -static void process_dtmf_rfc2833(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, struct ast_sockaddr *addr, int payloadtype, int mark, struct frame_list *frames) +static void process_dtmf_rfc2833(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, int payloadtype, int mark, struct frame_list *frames) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); struct ast_sockaddr remote_address = { {0,} }; @@ -5007,7 +5145,7 @@ static void process_dtmf_rfc2833(struct ast_rtp_instance *instance, unsigned cha return; } -static struct ast_frame *process_dtmf_cisco(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, struct ast_sockaddr *addr, int payloadtype, int mark) +static struct ast_frame *process_dtmf_cisco(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, int payloadtype, int mark) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); unsigned int event, flags, power; @@ -5087,7 +5225,7 @@ static struct ast_frame *process_dtmf_cisco(struct ast_rtp_instance *instance, u return f; } -static struct ast_frame *process_cn_rfc3389(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, struct ast_sockaddr *addr, int payloadtype, int mark) +static struct ast_frame *process_cn_rfc3389(struct ast_rtp_instance *instance, unsigned char *data, int len, unsigned int seqno, unsigned int timestamp, int payloadtype, int mark) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); @@ -5438,10 +5576,12 @@ static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned #define RTCP_FB_REMB_BLOCK_WORD_LENGTH 4 #define RTCP_FB_NACK_BLOCK_WORD_LENGTH 2 -static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, const unsigned char *rtcpdata, size_t size, struct ast_sockaddr *addr) +static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, struct ast_srtp *srtp, + const unsigned char *rtcpdata, size_t size, struct ast_sockaddr *addr) { struct ast_rtp_instance *transport = instance; struct ast_rtp *transport_rtp = ast_rtp_instance_get_data(instance); + int len = size; unsigned int *rtcpheader = (unsigned int *)(rtcpdata); unsigned int packetwords; unsigned int position; @@ -5451,10 +5591,16 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c struct ast_rtp_rtcp_report_block *report_block; struct ast_frame *f = &ast_null_frame; - packetwords = size / 4; + /* If this is encrypted then decrypt the payload */ + if ((*rtcpheader & 0xC0) && res_srtp && srtp && res_srtp->unprotect( + srtp, rtcpheader, &len, 1) < 0) { + return &ast_null_frame; + } + + packetwords = len / 4; - ast_debug(1, "Got RTCP report of %zu bytes from %s\n", - size, ast_sockaddr_stringify(addr)); + ast_debug(1, "Got RTCP report of %d bytes from %s\n", + len, ast_sockaddr_stringify(addr)); /* * Validate the RTCP packet according to an adapted and slightly @@ -5899,6 +6045,7 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c static struct ast_frame *ast_rtcp_read(struct ast_rtp_instance *instance) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance, 1); struct ast_sockaddr addr; unsigned char rtcpdata[8192 + AST_FRIENDLY_OFFSET]; unsigned char *read_area = rtcpdata + AST_FRIENDLY_OFFSET; @@ -5950,7 +6097,7 @@ static struct ast_frame *ast_rtcp_read(struct ast_rtp_instance *instance) return &ast_null_frame; } - return ast_rtcp_interpret(instance, read_area, res, &addr); + return ast_rtcp_interpret(instance, srtp, read_area, res, &addr); } /*! \pre instance is locked */ @@ -6123,78 +6270,362 @@ static void rtp_instance_unlock(struct ast_rtp_instance *instance) } } -/*! \pre instance is locked */ -static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtcp) +static struct ast_frame *ast_rtp_interpret(struct ast_rtp_instance *instance, struct ast_srtp *srtp, + const struct ast_sockaddr *remote_address, unsigned char *read_area, int length, int prev_seqno) { + unsigned int *rtpheader = (unsigned int*)(read_area); struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); struct ast_rtp_instance *instance1; - RAII_VAR(struct ast_rtp_instance *, child, NULL, rtp_instance_unlock); - struct ast_sockaddr addr; - int res, hdrlen = 12, version, payloadtype, padding, mark, ext, cc, prev_seqno; - unsigned char *read_area = rtp->rawdata + AST_FRIENDLY_OFFSET; - size_t read_area_size = sizeof(rtp->rawdata) - AST_FRIENDLY_OFFSET; - unsigned int *rtpheader = (unsigned int*)(read_area), seqno, ssrc, timestamp; + int res = length, hdrlen = 12, seqno, timestamp, payloadtype, padding, mark, ext, cc; RAII_VAR(struct ast_rtp_payload_type *, payload, NULL, ao2_cleanup); - struct ast_sockaddr remote_address = { {0,} }; struct frame_list frames; - /* If this is actually RTCP let's hop on over and handle it */ - if (rtcp) { - if (rtp->rtcp && rtp->rtcp->type == AST_RTP_INSTANCE_RTCP_STANDARD) { - return ast_rtcp_read(instance); - } + /* If this payload is encrypted then decrypt it using the given SRTP instance */ + if ((*read_area & 0xC0) && res_srtp && srtp && res_srtp->unprotect( + srtp, read_area, &res, 0) < 0) { return &ast_null_frame; } - /* Actually read in the data from the socket */ - if ((res = rtp_recvfrom(instance, read_area, read_area_size, 0, - &addr)) < 0) { - if (res == RTP_DTLS_ESTABLISHED) { - rtp->f.frametype = AST_FRAME_CONTROL; - rtp->f.subclass.integer = AST_CONTROL_SRCCHANGE; - return &rtp->f; - } - - ast_assert(errno != EBADF); - if (errno != EAGAIN) { - ast_log(LOG_WARNING, "RTP Read error: %s. Hanging up.\n", - (errno) ? strerror(errno) : "Unspecified"); - return NULL; - } - return &ast_null_frame; + /* If we are currently sending DTMF to the remote party send a continuation packet */ + if (rtp->sending_digit) { + ast_rtp_dtmf_continuation(instance); } - /* If this was handled by the ICE session don't do anything */ - if (!res) { - return &ast_null_frame; + /* Pull out the various other fields we will need */ + seqno = ntohl(rtpheader[0]); + payloadtype = (seqno & 0x7f0000) >> 16; + padding = seqno & (1 << 29); + mark = seqno & (1 << 23); + ext = seqno & (1 << 28); + cc = (seqno & 0xF000000) >> 24; + seqno &= 0xffff; + timestamp = ntohl(rtpheader[1]); + + AST_LIST_HEAD_INIT_NOLOCK(&frames); + + /* Remove any padding bytes that may be present */ + if (padding) { + res -= read_area[res - 1]; } - /* This could be a multiplexed RTCP packet. If so, be sure to interpret it correctly */ - if (rtcp_mux(rtp, read_area)) { - return ast_rtcp_interpret(instance, read_area, res, &addr); + /* Skip over any CSRC fields */ + if (cc) { + hdrlen += cc * 4; } - /* Make sure the data that was read in is actually enough to make up an RTP packet */ - if (res < hdrlen) { - /* If this is a keepalive containing only nulls, don't bother with a warning */ - int i; - for (i = 0; i < res; ++i) { - if (read_area[i] != '\0') { - ast_log(LOG_WARNING, "RTP Read too short\n"); - return &ast_null_frame; + /* Look for any RTP extensions, currently we do not support any */ + if (ext) { + hdrlen += (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2; + hdrlen += 4; + if (DEBUG_ATLEAST(1)) { + unsigned int profile; + profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16; + if (profile == 0x505a) { + ast_log(LOG_DEBUG, "Found Zfone extension in RTP stream - zrtp - not supported.\n"); + } else if (profile != 0xbede) { + /* SDP negotiated RTP extensions can not currently be output in logging */ + ast_log(LOG_DEBUG, "Found unknown RTP Extensions %x\n", profile); } } - return &ast_null_frame; } - /* Get fields and verify this is an RTP packet */ - seqno = ntohl(rtpheader[0]); - - ast_rtp_instance_get_remote_address(instance, &remote_address); + /* Make sure after we potentially mucked with the header length that it is once again valid */ + if (res < hdrlen) { + ast_log(LOG_WARNING, "RTP Read too short (%d, expecting %d\n", res, hdrlen); + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } - if (!(version = (seqno & 0xC0000000) >> 30)) { - struct sockaddr_in addr_tmp; - struct ast_sockaddr addr_v4; + rtp->rxcount++; + rtp->rxoctetcount += (res - hdrlen); + if (rtp->rxcount == 1) { + rtp->seedrxseqno = seqno; + } + + /* Do not schedule RR if RTCP isn't run */ + if (rtp->rtcp && !ast_sockaddr_isnull(&rtp->rtcp->them) && rtp->rtcp->schedid < 0) { + /* Schedule transmission of Receiver Report */ + ao2_ref(instance, +1); + rtp->rtcp->schedid = ast_sched_add(rtp->sched, ast_rtcp_calc_interval(rtp), ast_rtcp_write, instance); + if (rtp->rtcp->schedid < 0) { + ao2_ref(instance, -1); + ast_log(LOG_WARNING, "scheduling RTCP transmission failed.\n"); + } + } + if ((int)rtp->lastrxseqno - (int)seqno > 100) /* if so it would indicate that the sender cycled; allow for misordering */ + rtp->cycles += RTP_SEQ_MOD; + + /* If we are directly bridged to another instance send the audio directly out, + * but only after updating core information about the received traffic so that + * outgoing RTCP reflects it. + */ + instance1 = ast_rtp_instance_get_bridged(instance); + if (instance1 + && !bridge_p2p_rtp_write(instance, instance1, rtpheader, res, hdrlen)) { + struct timeval rxtime; + struct ast_frame *f; + + /* Update statistics for jitter so they are correct in RTCP */ + calc_rxstamp(&rxtime, rtp, timestamp, mark); + + /* When doing P2P we don't need to raise any frames about SSRC change to the core */ + while ((f = AST_LIST_REMOVE_HEAD(&frames, frame_list)) != NULL) { + ast_frfree(f); + } + + return &ast_null_frame; + } + + payload = ast_rtp_codecs_get_payload(ast_rtp_instance_get_codecs(instance), payloadtype); + if (!payload) { + /* Unknown payload type. */ + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } + + /* If the payload is not actually an Asterisk one but a special one pass it off to the respective handler */ + if (!payload->asterisk_format) { + struct ast_frame *f = NULL; + if (payload->rtp_code == AST_RTP_DTMF) { + /* process_dtmf_rfc2833 may need to return multiple frames. We do this + * by passing the pointer to the frame list to it so that the method + * can append frames to the list as needed. + */ + process_dtmf_rfc2833(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, payloadtype, mark, &frames); + } else if (payload->rtp_code == AST_RTP_CISCO_DTMF) { + f = process_dtmf_cisco(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, payloadtype, mark); + } else if (payload->rtp_code == AST_RTP_CN) { + f = process_cn_rfc3389(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, payloadtype, mark); + } else { + ast_log(LOG_NOTICE, "Unknown RTP codec %d received from '%s'\n", + payloadtype, + ast_sockaddr_stringify(remote_address)); + } + + if (f) { + AST_LIST_INSERT_TAIL(&frames, f, frame_list); + } + /* Even if no frame was returned by one of the above methods, + * we may have a frame to return in our frame list + */ + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } + + ao2_replace(rtp->lastrxformat, payload->format); + ao2_replace(rtp->f.subclass.format, payload->format); + switch (ast_format_get_type(rtp->f.subclass.format)) { + case AST_MEDIA_TYPE_AUDIO: + rtp->f.frametype = AST_FRAME_VOICE; + break; + case AST_MEDIA_TYPE_VIDEO: + rtp->f.frametype = AST_FRAME_VIDEO; + break; + case AST_MEDIA_TYPE_TEXT: + rtp->f.frametype = AST_FRAME_TEXT; + break; + case AST_MEDIA_TYPE_IMAGE: + /* Fall through */ + default: + ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", + ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); + return &ast_null_frame; + } + rtp->rxseqno = seqno; + + if (rtp->dtmf_timeout && rtp->dtmf_timeout < timestamp) { + rtp->dtmf_timeout = 0; + + if (rtp->resp) { + struct ast_frame *f; + f = create_dtmf_frame(instance, AST_FRAME_DTMF_END, 0); + f->len = ast_tvdiff_ms(ast_samp2tv(rtp->dtmf_duration, rtp_get_rate(f->subclass.format)), ast_tv(0, 0)); + rtp->resp = 0; + rtp->dtmf_timeout = rtp->dtmf_duration = 0; + AST_LIST_INSERT_TAIL(&frames, f, frame_list); + return AST_LIST_FIRST(&frames); + } + } + + rtp->lastrxts = timestamp; + + rtp->f.src = "RTP"; + rtp->f.mallocd = 0; + rtp->f.datalen = res - hdrlen; + rtp->f.data.ptr = read_area + hdrlen; + rtp->f.offset = hdrlen + AST_FRIENDLY_OFFSET; + ast_set_flag(&rtp->f, AST_FRFLAG_HAS_SEQUENCE_NUMBER); + rtp->f.seqno = seqno; + rtp->f.stream_num = rtp->stream_num; + + if ((ast_format_cmp(rtp->f.subclass.format, ast_format_t140) == AST_FORMAT_CMP_EQUAL) + && ((int)seqno - (prev_seqno + 1) > 0) + && ((int)seqno - (prev_seqno + 1) < 10)) { + unsigned char *data = rtp->f.data.ptr; + + memmove(rtp->f.data.ptr+3, rtp->f.data.ptr, rtp->f.datalen); + rtp->f.datalen +=3; + *data++ = 0xEF; + *data++ = 0xBF; + *data = 0xBD; + } + + if (ast_format_cmp(rtp->f.subclass.format, ast_format_t140_red) == AST_FORMAT_CMP_EQUAL) { + unsigned char *data = rtp->f.data.ptr; + unsigned char *header_end; + int num_generations; + int header_length; + int len; + int diff =(int)seqno - (prev_seqno+1); /* if diff = 0, no drop*/ + int x; + + ao2_replace(rtp->f.subclass.format, ast_format_t140); + header_end = memchr(data, ((*data) & 0x7f), rtp->f.datalen); + if (header_end == NULL) { + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + } + header_end++; + + header_length = header_end - data; + num_generations = header_length / 4; + len = header_length; + + if (!diff) { + for (x = 0; x < num_generations; x++) + len += data[x * 4 + 3]; + + if (!(rtp->f.datalen - len)) + return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + + rtp->f.data.ptr += len; + rtp->f.datalen -= len; + } else if (diff > num_generations && diff < 10) { + len -= 3; + rtp->f.data.ptr += len; + rtp->f.datalen -= len; + + data = rtp->f.data.ptr; + *data++ = 0xEF; + *data++ = 0xBF; + *data = 0xBD; + } else { + for ( x = 0; x < num_generations - diff; x++) + len += data[x * 4 + 3]; + + rtp->f.data.ptr += len; + rtp->f.datalen -= len; + } + } + + if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_AUDIO) { + rtp->f.samples = ast_codec_samples_count(&rtp->f); + if (ast_format_cache_is_slinear(rtp->f.subclass.format)) { + ast_frame_byteswap_be(&rtp->f); + } + calc_rxstamp(&rtp->f.delivery, rtp, timestamp, mark); + /* Add timing data to let ast_generic_bridge() put the frame into a jitterbuf */ + ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); + rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); + rtp->f.len = rtp->f.samples / ((ast_format_get_sample_rate(rtp->f.subclass.format) / 1000)); + } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_VIDEO) { + /* Video -- samples is # of samples vs. 90000 */ + if (!rtp->lastividtimestamp) + rtp->lastividtimestamp = timestamp; + ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); + rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); + rtp->f.samples = timestamp - rtp->lastividtimestamp; + rtp->lastividtimestamp = timestamp; + rtp->f.delivery.tv_sec = 0; + rtp->f.delivery.tv_usec = 0; + /* Pass the RTP marker bit as bit */ + rtp->f.subclass.frame_ending = mark ? 1 : 0; + } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_TEXT) { + /* TEXT -- samples is # of samples vs. 1000 */ + if (!rtp->lastitexttimestamp) + rtp->lastitexttimestamp = timestamp; + rtp->f.samples = timestamp - rtp->lastitexttimestamp; + rtp->lastitexttimestamp = timestamp; + rtp->f.delivery.tv_sec = 0; + rtp->f.delivery.tv_usec = 0; + } else { + ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", + ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); + return &ast_null_frame; + } + + AST_LIST_INSERT_TAIL(&frames, &rtp->f, frame_list); + return AST_LIST_FIRST(&frames); +} + +/*! \pre instance is locked */ +static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtcp) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct ast_srtp *srtp; + RAII_VAR(struct ast_rtp_instance *, child, NULL, rtp_instance_unlock); + struct ast_sockaddr addr; + int res, hdrlen = 12, version, payloadtype, mark; + unsigned char *read_area = rtp->rawdata + AST_FRIENDLY_OFFSET; + size_t read_area_size = sizeof(rtp->rawdata) - AST_FRIENDLY_OFFSET; + unsigned int *rtpheader = (unsigned int*)(read_area), seqno, ssrc, timestamp, prev_seqno; + struct ast_sockaddr remote_address = { {0,} }; + struct frame_list frames; + struct ast_frame *frame; + + /* If this is actually RTCP let's hop on over and handle it */ + if (rtcp) { + if (rtp->rtcp && rtp->rtcp->type == AST_RTP_INSTANCE_RTCP_STANDARD) { + return ast_rtcp_read(instance); + } + return &ast_null_frame; + } + + /* Actually read in the data from the socket */ + if ((res = rtp_recvfrom(instance, read_area, read_area_size, 0, + &addr)) < 0) { + if (res == RTP_DTLS_ESTABLISHED) { + rtp->f.frametype = AST_FRAME_CONTROL; + rtp->f.subclass.integer = AST_CONTROL_SRCCHANGE; + return &rtp->f; + } + + ast_assert(errno != EBADF); + if (errno != EAGAIN) { + ast_log(LOG_WARNING, "RTP Read error: %s. Hanging up.\n", + (errno) ? strerror(errno) : "Unspecified"); + return NULL; + } + return &ast_null_frame; + } + + /* If this was handled by the ICE session don't do anything */ + if (!res) { + return &ast_null_frame; + } + + /* This could be a multiplexed RTCP packet. If so, be sure to interpret it correctly */ + if (rtcp_mux(rtp, read_area)) { + return ast_rtcp_interpret(instance, ast_rtp_instance_get_srtp(instance, 1), read_area, res, &addr); + } + + /* Make sure the data that was read in is actually enough to make up an RTP packet */ + if (res < hdrlen) { + /* If this is a keepalive containing only nulls, don't bother with a warning */ + int i; + for (i = 0; i < res; ++i) { + if (read_area[i] != '\0') { + ast_log(LOG_WARNING, "RTP Read too short\n"); + return &ast_null_frame; + } + } + return &ast_null_frame; + } + + /* Get fields and verify this is an RTP packet */ + seqno = ntohl(rtpheader[0]); + + ast_rtp_instance_get_remote_address(instance, &remote_address); + + if (!(version = (seqno & 0xC0000000) >> 30)) { + struct sockaddr_in addr_tmp; + struct ast_sockaddr addr_v4; if (ast_sockaddr_is_ipv4(&addr)) { ast_sockaddr_to_sin(&addr, &addr_tmp); } else if (ast_sockaddr_ipv4_mapped(&addr, &addr_v4)) { @@ -6222,6 +6653,9 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc /* We use the SSRC to determine what RTP instance this packet is actually for */ ssrc = ntohl(rtpheader[2]); + /* We use the SRTP data from the provided instance that it came in on, not the child */ + srtp = ast_rtp_instance_get_srtp(instance, 0); + /* Determine the appropriate instance for this */ child = rtp_find_instance_by_packet_source_ssrc(instance, rtp, ssrc); if (!child) { @@ -6397,20 +6831,18 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc } } - /* If we are currently sending DTMF to the remote party send a continuation packet */ - if (rtp->sending_digit) { - ast_rtp_dtmf_continuation(instance); - } - /* Pull out the various other fields we will need */ payloadtype = (seqno & 0x7f0000) >> 16; - padding = seqno & (1 << 29); mark = seqno & (1 << 23); - ext = seqno & (1 << 28); - cc = (seqno & 0xF000000) >> 24; seqno &= 0xffff; timestamp = ntohl(rtpheader[1]); + if (rtp_debug_test_addr(&addr)) { + ast_verbose("Got RTP packet from %s (type %-2.2d, seq %-6.6u, ts %-6.6u, len %-6.6d)\n", + ast_sockaddr_stringify(&addr), + payloadtype, seqno, timestamp, res - hdrlen); + } + AST_LIST_HEAD_INIT_NOLOCK(&frames); /* Only non-bundled instances can change/learn the remote's SSRC implicitly. */ @@ -6433,280 +6865,280 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc AST_LIST_INSERT_TAIL(&frames, f, frame_list); rtp->seedrxseqno = 0; - rtp->rxcount = 0; - rtp->rxoctetcount = 0; - rtp->cycles = 0; - rtp->lastrxseqno = 0; - rtp->last_seqno = 0; - rtp->last_end_timestamp = 0; - if (rtp->rtcp) { - rtp->rtcp->expected_prior = 0; - rtp->rtcp->received_prior = 0; - } - } - - rtp->themssrc = ssrc; /* Record their SSRC to put in future RR */ - rtp->themssrc_valid = 1; - } - - /* Remove any padding bytes that may be present */ - if (padding) { - res -= read_area[res - 1]; - } - - /* Skip over any CSRC fields */ - if (cc) { - hdrlen += cc * 4; - } - - /* Look for any RTP extensions, currently we do not support any */ - if (ext) { - hdrlen += (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2; - hdrlen += 4; - if (DEBUG_ATLEAST(1)) { - unsigned int profile; - profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16; - if (profile == 0x505a) { - ast_log(LOG_DEBUG, "Found Zfone extension in RTP stream - zrtp - not supported.\n"); - } else { - ast_log(LOG_DEBUG, "Found unknown RTP Extensions %x\n", profile); + rtp->rxcount = 0; + rtp->rxoctetcount = 0; + rtp->cycles = 0; + rtp->lastrxseqno = 0; + rtp->last_seqno = 0; + rtp->last_end_timestamp = 0; + if (rtp->rtcp) { + rtp->rtcp->expected_prior = 0; + rtp->rtcp->received_prior = 0; } } - } - /* Make sure after we potentially mucked with the header length that it is once again valid */ - if (res < hdrlen) { - ast_log(LOG_WARNING, "RTP Read too short (%d, expecting %d\n", res, hdrlen); - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + rtp->themssrc = ssrc; /* Record their SSRC to put in future RR */ + rtp->themssrc_valid = 1; } - rtp->rxcount++; - rtp->rxoctetcount += (res - hdrlen); - if (rtp->rxcount == 1) { - rtp->seedrxseqno = seqno; - } + prev_seqno = rtp->lastrxseqno; + rtp->lastrxseqno = seqno; - /* Do not schedule RR if RTCP isn't run */ - if (rtp->rtcp && !ast_sockaddr_isnull(&rtp->rtcp->them) && rtp->rtcp->schedid < 0) { - /* Schedule transmission of Receiver Report */ - ao2_ref(instance, +1); - rtp->rtcp->schedid = ast_sched_add(rtp->sched, ast_rtcp_calc_interval(rtp), ast_rtcp_write, instance); - if (rtp->rtcp->schedid < 0) { - ao2_ref(instance, -1); - ast_log(LOG_WARNING, "scheduling RTCP transmission failed.\n"); + if (!rtp->recv_buffer) { + /* If there is no receive buffer then we can pass back the frame directly */ + return ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); + } else if (rtp->expectedrxseqno == -1 || seqno == rtp->expectedrxseqno) { + rtp->expectedrxseqno = seqno + 1; + + /* If there are no buffered packets that will be placed after this frame then we can + * return it directly without duplicating it. + */ + if (!ast_data_buffer_count(rtp->recv_buffer)) { + return ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); } - } - if ((int)rtp->lastrxseqno - (int)seqno > 100) /* if so it would indicate that the sender cycled; allow for misordering */ - rtp->cycles += RTP_SEQ_MOD; - prev_seqno = rtp->lastrxseqno; - rtp->lastrxseqno = seqno; + if (!AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value, + AST_VECTOR_ELEM_CLEANUP_NOOP)) { + ast_debug(2, "Packet with sequence number '%d' on RTP instance '%p' is no longer missing\n", + seqno, instance); + } + /* If we don't have the next packet after this we can directly return the frame, as there is no + * chance it will be overwritten. + */ + if (!ast_data_buffer_get(rtp->recv_buffer, seqno + 1)) { + return ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); + } - /* If we are directly bridged to another instance send the audio directly out, - * but only after updating core information about the received traffic so that - * outgoing RTCP reflects it. - */ - instance1 = ast_rtp_instance_get_bridged(instance); - if (instance1 - && !bridge_p2p_rtp_write(instance, instance1, rtpheader, res, hdrlen)) { - struct timeval rxtime; - struct ast_frame *f; + /* Otherwise we need to dupe the frame so that the potential processing of frames placed after + * it do not overwrite the data. You may be thinking that we could just add the current packet + * to the head of the frames list and avoid having to duplicate it but this would result in out + * of order packet processing by libsrtp which we are trying to avoid. + */ + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, seqno - 1)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + } - /* Update statistics for jitter so they are correct in RTCP */ - calc_rxstamp(&rxtime, rtp, timestamp, mark); + /* Add any additional packets that we have buffered and that are available */ + while (ast_data_buffer_count(rtp->recv_buffer)) { + struct ast_rtp_rtcp_nack_payload *payload; - /* When doing P2P we don't need to raise any frames about SSRC change to the core */ - while ((f = AST_LIST_REMOVE_HEAD(&frames, frame_list)) != NULL) { - ast_frfree(f); + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_remove(rtp->recv_buffer, rtp->expectedrxseqno); + if (!payload) { + break; + } + + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, rtp->expectedrxseqno - 1)); + ast_free(payload); + + if (!frame) { + /* If this packet can't be interpeted due to being out of memory we return what we have and assume + * that we will determine it is a missing packet later and NACK for it. + */ + return AST_LIST_FIRST(&frames); + } + + ast_debug(2, "Pulled buffered packet with sequence number '%d' to additionally return on RTP instance '%p'\n", + frame->seqno, instance); + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + rtp->expectedrxseqno++; } - return &ast_null_frame; - } + return AST_LIST_FIRST(&frames); + } else if ((abs(seqno - rtp->expectedrxseqno) > 100) || + ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer)) { + int inserted = 0; - if (rtp_debug_test_addr(&addr)) { - ast_verbose("Got RTP packet from %s (type %-2.2d, seq %-6.6u, ts %-6.6u, len %-6.6d)\n", - ast_sockaddr_stringify(&addr), - payloadtype, seqno, timestamp,res - hdrlen); - } + /* We have a large number of outstanding buffered packets or we've jumped far ahead in time. + * To compensate we dump what we have in the buffer and place the current packet in a logical + * spot. In the case of video we also require a full frame to give the decoding side a fighting + * chance. + */ - payload = ast_rtp_codecs_get_payload(ast_rtp_instance_get_codecs(instance), payloadtype); - if (!payload) { - /* Unknown payload type. */ - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; - } + if (rtp->rtp_source_learn.stream_type == AST_MEDIA_TYPE_VIDEO) { + ast_debug(2, "Source on RTP instance '%p' has wild gap or packet loss, sending FIR\n", + instance); + rtp_write_rtcp_fir(instance, rtp, &remote_address); + } - /* If the payload is not actually an Asterisk one but a special one pass it off to the respective handler */ - if (!payload->asterisk_format) { - struct ast_frame *f = NULL; - if (payload->rtp_code == AST_RTP_DTMF) { - /* process_dtmf_rfc2833 may need to return multiple frames. We do this - * by passing the pointer to the frame list to it so that the method - * can append frames to the list as needed. + while (ast_data_buffer_count(rtp->recv_buffer)) { + struct ast_rtp_rtcp_nack_payload *payload; + + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_remove_head(rtp->recv_buffer); + if (!payload) { + continue; + } + + /* Even when dumping the receive buffer we do our best to order things, so we ensure that the + * packet we just received is processed in the correct order, so see if we need to insert it now. */ - process_dtmf_rfc2833(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, &addr, payloadtype, mark, &frames); - } else if (payload->rtp_code == AST_RTP_CISCO_DTMF) { - f = process_dtmf_cisco(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, &addr, payloadtype, mark); - } else if (payload->rtp_code == AST_RTP_CN) { - f = process_cn_rfc3389(instance, read_area + hdrlen, res - hdrlen, seqno, timestamp, &addr, payloadtype, mark); - } else { - ast_log(LOG_NOTICE, "Unknown RTP codec %d received from '%s'\n", - payloadtype, - ast_sockaddr_stringify(&remote_address)); + if (!inserted) { + int buffer_seqno; + + buffer_seqno = ntohl(payload->buf[0]) & 0xffff; + if (seqno < buffer_seqno) { + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + rtp->expectedrxseqno = seqno + 1; + prev_seqno = seqno; + ast_debug(2, "Inserted just received packet with sequence number '%d' in correct order on RTP instance '%p'\n", + seqno, instance); + } + inserted = 1; + } + } + + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, prev_seqno)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + prev_seqno = frame->seqno; + ast_debug(2, "Emptying queue and returning packet with sequence number '%d' from RTP instance '%p'\n", + frame->seqno, instance); + } + + ast_free(payload); } - if (f) { - AST_LIST_INSERT_TAIL(&frames, f, frame_list); + if (!inserted) { + /* This current packet goes after them, and we assume that packets going forward will follow + * that new sequence number increment. It is okay for this to not be duplicated as it is guaranteed + * to be the last packet processed right now and it is also guaranteed that it will always return + * non-NULL. + */ + frame = ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + rtp->expectedrxseqno = seqno + 1; + + ast_debug(2, "Adding just received packet with sequence number '%d' to end of dumped queue on RTP instance '%p'\n", + seqno, instance); } - /* Even if no frame was returned by one of the above methods, - * we may have a frame to return in our frame list + + /* As there is such a large gap we don't want to flood the order side with missing packets, so we + * give up and start anew. */ - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; - } + AST_VECTOR_RESET(&rtp->missing_seqno, AST_VECTOR_ELEM_CLEANUP_NOOP); - ao2_replace(rtp->lastrxformat, payload->format); - ao2_replace(rtp->f.subclass.format, payload->format); - switch (ast_format_get_type(rtp->f.subclass.format)) { - case AST_MEDIA_TYPE_AUDIO: - rtp->f.frametype = AST_FRAME_VOICE; - break; - case AST_MEDIA_TYPE_VIDEO: - rtp->f.frametype = AST_FRAME_VIDEO; - break; - case AST_MEDIA_TYPE_TEXT: - rtp->f.frametype = AST_FRAME_TEXT; - break; - case AST_MEDIA_TYPE_IMAGE: - /* Fall through */ - default: - ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", - ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); + return AST_LIST_FIRST(&frames); + } else if (seqno < rtp->expectedrxseqno) { + /* If this is a packet from the past then we have received a duplicate packet, so just drop it */ + ast_debug(2, "Received an old packet with sequence number '%d' on RTP instance '%p', dropping it\n", + seqno, instance); return &ast_null_frame; - } - rtp->rxseqno = seqno; - - if (rtp->dtmf_timeout && rtp->dtmf_timeout < timestamp) { - rtp->dtmf_timeout = 0; - - if (rtp->resp) { - struct ast_frame *f; - f = create_dtmf_frame(instance, AST_FRAME_DTMF_END, 0); - f->len = ast_tvdiff_ms(ast_samp2tv(rtp->dtmf_duration, rtp_get_rate(f->subclass.format)), ast_tv(0, 0)); - rtp->resp = 0; - rtp->dtmf_timeout = rtp->dtmf_duration = 0; - AST_LIST_INSERT_TAIL(&frames, f, frame_list); - return AST_LIST_FIRST(&frames); + } else if (ast_data_buffer_get(rtp->recv_buffer, seqno)) { + /* If this is a packet we already have buffered then it is a duplicate, so just drop it */ + ast_debug(2, "Received a duplicate transmission of packet with sequence number '%d' on RTP instance '%p', dropping it\n", + seqno, instance); + return &ast_null_frame; + } else { + /* This is an out of order packet from the future */ + struct ast_rtp_rtcp_nack_payload *payload; + int difference; + + ast_debug(2, "Received an out of order packet with sequence number '%d' from the future on RTP instance '%p'\n", + seqno, instance); + + payload = ast_malloc(sizeof(*payload) + res); + if (!payload) { + /* If the payload can't be allocated then we can't defer this packet right now. + * Instead of dumping what we have we pretend we lost this packet. It will then + * get NACKed later or the existing buffer will be returned entirely. Well, we may + * try since we're seemingly out of memory. It's a bad situation all around and + * packets are likely to get lost anyway. + */ + return &ast_null_frame; } - } - rtp->lastrxts = timestamp; + payload->size = res; + memcpy(payload->buf, rtpheader, res); + ast_data_buffer_put(rtp->recv_buffer, seqno, payload); + AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value, + AST_VECTOR_ELEM_CLEANUP_NOOP); - rtp->f.src = "RTP"; - rtp->f.mallocd = 0; - rtp->f.datalen = res - hdrlen; - rtp->f.data.ptr = read_area + hdrlen; - rtp->f.offset = hdrlen + AST_FRIENDLY_OFFSET; - ast_set_flag(&rtp->f, AST_FRFLAG_HAS_SEQUENCE_NUMBER); - rtp->f.seqno = seqno; - rtp->f.stream_num = rtp->stream_num; + difference = seqno - (prev_seqno + 1); + while (difference > 0) { + /* We don't want missing sequence number duplicates. If, for some reason, + * packets are really out of order, we could end up in this scenario: + * + * We are expecting sequence number 100 + * We receive sequence number 105 + * Sequence numbers 100 through 104 get added to the vector + * We receive sequence number 101 (this section is skipped) + * We receive sequence number 103 + * Sequence number 102 is added to the vector + * + * This will prevent the duplicate from being added. + */ + if (AST_VECTOR_GET_CMP(&rtp->missing_seqno, seqno - difference, + find_by_value)) { + difference--; + continue; + } - if ((ast_format_cmp(rtp->f.subclass.format, ast_format_t140) == AST_FORMAT_CMP_EQUAL) - && ((int)seqno - (prev_seqno + 1) > 0) - && ((int)seqno - (prev_seqno + 1) < 10)) { - unsigned char *data = rtp->f.data.ptr; + ast_debug(2, "Added missing sequence number '%d' to RTP instance '%p'\n", + seqno - difference, instance); + AST_VECTOR_ADD_SORTED(&rtp->missing_seqno, seqno - difference, + compare_by_value); + difference--; + } - memmove(rtp->f.data.ptr+3, rtp->f.data.ptr, rtp->f.datalen); - rtp->f.datalen +=3; - *data++ = 0xEF; - *data++ = 0xBF; - *data = 0xBD; - } + /* When our data buffer is half full we assume that the packets aren't just out of order but + * have actually been lost. To get them back we construct and send a NACK causing the sender to + * retransmit them. + */ + if (ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer) / 2) { + int packet_len = 0; + int res = 0; + int ice; + int sr; + size_t data_size = AST_UUID_STR_LEN + 128 + (seqno - rtp->expectedrxseqno) / 17; + RAII_VAR(unsigned char *, rtcpheader, NULL, ast_free_ptr); + RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, + ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0), + ao2_cleanup); + + rtcpheader = ast_malloc(sizeof(*rtcpheader) + data_size); + if (!rtcpheader) { + ast_debug(1, "Failed to allocate memory for NACK\n"); + return &ast_null_frame; + } - if (ast_format_cmp(rtp->f.subclass.format, ast_format_t140_red) == AST_FORMAT_CMP_EQUAL) { - unsigned char *data = rtp->f.data.ptr; - unsigned char *header_end; - int num_generations; - int header_length; - int len; - int diff =(int)seqno - (prev_seqno+1); /* if diff = 0, no drop*/ - int x; + memset(rtcpheader, 0, data_size); - ao2_replace(rtp->f.subclass.format, ast_format_t140); - header_end = memchr(data, ((*data) & 0x7f), rtp->f.datalen); - if (header_end == NULL) { - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; - } - header_end++; + res = ast_rtcp_generate_report(instance, rtcpheader, rtcp_report, &sr); - header_length = header_end - data; - num_generations = header_length / 4; - len = header_length; + if (res == 0 || res == 1) { + ast_debug(1, "Failed to add %s report to NACK, stopping here\n", sr ? "SR" : "RR"); + return &ast_null_frame; + } - if (!diff) { - for (x = 0; x < num_generations; x++) - len += data[x * 4 + 3]; + packet_len += res; - if (!(rtp->f.datalen - len)) - return AST_LIST_FIRST(&frames) ? AST_LIST_FIRST(&frames) : &ast_null_frame; + res = ast_rtcp_generate_nack(instance, rtcpheader + packet_len); - rtp->f.data.ptr += len; - rtp->f.datalen -= len; - } else if (diff > num_generations && diff < 10) { - len -= 3; - rtp->f.data.ptr += len; - rtp->f.datalen -= len; + if (res == 0) { + ast_debug(1, "Failed to construct NACK, stopping here\n"); + return &ast_null_frame; + } - data = rtp->f.data.ptr; - *data++ = 0xEF; - *data++ = 0xBF; - *data = 0xBD; - } else { - for ( x = 0; x < num_generations - diff; x++) - len += data[x * 4 + 3]; + packet_len += res; - rtp->f.data.ptr += len; - rtp->f.datalen -= len; - } - } + res = rtcp_sendto(instance, rtcpheader, packet_len, 0, &remote_address, &ice); + if (res < 0) { + ast_debug(1, "Failed to send NACK request out\n"); + } else { + /* Update RTCP SR/RR statistics */ + ast_rtcp_calculate_sr_rr_statistics(instance, rtcp_report, remote_address, ice, sr); + } - if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_AUDIO) { - rtp->f.samples = ast_codec_samples_count(&rtp->f); - if (ast_format_cache_is_slinear(rtp->f.subclass.format)) { - ast_frame_byteswap_be(&rtp->f); + ast_debug(2, "Sending a NACK request on RTP instance '%p' to get missing packets\n", instance); } - calc_rxstamp(&rtp->f.delivery, rtp, timestamp, mark); - /* Add timing data to let ast_generic_bridge() put the frame into a jitterbuf */ - ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); - rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); - rtp->f.len = rtp->f.samples / ((ast_format_get_sample_rate(rtp->f.subclass.format) / 1000)); - } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_VIDEO) { - /* Video -- samples is # of samples vs. 90000 */ - if (!rtp->lastividtimestamp) - rtp->lastividtimestamp = timestamp; - ast_set_flag(&rtp->f, AST_FRFLAG_HAS_TIMING_INFO); - rtp->f.ts = timestamp / (rtp_get_rate(rtp->f.subclass.format) / 1000); - rtp->f.samples = timestamp - rtp->lastividtimestamp; - rtp->lastividtimestamp = timestamp; - rtp->f.delivery.tv_sec = 0; - rtp->f.delivery.tv_usec = 0; - /* Pass the RTP marker bit as bit */ - rtp->f.subclass.frame_ending = mark ? 1 : 0; - } else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_TEXT) { - /* TEXT -- samples is # of samples vs. 1000 */ - if (!rtp->lastitexttimestamp) - rtp->lastitexttimestamp = timestamp; - rtp->f.samples = timestamp - rtp->lastitexttimestamp; - rtp->lastitexttimestamp = timestamp; - rtp->f.delivery.tv_sec = 0; - rtp->f.delivery.tv_usec = 0; - } else { - ast_log(LOG_WARNING, "Unknown or unsupported media type: %s\n", - ast_codec_media_type2str(ast_format_get_type(rtp->f.subclass.format))); - return &ast_null_frame;; + + return &ast_null_frame; } - AST_LIST_INSERT_TAIL(&frames, &rtp->f, frame_list); - return AST_LIST_FIRST(&frames); + return &ast_null_frame; } /*! \pre instance is locked */ @@ -6857,7 +7289,10 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro } else if (property == AST_RTP_PROPERTY_ASYMMETRIC_CODEC) { rtp->asymmetric_codec = value; } else if (property == AST_RTP_PROPERTY_RETRANS_SEND) { - rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_BUFFER_SIZE); + rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_SEND_BUFFER_SIZE); + } else if (property == AST_RTP_PROPERTY_RETRANS_RECV) { + rtp->recv_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_RECV_BUFFER_SIZE); + AST_VECTOR_INIT(&rtp->missing_seqno, 0); } } diff --git a/tests/test_data_buffer.c b/tests/test_data_buffer.c index 11fdc7b6f57470ef48542bbabcc5efc6e7e5d8d8..93c2c0612afa8e9f48b57987ed1a0e7ad1c54c4d 100644 --- a/tests/test_data_buffer.c +++ b/tests/test_data_buffer.c @@ -217,6 +217,7 @@ AST_TEST_DEFINE(buffer_resize) AST_TEST_DEFINE(buffer_nominal) { RAII_VAR(struct ast_data_buffer *, buffer, NULL, ast_data_buffer_free_wrapper); + RAII_VAR(struct mock_payload *, removed_payload, NULL, ast_free_ptr); struct mock_payload *payload; struct mock_payload *fetched_payload; int ret; @@ -247,6 +248,9 @@ AST_TEST_DEFINE(buffer_nominal) "Failed to allocate memory for payload %d", i); ret = ast_data_buffer_put(buffer, i, payload); + if (ret) { + ast_free(payload); + } ast_test_validate(test, ret == 0, "Failed to add payload %d to buffer", i); @@ -268,7 +272,11 @@ AST_TEST_DEFINE(buffer_nominal) ast_test_validate(test, payload != NULL, "Failed to allocate memory for payload %d", i + BUFFER_MAX_NOMINAL); + payload->id = i; ret = ast_data_buffer_put(buffer, i + BUFFER_MAX_NOMINAL, payload); + if (ret) { + ast_free(payload); + } ast_test_validate(test, ret == 0, "Failed to add payload %d to buffer", i + BUFFER_MAX_NOMINAL); @@ -289,6 +297,30 @@ AST_TEST_DEFINE(buffer_nominal) "Failed to get payload at position %d during second loop", i + BUFFER_MAX_NOMINAL); } + removed_payload = (struct mock_payload *)ast_data_buffer_remove_head(buffer); + + ast_test_validate(test, removed_payload != NULL, + "Failed to get the payload at the HEAD of the buffer"); + + ast_test_validate(test, ast_data_buffer_count(buffer) == BUFFER_MAX_NOMINAL - 1, + "Removing payload from HEAD of buffer did not decrease buffer size"); + + ast_test_validate(test, removed_payload->id == 1, + "Removing payload from HEAD of buffer did not return expected payload"); + + ast_free(removed_payload); + + removed_payload = (struct mock_payload *)ast_data_buffer_remove(buffer, BUFFER_MAX_NOMINAL * 2); + + ast_test_validate(test, removed_payload != NULL, + "Failed to get payload at position %d from buffer", BUFFER_MAX_NOMINAL * 2); + + ast_test_validate(test, ast_data_buffer_count(buffer) == BUFFER_MAX_NOMINAL - 2, + "Removing payload from buffer did not decrease buffer size"); + + ast_test_validate(test, removed_payload->id == BUFFER_MAX_NOMINAL, + "Removing payload from buffer did not return expected payload"); + return AST_TEST_PASS; }