diff options
| author | Lennart Poettering <lennart@poettering.net> | 2006-04-07 00:24:48 +0000 | 
|---|---|---|
| committer | Lennart Poettering <lennart@poettering.net> | 2006-04-07 00:24:48 +0000 | 
| commit | 920f045380d70785d6ca483d901610d70daee361 (patch) | |
| tree | cd36c8581a8d4630c346c229e3ce9b05c3cd83e2 | |
| parent | cc302f2d17b172bec60b25d549a77b1cb3d17d99 (diff) | |
rework latency querying API (this needs more testing)
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@648 fefdeb5f-60dc-0310-8127-8f9354f1896f
| -rw-r--r-- | src/polyp/def.h | 44 | ||||
| -rw-r--r-- | src/polyp/internal.h | 44 | ||||
| -rw-r--r-- | src/polyp/stream.c | 425 | ||||
| -rw-r--r-- | src/polyp/stream.h | 30 | 
4 files changed, 323 insertions, 220 deletions
diff --git a/src/polyp/def.h b/src/polyp/def.h index 98420bc0..93d0996b 100644 --- a/src/polyp/def.h +++ b/src/polyp/def.h @@ -80,7 +80,7 @@ typedef enum pa_stream_direction {  /** Some special flags for stream connections. \since 0.6 */  typedef enum pa_stream_flags {      PA_STREAM_START_CORKED = 1,       /**< Create the stream corked, requiring an explicit pa_stream_cork() call to uncork it. */ -    PA_STREAM_INTERPOLATE_LATENCY = 2 /**< Interpolate the latency for +    PA_STREAM_INTERPOLATE_LATENCY = 2, /**< Interpolate the latency for                                         * this stream. When enabled,                                         * you can use                                         * pa_stream_interpolated_xxx() @@ -95,6 +95,7 @@ typedef enum pa_stream_flags {                                         * information. This is                                         * especially useful on long latency                                         * network connections. */ +    PA_STREAM_NOT_MONOTONOUS = 4,    /**< Don't force the time to run monotonically */  } pa_stream_flags_t;  /** Playback and record buffer metrics */ @@ -124,7 +125,7 @@ enum {      PA_ERR_INVALIDSERVER,          /**< Invalid server */      PA_ERR_MODINITFAILED,          /**< Module initialization failed */      PA_ERR_BADSTATE,               /**< Bad state */ -    PA_ERR_NODATA,                 /**< No data */  +    PA_ERR_NODATA,                 /**< No data */      PA_ERR_VERSION,                /**< Incompatible protocol version \since 0.8 */      PA_ERR_MAX                     /**< Not really an error but the first invalid error code */  }; @@ -171,7 +172,7 @@ typedef enum pa_subscription_event_type {   * pa_stream_write() takes to be played may be estimated by   * sink_usec+buffer_usec+transport_usec. The output buffer to which   * buffer_usec relates may be manipulated freely (with - * pa_stream_write()'s delta argument, pa_stream_flush() and friends), + * pa_stream_write()'s seek argument, pa_stream_flush() and friends),   * the buffers sink_usec/source_usec relates to is a first-in   * first-out buffer which cannot be flushed or manipulated in any   * way. The total input latency a sample that is recorded takes to be @@ -180,12 +181,7 @@ typedef enum pa_subscription_event_type {   * sign issues!) When connected to a monitor source sink_usec contains   * the latency of the owning sink.*/  typedef struct pa_latency_info { -    pa_usec_t buffer_usec;    /**< Time in usecs the current buffer takes to play. For both playback and record streams. */ -    pa_usec_t sink_usec;      /**< Time in usecs a sample takes to be played on the sink. For playback streams and record streams connected to a monitor source. */ -    pa_usec_t source_usec;    /**< Time in usecs a sample takes from being recorded to being delivered to the application. Only for record streams. \since 0.5*/ -    pa_usec_t transport_usec; /**< Estimated time in usecs a sample takes to be transferred to/from the daemon. For both playback and record streams. \since 0.5 */ -    int playing;              /**< Non-zero when the stream is currently playing. Only for playback streams. */ -    uint32_t queue_length;    /**< Queue size in bytes. For both playback and record streams. */ +    struct timeval timestamp; /**< The time when this latency info was current */      int synchronized_clocks;  /**< Non-zero if the local and the                                 * remote machine have synchronized                                 * clocks. If synchronized clocks are @@ -194,10 +190,31 @@ typedef struct pa_latency_info {                                 * detects synchronized clocks is very                                 * limited und unreliable itself. \since                                 * 0.5 */ -    struct timeval timestamp; /**< The time when this latency info was current */ -    uint64_t counter;         /**< The byte counter current when the latency info was requested. \since 0.6 */ -    int64_t write_index;      /**< Current absolute write index in the buffer. \since 0.8 */ -    int64_t read_index;       /**< Current absolute read index in the buffer. \since 0.8 */ + +    pa_usec_t buffer_usec;    /**< Time in usecs the current buffer takes to play. For both playback and record streams. */ +    pa_usec_t sink_usec;      /**< Time in usecs a sample takes to be played on the sink. For playback streams and record streams connected to a monitor source. */ +    pa_usec_t source_usec;    /**< Time in usecs a sample takes from being recorded to being delivered to the application. Only for record streams. \since 0.5*/ +    pa_usec_t transport_usec; /**< Estimated time in usecs a sample takes to be transferred to/from the daemon. For both playback and record streams. \since 0.5 */ + +    int playing;              /**< Non-zero when the stream is currently playing. Only for playback streams. */ + +    int write_index_corrupt;  /**< Non-Zero if the write_index is not up to date because a local write command corrupted it */ +    int64_t write_index;      /**< Current write index into the +                               * playback buffer in bytes. Think twice before +                               * using this for seeking purposes: it +                               * might be out of date a the time you +                               * want to use it. Consider using +                               * PA_SEEK_RELATIVE instead. \since +                               * 0.8 */  +    int64_t read_index;       /**< Current read index into the +                               * playback buffer in bytes. Think twice before +                               * using this for seeking purposes: it +                               * might be out of date a the time you +                               * want to use it. Consider using +                               * PA_SEEK_RELATIVE_ON_READ +                               * instead. \since 0.8 */ + +    uint32_t buffer_length;   /* Current buffer length. This is usually identical to write_index-read_index. */  } pa_latency_info;  /** A structure for the spawn api. This may be used to integrate auto @@ -227,7 +244,6 @@ typedef enum pa_seek_mode {      PA_SEEK_RELATIVE_END = 3,       /**< Seek relatively to the current end of the buffer queue */  } pa_seek_mode_t; -  PA_C_DECL_END  #endif diff --git a/src/polyp/internal.h b/src/polyp/internal.h index 82d8f7ce..8f1603b3 100644 --- a/src/polyp/internal.h +++ b/src/polyp/internal.h @@ -83,6 +83,15 @@ struct pa_context {      pa_client_conf *conf;  }; +#define PA_MAX_LATENCY_CORRECTIONS 10 + +typedef struct pa_latency_correction { +    uint32_t tag; +    int valid; +    int64_t value; +    int absolute, corrupt; +} pa_latency_correction; +  struct pa_stream {      int ref;      pa_context *context; @@ -93,41 +102,48 @@ struct pa_stream {      pa_buffer_attr buffer_attr;      pa_sample_spec sample_spec;      pa_channel_map channel_map; +    pa_stream_flags_t flags;      uint32_t channel;      uint32_t syncid;      int channel_valid;      uint32_t device_index;      pa_stream_direction_t direction; -    uint32_t requested_bytes; -    uint64_t counter; -    pa_usec_t previous_time; -    pa_usec_t previous_ipol_time;      pa_stream_state_t state; +     +    uint32_t requested_bytes; +      pa_memchunk peek_memchunk;      pa_memblockq *record_memblockq; -    pa_hashmap *counter_hashmap; - -    int interpolate;      int corked; -    uint32_t ipol_usec; -    struct timeval ipol_timestamp; +    /* Store latest latency info */ +    pa_latency_info latency_info; +    int latency_info_valid; +     +    /* Use to make sure that time advances monotonically */ +    pa_usec_t previous_time; +     +    /* Latency correction stuff */ +    pa_latency_correction latency_corrections[PA_MAX_LATENCY_CORRECTIONS]; +    int idx_latency_correction; + +    /* Latency interpolation stuff */      pa_time_event *ipol_event;      int ipol_requested; -     +    pa_usec_t ipol_usec; +    int ipol_usec_valid; +    struct timeval ipol_timestamp; + +    /* Callbacks */      pa_stream_notify_cb_t state_callback;      void *state_userdata; -      pa_stream_request_cb_t read_callback;      void *read_userdata; -      pa_stream_request_cb_t write_callback;      void *write_userdata; -      pa_stream_notify_cb_t overflow_callback;      void *overflow_userdata; -      pa_stream_notify_cb_t underflow_callback;      void *underflow_userdata;  }; diff --git a/src/polyp/stream.c b/src/polyp/stream.c index 672c376a..d3599582 100644 --- a/src/polyp/stream.c +++ b/src/polyp/stream.c @@ -38,10 +38,10 @@  #include "internal.h"  #define LATENCY_IPOL_INTERVAL_USEC (10000L) -#define COUNTER_HASHMAP_MAXSIZE (5)  pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {      pa_stream *s; +    int i;      assert(c); @@ -67,6 +67,7 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *      s->direction = PA_STREAM_NODIRECTION;      s->name = pa_xstrdup(name);      s->sample_spec = *ss; +    s->flags = 0;      if (map)          s->channel_map = *map; @@ -87,20 +88,21 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *      s->record_memblockq = NULL; -    s->counter_hashmap = pa_hashmap_new(NULL, NULL); - -    s->counter = 0;      s->previous_time = 0; -    s->previous_ipol_time = 0; +    s->latency_info_valid = 0;      s->corked = 0; -    s->interpolate = 0; -    s->ipol_usec = 0; -    memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp)); +    s->ipol_usec_valid = 0; +    s->ipol_timestamp.tv_sec = 0; +    s->ipol_timestamp.tv_usec = 0;      s->ipol_event = NULL;      s->ipol_requested = 0; +    for (i = 0; i < PA_MAX_LATENCY_CORRECTIONS; i++) +        s->latency_corrections[i].valid = 0; +    s->idx_latency_correction = 0; +          PA_LLIST_PREPEND(pa_stream, c->streams, s);      /* The context and stream will point at each other. We cannot ref count @@ -110,10 +112,6 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *      return s;  } -static void hashmap_free_func(void *p, void *userdata) { -    pa_xfree(p); -} -  static void stream_free(pa_stream *s) {      assert(s && s->context && !s->channel_valid); @@ -132,9 +130,6 @@ static void stream_free(pa_stream *s) {      if (s->record_memblockq)          pa_memblockq_free(s->record_memblockq); -    if (s->counter_hashmap) -        pa_hashmap_free(s->counter_hashmap, hashmap_free_func, NULL); -      pa_xfree(s->name);      pa_xfree(s);  } @@ -315,10 +310,11 @@ static void ipol_callback(pa_mainloop_api *m, pa_time_event *e, PA_GCC_UNUSED co      if (s->state == PA_STREAM_READY && !s->ipol_requested) {          pa_operation *o; -        o = pa_stream_get_latency_info(s, NULL, NULL); -        if (o) +         +        if ((o = pa_stream_update_latency_info(s, NULL, NULL))) {              pa_operation_unref(o); -        s->ipol_requested = 1; +            s->ipol_requested = 1; +        }      }      pa_gettimeofday(&next); @@ -375,7 +371,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED      /* We add an extra ref as long as we're connected (i.e. in the dynarray) */      pa_stream_ref(s); -    if (s->interpolate) { +    if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) {          struct timeval tv;          pa_gettimeofday(&tv); @@ -418,13 +414,11 @@ static int create_stream(      pa_stream_ref(s);      s->direction = direction; +    s->flags = flags;      if (sync_stream)          s->syncid = sync_stream->syncid; -    s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY); -    pa_stream_trash_ipol(s); -          if (attr)          s->buffer_attr = *attr;      else { @@ -548,7 +542,33 @@ int pa_stream_write(      else          s->requested_bytes = 0; -    s->counter += length; +    /* Update latency request correction */ +    if (s->latency_corrections[s->idx_latency_correction].valid) { + +        if (seek == PA_SEEK_ABSOLUTE) { +            s->latency_corrections[s->idx_latency_correction].corrupt = 0; +            s->latency_corrections[s->idx_latency_correction].absolute = 1; +            s->latency_corrections[s->idx_latency_correction].value = offset + length; +        } else if (seek == PA_SEEK_RELATIVE) { +            if (!s->latency_corrections[s->idx_latency_correction].corrupt) +                s->latency_corrections[s->idx_latency_correction].value += offset + length; +        } else +            s->latency_corrections[s->idx_latency_correction].corrupt = 1; +    } + +    /* Update the write index in the already available latency data */ +    if (s->latency_info_valid) { + +        if (seek == PA_SEEK_ABSOLUTE) { +            s->latency_info.write_index_corrupt = 0; +            s->latency_info.write_index = offset + length; +        } else if (seek == PA_SEEK_RELATIVE) { +            if (!s->latency_info.write_index_corrupt) +                s->latency_info.write_index += offset + length; +        } else +            s->latency_info.write_index_corrupt = 1; +    } +          return 0;  } @@ -590,7 +610,6 @@ int pa_stream_drop(pa_stream *s) {      s->peek_memchunk.index = 0;      s->peek_memchunk.memblock = NULL; -    s->counter += s->peek_memchunk.length;      return 0;  } @@ -637,33 +656,34 @@ pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *us  static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {      pa_operation *o = userdata; -    pa_latency_info i, *p = NULL;      struct timeval local, remote, now; +    pa_latency_info *i;      assert(pd);      assert(o);      assert(o->stream);      assert(o->context); -    i.counter = *(uint64_t*)pa_hashmap_get(o->stream->counter_hashmap, (void*)(unsigned long)tag); -    pa_xfree(pa_hashmap_remove(o->stream->counter_hashmap, (void*)(unsigned long)tag)); +    i = &o->stream->latency_info; +    o->stream->latency_info_valid = 0; +    i->write_index_corrupt = 0;      if (command != PA_COMMAND_REPLY) {          if (pa_context_handle_error(o->context, command, t) < 0)              goto finish; -    } else if (pa_tagstruct_get_usec(t, &i.buffer_usec) < 0 || -               pa_tagstruct_get_usec(t, &i.sink_usec) < 0 || -               pa_tagstruct_get_usec(t, &i.source_usec) < 0 || -               pa_tagstruct_get_boolean(t, &i.playing) < 0 || -               pa_tagstruct_getu32(t, &i.queue_length) < 0 || +    } else if (pa_tagstruct_get_usec(t, &i->buffer_usec) < 0 || +               pa_tagstruct_get_usec(t, &i->sink_usec) < 0 || +               pa_tagstruct_get_usec(t, &i->source_usec) < 0 || +               pa_tagstruct_get_boolean(t, &i->playing) < 0 ||                 pa_tagstruct_get_timeval(t, &local) < 0 ||                 pa_tagstruct_get_timeval(t, &remote) < 0 || -               pa_tagstruct_gets64(t, &i.write_index) < 0 || -               pa_tagstruct_gets64(t, &i.read_index) < 0 || +               pa_tagstruct_gets64(t, &i->write_index) < 0 || +               pa_tagstruct_gets64(t, &i->read_index) < 0 ||                 !pa_tagstruct_eof(t)) {          pa_context_fail(o->context, PA_ERR_PROTOCOL);          goto finish; +              } else {          pa_gettimeofday(&now); @@ -671,72 +691,125 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command,              /* local and remote seem to have synchronized clocks */              if (o->stream->direction == PA_STREAM_PLAYBACK) -                i.transport_usec = pa_timeval_diff(&remote, &local); +                i->transport_usec = pa_timeval_diff(&remote, &local);              else -                i.transport_usec = pa_timeval_diff(&now, &remote); +                i->transport_usec = pa_timeval_diff(&now, &remote); -            i.synchronized_clocks = 1; -            i.timestamp = remote; +            i->synchronized_clocks = 1; +            i->timestamp = remote;          } else {              /* clocks are not synchronized, let's estimate latency then */ -            i.transport_usec = pa_timeval_diff(&now, &local)/2; -            i.synchronized_clocks = 0; -            i.timestamp = local; -            pa_timeval_add(&i.timestamp, i.transport_usec); +            i->transport_usec = pa_timeval_diff(&now, &local)/2; +            i->synchronized_clocks = 0; +            i->timestamp = local; +            pa_timeval_add(&i->timestamp, i->transport_usec);          } -         -        if (o->stream->interpolate) { -/*              pa_log("new interpol data");  */ -            o->stream->ipol_timestamp = i.timestamp; -            o->stream->ipol_usec = pa_stream_get_time(o->stream, &i); -            o->stream->ipol_requested = 0; + +        if (o->stream->direction == PA_STREAM_PLAYBACK) { +            /* Write index correction */ + +            int n, j; +            uint32_t ctag = tag; + +            /* Go through the saved correction values and add up the total correction.*/ +             +            for (n = 0, j = o->stream->idx_latency_correction; +                 n < PA_MAX_LATENCY_CORRECTIONS; +                 n++, j = (j + 1) % PA_MAX_LATENCY_CORRECTIONS) { + +                /* Step over invalid data or out-of-date data */ +                if (!o->stream->latency_corrections[j].valid || +                    o->stream->latency_corrections[j].tag < ctag) +                    continue; + +                /* Make sure that everything is in order */ +                ctag = o->stream->latency_corrections[j].tag+1; + +                /* Now fix the write index */ +                if (o->stream->latency_corrections[j].corrupt) { +                    /* A corrupting seek was made */ +                    i->write_index = 0; +                    i->write_index_corrupt = 1; +                } else if (o->stream->latency_corrections[j].absolute) { +                    /* An absolute seek was made */ +                    i->write_index = o->stream->latency_corrections[j].value; +                    i->write_index_corrupt = 0; +                } else if (!i->write_index_corrupt) { +                    /* A relative seek was made */ +                    i->write_index += o->stream->latency_corrections[j].value; +                } +            }          } +         +        o->stream->latency_info_valid = 1; +         +        o->stream->ipol_timestamp = now; +        o->stream->ipol_usec_valid = 0; +    } -        p = &i; +    /* Clear old correction entries */ +    if (o->stream->direction == PA_STREAM_PLAYBACK) { +        int n; +         +        for (n = 0; n < PA_MAX_LATENCY_CORRECTIONS; n++) { +            if (!o->stream->latency_corrections[n].valid) +                continue; +             +            if (o->stream->latency_corrections[n].tag <= tag) +                o->stream->latency_corrections[n].valid = 0; +        }      }      if (o->callback) { -        pa_stream_get_latency_info_cb_t cb = (pa_stream_get_latency_info_cb_t) o->callback; -        cb(o->stream, p, o->userdata); +        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback; +        cb(o->stream, o->stream->latency_info_valid, o->userdata);      } - +      finish: +      pa_operation_done(o);      pa_operation_unref(o);  } -pa_operation* pa_stream_get_latency_info(pa_stream *s, pa_stream_get_latency_info_cb_t cb, void *userdata) { +pa_operation* pa_stream_update_latency_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {      uint32_t tag;      pa_operation *o;      pa_tagstruct *t;      struct timeval now; -    uint64_t *counter; +    int cidx;      assert(s);      assert(s->ref >= 1);      PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);      PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); -    PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_hashmap_size(s->counter_hashmap) < COUNTER_HASHMAP_MAXSIZE, PA_ERR_INTERNAL); + +    /* Find a place to store the write_index correction data for this entry */ +    cidx = (s->idx_latency_correction + 1) % PA_MAX_LATENCY_CORRECTIONS; + +    /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */ +    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->latency_corrections[cidx].valid, PA_ERR_INTERNAL);      o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); - +          t = pa_tagstruct_command(              s->context,              s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY,              &tag);      pa_tagstruct_putu32(t, s->channel); - -    pa_gettimeofday(&now); -    pa_tagstruct_put_timeval(t, &now); +    pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));      pa_pstream_send_tagstruct(s->context->pstream, t);      pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_info_callback, o); -    counter = pa_xmalloc(sizeof(uint64_t)); -    *counter = s->counter; -    pa_hashmap_put(s->counter_hashmap, (void*)(unsigned long)tag, counter); - +    /* Fill in initial correction data */ +    o->stream->idx_latency_correction = cidx; +    o->stream->latency_corrections[cidx].valid = 1; +    o->stream->latency_corrections[cidx].tag = tag; +    o->stream->latency_corrections[cidx].absolute = 0; +    o->stream->latency_corrections[cidx].value = 0; +    o->stream->latency_corrections[cidx].corrupt = 0; +          return pa_operation_ref(o);  } @@ -872,11 +945,11 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi      PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);      PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); -    if (s->interpolate) { -        if (!s->corked && b) -            /* Pausing */ -            s->ipol_usec = pa_stream_get_interpolated_time(s); -        else if (s->corked && !b) +    if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { +        if (!s->corked && b) { +            /* Refresh the interpolated data just befor pausing */ +            pa_stream_get_time(s, NULL); +        } else if (s->corked && !b)              /* Unpausing */              pa_gettimeofday(&s->ipol_timestamp);      } @@ -894,8 +967,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi      pa_pstream_send_tagstruct(s->context->pstream, t);      pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); -    lo = pa_stream_get_latency_info(s, NULL, NULL); -    if (lo) +    if ((lo = pa_stream_update_latency_info(s, NULL, NULL)))          pa_operation_unref(lo);      return pa_operation_ref(o); @@ -928,8 +1000,8 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use      if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata))) {          pa_operation *lo; -        lo = pa_stream_get_latency_info(s, NULL, NULL); -        if (lo) + +        if ((lo = pa_stream_update_latency_info(s, NULL, NULL)))              pa_operation_unref(lo);      } @@ -943,8 +1015,8 @@ pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *us      if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata))) {          pa_operation *lo; -        lo = pa_stream_get_latency_info(s, NULL, NULL); -        if (lo) + +        if ((lo = pa_stream_update_latency_info(s, NULL, NULL)))              pa_operation_unref(lo);      } @@ -958,8 +1030,8 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u      if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata))) {          pa_operation *lo; -        lo = pa_stream_get_latency_info(s, NULL, NULL); -        if (lo) + +        if ((lo = pa_stream_update_latency_info(s, NULL, NULL)))              pa_operation_unref(lo);      } @@ -992,150 +1064,165 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe      return pa_operation_ref(o);  } -uint64_t pa_stream_get_counter(pa_stream *s) { -    assert(s); -    assert(s->ref >= 1); - -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (uint64_t) -1); -     -    return s->counter; -} - -pa_usec_t pa_stream_get_time(pa_stream *s, const pa_latency_info *i) { -    pa_usec_t usec; +int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { +    pa_usec_t usec = 0;      assert(s);      assert(s->ref >= 1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); -     -    usec = pa_bytes_to_usec(i->counter, &s->sample_spec); +    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); +    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); +    PA_CHECK_VALIDITY(s->context, s->latency_info_valid, PA_ERR_NODATA); -    if (i) { +    if (s->flags & PA_STREAM_INTERPOLATE_LATENCY && s->ipol_usec_valid ) +        usec = s->ipol_usec; +    else {          if (s->direction == PA_STREAM_PLAYBACK) { -            pa_usec_t latency = i->transport_usec + i->buffer_usec + i->sink_usec; -            if (usec < latency) +            /* The last byte that was written into the output device +             * had this time value associated */ +            usec = pa_bytes_to_usec(s->latency_info.read_index < 0 ? 0 : (uint64_t) s->latency_info.read_index, &s->sample_spec); +             +            /* Because the latency info took a little time to come +             * to us, we assume that the real output time is actually +             * a little ahead */ +            usec += s->latency_info.transport_usec; +             +            /* However, the output device usually maintains a buffer +               too, hence the real sample currently played is a little +               back  */ +            if (s->latency_info.sink_usec >= usec)                  usec = 0;              else -                usec -= latency; -                 +                usec -= s->latency_info.sink_usec; +                      } else if (s->direction == PA_STREAM_RECORD) { -            usec += i->source_usec + i->buffer_usec + i->transport_usec; - -            if (usec > i->sink_usec) -                usec -= i->sink_usec; -            else +            /* The last byte written into the server side queue had +             * this time value associated */ +            usec = pa_bytes_to_usec(s->latency_info.write_index < 0 ? 0 : (uint64_t) s->latency_info.write_index, &s->sample_spec); +             +            /* Add transport latency */ +            usec += s->latency_info.transport_usec; +             +            /* Add latency of data in device buffer */ +            usec += s->latency_info.source_usec; +             +            /* If this is a monitor source, we need to correct the +             * time by the playback device buffer */ +            if (s->latency_info.sink_usec >= usec)                  usec = 0; +            else +                usec -= s->latency_info.sink_usec; +        } + +        if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { +            s->ipol_usec_valid = 1; +            s->ipol_usec = usec;          }      } -    if (usec < s->previous_time) -        usec = s->previous_time; +    /* Interpolate if requested */ +    if (s->flags & PA_STREAM_INTERPOLATE_LATENCY) { -    s->previous_time = usec; +        /* We just add the time that passed since the latency info was +         * current */ +        if (!s->corked) { +            struct timeval now; +             +            usec += pa_timeval_diff(pa_gettimeofday(&now), &s->ipol_timestamp); +            s->ipol_timestamp = now; +        } +    } -    return usec; +    /* Make sure the time runs monotonically */ +    if (!(s->flags & PA_STREAM_NOT_MONOTONOUS)) { +        if (usec < s->previous_time) +            usec = s->previous_time; +        else +            s->previous_time = usec; +    } + +    if (r_usec) +        *r_usec = usec; +     +    return 0;  } -static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t t, pa_usec_t c, int *negative) { +static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {      assert(s);      assert(s->ref >= 1);      if (negative)          *negative = 0; -    if (c < t) { -        if (s->direction == PA_STREAM_RECORD) { -            if (negative) -                *negative = 1; - -            return t-c; +    if (a >= b)  +        return a-b; +    else { +        if (negative && s->direction == PA_STREAM_RECORD) { +            *negative = 1; +            return b-a;          } else              return 0; -    } else -        return c-t; +    }  } -pa_usec_t pa_stream_get_latency(pa_stream *s, const pa_latency_info *i, int *negative) { +int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {      pa_usec_t t, c; +    int r; +    int64_t cindex;      assert(s);      assert(s->ref >= 1); -    assert(i); +    assert(r_usec); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); +    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); +    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); +    PA_CHECK_VALIDITY(s->context, s->latency_info_valid, PA_ERR_NODATA); +    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->latency_info.write_index_corrupt, PA_ERR_NODATA); -    t = pa_stream_get_time(s, i); -    c = pa_bytes_to_usec(s->counter, &s->sample_spec); - -    return time_counter_diff(s, t, c, negative); -} +    if ((r = pa_stream_get_time(s, &t)) < 0) +        return r; -const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) { -    assert(s); -    assert(s->ref >= 1); +    if (s->direction == PA_STREAM_PLAYBACK) +        cindex = s->latency_info.write_index; +    else +        cindex = s->latency_info.read_index; -    return &s->sample_spec; -} +    if (cindex < 0) +        cindex = 0; +     +    c = pa_bytes_to_usec(cindex, &s->sample_spec); -const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) { -    assert(s); -    assert(s->ref >= 1); +    if (s->direction == PA_STREAM_PLAYBACK) +        *r_usec = time_counter_diff(s, c, t, negative); +    else +        *r_usec = time_counter_diff(s, t, c, negative); -    return &s->channel_map; +    return 0;  } -void pa_stream_trash_ipol(pa_stream *s) { +const pa_latency_info* pa_stream_get_latency_info(pa_stream *s) {      assert(s);      assert(s->ref >= 1); -    if (!s->interpolate) -        return; +    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); +    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); +    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->latency_info_valid, PA_ERR_BADSTATE); -    memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp)); -    s->ipol_usec = 0; +    return &s->latency_info;  } -pa_usec_t pa_stream_get_interpolated_time(pa_stream *s) { -    pa_usec_t usec; - +const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {      assert(s);      assert(s->ref >= 1); -     -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->interpolate, PA_ERR_BADSTATE, (pa_usec_t) -1); -     -    if (s->corked) -        usec = s->ipol_usec; -    else { -        if (s->ipol_timestamp.tv_sec == 0) -            usec = 0; -        else -            usec = s->ipol_usec + pa_timeval_age(&s->ipol_timestamp); -    } -     -    if (usec < s->previous_ipol_time) -        usec = s->previous_ipol_time; - -    s->previous_ipol_time = usec; -    return usec; +    return &s->sample_spec;  } -pa_usec_t pa_stream_get_interpolated_latency(pa_stream *s, int *negative) { -    pa_usec_t t, c; - +const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {      assert(s);      assert(s->ref >= 1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1); -    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->interpolate, PA_ERR_BADSTATE, (pa_usec_t) -1); -     -    t = pa_stream_get_interpolated_time(s); -    c = pa_bytes_to_usec(s->counter, &s->sample_spec); -    return time_counter_diff(s, t, c, negative); +    return &s->channel_map;  } + + diff --git a/src/polyp/stream.h b/src/polyp/stream.h index 75b0a900..99284ba3 100644 --- a/src/polyp/stream.h +++ b/src/polyp/stream.h @@ -51,9 +51,6 @@ typedef void (*pa_stream_request_cb_t)(pa_stream *p, size_t length, void *userda  /** A generic notification callback */  typedef void (*pa_stream_notify_cb_t)(pa_stream *p, void *userdata); -/** Callback prototype for pa_stream_get_latency_info() */ -typedef void (*pa_stream_get_latency_info_cb_t)(pa_stream *p, const pa_latency_info *i, void *userdata); -  /** Create a new, unconnected stream with the specified name and sample type */  pa_stream* pa_stream_new(          pa_context *c                     /**< The context to create this stream in */,              @@ -133,8 +130,8 @@ size_t pa_stream_readable_size(pa_stream *p);  /** Drain a playback stream. Use this for notification when the buffer is empty */  pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata); -/** Get the playback latency of a stream */ -pa_operation* pa_stream_get_latency_info(pa_stream *p, pa_stream_get_latency_info_cb_t cby, void *userdata); +/** Update the latency info of a stream */ +pa_operation* pa_stream_update_latency_info(pa_stream *p, pa_stream_success_cb_t cb, void *userdata);  /** Set the callback function that is called whenever the state of the stream changes */  void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata); @@ -173,34 +170,21 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u  /** Rename the stream. \since 0.5 */  pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata); -/** Return the total number of bytes written to/read from the - * stream. This counter is not reset on pa_stream_flush(), you may do - * this yourself using pa_stream_reset_counter(). \since 0.6 */ -uint64_t pa_stream_get_counter(pa_stream *s); -  /** Return the current playback/recording time. This is based on the   * counter accessible with pa_stream_get_counter(). This function   * requires a pa_latency_info structure as argument, which should be   * acquired using pa_stream_get_latency(). \since 0.6 */ -pa_usec_t pa_stream_get_time(pa_stream *s, const pa_latency_info *i); +int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec);  /** Return the total stream latency. Thus function requires a   * pa_latency_info structure as argument, which should be aquired   * using pa_stream_get_latency(). In case the stream is a monitoring   * stream the result can be negative, i.e. the captured samples are   * not yet played. In this case *negative is set to 1. \since 0.6 */ -pa_usec_t pa_stream_get_latency(pa_stream *s, const pa_latency_info *i, int *negative); - -/** Return the interpolated playback/recording time. Requires the - *  PA_STREAM_INTERPOLATE_LATENCY bit set when creating the stream. In - *  contrast to pa_stream_get_latency() this function doesn't require - *  a whole roundtrip for response. \since 0.6 */ -pa_usec_t pa_stream_get_interpolated_time(pa_stream *s); - -/** Return the interpolated playback/recording latency. Requires the - * PA_STREAM_INTERPOLATE_LATENCY bit set when creating the - * stream. \since 0.6 */ -pa_usec_t pa_stream_get_interpolated_latency(pa_stream *s, int *negative); +int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative); + +/** Return the latest latency data. \since 0.8 */ +const pa_latency_info* pa_stream_get_latency_info(pa_stream *s);  /** Return a pointer to the stream's sample specification. \since 0.6 */  const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s);  | 
