diff options
author | Lennart Poettering <lennart@poettering.net> | 2006-02-20 04:05:16 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2006-02-20 04:05:16 +0000 |
commit | 304449002cbc84fdcf235b5dfaec891278dd7085 (patch) | |
tree | 2a2d00e34d5c620835b76a0d6f7890a1d3e9fb97 /src/polyp | |
parent | 0876b1ba82ea9c988df90ca98d202765ac697313 (diff) |
1) Add flexible seeking support (including absolute) for memory block queues and playback streams
2) Add support to synchronize multiple playback streams
3) add two tests for 1) and 2)
4) s/PA_ERROR/PA_ERR/
5) s/PA_ERROR_OK/PA_OK/
6) update simple API to deal properly with new peek/drop recording API
7) add beginnings of proper validity checking on API calls in client libs (needs to be extended)
8) report playback buffer overflows/underflows to the client
9) move client side recording mcalign stuff into the memblockq
10) create typedefs for a bunch of API callback prototypes
11) simplify handling of HUP poll() events
Yes, i know, it's usually better to commit a lot of small patches instead of a
single big one. In this case however, this would have contradicted the other
rule: never commit broken or incomplete stuff.
*** This stuff needs a lot of additional testing! ***
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@511 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/polyp')
-rw-r--r-- | src/polyp/context.c | 73 | ||||
-rw-r--r-- | src/polyp/def.h | 45 | ||||
-rw-r--r-- | src/polyp/error.c | 37 | ||||
-rw-r--r-- | src/polyp/error.h | 2 | ||||
-rw-r--r-- | src/polyp/internal.h | 37 | ||||
-rw-r--r-- | src/polyp/introspect.c | 24 | ||||
-rw-r--r-- | src/polyp/simple.c | 41 | ||||
-rw-r--r-- | src/polyp/stream.c | 366 | ||||
-rw-r--r-- | src/polyp/stream.h | 140 | ||||
-rw-r--r-- | src/polyp/subscribe.c | 2 |
10 files changed, 485 insertions, 282 deletions
diff --git a/src/polyp/context.c b/src/polyp/context.c index 6b778562..eac5dd23 100644 --- a/src/polyp/context.c +++ b/src/polyp/context.c @@ -74,6 +74,8 @@ static const pa_pdispatch_callback command_table[PA_COMMAND_MAX] = { [PA_COMMAND_REQUEST] = pa_command_request, + [PA_COMMAND_OVERFLOW] = pa_command_overflow_or_underflow, + [PA_COMMAND_UNDERFLOW] = pa_command_overflow_or_underflow, [PA_COMMAND_PLAYBACK_STREAM_KILLED] = pa_command_stream_killed, [PA_COMMAND_RECORD_STREAM_KILLED] = pa_command_stream_killed, [PA_COMMAND_SUBSCRIBE_EVENT] = pa_command_subscribe_event @@ -109,9 +111,10 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) { PA_LLIST_HEAD_INIT(pa_stream, c->streams); PA_LLIST_HEAD_INIT(pa_operation, c->operations); - c->error = PA_ERROR_OK; + c->error = PA_OK; c->state = PA_CONTEXT_UNCONNECTED; c->ctag = 0; + c->csyncid = 0; c->state_callback = NULL; c->state_userdata = NULL; @@ -234,14 +237,24 @@ void pa_context_set_state(pa_context *c, pa_context_state_t st) { void pa_context_fail(pa_context *c, int error) { assert(c); - c->error = error; + + pa_context_set_error(c, error); pa_context_set_state(c, PA_CONTEXT_FAILED); } +int pa_context_set_error(pa_context *c, int error) { + assert(error >= 0 && error < PA_ERR_MAX); + + if (c) + c->error = error; + + return error; +} + static void pstream_die_callback(pa_pstream *p, void *userdata) { pa_context *c = userdata; assert(p && c); - pa_context_fail(c, PA_ERROR_CONNECTIONTERMINATED); + pa_context_fail(c, PA_ERR_CONNECTIONTERMINATED); } static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *userdata) { @@ -252,34 +265,34 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *user if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { pa_log(__FILE__": invalid packet.\n"); - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); } pa_context_unref(c); } -static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, PA_GCC_UNUSED uint32_t delta, const pa_memchunk *chunk, void *userdata) { +static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) { pa_context *c = userdata; pa_stream *s; - assert(p && chunk && c && chunk->memblock && chunk->memblock->data); + + assert(p); + assert(chunk); + assert(chunk->memblock); + assert(chunk->length); + assert(c); pa_context_ref(c); if ((s = pa_dynarray_get(c->record_streams, channel))) { - pa_mcalign_push(s->mcalign, chunk); - for (;;) { - pa_memchunk t; - - if (pa_mcalign_pop(s->mcalign, &t) < 0) - break; + pa_memblockq_seek(s->record_memblockq, offset, seek); + pa_memblockq_push_align(s->record_memblockq, chunk); - assert(s->record_memblockq); - pa_memblockq_push(s->record_memblockq, &t, 0); - if (s->read_callback) - s->read_callback(s, pa_stream_readable_size(s), s->read_userdata); + if (s->read_callback) { + size_t l; - pa_memblock_unref(t.memblock); + if ((l = pa_memblockq_get_length(s->record_memblockq)) > 0) + s->read_callback(s, l, s->read_userdata); } } @@ -293,14 +306,14 @@ int pa_context_handle_error(pa_context *c, uint32_t command, pa_tagstruct *t) { assert(t); if (pa_tagstruct_getu32(t, &c->error) < 0) { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); return -1; } } else if (command == PA_COMMAND_TIMEOUT) - c->error = PA_ERROR_TIMEOUT; + c->error = PA_ERR_TIMEOUT; else { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); return -1; } @@ -316,7 +329,7 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t if (command != PA_COMMAND_REPLY) { if (pa_context_handle_error(c, command, t) < 0) - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); pa_context_fail(c, c->error); goto finish; @@ -368,7 +381,7 @@ static void setup_context(pa_context *c, pa_iochannel *io) { assert(c->pdispatch); if (!c->conf->cookie_valid) { - pa_context_fail(c, PA_ERROR_AUTHKEY); + pa_context_fail(c, PA_ERR_AUTHKEY); goto finish; } @@ -401,7 +414,7 @@ static int context_connect_spawn(pa_context *c) { if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { pa_log(__FILE__": socketpair() failed: %s\n", strerror(errno)); - pa_context_fail(c, PA_ERROR_INTERNAL); + pa_context_fail(c, PA_ERR_INTERNAL); goto fail; } @@ -415,7 +428,7 @@ static int context_connect_spawn(pa_context *c) { if ((pid = fork()) < 0) { pa_log(__FILE__": fork() failed: %s\n", strerror(errno)); - pa_context_fail(c, PA_ERROR_INTERNAL); + pa_context_fail(c, PA_ERR_INTERNAL); if (c->spawn_api.postfork) c->spawn_api.postfork(); @@ -471,10 +484,10 @@ static int context_connect_spawn(pa_context *c) { if (r < 0) { pa_log(__FILE__": waitpid() failed: %s\n", strerror(errno)); - pa_context_fail(c, PA_ERROR_INTERNAL); + pa_context_fail(c, PA_ERR_INTERNAL); goto fail; } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { - pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED); + pa_context_fail(c, PA_ERR_CONNECTIONREFUSED); goto fail; } @@ -527,7 +540,7 @@ static int try_next_connection(pa_context *c) { } #endif - pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED); + pa_context_fail(c, PA_ERR_CONNECTIONREFUSED); goto finish; } @@ -569,7 +582,7 @@ static void on_connection(pa_socket_client *client, pa_iochannel*io, void *userd goto finish; } - pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED); + pa_context_fail(c, PA_ERR_CONNECTIONREFUSED); goto finish; } @@ -593,7 +606,7 @@ int pa_context_connect(pa_context *c, const char *server, int spawn, const pa_sp if (server) { if (!(c->server_list = pa_strlist_parse(server))) { - pa_context_fail(c, PA_ERROR_INVALIDSERVER); + pa_context_fail(c, PA_ERR_INVALIDSERVER); goto finish; } } else { @@ -759,7 +772,7 @@ void pa_context_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_U success = 0; } else if (!pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } diff --git a/src/polyp/def.h b/src/polyp/def.h index ba35b31e..f8e0bed4 100644 --- a/src/polyp/def.h +++ b/src/polyp/def.h @@ -47,7 +47,7 @@ typedef enum pa_context_state { /** The state of a stream */ typedef enum pa_stream_state { - PA_STREAM_DISCONNECTED, /**< The stream is not yet connected to any sink or source */ + PA_STREAM_UNCONNECTED, /**< The stream is not yet connected to any sink or source */ PA_STREAM_CREATING, /**< The stream is being created */ PA_STREAM_READY, /**< The stream is established, you may pass audio data to it now */ PA_STREAM_FAILED, /**< An error occured that made the stream invalid */ @@ -103,22 +103,24 @@ typedef struct pa_buffer_attr { /** Error values as used by pa_context_errno(). Use pa_strerror() to convert these values to human readable strings */ enum { - PA_ERROR_OK, /**< No error */ - PA_ERROR_ACCESS, /**< Access failure */ - PA_ERROR_COMMAND, /**< Unknown command */ - PA_ERROR_INVALID, /**< Invalid argument */ - PA_ERROR_EXIST, /**< Entity exists */ - PA_ERROR_NOENTITY, /**< No such entity */ - PA_ERROR_CONNECTIONREFUSED, /**< Connection refused */ - PA_ERROR_PROTOCOL, /**< Protocol error */ - PA_ERROR_TIMEOUT, /**< Timeout */ - PA_ERROR_AUTHKEY, /**< No authorization key */ - PA_ERROR_INTERNAL, /**< Internal error */ - PA_ERROR_CONNECTIONTERMINATED, /**< Connection terminated */ - PA_ERROR_KILLED, /**< Entity killed */ - PA_ERROR_INVALIDSERVER, /**< Invalid server */ - PA_ERROR_INITFAILED, /**< Module initialization failed */ - PA_ERROR_MAX /**< Not really an error but the first invalid error code */ + PA_OK = 0, /**< No error */ + PA_ERR_ACCESS, /**< Access failure */ + PA_ERR_COMMAND, /**< Unknown command */ + PA_ERR_INVALID, /**< Invalid argument */ + PA_ERR_EXIST, /**< Entity exists */ + PA_ERR_NOENTITY, /**< No such entity */ + PA_ERR_CONNECTIONREFUSED, /**< Connection refused */ + PA_ERR_PROTOCOL, /**< Protocol error */ + PA_ERR_TIMEOUT, /**< Timeout */ + PA_ERR_AUTHKEY, /**< No authorization key */ + PA_ERR_INTERNAL, /**< Internal error */ + PA_ERR_CONNECTIONTERMINATED, /**< Connection terminated */ + PA_ERR_KILLED, /**< Entity killed */ + PA_ERR_INVALIDSERVER, /**< Invalid server */ + PA_ERR_MODINITFAILED, /**< Module initialization failed */ + PA_ERR_BADSTATE, /**< Bad state */ + PA_ERR_NODATA, /**< No data */ + PA_ERR_MAX /**< Not really an error but the first invalid error code */ }; /** Subscription event mask, as used by pa_context_subscribe() */ @@ -208,6 +210,15 @@ typedef struct pa_spawn_api { * passed to the new process. */ } pa_spawn_api; +/** Seek type \since 0.8*/ +typedef enum pa_seek_mode { + PA_SEEK_RELATIVE = 0, /**< Seek relatively to the write index */ + PA_SEEK_ABSOLUTE = 1, /**< Seek relatively to the start of the buffer queue */ + PA_SEEK_RELATIVE_ON_READ = 2, /**< Seek relatively to the read index */ + PA_SEEK_RELATIVE_END = 3, /**< Seek relatively to the current end of the buffer queue */ +} pa_seek_mode_t; + + PA_C_DECL_END #endif diff --git a/src/polyp/error.c b/src/polyp/error.c index ece77bf2..eff37cc8 100644 --- a/src/polyp/error.c +++ b/src/polyp/error.c @@ -30,25 +30,28 @@ #include "error.h" -static const char* const errortab[PA_ERROR_MAX] = { - [PA_ERROR_OK] = "OK", - [PA_ERROR_ACCESS] = "Access denied", - [PA_ERROR_COMMAND] = "Unknown command", - [PA_ERROR_INVALID] = "Invalid argument", - [PA_ERROR_EXIST] = "Entity exists", - [PA_ERROR_NOENTITY] = "No such entity", - [PA_ERROR_CONNECTIONREFUSED] = "Connection refused", - [PA_ERROR_PROTOCOL] = "Protocol error", - [PA_ERROR_TIMEOUT] = "Timeout", - [PA_ERROR_AUTHKEY] = "No authorization key", - [PA_ERROR_INTERNAL] = "Internal error", - [PA_ERROR_CONNECTIONTERMINATED] = "Connection terminated", - [PA_ERROR_KILLED] = "Entity killed", - [PA_ERROR_INVALIDSERVER] = "Invalid server", +static const char* const errortab[PA_ERR_MAX] = { + [PA_OK] = "OK", + [PA_ERR_ACCESS] = "Access denied", + [PA_ERR_COMMAND] = "Unknown command", + [PA_ERR_INVALID] = "Invalid argument", + [PA_ERR_EXIST] = "Entity exists", + [PA_ERR_NOENTITY] = "No such entity", + [PA_ERR_CONNECTIONREFUSED] = "Connection refused", + [PA_ERR_PROTOCOL] = "Protocol error", + [PA_ERR_TIMEOUT] = "Timeout", + [PA_ERR_AUTHKEY] = "No authorization key", + [PA_ERR_INTERNAL] = "Internal error", + [PA_ERR_CONNECTIONTERMINATED] = "Connection terminated", + [PA_ERR_KILLED] = "Entity killed", + [PA_ERR_INVALIDSERVER] = "Invalid server", + [PA_ERR_MODINITFAILED] = "Module initalization failed", + [PA_ERR_BADSTATE] = "Bad state", + [PA_ERR_NODATA] = "No data", }; -const char*pa_strerror(uint32_t error) { - if (error >= PA_ERROR_MAX) +const char*pa_strerror(int error) { + if (error < 0 || error >= PA_ERR_MAX) return NULL; return errortab[error]; diff --git a/src/polyp/error.h b/src/polyp/error.h index ff2507b2..9856c1af 100644 --- a/src/polyp/error.h +++ b/src/polyp/error.h @@ -31,7 +31,7 @@ PA_C_DECL_BEGIN /** Return a human readable error message for the specified numeric error code */ -const char* pa_strerror(uint32_t error); +const char* pa_strerror(int error); PA_C_DECL_END diff --git a/src/polyp/internal.h b/src/polyp/internal.h index 7f4d38ac..578969ee 100644 --- a/src/polyp/internal.h +++ b/src/polyp/internal.h @@ -54,8 +54,9 @@ struct pa_context { pa_dynarray *record_streams, *playback_streams; PA_LLIST_HEAD(pa_stream, streams); PA_LLIST_HEAD(pa_operation, operations); - + uint32_t ctag; + uint32_t csyncid; uint32_t error; pa_context_state_t state; @@ -90,6 +91,7 @@ struct pa_stream { pa_sample_spec sample_spec; pa_channel_map channel_map; uint32_t channel; + uint32_t syncid; int channel_valid; uint32_t device_index; pa_stream_direction_t direction; @@ -98,7 +100,6 @@ struct pa_stream { pa_usec_t previous_time; pa_usec_t previous_ipol_time; pa_stream_state_t state; - pa_mcalign *mcalign; pa_memchunk peek_memchunk; pa_memblockq *record_memblockq; @@ -110,14 +111,20 @@ struct pa_stream { pa_time_event *ipol_event; int ipol_requested; - void (*state_callback)(pa_stream*c, void *userdata); + pa_stream_notify_cb_t state_callback; void *state_userdata; - void (*read_callback)(pa_stream *p, size_t length, void *userdata); + pa_stream_request_cb_t read_callback; void *read_userdata; - void (*write_callback)(pa_stream *p, size_t length, void *userdata); + pa_stream_request_cb_t write_callback; void *write_userdata; + + pa_stream_notify_cb_t overflow_callback; + void *overflow_userdata; + + pa_stream_notify_cb_t underflow_callback; + void *underflow_userdata; }; typedef void (*pa_operation_callback)(void); @@ -136,6 +143,7 @@ struct pa_operation { void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); void pa_command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); +void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); pa_operation *pa_operation_new(pa_context *c, pa_stream *s); void pa_operation_done(pa_operation *o); @@ -146,6 +154,7 @@ void pa_context_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); void pa_context_fail(pa_context *c, int error); +int pa_context_set_error(pa_context *c, int error); void pa_context_set_state(pa_context *c, pa_context_state_t st); int pa_context_handle_error(pa_context *c, uint32_t command, pa_tagstruct *t); pa_operation* pa_context_send_simple_command(pa_context *c, uint32_t command, void (*internal_callback)(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata), void (*cb)(void), void *userdata); @@ -154,5 +163,23 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); void pa_stream_trash_ipol(pa_stream *s); +#define PA_CHECK_VALIDITY(context, expression, error) do { \ + if (!(expression)) \ + return -pa_context_set_error((context), (error)); \ +} while(0) + +#define PA_CHECK_VALIDITY_RETURN_NULL(context, expression, error) do { \ + if (!(expression)) { \ + pa_context_set_error((context), (error)); \ + return NULL; \ + } \ +} while(0) + +#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) do { \ + if (!(expression)) { \ + pa_context_set_error((context), (error)); \ + return value; \ + } \ +} while(0) #endif diff --git a/src/polyp/introspect.c b/src/polyp/introspect.c index 4af724b4..75ce3ff9 100644 --- a/src/polyp/introspect.c +++ b/src/polyp/introspect.c @@ -52,7 +52,7 @@ static void context_stat_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNU pa_tagstruct_getu32(t, &i.memblock_allocated_size) < 0 || pa_tagstruct_getu32(t, &i.scache_size) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -92,7 +92,7 @@ static void context_get_server_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_getu32(t, &i.cookie) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -139,7 +139,7 @@ static void context_get_sink_info_callback(pa_pdispatch *pd, uint32_t command, P pa_tagstruct_get_usec(t, &i.latency) < 0 || pa_tagstruct_gets(t, &i.driver) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -234,7 +234,7 @@ static void context_get_source_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_get_usec(t, &i.latency) < 0 || pa_tagstruct_gets(t, &i.driver) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -322,7 +322,7 @@ static void context_get_client_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_gets(t, &i.name) < 0 || pa_tagstruct_getu32(t, &i.owner_module) < 0 || pa_tagstruct_gets(t, &i.driver) < 0 ) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -389,7 +389,7 @@ static void context_get_module_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_gets(t, &i.argument) < 0 || pa_tagstruct_getu32(t, &i.n_used) < 0 || pa_tagstruct_get_boolean(t, &i.auto_unload) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -464,7 +464,7 @@ static void context_get_sink_input_info_callback(pa_pdispatch *pd, uint32_t comm pa_tagstruct_gets(t, &i.resample_method) < 0 || pa_tagstruct_gets(t, &i.driver) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -538,7 +538,7 @@ static void context_get_source_output_info_callback(pa_pdispatch *pd, uint32_t c pa_tagstruct_gets(t, &i.resample_method) < 0 || pa_tagstruct_gets(t, &i.driver) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -677,7 +677,7 @@ static void context_get_sample_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_get_boolean(t, &i.lazy) < 0 || pa_tagstruct_gets(t, &i.filename) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -787,7 +787,7 @@ static void load_module_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUS } else if (pa_tagstruct_getu32(t, &idx) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -848,7 +848,7 @@ static void context_get_autoload_info_callback(pa_pdispatch *pd, uint32_t comman pa_tagstruct_getu32(t, &i.type) < 0 || pa_tagstruct_gets(t, &i.module) < 0 || pa_tagstruct_gets(t, &i.argument) < 0) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } @@ -926,7 +926,7 @@ static void context_add_autoload_callback(pa_pdispatch *pd, uint32_t command, PA idx = PA_INVALID_INDEX; } else if (pa_tagstruct_getu32(t, &idx) || !pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } diff --git a/src/polyp/simple.c b/src/polyp/simple.c index e14cab2e..8a20c223 100644 --- a/src/polyp/simple.c +++ b/src/polyp/simple.c @@ -45,13 +45,11 @@ struct pa_simple { int dead; - void *read_data; + const void *read_data; size_t read_index, read_length; pa_usec_t latency; }; -static void read_callback(pa_stream *s, const void*data, size_t length, void *userdata); - static int check_error(pa_simple *p, int *rerror) { pa_context_state_t cst; pa_stream_state_t sst; @@ -92,7 +90,7 @@ static int iterate(pa_simple *p, int block, int *rerror) { do { if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) { if (rerror) - *rerror = PA_ERROR_INTERNAL; + *rerror = PA_ERR_INTERNAL; return -1; } @@ -106,7 +104,7 @@ static int iterate(pa_simple *p, int block, int *rerror) { if (pa_mainloop_iterate(p->mainloop, 0, NULL) < 0) { if (rerror) - *rerror = PA_ERROR_INTERNAL; + *rerror = PA_ERR_INTERNAL; return -1; } @@ -128,7 +126,7 @@ pa_simple* pa_simple_new( int *rerror) { pa_simple *p; - int error = PA_ERROR_INTERNAL; + int error = PA_ERR_INTERNAL; assert(ss && (dir == PA_STREAM_PLAYBACK || dir == PA_STREAM_RECORD)); p = pa_xmalloc(sizeof(pa_simple)); @@ -157,7 +155,7 @@ pa_simple* pa_simple_new( goto fail; if (dir == PA_STREAM_PLAYBACK) - pa_stream_connect_playback(p->stream, dev, attr, 0, NULL); + pa_stream_connect_playback(p->stream, dev, attr, 0, NULL, NULL); else pa_stream_connect_record(p->stream, dev, attr, 0); @@ -167,8 +165,6 @@ pa_simple* pa_simple_new( goto fail; } - pa_stream_set_read_callback(p->stream, read_callback, p); - return p; fail: @@ -181,8 +177,6 @@ fail: void pa_simple_free(pa_simple *s) { assert(s); - pa_xfree(s->read_data); - if (s->stream) pa_stream_unref(s->stream); @@ -215,7 +209,7 @@ int pa_simple_write(pa_simple *p, const void*data, size_t length, int *rerror) { if (l > length) l = length; - pa_stream_write(p->stream, data, l, NULL, 0); + pa_stream_write(p->stream, data, l, NULL, 0, PA_SEEK_RELATIVE); data = (const uint8_t*) data + l; length -= l; } @@ -227,19 +221,6 @@ int pa_simple_write(pa_simple *p, const void*data, size_t length, int *rerror) { return 0; } -static void read_callback(pa_stream *s, const void*data, size_t length, void *userdata) { - pa_simple *p = userdata; - assert(s && data && length && p); - - if (p->read_data) { - pa_log(__FILE__": Buffer overflow, dropping incoming memory blocks.\n"); - pa_xfree(p->read_data); - } - - p->read_data = pa_xmemdup(data, p->read_length = length); - p->read_index = 0; -} - int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) { assert(p && data && p->direction == PA_STREAM_RECORD); @@ -251,13 +232,18 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) { } while (length > 0) { + + if (!p->read_data) + if (pa_stream_peek(p->stream, &p->read_data, &p->read_length) >= 0) + p->read_index = 0; + if (p->read_data) { size_t l = length; if (p->read_length <= l) l = p->read_length; - memcpy(data, (uint8_t*) p->read_data+p->read_index, l); + memcpy(data, (const uint8_t*) p->read_data+p->read_index, l); data = (uint8_t*) data + l; length -= l; @@ -266,8 +252,9 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) { p->read_length -= l; if (!p->read_length) { - pa_xfree(p->read_data); + pa_stream_drop(p->stream); p->read_data = NULL; + p->read_length = 0; p->read_index = 0; } diff --git a/src/polyp/stream.c b/src/polyp/stream.c index 35de2d01..5497f0c4 100644 --- a/src/polyp/stream.c +++ b/src/polyp/stream.c @@ -28,6 +28,7 @@ #include <stdio.h> #include <string.h> +#include <polyp/def.h> #include <polypcore/xmalloc.h> #include <polypcore/pstream-util.h> #include <polypcore/util.h> @@ -39,14 +40,11 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) { pa_stream *s; + assert(c); - assert(ss); - - if (!pa_sample_spec_valid(ss)) - return NULL; - if (map && !pa_channel_map_valid(map)) - return NULL; + PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID); + PA_CHECK_VALIDITY_RETURN_NULL(c, !map || pa_channel_map_valid(map), PA_ERR_INVALID); s = pa_xnew(pa_stream, 1); s->ref = 1; @@ -59,6 +57,10 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->write_userdata = NULL; s->state_callback = NULL; s->state_userdata = NULL; + s->overflow_callback = NULL; + s->overflow_userdata = NULL; + s->underflow_callback = NULL; + s->underflow_userdata = NULL; s->direction = PA_STREAM_NODIRECTION; s->name = pa_xstrdup(name); @@ -71,13 +73,13 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->channel = 0; s->channel_valid = 0; + s->syncid = c->csyncid++; s->device_index = PA_INVALID_INDEX; s->requested_bytes = 0; - s->state = PA_STREAM_DISCONNECTED; + s->state = PA_STREAM_UNCONNECTED; memset(&s->buffer_attr, 0, sizeof(s->buffer_attr)); - s->mcalign = pa_mcalign_new(pa_frame_size(ss), c->memblock_stat); - + s->peek_memchunk.index = 0; s->peek_memchunk.length = 0; s->peek_memchunk.memblock = NULL; @@ -114,42 +116,52 @@ static void stream_free(pa_stream *s) { if (s->record_memblockq) pa_memblockq_free(s->record_memblockq); - pa_mcalign_free(s->mcalign); - pa_xfree(s->name); pa_xfree(s); } void pa_stream_unref(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); if (--(s->ref) == 0) stream_free(s); } pa_stream* pa_stream_ref(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + s->ref++; return s; } pa_stream_state_t pa_stream_get_state(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + return s->state; } pa_context* pa_stream_get_context(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + return s->context; } uint32_t pa_stream_get_index(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX); + return s->device_index; } void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); if (s->state == st) return; @@ -159,6 +171,8 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { s->state = st; if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) { + /* Detach from context */ + if (s->channel_valid) pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL); @@ -182,14 +196,14 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); goto finish; } if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel))) goto finish; - c->error = PA_ERROR_KILLED; + c->error = PA_ERR_KILLED; pa_stream_set_state(s, PA_STREAM_FAILED); finish: @@ -207,24 +221,55 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32 if (pa_tagstruct_getu32(t, &channel) < 0 || pa_tagstruct_getu32(t, &bytes) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); goto finish; } if (!(s = pa_dynarray_get(c->playback_streams, channel))) goto finish; - if (s->state != PA_STREAM_READY) - goto finish; + if (s->state == PA_STREAM_READY) { + s->requested_bytes += bytes; + + if (s->requested_bytes > 0 && s->write_callback) + s->write_callback(s, s->requested_bytes, s->write_userdata); + } - pa_stream_ref(s); +finish: + pa_context_unref(c); +} + +void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) { + pa_stream *s; + pa_context *c = userdata; + uint32_t channel; + + assert(pd); + assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW); + assert(t); + assert(c); + + pa_context_ref(c); + + if (pa_tagstruct_getu32(t, &channel) < 0 || + !pa_tagstruct_eof(t)) { + pa_context_fail(c, PA_ERR_PROTOCOL); + goto finish; + } - s->requested_bytes += bytes; + if (!(s = pa_dynarray_get(c->playback_streams, channel))) + goto finish; - if (s->requested_bytes && s->write_callback) - s->write_callback(s, s->requested_bytes, s->write_userdata); + if (s->state == PA_STREAM_READY) { - pa_stream_unref(s); + if (command == PA_COMMAND_OVERFLOW) { + if (s->overflow_callback) + s->overflow_callback(s, s->overflow_userdata); + } else if (command == PA_COMMAND_UNDERFLOW) { + if (s->underflow_callback) + s->underflow_callback(s, s->underflow_userdata); + } + } finish: pa_context_unref(c); @@ -270,14 +315,21 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) || ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) || !pa_tagstruct_eof(t)) { - pa_context_fail(s->context, PA_ERROR_PROTOCOL); + pa_context_fail(s->context, PA_ERR_PROTOCOL); goto finish; } if (s->direction == PA_STREAM_RECORD) { assert(!s->record_memblockq); - s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0, - pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat); + s->record_memblockq = pa_memblockq_new( + 0, + s->buffer_attr.maxlength, + 0, + pa_frame_size(&s->sample_spec), + 1, + 0, + NULL, + s->context->memblock_stat); assert(s->record_memblockq); } @@ -303,13 +355,32 @@ finish: pa_stream_unref(s); } -static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, const pa_cvolume *volume) { +static int create_stream( + pa_stream_direction_t direction, + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags, + const pa_cvolume *volume, + pa_stream *sync_stream) { + pa_tagstruct *t; uint32_t tag; - assert(s && s->ref >= 1 && s->state == PA_STREAM_DISCONNECTED); + + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, (flags & ~(PA_STREAM_START_CORKED|PA_STREAM_INTERPOLATE_LATENCY)) == 0, PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || flags == 0, PA_ERR_INVALID); pa_stream_ref(s); + s->direction = direction; + + if (sync_stream) + s->syncid = sync_stream->syncid; + s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY); pa_stream_trash_ipol(s); @@ -336,25 +407,28 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a dev = s->context->conf->default_source; } - pa_tagstruct_put(t, - PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM, - PA_TAG_U32, tag = s->context->ctag++, - PA_TAG_STRING, s->name, - PA_TAG_SAMPLE_SPEC, &s->sample_spec, - PA_TAG_CHANNEL_MAP, &s->channel_map, - PA_TAG_U32, PA_INVALID_INDEX, - PA_TAG_STRING, dev, - PA_TAG_U32, s->buffer_attr.maxlength, - PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED), - PA_TAG_INVALID); + pa_tagstruct_put( + t, + PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM, + PA_TAG_U32, tag = s->context->ctag++, + PA_TAG_STRING, s->name, + PA_TAG_SAMPLE_SPEC, &s->sample_spec, + PA_TAG_CHANNEL_MAP, &s->channel_map, + PA_TAG_U32, PA_INVALID_INDEX, + PA_TAG_STRING, dev, + PA_TAG_U32, s->buffer_attr.maxlength, + PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED), + PA_TAG_INVALID); if (s->direction == PA_STREAM_PLAYBACK) { pa_cvolume cv; - pa_tagstruct_put(t, - PA_TAG_U32, s->buffer_attr.tlength, - PA_TAG_U32, s->buffer_attr.prebuf, - PA_TAG_U32, s->buffer_attr.minreq, - PA_TAG_INVALID); + pa_tagstruct_put( + t, + PA_TAG_U32, s->buffer_attr.tlength, + PA_TAG_U32, s->buffer_attr.prebuf, + PA_TAG_U32, s->buffer_attr.minreq, + PA_TAG_U32, s->syncid, + PA_TAG_INVALID); if (!volume) { pa_cvolume_reset(&cv, s->sample_spec.channels); @@ -369,23 +443,57 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s); pa_stream_unref(s); + return 0; } -void pa_stream_connect_playback(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, pa_cvolume *volume) { - assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1); - s->direction = PA_STREAM_PLAYBACK; - create_stream(s, dev, attr, flags, volume); +int pa_stream_connect_playback( + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags, + pa_cvolume *volume, + pa_stream *sync_stream) { + + assert(s); + assert(s->ref >= 1); + + return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream); } -void pa_stream_connect_record(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags) { - assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1); - s->direction = PA_STREAM_RECORD; - create_stream(s, dev, attr, flags, 0); +int pa_stream_connect_record( + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags) { + + assert(s); + assert(s->ref >= 1); + + return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL); } -void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) { +int pa_stream_write( + pa_stream *s, + const void *data, + size_t length, + void (*free_cb)(void *p), + int64_t offset, + pa_seek_mode_t seek) { + pa_memchunk chunk; - assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1); + + assert(s); + assert(s->ref >= 1); + assert(s->context); + assert(data); + + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID); + + if (length <= 0) + return 0; if (free_cb) { chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat); @@ -398,7 +506,7 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_ chunk.index = 0; chunk.length = length; - pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk); + pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk); pa_memblock_unref(chunk.memblock); if (length < s->requested_bytes) @@ -407,72 +515,87 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_ s->requested_bytes = 0; s->counter += length; + return 0; } -void pa_stream_peek(pa_stream *s, void **data, size_t *length) { - assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1); +int pa_stream_peek(pa_stream *s, const void **data, size_t *length) { + assert(s); + assert(s->ref >= 1); + assert(data); + assert(length); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); + if (!s->peek_memchunk.memblock) { - *data = NULL; - *length = 0; - - if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) - return; - pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); + if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) { + *data = NULL; + *length = 0; + return 0; + } } - *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index; + *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index; *length = s->peek_memchunk.length; + return 0; } -void pa_stream_drop(pa_stream *s) { - assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1); - - s->counter += s->peek_memchunk.length; +int pa_stream_drop(pa_stream *s) { + assert(s); + assert(s->ref >= 1); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE); + + pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); + pa_memblock_unref(s->peek_memchunk.memblock); - s->peek_memchunk.length = 0; + s->peek_memchunk.index = 0; s->peek_memchunk.memblock = NULL; + + s->counter += s->peek_memchunk.length; + return 0; } size_t pa_stream_writable_size(pa_stream *s) { - assert(s && s->ref >= 1); - return s->state == PA_STREAM_READY ? s->requested_bytes : 0; + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE, (size_t) -1); + + return s->requested_bytes; } size_t pa_stream_readable_size(pa_stream *s) { - size_t sz; - - assert(s && s->ref >= 1); - - if (s->state != PA_STREAM_READY) - return 0; - - assert(s->record_memblockq); - - sz = (size_t)pa_memblockq_get_length(s->record_memblockq); + assert(s); + assert(s->ref >= 1); - if (s->peek_memchunk.memblock) - sz += s->peek_memchunk.length; + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - return sz; + return pa_memblockq_get_length(s->record_memblockq); } pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) { pa_operation *o; pa_tagstruct *t; uint32_t tag; - assert(s && s->ref >= 1 && s->state == PA_STREAM_READY); + + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); o = pa_operation_new(s->context, s); - assert(o); o->callback = (pa_operation_callback) cb; o->userdata = userdata; t = pa_tagstruct_new(NULL, 0); - assert(t); pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM); pa_tagstruct_putu32(t, tag = s->context->ctag++); pa_tagstruct_putu32(t, s->channel); @@ -501,7 +624,7 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_get_timeval(t, &remote) < 0 || pa_tagstruct_getu64(t, &i.counter) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } else { pa_gettimeofday(&now); @@ -549,15 +672,18 @@ pa_operation* pa_stream_get_latency_info(pa_stream *s, void (*cb)(pa_stream *p, pa_operation *o; pa_tagstruct *t; struct timeval now; - assert(s && s->direction != PA_STREAM_UPLOAD); + + assert(s); + assert(s->ref >= 1); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + o = pa_operation_new(s->context, s); - assert(o); o->callback = (pa_operation_callback) cb; o->userdata = userdata; t = pa_tagstruct_new(NULL, 0); - assert(t); pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY); pa_tagstruct_putu32(t, tag = s->context->ctag++); pa_tagstruct_putu32(t, s->channel); @@ -585,7 +711,7 @@ void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN pa_stream_set_state(s, PA_STREAM_FAILED); goto finish; } else if (!pa_tagstruct_eof(t)) { - pa_context_fail(s->context, PA_ERROR_PROTOCOL); + pa_context_fail(s->context, PA_ERR_PROTOCOL); goto finish; } @@ -595,18 +721,19 @@ finish: pa_stream_unref(s); } -void pa_stream_disconnect(pa_stream *s) { +int pa_stream_disconnect(pa_stream *s) { pa_tagstruct *t; uint32_t tag; - assert(s && s->ref >= 1); - if (!s->channel_valid || !s->context->state == PA_CONTEXT_READY) - return; + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); pa_stream_ref(s); t = pa_tagstruct_new(NULL, 0); - assert(t); pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM : (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)); @@ -616,26 +743,49 @@ void pa_stream_disconnect(pa_stream *s) { pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s); pa_stream_unref(s); + return 0; } -void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) { - assert(s && s->ref >= 1); +void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + s->read_callback = cb; s->read_userdata = userdata; } -void pa_stream_set_write_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) { - assert(s && s->ref >= 1); +void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + s->write_callback = cb; s->write_userdata = userdata; } -void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata) { - assert(s && s->ref >= 1); +void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + s->state_callback = cb; s->state_userdata = userdata; } +void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + + s->overflow_callback = cb; + s->overflow_userdata = userdata; +} + +void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + + s->underflow_callback = cb; + s->underflow_userdata = userdata; +} + void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) { pa_operation *o = userdata; int success = 1; @@ -647,7 +797,7 @@ void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN success = 0; } else if (!pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } diff --git a/src/polyp/stream.h b/src/polyp/stream.h index 9bbda436..ce535963 100644 --- a/src/polyp/stream.h +++ b/src/polyp/stream.h @@ -40,8 +40,27 @@ PA_C_DECL_BEGIN * An opaque stream for playback or recording */ typedef struct pa_stream pa_stream; +/** A generic callback for operation completion */ +typedef void (*pa_stream_success_cb_t) (pa_stream*s, int success, void *userdata); + +/** A generic free callback */ +typedef void (*pa_free_cb_t)(void *p); + +/** A generic request callback */ +typedef void (*pa_stream_request_cb_t)(pa_stream *p, size_t length, void *userdata); + +/** A generic notification callback */ +typedef void (*pa_stream_notify_cb_t)(pa_stream *p, void *userdata); + +/** Callback prototype for pa_stream_get_latency_info() */ +typedef void (*pa_stream_get_latency_info_cb_t)(pa_stream *p, const pa_latency_info *i, void *userdata); + /** Create a new, unconnected stream with the specified name and sample type */ -pa_stream* pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map); +pa_stream* pa_stream_new( + pa_context *c, + const char *name, + const pa_sample_spec *ss, + const pa_channel_map *map); /** Decrease the reference counter by one */ void pa_stream_unref(pa_stream *s); @@ -59,108 +78,101 @@ pa_context* pa_stream_get_context(pa_stream *p); uint32_t pa_stream_get_index(pa_stream *s); /** Connect the stream to a sink */ -void pa_stream_connect_playback( - pa_stream *s, - const char *dev, - const pa_buffer_attr *attr, - pa_stream_flags_t flags, - pa_cvolume *volume); +int pa_stream_connect_playback( + pa_stream *s /**< The stream to connect to a sink */, + const char *dev /**< Name of the sink to connect to, or NULL for default */ , + const pa_buffer_attr *attr /**< Buffering attributes, or NULL for default */, + pa_stream_flags_t flags /**< Additional flags, or 0 for default */, + pa_cvolume *volume /**< Initial volume, or NULL for default */, + pa_stream *sync_stream /**< Synchronize this stream with the specified one, or NULL for a standalone stream*/); /** Connect the stream to a source */ -void pa_stream_connect_record( - pa_stream *s, - const char *dev, - const pa_buffer_attr *attr, - pa_stream_flags_t flags); +int pa_stream_connect_record( + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags); /** Disconnect a stream from a source/sink */ -void pa_stream_disconnect(pa_stream *s); +int pa_stream_disconnect(pa_stream *s); /** Write some data to the server (for playback sinks), if free_cb is * non-NULL this routine is called when all data has been written out * and an internal reference to the specified data is kept, the data * is not copied. If NULL, the data is copied into an internal - * buffer. */ -void pa_stream_write(pa_stream *p /**< The stream to use */, - const void *data /**< The data to write */, - size_t length /**< The length of the data to write */, - void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */, - size_t delta /**< Drop this many - bytes in the playback - buffer before writing - this data. Use - (size_t) -1 for - clearing the whole - playback - buffer. Normally you - will specify 0 here, - i.e. append to the - playback buffer. If - the value given here - is greater than the - buffered data length - the buffer is cleared - and the data is - written to the - buffer's start. This - value is ignored on - upload streams. */); - -/** Read the next fragment from the buffer (for capture sources). + * buffer. The client my freely seek around in the output buffer. For + * most applications passing 0 and PA_SEEK_RELATIVE as arguments for + * offset and seek should be useful.*/ +int pa_stream_write( + pa_stream *p /**< The stream to use */, + const void *data /**< The data to write */, + size_t length /**< The length of the data to write */, + pa_free_cb_t free_cb /**< A cleanup routine for the data or NULL to request an internal copy */, + int64_t offset, /**< Offset for seeking, must be 0 for upload streams */ + pa_seek_mode_t seek /**< Seek mode, must be PA_SEEK_RELATIVE for upload streams */); + +/** Read the next fragment from the buffer (for recording). * data will point to the actual data and length will contain the size * of the data in bytes (which can be less than a complete framgnet). - * Use pa_stream_drop() to actually remove the data from the buffer. - * \since 0.8 */ -void pa_stream_peek(pa_stream *p /**< The stream to use */, - void **data /**< Pointer to pointer that will point to data */, - size_t *length /**< The length of the data read */); + * Use pa_stream_drop() to actually remove the data from the + * buffer. If no data is available will return a NULL pointer \since 0.8 */ +int pa_stream_peek( + pa_stream *p /**< The stream to use */, + const void **data /**< Pointer to pointer that will point to data */, + size_t *length /**< The length of the data read */); /** Remove the current fragment. It is invalid to do this without first * calling pa_stream_peek(). \since 0.8 */ -void pa_stream_drop(pa_stream *p); +int pa_stream_drop(pa_stream *p); -/** Return the amount of bytes that may be written using pa_stream_write() */ +/** Return the nember of bytes that may be written using pa_stream_write() */ size_t pa_stream_writable_size(pa_stream *p); -/** Return the ammount of bytes that may be read using pa_stream_read() \since 0.8 */ +/** Return the number of bytes that may be read using pa_stream_read() \since 0.8 */ size_t pa_stream_readable_size(pa_stream *p); -/** Drain a playback stream */ -pa_operation* pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata); +/** Drain a playback stream. Use this for notification when the buffer is empty */ +pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata); /** Get the playback latency of a stream */ -pa_operation* pa_stream_get_latency_info(pa_stream *p, void (*cb)(pa_stream *p, const pa_latency_info *i, void *userdata), void *userdata); +pa_operation* pa_stream_get_latency_info(pa_stream *p, pa_stream_get_latency_info_cb_t cby, void *userdata); /** Set the callback function that is called whenever the state of the stream changes */ -void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata); +void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata); /** Set the callback function that is called when new data may be * written to the stream. */ -void pa_stream_set_write_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata); +void pa_stream_set_write_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata); /** Set the callback function that is called when new data is available from the stream. - * Return the number of bytes read. \since 0.8 - */ -void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata); + * Return the number of bytes read. \since 0.8 */ +void pa_stream_set_read_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata); + +/** Set the callback function that is called when a buffer overflow happens. (Only for playback streams) \since 0.8 */ +void pa_stream_set_overflow_callback(pa_stream *p, pa_stream_notify_cb_t cb, void *userdata); + +/** Set the callback function that is called when a buffer underflow happens. (Only for playback streams) \since 0.8 */ +void pa_stream_set_underflow_callback(pa_stream *p, pa_stream_notify_cb_t cb, void *userdata); /** Pause (or resume) playback of this stream temporarily. Available on both playback and recording streams. \since 0.3 */ -pa_operation* pa_stream_cork(pa_stream *s, int b, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata); +pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata); /** Flush the playback buffer of this stream. Most of the time you're * better off using the parameter delta of pa_stream_write() instead of this * function. Available on both playback and recording streams. \since 0.3 */ -pa_operation* pa_stream_flush(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata); +pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata); -/** Reenable prebuffering. Available for playback streams only. \since 0.6 */ -pa_operation* pa_stream_prebuf(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata); +/** Reenable prebuffering as specified in the pa_buffer_attr + * structure. Available for playback streams only. \since 0.6 */ +pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata); /** Request immediate start of playback on this stream. This disables - * prebuffering as specified in the pa_buffer_attr structure. Available for playback streams only. \since - * 0.3 */ -pa_operation* pa_stream_trigger(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata); + * prebuffering as specified in the pa_buffer_attr + * structure, temporarily. Available for playback streams only. \since 0.3 */ +pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata); /** Rename the stream. \since 0.5 */ -pa_operation* pa_stream_set_name(pa_stream *s, const char *name, void(*cb)(pa_stream*c, int success, void *userdata), void *userdata); +pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata); /** Return the total number of bytes written to/read from the * stream. This counter is not reset on pa_stream_flush(), you may do diff --git a/src/polyp/subscribe.c b/src/polyp/subscribe.c index b90e0bf1..4e00997a 100644 --- a/src/polyp/subscribe.c +++ b/src/polyp/subscribe.c @@ -44,7 +44,7 @@ void pa_command_subscribe_event(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSE if (pa_tagstruct_getu32(t, &e) < 0 || pa_tagstruct_getu32(t, &index) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); goto finish; } |