diff options
Diffstat (limited to 'src')
33 files changed, 906 insertions, 692 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index eab465c8..0a5d5297 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -283,14 +283,14 @@ flist_test_CFLAGS = $(AM_CFLAGS) flist_test_LDADD = $(AM_LDADD) libpulsecore.la flist_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS) -asyncq_test_SOURCES = tests/asyncq-test.c pulsecore/thread-posix.c pulsecore/thread.h pulsecore/asyncq.c pulsecore/asyncq.h pulsecore/core-util.c pulsecore/core-util.h pulse/xmalloc.c pulse/xmalloc.h pulsecore/log.h pulsecore/log.c pulsecore/core-error.h pulsecore/core-error.c pulsecore/once-posix.c pulsecore/once.h pulsecore/mutex-posix.c pulsecore/mutex.h pulse/utf8.c pulse/utf8.h pulse/util.h pulse/util.c +asyncq_test_SOURCES = tests/asyncq-test.c asyncq_test_CFLAGS = $(AM_CFLAGS) -asyncq_test_LDADD = $(AM_LDADD) #libpulsecore.la +asyncq_test_LDADD = $(AM_LDADD) libpulsecore.la asyncq_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS) -asyncmsgq_test_SOURCES = tests/asyncmsgq-test.c pulsecore/thread-posix.c pulsecore/thread.h pulsecore/asyncq.c pulsecore/asyncq.h pulsecore/asyncmsgq.c pulsecore/asyncmsgq.h pulsecore/core-util.c pulsecore/core-util.h pulse/xmalloc.c pulse/xmalloc.h pulsecore/log.h pulsecore/log.c pulsecore/core-error.h pulsecore/core-error.c pulsecore/once-posix.c pulsecore/once.h pulsecore/mutex-posix.c pulsecore/mutex.h pulse/utf8.c pulse/utf8.h pulse/util.h pulse/util.c pulsecore/semaphore.h pulsecore/semaphore-posix.c pulsecore/flist.h pulsecore/flist.c +asyncmsgq_test_SOURCES = tests/asyncmsgq-test.c asyncmsgq_test_CFLAGS = $(AM_CFLAGS) -asyncmsgq_test_LDADD = $(AM_LDADD) #libpulsecore.la +asyncmsgq_test_LDADD = $(AM_LDADD) libpulsecore.la asyncmsgq_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS) mcalign_test_SOURCES = tests/mcalign-test.c @@ -653,7 +653,10 @@ libpulsecore_la_SOURCES += \ pulsecore/hook-list.c pulsecore/hook-list.h \ pulsecore/shm.c pulsecore/shm.h \ pulsecore/flist.c pulsecore/flist.h \ - pulsecore/anotify.c pulsecore/anotify.h \ + pulsecore/asyncmsgq.c pulsecore/asyncmsgqq.h \ + pulsecore/asyncq.c pulsecore/asyncq.h \ + pulsecore/object.c pulsecore/object.h \ + pulsecore/msgobject.c pulsecore/msgobject.h \ $(PA_THREAD_OBJS) if OS_IS_WIN32 @@ -718,9 +721,10 @@ modlibexec_LTLIBRARIES = \ libauthkey-prop.la \ libstrlist.la \ libprotocol-simple.la \ - libprotocol-esound.la \ - libprotocol-native.la \ - libprotocol-http.la + libprotocol-http.la + +# libprotocol-esound.la +# libprotocol-native.la # We need to emulate sendmsg/recvmsg to support this on Win32 if !OS_IS_WIN32 @@ -870,6 +874,11 @@ modlibexec_LTLIBRARIES += \ module-cli-protocol-tcp.la \ module-simple-protocol-tcp.la \ module-null-sink.la + module-detect.la \ + module-volume-restore.la \ + module-rescue-streams.la \ + module-http-protocol-tcp.la + # module-esound-protocol-tcp.la \ # module-native-protocol-tcp.la \ # module-native-protocol-fd.la \ @@ -877,11 +886,7 @@ modlibexec_LTLIBRARIES += \ # module-combine.la \ # module-tunnel-sink.la \ # module-tunnel-source.la \ -# module-esound-sink.la \ -# module-http-protocol-tcp.la \ -# module-detect.la \ -# module-volume-restore.la \ -# module-rescue-streams.la +# module-esound-sink.la # See comment at librtp.la above #if !OS_IS_WIN32 @@ -894,9 +899,9 @@ if HAVE_AF_UNIX modlibexec_LTLIBRARIES += \ module-cli-protocol-unix.la \ module-simple-protocol-unix.la + module-http-protocol-unix.la # module-esound-protocol-unix.la \ -# module-native-protocol-unix.la \ -# module-http-protocol-unix.la +# module-native-protocol-unix.la endif if HAVE_MKFIFO @@ -1079,44 +1084,44 @@ module_http_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-h # Native protocol -module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c -module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS) -module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version -module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la +#module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c +#module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS) +#module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version +#module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la -module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c -module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS) -module_native_protocol_unix_la_LDFLAGS = -module -avoid-version -module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la +#module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c +#module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS) +#module_native_protocol_unix_la_LDFLAGS = -module -avoid-version +#module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la -module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c -module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS) -module_native_protocol_fd_la_LDFLAGS = -module -avoid-version -module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la +#module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c +#module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS) +#module_native_protocol_fd_la_LDFLAGS = -module -avoid-version +#module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la # EsounD protocol -module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c -module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) -module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version -module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la +#module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c +#module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) +#module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version +#module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la -module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c -module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) -module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version -module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la +#module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c +#module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) +#module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version +#module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la -module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c -module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version -module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la +#module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c +#module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version +#module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la -module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c -module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version -module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la +#module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c +#module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version +#module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la -module_esound_sink_la_SOURCES = modules/module-esound-sink.c -module_esound_sink_la_LDFLAGS = -module -avoid-version -module_esound_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libiochannel.la libsocket-client.la libauthkey.la +#module_esound_sink_la_SOURCES = modules/module-esound-sink.c +#module_esound_sink_la_LDFLAGS = -module -avoid-version +#module_esound_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libiochannel.la libsocket-client.la libauthkey.la # Pipes @@ -1140,22 +1145,22 @@ module_null_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la # Couplings -module_combine_la_SOURCES = modules/module-combine.c -module_combine_la_LDFLAGS = -module -avoid-version -module_combine_la_LIBADD = $(AM_LIBADD) libpulsecore.la +#module_combine_la_SOURCES = modules/module-combine.c +#module_combine_la_LDFLAGS = -module -avoid-version +#module_combine_la_LIBADD = $(AM_LIBADD) libpulsecore.la module_match_la_SOURCES = modules/module-match.c module_match_la_LDFLAGS = -module -avoid-version module_match_la_LIBADD = $(AM_LIBADD) libpulsecore.la -module_tunnel_sink_la_SOURCES = modules/module-tunnel.c -module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS) -module_tunnel_sink_la_LDFLAGS = -module -avoid-version -module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la +#module_tunnel_sink_la_SOURCES = modules/module-tunnel.c +#module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS) +#module_tunnel_sink_la_LDFLAGS = -module -avoid-version +#module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la -module_tunnel_source_la_SOURCES = modules/module-tunnel.c -module_tunnel_source_la_LDFLAGS = -module -avoid-version -module_tunnel_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la +#module_tunnel_source_la_SOURCES = modules/module-tunnel.c +#module_tunnel_source_la_LDFLAGS = -module -avoid-version +#module_tunnel_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la # X11 @@ -1171,34 +1176,34 @@ module_x11_publish_la_LIBADD = $(AM_LIBADD) $(X_PRE_LIBS) -lX11 $(X_LIBS) $(X_EX # OSS -liboss_util_la_SOURCES = modules/oss-util.c modules/oss-util.h -liboss_util_la_LDFLAGS = -avoid-version -liboss_util_la_LIBADD = libpulsecore.la +#liboss_util_la_SOURCES = modules/oss-util.c modules/oss-util.h +#liboss_util_la_LDFLAGS = -avoid-version +#liboss_util_la_LIBADD = libpulsecore.la -module_oss_la_SOURCES = modules/module-oss.c -module_oss_la_LDFLAGS = -module -avoid-version -module_oss_la_LIBADD = $(AM_LIBADD) libiochannel.la liboss-util.la +#module_oss_la_SOURCES = modules/module-oss.c +#module_oss_la_LDFLAGS = -module -avoid-version +#module_oss_la_LIBADD = $(AM_LIBADD) libiochannel.la liboss-util.la -module_oss_mmap_la_SOURCES = modules/module-oss-mmap.c -module_oss_mmap_la_LDFLAGS = -module -avoid-version -module_oss_mmap_la_LIBADD = $(AM_LIBADD) liboss-util.la libpulsecore.la +#module_oss_mmap_la_SOURCES = modules/module-oss-mmap.c +#module_oss_mmap_la_LDFLAGS = -module -avoid-version +#module_oss_mmap_la_LIBADD = $(AM_LIBADD) liboss-util.la libpulsecore.la # ALSA -libalsa_util_la_SOURCES = modules/alsa-util.c modules/alsa-util.h -libalsa_util_la_LDFLAGS = -avoid-version -libalsa_util_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libpulsecore.la -libalsa_util_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) +#libalsa_util_la_SOURCES = modules/alsa-util.c modules/alsa-util.h +#libalsa_util_la_LDFLAGS = -avoid-version +#libalsa_util_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libpulsecore.la +#libalsa_util_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) -module_alsa_sink_la_SOURCES = modules/module-alsa-sink.c -module_alsa_sink_la_LDFLAGS = -module -avoid-version -module_alsa_sink_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la -module_alsa_sink_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) +#module_alsa_sink_la_SOURCES = modules/module-alsa-sink.c +#module_alsa_sink_la_LDFLAGS = -module -avoid-version +#module_alsa_sink_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la +#module_alsa_sink_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) -module_alsa_source_la_SOURCES = modules/module-alsa-source.c -module_alsa_source_la_LDFLAGS = -module -avoid-version -module_alsa_source_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la -module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) +#module_alsa_source_la_SOURCES = modules/module-alsa-source.c +#module_alsa_source_la_LDFLAGS = -module -avoid-version +#module_alsa_source_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la +#module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) # Solaris @@ -1265,15 +1270,15 @@ module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) # JACK -module_jack_sink_la_SOURCES = modules/module-jack-sink.c -module_jack_sink_la_LDFLAGS = -module -avoid-version -module_jack_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS) -module_jack_sink_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS) +#module_jack_sink_la_SOURCES = modules/module-jack-sink.c +#module_jack_sink_la_LDFLAGS = -module -avoid-version +#module_jack_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS) +#module_jack_sink_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS) -module_jack_source_la_SOURCES = modules/module-jack-source.c -module_jack_source_la_LDFLAGS = -module -avoid-version -module_jack_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS) -module_jack_source_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS) +#module_jack_source_la_SOURCES = modules/module-jack-source.c +#module_jack_source_la_LDFLAGS = -module -avoid-version +#module_jack_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS) +#module_jack_source_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS) # HAL libdbus_util_la_SOURCES = modules/dbus-util.c modules/dbus-util.h diff --git a/src/daemon/main.c b/src/daemon/main.c index 2424efa7..a1926fe5 100644 --- a/src/daemon/main.c +++ b/src/daemon/main.c @@ -656,7 +656,7 @@ int main(int argc, char *argv[]) { pa_mainloop_get_api(mainloop)->time_free(timer); #endif - pa_core_free(c); + pa_core_unref(c); if (!conf->no_cpu_limit) pa_cpu_limit_done(); diff --git a/src/modules/module-lirc.c b/src/modules/module-lirc.c index c8adbc8b..452fa1f3 100644 --- a/src/modules/module-lirc.c +++ b/src/modules/module-lirc.c @@ -121,7 +121,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC pa_log("failed to get sink '%s'", u->sink_name); else { int i; - pa_cvolume cv = *pa_sink_get_volume(s, PA_MIXER_HARDWARE); + pa_cvolume cv = *pa_sink_get_volume(s); #define DELTA (PA_VOLUME_NORM/20) @@ -134,7 +134,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC cv.values[i] = PA_VOLUME_NORM; } - pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv); + pa_sink_set_volume(s, &cv); break; case DOWN: @@ -145,20 +145,20 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC cv.values[i] = PA_VOLUME_MUTED; } - pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv); + pa_sink_set_volume(s, &cv); break; case MUTE: - pa_sink_set_mute(s, PA_MIXER_HARDWARE, 0); + pa_sink_set_mute(s, 0); break; case RESET: - pa_sink_set_mute(s, PA_MIXER_HARDWARE, 1); + pa_sink_set_mute(s, 1); break; case MUTE_TOGGLE: - pa_sink_set_mute(s, PA_MIXER_HARDWARE, !pa_sink_get_mute(s, PA_MIXER_HARDWARE)); + pa_sink_set_mute(s, !pa_sink_get_mute(s)); break; case INVALID: diff --git a/src/modules/module-mmkbd-evdev.c b/src/modules/module-mmkbd-evdev.c index b7433ac8..919b399d 100644 --- a/src/modules/module-mmkbd-evdev.c +++ b/src/modules/module-mmkbd-evdev.c @@ -114,7 +114,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC pa_log("failed to get sink '%s'", u->sink_name); else { int i; - pa_cvolume cv = *pa_sink_get_volume(s, PA_MIXER_HARDWARE); + pa_cvolume cv = *pa_sink_get_volume(s); #define DELTA (PA_VOLUME_NORM/20) @@ -127,7 +127,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC cv.values[i] = PA_VOLUME_NORM; } - pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv); + pa_sink_set_volume(s, &cv); break; case DOWN: @@ -138,12 +138,12 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC cv.values[i] = PA_VOLUME_MUTED; } - pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv); + pa_sink_set_volume(s, &cv); break; case MUTE_TOGGLE: - pa_sink_set_mute(s, PA_MIXER_HARDWARE, !pa_sink_get_mute(s, PA_MIXER_HARDWARE)); + pa_sink_set_mute(s, !pa_sink_get_mute(s)); break; case INVALID: diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index ce3b29b0..afe130d9 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -33,17 +33,19 @@ #include <fcntl.h> #include <unistd.h> #include <limits.h> +#include <sys/poll.h> #include <pulse/timeval.h> #include <pulse/xmalloc.h> #include <pulsecore/macro.h> -#include <pulsecore/iochannel.h> #include <pulsecore/sink.h> #include <pulsecore/module.h> #include <pulsecore/core-util.h> +#include <pulsecore/core-error.h> #include <pulsecore/modargs.h> #include <pulsecore/log.h> +#include <pulsecore/thread.h> #include "module-null-sink-symdef.h" @@ -65,7 +67,9 @@ struct userdata { pa_module *module; pa_sink *sink; pa_thread *thread; + pa_asyncmsgq *asyncmsgq; size_t block_size; + struct timeval timestamp; }; @@ -79,85 +83,74 @@ static const char* const valid_modargs[] = { NULL }; +static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) { + struct userdata *u = PA_SINK(o)->userdata; + + switch (code) { + case PA_SINK_MESSAGE_SET_STATE: + + if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) + pa_gettimeofday(&u->timestamp); + + break; + + case PA_SINK_MESSAGE_GET_LATENCY: { + struct timeval now; + + pa_gettimeofday(&now); + + if (pa_timeval_cmp(&u->timestamp, &now) > 0) + *((pa_usec_t*) data) = 0; + else + *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now); + break; + } + } + + return pa_sink_process_msg(o, code, data, chunk); +} + static void thread_func(void *userdata) { struct userdata *u = userdata; - int quit = 0; struct pollfd pollfd; - int running = 1; pa_assert(u); pa_log_debug("Thread starting up"); + pa_gettimeofday(&u->timestamp); + memset(&pollfd, 0, sizeof(pollfd)); - pollfd.fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP); + pollfd.fd = pa_asyncmsgq_get_fd(u->asyncmsgq); pollfd.events = POLLIN; - pa_gettimeofday(u->timestamp); - for (;;) { + pa_msgobject *object; int code; - void *data, *object; + void *data; + pa_memchunk chunk; int r, timeout; struct timeval now; /* Check whether there is a message for us to process */ - if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) { - - - /* Now process these messages our own way */ - if (!object) { - - switch (code) { - case PA_MESSAGE_SHUTDOWN: - goto finish; - - default: - pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); - - } + if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) { + int ret; - } else if (object == u->sink) { - - switch (code) { - case PA_SINK_MESSAGE_STOP: - pa_assert(running); - running = 0; - break; - - case PA_SINK_MESSAGE_START: - pa_assert(!running); - running = 1; - - pa_gettimeofday(u->timestamp); - break; - - case PA_SINK_MESSAGE_GET_LATENCY: - - if (pa_timeval_cmp(&u->timestamp, &now) > 0) - *((pa_usec_t*) data) = 0; - else - *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now); - break; - - /* ... */ - - default: - pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); - } + if (!object && code == PA_MESSAGE_SHUTDOWN) { + pa_asyncmsgq_done(u->asyncmsgq, 0); + goto finish; } - - pa_asyncmsgq_done(u->sink->asyncmsgq); + + ret = pa_asyncmsgq_dispatch(object, code, data, &chunk); + pa_asyncmsgq_done(u->asyncmsgq, ret); continue; } /* Render some data and drop it immediately */ - - if (running) { + if (u->sink->thread_info.state == PA_SINK_RUNNING) { pa_gettimeofday(&now); - if (pa_timeval_cmp(u->timestamp, &now) <= 0) { - pa_memchunk chunk; + if (pa_timeval_cmp(&u->timestamp, &now) <= 0) { size_t l; if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) { @@ -179,11 +172,11 @@ static void thread_func(void *userdata) { /* Hmm, nothing to do. Let's sleep */ - if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0) + if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0) continue; r = poll(&pollfd, 1, timeout); - pa_asyncmsgq_after_poll(u->sink->asyncmsgq); + pa_asyncmsgq_after_poll(u->asyncmsgq); if (r < 0) { if (errno == EINTR) @@ -199,8 +192,8 @@ static void thread_func(void *userdata) { fail: /* We have to continue processing messages until we receive the * SHUTDOWN message */ - pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), NULL, pa_module_unref); - pa_asyncmsgq_wait_for(PA_MESSAGE_SHUTDOWN); + pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL); + pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN); finish: pa_log_debug("Thread shutting down"); @@ -231,20 +224,24 @@ int pa__init(pa_core *c, pa_module*m) { u->module = m; m->userdata = u; + pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0)); + if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) { pa_log("Failed to create sink."); goto fail; } + u->sink->parent.process_msg = sink_process_msg; u->sink->userdata = u; - pa_sink_set_owner(u->sink, m); + + pa_sink_set_module(u->sink, m); + pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq); pa_sink_set_description(u->sink, pa_modargs_get_value(ma, "description", "NULL sink")); u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */ - if (u->block_size <= 0) u->block_size = pa_frame_size(&ss); - + if (!(u->thread = pa_thread_new(thread_func, u))) { pa_log("Failed to create thread."); goto fail; @@ -272,14 +269,19 @@ void pa__done(pa_core *c, pa_module*m) { if (!(u = m->userdata)) return; - pa_sink_disconnect(u->sink); + if (u->sink) + pa_sink_disconnect(u->sink); if (u->thread) { - pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL); + pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL); pa_thread_free(u->thread); } - pa_sink_unref(u->sink); + if (u->asyncmsgq) + pa_asyncmsgq_free(u->asyncmsgq); + + if (u->sink) + pa_sink_unref(u->sink); pa_xfree(u); } diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index e4735f61..da9124a7 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -34,6 +34,8 @@ #include <fcntl.h> #include <unistd.h> #include <limits.h> +#include <sys/ioctl.h> +#include <sys/poll.h> #include <pulse/xmalloc.h> @@ -44,6 +46,7 @@ #include <pulsecore/core-util.h> #include <pulsecore/modargs.h> #include <pulsecore/log.h> +#include <pulsecore/thread.h> #include "module-pipe-sink-symdef.h" @@ -65,9 +68,12 @@ struct userdata { pa_core *core; pa_module *module; pa_sink *sink; + pa_thread *thread; + pa_asyncmsgq *asyncmsgq; char *filename; int fd; - pa_thread *thread; + + pa_memchunk memchunk; }; static const char* const valid_modargs[] = { @@ -80,109 +86,99 @@ static const char* const valid_modargs[] = { NULL }; -enum { - POLLFD_ASYNCQ, - POLLFD_FIFO, - POLLFD_MAX, -}; +static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) { + struct userdata *u = PA_SINK(o)->userdata; + + switch (code) { + + case PA_SINK_MESSAGE_GET_LATENCY: { + size_t n = 0; + int l; + + if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0) + n = (size_t) l; + + n += u->memchunk.length; + + *((pa_usec_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec); + break; + } + } + + return pa_sink_process_msg(o, code, data, chunk); +} static void thread_func(void *userdata) { + enum { + POLLFD_ASYNCQ, + POLLFD_FIFO, + POLLFD_MAX, + }; + struct userdata *u = userdata; - int quit = 0; struct pollfd pollfd[POLLFD_MAX]; - int running = 1, underrun = 0; - pa_memchunk memchunk; + int underrun = 0; + int write_type = 0; pa_assert(u); pa_log_debug("Thread starting up"); + pa_memchunk_reset(&u->memchunk); + memset(&pollfd, 0, sizeof(pollfd)); - pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP); + + pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->asyncmsgq); pollfd[POLLFD_ASYNCQ].events = POLLIN; - pollfd[POLLFD_FIFO].fd = u->fd; - memset(&memchunk, 0, sizeof(memchunk)); - for (;;) { + pa_msgobject *object; int code; - void *object, *data; + void *data; + pa_memchunk chunk; int r; - struct timeval now; /* Check whether there is a message for us to process */ - if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) { - - - /* Now process these messages our own way */ - if (!object) { - switch (code) { - case PA_SINK_MESSAGE_SHUTDOWN: - goto finish; - - default: - pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); - } + if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) { + int ret; - } else if (object == u->sink) { - - case PA_SINK_MESSAGE_STOP: - pa_assert(running); - running = 0; - break; - - case PA_SINK_MESSAGE_START: - pa_assert(!running); - running = 1; - break; - - case PA_SINK_MESSAGE_GET_LATENCY: { - size_t n = 0; - int l; - - if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0) - n = (size_t) l; - - n += memchunk.length; - - *((pa_usec_t*) data) pa_bytes_to_usec(n, &u->sink->sample_spec); - break; - } - - /* ... */ - - default: - pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); + if (!object && code == PA_MESSAGE_SHUTDOWN) { + pa_asyncmsgq_done(u->asyncmsgq, 0); + goto finish; } - pa_asyncmsgq_done(u->sink->asyncmsgq); + ret = pa_asyncmsgq_dispatch(object, code, data, &chunk); + pa_asyncmsgq_done(u->asyncmsgq, ret); continue; } /* Render some data and write it to the fifo */ - if (running && (pollfd[POLLFD_FIFO].revents || underrun)) { + if (u->sink->thread_info.state == PA_SINK_RUNNING && (pollfd[POLLFD_FIFO].revents || underrun)) { - if (chunk.length <= 0) - pa_sink_render(u->fd, PIPE_BUF, &chunk); + if (u->memchunk.length <= 0) + pa_sink_render(u->sink, PIPE_BUF, &u->memchunk); - underrun = chunk.length <= 0; + underrun = u->memchunk.length <= 0; if (!underrun) { ssize_t l; + void *p; p = pa_memblock_acquire(u->memchunk.memblock); - l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length); - pa_memblock_release(p); + l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type); + pa_memblock_release(u->memchunk.memblock); if (l < 0) { - if (errno != EINTR && errno != EAGAIN) { + if (errno == EINTR) + continue; + else if (errno != EAGAIN) { pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); goto fail; } - + } else { u->memchunk.index += l; @@ -190,24 +186,24 @@ static void thread_func(void *userdata) { if (u->memchunk.length <= 0) { pa_memblock_unref(u->memchunk.memblock); - u->memchunk.memblock = NULL; + pa_memchunk_reset(&u->memchunk); } - } - pollfd[POLLFD_FIFO].revents = 0; - continue; + pollfd[POLLFD_FIFO].revents = 0; + continue; + } } } - pollfd[POLLFD_FIFO].events = running && !underrun ? POLLOUT : 0; + pollfd[POLLFD_FIFO].events = (u->sink->thread_info.state == PA_SINK_RUNNING && !underrun) ? POLLOUT : 0; /* Hmm, nothing to do. Let's sleep */ - if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0) + if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0) continue; - r = poll(&pollfd, 1, 0); - pa_asyncmsgq_after_poll(u->sink->asyncmsgq); + r = poll(pollfd, POLLFD_MAX, -1); + pa_asyncmsgq_after_poll(u->asyncmsgq); if (r < 0) { if (errno == EINTR) @@ -217,19 +213,19 @@ static void thread_func(void *userdata) { goto fail; } - if (pollfd[POLLFD_FIFO].revents & ~POLLIN) { + if (pollfd[POLLFD_FIFO].revents & ~POLLOUT) { pa_log("FIFO shutdown."); goto fail; } - pa_assert(pollfd[POLLFD_ASYNCQ].revents & ~POLLIN == 0); + pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0); } fail: /* We have to continue processing messages until we receive the * SHUTDOWN message */ - pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), pa_module_unref); - pa_asyncmsgq_wait_for(PA_SINK_MESSAGE_SHUTDOWN); + pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL); + pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN); finish: pa_log_debug("Thread shutting down"); @@ -253,23 +249,22 @@ int pa__init(pa_core *c, pa_module*m) { ss = c->default_sample_spec; if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) { - pa_log("Invalid sample format specification"); + pa_log("Invalid sample format specification or channel map"); goto fail; } u = pa_xnew0(struct userdata, 1); u->core = c; u->module = m; - u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FIFO_NAME)); - u->fd = fd; - u->memchunk.memblock = NULL; - u->memchunk.length = 0; m->userdata = u; - mkfifo(u->filename, 0666); + pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0)); + + u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); - if ((u->fd = open(u->filename, O_RDWR)) < 0) { - pa_log("open('%s'): %s", p, pa_cstrerror(errno)); + mkfifo(u->filename, 0666); + if ((u->fd = open(u->filename, O_RDWR|O_NOCTTY)) < 0) { + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); goto fail; } @@ -277,12 +272,12 @@ int pa__init(pa_core *c, pa_module*m) { pa_make_nonblock_fd(u->fd); if (fstat(u->fd, &st) < 0) { - pa_log("fstat('%s'): %s", p, pa_cstrerror(errno)); + pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno)); goto fail; } if (!S_ISFIFO(st.st_mode)) { - pa_log("'%s' is not a FIFO.", p); + pa_log("'%s' is not a FIFO.", u->filename); goto fail; } @@ -291,9 +286,12 @@ int pa__init(pa_core *c, pa_module*m) { goto fail; } + u->sink->parent.process_msg = sink_process_msg; u->sink->userdata = u; - pa_sink_set_owner(u->sink, m); - pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", p)); + + pa_sink_set_module(u->sink, m); + pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq); + pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", u->filename)); pa_xfree(t); if (!(u->thread = pa_thread_new(thread_func, u))) { @@ -316,20 +314,26 @@ fail: void pa__done(pa_core *c, pa_module*m) { struct userdata *u; + pa_assert(c); pa_assert(m); if (!(u = m->userdata)) return; - pa_sink_disconnect(u->sink); + if (u->sink) + pa_sink_disconnect(u->sink); if (u->thread) { - pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL); + pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL); pa_thread_free(u->thread); } - pa_sink_unref(u->sink); + if (u->asyncmsgq) + pa_asyncmsgq_free(u->asyncmsgq); + + if (u->sink) + pa_sink_unref(u->sink); if (u->memchunk.memblock) pa_memblock_unref(u->memchunk.memblock); diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index f275c5d4..ac2bef7d 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -179,7 +179,7 @@ int pa__init(pa_core *c, pa_module*m) { goto fail; } u->source->userdata = u; - pa_source_set_owner(u->source, m); + pa_source_set_module(u->source, m); pa_source_set_description(u->source, t = pa_sprintf_malloc("Unix FIFO source '%s'", p)); pa_xfree(t); diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c index de5b2f9d..6becb629 100644 --- a/src/pulsecore/asyncmsgq.c +++ b/src/pulsecore/asyncmsgq.c @@ -48,6 +48,7 @@ struct asyncmsgq_item { pa_free_cb_t free_cb; pa_memchunk memchunk; pa_semaphore *semaphore; + int ret; }; struct pa_asyncmsgq { @@ -81,10 +82,10 @@ void pa_asyncmsgq_free(pa_asyncmsgq *a) { pa_msgobject_unref(i->object); if (i->memchunk.memblock) - pa_memblock_unref(i->object); + pa_memblock_unref(i->memchunk.memblock); - if (i->userdata_free_cb) - i->userdata_free_cb(i->userdata); + if (i->free_cb) + i->free_cb(i->userdata); if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0) pa_xfree(i); @@ -103,7 +104,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo i = pa_xnew(struct asyncmsgq_item, 1); i->code = code; - i->object = pa_msgobject_ref(object); + i->object = object ? pa_msgobject_ref(object) : NULL; i->userdata = (void*) userdata; i->free_cb = free_cb; if (chunk) { @@ -131,9 +132,9 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi i.ret = -1; if (chunk) { pa_assert(chunk->memblock); - i->memchunk = *chunk; + i.memchunk = *chunk; } else - pa_memchunk_reset(&i->memchunk); + pa_memchunk_reset(&i.memchunk); pa_assert_se(i.semaphore = pa_semaphore_new(0)); /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ @@ -161,8 +162,10 @@ int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **u if (object) *object = a->current->object; if (chunk) - *chunk = a->chunk; + *chunk = a->current->memchunk; + pa_log_debug("q=%p object=%p code=%i data=%p", a, a->current->object, a->current->code, a->current->userdata); + return 0; } @@ -196,11 +199,16 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) { pa_assert(a); do { + pa_msgobject *o; + void *data; + pa_memchunk chunk; + int ret; - if (pa_asyncmsgq_get(a, NULL, &c, NULL, 1) < 0) + if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0) return -1; - pa_asyncmsgq_done(a); + ret = pa_asyncmsgq_dispatch(o, c, data, &chunk); + pa_asyncmsgq_done(a, ret); } while (c != code); @@ -226,10 +234,9 @@ void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) { } int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) { - pa_assert(q); if (object) - return object->msg_process(object, code, userdata, memchunk); + return object->process_msg(object, code, userdata, memchunk); return 0; } diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c index 54d36dc0..da1f16fb 100644 --- a/src/pulsecore/asyncq.c +++ b/src/pulsecore/asyncq.c @@ -52,8 +52,8 @@ struct pa_asyncq { unsigned size; unsigned read_idx; unsigned write_idx; - pa_atomic_int_t read_waiting; - pa_atomic_int_t write_waiting; + pa_atomic_t read_waiting, n_read; + pa_atomic_t write_waiting, n_written; int read_fds[2], write_fds[2]; }; @@ -80,6 +80,8 @@ pa_asyncq *pa_asyncq_new(unsigned size) { l->size = size; pa_atomic_store(&l->read_waiting, 0); pa_atomic_store(&l->write_waiting, 0); + pa_atomic_store(&l->n_written, 0); + pa_atomic_store(&l->n_read, 0); if (pipe(l->read_fds) < 0) { pa_xfree(l); @@ -131,10 +133,26 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { - /* First try failed. Let's wait for changes. */ - - if (!wait) + if (!wait) { + /* Let's empty the FIFO from old notifications, before we return */ + + while (pa_atomic_load(&l->n_read) > 0) { + ssize_t r; + int x[20]; + + errno = 0; + if ((r = read(l->write_fds[0], x, sizeof(x))) <= 0 && errno != EINTR) + return -1; + + if (r > 0) + if (pa_atomic_sub(&l->n_read, r) <= r) + break; + } + return -1; + } + + /* First try failed. Let's wait for changes. */ _Y; @@ -142,6 +160,7 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { for (;;) { char x[20]; + ssize_t r; _Y; @@ -150,10 +169,13 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { _Y; - if (read(l->write_fds[0], x, sizeof(x)) < 0 && errno != EINTR) { + if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) { pa_atomic_dec(&l->write_waiting); return -1; } + + if (r > 0) + pa_atomic_sub(&l->n_read, r); } _Y; @@ -167,7 +189,8 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { if (pa_atomic_load(&l->read_waiting)) { char x = 'x'; _Y; - write(l->read_fds[1], &x, sizeof(x)); + if (write(l->read_fds[1], &x, sizeof(x)) > 0) + pa_atomic_inc(&l->n_written); } return 0; @@ -189,8 +212,24 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { /* First try failed. Let's wait for changes. */ - if (!wait) + if (!wait) { + /* Let's empty the FIFO from old notifications, before we return */ + + while (pa_atomic_load(&l->n_written) > 0) { + ssize_t r; + int x[20]; + + errno = 0; + if ((r = read(l->read_fds[0], x, sizeof(x))) <= 0 && errno != EINTR) + return NULL; + + if (r > 0) + if (pa_atomic_sub(&l->n_written, r) <= r) + break; + } + return NULL; + } _Y; @@ -198,6 +237,7 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { for (;;) { char x[20]; + ssize_t r; _Y; @@ -206,10 +246,13 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { _Y; - if (read(l->read_fds[0], x, sizeof(x)) < 0 && errno != EINTR) { + if ((r = read(l->read_fds[0], x, sizeof(x)) < 0) && errno != EINTR) { pa_atomic_dec(&l->read_waiting); return NULL; } + + if (r > 0) + pa_atomic_sub(&l->n_written, r); } _Y; @@ -226,7 +269,8 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { if (pa_atomic_load(&l->write_waiting)) { char x = 'x'; _Y; - write(l->write_fds[1], &x, sizeof(x)); + if (write(l->write_fds[1], &x, sizeof(x)) >= 0) + pa_atomic_inc(&l->n_read); } return ret; @@ -262,10 +306,13 @@ int pa_asyncq_before_poll(pa_asyncq *l) { return 0; } -int pa_asyncq_after_poll(pa_asyncq *l) { +void pa_asyncq_after_poll(pa_asyncq *l) { pa_assert(l); pa_assert(pa_atomic_load(&l->read_waiting) > 0); pa_atomic_dec(&l->read_waiting); + + + } diff --git a/src/pulsecore/asyncq.h b/src/pulsecore/asyncq.h index aac45b1d..729ec466 100644 --- a/src/pulsecore/asyncq.h +++ b/src/pulsecore/asyncq.h @@ -51,6 +51,6 @@ int pa_asyncq_push(pa_asyncq *q, void *p, int wait); int pa_asyncq_get_fd(pa_asyncq *q); int pa_asyncq_before_poll(pa_asyncq *a); -int pa_asyncq_after_poll(pa_asyncq *a); +void pa_asyncq_after_poll(pa_asyncq *a); #endif diff --git a/src/pulsecore/cli-command.c b/src/pulsecore/cli-command.c index 36c85d60..d7613530 100644 --- a/src/pulsecore/cli-command.c +++ b/src/pulsecore/cli-command.c @@ -115,6 +115,8 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); +static int pa_cli_command_suspend_sink(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); +static int pa_cli_command_suspend_source(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); /* A method table for all available commands */ @@ -134,11 +136,11 @@ static const struct command commands[] = { { "load-module", pa_cli_command_load, "Load a module (args: name, arguments)", 3}, { "unload-module", pa_cli_command_unload, "Unload a module (args: index)", 2}, { "set-sink-volume", pa_cli_command_sink_volume, "Set the volume of a sink (args: index|name, volume)", 3}, - { "set-sink-input-volume", pa_cli_command_sink_input_volume, "Set the volume of a sink input (args: index|name, volume)", 3}, + { "set-sink-input-volume", pa_cli_command_sink_input_volume, "Set the volume of a sink input (args: index, volume)", 3}, { "set-source-volume", pa_cli_command_source_volume, "Set the volume of a source (args: index|name, volume)", 3}, - { "set-sink-mute", pa_cli_command_sink_mute, "Set the mute switch of a sink (args: index|name, mute)", 3}, - { "set-sink-input-mute", pa_cli_command_sink_input_mute, "Set the mute switch of a sink input (args: index|name, mute)", 3}, - { "set-source-mute", pa_cli_command_source_mute, "Set the mute switch of a source (args: index|name, mute)", 3}, + { "set-sink-mute", pa_cli_command_sink_mute, "Set the mute switch of a sink (args: index|name, bool)", 3}, + { "set-sink-input-mute", pa_cli_command_sink_input_mute, "Set the mute switch of a sink input (args: index, bool)", 3}, + { "set-source-mute", pa_cli_command_source_mute, "Set the mute switch of a source (args: index|name, bool)", 3}, { "set-default-sink", pa_cli_command_sink_default, "Set the default sink (args: index|name)", 2}, { "set-default-source", pa_cli_command_source_default, "Set the default source (args: index|name)", 2}, { "kill-client", pa_cli_command_kill_client, "Kill a client (args: index)", 2}, @@ -161,6 +163,8 @@ static const struct command commands[] = { { "move-sink-input", pa_cli_command_move_sink_input, "Move sink input to another sink (args: index, sink)", 3}, { "move-source-output", pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3}, { "vacuum", pa_cli_command_vacuum, NULL, 1}, + { "suspend-sink", pa_cli_command_suspend_sink, "Suspend sink (args: index|name, bool)", 3}, + { "suspend-source", pa_cli_command_suspend_source, "Suspend source (args: index|name, bool)", 3}, { NULL, NULL, NULL, 0 } }; @@ -899,6 +903,64 @@ static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_str return 0; } +static int pa_cli_command_suspend_sink(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) { + const char *n, *m; + pa_sink *sink; + int suspend; + + if (!(n = pa_tokenizer_get(t, 1))) { + pa_strbuf_puts(buf, "You need to specify a sink either by its name or its index.\n"); + return -1; + } + + if (!(m = pa_tokenizer_get(t, 2))) { + pa_strbuf_puts(buf, "You need to specify a suspend switch setting (0/1).\n"); + return -1; + } + + if (pa_atoi(m, &suspend) < 0) { + pa_strbuf_puts(buf, "Failed to parse suspend switch.\n"); + return -1; + } + + if (!(sink = pa_namereg_get(c, n, PA_NAMEREG_SINK, 1))) { + pa_strbuf_puts(buf, "No sink found by this name or index.\n"); + return -1; + } + + pa_sink_suspend(sink, suspend); + return 0; +} + +static int pa_cli_command_suspend_source(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) { + const char *n, *m; + pa_source *source; + int suspend; + + if (!(n = pa_tokenizer_get(t, 1))) { + pa_strbuf_puts(buf, "You need to specify a source either by its name or its index.\n"); + return -1; + } + + if (!(m = pa_tokenizer_get(t, 2))) { + pa_strbuf_puts(buf, "You need to specify a suspend switch setting (0/1).\n"); + return -1; + } + + if (pa_atoi(m, &suspend) < 0) { + pa_strbuf_puts(buf, "Failed to parse suspend switch.\n"); + return -1; + } + + if (!(source = pa_namereg_get(c, n, PA_NAMEREG_SOURCE, 1))) { + pa_strbuf_puts(buf, "No source found by this name or index.\n"); + return -1; + } + + pa_source_suspend(source, suspend); + return 0; +} + static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) { pa_module *m; pa_sink *sink; @@ -1162,3 +1224,4 @@ int pa_cli_command_execute(pa_core *c, const char *s, pa_strbuf *buf, int *fail) return 0; } + diff --git a/src/pulsecore/cli-text.c b/src/pulsecore/cli-text.c index 05d681e3..c919e46d 100644 --- a/src/pulsecore/cli-text.c +++ b/src/pulsecore/cli-text.c @@ -114,14 +114,15 @@ char *pa_sink_list_to_string(pa_core *c) { " %c index: %u\n" "\tname: <%s>\n" "\tdriver: <%s>\n" - "\tis_hardware: <%i>\n" + "\tis hardware: <%i>\n" "\tstate: %s\n" "\tvolume: <%s>\n" "\tmute: <%i>\n" "\tlatency: <%0.0f usec>\n" - "\tmonitor_source: <%u>\n" + "\tmonitor source: <%u>\n" "\tsample spec: <%s>\n" - "\tchannel map: <%s>\n", + "\tchannel map: <%s>\n" + "\tused by: <%u>\n", c->default_sink_name && !strcmp(sink->name, c->default_sink_name) ? '*' : ' ', sink->index, sink->name, @@ -133,7 +134,8 @@ char *pa_sink_list_to_string(pa_core *c) { (double) pa_sink_get_latency(sink), sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX, pa_sample_spec_snprint(ss, sizeof(ss), &sink->sample_spec), - pa_channel_map_snprint(cm, sizeof(cm), &sink->channel_map)); + pa_channel_map_snprint(cm, sizeof(cm), &sink->channel_map), + pa_sink_used_by(sink)); if (sink->module) pa_strbuf_printf(s, "\tmodule: <%u>\n", sink->module->index); @@ -170,13 +172,14 @@ char *pa_source_list_to_string(pa_core *c) { " %c index: %u\n" "\tname: <%s>\n" "\tdriver: <%s>\n" - "\tis_hardware: <%i>\n" + "\tis hardware: <%i>\n" "\tstate: %s\n" "\tvolume: <%s>\n" "\tmute: <%u>\n" "\tlatency: <%0.0f usec>\n" "\tsample spec: <%s>\n" - "\tchannel map: <%s>\n", + "\tchannel map: <%s>\n" + "\tused by: <%u>\n", c->default_source_name && !strcmp(source->name, c->default_source_name) ? '*' : ' ', source->index, source->name, @@ -187,7 +190,8 @@ char *pa_source_list_to_string(pa_core *c) { !!pa_source_get_mute(source), (double) pa_source_get_latency(source), pa_sample_spec_snprint(ss, sizeof(ss), &source->sample_spec), - pa_channel_map_snprint(cm, sizeof(cm), &source->channel_map)); + pa_channel_map_snprint(cm, sizeof(cm), &source->channel_map), + pa_source_used_by(source)); if (source->monitor_of) pa_strbuf_printf(s, "\tmonitor_of: <%u>\n", source->monitor_of->index); diff --git a/src/pulsecore/core-subscribe.c b/src/pulsecore/core-subscribe.c index 6608d57a..288d1078 100644 --- a/src/pulsecore/core-subscribe.c +++ b/src/pulsecore/core-subscribe.c @@ -207,7 +207,7 @@ static void sched_event(pa_core *c) { } /* Append a new subscription event to the subscription event queue and schedule a main loop event */ -void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t index) { +void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t idx) { pa_subscription_event *e; assert(c); @@ -227,7 +227,7 @@ void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t i continue; /* not the same object */ - if (i->index != index) + if (i->index != idx) continue; if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) { @@ -253,7 +253,7 @@ void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t i e = pa_xnew(pa_subscription_event, 1); e->core = c; e->type = t; - e->index = index; + e->index = idx; PA_LLIST_INSERT_AFTER(pa_subscription_event, c->subscription_event_queue, c->subscription_event_last, e); c->subscription_event_last = e; diff --git a/src/pulsecore/core.c b/src/pulsecore/core.c index 99ac74e1..a940bfc0 100644 --- a/src/pulsecore/core.c +++ b/src/pulsecore/core.c @@ -49,6 +49,8 @@ #include "core.h" +static PA_DEFINE_CHECK_TYPE(pa_core, core_check_type, pa_msgobject_check_type); + static int core_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) { pa_core *c = PA_CORE(o); @@ -81,8 +83,10 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even /* Check whether there is a message for us to process */ while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) { - pa_asyncmsgq_dispatch(object, code, data, &chunk); - pa_asyncmsgq_done(c->asyncmsgq, 0); + int ret; + + ret = pa_asyncmsgq_dispatch(object, code, data, &chunk); + pa_asyncmsgq_done(c->asyncmsgq, ret); } if (pa_asyncmsgq_before_poll(c->asyncmsgq) == 0) @@ -112,7 +116,7 @@ pa_core* pa_core_new(pa_mainloop_api *m, int shared) { } } - c = pa_msgobject_new(pa_core); + c = pa_msgobject_new(pa_core, core_check_type); c->parent.parent.free = core_free; c->parent.process_msg = core_process_msg; @@ -181,7 +185,7 @@ pa_core* pa_core_new(pa_mainloop_api *m, int shared) { static void core_free(pa_object *o) { pa_core *c = PA_CORE(o); - pa_core_assert_ref(c); + pa_assert(c); pa_module_unload_all(c); assert(!c->modules); @@ -212,13 +216,14 @@ static void core_free(pa_object *o) { pa_xfree(c->default_source_name); pa_xfree(c->default_sink_name); + pa_asyncmsgq_after_poll(c->asyncmsgq); + pa_asyncmsgq_free(c->asyncmsgq); + pa_mempool_free(c->mempool); pa_property_cleanup(c); c->mainloop->io_free(c->asyncmsgq_event); - pa_asyncmsgq_after_poll(c->asyncmsgq); - pa_asyncmsgq_free(c->asyncmsgq); pa_hook_free(&c->hook_sink_input_new); pa_hook_free(&c->hook_sink_disconnect); diff --git a/src/pulsecore/core.h b/src/pulsecore/core.h index 86660b7a..a64f2179 100644 --- a/src/pulsecore/core.h +++ b/src/pulsecore/core.h @@ -98,7 +98,7 @@ struct pa_core { }; PA_DECLARE_CLASS(pa_core); -#define PA_CORE(o) ((pa_core*) o) +#define PA_CORE(o) pa_core_cast(o) enum { PA_CORE_MESSAGE_UNLOAD_MODULE, diff --git a/src/pulsecore/log.c b/src/pulsecore/log.c index 0033adb9..a1197eb5 100644 --- a/src/pulsecore/log.c +++ b/src/pulsecore/log.c @@ -71,14 +71,11 @@ static const char level_to_char[] = { }; void pa_log_set_ident(const char *p) { - if (log_ident) - pa_xfree(log_ident); - if (log_ident_local) - pa_xfree(log_ident_local); + pa_xfree(log_ident); + pa_xfree(log_ident_local); log_ident = pa_xstrdup(p); - log_ident_local = pa_utf8_to_locale(log_ident); - if (!log_ident_local) + if (!(log_ident_local = pa_utf8_to_locale(log_ident))) log_ident_local = pa_xstrdup(log_ident); } diff --git a/src/pulsecore/msgobject.c b/src/pulsecore/msgobject.c index ce9f22f2..6db630c5 100644 --- a/src/pulsecore/msgobject.c +++ b/src/pulsecore/msgobject.c @@ -28,13 +28,15 @@ #include "msgobject.h" -pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name) { +PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_msgobject_check_type, pa_object_check_type); + +pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) { pa_msgobject *o; pa_assert(size > sizeof(pa_msgobject)); pa_assert(type_name); - o = PA_MSGOBJECT(pa_object_new_internal(size, type_name)); + o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type ? check_type : pa_msgobject_check_type)); o->process_msg = NULL; return o; } diff --git a/src/pulsecore/msgobject.h b/src/pulsecore/msgobject.h index 317ebd20..65761aea 100644 --- a/src/pulsecore/msgobject.h +++ b/src/pulsecore/msgobject.h @@ -40,12 +40,14 @@ struct pa_msgobject { int (*process_msg)(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk); }; -pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name); +pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)); -#define pa_msgobject_new(type) ((type*) pa_msgobject_new_internal(sizeof(type), #type)) +int pa_msgobject_check_type(pa_object *o, const char *type); + +#define pa_msgobject_new(type, check_type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, check_type)) #define pa_msgobject_free ((void (*) (pa_msgobject* o)) pa_object_free) -#define PA_MSGOBJECT(o) ((pa_msgobject*) (o)) +#define PA_MSGOBJECT(o) pa_msgobject_cast(o) PA_DECLARE_CLASS(pa_msgobject); diff --git a/src/pulsecore/object.c b/src/pulsecore/object.c index e6ed53b2..a983c5ae 100644 --- a/src/pulsecore/object.c +++ b/src/pulsecore/object.c @@ -28,7 +28,7 @@ #include "object.h" -pa_object *pa_object_new_internal(size_t size, const char *type_name) { +pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) { pa_object *o; pa_assert(size > sizeof(pa_object)); @@ -38,24 +38,30 @@ pa_object *pa_object_new_internal(size_t size, const char *type_name) { PA_REFCNT_INIT(o); o->type_name = type_name; o->free = pa_object_free; + o->check_type = check_type ? check_type : pa_object_check_type; return o; } pa_object *pa_object_ref(pa_object *o) { - pa_assert(o); - pa_assert(PA_REFCNT_VALUE(o) >= 1); + pa_object_assert_ref(o); PA_REFCNT_INC(o); return o; } void pa_object_unref(pa_object *o) { - pa_assert(o); - pa_assert(PA_REFCNT_VALUE(o) >= 1); + pa_object_assert_ref(o); if (PA_REFCNT_DEC(o) <= 0) { pa_assert(o->free); o->free(o); } } + +int pa_object_check_type(pa_object *o, const char *type_name) { + pa_assert(o); + pa_assert(type_name); + + return type_name == "pa_object" || strcmp(type_name, "pa_object") == 0; +} diff --git a/src/pulsecore/object.h b/src/pulsecore/object.h index e195a359..270f289d 100644 --- a/src/pulsecore/object.h +++ b/src/pulsecore/object.h @@ -25,7 +25,9 @@ USA. ***/ +#include <string.h> #include <sys/types.h> + #include <pulse/xmalloc.h> #include <pulsecore/refcnt.h> #include <pulsecore/macro.h> @@ -36,13 +38,22 @@ struct pa_object { PA_REFCNT_DECLARE; const char *type_name; void (*free)(pa_object *o); + int (*check_type)(pa_object *o, const char *type_name); }; -pa_object *pa_object_new_internal(size_t size, const char *type_name); -#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), #type)) +pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)); +#define pa_object_new(type, check_type) ((type*) pa_object_new_internal(sizeof(type), #type, check_type) #define pa_object_free ((void (*) (pa_object* o)) pa_xfree) +int pa_object_check_type(pa_object *o, const char *type); + +static inline int pa_object_isinstance(void *o) { + pa_object *obj = (pa_object*) o; + pa_assert(obj); + return obj->check_type(obj, "pa_object"); +} + pa_object *pa_object_ref(pa_object *o); void pa_object_unref(pa_object *o); @@ -50,23 +61,50 @@ static inline int pa_object_refcnt(pa_object *o) { return o ? PA_REFCNT_VALUE(o) : 0; } +static inline pa_object* pa_object_cast(void *o) { + pa_object *obj = (pa_object*) o; + pa_assert(obj->check_type(obj, "pa_object")); + return obj; +} + #define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o)) -#define PA_OBJECT(o) ((pa_object*) (o)) - -#define PA_DECLARE_CLASS(c) \ - static inline c* c##_ref(c *o) { \ - return (c*) pa_object_ref(PA_OBJECT(o)); \ - } \ - static inline void c##_unref(c* o) { \ - pa_object_unref(PA_OBJECT(o)); \ - } \ - static inline int c##_refcnt(c* o) { \ - return pa_object_refcnt(PA_OBJECT(o)); \ - } \ - static inline void c##_assert_ref(c *o) { \ - pa_object_assert_ref(PA_OBJECT(o)); \ - } \ +#define PA_OBJECT(o) pa_object_cast(o) + +#define PA_DECLARE_CLASS(c) \ + static inline int c##_isinstance(void *o) { \ + pa_object *obj = (pa_object*) o; \ + pa_assert(obj); \ + return obj->check_type(obj, #c); \ + } \ + static inline c* c##_cast(void *o) { \ + pa_assert(c##_isinstance(o)); \ + return (c*) o; \ + } \ + static inline c* c##_ref(c *o) { \ + return (c*) pa_object_ref(PA_OBJECT(o)); \ + } \ + static inline void c##_unref(c* o) { \ + pa_object_unref(PA_OBJECT(o)); \ + } \ + static inline int c##_refcnt(c* o) { \ + return pa_object_refcnt(PA_OBJECT(o)); \ + } \ + static inline void c##_assert_ref(c *o) { \ + pa_object_assert_ref(PA_OBJECT(o)); \ + } \ + struct __stupid_useless_struct_to_allow_trailing_semicolon + +#define PA_DEFINE_CHECK_TYPE(c, func, parent) \ + int func(pa_object *o, const char *type) { \ + pa_assert(o); \ + pa_assert(type); \ + if (type == #c || \ + strcmp(type, #c) == 0) \ + return 1; \ + return parent(o, type); \ + } \ struct __stupid_useless_struct_to_allow_trailing_semicolon + #endif diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index b7a4cc78..67741bde 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -40,13 +40,15 @@ #include <pulsecore/namereg.h> #include <pulsecore/log.h> #include <pulsecore/core-error.h> +#include <pulsecore/atomic.h> #include "protocol-simple.h" /* Don't allow more than this many concurrent connections */ #define MAX_CONNECTIONS 10 -struct connection { +typedef struct connection { + pa_msgobject parent; pa_protocol_simple *protocol; pa_iochannel *io; pa_sink_input *sink_input; @@ -59,18 +61,21 @@ struct connection { struct { pa_memblock *current_memblock; size_t memblock_index, fragment_size; - pa_atomic_int missing; + pa_atomic_t missing; } playback; -}; +} connection; + +PA_DECLARE_CLASS(connection); +#define CONNECTION(o) (connection_cast(o)) +static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type); + struct pa_protocol_simple { pa_module *module; pa_core *core; pa_socket_server*server; pa_idxset *connections; - pa_asyncmsgq *asyncmsgq; - enum { RECORD = 1, PLAYBACK = 2, @@ -86,8 +91,9 @@ enum { }; enum { - MESSAGE_REQUEST_DATA, /* data from source output to main loop */ - MESSAGE_POST_DATA /* data from source output to main loop */ + MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */ + MESSAGE_POST_DATA, /* data from source output to main loop */ + MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */ }; @@ -96,37 +102,49 @@ enum { #define RECORD_BUFFER_SECONDS (5) #define RECORD_BUFFER_FRAGMENTS (100) -static void connection_free(struct connection *c) { +static void connection_free(pa_object *o) { + connection *c = CONNECTION(o); pa_assert(c); + if (c->playback.current_memblock) + pa_memblock_unref(c->playback.current_memblock); + + if (c->io) + pa_iochannel_free(c->io); + if (c->input_memblockq) + pa_memblockq_free(c->input_memblockq); + if (c->output_memblockq) + pa_memblockq_free(c->output_memblockq); + + pa_xfree(c); +} + +static void connection_drop(connection *c) { + pa_assert(c); + pa_idxset_remove_by_data(c->protocol->connections, c, NULL); if (c->sink_input) { pa_sink_input_disconnect(c->sink_input); pa_sink_input_unref(c->sink_input); + c->sink_input = NULL; } if (c->source_output) { pa_source_output_disconnect(c->source_output); pa_source_output_unref(c->source_output); + c->source_output = NULL; } - if (c->playback.current_memblock) - pa_memblock_unref(c->playback.current_memblock); - - if (c->client) + if (c->client) { pa_client_free(c->client); - if (c->io) - pa_iochannel_free(c->io); - if (c->input_memblockq) - pa_memblockq_free(c->input_memblockq); - if (c->output_memblockq) - pa_memblockq_free(c->output_memblockq); + c->client = NULL; + } - pa_xfree(c); + connection_unref(c); } -static int do_read(struct connection *c) { +static int do_read(connection *c) { pa_memchunk chunk; ssize_t r; size_t l; @@ -171,17 +189,17 @@ static int do_read(struct connection *c) { c->playback.memblock_index += r; - pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_POST_DATA, NULL, &chunk, NULL, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL); return 0; } -static int do_write(struct connection *c) { +static int do_write(connection *c) { pa_memchunk chunk; ssize_t r; void *p; - p_assert(c); + pa_assert(c); if (!c->source_output) return 0; @@ -212,7 +230,7 @@ static int do_write(struct connection *c) { return 0; } -static void do_work(struct connection *c) { +static void do_work(connection *c) { pa_assert(c); if (c->dead) @@ -243,14 +261,39 @@ fail: pa_memblockq_prebuf_disable(c->input_memblockq); } else - connection_free(c); + connection_drop(c); +} + +static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) { + connection *c = CONNECTION(o); + + connection_assert_ref(c); + + switch (code) { + case MESSAGE_REQUEST_DATA: + do_work(c); + break; + + case MESSAGE_POST_DATA: + pa_memblockq_push(c->output_memblockq, chunk); + do_work(c); + break; + + case MESSAGE_DROP_CONNECTION: + connection_drop(c); + break; + + } + + return 0; } /*** sink_input callbacks ***/ /* Called from thread context */ -static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, const pa_memchunk *chunk) { - struct connection*c; +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) { + pa_sink_input *i = PA_SINK_INPUT(o); + connection*c; pa_assert(i); c = i->userdata; @@ -263,6 +306,8 @@ static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, co /* New data from the main loop */ pa_memblockq_push_align(c->input_memblockq, chunk); + pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); + return 0; } @@ -276,13 +321,14 @@ static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, co } default: - return pa_sink_input_process_msg(i, code, userdata); + return pa_sink_input_process_msg(o, code, userdata, chunk); } } /* Called from thread context */ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { - struct connection*c; + connection*c; + int r; pa_assert(i); c = i->userdata; @@ -292,14 +338,14 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { r = pa_memblockq_peek(c->input_memblockq, chunk); if (c->dead && r < 0) - connection_free(c); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL); return r; } /* Called from thread context */ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { - struct connection*c = i->userdata; + connection*c = i->userdata; size_t old, new; pa_assert(i); @@ -310,10 +356,10 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_ pa_memblockq_drop(c->input_memblockq, chunk, length); new = pa_memblockq_missing(c->input_memblockq); - pa_atomic_store(&c->playback.missing, &new); + pa_atomic_store(&c->playback.missing, new); if (new > old) - pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, NULL, NULL, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL); } /* Called from main context */ @@ -321,34 +367,34 @@ static void sink_input_kill_cb(pa_sink_input *i) { pa_assert(i); pa_assert(i->userdata); - connection_free((struct connection *) i->userdata); + connection_drop((connection *) i->userdata); } /*** source_output callbacks ***/ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { - struct connection *c; + connection *c; pa_assert(o); c = o->userdata; pa_assert(c); pa_assert(chunk); - pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, chunk, NULL, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL); } static void source_output_kill_cb(pa_source_output *o) { - struct connection*c; + connection*c; pa_assert(o); c = o->userdata; pa_assert(c); - connection_free(c); + connection_drop(c); } static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { - struct connection*c; + connection*c; pa_assert(o); c = o->userdata; @@ -360,19 +406,19 @@ static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { /*** client callbacks ***/ static void client_kill_cb(pa_client *client) { - struct connection*c; + connection*c; pa_assert(client); c = client->userdata; pa_assert(c); - connection_free(client); + connection_drop(c); } /*** pa_iochannel callbacks ***/ static void io_callback(pa_iochannel*io, void *userdata) { - struct connection *c = userdata; + connection *c = userdata; pa_assert(io); pa_assert(c); @@ -384,7 +430,7 @@ static void io_callback(pa_iochannel*io, void *userdata) { static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) { pa_protocol_simple *p = userdata; - struct connection *c = NULL; + connection *c = NULL; char cname[256]; pa_assert(s); @@ -397,7 +443,9 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) return; } - c = pa_xnew(struct connection, 1); + c = pa_msgobject_new(connection, connection_check_type); + c->parent.parent.free = connection_free; + c->parent.process_msg = connection_process_msg; c->io = io; c->sink_input = NULL; c->source_output = NULL; @@ -415,7 +463,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->client->kill = client_kill_cb; c->client->userdata = c; - if (p->mode & PLAYBACK) { pa_sink_input_new_data data; size_t l; @@ -432,10 +479,10 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) goto fail; } + c->sink_input->parent.process_msg = sink_input_process_msg; c->sink_input->peek = sink_input_peek_cb; c->sink_input->drop = sink_input_drop_cb; c->sink_input->kill = sink_input_kill_cb; - c->sink_input->get_latency = sink_input_get_latency_cb; c->sink_input->userdata = c; l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); @@ -449,7 +496,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) NULL); pa_assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); - c->playback.fragment_size = l/10; + c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); @@ -498,47 +545,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) fail: if (c) - connection_free(c); -} - -static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { - pa_protocol_simple *p = userdata; - int do_some_work = 0; - - pa_assert(pa_asyncmsgq_get_fd(p->asyncmsgq) == fd); - pa_assert(events == PA_IO_EVENT_INPUT); - - pa_asyncmsgq_after_poll(p->asyncmsgq); - - for (;;) { - int code; - void *object, *data; - - /* Check whether there is a message for us to process */ - while (pa_asyncmsgq_get(p->asyncmsgq, &object, &code, &data) == 0) { - - connection *c = object; - - pa_assert(c); - - switch (code) { - - case MESSAGE_REQUEST_DATA: - do_work(c); - break; - - case MESSAGE_POST_DATA: - pa_memblockq_push(c->output_memblockq, chunk); - do_work(c); - break; - } - - pa_asyncmsgq_done(p->asyncmsgq); - } - - if (pa_asyncmsgq_before_poll(p->asyncmsgq) == 0) - break; - } + connection_drop(c); } pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { @@ -554,7 +561,6 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv p->core = core; p->server = server; p->connections = pa_idxset_new(NULL, NULL); - pa_assert_se(p->asyncmsgq = pa_asyncmsgq_new(0)); p->sample_spec = core->default_sample_spec; if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) { @@ -586,9 +592,6 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv pa_socket_server_set_callback(p->server, on_connection, p); - pa_assert_se(pa_asyncmsgq_before_poll(p->asyncmsgq) == 0); - pa_assert_se(p->asyncmsgq_event = core->mainloop->io_event_new(core->mainloop, pa_asyncmsgq_get_fd(p->asyncmsgq), PA_IO_EVENT_INPUT, p)); - return p; fail: @@ -600,12 +603,12 @@ fail: void pa_protocol_simple_free(pa_protocol_simple *p) { - struct connection *c; + connection *c; pa_assert(p); if (p->connections) { while((c = pa_idxset_first(p->connections, NULL))) - connection_free(c); + connection_drop(c); pa_idxset_free(p->connections, NULL, NULL); } @@ -613,12 +616,6 @@ void pa_protocol_simple_free(pa_protocol_simple *p) { if (p->server) pa_socket_server_unref(p->server); - if (p->asyncmsgq) { - c->mainloop->io_event_free(c->asyncmsgq_event); - pa_asyncmsgq_after_poll(c->asyncmsgq); - pa_asyncmsgq_free(p->asyncmsgq); - } - pa_xfree(p); } diff --git a/src/pulsecore/resampler.c b/src/pulsecore/resampler.c index 248d7337..a43c7c7c 100644 --- a/src/pulsecore/resampler.c +++ b/src/pulsecore/resampler.c @@ -48,6 +48,7 @@ struct pa_resampler { void (*impl_free)(pa_resampler *r); void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate); + void (*impl_update_output_rate)(pa_resampler *r, uint32_t rate); void (*impl_run)(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out); void *impl_data; }; @@ -165,6 +166,19 @@ void pa_resampler_set_input_rate(pa_resampler *r, uint32_t rate) { r->impl_update_input_rate(r, rate); } +void pa_resampler_set_output_rate(pa_resampler *r, uint32_t rate) { + assert(r); + assert(rate > 0); + + if (r->o_ss.rate == rate) + return; + + r->o_ss.rate = rate; + + if (r->impl_update_output_rate) + r->impl_update_output_rate(r, rate); +} + void pa_resampler_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out) { assert(r && in && out && r->impl_run); @@ -512,6 +526,25 @@ static void libsamplerate_update_input_rate(pa_resampler *r, uint32_t rate) { } } + +static void libsamplerate_update_output_rate(pa_resampler *r, uint32_t rate) { + struct impl_libsamplerate *u; + + assert(r); + assert(rate > 0); + assert(r->impl_data); + u = r->impl_data; + + if (!u->src_state) { + int err; + u->src_state = src_new(r->resample_method, r->o_ss.channels, &err); + assert(u->src_state); + } else { + int ret = src_set_ratio(u->src_state, (double) rate / r->i_ss.rate); + assert(ret == 0); + } +} + static int libsamplerate_init(pa_resampler *r) { struct impl_libsamplerate *u = NULL; int err; @@ -541,6 +574,7 @@ static int libsamplerate_init(pa_resampler *r) { r->impl_free = libsamplerate_free; r->impl_update_input_rate = libsamplerate_update_input_rate; + r->impl_update_output_rate = libsamplerate_update_output_rate; r->impl_run = libsamplerate_run; calc_map_table(r); @@ -631,7 +665,7 @@ static void trivial_free(pa_resampler *r) { pa_xfree(r->impl_data); } -static void trivial_update_input_rate(pa_resampler *r, uint32_t rate) { +static void trivial_update_rate(pa_resampler *r, uint32_t rate) { struct impl_trivial *u; assert(r); @@ -655,7 +689,8 @@ static int trivial_init(pa_resampler*r) { r->impl_run = trivial_run; r->impl_free = trivial_free; - r->impl_update_input_rate = trivial_update_input_rate; + r->impl_update_input_rate = trivial_update_rate; + r->impl_update_output_rate = trivial_update_rate; return 0; } diff --git a/src/pulsecore/resampler.h b/src/pulsecore/resampler.h index c283593d..ada293e5 100644 --- a/src/pulsecore/resampler.h +++ b/src/pulsecore/resampler.h @@ -63,6 +63,9 @@ void pa_resampler_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out); /* Change the input rate of the resampler object */ void pa_resampler_set_input_rate(pa_resampler *r, uint32_t rate); +/* Change the output rate of the resampler object */ +void pa_resampler_set_output_rate(pa_resampler *r, uint32_t rate); + /* Return the resampling method of the resampler object */ pa_resample_method_t pa_resampler_get_method(pa_resampler *r); diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 00b82d26..2c6b356c 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -45,7 +45,9 @@ #define MOVE_BUFFER_LENGTH (1024*1024) #define SILENCE_BUFFER_LENGTH (64*1024) -static void sink_input_free(pa_msgobject *o); +static PA_DEFINE_CHECK_TYPE(pa_sink_input, sink_input_check_type, pa_msgobject_check_type); + +static void sink_input_free(pa_object *o); pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data) { pa_assert(data); @@ -159,13 +161,12 @@ pa_sink_input* pa_sink_input_new( data->resample_method = pa_resampler_get_method(resampler); } - i = pa_msgobject_new(pa_sink_input); - + i = pa_msgobject_new(pa_sink_input, sink_input_check_type); i->parent.parent.free = sink_input_free; i->parent.process_msg = pa_sink_input_process_msg; i->core = core; - pa_atomic_load(&i->state, PA_SINK_INPUT_DRAINED); + pa_atomic_store(&i->state, PA_SINK_INPUT_DRAINED); i->flags = flags; i->name = pa_xstrdup(data->name); i->driver = pa_xstrdup(data->driver); @@ -189,11 +190,11 @@ pa_sink_input* pa_sink_input_new( i->userdata = NULL; i->thread_info.silence_memblock = NULL; - i->thread_info.move_silence = 0; +/* i->thread_info.move_silence = 0; */ pa_memchunk_reset(&i->thread_info.resampled_chunk); i->thread_info.resampler = resampler; - i->thread_info.soft_volume = i->volume; - i->thread_info.soft_muted = i->muted; + i->thread_info.volume = i->volume; + i->thread_info.muted = i->muted; pa_assert_se(pa_idxset_put(core->sink_inputs, i, &i->index) == 0); pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0); @@ -213,14 +214,16 @@ void pa_sink_input_disconnect(pa_sink_input *i) { pa_assert(i); pa_return_if_fail(pa_sink_input_get_state(i) != PA_SINK_INPUT_DISCONNECTED); - pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL); + pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL); pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL); pa_idxset_remove_by_data(i->sink->inputs, i, NULL); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index); - i->sink = NULL; + pa_sink_update_status(i->sink); + + i->sink = NULL; i->process_msg = NULL; i->peek = NULL; i->drop = NULL; @@ -228,10 +231,10 @@ void pa_sink_input_disconnect(pa_sink_input *i) { i->get_latency = NULL; i->underrun = NULL; - pa_atomic_load(&i->state, PA_SINK_INPUT_DISCONNECTED); + pa_atomic_store(&i->state, PA_SINK_INPUT_DISCONNECTED); } -static void sink_input_free(pa_msgobject *o) { +static void sink_input_free(pa_object *o) { pa_sink_input* i = PA_SINK_INPUT(o); pa_assert(i); @@ -241,8 +244,8 @@ static void sink_input_free(pa_msgobject *o) { pa_log_info("Freeing output %u \"%s\"", i->index, i->name); - if (i->resampled_chunk.memblock) - pa_memblock_unref(i->resampled_chunk.memblock); + if (i->thread_info.resampled_chunk.memblock) + pa_memblock_unref(i->thread_info.resampled_chunk.memblock); if (i->thread_info.resampler) pa_resampler_free(i->thread_info.resampler); @@ -261,10 +264,10 @@ void pa_sink_input_put(pa_sink_input *i) { i->thread_info.volume = i->volume; i->thread_info.muted = i->muted; - pa_asyncmsgq_post(i->sink->asyncmsgq, i->sink, PA_SINK_MESSAGE_ADD_INPUT, i, NULL, pa_sink_unref, pa_sink_input_unref); + pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, pa_sink_input_ref(i), NULL, (pa_free_cb_t) pa_sink_input_unref); pa_sink_update_status(i->sink); - pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index); + pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index); } void pa_sink_input_kill(pa_sink_input*i) { @@ -279,7 +282,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i) { pa_sink_input_assert_ref(i); - if (pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0) + if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0) r = 0; if (i->get_latency) @@ -327,14 +330,14 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) /* goto finish; */ /* } */ - if (!i->resampler) { + if (!i->thread_info.resampler) { do_volume_adj_here = 0; ret = i->peek(i, chunk); goto finish; } do_volume_adj_here = !pa_channel_map_equal(&i->channel_map, &i->sink->channel_map); - volume_is_norm = pa_cvolume_is_norm(&i->thread_info.soft_volume) && !i->thread_info.soft_muted; + volume_is_norm = pa_cvolume_is_norm(&i->thread_info.volume) && !i->thread_info.muted; while (!i->thread_info.resampled_chunk.memblock) { pa_memchunk tchunk; @@ -345,7 +348,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) pa_assert(tchunk.length); - l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH); + l = pa_resampler_request(i->thread_info.resampler, CONVERT_BUFFER_LENGTH); if (l > tchunk.length) l = tchunk.length; @@ -356,10 +359,10 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) /* It might be necessary to adjust the volume here */ if (do_volume_adj_here && !volume_is_norm) { pa_memchunk_make_writable(&tchunk, 0); - pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.soft_volume); + pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.volume); } - pa_resampler_run(i->resampler, &tchunk, &i->thread_info.resampled_chunk); + pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk); pa_memblock_unref(tchunk.memblock); } @@ -378,7 +381,7 @@ finish: if (ret >= 0) pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_RUNNING); - else if (ret < 0 && i->state == PA_SINK_INPUT_RUNNING) + else if (ret < 0 && state == PA_SINK_INPUT_RUNNING) pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED); if (ret >= 0) { @@ -427,7 +430,7 @@ void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t lengt /* return; */ /* } */ - if (!i->resampler) { + if (!i->thread_info.resampler) { if (i->drop) i->drop(i, chunk, length); return; @@ -454,7 +457,7 @@ void pa_sink_input_set_volume(pa_sink_input *i, const pa_cvolume *volume) { i->volume = *volume; - pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), pa_sink_input_unref, pa_xfree); + pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); } @@ -473,18 +476,17 @@ void pa_sink_input_set_mute(pa_sink_input *i, int mute) { i->muted = mute; - pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), pa_sink_input_unref, NULL); + pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); } int pa_sink_input_get_mute(pa_sink_input *i) { pa_sink_input_assert_ref(i); - return !!i->mute; + return !!i->muted; } void pa_sink_input_cork(pa_sink_input *i, int b) { - int n; pa_sink_input_state_t state; pa_sink_input_assert_ref(i); @@ -493,24 +495,24 @@ void pa_sink_input_cork(pa_sink_input *i, int b) { pa_assert(state != PA_SINK_INPUT_DISCONNECTED); if (b && state != PA_SINK_INPUT_CORKED) - pa_atomic_store(i->state, PA_SINK_INPUT_CORKED); + pa_atomic_store(&i->state, PA_SINK_INPUT_CORKED); else if (!b && state == PA_SINK_INPUT_CORKED) - pa_atomic_cmpxchg(i->state, state, PA_SINK_INPUT_DRAINED); + pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED); } int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) { pa_sink_input_assert_ref(i); - pa_return_val_if_fail(u->thread_info.resampler, -1); + pa_return_val_if_fail(i->thread_info.resampler, -1); if (i->sample_spec.rate == rate) return 0; i->sample_spec.rate = rate; - pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_sink_input_unref, NULL); + pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); - return 0 + return 0; } void pa_sink_input_set_name(pa_sink_input *i, const char *name) { @@ -535,9 +537,9 @@ pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i) { } int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { - pa_resampler *new_resampler = NULL; - pa_memblockq *buffer = NULL; - pa_sink *origin; +/* pa_resampler *new_resampler = NULL; */ +/* pa_memblockq *buffer = NULL; */ +/* pa_sink *origin; */ pa_sink_input_assert_ref(i); pa_sink_assert_ref(dest); @@ -702,18 +704,18 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc switch (code) { case PA_SINK_INPUT_MESSAGE_SET_VOLUME: - s->thread_info.soft_volume = *((pa_cvolume*) userdata); + i->thread_info.volume = *((pa_cvolume*) userdata); return 0; case PA_SINK_INPUT_MESSAGE_SET_MUTE: - s->thread_info.soft_muted = PA_PTR_TO_UINT(userdata); + i->thread_info.muted = PA_PTR_TO_UINT(userdata); return 0; case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = userdata; if (i->thread_info.resampled_chunk.memblock) - *r += pa_bytes_to_usec(i->resampled_chunk.length, &i->sink->sample_spec); + *r += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &i->sink->sample_spec); /* if (i->move_silence) */ /* r += pa_bytes_to_usec(i->move_silence, &i->sink->sample_spec); */ @@ -724,7 +726,7 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc case PA_SINK_INPUT_MESSAGE_SET_RATE: { i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); - pa_resampler_set_input_rate(i->resampler, PA_PTR_TO_UINT(userdata)); + pa_resampler_set_input_rate(i->thread_info.resampler, PA_PTR_TO_UINT(userdata)); return 0; } diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 338d6962..a8c05b85 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -99,7 +99,7 @@ struct pa_sink_input { }; PA_DECLARE_CLASS(pa_sink_input); -#define PA_SINK_INPUT(o) ((pa_sink_input*) (o)) +#define PA_SINK_INPUT(o) pa_sink_input_cast(o) enum { PA_SINK_INPUT_MESSAGE_SET_VOLUME, @@ -160,7 +160,7 @@ int pa_sink_input_get_mute(pa_sink_input *i); void pa_sink_input_cork(pa_sink_input *i, int b); -void pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate); +int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate); pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i); diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 0e022d92..7f009048 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -47,6 +47,8 @@ #define MAX_MIX_CHANNELS 32 +static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_type); + static void sink_free(pa_object *s); pa_sink* pa_sink_new( @@ -77,7 +79,7 @@ pa_sink* pa_sink_new( pa_return_null_if_fail(!driver || pa_utf8_valid(driver)); pa_return_null_if_fail(name && pa_utf8_valid(name) && *name); - s = pa_msgobject_new(pa_sink); + s = pa_msgobject_new(pa_sink, sink_check_type); if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) { pa_xfree(s); @@ -88,7 +90,7 @@ pa_sink* pa_sink_new( s->parent.process_msg = pa_sink_process_msg; s->core = core; - pa_atomic_store(&s->state, PA_SINK_IDLE); + s->state = PA_SINK_IDLE; s->name = pa_xstrdup(name); s->description = NULL; s->driver = pa_xstrdup(driver); @@ -110,11 +112,10 @@ pa_sink* pa_sink_new( s->get_volume = NULL; s->set_mute = NULL; s->get_mute = NULL; - s->start = NULL; - s->stop = NULL; + s->set_state = NULL; s->userdata = NULL; - pa_assert_se(s->asyncmsgq = pa_asyncmsgq_new(0)); + s->asyncmsgq = NULL; r = pa_idxset_put(core->sinks, s, &s->index); pa_assert(s->index != PA_IDXSET_INVALID && r >= 0); @@ -139,56 +140,40 @@ pa_sink* pa_sink_new( s->thread_info.inputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); s->thread_info.soft_volume = s->volume; s->thread_info.soft_muted = s->muted; + s->thread_info.state = s->state; pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index); return s; } -static void sink_start(pa_sink *s) { - pa_sink_state_t state; +static int sink_set_state(pa_sink *s, pa_sink_state_t state) { + int ret; + pa_assert(s); - state = pa_sink_get_state(s); - pa_return_if_fail(state == PA_SINK_IDLE || state == PA_SINK_SUSPENDED); - - pa_atomic_store(&s->state, PA_SINK_RUNNING); - - if (s->start) - s->start(s); - else - pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_START, NULL, NULL, NULL); -} - -static void sink_stop(pa_sink *s) { - pa_sink_state_t state; - int stop; + if (s->state == state) + return 0; - pa_assert(s); - state = pa_sink_get_state(s); - pa_return_if_fail(state == PA_SINK_RUNNING || state == PA_SINK_SUSPENDED); + if (s->set_state) + if ((ret = s->set_state(s, state)) < 0) + return -1; - stop = state == PA_SINK_RUNNING; - pa_atomic_store(&s->state, PA_SINK_IDLE); + if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0) + return -1; - if (stop) { - if (s->stop) - s->stop(s); - else - pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_STOP, NULL, NULL, NULL); - } + s->state = state; + return 0; } void pa_sink_disconnect(pa_sink* s) { pa_sink_input *i, *j = NULL; pa_assert(s); - pa_return_if_fail(pa_sink_get_state(s) != PA_SINK_DISCONNECTED); - - sink_stop(s); + pa_return_if_fail(s->state != PA_SINK_DISCONNECTED); - pa_atomic_store(&s->state, PA_SINK_DISCONNECTED); pa_namereg_unregister(s->core, s->name); + pa_idxset_remove_by_data(s->core->sinks, s, NULL); pa_hook_fire(&s->core->hook_sink_disconnect, s); @@ -201,26 +186,27 @@ void pa_sink_disconnect(pa_sink* s) { if (s->monitor_source) pa_source_disconnect(s->monitor_source); - pa_idxset_remove_by_data(s->core->sinks, s, NULL); + sink_set_state(s, PA_SINK_DISCONNECTED); s->get_latency = NULL; s->get_volume = NULL; s->set_volume = NULL; s->set_mute = NULL; s->get_mute = NULL; - s->start = NULL; - s->stop = NULL; + s->set_state = NULL; pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); } static void sink_free(pa_object *o) { pa_sink *s = PA_SINK(o); + pa_sink_input *i; pa_assert(s); pa_assert(pa_sink_refcnt(s) == 0); - pa_sink_disconnect(s); + if (s->state != PA_SINK_DISCONNECTED) + pa_sink_disconnect(s); pa_log_info("Freeing sink %u \"%s\"", s->index, s->name); @@ -231,9 +217,10 @@ static void sink_free(pa_object *o) { pa_idxset_free(s->inputs, NULL, NULL); - pa_hashmap_free(s->thread_info.inputs, (pa_free2_cb_t) pa_sink_input_unref, NULL); - - pa_asyncmsgq_free(s->asyncmsgq); + while ((i = pa_hashmap_steal_first(s->thread_info.inputs))) + pa_sink_input_unref(i); + + pa_hashmap_free(s->thread_info.inputs, NULL, NULL); pa_xfree(s->name); pa_xfree(s->description); @@ -241,44 +228,38 @@ static void sink_free(pa_object *o) { pa_xfree(s); } -void pa_sink_update_status(pa_sink*s) { +void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q) { pa_sink_assert_ref(s); + pa_assert(q); - if (pa_sink_get_state(s) == PA_SINK_SUSPENDED) - return; + s->asyncmsgq = q; - if (pa_sink_used_by(s) > 0) - sink_start(s); - else - sink_stop(s); + if (s->monitor_source) + pa_source_set_asyncmsgq(s->monitor_source, q); } -void pa_sink_suspend(pa_sink *s, int suspend) { - pa_sink_state_t state; - +int pa_sink_update_status(pa_sink*s) { pa_sink_assert_ref(s); - state = pa_sink_get_state(s); - pa_return_if_fail(suspend && (state == PA_SINK_RUNNING || state == PA_SINK_IDLE)); - pa_return_if_fail(!suspend && (state == PA_SINK_SUSPENDED)); + if (s->state == PA_SINK_SUSPENDED) + return 0; + return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE); +} - if (suspend) { - pa_atomic_store(&s->state, PA_SINK_SUSPENDED); +int pa_sink_suspend(pa_sink *s, int suspend) { + pa_sink_assert_ref(s); - if (s->stop) - s->stop(s); - else - pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_STOP, NULL, NULL, NULL); + if (suspend) + return sink_set_state(s, PA_SINK_SUSPENDED); + else + return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE); +} - } else { - pa_atomic_store(&s->state, PA_SINK_RUNNING); +void pa_sink_ping(pa_sink *s) { + pa_sink_assert_ref(s); - if (s->start) - s->start(s); - else - pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_START, NULL, NULL, NULL); - } + pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, NULL, NULL); } static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) { @@ -652,7 +633,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk * pa_sink *s = PA_SINK(o); pa_sink_assert_ref(s); - switch (code) { + switch ((pa_sink_message_t) code) { case PA_SINK_MESSAGE_ADD_INPUT: { pa_sink_input *i = userdata; pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i)); @@ -681,7 +662,17 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk * *((int*) userdata) = s->thread_info.soft_muted; return 0; - default: - return -1; + case PA_SINK_MESSAGE_PING: + return 0; + + case PA_SINK_MESSAGE_SET_STATE: + s->thread_info.state = PA_PTR_TO_UINT(userdata); + return 0; + + case PA_SINK_MESSAGE_GET_LATENCY: + case PA_SINK_MESSAGE_MAX: + ; } + + return -1; } diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index 2939cc47..0b308e53 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -55,7 +55,7 @@ struct pa_sink { uint32_t index; pa_core *core; - pa_atomic_t state; + pa_sink_state_t state; char *name; char *description, *driver; /* may be NULL */ @@ -74,8 +74,7 @@ struct pa_sink { int refresh_volume; int refresh_mute; - int (*start)(pa_sink *s); - int (*stop)(pa_sink *s); + int (*set_state)(pa_sink *s, pa_sink_state_t state); int (*set_volume)(pa_sink *s); /* dito */ int (*get_volume)(pa_sink *s); /* dito */ int (*get_mute)(pa_sink *s); /* dito */ @@ -87,6 +86,7 @@ struct pa_sink { /* Contains copies of the above data so that the real-time worker * thread can work without access locking */ struct { + pa_sink_state_t state; pa_hashmap *inputs; pa_cvolume soft_volume; int soft_muted; @@ -96,7 +96,7 @@ struct pa_sink { }; PA_DECLARE_CLASS(pa_sink); -#define PA_SINK(s) ((pa_sink*) (s)) +#define PA_SINK(s) (pa_sink_cast(s)) typedef enum pa_sink_message { PA_SINK_MESSAGE_ADD_INPUT, @@ -106,8 +106,8 @@ typedef enum pa_sink_message { PA_SINK_MESSAGE_GET_MUTE, PA_SINK_MESSAGE_SET_MUTE, PA_SINK_MESSAGE_GET_LATENCY, - PA_SINK_MESSAGE_START, - PA_SINK_MESSAGE_STOP, + PA_SINK_MESSAGE_SET_STATE, + PA_SINK_MESSAGE_PING, PA_SINK_MESSAGE_MAX } pa_sink_message_t; @@ -125,13 +125,19 @@ void pa_sink_disconnect(pa_sink* s); void pa_sink_set_module(pa_sink *sink, pa_module *m); void pa_sink_set_description(pa_sink *s, const char *description); +void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q); /* Usable by everyone */ pa_usec_t pa_sink_get_latency(pa_sink *s); -void pa_sink_update_status(pa_sink*s); -void pa_sink_suspend(pa_sink *s, int suspend); +int pa_sink_update_status(pa_sink*s); +int pa_sink_suspend(pa_sink *s, int suspend); + +/* Sends a ping message to the sink thread, to make it wake up and + * check for data to process even if there is no real message is + * sent */ +void pa_sink_ping(pa_sink *s); void pa_sink_set_volume(pa_sink *sink, const pa_cvolume *volume); const pa_cvolume *pa_sink_get_volume(pa_sink *sink); @@ -139,7 +145,7 @@ void pa_sink_set_mute(pa_sink *sink, int mute); int pa_sink_get_mute(pa_sink *sink); unsigned pa_sink_used_by(pa_sink *s); -#define pa_sink_get_state(s) ((pa_sink_state_t) pa_atomic_load(&(s)->state)) +#define pa_sink_get_state(s) ((s)->state) /* To be used exclusively by the sink driver thread */ @@ -149,5 +155,5 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target); void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target); int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk); - + #endif diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c index a682ee6c..974c053a 100644 --- a/src/pulsecore/sound-file-stream.c +++ b/src/pulsecore/sound-file-stream.c @@ -200,7 +200,7 @@ int pa_play_file( u->sink_input->kill = sink_input_kill; u->sink_input->userdata = u; - pa_sink_notify(u->sink_input->sink); +/* pa_sink_notify(u->sink_input->sink); */ return 0; diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 517c033d..2211f251 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -38,6 +38,10 @@ #include "source-output.h" +static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa_msgobject_check_type); + +static void source_output_free(pa_object* mo); + pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data) { pa_assert(data); @@ -126,13 +130,12 @@ pa_source_output* pa_source_output_new( data->resample_method = pa_resampler_get_method(resampler); } - o = pa_source_output_new(pa_source_output); - + o = pa_msgobject_new(pa_source_output, source_output_check_type); o->parent.parent.free = source_output_free; o->parent.process_msg = pa_source_output_process_msg; o->core = core; - pa_atomic_load(&o->state, PA_SOURCE_OUTPUT_RUNNING); + pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_RUNNING); o->flags = flags; o->name = pa_xstrdup(data->name); o->driver = pa_xstrdup(data->driver); @@ -168,27 +171,29 @@ pa_source_output* pa_source_output_new( void pa_source_output_disconnect(pa_source_output*o) { pa_assert(o); - pa_return_if_fail(pa_source_output_get_state(i) != PA_SOURCE_OUTPUT_DISCONNECTED); + pa_return_if_fail(pa_source_output_get_state(o) != PA_SOURCE_OUTPUT_DISCONNECTED); pa_assert(o->source); pa_assert(o->source->core); - pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL); + pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL); pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL); pa_idxset_remove_by_data(o->source->outputs, o, NULL); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index); - o->source = NULL; + pa_source_update_status(o->source); + + o->source = NULL; o->process_msg = NULL; o->push = NULL; o->kill = NULL; o->get_latency = NULL; - pa_atomic_load(&i->state, PA_SOURCE_OUTPUT_DISCONNECTED); + pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_DISCONNECTED); } -static void source_output_free(pa_msgobject* mo) { +static void source_output_free(pa_object* mo) { pa_source_output *o = PA_SOURCE_OUTPUT(mo); pa_assert(pa_source_output_refcnt(o) == 0); @@ -208,10 +213,10 @@ static void source_output_free(pa_msgobject* mo) { void pa_source_output_put(pa_source_output *o) { pa_source_output_assert_ref(o); - pa_asyncmsgq_post(o->source->asyncmsgq, o->source, PA_SOURCE_MESSAGE_ADD_OUTPUT, o, NULL, pa_source_unref, pa_source_output_unref); + pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_source_output_unref); pa_source_update_status(o->source); - pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); + pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); } void pa_source_output_kill(pa_source_output*o) { @@ -226,7 +231,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) { pa_source_output_assert_ref(o); - if (pa_asyncmsgq_send(o->source->asyncmsgq, i->source, PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0) + if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0) r = 0; if (o->get_latency) @@ -250,12 +255,12 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { pa_assert(state = PA_SOURCE_OUTPUT_RUNNING); - if (!o->resampler) { + if (!o->thread_info.resampler) { o->push(o, chunk); return; } - pa_resampler_run(o->resampler, chunk, &rchunk); + pa_resampler_run(o->thread_info.resampler, chunk, &rchunk); if (!rchunk.length) return; @@ -265,7 +270,6 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { } void pa_source_output_cork(pa_source_output *o, int b) { - int n; pa_source_output_state_t state; pa_source_output_assert_ref(o); @@ -274,23 +278,23 @@ void pa_source_output_cork(pa_source_output *o, int b) { pa_assert(state != PA_SOURCE_OUTPUT_DISCONNECTED); if (b && state != PA_SOURCE_OUTPUT_CORKED) - pa_atomic_store(o->state, PA_SOURCE_OUTPUT_CORKED); + pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_CORKED); else if (!b && state == PA_SOURCE_OUTPUT_CORKED) - pa_atomic_cmpxchg(o->state, state, PA_SOURCE_OUTPUT_RUNNING); + pa_atomic_cmpxchg(&o->state, state, PA_SOURCE_OUTPUT_RUNNING); } int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { pa_source_output_assert_ref(o); pa_return_val_if_fail(o->thread_info.resampler, -1); - if (i->sample_spec.rate == rate) + if (o->sample_spec.rate == rate) return 0; - i->sample_spec.rate = rate; + o->sample_spec.rate = rate; - pa_asyncmsgq_post(s->asyncmsgq, pa_source_output_ref(i), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_source_output_unref, NULL); + pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL); - pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT!|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); + pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index); return 0; } @@ -316,8 +320,8 @@ pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o) { } int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { - pa_source *origin; - pa_resampler *new_resampler = NULL; +/* pa_source *origin; */ +/* pa_resampler *new_resampler = NULL; */ pa_source_output_assert_ref(o); pa_source_assert_ref(dest); @@ -344,7 +348,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { /* else if (!pa_sample_spec_equal(&o->sample_spec, &dest->sample_spec) || */ /* !pa_channel_map_equal(&o->channel_map, &dest->channel_map)) { */ -/* /\* Okey, we need a new resampler for the new sink *\/ */ +/* /\* Okey, we need a new resampler for the new source *\/ */ /* if (!(new_resampler = pa_resampler_new( */ /* dest->core->mempool, */ @@ -376,16 +380,16 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { } int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk* chunk) { - pa_source_output *o = PA_SOURCE_OUTPUT(o); + pa_source_output *o = PA_SOURCE_OUTPUT(mo); - pa_source_output_assert_ref(i); + pa_source_output_assert_ref(o); switch (code) { case PA_SOURCE_OUTPUT_MESSAGE_SET_RATE: { - i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); - pa_resampler_set_output_rate(i->resampler, PA_PTR_TO_UINT(userdata)); + o->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); + pa_resampler_set_output_rate(o->thread_info.resampler, PA_PTR_TO_UINT(userdata)); return 0; } diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h index e7c2c131..d3bc0bc4 100644 --- a/src/pulsecore/source-output.h +++ b/src/pulsecore/source-output.h @@ -80,7 +80,7 @@ struct pa_source_output { }; PA_DECLARE_CLASS(pa_source_output); -#define PA_SOURCE_OUTPUT(o) ((pa_source_output*) (o)) +#define PA_SOURCE_OUTPUT(o) pa_source_output_cast(o) enum { PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, @@ -129,7 +129,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *i); void pa_source_output_cork(pa_source_output *i, int b); -void pa_source_output_set_rate(pa_source_output *o, uint32_t rate); +int pa_source_output_set_rate(pa_source_output *o, uint32_t rate); pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o); diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 7d013387..f0a898f4 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -42,6 +42,10 @@ #include "source.h" +static PA_DEFINE_CHECK_TYPE(pa_source, source_check_type, pa_msgobject_check_type); + +static void source_free(pa_object *o); + pa_source* pa_source_new( pa_core *core, const char *driver, @@ -69,7 +73,7 @@ pa_source* pa_source_new( pa_return_null_if_fail(!driver || pa_utf8_valid(driver)); pa_return_null_if_fail(pa_utf8_valid(name) && *name); - s = pa_msgobject_new(pa_source); + s = pa_msgobject_new(pa_source, source_check_type); if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) { pa_xfree(s); @@ -80,7 +84,7 @@ pa_source* pa_source_new( s->parent.process_msg = pa_source_process_msg; s->core = core; - pa_atomic_store(&s->state, PA_SOURCE_IDLE); + s->state = PA_SOURCE_IDLE; s->name = pa_xstrdup(name); s->description = NULL; s->driver = pa_xstrdup(driver); @@ -94,7 +98,7 @@ pa_source* pa_source_new( pa_cvolume_reset(&s->volume, spec->channels); s->muted = 0; - s->refresh_volume = s->refresh_mute = 0; + s->refresh_volume = s->refresh_muted = 0; s->is_hardware = 0; @@ -103,11 +107,10 @@ pa_source* pa_source_new( s->get_volume = NULL; s->set_mute = NULL; s->get_mute = NULL; - s->start = NULL; - s->stop = NULL; + s->set_state = NULL; s->userdata = NULL; - pa_assert_se(s->asyncmsgq = pa_asyncmsgq_new(0)); + s->asyncmsgq = NULL; r = pa_idxset_put(core->sources, s, &s->index); assert(s->index != PA_IDXSET_INVALID && r >= 0); @@ -118,56 +121,40 @@ pa_source* pa_source_new( s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); s->thread_info.soft_volume = s->volume; s->thread_info.soft_muted = s->muted; + s->thread_info.state = s->state; pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index); return s; } -static void source_start(pa_source *s) { - pa_source_state_t state; +static int source_set_state(pa_source *s, pa_source_state_t state) { + int ret; + pa_assert(s); - state = pa_source_get_state(s); - pa_return_if_fail(state == PA_SOURCE_IDLE || state == PA_SOURCE_SUSPENDED); - - pa_atomic_store(&s->state, PA_SOURCE_RUNNING); - - if (s->start) - s->start(s); - else - pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_START, NULL, NULL, pa_source_unref, NULL); -} - -static void source_stop(pa_source *s) { - pa_source_state_t state; - int stop; + if (s->state == state) + return 0; - pa_assert(s); - state = pa_source_get_state(s); - pa_return_if_fail(state == PA_SOURCE_RUNNING || state == PA_SOURCE_SUSPENDED); + if (s->set_state) + if ((ret = s->set_state(s, state)) < 0) + return -1; - stop = state == PA_SOURCE_RUNNING; - pa_atomic_store(&s->state, PA_SOURCE_IDLE); + if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0) + return -1; - if (stop) { - if (s->stop) - s->stop(s); - else - pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_STOP, NULL, NULL, pa_source_unref, NULL); - } + s->state = state; + return 0; } void pa_source_disconnect(pa_source *s) { pa_source_output *o, *j = NULL; pa_assert(s); - pa_return_if_fail(pa_sink_get_state(s) != PA_SINK_DISCONNECT); - - source_stop(s); + pa_return_if_fail(s->state != PA_SOURCE_DISCONNECTED); - pa_atomic_store(&s->state, PA_SOURCE_DISCONNECTED); pa_namereg_unregister(s->core, s->name); + pa_idxset_remove_by_data(s->core->sources, s, NULL); pa_hook_fire(&s->core->hook_source_disconnect, s); @@ -177,33 +164,36 @@ void pa_source_disconnect(pa_source *s) { j = o; } - pa_idxset_remove_by_data(s->core->sources, s, NULL); + source_set_state(s, PA_SOURCE_DISCONNECTED); s->get_latency = NULL; s->get_volume = NULL; s->set_volume = NULL; s->set_mute = NULL; s->get_mute = NULL; - s->start = NULL; - s->stop = NULL; + s->set_state = NULL; pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); } -static void source_free(pa_msgobject *o) { +static void source_free(pa_object *o) { + pa_source_output *so; pa_source *s = PA_SOURCE(o); pa_assert(s); pa_assert(pa_source_refcnt(s) == 0); - pa_source_disconnect(s); + if (s->state != PA_SOURCE_DISCONNECTED) + pa_source_disconnect(s); pa_log_info("Freeing source %u \"%s\"", s->index, s->name); pa_idxset_free(s->outputs, NULL, NULL); - pa_hashmap_free(s->thread_info.outputs, pa_sink_output_unref, NULL); - pa_asyncmsgq_free(s->asyncmsgq); + while ((so = pa_hashmap_steal_first(s->thread_info.outputs))) + pa_source_output_unref(so); + + pa_hashmap_free(s->thread_info.outputs, NULL, NULL); pa_xfree(s->name); pa_xfree(s->description); @@ -211,44 +201,28 @@ static void source_free(pa_msgobject *o) { pa_xfree(s); } -void pa_source_update_status(pa_source*s) { +int pa_source_update_status(pa_source*s) { pa_source_assert_ref(s); - if (pa_source_get_state(s) == PA_SOURCE_STATE_SUSPENDED) - return; + if (s->state == PA_SOURCE_SUSPENDED) + return 0; - if (pa_source_used_by(s) > 0) - source_start(s); - else - source_stop(s); + return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE); } -void pa_source_suspend(pa_source *s, int suspend) { - pa_source_state_t state; - +int pa_source_suspend(pa_source *s, int suspend) { pa_source_assert_ref(s); - state = pa_source_get_state(s); - pa_return_if_fail(suspend && (s->state == PA_SOURCE_RUNNING || s->state == PA_SOURCE_IDLE)); - pa_return_if_fail(!suspend && (s->state == PA_SOURCE_SUSPENDED)); - - - if (suspend) { - pa_atomic_store(&s->state, PA_SOURCE_SUSPENDED); - - if (s->stop) - s->stop(s); - else - pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_STOP, NULL, NULL, pa_source_unref, NULL); + if (suspend) + return source_set_state(s, PA_SOURCE_SUSPENDED); + else + return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE); +} - } else { - pa_atomic_store(&s->state, PA_SOURCE_RUNNING); +void pa_source_ping(pa_source *s) { + pa_source_assert_ref(s); - if (s->start) - s->start(s); - else - pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_START, NULL, NULL, pa_source_unref, NULL); - } + pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, NULL, NULL); } void pa_source_post(pa_source*s, const pa_memchunk *chunk) { @@ -258,16 +232,16 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) { pa_source_assert_ref(s); pa_assert(chunk); - if (s->sw_muted || !pa_cvolume_is_norm(&s->sw_volume)) { + if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) { pa_memchunk vchunk = *chunk; pa_memblock_ref(vchunk.memblock); pa_memchunk_make_writable(&vchunk, 0); - if (s->thread_info.muted || pa_cvolume_is_muted(s->thread_info.volume)) + if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume)) pa_silence_memchunk(&vchunk, &s->sample_spec); else - pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.volume); + pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume); while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) pa_source_output_push(o, &vchunk); @@ -289,32 +263,33 @@ pa_usec_t pa_source_get_latency(pa_source *s) { if (s->get_latency) return s->get_latency(s); - if (pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0) + if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0) return 0; return usec; } void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) { - pa_cvolume *v; + int changed; pa_source_assert_ref(s); pa_assert(volume); - changed = !pa_cvolume_equal(volume, s->volume); + changed = !pa_cvolume_equal(volume, &s->volume); s->volume = *volume; if (s->set_volume && s->set_volume(s) < 0) s->set_volume = NULL; if (!s->set_volume) - pa_asyncmsgq_post(s->asyncmsgq, pa_source_ref(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), pa_source_unref, pa_xfree); + pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree); if (changed) pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); } const pa_cvolume *pa_source_get_volume(pa_source *s) { + pa_cvolume old_volume; pa_source_assert_ref(s); old_volume = s->volume; @@ -323,7 +298,7 @@ const pa_cvolume *pa_source_get_volume(pa_source *s) { s->get_volume = NULL; if (!s->get_volume && s->refresh_volume) - pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume); + pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, NULL); if (!pa_cvolume_equal(&old_volume, &s->volume)) pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); @@ -331,7 +306,7 @@ const pa_cvolume *pa_source_get_volume(pa_source *s) { return &s->volume; } -void pa_source_set_mute(pa_source *s, pa_mixer_t m, int mute) { +void pa_source_set_mute(pa_source *s, int mute) { int changed; pa_source_assert_ref(s); @@ -342,13 +317,13 @@ void pa_source_set_mute(pa_source *s, pa_mixer_t m, int mute) { s->set_mute = NULL; if (!s->set_mute) - pa_asyncmsgq_post(s->asyncmsgq, pa_source_ref(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), pa_source_unref, NULL); + pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL); if (changed) pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); } -int pa_source_get_mute(pa_source *s, pa_mixer_t m) { +int pa_source_get_mute(pa_source *s) { int old_muted; pa_source_assert_ref(s); @@ -358,8 +333,8 @@ int pa_source_get_mute(pa_source *s, pa_mixer_t m) { if (s->get_mute && s->get_mute(s) < 0) s->get_mute = NULL; - if (!s->get_mute && s->refresh_mute) - pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_MUTE, &s->muted); + if (!s->get_mute && s->refresh_muted) + pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, NULL); if (old_muted != s->muted) pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); @@ -393,26 +368,33 @@ void pa_source_set_description(pa_source *s, const char *description) { pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); } +void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) { + pa_source_assert_ref(s); + pa_assert(q); + + s->asyncmsgq = q; +} + unsigned pa_source_used_by(pa_source *s) { pa_source_assert_ref(s); return pa_idxset_size(s->outputs); } -int pa_source_process_msg(pa_msgobject *o, void *object, int code, pa_memchunk *chunk, void *userdata) { +int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) { pa_source *s = PA_SOURCE(o); pa_source_assert_ref(s); - switch (code) { + switch ((pa_source_message_t) code) { case PA_SOURCE_MESSAGE_ADD_OUTPUT: { pa_source_output *i = userdata; pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index), pa_source_output_ref(i)); return 0; } - case PA_SOURCE_MESSAGE_REMOVE_INPUT: { - pa_source_input *i = userdata; - pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index), pa_source_output_ref(i)); + case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: { + pa_source_output *i = userdata; + pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index)); return 0; } @@ -432,7 +414,17 @@ int pa_source_process_msg(pa_msgobject *o, void *object, int code, pa_memchunk * *((int*) userdata) = s->thread_info.soft_muted; return 0; - default: - return -1; + case PA_SOURCE_MESSAGE_PING: + return 0; + + case PA_SOURCE_MESSAGE_SET_STATE: + s->thread_info.state = PA_PTR_TO_UINT(userdata); + return 0; + + case PA_SOURCE_MESSAGE_GET_LATENCY: + case PA_SOURCE_MESSAGE_MAX: + ; } + + return -1; } diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h index b41b1bc3..4db2dedf 100644 --- a/src/pulsecore/source.h +++ b/src/pulsecore/source.h @@ -57,7 +57,7 @@ struct pa_source { uint32_t index; pa_core *core; - pa_atomic_t state; + pa_source_state_t state; char *name; char *description, *driver; /* may be NULL */ @@ -74,10 +74,9 @@ struct pa_source { pa_cvolume volume; int muted; int refresh_volume; - int referesh_mute; + int refresh_muted; - void (*start)(pa_source*source); /* may be NULL */ - void (*stop)(pa_source*source); /* may be NULL */ + int (*set_state)(pa_source*source, pa_source_state_t state); /* may be NULL */ int (*set_volume)(pa_source *s); /* dito */ int (*get_volume)(pa_source *s); /* dito */ int (*set_mute)(pa_source *s); /* dito */ @@ -87,6 +86,7 @@ struct pa_source { pa_asyncmsgq *asyncmsgq; struct { + pa_source_state_t state; pa_hashmap *outputs; pa_cvolume soft_volume; int soft_muted; @@ -96,7 +96,7 @@ struct pa_source { }; PA_DECLARE_CLASS(pa_source); -#define PA_SOURCE(s) ((pa_source*) (s)) +#define PA_SOURCE(s) pa_source_cast(s) typedef enum pa_source_message { PA_SOURCE_MESSAGE_ADD_OUTPUT, @@ -106,8 +106,8 @@ typedef enum pa_source_message { PA_SOURCE_MESSAGE_GET_MUTE, PA_SOURCE_MESSAGE_SET_MUTE, PA_SOURCE_MESSAGE_GET_LATENCY, - PA_SOURCE_MESSAGE_START, - PA_SOURCE_MESSAGE_STOP, + PA_SOURCE_MESSAGE_SET_STATE, + PA_SOURCE_MESSAGE_PING, PA_SOURCE_MESSAGE_MAX } pa_source_message_t; @@ -125,13 +125,15 @@ void pa_source_disconnect(pa_source *s); void pa_source_set_module(pa_source *s, pa_module *m); void pa_source_set_description(pa_source *s, const char *description); +void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q); /* Callable by everyone */ pa_usec_t pa_source_get_latency(pa_source *s); -void pa_source_update_status(pa_source*s); -void pa_source_suspend(pa_source *s); +int pa_source_update_status(pa_source*s); +int pa_source_suspend(pa_source *s, int suspend); +void pa_source_ping(pa_source *s); void pa_source_set_volume(pa_source *source, const pa_cvolume *volume); const pa_cvolume *pa_source_get_volume(pa_source *source); @@ -139,11 +141,11 @@ void pa_source_set_mute(pa_source *source, int mute); int pa_source_get_mute(pa_source *source); unsigned pa_source_used_by(pa_source *s); -#define pa_source_get_state(s) ((pa_source_state_t) pa_atomic_load(&(s)->state)) +#define pa_source_get_state(s) ((pa_source_state_t) (s)->state) /* To be used exclusively by the source driver thread */ void pa_source_post(pa_source*s, const pa_memchunk *b); -void pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk); +int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk); #endif diff --git a/src/tests/asyncmsgq-test.c b/src/tests/asyncmsgq-test.c index d10b512d..847d5be1 100644 --- a/src/tests/asyncmsgq-test.c +++ b/src/tests/asyncmsgq-test.c @@ -49,7 +49,7 @@ static void the_thread(void *_q) { do { int code = 0; - pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, 1) == 0); + pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, 1) == 0); switch (code) { @@ -71,7 +71,7 @@ static void the_thread(void *_q) { break; } - pa_asyncmsgq_done(q); + pa_asyncmsgq_done(q, 0); } while (!quit); } @@ -91,11 +91,11 @@ int main(int argc, char *argv[]) { printf("Operation B post\n"); pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, NULL, NULL); - + pa_thread_yield(); printf("Operation C send\n"); - pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL); + pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, NULL); pa_thread_yield(); |