diff options
Diffstat (limited to 'src/modules')
-rw-r--r-- | src/modules/module-alsa-sink.c | 12 | ||||
-rw-r--r-- | src/modules/module-alsa-source.c | 10 | ||||
-rw-r--r-- | src/modules/module-esound-sink.c | 11 | ||||
-rw-r--r-- | src/modules/module-jack-sink.c | 5 | ||||
-rw-r--r-- | src/modules/module-jack-source.c | 7 | ||||
-rw-r--r-- | src/modules/module-null-sink.c | 194 | ||||
-rw-r--r-- | src/modules/module-oss-mmap.c | 4 | ||||
-rw-r--r-- | src/modules/module-oss.c | 22 | ||||
-rw-r--r-- | src/modules/module-pipe-sink.c | 289 | ||||
-rw-r--r-- | src/modules/module-pipe-source.c | 13 | ||||
-rw-r--r-- | src/modules/module-sine.c | 12 | ||||
-rw-r--r-- | src/modules/rtp/rtp.c | 18 |
12 files changed, 421 insertions, 176 deletions
diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 3d9f7577..f9c4efd4 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -174,6 +174,7 @@ static void do_write(struct userdata *u) { update_usage(u); for (;;) { + void *p; pa_memchunk *memchunk = NULL; snd_pcm_sframes_t frames; @@ -185,14 +186,15 @@ static void do_write(struct userdata *u) { else memchunk = &u->memchunk; } - assert(memchunk->memblock); - assert(memchunk->memblock->data); assert(memchunk->length); - assert(memchunk->memblock->length); assert((memchunk->length % u->frame_size) == 0); - if ((frames = snd_pcm_writei(u->pcm_handle, (uint8_t*) memchunk->memblock->data + memchunk->index, memchunk->length / u->frame_size)) < 0) { + p = pa_memblock_acquire(memchunk->memblock); + + if ((frames = snd_pcm_writei(u->pcm_handle, (uint8_t*) p + memchunk->index, memchunk->length / u->frame_size)) < 0) { + pa_memblock_release(memchunk->memblock); + if (frames == -EAGAIN) return; @@ -217,6 +219,8 @@ static void do_write(struct userdata *u) { return; } + pa_memblock_release(memchunk->memblock); + if (memchunk == &u->memchunk) { size_t l = frames * u->frame_size; memchunk->index += l; diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index 4061d668..6d7e09e6 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -180,6 +180,7 @@ static void do_read(struct userdata *u) { pa_memchunk post_memchunk; snd_pcm_sframes_t frames; size_t l; + void *p; if (!u->memchunk.memblock) { u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size); @@ -188,11 +189,13 @@ static void do_read(struct userdata *u) { assert(u->memchunk.memblock); assert(u->memchunk.length); - assert(u->memchunk.memblock->data); - assert(u->memchunk.memblock->length); assert(u->memchunk.length % u->frame_size == 0); - if ((frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length / u->frame_size)) < 0) { + p = pa_memblock_acquire(u->memchunk.memblock); + + if ((frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) p + u->memchunk.index, u->memchunk.length / u->frame_size)) < 0) { + pa_memblock_release(u->memchunk.memblock); + if (frames == -EAGAIN) return; @@ -216,6 +219,7 @@ static void do_read(struct userdata *u) { pa_module_unload_request(u->module); return; } + pa_memblock_release(u->memchunk.memblock); l = frames * u->frame_size; diff --git a/src/modules/module-esound-sink.c b/src/modules/module-esound-sink.c index 26638d9d..39886d04 100644 --- a/src/modules/module-esound-sink.c +++ b/src/modules/module-esound-sink.c @@ -144,18 +144,25 @@ static int do_write(struct userdata *u) { u->write_index = u->write_length = 0; } } else if (u->state == STATE_RUNNING) { + void *p; + pa_module_set_used(u->module, pa_sink_used_by(u->sink)); if (!u->memchunk.length) if (pa_sink_render(u->sink, 8192, &u->memchunk) < 0) return 0; - assert(u->memchunk.memblock && u->memchunk.length); + assert(u->memchunk.memblock); + assert(u->memchunk.length); + + p = pa_memblock_acquire(u->memchunk.memblock); - if ((r = pa_iochannel_write(u->io, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) { + if ((r = pa_iochannel_write(u->io, (uint8_t*) p + u->memchunk.index, u->memchunk.length)) < 0) { + pa_memblock_release(u->memchunk.memblock); pa_log("write() failed: %s", pa_cstrerror(errno)); return -1; } + pa_memblock_release(u->memchunk.memblock); u->memchunk.index += r; u->memchunk.length -= r; diff --git a/src/modules/module-jack-sink.c b/src/modules/module-jack-sink.c index c6a7e33f..1092aed8 100644 --- a/src/modules/module-jack-sink.c +++ b/src/modules/module-jack-sink.c @@ -137,22 +137,25 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_ unsigned fs; jack_nframes_t frame_idx; pa_memchunk chunk; + void *p; fs = pa_frame_size(&u->sink->sample_spec); pa_sink_render_full(u->sink, u->frames_requested * fs, &chunk); + p = pa_memblock_acquire(chunk.memblock); for (frame_idx = 0; frame_idx < u->frames_requested; frame_idx ++) { unsigned c; for (c = 0; c < u->channels; c++) { - float *s = ((float*) ((uint8_t*) chunk.memblock->data + chunk.index)) + (frame_idx * u->channels) + c; + float *s = ((float*) ((uint8_t*) p + chunk.index)) + (frame_idx * u->channels) + c; float *d = ((float*) u->buffer[c]) + frame_idx; *d = *s; } } + pa_memblock_release(chunk.memblock); pa_memblock_unref(chunk.memblock); u->frames_requested = 0; diff --git a/src/modules/module-jack-source.c b/src/modules/module-jack-source.c index 8ca24035..e19b2181 100644 --- a/src/modules/module-jack-source.c +++ b/src/modules/module-jack-source.c @@ -136,23 +136,28 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_ unsigned fs; jack_nframes_t frame_idx; pa_memchunk chunk; + void *p; fs = pa_frame_size(&u->source->sample_spec); chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs); chunk.index = 0; + p = pa_memblock_acquire(chunk.memblock); + for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) { unsigned c; for (c = 0; c < u->channels; c++) { float *s = ((float*) u->buffer[c]) + frame_idx; - float *d = ((float*) ((uint8_t*) chunk.memblock->data + chunk.index)) + (frame_idx * u->channels) + c; + float *d = ((float*) ((uint8_t*) p + chunk.index)) + (frame_idx * u->channels) + c; *d = *s; } } + pa_memblock_release(chunk.memblock); + pa_source_post(u->source, &chunk); pa_memblock_unref(chunk.memblock); diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index 54a8e890..8cf961b9 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -28,7 +28,6 @@ #include <stdlib.h> #include <sys/stat.h> #include <stdio.h> -#include <assert.h> #include <errno.h> #include <string.h> #include <fcntl.h> @@ -38,6 +37,7 @@ #include <pulse/timeval.h> #include <pulse/xmalloc.h> +#include <pulsecore/macro.h> #include <pulsecore/iochannel.h> #include <pulsecore/sink.h> #include <pulsecore/module.h> @@ -64,11 +64,9 @@ struct userdata { pa_core *core; pa_module *module; pa_sink *sink; - pa_time_event *time_event; + pa_thread *thread; size_t block_size; - - uint64_t n_bytes; - struct timeval start_time; + struct timeval timestamp; }; static const char* const valid_modargs[] = { @@ -81,35 +79,131 @@ static const char* const valid_modargs[] = { NULL }; -static void time_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) { +static void thread_func(void *userdata) { struct userdata *u = userdata; - pa_memchunk chunk; - struct timeval ntv = *tv; - size_t l; - - assert(u); - - if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) { - l = chunk.length; - pa_memblock_unref(chunk.memblock); - } else - l = u->block_size; - - pa_timeval_add(&ntv, pa_bytes_to_usec(l, &u->sink->sample_spec)); - m->time_restart(e, &ntv); - - u->n_bytes += l; -} - -static pa_usec_t get_latency(pa_sink *s) { - struct userdata *u = s->userdata; - pa_usec_t a, b; - struct timeval now; + int quit = 0; + struct pollfd pollfd; + int running = 1; + + pa_assert(u); + + pa_log_debug("Thread starting up"); + + memset(&pollfd, 0, sizeof(pollfd)); + pollfd.fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP); + pollfd.events = POLLIN; + + pa_gettimeofday(u->timestamp); + + for (;;) { + int code; + void *data, *object; + int r, timeout; + struct timeval now; + + /* Check whether there is a message for us to process */ + if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) { + + + /* Now process these messages our own way */ + if (!object) { + + switch (code) { + case PA_MESSAGE_SHUTDOWN: + goto finish; + + default: + pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); + + } + + } else if (object == u->sink) { + + switch (code) { + case PA_SINK_MESSAGE_STOP: + pa_assert(running); + running = 0; + break; + + case PA_SINK_MESSAGE_START: + pa_assert(!running); + running = 1; + + pa_gettimeofday(u->timestamp); + break; + + case PA_SINK_MESSAGE_GET_LATENCY: + + if (pa_timeval_cmp(&u->timestamp, &now) > 0) + *((pa_usec_t*) data) = 0; + else + *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now); + break; + + /* ... */ + + default: + pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); + } + } + + pa_asyncmsgq_done(u->sink->asyncmsgq); + continue; + } + + /* Render some data and drop it immediately */ + + if (running) { + pa_gettimeofday(&now); + + if (pa_timeval_cmp(u->timestamp, &now) <= 0) { + pa_memchunk chunk; + size_t l; + + if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) { + l = chunk.length; + pa_memblock_unref(chunk.memblock); + } else + l = u->block_size; + + pa_timeval_add(&u->timestamp, pa_bytes_to_usec(l, &u->sink->sample_spec)); + continue; + } + + timeout = pa_timeval_diff(&u->timestamp, &now)/1000; + + if (timeout < 1) + timeout = 1; + } else + timeout = -1; + + /* Hmm, nothing to do. Let's sleep */ + + if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0) + continue; + + r = poll(&pollfd, 1, timeout); + pa_asyncmsgq_after_poll(u->sink->asyncmsgq); + + if (r < 0) { + if (errno == EINTR) + continue; + + pa_log("poll() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + pa_assert(r == 0 || pollfd.revents == POLLIN); + } - a = pa_timeval_diff(pa_gettimeofday(&now), &u->start_time); - b = pa_bytes_to_usec(u->n_bytes, &s->sample_spec); +fail: + /* We have to continue processing messages until we receive the + * SHUTDOWN message */ + pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), NULL, pa_module_unref); + pa_asyncmsgq_wait_for(PA_MESSAGE_SHUTDOWN); - return b > a ? b - a : 0; +finish: + pa_log_debug("Thread shutting down"); } int pa__init(pa_core *c, pa_module*m) { @@ -118,17 +212,17 @@ int pa__init(pa_core *c, pa_module*m) { pa_channel_map map; pa_modargs *ma = NULL; - assert(c); - assert(m); + pa_assert(c); + pa_assert(m); if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { - pa_log("failed to parse module arguments."); + pa_log("Failed to parse module arguments."); goto fail; } ss = c->default_sample_spec; if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) { - pa_log("invalid sample format specification or channel map."); + pa_log("Invalid sample format specification or channel map"); goto fail; } @@ -138,22 +232,24 @@ int pa__init(pa_core *c, pa_module*m) { m->userdata = u; if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) { - pa_log("failed to create sink."); + pa_log("Failed to create sink."); goto fail; } - u->sink->get_latency = get_latency; u->sink->userdata = u; pa_sink_set_owner(u->sink, m); pa_sink_set_description(u->sink, pa_modargs_get_value(ma, "description", "NULL sink")); - u->n_bytes = 0; - pa_gettimeofday(&u->start_time); - - u->time_event = c->mainloop->time_new(c->mainloop, &u->start_time, time_callback, u); - - u->block_size = pa_bytes_per_second(&ss) / 10; + u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */ + + if (u->block_size <= 0) + u->block_size = pa_frame_size(&ss); + if (!(u->thread = pa_thread_new(thread_func, u))) { + pa_log("Failed to create thread."); + goto fail; + } + pa_modargs_free(ma); return 0; @@ -169,15 +265,21 @@ fail: void pa__done(pa_core *c, pa_module*m) { struct userdata *u; - assert(c && m); + + pa_assert(c); + pa_assert(m); if (!(u = m->userdata)) return; pa_sink_disconnect(u->sink); - pa_sink_unref(u->sink); - u->core->mainloop->time_free(u->time_event); + if (u->thread) { + pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL); + pa_thread_free(u->thread); + } + + pa_sink_unref(u->sink); pa_xfree(u); } diff --git a/src/modules/module-oss-mmap.c b/src/modules/module-oss-mmap.c index 16c9b311..567230e0 100644 --- a/src/modules/module-oss-mmap.c +++ b/src/modules/module-oss-mmap.c @@ -173,7 +173,7 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) { u->out_fragment_size, 1); assert(chunk.memblock); - chunk.length = chunk.memblock->length; + chunk.length = pa_memblock_get_length(chunk.memblock); chunk.index = 0; pa_sink_render_into_full(u->sink, &chunk); @@ -217,7 +217,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) { if (!u->in_memblocks[u->in_current]) { chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1); - chunk.length = chunk.memblock->length; + chunk.length = pa_memblock_get_length(chunk.memblock); chunk.index = 0; pa_source_post(u->source, &chunk); diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index 9d4d0eac..9061e110 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -158,6 +158,7 @@ static void do_write(struct userdata *u) { } do { + void *p; memchunk = &u->memchunk; if (!memchunk->length) @@ -165,10 +166,11 @@ static void do_write(struct userdata *u) { memchunk = &u->silence; assert(memchunk->memblock); - assert(memchunk->memblock->data); assert(memchunk->length); - if ((r = pa_iochannel_write(u->io, (uint8_t*) memchunk->memblock->data + memchunk->index, memchunk->length)) < 0) { + p = pa_memblock_acquire(memchunk->memblock); + if ((r = pa_iochannel_write(u->io, (uint8_t*) p + memchunk->index, memchunk->length)) < 0) { + pa_memblock_release(memchunk->memblock); if (errno != EAGAIN) { pa_log("write() failed: %s", pa_cstrerror(errno)); @@ -180,6 +182,8 @@ static void do_write(struct userdata *u) { break; } + pa_memblock_release(memchunk->memblock); + if (memchunk == &u->silence) assert(r % u->sample_size == 0); else { @@ -224,9 +228,13 @@ static void do_read(struct userdata *u) { } do { + void *p; memchunk.memblock = pa_memblock_new(u->core->mempool, l); - assert(memchunk.memblock); - if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) { + + p = pa_memblock_acquire(memchunk.memblock); + + if ((r = pa_iochannel_read(u->io, p, pa_memblock_get_length(memchunk.memblock))) < 0) { + pa_memblock_release(memchunk.memblock); pa_memblock_unref(memchunk.memblock); if (errno != EAGAIN) { @@ -239,8 +247,10 @@ static void do_read(struct userdata *u) { break; } - assert(r <= (ssize_t) memchunk.memblock->length); - memchunk.length = memchunk.memblock->length = r; + pa_memblock_release(memchunk.memblock); + + assert(r <= (ssize_t) pa_memblock_get_length(memchunk.memblock)); + memchunk.length = r; memchunk.index = 0; pa_source_post(u->source, &memchunk); diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 170b046e..61672ede 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -58,20 +58,16 @@ PA_MODULE_USAGE( "rate=<sample rate>" "channel_map=<channel map>") -#define DEFAULT_FIFO_NAME "/tmp/music.output" +#define DEFAULT_FILE_NAME "/tmp/music.output" #define DEFAULT_SINK_NAME "fifo_output" struct userdata { pa_core *core; - - char *filename; - - pa_sink *sink; - pa_iochannel *io; - pa_defer_event *defer_event; - - pa_memchunk memchunk; pa_module *module; + pa_sink *sink; + char *filename; + int fd; + pa_thread *thread; }; static const char* const valid_modargs[] = { @@ -84,97 +80,203 @@ static const char* const valid_modargs[] = { NULL }; -static void do_write(struct userdata *u) { - ssize_t r; - assert(u); +enum { + POLLFD_ASYNCQ, + POLLFD_FIFO, + POLLFD_MAX, +}; - u->core->mainloop->defer_enable(u->defer_event, 0); +static void thread_func(void *userdata) { + struct userdata *u = userdata; + int quit = 0; + struct pollfd pollfd[POLLFD_MAX]; + int running = 1, underrun = 0; + pa_memchunk memchunk; - if (!pa_iochannel_is_writable(u->io)) - return; + pa_assert(u); - pa_module_set_used(u->module, pa_sink_used_by(u->sink)); + pa_log_debug("Thread starting up"); + + memset(&pollfd, 0, sizeof(pollfd)); + pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP); + pollfd[POLLFD_ASYNCQ].events = POLLIN; - if (!u->memchunk.length) - if (pa_sink_render(u->sink, PIPE_BUF, &u->memchunk) < 0) - return; + pollfd[POLLFD_FIFO].fd = u->fd; - assert(u->memchunk.memblock && u->memchunk.length); + memset(&memchunk, 0, sizeof(memchunk)); + + for (;;) { + int code; + void *object, *data; + int r; + struct timeval now; - if ((r = pa_iochannel_write(u->io, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) { - pa_log("write(): %s", pa_cstrerror(errno)); - return; - } + /* Check whether there is a message for us to process */ + if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) { - u->memchunk.index += r; - u->memchunk.length -= r; - if (u->memchunk.length <= 0) { - pa_memblock_unref(u->memchunk.memblock); - u->memchunk.memblock = NULL; + /* Now process these messages our own way */ + if (!object) { + switch (code) { + case PA_SINK_MESSAGE_SHUTDOWN: + goto finish; + + default: + pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); + } + + } else if (object == u->sink) { + + case PA_SINK_MESSAGE_STOP: + pa_assert(running); + running = 0; + break; + + case PA_SINK_MESSAGE_START: + pa_assert(!running); + running = 1; + break; + + case PA_SINK_MESSAGE_GET_LATENCY: { + size_t n = 0; + int l; + + if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0) + n = (size_t) l; + + n += memchunk.length; + + *((pa_usec_t*) data) pa_bytes_to_usec(n, &u->sink->sample_spec); + break; + } + + /* ... */ + + default: + pa_sink_process_msg(u->sink->asyncmsgq, object, code, data); + } + + pa_asyncmsgq_done(u->sink->asyncmsgq); + continue; + } + + /* Render some data and write it to the fifo */ + + if (running && (pollfd[POLLFD_FIFO].revents || underrun)) { + + if (chunk.length <= 0) + pa_sink_render(u->fd, PIPE_BUF, &chunk); + + underrun = chunk.length <= 0; + + if (!underrun) { + ssize_t l; + + p = pa_memblock_acquire(u->memchunk.memblock); + l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length); + pa_memblock_release(p); + + if (l < 0) { + + if (errno != EINTR && errno != EAGAIN) { + pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); + goto fail; + } + + } else { + + u->memchunk.index += l; + u->memchunk.length -= l; + + if (u->memchunk.length <= 0) { + pa_memblock_unref(u->memchunk.memblock); + u->memchunk.memblock = NULL; + } + } + + pollfd[POLLFD_FIFO].revents = 0; + continue; + } + } + + pollfd[POLLFD_FIFO].events = running && !underrun ? POLLOUT : 0; + + /* Hmm, nothing to do. Let's sleep */ + + if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0) + continue; + + r = poll(&pollfd, 1, 0); + pa_asyncmsgq_after_poll(u->sink->asyncmsgq); + + if (r < 0) { + if (errno == EINTR) + continue; + + pa_log("poll() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (pollfd[POLLFD_FIFO].revents & ~POLLIN) { + pa_log("FIFO shutdown."); + goto fail; + } + + pa_assert(pollfd[POLLFD_ASYNCQ].revents & ~POLLIN == 0); } -} - -static void notify_cb(pa_sink*s) { - struct userdata *u = s->userdata; - assert(s && u); - - if (pa_iochannel_is_writable(u->io)) - u->core->mainloop->defer_enable(u->defer_event, 1); -} - -static pa_usec_t get_latency_cb(pa_sink *s) { - struct userdata *u = s->userdata; - assert(s && u); - - return u->memchunk.memblock ? pa_bytes_to_usec(u->memchunk.length, &s->sample_spec) : 0; -} - -static void defer_callback(PA_GCC_UNUSED pa_mainloop_api *m, PA_GCC_UNUSED pa_defer_event*e, void *userdata) { - struct userdata *u = userdata; - assert(u); - do_write(u); -} + +fail: + /* We have to continue processing messages until we receive the + * SHUTDOWN message */ + pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), pa_module_unref); + pa_asyncmsgq_wait_for(PA_SINK_MESSAGE_SHUTDOWN); -static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void*userdata) { - struct userdata *u = userdata; - assert(u); - do_write(u); +finish: + pa_log_debug("Thread shutting down"); } int pa__init(pa_core *c, pa_module*m) { struct userdata *u = NULL; struct stat st; - const char *p; - int fd = -1; pa_sample_spec ss; pa_channel_map map; pa_modargs *ma = NULL; char *t; - assert(c && m); + pa_assert(c); + pa_assert(m); if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { - pa_log("failed to parse module arguments"); + pa_log("Failed to parse module arguments."); goto fail; } ss = c->default_sample_spec; if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) { - pa_log("invalid sample format specification"); + pa_log("Invalid sample format specification"); goto fail; } - mkfifo(p = pa_modargs_get_value(ma, "file", DEFAULT_FIFO_NAME), 0777); - - if ((fd = open(p, O_RDWR)) < 0) { + u = pa_xnew0(struct userdata, 1); + u->core = c; + u->module = m; + u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FIFO_NAME)); + u->fd = fd; + u->memchunk.memblock = NULL; + u->memchunk.length = 0; + m->userdata = u; + + mkfifo(u->filename, 0666); + + if ((u->fd = open(u->filename, O_RDWR)) < 0) { pa_log("open('%s'): %s", p, pa_cstrerror(errno)); goto fail; } - pa_fd_set_cloexec(fd, 1); + pa_fd_set_cloexec(u->fd, 1); + pa_make_nonblock_fd(u->fd); - if (fstat(fd, &st) < 0) { + if (fstat(u->fd, &st) < 0) { pa_log("fstat('%s'): %s", p, pa_cstrerror(errno)); goto fail; } @@ -184,34 +286,21 @@ int pa__init(pa_core *c, pa_module*m) { goto fail; } - u = pa_xmalloc0(sizeof(struct userdata)); - u->filename = pa_xstrdup(p); - u->core = c; - u->module = m; - m->userdata = u; - if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) { - pa_log("failed to create sink."); + pa_log("Failed to create sink."); goto fail; } - u->sink->notify = notify_cb; - u->sink->get_latency = get_latency_cb; + u->sink->userdata = u; pa_sink_set_owner(u->sink, m); pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", p)); pa_xfree(t); - u->io = pa_iochannel_new(c->mainloop, -1, fd); - assert(u->io); - pa_iochannel_set_callback(u->io, io_callback, u); - - u->memchunk.memblock = NULL; - u->memchunk.length = 0; - - u->defer_event = c->mainloop->defer_new(c->mainloop, defer_callback, u); - assert(u->defer_event); - c->mainloop->defer_enable(u->defer_event, 0); - + if (!(u->thread = pa_thread_new(thread_func, u))) { + pa_log("Failed to create thread."); + goto fail; + } + pa_modargs_free(ma); return 0; @@ -220,9 +309,6 @@ fail: if (ma) pa_modargs_free(ma); - if (fd >= 0) - close(fd); - pa__done(c, m); return -1; @@ -230,22 +316,31 @@ fail: void pa__done(pa_core *c, pa_module*m) { struct userdata *u; - assert(c && m); + pa_assert(c); + pa_assert(m); if (!(u = m->userdata)) return; - if (u->memchunk.memblock) - pa_memblock_unref(u->memchunk.memblock); - pa_sink_disconnect(u->sink); + + if (u->thread) { + pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL); + pa_thread_free(u->thread); + } + pa_sink_unref(u->sink); - pa_iochannel_free(u->io); - u->core->mainloop->defer_free(u->defer_event); - assert(u->filename); - unlink(u->filename); - pa_xfree(u->filename); + if (u->memchunk.memblock) + pa_memblock_unref(u->memchunk.memblock); + + if (u->filename) { + unlink(u->filename); + pa_xfree(u->filename); + } + + if (u->fd >= 0) + close(u->fd); pa_xfree(u); } diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index 56c721b0..f275c5d4 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -84,7 +84,9 @@ static const char* const valid_modargs[] = { static void do_read(struct userdata *u) { ssize_t r; + void *p; pa_memchunk chunk; + assert(u); if (!pa_iochannel_is_readable(u->io)) @@ -97,17 +99,22 @@ static void do_read(struct userdata *u) { u->chunk.index = chunk.length = 0; } - assert(u->chunk.memblock && u->chunk.memblock->length > u->chunk.index); - if ((r = pa_iochannel_read(u->io, (uint8_t*) u->chunk.memblock->data + u->chunk.index, u->chunk.memblock->length - u->chunk.index)) <= 0) { + assert(u->chunk.memblock); + assert(pa_memblock_get_length(u->chunk.memblock) > u->chunk.index); + + p = pa_memblock_acquire(u->chunk.memblock); + if ((r = pa_iochannel_read(u->io, (uint8_t*) p + u->chunk.index, pa_memblock_get_length(u->chunk.memblock) - u->chunk.index)) <= 0) { + pa_memblock_release(u->chunk.memblock); pa_log("read(): %s", pa_cstrerror(errno)); return; } + pa_memblock_release(u->chunk.memblock); u->chunk.length = r; pa_source_post(u->source, &u->chunk); u->chunk.index += r; - if (u->chunk.index >= u->chunk.memblock->length) { + if (u->chunk.index >= pa_memblock_get_length(u->chunk.memblock)) { u->chunk.index = u->chunk.length = 0; pa_memblock_unref(u->chunk.memblock); u->chunk.memblock = NULL; diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c index 661455b3..baf37346 100644 --- a/src/modules/module-sine.c +++ b/src/modules/module-sine.c @@ -65,7 +65,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { chunk->memblock = pa_memblock_ref(u->memblock); chunk->index = u->peek_index; - chunk->length = u->memblock->length - u->peek_index; + chunk->length = pa_memblock_get_length(u->memblock) - u->peek_index; return 0; } @@ -74,11 +74,12 @@ static void sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t l assert(i && chunk && length && i->userdata); u = i->userdata; - assert(chunk->memblock == u->memblock && length <= u->memblock->length-u->peek_index); + assert(chunk->memblock == u->memblock); + assert(length <= pa_memblock_get_length(u->memblock)-u->peek_index); u->peek_index += length; - if (u->peek_index >= u->memblock->length) + if (u->peek_index >= pa_memblock_get_length(u->memblock)) u->peek_index = 0; } @@ -111,6 +112,7 @@ int pa__init(pa_core *c, pa_module*m) { pa_sample_spec ss; uint32_t frequency; char t[256]; + void *p; pa_sink_input_new_data data; if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { @@ -142,7 +144,9 @@ int pa__init(pa_core *c, pa_module*m) { } u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss)); - calc_sine(u->memblock->data, u->memblock->length, frequency); + p = pa_memblock_acquire(u->memblock); + calc_sine(p, pa_memblock_get_length(u->memblock), frequency); + pa_memblock_release(u->memblock); snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency); diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index 03a01412..f0ab7d8a 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -81,7 +81,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) { size_t k = n + chunk.length > size ? size - n : chunk.length; if (chunk.memblock) { - iov[iov_idx].iov_base = (void*)((uint8_t*) chunk.memblock->data + chunk.index); + iov[iov_idx].iov_base = (void*)((uint8_t*) pa_memblock_acquire(chunk.memblock) + chunk.index); iov[iov_idx].iov_len = k; mb[iov_idx] = chunk.memblock; iov_idx ++; @@ -116,8 +116,10 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) { k = sendmsg(c->fd, &m, MSG_DONTWAIT); - for (i = 1; i < iov_idx; i++) + for (i = 1; i < iov_idx; i++) { + pa_memblock_release(mb[i]); pa_memblock_unref(mb[i]); + } c->sequence++; } else @@ -174,7 +176,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { chunk->memblock = pa_memblock_new(pool, size); - iov.iov_base = chunk->memblock->data; + iov.iov_base = pa_memblock_acquire(chunk->memblock); iov.iov_len = size; m.msg_name = NULL; @@ -195,9 +197,9 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { goto fail; } - memcpy(&header, chunk->memblock->data, sizeof(uint32_t)); - memcpy(&c->timestamp, (uint8_t*) chunk->memblock->data + 4, sizeof(uint32_t)); - memcpy(&c->ssrc, (uint8_t*) chunk->memblock->data + 8, sizeof(uint32_t)); + memcpy(&header, iov.iov_base, sizeof(uint32_t)); + memcpy(&c->timestamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t)); + memcpy(&c->ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t)); header = ntohl(header); c->timestamp = ntohl(c->timestamp); @@ -238,8 +240,10 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { return 0; fail: - if (chunk->memblock) + if (chunk->memblock) { + pa_memblock_release(chunk->memblock); pa_memblock_unref(chunk->memblock); + } return -1; } |