summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbootstrap.sh2
-rw-r--r--configure.ac2
-rw-r--r--src/Makefile.am57
-rw-r--r--src/cli.c12
-rw-r--r--src/core.c2
-rw-r--r--src/core.h6
-rw-r--r--src/iochannel.c52
-rw-r--r--src/iochannel.h6
-rw-r--r--src/main.c36
-rw-r--r--src/mainloop-api.c35
-rw-r--r--src/mainloop-api.h43
-rw-r--r--src/mainloop-signal.c138
-rw-r--r--src/mainloop-signal.h12
-rw-r--r--src/mainloop.c732
-rw-r--r--src/mainloop.h40
-rw-r--r--src/memblockq.c9
-rw-r--r--src/memblockq.h2
-rw-r--r--src/module-oss-mmap.c23
-rw-r--r--src/module-oss.c10
-rw-r--r--src/module-pipe-sink.c20
-rw-r--r--src/module.c2
-rw-r--r--src/oss.c2
-rw-r--r--src/oss.h2
-rw-r--r--src/pacat.c169
-rw-r--r--src/packet.c3
-rw-r--r--src/pdispatch.c149
-rw-r--r--src/pdispatch.h22
-rw-r--r--src/polyp.c451
-rw-r--r--src/polyp.h53
-rw-r--r--src/polypdef.h18
-rw-r--r--src/protocol-cli.c5
-rw-r--r--src/protocol-native-spec.h29
-rw-r--r--src/protocol-native.c228
-rw-r--r--src/protocol-simple.c22
-rw-r--r--src/pstream-util.c35
-rw-r--r--src/pstream-util.h14
-rw-r--r--src/pstream.c34
-rw-r--r--src/pstream.h3
-rw-r--r--src/queue.c7
-rw-r--r--src/sample-util.c88
-rw-r--r--src/sample-util.h23
-rw-r--r--src/sample.c94
-rw-r--r--src/sample.h31
-rw-r--r--src/simple.c120
-rw-r--r--src/simple.h25
-rw-r--r--src/sink.c3
-rw-r--r--src/sink.h4
-rw-r--r--src/sinkinput.c4
-rw-r--r--src/sinkinput.h4
-rw-r--r--src/socket-client.c177
-rw-r--r--src/socket-client.h17
-rw-r--r--src/socket-server.c21
-rw-r--r--src/socket-server.h8
-rw-r--r--src/source.c2
-rw-r--r--src/source.h4
-rw-r--r--src/sourceoutput.c4
-rw-r--r--src/sourceoutput.h4
-rw-r--r--src/tagstruct.c4
-rw-r--r--src/tagstruct.h4
-rw-r--r--src/todo4
-rw-r--r--src/util.c62
-rw-r--r--src/util.h8
62 files changed, 2460 insertions, 742 deletions
diff --git a/bootstrap.sh b/bootstrap.sh
index f26ceb15..c9880d85 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -33,7 +33,7 @@ else
automake -a -c
autoconf -Wall
- ./configure --sysconfdir=/etc "$@"
+ CFLAGS="-g -O0" ./configure --sysconfdir=/etc "$@"
make clean
fi
diff --git a/configure.ac b/configure.ac
index 16376902..3a14a061 100644
--- a/configure.ac
+++ b/configure.ac
@@ -42,7 +42,7 @@ AC_PROG_LIBTOOL
# If using GCC specifiy some additional parameters
if test "x$GCC" = "xyes" ; then
- CFLAGS="$CFLAGS -pipe -Wall -W"
+ CFLAGS="$CFLAGS -pipe -Wall -W -Wno-unused-parameter"
fi
AC_CONFIG_FILES([Makefile src/Makefile])
diff --git a/src/Makefile.am b/src/Makefile.am
index 443a25f2..6ad1488f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -18,31 +18,39 @@
AM_CFLAGS=-ansi -D_GNU_SOURCE
-bin_PROGRAMS = polypaudio
+bin_PROGRAMS = polypaudio pacat
-pkglib_LTLIBRARIES=libprotocol-simple.la module-simple-protocol-tcp.la \
- libsocket-server.la module-pipe-sink.la libpstream.la libiochannel.la \
+pkglib_LTLIBRARIES=libiochannel.la libsocket-server.la libsocket-client.la \
+ libprotocol-simple.la module-simple-protocol-tcp.la \
+ module-pipe-sink.la libpstream.la \
libpacket.la module-oss.la module-oss-mmap.la liboss.la libioline.la \
libcli.la module-cli.la libtokenizer.la libdynarray.la \
module-simple-protocol-unix.la module-cli-protocol-tcp.la \
- libprotocol-cli.la libprotocol-native.la module-native-protocol-tcp.la \
- module-native-protocol-unix.la module-cli-protocol-unix.la libtagstruct.la
+ libprotocol-cli.la module-cli-protocol-unix.la libtagstruct.la \
+ libpdispatch.la libprotocol-native.la libpstream-util.la \
+ module-native-protocol-tcp.la module-native-protocol-unix.la \
+ libpolyp.la
polypaudio_SOURCES = idxset.c idxset.h \
queue.c queue.h \
strbuf.c strbuf.h \
+ main.c main.h \
mainloop.c mainloop.h \
memblock.c memblock.h \
sample.c sample.h \
+ sample-util.c sample-util.h \
memblockq.c memblockq.h \
client.c client.h \
core.c core.h \
- main.c main.h \
sourceoutput.c sourceoutput.h \
sinkinput.c sinkinput.h \
source.c source.h \
sink.c sink.h \
- module.c module.h
+ module.c module.h \
+ mainloop-signal.c mainloop-signal.h \
+ mainloop-api.c mainloop-api.h \
+ util.c util.h
+polypaudio_CFLAGS = $(AM_CFLAGS)
polypaudio_INCLUDES = $(INCLTDL)
polypaudio_LDADD = $(LIBLTDL)
@@ -56,10 +64,22 @@ libsocket_server_la_SOURCES = socket-server.c socket-server.h
libsocket_server_la_LDFLAGS = -avoid-version
libsocket_server_la_LIBADD = libiochannel.la
+libsocket_client_la_SOURCES = socket-client.c socket-client.h
+libsocket_client_la_LDFLAGS = -avoid-version
+libsocket_client_la_LIBADD = libiochannel.la
+
libpstream_la_SOURCES = pstream.c pstream.h
libpstream_la_LDFLAGS = -avoid-version
libpstream_la_LIBADD = libpacket.la
+libpstream_util_la_SOURCES = pstream-util.c pstream-util.h
+libpstream_util_la_LDFLAGS = -avoid-version
+libpstream_util_la_LIBADD = libpstream.la libtagstruct.la
+
+libpdispatch_la_SOURCES = pdispatch.c pdispatch.h
+libpdispatch_la_LDFLAGS = -avoid-version
+libpdispatch_la_LIBADD = libpacket.la libtagstruct.la
+
libiochannel_la_SOURCES = iochannel.c iochannel.h
libiochannel_la_LDFLAGS = -avoid-version
@@ -90,7 +110,7 @@ libprotocol_cli_la_LIBADD = libsocket-server.la libiochannel.la libcli.la
libprotocol_native_la_SOURCES = protocol-native.c protocol-native.h
libprotocol_native_la_LDFLAGS = -avoid-version
-libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la
+libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la libpstream-util.la
libtagstruct_la_SOURCES = tagstruct.c tagstruct.h
libtagstruct_la_LDFLAGS = -avoid-version
@@ -140,3 +160,24 @@ module_oss_mmap_la_LIBADD = libiochannel.la liboss.la
module_cli_la_SOURCES = module-cli.c
module_cli_la_LDFLAGS = -module -avoid-version
module_cli_la_LIBADD = libcli.la libiochannel.la libtokenizer.la
+
+libpolyp_la_SOURCES = polyp.c polyp.h \
+ polypdef.h \
+ tagstruct.c tagstruct.h \
+ iochannel.c iochannel.h \
+ pstream.c pstream.h \
+ pstream-util.c pstream-util.h \
+ pdispatch.c pdispatch.h \
+ protocol-native-spec.h \
+ mainloop-api.c mainloop-api.h \
+ mainloop.c mainloop.h \
+ idxset.c idxset.h \
+ util.c util.h \
+ memblock.c memblock.h \
+ socket-client.c socket-client.h \
+ packet.c packet.h \
+ queue.c queue.h \
+ dynarray.c dynarray.h
+
+pacat_SOURCES = pacat.c
+pacat_LDADD = libpolyp.la
diff --git a/src/cli.c b/src/cli.c
index ec484ace..09162351 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -20,6 +20,8 @@ struct cli {
void (*eof_callback)(struct cli *c, void *userdata);
void *userdata;
+
+ struct client *client;
};
struct command {
@@ -63,6 +65,7 @@ static const struct command commands[] = {
static const char prompt[] = ">>> ";
struct cli* cli_new(struct core *core, struct iochannel *io) {
+ char cname[256];
struct cli *c;
assert(io);
@@ -75,16 +78,21 @@ struct cli* cli_new(struct core *core, struct iochannel *io) {
c->userdata = NULL;
c->eof_callback = NULL;
+ iochannel_peer_to_string(io, cname, sizeof(cname));
+ c->client = client_new(core, "CLI", cname);
+ assert(c->client);
+
ioline_set_callback(c->line, line_callback, c);
ioline_puts(c->line, "Welcome to polypaudio! Use \"help\" for usage information.\n");
ioline_puts(c->line, prompt);
-
+
return c;
}
void cli_free(struct cli *c) {
assert(c);
ioline_free(c->line);
+ client_free(c->client);
free(c);
}
@@ -135,7 +143,7 @@ void cli_set_eof_callback(struct cli *c, void (*cb)(struct cli*c, void *userdata
static void cli_command_exit(struct cli *c, struct tokenizer *t) {
assert(c && c->core && c->core->mainloop && t);
- mainloop_quit(c->core->mainloop, -1);
+ c->core->mainloop->quit(c->core->mainloop, 0);
}
static void cli_command_help(struct cli *c, struct tokenizer *t) {
diff --git a/src/core.c b/src/core.c
index 50248501..d9df38e1 100644
--- a/src/core.c
+++ b/src/core.c
@@ -7,7 +7,7 @@
#include "sink.h"
#include "source.h"
-struct core* core_new(struct mainloop *m) {
+struct core* core_new(struct pa_mainloop_api *m) {
struct core* c;
c = malloc(sizeof(struct core));
assert(c);
diff --git a/src/core.h b/src/core.h
index f6f794b9..8c4c6233 100644
--- a/src/core.h
+++ b/src/core.h
@@ -2,17 +2,17 @@
#define foocorehfoo
#include "idxset.h"
-#include "mainloop.h"
+#include "mainloop-api.h"
struct core {
- struct mainloop *mainloop;
+ struct pa_mainloop_api *mainloop;
struct idxset *clients, *sinks, *sources, *sink_inputs, *source_outputs, *modules;
uint32_t default_source_index, default_sink_index;
};
-struct core* core_new(struct mainloop *m);
+struct core* core_new(struct pa_mainloop_api *m);
void core_free(struct core*c);
#endif
diff --git a/src/iochannel.c b/src/iochannel.c
index f0c4c499..910b7e0b 100644
--- a/src/iochannel.c
+++ b/src/iochannel.c
@@ -4,10 +4,11 @@
#include <unistd.h>
#include "iochannel.h"
+#include "util.h"
struct iochannel {
int ifd, ofd;
- struct mainloop* mainloop;
+ struct pa_mainloop_api* mainloop;
void (*callback)(struct iochannel*io, void *userdata);
void*userdata;
@@ -17,43 +18,45 @@ struct iochannel {
int no_close;
- struct mainloop_source* input_source, *output_source;
+ void* input_source, *output_source;
};
static void enable_mainloop_sources(struct iochannel *io) {
assert(io);
if (io->input_source == io->output_source) {
- enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL;
+ enum pa_mainloop_api_io_events e = PA_MAINLOOP_API_IO_EVENT_NULL;
assert(io->input_source);
if (!io->readable)
- e |= MAINLOOP_IO_EVENT_IN;
+ e |= PA_MAINLOOP_API_IO_EVENT_INPUT;
if (!io->writable)
- e |= MAINLOOP_IO_EVENT_OUT;
+ e |= PA_MAINLOOP_API_IO_EVENT_OUTPUT;
- mainloop_source_io_set_events(io->input_source, e);
+ io->mainloop->enable_io(io->mainloop, io->input_source, e);
} else {
if (io->input_source)
- mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN);
+ io->mainloop->enable_io(io->mainloop, io->input_source, io->readable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_INPUT);
if (io->output_source)
- mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT);
+ io->mainloop->enable_io(io->mainloop, io->output_source, io->writable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_OUTPUT);
}
}
-static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) {
+static void callback(struct pa_mainloop_api* m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct iochannel *io = userdata;
int changed = 0;
- assert(s && fd >= 0 && userdata);
+ assert(m && fd >= 0 && events && userdata);
- if ((events & MAINLOOP_IO_EVENT_IN) && !io->readable) {
+ if ((events & PA_MAINLOOP_API_IO_EVENT_INPUT) && !io->readable) {
io->readable = 1;
changed = 1;
+ assert(id == io->input_source);
}
- if ((events & MAINLOOP_IO_EVENT_OUT) && !io->writable) {
+ if ((events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) && !io->writable) {
io->writable = 1;
changed = 1;
+ assert(id == io->output_source);
}
if (changed) {
@@ -64,15 +67,7 @@ static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event ev
}
}
-static void make_nonblock_fd(int fd) {
- int v;
-
- if ((v = fcntl(fd, F_GETFL)) >= 0)
- if (!(v & O_NONBLOCK))
- fcntl(fd, F_SETFL, v|O_NONBLOCK);
-}
-
-struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
+struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd) {
struct iochannel *io;
assert(m && (ifd >= 0 || ofd >= 0));
@@ -90,18 +85,18 @@ struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
if (ifd == ofd) {
assert(ifd >= 0);
make_nonblock_fd(io->ifd);
- io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io);
+ io->input_source = io->output_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_BOTH, callback, io);
} else {
if (ifd >= 0) {
make_nonblock_fd(io->ifd);
- io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io);
+ io->input_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, io);
} else
io->input_source = NULL;
if (ofd >= 0) {
make_nonblock_fd(io->ofd);
- io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io);
+ io->output_source = m->source_io(m, ofd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, callback, io);
} else
io->output_source = NULL;
}
@@ -120,9 +115,9 @@ void iochannel_free(struct iochannel*io) {
}
if (io->input_source)
- mainloop_source_free(io->input_source);
+ io->mainloop->cancel_io(io->mainloop, io->input_source);
if (io->output_source && io->output_source != io->input_source)
- mainloop_source_free(io->output_source);
+ io->mainloop->cancel_io(io->mainloop, io->output_source);
free(io);
}
@@ -172,3 +167,8 @@ void iochannel_set_noclose(struct iochannel*io, int b) {
assert(io);
io->no_close = b;
}
+
+void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l) {
+ assert(io && s && l);
+ peer_to_string(s, l, io->ifd);
+}
diff --git a/src/iochannel.h b/src/iochannel.h
index 8ed8b878..b0465a19 100644
--- a/src/iochannel.h
+++ b/src/iochannel.h
@@ -2,11 +2,11 @@
#define fooiochannelhfoo
#include <sys/types.h>
-#include "mainloop.h"
+#include "mainloop-api.h"
struct iochannel;
-struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd);
+struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd);
void iochannel_free(struct iochannel*io);
ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l);
@@ -19,4 +19,6 @@ void iochannel_set_noclose(struct iochannel*io, int b);
void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata);
+void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l);
+
#endif
diff --git a/src/main.c b/src/main.c
index f35505ec..ef25b5e3 100644
--- a/src/main.c
+++ b/src/main.c
@@ -8,46 +8,52 @@
#include "core.h"
#include "mainloop.h"
#include "module.h"
+#include "mainloop-signal.h"
int stdin_inuse = 0, stdout_inuse = 0;
-static void signal_callback(struct mainloop_source *m, int sig, void *userdata) {
- mainloop_quit(mainloop_source_get_mainloop(m), -1);
+static struct pa_mainloop *mainloop;
+
+static void signal_callback(void *id, int sig, void *userdata) {
+ struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop);
+ m->quit(m, 1);
fprintf(stderr, "main: got signal.\n");
}
int main(int argc, char *argv[]) {
- struct mainloop *m;
struct core *c;
- int r;
+ int r, retval = 0;
r = lt_dlinit();
assert(r == 0);
- m = mainloop_new();
- assert(m);
- c = core_new(m);
- assert(c);
+ mainloop = pa_mainloop_new();
+ assert(mainloop);
- mainloop_source_new_signal(m, SIGINT, signal_callback, NULL);
+ r = pa_signal_init(pa_mainloop_get_api(mainloop));
+ assert(r == 0);
+ pa_signal_register(SIGINT, signal_callback, NULL);
signal(SIGPIPE, SIG_IGN);
+ c = core_new(pa_mainloop_get_api(mainloop));
+ assert(c);
+
module_load(c, "module-oss-mmap", "/dev/dsp1");
module_load(c, "module-pipe-sink", NULL);
module_load(c, "module-simple-protocol-tcp", NULL);
module_load(c, "module-cli", NULL);
fprintf(stderr, "main: mainloop entry.\n");
- while (mainloop_iterate(m, 1) == 0);
-/* fprintf(stderr, "main: %u blocks\n", n_blocks);*/
+ if (pa_mainloop_run(mainloop, &retval) < 0)
+ retval = 1;
fprintf(stderr, "main: mainloop exit.\n");
-
- mainloop_run(m);
core_free(c);
- mainloop_free(m);
+
+ pa_signal_done();
+ pa_mainloop_free(mainloop);
lt_dlexit();
- return 0;
+ return retval;
}
diff --git a/src/mainloop-api.c b/src/mainloop-api.c
new file mode 100644
index 00000000..6caa0c25
--- /dev/null
+++ b/src/mainloop-api.c
@@ -0,0 +1,35 @@
+#include <assert.h>
+#include <stdlib.h>
+#include "mainloop-api.h"
+
+struct once_info {
+ void (*callback)(void *userdata);
+ void *userdata;
+};
+
+static void once_callback(struct pa_mainloop_api *api, void *id, void *userdata) {
+ struct once_info *i = userdata;
+ assert(api && i && i->callback);
+ i->callback(i->userdata);
+ assert(api->cancel_fixed);
+ api->cancel_fixed(api, id);
+ free(i);
+}
+
+void pa_mainloop_api_once(struct pa_mainloop_api* api, void (*callback)(void *userdata), void *userdata) {
+ struct once_info *i;
+ void *id;
+ assert(api && callback);
+
+ i = malloc(sizeof(struct once_info));
+ assert(i);
+ i->callback = callback;
+ i->userdata = userdata;
+
+ assert(api->source_fixed);
+ id = api->source_fixed(api, once_callback, i);
+ assert(id);
+
+ /* Note: if the mainloop is destroyed before once_callback() was called, some memory is leaked. */
+}
+
diff --git a/src/mainloop-api.h b/src/mainloop-api.h
new file mode 100644
index 00000000..96dacc22
--- /dev/null
+++ b/src/mainloop-api.h
@@ -0,0 +1,43 @@
+#ifndef foomainloopapihfoo
+#define foomainloopapihfoo
+
+#include <time.h>
+#include <sys/time.h>
+
+enum pa_mainloop_api_io_events {
+ PA_MAINLOOP_API_IO_EVENT_NULL = 0,
+ PA_MAINLOOP_API_IO_EVENT_INPUT = 1,
+ PA_MAINLOOP_API_IO_EVENT_OUTPUT = 2,
+ PA_MAINLOOP_API_IO_EVENT_BOTH = 3
+};
+
+struct pa_mainloop_api {
+ void *userdata;
+
+ /* IO sources */
+ void* (*source_io)(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata);
+ void (*enable_io)(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events);
+ void (*cancel_io)(struct pa_mainloop_api*a, void* id);
+
+ /* Fixed sources */
+ void* (*source_fixed)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata);
+ void (*enable_fixed)(struct pa_mainloop_api*a, void* id, int b);
+ void (*cancel_fixed)(struct pa_mainloop_api*a, void* id);
+
+ /* Idle sources */
+ void* (*source_idle)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata);
+ void (*enable_idle)(struct pa_mainloop_api*a, void* id, int b);
+ void (*cancel_idle)(struct pa_mainloop_api*a, void* id);
+
+ /* Time sources */
+ void* (*source_time)(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata);
+ void (*enable_time)(struct pa_mainloop_api*a, void *id, const struct timeval *tv);
+ void (*cancel_time)(struct pa_mainloop_api*a, void* id);
+
+ /* Exit mainloop */
+ void (*quit)(struct pa_mainloop_api*a, int retval);
+};
+
+void pa_mainloop_api_once(struct pa_mainloop_api*m, void (*callback)(void *userdata), void *userdata);
+
+#endif
diff --git a/src/mainloop-signal.c b/src/mainloop-signal.c
new file mode 100644
index 00000000..dcc72f69
--- /dev/null
+++ b/src/mainloop-signal.c
@@ -0,0 +1,138 @@
+#include <stdio.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "mainloop-signal.h"
+#include "util.h"
+
+struct signal_info {
+ int sig;
+ struct sigaction saved_sigaction;
+ void (*callback) (void *id, int signal, void *userdata);
+ void *userdata;
+ struct signal_info *previous, *next;
+};
+
+static struct pa_mainloop_api *api = NULL;
+static int signal_pipe[2] = { -1, -1 };
+static void* mainloop_source = NULL;
+static struct signal_info *signals = NULL;
+
+static void signal_handler(int sig) {
+ write(signal_pipe[1], &sig, sizeof(sig));
+}
+
+static void callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
+ assert(a && id && events == PA_MAINLOOP_API_IO_EVENT_INPUT && id == mainloop_source && fd == signal_pipe[0]);
+
+ for (;;) {
+ ssize_t r;
+ int sig;
+ struct signal_info*s;
+
+ if ((r = read(signal_pipe[0], &sig, sizeof(sig))) < 0) {
+ if (errno == EAGAIN)
+ return;
+
+ fprintf(stderr, "signal.c: read(): %s\n", strerror(errno));
+ return;
+ }
+
+ if (r != sizeof(sig)) {
+ fprintf(stderr, "signal.c: short read()\n");
+ return;
+ }
+
+ for (s = signals; s; s = s->next)
+ if (s->sig == sig) {
+ assert(s->callback);
+ s->callback(s, sig, s->userdata);
+ break;
+ }
+ }
+}
+
+int pa_signal_init(struct pa_mainloop_api *a) {
+ assert(a);
+ if (pipe(signal_pipe) < 0) {
+ fprintf(stderr, "pipe() failed: %s\n", strerror(errno));
+ return -1;
+ }
+
+ make_nonblock_fd(signal_pipe[0]);
+ make_nonblock_fd(signal_pipe[1]);
+
+ api = a;
+ mainloop_source = api->source_io(api, signal_pipe[0], PA_MAINLOOP_API_IO_EVENT_INPUT, callback, NULL);
+ assert(mainloop_source);
+ return 0;
+}
+
+void pa_signal_done(void) {
+ assert(api && signal_pipe[0] >= 0 && signal_pipe[1] >= 0 && mainloop_source);
+
+ api->cancel_io(api, mainloop_source);
+ mainloop_source = NULL;
+
+ close(signal_pipe[0]);
+ close(signal_pipe[1]);
+ signal_pipe[0] = signal_pipe[1] = -1;
+
+ while (signals)
+ pa_signal_unregister(signals);
+
+ api = NULL;
+}
+
+void* pa_signal_register(int sig, void (*callback) (void *id, int signal, void *userdata), void *userdata) {
+ struct signal_info *s = NULL;
+ struct sigaction sa;
+ assert(sig > 0 && callback);
+
+ for (s = signals; s; s = s->next)
+ if (s->sig == sig)
+ goto fail;
+
+ s = malloc(sizeof(struct signal_info));
+ assert(s);
+ s->sig = sig;
+ s->callback = callback;
+ s->userdata = userdata;
+
+ memset(&sa, 0, sizeof(sa));
+ sa.sa_handler = signal_handler;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = SA_RESTART;
+
+ if (sigaction(sig, &sa, &s->saved_sigaction) < 0)
+ goto fail;
+
+ s->previous = NULL;
+ s->next = signals;
+ signals = s;
+
+ return s;
+fail:
+ if (s)
+ free(s);
+ return NULL;
+}
+
+void pa_signal_unregister(void *id) {
+ struct signal_info *s = id;
+ assert(s);
+
+ if (s->next)
+ s->next->previous = s->previous;
+ if (s->previous)
+ s->previous->next = s->next;
+ else
+ signals = s->next;
+
+ sigaction(s->sig, &s->saved_sigaction, NULL);
+ free(s);
+}
diff --git a/src/mainloop-signal.h b/src/mainloop-signal.h
new file mode 100644
index 00000000..e3e2364d
--- /dev/null
+++ b/src/mainloop-signal.h
@@ -0,0 +1,12 @@
+#ifndef foomainloopsignalhfoo
+#define foomainloopsignalhfoo
+
+#include "mainloop-api.h"
+
+int pa_signal_init(struct pa_mainloop_api *api);
+void pa_signal_done(void);
+
+void* pa_signal_register(int signal, void (*callback) (void *id, int signal, void *userdata), void *userdata);
+void pa_signal_unregister(void *id);
+
+#endif
diff --git a/src/mainloop.c b/src/mainloop.c
index fba0461c..a1758c65 100644
--- a/src/mainloop.c
+++ b/src/mainloop.c
@@ -1,3 +1,4 @@
+#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <sys/poll.h>
@@ -8,468 +9,515 @@
#include <errno.h>
#include "mainloop.h"
+#include "util.h"
+#include "idxset.h"
-struct mainloop_source {
- struct mainloop_source *next;
- struct mainloop *mainloop;
- enum mainloop_source_type type;
-
- int enabled;
+struct mainloop_source_header {
+ struct pa_mainloop *mainloop;
int dead;
- void *userdata;
-
- struct {
- int fd;
- enum mainloop_io_event events;
- void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata);
- struct pollfd pollfd;
- } io;
+};
- struct {
- void (*callback)(struct mainloop_source*s, void *userdata);
- } fixed;
+struct mainloop_source_io {
+ struct mainloop_source_header header;
- struct {
- void (*callback)(struct mainloop_source*s, void *userdata);
- } idle;
-
- struct {
- int sig;
- struct sigaction sigaction;
- void (*callback)(struct mainloop_source*s, int sig, void *userdata);
- } signal;
+ int fd;
+ enum pa_mainloop_api_io_events events;
+ void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata);
+ void *userdata;
+
+ struct pollfd *pollfd;
};
-struct mainloop_source_list {
- struct mainloop_source *sources;
- int n_sources;
- int dead_sources;
+struct mainloop_source_fixed_or_idle {
+ struct mainloop_source_header header;
+ int enabled;
+
+ void (*callback)(struct pa_mainloop_api*a, void *id, void *userdata);
+ void *userdata;
};
-struct mainloop {
- struct mainloop_source_list io_sources, fixed_sources, idle_sources, signal_sources;
+struct mainloop_source_time {
+ struct mainloop_source_header header;
+ int enabled;
+ struct timeval timeval;
+ void (*callback)(struct pa_mainloop_api*a, void *id, const struct timeval*tv, void *userdata);
+ void *userdata;
+};
+
+struct pa_mainloop {
+ struct idxset *io_sources, *fixed_sources, *idle_sources, *time_sources;
+ int io_sources_scan_dead, fixed_sources_scan_dead, idle_sources_scan_dead, time_sources_scan_dead;
+
struct pollfd *pollfds;
- int max_pollfds, n_pollfds;
+ unsigned max_pollfds, n_pollfds;
int rebuild_pollfds;
- int quit;
- int running;
- int signal_pipe[2];
- struct pollfd signal_pollfd;
+ int quit, running, retval;
+ struct pa_mainloop_api api;
};
-static int signal_pipe = -1;
+static void setup_api(struct pa_mainloop *m);
-static void signal_func(int sig) {
- if (signal_pipe >= 0)
- write(signal_pipe, &sig, sizeof(sig));
-}
+struct pa_mainloop *pa_mainloop_new(void) {
+ struct pa_mainloop *m;
-static void make_nonblock(int fd) {
- int v;
-
- if ((v = fcntl(fd, F_GETFL)) >= 0)
- fcntl(fd, F_SETFL, v|O_NONBLOCK);
-}
+ m = malloc(sizeof(struct pa_mainloop));
+ assert(m);
+ m->io_sources = idxset_new(NULL, NULL);
+ m->fixed_sources = idxset_new(NULL, NULL);
+ m->idle_sources = idxset_new(NULL, NULL);
+ m->time_sources = idxset_new(NULL, NULL);
-struct mainloop *mainloop_new(void) {
- int r;
- struct mainloop *m;
+ assert(m->io_sources && m->fixed_sources && m->idle_sources && m->time_sources);
- m = malloc(sizeof(struct mainloop));
- assert(m);
- memset(m, 0, sizeof(struct mainloop));
+ m->io_sources_scan_dead = m->fixed_sources_scan_dead = m->idle_sources_scan_dead = m->time_sources_scan_dead = 0;
+
+ m->pollfds = NULL;
+ m->max_pollfds = m->n_pollfds = m->rebuild_pollfds = 0;
- r = pipe(m->signal_pipe);
- assert(r >= 0 && m->signal_pipe[0] >= 0 && m->signal_pipe[1] >= 0);
+ m->quit = m->running = m->retval = 0;
- make_nonblock(m->signal_pipe[0]);
- make_nonblock(m->signal_pipe[1]);
-
- signal_pipe = m->signal_pipe[1];
- m->signal_pollfd.fd = m->signal_pipe[0];
- m->signal_pollfd.events = POLLIN;
- m->signal_pollfd.revents = 0;
+ setup_api(m);
return m;
}
-static void free_sources(struct mainloop_source_list *l, int all) {
- struct mainloop_source *s, *p;
- assert(l);
+static int foreach(void *p, uint32_t index, int *del, void*userdata) {
+ struct mainloop_source_header *h = p;
+ int *all = userdata;
+ assert(p && del && all);
- if (!all && !l->dead_sources)
- return;
-
- p = NULL;
- s = l->sources;
- while (s) {
- if (all || s->dead) {
- struct mainloop_source *t = s;
- s = s->next;
-
- if (p)
- p->next = s;
- else
- l->sources = s;
-
- free(t);
- } else {
- p = s;
- s = s->next;
- }
+ if (*all || h->dead) {
+ free(h);
+ *del = 1;
}
- l->dead_sources = 0;
-
- if (all) {
- assert(!l->sources);
- l->n_sources = 0;
- }
-}
+ return 0;
+};
-void mainloop_free(struct mainloop* m) {
+void pa_mainloop_free(struct pa_mainloop* m) {
+ int all = 1;
assert(m);
- free_sources(&m->io_sources, 1);
- free_sources(&m->fixed_sources, 1);
- free_sources(&m->idle_sources, 1);
- free_sources(&m->signal_sources, 1);
-
- if (signal_pipe == m->signal_pipe[1])
- signal_pipe = -1;
- close(m->signal_pipe[0]);
- close(m->signal_pipe[1]);
-
+ idxset_foreach(m->io_sources, foreach, &all);
+ idxset_foreach(m->fixed_sources, foreach, &all);
+ idxset_foreach(m->idle_sources, foreach, &all);
+ idxset_foreach(m->time_sources, foreach, &all);
+
+ idxset_free(m->io_sources, NULL, NULL);
+ idxset_free(m->fixed_sources, NULL, NULL);
+ idxset_free(m->idle_sources, NULL, NULL);
+ idxset_free(m->time_sources, NULL, NULL);
+
free(m->pollfds);
free(m);
}
-static void rebuild_pollfds(struct mainloop *m) {
- struct mainloop_source*s;
+static void scan_dead(struct pa_mainloop *m) {
+ int all = 0;
+ assert(m);
+ if (m->io_sources_scan_dead)
+ idxset_foreach(m->io_sources, foreach, &all);
+ if (m->fixed_sources_scan_dead)
+ idxset_foreach(m->fixed_sources, foreach, &all);
+ if (m->idle_sources_scan_dead)
+ idxset_foreach(m->idle_sources, foreach, &all);
+ if (m->time_sources_scan_dead)
+ idxset_foreach(m->time_sources, foreach, &all);
+}
+
+static void rebuild_pollfds(struct pa_mainloop *m) {
+ struct mainloop_source_io*s;
struct pollfd *p;
-
- if (m->max_pollfds < m->io_sources.n_sources+1) {
- m->max_pollfds = (m->io_sources.n_sources+1)*2;
- m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds);
+ uint32_t index = IDXSET_INVALID;
+ unsigned l;
+
+ l = idxset_ncontents(m->io_sources);
+ if (m->max_pollfds < l) {
+ m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*l);
+ m->max_pollfds = l;
}
m->n_pollfds = 0;
p = m->pollfds;
- for (s = m->io_sources.sources; s; s = s->next) {
- assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
- if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) {
- *(p++) = s->io.pollfd;
- m->n_pollfds++;
+ for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
+ if (s->header.dead) {
+ s->pollfd = NULL;
+ continue;
}
+
+ s->pollfd = p;
+ p->fd = s->fd;
+ p->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0);
+ p->revents = 0;
+
+ p++;
+ m->n_pollfds++;
}
+}
+
+static void dispatch_pollfds(struct pa_mainloop *m) {
+ uint32_t index = IDXSET_INVALID;
+ struct mainloop_source_io *s;
- *(p++) = m->signal_pollfd;
- m->n_pollfds++;
+ for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
+ if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents)
+ continue;
+
+ assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback);
+ s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata);
+ }
}
-static void dispatch_pollfds(struct mainloop *m) {
- int i;
- struct pollfd *p;
- struct mainloop_source *s;
- /* This loop assumes that m->sources and m->pollfds have the same
- * order and that m->pollfds is a subset of m->sources! */
+static void run_fixed_or_idle(struct pa_mainloop *m, struct idxset *i) {
+ uint32_t index = IDXSET_INVALID;
+ struct mainloop_source_fixed_or_idle *s;
- s = m->io_sources.sources;
- for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) {
- if (!p->revents)
+ for (s = idxset_first(i, &index); s; s = idxset_next(i, &index)) {
+ if (s->header.dead || !s->enabled)
continue;
- if (p->fd == m->signal_pipe[0]) {
- /* Event from signal pipe */
+ assert(s->callback);
+ s->callback(&m->api, s, s->userdata);
+ }
+}
+
+static int calc_next_timeout(struct pa_mainloop *m) {
+ uint32_t index = IDXSET_INVALID;
+ struct mainloop_source_time *s;
+ struct timeval now;
+ int t = -1;
- if (p->revents & POLLIN) {
- int sig;
- ssize_t r;
- r = read(m->signal_pipe[0], &sig, sizeof(sig));
- assert((r < 0 && errno == EAGAIN) || r == sizeof(sig));
+ if (idxset_isempty(m->time_sources))
+ return -1;
+
+ gettimeofday(&now, NULL);
+
+ for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) {
+ int tmp;
+
+ if (s->header.dead || !s->enabled)
+ continue;
+
+ if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec))
+ return 0;
+
+ tmp = (s->timeval.tv_sec - now.tv_sec)*1000;
- if (r == sizeof(sig)) {
- struct mainloop_source *l = m->signal_sources.sources;
- while (l) {
- assert(l->type == MAINLOOP_SOURCE_TYPE_SIGNAL);
-
- if (l->signal.sig == sig && l->enabled && !l->dead) {
- assert(l->signal.callback);
- l->signal.callback(l, sig, l->userdata);
- }
-
- l = l->next;
- }
- }
- }
-
- } else {
- /* Event from I/O source */
-
- for (; s; s = s->next) {
- if (p->fd != s->io.fd)
- continue;
-
- assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
-
- if (!s->dead && s->enabled) {
- enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0);
- if (e) {
- assert(s->io.callback);
- s->io.callback(s, s->io.fd, e, s->userdata);
- }
- }
-
- break;
- }
+ if (s->timeval.tv_usec > now.tv_usec)
+ tmp += (s->timeval.tv_usec - now.tv_usec)/1000;
+ else
+ tmp -= (now.tv_usec - s->timeval.tv_usec)/1000;
+
+ if (tmp == 0)
+ return 0;
+ else if (tmp < t)
+ t = tmp;
+ }
+
+ return t;
+}
+
+static void dispatch_timeout(struct pa_mainloop *m) {
+ uint32_t index = IDXSET_INVALID;
+ struct mainloop_source_time *s;
+ struct timeval now;
+ assert(m);
+
+ if (idxset_isempty(m->time_sources))
+ return;
+
+ gettimeofday(&now, NULL);
+ for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) {
+
+ if (s->header.dead || !s->enabled)
+ continue;
+
+ if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) {
+ assert(s->callback);
+
+ s->enabled = 0;
+ s->callback(&m->api, s, &s->timeval, s->userdata);
}
}
}
-int mainloop_iterate(struct mainloop *m, int block) {
- struct mainloop_source *s;
- int c;
+static int any_idle_sources(struct pa_mainloop *m) {
+ struct mainloop_source_fixed_or_idle *s;
+ uint32_t index;
+ assert(m);
+
+ for (s = idxset_first(m->idle_sources, &index); s; s = idxset_next(m->idle_sources, &index))
+ if (!s->header.dead && s->enabled)
+ return 1;
+
+ return 0;
+}
+
+int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval) {
+ int r, idle;
assert(m && !m->running);
- if(m->quit)
- return m->quit;
-
- free_sources(&m->io_sources, 0);
- free_sources(&m->fixed_sources, 0);
- free_sources(&m->idle_sources, 0);
- free_sources(&m->signal_sources, 0);
-
- for (s = m->fixed_sources.sources; s; s = s->next) {
- assert(s->type == MAINLOOP_SOURCE_TYPE_FIXED);
- if (!s->dead && s->enabled) {
- assert(s->fixed.callback);
- s->fixed.callback(s, s->userdata);
- }
+ if(m->quit) {
+ if (retval)
+ *retval = m->retval;
+ return 1;
}
+ m->running = 1;
+
+ scan_dead(m);
+ run_fixed_or_idle(m, m->fixed_sources);
+
if (m->rebuild_pollfds) {
rebuild_pollfds(m);
m->rebuild_pollfds = 0;
}
- m->running = 1;
+ idle = any_idle_sources(m);
do {
- c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0);
- } while (c < 0 && errno == EINTR);
-
- if (c > 0)
+ int t;
+
+ if (!block || idle)
+ t = 0;
+ else
+ t = calc_next_timeout(m);
+
+ r = poll(m->pollfds, m->n_pollfds, t);
+ } while (r < 0 && errno == EINTR);
+
+ dispatch_timeout(m);
+
+ if (r > 0)
dispatch_pollfds(m);
- else if (c == 0) {
- for (s = m->idle_sources.sources; s; s = s->next) {
- assert(s->type == MAINLOOP_SOURCE_TYPE_IDLE);
- if (!s->dead && s->enabled) {
- assert(s->idle.callback);
- s->idle.callback(s, s->userdata);
- }
- }
- }
+ else if (r == 0 && idle)
+ run_fixed_or_idle(m, m->idle_sources);
+ else if (r < 0)
+ fprintf(stderr, "select(): %s\n", strerror(errno));
m->running = 0;
- return c < 0 ? -1 : 0;
+ return r < 0 ? -1 : 0;
}
-int mainloop_run(struct mainloop *m) {
+int pa_mainloop_run(struct pa_mainloop *m, int *retval) {
int r;
- while (!(r = mainloop_iterate(m, 1)));
+ while ((r = pa_mainloop_iterate(m, 1, retval)) == 0);
return r;
}
-void mainloop_quit(struct mainloop *m, int r) {
+void pa_mainloop_quit(struct pa_mainloop *m, int r) {
assert(m);
m->quit = r;
}
-static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) {
- struct mainloop_source_list *l;
-
- switch(type) {
- case MAINLOOP_SOURCE_TYPE_IO:
- l = &m->io_sources;
- break;
- case MAINLOOP_SOURCE_TYPE_FIXED:
- l = &m->fixed_sources;
- break;
- case MAINLOOP_SOURCE_TYPE_IDLE:
- l = &m->idle_sources;
- break;
- case MAINLOOP_SOURCE_TYPE_SIGNAL:
- l = &m->signal_sources;
- break;
- default:
- l = NULL;
- break;
- }
-
- return l;
-}
-
-static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) {
- struct mainloop_source_list *l;
- struct mainloop_source* s;
- assert(m);
+/* IO sources */
+static void* mainloop_source_io(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata) {
+ struct pa_mainloop *m;
+ struct mainloop_source_io *s;
+ assert(a && a->userdata && fd >= 0 && callback);
+ m = a->userdata;
+ assert(a == &m->api);
- s = malloc(sizeof(struct mainloop_source));
+ s = malloc(sizeof(struct mainloop_source_io));
assert(s);
- memset(s, 0, sizeof(struct mainloop_source));
+ s->header.mainloop = m;
+ s->header.dead = 0;
- s->type = type;
- s->mainloop = m;
+ s->fd = fd;
+ s->events = events;
+ s->callback = callback;
+ s->userdata = userdata;
+ s->pollfd = NULL;
- l = get_source_list(m, type);
- assert(l);
-
- s->next = l->sources;
- l->sources = s;
- l->n_sources++;
+ idxset_put(m->io_sources, s, NULL);
+ m->rebuild_pollfds = 1;
return s;
}
-struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) {
- struct mainloop_source* s;
- assert(m && fd>=0 && callback);
+static void mainloop_enable_io(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events) {
+ struct pa_mainloop *m;
+ struct mainloop_source_io *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api && s->header.mainloop == m);
- s = source_new(m, MAINLOOP_SOURCE_TYPE_IO);
+ s->events = events;
+ if (s->pollfd)
+ s->pollfd->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0);
+}
- s->io.fd = fd;
- s->io.events = event;
- s->io.callback = callback;
- s->userdata = userdata;
- s->io.pollfd.fd = fd;
- s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0);
- s->io.pollfd.revents = 0;
+static void mainloop_cancel_io(struct pa_mainloop_api*a, void* id) {
+ struct pa_mainloop *m;
+ struct mainloop_source_io *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api && s->header.mainloop == m);
- s->enabled = 1;
-
- m->rebuild_pollfds = 1;
- return s;
+ s->header.dead = 1;
+ m->io_sources_scan_dead = 1;
}
-struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
- struct mainloop_source* s;
- assert(m && callback);
+/* Fixed sources */
+static void* mainloop_source_fixed(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) {
+ struct pa_mainloop *m;
+ struct mainloop_source_fixed_or_idle *s;
+ assert(a && a->userdata && callback);
+ m = a->userdata;
+ assert(a == &m->api);
- s = source_new(m, MAINLOOP_SOURCE_TYPE_FIXED);
+ s = malloc(sizeof(struct mainloop_source_fixed_or_idle));
+ assert(s);
+ s->header.mainloop = m;
+ s->header.dead = 0;
- s->fixed.callback = callback;
- s->userdata = userdata;
s->enabled = 1;
+ s->callback = callback;
+ s->userdata = userdata;
+
+ idxset_put(m->fixed_sources, s, NULL);
return s;
}
-struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
- struct mainloop_source* s;
- assert(m && callback);
-
- s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE);
+static void mainloop_enable_fixed(struct pa_mainloop_api*a, void* id, int b) {
+ struct pa_mainloop *m;
+ struct mainloop_source_fixed_or_idle *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api);
- s->idle.callback = callback;
- s->userdata = userdata;
- s->enabled = 1;
- return s;
+ s->enabled = b;
}
-struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata) {
- struct mainloop_source* s;
- struct sigaction save_sa, sa;
-
- assert(m && callback);
+static void mainloop_cancel_fixed(struct pa_mainloop_api*a, void* id) {
+ struct pa_mainloop *m;
+ struct mainloop_source_fixed_or_idle *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api);
- memset(&sa, 0, sizeof(sa));
- sa.sa_handler = signal_func;
- sa.sa_flags = SA_RESTART;
- sigemptyset(&sa.sa_mask);
+ s->header.dead = 1;
+ m->fixed_sources_scan_dead = 1;
+}
- memset(&save_sa, 0, sizeof(save_sa));
+/* Idle sources */
+static void* mainloop_source_idle(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) {
+ struct pa_mainloop *m;
+ struct mainloop_source_fixed_or_idle *s;
+ assert(a && a->userdata && callback);
+ m = a->userdata;
+ assert(a == &m->api);
+
+ s = malloc(sizeof(struct mainloop_source_fixed_or_idle));
+ assert(s);
+ s->header.mainloop = m;
+ s->header.dead = 0;
- if (sigaction(sig, &sa, &save_sa) < 0)
- return NULL;
-
- s = source_new(m, MAINLOOP_SOURCE_TYPE_SIGNAL);
- s->signal.sig = sig;
- s->signal.sigaction = save_sa;
-
- s->signal.callback = callback;
- s->userdata = userdata;
s->enabled = 1;
+ s->callback = callback;
+ s->userdata = userdata;
+
+ idxset_put(m->idle_sources, s, NULL);
return s;
}
-void mainloop_source_free(struct mainloop_source*s) {
- struct mainloop_source_list *l;
- assert(s && !s->dead);
- s->dead = 1;
-
- assert(s->mainloop);
- l = get_source_list(s->mainloop, s->type);
- assert(l);
+static void mainloop_cancel_idle(struct pa_mainloop_api*a, void* id) {
+ struct pa_mainloop *m;
+ struct mainloop_source_fixed_or_idle *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api);
- l->n_sources--;
- l->dead_sources = 1;
-
- if (s->type == MAINLOOP_SOURCE_TYPE_IO)
- s->mainloop->rebuild_pollfds = 1;
- else if (s->type == MAINLOOP_SOURCE_TYPE_SIGNAL)
- sigaction(s->signal.sig, &s->signal.sigaction, NULL);
+ s->header.dead = 1;
+ m->idle_sources_scan_dead = 1;
}
-void mainloop_source_enable(struct mainloop_source*s, int b) {
- assert(s && !s->dead);
+/* Time sources */
+static void* mainloop_source_time(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata) {
+ struct pa_mainloop *m;
+ struct mainloop_source_time *s;
+ assert(a && a->userdata && callback);
+ m = a->userdata;
+ assert(a == &m->api);
- if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) {
- assert(s->mainloop);
- s->mainloop->rebuild_pollfds = 1;
- }
+ s = malloc(sizeof(struct mainloop_source_time));
+ assert(s);
+ s->header.mainloop = m;
+ s->header.dead = 0;
- s->enabled = b;
-}
+ s->enabled = !!tv;
+ if (tv)
+ s->timeval = *tv;
-void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) {
- assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO);
+ s->callback = callback;
+ s->userdata = userdata;
- if (s->io.events != events) {
- assert(s->mainloop);
- s->mainloop->rebuild_pollfds = 1;
- }
+ idxset_put(m->time_sources, s, NULL);
+ return s;
+}
- s->io.events = events;
- s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0);
+static void mainloop_enable_time(struct pa_mainloop_api*a, void *id, const struct timeval *tv) {
+ struct pa_mainloop *m;
+ struct mainloop_source_time *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api);
+
+ if (tv) {
+ s->enabled = 1;
+ s->timeval = *tv;
+ } else
+ s->enabled = 0;
}
-struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) {
- assert(s);
+static void mainloop_cancel_time(struct pa_mainloop_api*a, void* id) {
+ struct pa_mainloop *m;
+ struct mainloop_source_time *s = id;
+ assert(a && a->userdata && s && !s->header.dead);
+ m = a->userdata;
+ assert(a == &m->api);
+
+ s->header.dead = 1;
+ m->time_sources_scan_dead = 1;
- return s->mainloop;
}
-struct once_info {
- void (*callback)(void *userdata);
- void *userdata;
-};
+static void mainloop_quit(struct pa_mainloop_api*a, int retval) {
+ struct pa_mainloop *m;
+ assert(a && a->userdata);
+ m = a->userdata;
+ assert(a == &m->api);
-static void once_callback(struct mainloop_source *s, void *userdata) {
- struct once_info *i = userdata;
- assert(s && i && i->callback);
- i->callback(i->userdata);
- mainloop_source_free(s);
- free(i);
+ m->quit = 1;
+ m->retval = retval;
}
+
+static void setup_api(struct pa_mainloop *m) {
+ assert(m);
+
+ m->api.userdata = m;
+ m->api.source_io = mainloop_source_io;
+ m->api.enable_io = mainloop_enable_io;
+ m->api.cancel_io = mainloop_cancel_io;
+
+ m->api.source_fixed = mainloop_source_fixed;
+ m->api.enable_fixed = mainloop_enable_fixed;
+ m->api.cancel_fixed = mainloop_cancel_fixed;
+
+ m->api.source_idle = mainloop_source_idle;
+ m->api.enable_idle = mainloop_enable_fixed; /* (!) */
+ m->api.cancel_idle = mainloop_cancel_idle;
+
+ m->api.source_time = mainloop_source_time;
+ m->api.enable_time = mainloop_enable_time;
+ m->api.cancel_time = mainloop_cancel_time;
-void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata) {
- struct once_info *i;
- assert(m && callback);
+ m->api.quit = mainloop_quit;
+}
- i = malloc(sizeof(struct once_info));
- assert(i);
- i->callback = callback;
- i->userdata = userdata;
-
- mainloop_source_new_fixed(m, once_callback, i);
+struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m) {
+ assert(m);
+ return &m->api;
}
+
diff --git a/src/mainloop.h b/src/mainloop.h
index c1bfc62a..7837636f 100644
--- a/src/mainloop.h
+++ b/src/mainloop.h
@@ -1,42 +1,16 @@
#ifndef foomainloophfoo
#define foomainloophfoo
-struct mainloop;
-struct mainloop_source;
+#include "mainloop-api.h"
-enum mainloop_io_event {
- MAINLOOP_IO_EVENT_NULL = 0,
- MAINLOOP_IO_EVENT_IN = 1,
- MAINLOOP_IO_EVENT_OUT = 2,
- MAINLOOP_IO_EVENT_BOTH = 3
-};
+struct pa_mainloop;
-enum mainloop_source_type {
- MAINLOOP_SOURCE_TYPE_IO,
- MAINLOOP_SOURCE_TYPE_FIXED,
- MAINLOOP_SOURCE_TYPE_IDLE,
- MAINLOOP_SOURCE_TYPE_SIGNAL
-};
+struct pa_mainloop *pa_mainloop_new(void);
+void pa_mainloop_free(struct pa_mainloop* m);
-struct mainloop *mainloop_new(void);
-void mainloop_free(struct mainloop* m);
+int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval);
+int pa_mainloop_run(struct pa_mainloop *m, int *retval);
-int mainloop_iterate(struct mainloop *m, int block);
-int mainloop_run(struct mainloop *m);
-void mainloop_quit(struct mainloop *m, int r);
-
-struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata);
-struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
-struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
-struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata);
-
-void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata);
-
-void mainloop_source_free(struct mainloop_source*s);
-void mainloop_source_enable(struct mainloop_source*s, int b);
-
-void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event);
-
-struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s);
+struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m);
#endif
diff --git a/src/memblockq.c b/src/memblockq.c
index 10c59e50..9a601c3a 100644
--- a/src/memblockq.c
+++ b/src/memblockq.c
@@ -220,3 +220,12 @@ uint32_t memblockq_get_length(struct memblockq *bq) {
assert(bq);
return bq->total_length;
}
+
+uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen) {
+ assert(bq && qlen);
+
+ if (bq->total_length >= qlen)
+ return 0;
+
+ return qlen - bq->total_length;
+}
diff --git a/src/memblockq.h b/src/memblockq.h
index 0a68ddaf..a681ff08 100644
--- a/src/memblockq.h
+++ b/src/memblockq.h
@@ -25,4 +25,6 @@ int memblockq_is_writable(struct memblockq *bq, size_t length);
uint32_t memblockq_get_delay(struct memblockq *bq);
uint32_t memblockq_get_length(struct memblockq *bq);
+uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen);
+
#endif
diff --git a/src/module-oss-mmap.c b/src/module-oss-mmap.c
index bfc3a6fa..b70ea6bd 100644
--- a/src/module-oss-mmap.c
+++ b/src/module-oss-mmap.c
@@ -16,15 +16,15 @@
#include "source.h"
#include "module.h"
#include "oss.h"
+#include "sample-util.h"
struct userdata {
struct sink *sink;
struct source *source;
struct core *core;
- struct sample_spec sample_spec;
+ struct pa_sample_spec sample_spec;
- size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, sample_size, out_fill;
- uint32_t sample_usec;
+ size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, out_fill;
int fd;
@@ -161,12 +161,14 @@ static void do_read(struct userdata *u) {
in_clear_memblocks(u, u->in_fragments/2);
};
-static void io_callback(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata) {
+static void io_callback(struct pa_mainloop_api *m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct userdata *u = userdata;
- if (event & MAINLOOP_IO_EVENT_IN)
+ assert (u && u->core->mainloop == m && u->mainloop_source == id);
+
+ if (events & PA_MAINLOOP_API_IO_EVENT_INPUT)
do_read(u);
- if (event & MAINLOOP_IO_EVENT_OUT)
+ if (events & PA_MAINLOOP_API_IO_EVENT_OUTPUT)
do_write(u);
}
@@ -175,7 +177,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {
assert(s && u);
do_write(u);
- return u->out_fill/u->sample_size*u->sample_usec;
+ return pa_samples_usec(u->out_fill, &s->sample_spec);
}
int module_init(struct core *c, struct module*m) {
@@ -316,10 +318,7 @@ int module_init(struct core *c, struct module*m) {
assert(u->source || u->sink);
- u->sample_size = sample_size(&u->sample_spec);
- u->sample_usec = 1000000/u->sample_spec.rate;
-
- u->mainloop_source = mainloop_source_new_io(c->mainloop, u->fd, (u->source ? MAINLOOP_IO_EVENT_IN : 0) | (u->sink ? MAINLOOP_IO_EVENT_OUT : 0), io_callback, u);
+ u->mainloop_source = c->mainloop->source_io(c->mainloop, u->fd, (u->source ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | (u->sink ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), io_callback, u);
assert(u->mainloop_source);
return 0;
@@ -360,7 +359,7 @@ void module_done(struct core *c, struct module*m) {
source_free(u->source);
if (u->mainloop_source)
- mainloop_source_free(u->mainloop_source);
+ u->core->mainloop->cancel_io(u->core->mainloop, u->mainloop_source);
if (u->fd >= 0)
close(u->fd);
diff --git a/src/module-oss.c b/src/module-oss.c
index fab5c8c5..4f307545 100644
--- a/src/module-oss.c
+++ b/src/module-oss.c
@@ -15,7 +15,7 @@
#include "source.h"
#include "module.h"
#include "oss.h"
-#include "sample.h"
+#include "sample-util.h"
struct userdata {
struct sink *sink;
@@ -81,7 +81,7 @@ static void do_read(struct userdata *u) {
return;
}
- assert(r <= memchunk.memblock->length);
+ assert(r <= (ssize_t) memchunk.memblock->length);
memchunk.length = memchunk.memblock->length = r;
memchunk.index = 0;
@@ -107,7 +107,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {
return 0;
}
- return samples_usec(arg, &s->sample_spec);
+ return pa_samples_usec(arg, &s->sample_spec);
}
int module_init(struct core *c, struct module*m) {
@@ -117,7 +117,7 @@ int module_init(struct core *c, struct module*m) {
int fd = -1;
int frag_size, in_frag_size, out_frag_size;
int mode;
- struct sample_spec ss;
+ struct pa_sample_spec ss;
assert(c && m);
p = m->argument ? m->argument : "/dev/dsp";
@@ -203,7 +203,7 @@ int module_init(struct core *c, struct module*m) {
u->memchunk.memblock = NULL;
u->memchunk.length = 0;
- u->sample_size = sample_size(&ss);
+ u->sample_size = pa_sample_size(&ss);
u->out_fragment_size = out_frag_size;
u->in_fragment_size = in_frag_size;
diff --git a/src/module-pipe-sink.c b/src/module-pipe-sink.c
index c0a903e9..9dcf5d23 100644
--- a/src/module-pipe-sink.c
+++ b/src/module-pipe-sink.c
@@ -18,7 +18,8 @@ struct userdata {
struct sink *sink;
struct iochannel *io;
struct core *core;
- struct mainloop_source *mainloop_source;
+ void *mainloop_source;
+ struct pa_mainloop_api *mainloop;
struct memchunk memchunk;
};
@@ -27,7 +28,7 @@ static void do_write(struct userdata *u) {
ssize_t r;
assert(u);
- mainloop_source_enable(u->mainloop_source, 0);
+ u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);
if (!iochannel_is_writable(u->io))
return;
@@ -57,10 +58,10 @@ static void notify_cb(struct sink*s) {
assert(s && u);
if (iochannel_is_writable(u->io))
- mainloop_source_enable(u->mainloop_source, 1);
+ u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 1);
}
-static void prepare_callback(struct mainloop_source *src, void *userdata) {
+static void fixed_callback(struct pa_mainloop_api *m, void *id, void *userdata) {
struct userdata *u = userdata;
assert(u);
do_write(u);
@@ -77,7 +78,7 @@ int module_init(struct core *c, struct module*m) {
struct stat st;
char *p;
int fd = -1;
- static const struct sample_spec ss = {
+ static const struct pa_sample_spec ss = {
.format = SAMPLE_S16NE,
.rate = 44100,
.channels = 2,
@@ -120,10 +121,11 @@ int module_init(struct core *c, struct module*m) {
u->memchunk.memblock = NULL;
u->memchunk.length = 0;
- u->mainloop_source = mainloop_source_new_fixed(c->mainloop, prepare_callback, u);
+ u->mainloop = c->mainloop;
+ u->mainloop_source = u->mainloop->source_fixed(u->mainloop, fixed_callback, u);
assert(u->mainloop_source);
- mainloop_source_enable(u->mainloop_source, 0);
-
+ u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);
+
m->userdata = u;
return 0;
@@ -147,7 +149,7 @@ void module_done(struct core *c, struct module*m) {
sink_free(u->sink);
iochannel_free(u->io);
- mainloop_source_free(u->mainloop_source);
+ u->mainloop->cancel_fixed(u->mainloop, u->mainloop_source);
assert(u->filename);
unlink(u->filename);
diff --git a/src/module.c b/src/module.c
index c6de1751..883a22df 100644
--- a/src/module.c
+++ b/src/module.c
@@ -151,5 +151,5 @@ void module_unload_request(struct core *c, struct module *m) {
assert(i);
i->core = c;
i->index = m->index;
- mainloop_once(c->mainloop, module_unload_once_callback, i);
+ pa_mainloop_api_once(c->mainloop, module_unload_once_callback, i);
}
diff --git a/src/oss.c b/src/oss.c
index 7b1315c0..02bf8cd1 100644
--- a/src/oss.c
+++ b/src/oss.c
@@ -7,7 +7,7 @@
#include "oss.h"
-int oss_auto_format(int fd, struct sample_spec *ss) {
+int oss_auto_format(int fd, struct pa_sample_spec *ss) {
int format, channels, speed;
assert(fd >= 0 && ss);
diff --git a/src/oss.h b/src/oss.h
index 35d2dd02..34ac9c66 100644
--- a/src/oss.h
+++ b/src/oss.h
@@ -3,6 +3,6 @@
#include "sample.h"
-int oss_auto_format(int fd, struct sample_spec *ss);
+int oss_auto_format(int fd, struct pa_sample_spec *ss);
#endif
diff --git a/src/pacat.c b/src/pacat.c
new file mode 100644
index 00000000..5f5a373b
--- /dev/null
+++ b/src/pacat.c
@@ -0,0 +1,169 @@
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "polyp.h"
+#include "mainloop.h"
+
+static struct pa_context *context = NULL;
+static struct pa_stream *stream = NULL;
+static struct pa_mainloop_api *mainloop_api = NULL;
+
+static void *buffer = NULL;
+static size_t buffer_length = 0, buffer_index = 0;
+
+static void* stdin_source = NULL;
+
+static void context_die_callback(struct pa_context *c, void *userdata) {
+ assert(c);
+ fprintf(stderr, "Connection to server shut down, exiting.\n");
+ mainloop_api->quit(mainloop_api, 1);
+}
+
+static void stream_die_callback(struct pa_stream *s, void *userdata) {
+ assert(s);
+ fprintf(stderr, "Stream deleted, exiting.\n");
+ mainloop_api->quit(mainloop_api, 1);
+}
+
+static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
+ size_t l;
+ assert(s && length);
+
+ mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK);
+
+ if (!buffer)
+ return;
+
+ assert(buffer_length);
+
+ l = length;
+ if (l > buffer_length)
+ l = buffer_length;
+
+ pa_stream_write(s, buffer+buffer_index, l);
+ buffer_length -= l;
+ buffer_index += l;
+
+ if (!buffer_length) {
+ free(buffer);
+ buffer = NULL;
+ buffer_index = buffer_length = 0;
+ }
+}
+
+static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) {
+ assert(c);
+
+ if (!s) {
+ fprintf(stderr, "Stream creation failed.\n");
+ mainloop_api->quit(mainloop_api, 1);
+ }
+
+ stream = s;
+ pa_stream_set_die_callback(stream, stream_die_callback, NULL);
+ pa_stream_set_write_callback(stream, stream_write_callback, NULL);
+}
+
+static void context_complete_callback(struct pa_context *c, int success, void *userdata) {
+ static const struct pa_sample_spec ss = {
+ .format = SAMPLE_S16NE,
+ .rate = 44100,
+ .channels = 2
+ };
+
+ assert(c && !stream);
+
+ if (!success) {
+ fprintf(stderr, "Connection failed\n");
+ goto fail;
+ }
+
+ if (pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL) < 0) {
+ fprintf(stderr, "pa_stream_new() failed.\n");
+ goto fail;
+ }
+
+ return;
+
+fail:
+ mainloop_api->quit(mainloop_api, 1);
+}
+
+static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
+ size_t l;
+ ssize_t r;
+ assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
+
+ if (buffer) {
+ mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL);
+ return;
+ }
+
+ if (!(l = pa_stream_writable_size(stream)))
+ l = 4096;
+ buffer = malloc(l);
+ assert(buffer);
+ if ((r = read(fd, buffer, l)) <= 0) {
+ if (r == 0)
+ mainloop_api->quit(mainloop_api, 0);
+ else {
+ fprintf(stderr, "read() failed: %s\n", strerror(errno));
+ mainloop_api->quit(mainloop_api, 1);
+ }
+
+ return;
+ }
+
+ buffer_length = r;
+ buffer_index = 0;
+}
+
+int main(int argc, char *argv[]) {
+ struct pa_mainloop* m;
+ int ret = 1;
+
+ if (!(m = pa_mainloop_new())) {
+ fprintf(stderr, "pa_mainloop_new() failed.\n");
+ goto quit;
+ }
+
+ mainloop_api = pa_mainloop_get_api(m);
+
+ if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) {
+ fprintf(stderr, "source_io() failed.\n");
+ goto quit;
+ }
+
+ if (!(context = pa_context_new(mainloop_api, argv[0]))) {
+ fprintf(stderr, "pa_context_new() failed.\n");
+ goto quit;
+ }
+
+ if (pa_context_connect(context, NULL, context_complete_callback, NULL) < 0) {
+ fprintf(stderr, "pa_context_connext() failed.\n");
+ goto quit;
+ }
+
+ pa_context_set_die_callback(context, context_die_callback, NULL);
+
+ if (pa_mainloop_run(m, &ret) < 0) {
+ fprintf(stderr, "pa_mainloop_run() failed.\n");
+ goto quit;
+ }
+
+quit:
+ if (stream)
+ pa_stream_free(stream);
+ if (context)
+ pa_context_free(context);
+ if (m)
+ pa_mainloop_free(m);
+ if (buffer)
+ free(buffer);
+
+ return ret;
+}
diff --git a/src/packet.c b/src/packet.c
index 47fce919..0f966d9a 100644
--- a/src/packet.c
+++ b/src/packet.c
@@ -16,7 +16,7 @@ struct packet* packet_new(size_t length) {
return p;
}
-struct packet* packet_dynamic(uint8_t* data, size_t length) {
+struct packet* packet_new_dynamic(uint8_t* data, size_t length) {
struct packet *p;
assert(data && length);
p = malloc(sizeof(struct packet));
@@ -26,6 +26,7 @@ struct packet* packet_dynamic(uint8_t* data, size_t length) {
p->length = length;
p->data = data;
p->type = PACKET_DYNAMIC;
+ return p;
}
struct packet* packet_ref(struct packet *p) {
diff --git a/src/pdispatch.c b/src/pdispatch.c
new file mode 100644
index 00000000..48b6639d
--- /dev/null
+++ b/src/pdispatch.c
@@ -0,0 +1,149 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include "pdispatch.h"
+#include "protocol-native-spec.h"
+
+struct reply_info {
+ struct pdispatch *pdispatch;
+ struct reply_info *next, *previous;
+ int (*callback)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+ void *userdata;
+ uint32_t tag;
+ void *mainloop_timeout;
+};
+
+struct pdispatch {
+ struct pa_mainloop_api *mainloop;
+ const struct pdispatch_command *command_table;
+ unsigned n_commands;
+ struct reply_info *replies;
+};
+
+static void reply_info_free(struct reply_info *r) {
+ assert(r && r->pdispatch && r->pdispatch->mainloop);
+ r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout);
+
+ if (r->previous)
+ r->previous->next = r->next;
+ else
+ r->pdispatch->replies = r->next;
+
+ if (r->next)
+ r->next->previous = r->previous;
+
+ free(r);
+}
+
+struct pdispatch* pdispatch_new(struct pa_mainloop_api *mainloop, const struct pdispatch_command*table, unsigned entries) {
+ struct pdispatch *pd;
+ assert(mainloop);
+
+ assert((entries && table) || (!entries && !table));
+
+ pd = malloc(sizeof(struct pdispatch));
+ assert(pd);
+ pd->mainloop = mainloop;
+ pd->command_table = table;
+ pd->n_commands = entries;
+ return pd;
+}
+
+void pdispatch_free(struct pdispatch *pd) {
+ assert(pd);
+ while (pd->replies)
+ reply_info_free(pd->replies);
+ free(pd);
+}
+
+int pdispatch_run(struct pdispatch *pd, struct packet*packet, void *userdata) {
+ uint32_t tag, command;
+ assert(pd && packet);
+ struct tagstruct *ts = NULL;
+ assert(pd && packet && packet->data);
+
+ if (packet->length <= 8)
+ goto fail;
+
+ ts = tagstruct_new(packet->data, packet->length);
+ assert(ts);
+
+ if (tagstruct_getu32(ts, &command) < 0 ||
+ tagstruct_getu32(ts, &tag) < 0)
+ goto fail;
+
+ if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
+ struct reply_info *r;
+ int done = 0;
+
+ for (r = pd->replies; r; r = r->next) {
+ if (r->tag == tag) {
+ int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata);
+ reply_info_free(r);
+
+ if (ret < 0)
+ goto fail;
+
+ done = 1;
+ break;
+ }
+ }
+
+ if (!done)
+ goto fail;
+
+ } else if (pd->command_table && command < pd->n_commands) {
+ const struct pdispatch_command *c = pd->command_table+command;
+
+ if (!c->proc)
+ goto fail;
+
+ if (c->proc(pd, command, tag, ts, userdata) < 0)
+ goto fail;
+ } else
+ goto fail;
+
+ tagstruct_free(ts);
+
+ return 0;
+
+fail:
+ if (ts)
+ tagstruct_free(ts);
+
+ fprintf(stderr, "protocol-native: invalid packet.\n");
+ return -1;
+}
+
+static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) {
+ struct reply_info*r = userdata;
+ assert (r && r->mainloop_timeout == id && r->pdispatch && r->pdispatch->mainloop == m && r->callback);
+
+ r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata);
+ reply_info_free(r);
+}
+
+void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata) {
+ struct reply_info *r;
+ struct timeval tv;
+ assert(pd && cb);
+
+ r = malloc(sizeof(struct reply_info));
+ assert(r);
+ r->pdispatch = pd;
+ r->callback = cb;
+ r->userdata = userdata;
+ r->tag = tag;
+
+ gettimeofday(&tv, NULL);
+ tv.tv_sec += timeout;
+
+ r->mainloop_timeout = pd->mainloop->source_time(pd->mainloop, &tv, timeout_callback, r);
+ assert(r->mainloop_timeout);
+
+ r->previous = NULL;
+ r->next = pd->replies;
+ if (r->next)
+ r->next->previous = r;
+ pd->replies = r;
+}
diff --git a/src/pdispatch.h b/src/pdispatch.h
new file mode 100644
index 00000000..466da9a4
--- /dev/null
+++ b/src/pdispatch.h
@@ -0,0 +1,22 @@
+#ifndef foopdispatchhfoo
+#define foopdispatchhfoo
+
+#include <inttypes.h>
+#include "tagstruct.h"
+#include "packet.h"
+#include "mainloop-api.h"
+
+struct pdispatch;
+
+struct pdispatch_command {
+ int (*proc)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+};
+
+struct pdispatch* pdispatch_new(struct pa_mainloop_api *m, const struct pdispatch_command*table, unsigned entries);
+void pdispatch_free(struct pdispatch *pd);
+
+int pdispatch_run(struct pdispatch *pd, struct packet*p, void *userdata);
+
+void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata);
+
+#endif
diff --git a/src/polyp.c b/src/polyp.c
new file mode 100644
index 00000000..fdff7f9c
--- /dev/null
+++ b/src/polyp.c
@@ -0,0 +1,451 @@
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "polyp.h"
+#include "protocol-native-spec.h"
+#include "pdispatch.h"
+#include "pstream.h"
+#include "dynarray.h"
+#include "socket-client.h"
+#include "pstream-util.h"
+
+#define DEFAULT_QUEUE_LENGTH 10240
+#define DEFAULT_MAX_LENGTH 20480
+#define DEFAULT_PREBUF 4096
+#define DEFAULT_TIMEOUT 5
+
+struct pa_context {
+ char *name;
+ struct pa_mainloop_api* mainloop;
+ struct socket_client *client;
+ struct pstream *pstream;
+ struct pdispatch *pdispatch;
+ struct dynarray *streams;
+ struct pa_stream *first_stream;
+ uint32_t ctag;
+ uint32_t errno;
+ enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_READY, CONTEXT_DEAD} state;
+
+ void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
+ void *connect_complete_userdata;
+
+ void (*die_callback)(struct pa_context*c, void *userdata);
+ void *die_userdata;
+};
+
+struct pa_stream {
+ struct pa_context *context;
+ struct pa_stream *next, *previous;
+ uint32_t channel;
+ int channel_valid;
+ enum pa_stream_direction direction;
+ enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
+ uint32_t requested_bytes;
+
+ void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
+ void *read_userdata;
+
+ void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
+ void *write_userdata;
+
+ void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata);
+ void *create_complete_userdata;
+
+ void (*die_callback)(struct pa_stream*c, void *userdata);
+ void *die_userdata;
+};
+
+static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+
+static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
+ [PA_COMMAND_ERROR] = { NULL },
+ [PA_COMMAND_REPLY] = { NULL },
+ [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
+ [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
+ [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
+ [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
+ [PA_COMMAND_EXIT] = { NULL },
+ [PA_COMMAND_REQUEST] = { command_request },
+};
+
+struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
+ assert(mainloop && name);
+ struct pa_context *c;
+ c = malloc(sizeof(struct pa_context));
+ assert(c);
+ c->name = strdup(name);
+ c->mainloop = mainloop;
+ c->client = NULL;
+ c->pstream = NULL;
+ c->pdispatch = NULL;
+ c->streams = dynarray_new();
+ assert(c->streams);
+ c->first_stream = NULL;
+ c->errno = PA_ERROR_OK;
+ c->state = CONTEXT_UNCONNECTED;
+ c->ctag = 0;
+
+ c->connect_complete_callback = NULL;
+ c->connect_complete_userdata = NULL;
+
+ c->die_callback = NULL;
+ c->die_userdata = NULL;
+
+ return c;
+}
+
+void pa_context_free(struct pa_context *c) {
+ assert(c);
+
+ while (c->first_stream)
+ pa_stream_free(c->first_stream);
+
+ if (c->client)
+ socket_client_free(c->client);
+ if (c->pdispatch)
+ pdispatch_free(c->pdispatch);
+ if (c->pstream)
+ pstream_free(c->pstream);
+ if (c->streams)
+ dynarray_free(c->streams, NULL, NULL);
+
+ free(c->name);
+ free(c);
+}
+
+static void stream_dead(struct pa_stream *s) {
+ if (s->state == STREAM_DEAD)
+ return;
+
+ s->state = STREAM_DEAD;
+ if (s->die_callback)
+ s->die_callback(s, s->die_userdata);
+}
+
+static void context_dead(struct pa_context *c) {
+ struct pa_stream *s;
+ assert(c);
+
+ for (s = c->first_stream; s; s = s->next)
+ stream_dead(s);
+
+ if (c->state == CONTEXT_DEAD)
+ return;
+
+ c->state = CONTEXT_DEAD;
+ if (c->die_callback)
+ c->die_callback(c, c->die_userdata);
+}
+
+static void pstream_die_callback(struct pstream *p, void *userdata) {
+ struct pa_context *c = userdata;
+ assert(p && c);
+
+ assert(c->state != CONTEXT_DEAD);
+
+ c->state = CONTEXT_DEAD;
+
+ context_dead(c);
+}
+
+static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
+ struct pa_context *c = userdata;
+ assert(p && packet && c);
+
+ if (pdispatch_run(c->pdispatch, packet, c) < 0) {
+ fprintf(stderr, "polyp.c: invalid packet.\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
+ struct pa_context *c = userdata;
+ struct pa_stream *s;
+ assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
+
+ if (!(s = dynarray_get(c->streams, channel)))
+ return -1;
+
+ if (s->read_callback)
+ s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
+
+ return 0;
+}
+
+static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) {
+ struct pa_context *c = userdata;
+ assert(client && io && c && c->state == CONTEXT_CONNECTING);
+
+ socket_client_free(client);
+ c->client = NULL;
+
+ if (!io) {
+ c->errno = PA_ERROR_CONNECTIONREFUSED;
+ c->state = CONTEXT_UNCONNECTED;
+
+ if (c->connect_complete_callback)
+ c->connect_complete_callback(c, 0, c->connect_complete_userdata);
+
+ return;
+ }
+
+ c->pstream = pstream_new(c->mainloop, io);
+ assert(c->pstream);
+ pstream_set_die_callback(c->pstream, pstream_die_callback, c);
+ pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
+ pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
+
+ c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
+ assert(c->pdispatch);
+
+ c->state = CONTEXT_READY;
+
+ if (c->connect_complete_callback)
+ c->connect_complete_callback(c, 1, c->connect_complete_userdata);
+}
+
+int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
+ assert(c && c->state == CONTEXT_UNCONNECTED);
+
+ assert(!c->client);
+ if (!(c->client = socket_client_new_unix(c->mainloop, server))) {
+ c->errno = PA_ERROR_CONNECTIONREFUSED;
+ return -1;
+ }
+
+ c->connect_complete_callback = complete;
+ c->connect_complete_userdata = userdata;
+
+ socket_client_set_callback(c->client, on_connection, c);
+ c->state = CONTEXT_CONNECTING;
+
+ return 0;
+}
+
+int pa_context_is_dead(struct pa_context *c) {
+ assert(c);
+ return c->state == CONTEXT_DEAD;
+}
+
+int pa_context_is_ready(struct pa_context *c) {
+ assert(c);
+ return c->state == CONTEXT_READY;
+}
+
+int pa_context_errno(struct pa_context *c) {
+ assert(c);
+ return c->errno;
+}
+
+void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
+ assert(c);
+ c->die_callback = cb;
+ c->die_userdata = userdata;
+}
+
+static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+ struct pa_stream *s;
+ struct pa_context *c = userdata;
+ uint32_t bytes, channel;
+ assert(pd && command == PA_COMMAND_REQUEST && t && s);
+
+ if (tagstruct_getu32(t, &channel) < 0 ||
+ tagstruct_getu32(t, &bytes) < 0 ||
+ tagstruct_eof(t)) {
+ c->errno = PA_ERROR_PROTOCOL;
+ return -1;
+ }
+
+ if (!(s = dynarray_get(c->streams, channel))) {
+ c->errno = PA_ERROR_PROTOCOL;
+ return -1;
+ }
+
+ s->requested_bytes += bytes;
+
+ if (s->requested_bytes && s->write_callback)
+ s->write_callback(s, s->requested_bytes, s->write_userdata);
+
+ return 0;
+}
+
+static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+ int ret = 0;
+ struct pa_stream *s = userdata;
+ assert(pd && s && s->state == STREAM_CREATING);
+
+ if (command != PA_COMMAND_REPLY) {
+ struct pa_context *c = s->context;
+ assert(c);
+
+ if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) {
+ s->context->errno = PA_ERROR_PROTOCOL;
+ ret = -1;
+ } else if (command == PA_COMMAND_TIMEOUT) {
+ s->context->errno = PA_ERROR_TIMEOUT;
+ ret = -1;
+ }
+
+ goto fail;
+ }
+
+ if (tagstruct_getu32(t, &s->channel) < 0 ||
+ tagstruct_eof(t)) {
+ s->context->errno = PA_ERROR_PROTOCOL;
+ ret = -1;
+ goto fail;
+ }
+
+ s->channel_valid = 1;
+ dynarray_put(s->context->streams, s->channel, s);
+
+ s->state = STREAM_READY;
+ assert(s->create_complete_callback);
+ s->create_complete_callback(s->context, s, s->create_complete_userdata);
+ return 0;
+
+fail:
+ assert(s->create_complete_callback);
+ s->create_complete_callback(s->context, NULL, s->create_complete_userdata);
+ pa_stream_free(s);
+ return ret;
+}
+
+int pa_stream_new(
+ struct pa_context *c,
+ enum pa_stream_direction dir,
+ const char *dev,
+ const char *name,
+ const struct pa_sample_spec *ss,
+ const struct pa_buffer_attr *attr,
+ void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
+ void *userdata) {
+
+ struct pa_stream *s;
+ struct tagstruct *t;
+ uint32_t tag;
+
+ assert(c && name && ss && c->state == CONTEXT_READY && complete);
+
+ s = malloc(sizeof(struct pa_stream));
+ assert(s);
+ s->context = c;
+
+ s->read_callback = NULL;
+ s->read_userdata = NULL;
+ s->write_callback = NULL;
+ s->write_userdata = NULL;
+ s->die_callback = NULL;
+ s->die_userdata = NULL;
+ s->create_complete_callback = complete;
+ s->create_complete_userdata = NULL;
+
+ s->state = STREAM_CREATING;
+ s->requested_bytes = 0;
+ s->channel = 0;
+ s->channel_valid = 0;
+ s->direction = dir;
+
+ t = tagstruct_new(NULL, 0);
+ assert(t);
+
+ tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
+ tagstruct_putu32(t, tag = c->ctag++);
+ tagstruct_puts(t, name);
+ tagstruct_put_sample_spec(t, ss);
+ tagstruct_putu32(t, (uint32_t) -1);
+ tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
+ tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH);
+ tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
+
+ pstream_send_tagstruct(c->pstream, t);
+
+ pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
+
+ s->next = c->first_stream;
+ if (s->next)
+ s->next->previous = s;
+ s->previous = NULL;
+ c->first_stream = s;
+
+ return 0;
+}
+
+void pa_stream_free(struct pa_stream *s) {
+ assert(s && s->context);
+
+ if (s->channel_valid) {
+ struct tagstruct *t = tagstruct_new(NULL, 0);
+ assert(t);
+
+ tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
+ tagstruct_putu32(t, s->context->ctag++);
+ tagstruct_putu32(t, s->channel);
+ pstream_send_tagstruct(s->context->pstream, t);
+ }
+
+ if (s->channel_valid)
+ dynarray_put(s->context->streams, s->channel, NULL);
+
+ if (s->next)
+ s->next->previous = s->previous;
+ if (s->previous)
+ s->previous->next = s->next;
+ else
+ s->context->first_stream = s->next;
+
+ free(s);
+}
+
+void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
+ assert(s && cb);
+ s->write_callback = cb;
+ s->write_userdata = userdata;
+}
+
+void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
+ struct memchunk chunk;
+ assert(s && s->context && data && length);
+
+ chunk.memblock = memblock_new(length);
+ assert(chunk.memblock && chunk.memblock->data);
+ memcpy(chunk.memblock->data, data, length);
+ chunk.index = 0;
+ chunk.length = length;
+
+ pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
+
+ if (length < s->requested_bytes)
+ s->requested_bytes -= length;
+ else
+ s->requested_bytes = 0;
+}
+
+size_t pa_stream_writable_size(struct pa_stream *s) {
+ assert(s);
+ return s->requested_bytes;
+}
+
+void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
+ assert(s && cb);
+ s->read_callback = cb;
+ s->read_userdata = userdata;
+}
+
+int pa_stream_is_dead(struct pa_stream *s) {
+ return s->state == STREAM_DEAD;
+}
+
+int pa_stream_is_ready(struct pa_stream*s) {
+ return s->state == STREAM_READY;
+}
+
+void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
+ assert(s);
+ s->die_callback = cb;
+ s->die_userdata = userdata;
+}
diff --git a/src/polyp.h b/src/polyp.h
new file mode 100644
index 00000000..171e3bdf
--- /dev/null
+++ b/src/polyp.h
@@ -0,0 +1,53 @@
+#ifndef foopolyphfoo
+#define foopolyphfoo
+
+#include <sys/types.h>
+
+#include "sample.h"
+#include "polypdef.h"
+#include "mainloop-api.h"
+
+struct pa_context;
+
+struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name);
+
+int pa_context_connect(
+ struct pa_context *c,
+ const char *server,
+ void (*complete) (struct pa_context*c, int success, void *userdata),
+ void *userdata);
+
+void pa_context_free(struct pa_context *c);
+
+void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata);
+
+int pa_context_is_dead(struct pa_context *c);
+int pa_context_is_ready(struct pa_context *c);
+int pa_contect_errno(struct pa_context *c);
+
+struct pa_stream;
+
+int pa_stream_new(
+ struct pa_context *c,
+ enum pa_stream_direction dir,
+ const char *dev,
+ const char *name,
+ const struct pa_sample_spec *ss,
+ const struct pa_buffer_attr *attr,
+ void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
+ void *userdata);
+
+void pa_stream_free(struct pa_stream *p);
+
+void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata);
+
+void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata);
+void pa_stream_write(struct pa_stream *p, const void *data, size_t length);
+size_t pa_stream_writable_size(struct pa_stream *p);
+
+void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
+
+int pa_stream_is_dead(struct pa_stream *p);
+int pa_stream_is_ready(struct pa_stream*p);
+
+#endif
diff --git a/src/polypdef.h b/src/polypdef.h
new file mode 100644
index 00000000..aa6e6bf6
--- /dev/null
+++ b/src/polypdef.h
@@ -0,0 +1,18 @@
+#ifndef foopolypdefhfoo
+#define foopolypdefhfoo
+
+#include <inttypes.h>
+
+enum pa_stream_direction {
+ PA_STREAM_PLAYBACK,
+ PA_STREAM_RECORD
+};
+
+struct pa_buffer_attr {
+ uint32_t queue_length;
+ uint32_t max_length;
+ uint32_t prebuf;
+};
+
+
+#endif
diff --git a/src/protocol-cli.c b/src/protocol-cli.c
index c0c93d98..b6460fec 100644
--- a/src/protocol-cli.c
+++ b/src/protocol-cli.c
@@ -12,8 +12,7 @@ struct protocol_cli {
static void cli_eof_cb(struct cli*c, void*userdata) {
struct protocol_cli *p = userdata;
- assert(c && p);
-
+ assert(p);
idxset_remove_by_data(p->connections, c, NULL);
cli_free(c);
}
@@ -22,7 +21,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
struct protocol_cli *p = userdata;
struct cli *c;
assert(s && io && p);
-
+
c = cli_new(p->core, io);
assert(c);
cli_set_eof_callback(c, cli_eof_cb, p);
diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h
new file mode 100644
index 00000000..df11ae3c
--- /dev/null
+++ b/src/protocol-native-spec.h
@@ -0,0 +1,29 @@
+#ifndef fooprotocolnativespech
+#define fooprotocolnativespech
+
+enum {
+ PA_COMMAND_ERROR,
+ PA_COMMAND_TIMEOUT, /* pseudo command */
+ PA_COMMAND_REPLY,
+ PA_COMMAND_CREATE_PLAYBACK_STREAM,
+ PA_COMMAND_DELETE_PLAYBACK_STREAM,
+ PA_COMMAND_CREATE_RECORD_STREAM,
+ PA_COMMAND_DELETE_RECORD_STREAM,
+ PA_COMMAND_EXIT,
+ PA_COMMAND_REQUEST,
+ PA_COMMAND_MAX
+};
+
+enum {
+ PA_ERROR_OK,
+ PA_ERROR_ACCESS,
+ PA_ERROR_COMMAND,
+ PA_ERROR_INVALID,
+ PA_ERROR_EXIST,
+ PA_ERROR_NOENTITY,
+ PA_ERROR_CONNECTIONREFUSED,
+ PA_ERROR_PROTOCOL,
+ PA_ERROR_TIMEOUT
+};
+
+#endif
diff --git a/src/protocol-native.c b/src/protocol-native.c
index e9cca7c1..a39880b8 100644
--- a/src/protocol-native.c
+++ b/src/protocol-native.c
@@ -3,34 +3,19 @@
#include <stdlib.h>
#include "protocol-native.h"
+#include "protocol-native-spec.h"
#include "packet.h"
#include "client.h"
#include "sourceoutput.h"
#include "sinkinput.h"
#include "pstream.h"
#include "tagstruct.h"
+#include "pdispatch.h"
+#include "pstream-util.h"
struct connection;
struct protocol_native;
-enum {
- COMMAND_ERROR,
- COMMAND_REPLY,
- COMMAND_CREATE_PLAYBACK_STREAM,
- COMMAND_DELETE_PLAYBACK_STREAM,
- COMMAND_CREATE_RECORD_STREAM,
- COMMAND_DELETE_RECORD_STREAM,
- COMMAND_EXIT,
- COMMAND_MAX
-};
-
-enum {
- ERROR_ACCESS,
- ERROR_COMMAND,
- ERROR_ARGUMENT,
- ERROR_EXIST
-};
-
struct record_stream {
struct connection *connection;
uint32_t index;
@@ -41,6 +26,7 @@ struct record_stream {
struct playback_stream {
struct connection *connection;
uint32_t index;
+ size_t qlength;
struct sink_input *sink_input;
struct memblockq *memblockq;
};
@@ -50,6 +36,7 @@ struct connection {
struct protocol_native *protocol;
struct client *client;
struct pstream *pstream;
+ struct pdispatch *pdispatch;
struct idxset *record_streams, *playback_streams;
};
@@ -60,6 +47,29 @@ struct protocol_native {
struct idxset *connections;
};
+static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk);
+static void sink_input_drop_cb(struct sink_input *i, size_t length);
+static void sink_input_kill_cb(struct sink_input *i);
+static uint32_t sink_input_get_latency_cb(struct sink_input *i);
+
+static void request_bytes(struct playback_stream*s);
+
+static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+
+static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
+ [PA_COMMAND_ERROR] = { NULL },
+ [PA_COMMAND_REPLY] = { NULL },
+ [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
+ [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream },
+ [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
+ [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
+ [PA_COMMAND_EXIT] = { command_exit },
+};
+
+/* structure management */
+
static void record_stream_free(struct record_stream* r) {
assert(r && r->connection);
@@ -69,18 +79,28 @@ static void record_stream_free(struct record_stream* r) {
free(r);
}
-static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct sample_spec *ss, const char *name, size_t maxlength, size_t prebuf) {
+static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {
struct playback_stream *s;
+ assert(c && sink && s && name && qlen && maxlength && prebuf);
s = malloc(sizeof(struct playback_stream));
assert (s);
s->connection = c;
+ s->qlength = qlen;
+
s->sink_input = sink_input_new(sink, ss, name);
assert(s->sink_input);
- s->memblockq = memblockq_new(maxlength, sample_size(ss), prebuf);
+ s->sink_input->peek = sink_input_peek_cb;
+ s->sink_input->drop = sink_input_drop_cb;
+ s->sink_input->kill = sink_input_kill_cb;
+ s->sink_input->get_latency = sink_input_get_latency_cb;
+ s->sink_input->userdata = s;
+
+ s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf);
assert(s->memblockq);
idxset_put(c->playback_streams, s, &s->index);
+ request_bytes(s);
return s;
}
@@ -99,7 +119,6 @@ static void connection_free(struct connection *c) {
assert(c && c->protocol);
idxset_remove_by_data(c->protocol->connections, c, NULL);
- pstream_free(c->pstream);
while ((r = idxset_first(c->record_streams, NULL)))
record_stream_free(r);
idxset_free(c->record_streams, NULL, NULL);
@@ -108,67 +127,90 @@ static void connection_free(struct connection *c) {
playback_stream_free(p);
idxset_free(c->playback_streams, NULL, NULL);
+ pdispatch_free(c->pdispatch);
+ pstream_free(c->pstream);
client_free(c->client);
free(c);
}
-/*** pstream callbacks ***/
+static void request_bytes(struct playback_stream *s) {
+ struct tagstruct *t;
+ size_t l;
+ assert(s);
-static void send_tagstruct(struct pstream *p, struct tagstruct *t) {
- size_t length;
- uint8_t *data;
- struct packet *packet;
- assert(p && t);
-
- data = tagstruct_free_data(t, &length);
- assert(data && length);
- packet = packet_new_dynamic(data, length);
- assert(packet);
- pstream_send_packet(p, packet);
- packet_unref(packet);
-}
+ if (!(l = memblockq_missing_to(s->memblockq, s->qlength)))
+ return;
-static void send_error(struct pstream *p, uint32_t tag, uint32_t error) {
- struct tagstruct *t = tagstruct_new(NULL, 0);
+ t = tagstruct_new(NULL, 0);
assert(t);
- tagstruct_putu32(t, COMMAND_ERROR);
- tagstruct_putu32(t, tag);
- tagstruct_putu32(t, error);
- send_tagstruct(p, t);
+ tagstruct_putu32(t, PA_COMMAND_REQUEST);
+ tagstruct_putu32(t, s->index);
+ tagstruct_putu32(t, l);
+ pstream_send_tagstruct(s->connection->pstream, t);
}
-static void send_simple_ack(struct pstream *p, uint32_t tag) {
- struct tagstruct *t = tagstruct_new(NULL, 0);
- assert(t);
- tagstruct_putu32(t, COMMAND_REPLY);
- tagstruct_putu32(t, tag);
- send_tagstruct(p, t);
+/*** sinkinput callbacks ***/
+
+static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) {
+ struct playback_stream *s;
+ assert(i && i->userdata && chunk);
+ s = i->userdata;
+
+ if (memblockq_peek(s->memblockq, chunk) < 0)
+ return -1;
+
+ return 0;
}
-struct command {
- int (*func)(struct connection *c, uint32_t tag, struct tagstruct *t);
-};
+static void sink_input_drop_cb(struct sink_input *i, size_t length) {
+ struct playback_stream *s;
+ assert(i && i->userdata && length);
+ s = i->userdata;
-static int command_create_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) {
+ memblockq_drop(s->memblockq, length);
+ request_bytes(s);
+}
+
+static void sink_input_kill_cb(struct sink_input *i) {
struct playback_stream *s;
- size_t maxlength, prebuf;
+ assert(i && i->userdata);
+ s = i->userdata;
+
+ playback_stream_free(s);
+}
+
+static uint32_t sink_input_get_latency_cb(struct sink_input *i) {
+ struct playback_stream *s;
+ assert(i && i->userdata);
+ s = i->userdata;
+
+ return pa_samples_usec(memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
+}
+
+/*** pdispatch callbacks ***/
+
+static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
+ struct playback_stream *s;
+ size_t maxlength, prebuf, qlength;
uint32_t sink_index;
const char *name;
- struct sample_spec ss;
+ struct pa_sample_spec ss;
struct tagstruct *reply;
struct sink *sink;
assert(c && t && c->protocol && c->protocol->core);
if (tagstruct_gets(t, &name) < 0 ||
tagstruct_get_sample_spec(t, &ss) < 0 ||
- tagstruct_getu32(t, &sink_index) < 0 ||
+ tagstruct_getu32(t, &sink_index) < 0 ||
+ tagstruct_getu32(t, &qlength) < 0 ||
tagstruct_getu32(t, &maxlength) < 0 ||
tagstruct_getu32(t, &prebuf) < 0 ||
!tagstruct_eof(t))
return -1;
if (!c->authorized) {
- send_error(c->pstream, tag, ERROR_ACCESS);
+ pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0;
}
@@ -178,25 +220,28 @@ static int command_create_playback_stream(struct connection *c, uint32_t tag, st
sink = idxset_get_by_index(c->protocol->core->sinks, sink_index);
if (!sink) {
- send_error(c->pstream, tag, ERROR_EXIST);
+ pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
return 0;
}
- if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, prebuf))) {
- send_error(c->pstream, tag, ERROR_ARGUMENT);
+ if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) {
+ pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
return 0;
}
reply = tagstruct_new(NULL, 0);
assert(reply);
- tagstruct_putu32(reply, COMMAND_REPLY);
+ tagstruct_putu32(reply, PA_COMMAND_REPLY);
tagstruct_putu32(reply, tag);
tagstruct_putu32(reply, s->index);
- send_tagstruct(c->pstream, reply);
+ assert(s->sink_input);
+ tagstruct_putu32(reply, s->sink_input->index);
+ pstream_send_tagstruct(c->pstream, reply);
return 0;
}
-static int command_delete_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) {
+static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
uint32_t channel;
struct playback_stream *s;
assert(c && t);
@@ -206,78 +251,50 @@ static int command_delete_playback_stream(struct connection *c, uint32_t tag, st
return -1;
if (!c->authorized) {
- send_error(c->pstream, tag, ERROR_ACCESS);
+ pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0;
}
if (!(s = idxset_get_by_index(c->playback_streams, channel))) {
- send_error(c->pstream, tag, ERROR_EXIST);
+ pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
return 0;
}
- send_simple_ack(c->pstream, tag);
+ pstream_send_simple_ack(c->pstream, tag);
return 0;
}
-static int command_exit(struct connection *c, uint32_t tag, struct tagstruct *t) {
+static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
assert(c && t);
if (!tagstruct_eof(t))
return -1;
if (!c->authorized) {
- send_error(c->pstream, tag, ERROR_ACCESS);
+ pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0;
}
- assert(c->protocol && c->protocol->core);
- mainloop_quit(c->protocol->core->mainloop, -1);
- send_simple_ack(c->pstream, tag); /* nonsense */
+ assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
+ c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
+ pstream_send_simple_ack(c->pstream, tag); /* nonsense */
return 0;
}
-static const struct command commands[] = {
- [COMMAND_ERROR] = { NULL },
- [COMMAND_REPLY] = { NULL },
- [COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
- [COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream },
- [COMMAND_CREATE_RECORD_STREAM] = { NULL },
- [COMMAND_DELETE_RECORD_STREAM] = { NULL },
- [COMMAND_EXIT] = { command_exit },
-};
+/*** pstream callbacks ***/
+
static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
struct connection *c = userdata;
- uint32_t tag, command;
- struct tagstruct *ts = NULL;
assert(p && packet && packet->data && c);
- if (packet->length <= 8)
- goto fail;
-
- ts = tagstruct_new(packet->data, packet->length);
- assert(ts);
-
- if (tagstruct_getu32(ts, &command) < 0 ||
- tagstruct_getu32(ts, &tag) < 0)
- goto fail;
-
- if (command >= COMMAND_MAX || !commands[command].func)
- send_error(p, tag, ERROR_COMMAND);
- else if (commands[command].func(c, tag, ts) < 0)
- goto fail;
+ if (pdispatch_run(c->pdispatch, packet, c) < 0) {
+ fprintf(stderr, "protocol-native: invalid packet.\n");
+ return -1;
+ }
- tagstruct_free(ts);
-
return 0;
-
-fail:
- if (ts)
- tagstruct_free(ts);
-
- fprintf(stderr, "protocol-native: invalid packet.\n");
- return -1;
-
}
static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
@@ -326,6 +343,9 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
pstream_set_recieve_memblock_callback(c->pstream, memblock_callback, c);
pstream_set_die_callback(c->pstream, die_callback, c);
+ c->pdispatch = pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
+ assert(c->pdispatch);
+
c->record_streams = idxset_new(NULL, NULL);
c->playback_streams = idxset_new(NULL, NULL);
assert(c->record_streams && c->playback_streams);
diff --git a/src/protocol-simple.c b/src/protocol-simple.c
index 8e4246cd..c8c45854 100644
--- a/src/protocol-simple.c
+++ b/src/protocol-simple.c
@@ -9,6 +9,7 @@
#include "sourceoutput.h"
#include "protocol-simple.h"
#include "client.h"
+#include "sample-util.h"
struct connection {
struct protocol_simple *protocol;
@@ -115,9 +116,10 @@ static int do_write(struct connection *c) {
/*** sink_input callbacks ***/
static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) {
- struct connection*c = i->userdata;
- assert(i && c && chunk);
-
+ struct connection*c;
+ assert(i && i->userdata && chunk);
+ c = i->userdata;
+
if (memblockq_peek(c->input_memblockq, chunk) < 0)
return -1;
@@ -143,7 +145,7 @@ static void sink_input_kill_cb(struct sink_input *i) {
static uint32_t sink_input_get_latency_cb(struct sink_input *i) {
struct connection*c = i->userdata;
assert(i && c);
- return samples_usec(memblockq_get_length(c->input_memblockq), &DEFAULT_SAMPLE_SPEC);
+ return pa_samples_usec(memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
}
/*** source_output callbacks ***/
@@ -185,6 +187,7 @@ static void io_callback(struct iochannel*io, void *userdata) {
static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
struct protocol_simple *p = userdata;
struct connection *c = NULL;
+ char cname[256];
assert(s && io && p);
c = malloc(sizeof(struct connection));
@@ -195,7 +198,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->input_memblockq = c->output_memblockq = NULL;
c->protocol = p;
- c->client = client_new(p->core, "SIMPLE", "Client");
+ iochannel_peer_to_string(io, cname, sizeof(cname));
+ c->client = client_new(p->core, "SIMPLE", cname);
assert(c->client);
c->client->kill = client_kill_cb;
c->client->userdata = c;
@@ -215,8 +219,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->source_output->kill = source_output_kill_cb;
c->source_output->userdata = c;
- l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
- c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
+ l = 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
+ c->output_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
}
if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
@@ -236,8 +240,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->sink_input->get_latency = sink_input_get_latency_cb;
c->sink_input->userdata = c;
- l = bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
- c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
+ l = pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
+ c->input_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
}
diff --git a/src/pstream-util.c b/src/pstream-util.c
new file mode 100644
index 00000000..2fab2b61
--- /dev/null
+++ b/src/pstream-util.c
@@ -0,0 +1,35 @@
+#include <assert.h>
+
+#include "protocol-native-spec.h"
+#include "pstream-util.h"
+
+void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t) {
+ size_t length;
+ uint8_t *data;
+ struct packet *packet;
+ assert(p && t);
+
+ data = tagstruct_free_data(t, &length);
+ assert(data && length);
+ packet = packet_new_dynamic(data, length);
+ assert(packet);
+ pstream_send_packet(p, packet);
+ packet_unref(packet);
+}
+
+void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error) {
+ struct tagstruct *t = tagstruct_new(NULL, 0);
+ assert(t);
+ tagstruct_putu32(t, PA_COMMAND_ERROR);
+ tagstruct_putu32(t, tag);
+ tagstruct_putu32(t, error);
+ pstream_send_tagstruct(p, t);
+}
+
+void pstream_send_simple_ack(struct pstream *p, uint32_t tag) {
+ struct tagstruct *t = tagstruct_new(NULL, 0);
+ assert(t);
+ tagstruct_putu32(t, PA_COMMAND_REPLY);
+ tagstruct_putu32(t, tag);
+ pstream_send_tagstruct(p, t);
+}
diff --git a/src/pstream-util.h b/src/pstream-util.h
new file mode 100644
index 00000000..4e64a95c
--- /dev/null
+++ b/src/pstream-util.h
@@ -0,0 +1,14 @@
+#ifndef foopstreamutilhfoo
+#define foopstreamutilhfoo
+
+#include <inttypes.h>
+#include "pstream.h"
+#include "tagstruct.h"
+
+/* The tagstruct is freed!*/
+void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t);
+
+void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error);
+void pstream_send_simple_ack(struct pstream *p, uint32_t tag);
+
+#endif
diff --git a/src/pstream.c b/src/pstream.c
index a63e126d..4a3a648b 100644
--- a/src/pstream.c
+++ b/src/pstream.c
@@ -30,7 +30,7 @@ struct item_info {
};
struct pstream {
- struct mainloop *mainloop;
+ struct pa_mainloop_api *mainloop;
struct mainloop_source *mainloop_source;
struct iochannel *io;
struct queue *send_queue;
@@ -70,18 +70,24 @@ static void do_read(struct pstream *p);
static void io_callback(struct iochannel*io, void *userdata) {
struct pstream *p = userdata;
assert(p && p->io == io);
+
+ p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
+
do_write(p);
do_read(p);
}
-static void prepare_callback(struct mainloop_source *s, void*userdata) {
+static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) {
struct pstream *p = userdata;
- assert(p && p->mainloop_source == s);
+ assert(p && p->mainloop_source == id && p->mainloop == m);
+
+ p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
+
do_write(p);
do_read(p);
}
-struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
+struct pstream *pstream_new(struct pa_mainloop_api *m, struct iochannel *io) {
struct pstream *p;
assert(io);
@@ -96,8 +102,8 @@ struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
p->die_callback_userdata = NULL;
p->mainloop = m;
- p->mainloop_source = mainloop_source_new_fixed(m, prepare_callback, p);
- mainloop_source_enable(p->mainloop_source, 0);
+ p->mainloop_source = m->source_fixed(m, fixed_callback, p);
+ m->enable_fixed(m, p->mainloop_source, 0);
p->send_queue = queue_new();
assert(p->send_queue);
@@ -152,7 +158,7 @@ void pstream_free(struct pstream *p) {
if (p->read.packet)
packet_unref(p->read.packet);
- mainloop_source_free(p->mainloop_source);
+ p->mainloop->cancel_fixed(p->mainloop, p->mainloop_source);
free(p);
}
@@ -173,7 +179,7 @@ void pstream_send_packet(struct pstream*p, struct packet *packet) {
i->packet = packet_ref(packet);
queue_push(p->send_queue, i);
- mainloop_source_enable(p->mainloop_source, 1);
+ p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
}
void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
@@ -190,7 +196,7 @@ void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, st
memblock_ref(i->chunk.memblock);
queue_push(p->send_queue, i);
- mainloop_source_enable(p->mainloop_source, 1);
+ p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
}
void pstream_set_recieve_packet_callback(struct pstream *p, int (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) {
@@ -219,7 +225,7 @@ static void prepare_next_write_item(struct pstream *p) {
assert(p->write.current->packet);
p->write.data = p->write.current->packet->data;
p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
- p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;
} else {
assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
@@ -236,8 +242,6 @@ static void do_write(struct pstream *p) {
ssize_t r;
assert(p);
- mainloop_source_enable(p->mainloop_source, 0);
-
if (p->dead || !iochannel_is_writable(p->io))
return;
@@ -285,8 +289,6 @@ static void do_read(struct pstream *p) {
ssize_t r;
assert(p);
- mainloop_source_enable(p->mainloop_source, 0);
-
if (p->dead || !iochannel_is_readable(p->io))
return;
@@ -313,7 +315,7 @@ static void do_read(struct pstream *p) {
assert(!p->read.packet && !p->read.memblock);
- if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == 0) {
+ if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
/* Frame is a packet frame */
p->read.packet = packet_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]));
assert(p->read.packet);
@@ -331,7 +333,7 @@ static void do_read(struct pstream *p) {
if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
size_t l;
- l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r;
+ l = (p->read.index - r) < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
if (l > 0) {
struct memchunk chunk;
diff --git a/src/pstream.h b/src/pstream.h
index 7113681e..d418908e 100644
--- a/src/pstream.h
+++ b/src/pstream.h
@@ -6,10 +6,11 @@
#include "packet.h"
#include "memblock.h"
#include "iochannel.h"
+#include "mainloop-api.h"
struct pstream;
-struct pstream* pstream_new(struct mainloop *m, struct iochannel *io);
+struct pstream* pstream_new(struct pa_mainloop_api *m, struct iochannel *io);
void pstream_free(struct pstream*p);
void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata);
diff --git a/src/queue.c b/src/queue.c
index 90823ae6..5c2e7a67 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -73,5 +73,12 @@ void* queue_pop(struct queue *q) {
p = e->data;
free(e);
+ q->length--;
+
return p;
}
+
+int queue_is_empty(struct queue *q) {
+ assert(q);
+ return q->length == 0;
+}
diff --git a/src/sample-util.c b/src/sample-util.c
new file mode 100644
index 00000000..7a3c267a
--- /dev/null
+++ b/src/sample-util.c
@@ -0,0 +1,88 @@
+#include <string.h>
+#include <assert.h>
+
+#include "sample-util.h"
+
+struct pa_sample_spec default_sample_spec = {
+ .format = SAMPLE_S16NE,
+ .rate = 44100,
+ .channels = 2
+};
+
+struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec) {
+ assert(b && b->data && spec);
+ memblock_assert_exclusive(b);
+ silence_memory(b->data, b->length, spec);
+ return b;
+}
+
+void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec) {
+ assert(c && c->memblock && c->memblock->data && spec && c->length);
+ memblock_assert_exclusive(c->memblock);
+ silence_memory(c->memblock->data+c->index, c->length, spec);
+}
+
+void silence_memory(void *p, size_t length, struct pa_sample_spec *spec) {
+ char c = 0;
+ assert(p && length && spec);
+
+ switch (spec->format) {
+ case SAMPLE_U8:
+ c = 127;
+ break;
+ case SAMPLE_S16LE:
+ case SAMPLE_S16BE:
+ case SAMPLE_FLOAT32:
+ c = 0;
+ break;
+ case SAMPLE_ALAW:
+ case SAMPLE_ULAW:
+ c = 80;
+ break;
+ }
+
+ memset(p, c, length);
+}
+
+size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume) {
+ unsigned c, d;
+ assert(channels && data && length && spec);
+ assert(spec->format == SAMPLE_S16NE);
+
+ for (d = 0;; d += sizeof(int16_t)) {
+ int32_t sum = 0;
+
+ if (d >= length)
+ return d;
+
+ for (c = 0; c < nchannels; c++) {
+ int32_t v;
+ uint8_t volume = channels[c].volume;
+
+ if (d >= channels[c].chunk.length)
+ return d;
+
+ if (volume == 0)
+ v = 0;
+ else {
+ v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d));
+
+ if (volume != 0xFF)
+ v = v*volume/0xFF;
+ }
+
+ sum += v;
+ }
+
+ if (volume == 0)
+ sum = 0;
+ else if (volume != 0xFF)
+ sum = sum*volume/0xFF;
+
+ if (sum < -0x8000) sum = -0x8000;
+ if (sum > 0x7FFF) sum = 0x7FFF;
+
+ *((int16_t*) data) = sum;
+ data += sizeof(int16_t);
+ }
+}
diff --git a/src/sample-util.h b/src/sample-util.h
new file mode 100644
index 00000000..0a3f7c89
--- /dev/null
+++ b/src/sample-util.h
@@ -0,0 +1,23 @@
+#ifndef foosampleutilhfoo
+#define foosampleutilhfoo
+
+#include "sample.h"
+#include "memblock.h"
+
+#define DEFAULT_SAMPLE_SPEC default_sample_spec
+
+extern struct pa_sample_spec default_sample_spec;
+
+struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec);
+void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec);
+void silence_memory(void *p, size_t length, struct pa_sample_spec *spec);
+
+struct mix_info {
+ struct memchunk chunk;
+ uint8_t volume;
+ void *userdata;
+};
+
+size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume);
+
+#endif
diff --git a/src/sample.c b/src/sample.c
index c270f255..2454630c 100644
--- a/src/sample.c
+++ b/src/sample.c
@@ -1,50 +1,8 @@
-#include <string.h>
#include <assert.h>
#include "sample.h"
-struct sample_spec default_sample_spec = {
- .format = SAMPLE_S16NE,
- .rate = 44100,
- .channels = 2
-};
-
-struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec) {
- assert(b && b->data && spec);
- memblock_assert_exclusive(b);
- silence_memory(b->data, b->length, spec);
- return b;
-}
-
-void silence_memchunk(struct memchunk *c, struct sample_spec *spec) {
- assert(c && c->memblock && c->memblock->data && spec && c->length);
- memblock_assert_exclusive(c->memblock);
- silence_memory(c->memblock->data+c->index, c->length, spec);
-}
-
-void silence_memory(void *p, size_t length, struct sample_spec *spec) {
- char c = 0;
- assert(p && length && spec);
-
- switch (spec->format) {
- case SAMPLE_U8:
- c = 127;
- break;
- case SAMPLE_S16LE:
- case SAMPLE_S16BE:
- case SAMPLE_FLOAT32:
- c = 0;
- break;
- case SAMPLE_ALAW:
- case SAMPLE_ULAW:
- c = 80;
- break;
- }
-
- memset(p, c, length);
-}
-
-size_t sample_size(struct sample_spec *spec) {
+size_t pa_sample_size(struct pa_sample_spec *spec) {
assert(spec);
size_t b = 1;
@@ -66,56 +24,14 @@ size_t sample_size(struct sample_spec *spec) {
return b * spec->channels;
}
-size_t bytes_per_second(struct sample_spec *spec) {
+size_t pa_bytes_per_second(struct pa_sample_spec *spec) {
assert(spec);
- return spec->rate*sample_size(spec);
+ return spec->rate*pa_sample_size(spec);
}
-size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume) {
- unsigned c, d;
- assert(channels && data && length && spec);
- assert(spec->format == SAMPLE_S16NE);
-
- for (d = 0;; d += sizeof(int16_t)) {
- int32_t sum = 0;
-
- if (d >= length)
- return d;
-
- for (c = 0; c < nchannels; c++) {
- int32_t v;
- uint8_t volume = channels[c].volume;
-
- if (d >= channels[c].chunk.length)
- return d;
-
- if (volume == 0)
- v = 0;
- else {
- v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d));
-
- if (volume != 0xFF)
- v = v*volume/0xFF;
- }
-
- sum += v;
- }
-
- if (volume == 0)
- sum = 0;
- else if (volume != 0xFF)
- sum = sum*volume/0xFF;
-
- if (sum < -0x8000) sum = -0x8000;
- if (sum > 0x7FFF) sum = 0x7FFF;
-
- *((int16_t*) data) = sum;
- data += sizeof(int16_t);
- }
-}
-uint32_t samples_usec(size_t length, struct sample_spec *spec) {
+uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec) {
assert(spec);
- return (uint32_t) (((double) length /sample_size(spec))/spec->rate*1000000);
+ return (uint32_t) (((double) length /pa_sample_size(spec))/spec->rate*1000000);
}
diff --git a/src/sample.h b/src/sample.h
index b2f13cc4..a4a973bf 100644
--- a/src/sample.h
+++ b/src/sample.h
@@ -2,10 +2,9 @@
#define foosamplehfoo
#include <inttypes.h>
+#include <sys/types.h>
-#include "memblock.h"
-
-enum sample_format {
+enum pa_sample_format {
SAMPLE_U8,
SAMPLE_ALAW,
SAMPLE_ULAW,
@@ -16,30 +15,14 @@ enum sample_format {
#define SAMPLE_S16NE SAMPLE_S16LE
-struct sample_spec {
- enum sample_format format;
+struct pa_sample_spec {
+ enum pa_sample_format format;
uint32_t rate;
uint8_t channels;
};
-#define DEFAULT_SAMPLE_SPEC default_sample_spec
-
-extern struct sample_spec default_sample_spec;
-
-struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec);
-void silence_memchunk(struct memchunk *c, struct sample_spec *spec);
-void silence_memory(void *p, size_t length, struct sample_spec *spec);
-
-struct mix_info {
- struct memchunk chunk;
- uint8_t volume;
- void *userdata;
-};
-
-size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume);
-
-size_t bytes_per_second(struct sample_spec *spec);
-size_t sample_size(struct sample_spec *spec);
-uint32_t samples_usec(size_t length, struct sample_spec *spec);
+size_t pa_bytes_per_second(struct pa_sample_spec *spec);
+size_t pa_sample_size(struct pa_sample_spec *spec);
+uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec);
#endif
diff --git a/src/simple.c b/src/simple.c
new file mode 100644
index 00000000..a90d22bd
--- /dev/null
+++ b/src/simple.c
@@ -0,0 +1,120 @@
+#include "simple.h"
+#include "polyp.h"
+#include "mainloop.h"
+
+struct pa_simple {
+ struct mainloop *mainloop;
+ struct pa_context *context;
+ struct pa_stream *stream;
+
+ size_t requested;
+ int dead;
+};
+
+static void playback_callback(struct pa_stream *p, size_t length, void *userdata) {
+ struct pa_stream *sp = userdata;
+ assert(p && length && sp);
+
+ sp->requested = length;
+}
+
+struct pa_simple* pa_simple_new(
+ const char *server,
+ const char *name,
+ enum pa_stream_direction dir,
+ const char *dev,
+ const char *stream_name,
+ const struct pa_sample_spec *ss,
+ const struct pa_buffer_attr *attr) {
+
+ struct pa_simple *p;
+ assert(ss);
+
+ p = malloc(sizeof(struct pa_simple));
+ assert(p);
+ p->context = NULL;
+ p->stream = NULL;
+ p->mainloop = pa_mainloop_new();
+ assert(p->mainloop);
+ p->requested = 0;
+ p->dead = 0;
+
+ if (!(p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), name)))
+ goto fail;
+
+ if (pa_context_connect(c, server, NULL, NULL) < 0)
+ goto fail;
+
+ while (!pa_context_is_ready(c)) {
+ if (pa_context_is_dead(c))
+ goto fail;
+
+ if (mainloop_iterate(p->mainloop) < 0)
+ goto fail;
+ }
+
+ if (!(p->stream = pa_stream_new(p->context, dir, sink, stream_name, ss, attr, NULL, NULL)))
+ goto fail;
+
+ while (!pa_stream_is_ready(c)) {
+ if (pa_stream_is_dead(c))
+ goto fail;
+
+ if (mainloop_iterate(p->mainloop) < 0)
+ goto fail;
+ }
+
+ pa_stream_set_write_callback(p->stream, playback_callback, p);
+
+ return p;
+
+fail:
+ pa_simple_free(p);
+ return NULL;
+}
+
+void pa_simple_free(struct pa_simple *s) {
+ assert(s);
+
+ if (s->stream)
+ pa_stream_free(s->stream);
+
+ if (s->context)
+ pa_context_free(s->context);
+
+ if (s->mainloop)
+ mainloop_free(s->mainloop);
+
+ free(s);
+}
+
+int pa_simple_write(struct pa_simple *s, const void*data, size_t length) {
+ assert(s && data);
+
+ while (length > 0) {
+ size_t l;
+
+ while (!s->requested) {
+ if (pa_context_is_dead(c))
+ return -1;
+
+ if (mainloop_iterate(s->mainloop) < 0)
+ return -1;
+ }
+
+ l = length;
+ if (l > s->requested)
+ l = s->requested;
+
+ pa_stream_write(s->stream, data, l);
+ data += l;
+ length -= l;
+ s->requested = -l;
+ }
+
+ return 0;
+}
+
+int pa_simple_read(struct pa_simple *s, const void*data, size_t length) {
+ assert(0);
+}
diff --git a/src/simple.h b/src/simple.h
new file mode 100644
index 00000000..80693056
--- /dev/null
+++ b/src/simple.h
@@ -0,0 +1,25 @@
+#ifndef foosimplehfoo
+#define foosimplehfoo
+
+#include <sys/types.h>
+
+#include "sample.h"
+#include "polypdef.h"
+
+struct pa_simple;
+
+struct pa_simple* pa_simple_new(
+ const char *server,
+ const char *name,
+ enum pa_stream_direction dir,
+ const char *dev,
+ const char *stream_name,
+ const struct pa_sample_spec *ss,
+ const struct pa_buffer_attr *attr);
+
+void pa_simple_free(struct pa_simple *s);
+
+int pa_simple_write(struct pa_simple *s, const void*data, size_t length);
+int pa_simple_read(struct pa_simple *s, const void*data, size_t length);
+
+#endif
diff --git a/src/sink.c b/src/sink.c
index cd12b463..a334424c 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -6,10 +6,11 @@
#include "sink.h"
#include "sinkinput.h"
#include "strbuf.h"
+#include "sample-util.h"
#define MAX_MIX_CHANNELS 32
-struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) {
+struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {
struct sink *s;
char *n = NULL;
int r;
diff --git a/src/sink.h b/src/sink.h
index 394abb5b..d9f80059 100644
--- a/src/sink.h
+++ b/src/sink.h
@@ -15,7 +15,7 @@ struct sink {
char *name;
struct core *core;
- struct sample_spec sample_spec;
+ struct pa_sample_spec sample_spec;
struct idxset *inputs;
struct source *monitor_source;
@@ -27,7 +27,7 @@ struct sink {
void *userdata;
};
-struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec);
+struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec);
void sink_free(struct sink* s);
int sink_render(struct sink*s, size_t length, struct memchunk *result);
diff --git a/src/sinkinput.c b/src/sinkinput.c
index 2e6a8c36..b81c9c71 100644
--- a/src/sinkinput.c
+++ b/src/sinkinput.c
@@ -5,7 +5,7 @@
#include "sinkinput.h"
#include "strbuf.h"
-struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name) {
+struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name) {
struct sink_input *i;
int r;
assert(s && spec);
@@ -14,7 +14,7 @@ struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, cons
assert(i);
i->name = name ? strdup(name) : NULL;
i->sink = s;
- i->spec = *spec;
+ i->sample_spec = *spec;
i->peek = NULL;
i->drop = NULL;
diff --git a/src/sinkinput.h b/src/sinkinput.h
index 389d832d..f04ecb95 100644
--- a/src/sinkinput.h
+++ b/src/sinkinput.h
@@ -12,7 +12,7 @@ struct sink_input {
char *name;
struct sink *sink;
- struct sample_spec spec;
+ struct pa_sample_spec sample_spec;
uint8_t volume;
int (*peek) (struct sink_input *i, struct memchunk *chunk);
@@ -23,7 +23,7 @@ struct sink_input {
void *userdata;
};
-struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name);
+struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name);
void sink_input_free(struct sink_input* i);
/* Code that didn't create the input stream should call this function to
diff --git a/src/socket-client.c b/src/socket-client.c
new file mode 100644
index 00000000..812c43f3
--- /dev/null
+++ b/src/socket-client.c
@@ -0,0 +1,177 @@
+#include <unistd.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "socket-client.h"
+#include "util.h"
+
+struct socket_client {
+ struct pa_mainloop_api *mainloop;
+ int fd;
+
+ void *io_source, *fixed_source;
+ void (*callback)(struct socket_client*c, struct iochannel *io, void *userdata);
+ void *userdata;
+};
+
+static struct socket_client*socket_client_new(struct pa_mainloop_api *m) {
+ struct socket_client *c;
+ assert(m);
+
+ c = malloc(sizeof(struct socket_client));
+ assert(c);
+ c->mainloop = m;
+ c->fd = -1;
+ c->io_source = c->fixed_source = NULL;
+ c->callback = NULL;
+ c->userdata = NULL;
+ return c;
+}
+
+static void do_call(struct socket_client *c) {
+ struct iochannel *io;
+ int error, lerror;
+ assert(c && c->callback);
+
+ lerror = sizeof(error);
+ if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &error, &lerror) < 0) {
+ fprintf(stderr, "getsockopt(): %s\n", strerror(errno));
+ goto failed;
+ }
+
+ if (lerror != sizeof(error)) {
+ fprintf(stderr, "getsocktop() returned invalid size.\n");
+ goto failed;
+ }
+
+ if (error != 0) {
+ fprintf(stderr, "connect(): %s\n", strerror(error));
+ goto failed;
+ }
+
+ io = iochannel_new(c->mainloop, c->fd, c->fd);
+ assert(io);
+ c->fd = -1;
+ c->callback(c, io, c->userdata);
+
+ return;
+
+failed:
+ close(c->fd);
+ c->fd = -1;
+ c->callback(c, NULL, c->userdata);
+ return;
+}
+
+static void connect_fixed_cb(struct pa_mainloop_api *m, void *id, void *userdata) {
+ struct socket_client *c = userdata;
+ assert(m && c && c->fixed_source == id);
+ m->cancel_fixed(m, c->fixed_source);
+ c->fixed_source = NULL;
+ do_call(c);
+}
+
+static void connect_io_cb(struct pa_mainloop_api*m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
+ struct socket_client *c = userdata;
+ assert(m && c && c->io_source == id && fd >= 0 && events == PA_MAINLOOP_API_IO_EVENT_OUTPUT);
+ m->cancel_io(m, c->io_source);
+ c->io_source = NULL;
+ do_call(c);
+}
+
+static int do_connect(struct socket_client *c, const struct sockaddr *sa, socklen_t len) {
+ int r;
+ assert(c && sa && len);
+
+ make_nonblock_fd(c->fd);
+
+ if ((r = connect(c->fd, sa, len)) < 0) {
+ if (r != EINPROGRESS) {
+ fprintf(stderr, "connect(): %s\n", strerror(errno));
+ return -1;
+ }
+
+ c->io_source = c->mainloop->source_io(c->mainloop, c->fd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, connect_io_cb, c);
+ assert(c->io_source);
+ } else {
+ c->fixed_source = c->mainloop->source_fixed(c->mainloop, connect_fixed_cb, c);
+ assert(c->io_source);
+ }
+
+ return 0;
+}
+
+struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {
+ struct socket_client *c;
+ struct sockaddr_in sa;
+
+ c = socket_client_new(m);
+ assert(c);
+
+ if ((c->fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+ fprintf(stderr, "socket(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ sa.sin_addr.s_addr = htonl(address);
+
+ if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0)
+ goto fail;
+
+ return c;
+
+fail:
+ socket_client_free(c);
+ return NULL;
+}
+
+struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename) {
+ struct socket_client *c;
+ struct sockaddr_un sa;
+
+ c = socket_client_new(m);
+ assert(c);
+
+ if ((c->fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) {
+ fprintf(stderr, "socket(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ sa.sun_family = AF_LOCAL;
+ strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1);
+ sa.sun_path[sizeof(sa.sun_path) - 1] = 0;
+
+ if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0)
+ goto fail;
+
+ return c;
+
+fail:
+ socket_client_free(c);
+ return NULL;
+}
+
+void socket_client_free(struct socket_client *c) {
+ assert(c && c->mainloop);
+ if (c->io_source)
+ c->mainloop->cancel_io(c->mainloop, c->io_source);
+ if (c->fixed_source)
+ c->mainloop->cancel_fixed(c->mainloop, c->fixed_source);
+ if (c->fd >= 0)
+ close(c->fd);
+ free(c);
+}
+
+void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata) {
+ assert(c);
+ c->callback = on_connection;
+ c->userdata = userdata;
+}
diff --git a/src/socket-client.h b/src/socket-client.h
new file mode 100644
index 00000000..4de01e34
--- /dev/null
+++ b/src/socket-client.h
@@ -0,0 +1,17 @@
+#ifndef foosocketclienthfoo
+#define foosocketclienthfoo
+
+#include <inttypes.h>
+#include "mainloop-api.h"
+#include "iochannel.h"
+
+struct socket_client;
+
+struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
+struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename);
+
+void socket_client_free(struct socket_client *c);
+
+void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata);
+
+#endif
diff --git a/src/socket-server.c b/src/socket-server.c
index 6ad225e3..87fe1476 100644
--- a/src/socket-server.c
+++ b/src/socket-server.c
@@ -19,14 +19,15 @@ struct socket_server {
void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata);
void *userdata;
- struct mainloop_source *mainloop_source;
+ void *mainloop_source;
+ struct pa_mainloop_api *mainloop;
};
-static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) {
+static void callback(struct pa_mainloop_api *mainloop, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct socket_server *s = userdata;
struct iochannel *io;
int nfd;
- assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s);
+ assert(s && s->mainloop == mainloop && s->mainloop_source == id && id && fd >= 0 && fd == s->fd && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
if ((nfd = accept(fd, NULL, NULL)) < 0) {
fprintf(stderr, "accept(): %s\n", strerror(errno));
@@ -38,12 +39,12 @@ static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event
return;
}
- io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd);
+ io = iochannel_new(s->mainloop, nfd, nfd);
assert(io);
s->on_connection(s, io, s->userdata);
}
-struct socket_server* socket_server_new(struct mainloop *m, int fd) {
+struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd) {
struct socket_server *s;
assert(m && fd >= 0);
@@ -54,13 +55,14 @@ struct socket_server* socket_server_new(struct mainloop *m, int fd) {
s->on_connection = NULL;
s->userdata = NULL;
- s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s);
+ s->mainloop = m;
+ s->mainloop_source = m->source_io(m, fd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, s);
assert(s->mainloop_source);
return s;
}
-struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) {
+struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename) {
int fd = -1;
struct sockaddr_un sa;
struct socket_server *s;
@@ -101,7 +103,7 @@ fail:
return NULL;
}
-struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) {
+struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {
int fd = -1;
struct sockaddr_in sa;
int on = 1;
@@ -148,7 +150,8 @@ void socket_server_free(struct socket_server*s) {
free(s->filename);
}
- mainloop_source_free(s->mainloop_source);
+
+ s->mainloop->cancel_io(s->mainloop, s->mainloop_source);
free(s);
}
diff --git a/src/socket-server.h b/src/socket-server.h
index 4814fc62..80895f8d 100644
--- a/src/socket-server.h
+++ b/src/socket-server.h
@@ -2,14 +2,14 @@
#define foosocketserverhfoo
#include <inttypes.h>
-#include "mainloop.h"
+#include "mainloop-api.h"
#include "iochannel.h"
struct socket_server;
-struct socket_server* socket_server_new(struct mainloop *m, int fd);
-struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename);
-struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port);
+struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd);
+struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename);
+struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
void socket_server_free(struct socket_server*s);
diff --git a/src/source.c b/src/source.c
index 44d77532..21ac24f3 100644
--- a/src/source.c
+++ b/src/source.c
@@ -7,7 +7,7 @@
#include "sourceoutput.h"
#include "strbuf.h"
-struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) {
+struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {
struct source *s;
int r;
assert(core && spec);
diff --git a/src/source.h b/src/source.h
index 078fb1c9..04f3984f 100644
--- a/src/source.h
+++ b/src/source.h
@@ -14,14 +14,14 @@ struct source {
char *name;
struct core *core;
- struct sample_spec sample_spec;
+ struct pa_sample_spec sample_spec;
struct idxset *outputs;
void (*notify)(struct source*source);
void *userdata;
};
-struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec);
+struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec);
void source_free(struct source *s);
/* Pass a new memory block to all output streams */
diff --git a/src/sourceoutput.c b/src/sourceoutput.c
index 8021b522..e2e1dacc 100644
--- a/src/sourceoutput.c
+++ b/src/sourceoutput.c
@@ -5,7 +5,7 @@
#include "sourceoutput.h"
#include "strbuf.h"
-struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name) {
+struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name) {
struct source_output *o;
int r;
assert(s && spec);
@@ -14,7 +14,7 @@ struct source_output* source_output_new(struct source *s, struct sample_spec *sp
assert(o);
o->name = name ? strdup(name) : NULL;
o->source = s;
- o->spec = *spec;
+ o->sample_spec = *spec;
o->push = NULL;
o->kill = NULL;
diff --git a/src/sourceoutput.h b/src/sourceoutput.h
index 359ff151..50cb9caf 100644
--- a/src/sourceoutput.h
+++ b/src/sourceoutput.h
@@ -12,7 +12,7 @@ struct source_output {
char *name;
struct source *source;
- struct sample_spec spec;
+ struct pa_sample_spec sample_spec;
void (*push)(struct source_output *o, struct memchunk *chunk);
void (*kill)(struct source_output* o);
@@ -20,7 +20,7 @@ struct source_output {
void *userdata;
};
-struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name);
+struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name);
void source_output_free(struct source_output* o);
void source_output_kill(struct source_output*o);
diff --git a/src/tagstruct.c b/src/tagstruct.c
index 429dd408..47e17839 100644
--- a/src/tagstruct.c
+++ b/src/tagstruct.c
@@ -90,7 +90,7 @@ void tagstruct_putu8(struct tagstruct*t, uint8_t c) {
t->length += 2;
}
-void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss) {
+void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss) {
assert(t && ss);
extend(t, 7);
t->data[t->length] = TAG_SAMPLE_SPEC;
@@ -156,7 +156,7 @@ int tagstruct_getu8(struct tagstruct*t, uint8_t *c) {
return 0;
}
-int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss) {
+int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss) {
assert(t && ss);
if (t->rindex+7 > t->length)
diff --git a/src/tagstruct.h b/src/tagstruct.h
index 5572c64c..9f6a0bf4 100644
--- a/src/tagstruct.h
+++ b/src/tagstruct.h
@@ -15,12 +15,12 @@ uint8_t* tagstruct_free_data(struct tagstruct*t, size_t *l);
void tagstruct_puts(struct tagstruct*t, const char *s);
void tagstruct_putu32(struct tagstruct*t, uint32_t i);
void tagstruct_putu8(struct tagstruct*t, uint8_t c);
-void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss);
+void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss);
int tagstruct_gets(struct tagstruct*t, const char **s);
int tagstruct_getu32(struct tagstruct*t, uint32_t *i);
int tagstruct_getu8(struct tagstruct*t, uint8_t *c);
-int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss);
+int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss);
int tagstruct_eof(struct tagstruct*t);
const uint8_t* tagstruct_data(struct tagstruct*t, size_t *l);
diff --git a/src/todo b/src/todo
index aeb7ae5f..47344ab4 100644
--- a/src/todo
+++ b/src/todo
@@ -1,8 +1,10 @@
+- sync() function in native library
+- name registrar
- native protocol/library
- simple control protocol: kill client/input/output; set_volume
- resampling
- esound protocol
-- config parser
+- config parser/cmdline
- record testing
-- 0.1
- optimierung von rebuild_pollfds()
diff --git a/src/util.c b/src/util.c
new file mode 100644
index 00000000..0383a0ad
--- /dev/null
+++ b/src/util.c
@@ -0,0 +1,62 @@
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+
+#include "util.h"
+
+void make_nonblock_fd(int fd) {
+ int v;
+
+ if ((v = fcntl(fd, F_GETFL)) >= 0)
+ if (!(v & O_NONBLOCK))
+ fcntl(fd, F_SETFL, v|O_NONBLOCK);
+}
+
+void peer_to_string(char *c, size_t l, int fd) {
+ struct stat st;
+
+ assert(c && l && fd >= 0);
+
+ if (fstat(fd, &st) < 0) {
+ snprintf(c, l, "Invalid client fd");
+ return;
+ }
+
+ if (S_ISSOCK(st.st_mode)) {
+ union {
+ struct sockaddr sa;
+ struct sockaddr_in in;
+ struct sockaddr_un un;
+ } sa;
+ socklen_t sa_len = sizeof(sa);
+
+ if (getpeername(fd, &sa.sa, &sa_len) >= 0) {
+
+ if (sa.sa.sa_family == AF_INET) {
+ uint32_t ip = ntohl(sa.in.sin_addr.s_addr);
+
+ snprintf(c, l, "TCP/IP client from %i.%i.%i.%i:%u",
+ ip >> 24,
+ (ip >> 16) & 0xFF,
+ (ip >> 8) & 0xFF,
+ ip & 0xFF,
+ ntohs(sa.in.sin_port));
+ return;
+ } else if (sa.sa.sa_family == AF_LOCAL) {
+ snprintf(c, l, "UNIX client for %s", sa.un.sun_path);
+ return;
+ }
+
+ }
+ snprintf(c, l, "Unknown network client");
+ return;
+ } else if (S_ISCHR(st.st_mode) && (fd == 0 || fd == 1)) {
+ snprintf(c, l, "STDIN/STDOUT client");
+ return;
+ }
+
+ snprintf(c, l, "Unknown client");
+}
diff --git a/src/util.h b/src/util.h
new file mode 100644
index 00000000..830ee2e0
--- /dev/null
+++ b/src/util.h
@@ -0,0 +1,8 @@
+#ifndef fooutilhfoo
+#define fooutilhfoo
+
+void make_nonblock_fd(int fd);
+
+void peer_to_string(char *c, size_t l, int fd);
+
+#endif