diff options
Diffstat (limited to 'src/modules/module-raop-sink.c')
-rw-r--r-- | src/modules/module-raop-sink.c | 111 |
1 files changed, 82 insertions, 29 deletions
diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 96c98a6f..51c2368a 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -127,9 +127,31 @@ static const char* const valid_modargs[] = { }; enum { - SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX + SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX, + SINK_MESSAGE_RIP_SOCKET }; +static void on_connection(PA_GCC_UNUSED int fd, void*userdata) { + struct userdata *u = userdata; + pa_assert(u); + + pa_assert(u->fd < 0); + u->fd = fd; + + pa_log_debug("Connection authenticated, handing fd to IO thread..."); + + pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL); +} + +static void on_close(void*userdata) { + struct userdata *u = userdata; + pa_assert(u); + + pa_log_debug("Connection closed, informing IO thread..."); + + pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL); +} + static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; @@ -143,14 +165,27 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_assert(PA_SINK_OPENED(u->sink->thread_info.state)); pa_smoother_pause(u->smoother, pa_rtclock_usec()); + + /* Issue a FLUSH if we are connected */ + if (u->fd >= 0) { + pa_raop_flush(u->raop); + } break; case PA_SINK_IDLE: case PA_SINK_RUNNING: - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { pa_smoother_resume(u->smoother, pa_rtclock_usec()); + /* The connection can be closed when idle, so check to + see if we need to reestablish it */ + if (u->fd < 0) + pa_raop_connect(u->raop); + else + pa_raop_flush(u->raop); + } + break; case PA_SINK_UNLINKED: @@ -179,8 +214,34 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); pollfd->fd = u->fd; pollfd->events = POLLOUT; - pollfd->revents = 0; + /*pollfd->events = */pollfd->revents = 0; + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + /* Our stream has been suspended so we just flush it.... */ + pa_raop_flush(u->raop); + } + return 0; + } + + case SINK_MESSAGE_RIP_SOCKET: { + pa_assert(u->fd >= 0); + pa_close(u->fd); + u->fd = -1; + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + + pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later"); + + if (u->rtpoll_item) + pa_rtpoll_item_free(u->rtpoll_item); + u->rtpoll_item = NULL; + } else { + /* Quesiton: is this valid here: or should we do some sort of: + return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL); + ?? */ + pa_module_unload_request(u->module); + } return 0; } } @@ -215,7 +276,7 @@ static void thread_func(void *userdata) { pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); /* Render some data and write it to the fifo */ - if (pollfd->revents) { + if (/*PA_SINK_OPENED(u->sink->thread_info.state) && */pollfd->revents) { pa_usec_t usec; int64_t n; void *p; @@ -264,9 +325,10 @@ static void thread_func(void *userdata) { u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length); } else { /* We render some silence into our memchunk */ - u->encoding_overhead += u->next_encoding_overhead; memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk)); pa_memblock_ref(silence.memblock); + + /* Calculate/store some values to be used with the smoother */ u->next_encoding_overhead = silence_overhead; u->encoding_ratio = silence_ratio; } @@ -302,12 +364,15 @@ static void thread_func(void *userdata) { pollfd->revents = 0; - if (u->encoded_memchunk.length > 0) + if (u->encoded_memchunk.length > 0) { + /* we've completely written the encoded data, so update our overhead */ + u->encoding_overhead += u->next_encoding_overhead; /* OK, we wrote less that we asked for, * hence we can assume that the socket * buffers are full now */ goto filled_up; + } } } @@ -338,7 +403,7 @@ static void thread_func(void *userdata) { } /* Hmm, nothing to do. Let's sleep */ - /* pollfd->events = PA_SINK_OPENED(u->sink->thread_info.state) ? POLLOUT : 0; */ + pollfd->events = POLLOUT; /*PA_SINK_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/ } if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) @@ -353,8 +418,16 @@ static void thread_func(void *userdata) { pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); if (pollfd->revents & ~POLLOUT) { - pa_log("FIFO shutdown."); - goto fail; + if (u->sink->thread_info.state != PA_SINK_SUSPENDED) { + pa_log("FIFO shutdown."); + goto fail; + } + + /* We expect this to happen on occasion if we are not sending data. + It's perfectly natural and normal and natural */ + if (u->rtpoll_item) + pa_rtpoll_item_free(u->rtpoll_item); + u->rtpoll_item = NULL; } } } @@ -371,26 +444,6 @@ finish: pa_log_debug("Thread shutting down"); } -static void on_connection(PA_GCC_UNUSED int fd, void*userdata) { - struct userdata *u = userdata; - pa_assert(u); - - pa_assert(u->fd < 0); - u->fd = fd; - - pa_log_debug("Connection authenticated, handing fd to IO thread..."); - - pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL); -} - -static void on_close(void*userdata) { - struct userdata *u = userdata; - pa_assert(u); - - pa_log_debug("Control connection closed."); - pa_module_unload_request(u->module); -} - int pa__init(pa_module*m) { struct userdata *u = NULL; const char *p; |