summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2007-05-22 22:54:05 +0000
committerLennart Poettering <lennart@poettering.net>2007-05-22 22:54:05 +0000
commitda925cc7dff9ba59a748bcd658cf57153213bf61 (patch)
tree2dcda2652a6f82e603f5417abff0aa70a6978fb1
parentc1b4872b0910c9aa784d878dca771b21c4690048 (diff)
add asyncq
git-svn-id: file:///home/lennart/svn/public/libsydney/trunk@31 9ba3c220-e4d3-45a2-8aa3-73fcc9aff6ce
-rw-r--r--asyncq.c102
-rw-r--r--asyncq.h49
-rw-r--r--test-asyncq.c62
3 files changed, 213 insertions, 0 deletions
diff --git a/asyncq.c b/asyncq.c
new file mode 100644
index 0000000..54d7d5b
--- /dev/null
+++ b/asyncq.c
@@ -0,0 +1,102 @@
+#include "sydney.h"
+#include "asyncq.h"
+#include "malloc.h"
+
+int sa_asyncq_init(sa_asyncq_t *a, size_t item_size) {
+ sa_assert(a);
+
+ SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->items);
+ SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->unused);
+
+ a->last = NULL;
+ a->item_size = item_size;
+
+ if (!(a->mutex = sa_mutex_new(0)))
+ return SA_ERROR_OOM;
+
+ return SA_SUCCESS;
+}
+
+sa_asyncq_item_t *sa_asyncq_get(sa_asyncq_t *a) {
+ sa_assert(a);
+
+ sa_mutex_lock(a->mutex);
+
+ if ((i = a->unused))
+ SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i);
+
+ sa_mutex_unlock(a->mutex);
+
+ if (!i)
+ if (!(i = sa_malloc(SA_ALIGN(sa_asyncq_item_t) + a->item_size)))
+ return NULL;
+
+ return i;
+}
+
+void sa_asyncq_recycle(sa_asyncq_t *a, sa_asyncq_item_t *i) {
+ sa_assert(a);
+ sa_assert(i);
+
+ sa_mutex_lock(a->mutex);
+ SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->unused, i);
+ sa_mutex_unlock(a->mutex);
+}
+
+void sa_asyncq_push(sa_asyncq_t *a, sa_asyncq_item_t *i) {
+ sa_assert(a);
+ sa_assert(i);
+
+ sa_mutex_lock(a->mutex);
+
+ if (a->last)
+ SA_LLIST_INSERT_AFTER(sa_asyncq_item_t, items, a->items, a->last, i);
+ else
+ SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->items, i);
+
+ a->last = i;
+
+ sa_mutex_unlock(a->mutex);
+
+ return SA_SUCCESS;
+}
+
+sa_asyncq_item_t sa_asyncq_pop(sa_asyncq_t *a, int wait) {
+ sa_asyncq_item_t *i;
+
+ sa_assert(a);
+
+ if (wait)
+ sa_mutex_lock(a->mutex);
+ else
+ if (!sa_mutex_try_lock(a->mutex))
+ return NULL;
+
+ if ((i = a->items)) {
+ if (i == a->last)
+ a->last = NULL;
+
+ SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->items, i);
+ }
+
+ sa_mutex_unlock(a->mutex);
+
+ return i;
+}
+
+void sa_asyncq_done(sa_asyncq_t *a) {
+ sa_asyncq_item_t *i;
+ sa_assert(a);
+
+ /* The caller *must* pop all items from the queue before
+ * destructing us! */
+ sa_assert(!a->items);
+
+ if (a->mutex)
+ sa_mutex_free(a->mutex);
+
+ while ((i = a->unused)) {
+ SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i);
+ sa_free(i);
+ }
+}
diff --git a/asyncq.h b/asyncq.h
new file mode 100644
index 0000000..2e60c23
--- /dev/null
+++ b/asyncq.h
@@ -0,0 +1,49 @@
+#ifndef foosydneyhasynchfoo
+#define foosydneyhasynchfoo
+
+#include "llist.h"
+#include "mutex.h"
+
+typedef struct sa_asyncq sa_asyncq_t;
+typedef struct sa_asyncq_item sa_asyncq_item_t;
+
+struct sa_asyncq_item {
+ SA_LLIST_ITEM(sa_asyncq_item_t, items);
+};
+
+#define SA_ASYNCQ_ITEM_DATA(x) ((void*) ((uint8_t*) (x) + ALIGN(sizeof(sa_asyncq_item_t))))
+
+struct sa_asyncq {
+ sa_mutex_t *mutex;
+
+ SA_LLIST_HEAD(sa_asyncq_item_t, items);
+ SA_LLIST_HEAD(sa_asyncq_item_t, unused);
+
+ sa_asyncq_item_t *last;
+
+ size_t item_size;
+};
+
+/* Implements a simple asynchronous queue for
+ * inter-thread-communication. To reading side can act in a wait-free
+ * fashion (though not lock-free). Should only be used together with a
+ * non-sychrnoized backing buffer such as sa_bufferq. */
+
+int sa_asyncq_init(sa_asyncq_t *a, size_t item_size);
+
+void sa_asyncq_done(sa_asyncq_t *a);
+
+/* Allocate a free queue item */
+sa_asyncq_item_t *sa_asyncq_get(sa_asyncq_t *a);
+
+/* Give the queue item back to the queue */
+void sa_asyncq_recycle(sa_asyncq_t *a);
+
+/* Push a previously allocated entry into the queue */
+void sa_asyncq_push(sa_asyncq_t *a, sa_asyncq_item_t *i);
+
+/* Pop an entry from the queue */
+sa_asyncq_item_t* sa_asyncq_pop(sa_asyncq_t *a, int wait);
+
+
+#endif
diff --git a/test-asyncq.c b/test-asyncq.c
new file mode 100644
index 0000000..26dbc55
--- /dev/null
+++ b/test-asyncq.c
@@ -0,0 +1,62 @@
+#include <stdlib.h>
+
+#include "asyncq.h"
+#include "thread.h"
+#include "macro.h"
+
+#define ITERATIONS_MAX 1000
+
+static void thread(void *userdata) {
+ sa_asyncq_t *q = userdata;
+ int i;
+
+ for (i = 0; i < ITERATIONS_MAX; i++) {
+ sa_asyncq_item_t *i;
+
+ i = sa_asyncq_get(q);
+ sa_assert(i);
+
+ sa_asyncq_push(q, i);
+
+ if (rand() & 1)
+ sa_thread_yield();
+ }
+}
+
+int main(int argc, char *argv[]) {
+ int j;
+ sa_thread_t *t;
+ sa_asyncq_t q;
+ sa_asyncq_item_t *i;
+
+ sa_assert_success(sa_asyncq_init(&q));
+
+ t = sa_thread_new(thread, &q);
+ sa_assert(t);
+
+ for (j = 0; j < ITERATIONS_MAX; j++) {
+
+ do {
+ i = sa_asyncq_pop(&q, 0);
+ printf("%s ", i ? "gotcha" : "miss");
+
+ if (i)
+ sa_asyncq_recycle(&q, i);
+ } while (i);
+
+ if (rand() & 1)
+ sa_thread_yield();
+ }
+
+ printf("\n");
+
+ sa_thread_free(t);
+
+ while (sa_asyncq_pop(&q, 1))
+ ;
+
+ sa_asyncq_done(&q);
+
+
+ return 0;
+}