summaryrefslogtreecommitdiffstats
path: root/src/pulsecore/protocol-simple.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pulsecore/protocol-simple.c')
-rw-r--r--src/pulsecore/protocol-simple.c114
1 files changed, 58 insertions, 56 deletions
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c
index 8f9aed58..0ded5d26 100644
--- a/src/pulsecore/protocol-simple.c
+++ b/src/pulsecore/protocol-simple.c
@@ -67,7 +67,7 @@ typedef struct connection {
PA_DECLARE_CLASS(connection);
#define CONNECTION(o) (connection_cast(o))
-static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
struct pa_protocol_simple {
pa_module *module;
@@ -91,9 +91,9 @@ enum {
};
enum {
- MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
- MESSAGE_POST_DATA, /* data from source output to main loop */
- MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
+ CONNECTION_MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
+ CONNECTION_MESSAGE_POST_DATA, /* data from source output to main loop */
+ CONNECTION_MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
};
@@ -102,29 +102,12 @@ enum {
#define RECORD_BUFFER_SECONDS (5)
#define RECORD_BUFFER_FRAGMENTS (100)
-static void connection_free(pa_object *o) {
- connection *c = CONNECTION(o);
+static void connection_unlink(connection *c) {
pa_assert(c);
- if (c->playback.current_memblock)
- pa_memblock_unref(c->playback.current_memblock);
-
- if (c->io)
- pa_iochannel_free(c->io);
- if (c->input_memblockq)
- pa_memblockq_free(c->input_memblockq);
- if (c->output_memblockq)
- pa_memblockq_free(c->output_memblockq);
-
- pa_xfree(c);
-}
-
-static void connection_drop(connection *c) {
- pa_assert(c);
-
- if (!pa_idxset_remove_by_data(c->protocol->connections, c, NULL))
+ if (!c->protocol)
return;
-
+
if (c->sink_input) {
pa_sink_input_disconnect(c->sink_input);
pa_sink_input_unref(c->sink_input);
@@ -142,9 +125,30 @@ static void connection_drop(connection *c) {
c->client = NULL;
}
+ pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
+ c->protocol = NULL;
connection_unref(c);
}
+static void connection_free(pa_object *o) {
+ connection *c = CONNECTION(o);
+ pa_assert(c);
+
+ connection_unref(c);
+
+ if (c->playback.current_memblock)
+ pa_memblock_unref(c->playback.current_memblock);
+
+ if (c->io)
+ pa_iochannel_free(c->io);
+ if (c->input_memblockq)
+ pa_memblockq_free(c->input_memblockq);
+ if (c->output_memblockq)
+ pa_memblockq_free(c->output_memblockq);
+
+ pa_xfree(c);
+}
+
static int do_read(connection *c) {
pa_memchunk chunk;
ssize_t r;
@@ -190,7 +194,7 @@ static int do_read(connection *c) {
c->playback.memblock_index += r;
- pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
+ pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
pa_atomic_sub(&c->playback.missing, r);
return 0;
@@ -263,28 +267,28 @@ fail:
pa_iochannel_free(c->io);
c->io = NULL;
- pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
} else
- connection_drop(c);
+ connection_unlink(c);
}
-static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
connection *c = CONNECTION(o);
connection_assert_ref(c);
switch (code) {
- case MESSAGE_REQUEST_DATA:
+ case CONNECTION_MESSAGE_REQUEST_DATA:
do_work(c);
break;
- case MESSAGE_POST_DATA:
+ case CONNECTION_MESSAGE_POST_DATA:
/* pa_log("got data %u", chunk->length); */
pa_memblockq_push_align(c->output_memblockq, chunk);
do_work(c);
break;
- case MESSAGE_DROP_CONNECTION:
- connection_drop(c);
+ case CONNECTION_MESSAGE_DROP_CONNECTION:
+ connection_unlink(c);
break;
}
@@ -294,13 +298,13 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_m
/*** sink_input callbacks ***/
/* Called from thread context */
-static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o);
connection*c;
- pa_assert(i);
- c = i->userdata;
- pa_assert(c);
+ pa_sink_input_assert_ref(i);
+ c = CONNECTION(i->userdata);
+ connection_assert_ref(c);
switch (code) {
@@ -330,7 +334,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_
}
default:
- return pa_sink_input_process_msg(o, code, userdata, chunk);
+ return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
}
}
@@ -349,7 +353,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
/* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */
if (c->dead && r < 0)
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_DROP_CONNECTION, NULL, 0, NULL, NULL);
return r;
}
@@ -369,19 +373,20 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
if (new > old) {
if (pa_atomic_add(&c->playback.missing, new - old) <= 0)
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}
}
/* Called from main context */
static void sink_input_kill_cb(pa_sink_input *i) {
- pa_assert(i);
+ pa_sink_input_assert_ref(i);
- connection_drop(CONNECTION(i->userdata));
+ connection_unlink(CONNECTION(i->userdata));
}
/*** source_output callbacks ***/
+/* Called from thread context */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
connection *c;
@@ -390,24 +395,22 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
pa_assert(c);
pa_assert(chunk);
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
}
+/* Called from main context */
static void source_output_kill_cb(pa_source_output *o) {
- connection*c;
+ pa_source_output_assert_ref(o);
- pa_assert(o);
- c = o->userdata;
- pa_assert(c);
-
- connection_drop(c);
+ connection_unlink(CONNECTION(o->userdata));
}
+/* Called from main context */
static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
connection*c;
pa_assert(o);
- c = o->userdata;
+ c = CONNECTION(o->userdata);
pa_assert(c);
return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
@@ -419,16 +422,16 @@ static void client_kill_cb(pa_client *client) {
connection*c;
pa_assert(client);
- c = client->userdata;
+ c = CONNECTION(client->userdata);
pa_assert(c);
- connection_drop(c);
+ connection_unlink(c);
}
/*** pa_iochannel callbacks ***/
static void io_callback(pa_iochannel*io, void *userdata) {
- connection *c = userdata;
+ connection *c = CONNECTION(userdata);
pa_assert(io);
pa_assert(c);
@@ -453,7 +456,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
return;
}
- c = pa_msgobject_new(connection, connection_check_type);
+ c = pa_msgobject_new(connection);
c->parent.parent.free = connection_free;
c->parent.process_msg = connection_process_msg;
c->io = io;
@@ -547,7 +550,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_source_output_put(c->source_output);
}
-
pa_iochannel_set_callback(c->io, io_callback, c);
pa_idxset_put(p->connections, c, NULL);
@@ -555,7 +557,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
fail:
if (c)
- connection_drop(c);
+ connection_unlink(c);
}
pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
@@ -618,7 +620,7 @@ void pa_protocol_simple_free(pa_protocol_simple *p) {
if (p->connections) {
while((c = pa_idxset_first(p->connections, NULL)))
- connection_drop(c);
+ connection_unlink(c);
pa_idxset_free(p->connections, NULL, NULL);
}