From da925cc7dff9ba59a748bcd658cf57153213bf61 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 22 May 2007 22:54:05 +0000 Subject: add asyncq git-svn-id: file:///home/lennart/svn/public/libsydney/trunk@31 9ba3c220-e4d3-45a2-8aa3-73fcc9aff6ce --- asyncq.c | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ asyncq.h | 49 ++++++++++++++++++++++++++++ test-asyncq.c | 62 +++++++++++++++++++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 asyncq.c create mode 100644 asyncq.h create mode 100644 test-asyncq.c diff --git a/asyncq.c b/asyncq.c new file mode 100644 index 0000000..54d7d5b --- /dev/null +++ b/asyncq.c @@ -0,0 +1,102 @@ +#include "sydney.h" +#include "asyncq.h" +#include "malloc.h" + +int sa_asyncq_init(sa_asyncq_t *a, size_t item_size) { + sa_assert(a); + + SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->items); + SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->unused); + + a->last = NULL; + a->item_size = item_size; + + if (!(a->mutex = sa_mutex_new(0))) + return SA_ERROR_OOM; + + return SA_SUCCESS; +} + +sa_asyncq_item_t *sa_asyncq_get(sa_asyncq_t *a) { + sa_assert(a); + + sa_mutex_lock(a->mutex); + + if ((i = a->unused)) + SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i); + + sa_mutex_unlock(a->mutex); + + if (!i) + if (!(i = sa_malloc(SA_ALIGN(sa_asyncq_item_t) + a->item_size))) + return NULL; + + return i; +} + +void sa_asyncq_recycle(sa_asyncq_t *a, sa_asyncq_item_t *i) { + sa_assert(a); + sa_assert(i); + + sa_mutex_lock(a->mutex); + SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->unused, i); + sa_mutex_unlock(a->mutex); +} + +void sa_asyncq_push(sa_asyncq_t *a, sa_asyncq_item_t *i) { + sa_assert(a); + sa_assert(i); + + sa_mutex_lock(a->mutex); + + if (a->last) + SA_LLIST_INSERT_AFTER(sa_asyncq_item_t, items, a->items, a->last, i); + else + SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->items, i); + + a->last = i; + + sa_mutex_unlock(a->mutex); + + return SA_SUCCESS; +} + +sa_asyncq_item_t sa_asyncq_pop(sa_asyncq_t *a, int wait) { + sa_asyncq_item_t *i; + + sa_assert(a); + + if (wait) + sa_mutex_lock(a->mutex); + else + if (!sa_mutex_try_lock(a->mutex)) + return NULL; + + if ((i = a->items)) { + if (i == a->last) + a->last = NULL; + + SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->items, i); + } + + sa_mutex_unlock(a->mutex); + + return i; +} + +void sa_asyncq_done(sa_asyncq_t *a) { + sa_asyncq_item_t *i; + sa_assert(a); + + /* The caller *must* pop all items from the queue before + * destructing us! */ + sa_assert(!a->items); + + if (a->mutex) + sa_mutex_free(a->mutex); + + while ((i = a->unused)) { + SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i); + sa_free(i); + } +} diff --git a/asyncq.h b/asyncq.h new file mode 100644 index 0000000..2e60c23 --- /dev/null +++ b/asyncq.h @@ -0,0 +1,49 @@ +#ifndef foosydneyhasynchfoo +#define foosydneyhasynchfoo + +#include "llist.h" +#include "mutex.h" + +typedef struct sa_asyncq sa_asyncq_t; +typedef struct sa_asyncq_item sa_asyncq_item_t; + +struct sa_asyncq_item { + SA_LLIST_ITEM(sa_asyncq_item_t, items); +}; + +#define SA_ASYNCQ_ITEM_DATA(x) ((void*) ((uint8_t*) (x) + ALIGN(sizeof(sa_asyncq_item_t)))) + +struct sa_asyncq { + sa_mutex_t *mutex; + + SA_LLIST_HEAD(sa_asyncq_item_t, items); + SA_LLIST_HEAD(sa_asyncq_item_t, unused); + + sa_asyncq_item_t *last; + + size_t item_size; +}; + +/* Implements a simple asynchronous queue for + * inter-thread-communication. To reading side can act in a wait-free + * fashion (though not lock-free). Should only be used together with a + * non-sychrnoized backing buffer such as sa_bufferq. */ + +int sa_asyncq_init(sa_asyncq_t *a, size_t item_size); + +void sa_asyncq_done(sa_asyncq_t *a); + +/* Allocate a free queue item */ +sa_asyncq_item_t *sa_asyncq_get(sa_asyncq_t *a); + +/* Give the queue item back to the queue */ +void sa_asyncq_recycle(sa_asyncq_t *a); + +/* Push a previously allocated entry into the queue */ +void sa_asyncq_push(sa_asyncq_t *a, sa_asyncq_item_t *i); + +/* Pop an entry from the queue */ +sa_asyncq_item_t* sa_asyncq_pop(sa_asyncq_t *a, int wait); + + +#endif diff --git a/test-asyncq.c b/test-asyncq.c new file mode 100644 index 0000000..26dbc55 --- /dev/null +++ b/test-asyncq.c @@ -0,0 +1,62 @@ +#include + +#include "asyncq.h" +#include "thread.h" +#include "macro.h" + +#define ITERATIONS_MAX 1000 + +static void thread(void *userdata) { + sa_asyncq_t *q = userdata; + int i; + + for (i = 0; i < ITERATIONS_MAX; i++) { + sa_asyncq_item_t *i; + + i = sa_asyncq_get(q); + sa_assert(i); + + sa_asyncq_push(q, i); + + if (rand() & 1) + sa_thread_yield(); + } +} + +int main(int argc, char *argv[]) { + int j; + sa_thread_t *t; + sa_asyncq_t q; + sa_asyncq_item_t *i; + + sa_assert_success(sa_asyncq_init(&q)); + + t = sa_thread_new(thread, &q); + sa_assert(t); + + for (j = 0; j < ITERATIONS_MAX; j++) { + + do { + i = sa_asyncq_pop(&q, 0); + printf("%s ", i ? "gotcha" : "miss"); + + if (i) + sa_asyncq_recycle(&q, i); + } while (i); + + if (rand() & 1) + sa_thread_yield(); + } + + printf("\n"); + + sa_thread_free(t); + + while (sa_asyncq_pop(&q, 1)) + ; + + sa_asyncq_done(&q); + + + return 0; +} -- cgit