From 920f045380d70785d6ca483d901610d70daee361 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 7 Apr 2006 00:24:48 +0000 Subject: rework latency querying API (this needs more testing) git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@648 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/polyp/def.h | 44 ++++-- src/polyp/internal.h | 44 ++++-- src/polyp/stream.c | 425 +++++++++++++++++++++++++++++++-------------------- src/polyp/stream.h | 30 +--- 4 files changed, 323 insertions(+), 220 deletions(-) (limited to 'src/polyp') diff --git a/src/polyp/def.h b/src/polyp/def.h index 98420bc0..93d0996b 100644 --- a/src/polyp/def.h +++ b/src/polyp/def.h @@ -80,7 +80,7 @@ typedef enum pa_stream_direction { /** Some special flags for stream connections. \since 0.6 */ typedef enum pa_stream_flags { PA_STREAM_START_CORKED = 1, /**< Create the stream corked, requiring an explicit pa_stream_cork() call to uncork it. */ - PA_STREAM_INTERPOLATE_LATENCY = 2 /**< Interpolate the latency for + PA_STREAM_INTERPOLATE_LATENCY = 2, /**< Interpolate the latency for * this stream. When enabled, * you can use * pa_stream_interpolated_xxx() @@ -95,6 +95,7 @@ typedef enum pa_stream_flags { * information. This is * especially useful on long latency * network connections. */ + PA_STREAM_NOT_MONOTONOUS = 4, /**< Don't force the time to run monotonically */ } pa_stream_flags_t; /** Playback and record buffer metrics */ @@ -124,7 +125,7 @@ enum { PA_ERR_INVALIDSERVER, /**< Invalid server */ PA_ERR_MODINITFAILED, /**< Module initialization failed */ PA_ERR_BADSTATE, /**< Bad state */ - PA_ERR_NODATA, /**< No data */ + PA_ERR_NODATA, /**< No data */ PA_ERR_VERSION, /**< Incompatible protocol version \since 0.8 */ PA_ERR_MAX /**< Not really an error but the first invalid error code */ }; @@ -171,7 +172,7 @@ typedef enum pa_subscription_event_type { * pa_stream_write() takes to be played may be estimated by * sink_usec+buffer_usec+transport_usec. The output buffer to which * buffer_usec relates may be manipulated freely (with - * pa_stream_write()'s delta argument, pa_stream_flush() and friends), + * pa_stream_write()'s seek argument, pa_stream_flush() and friends), * the buffers sink_usec/source_usec relates to is a first-in * first-out buffer which cannot be flushed or manipulated in any * way. The total input latency a sample that is recorded takes to be @@ -180,12 +181,7 @@ typedef enum pa_subscription_event_type { * sign issues!) When connected to a monitor source sink_usec contains * the latency of the owning sink.*/ typedef struct pa_latency_info { - pa_usec_t buffer_usec; /**< Time in usecs the current buffer takes to play. For both playback and record streams. */ - pa_usec_t sink_usec; /**< Time in usecs a sample takes to be played on the sink. For playback streams and record streams connected to a monitor source. */ - pa_usec_t source_usec; /**< Time in usecs a sample takes from being recorded to being delivered to the application. Only for record streams. \since 0.5*/ - pa_usec_t transport_usec; /**< Estimated time in usecs a sample takes to be transferred to/from the daemon. For both playback and record streams. \since 0.5 */ - int playing; /**< Non-zero when the stream is currently playing. Only for playback streams. */ - uint32_t queue_length; /**< Queue size in bytes. For both playback and record streams. */ + struct timeval timestamp; /**< The time when this latency info was current */ int synchronized_clocks; /**< Non-zero if the local and the * remote machine have synchronized * clocks. If synchronized clocks are @@ -194,10 +190,31 @@ typedef struct pa_latency_info { * detects synchronized clocks is very * limited und unreliable itself. \since * 0.5 */ - struct timeval timestamp; /**< The time when this latency info was current */ - uint64_t counter; /**< The byte counter current when the latency info was requested. \since 0.6 */ - int64_t write_index; /**< Current absolute write index in the buffer. \since 0.8 */ - int64_t read_index; /**< Current absolute read index in the buffer. \since 0.8 */ + + pa_usec_t buffer_usec; /**< Time in usecs the current buffer takes to play. For both playback and record streams. */ + pa_usec_t sink_usec; /**< Time in usecs a sample takes to be played on the sink. For playback streams and record streams connected to a monitor source. */ + pa_usec_t source_usec; /**< Time in usecs a sample takes from being recorded to being delivered to the application. Only for record streams. \since 0.5*/ + pa_usec_t transport_usec; /**< Estimated time in usecs a sample takes to be transferred to/from the daemon. For both playback and record streams. \since 0.5 */ + + int playing; /**< Non-zero when the stream is currently playing. Only for playback streams. */ + + int write_index_corrupt; /**< Non-Zero if the write_index is not up to date because a local write command corrupted it */ + int64_t write_index; /**< Current write index into the + * playback buffer in bytes. Think twice before + * using this for seeking purposes: it + * might be out of date a the time you + * want to use it. Consider using + * PA_SEEK_RELATIVE instead. \since + * 0.8 */ + int64_t read_index; /**< Current read index into the + * playback buffer in bytes. Think twice before + * using this for seeking purposes: it + * might be out of date a the time you + * want to use it. Consider using + * PA_SEEK_RELATIVE_ON_READ + * instead. \since 0.8 */ + + uint32_t buffer_length; /* Current buffer length. This is usually identical to write_index-read_index. */ } pa_latency_info; /** A structure for the spawn api. This may be used to integrate auto @@ -227,7 +244,6 @@ typedef enum pa_seek_mode { PA_SEEK_RELATIVE_END = 3, /**< Seek relatively to the current end of the buffer queue */ } pa_seek_mode_t; - PA_C_DECL_END #endif diff --git a/src/polyp/internal.h b/src/polyp/internal.h index 82d8f7ce..8f1603b3 100644 --- a/src/polyp/internal.h +++ b/src/polyp/internal.h @@ -83,6 +83,15 @@ struct pa_context { pa_client_conf *conf; }; +#define PA_MAX_LATENCY_CORRECTIONS 10 + +typedef struct pa_latency_correction { + uint32_t tag; + int valid; + int64_t value; + int absolute, corrupt; +} pa_latency_correction; + struct pa_stream { int ref; pa_context *context; @@ -93,41 +102,48 @@ struct pa_stream { pa_buffer_attr buffer_attr; pa_sample_spec sample_spec; pa_channel_map channel_map; + pa_stream_flags_t flags; uint32_t channel; uint32_t syncid; int channel_valid; uint32_t device_index; pa_stream_direction_t direction; - uint32_t requested_bytes; - uint64_t counter; - pa_usec_t previous_time; - pa_usec_t previous_ipol_time; pa_stream_state_t state; + + uint32_t requested_bytes; + pa_memchunk peek_memchunk; pa_memblockq *record_memblockq; - pa_hashmap *counter_hashmap; - - int interpolate; int corked; - uint32_t ipol_usec; - struct timeval ipol_timestamp; + /* Store latest latency info */ + pa_latency_info latency_info; + int latency_info_valid; + + /* Use to make sure that time advances monotonically */ + pa_usec_t previous_time; + + /* Latency correction stuff */ + pa_latency_correction latency_corrections[PA_MAX_LATENCY_CORRECTIONS]; + int idx_latency_correction; + + /* Latency interpolation stuff */ pa_time_event *ipol_event; int ipol_requested; - + pa_usec_t ipol_usec; + int ipol_usec_valid; + struct timeval ipol_timestamp; + + /* Callbacks */ pa_stream_notify_cb_t state_callback; void *state_userdata; - pa_stream_request_cb_t read_callback; void *read_userdata; - pa_stream_request_cb_t write_callback; void *write_userdata; - pa_stream_notify_cb_t overflow_callback; void *overflow_userdata; - pa_stream_notify_cb_t underflow_callback; void *underflow_userdata; }; diff --git a/src/polyp/stream.c b/src/polyp/stream.c index 672c376a..d3599582 100644 --- a/src/polyp/stream.c +++ b/src/polyp/stream.c @@ -38,10 +38,10 @@ #include "internal.h" #define LATENCY_IPOL_INTERVAL_USEC (10000L) -#define COUNTER_HASHMAP_MAXSIZE (5) pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) { pa_stream *s; + int i; assert(c); @@ -67,6 +67,7 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->direction = PA_STREAM_NODIRECTION; s->name = pa_xstrdup(name); s->sample_spec = *ss; + s->flags = 0; if (map) s->channel_map = *map; @@ -87,20 +88,21 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->record_memblockq = NULL; - s->counter_hashmap = pa_hashmap_new(NULL, NULL); - - s->counter = 0; s->previous_time = 0; - s->previous_ipol_time = 0; + s->latency_info_valid = 0; s->corked = 0; - s->interpolate = 0; - s->ipol_usec = 0; - memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp)); + s->ipol_usec_valid = 0; + s->ipol_timestamp.tv_sec = 0; + s->ipol_timestamp.tv_usec = 0; s->ipol_event = NULL; s->ipol_requested = 0; + for (i = 0; i < PA_MAX_LATENCY_CORRECTIONS; i++) + s->latency_corrections[i].valid = 0; + s->idx_latency_correction = 0; + PA_LLIST_PREPEND(pa_stream, c->streams, s); /* The context and stream will point at each other. We cannot ref count @@ -110,10 +112,6 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * return s; } -static void hashmap_free_func(void *p, void *userdata) { - pa_xfree(p); -} - static void stream_free(pa_stream *s) { assert(s && s->context && !s->channel_valid); @@ -132,9 +130,6 @@ static void stream_free(pa_stream *s) { if (s->record_memblockq) pa_memblockq_free(s->record_memblockq); - if (s->counter_hashmap) - pa_hashmap_free(s->counter_hashmap, hashmap_free_func, NULL); - pa_xfree(s->name); pa_xfree(s); } @@ -315,10 +310,11 @@ static void ipol_callback(pa_mainloop_api *m, pa_time_event *e, PA_GCC_UNUSED co if (s->state == PA_STREAM_READY && !s->ipol_requested) { pa_operation *o; - o = pa_stream_get_latency_info(s, NULL, NULL); - if (o) + + if ((o = pa_stream_update_latency_info(s, NULL, NULL))) { pa_operation_unref(o); - s->ipol_requested = 1; + s->ipol_requested = 1; + } } pa_gettimeofday(&next); @@ -375,7 +371,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED /* We add an extra ref as long as we're connected (i.e. in the dynarray) */ pa_stream_ref(s); - if (s->interpolate) { + if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { struct timeval tv; pa_gettimeofday(&tv); @@ -418,13 +414,11 @@ static int create_stream( pa_stream_ref(s); s->direction = direction; + s->flags = flags; if (sync_stream) s->syncid = sync_stream->syncid; - s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY); - pa_stream_trash_ipol(s); - if (attr) s->buffer_attr = *attr; else { @@ -548,7 +542,33 @@ int pa_stream_write( else s->requested_bytes = 0; - s->counter += length; + /* Update latency request correction */ + if (s->latency_corrections[s->idx_latency_correction].valid) { + + if (seek == PA_SEEK_ABSOLUTE) { + s->latency_corrections[s->idx_latency_correction].corrupt = 0; + s->latency_corrections[s->idx_latency_correction].absolute = 1; + s->latency_corrections[s->idx_latency_correction].value = offset + length; + } else if (seek == PA_SEEK_RELATIVE) { + if (!s->latency_corrections[s->idx_latency_correction].corrupt) + s->latency_corrections[s->idx_latency_correction].value += offset + length; + } else + s->latency_corrections[s->idx_latency_correction].corrupt = 1; + } + + /* Update the write index in the already available latency data */ + if (s->latency_info_valid) { + + if (seek == PA_SEEK_ABSOLUTE) { + s->latency_info.write_index_corrupt = 0; + s->latency_info.write_index = offset + length; + } else if (seek == PA_SEEK_RELATIVE) { + if (!s->latency_info.write_index_corrupt) + s->latency_info.write_index += offset + length; + } else + s->latency_info.write_index_corrupt = 1; + } + return 0; } @@ -590,7 +610,6 @@ int pa_stream_drop(pa_stream *s) { s->peek_memchunk.index = 0; s->peek_memchunk.memblock = NULL; - s->counter += s->peek_memchunk.length; return 0; } @@ -637,33 +656,34 @@ pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *us static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { pa_operation *o = userdata; - pa_latency_info i, *p = NULL; struct timeval local, remote, now; + pa_latency_info *i; assert(pd); assert(o); assert(o->stream); assert(o->context); - i.counter = *(uint64_t*)pa_hashmap_get(o->stream->counter_hashmap, (void*)(unsigned long)tag); - pa_xfree(pa_hashmap_remove(o->stream->counter_hashmap, (void*)(unsigned long)tag)); + i = &o->stream->latency_info; + o->stream->latency_info_valid = 0; + i->write_index_corrupt = 0; if (command != PA_COMMAND_REPLY) { if (pa_context_handle_error(o->context, command, t) < 0) goto finish; - } else if (pa_tagstruct_get_usec(t, &i.buffer_usec) < 0 || - pa_tagstruct_get_usec(t, &i.sink_usec) < 0 || - pa_tagstruct_get_usec(t, &i.source_usec) < 0 || - pa_tagstruct_get_boolean(t, &i.playing) < 0 || - pa_tagstruct_getu32(t, &i.queue_length) < 0 || + } else if (pa_tagstruct_get_usec(t, &i->buffer_usec) < 0 || + pa_tagstruct_get_usec(t, &i->sink_usec) < 0 || + pa_tagstruct_get_usec(t, &i->source_usec) < 0 || + pa_tagstruct_get_boolean(t, &i->playing) < 0 || pa_tagstruct_get_timeval(t, &local) < 0 || pa_tagstruct_get_timeval(t, &remote) < 0 || - pa_tagstruct_gets64(t, &i.write_index) < 0 || - pa_tagstruct_gets64(t, &i.read_index) < 0 || + pa_tagstruct_gets64(t, &i->write_index) < 0 || + pa_tagstruct_gets64(t, &i->read_index) < 0 || !pa_tagstruct_eof(t)) { pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; + } else { pa_gettimeofday(&now); @@ -671,72 +691,125 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, /* local and remote seem to have synchronized clocks */ if (o->stream->direction == PA_STREAM_PLAYBACK) - i.transport_usec = pa_timeval_diff(&remote, &local); + i->transport_usec = pa_timeval_diff(&remote, &local); else - i.transport_usec = pa_timeval_diff(&now, &remote); + i->transport_usec = pa_timeval_diff(&now, &remote); - i.synchronized_clocks = 1; - i.timestamp = remote; + i->synchronized_clocks = 1; + i->timestamp = remote; } else { /* clocks are not synchronized, let's estimate latency then */ - i.transport_usec = pa_timeval_diff(&now, &local)/2; - i.synchronized_clocks = 0; - i.timestamp = local; - pa_timeval_add(&i.timestamp, i.transport_usec); + i->transport_usec = pa_timeval_diff(&now, &local)/2; + i->synchronized_clocks = 0; + i->timestamp = local; + pa_timeval_add(&i->timestamp, i->transport_usec); } - - if (o->stream->interpolate) { -/* pa_log("new interpol data"); */ - o->stream->ipol_timestamp = i.timestamp; - o->stream->ipol_usec = pa_stream_get_time(o->stream, &i); - o->stream->ipol_requested = 0; + + if (o->stream->direction == PA_STREAM_PLAYBACK) { + /* Write index correction */ + + int n, j; + uint32_t ctag = tag; + + /* Go through the saved correction values and add up the total correction.*/ + + for (n = 0, j = o->stream->idx_latency_correction; + n < PA_MAX_LATENCY_CORRECTIONS; + n++, j = (j + 1) % PA_MAX_LATENCY_CORRECTIONS) { + + /* Step over invalid data or out-of-date data */ + if (!o->stream->latency_corrections[j].valid || + o->stream->latency_corrections[j].tag < ctag) + continue; + + /* Make sure that everything is in order */ + ctag = o->stream->latency_corrections[j].tag+1; + + /* Now fix the write index */ + if (o->stream->latency_corrections[j].corrupt) { + /* A corrupting seek was made */ + i->write_index = 0; + i->write_index_corrupt = 1; + } else if (o->stream->latency_corrections[j].absolute) { + /* An absolute seek was made */ + i->write_index = o->stream->latency_corrections[j].value; + i->write_index_corrupt = 0; + } else if (!i->write_index_corrupt) { + /* A relative seek was made */ + i->write_index += o->stream->latency_corrections[j].value; + } + } } + + o->stream->latency_info_valid = 1; + + o->stream->ipol_timestamp = now; + o->stream->ipol_usec_valid = 0; + } - p = &i; + /* Clear old correction entries */ + if (o->stream->direction == PA_STREAM_PLAYBACK) { + int n; + + for (n = 0; n < PA_MAX_LATENCY_CORRECTIONS; n++) { + if (!o->stream->latency_corrections[n].valid) + continue; + + if (o->stream->latency_corrections[n].tag <= tag) + o->stream->latency_corrections[n].valid = 0; + } } if (o->callback) { - pa_stream_get_latency_info_cb_t cb = (pa_stream_get_latency_info_cb_t) o->callback; - cb(o->stream, p, o->userdata); + pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback; + cb(o->stream, o->stream->latency_info_valid, o->userdata); } - + finish: + pa_operation_done(o); pa_operation_unref(o); } -pa_operation* pa_stream_get_latency_info(pa_stream *s, pa_stream_get_latency_info_cb_t cb, void *userdata) { +pa_operation* pa_stream_update_latency_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { uint32_t tag; pa_operation *o; pa_tagstruct *t; struct timeval now; - uint64_t *counter; + int cidx; assert(s); assert(s->ref >= 1); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_hashmap_size(s->counter_hashmap) < COUNTER_HASHMAP_MAXSIZE, PA_ERR_INTERNAL); + + /* Find a place to store the write_index correction data for this entry */ + cidx = (s->idx_latency_correction + 1) % PA_MAX_LATENCY_CORRECTIONS; + + /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */ + PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->latency_corrections[cidx].valid, PA_ERR_INTERNAL); o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); - + t = pa_tagstruct_command( s->context, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY, &tag); pa_tagstruct_putu32(t, s->channel); - - pa_gettimeofday(&now); - pa_tagstruct_put_timeval(t, &now); + pa_tagstruct_put_timeval(t, pa_gettimeofday(&now)); pa_pstream_send_tagstruct(s->context->pstream, t); pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_info_callback, o); - counter = pa_xmalloc(sizeof(uint64_t)); - *counter = s->counter; - pa_hashmap_put(s->counter_hashmap, (void*)(unsigned long)tag, counter); - + /* Fill in initial correction data */ + o->stream->idx_latency_correction = cidx; + o->stream->latency_corrections[cidx].valid = 1; + o->stream->latency_corrections[cidx].tag = tag; + o->stream->latency_corrections[cidx].absolute = 0; + o->stream->latency_corrections[cidx].value = 0; + o->stream->latency_corrections[cidx].corrupt = 0; + return pa_operation_ref(o); } @@ -872,11 +945,11 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - if (s->interpolate) { - if (!s->corked && b) - /* Pausing */ - s->ipol_usec = pa_stream_get_interpolated_time(s); - else if (s->corked && !b) + if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { + if (!s->corked && b) { + /* Refresh the interpolated data just befor pausing */ + pa_stream_get_time(s, NULL); + } else if (s->corked && !b) /* Unpausing */ pa_gettimeofday(&s->ipol_timestamp); } @@ -894,8 +967,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi pa_pstream_send_tagstruct(s->context->pstream, t); pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); - lo = pa_stream_get_latency_info(s, NULL, NULL); - if (lo) + if ((lo = pa_stream_update_latency_info(s, NULL, NULL))) pa_operation_unref(lo); return pa_operation_ref(o); @@ -928,8 +1000,8 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata))) { pa_operation *lo; - lo = pa_stream_get_latency_info(s, NULL, NULL); - if (lo) + + if ((lo = pa_stream_update_latency_info(s, NULL, NULL))) pa_operation_unref(lo); } @@ -943,8 +1015,8 @@ pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *us if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata))) { pa_operation *lo; - lo = pa_stream_get_latency_info(s, NULL, NULL); - if (lo) + + if ((lo = pa_stream_update_latency_info(s, NULL, NULL))) pa_operation_unref(lo); } @@ -958,8 +1030,8 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata))) { pa_operation *lo; - lo = pa_stream_get_latency_info(s, NULL, NULL); - if (lo) + + if ((lo = pa_stream_update_latency_info(s, NULL, NULL))) pa_operation_unref(lo); } @@ -992,150 +1064,165 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe return pa_operation_ref(o); } -uint64_t pa_stream_get_counter(pa_stream *s) { - assert(s); - assert(s->ref >= 1); - - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (uint64_t) -1); - - return s->counter; -} - -pa_usec_t pa_stream_get_time(pa_stream *s, const pa_latency_info *i) { - pa_usec_t usec; +int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { + pa_usec_t usec = 0; assert(s); assert(s->ref >= 1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); - - usec = pa_bytes_to_usec(i->counter, &s->sample_spec); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->latency_info_valid, PA_ERR_NODATA); - if (i) { + if (s->flags & PA_STREAM_INTERPOLATE_LATENCY && s->ipol_usec_valid ) + usec = s->ipol_usec; + else { if (s->direction == PA_STREAM_PLAYBACK) { - pa_usec_t latency = i->transport_usec + i->buffer_usec + i->sink_usec; - if (usec < latency) + /* The last byte that was written into the output device + * had this time value associated */ + usec = pa_bytes_to_usec(s->latency_info.read_index < 0 ? 0 : (uint64_t) s->latency_info.read_index, &s->sample_spec); + + /* Because the latency info took a little time to come + * to us, we assume that the real output time is actually + * a little ahead */ + usec += s->latency_info.transport_usec; + + /* However, the output device usually maintains a buffer + too, hence the real sample currently played is a little + back */ + if (s->latency_info.sink_usec >= usec) usec = 0; else - usec -= latency; - + usec -= s->latency_info.sink_usec; + } else if (s->direction == PA_STREAM_RECORD) { - usec += i->source_usec + i->buffer_usec + i->transport_usec; - - if (usec > i->sink_usec) - usec -= i->sink_usec; - else + /* The last byte written into the server side queue had + * this time value associated */ + usec = pa_bytes_to_usec(s->latency_info.write_index < 0 ? 0 : (uint64_t) s->latency_info.write_index, &s->sample_spec); + + /* Add transport latency */ + usec += s->latency_info.transport_usec; + + /* Add latency of data in device buffer */ + usec += s->latency_info.source_usec; + + /* If this is a monitor source, we need to correct the + * time by the playback device buffer */ + if (s->latency_info.sink_usec >= usec) usec = 0; + else + usec -= s->latency_info.sink_usec; + } + + if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { + s->ipol_usec_valid = 1; + s->ipol_usec = usec; } } - if (usec < s->previous_time) - usec = s->previous_time; + /* Interpolate if requested */ + if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { - s->previous_time = usec; + /* We just add the time that passed since the latency info was + * current */ + if (!s->corked) { + struct timeval now; + + usec += pa_timeval_diff(pa_gettimeofday(&now), &s->ipol_timestamp); + s->ipol_timestamp = now; + } + } - return usec; + /* Make sure the time runs monotonically */ + if (!(s->flags & PA_STREAM_NOT_MONOTONOUS)) { + if (usec < s->previous_time) + usec = s->previous_time; + else + s->previous_time = usec; + } + + if (r_usec) + *r_usec = usec; + + return 0; } -static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t t, pa_usec_t c, int *negative) { +static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) { assert(s); assert(s->ref >= 1); if (negative) *negative = 0; - if (c < t) { - if (s->direction == PA_STREAM_RECORD) { - if (negative) - *negative = 1; - - return t-c; + if (a >= b) + return a-b; + else { + if (negative && s->direction == PA_STREAM_RECORD) { + *negative = 1; + return b-a; } else return 0; - } else - return c-t; + } } -pa_usec_t pa_stream_get_latency(pa_stream *s, const pa_latency_info *i, int *negative) { +int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) { pa_usec_t t, c; + int r; + int64_t cindex; assert(s); assert(s->ref >= 1); - assert(i); + assert(r_usec); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->latency_info_valid, PA_ERR_NODATA); + PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->latency_info.write_index_corrupt, PA_ERR_NODATA); - t = pa_stream_get_time(s, i); - c = pa_bytes_to_usec(s->counter, &s->sample_spec); - - return time_counter_diff(s, t, c, negative); -} + if ((r = pa_stream_get_time(s, &t)) < 0) + return r; -const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) { - assert(s); - assert(s->ref >= 1); + if (s->direction == PA_STREAM_PLAYBACK) + cindex = s->latency_info.write_index; + else + cindex = s->latency_info.read_index; - return &s->sample_spec; -} + if (cindex < 0) + cindex = 0; + + c = pa_bytes_to_usec(cindex, &s->sample_spec); -const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) { - assert(s); - assert(s->ref >= 1); + if (s->direction == PA_STREAM_PLAYBACK) + *r_usec = time_counter_diff(s, c, t, negative); + else + *r_usec = time_counter_diff(s, t, c, negative); - return &s->channel_map; + return 0; } -void pa_stream_trash_ipol(pa_stream *s) { +const pa_latency_info* pa_stream_get_latency_info(pa_stream *s) { assert(s); assert(s->ref >= 1); - if (!s->interpolate) - return; + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->latency_info_valid, PA_ERR_BADSTATE); - memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp)); - s->ipol_usec = 0; + return &s->latency_info; } -pa_usec_t pa_stream_get_interpolated_time(pa_stream *s) { - pa_usec_t usec; - +const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) { assert(s); assert(s->ref >= 1); - - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->interpolate, PA_ERR_BADSTATE, (pa_usec_t) -1); - - if (s->corked) - usec = s->ipol_usec; - else { - if (s->ipol_timestamp.tv_sec == 0) - usec = 0; - else - usec = s->ipol_usec + pa_timeval_age(&s->ipol_timestamp); - } - - if (usec < s->previous_ipol_time) - usec = s->previous_ipol_time; - - s->previous_ipol_time = usec; - return usec; + return &s->sample_spec; } -pa_usec_t pa_stream_get_interpolated_latency(pa_stream *s, int *negative) { - pa_usec_t t, c; - +const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) { assert(s); assert(s->ref >= 1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->interpolate, PA_ERR_BADSTATE, (pa_usec_t) -1); - - t = pa_stream_get_interpolated_time(s); - c = pa_bytes_to_usec(s->counter, &s->sample_spec); - return time_counter_diff(s, t, c, negative); + return &s->channel_map; } + + diff --git a/src/polyp/stream.h b/src/polyp/stream.h index 75b0a900..99284ba3 100644 --- a/src/polyp/stream.h +++ b/src/polyp/stream.h @@ -51,9 +51,6 @@ typedef void (*pa_stream_request_cb_t)(pa_stream *p, size_t length, void *userda /** A generic notification callback */ typedef void (*pa_stream_notify_cb_t)(pa_stream *p, void *userdata); -/** Callback prototype for pa_stream_get_latency_info() */ -typedef void (*pa_stream_get_latency_info_cb_t)(pa_stream *p, const pa_latency_info *i, void *userdata); - /** Create a new, unconnected stream with the specified name and sample type */ pa_stream* pa_stream_new( pa_context *c /**< The context to create this stream in */, @@ -133,8 +130,8 @@ size_t pa_stream_readable_size(pa_stream *p); /** Drain a playback stream. Use this for notification when the buffer is empty */ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata); -/** Get the playback latency of a stream */ -pa_operation* pa_stream_get_latency_info(pa_stream *p, pa_stream_get_latency_info_cb_t cby, void *userdata); +/** Update the latency info of a stream */ +pa_operation* pa_stream_update_latency_info(pa_stream *p, pa_stream_success_cb_t cb, void *userdata); /** Set the callback function that is called whenever the state of the stream changes */ void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata); @@ -173,34 +170,21 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u /** Rename the stream. \since 0.5 */ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata); -/** Return the total number of bytes written to/read from the - * stream. This counter is not reset on pa_stream_flush(), you may do - * this yourself using pa_stream_reset_counter(). \since 0.6 */ -uint64_t pa_stream_get_counter(pa_stream *s); - /** Return the current playback/recording time. This is based on the * counter accessible with pa_stream_get_counter(). This function * requires a pa_latency_info structure as argument, which should be * acquired using pa_stream_get_latency(). \since 0.6 */ -pa_usec_t pa_stream_get_time(pa_stream *s, const pa_latency_info *i); +int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec); /** Return the total stream latency. Thus function requires a * pa_latency_info structure as argument, which should be aquired * using pa_stream_get_latency(). In case the stream is a monitoring * stream the result can be negative, i.e. the captured samples are * not yet played. In this case *negative is set to 1. \since 0.6 */ -pa_usec_t pa_stream_get_latency(pa_stream *s, const pa_latency_info *i, int *negative); - -/** Return the interpolated playback/recording time. Requires the - * PA_STREAM_INTERPOLATE_LATENCY bit set when creating the stream. In - * contrast to pa_stream_get_latency() this function doesn't require - * a whole roundtrip for response. \since 0.6 */ -pa_usec_t pa_stream_get_interpolated_time(pa_stream *s); - -/** Return the interpolated playback/recording latency. Requires the - * PA_STREAM_INTERPOLATE_LATENCY bit set when creating the - * stream. \since 0.6 */ -pa_usec_t pa_stream_get_interpolated_latency(pa_stream *s, int *negative); +int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative); + +/** Return the latest latency data. \since 0.8 */ +const pa_latency_info* pa_stream_get_latency_info(pa_stream *s); /** Return a pointer to the stream's sample specification. \since 0.6 */ const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s); -- cgit