From c1b4872b0910c9aa784d878dca771b21c4690048 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 22 May 2007 22:53:09 +0000 Subject: lots of work git-svn-id: file:///home/lennart/svn/public/libsydney/trunk@30 9ba3c220-e4d3-45a2-8aa3-73fcc9aff6ce --- Makefile | 13 +++-- TODO | 6 ++ bufferq.c | 147 ++++++++++++++++++++-------------------------- bufferq.h | 6 +- common.c | 71 ++++++++++------------ converter.c | 4 +- converter.h | 1 + driver.h | 5 +- macro.h | 26 ++++----- mutex.c | 27 ++++++--- oss.c | 181 +++++++++++++++++++++++++++++++++++++++++---------------- sydney.h | 12 +--- test-bufferq.c | 13 +++-- thread.c | 14 +++++ thread.h | 1 + 15 files changed, 304 insertions(+), 223 deletions(-) diff --git a/Makefile b/Makefile index 76ef581..bd82b1f 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,16 @@ CFLAGS=-Wall -O0 -g -W -Wno-unused-parameter `pkg-config --cflags liboil-0.3` -DRANDOM_PREFIX=saspeex -DOUTSIDE_SPEEX -D_GNU_SOURCE -pthread LIBS=-lm `pkg-config --libs liboil-0.3` -SOURCES=common.c malloc.c oss.c bbuffer.c format.c volscale.c byteswap.c continued-fraction.c zero.c add.c speex/resample.c resample.c interleave.c converter.c g711.c mutex.c once.c thread.c +SOURCES=common.c malloc.c oss.c bbuffer.c format.c volscale.c byteswap.c continued-fraction.c zero.c add.c speex/resample.c resample.c interleave.c converter.c g711.c mutex.c once.c thread.c bufferq.c # asyncq.c OBJS=$(SOURCES:.c=.o) -all: test-bufferq test-llist test-sine test-pull +all: test-bufferq test-llist test-sine test-pull #test-asyncq -test-bufferq: test-bufferq.o bufferq.o malloc.o - $(CC) $(CFLAGS) -o $@ $^ +test-bufferq: test-bufferq.o bufferq.o malloc.o #$(OBJS) + $(CC) $(CFLAGS) -o $@ $^ $(LIBS) test-llist: test-llist.o - $(CC) $(CFLAGS) -o $@ $^ + $(CC) $(CFLAGS) -o $@ $^ $(LIBS) test-sine: $(OBJS) test-sine.o $(CC) $(CFLAGS) -o $@ $^ $(LIBS) @@ -18,6 +18,9 @@ test-sine: $(OBJS) test-sine.o test-pull: $(OBJS) test-pull.o $(CC) $(CFLAGS) -o $@ $^ $(LIBS) +test-asyncq: $(OBJS) test-asyncq.o + $(CC) $(CFLAGS) -o $@ $^ $(LIBS) + *.o: *.h indent: diff --git a/TODO b/TODO index 0358748..f12bf79 100644 --- a/TODO +++ b/TODO @@ -21,3 +21,9 @@ * vbr * should we notify about local volume changes? + +* drop the ability to query the current write index? + +* drop pread + +* stop thread diff --git a/bufferq.c b/bufferq.c index 5261f35..1965dda 100644 --- a/bufferq.c +++ b/bufferq.c @@ -4,7 +4,7 @@ #include "macro.h" #include "bufferq.h" -#define SA_BUFFERQ_CONCAT_DATA(x) ((void*) (uint8_t*) (x) + ALIGN(sizeof(sa_bufferq_item_t))) +#define SA_BUFFERQ_ITEM_CONCAT_DATA(x) ((void*) (uint8_t*) (x) + SA_ALIGN(sizeof(sa_bufferq_item_t))) int sa_bufferq_init(sa_bufferq_t *q, unsigned nchannels, size_t sample_size) { sa_assert(q); @@ -44,23 +44,30 @@ void sa_bufferq_done(sa_bufferq_t *q) { sa_free(q->last); } -static sa_bufferq_item_t* bufferq_item_new(const void *d, size_t size, int copy) { +static sa_bufferq_item_t* bufferq_item_new(const void *d, int64_t idx, size_t size, sa_bufferq_item_type_t type) { sa_bufferq_item_t *i; sa_assert(size > 0); - if (!copy) { + if (type == SA_BUFFERQ_ITEM_STATIC) { if (!(i = sa_new(sa_bufferq_item_t, 1))) - return i; - i->type = SA_BUFFERQ_ITEM_STATIC; + return NULL; i->data = (void*) d; - } else { - if (!(i = sa_malloc(ALIGN(sizeof(sa_bufferq_item_t)) + size))) - return i; - i->type = SA_BUFFERQ_ITEM_CONCATENATED; - i->data = SA_BUFFERQ_CONCAT_DATA(i); + } else if (type == SA_BUFFERQ_ITEM_CONCATENATED) { + if (!(i = sa_malloc(SA_ALIGN(sizeof(sa_bufferq_item_t)) + size))) + return NULL; + i->data = SA_BUFFERQ_ITEM_CONCAT_DATA(i); memcpy(i->data, d, size); + } else { + sa_assert(type == SA_BUFFERQ_ITEM_DYNAMIC); + + if (!(i = sa_new(sa_bufferq_item_t, 1))) + return NULL; + + i->data = (void*) d; } + i->type = type; + i->idx = idx; i->size = size; SA_LLIST_ITEM_INIT(sa_bufferq_item_t, bufferq, i); @@ -90,9 +97,9 @@ static void bufferq_item_free(sa_bufferq_item_t *i) { sa_free(i); } -int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence, int copy) { +int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence, sa_bufferq_item_type_t type) { int64_t idx; - sa_bufferq_item_t *i, *j; + sa_bufferq_item_t *i, *j, *n; sa_assert(q); @@ -110,7 +117,7 @@ int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t sa_assert_not_reached(); } - if (q->read_index > idx) { + if (q->read_index > idx && type != SA_BUFFERQ_ITEM_DYNAMIC) { int64_t l = q->read_index - idx; if (l > nbytes) @@ -122,72 +129,39 @@ int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t } - i = q->items[channel]; - - while (i && nbytes > 0) { - - if (idx >= i->idx + i->size) { - /* The new data belongs behind the current node */ - - i = i->bufferq_next; - - } else if (idx < i->idx) { - size_t l; - /* The new data belongs before the current node (at least partially) */ - - l = nbytes; - - if (l > i->idx - idx) - l = i->idx - idx; - - if (!(j = bufferq_item_new(data, l, copy))) - return SA_ERROR_OOM; - - j->idx = idx; - - SA_LLIST_INSERT_BEFORE(sa_bufferq_item_t, bufferq, q->items[channel], i, j); - - idx += l; - data += l; - nbytes -= l; - - } else { - size_t l; - - /* The data belongs right in the current node somewhere */ - - l = nbytes; + /* Allocate the new entry */ + if (!(j = bufferq_item_new(data, idx, nbytes, type))) + return SA_ERROR_OOM; - if (l > i->idx + i->size - idx) - l = i->idx + i->size - idx; + /* Find the position where we need to insert the new entry */ + for (i = q->items[channel]; i && idx >= i->idx + i->size; i = i->bufferq_next) + ; - if (!(i = bufferq_item_make_writable(i))) - return SA_ERROR_OOM; - - memcpy((uint8_t*) i->data + (idx - i->idx), data, l); - - idx += l; - data += l; - nbytes -= l; - } + /* Shorten the current entry if necessary */ + if (i && idx > i->idx) { + i->size = idx - i->idx; + i = i->bufferq_next; } - - if (nbytes > 0) { - sa_assert(!i); - - if (!(j = bufferq_item_new(data, nbytes, copy))) - return SA_ERROR_OOM; - - j->idx = idx; - + + /* Insert the entry */ + if (i) + SA_LLIST_INSERT_BEFORE(sa_bufferq_item_t, bufferq, q->items[channel], i, j); + else { if (q->last[channel]) SA_LLIST_INSERT_AFTER(sa_bufferq_item_t, bufferq, q->items[channel], q->last[channel], j); else SA_LLIST_PREPEND(sa_bufferq_item_t, bufferq, q->items[channel], j); - + q->last[channel] = j; } - + + /* Now kick all the entries that overlap entirely with our new entry */ + for (i = j->bufferq_next; i && i->idx + i->size < j->idx + j->size ; i = n) { + n = i->bufferq_next; + SA_LLIST_REMOVE(sa_bufferq_item_t, bufferq, q->items[channel], i); + bufferq_item_free(i); + } + q->write_index = idx + nbytes; if (q->write_index > q->end_index) @@ -196,7 +170,7 @@ int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t return SA_SUCCESS; } -int sa_bufferq_get(sa_bufferq_t *q, void *i[], size_t *bytes) { +void sa_bufferq_get(sa_bufferq_t *q, void *i[], size_t *bytes) { int first = 1; unsigned u; sa_assert(q); @@ -221,7 +195,7 @@ int sa_bufferq_get(sa_bufferq_t *q, void *i[], size_t *bytes) { } else { int64_t l; - i[u] = (uint8_t*) q->items[u]->data + q->read_index - q->items[u]->idx; + i[u] = (uint8_t*) q->items[u]->data + q->read_index - q->items[u]->idx; l = q->items[u]->size - (q->read_index - q->items[u]->idx); @@ -235,12 +209,10 @@ int sa_bufferq_get(sa_bufferq_t *q, void *i[], size_t *bytes) { } else i[u] = NULL; } - - return SA_SUCCESS; } -int sa_bufferq_drop(sa_bufferq_t *q, int64_t bytes) { +void sa_bufferq_drop(sa_bufferq_t *q, int64_t bytes) { unsigned u; sa_assert(q); @@ -264,28 +236,35 @@ int sa_bufferq_drop(sa_bufferq_t *q, int64_t bytes) { if (!i) q->last[u] = NULL; } - - return SA_SUCCESS; } int sa_bufferq_realloc(sa_bufferq_t *q) { unsigned u; + int fail = 0; sa_assert(q); for (u = 0; u < q->nchannels; u++) { sa_bufferq_item_t *i; - i = q->items[u]; + for (i = q->items[u]; i; i = i->bufferq_next) { - while (i) { + if (i->type != SA_BUFFERQ_ITEM_STATIC) + continue; - if (!bufferq_item_make_writable(i)) - return SA_ERROR_OOM; + if (!bufferq_item_make_writable(i)) { + fail = 1; - i = i->bufferq_next; + /* Hmm, we couldn't allocate memory, but we can't + * return without doing anything, hence let's at least + * drop the reference to the statically allocated + * data */ + + i->size = 0; + i->data = SA_BUFFERQ_ITEM_CONCAT_DATA(i); + i->type = SA_BUFFERQ_ITEM_CONCATENATED; + } } - } - return SA_SUCCESS; + return fail ? SA_ERROR_OOM : SA_SUCCESS; } diff --git a/bufferq.h b/bufferq.h index 538bf4d..2d53a45 100644 --- a/bufferq.h +++ b/bufferq.h @@ -33,10 +33,10 @@ int sa_bufferq_init(sa_bufferq_t *q, unsigned nchannels, size_t sample_size); void sa_bufferq_done(sa_bufferq_t *q); -int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence, int copy); +int sa_bufferq_push(sa_bufferq_t *q, unsigned channel, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence, sa_bufferq_item_type_t type); int sa_bufferq_realloc(sa_bufferq_t *q); -int sa_bufferq_get(sa_bufferq_t *q, void *i[], size_t *bytes); -int sa_bufferq_drop(sa_bufferq_t *q, int64_t bytes); +void sa_bufferq_get(sa_bufferq_t *q, void *i[], size_t *bytes); +void sa_bufferq_drop(sa_bufferq_t *q, int64_t bytes); #endif diff --git a/common.c b/common.c index 5f3ff57..5a81d02 100644 --- a/common.c +++ b/common.c @@ -540,7 +540,11 @@ int sa_stream_get_state(sa_stream_t *s, sa_state_t *state) { sa_mutex_lock(s->mutex); sa_return_val_if_fail_mutex(s->mutex, state, SA_ERROR_INVALID); - ret = driver_get_state(s, state); + if (s->state == SA_STATE_INIT) { + *state = s->state; + ret = SA_SUCCESS; + } else + ret = driver_get_state(s, state); sa_mutex_unlock(s->mutex); return ret; @@ -895,79 +899,66 @@ int sa_stream_get_position(sa_stream_t *s, sa_position_t position, int64_t *pos) } int sa_stream_read(sa_stream_t *s, void *data, size_t nbytes) { - return sa_stream_pread(s, data, nbytes, 0, SA_SEEK_RELATIVE); -} - -int sa_stream_write(sa_stream_t *s, const void *data, size_t nbytes) { - return sa_stream_pwrite(s, data, nbytes, 0, SA_SEEK_RELATIVE); -} - -int sa_stream_read_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes) { - return sa_stream_pread_ni(s, channel, data, nbytes, 0, SA_SEEK_RELATIVE); -} - -int sa_stream_write_ni(sa_stream_t *s, unsigned channel, const void *data, size_t nbytes) { - return sa_stream_pwrite_ni(s, channel, data, nbytes, 0, SA_SEEK_RELATIVE); -} - -int sa_stream_pread(sa_stream_t *s, void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { int ret; - sa_return_val_if_fail(s, SA_ERROR_INVALID); sa_return_val_if_fail(data, SA_ERROR_INVALID); sa_return_val_if_fail(nbytes > 0, SA_ERROR_INVALID); - sa_return_val_if_fail(whence == SA_SEEK_RELATIVE || whence == SA_SEEK_ABSOLUTE || whence == SA_SEEK_RELATIVE_END, SA_ERROR_INVALID); sa_mutex_lock(s->mutex); sa_return_val_if_fail_mutex(s->mutex, !s->ni_enabled, SA_ERROR_STATE); sa_return_val_if_fail_mutex(s->mutex, s->codec || (nbytes % s->pcm_frame_size) == 0, SA_ERROR_INVALID); - sa_return_val_if_fail_mutex(s->mutex, s->codec || (offset % s->pcm_frame_size) == 0, SA_ERROR_INVALID); sa_return_val_if_fail_mutex(s->mutex, s->mode & SA_MODE_RDONLY, SA_ERROR_STATE); sa_return_val_if_fail_mutex(s->mutex, s->state == SA_STATE_RUNNING || s->state == SA_STATE_STOPPED, SA_ERROR_STATE); - ret = driver_pread(s, data, nbytes, offset, whence); + ret = driver_read(s, data, nbytes); sa_mutex_unlock(s->mutex); return ret; } -int sa_stream_pwrite(sa_stream_t *s, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { +int sa_stream_read_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes) { int ret; - + sa_return_val_if_fail(s, SA_ERROR_INVALID); sa_return_val_if_fail(data, SA_ERROR_INVALID); sa_return_val_if_fail(nbytes > 0, SA_ERROR_INVALID); - sa_return_val_if_fail(whence == SA_SEEK_RELATIVE || whence == SA_SEEK_ABSOLUTE || whence == SA_SEEK_RELATIVE_END, SA_ERROR_INVALID); sa_mutex_lock(s->mutex); - sa_return_val_if_fail_mutex(s->mutex, !s->ni_enabled, SA_ERROR_STATE); - sa_return_val_if_fail_mutex(s->mutex, s->codec || (nbytes % s->pcm_frame_size) == 0, SA_ERROR_INVALID); - sa_return_val_if_fail_mutex(s->mutex, s->codec || (offset % s->pcm_frame_size) == 0, SA_ERROR_INVALID); - sa_return_val_if_fail_mutex(s->mutex, s->mode & SA_MODE_WRONLY, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, !s->codec, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, channel < s->pcm_attrs.nchannels, SA_ERROR_INVALID); + sa_return_val_if_fail_mutex(s->mutex, s->ni_enabled, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, (nbytes % s->pcm_sample_size) == 0, SA_ERROR_INVALID); + sa_return_val_if_fail_mutex(s->mutex, s->mode & SA_MODE_RDONLY, SA_ERROR_STATE); sa_return_val_if_fail_mutex(s->mutex, s->state == SA_STATE_RUNNING || s->state == SA_STATE_STOPPED, SA_ERROR_STATE); - ret = driver_pwrite(s, data, nbytes, offset, whence); - + ret = driver_read_ni(s, channel, data, nbytes); + sa_mutex_unlock(s->mutex); return ret; } -int sa_stream_pread_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { - int ret; +int sa_stream_write(sa_stream_t *s, const void *data, size_t nbytes) { + return sa_stream_pwrite(s, data, nbytes, 0, SA_SEEK_RELATIVE); +} +int sa_stream_write_ni(sa_stream_t *s, unsigned channel, const void *data, size_t nbytes) { + return sa_stream_pwrite_ni(s, channel, data, nbytes, 0, SA_SEEK_RELATIVE); +} + +int sa_stream_pwrite(sa_stream_t *s, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { + int ret; + sa_return_val_if_fail(s, SA_ERROR_INVALID); sa_return_val_if_fail(data, SA_ERROR_INVALID); sa_return_val_if_fail(nbytes > 0, SA_ERROR_INVALID); sa_return_val_if_fail(whence == SA_SEEK_RELATIVE || whence == SA_SEEK_ABSOLUTE || whence == SA_SEEK_RELATIVE_END, SA_ERROR_INVALID); sa_mutex_lock(s->mutex); - sa_return_val_if_fail_mutex(s->mutex, !s->codec, SA_ERROR_STATE); - sa_return_val_if_fail_mutex(s->mutex, channel < s->pcm_attrs.nchannels, SA_ERROR_INVALID); - sa_return_val_if_fail_mutex(s->mutex, s->ni_enabled, SA_ERROR_STATE); - sa_return_val_if_fail_mutex(s->mutex, (nbytes % s->pcm_sample_size) == 0, SA_ERROR_INVALID); - sa_return_val_if_fail_mutex(s->mutex, (offset % s->pcm_sample_size) == 0, SA_ERROR_INVALID); - sa_return_val_if_fail_mutex(s->mutex, s->mode & SA_MODE_RDONLY, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, !s->ni_enabled, SA_ERROR_STATE); + sa_return_val_if_fail_mutex(s->mutex, s->codec || (nbytes % s->pcm_frame_size) == 0, SA_ERROR_INVALID); + sa_return_val_if_fail_mutex(s->mutex, s->codec || (offset % s->pcm_frame_size) == 0, SA_ERROR_INVALID); + sa_return_val_if_fail_mutex(s->mutex, s->mode & SA_MODE_WRONLY, SA_ERROR_STATE); sa_return_val_if_fail_mutex(s->mutex, s->state == SA_STATE_RUNNING || s->state == SA_STATE_STOPPED, SA_ERROR_STATE); - ret = driver_pread_ni(s, channel, data, nbytes, offset, whence); - + ret = driver_pwrite(s, data, nbytes, offset, whence); + sa_mutex_unlock(s->mutex); return ret; } diff --git a/converter.c b/converter.c index dda8c10..3b146ce 100644 --- a/converter.c +++ b/converter.c @@ -367,7 +367,7 @@ void sa_converter_done(sa_converter_t *c) { memset(c, 0, sizeof(*c)); } -static void* get_zero_buffer(sa_converter_t *c, size_t size) { +void* sa_converter_get_zero_buffer(sa_converter_t *c, size_t size) { void *b; sa_assert(c); @@ -520,7 +520,7 @@ int sa_converter_go( if (p[0] == -1) { /* We have to write silence to this channel */ - if (!(b = get_zero_buffer(c, *size))) + if (!(b = sa_converter_get_zero_buffer(c, *size))) return SA_ERROR_OOM; c->to_process_data[i] = b; diff --git a/converter.h b/converter.h index 3230471..8112231 100644 --- a/converter.h +++ b/converter.h @@ -77,5 +77,6 @@ void sa_converter_set_volume(sa_converter_t *c, const int32_t vol[]); void sa_converter_set_ratio(sa_converter_t *c, unsigned rate1, unsigned rate2); +void* sa_converter_get_zero_buffer(sa_converter_t *c, size_t size); #endif diff --git a/driver.h b/driver.h index f58db66..be5d4b8 100644 --- a/driver.h +++ b/driver.h @@ -18,9 +18,10 @@ int driver_change_meta_data(sa_stream_t *dev, const char *name, const void *data int driver_get_state(sa_stream_t *dev, sa_state_t *state); int driver_get_position(sa_stream_t *dev, sa_position_t position, int64_t *pos); -int driver_pread(sa_stream_t *dev, void *data, size_t nbytes, int64_t offset, sa_seek_t whence); +int driver_read(sa_stream_t *dev, void *data, size_t nbytes); +int driver_read_ni(sa_stream_t *dev, unsigned channel, void *data, size_t nbytes); + int driver_pwrite(sa_stream_t *dev, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence); -int driver_pread_ni(sa_stream_t *dev, unsigned channel, void *data, size_t nbytes, int64_t offset, sa_seek_t whence); int driver_pwrite_ni(sa_stream_t *dev, unsigned channel, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence); int driver_get_read_size(sa_stream_t *dev, size_t *size); diff --git a/macro.h b/macro.h index 478ab07..5e4be44 100644 --- a/macro.h +++ b/macro.h @@ -5,15 +5,15 @@ #include #ifdef __GNUC__ -#define PRETTY_FUNCTION __PRETTY_FUNCTION__ +#define SA_PRETTY_FUNCTION __PRETTY_FUNCTION__ #else -#define PRETTY_FUNCTION "" +#define SA_PRETTY_FUNCTION "" #endif #define sa_return_if_fail(expr) \ do { \ if (!(expr)) { \ - fprintf(stderr, "%s: Assertion <%s> failed.\n", PRETTY_FUNCTION, #expr ); \ + fprintf(stderr, "%s: Assertion <%s> failed.\n", SA_PRETTY_FUNCTION, #expr ); \ return; \ } \ } while(0) @@ -21,7 +21,7 @@ #define sa_return_val_if_fail(expr, val) \ do { \ if (!(expr)) { \ - fprintf(stderr, "%s: Assertion <%s> failed.\n", PRETTY_FUNCTION, #expr ); \ + fprintf(stderr, "%s: Assertion <%s> failed.\n", SA_PRETTY_FUNCTION, #expr ); \ return (val); \ } \ } while(0) @@ -29,7 +29,7 @@ #define sa_return_if_fail_mutex(m, expr) \ do { \ if (!(expr)) { \ - fprintf(stderr, "%s: Assertion <%s> failed.\n", PRETTY_FUNCTION, #expr ); \ + fprintf(stderr, "%s: Assertion <%s> failed.\n", SA_PRETTY_FUNCTION, #expr ); \ sa_mutex_unlock(m); \ return; \ } \ @@ -39,7 +39,7 @@ do { \ if (!(expr)) { \ \ - fprintf(stderr, "%s: Assertion <%s> failed.\n", PRETTY_FUNCTION, #expr ); \ + fprintf(stderr, "%s: Assertion <%s> failed.\n", SA_PRETTY_FUNCTION, #expr ); \ sa_mutex_unlock(m); \ return (val); \ } \ @@ -54,19 +54,19 @@ sa_assert(_r == 0); \ } while(0) -#define elementsof(x) (sizeof(x)/sizeof((x)[0])) +#define SA_ELEMENTSOF(x) (sizeof(x)/sizeof((x)[0])) -#ifndef MAX -#define MAX(a, b) ((a) > (b) ? (a) : (b)) +#ifndef SA_MAX +#define SA_MAX(a, b) ((a) > (b) ? (a) : (b)) #endif -#ifndef MIN -#define MIN(a, b) ((a) < (b) ? (a) : (b)) +#ifndef SA_MIN +#define SA_MIN(a, b) ((a) < (b) ? (a) : (b)) #endif -static inline size_t align(size_t l) { +static inline size_t sa_align(size_t l) { return (((l + sizeof(void*) - 1) / sizeof(void*)) * sizeof(void*)); } -#define ALIGN(x) (align(x)) +#define SA_ALIGN(x) (sa_align(x)) #endif diff --git a/mutex.c b/mutex.c index 8674fff..12a4f4a 100644 --- a/mutex.c +++ b/mutex.c @@ -2,8 +2,8 @@ #include #endif -#include #include +#include #include "macro.h" #include "malloc.h" @@ -34,20 +34,31 @@ sa_mutex_t* sa_mutex_new(int recursive) { } void sa_mutex_free(sa_mutex_t *m) { - assert(m); + sa_assert(m); sa_assert_success(pthread_mutex_destroy(&m->mutex)); sa_free(m); } void sa_mutex_lock(sa_mutex_t *m) { - assert(m); + sa_assert(m); sa_assert_success(pthread_mutex_lock(&m->mutex)); } +int sa_mutex_try_lock(sa_mutex_t *m) { + int e; + sa_assert(m); + + if ((e = pthread_mutex_trylock(&m->mutex)) == 0) + return 1; + + sa_assert(e == EBUSY); + return 0; +} + void sa_mutex_unlock(sa_mutex_t *m) { - assert(m); + sa_assert(m); sa_assert_success(pthread_mutex_unlock(&m->mutex)); } @@ -63,14 +74,14 @@ sa_cond_t *sa_cond_new(void) { } void sa_cond_free(sa_cond_t *c) { - assert(c); + sa_assert(c); sa_assert_success(pthread_cond_destroy(&c->cond)); sa_free(c); } void sa_cond_signal(sa_cond_t *c, int broadcast) { - assert(c); + sa_assert(c); if (broadcast) sa_assert_success(pthread_cond_broadcast(&c->cond)); @@ -79,8 +90,8 @@ void sa_cond_signal(sa_cond_t *c, int broadcast) { } int sa_cond_wait(sa_cond_t *c, sa_mutex_t *m) { - assert(c); - assert(m); + sa_assert(c); + sa_assert(m); return pthread_cond_wait(&c->cond, &m->mutex); } diff --git a/oss.c b/oss.c index 3adce81..d09f8aa 100644 --- a/oss.c +++ b/oss.c @@ -16,6 +16,7 @@ #include "converter.h" #include "driver.h" #include "thread.h" +#include "bufferq.h" #define DEFAULT_DEVICE "/dev/dsp" #define DRIVER_NAME "oss" @@ -32,6 +33,7 @@ struct oss_stream { unsigned read_nfragments, write_nfragments; sa_thread_t *thread; int socket_fds[2]; + sa_bufferq_t bufferq; }; static int simple_log2(int v) { @@ -118,33 +120,27 @@ int driver_open(sa_stream_t *s) { /* Set fragment settings */ if (s->mode & SA_MODE_WRONLY) { - fs = s->write_upper_watermark - s->write_lower_watermark; - bs = s->write_upper_watermark; + bs = s->write_lower_watermark; } if (s->mode & SA_MODE_RDONLY) { - int rfs, rbs; + int rbs; - rfs = s->read_lower_watermark; rbs = s->read_upper_watermark; - if (s->mode & SA_MODE_WRONLY) { - fs = fs < rfs ? fs : rfs; + if (s->mode & SA_MODE_WRONLY) bs = bs > rbs ? bs : rbs; - } else { - fs = rfs; + else bs = rbs; - } } - if (!s->codec && real_bps) { + if (!s->codec && real_bps) bs = fixup_bps(bs, bps, real_bps); - fs = fixup_bps(fs, bps, real_bps); - } + fs = bs/4; l = simple_log2(fs); - m = (bs+(1< 0x7FFF) m = 0x7FFF; @@ -369,7 +365,7 @@ int driver_open(sa_stream_t *s) { if (phase == 0) { /* Find the next higher sample rate to try */ - for (i = 0; i < (int) elementsof(try_rates); i++) { + for (i = 0; i < (int) SA_ELEMENTSOF(try_rates); i++) { /* Yes, we could optimize a little here */ if (try_rates[i] > r) { @@ -379,7 +375,7 @@ int driver_open(sa_stream_t *s) { } - if (i == elementsof(try_rates)) { + if (i == SA_ELEMENTSOF(try_rates)) { phase = 1; r = s->pcm_attrs.rate; } @@ -388,7 +384,7 @@ int driver_open(sa_stream_t *s) { if (phase == 1) { /* Find the next lower sample rate to try */ - for (i = elementsof(try_rates); i > 0; i--) { + for (i = SA_ELEMENTSOF(try_rates); i > 0; i--) { if (suggested > try_rates[i-1] && suggested < r) { r = suggested; break; @@ -406,7 +402,7 @@ int driver_open(sa_stream_t *s) { if (phase == 0) { /* Find the next lower sample rate to try */ - for (i = elementsof(try_rates); i > 0; i--) { + for (i = SA_ELEMENTSOF(try_rates); i > 0; i--) { if (try_rates[i-1] < r) { r = try_rates[i-1]; @@ -423,7 +419,7 @@ int driver_open(sa_stream_t *s) { if (phase == 1) { /* Find the next higher sample rate to try */ - for (i = 0; i < (int) elementsof(try_rates); i++) { + for (i = 0; i < (int) SA_ELEMENTSOF(try_rates); i++) { if (suggested > r && suggested < try_rates[i]) { r = suggested; break; @@ -433,7 +429,7 @@ int driver_open(sa_stream_t *s) { } } - sa_assert(i < (int) elementsof(try_rates)); + sa_assert(i < (int) SA_ELEMENTSOF(try_rates)); } } @@ -552,6 +548,9 @@ int driver_open(sa_stream_t *s) { if (s->mode & SA_MODE_WRONLY) printf("Chosen for write: %u fragments, %u fragsize\n", oss->write_nfragments, oss->write_fragment_size); + + if (sa_bufferq_init(&oss->bufferq, s->pcm_attrs.nchannels, s->pcm_sample_size) < 0) + goto fail; return SA_SUCCESS; @@ -571,6 +570,8 @@ int driver_destroy(sa_stream_t *s) { if (oss->fd >= 0) close(oss->fd); + sa_bufferq_done(&oss->bufferq); + sa_free(oss->real_pcm_attrs.channel_map); sa_converter_done(&oss->converter_read); sa_converter_done(&oss->converter_write); @@ -581,6 +582,71 @@ int driver_destroy(sa_stream_t *s) { return SA_SUCCESS; } +static int feed(sa_stream_t *s) { + oss_stream_t *oss = OSS_STREAM(s); + size_t w; + int ret; + + sa_assert(s); + + for (;;) { + if ((ret = driver_get_write_size(s, &w)) < 0) + return ret; + + while (w > 0) { + void* i[1]; + size_t nbytes; + uint8_t *d; + int drop; + + sa_bufferq_get(&oss->bufferq, i, &nbytes); + + if (nbytes) { + void **dst; + size_t *stride; + + if ((ret = sa_converter_go_interleaved(&oss->converter_write, i[0], &dst, &stride, 1, &nbytes))) + return ret; + + d = dst[0]; + drop = 1; + + s->state = SA_STATE_RUNNING; + + } else { + nbytes = w > oss->write_fragment_size ? oss->write_fragment_size : w; + + if (!(d = sa_converter_get_zero_buffer(&oss->converter_write, nbytes))) + return SA_ERROR_OOM; + + drop = s->xrun_mode == SA_XRUN_MODE_SPIN; + + if (s->xrun_mode == SA_XRUN_MODE_STOP) + s->state = SA_STATE_STOPPED; + } + + while (nbytes > 0) { + ssize_t l; + + if ((l = write(oss->fd, d, nbytes)) < 0) + return SA_ERROR_SYSTEM; + + sa_assert(l > 0); + + nbytes -= l; + d += l; + + w -= 1; + + if (drop) + sa_bufferq_drop(&oss->bufferq, l); + } + } + } + + return SA_SUCCESS; +} + enum { POLLFD_OSS_FD, POLLFD_SOCKET_FD, @@ -595,10 +661,12 @@ static void thread_func(void *data) { sigfillset(&mask); pthread_sigmask(SIG_BLOCK, &mask, NULL); - - s->event = SA_EVENT_ERROR; - if (s->callback(s, SA_EVENT_INIT_THREAD) < 0) - return; + + if (s->callback) { + s->event = SA_EVENT_INIT_THREAD; + if (s->callback(s, SA_EVENT_INIT_THREAD) < 0) + return; + } memset(pollfds, 0, sizeof(pollfds)); @@ -609,24 +677,47 @@ static void thread_func(void *data) { pollfds[POLLFD_OSS_FD].events = ((s->mode & SA_MODE_RDONLY) ? POLLIN : 0) | ((s->mode & SA_MODE_WRONLY) ? POLLOUT : 0); for (;;) { + int ret; if (poll(pollfds, POLLFD_MAX, -1) < 0) { if (errno == EINTR) continue; + + if (s->callback) { + s->event = SA_EVENT_ERROR; + s->error = SA_ERROR_SYSTEM; + s->callback(s, SA_EVENT_ERROR); + } + break; } if (pollfds[POLLFD_SOCKET_FD].revents) break; if (pollfds[POLLFD_OSS_FD].revents & (POLLERR|POLLHUP|POLLNVAL)) { - s->event = SA_EVENT_ERROR; - s->error = SA_ERROR_SYSTEM; - errno = EIO; - s->callback(s, SA_EVENT_ERROR); + + if (s->callback) { + s->event = SA_EVENT_ERROR; + s->error = SA_ERROR_SYSTEM; + errno = EIO; + s->callback(s, SA_EVENT_ERROR); + } break; } - s->event = SA_EVENT_REQUEST_IO; - s->callback(s, SA_EVENT_REQUEST_IO); + if (s->callback) { + s->event = SA_EVENT_REQUEST_IO; + if (s->callback(s, SA_EVENT_REQUEST_IO) < 0) + break; + } + + if ((ret = feed(s)) < 0) { + if (s->callback) { + s->event = SA_EVENT_ERROR; + s->error = ret; + s->callback(s, SA_EVENT_ERROR); + } + break; + } } } @@ -697,45 +788,32 @@ int driver_change_rate(sa_stream_t *s, unsigned rate) { } int driver_get_state(sa_stream_t *s, sa_state_t *state) { - return SA_ERROR_NOT_SUPPORTED; + *state = s->state; + return SA_SUCCESS; } int driver_get_position(sa_stream_t *s, sa_position_t position, int64_t *pos) { return SA_ERROR_NOT_SUPPORTED; } -int driver_pread(sa_stream_t *s, void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { +int driver_read(sa_stream_t *s, void *data, size_t nbytes) { return SA_ERROR_NOT_SUPPORTED; } int driver_pwrite(sa_stream_t *s, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { oss_stream_t *oss = OSS_STREAM(s); - void **dst; - size_t *stride; - int ret; - uint8_t *d; + int ret, ret2; - if ((ret = sa_converter_go_interleaved(&oss->converter_write, data, &dst, &stride, 1, &nbytes))) + if ((ret = sa_bufferq_push(&oss->bufferq, 0, data, nbytes, offset, whence, SA_BUFFERQ_ITEM_STATIC))) return ret; - - d = dst[0]; - while (nbytes > 0) { - ssize_t l; - - if ((l = write(oss->fd, d, nbytes)) < 0) - return SA_ERROR_SYSTEM; - - sa_assert(l > 0); + ret = feed(s); + ret2 = sa_bufferq_realloc(&oss->bufferq); - nbytes -= l; - d += l; - } - - return SA_SUCCESS; + return ret ? ret : ret2; } -int driver_pread_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes, int64_t offset, sa_seek_t whence) { +int driver_read_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes) { return SA_ERROR_NOT_SUPPORTED; } @@ -761,6 +839,7 @@ int driver_pause(sa_stream_t *s) { int driver_drain(sa_stream_t *s) { oss_stream_t *oss = OSS_STREAM(s); + sa_return_val_if_fail(!oss->thread, SA_ERROR_STATE); if (ioctl(oss->fd, SNDCTL_DSP_SYNC, NULL) < 0) return SA_ERROR_SYSTEM; diff --git a/sydney.h b/sydney.h index 032caaf..3103c54 100644 --- a/sydney.h +++ b/sydney.h @@ -342,21 +342,15 @@ int sa_stream_get_position(sa_stream_t *s, sa_position_t position, int64_t *pos) /** Interleaved capture function */ int sa_stream_read(sa_stream_t *s, void *data, size_t nbytes); -/** Interleaved playback function */ -int sa_stream_write(sa_stream_t *s, const void *data, size_t nbytes); - /** Non-interleaved capture function */ int sa_stream_read_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes); + +/** Interleaved playback function */ +int sa_stream_write(sa_stream_t *s, const void *data, size_t nbytes); /** Non-interleaved playback function */ int sa_stream_write_ni(sa_stream_t *s, unsigned channel, const void *data, size_t nbytes); - -/** Interleaved capture function with seek offset */ -int sa_stream_pread(sa_stream_t *s, void *data, size_t nbytes, int64_t offset, sa_seek_t whence); /** Interleaved playback function with seek offset */ int sa_stream_pwrite(sa_stream_t *s, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence); - -/** Non-interleaved capture function with seek offset */ -int sa_stream_pread_ni(sa_stream_t *s, unsigned channel, void *data, size_t nbytes, int64_t offset, sa_seek_t whence); /** Non-interleaved playback function with seek offset */ int sa_stream_pwrite_ni(sa_stream_t *s, unsigned channel, const void *data, size_t nbytes, int64_t offset, sa_seek_t whence); diff --git a/test-bufferq.c b/test-bufferq.c index 6c57384..45d65eb 100644 --- a/test-bufferq.c +++ b/test-bufferq.c @@ -1,4 +1,5 @@ #include "bufferq.h" +#include "malloc.h" int main(int argc, char *argv[]) { @@ -6,12 +7,12 @@ int main(int argc, char *argv[]) { sa_bufferq_init(&q, 1, 1); - sa_bufferq_push(&q, 0, "{AAAAAAAA}", 10, 0, SA_SEEK_RELATIVE, 0); - sa_bufferq_push(&q, 0, "", 10, 5, SA_SEEK_RELATIVE, 0); - sa_bufferq_push(&q, 0, "[CCCC]", 6, -18, SA_SEEK_RELATIVE, 0); - sa_bufferq_push(&q, 0, "(DDDD)", 6, -3, SA_SEEK_ABSOLUTE, 0); - sa_bufferq_push(&q, 0, "XXX", 3, 10, SA_SEEK_RELATIVE_END, 0); - sa_bufferq_push(&q, 0, "YYYYY", 5, -4, SA_SEEK_RELATIVE, 0); + sa_bufferq_push(&q, 0, "{AAAAAAAA}", 10, 0, SA_SEEK_RELATIVE, SA_BUFFERQ_ITEM_STATIC); + sa_bufferq_push(&q, 0, "", 10, 5, SA_SEEK_RELATIVE, SA_BUFFERQ_ITEM_STATIC); + sa_bufferq_push(&q, 0, "[CCCC]", 6, -18, SA_SEEK_RELATIVE, SA_BUFFERQ_ITEM_STATIC); + sa_bufferq_push(&q, 0, "(DDDD)", 6, -3, SA_SEEK_ABSOLUTE, SA_BUFFERQ_ITEM_STATIC); + sa_bufferq_push(&q, 0, sa_strdup("XXX"), 3, 10, SA_SEEK_RELATIVE_END, SA_BUFFERQ_ITEM_DYNAMIC); + sa_bufferq_push(&q, 0, "YYYYY", 5, -4, SA_SEEK_RELATIVE, SA_BUFFERQ_ITEM_CONCATENATED); sa_bufferq_realloc(&q); diff --git a/thread.c b/thread.c index 806e210..26deccb 100644 --- a/thread.c +++ b/thread.c @@ -143,6 +143,20 @@ sa_thread_t* sa_thread_self(void) { return t; } +int sa_thread_is_self(sa_thread_t *t) { + sa_thread_t *c; + sa_assert(t); + + if (sa_once(&thread_tls_once, thread_tls_once_func) < 0 || !thread_tls) + return 0; + + if ((c = sa_tls_get(thread_tls))) + return c == t; + + return 0; +} + + void* sa_thread_get_data(sa_thread_t *t) { sa_assert(t); diff --git a/thread.h b/thread.h index 1a61e7a..ddd2604 100644 --- a/thread.h +++ b/thread.h @@ -11,6 +11,7 @@ int sa_thread_join(sa_thread_t *t); int sa_thread_is_running(sa_thread_t *t); sa_thread_t *sa_thread_self(void); void sa_thread_yield(void); +int sa_thread_is_self(sa_thread_t *t); void* sa_thread_get_data(sa_thread_t *t); void sa_thread_set_data(sa_thread_t *t, void *userdata); -- cgit