From add6c0361ada8c2aa632392efcec538a07af5bce Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 20 Jun 2008 22:32:41 +0200 Subject: Rework module-combine to work with glitch-free core; add new max_request field to pa_sink --- src/pulsecore/source-output.c | 89 ++++++++++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 30 deletions(-) (limited to 'src/pulsecore/source-output.c') diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index dbb362fc..3d1abe30 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -74,12 +74,15 @@ void pa_source_output_new_data_done(pa_source_output_new_data *data) { pa_proplist_free(data->proplist); } +/* Called from main context */ static void reset_callbacks(pa_source_output *o) { pa_assert(o); o->push = NULL; o->process_rewind = NULL; o->update_max_rewind = NULL; + o->update_source_requested_latency = NULL; + o->update_source_latency_range = NULL; o->attach = NULL; o->detach = NULL; o->suspend = NULL; @@ -89,6 +92,7 @@ static void reset_callbacks(pa_source_output *o) { o->state_change = NULL; } +/* Called from main context */ pa_source_output* pa_source_output_new( pa_core *core, pa_source_output_new_data *data, @@ -107,7 +111,7 @@ pa_source_output* pa_source_output_new( pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver)); if (!data->source) - data->source = pa_namereg_get(core, NULL, PA_NAMEREG_SOURCE, 1); + data->source = pa_namereg_get(core, NULL, PA_NAMEREG_SOURCE, TRUE); pa_return_null_if_fail(data->source); pa_return_null_if_fail(pa_source_get_state(data->source) != PA_SOURCE_UNLINKED); @@ -232,6 +236,7 @@ pa_source_output* pa_source_output_new( return o; } +/* Called from main context */ static void update_n_corked(pa_source_output *o, pa_source_output_state_t state) { pa_assert(o); @@ -243,14 +248,14 @@ static void update_n_corked(pa_source_output *o, pa_source_output_state_t state) pa_source_update_status(o->source); } +/* Called from main context */ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t state) { pa_assert(o); if (o->state == state) return 0; - if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0) - return -1; + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) == 0); update_n_corked(o, state); o->state = state; @@ -261,6 +266,7 @@ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t return 0; } +/* Called from main context */ void pa_source_output_unlink(pa_source_output*o) { pa_bool_t linked; pa_assert(o); @@ -286,7 +292,7 @@ void pa_source_output_unlink(pa_source_output*o) { if (linked) if (o->source->asyncmsgq) - pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL) == 0); reset_callbacks(o); @@ -299,6 +305,7 @@ void pa_source_output_unlink(pa_source_output*o) { pa_source_output_unref(o); } +/* Called from main context */ static void source_output_free(pa_object* mo) { pa_source_output *o = PA_SOURCE_OUTPUT(mo); @@ -325,45 +332,52 @@ static void source_output_free(pa_object* mo) { pa_xfree(o); } +/* Called from main context */ void pa_source_output_put(pa_source_output *o) { pa_source_output_state_t state; pa_source_output_assert_ref(o); pa_assert(o->state == PA_SOURCE_OUTPUT_INIT); + + /* The following fields must be initialized properly */ pa_assert(o->push); + pa_assert(o->kill); state = o->flags & PA_SOURCE_OUTPUT_START_CORKED ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING; update_n_corked(o, state); o->state = state; - pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL); + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL) == 0); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); pa_hook_fire(&o->source->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT], o); } +/* Called from main context */ void pa_source_output_kill(pa_source_output*o) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(o->state)); - if (o->kill) - o->kill(o); + o->kill(o); } -pa_usec_t pa_source_output_get_latency(pa_source_output *o) { - pa_usec_t r = 0; +/* Called from main context */ +pa_usec_t pa_source_output_get_latency(pa_source_output *o, pa_usec_t *source_latency) { + pa_usec_t r[2] = { 0, 0 }; pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(o->state)); - if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0) - r = 0; + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, r, 0, NULL) == 0); if (o->get_latency) - r += o->get_latency(o); + r[0] += o->get_latency(o); - return r; + if (source_latency) + *source_latency = r[1]; + + return r[0]; } /* Called from thread context */ @@ -464,42 +478,44 @@ void pa_source_output_update_max_rewind(pa_source_output *o, size_t nbytes /* i o->update_max_rewind(o, o->thread_info.resampler ? pa_resampler_result(o->thread_info.resampler, nbytes) : nbytes); } +/* Called from thread context */ static pa_usec_t fixup_latency(pa_source *s, pa_usec_t usec) { pa_source_assert_ref(s); if (usec == (pa_usec_t) -1) return usec; - if (s->max_latency > 0 && usec > s->max_latency) - usec = s->max_latency; + if (s->thread_info.max_latency > 0 && usec > s->thread_info.max_latency) + usec = s->thread_info.max_latency; - if (s->min_latency > 0 && usec < s->min_latency) - usec = s->min_latency; + if (s->thread_info.min_latency > 0 && usec < s->thread_info.min_latency) + usec = s->thread_info.min_latency; return usec; } +/* Called from thread context */ pa_usec_t pa_source_output_set_requested_latency_within_thread(pa_source_output *o, pa_usec_t usec) { pa_source_output_assert_ref(o); usec = fixup_latency(o->source, usec); - o->thread_info.requested_source_latency = usec; pa_source_invalidate_requested_latency(o->source); return usec; } +/* Called from main context */ pa_usec_t pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t usec) { pa_source_output_assert_ref(o); - usec = fixup_latency(o->source, usec); - if (PA_SOURCE_OUTPUT_IS_LINKED(o->state)) - pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, (int64_t) usec, NULL, NULL); + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY, &usec, 0, NULL) == 0); else { /* If this sink input is not realized yet, we have to touch * the thread info data directly */ + + usec = fixup_latency(o->source, usec); o->thread_info.requested_source_latency = usec; o->source->thread_info.requested_latency_valid = FALSE; } @@ -507,13 +523,14 @@ pa_usec_t pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t return usec; } +/* Called from main context */ pa_usec_t pa_source_output_get_requested_latency(pa_source_output *o) { pa_usec_t usec = 0; pa_source_output_assert_ref(o); if (PA_SOURCE_OUTPUT_IS_LINKED(o->state)) - pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL); + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0); else /* If this sink input is not realized yet, we have to touch * the thread info data directly */ @@ -522,6 +539,7 @@ pa_usec_t pa_source_output_get_requested_latency(pa_source_output *o) { return usec; } +/* Called from main context */ void pa_source_output_cork(pa_source_output *o, pa_bool_t b) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(o->state)); @@ -529,6 +547,7 @@ void pa_source_output_cork(pa_source_output *o, pa_bool_t b) { source_output_set_state(o, b ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING); } +/* Called from main context */ int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(o->state)); @@ -545,6 +564,7 @@ int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { return 0; } +/* Called from main context */ void pa_source_output_set_name(pa_source_output *o, const char *name) { const char *old; pa_source_output_assert_ref(o); @@ -568,12 +588,14 @@ void pa_source_output_set_name(pa_source_output *o, const char *name) { } } +/* Called from main context */ pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o) { pa_source_output_assert_ref(o); return o->resample_method; } +/* Called from main context */ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { pa_source *origin; pa_resampler *new_resampler; @@ -631,7 +653,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { pa_hook_fire(&o->source->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE], &hook_data); /* Okey, let's move it */ - pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL) == 0); pa_idxset_remove_by_data(origin->outputs, o, NULL); pa_idxset_put(dest->outputs, o, NULL); @@ -664,7 +686,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { pa_source_update_status(origin); pa_source_update_status(dest); - pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL); + pa_assert_se(pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL) == 0); if (o->moved) o->moved(o); @@ -679,6 +701,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { return 0; } +/* Called from IO thread context */ void pa_source_output_set_state_within_thread(pa_source_output *o, pa_source_output_state_t state) { pa_source_output_assert_ref(o); @@ -691,19 +714,22 @@ void pa_source_output_set_state_within_thread(pa_source_output *o, pa_source_out o->thread_info.state = state; } -/* Called from thread context */ +/* Called from IO thread context, except when it is not */ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk* chunk) { pa_source_output *o = PA_SOURCE_OUTPUT(mo); - pa_source_output_assert_ref(o); - pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(o->thread_info.state)); switch (code) { case PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = userdata; + pa_usec_t source_usec = 0; + + r[0] += pa_bytes_to_usec(pa_memblockq_get_length(o->thread_info.delay_memblockq), &o->source->sample_spec); + + if (o->source->parent.process_msg(PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_GET_LATENCY, &source_usec, 0, NULL) >= 0) + r[1] += source_usec; - *r += pa_bytes_to_usec(pa_memblockq_get_length(o->thread_info.delay_memblockq), &o->source->sample_spec); return 0; } @@ -718,10 +744,13 @@ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int pa_source_output_set_state_within_thread(o, PA_PTR_TO_UINT(userdata)); return 0; - case PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY: + case PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY: { + pa_usec_t *usec = userdata; + + *usec = pa_source_output_set_requested_latency_within_thread(o, *usec); - pa_source_output_set_requested_latency_within_thread(o, (pa_usec_t) offset); return 0; + } case PA_SINK_INPUT_MESSAGE_GET_REQUESTED_LATENCY: { pa_usec_t *r = userdata; -- cgit