From 0f323e6cad4d9b21988d431f90d9a1f051fb3144 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Sun, 16 Feb 2003 15:18:35 +0000 Subject: 2003-02-16 Alexander Larsson * dbus/dbus-connection.c: Implement sent_message_with_reply. (with_reply_and block is still busted). Made dispatch_message not lose message if OOM. * dbus/dbus-errors.h: Add NoReply error (for reply timeouts). --- ChangeLog | 10 ++ dbus/dbus-connection.c | 294 +++++++++++++++++++++++++++++++++++++++++++++++-- dbus/dbus-errors.h | 1 + 3 files changed, 294 insertions(+), 11 deletions(-) diff --git a/ChangeLog b/ChangeLog index 29dcd4e3..5f2710bc 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +2003-02-16 Alexander Larsson + + * dbus/dbus-connection.c: + Implement sent_message_with_reply. (with_reply_and block is still + busted). + Made dispatch_message not lose message if OOM. + + * dbus/dbus-errors.h: + Add NoReply error (for reply timeouts). + 2003-02-16 Alexander Larsson * dbus/dbus-hash.c (_dbus_hash_table_unref): diff --git a/dbus/dbus-connection.c b/dbus/dbus-connection.c index f97a5e13..bedeab4e 100644 --- a/dbus/dbus-connection.c +++ b/dbus/dbus-connection.c @@ -110,13 +110,33 @@ struct DBusConnection DBusDataSlot *data_slots; /**< Data slots */ int n_slots; /**< Slots allocated so far. */ + DBusHashTable *pending_replies; /**< Hash of message serials and their message handlers. */ DBusCounter *connection_counter; /**< Counter that we decrement when finalized */ int client_serial; /**< Client serial. Increments each time a message is sent */ DBusList *disconnect_message_link; }; +typedef struct +{ + DBusConnection *connection; + DBusMessageHandler *handler; + DBusTimeout *timeout; + int serial; + + DBusList *timeout_link; /* Preallocated timeout response */ + + dbus_bool_t timeout_added; + dbus_bool_t connection_added; +} ReplyHandlerData; + +static void reply_handler_data_free (ReplyHandlerData *data); + static void _dbus_connection_free_data_slots_nolock (DBusConnection *connection); +static void _dbus_connection_remove_timeout_locked (DBusConnection *connection, + DBusTimeout *timeout); + + /** * Adds a message to the incoming message queue, returning #FALSE @@ -130,11 +150,29 @@ dbus_bool_t _dbus_connection_queue_received_message (DBusConnection *connection, DBusMessage *message) { + ReplyHandlerData *reply_handler_data; + dbus_int32_t reply_serial; + _dbus_assert (_dbus_transport_get_is_authenticated (connection->transport)); if (!_dbus_list_append (&connection->incoming_messages, message)) return FALSE; + + /* If this is a reply we're waiting on, remove timeout for it */ + reply_serial = _dbus_message_get_reply_serial (message); + if (reply_serial != -1) + { + reply_handler_data = _dbus_hash_table_lookup_int (connection->pending_replies, + reply_serial); + if (reply_handler_data != NULL) + { + if (reply_handler_data->timeout_added) + _dbus_connection_remove_timeout_locked (connection, + reply_handler_data->timeout); + reply_handler_data->timeout_added = FALSE; + } + } dbus_message_ref (message); connection->n_incoming += 1; @@ -296,6 +334,16 @@ _dbus_connection_remove_timeout (DBusConnection *connection, timeout); } +static void +_dbus_connection_remove_timeout_locked (DBusConnection *connection, + DBusTimeout *timeout) +{ + dbus_mutex_lock (connection->mutex); + _dbus_connection_remove_timeout (connection, timeout); + dbus_mutex_unlock (connection->mutex); +} + + /** * Tells the connection that the transport has been disconnected. * Results in posting a disconnect message on the incoming message @@ -422,17 +470,18 @@ _dbus_connection_new_for_transport (DBusTransport *transport) DBusConnection *connection; DBusWatchList *watch_list; DBusTimeoutList *timeout_list; - DBusHashTable *handler_table; + DBusHashTable *handler_table, *pending_replies; DBusMutex *mutex; DBusCondVar *message_returned_cond; DBusCondVar *dispatch_cond; DBusCondVar *io_path_cond; DBusList *disconnect_link; DBusMessage *disconnect_message; - + watch_list = NULL; connection = NULL; handler_table = NULL; + pending_replies = NULL; timeout_list = NULL; mutex = NULL; message_returned_cond = NULL; @@ -454,6 +503,12 @@ _dbus_connection_new_for_transport (DBusTransport *transport) dbus_free, NULL); if (handler_table == NULL) goto error; + + pending_replies = + _dbus_hash_table_new (DBUS_HASH_INT, + NULL, (DBusFreeFunction)reply_handler_data_free); + if (pending_replies == NULL) + goto error; connection = dbus_new0 (DBusConnection, 1); if (connection == NULL) @@ -495,6 +550,7 @@ _dbus_connection_new_for_transport (DBusTransport *transport) connection->watches = watch_list; connection->timeouts = timeout_list; connection->handler_table = handler_table; + connection->pending_replies = pending_replies; connection->filter_list = NULL; connection->data_slots = NULL; @@ -532,6 +588,9 @@ _dbus_connection_new_for_transport (DBusTransport *transport) if (handler_table) _dbus_hash_table_unref (handler_table); + + if (pending_replies) + _dbus_hash_table_unref (pending_replies); if (watch_list) _dbus_watch_list_free (watch_list); @@ -738,9 +797,12 @@ _dbus_connection_last_unref (DBusConnection *connection) link = next; } - + _dbus_hash_table_unref (connection->handler_table); connection->handler_table = NULL; + + _dbus_hash_table_unref (connection->pending_replies); + connection->pending_replies = NULL; _dbus_list_clear (&connection->filter_list); @@ -919,6 +981,54 @@ dbus_connection_send_message (DBusConnection *connection, return TRUE; } +static void +reply_handler_timeout (void *data) +{ + DBusConnection *connection; + ReplyHandlerData *reply_handler_data = data; + + connection = reply_handler_data->connection; + + dbus_mutex_lock (connection->mutex); + if (reply_handler_data->timeout_link) + { + _dbus_connection_queue_synthesized_message_link (connection, + reply_handler_data->timeout_link); + reply_handler_data->timeout_link = NULL; + } + + _dbus_connection_remove_timeout (connection, + reply_handler_data->timeout); + reply_handler_data->timeout_added = FALSE; + + dbus_mutex_unlock (connection->mutex); +} + +static void +reply_handler_data_free (ReplyHandlerData *data) +{ + if (!data) + return; + + if (data->timeout_added) + _dbus_connection_remove_timeout_locked (data->connection, + data->timeout); + + if (data->connection_added) + _dbus_message_handler_remove_connection (data->handler, + data->connection); + + if (data->timeout_link) + { + dbus_message_unref ((DBusMessage *)data->timeout_link->data); + _dbus_list_free_link (data->timeout_link); + } + + dbus_message_handler_unref (data->handler); + + dbus_free (data); +} + /** * Queues a message to send, as with dbus_connection_send_message(), * but also sets up a DBusMessageHandler to receive a reply to the @@ -934,8 +1044,8 @@ dbus_connection_send_message (DBusConnection *connection, * message as a reply, after a reply has been seen the handler is * removed. If a filter filters out the reply before the handler sees * it, the handler is not removed but the timeout will immediately - * fire again. If a filter was dumb and kept removing the timeout - * reply then we'd get in an infinite loop. + * fire. If a filter was dumb and removed the timeout reply then + * the reply is lost (this will give a runtime warning). * * If #NULL is passed for the reply_handler, the timeout reply will * still be generated and placed into the message queue, but no @@ -969,8 +1079,120 @@ dbus_connection_send_message_with_reply (DBusConnection *connection, int timeout_milliseconds, DBusResultCode *result) { - /* FIXME */ - return dbus_connection_send_message (connection, message, NULL, result); + DBusTimeout *timeout; + ReplyHandlerData *data; + DBusMessage *reply; + DBusList *reply_link; + dbus_int32_t serial = -1; + + if (timeout_milliseconds == -1) + timeout_milliseconds = DEFAULT_TIMEOUT_VALUE; + + data = dbus_new0 (ReplyHandlerData, 1); + + if (!data) + { + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + return FALSE; + } + + timeout = _dbus_timeout_new (timeout_milliseconds, reply_handler_timeout, + data, NULL); + + if (!timeout) + { + reply_handler_data_free (data); + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + return FALSE; + } + + dbus_mutex_lock (connection->mutex); + + /* Add timeout */ + if (!_dbus_connection_add_timeout (connection, timeout)) + { + reply_handler_data_free (data); + _dbus_timeout_unref (timeout); + dbus_mutex_unlock (connection->mutex); + + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + return FALSE; + } + + /* The connection now owns the reference to the timeout. */ + _dbus_timeout_unref (timeout); + + data->timeout_added = TRUE; + data->timeout = timeout; + data->connection = connection; + + if (!_dbus_message_handler_add_connection (reply_handler, connection)) + { + dbus_mutex_unlock (connection->mutex); + reply_handler_data_free (data); + + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + return FALSE; + } + data->connection_added = TRUE; + + /* Assign a serial to the message */ + if (_dbus_message_get_client_serial (message) == -1) + { + serial = _dbus_connection_get_next_client_serial (connection); + _dbus_message_set_client_serial (message, serial); + } + + data->handler = reply_handler; + data->serial = serial; + + dbus_message_handler_ref (reply_handler); + + reply = dbus_message_new_error_reply (message, DBUS_ERROR_NO_REPLY, + "No reply within specified time"); + if (!reply) + { + dbus_mutex_unlock (connection->mutex); + reply_handler_data_free (data); + + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + return FALSE; + } + + reply_link = _dbus_list_alloc_link (reply); + if (!reply) + { + dbus_mutex_unlock (connection->mutex); + dbus_message_unref (reply); + reply_handler_data_free (data); + + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + return FALSE; + } + + data->timeout_link = reply_link; + + /* Insert the serial in the pending replies hash. */ + if (!_dbus_hash_table_insert_int (connection->pending_replies, serial, data)) + { + dbus_set_result (result, DBUS_RESULT_NO_MEMORY); + dbus_mutex_unlock (connection->mutex); + reply_handler_data_free (data); + + return FALSE; + } + + dbus_mutex_unlock (connection->mutex); + + if (!dbus_connection_send_message (connection, message, NULL, result)) + { + /* This will free the handler data too */ + _dbus_hash_table_remove_int (connection->pending_replies, serial); + return FALSE; + } + + dbus_set_result (result, DBUS_RESULT_SUCCESS); + return TRUE; } /** @@ -1256,6 +1478,15 @@ _dbus_connection_release_dispatch (DBusConnection *connection) dbus_condvar_wake_one (connection->dispatch_cond); } +static void +_dbus_connection_failed_pop (DBusConnection *connection, + DBusList *message_link) +{ + _dbus_list_prepend_link (&connection->incoming_messages, + message_link); + connection->n_incoming += 1; +} + /** * Pops the first-received message from the current incoming message * queue, runs any handlers for it, then unrefs the message. @@ -1266,11 +1497,19 @@ _dbus_connection_release_dispatch (DBusConnection *connection) dbus_bool_t dbus_connection_dispatch_message (DBusConnection *connection) { + DBusMessageHandler *handler; DBusMessage *message; - DBusList *link, *filter_list_copy; + DBusList *link, *filter_list_copy, *message_link; DBusHandlerResult result; + ReplyHandlerData *reply_handler_data; const char *name; + dbus_int32_t reply_serial; + /* Preallocate link so we can put the message back on failure */ + message_link = _dbus_list_alloc_link (NULL); + if (message_link) + return FALSE; + dbus_mutex_lock (connection->mutex); /* We need to ref the connection since the callback could potentially @@ -1292,13 +1531,20 @@ dbus_connection_dispatch_message (DBusConnection *connection) dbus_connection_unref (connection); return FALSE; } - + + message_link->data = message; + result = DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS; + reply_serial = _dbus_message_get_reply_serial (message); + reply_handler_data = _dbus_hash_table_lookup_int (connection->pending_replies, + reply_serial); + if (!_dbus_list_copy (&connection->filter_list, &filter_list_copy)) { _dbus_connection_release_dispatch (connection); dbus_mutex_unlock (connection->mutex); + _dbus_connection_failed_pop (connection, message_link); dbus_connection_unref (connection); return FALSE; } @@ -1332,15 +1578,40 @@ dbus_connection_dispatch_message (DBusConnection *connection) _dbus_list_clear (&filter_list_copy); dbus_mutex_lock (connection->mutex); + + /* Did a reply we were waiting on get filtered? */ + if (reply_handler_data && result == DBUS_HANDLER_RESULT_REMOVE_MESSAGE) + { + /* Queue the timeout immediately! */ + if (reply_handler_data->timeout_link) + { + _dbus_connection_queue_synthesized_message_link (connection, + reply_handler_data->timeout_link); + reply_handler_data->timeout_link = NULL; + } + else + { + /* We already queued the timeout? Then it was filtered! */ + _dbus_warn ("The timeout for the reply to %d was filtered\n", reply_serial); + } + } if (result == DBUS_HANDLER_RESULT_REMOVE_MESSAGE) goto out; + if (reply_handler_data) + { + dbus_mutex_unlock (connection->mutex); + result = _dbus_message_handler_handle_message (reply_handler_data->handler, + connection, message); + reply_handler_data_free (reply_handler_data); + dbus_mutex_lock (connection->mutex); + goto out; + } + name = dbus_message_get_name (message); if (name != NULL) { - DBusMessageHandler *handler; - handler = _dbus_hash_table_lookup_string (connection->handler_table, name); if (handler != NULL) @@ -1359,6 +1630,7 @@ dbus_connection_dispatch_message (DBusConnection *connection) out: _dbus_connection_release_dispatch (connection); dbus_mutex_unlock (connection->mutex); + _dbus_list_free_link (message_link); dbus_connection_unref (connection); dbus_message_unref (message); diff --git a/dbus/dbus-errors.h b/dbus/dbus-errors.h index fd861feb..6e83dae4 100644 --- a/dbus/dbus-errors.h +++ b/dbus/dbus-errors.h @@ -54,6 +54,7 @@ struct DBusError #define DBUS_ERROR_SPAWN_FAILED "org.freedesktop.DBus.Error.Spawn.Failed" #define DBUS_ERROR_NO_MEMORY "org.freedesktop.DBus.Error.NoMemory" #define DBUS_ERROR_SERVICE_DOES_NOT_EXIST "org.freedesktop.DBus.Error.ServiceDoesNotExist" +#define DBUS_ERROR_NO_REPLY "org.freedesktop.DBus.Error.NoReply" typedef enum { -- cgit