From 89e6dfd29cfbdb92f15e8fb2bde76d94a4c5a7b3 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 26 Feb 2003 15:52:25 +0000 Subject: 2003-02-26 Alexander Larsson * configure.in: Set DBUS_GLIB_THREADS_LIBS for apps using gthread-2.0 * dbus/dbus-connection.c: * dbus/dbus-connection.h: Fix _dbus_connection_acquire_io_path and _dbus_connection_acquire_dispatch. Add dbus_connection_set_wakeup_main_function and use it when queueing incoming and outgoing messages. * dbus/dbus-dataslot.c: Threadsafe usage of DBusDataSlotAllocator * dbus/dbus-message.c: (dbus_message_get_args_iter): dbus_new can fail. * dbus/dbus-server-unix.c: Add todo comment * glib/dbus-gmain.c: Implement the new wakeup functions for glib. * glib/Makefile.am: * glib/test-thread-client.c: * glib/test-thread-server.c: * glib/test-thread.h: Initial cut at some thread test code. Not really done yet. --- ChangeLog | 30 ++++++ configure.in | 2 + dbus/dbus-connection.c | 87 ++++++++++++++-- dbus/dbus-connection.h | 31 +++--- dbus/dbus-dataslot.c | 20 +++- dbus/dbus-message.c | 13 ++- dbus/dbus-server-unix.c | 5 + glib/Makefile.am | 16 ++- glib/dbus-gmain.c | 13 ++- glib/test-thread-client.c | 89 +++++++++++++++++ glib/test-thread-server.c | 246 ++++++++++++++++++++++++++++++++++++++++++++++ glib/test-thread.h | 1 + 12 files changed, 522 insertions(+), 31 deletions(-) create mode 100644 glib/test-thread-client.c create mode 100644 glib/test-thread-server.c create mode 100644 glib/test-thread.h diff --git a/ChangeLog b/ChangeLog index 87adb2a8..4a36ddf9 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,33 @@ +2003-02-26 Alexander Larsson + + * configure.in: + Set DBUS_GLIB_THREADS_LIBS for apps using gthread-2.0 + + * dbus/dbus-connection.c: + * dbus/dbus-connection.h: + Fix _dbus_connection_acquire_io_path and _dbus_connection_acquire_dispatch. + Add dbus_connection_set_wakeup_main_function and use it when queueing + incoming and outgoing messages. + + + * dbus/dbus-dataslot.c: + Threadsafe usage of DBusDataSlotAllocator + + * dbus/dbus-message.c: (dbus_message_get_args_iter): + dbus_new can fail. + + * dbus/dbus-server-unix.c: + Add todo comment + + * glib/dbus-gmain.c: + Implement the new wakeup functions for glib. + + * glib/Makefile.am: + * glib/test-thread-client.c: + * glib/test-thread-server.c: + * glib/test-thread.h: + Initial cut at some thread test code. Not really done yet. + 2003-02-26 Havoc Pennington * dbus/dbus-connection.c diff --git a/configure.in b/configure.in index 4b613e06..281da370 100644 --- a/configure.in +++ b/configure.in @@ -218,6 +218,7 @@ AC_SUBST(DBUS_TEST_LIBS) # Glib detection PKG_CHECK_MODULES(DBUS_GLIB, glib-2.0, have_glib=yes, have_glib=no) +PKG_CHECK_MODULES(DBUS_GLIB_THREADS, glib-2.0 gthread-2.0) if test x$have_glib = xno ; then AC_MSG_WARN([GLib development libraries not found]) @@ -238,6 +239,7 @@ AM_CONDITIONAL(HAVE_GLIB, test x$have_glib = xyes) dnl GLib flags AC_SUBST(DBUS_GLIB_CFLAGS) AC_SUBST(DBUS_GLIB_LIBS) +AC_SUBST(DBUS_GLIB_THREADS_LIBS) # Qt detection have_qt=no diff --git a/dbus/dbus-connection.c b/dbus/dbus-connection.c index 6fb4e84c..4990cff3 100644 --- a/dbus/dbus-connection.c +++ b/dbus/dbus-connection.c @@ -105,6 +105,10 @@ struct DBusConnection int client_serial; /**< Client serial. Increments each time a message is sent */ DBusList *disconnect_message_link; /**< Preallocated list node for queueing the disconnection message */ + + DBusWakeupMainFunction wakeup_main_function; /**< Function to wake up the mainloop */ + void *wakeup_main_data; /**< Application data for wakeup_main_function */ + DBusFreeFunction free_wakeup_main_data; /**< free wakeup_main_data */ }; typedef struct @@ -147,6 +151,19 @@ _dbus_connection_unlock (DBusConnection *connection) dbus_mutex_unlock (connection->mutex); } +/** + * Wakes up the main loop if it is sleeping + * Needed if we're e.g. queueing outgoing messages + * on a thread while the mainloop sleeps. + * + * @param connection the connection. + */ +static void +_dbus_connection_wakeup_mainloop (DBusConnection *connection) +{ + if (connection->wakeup_main_function) + (*connection->wakeup_main_function) (connection->wakeup_main_data); +} /** * Adds a message to the incoming message queue, returning #FALSE @@ -187,6 +204,8 @@ _dbus_connection_queue_received_message (DBusConnection *connection, dbus_message_ref (message); connection->n_incoming += 1; + _dbus_connection_wakeup_mainloop (connection); + _dbus_verbose ("Incoming message %p added to queue, %d incoming\n", message, connection->n_incoming); @@ -211,6 +230,8 @@ _dbus_connection_queue_synthesized_message_link (DBusConnection *connection, connection->n_incoming += 1; + _dbus_connection_wakeup_mainloop (connection); + _dbus_verbose ("Incoming synthesized message %p added to queue, %d incoming\n", link->data, connection->n_incoming); } @@ -388,13 +409,17 @@ _dbus_connection_acquire_io_path (DBusConnection *connection, int timeout_milliseconds) { dbus_bool_t res = TRUE; - if (timeout_milliseconds != -1) - res = dbus_condvar_wait_timeout (connection->io_path_cond, - connection->mutex, - timeout_milliseconds); - else - dbus_condvar_wait (connection->io_path_cond, connection->mutex); + if (connection->io_path_acquired) + { + if (timeout_milliseconds != -1) + res = dbus_condvar_wait_timeout (connection->io_path_cond, + connection->mutex, + timeout_milliseconds); + else + dbus_condvar_wait (connection->io_path_cond, connection->mutex); + } + if (res) { _dbus_assert (!connection->io_path_acquired); @@ -774,6 +799,9 @@ _dbus_connection_last_unref (DBusConnection *connection) DBusHashIter iter; DBusList *link; + /* You have to disconnect the connection before unref:ing it. Otherwise + * you won't get the disconnected message. + */ _dbus_assert (!_dbus_transport_get_is_connected (connection->transport)); if (connection->connection_counter != NULL) @@ -953,7 +981,7 @@ dbus_connection_get_is_authenticated (DBusConnection *connection) dbus_bool_t dbus_connection_send_message (DBusConnection *connection, DBusMessage *message, - dbus_int32_t *client_serial, + dbus_int32_t *client_serial, DBusResultCode *result) { @@ -988,7 +1016,9 @@ dbus_connection_send_message (DBusConnection *connection, if (connection->n_outgoing == 1) _dbus_transport_messages_pending (connection->transport, - connection->n_outgoing); + connection->n_outgoing); + + _dbus_connection_wakeup_mainloop (connection); dbus_mutex_unlock (connection->mutex); @@ -1512,7 +1542,8 @@ dbus_connection_pop_message (DBusConnection *connection) static void _dbus_connection_acquire_dispatch (DBusConnection *connection) { - dbus_condvar_wait (connection->dispatch_cond, connection->mutex); + if (connection->dispatch_acquired) + dbus_condvar_wait (connection->dispatch_cond, connection->mutex); _dbus_assert (!connection->dispatch_acquired); connection->dispatch_acquired = TRUE; @@ -1788,6 +1819,44 @@ dbus_connection_set_timeout_functions (DBusConnection *connection, dbus_connection_unref (connection); } +/** + * Sets the mainloop wakeup function for the connection. Thi function is + * responsible for waking up the main loop (if its sleeping) when some some + * change has happened to the connection that the mainloop needs to reconsiders + * (e.g. a message has been queued for writing). + * When using Qt, this typically results in a call to QEventLoop::wakeUp(). + * When using GLib, it would call g_main_context_wakeup(). + * + * + * @param connection the connection. + * @param wakeup_main_function function to wake up the mainloop + * @param data data to pass wakeup_main_function + * @param free_data_function function to be called to free the data. + */ +void +dbus_connection_set_wakeup_main_function (DBusConnection *connection, + DBusWakeupMainFunction wakeup_main_function, + void *data, + DBusFreeFunction free_data_function) +{ + void *old_data; + DBusFreeFunction old_free_data; + + dbus_mutex_lock (connection->mutex); + old_data = connection->wakeup_main_data; + old_free_data = connection->free_wakeup_main_data; + + connection->wakeup_main_function = wakeup_main_function; + connection->wakeup_main_data = data; + connection->free_wakeup_main_data = free_data_function; + + dbus_mutex_unlock (connection->mutex); + + /* Callback outside the lock */ + if (old_free_data) + (*old_free_data) (old_data); +} + /** * Called to notify the connection when a previously-added watch * is ready for reading or writing, or has an exception such diff --git a/dbus/dbus-connection.h b/dbus/dbus-connection.h index 5a91ce82..fd631c6d 100644 --- a/dbus/dbus-connection.h +++ b/dbus/dbus-connection.h @@ -61,6 +61,7 @@ typedef void (* DBusAddWatchFunction) (DBusWatch *watch, typedef void (* DBusRemoveWatchFunction) (DBusWatch *watch, void *data); +typedef void (* DBusWakeupMainFunction) (void *data); typedef void (* DBusAddTimeoutFunction) (DBusTimeout *timeout, void *data); typedef void (* DBusRemoveTimeoutFunction) (DBusTimeout *timeout, @@ -99,19 +100,23 @@ DBusMessage *dbus_connection_send_message_with_reply_and_block (DBusConnection DBusResultCode *result); -void dbus_connection_set_watch_functions (DBusConnection *connection, - DBusAddWatchFunction add_function, - DBusRemoveWatchFunction remove_function, - void *data, - DBusFreeFunction free_data_function); -void dbus_connection_set_timeout_functions (DBusConnection *connection, - DBusAddTimeoutFunction add_function, - DBusRemoveTimeoutFunction remove_function, - void *data, - DBusFreeFunction free_data_function); -void dbus_connection_handle_watch (DBusConnection *connection, - DBusWatch *watch, - unsigned int condition); +void dbus_connection_set_watch_functions (DBusConnection *connection, + DBusAddWatchFunction add_function, + DBusRemoveWatchFunction remove_function, + void *data, + DBusFreeFunction free_data_function); +void dbus_connection_set_timeout_functions (DBusConnection *connection, + DBusAddTimeoutFunction add_function, + DBusRemoveTimeoutFunction remove_function, + void *data, + DBusFreeFunction free_data_function); +void dbus_connection_set_wakeup_main_function (DBusConnection *connection, + DBusWakeupMainFunction wakeup_main_function, + void *data, + DBusFreeFunction free_data_function); +void dbus_connection_handle_watch (DBusConnection *connection, + DBusWatch *watch, + unsigned int condition); diff --git a/dbus/dbus-dataslot.c b/dbus/dbus-dataslot.c index a5909ffc..53fb9e4c 100644 --- a/dbus/dbus-dataslot.c +++ b/dbus/dbus-dataslot.c @@ -177,9 +177,18 @@ _dbus_data_slot_list_set (DBusDataSlotAllocator *allocator, DBusFreeFunction free_data_func, DBusFreeFunction *old_free_func, void **old_data) -{ +{ +#ifndef DBUS_DISABLE_ASSERT + /* We need to take the allocator lock here, because the allocator could + * be e.g. realloc()ing allocated_slots. We avoid doing this if asserts + * are disabled, since then the asserts are empty. + */ + if (!dbus_mutex_lock (allocator->lock)) + return FALSE; _dbus_assert (slot < allocator->n_allocated_slots); _dbus_assert (allocator->allocated_slots[slot] == slot); + dbus_mutex_unlock (allocator->lock); +#endif if (slot >= list->n_slots) { @@ -227,8 +236,17 @@ _dbus_data_slot_list_get (DBusDataSlotAllocator *allocator, DBusDataSlotList *list, int slot) { +#ifndef DBUS_DISABLE_ASSERT + /* We need to take the allocator lock here, because the allocator could + * be e.g. realloc()ing allocated_slots. We avoid doing this if asserts + * are disabled, since then the asserts are empty. + */ + if (!dbus_mutex_lock (allocator->lock)) + return FALSE; _dbus_assert (slot < allocator->n_allocated_slots); _dbus_assert (allocator->allocated_slots[slot] == slot); + dbus_mutex_unlock (allocator->lock); +#endif if (slot >= list->n_slots) return NULL; diff --git a/dbus/dbus-message.c b/dbus/dbus-message.c index 355d6310..1a112a79 100644 --- a/dbus/dbus-message.c +++ b/dbus/dbus-message.c @@ -1725,12 +1725,15 @@ dbus_message_get_args_iter (DBusMessage *message) iter = dbus_new (DBusMessageIter, 1); - dbus_message_ref (message); + if (iter != NULL) + { + dbus_message_ref (message); + + iter->refcount = 1; + iter->message = message; + iter->pos = 0; + } - iter->refcount = 1; - iter->message = message; - iter->pos = 0; - return iter; } diff --git a/dbus/dbus-server-unix.c b/dbus/dbus-server-unix.c index a6635ddc..0a98c53c 100644 --- a/dbus/dbus-server-unix.c +++ b/dbus/dbus-server-unix.c @@ -65,6 +65,11 @@ unix_finalize (DBusServer *server) dbus_free (server); } +/** + * @todo unreffing the connection at the end may cause + * us to drop the last ref to the connection before + * disconnecting it. That is invalid. + */ static void handle_new_client_fd (DBusServer *server, int client_fd) diff --git a/glib/Makefile.am b/glib/Makefile.am index 6f3906ce..63d2edb9 100644 --- a/glib/Makefile.am +++ b/glib/Makefile.am @@ -16,11 +16,23 @@ libdbus_glib_1_la_LIBADD= $(DBUS_GLIB_LIBS) $(top_builddir)/dbus/libdbus-1.la if DBUS_BUILD_TESTS -noinst_PROGRAMS= test-dbus-glib +noinst_PROGRAMS= test-dbus-glib test-thread-server test-thread-client test_dbus_glib_SOURCES= \ - test-dbus-glib.c + test-dbus-glib.c test_dbus_glib_LDADD= $(top_builddir)/glib/libdbus-glib-1.la +test_thread_server_SOURCES= \ + test-thread-server.c \ + test-thread.h + +test_thread_server_LDADD= $(DBUS_GLIB_THREADS_LIBS) $(top_builddir)/glib/libdbus-glib-1.la + +test_thread_client_SOURCES= \ + test-thread-client.c \ + test-thread.h + +test_thread_client_LDADD= $(DBUS_GLIB_THREADS_LIBS) $(top_builddir)/glib/libdbus-glib-1.la + endif diff --git a/glib/dbus-gmain.c b/glib/dbus-gmain.c index 2638d54b..40523738 100644 --- a/glib/dbus-gmain.c +++ b/glib/dbus-gmain.c @@ -314,6 +314,13 @@ free_source (GSource *source) g_source_destroy (source); } +static void +wakeup_main (void *data) +{ + g_main_context_wakeup (NULL); +} + + /** @} */ /* End of GLib bindings internals */ /** @addtogroup DBusGLib @@ -359,6 +366,10 @@ dbus_connection_setup_with_g_main (DBusConnection *connection) remove_timeout, NULL, NULL); + dbus_connection_set_wakeup_main_function (connection, + wakeup_main, + NULL, NULL); + g_source_attach (source, NULL); g_static_mutex_lock (&connection_slot_lock); @@ -401,7 +412,7 @@ dbus_server_setup_with_g_main (DBusServer *server) add_timeout, remove_timeout, NULL, NULL); - + g_source_attach (source, NULL); g_static_mutex_lock (&server_slot_lock); diff --git a/glib/test-thread-client.c b/glib/test-thread-client.c new file mode 100644 index 00000000..ca78dbb8 --- /dev/null +++ b/glib/test-thread-client.c @@ -0,0 +1,89 @@ +#include +#include "dbus-glib.h" +#include +#include +#include + +#include "test-thread.h" + +DBusConnection *connection; + +static gpointer +thread_func (gpointer data) +{ + gint32 threadnr = GPOINTER_TO_INT (data); + guint32 counter = 0; + DBusMessage *message; + char *str; + + while (1) + { + message = dbus_message_new (NULL, "org.freedesktop.ThreadTest"); + + if (!dbus_message_append_int32 (message, threadnr)) + { + g_print ("thread %d: append threadnr failed\n", threadnr); + } + + if (!dbus_message_append_uint32 (message, counter)) + { + g_print ("thread %d: append counter (%d) failed\n", threadnr, counter); + } + + str = g_strdup_printf ("Thread %d-%d\n", threadnr, counter); + if (!dbus_message_append_string (message, str)) + { + g_print ("thread %d: append string (%s) failed\n", threadnr, str); + } + g_free (str); + + if (!dbus_connection_send_message (connection, + message, + NULL, NULL)) + { + g_print ("thread %d: send message failerd\n", threadnr); + } + dbus_message_unref (message); + + counter ++; + } + + return NULL; +} + +int +main (int argc, char *argv[]) +{ + GMainLoop *loop; + DBusResultCode result; + int i; + + g_thread_init (NULL); + dbus_gthread_init (); + + if(argc < 2) + { + g_error("Need an address as argv[1]\n"); + return 1; + } + + connection = dbus_connection_open (argv[1], &result); + if (connection == NULL) + { + g_printerr ("could not open connection\n"); + return 1; + } + + dbus_connection_setup_with_g_main (connection); + + for (i = 0; i < N_TEST_THREADS; i++) + { + g_thread_create (thread_func, GINT_TO_POINTER (i), FALSE, NULL); + } + + loop = g_main_loop_new (NULL, FALSE); + g_main_run (loop); + + return 0; +} + diff --git a/glib/test-thread-server.c b/glib/test-thread-server.c new file mode 100644 index 00000000..066c393b --- /dev/null +++ b/glib/test-thread-server.c @@ -0,0 +1,246 @@ +#include +#include "dbus-glib.h" +#include +#include + +#include "test-thread.h" + +typedef struct { + guint32 counters[N_TEST_THREADS]; +} ThreadTestData; + +static ThreadTestData * +thread_test_data_new (void) +{ + ThreadTestData *data; + + data = g_new0 (ThreadTestData, 1); + + return data; +} + +static void +thread_test_data_free (ThreadTestData *data) +{ + g_free (data); +} + +static DBusMessageHandler *disconnect_handler; +static DBusMessageHandler *filter_handler; +static int handler_slot; + +static DBusHandlerResult +handle_test_message (DBusMessageHandler *handler, + DBusConnection *connection, + DBusMessage *message, + void *user_data) +{ + ThreadTestData *data = user_data; + DBusMessageIter *iter; + gint32 threadnr; + guint32 counter; + char *str, *expected_str; + GString *counter_str; + int i; + + iter = dbus_message_get_args_iter (message); + g_assert (iter != NULL); + + if (dbus_message_iter_get_arg_type (iter) != DBUS_TYPE_INT32) + { + g_print ("First arg not right type\n"); + goto out; + } + threadnr = dbus_message_iter_get_int32 (iter); + if (threadnr < 0 || threadnr >= N_TEST_THREADS) + { + g_print ("Invalid thread nr\n"); + goto out; + } + + if (! dbus_message_iter_next (iter)) + { + g_print ("Couldn't get second arg\n"); + goto out; + } + + if (dbus_message_iter_get_arg_type (iter) != DBUS_TYPE_UINT32) + { + g_print ("Second arg not right type\n"); + goto out; + } + + counter = dbus_message_iter_get_uint32 (iter); + + if (counter != data->counters[threadnr]) + { + g_print ("Thread %d, counter %d, expected %d\n", threadnr, counter, data->counters[threadnr]); + goto out; + } + data->counters[threadnr]++; + + if (! dbus_message_iter_next (iter)) + { + g_print ("Couldn't get third arg\n"); + goto out; + } + + if (dbus_message_iter_get_arg_type (iter) != DBUS_TYPE_STRING) + { + g_print ("Third arg not right type\n"); + goto out; + } + + str = dbus_message_iter_get_string (iter); + + if (str == NULL) + { + g_print ("No third arg\n"); + goto out; + } + + expected_str = g_strdup_printf ("Thread %d-%d\n", threadnr, counter); + if (strcmp (expected_str, str) != 0) + { + g_print ("Wrong string '%s', expected '%s'\n", str, expected_str); + goto out; + } + g_free (str); + g_free (expected_str); + + if (dbus_message_iter_next (iter)) + { + g_print ("Extra args on end of message\n"); + goto out; + } + + dbus_connection_flush (connection); + + counter_str = g_string_new (""); + for (i = 0; i < N_TEST_THREADS; i++) + { + g_string_append_printf (counter_str, "%d ", data->counters[i]); + } + g_print ("%s\r", counter_str->str); + g_string_free (counter_str, TRUE); + + out: + return DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS; +} + +static DBusHandlerResult +handle_filter (DBusMessageHandler *handler, + DBusConnection *connection, + DBusMessage *message, + void *user_data) +{ + return DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS; +} + +static DBusHandlerResult +handle_disconnect (DBusMessageHandler *handler, + DBusConnection *connection, + DBusMessage *message, + void *user_data) +{ + g_print ("connection disconnected\n"); + dbus_connection_unref (connection); + + return DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS; +} + + +static void +new_connection_callback (DBusServer *server, + DBusConnection *new_connection, + void *user_data) +{ + const char *test_messages[] = { "org.freedesktop.ThreadTest" }; + const char *disconnect_messages[] = { "org.freedesktop.Local.Disconnect" }; + DBusMessageHandler *test_message_handler; + ThreadTestData * data; + + g_print ("new_connection_callback\n"); + + dbus_connection_ref (new_connection); + dbus_connection_setup_with_g_main (new_connection); + + data = thread_test_data_new (); + + test_message_handler = + dbus_message_handler_new (handle_test_message, + data, (DBusFreeFunction)thread_test_data_free); + + if (!dbus_connection_register_handler (new_connection, + test_message_handler, + test_messages, 1)) + goto nomem; + + if (!dbus_connection_set_data (new_connection, + handler_slot, + test_message_handler, + (DBusFreeFunction)dbus_message_handler_unref)) + goto nomem; + + if (!dbus_connection_register_handler (new_connection, + disconnect_handler, + disconnect_messages, 1)) + goto nomem; + + if (!dbus_connection_add_filter (new_connection, + filter_handler)) + goto nomem; + + return; + + nomem: + g_error ("no memory to setup new connection"); +} + +int +main (int argc, char *argv[]) +{ + GMainLoop *loop; + DBusServer *server; + DBusResultCode result; + + g_thread_init (NULL); + dbus_gthread_init (); + + if (argc < 2) + { + fprintf (stderr, "Give the server address as an argument\n"); + return 1; + } + + server = dbus_server_listen (argv[1], &result); + if (server == NULL) + { + fprintf (stderr, "Failed to start server on %s: %s\n", + argv[1], dbus_result_to_string (result)); + return 1; + } + + handler_slot = dbus_connection_allocate_data_slot (); + + filter_handler = + dbus_message_handler_new (handle_filter, NULL, NULL); + if (filter_handler == NULL) + g_error ("no memory for handler"); + + disconnect_handler = + dbus_message_handler_new (handle_disconnect, NULL, NULL); + if (disconnect_handler == NULL) + g_error ("no memory for handler"); + + dbus_server_set_new_connection_function (server, + new_connection_callback, + NULL, NULL); + + dbus_server_setup_with_g_main (server); + + loop = g_main_loop_new (NULL, FALSE); + g_main_run (loop); + + return 0; +} diff --git a/glib/test-thread.h b/glib/test-thread.h new file mode 100644 index 00000000..8c78fba2 --- /dev/null +++ b/glib/test-thread.h @@ -0,0 +1 @@ +#define N_TEST_THREADS 5 -- cgit