summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2007-07-31 22:44:53 +0000
committerLennart Poettering <lennart@poettering.net>2007-07-31 22:44:53 +0000
commit0defdfb5607889c35fdefff4af31eb8b0ae0cbcf (patch)
tree8a8a93281aa32baa815185de5b2cb869f30376bf
parenta82505e72f6680258b8162b846c98c64bea45c37 (diff)
A lot of updates, all necessary to get the native protocol ported:
* add an int64_t argument to pa_asyncmsgq because it is very difficult to pass 64 values otherwise * simplify subclassing in pa_object * s/drop/unlink/ at some places * port the native protocol to the lock-free core (not tested, compiles fine) * move synchronisation of playback streams into pa_sink_input * add "start_corked" field to pa_sink_input_new_data * allow casting of NULL values in pa_object git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1562 fefdeb5f-60dc-0310-8127-8f9354f1896f
-rw-r--r--src/Makefile.am43
-rw-r--r--src/modules/module-alsa-sink.c13
-rw-r--r--src/modules/module-alsa-source.c13
-rw-r--r--src/modules/module-null-sink.c13
-rw-r--r--src/modules/module-oss.c21
-rw-r--r--src/modules/module-pipe-sink.c13
-rw-r--r--src/modules/module-pipe-source.c9
-rw-r--r--src/pulsecore/asyncmsgq.c20
-rw-r--r--src/pulsecore/asyncmsgq.h8
-rw-r--r--src/pulsecore/core.c11
-rw-r--r--src/pulsecore/msgobject.c13
-rw-r--r--src/pulsecore/msgobject.h8
-rw-r--r--src/pulsecore/native-common.h2
-rw-r--r--src/pulsecore/object.c13
-rw-r--r--src/pulsecore/object.h25
-rw-r--r--src/pulsecore/protocol-native.c1451
-rw-r--r--src/pulsecore/protocol-simple.c114
-rw-r--r--src/pulsecore/sink-input.c64
-rw-r--r--src/pulsecore/sink-input.h9
-rw-r--r--src/pulsecore/sink.c58
-rw-r--r--src/pulsecore/sink.h2
-rw-r--r--src/pulsecore/sound-file-stream.c8
-rw-r--r--src/pulsecore/source-output.c18
-rw-r--r--src/pulsecore/source-output.h4
-rw-r--r--src/pulsecore/source.c20
-rw-r--r--src/pulsecore/source.h2
-rw-r--r--src/tests/asyncmsgq-test.c10
27 files changed, 1222 insertions, 763 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 1afe6d6e..4083ea55 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -722,10 +722,10 @@ modlibexec_LTLIBRARIES = \
libauthkey-prop.la \
libstrlist.la \
libprotocol-simple.la \
- libprotocol-http.la
+ libprotocol-http.la \
+ libprotocol-native.la
# libprotocol-esound.la
-# libprotocol-native.la
# We need to emulate sendmsg/recvmsg to support this on Win32
if !OS_IS_WIN32
@@ -879,11 +879,10 @@ modlibexec_LTLIBRARIES += \
module-volume-restore.la \
module-rescue-streams.la \
module-http-protocol-tcp.la \
- module-sine.la
-
+ module-sine.la \
+ module-native-protocol-tcp.la \
+ module-native-protocol-fd.la
# module-esound-protocol-tcp.la \
-# module-native-protocol-tcp.la \
-# module-native-protocol-fd.la \
# module-combine.la \
# module-tunnel-sink.la \
# module-tunnel-source.la \
@@ -899,10 +898,10 @@ modlibexec_LTLIBRARIES += \
if HAVE_AF_UNIX
modlibexec_LTLIBRARIES += \
module-cli-protocol-unix.la \
- module-simple-protocol-unix.la
- module-http-protocol-unix.la
-# module-esound-protocol-unix.la \
-# module-native-protocol-unix.la
+ module-simple-protocol-unix.la \
+ module-http-protocol-unix.la \
+ module-native-protocol-unix.la
+# module-esound-protocol-unix.la
endif
if HAVE_MKFIFO
@@ -1083,20 +1082,20 @@ module_http_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-h
# Native protocol
-#module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-#module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-#module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
-#module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
+module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
+module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
-#module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-#module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-#module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
-#module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
+module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
+module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
-#module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
-#module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
-#module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
-#module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
+module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
+module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
+module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
+module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
# EsounD protocol
diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c
index 9ca881d1..551bad89 100644
--- a/src/modules/module-alsa-sink.c
+++ b/src/modules/module-alsa-sink.c
@@ -302,7 +302,7 @@ fail:
return -1;
}
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
switch (code) {
@@ -347,7 +347,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
break;
}
- return pa_sink_process_msg(o, code, data, chunk);
+ return pa_sink_process_msg(o, code, data, offset, chunk);
}
static int mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
@@ -510,12 +510,13 @@ static void thread_func(void *userdata) {
int code;
void *data;
pa_memchunk chunk;
+ int64_t offset;
int r;
/* pa_log("loop"); */
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
/* pa_log("processing msg"); */
@@ -525,7 +526,7 @@ static void thread_func(void *userdata) {
goto finish;
}
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
@@ -660,7 +661,7 @@ static void thread_func(void *userdata) {
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
@@ -893,7 +894,7 @@ void pa__done(pa_core *c, pa_module*m) {
pa_sink_disconnect(u->sink);
if (u->thread) {
- pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread);
}
diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c
index 59414d33..c2dad6f9 100644
--- a/src/modules/module-alsa-source.c
+++ b/src/modules/module-alsa-source.c
@@ -290,7 +290,7 @@ fail:
return -1;
}
-static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SOURCE(o)->userdata;
switch (code) {
@@ -335,7 +335,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk
break;
}
- return pa_source_process_msg(o, code, data, chunk);
+ return pa_source_process_msg(o, code, data, offset, chunk);
}
static int mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
@@ -498,12 +498,13 @@ static void thread_func(void *userdata) {
int code;
void *data;
int r;
+ int64_t offset;
pa_memchunk chunk;
/* pa_log("loop"); */
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
/* pa_log("processing msg"); */
@@ -513,7 +514,7 @@ static void thread_func(void *userdata) {
goto finish;
}
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
@@ -634,7 +635,7 @@ static void thread_func(void *userdata) {
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
@@ -864,7 +865,7 @@ void pa__done(pa_core *c, pa_module*m) {
pa_source_disconnect(u->source);
if (u->thread) {
- pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread);
}
diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c
index bb0a5045..f0e3a061 100644
--- a/src/modules/module-null-sink.c
+++ b/src/modules/module-null-sink.c
@@ -83,7 +83,7 @@ static const char* const valid_modargs[] = {
NULL
};
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
switch (code) {
@@ -107,7 +107,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
}
}
- return pa_sink_process_msg(o, code, data, chunk);
+ return pa_sink_process_msg(o, code, data, offset, chunk);
}
static void thread_func(void *userdata) {
@@ -131,9 +131,10 @@ static void thread_func(void *userdata) {
pa_memchunk chunk;
int r, timeout;
struct timeval now;
+ int64_t offset;
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
if (!object && code == PA_MESSAGE_SHUTDOWN) {
@@ -141,7 +142,7 @@ static void thread_func(void *userdata) {
goto finish;
}
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
@@ -190,7 +191,7 @@ static void thread_func(void *userdata) {
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
@@ -271,7 +272,7 @@ void pa__done(pa_core *c, pa_module*m) {
pa_sink_disconnect(u->sink);
if (u->thread) {
- pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread);
}
diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c
index 63f4d40e..a43bdb3c 100644
--- a/src/modules/module-oss.c
+++ b/src/modules/module-oss.c
@@ -581,7 +581,7 @@ fail:
return -1;
}
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
int do_trigger = 0, ret, quick = 1;
@@ -673,7 +673,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
break;
}
- ret = pa_sink_process_msg(o, code, data, chunk);
+ ret = pa_sink_process_msg(o, code, data, offset, chunk);
if (do_trigger)
trigger(u, quick);
@@ -681,7 +681,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
return ret;
}
-static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SOURCE(o)->userdata;
int do_trigger = 0, ret, quick = 1;
@@ -770,7 +770,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk
break;
}
- ret = pa_source_process_msg(o, code, data, chunk);
+ ret = pa_source_process_msg(o, code, data, offset, chunk);
if (do_trigger)
trigger(u, quick);
@@ -807,11 +807,12 @@ static void thread_func(void *userdata) {
void *data;
pa_memchunk chunk;
int r;
+ int64_t offset;
/* pa_log("loop"); */
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
/* pa_log("processing msg"); */
@@ -821,7 +822,7 @@ static void thread_func(void *userdata) {
goto finish;
}
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
@@ -1051,7 +1052,7 @@ static void thread_func(void *userdata) {
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
@@ -1300,9 +1301,9 @@ go_on:
/* Read mixer settings */
if (u->source)
- pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_VOLUME, &u->source->volume, NULL, NULL);
+ pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_VOLUME, &u->source->volume, 0, NULL, NULL);
if (u->sink)
- pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_VOLUME, &u->sink->volume, NULL, NULL);
+ pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_VOLUME, &u->sink->volume, 0, NULL, NULL);
return 0;
@@ -1335,7 +1336,7 @@ void pa__done(pa_core *c, pa_module*m) {
pa_source_disconnect(u->source);
if (u->thread) {
- pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread);
}
diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c
index db8b2e10..83ee06b7 100644
--- a/src/modules/module-pipe-sink.c
+++ b/src/modules/module-pipe-sink.c
@@ -84,7 +84,7 @@ static const char* const valid_modargs[] = {
NULL
};
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
switch (code) {
@@ -103,7 +103,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
}
}
- return pa_sink_process_msg(o, code, data, chunk);
+ return pa_sink_process_msg(o, code, data, offset, chunk);
}
static void thread_func(void *userdata) {
@@ -133,9 +133,10 @@ static void thread_func(void *userdata) {
void *data;
pa_memchunk chunk;
int r;
+ int64_t offset;
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
if (!object && code == PA_MESSAGE_SHUTDOWN) {
@@ -143,7 +144,7 @@ static void thread_func(void *userdata) {
goto finish;
}
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
@@ -224,7 +225,7 @@ static void thread_func(void *userdata) {
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
@@ -326,7 +327,7 @@ void pa__done(pa_core *c, pa_module*m) {
pa_sink_disconnect(u->sink);
if (u->thread) {
- pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread);
}
diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c
index 5dbb1e7b..a5f95f9a 100644
--- a/src/modules/module-pipe-source.c
+++ b/src/modules/module-pipe-source.c
@@ -111,9 +111,10 @@ static void thread_func(void *userdata) {
void *data;
pa_memchunk chunk;
int r;
+ int64_t offset;
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
if (!object && code == PA_MESSAGE_SHUTDOWN) {
@@ -121,7 +122,7 @@ static void thread_func(void *userdata) {
goto finish;
}
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
@@ -202,7 +203,7 @@ static void thread_func(void *userdata) {
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
@@ -303,7 +304,7 @@ void pa__done(pa_core *c, pa_module*m) {
pa_source_disconnect(u->source);
if (u->thread) {
- pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread);
}
diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c
index 1b6d8025..ed71d374 100644
--- a/src/pulsecore/asyncmsgq.c
+++ b/src/pulsecore/asyncmsgq.c
@@ -46,6 +46,7 @@ struct asyncmsgq_item {
pa_msgobject *object;
void *userdata;
pa_free_cb_t free_cb;
+ int64_t offset;
pa_memchunk memchunk;
pa_semaphore *semaphore;
int ret;
@@ -96,7 +97,7 @@ void pa_asyncmsgq_free(pa_asyncmsgq *a) {
pa_xfree(a);
}
-void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
+void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
struct asyncmsgq_item *i;
pa_assert(a);
@@ -107,6 +108,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
i->object = object ? pa_msgobject_ref(object) : NULL;
i->userdata = (void*) userdata;
i->free_cb = free_cb;
+ i->offset = offset;
if (chunk) {
pa_assert(chunk->memblock);
i->memchunk = *chunk;
@@ -121,7 +123,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
pa_mutex_unlock(a->mutex);
}
-int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk) {
+int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
struct asyncmsgq_item i;
pa_assert(a);
@@ -130,6 +132,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
i.userdata = (void*) userdata;
i.free_cb = NULL;
i.ret = -1;
+ i.offset = offset;
if (chunk) {
pa_assert(chunk->memblock);
i.memchunk = *chunk;
@@ -148,7 +151,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
return i.ret;
}
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
pa_assert(a);
pa_assert(code);
pa_assert(!a->current);
@@ -163,6 +166,8 @@ int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **u
*code = a->current->code;
if (userdata)
*userdata = a->current->userdata;
+ if (offset)
+ *offset = a->current->offset;
if (object) {
if ((*object = a->current->object))
pa_msgobject_assert_ref(*object);
@@ -207,13 +212,14 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
do {
pa_msgobject *o;
void *data;
+ int64_t offset;
pa_memchunk chunk;
int ret;
- if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0)
+ if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
return -1;
- ret = pa_asyncmsgq_dispatch(o, c, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
pa_asyncmsgq_done(a, ret);
} while (c != code);
@@ -239,10 +245,10 @@ void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
pa_asyncq_after_poll(a->asyncq);
}
-int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
+int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
if (object)
- return object->process_msg(object, code, userdata, memchunk);
+ return object->process_msg(object, code, userdata, offset, memchunk);
return 0;
}
diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h
index 17b37e4b..b0f1a6e4 100644
--- a/src/pulsecore/asyncmsgq.h
+++ b/src/pulsecore/asyncmsgq.h
@@ -57,11 +57,11 @@ typedef struct pa_asyncmsgq pa_asyncmsgq;
pa_asyncmsgq* pa_asyncmsgq_new(size_t size);
void pa_asyncmsgq_free(pa_asyncmsgq* q);
-void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
-int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *memchunk);
+void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
+int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk);
-int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, pa_memchunk *memchunk, int wait);
-int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk);
+int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, int wait);
+int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
diff --git a/src/pulsecore/core.c b/src/pulsecore/core.c
index a940bfc0..1a0e50bb 100644
--- a/src/pulsecore/core.c
+++ b/src/pulsecore/core.c
@@ -49,9 +49,9 @@
#include "core.h"
-static PA_DEFINE_CHECK_TYPE(pa_core, core_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_core, pa_msgobject);
-static int core_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+static int core_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_core *c = PA_CORE(o);
pa_core_assert_ref(c);
@@ -79,13 +79,14 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even
pa_msgobject *object;
int code;
void *data;
+ int64_t offset;
pa_memchunk chunk;
/* Check whether there is a message for us to process */
- while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
- ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
pa_asyncmsgq_done(c->asyncmsgq, ret);
}
@@ -116,7 +117,7 @@ pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
}
}
- c = pa_msgobject_new(pa_core, core_check_type);
+ c = pa_msgobject_new(pa_core);
c->parent.parent.free = core_free;
c->parent.process_msg = core_process_msg;
diff --git a/src/pulsecore/msgobject.c b/src/pulsecore/msgobject.c
index 6db630c5..f54e69f2 100644
--- a/src/pulsecore/msgobject.c
+++ b/src/pulsecore/msgobject.c
@@ -28,15 +28,22 @@
#include "msgobject.h"
-PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_msgobject_check_type, pa_object_check_type);
+PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_object);
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name)) {
pa_msgobject *o;
pa_assert(size > sizeof(pa_msgobject));
pa_assert(type_name);
- o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type ? check_type : pa_msgobject_check_type));
+ if (!check_type)
+ check_type = pa_msgobject_check_type;
+
+ pa_assert(check_type(type_name));
+ pa_assert(check_type("pa_object"));
+ pa_assert(check_type("pa_msgobject"));
+
+ o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type));
o->process_msg = NULL;
return o;
}
diff --git a/src/pulsecore/msgobject.h b/src/pulsecore/msgobject.h
index 65761aea..8221cc33 100644
--- a/src/pulsecore/msgobject.h
+++ b/src/pulsecore/msgobject.h
@@ -37,14 +37,14 @@ typedef struct pa_msgobject pa_msgobject;
struct pa_msgobject {
pa_object parent;
- int (*process_msg)(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+ int (*process_msg)(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
};
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name));
-int pa_msgobject_check_type(pa_object *o, const char *type);
+int pa_msgobject_check_type(const char *type);
-#define pa_msgobject_new(type, check_type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, check_type))
+#define pa_msgobject_new(type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, type##_check_type))
#define pa_msgobject_free ((void (*) (pa_msgobject* o)) pa_object_free)
#define PA_MSGOBJECT(o) pa_msgobject_cast(o)
diff --git a/src/pulsecore/native-common.h b/src/pulsecore/native-common.h
index f7a7da1d..d22c8d12 100644
--- a/src/pulsecore/native-common.h
+++ b/src/pulsecore/native-common.h
@@ -115,6 +115,8 @@ enum {
PA_COMMAND_MOVE_SINK_INPUT,
PA_COMMAND_MOVE_SOURCE_OUTPUT,
+ PA_COMMAND_SET_SINK_INPUT_MUTE,
+
PA_COMMAND_MAX
};
diff --git a/src/pulsecore/object.c b/src/pulsecore/object.c
index a983c5ae..23a45754 100644
--- a/src/pulsecore/object.c
+++ b/src/pulsecore/object.c
@@ -28,17 +28,23 @@
#include "object.h"
-pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name)) {
pa_object *o;
pa_assert(size > sizeof(pa_object));
pa_assert(type_name);
+ if (!check_type)
+ check_type = pa_object_check_type;
+
+ pa_assert(check_type(type_name));
+ pa_assert(check_type("pa_object"));
+
o = pa_xmalloc(size);
PA_REFCNT_INIT(o);
o->type_name = type_name;
o->free = pa_object_free;
- o->check_type = check_type ? check_type : pa_object_check_type;
+ o->check_type = check_type;
return o;
}
@@ -59,8 +65,7 @@ void pa_object_unref(pa_object *o) {
}
}
-int pa_object_check_type(pa_object *o, const char *type_name) {
- pa_assert(o);
+int pa_object_check_type(const char *type_name) {
pa_assert(type_name);
return type_name == "pa_object" || strcmp(type_name, "pa_object") == 0;
diff --git a/src/pulsecore/object.h b/src/pulsecore/object.h
index 270f289d..9c62f74a 100644
--- a/src/pulsecore/object.h
+++ b/src/pulsecore/object.h
@@ -38,20 +38,19 @@ struct pa_object {
PA_REFCNT_DECLARE;
const char *type_name;
void (*free)(pa_object *o);
- int (*check_type)(pa_object *o, const char *type_name);
+ int (*check_type)(const char *type_name);
};
-pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
-#define pa_object_new(type, check_type) ((type*) pa_object_new_internal(sizeof(type), #type, check_type)
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name));
+#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), #type, type##_check_type)
#define pa_object_free ((void (*) (pa_object* o)) pa_xfree)
-int pa_object_check_type(pa_object *o, const char *type);
+int pa_object_check_type(const char *type);
static inline int pa_object_isinstance(void *o) {
pa_object *obj = (pa_object*) o;
- pa_assert(obj);
- return obj->check_type(obj, "pa_object");
+ return obj ? obj->check_type("pa_object") : 0;
}
pa_object *pa_object_ref(pa_object *o);
@@ -63,19 +62,18 @@ static inline int pa_object_refcnt(pa_object *o) {
static inline pa_object* pa_object_cast(void *o) {
pa_object *obj = (pa_object*) o;
- pa_assert(obj->check_type(obj, "pa_object"));
+ pa_assert(!obj || obj->check_type("pa_object"));
return obj;
}
-#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o))
+#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o) > 0)
#define PA_OBJECT(o) pa_object_cast(o)
#define PA_DECLARE_CLASS(c) \
static inline int c##_isinstance(void *o) { \
pa_object *obj = (pa_object*) o; \
- pa_assert(obj); \
- return obj->check_type(obj, #c); \
+ return obj ? obj->check_type(#c) : 1; \
} \
static inline c* c##_cast(void *o) { \
pa_assert(c##_isinstance(o)); \
@@ -95,14 +93,13 @@ static inline pa_object* pa_object_cast(void *o) {
} \
struct __stupid_useless_struct_to_allow_trailing_semicolon
-#define PA_DEFINE_CHECK_TYPE(c, func, parent) \
- int func(pa_object *o, const char *type) { \
- pa_assert(o); \
+#define PA_DEFINE_CHECK_TYPE(c, parent) \
+ int c##_check_type(const char *type) { \
pa_assert(type); \
if (type == #c || \
strcmp(type, #c) == 0) \
return 1; \
- return parent(o, type); \
+ return parent##_check_type(type); \
} \
struct __stupid_useless_struct_to_allow_trailing_semicolon
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index 97345f00..3be5eae8 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -28,7 +28,6 @@
#include <string.h>
#include <stdio.h>
-#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
@@ -72,54 +71,57 @@
#define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
-struct connection;
+typedef struct connection connection;
struct pa_protocol_native;
-struct record_stream {
- struct connection *connection;
+typedef struct record_stream {
+ pa_msgobject parent;
+
+ connection *connection;
uint32_t index;
+
pa_source_output *source_output;
pa_memblockq *memblockq;
size_t fragment_size;
-};
+} record_stream;
+
+typedef struct output_stream {
+ pa_msgobject parent;
+} output_stream;
-struct playback_stream {
- int type;
- struct connection *connection;
+typedef struct playback_stream {
+ output_stream parent;
+
+ connection *connection;
uint32_t index;
+
pa_sink_input *sink_input;
pa_memblockq *memblockq;
- size_t requested_bytes;
int drain_request;
uint32_t drain_tag;
uint32_t syncid;
int underrun;
- /* Sync group members */
- PA_LLIST_FIELDS(struct playback_stream);
-};
+ pa_atomic_t missing;
+ size_t last_missing;
+} playback_stream;
-struct upload_stream {
- int type;
- struct connection *connection;
+typedef struct upload_stream {
+ output_stream parent;
+
+ connection *connection;
uint32_t index;
+
pa_memchunk memchunk;
size_t length;
char *name;
pa_sample_spec sample_spec;
pa_channel_map channel_map;
-};
-
-struct output_stream {
- int type;
-};
-
-enum {
- UPLOAD_STREAM,
- PLAYBACK_STREAM
-};
+} upload_stream;
struct connection {
+ pa_msgobject parent;
+
int authorized;
uint32_t version;
pa_protocol_native *protocol;
@@ -132,10 +134,31 @@ struct connection {
pa_time_event *auth_timeout_event;
};
+
+PA_DECLARE_CLASS(record_stream);
+#define RECORD_STREAM(o) (record_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
+
+PA_DECLARE_CLASS(output_stream);
+#define OUTPUT_STREAM(o) (output_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
+
+PA_DECLARE_CLASS(playback_stream);
+#define PLAYBACK_STREAM(o) (playback_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
+
+PA_DECLARE_CLASS(upload_stream);
+#define UPLOAD_STREAM(o) (upload_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
+
+PA_DECLARE_CLASS(connection);
+#define CONNECTION(o) (connection_cast(o))
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
+
struct pa_protocol_native {
pa_module *module;
- int public;
pa_core *core;
+ int public;
pa_socket_server *server;
pa_idxset *connections;
uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
@@ -146,17 +169,39 @@ struct pa_protocol_native {
pa_ip_acl *auth_ip_acl;
};
+enum {
+ SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
+ SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
+ SINK_INPUT_MESSAGE_FLUSH,
+ SINK_INPUT_MESSAGE_TRIGGER,
+ SINK_INPUT_MESSAGE_SEEK,
+ SINK_INPUT_MESSAGE_PREBUF_FORCE
+};
+
+enum {
+ PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
+ PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
+ PLAYBACK_STREAM_MESSAGE_OVERFLOW,
+ PLAYBACK_STREAM_MESSAGE_DRAIN_ACK
+};
+
+enum {
+ RECORD_STREAM_MESSAGE_POST_DATA /* data from source output to main loop */
+};
+
static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk);
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length);
+static void sink_input_drop_cb(pa_sink_input *i, size_t length);
static void sink_input_kill_cb(pa_sink_input *i);
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i);
+static void send_memblock(connection *c);
static void request_bytes(struct playback_stream*s);
static void source_output_kill_cb(pa_source_output *o);
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
+
static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -179,8 +224,7 @@ static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag,
static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_flush_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_trigger_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -239,12 +283,13 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
[PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
+ [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
[PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
[PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
- [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_flush_playback_stream,
- [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
- [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
+ [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
+ [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
+ [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
[PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
[PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
@@ -269,74 +314,145 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
/* structure management */
-static struct upload_stream* upload_stream_new(
- struct connection *c,
- const pa_sample_spec *ss,
- const pa_channel_map *map,
- const char *name, size_t length) {
+static void upload_stream_unlink(upload_stream *s) {
+ pa_assert(s);
+
+ if (!s->connection)
+ return;
+
+ pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
+ upload_stream_unref(s);
+ s->connection = NULL;
+}
+
+static void upload_stream_free(pa_object *o) {
+ upload_stream *s = UPLOAD_STREAM(o);
+ pa_assert(s);
- struct upload_stream *s;
- assert(c && ss && name && length);
+ upload_stream_unlink(s);
- s = pa_xnew(struct upload_stream, 1);
- s->type = UPLOAD_STREAM;
+ pa_xfree(s->name);
+
+ if (s->memchunk.memblock)
+ pa_memblock_unref(s->memchunk.memblock);
+
+ pa_xfree(s);
+}
+
+static upload_stream* upload_stream_new(
+ connection *c,
+ const pa_sample_spec *ss,
+ const pa_channel_map *map,
+ const char *name, size_t length) {
+
+ upload_stream *s;
+
+ pa_assert(c);
+ pa_assert(ss);
+ pa_assert(name);
+ pa_assert(length > 0);
+
+ s = pa_msgobject_new(upload_stream);
+ c->parent.parent.free = upload_stream_free;
s->connection = c;
s->sample_spec = *ss;
s->channel_map = *map;
s->name = pa_xstrdup(name);
-
- s->memchunk.memblock = NULL;
- s->memchunk.index = 0;
- s->memchunk.length = 0;
-
+ pa_memchunk_reset(&s->memchunk);
s->length = length;
pa_idxset_put(c->output_streams, s, &s->index);
+
return s;
}
-static void upload_stream_free(struct upload_stream *o) {
- assert(o && o->connection);
+static void record_stream_unlink(record_stream *s) {
+ pa_assert(s);
- pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
+ if (!s->connection)
+ return;
- pa_xfree(o->name);
+ if (s->source_output) {
+ pa_source_output_disconnect(s->source_output);
+ pa_source_output_unref(s->source_output);
+ s->source_output = NULL;
+ }
+
+ pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
+ record_stream_unref(s);
+ s->connection = NULL;
+}
+
+static void record_stream_free(pa_object *o) {
+ record_stream *s = RECORD_STREAM(o);
+ pa_assert(s);
+
+ record_stream_unlink(s);
+
+ pa_memblockq_free(s->memblockq);
+ pa_xfree(s);
+}
- if (o->memchunk.memblock)
- pa_memblock_unref(o->memchunk.memblock);
+static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
+ record_stream *s = RECORD_STREAM(o);
+ record_stream_assert_ref(s);
+
+ switch (code) {
+
+ case RECORD_STREAM_MESSAGE_POST_DATA:
+
+ if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+ pa_log_warn("Failed to push data into output queue.");
+ return -1;
+ }
+
+ if (!pa_pstream_is_pending(s->connection->pstream))
+ send_memblock(s->connection);
+
+ pa_pstream_send_memblock(s->connection->pstream, s->index, 0, PA_SEEK_RELATIVE, chunk);
+ break;
+ }
- pa_xfree(o);
+ return 0;
}
-static struct record_stream* record_stream_new(
- struct connection *c,
- pa_source *source,
- const pa_sample_spec *ss,
- const pa_channel_map *map,
- const char *name,
- size_t maxlength,
- size_t fragment_size) {
+static record_stream* record_stream_new(
+ connection *c,
+ pa_source *source,
+ const pa_sample_spec *ss,
+ const pa_channel_map *map,
+ const char *name,
+ size_t *maxlength,
+ size_t fragment_size,
+ int corked) {
- struct record_stream *s;
+ record_stream *s;
pa_source_output *source_output;
size_t base;
pa_source_output_new_data data;
- assert(c && ss && name && maxlength);
+ pa_assert(c);
+ pa_assert(ss);
+ pa_assert(name);
+ pa_assert(maxlength);
+ pa_assert(*maxlength > 0);
pa_source_output_new_data_init(&data);
+ data.module = c->protocol->module;
+ data.client = c->client;
data.source = source;
data.driver = __FILE__;
data.name = name;
+ data.corked = corked;
pa_source_output_new_data_set_sample_spec(&data, ss);
pa_source_output_new_data_set_channel_map(&data, map);
- data.module = c->protocol->module;
- data.client = c->client;
if (!(source_output = pa_source_output_new(c->protocol->core, &data, 0)))
return NULL;
- s = pa_xnew(struct record_stream, 1);
+ s = pa_msgobject_new(record_stream);
+ c->parent.parent.free = record_stream_free;
+ c->parent.process_msg = record_stream_process_msg;
s->connection = c;
s->source_output = source_output;
s->source_output->push = source_output_push_cb;
@@ -346,58 +462,143 @@ static struct record_stream* record_stream_new(
s->memblockq = pa_memblockq_new(
0,
- maxlength,
+ *maxlength,
0,
base = pa_frame_size(ss),
1,
0,
NULL);
- assert(s->memblockq);
s->fragment_size = (fragment_size/base)*base;
- if (!s->fragment_size)
+ if (s->fragment_size <= 0)
s->fragment_size = base;
+ *maxlength = pa_memblockq_get_maxlength(s->memblockq);
pa_idxset_put(c->record_streams, s, &s->index);
+
+ pa_source_output_put(s->source_output);
return s;
}
-static void record_stream_free(struct record_stream* r) {
- assert(r && r->connection);
+static void playback_stream_unlink(playback_stream *s) {
+ pa_assert(s);
+
+ if (!s->connection)
+ return;
- pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
- pa_source_output_disconnect(r->source_output);
- pa_source_output_unref(r->source_output);
- pa_memblockq_free(r->memblockq);
- pa_xfree(r);
+ if (s->sink_input) {
+ pa_sink_input_disconnect(s->sink_input);
+ pa_sink_input_unref(s->sink_input);
+ s->sink_input = NULL;
+ }
+
+ if (s->drain_request)
+ pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
+
+ pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
+ playback_stream_unref(s);
+ s->connection = NULL;
+}
+
+static void playback_stream_free(pa_object* o) {
+ playback_stream *s = PLAYBACK_STREAM(o);
+ pa_assert(s);
+
+ playback_stream_unlink(s);
+
+ pa_memblockq_free(s->memblockq);
+ pa_xfree(s);
}
-static struct playback_stream* playback_stream_new(
- struct connection *c,
+static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
+ playback_stream *s = PLAYBACK_STREAM(o);
+ playback_stream_assert_ref(s);
+
+ switch (code) {
+ case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
+ pa_tagstruct *t;
+ int32_t l;
+
+ if ((l = pa_atomic_load(&s->missing)) <= 0)
+ break;
+
+ pa_assert_se(pa_atomic_sub(&s->missing, l) >= l);
+
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, s->index);
+ pa_tagstruct_putu32(t, l);
+ pa_pstream_send_tagstruct(s->connection->pstream, t);
+
+ /* pa_log("Requesting %u bytes", l); */
+ break;
+ }
+
+ case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
+ pa_tagstruct *t;
+
+ /* Report that we're empty */
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, s->index);
+ pa_pstream_send_tagstruct(s->connection->pstream, t);
+ break;
+ }
+
+ case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
+ pa_tagstruct *t;
+
+ /* Notify the user we're overflowed*/
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, s->index);
+ pa_pstream_send_tagstruct(s->connection->pstream, t);
+ break;
+ }
+
+ case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
+ pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
+ break;
+
+ }
+
+ return 0;
+}
+
+static playback_stream* playback_stream_new(
+ connection *c,
pa_sink *sink,
const pa_sample_spec *ss,
const pa_channel_map *map,
const char *name,
- size_t maxlength,
- size_t tlength,
- size_t prebuf,
- size_t minreq,
+ size_t *maxlength,
+ size_t *tlength,
+ size_t *prebuf,
+ size_t *minreq,
pa_cvolume *volume,
- uint32_t syncid) {
+ uint32_t syncid,
+ int corked,
+ size_t *missing) {
- struct playback_stream *s, *ssync;
+ playback_stream *s, *ssync;
pa_sink_input *sink_input;
pa_memblock *silence;
uint32_t idx;
int64_t start_index;
pa_sink_input_new_data data;
- assert(c && ss && name && maxlength);
+ pa_assert(c);
+ pa_assert(ss);
+ pa_assert(name);
+ pa_assert(maxlength);
/* Find syncid group */
for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
- if (ssync->type != PLAYBACK_STREAM)
+ if (!playback_stream_isinstance(ssync))
continue;
if (ssync->syncid == syncid)
@@ -405,8 +606,13 @@ static struct playback_stream* playback_stream_new(
}
/* Synced streams must connect to the same sink */
- if (ssync)
- sink = ssync->sink_input->sink;
+ if (ssync) {
+
+ if (!sink)
+ sink = ssync->sink_input->sink;
+ else if (sink != ssync->sink_input->sink)
+ return NULL;
+ }
pa_sink_input_new_data_init(&data);
data.sink = sink;
@@ -417,146 +623,136 @@ static struct playback_stream* playback_stream_new(
pa_sink_input_new_data_set_volume(&data, volume);
data.module = c->protocol->module;
data.client = c->client;
+ data.start_corked = corked;
+ data.sync_base = ssync ? ssync->sink_input : NULL;
if (!(sink_input = pa_sink_input_new(c->protocol->core, &data, 0)))
return NULL;
- s = pa_xnew(struct playback_stream, 1);
- s->type = PLAYBACK_STREAM;
+ s = pa_msgobject_new(playback_stream);
+ c->parent.parent.free = playback_stream_free;
+ c->parent.process_msg = playback_stream_process_msg;
s->connection = c;
s->syncid = syncid;
s->sink_input = sink_input;
s->underrun = 1;
+ s->sink_input->parent.process_msg = sink_input_process_msg;
s->sink_input->peek = sink_input_peek_cb;
s->sink_input->drop = sink_input_drop_cb;
s->sink_input->kill = sink_input_kill_cb;
- s->sink_input->get_latency = sink_input_get_latency_cb;
s->sink_input->userdata = s;
- if (ssync) {
- /* Sync id found, now find head of list */
- PA_LLIST_FIND_HEAD(struct playback_stream, ssync, &ssync);
-
- /* Prepend ourselves */
- PA_LLIST_PREPEND(struct playback_stream, ssync, s);
-
- /* Set our start index to the current read index of the other grozp member(s) */
- assert(ssync->next);
- start_index = pa_memblockq_get_read_index(ssync->next->memblockq);
- } else {
- /* This ia a new sync group */
- PA_LLIST_INIT(struct playback_stream, s);
- start_index = 0;
- }
+ start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
s->memblockq = pa_memblockq_new(
start_index,
- maxlength,
- tlength,
+ *maxlength,
+ *tlength,
pa_frame_size(ss),
- prebuf,
- minreq,
+ *prebuf,
+ *minreq,
silence);
pa_memblock_unref(silence);
- s->requested_bytes = 0;
+ *maxlength = pa_memblockq_get_maxlength(s->memblockq);
+ *tlength = pa_memblockq_get_tlength(s->memblockq);
+ *prebuf = pa_memblockq_get_prebuf(s->memblockq);
+ *minreq = pa_memblockq_get_minreq(s->memblockq);
+ *missing = pa_memblockq_missing(s->memblockq);
+
+ pa_atomic_store(&s->missing, 0);
+ s->last_missing = *missing;
s->drain_request = 0;
pa_idxset_put(c->output_streams, s, &s->index);
+ pa_sink_input_put(s->sink_input);
+
return s;
}
-static void playback_stream_free(struct playback_stream* p) {
- struct playback_stream *head;
- assert(p && p->connection);
+static void connection_unlink(connection *c) {
+ record_stream *r;
+ output_stream *o;
- if (p->drain_request)
- pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERR_NOENTITY);
+ pa_assert(c);
- PA_LLIST_FIND_HEAD(struct playback_stream, p, &head);
- PA_LLIST_REMOVE(struct playback_stream, head, p);
-
- pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
- pa_sink_input_disconnect(p->sink_input);
- pa_sink_input_unref(p->sink_input);
- pa_memblockq_free(p->memblockq);
- pa_xfree(p);
-}
-
-static void connection_free(struct connection *c) {
- struct record_stream *r;
- struct output_stream *o;
- assert(c && c->protocol);
+ if (!c->protocol)
+ return;
- pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
while ((r = pa_idxset_first(c->record_streams, NULL)))
- record_stream_free(r);
- pa_idxset_free(c->record_streams, NULL, NULL);
+ record_stream_unlink(r);
while ((o = pa_idxset_first(c->output_streams, NULL)))
- if (o->type == PLAYBACK_STREAM)
- playback_stream_free((struct playback_stream*) o);
+ if (playback_stream_isinstance(o))
+ playback_stream_unlink(PLAYBACK_STREAM(o));
else
- upload_stream_free((struct upload_stream*) o);
- pa_idxset_free(c->output_streams, NULL, NULL);
-
- pa_pdispatch_unref(c->pdispatch);
- pa_pstream_close(c->pstream);
- pa_pstream_unref(c->pstream);
- pa_client_free(c->client);
+ upload_stream_unlink(UPLOAD_STREAM(o));
if (c->subscription)
pa_subscription_free(c->subscription);
- if (c->auth_timeout_event)
+ if (c->pstream)
+ pa_pstream_close(c->pstream);
+
+ if (c->auth_timeout_event) {
c->protocol->core->mainloop->time_free(c->auth_timeout_event);
+ c->auth_timeout_event = NULL;
+ }
+
+ pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
+ connection_unref(c);
+ c->protocol = NULL;
+}
+
+static void connection_free(pa_object *o) {
+ connection *c = CONNECTION(o);
+
+ pa_assert(c);
+
+ connection_unlink(c);
+
+ pa_idxset_free(c->record_streams, NULL, NULL);
+ pa_idxset_free(c->output_streams, NULL, NULL);
+
+ pa_pdispatch_unref(c->pdispatch);
+ pa_pstream_unref(c->pstream);
+ pa_client_free(c->client);
pa_xfree(c);
}
-static void request_bytes(struct playback_stream *s) {
- pa_tagstruct *t;
- size_t l;
- assert(s);
+static void request_bytes(playback_stream *s) {
+ size_t new_missing, delta, previous_missing;
- if (!(l = pa_memblockq_missing(s->memblockq)))
- return;
-
- if (l <= s->requested_bytes)
- return;
+ playback_stream_assert_ref(s);
- l -= s->requested_bytes;
+ new_missing = pa_memblockq_missing(s->memblockq);
- if (l < pa_memblockq_get_minreq(s->memblockq))
+ if (new_missing <= s->last_missing)
return;
- s->requested_bytes += l;
-
- t = pa_tagstruct_new(NULL, 0);
- assert(t);
- pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
- pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
- pa_tagstruct_putu32(t, s->index);
- pa_tagstruct_putu32(t, l);
- pa_pstream_send_tagstruct(s->connection->pstream, t);
+ delta = new_missing - s->last_missing;
+ s->last_missing = new_missing;
-/* pa_log("Requesting %u bytes", l); */
+ previous_missing = pa_atomic_add(&s->missing, delta);
+ if (previous_missing < pa_memblockq_get_minreq(s->memblockq) && previous_missing+delta >= pa_memblockq_get_minreq(s->memblockq))
+ pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}
-static void send_memblock(struct connection *c) {
+static void send_memblock(connection *c) {
uint32_t start;
- struct record_stream *r;
+ record_stream *r;
start = PA_IDXSET_INVALID;
for (;;) {
pa_memchunk chunk;
- if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
+ if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
return;
if (start == PA_IDXSET_INVALID)
@@ -571,7 +767,8 @@ static void send_memblock(struct connection *c) {
schunk.length = r->fragment_size;
pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
- pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
+
+ pa_memblockq_drop(r->memblockq, schunk.length);
pa_memblock_unref(schunk.memblock);
return;
@@ -579,9 +776,9 @@ static void send_memblock(struct connection *c) {
}
}
-static void send_playback_stream_killed(struct playback_stream *p) {
+static void send_playback_stream_killed(playback_stream *p) {
pa_tagstruct *t;
- assert(p);
+ playback_stream_assert_ref(p);
t = pa_tagstruct_new(NULL, 0);
pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
@@ -590,9 +787,9 @@ static void send_playback_stream_killed(struct playback_stream *p) {
pa_pstream_send_tagstruct(p->connection->pstream, t);
}
-static void send_record_stream_killed(struct record_stream *r) {
+static void send_record_stream_killed(record_stream *r) {
pa_tagstruct *t;
- assert(r);
+ record_stream_assert_ref(r);
t = pa_tagstruct_new(NULL, 0);
pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
@@ -603,22 +800,123 @@ static void send_record_stream_killed(struct record_stream *r) {
/*** sinkinput callbacks ***/
-static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
- struct playback_stream *s;
- assert(i && i->userdata && chunk);
- s = i->userdata;
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
+ pa_sink_input *i = PA_SINK_INPUT(o);
+ playback_stream *s;
- if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) {
- pa_tagstruct *t;
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
+
+ switch (code) {
+
+ case SINK_INPUT_MESSAGE_SEEK:
+ pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
+ return 0;
+
+ case SINK_INPUT_MESSAGE_POST_DATA: {
+ pa_assert(chunk);
+
+ if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+
+ pa_log_warn("Failed to push data into queue");
+ pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
+ pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE);
+ }
+
+ s->underrun = 0;
+ return 0;
+ }
+
+ case SINK_INPUT_MESSAGE_DRAIN: {
+
+ pa_memblockq_prebuf_disable(s->memblockq);
+
+ if (!pa_memblockq_is_readable(s->memblockq))
+ pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
+ else {
+ s->drain_tag = PA_PTR_TO_UINT(userdata);
+ s->drain_request = 1;
+ }
+
+ return 0;
+ }
+
+ case SINK_INPUT_MESSAGE_FLUSH:
+ case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+ case SINK_INPUT_MESSAGE_TRIGGER: {
+
+ pa_sink_input *isync;
+ void (*func)(pa_memblockq *bq);
+
+ switch (code) {
+ case SINK_INPUT_MESSAGE_FLUSH:
+ func = pa_memblockq_flush;
+ break;
+
+ case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+ func = pa_memblockq_prebuf_force;
+ break;
+
+ case SINK_INPUT_MESSAGE_TRIGGER:
+ func = pa_memblockq_prebuf_disable;
+ break;
+
+ default:
+ pa_assert_not_reached();
+ }
+
+ func(s->memblockq);
+ s->underrun = 0;
+ request_bytes(s);
+
+ /* Do the same for all other members in the sync group */
+ for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
+ playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
+ func(ssync->memblockq);
+ ssync->underrun = 0;
+ request_bytes(ssync);
+ }
+
+ for (isync = i->sync_next; isync; isync = isync->sync_next) {
+ playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
+ func(ssync->memblockq);
+ ssync->underrun = 0;
+ request_bytes(ssync);
+ }
+
+ return 0;
+ }
+
+ case PA_SINK_INPUT_MESSAGE_SET_STATE:
+
+ pa_memblockq_prebuf_force(s->memblockq);
+ break;
+
+ case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
+ pa_usec_t *r = userdata;
+
+ *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
+
+ /* Fall through, the default handler will add in the extra
+ * latency added by the resampler */
+ break;
+ }
+ }
- /* Report that we're empty */
+ return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
+}
+
+static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
+ playback_stream *s;
- t = pa_tagstruct_new(NULL, 0);
- pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
- pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
- pa_tagstruct_putu32(t, s->index);
- pa_pstream_send_tagstruct(s->connection->pstream, t);
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
+ pa_assert(chunk);
+ if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) {
+ pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
s->underrun = 1;
}
@@ -632,65 +930,67 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
return 0;
}
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
- struct playback_stream *s;
- assert(i && i->userdata && length);
- s = i->userdata;
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
+ playback_stream *s;
- pa_memblockq_drop(s->memblockq, chunk, length);
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
+ pa_assert(length > 0);
- request_bytes(s);
+ pa_memblockq_drop(s->memblockq, length);
if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
- pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
s->drain_request = 0;
+ pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
}
+ request_bytes(s);
+
/* pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */
}
static void sink_input_kill_cb(pa_sink_input *i) {
- assert(i && i->userdata);
- send_playback_stream_killed((struct playback_stream *) i->userdata);
- playback_stream_free((struct playback_stream *) i->userdata);
-}
+ playback_stream *s;
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
- struct playback_stream *s;
- assert(i && i->userdata);
- s = i->userdata;
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
- /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
-
- return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
+ send_playback_stream_killed(s);
+ playback_stream_unlink(s);
}
/*** source_output callbacks ***/
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
- struct record_stream *s;
- assert(o && o->userdata && chunk);
- s = o->userdata;
+ record_stream *s;
- if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
- pa_log_warn("Failed to push data into output queue.");
- return;
- }
+ pa_source_output_assert_ref(o);
+ s = RECORD_STREAM(o->userdata);
+ record_stream_assert_ref(s);
+ pa_assert(chunk);
- if (!pa_pstream_is_pending(s->connection->pstream))
- send_memblock(s->connection);
+ pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
}
static void source_output_kill_cb(pa_source_output *o) {
- assert(o && o->userdata);
- send_record_stream_killed((struct record_stream *) o->userdata);
- record_stream_free((struct record_stream *) o->userdata);
+ record_stream *s;
+
+ pa_source_output_assert_ref(o);
+ s = RECORD_STREAM(o->userdata);
+ record_stream_assert_ref(s);
+
+ send_record_stream_killed(s);
+ record_stream_unlink(s);
}
static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
- struct record_stream *s;
- assert(o && o->userdata);
- s = o->userdata;
+ record_stream *s;
+
+ pa_source_output_assert_ref(o);
+ s = RECORD_STREAM(o->userdata);
+ record_stream_assert_ref(s);
/*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
@@ -699,9 +999,9 @@ static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
/*** pdispatch callbacks ***/
-static void protocol_error(struct connection *c) {
+static void protocol_error(connection *c) {
pa_log("protocol error, kicking client");
- connection_free(c);
+ connection_unlink(c);
}
#define CHECK_VALIDITY(pstream, expression, tag, error) do { \
@@ -721,9 +1021,9 @@ static pa_tagstruct *reply_new(uint32_t tag) {
}
static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
- struct playback_stream *s;
- uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid;
+ connection *c = CONNECTION(userdata);
+ playback_stream *s;
+ uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid, missing;
const char *name, *sink_name;
pa_sample_spec ss;
pa_channel_map map;
@@ -731,8 +1031,9 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
pa_sink *sink = NULL;
pa_cvolume volume;
int corked;
-
- assert(c && t && c->protocol && c->protocol->core);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_get(
t,
@@ -773,34 +1074,33 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
}
- s = playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength, prebuf, minreq, &volume, syncid);
+ s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, syncid, corked, &missing);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
- pa_sink_input_cork(s->sink_input, corked);
-
reply = reply_new(tag);
pa_tagstruct_putu32(reply, s->index);
- assert(s->sink_input);
+ pa_assert(s->sink_input);
pa_tagstruct_putu32(reply, s->sink_input->index);
- pa_tagstruct_putu32(reply, s->requested_bytes = pa_memblockq_missing(s->memblockq));
+ pa_tagstruct_putu32(reply, missing);
if (c->version >= 9) {
/* Since 0.9 we support sending the buffer metrics back to the client */
- pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_maxlength(s->memblockq));
- pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_tlength(s->memblockq));
- pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_prebuf(s->memblockq));
- pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_minreq(s->memblockq));
+ pa_tagstruct_putu32(reply, (uint32_t) maxlength);
+ pa_tagstruct_putu32(reply, (uint32_t) tlength);
+ pa_tagstruct_putu32(reply, (uint32_t) prebuf);
+ pa_tagstruct_putu32(reply, (uint32_t) minreq);
}
pa_pstream_send_tagstruct(c->pstream, reply);
- request_bytes(s);
}
static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t channel;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &channel) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -810,39 +1110,52 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
- if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
- struct playback_stream *s;
- if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
- return;
+ switch (command) {
+
+ case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
+ playback_stream *s;
+ if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+ return;
+ }
+
+ playback_stream_unlink(s);
+ break;
}
-
- playback_stream_free(s);
- } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
- struct record_stream *s;
- if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
- return;
+
+ case PA_COMMAND_DELETE_RECORD_STREAM: {
+ record_stream *s;
+ if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+ return;
+ }
+
+ record_stream_unlink(s);
+ break;
}
- record_stream_free(s);
- } else {
- struct upload_stream *s;
- assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
- if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
- return;
+ case PA_COMMAND_DELETE_UPLOAD_STREAM: {
+ upload_stream *s;
+
+ if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+ return;
+ }
+
+ upload_stream_unlink(s);
+ break;
}
- upload_stream_free(s);
+ default:
+ pa_assert_not_reached();
}
pa_pstream_send_simple_ack(c->pstream, tag);
}
static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
- struct record_stream *s;
+ connection *c = CONNECTION(userdata);
+ record_stream *s;
uint32_t maxlength, fragment_size;
uint32_t source_index;
const char *name, *source_name;
@@ -851,7 +1164,9 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
pa_tagstruct *reply;
pa_source *source = NULL;
int corked;
- assert(c && t && c->protocol && c->protocol->core);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &name) < 0 ||
pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
@@ -882,20 +1197,18 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
}
- s = record_stream_new(c, source, &ss, &map, name, maxlength, fragment_size);
+ s = record_stream_new(c, source, &ss, &map, name, &maxlength, fragment_size, corked);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
- pa_source_output_cork(s->source_output, corked);
-
reply = reply_new(tag);
pa_tagstruct_putu32(reply, s->index);
- assert(s->source_output);
+ pa_assert(s->source_output);
pa_tagstruct_putu32(reply, s->source_output->index);
if (c->version >= 9) {
/* Since 0.9 we support sending the buffer metrics back to the client */
- pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_maxlength(s->memblockq));
+ pa_tagstruct_putu32(reply, (uint32_t) maxlength);
pa_tagstruct_putu32(reply, (uint32_t) s->fragment_size);
}
@@ -903,9 +1216,11 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
- assert(c && t);
+ connection *c = CONNECTION(userdata);
+ connection_assert_ref(c);
+ pa_assert(t);
+
if (!pa_tagstruct_eof(t)) {
protocol_error(c);
return;
@@ -913,16 +1228,17 @@ static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
- assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
}
static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const void*cookie;
pa_tagstruct *reply;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &c->version) < 0 ||
pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
@@ -1015,9 +1331,11 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
}
static void command_set_client_name(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const char *name;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &name) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1032,10 +1350,12 @@ static void command_set_client_name(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
}
static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const char *name;
uint32_t idx = PA_IDXSET_INVALID;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &name) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1052,7 +1372,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
idx = sink->index;
} else {
pa_source *source;
- assert(command == PA_COMMAND_LOOKUP_SOURCE);
+ pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
idx = source->index;
}
@@ -1068,10 +1388,12 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
}
static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
- struct playback_stream *s;
- assert(c && t);
+ playback_stream *s;
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1082,29 +1404,18 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
-
- s->drain_request = 0;
-
- pa_memblockq_prebuf_disable(s->memblockq);
+ CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
- if (!pa_memblockq_is_readable(s->memblockq)) {
-/* pa_log("immediate drain: %u", pa_memblockq_get_length(s->memblockq)); */
- pa_pstream_send_simple_ack(c->pstream, tag);
- } else {
-/* pa_log("slow drain triggered"); */
- s->drain_request = 1;
- s->drain_tag = tag;
-
- pa_sink_notify(s->sink_input->sink);
- }
+ pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
}
static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_tagstruct *reply;
const pa_mempool_stat *stat;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (!pa_tagstruct_eof(t)) {
protocol_error(c);
@@ -1125,13 +1436,15 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
}
static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_tagstruct *reply;
- struct playback_stream *s;
+ playback_stream *s;
struct timeval tv, now;
uint32_t idx;
pa_usec_t latency;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
pa_tagstruct_get_timeval(t, &tv) < 0 ||
@@ -1143,13 +1456,13 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+ CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
reply = reply_new(tag);
latency = pa_sink_get_latency(s->sink_input->sink);
- if (s->sink_input->resampled_chunk.memblock)
- latency += pa_bytes_to_usec(s->sink_input->resampled_chunk.length, &s->sink_input->sample_spec);
+/* if (s->sink_input->resampled_chunk.memblock) */ /* FIXME*/
+/* latency += pa_bytes_to_usec(s->sink_input->resampled_chunk.length, &s->sink_input->sample_spec); */
pa_tagstruct_put_usec(reply, latency);
pa_tagstruct_put_usec(reply, 0);
@@ -1162,12 +1475,14 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_tagstruct *reply;
- struct record_stream *s;
+ record_stream *s;
struct timeval tv, now;
uint32_t idx;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
pa_tagstruct_get_timeval(t, &tv) < 0 ||
@@ -1192,14 +1507,16 @@ static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
}
static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
- struct upload_stream *s;
+ connection *c = CONNECTION(userdata);
+ upload_stream *s;
uint32_t length;
const char *name;
pa_sample_spec ss;
pa_channel_map map;
pa_tagstruct *reply;
- assert(c && t && c->protocol && c->protocol->core);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &name) < 0 ||
pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
@@ -1228,11 +1545,13 @@ static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t channel;
- struct upload_stream *s;
+ upload_stream *s;
uint32_t idx;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &channel) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1244,23 +1563,25 @@ static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
s = pa_idxset_get_by_index(c->output_streams, channel);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == UPLOAD_STREAM, tag, PA_ERR_NOENTITY);
+ CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, &idx) < 0)
pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
else
pa_pstream_send_simple_ack(c->pstream, tag);
- upload_stream_free(s);
+ upload_stream_unlink(s);
}
static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t sink_index;
pa_volume_t volume;
pa_sink *sink;
const char *name, *sink_name;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
pa_tagstruct_gets(t, &sink_name) < 0 ||
@@ -1291,9 +1612,11 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
}
static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const char *name;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &name) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1313,7 +1636,9 @@ static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
}
static void sink_fill_tagstruct(pa_tagstruct *t, pa_sink *sink) {
- assert(t && sink);
+ pa_assert(t);
+ pa_sink_assert_ref(sink);
+
pa_tagstruct_put(
t,
PA_TAG_U32, sink->index,
@@ -1321,22 +1646,24 @@ static void sink_fill_tagstruct(pa_tagstruct *t, pa_sink *sink) {
PA_TAG_STRING, sink->description,
PA_TAG_SAMPLE_SPEC, &sink->sample_spec,
PA_TAG_CHANNEL_MAP, &sink->channel_map,
- PA_TAG_U32, sink->owner ? sink->owner->index : PA_INVALID_INDEX,
- PA_TAG_CVOLUME, pa_sink_get_volume(sink, PA_MIXER_HARDWARE),
- PA_TAG_BOOLEAN, pa_sink_get_mute(sink, PA_MIXER_HARDWARE),
+ PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
+ PA_TAG_CVOLUME, pa_sink_get_volume(sink),
+ PA_TAG_BOOLEAN, pa_sink_get_mute(sink),
PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
PA_TAG_USEC, pa_sink_get_latency(sink),
PA_TAG_STRING, sink->driver,
PA_TAG_U32,
- (sink->get_hw_volume ? PA_SINK_HW_VOLUME_CTRL : 0) |
- (sink->get_latency ? PA_SINK_LATENCY : 0) |
+ (sink->get_volume ? PA_SINK_HW_VOLUME_CTRL : 0) | /* FIXME */
+ (sink->get_latency ? PA_SINK_LATENCY : 0) | /* FIXME */
(sink->is_hardware ? PA_SINK_HARDWARE : 0),
PA_TAG_INVALID);
}
static void source_fill_tagstruct(pa_tagstruct *t, pa_source *source) {
- assert(t && source);
+ pa_assert(t);
+ pa_source_assert_ref(source);
+
pa_tagstruct_put(
t,
PA_TAG_U32, source->index,
@@ -1344,22 +1671,24 @@ static void source_fill_tagstruct(pa_tagstruct *t, pa_source *source) {
PA_TAG_STRING, source->description,
PA_TAG_SAMPLE_SPEC, &source->sample_spec,
PA_TAG_CHANNEL_MAP, &source->channel_map,
- PA_TAG_U32, source->owner ? source->owner->index : PA_INVALID_INDEX,
- PA_TAG_CVOLUME, pa_source_get_volume(source, PA_MIXER_HARDWARE),
- PA_TAG_BOOLEAN, pa_source_get_mute(source, PA_MIXER_HARDWARE),
+ PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
+ PA_TAG_CVOLUME, pa_source_get_volume(source),
+ PA_TAG_BOOLEAN, pa_source_get_mute(source),
PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
PA_TAG_USEC, pa_source_get_latency(source),
PA_TAG_STRING, source->driver,
PA_TAG_U32,
- (source->get_hw_volume ? PA_SOURCE_HW_VOLUME_CTRL : 0) |
- (source->get_latency ? PA_SOURCE_LATENCY : 0) |
+ (source->get_volume ? PA_SOURCE_HW_VOLUME_CTRL : 0) | /* FIXME */
+ (source->get_latency ? PA_SOURCE_LATENCY : 0) | /* FIXME */
(source->is_hardware ? PA_SOURCE_HARDWARE : 0),
PA_TAG_INVALID);
}
static void client_fill_tagstruct(pa_tagstruct *t, pa_client *client) {
- assert(t && client);
+ pa_assert(t);
+ pa_assert(client);
+
pa_tagstruct_putu32(t, client->index);
pa_tagstruct_puts(t, client->name);
pa_tagstruct_putu32(t, client->owner ? client->owner->index : PA_INVALID_INDEX);
@@ -1367,7 +1696,9 @@ static void client_fill_tagstruct(pa_tagstruct *t, pa_client *client) {
}
static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) {
- assert(t && module);
+ pa_assert(t);
+ pa_assert(module);
+
pa_tagstruct_putu32(t, module->index);
pa_tagstruct_puts(t, module->name);
pa_tagstruct_puts(t, module->argument);
@@ -1376,7 +1707,9 @@ static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) {
}
static void sink_input_fill_tagstruct(pa_tagstruct *t, pa_sink_input *s) {
- assert(t && s);
+ pa_assert(t);
+ pa_sink_input_assert_ref(s);
+
pa_tagstruct_putu32(t, s->index);
pa_tagstruct_puts(t, s->name);
pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
@@ -1392,7 +1725,9 @@ static void sink_input_fill_tagstruct(pa_tagstruct *t, pa_sink_input *s) {
}
static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) {
- assert(t && s);
+ pa_assert(t);
+ pa_source_output_assert_ref(s);
+
pa_tagstruct_putu32(t, s->index);
pa_tagstruct_puts(t, s->name);
pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
@@ -1407,7 +1742,9 @@ static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) {
}
static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) {
- assert(t && e);
+ pa_assert(t);
+ pa_assert(e);
+
pa_tagstruct_putu32(t, e->index);
pa_tagstruct_puts(t, e->name);
pa_tagstruct_put_cvolume(t, &e->volume);
@@ -1420,7 +1757,7 @@ static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) {
}
static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
pa_sink *sink = NULL;
pa_source *source = NULL;
@@ -1431,7 +1768,9 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
pa_scache_entry *sce = NULL;
const char *name;
pa_tagstruct *reply;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
(command != PA_COMMAND_GET_CLIENT_INFO &&
@@ -1466,7 +1805,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
else {
- assert(command == PA_COMMAND_GET_SAMPLE_INFO);
+ pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
if (idx != PA_INVALID_INDEX)
sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
else
@@ -1497,12 +1836,14 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
}
static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_idxset *i;
uint32_t idx;
void *p;
pa_tagstruct *reply;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (!pa_tagstruct_eof(t)) {
protocol_error(c);
@@ -1526,7 +1867,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
i = c->protocol->core->source_outputs;
else {
- assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
+ pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
i = c->protocol->core->scache;
}
@@ -1545,7 +1886,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
source_output_fill_tagstruct(reply, p);
else {
- assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
+ pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
scache_fill_tagstruct(reply, p);
}
}
@@ -1555,11 +1896,13 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
}
static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_tagstruct *reply;
char txt[256];
const char *n;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (!pa_tagstruct_eof(t)) {
protocol_error(c);
@@ -1587,8 +1930,10 @@ static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
pa_tagstruct *t;
- struct connection *c = userdata;
- assert(c && core);
+ connection *c = CONNECTION(userdata);
+
+ connection_assert_ref(c);
+ pa_assert(t);
t = pa_tagstruct_new(NULL, 0);
pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
@@ -1599,9 +1944,11 @@ static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint3
}
static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_subscription_mask_t m;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &m) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1617,7 +1964,7 @@ static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint
if (m != 0) {
c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
- assert(c->subscription);
+ pa_assert(c->subscription);
} else
c->subscription = NULL;
@@ -1631,14 +1978,16 @@ static void command_set_volume(
pa_tagstruct *t,
void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
pa_cvolume volume;
pa_sink *sink = NULL;
pa_source *source = NULL;
pa_sink_input *si = NULL;
const char *name = NULL;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
(command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
@@ -1653,27 +2002,36 @@ static void command_set_volume(
CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || !name || (*name && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
- if (command == PA_COMMAND_SET_SINK_VOLUME) {
- if (idx != PA_INVALID_INDEX)
- sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
- else
- sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
- } else if (command == PA_COMMAND_SET_SOURCE_VOLUME) {
- if (idx != (uint32_t) -1)
- source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
- else
- source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
- } else {
- assert(command == PA_COMMAND_SET_SINK_INPUT_VOLUME);
- si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+ switch (command) {
+
+ case PA_COMMAND_SET_SINK_VOLUME:
+ if (idx != PA_INVALID_INDEX)
+ sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
+ else
+ sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
+ break;
+
+ case PA_COMMAND_SET_SOURCE_VOLUME:
+ if (idx != PA_INVALID_INDEX)
+ source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
+ else
+ source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
+ break;
+
+ case PA_COMMAND_SET_SINK_INPUT_VOLUME:
+ si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+ break;
+
+ default:
+ pa_assert_not_reached();
}
CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
if (sink)
- pa_sink_set_volume(sink, PA_MIXER_HARDWARE, &volume);
+ pa_sink_set_volume(sink, &volume);
else if (source)
- pa_source_set_volume(source, PA_MIXER_HARDWARE, &volume);
+ pa_source_set_volume(source, &volume);
else if (si)
pa_sink_input_set_volume(si, &volume);
@@ -1687,16 +2045,20 @@ static void command_set_mute(
pa_tagstruct *t,
void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
int mute;
pa_sink *sink = NULL;
pa_source *source = NULL;
+ pa_sink_input *si = NULL;
const char *name = NULL;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
- pa_tagstruct_gets(t, &name) < 0 ||
+ (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
+ (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
pa_tagstruct_get_boolean(t, &mute) ||
!pa_tagstruct_eof(t)) {
protocol_error(c);
@@ -1706,35 +2068,53 @@ static void command_set_mute(
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || !name || (*name && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
- if (command == PA_COMMAND_SET_SINK_MUTE) {
- if (idx != PA_INVALID_INDEX)
- sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
- else
- sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
- } else {
- assert(command == PA_COMMAND_SET_SOURCE_MUTE);
- if (idx != (uint32_t) -1)
- source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
- else
- source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
+ switch (command) {
+
+ case PA_COMMAND_SET_SINK_MUTE:
+
+ if (idx != PA_INVALID_INDEX)
+ sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
+ else
+ sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
+
+ break;
+
+ case PA_COMMAND_SET_SOURCE_MUTE:
+ if (idx != PA_INVALID_INDEX)
+ source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
+ else
+ source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
+
+ break;
+
+ case PA_COMMAND_SET_SINK_INPUT_MUTE:
+ si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+ break;
+
+ default:
+ pa_assert_not_reached();
}
- CHECK_VALIDITY(c->pstream, sink || source, tag, PA_ERR_NOENTITY);
+ CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
if (sink)
- pa_sink_set_mute(sink, PA_MIXER_HARDWARE, mute);
+ pa_sink_set_mute(sink, mute);
else if (source)
- pa_source_set_mute(source, PA_MIXER_HARDWARE, mute);
+ pa_source_set_mute(source, mute);
+ else if (si)
+ pa_sink_input_set_mute(si, mute);
pa_pstream_send_simple_ack(c->pstream, tag);
}
static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
int b;
- struct playback_stream *s, *ssync;
- assert(c && t);
+ playback_stream *s;
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
pa_tagstruct_get_boolean(t, &b) < 0 ||
@@ -1747,73 +2127,19 @@ static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+ CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
pa_sink_input_cork(s->sink_input, b);
- pa_memblockq_prebuf_force(s->memblockq);
-
- /* Do the same for all other members in the sync group */
- for (ssync = s->prev; ssync; ssync = ssync->prev) {
- pa_sink_input_cork(ssync->sink_input, b);
- pa_memblockq_prebuf_force(ssync->memblockq);
- }
-
- for (ssync = s->next; ssync; ssync = ssync->next) {
- pa_sink_input_cork(ssync->sink_input, b);
- pa_memblockq_prebuf_force(ssync->memblockq);
- }
-
pa_pstream_send_simple_ack(c->pstream, tag);
}
-static void command_flush_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+static void command_trigger_or_flush_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+ connection *c = CONNECTION(userdata);
uint32_t idx;
- struct playback_stream *s, *ssync;
- assert(c && t);
-
- if (pa_tagstruct_getu32(t, &idx) < 0 ||
- !pa_tagstruct_eof(t)) {
- protocol_error(c);
- return;
- }
-
- CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
- CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
- s = pa_idxset_get_by_index(c->output_streams, idx);
- CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
-
- pa_memblockq_flush(s->memblockq);
- s->underrun = 0;
+ playback_stream *s;
- /* Do the same for all other members in the sync group */
- for (ssync = s->prev; ssync; ssync = ssync->prev) {
- pa_memblockq_flush(ssync->memblockq);
- ssync->underrun = 0;
- }
-
- for (ssync = s->next; ssync; ssync = ssync->next) {
- pa_memblockq_flush(ssync->memblockq);
- ssync->underrun = 0;
- }
-
- pa_pstream_send_simple_ack(c->pstream, tag);
- pa_sink_notify(s->sink_input->sink);
- request_bytes(s);
-
- for (ssync = s->prev; ssync; ssync = ssync->prev)
- request_bytes(ssync);
-
- for (ssync = s->next; ssync; ssync = ssync->next)
- request_bytes(ssync);
-}
-
-static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
- uint32_t idx;
- struct playback_stream *s;
- assert(c && t);
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1825,32 +2151,36 @@ static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch
CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+ CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
switch (command) {
+ case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
+ pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
+ break;
+
case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
- pa_memblockq_prebuf_force(s->memblockq);
+ pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
break;
case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
- pa_memblockq_prebuf_disable(s->memblockq);
+ pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
break;
default:
- abort();
+ pa_assert_not_reached();
}
- pa_sink_notify(s->sink_input->sink);
pa_pstream_send_simple_ack(c->pstream, tag);
- request_bytes(s);
}
static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
- struct record_stream *s;
+ record_stream *s;
int b;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
pa_tagstruct_get_boolean(t, &b) < 0 ||
@@ -1869,11 +2199,13 @@ static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
}
static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
- struct record_stream *s;
- assert(c && t);
+ record_stream *s;
+ connection_assert_ref(c);
+ pa_assert(t);
+
if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
protocol_error(c);
@@ -1889,9 +2221,11 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U
}
static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const char *s;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &s) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1907,10 +2241,12 @@ static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, u
}
static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
const char *name;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
pa_tagstruct_gets(t, &name) < 0 ||
@@ -1923,16 +2259,16 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
- struct playback_stream *s;
+ playback_stream *s;
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
- CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+ CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
pa_sink_input_set_name(s->sink_input, name);
} else {
- struct record_stream *s;
+ record_stream *s;
s = pa_idxset_get_by_index(c->record_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
@@ -1944,9 +2280,11 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
}
static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -1973,7 +2311,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
} else {
pa_source_output *s;
- assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
+ pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
@@ -1985,12 +2323,14 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
}
static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_module *m;
const char *name, *argument;
pa_tagstruct *reply;
- assert(c && t);
+ connection_assert_ref(c);
+ pa_assert(t);
+
if (pa_tagstruct_gets(t, &name) < 0 ||
pa_tagstruct_gets(t, &argument) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -2013,10 +2353,12 @@ static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
}
static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx;
pa_module *m;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
@@ -2033,12 +2375,14 @@ static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
}
static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const char *name, *module, *argument;
uint32_t type;
uint32_t idx;
pa_tagstruct *reply;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_gets(t, &name) < 0 ||
pa_tagstruct_getu32(t, &type) < 0 ||
@@ -2066,11 +2410,13 @@ static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED u
}
static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const char *name = NULL;
uint32_t type, idx = PA_IDXSET_INVALID;
int r;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if ((pa_tagstruct_getu32(t, &idx) < 0 &&
(pa_tagstruct_gets(t, &name) < 0 ||
@@ -2095,7 +2441,7 @@ static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
}
static void autoload_fill_tagstruct(pa_tagstruct *t, const pa_autoload_entry *e) {
- assert(t && e);
+ pa_assert(t && e);
pa_tagstruct_putu32(t, e->index);
pa_tagstruct_puts(t, e->name);
@@ -2105,12 +2451,14 @@ static void autoload_fill_tagstruct(pa_tagstruct *t, const pa_autoload_entry *e)
}
static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
const pa_autoload_entry *a = NULL;
uint32_t type, idx;
const char *name;
pa_tagstruct *reply;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if ((pa_tagstruct_getu32(t, &idx) < 0 &&
(pa_tagstruct_gets(t, &name) < 0 ||
@@ -2137,9 +2485,11 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU
}
static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_tagstruct *reply;
- assert(c && t);
+
+ connection_assert_ref(c);
+ pa_assert(t);
if (!pa_tagstruct_eof(t)) {
protocol_error(c);
@@ -2162,12 +2512,12 @@ static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
}
static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
- struct connection *c = userdata;
+ connection *c = CONNECTION(userdata);
uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
const char *name = NULL;
- assert(c);
- assert(t);
+ connection_assert_ref(c);
+ pa_assert(t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
pa_tagstruct_getu32(t, &idx_device) < 0 ||
@@ -2218,69 +2568,49 @@ static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag
}
pa_pstream_send_simple_ack(c->pstream, tag);
-
}
/*** pstream callbacks ***/
static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
- struct connection *c = userdata;
- assert(p && packet && packet->data && c);
+ connection *c = CONNECTION(userdata);
+
+ pa_assert(p);
+ pa_assert(packet);
+ connection_assert_ref(c);
if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
pa_log("invalid packet.");
- connection_free(c);
+ connection_unlink(c);
}
}
static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
- struct connection *c = userdata;
- struct output_stream *stream;
- assert(p && chunk && userdata);
-
- if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
+ connection *c = CONNECTION(userdata);
+ output_stream *stream;
+
+ pa_assert(p);
+ pa_assert(chunk);
+ connection_assert_ref(c);
+
+ if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
pa_log("client sent block for invalid stream.");
/* Ignoring */
return;
}
- if (stream->type == PLAYBACK_STREAM) {
- struct playback_stream *ps = (struct playback_stream*) stream;
- if (chunk->length >= ps->requested_bytes)
- ps->requested_bytes = 0;
- else
- ps->requested_bytes -= chunk->length;
-
- pa_memblockq_seek(ps->memblockq, offset, seek);
-
- if (pa_memblockq_push_align(ps->memblockq, chunk) < 0) {
- pa_tagstruct *t;
-
- pa_log_warn("failed to push data into queue");
-
- /* Pushing this block into the queue failed, so we simulate
- * it by skipping ahead */
-
- pa_memblockq_seek(ps->memblockq, chunk->length, PA_SEEK_RELATIVE);
-
- /* Notify the user */
- t = pa_tagstruct_new(NULL, 0);
- pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
- pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
- pa_tagstruct_putu32(t, ps->index);
- pa_pstream_send_tagstruct(p, t);
- }
-
- ps->underrun = 0;
-
- pa_sink_notify(ps->sink_input->sink);
-
+ if (playback_stream_isinstance(stream)) {
+ playback_stream *ps = PLAYBACK_STREAM(stream);
+
+ if (seek != PA_SEEK_RELATIVE || offset != 0)
+ pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
+
+ pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
+
} else {
- struct upload_stream *u = (struct upload_stream*) stream;
+ upload_stream *u = UPLOAD_STREAM(stream);
size_t l;
- assert(u->type == UPLOAD_STREAM);
-
if (!u->memchunk.memblock) {
if (u->length == chunk->length) {
u->memchunk = *chunk;
@@ -2292,7 +2622,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
}
}
- assert(u->memchunk.memblock);
+ pa_assert(u->memchunk.memblock);
l = u->length;
if (l > chunk->length)
@@ -2317,17 +2647,21 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
}
static void pstream_die_callback(pa_pstream *p, void *userdata) {
- struct connection *c = userdata;
- assert(p && c);
- connection_free(c);
+ connection *c = CONNECTION(userdata);
+
+ pa_assert(p);
+ connection_assert_ref(c);
+ connection_unlink(c);
/* pa_log("connection died.");*/
}
static void pstream_drain_callback(pa_pstream *p, void *userdata) {
- struct connection *c = userdata;
- assert(p && c);
+ connection *c = CONNECTION(userdata);
+
+ pa_assert(p);
+ connection_assert_ref(c);
send_memblock(c);
}
@@ -2335,25 +2669,32 @@ static void pstream_drain_callback(pa_pstream *p, void *userdata) {
/*** client callbacks ***/
static void client_kill_cb(pa_client *c) {
- assert(c && c->userdata);
- connection_free(c->userdata);
+ pa_assert(c);
+
+ connection_unlink(CONNECTION(c->userdata));
}
/*** socket server callbacks ***/
static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
- struct connection *c = userdata;
- assert(m && tv && c && c->auth_timeout_event == e);
+ connection *c = CONNECTION(userdata);
+
+ pa_assert(m);
+ pa_assert(tv);
+ connection_assert_ref(c);
+ pa_assert(c->auth_timeout_event == e);
if (!c->authorized)
- connection_free(c);
+ connection_unlink(c);
}
static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, void *userdata) {
pa_protocol_native *p = userdata;
- struct connection *c;
+ connection *c;
char cname[256], pname[128];
- assert(io && p);
+
+ pa_assert(io);
+ pa_assert(p);
if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
@@ -2361,7 +2702,8 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
return;
}
- c = pa_xmalloc(sizeof(struct connection));
+ c = pa_msgobject_new(connection);
+ c->parent.parent.free = connection_free;
c->authorized = !!p->public;
@@ -2382,15 +2724,15 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
c->protocol = p;
pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
pa_snprintf(cname, sizeof(cname), "Native client (%s)", pname);
- assert(p->core);
+ pa_assert(p->core);
c->client = pa_client_new(p->core, __FILE__, cname);
- assert(c->client);
+ pa_assert(c->client);
c->client->kill = client_kill_cb;
c->client->userdata = c;
c->client->owner = p->module;
c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
- assert(c->pstream);
+ pa_assert(c->pstream);
pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
@@ -2398,11 +2740,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
- assert(c->pdispatch);
+ pa_assert(c->pdispatch);
c->record_streams = pa_idxset_new(NULL, NULL);
c->output_streams = pa_idxset_new(NULL, NULL);
- assert(c->record_streams && c->output_streams);
+ pa_assert(c->record_streams && c->output_streams);
c->rrobin_index = PA_IDXSET_INVALID;
c->subscription = NULL;
@@ -2420,7 +2762,7 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
/*** module entry points ***/
static int load_key(pa_protocol_native*p, const char*fn) {
- assert(p);
+ pa_assert(p);
p->auth_cookie_in_property = 0;
@@ -2450,8 +2792,8 @@ static pa_protocol_native* protocol_new_internal(pa_core *c, pa_module *m, pa_mo
int public = 0;
const char *acl;
- assert(c);
- assert(ma);
+ pa_assert(c);
+ pa_assert(ma);
if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &public) < 0) {
pa_log("auth-anonymous= expects a boolean argument.");
@@ -2492,7 +2834,7 @@ static pa_protocol_native* protocol_new_internal(pa_core *c, pa_module *m, pa_mo
goto fail;
p->connections = pa_idxset_new(NULL, NULL);
- assert(p->connections);
+ pa_assert(p->connections);
return p;
@@ -2527,11 +2869,11 @@ pa_protocol_native* pa_protocol_native_new(pa_core *core, pa_socket_server *serv
}
void pa_protocol_native_free(pa_protocol_native *p) {
- struct connection *c;
- assert(p);
+ connection *c;
+ pa_assert(p);
while ((c = pa_idxset_first(p->connections, NULL)))
- connection_free(c);
+ connection_unlink(c);
pa_idxset_free(p->connections, NULL, NULL);
if (p->server) {
@@ -2563,7 +2905,12 @@ void pa_protocol_native_free(pa_protocol_native *p) {
pa_xfree(p);
}
-pa_protocol_native* pa_protocol_native_new_iochannel(pa_core*core, pa_iochannel *io, pa_module *m, pa_modargs *ma) {
+pa_protocol_native* pa_protocol_native_new_iochannel(
+ pa_core*core,
+ pa_iochannel *io,
+ pa_module *m,
+ pa_modargs *ma) {
+
pa_protocol_native *p;
if (!(p = protocol_new_internal(core, m, ma)))
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c
index 8f9aed58..0ded5d26 100644
--- a/src/pulsecore/protocol-simple.c
+++ b/src/pulsecore/protocol-simple.c
@@ -67,7 +67,7 @@ typedef struct connection {
PA_DECLARE_CLASS(connection);
#define CONNECTION(o) (connection_cast(o))
-static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
struct pa_protocol_simple {
pa_module *module;
@@ -91,9 +91,9 @@ enum {
};
enum {
- MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
- MESSAGE_POST_DATA, /* data from source output to main loop */
- MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
+ CONNECTION_MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
+ CONNECTION_MESSAGE_POST_DATA, /* data from source output to main loop */
+ CONNECTION_MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
};
@@ -102,29 +102,12 @@ enum {
#define RECORD_BUFFER_SECONDS (5)
#define RECORD_BUFFER_FRAGMENTS (100)
-static void connection_free(pa_object *o) {
- connection *c = CONNECTION(o);
+static void connection_unlink(connection *c) {
pa_assert(c);
- if (c->playback.current_memblock)
- pa_memblock_unref(c->playback.current_memblock);
-
- if (c->io)
- pa_iochannel_free(c->io);
- if (c->input_memblockq)
- pa_memblockq_free(c->input_memblockq);
- if (c->output_memblockq)
- pa_memblockq_free(c->output_memblockq);
-
- pa_xfree(c);
-}
-
-static void connection_drop(connection *c) {
- pa_assert(c);
-
- if (!pa_idxset_remove_by_data(c->protocol->connections, c, NULL))
+ if (!c->protocol)
return;
-
+
if (c->sink_input) {
pa_sink_input_disconnect(c->sink_input);
pa_sink_input_unref(c->sink_input);
@@ -142,9 +125,30 @@ static void connection_drop(connection *c) {
c->client = NULL;
}
+ pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
+ c->protocol = NULL;
connection_unref(c);
}
+static void connection_free(pa_object *o) {
+ connection *c = CONNECTION(o);
+ pa_assert(c);
+
+ connection_unref(c);
+
+ if (c->playback.current_memblock)
+ pa_memblock_unref(c->playback.current_memblock);
+
+ if (c->io)
+ pa_iochannel_free(c->io);
+ if (c->input_memblockq)
+ pa_memblockq_free(c->input_memblockq);
+ if (c->output_memblockq)
+ pa_memblockq_free(c->output_memblockq);
+
+ pa_xfree(c);
+}
+
static int do_read(connection *c) {
pa_memchunk chunk;
ssize_t r;
@@ -190,7 +194,7 @@ static int do_read(connection *c) {
c->playback.memblock_index += r;
- pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
+ pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
pa_atomic_sub(&c->playback.missing, r);
return 0;
@@ -263,28 +267,28 @@ fail:
pa_iochannel_free(c->io);
c->io = NULL;
- pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
} else
- connection_drop(c);
+ connection_unlink(c);
}
-static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
connection *c = CONNECTION(o);
connection_assert_ref(c);
switch (code) {
- case MESSAGE_REQUEST_DATA:
+ case CONNECTION_MESSAGE_REQUEST_DATA:
do_work(c);
break;
- case MESSAGE_POST_DATA:
+ case CONNECTION_MESSAGE_POST_DATA:
/* pa_log("got data %u", chunk->length); */
pa_memblockq_push_align(c->output_memblockq, chunk);
do_work(c);
break;
- case MESSAGE_DROP_CONNECTION:
- connection_drop(c);
+ case CONNECTION_MESSAGE_DROP_CONNECTION:
+ connection_unlink(c);
break;
}
@@ -294,13 +298,13 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_m
/*** sink_input callbacks ***/
/* Called from thread context */
-static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o);
connection*c;
- pa_assert(i);
- c = i->userdata;
- pa_assert(c);
+ pa_sink_input_assert_ref(i);
+ c = CONNECTION(i->userdata);
+ connection_assert_ref(c);
switch (code) {
@@ -330,7 +334,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_
}
default:
- return pa_sink_input_process_msg(o, code, userdata, chunk);
+ return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
}
}
@@ -349,7 +353,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
/* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */
if (c->dead && r < 0)
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_DROP_CONNECTION, NULL, 0, NULL, NULL);
return r;
}
@@ -369,19 +373,20 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
if (new > old) {
if (pa_atomic_add(&c->playback.missing, new - old) <= 0)
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}
}
/* Called from main context */
static void sink_input_kill_cb(pa_sink_input *i) {
- pa_assert(i);
+ pa_sink_input_assert_ref(i);
- connection_drop(CONNECTION(i->userdata));
+ connection_unlink(CONNECTION(i->userdata));
}
/*** source_output callbacks ***/
+/* Called from thread context */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
connection *c;
@@ -390,24 +395,22 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
pa_assert(c);
pa_assert(chunk);
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
}
+/* Called from main context */
static void source_output_kill_cb(pa_source_output *o) {
- connection*c;
+ pa_source_output_assert_ref(o);
- pa_assert(o);
- c = o->userdata;
- pa_assert(c);
-
- connection_drop(c);
+ connection_unlink(CONNECTION(o->userdata));
}
+/* Called from main context */
static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
connection*c;
pa_assert(o);
- c = o->userdata;
+ c = CONNECTION(o->userdata);
pa_assert(c);
return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
@@ -419,16 +422,16 @@ static void client_kill_cb(pa_client *client) {
connection*c;
pa_assert(client);
- c = client->userdata;
+ c = CONNECTION(client->userdata);
pa_assert(c);
- connection_drop(c);
+ connection_unlink(c);
}
/*** pa_iochannel callbacks ***/
static void io_callback(pa_iochannel*io, void *userdata) {
- connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_assert(io);
pa_assert(c);
@@ -453,7 +456,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
return;
}
- c = pa_msgobject_new(connection, connection_check_type);
+ c = pa_msgobject_new(connection);
c->parent.parent.free = connection_free;
c->parent.process_msg = connection_process_msg;
c->io = io;
@@ -547,7 +550,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_source_output_put(c->source_output);
}
-
pa_iochannel_set_callback(c->io, io_callback, c);
pa_idxset_put(p->connections, c, NULL);
@@ -555,7 +557,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
fail:
if (c)
- connection_drop(c);
+ connection_unlink(c);
}
pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
@@ -618,7 +620,7 @@ void pa_protocol_simple_free(pa_protocol_simple *p) {
if (p->connections) {
while((c = pa_idxset_first(p->connections, NULL)))
- connection_drop(c);
+ connection_unlink(c);
pa_idxset_free(p->connections, NULL, NULL);
}
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index e4e931f4..0a7033d0 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -45,7 +45,7 @@
#define MOVE_BUFFER_LENGTH (1024*1024)
#define SILENCE_BUFFER_LENGTH (64*1024)
-static PA_DEFINE_CHECK_TYPE(pa_sink_input, sink_input_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_sink_input, pa_msgobject);
static void sink_input_free(pa_object *o);
@@ -110,6 +110,7 @@ pa_sink_input* pa_sink_input_new(
pa_return_null_if_fail(data->sink);
pa_return_null_if_fail(pa_sink_get_state(data->sink) != PA_SINK_DISCONNECTED);
+ pa_return_null_if_fail(!data->sync_base || (data->sync_base->sink == data->sink && pa_sink_input_get_state(data->sync_base) == PA_SINK_INPUT_CORKED));
if (!data->sample_spec_is_set)
data->sample_spec = data->sink->sample_spec;
@@ -161,12 +162,12 @@ pa_sink_input* pa_sink_input_new(
data->resample_method = pa_resampler_get_method(resampler);
}
- i = pa_msgobject_new(pa_sink_input, sink_input_check_type);
+ i = pa_msgobject_new(pa_sink_input);
i->parent.parent.free = sink_input_free;
i->parent.process_msg = pa_sink_input_process_msg;
i->core = core;
- i->state = PA_SINK_INPUT_RUNNING;
+ i->state = data->start_corked ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING;
i->flags = flags;
i->name = pa_xstrdup(data->name);
i->driver = pa_xstrdup(data->driver);
@@ -181,6 +182,16 @@ pa_sink_input* pa_sink_input_new(
i->volume = data->volume;
i->muted = data->muted;
+ if (data->sync_base) {
+ i->sync_next = data->sync_base->sync_next;
+ i->sync_prev = data->sync_base;
+
+ if (data->sync_base->sync_next)
+ data->sync_base->sync_next->sync_prev = i;
+ data->sync_base->sync_next = i;
+ } else
+ i->sync_next = i->sync_prev = NULL;
+
i->peek = NULL;
i->drop = NULL;
i->kill = NULL;
@@ -213,6 +224,7 @@ pa_sink_input* pa_sink_input_new(
}
static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) {
+ pa_sink_input *ssync;
pa_assert(i);
if (state == PA_SINK_INPUT_DRAINED)
@@ -221,10 +233,15 @@ static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) {
if (i->state == state)
return 0;
- if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+ if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
return -1;
i->state = state;
+ for (ssync = i->sync_prev; ssync; ssync = ssync->sync_prev)
+ ssync->state = state;
+ for (ssync = i->sync_next; ssync; ssync = ssync->sync_next)
+ ssync->state = state;
+
return 0;
}
@@ -232,10 +249,16 @@ void pa_sink_input_disconnect(pa_sink_input *i) {
pa_assert(i);
pa_return_if_fail(i->state != PA_SINK_INPUT_DISCONNECTED);
- pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
+ if (i->sync_prev)
+ i->sync_prev->sync_next = i->sync_next;
+ if (i->sync_next)
+ i->sync_next->sync_prev = i->sync_prev;
+
+ i->sync_prev = i->sync_next = NULL;
+
+ pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, 0, NULL);
pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL);
pa_idxset_remove_by_data(i->sink->inputs, i, NULL);
- pa_sink_input_unref(i);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index);
@@ -248,6 +271,7 @@ void pa_sink_input_disconnect(pa_sink_input *i) {
i->kill = NULL;
i->get_latency = NULL;
i->underrun = NULL;
+ pa_sink_input_unref(i);
}
static void sink_input_free(pa_object *o) {
@@ -281,7 +305,7 @@ void pa_sink_input_put(pa_sink_input *i) {
i->thread_info.volume = i->volume;
i->thread_info.muted = i->muted;
- pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, pa_sink_input_ref(i), NULL, (pa_free_cb_t) pa_sink_input_unref);
+ pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL);
pa_sink_update_status(i->sink);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
@@ -299,7 +323,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i) {
pa_sink_input_assert_ref(i);
- if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+ if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0)
r = 0;
if (i->get_latency)
@@ -509,7 +533,7 @@ void pa_sink_input_set_volume(pa_sink_input *i, const pa_cvolume *volume) {
i->volume = *volume;
- pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
}
@@ -528,7 +552,7 @@ void pa_sink_input_set_mute(pa_sink_input *i, int mute) {
i->muted = mute;
- pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
}
@@ -553,7 +577,7 @@ int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) {
i->sample_spec.rate = rate;
- pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), 0, NULL, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
return 0;
@@ -741,7 +765,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
/* return 0; */
}
-int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o);
pa_sink_input_assert_ref(i);
@@ -776,12 +800,28 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc
}
case PA_SINK_INPUT_MESSAGE_SET_STATE: {
+ pa_sink_input *ssync;
+
if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
(i->thread_info.state != PA_SINK_INPUT_DRAINED) && (i->thread_info.state != PA_SINK_INPUT_RUNNING))
pa_atomic_store(&i->thread_info.drained, 1);
i->thread_info.state = PA_PTR_TO_UINT(userdata);
+ for (ssync = i->thread_info.sync_prev; ssync; ssync = ssync->thread_info.sync_prev) {
+ if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
+ (ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING))
+ pa_atomic_store(&ssync->thread_info.drained, 1);
+ ssync->thread_info.state = PA_PTR_TO_UINT(userdata);
+ }
+
+ for (ssync = i->thread_info.sync_next; ssync; ssync = ssync->thread_info.sync_next) {
+ if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
+ (ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING))
+ pa_atomic_store(&ssync->thread_info.drained, 1);
+ ssync->thread_info.state = PA_PTR_TO_UINT(userdata);
+ }
+
return 0;
}
}
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index fe62917a..af3db95e 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -71,6 +71,8 @@ struct pa_sink_input {
pa_sample_spec sample_spec;
pa_channel_map channel_map;
+ pa_sink_input *sync_prev, *sync_next;
+
pa_cvolume volume;
int muted;
@@ -97,6 +99,8 @@ struct pa_sink_input {
/* size_t move_silence; */
pa_memblock *silence_memblock; /* may be NULL */
+ pa_sink_input *sync_prev, *sync_next;
+
pa_cvolume volume;
int muted;
} thread_info;
@@ -133,6 +137,9 @@ typedef struct pa_sink_input_new_data {
int muted_is_set;
pa_resample_method_t resample_method;
+
+ int start_corked;
+ pa_sink_input *sync_base;
} pa_sink_input_new_data;
pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data);
@@ -179,6 +186,6 @@ pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i);
int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume);
void pa_sink_input_drop(pa_sink_input *i, size_t length);
-int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
#endif
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 7f2a8b39..5ab01cb4 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -48,7 +48,7 @@
#define MAX_MIX_CHANNELS 32
#define SILENCE_BUFFER_LENGTH (64*1024)
-static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_sink, pa_msgobject);
static void sink_free(pa_object *s);
@@ -80,7 +80,7 @@ pa_sink* pa_sink_new(
pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
pa_return_null_if_fail(name && pa_utf8_valid(name) && *name);
- s = pa_msgobject_new(pa_sink, sink_check_type);
+ s = pa_msgobject_new(pa_sink);
if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) {
pa_xfree(s);
@@ -161,7 +161,7 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
if ((ret = s->set_state(s, state)) < 0)
return -1;
- if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+ if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
return -1;
s->state = state;
@@ -264,7 +264,7 @@ int pa_sink_suspend(pa_sink *s, int suspend) {
void pa_sink_ping(pa_sink *s) {
pa_sink_assert_ref(s);
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, NULL, NULL);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, 0, NULL, NULL);
}
static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {
@@ -530,7 +530,7 @@ pa_usec_t pa_sink_get_latency(pa_sink *s) {
if (s->get_latency)
return s->get_latency(s);
- if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+ if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
return 0;
return usec;
@@ -549,7 +549,7 @@ void pa_sink_set_volume(pa_sink *s, const pa_cvolume *volume) {
s->set_volume = NULL;
if (!s->set_volume)
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
if (changed)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -566,7 +566,7 @@ const pa_cvolume *pa_sink_get_volume(pa_sink *s) {
s->get_volume = NULL;
if (!s->get_volume && s->refresh_volume)
- pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_VOLUME, &s->volume, NULL);
+ pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_VOLUME, &s->volume, 0, NULL);
if (!pa_cvolume_equal(&old_volume, &s->volume))
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -585,7 +585,7 @@ void pa_sink_set_mute(pa_sink *s, int mute) {
s->set_mute = NULL;
if (!s->set_mute)
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
if (changed)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -602,7 +602,7 @@ int pa_sink_get_mute(pa_sink *s) {
s->get_mute = NULL;
if (!s->get_mute && s->refresh_mute)
- pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_MUTE, &s->muted, NULL);
+ pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_MUTE, &s->muted, 0, NULL);
if (old_muted != s->muted)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -660,21 +660,58 @@ unsigned pa_sink_used_by(pa_sink *s) {
return ret;
}
-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink *s = PA_SINK(o);
pa_sink_assert_ref(s);
switch ((pa_sink_message_t) code) {
+
case PA_SINK_MESSAGE_ADD_INPUT: {
pa_sink_input *i = userdata;
pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i));
+
+ /* Since the caller sleeps in pa_sink_input_put(), we can
+ * safely access data outside of thread_info even though
+ * it is mutable */
+
+ if ((i->thread_info.sync_prev = i->sync_prev)) {
+ pa_assert(i->sink == i->thread_info.sync_prev->sink);
+ pa_assert(i->sync_prev->sync_next == i);
+ i->thread_info.sync_prev->thread_info.sync_next = i;
+ }
+
+ if ((i->thread_info.sync_next = i->sync_next)) {
+ pa_assert(i->sink == i->thread_info.sync_next->sink);
+ pa_assert(i->sync_next->sync_prev == i);
+ i->thread_info.sync_next->thread_info.sync_prev = i;
+ }
+
return 0;
}
case PA_SINK_MESSAGE_REMOVE_INPUT: {
pa_sink_input *i = userdata;
+
+ /* Since the caller sleeps in pa_sink_input_disconnect(),
+ * we can safely access data outside of thread_info even
+ * though it is mutable */
+
+ pa_assert(!i->thread_info.sync_prev);
+ pa_assert(!i->thread_info.sync_next);
+
+ if (i->thread_info.sync_prev) {
+ i->thread_info.sync_prev->thread_info.sync_next = i->thread_info.sync_prev->sync_next;
+ i->thread_info.sync_prev = NULL;
+ }
+
+ if (i->thread_info.sync_next) {
+ i->thread_info.sync_next->thread_info.sync_prev = i->thread_info.sync_next->sync_prev;
+ i->thread_info.sync_next = NULL;
+ }
+
if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index)))
pa_sink_input_unref(i);
+
return 0;
}
@@ -698,6 +735,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *
return 0;
case PA_SINK_MESSAGE_SET_STATE:
+
s->thread_info.state = PA_PTR_TO_UINT(userdata);
return 0;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index 958279c5..494bb6a9 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -156,7 +156,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result);
void pa_sink_render_into(pa_sink*s, pa_memchunk *target);
void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target);
-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
static inline int PA_SINK_OPENED(pa_sink_state_t x) {
return x == PA_SINK_RUNNING || x == PA_SINK_IDLE;
diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c
index 946af3e6..f2204f26 100644
--- a/src/pulsecore/sound-file-stream.c
+++ b/src/pulsecore/sound-file-stream.c
@@ -56,7 +56,7 @@ enum {
PA_DECLARE_CLASS(file_stream);
#define FILE_STREAM(o) (file_stream_cast(o))
-static PA_DEFINE_CHECK_TYPE(file_stream, file_stream_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(file_stream, pa_msgobject);
static void file_stream_free(pa_object *o) {
file_stream *u = FILE_STREAM(o);
@@ -85,7 +85,7 @@ static void file_stream_drop(file_stream *u) {
}
}
-static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
file_stream *u = FILE_STREAM(o);
file_stream_assert_ref(u);
@@ -154,7 +154,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
pa_memblock_unref(u->memchunk.memblock);
pa_memchunk_reset(&u->memchunk);
- pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, NULL, NULL);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, 0, NULL, NULL);
sf_close(u->sndfile);
u->sndfile = NULL;
@@ -224,7 +224,7 @@ int pa_play_file(
pa_assert(sink);
pa_assert(fname);
- u = pa_msgobject_new(file_stream, file_stream_check_type);
+ u = pa_msgobject_new(file_stream);
u->parent.parent.free = file_stream_free;
u->parent.process_msg = file_stream_process_msg;
u->core = sink->core;
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index ee76a6e0..c3ecf3a2 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -38,7 +38,7 @@
#include "source-output.h"
-static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_source_output, pa_msgobject);
static void source_output_free(pa_object* mo);
@@ -130,12 +130,12 @@ pa_source_output* pa_source_output_new(
data->resample_method = pa_resampler_get_method(resampler);
}
- o = pa_msgobject_new(pa_source_output, source_output_check_type);
+ o = pa_msgobject_new(pa_source_output);
o->parent.parent.free = source_output_free;
o->parent.process_msg = pa_source_output_process_msg;
o->core = core;
- o->state = PA_SOURCE_OUTPUT_RUNNING;
+ o->state = data->corked ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING;
o->flags = flags;
o->name = pa_xstrdup(data->name);
o->driver = pa_xstrdup(data->driver);
@@ -176,7 +176,7 @@ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t
if (o->state == state)
return 0;
- if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+ if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
return -1;
o->state = state;
@@ -187,7 +187,7 @@ void pa_source_output_disconnect(pa_source_output*o) {
pa_assert(o);
pa_return_if_fail(o->state != PA_SOURCE_OUTPUT_DISCONNECTED);
- pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
+ pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL);
pa_idxset_remove_by_data(o->source->outputs, o, NULL);
@@ -225,7 +225,7 @@ static void source_output_free(pa_object* mo) {
void pa_source_output_put(pa_source_output *o) {
pa_source_output_assert_ref(o);
- pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_source_output_unref);
+ pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), 0, NULL, (pa_free_cb_t) pa_source_output_unref);
pa_source_update_status(o->source);
pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
@@ -243,7 +243,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) {
pa_source_output_assert_ref(o);
- if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+ if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0)
r = 0;
if (o->get_latency)
@@ -293,7 +293,7 @@ int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) {
o->sample_spec.rate = rate;
- pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
+ pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), 0, NULL, NULL);
pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index);
return 0;
@@ -380,7 +380,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
/* return 0; */
}
-int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk* chunk) {
+int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk* chunk) {
pa_source_output *o = PA_SOURCE_OUTPUT(mo);
pa_source_output_assert_ref(o);
diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h
index 7b6afe81..9f982a9a 100644
--- a/src/pulsecore/source-output.h
+++ b/src/pulsecore/source-output.h
@@ -103,6 +103,8 @@ typedef struct pa_source_output_new_data {
int channel_map_is_set;
pa_resample_method_t resample_method;
+
+ int corked;
} pa_source_output_new_data;
pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data);
@@ -142,6 +144,6 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest);
/* To be used exclusively by the source driver thread */
void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk);
-int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
#endif
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 6ca81727..eaf1335e 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -42,7 +42,7 @@
#include "source.h"
-static PA_DEFINE_CHECK_TYPE(pa_source, source_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
static void source_free(pa_object *o);
@@ -73,7 +73,7 @@ pa_source* pa_source_new(
pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
pa_return_null_if_fail(pa_utf8_valid(name) && *name);
- s = pa_msgobject_new(pa_source, source_check_type);
+ s = pa_msgobject_new(pa_source);
if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) {
pa_xfree(s);
@@ -140,7 +140,7 @@ static int source_set_state(pa_source *s, pa_source_state_t state) {
if ((ret = s->set_state(s, state)) < 0)
return -1;
- if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+ if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
return -1;
s->state = state;
@@ -222,7 +222,7 @@ int pa_source_suspend(pa_source *s, int suspend) {
void pa_source_ping(pa_source *s) {
pa_source_assert_ref(s);
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, NULL, NULL);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, 0, NULL, NULL);
}
void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
@@ -266,7 +266,7 @@ pa_usec_t pa_source_get_latency(pa_source *s) {
if (s->get_latency)
return s->get_latency(s);
- if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+ if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
return 0;
return usec;
@@ -285,7 +285,7 @@ void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
s->set_volume = NULL;
if (!s->set_volume)
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
if (changed)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -301,7 +301,7 @@ const pa_cvolume *pa_source_get_volume(pa_source *s) {
s->get_volume = NULL;
if (!s->get_volume && s->refresh_volume)
- pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, NULL);
+ pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, 0, NULL);
if (!pa_cvolume_equal(&old_volume, &s->volume))
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -320,7 +320,7 @@ void pa_source_set_mute(pa_source *s, int mute) {
s->set_mute = NULL;
if (!s->set_mute)
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
if (changed)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -337,7 +337,7 @@ int pa_source_get_mute(pa_source *s) {
s->get_mute = NULL;
if (!s->get_mute && s->refresh_muted)
- pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, NULL);
+ pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, 0, NULL);
if (old_muted != s->muted)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -384,7 +384,7 @@ unsigned pa_source_used_by(pa_source *s) {
return pa_idxset_size(s->outputs);
}
-int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, pa_memchunk *chunk) {
+int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_source *s = PA_SOURCE(object);
pa_source_assert_ref(s);
diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h
index e2b02ceb..fe59e584 100644
--- a/src/pulsecore/source.h
+++ b/src/pulsecore/source.h
@@ -146,7 +146,7 @@ unsigned pa_source_used_by(pa_source *s);
/* To be used exclusively by the source driver thread */
void pa_source_post(pa_source*s, const pa_memchunk *b);
-int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa_memchunk *chunk);
static inline int PA_SOURCE_OPENED(pa_source_state_t x) {
return x == PA_SOURCE_RUNNING || x == PA_SOURCE_IDLE;
diff --git a/src/tests/asyncmsgq-test.c b/src/tests/asyncmsgq-test.c
index 847d5be1..baf93a0c 100644
--- a/src/tests/asyncmsgq-test.c
+++ b/src/tests/asyncmsgq-test.c
@@ -49,7 +49,7 @@ static void the_thread(void *_q) {
do {
int code = 0;
- pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, 1) == 0);
+ pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, NULL, 1) == 0);
switch (code) {
@@ -85,22 +85,22 @@ int main(int argc, char *argv[]) {
pa_assert_se(t = pa_thread_new(the_thread, q));
printf("Operation A post\n");
- pa_asyncmsgq_post(q, NULL, OPERATION_A, NULL, NULL, NULL);
+ pa_asyncmsgq_post(q, NULL, OPERATION_A, NULL, 0, NULL, NULL);
pa_thread_yield();
printf("Operation B post\n");
- pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, NULL, NULL);
+ pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, 0, NULL, NULL);
pa_thread_yield();
printf("Operation C send\n");
- pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, NULL);
+ pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, 0, NULL);
pa_thread_yield();
printf("Quit post\n");
- pa_asyncmsgq_post(q, NULL, QUIT, NULL, NULL, NULL);
+ pa_asyncmsgq_post(q, NULL, QUIT, NULL, 0, NULL, NULL);
pa_thread_free(t);