diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/pulsecore/asyncq.c | 201 | 
1 files changed, 113 insertions, 88 deletions
| diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c index c966e7dd..025c695e 100644 --- a/src/pulsecore/asyncq.c +++ b/src/pulsecore/asyncq.c @@ -52,9 +52,10 @@ struct pa_asyncq {      unsigned size;      unsigned read_idx;      unsigned write_idx; -    pa_atomic_t read_waiting, n_read; -    pa_atomic_t write_waiting, n_written; +    pa_atomic_t read_waiting; /* a bool */ +    pa_atomic_t write_waiting; /* a bool */      int read_fds[2], write_fds[2]; +    pa_atomic_t in_read_fifo, in_write_fifo;  };  #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) @@ -80,8 +81,8 @@ pa_asyncq *pa_asyncq_new(unsigned size) {      l->size = size;      pa_atomic_store(&l->read_waiting, 0);      pa_atomic_store(&l->write_waiting, 0); -    pa_atomic_store(&l->n_written, 0); -    pa_atomic_store(&l->n_read, 0); +    pa_atomic_store(&l->in_read_fifo, 0); +    pa_atomic_store(&l->in_write_fifo, 0);      if (pipe(l->read_fds) < 0) {          pa_xfree(l); @@ -133,68 +134,79 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {      if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { -        if (!wait) { -            /* Let's empty the FIFO from old notifications, before we return */ +        /* Let's empty the FIFO from old notifications, before we return */ -            while (pa_atomic_load(&l->n_read) > 0) { -                ssize_t r; -                int x[20]; -                 -                errno = 0; -                if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) -                    return -1; +        while (pa_atomic_load(&l->in_write_fifo) > 0) { +            ssize_t r; +            int x[20]; + +            if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) { -                pa_assert(r != 0); +                if (errno == EINTR) +                    continue; -                if (r > 0) -                    if (pa_atomic_sub(&l->n_read, r) <= r) -                        break; +                return -1;              } -             -            return -1; -        } - -        /* First try failed. Let's wait for changes. */ -        _Y; +            pa_assert(r > 0); +                 +            if (pa_atomic_sub(&l->in_write_fifo, r) <= r) +                break; -        pa_atomic_inc(&l->write_waiting); +        } -        for (;;) { -            char x[20]; -            ssize_t r; +        /* Now let's make sure that we didn't lose any events */ +        if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { -            _Y; +            if (!wait) +                return -1; -            if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) -                break; +            /* Let's wait for changes. */              _Y; -            if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) { -                pa_atomic_dec(&l->write_waiting); -                return -1; -            } +            pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 0, 1)); -            pa_assert(r != 0); +            for (;;) { +                char x[20]; +                ssize_t r; +                 +                _Y; +                 +                if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) +                    break; +                 +                _Y; -            if (r > 0) -                pa_atomic_sub(&l->n_read, r); -        } +                if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) { -        _Y; +                    if (errno == EINTR) +                        continue; +                     +                    pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0)); +                    return -1; +                } -        pa_atomic_dec(&l->write_waiting); +                pa_assert(r > 0); +                pa_atomic_sub(&l->in_write_fifo, r); +            } +             +            _Y; +             +            pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0)); +        }      }      _Y;      l->write_idx++; -    if (pa_atomic_load(&l->read_waiting)) { +    if (pa_atomic_load(&l->read_waiting) > 0) {          char x = 'x';          _Y; -        if (write(l->read_fds[1], &x, sizeof(x)) > 0) -            pa_atomic_inc(&l->n_written); +        if (write(l->read_fds[1], &x, sizeof(x)) > 0) { +            pa_atomic_inc(&l->in_read_fifo); +/*             pa_log("increasing %p by 1", l); */ +        }      }      return 0; @@ -206,7 +218,7 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {      pa_atomic_ptr_t *cells;      pa_assert(l); - +          cells = PA_ASYNCQ_CELLS(l);      _Y; @@ -214,71 +226,86 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {      if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { -        /* First try failed. Let's wait for changes. */ +/*         pa_log("pop failed wait=%i", wait); */ -        if (!wait) { -            /* Let's empty the FIFO from old notifications, before we return */ +        /* Hmm, nothing, here, so let's drop all queued events. */ +        while (pa_atomic_load(&l->in_read_fifo) > 0) { +            ssize_t r; +            int x[20]; -            while (pa_atomic_load(&l->n_written) > 0) { -                ssize_t r; -                int x[20]; +            if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) { -                errno = 0; -                if ((r = read(l->read_fds[0], x, sizeof(x))) < 0 && errno != EINTR) -                    return NULL; - -                pa_assert(r != 0); +                if (errno == EINTR) +                    continue; -                if (r > 0) -                    if (pa_atomic_sub(&l->n_written, r) <= r) -                        break; +                return NULL;              } + +            pa_assert(r > 0); + +/*             pa_log("decreasing %p by %i", l, r); */ -            return NULL; +            if (pa_atomic_sub(&l->in_read_fifo, r) <= r) +                break;          } -        _Y; - -        pa_atomic_inc(&l->read_waiting); +        /* Now let's make sure that we didn't lose any events */ +        if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { -        for (;;) { -            char x[20]; -            ssize_t r; +            if (!wait) +                return NULL; +            /* Let's wait for changes. */ +                          _Y; +             +            pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1)); +             +            for (;;) { +                char x[20]; +                ssize_t r; +                 +                _Y; +                 +                if ((ret = pa_atomic_ptr_load(&cells[idx]))) +                    break; +                 +                _Y; +                 +                if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) { -            if ((ret = pa_atomic_ptr_load(&cells[idx]))) -                break; - -            _Y; +                    if (errno == EINTR) +                        continue; +                     +                    pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); +                    return NULL; +                } -            if ((r = read(l->read_fds[0], x, sizeof(x))) < 0 && errno != EINTR) { -                pa_atomic_dec(&l->read_waiting); -                return NULL; +/*                 pa_log("decreasing %p by %i", l, r); */ +                 +                pa_assert(r > 0); +                pa_atomic_sub(&l->in_read_fifo, r);              } -            pa_assert(r != 0); +            _Y; -            if (r > 0) -                pa_atomic_sub(&l->n_written, r); +            pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));          } - -        _Y; - -        pa_atomic_dec(&l->read_waiting);      } +    pa_assert(ret); +      /* Guaranteed if we only have a single reader */      pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));      _Y;      l->read_idx++; -    if (pa_atomic_load(&l->write_waiting)) { +    if (pa_atomic_load(&l->write_waiting) > 0) {          char x = 'x';          _Y;          if (write(l->write_fds[1], &x, sizeof(x)) >= 0) -            pa_atomic_inc(&l->n_read); +            pa_atomic_inc(&l->in_write_fifo);      }      return ret; @@ -301,13 +328,13 @@ int pa_asyncq_before_poll(pa_asyncq *l) {      _Y;      idx = reduce(l, l->read_idx); -    if (pa_atomic_ptr_load(&cells[idx])) +    if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0)          return -1; -    pa_atomic_inc(&l->read_waiting); +    pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1)); -    if (pa_atomic_ptr_load(&cells[idx])) { -        pa_atomic_dec(&l->read_waiting); +    if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0) { +        pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));          return -1;      } @@ -317,7 +344,5 @@ int pa_asyncq_before_poll(pa_asyncq *l) {  void pa_asyncq_after_poll(pa_asyncq *l) {      pa_assert(l); -    pa_assert(pa_atomic_load(&l->read_waiting) > 0); - -    pa_atomic_dec(&l->read_waiting); +    pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));  } | 
