From ebb57e719c32becd95a1efe3dd269c21e5a011b6 Mon Sep 17 00:00:00 2001 From: Havoc Pennington Date: Sun, 16 Mar 2003 20:16:47 +0000 Subject: 2003-03-16 Havoc Pennington * dbus/dbus-string.c (_dbus_string_validate_utf8): oops, unbreak this. always run the test suite before commit... * bus/*: adapt to DBusConnection API changes * glib/dbus-gmain.c: adapt to DBusConnection API changes, requires renaming stuff to avoid dbus_connection_dispatch name conflict. * dbus/dbus-transport.c (_dbus_transport_queue_messages): new function * dbus/dbus-message.c (_dbus_message_loader_queue_messages): separate from _dbus_message_loader_return_buffer() * dbus/dbus-connection.c (dbus_connection_get_n_messages): remove this, because it's now always broken to use; the number of messages in queue vs. the number still buffered by the message loader is undefined/meaningless. Should use dbus_connection_get_dispatch_state(). (dbus_connection_dispatch): rename from dbus_connection_dispatch_message --- ChangeLog | 25 +++++ bus/connection.c | 8 +- bus/dispatch.c | 10 +- bus/utils.c | 17 +++ bus/utils.h | 5 + dbus/dbus-bus.c | 4 +- dbus/dbus-connection.c | 240 +++++++++++++++++++++++++++++++------------ dbus/dbus-connection.h | 105 +++++++++---------- dbus/dbus-memory.c | 2 + dbus/dbus-mempool.c | 2 +- dbus/dbus-message-handler.c | 2 +- dbus/dbus-message-internal.h | 3 +- dbus/dbus-message.c | 81 +++++++++++---- dbus/dbus-server.c | 2 +- dbus/dbus-string.c | 16 +-- dbus/dbus-transport-unix.c | 36 +------ dbus/dbus-transport.c | 64 ++++++++++++ dbus/dbus-transport.h | 48 ++++----- glib/dbus-gmain.c | 64 ++++++------ 19 files changed, 485 insertions(+), 249 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7143d131..9189b71e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,28 @@ +2003-03-16 Havoc Pennington + + * dbus/dbus-string.c (_dbus_string_validate_utf8): oops, unbreak + this. always run the test suite before commit... + + * bus/*: adapt to DBusConnection API changes + + * glib/dbus-gmain.c: adapt to DBusConnection API changes, + requires renaming stuff to avoid dbus_connection_dispatch name + conflict. + + * dbus/dbus-transport.c (_dbus_transport_queue_messages): new + function + + * dbus/dbus-message.c (_dbus_message_loader_queue_messages): + separate from _dbus_message_loader_return_buffer() + + * dbus/dbus-connection.c (dbus_connection_get_n_messages): remove + this, because it's now always broken to use; the number of + messages in queue vs. the number still buffered by the message + loader is undefined/meaningless. Should use + dbus_connection_get_dispatch_state(). + (dbus_connection_dispatch): rename from + dbus_connection_dispatch_message + 2003-03-16 Havoc Pennington * dbus/dbus-string.c (_dbus_string_validate_utf8): copy in a real diff --git a/bus/connection.c b/bus/connection.c index 3308df0f..2169d8ab 100644 --- a/bus/connection.c +++ b/bus/connection.c @@ -141,8 +141,8 @@ connection_watch_callback (DBusWatch *watch, dbus_connection_handle_watch (connection, watch, condition); - while (dbus_connection_dispatch_message (connection)) - ; + bus_connection_dispatch_all_messages (connection); + dbus_connection_unref (connection); } @@ -171,8 +171,8 @@ connection_timeout_callback (DBusTimeout *timeout, dbus_timeout_handle (timeout); - while (dbus_connection_dispatch_message (connection)) - ; + bus_connection_dispatch_all_messages (connection); + dbus_connection_unref (connection); } diff --git a/bus/dispatch.c b/bus/dispatch.c index 0fe4be19..3c96d704 100644 --- a/bus/dispatch.c +++ b/bus/dispatch.c @@ -514,7 +514,7 @@ kill_client_connection (BusContext *context, _dbus_assert (bus_test_client_listed (connection)); /* Run disconnect handler in test.c */ - if (dbus_connection_dispatch_message (connection)) + if (bus_connection_dispatch_one_message (connection)) _dbus_assert_not_reached ("something received on connection being killed other than the disconnect"); _dbus_assert (!dbus_connection_get_is_connected (connection)); @@ -859,7 +859,7 @@ check_hello_connection (BusContext *context) dbus_connection_ref (connection); dbus_connection_disconnect (connection); /* dispatching disconnect handler will unref once */ - if (dbus_connection_dispatch_message (connection)) + if (bus_connection_dispatch_one_message (connection)) _dbus_assert_not_reached ("message other than disconnect dispatched after failure to register"); dbus_connection_unref (connection); _dbus_assert (!bus_test_client_listed (connection)); @@ -967,19 +967,19 @@ bus_dispatch_test (const DBusString *test_data_dir) check_hello_connection); dbus_connection_disconnect (foo); - if (dbus_connection_dispatch_message (foo)) + if (bus_connection_dispatch_one_message (foo)) _dbus_assert_not_reached ("extra message in queue"); dbus_connection_unref (foo); _dbus_assert (!bus_test_client_listed (foo)); dbus_connection_disconnect (bar); - if (dbus_connection_dispatch_message (bar)) + if (bus_connection_dispatch_one_message (bar)) _dbus_assert_not_reached ("extra message in queue"); dbus_connection_unref (bar); _dbus_assert (!bus_test_client_listed (bar)); dbus_connection_disconnect (baz); - if (dbus_connection_dispatch_message (baz)) + if (bus_connection_dispatch_one_message (baz)) _dbus_assert_not_reached ("extra message in queue"); dbus_connection_unref (baz); _dbus_assert (!bus_test_client_listed (baz)); diff --git a/bus/utils.c b/bus/utils.c index fadfc140..8a68d8a4 100644 --- a/bus/utils.c +++ b/bus/utils.c @@ -39,3 +39,20 @@ bus_wait_for_memory (void) #endif } +void +bus_connection_dispatch_all_messages (DBusConnection *connection) +{ + while (bus_connection_dispatch_one_message (connection)) + ; +} + +dbus_bool_t +bus_connection_dispatch_one_message (DBusConnection *connection) +{ + DBusDispatchStatus status; + + while ((status = dbus_connection_dispatch (connection)) == DBUS_DISPATCH_NEED_MEMORY) + bus_wait_for_memory (); + + return status == DBUS_DISPATCH_DATA_REMAINS; +} diff --git a/bus/utils.h b/bus/utils.h index 41eb5557..968ece37 100644 --- a/bus/utils.h +++ b/bus/utils.h @@ -25,9 +25,14 @@ #ifndef BUS_UTILS_H #define BUS_UTILS_H +#include + void bus_wait_for_memory (void); extern const char bus_no_memory_message[]; #define BUS_SET_OOM(error) dbus_set_error ((error), DBUS_ERROR_NO_MEMORY, bus_no_memory_message) +void bus_connection_dispatch_all_messages (DBusConnection *connection); +dbus_bool_t bus_connection_dispatch_one_message (DBusConnection *connection); + #endif /* BUS_ACTIVATION_H */ diff --git a/dbus/dbus-bus.c b/dbus/dbus-bus.c index 3e409257..84434888 100644 --- a/dbus/dbus-bus.c +++ b/dbus/dbus-bus.c @@ -49,7 +49,7 @@ */ typedef struct { - char *base_service; + char *base_service; /**< Base service name of this connection */ } BusData; @@ -244,7 +244,7 @@ dbus_bus_register (DBusConnection *connection, * once per connection. * * @param connection the connection - * @param the base service name + * @param base_service the base service name * @returns #FALSE if not enough memory */ dbus_bool_t diff --git a/dbus/dbus-connection.c b/dbus/dbus-connection.c index 79f27a8d..2b4a7600 100644 --- a/dbus/dbus-connection.c +++ b/dbus/dbus-connection.c @@ -76,8 +76,8 @@ struct DBusConnection DBusMutex *mutex; /**< Lock on the entire DBusConnection */ - dbus_bool_t dispatch_acquired; /**< Protects dispatch_message */ - DBusCondVar *dispatch_cond; /**< Protects dispatch_message */ + dbus_bool_t dispatch_acquired; /**< Protects dispatch() */ + DBusCondVar *dispatch_cond; /**< Protects dispatch() */ dbus_bool_t io_path_acquired; /**< Protects transport io path */ DBusCondVar *io_path_cond; /**< Protects transport io path */ @@ -126,8 +126,9 @@ typedef struct static void reply_handler_data_free (ReplyHandlerData *data); -static void _dbus_connection_remove_timeout_locked (DBusConnection *connection, - DBusTimeout *timeout); +static void _dbus_connection_remove_timeout_locked (DBusConnection *connection, + DBusTimeout *timeout); +static DBusDispatchStatus _dbus_connection_get_dispatch_status_unlocked (DBusConnection *connection); /** * Acquires the connection lock. @@ -359,7 +360,7 @@ _dbus_connection_remove_watch (DBusConnection *connection, * function on a watch that was not previously added. * * @param connection the connection. - * @param timeout the timeout to toggle. + * @param watch the watch to toggle. * @param enabled whether to enable or disable */ void @@ -1355,6 +1356,32 @@ dbus_connection_send_with_reply (DBusConnection *connection, return TRUE; } + +static DBusMessage* +check_for_reply_unlocked (DBusConnection *connection, + dbus_int32_t client_serial) +{ + DBusList *link; + + link = _dbus_list_get_first_link (&connection->incoming_messages); + + while (link != NULL) + { + DBusMessage *reply = link->data; + + if (dbus_message_get_reply_serial (reply) == client_serial) + { + _dbus_list_remove_link (&connection->incoming_messages, link); + connection->n_incoming -= 1; + dbus_message_ref (reply); + return reply; + } + link = _dbus_list_get_next_link (&connection->incoming_messages, link); + } + + return NULL; +} + /** * Sends a message and blocks a certain time period while waiting for a reply. * This function does not dispatch any message handlers until the main loop @@ -1383,11 +1410,11 @@ dbus_connection_send_with_reply_and_block (DBusConnection *connection, DBusError *error) { dbus_int32_t client_serial; - DBusList *link; long start_tv_sec, start_tv_usec; long end_tv_sec, end_tv_usec; long tv_sec, tv_usec; - + DBusDispatchStatus status; + if (timeout_milliseconds == -1) timeout_milliseconds = DEFAULT_TIMEOUT_VALUE; @@ -1423,35 +1450,32 @@ dbus_connection_send_with_reply_and_block (DBusConnection *connection, end_tv_sec, end_tv_usec); /* Now we wait... */ - /* THREAD TODO: This is busted. What if a dispatch_message or pop_message + /* THREAD TODO: This is busted. What if a dispatch() or pop_message * gets the message before we do? */ - block_again: - + /* always block at least once as we know we don't have the reply yet */ _dbus_connection_do_iteration (connection, - DBUS_ITERATION_DO_READING | - DBUS_ITERATION_BLOCK, - timeout_milliseconds); + DBUS_ITERATION_DO_READING | + DBUS_ITERATION_BLOCK, + timeout_milliseconds); - /* Check if we've gotten a reply */ - link = _dbus_list_get_first_link (&connection->incoming_messages); + recheck_status: - while (link != NULL) - { - DBusMessage *reply = link->data; + /* queue messages and get status */ + status = _dbus_connection_get_dispatch_status_unlocked (connection); - if (dbus_message_get_reply_serial (reply) == client_serial) - { - _dbus_list_remove_link (&connection->incoming_messages, link); - connection->n_incoming -= 1; - dbus_message_ref (reply); - - dbus_mutex_unlock (connection->mutex); - return reply; - } - link = _dbus_list_get_next_link (&connection->incoming_messages, link); + if (status == DBUS_DISPATCH_DATA_REMAINS) + { + DBusMessage *reply; + + reply = check_for_reply_unlocked (connection, client_serial); + if (reply != NULL) + { + dbus_mutex_unlock (connection->mutex); + return reply; + } } - + _dbus_get_current_time (&tv_sec, &tv_usec); if (tv_sec < start_tv_sec) @@ -1466,7 +1490,29 @@ dbus_connection_send_with_reply_and_block (DBusConnection *connection, _dbus_verbose ("%d milliseconds remain\n", timeout_milliseconds); _dbus_assert (timeout_milliseconds > 0); - goto block_again; /* not expired yet */ + if (status == DBUS_DISPATCH_NEED_MEMORY) + { + /* Try sleeping a bit, as we aren't sure we need to block for reading, + * we may already have a reply in the buffer and just can't process + * it. + */ + if (timeout_milliseconds < 100) + ; /* just busy loop */ + else if (timeout_milliseconds <= 1000) + _dbus_sleep_milliseconds (timeout_milliseconds / 3); + else + _dbus_sleep_milliseconds (1000); + } + else + { + /* block again, we don't have the reply buffered yet. */ + _dbus_connection_do_iteration (connection, + DBUS_ITERATION_DO_READING | + DBUS_ITERATION_BLOCK, + timeout_milliseconds); + } + + goto recheck_status; } if (dbus_connection_get_is_connected (connection)) @@ -1503,24 +1549,6 @@ dbus_connection_flush (DBusConnection *connection) dbus_mutex_unlock (connection->mutex); } -/** - * Gets the number of messages in the incoming message queue. - * - * @param connection the connection. - * @returns the number of messages in the queue. - */ -int -dbus_connection_get_n_messages (DBusConnection *connection) -{ - int res; - - dbus_mutex_lock (connection->mutex); - res = connection->n_incoming; - dbus_mutex_unlock (connection->mutex); - return res; -} - - /* Call with mutex held. Will drop it while waiting and re-acquire * before returning */ @@ -1551,7 +1579,15 @@ DBusMessage* dbus_connection_borrow_message (DBusConnection *connection) { DBusMessage *message; - + DBusDispatchStatus status; + + /* this is called for the side effect that it queues + * up any messages from the transport + */ + status = dbus_connection_get_dispatch_status (connection); + if (status != DBUS_DISPATCH_DATA_REMAINS) + return NULL; + dbus_mutex_lock (connection->mutex); if (connection->message_borrowed != NULL) @@ -1672,8 +1708,17 @@ DBusMessage* dbus_connection_pop_message (DBusConnection *connection) { DBusMessage *message; - dbus_mutex_lock (connection->mutex); + DBusDispatchStatus status; + /* this is called for the side effect that it queues + * up any messages from the transport + */ + status = dbus_connection_get_dispatch_status (connection); + if (status != DBUS_DISPATCH_DATA_REMAINS) + return NULL; + + dbus_mutex_lock (connection->mutex); + message = _dbus_connection_pop_message_unlocked (connection); dbus_mutex_unlock (connection->mutex); @@ -1724,15 +1769,62 @@ _dbus_connection_failed_pop (DBusConnection *connection, connection->n_incoming += 1; } +static DBusDispatchStatus +_dbus_connection_get_dispatch_status_unlocked (DBusConnection *connection) +{ + if (connection->n_incoming > 0) + return DBUS_DISPATCH_DATA_REMAINS; + else if (!_dbus_transport_queue_messages (connection->transport)) + return DBUS_DISPATCH_NEED_MEMORY; + else + { + DBusDispatchStatus status; + + status = _dbus_transport_get_dispatch_status (connection->transport); + + if (status != DBUS_DISPATCH_COMPLETE) + return status; + else if (connection->n_incoming > 0) + return DBUS_DISPATCH_DATA_REMAINS; + else + return DBUS_DISPATCH_COMPLETE; + } +} + +/** + * Gets the current state (what we would currently return + * from dbus_connection_dispatch()) but doesn't actually + * dispatch any messages. + * + * @param connection the connection. + * @returns current dispatch status + */ +DBusDispatchStatus +dbus_connection_get_dispatch_status (DBusConnection *connection) +{ + DBusDispatchStatus status; + + dbus_mutex_lock (connection->mutex); + + status = _dbus_connection_get_dispatch_status_unlocked (connection); + + dbus_mutex_unlock (connection->mutex); + + return status; +} + /** - * Pops the first-received message from the current incoming message - * queue, runs any handlers for it, then unrefs the message. + * Processes data buffered while handling watches, queueing zero or + * more incoming messages. Then pops the first-received message from + * the current incoming message queue, runs any handlers for it, and + * unrefs the message. Returns a status indicating whether messages/data + * remain, more memory is needed, or all data has been processed. * * @param connection the connection - * @returns #TRUE if the queue is not empty after dispatch + * @returns dispatch status */ -dbus_bool_t -dbus_connection_dispatch_message (DBusConnection *connection) +DBusDispatchStatus +dbus_connection_dispatch (DBusConnection *connection) { DBusMessageHandler *handler; DBusMessage *message; @@ -1741,9 +1833,14 @@ dbus_connection_dispatch_message (DBusConnection *connection) ReplyHandlerData *reply_handler_data; const char *name; dbus_int32_t reply_serial; + DBusDispatchStatus status; - dbus_mutex_lock (connection->mutex); + status = dbus_connection_get_dispatch_status (connection); + if (status != DBUS_DISPATCH_DATA_REMAINS) + return status; + dbus_mutex_lock (connection->mutex); + /* We need to ref the connection since the callback could potentially * drop the last ref to it */ @@ -1753,17 +1850,23 @@ dbus_connection_dispatch_message (DBusConnection *connection) /* This call may drop the lock during the execution (if waiting for * borrowed messages to be returned) but the order of message - * dispatch if several threads call dispatch_message is still + * dispatch if several threads call dispatch() is still * protected by the lock, since only one will get the lock, and that * one will finish the message dispatching */ message_link = _dbus_connection_pop_message_link_unlocked (connection); if (message_link == NULL) { + /* another thread dispatched our stuff */ + _dbus_connection_release_dispatch (connection); dbus_mutex_unlock (connection->mutex); + + status = dbus_connection_get_dispatch_status (connection); + dbus_connection_unref (connection); - return FALSE; + + return status; } message = message_link->data; @@ -1780,14 +1883,14 @@ dbus_connection_dispatch_message (DBusConnection *connection) dbus_mutex_unlock (connection->mutex); _dbus_connection_failed_pop (connection, message_link); dbus_connection_unref (connection); - return FALSE; + return DBUS_DISPATCH_NEED_MEMORY; } _dbus_list_foreach (&filter_list_copy, (DBusForeachFunction)dbus_message_handler_ref, NULL); - /* We're still protected from dispatch_message reentrancy here + /* We're still protected from dispatch() reentrancy here * since we acquired the dispatcher */ dbus_mutex_unlock (connection->mutex); @@ -1855,8 +1958,9 @@ dbus_connection_dispatch_message (DBusConnection *connection) name); if (handler != NULL) { - /* We're still protected from dispatch_message reentrancy here - * since we acquired the dispatcher */ + /* We're still protected from dispatch() reentrancy here + * since we acquired the dispatcher + */ dbus_mutex_unlock (connection->mutex); _dbus_verbose (" running app handler on message %p\n", message); @@ -1876,10 +1980,15 @@ dbus_connection_dispatch_message (DBusConnection *connection) _dbus_connection_release_dispatch (connection); dbus_mutex_unlock (connection->mutex); _dbus_list_free_link (message_link); + dbus_message_unref (message); /* don't want the message to count in max message limits + * in computing dispatch status + */ + + status = dbus_connection_get_dispatch_status (connection); + dbus_connection_unref (connection); - dbus_message_unref (message); - return connection->n_incoming > 0; + return status; } /** @@ -2458,6 +2567,9 @@ dbus_connection_get_max_message_size (DBusConnection *connection) * and that contains a half-dozen small messages, we may exceed the * size max by that amount. But this should be inconsequential. * + * This does imply that we can't call read() with a buffer larger + * than we're willing to exceed this limit by. + * * @param connection the connection * @param size the maximum size in bytes of all outstanding messages */ diff --git a/dbus/dbus-connection.h b/dbus/dbus-connection.h index 78a8d58c..b4e6007d 100644 --- a/dbus/dbus-connection.h +++ b/dbus/dbus-connection.h @@ -56,6 +56,13 @@ typedef enum * can be present in current state). */ } DBusWatchFlags; +typedef enum +{ + DBUS_DISPATCH_DATA_REMAINS, /**< There is more data to potentially convert to messages. */ + DBUS_DISPATCH_COMPLETE, /**< All currently available data has been processed. */ + DBUS_DISPATCH_NEED_MEMORY /**< More memory is needed to continue. */ +} DBusDispatchStatus; + typedef dbus_bool_t (* DBusAddWatchFunction) (DBusWatch *watch, void *data); typedef void (* DBusWatchToggledFunction) (DBusWatch *watch, @@ -70,58 +77,52 @@ typedef void (* DBusTimeoutToggledFunction) (DBusTimeout *timeout, typedef void (* DBusRemoveTimeoutFunction) (DBusTimeout *timeout, void *data); -DBusConnection* dbus_connection_open (const char *address, - DBusResultCode *result); -void dbus_connection_ref (DBusConnection *connection); -void dbus_connection_unref (DBusConnection *connection); -void dbus_connection_disconnect (DBusConnection *connection); -dbus_bool_t dbus_connection_get_is_connected (DBusConnection *connection); -dbus_bool_t dbus_connection_get_is_authenticated (DBusConnection *connection); -void dbus_connection_flush (DBusConnection *connection); -int dbus_connection_get_n_messages (DBusConnection *connection); -DBusMessage* dbus_connection_borrow_message (DBusConnection *connection); -void dbus_connection_return_message (DBusConnection *connection, - DBusMessage *message); -void dbus_connection_steal_borrowed_message (DBusConnection *connection, - DBusMessage *message); -DBusMessage* dbus_connection_pop_message (DBusConnection *connection); -dbus_bool_t dbus_connection_dispatch_message (DBusConnection *connection); - - -dbus_bool_t dbus_connection_send (DBusConnection *connection, - DBusMessage *message, - dbus_int32_t *client_serial); -dbus_bool_t dbus_connection_send_with_reply (DBusConnection *connection, - DBusMessage *message, - DBusMessageHandler *reply_handler, - int timeout_milliseconds); -DBusMessage *dbus_connection_send_with_reply_and_block (DBusConnection *connection, - DBusMessage *message, - int timeout_milliseconds, - DBusError *error); - - -dbus_bool_t dbus_connection_set_watch_functions (DBusConnection *connection, - DBusAddWatchFunction add_function, - DBusRemoveWatchFunction remove_function, - DBusWatchToggledFunction toggled_function, - void *data, - DBusFreeFunction free_data_function); -dbus_bool_t dbus_connection_set_timeout_functions (DBusConnection *connection, - DBusAddTimeoutFunction add_function, - DBusRemoveTimeoutFunction remove_function, - DBusTimeoutToggledFunction toggled_function, - void *data, - DBusFreeFunction free_data_function); -void dbus_connection_set_wakeup_main_function (DBusConnection *connection, - DBusWakeupMainFunction wakeup_main_function, - void *data, - DBusFreeFunction free_data_function); -void dbus_connection_handle_watch (DBusConnection *connection, - DBusWatch *watch, - unsigned int condition); - - +DBusConnection* dbus_connection_open (const char *address, + DBusResultCode *result); +void dbus_connection_ref (DBusConnection *connection); +void dbus_connection_unref (DBusConnection *connection); +void dbus_connection_disconnect (DBusConnection *connection); +dbus_bool_t dbus_connection_get_is_connected (DBusConnection *connection); +dbus_bool_t dbus_connection_get_is_authenticated (DBusConnection *connection); +void dbus_connection_flush (DBusConnection *connection); +DBusMessage* dbus_connection_borrow_message (DBusConnection *connection); +void dbus_connection_return_message (DBusConnection *connection, + DBusMessage *message); +void dbus_connection_steal_borrowed_message (DBusConnection *connection, + DBusMessage *message); +DBusMessage* dbus_connection_pop_message (DBusConnection *connection); +DBusDispatchStatus dbus_connection_get_dispatch_status (DBusConnection *connection); +DBusDispatchStatus dbus_connection_dispatch (DBusConnection *connection); +dbus_bool_t dbus_connection_send (DBusConnection *connection, + DBusMessage *message, + dbus_int32_t *client_serial); +dbus_bool_t dbus_connection_send_with_reply (DBusConnection *connection, + DBusMessage *message, + DBusMessageHandler *reply_handler, + int timeout_milliseconds); +DBusMessage * dbus_connection_send_with_reply_and_block (DBusConnection *connection, + DBusMessage *message, + int timeout_milliseconds, + DBusError *error); +dbus_bool_t dbus_connection_set_watch_functions (DBusConnection *connection, + DBusAddWatchFunction add_function, + DBusRemoveWatchFunction remove_function, + DBusWatchToggledFunction toggled_function, + void *data, + DBusFreeFunction free_data_function); +dbus_bool_t dbus_connection_set_timeout_functions (DBusConnection *connection, + DBusAddTimeoutFunction add_function, + DBusRemoveTimeoutFunction remove_function, + DBusTimeoutToggledFunction toggled_function, + void *data, + DBusFreeFunction free_data_function); +void dbus_connection_set_wakeup_main_function (DBusConnection *connection, + DBusWakeupMainFunction wakeup_main_function, + void *data, + DBusFreeFunction free_data_function); +void dbus_connection_handle_watch (DBusConnection *connection, + DBusWatch *watch, + unsigned int condition); int dbus_watch_get_fd (DBusWatch *watch); diff --git a/dbus/dbus-memory.c b/dbus/dbus-memory.c index a426b371..f7c43f5c 100644 --- a/dbus/dbus-memory.c +++ b/dbus/dbus-memory.c @@ -156,7 +156,9 @@ _dbus_set_fail_alloc_counter (int until_next_fail) fail_alloc_counter = until_next_fail; +#if 0 _dbus_verbose ("Set fail alloc counter = %d\n", fail_alloc_counter); +#endif } /** diff --git a/dbus/dbus-mempool.c b/dbus/dbus-mempool.c index 437dbfdc..13ba5502 100644 --- a/dbus/dbus-mempool.c +++ b/dbus/dbus-mempool.c @@ -100,7 +100,7 @@ struct DBusMemPool DBusFreedElement *free_elements; /**< a free list of elements to recycle */ DBusMemBlock *blocks; /**< blocks of memory from malloc() */ - int allocated_elements; /* Count of outstanding allocated elements */ + int allocated_elements; /**< Count of outstanding allocated elements */ }; /** @} */ diff --git a/dbus/dbus-message-handler.c b/dbus/dbus-message-handler.c index 143d7b03..6d5bb78d 100644 --- a/dbus/dbus-message-handler.c +++ b/dbus/dbus-message-handler.c @@ -137,7 +137,7 @@ _dbus_message_handler_handle_message (DBusMessageHandler *handler, dbus_mutex_unlock (message_handler_lock); /* This function doesn't ref handler/connection/message - * since that's done in dbus_connection_dispatch_message(). + * since that's done in dbus_connection_dispatch(). */ if (function != NULL) return (* function) (handler, connection, message, user_data); diff --git a/dbus/dbus-message-internal.h b/dbus/dbus-message-internal.h index 9c4ee64b..ef1453a0 100644 --- a/dbus/dbus-message-internal.h +++ b/dbus/dbus-message-internal.h @@ -51,7 +51,8 @@ void _dbus_message_loader_get_buffer (DBusMessageLoader void _dbus_message_loader_return_buffer (DBusMessageLoader *loader, DBusString *buffer, int bytes_read); - +dbus_bool_t _dbus_message_loader_queue_messages (DBusMessageLoader *loader); +DBusMessage* _dbus_message_loader_peek_message (DBusMessageLoader *loader); DBusMessage* _dbus_message_loader_pop_message (DBusMessageLoader *loader); DBusList* _dbus_message_loader_pop_message_link (DBusMessageLoader *loader); diff --git a/dbus/dbus-message.c b/dbus/dbus-message.c index a3b713ee..6bcc2060 100644 --- a/dbus/dbus-message.c +++ b/dbus/dbus-message.c @@ -2676,12 +2676,6 @@ decode_header_data (const DBusString *data, * in. This function must always be called, even if no bytes were * successfully read. * - * @todo if we run out of memory in here, we offer no way for calling - * code to handle it, i.e. they can't re-run the message parsing - * attempt. Perhaps much of this code could be moved to pop_message()? - * But then that may need to distinguish NULL return for no messages - * from NULL return for errors. - * * @param loader the loader. * @param buffer the buffer. * @param bytes_read number of bytes that were read into the buffer. @@ -2695,9 +2689,19 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_assert (buffer == &loader->data); loader->buffer_outstanding = FALSE; +} +/** + * Converts buffered data into messages. + * + * @param loader the loader. + * @returns #TRUE if we had enough memory to finish. + */ +dbus_bool_t +_dbus_message_loader_queue_messages (DBusMessageLoader *loader) +{ if (loader->corrupted) - return; + return TRUE; while (_dbus_string_get_length (&loader->data) >= 16) { @@ -2715,7 +2719,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_verbose ("Message has protocol version %d ours is %d\n", (int) header_data[2], DBUS_MAJOR_PROTOCOL_VERSION); loader->corrupted = TRUE; - return; + return TRUE; } byte_order = header_data[0]; @@ -2726,7 +2730,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_verbose ("Message with bad byte order '%c' received\n", byte_order); loader->corrupted = TRUE; - return; + return TRUE; } header_len_unsigned = _dbus_unpack_uint32 (byte_order, header_data + 4); @@ -2737,7 +2741,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_verbose ("Message had broken too-small header length %u\n", header_len_unsigned); loader->corrupted = TRUE; - return; + return TRUE; } if (header_len_unsigned > (unsigned) MAX_SANE_MESSAGE_SIZE || @@ -2747,7 +2751,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, header_len_unsigned, body_len_unsigned); loader->corrupted = TRUE; - return; + return TRUE; } /* Now that we know the values are in signed range, get @@ -2762,7 +2766,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_verbose ("header length %d is not aligned to 8 bytes\n", header_len); loader->corrupted = TRUE; - return; + return TRUE; } if (header_len + body_len > loader->max_message_size) @@ -2770,7 +2774,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_verbose ("Message claimed length header = %d body = %d exceeds max message length %d\n", header_len, body_len, loader->max_message_size); loader->corrupted = TRUE; - return; + return TRUE; } if (_dbus_string_get_length (&loader->data) >= (header_len + body_len)) @@ -2787,7 +2791,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, { _dbus_verbose ("Header was invalid\n"); loader->corrupted = TRUE; - return; + return TRUE; } next_arg = header_len; @@ -2801,7 +2805,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, &next_arg)) { loader->corrupted = TRUE; - return; + return TRUE; } _dbus_assert (next_arg > prev); @@ -2813,12 +2817,12 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, next_arg, header_len, body_len, header_len + body_len); loader->corrupted = TRUE; - return; + return TRUE; } message = dbus_message_new_empty_header (); if (message == NULL) - break; /* ugh, postpone this I guess. */ + return FALSE; message->byte_order = byte_order; message->header_padding = header_padding; @@ -2834,7 +2838,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, if (!_dbus_list_append (&loader->messages, message)) { dbus_message_unref (message); - break; + return FALSE; } _dbus_assert (_dbus_string_get_length (&message->header) == 0); @@ -2847,7 +2851,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, { _dbus_list_remove_last (&loader->messages, message); dbus_message_unref (message); - break; + return FALSE; } if (!_dbus_string_move_len (&loader->data, 0, body_len, &message->body, 0)) @@ -2861,7 +2865,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_list_remove_last (&loader->messages, message); dbus_message_unref (message); - break; + return FALSE; } _dbus_assert (_dbus_string_get_length (&message->header) == header_len); @@ -2876,14 +2880,32 @@ _dbus_message_loader_return_buffer (DBusMessageLoader *loader, _dbus_verbose ("Loaded message %p\n", message); } else - break; + return TRUE; } + + return TRUE; +} + +/** + * Peeks at first loaded message, returns #NULL if no messages have + * been queued. + * + * @param loader the loader. + * @returns the next message, or #NULL if none. + */ +DBusMessage* +_dbus_message_loader_peek_message (DBusMessageLoader *loader) +{ + if (loader->messages) + return loader->messages->data; + else + return NULL; } /** * Pops a loaded message (passing ownership of the message * to the caller). Returns #NULL if no messages have been - * loaded. + * queued. * * @param loader the loader. * @returns the next message, or #NULL if none. @@ -3153,6 +3175,9 @@ check_have_valid_message (DBusMessageLoader *loader) message = NULL; retval = FALSE; + + if (!_dbus_message_loader_queue_messages (loader)) + _dbus_assert_not_reached ("no memory to queue messages"); if (_dbus_message_loader_get_is_corrupted (loader)) { @@ -3195,6 +3220,9 @@ check_invalid_message (DBusMessageLoader *loader) dbus_bool_t retval; retval = FALSE; + + if (!_dbus_message_loader_queue_messages (loader)) + _dbus_assert_not_reached ("no memory to queue messages"); if (!_dbus_message_loader_get_is_corrupted (loader)) { @@ -3216,6 +3244,9 @@ check_incomplete_message (DBusMessageLoader *loader) message = NULL; retval = FALSE; + + if (!_dbus_message_loader_queue_messages (loader)) + _dbus_assert_not_reached ("no memory to queue messages"); if (_dbus_message_loader_get_is_corrupted (loader)) { @@ -3242,6 +3273,9 @@ static dbus_bool_t check_loader_results (DBusMessageLoader *loader, DBusMessageValidity validity) { + if (!_dbus_message_loader_queue_messages (loader)) + _dbus_assert_not_reached ("no memory to queue messages"); + switch (validity) { case _DBUS_MESSAGE_VALID: @@ -3735,6 +3769,9 @@ _dbus_message_test (const char *test_data_dir) dbus_message_unref (message); /* Now pop back the message */ + if (!_dbus_message_loader_queue_messages (loader)) + _dbus_assert_not_reached ("no memory to queue messages"); + if (_dbus_message_loader_get_is_corrupted (loader)) _dbus_assert_not_reached ("message loader corrupted"); diff --git a/dbus/dbus-server.c b/dbus/dbus-server.c index 48703e17..79ed7ed4 100644 --- a/dbus/dbus-server.c +++ b/dbus/dbus-server.c @@ -151,7 +151,7 @@ _dbus_server_remove_watch (DBusServer *server, * function on a watch that was not previously added. * * @param server the server. - * @param timeout the timeout to toggle. + * @param watch the watch to toggle. * @param enabled whether to enable or disable */ void diff --git a/dbus/dbus-string.c b/dbus/dbus-string.c index 1a50dac7..1bc3e205 100644 --- a/dbus/dbus-string.c +++ b/dbus/dbus-string.c @@ -2395,6 +2395,7 @@ _dbus_string_validate_utf8 (const DBusString *str, int len) { const unsigned char *p; + const unsigned char *end; DBUS_CONST_STRING_PREAMBLE (str); _dbus_assert (start >= 0); _dbus_assert (start <= real->len); @@ -2403,9 +2404,10 @@ _dbus_string_validate_utf8 (const DBusString *str, if (len > real->len - start) return FALSE; - p = real->str; + p = real->str + start; + end = p + len; - while (p - real->str < len && *p) + while (p < end) { int i, mask = 0, char_len; dbus_unichar_t result; @@ -2416,20 +2418,20 @@ _dbus_string_validate_utf8 (const DBusString *str, if (char_len == -1) break; - /* check that the expected number of bytes exists in real->str */ - if ((len - (p - real->str)) < char_len) + /* check that the expected number of bytes exists in the remaining length */ + if ((end - p) < char_len) break; UTF8_GET (result, p, i, mask, char_len); if (UTF8_LENGTH (result) != char_len) /* Check for overlong UTF-8 */ - break; + break; if (result == (dbus_unichar_t)-1) break; if (!UNICODE_VALID (result)) - break; + break; p += char_len; } @@ -2437,7 +2439,7 @@ _dbus_string_validate_utf8 (const DBusString *str, /* See that we covered the entire length if a length was * passed in */ - if (p != (real->str + len)) + if (p != end) return FALSE; else return TRUE; diff --git a/dbus/dbus-transport-unix.c b/dbus/dbus-transport-unix.c index ed68658b..9ea5ce11 100644 --- a/dbus/dbus-transport-unix.c +++ b/dbus/dbus-transport-unix.c @@ -178,37 +178,6 @@ do_io_error (DBusTransport *transport) _dbus_transport_unref (transport); } -static void -queue_messages (DBusTransport *transport) -{ - DBusList *link; - - /* Queue any messages */ - while ((link = _dbus_message_loader_pop_message_link (transport->loader))) - { - DBusMessage *message; - - message = link->data; - - _dbus_verbose ("queueing received message %p\n", message); - - _dbus_message_add_size_counter (message, transport->live_messages_size); - - /* pass ownership of link and message ref to connection */ - _dbus_connection_queue_received_message_link (transport->connection, - link); - } - - if (_dbus_message_loader_get_is_corrupted (transport->loader)) - { - _dbus_verbose ("Corrupted message stream, disconnecting\n"); - do_io_error (transport); - } - - /* check read watch in case we've now exceeded max outstanding messages */ - check_read_watch (transport); -} - /* return value is whether we successfully read any new data. */ static dbus_bool_t read_data_into_auth (DBusTransport *transport) @@ -398,8 +367,6 @@ recover_unused_bytes (DBusTransport *transport) orig_len); } - queue_messages (transport); - return; nomem: @@ -777,7 +744,8 @@ do_reading (DBusTransport *transport) total += bytes_read; - queue_messages (transport); + if (_dbus_transport_queue_messages (transport) == DBUS_DISPATCH_NEED_MEMORY) + goto out; /* Try reading more data until we get EAGAIN and return, or * exceed max bytes per iteration. If in blocking mode of diff --git a/dbus/dbus-transport.c b/dbus/dbus-transport.c index 8c6c7f1c..b2355ea5 100644 --- a/dbus/dbus-transport.c +++ b/dbus/dbus-transport.c @@ -506,6 +506,70 @@ _dbus_transport_do_iteration (DBusTransport *transport, _dbus_transport_unref (transport); } +/** + * Reports our current dispatch status (whether there's buffered + * data to be queued as messages, or not, or we need memory). + * + * @param transport the transport + * @returns current status + */ +DBusDispatchStatus +_dbus_transport_get_dispatch_status (DBusTransport *transport) +{ + if (_dbus_counter_get_value (transport->live_messages_size) >= transport->max_live_messages_size) + return DBUS_DISPATCH_COMPLETE; /* complete for now */ + + if (!_dbus_message_loader_queue_messages (transport->loader)) + return DBUS_DISPATCH_NEED_MEMORY; + + if (_dbus_message_loader_peek_message (transport->loader) != NULL) + return DBUS_DISPATCH_DATA_REMAINS; + else + return DBUS_DISPATCH_COMPLETE; +} + +/** + * Processes data we've read while handling a watch, potentially + * converting some of it to messages and queueing those messages on + * the connection. + * + * @param transport the transport + * @returns #TRUE if we had enough memory to queue all messages + */ +dbus_bool_t +_dbus_transport_queue_messages (DBusTransport *transport) +{ + DBusDispatchStatus status; + + /* Queue any messages */ + while ((status = _dbus_transport_get_dispatch_status (transport)) == DBUS_DISPATCH_DATA_REMAINS) + { + DBusMessage *message; + DBusList *link; + + link = _dbus_message_loader_pop_message_link (transport->loader); + _dbus_assert (link != NULL); + + message = link->data; + + _dbus_verbose ("queueing received message %p\n", message); + + _dbus_message_add_size_counter (message, transport->live_messages_size); + + /* pass ownership of link and message ref to connection */ + _dbus_connection_queue_received_message_link (transport->connection, + link); + } + + if (_dbus_message_loader_get_is_corrupted (transport->loader)) + { + _dbus_verbose ("Corrupted message stream, disconnecting\n"); + _dbus_transport_disconnect (transport); + } + + return status != DBUS_DISPATCH_NEED_MEMORY; +} + /** * See dbus_connection_set_max_message_size(). * diff --git a/dbus/dbus-transport.h b/dbus/dbus-transport.h index 1f01788f..ad3299c2 100644 --- a/dbus/dbus-transport.h +++ b/dbus/dbus-transport.h @@ -30,29 +30,31 @@ DBUS_BEGIN_DECLS; typedef struct DBusTransport DBusTransport; -DBusTransport* _dbus_transport_open (const char *address, - DBusResultCode *result); -void _dbus_transport_ref (DBusTransport *transport); -void _dbus_transport_unref (DBusTransport *transport); -void _dbus_transport_disconnect (DBusTransport *transport); -dbus_bool_t _dbus_transport_get_is_connected (DBusTransport *transport); -dbus_bool_t _dbus_transport_get_is_authenticated (DBusTransport *transport); -void _dbus_transport_handle_watch (DBusTransport *transport, - DBusWatch *watch, - unsigned int condition); -dbus_bool_t _dbus_transport_set_connection (DBusTransport *transport, - DBusConnection *connection); -void _dbus_transport_messages_pending (DBusTransport *transport, - int queue_length); -void _dbus_transport_do_iteration (DBusTransport *transport, - unsigned int flags, - int timeout_milliseconds); -void _dbus_transport_set_max_message_size (DBusTransport *transport, - long size); -long _dbus_transport_get_max_message_size (DBusTransport *transport); -void _dbus_transport_set_max_live_messages_size (DBusTransport *transport, - long size); -long _dbus_transport_get_max_live_messages_size (DBusTransport *transport); +DBusTransport* _dbus_transport_open (const char *address, + DBusResultCode *result); +void _dbus_transport_ref (DBusTransport *transport); +void _dbus_transport_unref (DBusTransport *transport); +void _dbus_transport_disconnect (DBusTransport *transport); +dbus_bool_t _dbus_transport_get_is_connected (DBusTransport *transport); +dbus_bool_t _dbus_transport_get_is_authenticated (DBusTransport *transport); +void _dbus_transport_handle_watch (DBusTransport *transport, + DBusWatch *watch, + unsigned int condition); +dbus_bool_t _dbus_transport_set_connection (DBusTransport *transport, + DBusConnection *connection); +void _dbus_transport_messages_pending (DBusTransport *transport, + int queue_length); +void _dbus_transport_do_iteration (DBusTransport *transport, + unsigned int flags, + int timeout_milliseconds); +DBusDispatchStatus _dbus_transport_get_dispatch_status (DBusTransport *transport); +dbus_bool_t _dbus_transport_queue_messages (DBusTransport *transport); +void _dbus_transport_set_max_message_size (DBusTransport *transport, + long size); +long _dbus_transport_get_max_message_size (DBusTransport *transport); +void _dbus_transport_set_max_live_messages_size (DBusTransport *transport, + long size); +long _dbus_transport_get_max_live_messages_size (DBusTransport *transport); DBUS_END_DECLS; diff --git a/glib/dbus-gmain.c b/glib/dbus-gmain.c index 5eb75d10..b00320f2 100644 --- a/glib/dbus-gmain.c +++ b/glib/dbus-gmain.c @@ -63,47 +63,47 @@ static int connection_slot = -1; static GStaticMutex server_slot_lock = G_STATIC_MUTEX_INIT; static int server_slot = -1; -static gboolean dbus_connection_prepare (GSource *source, - gint *timeout); -static gboolean dbus_connection_check (GSource *source); -static gboolean dbus_connection_dispatch (GSource *source, - GSourceFunc callback, - gpointer user_data); -static gboolean dbus_server_prepare (GSource *source, - gint *timeout); -static gboolean dbus_server_check (GSource *source); -static gboolean dbus_server_dispatch (GSource *source, - GSourceFunc callback, - gpointer user_data); +static gboolean gsource_connection_prepare (GSource *source, + gint *timeout); +static gboolean gsource_connection_check (GSource *source); +static gboolean gsource_connection_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data); +static gboolean gsource_server_prepare (GSource *source, + gint *timeout); +static gboolean gsource_server_check (GSource *source); +static gboolean gsource_server_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data); static GSourceFuncs dbus_connection_funcs = { - dbus_connection_prepare, - dbus_connection_check, - dbus_connection_dispatch, + gsource_connection_prepare, + gsource_connection_check, + gsource_connection_dispatch, NULL }; static GSourceFuncs dbus_server_funcs = { - dbus_server_prepare, - dbus_server_check, - dbus_server_dispatch, + gsource_server_prepare, + gsource_server_check, + gsource_server_dispatch, NULL }; static gboolean -dbus_connection_prepare (GSource *source, - gint *timeout) +gsource_connection_prepare (GSource *source, + gint *timeout) { DBusConnection *connection = ((DBusGSource *)source)->connection_or_server; *timeout = -1; - return (dbus_connection_get_n_messages (connection) > 0); + return (dbus_connection_get_dispatch_status (connection) == DBUS_DISPATCH_DATA_REMAINS); } static gboolean -dbus_server_prepare (GSource *source, - gint *timeout) +gsource_server_prepare (GSource *source, + gint *timeout) { *timeout = -1; @@ -132,13 +132,13 @@ dbus_gsource_check (GSource *source) } static gboolean -dbus_connection_check (GSource *source) +gsource_connection_check (GSource *source) { return dbus_gsource_check (source); } static gboolean -dbus_server_check (GSource *source) +gsource_server_check (GSource *source) { return dbus_gsource_check (source); } @@ -192,9 +192,9 @@ dbus_gsource_dispatch (GSource *source, } static gboolean -dbus_connection_dispatch (GSource *source, - GSourceFunc callback, - gpointer user_data) +gsource_connection_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data) { DBusGSource *dbus_source = (DBusGSource *)source; DBusConnection *connection = dbus_source->connection_or_server; @@ -205,7 +205,7 @@ dbus_connection_dispatch (GSource *source, FALSE); /* Dispatch messages */ - while (dbus_connection_dispatch_message (connection)) + while (dbus_connection_dispatch (connection) == DBUS_DISPATCH_DATA_REMAINS) ; dbus_connection_unref (connection); @@ -214,9 +214,9 @@ dbus_connection_dispatch (GSource *source, } static gboolean -dbus_server_dispatch (GSource *source, - GSourceFunc callback, - gpointer user_data) +gsource_server_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data) { DBusGSource *dbus_source = (DBusGSource *)source; DBusServer *server = dbus_source->connection_or_server; -- cgit