From 57dc42709fa258844db05f2042dfffe6ca8ade8b Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 27 Jun 2004 17:50:02 +0000 Subject: many fixes git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@37 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/mainloop.c | 12 ++++++++---- src/pacat.c | 32 ++++++++++++++++++++------------ src/pdispatch.c | 5 +++-- src/polyp.c | 16 ++++++++++++---- src/protocol-native.c | 24 +++++++++++++++++++++--- src/pstream.c | 10 +++++----- src/tagstruct.c | 4 ++-- 7 files changed, 71 insertions(+), 32 deletions(-) diff --git a/src/mainloop.c b/src/mainloop.c index a1758c65..8629add2 100644 --- a/src/mainloop.c +++ b/src/mainloop.c @@ -162,11 +162,14 @@ static void dispatch_pollfds(struct pa_mainloop *m) { struct mainloop_source_io *s; for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) { - if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents) + if (s->header.dead || !s->pollfd || !s->pollfd->revents) continue; - assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback); - s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata); + assert(s->pollfd->fd == s->fd && s->callback); + s->callback(&m->api, s, s->fd, + ((s->pollfd->revents & (POLLIN|POLLHUP|POLLERR)) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | + ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata); + s->pollfd->revents = 0; } } @@ -212,7 +215,7 @@ static int calc_next_timeout(struct pa_mainloop *m) { if (tmp == 0) return 0; - else if (tmp < t) + else if (t == -1 || tmp < t) t = tmp; } @@ -357,6 +360,7 @@ static void mainloop_cancel_io(struct pa_mainloop_api*a, void* id) { s->header.dead = 1; m->io_sources_scan_dead = 1; + m->rebuild_pollfds = 1; } /* Fixed sources */ diff --git a/src/pacat.c b/src/pacat.c index 5f5a373b..5ee1b866 100644 --- a/src/pacat.c +++ b/src/pacat.c @@ -29,22 +29,15 @@ static void stream_die_callback(struct pa_stream *s, void *userdata) { mainloop_api->quit(mainloop_api, 1); } -static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) { +static void do_write(size_t length) { size_t l; - assert(s && length); - - mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK); - - if (!buffer) - return; - - assert(buffer_length); + assert(buffer && buffer_length); l = length; if (l > buffer_length) l = buffer_length; - pa_stream_write(s, buffer+buffer_index, l); + pa_stream_write(stream, buffer+buffer_index, l); buffer_length -= l; buffer_index += l; @@ -55,12 +48,24 @@ static void stream_write_callback(struct pa_stream *s, size_t length, void *user } } +static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) { + assert(s && length); + + mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT); + + if (!buffer) + return; + + do_write(length); +} + static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) { assert(c); if (!s) { fprintf(stderr, "Stream creation failed.\n"); mainloop_api->quit(mainloop_api, 1); + return; } stream = s; @@ -94,7 +99,7 @@ fail: } static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { - size_t l; + size_t l, w = 0; ssize_t r; assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT); @@ -103,7 +108,7 @@ static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_m return; } - if (!(l = pa_stream_writable_size(stream))) + if (!stream || !(l = w = pa_stream_writable_size(stream))) l = 4096; buffer = malloc(l); assert(buffer); @@ -120,6 +125,9 @@ static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_m buffer_length = r; buffer_index = 0; + + if (w) + do_write(w); } int main(int argc, char *argv[]) { diff --git a/src/pdispatch.c b/src/pdispatch.c index 15f4b1b5..65dcd747 100644 --- a/src/pdispatch.c +++ b/src/pdispatch.c @@ -22,7 +22,9 @@ struct pdispatch { static void reply_info_free(struct reply_info *r) { assert(r && r->pdispatch && r->pdispatch->mainloop); - r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout); + + if (r->pdispatch) + r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout); if (r->previous) r->previous->next = r->next; @@ -112,7 +114,6 @@ fail: if (ts) tagstruct_free(ts); - fprintf(stderr, "protocol-native: invalid packet.\n"); return -1; } diff --git a/src/polyp.c b/src/polyp.c index 69b85c21..c298d46d 100644 --- a/src/polyp.c +++ b/src/polyp.c @@ -14,7 +14,7 @@ #define DEFAULT_QUEUE_LENGTH 10240 #define DEFAULT_MAX_LENGTH 20480 #define DEFAULT_PREBUF 4096 -#define DEFAULT_TIMEOUT 5 +#define DEFAULT_TIMEOUT (5*60) #define DEFAULT_SERVER "/tmp/polypaudio_native" struct pa_context { @@ -39,6 +39,7 @@ struct pa_context { struct pa_stream { struct pa_context *context; struct pa_stream *next, *previous; + uint32_t device_index; uint32_t channel; int channel_valid; enum pa_stream_direction direction; @@ -252,11 +253,11 @@ static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct pa_stream *s; struct pa_context *c = userdata; uint32_t bytes, channel; - assert(pd && command == PA_COMMAND_REQUEST && t && s); + assert(pd && command == PA_COMMAND_REQUEST && t && c); if (tagstruct_getu32(t, &channel) < 0 || tagstruct_getu32(t, &bytes) < 0 || - tagstruct_eof(t)) { + !tagstruct_eof(t)) { c->errno = PA_ERROR_PROTOCOL; return -1; } @@ -266,6 +267,8 @@ static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, return -1; } + fprintf(stderr, "Requested %u bytes\n", bytes); + s->requested_bytes += bytes; if (s->requested_bytes && s->write_callback) @@ -295,7 +298,8 @@ static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint } if (tagstruct_getu32(t, &s->channel) < 0 || - tagstruct_eof(t)) { + tagstruct_getu32(t, &s->device_index) < 0 || + !tagstruct_eof(t)) { s->context->errno = PA_ERROR_PROTOCOL; ret = -1; goto fail; @@ -349,6 +353,7 @@ int pa_stream_new( s->requested_bytes = 0; s->channel = 0; s->channel_valid = 0; + s->device_index = (uint32_t) -1; s->direction = dir; t = tagstruct_new(NULL, 0); @@ -419,7 +424,10 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { chunk.length = length; pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk); + memblock_unref(chunk.memblock); + fprintf(stderr, "Sent %u bytes\n", length); + if (length < s->requested_bytes) s->requested_bytes -= length; else diff --git a/src/protocol-native.c b/src/protocol-native.c index a39880b8..b8a461ae 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -29,6 +29,7 @@ struct playback_stream { size_t qlength; struct sink_input *sink_input; struct memblockq *memblockq; + size_t requested_bytes; }; struct connection { @@ -81,7 +82,7 @@ static void record_stream_free(struct record_stream* r) { static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) { struct playback_stream *s; - assert(c && sink && s && name && qlen && maxlength && prebuf); + assert(c && sink && ss && name && qlen && maxlength && prebuf); s = malloc(sizeof(struct playback_stream)); assert (s); @@ -99,8 +100,9 @@ static struct playback_stream* playback_stream_new(struct connection *c, struct s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf); assert(s->memblockq); + s->requested_bytes = 0; + idxset_put(c->playback_streams, s, &s->index); - request_bytes(s); return s; } @@ -141,12 +143,21 @@ static void request_bytes(struct playback_stream *s) { if (!(l = memblockq_missing_to(s->memblockq, s->qlength))) return; + if (l <= s->requested_bytes) + return; + + l -= s->requested_bytes; + s->requested_bytes += l; + t = tagstruct_new(NULL, 0); assert(t); tagstruct_putu32(t, PA_COMMAND_REQUEST); + tagstruct_putu32(t, (uint32_t) -1); /* tag */ tagstruct_putu32(t, s->index); tagstruct_putu32(t, l); pstream_send_tagstruct(s->connection->pstream, t); + + fprintf(stderr, "Requesting %u bytes\n", l); } /*** sinkinput callbacks ***/ @@ -237,6 +248,7 @@ static int command_create_playback_stream(struct pdispatch *pd, uint32_t command assert(s->sink_input); tagstruct_putu32(reply, s->sink_input->index); pstream_send_tagstruct(c->pstream, reply); + request_bytes(s); return 0; } @@ -284,7 +296,6 @@ static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, st /*** pstream callbacks ***/ - static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) { struct connection *c = userdata; assert(p && packet && packet->data && c); @@ -307,10 +318,17 @@ static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, return -1; } + if (chunk->length >= stream->requested_bytes) + stream->requested_bytes = 0; + else + stream->requested_bytes -= chunk->length; + memblockq_push(stream->memblockq, chunk, delta); assert(stream->sink_input); sink_notify(stream->sink_input->sink); + fprintf(stderr, "Recieved %u bytes.\n", chunk->length); + return 0; } diff --git a/src/pstream.c b/src/pstream.c index 4a3a648b..e3f59816 100644 --- a/src/pstream.c +++ b/src/pstream.c @@ -131,11 +131,11 @@ static void item_free(void *item, void *p) { struct item_info *i = item; assert(i); - if (i->type == PSTREAM_ITEM_PACKET) { + if (i->type == PSTREAM_ITEM_MEMBLOCK) { assert(i->chunk.memblock); memblock_unref(i->chunk.memblock); } else { - assert(i->type == PSTREAM_ITEM_MEMBLOCK); + assert(i->type == PSTREAM_ITEM_PACKET); assert(i->packet); packet_unref(i->packet); } @@ -184,7 +184,7 @@ void pstream_send_packet(struct pstream*p, struct packet *packet) { void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) { struct item_info *i; - assert(p && channel && chunk); + assert(p && channel != (uint32_t) -1 && chunk); i = malloc(sizeof(struct item_info)); assert(i); @@ -258,7 +258,7 @@ static void do_write(struct pstream *p) { l = PSTREAM_DESCRIPTOR_SIZE - p->write.index; } else { d = (void*) p->write.data + p->write.index - PSTREAM_DESCRIPTOR_SIZE; - l = ntohl(p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - p->write.index - PSTREAM_DESCRIPTOR_SIZE; + l = ntohl(p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PSTREAM_DESCRIPTOR_SIZE); } if ((r = iochannel_write(p->io, d, l)) < 0) @@ -298,7 +298,7 @@ static void do_read(struct pstream *p) { } else { assert(p->read.data); d = (void*) p->read.data + p->read.index - PSTREAM_DESCRIPTOR_SIZE; - l = ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - p->read.index - PSTREAM_DESCRIPTOR_SIZE; + l = ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PSTREAM_DESCRIPTOR_SIZE); } if ((r = iochannel_read(p->io, d, l)) <= 0) diff --git a/src/tagstruct.c b/src/tagstruct.c index 407440d3..e6d6b2b2 100644 --- a/src/tagstruct.c +++ b/src/tagstruct.c @@ -113,7 +113,7 @@ int tagstruct_gets(struct tagstruct*t, const char **s) { return -1; error = 1; - for (n = 0, c = (char*) (t->data+t->rindex+1); n < t->length-t->rindex-1; c++) + for (n = 0, c = (char*) (t->data+t->rindex+1); t->rindex+1+n < t->length; n++, c++) if (!*c) { error = 0; break; @@ -124,7 +124,7 @@ int tagstruct_gets(struct tagstruct*t, const char **s) { *s = (char*) (t->data+t->rindex+1); - t->rindex += n+1; + t->rindex += n+2; return 0; } -- cgit