From 041b0767b284034aee09e9a0de2a3844b8cc546a Mon Sep 17 00:00:00 2001 From: Havoc Pennington Date: Mon, 25 Nov 2002 05:13:09 +0000 Subject: 2002-11-24 Havoc Pennington * test/echo-client.c, test/echo-server.c: cheesy test clients. * configure.in (AC_CHECK_FUNCS): check for writev * dbus/dbus-message.c (_dbus_message_get_network_data): new function * dbus/dbus-list.c (_dbus_list_foreach): new function * dbus/dbus-internals.c (_dbus_verbose): new function * dbus/dbus-server.c, dbus/dbus-server.h: public object representing a server that listens for connections. * dbus/.cvsignore: create * dbus/dbus-errors.h, dbus/dbus-errors.c: public API for reporting errors * dbus/dbus-connection.h, dbus/dbus-connection.c: public object representing a connection that sends/receives messages. (Same object used for both client and server.) * dbus/dbus-transport.h, dbus/dbus-transport.c: Basic abstraction for different kinds of stream that we might read/write messages from. --- dbus/dbus-transport-unix.c | 581 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 581 insertions(+) create mode 100644 dbus/dbus-transport-unix.c (limited to 'dbus/dbus-transport-unix.c') diff --git a/dbus/dbus-transport-unix.c b/dbus/dbus-transport-unix.c new file mode 100644 index 00000000..869aa33f --- /dev/null +++ b/dbus/dbus-transport-unix.c @@ -0,0 +1,581 @@ +/* -*- mode: C; c-file-style: "gnu" -*- */ +/* dbus-transport-unix.c UNIX socket subclasses of DBusTransport + * + * Copyright (C) 2002 Red Hat Inc. + * + * Licensed under the Academic Free License version 1.2 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + */ + +#include "dbus-internals.h" +#include "dbus-connection-internal.h" +#include "dbus-transport-unix.h" +#include "dbus-transport-protected.h" +#include "dbus-watch.h" +#include +#include +#include +#include +#include +#include +#ifdef HAVE_WRITEV +#include +#endif + +/** + * @defgroup DBusTransportUnix DBusTransport implementations for UNIX + * @ingroup DBusInternals + * @brief Implementation details of DBusTransport on UNIX + * + * @{ + */ + +/** + * Opaque object representing a Unix file descriptor transport. + */ +typedef struct DBusTransportUnix DBusTransportUnix; + +/** + * Implementation details of DBusTransportUnix. All members are private. + */ +struct DBusTransportUnix +{ + DBusTransport base; /**< Parent instance */ + int fd; /**< File descriptor. */ + DBusWatch *watch; /**< Watch for readability. */ + DBusWatch *write_watch; /**< Watch for writability. */ + + int max_bytes_read_per_iteration; /**< To avoid blocking too long. */ + int max_bytes_written_per_iteration; /**< To avoid blocking too long. */ + + int message_bytes_written; /**< Number of bytes of current + * outgoing message that have + * been written. + */ +}; + +static void +unix_finalize (DBusTransport *transport) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + + _dbus_transport_finalize_base (transport); + + if (unix_transport->watch) + { + _dbus_watch_invalidate (unix_transport->watch); + _dbus_watch_unref (unix_transport->watch); + } + + dbus_free (transport); +} + +static void +do_io_error (DBusTransport *transport) +{ + _dbus_transport_disconnect (transport); + _dbus_connection_transport_error (transport->connection, + DBUS_RESULT_DISCONNECTED); +} + +static void +do_writing (DBusTransport *transport) +{ + int total; + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + + total = 0; + + again: + + while (_dbus_connection_have_messages_to_send (transport->connection)) + { + int bytes_written; + DBusMessage *message; + const unsigned char *header; + const unsigned char *body; + int header_len, body_len; + + if (total > unix_transport->max_bytes_written_per_iteration) + { + _dbus_verbose ("%d bytes exceeds %d bytes written per iteration, returning\n", + total, unix_transport->max_bytes_written_per_iteration); + goto out; + } + + message = _dbus_connection_get_message_to_send (transport->connection); + _dbus_assert (message != NULL); + _dbus_message_lock (message); + + _dbus_message_get_network_data (message, + &header, &header_len, + &body, &body_len); + + if (unix_transport->message_bytes_written < header_len) + { +#ifdef HAVE_WRITEV + struct iovec vectors[2]; + + vectors[0].iov_base = header + unix_transport->message_bytes_written; + vectors[0].iov_len = header_len - unix_transport->message_bytes_written; + vectors[1].iov_base = body; + vectors[1].iov_len = body_len; + + bytes_written = writev (unix_transport->fd, + vectors, _DBUS_N_ELEMENTS (vectors)); +#else + bytes_written = write (unix_transport->fd, + header + unix_transport->message_bytes_written, + header_len - unix_transport->message_bytes_written); +#endif + } + else + { + bytes_written = write (unix_transport->fd, + body + + (unix_transport->message_bytes_written - header_len), + body_len - + (unix_transport->message_bytes_written - body_len)); + } + + if (bytes_written < 0) + { + if (errno == EINTR) + goto again; + else if (errno == EAGAIN || + errno == EWOULDBLOCK) + goto out; + else + { + _dbus_verbose ("Error writing to message bus: %s\n", + _dbus_strerror (errno)); + do_io_error (transport); + goto out; + } + } + else + { + _dbus_verbose (" wrote %d bytes\n", bytes_written); + + total += bytes_written; + unix_transport->message_bytes_written += bytes_written; + + _dbus_assert (unix_transport->message_bytes_written <= + (header_len + body_len)); + + if (unix_transport->message_bytes_written == (header_len + body_len)) + { + _dbus_connection_message_sent (transport->connection, + message); + unix_transport->message_bytes_written = 0; + } + } + } + + out: + return; /* I think some C compilers require a statement after a label */ +} + +static void +do_reading (DBusTransport *transport) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + unsigned char *buffer; + int buffer_len; + int bytes_read; + int total; + + total = 0; + + again: + + if (total > unix_transport->max_bytes_read_per_iteration) + { + _dbus_verbose ("%d bytes exceeds %d bytes read per iteration, returning\n", + total, unix_transport->max_bytes_read_per_iteration); + goto out; + } + + if (!_dbus_message_loader_get_buffer (transport->loader, + &buffer, &buffer_len)) + goto out; /* no memory for a buffer */ + + bytes_read = read (unix_transport->fd, + buffer, buffer_len); + + _dbus_message_loader_return_buffer (transport->loader, + buffer, + bytes_read < 0 ? 0 : bytes_read); + + if (bytes_read < 0) + { + if (errno == EINTR) + goto again; + else if (errno == EAGAIN || + errno == EWOULDBLOCK) + goto out; + else + { + _dbus_verbose ("Error reading from message bus: %s\n", + _dbus_strerror (errno)); + do_io_error (transport); + goto out; + } + } + else if (bytes_read == 0) + { + _dbus_verbose ("Disconnected from message bus\n"); + do_io_error (transport); + goto out; + } + else + { + DBusMessage *message; + + _dbus_verbose (" read %d bytes\n", bytes_read); + + total += bytes_read; + + /* Queue any messages */ + while ((message = _dbus_message_loader_pop_message (transport->loader))) + { + _dbus_verbose ("queueing received message %p\n", message); + + _dbus_connection_queue_received_message (transport->connection, + message); + dbus_message_unref (message); + } + + /* Try reading more data until we get EAGAIN and return, or + * exceed max bytes per iteration. If in blocking mode of + * course we'll block instead of returning. + */ + goto again; + } + + out: + return; /* I think some C compilers require a statement after a label */ +} + +static void +unix_handle_watch (DBusTransport *transport, + DBusWatch *watch, + unsigned int flags) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + + _dbus_assert (watch == unix_transport->watch || + watch == unix_transport->write_watch); + + if (flags & (DBUS_WATCH_HANGUP | DBUS_WATCH_ERROR)) + { + _dbus_transport_disconnect (transport); + _dbus_connection_transport_error (transport->connection, + DBUS_RESULT_DISCONNECTED); + return; + } + + if (watch == unix_transport->watch && + (flags & DBUS_WATCH_READABLE)) + do_reading (transport); + else if (watch == unix_transport->write_watch && + (flags & DBUS_WATCH_WRITABLE)) + do_writing (transport); +} + +static void +unix_disconnect (DBusTransport *transport) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + + if (unix_transport->watch) + { + _dbus_connection_remove_watch (transport->connection, + unix_transport->watch); + _dbus_watch_invalidate (unix_transport->watch); + _dbus_watch_unref (unix_transport->watch); + unix_transport->watch = NULL; + } + + close (unix_transport->fd); + unix_transport->fd = -1; +} + +static void +unix_connection_set (DBusTransport *transport) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + DBusWatch *watch; + + _dbus_assert (unix_transport->watch == NULL); + + watch = _dbus_watch_new (unix_transport->fd, + DBUS_WATCH_READABLE); + + if (watch == NULL) + { + _dbus_transport_disconnect (transport); + return; + } + + if (!_dbus_connection_add_watch (transport->connection, + watch)) + { + _dbus_transport_disconnect (transport); + return; + } + + unix_transport->watch = watch; +} + +static void +unix_messages_pending (DBusTransport *transport, + int messages_pending) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + + if (messages_pending > 0 && + unix_transport->write_watch == NULL) + { + unix_transport->write_watch = + _dbus_watch_new (unix_transport->fd, + DBUS_WATCH_WRITABLE); + + /* we can maybe add it some other time, just silently bomb */ + if (unix_transport->write_watch == NULL) + return; + + if (!_dbus_connection_add_watch (transport->connection, + unix_transport->write_watch)) + { + _dbus_watch_invalidate (unix_transport->write_watch); + _dbus_watch_unref (unix_transport->write_watch); + unix_transport->write_watch = NULL; + } + } + else if (messages_pending == 0 && + unix_transport->write_watch != NULL) + { + _dbus_connection_remove_watch (transport->connection, + unix_transport->write_watch); + _dbus_watch_invalidate (unix_transport->write_watch); + _dbus_watch_unref (unix_transport->write_watch); + unix_transport->write_watch = NULL; + } +} + +static void +unix_do_iteration (DBusTransport *transport, + unsigned int flags, + int timeout_milliseconds) +{ + DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport; + fd_set read_set; + fd_set write_set; + dbus_bool_t do_select; + + do_select = FALSE; + + FD_ZERO (&read_set); + if (flags & DBUS_ITERATION_DO_READING) + { + FD_SET (unix_transport->fd, &read_set); + do_select = TRUE; + } + + FD_ZERO (&write_set); + if (flags & DBUS_ITERATION_DO_WRITING) + { + FD_SET (unix_transport->fd, &write_set); + do_select = TRUE; + } + + if (do_select) + { + fd_set err_set; + struct timeval timeout; + dbus_bool_t use_timeout; + + again: + + FD_ZERO (&err_set); + FD_SET (unix_transport->fd, &err_set); + + if (flags & DBUS_ITERATION_BLOCK) + { + if (timeout_milliseconds >= 0) + { + timeout.tv_sec = timeout_milliseconds / 1000; + timeout.tv_usec = (timeout_milliseconds % 1000) * 1000; + + /* Always use timeout if one is passed in. */ + use_timeout = TRUE; + } + else + { + use_timeout = FALSE; /* NULL timeout to block forever */ + } + } + else + { + /* 0 timeout to not block */ + timeout.tv_sec = 0; + timeout.tv_usec = 0; + use_timeout = TRUE; + } + + if (select (unix_transport->fd + 1, &read_set, &write_set, &err_set, + use_timeout ? &timeout : NULL) >= 0) + { + if (FD_ISSET (unix_transport->fd, &err_set)) + do_io_error (transport); + else + { + if (FD_ISSET (unix_transport->fd, &read_set)) + do_reading (transport); + if (FD_ISSET (unix_transport->fd, &write_set)) + do_writing (transport); + } + } + else if (errno == EINTR) + goto again; + else + { + _dbus_verbose ("Error from select(): %s\n", + _dbus_strerror (errno)); + } + } +} + +static DBusTransportVTable unix_vtable = { + unix_finalize, + unix_handle_watch, + unix_disconnect, + unix_connection_set, + unix_messages_pending, + unix_do_iteration +}; + +/** + * Creates a new transport for the given file descriptor. The file + * descriptor must be nonblocking (use _dbus_set_fd_nonblocking() to + * make it so). This function is shared by various transports that + * boil down to a full duplex file descriptor. + * + * @param fd the file descriptor. + * @returns the new transport, or #NULL if no memory. + */ +DBusTransport* +_dbus_transport_new_for_fd (int fd) +{ + DBusTransportUnix *unix_transport; + + unix_transport = dbus_new0 (DBusTransportUnix, 1); + if (unix_transport == NULL) + return NULL; + + if (!_dbus_transport_init_base (&unix_transport->base, + &unix_vtable)) + { + dbus_free (unix_transport); + return NULL; + } + + unix_transport->fd = fd; + unix_transport->message_bytes_written = 0; + + /* These values should probably be tunable or something. */ + unix_transport->max_bytes_read_per_iteration = 2048; + unix_transport->max_bytes_written_per_iteration = 2048; + + return (DBusTransport*) unix_transport; +} + +/** + * Creates a new transport for the given Unix domain socket + * path. + * + * @param path the path to the domain socket. + * @param result location to store reason for failure. + * @returns a new transport, or #NULL on failure. + */ +DBusTransport* +_dbus_transport_new_for_domain_socket (const char *path, + DBusResultCode *result) +{ + int fd; + DBusTransport *transport; + struct sockaddr_un addr; + + transport = NULL; + + fd = socket (AF_LOCAL, SOCK_STREAM, 0); + + if (fd < 0) + { + dbus_set_result (result, + _dbus_result_from_errno (errno)); + + _dbus_verbose ("Failed to create socket: %s\n", + _dbus_strerror (errno)); + + goto out; + } + + _DBUS_ZERO (addr); + addr.sun_family = AF_LOCAL; + strncpy (addr.sun_path, path, _DBUS_MAX_SUN_PATH_LENGTH); + addr.sun_path[_DBUS_MAX_SUN_PATH_LENGTH] = '\0'; + + if (connect (fd, (struct sockaddr*) &addr, sizeof (addr)) < 0) + { + dbus_set_result (result, + _dbus_result_from_errno (errno)); + + _dbus_verbose ("Failed to connect to socket %s: %s\n", + path, _dbus_strerror (errno)); + + close (fd); + fd = -1; + + goto out; + } + + if (!_dbus_set_fd_nonblocking (fd, result)) + { + close (fd); + fd = -1; + + goto out; + } + + transport = _dbus_transport_new_for_fd (fd); + if (transport == NULL) + { + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + close (fd); + fd = -1; + goto out; + } + + out: + return transport; +} + + +/** @} */ + -- cgit