From 412278d54644f995a1d1eedf6da6db47634b2470 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sat, 8 Jan 2005 01:04:03 +0000 Subject: pthread compat git-svn-id: file:///home/lennart/svn/public/libasyncns/trunk@14 cc0fb855-19ed-0310-866e-8c1d96e4abae --- acinclude.m4 | 199 +++++++++++++++++++++++++++++++++++++++++++ configure.ac | 3 +- libasyncns/Makefile.am | 8 +- libasyncns/asyncns-test.c | 2 +- libasyncns/asyncns.c | 213 ++++++++++++++++++++++++++++++---------------- libasyncns/asyncns.h | 2 +- 6 files changed, 348 insertions(+), 79 deletions(-) create mode 100644 acinclude.m4 diff --git a/acinclude.m4 b/acinclude.m4 new file mode 100644 index 0000000..bedf51c --- /dev/null +++ b/acinclude.m4 @@ -0,0 +1,199 @@ +dnl Available from the GNU Autoconf Macro Archive at: +dnl http://www.gnu.org/software/ac-archive/htmldoc/acx_pthread.html +dnl +AC_DEFUN([ACX_PTHREAD], [ +AC_REQUIRE([AC_CANONICAL_HOST]) +AC_LANG_SAVE +AC_LANG_C +acx_pthread_ok=no + +# We used to check for pthread.h first, but this fails if pthread.h +# requires special compiler flags (e.g. on True64 or Sequent). +# It gets checked for in the link test anyway. + +# First of all, check if the user has set any of the PTHREAD_LIBS, +# etcetera environment variables, and if threads linking works using +# them: +if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS]) + AC_TRY_LINK_FUNC(pthread_join, acx_pthread_ok=yes) + AC_MSG_RESULT($acx_pthread_ok) + if test x"$acx_pthread_ok" = xno; then + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" + fi + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" +fi + +# We must check for the threads library under a number of different +# names; the ordering is very important because some systems +# (e.g. DEC) have both -lpthread and -lpthreads, where one of the +# libraries is broken (non-POSIX). + +# Create a list of thread flags to try. Items starting with a "-" are +# C compiler flags, and other items are library names, except for "none" +# which indicates that we try without any flags at all, and "pthread-config" +# which is a program returning the flags for the Pth emulation library. + +acx_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config" + +# The ordering *is* (sometimes) important. Some notes on the +# individual items follow: + +# pthreads: AIX (must check this before -lpthread) +# none: in case threads are in libc; should be tried before -Kthread and +# other compiler flags to prevent continual compiler warnings +# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h) +# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able) +# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread) +# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads) +# -pthreads: Solaris/gcc +# -mthreads: Mingw32/gcc, Lynx/gcc +# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it +# doesn't hurt to check since this sometimes defines pthreads too; +# also defines -D_REENTRANT) +# pthread: Linux, etcetera +# --thread-safe: KAI C++ +# pthread-config: use pthread-config program (for GNU Pth library) + +case "${host_cpu}-${host_os}" in + *solaris*) + + # On Solaris (at least, for some versions), libc contains stubbed + # (non-functional) versions of the pthreads routines, so link-based + # tests will erroneously succeed. (We need to link with -pthread or + # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather + # a function called by this macro, so we could check for that, but + # who knows whether they'll stub that too in a future libc.) So, + # we'll just look for -pthreads and -lpthread first: + + acx_pthread_flags="-pthread -pthreads pthread -mt $acx_pthread_flags" + ;; +esac + +if test x"$acx_pthread_ok" = xno; then +for flag in $acx_pthread_flags; do + + case $flag in + none) + AC_MSG_CHECKING([whether pthreads work without any flags]) + ;; + + -*) + AC_MSG_CHECKING([whether pthreads work with $flag]) + PTHREAD_CFLAGS="$flag" + ;; + + pthread-config) + AC_CHECK_PROG(acx_pthread_config, pthread-config, yes, no) + if test x"$acx_pthread_config" = xno; then continue; fi + PTHREAD_CFLAGS="`pthread-config --cflags`" + PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`" + ;; + + *) + AC_MSG_CHECKING([for the pthreads library -l$flag]) + PTHREAD_LIBS="-l$flag" + ;; + esac + + save_LIBS="$LIBS" + save_CFLAGS="$CFLAGS" + LIBS="$PTHREAD_LIBS $LIBS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Check for various functions. We must include pthread.h, + # since some functions may be macros. (On the Sequent, we + # need a special flag -Kthread to make this header compile.) + # We check for pthread_join because it is in -lpthread on IRIX + # while pthread_create is in libc. We check for pthread_attr_init + # due to DEC craziness with -lpthreads. We check for + # pthread_cleanup_push because it is one of the few pthread + # functions on Solaris that doesn't have a non-functional libc stub. + # We try pthread_create on general principles. + AC_TRY_LINK([#include ], + [pthread_t th; pthread_join(th, 0); + pthread_attr_init(0); pthread_cleanup_push(0, 0); + pthread_create(0,0,0,0); pthread_cleanup_pop(0); ], + [acx_pthread_ok=yes]) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + AC_MSG_RESULT($acx_pthread_ok) + if test "x$acx_pthread_ok" = xyes; then + break; + fi + + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" +done +fi + +# Various other checks: +if test "x$acx_pthread_ok" = xyes; then + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Detect AIX lossage: threads are created detached by default + # and the JOINABLE attribute has a nonstandard name (UNDETACHED). + AC_MSG_CHECKING([for joinable pthread attribute]) + AC_TRY_LINK([#include ], + [int attr=PTHREAD_CREATE_JOINABLE;], + ok=PTHREAD_CREATE_JOINABLE, ok=unknown) + if test x"$ok" = xunknown; then + AC_TRY_LINK([#include ], + [int attr=PTHREAD_CREATE_UNDETACHED;], + ok=PTHREAD_CREATE_UNDETACHED, ok=unknown) + fi + if test x"$ok" != xPTHREAD_CREATE_JOINABLE; then + AC_DEFINE(PTHREAD_CREATE_JOINABLE, $ok, + [Define to the necessary symbol if this constant + uses a non-standard name on your system.]) + fi + AC_MSG_RESULT(${ok}) + if test x"$ok" = xunknown; then + AC_MSG_WARN([we do not know how to create joinable pthreads]) + fi + + AC_MSG_CHECKING([if more special flags are required for pthreads]) + flag=no + case "${host_cpu}-${host_os}" in + *-aix* | *-freebsd* | *-darwin*) flag="-D_THREAD_SAFE";; + *solaris* | *-osf* | *-hpux*) flag="-D_REENTRANT";; + esac + AC_MSG_RESULT(${flag}) + if test "x$flag" != xno; then + PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS" + fi + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + # More AIX lossage: must compile with cc_r + AC_CHECK_PROG(PTHREAD_CC, cc_r, cc_r, ${CC}) +else + PTHREAD_CC="$CC" +fi + +AC_SUBST(PTHREAD_LIBS) +AC_SUBST(PTHREAD_CFLAGS) +AC_SUBST(PTHREAD_CC) + +# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND: +if test x"$acx_pthread_ok" = xyes; then + ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1]) + : +else + acx_pthread_ok=no + $2 +fi +AC_LANG_RESTORE +])dnl ACX_PTHREAD diff --git a/configure.ac b/configure.ac index 581c086..5e0a68c 100644 --- a/configure.ac +++ b/configure.ac @@ -55,7 +55,6 @@ AC_C_CONST AC_TYPE_PID_T AC_TYPE_SIZE_T AC_HEADER_TIME - # Checks for library functions. AC_FUNC_FORK AC_FUNC_MALLOC @@ -66,6 +65,8 @@ AC_CHECK_FUNCS([memset select strndup setresuid setreuid]) AC_CHECK_LIB(nsl, gethostbyname) AC_CHECK_LIB(socket, connect) +ACX_PTHREAD + # If using GCC specify some additional parameters if test "x$GCC" = "xyes" ; then CFLAGS="$CFLAGS -pipe -W -Wall -pedantic" diff --git a/libasyncns/Makefile.am b/libasyncns/Makefile.am index 164439c..568d3fa 100644 --- a/libasyncns/Makefile.am +++ b/libasyncns/Makefile.am @@ -17,13 +17,17 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA. -AM_CFLAGS=-D_GNU_SOURCE -D__EXTENSIONS__ +AM_CFLAGS=-D_GNU_SOURCE -D__EXTENSIONS__ $(PTHREAD_CFLAGS) +AM_LIBADD=$(PTHREAD_LIBS) +AM_LDADD=$(PTHREAD_LIBS) lib_LTLIBRARIES=libasyncns.la +libasyncns_la_LIBADD=$(AM_LIBADD) +libasyncns_la_CC=$(PTHREAD_CC) libasyncns_la_SOURCES=asyncns.c asyncns.h noinst_PROGRAMS=asyncns-test asyncns_test_SOURCES=asyncns-test.c -asyncns_test_LDADD=libasyncns.la +asyncns_test_LDADD=$(AM_LDADD) libasyncns.la include_HEADERS=asyncns.h diff --git a/libasyncns/asyncns-test.c b/libasyncns/asyncns-test.c index 6eb0a0d..e3169ec 100644 --- a/libasyncns/asyncns-test.c +++ b/libasyncns/asyncns-test.c @@ -35,7 +35,7 @@ int main(int argc, char *argv[]) { struct sockaddr_in sa; char host[NI_MAXHOST] = "", serv[NI_MAXSERV] = ""; - if (!(asyncns = asyncns_new(5))) { + if (!(asyncns = asyncns_new(10))) { fprintf(stderr, "asyncns_new() failed\n"); goto fail; } diff --git a/libasyncns/asyncns.c b/libasyncns/asyncns.c index d1ce584..6e05635 100644 --- a/libasyncns/asyncns.c +++ b/libasyncns/asyncns.c @@ -23,6 +23,8 @@ #include #endif +/*#undef HAVE_PTHREAD */ + #include #include #include @@ -40,6 +42,10 @@ #include #endif +#if HAVE_PTHREAD +#include +#endif + #include "asyncns.h" #define MAX_WORKERS 16 @@ -50,13 +56,26 @@ typedef enum { REQUEST_ADDRINFO, RESPONSE_ADDRINFO, REQUEST_NAMEINFO, - RESPONSE_NAMEINFO + RESPONSE_NAMEINFO, + REQUEST_TERMINATE } query_type_t; +enum { + REQUEST_RECV_FD = 0, + REQUEST_SEND_FD = 1, + RESPONSE_RECV_FD = 2, + RESPONSE_SEND_FD = 3 +}; + struct asyncns { - int in_fd, out_fd; + int fds[4]; +#ifndef HAVE_PTHREAD pid_t workers[MAX_WORKERS]; +#else + pthread_t workers[MAX_WORKERS]; +#endif + unsigned valid_workers; unsigned current_id, current_index; asyncns_query_t* queries[MAX_QUERIES]; @@ -146,7 +165,7 @@ static int fd_nonblock(int fd) { int i; assert(fd >= 0); - if ((i = fcntl(fd, F_GETFL)) < 0) + if ((i = fcntl(fd, F_GETFL, 0)) < 0) return -1; if (i & O_NONBLOCK) @@ -218,6 +237,9 @@ static int send_addrinfo_reply(int out_fd, unsigned id, int ret, struct addrinfo } } + if (ai) + freeaddrinfo(ai); + return send(out_fd, resp, resp->header.length, 0); } @@ -277,12 +299,9 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) { ret = getaddrinfo(node, service, ai_req->hints_is_null ? NULL : &ai, &result); - ret = send_addrinfo_reply(out_fd, req->id, ret, result); - - if (result) - freeaddrinfo(result); - return ret; + /* send_addrinfo_reply() frees result */ + return send_addrinfo_reply(out_fd, req->id, ret, result); } case REQUEST_NAMEINFO: { @@ -306,6 +325,11 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) { ret == 0 && ni_req->getserv ? servbuf : NULL); } + case REQUEST_TERMINATE: { + /* Quit */ + return -1; + } + default: ; } @@ -313,8 +337,9 @@ static int handle_request(int out_fd, const rheader_t *req, size_t length) { return 0; } -static int worker(int in_fd, int out_fd) { - int r = 0; +#ifndef HAVE_PTHREAD + +static int process_worker(int in_fd, int out_fd) { int have_death_sig = 0; assert(in_fd > 2); assert(out_fd > 2); @@ -372,7 +397,7 @@ static int worker(int in_fd, int out_fd) { FD_SET(in_fd, &fds); if (select(in_fd+1, &fds, NULL, NULL, &tv) < 0) - goto fail; + break; if (getppid() == 1) break; @@ -382,84 +407,106 @@ static int worker(int in_fd, int out_fd) { if (length < 0 && errno == EAGAIN) continue; - - goto fail; + + break; } if (handle_request(out_fd, (rheader_t*) buf, (size_t) length) < 0) - goto fail; + break; } -fail: - close(in_fd); close(out_fd); - return r; + return 0; } -asyncns_t* asyncns_new(int n_proc) { - asyncns_t *asyncns = NULL; - int fd1[2] = { -1, -1 }, fd2[2] = { -1, -1 }, p; - - assert(n_proc >= 1); +#else - if (!(asyncns = malloc(sizeof(asyncns_t)))) - goto fail; +static void* thread_worker(void *p) { + sigset_t fullset; + int *fds = p; + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - asyncns->in_fd = asyncns->out_fd = -1; - memset(asyncns->workers, 0, sizeof(asyncns->workers)); + /* No signals in this thread please */ + sigfillset(&fullset); + pthread_sigmask(SIG_BLOCK, &fullset, NULL); - if (socketpair(PF_UNIX, SOCK_DGRAM, 0, fd1) < 0) - goto fail; + for (;;) { + char buf[BUFSIZE]; + ssize_t length; - if (socketpair(PF_UNIX, SOCK_DGRAM, 0, fd2) < 0) - goto fail; + if ((length = recv(fds[REQUEST_RECV_FD], buf, sizeof(buf), 0)) <= 0) + break; - fd_cloexec(fd1[0]); - fd_cloexec(fd1[1]); - fd_cloexec(fd2[0]); - fd_cloexec(fd2[1]); + if (handle_request(fds[RESPONSE_SEND_FD], (rheader_t*) buf, (size_t) length) < 0) + break; + + } + + return NULL; +} + +#endif + +asyncns_t* asyncns_new(unsigned n_proc) { + asyncns_t *asyncns = NULL; + int i; + unsigned p; + assert(n_proc >= 1); if (n_proc > MAX_WORKERS) n_proc = MAX_WORKERS; - for (p = 0; p < n_proc; p++) { + if (!(asyncns = malloc(sizeof(asyncns_t)))) + goto fail; + + asyncns->valid_workers = 0; + + for (i = 0; i < 4; i++) + asyncns->fds[i] = -1; + + for (p = 0; p < MAX_QUERIES; p++) + asyncns->queries[p] = NULL; + + if (socketpair(PF_UNIX, SOCK_DGRAM, 0, asyncns->fds) < 0 || + socketpair(PF_UNIX, SOCK_DGRAM, 0, asyncns->fds+2) < 0) + goto fail; + + for (i = 0; i < 4; i++) + fd_cloexec(asyncns->fds[i]); + + for (asyncns->valid_workers = 0; asyncns->valid_workers < n_proc; asyncns->valid_workers++) { - if ((asyncns->workers[p] = fork()) < 0) +#ifndef HAVE_PTHREAD + if ((asyncns->workers[asyncns->valid_workers] = fork()) < 0) goto fail; - else if (asyncns->workers[p] == 0) { - close(fd1[0]); - close(fd2[1]); - _exit(worker(fd2[0], fd1[1])); + else if (asyncns->workers[asyncns->valid_workers] == 0) { + close(asyncns->fds[REQUEST_SEND_FD]); + close(asyncns->fds[RESPONSE_RECV_FD]); + _exit(process_worker(asyncns->fds[REQUEST_RECV_FD], asyncns->fds[RESPONSE_SEND_FD])); } +#else + if (pthread_create(&asyncns->workers[asyncns->valid_workers], NULL, thread_worker, asyncns->fds) != 0) + goto fail; +#endif } - close(fd2[0]); - close(fd1[1]); - asyncns->in_fd = fd1[0]; - asyncns->out_fd = fd2[1]; - +#ifndef HAVE_PTHREAD + close(asyncns->fds[REQUEST_RECV_FD]); + close(asyncns->fds[RESPONSE_SEND_FD]); + asyncns->fds[REQUEST_RECV_FD] = asyncns->fds[RESPONSE_SEND_FD] = -1; +#endif + asyncns->current_index = asyncns->current_id = 0; - - for (p = 0; p < MAX_QUERIES; p++) - asyncns->queries[p] = NULL; -/* memset(asyncns->queries, 0, sizeof(asyncns->queries)); */ - asyncns->done_head = asyncns->done_tail = NULL; asyncns->n_queries = 0; - fd_nonblock(asyncns->in_fd); + fd_nonblock(asyncns->fds[RESPONSE_RECV_FD]); return asyncns; fail: - - if (fd1[0] >= 0) close(fd1[0]); - if (fd1[1] >= 0) close(fd1[1]); - if (fd2[0] >= 0) close(fd2[0]); - if (fd2[1] >= 0) close(fd2[1]); - if (asyncns) asyncns_free(asyncns); @@ -467,20 +514,38 @@ fail: } void asyncns_free(asyncns_t *asyncns) { - int p; + unsigned p; + int i; + rheader_t req; assert(asyncns); + + req.type = REQUEST_TERMINATE; + req.length = sizeof(req); + req.id = 0; - if (asyncns->in_fd >= 0) - close(asyncns->in_fd); - if (asyncns->out_fd >= 0) - close(asyncns->out_fd); - - for (p = 0; p < MAX_WORKERS; p++) - if (asyncns->workers[p] >= 1) { - kill(asyncns->workers[p], SIGTERM); - waitpid(asyncns->workers[p], NULL, 0); - } + /* Send one termiantion packet for each worker */ + for (p = 0; p < asyncns->valid_workers; p++) + send(asyncns->fds[REQUEST_SEND_FD], &req, req.length, 0); + + /* No terminate them forcibly*/ + for (p = 0; p < asyncns->valid_workers; p++) { +#ifndef HAVE_PTHREAD + kill(asyncns->workers[p], SIGTERM); + waitpid(asyncns->workers[p], NULL, 0); +#else + pthread_cancel(asyncns->workers[p]); + pthread_join(asyncns->workers[p], NULL); +#endif + } + + /* Due to Solaris' broken thread cancelation we first send an + * termination request and then cancel th thread. */ + + for (i = 0; i < 4; i++) + if (asyncns->fds[i] >= 0) + close(asyncns->fds[i]); + for (p = 0; p < MAX_QUERIES; p++) if (asyncns->queries[p]) asyncns_cancel(asyncns, asyncns->queries[p]); @@ -491,7 +556,7 @@ void asyncns_free(asyncns_t *asyncns) { int asyncns_fd(asyncns_t *asyncns) { assert(asyncns); - return asyncns->in_fd; + return asyncns->fds[RESPONSE_RECV_FD]; } static asyncns_query_t *lookup_query(asyncns_t *asyncns, unsigned id) { @@ -651,7 +716,7 @@ int asyncns_wait(asyncns_t *asyncns, int block) { char buf[BUFSIZE]; ssize_t l; - if (((l = recv(asyncns->in_fd, buf, sizeof(buf), 0)) < 0)) { + if (((l = recv(asyncns->fds[RESPONSE_RECV_FD], buf, sizeof(buf), 0)) < 0)) { fd_set fds; if (errno != EAGAIN) @@ -661,9 +726,9 @@ int asyncns_wait(asyncns_t *asyncns, int block) { return 0; FD_ZERO(&fds); - FD_SET(asyncns->in_fd, &fds); + FD_SET(asyncns->fds[RESPONSE_RECV_FD], &fds); - if (select(asyncns->in_fd+1, &fds, NULL, NULL, NULL) < 0) + if (select(asyncns->fds[RESPONSE_RECV_FD]+1, &fds, NULL, NULL, NULL) < 0) return -1; continue; @@ -745,7 +810,7 @@ asyncns_query_t* asyncns_getaddrinfo(asyncns_t *asyncns, const char *node, const if (service) strcpy((char*) req + sizeof(addrinfo_request_t) + req->node_len, service); - if (send(asyncns->out_fd, req, req->header.length, 0) < 0) + if (send(asyncns->fds[REQUEST_SEND_FD], req, req->header.length, 0) < 0) goto fail; return q; @@ -803,7 +868,7 @@ asyncns_query_t* asyncns_getnameinfo(asyncns_t *asyncns, const struct sockaddr * memcpy((uint8_t*) req + sizeof(nameinfo_request_t), sa, salen); - if (send(asyncns->out_fd, req, req->header.length, 0) < 0) + if (send(asyncns->fds[REQUEST_SEND_FD], req, req->header.length, 0) < 0) goto fail; return q; diff --git a/libasyncns/asyncns.h b/libasyncns/asyncns.h index ac5f789..beb1e47 100644 --- a/libasyncns/asyncns.h +++ b/libasyncns/asyncns.h @@ -55,7 +55,7 @@ typedef struct asyncns asyncns_t; typedef struct asyncns_query asyncns_query_t; /** Allocate a new libasyncns session with n_proc worker processes */ -asyncns_t* asyncns_new(int n_proc); +asyncns_t* asyncns_new(unsigned n_proc); /** Free a libasyncns session. This destroys all attached * asyncns_query_t objects automatically */ -- cgit