summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/modules/module-raop-sink.c111
1 files changed, 82 insertions, 29 deletions
diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c
index 96c98a6f..51c2368a 100644
--- a/src/modules/module-raop-sink.c
+++ b/src/modules/module-raop-sink.c
@@ -127,9 +127,31 @@ static const char* const valid_modargs[] = {
};
enum {
- SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX
+ SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX,
+ SINK_MESSAGE_RIP_SOCKET
};
+static void on_connection(PA_GCC_UNUSED int fd, void*userdata) {
+ struct userdata *u = userdata;
+ pa_assert(u);
+
+ pa_assert(u->fd < 0);
+ u->fd = fd;
+
+ pa_log_debug("Connection authenticated, handing fd to IO thread...");
+
+ pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
+}
+
+static void on_close(void*userdata) {
+ struct userdata *u = userdata;
+ pa_assert(u);
+
+ pa_log_debug("Connection closed, informing IO thread...");
+
+ pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
+}
+
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
@@ -143,14 +165,27 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
pa_assert(PA_SINK_OPENED(u->sink->thread_info.state));
pa_smoother_pause(u->smoother, pa_rtclock_usec());
+
+ /* Issue a FLUSH if we are connected */
+ if (u->fd >= 0) {
+ pa_raop_flush(u->raop);
+ }
break;
case PA_SINK_IDLE:
case PA_SINK_RUNNING:
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
pa_smoother_resume(u->smoother, pa_rtclock_usec());
+ /* The connection can be closed when idle, so check to
+ see if we need to reestablish it */
+ if (u->fd < 0)
+ pa_raop_connect(u->raop);
+ else
+ pa_raop_flush(u->raop);
+ }
+
break;
case PA_SINK_UNLINKED:
@@ -179,8 +214,34 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
pollfd->fd = u->fd;
pollfd->events = POLLOUT;
- pollfd->revents = 0;
+ /*pollfd->events = */pollfd->revents = 0;
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+ /* Our stream has been suspended so we just flush it.... */
+ pa_raop_flush(u->raop);
+ }
+ return 0;
+ }
+
+ case SINK_MESSAGE_RIP_SOCKET: {
+ pa_assert(u->fd >= 0);
+ pa_close(u->fd);
+ u->fd = -1;
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+
+ pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
+
+ if (u->rtpoll_item)
+ pa_rtpoll_item_free(u->rtpoll_item);
+ u->rtpoll_item = NULL;
+ } else {
+ /* Quesiton: is this valid here: or should we do some sort of:
+ return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
+ ?? */
+ pa_module_unload_request(u->module);
+ }
return 0;
}
}
@@ -215,7 +276,7 @@ static void thread_func(void *userdata) {
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
/* Render some data and write it to the fifo */
- if (pollfd->revents) {
+ if (/*PA_SINK_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
pa_usec_t usec;
int64_t n;
void *p;
@@ -264,9 +325,10 @@ static void thread_func(void *userdata) {
u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
} else {
/* We render some silence into our memchunk */
- u->encoding_overhead += u->next_encoding_overhead;
memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
pa_memblock_ref(silence.memblock);
+
+ /* Calculate/store some values to be used with the smoother */
u->next_encoding_overhead = silence_overhead;
u->encoding_ratio = silence_ratio;
}
@@ -302,12 +364,15 @@ static void thread_func(void *userdata) {
pollfd->revents = 0;
- if (u->encoded_memchunk.length > 0)
+ if (u->encoded_memchunk.length > 0) {
+ /* we've completely written the encoded data, so update our overhead */
+ u->encoding_overhead += u->next_encoding_overhead;
/* OK, we wrote less that we asked for,
* hence we can assume that the socket
* buffers are full now */
goto filled_up;
+ }
}
}
@@ -338,7 +403,7 @@ static void thread_func(void *userdata) {
}
/* Hmm, nothing to do. Let's sleep */
- /* pollfd->events = PA_SINK_OPENED(u->sink->thread_info.state) ? POLLOUT : 0; */
+ pollfd->events = POLLOUT; /*PA_SINK_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
}
if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
@@ -353,8 +418,16 @@ static void thread_func(void *userdata) {
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
if (pollfd->revents & ~POLLOUT) {
- pa_log("FIFO shutdown.");
- goto fail;
+ if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
+ pa_log("FIFO shutdown.");
+ goto fail;
+ }
+
+ /* We expect this to happen on occasion if we are not sending data.
+ It's perfectly natural and normal and natural */
+ if (u->rtpoll_item)
+ pa_rtpoll_item_free(u->rtpoll_item);
+ u->rtpoll_item = NULL;
}
}
}
@@ -371,26 +444,6 @@ finish:
pa_log_debug("Thread shutting down");
}
-static void on_connection(PA_GCC_UNUSED int fd, void*userdata) {
- struct userdata *u = userdata;
- pa_assert(u);
-
- pa_assert(u->fd < 0);
- u->fd = fd;
-
- pa_log_debug("Connection authenticated, handing fd to IO thread...");
-
- pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
-}
-
-static void on_close(void*userdata) {
- struct userdata *u = userdata;
- pa_assert(u);
-
- pa_log_debug("Control connection closed.");
- pa_module_unload_request(u->module);
-}
-
int pa__init(pa_module*m) {
struct userdata *u = NULL;
const char *p;