diff options
| -rw-r--r-- | src/modules/module-raop-sink.c | 111 | 
1 files changed, 82 insertions, 29 deletions
diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 96c98a6f..51c2368a 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -127,9 +127,31 @@ static const char* const valid_modargs[] = {  };  enum { -    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX +    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX, +    SINK_MESSAGE_RIP_SOCKET  }; +static void on_connection(PA_GCC_UNUSED int fd, void*userdata) { +    struct userdata *u = userdata; +    pa_assert(u); + +    pa_assert(u->fd < 0); +    u->fd = fd; + +    pa_log_debug("Connection authenticated, handing fd to IO thread..."); + +    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL); +} + +static void on_close(void*userdata) { +    struct userdata *u = userdata; +    pa_assert(u); + +    pa_log_debug("Connection closed, informing IO thread..."); + +    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL); +} +  static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {      struct userdata *u = PA_SINK(o)->userdata; @@ -143,14 +165,27 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse                      pa_assert(PA_SINK_OPENED(u->sink->thread_info.state));                      pa_smoother_pause(u->smoother, pa_rtclock_usec()); + +                    /* Issue a FLUSH if we are connected */ +                    if (u->fd >= 0) { +                        pa_raop_flush(u->raop); +                    }                      break;                  case PA_SINK_IDLE:                  case PA_SINK_RUNNING: -                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) +                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {                          pa_smoother_resume(u->smoother, pa_rtclock_usec()); +                        /* The connection can be closed when idle, so check to +                           see if we need to reestablish it */ +                        if (u->fd < 0) +                            pa_raop_connect(u->raop); +                        else +                            pa_raop_flush(u->raop); +                    } +                      break;                  case PA_SINK_UNLINKED: @@ -179,8 +214,34 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse              pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);              pollfd->fd = u->fd;              pollfd->events = POLLOUT; -            pollfd->revents = 0; +            /*pollfd->events = */pollfd->revents = 0; + +            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { +                /* Our stream has been suspended so we just flush it.... */ +                pa_raop_flush(u->raop); +            } +            return 0; +        } + +        case SINK_MESSAGE_RIP_SOCKET: { +            pa_assert(u->fd >= 0); +            pa_close(u->fd); +            u->fd = -1; + +            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + +                pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later"); + +                if (u->rtpoll_item) +                    pa_rtpoll_item_free(u->rtpoll_item); +                u->rtpoll_item = NULL; +            } else { +                /* Quesiton: is this valid here: or should we do some sort of: +                   return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL); +                   ?? */ +                pa_module_unload_request(u->module); +            }              return 0;          }      } @@ -215,7 +276,7 @@ static void thread_func(void *userdata) {              pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);              /* Render some data and write it to the fifo */ -            if (pollfd->revents) { +            if (/*PA_SINK_OPENED(u->sink->thread_info.state) && */pollfd->revents) {                  pa_usec_t usec;                  int64_t n;                  void *p; @@ -264,9 +325,10 @@ static void thread_func(void *userdata) {                              u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);                          } else {                              /* We render some silence into our memchunk */ -                            u->encoding_overhead += u->next_encoding_overhead;                              memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));                              pa_memblock_ref(silence.memblock); + +                            /* Calculate/store some values to be used with the smoother */                              u->next_encoding_overhead = silence_overhead;                              u->encoding_ratio = silence_ratio;                          } @@ -302,12 +364,15 @@ static void thread_func(void *userdata) {                          pollfd->revents = 0; -                        if (u->encoded_memchunk.length > 0) +                        if (u->encoded_memchunk.length > 0) { +                            /* we've completely written the encoded data, so update our overhead */ +                            u->encoding_overhead += u->next_encoding_overhead;                              /* OK, we wrote less that we asked for,                               * hence we can assume that the socket                               * buffers are full now */                              goto filled_up; +                        }                      }                  } @@ -338,7 +403,7 @@ static void thread_func(void *userdata) {              }              /* Hmm, nothing to do. Let's sleep */ -            /* pollfd->events = PA_SINK_OPENED(u->sink->thread_info.state)  ? POLLOUT : 0; */ +            pollfd->events = POLLOUT; /*PA_SINK_OPENED(u->sink->thread_info.state)  ? POLLOUT : 0;*/          }          if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) @@ -353,8 +418,16 @@ static void thread_func(void *userdata) {              pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);              if (pollfd->revents & ~POLLOUT) { -                pa_log("FIFO shutdown."); -                goto fail; +                if (u->sink->thread_info.state != PA_SINK_SUSPENDED) { +                    pa_log("FIFO shutdown."); +                    goto fail; +                } + +                /* We expect this to happen on occasion if we are not sending data. +                   It's perfectly natural and normal and natural */ +                if (u->rtpoll_item) +                    pa_rtpoll_item_free(u->rtpoll_item); +                u->rtpoll_item = NULL;              }          }      } @@ -371,26 +444,6 @@ finish:      pa_log_debug("Thread shutting down");  } -static void on_connection(PA_GCC_UNUSED int fd, void*userdata) { -    struct userdata *u = userdata; -    pa_assert(u); - -    pa_assert(u->fd < 0); -    u->fd = fd; - -    pa_log_debug("Connection authenticated, handing fd to IO thread..."); - -    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL); -} - -static void on_close(void*userdata) { -    struct userdata *u = userdata; -    pa_assert(u); - -    pa_log_debug("Control connection closed."); -    pa_module_unload_request(u->module); -} -  int pa__init(pa_module*m) {      struct userdata *u = NULL;      const char *p;  | 
