From ebb57e719c32becd95a1efe3dd269c21e5a011b6 Mon Sep 17 00:00:00 2001 From: Havoc Pennington Date: Sun, 16 Mar 2003 20:16:47 +0000 Subject: 2003-03-16 Havoc Pennington * dbus/dbus-string.c (_dbus_string_validate_utf8): oops, unbreak this. always run the test suite before commit... * bus/*: adapt to DBusConnection API changes * glib/dbus-gmain.c: adapt to DBusConnection API changes, requires renaming stuff to avoid dbus_connection_dispatch name conflict. * dbus/dbus-transport.c (_dbus_transport_queue_messages): new function * dbus/dbus-message.c (_dbus_message_loader_queue_messages): separate from _dbus_message_loader_return_buffer() * dbus/dbus-connection.c (dbus_connection_get_n_messages): remove this, because it's now always broken to use; the number of messages in queue vs. the number still buffered by the message loader is undefined/meaningless. Should use dbus_connection_get_dispatch_state(). (dbus_connection_dispatch): rename from dbus_connection_dispatch_message --- dbus/dbus-connection.c | 240 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 176 insertions(+), 64 deletions(-) (limited to 'dbus/dbus-connection.c') diff --git a/dbus/dbus-connection.c b/dbus/dbus-connection.c index 79f27a8d..2b4a7600 100644 --- a/dbus/dbus-connection.c +++ b/dbus/dbus-connection.c @@ -76,8 +76,8 @@ struct DBusConnection DBusMutex *mutex; /**< Lock on the entire DBusConnection */ - dbus_bool_t dispatch_acquired; /**< Protects dispatch_message */ - DBusCondVar *dispatch_cond; /**< Protects dispatch_message */ + dbus_bool_t dispatch_acquired; /**< Protects dispatch() */ + DBusCondVar *dispatch_cond; /**< Protects dispatch() */ dbus_bool_t io_path_acquired; /**< Protects transport io path */ DBusCondVar *io_path_cond; /**< Protects transport io path */ @@ -126,8 +126,9 @@ typedef struct static void reply_handler_data_free (ReplyHandlerData *data); -static void _dbus_connection_remove_timeout_locked (DBusConnection *connection, - DBusTimeout *timeout); +static void _dbus_connection_remove_timeout_locked (DBusConnection *connection, + DBusTimeout *timeout); +static DBusDispatchStatus _dbus_connection_get_dispatch_status_unlocked (DBusConnection *connection); /** * Acquires the connection lock. @@ -359,7 +360,7 @@ _dbus_connection_remove_watch (DBusConnection *connection, * function on a watch that was not previously added. * * @param connection the connection. - * @param timeout the timeout to toggle. + * @param watch the watch to toggle. * @param enabled whether to enable or disable */ void @@ -1355,6 +1356,32 @@ dbus_connection_send_with_reply (DBusConnection *connection, return TRUE; } + +static DBusMessage* +check_for_reply_unlocked (DBusConnection *connection, + dbus_int32_t client_serial) +{ + DBusList *link; + + link = _dbus_list_get_first_link (&connection->incoming_messages); + + while (link != NULL) + { + DBusMessage *reply = link->data; + + if (dbus_message_get_reply_serial (reply) == client_serial) + { + _dbus_list_remove_link (&connection->incoming_messages, link); + connection->n_incoming -= 1; + dbus_message_ref (reply); + return reply; + } + link = _dbus_list_get_next_link (&connection->incoming_messages, link); + } + + return NULL; +} + /** * Sends a message and blocks a certain time period while waiting for a reply. * This function does not dispatch any message handlers until the main loop @@ -1383,11 +1410,11 @@ dbus_connection_send_with_reply_and_block (DBusConnection *connection, DBusError *error) { dbus_int32_t client_serial; - DBusList *link; long start_tv_sec, start_tv_usec; long end_tv_sec, end_tv_usec; long tv_sec, tv_usec; - + DBusDispatchStatus status; + if (timeout_milliseconds == -1) timeout_milliseconds = DEFAULT_TIMEOUT_VALUE; @@ -1423,35 +1450,32 @@ dbus_connection_send_with_reply_and_block (DBusConnection *connection, end_tv_sec, end_tv_usec); /* Now we wait... */ - /* THREAD TODO: This is busted. What if a dispatch_message or pop_message + /* THREAD TODO: This is busted. What if a dispatch() or pop_message * gets the message before we do? */ - block_again: - + /* always block at least once as we know we don't have the reply yet */ _dbus_connection_do_iteration (connection, - DBUS_ITERATION_DO_READING | - DBUS_ITERATION_BLOCK, - timeout_milliseconds); + DBUS_ITERATION_DO_READING | + DBUS_ITERATION_BLOCK, + timeout_milliseconds); - /* Check if we've gotten a reply */ - link = _dbus_list_get_first_link (&connection->incoming_messages); + recheck_status: - while (link != NULL) - { - DBusMessage *reply = link->data; + /* queue messages and get status */ + status = _dbus_connection_get_dispatch_status_unlocked (connection); - if (dbus_message_get_reply_serial (reply) == client_serial) - { - _dbus_list_remove_link (&connection->incoming_messages, link); - connection->n_incoming -= 1; - dbus_message_ref (reply); - - dbus_mutex_unlock (connection->mutex); - return reply; - } - link = _dbus_list_get_next_link (&connection->incoming_messages, link); + if (status == DBUS_DISPATCH_DATA_REMAINS) + { + DBusMessage *reply; + + reply = check_for_reply_unlocked (connection, client_serial); + if (reply != NULL) + { + dbus_mutex_unlock (connection->mutex); + return reply; + } } - + _dbus_get_current_time (&tv_sec, &tv_usec); if (tv_sec < start_tv_sec) @@ -1466,7 +1490,29 @@ dbus_connection_send_with_reply_and_block (DBusConnection *connection, _dbus_verbose ("%d milliseconds remain\n", timeout_milliseconds); _dbus_assert (timeout_milliseconds > 0); - goto block_again; /* not expired yet */ + if (status == DBUS_DISPATCH_NEED_MEMORY) + { + /* Try sleeping a bit, as we aren't sure we need to block for reading, + * we may already have a reply in the buffer and just can't process + * it. + */ + if (timeout_milliseconds < 100) + ; /* just busy loop */ + else if (timeout_milliseconds <= 1000) + _dbus_sleep_milliseconds (timeout_milliseconds / 3); + else + _dbus_sleep_milliseconds (1000); + } + else + { + /* block again, we don't have the reply buffered yet. */ + _dbus_connection_do_iteration (connection, + DBUS_ITERATION_DO_READING | + DBUS_ITERATION_BLOCK, + timeout_milliseconds); + } + + goto recheck_status; } if (dbus_connection_get_is_connected (connection)) @@ -1503,24 +1549,6 @@ dbus_connection_flush (DBusConnection *connection) dbus_mutex_unlock (connection->mutex); } -/** - * Gets the number of messages in the incoming message queue. - * - * @param connection the connection. - * @returns the number of messages in the queue. - */ -int -dbus_connection_get_n_messages (DBusConnection *connection) -{ - int res; - - dbus_mutex_lock (connection->mutex); - res = connection->n_incoming; - dbus_mutex_unlock (connection->mutex); - return res; -} - - /* Call with mutex held. Will drop it while waiting and re-acquire * before returning */ @@ -1551,7 +1579,15 @@ DBusMessage* dbus_connection_borrow_message (DBusConnection *connection) { DBusMessage *message; - + DBusDispatchStatus status; + + /* this is called for the side effect that it queues + * up any messages from the transport + */ + status = dbus_connection_get_dispatch_status (connection); + if (status != DBUS_DISPATCH_DATA_REMAINS) + return NULL; + dbus_mutex_lock (connection->mutex); if (connection->message_borrowed != NULL) @@ -1672,8 +1708,17 @@ DBusMessage* dbus_connection_pop_message (DBusConnection *connection) { DBusMessage *message; - dbus_mutex_lock (connection->mutex); + DBusDispatchStatus status; + /* this is called for the side effect that it queues + * up any messages from the transport + */ + status = dbus_connection_get_dispatch_status (connection); + if (status != DBUS_DISPATCH_DATA_REMAINS) + return NULL; + + dbus_mutex_lock (connection->mutex); + message = _dbus_connection_pop_message_unlocked (connection); dbus_mutex_unlock (connection->mutex); @@ -1724,15 +1769,62 @@ _dbus_connection_failed_pop (DBusConnection *connection, connection->n_incoming += 1; } +static DBusDispatchStatus +_dbus_connection_get_dispatch_status_unlocked (DBusConnection *connection) +{ + if (connection->n_incoming > 0) + return DBUS_DISPATCH_DATA_REMAINS; + else if (!_dbus_transport_queue_messages (connection->transport)) + return DBUS_DISPATCH_NEED_MEMORY; + else + { + DBusDispatchStatus status; + + status = _dbus_transport_get_dispatch_status (connection->transport); + + if (status != DBUS_DISPATCH_COMPLETE) + return status; + else if (connection->n_incoming > 0) + return DBUS_DISPATCH_DATA_REMAINS; + else + return DBUS_DISPATCH_COMPLETE; + } +} + +/** + * Gets the current state (what we would currently return + * from dbus_connection_dispatch()) but doesn't actually + * dispatch any messages. + * + * @param connection the connection. + * @returns current dispatch status + */ +DBusDispatchStatus +dbus_connection_get_dispatch_status (DBusConnection *connection) +{ + DBusDispatchStatus status; + + dbus_mutex_lock (connection->mutex); + + status = _dbus_connection_get_dispatch_status_unlocked (connection); + + dbus_mutex_unlock (connection->mutex); + + return status; +} + /** - * Pops the first-received message from the current incoming message - * queue, runs any handlers for it, then unrefs the message. + * Processes data buffered while handling watches, queueing zero or + * more incoming messages. Then pops the first-received message from + * the current incoming message queue, runs any handlers for it, and + * unrefs the message. Returns a status indicating whether messages/data + * remain, more memory is needed, or all data has been processed. * * @param connection the connection - * @returns #TRUE if the queue is not empty after dispatch + * @returns dispatch status */ -dbus_bool_t -dbus_connection_dispatch_message (DBusConnection *connection) +DBusDispatchStatus +dbus_connection_dispatch (DBusConnection *connection) { DBusMessageHandler *handler; DBusMessage *message; @@ -1741,9 +1833,14 @@ dbus_connection_dispatch_message (DBusConnection *connection) ReplyHandlerData *reply_handler_data; const char *name; dbus_int32_t reply_serial; + DBusDispatchStatus status; - dbus_mutex_lock (connection->mutex); + status = dbus_connection_get_dispatch_status (connection); + if (status != DBUS_DISPATCH_DATA_REMAINS) + return status; + dbus_mutex_lock (connection->mutex); + /* We need to ref the connection since the callback could potentially * drop the last ref to it */ @@ -1753,17 +1850,23 @@ dbus_connection_dispatch_message (DBusConnection *connection) /* This call may drop the lock during the execution (if waiting for * borrowed messages to be returned) but the order of message - * dispatch if several threads call dispatch_message is still + * dispatch if several threads call dispatch() is still * protected by the lock, since only one will get the lock, and that * one will finish the message dispatching */ message_link = _dbus_connection_pop_message_link_unlocked (connection); if (message_link == NULL) { + /* another thread dispatched our stuff */ + _dbus_connection_release_dispatch (connection); dbus_mutex_unlock (connection->mutex); + + status = dbus_connection_get_dispatch_status (connection); + dbus_connection_unref (connection); - return FALSE; + + return status; } message = message_link->data; @@ -1780,14 +1883,14 @@ dbus_connection_dispatch_message (DBusConnection *connection) dbus_mutex_unlock (connection->mutex); _dbus_connection_failed_pop (connection, message_link); dbus_connection_unref (connection); - return FALSE; + return DBUS_DISPATCH_NEED_MEMORY; } _dbus_list_foreach (&filter_list_copy, (DBusForeachFunction)dbus_message_handler_ref, NULL); - /* We're still protected from dispatch_message reentrancy here + /* We're still protected from dispatch() reentrancy here * since we acquired the dispatcher */ dbus_mutex_unlock (connection->mutex); @@ -1855,8 +1958,9 @@ dbus_connection_dispatch_message (DBusConnection *connection) name); if (handler != NULL) { - /* We're still protected from dispatch_message reentrancy here - * since we acquired the dispatcher */ + /* We're still protected from dispatch() reentrancy here + * since we acquired the dispatcher + */ dbus_mutex_unlock (connection->mutex); _dbus_verbose (" running app handler on message %p\n", message); @@ -1876,10 +1980,15 @@ dbus_connection_dispatch_message (DBusConnection *connection) _dbus_connection_release_dispatch (connection); dbus_mutex_unlock (connection->mutex); _dbus_list_free_link (message_link); + dbus_message_unref (message); /* don't want the message to count in max message limits + * in computing dispatch status + */ + + status = dbus_connection_get_dispatch_status (connection); + dbus_connection_unref (connection); - dbus_message_unref (message); - return connection->n_incoming > 0; + return status; } /** @@ -2458,6 +2567,9 @@ dbus_connection_get_max_message_size (DBusConnection *connection) * and that contains a half-dozen small messages, we may exceed the * size max by that amount. But this should be inconsequential. * + * This does imply that we can't call read() with a buffer larger + * than we're willing to exceed this limit by. + * * @param connection the connection * @param size the maximum size in bytes of all outstanding messages */ -- cgit