diff options
Diffstat (limited to 'src/pulsecore/protocol-native.c')
-rw-r--r-- | src/pulsecore/protocol-native.c | 276 |
1 files changed, 194 insertions, 82 deletions
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index fc5aa598..37200db6 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -72,6 +72,7 @@ #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */ #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */ +#define DEFAULT_PROCESS_MSEC 20 /* 20ms */ #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC typedef struct connection connection; @@ -104,7 +105,7 @@ typedef struct playback_stream { pa_bool_t drain_request; uint32_t drain_tag; uint32_t syncid; - pa_bool_t underrun; + uint64_t underrun; /* length of underrun */ pa_atomic_t missing; size_t minreq; @@ -208,6 +209,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk static void sink_input_kill_cb(pa_sink_input *i); static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend); static void sink_input_moved_cb(pa_sink_input *i); +static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes); +static void sink_input_set_max_rewind_cb(pa_sink_input *i, size_t nbytes); + static void send_memblock(connection *c); static void request_bytes(struct playback_stream*s); @@ -471,7 +475,6 @@ static record_stream* record_stream_new( pa_source *source, pa_sample_spec *ss, pa_channel_map *map, - const char *name, uint32_t *maxlength, uint32_t *fragsize, pa_source_output_flags_t flags, @@ -485,7 +488,6 @@ static record_stream* record_stream_new( pa_assert(c); pa_assert(ss); - pa_assert(name); pa_assert(maxlength); pa_assert(p); @@ -523,7 +525,7 @@ static record_stream* record_stream_new( if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH) *maxlength = MAX_MEMBLOCKQ_LENGTH; if (*fragsize <= 0) - *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*1000, &source_output->sample_spec); + *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &source_output->sample_spec); if (adjust_latency) { pa_usec_t fragsize_usec; @@ -618,21 +620,14 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, uint32_t l = 0; for (;;) { - int32_t k; - - if ((k = pa_atomic_load(&s->missing)) <= 0) - break; - - l += k; - - if (l < s->minreq) + if ((l = pa_atomic_load(&s->missing)) <= 0) break; - if (pa_atomic_sub(&s->missing, k) <= k) + if (pa_atomic_cmpxchg(&s->missing, l, 0)) break; } - if (l < s->minreq) + if (l <= 0) break; t = pa_tagstruct_new(NULL, 0); @@ -642,7 +637,7 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, pa_tagstruct_putu32(t, l); pa_pstream_send_tagstruct(s->connection->pstream, t); -/* pa_log("Requesting %u bytes", l); */ +/* pa_log("Requesting %lu bytes", (unsigned long) l); */ break; } @@ -684,7 +679,6 @@ static playback_stream* playback_stream_new( pa_sink *sink, pa_sample_spec *ss, pa_channel_map *map, - const char *name, uint32_t *maxlength, uint32_t *tlength, uint32_t *prebuf, @@ -699,14 +693,15 @@ static playback_stream* playback_stream_new( playback_stream *s, *ssync; pa_sink_input *sink_input; - pa_memblock *silence; + pa_memchunk silence; uint32_t idx; int64_t start_index; pa_sink_input_new_data data; + pa_usec_t tlength_usec, minreq_usec, sink_usec; + size_t frame_size; pa_assert(c); pa_assert(ss); - pa_assert(name); pa_assert(maxlength); pa_assert(tlength); pa_assert(prebuf); @@ -761,10 +756,12 @@ static playback_stream* playback_stream_new( s->connection = c; s->syncid = syncid; s->sink_input = sink_input; - s->underrun = TRUE; + s->underrun = (uint64_t) -1; s->sink_input->parent.process_msg = sink_input_process_msg; s->sink_input->pop = sink_input_pop_cb; + s->sink_input->rewind = sink_input_rewind_cb; + s->sink_input->set_max_rewind = sink_input_set_max_rewind_cb; s->sink_input->kill = sink_input_kill_cb; s->sink_input->moved = sink_input_moved_cb; s->sink_input->suspend = sink_input_suspend_cb; @@ -775,40 +772,69 @@ static playback_stream* playback_stream_new( if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH) *maxlength = MAX_MEMBLOCKQ_LENGTH; if (*tlength <= 0) - *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*1000, &sink_input->sample_spec); + *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &sink_input->sample_spec); if (*minreq <= 0) - *minreq = (*tlength*9)/10; - if (*prebuf <= 0) - *prebuf = *tlength; + *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &sink_input->sample_spec); + + frame_size = pa_frame_size(&sink_input->sample_spec); + if (*minreq <= 0) + *minreq = frame_size; + if (*tlength < *minreq+frame_size) + *tlength = *minreq+frame_size; + + tlength_usec = pa_bytes_to_usec(*tlength, &sink_input->sample_spec); + minreq_usec = pa_bytes_to_usec(*minreq, &sink_input->sample_spec); if (adjust_latency) { - pa_usec_t tlength_usec, minreq_usec; - /* So, the user asked us to adjust the latency according to - * the what the sink can provide. Half the latency will be - * spent on the hw buffer, half of it in the async buffer - * queue we maintain for each client. */ + /* So, the user asked us to adjust the latency of the stream + * buffer according to the what the sink can provide. The + * tlength passed in shall be the overall latency. Roughly + * half the latency will be spent on the hw buffer, the other + * half of it in the async buffer queue we maintain for each + * client. In between we'll have a safety space of size + * minreq.*/ + + sink_usec = (tlength_usec-minreq_usec)/2; + + } else { + + /* Ok, the user didn't ask us to adjust the latency, but we + * still need to make sure that the parameters from the user + * do make sense. */ + + sink_usec = tlength_usec - minreq_usec; + } - tlength_usec = pa_bytes_to_usec(*tlength, &sink_input->sample_spec); - minreq_usec = pa_bytes_to_usec(*minreq, &sink_input->sample_spec); + s->sink_latency = pa_sink_input_set_requested_latency(sink_input, sink_usec); - s->sink_latency = pa_sink_input_set_requested_latency(sink_input, tlength_usec/2); + if (adjust_latency) { + /* Ok, we didn't necessarily get what we were asking for, so + * let's subtract from what we asked for for the remaining + * buffer space */ - if (tlength_usec >= s->sink_latency*2) + if (tlength_usec >= s->sink_latency) tlength_usec -= s->sink_latency; - else - tlength_usec = s->sink_latency; + } - if (minreq_usec >= s->sink_latency*2) - minreq_usec -= s->sink_latency; - else - minreq_usec = s->sink_latency; + if (tlength_usec < s->sink_latency + minreq_usec) + tlength_usec = s->sink_latency + minreq_usec; - *tlength = pa_usec_to_bytes(tlength_usec, &sink_input->sample_spec); - *minreq = pa_usec_to_bytes(minreq_usec, &sink_input->sample_spec); + *tlength = pa_usec_to_bytes(tlength_usec, &sink_input->sample_spec); + *minreq = pa_usec_to_bytes(minreq_usec, &sink_input->sample_spec); + + if (*minreq <= 0) { + *minreq = frame_size; + *tlength += frame_size; } - silence = pa_silence_memblock_new(c->protocol->core->mempool, &sink_input->sample_spec, 0); + if (*tlength <= *minreq) + *tlength = *minreq + frame_size; + + if (*prebuf <= 0) + *prebuf = *tlength; + + pa_sink_input_get_silence(sink_input, &silence); s->memblockq = pa_memblockq_new( start_index, @@ -818,9 +844,9 @@ static playback_stream* playback_stream_new( *prebuf, *minreq, 0, - silence); + &silence); - pa_memblock_unref(silence); + pa_memblock_unref(silence.memblock); *maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq); *tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq); @@ -924,10 +950,12 @@ static void request_bytes(playback_stream *s) { if (m <= 0) return; -/* pa_log("request_bytes(%u)", m); */ +/* pa_log("request_bytes(%lu)", (unsigned long) m); */ previous_missing = pa_atomic_add(&s->missing, m); - if (previous_missing < s->minreq && previous_missing+m >= s->minreq) { + + if (pa_memblockq_prebuf_active(s->memblockq) || + (previous_missing < s->minreq && previous_missing+m >= s->minreq)) { pa_assert(pa_thread_mq_get()); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); } @@ -989,6 +1017,45 @@ static void send_record_stream_killed(record_stream *r) { /*** sink input callbacks ***/ +static void handle_seek(playback_stream *s, int64_t indexw) { + playback_stream_assert_ref(s); + +/* pa_log("handle_seek: %llu -- %i", (unsigned long long) s->underrun, pa_memblockq_is_readable(s->memblockq)); */ + + if (s->underrun != 0) { + +/* pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */ + + if (pa_memblockq_is_readable(s->memblockq)) { + + size_t u = pa_memblockq_get_length(s->memblockq); + + if (u >= s->underrun) + u = s->underrun; + + pa_log("yeah! ready to rock"); + + /* We just ended an underrun, let's ask the sink + * to rewrite */ + s->sink_input->thread_info.ignore_rewind = TRUE; + pa_sink_input_request_rewind(s->sink_input, u, TRUE); + } + + } else { + int64_t indexr; + + indexr = pa_memblockq_get_read_index(s->memblockq); + + if (indexw < indexr) + /* OK, the sink already asked for this data, so + * let's have it usk us again */ + + pa_sink_input_request_rewind(s->sink_input, indexr - indexw, FALSE); + } + + request_bytes(s); +} + /* Called from thread context */ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { pa_sink_input *i = PA_SINK_INPUT(o); @@ -1000,48 +1067,42 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int switch (code) { - case SINK_INPUT_MESSAGE_SEEK: + case SINK_INPUT_MESSAGE_SEEK: { + int64_t windex; + + windex = pa_memblockq_get_write_index(s->memblockq); pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata)); - request_bytes(s); + + handle_seek(s, windex); return 0; + } case SINK_INPUT_MESSAGE_POST_DATA: { + int64_t windex; + pa_assert(chunk); -/* pa_log("sink input post: %u", chunk->length); */ +/* pa_log("sink input post: %lu", (unsigned long) chunk->length); */ - if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { + windex = pa_memblockq_get_write_index(s->memblockq); + if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { pa_log_warn("Failed to push data into queue"); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL); pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE); } - request_bytes(s); - - s->underrun = FALSE; - return 0; - } - - case SINK_INPUT_MESSAGE_DRAIN: { - - pa_memblockq_prebuf_disable(s->memblockq); - - if (!pa_memblockq_is_readable(s->memblockq)) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL); - else { - s->drain_tag = PA_PTR_TO_UINT(userdata); - s->drain_request = TRUE; - } - request_bytes(s); + handle_seek(s, windex); return 0; } + case SINK_INPUT_MESSAGE_DRAIN: case SINK_INPUT_MESSAGE_FLUSH: case SINK_INPUT_MESSAGE_PREBUF_FORCE: case SINK_INPUT_MESSAGE_TRIGGER: { + int64_t windex; pa_sink_input *isync; void (*func)(pa_memblockq *bq); @@ -1054,6 +1115,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int func = pa_memblockq_prebuf_force; break; + case SINK_INPUT_MESSAGE_DRAIN: case SINK_INPUT_MESSAGE_TRIGGER: func = pa_memblockq_prebuf_disable; break; @@ -1062,23 +1124,32 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int pa_assert_not_reached(); } + windex = pa_memblockq_get_write_index(s->memblockq); func(s->memblockq); - s->underrun = FALSE; - request_bytes(s); + handle_seek(s, windex); /* Do the same for all other members in the sync group */ for (isync = i->sync_prev; isync; isync = isync->sync_prev) { playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); + windex = pa_memblockq_get_write_index(ssync->memblockq); func(ssync->memblockq); - ssync->underrun = FALSE; - request_bytes(ssync); + handle_seek(ssync, windex); } for (isync = i->sync_next; isync; isync = isync->sync_next) { playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); + windex = pa_memblockq_get_write_index(ssync->memblockq); func(ssync->memblockq); - ssync->underrun = FALSE; - request_bytes(ssync); + handle_seek(ssync, windex); + } + + if (code == SINK_INPUT_MESSAGE_DRAIN) { + if (!pa_memblockq_is_readable(s->memblockq)) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL); + else { + s->drain_tag = PA_PTR_TO_UINT(userdata); + s->drain_request = TRUE; + } } return 0; @@ -1091,11 +1162,18 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq); return 0; - case PA_SINK_INPUT_MESSAGE_SET_STATE: + case PA_SINK_INPUT_MESSAGE_SET_STATE: { + int64_t windex; + + windex = pa_memblockq_get_write_index(s->memblockq); pa_memblockq_prebuf_force(s->memblockq); - request_bytes(s); + + handle_seek(s, windex); + + /* Fall through to the default handler */ break; + } case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = userdata; @@ -1112,7 +1190,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int } /* Called from thread context */ -static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) { playback_stream *s; pa_sink_input_assert_ref(i); @@ -1122,23 +1200,58 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk if (pa_memblockq_peek(s->memblockq, chunk) < 0) { +/* pa_log("UNDERRUN"); */ + if (s->drain_request && pa_sink_input_safe_to_remove(i)) { s->drain_request = FALSE; pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); - } else if (!s->underrun) { - s->underrun = TRUE; + } else if (s->underrun == 0) pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL); - } + + if (s->underrun != (size_t) -1) + s->underrun += nbytes; + +/* pa_log("added %llu bytes, total is %llu", (unsigned long long) nbytes, (unsigned long long) s->underrun); */ + + request_bytes(s); return -1; } +/* pa_log("NOTUNDERRUN"); */ + + s->underrun = 0; + pa_memblockq_drop(s->memblockq, chunk->length); request_bytes(s); return 0; } +static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) { + playback_stream *s; + + pa_sink_input_assert_ref(i); + s = PLAYBACK_STREAM(i->userdata); + playback_stream_assert_ref(s); + + /* If we are in an underrun, then we don't rewind */ + if (s->underrun != 0) + return; + + pa_memblockq_rewind(s->memblockq, nbytes); +} + +static void sink_input_set_max_rewind_cb(pa_sink_input *i, size_t nbytes) { + playback_stream *s; + + pa_sink_input_assert_ref(i); + s = PLAYBACK_STREAM(i->userdata); + playback_stream_assert_ref(s); + + pa_memblockq_set_maxrewind(s->memblockq, nbytes); +} + /* Called from main context */ static void sink_input_kill_cb(pa_sink_input *i) { playback_stream *s; @@ -1416,7 +1529,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) | (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0); - s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, muted, syncid, &missing, flags, p, adjust_latency); + s = playback_stream_new(c, sink, &ss, &map, &maxlength, &tlength, &prebuf, &minreq, &volume, muted, syncid, &missing, flags, p, adjust_latency); pa_proplist_free(p); CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); @@ -1625,7 +1738,7 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) | (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0); - s = record_stream_new(c, source, &ss, &map, name, &maxlength, &fragment_size, flags, p, adjust_latency); + s = record_stream_new(c, source, &ss, &map, &maxlength, &fragment_size, flags, p, adjust_latency); pa_proplist_free(p); CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); @@ -1721,7 +1834,6 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t pa_log_warn("failed to get GID of group '%s'", c->protocol->auth_group); else if (gid == creds->gid) success = 1; - if (!success) { if ((r = pa_uid_in_group(creds->uid, c->protocol->auth_group)) < 0) pa_log_warn("failed to check group membership."); @@ -1739,7 +1851,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t pa_mempool_is_shared(c->protocol->core->mempool) && creds->uid == getuid()) { - pa_pstream_use_shm(c->pstream, 1); + pa_pstream_enable_shm(c->pstream, TRUE); pa_log_info("Enabled SHM for new connection"); } |