From be4a8828360b3607414c3ebfd836494e6490267d Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 13 Jun 2007 22:08:14 +0000 Subject: A lot of more work to get the lock-free stuff in place git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1474 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/source-output.c | 58 +++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 27 deletions(-) (limited to 'src/pulsecore/source-output.c') diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 517c033d..2211f251 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -38,6 +38,10 @@ #include "source-output.h" +static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa_msgobject_check_type); + +static void source_output_free(pa_object* mo); + pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data) { pa_assert(data); @@ -126,13 +130,12 @@ pa_source_output* pa_source_output_new( data->resample_method = pa_resampler_get_method(resampler); } - o = pa_source_output_new(pa_source_output); - + o = pa_msgobject_new(pa_source_output, source_output_check_type); o->parent.parent.free = source_output_free; o->parent.process_msg = pa_source_output_process_msg; o->core = core; - pa_atomic_load(&o->state, PA_SOURCE_OUTPUT_RUNNING); + pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_RUNNING); o->flags = flags; o->name = pa_xstrdup(data->name); o->driver = pa_xstrdup(data->driver); @@ -168,27 +171,29 @@ pa_source_output* pa_source_output_new( void pa_source_output_disconnect(pa_source_output*o) { pa_assert(o); - pa_return_if_fail(pa_source_output_get_state(i) != PA_SOURCE_OUTPUT_DISCONNECTED); + pa_return_if_fail(pa_source_output_get_state(o) != PA_SOURCE_OUTPUT_DISCONNECTED); pa_assert(o->source); pa_assert(o->source->core); - pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL); + pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL); pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL); pa_idxset_remove_by_data(o->source->outputs, o, NULL); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index); - o->source = NULL; + pa_source_update_status(o->source); + + o->source = NULL; o->process_msg = NULL; o->push = NULL; o->kill = NULL; o->get_latency = NULL; - pa_atomic_load(&i->state, PA_SOURCE_OUTPUT_DISCONNECTED); + pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_DISCONNECTED); } -static void source_output_free(pa_msgobject* mo) { +static void source_output_free(pa_object* mo) { pa_source_output *o = PA_SOURCE_OUTPUT(mo); pa_assert(pa_source_output_refcnt(o) == 0); @@ -208,10 +213,10 @@ static void source_output_free(pa_msgobject* mo) { void pa_source_output_put(pa_source_output *o) { pa_source_output_assert_ref(o); - pa_asyncmsgq_post(o->source->asyncmsgq, o->source, PA_SOURCE_MESSAGE_ADD_OUTPUT, o, NULL, pa_source_unref, pa_source_output_unref); + pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_source_output_unref); pa_source_update_status(o->source); - pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); + pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); } void pa_source_output_kill(pa_source_output*o) { @@ -226,7 +231,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) { pa_source_output_assert_ref(o); - if (pa_asyncmsgq_send(o->source->asyncmsgq, i->source, PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0) + if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0) r = 0; if (o->get_latency) @@ -250,12 +255,12 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { pa_assert(state = PA_SOURCE_OUTPUT_RUNNING); - if (!o->resampler) { + if (!o->thread_info.resampler) { o->push(o, chunk); return; } - pa_resampler_run(o->resampler, chunk, &rchunk); + pa_resampler_run(o->thread_info.resampler, chunk, &rchunk); if (!rchunk.length) return; @@ -265,7 +270,6 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { } void pa_source_output_cork(pa_source_output *o, int b) { - int n; pa_source_output_state_t state; pa_source_output_assert_ref(o); @@ -274,23 +278,23 @@ void pa_source_output_cork(pa_source_output *o, int b) { pa_assert(state != PA_SOURCE_OUTPUT_DISCONNECTED); if (b && state != PA_SOURCE_OUTPUT_CORKED) - pa_atomic_store(o->state, PA_SOURCE_OUTPUT_CORKED); + pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_CORKED); else if (!b && state == PA_SOURCE_OUTPUT_CORKED) - pa_atomic_cmpxchg(o->state, state, PA_SOURCE_OUTPUT_RUNNING); + pa_atomic_cmpxchg(&o->state, state, PA_SOURCE_OUTPUT_RUNNING); } int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { pa_source_output_assert_ref(o); pa_return_val_if_fail(o->thread_info.resampler, -1); - if (i->sample_spec.rate == rate) + if (o->sample_spec.rate == rate) return 0; - i->sample_spec.rate = rate; + o->sample_spec.rate = rate; - pa_asyncmsgq_post(s->asyncmsgq, pa_source_output_ref(i), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_source_output_unref, NULL); + pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL); - pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT!|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); + pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index); return 0; } @@ -316,8 +320,8 @@ pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o) { } int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { - pa_source *origin; - pa_resampler *new_resampler = NULL; +/* pa_source *origin; */ +/* pa_resampler *new_resampler = NULL; */ pa_source_output_assert_ref(o); pa_source_assert_ref(dest); @@ -344,7 +348,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { /* else if (!pa_sample_spec_equal(&o->sample_spec, &dest->sample_spec) || */ /* !pa_channel_map_equal(&o->channel_map, &dest->channel_map)) { */ -/* /\* Okey, we need a new resampler for the new sink *\/ */ +/* /\* Okey, we need a new resampler for the new source *\/ */ /* if (!(new_resampler = pa_resampler_new( */ /* dest->core->mempool, */ @@ -376,16 +380,16 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { } int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk* chunk) { - pa_source_output *o = PA_SOURCE_OUTPUT(o); + pa_source_output *o = PA_SOURCE_OUTPUT(mo); - pa_source_output_assert_ref(i); + pa_source_output_assert_ref(o); switch (code) { case PA_SOURCE_OUTPUT_MESSAGE_SET_RATE: { - i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); - pa_resampler_set_output_rate(i->resampler, PA_PTR_TO_UINT(userdata)); + o->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); + pa_resampler_set_output_rate(o->thread_info.resampler, PA_PTR_TO_UINT(userdata)); return 0; } -- cgit