From e8d1185c4221fef9d712c1f375d8e592721b6943 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 7 Jul 2004 00:22:46 +0000 Subject: draining ind native protocol fixes in callback code on object destruction simple protocol git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@52 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/polyp.c | 363 ++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 267 insertions(+), 96 deletions(-) (limited to 'src/polyp.c') diff --git a/src/polyp.c b/src/polyp.c index c15d5d9f..9af8d468 100644 --- a/src/polyp.c +++ b/src/polyp.c @@ -11,10 +11,13 @@ #include "socket-client.h" #include "pstream-util.h" #include "authkey.h" +#include "util.h" -#define DEFAULT_QUEUE_LENGTH 10240 -#define DEFAULT_MAX_LENGTH 20480 +#define DEFAULT_MAXLENGTH 20480 +#define DEFAULT_TLENGTH 10240 #define DEFAULT_PREBUF 4096 +#define DEFAULT_MINREQ 1024 + #define DEFAULT_TIMEOUT (5*60) #define DEFAULT_SERVER "/tmp/polypaudio/native" @@ -28,25 +31,40 @@ struct pa_context { struct pa_stream *first_stream; uint32_t ctag; uint32_t error; - enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_AUTHORIZING, CONTEXT_SETTING_NAME, CONTEXT_READY, CONTEXT_DEAD} state; + enum { + CONTEXT_UNCONNECTED, + CONTEXT_CONNECTING, + CONTEXT_AUTHORIZING, + CONTEXT_SETTING_NAME, + CONTEXT_READY, + CONTEXT_DEAD + } state; void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata); void *connect_complete_userdata; + void (*drain_complete_callback)(struct pa_context*c, void *userdata); + void *drain_complete_userdata; + void (*die_callback)(struct pa_context*c, void *userdata); void *die_userdata; - + uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; }; struct pa_stream { struct pa_context *context; struct pa_stream *next, *previous; + + char *name; + struct pa_buffer_attr buffer_attr; + struct pa_sample_spec sample_spec; uint32_t device_index; uint32_t channel; int channel_valid; enum pa_stream_direction direction; - enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state; + + enum { STREAM_LOOKING_UP, STREAM_CREATING, STREAM_READY, STREAM_DEAD} state; uint32_t requested_bytes; void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata); @@ -62,7 +80,7 @@ struct pa_stream { void *die_userdata; }; -static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = { NULL }, @@ -76,8 +94,9 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { }; struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) { - assert(mainloop && name); struct pa_context *c; + assert(mainloop && name); + c = malloc(sizeof(struct pa_context)); assert(c); c->name = strdup(name); @@ -95,9 +114,13 @@ struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char * c->connect_complete_callback = NULL; c->connect_complete_userdata = NULL; + c->drain_complete_callback = NULL; + c->drain_complete_userdata = NULL; + c->die_callback = NULL; c->die_userdata = NULL; - + + pa_check_for_sigpipe(); return c; } @@ -121,84 +144,105 @@ void pa_context_free(struct pa_context *c) { } static void stream_dead(struct pa_stream *s) { + assert(s); + if (s->state == STREAM_DEAD) return; - - s->state = STREAM_DEAD; - if (s->die_callback) - s->die_callback(s, s->die_userdata); + + if (s->state == STREAM_READY) { + s->state = STREAM_DEAD; + if (s->die_callback) + s->die_callback(s, s->die_userdata); + } else + s->state = STREAM_DEAD; } static void context_dead(struct pa_context *c) { struct pa_stream *s; assert(c); - for (s = c->first_stream; s; s = s->next) - stream_dead(s); - if (c->state == CONTEXT_DEAD) return; + + if (c->pdispatch) + pa_pdispatch_free(c->pdispatch); + c->pdispatch = NULL; + + if (c->pstream) + pa_pstream_free(c->pstream); + c->pstream = NULL; + + if (c->client) + pa_socket_client_free(c->client); + c->client = NULL; - c->state = CONTEXT_DEAD; - if (c->die_callback) - c->die_callback(c, c->die_userdata); + for (s = c->first_stream; s; s = s->next) + stream_dead(s); + + if (c->state == CONTEXT_READY) { + c->state = CONTEXT_DEAD; + if (c->die_callback) + c->die_callback(c, c->die_userdata); + } else + s->state = CONTEXT_DEAD; } static void pstream_die_callback(struct pa_pstream *p, void *userdata) { struct pa_context *c = userdata; assert(p && c); - - assert(c->state != CONTEXT_DEAD); - - c->state = CONTEXT_DEAD; - context_dead(c); } -static int pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { +static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { struct pa_context *c = userdata; assert(p && packet && c); if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { fprintf(stderr, "polyp.c: invalid packet.\n"); context_dead(c); - return -1; } - - - return 0; } -static int pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { +static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { struct pa_context *c = userdata; struct pa_stream *s; assert(p && chunk && c && chunk->memblock && chunk->memblock->data); if (!(s = pa_dynarray_get(c->streams, channel))) - return 0; + return; if (s->read_callback) s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata); +} + +static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) { + assert(c && t); - return 0; + if (command == PA_COMMAND_ERROR) { + if (pa_tagstruct_getu32(t, &c->error) < 0) { + c->error = PA_ERROR_PROTOCOL; + return -1; + } + + return 0; + } + + c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL; + return -1; } -static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_context *c = userdata; assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME)); if (command != PA_COMMAND_REPLY) { - if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &c->error) < 0) - c->error = PA_ERROR_PROTOCOL; - else if (command == PA_COMMAND_TIMEOUT) - c->error = PA_ERROR_TIMEOUT; - - c->state = CONTEXT_DEAD; + handle_error(c, command, t); + context_dead(c); if (c->connect_complete_callback) c->connect_complete_callback(c, 0, c->connect_complete_userdata); - return -1; + return; } if (c->state == CONTEXT_AUTHORIZING) { @@ -210,7 +254,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin pa_tagstruct_putu32(t, tag = c->ctag++); pa_tagstruct_puts(t, c->name); pa_pstream_send_tagstruct(c->pstream, t); - pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c); + pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c); } else { assert(c->state == CONTEXT_SETTING_NAME); @@ -220,7 +264,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin c->connect_complete_callback(c, 1, c->connect_complete_userdata); } - return 0; + return; } static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) { @@ -234,7 +278,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i if (!io) { c->error = PA_ERROR_CONNECTIONREFUSED; - c->state = CONTEXT_UNCONNECTED; + context_dead(c); if (c->connect_complete_callback) c->connect_complete_callback(c, 0, c->connect_complete_userdata); @@ -257,7 +301,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i pa_tagstruct_putu32(t, tag = c->ctag++); pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie)); pa_pstream_send_tagstruct(c->pstream, t); - pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c); + pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c); c->state = CONTEXT_AUTHORIZING; } @@ -305,7 +349,7 @@ void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_cont c->die_userdata = userdata; } -static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_stream *s; struct pa_context *c = userdata; uint32_t bytes, channel; @@ -315,63 +359,122 @@ static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t t pa_tagstruct_getu32(t, &bytes) < 0 || !pa_tagstruct_eof(t)) { c->error = PA_ERROR_PROTOCOL; - return -1; + context_dead(c); + return; } - if (!(s = pa_dynarray_get(c->streams, channel))) { - c->error = PA_ERROR_PROTOCOL; - return -1; - } + if (!(s = pa_dynarray_get(c->streams, channel))) + return; - /*fprintf(stderr, "Requested %u bytes\n", bytes);*/ + if (s->state != STREAM_READY) + return; s->requested_bytes += bytes; if (s->requested_bytes && s->write_callback) s->write_callback(s, s->requested_bytes, s->write_userdata); - - return 0; } -static int create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { - int ret = 0; +static void create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_stream *s = userdata; assert(pd && s && s->state == STREAM_CREATING); if (command != PA_COMMAND_REPLY) { - struct pa_context *c = s->context; - assert(c); + if (handle_error(s->context, command, t) < 0) { + context_dead(s->context); + return; + } - if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &s->context->error) < 0) - s->context->error = PA_ERROR_PROTOCOL; - else if (command == PA_COMMAND_TIMEOUT) - s->context->error = PA_ERROR_TIMEOUT; - - ret = -1; - goto fail; + stream_dead(s); + if (s->create_complete_callback) + s->create_complete_callback(s, 0, s->create_complete_userdata); + + return; } if (pa_tagstruct_getu32(t, &s->channel) < 0 || pa_tagstruct_getu32(t, &s->device_index) < 0 || !pa_tagstruct_eof(t)) { s->context->error = PA_ERROR_PROTOCOL; - ret = -1; - goto fail; + context_dead(s->context); + return; } s->channel_valid = 1; pa_dynarray_put(s->context->streams, s->channel, s); s->state = STREAM_READY; - assert(s->create_complete_callback); - s->create_complete_callback(s, 1, s->create_complete_userdata); - return 0; + if (s->create_complete_callback) + s->create_complete_callback(s, 1, s->create_complete_userdata); +} + +static void create_stream(struct pa_stream *s, uint32_t tdev_index) { + struct pa_tagstruct *t; + uint32_t tag; + assert(s); -fail: - assert(s->create_complete_callback); - s->create_complete_callback(s, 0, s->create_complete_userdata); - pa_stream_free(s); - return ret; + s->state = STREAM_CREATING; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + + pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_puts(t, s->name); + pa_tagstruct_put_sample_spec(t, &s->sample_spec); + pa_tagstruct_putu32(t, tdev_index); + pa_tagstruct_putu32(t, s->buffer_attr.maxlength); + pa_tagstruct_putu32(t, s->buffer_attr.tlength); + pa_tagstruct_putu32(t, s->buffer_attr.prebuf); + pa_tagstruct_putu32(t, s->buffer_attr.minreq); + + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s); +} + +static void lookup_device_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct pa_stream *s = userdata; + uint32_t tdev; + assert(pd && s && s->state == STREAM_LOOKING_UP); + + if (command != PA_COMMAND_REPLY) { + if (handle_error(s->context, command, t) < 0) { + context_dead(s->context); + return; + } + + stream_dead(s); + if (s->create_complete_callback) + s->create_complete_callback(s, 0, s->create_complete_userdata); + return; + } + + if (pa_tagstruct_getu32(t, &tdev) < 0 || + !pa_tagstruct_eof(t)) { + s->context->error = PA_ERROR_PROTOCOL; + context_dead(s->context); + return; + } + + create_stream(s, tdev); +} + +static void lookup_device(struct pa_stream *s, const char *tdev) { + struct pa_tagstruct *t; + uint32_t tag; + assert(s); + + s->state = STREAM_LOOKING_UP; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + + pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_LOOKUP_SINK : PA_COMMAND_LOOKUP_SOURCE); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_puts(t, tdev); + + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, lookup_device_callback, s); } struct pa_stream* pa_stream_new( @@ -385,10 +488,8 @@ struct pa_stream* pa_stream_new( void *userdata) { struct pa_stream *s; - struct pa_tagstruct *t; - uint32_t tag; - assert(c && name && ss && c->state == CONTEXT_READY && complete); + assert(c && name && ss && c->state == CONTEXT_READY); s = malloc(sizeof(struct pa_stream)); assert(s); @@ -403,42 +504,43 @@ struct pa_stream* pa_stream_new( s->create_complete_callback = complete; s->create_complete_userdata = NULL; + s->name = strdup(name); s->state = STREAM_CREATING; s->requested_bytes = 0; s->channel = 0; s->channel_valid = 0; s->device_index = (uint32_t) -1; s->direction = dir; + s->sample_spec = *ss; + if (attr) + s->buffer_attr = *attr; + else { + s->buffer_attr.maxlength = DEFAULT_MAXLENGTH; + s->buffer_attr.tlength = DEFAULT_TLENGTH; + s->buffer_attr.prebuf = DEFAULT_PREBUF; + s->buffer_attr.minreq = DEFAULT_MINREQ; + } - t = pa_tagstruct_new(NULL, 0); - assert(t); - - pa_tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM); - pa_tagstruct_putu32(t, tag = c->ctag++); - pa_tagstruct_puts(t, name); - pa_tagstruct_put_sample_spec(t, ss); - pa_tagstruct_putu32(t, (uint32_t) -1); - pa_tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH); - pa_tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH); - pa_tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF); - - pa_pstream_send_tagstruct(c->pstream, t); - - pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s); - s->next = c->first_stream; if (s->next) s->next->previous = s; s->previous = NULL; c->first_stream = s; - return 0; + if (dev) + lookup_device(s, dev); + else + create_stream(s, (uint32_t) -1); + + return s; } void pa_stream_free(struct pa_stream *s) { assert(s && s->context); + + free(s->name); - if (s->channel_valid) { + if (s->channel_valid && s->context->state == CONTEXT_READY) { struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0); assert(t); @@ -469,7 +571,7 @@ void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stre void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { struct pa_memchunk chunk; - assert(s && s->context && data && length); + assert(s && s->context && data && length && s->state == STREAM_READY); chunk.memblock = pa_memblock_new(length); assert(chunk.memblock && chunk.memblock->data); @@ -489,7 +591,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { } size_t pa_stream_writable_size(struct pa_stream *s) { - assert(s); + assert(s && s->state == STREAM_READY); return s->requested_bytes; } @@ -512,3 +614,72 @@ void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream s->die_callback = cb; s->die_userdata = userdata; } + +int pa_context_is_pending(struct pa_context *c) { + assert(c); + + if (c->state != CONTEXT_READY) + return 0; + + return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch); +} + +struct pa_context* pa_stream_get_context(struct pa_stream *p) { + assert(p); + return p->context; +} + +static void set_dispatch_callbacks(struct pa_context *c); + +static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) { + set_dispatch_callbacks(userdata); +} + +static void pstream_drain_callback(struct pa_pstream *s, void *userdata) { + set_dispatch_callbacks(userdata); +} + +static void set_dispatch_callbacks(struct pa_context *c) { + assert(c && c->state == CONTEXT_READY); + + pa_pstream_set_drain_callback(c->pstream, NULL, NULL); + pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL); + + if (pa_pdispatch_is_pending(c->pdispatch)) { + pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c); + return; + } + + if (pa_pstream_is_pending(c->pstream)) { + pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c); + return; + } + + assert(c->drain_complete_callback); + c->drain_complete_callback(c, c->drain_complete_userdata); +} + +int pa_context_drain( + struct pa_context *c, + void (*complete) (struct pa_context*c, void *userdata), + void *userdata) { + + assert(c && c->state == CONTEXT_READY); + + if (complete == NULL) { + c->drain_complete_callback = NULL; + pa_pstream_set_drain_callback(c->pstream, NULL, NULL); + pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL); + return 0; + } + + if (!pa_context_is_pending(c)) + return -1; + + c->drain_complete_callback = complete; + c->drain_complete_userdata = userdata; + + set_dispatch_callbacks(c); + + return 0; +} -- cgit