summaryrefslogtreecommitdiffstats
path: root/src/pulsecore
diff options
context:
space:
mode:
Diffstat (limited to 'src/pulsecore')
-rw-r--r--src/pulsecore/asyncmsgq.c21
-rw-r--r--src/pulsecore/asyncmsgq.h1
-rw-r--r--src/pulsecore/rtpoll.c91
-rw-r--r--src/pulsecore/rtpoll.h20
-rw-r--r--src/pulsecore/sink-input.c2
-rw-r--r--src/pulsecore/sink-input.h7
-rw-r--r--src/pulsecore/sink.c17
-rw-r--r--src/pulsecore/source-output.c2
-rw-r--r--src/pulsecore/source-output.h7
-rw-r--r--src/pulsecore/source.c18
-rw-r--r--src/pulsecore/thread-mq.c25
-rw-r--r--src/pulsecore/thread-mq.h3
12 files changed, 115 insertions, 99 deletions
diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c
index e3a1ba91..b3654460 100644
--- a/src/pulsecore/asyncmsgq.c
+++ b/src/pulsecore/asyncmsgq.c
@@ -248,6 +248,27 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
return 0;
}
+int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
+ pa_msgobject *object;
+ int code;
+ void *data;
+ pa_memchunk chunk;
+ int64_t offset;
+ int ret;
+
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
+ return 0;
+
+ pa_asyncmsgq_ref(a);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+ pa_asyncmsgq_done(a, ret);
+ pa_asyncmsgq_unref(a);
+
+ return 1;
+}
+
int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h
index 55812c6f..393bb0b1 100644
--- a/src/pulsecore/asyncmsgq.h
+++ b/src/pulsecore/asyncmsgq.h
@@ -65,6 +65,7 @@ int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **u
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
+int pa_asyncmsgq_process_one(pa_asyncmsgq *a);
/* Just for the reading side */
int pa_asyncmsgq_get_fd(pa_asyncmsgq *q);
diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c
index 659e5381..0de8d0ce 100644
--- a/src/pulsecore/rtpoll.c
+++ b/src/pulsecore/rtpoll.c
@@ -53,7 +53,7 @@ struct pa_rtpoll {
pa_usec_t period;
int scan_for_dead;
- int running, installed, rebuild_needed;
+ int running, installed, rebuild_needed, quit;
#ifdef HAVE_PPOLL
int rtsig;
@@ -76,6 +76,7 @@ struct pa_rtpoll_item {
struct pollfd *pollfd;
unsigned n_pollfd;
+ int (*work_cb)(pa_rtpoll_item *i);
int (*before_cb)(pa_rtpoll_item *i);
void (*after_cb)(pa_rtpoll_item *i);
void *userdata;
@@ -134,6 +135,7 @@ pa_rtpoll *pa_rtpoll_new(void) {
p->installed = 0;
p->scan_for_dead = 0;
p->rebuild_needed = 0;
+ p->quit = 0;
PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
@@ -288,7 +290,6 @@ static void reset_all_revents(pa_rtpoll *p) {
int pa_rtpoll_run(pa_rtpoll *p, int wait) {
pa_rtpoll_item *i;
int r = 0;
- int saved_errno = 0;
struct timespec timeout;
pa_assert(p);
@@ -297,6 +298,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) {
p->running = 1;
+ /* First, let's do some work */
for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
int k;
@@ -306,12 +308,31 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) {
if (!i->before_cb)
continue;
- if ((k = i->before_cb(i)) != 0) {
+ if (p->quit)
+ goto finish;
+
+ if ((k = i->work_cb(i)) != 0) {
+ if (k < 0)
+ r = k;
+
+ goto finish;
+ }
+ }
+
+ /* Now let's prepare for entering the sleep */
+ for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
+ int k = 0;
+
+ if (i->dead)
+ continue;
+
+ if (!i->before_cb)
+ continue;
+
+ if (p->quit || (k = i->before_cb(i)) != 0) {
/* Hmm, this one doesn't let us enter the poll, so rewind everything */
- reset_all_revents(p);
-
for (i = i->prev; i; i = i->prev) {
if (i->dead)
@@ -334,7 +355,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) {
rtpoll_rebuild(p);
/* Calculate timeout */
- if (!wait) {
+ if (!wait || p->quit) {
timeout.tv_sec = 0;
timeout.tv_nsec = 0;
} else if (p->timer_enabled) {
@@ -362,13 +383,14 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) {
r = poll(p->pollfd, p->n_pollfd_used, p->timer_enabled > 0 ? (timeout.tv_sec*1000) + (timeout.tv_nsec / 1000000) : -1);
#endif
- if (r < 0)
+ if (r < 0) {
reset_all_revents(p);
- if (r < 0 && (errno == EAGAIN || errno == EINTR))
- r = 0;
-
- saved_errno = r < 0 ? errno : 0;
+ if (errno == EAGAIN || errno == EINTR)
+ r = 0;
+ else
+ pa_log_error("poll(): %s", pa_cstrerror(errno));
+ }
if (p->timer_enabled) {
if (p->period > 0) {
@@ -385,6 +407,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) {
p->timer_enabled = 0;
}
+ /* Let's tell everyone that we left the sleep */
for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
if (i->dead)
@@ -413,10 +436,7 @@ finish:
}
}
- if (saved_errno != 0)
- errno = saved_errno;
-
- return r;
+ return r < 0 ? r : !p->quit;
}
static void update_timer(pa_rtpoll *p) {
@@ -528,6 +548,7 @@ pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsi
i->userdata = NULL;
i->before_cb = NULL;
i->after_cb = NULL;
+ i->work_cb = NULL;
for (j = p->items; j; j = j->next) {
if (prio <= j->priority)
@@ -585,6 +606,13 @@ void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rt
i->after_cb = after_cb;
}
+void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
+ pa_assert(i);
+ pa_assert(i->priority < PA_RTPOLL_NEVER);
+
+ i->work_cb = work_cb;
+}
+
void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
pa_assert(i);
@@ -649,6 +677,32 @@ static void asyncmsgq_after(pa_rtpoll_item *i) {
pa_asyncmsgq_after_poll(i->userdata);
}
+static int asyncmsgq_work(pa_rtpoll_item *i) {
+ pa_msgobject *object;
+ int code;
+ void *data;
+ pa_memchunk chunk;
+ int64_t offset;
+
+ pa_assert(i);
+
+ if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
+ int ret;
+
+ if (!object && code == PA_MESSAGE_SHUTDOWN) {
+ pa_asyncmsgq_done(i->userdata, 0);
+ pa_rtpoll_quit(i->rtpoll);
+ return 1;
+ }
+
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+ pa_asyncmsgq_done(i->userdata, ret);
+ return 1;
+ }
+
+ return 0;
+}
+
pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
pa_rtpoll_item *i;
struct pollfd *pollfd;
@@ -664,7 +718,14 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t
i->before_cb = asyncmsgq_before;
i->after_cb = asyncmsgq_after;
+ i->work_cb = asyncmsgq_work;
i->userdata = q;
return i;
}
+
+void pa_rtpoll_quit(pa_rtpoll *p) {
+ pa_assert(p);
+
+ p->quit = 1;
+}
diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h
index bef9eedb..9a368d36 100644
--- a/src/pulsecore/rtpoll.h
+++ b/src/pulsecore/rtpoll.h
@@ -69,7 +69,9 @@ void pa_rtpoll_install(pa_rtpoll *p);
/* Sleep on the rtpoll until the time event, or any of the fd events
* is triggered. If "wait" is 0 we don't sleep but only update the
- * struct pollfd. */
+ * struct pollfd. Returns negative on error, positive if the loop
+ * should continue to run, 0 when the loop should be terminated
+ * cleanly. */
int pa_rtpoll_run(pa_rtpoll *f, int wait);
void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, const struct timespec *ts);
@@ -86,18 +88,30 @@ void pa_rtpoll_item_free(pa_rtpoll_item *i);
* using the pointer and don't save the result anywhere */
struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds);
+/* Set the callback that shall be called when there's time to do some work: If the
+ * callback returns a value > 0, the poll is skipped and the next
+ * iteraton of the loop will start immediately. */
+void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i));
+
/* Set the callback that shall be called immediately before entering
- * the sleeping poll: If the callback returns a negative value, the
- * poll is skipped. */
+ * the sleeping poll: If the callback returns a value > 0, the poll is
+ * skipped and the next iteraton of the loop will start
+ * immediately.. */
void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i));
/* Set the callback that shall be called immediately after having
* entered the sleeping poll */
void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i));
+
+
void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata);
void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i);
pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s);
pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+/* Requests the loop to exit. Will cause the next iteration of
+ * pa_rtpoll_run() to return 0 */
+void pa_rtpoll_quit(pa_rtpoll *p);
+
#endif
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index c33d8e70..2687cfaa 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -194,7 +194,6 @@ pa_sink_input* pa_sink_input_new(
i->peek = NULL;
i->drop = NULL;
- i->process = NULL;
i->kill = NULL;
i->get_latency = NULL;
i->attach = NULL;
@@ -272,7 +271,6 @@ void pa_sink_input_unlink(pa_sink_input *i) {
i->peek = NULL;
i->drop = NULL;
- i->process = NULL;
i->kill = NULL;
i->get_latency = NULL;
i->attach = NULL;
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index e1d89ffb..c4e65b50 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -90,13 +90,6 @@ struct pa_sink_input {
* peek(), but not necessarily. Called from IO thread context. */
void (*drop) (pa_sink_input *i, size_t length);
- /* If non-NULL this function is called in each IO event loop and
- * can be used to do additional processing even when the device is
- * suspended and peek() is never called. Should return 1 when
- * "some work" has been done and the IO event loop should be
- * reiterated immediately. Called from IO thread context. */
- int (*process) (pa_sink_input *i); /* may be NULL */
-
/* If non-NULL this function is called when the input is first
* connected to a sink. Called from IO thread context */
void (*attach) (pa_sink_input *i); /* may be NULL */
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index b009bc77..a7ed5a40 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -922,20 +922,3 @@ int pa_sink_suspend_all(pa_core *c, int suspend) {
return ret;
}
-int pa_sink_process_inputs(pa_sink *s) {
- pa_sink_input *i;
- void *state = NULL;
- int r;
-
- pa_sink_assert_ref(s);
-
- if (!PA_SINK_LINKED(s->thread_info.state))
- return 0;
-
- while ((i = PA_SINK_INPUT(pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))))
- if (i->process)
- if ((r = i->process(i)))
- return r;
-
- return 0;
-}
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 42672341..b77a4ae3 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -148,7 +148,6 @@ pa_source_output* pa_source_output_new(
o->channel_map = data->channel_map;
o->push = NULL;
- o->process = NULL;
o->kill = NULL;
o->get_latency = NULL;
o->detach = NULL;
@@ -204,7 +203,6 @@ void pa_source_output_unlink(pa_source_output*o) {
pa_source_update_status(o->source);
o->push = NULL;
- o->process = NULL;
o->kill = NULL;
o->get_latency = NULL;
o->attach = NULL;
diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h
index 2027e37a..5059c465 100644
--- a/src/pulsecore/source-output.h
+++ b/src/pulsecore/source-output.h
@@ -73,13 +73,6 @@ struct pa_source_output {
* context. */
void (*push)(pa_source_output *o, const pa_memchunk *chunk);
- /* If non-NULL this function is called in each IO event loop and
- * can be used to do additional processing even when the device is
- * suspended and peek() is never called. Should return 1 when
- * "some work" has been done and the IO event loop should be
- * reiterated immediately. Called from IO thread context. */
- int (*process) (pa_source_output *o); /* may be NULL */
-
/* If non-NULL this function is called when the output is first
* connected to a source. Called from IO thread context */
void (*attach) (pa_source_output *o); /* may be NULL */
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 2f1a5a5f..34e023de 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -504,21 +504,3 @@ int pa_source_suspend_all(pa_core *c, int suspend) {
return ret;
}
-
-int pa_source_process_outputs(pa_source *s) {
- pa_source_output *o;
- void *state = NULL;
- int r;
-
- pa_source_assert_ref(s);
-
- if (!PA_SOURCE_LINKED(s->state))
- return 0;
-
- while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
- if (o->process)
- if ((r = o->process(o)))
- return r;
-
- return 0;
-}
diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c
index 3000246a..d572f6e0 100644
--- a/src/pulsecore/thread-mq.c
+++ b/src/pulsecore/thread-mq.c
@@ -110,28 +110,3 @@ void pa_thread_mq_install(pa_thread_mq *q) {
pa_thread_mq *pa_thread_mq_get(void) {
return PA_STATIC_TLS_GET(thread_mq);
}
-
-int pa_thread_mq_process(pa_thread_mq *q) {
- pa_msgobject *object;
- int code;
- void *data;
- pa_memchunk chunk;
- int64_t offset;
-
- pa_assert(q);
-
- if (pa_asyncmsgq_get(q->inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
- int ret;
-
- if (!object && code == PA_MESSAGE_SHUTDOWN) {
- pa_asyncmsgq_done(q->inq, 0);
- return -1;
- }
-
- ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
- pa_asyncmsgq_done(q->inq, ret);
- return 1;
- }
-
- return 0;
-}
diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h
index 2b1fd687..13b6e01f 100644
--- a/src/pulsecore/thread-mq.h
+++ b/src/pulsecore/thread-mq.h
@@ -43,9 +43,6 @@ void pa_thread_mq_done(pa_thread_mq *q);
/* Install the specified pa_thread_mq object for the current thread */
void pa_thread_mq_install(pa_thread_mq *q);
-/* Dispatched queued events on the thread side. */
-int pa_thread_mq_process(pa_thread_mq *q);
-
/* Return the pa_thread_mq object that is set for the current thread */
pa_thread_mq *pa_thread_mq_get(void);