From 888e44f3b0a361b5cfcca1090d0defd21217db11 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 20 Mar 2009 18:04:23 +0100 Subject: rework bluetooth IO loops --- src/modules/bluetooth/module-bluetooth-device.c | 254 ++++++++++++++---------- 1 file changed, 150 insertions(+), 104 deletions(-) (limited to 'src/modules/bluetooth') diff --git a/src/modules/bluetooth/module-bluetooth-device.c b/src/modules/bluetooth/module-bluetooth-device.c index 3da69fc7..fc8dc9ca 100644 --- a/src/modules/bluetooth/module-bluetooth-device.c +++ b/src/modules/bluetooth/module-bluetooth-device.c @@ -96,7 +96,7 @@ struct a2dp_info { sbc_capabilities_t sbc_capabilities; sbc_t sbc; /* Codec data */ pa_bool_t sbc_initialized; /* Keep track if the encoder is initialized */ - size_t codesize; /* SBC codesize */ + size_t codesize, frame_length; /* SBC Codesize, frame_length. We simply cache those values here */ void* buffer; /* Codec transfer buffer */ size_t buffer_size; /* Size of the buffer */ @@ -583,7 +583,8 @@ static void setup_sbc(struct a2dp_info *a2dp) { } a2dp->sbc.bitpool = active_capabilities->max_bitpool; - a2dp->codesize = (uint16_t) sbc_get_codesize(&a2dp->sbc); + a2dp->codesize = sbc_get_codesize(&a2dp->sbc); + a2dp->frame_length = sbc_get_frame_length(&a2dp->sbc); } static int set_conf(struct userdata *u) { @@ -645,7 +646,12 @@ static int set_conf(struct userdata *u) { /* setup SBC encoder now we agree on parameters */ if (u->profile == PROFILE_A2DP) { setup_sbc(&u->a2dp); - u->block_size = u->a2dp.codesize; + + u->block_size = + ((u->link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) + / u->a2dp.frame_length + * u->a2dp.codesize); + pa_log_info("SBC parameters:\n\tallocation=%u\n\tsubbands=%u\n\tblocks=%u\n\tbitpool=%u\n", u->a2dp.sbc.allocation, u->a2dp.sbc.subbands, u->a2dp.sbc.blocks, u->a2dp.sbc.bitpool); } else @@ -853,48 +859,62 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off static int hsp_process_render(struct userdata *u) { int ret = 0; - pa_memchunk memchunk; pa_assert(u); pa_assert(u->profile == PROFILE_HSP); pa_assert(u->sink); - pa_sink_render_full(u->sink, u->block_size, &memchunk); + /* First, render some data */ + if (!u->write_memchunk.memblock) + pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk); + + pa_assert(u->write_memchunk.length == u->block_size); for (;;) { ssize_t l; const void *p; - p = (const uint8_t*) pa_memblock_acquire(memchunk.memblock) + memchunk.index; - l = pa_write(u->stream_fd, p, memchunk.length, &u->stream_write_type); - pa_memblock_release(memchunk.memblock); + /* Now write that data to the socket. The socket is of type + * SEQPACKET, and we generated the data of the MTU size, so this + * should just work. */ - pa_log_debug("Memblock written to socket: %lli bytes", (long long) l); + p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index; + l = pa_write(u->stream_fd, p, u->write_memchunk.length, &u->stream_write_type); + pa_memblock_release(u->write_memchunk.memblock); pa_assert(l != 0); if (l < 0) { - if (errno == EINTR || errno == EAGAIN) /*** FIXME: EAGAIN handling borked ***/ + + if (errno == EINTR) + /* Retry right away if we got interrupted */ continue; - else { - pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno)); - ret = -1; + + else if (errno == EAGAIN) + /* Hmm, apparently the socket was not writable, give up for now */ break; - } - } else { - pa_assert((size_t) l <= memchunk.length); - memchunk.index += (size_t) l; - memchunk.length -= (size_t) l; + pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno)); + ret = -1; + break; + } - u->write_index += (uint64_t) l; + pa_assert((size_t) l <= u->write_memchunk.length); - if (memchunk.length <= 0) - break; + if ((size_t) l != u->write_memchunk.length) { + pa_log_error("Wrote memory block to socket only partially! %llu written, wanted to write %llu.", + (unsigned long long) l, + (unsigned long long) u->write_memchunk.length); + ret = -1; + break; } - } - pa_memblock_unref(memchunk.memblock); + u->write_index += (uint64_t) u->write_memchunk.length; + pa_memblock_unref(u->write_memchunk.memblock); + pa_memchunk_reset(&u->write_memchunk); + + break; + } return ret; } @@ -919,20 +939,27 @@ static int hsp_process_push(struct userdata *u) { pa_memblock_release(memchunk.memblock); if (l <= 0) { - if (l < 0 && (errno == EINTR || errno == EAGAIN)) /*** FIXME: EAGAIN handling borked ***/ + + if (l < 0 && errno == EINTR) + /* Retry right away if we got interrupted */ continue; - else { - pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF"); - ret = -1; + + else if (l < 0 && errno == EAGAIN) + /* Hmm, apparently the socket was not readable, give up for now. */ break; - } - } else { - memchunk.length = (size_t) l; - u->read_index += (uint64_t) l; - pa_source_post(u->source, &memchunk); + pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF"); + ret = -1; break; } + + pa_assert((size_t) l <= memchunk.length); + + memchunk.length = (size_t) l; + u->read_index += (uint64_t) l; + + pa_source_post(u->source, &memchunk); + break; } pa_memblock_unref(memchunk.memblock); @@ -940,127 +967,146 @@ static int hsp_process_push(struct userdata *u) { return ret; } +static void a2dp_prepare_buffer(struct userdata *u) { + pa_assert(u); + + if (u->a2dp.buffer_size >= u->link_mtu) + return; + + u->a2dp.buffer_size = 2 * u->link_mtu; + pa_xfree(u->a2dp.buffer); + u->a2dp.buffer = pa_xmalloc(u->a2dp.buffer_size); +} + static int a2dp_process_render(struct userdata *u) { - size_t frame_size; struct a2dp_info *a2dp; struct rtp_header *header; struct rtp_payload *payload; - size_t left; + size_t nbytes; void *d; const void *p; + size_t to_write, to_encode; unsigned frame_count; - size_t written; - uint64_t writing_at; + int ret = 0; pa_assert(u); pa_assert(u->profile == PROFILE_A2DP); pa_assert(u->sink); - a2dp = &u->a2dp; + /* First, render some data */ + if (!u->write_memchunk.memblock) + pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk); - if (a2dp->buffer_size < u->link_mtu) { - a2dp->buffer_size = 2*u->link_mtu; - pa_xfree(a2dp->buffer); - a2dp->buffer = pa_xmalloc(a2dp->buffer_size); - } + pa_assert(u->write_memchunk.length == u->block_size); + + a2dp_prepare_buffer(u); - header = (struct rtp_header*) a2dp->buffer; + a2dp = &u->a2dp; + header = a2dp->buffer; payload = (struct rtp_payload*) ((uint8_t*) a2dp->buffer + sizeof(*header)); - d = (uint8_t*) a2dp->buffer + sizeof(*header) + sizeof(*payload); - left = a2dp->buffer_size - sizeof(*header) - sizeof(*payload); - frame_size = sbc_get_frame_length(&a2dp->sbc); frame_count = 0; - writing_at = u->write_index; + /* Try to create a packet of the full MTU */ - do { - ssize_t encoded; + p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index; + to_encode = u->write_memchunk.length; - if (!u->write_memchunk.memblock) - pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk); + d = (uint8_t*) a2dp->buffer + sizeof(*header) + sizeof(*payload); + to_write = a2dp->buffer_size - sizeof(*header) - sizeof(*payload); + + while (PA_LIKELY(to_encode > 0 && to_write > 0)) { + size_t written; + ssize_t encoded; - p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index; encoded = sbc_encode(&a2dp->sbc, - p, u->write_memchunk.length, - d, left, + p, to_encode, + d, to_write, &written); - PA_ONCE_BEGIN { - pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&a2dp->sbc))); - } PA_ONCE_END; - - pa_memblock_release(u->write_memchunk.memblock); - - if (encoded <= 0) { - pa_log_error("SBC encoding error (%d)", encoded); + if (PA_UNLIKELY(encoded <= 0)) { + pa_log_error("SBC encoding error (%li)", (long) encoded); + pa_memblock_release(u->write_memchunk.memblock); return -1; } - pa_assert((size_t) encoded <= u->write_memchunk.length); - pa_assert((size_t) encoded == sbc_get_codesize(&a2dp->sbc)); +/* pa_log_debug("SBC: encoded: %lu; written: %lu", (unsigned long) encoded, (unsigned long) written); */ +/* pa_log_debug("SBC: codesize: %lu; frame_length: %lu", (unsigned long) a2dp->codesize, (unsigned long) a2dp->frame_length); */ - pa_assert((size_t) written <= left); - pa_assert((size_t) written == sbc_get_frame_length(&a2dp->sbc)); + pa_assert_fp((size_t) encoded <= to_encode); + pa_assert_fp((size_t) encoded == a2dp->codesize); -/* pa_log_debug("SBC: encoded: %d; written: %d", encoded, written); */ + pa_assert_fp((size_t) written <= to_write); + pa_assert_fp((size_t) written == a2dp->frame_length); - u->write_memchunk.index += encoded; - u->write_memchunk.length -= encoded; - - if (u->write_memchunk.length <= 0) { - pa_memblock_unref(u->write_memchunk.memblock); - pa_memchunk_reset(&u->write_memchunk); - } - - u->write_index += encoded; + p = (const uint8_t*) p + encoded; + to_encode -= encoded; d = (uint8_t*) d + written; - left -= written; + to_write -= written; frame_count++; + } + + pa_memblock_release(u->write_memchunk.memblock); + + pa_assert(to_encode == 0); - } while (((uint8_t*) d - ((uint8_t*) a2dp->buffer + sbc_get_frame_length(&a2dp->sbc))) < (ptrdiff_t) u->link_mtu); + PA_ONCE_BEGIN { + pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&a2dp->sbc))); + } PA_ONCE_END; /* write it to the fifo */ memset(a2dp->buffer, 0, sizeof(*header) + sizeof(*payload)); - payload->frame_count = frame_count; header->v = 2; header->pt = 1; header->sequence_number = htons(a2dp->seq_num++); - header->timestamp = htonl(writing_at / frame_size); + header->timestamp = htonl(u->write_index / pa_frame_size(&u->sink->sample_spec)); header->ssrc = htonl(1); + payload->frame_count = frame_count; - p = a2dp->buffer; - left = (uint8_t*) d - (uint8_t*) a2dp->buffer; + nbytes = (uint8_t*) d - (uint8_t*) a2dp->buffer; for (;;) { ssize_t l; - l = pa_write(u->stream_fd, p, left, &u->stream_write_type); -/* pa_log_debug("write: requested %lu bytes; written %li bytes; mtu=%li", (unsigned long) left, (long) l, (unsigned long) u->link_mtu); */ + l = pa_write(u->stream_fd, a2dp->buffer, nbytes, &u->stream_write_type); pa_assert(l != 0); if (l < 0) { - if (errno == EINTR || errno == EAGAIN) /*** FIXME: EAGAIN handling borked ***/ - continue; - else { - pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno)); - return -1; - } - } else { - pa_assert((size_t) l <= left); - d = (uint8_t*) d + l; - left -= l; + if (errno == EINTR) + /* Retry right away if we got interrupted */ + continue; - if (left <= 0) + else if (errno == EAGAIN) + /* Hmm, apparently the socket was not writable, give up for now */ break; + + pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno)); + ret = -1; + break; + } + + pa_assert((size_t) l <= nbytes); + + if ((size_t) l != nbytes) { + pa_log_warn("Wrote memory block to socket only partially! %llu written, wanted to write %llu.", + (unsigned long long) l, + (unsigned long long) nbytes); + ret = -1; + break; } + + u->write_index += (uint64_t) u->write_memchunk.length; + pa_memblock_unref(u->write_memchunk.memblock); + pa_memchunk_reset(&u->write_memchunk); + + break; } - return 0; + return ret; } static void thread_func(void *userdata) { @@ -1509,12 +1555,20 @@ static void shutdown_bt(struct userdata *u) { if (u->stream_fd >= 0) { pa_close(u->stream_fd); u->stream_fd = -1; + + u->stream_write_type = 0; + u->stream_read_type = 0; } if (u->service_fd >= 0) { pa_close(u->service_fd); u->service_fd = -1; } + + if (u->write_memchunk.memblock) { + pa_memblock_unref(u->write_memchunk.memblock); + pa_memchunk_reset(&u->write_memchunk); + } } static int init_bt(struct userdata *u) { @@ -1686,11 +1740,6 @@ static int card_set_profile(pa_card *c, pa_card_profile *new_profile) { stop_thread(u); shutdown_bt(u); - if (u->write_memchunk.memblock) { - pa_memblock_unref(u->write_memchunk.memblock); - pa_memchunk_reset(&u->write_memchunk); - } - u->profile = *d; u->sample_spec = u->requested_sample_spec; @@ -2034,9 +2083,6 @@ void pa__done(pa_module *m) { if (u->device) pa_bluetooth_device_free(u->device); - if (u->write_memchunk.memblock) - pa_memblock_unref(u->write_memchunk.memblock); - if (u->a2dp.buffer) pa_xfree(u->a2dp.buffer); -- cgit