summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2004-08-27 01:30:56 +0000
committerLennart Poettering <lennart@poettering.net>2004-08-27 01:30:56 +0000
commitdd9398075cc85eeb949312c9632038d5fef3d48a (patch)
treecc2fb92a77462a68229b824d5fcc780ffe5dd27e
parent39ed60f403d82a78ec75dfc8f1ea67c710f7125b (diff)
make it work
that is: more or less git-svn-id: file:///home/lennart/svn/public/xmms-pulse/trunk@8 ef929aba-56e2-0310-84e0-b7573d389508
-rw-r--r--src/plugin.c263
1 files changed, 183 insertions, 80 deletions
diff --git a/src/plugin.c b/src/plugin.c
index 640540d..6d39c13 100644
--- a/src/plugin.c
+++ b/src/plugin.c
@@ -1,3 +1,5 @@
+/* $Id$ */
+
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
@@ -31,6 +33,7 @@ struct request {
size_t length;
int success, done;
uint32_t value;
+ pa_usec_t latency;
pa_volume_t volume;
};
@@ -46,8 +49,9 @@ static struct pa_mainloop_api *mainloop_api = NULL;
static int failed = 0;
static pa_volume_t volume = PA_VOLUME_NORM;
static size_t written = 0;
-static int do_trigger;
+static int do_trigger = 0, triggered = 0;
static struct pa_sample_spec sample_spec;
+static int locked_by_thread = 0;
static void *memdup(void *p, size_t l) {
void *r = malloc(l);
@@ -55,18 +59,35 @@ static void *memdup(void *p, size_t l) {
return r;
}
+static void lock_request(void) {
+ assert(thread_running && pthread_equal(pthread_self(), thread_id));
+
+ if (!(locked_by_thread++))
+ pthread_mutex_lock(&request_mutex);
+}
+
+static void unlock_request(void) {
+ assert(thread_running && pthread_equal(pthread_self(), thread_id) && locked_by_thread > 0);
+
+ if (!(--locked_by_thread))
+ pthread_mutex_unlock(&request_mutex);
+}
+
static void finish_request(int success) {
- failed = success;
+ if (!success)
+ failed = 1;
- pthread_mutex_lock(&request_mutex);
+ lock_request();
if (current_request) {
+ if (!success)
+ fprintf(stderr, "failure: %i\n", success);
current_request->done = 1;
current_request->success = success;
- pthread_cond_signal(&request_cond);
+ pthread_cond_broadcast(&request_cond);
}
-
- pthread_mutex_unlock(&request_mutex);
+
+ unlock_request();
}
static void info_callback(struct pa_context *c, const struct pa_sink_input_info *i, int is_last, void *userdata) {
@@ -106,6 +127,8 @@ static void stream_state_callback(struct pa_stream *s, void *userdata) {
static void context_state_callback(struct pa_context *c, void *userdata) {
assert(c && c == context);
+ fprintf(stderr, "state\n");
+
switch (pa_context_get_state(c)) {
case PA_CONTEXT_CONNECTING:
case PA_CONTEXT_AUTHORIZING:
@@ -132,6 +155,7 @@ static void context_state_callback(struct pa_context *c, void *userdata) {
static void context_drain_callback(struct pa_context *c, void *userdata) {
assert(c && c == context);
mainloop_api->quit(mainloop_api, 0);
+ finish_request(1);
}
static void stream_success_callback(struct pa_stream *s, int success, void *userdata) {
@@ -144,32 +168,39 @@ static void context_success_callback(struct pa_context *c, int success, void *us
finish_request(!!success);
}
-static void latency_callback(struct pa_stream *s, pa_usec_t latency, void *userdata) {
+static void latency_callback(struct pa_stream *s, const struct pa_latency_info* i, void *userdata) {
assert(s == stream && s);
assert(current_request && current_request->message == MESSAGE_LATENCY);
- current_request->value = latency;
- finish_request(latency != (pa_usec_t) -1);
+ if (i) {
+ current_request->latency = i->buffer_usec + i->sink_usec;
+ current_request->value = i->playing;
+ }
+ finish_request(!!i);
}
static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int fd, enum pa_io_event_flags f, void *userdata) {
char x;
- fprintf(stderr, "REQUEST\n");
+ /*fprintf(stderr, "REQUEST %p\n", current_request);*/
-
assert(api && io && f == PA_IO_EVENT_INPUT);
- read(pipe_fds[0], &x, 1);
+ read(fd, &x, 1);
- pthread_mutex_lock(&request_mutex);
+ lock_request();
if (current_request) {
- if (failed) {
+ if (failed && current_request->message != MESSAGE_CLOSE) {
finish_request(0);
} else {
+
+ /*fprintf(stderr, "req: %i\n", current_request->message);*/
+
switch (current_request->message) {
case MESSAGE_OPEN:
assert(!context && !stream);
+
+ fprintf(stderr, "opening\n");
context = pa_context_new(api, "xmms");
assert(context);
pa_context_set_state_callback(context, context_state_callback, NULL);
@@ -178,11 +209,16 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int
case MESSAGE_CLOSE: {
struct pa_operation *o;
+
+ fprintf(stderr, "closing\n");
+
assert(context);
if ((o = pa_context_drain(context, context_drain_callback, NULL)))
pa_operation_unref(o);
- else
+ else {
api->quit(mainloop_api, 0);
+ finish_request(1);
+ }
break;
}
@@ -205,8 +241,11 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int
break;
case MESSAGE_LATENCY:
- assert(context && stream);
- pa_operation_unref(pa_stream_get_latency(stream, latency_callback, NULL));
+ if (!context || !stream) {
+ current_request->latency = 0;
+ finish_request(1);
+ } else
+ pa_operation_unref(pa_stream_get_latency(stream, latency_callback, NULL));
break;
case MESSAGE_WRITABLE:
@@ -223,7 +262,11 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int
case MESSAGE_SETVOLUME:
assert(context && stream);
- pa_operation_unref(pa_context_set_sink_input_volume(context, pa_stream_get_index(stream), current_request->volume, context_success_callback, NULL));
+
+ if (current_request->volume == volume)
+ finish_request(1);
+ else
+ pa_operation_unref(pa_context_set_sink_input_volume(context, pa_stream_get_index(stream), current_request->volume, context_success_callback, NULL));
break;
case MESSAGE_TRIGGER:
@@ -235,7 +278,9 @@ static void request_func(struct pa_mainloop_api*api, struct pa_io_event *io, int
}
}
- pthread_mutex_unlock(&request_mutex);
+ unlock_request();
+
+ /*fprintf(stderr, "REQ_DONE\n");*/
}
static void* thread_func(void *t) {
@@ -244,7 +289,10 @@ static void* thread_func(void *t) {
assert(pipe_fds[0] >= 0 && !mainloop_api);
- failed = 0;
+ fprintf(stderr, "THREAD START\n");
+
+ failed = locked_by_thread = 0;
+ thread_running = 1;
m = pa_mainloop_new();
assert(m);
@@ -256,10 +304,25 @@ static void* thread_func(void *t) {
pa_mainloop_run(m, NULL);
+ if (context) {
+ pa_context_unref(context);
+ context = NULL;
+ }
+
+ if (stream) {
+ pa_stream_unref(stream);
+ stream = NULL;
+ }
+
mainloop_api->io_free(io);
pa_mainloop_free(m);
mainloop_api = NULL;
+ thread_running = 0;
+
+ pthread_cond_broadcast(&request_cond);
+
+ fprintf(stderr, "THREAD STOP\n");
return NULL;
}
@@ -275,38 +338,57 @@ static void start_thread(void) {
r = pthread_create(&thread_id, NULL, thread_func, NULL);
assert(!r);
+
+ thread_running = 1;
}
static void stop_thread(void) {
- assert(thread_running);
-
+ fprintf(stderr, "joining\n");
pthread_join(thread_id, NULL);
thread_running = 0;
- assert(!current_request);
close(pipe_fds[0]);
close(pipe_fds[1]);
pipe_fds[0] = pipe_fds[1] = -1;
+ fprintf(stderr, "join done\n");
}
-static void request_execute(struct request *r) {
+static void execute_request(struct request *r) {
char x = 'x';
assert(r);
-
+
r->success = r->done = 0;
pthread_mutex_lock(&request_mutex);
- assert(!current_request);
- current_request = r;
- assert(pipe_fds[1] >= 0);
- write(pipe_fds[1], &x, sizeof(x));
-
- while (!r->done)
+ while (current_request && thread_running)
pthread_cond_wait(&request_cond, &request_mutex);
+
+ if (!thread_running) {
+ r->success = 0;
+ r->done = 1;
+ } else {
+ current_request = r;
+
+ assert(pipe_fds[1] >= 0);
+ write(pipe_fds[1], &x, sizeof(x));
- current_request = NULL;
+ while (!r->done && thread_running) {
+ pthread_cond_wait(&request_cond, &request_mutex);
+ /*fprintf(stderr, "COND done\n"); */
+ }
+
+ if (!thread_running) {
+ r->success = 0;
+ r->done = 1;
+ }
+
+ current_request = NULL;
+
+ /* Notify other waiting threads, that the request was completed */
+ pthread_cond_broadcast(&request_cond);
+ }
pthread_mutex_unlock(&request_mutex);
}
@@ -314,10 +396,16 @@ static void request_execute(struct request *r) {
static void polyp_get_volume(int *l, int *r) {
struct request req;
int v;
- fprintf(stderr, "get_volume\n");
+ /*fprintf(stderr, "get_volume\n");*/
+
req.message = MESSAGE_GETVOLUME;
- request_execute(&req);
+ execute_request(&req);
+
+ if (!req.success) {
+ *l = *r = 100;
+ return;
+ }
v = (req.volume*100)/PA_VOLUME_NORM;
@@ -326,101 +414,124 @@ static void polyp_get_volume(int *l, int *r) {
void polyp_set_volume(int l, int r) {
struct request req;
- fprintf(stderr, "set_volume\n");
+
+ /*fprintf(stderr, "set_volume\n");*/
req.message = MESSAGE_SETVOLUME;
req.volume = ((l+r)*PA_VOLUME_NORM)/200;
- request_execute(&req);
+ execute_request(&req);
}
-
static void polyp_pause(short b) {
struct request r;
+ fprintf(stderr, "*** PAUSE ***\n");
+
fprintf(stderr, "pause: %s\n", b ? "yes" : "no");
r.message = b ? MESSAGE_PAUSE : MESSAGE_UNPAUSE;
- request_execute(&r);
+ execute_request(&r);
}
static int polyp_free(void) {
int ret;
struct request r;
- fprintf(stderr, "free\n");
-
r.message = MESSAGE_WRITABLE;
- request_execute(&r);
+ execute_request(&r);
+
+ if (!r.success)
+ return 0;
ret = r.value;
+ fprintf(stderr, "free: %u", ret);
- if (do_trigger) {
+ if (do_trigger && !triggered) {
+ fprintf(stderr, "trigger\n");
r.message = MESSAGE_TRIGGER;
- request_execute(&r);
+ execute_request(&r);
+ triggered = 1;
}
- do_trigger = 1;
+ do_trigger = !!ret;
return ret;
}
static int polyp_playing(void) {
struct request r;
- fprintf(stderr, "playing\n");
-
r.message = MESSAGE_LATENCY;
- request_execute(&r);
+ execute_request(&r);
+ if (!r.success)
+ return 0;
+
+ fprintf(stderr, "playing : %s\n", r.value ? "yes" : "no");
+
return r.value != 0;
}
static int polyp_get_written_time(void) {
- fprintf(stderr, "get_written_time\n");
-
- return ((written/pa_frame_size(&sample_spec))*1000)/sample_spec.rate;
+ return (int) ((((double) written/pa_frame_size(&sample_spec))*1000)/sample_spec.rate);
}
static int polyp_get_output_time(void) {
int t, ms;
struct request r;
- fprintf(stderr, "get_output_time\n");
-
+
r.message = MESSAGE_LATENCY;
- request_execute(&r);
+ execute_request(&r);
- t = polyp_get_output_time();
- ms = r.value/1000;
-
- if (ms > t)
+ if (!r.success)
return 0;
- else
- return t-ms;
+
+ t = polyp_get_written_time();
+ ms = r.latency/1000;
+
+ return ms > t ? 0 : t-ms+100;
}
static void polyp_flush(int time) {
struct request r;
- fprintf(stderr, "flush\n");
+
+ fprintf(stderr, "*** FLUSH ***\n");
+
+ /*fprintf(stderr, "flush: %i\n", time);*/
r.message = MESSAGE_FLUSH;
- request_execute(&r);
+ execute_request(&r);
- written = (time*sample_spec.rate/1000)*pa_frame_size(&sample_spec);
+ written = (size_t) (((double)time*sample_spec.rate/1000)*pa_frame_size(&sample_spec));
+ fprintf(stderr, "flush: %u -- %u\n", written, time);
}
static void polyp_write(void* ptr, int length) {
struct request r;
- fprintf(stderr, "write\n");
r.message = MESSAGE_WRITE;
r.data = memdup(ptr, length);
r.length = length;
- request_execute(&r);
+ execute_request(&r);
written += length;
do_trigger = 0;
}
+static void polyp_close(void) {
+ struct request r;
+ fprintf(stderr, "close\n");
+
+ assert(thread_running);
+
+ fprintf(stderr, "msg close\n");
+ r.message = MESSAGE_CLOSE;
+ execute_request(&r);
+ fprintf(stderr, "msg done\n");
+
+ stop_thread();
+}
+
static int polyp_open(AFormat fmt, int rate, int nch) {
struct request r;
fprintf(stderr, "open\n");
@@ -446,32 +557,24 @@ static int polyp_open(AFormat fmt, int rate, int nch) {
start_thread();
+ fprintf(stderr, "go for req\n");
r.message = MESSAGE_OPEN;
- request_execute(&r);
+ execute_request(&r);
+ fprintf(stderr, "done req\n");
if (!r.success) {
- stop_thread();
+ polyp_close();
+ fprintf(stderr, "open() failed.\n");
return 0;
}
- written = do_trigger = failed = 0;
+ written = do_trigger = triggered = failed = 0;
+
+ fprintf(stderr, "READY\n");
return 1;
}
-static void polyp_close(void) {
- struct request r;
- fprintf(stderr, "close\n");
-
- assert(thread_running);
-
- r.message = MESSAGE_CLOSE;
- request_execute(&r);
-
- stop_thread();
-}
-
-
static void polyp_init(void) {
fprintf(stderr, "init\n");
}