summaryrefslogtreecommitdiffstats
path: root/src/modules/module-combine.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/module-combine.c')
-rw-r--r--src/modules/module-combine.c85
1 files changed, 27 insertions, 58 deletions
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 11707b19..7df04ec5 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -139,6 +139,10 @@ enum {
SINK_MESSAGE_REMOVE_OUTPUT
};
+enum {
+ SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX
+};
+
static void output_free(struct output *o);
static int output_create_sink_input(struct userdata *u, struct output *o);
static int update_master(struct userdata *u, struct output *o);
@@ -255,28 +259,17 @@ static void thread_func(void *userdata) {
} else
pa_rtpoll_set_timer_disabled(u->rtpoll);
- /* Now give the sink inputs some to time to process their data */
- if ((ret = pa_sink_process_inputs(u->sink)) < 0)
+ /* Hmm, nothing to do. Let's sleep */
+ if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
goto fail;
- if (ret > 0)
- continue;
- /* Check whether there is a message for us to process */
- if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
+ if (ret == 0)
goto finish;
- if (ret > 0)
- continue;
-
- /* Hmm, nothing to do. Let's sleep */
- if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
- pa_log("poll() failed: %s", pa_cstrerror(errno));
- goto fail;
- }
}
fail:
- /* We have to continue processing messages until we receive the
- * SHUTDOWN message */
+ /* If this was no regular exit from the loop we have to continue
+ * processing messages until we received PA_MESSAGE_SHUTDOWN */
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
@@ -294,10 +287,8 @@ static void request_memblock(struct output *o) {
/* If another thread already prepared some data we received
* the data over the asyncmsgq, hence let's first process
* it. */
- while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
- pa_memblockq_push_align(o->memblockq, &chunk);
- pa_asyncmsgq_done(o->asyncmsgq, 0);
- }
+ while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
+ ;
/* Check whether we're now readable */
if (pa_memblockq_is_readable(o->memblockq))
@@ -309,10 +300,8 @@ static void request_memblock(struct output *o) {
if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) {
/* Maybe there's some data now? */
- while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
- pa_memblockq_push_align(o->memblockq, &chunk);
- pa_asyncmsgq_done(o->asyncmsgq, 0);
- }
+ while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
+ ;
/* Ok, now let's prepare some data if we really have to */
while (!pa_memblockq_is_readable(o->memblockq)) {
@@ -324,7 +313,7 @@ static void request_memblock(struct output *o) {
/* OK, let's send this data to the other threads */
for (j = o->userdata->thread_info.outputs; j; j = j->next)
if (j != o && j->sink_input)
- pa_asyncmsgq_post(j->asyncmsgq, NULL, 0, NULL, 0, &chunk, NULL);
+ pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
/* And push it into our own queue */
pa_memblockq_push_align(o->memblockq, &chunk);
@@ -362,37 +351,6 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
}
/* Called from I/O thread context */
-static int sink_input_process_cb(pa_sink_input *i) {
- struct output *o;
- pa_memchunk chunk;
- int r = 0;
-
- pa_sink_input_assert_ref(i);
- o = i->userdata;
- pa_assert(o);
-
- /* Move all data in the asyncmsgq into our memblockq */
-
- while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
- if (PA_SINK_OPENED(i->sink->thread_info.state))
- pa_memblockq_push_align(o->memblockq, &chunk);
- pa_asyncmsgq_done(o->asyncmsgq, 0);
- }
-
- /* If the sink is suspended, flush our queue */
- if (!PA_SINK_OPENED(i->sink->thread_info.state))
- pa_memblockq_flush(o->memblockq);
-
- if (o == o->userdata->thread_info.master) {
- pa_mutex_lock(o->userdata->mutex);
- r = pa_sink_process_inputs(o->userdata->sink);
- pa_mutex_unlock(o->userdata->mutex);
- }
-
- return r;
-}
-
-/* Called from I/O thread context */
static void sink_input_attach_cb(pa_sink_input *i) {
struct output *o;
@@ -401,7 +359,10 @@ static void sink_input_attach_cb(pa_sink_input *i) {
pa_assert(o);
pa_assert(!o->rtpoll_item);
- o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(i->sink->rtpoll, PA_RTPOLL_NORMAL, o->asyncmsgq);
+ o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
+ i->sink->rtpoll,
+ PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */
+ o->asyncmsgq);
}
/* Called from I/O thread context */
@@ -448,6 +409,15 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
break;
}
+ case SINK_INPUT_MESSAGE_POST: {
+
+ if (PA_SINK_OPENED(o->sink_input->sink->thread_info.state))
+ pa_memblockq_push_align(o->memblockq, chunk);
+ else
+ pa_memblockq_flush(o->memblockq);
+
+ break;
+ }
}
return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -784,7 +754,6 @@ static int output_create_sink_input(struct userdata *u, struct output *o) {
o->sink_input->parent.process_msg = sink_input_process_msg;
o->sink_input->peek = sink_input_peek_cb;
o->sink_input->drop = sink_input_drop_cb;
- o->sink_input->process = sink_input_process_cb;
o->sink_input->attach = sink_input_attach_cb;
o->sink_input->detach = sink_input_detach_cb;
o->sink_input->kill = sink_input_kill_cb;