From dd9398075cc85eeb949312c9632038d5fef3d48a Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 27 Aug 2004 01:30:56 +0000 Subject: make it work that is: more or less git-svn-id: file:///home/lennart/svn/public/xmms-pulse/trunk@8 ef929aba-56e2-0310-84e0-b7573d389508 --- src/plugin.c | 263 +++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 183 insertions(+), 80 deletions(-) diff --git a/src/plugin.c b/src/plugin.c index 640540d..6d39c13 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1,3 +1,5 @@ +/* $Id$ */ + #include #include #include @@ -31,6 +33,7 @@ struct request { size_t length; int success, done; uint32_t value; + pa_usec_t latency; pa_volume_t volume; }; @@ -46,8 +49,9 @@ static struct pa_mainloop_api *mainloop_api = NULL; static int failed = 0; static pa_volume_t volume = PA_VOLUME_NORM; static size_t written = 0; -static int do_trigger; +static int do_trigger = 0, triggered = 0; static struct pa_sample_spec sample_spec; +static int locked_by_thread = 0; static void *memdup(void *p, size_t l) { void *r = malloc(l); @@ -55,18 +59,35 @@ static void *memdup(void *p, size_t l) { return r; } +static void lock_request(void) { + assert(thread_running && pthread_equal(pthread_self(), thread_id)); + + if (!(locked_by_thread++)) + pthread_mutex_lock(&request_mutex); +} + +static void unlock_request(void) { + assert(thread_running && pthread_equal(pthread_self(), thread_id) && locked_by_thread > 0); + + if (!(--locked_by_thread)) + pthread_mutex_unlock(&request_mutex); +} + static void finish_request(int success) { - failed = success; + if (!success) + failed = 1; - pthread_mutex_lock(&request_mutex); + lock_request(); if (current_request) { + if (!success) + fprintf(stderr, "failure: %i\n", success); current_request->done = 1; current_request->success = success; - pthread_cond_signal(&request_cond); + pthread_cond_broadcast(&request_cond); } - - pthread_mutex_unlock(&request_mutex); + + unlock_request(); } static void info_callback(struct pa_context *c, const struct pa_sink_input_info *i, int is_last, void *userdata) { @@ -106,6 +127,8 @@ static void stream_state_callback(struct pa_stream *s, void *userdata) { static void context_state_callback(struct pa_context *c, void *userdata) { assert(c && c == context); + fprintf(stderr, "state\n"); + switch (pa_context_get_state(c)) { case PA_CONTEXT_CONNECTING: case PA_CONTEXT_AUTHORIZING: @@ -132,6 +155,7 @@ static void context_state_callback(struct pa_context *c, void *userdata) { static void context_drain_callback(struct pa_context *c, void *userdata) { assert(c && c == context); mainloop_api->quit(mainloop_api, 0); + finish_request(1); } static void stream_success_callback(struct pa_stream *s, int success, void *userdata) { @@ -144,32 +168,39 @@ static void context_success_callback(struct pa_context *c, int success, void *us finish_request(!!success); } -static void latency_callback(struct pa_stream *s, pa_usec_t latency, void *userdata) { +static void latency_callback(struct pa_stream *s, const struct pa_latency_info* i, void *userdata) { assert(s == stream && s); assert(current_request && current_request->message == MESSAGE_LATENCY); - current_request->value = latency; - finish_request(latency != (pa_usec_t) -1); + if (i) { + current_request->latency = i->buffer_usec + i->sink_usec; + current_request->value = i->playing; + } + finish_request(!!i); } static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int fd, enum pa_io_event_flags f, void *userdata) { char x; - fprintf(stderr, "REQUEST\n"); + /*fprintf(stderr, "REQUEST %p\n", current_request);*/ - assert(api && io && f == PA_IO_EVENT_INPUT); - read(pipe_fds[0], &x, 1); + read(fd, &x, 1); - pthread_mutex_lock(&request_mutex); + lock_request(); if (current_request) { - if (failed) { + if (failed && current_request->message != MESSAGE_CLOSE) { finish_request(0); } else { + + /*fprintf(stderr, "req: %i\n", current_request->message);*/ + switch (current_request->message) { case MESSAGE_OPEN: assert(!context && !stream); + + fprintf(stderr, "opening\n"); context = pa_context_new(api, "xmms"); assert(context); pa_context_set_state_callback(context, context_state_callback, NULL); @@ -178,11 +209,16 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int case MESSAGE_CLOSE: { struct pa_operation *o; + + fprintf(stderr, "closing\n"); + assert(context); if ((o = pa_context_drain(context, context_drain_callback, NULL))) pa_operation_unref(o); - else + else { api->quit(mainloop_api, 0); + finish_request(1); + } break; } @@ -205,8 +241,11 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int break; case MESSAGE_LATENCY: - assert(context && stream); - pa_operation_unref(pa_stream_get_latency(stream, latency_callback, NULL)); + if (!context || !stream) { + current_request->latency = 0; + finish_request(1); + } else + pa_operation_unref(pa_stream_get_latency(stream, latency_callback, NULL)); break; case MESSAGE_WRITABLE: @@ -223,7 +262,11 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int case MESSAGE_SETVOLUME: assert(context && stream); - pa_operation_unref(pa_context_set_sink_input_volume(context, pa_stream_get_index(stream), current_request->volume, context_success_callback, NULL)); + + if (current_request->volume == volume) + finish_request(1); + else + pa_operation_unref(pa_context_set_sink_input_volume(context, pa_stream_get_index(stream), current_request->volume, context_success_callback, NULL)); break; case MESSAGE_TRIGGER: @@ -235,7 +278,9 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int } } - pthread_mutex_unlock(&request_mutex); + unlock_request(); + + /*fprintf(stderr, "REQ_DONE\n");*/ } static void* thread_func(void *t) { @@ -244,7 +289,10 @@ static void* thread_func(void *t) { assert(pipe_fds[0] >= 0 && !mainloop_api); - failed = 0; + fprintf(stderr, "THREAD START\n"); + + failed = locked_by_thread = 0; + thread_running = 1; m = pa_mainloop_new(); assert(m); @@ -256,10 +304,25 @@ static void* thread_func(void *t) { pa_mainloop_run(m, NULL); + if (context) { + pa_context_unref(context); + context = NULL; + } + + if (stream) { + pa_stream_unref(stream); + stream = NULL; + } + mainloop_api->io_free(io); pa_mainloop_free(m); mainloop_api = NULL; + thread_running = 0; + + pthread_cond_broadcast(&request_cond); + + fprintf(stderr, "THREAD STOP\n"); return NULL; } @@ -275,38 +338,57 @@ static void start_thread(void) { r = pthread_create(&thread_id, NULL, thread_func, NULL); assert(!r); + + thread_running = 1; } static void stop_thread(void) { - assert(thread_running); - + fprintf(stderr, "joining\n"); pthread_join(thread_id, NULL); thread_running = 0; - assert(!current_request); close(pipe_fds[0]); close(pipe_fds[1]); pipe_fds[0] = pipe_fds[1] = -1; + fprintf(stderr, "join done\n"); } -static void request_execute(struct request *r) { +static void execute_request(struct request *r) { char x = 'x'; assert(r); - + r->success = r->done = 0; pthread_mutex_lock(&request_mutex); - assert(!current_request); - current_request = r; - assert(pipe_fds[1] >= 0); - write(pipe_fds[1], &x, sizeof(x)); - - while (!r->done) + while (current_request && thread_running) pthread_cond_wait(&request_cond, &request_mutex); + + if (!thread_running) { + r->success = 0; + r->done = 1; + } else { + current_request = r; + + assert(pipe_fds[1] >= 0); + write(pipe_fds[1], &x, sizeof(x)); - current_request = NULL; + while (!r->done && thread_running) { + pthread_cond_wait(&request_cond, &request_mutex); + /*fprintf(stderr, "COND done\n"); */ + } + + if (!thread_running) { + r->success = 0; + r->done = 1; + } + + current_request = NULL; + + /* Notify other waiting threads, that the request was completed */ + pthread_cond_broadcast(&request_cond); + } pthread_mutex_unlock(&request_mutex); } @@ -314,10 +396,16 @@ static void request_execute(struct request *r) { static void polyp_get_volume(int *l, int *r) { struct request req; int v; - fprintf(stderr, "get_volume\n"); + /*fprintf(stderr, "get_volume\n");*/ + req.message = MESSAGE_GETVOLUME; - request_execute(&req); + execute_request(&req); + + if (!req.success) { + *l = *r = 100; + return; + } v = (req.volume*100)/PA_VOLUME_NORM; @@ -326,101 +414,124 @@ static void polyp_get_volume(int *l, int *r) { void polyp_set_volume(int l, int r) { struct request req; - fprintf(stderr, "set_volume\n"); + + /*fprintf(stderr, "set_volume\n");*/ req.message = MESSAGE_SETVOLUME; req.volume = ((l+r)*PA_VOLUME_NORM)/200; - request_execute(&req); + execute_request(&req); } - static void polyp_pause(short b) { struct request r; + fprintf(stderr, "*** PAUSE ***\n"); + fprintf(stderr, "pause: %s\n", b ? "yes" : "no"); r.message = b ? MESSAGE_PAUSE : MESSAGE_UNPAUSE; - request_execute(&r); + execute_request(&r); } static int polyp_free(void) { int ret; struct request r; - fprintf(stderr, "free\n"); - r.message = MESSAGE_WRITABLE; - request_execute(&r); + execute_request(&r); + + if (!r.success) + return 0; ret = r.value; + fprintf(stderr, "free: %u", ret); - if (do_trigger) { + if (do_trigger && !triggered) { + fprintf(stderr, "trigger\n"); r.message = MESSAGE_TRIGGER; - request_execute(&r); + execute_request(&r); + triggered = 1; } - do_trigger = 1; + do_trigger = !!ret; return ret; } static int polyp_playing(void) { struct request r; - fprintf(stderr, "playing\n"); - r.message = MESSAGE_LATENCY; - request_execute(&r); + execute_request(&r); + if (!r.success) + return 0; + + fprintf(stderr, "playing : %s\n", r.value ? "yes" : "no"); + return r.value != 0; } static int polyp_get_written_time(void) { - fprintf(stderr, "get_written_time\n"); - - return ((written/pa_frame_size(&sample_spec))*1000)/sample_spec.rate; + return (int) ((((double) written/pa_frame_size(&sample_spec))*1000)/sample_spec.rate); } static int polyp_get_output_time(void) { int t, ms; struct request r; - fprintf(stderr, "get_output_time\n"); - + r.message = MESSAGE_LATENCY; - request_execute(&r); + execute_request(&r); - t = polyp_get_output_time(); - ms = r.value/1000; - - if (ms > t) + if (!r.success) return 0; - else - return t-ms; + + t = polyp_get_written_time(); + ms = r.latency/1000; + + return ms > t ? 0 : t-ms+100; } static void polyp_flush(int time) { struct request r; - fprintf(stderr, "flush\n"); + + fprintf(stderr, "*** FLUSH ***\n"); + + /*fprintf(stderr, "flush: %i\n", time);*/ r.message = MESSAGE_FLUSH; - request_execute(&r); + execute_request(&r); - written = (time*sample_spec.rate/1000)*pa_frame_size(&sample_spec); + written = (size_t) (((double)time*sample_spec.rate/1000)*pa_frame_size(&sample_spec)); + fprintf(stderr, "flush: %u -- %u\n", written, time); } static void polyp_write(void* ptr, int length) { struct request r; - fprintf(stderr, "write\n"); r.message = MESSAGE_WRITE; r.data = memdup(ptr, length); r.length = length; - request_execute(&r); + execute_request(&r); written += length; do_trigger = 0; } +static void polyp_close(void) { + struct request r; + fprintf(stderr, "close\n"); + + assert(thread_running); + + fprintf(stderr, "msg close\n"); + r.message = MESSAGE_CLOSE; + execute_request(&r); + fprintf(stderr, "msg done\n"); + + stop_thread(); +} + static int polyp_open(AFormat fmt, int rate, int nch) { struct request r; fprintf(stderr, "open\n"); @@ -446,32 +557,24 @@ static int polyp_open(AFormat fmt, int rate, int nch) { start_thread(); + fprintf(stderr, "go for req\n"); r.message = MESSAGE_OPEN; - request_execute(&r); + execute_request(&r); + fprintf(stderr, "done req\n"); if (!r.success) { - stop_thread(); + polyp_close(); + fprintf(stderr, "open() failed.\n"); return 0; } - written = do_trigger = failed = 0; + written = do_trigger = triggered = failed = 0; + + fprintf(stderr, "READY\n"); return 1; } -static void polyp_close(void) { - struct request r; - fprintf(stderr, "close\n"); - - assert(thread_running); - - r.message = MESSAGE_CLOSE; - request_execute(&r); - - stop_thread(); -} - - static void polyp_init(void) { fprintf(stderr, "init\n"); } -- cgit