diff options
Diffstat (limited to 'src/protocol-simple.c')
| -rw-r--r-- | src/protocol-simple.c | 162 | 
1 files changed, 116 insertions, 46 deletions
diff --git a/src/protocol-simple.c b/src/protocol-simple.c index 91eab59a..3e2e7fda 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -18,6 +18,12 @@ struct connection {      struct pa_source_output *source_output;      struct pa_client *client;      struct pa_memblockq *input_memblockq, *output_memblockq; +    void *fixed_source; + +    struct { +        struct pa_memblock *current_memblock; +        size_t memblock_index, fragment_size; +    } playback;  };  struct pa_protocol_simple { @@ -29,13 +35,17 @@ struct pa_protocol_simple {  };  #define PLAYBACK_BUFFER_SECONDS (.5) +#define PLAYBACK_BUFFER_FRAGMENTS (10)  #define RECORD_BUFFER_SECONDS (5) +#define RECORD_BUFFER_FRAGMENTS (100)  static void connection_free(struct connection *c) {      assert(c);      pa_idxset_remove_by_data(c->protocol->connections, c, NULL); +    if (c->playback.current_memblock) +        pa_memblock_unref(c->playback.current_memblock);      if (c->sink_input)          pa_sink_input_free(c->sink_input);      if (c->source_output) @@ -48,34 +58,49 @@ static void connection_free(struct connection *c) {          pa_memblockq_free(c->input_memblockq);      if (c->output_memblockq)          pa_memblockq_free(c->output_memblockq); +    if (c->fixed_source) +        c->protocol->core->mainloop->cancel_fixed(c->protocol->core->mainloop, c->fixed_source);      free(c);  } +  static int do_read(struct connection *c) {      struct pa_memchunk chunk;      ssize_t r;      size_t l; -    if (!pa_iochannel_is_readable(c->io)) -        return 0; -          if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))          return 0; -    chunk.memblock = pa_memblock_new(l); -    assert(chunk.memblock); +    if (l > c->playback.fragment_size) +        l = c->playback.fragment_size; -    if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) { -        fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno)); -        pa_memblock_unref(chunk.memblock); +    if (c->playback.current_memblock)  +        if (c->playback.current_memblock->length - c->playback.memblock_index < l) { +            pa_memblock_unref(c->playback.current_memblock); +            c->playback.current_memblock = NULL; +            c->playback.memblock_index = 0; +        } + +    if (!c->playback.current_memblock) { +        c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2); +        assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); +        c->playback.memblock_index = 0; +    } +     +    if ((r = pa_iochannel_read(c->io, c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { +        fprintf(stderr, __FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));          return -1;      } -    chunk.memblock->length = chunk.length = r; -    chunk.index = 0; +    chunk.memblock = c->playback.current_memblock; +    chunk.index = c->playback.memblock_index; +    chunk.length = r; +    assert(chunk.memblock); +    c->playback.memblock_index += r; +          assert(c->input_memblockq);      pa_memblockq_push_align(c->input_memblockq, &chunk, 0); -    pa_memblock_unref(chunk.memblock);      assert(c->sink_input);      pa_sink_notify(c->sink_input->sink); @@ -86,29 +111,51 @@ static int do_write(struct connection *c) {      struct pa_memchunk chunk;      ssize_t r; -    if (!pa_iochannel_is_writable(c->io)) -        return 0; -          if (!c->source_output)          return 0;          assert(c->output_memblockq);      if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)          return 0; -         +          assert(chunk.memblock && chunk.length);      if ((r = pa_iochannel_write(c->io, chunk.memblock->data+chunk.index, chunk.length)) < 0) { -        fprintf(stderr, "write(): %s\n", strerror(errno));          pa_memblock_unref(chunk.memblock); +        fprintf(stderr, "write(): %s\n", strerror(errno));          return -1;      }      pa_memblockq_drop(c->output_memblockq, r);      pa_memblock_unref(chunk.memblock); +          return 0;  } + +static void do_work(struct connection *c) { +    assert(c); + +    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->enable_fixed); +    c->protocol->core->mainloop->enable_fixed(c->protocol->core->mainloop, c->fixed_source, 0); + +    if (pa_iochannel_is_hungup(c->io)) +        goto fail; +     +    if (pa_iochannel_is_writable(c->io)) +        if (do_write(c) < 0) +            goto fail; +     +    if (pa_iochannel_is_readable(c->io)) +        if (do_read(c) < 0) +            goto fail; + +    return; + +fail: +    connection_free(c); +} +  /*** sink_input callbacks ***/  static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) { @@ -127,9 +174,10 @@ static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {      assert(i && c && length);      pa_memblockq_drop(c->input_memblockq, length); -     -    if (do_read(c) < 0) -        connection_free(c); + +    /* do something */ +    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->enable_fixed); +    c->protocol->core->mainloop->enable_fixed(c->protocol->core->mainloop, c->fixed_source, 1);  }  static void sink_input_kill_cb(struct pa_sink_input *i) { @@ -152,8 +200,9 @@ static void source_output_push_cb(struct pa_source_output *o, const struct pa_me      pa_memblockq_push(c->output_memblockq, chunk, 0); -    if (do_write(c) < 0) -        connection_free(c); +    /* do something */ +    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->enable_fixed); +    c->protocol->core->mainloop->enable_fixed(c->protocol->core->mainloop, c->fixed_source, 1);  }  static void source_output_kill_cb(struct pa_source_output *o) { @@ -174,11 +223,19 @@ static void io_callback(struct pa_iochannel*io, void *userdata) {      struct connection *c = userdata;      assert(io && c && c->io == io); -    if (do_read(c) < 0 || do_write(c) < 0) -        connection_free(c); +    do_work(c);  } -/*** socket_server callbacks */ +/*** fixed callback ***/ + +void fixed_callback(struct pa_mainloop_api*a, void *id, void *userdata) { +    struct connection *c = userdata; +    assert(a && c && c->fixed_source == id); + +    do_work(c); +} + +/*** socket_server callbacks ***/  static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {      struct pa_protocol_simple *p = userdata; @@ -191,34 +248,19 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo      c->io = io;      c->sink_input = NULL;      c->source_output = NULL; +    c->fixed_source = NULL;      c->input_memblockq = c->output_memblockq = NULL;      c->protocol = p; - -    pa_iochannel_peer_to_string(io, cname, sizeof(cname)); +    c->playback.current_memblock = NULL; +    c->playback.memblock_index = 0; +    c->playback.fragment_size = 0; +     +    pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));      c->client = pa_client_new(p->core, "SIMPLE", cname);      assert(c->client);      c->client->kill = client_kill_cb;      c->client->userdata = c; -    if (p->mode & PA_PROTOCOL_SIMPLE_RECORD) { -        struct pa_source *source; -        size_t l; - -        if (!(source = pa_source_get_default(p->core))) { -            fprintf(stderr, "Failed to get default source.\n"); -            goto fail; -        } - -        c->source_output = pa_source_output_new(source, c->client->name, &p->sample_spec); -        assert(c->source_output); -        c->source_output->push = source_output_push_cb; -        c->source_output->kill = source_output_kill_cb; -        c->source_output->userdata = c; - -        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS); -        c->output_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, 0); -    } -      if (p->mode & PA_PROTOCOL_SIMPLE_PLAYBACK) {          struct pa_sink *sink;          size_t l; @@ -237,12 +279,40 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo          c->sink_input->userdata = c;          l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); -        c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, l/10); +        c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, l/PLAYBACK_BUFFER_FRAGMENTS); +        assert(c->input_memblockq); +        pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); +        c->playback.fragment_size = l/10;      } +    if (p->mode & PA_PROTOCOL_SIMPLE_RECORD) { +        struct pa_source *source; +        size_t l; + +        if (!(source = pa_source_get_default(p->core))) { +            fprintf(stderr, "Failed to get default source.\n"); +            goto fail; +        } + +        c->source_output = pa_source_output_new(source, c->client->name, &p->sample_spec); +        assert(c->source_output); +        c->source_output->push = source_output_push_cb; +        c->source_output->kill = source_output_kill_cb; +        c->source_output->userdata = c; + +        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS); +        c->output_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), 0, 0); +        pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); +    } +      pa_iochannel_set_callback(c->io, io_callback, c);      pa_idxset_put(p->connections, c, NULL); + +    c->fixed_source = p->core->mainloop->source_fixed(p->core->mainloop, fixed_callback, c); +    assert(c->fixed_source); +    p->core->mainloop->enable_fixed(p->core->mainloop, c->fixed_source, 0); +          return;  fail:  | 
