diff options
| author | Lennart Poettering <lennart@poettering.net> | 2008-04-20 20:16:55 +0000 | 
|---|---|---|
| committer | Lennart Poettering <lennart@poettering.net> | 2008-04-20 20:16:55 +0000 | 
| commit | 62e7bc17c41c5542779a3c395a9d47d2bd306de2 (patch) | |
| tree | 4445167f4164a8af45438519fee8202c14a35892 /src/pulsecore/protocol-native.c | |
| parent | 7556ef5bfc37c99064d95857626bcf9f20423c70 (diff) | |
Big pile of dependant changes:
* Change pa_memblockq to carry silence memchunk instead of memblock and adapt all users
* Add new call pa_sink_input_get_silence() to get the suitable silence block for a sink input
* Implement monitoring sources properly by adding a delay queue to even out rewinds
* Remove pa_{sink|source}_ping() becaused unnecessary these days and not used
* Fix naming of various rewind related functions. Downstream is now _request_rewind(), upstream is _process_rewind()
* Fix volume adjustments for a single stream in pa_sink_render()
* Properly handle prebuf-style buffer underruns in pa_sink_input
* Don't allow rewinding to more than the last underrun
* Rework default buffering metrics selection for native protocol
* New functions pa_memblockq_prebuf_active(), pa_memblockq_silence()
* add option "mixer_reset=" to module-alsa-sink
* Other cleanups
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/glitch-free@2283 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/pulsecore/protocol-native.c')
| -rw-r--r-- | src/pulsecore/protocol-native.c | 276 | 
1 files changed, 194 insertions, 82 deletions
| diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index fc5aa598..37200db6 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -72,6 +72,7 @@  #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */  #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */ +#define DEFAULT_PROCESS_MSEC 20   /* 20ms */  #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC  typedef struct connection connection; @@ -104,7 +105,7 @@ typedef struct playback_stream {      pa_bool_t drain_request;      uint32_t drain_tag;      uint32_t syncid; -    pa_bool_t underrun; +    uint64_t underrun; /* length of underrun */      pa_atomic_t missing;      size_t minreq; @@ -208,6 +209,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk  static void sink_input_kill_cb(pa_sink_input *i);  static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);  static void sink_input_moved_cb(pa_sink_input *i); +static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes); +static void sink_input_set_max_rewind_cb(pa_sink_input *i, size_t nbytes); +  static void send_memblock(connection *c);  static void request_bytes(struct playback_stream*s); @@ -471,7 +475,6 @@ static record_stream* record_stream_new(          pa_source *source,          pa_sample_spec *ss,          pa_channel_map *map, -        const char *name,          uint32_t *maxlength,          uint32_t *fragsize,          pa_source_output_flags_t flags, @@ -485,7 +488,6 @@ static record_stream* record_stream_new(      pa_assert(c);      pa_assert(ss); -    pa_assert(name);      pa_assert(maxlength);      pa_assert(p); @@ -523,7 +525,7 @@ static record_stream* record_stream_new(      if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH)          *maxlength = MAX_MEMBLOCKQ_LENGTH;      if (*fragsize <= 0) -        *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*1000, &source_output->sample_spec); +        *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &source_output->sample_spec);      if (adjust_latency) {          pa_usec_t fragsize_usec; @@ -618,21 +620,14 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,              uint32_t l = 0;              for (;;) { -                int32_t k; - -                if ((k = pa_atomic_load(&s->missing)) <= 0) -                    break; - -                l += k; - -                if (l < s->minreq) +                if ((l = pa_atomic_load(&s->missing)) <= 0)                      break; -                if (pa_atomic_sub(&s->missing, k) <= k) +                if (pa_atomic_cmpxchg(&s->missing, l, 0))                      break;              } -            if (l < s->minreq) +            if (l <= 0)                  break;              t = pa_tagstruct_new(NULL, 0); @@ -642,7 +637,7 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,              pa_tagstruct_putu32(t, l);              pa_pstream_send_tagstruct(s->connection->pstream, t); -/*             pa_log("Requesting %u bytes", l);     */ +/*             pa_log("Requesting %lu bytes", (unsigned long) l); */              break;          } @@ -684,7 +679,6 @@ static playback_stream* playback_stream_new(          pa_sink *sink,          pa_sample_spec *ss,          pa_channel_map *map, -        const char *name,          uint32_t *maxlength,          uint32_t *tlength,          uint32_t *prebuf, @@ -699,14 +693,15 @@ static playback_stream* playback_stream_new(      playback_stream *s, *ssync;      pa_sink_input *sink_input; -    pa_memblock *silence; +    pa_memchunk silence;      uint32_t idx;      int64_t start_index;      pa_sink_input_new_data data; +    pa_usec_t tlength_usec, minreq_usec, sink_usec; +    size_t frame_size;      pa_assert(c);      pa_assert(ss); -    pa_assert(name);      pa_assert(maxlength);      pa_assert(tlength);      pa_assert(prebuf); @@ -761,10 +756,12 @@ static playback_stream* playback_stream_new(      s->connection = c;      s->syncid = syncid;      s->sink_input = sink_input; -    s->underrun = TRUE; +    s->underrun = (uint64_t) -1;      s->sink_input->parent.process_msg = sink_input_process_msg;      s->sink_input->pop = sink_input_pop_cb; +    s->sink_input->rewind = sink_input_rewind_cb; +    s->sink_input->set_max_rewind = sink_input_set_max_rewind_cb;      s->sink_input->kill = sink_input_kill_cb;      s->sink_input->moved = sink_input_moved_cb;      s->sink_input->suspend = sink_input_suspend_cb; @@ -775,40 +772,69 @@ static playback_stream* playback_stream_new(      if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH)          *maxlength = MAX_MEMBLOCKQ_LENGTH;      if (*tlength <= 0) -        *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*1000, &sink_input->sample_spec); +        *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &sink_input->sample_spec);      if (*minreq <= 0) -        *minreq = (*tlength*9)/10; -    if (*prebuf <= 0) -        *prebuf = *tlength; +        *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &sink_input->sample_spec); + +    frame_size = pa_frame_size(&sink_input->sample_spec); +    if (*minreq <= 0) +        *minreq = frame_size; +    if (*tlength < *minreq+frame_size) +        *tlength = *minreq+frame_size; + +    tlength_usec = pa_bytes_to_usec(*tlength, &sink_input->sample_spec); +    minreq_usec = pa_bytes_to_usec(*minreq, &sink_input->sample_spec);      if (adjust_latency) { -        pa_usec_t tlength_usec, minreq_usec; -        /* So, the user asked us to adjust the latency according to -         * the what the sink can provide. Half the latency will be -         * spent on the hw buffer, half of it in the async buffer -         * queue we maintain for each client. */ +        /* So, the user asked us to adjust the latency of the stream +         * buffer according to the what the sink can provide. The +         * tlength passed in shall be the overall latency. Roughly +         * half the latency will be spent on the hw buffer, the other +         * half of it in the async buffer queue we maintain for each +         * client. In between we'll have a safety space of size +         * minreq.*/ + +        sink_usec = (tlength_usec-minreq_usec)/2; + +    } else { + +        /* Ok, the user didn't ask us to adjust the latency, but we +         * still need to make sure that the parameters from the user +         * do make sense. */ + +        sink_usec = tlength_usec - minreq_usec; +    } -        tlength_usec = pa_bytes_to_usec(*tlength, &sink_input->sample_spec); -        minreq_usec = pa_bytes_to_usec(*minreq, &sink_input->sample_spec); +    s->sink_latency = pa_sink_input_set_requested_latency(sink_input, sink_usec); -        s->sink_latency = pa_sink_input_set_requested_latency(sink_input, tlength_usec/2); +    if (adjust_latency) { +        /* Ok, we didn't necessarily get what we were asking for, so +         * let's subtract from what we asked for for the remaining +         * buffer space */ -        if (tlength_usec >= s->sink_latency*2) +        if (tlength_usec >= s->sink_latency)              tlength_usec -= s->sink_latency; -        else -            tlength_usec = s->sink_latency; +    } -        if (minreq_usec >= s->sink_latency*2) -            minreq_usec -= s->sink_latency; -        else -            minreq_usec = s->sink_latency; +    if (tlength_usec < s->sink_latency + minreq_usec) +        tlength_usec = s->sink_latency + minreq_usec; -        *tlength = pa_usec_to_bytes(tlength_usec, &sink_input->sample_spec); -        *minreq = pa_usec_to_bytes(minreq_usec, &sink_input->sample_spec); +    *tlength = pa_usec_to_bytes(tlength_usec, &sink_input->sample_spec); +    *minreq = pa_usec_to_bytes(minreq_usec, &sink_input->sample_spec); + +    if (*minreq <= 0) { +        *minreq = frame_size; +        *tlength += frame_size;      } -    silence = pa_silence_memblock_new(c->protocol->core->mempool, &sink_input->sample_spec, 0); +    if (*tlength <= *minreq) +        *tlength = *minreq + frame_size; + +    if (*prebuf <= 0) +        *prebuf = *tlength; + +    pa_sink_input_get_silence(sink_input, &silence);      s->memblockq = pa_memblockq_new(              start_index, @@ -818,9 +844,9 @@ static playback_stream* playback_stream_new(              *prebuf,              *minreq,              0, -            silence); +            &silence); -    pa_memblock_unref(silence); +    pa_memblock_unref(silence.memblock);      *maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq);      *tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq); @@ -924,10 +950,12 @@ static void request_bytes(playback_stream *s) {      if (m <= 0)          return; -/*     pa_log("request_bytes(%u)", m); */ +/*     pa_log("request_bytes(%lu)", (unsigned long) m); */      previous_missing = pa_atomic_add(&s->missing, m); -    if (previous_missing < s->minreq && previous_missing+m >= s->minreq) { + +    if (pa_memblockq_prebuf_active(s->memblockq) || +        (previous_missing < s->minreq && previous_missing+m >= s->minreq)) {          pa_assert(pa_thread_mq_get());          pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);      } @@ -989,6 +1017,45 @@ static void send_record_stream_killed(record_stream *r) {  /*** sink input callbacks ***/ +static void handle_seek(playback_stream *s, int64_t indexw) { +    playback_stream_assert_ref(s); + +/*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->underrun, pa_memblockq_is_readable(s->memblockq)); */ + +    if (s->underrun != 0) { + +/*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */ + +        if (pa_memblockq_is_readable(s->memblockq)) { + +            size_t u = pa_memblockq_get_length(s->memblockq); + +            if (u >= s->underrun) +                u = s->underrun; + +            pa_log("yeah! ready to rock"); + +            /* We just ended an underrun, let's ask the sink +             * to rewrite */ +            s->sink_input->thread_info.ignore_rewind = TRUE; +            pa_sink_input_request_rewind(s->sink_input, u, TRUE); +        } + +    } else { +        int64_t indexr; + +        indexr = pa_memblockq_get_read_index(s->memblockq); + +        if (indexw < indexr) +            /* OK, the sink already asked for this data, so +             * let's have it usk us again */ + +            pa_sink_input_request_rewind(s->sink_input, indexr - indexw, FALSE); +    } + +    request_bytes(s); +} +  /* Called from thread context */  static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {      pa_sink_input *i = PA_SINK_INPUT(o); @@ -1000,48 +1067,42 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int      switch (code) { -        case SINK_INPUT_MESSAGE_SEEK: +        case SINK_INPUT_MESSAGE_SEEK: { +            int64_t windex; + +            windex = pa_memblockq_get_write_index(s->memblockq);              pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata)); -            request_bytes(s); + +            handle_seek(s, windex);              return 0; +        }          case SINK_INPUT_MESSAGE_POST_DATA: { +            int64_t windex; +              pa_assert(chunk); -/*             pa_log("sink input post: %u", chunk->length); */ +/*             pa_log("sink input post: %lu", (unsigned long) chunk->length); */ -            if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { +            windex = pa_memblockq_get_write_index(s->memblockq); +            if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {                  pa_log_warn("Failed to push data into queue");                  pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);                  pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE);              } -            request_bytes(s); - -            s->underrun = FALSE; -            return 0; -        } - -        case SINK_INPUT_MESSAGE_DRAIN: { - -            pa_memblockq_prebuf_disable(s->memblockq); - -            if (!pa_memblockq_is_readable(s->memblockq)) -                pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL); -            else { -                s->drain_tag = PA_PTR_TO_UINT(userdata); -                s->drain_request = TRUE; -            } -            request_bytes(s); +            handle_seek(s, windex);              return 0;          } +        case SINK_INPUT_MESSAGE_DRAIN:          case SINK_INPUT_MESSAGE_FLUSH:          case SINK_INPUT_MESSAGE_PREBUF_FORCE:          case SINK_INPUT_MESSAGE_TRIGGER: { +            int64_t windex;              pa_sink_input *isync;              void (*func)(pa_memblockq *bq); @@ -1054,6 +1115,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int                      func = pa_memblockq_prebuf_force;                      break; +                case SINK_INPUT_MESSAGE_DRAIN:                  case SINK_INPUT_MESSAGE_TRIGGER:                      func = pa_memblockq_prebuf_disable;                      break; @@ -1062,23 +1124,32 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int                      pa_assert_not_reached();              } +            windex = pa_memblockq_get_write_index(s->memblockq);              func(s->memblockq); -            s->underrun = FALSE; -            request_bytes(s); +            handle_seek(s, windex);              /* Do the same for all other members in the sync group */              for (isync = i->sync_prev; isync; isync = isync->sync_prev) {                  playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); +                windex = pa_memblockq_get_write_index(ssync->memblockq);                  func(ssync->memblockq); -                ssync->underrun = FALSE; -                request_bytes(ssync); +                handle_seek(ssync, windex);              }              for (isync = i->sync_next; isync; isync = isync->sync_next) {                  playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); +                windex = pa_memblockq_get_write_index(ssync->memblockq);                  func(ssync->memblockq); -                ssync->underrun = FALSE; -                request_bytes(ssync); +                handle_seek(ssync, windex); +            } + +            if (code == SINK_INPUT_MESSAGE_DRAIN) { +                if (!pa_memblockq_is_readable(s->memblockq)) +                    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL); +                else { +                    s->drain_tag = PA_PTR_TO_UINT(userdata); +                    s->drain_request = TRUE; +                }              }              return 0; @@ -1091,11 +1162,18 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int              s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);              return 0; -        case PA_SINK_INPUT_MESSAGE_SET_STATE: +        case PA_SINK_INPUT_MESSAGE_SET_STATE: { +            int64_t windex; + +            windex = pa_memblockq_get_write_index(s->memblockq);              pa_memblockq_prebuf_force(s->memblockq); -            request_bytes(s); + +            handle_seek(s, windex); + +            /* Fall through to the default handler */              break; +        }          case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {              pa_usec_t *r = userdata; @@ -1112,7 +1190,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int  }  /* Called from thread context */ -static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {      playback_stream *s;      pa_sink_input_assert_ref(i); @@ -1122,23 +1200,58 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk      if (pa_memblockq_peek(s->memblockq, chunk) < 0) { +/*         pa_log("UNDERRUN"); */ +          if (s->drain_request && pa_sink_input_safe_to_remove(i)) {              s->drain_request = FALSE;              pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); -        } else if (!s->underrun) { -            s->underrun = TRUE; +        } else if (s->underrun == 0)              pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL); -        } + +        if (s->underrun != (size_t) -1) +            s->underrun += nbytes; + +/*         pa_log("added %llu bytes, total is %llu", (unsigned long long) nbytes, (unsigned long long) s->underrun); */ + +        request_bytes(s);          return -1;      } +/*     pa_log("NOTUNDERRUN"); */ + +    s->underrun = 0; +      pa_memblockq_drop(s->memblockq, chunk->length);      request_bytes(s);      return 0;  } +static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) { +    playback_stream *s; + +    pa_sink_input_assert_ref(i); +    s = PLAYBACK_STREAM(i->userdata); +    playback_stream_assert_ref(s); + +    /* If we are in an underrun, then we don't rewind */ +    if (s->underrun != 0) +        return; + +    pa_memblockq_rewind(s->memblockq, nbytes); +} + +static void sink_input_set_max_rewind_cb(pa_sink_input *i, size_t nbytes) { +    playback_stream *s; + +    pa_sink_input_assert_ref(i); +    s = PLAYBACK_STREAM(i->userdata); +    playback_stream_assert_ref(s); + +    pa_memblockq_set_maxrewind(s->memblockq, nbytes); +} +  /* Called from main context */  static void sink_input_kill_cb(pa_sink_input *i) {      playback_stream *s; @@ -1416,7 +1529,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC          (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |          (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0); -    s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, muted, syncid, &missing, flags, p, adjust_latency); +    s = playback_stream_new(c, sink, &ss, &map, &maxlength, &tlength, &prebuf, &minreq, &volume, muted, syncid, &missing, flags, p, adjust_latency);      pa_proplist_free(p);      CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); @@ -1625,7 +1738,7 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_          (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |          (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0); -    s = record_stream_new(c, source, &ss, &map, name, &maxlength, &fragment_size, flags, p, adjust_latency); +    s = record_stream_new(c, source, &ss, &map, &maxlength, &fragment_size, flags, p, adjust_latency);      pa_proplist_free(p);      CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); @@ -1721,7 +1834,6 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t                      pa_log_warn("failed to get GID of group '%s'", c->protocol->auth_group);                  else if (gid == creds->gid)                      success = 1; -                  if (!success) {                      if ((r = pa_uid_in_group(creds->uid, c->protocol->auth_group)) < 0)                          pa_log_warn("failed to check group membership."); @@ -1739,7 +1851,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t                  pa_mempool_is_shared(c->protocol->core->mempool) &&                  creds->uid == getuid()) { -                pa_pstream_use_shm(c->pstream, 1); +                pa_pstream_enable_shm(c->pstream, TRUE);                  pa_log_info("Enabled SHM for new connection");              } | 
