summaryrefslogtreecommitdiffstats
path: root/src/modules/module-combine.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/module-combine.c')
-rw-r--r--src/modules/module-combine.c552
1 files changed, 333 insertions, 219 deletions
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index d50e59ae..582cbce1 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -92,6 +92,8 @@ struct output {
pa_sink *sink;
pa_sink_input *sink_input;
+ pa_bool_t ignore_state_change;
+
pa_asyncmsgq *inq, /* Message queue from the sink thread to this sink input */
*outq; /* Message queue from this sink input to the sink thread */
pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
@@ -99,9 +101,12 @@ struct output {
pa_memblockq *memblockq;
+ /* For communication of the stream latencies to the main thread */
pa_usec_t total_latency;
+ /* For coomunication of the stream parameters to the sink thread */
pa_atomic_t max_request;
+ pa_atomic_t requested_latency;
PA_LLIST_FIELDS(struct output);
};
@@ -119,13 +124,12 @@ struct userdata {
uint32_t adjust_time;
pa_bool_t automatic;
+ pa_bool_t auto_desc;
pa_hook_slot *sink_put_slot, *sink_unlink_slot, *sink_state_changed_slot;
pa_resample_method_t resample_method;
- struct timeval adjust_timestamp;
-
pa_usec_t block_usec;
pa_idxset* outputs; /* managed in main context */
@@ -145,13 +149,16 @@ enum {
SINK_MESSAGE_REMOVE_OUTPUT,
SINK_MESSAGE_NEED,
SINK_MESSAGE_UPDATE_LATENCY,
- SINK_MESSAGE_UPDATE_MAX_REQUEST
+ SINK_MESSAGE_UPDATE_MAX_REQUEST,
+ SINK_MESSAGE_UPDATE_REQUESTED_LATENCY
};
enum {
SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
};
+static void output_disable(struct output *o);
+static void output_enable(struct output *o);
static void output_free(struct output *o);
static int output_create_sink_input(struct output *o);
@@ -171,7 +178,7 @@ static void adjust_rates(struct userdata *u) {
if (!PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)))
return;
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+ PA_IDXSET_FOREACH(o, u->outputs, idx) {
pa_usec_t sink_latency;
if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
@@ -188,6 +195,11 @@ static void adjust_rates(struct userdata *u) {
avg_total_latency += o->total_latency;
n++;
+
+ pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC);
+
+ if (o->total_latency > 10*PA_USEC_PER_SEC)
+ pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
}
if (min_total_latency == (pa_usec_t) -1)
@@ -202,7 +214,7 @@ static void adjust_rates(struct userdata *u) {
base_rate = u->sink->sample_spec.rate;
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+ PA_IDXSET_FOREACH(o, u->outputs, idx) {
uint32_t r = base_rate;
if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
@@ -214,10 +226,10 @@ static void adjust_rates(struct userdata *u) {
r += (uint32_t) ((((double) (o->total_latency - target_latency))/(double)u->adjust_time)*(double)r/PA_USEC_PER_SEC);
if (r < (uint32_t) (base_rate*0.9) || r > (uint32_t) (base_rate*1.1)) {
- pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", pa_proplist_gets(o->sink_input->proplist, PA_PROP_MEDIA_NAME), base_rate, r);
+ pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->sink->name, base_rate, r);
pa_sink_input_set_rate(o->sink_input, base_rate);
} else {
- pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", pa_proplist_gets(o->sink_input->proplist, PA_PROP_MEDIA_NAME), r, (double) r / base_rate, (float) o->total_latency);
+ pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", o->sink_input->sink->name, r, (double) r / base_rate, (float) o->total_latency);
pa_sink_input_set_rate(o->sink_input, r);
}
}
@@ -354,18 +366,15 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
u->thread_info.counter += chunk.length;
/* OK, let's send this data to the other threads */
- for (j = u->thread_info.active_outputs; j; j = j->next)
-
- /* Send to other outputs, which are not the requesting
- * one */
+ PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
+ if (j == o)
+ continue;
- if (j != o)
- pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+ pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+ }
/* And place it directly into the requesting output's queue */
- if (o)
- pa_memblockq_push_align(o->memblockq, &chunk);
-
+ pa_memblockq_push_align(o->memblockq, &chunk);
pa_memblock_unref(chunk.memblock);
}
}
@@ -401,10 +410,18 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
/* If necessary, get some new data */
request_memblock(o, nbytes);
+ /* pa_log("%s q size is %u + %u (%u/%u)", */
+ /* i->sink->name, */
+ /* pa_memblockq_get_nblocks(o->memblockq), */
+ /* pa_memblockq_get_nblocks(i->thread_info.render_memblockq), */
+ /* pa_memblockq_get_maxrewind(o->memblockq), */
+ /* pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)); */
+
if (pa_memblockq_peek(o->memblockq, chunk) < 0)
return -1;
pa_memblockq_drop(o->memblockq, chunk->length);
+
return 0;
}
@@ -439,13 +456,35 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
return;
pa_atomic_store(&o->max_request, (int) nbytes);
-
pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
}
+/* Called from thread context */
+static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
+ struct output *o;
+ pa_usec_t c;
+
+ pa_assert(i);
+
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(o = i->userdata);
+
+ c = pa_sink_get_requested_latency_within_thread(i->sink);
+
+ if (c == (pa_usec_t) -1)
+ c = i->sink->thread_info.max_latency;
+
+ if (pa_atomic_load(&o->requested_latency) == (int) c)
+ return;
+
+ pa_atomic_store(&o->requested_latency, (int) c);
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
+}
+
/* Called from I/O thread context */
static void sink_input_attach_cb(pa_sink_input *i) {
struct output *o;
+ pa_usec_t c;
pa_sink_input_assert_ref(i);
pa_assert_se(o = i->userdata);
@@ -454,14 +493,24 @@ static void sink_input_attach_cb(pa_sink_input *i) {
pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
- i->sink->rtpoll,
+ i->sink->thread_info.rtpoll,
PA_RTPOLL_LATE, /* This one is not that important, since we check for data in _peek() anyway. */
o->inq);
o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
- i->sink->rtpoll,
+ i->sink->thread_info.rtpoll,
PA_RTPOLL_EARLY,
o->outq);
+
+ pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
+
+ pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i));
+
+ c = pa_sink_get_requested_latency_within_thread(i->sink);
+ pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c));
+
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
}
/* Called from I/O thread context */
@@ -471,14 +520,15 @@ static void sink_input_detach_cb(pa_sink_input *i) {
pa_sink_input_assert_ref(i);
pa_assert_se(o = i->userdata);
- /* Shut down the queue from the sink thread to us */
- pa_assert(o->inq_rtpoll_item_read && o->outq_rtpoll_item_write);
-
- pa_rtpoll_item_free(o->inq_rtpoll_item_read);
- o->inq_rtpoll_item_read = NULL;
+ if (o->inq_rtpoll_item_read) {
+ pa_rtpoll_item_free(o->inq_rtpoll_item_read);
+ o->inq_rtpoll_item_read = NULL;
+ }
- pa_rtpoll_item_free(o->outq_rtpoll_item_write);
- o->outq_rtpoll_item_write = NULL;
+ if (o->outq_rtpoll_item_write) {
+ pa_rtpoll_item_free(o->outq_rtpoll_item_write);
+ o->outq_rtpoll_item_write = NULL;
+ }
}
/* Called from main context */
@@ -492,20 +542,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
output_free(o);
}
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
- struct userdata *u;
-
- pa_sink_input_assert_ref(i);
- pa_assert_se(u = i->userdata);
-
- /* If we are added for the first time, ask for a rewinding so that
- * we are heard right-away. */
- if (PA_SINK_INPUT_IS_LINKED(state) &&
- i->thread_info.state == PA_SINK_INPUT_INIT)
- pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
-}
-
/* Called from thread context */
static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct output *o = PA_SINK_INPUT(obj)->userdata;
@@ -536,37 +572,6 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
}
/* Called from main context */
-static void disable_output(struct output *o) {
- pa_assert(o);
-
- if (!o->sink_input)
- return;
-
- pa_sink_input_unlink(o->sink_input);
- pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
- pa_sink_input_unref(o->sink_input);
- o->sink_input = NULL;
-}
-
-/* Called from main context */
-static void enable_output(struct output *o) {
- pa_assert(o);
-
- if (o->sink_input)
- return;
-
- if (output_create_sink_input(o) >= 0) {
-
- pa_memblockq_flush_write(o->memblockq);
-
- pa_sink_input_put(o->sink_input);
-
- if (o->userdata->sink && PA_SINK_IS_LINKED(pa_sink_get_state(o->userdata->sink)))
- pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
- }
-}
-
-/* Called from main context */
static void suspend(struct userdata *u) {
struct output *o;
uint32_t idx;
@@ -574,8 +579,8 @@ static void suspend(struct userdata *u) {
pa_assert(u);
/* Let's suspend by unlinking all streams */
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
- disable_output(o);
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
+ output_disable(o);
pa_log_info("Device suspended...");
}
@@ -588,13 +593,8 @@ static void unsuspend(struct userdata *u) {
pa_assert(u);
/* Let's resume */
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
-
- pa_sink_suspend(o->sink, FALSE, PA_SUSPEND_IDLE);
-
- if (PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
- enable_output(o);
- }
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
+ output_enable(o);
pa_log_info("Resumed successfully...");
}
@@ -638,7 +638,13 @@ static void update_max_request(struct userdata *u) {
size_t max_request = 0;
struct output *o;
- for (o = u->thread_info.active_outputs; o; o = o->next) {
+ pa_assert(u);
+ pa_sink_assert_io_context(u->sink);
+
+ /* Collects the max_request values of all streams and sets the
+ * largest one locally */
+
+ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
size_t mr = (size_t) pa_atomic_load(&o->max_request);
if (mr > max_request)
@@ -651,6 +657,67 @@ static void update_max_request(struct userdata *u) {
pa_sink_set_max_request_within_thread(u->sink, max_request);
}
+/* Called from IO context */
+static void update_fixed_latency(struct userdata *u) {
+ pa_usec_t fixed_latency = 0;
+ struct output *o;
+
+ pa_assert(u);
+ pa_sink_assert_io_context(u->sink);
+
+ /* Collects the requested_latency values of all streams and sets
+ * the largest one as fixed_latency locally */
+
+ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
+ pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency);
+
+ if (rl > fixed_latency)
+ fixed_latency = rl;
+ }
+
+ if (fixed_latency <= 0)
+ fixed_latency = u->block_usec;
+
+ pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency);
+}
+
+/* Called from thread context of the io thread */
+static void output_add_within_thread(struct output *o) {
+ pa_assert(o);
+ pa_sink_assert_io_context(o->sink);
+
+ PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
+
+ pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
+
+ o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+ o->userdata->rtpoll,
+ PA_RTPOLL_EARLY-1, /* This item is very important */
+ o->outq);
+ o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+ o->userdata->rtpoll,
+ PA_RTPOLL_EARLY,
+ o->inq);
+}
+
+/* Called from thread context of the io thread */
+static void output_remove_within_thread(struct output *o) {
+ pa_assert(o);
+ pa_sink_assert_io_context(o->sink);
+
+ PA_LLIST_REMOVE(struct output, o->userdata->thread_info.active_outputs, o);
+
+ if (o->outq_rtpoll_item_read) {
+ pa_rtpoll_item_free(o->outq_rtpoll_item_read);
+ o->outq_rtpoll_item_read = NULL;
+ }
+
+ if (o->inq_rtpoll_item_write) {
+ pa_rtpoll_item_free(o->inq_rtpoll_item_write);
+ o->inq_rtpoll_item_write = NULL;
+ }
+}
+
/* Called from thread context of the io thread */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
@@ -683,42 +750,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
return 0;
}
- case SINK_MESSAGE_ADD_OUTPUT: {
- struct output *op = data;
-
- PA_LLIST_PREPEND(struct output, u->thread_info.active_outputs, op);
-
- pa_assert(!op->outq_rtpoll_item_read && !op->inq_rtpoll_item_write);
-
- op->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
- u->rtpoll,
- PA_RTPOLL_EARLY-1, /* This item is very important */
- op->outq);
- op->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
- u->rtpoll,
- PA_RTPOLL_EARLY,
- op->inq);
-
+ case SINK_MESSAGE_ADD_OUTPUT:
+ output_add_within_thread(data);
update_max_request(u);
+ update_fixed_latency(u);
return 0;
- }
-
- case SINK_MESSAGE_REMOVE_OUTPUT: {
- struct output *op = data;
-
- PA_LLIST_REMOVE(struct output, u->thread_info.active_outputs, op);
-
- pa_assert(op->outq_rtpoll_item_read && op->inq_rtpoll_item_write);
-
- pa_rtpoll_item_free(op->outq_rtpoll_item_read);
- op->outq_rtpoll_item_read = NULL;
-
- pa_rtpoll_item_free(op->inq_rtpoll_item_write);
- op->inq_rtpoll_item_write = NULL;
+ case SINK_MESSAGE_REMOVE_OUTPUT:
+ output_remove_within_thread(data);
update_max_request(u);
+ update_fixed_latency(u);
return 0;
- }
case SINK_MESSAGE_NEED:
render_memblock(u, (struct output*) data, (size_t) offset);
@@ -740,10 +782,13 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
}
case SINK_MESSAGE_UPDATE_MAX_REQUEST:
-
update_max_request(u);
break;
- }
+
+ case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY:
+ update_fixed_latency(u);
+ break;
+}
return pa_sink_process_msg(o, code, data, offset, chunk);
}
@@ -756,6 +801,9 @@ static void update_description(struct userdata *u) {
pa_assert(u);
+ if (!u->auto_desc)
+ return;
+
if (pa_idxset_isempty(u->outputs)) {
pa_sink_set_description(u->sink, "Simultaneous output");
return;
@@ -763,7 +811,7 @@ static void update_description(struct userdata *u) {
t = pa_xstrdup("Simultaneous output to");
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+ PA_IDXSET_FOREACH(o, u->outputs, idx) {
char *e;
if (first) {
@@ -798,7 +846,7 @@ static int output_create_sink_input(struct output *o) {
data.module = o->userdata->module;
data.resample_method = o->userdata->resample_method;
- pa_sink_input_new(&o->sink_input, o->userdata->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE);
+ pa_sink_input_new(&o->sink_input, o->userdata->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE|PA_SINK_INPUT_NO_CREATE_ON_SUSPEND);
pa_sink_input_new_data_done(&data);
@@ -808,9 +856,9 @@ static int output_create_sink_input(struct output *o) {
o->sink_input->parent.process_msg = sink_input_process_msg;
o->sink_input->pop = sink_input_pop_cb;
o->sink_input->process_rewind = sink_input_process_rewind_cb;
- o->sink_input->state_change = sink_input_state_change_cb;
o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
o->sink_input->update_max_request = sink_input_update_max_request_cb;
+ o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
o->sink_input->attach = sink_input_attach_cb;
o->sink_input->detach = sink_input_detach_cb;
o->sink_input->kill = sink_input_kill_cb;
@@ -821,22 +869,19 @@ static int output_create_sink_input(struct output *o) {
return 0;
}
+/* Called from main context */
static struct output *output_new(struct userdata *u, pa_sink *sink) {
struct output *o;
- pa_sink_state_t state;
pa_assert(u);
pa_assert(sink);
pa_assert(u->sink);
- o = pa_xnew(struct output, 1);
+ o = pa_xnew0(struct output, 1);
o->userdata = u;
o->inq = pa_asyncmsgq_new(0);
o->outq = pa_asyncmsgq_new(0);
- o->inq_rtpoll_item_write = o->inq_rtpoll_item_read = NULL;
- o->outq_rtpoll_item_write = o->outq_rtpoll_item_read = NULL;
o->sink = sink;
- o->sink_input = NULL;
o->memblockq = pa_memblockq_new(
0,
MEMBLOCKQ_MAXLENGTH,
@@ -846,84 +891,135 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
0,
0,
NULL);
- pa_atomic_store(&o->max_request, 0);
- PA_LLIST_INIT(struct output, o);
pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
+ update_description(u);
- state = pa_sink_get_state(u->sink);
-
- if (state != PA_SINK_INIT)
- pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
- else {
- /* If the sink is not yet started, we need to do the activation ourselves */
- PA_LLIST_PREPEND(struct output, u->thread_info.active_outputs, o);
-
- o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
- u->rtpoll,
- PA_RTPOLL_EARLY-1, /* This item is very important */
- o->outq);
- o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
- u->rtpoll,
- PA_RTPOLL_EARLY,
- o->inq);
- }
+ return o;
+}
- if (PA_SINK_IS_OPENED(state) || state == PA_SINK_INIT) {
- pa_sink_suspend(sink, FALSE, PA_SUSPEND_IDLE);
+/* Called from main context */
+static void output_free(struct output *o) {
+ pa_assert(o);
- if (PA_SINK_IS_OPENED(pa_sink_get_state(sink)))
- if (output_create_sink_input(o) < 0)
- goto fail;
- }
+ output_disable(o);
- update_description(u);
+ pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
+ update_description(o->userdata);
- return o;
+ if (o->inq_rtpoll_item_read)
+ pa_rtpoll_item_free(o->inq_rtpoll_item_read);
+ if (o->inq_rtpoll_item_write)
+ pa_rtpoll_item_free(o->inq_rtpoll_item_write);
-fail:
+ if (o->outq_rtpoll_item_read)
+ pa_rtpoll_item_free(o->outq_rtpoll_item_read);
+ if (o->outq_rtpoll_item_write)
+ pa_rtpoll_item_free(o->outq_rtpoll_item_write);
- if (o) {
- pa_idxset_remove_by_data(u->outputs, o, NULL);
+ if (o->inq)
+ pa_asyncmsgq_unref(o->inq);
- if (o->sink_input) {
- pa_sink_input_unlink(o->sink_input);
- pa_sink_input_unref(o->sink_input);
- }
+ if (o->outq)
+ pa_asyncmsgq_unref(o->outq);
+
+ if (o->memblockq)
+ pa_memblockq_free(o->memblockq);
+
+ pa_xfree(o);
+}
+
+/* Called from main context */
+static void output_enable(struct output *o) {
+ pa_assert(o);
+
+ if (o->sink_input)
+ return;
+
+ /* This might cause the sink to be resumed. The state change hook
+ * of the sink might hence be called from here, which might then
+ * cause us to be called in a loop. Make sure that state changes
+ * for this output don't cause this loop by setting a flag here */
+ o->ignore_state_change = TRUE;
- if (o->memblockq)
- pa_memblockq_free(o->memblockq);
+ if (output_create_sink_input(o) >= 0) {
- if (o->inq)
- pa_asyncmsgq_unref(o->inq);
+ if (pa_sink_get_state(o->sink) != PA_SINK_INIT) {
- if (o->outq)
- pa_asyncmsgq_unref(o->outq);
+ /* First we register the output. That means that the sink
+ * will start to pass data to this output. */
+ pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
- pa_xfree(o);
+ /* Then we enable the sink input. That means that the sink
+ * is now asked for new data. */
+ pa_sink_input_put(o->sink_input);
+
+ } else
+ /* Hmm the sink is not yet started, do things right here */
+ output_add_within_thread(o);
}
- return NULL;
+ o->ignore_state_change = FALSE;
}
+/* Called from main context */
+static void output_disable(struct output *o) {
+ pa_assert(o);
+
+ if (!o->sink_input)
+ return;
+
+ /* First we disable the sink input. That means that the sink is
+ * not asked for new data anymore */
+ pa_sink_input_unlink(o->sink_input);
+
+ /* Then we unregister the output. That means that the sink doesn't
+ * pass any further data to this output */
+ pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
+
+ /* Now dellocate the stream */
+ pa_sink_input_unref(o->sink_input);
+ o->sink_input = NULL;
+
+ /* Finally, drop all queued data */
+ pa_memblockq_flush_write(o->memblockq);
+ pa_asyncmsgq_flush(o->inq, FALSE);
+ pa_asyncmsgq_flush(o->outq, FALSE);
+}
+
+/* Called from main context */
+static void output_verify(struct output *o) {
+ pa_assert(o);
+
+ if (PA_SINK_IS_OPENED(pa_sink_get_state(o->userdata->sink)))
+ output_enable(o);
+ else
+ output_disable(o);
+}
+
+/* Called from main context */
static pa_bool_t is_suitable_sink(struct userdata *u, pa_sink *s) {
const char *t;
pa_sink_assert_ref(s);
+ if (s == u->sink)
+ return FALSE;
+
if (!(s->flags & PA_SINK_HARDWARE))
return FALSE;
- if (s == u->sink)
+ if (!(s->flags & PA_SINK_LATENCY))
return FALSE;
if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
- if (strcmp(t, "sound"))
+ if (!pa_streq(t, "sound"))
return FALSE;
return TRUE;
}
+/* Called from main context */
static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
struct output *o;
@@ -936,18 +1032,17 @@ static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata
return PA_HOOK_OK;
pa_log_info("Configuring new sink: %s", s->name);
-
if (!(o = output_new(u, s))) {
pa_log("Failed to create sink input on sink '%s'.", s->name);
return PA_HOOK_OK;
}
- if (o->sink_input)
- pa_sink_input_put(o->sink_input);
+ output_verify(o);
return PA_HOOK_OK;
}
+/* Called from main context */
static struct output* find_output(struct userdata *u, pa_sink *s) {
struct output *o;
uint32_t idx;
@@ -958,13 +1053,14 @@ static struct output* find_output(struct userdata *u, pa_sink *s) {
if (u->sink == s)
return NULL;
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
if (o->sink == s)
return o;
return NULL;
}
+/* Called from main context */
static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
struct output *o;
@@ -976,26 +1072,25 @@ static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userd
return PA_HOOK_OK;
pa_log_info("Unconfiguring sink: %s", s->name);
-
output_free(o);
return PA_HOOK_OK;
}
+/* Called from main context */
static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
struct output *o;
- pa_sink_state_t state;
if (!(o = find_output(u, s)))
return PA_HOOK_OK;
- state = pa_sink_get_state(s);
-
- if (PA_SINK_IS_OPENED(state) && PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)) && !o->sink_input)
- enable_output(o);
+ /* This state change might be triggered because we are creating a
+ * stream here, in that case we don't want to create it a second
+ * time here and enter a loop */
+ if (o->ignore_state_change)
+ return PA_HOOK_OK;
- if (state == PA_SINK_SUSPENDED && o->sink_input)
- disable_output(o);
+ output_verify(o);
return PA_HOOK_OK;
}
@@ -1025,18 +1120,14 @@ int pa__init(pa_module*m) {
}
}
- m->userdata = u = pa_xnew(struct userdata, 1);
+ m->userdata = u = pa_xnew0(struct userdata, 1);
u->core = m->core;
u->module = m;
- u->sink = NULL;
- u->time_event = NULL;
u->adjust_time = DEFAULT_ADJUST_TIME;
u->rtpoll = pa_rtpoll_new();
pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
- u->thread = NULL;
u->resample_method = resample_method;
u->outputs = pa_idxset_new(NULL, NULL);
- memset(&u->adjust_timestamp, 0, sizeof(u->adjust_timestamp));
u->sink_put_slot = u->sink_unlink_slot = u->sink_state_changed_slot = NULL;
PA_LLIST_HEAD_INIT(struct output, u->thread_info.active_outputs);
pa_atomic_store(&u->thread_info.running, FALSE);
@@ -1061,6 +1152,55 @@ int pa__init(pa_module*m) {
ss = m->core->default_sample_spec;
map = m->core->default_channel_map;
+
+ /* Check the specified slave sinks for sample_spec and channel_map to use for the combined sink */
+ if (!u->automatic) {
+ const char*split_state = NULL;
+ char *n = NULL;
+ pa_sample_spec slaves_spec;
+ pa_channel_map slaves_map;
+ pa_bool_t is_first_slave = TRUE;
+
+ while ((n = pa_split(slaves, ",", &split_state))) {
+ pa_sink *slave_sink;
+
+ if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
+ pa_log("Invalid slave sink '%s'", n);
+ pa_xfree(n);
+ goto fail;
+ }
+
+ pa_xfree(n);
+
+ if (is_first_slave) {
+ slaves_spec = slave_sink->sample_spec;
+ slaves_map = slave_sink->channel_map;
+ is_first_slave = FALSE;
+ } else {
+ if (slaves_spec.format != slave_sink->sample_spec.format)
+ slaves_spec.format = PA_SAMPLE_INVALID;
+
+ if (slaves_spec.rate < slave_sink->sample_spec.rate)
+ slaves_spec.rate = slave_sink->sample_spec.rate;
+
+ if (!pa_channel_map_equal(&slaves_map, &slave_sink->channel_map))
+ slaves_spec.channels = 0;
+ }
+ }
+
+ if (!is_first_slave) {
+ if (slaves_spec.format != PA_SAMPLE_INVALID)
+ ss.format = slaves_spec.format;
+
+ ss.rate = slaves_spec.rate;
+
+ if (slaves_spec.channels > 0) {
+ map = slaves_map;
+ ss.channels = slaves_map.channels;
+ }
+ }
+ }
+
if ((pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0)) {
pa_log("Invalid sample specification.");
goto fail;
@@ -1073,7 +1213,6 @@ int pa__init(pa_module*m) {
pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
pa_sink_new_data_set_sample_spec(&data, &ss);
pa_sink_new_data_set_channel_map(&data, &map);
- pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "filter");
if (slaves)
@@ -1085,6 +1224,13 @@ int pa__init(pa_module*m) {
goto fail;
}
+ /* Check proplist for a description & fill in a default value if not */
+ u->auto_desc = FALSE;
+ if (NULL == pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION)) {
+ u->auto_desc = TRUE;
+ pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
+ }
+
u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
pa_sink_new_data_done(&data);
@@ -1138,7 +1284,7 @@ int pa__init(pa_module*m) {
/* We're in automatic mode, we add every sink that matches our needs */
- for (s = pa_idxset_first(m->core->sinks, &idx); s; s = pa_idxset_next(m->core->sinks, &idx)) {
+ PA_IDXSET_FOREACH(s, m->core->sinks, idx) {
if (!is_suitable_sink(u, s))
continue;
@@ -1163,9 +1309,8 @@ int pa__init(pa_module*m) {
/* Activate the sink and the sink inputs */
pa_sink_put(u->sink);
- for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
- if (o->sink_input)
- pa_sink_input_put(o->sink_input);
+ PA_IDXSET_FOREACH(o, u->outputs, idx)
+ output_verify(o);
if (u->adjust_time > 0)
u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time * PA_USEC_PER_SEC, time_callback, u);
@@ -1184,37 +1329,6 @@ fail:
return -1;
}
-static void output_free(struct output *o) {
- pa_assert(o);
-
- disable_output(o);
-
- pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
-
- update_description(o->userdata);
-
- if (o->inq_rtpoll_item_read)
- pa_rtpoll_item_free(o->inq_rtpoll_item_read);
- if (o->inq_rtpoll_item_write)
- pa_rtpoll_item_free(o->inq_rtpoll_item_write);
-
- if (o->outq_rtpoll_item_read)
- pa_rtpoll_item_free(o->outq_rtpoll_item_read);
- if (o->outq_rtpoll_item_write)
- pa_rtpoll_item_free(o->outq_rtpoll_item_write);
-
- if (o->inq)
- pa_asyncmsgq_unref(o->inq);
-
- if (o->outq)
- pa_asyncmsgq_unref(o->outq);
-
- if (o->memblockq)
- pa_memblockq_free(o->memblockq);
-
- pa_xfree(o);
-}
-
void pa__done(pa_module*m) {
struct userdata *u;
struct output *o;