summaryrefslogtreecommitdiffstats
path: root/src/pulsecore
diff options
context:
space:
mode:
Diffstat (limited to 'src/pulsecore')
-rw-r--r--src/pulsecore/memblockq.c18
-rw-r--r--src/pulsecore/memblockq.h2
-rw-r--r--src/pulsecore/protocol-native.c168
-rw-r--r--src/pulsecore/sink-input.c4
-rw-r--r--src/pulsecore/sink.c26
-rw-r--r--src/pulsecore/sink.h2
-rw-r--r--src/pulsecore/source-output.c2
-rw-r--r--src/pulsecore/source.c26
-rw-r--r--src/pulsecore/source.h1
-rw-r--r--src/pulsecore/time-smoother.c41
-rw-r--r--src/pulsecore/time-smoother.h14
11 files changed, 228 insertions, 76 deletions
diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c
index e6e7b736..d12d13a8 100644
--- a/src/pulsecore/memblockq.c
+++ b/src/pulsecore/memblockq.c
@@ -601,7 +601,7 @@ size_t pa_memblockq_missing(pa_memblockq *bq) {
return l >= bq->minreq ? l : 0;
}
-void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account) {
int64_t old, delta;
pa_assert(bq);
@@ -628,12 +628,14 @@ void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {
delta = bq->write_index - old;
- if (delta >= (int64_t) bq->requested) {
- delta -= (int64_t) bq->requested;
- bq->requested = 0;
- } else if (delta >= 0) {
- bq->requested -= (size_t) delta;
- delta = 0;
+ if (account) {
+ if (delta >= (int64_t) bq->requested) {
+ delta -= (int64_t) bq->requested;
+ bq->requested = 0;
+ } else if (delta >= 0) {
+ bq->requested -= (size_t) delta;
+ delta = 0;
+ }
}
bq->missing -= delta;
@@ -895,7 +897,7 @@ int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) {
pa_memblock_unref(chunk.memblock);
} else
- pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
pa_memblockq_drop(bq, chunk.length);
}
diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h
index e315b831..146d261b 100644
--- a/src/pulsecore/memblockq.h
+++ b/src/pulsecore/memblockq.h
@@ -85,7 +85,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk);
int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk);
/* Manipulate the write pointer */
-void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek);
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account);
/* Return a copy of the next memory chunk in the queue. It is not
* removed from the queue. There are two reasons this function might
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index e11d7a6c..59e5d80e 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -86,7 +86,15 @@ typedef struct record_stream {
pa_bool_t early_requests:1;
pa_buffer_attr buffer_attr;
- pa_usec_t source_latency;
+
+ pa_atomic_t on_the_fly;
+ pa_usec_t configured_source_latency;
+ size_t drop_initial;
+
+ /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
+ size_t on_the_fly_snapshot;
+ pa_usec_t current_monitor_latency;
+ pa_usec_t current_source_latency;
} record_stream;
PA_DECLARE_CLASS(record_stream);
@@ -119,12 +127,14 @@ typedef struct playback_stream {
uint32_t syncid;
pa_atomic_t missing;
- pa_usec_t sink_latency;
+ pa_usec_t configured_sink_latency;
pa_buffer_attr buffer_attr;
/* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
int64_t read_index, write_index;
size_t render_memblockq_length;
+ pa_usec_t current_sink_latency;
+ uint64_t playing_for, underrun_for;
} playback_stream;
PA_DECLARE_CLASS(playback_stream);
@@ -182,6 +192,10 @@ struct pa_native_protocol {
};
enum {
+ SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
+};
+
+enum {
SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
SINK_INPUT_MESSAGE_FLUSH,
@@ -230,6 +244,7 @@ static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
+static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -474,6 +489,10 @@ static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, i
case RECORD_STREAM_MESSAGE_POST_DATA:
+ /* We try to keep up to date with how many bytes are
+ * currently on the fly */
+ pa_atomic_sub(&s->on_the_fly, chunk->length);
+
if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
/* pa_log_warn("Failed to push data into output queue."); */
return -1;
@@ -537,29 +556,29 @@ static void fix_record_buffer_attr_pre(record_stream *s) {
/* Ok, the user didn't ask us to adjust the latency, hence we
* don't */
- source_usec = 0;
+ source_usec = (pa_usec_t) -1;
}
- if (source_usec > 0)
- s->source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
+ if (source_usec != (pa_usec_t) -1)
+ s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
else
- s->source_latency = 0;
+ s->configured_source_latency = 0;
if (s->early_requests) {
/* Ok, we didn't necessarily get what we were asking for, so
* let's tell the user */
- fragsize_usec = s->source_latency;
+ fragsize_usec = s->configured_source_latency;
} else if (s->adjust_latency) {
/* Now subtract what we actually got */
- if (fragsize_usec >= s->source_latency*2)
- fragsize_usec -= s->source_latency;
+ if (fragsize_usec >= s->configured_source_latency*2)
+ fragsize_usec -= s->configured_source_latency;
else
- fragsize_usec = s->source_latency;
+ fragsize_usec = s->configured_source_latency;
}
if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
@@ -645,7 +664,9 @@ static record_stream* record_stream_new(
s->buffer_attr = *attr;
s->adjust_latency = adjust_latency;
s->early_requests = early_requests;
+ pa_atomic_store(&s->on_the_fly, 0);
+ s->source_output->parent.process_msg = source_output_process_msg;
s->source_output->push = source_output_push_cb;
s->source_output->kill = source_output_kill_cb;
s->source_output->get_latency = source_output_get_latency_cb;
@@ -675,9 +696,9 @@ static record_stream* record_stream_new(
pa_idxset_put(c->record_streams, s, &s->index);
pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
- ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->source_latency) / PA_USEC_PER_MSEC,
+ ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
(double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
- (double) s->source_latency / PA_USEC_PER_MSEC);
+ (double) s->configured_source_latency / PA_USEC_PER_MSEC);
pa_source_output_put(s->source_output);
return s;
@@ -738,24 +759,21 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,
switch (code) {
case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
pa_tagstruct *t;
- uint32_t l = 0;
+ int l = 0;
for (;;) {
- if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0)
- break;
+ if ((l = pa_atomic_load(&s->missing)) <= 0)
+ return 0;
- if (pa_atomic_cmpxchg(&s->missing, (int) l, 0))
+ if (pa_atomic_cmpxchg(&s->missing, l, 0))
break;
}
- if (l <= 0)
- break;
-
t = pa_tagstruct_new(NULL, 0);
pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
pa_tagstruct_putu32(t, s->index);
- pa_tagstruct_putu32(t, l);
+ pa_tagstruct_putu32(t, (uint32_t) l);
pa_pstream_send_tagstruct(s->connection->pstream, t);
/* pa_log("Requesting %lu bytes", (unsigned long) l); */
@@ -820,7 +838,7 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,
pa_tagstruct_putu32(t, s->buffer_attr.tlength);
pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
pa_tagstruct_putu32(t, s->buffer_attr.minreq);
- pa_tagstruct_put_usec(t, s->sink_latency);
+ pa_tagstruct_put_usec(t, s->configured_sink_latency);
pa_pstream_send_tagstruct(s->connection->pstream, t);
break;
@@ -918,14 +936,14 @@ static void fix_playback_buffer_attr(playback_stream *s) {
pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
}
- s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
+ s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
if (s->early_requests) {
/* Ok, we didn't necessarily get what we were asking for, so
* let's tell the user */
- minreq_usec = s->sink_latency;
+ minreq_usec = s->configured_sink_latency;
} else if (s->adjust_latency) {
@@ -933,14 +951,14 @@ static void fix_playback_buffer_attr(playback_stream *s) {
* let's subtract from what we asked for for the remaining
* buffer space */
- if (tlength_usec >= s->sink_latency)
- tlength_usec -= s->sink_latency;
+ if (tlength_usec >= s->configured_sink_latency)
+ tlength_usec -= s->configured_sink_latency;
}
/* FIXME: This is actually larger than necessary, since not all of
* the sink latency is actually rewritable. */
- if (tlength_usec < s->sink_latency + 2*minreq_usec)
- tlength_usec = s->sink_latency + 2*minreq_usec;
+ if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
+ tlength_usec = s->configured_sink_latency + 2*minreq_usec;
if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
@@ -1086,10 +1104,10 @@ static playback_stream* playback_stream_new(
pa_idxset_put(c->output_streams, s, &s->index);
pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
- ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->sink_latency) / PA_USEC_PER_MSEC,
+ ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
(double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
(double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
- (double) s->sink_latency / PA_USEC_PER_MSEC);
+ (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
pa_sink_input_put(s->sink_input);
return s;
@@ -1097,7 +1115,8 @@ static playback_stream* playback_stream_new(
/* Called from IO context */
static void playback_stream_request_bytes(playback_stream *s) {
- size_t m, previous_missing, minreq;
+ size_t m, minreq;
+ int previous_missing;
playback_stream_assert_ref(s);
@@ -1108,11 +1127,11 @@ static void playback_stream_request_bytes(playback_stream *s) {
/* pa_log("request_bytes(%lu)", (unsigned long) m); */
- previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m);
+ previous_missing = pa_atomic_add(&s->missing, (int) m);
minreq = pa_memblockq_get_minreq(s->memblockq);
if (pa_memblockq_prebuf_active(s->memblockq) ||
- (previous_missing < minreq && previous_missing+m >= minreq))
+ (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}
@@ -1297,7 +1316,12 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
int64_t windex;
windex = pa_memblockq_get_write_index(s->memblockq);
- pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
+
+ /* The client side is incapable of accounting correctly
+ * for seeks of a type != PA_SEEK_RELATIVE. We need to be
+ * able to deal with that. */
+
+ pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
handle_seek(s, windex);
return 0;
@@ -1315,7 +1339,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
pa_log_warn("Failed to push data into queue");
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
- pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
}
handle_seek(s, windex);
@@ -1384,10 +1408,14 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
}
case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
-
+ /* Atomically get a snapshot of all timing parameters... */
s->read_index = pa_memblockq_get_read_index(s->memblockq);
s->write_index = pa_memblockq_get_write_index(s->memblockq);
s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
+ s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
+ s->underrun_for = s->sink_input->thread_info.underrun_for;
+ s->playing_for = s->sink_input->thread_info.playing_for;
+
return 0;
case PA_SINK_INPUT_MESSAGE_SET_STATE: {
@@ -1600,7 +1628,7 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
pa_tagstruct_putu32(t, s->buffer_attr.tlength);
pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
pa_tagstruct_putu32(t, s->buffer_attr.minreq);
- pa_tagstruct_put_usec(t, s->sink_latency);
+ pa_tagstruct_put_usec(t, s->configured_sink_latency);
}
pa_pstream_send_tagstruct(s->connection->pstream, t);
@@ -1609,6 +1637,27 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
/*** source_output callbacks ***/
/* Called from thread context */
+static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
+ pa_source_output *o = PA_SOURCE_OUTPUT(_o);
+ record_stream *s;
+
+ pa_source_output_assert_ref(o);
+ s = RECORD_STREAM(o->userdata);
+ record_stream_assert_ref(s);
+
+ switch (code) {
+ case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
+ /* Atomically get a snapshot of all timing parameters... */
+ s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
+ s->current_source_latency = pa_source_get_latency_within_thread(o->source);
+ s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
+ return 0;
+ }
+
+ return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
+}
+
+/* Called from thread context */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
record_stream *s;
@@ -1617,6 +1666,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
record_stream_assert_ref(s);
pa_assert(chunk);
+ pa_atomic_add(&s->on_the_fly, chunk->length);
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
}
@@ -1712,7 +1762,7 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
if (s->connection->version >= 13) {
pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
- pa_tagstruct_put_usec(t, s->source_latency);
+ pa_tagstruct_put_usec(t, s->configured_source_latency);
}
pa_pstream_send_tagstruct(s->connection->pstream, t);
@@ -1935,7 +1985,7 @@ static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, u
}
if (c->version >= 13)
- pa_tagstruct_put_usec(reply, s->sink_latency);
+ pa_tagstruct_put_usec(reply, s->configured_sink_latency);
pa_pstream_send_tagstruct(c->pstream, reply);
}
@@ -2182,7 +2232,7 @@ static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uin
}
if (c->version >= 13)
- pa_tagstruct_put_usec(reply, s->source_latency);
+ pa_tagstruct_put_usec(reply, s->configured_source_latency);
pa_pstream_send_tagstruct(c->pstream, reply);
}
@@ -2469,7 +2519,6 @@ static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uin
playback_stream *s;
struct timeval tv, now;
uint32_t idx;
- pa_usec_t latency;
pa_native_connection_assert_ref(c);
pa_assert(t);
@@ -2485,25 +2534,27 @@ static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uin
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY)
-
- reply = reply_new(tag);
-
- latency = pa_sink_get_latency(s->sink_input->sink);
- latency += pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec);
- pa_tagstruct_put_usec(reply, latency);
+ /* Get an atomic snapshot of all timing parameters */
+ pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
+ reply = reply_new(tag);
+ pa_tagstruct_put_usec(reply,
+ s->current_sink_latency +
+ pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec));
pa_tagstruct_put_usec(reply, 0);
- pa_tagstruct_put_boolean(reply, s->sink_input->thread_info.playing_for > 0);
+ pa_tagstruct_put_boolean(reply,
+ s->playing_for > 0 &&
+ pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
+ pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
pa_tagstruct_put_timeval(reply, &tv);
pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
pa_tagstruct_puts64(reply, s->write_index);
pa_tagstruct_puts64(reply, s->read_index);
if (c->version >= 13) {
- pa_tagstruct_putu64(reply, s->sink_input->thread_info.underrun_for);
- pa_tagstruct_putu64(reply, s->sink_input->thread_info.playing_for);
+ pa_tagstruct_putu64(reply, s->underrun_for);
+ pa_tagstruct_putu64(reply, s->playing_for);
}
pa_pstream_send_tagstruct(c->pstream, reply);
@@ -2530,10 +2581,17 @@ static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint3
s = pa_idxset_get_by_index(c->record_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
+ /* Get an atomic snapshot of all timing parameters */
+ pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
+
reply = reply_new(tag);
- pa_tagstruct_put_usec(reply, s->source_output->source->monitor_of ? pa_sink_get_latency(s->source_output->source->monitor_of) : 0);
- pa_tagstruct_put_usec(reply, pa_source_get_latency(s->source_output->source));
- pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING);
+ pa_tagstruct_put_usec(reply, s->current_monitor_latency);
+ pa_tagstruct_put_usec(reply,
+ s->current_source_latency +
+ pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
+ pa_tagstruct_put_boolean(reply,
+ pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
+ pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
pa_tagstruct_put_timeval(reply, &tv);
pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
@@ -3511,7 +3569,7 @@ static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, u
pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
if (c->version >= 13)
- pa_tagstruct_put_usec(reply, s->sink_latency);
+ pa_tagstruct_put_usec(reply, s->configured_sink_latency);
} else {
record_stream *s;
@@ -3547,7 +3605,7 @@ static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, u
pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
if (c->version >= 13)
- pa_tagstruct_put_usec(reply, s->source_latency);
+ pa_tagstruct_put_usec(reply, s->configured_source_latency);
}
pa_pstream_send_tagstruct(c->pstream, reply);
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 0ed16dd8..1fdb3fa6 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -631,7 +631,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
* data, so let's just hand out silence */
pa_atomic_store(&i->thread_info.drained, 1);
- pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE);
i->thread_info.playing_for = 0;
if (i->thread_info.underrun_for != (uint64_t) -1)
i->thread_info.underrun_for += ilength;
@@ -776,7 +776,7 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
if (amount > 0)
/* Ok, now update the write pointer */
- pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE);
+ pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE, TRUE);
if (i->thread_info.rewrite_flush)
pa_memblockq_silence(i->thread_info.render_memblockq);
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 73ad247d..a0f0ea7e 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -958,6 +958,32 @@ pa_usec_t pa_sink_get_latency(pa_sink *s) {
return usec;
}
+/* Called from IO thread */
+pa_usec_t pa_sink_get_latency_within_thread(pa_sink *s) {
+ pa_usec_t usec = 0;
+ pa_msgobject *o;
+
+ pa_sink_assert_ref(s);
+ pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
+
+ /* The returned value is supposed to be in the time domain of the sound card! */
+
+ if (s->thread_info.state == PA_SINK_SUSPENDED)
+ return 0;
+
+ if (!(s->flags & PA_SINK_LATENCY))
+ return 0;
+
+ o = PA_MSGOBJECT(s);
+
+ /* We probably should make this a proper vtable callback instead of going through process_msg() */
+
+ if (o->process_msg(o, PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
+ return -1;
+
+ return usec;
+}
+
/* Called from main thread */
void pa_sink_update_flat_volume(pa_sink *s, pa_cvolume *new_volume) {
pa_sink_input *i;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index 7d1e11ef..634bf3ef 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -295,4 +295,6 @@ void pa_sink_request_rewind(pa_sink*s, size_t nbytes);
void pa_sink_invalidate_requested_latency(pa_sink *s);
+pa_usec_t pa_sink_get_latency_within_thread(pa_sink *s);
+
#endif
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 27f24cd1..1c37be93 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -434,7 +434,7 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
if (pa_memblockq_push(o->thread_info.delay_memblockq, chunk) < 0) {
pa_log_debug("Delay queue overflow!");
- pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
}
limit = o->process_rewind ? 0 : o->source->thread_info.max_rewind;
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 1c3377be..252e23ab 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -617,6 +617,32 @@ pa_usec_t pa_source_get_latency(pa_source *s) {
return usec;
}
+/* Called from IO thread */
+pa_usec_t pa_source_get_latency_within_thread(pa_source *s) {
+ pa_usec_t usec = 0;
+ pa_msgobject *o;
+
+ pa_source_assert_ref(s);
+ pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
+
+ /* The returned value is supposed to be in the time domain of the sound card! */
+
+ if (s->thread_info.state == PA_SOURCE_SUSPENDED)
+ return 0;
+
+ if (!(s->flags & PA_SOURCE_LATENCY))
+ return 0;
+
+ o = PA_MSGOBJECT(s);
+
+ /* We probably should make this a proper vtable callback instead of going through process_msg() */
+
+ if (o->process_msg(o, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
+ return -1;
+
+ return usec;
+}
+
/* Called from main thread */
void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
pa_cvolume old_virtual_volume;
diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h
index 210f5340..652783ef 100644
--- a/src/pulsecore/source.h
+++ b/src/pulsecore/source.h
@@ -267,5 +267,6 @@ void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_laten
/*** To be called exclusively by source output drivers, from IO context */
void pa_source_invalidate_requested_latency(pa_source *s);
+pa_usec_t pa_source_get_latency_within_thread(pa_source *s);
#endif
diff --git a/src/pulsecore/time-smoother.c b/src/pulsecore/time-smoother.c
index 65621948..55ac8687 100644
--- a/src/pulsecore/time-smoother.c
+++ b/src/pulsecore/time-smoother.c
@@ -78,17 +78,26 @@ struct pa_smoother {
/* Cached parameters for our interpolation polynomial y=ax^3+b^2+cx */
double a, b, c;
- pa_bool_t abc_valid;
+ pa_bool_t abc_valid:1;
pa_bool_t monotonic:1;
pa_bool_t paused:1;
+ pa_bool_t smoothing:1; /* If FALSE we skip the polonyomial interpolation step */
pa_usec_t pause_time;
unsigned min_history;
};
-pa_smoother* pa_smoother_new(pa_usec_t adjust_time, pa_usec_t history_time, pa_bool_t monotonic, unsigned min_history) {
+pa_smoother* pa_smoother_new(
+ pa_usec_t adjust_time,
+ pa_usec_t history_time,
+ pa_bool_t monotonic,
+ pa_bool_t smoothing,
+ unsigned min_history,
+ pa_usec_t time_offset,
+ pa_bool_t paused) {
+
pa_smoother *s;
pa_assert(adjust_time > 0);
@@ -116,9 +125,13 @@ pa_smoother* pa_smoother_new(pa_usec_t adjust_time, pa_usec_t history_time, pa_b
s->abc_valid = FALSE;
s->paused = FALSE;
+ s->smoothing = smoothing;
s->min_history = min_history;
+ s->paused = paused;
+ s->time_offset = s->pause_time = time_offset;
+
return s;
}
@@ -278,7 +291,7 @@ static void estimate(pa_smoother *s, pa_usec_t x, pa_usec_t *y, double *deriv) {
pa_assert(s);
pa_assert(y);
- if (x >= s->px) {
+ if (!s->smoothing || x >= s->px) {
int64_t t;
/* The requested point is right of the point where we wanted
@@ -348,7 +361,6 @@ void pa_smoother_put(pa_smoother *s, pa_usec_t x, pa_usec_t y) {
* we can adjust our position smoothly from this one */
estimate(s, x, &ney, &nde);
s->ex = x; s->ey = ney; s->de = nde;
-
s->ry = y;
}
@@ -359,8 +371,13 @@ void pa_smoother_put(pa_smoother *s, pa_usec_t x, pa_usec_t y) {
s->dp = avg_gradient(s, x);
/* And calculate when we want to be on track again */
- s->px = s->ex + s->adjust_time;
- s->py = s->ry + (pa_usec_t) llrint(s->dp * (double) s->adjust_time);
+ if (s->smoothing) {
+ s->px = s->ex + s->adjust_time;
+ s->py = s->ry + (pa_usec_t) llrint(s->dp * (double) s->adjust_time);
+ } else {
+ s->px = s->ex;
+ s->py = s->ry;
+ }
s->abc_valid = FALSE;
@@ -420,7 +437,7 @@ void pa_smoother_pause(pa_smoother *s, pa_usec_t x) {
s->pause_time = x;
}
-void pa_smoother_resume(pa_smoother *s, pa_usec_t x) {
+void pa_smoother_resume(pa_smoother *s, pa_usec_t x, pa_bool_t fix_now) {
pa_assert(s);
if (!s->paused)
@@ -433,6 +450,16 @@ void pa_smoother_resume(pa_smoother *s, pa_usec_t x) {
s->paused = FALSE;
s->time_offset += x - s->pause_time;
+
+ if (fix_now)
+ pa_smoother_fix_now(s);
+}
+
+void pa_smoother_fix_now(pa_smoother *s) {
+ pa_assert(s);
+
+ s->px = s->ex;
+ s->py = s->ry;
}
pa_usec_t pa_smoother_translate(pa_smoother *s, pa_usec_t x, pa_usec_t y_delay) {
diff --git a/src/pulsecore/time-smoother.h b/src/pulsecore/time-smoother.h
index 2051e640..5244a7e7 100644
--- a/src/pulsecore/time-smoother.h
+++ b/src/pulsecore/time-smoother.h
@@ -27,7 +27,15 @@
typedef struct pa_smoother pa_smoother;
-pa_smoother* pa_smoother_new(pa_usec_t x_adjust_time, pa_usec_t x_history_time, pa_bool_t monotonic, unsigned min_history);
+pa_smoother* pa_smoother_new(
+ pa_usec_t x_adjust_time,
+ pa_usec_t x_history_time,
+ pa_bool_t monotonic,
+ pa_bool_t smoothing,
+ unsigned min_history,
+ pa_usec_t x_offset,
+ pa_bool_t paused);
+
void pa_smoother_free(pa_smoother* s);
/* Adds a new value to our dataset. x = local/system time, y = remote time */
@@ -42,8 +50,10 @@ pa_usec_t pa_smoother_translate(pa_smoother *s, pa_usec_t x, pa_usec_t y_delay);
void pa_smoother_set_time_offset(pa_smoother *s, pa_usec_t x_offset);
void pa_smoother_pause(pa_smoother *s, pa_usec_t x);
-void pa_smoother_resume(pa_smoother *s, pa_usec_t x);
+void pa_smoother_resume(pa_smoother *s, pa_usec_t x, pa_bool_t abrupt);
void pa_smoother_reset(pa_smoother *s);
+void pa_smoother_fix_now(pa_smoother *s);
+
#endif