summaryrefslogtreecommitdiffstats
path: root/src/modules
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2009-08-15 01:07:37 +0200
committerLennart Poettering <lennart@poettering.net>2009-08-15 01:07:37 +0200
commite1f3f5e0bf3d788ff69d24cd40c465eaaf6e9385 (patch)
tree5dee3632d52dc653722fa4258cd3a7e06e49c683 /src/modules
parent8947d6551586d239be206f90adca2f6dace667a2 (diff)
combine: big rework
Diffstat (limited to 'src/modules')
-rw-r--r--src/modules/module-combine.c471
1 files changed, 266 insertions, 205 deletions
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 325b8988..04c0d4db 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -92,6 +92,8 @@ struct output {
pa_sink *sink;
pa_sink_input *sink_input;
+ pa_bool_t ignore_state_change;
+
pa_asyncmsgq *inq, /* Message queue from the sink thread to this sink input */
*outq; /* Message queue from this sink input to the sink thread */
pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
@@ -99,9 +101,12 @@ struct output {
pa_memblockq *memblockq;
+ /* For communication of the stream latencies to the main thread */
pa_usec_t total_latency;
+ /* For coomunication of the stream parameters to the sink thread */
pa_atomic_t max_request;
+ pa_atomic_t requested_latency;
PA_LLIST_FIELDS(struct output);
};
@@ -144,13 +149,16 @@ enum {
SINK_MESSAGE_REMOVE_OUTPUT,
SINK_MESSAGE_NEED,
SINK_MESSAGE_UPDATE_LATENCY,
- SINK_MESSAGE_UPDATE_MAX_REQUEST
+ SINK_MESSAGE_UPDATE_MAX_REQUEST,
+ SINK_MESSAGE_UPDATE_REQUESTED_LATENCY
};
enum {
SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
};
+static void output_disable(struct output *o);
+static void output_enable(struct output *o);
static void output_free(struct output *o);
static int output_create_sink_input(struct output *o);
@@ -170,7 +178,7 @@ static void adjust_rates(struct userdata *u) {
if (!PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)))
return;
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+ PA_IDXSET_FOREACH(o, u->outputs, idx) {
pa_usec_t sink_latency;
if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
@@ -187,6 +195,8 @@ static void adjust_rates(struct userdata *u) {
avg_total_latency += o->total_latency;
n++;
+
+ pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC);
}
if (min_total_latency == (pa_usec_t) -1)
@@ -201,7 +211,7 @@ static void adjust_rates(struct userdata *u) {
base_rate = u->sink->sample_spec.rate;
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+ PA_IDXSET_FOREACH(o, u->outputs, idx) {
uint32_t r = base_rate;
if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
@@ -213,10 +223,10 @@ static void adjust_rates(struct userdata *u) {
r += (uint32_t) ((((double) (o->total_latency - target_latency))/(double)u->adjust_time)*(double)r/PA_USEC_PER_SEC);
if (r < (uint32_t) (base_rate*0.9) || r > (uint32_t) (base_rate*1.1)) {
- pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", pa_proplist_gets(o->sink_input->proplist, PA_PROP_MEDIA_NAME), base_rate, r);
+ pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->sink->name, base_rate, r);
pa_sink_input_set_rate(o->sink_input, base_rate);
} else {
- pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", pa_proplist_gets(o->sink_input->proplist, PA_PROP_MEDIA_NAME), r, (double) r / base_rate, (float) o->total_latency);
+ pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", o->sink_input->sink->name, r, (double) r / base_rate, (float) o->total_latency);
pa_sink_input_set_rate(o->sink_input, r);
}
}
@@ -353,18 +363,15 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
u->thread_info.counter += chunk.length;
/* OK, let's send this data to the other threads */
- for (j = u->thread_info.active_outputs; j; j = j->next)
-
- /* Send to other outputs, which are not the requesting
- * one */
+ PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
+ if (j == o)
+ continue;
- if (j != o)
- pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+ pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+ }
/* And place it directly into the requesting output's queue */
- if (o)
- pa_memblockq_push_align(o->memblockq, &chunk);
-
+ pa_memblockq_push_align(o->memblockq, &chunk);
pa_memblock_unref(chunk.memblock);
}
}
@@ -400,10 +407,18 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
/* If necessary, get some new data */
request_memblock(o, nbytes);
+ /* pa_log("%s q size is %u + %u (%u/%u)", */
+ /* i->sink->name, */
+ /* pa_memblockq_get_nblocks(o->memblockq), */
+ /* pa_memblockq_get_nblocks(i->thread_info.render_memblockq), */
+ /* pa_memblockq_get_maxrewind(o->memblockq), */
+ /* pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)); */
+
if (pa_memblockq_peek(o->memblockq, chunk) < 0)
return -1;
pa_memblockq_drop(o->memblockq, chunk->length);
+
return 0;
}
@@ -438,13 +453,35 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
return;
pa_atomic_store(&o->max_request, (int) nbytes);
-
pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
}
+/* Called from thread context */
+static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
+ struct output *o;
+ pa_usec_t c;
+
+ pa_assert(i);
+
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(o = i->userdata);
+
+ c = pa_sink_get_requested_latency_within_thread(i->sink);
+
+ if (c == (pa_usec_t) -1)
+ c = i->sink->thread_info.max_latency;
+
+ if (pa_atomic_load(&o->requested_latency) == (int) c)
+ return;
+
+ pa_atomic_store(&o->requested_latency, (int) c);
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
+}
+
/* Called from I/O thread context */
static void sink_input_attach_cb(pa_sink_input *i) {
struct output *o;
+ pa_usec_t c;
pa_sink_input_assert_ref(i);
pa_assert_se(o = i->userdata);
@@ -461,6 +498,16 @@ static void sink_input_attach_cb(pa_sink_input *i) {
i->sink->thread_info.rtpoll,
PA_RTPOLL_EARLY,
o->outq);
+
+ pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
+
+ pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i));
+
+ c = pa_sink_get_requested_latency_within_thread(i->sink);
+ pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c));
+
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
}
/* Called from I/O thread context */
@@ -470,14 +517,15 @@ static void sink_input_detach_cb(pa_sink_input *i) {
pa_sink_input_assert_ref(i);
pa_assert_se(o = i->userdata);
- /* Shut down the queue from the sink thread to us */
- pa_assert(o->inq_rtpoll_item_read && o->outq_rtpoll_item_write);
-
- pa_rtpoll_item_free(o->inq_rtpoll_item_read);
- o->inq_rtpoll_item_read = NULL;
+ if (o->inq_rtpoll_item_read) {
+ pa_rtpoll_item_free(o->inq_rtpoll_item_read);
+ o->inq_rtpoll_item_read = NULL;
+ }
- pa_rtpoll_item_free(o->outq_rtpoll_item_write);
- o->outq_rtpoll_item_write = NULL;
+ if (o->outq_rtpoll_item_write) {
+ pa_rtpoll_item_free(o->outq_rtpoll_item_write);
+ o->outq_rtpoll_item_write = NULL;
+ }
}
/* Called from main context */
@@ -491,20 +539,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
output_free(o);
}
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
- struct userdata *u;
-
- pa_sink_input_assert_ref(i);
- pa_assert_se(u = i->userdata);
-
- /* If we are added for the first time, ask for a rewinding so that
- * we are heard right-away. */
- if (PA_SINK_INPUT_IS_LINKED(state) &&
- i->thread_info.state == PA_SINK_INPUT_INIT)
- pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
-}
-
/* Called from thread context */
static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct output *o = PA_SINK_INPUT(obj)->userdata;
@@ -535,37 +569,6 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
}
/* Called from main context */
-static void disable_output(struct output *o) {
- pa_assert(o);
-
- if (!o->sink_input)
- return;
-
- pa_sink_input_unlink(o->sink_input);
- pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
- pa_sink_input_unref(o->sink_input);
- o->sink_input = NULL;
-}
-
-/* Called from main context */
-static void enable_output(struct output *o) {
- pa_assert(o);
-
- if (o->sink_input)
- return;
-
- if (output_create_sink_input(o) >= 0) {
-
- pa_memblockq_flush_write(o->memblockq);
-
- pa_sink_input_put(o->sink_input);
-
- if (o->userdata->sink && PA_SINK_IS_LINKED(pa_sink_get_state(o->userdata->sink)))
- pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
- }
-}
-
-/* Called from main context */
static void suspend(struct userdata *u) {
struct output *o;
uint32_t idx;
@@ -573,8 +576,8 @@ static void suspend(struct userdata *u) {
pa_assert(u);
/* Let's suspend by unlinking all streams */
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
- disable_output(o);
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
+ output_disable(o);
pa_log_info("Device suspended...");
}
@@ -587,13 +590,8 @@ static void unsuspend(struct userdata *u) {
pa_assert(u);
/* Let's resume */
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
-
- pa_sink_suspend(o->sink, FALSE, PA_SUSPEND_IDLE);
-
- if (PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
- enable_output(o);
- }
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
+ output_enable(o);
pa_log_info("Resumed successfully...");
}
@@ -637,7 +635,13 @@ static void update_max_request(struct userdata *u) {
size_t max_request = 0;
struct output *o;
- for (o = u->thread_info.active_outputs; o; o = o->next) {
+ pa_assert(u);
+ pa_sink_assert_io_context(u->sink);
+
+ /* Collects the max_request values of all streams and sets the
+ * largest one locally */
+
+ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
size_t mr = (size_t) pa_atomic_load(&o->max_request);
if (mr > max_request)
@@ -650,6 +654,67 @@ static void update_max_request(struct userdata *u) {
pa_sink_set_max_request_within_thread(u->sink, max_request);
}
+/* Called from IO context */
+static void update_fixed_latency(struct userdata *u) {
+ pa_usec_t fixed_latency = 0;
+ struct output *o;
+
+ pa_assert(u);
+ pa_sink_assert_io_context(u->sink);
+
+ /* Collects the requested_latency values of all streams and sets
+ * the largest one as fixed_latency locally */
+
+ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
+ pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency);
+
+ if (rl > fixed_latency)
+ fixed_latency = rl;
+ }
+
+ if (fixed_latency <= 0)
+ fixed_latency = u->block_usec;
+
+ pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency);
+}
+
+/* Called from thread context of the io thread */
+static void output_add_within_thread(struct output *o) {
+ pa_assert(o);
+ pa_sink_assert_io_context(o->sink);
+
+ PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
+
+ pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
+
+ o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+ o->userdata->rtpoll,
+ PA_RTPOLL_EARLY-1, /* This item is very important */
+ o->outq);
+ o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+ o->userdata->rtpoll,
+ PA_RTPOLL_EARLY,
+ o->inq);
+}
+
+/* Called from thread context of the io thread */
+static void output_remove_within_thread(struct output *o) {
+ pa_assert(o);
+ pa_sink_assert_io_context(o->sink);
+
+ PA_LLIST_REMOVE(struct output, o->userdata->thread_info.active_outputs, o);
+
+ if (o->outq_rtpoll_item_read) {
+ pa_rtpoll_item_free(o->outq_rtpoll_item_read);
+ o->outq_rtpoll_item_read = NULL;
+ }
+
+ if (o->inq_rtpoll_item_write) {
+ pa_rtpoll_item_free(o->inq_rtpoll_item_write);
+ o->inq_rtpoll_item_write = NULL;
+ }
+}
+
/* Called from thread context of the io thread */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
@@ -682,42 +747,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
return 0;
}
- case SINK_MESSAGE_ADD_OUTPUT: {
- struct output *op = data;
-
- PA_LLIST_PREPEND(struct output, u->thread_info.active_outputs, op);
-
- pa_assert(!op->outq_rtpoll_item_read && !op->inq_rtpoll_item_write);
-
- op->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
- u->rtpoll,
- PA_RTPOLL_EARLY-1, /* This item is very important */
- op->outq);
- op->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
- u->rtpoll,
- PA_RTPOLL_EARLY,
- op->inq);
-
+ case SINK_MESSAGE_ADD_OUTPUT:
+ output_add_within_thread(data);
update_max_request(u);
+ update_fixed_latency(u);
return 0;
- }
-
- case SINK_MESSAGE_REMOVE_OUTPUT: {
- struct output *op = data;
-
- PA_LLIST_REMOVE(struct output, u->thread_info.active_outputs, op);
-
- pa_assert(op->outq_rtpoll_item_read && op->inq_rtpoll_item_write);
-
- pa_rtpoll_item_free(op->outq_rtpoll_item_read);
- op->outq_rtpoll_item_read = NULL;
-
- pa_rtpoll_item_free(op->inq_rtpoll_item_write);
- op->inq_rtpoll_item_write = NULL;
+ case SINK_MESSAGE_REMOVE_OUTPUT:
+ output_remove_within_thread(data);
update_max_request(u);
+ update_fixed_latency(u);
return 0;
- }
case SINK_MESSAGE_NEED:
render_memblock(u, (struct output*) data, (size_t) offset);
@@ -739,10 +779,13 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
}
case SINK_MESSAGE_UPDATE_MAX_REQUEST:
-
update_max_request(u);
break;
- }
+
+ case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY:
+ update_fixed_latency(u);
+ break;
+}
return pa_sink_process_msg(o, code, data, offset, chunk);
}
@@ -765,7 +808,7 @@ static void update_description(struct userdata *u) {
t = pa_xstrdup("Simultaneous output to");
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+ PA_IDXSET_FOREACH(o, u->outputs, idx) {
char *e;
if (first) {
@@ -800,7 +843,7 @@ static int output_create_sink_input(struct output *o) {
data.module = o->userdata->module;
data.resample_method = o->userdata->resample_method;
- pa_sink_input_new(&o->sink_input, o->userdata->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE);
+ pa_sink_input_new(&o->sink_input, o->userdata->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE|PA_SINK_INPUT_NO_CREATE_ON_SUSPEND);
pa_sink_input_new_data_done(&data);
@@ -810,9 +853,9 @@ static int output_create_sink_input(struct output *o) {
o->sink_input->parent.process_msg = sink_input_process_msg;
o->sink_input->pop = sink_input_pop_cb;
o->sink_input->process_rewind = sink_input_process_rewind_cb;
- o->sink_input->state_change = sink_input_state_change_cb;
o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
o->sink_input->update_max_request = sink_input_update_max_request_cb;
+ o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
o->sink_input->attach = sink_input_attach_cb;
o->sink_input->detach = sink_input_detach_cb;
o->sink_input->kill = sink_input_kill_cb;
@@ -823,9 +866,9 @@ static int output_create_sink_input(struct output *o) {
return 0;
}
+/* Called from main context */
static struct output *output_new(struct userdata *u, pa_sink *sink) {
struct output *o;
- pa_sink_state_t state;
pa_assert(u);
pa_assert(sink);
@@ -845,84 +888,135 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
0,
0,
NULL);
- pa_atomic_store(&o->max_request, 0);
- PA_LLIST_INIT(struct output, o);
pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
+ update_description(u);
- state = pa_sink_get_state(u->sink);
-
- if (state != PA_SINK_INIT)
- pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
- else {
- /* If the sink is not yet started, we need to do the activation ourselves */
- PA_LLIST_PREPEND(struct output, u->thread_info.active_outputs, o);
-
- o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
- u->rtpoll,
- PA_RTPOLL_EARLY-1, /* This item is very important */
- o->outq);
- o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
- u->rtpoll,
- PA_RTPOLL_EARLY,
- o->inq);
- }
+ return o;
+}
- if (PA_SINK_IS_OPENED(state) || state == PA_SINK_INIT) {
- pa_sink_suspend(sink, FALSE, PA_SUSPEND_IDLE);
+/* Called from main context */
+static void output_free(struct output *o) {
+ pa_assert(o);
- if (PA_SINK_IS_OPENED(pa_sink_get_state(sink)))
- if (output_create_sink_input(o) < 0)
- goto fail;
- }
+ output_disable(o);
- update_description(u);
+ pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
+ update_description(o->userdata);
- return o;
+ if (o->inq_rtpoll_item_read)
+ pa_rtpoll_item_free(o->inq_rtpoll_item_read);
+ if (o->inq_rtpoll_item_write)
+ pa_rtpoll_item_free(o->inq_rtpoll_item_write);
-fail:
+ if (o->outq_rtpoll_item_read)
+ pa_rtpoll_item_free(o->outq_rtpoll_item_read);
+ if (o->outq_rtpoll_item_write)
+ pa_rtpoll_item_free(o->outq_rtpoll_item_write);
- if (o) {
- pa_idxset_remove_by_data(u->outputs, o, NULL);
+ if (o->inq)
+ pa_asyncmsgq_unref(o->inq);
- if (o->sink_input) {
- pa_sink_input_unlink(o->sink_input);
- pa_sink_input_unref(o->sink_input);
- }
+ if (o->outq)
+ pa_asyncmsgq_unref(o->outq);
+
+ if (o->memblockq)
+ pa_memblockq_free(o->memblockq);
- if (o->memblockq)
- pa_memblockq_free(o->memblockq);
+ pa_xfree(o);
+}
- if (o->inq)
- pa_asyncmsgq_unref(o->inq);
+/* Called from main context */
+static void output_enable(struct output *o) {
+ pa_assert(o);
+
+ if (o->sink_input)
+ return;
+
+ /* This might cause the sink to be resumed. The state change hook
+ * of the sink might hence be called from here, which might then
+ * cause us to be called in a loop. Make sure that state changes
+ * for this output don't cause this loop by setting a flag here */
+ o->ignore_state_change = TRUE;
+
+ if (output_create_sink_input(o) >= 0) {
- if (o->outq)
- pa_asyncmsgq_unref(o->outq);
+ if (pa_sink_get_state(o->sink) != PA_SINK_INIT) {
- pa_xfree(o);
+ /* First we register the output. That means that the sink
+ * will start to pass data to this output. */
+ pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
+
+ /* Then we enable the sink input. That means that the sink
+ * is now asked for new data. */
+ pa_sink_input_put(o->sink_input);
+
+ } else
+ /* Hmm the sink is not yet started, do things right here */
+ output_add_within_thread(o);
}
- return NULL;
+ o->ignore_state_change = FALSE;
}
+/* Called from main context */
+static void output_disable(struct output *o) {
+ pa_assert(o);
+
+ if (!o->sink_input)
+ return;
+
+ /* First we disable the sink input. That means that the sink is
+ * not asked for new data anymore */
+ pa_sink_input_unlink(o->sink_input);
+
+ /* Then we unregister the output. That means that the sink doesn't
+ * pass any further data to this output */
+ pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
+
+ /* Now dellocate the stream */
+ pa_sink_input_unref(o->sink_input);
+ o->sink_input = NULL;
+
+ /* Finally, drop all queued data */
+ pa_memblockq_flush_write(o->memblockq);
+ pa_asyncmsgq_flush(o->inq, FALSE);
+ pa_asyncmsgq_flush(o->outq, FALSE);
+}
+
+/* Called from main context */
+static void output_verify(struct output *o) {
+ pa_assert(o);
+
+ if (PA_SINK_IS_OPENED(pa_sink_get_state(o->userdata->sink)))
+ output_enable(o);
+ else
+ output_disable(o);
+}
+
+/* Called from main context */
static pa_bool_t is_suitable_sink(struct userdata *u, pa_sink *s) {
const char *t;
pa_sink_assert_ref(s);
+ if (s == u->sink)
+ return FALSE;
+
if (!(s->flags & PA_SINK_HARDWARE))
return FALSE;
- if (s == u->sink)
+ if (!(s->flags & PA_SINK_LATENCY))
return FALSE;
if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
- if (strcmp(t, "sound"))
+ if (!pa_streq(t, "sound"))
return FALSE;
return TRUE;
}
+/* Called from main context */
static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
struct output *o;
@@ -935,18 +1029,17 @@ static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata
return PA_HOOK_OK;
pa_log_info("Configuring new sink: %s", s->name);
-
if (!(o = output_new(u, s))) {
pa_log("Failed to create sink input on sink '%s'.", s->name);
return PA_HOOK_OK;
}
- if (o->sink_input)
- pa_sink_input_put(o->sink_input);
+ output_verify(o);
return PA_HOOK_OK;
}
+/* Called from main context */
static struct output* find_output(struct userdata *u, pa_sink *s) {
struct output *o;
uint32_t idx;
@@ -957,13 +1050,14 @@ static struct output* find_output(struct userdata *u, pa_sink *s) {
if (u->sink == s)
return NULL;
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
if (o->sink == s)
return o;
return NULL;
}
+/* Called from main context */
static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
struct output *o;
@@ -975,26 +1069,25 @@ static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userd
return PA_HOOK_OK;
pa_log_info("Unconfiguring sink: %s", s->name);
-
output_free(o);
return PA_HOOK_OK;
}
+/* Called from main context */
static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
struct output *o;
- pa_sink_state_t state;
if (!(o = find_output(u, s)))
return PA_HOOK_OK;
- state = pa_sink_get_state(s);
-
- if (PA_SINK_IS_OPENED(state) && PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)) && !o->sink_input)
- enable_output(o);
+ /* This state change might be triggered because we are creating a
+ * stream here, in that case we don't want to create it a second
+ * time here and enter a loop */
+ if (o->ignore_state_change)
+ return PA_HOOK_OK;
- if (state == PA_SINK_SUSPENDED && o->sink_input)
- disable_output(o);
+ output_verify(o);
return PA_HOOK_OK;
}
@@ -1139,7 +1232,7 @@ int pa__init(pa_module*m) {
/* We're in automatic mode, we add every sink that matches our needs */
- for (s = pa_idxset_first(m->core->sinks, &idx); s; s = pa_idxset_next(m->core->sinks, &idx)) {
+ PA_IDXSET_FOREACH(s, m->core->sinks, idx) {
if (!is_suitable_sink(u, s))
continue;
@@ -1164,9 +1257,8 @@ int pa__init(pa_module*m) {
/* Activate the sink and the sink inputs */
pa_sink_put(u->sink);
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
- if (o->sink_input)
- pa_sink_input_put(o->sink_input);
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
+ output_verify(o);
if (u->adjust_time > 0)
u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time * PA_USEC_PER_SEC, time_callback, u);
@@ -1185,37 +1277,6 @@ fail:
return -1;
}
-static void output_free(struct output *o) {
- pa_assert(o);
-
- disable_output(o);
-
- pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
-
- update_description(o->userdata);
-
- if (o->inq_rtpoll_item_read)
- pa_rtpoll_item_free(o->inq_rtpoll_item_read);
- if (o->inq_rtpoll_item_write)
- pa_rtpoll_item_free(o->inq_rtpoll_item_write);
-
- if (o->outq_rtpoll_item_read)
- pa_rtpoll_item_free(o->outq_rtpoll_item_read);
- if (o->outq_rtpoll_item_write)
- pa_rtpoll_item_free(o->outq_rtpoll_item_write);
-
- if (o->inq)
- pa_asyncmsgq_unref(o->inq);
-
- if (o->outq)
- pa_asyncmsgq_unref(o->outq);
-
- if (o->memblockq)
- pa_memblockq_free(o->memblockq);
-
- pa_xfree(o);
-}
-
void pa__done(pa_module*m) {
struct userdata *u;
struct output *o;