diff options
author | Lennart Poettering <lennart@poettering.net> | 2007-09-19 22:21:55 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2007-09-19 22:21:55 +0000 |
commit | 75647bc38f8a65f45c6cee23d5b373c6c3b3ecdc (patch) | |
tree | 74e9f483e6c4bbff65a0a621300c8960d86b4ec6 | |
parent | a8a9ee499d400fd54d4f67340247f78fb4ab2a5c (diff) |
render new data always in the master sink's thread, fixing missing locking
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1871 fefdeb5f-60dc-0310-8127-8f9354f1896f
-rw-r--r-- | src/modules/module-combine.c | 198 |
1 files changed, 133 insertions, 65 deletions
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c index feadf4f9..9923f2e8 100644 --- a/src/modules/module-combine.c +++ b/src/modules/module-combine.c @@ -87,8 +87,9 @@ struct output { pa_sink *sink; pa_sink_input *sink_input; - pa_asyncmsgq *asyncmsgq; - pa_rtpoll_item *rtpoll_item; + pa_asyncmsgq *inq, /* Message queue from the master to this sink input */ + *outq; /* Message queue from this sink input to the master */ + pa_rtpoll_item *inq_rtpoll_item, *outq_rtpoll_item; pa_memblockq *memblockq; @@ -106,8 +107,6 @@ struct userdata { pa_thread_mq thread_mq; pa_rtpoll *rtpoll; - pa_mutex *mutex; - struct output *master; pa_time_event *time_event; @@ -134,7 +133,8 @@ struct userdata { enum { SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX, - SINK_MESSAGE_REMOVE_OUTPUT + SINK_MESSAGE_REMOVE_OUTPUT, + SINK_MESSAGE_NEED }; enum { @@ -275,9 +275,51 @@ finish: pa_log_debug("Thread shutting down"); } -static void request_memblock(struct output *o, size_t length) { - pa_memchunk chunk; +static void render_memblock(struct userdata *u, struct output *o, size_t length) { + pa_assert(u); + pa_assert(o); + + if (!PA_SINK_OPENED(u->sink->thread_info.state)) + return; + + /* We are run by the master output (u->master), possibly on behalf + * of another output (o). The other output is waiting for us, + * hence it is safe to access its mainblockq directly. */ + /* Maybe there's some data in the requesting output's queue + * now? */ + while (pa_asyncmsgq_process_one(o->inq) > 0) + ; + + /* Ok, now let's prepare some data if we really have to */ + while (!pa_memblockq_is_readable(o->memblockq)) { + struct output *j; + pa_memchunk chunk; + + /* Render data! */ + pa_sink_render(u->sink, length, &chunk); + + /* OK, let's send this data to the other threads */ + for (j = o->userdata->thread_info.outputs; j; j = j->next) + + /* Send to other outputs, which are not the requesting + * one, and not the master */ + + if (j != o && j != u->master && j->sink_input) + pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); + + /* Now push it into the master queue */ + pa_memblockq_push_align(u->master->memblockq, &chunk); + + /* And into the requesting output's queue */ + if (o != u->master) + pa_memblockq_push_align(o->memblockq, &chunk); + + pa_memblock_unref(chunk.memblock); + } +} + +static void request_memblock(struct output *o, size_t length) { pa_assert(o); pa_sink_input_assert_ref(o->sink_input); pa_sink_assert_ref(o->userdata->sink); @@ -285,7 +327,7 @@ static void request_memblock(struct output *o, size_t length) { /* If another thread already prepared some data we received * the data over the asyncmsgq, hence let's first process * it. */ - while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) + while (pa_asyncmsgq_process_one(o->inq) > 0) ; /* Check whether we're now readable */ @@ -293,33 +335,16 @@ static void request_memblock(struct output *o, size_t length) { return; /* OK, we need to prepare new data */ - pa_mutex_lock(o->userdata->mutex); - if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) { - - /* Maybe there's some data now? */ - while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) - ; - - /* Ok, now let's prepare some data if we really have to */ - while (!pa_memblockq_is_readable(o->memblockq)) { - struct output *j; - - /* Do it! */ - pa_sink_render(o->userdata->sink, length, &chunk); - - /* OK, let's send this data to the other threads */ - for (j = o->userdata->thread_info.outputs; j; j = j->next) - if (j != o && j->sink_input) - pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); - - /* And push it into our own queue */ - pa_memblockq_push_align(o->memblockq, &chunk); - pa_memblock_unref(chunk.memblock); - } - } - - pa_mutex_unlock(o->userdata->mutex); + if (o == o->userdata->master) + /* OK, we're the master, so let's render some data */ + render_memblock(o->userdata, o, length); + + else + /* We're not the master, we need to ask the master to do the + * rendering for us */ + + pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, length, NULL); } /* Called from I/O thread context */ @@ -327,8 +352,7 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun struct output *o; pa_sink_input_assert_ref(i); - o = i->userdata; - pa_assert(o); + pa_assert_se(o = i->userdata); /* If necessary, get some new data */ request_memblock(o, length); @@ -342,8 +366,7 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) { pa_sink_input_assert_ref(i); pa_assert(length > 0); - o = i->userdata; - pa_assert(o); + pa_assert_se(o = i->userdata); pa_memblockq_drop(o->memblockq, length); } @@ -353,23 +376,42 @@ static void sink_input_attach_cb(pa_sink_input *i) { struct output *o; pa_sink_input_assert_ref(i); - o = i->userdata; - pa_assert(o); + pa_assert_se(o = i->userdata); + pa_assert(!o->inq_rtpoll_item); + if (o->userdata->master == o) { + struct output *k; + + pa_assert(!o->outq_rtpoll_item); + + /* Set up the queues from the outputs to the master */ + for (k = o->userdata->thread_info.outputs; k; k = k->next) { + + pa_assert(!k->outq_rtpoll_item); + + if (o == k) + continue; + + k->outq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq( + i->sink->rtpoll, + PA_RTPOLL_EARLY+1, /* This one has a slightly lower priority than the normal message handling */ + k->outq); + } + /* Calling these two functions here is safe, because both - * threads that might access this sink input are known to be + * threads that might access this sink are known to be * waiting for us. */ pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq); pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll); pa_sink_attach_within_thread(o->userdata->sink); } - - pa_assert(!o->rtpoll_item); - o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq( + + /* Set up the queues from the inputs to the master */ + o->inq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq( i->sink->rtpoll, PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */ - o->asyncmsgq); + o->inq); } /* Called from I/O thread context */ @@ -377,15 +419,27 @@ static void sink_input_detach_cb(pa_sink_input *i) { struct output *o; pa_sink_input_assert_ref(i); - o = i->userdata; - pa_assert(o); + pa_assert_se(o = i->userdata); - pa_assert(o->rtpoll_item); - pa_rtpoll_item_free(o->rtpoll_item); - o->rtpoll_item = NULL; + pa_assert(o->inq_rtpoll_item); + pa_rtpoll_item_free(o->inq_rtpoll_item); + o->inq_rtpoll_item = NULL; - if (o->userdata->master == o) + if (o->userdata->master == o) { + struct output *k; + pa_sink_detach_within_thread(o->userdata->sink); + + for (k = o->userdata->thread_info.outputs; k; k = k->next) { + + if (o == k) + continue; + + pa_assert(k->outq_rtpoll_item); + pa_rtpoll_item_free(k->outq_rtpoll_item); + k->outq_rtpoll_item = NULL; + } + } } /* Called from main context */ @@ -433,6 +487,7 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 return pa_sink_input_process_msg(obj, code, data, offset, chunk); } +/* Called from main context */ static int suspend(struct userdata *u) { struct output *o; uint32_t idx; @@ -458,6 +513,7 @@ static int suspend(struct userdata *u) { return 0; } +/* Called from main context */ static int unsuspend(struct userdata *u) { struct output *o; uint32_t idx; @@ -485,12 +541,12 @@ static int unsuspend(struct userdata *u) { return 0; } +/* Called from main context */ static int sink_set_state(pa_sink *sink, pa_sink_state_t state) { struct userdata *u; pa_sink_assert_ref(sink); - u = sink->userdata; - pa_assert(u); + pa_assert_se(u = sink->userdata); /* Please note that in contrast to the ALSA modules we call * suspend/unsuspend from main context here! */ @@ -575,6 +631,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case SINK_MESSAGE_REMOVE_OUTPUT: PA_LLIST_REMOVE(struct output, u->thread_info.outputs, (struct output*) data); break; + + case SINK_MESSAGE_NEED: + render_memblock(u, data, (size_t) offset); + break; } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -765,8 +825,10 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) { o = pa_xnew(struct output, 1); o->userdata = u; - o->asyncmsgq = pa_asyncmsgq_new(0); - o->rtpoll_item = NULL; + o->inq = pa_asyncmsgq_new(0); + o->outq = pa_asyncmsgq_new(0); + o->inq_rtpoll_item = NULL; + o->outq_rtpoll_item = NULL; o->sink = sink; o->sink_input = NULL; o->memblockq = pa_memblockq_new( @@ -809,9 +871,12 @@ fail: if (o->memblockq) pa_memblockq_free(o->memblockq); - if (o->asyncmsgq) - pa_asyncmsgq_unref(o->asyncmsgq); + if (o->inq) + pa_asyncmsgq_unref(o->inq); + if (o->outq) + pa_asyncmsgq_unref(o->outq); + pa_xfree(o); } @@ -947,7 +1012,6 @@ int pa__init(pa_module*m) { u->thread_info.master = u->master = NULL; u->time_event = NULL; u->adjust_time = DEFAULT_ADJUST_TIME; - u->mutex = pa_mutex_new(FALSE, TRUE); pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = NULL; u->thread = NULL; @@ -1134,14 +1198,20 @@ static void output_free(struct output *o) { pa_sink_input_unref(o->sink_input); } - if (o->rtpoll_item) - pa_rtpoll_item_free(o->rtpoll_item); + if (o->inq_rtpoll_item) + pa_rtpoll_item_free(o->inq_rtpoll_item); + + if (o->outq_rtpoll_item) + pa_rtpoll_item_free(o->outq_rtpoll_item); + + if (o->inq) + pa_asyncmsgq_unref(o->inq); + + if (o->outq) + pa_asyncmsgq_unref(o->outq); if (o->memblockq) pa_memblockq_free(o->memblockq); - - if (o->asyncmsgq) - pa_asyncmsgq_unref(o->asyncmsgq); pa_xfree(o); } @@ -1190,8 +1260,6 @@ void pa__done(pa_module*m) { if (u->time_event) u->core->mainloop->time_free(u->time_event); - pa_mutex_free(u->mutex); - pa_xfree(u); } |