summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--cache.c128
-rw-r--r--cache.h3
-rw-r--r--iface.c30
-rw-r--r--iface.h4
-rw-r--r--main.c24
-rw-r--r--psched.c4
-rw-r--r--rr.c24
-rw-r--r--rr.h2
-rw-r--r--server.c87
-rw-r--r--server.h5
-rw-r--r--subscribe.c103
-rw-r--r--subscribe.h39
13 files changed, 338 insertions, 117 deletions
diff --git a/Makefile b/Makefile
index 2731696..6798fe7 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ LIBS=$(shell pkg-config --libs glib-2.0)
all: flexmdns prioq-test
-flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o announce.o
+flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o announce.o subscribe.o
$(CC) -o $@ $^ $(LIBS)
#test-llist: test-llist.o
diff --git a/cache.c b/cache.c
index b668d33..9d9590b 100644
--- a/cache.c
+++ b/cache.c
@@ -6,7 +6,7 @@ static void remove_entry(flxCache *c, flxCacheEntry *e, gboolean remove_from_has
g_assert(c);
g_assert(e);
- g_message("remvoin from cache: %p %p", c, e);
+ g_message("removing from cache: %p %p", c, e);
if (remove_from_hash_table) {
flxCacheEntry *t;
@@ -18,10 +18,12 @@ static void remove_entry(flxCache *c, flxCacheEntry *e, gboolean remove_from_has
g_hash_table_remove(c->hash_table, e->record->key);
}
- flx_record_unref(e->record);
-
if (e->time_event)
flx_time_event_queue_remove(c->server->time_event_queue, e->time_event);
+
+ flx_subscription_notify(c->server, c->interface, e->record, FLX_SUBSCRIPTION_REMOVE);
+
+ flx_record_unref(e->record);
g_free(e);
}
@@ -71,7 +73,7 @@ flxCacheEntry *flx_cache_lookup_record(flxCache *c, flxRecord *r) {
g_assert(r);
for (e = flx_cache_lookup_key(c, r->key); e; e = e->by_name_next)
- if (e->record->size == r->size && !memcmp(e->record->data, r->data, r->size))
+ if (flx_record_equal_no_ttl(e->record, r))
return e;
return NULL;
@@ -127,6 +129,16 @@ static void elapse_func(flxTimeEvent *t, void *userdata) {
}
}
+static void update_time_event(flxCache *c, flxCacheEntry *e) {
+ g_assert(c);
+ g_assert(e);
+
+ if (e->time_event)
+ flx_time_event_queue_update(c->server->time_event_queue, e->time_event, &e->expiry);
+ else
+ e->time_event = flx_time_event_queue_add(c->server->time_event_queue, &e->expiry, elapse_func, e);
+}
+
static void next_expiry(flxCache *c, flxCacheEntry *e, guint percent) {
gulong usec;
@@ -142,14 +154,10 @@ static void next_expiry(flxCache *c, flxCacheEntry *e, guint percent) {
usec = g_random_int_range(usec*percent, usec*(percent+2));
g_time_val_add(&e->expiry, usec);
-
- if (e->time_event)
- flx_time_event_queue_update(c->server->time_event_queue, e->time_event, &e->expiry);
- else
- e->time_event = flx_time_event_queue_add(c->server->time_event_queue, &e->expiry, elapse_func, e);
+ update_time_event(c, e);
}
-flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a) {
+void flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a) {
flxCacheEntry *e, *t;
gchar *txt;
@@ -159,55 +167,77 @@ flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, cons
g_message("cache update: %s", (txt = flx_record_to_string(r)));
g_free(txt);
- if ((t = e = flx_cache_lookup_key(c, r->key))) {
+ if (r->ttl == 0) {
-/* g_message("found prev cache entry"); */
+ /* This is a goodbye request */
- if (unique) {
- /* Drop all entries but the first which we replace */
- while (e->by_name_next)
- remove_entry(c, e->by_name_next, TRUE);
+ if ((e = flx_cache_lookup_record(c, r))) {
- } else {
- /* Look for exactly the same entry */
- for (; e; e = e->by_name_next)
- if (flx_record_equal(e->record, r))
- break;
+ e->state = FLX_CACHE_FINAL;
+ g_get_current_time(&e->timestamp);
+ e->expiry = e->timestamp;
+ g_time_val_add(&e->expiry, 1000000); /* 1s */
+ update_time_event(c, e);
}
- }
-
- if (e) {
-/* g_message("found matching cache entry"); */
+ } else {
- /* We are the first in the linked list so let's replace the hash table key with the new one */
- if (e->by_name_prev == NULL)
- g_hash_table_replace(c->hash_table, r->key, e);
-
- /* Update the record */
- flx_record_unref(e->record);
- e->record = flx_record_ref(r);
+ /* This is an update request */
+ if ((t = e = flx_cache_lookup_key(c, r->key))) {
- } else {
- /* No entry found, therefore we create a new one */
-
+ if (unique) {
+
+ /* For unique records, remove all entries but one */
+ while (e->by_name_next)
+ remove_entry(c, e->by_name_next, TRUE);
+
+ } else {
+
+ /* For non-unique record, look for exactly the same entry */
+ for (; e; e = e->by_name_next)
+ if (flx_record_equal_no_ttl(e->record, r))
+ break;
+ }
+ }
+
+ if (e) {
+
+/* g_message("found matching cache entry"); */
+
+ /* We are the first in the linked list so let's replace the hash table key with the new one */
+ if (e->by_name_prev == NULL)
+ g_hash_table_replace(c->hash_table, r->key, e);
+
+ /* Notify subscribers */
+ if (!flx_record_equal_no_ttl(e->record, r))
+ flx_subscription_notify(c->server, c->interface, r, FLX_SUBSCRIPTION_CHANGE);
+
+ /* Update the record */
+ flx_record_unref(e->record);
+ e->record = flx_record_ref(r);
+
+ } else {
+ /* No entry found, therefore we create a new one */
+
/* g_message("couldn't find matching cache entry"); */
+
+ e = g_new(flxCacheEntry, 1);
+ e->cache = c;
+ e->time_event = NULL;
+ e->record = flx_record_ref(r);
+ FLX_LLIST_PREPEND(flxCacheEntry, by_name, t, e);
+ g_hash_table_replace(c->hash_table, e->record->key, t);
+
+ /* Notify subscribers */
+ flx_subscription_notify(c->server, c->interface, e->record, FLX_SUBSCRIPTION_NEW);
+ }
- e = g_new(flxCacheEntry, 1);
- e->cache = c;
- e->time_event = NULL;
- e->record = flx_record_ref(r);
- FLX_LLIST_PREPEND(flxCacheEntry, by_name, t, e);
- g_hash_table_replace(c->hash_table, e->record->key, t);
- }
-
- e->origin = *a;
- g_get_current_time(&e->timestamp);
- next_expiry(c, e, 80);
- e->state = FLX_CACHE_VALID;
-
- return e;
+ e->origin = *a;
+ g_get_current_time(&e->timestamp);
+ next_expiry(c, e, 80);
+ e->state = FLX_CACHE_VALID;
+ }
}
void flx_cache_drop_key(flxCache *c, flxKey *k) {
diff --git a/cache.h b/cache.h
index f51e18e..e7c8e7e 100644
--- a/cache.h
+++ b/cache.h
@@ -33,7 +33,6 @@ struct flxCacheEntry {
flxTimeEvent *time_event;
FLX_LLIST_FIELDS(flxCacheEntry, by_name);
-
};
struct _flxCache {
@@ -50,7 +49,7 @@ void flx_cache_free(flxCache *c);
flxCacheEntry *flx_cache_lookup_key(flxCache *c, flxKey *k);
flxCacheEntry *flx_cache_lookup_record(flxCache *c, flxRecord *r);
-flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a);
+void flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a);
void flx_cache_drop_key(flxCache *c, flxKey *k);
void flx_cache_drop_record(flxCache *c, flxRecord *r);
diff --git a/iface.c b/iface.c
index 5e6c94a..a7d210f 100644
--- a/iface.c
+++ b/iface.c
@@ -489,3 +489,33 @@ gboolean flx_interface_match(flxInterface *i, gint index, guchar protocol) {
return TRUE;
}
+
+void flx_interface_monitor_walk(flxInterfaceMonitor *m, gint interface, guchar protocol, flxInterfaceMonitorWalkCallback callback, gpointer userdata) {
+ g_assert(m);
+ g_assert(callback);
+
+ if (interface > 0) {
+ if (protocol != AF_UNSPEC) {
+ flxInterface *i;
+
+ if ((i = flx_interface_monitor_get_interface(m, interface, protocol)))
+ callback(m, i, userdata);
+
+ } else {
+ flxHwInterface *hw;
+ flxInterface *i;
+
+ if ((hw = flx_interface_monitor_get_hw_interface(m, interface)))
+ for (i = hw->interfaces; i; i = i->by_hardware_next)
+ if (flx_interface_match(i, interface, protocol))
+ callback(m, i, userdata);
+ }
+
+ } else {
+ flxInterface *i;
+
+ for (i = m->interfaces; i; i = i->interface_next)
+ if (flx_interface_match(i, interface, protocol))
+ callback(m, i, userdata);
+ }
+}
diff --git a/iface.h b/iface.h
index 85e535c..b5c2708 100644
--- a/iface.h
+++ b/iface.h
@@ -92,4 +92,8 @@ gboolean flx_interface_address_relevant(flxInterfaceAddress *a);
gboolean flx_interface_match(flxInterface *i, gint index, guchar protocol);
+typedef void (*flxInterfaceMonitorWalkCallback)(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata);
+
+void flx_interface_monitor_walk(flxInterfaceMonitor *m, gint index, guchar protocol, flxInterfaceMonitorWalkCallback callback, gpointer userdata);
+
#endif
diff --git a/main.c b/main.c
index 47f3cd7..367dcd3 100644
--- a/main.c
+++ b/main.c
@@ -4,6 +4,7 @@
#include "flx.h"
#include "server.h"
+#include "subscribe.h"
static gboolean quit_timeout(gpointer data) {
g_main_loop_quit(data);
@@ -31,15 +32,36 @@ static gboolean dump_timeout(gpointer data) {
return TRUE;
}
+static void subscription(flxSubscription *s, flxRecord *r, gint interface, guchar protocol, flxSubscriptionEvent event, gpointer userdata) {
+ gchar *t;
+
+ g_assert(s);
+ g_assert(r);
+ g_assert(interface > 0);
+ g_assert(protocol != AF_UNSPEC);
+
+ g_message("SUBSCRIPTION: record [%s] on %i.%i is %s", t = flx_record_to_string(r), interface, protocol,
+ event == FLX_SUBSCRIPTION_NEW ? "new" : (event == FLX_SUBSCRIPTION_CHANGE ? "changed" : "removed"));
+
+ g_free(t);
+}
+
+
int main(int argc, char *argv[]) {
flxServer *flx;
gchar *r;
GMainLoop *loop = NULL;
+ flxSubscription *s;
+ flxKey *k;
flx = flx_server_new(NULL);
flx_server_add_text(flx, 0, 0, AF_UNSPEC, FALSE, NULL, "hallo");
+ k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR);
+ s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL);
+ flx_key_unref(k);
+
loop = g_main_loop_new(NULL, FALSE);
g_timeout_add(1000*60, quit_timeout, loop);
@@ -50,7 +72,7 @@ int main(int argc, char *argv[]) {
g_main_loop_unref(loop);
-
+ flx_subscription_free(s);
flx_server_free(flx);
return 0;
diff --git a/psched.c b/psched.c
index ebae9d3..db72ec3 100644
--- a/psched.c
+++ b/psched.c
@@ -214,7 +214,7 @@ static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *recor
g_assert(record);
for (rj = s->response_jobs; rj; rj = rj->jobs_next)
- if (flx_record_equal(rj->record, record))
+ if (flx_record_equal_no_ttl(rj->record, record))
return rj;
return NULL;
@@ -270,7 +270,7 @@ void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record
g_assert(record);
for (rj = s->response_jobs; rj; rj = rj->jobs_next)
- if (flx_record_equal(rj->record, record)) {
+ if (flx_record_equal_no_ttl(rj->record, record)) {
if (!rj->done) {
GTimeVal tv;
diff --git a/rr.c b/rr.c
index c984934..4108c54 100644
--- a/rr.c
+++ b/rr.c
@@ -50,12 +50,11 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t
g_assert(k);
g_assert(data);
- g_assert(size > 0);
r = g_new(flxRecord, 1);
r->ref = 1;
r->key = flx_key_ref(k);
- r->data = g_memdup(data, size);
+ r->data = size > 0 ? g_memdup(data, size) : NULL;
r->size = size;
r->ttl = ttl;
@@ -154,10 +153,17 @@ gchar *flx_record_to_string(flxRecord *r) {
}
case FLX_DNS_TYPE_TXT: {
- g_assert(((guchar*) r->data)[0] == r->size-1);
- memcpy(t, r->data+1, ((guchar*) r->data)[0]);
- t[((guchar*) r->data)[0]] = 0;
+ if (r->size == 0)
+ t[0] = 0;
+ else {
+ guchar l = ((guchar*) r->data)[0];
+
+ if ((size_t) l+1 <= r->size) {
+ memcpy(t, r->data+1, ((guchar*) r->data)[0]);
+ t[((guchar*) r->data)[0]] = 0;
+ }
+ }
break;
}
@@ -203,7 +209,7 @@ gchar *flx_record_to_string(flxRecord *r) {
}
p = flx_key_to_string(r->key);
- s = g_strdup_printf("[%s %s ; ttl=%u]", p, t, r->ttl);
+ s = g_strdup_printf("%s %s ; ttl=%u", p, t, r->ttl);
g_free(p);
return s;
@@ -224,12 +230,12 @@ guint flx_key_hash(const flxKey *k) {
return g_str_hash(k->name) + k->type + k->class;
}
-gboolean flx_record_equal(const flxRecord *a, const flxRecord *b) {
+gboolean flx_record_equal_no_ttl(const flxRecord *a, const flxRecord *b) {
g_assert(a);
g_assert(b);
return flx_key_equal(a->key, b->key) &&
- a->ttl == b->ttl &&
+/* a->ttl == b->ttl && */
a->size == b->size &&
- memcmp(a->data, b->data, a->size) == 0;
+ (a->size == 0 || memcmp(a->data, b->data, a->size) == 0);
}
diff --git a/rr.h b/rr.h
index a31b632..f17c36f 100644
--- a/rr.h
+++ b/rr.h
@@ -56,6 +56,6 @@ const gchar *flx_dns_type_to_string(guint16 type);
gchar *flx_key_to_string(flxKey *k); /* g_free() the result! */
gchar *flx_record_to_string(flxRecord *r); /* g_free() the result! */
-gboolean flx_record_equal(const flxRecord *a, const flxRecord *b);
+gboolean flx_record_equal_no_ttl(const flxRecord *a, const flxRecord *b);
#endif
diff --git a/server.c b/server.c
index 0aaaed1..94c7b1d 100644
--- a/server.c
+++ b/server.c
@@ -8,7 +8,7 @@
#include "util.h"
#include "iface.h"
#include "socket.h"
-
+#include "subscribe.h"
static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flxAddress *a) {
flxServerEntry *e;
@@ -71,7 +71,9 @@ static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, cons
g_free(txt);
flx_cache_update(i->cache, record, cache_flush, a);
- flx_packet_scheduler_drop_response(i->scheduler, record);
+
+ if (record->ttl != 0)
+ flx_packet_scheduler_drop_response(i->scheduler, record);
flx_record_unref(record);
}
}
@@ -258,10 +260,13 @@ flxServer *flx_server_new(GMainContext *c) {
s->context = g_main_context_default();
s->current_id = 1;
+
+ FLX_LLIST_HEAD_INIT(flxServerEntry, s->entries);
s->rrset_by_id = g_hash_table_new(g_int_hash, g_int_equal);
s->rrset_by_key = g_hash_table_new((GHashFunc) flx_key_hash, (GEqualFunc) flx_key_equal);
- FLX_LLIST_HEAD_INIT(flxServerEntry, s->entries);
+ FLX_LLIST_HEAD_INIT(flxSubscription, s->subscriptions);
+ s->subscription_hashtable = g_hash_table_new((GHashFunc) flx_key_hash, (GEqualFunc) flx_key_equal);
s->monitor = flx_interface_monitor_new(s);
s->time_event_queue = flx_time_event_queue_new(s->context);
@@ -300,6 +305,10 @@ void flx_server_free(flxServer* s) {
flx_interface_monitor_free(s->monitor);
flx_server_remove(s, 0);
+
+ while (s->subscriptions)
+ flx_subscription_free(s->subscriptions);
+ g_hash_table_destroy(s->subscription_hashtable);
g_hash_table_destroy(s->rrset_by_id);
g_hash_table_destroy(s->rrset_by_key);
@@ -532,60 +541,36 @@ void flx_server_add_text(
flx_server_add_full(s, id, interface, protocol, unique, name, FLX_DNS_CLASS_IN, FLX_DNS_TYPE_TXT, buf, l+1, FLX_DEFAULT_TTL);
}
+static void post_query_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+ flxKey *k = userdata;
+
+ g_assert(m);
+ g_assert(i);
+ g_assert(k);
+
+ flx_interface_post_query(i, k);
+}
+
void flx_server_post_query(flxServer *s, gint interface, guchar protocol, flxKey *key) {
g_assert(s);
g_assert(key);
-
- if (interface > 0) {
- if (protocol != AF_UNSPEC) {
- flxInterface *i;
-
- if ((i = flx_interface_monitor_get_interface(s->monitor, interface, protocol)))
- flx_interface_post_query(i, key);
- } else {
- flxHwInterface *hw;
- flxInterface *i;
-
- if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, interface)))
- for (i = hw->interfaces; i; i = i->by_hardware_next)
- if (flx_interface_match(i, interface, protocol))
- flx_interface_post_query(i, key);
- }
-
- } else {
- flxInterface *i;
-
- for (i = s->monitor->interfaces; i; i = i->interface_next)
- if (flx_interface_match(i, interface, protocol))
- flx_interface_post_query(i, key);
- }
+
+ flx_interface_monitor_walk(s->monitor, interface, protocol, post_query_callback, key);
+}
+
+static void post_response_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+ flxRecord *r = userdata;
+
+ g_assert(m);
+ g_assert(i);
+ g_assert(r);
+
+ flx_interface_post_response(i, r);
}
void flx_server_post_response(flxServer *s, gint interface, guchar protocol, flxRecord *record) {
g_assert(s);
g_assert(record);
-
- if (interface > 0) {
- if (protocol != AF_UNSPEC) {
- flxInterface *i;
-
- if ((i = flx_interface_monitor_get_interface(s->monitor, interface, protocol)))
- flx_interface_post_response(i, record);
- } else {
- flxHwInterface *hw;
- flxInterface *i;
-
- if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, interface)))
- for (i = hw->interfaces; i; i = i->by_hardware_next)
- if (flx_interface_match(i, interface, protocol))
- flx_interface_post_response(i, record);
- }
-
- } else {
- flxInterface *i;
-
- for (i = s->monitor->interfaces; i; i = i->interface_next)
- if (flx_interface_match(i, interface, protocol))
- flx_interface_post_response(i, record);
- }
+
+ flx_interface_monitor_walk(s->monitor, interface, protocol, post_response_callback, record);
}
diff --git a/server.h b/server.h
index b7addf6..d050bcc 100644
--- a/server.h
+++ b/server.h
@@ -9,6 +9,7 @@ typedef struct _flxServerEntry flxServerEntry;
#include "llist.h"
#include "timeeventq.h"
#include "announce.h"
+#include "subscribe.h"
struct _flxServerEntry {
flxRecord *record;
@@ -31,10 +32,12 @@ struct _flxServer {
gint current_id;
+ FLX_LLIST_HEAD(flxServerEntry, entries);
GHashTable *rrset_by_id;
GHashTable *rrset_by_key;
- FLX_LLIST_HEAD(flxServerEntry, entries);
+ FLX_LLIST_HEAD(flxSubscription, subscriptions);
+ GHashTable *subscription_hashtable;
flxTimeEventQueue *time_event_queue;
diff --git a/subscribe.c b/subscribe.c
new file mode 100644
index 0000000..3abe464
--- /dev/null
+++ b/subscribe.c
@@ -0,0 +1,103 @@
+#include "subscribe.h"
+#include "util.h"
+
+static void elapse(flxTimeEvent *e, void *userdata) {
+ flxSubscription *s = userdata;
+ GTimeVal tv;
+ gchar *t;
+
+ g_assert(s);
+
+ flx_server_post_query(s->server, s->interface, s->protocol, s->key);
+
+ if (s->n_query++ <= 8)
+ s->sec_delay *= 2;
+
+ g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key));
+ g_free(t);
+
+
+ flx_elapse_time(&tv, s->sec_delay*1000, 0);
+ flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv);
+}
+
+static void scan_cache_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
+ flxSubscription *s = userdata;
+ flxCacheEntry *e;
+
+ g_assert(m);
+ g_assert(i);
+ g_assert(s);
+
+ for (e = flx_cache_lookup_key(i->cache, s->key); e; e = e->by_name_next)
+ s->callback(s, e->record, i->hardware->index, i->protocol, FLX_SUBSCRIPTION_NEW, s->userdata);
+}
+
+flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata) {
+ flxSubscription *s, *t;
+ GTimeVal tv;
+
+ g_assert(server);
+ g_assert(key);
+ g_assert(callback);
+
+ s = g_new(flxSubscription, 1);
+ s->server = server;
+ s->key = flx_key_ref(key);
+ s->interface = interface;
+ s->protocol = protocol;
+ s->callback = callback;
+ s->userdata = userdata;
+ s->n_query = 1;
+ s->sec_delay = 1;
+
+ flx_server_post_query(s->server, s->interface, s->protocol, s->key);
+
+ flx_elapse_time(&tv, s->sec_delay*1000, 0);
+ s->time_event = flx_time_event_queue_add(server->time_event_queue, &tv, elapse, s);
+
+ FLX_LLIST_PREPEND(flxSubscription, subscriptions, server->subscriptions, s);
+
+ /* Add the new entry to the subscription hash table */
+ t = g_hash_table_lookup(server->subscription_hashtable, key);
+ FLX_LLIST_PREPEND(flxSubscription, by_key, t, s);
+ g_hash_table_replace(server->subscription_hashtable, key, t);
+
+ /* Scan the caches */
+ flx_interface_monitor_walk(s->server->monitor, s->interface, s->protocol, scan_cache_callback, s);
+
+ return s;
+}
+
+void flx_subscription_free(flxSubscription *s) {
+ flxSubscription *t;
+
+ g_assert(s);
+
+ FLX_LLIST_REMOVE(flxSubscription, subscriptions, s->server->subscriptions, s);
+
+ t = g_hash_table_lookup(s->server->subscription_hashtable, s->key);
+ FLX_LLIST_REMOVE(flxSubscription, by_key, t, s);
+ if (t)
+ g_hash_table_replace(s->server->subscription_hashtable, t->key, t);
+ else
+ g_hash_table_remove(s->server->subscription_hashtable, s->key);
+
+ flx_time_event_queue_remove(s->server->time_event_queue, s->time_event);
+ flx_key_unref(s->key);
+
+
+ g_free(s);
+}
+
+void flx_subscription_notify(flxServer *server, flxInterface *i, flxRecord *record, flxSubscriptionEvent event) {
+ flxSubscription *s;
+
+ g_assert(server);
+ g_assert(record);
+
+ for (s = g_hash_table_lookup(server->subscription_hashtable, record->key); s; s = s->by_key_next)
+ if (flx_interface_match(i, s->interface, s->protocol))
+ s->callback(s, record, i->hardware->index, i->protocol, event, s->userdata);
+
+}
diff --git a/subscribe.h b/subscribe.h
new file mode 100644
index 0000000..75818c7
--- /dev/null
+++ b/subscribe.h
@@ -0,0 +1,39 @@
+#ifndef foosubscribehfoo
+#define foosubscribehfoo
+
+typedef struct _flxSubscription flxSubscription;
+
+#include "llist.h"
+#include "server.h"
+
+typedef enum {
+ FLX_SUBSCRIPTION_NEW,
+ FLX_SUBSCRIPTION_REMOVE,
+ FLX_SUBSCRIPTION_CHANGE
+} flxSubscriptionEvent;
+
+typedef void (*flxSubscriptionCallback)(flxSubscription *s, flxRecord *record, gint interface, guchar protocol, flxSubscriptionEvent event, gpointer userdata);
+
+struct _flxSubscription {
+ flxServer *server;
+ flxKey *key;
+ gint interface;
+ guchar protocol;
+ gint n_query;
+ guint sec_delay;
+
+ flxTimeEvent *time_event;
+
+ flxSubscriptionCallback callback;
+ gpointer userdata;
+
+ FLX_LLIST_FIELDS(flxSubscription, subscriptions);
+ FLX_LLIST_FIELDS(flxSubscription, by_key);
+};
+
+flxSubscription *flx_subscription_new(flxServer *s, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata);
+void flx_subscription_free(flxSubscription *s);
+
+void flx_subscription_notify(flxServer *s, flxInterface *i, flxRecord *record, flxSubscriptionEvent event);
+
+#endif