summaryrefslogtreecommitdiffstats
path: root/src/polypcore/protocol-native.c
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
committerLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
commit304449002cbc84fdcf235b5dfaec891278dd7085 (patch)
tree2a2d00e34d5c620835b76a0d6f7890a1d3e9fb97 /src/polypcore/protocol-native.c
parent0876b1ba82ea9c988df90ca98d202765ac697313 (diff)
1) Add flexible seeking support (including absolute) for memory block queues and playback streams
2) Add support to synchronize multiple playback streams 3) add two tests for 1) and 2) 4) s/PA_ERROR/PA_ERR/ 5) s/PA_ERROR_OK/PA_OK/ 6) update simple API to deal properly with new peek/drop recording API 7) add beginnings of proper validity checking on API calls in client libs (needs to be extended) 8) report playback buffer overflows/underflows to the client 9) move client side recording mcalign stuff into the memblockq 10) create typedefs for a bunch of API callback prototypes 11) simplify handling of HUP poll() events Yes, i know, it's usually better to commit a lot of small patches instead of a single big one. In this case however, this would have contradicted the other rule: never commit broken or incomplete stuff. *** This stuff needs a lot of additional testing! *** git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@511 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/polypcore/protocol-native.c')
-rw-r--r--src/polypcore/protocol-native.c388
1 files changed, 282 insertions, 106 deletions
diff --git a/src/polypcore/protocol-native.c b/src/polypcore/protocol-native.c
index 1362fdf2..aaa4fc48 100644
--- a/src/polypcore/protocol-native.c
+++ b/src/polypcore/protocol-native.c
@@ -48,6 +48,8 @@
#include <polypcore/authkey-prop.h>
#include <polypcore/strlist.h>
#include <polypcore/props.h>
+#include <polypcore/sample-util.h>
+#include <polypcore/llist.h>
#include "protocol-native.h"
@@ -77,6 +79,11 @@ struct playback_stream {
size_t requested_bytes;
int drain_request;
uint32_t drain_tag;
+ uint32_t syncid;
+ int underrun;
+
+ /* Sync group members */
+ PA_LLIST_FIELDS(struct playback_stream);
};
struct upload_stream {
@@ -153,7 +160,8 @@ static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t
static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_flush_or_trigger_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_flush_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_trigger_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -210,9 +218,9 @@ static const pa_pdispatch_callback command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
[PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
- [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_flush_or_trigger_playback_stream,
- [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_flush_or_trigger_playback_stream,
- [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_flush_or_trigger_playback_stream,
+ [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_flush_playback_stream,
+ [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
+ [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
[PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
[PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
@@ -244,7 +252,7 @@ static struct upload_stream* upload_stream_new(
struct upload_stream *s;
assert(c && ss && name && length);
- s = pa_xmalloc(sizeof(struct upload_stream));
+ s = pa_xnew(struct upload_stream, 1);
s->type = UPLOAD_STREAM;
s->connection = c;
s->sample_spec = *ss;
@@ -291,7 +299,7 @@ static struct record_stream* record_stream_new(
if (!(source_output = pa_source_output_new(source, __FILE__, name, ss, map, -1)))
return NULL;
- s = pa_xmalloc(sizeof(struct record_stream));
+ s = pa_xnew(struct record_stream, 1);
s->connection = c;
s->source_output = source_output;
s->source_output->push = source_output_push_cb;
@@ -301,7 +309,15 @@ static struct record_stream* record_stream_new(
s->source_output->owner = c->protocol->module;
s->source_output->client = c->client;
- s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0, c->protocol->core->memblock_stat);
+ s->memblockq = pa_memblockq_new(
+ 0,
+ maxlength,
+ 0,
+ base = pa_frame_size(ss),
+ 1,
+ 0,
+ NULL,
+ c->protocol->core->memblock_stat);
assert(s->memblockq);
s->fragment_size = (fragment_size/base)*base;
@@ -332,19 +348,40 @@ static struct playback_stream* playback_stream_new(
size_t tlength,
size_t prebuf,
size_t minreq,
- pa_cvolume *volume) {
+ pa_cvolume *volume,
+ uint32_t syncid) {
- struct playback_stream *s;
+ struct playback_stream *s, *sync;
pa_sink_input *sink_input;
+ pa_memblock *silence;
+ uint32_t idx;
+ int64_t start_index;
+
assert(c && sink && ss && name && maxlength);
+ /* Find syncid group */
+ for (sync = pa_idxset_first(c->output_streams, &idx); sync; sync = pa_idxset_next(c->output_streams, &idx)) {
+
+ if (sync->type != PLAYBACK_STREAM)
+ continue;
+
+ if (sync->syncid == syncid)
+ break;
+ }
+
+ /* Synced streams must connect to the same sink */
+ if (sync && sync->sink_input->sink != sink)
+ return NULL;
+
if (!(sink_input = pa_sink_input_new(sink, __FILE__, name, ss, map, 0, -1)))
return NULL;
- s = pa_xmalloc(sizeof(struct playback_stream));
+ s = pa_xnew(struct playback_stream, 1);
s->type = PLAYBACK_STREAM;
s->connection = c;
+ s->syncid = syncid;
s->sink_input = sink_input;
+ s->underrun = 1;
s->sink_input->peek = sink_input_peek_cb;
s->sink_input->drop = sink_input_drop_cb;
@@ -353,24 +390,56 @@ static struct playback_stream* playback_stream_new(
s->sink_input->userdata = s;
s->sink_input->owner = c->protocol->module;
s->sink_input->client = c->client;
-
- s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq, c->protocol->core->memblock_stat);
- assert(s->memblockq);
+ if (sync) {
+ /* Sync id found, now find head of list */
+ PA_LLIST_FIND_HEAD(struct playback_stream, sync, &sync);
+
+ /* Prepend ourselves */
+ PA_LLIST_PREPEND(struct playback_stream, sync, s);
+
+ /* Set our start index to the current read index of the other grozp member(s) */
+ assert(sync->next);
+ start_index = pa_memblockq_get_read_index(sync->next->memblockq);
+ } else {
+ /* This ia a new sync group */
+ PA_LLIST_INIT(struct playback_stream, s);
+ start_index = 0;
+ }
+
+ silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat);
+
+ s->memblockq = pa_memblockq_new(
+ start_index,
+ maxlength,
+ tlength,
+ pa_frame_size(ss),
+ prebuf,
+ minreq,
+ silence,
+ c->protocol->core->memblock_stat);
+
+ pa_memblock_unref(silence);
+
s->requested_bytes = 0;
s->drain_request = 0;
s->sink_input->volume = *volume;
pa_idxset_put(c->output_streams, s, &s->index);
+
return s;
}
static void playback_stream_free(struct playback_stream* p) {
+ struct playback_stream *head;
assert(p && p->connection);
if (p->drain_request)
- pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERR_NOENTITY);
+
+ PA_LLIST_FIND_HEAD(struct playback_stream, p, &head);
+ PA_LLIST_REMOVE(struct playback_stream, head, p);
pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
pa_sink_input_disconnect(p->sink_input);
@@ -436,7 +505,7 @@ static void request_bytes(struct playback_stream *s) {
pa_tagstruct_putu32(t, l);
pa_pstream_send_tagstruct(s->connection->pstream, t);
-/* pa_log(__FILE__": Requesting %u bytes\n", l); */
+/* pa_log(__FILE__": Requesting %u bytes\n", l); */
}
static void send_memblock(struct connection *c) {
@@ -461,7 +530,7 @@ static void send_memblock(struct connection *c) {
if (schunk.length > r->fragment_size)
schunk.length = r->fragment_size;
- pa_pstream_send_memblock(c->pstream, r->index, 0, &schunk);
+ pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
pa_memblock_unref(schunk.memblock);
@@ -501,9 +570,27 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
assert(i && i->userdata && chunk);
s = i->userdata;
- if (pa_memblockq_peek(s->memblockq, chunk) < 0)
+ if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) {
+ pa_tagstruct *t;
+
+ /* Report that we're empty */
+
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, s->index);
+ pa_pstream_send_tagstruct(s->connection->pstream, t);
+
+ s->underrun = 1;
+ }
+
+ if (pa_memblockq_peek(s->memblockq, chunk) < 0) {
+ pa_log(__FILE__": peek: failure\n");
return -1;
+ }
+/* pa_log(__FILE__": peek: %u\n", chunk->length); */
+
return 0;
}
@@ -513,6 +600,7 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_
s = i->userdata;
pa_memblockq_drop(s->memblockq, chunk, length);
+
request_bytes(s);
if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
@@ -520,7 +608,7 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_
s->drain_request = 0;
}
-/* pa_log(__FILE__": after_drop: %u\n", pa_memblockq_get_length(s->memblockq)); */
+/* pa_log(__FILE__": after_drop: %u %u\n", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */
}
static void sink_input_kill_cb(pa_sink_input *i) {
@@ -546,7 +634,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
assert(o && o->userdata && chunk);
s = o->userdata;
- pa_memblockq_push_align(s->memblockq, chunk, 0);
+ if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+ pa_log_warn(__FILE__": Failed to push data into output queue.\n");
+ return;
+ }
+
if (!pa_pstream_is_pending(s->connection->pstream))
send_memblock(s->connection);
}
@@ -578,7 +670,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
struct connection *c = userdata;
struct playback_stream *s;
size_t maxlength, tlength, prebuf, minreq;
- uint32_t sink_index;
+ uint32_t sink_index, syncid;
const char *name, *sink_name;
pa_sample_spec ss;
pa_channel_map map;
@@ -601,6 +693,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
PA_TAG_U32, &tlength,
PA_TAG_U32, &prebuf,
PA_TAG_U32, &minreq,
+ PA_TAG_U32, &syncid,
PA_TAG_CVOLUME, &volume,
PA_TAG_INVALID) < 0 ||
!pa_tagstruct_eof(t) ||
@@ -610,23 +703,23 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
- if (sink_index != (uint32_t) -1)
+ if (sink_index != PA_INVALID_INDEX)
sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
else
sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1);
if (!sink) {
- pa_log("%s: Can't find a suitable sink.\n", __FILE__);
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_log_warn(__FILE__": Can't find a suitable sink.\n");
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
- if (!(s = playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength, prebuf, minreq, &volume))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ if (!(s = playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength, prebuf, minreq, &volume, syncid))) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
@@ -656,14 +749,14 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
struct playback_stream *s;
if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -671,7 +764,7 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
} else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
struct record_stream *s;
if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -680,7 +773,7 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
struct upload_stream *s;
assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -717,7 +810,7 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -727,12 +820,12 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE, 1);
if (!source) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
if (!(s = record_stream_new(c, source, &ss, &map, name, maxlength, fragment_size))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
@@ -758,7 +851,7 @@ static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -782,7 +875,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
if (!c->authorized) {
if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
pa_log(__FILE__": Denied access to client with invalid authorization key.\n");
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -826,7 +919,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -842,7 +935,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
}
if (idx == PA_IDXSET_INVALID)
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
else {
pa_tagstruct *reply;
reply = pa_tagstruct_new(NULL, 0);
@@ -867,12 +960,12 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -881,10 +974,10 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
pa_memblockq_prebuf_disable(s->memblockq);
if (!pa_memblockq_is_readable(s->memblockq)) {
-/* pa_log("immediate drain: %u\n", pa_memblockq_get_length(s->memblockq)); */
+/* pa_log("immediate drain: %u\n", pa_memblockq_get_length(s->memblockq)); */
pa_pstream_send_simple_ack(c->pstream, tag);
} else {
-/* pa_log("slow drain triggered\n"); */
+/* pa_log("slow drain triggered\n"); */
s->drain_request = 1;
s->drain_tag = tag;
@@ -903,7 +996,7 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -937,12 +1030,12 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -980,12 +1073,12 @@ static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1026,17 +1119,17 @@ static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
if (!(s = upload_stream_new(c, &ss, &map, name, length))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
@@ -1063,12 +1156,12 @@ static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -1095,7 +1188,7 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1105,12 +1198,12 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1);
if (!sink) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
if (pa_scache_play_item(c->protocol->core, name, sink, &volume) < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1129,12 +1222,12 @@ static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (pa_scache_remove_item(c->protocol->core, name) < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1261,7 +1354,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1292,7 +1385,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
}
if (!sink && !source && !client && !module && !si && !so && !sce) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1331,7 +1424,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1394,7 +1487,7 @@ static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1444,7 +1537,7 @@ static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1478,7 +1571,7 @@ static void command_set_volume(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command,
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1493,7 +1586,7 @@ static void command_set_volume(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command,
}
if (!si && !sink) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1509,7 +1602,7 @@ static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
struct connection *c = userdata;
uint32_t idx;
int b;
- struct playback_stream *s;
+ struct playback_stream *s, *sync;
assert(c && t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
@@ -1520,20 +1613,82 @@ static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
+ fprintf(stderr, "Corking %i\n", b);
+
pa_sink_input_cork(s->sink_input, b);
+ pa_memblockq_prebuf_force(s->memblockq);
+
+ /* Do the same for all other members in the sync group */
+ for (sync = s->prev; sync; sync = sync->prev) {
+ pa_sink_input_cork(sync->sink_input, b);
+ pa_memblockq_prebuf_force(sync->memblockq);
+ }
+
+ for (sync = s->next; sync; sync = sync->next) {
+ pa_sink_input_cork(sync->sink_input, b);
+ pa_memblockq_prebuf_force(sync->memblockq);
+ }
+
+ pa_pstream_send_simple_ack(c->pstream, tag);
+}
+
+static void command_flush_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
+ uint32_t idx;
+ struct playback_stream *s, *sync;
+ assert(c && t);
+
+ if (pa_tagstruct_getu32(t, &idx) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ protocol_error(c);
+ return;
+ }
+
+ if (!c->authorized) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
+ return;
+ }
+
+ if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
+ return;
+ }
+
+ pa_memblockq_flush(s->memblockq);
+ s->underrun = 0;
+
+ /* Do the same for all other members in the sync group */
+ for (sync = s->prev; sync; sync = sync->prev) {
+ pa_memblockq_flush(sync->memblockq);
+ sync->underrun = 0;
+ }
+
+ for (sync = s->next; sync; sync = sync->next) {
+ pa_memblockq_flush(sync->memblockq);
+ sync->underrun = 0;
+ }
+
pa_pstream_send_simple_ack(c->pstream, tag);
+ pa_sink_notify(s->sink_input->sink);
+ request_bytes(s);
+
+ for (sync = s->prev; sync; sync = sync->prev)
+ request_bytes(sync);
+
+ for (sync = s->next; sync; sync = sync->next)
+ request_bytes(sync);
}
-static void command_flush_or_trigger_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
uint32_t idx;
struct playback_stream *s;
@@ -1546,23 +1701,26 @@ static void command_flush_or_trigger_playback_stream(PA_GCC_UNUSED pa_pdispatch
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
- if (command == PA_COMMAND_PREBUF_PLAYBACK_STREAM)
- pa_memblockq_prebuf_reenable(s->memblockq);
- else if (command == PA_COMMAND_TRIGGER_PLAYBACK_STREAM)
- pa_memblockq_prebuf_disable(s->memblockq);
- else {
- assert(command == PA_COMMAND_FLUSH_PLAYBACK_STREAM);
- pa_memblockq_flush(s->memblockq);
- /*pa_log(__FILE__": flush: %u\n", pa_memblockq_get_length(s->memblockq));*/
+ switch (command) {
+ case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
+ pa_memblockq_prebuf_force(s->memblockq);
+ break;
+
+ case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
+ pa_memblockq_prebuf_disable(s->memblockq);
+ break;
+
+ default:
+ abort();
}
pa_sink_notify(s->sink_input->sink);
@@ -1585,16 +1743,17 @@ static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
pa_source_output_cork(s->source_output, b);
+ pa_memblockq_prebuf_force(s->memblockq);
pa_pstream_send_simple_ack(c->pstream, tag);
}
@@ -1611,12 +1770,12 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1638,7 +1797,7 @@ static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, u
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1660,7 +1819,7 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1668,7 +1827,7 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
struct playback_stream *s;
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1678,7 +1837,7 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
struct record_stream *s;
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1700,7 +1859,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1708,7 +1867,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
pa_client *client;
if (!(client = pa_idxset_get_by_index(c->protocol->core->clients, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1717,7 +1876,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
pa_sink_input *s;
if (!(s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1728,7 +1887,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
if (!(s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1753,12 +1912,12 @@ static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(m = pa_module_load(c->protocol->core, name, argument))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INITFAILED);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
return;
}
@@ -1782,12 +1941,12 @@ static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(m = pa_idxset_get_by_index(c->protocol->core->modules, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1813,12 +1972,12 @@ static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED u
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (pa_autoload_add(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE, module, argument, &idx) < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -1847,7 +2006,7 @@ static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1857,7 +2016,7 @@ static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
r = pa_autoload_remove_by_index(c->protocol->core, idx);
if (r < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1893,7 +2052,7 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1904,7 +2063,7 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU
a = pa_autoload_get_by_index(c->protocol->core, idx);
if (!a) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1927,7 +2086,7 @@ static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1958,7 +2117,7 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *user
}
}
-static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
struct connection *c = userdata;
struct output_stream *stream;
assert(p && chunk && userdata);
@@ -1975,13 +2134,30 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, uint32_t
ps->requested_bytes = 0;
else
ps->requested_bytes -= chunk->length;
-
- pa_memblockq_push_align(ps->memblockq, chunk, delta);
- assert(ps->sink_input);
-/* pa_log(__FILE__": after_recv: %u\n", pa_memblockq_get_length(p->memblockq)); */
+ pa_memblockq_seek(ps->memblockq, offset, seek);
+
+ if (pa_memblockq_push_align(ps->memblockq, chunk) < 0) {
+ pa_tagstruct *t;
+
+ pa_log_warn(__FILE__": failed to push data into queue\n");
+
+ /* Pushing this block into the queue failed, so we simulate
+ * it by skipping ahead */
+
+ pa_memblockq_seek(ps->memblockq, chunk->length, PA_SEEK_RELATIVE);
+
+ /* Notify the user */
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, ps->index);
+ pa_pstream_send_tagstruct(p, t);
+ }
+
+ ps->underrun = 0;
+
pa_sink_notify(ps->sink_input->sink);
-/* pa_log(__FILE__": Recieved %u bytes.\n", chunk->length); */
} else {
struct upload_stream *u = (struct upload_stream*) stream;