diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/pulsecore/sink-input.c | 83 | ||||
-rw-r--r-- | src/pulsecore/sink-input.h | 15 | ||||
-rw-r--r-- | src/pulsecore/source-output.c | 53 | ||||
-rw-r--r-- | src/pulsecore/source-output.h | 8 |
4 files changed, 98 insertions, 61 deletions
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 2c6b356c..d27f00f0 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -166,7 +166,7 @@ pa_sink_input* pa_sink_input_new( i->parent.process_msg = pa_sink_input_process_msg; i->core = core; - pa_atomic_store(&i->state, PA_SINK_INPUT_DRAINED); + i->state = PA_SINK_INPUT_RUNNING; i->flags = flags; i->name = pa_xstrdup(data->name); i->driver = pa_xstrdup(data->driver); @@ -181,7 +181,6 @@ pa_sink_input* pa_sink_input_new( i->volume = data->volume; i->muted = data->muted; - i->process_msg = NULL; i->peek = NULL; i->drop = NULL; i->kill = NULL; @@ -189,6 +188,9 @@ pa_sink_input* pa_sink_input_new( i->underrun = NULL; i->userdata = NULL; + i->thread_info.state = i->state; + pa_atomic_store(&i->thread_info.drained, 1); + i->thread_info.sample_spec = i->sample_spec; i->thread_info.silence_memblock = NULL; /* i->thread_info.move_silence = 0; */ pa_memchunk_reset(&i->thread_info.resampled_chunk); @@ -210,28 +212,41 @@ pa_sink_input* pa_sink_input_new( return i; } +static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) { + pa_assert(i); + + if (state == PA_SINK_INPUT_DRAINED) + state = PA_SINK_INPUT_RUNNING; + + if (i->state == state) + return 0; + + if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0) + return -1; + + i->state = state; + return 0; +} + void pa_sink_input_disconnect(pa_sink_input *i) { pa_assert(i); - pa_return_if_fail(pa_sink_input_get_state(i) != PA_SINK_INPUT_DISCONNECTED); + pa_return_if_fail(i->state != PA_SINK_INPUT_DISCONNECTED); pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL); - pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL); pa_idxset_remove_by_data(i->sink->inputs, i, NULL); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index); + sink_input_set_state(i, PA_SINK_INPUT_DISCONNECTED); pa_sink_update_status(i->sink); - + i->sink = NULL; - i->process_msg = NULL; i->peek = NULL; i->drop = NULL; i->kill = NULL; i->get_latency = NULL; i->underrun = NULL; - - pa_atomic_store(&i->state, PA_SINK_INPUT_DISCONNECTED); } static void sink_input_free(pa_object *o) { @@ -240,7 +255,8 @@ static void sink_input_free(pa_object *o) { pa_assert(i); pa_assert(pa_sink_input_refcnt(i) == 0); - pa_sink_input_disconnect(i); + if (i->state != PA_SINK_INPUT_DISCONNECTED) + pa_sink_input_disconnect(i); pa_log_info("Freeing output %u \"%s\"", i->index, i->name); @@ -295,21 +311,15 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) int ret = -1; int do_volume_adj_here; int volume_is_norm; - pa_sink_input_state_t state; pa_sink_input_assert_ref(i); pa_assert(chunk); pa_assert(volume); - state = pa_sink_input_get_state(i); - - if (state == PA_SINK_INPUT_DISCONNECTED) - return -1; - - if (!i->peek || !i->drop || state == PA_SINK_INPUT_CORKED) + if (!i->peek || !i->drop || i->thread_info.state == PA_SINK_INPUT_DISCONNECTED || i->thread_info.state == PA_SINK_INPUT_CORKED) goto finish; - pa_assert(state == PA_SINK_INPUT_RUNNING || state == PA_SINK_INPUT_DRAINED); + pa_assert(i->thread_info.state == PA_SINK_INPUT_RUNNING || i->thread_info.state == PA_SINK_INPUT_DRAINED); /* if (i->thread_info.move_silence > 0) { */ /* size_t l; */ @@ -359,7 +369,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) /* It might be necessary to adjust the volume here */ if (do_volume_adj_here && !volume_is_norm) { pa_memchunk_make_writable(&tchunk, 0); - pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.volume); + pa_volume_memchunk(&tchunk, &i->thread_info.sample_spec, &i->thread_info.volume); } pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk); @@ -376,13 +386,13 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) finish: - if (ret < 0 && state == PA_SINK_INPUT_RUNNING && i->underrun) + if (ret < 0 && !pa_atomic_load(&i->thread_info.drained) && i->underrun) i->underrun(i); if (ret >= 0) - pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_RUNNING); - else if (ret < 0 && state == PA_SINK_INPUT_RUNNING) - pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED); + pa_atomic_store(&i->thread_info.drained, 0); + else if (ret < 0) + pa_atomic_store(&i->thread_info.drained, 1); if (ret >= 0) { /* Let's see if we had to apply the volume adjustment @@ -487,17 +497,9 @@ int pa_sink_input_get_mute(pa_sink_input *i) { } void pa_sink_input_cork(pa_sink_input *i, int b) { - pa_sink_input_state_t state; - pa_sink_input_assert_ref(i); - state = pa_sink_input_get_state(i); - pa_assert(state != PA_SINK_INPUT_DISCONNECTED); - - if (b && state != PA_SINK_INPUT_CORKED) - pa_atomic_store(&i->state, PA_SINK_INPUT_CORKED); - else if (!b && state == PA_SINK_INPUT_CORKED) - pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED); + sink_input_set_state(i, b ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING); } int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) { @@ -730,7 +732,26 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc return 0; } + + case PA_SINK_INPUT_MESSAGE_SET_STATE: { + if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) && + (i->thread_info.state != PA_SINK_INPUT_DRAINED) && (i->thread_info.state != PA_SINK_INPUT_RUNNING)) + pa_atomic_store(&i->thread_info.drained, 1); + + i->thread_info.state = PA_PTR_TO_UINT(userdata); + + return 0; + } } return -1; } + +pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i) { + pa_sink_input_assert_ref(i); + + if (i->state == PA_SINK_INPUT_RUNNING || i->state == PA_SINK_INPUT_DRAINED) + return pa_atomic_load(&i->thread_info.drained) ? PA_SINK_INPUT_DRAINED : PA_SINK_INPUT_RUNNING; + + return i->state; +} diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index a8c05b85..426e48c0 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -39,8 +39,8 @@ typedef struct pa_sink_input pa_sink_input; #include <pulsecore/core.h> typedef enum pa_sink_input_state { - PA_SINK_INPUT_RUNNING, /*< The stream is alive and kicking */ PA_SINK_INPUT_DRAINED, /*< The stream stopped playing because there was no data to play */ + PA_SINK_INPUT_RUNNING, /*< The stream is alive and kicking */ PA_SINK_INPUT_CORKED, /*< The stream was corked on user request */ PA_SINK_INPUT_DISCONNECTED /*< The stream is dead */ } pa_sink_input_state_t; @@ -55,7 +55,11 @@ struct pa_sink_input { uint32_t index; pa_core *core; - pa_atomic_t state; + + /* Please note that this state should only be read with + * pa_sink_input_get_state(). That function will transparently + * merge the thread_info.drained value in. */ + pa_sink_input_state_t state; pa_sink_input_flags_t flags; char *name, *driver; /* may be NULL */ @@ -70,7 +74,6 @@ struct pa_sink_input { pa_cvolume volume; int muted; - int (*process_msg)(pa_sink_input *i, int code, void *userdata); int (*peek) (pa_sink_input *i, pa_memchunk *chunk); void (*drop) (pa_sink_input *i, const pa_memchunk *chunk, size_t length); void (*kill) (pa_sink_input *i); /* may be NULL */ @@ -80,6 +83,9 @@ struct pa_sink_input { pa_resample_method_t resample_method; struct { + pa_sink_input_state_t state; + pa_atomic_t drained; + pa_sample_spec sample_spec; pa_memchunk resampled_chunk; @@ -106,6 +112,7 @@ enum { PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_SINK_INPUT_MESSAGE_GET_LATENCY, PA_SINK_INPUT_MESSAGE_SET_RATE, + PA_SINK_INPUT_MESSAGE_SET_STATE, PA_SINK_INPUT_MESSAGE_MAX }; @@ -166,7 +173,7 @@ pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i); int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately); -#define pa_sink_input_get_state(i) ((pa_sink_input_state_t) pa_atomic_load(&i->state)) +pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i); /* To be used exclusively by the sink driver thread */ diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 2211f251..1b93c06d 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -135,7 +135,7 @@ pa_source_output* pa_source_output_new( o->parent.process_msg = pa_source_output_process_msg; o->core = core; - pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_RUNNING); + o->state = PA_SOURCE_OUTPUT_RUNNING; o->flags = flags; o->name = pa_xstrdup(data->name); o->driver = pa_xstrdup(data->driver); @@ -147,12 +147,13 @@ pa_source_output* pa_source_output_new( o->sample_spec = data->sample_spec; o->channel_map = data->channel_map; - o->process_msg = NULL; o->push = NULL; o->kill = NULL; o->get_latency = NULL; o->userdata = NULL; + o->thread_info.state = o->state; + o->thread_info.sample_spec = o->sample_spec; o->thread_info.resampler = resampler; pa_assert_se(pa_idxset_put(core->source_outputs, o, &o->index) == 0); @@ -169,11 +170,22 @@ pa_source_output* pa_source_output_new( return o; } +static int source_output_set_state(pa_source_output *o, pa_source_output_state_t state) { + pa_assert(o); + + if (o->state == state) + return 0; + + if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0) + return -1; + + o->state = state; + return 0; +} + void pa_source_output_disconnect(pa_source_output*o) { pa_assert(o); - pa_return_if_fail(pa_source_output_get_state(o) != PA_SOURCE_OUTPUT_DISCONNECTED); - pa_assert(o->source); - pa_assert(o->source->core); + pa_return_if_fail(o->state != PA_SOURCE_OUTPUT_DISCONNECTED); pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL); @@ -182,15 +194,13 @@ void pa_source_output_disconnect(pa_source_output*o) { pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index); + source_output_set_state(o, PA_SOURCE_OUTPUT_DISCONNECTED); pa_source_update_status(o->source); o->source = NULL; - o->process_msg = NULL; o->push = NULL; o->kill = NULL; o->get_latency = NULL; - - pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_DISCONNECTED); } static void source_output_free(pa_object* mo) { @@ -198,7 +208,8 @@ static void source_output_free(pa_object* mo) { pa_assert(pa_source_output_refcnt(o) == 0); - pa_source_output_disconnect(o); + if (o->state != PA_SOURCE_OUTPUT_DISCONNECTED) + pa_source_output_disconnect(o); pa_log_info("Freeing output %u \"%s\"", o->index, o->name); @@ -242,18 +253,15 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) { void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { pa_memchunk rchunk; - pa_source_output_state_t state; pa_source_output_assert_ref(o); pa_assert(chunk); pa_assert(chunk->length); - state = pa_source_output_get_state(o); - - if (!o->push || state == PA_SOURCE_OUTPUT_DISCONNECTED || state == PA_SOURCE_OUTPUT_CORKED) + if (!o->push || o->state == PA_SOURCE_OUTPUT_DISCONNECTED || o->state == PA_SOURCE_OUTPUT_CORKED) return; - pa_assert(state = PA_SOURCE_OUTPUT_RUNNING); + pa_assert(o->state = PA_SOURCE_OUTPUT_RUNNING); if (!o->thread_info.resampler) { o->push(o, chunk); @@ -270,17 +278,9 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { } void pa_source_output_cork(pa_source_output *o, int b) { - pa_source_output_state_t state; - pa_source_output_assert_ref(o); - state = pa_source_output_get_state(o); - pa_assert(state != PA_SOURCE_OUTPUT_DISCONNECTED); - - if (b && state != PA_SOURCE_OUTPUT_CORKED) - pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_CORKED); - else if (!b && state == PA_SOURCE_OUTPUT_CORKED) - pa_atomic_cmpxchg(&o->state, state, PA_SOURCE_OUTPUT_RUNNING); + source_output_set_state(o, b ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING); } int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { @@ -393,6 +393,13 @@ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_ return 0; } + + case PA_SOURCE_OUTPUT_MESSAGE_SET_STATE: { + o->thread_info.state = PA_PTR_TO_UINT(userdata); + + return 0; + } + } return -1; diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h index d3bc0bc4..7b6afe81 100644 --- a/src/pulsecore/source-output.h +++ b/src/pulsecore/source-output.h @@ -51,7 +51,7 @@ struct pa_source_output { uint32_t index; pa_core *core; - pa_atomic_t state; + pa_source_output_state_t state; pa_source_output_flags_t flags; char *name, *driver; /* may be NULL */ @@ -63,7 +63,6 @@ struct pa_source_output { pa_sample_spec sample_spec; pa_channel_map channel_map; - int (*process_msg)(pa_sink_input *i, int code, void *userdata); void (*push)(pa_source_output *o, const pa_memchunk *chunk); void (*kill)(pa_source_output* o); /* may be NULL */ pa_usec_t (*get_latency) (pa_source_output *o); /* may be NULL */ @@ -71,6 +70,8 @@ struct pa_source_output { pa_resample_method_t resample_method; struct { + pa_source_output_state_t state; + pa_sample_spec sample_spec; pa_resampler* resampler; /* may be NULL */ @@ -85,6 +86,7 @@ PA_DECLARE_CLASS(pa_source_output); enum { PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, + PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_SOURCE_OUTPUT_MESSAGE_MAX }; @@ -135,7 +137,7 @@ pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o); int pa_source_output_move_to(pa_source_output *o, pa_source *dest); -#define pa_source_output_get_state(o) ((pa_source_output_state_t) pa_atomic_load(&o->state)) +#define pa_source_output_get_state(o) ((o)->state) /* To be used exclusively by the source driver thread */ |