From 30562d7cba51a9879489ce88840c5f67ff873026 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Thu, 17 May 2007 23:42:28 +0000 Subject: basic threading model git-svn-id: file:///home/lennart/svn/public/libsydney/trunk@27 9ba3c220-e4d3-45a2-8aa3-73fcc9aff6ce --- Makefile | 11 ++++--- TODO | 2 ++ common.c | 25 ++++++++++++++-- common.h | 2 ++ driver.h | 4 ++- macro.h | 1 + once.c | 2 +- oss.c | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- sydney.h | 7 +++-- test-pull.c | 73 ++++++++++++++++++++++++++++++++++++++++++++++ test-sine.c | 20 ++++++------- 11 files changed, 222 insertions(+), 22 deletions(-) create mode 100644 test-pull.c diff --git a/Makefile b/Makefile index cd6deb4..75e9f41 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ CFLAGS=-Wall -O0 -g -W -Wno-unused-parameter `pkg-config --cflags liboil-0.3` -DRANDOM_PREFIX=sa -DOUTSIDE_SPEEX -D_GNU_SOURCE -pthread LIBS=-lm `pkg-config --libs liboil-0.3` -SOURCES=common.c malloc.c test-sine.c oss.c bbuffer.c format.c volscale.c byteswap.c continued-fraction.c zero.c add.c speex/resample.c resample.c interleave.c converter.c g711.c mutex.c once.c thread.c +SOURCES=common.c malloc.c oss.c bbuffer.c format.c volscale.c byteswap.c continued-fraction.c zero.c add.c speex/resample.c resample.c interleave.c converter.c g711.c mutex.c once.c thread.c OBJS=$(SOURCES:.c=.o) -all: test-bufferq test-llist test-sine +all: test-bufferq test-llist test-sine test-pull test-bufferq: test-bufferq.o bufferq.o $(CC) $(CFLAGS) -o $@ $^ @@ -12,7 +12,10 @@ test-bufferq: test-bufferq.o bufferq.o test-llist: test-llist.o $(CC) $(CFLAGS) -o $@ $^ -test-sine: $(OBJS) +test-sine: $(OBJS) test-sine.o + $(CC) $(CFLAGS) -o $@ $^ $(LIBS) + +test-pull: $(OBJS) test-pull.o $(CC) $(CFLAGS) -o $@ $^ $(LIBS) *.o: *.h @@ -22,7 +25,7 @@ indent: # astyle --indent=spaces=4 --brackets=attach --indent-switches --max-instatement-indent=40 --pad=oper --unpad=paren --convert-tabs --mode=c < oss.c clean: - rm -f *.o meta-name-table.h test-bufferq test-llist test-sine + rm -f *.o meta-name-table.h test-bufferq test-llist test-sine core test-pull common.o: meta-name-table.h diff --git a/TODO b/TODO index 982b82c..0358748 100644 --- a/TODO +++ b/TODO @@ -19,3 +19,5 @@ * s/sa_/syd_/g * vbr + +* should we notify about local volume changes? diff --git a/common.c b/common.c index a08316c..2470cca 100644 --- a/common.c +++ b/common.c @@ -323,15 +323,36 @@ int sa_stream_set_driver(sa_stream_t *s, const char *driver) { return SA_SUCCESS; } -int sa_stream_start_thread(sa_stream_t *s, sa_event_callback_t *callback) { +int sa_stream_start_thread(sa_stream_t *s, sa_event_callback_t callback) { int r; sa_return_val_if_fail(s, SA_ERROR_INVALID); sa_return_val_if_fail(callback, SA_ERROR_INVALID); sa_mutex_lock(s->mutex); - sa_return_val_if_fail_mutex(s->mutex, s->state == SA_STATE_INIT, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, s->state == SA_STATE_RUNNING || s->state == SA_STATE_STOPPED, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, !s->callback, SA_ERROR_STATE); r = driver_start_thread(s, callback); + + if (r == SA_SUCCESS) + s->callback = callback; + + sa_mutex_unlock(s->mutex); + return r; +} + +int sa_stream_stop_thread(sa_stream_t *s) { + int r; + + sa_return_val_if_fail(s, SA_ERROR_INVALID); + sa_mutex_lock(s->mutex); + sa_return_val_if_fail_mutex(s->mutex, s->state == SA_STATE_RUNNING || s->state == SA_STATE_STOPPED, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, s->callback, SA_ERROR_STATE); + + r = driver_stop_thread(s); + + if (r == SA_SUCCESS) + s->callback = NULL; sa_mutex_unlock(s->mutex); return r; diff --git a/common.h b/common.h index 58a9a3e..79439c8 100644 --- a/common.h +++ b/common.h @@ -58,6 +58,8 @@ struct sa_stream { size_t meta_data_size[_META_NAMES_MAX]; sa_mutex_t *mutex; + + sa_event_callback_t callback; }; size_t get_pcm_sample_size(sa_pcm_format_t f); diff --git a/driver.h b/driver.h index 5c98070..f58db66 100644 --- a/driver.h +++ b/driver.h @@ -5,7 +5,9 @@ int driver_open(sa_stream_t *dev); int driver_destroy(sa_stream_t *dev); -int driver_start_thread(sa_stream_t *dev, sa_event_callback_t *callback); + +int driver_start_thread(sa_stream_t *dev, sa_event_callback_t callback); +int driver_stop_thread(sa_stream_t *dev); int driver_change_device(sa_stream_t *dev, const char *device_name); int driver_change_read_volume(sa_stream_t *dev, const int32_t vol[]); diff --git a/macro.h b/macro.h index ed80654..478ab07 100644 --- a/macro.h +++ b/macro.h @@ -38,6 +38,7 @@ #define sa_return_val_if_fail_mutex(m, expr, val) \ do { \ if (!(expr)) { \ + \ fprintf(stderr, "%s: Assertion <%s> failed.\n", PRETTY_FUNCTION, #expr ); \ sa_mutex_unlock(m); \ return (val); \ diff --git a/once.c b/once.c index de41be3..5dc9764 100644 --- a/once.c +++ b/once.c @@ -39,7 +39,7 @@ int sa_once(sa_once_t *control, sa_once_func_t func) { } sa_mutex_unlock(global_mutex); - if (!r) + if (r) return -1; /* Execute function */ diff --git a/oss.c b/oss.c index b1b1168..838e0f4 100644 --- a/oss.c +++ b/oss.c @@ -1,9 +1,13 @@ +#include #include #include #include #include #include #include +#include +#include +#include #include "sydney.h" #include "common.h" @@ -11,6 +15,7 @@ #include "malloc.h" #include "converter.h" #include "driver.h" +#include "thread.h" #define DEFAULT_DEVICE "/dev/dsp" #define DRIVER_NAME "oss" @@ -25,6 +30,8 @@ struct oss_stream { converter_t converter_read, converter_write; size_t read_fragment_size, write_fragment_size; unsigned read_nfragments, write_nfragments; + sa_thread_t *thread; + int socket_fds[2]; }; static int simple_log2(int v) { @@ -73,6 +80,7 @@ int driver_open(sa_stream_t *s) { return SA_ERROR_OOM; oss->parent = s; + oss->socket_fds[0] = oss->socket_fds[1] = -1; n = s->device ? s->device : DEFAULT_DEVICE; if (!s->codec) @@ -556,20 +564,105 @@ int driver_destroy(sa_stream_t *s) { oss_stream_t *oss = OSS_STREAM(s); if (oss) { + + if (oss->thread) + driver_stop_thread(s); + if (oss->fd >= 0) close(oss->fd); sa_free(oss->real_pcm_attrs.channel_map); converter_done(&oss->converter_read); converter_done(&oss->converter_write); + sa_free(oss); } return SA_SUCCESS; } -int driver_start_thread(sa_stream_t *s, sa_event_callback_t *callback) { - return SA_ERROR_NOT_SUPPORTED; +enum { + POLLFD_OSS_FD, + POLLFD_SOCKET_FD, + POLLFD_MAX +}; + +static void thread_func(void *data) { + struct pollfd pollfds[POLLFD_MAX]; + sa_stream_t *s = data; + oss_stream_t *oss = OSS_STREAM(s); + sigset_t mask; + + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + + s->event = SA_EVENT_ERROR; + if (s->callback(s, SA_EVENT_INIT_THREAD) < 0) + return; + + memset(pollfds, 0, sizeof(pollfds)); + + pollfds[POLLFD_SOCKET_FD].fd = oss->socket_fds[0]; + pollfds[POLLFD_SOCKET_FD].events = POLLIN; + + pollfds[POLLFD_OSS_FD].fd = oss->fd; + pollfds[POLLFD_OSS_FD].events = ((s->mode & SA_MODE_RDONLY) ? POLLIN : 0) | ((s->mode & SA_MODE_WRONLY) ? POLLOUT : 0); + + for (;;) { + if (poll(pollfds, POLLFD_MAX, -1) < 0) { + if (errno == EINTR) + continue; + } + + if (pollfds[POLLFD_SOCKET_FD].revents) + break; + + if (pollfds[POLLFD_OSS_FD].revents & (POLLERR|POLLHUP|POLLNVAL)) { + s->event = SA_EVENT_ERROR; + s->error = SA_ERROR_SYSTEM; + errno = EIO; + s->callback(s, SA_EVENT_ERROR); + break; + } + + s->event = SA_EVENT_REQUEST_IO; + s->callback(s, SA_EVENT_REQUEST_IO); + } +} + +int driver_start_thread(sa_stream_t *s, sa_event_callback_t callback) { + oss_stream_t *oss = OSS_STREAM(s); + sa_return_val_if_fail(!oss->thread, SA_ERROR_STATE); + + s->callback = callback; + + if ((socketpair(AF_UNIX, SOCK_DGRAM, 0, oss->socket_fds)) < 0) + return SA_ERROR_SYSTEM; + + if (!(oss->thread = sa_thread_new(thread_func, s))) + return SA_ERROR_OOM; + + return SA_SUCCESS; +} + +int driver_stop_thread(sa_stream_t *s) { + oss_stream_t *oss = OSS_STREAM(s); + sa_return_val_if_fail(oss->thread, SA_ERROR_STATE); + sa_return_val_if_fail(oss->thread != sa_thread_self(), SA_ERROR_STATE); + + if (oss->socket_fds[0] >= 0) + close(oss->socket_fds[0]); + + if (oss->socket_fds[1] >= 0) + close(oss->socket_fds[1]); + + if (oss->thread) + sa_thread_free(oss->thread); + + oss->thread = NULL; + oss->socket_fds[0] = oss->socket_fds[1] = -1; + + return SA_SUCCESS; } int driver_change_read_volume(sa_stream_t *s, const int32_t vol[]) { diff --git a/sydney.h b/sydney.h index e9ba724..032caaf 100644 --- a/sydney.h +++ b/sydney.h @@ -270,7 +270,10 @@ int sa_stream_set_dynamic_rate(sa_stream_t *s, int enable); int sa_stream_set_driver(sa_stream_t *s, const char *driver); /** Start callback */ -int sa_stream_start_thread(sa_stream_t *s, sa_event_callback_t *callback); +int sa_stream_start_thread(sa_stream_t *s, sa_event_callback_t callback); + +/** Start callback */ +int sa_stream_stop_thread(sa_stream_t *s); /** Change the device connected to the stream */ int sa_stream_change_device(sa_stream_t *s, const char *device_name); @@ -316,7 +319,7 @@ int sa_stream_get_driver(sa_stream_t *s, char *driver_name, size_t *size); int sa_stream_get_device(sa_stream_t *s, char *device_name, size_t *size); int sa_stream_get_read_volume(sa_stream_t *s, int32_t vol[], unsigned *n); int sa_stream_get_write_volume(sa_stream_t *s, int32_t vol[], unsigned *n); -int sa_stream_get_meta_data(sa_stream_t *s, const char *name, void *data, size_t *size); +int sa_stream_get_meta_data(sa_stream_t *s, const char *name, void*data, size_t *size); int sa_stream_get_adjust_rate(sa_stream_t *s, sa_adjust_t *direction); int sa_stream_get_adjust_nchannels(sa_stream_t *s, sa_adjust_t *direction); int sa_stream_get_adjust_pcm_format(sa_stream_t *s, sa_adjust_t *direction); diff --git a/test-pull.c b/test-pull.c new file mode 100644 index 0000000..7f93906 --- /dev/null +++ b/test-pull.c @@ -0,0 +1,73 @@ +#include +#include +#include +#include +#include + +#include "sydney.h" +#include "macro.h" + +#define ASSERT_SUCCESS(x) do { \ + int _r; \ + if ((_r = x)) { \ + fprintf(stderr, "Operation <%s> failed: %s%s%s\n", \ + #x, \ + sa_strerror(_r), \ + _r == SA_ERROR_SYSTEM ? "; " : "", _r == SA_ERROR_SYSTEM ? strerror(errno) : ""); \ + } \ + assert(_r == SA_SUCCESS); \ +} while(0) + +#define FREQ 440 + +static const float data[4] = { 0.0, 1.0, 0.0, -1.0 }; + +static int callback(sa_stream_t *s, sa_event_t e) { + switch (e) { + case SA_EVENT_INIT_THREAD: + printf("Thread initialized.\n"); + return 0; + + case SA_EVENT_ERROR: { + int e; + ASSERT_SUCCESS(sa_stream_get_event_error(s, &e)); + printf("Error: %s\n", sa_strerror(e)); + return -1; + } + + case SA_EVENT_NOTIFY: + printf("Notified.\n"); + return 0; + + case SA_EVENT_REQUEST_IO: + + ASSERT_SUCCESS(sa_stream_write(s, data, sizeof(data))); + return 0; + + case _SA_EVENT_MAX: + ; + } + + sa_assert_not_reached(); +} + +int main(int argc, char *argv[]) { + + sa_stream_t *s; + + ASSERT_SUCCESS(sa_stream_create_pcm(&s, "Sine Test (pull)", SA_MODE_WRONLY, SA_PCM_FORMAT_FLOAT32_NE, FREQ * 4, 1)); + ASSERT_SUCCESS(sa_stream_change_device(s, "/dev/dsp1")); + ASSERT_SUCCESS(sa_stream_open(s)); + + ASSERT_SUCCESS(sa_stream_start_thread(s, callback)); + + sleep(20); + + ASSERT_SUCCESS(sa_stream_stop_thread(s)); + + ASSERT_SUCCESS(sa_stream_drain(s)); + + ASSERT_SUCCESS(sa_stream_destroy(s)); + + return 0; +} diff --git a/test-sine.c b/test-sine.c index b3a49e9..07f3868 100644 --- a/test-sine.c +++ b/test-sine.c @@ -20,15 +20,15 @@ int main(int argc, char *argv[]) { - sa_stream_t *dev; + sa_stream_t *s; float data[4] = { 0.0, 1.0, 0.0, -1.0 }; int i, j; - ASSERT_SUCCESS(sa_stream_create_pcm(&dev, "Sine Test", SA_MODE_WRONLY, SA_PCM_FORMAT_FLOAT32_NE, FREQ * 4, 1)); - ASSERT_SUCCESS(sa_stream_change_device(dev, "/dev/dsp1")); - ASSERT_SUCCESS(sa_stream_set_dynamic_rate(dev, 1)); - sa_stream_change_meta_data(dev, SA_META_CLIENT_NAME, argv[0], strlen(argv[0])); - ASSERT_SUCCESS(sa_stream_open(dev)); + ASSERT_SUCCESS(sa_stream_create_pcm(&s, "Sine Test", SA_MODE_WRONLY, SA_PCM_FORMAT_FLOAT32_NE, FREQ * 4, 1)); + ASSERT_SUCCESS(sa_stream_change_device(s, "/dev/dsp1")); + ASSERT_SUCCESS(sa_stream_set_dynamic_rate(s, 1)); + sa_stream_change_meta_data(s, SA_META_CLIENT_NAME, argv[0], strlen(argv[0])); + ASSERT_SUCCESS(sa_stream_open(s)); for (j = 0; j < 10; j++) { int v; @@ -36,16 +36,16 @@ int main(int argc, char *argv[]) { v = -j*500; /* ASSERT_SUCCESS(sa_stream_change_rate(dev, FREQ*4+100*j)); */ - ASSERT_SUCCESS(sa_stream_change_write_volume(dev, &v, 1)); + ASSERT_SUCCESS(sa_stream_change_write_volume(s, &v, 1)); for (i = 0; i < FREQ; i++) - ASSERT_SUCCESS(sa_stream_write(dev, data, sizeof(data))); + ASSERT_SUCCESS(sa_stream_write(s, data, sizeof(data))); } - ASSERT_SUCCESS(sa_stream_drain(dev)); + ASSERT_SUCCESS(sa_stream_drain(s)); - ASSERT_SUCCESS(sa_stream_destroy(dev)); + ASSERT_SUCCESS(sa_stream_destroy(s)); return 0; } -- cgit