diff options
Diffstat (limited to 'libasyncns')
| -rw-r--r-- | libasyncns/Makefile.am | 8 | ||||
| -rw-r--r-- | libasyncns/asyncns-test.c | 2 | ||||
| -rw-r--r-- | libasyncns/asyncns.c | 213 | ||||
| -rw-r--r-- | libasyncns/asyncns.h | 2 | 
4 files changed, 147 insertions, 78 deletions
| diff --git a/libasyncns/Makefile.am b/libasyncns/Makefile.am index 164439c..568d3fa 100644 --- a/libasyncns/Makefile.am +++ b/libasyncns/Makefile.am @@ -17,13 +17,17 @@  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  # USA. -AM_CFLAGS=-D_GNU_SOURCE -D__EXTENSIONS__  +AM_CFLAGS=-D_GNU_SOURCE -D__EXTENSIONS__ $(PTHREAD_CFLAGS) +AM_LIBADD=$(PTHREAD_LIBS) +AM_LDADD=$(PTHREAD_LIBS)  lib_LTLIBRARIES=libasyncns.la +libasyncns_la_LIBADD=$(AM_LIBADD)  +libasyncns_la_CC=$(PTHREAD_CC)  libasyncns_la_SOURCES=asyncns.c asyncns.h  noinst_PROGRAMS=asyncns-test  asyncns_test_SOURCES=asyncns-test.c -asyncns_test_LDADD=libasyncns.la +asyncns_test_LDADD=$(AM_LDADD) libasyncns.la  include_HEADERS=asyncns.h diff --git a/libasyncns/asyncns-test.c b/libasyncns/asyncns-test.c index 6eb0a0d..e3169ec 100644 --- a/libasyncns/asyncns-test.c +++ b/libasyncns/asyncns-test.c @@ -35,7 +35,7 @@ int main(int argc, char *argv[]) {      struct sockaddr_in sa;      char host[NI_MAXHOST] = "", serv[NI_MAXSERV] = ""; -    if (!(asyncns = asyncns_new(5))) { +    if (!(asyncns = asyncns_new(10))) {          fprintf(stderr, "asyncns_new() failed\n");          goto fail;      } diff --git a/libasyncns/asyncns.c b/libasyncns/asyncns.c index d1ce584..6e05635 100644 --- a/libasyncns/asyncns.c +++ b/libasyncns/asyncns.c @@ -23,6 +23,8 @@  #include <config.h>  #endif +/*#undef HAVE_PTHREAD */ +  #include <assert.h>  #include <fcntl.h>  #include <signal.h> @@ -40,6 +42,10 @@  #include <sys/prctl.h>  #endif +#if HAVE_PTHREAD +#include <pthread.h> +#endif +  #include "asyncns.h"  #define MAX_WORKERS 16 @@ -50,13 +56,26 @@ typedef enum {      REQUEST_ADDRINFO,      RESPONSE_ADDRINFO,      REQUEST_NAMEINFO, -    RESPONSE_NAMEINFO +    RESPONSE_NAMEINFO, +    REQUEST_TERMINATE  } query_type_t; +enum { +    REQUEST_RECV_FD = 0, +    REQUEST_SEND_FD = 1, +    RESPONSE_RECV_FD = 2, +    RESPONSE_SEND_FD = 3 +}; +  struct asyncns { -    int in_fd, out_fd; +    int fds[4]; +#ifndef HAVE_PTHREAD          pid_t workers[MAX_WORKERS]; +#else +    pthread_t workers[MAX_WORKERS]; +#endif +    unsigned valid_workers;      unsigned current_id, current_index;      asyncns_query_t* queries[MAX_QUERIES]; @@ -146,7 +165,7 @@ static int fd_nonblock(int fd) {      int i;      assert(fd >= 0); -    if ((i = fcntl(fd, F_GETFL)) < 0) +    if ((i = fcntl(fd, F_GETFL, 0)) < 0)          return -1;      if (i & O_NONBLOCK) @@ -218,6 +237,9 @@ static int send_addrinfo_reply(int out_fd, unsigned id, int ret, struct addrinfo          }      } +    if (ai) +        freeaddrinfo(ai); +      return send(out_fd, resp, resp->header.length, 0);  } @@ -277,12 +299,9 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) {              ret = getaddrinfo(node, service,                                ai_req->hints_is_null ? NULL : &ai,                                &result); -            ret = send_addrinfo_reply(out_fd, req->id, ret, result); - -            if (result) -                freeaddrinfo(result); -            return ret; +            /* send_addrinfo_reply() frees result */ +            return send_addrinfo_reply(out_fd, req->id, ret, result);          }          case REQUEST_NAMEINFO: { @@ -306,6 +325,11 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) {                                         ret == 0 && ni_req->getserv ? servbuf : NULL);          } +        case REQUEST_TERMINATE: { +            /* Quit */ +            return -1; +        } +          default:              ;      } @@ -313,8 +337,9 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) {      return 0;  } -static int worker(int in_fd, int out_fd) { -    int r = 0; +#ifndef HAVE_PTHREAD + +static int process_worker(int in_fd, int out_fd) {      int have_death_sig = 0;      assert(in_fd > 2);      assert(out_fd > 2); @@ -372,7 +397,7 @@ static int worker(int in_fd, int out_fd) {              FD_SET(in_fd, &fds);              if (select(in_fd+1, &fds, NULL, NULL, &tv) < 0) -                goto fail; +                break;              if (getppid() == 1)                  break; @@ -382,84 +407,106 @@ static int worker(int in_fd, int out_fd) {              if (length < 0 && errno == EAGAIN)                  continue; -             -            goto fail; + +            break;          }          if (handle_request(out_fd, (rheader_t*) buf, (size_t) length) < 0) -            goto fail; +            break;      } -fail: -      close(in_fd);      close(out_fd); -    return r; +    return 0;  } -asyncns_t* asyncns_new(int n_proc) { -    asyncns_t *asyncns = NULL; -    int fd1[2] = { -1, -1 }, fd2[2] = { -1, -1 }, p; - -    assert(n_proc >= 1); +#else -    if (!(asyncns = malloc(sizeof(asyncns_t)))) -        goto fail; +static void* thread_worker(void *p) { +    sigset_t fullset; +    int *fds = p; +    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); -    asyncns->in_fd = asyncns->out_fd = -1; -    memset(asyncns->workers, 0, sizeof(asyncns->workers)); +    /* No signals in this thread please */ +    sigfillset(&fullset); +    pthread_sigmask(SIG_BLOCK, &fullset, NULL); -    if (socketpair(PF_UNIX, SOCK_DGRAM, 0, fd1) < 0) -        goto fail; +    for (;;) { +        char buf[BUFSIZE]; +        ssize_t length; -    if (socketpair(PF_UNIX, SOCK_DGRAM, 0, fd2) < 0) -        goto fail; +        if ((length = recv(fds[REQUEST_RECV_FD], buf, sizeof(buf), 0)) <= 0) +            break; -    fd_cloexec(fd1[0]); -    fd_cloexec(fd1[1]); -    fd_cloexec(fd2[0]); -    fd_cloexec(fd2[1]); +        if (handle_request(fds[RESPONSE_SEND_FD], (rheader_t*) buf, (size_t) length) < 0) +            break; +         +    } + +    return NULL; +} + +#endif + +asyncns_t* asyncns_new(unsigned n_proc) { +    asyncns_t *asyncns = NULL; +    int i; +    unsigned p; +    assert(n_proc >= 1);      if (n_proc > MAX_WORKERS)          n_proc = MAX_WORKERS; -    for (p = 0; p < n_proc; p++) { +    if (!(asyncns = malloc(sizeof(asyncns_t)))) +        goto fail; + +    asyncns->valid_workers = 0; + +    for (i = 0; i < 4; i++)  +        asyncns->fds[i] = -1; +         +    for (p = 0; p < MAX_QUERIES; p++) +        asyncns->queries[p] = NULL; +     +    if (socketpair(PF_UNIX, SOCK_DGRAM, 0, asyncns->fds) < 0 || +        socketpair(PF_UNIX, SOCK_DGRAM, 0, asyncns->fds+2) < 0) +        goto fail; +     +    for (i = 0; i < 4; i++)  +        fd_cloexec(asyncns->fds[i]); + +    for (asyncns->valid_workers = 0; asyncns->valid_workers < n_proc; asyncns->valid_workers++) { -        if ((asyncns->workers[p] = fork()) < 0) +#ifndef HAVE_PTHREAD +        if ((asyncns->workers[asyncns->valid_workers] = fork()) < 0)              goto fail; -        else if (asyncns->workers[p] == 0) { -            close(fd1[0]); -            close(fd2[1]); -            _exit(worker(fd2[0], fd1[1])); +        else if (asyncns->workers[asyncns->valid_workers] == 0) { +            close(asyncns->fds[REQUEST_SEND_FD]); +            close(asyncns->fds[RESPONSE_RECV_FD]); +            _exit(process_worker(asyncns->fds[REQUEST_RECV_FD], asyncns->fds[RESPONSE_SEND_FD]));          } +#else +        if (pthread_create(&asyncns->workers[asyncns->valid_workers], NULL, thread_worker, asyncns->fds) != 0) +            goto fail; +#endif      } -    close(fd2[0]); -    close(fd1[1]); -    asyncns->in_fd = fd1[0]; -    asyncns->out_fd = fd2[1]; - +#ifndef HAVE_PTHREAD +    close(asyncns->fds[REQUEST_RECV_FD]); +    close(asyncns->fds[RESPONSE_SEND_FD]); +    asyncns->fds[REQUEST_RECV_FD] = asyncns->fds[RESPONSE_SEND_FD] = -1; +#endif     +          asyncns->current_index = asyncns->current_id = 0; - -    for (p = 0; p < MAX_QUERIES; p++) -        asyncns->queries[p] = NULL; -/*     memset(asyncns->queries, 0, sizeof(asyncns->queries)); */ -      asyncns->done_head = asyncns->done_tail = NULL;      asyncns->n_queries = 0; -    fd_nonblock(asyncns->in_fd); +    fd_nonblock(asyncns->fds[RESPONSE_RECV_FD]);      return asyncns;  fail: - -    if (fd1[0] >= 0) close(fd1[0]); -    if (fd1[1] >= 0) close(fd1[1]); -    if (fd2[0] >= 0) close(fd2[0]); -    if (fd2[1] >= 0) close(fd2[1]); -      if (asyncns)           asyncns_free(asyncns); @@ -467,20 +514,38 @@ fail:  }  void asyncns_free(asyncns_t *asyncns) { -    int p; +    unsigned p; +    int i; +    rheader_t req;      assert(asyncns); + +    req.type = REQUEST_TERMINATE; +    req.length = sizeof(req); +    req.id = 0; -    if (asyncns->in_fd >= 0) -        close(asyncns->in_fd); -    if (asyncns->out_fd >= 0) -        close(asyncns->out_fd); -     -    for (p = 0; p < MAX_WORKERS; p++) -        if (asyncns->workers[p] >= 1) { -            kill(asyncns->workers[p], SIGTERM); -            waitpid(asyncns->workers[p], NULL, 0); -        } +    /* Send one termiantion packet for each worker */ +    for (p = 0; p < asyncns->valid_workers; p++) +        send(asyncns->fds[REQUEST_SEND_FD], &req, req.length, 0); + +    /* No terminate them forcibly*/ +    for (p = 0; p < asyncns->valid_workers; p++) { +#ifndef HAVE_PTHREAD +        kill(asyncns->workers[p], SIGTERM); +        waitpid(asyncns->workers[p], NULL, 0); +#else +        pthread_cancel(asyncns->workers[p]); +        pthread_join(asyncns->workers[p], NULL); +#endif         +    } + +    /* Due to Solaris' broken thread cancelation we first send an +     * termination request and then cancel th thread. */ +     +    for (i = 0; i < 4; i++) +        if (asyncns->fds[i] >= 0) +            close(asyncns->fds[i]); +          for (p = 0; p < MAX_QUERIES; p++)          if (asyncns->queries[p])              asyncns_cancel(asyncns, asyncns->queries[p]); @@ -491,7 +556,7 @@ void asyncns_free(asyncns_t *asyncns) {  int asyncns_fd(asyncns_t *asyncns) {      assert(asyncns); -    return asyncns->in_fd; +    return asyncns->fds[RESPONSE_RECV_FD];  }  static asyncns_query_t *lookup_query(asyncns_t *asyncns, unsigned id) { @@ -651,7 +716,7 @@ int asyncns_wait(asyncns_t *asyncns, int block) {          char buf[BUFSIZE];          ssize_t l; -        if (((l = recv(asyncns->in_fd, buf, sizeof(buf), 0)) < 0)) { +        if (((l = recv(asyncns->fds[RESPONSE_RECV_FD], buf, sizeof(buf), 0)) < 0)) {              fd_set fds;              if (errno != EAGAIN) @@ -661,9 +726,9 @@ int asyncns_wait(asyncns_t *asyncns, int block) {                  return 0;              FD_ZERO(&fds); -            FD_SET(asyncns->in_fd, &fds); +            FD_SET(asyncns->fds[RESPONSE_RECV_FD], &fds); -            if (select(asyncns->in_fd+1, &fds, NULL, NULL, NULL) < 0) +            if (select(asyncns->fds[RESPONSE_RECV_FD]+1, &fds, NULL, NULL, NULL) < 0)                  return -1;              continue; @@ -745,7 +810,7 @@ asyncns_query_t* asyncns_getaddrinfo(asyncns_t *asyncns, const char *node, const      if (service)          strcpy((char*) req + sizeof(addrinfo_request_t) + req->node_len, service); -    if (send(asyncns->out_fd, req, req->header.length, 0) < 0) +    if (send(asyncns->fds[REQUEST_SEND_FD], req, req->header.length, 0) < 0)          goto fail;      return q; @@ -803,7 +868,7 @@ asyncns_query_t* asyncns_getnameinfo(asyncns_t *asyncns, const struct sockaddr *      memcpy((uint8_t*) req + sizeof(nameinfo_request_t), sa, salen); -    if (send(asyncns->out_fd, req, req->header.length, 0) < 0) +    if (send(asyncns->fds[REQUEST_SEND_FD], req, req->header.length, 0) < 0)          goto fail;      return q; diff --git a/libasyncns/asyncns.h b/libasyncns/asyncns.h index ac5f789..beb1e47 100644 --- a/libasyncns/asyncns.h +++ b/libasyncns/asyncns.h @@ -55,7 +55,7 @@ typedef struct asyncns asyncns_t;  typedef struct asyncns_query asyncns_query_t;  /** Allocate a new libasyncns session with n_proc worker processes */ -asyncns_t* asyncns_new(int n_proc); +asyncns_t* asyncns_new(unsigned n_proc);  /** Free a libasyncns session. This destroys all attached   * asyncns_query_t objects automatically */ | 
