diff options
62 files changed, 2460 insertions, 742 deletions
diff --git a/bootstrap.sh b/bootstrap.sh index f26ceb15..c9880d85 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -33,7 +33,7 @@ else automake -a -c autoconf -Wall - ./configure --sysconfdir=/etc "$@" + CFLAGS="-g -O0" ./configure --sysconfdir=/etc "$@" make clean fi diff --git a/configure.ac b/configure.ac index 16376902..3a14a061 100644 --- a/configure.ac +++ b/configure.ac @@ -42,7 +42,7 @@ AC_PROG_LIBTOOL # If using GCC specifiy some additional parameters if test "x$GCC" = "xyes" ; then - CFLAGS="$CFLAGS -pipe -Wall -W" + CFLAGS="$CFLAGS -pipe -Wall -W -Wno-unused-parameter" fi AC_CONFIG_FILES([Makefile src/Makefile]) diff --git a/src/Makefile.am b/src/Makefile.am index 443a25f2..6ad1488f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -18,31 +18,39 @@ AM_CFLAGS=-ansi -D_GNU_SOURCE -bin_PROGRAMS = polypaudio +bin_PROGRAMS = polypaudio pacat -pkglib_LTLIBRARIES=libprotocol-simple.la module-simple-protocol-tcp.la \ - libsocket-server.la module-pipe-sink.la libpstream.la libiochannel.la \ +pkglib_LTLIBRARIES=libiochannel.la libsocket-server.la libsocket-client.la \ + libprotocol-simple.la module-simple-protocol-tcp.la \ + module-pipe-sink.la libpstream.la \ libpacket.la module-oss.la module-oss-mmap.la liboss.la libioline.la \ libcli.la module-cli.la libtokenizer.la libdynarray.la \ module-simple-protocol-unix.la module-cli-protocol-tcp.la \ - libprotocol-cli.la libprotocol-native.la module-native-protocol-tcp.la \ - module-native-protocol-unix.la module-cli-protocol-unix.la libtagstruct.la + libprotocol-cli.la module-cli-protocol-unix.la libtagstruct.la \ + libpdispatch.la libprotocol-native.la libpstream-util.la \ + module-native-protocol-tcp.la module-native-protocol-unix.la \ + libpolyp.la polypaudio_SOURCES = idxset.c idxset.h \ queue.c queue.h \ strbuf.c strbuf.h \ + main.c main.h \ mainloop.c mainloop.h \ memblock.c memblock.h \ sample.c sample.h \ + sample-util.c sample-util.h \ memblockq.c memblockq.h \ client.c client.h \ core.c core.h \ - main.c main.h \ sourceoutput.c sourceoutput.h \ sinkinput.c sinkinput.h \ source.c source.h \ sink.c sink.h \ - module.c module.h + module.c module.h \ + mainloop-signal.c mainloop-signal.h \ + mainloop-api.c mainloop-api.h \ + util.c util.h +polypaudio_CFLAGS = $(AM_CFLAGS) polypaudio_INCLUDES = $(INCLTDL) polypaudio_LDADD = $(LIBLTDL) @@ -56,10 +64,22 @@ libsocket_server_la_SOURCES = socket-server.c socket-server.h libsocket_server_la_LDFLAGS = -avoid-version libsocket_server_la_LIBADD = libiochannel.la +libsocket_client_la_SOURCES = socket-client.c socket-client.h +libsocket_client_la_LDFLAGS = -avoid-version +libsocket_client_la_LIBADD = libiochannel.la + libpstream_la_SOURCES = pstream.c pstream.h libpstream_la_LDFLAGS = -avoid-version libpstream_la_LIBADD = libpacket.la +libpstream_util_la_SOURCES = pstream-util.c pstream-util.h +libpstream_util_la_LDFLAGS = -avoid-version +libpstream_util_la_LIBADD = libpstream.la libtagstruct.la + +libpdispatch_la_SOURCES = pdispatch.c pdispatch.h +libpdispatch_la_LDFLAGS = -avoid-version +libpdispatch_la_LIBADD = libpacket.la libtagstruct.la + libiochannel_la_SOURCES = iochannel.c iochannel.h libiochannel_la_LDFLAGS = -avoid-version @@ -90,7 +110,7 @@ libprotocol_cli_la_LIBADD = libsocket-server.la libiochannel.la libcli.la libprotocol_native_la_SOURCES = protocol-native.c protocol-native.h libprotocol_native_la_LDFLAGS = -avoid-version -libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la +libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la libpstream-util.la libtagstruct_la_SOURCES = tagstruct.c tagstruct.h libtagstruct_la_LDFLAGS = -avoid-version @@ -140,3 +160,24 @@ module_oss_mmap_la_LIBADD = libiochannel.la liboss.la module_cli_la_SOURCES = module-cli.c module_cli_la_LDFLAGS = -module -avoid-version module_cli_la_LIBADD = libcli.la libiochannel.la libtokenizer.la + +libpolyp_la_SOURCES = polyp.c polyp.h \ + polypdef.h \ + tagstruct.c tagstruct.h \ + iochannel.c iochannel.h \ + pstream.c pstream.h \ + pstream-util.c pstream-util.h \ + pdispatch.c pdispatch.h \ + protocol-native-spec.h \ + mainloop-api.c mainloop-api.h \ + mainloop.c mainloop.h \ + idxset.c idxset.h \ + util.c util.h \ + memblock.c memblock.h \ + socket-client.c socket-client.h \ + packet.c packet.h \ + queue.c queue.h \ + dynarray.c dynarray.h + +pacat_SOURCES = pacat.c +pacat_LDADD = libpolyp.la @@ -20,6 +20,8 @@ struct cli { void (*eof_callback)(struct cli *c, void *userdata); void *userdata; + + struct client *client; }; struct command { @@ -63,6 +65,7 @@ static const struct command commands[] = { static const char prompt[] = ">>> "; struct cli* cli_new(struct core *core, struct iochannel *io) { + char cname[256]; struct cli *c; assert(io); @@ -75,16 +78,21 @@ struct cli* cli_new(struct core *core, struct iochannel *io) { c->userdata = NULL; c->eof_callback = NULL; + iochannel_peer_to_string(io, cname, sizeof(cname)); + c->client = client_new(core, "CLI", cname); + assert(c->client); + ioline_set_callback(c->line, line_callback, c); ioline_puts(c->line, "Welcome to polypaudio! Use \"help\" for usage information.\n"); ioline_puts(c->line, prompt); - + return c; } void cli_free(struct cli *c) { assert(c); ioline_free(c->line); + client_free(c->client); free(c); } @@ -135,7 +143,7 @@ void cli_set_eof_callback(struct cli *c, void (*cb)(struct cli*c, void *userdata static void cli_command_exit(struct cli *c, struct tokenizer *t) { assert(c && c->core && c->core->mainloop && t); - mainloop_quit(c->core->mainloop, -1); + c->core->mainloop->quit(c->core->mainloop, 0); } static void cli_command_help(struct cli *c, struct tokenizer *t) { @@ -7,7 +7,7 @@ #include "sink.h" #include "source.h" -struct core* core_new(struct mainloop *m) { +struct core* core_new(struct pa_mainloop_api *m) { struct core* c; c = malloc(sizeof(struct core)); assert(c); @@ -2,17 +2,17 @@ #define foocorehfoo #include "idxset.h" -#include "mainloop.h" +#include "mainloop-api.h" struct core { - struct mainloop *mainloop; + struct pa_mainloop_api *mainloop; struct idxset *clients, *sinks, *sources, *sink_inputs, *source_outputs, *modules; uint32_t default_source_index, default_sink_index; }; -struct core* core_new(struct mainloop *m); +struct core* core_new(struct pa_mainloop_api *m); void core_free(struct core*c); #endif diff --git a/src/iochannel.c b/src/iochannel.c index f0c4c499..910b7e0b 100644 --- a/src/iochannel.c +++ b/src/iochannel.c @@ -4,10 +4,11 @@ #include <unistd.h> #include "iochannel.h" +#include "util.h" struct iochannel { int ifd, ofd; - struct mainloop* mainloop; + struct pa_mainloop_api* mainloop; void (*callback)(struct iochannel*io, void *userdata); void*userdata; @@ -17,43 +18,45 @@ struct iochannel { int no_close; - struct mainloop_source* input_source, *output_source; + void* input_source, *output_source; }; static void enable_mainloop_sources(struct iochannel *io) { assert(io); if (io->input_source == io->output_source) { - enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL; + enum pa_mainloop_api_io_events e = PA_MAINLOOP_API_IO_EVENT_NULL; assert(io->input_source); if (!io->readable) - e |= MAINLOOP_IO_EVENT_IN; + e |= PA_MAINLOOP_API_IO_EVENT_INPUT; if (!io->writable) - e |= MAINLOOP_IO_EVENT_OUT; + e |= PA_MAINLOOP_API_IO_EVENT_OUTPUT; - mainloop_source_io_set_events(io->input_source, e); + io->mainloop->enable_io(io->mainloop, io->input_source, e); } else { if (io->input_source) - mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN); + io->mainloop->enable_io(io->mainloop, io->input_source, io->readable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_INPUT); if (io->output_source) - mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT); + io->mainloop->enable_io(io->mainloop, io->output_source, io->writable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_OUTPUT); } } -static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) { +static void callback(struct pa_mainloop_api* m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { struct iochannel *io = userdata; int changed = 0; - assert(s && fd >= 0 && userdata); + assert(m && fd >= 0 && events && userdata); - if ((events & MAINLOOP_IO_EVENT_IN) && !io->readable) { + if ((events & PA_MAINLOOP_API_IO_EVENT_INPUT) && !io->readable) { io->readable = 1; changed = 1; + assert(id == io->input_source); } - if ((events & MAINLOOP_IO_EVENT_OUT) && !io->writable) { + if ((events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) && !io->writable) { io->writable = 1; changed = 1; + assert(id == io->output_source); } if (changed) { @@ -64,15 +67,7 @@ static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event ev } } -static void make_nonblock_fd(int fd) { - int v; - - if ((v = fcntl(fd, F_GETFL)) >= 0) - if (!(v & O_NONBLOCK)) - fcntl(fd, F_SETFL, v|O_NONBLOCK); -} - -struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) { +struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd) { struct iochannel *io; assert(m && (ifd >= 0 || ofd >= 0)); @@ -90,18 +85,18 @@ struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) { if (ifd == ofd) { assert(ifd >= 0); make_nonblock_fd(io->ifd); - io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io); + io->input_source = io->output_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_BOTH, callback, io); } else { if (ifd >= 0) { make_nonblock_fd(io->ifd); - io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io); + io->input_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, io); } else io->input_source = NULL; if (ofd >= 0) { make_nonblock_fd(io->ofd); - io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io); + io->output_source = m->source_io(m, ofd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, callback, io); } else io->output_source = NULL; } @@ -120,9 +115,9 @@ void iochannel_free(struct iochannel*io) { } if (io->input_source) - mainloop_source_free(io->input_source); + io->mainloop->cancel_io(io->mainloop, io->input_source); if (io->output_source && io->output_source != io->input_source) - mainloop_source_free(io->output_source); + io->mainloop->cancel_io(io->mainloop, io->output_source); free(io); } @@ -172,3 +167,8 @@ void iochannel_set_noclose(struct iochannel*io, int b) { assert(io); io->no_close = b; } + +void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l) { + assert(io && s && l); + peer_to_string(s, l, io->ifd); +} diff --git a/src/iochannel.h b/src/iochannel.h index 8ed8b878..b0465a19 100644 --- a/src/iochannel.h +++ b/src/iochannel.h @@ -2,11 +2,11 @@ #define fooiochannelhfoo #include <sys/types.h> -#include "mainloop.h" +#include "mainloop-api.h" struct iochannel; -struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd); +struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd); void iochannel_free(struct iochannel*io); ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l); @@ -19,4 +19,6 @@ void iochannel_set_noclose(struct iochannel*io, int b); void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata); +void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l); + #endif @@ -8,46 +8,52 @@ #include "core.h" #include "mainloop.h" #include "module.h" +#include "mainloop-signal.h" int stdin_inuse = 0, stdout_inuse = 0; -static void signal_callback(struct mainloop_source *m, int sig, void *userdata) { - mainloop_quit(mainloop_source_get_mainloop(m), -1); +static struct pa_mainloop *mainloop; + +static void signal_callback(void *id, int sig, void *userdata) { + struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop); + m->quit(m, 1); fprintf(stderr, "main: got signal.\n"); } int main(int argc, char *argv[]) { - struct mainloop *m; struct core *c; - int r; + int r, retval = 0; r = lt_dlinit(); assert(r == 0); - m = mainloop_new(); - assert(m); - c = core_new(m); - assert(c); + mainloop = pa_mainloop_new(); + assert(mainloop); - mainloop_source_new_signal(m, SIGINT, signal_callback, NULL); + r = pa_signal_init(pa_mainloop_get_api(mainloop)); + assert(r == 0); + pa_signal_register(SIGINT, signal_callback, NULL); signal(SIGPIPE, SIG_IGN); + c = core_new(pa_mainloop_get_api(mainloop)); + assert(c); + module_load(c, "module-oss-mmap", "/dev/dsp1"); module_load(c, "module-pipe-sink", NULL); module_load(c, "module-simple-protocol-tcp", NULL); module_load(c, "module-cli", NULL); fprintf(stderr, "main: mainloop entry.\n"); - while (mainloop_iterate(m, 1) == 0); -/* fprintf(stderr, "main: %u blocks\n", n_blocks);*/ + if (pa_mainloop_run(mainloop, &retval) < 0) + retval = 1; fprintf(stderr, "main: mainloop exit.\n"); - - mainloop_run(m); core_free(c); - mainloop_free(m); + + pa_signal_done(); + pa_mainloop_free(mainloop); lt_dlexit(); - return 0; + return retval; } diff --git a/src/mainloop-api.c b/src/mainloop-api.c new file mode 100644 index 00000000..6caa0c25 --- /dev/null +++ b/src/mainloop-api.c @@ -0,0 +1,35 @@ +#include <assert.h> +#include <stdlib.h> +#include "mainloop-api.h" + +struct once_info { + void (*callback)(void *userdata); + void *userdata; +}; + +static void once_callback(struct pa_mainloop_api *api, void *id, void *userdata) { + struct once_info *i = userdata; + assert(api && i && i->callback); + i->callback(i->userdata); + assert(api->cancel_fixed); + api->cancel_fixed(api, id); + free(i); +} + +void pa_mainloop_api_once(struct pa_mainloop_api* api, void (*callback)(void *userdata), void *userdata) { + struct once_info *i; + void *id; + assert(api && callback); + + i = malloc(sizeof(struct once_info)); + assert(i); + i->callback = callback; + i->userdata = userdata; + + assert(api->source_fixed); + id = api->source_fixed(api, once_callback, i); + assert(id); + + /* Note: if the mainloop is destroyed before once_callback() was called, some memory is leaked. */ +} + diff --git a/src/mainloop-api.h b/src/mainloop-api.h new file mode 100644 index 00000000..96dacc22 --- /dev/null +++ b/src/mainloop-api.h @@ -0,0 +1,43 @@ +#ifndef foomainloopapihfoo +#define foomainloopapihfoo + +#include <time.h> +#include <sys/time.h> + +enum pa_mainloop_api_io_events { + PA_MAINLOOP_API_IO_EVENT_NULL = 0, + PA_MAINLOOP_API_IO_EVENT_INPUT = 1, + PA_MAINLOOP_API_IO_EVENT_OUTPUT = 2, + PA_MAINLOOP_API_IO_EVENT_BOTH = 3 +}; + +struct pa_mainloop_api { + void *userdata; + + /* IO sources */ + void* (*source_io)(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata); + void (*enable_io)(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events); + void (*cancel_io)(struct pa_mainloop_api*a, void* id); + + /* Fixed sources */ + void* (*source_fixed)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata); + void (*enable_fixed)(struct pa_mainloop_api*a, void* id, int b); + void (*cancel_fixed)(struct pa_mainloop_api*a, void* id); + + /* Idle sources */ + void* (*source_idle)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata); + void (*enable_idle)(struct pa_mainloop_api*a, void* id, int b); + void (*cancel_idle)(struct pa_mainloop_api*a, void* id); + + /* Time sources */ + void* (*source_time)(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata); + void (*enable_time)(struct pa_mainloop_api*a, void *id, const struct timeval *tv); + void (*cancel_time)(struct pa_mainloop_api*a, void* id); + + /* Exit mainloop */ + void (*quit)(struct pa_mainloop_api*a, int retval); +}; + +void pa_mainloop_api_once(struct pa_mainloop_api*m, void (*callback)(void *userdata), void *userdata); + +#endif diff --git a/src/mainloop-signal.c b/src/mainloop-signal.c new file mode 100644 index 00000000..dcc72f69 --- /dev/null +++ b/src/mainloop-signal.c @@ -0,0 +1,138 @@ +#include <stdio.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "mainloop-signal.h" +#include "util.h" + +struct signal_info { + int sig; + struct sigaction saved_sigaction; + void (*callback) (void *id, int signal, void *userdata); + void *userdata; + struct signal_info *previous, *next; +}; + +static struct pa_mainloop_api *api = NULL; +static int signal_pipe[2] = { -1, -1 }; +static void* mainloop_source = NULL; +static struct signal_info *signals = NULL; + +static void signal_handler(int sig) { + write(signal_pipe[1], &sig, sizeof(sig)); +} + +static void callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { + assert(a && id && events == PA_MAINLOOP_API_IO_EVENT_INPUT && id == mainloop_source && fd == signal_pipe[0]); + + for (;;) { + ssize_t r; + int sig; + struct signal_info*s; + + if ((r = read(signal_pipe[0], &sig, sizeof(sig))) < 0) { + if (errno == EAGAIN) + return; + + fprintf(stderr, "signal.c: read(): %s\n", strerror(errno)); + return; + } + + if (r != sizeof(sig)) { + fprintf(stderr, "signal.c: short read()\n"); + return; + } + + for (s = signals; s; s = s->next) + if (s->sig == sig) { + assert(s->callback); + s->callback(s, sig, s->userdata); + break; + } + } +} + +int pa_signal_init(struct pa_mainloop_api *a) { + assert(a); + if (pipe(signal_pipe) < 0) { + fprintf(stderr, "pipe() failed: %s\n", strerror(errno)); + return -1; + } + + make_nonblock_fd(signal_pipe[0]); + make_nonblock_fd(signal_pipe[1]); + + api = a; + mainloop_source = api->source_io(api, signal_pipe[0], PA_MAINLOOP_API_IO_EVENT_INPUT, callback, NULL); + assert(mainloop_source); + return 0; +} + +void pa_signal_done(void) { + assert(api && signal_pipe[0] >= 0 && signal_pipe[1] >= 0 && mainloop_source); + + api->cancel_io(api, mainloop_source); + mainloop_source = NULL; + + close(signal_pipe[0]); + close(signal_pipe[1]); + signal_pipe[0] = signal_pipe[1] = -1; + + while (signals) + pa_signal_unregister(signals); + + api = NULL; +} + +void* pa_signal_register(int sig, void (*callback) (void *id, int signal, void *userdata), void *userdata) { + struct signal_info *s = NULL; + struct sigaction sa; + assert(sig > 0 && callback); + + for (s = signals; s; s = s->next) + if (s->sig == sig) + goto fail; + + s = malloc(sizeof(struct signal_info)); + assert(s); + s->sig = sig; + s->callback = callback; + s->userdata = userdata; + + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + + if (sigaction(sig, &sa, &s->saved_sigaction) < 0) + goto fail; + + s->previous = NULL; + s->next = signals; + signals = s; + + return s; +fail: + if (s) + free(s); + return NULL; +} + +void pa_signal_unregister(void *id) { + struct signal_info *s = id; + assert(s); + + if (s->next) + s->next->previous = s->previous; + if (s->previous) + s->previous->next = s->next; + else + signals = s->next; + + sigaction(s->sig, &s->saved_sigaction, NULL); + free(s); +} diff --git a/src/mainloop-signal.h b/src/mainloop-signal.h new file mode 100644 index 00000000..e3e2364d --- /dev/null +++ b/src/mainloop-signal.h @@ -0,0 +1,12 @@ +#ifndef foomainloopsignalhfoo +#define foomainloopsignalhfoo + +#include "mainloop-api.h" + +int pa_signal_init(struct pa_mainloop_api *api); +void pa_signal_done(void); + +void* pa_signal_register(int signal, void (*callback) (void *id, int signal, void *userdata), void *userdata); +void pa_signal_unregister(void *id); + +#endif diff --git a/src/mainloop.c b/src/mainloop.c index fba0461c..a1758c65 100644 --- a/src/mainloop.c +++ b/src/mainloop.c @@ -1,3 +1,4 @@ +#include <stdio.h> #include <signal.h> #include <unistd.h> #include <sys/poll.h> @@ -8,468 +9,515 @@ #include <errno.h> #include "mainloop.h" +#include "util.h" +#include "idxset.h" -struct mainloop_source { - struct mainloop_source *next; - struct mainloop *mainloop; - enum mainloop_source_type type; - - int enabled; +struct mainloop_source_header { + struct pa_mainloop *mainloop; int dead; - void *userdata; - - struct { - int fd; - enum mainloop_io_event events; - void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata); - struct pollfd pollfd; - } io; +}; - struct { - void (*callback)(struct mainloop_source*s, void *userdata); - } fixed; +struct mainloop_source_io { + struct mainloop_source_header header; - struct { - void (*callback)(struct mainloop_source*s, void *userdata); - } idle; - - struct { - int sig; - struct sigaction sigaction; - void (*callback)(struct mainloop_source*s, int sig, void *userdata); - } signal; + int fd; + enum pa_mainloop_api_io_events events; + void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata); + void *userdata; + + struct pollfd *pollfd; }; -struct mainloop_source_list { - struct mainloop_source *sources; - int n_sources; - int dead_sources; +struct mainloop_source_fixed_or_idle { + struct mainloop_source_header header; + int enabled; + + void (*callback)(struct pa_mainloop_api*a, void *id, void *userdata); + void *userdata; }; -struct mainloop { - struct mainloop_source_list io_sources, fixed_sources, idle_sources, signal_sources; +struct mainloop_source_time { + struct mainloop_source_header header; + int enabled; + struct timeval timeval; + void (*callback)(struct pa_mainloop_api*a, void *id, const struct timeval*tv, void *userdata); + void *userdata; +}; + +struct pa_mainloop { + struct idxset *io_sources, *fixed_sources, *idle_sources, *time_sources; + int io_sources_scan_dead, fixed_sources_scan_dead, idle_sources_scan_dead, time_sources_scan_dead; + struct pollfd *pollfds; - int max_pollfds, n_pollfds; + unsigned max_pollfds, n_pollfds; int rebuild_pollfds; - int quit; - int running; - int signal_pipe[2]; - struct pollfd signal_pollfd; + int quit, running, retval; + struct pa_mainloop_api api; }; -static int signal_pipe = -1; +static void setup_api(struct pa_mainloop *m); -static void signal_func(int sig) { - if (signal_pipe >= 0) - write(signal_pipe, &sig, sizeof(sig)); -} +struct pa_mainloop *pa_mainloop_new(void) { + struct pa_mainloop *m; -static void make_nonblock(int fd) { - int v; - - if ((v = fcntl(fd, F_GETFL)) >= 0) - fcntl(fd, F_SETFL, v|O_NONBLOCK); -} + m = malloc(sizeof(struct pa_mainloop)); + assert(m); + m->io_sources = idxset_new(NULL, NULL); + m->fixed_sources = idxset_new(NULL, NULL); + m->idle_sources = idxset_new(NULL, NULL); + m->time_sources = idxset_new(NULL, NULL); -struct mainloop *mainloop_new(void) { - int r; - struct mainloop *m; + assert(m->io_sources && m->fixed_sources && m->idle_sources && m->time_sources); - m = malloc(sizeof(struct mainloop)); - assert(m); - memset(m, 0, sizeof(struct mainloop)); + m->io_sources_scan_dead = m->fixed_sources_scan_dead = m->idle_sources_scan_dead = m->time_sources_scan_dead = 0; + + m->pollfds = NULL; + m->max_pollfds = m->n_pollfds = m->rebuild_pollfds = 0; - r = pipe(m->signal_pipe); - assert(r >= 0 && m->signal_pipe[0] >= 0 && m->signal_pipe[1] >= 0); + m->quit = m->running = m->retval = 0; - make_nonblock(m->signal_pipe[0]); - make_nonblock(m->signal_pipe[1]); - - signal_pipe = m->signal_pipe[1]; - m->signal_pollfd.fd = m->signal_pipe[0]; - m->signal_pollfd.events = POLLIN; - m->signal_pollfd.revents = 0; + setup_api(m); return m; } -static void free_sources(struct mainloop_source_list *l, int all) { - struct mainloop_source *s, *p; - assert(l); +static int foreach(void *p, uint32_t index, int *del, void*userdata) { + struct mainloop_source_header *h = p; + int *all = userdata; + assert(p && del && all); - if (!all && !l->dead_sources) - return; - - p = NULL; - s = l->sources; - while (s) { - if (all || s->dead) { - struct mainloop_source *t = s; - s = s->next; - - if (p) - p->next = s; - else - l->sources = s; - - free(t); - } else { - p = s; - s = s->next; - } + if (*all || h->dead) { + free(h); + *del = 1; } - l->dead_sources = 0; - - if (all) { - assert(!l->sources); - l->n_sources = 0; - } -} + return 0; +}; -void mainloop_free(struct mainloop* m) { +void pa_mainloop_free(struct pa_mainloop* m) { + int all = 1; assert(m); - free_sources(&m->io_sources, 1); - free_sources(&m->fixed_sources, 1); - free_sources(&m->idle_sources, 1); - free_sources(&m->signal_sources, 1); - - if (signal_pipe == m->signal_pipe[1]) - signal_pipe = -1; - close(m->signal_pipe[0]); - close(m->signal_pipe[1]); - + idxset_foreach(m->io_sources, foreach, &all); + idxset_foreach(m->fixed_sources, foreach, &all); + idxset_foreach(m->idle_sources, foreach, &all); + idxset_foreach(m->time_sources, foreach, &all); + + idxset_free(m->io_sources, NULL, NULL); + idxset_free(m->fixed_sources, NULL, NULL); + idxset_free(m->idle_sources, NULL, NULL); + idxset_free(m->time_sources, NULL, NULL); + free(m->pollfds); free(m); } -static void rebuild_pollfds(struct mainloop *m) { - struct mainloop_source*s; +static void scan_dead(struct pa_mainloop *m) { + int all = 0; + assert(m); + if (m->io_sources_scan_dead) + idxset_foreach(m->io_sources, foreach, &all); + if (m->fixed_sources_scan_dead) + idxset_foreach(m->fixed_sources, foreach, &all); + if (m->idle_sources_scan_dead) + idxset_foreach(m->idle_sources, foreach, &all); + if (m->time_sources_scan_dead) + idxset_foreach(m->time_sources, foreach, &all); +} + +static void rebuild_pollfds(struct pa_mainloop *m) { + struct mainloop_source_io*s; struct pollfd *p; - - if (m->max_pollfds < m->io_sources.n_sources+1) { - m->max_pollfds = (m->io_sources.n_sources+1)*2; - m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds); + uint32_t index = IDXSET_INVALID; + unsigned l; + + l = idxset_ncontents(m->io_sources); + if (m->max_pollfds < l) { + m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*l); + m->max_pollfds = l; } m->n_pollfds = 0; p = m->pollfds; - for (s = m->io_sources.sources; s; s = s->next) { - assert(s->type == MAINLOOP_SOURCE_TYPE_IO); - if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) { - *(p++) = s->io.pollfd; - m->n_pollfds++; + for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) { + if (s->header.dead) { + s->pollfd = NULL; + continue; } + + s->pollfd = p; + p->fd = s->fd; + p->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0); + p->revents = 0; + + p++; + m->n_pollfds++; } +} + +static void dispatch_pollfds(struct pa_mainloop *m) { + uint32_t index = IDXSET_INVALID; + struct mainloop_source_io *s; - *(p++) = m->signal_pollfd; - m->n_pollfds++; + for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) { + if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents) + continue; + + assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback); + s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata); + } } -static void dispatch_pollfds(struct mainloop *m) { - int i; - struct pollfd *p; - struct mainloop_source *s; - /* This loop assumes that m->sources and m->pollfds have the same - * order and that m->pollfds is a subset of m->sources! */ +static void run_fixed_or_idle(struct pa_mainloop *m, struct idxset *i) { + uint32_t index = IDXSET_INVALID; + struct mainloop_source_fixed_or_idle *s; - s = m->io_sources.sources; - for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) { - if (!p->revents) + for (s = idxset_first(i, &index); s; s = idxset_next(i, &index)) { + if (s->header.dead || !s->enabled) continue; - if (p->fd == m->signal_pipe[0]) { - /* Event from signal pipe */ + assert(s->callback); + s->callback(&m->api, s, s->userdata); + } +} + +static int calc_next_timeout(struct pa_mainloop *m) { + uint32_t index = IDXSET_INVALID; + struct mainloop_source_time *s; + struct timeval now; + int t = -1; - if (p->revents & POLLIN) { - int sig; - ssize_t r; - r = read(m->signal_pipe[0], &sig, sizeof(sig)); - assert((r < 0 && errno == EAGAIN) || r == sizeof(sig)); + if (idxset_isempty(m->time_sources)) + return -1; + + gettimeofday(&now, NULL); + + for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) { + int tmp; + + if (s->header.dead || !s->enabled) + continue; + + if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) + return 0; + + tmp = (s->timeval.tv_sec - now.tv_sec)*1000; - if (r == sizeof(sig)) { - struct mainloop_source *l = m->signal_sources.sources; - while (l) { - assert(l->type == MAINLOOP_SOURCE_TYPE_SIGNAL); - - if (l->signal.sig == sig && l->enabled && !l->dead) { - assert(l->signal.callback); - l->signal.callback(l, sig, l->userdata); - } - - l = l->next; - } - } - } - - } else { - /* Event from I/O source */ - - for (; s; s = s->next) { - if (p->fd != s->io.fd) - continue; - - assert(s->type == MAINLOOP_SOURCE_TYPE_IO); - - if (!s->dead && s->enabled) { - enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0); - if (e) { - assert(s->io.callback); - s->io.callback(s, s->io.fd, e, s->userdata); - } - } - - break; - } + if (s->timeval.tv_usec > now.tv_usec) + tmp += (s->timeval.tv_usec - now.tv_usec)/1000; + else + tmp -= (now.tv_usec - s->timeval.tv_usec)/1000; + + if (tmp == 0) + return 0; + else if (tmp < t) + t = tmp; + } + + return t; +} + +static void dispatch_timeout(struct pa_mainloop *m) { + uint32_t index = IDXSET_INVALID; + struct mainloop_source_time *s; + struct timeval now; + assert(m); + + if (idxset_isempty(m->time_sources)) + return; + + gettimeofday(&now, NULL); + for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) { + + if (s->header.dead || !s->enabled) + continue; + + if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) { + assert(s->callback); + + s->enabled = 0; + s->callback(&m->api, s, &s->timeval, s->userdata); } } } -int mainloop_iterate(struct mainloop *m, int block) { - struct mainloop_source *s; - int c; +static int any_idle_sources(struct pa_mainloop *m) { + struct mainloop_source_fixed_or_idle *s; + uint32_t index; + assert(m); + + for (s = idxset_first(m->idle_sources, &index); s; s = idxset_next(m->idle_sources, &index)) + if (!s->header.dead && s->enabled) + return 1; + + return 0; +} + +int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval) { + int r, idle; assert(m && !m->running); - if(m->quit) - return m->quit; - - free_sources(&m->io_sources, 0); - free_sources(&m->fixed_sources, 0); - free_sources(&m->idle_sources, 0); - free_sources(&m->signal_sources, 0); - - for (s = m->fixed_sources.sources; s; s = s->next) { - assert(s->type == MAINLOOP_SOURCE_TYPE_FIXED); - if (!s->dead && s->enabled) { - assert(s->fixed.callback); - s->fixed.callback(s, s->userdata); - } + if(m->quit) { + if (retval) + *retval = m->retval; + return 1; } + m->running = 1; + + scan_dead(m); + run_fixed_or_idle(m, m->fixed_sources); + if (m->rebuild_pollfds) { rebuild_pollfds(m); m->rebuild_pollfds = 0; } - m->running = 1; + idle = any_idle_sources(m); do { - c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0); - } while (c < 0 && errno == EINTR); - - if (c > 0) + int t; + + if (!block || idle) + t = 0; + else + t = calc_next_timeout(m); + + r = poll(m->pollfds, m->n_pollfds, t); + } while (r < 0 && errno == EINTR); + + dispatch_timeout(m); + + if (r > 0) dispatch_pollfds(m); - else if (c == 0) { - for (s = m->idle_sources.sources; s; s = s->next) { - assert(s->type == MAINLOOP_SOURCE_TYPE_IDLE); - if (!s->dead && s->enabled) { - assert(s->idle.callback); - s->idle.callback(s, s->userdata); - } - } - } + else if (r == 0 && idle) + run_fixed_or_idle(m, m->idle_sources); + else if (r < 0) + fprintf(stderr, "select(): %s\n", strerror(errno)); m->running = 0; - return c < 0 ? -1 : 0; + return r < 0 ? -1 : 0; } -int mainloop_run(struct mainloop *m) { +int pa_mainloop_run(struct pa_mainloop *m, int *retval) { int r; - while (!(r = mainloop_iterate(m, 1))); + while ((r = pa_mainloop_iterate(m, 1, retval)) == 0); return r; } -void mainloop_quit(struct mainloop *m, int r) { +void pa_mainloop_quit(struct pa_mainloop *m, int r) { assert(m); m->quit = r; } -static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) { - struct mainloop_source_list *l; - - switch(type) { - case MAINLOOP_SOURCE_TYPE_IO: - l = &m->io_sources; - break; - case MAINLOOP_SOURCE_TYPE_FIXED: - l = &m->fixed_sources; - break; - case MAINLOOP_SOURCE_TYPE_IDLE: - l = &m->idle_sources; - break; - case MAINLOOP_SOURCE_TYPE_SIGNAL: - l = &m->signal_sources; - break; - default: - l = NULL; - break; - } - - return l; -} - -static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) { - struct mainloop_source_list *l; - struct mainloop_source* s; - assert(m); +/* IO sources */ +static void* mainloop_source_io(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata) { + struct pa_mainloop *m; + struct mainloop_source_io *s; + assert(a && a->userdata && fd >= 0 && callback); + m = a->userdata; + assert(a == &m->api); - s = malloc(sizeof(struct mainloop_source)); + s = malloc(sizeof(struct mainloop_source_io)); assert(s); - memset(s, 0, sizeof(struct mainloop_source)); + s->header.mainloop = m; + s->header.dead = 0; - s->type = type; - s->mainloop = m; + s->fd = fd; + s->events = events; + s->callback = callback; + s->userdata = userdata; + s->pollfd = NULL; - l = get_source_list(m, type); - assert(l); - - s->next = l->sources; - l->sources = s; - l->n_sources++; + idxset_put(m->io_sources, s, NULL); + m->rebuild_pollfds = 1; return s; } -struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) { - struct mainloop_source* s; - assert(m && fd>=0 && callback); +static void mainloop_enable_io(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events) { + struct pa_mainloop *m; + struct mainloop_source_io *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api && s->header.mainloop == m); - s = source_new(m, MAINLOOP_SOURCE_TYPE_IO); + s->events = events; + if (s->pollfd) + s->pollfd->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0); +} - s->io.fd = fd; - s->io.events = event; - s->io.callback = callback; - s->userdata = userdata; - s->io.pollfd.fd = fd; - s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0); - s->io.pollfd.revents = 0; +static void mainloop_cancel_io(struct pa_mainloop_api*a, void* id) { + struct pa_mainloop *m; + struct mainloop_source_io *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api && s->header.mainloop == m); - s->enabled = 1; - - m->rebuild_pollfds = 1; - return s; + s->header.dead = 1; + m->io_sources_scan_dead = 1; } -struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) { - struct mainloop_source* s; - assert(m && callback); +/* Fixed sources */ +static void* mainloop_source_fixed(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) { + struct pa_mainloop *m; + struct mainloop_source_fixed_or_idle *s; + assert(a && a->userdata && callback); + m = a->userdata; + assert(a == &m->api); - s = source_new(m, MAINLOOP_SOURCE_TYPE_FIXED); + s = malloc(sizeof(struct mainloop_source_fixed_or_idle)); + assert(s); + s->header.mainloop = m; + s->header.dead = 0; - s->fixed.callback = callback; - s->userdata = userdata; s->enabled = 1; + s->callback = callback; + s->userdata = userdata; + + idxset_put(m->fixed_sources, s, NULL); return s; } -struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) { - struct mainloop_source* s; - assert(m && callback); - - s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE); +static void mainloop_enable_fixed(struct pa_mainloop_api*a, void* id, int b) { + struct pa_mainloop *m; + struct mainloop_source_fixed_or_idle *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api); - s->idle.callback = callback; - s->userdata = userdata; - s->enabled = 1; - return s; + s->enabled = b; } -struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata) { - struct mainloop_source* s; - struct sigaction save_sa, sa; - - assert(m && callback); +static void mainloop_cancel_fixed(struct pa_mainloop_api*a, void* id) { + struct pa_mainloop *m; + struct mainloop_source_fixed_or_idle *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api); - memset(&sa, 0, sizeof(sa)); - sa.sa_handler = signal_func; - sa.sa_flags = SA_RESTART; - sigemptyset(&sa.sa_mask); + s->header.dead = 1; + m->fixed_sources_scan_dead = 1; +} - memset(&save_sa, 0, sizeof(save_sa)); +/* Idle sources */ +static void* mainloop_source_idle(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) { + struct pa_mainloop *m; + struct mainloop_source_fixed_or_idle *s; + assert(a && a->userdata && callback); + m = a->userdata; + assert(a == &m->api); + + s = malloc(sizeof(struct mainloop_source_fixed_or_idle)); + assert(s); + s->header.mainloop = m; + s->header.dead = 0; - if (sigaction(sig, &sa, &save_sa) < 0) - return NULL; - - s = source_new(m, MAINLOOP_SOURCE_TYPE_SIGNAL); - s->signal.sig = sig; - s->signal.sigaction = save_sa; - - s->signal.callback = callback; - s->userdata = userdata; s->enabled = 1; + s->callback = callback; + s->userdata = userdata; + + idxset_put(m->idle_sources, s, NULL); return s; } -void mainloop_source_free(struct mainloop_source*s) { - struct mainloop_source_list *l; - assert(s && !s->dead); - s->dead = 1; - - assert(s->mainloop); - l = get_source_list(s->mainloop, s->type); - assert(l); +static void mainloop_cancel_idle(struct pa_mainloop_api*a, void* id) { + struct pa_mainloop *m; + struct mainloop_source_fixed_or_idle *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api); - l->n_sources--; - l->dead_sources = 1; - - if (s->type == MAINLOOP_SOURCE_TYPE_IO) - s->mainloop->rebuild_pollfds = 1; - else if (s->type == MAINLOOP_SOURCE_TYPE_SIGNAL) - sigaction(s->signal.sig, &s->signal.sigaction, NULL); + s->header.dead = 1; + m->idle_sources_scan_dead = 1; } -void mainloop_source_enable(struct mainloop_source*s, int b) { - assert(s && !s->dead); +/* Time sources */ +static void* mainloop_source_time(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata) { + struct pa_mainloop *m; + struct mainloop_source_time *s; + assert(a && a->userdata && callback); + m = a->userdata; + assert(a == &m->api); - if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) { - assert(s->mainloop); - s->mainloop->rebuild_pollfds = 1; - } + s = malloc(sizeof(struct mainloop_source_time)); + assert(s); + s->header.mainloop = m; + s->header.dead = 0; - s->enabled = b; -} + s->enabled = !!tv; + if (tv) + s->timeval = *tv; -void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) { - assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO); + s->callback = callback; + s->userdata = userdata; - if (s->io.events != events) { - assert(s->mainloop); - s->mainloop->rebuild_pollfds = 1; - } + idxset_put(m->time_sources, s, NULL); + return s; +} - s->io.events = events; - s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0); +static void mainloop_enable_time(struct pa_mainloop_api*a, void *id, const struct timeval *tv) { + struct pa_mainloop *m; + struct mainloop_source_time *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api); + + if (tv) { + s->enabled = 1; + s->timeval = *tv; + } else + s->enabled = 0; } -struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) { - assert(s); +static void mainloop_cancel_time(struct pa_mainloop_api*a, void* id) { + struct pa_mainloop *m; + struct mainloop_source_time *s = id; + assert(a && a->userdata && s && !s->header.dead); + m = a->userdata; + assert(a == &m->api); + + s->header.dead = 1; + m->time_sources_scan_dead = 1; - return s->mainloop; } -struct once_info { - void (*callback)(void *userdata); - void *userdata; -}; +static void mainloop_quit(struct pa_mainloop_api*a, int retval) { + struct pa_mainloop *m; + assert(a && a->userdata); + m = a->userdata; + assert(a == &m->api); -static void once_callback(struct mainloop_source *s, void *userdata) { - struct once_info *i = userdata; - assert(s && i && i->callback); - i->callback(i->userdata); - mainloop_source_free(s); - free(i); + m->quit = 1; + m->retval = retval; } + +static void setup_api(struct pa_mainloop *m) { + assert(m); + + m->api.userdata = m; + m->api.source_io = mainloop_source_io; + m->api.enable_io = mainloop_enable_io; + m->api.cancel_io = mainloop_cancel_io; + + m->api.source_fixed = mainloop_source_fixed; + m->api.enable_fixed = mainloop_enable_fixed; + m->api.cancel_fixed = mainloop_cancel_fixed; + + m->api.source_idle = mainloop_source_idle; + m->api.enable_idle = mainloop_enable_fixed; /* (!) */ + m->api.cancel_idle = mainloop_cancel_idle; + + m->api.source_time = mainloop_source_time; + m->api.enable_time = mainloop_enable_time; + m->api.cancel_time = mainloop_cancel_time; -void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata) { - struct once_info *i; - assert(m && callback); + m->api.quit = mainloop_quit; +} - i = malloc(sizeof(struct once_info)); - assert(i); - i->callback = callback; - i->userdata = userdata; - - mainloop_source_new_fixed(m, once_callback, i); +struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m) { + assert(m); + return &m->api; } + diff --git a/src/mainloop.h b/src/mainloop.h index c1bfc62a..7837636f 100644 --- a/src/mainloop.h +++ b/src/mainloop.h @@ -1,42 +1,16 @@ #ifndef foomainloophfoo #define foomainloophfoo -struct mainloop; -struct mainloop_source; +#include "mainloop-api.h" -enum mainloop_io_event { - MAINLOOP_IO_EVENT_NULL = 0, - MAINLOOP_IO_EVENT_IN = 1, - MAINLOOP_IO_EVENT_OUT = 2, - MAINLOOP_IO_EVENT_BOTH = 3 -}; +struct pa_mainloop; -enum mainloop_source_type { - MAINLOOP_SOURCE_TYPE_IO, - MAINLOOP_SOURCE_TYPE_FIXED, - MAINLOOP_SOURCE_TYPE_IDLE, - MAINLOOP_SOURCE_TYPE_SIGNAL -}; +struct pa_mainloop *pa_mainloop_new(void); +void pa_mainloop_free(struct pa_mainloop* m); -struct mainloop *mainloop_new(void); -void mainloop_free(struct mainloop* m); +int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval); +int pa_mainloop_run(struct pa_mainloop *m, int *retval); -int mainloop_iterate(struct mainloop *m, int block); -int mainloop_run(struct mainloop *m); -void mainloop_quit(struct mainloop *m, int r); - -struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata); -struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata); -struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata); -struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata); - -void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata); - -void mainloop_source_free(struct mainloop_source*s); -void mainloop_source_enable(struct mainloop_source*s, int b); - -void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event); - -struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s); +struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m); #endif diff --git a/src/memblockq.c b/src/memblockq.c index 10c59e50..9a601c3a 100644 --- a/src/memblockq.c +++ b/src/memblockq.c @@ -220,3 +220,12 @@ uint32_t memblockq_get_length(struct memblockq *bq) { assert(bq); return bq->total_length; } + +uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen) { + assert(bq && qlen); + + if (bq->total_length >= qlen) + return 0; + + return qlen - bq->total_length; +} diff --git a/src/memblockq.h b/src/memblockq.h index 0a68ddaf..a681ff08 100644 --- a/src/memblockq.h +++ b/src/memblockq.h @@ -25,4 +25,6 @@ int memblockq_is_writable(struct memblockq *bq, size_t length); uint32_t memblockq_get_delay(struct memblockq *bq); uint32_t memblockq_get_length(struct memblockq *bq); +uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen); + #endif diff --git a/src/module-oss-mmap.c b/src/module-oss-mmap.c index bfc3a6fa..b70ea6bd 100644 --- a/src/module-oss-mmap.c +++ b/src/module-oss-mmap.c @@ -16,15 +16,15 @@ #include "source.h" #include "module.h" #include "oss.h" +#include "sample-util.h" struct userdata { struct sink *sink; struct source *source; struct core *core; - struct sample_spec sample_spec; + struct pa_sample_spec sample_spec; - size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, sample_size, out_fill; - uint32_t sample_usec; + size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, out_fill; int fd; @@ -161,12 +161,14 @@ static void do_read(struct userdata *u) { in_clear_memblocks(u, u->in_fragments/2); }; -static void io_callback(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata) { +static void io_callback(struct pa_mainloop_api *m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { struct userdata *u = userdata; - if (event & MAINLOOP_IO_EVENT_IN) + assert (u && u->core->mainloop == m && u->mainloop_source == id); + + if (events & PA_MAINLOOP_API_IO_EVENT_INPUT) do_read(u); - if (event & MAINLOOP_IO_EVENT_OUT) + if (events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) do_write(u); } @@ -175,7 +177,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) { assert(s && u); do_write(u); - return u->out_fill/u->sample_size*u->sample_usec; + return pa_samples_usec(u->out_fill, &s->sample_spec); } int module_init(struct core *c, struct module*m) { @@ -316,10 +318,7 @@ int module_init(struct core *c, struct module*m) { assert(u->source || u->sink); - u->sample_size = sample_size(&u->sample_spec); - u->sample_usec = 1000000/u->sample_spec.rate; - - u->mainloop_source = mainloop_source_new_io(c->mainloop, u->fd, (u->source ? MAINLOOP_IO_EVENT_IN : 0) | (u->sink ? MAINLOOP_IO_EVENT_OUT : 0), io_callback, u); + u->mainloop_source = c->mainloop->source_io(c->mainloop, u->fd, (u->source ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | (u->sink ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), io_callback, u); assert(u->mainloop_source); return 0; @@ -360,7 +359,7 @@ void module_done(struct core *c, struct module*m) { source_free(u->source); if (u->mainloop_source) - mainloop_source_free(u->mainloop_source); + u->core->mainloop->cancel_io(u->core->mainloop, u->mainloop_source); if (u->fd >= 0) close(u->fd); diff --git a/src/module-oss.c b/src/module-oss.c index fab5c8c5..4f307545 100644 --- a/src/module-oss.c +++ b/src/module-oss.c @@ -15,7 +15,7 @@ #include "source.h" #include "module.h" #include "oss.h" -#include "sample.h" +#include "sample-util.h" struct userdata { struct sink *sink; @@ -81,7 +81,7 @@ static void do_read(struct userdata *u) { return; } - assert(r <= memchunk.memblock->length); + assert(r <= (ssize_t) memchunk.memblock->length); memchunk.length = memchunk.memblock->length = r; memchunk.index = 0; @@ -107,7 +107,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) { return 0; } - return samples_usec(arg, &s->sample_spec); + return pa_samples_usec(arg, &s->sample_spec); } int module_init(struct core *c, struct module*m) { @@ -117,7 +117,7 @@ int module_init(struct core *c, struct module*m) { int fd = -1; int frag_size, in_frag_size, out_frag_size; int mode; - struct sample_spec ss; + struct pa_sample_spec ss; assert(c && m); p = m->argument ? m->argument : "/dev/dsp"; @@ -203,7 +203,7 @@ int module_init(struct core *c, struct module*m) { u->memchunk.memblock = NULL; u->memchunk.length = 0; - u->sample_size = sample_size(&ss); + u->sample_size = pa_sample_size(&ss); u->out_fragment_size = out_frag_size; u->in_fragment_size = in_frag_size; diff --git a/src/module-pipe-sink.c b/src/module-pipe-sink.c index c0a903e9..9dcf5d23 100644 --- a/src/module-pipe-sink.c +++ b/src/module-pipe-sink.c @@ -18,7 +18,8 @@ struct userdata { struct sink *sink; struct iochannel *io; struct core *core; - struct mainloop_source *mainloop_source; + void *mainloop_source; + struct pa_mainloop_api *mainloop; struct memchunk memchunk; }; @@ -27,7 +28,7 @@ static void do_write(struct userdata *u) { ssize_t r; assert(u); - mainloop_source_enable(u->mainloop_source, 0); + u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0); if (!iochannel_is_writable(u->io)) return; @@ -57,10 +58,10 @@ static void notify_cb(struct sink*s) { assert(s && u); if (iochannel_is_writable(u->io)) - mainloop_source_enable(u->mainloop_source, 1); + u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 1); } -static void prepare_callback(struct mainloop_source *src, void *userdata) { +static void fixed_callback(struct pa_mainloop_api *m, void *id, void *userdata) { struct userdata *u = userdata; assert(u); do_write(u); @@ -77,7 +78,7 @@ int module_init(struct core *c, struct module*m) { struct stat st; char *p; int fd = -1; - static const struct sample_spec ss = { + static const struct pa_sample_spec ss = { .format = SAMPLE_S16NE, .rate = 44100, .channels = 2, @@ -120,10 +121,11 @@ int module_init(struct core *c, struct module*m) { u->memchunk.memblock = NULL; u->memchunk.length = 0; - u->mainloop_source = mainloop_source_new_fixed(c->mainloop, prepare_callback, u); + u->mainloop = c->mainloop; + u->mainloop_source = u->mainloop->source_fixed(u->mainloop, fixed_callback, u); assert(u->mainloop_source); - mainloop_source_enable(u->mainloop_source, 0); - + u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0); + m->userdata = u; return 0; @@ -147,7 +149,7 @@ void module_done(struct core *c, struct module*m) { sink_free(u->sink); iochannel_free(u->io); - mainloop_source_free(u->mainloop_source); + u->mainloop->cancel_fixed(u->mainloop, u->mainloop_source); assert(u->filename); unlink(u->filename); diff --git a/src/module.c b/src/module.c index c6de1751..883a22df 100644 --- a/src/module.c +++ b/src/module.c @@ -151,5 +151,5 @@ void module_unload_request(struct core *c, struct module *m) { assert(i); i->core = c; i->index = m->index; - mainloop_once(c->mainloop, module_unload_once_callback, i); + pa_mainloop_api_once(c->mainloop, module_unload_once_callback, i); } @@ -7,7 +7,7 @@ #include "oss.h" -int oss_auto_format(int fd, struct sample_spec *ss) { +int oss_auto_format(int fd, struct pa_sample_spec *ss) { int format, channels, speed; assert(fd >= 0 && ss); @@ -3,6 +3,6 @@ #include "sample.h" -int oss_auto_format(int fd, struct sample_spec *ss); +int oss_auto_format(int fd, struct pa_sample_spec *ss); #endif diff --git a/src/pacat.c b/src/pacat.c new file mode 100644 index 00000000..5f5a373b --- /dev/null +++ b/src/pacat.c @@ -0,0 +1,169 @@ +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> + +#include "polyp.h" +#include "mainloop.h" + +static struct pa_context *context = NULL; +static struct pa_stream *stream = NULL; +static struct pa_mainloop_api *mainloop_api = NULL; + +static void *buffer = NULL; +static size_t buffer_length = 0, buffer_index = 0; + +static void* stdin_source = NULL; + +static void context_die_callback(struct pa_context *c, void *userdata) { + assert(c); + fprintf(stderr, "Connection to server shut down, exiting.\n"); + mainloop_api->quit(mainloop_api, 1); +} + +static void stream_die_callback(struct pa_stream *s, void *userdata) { + assert(s); + fprintf(stderr, "Stream deleted, exiting.\n"); + mainloop_api->quit(mainloop_api, 1); +} + +static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) { + size_t l; + assert(s && length); + + mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK); + + if (!buffer) + return; + + assert(buffer_length); + + l = length; + if (l > buffer_length) + l = buffer_length; + + pa_stream_write(s, buffer+buffer_index, l); + buffer_length -= l; + buffer_index += l; + + if (!buffer_length) { + free(buffer); + buffer = NULL; + buffer_index = buffer_length = 0; + } +} + +static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) { + assert(c); + + if (!s) { + fprintf(stderr, "Stream creation failed.\n"); + mainloop_api->quit(mainloop_api, 1); + } + + stream = s; + pa_stream_set_die_callback(stream, stream_die_callback, NULL); + pa_stream_set_write_callback(stream, stream_write_callback, NULL); +} + +static void context_complete_callback(struct pa_context *c, int success, void *userdata) { + static const struct pa_sample_spec ss = { + .format = SAMPLE_S16NE, + .rate = 44100, + .channels = 2 + }; + + assert(c && !stream); + + if (!success) { + fprintf(stderr, "Connection failed\n"); + goto fail; + } + + if (pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL) < 0) { + fprintf(stderr, "pa_stream_new() failed.\n"); + goto fail; + } + + return; + +fail: + mainloop_api->quit(mainloop_api, 1); +} + +static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { + size_t l; + ssize_t r; + assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT); + + if (buffer) { + mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL); + return; + } + + if (!(l = pa_stream_writable_size(stream))) + l = 4096; + buffer = malloc(l); + assert(buffer); + if ((r = read(fd, buffer, l)) <= 0) { + if (r == 0) + mainloop_api->quit(mainloop_api, 0); + else { + fprintf(stderr, "read() failed: %s\n", strerror(errno)); + mainloop_api->quit(mainloop_api, 1); + } + + return; + } + + buffer_length = r; + buffer_index = 0; +} + +int main(int argc, char *argv[]) { + struct pa_mainloop* m; + int ret = 1; + + if (!(m = pa_mainloop_new())) { + fprintf(stderr, "pa_mainloop_new() failed.\n"); + goto quit; + } + + mainloop_api = pa_mainloop_get_api(m); + + if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) { + fprintf(stderr, "source_io() failed.\n"); + goto quit; + } + + if (!(context = pa_context_new(mainloop_api, argv[0]))) { + fprintf(stderr, "pa_context_new() failed.\n"); + goto quit; + } + + if (pa_context_connect(context, NULL, context_complete_callback, NULL) < 0) { + fprintf(stderr, "pa_context_connext() failed.\n"); + goto quit; + } + + pa_context_set_die_callback(context, context_die_callback, NULL); + + if (pa_mainloop_run(m, &ret) < 0) { + fprintf(stderr, "pa_mainloop_run() failed.\n"); + goto quit; + } + +quit: + if (stream) + pa_stream_free(stream); + if (context) + pa_context_free(context); + if (m) + pa_mainloop_free(m); + if (buffer) + free(buffer); + + return ret; +} diff --git a/src/packet.c b/src/packet.c index 47fce919..0f966d9a 100644 --- a/src/packet.c +++ b/src/packet.c @@ -16,7 +16,7 @@ struct packet* packet_new(size_t length) { return p; } -struct packet* packet_dynamic(uint8_t* data, size_t length) { +struct packet* packet_new_dynamic(uint8_t* data, size_t length) { struct packet *p; assert(data && length); p = malloc(sizeof(struct packet)); @@ -26,6 +26,7 @@ struct packet* packet_dynamic(uint8_t* data, size_t length) { p->length = length; p->data = data; p->type = PACKET_DYNAMIC; + return p; } struct packet* packet_ref(struct packet *p) { diff --git a/src/pdispatch.c b/src/pdispatch.c new file mode 100644 index 00000000..48b6639d --- /dev/null +++ b/src/pdispatch.c @@ -0,0 +1,149 @@ +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include "pdispatch.h" +#include "protocol-native-spec.h" + +struct reply_info { + struct pdispatch *pdispatch; + struct reply_info *next, *previous; + int (*callback)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); + void *userdata; + uint32_t tag; + void *mainloop_timeout; +}; + +struct pdispatch { + struct pa_mainloop_api *mainloop; + const struct pdispatch_command *command_table; + unsigned n_commands; + struct reply_info *replies; +}; + +static void reply_info_free(struct reply_info *r) { + assert(r && r->pdispatch && r->pdispatch->mainloop); + r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout); + + if (r->previous) + r->previous->next = r->next; + else + r->pdispatch->replies = r->next; + + if (r->next) + r->next->previous = r->previous; + + free(r); +} + +struct pdispatch* pdispatch_new(struct pa_mainloop_api *mainloop, const struct pdispatch_command*table, unsigned entries) { + struct pdispatch *pd; + assert(mainloop); + + assert((entries && table) || (!entries && !table)); + + pd = malloc(sizeof(struct pdispatch)); + assert(pd); + pd->mainloop = mainloop; + pd->command_table = table; + pd->n_commands = entries; + return pd; +} + +void pdispatch_free(struct pdispatch *pd) { + assert(pd); + while (pd->replies) + reply_info_free(pd->replies); + free(pd); +} + +int pdispatch_run(struct pdispatch *pd, struct packet*packet, void *userdata) { + uint32_t tag, command; + assert(pd && packet); + struct tagstruct *ts = NULL; + assert(pd && packet && packet->data); + + if (packet->length <= 8) + goto fail; + + ts = tagstruct_new(packet->data, packet->length); + assert(ts); + + if (tagstruct_getu32(ts, &command) < 0 || + tagstruct_getu32(ts, &tag) < 0) + goto fail; + + if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) { + struct reply_info *r; + int done = 0; + + for (r = pd->replies; r; r = r->next) { + if (r->tag == tag) { + int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata); + reply_info_free(r); + + if (ret < 0) + goto fail; + + done = 1; + break; + } + } + + if (!done) + goto fail; + + } else if (pd->command_table && command < pd->n_commands) { + const struct pdispatch_command *c = pd->command_table+command; + + if (!c->proc) + goto fail; + + if (c->proc(pd, command, tag, ts, userdata) < 0) + goto fail; + } else + goto fail; + + tagstruct_free(ts); + + return 0; + +fail: + if (ts) + tagstruct_free(ts); + + fprintf(stderr, "protocol-native: invalid packet.\n"); + return -1; +} + +static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) { + struct reply_info*r = userdata; + assert (r && r->mainloop_timeout == id && r->pdispatch && r->pdispatch->mainloop == m && r->callback); + + r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata); + reply_info_free(r); +} + +void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata) { + struct reply_info *r; + struct timeval tv; + assert(pd && cb); + + r = malloc(sizeof(struct reply_info)); + assert(r); + r->pdispatch = pd; + r->callback = cb; + r->userdata = userdata; + r->tag = tag; + + gettimeofday(&tv, NULL); + tv.tv_sec += timeout; + + r->mainloop_timeout = pd->mainloop->source_time(pd->mainloop, &tv, timeout_callback, r); + assert(r->mainloop_timeout); + + r->previous = NULL; + r->next = pd->replies; + if (r->next) + r->next->previous = r; + pd->replies = r; +} diff --git a/src/pdispatch.h b/src/pdispatch.h new file mode 100644 index 00000000..466da9a4 --- /dev/null +++ b/src/pdispatch.h @@ -0,0 +1,22 @@ +#ifndef foopdispatchhfoo +#define foopdispatchhfoo + +#include <inttypes.h> +#include "tagstruct.h" +#include "packet.h" +#include "mainloop-api.h" + +struct pdispatch; + +struct pdispatch_command { + int (*proc)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +}; + +struct pdispatch* pdispatch_new(struct pa_mainloop_api *m, const struct pdispatch_command*table, unsigned entries); +void pdispatch_free(struct pdispatch *pd); + +int pdispatch_run(struct pdispatch *pd, struct packet*p, void *userdata); + +void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata); + +#endif diff --git a/src/polyp.c b/src/polyp.c new file mode 100644 index 00000000..fdff7f9c --- /dev/null +++ b/src/polyp.c @@ -0,0 +1,451 @@ +#include <stdio.h> +#include <assert.h> +#include <stdlib.h> +#include <string.h> + +#include "polyp.h" +#include "protocol-native-spec.h" +#include "pdispatch.h" +#include "pstream.h" +#include "dynarray.h" +#include "socket-client.h" +#include "pstream-util.h" + +#define DEFAULT_QUEUE_LENGTH 10240 +#define DEFAULT_MAX_LENGTH 20480 +#define DEFAULT_PREBUF 4096 +#define DEFAULT_TIMEOUT 5 + +struct pa_context { + char *name; + struct pa_mainloop_api* mainloop; + struct socket_client *client; + struct pstream *pstream; + struct pdispatch *pdispatch; + struct dynarray *streams; + struct pa_stream *first_stream; + uint32_t ctag; + uint32_t errno; + enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_READY, CONTEXT_DEAD} state; + + void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata); + void *connect_complete_userdata; + + void (*die_callback)(struct pa_context*c, void *userdata); + void *die_userdata; +}; + +struct pa_stream { + struct pa_context *context; + struct pa_stream *next, *previous; + uint32_t channel; + int channel_valid; + enum pa_stream_direction direction; + enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state; + uint32_t requested_bytes; + + void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata); + void *read_userdata; + + void (*write_callback)(struct pa_stream *p, size_t length, void *userdata); + void *write_userdata; + + void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata); + void *create_complete_userdata; + + void (*die_callback)(struct pa_stream*c, void *userdata); + void *die_userdata; +}; + +static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); + +static const struct pdispatch_command command_table[PA_COMMAND_MAX] = { + [PA_COMMAND_ERROR] = { NULL }, + [PA_COMMAND_REPLY] = { NULL }, + [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL }, + [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL }, + [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL }, + [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, + [PA_COMMAND_EXIT] = { NULL }, + [PA_COMMAND_REQUEST] = { command_request }, +}; + +struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) { + assert(mainloop && name); + struct pa_context *c; + c = malloc(sizeof(struct pa_context)); + assert(c); + c->name = strdup(name); + c->mainloop = mainloop; + c->client = NULL; + c->pstream = NULL; + c->pdispatch = NULL; + c->streams = dynarray_new(); + assert(c->streams); + c->first_stream = NULL; + c->errno = PA_ERROR_OK; + c->state = CONTEXT_UNCONNECTED; + c->ctag = 0; + + c->connect_complete_callback = NULL; + c->connect_complete_userdata = NULL; + + c->die_callback = NULL; + c->die_userdata = NULL; + + return c; +} + +void pa_context_free(struct pa_context *c) { + assert(c); + + while (c->first_stream) + pa_stream_free(c->first_stream); + + if (c->client) + socket_client_free(c->client); + if (c->pdispatch) + pdispatch_free(c->pdispatch); + if (c->pstream) + pstream_free(c->pstream); + if (c->streams) + dynarray_free(c->streams, NULL, NULL); + + free(c->name); + free(c); +} + +static void stream_dead(struct pa_stream *s) { + if (s->state == STREAM_DEAD) + return; + + s->state = STREAM_DEAD; + if (s->die_callback) + s->die_callback(s, s->die_userdata); +} + +static void context_dead(struct pa_context *c) { + struct pa_stream *s; + assert(c); + + for (s = c->first_stream; s; s = s->next) + stream_dead(s); + + if (c->state == CONTEXT_DEAD) + return; + + c->state = CONTEXT_DEAD; + if (c->die_callback) + c->die_callback(c, c->die_userdata); +} + +static void pstream_die_callback(struct pstream *p, void *userdata) { + struct pa_context *c = userdata; + assert(p && c); + + assert(c->state != CONTEXT_DEAD); + + c->state = CONTEXT_DEAD; + + context_dead(c); +} + +static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) { + struct pa_context *c = userdata; + assert(p && packet && c); + + if (pdispatch_run(c->pdispatch, packet, c) < 0) { + fprintf(stderr, "polyp.c: invalid packet.\n"); + return -1; + } + + return 0; +} + +static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) { + struct pa_context *c = userdata; + struct pa_stream *s; + assert(p && chunk && c && chunk->memblock && chunk->memblock->data); + + if (!(s = dynarray_get(c->streams, channel))) + return -1; + + if (s->read_callback) + s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata); + + return 0; +} + +static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) { + struct pa_context *c = userdata; + assert(client && io && c && c->state == CONTEXT_CONNECTING); + + socket_client_free(client); + c->client = NULL; + + if (!io) { + c->errno = PA_ERROR_CONNECTIONREFUSED; + c->state = CONTEXT_UNCONNECTED; + + if (c->connect_complete_callback) + c->connect_complete_callback(c, 0, c->connect_complete_userdata); + + return; + } + + c->pstream = pstream_new(c->mainloop, io); + assert(c->pstream); + pstream_set_die_callback(c->pstream, pstream_die_callback, c); + pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c); + pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c); + + c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX); + assert(c->pdispatch); + + c->state = CONTEXT_READY; + + if (c->connect_complete_callback) + c->connect_complete_callback(c, 1, c->connect_complete_userdata); +} + +int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) { + assert(c && c->state == CONTEXT_UNCONNECTED); + + assert(!c->client); + if (!(c->client = socket_client_new_unix(c->mainloop, server))) { + c->errno = PA_ERROR_CONNECTIONREFUSED; + return -1; + } + + c->connect_complete_callback = complete; + c->connect_complete_userdata = userdata; + + socket_client_set_callback(c->client, on_connection, c); + c->state = CONTEXT_CONNECTING; + + return 0; +} + +int pa_context_is_dead(struct pa_context *c) { + assert(c); + return c->state == CONTEXT_DEAD; +} + +int pa_context_is_ready(struct pa_context *c) { + assert(c); + return c->state == CONTEXT_READY; +} + +int pa_context_errno(struct pa_context *c) { + assert(c); + return c->errno; +} + +void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) { + assert(c); + c->die_callback = cb; + c->die_userdata = userdata; +} + +static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct pa_stream *s; + struct pa_context *c = userdata; + uint32_t bytes, channel; + assert(pd && command == PA_COMMAND_REQUEST && t && s); + + if (tagstruct_getu32(t, &channel) < 0 || + tagstruct_getu32(t, &bytes) < 0 || + tagstruct_eof(t)) { + c->errno = PA_ERROR_PROTOCOL; + return -1; + } + + if (!(s = dynarray_get(c->streams, channel))) { + c->errno = PA_ERROR_PROTOCOL; + return -1; + } + + s->requested_bytes += bytes; + + if (s->requested_bytes && s->write_callback) + s->write_callback(s, s->requested_bytes, s->write_userdata); + + return 0; +} + +static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + int ret = 0; + struct pa_stream *s = userdata; + assert(pd && s && s->state == STREAM_CREATING); + + if (command != PA_COMMAND_REPLY) { + struct pa_context *c = s->context; + assert(c); + + if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) { + s->context->errno = PA_ERROR_PROTOCOL; + ret = -1; + } else if (command == PA_COMMAND_TIMEOUT) { + s->context->errno = PA_ERROR_TIMEOUT; + ret = -1; + } + + goto fail; + } + + if (tagstruct_getu32(t, &s->channel) < 0 || + tagstruct_eof(t)) { + s->context->errno = PA_ERROR_PROTOCOL; + ret = -1; + goto fail; + } + + s->channel_valid = 1; + dynarray_put(s->context->streams, s->channel, s); + + s->state = STREAM_READY; + assert(s->create_complete_callback); + s->create_complete_callback(s->context, s, s->create_complete_userdata); + return 0; + +fail: + assert(s->create_complete_callback); + s->create_complete_callback(s->context, NULL, s->create_complete_userdata); + pa_stream_free(s); + return ret; +} + +int pa_stream_new( + struct pa_context *c, + enum pa_stream_direction dir, + const char *dev, + const char *name, + const struct pa_sample_spec *ss, + const struct pa_buffer_attr *attr, + void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata), + void *userdata) { + + struct pa_stream *s; + struct tagstruct *t; + uint32_t tag; + + assert(c && name && ss && c->state == CONTEXT_READY && complete); + + s = malloc(sizeof(struct pa_stream)); + assert(s); + s->context = c; + + s->read_callback = NULL; + s->read_userdata = NULL; + s->write_callback = NULL; + s->write_userdata = NULL; + s->die_callback = NULL; + s->die_userdata = NULL; + s->create_complete_callback = complete; + s->create_complete_userdata = NULL; + + s->state = STREAM_CREATING; + s->requested_bytes = 0; + s->channel = 0; + s->channel_valid = 0; + s->direction = dir; + + t = tagstruct_new(NULL, 0); + assert(t); + + tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM); + tagstruct_putu32(t, tag = c->ctag++); + tagstruct_puts(t, name); + tagstruct_put_sample_spec(t, ss); + tagstruct_putu32(t, (uint32_t) -1); + tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH); + tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH); + tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF); + + pstream_send_tagstruct(c->pstream, t); + + pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s); + + s->next = c->first_stream; + if (s->next) + s->next->previous = s; + s->previous = NULL; + c->first_stream = s; + + return 0; +} + +void pa_stream_free(struct pa_stream *s) { + assert(s && s->context); + + if (s->channel_valid) { + struct tagstruct *t = tagstruct_new(NULL, 0); + assert(t); + + tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM); + tagstruct_putu32(t, s->context->ctag++); + tagstruct_putu32(t, s->channel); + pstream_send_tagstruct(s->context->pstream, t); + } + + if (s->channel_valid) + dynarray_put(s->context->streams, s->channel, NULL); + + if (s->next) + s->next->previous = s->previous; + if (s->previous) + s->previous->next = s->next; + else + s->context->first_stream = s->next; + + free(s); +} + +void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) { + assert(s && cb); + s->write_callback = cb; + s->write_userdata = userdata; +} + +void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { + struct memchunk chunk; + assert(s && s->context && data && length); + + chunk.memblock = memblock_new(length); + assert(chunk.memblock && chunk.memblock->data); + memcpy(chunk.memblock->data, data, length); + chunk.index = 0; + chunk.length = length; + + pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk); + + if (length < s->requested_bytes) + s->requested_bytes -= length; + else + s->requested_bytes = 0; +} + +size_t pa_stream_writable_size(struct pa_stream *s) { + assert(s); + return s->requested_bytes; +} + +void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) { + assert(s && cb); + s->read_callback = cb; + s->read_userdata = userdata; +} + +int pa_stream_is_dead(struct pa_stream *s) { + return s->state == STREAM_DEAD; +} + +int pa_stream_is_ready(struct pa_stream*s) { + return s->state == STREAM_READY; +} + +void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) { + assert(s); + s->die_callback = cb; + s->die_userdata = userdata; +} diff --git a/src/polyp.h b/src/polyp.h new file mode 100644 index 00000000..171e3bdf --- /dev/null +++ b/src/polyp.h @@ -0,0 +1,53 @@ +#ifndef foopolyphfoo +#define foopolyphfoo + +#include <sys/types.h> + +#include "sample.h" +#include "polypdef.h" +#include "mainloop-api.h" + +struct pa_context; + +struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name); + +int pa_context_connect( + struct pa_context *c, + const char *server, + void (*complete) (struct pa_context*c, int success, void *userdata), + void *userdata); + +void pa_context_free(struct pa_context *c); + +void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata); + +int pa_context_is_dead(struct pa_context *c); +int pa_context_is_ready(struct pa_context *c); +int pa_contect_errno(struct pa_context *c); + +struct pa_stream; + +int pa_stream_new( + struct pa_context *c, + enum pa_stream_direction dir, + const char *dev, + const char *name, + const struct pa_sample_spec *ss, + const struct pa_buffer_attr *attr, + void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata), + void *userdata); + +void pa_stream_free(struct pa_stream *p); + +void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata); + +void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata); +void pa_stream_write(struct pa_stream *p, const void *data, size_t length); +size_t pa_stream_writable_size(struct pa_stream *p); + +void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata); + +int pa_stream_is_dead(struct pa_stream *p); +int pa_stream_is_ready(struct pa_stream*p); + +#endif diff --git a/src/polypdef.h b/src/polypdef.h new file mode 100644 index 00000000..aa6e6bf6 --- /dev/null +++ b/src/polypdef.h @@ -0,0 +1,18 @@ +#ifndef foopolypdefhfoo +#define foopolypdefhfoo + +#include <inttypes.h> + +enum pa_stream_direction { + PA_STREAM_PLAYBACK, + PA_STREAM_RECORD +}; + +struct pa_buffer_attr { + uint32_t queue_length; + uint32_t max_length; + uint32_t prebuf; +}; + + +#endif diff --git a/src/protocol-cli.c b/src/protocol-cli.c index c0c93d98..b6460fec 100644 --- a/src/protocol-cli.c +++ b/src/protocol-cli.c @@ -12,8 +12,7 @@ struct protocol_cli { static void cli_eof_cb(struct cli*c, void*userdata) { struct protocol_cli *p = userdata; - assert(c && p); - + assert(p); idxset_remove_by_data(p->connections, c, NULL); cli_free(c); } @@ -22,7 +21,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us struct protocol_cli *p = userdata; struct cli *c; assert(s && io && p); - + c = cli_new(p->core, io); assert(c); cli_set_eof_callback(c, cli_eof_cb, p); diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h new file mode 100644 index 00000000..df11ae3c --- /dev/null +++ b/src/protocol-native-spec.h @@ -0,0 +1,29 @@ +#ifndef fooprotocolnativespech +#define fooprotocolnativespech + +enum { + PA_COMMAND_ERROR, + PA_COMMAND_TIMEOUT, /* pseudo command */ + PA_COMMAND_REPLY, + PA_COMMAND_CREATE_PLAYBACK_STREAM, + PA_COMMAND_DELETE_PLAYBACK_STREAM, + PA_COMMAND_CREATE_RECORD_STREAM, + PA_COMMAND_DELETE_RECORD_STREAM, + PA_COMMAND_EXIT, + PA_COMMAND_REQUEST, + PA_COMMAND_MAX +}; + +enum { + PA_ERROR_OK, + PA_ERROR_ACCESS, + PA_ERROR_COMMAND, + PA_ERROR_INVALID, + PA_ERROR_EXIST, + PA_ERROR_NOENTITY, + PA_ERROR_CONNECTIONREFUSED, + PA_ERROR_PROTOCOL, + PA_ERROR_TIMEOUT +}; + +#endif diff --git a/src/protocol-native.c b/src/protocol-native.c index e9cca7c1..a39880b8 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -3,34 +3,19 @@ #include <stdlib.h> #include "protocol-native.h" +#include "protocol-native-spec.h" #include "packet.h" #include "client.h" #include "sourceoutput.h" #include "sinkinput.h" #include "pstream.h" #include "tagstruct.h" +#include "pdispatch.h" +#include "pstream-util.h" struct connection; struct protocol_native; -enum { - COMMAND_ERROR, - COMMAND_REPLY, - COMMAND_CREATE_PLAYBACK_STREAM, - COMMAND_DELETE_PLAYBACK_STREAM, - COMMAND_CREATE_RECORD_STREAM, - COMMAND_DELETE_RECORD_STREAM, - COMMAND_EXIT, - COMMAND_MAX -}; - -enum { - ERROR_ACCESS, - ERROR_COMMAND, - ERROR_ARGUMENT, - ERROR_EXIST -}; - struct record_stream { struct connection *connection; uint32_t index; @@ -41,6 +26,7 @@ struct record_stream { struct playback_stream { struct connection *connection; uint32_t index; + size_t qlength; struct sink_input *sink_input; struct memblockq *memblockq; }; @@ -50,6 +36,7 @@ struct connection { struct protocol_native *protocol; struct client *client; struct pstream *pstream; + struct pdispatch *pdispatch; struct idxset *record_streams, *playback_streams; }; @@ -60,6 +47,29 @@ struct protocol_native { struct idxset *connections; }; +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk); +static void sink_input_drop_cb(struct sink_input *i, size_t length); +static void sink_input_kill_cb(struct sink_input *i); +static uint32_t sink_input_get_latency_cb(struct sink_input *i); + +static void request_bytes(struct playback_stream*s); + +static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); + +static const struct pdispatch_command command_table[PA_COMMAND_MAX] = { + [PA_COMMAND_ERROR] = { NULL }, + [PA_COMMAND_REPLY] = { NULL }, + [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, + [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, + [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL }, + [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, + [PA_COMMAND_EXIT] = { command_exit }, +}; + +/* structure management */ + static void record_stream_free(struct record_stream* r) { assert(r && r->connection); @@ -69,18 +79,28 @@ static void record_stream_free(struct record_stream* r) { free(r); } -static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct sample_spec *ss, const char *name, size_t maxlength, size_t prebuf) { +static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) { struct playback_stream *s; + assert(c && sink && s && name && qlen && maxlength && prebuf); s = malloc(sizeof(struct playback_stream)); assert (s); s->connection = c; + s->qlength = qlen; + s->sink_input = sink_input_new(sink, ss, name); assert(s->sink_input); - s->memblockq = memblockq_new(maxlength, sample_size(ss), prebuf); + s->sink_input->peek = sink_input_peek_cb; + s->sink_input->drop = sink_input_drop_cb; + s->sink_input->kill = sink_input_kill_cb; + s->sink_input->get_latency = sink_input_get_latency_cb; + s->sink_input->userdata = s; + + s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf); assert(s->memblockq); idxset_put(c->playback_streams, s, &s->index); + request_bytes(s); return s; } @@ -99,7 +119,6 @@ static void connection_free(struct connection *c) { assert(c && c->protocol); idxset_remove_by_data(c->protocol->connections, c, NULL); - pstream_free(c->pstream); while ((r = idxset_first(c->record_streams, NULL))) record_stream_free(r); idxset_free(c->record_streams, NULL, NULL); @@ -108,67 +127,90 @@ static void connection_free(struct connection *c) { playback_stream_free(p); idxset_free(c->playback_streams, NULL, NULL); + pdispatch_free(c->pdispatch); + pstream_free(c->pstream); client_free(c->client); free(c); } -/*** pstream callbacks ***/ +static void request_bytes(struct playback_stream *s) { + struct tagstruct *t; + size_t l; + assert(s); -static void send_tagstruct(struct pstream *p, struct tagstruct *t) { - size_t length; - uint8_t *data; - struct packet *packet; - assert(p && t); - - data = tagstruct_free_data(t, &length); - assert(data && length); - packet = packet_new_dynamic(data, length); - assert(packet); - pstream_send_packet(p, packet); - packet_unref(packet); -} + if (!(l = memblockq_missing_to(s->memblockq, s->qlength))) + return; -static void send_error(struct pstream *p, uint32_t tag, uint32_t error) { - struct tagstruct *t = tagstruct_new(NULL, 0); + t = tagstruct_new(NULL, 0); assert(t); - tagstruct_putu32(t, COMMAND_ERROR); - tagstruct_putu32(t, tag); - tagstruct_putu32(t, error); - send_tagstruct(p, t); + tagstruct_putu32(t, PA_COMMAND_REQUEST); + tagstruct_putu32(t, s->index); + tagstruct_putu32(t, l); + pstream_send_tagstruct(s->connection->pstream, t); } -static void send_simple_ack(struct pstream *p, uint32_t tag) { - struct tagstruct *t = tagstruct_new(NULL, 0); - assert(t); - tagstruct_putu32(t, COMMAND_REPLY); - tagstruct_putu32(t, tag); - send_tagstruct(p, t); +/*** sinkinput callbacks ***/ + +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) { + struct playback_stream *s; + assert(i && i->userdata && chunk); + s = i->userdata; + + if (memblockq_peek(s->memblockq, chunk) < 0) + return -1; + + return 0; } -struct command { - int (*func)(struct connection *c, uint32_t tag, struct tagstruct *t); -}; +static void sink_input_drop_cb(struct sink_input *i, size_t length) { + struct playback_stream *s; + assert(i && i->userdata && length); + s = i->userdata; -static int command_create_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) { + memblockq_drop(s->memblockq, length); + request_bytes(s); +} + +static void sink_input_kill_cb(struct sink_input *i) { struct playback_stream *s; - size_t maxlength, prebuf; + assert(i && i->userdata); + s = i->userdata; + + playback_stream_free(s); +} + +static uint32_t sink_input_get_latency_cb(struct sink_input *i) { + struct playback_stream *s; + assert(i && i->userdata); + s = i->userdata; + + return pa_samples_usec(memblockq_get_length(s->memblockq), &s->sink_input->sample_spec); +} + +/*** pdispatch callbacks ***/ + +static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct connection *c = userdata; + struct playback_stream *s; + size_t maxlength, prebuf, qlength; uint32_t sink_index; const char *name; - struct sample_spec ss; + struct pa_sample_spec ss; struct tagstruct *reply; struct sink *sink; assert(c && t && c->protocol && c->protocol->core); if (tagstruct_gets(t, &name) < 0 || tagstruct_get_sample_spec(t, &ss) < 0 || - tagstruct_getu32(t, &sink_index) < 0 || + tagstruct_getu32(t, &sink_index) < 0 || + tagstruct_getu32(t, &qlength) < 0 || tagstruct_getu32(t, &maxlength) < 0 || tagstruct_getu32(t, &prebuf) < 0 || !tagstruct_eof(t)) return -1; if (!c->authorized) { - send_error(c->pstream, tag, ERROR_ACCESS); + pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); return 0; } @@ -178,25 +220,28 @@ static int command_create_playback_stream(struct connection *c, uint32_t tag, st sink = idxset_get_by_index(c->protocol->core->sinks, sink_index); if (!sink) { - send_error(c->pstream, tag, ERROR_EXIST); + pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); return 0; } - if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, prebuf))) { - send_error(c->pstream, tag, ERROR_ARGUMENT); + if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) { + pstream_send_error(c->pstream, tag, PA_ERROR_INVALID); return 0; } reply = tagstruct_new(NULL, 0); assert(reply); - tagstruct_putu32(reply, COMMAND_REPLY); + tagstruct_putu32(reply, PA_COMMAND_REPLY); tagstruct_putu32(reply, tag); tagstruct_putu32(reply, s->index); - send_tagstruct(c->pstream, reply); + assert(s->sink_input); + tagstruct_putu32(reply, s->sink_input->index); + pstream_send_tagstruct(c->pstream, reply); return 0; } -static int command_delete_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) { +static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct connection *c = userdata; uint32_t channel; struct playback_stream *s; assert(c && t); @@ -206,78 +251,50 @@ static int command_delete_playback_stream(struct connection *c, uint32_t tag, st return -1; if (!c->authorized) { - send_error(c->pstream, tag, ERROR_ACCESS); + pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); return 0; } if (!(s = idxset_get_by_index(c->playback_streams, channel))) { - send_error(c->pstream, tag, ERROR_EXIST); + pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); return 0; } - send_simple_ack(c->pstream, tag); + pstream_send_simple_ack(c->pstream, tag); return 0; } -static int command_exit(struct connection *c, uint32_t tag, struct tagstruct *t) { +static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct connection *c = userdata; assert(c && t); if (!tagstruct_eof(t)) return -1; if (!c->authorized) { - send_error(c->pstream, tag, ERROR_ACCESS); + pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); return 0; } - assert(c->protocol && c->protocol->core); - mainloop_quit(c->protocol->core->mainloop, -1); - send_simple_ack(c->pstream, tag); /* nonsense */ + assert(c->protocol && c->protocol->core && c->protocol->core->mainloop); + c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0); + pstream_send_simple_ack(c->pstream, tag); /* nonsense */ return 0; } -static const struct command commands[] = { - [COMMAND_ERROR] = { NULL }, - [COMMAND_REPLY] = { NULL }, - [COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, - [COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, - [COMMAND_CREATE_RECORD_STREAM] = { NULL }, - [COMMAND_DELETE_RECORD_STREAM] = { NULL }, - [COMMAND_EXIT] = { command_exit }, -}; +/*** pstream callbacks ***/ + static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) { struct connection *c = userdata; - uint32_t tag, command; - struct tagstruct *ts = NULL; assert(p && packet && packet->data && c); - if (packet->length <= 8) - goto fail; - - ts = tagstruct_new(packet->data, packet->length); - assert(ts); - - if (tagstruct_getu32(ts, &command) < 0 || - tagstruct_getu32(ts, &tag) < 0) - goto fail; - - if (command >= COMMAND_MAX || !commands[command].func) - send_error(p, tag, ERROR_COMMAND); - else if (commands[command].func(c, tag, ts) < 0) - goto fail; + if (pdispatch_run(c->pdispatch, packet, c) < 0) { + fprintf(stderr, "protocol-native: invalid packet.\n"); + return -1; + } - tagstruct_free(ts); - return 0; - -fail: - if (ts) - tagstruct_free(ts); - - fprintf(stderr, "protocol-native: invalid packet.\n"); - return -1; - } static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) { @@ -326,6 +343,9 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us pstream_set_recieve_memblock_callback(c->pstream, memblock_callback, c); pstream_set_die_callback(c->pstream, die_callback, c); + c->pdispatch = pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX); + assert(c->pdispatch); + c->record_streams = idxset_new(NULL, NULL); c->playback_streams = idxset_new(NULL, NULL); assert(c->record_streams && c->playback_streams); diff --git a/src/protocol-simple.c b/src/protocol-simple.c index 8e4246cd..c8c45854 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -9,6 +9,7 @@ #include "sourceoutput.h" #include "protocol-simple.h" #include "client.h" +#include "sample-util.h" struct connection { struct protocol_simple *protocol; @@ -115,9 +116,10 @@ static int do_write(struct connection *c) { /*** sink_input callbacks ***/ static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) { - struct connection*c = i->userdata; - assert(i && c && chunk); - + struct connection*c; + assert(i && i->userdata && chunk); + c = i->userdata; + if (memblockq_peek(c->input_memblockq, chunk) < 0) return -1; @@ -143,7 +145,7 @@ static void sink_input_kill_cb(struct sink_input *i) { static uint32_t sink_input_get_latency_cb(struct sink_input *i) { struct connection*c = i->userdata; assert(i && c); - return samples_usec(memblockq_get_length(c->input_memblockq), &DEFAULT_SAMPLE_SPEC); + return pa_samples_usec(memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); } /*** source_output callbacks ***/ @@ -185,6 +187,7 @@ static void io_callback(struct iochannel*io, void *userdata) { static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) { struct protocol_simple *p = userdata; struct connection *c = NULL; + char cname[256]; assert(s && io && p); c = malloc(sizeof(struct connection)); @@ -195,7 +198,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->input_memblockq = c->output_memblockq = NULL; c->protocol = p; - c->client = client_new(p->core, "SIMPLE", "Client"); + iochannel_peer_to_string(io, cname, sizeof(cname)); + c->client = client_new(p->core, "SIMPLE", cname); assert(c->client); c->client->kill = client_kill_cb; c->client->userdata = c; @@ -215,8 +219,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->source_output->kill = source_output_kill_cb; c->source_output->userdata = c; - l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */ - c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2); + l = 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */ + c->output_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2); } if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) { @@ -236,8 +240,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->sink_input->get_latency = sink_input_get_latency_cb; c->sink_input->userdata = c; - l = bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */ - c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2); + l = pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */ + c->input_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2); } diff --git a/src/pstream-util.c b/src/pstream-util.c new file mode 100644 index 00000000..2fab2b61 --- /dev/null +++ b/src/pstream-util.c @@ -0,0 +1,35 @@ +#include <assert.h> + +#include "protocol-native-spec.h" +#include "pstream-util.h" + +void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t) { + size_t length; + uint8_t *data; + struct packet *packet; + assert(p && t); + + data = tagstruct_free_data(t, &length); + assert(data && length); + packet = packet_new_dynamic(data, length); + assert(packet); + pstream_send_packet(p, packet); + packet_unref(packet); +} + +void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error) { + struct tagstruct *t = tagstruct_new(NULL, 0); + assert(t); + tagstruct_putu32(t, PA_COMMAND_ERROR); + tagstruct_putu32(t, tag); + tagstruct_putu32(t, error); + pstream_send_tagstruct(p, t); +} + +void pstream_send_simple_ack(struct pstream *p, uint32_t tag) { + struct tagstruct *t = tagstruct_new(NULL, 0); + assert(t); + tagstruct_putu32(t, PA_COMMAND_REPLY); + tagstruct_putu32(t, tag); + pstream_send_tagstruct(p, t); +} diff --git a/src/pstream-util.h b/src/pstream-util.h new file mode 100644 index 00000000..4e64a95c --- /dev/null +++ b/src/pstream-util.h @@ -0,0 +1,14 @@ +#ifndef foopstreamutilhfoo +#define foopstreamutilhfoo + +#include <inttypes.h> +#include "pstream.h" +#include "tagstruct.h" + +/* The tagstruct is freed!*/ +void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t); + +void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error); +void pstream_send_simple_ack(struct pstream *p, uint32_t tag); + +#endif diff --git a/src/pstream.c b/src/pstream.c index a63e126d..4a3a648b 100644 --- a/src/pstream.c +++ b/src/pstream.c @@ -30,7 +30,7 @@ struct item_info { }; struct pstream { - struct mainloop *mainloop; + struct pa_mainloop_api *mainloop; struct mainloop_source *mainloop_source; struct iochannel *io; struct queue *send_queue; @@ -70,18 +70,24 @@ static void do_read(struct pstream *p); static void io_callback(struct iochannel*io, void *userdata) { struct pstream *p = userdata; assert(p && p->io == io); + + p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0); + do_write(p); do_read(p); } -static void prepare_callback(struct mainloop_source *s, void*userdata) { +static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) { struct pstream *p = userdata; - assert(p && p->mainloop_source == s); + assert(p && p->mainloop_source == id && p->mainloop == m); + + p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0); + do_write(p); do_read(p); } -struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) { +struct pstream *pstream_new(struct pa_mainloop_api *m, struct iochannel *io) { struct pstream *p; assert(io); @@ -96,8 +102,8 @@ struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) { p->die_callback_userdata = NULL; p->mainloop = m; - p->mainloop_source = mainloop_source_new_fixed(m, prepare_callback, p); - mainloop_source_enable(p->mainloop_source, 0); + p->mainloop_source = m->source_fixed(m, fixed_callback, p); + m->enable_fixed(m, p->mainloop_source, 0); p->send_queue = queue_new(); assert(p->send_queue); @@ -152,7 +158,7 @@ void pstream_free(struct pstream *p) { if (p->read.packet) packet_unref(p->read.packet); - mainloop_source_free(p->mainloop_source); + p->mainloop->cancel_fixed(p->mainloop, p->mainloop_source); free(p); } @@ -173,7 +179,7 @@ void pstream_send_packet(struct pstream*p, struct packet *packet) { i->packet = packet_ref(packet); queue_push(p->send_queue, i); - mainloop_source_enable(p->mainloop_source, 1); + p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1); } void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) { @@ -190,7 +196,7 @@ void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, st memblock_ref(i->chunk.memblock); queue_push(p->send_queue, i); - mainloop_source_enable(p->mainloop_source, 1); + p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1); } void pstream_set_recieve_packet_callback(struct pstream *p, int (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) { @@ -219,7 +225,7 @@ static void prepare_next_write_item(struct pstream *p) { assert(p->write.current->packet); p->write.data = p->write.current->packet->data; p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length); - p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0; + p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0; } else { assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock); @@ -236,8 +242,6 @@ static void do_write(struct pstream *p) { ssize_t r; assert(p); - mainloop_source_enable(p->mainloop_source, 0); - if (p->dead || !iochannel_is_writable(p->io)) return; @@ -285,8 +289,6 @@ static void do_read(struct pstream *p) { ssize_t r; assert(p); - mainloop_source_enable(p->mainloop_source, 0); - if (p->dead || !iochannel_is_readable(p->io)) return; @@ -313,7 +315,7 @@ static void do_read(struct pstream *p) { assert(!p->read.packet && !p->read.memblock); - if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == 0) { + if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) { /* Frame is a packet frame */ p->read.packet = packet_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH])); assert(p->read.packet); @@ -331,7 +333,7 @@ static void do_read(struct pstream *p) { if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */ size_t l; - l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r; + l = (p->read.index - r) < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : (size_t) r; if (l > 0) { struct memchunk chunk; diff --git a/src/pstream.h b/src/pstream.h index 7113681e..d418908e 100644 --- a/src/pstream.h +++ b/src/pstream.h @@ -6,10 +6,11 @@ #include "packet.h" #include "memblock.h" #include "iochannel.h" +#include "mainloop-api.h" struct pstream; -struct pstream* pstream_new(struct mainloop *m, struct iochannel *io); +struct pstream* pstream_new(struct pa_mainloop_api *m, struct iochannel *io); void pstream_free(struct pstream*p); void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata); diff --git a/src/queue.c b/src/queue.c index 90823ae6..5c2e7a67 100644 --- a/src/queue.c +++ b/src/queue.c @@ -73,5 +73,12 @@ void* queue_pop(struct queue *q) { p = e->data; free(e); + q->length--; + return p; } + +int queue_is_empty(struct queue *q) { + assert(q); + return q->length == 0; +} diff --git a/src/sample-util.c b/src/sample-util.c new file mode 100644 index 00000000..7a3c267a --- /dev/null +++ b/src/sample-util.c @@ -0,0 +1,88 @@ +#include <string.h> +#include <assert.h> + +#include "sample-util.h" + +struct pa_sample_spec default_sample_spec = { + .format = SAMPLE_S16NE, + .rate = 44100, + .channels = 2 +}; + +struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec) { + assert(b && b->data && spec); + memblock_assert_exclusive(b); + silence_memory(b->data, b->length, spec); + return b; +} + +void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec) { + assert(c && c->memblock && c->memblock->data && spec && c->length); + memblock_assert_exclusive(c->memblock); + silence_memory(c->memblock->data+c->index, c->length, spec); +} + +void silence_memory(void *p, size_t length, struct pa_sample_spec *spec) { + char c = 0; + assert(p && length && spec); + + switch (spec->format) { + case SAMPLE_U8: + c = 127; + break; + case SAMPLE_S16LE: + case SAMPLE_S16BE: + case SAMPLE_FLOAT32: + c = 0; + break; + case SAMPLE_ALAW: + case SAMPLE_ULAW: + c = 80; + break; + } + + memset(p, c, length); +} + +size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume) { + unsigned c, d; + assert(channels && data && length && spec); + assert(spec->format == SAMPLE_S16NE); + + for (d = 0;; d += sizeof(int16_t)) { + int32_t sum = 0; + + if (d >= length) + return d; + + for (c = 0; c < nchannels; c++) { + int32_t v; + uint8_t volume = channels[c].volume; + + if (d >= channels[c].chunk.length) + return d; + + if (volume == 0) + v = 0; + else { + v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d)); + + if (volume != 0xFF) + v = v*volume/0xFF; + } + + sum += v; + } + + if (volume == 0) + sum = 0; + else if (volume != 0xFF) + sum = sum*volume/0xFF; + + if (sum < -0x8000) sum = -0x8000; + if (sum > 0x7FFF) sum = 0x7FFF; + + *((int16_t*) data) = sum; + data += sizeof(int16_t); + } +} diff --git a/src/sample-util.h b/src/sample-util.h new file mode 100644 index 00000000..0a3f7c89 --- /dev/null +++ b/src/sample-util.h @@ -0,0 +1,23 @@ +#ifndef foosampleutilhfoo +#define foosampleutilhfoo + +#include "sample.h" +#include "memblock.h" + +#define DEFAULT_SAMPLE_SPEC default_sample_spec + +extern struct pa_sample_spec default_sample_spec; + +struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec); +void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec); +void silence_memory(void *p, size_t length, struct pa_sample_spec *spec); + +struct mix_info { + struct memchunk chunk; + uint8_t volume; + void *userdata; +}; + +size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume); + +#endif diff --git a/src/sample.c b/src/sample.c index c270f255..2454630c 100644 --- a/src/sample.c +++ b/src/sample.c @@ -1,50 +1,8 @@ -#include <string.h> #include <assert.h> #include "sample.h" -struct sample_spec default_sample_spec = { - .format = SAMPLE_S16NE, - .rate = 44100, - .channels = 2 -}; - -struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec) { - assert(b && b->data && spec); - memblock_assert_exclusive(b); - silence_memory(b->data, b->length, spec); - return b; -} - -void silence_memchunk(struct memchunk *c, struct sample_spec *spec) { - assert(c && c->memblock && c->memblock->data && spec && c->length); - memblock_assert_exclusive(c->memblock); - silence_memory(c->memblock->data+c->index, c->length, spec); -} - -void silence_memory(void *p, size_t length, struct sample_spec *spec) { - char c = 0; - assert(p && length && spec); - - switch (spec->format) { - case SAMPLE_U8: - c = 127; - break; - case SAMPLE_S16LE: - case SAMPLE_S16BE: - case SAMPLE_FLOAT32: - c = 0; - break; - case SAMPLE_ALAW: - case SAMPLE_ULAW: - c = 80; - break; - } - - memset(p, c, length); -} - -size_t sample_size(struct sample_spec *spec) { +size_t pa_sample_size(struct pa_sample_spec *spec) { assert(spec); size_t b = 1; @@ -66,56 +24,14 @@ size_t sample_size(struct sample_spec *spec) { return b * spec->channels; } -size_t bytes_per_second(struct sample_spec *spec) { +size_t pa_bytes_per_second(struct pa_sample_spec *spec) { assert(spec); - return spec->rate*sample_size(spec); + return spec->rate*pa_sample_size(spec); } -size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume) { - unsigned c, d; - assert(channels && data && length && spec); - assert(spec->format == SAMPLE_S16NE); - - for (d = 0;; d += sizeof(int16_t)) { - int32_t sum = 0; - - if (d >= length) - return d; - - for (c = 0; c < nchannels; c++) { - int32_t v; - uint8_t volume = channels[c].volume; - - if (d >= channels[c].chunk.length) - return d; - - if (volume == 0) - v = 0; - else { - v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d)); - - if (volume != 0xFF) - v = v*volume/0xFF; - } - - sum += v; - } - - if (volume == 0) - sum = 0; - else if (volume != 0xFF) - sum = sum*volume/0xFF; - - if (sum < -0x8000) sum = -0x8000; - if (sum > 0x7FFF) sum = 0x7FFF; - - *((int16_t*) data) = sum; - data += sizeof(int16_t); - } -} -uint32_t samples_usec(size_t length, struct sample_spec *spec) { +uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec) { assert(spec); - return (uint32_t) (((double) length /sample_size(spec))/spec->rate*1000000); + return (uint32_t) (((double) length /pa_sample_size(spec))/spec->rate*1000000); } diff --git a/src/sample.h b/src/sample.h index b2f13cc4..a4a973bf 100644 --- a/src/sample.h +++ b/src/sample.h @@ -2,10 +2,9 @@ #define foosamplehfoo #include <inttypes.h> +#include <sys/types.h> -#include "memblock.h" - -enum sample_format { +enum pa_sample_format { SAMPLE_U8, SAMPLE_ALAW, SAMPLE_ULAW, @@ -16,30 +15,14 @@ enum sample_format { #define SAMPLE_S16NE SAMPLE_S16LE -struct sample_spec { - enum sample_format format; +struct pa_sample_spec { + enum pa_sample_format format; uint32_t rate; uint8_t channels; }; -#define DEFAULT_SAMPLE_SPEC default_sample_spec - -extern struct sample_spec default_sample_spec; - -struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec); -void silence_memchunk(struct memchunk *c, struct sample_spec *spec); -void silence_memory(void *p, size_t length, struct sample_spec *spec); - -struct mix_info { - struct memchunk chunk; - uint8_t volume; - void *userdata; -}; - -size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume); - -size_t bytes_per_second(struct sample_spec *spec); -size_t sample_size(struct sample_spec *spec); -uint32_t samples_usec(size_t length, struct sample_spec *spec); +size_t pa_bytes_per_second(struct pa_sample_spec *spec); +size_t pa_sample_size(struct pa_sample_spec *spec); +uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec); #endif diff --git a/src/simple.c b/src/simple.c new file mode 100644 index 00000000..a90d22bd --- /dev/null +++ b/src/simple.c @@ -0,0 +1,120 @@ +#include "simple.h" +#include "polyp.h" +#include "mainloop.h" + +struct pa_simple { + struct mainloop *mainloop; + struct pa_context *context; + struct pa_stream *stream; + + size_t requested; + int dead; +}; + +static void playback_callback(struct pa_stream *p, size_t length, void *userdata) { + struct pa_stream *sp = userdata; + assert(p && length && sp); + + sp->requested = length; +} + +struct pa_simple* pa_simple_new( + const char *server, + const char *name, + enum pa_stream_direction dir, + const char *dev, + const char *stream_name, + const struct pa_sample_spec *ss, + const struct pa_buffer_attr *attr) { + + struct pa_simple *p; + assert(ss); + + p = malloc(sizeof(struct pa_simple)); + assert(p); + p->context = NULL; + p->stream = NULL; + p->mainloop = pa_mainloop_new(); + assert(p->mainloop); + p->requested = 0; + p->dead = 0; + + if (!(p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), name))) + goto fail; + + if (pa_context_connect(c, server, NULL, NULL) < 0) + goto fail; + + while (!pa_context_is_ready(c)) { + if (pa_context_is_dead(c)) + goto fail; + + if (mainloop_iterate(p->mainloop) < 0) + goto fail; + } + + if (!(p->stream = pa_stream_new(p->context, dir, sink, stream_name, ss, attr, NULL, NULL))) + goto fail; + + while (!pa_stream_is_ready(c)) { + if (pa_stream_is_dead(c)) + goto fail; + + if (mainloop_iterate(p->mainloop) < 0) + goto fail; + } + + pa_stream_set_write_callback(p->stream, playback_callback, p); + + return p; + +fail: + pa_simple_free(p); + return NULL; +} + +void pa_simple_free(struct pa_simple *s) { + assert(s); + + if (s->stream) + pa_stream_free(s->stream); + + if (s->context) + pa_context_free(s->context); + + if (s->mainloop) + mainloop_free(s->mainloop); + + free(s); +} + +int pa_simple_write(struct pa_simple *s, const void*data, size_t length) { + assert(s && data); + + while (length > 0) { + size_t l; + + while (!s->requested) { + if (pa_context_is_dead(c)) + return -1; + + if (mainloop_iterate(s->mainloop) < 0) + return -1; + } + + l = length; + if (l > s->requested) + l = s->requested; + + pa_stream_write(s->stream, data, l); + data += l; + length -= l; + s->requested = -l; + } + + return 0; +} + +int pa_simple_read(struct pa_simple *s, const void*data, size_t length) { + assert(0); +} diff --git a/src/simple.h b/src/simple.h new file mode 100644 index 00000000..80693056 --- /dev/null +++ b/src/simple.h @@ -0,0 +1,25 @@ +#ifndef foosimplehfoo +#define foosimplehfoo + +#include <sys/types.h> + +#include "sample.h" +#include "polypdef.h" + +struct pa_simple; + +struct pa_simple* pa_simple_new( + const char *server, + const char *name, + enum pa_stream_direction dir, + const char *dev, + const char *stream_name, + const struct pa_sample_spec *ss, + const struct pa_buffer_attr *attr); + +void pa_simple_free(struct pa_simple *s); + +int pa_simple_write(struct pa_simple *s, const void*data, size_t length); +int pa_simple_read(struct pa_simple *s, const void*data, size_t length); + +#endif @@ -6,10 +6,11 @@ #include "sink.h" #include "sinkinput.h" #include "strbuf.h" +#include "sample-util.h" #define MAX_MIX_CHANNELS 32 -struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) { +struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec) { struct sink *s; char *n = NULL; int r; @@ -15,7 +15,7 @@ struct sink { char *name; struct core *core; - struct sample_spec sample_spec; + struct pa_sample_spec sample_spec; struct idxset *inputs; struct source *monitor_source; @@ -27,7 +27,7 @@ struct sink { void *userdata; }; -struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec); +struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec); void sink_free(struct sink* s); int sink_render(struct sink*s, size_t length, struct memchunk *result); diff --git a/src/sinkinput.c b/src/sinkinput.c index 2e6a8c36..b81c9c71 100644 --- a/src/sinkinput.c +++ b/src/sinkinput.c @@ -5,7 +5,7 @@ #include "sinkinput.h" #include "strbuf.h" -struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name) { +struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name) { struct sink_input *i; int r; assert(s && spec); @@ -14,7 +14,7 @@ struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, cons assert(i); i->name = name ? strdup(name) : NULL; i->sink = s; - i->spec = *spec; + i->sample_spec = *spec; i->peek = NULL; i->drop = NULL; diff --git a/src/sinkinput.h b/src/sinkinput.h index 389d832d..f04ecb95 100644 --- a/src/sinkinput.h +++ b/src/sinkinput.h @@ -12,7 +12,7 @@ struct sink_input { char *name; struct sink *sink; - struct sample_spec spec; + struct pa_sample_spec sample_spec; uint8_t volume; int (*peek) (struct sink_input *i, struct memchunk *chunk); @@ -23,7 +23,7 @@ struct sink_input { void *userdata; }; -struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name); +struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name); void sink_input_free(struct sink_input* i); /* Code that didn't create the input stream should call this function to diff --git a/src/socket-client.c b/src/socket-client.c new file mode 100644 index 00000000..812c43f3 --- /dev/null +++ b/src/socket-client.c @@ -0,0 +1,177 @@ +#include <unistd.h> +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <assert.h> +#include <stdlib.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "socket-client.h" +#include "util.h" + +struct socket_client { + struct pa_mainloop_api *mainloop; + int fd; + + void *io_source, *fixed_source; + void (*callback)(struct socket_client*c, struct iochannel *io, void *userdata); + void *userdata; +}; + +static struct socket_client*socket_client_new(struct pa_mainloop_api *m) { + struct socket_client *c; + assert(m); + + c = malloc(sizeof(struct socket_client)); + assert(c); + c->mainloop = m; + c->fd = -1; + c->io_source = c->fixed_source = NULL; + c->callback = NULL; + c->userdata = NULL; + return c; +} + +static void do_call(struct socket_client *c) { + struct iochannel *io; + int error, lerror; + assert(c && c->callback); + + lerror = sizeof(error); + if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &error, &lerror) < 0) { + fprintf(stderr, "getsockopt(): %s\n", strerror(errno)); + goto failed; + } + + if (lerror != sizeof(error)) { + fprintf(stderr, "getsocktop() returned invalid size.\n"); + goto failed; + } + + if (error != 0) { + fprintf(stderr, "connect(): %s\n", strerror(error)); + goto failed; + } + + io = iochannel_new(c->mainloop, c->fd, c->fd); + assert(io); + c->fd = -1; + c->callback(c, io, c->userdata); + + return; + +failed: + close(c->fd); + c->fd = -1; + c->callback(c, NULL, c->userdata); + return; +} + +static void connect_fixed_cb(struct pa_mainloop_api *m, void *id, void *userdata) { + struct socket_client *c = userdata; + assert(m && c && c->fixed_source == id); + m->cancel_fixed(m, c->fixed_source); + c->fixed_source = NULL; + do_call(c); +} + +static void connect_io_cb(struct pa_mainloop_api*m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { + struct socket_client *c = userdata; + assert(m && c && c->io_source == id && fd >= 0 && events == PA_MAINLOOP_API_IO_EVENT_OUTPUT); + m->cancel_io(m, c->io_source); + c->io_source = NULL; + do_call(c); +} + +static int do_connect(struct socket_client *c, const struct sockaddr *sa, socklen_t len) { + int r; + assert(c && sa && len); + + make_nonblock_fd(c->fd); + + if ((r = connect(c->fd, sa, len)) < 0) { + if (r != EINPROGRESS) { + fprintf(stderr, "connect(): %s\n", strerror(errno)); + return -1; + } + + c->io_source = c->mainloop->source_io(c->mainloop, c->fd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, connect_io_cb, c); + assert(c->io_source); + } else { + c->fixed_source = c->mainloop->source_fixed(c->mainloop, connect_fixed_cb, c); + assert(c->io_source); + } + + return 0; +} + +struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) { + struct socket_client *c; + struct sockaddr_in sa; + + c = socket_client_new(m); + assert(c); + + if ((c->fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, "socket(): %s\n", strerror(errno)); + goto fail; + } + + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = htonl(address); + + if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0) + goto fail; + + return c; + +fail: + socket_client_free(c); + return NULL; +} + +struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename) { + struct socket_client *c; + struct sockaddr_un sa; + + c = socket_client_new(m); + assert(c); + + if ((c->fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, "socket(): %s\n", strerror(errno)); + goto fail; + } + + sa.sun_family = AF_LOCAL; + strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1); + sa.sun_path[sizeof(sa.sun_path) - 1] = 0; + + if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0) + goto fail; + + return c; + +fail: + socket_client_free(c); + return NULL; +} + +void socket_client_free(struct socket_client *c) { + assert(c && c->mainloop); + if (c->io_source) + c->mainloop->cancel_io(c->mainloop, c->io_source); + if (c->fixed_source) + c->mainloop->cancel_fixed(c->mainloop, c->fixed_source); + if (c->fd >= 0) + close(c->fd); + free(c); +} + +void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata) { + assert(c); + c->callback = on_connection; + c->userdata = userdata; +} diff --git a/src/socket-client.h b/src/socket-client.h new file mode 100644 index 00000000..4de01e34 --- /dev/null +++ b/src/socket-client.h @@ -0,0 +1,17 @@ +#ifndef foosocketclienthfoo +#define foosocketclienthfoo + +#include <inttypes.h> +#include "mainloop-api.h" +#include "iochannel.h" + +struct socket_client; + +struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port); +struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename); + +void socket_client_free(struct socket_client *c); + +void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata); + +#endif diff --git a/src/socket-server.c b/src/socket-server.c index 6ad225e3..87fe1476 100644 --- a/src/socket-server.c +++ b/src/socket-server.c @@ -19,14 +19,15 @@ struct socket_server { void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata); void *userdata; - struct mainloop_source *mainloop_source; + void *mainloop_source; + struct pa_mainloop_api *mainloop; }; -static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) { +static void callback(struct pa_mainloop_api *mainloop, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { struct socket_server *s = userdata; struct iochannel *io; int nfd; - assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s); + assert(s && s->mainloop == mainloop && s->mainloop_source == id && id && fd >= 0 && fd == s->fd && events == PA_MAINLOOP_API_IO_EVENT_INPUT); if ((nfd = accept(fd, NULL, NULL)) < 0) { fprintf(stderr, "accept(): %s\n", strerror(errno)); @@ -38,12 +39,12 @@ static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event return; } - io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd); + io = iochannel_new(s->mainloop, nfd, nfd); assert(io); s->on_connection(s, io, s->userdata); } -struct socket_server* socket_server_new(struct mainloop *m, int fd) { +struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd) { struct socket_server *s; assert(m && fd >= 0); @@ -54,13 +55,14 @@ struct socket_server* socket_server_new(struct mainloop *m, int fd) { s->on_connection = NULL; s->userdata = NULL; - s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s); + s->mainloop = m; + s->mainloop_source = m->source_io(m, fd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, s); assert(s->mainloop_source); return s; } -struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) { +struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename) { int fd = -1; struct sockaddr_un sa; struct socket_server *s; @@ -101,7 +103,7 @@ fail: return NULL; } -struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) { +struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) { int fd = -1; struct sockaddr_in sa; int on = 1; @@ -148,7 +150,8 @@ void socket_server_free(struct socket_server*s) { free(s->filename); } - mainloop_source_free(s->mainloop_source); + + s->mainloop->cancel_io(s->mainloop, s->mainloop_source); free(s); } diff --git a/src/socket-server.h b/src/socket-server.h index 4814fc62..80895f8d 100644 --- a/src/socket-server.h +++ b/src/socket-server.h @@ -2,14 +2,14 @@ #define foosocketserverhfoo #include <inttypes.h> -#include "mainloop.h" +#include "mainloop-api.h" #include "iochannel.h" struct socket_server; -struct socket_server* socket_server_new(struct mainloop *m, int fd); -struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename); -struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port); +struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd); +struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename); +struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port); void socket_server_free(struct socket_server*s); diff --git a/src/source.c b/src/source.c index 44d77532..21ac24f3 100644 --- a/src/source.c +++ b/src/source.c @@ -7,7 +7,7 @@ #include "sourceoutput.h" #include "strbuf.h" -struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) { +struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec) { struct source *s; int r; assert(core && spec); diff --git a/src/source.h b/src/source.h index 078fb1c9..04f3984f 100644 --- a/src/source.h +++ b/src/source.h @@ -14,14 +14,14 @@ struct source { char *name; struct core *core; - struct sample_spec sample_spec; + struct pa_sample_spec sample_spec; struct idxset *outputs; void (*notify)(struct source*source); void *userdata; }; -struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec); +struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec); void source_free(struct source *s); /* Pass a new memory block to all output streams */ diff --git a/src/sourceoutput.c b/src/sourceoutput.c index 8021b522..e2e1dacc 100644 --- a/src/sourceoutput.c +++ b/src/sourceoutput.c @@ -5,7 +5,7 @@ #include "sourceoutput.h" #include "strbuf.h" -struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name) { +struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name) { struct source_output *o; int r; assert(s && spec); @@ -14,7 +14,7 @@ struct source_output* source_output_new(struct source *s, struct sample_spec *sp assert(o); o->name = name ? strdup(name) : NULL; o->source = s; - o->spec = *spec; + o->sample_spec = *spec; o->push = NULL; o->kill = NULL; diff --git a/src/sourceoutput.h b/src/sourceoutput.h index 359ff151..50cb9caf 100644 --- a/src/sourceoutput.h +++ b/src/sourceoutput.h @@ -12,7 +12,7 @@ struct source_output { char *name; struct source *source; - struct sample_spec spec; + struct pa_sample_spec sample_spec; void (*push)(struct source_output *o, struct memchunk *chunk); void (*kill)(struct source_output* o); @@ -20,7 +20,7 @@ struct source_output { void *userdata; }; -struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name); +struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name); void source_output_free(struct source_output* o); void source_output_kill(struct source_output*o); diff --git a/src/tagstruct.c b/src/tagstruct.c index 429dd408..47e17839 100644 --- a/src/tagstruct.c +++ b/src/tagstruct.c @@ -90,7 +90,7 @@ void tagstruct_putu8(struct tagstruct*t, uint8_t c) { t->length += 2; } -void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss) { +void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss) { assert(t && ss); extend(t, 7); t->data[t->length] = TAG_SAMPLE_SPEC; @@ -156,7 +156,7 @@ int tagstruct_getu8(struct tagstruct*t, uint8_t *c) { return 0; } -int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss) { +int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss) { assert(t && ss); if (t->rindex+7 > t->length) diff --git a/src/tagstruct.h b/src/tagstruct.h index 5572c64c..9f6a0bf4 100644 --- a/src/tagstruct.h +++ b/src/tagstruct.h @@ -15,12 +15,12 @@ uint8_t* tagstruct_free_data(struct tagstruct*t, size_t *l); void tagstruct_puts(struct tagstruct*t, const char *s); void tagstruct_putu32(struct tagstruct*t, uint32_t i); void tagstruct_putu8(struct tagstruct*t, uint8_t c); -void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss); +void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss); int tagstruct_gets(struct tagstruct*t, const char **s); int tagstruct_getu32(struct tagstruct*t, uint32_t *i); int tagstruct_getu8(struct tagstruct*t, uint8_t *c); -int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss); +int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss); int tagstruct_eof(struct tagstruct*t); const uint8_t* tagstruct_data(struct tagstruct*t, size_t *l); @@ -1,8 +1,10 @@ +- sync() function in native library +- name registrar - native protocol/library - simple control protocol: kill client/input/output; set_volume - resampling - esound protocol -- config parser +- config parser/cmdline - record testing -- 0.1 - optimierung von rebuild_pollfds() diff --git a/src/util.c b/src/util.c new file mode 100644 index 00000000..0383a0ad --- /dev/null +++ b/src/util.c @@ -0,0 +1,62 @@ +#include <assert.h> +#include <string.h> +#include <stdio.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <fcntl.h> + +#include "util.h" + +void make_nonblock_fd(int fd) { + int v; + + if ((v = fcntl(fd, F_GETFL)) >= 0) + if (!(v & O_NONBLOCK)) + fcntl(fd, F_SETFL, v|O_NONBLOCK); +} + +void peer_to_string(char *c, size_t l, int fd) { + struct stat st; + + assert(c && l && fd >= 0); + + if (fstat(fd, &st) < 0) { + snprintf(c, l, "Invalid client fd"); + return; + } + + if (S_ISSOCK(st.st_mode)) { + union { + struct sockaddr sa; + struct sockaddr_in in; + struct sockaddr_un un; + } sa; + socklen_t sa_len = sizeof(sa); + + if (getpeername(fd, &sa.sa, &sa_len) >= 0) { + + if (sa.sa.sa_family == AF_INET) { + uint32_t ip = ntohl(sa.in.sin_addr.s_addr); + + snprintf(c, l, "TCP/IP client from %i.%i.%i.%i:%u", + ip >> 24, + (ip >> 16) & 0xFF, + (ip >> 8) & 0xFF, + ip & 0xFF, + ntohs(sa.in.sin_port)); + return; + } else if (sa.sa.sa_family == AF_LOCAL) { + snprintf(c, l, "UNIX client for %s", sa.un.sun_path); + return; + } + + } + snprintf(c, l, "Unknown network client"); + return; + } else if (S_ISCHR(st.st_mode) && (fd == 0 || fd == 1)) { + snprintf(c, l, "STDIN/STDOUT client"); + return; + } + + snprintf(c, l, "Unknown client"); +} diff --git a/src/util.h b/src/util.h new file mode 100644 index 00000000..830ee2e0 --- /dev/null +++ b/src/util.h @@ -0,0 +1,8 @@ +#ifndef fooutilhfoo +#define fooutilhfoo + +void make_nonblock_fd(int fd); + +void peer_to_string(char *c, size_t l, int fd); + +#endif |