From 373b5efe51238b0ad34cb9a9d8fc61b973afdad8 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 1 Apr 2009 23:05:09 +0200 Subject: properly account for seeks in the requested_bytes counter --- src/modules/module-ladspa-sink.c | 2 +- src/modules/rtp/module-rtp-recv.c | 4 ++-- src/pulse/context.c | 4 ++-- src/pulse/internal.h | 2 +- src/pulse/stream.c | 20 ++++++++++---------- src/pulsecore/memblockq.c | 18 ++++++++++-------- src/pulsecore/memblockq.h | 2 +- src/pulsecore/protocol-native.c | 29 ++++++++++++++++------------- src/pulsecore/sink-input.c | 4 ++-- src/pulsecore/source-output.c | 2 +- src/tests/memblockq-test.c | 18 +++++++++--------- 11 files changed, 55 insertions(+), 50 deletions(-) diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c index e619acd3..44052c9c 100644 --- a/src/modules/module-ladspa-sink.c +++ b/src/modules/module-ladspa-sink.c @@ -235,7 +235,7 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { if (amount > 0) { unsigned c; - pa_memblockq_seek(u->memblockq, - (int64_t) amount, PA_SEEK_RELATIVE); + pa_memblockq_seek(u->memblockq, - (int64_t) amount, PA_SEEK_RELATIVE, TRUE); pa_log_debug("Resetting plugin"); diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c index 33e23af2..209770f9 100644 --- a/src/modules/rtp/module-rtp-recv.c +++ b/src/modules/rtp/module-rtp-recv.c @@ -238,7 +238,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { else delta = j; - pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE); + pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE); pa_rtclock_get(&now); @@ -246,7 +246,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { if (pa_memblockq_push(s->memblockq, &chunk) < 0) { pa_log_warn("Queue overrun"); - pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE); + pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE); } /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */ diff --git a/src/pulse/context.c b/src/pulse/context.c index 28d17191..991a886f 100644 --- a/src/pulse/context.c +++ b/src/pulse/context.c @@ -364,10 +364,10 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o if ((s = pa_dynarray_get(c->record_streams, channel))) { if (chunk->memblock) { - pa_memblockq_seek(s->record_memblockq, offset, seek); + pa_memblockq_seek(s->record_memblockq, offset, seek, TRUE); pa_memblockq_push_align(s->record_memblockq, chunk); } else - pa_memblockq_seek(s->record_memblockq, offset+chunk->length, seek); + pa_memblockq_seek(s->record_memblockq, offset+chunk->length, seek, TRUE); if (s->read_callback) { size_t l; diff --git a/src/pulse/internal.h b/src/pulse/internal.h index cf362d99..43d1877c 100644 --- a/src/pulse/internal.h +++ b/src/pulse/internal.h @@ -140,7 +140,7 @@ struct pa_stream { uint32_t syncid; uint32_t stream_index; - uint32_t requested_bytes; + int64_t requested_bytes; pa_buffer_attr buffer_attr; uint32_t device_index; diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 16342cad..c4a54af6 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -729,7 +729,7 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag s->requested_bytes += bytes; if (s->requested_bytes > 0 && s->write_callback) - s->write_callback(s, s->requested_bytes, s->write_userdata); + s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata); finish: pa_context_unref(c); @@ -826,7 +826,7 @@ static void create_stream_complete(pa_stream *s) { pa_stream_set_state(s, PA_STREAM_READY); if (s->requested_bytes > 0 && s->write_callback) - s->write_callback(s, s->requested_bytes, s->write_userdata); + s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata); if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) { struct timeval tv; @@ -874,6 +874,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { pa_stream *s = userdata; + uint32_t requested_bytes; pa_assert(pd); pa_assert(s); @@ -893,11 +894,13 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, if (pa_tagstruct_getu32(t, &s->channel) < 0 || s->channel == PA_INVALID_INDEX || ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) || - ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) { + ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) { pa_context_fail(s->context, PA_ERR_PROTOCOL); goto finish; } + s->requested_bytes = (int64_t) requested_bytes; + if (s->context->version >= 9) { if (s->direction == PA_STREAM_PLAYBACK) { if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 || @@ -1258,12 +1261,9 @@ int pa_stream_write( if (free_cb && pa_pstream_get_shm(s->context->pstream)) free_cb((void*) data); - if (length < s->requested_bytes) - s->requested_bytes -= (uint32_t) length; - else - s->requested_bytes = 0; - - /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/ + /* This is obviously wrong since we ignore the seeking index . But + * that's OK, the server side applies the same error */ + s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length; if (s->direction == PA_STREAM_PLAYBACK) { @@ -1359,7 +1359,7 @@ size_t pa_stream_writable_size(pa_stream *s) { PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - return s->requested_bytes; + return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0; } size_t pa_stream_readable_size(pa_stream *s) { diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c index e6e7b736..d12d13a8 100644 --- a/src/pulsecore/memblockq.c +++ b/src/pulsecore/memblockq.c @@ -601,7 +601,7 @@ size_t pa_memblockq_missing(pa_memblockq *bq) { return l >= bq->minreq ? l : 0; } -void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) { +void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account) { int64_t old, delta; pa_assert(bq); @@ -628,12 +628,14 @@ void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) { delta = bq->write_index - old; - if (delta >= (int64_t) bq->requested) { - delta -= (int64_t) bq->requested; - bq->requested = 0; - } else if (delta >= 0) { - bq->requested -= (size_t) delta; - delta = 0; + if (account) { + if (delta >= (int64_t) bq->requested) { + delta -= (int64_t) bq->requested; + bq->requested = 0; + } else if (delta >= 0) { + bq->requested -= (size_t) delta; + delta = 0; + } } bq->missing -= delta; @@ -895,7 +897,7 @@ int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) { pa_memblock_unref(chunk.memblock); } else - pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE); + pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE); pa_memblockq_drop(bq, chunk.length); } diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h index e315b831..146d261b 100644 --- a/src/pulsecore/memblockq.h +++ b/src/pulsecore/memblockq.h @@ -85,7 +85,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk); int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk); /* Manipulate the write pointer */ -void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek); +void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account); /* Return a copy of the next memory chunk in the queue. It is not * removed from the queue. There are two reasons this function might diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index e11d7a6c..30d68f35 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -738,24 +738,21 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, switch (code) { case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: { pa_tagstruct *t; - uint32_t l = 0; + int l = 0; for (;;) { - if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0) - break; + if ((l = pa_atomic_load(&s->missing)) <= 0) + return 0; - if (pa_atomic_cmpxchg(&s->missing, (int) l, 0)) + if (pa_atomic_cmpxchg(&s->missing, l, 0)) break; } - if (l <= 0) - break; - t = pa_tagstruct_new(NULL, 0); pa_tagstruct_putu32(t, PA_COMMAND_REQUEST); pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */ pa_tagstruct_putu32(t, s->index); - pa_tagstruct_putu32(t, l); + pa_tagstruct_putu32(t, (uint32_t) l); pa_pstream_send_tagstruct(s->connection->pstream, t); /* pa_log("Requesting %lu bytes", (unsigned long) l); */ @@ -1097,7 +1094,8 @@ static playback_stream* playback_stream_new( /* Called from IO context */ static void playback_stream_request_bytes(playback_stream *s) { - size_t m, previous_missing, minreq; + size_t m, minreq; + int previous_missing; playback_stream_assert_ref(s); @@ -1108,11 +1106,11 @@ static void playback_stream_request_bytes(playback_stream *s) { /* pa_log("request_bytes(%lu)", (unsigned long) m); */ - previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m); + previous_missing = pa_atomic_add(&s->missing, (int) m); minreq = pa_memblockq_get_minreq(s->memblockq); if (pa_memblockq_prebuf_active(s->memblockq) || - (previous_missing < minreq && previous_missing+m >= minreq)) + (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq)) pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); } @@ -1297,7 +1295,12 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int int64_t windex; windex = pa_memblockq_get_write_index(s->memblockq); - pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata)); + + /* The client side is incapable of accounting correctly + * for seeks of a type != PA_SEEK_RELATIVE. We need to be + * able to deal with that. */ + + pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE); handle_seek(s, windex); return 0; @@ -1315,7 +1318,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { pa_log_warn("Failed to push data into queue"); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL); - pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE); + pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE); } handle_seek(s, windex); diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 0ed16dd8..1fdb3fa6 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -631,7 +631,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p * data, so let's just hand out silence */ pa_atomic_store(&i->thread_info.drained, 1); - pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE); + pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE); i->thread_info.playing_for = 0; if (i->thread_info.underrun_for != (uint64_t) -1) i->thread_info.underrun_for += ilength; @@ -776,7 +776,7 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam if (amount > 0) /* Ok, now update the write pointer */ - pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE); + pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE, TRUE); if (i->thread_info.rewrite_flush) pa_memblockq_silence(i->thread_info.render_memblockq); diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 27f24cd1..1c37be93 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -434,7 +434,7 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { if (pa_memblockq_push(o->thread_info.delay_memblockq, chunk) < 0) { pa_log_debug("Delay queue overflow!"); - pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE); + pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE); } limit = o->process_rewind ? 0 : o->source->thread_info.max_rewind; diff --git a/src/tests/memblockq-test.c b/src/tests/memblockq-test.c index 127fb197..ec3f5426 100644 --- a/src/tests/memblockq-test.c +++ b/src/tests/memblockq-test.c @@ -105,45 +105,45 @@ int main(int argc, char *argv[]) { ret = pa_memblockq_push(bq, &chunk4); assert(ret == 0); - pa_memblockq_seek(bq, -6, 0); + pa_memblockq_seek(bq, -6, 0, TRUE); ret = pa_memblockq_push(bq, &chunk3); assert(ret == 0); - pa_memblockq_seek(bq, -2, 0); + pa_memblockq_seek(bq, -2, 0, TRUE); ret = pa_memblockq_push(bq, &chunk1); assert(ret == 0); - pa_memblockq_seek(bq, -10, 0); + pa_memblockq_seek(bq, -10, 0, TRUE); ret = pa_memblockq_push(bq, &chunk4); assert(ret == 0); - pa_memblockq_seek(bq, 10, 0); + pa_memblockq_seek(bq, 10, 0, TRUE); ret = pa_memblockq_push(bq, &chunk1); assert(ret == 0); - pa_memblockq_seek(bq, -6, 0); + pa_memblockq_seek(bq, -6, 0, TRUE); ret = pa_memblockq_push(bq, &chunk2); assert(ret == 0); /* Test splitting */ - pa_memblockq_seek(bq, -12, 0); + pa_memblockq_seek(bq, -12, 0, TRUE); ret = pa_memblockq_push(bq, &chunk1); assert(ret == 0); - pa_memblockq_seek(bq, 20, 0); + pa_memblockq_seek(bq, 20, 0, TRUE); /* Test merging */ ret = pa_memblockq_push(bq, &chunk3); assert(ret == 0); - pa_memblockq_seek(bq, -2, 0); + pa_memblockq_seek(bq, -2, 0, TRUE); chunk3.index += 2; chunk3.length -= 2; ret = pa_memblockq_push(bq, &chunk3); assert(ret == 0); - pa_memblockq_seek(bq, 30, PA_SEEK_RELATIVE); + pa_memblockq_seek(bq, 30, PA_SEEK_RELATIVE, TRUE); dump(bq); -- cgit