diff options
62 files changed, 2460 insertions, 742 deletions
diff --git a/bootstrap.sh b/bootstrap.sh index f26ceb15..c9880d85 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -33,7 +33,7 @@ else      automake -a -c      autoconf -Wall -    ./configure --sysconfdir=/etc "$@" +    CFLAGS="-g -O0" ./configure --sysconfdir=/etc "$@"      make clean  fi diff --git a/configure.ac b/configure.ac index 16376902..3a14a061 100644 --- a/configure.ac +++ b/configure.ac @@ -42,7 +42,7 @@ AC_PROG_LIBTOOL  # If using GCC specifiy some additional parameters  if test "x$GCC" = "xyes" ; then -   CFLAGS="$CFLAGS -pipe -Wall -W" +   CFLAGS="$CFLAGS -pipe -Wall -W -Wno-unused-parameter"  fi  AC_CONFIG_FILES([Makefile src/Makefile]) diff --git a/src/Makefile.am b/src/Makefile.am index 443a25f2..6ad1488f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -18,31 +18,39 @@  AM_CFLAGS=-ansi -D_GNU_SOURCE -bin_PROGRAMS = polypaudio  +bin_PROGRAMS = polypaudio pacat -pkglib_LTLIBRARIES=libprotocol-simple.la module-simple-protocol-tcp.la \ -		libsocket-server.la module-pipe-sink.la libpstream.la libiochannel.la \ +pkglib_LTLIBRARIES=libiochannel.la libsocket-server.la libsocket-client.la \ +		libprotocol-simple.la module-simple-protocol-tcp.la \ +		module-pipe-sink.la libpstream.la \  		libpacket.la module-oss.la module-oss-mmap.la liboss.la libioline.la \  		libcli.la module-cli.la libtokenizer.la libdynarray.la \  		module-simple-protocol-unix.la module-cli-protocol-tcp.la \ -		libprotocol-cli.la libprotocol-native.la module-native-protocol-tcp.la \ -        module-native-protocol-unix.la module-cli-protocol-unix.la libtagstruct.la +		libprotocol-cli.la module-cli-protocol-unix.la libtagstruct.la \ +		libpdispatch.la libprotocol-native.la libpstream-util.la \ +        module-native-protocol-tcp.la module-native-protocol-unix.la \ +		libpolyp.la  polypaudio_SOURCES = idxset.c idxset.h \  		queue.c queue.h \  		strbuf.c strbuf.h \ +		main.c main.h \  		mainloop.c mainloop.h \  		memblock.c memblock.h \  		sample.c sample.h \ +		sample-util.c sample-util.h \  		memblockq.c memblockq.h \  		client.c client.h \  		core.c core.h \ -		main.c main.h \  		sourceoutput.c sourceoutput.h \  		sinkinput.c sinkinput.h \  		source.c source.h \  		sink.c sink.h \ -		module.c module.h +		module.c module.h \ +		mainloop-signal.c mainloop-signal.h \ +		mainloop-api.c mainloop-api.h \ +		util.c util.h +polypaudio_CFLAGS = $(AM_CFLAGS)  polypaudio_INCLUDES = $(INCLTDL)  polypaudio_LDADD = $(LIBLTDL)  @@ -56,10 +64,22 @@ libsocket_server_la_SOURCES = socket-server.c socket-server.h  libsocket_server_la_LDFLAGS = -avoid-version  libsocket_server_la_LIBADD = libiochannel.la +libsocket_client_la_SOURCES = socket-client.c socket-client.h +libsocket_client_la_LDFLAGS = -avoid-version +libsocket_client_la_LIBADD = libiochannel.la +  libpstream_la_SOURCES = pstream.c pstream.h  libpstream_la_LDFLAGS = -avoid-version  libpstream_la_LIBADD = libpacket.la +libpstream_util_la_SOURCES = pstream-util.c pstream-util.h +libpstream_util_la_LDFLAGS = -avoid-version +libpstream_util_la_LIBADD = libpstream.la libtagstruct.la + +libpdispatch_la_SOURCES = pdispatch.c pdispatch.h +libpdispatch_la_LDFLAGS = -avoid-version +libpdispatch_la_LIBADD = libpacket.la libtagstruct.la +  libiochannel_la_SOURCES = iochannel.c iochannel.h  libiochannel_la_LDFLAGS = -avoid-version @@ -90,7 +110,7 @@ libprotocol_cli_la_LIBADD = libsocket-server.la libiochannel.la libcli.la  libprotocol_native_la_SOURCES = protocol-native.c protocol-native.h  libprotocol_native_la_LDFLAGS = -avoid-version -libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la +libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la libpstream-util.la  libtagstruct_la_SOURCES = tagstruct.c tagstruct.h  libtagstruct_la_LDFLAGS = -avoid-version @@ -140,3 +160,24 @@ module_oss_mmap_la_LIBADD = libiochannel.la liboss.la  module_cli_la_SOURCES = module-cli.c  module_cli_la_LDFLAGS = -module -avoid-version  module_cli_la_LIBADD = libcli.la libiochannel.la libtokenizer.la + +libpolyp_la_SOURCES = polyp.c polyp.h \ +		polypdef.h \ +		tagstruct.c tagstruct.h \ +		iochannel.c iochannel.h \ +		pstream.c pstream.h \ +		pstream-util.c pstream-util.h \ +		pdispatch.c pdispatch.h \ +		protocol-native-spec.h \ +		mainloop-api.c mainloop-api.h \ +		mainloop.c mainloop.h \ +		idxset.c idxset.h \ +		util.c util.h \ +		memblock.c memblock.h \ +		socket-client.c socket-client.h \ +		packet.c packet.h \ +		queue.c queue.h \ +		dynarray.c dynarray.h + +pacat_SOURCES = pacat.c +pacat_LDADD = libpolyp.la @@ -20,6 +20,8 @@ struct cli {      void (*eof_callback)(struct cli *c, void *userdata);      void *userdata; + +    struct client *client;  };  struct command { @@ -63,6 +65,7 @@ static const struct command commands[] = {  static const char prompt[] = ">>> ";  struct cli* cli_new(struct core *core, struct iochannel *io) { +    char cname[256];      struct cli *c;      assert(io); @@ -75,16 +78,21 @@ struct cli* cli_new(struct core *core, struct iochannel *io) {      c->userdata = NULL;      c->eof_callback = NULL; +    iochannel_peer_to_string(io, cname, sizeof(cname)); +    c->client = client_new(core, "CLI", cname); +    assert(c->client); +          ioline_set_callback(c->line, line_callback, c);      ioline_puts(c->line, "Welcome to polypaudio! Use \"help\" for usage information.\n");      ioline_puts(c->line, prompt); - +          return c;  }  void cli_free(struct cli *c) {      assert(c);      ioline_free(c->line); +    client_free(c->client);      free(c);  } @@ -135,7 +143,7 @@ void cli_set_eof_callback(struct cli *c, void (*cb)(struct cli*c, void *userdata  static void cli_command_exit(struct cli *c, struct tokenizer *t) {      assert(c && c->core && c->core->mainloop && t); -    mainloop_quit(c->core->mainloop, -1); +    c->core->mainloop->quit(c->core->mainloop, 0);  }  static void cli_command_help(struct cli *c, struct tokenizer *t) { @@ -7,7 +7,7 @@  #include "sink.h"  #include "source.h" -struct core* core_new(struct mainloop *m) { +struct core* core_new(struct pa_mainloop_api *m) {      struct core* c;      c = malloc(sizeof(struct core));      assert(c); @@ -2,17 +2,17 @@  #define foocorehfoo  #include "idxset.h" -#include "mainloop.h" +#include "mainloop-api.h"  struct core { -    struct mainloop *mainloop; +    struct pa_mainloop_api *mainloop;      struct idxset *clients, *sinks, *sources, *sink_inputs, *source_outputs, *modules;      uint32_t default_source_index, default_sink_index;  }; -struct core* core_new(struct mainloop *m); +struct core* core_new(struct pa_mainloop_api *m);  void core_free(struct core*c);  #endif diff --git a/src/iochannel.c b/src/iochannel.c index f0c4c499..910b7e0b 100644 --- a/src/iochannel.c +++ b/src/iochannel.c @@ -4,10 +4,11 @@  #include <unistd.h>  #include "iochannel.h" +#include "util.h"  struct iochannel {      int ifd, ofd; -    struct mainloop* mainloop; +    struct pa_mainloop_api* mainloop;      void (*callback)(struct iochannel*io, void *userdata);      void*userdata; @@ -17,43 +18,45 @@ struct iochannel {      int no_close; -    struct mainloop_source* input_source, *output_source; +    void* input_source, *output_source;  };  static void enable_mainloop_sources(struct iochannel *io) {      assert(io);      if (io->input_source == io->output_source) { -        enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL; +        enum pa_mainloop_api_io_events e = PA_MAINLOOP_API_IO_EVENT_NULL;          assert(io->input_source);          if (!io->readable) -            e |= MAINLOOP_IO_EVENT_IN; +            e |= PA_MAINLOOP_API_IO_EVENT_INPUT;          if (!io->writable) -            e |= MAINLOOP_IO_EVENT_OUT; +            e |= PA_MAINLOOP_API_IO_EVENT_OUTPUT; -        mainloop_source_io_set_events(io->input_source, e); +        io->mainloop->enable_io(io->mainloop, io->input_source, e);      } else {          if (io->input_source) -            mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN); +            io->mainloop->enable_io(io->mainloop, io->input_source, io->readable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_INPUT);          if (io->output_source) -            mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT); +            io->mainloop->enable_io(io->mainloop, io->output_source, io->writable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_OUTPUT);      }  } -static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) { +static void callback(struct pa_mainloop_api* m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {      struct iochannel *io = userdata;      int changed = 0; -    assert(s && fd >= 0 && userdata); +    assert(m && fd >= 0 && events && userdata); -    if ((events & MAINLOOP_IO_EVENT_IN) && !io->readable) { +    if ((events & PA_MAINLOOP_API_IO_EVENT_INPUT) && !io->readable) {          io->readable = 1;          changed = 1; +        assert(id == io->input_source);      } -    if ((events & MAINLOOP_IO_EVENT_OUT) && !io->writable) { +    if ((events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) && !io->writable) {          io->writable = 1;          changed = 1; +        assert(id == io->output_source);      }      if (changed) { @@ -64,15 +67,7 @@ static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event ev      }  } -static void make_nonblock_fd(int fd) { -    int v; - -    if ((v = fcntl(fd, F_GETFL)) >= 0) -        if (!(v & O_NONBLOCK)) -            fcntl(fd, F_SETFL, v|O_NONBLOCK); -} - -struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) { +struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd) {      struct iochannel *io;      assert(m && (ifd >= 0 || ofd >= 0)); @@ -90,18 +85,18 @@ struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {      if (ifd == ofd) {          assert(ifd >= 0);          make_nonblock_fd(io->ifd); -        io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io); +        io->input_source = io->output_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_BOTH, callback, io);      } else {          if (ifd >= 0) {              make_nonblock_fd(io->ifd); -            io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io); +            io->input_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, io);          } else              io->input_source = NULL;          if (ofd >= 0) {              make_nonblock_fd(io->ofd); -            io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io); +            io->output_source = m->source_io(m, ofd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, callback, io);          } else              io->output_source = NULL;      } @@ -120,9 +115,9 @@ void iochannel_free(struct iochannel*io) {      }      if (io->input_source) -        mainloop_source_free(io->input_source); +        io->mainloop->cancel_io(io->mainloop, io->input_source);      if (io->output_source && io->output_source != io->input_source) -        mainloop_source_free(io->output_source); +        io->mainloop->cancel_io(io->mainloop, io->output_source);      free(io);  } @@ -172,3 +167,8 @@ void iochannel_set_noclose(struct iochannel*io, int b) {      assert(io);      io->no_close = b;  } + +void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l) { +    assert(io && s && l); +    peer_to_string(s, l, io->ifd); +} diff --git a/src/iochannel.h b/src/iochannel.h index 8ed8b878..b0465a19 100644 --- a/src/iochannel.h +++ b/src/iochannel.h @@ -2,11 +2,11 @@  #define fooiochannelhfoo  #include <sys/types.h> -#include "mainloop.h" +#include "mainloop-api.h"  struct iochannel; -struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd); +struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd);  void iochannel_free(struct iochannel*io);  ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l); @@ -19,4 +19,6 @@ void iochannel_set_noclose(struct iochannel*io, int b);  void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata); +void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l); +  #endif @@ -8,46 +8,52 @@  #include "core.h"  #include "mainloop.h"  #include "module.h" +#include "mainloop-signal.h"  int stdin_inuse = 0, stdout_inuse = 0; -static void signal_callback(struct mainloop_source *m, int sig, void *userdata) { -    mainloop_quit(mainloop_source_get_mainloop(m), -1); +static struct pa_mainloop *mainloop; + +static void signal_callback(void *id, int sig, void *userdata) { +    struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop); +    m->quit(m, 1);      fprintf(stderr, "main: got signal.\n");  }  int main(int argc, char *argv[]) { -    struct mainloop *m;      struct core *c; -    int r; +    int r, retval = 0;      r = lt_dlinit();      assert(r == 0); -    m = mainloop_new(); -    assert(m); -    c = core_new(m); -    assert(c); +    mainloop = pa_mainloop_new(); +    assert(mainloop); -    mainloop_source_new_signal(m, SIGINT, signal_callback, NULL); +    r = pa_signal_init(pa_mainloop_get_api(mainloop)); +    assert(r == 0); +    pa_signal_register(SIGINT, signal_callback, NULL);      signal(SIGPIPE, SIG_IGN); +    c = core_new(pa_mainloop_get_api(mainloop)); +    assert(c); +      module_load(c, "module-oss-mmap", "/dev/dsp1");      module_load(c, "module-pipe-sink", NULL);      module_load(c, "module-simple-protocol-tcp", NULL);      module_load(c, "module-cli", NULL);      fprintf(stderr, "main: mainloop entry.\n"); -    while (mainloop_iterate(m, 1) == 0); -/*        fprintf(stderr, "main: %u blocks\n", n_blocks);*/ +    if (pa_mainloop_run(mainloop, &retval) < 0) +        retval = 1;      fprintf(stderr, "main: mainloop exit.\n"); - -    mainloop_run(m);      core_free(c); -    mainloop_free(m); + +    pa_signal_done(); +    pa_mainloop_free(mainloop);      lt_dlexit(); -    return 0; +    return retval;  } diff --git a/src/mainloop-api.c b/src/mainloop-api.c new file mode 100644 index 00000000..6caa0c25 --- /dev/null +++ b/src/mainloop-api.c @@ -0,0 +1,35 @@ +#include <assert.h> +#include <stdlib.h> +#include "mainloop-api.h" + +struct once_info { +    void (*callback)(void *userdata); +    void *userdata; +}; + +static void once_callback(struct pa_mainloop_api *api, void *id, void *userdata) { +    struct once_info *i = userdata; +    assert(api && i && i->callback); +    i->callback(i->userdata); +    assert(api->cancel_fixed); +    api->cancel_fixed(api, id); +    free(i); +} + +void pa_mainloop_api_once(struct pa_mainloop_api* api, void (*callback)(void *userdata), void *userdata) { +    struct once_info *i; +    void *id; +    assert(api && callback); + +    i = malloc(sizeof(struct once_info)); +    assert(i); +    i->callback = callback; +    i->userdata = userdata; + +    assert(api->source_fixed); +    id = api->source_fixed(api, once_callback, i); +    assert(id); + +    /* Note: if the mainloop is destroyed before once_callback() was called, some memory is leaked. */ +} + diff --git a/src/mainloop-api.h b/src/mainloop-api.h new file mode 100644 index 00000000..96dacc22 --- /dev/null +++ b/src/mainloop-api.h @@ -0,0 +1,43 @@ +#ifndef foomainloopapihfoo +#define foomainloopapihfoo + +#include <time.h> +#include <sys/time.h> + +enum pa_mainloop_api_io_events { +    PA_MAINLOOP_API_IO_EVENT_NULL = 0, +    PA_MAINLOOP_API_IO_EVENT_INPUT = 1, +    PA_MAINLOOP_API_IO_EVENT_OUTPUT = 2, +    PA_MAINLOOP_API_IO_EVENT_BOTH = 3 +}; + +struct pa_mainloop_api { +    void *userdata; + +    /* IO sources */ +    void* (*source_io)(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata); +    void  (*enable_io)(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events); +    void  (*cancel_io)(struct pa_mainloop_api*a, void* id); + +    /* Fixed sources */ +    void* (*source_fixed)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata); +    void  (*enable_fixed)(struct pa_mainloop_api*a, void* id, int b); +    void  (*cancel_fixed)(struct pa_mainloop_api*a, void* id); + +    /* Idle sources */ +    void* (*source_idle)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata); +    void  (*enable_idle)(struct pa_mainloop_api*a, void* id, int b); +    void  (*cancel_idle)(struct pa_mainloop_api*a, void* id); +     +    /* Time sources */ +    void* (*source_time)(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata); +    void  (*enable_time)(struct pa_mainloop_api*a, void *id, const struct timeval *tv); +    void  (*cancel_time)(struct pa_mainloop_api*a, void* id); + +    /* Exit mainloop */ +    void (*quit)(struct pa_mainloop_api*a, int retval); +}; + +void pa_mainloop_api_once(struct pa_mainloop_api*m, void (*callback)(void *userdata), void *userdata); + +#endif diff --git a/src/mainloop-signal.c b/src/mainloop-signal.c new file mode 100644 index 00000000..dcc72f69 --- /dev/null +++ b/src/mainloop-signal.c @@ -0,0 +1,138 @@ +#include <stdio.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "mainloop-signal.h" +#include "util.h" + +struct signal_info { +    int sig; +    struct sigaction saved_sigaction; +    void (*callback) (void *id, int signal, void *userdata); +    void *userdata; +    struct signal_info *previous, *next; +}; + +static struct pa_mainloop_api *api = NULL; +static int signal_pipe[2] = { -1, -1 }; +static void* mainloop_source = NULL; +static struct signal_info *signals = NULL; + +static void signal_handler(int sig) { +    write(signal_pipe[1], &sig, sizeof(sig)); +} + +static void callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { +    assert(a && id && events == PA_MAINLOOP_API_IO_EVENT_INPUT && id == mainloop_source && fd == signal_pipe[0]); + +    for (;;) { +        ssize_t r; +        int sig; +        struct signal_info*s; +         +        if ((r = read(signal_pipe[0], &sig, sizeof(sig))) < 0) { +            if (errno == EAGAIN) +                return; + +            fprintf(stderr, "signal.c: read(): %s\n", strerror(errno)); +            return; +        } + +        if (r != sizeof(sig)) { +            fprintf(stderr, "signal.c: short read()\n"); +            return; +        } + +        for (s = signals; s; s = s->next)  +            if (s->sig == sig) { +                assert(s->callback); +                s->callback(s, sig, s->userdata); +                break; +            } +    } +} + +int pa_signal_init(struct pa_mainloop_api *a) { +    assert(a); +    if (pipe(signal_pipe) < 0) { +        fprintf(stderr, "pipe() failed: %s\n", strerror(errno)); +        return -1; +    } + +    make_nonblock_fd(signal_pipe[0]); +    make_nonblock_fd(signal_pipe[1]); + +    api = a; +    mainloop_source = api->source_io(api, signal_pipe[0], PA_MAINLOOP_API_IO_EVENT_INPUT, callback, NULL); +    assert(mainloop_source); +    return 0; +} + +void pa_signal_done(void) { +    assert(api && signal_pipe[0] >= 0 && signal_pipe[1] >= 0 && mainloop_source); + +    api->cancel_io(api, mainloop_source); +    mainloop_source = NULL; + +    close(signal_pipe[0]); +    close(signal_pipe[1]); +    signal_pipe[0] = signal_pipe[1] = -1; + +    while (signals) +        pa_signal_unregister(signals); +     +    api = NULL; +} + +void* pa_signal_register(int sig, void (*callback) (void *id, int signal, void *userdata), void *userdata) { +    struct signal_info *s = NULL; +    struct sigaction sa; +    assert(sig > 0 && callback); + +    for (s = signals; s; s = s->next) +        if (s->sig == sig) +            goto fail; +     +    s = malloc(sizeof(struct signal_info)); +    assert(s); +    s->sig = sig; +    s->callback = callback; +    s->userdata = userdata; + +    memset(&sa, 0, sizeof(sa)); +    sa.sa_handler = signal_handler; +    sigemptyset(&sa.sa_mask); +    sa.sa_flags = SA_RESTART; +     +    if (sigaction(sig, &sa, &s->saved_sigaction) < 0) +        goto fail; + +    s->previous = NULL; +    s->next = signals; +    signals = s; + +    return s; +fail: +    if (s) +        free(s); +    return NULL; +} + +void pa_signal_unregister(void *id) { +    struct signal_info *s = id; +    assert(s); + +    if (s->next) +        s->next->previous = s->previous; +    if (s->previous) +        s->previous->next = s->next; +    else +        signals = s->next; + +    sigaction(s->sig, &s->saved_sigaction, NULL); +    free(s); +} diff --git a/src/mainloop-signal.h b/src/mainloop-signal.h new file mode 100644 index 00000000..e3e2364d --- /dev/null +++ b/src/mainloop-signal.h @@ -0,0 +1,12 @@ +#ifndef foomainloopsignalhfoo +#define foomainloopsignalhfoo + +#include "mainloop-api.h" + +int pa_signal_init(struct pa_mainloop_api *api); +void pa_signal_done(void); + +void* pa_signal_register(int signal, void (*callback) (void *id, int signal, void *userdata), void *userdata); +void pa_signal_unregister(void *id); + +#endif diff --git a/src/mainloop.c b/src/mainloop.c index fba0461c..a1758c65 100644 --- a/src/mainloop.c +++ b/src/mainloop.c @@ -1,3 +1,4 @@ +#include <stdio.h>  #include <signal.h>  #include <unistd.h>  #include <sys/poll.h> @@ -8,468 +9,515 @@  #include <errno.h>  #include "mainloop.h" +#include "util.h" +#include "idxset.h" -struct mainloop_source { -    struct mainloop_source *next; -    struct mainloop *mainloop; -    enum mainloop_source_type type; - -    int enabled; +struct mainloop_source_header { +    struct pa_mainloop *mainloop;      int dead; -    void *userdata; - -    struct { -        int fd; -        enum mainloop_io_event events; -        void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata); -        struct pollfd pollfd; -    } io; +}; -    struct  { -        void (*callback)(struct mainloop_source*s, void *userdata); -    } fixed; +struct mainloop_source_io { +    struct mainloop_source_header header; -    struct  { -        void (*callback)(struct mainloop_source*s, void *userdata); -    } idle; - -    struct { -        int sig; -        struct sigaction sigaction; -        void (*callback)(struct mainloop_source*s, int sig, void *userdata); -    } signal; +    int fd; +    enum pa_mainloop_api_io_events events; +    void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata); +    void *userdata; + +    struct pollfd *pollfd;  }; -struct mainloop_source_list { -    struct mainloop_source *sources; -    int n_sources; -    int dead_sources; +struct mainloop_source_fixed_or_idle { +    struct mainloop_source_header header; +    int enabled; + +    void (*callback)(struct pa_mainloop_api*a, void *id, void *userdata); +    void *userdata;  }; -struct mainloop { -    struct mainloop_source_list io_sources, fixed_sources, idle_sources, signal_sources; +struct mainloop_source_time { +    struct mainloop_source_header header; +    int enabled; +    struct timeval timeval; +    void (*callback)(struct pa_mainloop_api*a, void *id, const struct timeval*tv, void *userdata); +    void *userdata; +}; + +struct pa_mainloop { +    struct idxset *io_sources, *fixed_sources, *idle_sources, *time_sources; +    int io_sources_scan_dead, fixed_sources_scan_dead, idle_sources_scan_dead, time_sources_scan_dead; +      struct pollfd *pollfds; -    int max_pollfds, n_pollfds; +    unsigned max_pollfds, n_pollfds;      int rebuild_pollfds; -    int quit; -    int running; -    int signal_pipe[2]; -    struct pollfd signal_pollfd; +    int quit, running, retval; +    struct pa_mainloop_api api;  }; -static int signal_pipe = -1; +static void setup_api(struct pa_mainloop *m); -static void signal_func(int sig) { -    if (signal_pipe >= 0) -        write(signal_pipe, &sig, sizeof(sig)); -} +struct pa_mainloop *pa_mainloop_new(void) { +    struct pa_mainloop *m; -static void make_nonblock(int fd) { -    int v; -     -    if ((v = fcntl(fd, F_GETFL)) >= 0) -        fcntl(fd, F_SETFL, v|O_NONBLOCK); -} +    m = malloc(sizeof(struct pa_mainloop)); +    assert(m); +    m->io_sources = idxset_new(NULL, NULL); +    m->fixed_sources = idxset_new(NULL, NULL); +    m->idle_sources = idxset_new(NULL, NULL); +    m->time_sources = idxset_new(NULL, NULL); -struct mainloop *mainloop_new(void) { -    int r; -    struct mainloop *m; +    assert(m->io_sources && m->fixed_sources && m->idle_sources && m->time_sources); -    m = malloc(sizeof(struct mainloop)); -    assert(m); -    memset(m, 0, sizeof(struct mainloop)); +    m->io_sources_scan_dead = m->fixed_sources_scan_dead = m->idle_sources_scan_dead = m->time_sources_scan_dead = 0; +     +    m->pollfds = NULL; +    m->max_pollfds = m->n_pollfds = m->rebuild_pollfds = 0; -    r = pipe(m->signal_pipe); -    assert(r >= 0 && m->signal_pipe[0] >= 0 && m->signal_pipe[1] >= 0); +    m->quit = m->running = m->retval = 0; -    make_nonblock(m->signal_pipe[0]); -    make_nonblock(m->signal_pipe[1]); -     -    signal_pipe = m->signal_pipe[1]; -    m->signal_pollfd.fd = m->signal_pipe[0]; -    m->signal_pollfd.events = POLLIN; -    m->signal_pollfd.revents = 0; +    setup_api(m);      return m;  } -static void free_sources(struct mainloop_source_list *l, int all) { -    struct mainloop_source *s, *p; -    assert(l); +static int foreach(void *p, uint32_t index, int *del, void*userdata) { +    struct mainloop_source_header *h = p; +    int *all = userdata; +    assert(p && del && all); -    if (!all && !l->dead_sources) -        return; - -    p = NULL; -    s = l->sources; -    while (s) { -        if (all || s->dead) { -            struct mainloop_source *t = s; -            s = s->next; - -            if (p) -                p->next = s; -            else -                l->sources = s; -             -            free(t); -        } else { -            p = s; -            s = s->next; -        } +    if (*all || h->dead) { +        free(h); +        *del = 1;      } -    l->dead_sources = 0; - -    if (all) { -        assert(!l->sources); -        l->n_sources = 0; -    } -} +    return 0; +}; -void mainloop_free(struct mainloop* m) { +void pa_mainloop_free(struct pa_mainloop* m) { +    int all = 1;      assert(m); -    free_sources(&m->io_sources, 1); -    free_sources(&m->fixed_sources, 1); -    free_sources(&m->idle_sources, 1); -    free_sources(&m->signal_sources, 1); - -    if (signal_pipe == m->signal_pipe[1]) -        signal_pipe = -1; -    close(m->signal_pipe[0]); -    close(m->signal_pipe[1]); -     +    idxset_foreach(m->io_sources, foreach, &all); +    idxset_foreach(m->fixed_sources, foreach, &all); +    idxset_foreach(m->idle_sources, foreach, &all); +    idxset_foreach(m->time_sources, foreach, &all); + +    idxset_free(m->io_sources, NULL, NULL); +    idxset_free(m->fixed_sources, NULL, NULL); +    idxset_free(m->idle_sources, NULL, NULL); +    idxset_free(m->time_sources, NULL, NULL); +      free(m->pollfds);      free(m);  } -static void rebuild_pollfds(struct mainloop *m) { -    struct mainloop_source*s; +static void scan_dead(struct pa_mainloop *m) { +    int all = 0; +    assert(m); +    if (m->io_sources_scan_dead) +        idxset_foreach(m->io_sources, foreach, &all); +    if (m->fixed_sources_scan_dead) +        idxset_foreach(m->fixed_sources, foreach, &all); +    if (m->idle_sources_scan_dead) +        idxset_foreach(m->idle_sources, foreach, &all); +    if (m->time_sources_scan_dead) +        idxset_foreach(m->time_sources, foreach, &all); +} + +static void rebuild_pollfds(struct pa_mainloop *m) { +    struct mainloop_source_io*s;      struct pollfd *p; -     -    if (m->max_pollfds < m->io_sources.n_sources+1) { -        m->max_pollfds = (m->io_sources.n_sources+1)*2; -        m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds); +    uint32_t index = IDXSET_INVALID; +    unsigned l; + +    l = idxset_ncontents(m->io_sources); +    if (m->max_pollfds < l) { +        m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*l); +        m->max_pollfds = l;      }      m->n_pollfds = 0;      p = m->pollfds; -    for (s = m->io_sources.sources; s; s = s->next) { -        assert(s->type == MAINLOOP_SOURCE_TYPE_IO); -        if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) { -            *(p++) = s->io.pollfd; -            m->n_pollfds++; +    for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) { +        if (s->header.dead) { +            s->pollfd = NULL; +            continue;          } + +        s->pollfd = p; +        p->fd = s->fd; +        p->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0); +        p->revents = 0; + +        p++; +        m->n_pollfds++;      } +} + +static void dispatch_pollfds(struct pa_mainloop *m) { +    uint32_t index = IDXSET_INVALID; +    struct mainloop_source_io *s; -    *(p++) = m->signal_pollfd; -    m->n_pollfds++; +    for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) { +        if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents) +            continue; +         +        assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback); +        s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata); +    }  } -static void dispatch_pollfds(struct mainloop *m) { -    int i; -    struct pollfd *p; -    struct mainloop_source *s; -    /* This loop assumes that m->sources and m->pollfds have the same -     * order and that m->pollfds is a subset of m->sources! */ +static void run_fixed_or_idle(struct pa_mainloop *m, struct idxset *i) { +    uint32_t index = IDXSET_INVALID; +    struct mainloop_source_fixed_or_idle *s; -    s = m->io_sources.sources; -    for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) { -        if (!p->revents) +    for (s = idxset_first(i, &index); s; s = idxset_next(i, &index)) { +        if (s->header.dead || !s->enabled)              continue; -        if (p->fd == m->signal_pipe[0]) { -            /* Event from signal pipe */ +        assert(s->callback); +        s->callback(&m->api, s, s->userdata); +    } +} + +static int calc_next_timeout(struct pa_mainloop *m) { +    uint32_t index = IDXSET_INVALID; +    struct mainloop_source_time *s; +    struct timeval now; +    int t = -1; -            if (p->revents & POLLIN) { -                int sig; -                ssize_t r; -                r = read(m->signal_pipe[0], &sig, sizeof(sig)); -                assert((r < 0 && errno == EAGAIN) || r == sizeof(sig)); +    if (idxset_isempty(m->time_sources)) +        return -1; + +    gettimeofday(&now, NULL); +     +    for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) { +        int tmp; +         +        if (s->header.dead || !s->enabled) +            continue; + +        if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec))  +            return 0; + +        tmp = (s->timeval.tv_sec - now.tv_sec)*1000; -                if (r == sizeof(sig)) { -                    struct mainloop_source *l = m->signal_sources.sources; -                    while (l) { -                        assert(l->type == MAINLOOP_SOURCE_TYPE_SIGNAL); -                         -                        if (l->signal.sig == sig && l->enabled && !l->dead) { -                            assert(l->signal.callback); -                            l->signal.callback(l, sig, l->userdata); -                        } -                         -                        l = l->next; -                    } -                } -            } - -        } else { -            /* Event from I/O source */ - -            for (; s; s = s->next) { -                if (p->fd != s->io.fd) -                    continue; -                 -                assert(s->type == MAINLOOP_SOURCE_TYPE_IO); - -                if (!s->dead && s->enabled) { -                    enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0); -                    if (e) { -                        assert(s->io.callback); -                        s->io.callback(s, s->io.fd, e, s->userdata); -                    } -                } - -                break; -            } +        if (s->timeval.tv_usec > now.tv_usec) +            tmp += (s->timeval.tv_usec - now.tv_usec)/1000; +        else +            tmp -= (now.tv_usec - s->timeval.tv_usec)/1000; + +        if (tmp == 0) +            return 0; +        else if (tmp < t) +            t = tmp; +    } + +    return t; +} + +static void dispatch_timeout(struct pa_mainloop *m) { +    uint32_t index = IDXSET_INVALID; +    struct mainloop_source_time *s; +    struct timeval now; +    assert(m); + +    if (idxset_isempty(m->time_sources)) +        return; + +    gettimeofday(&now, NULL); +    for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) { +         +        if (s->header.dead || !s->enabled) +            continue; + +        if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) { +            assert(s->callback); + +            s->enabled = 0; +            s->callback(&m->api, s, &s->timeval, s->userdata);          }      }  } -int mainloop_iterate(struct mainloop *m, int block) { -    struct mainloop_source *s; -    int c; +static int any_idle_sources(struct pa_mainloop *m) { +    struct mainloop_source_fixed_or_idle *s; +    uint32_t index; +    assert(m); +     +    for (s = idxset_first(m->idle_sources, &index); s; s = idxset_next(m->idle_sources, &index)) +        if (!s->header.dead && s->enabled) +            return 1; + +    return 0; +} + +int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval) { +    int r, idle;      assert(m && !m->running); -    if(m->quit) -        return m->quit; - -    free_sources(&m->io_sources, 0); -    free_sources(&m->fixed_sources, 0); -    free_sources(&m->idle_sources, 0); -    free_sources(&m->signal_sources, 0); - -    for (s = m->fixed_sources.sources; s; s = s->next) { -        assert(s->type == MAINLOOP_SOURCE_TYPE_FIXED); -        if (!s->dead && s->enabled) { -            assert(s->fixed.callback); -            s->fixed.callback(s, s->userdata); -        }    +    if(m->quit) { +        if (retval) +            *retval = m->retval; +        return 1;      } +    m->running = 1; + +    scan_dead(m); +    run_fixed_or_idle(m, m->fixed_sources); +      if (m->rebuild_pollfds) {          rebuild_pollfds(m);          m->rebuild_pollfds = 0;      } -    m->running = 1; +    idle = any_idle_sources(m);      do { -        c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0); -    } while (c < 0 && errno == EINTR); -         -    if (c > 0) +        int t; + +        if (!block || idle) +            t = 0; +        else  +            t = calc_next_timeout(m); +             +        r = poll(m->pollfds, m->n_pollfds, t); +    } while (r < 0 && errno == EINTR); + +    dispatch_timeout(m); +     +    if (r > 0)          dispatch_pollfds(m); -    else if (c == 0) { -        for (s = m->idle_sources.sources; s; s = s->next) { -            assert(s->type == MAINLOOP_SOURCE_TYPE_IDLE); -            if (!s->dead && s->enabled) { -                assert(s->idle.callback); -                s->idle.callback(s, s->userdata); -            } -        } -    } +    else if (r == 0 && idle) +        run_fixed_or_idle(m, m->idle_sources); +    else if (r < 0) +        fprintf(stderr, "select(): %s\n", strerror(errno));      m->running = 0; -    return c < 0 ? -1 : 0; +    return r < 0 ? -1 : 0;  } -int mainloop_run(struct mainloop *m) { +int pa_mainloop_run(struct pa_mainloop *m, int *retval) {      int r; -    while (!(r = mainloop_iterate(m, 1))); +    while ((r = pa_mainloop_iterate(m, 1, retval)) == 0);      return r;  } -void mainloop_quit(struct mainloop *m, int r) { +void pa_mainloop_quit(struct pa_mainloop *m, int r) {      assert(m);      m->quit = r;  } -static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) { -    struct mainloop_source_list *l; -     -    switch(type) { -        case MAINLOOP_SOURCE_TYPE_IO: -            l = &m->io_sources; -            break; -        case MAINLOOP_SOURCE_TYPE_FIXED: -            l = &m->fixed_sources; -            break; -        case MAINLOOP_SOURCE_TYPE_IDLE: -            l = &m->idle_sources; -            break; -        case MAINLOOP_SOURCE_TYPE_SIGNAL: -            l = &m->signal_sources; -            break; -        default: -            l = NULL; -            break; -    } -     -    return l; -} - -static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) { -    struct mainloop_source_list *l; -    struct mainloop_source* s; -    assert(m); +/* IO sources */ +static void* mainloop_source_io(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata) { +    struct pa_mainloop *m; +    struct mainloop_source_io *s; +    assert(a && a->userdata && fd >= 0 && callback); +    m = a->userdata; +    assert(a == &m->api); -    s = malloc(sizeof(struct mainloop_source)); +    s = malloc(sizeof(struct mainloop_source_io));      assert(s); -    memset(s, 0, sizeof(struct mainloop_source)); +    s->header.mainloop = m; +    s->header.dead = 0; -    s->type = type; -    s->mainloop = m; +    s->fd = fd; +    s->events = events; +    s->callback = callback; +    s->userdata = userdata; +    s->pollfd = NULL; -    l = get_source_list(m, type); -    assert(l); -             -    s->next = l->sources; -    l->sources = s; -    l->n_sources++; +    idxset_put(m->io_sources, s, NULL); +    m->rebuild_pollfds = 1;      return s;  } -struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) { -    struct mainloop_source* s; -    assert(m && fd>=0 && callback); +static void mainloop_enable_io(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events) { +    struct pa_mainloop *m; +    struct mainloop_source_io *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api && s->header.mainloop == m); -    s = source_new(m, MAINLOOP_SOURCE_TYPE_IO); +    s->events = events; +    if (s->pollfd) +        s->pollfd->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0); +} -    s->io.fd = fd; -    s->io.events = event; -    s->io.callback = callback; -    s->userdata = userdata; -    s->io.pollfd.fd = fd; -    s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0); -    s->io.pollfd.revents = 0; +static void mainloop_cancel_io(struct pa_mainloop_api*a, void* id) { +    struct pa_mainloop *m; +    struct mainloop_source_io *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api && s->header.mainloop == m); -    s->enabled = 1; - -    m->rebuild_pollfds = 1; -    return s; +    s->header.dead = 1; +    m->io_sources_scan_dead = 1;  } -struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) { -    struct mainloop_source* s; -    assert(m && callback); +/* Fixed sources */ +static void* mainloop_source_fixed(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) { +    struct pa_mainloop *m; +    struct mainloop_source_fixed_or_idle *s; +    assert(a && a->userdata && callback); +    m = a->userdata; +    assert(a == &m->api); -    s = source_new(m, MAINLOOP_SOURCE_TYPE_FIXED); +    s = malloc(sizeof(struct mainloop_source_fixed_or_idle)); +    assert(s); +    s->header.mainloop = m; +    s->header.dead = 0; -    s->fixed.callback = callback; -    s->userdata = userdata;      s->enabled = 1; +    s->callback = callback; +    s->userdata = userdata; + +    idxset_put(m->fixed_sources, s, NULL);      return s;  } -struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) { -    struct mainloop_source* s; -    assert(m && callback); - -    s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE); +static void mainloop_enable_fixed(struct pa_mainloop_api*a, void* id, int b) { +    struct pa_mainloop *m; +    struct mainloop_source_fixed_or_idle *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api); -    s->idle.callback = callback; -    s->userdata = userdata; -    s->enabled = 1; -    return s; +    s->enabled = b;  } -struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata) { -    struct mainloop_source* s; -    struct sigaction save_sa, sa; -     -    assert(m && callback); +static void mainloop_cancel_fixed(struct pa_mainloop_api*a, void* id) { +    struct pa_mainloop *m; +    struct mainloop_source_fixed_or_idle *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api); -    memset(&sa, 0, sizeof(sa)); -    sa.sa_handler = signal_func; -    sa.sa_flags = SA_RESTART; -    sigemptyset(&sa.sa_mask); +    s->header.dead = 1; +    m->fixed_sources_scan_dead = 1; +} -    memset(&save_sa, 0, sizeof(save_sa)); +/* Idle sources */ +static void* mainloop_source_idle(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) { +    struct pa_mainloop *m; +    struct mainloop_source_fixed_or_idle *s; +    assert(a && a->userdata && callback); +    m = a->userdata; +    assert(a == &m->api); + +    s = malloc(sizeof(struct mainloop_source_fixed_or_idle)); +    assert(s); +    s->header.mainloop = m; +    s->header.dead = 0; -    if (sigaction(sig, &sa, &save_sa) < 0) -        return NULL; -     -    s = source_new(m, MAINLOOP_SOURCE_TYPE_SIGNAL); -    s->signal.sig = sig; -    s->signal.sigaction = save_sa; -     -    s->signal.callback = callback; -    s->userdata = userdata;      s->enabled = 1; +    s->callback = callback; +    s->userdata = userdata; + +    idxset_put(m->idle_sources, s, NULL);      return s;  } -void mainloop_source_free(struct mainloop_source*s) { -    struct mainloop_source_list *l; -    assert(s && !s->dead); -    s->dead = 1; - -    assert(s->mainloop); -    l = get_source_list(s->mainloop, s->type); -    assert(l); +static void mainloop_cancel_idle(struct pa_mainloop_api*a, void* id) { +    struct pa_mainloop *m; +    struct mainloop_source_fixed_or_idle *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api); -    l->n_sources--; -    l->dead_sources = 1; - -    if (s->type == MAINLOOP_SOURCE_TYPE_IO) -        s->mainloop->rebuild_pollfds = 1; -    else if (s->type == MAINLOOP_SOURCE_TYPE_SIGNAL) -        sigaction(s->signal.sig, &s->signal.sigaction, NULL); +    s->header.dead = 1; +    m->idle_sources_scan_dead = 1;  } -void mainloop_source_enable(struct mainloop_source*s, int b) { -    assert(s && !s->dead); +/* Time sources */ +static void* mainloop_source_time(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata) { +    struct pa_mainloop *m; +    struct mainloop_source_time *s; +    assert(a && a->userdata && callback); +    m = a->userdata; +    assert(a == &m->api); -    if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) { -        assert(s->mainloop); -        s->mainloop->rebuild_pollfds = 1; -    } +    s = malloc(sizeof(struct mainloop_source_time)); +    assert(s); +    s->header.mainloop = m; +    s->header.dead = 0; -    s->enabled = b; -} +    s->enabled = !!tv; +    if (tv) +        s->timeval = *tv; -void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) { -    assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO); +    s->callback = callback; +    s->userdata = userdata; -    if (s->io.events != events) { -        assert(s->mainloop); -        s->mainloop->rebuild_pollfds = 1; -    } +    idxset_put(m->time_sources, s, NULL); +    return s; +} -    s->io.events = events; -    s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0); +static void mainloop_enable_time(struct pa_mainloop_api*a, void *id, const struct timeval *tv) { +    struct pa_mainloop *m; +    struct mainloop_source_time *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api); + +    if (tv) { +        s->enabled = 1; +        s->timeval = *tv; +    } else +        s->enabled = 0;  } -struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) { -    assert(s); +static void mainloop_cancel_time(struct pa_mainloop_api*a, void* id) { +    struct pa_mainloop *m; +    struct mainloop_source_time *s = id; +    assert(a && a->userdata && s && !s->header.dead); +    m = a->userdata; +    assert(a == &m->api); + +    s->header.dead = 1; +    m->time_sources_scan_dead = 1; -    return s->mainloop;  } -struct once_info { -    void (*callback)(void *userdata); -    void *userdata; -}; +static void mainloop_quit(struct pa_mainloop_api*a, int retval) { +    struct pa_mainloop *m; +    assert(a && a->userdata); +    m = a->userdata; +    assert(a == &m->api); -static void once_callback(struct mainloop_source *s, void *userdata) { -    struct once_info *i = userdata; -    assert(s && i && i->callback); -    i->callback(i->userdata); -    mainloop_source_free(s); -    free(i); +    m->quit = 1; +    m->retval = retval;  } +     +static void setup_api(struct pa_mainloop *m) { +    assert(m); +     +    m->api.userdata = m; +    m->api.source_io = mainloop_source_io; +    m->api.enable_io = mainloop_enable_io; +    m->api.cancel_io = mainloop_cancel_io; + +    m->api.source_fixed = mainloop_source_fixed; +    m->api.enable_fixed = mainloop_enable_fixed; +    m->api.cancel_fixed = mainloop_cancel_fixed; + +    m->api.source_idle = mainloop_source_idle; +    m->api.enable_idle = mainloop_enable_fixed; /* (!) */ +    m->api.cancel_idle = mainloop_cancel_idle; +     +    m->api.source_time = mainloop_source_time; +    m->api.enable_time = mainloop_enable_time; +    m->api.cancel_time = mainloop_cancel_time; -void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata) { -    struct once_info *i; -    assert(m && callback); +    m->api.quit = mainloop_quit; +} -    i = malloc(sizeof(struct once_info)); -    assert(i); -    i->callback = callback; -    i->userdata = userdata; -     -    mainloop_source_new_fixed(m, once_callback, i); +struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m) { +    assert(m); +    return &m->api;  } +         diff --git a/src/mainloop.h b/src/mainloop.h index c1bfc62a..7837636f 100644 --- a/src/mainloop.h +++ b/src/mainloop.h @@ -1,42 +1,16 @@  #ifndef foomainloophfoo  #define foomainloophfoo -struct mainloop; -struct mainloop_source; +#include "mainloop-api.h" -enum mainloop_io_event { -    MAINLOOP_IO_EVENT_NULL = 0, -    MAINLOOP_IO_EVENT_IN = 1, -    MAINLOOP_IO_EVENT_OUT = 2, -    MAINLOOP_IO_EVENT_BOTH = 3 -}; +struct pa_mainloop; -enum mainloop_source_type { -    MAINLOOP_SOURCE_TYPE_IO, -    MAINLOOP_SOURCE_TYPE_FIXED,  -    MAINLOOP_SOURCE_TYPE_IDLE, -    MAINLOOP_SOURCE_TYPE_SIGNAL -}; +struct pa_mainloop *pa_mainloop_new(void); +void pa_mainloop_free(struct pa_mainloop* m); -struct mainloop *mainloop_new(void); -void mainloop_free(struct mainloop* m); +int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval); +int pa_mainloop_run(struct pa_mainloop *m, int *retval); -int mainloop_iterate(struct mainloop *m, int block); -int mainloop_run(struct mainloop *m); -void mainloop_quit(struct mainloop *m, int r); - -struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata); -struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata); -struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata); -struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata); - -void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata); - -void mainloop_source_free(struct mainloop_source*s); -void mainloop_source_enable(struct mainloop_source*s, int b); - -void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event); - -struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s); +struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m);  #endif diff --git a/src/memblockq.c b/src/memblockq.c index 10c59e50..9a601c3a 100644 --- a/src/memblockq.c +++ b/src/memblockq.c @@ -220,3 +220,12 @@ uint32_t memblockq_get_length(struct memblockq *bq) {      assert(bq);      return bq->total_length;  } + +uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen) { +    assert(bq && qlen); + +    if (bq->total_length >= qlen) +        return 0; + +    return qlen - bq->total_length; +} diff --git a/src/memblockq.h b/src/memblockq.h index 0a68ddaf..a681ff08 100644 --- a/src/memblockq.h +++ b/src/memblockq.h @@ -25,4 +25,6 @@ int memblockq_is_writable(struct memblockq *bq, size_t length);  uint32_t memblockq_get_delay(struct memblockq *bq);  uint32_t memblockq_get_length(struct memblockq *bq); +uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen); +  #endif diff --git a/src/module-oss-mmap.c b/src/module-oss-mmap.c index bfc3a6fa..b70ea6bd 100644 --- a/src/module-oss-mmap.c +++ b/src/module-oss-mmap.c @@ -16,15 +16,15 @@  #include "source.h"  #include "module.h"  #include "oss.h" +#include "sample-util.h"  struct userdata {      struct sink *sink;      struct source *source;      struct core *core; -    struct sample_spec sample_spec; +    struct pa_sample_spec sample_spec; -    size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, sample_size, out_fill; -    uint32_t sample_usec; +    size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, out_fill;      int fd; @@ -161,12 +161,14 @@ static void do_read(struct userdata *u) {      in_clear_memblocks(u, u->in_fragments/2);  }; -static void io_callback(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata) { +static void io_callback(struct pa_mainloop_api *m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {      struct userdata *u = userdata; -    if (event & MAINLOOP_IO_EVENT_IN) +    assert (u && u->core->mainloop == m && u->mainloop_source == id); + +    if (events & PA_MAINLOOP_API_IO_EVENT_INPUT)          do_read(u); -    if (event & MAINLOOP_IO_EVENT_OUT) +    if (events & PA_MAINLOOP_API_IO_EVENT_OUTPUT)          do_write(u);  } @@ -175,7 +177,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {      assert(s && u);      do_write(u); -    return u->out_fill/u->sample_size*u->sample_usec; +    return pa_samples_usec(u->out_fill, &s->sample_spec);  }  int module_init(struct core *c, struct module*m) { @@ -316,10 +318,7 @@ int module_init(struct core *c, struct module*m) {      assert(u->source || u->sink); -    u->sample_size = sample_size(&u->sample_spec); -    u->sample_usec = 1000000/u->sample_spec.rate; - -    u->mainloop_source = mainloop_source_new_io(c->mainloop, u->fd, (u->source ? MAINLOOP_IO_EVENT_IN : 0) | (u->sink ? MAINLOOP_IO_EVENT_OUT : 0), io_callback, u); +    u->mainloop_source = c->mainloop->source_io(c->mainloop, u->fd, (u->source ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | (u->sink ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), io_callback, u);      assert(u->mainloop_source);      return 0; @@ -360,7 +359,7 @@ void module_done(struct core *c, struct module*m) {          source_free(u->source);      if (u->mainloop_source) -        mainloop_source_free(u->mainloop_source); +        u->core->mainloop->cancel_io(u->core->mainloop, u->mainloop_source);      if (u->fd >= 0)          close(u->fd); diff --git a/src/module-oss.c b/src/module-oss.c index fab5c8c5..4f307545 100644 --- a/src/module-oss.c +++ b/src/module-oss.c @@ -15,7 +15,7 @@  #include "source.h"  #include "module.h"  #include "oss.h" -#include "sample.h" +#include "sample-util.h"  struct userdata {      struct sink *sink; @@ -81,7 +81,7 @@ static void do_read(struct userdata *u) {          return;      } -    assert(r <= memchunk.memblock->length); +    assert(r <= (ssize_t) memchunk.memblock->length);      memchunk.length = memchunk.memblock->length = r;      memchunk.index = 0; @@ -107,7 +107,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {          return 0;      } -    return samples_usec(arg, &s->sample_spec); +    return pa_samples_usec(arg, &s->sample_spec);  }  int module_init(struct core *c, struct module*m) { @@ -117,7 +117,7 @@ int module_init(struct core *c, struct module*m) {      int fd = -1;      int frag_size, in_frag_size, out_frag_size;      int mode; -    struct sample_spec ss; +    struct pa_sample_spec ss;      assert(c && m);      p = m->argument ? m->argument : "/dev/dsp"; @@ -203,7 +203,7 @@ int module_init(struct core *c, struct module*m) {      u->memchunk.memblock = NULL;      u->memchunk.length = 0; -    u->sample_size = sample_size(&ss); +    u->sample_size = pa_sample_size(&ss);      u->out_fragment_size = out_frag_size;      u->in_fragment_size = in_frag_size; diff --git a/src/module-pipe-sink.c b/src/module-pipe-sink.c index c0a903e9..9dcf5d23 100644 --- a/src/module-pipe-sink.c +++ b/src/module-pipe-sink.c @@ -18,7 +18,8 @@ struct userdata {      struct sink *sink;      struct iochannel *io;      struct core *core; -    struct mainloop_source *mainloop_source; +    void *mainloop_source; +    struct pa_mainloop_api *mainloop;      struct memchunk memchunk;  }; @@ -27,7 +28,7 @@ static void do_write(struct userdata *u) {      ssize_t r;      assert(u); -    mainloop_source_enable(u->mainloop_source, 0); +    u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);      if (!iochannel_is_writable(u->io))          return; @@ -57,10 +58,10 @@ static void notify_cb(struct sink*s) {      assert(s && u);      if (iochannel_is_writable(u->io)) -        mainloop_source_enable(u->mainloop_source, 1); +        u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 1);  } -static void prepare_callback(struct mainloop_source *src, void *userdata) { +static void fixed_callback(struct pa_mainloop_api *m, void *id, void *userdata) {      struct userdata *u = userdata;      assert(u);      do_write(u); @@ -77,7 +78,7 @@ int module_init(struct core *c, struct module*m) {      struct stat st;      char *p;      int fd = -1; -    static const struct sample_spec ss = { +    static const struct pa_sample_spec ss = {          .format = SAMPLE_S16NE,          .rate = 44100,          .channels = 2, @@ -120,10 +121,11 @@ int module_init(struct core *c, struct module*m) {      u->memchunk.memblock = NULL;      u->memchunk.length = 0; -    u->mainloop_source = mainloop_source_new_fixed(c->mainloop, prepare_callback, u); +    u->mainloop = c->mainloop; +    u->mainloop_source = u->mainloop->source_fixed(u->mainloop, fixed_callback, u);      assert(u->mainloop_source); -    mainloop_source_enable(u->mainloop_source, 0); -     +    u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0); +              m->userdata = u;      return 0; @@ -147,7 +149,7 @@ void module_done(struct core *c, struct module*m) {      sink_free(u->sink);      iochannel_free(u->io); -    mainloop_source_free(u->mainloop_source); +    u->mainloop->cancel_fixed(u->mainloop, u->mainloop_source);      assert(u->filename);      unlink(u->filename); diff --git a/src/module.c b/src/module.c index c6de1751..883a22df 100644 --- a/src/module.c +++ b/src/module.c @@ -151,5 +151,5 @@ void module_unload_request(struct core *c, struct module *m) {      assert(i);      i->core = c;      i->index = m->index; -    mainloop_once(c->mainloop, module_unload_once_callback, i); +    pa_mainloop_api_once(c->mainloop, module_unload_once_callback, i);  } @@ -7,7 +7,7 @@  #include "oss.h" -int oss_auto_format(int fd, struct sample_spec *ss) { +int oss_auto_format(int fd, struct pa_sample_spec *ss) {      int format, channels, speed;      assert(fd >= 0 && ss); @@ -3,6 +3,6 @@  #include "sample.h" -int oss_auto_format(int fd, struct sample_spec *ss); +int oss_auto_format(int fd, struct pa_sample_spec *ss);  #endif diff --git a/src/pacat.c b/src/pacat.c new file mode 100644 index 00000000..5f5a373b --- /dev/null +++ b/src/pacat.c @@ -0,0 +1,169 @@ +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> + +#include "polyp.h" +#include "mainloop.h" + +static struct pa_context *context = NULL; +static struct pa_stream *stream = NULL; +static struct pa_mainloop_api *mainloop_api = NULL; + +static void *buffer = NULL; +static size_t buffer_length = 0, buffer_index = 0; + +static void* stdin_source = NULL; + +static void context_die_callback(struct pa_context *c, void *userdata) { +    assert(c); +    fprintf(stderr, "Connection to server shut down, exiting.\n"); +    mainloop_api->quit(mainloop_api, 1); +} + +static void stream_die_callback(struct pa_stream *s, void *userdata) { +    assert(s); +    fprintf(stderr, "Stream deleted, exiting.\n"); +    mainloop_api->quit(mainloop_api, 1); +} + +static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) { +    size_t l; +    assert(s && length); +     +    mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK); + +    if (!buffer) +        return; +     +    assert(buffer_length); +     +    l = length; +    if (l > buffer_length) +        l = buffer_length; +     +    pa_stream_write(s, buffer+buffer_index, l); +    buffer_length -= l; +    buffer_index += l; +     +    if (!buffer_length) { +        free(buffer); +        buffer = NULL; +        buffer_index = buffer_length = 0; +    } +} + +static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) { +    assert(c); + +    if (!s) { +        fprintf(stderr, "Stream creation failed.\n"); +        mainloop_api->quit(mainloop_api, 1); +    } + +    stream = s; +    pa_stream_set_die_callback(stream, stream_die_callback, NULL); +    pa_stream_set_write_callback(stream, stream_write_callback, NULL); +} + +static void context_complete_callback(struct pa_context *c, int success, void *userdata) { +    static const struct pa_sample_spec ss = { +        .format = SAMPLE_S16NE, +        .rate = 44100, +        .channels = 2 +    }; +         +    assert(c && !stream); + +    if (!success) { +        fprintf(stderr, "Connection failed\n"); +        goto fail; +    } +     +    if (pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL) < 0) { +        fprintf(stderr, "pa_stream_new() failed.\n"); +        goto fail; +    } + +    return; +     +fail: +    mainloop_api->quit(mainloop_api, 1); +} + +static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { +    size_t l; +    ssize_t r; +    assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT); + +    if (buffer) { +        mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL); +        return; +    } + +    if (!(l = pa_stream_writable_size(stream))) +        l = 4096; +    buffer = malloc(l); +    assert(buffer); +    if ((r = read(fd, buffer, l)) <= 0) { +        if (r == 0) +            mainloop_api->quit(mainloop_api, 0); +        else { +            fprintf(stderr, "read() failed: %s\n", strerror(errno)); +            mainloop_api->quit(mainloop_api, 1); +        } + +        return; +    } + +    buffer_length = r; +    buffer_index = 0; +} + +int main(int argc, char *argv[]) { +    struct pa_mainloop* m; +    int ret = 1; + +    if (!(m = pa_mainloop_new())) { +        fprintf(stderr, "pa_mainloop_new() failed.\n"); +        goto quit; +    } + +    mainloop_api = pa_mainloop_get_api(m); + +    if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) { +        fprintf(stderr, "source_io() failed.\n"); +        goto quit; +    } +     +    if (!(context = pa_context_new(mainloop_api, argv[0]))) { +        fprintf(stderr, "pa_context_new() failed.\n"); +        goto quit; +    } + +    if (pa_context_connect(context, NULL, context_complete_callback, NULL) < 0) { +        fprintf(stderr, "pa_context_connext() failed.\n"); +        goto quit; +    } +         +    pa_context_set_die_callback(context, context_die_callback, NULL); + +    if (pa_mainloop_run(m, &ret) < 0) { +        fprintf(stderr, "pa_mainloop_run() failed.\n"); +        goto quit; +    } +     +quit: +    if (stream) +        pa_stream_free(stream); +    if (context) +        pa_context_free(context); +    if (m) +        pa_mainloop_free(m); +    if (buffer) +        free(buffer); +     +    return ret; +} diff --git a/src/packet.c b/src/packet.c index 47fce919..0f966d9a 100644 --- a/src/packet.c +++ b/src/packet.c @@ -16,7 +16,7 @@ struct packet* packet_new(size_t length) {      return p;  } -struct packet* packet_dynamic(uint8_t* data, size_t length) { +struct packet* packet_new_dynamic(uint8_t* data, size_t length) {      struct packet *p;      assert(data && length);      p = malloc(sizeof(struct packet)); @@ -26,6 +26,7 @@ struct packet* packet_dynamic(uint8_t* data, size_t length) {      p->length = length;      p->data = data;      p->type = PACKET_DYNAMIC; +    return p;  }  struct packet* packet_ref(struct packet *p) { diff --git a/src/pdispatch.c b/src/pdispatch.c new file mode 100644 index 00000000..48b6639d --- /dev/null +++ b/src/pdispatch.c @@ -0,0 +1,149 @@ +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include "pdispatch.h" +#include "protocol-native-spec.h" + +struct reply_info { +    struct pdispatch *pdispatch; +    struct reply_info *next, *previous; +    int (*callback)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +    void *userdata; +    uint32_t tag; +    void *mainloop_timeout; +}; + +struct pdispatch { +    struct pa_mainloop_api *mainloop; +    const struct pdispatch_command *command_table; +    unsigned n_commands; +    struct reply_info *replies; +}; + +static void reply_info_free(struct reply_info *r) { +    assert(r && r->pdispatch && r->pdispatch->mainloop); +    r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout); + +    if (r->previous) +        r->previous->next = r->next; +    else +        r->pdispatch->replies = r->next; + +    if (r->next) +        r->next->previous = r->previous; +     +    free(r); +} + +struct pdispatch* pdispatch_new(struct pa_mainloop_api *mainloop, const struct pdispatch_command*table, unsigned entries) { +    struct pdispatch *pd; +    assert(mainloop); + +    assert((entries && table) || (!entries && !table)); +     +    pd = malloc(sizeof(struct pdispatch)); +    assert(pd); +    pd->mainloop = mainloop; +    pd->command_table = table; +    pd->n_commands = entries; +    return pd; +} + +void pdispatch_free(struct pdispatch *pd) { +    assert(pd); +    while (pd->replies) +        reply_info_free(pd->replies); +    free(pd); +} + +int pdispatch_run(struct pdispatch *pd, struct packet*packet, void *userdata) { +    uint32_t tag, command; +    assert(pd && packet); +    struct tagstruct *ts = NULL; +    assert(pd && packet && packet->data); + +    if (packet->length <= 8) +        goto fail; + +    ts = tagstruct_new(packet->data, packet->length); +    assert(ts); +     +    if (tagstruct_getu32(ts, &command) < 0 || +        tagstruct_getu32(ts, &tag) < 0) +        goto fail; + +    if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) { +        struct reply_info *r; +        int done = 0; + +        for (r = pd->replies; r; r = r->next) { +            if (r->tag == tag) { +                int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata); +                reply_info_free(r); +                 +                if (ret < 0) +                    goto fail; +                 +                done = 1; +                break; +            } +        } + +        if (!done) +            goto fail; + +    } else if (pd->command_table && command < pd->n_commands) { +        const struct pdispatch_command *c = pd->command_table+command; + +        if (!c->proc) +            goto fail; +         +        if (c->proc(pd, command, tag, ts, userdata) < 0) +            goto fail; +    } else +        goto fail; +     +    tagstruct_free(ts);     +         +    return 0; + +fail: +    if (ts) +        tagstruct_free(ts);     + +    fprintf(stderr, "protocol-native: invalid packet.\n"); +    return -1; +} + +static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) { +    struct reply_info*r = userdata; +    assert (r && r->mainloop_timeout == id && r->pdispatch && r->pdispatch->mainloop == m && r->callback); + +    r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata); +    reply_info_free(r); +} + +void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata) { +    struct reply_info *r; +    struct timeval tv; +    assert(pd && cb); + +    r = malloc(sizeof(struct reply_info)); +    assert(r); +    r->pdispatch = pd; +    r->callback = cb; +    r->userdata = userdata; +    r->tag = tag; + +    gettimeofday(&tv, NULL); +    tv.tv_sec += timeout; + +    r->mainloop_timeout = pd->mainloop->source_time(pd->mainloop, &tv, timeout_callback, r); +    assert(r->mainloop_timeout); + +    r->previous = NULL; +    r->next = pd->replies; +    if (r->next) +        r->next->previous = r; +    pd->replies = r; +} diff --git a/src/pdispatch.h b/src/pdispatch.h new file mode 100644 index 00000000..466da9a4 --- /dev/null +++ b/src/pdispatch.h @@ -0,0 +1,22 @@ +#ifndef foopdispatchhfoo +#define foopdispatchhfoo + +#include <inttypes.h> +#include "tagstruct.h" +#include "packet.h" +#include "mainloop-api.h" + +struct pdispatch; + +struct pdispatch_command { +    int (*proc)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +}; + +struct pdispatch* pdispatch_new(struct pa_mainloop_api *m, const struct pdispatch_command*table, unsigned entries); +void pdispatch_free(struct pdispatch *pd); + +int pdispatch_run(struct pdispatch *pd, struct packet*p, void *userdata); + +void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata); + +#endif diff --git a/src/polyp.c b/src/polyp.c new file mode 100644 index 00000000..fdff7f9c --- /dev/null +++ b/src/polyp.c @@ -0,0 +1,451 @@ +#include <stdio.h> +#include <assert.h> +#include <stdlib.h> +#include <string.h> + +#include "polyp.h" +#include "protocol-native-spec.h" +#include "pdispatch.h" +#include "pstream.h" +#include "dynarray.h" +#include "socket-client.h" +#include "pstream-util.h" + +#define DEFAULT_QUEUE_LENGTH 10240 +#define DEFAULT_MAX_LENGTH 20480 +#define DEFAULT_PREBUF 4096 +#define DEFAULT_TIMEOUT 5 + +struct pa_context { +    char *name; +    struct pa_mainloop_api* mainloop; +    struct socket_client *client; +    struct pstream *pstream; +    struct pdispatch *pdispatch; +    struct dynarray *streams; +    struct pa_stream *first_stream; +    uint32_t ctag; +    uint32_t errno; +    enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_READY, CONTEXT_DEAD} state; + +    void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata); +    void *connect_complete_userdata; + +    void (*die_callback)(struct pa_context*c, void *userdata); +    void *die_userdata; +}; + +struct pa_stream { +    struct pa_context *context; +    struct pa_stream *next, *previous; +    uint32_t channel; +    int channel_valid; +    enum pa_stream_direction direction; +    enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state; +    uint32_t requested_bytes; + +    void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata); +    void *read_userdata; + +    void (*write_callback)(struct pa_stream *p, size_t length, void *userdata); +    void *write_userdata; +     +    void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata); +    void *create_complete_userdata; +     +    void (*die_callback)(struct pa_stream*c, void *userdata); +    void *die_userdata; +}; + +static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); + +static const struct pdispatch_command command_table[PA_COMMAND_MAX] = { +    [PA_COMMAND_ERROR] = { NULL }, +    [PA_COMMAND_REPLY] = { NULL }, +    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL }, +    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL }, +    [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL }, +    [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, +    [PA_COMMAND_EXIT] = { NULL }, +    [PA_COMMAND_REQUEST] = { command_request }, +}; + +struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) { +    assert(mainloop && name); +    struct pa_context *c; +    c = malloc(sizeof(struct pa_context)); +    assert(c); +    c->name = strdup(name); +    c->mainloop = mainloop; +    c->client = NULL; +    c->pstream = NULL; +    c->pdispatch = NULL; +    c->streams = dynarray_new(); +    assert(c->streams); +    c->first_stream = NULL; +    c->errno = PA_ERROR_OK; +    c->state = CONTEXT_UNCONNECTED; +    c->ctag = 0; + +    c->connect_complete_callback = NULL; +    c->connect_complete_userdata = NULL; + +    c->die_callback = NULL; +    c->die_userdata = NULL; +     +    return c; +} + +void pa_context_free(struct pa_context *c) { +    assert(c); + +    while (c->first_stream) +        pa_stream_free(c->first_stream); +     +    if (c->client) +        socket_client_free(c->client); +    if (c->pdispatch) +        pdispatch_free(c->pdispatch); +    if (c->pstream) +        pstream_free(c->pstream); +    if (c->streams) +        dynarray_free(c->streams, NULL, NULL); +         +    free(c->name); +    free(c); +} + +static void stream_dead(struct pa_stream *s) { +    if (s->state == STREAM_DEAD) +        return; + +    s->state = STREAM_DEAD; +    if (s->die_callback) +        s->die_callback(s, s->die_userdata); +} + +static void context_dead(struct pa_context *c) { +    struct pa_stream *s; +    assert(c); +     +    for (s = c->first_stream; s; s = s->next) +        stream_dead(s); + +    if (c->state == CONTEXT_DEAD) +        return; +     +    c->state = CONTEXT_DEAD; +    if (c->die_callback) +        c->die_callback(c, c->die_userdata); +} + +static void pstream_die_callback(struct pstream *p, void *userdata) { +    struct pa_context *c = userdata; +    assert(p && c); + +    assert(c->state != CONTEXT_DEAD); +     +    c->state = CONTEXT_DEAD; + +    context_dead(c); +} + +static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) { +    struct pa_context *c = userdata; +    assert(p && packet && c); + +    if (pdispatch_run(c->pdispatch, packet, c) < 0) { +        fprintf(stderr, "polyp.c: invalid packet.\n"); +        return -1; +    } + +    return 0; +} + +static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) { +    struct pa_context *c = userdata; +    struct pa_stream *s; +    assert(p && chunk && c && chunk->memblock && chunk->memblock->data); + +    if (!(s = dynarray_get(c->streams, channel))) +        return -1; + +    if (s->read_callback) +        s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata); +     +    return 0; +} + +static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) { +    struct pa_context *c = userdata; +    assert(client && io && c && c->state == CONTEXT_CONNECTING); + +    socket_client_free(client); +    c->client = NULL; + +    if (!io) { +        c->errno = PA_ERROR_CONNECTIONREFUSED; +        c->state = CONTEXT_UNCONNECTED; + +        if (c->connect_complete_callback) +            c->connect_complete_callback(c, 0, c->connect_complete_userdata); + +        return; +    } +     +    c->pstream = pstream_new(c->mainloop, io); +    assert(c->pstream); +    pstream_set_die_callback(c->pstream, pstream_die_callback, c); +    pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c); +    pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c); +     +    c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX); +    assert(c->pdispatch); + +    c->state = CONTEXT_READY; + +    if (c->connect_complete_callback) +        c->connect_complete_callback(c, 1, c->connect_complete_userdata); +} + +int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) { +    assert(c && c->state == CONTEXT_UNCONNECTED); + +    assert(!c->client); +    if (!(c->client = socket_client_new_unix(c->mainloop, server))) { +        c->errno = PA_ERROR_CONNECTIONREFUSED; +        return -1; +    } + +    c->connect_complete_callback = complete; +    c->connect_complete_userdata = userdata; +     +    socket_client_set_callback(c->client, on_connection, c); +    c->state = CONTEXT_CONNECTING; + +    return 0; +} + +int pa_context_is_dead(struct pa_context *c) { +    assert(c); +    return c->state == CONTEXT_DEAD; +} + +int pa_context_is_ready(struct pa_context *c) { +    assert(c); +    return c->state == CONTEXT_READY; +} + +int pa_context_errno(struct pa_context *c) { +    assert(c); +    return c->errno; +} + +void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) { +    assert(c); +    c->die_callback = cb; +    c->die_userdata = userdata; +} + +static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { +    struct pa_stream *s; +    struct pa_context *c = userdata; +    uint32_t bytes, channel; +    assert(pd && command == PA_COMMAND_REQUEST && t &&  s); + +    if (tagstruct_getu32(t, &channel) < 0 || +        tagstruct_getu32(t, &bytes) < 0 || +        tagstruct_eof(t)) { +        c->errno = PA_ERROR_PROTOCOL; +        return -1; +    } +     +    if (!(s = dynarray_get(c->streams, channel))) { +        c->errno = PA_ERROR_PROTOCOL; +        return -1; +    } + +    s->requested_bytes += bytes; +     +    if (s->requested_bytes && s->write_callback) +        s->write_callback(s, s->requested_bytes, s->write_userdata); + +    return 0; +} + +static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { +    int ret = 0; +    struct pa_stream *s = userdata; +    assert(pd && s && s->state == STREAM_CREATING); + +    if (command != PA_COMMAND_REPLY) { +        struct pa_context *c = s->context; +        assert(c); + +        if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) { +            s->context->errno = PA_ERROR_PROTOCOL; +            ret = -1; +        } else if (command == PA_COMMAND_TIMEOUT) { +            s->context->errno = PA_ERROR_TIMEOUT; +            ret = -1; +        } + +        goto fail; +    } + +    if (tagstruct_getu32(t, &s->channel) < 0 || +        tagstruct_eof(t)) { +        s->context->errno = PA_ERROR_PROTOCOL; +        ret = -1; +        goto fail; +    } + +    s->channel_valid = 1; +    dynarray_put(s->context->streams, s->channel, s); +     +    s->state = STREAM_READY; +    assert(s->create_complete_callback); +    s->create_complete_callback(s->context, s, s->create_complete_userdata); +    return 0; + +fail: +    assert(s->create_complete_callback); +    s->create_complete_callback(s->context, NULL, s->create_complete_userdata); +    pa_stream_free(s); +    return ret; +} + +int pa_stream_new( +    struct pa_context *c, +    enum pa_stream_direction dir, +    const char *dev, +    const char *name, +    const struct pa_sample_spec *ss, +    const struct pa_buffer_attr *attr, +    void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata), +    void *userdata) { +     +    struct pa_stream *s; +    struct tagstruct *t; +    uint32_t tag; + +    assert(c && name && ss && c->state == CONTEXT_READY && complete); +     +    s = malloc(sizeof(struct pa_stream)); +    assert(s); +    s->context = c; + +    s->read_callback = NULL; +    s->read_userdata = NULL; +    s->write_callback = NULL; +    s->write_userdata = NULL; +    s->die_callback = NULL; +    s->die_userdata = NULL; +    s->create_complete_callback = complete; +    s->create_complete_userdata = NULL; + +    s->state = STREAM_CREATING; +    s->requested_bytes = 0; +    s->channel = 0; +    s->channel_valid = 0; +    s->direction = dir; + +    t = tagstruct_new(NULL, 0); +    assert(t); +     +    tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM); +    tagstruct_putu32(t, tag = c->ctag++); +    tagstruct_puts(t, name); +    tagstruct_put_sample_spec(t, ss); +    tagstruct_putu32(t, (uint32_t) -1); +    tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH); +    tagstruct_putu32(t, attr ? attr->max_length  : DEFAULT_MAX_LENGTH); +    tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF); + +    pstream_send_tagstruct(c->pstream, t); + +    pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s); +     +    s->next = c->first_stream; +    if (s->next) +        s->next->previous = s; +    s->previous = NULL; +    c->first_stream = s; + +    return 0; +} + +void pa_stream_free(struct pa_stream *s) { +    assert(s && s->context); +     +    if (s->channel_valid) { +        struct tagstruct *t = tagstruct_new(NULL, 0); +        assert(t); +     +        tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM); +        tagstruct_putu32(t, s->context->ctag++); +        tagstruct_putu32(t, s->channel); +        pstream_send_tagstruct(s->context->pstream, t); +    } +     +    if (s->channel_valid) +        dynarray_put(s->context->streams, s->channel, NULL); + +    if (s->next) +        s->next->previous = s->previous; +    if (s->previous) +        s->previous->next = s->next; +    else +        s->context->first_stream = s->next; +     +    free(s); +} + +void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) { +    assert(s && cb); +    s->write_callback = cb; +    s->write_userdata = userdata; +} + +void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { +    struct memchunk chunk; +    assert(s && s->context && data && length); + +    chunk.memblock = memblock_new(length); +    assert(chunk.memblock && chunk.memblock->data); +    memcpy(chunk.memblock->data, data, length); +    chunk.index = 0; +    chunk.length = length; + +    pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk); + +    if (length < s->requested_bytes) +        s->requested_bytes -= length; +    else +        s->requested_bytes = 0; +} + +size_t pa_stream_writable_size(struct pa_stream *s) { +    assert(s); +    return s->requested_bytes; +} + +void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) { +    assert(s && cb); +    s->read_callback = cb; +    s->read_userdata = userdata; +} + +int pa_stream_is_dead(struct pa_stream *s) { +    return s->state == STREAM_DEAD; +} + +int pa_stream_is_ready(struct pa_stream*s) { +    return s->state == STREAM_READY; +} + +void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) { +    assert(s); +    s->die_callback = cb; +    s->die_userdata = userdata; +} diff --git a/src/polyp.h b/src/polyp.h new file mode 100644 index 00000000..171e3bdf --- /dev/null +++ b/src/polyp.h @@ -0,0 +1,53 @@ +#ifndef foopolyphfoo +#define foopolyphfoo + +#include <sys/types.h> + +#include "sample.h" +#include "polypdef.h" +#include "mainloop-api.h" + +struct pa_context; + +struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name); + +int pa_context_connect( +    struct pa_context *c, +    const char *server, +    void (*complete) (struct pa_context*c, int success, void *userdata), +    void *userdata); + +void pa_context_free(struct pa_context *c); + +void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata); + +int pa_context_is_dead(struct pa_context *c); +int pa_context_is_ready(struct pa_context *c); +int pa_contect_errno(struct pa_context *c); + +struct pa_stream; + +int pa_stream_new( +    struct pa_context *c, +    enum pa_stream_direction dir, +    const char *dev, +    const char *name, +    const struct pa_sample_spec *ss, +    const struct pa_buffer_attr *attr, +    void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata), +    void *userdata); + +void pa_stream_free(struct pa_stream *p); + +void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata); + +void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata); +void pa_stream_write(struct pa_stream *p, const void *data, size_t length); +size_t pa_stream_writable_size(struct pa_stream *p); + +void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata); + +int pa_stream_is_dead(struct pa_stream *p); +int pa_stream_is_ready(struct pa_stream*p); + +#endif diff --git a/src/polypdef.h b/src/polypdef.h new file mode 100644 index 00000000..aa6e6bf6 --- /dev/null +++ b/src/polypdef.h @@ -0,0 +1,18 @@ +#ifndef foopolypdefhfoo +#define foopolypdefhfoo + +#include <inttypes.h> + +enum pa_stream_direction { +    PA_STREAM_PLAYBACK, +    PA_STREAM_RECORD +}; + +struct pa_buffer_attr { +    uint32_t queue_length; +    uint32_t max_length; +    uint32_t prebuf; +}; + + +#endif diff --git a/src/protocol-cli.c b/src/protocol-cli.c index c0c93d98..b6460fec 100644 --- a/src/protocol-cli.c +++ b/src/protocol-cli.c @@ -12,8 +12,7 @@ struct protocol_cli {  static void cli_eof_cb(struct cli*c, void*userdata) {      struct protocol_cli *p = userdata; -    assert(c && p); - +    assert(p);      idxset_remove_by_data(p->connections, c, NULL);      cli_free(c);  } @@ -22,7 +21,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us      struct protocol_cli *p = userdata;      struct cli *c;      assert(s && io && p); -     +      c = cli_new(p->core, io);      assert(c);      cli_set_eof_callback(c, cli_eof_cb, p); diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h new file mode 100644 index 00000000..df11ae3c --- /dev/null +++ b/src/protocol-native-spec.h @@ -0,0 +1,29 @@ +#ifndef fooprotocolnativespech +#define fooprotocolnativespech + +enum { +    PA_COMMAND_ERROR, +    PA_COMMAND_TIMEOUT, /* pseudo command */ +    PA_COMMAND_REPLY, +    PA_COMMAND_CREATE_PLAYBACK_STREAM, +    PA_COMMAND_DELETE_PLAYBACK_STREAM, +    PA_COMMAND_CREATE_RECORD_STREAM, +    PA_COMMAND_DELETE_RECORD_STREAM, +    PA_COMMAND_EXIT, +    PA_COMMAND_REQUEST, +    PA_COMMAND_MAX +}; + +enum { +    PA_ERROR_OK, +    PA_ERROR_ACCESS, +    PA_ERROR_COMMAND, +    PA_ERROR_INVALID, +    PA_ERROR_EXIST, +    PA_ERROR_NOENTITY, +    PA_ERROR_CONNECTIONREFUSED, +    PA_ERROR_PROTOCOL, +    PA_ERROR_TIMEOUT +}; + +#endif diff --git a/src/protocol-native.c b/src/protocol-native.c index e9cca7c1..a39880b8 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -3,34 +3,19 @@  #include <stdlib.h>  #include "protocol-native.h" +#include "protocol-native-spec.h"  #include "packet.h"  #include "client.h"  #include "sourceoutput.h"  #include "sinkinput.h"  #include "pstream.h"  #include "tagstruct.h" +#include "pdispatch.h" +#include "pstream-util.h"  struct connection;  struct protocol_native; -enum { -    COMMAND_ERROR, -    COMMAND_REPLY, -    COMMAND_CREATE_PLAYBACK_STREAM, -    COMMAND_DELETE_PLAYBACK_STREAM, -    COMMAND_CREATE_RECORD_STREAM, -    COMMAND_DELETE_RECORD_STREAM, -    COMMAND_EXIT, -    COMMAND_MAX -}; - -enum { -    ERROR_ACCESS, -    ERROR_COMMAND, -    ERROR_ARGUMENT, -    ERROR_EXIST -}; -  struct record_stream {      struct connection *connection;      uint32_t index; @@ -41,6 +26,7 @@ struct record_stream {  struct playback_stream {      struct connection *connection;      uint32_t index; +    size_t qlength;      struct sink_input *sink_input;      struct memblockq *memblockq;  }; @@ -50,6 +36,7 @@ struct connection {      struct protocol_native *protocol;      struct client *client;      struct pstream *pstream; +    struct pdispatch *pdispatch;      struct idxset *record_streams, *playback_streams;  }; @@ -60,6 +47,29 @@ struct protocol_native {      struct idxset *connections;  }; +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk); +static void sink_input_drop_cb(struct sink_input *i, size_t length); +static void sink_input_kill_cb(struct sink_input *i); +static uint32_t sink_input_get_latency_cb(struct sink_input *i); + +static void request_bytes(struct playback_stream*s); + +static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); + +static const struct pdispatch_command command_table[PA_COMMAND_MAX] = { +    [PA_COMMAND_ERROR] = { NULL }, +    [PA_COMMAND_REPLY] = { NULL }, +    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, +    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, +    [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL }, +    [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, +    [PA_COMMAND_EXIT] = { command_exit }, +}; + +/* structure management */ +  static void record_stream_free(struct record_stream* r) {      assert(r && r->connection); @@ -69,18 +79,28 @@ static void record_stream_free(struct record_stream* r) {      free(r);  } -static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct sample_spec *ss, const char *name, size_t maxlength, size_t prebuf) { +static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {      struct playback_stream *s; +    assert(c && sink && s && name && qlen && maxlength && prebuf);      s = malloc(sizeof(struct playback_stream));      assert (s);      s->connection = c; +    s->qlength = qlen; +          s->sink_input = sink_input_new(sink, ss, name);      assert(s->sink_input); -    s->memblockq = memblockq_new(maxlength, sample_size(ss), prebuf); +    s->sink_input->peek = sink_input_peek_cb; +    s->sink_input->drop = sink_input_drop_cb; +    s->sink_input->kill = sink_input_kill_cb; +    s->sink_input->get_latency = sink_input_get_latency_cb; +    s->sink_input->userdata = s; +     +    s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf);      assert(s->memblockq);      idxset_put(c->playback_streams, s, &s->index); +    request_bytes(s);      return s;  } @@ -99,7 +119,6 @@ static void connection_free(struct connection *c) {      assert(c && c->protocol);      idxset_remove_by_data(c->protocol->connections, c, NULL); -    pstream_free(c->pstream);      while ((r = idxset_first(c->record_streams, NULL)))          record_stream_free(r);      idxset_free(c->record_streams, NULL, NULL); @@ -108,67 +127,90 @@ static void connection_free(struct connection *c) {          playback_stream_free(p);      idxset_free(c->playback_streams, NULL, NULL); +    pdispatch_free(c->pdispatch); +    pstream_free(c->pstream);      client_free(c->client);      free(c);  } -/*** pstream callbacks ***/ +static void request_bytes(struct playback_stream *s) { +    struct tagstruct *t; +    size_t l; +    assert(s); -static void send_tagstruct(struct pstream *p, struct tagstruct *t) { -    size_t length; -    uint8_t *data; -    struct packet *packet; -    assert(p && t); - -    data = tagstruct_free_data(t, &length); -    assert(data && length); -    packet = packet_new_dynamic(data, length); -    assert(packet); -    pstream_send_packet(p, packet); -    packet_unref(packet); -} +    if (!(l = memblockq_missing_to(s->memblockq, s->qlength))) +        return; -static void send_error(struct pstream *p, uint32_t tag, uint32_t error) { -    struct tagstruct *t = tagstruct_new(NULL, 0); +    t = tagstruct_new(NULL, 0);      assert(t); -    tagstruct_putu32(t, COMMAND_ERROR); -    tagstruct_putu32(t, tag); -    tagstruct_putu32(t, error); -    send_tagstruct(p, t); +    tagstruct_putu32(t, PA_COMMAND_REQUEST); +    tagstruct_putu32(t, s->index); +    tagstruct_putu32(t, l); +    pstream_send_tagstruct(s->connection->pstream, t);  } -static void send_simple_ack(struct pstream *p, uint32_t tag) { -    struct tagstruct *t = tagstruct_new(NULL, 0); -    assert(t); -    tagstruct_putu32(t, COMMAND_REPLY); -    tagstruct_putu32(t, tag); -    send_tagstruct(p, t); +/*** sinkinput callbacks ***/ + +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) { +    struct playback_stream *s; +    assert(i && i->userdata && chunk); +    s = i->userdata; + +    if (memblockq_peek(s->memblockq, chunk) < 0) +        return -1; + +    return 0;  } -struct command { -    int (*func)(struct connection *c, uint32_t tag, struct tagstruct *t); -}; +static void sink_input_drop_cb(struct sink_input *i, size_t length) { +    struct playback_stream *s; +    assert(i && i->userdata && length); +    s = i->userdata; -static int command_create_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) { +    memblockq_drop(s->memblockq, length); +    request_bytes(s); +} + +static void sink_input_kill_cb(struct sink_input *i) {      struct playback_stream *s; -    size_t maxlength, prebuf; +    assert(i && i->userdata); +    s = i->userdata; + +    playback_stream_free(s); +} + +static uint32_t sink_input_get_latency_cb(struct sink_input *i) { +    struct playback_stream *s; +    assert(i && i->userdata); +    s = i->userdata; + +    return pa_samples_usec(memblockq_get_length(s->memblockq), &s->sink_input->sample_spec); +} + +/*** pdispatch callbacks ***/ + +static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { +    struct connection *c = userdata; +    struct playback_stream *s; +    size_t maxlength, prebuf, qlength;      uint32_t sink_index;      const char *name; -    struct sample_spec ss; +    struct pa_sample_spec ss;      struct tagstruct *reply;      struct sink *sink;      assert(c && t && c->protocol && c->protocol->core);      if (tagstruct_gets(t, &name) < 0 ||          tagstruct_get_sample_spec(t, &ss) < 0 || -        tagstruct_getu32(t, &sink_index) < 0 ||  +        tagstruct_getu32(t, &sink_index) < 0 || +        tagstruct_getu32(t, &qlength) < 0 ||          tagstruct_getu32(t, &maxlength) < 0 ||          tagstruct_getu32(t, &prebuf) < 0 ||          !tagstruct_eof(t))          return -1;      if (!c->authorized) { -        send_error(c->pstream, tag, ERROR_ACCESS); +        pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);          return 0;      } @@ -178,25 +220,28 @@ static int command_create_playback_stream(struct connection *c, uint32_t tag, st          sink = idxset_get_by_index(c->protocol->core->sinks, sink_index);      if (!sink) { -        send_error(c->pstream, tag, ERROR_EXIST); +        pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);          return 0;      } -    if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, prebuf))) { -        send_error(c->pstream, tag, ERROR_ARGUMENT); +    if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) { +        pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);          return 0;      }      reply = tagstruct_new(NULL, 0);      assert(reply); -    tagstruct_putu32(reply, COMMAND_REPLY); +    tagstruct_putu32(reply, PA_COMMAND_REPLY);      tagstruct_putu32(reply, tag);      tagstruct_putu32(reply, s->index); -    send_tagstruct(c->pstream, reply); +    assert(s->sink_input); +    tagstruct_putu32(reply, s->sink_input->index); +    pstream_send_tagstruct(c->pstream, reply);      return 0;  } -static int command_delete_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) { +static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { +    struct connection *c = userdata;      uint32_t channel;      struct playback_stream *s;      assert(c && t); @@ -206,78 +251,50 @@ static int command_delete_playback_stream(struct connection *c, uint32_t tag, st          return -1;      if (!c->authorized) { -        send_error(c->pstream, tag, ERROR_ACCESS); +        pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);          return 0;      }      if (!(s = idxset_get_by_index(c->playback_streams, channel))) { -        send_error(c->pstream, tag, ERROR_EXIST); +        pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);          return 0;      } -    send_simple_ack(c->pstream, tag); +    pstream_send_simple_ack(c->pstream, tag);      return 0;  } -static int command_exit(struct connection *c, uint32_t tag, struct tagstruct *t) { +static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { +    struct connection *c = userdata;      assert(c && t);      if (!tagstruct_eof(t))          return -1;      if (!c->authorized) { -        send_error(c->pstream, tag, ERROR_ACCESS); +        pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);          return 0;      } -    assert(c->protocol && c->protocol->core); -    mainloop_quit(c->protocol->core->mainloop, -1); -    send_simple_ack(c->pstream, tag); /* nonsense */ +    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop); +    c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0); +    pstream_send_simple_ack(c->pstream, tag); /* nonsense */      return 0;  } -static const struct command commands[] = { -    [COMMAND_ERROR] = { NULL }, -    [COMMAND_REPLY] = { NULL }, -    [COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, -    [COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, -    [COMMAND_CREATE_RECORD_STREAM] = { NULL }, -    [COMMAND_DELETE_RECORD_STREAM] = { NULL }, -    [COMMAND_EXIT] = { command_exit }, -}; +/*** pstream callbacks ***/ +  static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) {      struct connection *c = userdata; -    uint32_t tag, command; -    struct tagstruct *ts = NULL;      assert(p && packet && packet->data && c); -    if (packet->length <= 8) -        goto fail; - -    ts = tagstruct_new(packet->data, packet->length); -    assert(ts); - -    if (tagstruct_getu32(ts, &command) < 0 || -        tagstruct_getu32(ts, &tag) < 0) -        goto fail; - -    if (command >= COMMAND_MAX || !commands[command].func) -        send_error(p, tag, ERROR_COMMAND); -    else if (commands[command].func(c, tag, ts) < 0) -        goto fail; +    if (pdispatch_run(c->pdispatch, packet, c) < 0) { +        fprintf(stderr, "protocol-native: invalid packet.\n"); +        return -1; +    } -    tagstruct_free(ts);     -              return 0; - -fail: -    if (ts) -        tagstruct_free(ts);     - -    fprintf(stderr, "protocol-native: invalid packet.\n"); -    return -1; -      }  static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) { @@ -326,6 +343,9 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us      pstream_set_recieve_memblock_callback(c->pstream, memblock_callback, c);      pstream_set_die_callback(c->pstream, die_callback, c); +    c->pdispatch = pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX); +    assert(c->pdispatch); +      c->record_streams = idxset_new(NULL, NULL);      c->playback_streams = idxset_new(NULL, NULL);      assert(c->record_streams && c->playback_streams); diff --git a/src/protocol-simple.c b/src/protocol-simple.c index 8e4246cd..c8c45854 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -9,6 +9,7 @@  #include "sourceoutput.h"  #include "protocol-simple.h"  #include "client.h" +#include "sample-util.h"  struct connection {      struct protocol_simple *protocol; @@ -115,9 +116,10 @@ static int do_write(struct connection *c) {  /*** sink_input callbacks ***/  static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) { -    struct connection*c = i->userdata; -    assert(i && c && chunk); - +    struct connection*c; +    assert(i && i->userdata && chunk); +    c = i->userdata; +          if (memblockq_peek(c->input_memblockq, chunk) < 0)          return -1; @@ -143,7 +145,7 @@ static void sink_input_kill_cb(struct sink_input *i) {  static uint32_t sink_input_get_latency_cb(struct sink_input *i) {      struct connection*c = i->userdata;      assert(i && c); -    return samples_usec(memblockq_get_length(c->input_memblockq), &DEFAULT_SAMPLE_SPEC); +    return pa_samples_usec(memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);  }  /*** source_output callbacks ***/ @@ -185,6 +187,7 @@ static void io_callback(struct iochannel*io, void *userdata) {  static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {      struct protocol_simple *p = userdata;      struct connection *c = NULL; +    char cname[256];      assert(s && io && p);      c = malloc(sizeof(struct connection)); @@ -195,7 +198,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us      c->input_memblockq = c->output_memblockq = NULL;      c->protocol = p; -    c->client = client_new(p->core, "SIMPLE", "Client"); +    iochannel_peer_to_string(io, cname, sizeof(cname)); +    c->client = client_new(p->core, "SIMPLE", cname);      assert(c->client);      c->client->kill = client_kill_cb;      c->client->userdata = c; @@ -215,8 +219,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us          c->source_output->kill = source_output_kill_cb;          c->source_output->userdata = c; -        l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */ -        c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2); +        l = 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */ +        c->output_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);      }      if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) { @@ -236,8 +240,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us          c->sink_input->get_latency = sink_input_get_latency_cb;          c->sink_input->userdata = c; -        l = bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */ -        c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2); +        l = pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */ +        c->input_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);      } diff --git a/src/pstream-util.c b/src/pstream-util.c new file mode 100644 index 00000000..2fab2b61 --- /dev/null +++ b/src/pstream-util.c @@ -0,0 +1,35 @@ +#include <assert.h> + +#include "protocol-native-spec.h" +#include "pstream-util.h" + +void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t) { +    size_t length; +    uint8_t *data; +    struct packet *packet; +    assert(p && t); + +    data = tagstruct_free_data(t, &length); +    assert(data && length); +    packet = packet_new_dynamic(data, length); +    assert(packet); +    pstream_send_packet(p, packet); +    packet_unref(packet); +} + +void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error) { +    struct tagstruct *t = tagstruct_new(NULL, 0); +    assert(t); +    tagstruct_putu32(t, PA_COMMAND_ERROR); +    tagstruct_putu32(t, tag); +    tagstruct_putu32(t, error); +    pstream_send_tagstruct(p, t); +} + +void pstream_send_simple_ack(struct pstream *p, uint32_t tag) { +    struct tagstruct *t = tagstruct_new(NULL, 0); +    assert(t); +    tagstruct_putu32(t, PA_COMMAND_REPLY); +    tagstruct_putu32(t, tag); +    pstream_send_tagstruct(p, t); +} diff --git a/src/pstream-util.h b/src/pstream-util.h new file mode 100644 index 00000000..4e64a95c --- /dev/null +++ b/src/pstream-util.h @@ -0,0 +1,14 @@ +#ifndef foopstreamutilhfoo +#define foopstreamutilhfoo + +#include <inttypes.h> +#include "pstream.h" +#include "tagstruct.h" + +/* The tagstruct is freed!*/ +void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t); + +void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error); +void pstream_send_simple_ack(struct pstream *p, uint32_t tag); + +#endif diff --git a/src/pstream.c b/src/pstream.c index a63e126d..4a3a648b 100644 --- a/src/pstream.c +++ b/src/pstream.c @@ -30,7 +30,7 @@ struct item_info {  };  struct pstream { -    struct mainloop *mainloop; +    struct pa_mainloop_api *mainloop;      struct mainloop_source *mainloop_source;      struct iochannel *io;      struct queue *send_queue; @@ -70,18 +70,24 @@ static void do_read(struct pstream *p);  static void io_callback(struct iochannel*io, void *userdata) {      struct pstream *p = userdata;      assert(p && p->io == io); + +    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0); +          do_write(p);      do_read(p);  } -static void prepare_callback(struct mainloop_source *s, void*userdata) { +static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) {      struct pstream *p = userdata; -    assert(p && p->mainloop_source == s); +    assert(p && p->mainloop_source == id && p->mainloop == m); + +    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0); +          do_write(p);      do_read(p);  } -struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) { +struct pstream *pstream_new(struct pa_mainloop_api *m, struct iochannel *io) {      struct pstream *p;      assert(io); @@ -96,8 +102,8 @@ struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {      p->die_callback_userdata = NULL;      p->mainloop = m; -    p->mainloop_source = mainloop_source_new_fixed(m, prepare_callback, p); -    mainloop_source_enable(p->mainloop_source, 0); +    p->mainloop_source = m->source_fixed(m, fixed_callback, p); +    m->enable_fixed(m, p->mainloop_source, 0);      p->send_queue = queue_new();      assert(p->send_queue); @@ -152,7 +158,7 @@ void pstream_free(struct pstream *p) {      if (p->read.packet)          packet_unref(p->read.packet); -    mainloop_source_free(p->mainloop_source); +    p->mainloop->cancel_fixed(p->mainloop, p->mainloop_source);      free(p);  } @@ -173,7 +179,7 @@ void pstream_send_packet(struct pstream*p, struct packet *packet) {      i->packet = packet_ref(packet);      queue_push(p->send_queue, i); -    mainloop_source_enable(p->mainloop_source, 1); +    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);  }  void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) { @@ -190,7 +196,7 @@ void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, st      memblock_ref(i->chunk.memblock);      queue_push(p->send_queue, i); -    mainloop_source_enable(p->mainloop_source, 1); +    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);  }  void pstream_set_recieve_packet_callback(struct pstream *p, int (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) { @@ -219,7 +225,7 @@ static void prepare_next_write_item(struct pstream *p) {          assert(p->write.current->packet);          p->write.data = p->write.current->packet->data;          p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length); -        p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0; +        p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);          p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;      } else {          assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock); @@ -236,8 +242,6 @@ static void do_write(struct pstream *p) {      ssize_t r;      assert(p); -    mainloop_source_enable(p->mainloop_source, 0); -      if (p->dead || !iochannel_is_writable(p->io))          return; @@ -285,8 +289,6 @@ static void do_read(struct pstream *p) {      ssize_t r;      assert(p); -    mainloop_source_enable(p->mainloop_source, 0); -          if (p->dead || !iochannel_is_readable(p->io))          return; @@ -313,7 +315,7 @@ static void do_read(struct pstream *p) {          assert(!p->read.packet && !p->read.memblock); -        if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == 0) { +        if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {              /* Frame is a packet frame */              p->read.packet = packet_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]));              assert(p->read.packet); @@ -331,7 +333,7 @@ static void do_read(struct pstream *p) {          if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */              size_t l; -            l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r; +            l = (p->read.index - r) < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : (size_t) r;              if (l > 0) {                  struct memchunk chunk; diff --git a/src/pstream.h b/src/pstream.h index 7113681e..d418908e 100644 --- a/src/pstream.h +++ b/src/pstream.h @@ -6,10 +6,11 @@  #include "packet.h"  #include "memblock.h"  #include "iochannel.h" +#include "mainloop-api.h"  struct pstream; -struct pstream* pstream_new(struct mainloop *m, struct iochannel *io); +struct pstream* pstream_new(struct pa_mainloop_api *m, struct iochannel *io);  void pstream_free(struct pstream*p);  void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata); diff --git a/src/queue.c b/src/queue.c index 90823ae6..5c2e7a67 100644 --- a/src/queue.c +++ b/src/queue.c @@ -73,5 +73,12 @@ void* queue_pop(struct queue *q) {      p = e->data;      free(e); +    q->length--; +          return p;  } + +int queue_is_empty(struct queue *q) { +    assert(q); +    return q->length == 0; +} diff --git a/src/sample-util.c b/src/sample-util.c new file mode 100644 index 00000000..7a3c267a --- /dev/null +++ b/src/sample-util.c @@ -0,0 +1,88 @@ +#include <string.h> +#include <assert.h> + +#include "sample-util.h" + +struct pa_sample_spec default_sample_spec = { +    .format = SAMPLE_S16NE, +    .rate = 44100, +    .channels = 2 +}; + +struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec) { +    assert(b && b->data && spec); +    memblock_assert_exclusive(b); +    silence_memory(b->data, b->length, spec); +    return b; +} + +void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec) { +    assert(c && c->memblock && c->memblock->data && spec && c->length); +    memblock_assert_exclusive(c->memblock); +    silence_memory(c->memblock->data+c->index, c->length, spec); +} + +void silence_memory(void *p, size_t length, struct pa_sample_spec *spec) { +    char c = 0; +    assert(p && length && spec); + +    switch (spec->format) { +        case SAMPLE_U8: +            c = 127; +            break; +        case SAMPLE_S16LE: +        case SAMPLE_S16BE: +        case SAMPLE_FLOAT32: +            c = 0; +            break; +        case SAMPLE_ALAW: +        case SAMPLE_ULAW: +            c = 80; +            break; +    } +                 +    memset(p, c, length); +} + +size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume) { +    unsigned c, d; +    assert(channels && data && length && spec); +    assert(spec->format == SAMPLE_S16NE); + +    for (d = 0;; d += sizeof(int16_t)) { +        int32_t sum = 0; + +        if (d >= length) +            return d; +         +        for (c = 0; c < nchannels; c++) { +            int32_t v; +            uint8_t volume = channels[c].volume; +             +            if (d >= channels[c].chunk.length) +                return d; + +            if (volume == 0) +                v = 0; +            else { +                v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d)); + +                if (volume != 0xFF) +                    v = v*volume/0xFF; +            } + +            sum += v; +        } + +        if (volume == 0) +            sum = 0; +        else if (volume != 0xFF) +            sum = sum*volume/0xFF; +         +        if (sum < -0x8000) sum = -0x8000; +        if (sum > 0x7FFF) sum = 0x7FFF; +         +        *((int16_t*) data) = sum; +        data += sizeof(int16_t); +    } +} diff --git a/src/sample-util.h b/src/sample-util.h new file mode 100644 index 00000000..0a3f7c89 --- /dev/null +++ b/src/sample-util.h @@ -0,0 +1,23 @@ +#ifndef foosampleutilhfoo +#define foosampleutilhfoo + +#include "sample.h" +#include "memblock.h" + +#define DEFAULT_SAMPLE_SPEC default_sample_spec + +extern struct pa_sample_spec default_sample_spec; + +struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec); +void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec); +void silence_memory(void *p, size_t length, struct pa_sample_spec *spec); + +struct mix_info { +    struct memchunk chunk; +    uint8_t volume; +    void *userdata; +}; + +size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume); + +#endif diff --git a/src/sample.c b/src/sample.c index c270f255..2454630c 100644 --- a/src/sample.c +++ b/src/sample.c @@ -1,50 +1,8 @@ -#include <string.h>  #include <assert.h>  #include "sample.h" -struct sample_spec default_sample_spec = { -    .format = SAMPLE_S16NE, -    .rate = 44100, -    .channels = 2 -}; - -struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec) { -    assert(b && b->data && spec); -    memblock_assert_exclusive(b); -    silence_memory(b->data, b->length, spec); -    return b; -} - -void silence_memchunk(struct memchunk *c, struct sample_spec *spec) { -    assert(c && c->memblock && c->memblock->data && spec && c->length); -    memblock_assert_exclusive(c->memblock); -    silence_memory(c->memblock->data+c->index, c->length, spec); -} - -void silence_memory(void *p, size_t length, struct sample_spec *spec) { -    char c = 0; -    assert(p && length && spec); - -    switch (spec->format) { -        case SAMPLE_U8: -            c = 127; -            break; -        case SAMPLE_S16LE: -        case SAMPLE_S16BE: -        case SAMPLE_FLOAT32: -            c = 0; -            break; -        case SAMPLE_ALAW: -        case SAMPLE_ULAW: -            c = 80; -            break; -    } -                 -    memset(p, c, length); -} - -size_t sample_size(struct sample_spec *spec) { +size_t pa_sample_size(struct pa_sample_spec *spec) {      assert(spec);      size_t b = 1; @@ -66,56 +24,14 @@ size_t sample_size(struct sample_spec *spec) {      return b * spec->channels;  } -size_t bytes_per_second(struct sample_spec *spec) { +size_t pa_bytes_per_second(struct pa_sample_spec *spec) {      assert(spec); -    return spec->rate*sample_size(spec); +    return spec->rate*pa_sample_size(spec);  } -size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume) { -    unsigned c, d; -    assert(channels && data && length && spec); -    assert(spec->format == SAMPLE_S16NE); - -    for (d = 0;; d += sizeof(int16_t)) { -        int32_t sum = 0; - -        if (d >= length) -            return d; -         -        for (c = 0; c < nchannels; c++) { -            int32_t v; -            uint8_t volume = channels[c].volume; -             -            if (d >= channels[c].chunk.length) -                return d; - -            if (volume == 0) -                v = 0; -            else { -                v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d)); - -                if (volume != 0xFF) -                    v = v*volume/0xFF; -            } - -            sum += v; -        } - -        if (volume == 0) -            sum = 0; -        else if (volume != 0xFF) -            sum = sum*volume/0xFF; -         -        if (sum < -0x8000) sum = -0x8000; -        if (sum > 0x7FFF) sum = 0x7FFF; -         -        *((int16_t*) data) = sum; -        data += sizeof(int16_t); -    } -} -uint32_t samples_usec(size_t length, struct sample_spec *spec) { +uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec) {      assert(spec); -    return (uint32_t) (((double) length /sample_size(spec))/spec->rate*1000000); +    return (uint32_t) (((double) length /pa_sample_size(spec))/spec->rate*1000000);  } diff --git a/src/sample.h b/src/sample.h index b2f13cc4..a4a973bf 100644 --- a/src/sample.h +++ b/src/sample.h @@ -2,10 +2,9 @@  #define foosamplehfoo  #include <inttypes.h> +#include <sys/types.h> -#include "memblock.h" - -enum sample_format { +enum pa_sample_format {      SAMPLE_U8,      SAMPLE_ALAW,      SAMPLE_ULAW, @@ -16,30 +15,14 @@ enum sample_format {  #define SAMPLE_S16NE SAMPLE_S16LE -struct sample_spec { -    enum sample_format format; +struct pa_sample_spec { +    enum pa_sample_format format;      uint32_t rate;      uint8_t channels;  }; -#define DEFAULT_SAMPLE_SPEC default_sample_spec - -extern struct sample_spec default_sample_spec; - -struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec); -void silence_memchunk(struct memchunk *c, struct sample_spec *spec); -void silence_memory(void *p, size_t length, struct sample_spec *spec); - -struct mix_info { -    struct memchunk chunk; -    uint8_t volume; -    void *userdata; -}; - -size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume); - -size_t bytes_per_second(struct sample_spec *spec); -size_t sample_size(struct sample_spec *spec); -uint32_t samples_usec(size_t length, struct sample_spec *spec); +size_t pa_bytes_per_second(struct pa_sample_spec *spec); +size_t pa_sample_size(struct pa_sample_spec *spec); +uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec);  #endif diff --git a/src/simple.c b/src/simple.c new file mode 100644 index 00000000..a90d22bd --- /dev/null +++ b/src/simple.c @@ -0,0 +1,120 @@ +#include "simple.h" +#include "polyp.h" +#include "mainloop.h" + +struct pa_simple { +    struct mainloop *mainloop; +    struct pa_context *context; +    struct pa_stream *stream; + +    size_t requested; +    int dead; +}; + +static void playback_callback(struct pa_stream *p, size_t length, void *userdata) { +    struct pa_stream *sp = userdata; +    assert(p && length && sp); + +    sp->requested = length; +} + +struct pa_simple* pa_simple_new( +    const char *server, +    const char *name, +    enum pa_stream_direction dir, +    const char *dev, +    const char *stream_name, +    const struct pa_sample_spec *ss, +    const struct pa_buffer_attr *attr) { +     +    struct pa_simple *p; +    assert(ss); + +    p = malloc(sizeof(struct pa_simple)); +    assert(p); +    p->context = NULL; +    p->stream = NULL; +    p->mainloop = pa_mainloop_new(); +    assert(p->mainloop); +    p->requested = 0; +    p->dead = 0; + +    if (!(p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), name))) +        goto fail; + +    if (pa_context_connect(c, server, NULL, NULL) < 0) +        goto fail; + +    while (!pa_context_is_ready(c)) { +        if (pa_context_is_dead(c)) +            goto fail; +         +        if (mainloop_iterate(p->mainloop) < 0) +            goto fail; +    } + +    if (!(p->stream = pa_stream_new(p->context, dir, sink, stream_name, ss, attr, NULL, NULL))) +        goto fail; + +    while (!pa_stream_is_ready(c)) { +        if (pa_stream_is_dead(c)) +            goto fail; + +        if (mainloop_iterate(p->mainloop) < 0) +            goto fail; +    } + +    pa_stream_set_write_callback(p->stream, playback_callback, p); + +    return p; +     +fail: +    pa_simple_free(p); +    return NULL; +} + +void pa_simple_free(struct pa_simple *s) { +    assert(s); + +    if (s->stream) +        pa_stream_free(s->stream); +     +    if (s->context) +        pa_context_free(s->context); + +    if (s->mainloop) +        mainloop_free(s->mainloop); + +    free(s); +} + +int pa_simple_write(struct pa_simple *s, const void*data, size_t length) { +    assert(s && data); + +    while (length > 0) { +        size_t l; +         +        while (!s->requested) { +            if (pa_context_is_dead(c)) +                return -1; +             +            if (mainloop_iterate(s->mainloop) < 0) +                return -1; +        } + +        l = length; +        if (l > s->requested) +            l = s->requested; + +        pa_stream_write(s->stream, data, l); +        data += l; +        length -= l; +        s->requested = -l; +    } + +    return 0; +} + +int pa_simple_read(struct pa_simple *s, const void*data, size_t length) { +    assert(0); +} diff --git a/src/simple.h b/src/simple.h new file mode 100644 index 00000000..80693056 --- /dev/null +++ b/src/simple.h @@ -0,0 +1,25 @@ +#ifndef foosimplehfoo +#define foosimplehfoo + +#include <sys/types.h> + +#include "sample.h" +#include "polypdef.h" + +struct pa_simple; + +struct pa_simple* pa_simple_new( +    const char *server, +    const char *name, +    enum pa_stream_direction dir, +    const char *dev, +    const char *stream_name, +    const struct pa_sample_spec *ss, +    const struct pa_buffer_attr *attr); + +void pa_simple_free(struct pa_simple *s); + +int pa_simple_write(struct pa_simple *s, const void*data, size_t length); +int pa_simple_read(struct pa_simple *s, const void*data, size_t length); + +#endif @@ -6,10 +6,11 @@  #include "sink.h"  #include "sinkinput.h"  #include "strbuf.h" +#include "sample-util.h"  #define MAX_MIX_CHANNELS 32 -struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) { +struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {      struct sink *s;      char *n = NULL;      int r; @@ -15,7 +15,7 @@ struct sink {      char *name;      struct core *core; -    struct sample_spec sample_spec; +    struct pa_sample_spec sample_spec;      struct idxset *inputs;      struct source *monitor_source; @@ -27,7 +27,7 @@ struct sink {      void *userdata;  }; -struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec); +struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec);  void sink_free(struct sink* s);  int sink_render(struct sink*s, size_t length, struct memchunk *result); diff --git a/src/sinkinput.c b/src/sinkinput.c index 2e6a8c36..b81c9c71 100644 --- a/src/sinkinput.c +++ b/src/sinkinput.c @@ -5,7 +5,7 @@  #include "sinkinput.h"  #include "strbuf.h" -struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name) { +struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name) {      struct sink_input *i;      int r;      assert(s && spec); @@ -14,7 +14,7 @@ struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, cons      assert(i);      i->name = name ? strdup(name) : NULL;      i->sink = s; -    i->spec = *spec; +    i->sample_spec = *spec;      i->peek = NULL;      i->drop = NULL; diff --git a/src/sinkinput.h b/src/sinkinput.h index 389d832d..f04ecb95 100644 --- a/src/sinkinput.h +++ b/src/sinkinput.h @@ -12,7 +12,7 @@ struct sink_input {      char *name;      struct sink *sink; -    struct sample_spec spec; +    struct pa_sample_spec sample_spec;      uint8_t volume;      int (*peek) (struct sink_input *i, struct memchunk *chunk); @@ -23,7 +23,7 @@ struct sink_input {      void *userdata;  }; -struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name); +struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name);  void sink_input_free(struct sink_input* i);  /* Code that didn't create the input stream should call this function to diff --git a/src/socket-client.c b/src/socket-client.c new file mode 100644 index 00000000..812c43f3 --- /dev/null +++ b/src/socket-client.c @@ -0,0 +1,177 @@ +#include <unistd.h> +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <assert.h> +#include <stdlib.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "socket-client.h" +#include "util.h" + +struct socket_client { +    struct pa_mainloop_api *mainloop; +    int fd; + +    void *io_source, *fixed_source; +    void (*callback)(struct socket_client*c, struct iochannel *io, void *userdata); +    void *userdata; +}; + +static struct socket_client*socket_client_new(struct pa_mainloop_api *m) { +    struct socket_client *c; +    assert(m); + +    c = malloc(sizeof(struct socket_client)); +    assert(c); +    c->mainloop = m; +    c->fd = -1; +    c->io_source = c->fixed_source = NULL; +    c->callback = NULL; +    c->userdata = NULL; +    return c; +} + +static void do_call(struct socket_client *c) { +    struct iochannel *io; +    int error, lerror; +    assert(c && c->callback); + +    lerror = sizeof(error); +    if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &error, &lerror) < 0) { +        fprintf(stderr, "getsockopt(): %s\n", strerror(errno)); +        goto failed; +    } + +    if (lerror != sizeof(error)) { +        fprintf(stderr, "getsocktop() returned invalid size.\n"); +        goto failed; +    } + +    if (error != 0) { +        fprintf(stderr, "connect(): %s\n", strerror(error)); +        goto failed; +    } +         +    io = iochannel_new(c->mainloop, c->fd, c->fd); +    assert(io); +    c->fd = -1; +    c->callback(c, io, c->userdata); + +    return; +     +failed: +    close(c->fd); +    c->fd = -1; +    c->callback(c, NULL, c->userdata); +    return; +} + +static void connect_fixed_cb(struct pa_mainloop_api *m, void *id, void *userdata) { +    struct socket_client *c = userdata; +    assert(m && c && c->fixed_source == id); +    m->cancel_fixed(m, c->fixed_source); +    c->fixed_source = NULL; +    do_call(c); +} + +static void connect_io_cb(struct pa_mainloop_api*m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { +    struct socket_client *c = userdata; +    assert(m && c && c->io_source == id && fd >= 0 && events == PA_MAINLOOP_API_IO_EVENT_OUTPUT); +    m->cancel_io(m, c->io_source); +    c->io_source = NULL; +    do_call(c); +} + +static int do_connect(struct socket_client *c, const struct sockaddr *sa, socklen_t len) { +    int r; +    assert(c && sa && len); +     +    make_nonblock_fd(c->fd); +     +    if ((r = connect(c->fd, sa, len)) < 0) { +        if (r != EINPROGRESS) { +            fprintf(stderr, "connect(): %s\n", strerror(errno)); +            return -1; +        } + +        c->io_source = c->mainloop->source_io(c->mainloop, c->fd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, connect_io_cb, c); +        assert(c->io_source); +    } else { +        c->fixed_source = c->mainloop->source_fixed(c->mainloop, connect_fixed_cb, c); +        assert(c->io_source); +    } + +    return 0; +} + +struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) { +    struct socket_client *c; +    struct sockaddr_in sa; + +    c = socket_client_new(m); +    assert(c); + +    if ((c->fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { +        fprintf(stderr, "socket(): %s\n", strerror(errno)); +        goto fail; +    } + +    sa.sin_family = AF_INET; +    sa.sin_port = htons(port); +    sa.sin_addr.s_addr = htonl(address); + +    if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0) +        goto fail; +     +    return c; + +fail: +    socket_client_free(c); +    return NULL; +} + +struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename) { +    struct socket_client *c; +    struct sockaddr_un sa; +     +    c = socket_client_new(m); +    assert(c); + +    if ((c->fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) { +        fprintf(stderr, "socket(): %s\n", strerror(errno)); +        goto fail; +    } + +    sa.sun_family = AF_LOCAL; +    strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1); +    sa.sun_path[sizeof(sa.sun_path) - 1] = 0; +     +    if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0) +        goto fail; +     +    return c; + +fail: +    socket_client_free(c); +    return NULL; +} +     +void socket_client_free(struct socket_client *c) { +    assert(c && c->mainloop); +    if (c->io_source) +        c->mainloop->cancel_io(c->mainloop, c->io_source); +    if (c->fixed_source) +        c->mainloop->cancel_fixed(c->mainloop, c->fixed_source); +    if (c->fd >= 0) +        close(c->fd); +    free(c); +} + +void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata) { +    assert(c); +    c->callback = on_connection; +    c->userdata = userdata; +} diff --git a/src/socket-client.h b/src/socket-client.h new file mode 100644 index 00000000..4de01e34 --- /dev/null +++ b/src/socket-client.h @@ -0,0 +1,17 @@ +#ifndef foosocketclienthfoo +#define foosocketclienthfoo + +#include <inttypes.h> +#include "mainloop-api.h" +#include "iochannel.h" + +struct socket_client; + +struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port); +struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename); + +void socket_client_free(struct socket_client *c); + +void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata); + +#endif diff --git a/src/socket-server.c b/src/socket-server.c index 6ad225e3..87fe1476 100644 --- a/src/socket-server.c +++ b/src/socket-server.c @@ -19,14 +19,15 @@ struct socket_server {      void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata);      void *userdata; -    struct mainloop_source *mainloop_source; +    void *mainloop_source; +    struct pa_mainloop_api *mainloop;  }; -static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) { +static void callback(struct pa_mainloop_api *mainloop, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {      struct socket_server *s = userdata;      struct iochannel *io;      int nfd; -    assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s); +    assert(s && s->mainloop == mainloop && s->mainloop_source == id && id && fd >= 0 && fd == s->fd && events == PA_MAINLOOP_API_IO_EVENT_INPUT);      if ((nfd = accept(fd, NULL, NULL)) < 0) {          fprintf(stderr, "accept(): %s\n", strerror(errno)); @@ -38,12 +39,12 @@ static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event          return;      } -    io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd); +    io = iochannel_new(s->mainloop, nfd, nfd);      assert(io);      s->on_connection(s, io, s->userdata);  } -struct socket_server* socket_server_new(struct mainloop *m, int fd) { +struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd) {      struct socket_server *s;      assert(m && fd >= 0); @@ -54,13 +55,14 @@ struct socket_server* socket_server_new(struct mainloop *m, int fd) {      s->on_connection = NULL;      s->userdata = NULL; -    s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s); +    s->mainloop = m; +    s->mainloop_source = m->source_io(m, fd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, s);      assert(s->mainloop_source);      return s;  } -struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) { +struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename) {      int fd = -1;      struct sockaddr_un sa;      struct socket_server *s; @@ -101,7 +103,7 @@ fail:      return NULL;  } -struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) { +struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {      int fd = -1;      struct sockaddr_in sa;      int on = 1; @@ -148,7 +150,8 @@ void socket_server_free(struct socket_server*s) {          free(s->filename);      } -    mainloop_source_free(s->mainloop_source); +     +    s->mainloop->cancel_io(s->mainloop, s->mainloop_source);      free(s);  } diff --git a/src/socket-server.h b/src/socket-server.h index 4814fc62..80895f8d 100644 --- a/src/socket-server.h +++ b/src/socket-server.h @@ -2,14 +2,14 @@  #define foosocketserverhfoo  #include <inttypes.h> -#include "mainloop.h" +#include "mainloop-api.h"  #include "iochannel.h"  struct socket_server; -struct socket_server* socket_server_new(struct mainloop *m, int fd); -struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename); -struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port); +struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd); +struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename); +struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);  void socket_server_free(struct socket_server*s); diff --git a/src/source.c b/src/source.c index 44d77532..21ac24f3 100644 --- a/src/source.c +++ b/src/source.c @@ -7,7 +7,7 @@  #include "sourceoutput.h"  #include "strbuf.h" -struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) { +struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {      struct source *s;      int r;      assert(core && spec); diff --git a/src/source.h b/src/source.h index 078fb1c9..04f3984f 100644 --- a/src/source.h +++ b/src/source.h @@ -14,14 +14,14 @@ struct source {      char *name;      struct core *core; -    struct sample_spec sample_spec; +    struct pa_sample_spec sample_spec;      struct idxset *outputs;      void (*notify)(struct source*source);      void *userdata;  }; -struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec); +struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec);  void source_free(struct source *s);  /* Pass a new memory block to all output streams */ diff --git a/src/sourceoutput.c b/src/sourceoutput.c index 8021b522..e2e1dacc 100644 --- a/src/sourceoutput.c +++ b/src/sourceoutput.c @@ -5,7 +5,7 @@  #include "sourceoutput.h"  #include "strbuf.h" -struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name) { +struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name) {      struct source_output *o;      int r;      assert(s && spec); @@ -14,7 +14,7 @@ struct source_output* source_output_new(struct source *s, struct sample_spec *sp      assert(o);      o->name = name ? strdup(name) : NULL;      o->source = s; -    o->spec = *spec; +    o->sample_spec = *spec;      o->push = NULL;      o->kill = NULL; diff --git a/src/sourceoutput.h b/src/sourceoutput.h index 359ff151..50cb9caf 100644 --- a/src/sourceoutput.h +++ b/src/sourceoutput.h @@ -12,7 +12,7 @@ struct source_output {      char *name;      struct source *source; -    struct sample_spec spec; +    struct pa_sample_spec sample_spec;      void (*push)(struct source_output *o, struct memchunk *chunk);      void (*kill)(struct source_output* o); @@ -20,7 +20,7 @@ struct source_output {      void *userdata;  }; -struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name); +struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name);  void source_output_free(struct source_output* o);  void source_output_kill(struct source_output*o); diff --git a/src/tagstruct.c b/src/tagstruct.c index 429dd408..47e17839 100644 --- a/src/tagstruct.c +++ b/src/tagstruct.c @@ -90,7 +90,7 @@ void tagstruct_putu8(struct tagstruct*t, uint8_t c) {      t->length += 2;  } -void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss) { +void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss) {      assert(t && ss);      extend(t, 7);      t->data[t->length] = TAG_SAMPLE_SPEC; @@ -156,7 +156,7 @@ int tagstruct_getu8(struct tagstruct*t, uint8_t *c) {      return 0;  } -int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss) { +int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss) {      assert(t && ss);      if (t->rindex+7 > t->length) diff --git a/src/tagstruct.h b/src/tagstruct.h index 5572c64c..9f6a0bf4 100644 --- a/src/tagstruct.h +++ b/src/tagstruct.h @@ -15,12 +15,12 @@ uint8_t* tagstruct_free_data(struct tagstruct*t, size_t *l);  void tagstruct_puts(struct tagstruct*t, const char *s);  void tagstruct_putu32(struct tagstruct*t, uint32_t i);  void tagstruct_putu8(struct tagstruct*t, uint8_t c); -void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss); +void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss);  int tagstruct_gets(struct tagstruct*t, const char **s);  int tagstruct_getu32(struct tagstruct*t, uint32_t *i);  int tagstruct_getu8(struct tagstruct*t, uint8_t *c); -int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss); +int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss);  int tagstruct_eof(struct tagstruct*t);  const uint8_t* tagstruct_data(struct tagstruct*t, size_t *l); @@ -1,8 +1,10 @@ +- sync() function in native library +- name registrar  - native protocol/library  - simple control protocol: kill client/input/output; set_volume  - resampling  - esound protocol -- config parser +- config parser/cmdline  - record testing  -- 0.1  - optimierung von rebuild_pollfds() diff --git a/src/util.c b/src/util.c new file mode 100644 index 00000000..0383a0ad --- /dev/null +++ b/src/util.c @@ -0,0 +1,62 @@ +#include <assert.h> +#include <string.h> +#include <stdio.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <fcntl.h> + +#include "util.h" + +void make_nonblock_fd(int fd) { +    int v; + +    if ((v = fcntl(fd, F_GETFL)) >= 0) +        if (!(v & O_NONBLOCK)) +            fcntl(fd, F_SETFL, v|O_NONBLOCK); +} + +void peer_to_string(char *c, size_t l, int fd) { +    struct stat st; + +    assert(c && l && fd >= 0); +     +    if (fstat(fd, &st) < 0) { +        snprintf(c, l, "Invalid client fd"); +        return; +    } + +    if (S_ISSOCK(st.st_mode)) { +        union { +            struct sockaddr sa; +            struct sockaddr_in in; +            struct sockaddr_un un; +        } sa; +        socklen_t sa_len = sizeof(sa); +         +        if (getpeername(fd, &sa.sa, &sa_len) >= 0) { + +            if (sa.sa.sa_family == AF_INET) { +                uint32_t ip = ntohl(sa.in.sin_addr.s_addr); +                 +                snprintf(c, l, "TCP/IP client from %i.%i.%i.%i:%u", +                         ip >> 24, +                         (ip >> 16) & 0xFF, +                         (ip >> 8) & 0xFF, +                         ip & 0xFF, +                         ntohs(sa.in.sin_port)); +                return; +            } else if (sa.sa.sa_family == AF_LOCAL) { +                snprintf(c, l, "UNIX client for %s", sa.un.sun_path); +                return; +            } + +        } +        snprintf(c, l, "Unknown network client"); +        return; +    } else if (S_ISCHR(st.st_mode) && (fd == 0 || fd == 1)) { +        snprintf(c, l, "STDIN/STDOUT client"); +        return; +    } + +    snprintf(c, l, "Unknown client"); +} diff --git a/src/util.h b/src/util.h new file mode 100644 index 00000000..830ee2e0 --- /dev/null +++ b/src/util.h @@ -0,0 +1,8 @@ +#ifndef fooutilhfoo +#define fooutilhfoo + +void make_nonblock_fd(int fd); + +void peer_to_string(char *c, size_t l, int fd); + +#endif  | 
