diff options
Diffstat (limited to 'src/pulsecore')
-rw-r--r-- | src/pulsecore/asyncmsgq.c | 21 | ||||
-rw-r--r-- | src/pulsecore/asyncmsgq.h | 1 | ||||
-rw-r--r-- | src/pulsecore/rtpoll.c | 91 | ||||
-rw-r--r-- | src/pulsecore/rtpoll.h | 20 | ||||
-rw-r--r-- | src/pulsecore/sink-input.c | 2 | ||||
-rw-r--r-- | src/pulsecore/sink-input.h | 7 | ||||
-rw-r--r-- | src/pulsecore/sink.c | 17 | ||||
-rw-r--r-- | src/pulsecore/source-output.c | 2 | ||||
-rw-r--r-- | src/pulsecore/source-output.h | 7 | ||||
-rw-r--r-- | src/pulsecore/source.c | 18 | ||||
-rw-r--r-- | src/pulsecore/thread-mq.c | 25 | ||||
-rw-r--r-- | src/pulsecore/thread-mq.h | 3 |
12 files changed, 115 insertions, 99 deletions
diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c index e3a1ba91..b3654460 100644 --- a/src/pulsecore/asyncmsgq.c +++ b/src/pulsecore/asyncmsgq.c @@ -248,6 +248,27 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) { return 0; } +int pa_asyncmsgq_process_one(pa_asyncmsgq *a) { + pa_msgobject *object; + int code; + void *data; + pa_memchunk chunk; + int64_t offset; + int ret; + + pa_assert(PA_REFCNT_VALUE(a) > 0); + + if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0) + return 0; + + pa_asyncmsgq_ref(a); + ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); + pa_asyncmsgq_done(a, ret); + pa_asyncmsgq_unref(a); + + return 1; +} + int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h index 55812c6f..393bb0b1 100644 --- a/src/pulsecore/asyncmsgq.h +++ b/src/pulsecore/asyncmsgq.h @@ -65,6 +65,7 @@ int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **u int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk); void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret); int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code); +int pa_asyncmsgq_process_one(pa_asyncmsgq *a); /* Just for the reading side */ int pa_asyncmsgq_get_fd(pa_asyncmsgq *q); diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c index 659e5381..0de8d0ce 100644 --- a/src/pulsecore/rtpoll.c +++ b/src/pulsecore/rtpoll.c @@ -53,7 +53,7 @@ struct pa_rtpoll { pa_usec_t period; int scan_for_dead; - int running, installed, rebuild_needed; + int running, installed, rebuild_needed, quit; #ifdef HAVE_PPOLL int rtsig; @@ -76,6 +76,7 @@ struct pa_rtpoll_item { struct pollfd *pollfd; unsigned n_pollfd; + int (*work_cb)(pa_rtpoll_item *i); int (*before_cb)(pa_rtpoll_item *i); void (*after_cb)(pa_rtpoll_item *i); void *userdata; @@ -134,6 +135,7 @@ pa_rtpoll *pa_rtpoll_new(void) { p->installed = 0; p->scan_for_dead = 0; p->rebuild_needed = 0; + p->quit = 0; PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items); @@ -288,7 +290,6 @@ static void reset_all_revents(pa_rtpoll *p) { int pa_rtpoll_run(pa_rtpoll *p, int wait) { pa_rtpoll_item *i; int r = 0; - int saved_errno = 0; struct timespec timeout; pa_assert(p); @@ -297,6 +298,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { p->running = 1; + /* First, let's do some work */ for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { int k; @@ -306,12 +308,31 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { if (!i->before_cb) continue; - if ((k = i->before_cb(i)) != 0) { + if (p->quit) + goto finish; + + if ((k = i->work_cb(i)) != 0) { + if (k < 0) + r = k; + + goto finish; + } + } + + /* Now let's prepare for entering the sleep */ + for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { + int k = 0; + + if (i->dead) + continue; + + if (!i->before_cb) + continue; + + if (p->quit || (k = i->before_cb(i)) != 0) { /* Hmm, this one doesn't let us enter the poll, so rewind everything */ - reset_all_revents(p); - for (i = i->prev; i; i = i->prev) { if (i->dead) @@ -334,7 +355,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { rtpoll_rebuild(p); /* Calculate timeout */ - if (!wait) { + if (!wait || p->quit) { timeout.tv_sec = 0; timeout.tv_nsec = 0; } else if (p->timer_enabled) { @@ -362,13 +383,14 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { r = poll(p->pollfd, p->n_pollfd_used, p->timer_enabled > 0 ? (timeout.tv_sec*1000) + (timeout.tv_nsec / 1000000) : -1); #endif - if (r < 0) + if (r < 0) { reset_all_revents(p); - if (r < 0 && (errno == EAGAIN || errno == EINTR)) - r = 0; - - saved_errno = r < 0 ? errno : 0; + if (errno == EAGAIN || errno == EINTR) + r = 0; + else + pa_log_error("poll(): %s", pa_cstrerror(errno)); + } if (p->timer_enabled) { if (p->period > 0) { @@ -385,6 +407,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { p->timer_enabled = 0; } + /* Let's tell everyone that we left the sleep */ for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { if (i->dead) @@ -413,10 +436,7 @@ finish: } } - if (saved_errno != 0) - errno = saved_errno; - - return r; + return r < 0 ? r : !p->quit; } static void update_timer(pa_rtpoll *p) { @@ -528,6 +548,7 @@ pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsi i->userdata = NULL; i->before_cb = NULL; i->after_cb = NULL; + i->work_cb = NULL; for (j = p->items; j; j = j->next) { if (prio <= j->priority) @@ -585,6 +606,13 @@ void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rt i->after_cb = after_cb; } +void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) { + pa_assert(i); + pa_assert(i->priority < PA_RTPOLL_NEVER); + + i->work_cb = work_cb; +} + void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) { pa_assert(i); @@ -649,6 +677,32 @@ static void asyncmsgq_after(pa_rtpoll_item *i) { pa_asyncmsgq_after_poll(i->userdata); } +static int asyncmsgq_work(pa_rtpoll_item *i) { + pa_msgobject *object; + int code; + void *data; + pa_memchunk chunk; + int64_t offset; + + pa_assert(i); + + if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) { + int ret; + + if (!object && code == PA_MESSAGE_SHUTDOWN) { + pa_asyncmsgq_done(i->userdata, 0); + pa_rtpoll_quit(i->rtpoll); + return 1; + } + + ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); + pa_asyncmsgq_done(i->userdata, ret); + return 1; + } + + return 0; +} + pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { pa_rtpoll_item *i; struct pollfd *pollfd; @@ -664,7 +718,14 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t i->before_cb = asyncmsgq_before; i->after_cb = asyncmsgq_after; + i->work_cb = asyncmsgq_work; i->userdata = q; return i; } + +void pa_rtpoll_quit(pa_rtpoll *p) { + pa_assert(p); + + p->quit = 1; +} diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h index bef9eedb..9a368d36 100644 --- a/src/pulsecore/rtpoll.h +++ b/src/pulsecore/rtpoll.h @@ -69,7 +69,9 @@ void pa_rtpoll_install(pa_rtpoll *p); /* Sleep on the rtpoll until the time event, or any of the fd events * is triggered. If "wait" is 0 we don't sleep but only update the - * struct pollfd. */ + * struct pollfd. Returns negative on error, positive if the loop + * should continue to run, 0 when the loop should be terminated + * cleanly. */ int pa_rtpoll_run(pa_rtpoll *f, int wait); void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, const struct timespec *ts); @@ -86,18 +88,30 @@ void pa_rtpoll_item_free(pa_rtpoll_item *i); * using the pointer and don't save the result anywhere */ struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds); +/* Set the callback that shall be called when there's time to do some work: If the + * callback returns a value > 0, the poll is skipped and the next + * iteraton of the loop will start immediately. */ +void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)); + /* Set the callback that shall be called immediately before entering - * the sleeping poll: If the callback returns a negative value, the - * poll is skipped. */ + * the sleeping poll: If the callback returns a value > 0, the poll is + * skipped and the next iteraton of the loop will start + * immediately.. */ void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)); /* Set the callback that shall be called immediately after having * entered the sleeping poll */ void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)); + + void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata); void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i); pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s); pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q); +/* Requests the loop to exit. Will cause the next iteration of + * pa_rtpoll_run() to return 0 */ +void pa_rtpoll_quit(pa_rtpoll *p); + #endif diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index c33d8e70..2687cfaa 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -194,7 +194,6 @@ pa_sink_input* pa_sink_input_new( i->peek = NULL; i->drop = NULL; - i->process = NULL; i->kill = NULL; i->get_latency = NULL; i->attach = NULL; @@ -272,7 +271,6 @@ void pa_sink_input_unlink(pa_sink_input *i) { i->peek = NULL; i->drop = NULL; - i->process = NULL; i->kill = NULL; i->get_latency = NULL; i->attach = NULL; diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index e1d89ffb..c4e65b50 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -90,13 +90,6 @@ struct pa_sink_input { * peek(), but not necessarily. Called from IO thread context. */ void (*drop) (pa_sink_input *i, size_t length); - /* If non-NULL this function is called in each IO event loop and - * can be used to do additional processing even when the device is - * suspended and peek() is never called. Should return 1 when - * "some work" has been done and the IO event loop should be - * reiterated immediately. Called from IO thread context. */ - int (*process) (pa_sink_input *i); /* may be NULL */ - /* If non-NULL this function is called when the input is first * connected to a sink. Called from IO thread context */ void (*attach) (pa_sink_input *i); /* may be NULL */ diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index b009bc77..a7ed5a40 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -922,20 +922,3 @@ int pa_sink_suspend_all(pa_core *c, int suspend) { return ret; } -int pa_sink_process_inputs(pa_sink *s) { - pa_sink_input *i; - void *state = NULL; - int r; - - pa_sink_assert_ref(s); - - if (!PA_SINK_LINKED(s->thread_info.state)) - return 0; - - while ((i = PA_SINK_INPUT(pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)))) - if (i->process) - if ((r = i->process(i))) - return r; - - return 0; -} diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 42672341..b77a4ae3 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -148,7 +148,6 @@ pa_source_output* pa_source_output_new( o->channel_map = data->channel_map; o->push = NULL; - o->process = NULL; o->kill = NULL; o->get_latency = NULL; o->detach = NULL; @@ -204,7 +203,6 @@ void pa_source_output_unlink(pa_source_output*o) { pa_source_update_status(o->source); o->push = NULL; - o->process = NULL; o->kill = NULL; o->get_latency = NULL; o->attach = NULL; diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h index 2027e37a..5059c465 100644 --- a/src/pulsecore/source-output.h +++ b/src/pulsecore/source-output.h @@ -73,13 +73,6 @@ struct pa_source_output { * context. */ void (*push)(pa_source_output *o, const pa_memchunk *chunk); - /* If non-NULL this function is called in each IO event loop and - * can be used to do additional processing even when the device is - * suspended and peek() is never called. Should return 1 when - * "some work" has been done and the IO event loop should be - * reiterated immediately. Called from IO thread context. */ - int (*process) (pa_source_output *o); /* may be NULL */ - /* If non-NULL this function is called when the output is first * connected to a source. Called from IO thread context */ void (*attach) (pa_source_output *o); /* may be NULL */ diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 2f1a5a5f..34e023de 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -504,21 +504,3 @@ int pa_source_suspend_all(pa_core *c, int suspend) { return ret; } - -int pa_source_process_outputs(pa_source *s) { - pa_source_output *o; - void *state = NULL; - int r; - - pa_source_assert_ref(s); - - if (!PA_SOURCE_LINKED(s->state)) - return 0; - - while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) - if (o->process) - if ((r = o->process(o))) - return r; - - return 0; -} diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c index 3000246a..d572f6e0 100644 --- a/src/pulsecore/thread-mq.c +++ b/src/pulsecore/thread-mq.c @@ -110,28 +110,3 @@ void pa_thread_mq_install(pa_thread_mq *q) { pa_thread_mq *pa_thread_mq_get(void) { return PA_STATIC_TLS_GET(thread_mq); } - -int pa_thread_mq_process(pa_thread_mq *q) { - pa_msgobject *object; - int code; - void *data; - pa_memchunk chunk; - int64_t offset; - - pa_assert(q); - - if (pa_asyncmsgq_get(q->inq, &object, &code, &data, &offset, &chunk, 0) == 0) { - int ret; - - if (!object && code == PA_MESSAGE_SHUTDOWN) { - pa_asyncmsgq_done(q->inq, 0); - return -1; - } - - ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); - pa_asyncmsgq_done(q->inq, ret); - return 1; - } - - return 0; -} diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h index 2b1fd687..13b6e01f 100644 --- a/src/pulsecore/thread-mq.h +++ b/src/pulsecore/thread-mq.h @@ -43,9 +43,6 @@ void pa_thread_mq_done(pa_thread_mq *q); /* Install the specified pa_thread_mq object for the current thread */ void pa_thread_mq_install(pa_thread_mq *q); -/* Dispatched queued events on the thread side. */ -int pa_thread_mq_process(pa_thread_mq *q); - /* Return the pa_thread_mq object that is set for the current thread */ pa_thread_mq *pa_thread_mq_get(void); |