diff options
Diffstat (limited to 'src/modules/module-esound-sink.c')
| -rw-r--r-- | src/modules/module-esound-sink.c | 480 | 
1 files changed, 348 insertions, 132 deletions
diff --git a/src/modules/module-esound-sink.c b/src/modules/module-esound-sink.c index faa77a54..8b46637e 100644 --- a/src/modules/module-esound-sink.c +++ b/src/modules/module-esound-sink.c @@ -28,14 +28,23 @@  #include <stdlib.h>  #include <sys/stat.h>  #include <stdio.h> -#include <assert.h>  #include <errno.h>  #include <string.h>  #include <fcntl.h>  #include <unistd.h>  #include <limits.h> +#include <poll.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/ioctl.h> + +#ifdef HAVE_LINUX_SOCKIOS_H +#include <linux/sockios.h> +#endif  #include <pulse/xmalloc.h> +#include <pulse/timeval.h>  #include <pulsecore/core-error.h>  #include <pulsecore/iochannel.h> @@ -47,27 +56,37 @@  #include <pulsecore/socket-client.h>  #include <pulsecore/esound.h>  #include <pulsecore/authkey.h> +#include <pulsecore/thread-mq.h> +#include <pulsecore/thread.h> +#include <pulsecore/time-smoother.h> +#include <pulsecore/rtclock.h> +#include <pulsecore/socket-util.h>  #include "module-esound-sink-symdef.h"  PA_MODULE_AUTHOR("Lennart Poettering")  PA_MODULE_DESCRIPTION("ESOUND Sink")  PA_MODULE_VERSION(PACKAGE_VERSION) -PA_MODULE_USAGE("sink_name=<name for the sink> server=<address> cookie=<filename>  format=<sample format> channels=<number of channels> rate=<sample rate>") +PA_MODULE_USAGE( +        "sink_name=<name for the sink> " +        "server=<address> cookie=<filename>  " +        "format=<sample format> " +        "channels=<number of channels> " +        "rate=<sample rate>") -#define DEFAULT_SINK_NAME "esound_output" +#define DEFAULT_SINK_NAME "esound_out"  struct userdata {      pa_core *core; - +    pa_module *module;      pa_sink *sink; -    pa_iochannel *io; -    pa_socket_client *client; -    pa_defer_event *defer_event; +    pa_thread_mq thread_mq; +    pa_rtpoll *rtpoll; +    pa_rtpoll_item *rtpoll_item; +    pa_thread *thread;      pa_memchunk memchunk; -    pa_module *module;      void *write_data;      size_t write_length, write_index; @@ -75,12 +94,28 @@ struct userdata {      void *read_data;      size_t read_length, read_index; -    enum { STATE_AUTH, STATE_LATENCY, STATE_RUNNING, STATE_DEAD } state; +    enum { +        STATE_AUTH, +        STATE_LATENCY, +        STATE_PREPARE, +        STATE_RUNNING, +        STATE_DEAD +    } state;      pa_usec_t latency;      esd_format_t format;      int32_t rate; + +    pa_smoother *smoother; +    int fd; + +    int64_t offset; + +    pa_iochannel *io; +    pa_socket_client *client; + +    size_t block_size;  };  static const char* const valid_modargs[] = { @@ -93,42 +128,211 @@ static const char* const valid_modargs[] = {      NULL  }; -static void cancel(struct userdata *u) { -    assert(u); +enum { +    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX +}; -    u->state = STATE_DEAD; +static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { +    struct userdata *u = PA_SINK(o)->userdata; -    if (u->io) { -        pa_iochannel_free(u->io); -        u->io = NULL; -    } +    switch (code) { -    if (u->defer_event) { -        u->core->mainloop->defer_free(u->defer_event); -        u->defer_event = NULL; -    } +        case PA_SINK_MESSAGE_SET_STATE: -    if (u->sink) { -        pa_sink_disconnect(u->sink); -        pa_sink_unref(u->sink); -        u->sink = NULL; +            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { + +                case PA_SINK_SUSPENDED: +                    pa_assert(PA_SINK_OPENED(u->sink->thread_info.state)); + +                    pa_smoother_pause(u->smoother, pa_rtclock_usec()); +                    break; + +                case PA_SINK_IDLE: +                case PA_SINK_RUNNING: + +                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) +                        pa_smoother_resume(u->smoother, pa_rtclock_usec()); + +                    break; + +                case PA_SINK_UNLINKED: +                case PA_SINK_INIT: +                    ; +            } + +            break; + +        case PA_SINK_MESSAGE_GET_LATENCY: { +            pa_usec_t w, r; + +            r = pa_smoother_get(u->smoother, pa_rtclock_usec()); +            w = pa_bytes_to_usec(u->offset + u->memchunk.length, &u->sink->sample_spec); + +            *((pa_usec_t*) data) = w > r ? w - r : 0; +            break; +        } + +        case SINK_MESSAGE_PASS_SOCKET: { +            struct pollfd *pollfd; + +            pa_assert(!u->rtpoll_item); + +            u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); +            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +            pollfd->fd = u->fd; +            pollfd->events = pollfd->revents = 0; + +            return 0; +        }      } -    if (u->module) { -        pa_module_unload_request(u->module); -        u->module = NULL; +    return pa_sink_process_msg(o, code, data, offset, chunk); +} + +static void thread_func(void *userdata) { +    struct userdata *u = userdata; +    int write_type = 0; + +    pa_assert(u); + +    pa_log_debug("Thread starting up"); + +    pa_thread_mq_install(&u->thread_mq); +    pa_rtpoll_install(u->rtpoll); + +    pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec()); + +    for (;;) { +        int ret; + +        if (u->rtpoll_item) { +            struct pollfd *pollfd; +            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + +            /* Render some data and write it to the fifo */ +            if (PA_SINK_OPENED(u->sink->thread_info.state) && pollfd->revents) { +                pa_usec_t usec; +                int64_t n; + +                for (;;) { +                    ssize_t l; +                    void *p; + +                    if (u->memchunk.length <= 0) +                        pa_sink_render(u->sink, u->block_size, &u->memchunk); + +                    pa_assert(u->memchunk.length > 0); + +                    p = pa_memblock_acquire(u->memchunk.memblock); +                    l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type); +                    pa_memblock_release(u->memchunk.memblock); + +                    pa_assert(l != 0); + +                    if (l < 0) { + +                        if (errno == EINTR) +                            continue; +                        else if (errno == EAGAIN) { + +                            /* OK, we filled all socket buffers up +                             * now. */ +                            goto filled_up; + +                        } else { +                            pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); +                            goto fail; +                        } + +                    } else { +                        u->offset += l; + +                        u->memchunk.index += l; +                        u->memchunk.length -= l; + +                        if (u->memchunk.length <= 0) { +                            pa_memblock_unref(u->memchunk.memblock); +                            pa_memchunk_reset(&u->memchunk); +                        } + +                        pollfd->revents = 0; + +                        if (u->memchunk.length > 0) + +                            /* OK, we wrote less that we asked for, +                             * hence we can assume that the socket +                             * buffers are full now */ +                            goto filled_up; +                    } +                } + +            filled_up: + +                /* At this spot we know that the socket buffers are +                 * fully filled up. This is the best time to estimate +                 * the playback position of the server */ + +                n = u->offset; + +#ifdef SIOCOUTQ +                { +                    int l; +                    if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0) +                        n -= l; +                } +#endif + +                usec = pa_bytes_to_usec(n, &u->sink->sample_spec); + +                if (usec > u->latency) +                    usec -= u->latency; +                else +                    usec = 0; + +                pa_smoother_put(u->smoother, pa_rtclock_usec(), usec); +            } + +            /* Hmm, nothing to do. Let's sleep */ +            pollfd->events = PA_SINK_OPENED(u->sink->thread_info.state)  ? POLLOUT : 0; +        } + +        if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) +            goto fail; + +        if (ret == 0) +            goto finish; + +        if (u->rtpoll_item) { +            struct pollfd* pollfd; + +            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + +            if (pollfd->revents & ~POLLOUT) { +                pa_log("FIFO shutdown."); +                goto fail; +            } +        }      } + +fail: +    /* If this was no regular exit from the loop we have to continue +     * processing messages until we received PA_MESSAGE_SHUTDOWN */ +    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); +    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: +    pa_log_debug("Thread shutting down");  }  static int do_write(struct userdata *u) {      ssize_t r; -    assert(u); +    pa_assert(u);      if (!pa_iochannel_is_writable(u->io))          return 0;      if (u->write_data) { -        assert(u->write_index < u->write_length); +        pa_assert(u->write_index < u->write_length);          if ((r = pa_iochannel_write(u->io, (uint8_t*) u->write_data + u->write_index, u->write_length - u->write_index)) <= 0) {              pa_log("write() failed: %s", pa_cstrerror(errno)); @@ -136,52 +340,44 @@ static int do_write(struct userdata *u) {          }          u->write_index += r; -        assert(u->write_index <= u->write_length); +        pa_assert(u->write_index <= u->write_length);          if (u->write_index == u->write_length) { -            free(u->write_data); +            pa_xfree(u->write_data);              u->write_data = NULL;              u->write_index = u->write_length = 0;          } -    } else if (u->state == STATE_RUNNING) { -        void *p; - -        pa_module_set_used(u->module, pa_sink_used_by(u->sink)); +    } -        if (!u->memchunk.length) -            if (pa_sink_render(u->sink, 8192, &u->memchunk) < 0) -                return 0; +    if (!u->write_data && u->state == STATE_PREPARE) { +        /* OK, we're done with sending all control data we need to, so +         * let's hand the socket over to the IO thread now */ -        assert(u->memchunk.memblock); -        assert(u->memchunk.length); +        pa_assert(u->fd < 0); +        u->fd = pa_iochannel_get_send_fd(u->io); -        p = pa_memblock_acquire(u->memchunk.memblock); +        pa_iochannel_set_noclose(u->io, TRUE); +        pa_iochannel_free(u->io); +        u->io = NULL; -        if ((r = pa_iochannel_write(u->io, (uint8_t*) p + u->memchunk.index, u->memchunk.length)) < 0) { -            pa_memblock_release(u->memchunk.memblock); -            pa_log("write() failed: %s", pa_cstrerror(errno)); -            return -1; -        } -        pa_memblock_release(u->memchunk.memblock); +        pa_make_tcp_socket_low_delay(u->fd); -        u->memchunk.index += r; -        u->memchunk.length -= r; +        pa_log_info("Connection authenticated, handing fd to IO thread..."); -        if (u->memchunk.length <= 0) { -            pa_memblock_unref(u->memchunk.memblock); -            u->memchunk.memblock = NULL; -        } +        pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL); +        u->state = STATE_RUNNING;      }      return 0;  }  static int handle_response(struct userdata *u) { -    assert(u); +    pa_assert(u);      switch (u->state) { +          case STATE_AUTH: -            assert(u->read_length == sizeof(int32_t)); +            pa_assert(u->read_length == sizeof(int32_t));              /* Process auth data */              if (!*(int32_t*) u->read_data) { @@ -190,14 +386,14 @@ static int handle_response(struct userdata *u) {              }              /* Request latency data */ -            assert(!u->write_data); +            pa_assert(!u->write_data);              *(int32_t*) (u->write_data = pa_xmalloc(u->write_length = sizeof(int32_t))) = ESD_PROTO_LATENCY;              u->write_index = 0;              u->state = STATE_LATENCY;              /* Space for next response */ -            assert(u->read_length >= sizeof(int32_t)); +            pa_assert(u->read_length >= sizeof(int32_t));              u->read_index = 0;              u->read_length = sizeof(int32_t); @@ -205,7 +401,7 @@ static int handle_response(struct userdata *u) {          case STATE_LATENCY: {              int32_t *p; -            assert(u->read_length == sizeof(int32_t)); +            pa_assert(u->read_length == sizeof(int32_t));              /* Process latency info */              u->latency = (pa_usec_t) ((double) (*(int32_t*) u->read_data) * 1000000 / 44100); @@ -215,7 +411,7 @@ static int handle_response(struct userdata *u) {              }              /* Create stream */ -            assert(!u->write_data); +            pa_assert(!u->write_data);              p = u->write_data = pa_xmalloc0(u->write_length = sizeof(int32_t)*3+ESD_NAME_MAX);              *(p++) = ESD_PROTO_STREAM_PLAY;              *(p++) = u->format; @@ -223,7 +419,7 @@ static int handle_response(struct userdata *u) {              pa_strlcpy((char*) p, "PulseAudio Tunnel", ESD_NAME_MAX);              u->write_index = 0; -            u->state = STATE_RUNNING; +            u->state = STATE_PREPARE;              /* Don't read any further */              pa_xfree(u->read_data); @@ -234,14 +430,14 @@ static int handle_response(struct userdata *u) {          }          default: -            abort(); +            pa_assert_not_reached();      }      return 0;  }  static int do_read(struct userdata *u) { -    assert(u); +    pa_assert(u);      if (!pa_iochannel_is_readable(u->io))          return 0; @@ -252,16 +448,15 @@ static int do_read(struct userdata *u) {          if (!u->read_data)              return 0; -        assert(u->read_index < u->read_length); +        pa_assert(u->read_index < u->read_length);          if ((r = pa_iochannel_read(u->io, (uint8_t*) u->read_data + u->read_index, u->read_length - u->read_index)) <= 0) {              pa_log("read() failed: %s", r < 0 ? pa_cstrerror(errno) : "EOF"); -            cancel(u);              return -1;          }          u->read_index += r; -        assert(u->read_index <= u->read_length); +        pa_assert(u->read_index <= u->read_length);          if (u->read_index == u->read_length)              return handle_response(u); @@ -270,42 +465,19 @@ static int do_read(struct userdata *u) {      return 0;  } -static void do_work(struct userdata *u) { -    assert(u); - -    u->core->mainloop->defer_enable(u->defer_event, 0); - -    if (do_read(u) < 0 || do_write(u) < 0) -        cancel(u); -} - -static void notify_cb(pa_sink*s) { -    struct userdata *u = s->userdata; -    assert(s && u); - -    if (pa_iochannel_is_writable(u->io)) -        u->core->mainloop->defer_enable(u->defer_event, 1); -} - -static pa_usec_t get_latency_cb(pa_sink *s) { -    struct userdata *u = s->userdata; -    assert(s && u); +static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void*userdata) { +    struct userdata *u = userdata; +    pa_assert(u); -    return -        u->latency + -        (u->memchunk.memblock ? pa_bytes_to_usec(u->memchunk.length, &s->sample_spec) : 0); -} +    if (do_read(u) < 0 || do_write(u) < 0) { -static void defer_callback(PA_GCC_UNUSED pa_mainloop_api *m, PA_GCC_UNUSED pa_defer_event*e, void *userdata) { -    struct userdata *u = userdata; -    assert(u); -    do_work(u); -} +        if (u->io) { +            pa_iochannel_free(u->io); +            u->io = NULL; +        } -static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void*userdata) { -    struct userdata *u = userdata; -    assert(u); -    do_work(u); +       pa_module_unload_request(u->module); +    }  }  static void on_connection(PA_GCC_UNUSED pa_socket_client *c, pa_iochannel*io, void *userdata) { @@ -315,30 +487,34 @@ static void on_connection(PA_GCC_UNUSED pa_socket_client *c, pa_iochannel*io, vo      u->client = NULL;      if (!io) { -        pa_log("connection failed: %s", pa_cstrerror(errno)); -        cancel(u); +        pa_log("Connection failed: %s", pa_cstrerror(errno)); +        pa_module_unload_request(u->module);          return;      } +    pa_assert(!u->io);      u->io = io;      pa_iochannel_set_callback(u->io, io_callback, u); + +    pa_log_info("Connection established, authenticating ...");  } -int pa__init(pa_core *c, pa_module*m) { +int pa__init(pa_module*m) {      struct userdata *u = NULL;      const char *p;      pa_sample_spec ss;      pa_modargs *ma = NULL;      char *t; +    const char *espeaker; -    assert(c && m); +    pa_assert(m);      if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {          pa_log("failed to parse module arguments");          goto fail;      } -    ss = c->default_sample_spec; +    ss = m->core->default_sample_spec;      if (pa_modargs_get_sample_spec(ma, &ss) < 0) {          pa_log("invalid sample format specification");          goto fail; @@ -350,37 +526,62 @@ int pa__init(pa_core *c, pa_module*m) {          goto fail;      } -    u = pa_xmalloc0(sizeof(struct userdata)); -    u->core = c; +    u = pa_xnew0(struct userdata, 1); +    u->core = m->core;      u->module = m;      m->userdata = u; +    u->fd = -1; +    u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE); +    pa_memchunk_reset(&u->memchunk); +    u->offset = 0; + +    pa_thread_mq_init(&u->thread_mq, m->core->mainloop); +    u->rtpoll = pa_rtpoll_new(); +    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); +    u->rtpoll_item = NULL; +      u->format =          (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |          (ss.channels == 2 ? ESD_STEREO : ESD_MONO);      u->rate = ss.rate; -    u->sink = NULL; -    u->client = NULL; -    u->io = NULL; +    u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss); +      u->read_data = u->write_data = NULL;      u->read_index = u->write_index = u->read_length = u->write_length = 0; +      u->state = STATE_AUTH;      u->latency = 0; -    if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, NULL))) { +    if (!(u->sink = pa_sink_new(m->core, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, NULL))) {          pa_log("failed to create sink.");          goto fail;      } -    if (!(u->client = pa_socket_client_new_string(u->core->mainloop, p = pa_modargs_get_value(ma, "server", ESD_UNIX_SOCKET_NAME), ESD_DEFAULT_PORT))) { -        pa_log("failed to connect to server."); +    u->sink->parent.process_msg = sink_process_msg; +    u->sink->userdata = u; +    u->sink->flags = PA_SINK_LATENCY; + +    pa_sink_set_module(u->sink, m); +    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); +    pa_sink_set_rtpoll(u->sink, u->rtpoll); + +    if (!(espeaker = getenv("ESPEAKER"))) +        espeaker = ESD_UNIX_SOCKET_NAME; + +    if (!(u->client = pa_socket_client_new_string(u->core->mainloop, p = pa_modargs_get_value(ma, "server", espeaker), ESD_DEFAULT_PORT))) { +        pa_log("Failed to connect to server.");          goto fail;      } + +    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Esound sink '%s'", p)); +    pa_xfree(t); +      pa_socket_client_set_callback(u->client, on_connection, u);      /* Prepare the initial request */      u->write_data = pa_xmalloc(u->write_length = ESD_KEY_LEN + sizeof(int32_t));      if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", ".esd_auth"), u->write_data, ESD_KEY_LEN) < 0) { -        pa_log("failed to load cookie"); +        pa_log("Failed to load cookie");          goto fail;      }      *(int32_t*) ((uint8_t*) u->write_data + ESD_KEY_LEN) = ESD_ENDIAN_KEY; @@ -388,19 +589,12 @@ int pa__init(pa_core *c, pa_module*m) {      /* Reserve space for the response */      u->read_data = pa_xmalloc(u->read_length = sizeof(int32_t)); -    u->sink->notify = notify_cb; -    u->sink->get_latency = get_latency_cb; -    u->sink->userdata = u; -    pa_sink_set_owner(u->sink, m); -    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Esound sink '%s'", p)); -    pa_xfree(t); - -    u->memchunk.memblock = NULL; -    u->memchunk.length = 0; - -    u->defer_event = c->mainloop->defer_new(c->mainloop, defer_callback, u); -    c->mainloop->defer_enable(u->defer_event, 0); +    if (!(u->thread = pa_thread_new(thread_func, u))) { +        pa_log("Failed to create thread."); +        goto fail; +    } +    pa_sink_put(u->sink);      pa_modargs_free(ma); @@ -410,20 +604,39 @@ fail:      if (ma)          pa_modargs_free(ma); -    pa__done(c, m); +    pa__done(m);      return -1;  } -void pa__done(pa_core *c, pa_module*m) { +void pa__done(pa_module*m) {      struct userdata *u; -    assert(c && m); +    pa_assert(m);      if (!(u = m->userdata))          return; -    u->module = NULL; -    cancel(u); +    if (u->sink) +        pa_sink_unlink(u->sink); + +    if (u->thread) { +        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); +        pa_thread_free(u->thread); +    } + +    pa_thread_mq_done(&u->thread_mq); + +    if (u->sink) +        pa_sink_unref(u->sink); + +    if (u->io) +        pa_iochannel_free(u->io); + +    if (u->rtpoll_item) +        pa_rtpoll_item_free(u->rtpoll_item); + +    if (u->rtpoll) +        pa_rtpoll_free(u->rtpoll);      if (u->memchunk.memblock)          pa_memblock_unref(u->memchunk.memblock); @@ -434,8 +647,11 @@ void pa__done(pa_core *c, pa_module*m) {      pa_xfree(u->read_data);      pa_xfree(u->write_data); -    pa_xfree(u); -} - +    if (u->smoother) +        pa_smoother_free(u->smoother); +    if (u->fd >= 0) +        pa_close(u->fd); +    pa_xfree(u); +}  | 
