diff options
author | Lennart Poettering <lennart@poettering.net> | 2005-01-08 01:04:03 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2005-01-08 01:04:03 +0000 |
commit | 412278d54644f995a1d1eedf6da6db47634b2470 (patch) | |
tree | bd826be408dc3567e6f8b9b9a76acf17e9c52e7b /libasyncns/asyncns.c | |
parent | 1117b84689ee5337369b95c7b093be9acbed26ae (diff) |
pthread compat
git-svn-id: file:///home/lennart/svn/public/libasyncns/trunk@14 cc0fb855-19ed-0310-866e-8c1d96e4abae
Diffstat (limited to 'libasyncns/asyncns.c')
-rw-r--r-- | libasyncns/asyncns.c | 213 |
1 files changed, 139 insertions, 74 deletions
diff --git a/libasyncns/asyncns.c b/libasyncns/asyncns.c index d1ce584..6e05635 100644 --- a/libasyncns/asyncns.c +++ b/libasyncns/asyncns.c @@ -23,6 +23,8 @@ #include <config.h> #endif +/*#undef HAVE_PTHREAD */ + #include <assert.h> #include <fcntl.h> #include <signal.h> @@ -40,6 +42,10 @@ #include <sys/prctl.h> #endif +#if HAVE_PTHREAD +#include <pthread.h> +#endif + #include "asyncns.h" #define MAX_WORKERS 16 @@ -50,13 +56,26 @@ typedef enum { REQUEST_ADDRINFO, RESPONSE_ADDRINFO, REQUEST_NAMEINFO, - RESPONSE_NAMEINFO + RESPONSE_NAMEINFO, + REQUEST_TERMINATE } query_type_t; +enum { + REQUEST_RECV_FD = 0, + REQUEST_SEND_FD = 1, + RESPONSE_RECV_FD = 2, + RESPONSE_SEND_FD = 3 +}; + struct asyncns { - int in_fd, out_fd; + int fds[4]; +#ifndef HAVE_PTHREAD pid_t workers[MAX_WORKERS]; +#else + pthread_t workers[MAX_WORKERS]; +#endif + unsigned valid_workers; unsigned current_id, current_index; asyncns_query_t* queries[MAX_QUERIES]; @@ -146,7 +165,7 @@ static int fd_nonblock(int fd) { int i; assert(fd >= 0); - if ((i = fcntl(fd, F_GETFL)) < 0) + if ((i = fcntl(fd, F_GETFL, 0)) < 0) return -1; if (i & O_NONBLOCK) @@ -218,6 +237,9 @@ static int send_addrinfo_reply(int out_fd, unsigned id, int ret, struct addrinfo } } + if (ai) + freeaddrinfo(ai); + return send(out_fd, resp, resp->header.length, 0); } @@ -277,12 +299,9 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) { ret = getaddrinfo(node, service, ai_req->hints_is_null ? NULL : &ai, &result); - ret = send_addrinfo_reply(out_fd, req->id, ret, result); - - if (result) - freeaddrinfo(result); - return ret; + /* send_addrinfo_reply() frees result */ + return send_addrinfo_reply(out_fd, req->id, ret, result); } case REQUEST_NAMEINFO: { @@ -306,6 +325,11 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) { ret == 0 && ni_req->getserv ? servbuf : NULL); } + case REQUEST_TERMINATE: { + /* Quit */ + return -1; + } + default: ; } @@ -313,8 +337,9 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) { return 0; } -static int worker(int in_fd, int out_fd) { - int r = 0; +#ifndef HAVE_PTHREAD + +static int process_worker(int in_fd, int out_fd) { int have_death_sig = 0; assert(in_fd > 2); assert(out_fd > 2); @@ -372,7 +397,7 @@ static int worker(int in_fd, int out_fd) { FD_SET(in_fd, &fds); if (select(in_fd+1, &fds, NULL, NULL, &tv) < 0) - goto fail; + break; if (getppid() == 1) break; @@ -382,84 +407,106 @@ static int worker(int in_fd, int out_fd) { if (length < 0 && errno == EAGAIN) continue; - - goto fail; + + break; } if (handle_request(out_fd, (rheader_t*) buf, (size_t) length) < 0) - goto fail; + break; } -fail: - close(in_fd); close(out_fd); - return r; + return 0; } -asyncns_t* asyncns_new(int n_proc) { - asyncns_t *asyncns = NULL; - int fd1[2] = { -1, -1 }, fd2[2] = { -1, -1 }, p; - - assert(n_proc >= 1); +#else - if (!(asyncns = malloc(sizeof(asyncns_t)))) - goto fail; +static void* thread_worker(void *p) { + sigset_t fullset; + int *fds = p; + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - asyncns->in_fd = asyncns->out_fd = -1; - memset(asyncns->workers, 0, sizeof(asyncns->workers)); + /* No signals in this thread please */ + sigfillset(&fullset); + pthread_sigmask(SIG_BLOCK, &fullset, NULL); - if (socketpair(PF_UNIX, SOCK_DGRAM, 0, fd1) < 0) - goto fail; + for (;;) { + char buf[BUFSIZE]; + ssize_t length; - if (socketpair(PF_UNIX, SOCK_DGRAM, 0, fd2) < 0) - goto fail; + if ((length = recv(fds[REQUEST_RECV_FD], buf, sizeof(buf), 0)) <= 0) + break; - fd_cloexec(fd1[0]); - fd_cloexec(fd1[1]); - fd_cloexec(fd2[0]); - fd_cloexec(fd2[1]); + if (handle_request(fds[RESPONSE_SEND_FD], (rheader_t*) buf, (size_t) length) < 0) + break; + + } + + return NULL; +} + +#endif + +asyncns_t* asyncns_new(unsigned n_proc) { + asyncns_t *asyncns = NULL; + int i; + unsigned p; + assert(n_proc >= 1); if (n_proc > MAX_WORKERS) n_proc = MAX_WORKERS; - for (p = 0; p < n_proc; p++) { + if (!(asyncns = malloc(sizeof(asyncns_t)))) + goto fail; + + asyncns->valid_workers = 0; + + for (i = 0; i < 4; i++) + asyncns->fds[i] = -1; + + for (p = 0; p < MAX_QUERIES; p++) + asyncns->queries[p] = NULL; + + if (socketpair(PF_UNIX, SOCK_DGRAM, 0, asyncns->fds) < 0 || + socketpair(PF_UNIX, SOCK_DGRAM, 0, asyncns->fds+2) < 0) + goto fail; + + for (i = 0; i < 4; i++) + fd_cloexec(asyncns->fds[i]); + + for (asyncns->valid_workers = 0; asyncns->valid_workers < n_proc; asyncns->valid_workers++) { - if ((asyncns->workers[p] = fork()) < 0) +#ifndef HAVE_PTHREAD + if ((asyncns->workers[asyncns->valid_workers] = fork()) < 0) goto fail; - else if (asyncns->workers[p] == 0) { - close(fd1[0]); - close(fd2[1]); - _exit(worker(fd2[0], fd1[1])); + else if (asyncns->workers[asyncns->valid_workers] == 0) { + close(asyncns->fds[REQUEST_SEND_FD]); + close(asyncns->fds[RESPONSE_RECV_FD]); + _exit(process_worker(asyncns->fds[REQUEST_RECV_FD], asyncns->fds[RESPONSE_SEND_FD])); } +#else + if (pthread_create(&asyncns->workers[asyncns->valid_workers], NULL, thread_worker, asyncns->fds) != 0) + goto fail; +#endif } - close(fd2[0]); - close(fd1[1]); - asyncns->in_fd = fd1[0]; - asyncns->out_fd = fd2[1]; - +#ifndef HAVE_PTHREAD + close(asyncns->fds[REQUEST_RECV_FD]); + close(asyncns->fds[RESPONSE_SEND_FD]); + asyncns->fds[REQUEST_RECV_FD] = asyncns->fds[RESPONSE_SEND_FD] = -1; +#endif + asyncns->current_index = asyncns->current_id = 0; - - for (p = 0; p < MAX_QUERIES; p++) - asyncns->queries[p] = NULL; -/* memset(asyncns->queries, 0, sizeof(asyncns->queries)); */ - asyncns->done_head = asyncns->done_tail = NULL; asyncns->n_queries = 0; - fd_nonblock(asyncns->in_fd); + fd_nonblock(asyncns->fds[RESPONSE_RECV_FD]); return asyncns; fail: - - if (fd1[0] >= 0) close(fd1[0]); - if (fd1[1] >= 0) close(fd1[1]); - if (fd2[0] >= 0) close(fd2[0]); - if (fd2[1] >= 0) close(fd2[1]); - if (asyncns) asyncns_free(asyncns); @@ -467,20 +514,38 @@ fail: } void asyncns_free(asyncns_t *asyncns) { - int p; + unsigned p; + int i; + rheader_t req; assert(asyncns); + + req.type = REQUEST_TERMINATE; + req.length = sizeof(req); + req.id = 0; - if (asyncns->in_fd >= 0) - close(asyncns->in_fd); - if (asyncns->out_fd >= 0) - close(asyncns->out_fd); - - for (p = 0; p < MAX_WORKERS; p++) - if (asyncns->workers[p] >= 1) { - kill(asyncns->workers[p], SIGTERM); - waitpid(asyncns->workers[p], NULL, 0); - } + /* Send one termiantion packet for each worker */ + for (p = 0; p < asyncns->valid_workers; p++) + send(asyncns->fds[REQUEST_SEND_FD], &req, req.length, 0); + + /* No terminate them forcibly*/ + for (p = 0; p < asyncns->valid_workers; p++) { +#ifndef HAVE_PTHREAD + kill(asyncns->workers[p], SIGTERM); + waitpid(asyncns->workers[p], NULL, 0); +#else + pthread_cancel(asyncns->workers[p]); + pthread_join(asyncns->workers[p], NULL); +#endif + } + + /* Due to Solaris' broken thread cancelation we first send an + * termination request and then cancel th thread. */ + + for (i = 0; i < 4; i++) + if (asyncns->fds[i] >= 0) + close(asyncns->fds[i]); + for (p = 0; p < MAX_QUERIES; p++) if (asyncns->queries[p]) asyncns_cancel(asyncns, asyncns->queries[p]); @@ -491,7 +556,7 @@ void asyncns_free(asyncns_t *asyncns) { int asyncns_fd(asyncns_t *asyncns) { assert(asyncns); - return asyncns->in_fd; + return asyncns->fds[RESPONSE_RECV_FD]; } static asyncns_query_t *lookup_query(asyncns_t *asyncns, unsigned id) { @@ -651,7 +716,7 @@ int asyncns_wait(asyncns_t *asyncns, int block) { char buf[BUFSIZE]; ssize_t l; - if (((l = recv(asyncns->in_fd, buf, sizeof(buf), 0)) < 0)) { + if (((l = recv(asyncns->fds[RESPONSE_RECV_FD], buf, sizeof(buf), 0)) < 0)) { fd_set fds; if (errno != EAGAIN) @@ -661,9 +726,9 @@ int asyncns_wait(asyncns_t *asyncns, int block) { return 0; FD_ZERO(&fds); - FD_SET(asyncns->in_fd, &fds); + FD_SET(asyncns->fds[RESPONSE_RECV_FD], &fds); - if (select(asyncns->in_fd+1, &fds, NULL, NULL, NULL) < 0) + if (select(asyncns->fds[RESPONSE_RECV_FD]+1, &fds, NULL, NULL, NULL) < 0) return -1; continue; @@ -745,7 +810,7 @@ asyncns_query_t* asyncns_getaddrinfo(asyncns_t *asyncns, const char *node, const if (service) strcpy((char*) req + sizeof(addrinfo_request_t) + req->node_len, service); - if (send(asyncns->out_fd, req, req->header.length, 0) < 0) + if (send(asyncns->fds[REQUEST_SEND_FD], req, req->header.length, 0) < 0) goto fail; return q; @@ -803,7 +868,7 @@ asyncns_query_t* asyncns_getnameinfo(asyncns_t *asyncns, const struct sockaddr * memcpy((uint8_t*) req + sizeof(nameinfo_request_t), sa, salen); - if (send(asyncns->out_fd, req, req->header.length, 0) < 0) + if (send(asyncns->fds[REQUEST_SEND_FD], req, req->header.length, 0) < 0) goto fail; return q; |