diff options
Diffstat (limited to 'src/modules/rtp')
-rw-r--r-- | src/modules/rtp/module-rtp-recv.c | 167 | ||||
-rw-r--r-- | src/modules/rtp/module-rtp-send.c | 11 | ||||
-rw-r--r-- | src/modules/rtp/rtp.c | 44 | ||||
-rw-r--r-- | src/modules/rtp/rtp.h | 2 | ||||
-rw-r--r-- | src/modules/rtp/sap.c | 4 | ||||
-rw-r--r-- | src/modules/rtp/sap.h | 4 | ||||
-rw-r--r-- | src/modules/rtp/sdp.c | 6 |
7 files changed, 189 insertions, 49 deletions
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c index d8e7a781..cff5cf8b 100644 --- a/src/modules/rtp/module-rtp-recv.c +++ b/src/modules/rtp/module-rtp-recv.c @@ -51,6 +51,7 @@ #include <pulsecore/atomic.h> #include <pulsecore/rtclock.h> #include <pulsecore/atomic.h> +#include <pulsecore/time-smoother.h> #include "module-rtp-recv-symdef.h" @@ -69,9 +70,11 @@ PA_MODULE_USAGE( #define SAP_PORT 9875 #define DEFAULT_SAP_ADDRESS "224.0.0.56" -#define MEMBLOCKQ_MAXLENGTH (1024*170) +#define MEMBLOCKQ_MAXLENGTH (1024*1024*40) #define MAX_SESSIONS 16 #define DEATH_TIMEOUT 20 +#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC) +#define LATENCY_USEC (500*PA_USEC_PER_MSEC) static const char* const valid_modargs[] = { "sink", @@ -97,6 +100,12 @@ struct session { pa_rtpoll_item *rtpoll_item; pa_atomic_t timestamp; + + pa_smoother *smoother; + pa_usec_t intended_latency; + pa_usec_t sink_latency; + + pa_usec_t last_rate_update; }; struct userdata { @@ -133,21 +142,37 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t } /* Called from I/O thread context */ -static int sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { struct session *s; pa_sink_input_assert_ref(i); pa_assert_se(s = i->userdata); - return pa_memblockq_peek(s->memblockq, chunk); + if (pa_memblockq_peek(s->memblockq, chunk) < 0) + return -1; + + pa_memblockq_drop(s->memblockq, chunk->length); + + return 0; } /* Called from I/O thread context */ -static void sink_input_drop(pa_sink_input *i, size_t length) { +static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { struct session *s; + pa_sink_input_assert_ref(i); pa_assert_se(s = i->userdata); - pa_memblockq_drop(s->memblockq, length); + pa_memblockq_rewind(s->memblockq, nbytes); +} + +/* Called from thread context */ +static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { + struct session *s; + + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + pa_memblockq_set_maxrewind(s->memblockq, nbytes); } /* Called from main context */ @@ -215,20 +240,82 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { pa_memblockq_seek(s->memblockq, delta * s->rtp_context.frame_size, PA_SEEK_RELATIVE); + pa_rtclock_get(&now); + + pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec(pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec)); + if (pa_memblockq_push(s->memblockq, &chunk) < 0) { - /* queue overflow, let's flush it and try again */ - pa_memblockq_flush(s->memblockq); - pa_memblockq_push(s->memblockq, &chunk); + pa_log_warn("Queue overrun"); + pa_memblockq_seek(s->memblockq, chunk.length, PA_SEEK_RELATIVE); } - /* The next timestamp we expect */ - s->offset = s->rtp_context.timestamp + (chunk.length / s->rtp_context.frame_size); + pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); pa_memblock_unref(chunk.memblock); - pa_rtclock_get(&now); + /* The next timestamp we expect */ + s->offset = s->rtp_context.timestamp + (chunk.length / s->rtp_context.frame_size); + pa_atomic_store(&s->timestamp, now.tv_sec); + if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) { + pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix; + unsigned fix_samples; + + pa_log("Updating sample rate"); + + wi = pa_smoother_get(s->smoother, pa_timeval_load(&now)); + ri = pa_bytes_to_usec(pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec); + + if (PA_MSGOBJECT(s->sink_input->sink)->process_msg(PA_MSGOBJECT(s->sink_input->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_delay, 0, NULL) < 0) + sink_delay = 0; + + render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec); + + if (ri > render_delay+sink_delay) + ri -= render_delay+sink_delay; + else + ri = 0; + + if (wi < ri) + latency = 0; + else + latency = wi - ri; + + pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC); + + /* Calculate deviation */ + if (latency < s->intended_latency) + fix = s->intended_latency - latency; + else + fix = latency - s->intended_latency; + + /* How many samples is this per second? */ + fix_samples = fix * s->sink_input->thread_info.sample_spec.rate / RATE_UPDATE_INTERVAL; + + /* Check if deviation is in bounds */ + if (fix_samples > s->sink_input->sample_spec.rate*.20) + pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples); + + /* Fix up rate */ + if (latency < s->intended_latency) + s->sink_input->sample_spec.rate -= fix_samples; + else + s->sink_input->sample_spec.rate += fix_samples; + + pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate); + + pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate); + + s->last_rate_update = pa_timeval_load(&now); + } + + if (pa_memblockq_is_readable(s->memblockq) && + s->sink_input->thread_info.underrun_for > 0) { + pa_log_debug("Requesting rewind due to end of underrun"); + pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE); + } + return 1; } @@ -314,10 +401,9 @@ fail: static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) { struct session *s = NULL; - char *c; pa_sink *sink; int fd = -1; - pa_memblock *silence; + pa_memchunk silence; pa_sink_input_new_data data; struct timeval now; @@ -329,37 +415,46 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in goto fail; } - if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK, 1))) { + if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK, TRUE))) { pa_log("Sink does not exist."); goto fail; } + pa_rtclock_get(&now); + s = pa_xnew0(struct session, 1); s->userdata = u; s->first_packet = FALSE; s->sdp_info = *sdp_info; s->rtpoll_item = NULL; - - pa_rtclock_get(&now); + s->intended_latency = LATENCY_USEC; + s->smoother = pa_smoother_new(PA_USEC_PER_SEC*5, PA_USEC_PER_SEC*2, TRUE, 10); + pa_smoother_set_time_offset(s->smoother, pa_timeval_load(&now)); + s->last_rate_update = pa_timeval_load(&now); pa_atomic_store(&s->timestamp, now.tv_sec); if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0) goto fail; - c = pa_sprintf_malloc("RTP Stream%s%s%s", - sdp_info->session_name ? " (" : "", - sdp_info->session_name ? sdp_info->session_name : "", - sdp_info->session_name ? ")" : ""); - pa_sink_input_new_data_init(&data); data.sink = sink; data.driver = __FILE__; - data.name = c; + pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream"); + pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, + "RTP Stream%s%s%s", + sdp_info->session_name ? " (" : "", + sdp_info->session_name ? sdp_info->session_name : "", + sdp_info->session_name ? ")" : ""); + + if (sdp_info->session_name) + pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name); + pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin); + pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload); data.module = u->module; pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec); s->sink_input = pa_sink_input_new(u->module->core, &data, 0); - pa_xfree(c); + pa_sink_input_new_data_done(&data); if (!s->sink_input) { pa_log("Failed to create sink input."); @@ -369,27 +464,31 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in s->sink_input->userdata = s; s->sink_input->parent.process_msg = sink_input_process_msg; - s->sink_input->peek = sink_input_peek; - s->sink_input->drop = sink_input_drop; + s->sink_input->pop = sink_input_pop_cb; + s->sink_input->process_rewind = sink_input_process_rewind_cb; + s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; s->sink_input->kill = sink_input_kill; s->sink_input->attach = sink_input_attach; s->sink_input->detach = sink_input_detach; - silence = pa_silence_memblock_new( - s->userdata->module->core->mempool, - &s->sink_input->sample_spec, - pa_frame_align(pa_bytes_per_second(&s->sink_input->sample_spec)/128, &s->sink_input->sample_spec)); + pa_sink_input_get_silence(s->sink_input, &silence); + + s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2); + + if (s->intended_latency < s->sink_latency*2) + s->intended_latency = s->sink_latency*2; s->memblockq = pa_memblockq_new( 0, MEMBLOCKQ_MAXLENGTH, MEMBLOCKQ_MAXLENGTH, pa_frame_size(&s->sink_input->sample_spec), - pa_bytes_per_second(&s->sink_input->sample_spec)/10+1, + pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec), + 0, 0, - silence); + &silence); - pa_memblock_unref(silence); + pa_memblock_unref(silence.memblock); pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec)); @@ -429,12 +528,14 @@ static void session_free(struct session *s) { pa_sdp_info_destroy(&s->sdp_info); pa_rtp_context_destroy(&s->rtp_context); + pa_smoother_free(s->smoother); + pa_xfree(s); } static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) { struct userdata *u = userdata; - int goodbye; + pa_bool_t goodbye = FALSE; pa_sdp_info info; struct session *s; diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c index 95ff15de..3a526c14 100644 --- a/src/modules/rtp/module-rtp-send.c +++ b/src/modules/rtp/module-rtp-send.c @@ -288,14 +288,20 @@ int pa__init(pa_module*m) { pa_make_fd_cloexec(sap_fd); pa_source_output_new_data_init(&data); - data.name = "RTP Monitor Stream"; + pa_proplist_sets(data.proplist, PA_PROP_MEDIA_NAME, "RTP Monitor Stream"); + pa_proplist_sets(data.proplist, "rtp.destination", dest); + pa_proplist_setf(data.proplist, "rtp.mtu", "%lu", (unsigned long) mtu); + pa_proplist_setf(data.proplist, "rtp.port", "%lu", (unsigned long) port); data.driver = __FILE__; data.module = m; data.source = s; pa_source_output_new_data_set_sample_spec(&data, &ss); pa_source_output_new_data_set_channel_map(&data, &cm); - if (!(o = pa_source_output_new(m->core, &data, 0))) { + o = pa_source_output_new(m->core, &data, 0); + pa_source_output_new_data_done(&data); + + if (!o) { pa_log("failed to create source output."); goto fail; } @@ -318,6 +324,7 @@ int pa__init(pa_module*m) { pa_frame_size(&ss), 1, 0, + 0, NULL); u->mtu = mtu; diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index 997fcc34..5c299844 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -55,6 +55,8 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr c->payload = payload & 127; c->frame_size = frame_size; + pa_memchunk_reset(&c->memchunk); + return c; } @@ -152,6 +154,8 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame c->fd = fd; c->frame_size = frame_size; + + pa_memchunk_reset(&c->memchunk); return c; } @@ -173,12 +177,28 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { goto fail; } - if (!size) + if (size <= 0) return 0; - chunk->memblock = pa_memblock_new(pool, size); + if (c->memchunk.length < (unsigned) size) { + size_t l; + + if (c->memchunk.memblock) + pa_memblock_unref(c->memchunk.memblock); + + l = PA_MAX((size_t) size, pa_mempool_block_size_max(pool)); + + c->memchunk.memblock = pa_memblock_new(pool, l); + c->memchunk.index = 0; + c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock); + } + + pa_assert(c->memchunk.length >= (size_t) size); - iov.iov_base = pa_memblock_acquire(chunk->memblock); + chunk->memblock = pa_memblock_ref(c->memchunk.memblock); + chunk->index = c->memchunk.index; + + iov.iov_base = (uint8_t*) pa_memblock_acquire(chunk->memblock) + chunk->index; iov.iov_len = size; m.msg_name = NULL; @@ -236,14 +256,22 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { goto fail; } - chunk->index = 12 + cc*4; - chunk->length = size - chunk->index; + chunk->index += 12 + cc*4; + chunk->length = size - 12 + cc*4; if (chunk->length % c->frame_size != 0) { pa_log_warn("Bad RTP packet size."); goto fail; } + c->memchunk.index = chunk->index + chunk->length; + c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock) - c->memchunk.index; + + if (c->memchunk.length <= 0) { + pa_memblock_unref(c->memchunk.memblock); + pa_memchunk_reset(&c->memchunk); + } + return 0; fail: @@ -329,7 +357,10 @@ int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) { void pa_rtp_context_destroy(pa_rtp_context *c) { pa_assert(c); - pa_close(c->fd); + pa_assert_se(pa_close(c->fd) == 0); + + if (c->memchunk.memblock) + pa_memblock_unref(c->memchunk.memblock); } const char* pa_rtp_format_to_string(pa_sample_format_t f) { @@ -361,4 +392,3 @@ pa_sample_format_t pa_rtp_string_to_format(const char *s) { else return PA_SAMPLE_INVALID; } - diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h index ad7175ca..a366d7a6 100644 --- a/src/modules/rtp/rtp.h +++ b/src/modules/rtp/rtp.h @@ -37,6 +37,8 @@ typedef struct pa_rtp_context { uint32_t ssrc; uint8_t payload; size_t frame_size; + + pa_memchunk memchunk; } pa_rtp_context; pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssrc, uint8_t payload, size_t frame_size); diff --git a/src/modules/rtp/sap.c b/src/modules/rtp/sap.c index ed7eb0be..123bc494 100644 --- a/src/modules/rtp/sap.c +++ b/src/modules/rtp/sap.c @@ -71,7 +71,7 @@ void pa_sap_context_destroy(pa_sap_context *c) { pa_xfree(c->sdp_data); } -int pa_sap_send(pa_sap_context *c, int goodbye) { +int pa_sap_send(pa_sap_context *c, pa_bool_t goodbye) { uint32_t header; struct sockaddr_storage sa_buf; struct sockaddr *sa = (struct sockaddr*) &sa_buf; @@ -127,7 +127,7 @@ pa_sap_context* pa_sap_context_init_recv(pa_sap_context *c, int fd) { return c; } -int pa_sap_recv(pa_sap_context *c, int *goodbye) { +int pa_sap_recv(pa_sap_context *c, pa_bool_t *goodbye) { struct msghdr m; struct iovec iov; int size, k; diff --git a/src/modules/rtp/sap.h b/src/modules/rtp/sap.h index f906a32b..db096d61 100644 --- a/src/modules/rtp/sap.h +++ b/src/modules/rtp/sap.h @@ -40,9 +40,9 @@ typedef struct pa_sap_context { pa_sap_context* pa_sap_context_init_send(pa_sap_context *c, int fd, char *sdp_data); void pa_sap_context_destroy(pa_sap_context *c); -int pa_sap_send(pa_sap_context *c, int goodbye); +int pa_sap_send(pa_sap_context *c, pa_bool_t goodbye); pa_sap_context* pa_sap_context_init_recv(pa_sap_context *c, int fd); -int pa_sap_recv(pa_sap_context *c, int *goodbye); +int pa_sap_recv(pa_sap_context *c, pa_bool_t *goodbye); #endif diff --git a/src/modules/rtp/sdp.c b/src/modules/rtp/sdp.c index 50ac157a..9265a200 100644 --- a/src/modules/rtp/sdp.c +++ b/src/modules/rtp/sdp.c @@ -117,7 +117,7 @@ static pa_sample_spec *parse_sdp_sample_spec(pa_sample_spec *ss, char *c) { pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *i, int is_goodbye) { uint16_t port = 0; - int ss_valid = 0; + pa_bool_t ss_valid = FALSE; pa_assert(t); pa_assert(i); @@ -202,7 +202,7 @@ pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *i, int is_goodbye) { i->payload = (uint8_t) _payload; if (pa_rtp_sample_spec_from_payload(i->payload, &i->sample_spec)) - ss_valid = 1; + ss_valid = TRUE; } } } else if (pa_startswith(t, "a=rtpmap:")) { @@ -222,7 +222,7 @@ pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *i, int is_goodbye) { c[strcspn(c, "\n")] = 0; if (parse_sdp_sample_spec(&i->sample_spec, c)) - ss_valid = 1; + ss_valid = TRUE; } } } |