summaryrefslogtreecommitdiffstats
path: root/asyncq.c
blob: 54d7d5be61878f25400433451d22579d8c2d50ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include "sydney.h"
#include "asyncq.h"
#include "malloc.h"

int sa_asyncq_init(sa_asyncq_t *a, size_t item_size) {
    sa_assert(a);

    SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->items);
    SA_LLIST_HEAD_INIT(sa_asyncq_item_t, a->unused);

    a->last = NULL;
    a->item_size = item_size;
    
    if (!(a->mutex = sa_mutex_new(0)))
        return SA_ERROR_OOM;

    return SA_SUCCESS;
}

sa_asyncq_item_t *sa_asyncq_get(sa_asyncq_t *a) {
    sa_assert(a);
    
    sa_mutex_lock(a->mutex);

    if ((i = a->unused))
        SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i);

    sa_mutex_unlock(a->mutex);

    if (!i)
        if (!(i = sa_malloc(SA_ALIGN(sa_asyncq_item_t) + a->item_size)))
            return NULL;

    return i;
}

void sa_asyncq_recycle(sa_asyncq_t *a, sa_asyncq_item_t *i) {
    sa_assert(a);
    sa_assert(i);

    sa_mutex_lock(a->mutex);
    SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->unused, i);
    sa_mutex_unlock(a->mutex);
}

void sa_asyncq_push(sa_asyncq_t *a, sa_asyncq_item_t *i) {
    sa_assert(a);
    sa_assert(i);

    sa_mutex_lock(a->mutex);

    if (a->last)
        SA_LLIST_INSERT_AFTER(sa_asyncq_item_t, items, a->items, a->last, i);
    else
        SA_LLIST_PREPEND(sa_asyncq_item_t, items, a->items, i);
    
    a->last = i;

    sa_mutex_unlock(a->mutex);

    return SA_SUCCESS;
}

sa_asyncq_item_t sa_asyncq_pop(sa_asyncq_t *a, int wait) {
    sa_asyncq_item_t *i;
    
    sa_assert(a);

    if (wait)
        sa_mutex_lock(a->mutex);
    else
        if (!sa_mutex_try_lock(a->mutex))
            return NULL;

    if ((i = a->items)) {
        if (i == a->last)
            a->last = NULL;
        
        SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->items, i);
    }
    
    sa_mutex_unlock(a->mutex);

    return i;
}

void sa_asyncq_done(sa_asyncq_t *a) {
    sa_asyncq_item_t *i;
    sa_assert(a);

    /* The caller *must* pop all items from the queue before
     * destructing us! */
    sa_assert(!a->items);
    
    if (a->mutex)
        sa_mutex_free(a->mutex);

    while ((i = a->unused)) {
        SA_LLIST_REMOVE(sa_asyncq_item_t, items, a->unused, i);
        sa_free(i);
    }
}