summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--dns.c103
-rw-r--r--dns.h14
-rw-r--r--iface.c56
-rw-r--r--iface.h12
-rw-r--r--psched.c287
-rw-r--r--psched.h48
-rw-r--r--rr.c10
-rw-r--r--rr.h2
-rw-r--r--server.c58
-rw-r--r--server.h7
-rw-r--r--socket.c18
-rw-r--r--timeeventq.c14
-rw-r--r--timeeventq.h5
-rw-r--r--util.c14
-rw-r--r--util.h2
16 files changed, 539 insertions, 113 deletions
diff --git a/Makefile b/Makefile
index 6dac8af..306bbfe 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ LIBS=$(shell pkg-config --libs glib-2.0)
all: flexmdns prioq-test
-flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o
+flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o
$(CC) -o $@ $^ $(LIBS)
#test-llist: test-llist.o
diff --git a/dns.c b/dns.c
index a787723..6824fd3 100644
--- a/dns.c
+++ b/dns.c
@@ -4,11 +4,35 @@
#include "dns.h"
-flxDnsPacket* flx_dns_packet_new(void) {
+flxDnsPacket* flx_dns_packet_new(guint max_size) {
flxDnsPacket *p;
- p = g_new(flxDnsPacket, 1);
- p->size = p->rindex = 2*6;
- memset(p->data, 0, p->size);
+
+ if (max_size <= 0)
+ max_size = FLX_DNS_PACKET_MAX_SIZE;
+ else if (max_size < FLX_DNS_PACKET_HEADER_SIZE)
+ max_size = FLX_DNS_PACKET_HEADER_SIZE;
+
+ p = g_malloc(sizeof(flxDnsPacket) + max_size);
+ p->size = p->rindex = FLX_DNS_PACKET_HEADER_SIZE;
+ p->max_size = max_size;
+
+ memset(FLX_DNS_PACKET_DATA(p), 0, p->size);
+ return p;
+}
+
+flxDnsPacket* flx_dns_packet_new_query(guint max_size) {
+ flxDnsPacket *p;
+
+ p = flx_dns_packet_new(max_size);
+ flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
+ return p;
+}
+
+flxDnsPacket* flx_dns_packet_new_response(guint max_size) {
+ flxDnsPacket *p;
+
+ p = flx_dns_packet_new(max_size);
+ flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(1, 0, 0, 0, 0, 0, 0, 0, 0, 0));
return p;
}
@@ -19,30 +43,35 @@ void flx_dns_packet_free(flxDnsPacket *p) {
void flx_dns_packet_set_field(flxDnsPacket *p, guint index, guint16 v) {
g_assert(p);
- g_assert(index < 2*6);
+ g_assert(index < FLX_DNS_PACKET_HEADER_SIZE);
- ((guint16*) p->data)[index] = g_htons(v);
+ ((guint16*) FLX_DNS_PACKET_DATA(p))[index] = g_htons(v);
}
guint16 flx_dns_packet_get_field(flxDnsPacket *p, guint index) {
g_assert(p);
- g_assert(index < 2*6);
+ g_assert(index < FLX_DNS_PACKET_HEADER_SIZE);
- return g_ntohs(((guint16*) p->data)[index]);
+ return g_ntohs(((guint16*) FLX_DNS_PACKET_DATA(p))[index]);
}
guint8* flx_dns_packet_append_name(flxDnsPacket *p, const gchar *name) {
guint8 *d, *f = NULL;
+ guint saved_size;
g_assert(p);
g_assert(name);
+ saved_size = p->size;
+
for (;;) {
guint n = strcspn(name, ".");
if (!n || n > 63)
- return NULL;
+ goto fail;
- d = flx_dns_packet_extend(p, n+1);
+ if (!(d = flx_dns_packet_extend(p, n+1)))
+ goto fail;
+
if (!f)
f = d;
d[0] = n;
@@ -61,28 +90,36 @@ guint8* flx_dns_packet_append_name(flxDnsPacket *p, const gchar *name) {
break;
}
- d = flx_dns_packet_extend(p, 1);
+ if (!(d = flx_dns_packet_extend(p, 1)))
+ goto fail;
+
d[0] = 0;
return f;
+
+fail:
+ p->size = saved_size;
+ return NULL;
}
guint8* flx_dns_packet_append_uint16(flxDnsPacket *p, guint16 v) {
guint8 *d;
-
g_assert(p);
- d = flx_dns_packet_extend(p, sizeof(guint16));
- *((guint16*) d) = g_htons(v);
+ if (!(d = flx_dns_packet_extend(p, sizeof(guint16))))
+ return NULL;
+ *((guint16*) d) = g_htons(v);
return d;
}
guint8 *flx_dns_packet_append_uint32(flxDnsPacket *p, guint32 v) {
guint8 *d;
-
g_assert(p);
- d = flx_dns_packet_extend(p, sizeof(guint32));
+
+ if (!(d = flx_dns_packet_extend(p, sizeof(guint32))))
+ return NULL;
+
*((guint32*) d) = g_htonl(v);
return d;
@@ -95,21 +132,22 @@ guint8 *flx_dns_packet_append_bytes(flxDnsPacket *p, gconstpointer b, guint l)
g_assert(b);
g_assert(l);
- d = flx_dns_packet_extend(p, l);
- g_assert(d);
- memcpy(d, b, l);
+ if (!(d = flx_dns_packet_extend(p, l)))
+ return NULL;
+ memcpy(d, b, l);
return d;
}
-
guint8 *flx_dns_packet_extend(flxDnsPacket *p, guint l) {
guint8 *d;
g_assert(p);
- g_assert(p->size+l <= sizeof(p->data));
- d = p->data + p->size;
+ if (p->size+l > p->max_size)
+ return NULL;
+
+ d = FLX_DNS_PACKET_DATA(p) + p->size;
p->size += l;
return d;
@@ -123,13 +161,14 @@ guint8 *flx_dns_packet_append_name_compressed(flxDnsPacket *p, const gchar *name
if (!prev)
return flx_dns_packet_append_name(p, name);
- k = prev - p->data;
+ k = prev - FLX_DNS_PACKET_DATA(p);
if (k < 0 || k >= 0x4000 || (guint) k >= p->size)
return flx_dns_packet_append_name(p, name);
- d = (guint16*) flx_dns_packet_extend(p, sizeof(guint16));
- *d = g_htons((0xC000 | k));
+ if (!(d = (guint16*) flx_dns_packet_extend(p, sizeof(guint16))))
+ return NULL;
+ *d = g_htons((0xC000 | k));
return prev;
}
@@ -166,7 +205,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint
if (index+1 > p->size)
return -1;
- n = p->data[index];
+ n = FLX_DNS_PACKET_DATA(p)[index];
if (!n) {
index++;
@@ -197,7 +236,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint
} else
first_label = 0;
- memcpy(ret_name, p->data + index, n);
+ memcpy(ret_name, FLX_DNS_PACKET_DATA(p) + index, n);
index += n;
ret_name += n;
l -= n;
@@ -210,7 +249,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint
if (index+2 > p->size)
return -1;
- index = ((guint) (p->data[index] & ~0xC0)) << 8 | p->data[index+1];
+ index = ((guint) (FLX_DNS_PACKET_DATA(p)[index] & ~0xC0)) << 8 | FLX_DNS_PACKET_DATA(p)[index+1];
if (!compressed)
ret += 2;
@@ -238,7 +277,7 @@ gint flx_dns_packet_consume_uint16(flxDnsPacket *p, guint16 *ret_v) {
if (p->rindex + sizeof(guint16) > p->size)
return -1;
- *ret_v = g_ntohs(*((guint16*) (p->data + p->rindex)));
+ *ret_v = g_ntohs(*((guint16*) (FLX_DNS_PACKET_DATA(p) + p->rindex)));
p->rindex += sizeof(guint16);
return 0;
@@ -251,7 +290,7 @@ gint flx_dns_packet_consume_uint32(flxDnsPacket *p, guint32 *ret_v) {
if (p->rindex + sizeof(guint32) > p->size)
return -1;
- *ret_v = g_ntohl(*((guint32*) (p->data + p->rindex)));
+ *ret_v = g_ntohl(*((guint32*) (FLX_DNS_PACKET_DATA(p) + p->rindex)));
p->rindex += sizeof(guint32);
return 0;
@@ -265,7 +304,7 @@ gint flx_dns_packet_consume_bytes(flxDnsPacket *p, gpointer ret_data, guint l) {
if (p->rindex + l > p->size)
return -1;
- memcpy(ret_data, p->data + p->rindex, l);
+ memcpy(ret_data, FLX_DNS_PACKET_DATA(p) + p->rindex, l);
p->rindex += l;
return 0;
@@ -277,7 +316,7 @@ gconstpointer flx_dns_packet_get_rptr(flxDnsPacket *p) {
if (p->rindex >= p->size)
return NULL;
- return p->data + p->rindex;
+ return FLX_DNS_PACKET_DATA(p) + p->rindex;
}
gint flx_dns_packet_skip(flxDnsPacket *p, guint length) {
diff --git a/dns.h b/dns.h
index bdb0f07..0b6e750 100644
--- a/dns.h
+++ b/dns.h
@@ -5,14 +5,20 @@
#include "rr.h"
-#define FLX_DNS_MAX_PACKET_SIZE 9000
+#define FLX_DNS_PACKET_MAX_SIZE 9000
+#define FLX_DNS_PACKET_HEADER_SIZE 12
typedef struct _flxDnsPacket {
- guint size, rindex;
- guint8 data[FLX_DNS_MAX_PACKET_SIZE];
+ guint size, rindex, max_size;
} flxDnsPacket;
-flxDnsPacket* flx_dns_packet_new(void);
+
+#define FLX_DNS_PACKET_DATA(p) (((guint8*) p) + sizeof(flxDnsPacket))
+
+flxDnsPacket* flx_dns_packet_new(guint size);
+flxDnsPacket* flx_dns_packet_new_query(guint size);
+flxDnsPacket* flx_dns_packet_new_response(guint size);
+
void flx_dns_packet_free(flxDnsPacket *p);
void flx_dns_packet_set_field(flxDnsPacket *p, guint index, guint16 v);
guint16 flx_dns_packet_get_field(flxDnsPacket *p, guint index);
diff --git a/iface.c b/iface.c
index 9acef47..c4bd0ff 100644
--- a/iface.c
+++ b/iface.c
@@ -58,6 +58,11 @@ static void free_interface(flxInterfaceMonitor *m, flxInterface *i) {
g_assert(m);
g_assert(i);
+ if (i->ipv4_scheduler)
+ flx_packet_scheduler_free(i->ipv4_scheduler);
+ if (i->ipv6_scheduler)
+ flx_packet_scheduler_free(i->ipv6_scheduler);
+
while (i->addresses)
free_address(m, i->addresses);
@@ -140,8 +145,11 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) {
i->n_ipv4_addrs = i->n_ipv6_addrs = 0;
FLX_LLIST_PREPEND(flxInterface, interface, m->interfaces, i);
g_hash_table_insert(m->hash_table, &i->index, i);
+ i->mtu = 1500;
i->ipv4_cache = flx_cache_new(m->server, i);
i->ipv6_cache = flx_cache_new(m->server, i);
+ i->ipv4_scheduler = flx_packet_scheduler_new(m->server, i, AF_INET);
+ i->ipv6_scheduler = flx_packet_scheduler_new(m->server, i, AF_INET6);
changed = 0;
}
@@ -157,6 +165,11 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) {
g_free(i->name);
i->name = g_strndup(RTA_DATA(a), RTA_PAYLOAD(a));
break;
+
+ case IFLA_MTU:
+ g_assert(RTA_PAYLOAD(a) == sizeof(unsigned int));
+ i->mtu = *((unsigned int*) RTA_DATA(a));
+ break;
default:
;
@@ -347,44 +360,47 @@ int flx_address_is_relevant(flxInterfaceAddress *a) {
void flx_interface_send_packet(flxInterface *i, guchar protocol, flxDnsPacket *p) {
g_assert(i);
g_assert(p);
+
+ if (!flx_interface_is_relevant(i))
+ return;
- if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0 && flx_interface_is_relevant(i)) {
+ if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0) {
g_message("sending on '%s':IPv4", i->name);
flx_send_dns_packet_ipv4(i->monitor->server->fd_ipv4, i->index, p);
}
- if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0 && flx_interface_is_relevant(i)) {
+ if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0) {
g_message("sending on '%s':IPv6", i->name);
flx_send_dns_packet_ipv6(i->monitor->server->fd_ipv6, i->index, p);
}
}
-void flx_interface_send_query(flxInterface *i, guchar protocol, flxKey *k) {
- flxDnsPacket *p;
-
+void flx_interface_post_query(flxInterface *i, guchar protocol, flxKey *k) {
g_assert(i);
g_assert(k);
- p = flx_dns_packet_new();
- flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
- flx_dns_packet_append_key(p, k);
- flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, 1);
- flx_interface_send_packet(i, protocol, p);
- flx_dns_packet_free(p);
+ if (!flx_interface_is_relevant(i))
+ return;
+
+ if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0)
+ flx_packet_scheduler_post_query(i->ipv4_scheduler, k);
+
+ if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0)
+ flx_packet_scheduler_post_query(i->ipv6_scheduler, k);
}
-void flx_interface_send_response(flxInterface *i, guchar protocol, flxRecord *rr) {
- flxDnsPacket *p;
-
+void flx_interface_post_response(flxInterface *i, guchar protocol, flxRecord *rr) {
g_assert(i);
g_assert(rr);
- p = flx_dns_packet_new();
- flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(1, 0, 0, 0, 0, 0, 0, 0, 0, 0));
- flx_dns_packet_append_record(p, rr, FALSE);
- flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, 1);
- flx_interface_send_packet(i, protocol, p);
- flx_dns_packet_free(p);
+ if (!flx_interface_is_relevant(i))
+ return;
+
+ if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0)
+ flx_packet_scheduler_post_response(i->ipv4_scheduler, rr);
+
+ if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0)
+ flx_packet_scheduler_post_response(i->ipv6_scheduler, rr);
}
void flx_dump_caches(flxServer *s, FILE *f) {
diff --git a/iface.h b/iface.h
index 2adab72..50cc98f 100644
--- a/iface.h
+++ b/iface.h
@@ -17,6 +17,8 @@ typedef struct _flxInterface flxInterface;
#include "netlink.h"
#include "cache.h"
#include "llist.h"
+#include "psched.h"
+#include "dns.h"
struct _flxInterfaceMonitor {
flxServer *server;
@@ -41,6 +43,10 @@ struct _flxInterface {
guint n_ipv6_addrs, n_ipv4_addrs;
flxCache *ipv4_cache, *ipv6_cache;
+
+ guint mtu;
+
+ flxPacketScheduler *ipv4_scheduler, *ipv6_scheduler;
};
struct _flxInterfaceAddress {
@@ -64,8 +70,10 @@ flxInterface* flx_interface_monitor_get_first(flxInterfaceMonitor *m);
int flx_interface_is_relevant(flxInterface *i);
int flx_address_is_relevant(flxInterfaceAddress *a);
-void flx_interface_send_query(flxInterface *i, guchar protocol, flxKey *k);
-void flx_interface_send_response(flxInterface *i, guchar protocol, flxRecord *rr);
+void flx_interface_send_packet(flxInterface *i, guchar protocol, flxDnsPacket *p);
+
+void flx_interface_post_query(flxInterface *i, guchar protocol, flxKey *k);
+void flx_interface_post_response(flxInterface *i, guchar protocol, flxRecord *rr);
void flx_dump_caches(flxServer *s, FILE *f);
diff --git a/psched.c b/psched.c
new file mode 100644
index 0000000..7b74053
--- /dev/null
+++ b/psched.c
@@ -0,0 +1,287 @@
+#include "util.h"
+#include "psched.h"
+
+flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol) {
+ flxPacketScheduler *s;
+
+ g_assert(server);
+ g_assert(i);
+
+ s = g_new(flxPacketScheduler, 1);
+ s->server = server;
+ s->interface = i;
+ s->protocol = protocol;
+
+ FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
+ FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
+
+ return s;
+}
+
+static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
+ g_assert(qj);
+
+ if (qj->time_event)
+ flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
+
+ FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
+
+ flx_key_unref(qj->key);
+ g_free(qj);
+}
+
+static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
+ g_assert(rj);
+
+ if (rj->time_event)
+ flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
+
+ FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
+
+ flx_record_unref(rj->record);
+ g_free(rj);
+}
+
+void flx_packet_scheduler_free(flxPacketScheduler *s) {
+ flxQueryJob *qj;
+ flxResponseJob *rj;
+ flxTimeEvent *e;
+
+ g_assert(s);
+
+ while ((qj = s->query_jobs))
+ query_job_free(s, qj);
+ while ((rj = s->response_jobs))
+ response_job_free(s, rj);
+
+ g_free(s);
+}
+
+static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
+ guint8 *d;
+
+ g_assert(s);
+ g_assert(p);
+ g_assert(qj);
+
+ if ((d = flx_dns_packet_append_key(p, qj->key))) {
+ GTimeVal tv;
+
+ qj->done = 1;
+
+ /* Drop query after 100ms from history */
+ flx_elapse_time(&tv, 100, 0);
+ flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
+ }
+
+ return d;
+}
+
+static void query_elapse(flxTimeEvent *e, gpointer data) {
+ flxQueryJob *qj = data;
+ flxPacketScheduler *s;
+ flxDnsPacket *p;
+ guint n;
+ guint8 *d;
+
+ g_assert(qj);
+ s = qj->scheduler;
+
+ if (qj->done) {
+ /* Lets remove it from the history */
+ query_job_free(s, qj);
+ return;
+ }
+
+ p = flx_dns_packet_new_query(s->interface->mtu - 200);
+ d = packet_add_query_job(s, p, qj);
+ g_assert(d);
+ n = 1;
+
+ /* Try to fill up packet with more queries, if available */
+ for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
+
+ if (qj->done)
+ continue;
+
+ if (!packet_add_query_job(s, p, qj))
+ break;
+
+ n++;
+ }
+
+ flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
+ flx_interface_send_packet(s->interface, s->protocol, p);
+ flx_dns_packet_free(p);
+}
+
+static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
+ flxQueryJob *qj;
+
+ g_assert(s);
+ g_assert(key);
+
+ for (qj = s->query_jobs; qj; qj = qj->jobs_next)
+ if (flx_key_equal(qj->key, key))
+ return qj;
+
+ return NULL;
+}
+
+void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) {
+ flxQueryJob *qj;
+ GTimeVal tv;
+
+ g_assert(s);
+ g_assert(key);
+
+ if (look_for_query(s, key))
+ return;
+
+ qj = g_new(flxQueryJob, 1);
+ qj->key = flx_key_ref(key);
+ qj->done = FALSE;
+
+ flx_elapse_time(&tv, 100, 0);
+ qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
+ qj->scheduler = s;
+
+ FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
+}
+
+static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
+ guint8 *d;
+
+ g_assert(s);
+ g_assert(p);
+ g_assert(rj);
+
+ if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
+ GTimeVal tv;
+
+ rj->done = 1;
+
+ /* Drop response after 1s from history */
+ flx_elapse_time(&tv, 1000, 0);
+ flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+ }
+
+ return d;
+}
+
+
+static void response_elapse(flxTimeEvent *e, gpointer data) {
+ flxResponseJob *rj = data;
+ flxPacketScheduler *s;
+ flxDnsPacket *p;
+ guint n;
+ guint8 *d;
+
+ g_assert(rj);
+ s = rj->scheduler;
+
+ if (rj->done) {
+ /* Lets remove it from the history */
+ response_job_free(s, rj);
+ return;
+ }
+
+ p = flx_dns_packet_new_response(s->interface->mtu - 200);
+ d = packet_add_response_job(s, p, rj);
+ g_assert(d);
+ n = 1;
+
+ /* Try to fill up packet with more responses, if available */
+ for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
+
+ if (rj->done)
+ continue;
+
+ if (!packet_add_response_job(s, p, rj))
+ break;
+
+ n++;
+ }
+
+ flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+ flx_interface_send_packet(s->interface, s->protocol, p);
+ flx_dns_packet_free(p);
+}
+
+static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
+ flxResponseJob *rj;
+
+ g_assert(s);
+ g_assert(record);
+
+ for (rj = s->response_jobs; rj; rj = rj->jobs_next)
+ if (flx_record_equal(rj->record, record))
+ return rj;
+
+ return NULL;
+}
+
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) {
+ flxResponseJob *rj;
+ GTimeVal tv;
+
+ g_assert(s);
+ g_assert(record);
+
+ if (look_for_response(s, record))
+ return;
+
+ rj = g_new(flxResponseJob, 1);
+ rj->record = flx_record_ref(record);
+ rj->done = FALSE;
+
+ flx_elapse_time(&tv, 20, 100);
+ rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
+ rj->scheduler = s;
+
+ FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
+}
+
+void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) {
+ flxQueryJob *qj;
+
+ g_assert(s);
+ g_assert(key);
+
+ for (qj = s->query_jobs; qj; qj = qj->jobs_next)
+ if (flx_key_equal(qj->key, key)) {
+
+ if (!qj->done) {
+ GTimeVal tv;
+ qj->done = TRUE;
+
+ /* Drop query after 100ms from history */
+ flx_elapse_time(&tv, 100, 0);
+ flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
+ }
+
+ break;
+ }
+}
+
+void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) {
+ flxResponseJob *rj;
+
+ g_assert(s);
+ g_assert(record);
+
+ for (rj = s->response_jobs; rj; rj = rj->jobs_next)
+ if (flx_record_equal(rj->record, record)) {
+
+ if (!rj->done) {
+ GTimeVal tv;
+ rj->done = TRUE;
+
+ /* Drop response after 100ms from history */
+ flx_elapse_time(&tv, 100, 0);
+ flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+ }
+
+ break;
+ }
+}
diff --git a/psched.h b/psched.h
new file mode 100644
index 0000000..90b67db
--- /dev/null
+++ b/psched.h
@@ -0,0 +1,48 @@
+#ifndef foopschedhfoo
+#define foopschedhfoo
+
+typedef struct _flxQueryJob flxQueryJob;
+typedef struct _flxResponseJob flxResponseJob;
+typedef struct _flxPacketScheduler flxPacketScheduler;
+
+#include "timeeventq.h"
+#include "rr.h"
+#include "llist.h"
+#include "iface.h"
+
+struct _flxQueryJob {
+ flxPacketScheduler *scheduler;
+ flxTimeEvent *time_event;
+ flxKey *key;
+ gboolean done;
+ FLX_LLIST_FIELDS(flxQueryJob, jobs);
+};
+
+struct _flxResponseJob {
+ flxPacketScheduler *scheduler;
+ flxTimeEvent *time_event;
+ flxRecord *record;
+ gboolean done;
+ FLX_LLIST_FIELDS(flxResponseJob, jobs);
+};
+
+struct _flxPacketScheduler {
+ flxServer *server;
+
+ flxInterface *interface;
+ guchar protocol;
+
+ FLX_LLIST_HEAD(flxQueryJob, query_jobs);
+ FLX_LLIST_HEAD(flxResponseJob, response_jobs);
+};
+
+flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol);
+void flx_packet_scheduler_free(flxPacketScheduler *s);
+
+void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key);
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record);
+
+void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key);
+void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record);
+
+#endif
diff --git a/rr.c b/rr.c
index 0153b28..5acc09b 100644
--- a/rr.c
+++ b/rr.c
@@ -164,3 +164,13 @@ guint flx_key_hash(const flxKey *k) {
return g_str_hash(k->name) + k->type + k->class;
}
+
+gboolean flx_record_equal(const flxRecord *a, const flxRecord *b) {
+ g_assert(a);
+ g_assert(b);
+
+ return flx_key_equal(a->key, b->key) &&
+ a->ttl == b->ttl &&
+ a->size == b->size &&
+ memcmp(a->data, b->data, a->size) == 0;
+}
diff --git a/rr.h b/rr.h
index 0e61a73..e3d3654 100644
--- a/rr.h
+++ b/rr.h
@@ -55,4 +55,6 @@ const gchar *flx_dns_type_to_string(guint16 type);
gchar *flx_key_to_string(flxKey *k); /* g_free() the result! */
gchar *flx_record_to_string(flxRecord *r); /* g_free() the result! */
+gboolean flx_record_equal(const flxRecord *a, const flxRecord *b);
+
#endif
diff --git a/server.c b/server.c
index 6027f5f..0f292a3 100644
--- a/server.c
+++ b/server.c
@@ -9,45 +9,33 @@
#include "iface.h"
#include "socket.h"
-static void post_response(flxServer *s, flxRecord *r, gint iface, const flxAddress *a) {
- flxInterface *i;
-
- g_assert(s);
- g_assert(r);
- g_assert(iface > 0);
- g_assert(a);
-
- if ((i = flx_interface_monitor_get_interface(s->monitor, iface)))
- flx_interface_send_response(i, a->family, r);
-
-}
-
-static void handle_query_key(flxServer *s, flxKey *k, gint iface, const flxAddress *a) {
+static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flxAddress *a) {
flxEntry *e;
g_assert(s);
g_assert(k);
+ g_assert(i);
g_assert(a);
for (e = g_hash_table_lookup(s->rrset_by_name, k); e; e = e->by_name_next) {
- if ((e->interface <= 0 || e->interface == iface) &&
+ if ((e->interface <= 0 || e->interface == i->index) &&
(e->protocol == AF_UNSPEC || e->protocol == a->family)) {
- post_response(s, e->record, iface, a);
+ flx_interface_post_response(i, a->family, e->record);
}
}
}
-static void handle_query(flxServer *s, flxDnsPacket *p, gint iface, const flxAddress *a) {
+static void handle_query(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) {
guint n;
g_assert(s);
g_assert(p);
+ g_assert(i);
g_assert(a);
for (n = flx_dns_packet_get_field(p, DNS_FIELD_QDCOUNT); n > 0; n --) {
-
flxKey *key;
if (!(key = flx_dns_packet_consume_key(p))) {
@@ -55,29 +43,31 @@ static void handle_query(flxServer *s, flxDnsPacket *p, gint iface, const flxAdd
return;
}
- handle_query_key(s, key, iface, a);
+ handle_query_key(s, key, i, a);
flx_key_unref(key);
}
}
-static void add_response_to_cache(flxCache *c, flxDnsPacket *p, const flxAddress *a) {
+static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) {
guint n;
- g_assert(c);
+ g_assert(s);
g_assert(p);
+ g_assert(i);
g_assert(a);
+
for (n = flx_dns_packet_get_field(p, DNS_FIELD_ANCOUNT); n > 0; n--) {
-
- flxRecord *rr;
+ flxRecord *record;
gboolean cache_flush = FALSE;
- if (!(rr = flx_dns_packet_consume_record(p, &cache_flush))) {
+ if (!(record = flx_dns_packet_consume_record(p, &cache_flush))) {
g_warning("Packet too short");
return;
}
- flx_cache_update(c, rr, cache_flush, a);
- flx_record_unref(rr);
+ flx_cache_update(a->family == AF_INET ? i->ipv4_cache : i->ipv6_cache, record, cache_flush, a);
+ flx_packet_scheduler_drop_response(a->family == AF_INET ? i->ipv4_scheduler : i->ipv6_scheduler, record);
+ flx_record_unref(record);
}
}
@@ -120,8 +110,8 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa,
return;
}
-
flx_address_from_sockaddr(sa, &a);
+
if (flx_dns_packet_is_query(p)) {
@@ -132,11 +122,9 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa,
return;
}
- handle_query(s, p, iface, &a);
+ handle_query(s, p, i, &a);
g_message("Handled query");
} else {
- flxCache *c;
-
if (flx_dns_packet_get_field(p, DNS_FIELD_QDCOUNT) != 0 ||
flx_dns_packet_get_field(p, DNS_FIELD_ANCOUNT) == 0 ||
flx_dns_packet_get_field(p, DNS_FIELD_NSCOUNT) != 0 ||
@@ -145,10 +133,8 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa,
return;
}
- c = a.family == AF_INET ? i->ipv4_cache : i->ipv6_cache;
- add_response_to_cache(c, p, &a);
-
- g_message("Handled responnse");
+ handle_response(s, p, i, &a);
+ g_message("Handled response");
}
}
@@ -528,7 +514,7 @@ void flx_server_send_query(flxServer *s, gint interface, guchar protocol, flxKey
flxInterface *i;
for (i = flx_interface_monitor_get_first(s->monitor); i; i = i->interface_next)
- flx_interface_send_query(i, protocol, k);
+ flx_interface_post_query(i, protocol, k);
} else {
flxInterface *i;
@@ -536,6 +522,6 @@ void flx_server_send_query(flxServer *s, gint interface, guchar protocol, flxKey
if (!(i = flx_interface_monitor_get_interface(s->monitor, interface)))
return;
- flx_interface_send_query(i, protocol, k);
+ flx_interface_post_query(i, protocol, k);
}
}
diff --git a/server.h b/server.h
index 63f2bf6..fbd5715 100644
--- a/server.h
+++ b/server.h
@@ -2,7 +2,6 @@
#define fooflxserverhfoo
typedef struct _flxEntry flxEntry;
-typedef struct _flxResponseJob flxResponseJob;
#include "flx.h"
#include "iface.h"
@@ -23,12 +22,6 @@ struct _flxEntry {
FLX_LLIST_FIELDS(flxEntry, by_id);
};
-struct _flxResponseJob {
- flxTimeEvent *time_event;
- flxRecord *record;
- FLX_LLIST_FIELDS(flxResponseJob, response);
-};
-
struct _flxServer {
GMainContext *context;
flxInterfaceMonitor *monitor;
diff --git a/socket.c b/socket.c
index 1887e1d..cc43f16 100644
--- a/socket.c
+++ b/socket.c
@@ -252,7 +252,7 @@ gint flx_send_dns_packet_ipv4(gint fd, gint interface, flxDnsPacket *p) {
mdns_mcast_group_ipv4(&sa);
memset(&io, 0, sizeof(io));
- io.iov_base = p->data;
+ io.iov_base = FLX_DNS_PACKET_DATA(p);
io.iov_len = p->size;
memset(cmsg_data, 0, sizeof(cmsg_data));
@@ -292,7 +292,7 @@ gint flx_send_dns_packet_ipv6(gint fd, gint interface, flxDnsPacket *p) {
mdns_mcast_group_ipv6(&sa);
memset(&io, 0, sizeof(io));
- io.iov_base = p->data;
+ io.iov_base = FLX_DNS_PACKET_DATA(p);
io.iov_len = p->size;
memset(cmsg_data, 0, sizeof(cmsg_data));
@@ -330,10 +330,10 @@ flxDnsPacket* flx_recv_dns_packet_ipv4(gint fd, struct sockaddr_in *ret_sa, gint
g_assert(ret_iface);
g_assert(ret_ttl);
- p = flx_dns_packet_new();
+ p = flx_dns_packet_new(0);
- io.iov_base = p->data;
- io.iov_len = sizeof(p->data);
+ io.iov_base = FLX_DNS_PACKET_DATA(p);
+ io.iov_len = p->max_size;
memset(&msg, 0, sizeof(msg));
msg.msg_name = ret_sa;
@@ -376,7 +376,7 @@ fail:
}
flxDnsPacket* flx_recv_dns_packet_ipv6(gint fd, struct sockaddr_in6 *ret_sa, gint *ret_iface, guint8* ret_ttl) {
- flxDnsPacket *p= NULL;
+ flxDnsPacket *p = NULL;
struct msghdr msg;
struct iovec io;
uint8_t aux[64];
@@ -389,10 +389,10 @@ flxDnsPacket* flx_recv_dns_packet_ipv6(gint fd, struct sockaddr_in6 *ret_sa, gin
g_assert(ret_iface);
g_assert(ret_ttl);
- p = flx_dns_packet_new();
+ p = flx_dns_packet_new(0);
- io.iov_base = p->data;
- io.iov_len = sizeof(p->data);
+ io.iov_base = FLX_DNS_PACKET_DATA(p);
+ io.iov_len = p->max_size;
memset(&msg, 0, sizeof(msg));
msg.msg_name = ret_sa;
diff --git a/timeeventq.c b/timeeventq.c
index b3dd897..0d4af97 100644
--- a/timeeventq.c
+++ b/timeeventq.c
@@ -133,7 +133,7 @@ void flx_time_event_queue_remove(flxTimeEventQueue *q, flxTimeEvent *e) {
g_free(e);
}
-void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval) {
+void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval) {
g_assert(q);
g_assert(e);
g_assert(e->queue == q);
@@ -142,3 +142,15 @@ void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal
flx_prio_queue_shuffle(q->prioq, e->node);
}
+
+flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q) {
+ g_assert(q);
+
+ return q->prioq->root ? q->prioq->root->data : NULL;
+}
+
+flxTimeEvent* flx_time_event_next(flxTimeEvent *e) {
+ g_assert(e);
+
+ return e->node->next->data;
+}
diff --git a/timeeventq.h b/timeeventq.h
index 8fd5c02..f663f1f 100644
--- a/timeeventq.h
+++ b/timeeventq.h
@@ -25,6 +25,9 @@ void flx_time_event_queue_free(flxTimeEventQueue *q);
flxTimeEvent* flx_time_event_queue_add(flxTimeEventQueue *q, const GTimeVal *timeval, void (*callback)(flxTimeEvent *e, void *userdata), void *userdata);
void flx_time_event_queue_remove(flxTimeEventQueue *q, flxTimeEvent *e);
-void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval);
+void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval);
+
+flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q);
+flxTimeEvent* flx_time_event_next(flxTimeEvent *e);
#endif
diff --git a/util.c b/util.c
index faf1ac0..b8128c4 100644
--- a/util.c
+++ b/util.c
@@ -99,3 +99,17 @@ gint flx_wait_for_write(gint fd) {
return 0;
}
+
+GTimeVal *flx_elapse_time(GTimeVal *tv, guint msec, guint jitter) {
+ g_assert(tv);
+
+ g_get_current_time(tv);
+
+ if (msec)
+ g_time_val_add(tv, msec*1000);
+
+ if (jitter)
+ g_time_val_add(tv, g_random_int_range(0, jitter) * 1000);
+
+ return tv;
+}
diff --git a/util.h b/util.h
index 78f86d2..517c2f1 100644
--- a/util.h
+++ b/util.h
@@ -13,4 +13,6 @@ gint flx_set_cloexec(gint fd);
gint flx_set_nonblock(gint fd);
gint flx_wait_for_write(gint fd);
+GTimeVal *flx_elapse_time(GTimeVal *tv, guint msec, guint jitter);
+
#endif