From 5ea96e312be3aa495e77786283e1edea7592047f Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sat, 10 Jul 2004 19:04:21 +0000 Subject: implement parec-simple and matching simple recording API add support for killing source outputs in native protocol fix channel management in client library git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@56 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/Makefile.am | 6 +++- src/polyp-error.c | 5 +++- src/polyp.c | 47 +++++++++++++++++++++++++------ src/protocol-native-spec.h | 4 +++ src/protocol-native.c | 27 ++++++++++++++++++ src/simple.c | 70 +++++++++++++++++++++++++++++++++++++++++++--- src/todo | 3 +- 7 files changed, 145 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index d7002caf..b02bfba6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -18,7 +18,7 @@ AM_CFLAGS=-ansi -D_GNU_SOURCE -bin_PROGRAMS = polypaudio pacat pacat-simple +bin_PROGRAMS = polypaudio pacat pacat-simple parec-simple pkglib_LTLIBRARIES=libiochannel.la \ libsocket-server.la \ @@ -248,3 +248,7 @@ pacat_CFLAGS = $(AM_CFLAGS) pacat_simple_SOURCES = pacat-simple.c $(libpolyp_la_SOURCES) $(libpolyp_simple_la_SOURCES) $(libpolyp_error_la_SOURCES) #pacat_simple_LDADD = libpolyp-simple.la libpolyp-error.la pacat_simple_CFLAGS = $(AM_CFLAGS) + +parec_simple_SOURCES = parec-simple.c $(libpolyp_la_SOURCES) $(libpolyp_simple_la_SOURCES) $(libpolyp_error_la_SOURCES) +#parec_simple_LDADD = libpolyp-simple.la libpolyp-error.la +parec_simple_CFLAGS = $(AM_CFLAGS) diff --git a/src/polyp-error.c b/src/polyp-error.c index 861711a2..86166b0c 100644 --- a/src/polyp-error.c +++ b/src/polyp-error.c @@ -1,3 +1,4 @@ +#include #include #include "polyp-error.h" @@ -14,7 +15,9 @@ static const char* const errortab[PA_ERROR_MAX] = { [PA_ERROR_PROTOCOL] = "Protocol corrupt", [PA_ERROR_TIMEOUT] = "Timeout", [PA_ERROR_AUTHKEY] = "Not authorization key", - [PA_ERROR_INTERNAL] = "Internal error" + [PA_ERROR_INTERNAL] = "Internal error", + [PA_ERROR_CONNECTIONTERMINATED] = "Connection terminated", + [PA_ERROR_KILLED] = "Entity killed", }; const char*pa_strerror(uint32_t error) { diff --git a/src/polyp.c b/src/polyp.c index 2c2810fb..fde0c68b 100644 --- a/src/polyp.c +++ b/src/polyp.c @@ -28,7 +28,7 @@ struct pa_context { struct pa_socket_client *client; struct pa_pstream *pstream; struct pa_pdispatch *pdispatch; - struct pa_dynarray *streams; + struct pa_dynarray *record_streams, *playback_streams; struct pa_stream *first_stream; uint32_t ctag; uint32_t error; @@ -85,6 +85,7 @@ struct pa_stream { }; static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = { NULL }, @@ -95,6 +96,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, [PA_COMMAND_EXIT] = { NULL }, [PA_COMMAND_REQUEST] = { command_request }, + [PA_COMMAND_PLAYBACK_STREAM_KILLED] = { command_playback_stream_killed }, + [PA_COMMAND_RECORD_STREAM_KILLED] = { command_playback_stream_killed }, }; struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) { @@ -108,8 +111,10 @@ struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char * c->client = NULL; c->pstream = NULL; c->pdispatch = NULL; - c->streams = pa_dynarray_new(); - assert(c->streams); + c->playback_streams = pa_dynarray_new(); + assert(c->playback_streams); + c->record_streams = pa_dynarray_new(); + assert(c->record_streams); c->first_stream = NULL; c->error = PA_ERROR_OK; c->state = CONTEXT_UNCONNECTED; @@ -140,8 +145,10 @@ void pa_context_free(struct pa_context *c) { pa_pdispatch_free(c->pdispatch); if (c->pstream) pa_pstream_free(c->pstream); - if (c->streams) - pa_dynarray_free(c->streams, NULL, NULL); + if (c->record_streams) + pa_dynarray_free(c->record_streams, NULL, NULL); + if (c->playback_streams) + pa_dynarray_free(c->playback_streams, NULL, NULL); free(c->name); free(c); @@ -194,6 +201,7 @@ static void context_dead(struct pa_context *c) { static void pstream_die_callback(struct pa_pstream *p, void *userdata) { struct pa_context *c = userdata; assert(p && c); + c->error = PA_ERROR_CONNECTIONTERMINATED; context_dead(c); } @@ -203,6 +211,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { fprintf(stderr, "polyp.c: invalid packet.\n"); + c->error = PA_ERROR_PROTOCOL; context_dead(c); } } @@ -212,7 +221,7 @@ static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, in struct pa_stream *s; assert(p && chunk && c && chunk->memblock && chunk->memblock->data); - if (!(s = pa_dynarray_get(c->streams, channel))) + if (!(s = pa_dynarray_get(c->record_streams, channel))) return; if (s->read_callback) @@ -353,6 +362,26 @@ void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_cont c->die_userdata = userdata; } +static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct pa_context *c = userdata; + struct pa_stream *s; + uint32_t channel; + assert(pd && (command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED) && t && c); + + if (pa_tagstruct_getu32(t, &channel) < 0 || + !pa_tagstruct_eof(t)) { + c->error = PA_ERROR_PROTOCOL; + context_dead(c); + return; + } + + if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel))) + return; + + c->error = PA_ERROR_KILLED; + stream_dead(s); +} + static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_stream *s; struct pa_context *c = userdata; @@ -367,7 +396,7 @@ static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t return; } - if (!(s = pa_dynarray_get(c->streams, channel))) + if (!(s = pa_dynarray_get(c->playback_streams, channel))) return; if (s->state != STREAM_READY) @@ -405,7 +434,7 @@ static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, ui } s->channel_valid = 1; - pa_dynarray_put(s->context->streams, s->channel, s); + pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, s); s->state = STREAM_READY; if (s->create_complete_callback) @@ -562,7 +591,7 @@ void pa_stream_free(struct pa_stream *s) { } if (s->channel_valid) - pa_dynarray_put(s->context->streams, s->channel, NULL); + pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL); if (s->next) s->next->previous = s->previous; diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h index fea14cc0..5e67fe74 100644 --- a/src/protocol-native-spec.h +++ b/src/protocol-native-spec.h @@ -16,6 +16,8 @@ enum { PA_COMMAND_LOOKUP_SINK, PA_COMMAND_LOOKUP_SOURCE, PA_COMMAND_DRAIN_PLAYBACK_STREAM, + PA_COMMAND_PLAYBACK_STREAM_KILLED, + PA_COMMAND_RECORD_STREAM_KILLED, PA_COMMAND_MAX }; @@ -31,6 +33,8 @@ enum { PA_ERROR_TIMEOUT, PA_ERROR_AUTHKEY, PA_ERROR_INTERNAL, + PA_ERROR_CONNECTIONTERMINATED, + PA_ERROR_KILLED, PA_ERROR_MAX }; diff --git a/src/protocol-native.c b/src/protocol-native.c index fe086066..c7a7cce0 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -254,6 +254,31 @@ static void send_memblock(struct connection *c) { } } +static void send_playback_stream_killed(struct playback_stream *p) { + struct pa_tagstruct *t; + assert(p); + + t = pa_tagstruct_new(NULL, 0); + assert(t); + pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED); + pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */ + pa_tagstruct_putu32(t, p->index); + pa_pstream_send_tagstruct(p->connection->pstream, t); +} + +static void send_record_stream_killed(struct record_stream *r) { + struct pa_tagstruct *t; + assert(r); + + t = pa_tagstruct_new(NULL, 0); + assert(t); + pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED); + pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */ + pa_tagstruct_putu32(t, r->index); + pa_pstream_send_tagstruct(r->connection->pstream, t); +} + + /*** sinkinput callbacks ***/ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) { @@ -283,6 +308,7 @@ static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) { static void sink_input_kill_cb(struct pa_sink_input *i) { assert(i && i->userdata); + send_playback_stream_killed((struct playback_stream *) i->userdata); playback_stream_free((struct playback_stream *) i->userdata); } @@ -308,6 +334,7 @@ static void source_output_push_cb(struct pa_source_output *o, const struct pa_me static void source_output_kill_cb(struct pa_source_output *o) { assert(o && o->userdata); + send_record_stream_killed((struct record_stream *) o->userdata); record_stream_free((struct record_stream *) o->userdata); } diff --git a/src/simple.c b/src/simple.c index ba0a8f0c..5f86c5bf 100644 --- a/src/simple.c +++ b/src/simple.c @@ -1,3 +1,5 @@ +#include +#include #include #include @@ -10,10 +12,16 @@ struct pa_simple { struct pa_mainloop *mainloop; struct pa_context *context; struct pa_stream *stream; + enum pa_stream_direction direction; int dead, drained; + + void *read_data; + size_t read_index, read_length; }; +static void read_callback(struct pa_stream *s, const void*data, size_t length, void *userdata); + static int check_error(struct pa_simple *p, int *perror) { assert(p); @@ -71,6 +79,9 @@ struct pa_simple* pa_simple_new( p->mainloop = pa_mainloop_new(); assert(p->mainloop); p->dead = 0; + p->direction = dir; + p->read_data = NULL; + p->read_index = p->read_length = 0; if (!(p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), name))) goto fail; @@ -95,6 +106,8 @@ struct pa_simple* pa_simple_new( goto fail; } + pa_stream_set_read_callback(p->stream, read_callback, p); + return p; fail: @@ -107,6 +120,8 @@ fail: void pa_simple_free(struct pa_simple *s) { assert(s); + free(s->read_data); + if (s->stream) pa_stream_free(s->stream); @@ -120,7 +135,7 @@ void pa_simple_free(struct pa_simple *s) { } int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *perror) { - assert(p && data); + assert(p && data && p->direction == PA_STREAM_PLAYBACK); while (length > 0) { size_t l; @@ -144,10 +159,57 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe return 0; } -int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *perror) { - assert(0); +static void read_callback(struct pa_stream *s, const void*data, size_t length, void *userdata) { + struct pa_simple *p = userdata; + assert(s && data && length && p); + + if (p->read_data) { + fprintf(stderr, __FILE__": Buffer overflow, dropping incoming memory blocks.\n"); + free(p->read_data); + } + + p->read_data = malloc(p->read_length = length); + assert(p->read_data); + memcpy(p->read_data, data, length); + p->read_index = 0; } +int pa_simple_read(struct pa_simple *p, void*data, size_t length, int *perror) { + assert(p && data && p->direction == PA_STREAM_RECORD); + + while (length > 0) { + if (p->read_data) { + size_t l = length; + + if (p->read_length <= l) + l = p->read_length; + + memcpy(data, p->read_data+p->read_index, l); + + data += l; + length -= l; + + p->read_index += l; + p->read_length -= l; + + if (!p->read_length) { + free(p->read_data); + p->read_data = NULL; + p->read_index = 0; + } + + if (!length) + return 0; + + assert(!p->read_data); + } + + if (iterate(p, 1, perror) < 0) + return -1; + } + + return 0; +} static void drain_complete(struct pa_stream *s, void *userdata) { struct pa_simple *p = userdata; @@ -156,7 +218,7 @@ static void drain_complete(struct pa_stream *s, void *userdata) { } int pa_simple_drain(struct pa_simple *p, int *perror) { - assert(p); + assert(p && p->direction == PA_STREAM_PLAYBACK); p->drained = 0; pa_stream_drain(p->stream, drain_complete, p); diff --git a/src/todo b/src/todo index ee971ca0..1e1e02c5 100644 --- a/src/todo +++ b/src/todo @@ -1,4 +1,3 @@ -- implement parec-simple - native library/protocol: more functions (esp. latency) @@ -21,7 +20,7 @@ - svn-id and license in every file - documentation - +- dependency checking script -- post 0.1 - future cancellation -- cgit