summaryrefslogtreecommitdiffstats
path: root/src/polyp
diff options
context:
space:
mode:
Diffstat (limited to 'src/polyp')
-rw-r--r--src/polyp/context.c73
-rw-r--r--src/polyp/def.h45
-rw-r--r--src/polyp/error.c37
-rw-r--r--src/polyp/error.h2
-rw-r--r--src/polyp/internal.h37
-rw-r--r--src/polyp/introspect.c24
-rw-r--r--src/polyp/simple.c41
-rw-r--r--src/polyp/stream.c366
-rw-r--r--src/polyp/stream.h140
-rw-r--r--src/polyp/subscribe.c2
10 files changed, 485 insertions, 282 deletions
diff --git a/src/polyp/context.c b/src/polyp/context.c
index 6b778562..eac5dd23 100644
--- a/src/polyp/context.c
+++ b/src/polyp/context.c
@@ -74,6 +74,8 @@
static const pa_pdispatch_callback command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_REQUEST] = pa_command_request,
+ [PA_COMMAND_OVERFLOW] = pa_command_overflow_or_underflow,
+ [PA_COMMAND_UNDERFLOW] = pa_command_overflow_or_underflow,
[PA_COMMAND_PLAYBACK_STREAM_KILLED] = pa_command_stream_killed,
[PA_COMMAND_RECORD_STREAM_KILLED] = pa_command_stream_killed,
[PA_COMMAND_SUBSCRIBE_EVENT] = pa_command_subscribe_event
@@ -109,9 +111,10 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) {
PA_LLIST_HEAD_INIT(pa_stream, c->streams);
PA_LLIST_HEAD_INIT(pa_operation, c->operations);
- c->error = PA_ERROR_OK;
+ c->error = PA_OK;
c->state = PA_CONTEXT_UNCONNECTED;
c->ctag = 0;
+ c->csyncid = 0;
c->state_callback = NULL;
c->state_userdata = NULL;
@@ -234,14 +237,24 @@ void pa_context_set_state(pa_context *c, pa_context_state_t st) {
void pa_context_fail(pa_context *c, int error) {
assert(c);
- c->error = error;
+
+ pa_context_set_error(c, error);
pa_context_set_state(c, PA_CONTEXT_FAILED);
}
+int pa_context_set_error(pa_context *c, int error) {
+ assert(error >= 0 && error < PA_ERR_MAX);
+
+ if (c)
+ c->error = error;
+
+ return error;
+}
+
static void pstream_die_callback(pa_pstream *p, void *userdata) {
pa_context *c = userdata;
assert(p && c);
- pa_context_fail(c, PA_ERROR_CONNECTIONTERMINATED);
+ pa_context_fail(c, PA_ERR_CONNECTIONTERMINATED);
}
static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *userdata) {
@@ -252,34 +265,34 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *user
if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
pa_log(__FILE__": invalid packet.\n");
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
}
pa_context_unref(c);
}
-static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, PA_GCC_UNUSED uint32_t delta, const pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
pa_context *c = userdata;
pa_stream *s;
- assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
+
+ assert(p);
+ assert(chunk);
+ assert(chunk->memblock);
+ assert(chunk->length);
+ assert(c);
pa_context_ref(c);
if ((s = pa_dynarray_get(c->record_streams, channel))) {
- pa_mcalign_push(s->mcalign, chunk);
- for (;;) {
- pa_memchunk t;
-
- if (pa_mcalign_pop(s->mcalign, &t) < 0)
- break;
+ pa_memblockq_seek(s->record_memblockq, offset, seek);
+ pa_memblockq_push_align(s->record_memblockq, chunk);
- assert(s->record_memblockq);
- pa_memblockq_push(s->record_memblockq, &t, 0);
- if (s->read_callback)
- s->read_callback(s, pa_stream_readable_size(s), s->read_userdata);
+ if (s->read_callback) {
+ size_t l;
- pa_memblock_unref(t.memblock);
+ if ((l = pa_memblockq_get_length(s->record_memblockq)) > 0)
+ s->read_callback(s, l, s->read_userdata);
}
}
@@ -293,14 +306,14 @@ int pa_context_handle_error(pa_context *c, uint32_t command, pa_tagstruct *t) {
assert(t);
if (pa_tagstruct_getu32(t, &c->error) < 0) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
return -1;
}
} else if (command == PA_COMMAND_TIMEOUT)
- c->error = PA_ERROR_TIMEOUT;
+ c->error = PA_ERR_TIMEOUT;
else {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
return -1;
}
@@ -316,7 +329,7 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
if (command != PA_COMMAND_REPLY) {
if (pa_context_handle_error(c, command, t) < 0)
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
pa_context_fail(c, c->error);
goto finish;
@@ -368,7 +381,7 @@ static void setup_context(pa_context *c, pa_iochannel *io) {
assert(c->pdispatch);
if (!c->conf->cookie_valid) {
- pa_context_fail(c, PA_ERROR_AUTHKEY);
+ pa_context_fail(c, PA_ERR_AUTHKEY);
goto finish;
}
@@ -401,7 +414,7 @@ static int context_connect_spawn(pa_context *c) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
pa_log(__FILE__": socketpair() failed: %s\n", strerror(errno));
- pa_context_fail(c, PA_ERROR_INTERNAL);
+ pa_context_fail(c, PA_ERR_INTERNAL);
goto fail;
}
@@ -415,7 +428,7 @@ static int context_connect_spawn(pa_context *c) {
if ((pid = fork()) < 0) {
pa_log(__FILE__": fork() failed: %s\n", strerror(errno));
- pa_context_fail(c, PA_ERROR_INTERNAL);
+ pa_context_fail(c, PA_ERR_INTERNAL);
if (c->spawn_api.postfork)
c->spawn_api.postfork();
@@ -471,10 +484,10 @@ static int context_connect_spawn(pa_context *c) {
if (r < 0) {
pa_log(__FILE__": waitpid() failed: %s\n", strerror(errno));
- pa_context_fail(c, PA_ERROR_INTERNAL);
+ pa_context_fail(c, PA_ERR_INTERNAL);
goto fail;
} else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
- pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED);
+ pa_context_fail(c, PA_ERR_CONNECTIONREFUSED);
goto fail;
}
@@ -527,7 +540,7 @@ static int try_next_connection(pa_context *c) {
}
#endif
- pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED);
+ pa_context_fail(c, PA_ERR_CONNECTIONREFUSED);
goto finish;
}
@@ -569,7 +582,7 @@ static void on_connection(pa_socket_client *client, pa_iochannel*io, void *userd
goto finish;
}
- pa_context_fail(c, PA_ERROR_CONNECTIONREFUSED);
+ pa_context_fail(c, PA_ERR_CONNECTIONREFUSED);
goto finish;
}
@@ -593,7 +606,7 @@ int pa_context_connect(pa_context *c, const char *server, int spawn, const pa_sp
if (server) {
if (!(c->server_list = pa_strlist_parse(server))) {
- pa_context_fail(c, PA_ERROR_INVALIDSERVER);
+ pa_context_fail(c, PA_ERR_INVALIDSERVER);
goto finish;
}
} else {
@@ -759,7 +772,7 @@ void pa_context_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_U
success = 0;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
diff --git a/src/polyp/def.h b/src/polyp/def.h
index ba35b31e..f8e0bed4 100644
--- a/src/polyp/def.h
+++ b/src/polyp/def.h
@@ -47,7 +47,7 @@ typedef enum pa_context_state {
/** The state of a stream */
typedef enum pa_stream_state {
- PA_STREAM_DISCONNECTED, /**< The stream is not yet connected to any sink or source */
+ PA_STREAM_UNCONNECTED, /**< The stream is not yet connected to any sink or source */
PA_STREAM_CREATING, /**< The stream is being created */
PA_STREAM_READY, /**< The stream is established, you may pass audio data to it now */
PA_STREAM_FAILED, /**< An error occured that made the stream invalid */
@@ -103,22 +103,24 @@ typedef struct pa_buffer_attr {
/** Error values as used by pa_context_errno(). Use pa_strerror() to convert these values to human readable strings */
enum {
- PA_ERROR_OK, /**< No error */
- PA_ERROR_ACCESS, /**< Access failure */
- PA_ERROR_COMMAND, /**< Unknown command */
- PA_ERROR_INVALID, /**< Invalid argument */
- PA_ERROR_EXIST, /**< Entity exists */
- PA_ERROR_NOENTITY, /**< No such entity */
- PA_ERROR_CONNECTIONREFUSED, /**< Connection refused */
- PA_ERROR_PROTOCOL, /**< Protocol error */
- PA_ERROR_TIMEOUT, /**< Timeout */
- PA_ERROR_AUTHKEY, /**< No authorization key */
- PA_ERROR_INTERNAL, /**< Internal error */
- PA_ERROR_CONNECTIONTERMINATED, /**< Connection terminated */
- PA_ERROR_KILLED, /**< Entity killed */
- PA_ERROR_INVALIDSERVER, /**< Invalid server */
- PA_ERROR_INITFAILED, /**< Module initialization failed */
- PA_ERROR_MAX /**< Not really an error but the first invalid error code */
+ PA_OK = 0, /**< No error */
+ PA_ERR_ACCESS, /**< Access failure */
+ PA_ERR_COMMAND, /**< Unknown command */
+ PA_ERR_INVALID, /**< Invalid argument */
+ PA_ERR_EXIST, /**< Entity exists */
+ PA_ERR_NOENTITY, /**< No such entity */
+ PA_ERR_CONNECTIONREFUSED, /**< Connection refused */
+ PA_ERR_PROTOCOL, /**< Protocol error */
+ PA_ERR_TIMEOUT, /**< Timeout */
+ PA_ERR_AUTHKEY, /**< No authorization key */
+ PA_ERR_INTERNAL, /**< Internal error */
+ PA_ERR_CONNECTIONTERMINATED, /**< Connection terminated */
+ PA_ERR_KILLED, /**< Entity killed */
+ PA_ERR_INVALIDSERVER, /**< Invalid server */
+ PA_ERR_MODINITFAILED, /**< Module initialization failed */
+ PA_ERR_BADSTATE, /**< Bad state */
+ PA_ERR_NODATA, /**< No data */
+ PA_ERR_MAX /**< Not really an error but the first invalid error code */
};
/** Subscription event mask, as used by pa_context_subscribe() */
@@ -208,6 +210,15 @@ typedef struct pa_spawn_api {
* passed to the new process. */
} pa_spawn_api;
+/** Seek type \since 0.8*/
+typedef enum pa_seek_mode {
+ PA_SEEK_RELATIVE = 0, /**< Seek relatively to the write index */
+ PA_SEEK_ABSOLUTE = 1, /**< Seek relatively to the start of the buffer queue */
+ PA_SEEK_RELATIVE_ON_READ = 2, /**< Seek relatively to the read index */
+ PA_SEEK_RELATIVE_END = 3, /**< Seek relatively to the current end of the buffer queue */
+} pa_seek_mode_t;
+
+
PA_C_DECL_END
#endif
diff --git a/src/polyp/error.c b/src/polyp/error.c
index ece77bf2..eff37cc8 100644
--- a/src/polyp/error.c
+++ b/src/polyp/error.c
@@ -30,25 +30,28 @@
#include "error.h"
-static const char* const errortab[PA_ERROR_MAX] = {
- [PA_ERROR_OK] = "OK",
- [PA_ERROR_ACCESS] = "Access denied",
- [PA_ERROR_COMMAND] = "Unknown command",
- [PA_ERROR_INVALID] = "Invalid argument",
- [PA_ERROR_EXIST] = "Entity exists",
- [PA_ERROR_NOENTITY] = "No such entity",
- [PA_ERROR_CONNECTIONREFUSED] = "Connection refused",
- [PA_ERROR_PROTOCOL] = "Protocol error",
- [PA_ERROR_TIMEOUT] = "Timeout",
- [PA_ERROR_AUTHKEY] = "No authorization key",
- [PA_ERROR_INTERNAL] = "Internal error",
- [PA_ERROR_CONNECTIONTERMINATED] = "Connection terminated",
- [PA_ERROR_KILLED] = "Entity killed",
- [PA_ERROR_INVALIDSERVER] = "Invalid server",
+static const char* const errortab[PA_ERR_MAX] = {
+ [PA_OK] = "OK",
+ [PA_ERR_ACCESS] = "Access denied",
+ [PA_ERR_COMMAND] = "Unknown command",
+ [PA_ERR_INVALID] = "Invalid argument",
+ [PA_ERR_EXIST] = "Entity exists",
+ [PA_ERR_NOENTITY] = "No such entity",
+ [PA_ERR_CONNECTIONREFUSED] = "Connection refused",
+ [PA_ERR_PROTOCOL] = "Protocol error",
+ [PA_ERR_TIMEOUT] = "Timeout",
+ [PA_ERR_AUTHKEY] = "No authorization key",
+ [PA_ERR_INTERNAL] = "Internal error",
+ [PA_ERR_CONNECTIONTERMINATED] = "Connection terminated",
+ [PA_ERR_KILLED] = "Entity killed",
+ [PA_ERR_INVALIDSERVER] = "Invalid server",
+ [PA_ERR_MODINITFAILED] = "Module initalization failed",
+ [PA_ERR_BADSTATE] = "Bad state",
+ [PA_ERR_NODATA] = "No data",
};
-const char*pa_strerror(uint32_t error) {
- if (error >= PA_ERROR_MAX)
+const char*pa_strerror(int error) {
+ if (error < 0 || error >= PA_ERR_MAX)
return NULL;
return errortab[error];
diff --git a/src/polyp/error.h b/src/polyp/error.h
index ff2507b2..9856c1af 100644
--- a/src/polyp/error.h
+++ b/src/polyp/error.h
@@ -31,7 +31,7 @@
PA_C_DECL_BEGIN
/** Return a human readable error message for the specified numeric error code */
-const char* pa_strerror(uint32_t error);
+const char* pa_strerror(int error);
PA_C_DECL_END
diff --git a/src/polyp/internal.h b/src/polyp/internal.h
index 7f4d38ac..578969ee 100644
--- a/src/polyp/internal.h
+++ b/src/polyp/internal.h
@@ -54,8 +54,9 @@ struct pa_context {
pa_dynarray *record_streams, *playback_streams;
PA_LLIST_HEAD(pa_stream, streams);
PA_LLIST_HEAD(pa_operation, operations);
-
+
uint32_t ctag;
+ uint32_t csyncid;
uint32_t error;
pa_context_state_t state;
@@ -90,6 +91,7 @@ struct pa_stream {
pa_sample_spec sample_spec;
pa_channel_map channel_map;
uint32_t channel;
+ uint32_t syncid;
int channel_valid;
uint32_t device_index;
pa_stream_direction_t direction;
@@ -98,7 +100,6 @@ struct pa_stream {
pa_usec_t previous_time;
pa_usec_t previous_ipol_time;
pa_stream_state_t state;
- pa_mcalign *mcalign;
pa_memchunk peek_memchunk;
pa_memblockq *record_memblockq;
@@ -110,14 +111,20 @@ struct pa_stream {
pa_time_event *ipol_event;
int ipol_requested;
- void (*state_callback)(pa_stream*c, void *userdata);
+ pa_stream_notify_cb_t state_callback;
void *state_userdata;
- void (*read_callback)(pa_stream *p, size_t length, void *userdata);
+ pa_stream_request_cb_t read_callback;
void *read_userdata;
- void (*write_callback)(pa_stream *p, size_t length, void *userdata);
+ pa_stream_request_cb_t write_callback;
void *write_userdata;
+
+ pa_stream_notify_cb_t overflow_callback;
+ void *overflow_userdata;
+
+ pa_stream_notify_cb_t underflow_callback;
+ void *underflow_userdata;
};
typedef void (*pa_operation_callback)(void);
@@ -136,6 +143,7 @@ struct pa_operation {
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
void pa_command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
pa_operation *pa_operation_new(pa_context *c, pa_stream *s);
void pa_operation_done(pa_operation *o);
@@ -146,6 +154,7 @@ void pa_context_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
void pa_context_fail(pa_context *c, int error);
+int pa_context_set_error(pa_context *c, int error);
void pa_context_set_state(pa_context *c, pa_context_state_t st);
int pa_context_handle_error(pa_context *c, uint32_t command, pa_tagstruct *t);
pa_operation* pa_context_send_simple_command(pa_context *c, uint32_t command, void (*internal_callback)(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata), void (*cb)(void), void *userdata);
@@ -154,5 +163,23 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st);
void pa_stream_trash_ipol(pa_stream *s);
+#define PA_CHECK_VALIDITY(context, expression, error) do { \
+ if (!(expression)) \
+ return -pa_context_set_error((context), (error)); \
+} while(0)
+
+#define PA_CHECK_VALIDITY_RETURN_NULL(context, expression, error) do { \
+ if (!(expression)) { \
+ pa_context_set_error((context), (error)); \
+ return NULL; \
+ } \
+} while(0)
+
+#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) do { \
+ if (!(expression)) { \
+ pa_context_set_error((context), (error)); \
+ return value; \
+ } \
+} while(0)
#endif
diff --git a/src/polyp/introspect.c b/src/polyp/introspect.c
index 4af724b4..75ce3ff9 100644
--- a/src/polyp/introspect.c
+++ b/src/polyp/introspect.c
@@ -52,7 +52,7 @@ static void context_stat_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNU
pa_tagstruct_getu32(t, &i.memblock_allocated_size) < 0 ||
pa_tagstruct_getu32(t, &i.scache_size) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -92,7 +92,7 @@ static void context_get_server_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_getu32(t, &i.cookie) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -139,7 +139,7 @@ static void context_get_sink_info_callback(pa_pdispatch *pd, uint32_t command, P
pa_tagstruct_get_usec(t, &i.latency) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -234,7 +234,7 @@ static void context_get_source_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_usec(t, &i.latency) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -322,7 +322,7 @@ static void context_get_client_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_gets(t, &i.name) < 0 ||
pa_tagstruct_getu32(t, &i.owner_module) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0 ) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -389,7 +389,7 @@ static void context_get_module_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_gets(t, &i.argument) < 0 ||
pa_tagstruct_getu32(t, &i.n_used) < 0 ||
pa_tagstruct_get_boolean(t, &i.auto_unload) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -464,7 +464,7 @@ static void context_get_sink_input_info_callback(pa_pdispatch *pd, uint32_t comm
pa_tagstruct_gets(t, &i.resample_method) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -538,7 +538,7 @@ static void context_get_source_output_info_callback(pa_pdispatch *pd, uint32_t c
pa_tagstruct_gets(t, &i.resample_method) < 0 ||
pa_tagstruct_gets(t, &i.driver) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -677,7 +677,7 @@ static void context_get_sample_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_boolean(t, &i.lazy) < 0 ||
pa_tagstruct_gets(t, &i.filename) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -787,7 +787,7 @@ static void load_module_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUS
} else if (pa_tagstruct_getu32(t, &idx) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -848,7 +848,7 @@ static void context_get_autoload_info_callback(pa_pdispatch *pd, uint32_t comman
pa_tagstruct_getu32(t, &i.type) < 0 ||
pa_tagstruct_gets(t, &i.module) < 0 ||
pa_tagstruct_gets(t, &i.argument) < 0) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -926,7 +926,7 @@ static void context_add_autoload_callback(pa_pdispatch *pd, uint32_t command, PA
idx = PA_INVALID_INDEX;
} else if (pa_tagstruct_getu32(t, &idx) ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
diff --git a/src/polyp/simple.c b/src/polyp/simple.c
index e14cab2e..8a20c223 100644
--- a/src/polyp/simple.c
+++ b/src/polyp/simple.c
@@ -45,13 +45,11 @@ struct pa_simple {
int dead;
- void *read_data;
+ const void *read_data;
size_t read_index, read_length;
pa_usec_t latency;
};
-static void read_callback(pa_stream *s, const void*data, size_t length, void *userdata);
-
static int check_error(pa_simple *p, int *rerror) {
pa_context_state_t cst;
pa_stream_state_t sst;
@@ -92,7 +90,7 @@ static int iterate(pa_simple *p, int block, int *rerror) {
do {
if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) {
if (rerror)
- *rerror = PA_ERROR_INTERNAL;
+ *rerror = PA_ERR_INTERNAL;
return -1;
}
@@ -106,7 +104,7 @@ static int iterate(pa_simple *p, int block, int *rerror) {
if (pa_mainloop_iterate(p->mainloop, 0, NULL) < 0) {
if (rerror)
- *rerror = PA_ERROR_INTERNAL;
+ *rerror = PA_ERR_INTERNAL;
return -1;
}
@@ -128,7 +126,7 @@ pa_simple* pa_simple_new(
int *rerror) {
pa_simple *p;
- int error = PA_ERROR_INTERNAL;
+ int error = PA_ERR_INTERNAL;
assert(ss && (dir == PA_STREAM_PLAYBACK || dir == PA_STREAM_RECORD));
p = pa_xmalloc(sizeof(pa_simple));
@@ -157,7 +155,7 @@ pa_simple* pa_simple_new(
goto fail;
if (dir == PA_STREAM_PLAYBACK)
- pa_stream_connect_playback(p->stream, dev, attr, 0, NULL);
+ pa_stream_connect_playback(p->stream, dev, attr, 0, NULL, NULL);
else
pa_stream_connect_record(p->stream, dev, attr, 0);
@@ -167,8 +165,6 @@ pa_simple* pa_simple_new(
goto fail;
}
- pa_stream_set_read_callback(p->stream, read_callback, p);
-
return p;
fail:
@@ -181,8 +177,6 @@ fail:
void pa_simple_free(pa_simple *s) {
assert(s);
- pa_xfree(s->read_data);
-
if (s->stream)
pa_stream_unref(s->stream);
@@ -215,7 +209,7 @@ int pa_simple_write(pa_simple *p, const void*data, size_t length, int *rerror) {
if (l > length)
l = length;
- pa_stream_write(p->stream, data, l, NULL, 0);
+ pa_stream_write(p->stream, data, l, NULL, 0, PA_SEEK_RELATIVE);
data = (const uint8_t*) data + l;
length -= l;
}
@@ -227,19 +221,6 @@ int pa_simple_write(pa_simple *p, const void*data, size_t length, int *rerror) {
return 0;
}
-static void read_callback(pa_stream *s, const void*data, size_t length, void *userdata) {
- pa_simple *p = userdata;
- assert(s && data && length && p);
-
- if (p->read_data) {
- pa_log(__FILE__": Buffer overflow, dropping incoming memory blocks.\n");
- pa_xfree(p->read_data);
- }
-
- p->read_data = pa_xmemdup(data, p->read_length = length);
- p->read_index = 0;
-}
-
int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
assert(p && data && p->direction == PA_STREAM_RECORD);
@@ -251,13 +232,18 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
}
while (length > 0) {
+
+ if (!p->read_data)
+ if (pa_stream_peek(p->stream, &p->read_data, &p->read_length) >= 0)
+ p->read_index = 0;
+
if (p->read_data) {
size_t l = length;
if (p->read_length <= l)
l = p->read_length;
- memcpy(data, (uint8_t*) p->read_data+p->read_index, l);
+ memcpy(data, (const uint8_t*) p->read_data+p->read_index, l);
data = (uint8_t*) data + l;
length -= l;
@@ -266,8 +252,9 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
p->read_length -= l;
if (!p->read_length) {
- pa_xfree(p->read_data);
+ pa_stream_drop(p->stream);
p->read_data = NULL;
+ p->read_length = 0;
p->read_index = 0;
}
diff --git a/src/polyp/stream.c b/src/polyp/stream.c
index 35de2d01..5497f0c4 100644
--- a/src/polyp/stream.c
+++ b/src/polyp/stream.c
@@ -28,6 +28,7 @@
#include <stdio.h>
#include <string.h>
+#include <polyp/def.h>
#include <polypcore/xmalloc.h>
#include <polypcore/pstream-util.h>
#include <polypcore/util.h>
@@ -39,14 +40,11 @@
pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
pa_stream *s;
+
assert(c);
- assert(ss);
-
- if (!pa_sample_spec_valid(ss))
- return NULL;
- if (map && !pa_channel_map_valid(map))
- return NULL;
+ PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, !map || pa_channel_map_valid(map), PA_ERR_INVALID);
s = pa_xnew(pa_stream, 1);
s->ref = 1;
@@ -59,6 +57,10 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->write_userdata = NULL;
s->state_callback = NULL;
s->state_userdata = NULL;
+ s->overflow_callback = NULL;
+ s->overflow_userdata = NULL;
+ s->underflow_callback = NULL;
+ s->underflow_userdata = NULL;
s->direction = PA_STREAM_NODIRECTION;
s->name = pa_xstrdup(name);
@@ -71,13 +73,13 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->channel = 0;
s->channel_valid = 0;
+ s->syncid = c->csyncid++;
s->device_index = PA_INVALID_INDEX;
s->requested_bytes = 0;
- s->state = PA_STREAM_DISCONNECTED;
+ s->state = PA_STREAM_UNCONNECTED;
memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
- s->mcalign = pa_mcalign_new(pa_frame_size(ss), c->memblock_stat);
-
+ s->peek_memchunk.index = 0;
s->peek_memchunk.length = 0;
s->peek_memchunk.memblock = NULL;
@@ -114,42 +116,52 @@ static void stream_free(pa_stream *s) {
if (s->record_memblockq)
pa_memblockq_free(s->record_memblockq);
- pa_mcalign_free(s->mcalign);
-
pa_xfree(s->name);
pa_xfree(s);
}
void pa_stream_unref(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
if (--(s->ref) == 0)
stream_free(s);
}
pa_stream* pa_stream_ref(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
s->ref++;
return s;
}
pa_stream_state_t pa_stream_get_state(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
return s->state;
}
pa_context* pa_stream_get_context(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
return s->context;
}
uint32_t pa_stream_get_index(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
+
return s->device_index;
}
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
if (s->state == st)
return;
@@ -159,6 +171,8 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
s->state = st;
if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) {
+ /* Detach from context */
+
if (s->channel_valid)
pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
@@ -182,14 +196,14 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
if (pa_tagstruct_getu32(t, &channel) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}
if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
goto finish;
- c->error = PA_ERROR_KILLED;
+ c->error = PA_ERR_KILLED;
pa_stream_set_state(s, PA_STREAM_FAILED);
finish:
@@ -207,24 +221,55 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32
if (pa_tagstruct_getu32(t, &channel) < 0 ||
pa_tagstruct_getu32(t, &bytes) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}
if (!(s = pa_dynarray_get(c->playback_streams, channel)))
goto finish;
- if (s->state != PA_STREAM_READY)
- goto finish;
+ if (s->state == PA_STREAM_READY) {
+ s->requested_bytes += bytes;
+
+ if (s->requested_bytes > 0 && s->write_callback)
+ s->write_callback(s, s->requested_bytes, s->write_userdata);
+ }
- pa_stream_ref(s);
+finish:
+ pa_context_unref(c);
+}
+
+void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
+ pa_stream *s;
+ pa_context *c = userdata;
+ uint32_t channel;
+
+ assert(pd);
+ assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
+ assert(t);
+ assert(c);
+
+ pa_context_ref(c);
+
+ if (pa_tagstruct_getu32(t, &channel) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
- s->requested_bytes += bytes;
+ if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+ goto finish;
- if (s->requested_bytes && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ if (s->state == PA_STREAM_READY) {
- pa_stream_unref(s);
+ if (command == PA_COMMAND_OVERFLOW) {
+ if (s->overflow_callback)
+ s->overflow_callback(s, s->overflow_userdata);
+ } else if (command == PA_COMMAND_UNDERFLOW) {
+ if (s->underflow_callback)
+ s->underflow_callback(s, s->underflow_userdata);
+ }
+ }
finish:
pa_context_unref(c);
@@ -270,14 +315,21 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(s->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
if (s->direction == PA_STREAM_RECORD) {
assert(!s->record_memblockq);
- s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0,
- pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat);
+ s->record_memblockq = pa_memblockq_new(
+ 0,
+ s->buffer_attr.maxlength,
+ 0,
+ pa_frame_size(&s->sample_spec),
+ 1,
+ 0,
+ NULL,
+ s->context->memblock_stat);
assert(s->record_memblockq);
}
@@ -303,13 +355,32 @@ finish:
pa_stream_unref(s);
}
-static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, const pa_cvolume *volume) {
+static int create_stream(
+ pa_stream_direction_t direction,
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags,
+ const pa_cvolume *volume,
+ pa_stream *sync_stream) {
+
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1 && s->state == PA_STREAM_DISCONNECTED);
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, (flags & ~(PA_STREAM_START_CORKED|PA_STREAM_INTERPOLATE_LATENCY)) == 0, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || flags == 0, PA_ERR_INVALID);
pa_stream_ref(s);
+ s->direction = direction;
+
+ if (sync_stream)
+ s->syncid = sync_stream->syncid;
+
s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY);
pa_stream_trash_ipol(s);
@@ -336,25 +407,28 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a
dev = s->context->conf->default_source;
}
- pa_tagstruct_put(t,
- PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
- PA_TAG_U32, tag = s->context->ctag++,
- PA_TAG_STRING, s->name,
- PA_TAG_SAMPLE_SPEC, &s->sample_spec,
- PA_TAG_CHANNEL_MAP, &s->channel_map,
- PA_TAG_U32, PA_INVALID_INDEX,
- PA_TAG_STRING, dev,
- PA_TAG_U32, s->buffer_attr.maxlength,
- PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
- PA_TAG_INVALID);
+ pa_tagstruct_put(
+ t,
+ PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
+ PA_TAG_U32, tag = s->context->ctag++,
+ PA_TAG_STRING, s->name,
+ PA_TAG_SAMPLE_SPEC, &s->sample_spec,
+ PA_TAG_CHANNEL_MAP, &s->channel_map,
+ PA_TAG_U32, PA_INVALID_INDEX,
+ PA_TAG_STRING, dev,
+ PA_TAG_U32, s->buffer_attr.maxlength,
+ PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
+ PA_TAG_INVALID);
if (s->direction == PA_STREAM_PLAYBACK) {
pa_cvolume cv;
- pa_tagstruct_put(t,
- PA_TAG_U32, s->buffer_attr.tlength,
- PA_TAG_U32, s->buffer_attr.prebuf,
- PA_TAG_U32, s->buffer_attr.minreq,
- PA_TAG_INVALID);
+ pa_tagstruct_put(
+ t,
+ PA_TAG_U32, s->buffer_attr.tlength,
+ PA_TAG_U32, s->buffer_attr.prebuf,
+ PA_TAG_U32, s->buffer_attr.minreq,
+ PA_TAG_U32, s->syncid,
+ PA_TAG_INVALID);
if (!volume) {
pa_cvolume_reset(&cv, s->sample_spec.channels);
@@ -369,23 +443,57 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s);
pa_stream_unref(s);
+ return 0;
}
-void pa_stream_connect_playback(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, pa_cvolume *volume) {
- assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
- s->direction = PA_STREAM_PLAYBACK;
- create_stream(s, dev, attr, flags, volume);
+int pa_stream_connect_playback(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags,
+ pa_cvolume *volume,
+ pa_stream *sync_stream) {
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
}
-void pa_stream_connect_record(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags) {
- assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
- s->direction = PA_STREAM_RECORD;
- create_stream(s, dev, attr, flags, 0);
+int pa_stream_connect_record(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags) {
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
}
-void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
+int pa_stream_write(
+ pa_stream *s,
+ const void *data,
+ size_t length,
+ void (*free_cb)(void *p),
+ int64_t offset,
+ pa_seek_mode_t seek) {
+
pa_memchunk chunk;
- assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+
+ assert(s);
+ assert(s->ref >= 1);
+ assert(s->context);
+ assert(data);
+
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
+
+ if (length <= 0)
+ return 0;
if (free_cb) {
chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
@@ -398,7 +506,7 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
chunk.index = 0;
chunk.length = length;
- pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
pa_memblock_unref(chunk.memblock);
if (length < s->requested_bytes)
@@ -407,72 +515,87 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
s->requested_bytes = 0;
s->counter += length;
+ return 0;
}
-void pa_stream_peek(pa_stream *s, void **data, size_t *length) {
- assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
+ assert(s);
+ assert(s->ref >= 1);
+ assert(data);
+ assert(length);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
+
if (!s->peek_memchunk.memblock) {
- *data = NULL;
- *length = 0;
-
- if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0)
- return;
- pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+ if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
+ *data = NULL;
+ *length = 0;
+ return 0;
+ }
}
- *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index;
+ *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index;
*length = s->peek_memchunk.length;
+ return 0;
}
-void pa_stream_drop(pa_stream *s) {
- assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1);
-
- s->counter += s->peek_memchunk.length;
+int pa_stream_drop(pa_stream *s) {
+ assert(s);
+ assert(s->ref >= 1);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
+
+ pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+
pa_memblock_unref(s->peek_memchunk.memblock);
-
s->peek_memchunk.length = 0;
+ s->peek_memchunk.index = 0;
s->peek_memchunk.memblock = NULL;
+
+ s->counter += s->peek_memchunk.length;
+ return 0;
}
size_t pa_stream_writable_size(pa_stream *s) {
- assert(s && s->ref >= 1);
- return s->state == PA_STREAM_READY ? s->requested_bytes : 0;
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE, (size_t) -1);
+
+ return s->requested_bytes;
}
size_t pa_stream_readable_size(pa_stream *s) {
- size_t sz;
-
- assert(s && s->ref >= 1);
-
- if (s->state != PA_STREAM_READY)
- return 0;
-
- assert(s->record_memblockq);
-
- sz = (size_t)pa_memblockq_get_length(s->record_memblockq);
+ assert(s);
+ assert(s->ref >= 1);
- if (s->peek_memchunk.memblock)
- sz += s->peek_memchunk.length;
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
- return sz;
+ return pa_memblockq_get_length(s->record_memblockq);
}
pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) {
pa_operation *o;
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
o = pa_operation_new(s->context, s);
- assert(o);
o->callback = (pa_operation_callback) cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
@@ -501,7 +624,7 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_timeval(t, &remote) < 0 ||
pa_tagstruct_getu64(t, &i.counter) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
} else {
pa_gettimeofday(&now);
@@ -549,15 +672,18 @@ pa_operation* pa_stream_get_latency_info(pa_stream *s, void (*cb)(pa_stream *p,
pa_operation *o;
pa_tagstruct *t;
struct timeval now;
- assert(s && s->direction != PA_STREAM_UPLOAD);
+
+ assert(s);
+ assert(s->ref >= 1);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+
o = pa_operation_new(s->context, s);
- assert(o);
o->callback = (pa_operation_callback) cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
@@ -585,7 +711,7 @@ void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN
pa_stream_set_state(s, PA_STREAM_FAILED);
goto finish;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(s->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -595,18 +721,19 @@ finish:
pa_stream_unref(s);
}
-void pa_stream_disconnect(pa_stream *s) {
+int pa_stream_disconnect(pa_stream *s) {
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1);
- if (!s->channel_valid || !s->context->state == PA_CONTEXT_READY)
- return;
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
pa_stream_ref(s);
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
(s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM));
@@ -616,26 +743,49 @@ void pa_stream_disconnect(pa_stream *s) {
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s);
pa_stream_unref(s);
+ return 0;
}
-void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->read_callback = cb;
s->read_userdata = userdata;
}
-void pa_stream_set_write_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->write_callback = cb;
s->write_userdata = userdata;
}
-void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->state_callback = cb;
s->state_userdata = userdata;
}
+void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
+ s->overflow_callback = cb;
+ s->overflow_userdata = userdata;
+}
+
+void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
+ s->underflow_callback = cb;
+ s->underflow_userdata = userdata;
+}
+
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_operation *o = userdata;
int success = 1;
@@ -647,7 +797,7 @@ void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN
success = 0;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}
diff --git a/src/polyp/stream.h b/src/polyp/stream.h
index 9bbda436..ce535963 100644
--- a/src/polyp/stream.h
+++ b/src/polyp/stream.h
@@ -40,8 +40,27 @@ PA_C_DECL_BEGIN
* An opaque stream for playback or recording */
typedef struct pa_stream pa_stream;
+/** A generic callback for operation completion */
+typedef void (*pa_stream_success_cb_t) (pa_stream*s, int success, void *userdata);
+
+/** A generic free callback */
+typedef void (*pa_free_cb_t)(void *p);
+
+/** A generic request callback */
+typedef void (*pa_stream_request_cb_t)(pa_stream *p, size_t length, void *userdata);
+
+/** A generic notification callback */
+typedef void (*pa_stream_notify_cb_t)(pa_stream *p, void *userdata);
+
+/** Callback prototype for pa_stream_get_latency_info() */
+typedef void (*pa_stream_get_latency_info_cb_t)(pa_stream *p, const pa_latency_info *i, void *userdata);
+
/** Create a new, unconnected stream with the specified name and sample type */
-pa_stream* pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map);
+pa_stream* pa_stream_new(
+ pa_context *c,
+ const char *name,
+ const pa_sample_spec *ss,
+ const pa_channel_map *map);
/** Decrease the reference counter by one */
void pa_stream_unref(pa_stream *s);
@@ -59,108 +78,101 @@ pa_context* pa_stream_get_context(pa_stream *p);
uint32_t pa_stream_get_index(pa_stream *s);
/** Connect the stream to a sink */
-void pa_stream_connect_playback(
- pa_stream *s,
- const char *dev,
- const pa_buffer_attr *attr,
- pa_stream_flags_t flags,
- pa_cvolume *volume);
+int pa_stream_connect_playback(
+ pa_stream *s /**< The stream to connect to a sink */,
+ const char *dev /**< Name of the sink to connect to, or NULL for default */ ,
+ const pa_buffer_attr *attr /**< Buffering attributes, or NULL for default */,
+ pa_stream_flags_t flags /**< Additional flags, or 0 for default */,
+ pa_cvolume *volume /**< Initial volume, or NULL for default */,
+ pa_stream *sync_stream /**< Synchronize this stream with the specified one, or NULL for a standalone stream*/);
/** Connect the stream to a source */
-void pa_stream_connect_record(
- pa_stream *s,
- const char *dev,
- const pa_buffer_attr *attr,
- pa_stream_flags_t flags);
+int pa_stream_connect_record(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags);
/** Disconnect a stream from a source/sink */
-void pa_stream_disconnect(pa_stream *s);
+int pa_stream_disconnect(pa_stream *s);
/** Write some data to the server (for playback sinks), if free_cb is
* non-NULL this routine is called when all data has been written out
* and an internal reference to the specified data is kept, the data
* is not copied. If NULL, the data is copied into an internal
- * buffer. */
-void pa_stream_write(pa_stream *p /**< The stream to use */,
- const void *data /**< The data to write */,
- size_t length /**< The length of the data to write */,
- void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */,
- size_t delta /**< Drop this many
- bytes in the playback
- buffer before writing
- this data. Use
- (size_t) -1 for
- clearing the whole
- playback
- buffer. Normally you
- will specify 0 here,
- i.e. append to the
- playback buffer. If
- the value given here
- is greater than the
- buffered data length
- the buffer is cleared
- and the data is
- written to the
- buffer's start. This
- value is ignored on
- upload streams. */);
-
-/** Read the next fragment from the buffer (for capture sources).
+ * buffer. The client my freely seek around in the output buffer. For
+ * most applications passing 0 and PA_SEEK_RELATIVE as arguments for
+ * offset and seek should be useful.*/
+int pa_stream_write(
+ pa_stream *p /**< The stream to use */,
+ const void *data /**< The data to write */,
+ size_t length /**< The length of the data to write */,
+ pa_free_cb_t free_cb /**< A cleanup routine for the data or NULL to request an internal copy */,
+ int64_t offset, /**< Offset for seeking, must be 0 for upload streams */
+ pa_seek_mode_t seek /**< Seek mode, must be PA_SEEK_RELATIVE for upload streams */);
+
+/** Read the next fragment from the buffer (for recording).
* data will point to the actual data and length will contain the size
* of the data in bytes (which can be less than a complete framgnet).
- * Use pa_stream_drop() to actually remove the data from the buffer.
- * \since 0.8 */
-void pa_stream_peek(pa_stream *p /**< The stream to use */,
- void **data /**< Pointer to pointer that will point to data */,
- size_t *length /**< The length of the data read */);
+ * Use pa_stream_drop() to actually remove the data from the
+ * buffer. If no data is available will return a NULL pointer \since 0.8 */
+int pa_stream_peek(
+ pa_stream *p /**< The stream to use */,
+ const void **data /**< Pointer to pointer that will point to data */,
+ size_t *length /**< The length of the data read */);
/** Remove the current fragment. It is invalid to do this without first
* calling pa_stream_peek(). \since 0.8 */
-void pa_stream_drop(pa_stream *p);
+int pa_stream_drop(pa_stream *p);
-/** Return the amount of bytes that may be written using pa_stream_write() */
+/** Return the nember of bytes that may be written using pa_stream_write() */
size_t pa_stream_writable_size(pa_stream *p);
-/** Return the ammount of bytes that may be read using pa_stream_read() \since 0.8 */
+/** Return the number of bytes that may be read using pa_stream_read() \since 0.8 */
size_t pa_stream_readable_size(pa_stream *p);
-/** Drain a playback stream */
-pa_operation* pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata);
+/** Drain a playback stream. Use this for notification when the buffer is empty */
+pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
/** Get the playback latency of a stream */
-pa_operation* pa_stream_get_latency_info(pa_stream *p, void (*cb)(pa_stream *p, const pa_latency_info *i, void *userdata), void *userdata);
+pa_operation* pa_stream_get_latency_info(pa_stream *p, pa_stream_get_latency_info_cb_t cby, void *userdata);
/** Set the callback function that is called whenever the state of the stream changes */
-void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata);
+void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata);
/** Set the callback function that is called when new data may be
* written to the stream. */
-void pa_stream_set_write_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata);
+void pa_stream_set_write_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata);
/** Set the callback function that is called when new data is available from the stream.
- * Return the number of bytes read. \since 0.8
- */
-void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata);
+ * Return the number of bytes read. \since 0.8 */
+void pa_stream_set_read_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata);
+
+/** Set the callback function that is called when a buffer overflow happens. (Only for playback streams) \since 0.8 */
+void pa_stream_set_overflow_callback(pa_stream *p, pa_stream_notify_cb_t cb, void *userdata);
+
+/** Set the callback function that is called when a buffer underflow happens. (Only for playback streams) \since 0.8 */
+void pa_stream_set_underflow_callback(pa_stream *p, pa_stream_notify_cb_t cb, void *userdata);
/** Pause (or resume) playback of this stream temporarily. Available on both playback and recording streams. \since 0.3 */
-pa_operation* pa_stream_cork(pa_stream *s, int b, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata);
+pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata);
/** Flush the playback buffer of this stream. Most of the time you're
* better off using the parameter delta of pa_stream_write() instead of this
* function. Available on both playback and recording streams. \since 0.3 */
-pa_operation* pa_stream_flush(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata);
+pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
-/** Reenable prebuffering. Available for playback streams only. \since 0.6 */
-pa_operation* pa_stream_prebuf(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata);
+/** Reenable prebuffering as specified in the pa_buffer_attr
+ * structure. Available for playback streams only. \since 0.6 */
+pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
/** Request immediate start of playback on this stream. This disables
- * prebuffering as specified in the pa_buffer_attr structure. Available for playback streams only. \since
- * 0.3 */
-pa_operation* pa_stream_trigger(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata);
+ * prebuffering as specified in the pa_buffer_attr
+ * structure, temporarily. Available for playback streams only. \since 0.3 */
+pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);
/** Rename the stream. \since 0.5 */
-pa_operation* pa_stream_set_name(pa_stream *s, const char *name, void(*cb)(pa_stream*c, int success, void *userdata), void *userdata);
+pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata);
/** Return the total number of bytes written to/read from the
* stream. This counter is not reset on pa_stream_flush(), you may do
diff --git a/src/polyp/subscribe.c b/src/polyp/subscribe.c
index b90e0bf1..4e00997a 100644
--- a/src/polyp/subscribe.c
+++ b/src/polyp/subscribe.c
@@ -44,7 +44,7 @@ void pa_command_subscribe_event(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSE
if (pa_tagstruct_getu32(t, &e) < 0 ||
pa_tagstruct_getu32(t, &index) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}