diff options
author | Lennart Poettering <lennart@poettering.net> | 2007-09-14 21:51:05 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2007-09-14 21:51:05 +0000 |
commit | 3396b65f15a06ff312e318bc05e502ba402c564e (patch) | |
tree | 5087148cbe36f765f9fff99528d812c198799c63 /src/modules | |
parent | f0b9dce32e4c5d77f57364ccdc7795f828f7f6a0 (diff) |
simplify rt loops a bit by moving more code into pa_rtpoll. It is now possible to attach "work" functions to a pa_rtpoll_item, which will be called in each loop iteration. This allows us to hide the message processing in the RT loops and to drop the seperate sink_input->process hooks. Basically, only the driver-specific code remains in the RT loops.
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1822 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/modules')
-rw-r--r-- | src/modules/module-alsa-sink.c | 21 | ||||
-rw-r--r-- | src/modules/module-alsa-source.c | 23 | ||||
-rw-r--r-- | src/modules/module-combine.c | 85 | ||||
-rw-r--r-- | src/modules/module-null-sink.c | 23 | ||||
-rw-r--r-- | src/modules/module-oss.c | 33 | ||||
-rw-r--r-- | src/modules/module-pipe-sink.c | 28 | ||||
-rw-r--r-- | src/modules/module-pipe-source.c | 23 |
7 files changed, 65 insertions, 171 deletions
diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 26c24c87..1bcb30c0 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -630,24 +630,13 @@ static void thread_func(void *userdata) { } } - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) + /* Hmm, nothing to do. Let's sleep */ + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - if (ret > 0) - continue; - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) + if (ret == 0) goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); - goto fail; - } - /* Tell ALSA about this and process its response */ if (PA_SINK_OPENED(u->sink->thread_info.state)) { struct pollfd *pollfd; @@ -676,8 +665,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index 9e03729a..870f204d 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -612,24 +612,13 @@ static void thread_func(void *userdata) { } } - /* Now give the source outputs some to time to process their data */ - if ((ret = pa_source_process_outputs(u->source)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + if (ret == 0) + goto finish; + /* Tell ALSA about this and process its response */ if (PA_SOURCE_OPENED(u->source->thread_info.state)) { struct pollfd *pollfd; @@ -658,8 +647,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c index 11707b19..7df04ec5 100644 --- a/src/modules/module-combine.c +++ b/src/modules/module-combine.c @@ -139,6 +139,10 @@ enum { SINK_MESSAGE_REMOVE_OUTPUT }; +enum { + SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX +}; + static void output_free(struct output *o); static int output_create_sink_input(struct userdata *u, struct output *o); static int update_master(struct userdata *u, struct output *o); @@ -255,28 +259,17 @@ static void thread_func(void *userdata) { } else pa_rtpoll_set_timer_disabled(u->rtpoll); - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) + /* Hmm, nothing to do. Let's sleep */ + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - if (ret > 0) - continue; - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) + if (ret == 0) goto finish; - if (ret > 0) - continue; - - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); - goto fail; - } } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); @@ -294,10 +287,8 @@ static void request_memblock(struct output *o) { /* If another thread already prepared some data we received * the data over the asyncmsgq, hence let's first process * it. */ - while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) { - pa_memblockq_push_align(o->memblockq, &chunk); - pa_asyncmsgq_done(o->asyncmsgq, 0); - } + while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) + ; /* Check whether we're now readable */ if (pa_memblockq_is_readable(o->memblockq)) @@ -309,10 +300,8 @@ static void request_memblock(struct output *o) { if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) { /* Maybe there's some data now? */ - while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) { - pa_memblockq_push_align(o->memblockq, &chunk); - pa_asyncmsgq_done(o->asyncmsgq, 0); - } + while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) + ; /* Ok, now let's prepare some data if we really have to */ while (!pa_memblockq_is_readable(o->memblockq)) { @@ -324,7 +313,7 @@ static void request_memblock(struct output *o) { /* OK, let's send this data to the other threads */ for (j = o->userdata->thread_info.outputs; j; j = j->next) if (j != o && j->sink_input) - pa_asyncmsgq_post(j->asyncmsgq, NULL, 0, NULL, 0, &chunk, NULL); + pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); /* And push it into our own queue */ pa_memblockq_push_align(o->memblockq, &chunk); @@ -362,37 +351,6 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) { } /* Called from I/O thread context */ -static int sink_input_process_cb(pa_sink_input *i) { - struct output *o; - pa_memchunk chunk; - int r = 0; - - pa_sink_input_assert_ref(i); - o = i->userdata; - pa_assert(o); - - /* Move all data in the asyncmsgq into our memblockq */ - - while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) { - if (PA_SINK_OPENED(i->sink->thread_info.state)) - pa_memblockq_push_align(o->memblockq, &chunk); - pa_asyncmsgq_done(o->asyncmsgq, 0); - } - - /* If the sink is suspended, flush our queue */ - if (!PA_SINK_OPENED(i->sink->thread_info.state)) - pa_memblockq_flush(o->memblockq); - - if (o == o->userdata->thread_info.master) { - pa_mutex_lock(o->userdata->mutex); - r = pa_sink_process_inputs(o->userdata->sink); - pa_mutex_unlock(o->userdata->mutex); - } - - return r; -} - -/* Called from I/O thread context */ static void sink_input_attach_cb(pa_sink_input *i) { struct output *o; @@ -401,7 +359,10 @@ static void sink_input_attach_cb(pa_sink_input *i) { pa_assert(o); pa_assert(!o->rtpoll_item); - o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(i->sink->rtpoll, PA_RTPOLL_NORMAL, o->asyncmsgq); + o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq( + i->sink->rtpoll, + PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */ + o->asyncmsgq); } /* Called from I/O thread context */ @@ -448,6 +409,15 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 break; } + case SINK_INPUT_MESSAGE_POST: { + + if (PA_SINK_OPENED(o->sink_input->sink->thread_info.state)) + pa_memblockq_push_align(o->memblockq, chunk); + else + pa_memblockq_flush(o->memblockq); + + break; + } } return pa_sink_input_process_msg(obj, code, data, offset, chunk); @@ -784,7 +754,6 @@ static int output_create_sink_input(struct userdata *u, struct output *o) { o->sink_input->parent.process_msg = sink_input_process_msg; o->sink_input->peek = sink_input_peek_cb; o->sink_input->drop = sink_input_drop_cb; - o->sink_input->process = sink_input_process_cb; o->sink_input->attach = sink_input_attach_cb; o->sink_input->detach = sink_input_detach_cb; o->sink_input->kill = sink_input_kill_cb; diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index ba2ae6f0..04df239d 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -145,28 +145,17 @@ static void thread_func(void *userdata) { } else pa_rtpoll_set_timer_disabled(u->rtpoll); - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + + if (ret == 0) + goto finish; } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index 1fd8d2e5..037c4017 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -999,28 +999,6 @@ static void thread_func(void *userdata) { /* pa_log("loop2"); */ - /* Now give the sink inputs some to time to process their data */ - if (u->sink) { - if ((ret = pa_sink_process_inputs(u->sink)) < 0) - goto fail; - if (ret > 0) - continue; - } - - /* Now give the source outputs some to time to process their data */ - if (u->source) { - if ((ret = pa_source_process_outputs(u->source)) < 0) - goto fail; - if (ret > 0) - continue; - } - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - if (u->fd >= 0) { struct pollfd *pollfd; @@ -1031,11 +1009,12 @@ static void thread_func(void *userdata) { } /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + if (ret == 0) + goto finish; + if (u->fd >= 0) { struct pollfd *pollfd; @@ -1052,8 +1031,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 9594a685..a1bdc8fb 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -126,8 +126,8 @@ static void thread_func(void *userdata) { pa_rtpoll_install(u->rtpoll); for (;;) { - int ret; struct pollfd *pollfd; + int ret; pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); @@ -170,36 +170,26 @@ static void thread_func(void *userdata) { } } - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ pollfd->events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0; - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + + if (ret == 0) + goto finish; pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + if (pollfd->revents & ~POLLOUT) { pa_log("FIFO shutdown."); goto fail; } - } + } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index 1b42fcfa..382da8f9 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -149,26 +149,15 @@ static void thread_func(void *userdata) { } } - /* Now give the source outputs some to time to process their data */ - if ((ret = pa_source_process_outputs(u->source)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ pollfd->events = u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0; - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + if (ret == 0) + goto finish; + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); if (pollfd->revents & ~POLLIN) { pa_log("FIFO shutdown."); @@ -177,8 +166,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); |