summaryrefslogtreecommitdiffstats
path: root/src/polyp
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
committerLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
commit304449002cbc84fdcf235b5dfaec891278dd7085 (patch)
tree2a2d00e34d5c620835b76a0d6f7890a1d3e9fb97 /src/polyp
parent0876b1ba82ea9c988df90ca98d202765ac697313 (diff)
1) Add flexible seeking support (including absolute) for memory block queues and playback streams
2) Add support to synchronize multiple playback streams 3) add two tests for 1) and 2) 4) s/PA_ERROR/PA_ERR/ 5) s/PA_ERROR_OK/PA_OK/ 6) update simple API to deal properly with new peek/drop recording API 7) add beginnings of proper validity checking on API calls in client libs (needs to be extended) 8) report playback buffer overflows/underflows to the client 9) move client side recording mcalign stuff into the memblockq 10) create typedefs for a bunch of API callback prototypes 11) simplify handling of HUP poll() events Yes, i know, it's usually better to commit a lot of small patches instead of a single big one. In this case however, this would have contradicted the other rule: never commit broken or incomplete stuff. *** This stuff needs a lot of additional testing! *** git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@511 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/polyp')
-rw-r--r--src/polyp/context.c73
-rw-r--r--src/polyp/def.h45
-rw-r--r--src/polyp/error.c37
-rw-r--r--src/polyp/error.h2
-rw-r--r--src/polyp/internal.h37
-rw-r--r--src/polyp/introspect.c24
-rw-r--r--src/polyp/simple.c41
-rw-r--r--src/polyp/stream.c366
-rw-r--r--src/polyp/stream.h140
-rw-r--r--src/polyp/subscribe.c2
10 files changed, 485 insertions, 282 deletions
diff --git a/src/polyp/context.c b/src/polyp/context.c
index 6b778562..eac5dd23 100644
--- a/src/polyp/context.c
+++ b/src/polyp/context.c
@@ -74,6 +74,8 @@
static const pa_pdispatch_callback command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_REQUEST] = pa_command_request,
+ [PA_COMMAND_OVERFLOW] = pa_command_overflow_or_underflow,
+ [PA_COMMAND_UNDERFLOW] = pa_command_overflow_or_underflow,
[PA_COMMAND_PLAYBACK_STREAM_KILLED] = pa_command_stream_killed,
[PA_COMMAND_RECORD_STREAM_KILLED] = pa_command_stream_killed,
[PA_COMMAND_SUBSCRIBE_EVENT] = pa_command_subscribe_event
@@ -109,9 +111,10 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) {
PA_LLIST_HEAD_INIT(pa_stream, c->streams);
PA_LLIST_HEAD_INIT(pa_operation, c->operations);
- c->error = PA_ERROR_OK;
+ c->error = PA_OK;
c->state = PA_CONTEXT_UNCONNECTED;
c->ctag = 0;
+ c->csyncid = 0;
c->state_callback = NULL;
c->state_userdata = NULL;
@@ -234,14 +237,24 @@ void pa_context_set_state(pa_context *c, pa_context_state_t st) {
void pa_context_fail(pa_context *c, int error) {
assert(c);
- c->error = error;
+
+ pa_context_set_error(c, error);
pa_context_set_state(c, PA_CONTEXT_FAILED);
}
+int pa_context_set_error(pa_context *c, int error) {
+ assert(error >= 0 && error < PA_ERR_MAX);
+
+ if (c)
+ c->error = error;
+
+ return error;
+}
+
static void pstream_die_callback(pa_pstream *p, void *userdata) {
pa_context *c = userdata;
assert(p && c);
- pa_context_fail(c, PA_ERROR_CONNECTIONTERMINATED);
+ pa_context_fail(c, PA_ERR_CONNECTIONTERMINATED);
}
static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *userdata) {
@@ -252,34 +265,34 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *user
if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
pa_log(__FILE__": invalid packet.\n");
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
}
pa_context_unref(c);
}
-static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, PA_GCC_UNUSED uint32_t delta, const pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
pa_context *c = userdata;
pa_stream *s;
- assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
+
+ assert(p);
+ assert(chunk);
+ assert(chunk->memblock);
+ assert(chunk->length);
+ assert(c);
pa_context_ref(c);
if ((s = pa_dynarray_get(c->record_streams, channel))) {
- pa_mcalign_push(s->mcalign, chunk);
- for (;;) {
- pa_memchunk t;
-
- if (pa_mcalign_pop(s->mcalign, &t) < 0)
- break;
+ pa_memblockq_seek(s->record_memblockq, offset, seek);
+ pa_memblockq_push_align(s->record_memblockq, chunk);
- assert(s->record_memblockq);
- pa_memblockq_push(s->record_memblockq, &t, 0);
- if (s->read_callback)
- s->read_callback(s, pa_stream_readable_size(s), s->read_userdata);
+ if (s->read_callback) {
+ size_t l;
- pa_memblock_unref(t.memblock);
+ if ((l = pa_memblockq_get_length(s->record_memblockq)) > 0)
+ s->read_callback(s, l, s->read_userdata);
}
}
@@ -293,14 +306,14 @@ int pa_context_handle_error(pa_context *c, uint32_t command, pa_tagstruct *t) {
assert(t);
if (pa_tagstruct_getu32(t, &c->error) < 0) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
return -1;
}
} else if (command == PA_COMMAND_TIMEOUT)
- c->error = PA_ERROR_TIMEOUT;
+ c->error = PA_ERR_TIMEOUT;
else {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
return -1;
}
@@ -316,7 +329,7 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
if (command != PA_COMMAND_REPLY) {
if (pa_context_handle_error(c, command, t) < 0)
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
pa_context_fail(c, c->error);
goto finish;
@@ -368,7 +381,7 @@ static void setup_context(pa_context *c, pa_iochannel *io) {
assert(c->pdispatch);
if (!c->conf->cookie_valid) {
- pa_context_fail(c, PA_ERROR_AUTHKEY);
+ pa_context_fail(c, PA_ERR_AUTHKEY);
goto finish;
}
@@ -401,7 +414,7 @@ static int context_connect_spawn(pa_context *c) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
pa_log(__FILE__": socketpair() failed: %s\n", strerror(errno));
- pa_context_fail(c, PA_ERROR_INTERNAL);
+ pa_context_fail(c, PA_ERR_INTERNAL);
goto fail;
}
@@ -415,7 +428,7 @@ static int context_connect_spawn(pa_context *c) {
if ((pid = fork()) < 0) {
pa_log(__FILE__": fork() failed: %s\n", strerror(errno));
- pa_context_fail(c, PA_ERROR_INTERNAL);
+ pa_context_fail(c, PA_ERR_INTERNAL);
if (c->spawn_api.postfork)
c->spawn_api.postfork();
@@ -471,10 +484,10 @@ static int context_connect_spawn(pa_context *c) {
if (r < 0) {
pa_log(__FILE__": waitpid() failed: %s\n", strerror(errno));
- pa_context_fail(c, PA_ERROR_INTERNAL);
+ pa_context_fail(c, PA_ERR_INTERNAL);
goto fail;
} else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
- pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED);
+ pa_context_fail(c, PA_ERR_CONNECTIONREFUSED);
goto fail;
}
@@ -527,7 +540,7 @@ static int try_next_connection(pa_context *c) {
}
#endif
- pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED);
+ pa_context_fail(c, PA_ERR_CONNECTIONREFUSED);
goto finish;
}
@@ -569,7 +582,7 @@ static void on_connection(pa_socket_client *client, pa_iochannel*io, void *userd
goto finish;
}
- pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED);
+ pa_context_fail(c, PA_ERR_CONNECTIONREFUSED);
goto finish;
}
@@ -593,7 +606,7 @@ int pa_context_connect(pa_context *c, const char *server, int spawn, const pa_sp
if (server) {
if (!(c->server_list = pa_strlist_parse(server))) {
- pa_context_fail(c, PA_ERROR_INVALIDSERVER);
+ pa_context_fail(c, PA_ERR_INVALIDSERVER);
goto finish;
}
} else {
@@ -759,7 +772,7 @@ void pa_context_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_U
success = 0;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
diff --git a/src/polyp/def.h b/src/polyp/def.h
index ba35b31e..f8e0bed4 100644
--- a/src/polyp/def.h
+++ b/src/polyp/def.h
@@ -47,7 +47,7 @@ typedef enum pa_context_state {
/** The state of a stream */
typedef enum pa_stream_state {
- PA_STREAM_DISCONNECTED, /**< The stream is not yet connected to any sink or source */
+ PA_STREAM_UNCONNECTED, /**< The stream is not yet connected to any sink or source */
PA_STREAM_CREATING, /**< The stream is being created */
PA_STREAM_READY, /**< The stream is established, you may pass audio data to it now */
PA_STREAM_FAILED, /**< An error occured that made the stream invalid */
@@ -103,22 +103,24 @@ typedef struct pa_buffer_attr {
/** Error values as used by pa_context_errno(). Use pa_strerror() to convert these values to human readable strings */
enum {
- PA_ERROR_OK, /**< No error */
- PA_ERROR_ACCESS, /**< Access failure */
- PA_ERROR_COMMAND, /**< Unknown command */
- PA_ERROR_INVALID, /**< Invalid argument */
- PA_ERROR_EXIST, /**< Entity exists */
- PA_ERROR_NOENTITY, /**< No such entity */
- PA_ERROR_CONNECTIONREFUSED, /**< Connection refused */
- PA_ERROR_PROTOCOL, /**< Protocol error */
- PA_ERROR_TIMEOUT, /**< Timeout */
- PA_ERROR_AUTHKEY, /**< No authorization key */
- PA_ERROR_INTERNAL, /**< Internal error */
- PA_ERROR_CONNECTIONTERMINATED, /**< Connection terminated */
- PA_ERROR_KILLED, /**< Entity killed */
- PA_ERROR_INVALIDSERVER, /**< Invalid server */
- PA_ERROR_INITFAILED, /**< Module initialization failed */
- PA_ERROR_MAX /**< Not really an error but the first invalid error code */
+ PA_OK = 0, /**< No error */
+ PA_ERR_ACCESS, /**< Access failure */
+ PA_ERR_COMMAND, /**< Unknown command */
+ PA_ERR_INVALID, /**< Invalid argument */
+ PA_ERR_EXIST, /**< Entity exists */
+ PA_ERR_NOENTITY, /**< No such entity */
+ PA_ERR_CONNECTIONREFUSED, /**< Connection refused */
+ PA_ERR_PROTOCOL, /**< Protocol error */
+ PA_ERR_TIMEOUT, /**< Timeout */
+ PA_ERR_AUTHKEY, /**< No authorization key */
+ PA_ERR_INTERNAL, /**< Internal error */
+ PA_ERR_CONNECTIONTERMINATED, /**< Connection terminated */
+ PA_ERR_KILLED, /**< Entity killed */
+ PA_ERR_INVALIDSERVER, /**< Invalid server */
+ PA_ERR_MODINITFAILED, /**< Module initialization failed */
+ PA_ERR_BADSTATE, /**< Bad state */
+ PA_ERR_NODATA, /**< No data */
+ PA_ERR_MAX /**< Not really an error but the first invalid error code */
};
/** Subscription event mask, as used by pa_context_subscribe() */
@@ -208,6 +210,15 @@ typedef struct pa_spawn_api {
* passed to the new process. */
} pa_spawn_api;
+/** Seek type \since 0.8*/
+typedef enum pa_seek_mode {
+ PA_SEEK_RELATIVE = 0, /**< Seek relatively to the write index */
+ PA_SEEK_ABSOLUTE = 1, /**< Seek relatively to the start of the buffer queue */
+ PA_SEEK_RELATIVE_ON_READ = 2, /**< Seek relatively to the read index */
+ PA_SEEK_RELATIVE_END = 3, /**< Seek relatively to the current end of the buffer queue */
+} pa_seek_mode_t;
+
+
PA_C_DECL_END
#endif
diff --git a/src/polyp/error.c b/src/polyp/error.c
index ece77bf2..eff37cc8 100644
--- a/src/polyp/error.c
+++ b/src/polyp/error.c
@@ -30,25 +30,28 @@
#include "error.h"
-static const char* const errortab[PA_ERROR_MAX] = {
- [PA_ERROR_OK] = "OK",
- [PA_ERROR_ACCESS] = "Access denied",
- [PA_ERROR_COMMAND] = "Unknown command",
- [PA_ERROR_INVALID] = "Invalid argument",
- [PA_ERROR_EXIST] = "Entity exists",
- [PA_ERROR_NOENTITY] = "No such entity",
- [PA_ERROR_CONNECTIONREFUSED] = "Connection refused",
- [PA_ERROR_PROTOCOL] = "Protocol error",
- [PA_ERROR_TIMEOUT] = "Timeout",
- [PA_ERROR_AUTHKEY] = "No authorization key",
- [PA_ERROR_INTERNAL] = "Internal error",
- [PA_ERROR_CONNECTIONTERMINATED] = "Connection terminated",
- [PA_ERROR_KILLED] = "Entity killed",
- [PA_ERROR_INVALIDSERVER] = "Invalid server",
+static const char* const errortab[PA_ERR_MAX] = {
+ [PA_OK] = "OK",
+ [PA_ERR_ACCESS] = "Access denied",
+ [PA_ERR_COMMAND] = "Unknown command",
+ [PA_ERR_INVALID] = "Invalid argument",
+ [PA_ERR_EXIST] = "Entity exists",
+ [PA_ERR_NOENTITY] = "No such entity",
+ [PA_ERR_CONNECTIONREFUSED] = "Connection refused",
+ [PA_ERR_PROTOCOL] = "Protocol error",
+ [PA_ERR_TIMEOUT] = "Timeout",
+ [PA_ERR_AUTHKEY] = "No authorization key",
+ [PA_ERR_INTERNAL] = "Internal error",
+ [PA_ERR_CONNECTIONTERMINATED] = "Connection terminated",
+ [PA_ERR_KILLED] = "Entity killed",
+ [PA_ERR_INVALIDSERVER] = "Invalid server",
+ [PA_ERR_MODINITFAILED] = "Module initalization failed",
+ [PA_ERR_BADSTATE] = "Bad state",
+ [PA_ERR_NODATA] = "No data",
};
-const char*pa_strerror(uint32_t error) {
- if (error >= PA_ERROR_MAX)
+const char*pa_strerror(int error) {
+ if (error < 0 || error >= PA_ERR_MAX)
return NULL;
return errortab[error];
diff --git a/src/polyp/error.h b/src/polyp/error.h
index ff2507b2..9856c1af 100644
--- a/src/polyp/error.h
+++ b/src/polyp/error.h
@@ -31,7 +31,7 @@
PA_C_DECL_BEGIN
/** Return a human readable error message for the specified numeric error code */
-const char* pa_strerror(uint32_t error);
+const char* pa_strerror(int error);
PA_C_DECL_END
diff --git a/src/polyp/internal.h b/src/polyp/internal.h
index 7f4d38ac..578969ee 100644
--- a/src/polyp/internal.h
+++ b/src/polyp/internal.h
@@ -54,8 +54,9 @@ struct pa_context {
pa_dynarray *record_streams, *playback_streams;
PA_LLIST_HEAD(pa_stream, streams);
PA_LLIST_HEAD(pa_operation, operations);
-
+
uint32_t ctag;
+ uint32_t csyncid;
uint32_t error;
pa_context_state_t state;
@@ -90,6 +91,7 @@ struct pa_stream {
pa_sample_spec sample_spec;
pa_channel_map channel_map;
uint32_t channel;
+ uint32_t syncid;
int channel_valid;
uint32_t device_index;
pa_stream_direction_t direction;
@@ -98,7 +100,6 @@ struct pa_stream {
pa_usec_t previous_time;
pa_usec_t previous_ipol_time;
pa_stream_state_t state;
- pa_mcalign *mcalign;
pa_memchunk peek_memchunk;
pa_memblockq *record_memblockq;
@@ -110,14 +111,20 @@ struct pa_stream {
pa_time_event *ipol_event;
int ipol_requested;
- void (*state_callback)(pa_stream*c, void *userdata);
+ pa_stream_notify_cb_t state_callback;
void *state_userdata;
- void (*read_callback)(pa_stream *p, size_t length, void *userdata);
+ pa_stream_request_cb_t read_callback;
void *read_userdata;
- void (*write_callback)(pa_stream *p, size_t length, void *userdata);
+ pa_stream_request_cb_t write_callback;
void *write_userdata;
+
+ pa_stream_notify_cb_t overflow_callback;
+ void *overflow_userdata;
+
+ pa_stream_notify_cb_t underflow_callback;
+ void *underflow_userdata;
};
typedef void (*pa_operation_callback)(void);
@@ -136,6 +143,7 @@ struct pa_operation {
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
void pa_command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
pa_operation *pa_operation_new(pa_context *c, pa_stream *s);
void pa_operation_done(pa_operation *o);
@@ -146,6 +154,7 @@ void pa_context_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
void pa_context_fail(pa_context *c, int error);
+int pa_context_set_error(pa_context *c, int error);
void pa_context_set_state(pa_context *c, pa_context_state_t st);
int pa_context_handle_error(pa_context *c, uint32_t command, pa_tagstruct *t);
pa_operation* pa_context_send_simple_command(pa_context *c, uint32_t command, void (*internal_callback)(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata), void (*cb)(void), void *userdata);
@@ -154,5 +163,23 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st);
void pa_stream_trash_ipol(pa_stream *s);
+#define PA_CHECK_VALIDITY(context, expression, error) do { \
+ if (!(expression)) \
+ return -pa_context_set_error((context), (error)); \
+} while(0)
+
+#define PA_CHECK_VALIDITY_RETURN_NULL(context, expression, error) do { \
+ if (!(expression)) { \
+ pa_context_set_error((context), (error)); \
+ return NULL; \
+ } \
+} while(0)
+
+#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) do { \
+ if (!(expression)) { \
+ pa_context_set_error((context), (error)); \
+ return value; \
+ } \
+} while(0)
#endif
diff --git a/src/polyp/introspect.c b/src/polyp/introspect.c
index 4af724b4..75ce3ff9 100644
--- a/src/polyp/introspect.c
+++ b/src/polyp/introspect.c
@@ -52,7 +52,7 @@ static void context_stat_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNU
pa_tagstruct_getu32(t, &i.memblock_allocated_size) < 0 ||
pa_tagstruct_getu32(t, &i.scache_size) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -92,7 +92,7 @@ static void context_get_server_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_getu32(t, &i.cookie) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -139,7 +139,7 @@ static void context_get_sink_info_callback(pa_pdispatch *pd, uint32_t command, P
pa_tagstruct_get_usec(t, &i.latency) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -234,7 +234,7 @@ static void context_get_source_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_usec(t, &i.latency) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -322,7 +322,7 @@ static void context_get_client_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_gets(t, &i.name) < 0 ||
pa_tagstruct_getu32(t, &i.owner_module) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0 ) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -389,7 +389,7 @@ static void context_get_module_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_gets(t, &i.argument) < 0 ||
pa_tagstruct_getu32(t, &i.n_used) < 0 ||
pa_tagstruct_get_boolean(t, &i.auto_unload) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -464,7 +464,7 @@ static void context_get_sink_input_info_callback(pa_pdispatch *pd, uint32_t comm
pa_tagstruct_gets(t, &i.resample_method) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -538,7 +538,7 @@ static void context_get_source_output_info_callback(pa_pdispatch *pd, uint32_t c
pa_tagstruct_gets(t, &i.resample_method) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -677,7 +677,7 @@ static void context_get_sample_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_boolean(t, &i.lazy) < 0 ||
pa_tagstruct_gets(t, &i.filename) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -787,7 +787,7 @@ static void load_module_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUS
} else if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -848,7 +848,7 @@ static void context_get_autoload_info_callback(pa_pdispatch *pd, uint32_t comman
pa_tagstruct_getu32(t, &i.type) < 0 ||
pa_tagstruct_gets(t, &i.module) < 0 ||
pa_tagstruct_gets(t, &i.argument) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -926,7 +926,7 @@ static void context_add_autoload_callback(pa_pdispatch *pd, uint32_t command, PA
idx = PA_INVALID_INDEX;
} else if (pa_tagstruct_getu32(t, &idx) ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
diff --git a/src/polyp/simple.c b/src/polyp/simple.c
index e14cab2e..8a20c223 100644
--- a/src/polyp/simple.c
+++ b/src/polyp/simple.c
@@ -45,13 +45,11 @@ struct pa_simple {
int dead;
- void *read_data;
+ const void *read_data;
size_t read_index, read_length;
pa_usec_t latency;
};
-static void read_callback(pa_stream *s, const void*data, size_t length, void *userdata);
-
static int check_error(pa_simple *p, int *rerror) {
pa_context_state_t cst;
pa_stream_state_t sst;
@@ -92,7 +90,7 @@ static int iterate(pa_simple *p, int block, int *rerror) {
do {
if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) {
if (rerror)
- *rerror = PA_ERROR_INTERNAL;
+ *rerror = PA_ERR_INTERNAL;
return -1;
}
@@ -106,7 +104,7 @@ static int iterate(pa_simple *p, int block, int *rerror) {
if (pa_mainloop_iterate(p->mainloop, 0, NULL) < 0) {
if (rerror)
- *rerror = PA_ERROR_INTERNAL;
+ *rerror = PA_ERR_INTERNAL;
return -1;
}
@@ -128,7 +126,7 @@ pa_simple* pa_simple_new(
int *rerror) {
pa_simple *p;
- int error = PA_ERROR_INTERNAL;
+ int error = PA_ERR_INTERNAL;
assert(ss && (dir == PA_STREAM_PLAYBACK || dir == PA_STREAM_RECORD));
p = pa_xmalloc(sizeof(pa_simple));
@@ -157,7 +155,7 @@ pa_simple* pa_simple_new(
goto fail;
if (dir == PA_STREAM_PLAYBACK)
- pa_stream_connect_playback(p->stream, dev, attr, 0, NULL);
+ pa_stream_connect_playback(p->stream, dev, attr, 0, NULL, NULL);
else
pa_stream_connect_record(p->stream, dev, attr, 0);
@@ -167,8 +165,6 @@ pa_simple* pa_simple_new(
goto fail;
}
- pa_stream_set_read_callback(p->stream, read_callback, p);
-
return p;
fail:
@@ -181,8 +177,6 @@ fail:
void pa_simple_free(pa_simple *s) {
assert(s);
- pa_xfree(s->read_data);
-
if (s->stream)
pa_stream_unref(s->stream);
@@ -215,7 +209,7 @@ int pa_simple_write(pa_simple *p, const void*data, size_t length, int *rerror) {
if (l > length)
l = length;
- pa_stream_write(p->stream, data, l, NULL, 0);
+ pa_stream_write(p->stream, data, l, NULL, 0, PA_SEEK_RELATIVE);
data = (const uint8_t*) data + l;
length -= l;
}
@@ -227,19 +221,6 @@ int pa_simple_write(pa_simple *p, const void*data, size_t length, int *rerror) {
return 0;
}
-static void read_callback(pa_stream *s, const void*data, size_t length, void *userdata) {
- pa_simple *p = userdata;
- assert(s && data && length && p);
-
- if (p->read_data) {
- pa_log(__FILE__": Buffer overflow, dropping incoming memory blocks.\n");
- pa_xfree(p->read_data);
- }
-
- p->read_data = pa_xmemdup(data, p->read_length = length);
- p->read_index = 0;
-}
-
int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
assert(p && data && p->direction == PA_STREAM_RECORD);
@@ -251,13 +232,18 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
}
while (length > 0) {
+
+ if (!p->read_data)
+ if (pa_stream_peek(p->stream, &p->read_data, &p->read_length) >= 0)
+ p->read_index = 0;
+
if (p->read_data) {
size_t l = length;
if (p->read_length <= l)
l = p->read_length;
- memcpy(data, (uint8_t*) p->read_data+p->read_index, l);
+ memcpy(data, (const uint8_t*) p->read_data+p->read_index, l);
data = (uint8_t*) data + l;
length -= l;
@@ -266,8 +252,9 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
p->read_length -= l;
if (!p->read_length) {
- pa_xfree(p->read_data);
+ pa_stream_drop(p->stream);
p->read_data = NULL;
+ p->read_length = 0;
p->read_index = 0;
}
diff --git a/src/polyp/stream.c b/src/polyp/stream.c
index 35de2d01..5497f0c4 100644
--- a/src/polyp/stream.c
+++ b/src/polyp/stream.c
@@ -28,6 +28,7 @@
#include <stdio.h>
#include <string.h>
+#include <polyp/def.h>
#include <polypcore/xmalloc.h>
#include <polypcore/pstream-util.h>
#include <polypcore/util.h>
@@ -39,14 +40,11 @@
pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
pa_stream *s;
+
assert(c);
- assert(ss);
-
- if (!pa_sample_spec_valid(ss))
- return NULL;
- if (map && !pa_channel_map_valid(map))
- return NULL;
+ PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, !map || pa_channel_map_valid(map), PA_ERR_INVALID);
s = pa_xnew(pa_stream, 1);
s->ref = 1;
@@ -59,6 +57,10 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->write_userdata = NULL;
s->state_callback = NULL;
s->state_userdata = NULL;
+ s->overflow_callback = NULL;
+ s->overflow_userdata = NULL;
+ s->underflow_callback = NULL;
+ s->underflow_userdata = NULL;
s->direction = PA_STREAM_NODIRECTION;
s->name = pa_xstrdup(name);
@@ -71,13 +73,13 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->channel = 0;
s->channel_valid = 0;
+ s->syncid = c->csyncid++;
s->device_index = PA_INVALID_INDEX;
s->requested_bytes = 0;
- s->state = PA_STREAM_DISCONNECTED;
+ s->state = PA_STREAM_UNCONNECTED;
memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
- s->mcalign = pa_mcalign_new(pa_frame_size(ss), c->memblock_stat);
-
+ s->peek_memchunk.index = 0;
s->peek_memchunk.length = 0;
s->peek_memchunk.memblock = NULL;
@@ -114,42 +116,52 @@ static void stream_free(pa_stream *s) {
if (s->record_memblockq)
pa_memblockq_free(s->record_memblockq);
- pa_mcalign_free(s->mcalign);
-
pa_xfree(s->name);
pa_xfree(s);
}
void pa_stream_unref(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
if (--(s->ref) == 0)
stream_free(s);
}
pa_stream* pa_stream_ref(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
s->ref++;
return s;
}
pa_stream_state_t pa_stream_get_state(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
return s->state;
}
pa_context* pa_stream_get_context(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
return s->context;
}
uint32_t pa_stream_get_index(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
+
return s->device_index;
}
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
if (s->state == st)
return;
@@ -159,6 +171,8 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
s->state = st;
if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) {
+ /* Detach from context */
+
if (s->channel_valid)
pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
@@ -182,14 +196,14 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
if (pa_tagstruct_getu32(t, &channel) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}
if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
goto finish;
- c->error = PA_ERROR_KILLED;
+ c->error = PA_ERR_KILLED;
pa_stream_set_state(s, PA_STREAM_FAILED);
finish:
@@ -207,24 +221,55 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32
if (pa_tagstruct_getu32(t, &channel) < 0 ||
pa_tagstruct_getu32(t, &bytes) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}
if (!(s = pa_dynarray_get(c->playback_streams, channel)))
goto finish;
- if (s->state != PA_STREAM_READY)
- goto finish;
+ if (s->state == PA_STREAM_READY) {
+ s->requested_bytes += bytes;
+
+ if (s->requested_bytes > 0 && s->write_callback)
+ s->write_callback(s, s->requested_bytes, s->write_userdata);
+ }
- pa_stream_ref(s);
+finish:
+ pa_context_unref(c);
+}
+
+void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
+ pa_stream *s;
+ pa_context *c = userdata;
+ uint32_t channel;
+
+ assert(pd);
+ assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
+ assert(t);
+ assert(c);
+
+ pa_context_ref(c);
+
+ if (pa_tagstruct_getu32(t, &channel) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
- s->requested_bytes += bytes;
+ if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+ goto finish;
- if (s->requested_bytes && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ if (s->state == PA_STREAM_READY) {
- pa_stream_unref(s);
+ if (command == PA_COMMAND_OVERFLOW) {
+ if (s->overflow_callback)
+ s->overflow_callback(s, s->overflow_userdata);
+ } else if (command == PA_COMMAND_UNDERFLOW) {
+ if (s->underflow_callback)
+ s->underflow_callback(s, s->underflow_userdata);
+ }
+ }
finish:
pa_context_unref(c);
@@ -270,14 +315,21 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(s->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
if (s->direction == PA_STREAM_RECORD) {
assert(!s->record_memblockq);
- s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0,
- pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat);
+ s->record_memblockq = pa_memblockq_new(
+ 0,
+ s->buffer_attr.maxlength,
+ 0,
+ pa_frame_size(&s->sample_spec),
+ 1,
+ 0,
+ NULL,
+ s->context->memblock_stat);
assert(s->record_memblockq);
}
@@ -303,13 +355,32 @@ finish:
pa_stream_unref(s);
}
-static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, const pa_cvolume *volume) {
+static int create_stream(
+ pa_stream_direction_t direction,
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags,
+ const pa_cvolume *volume,
+ pa_stream *sync_stream) {
+
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1 && s->state == PA_STREAM_DISCONNECTED);
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, (flags & ~(PA_STREAM_START_CORKED|PA_STREAM_INTERPOLATE_LATENCY)) == 0, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || flags == 0, PA_ERR_INVALID);
pa_stream_ref(s);
+ s->direction = direction;
+
+ if (sync_stream)
+ s->syncid = sync_stream->syncid;
+
s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY);
pa_stream_trash_ipol(s);
@@ -336,25 +407,28 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a
dev = s->context->conf->default_source;
}
- pa_tagstruct_put(t,
- PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
- PA_TAG_U32, tag = s->context->ctag++,
- PA_TAG_STRING, s->name,
- PA_TAG_SAMPLE_SPEC, &s->sample_spec,
- PA_TAG_CHANNEL_MAP, &s->channel_map,
- PA_TAG_U32, PA_INVALID_INDEX,
- PA_TAG_STRING, dev,
- PA_TAG_U32, s->buffer_attr.maxlength,
- PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
- PA_TAG_INVALID);
+ pa_tagstruct_put(
+ t,
+ PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
+ PA_TAG_U32, tag = s->context->ctag++,
+ PA_TAG_STRING, s->name,
+ PA_TAG_SAMPLE_SPEC, &s->sample_spec,
+ PA_TAG_CHANNEL_MAP, &s->channel_map,
+ PA_TAG_U32, PA_INVALID_INDEX,
+ PA_TAG_STRING, dev,
+ PA_TAG_U32, s->buffer_attr.maxlength,
+ PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
+ PA_TAG_INVALID);
if (s->direction == PA_STREAM_PLAYBACK) {
pa_cvolume cv;
- pa_tagstruct_put(t,
- PA_TAG_U32, s->buffer_attr.tlength,
- PA_TAG_U32, s->buffer_attr.prebuf,
- PA_TAG_U32, s->buffer_attr.minreq,
- PA_TAG_INVALID);
+ pa_tagstruct_put(
+ t,
+ PA_TAG_U32, s->buffer_attr.tlength,
+ PA_TAG_U32, s->buffer_attr.prebuf,
+ PA_TAG_U32, s->buffer_attr.minreq,
+ PA_TAG_U32, s->syncid,
+ PA_TAG_INVALID);
if (!volume) {
pa_cvolume_reset(&cv, s->sample_spec.channels);
@@ -369,23 +443,57 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s);
pa_stream_unref(s);
+ return 0;
}
-void pa_stream_connect_playback(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, pa_cvolume *volume) {
- assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
- s->direction = PA_STREAM_PLAYBACK;
- create_stream(s, dev, attr, flags, volume);
+int pa_stream_connect_playback(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags,
+ pa_cvolume *volume,
+ pa_stream *sync_stream) {
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
}
-void pa_stream_connect_record(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags) {
- assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
- s->direction = PA_STREAM_RECORD;
- create_stream(s, dev, attr, flags, 0);
+int pa_stream_connect_record(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags) {
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
}
-void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
+int pa_stream_write(
+ pa_stream *s,
+ const void *data,
+ size_t length,
+ void (*free_cb)(void *p),
+ int64_t offset,
+ pa_seek_mode_t seek) {
+
pa_memchunk chunk;
- assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+
+ assert(s);
+ assert(s->ref >= 1);
+ assert(s->context);
+ assert(data);
+
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
+
+ if (length <= 0)
+ return 0;
if (free_cb) {
chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
@@ -398,7 +506,7 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
chunk.index = 0;
chunk.length = length;
- pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
pa_memblock_unref(chunk.memblock);
if (length < s->requested_bytes)
@@ -407,72 +515,87 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
s->requested_bytes = 0;
s->counter += length;
+ return 0;
}
-void pa_stream_peek(pa_stream *s, void **data, size_t *length) {
- assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
+ assert(s);
+ assert(s->ref >= 1);
+ assert(data);
+ assert(length);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
+
if (!s->peek_memchunk.memblock) {
- *data = NULL;
- *length = 0;
-
- if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0)
- return;
- pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+ if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
+ *data = NULL;
+ *length = 0;
+ return 0;
+ }
}
- *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index;
+ *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index;
*length = s->peek_memchunk.length;
+ return 0;
}
-void pa_stream_drop(pa_stream *s) {
- assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1);
-
- s->counter += s->peek_memchunk.length;
+int pa_stream_drop(pa_stream *s) {
+ assert(s);
+ assert(s->ref >= 1);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
+
+ pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+
pa_memblock_unref(s->peek_memchunk.memblock);
-
s->peek_memchunk.length = 0;
+ s->peek_memchunk.index = 0;
s->peek_memchunk.memblock = NULL;
+
+ s->counter += s->peek_memchunk.length;
+ return 0;
}
size_t pa_stream_writable_size(pa_stream *s) {
- assert(s && s->ref >= 1);
- return s->state == PA_STREAM_READY ? s->requested_bytes : 0;
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE, (size_t) -1);
+
+ return s->requested_bytes;
}
size_t pa_stream_readable_size(pa_stream *s) {
- size_t sz;
-
- assert(s && s->ref >= 1);
-
- if (s->state != PA_STREAM_READY)
- return 0;
-
- assert(s->record_memblockq);
-
- sz = (size_t)pa_memblockq_get_length(s->record_memblockq);
+ assert(s);
+ assert(s->ref >= 1);
- if (s->peek_memchunk.memblock)
- sz += s->peek_memchunk.length;
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
- return sz;
+ return pa_memblockq_get_length(s->record_memblockq);
}
pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) {
pa_operation *o;
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
o = pa_operation_new(s->context, s);
- assert(o);
o->callback = (pa_operation_callback) cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
@@ -501,7 +624,7 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_timeval(t, &remote) < 0 ||
pa_tagstruct_getu64(t, &i.counter) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
} else {
pa_gettimeofday(&now);
@@ -549,15 +672,18 @@ pa_operation* pa_stream_get_latency_info(pa_stream *s, void (*cb)(pa_stream *p,
pa_operation *o;
pa_tagstruct *t;
struct timeval now;
- assert(s && s->direction != PA_STREAM_UPLOAD);
+
+ assert(s);
+ assert(s->ref >= 1);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+
o = pa_operation_new(s->context, s);
- assert(o);
o->callback = (pa_operation_callback) cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
@@ -585,7 +711,7 @@ void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN
pa_stream_set_state(s, PA_STREAM_FAILED);
goto finish;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(s->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -595,18 +721,19 @@ finish:
pa_stream_unref(s);
}
-void pa_stream_disconnect(pa_stream *s) {
+int pa_stream_disconnect(pa_stream *s) {
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1);
- if (!s->channel_valid || !s->context->state == PA_CONTEXT_READY)
- return;
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
pa_stream_ref(s);
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
(s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM));
@@ -616,26 +743,49 @@ void pa_stream_disconnect(pa_stream *s) {
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s);
pa_stream_unref(s);
+ return 0;
}
-void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->read_callback = cb;
s->read_userdata = userdata;
}
-void pa_stream_set_write_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->write_callback = cb;
s->write_userdata = userdata;
}
-void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->state_callback = cb;
s->state_userdata = userdata;
}
+void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
+ s->overflow_callback = cb;
+ s->overflow_userdata = userdata;
+}
+
+void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
+ s->underflow_callback = cb;
+ s->underflow_userdata = userdata;
+}
+
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_operation *o = userdata;
int success = 1;
@@ -647,7 +797,7 @@ void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN
success = 0;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
diff --git a/src/polyp/stream.h b/src/polyp/stream.h
index 9bbda436..ce535963 100644
--- a/src/polyp/stream.h
+++ b/src/polyp/stream.h
@@ -40,8 +40,27 @@ PA_C_DECL_BEGIN
* An opaque stream for playback or recording */
typedef struct pa_stream pa_stream;
+/** A generic callback for operation completion */
+typedef void (*pa_stream_success_cb_t) (pa_stream*s, int success, void *userdata);
+
+/** A generic free callback */
+typedef void (*pa_free_cb_t)(void *p);
+
+/** A generic request callback */
+typedef void (*pa_stream_request_cb_t)(pa_stream *p, size_t length, void *userdata);
+
+/** A generic notification callback */
+typedef void (*pa_stream_notify_cb_t)(pa_stream *p, void *userdata);
+
+/** Callback prototype for pa_stream_get_latency_info() */
+typedef void (*pa_stream_get_latency_info_cb_t)(pa_stream *p, const pa_latency_info *i, void *userdata);
+
/** Create a new, unconnected stream with the specified name and sample type */
-pa_stream* pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map);
+pa_stream* pa_stream_new(
+ pa_context *c,
+ const char *name,
+ const pa_sample_spec *ss,
+ const pa_channel_map *map);
/** Decrease the reference counter by one */
void pa_stream_unref(pa_stream *s);
@@ -59,108 +78,101 @@ pa_context* pa_stream_get_context(pa_stream *p);
uint32_t pa_stream_get_index(pa_stream *s);
/** Connect the stream to a sink */
-void pa_stream_connect_playback(
- pa_stream *s,
- const char *dev,
- const pa_buffer_attr *attr,
- pa_stream_flags_t flags,
- pa_cvolume *volume);
+int pa_stream_connect_playback(
+ pa_stream *s /**< The stream to connect to a sink */,
+ const char *dev /**< Name of the sink to connect to, or NULL for default */ ,
+ const pa_buffer_attr *attr /**< Buffering attributes, or NULL for default */,
+ pa_stream_flags_t flags /**< Additional flags, or 0 for default */,
+ pa_cvolume *volume /**< Initial volume, or NULL for default */,
+ pa_stream *sync_stream /**< Synchronize this stream with the specified one, or NULL for a standalone stream*/);
/** Connect the stream to a source */
-void pa_stream_connect_record(
- pa_stream *s,
- const char *dev,
- const pa_buffer_attr *attr,
- pa_stream_flags_t flags);
+int pa_stream_connect_record(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags);
/** Disconnect a stream from a source/sink */
-void pa_stream_disconnect(pa_stream *s);
+int pa_stream_disconnect(pa_stream *s);
/** Write some data to the server (for playback sinks), if free_cb is
* non-NULL this routine is called when all data has been written out
* and an internal reference to the specified data is kept, the data
* is not copied. If NULL, the data is copied into an internal
- * buffer. */
-void pa_stream_write(pa_stream *p /**< The stream to use */,
- const void *data /**< The data to write */,
- size_t length /**< The length of the data to write */,
- void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */,
- size_t delta /**< Drop this many
- bytes in the playback
- buffer before writing
- this data. Use
- (size_t) -1 for
- clearing the whole
- playback
- buffer. Normally you
- will specify 0 here,
- i.e. append to the
- playback buffer. If
- the value given here
- is greater than the
- buffered data length
- the buffer is cleared
- and the data is
- written to the
- buffer's start. This
- value is ignored on
- upload streams. */);
-
-/** Read the next fragment from the buffer (for capture sources).
+ * buffer. The client my freely seek around in the output buffer. For
+ * most applications passing 0 and PA_SEEK_RELATIVE as arguments for
+ * offset and seek should be useful.*/
+int pa_stream_write(
+ pa_stream *p /**< The stream to use */,
+ const void *data /**< The data to write */,
+ size_t length /**< The length of the data to write */,
+ pa_free_cb_t free_cb /**< A cleanup routine for the data or NULL to request an internal copy */,
+ int64_t offset, /**< Offset for seeking, must be 0 for upload streams */
+ pa_seek_mode_t seek /**< Seek mode, must be PA_SEEK_RELATIVE for upload streams */);
+
+/** Read the next fragment from the buffer (for recording).
* data will point to the actual data and length will contain the size
* of the data in bytes (which can be less than a complete framgnet).
- * Use pa_stream_drop() to actually remove the data from the buffer.
- * \since 0.8 */
-void pa_stream_peek(pa_stream *p /**< The stream to use */,
- void **data /**< Pointer to pointer that will point to data */,
- size_t *length /**< The length of the data read */);
+ * Use pa_stream_drop() to actually remove the data from the
+ * buffer. If no data is available will return a NULL pointer \since 0.8 */
+int pa_stream_peek(
+ pa_stream *p /**< The stream to use */,
+ const void **data /**< Pointer to pointer that will point to data */,
+ size_t *length /**< The length of the data read */);
/** Remove the current fragment. It is invalid to do this without first
* calling pa_stream_peek(). \since 0.8 */
-void pa_stream_drop(pa_stream *p);
+int pa_stream_drop(pa_stream *p);
-/** Return the amount of bytes that may be written using pa_stream_write() */
+/** Return the nember of bytes that may be written using pa_stream_write() */
size_t pa_stream_writable_size(pa_stream *p);
-/** Return the ammount of bytes that may be read using pa_stream_read() \since 0.8 */
+/** Return the number of bytes that may be read using pa_stream_read() \since 0.8 */
size_t pa_stream_readable_size(pa_stream *p);
-/** Drain a playback stream */
-pa_operation* pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata);
+/** Drain a playback stream. Use this for notification when the buffer is empty */
+pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
/** Get the playback latency of a stream */
-pa_operation* pa_stream_get_latency_info(pa_stream *p, void (*cb)(pa_stream *p, const pa_latency_info *i, void *userdata), void *userdata);
+pa_operation* pa_stream_get_latency_info(pa_stream *p, pa_stream_get_latency_info_cb_t cby, void *userdata);
/** Set the callback function that is called whenever the state of the stream changes */
-void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata);
+void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata);
/** Set the callback function that is called when new data may be
* written to the stream. */
-void pa_stream_set_write_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata);
+void pa_stream_set_write_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata);
/** Set the callback function that is called when new data is available from the stream.
- * Return the number of bytes read. \since 0.8
- */
-void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata);
+ * Return the number of bytes read. \since 0.8 */
+void pa_stream_set_read_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata);
+
+/** Set the callback function that is called when a buffer overflow happens. (Only for playback streams) \since 0.8 */
+void pa_stream_set_overflow_callback(pa_stream *p, pa_stream_notify_cb_t cb, void *userdata);
+
+/** Set the callback function that is called when a buffer underflow happens. (Only for playback streams) \since 0.8 */
+void pa_stream_set_underflow_callback(pa_stream *p, pa_stream_notify_cb_t cb, void *userdata);
/** Pause (or resume) playback of this stream temporarily. Available on both playback and recording streams. \since 0.3 */
-pa_operation* pa_stream_cork(pa_stream *s, int b, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata);
+pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata);
/** Flush the playback buffer of this stream. Most of the time you're
* better off using the parameter delta of pa_stream_write() instead of this
* function. Available on both playback and recording streams. \since 0.3 */
-pa_operation* pa_stream_flush(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata);
+pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
-/** Reenable prebuffering. Available for playback streams only. \since 0.6 */
-pa_operation* pa_stream_prebuf(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata);
+/** Reenable prebuffering as specified in the pa_buffer_attr
+ * structure. Available for playback streams only. \since 0.6 */
+pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
/** Request immediate start of playback on this stream. This disables
- * prebuffering as specified in the pa_buffer_attr structure. Available for playback streams only. \since
- * 0.3 */
-pa_operation* pa_stream_trigger(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata);
+ * prebuffering as specified in the pa_buffer_attr
+ * structure, temporarily. Available for playback streams only. \since 0.3 */
+pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
/** Rename the stream. \since 0.5 */
-pa_operation* pa_stream_set_name(pa_stream *s, const char *name, void(*cb)(pa_stream*c, int success, void *userdata), void *userdata);
+pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata);
/** Return the total number of bytes written to/read from the
* stream. This counter is not reset on pa_stream_flush(), you may do
diff --git a/src/polyp/subscribe.c b/src/polyp/subscribe.c
index b90e0bf1..4e00997a 100644
--- a/src/polyp/subscribe.c
+++ b/src/polyp/subscribe.c
@@ -44,7 +44,7 @@ void pa_command_subscribe_event(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSE
if (pa_tagstruct_getu32(t, &e) < 0 ||
pa_tagstruct_getu32(t, &index) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}