summaryrefslogtreecommitdiffstats
path: root/src/asyncq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/asyncq.c')
-rw-r--r--src/asyncq.c102
1 files changed, 102 insertions, 0 deletions
diff --git a/src/asyncq.c b/src/asyncq.c
new file mode 100644
index 0000000..54d7d5b
--- /dev/null
+++ b/src/asyncq.c
@@ -0,0 +1,102 @@
+#include "sydney.h"
+#include "asyncq.h"
+#include "malloc.h"
+
+int sa_asyncq_init(sa_asyncq_t *a, size_t item_size) {
+ sa_assert(a);
+
+ SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->items);
+ SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->unused);
+
+ a->last = NULL;
+ a->item_size = item_size;
+
+ if (!(a->mutex = sa_mutex_new(0)))
+ return SA_ERROR_OOM;
+
+ return SA_SUCCESS;
+}
+
+sa_asyncq_item_t *sa_asyncq_get(sa_asyncq_t *a) {
+ sa_assert(a);
+
+ sa_mutex_lock(a->mutex);
+
+ if ((i = a->unused))
+ SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i);
+
+ sa_mutex_unlock(a->mutex);
+
+ if (!i)
+ if (!(i = sa_malloc(SA_ALIGN(sa_asyncq_item_t) + a->item_size)))
+ return NULL;
+
+ return i;
+}
+
+void sa_asyncq_recycle(sa_asyncq_t *a, sa_asyncq_item_t *i) {
+ sa_assert(a);
+ sa_assert(i);
+
+ sa_mutex_lock(a->mutex);
+ SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->unused, i);
+ sa_mutex_unlock(a->mutex);
+}
+
+void sa_asyncq_push(sa_asyncq_t *a, sa_asyncq_item_t *i) {
+ sa_assert(a);
+ sa_assert(i);
+
+ sa_mutex_lock(a->mutex);
+
+ if (a->last)
+ SA_LLIST_INSERT_AFTER(sa_asyncq_item_t, items, a->items, a->last, i);
+ else
+ SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->items, i);
+
+ a->last = i;
+
+ sa_mutex_unlock(a->mutex);
+
+ return SA_SUCCESS;
+}
+
+sa_asyncq_item_t sa_asyncq_pop(sa_asyncq_t *a, int wait) {
+ sa_asyncq_item_t *i;
+
+ sa_assert(a);
+
+ if (wait)
+ sa_mutex_lock(a->mutex);
+ else
+ if (!sa_mutex_try_lock(a->mutex))
+ return NULL;
+
+ if ((i = a->items)) {
+ if (i == a->last)
+ a->last = NULL;
+
+ SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->items, i);
+ }
+
+ sa_mutex_unlock(a->mutex);
+
+ return i;
+}
+
+void sa_asyncq_done(sa_asyncq_t *a) {
+ sa_asyncq_item_t *i;
+ sa_assert(a);
+
+ /* The caller *must* pop all items from the queue before
+ * destructing us! */
+ sa_assert(!a->items);
+
+ if (a->mutex)
+ sa_mutex_free(a->mutex);
+
+ while ((i = a->unused)) {
+ SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i);
+ sa_free(i);
+ }
+}