summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/async_jitter_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtpmanager/async_jitter_queue.c')
-rw-r--r--gst/rtpmanager/async_jitter_queue.c679
1 files changed, 679 insertions, 0 deletions
diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c
new file mode 100644
index 00000000..22a8ed0e
--- /dev/null
+++ b/gst/rtpmanager/async_jitter_queue.c
@@ -0,0 +1,679 @@
+/*
+ * Async Jitter Queue based on g_async_queue
+ * This code is GST RTP smart and deals with timestamps
+ *
+ * Farsight Voice+Video library
+ * Copyright 2007 Collabora Ltd,
+ * Copyright 2007 Nokia Corporation
+ * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
+ *
+ * This is an async queue that has a buffering mecanism based on the set low
+ * and high threshold. When the lower threshold is reached, the queue will
+ * fill itself up until the higher threshold is reached before allowing any
+ * pops to occur. This allows a jitterbuffer of at least min threshold items
+ * to be available.
+ */
+
+/* GLIB - Library of useful routines for C programming
+ * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
+ * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * MT safe
+ */
+
+#include "config.h"
+
+#include "async_jitter_queue.h"
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#define DEFAULT_LOW_THRESHOLD 0.1
+#define DEFAULT_HIGH_THRESHOLD 0.9
+
+struct _AsyncJitterQueue
+{
+ GMutex *mutex;
+ GCond *cond;
+ GQueue *queue;
+ guint waiting_threads;
+ gint32 ref_count;
+ gfloat low_threshold;
+ gfloat high_threshold;
+ guint32 max_queue_length;
+ gboolean buffering;
+ gboolean pop_flushing;
+ gboolean pop_blocking;
+ guint pops_remaining;
+ guint32 tail_buffer_duration;
+};
+
+/**
+ * async_jitter_queue_new:
+ *
+ * Creates a new asynchronous queue with the initial reference count of 1.
+ *
+ * Return value: the new #AsyncJitterQueue.
+ **/
+AsyncJitterQueue *
+async_jitter_queue_new (void)
+{
+ AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
+
+ retval->mutex = g_mutex_new ();
+ retval->cond = g_cond_new ();
+ retval->queue = g_queue_new ();
+ retval->waiting_threads = 0;
+ retval->ref_count = 1;
+ retval->low_threshold = DEFAULT_LOW_THRESHOLD;
+ retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
+ retval->buffering = TRUE; /* we need to buffer initially */
+ retval->pop_flushing = TRUE;
+ retval->pop_blocking = TRUE;
+ retval->pops_remaining = 0;
+ retval->tail_buffer_duration = 0;
+ return retval;
+}
+
+/* checks buffering state and wakes up waiting pops */
+void
+signal_waiting_threads (AsyncJitterQueue * queue)
+{
+ if (async_jitter_queue_length_ts_units_unlocked (queue) >=
+ queue->high_threshold * queue->max_queue_length) {
+ queue->buffering = FALSE;
+ }
+
+ if (queue->waiting_threads > 0) {
+ if (!queue->buffering) {
+ g_cond_signal (queue->cond);
+ }
+ }
+}
+
+/**
+ * async_jitter_queue_ref:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Increases the reference count of the asynchronous @queue by 1. You
+ * do not need to hold the lock to call this function.
+ *
+ * Returns: the @queue that was passed in (since 2.6)
+ **/
+AsyncJitterQueue *
+async_jitter_queue_ref (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, NULL);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+ g_atomic_int_inc (&queue->ref_count);
+
+ return queue;
+}
+
+/**
+ * async_jitter_queue_ref_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Increases the reference count of the asynchronous @queue by 1.
+ **/
+void
+async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_atomic_int_inc (&queue->ref_count);
+}
+
+/**
+ * async_jitter_queue_set_low_threshold:
+ * @queue: a #AsyncJitterQueue.
+ * @threshold: the lower threshold (fraction of max size)
+ *
+ * Sets the low threshold on the queue. This threshold indicates the minimum
+ * number of items allowed in the queue before we refill it up to the set
+ * maximum threshold.
+ **/
+void
+async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
+ gfloat threshold)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->low_threshold = threshold;
+}
+
+/**
+ * async_jitter_queue_set_max_threshold:
+ * @queue: a #AsyncJitterQueue.
+ * @threshold: the higher threshold (fraction of max size)
+ *
+ * Sets the high threshold on the queue. This threshold indicates the amount of
+ * items to fill in the queue before releasing any blocking pop calls. This
+ * blocking mecanism is only triggered when we reach the low threshold and must
+ * refill the queue.
+ **/
+void
+async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
+ gfloat threshold)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->high_threshold = threshold;
+}
+
+/* set the maximum queue length in RTP timestamp units */
+void
+async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
+ guint32 max_length)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->max_queue_length = max_length;
+}
+
+GQueue *
+async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, NULL);
+
+ return queue->queue;
+}
+
+static guint32
+calculate_ts_diff (guint32 high_ts, guint32 low_ts)
+{
+ /* it needs to work if ts wraps */
+ if (high_ts >= low_ts) {
+ return high_ts - low_ts;
+ } else {
+ return high_ts + G_MAXUINT32 + 1 - low_ts;
+ }
+}
+
+/* this function returns the length of the queue in timestamp units. It will
+ * also add the duration of the last buffer in the queue */
+/* FIXME This function wrongly assumes that there are no missing packets inside
+ * the buffer, in reality it needs to check for gaps and subsctract those from
+ * the total */
+guint32
+async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
+{
+ guint32 tail_ts;
+ guint32 head_ts;
+ guint32 ret;
+ GstBuffer *head;
+ GstBuffer *tail;
+
+ g_return_val_if_fail (queue, 0);
+
+ if (queue->queue->length < 2) {
+ return 0;
+ }
+
+ tail = g_queue_peek_tail (queue->queue);
+ head = g_queue_peek_head (queue->queue);
+
+ if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
+ return 0;
+
+ tail_ts = gst_rtp_buffer_get_timestamp (tail);
+ head_ts = gst_rtp_buffer_get_timestamp (head);
+
+ ret = calculate_ts_diff (head_ts, tail_ts);
+
+ /* let's add the duration of the tail buffer */
+ ret += queue->tail_buffer_duration;
+
+ return ret;
+}
+
+/**
+ * async_jitter_queue_unref_and_unlock:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Decreases the reference count of the asynchronous @queue by 1 and
+ * releases the lock. This function must be called while holding the
+ * @queue's lock. If the reference count went to 0, the @queue will be
+ * destroyed and the memory allocated will be freed.
+ **/
+void
+async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_mutex_unlock (queue->mutex);
+ async_jitter_queue_unref (queue);
+}
+
+/**
+ * async_jitter_queue_unref:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Decreases the reference count of the asynchronous @queue by 1. If
+ * the reference count went to 0, the @queue will be destroyed and the
+ * memory allocated will be freed. So you are not allowed to use the
+ * @queue afterwards, as it might have disappeared. You do not need to
+ * hold the lock to call this function.
+ **/
+void
+async_jitter_queue_unref (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ if (g_atomic_int_dec_and_test (&queue->ref_count)) {
+ g_return_if_fail (queue->waiting_threads == 0);
+ g_mutex_free (queue->mutex);
+ if (queue->cond)
+ g_cond_free (queue->cond);
+ g_queue_free (queue->queue);
+ g_free (queue);
+ }
+}
+
+/**
+ * async_jitter_queue_lock:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Acquires the @queue's lock. After that you can only call the
+ * <function>async_jitter_queue_*_unlocked()</function> function variants on that
+ * @queue. Otherwise it will deadlock.
+ **/
+void
+async_jitter_queue_lock (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_mutex_lock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_unlock:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Releases the queue's lock.
+ **/
+void
+async_jitter_queue_unlock (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_push:
+ * @queue: a #AsyncJitterQueue.
+ * @data: @data to push into the @queue.
+ *
+ * Pushes the @data into the @queue. @data must not be %NULL.
+ **/
+void
+async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+ g_return_if_fail (data);
+
+ g_mutex_lock (queue->mutex);
+ async_jitter_queue_push_unlocked (queue, data);
+ g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_push_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @data: @data to push into the @queue.
+ *
+ * Pushes the @data into the @queue. @data must not be %NULL. This
+ * function must be called while holding the @queue's lock.
+ **/
+void
+async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+ g_return_if_fail (data);
+
+ g_queue_push_head (queue->queue, data);
+
+ signal_waiting_threads (queue);
+}
+
+/**
+ * async_jitter_queue_push_sorted:
+ * @queue: a #AsyncJitterQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ * is passed two elements of the @queue. The function should return
+ * 0 if they are equal, a negative value if the first element
+ * should be higher in the @queue or a positive value if the first
+ * element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ *
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ *
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * This function will lock @queue before it sorts the queue and unlock
+ * it when it is finished.
+ *
+ * For an example of @func see async_jitter_queue_sort().
+ *
+ * Since: 2.10
+ **/
+gboolean
+async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
+ gpointer data, GCompareDataFunc func, gpointer user_data)
+{
+ g_return_val_if_fail (queue != NULL, FALSE);
+ gboolean ret;
+
+ g_mutex_lock (queue->mutex);
+ ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
+ g_mutex_unlock (queue->mutex);
+
+ return ret;
+}
+
+/**
+ * async_jitter_queue_push_sorted_unlocked:
+ * @queue: a #AsyncJitterQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ * is passed two elements of the @queue. The function should return
+ * 0 if they are equal, a negative value if the first element
+ * should be higher in the @queue or a positive value if the first
+ * element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ *
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ *
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * If @GCompareDataFunc returns 0, this function does not insert @data and
+ * return FALSE.
+ *
+ * This function is called while holding the @queue's lock.
+ *
+ * For an example of @func see async_jitter_queue_sort().
+ *
+ * Since: 2.10
+ **/
+gboolean
+async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
+ gpointer data, GCompareDataFunc func, gpointer user_data)
+{
+ GList *list;
+ gint func_ret = TRUE;
+
+ g_return_val_if_fail (queue != NULL, FALSE);
+
+ list = queue->queue->head;
+ while (list && (func_ret = func (list->data, data, user_data)) < 0)
+ list = list->next;
+
+ if (func_ret == 0) {
+ return FALSE;
+ }
+ if (list) {
+ g_queue_insert_before (queue->queue, list, data);
+ } else {
+ g_queue_push_tail (queue->queue, data);
+ }
+
+ signal_waiting_threads (queue);
+ return TRUE;
+}
+
+void
+async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
+ GList * sibling, gpointer data)
+{
+ g_return_if_fail (queue != NULL);
+
+ g_queue_insert_before (queue->queue, sibling, data);
+
+ signal_waiting_threads (queue);
+}
+
+static gpointer
+async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
+{
+ gpointer retval;
+ GstBuffer *tail_buffer = NULL;
+
+ if (queue->pop_flushing)
+ return NULL;
+
+ while (queue->pop_blocking) {
+ queue->waiting_threads++;
+ g_cond_wait (queue->cond, queue->mutex);
+ queue->waiting_threads--;
+ if (queue->pop_flushing)
+ return NULL;
+ }
+
+ if (async_jitter_queue_length_ts_units_unlocked (queue) <=
+ queue->low_threshold * queue->max_queue_length
+ && queue->pops_remaining == 0) {
+ if (!queue->buffering) {
+ queue->buffering = TRUE;
+ queue->pops_remaining = queue->queue->length;
+ } else {
+ while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
+ queue->waiting_threads++;
+ g_cond_wait (queue->cond, queue->mutex);
+ queue->waiting_threads--;
+ if (queue->pop_flushing)
+ return NULL;
+ }
+ }
+ }
+
+ retval = g_queue_pop_tail (queue->queue);
+ if (queue->pops_remaining)
+ queue->pops_remaining--;
+
+ tail_buffer = g_queue_peek_tail (queue->queue);
+ if (tail_buffer) {
+ if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
+ queue->tail_buffer_duration = 0;
+ } else if (gst_rtp_buffer_get_seq (tail_buffer)
+ - gst_rtp_buffer_get_seq (retval) == 1) {
+ queue->tail_buffer_duration =
+ calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
+ gst_rtp_buffer_get_timestamp (retval));
+ } else {
+ /* There is a sequence number gap -> we can't calculate the duration
+ * let's just set it to 0 */
+ queue->tail_buffer_duration = 0;
+ }
+ }
+
+ g_assert (retval);
+
+ return retval;
+}
+
+/**
+ * async_jitter_queue_pop:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Pops data from the @queue. This function blocks until data become
+ * available. If pop is disabled, tis function return NULL.
+ *
+ * Return value: data from the queue.
+ **/
+gpointer
+async_jitter_queue_pop (AsyncJitterQueue * queue)
+{
+ gpointer retval;
+
+ g_return_val_if_fail (queue, NULL);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+ g_mutex_lock (queue->mutex);
+ retval = async_jitter_queue_pop_intern_unlocked (queue);
+ g_mutex_unlock (queue->mutex);
+
+ return retval;
+}
+
+/**
+ * async_jitter_queue_pop_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Pops data from the @queue. This function blocks until data become
+ * available. This function must be called while holding the @queue's
+ * lock.
+ *
+ * Return value: data from the queue.
+ **/
+gpointer
+async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, NULL);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+ return async_jitter_queue_pop_intern_unlocked (queue);
+}
+
+/**
+ * async_jitter_queue_length:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Returns the length of the queue
+ * Return value: the length of the @queue.
+ **/
+gint
+async_jitter_queue_length (AsyncJitterQueue * queue)
+{
+ gint retval;
+
+ g_return_val_if_fail (queue, 0);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
+
+ g_mutex_lock (queue->mutex);
+ retval = queue->queue->length;
+ g_mutex_unlock (queue->mutex);
+
+ return retval;
+}
+
+/**
+ * async_jitter_queue_length_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Returns the length of the queue.
+ *
+ * Return value: the length of the @queue.
+ **/
+gint
+async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, 0);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
+
+ return queue->queue->length;
+}
+
+/**
+ * async_jitter_queue_set_flushing_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @free_func: a function to call to free the elements
+ * @user_data: user data passed to @free_func
+ *
+ * This function is used to set/unset flushing. If flushing is set any
+ * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
+ * return NULL. Flushing is set by default.
+ */
+void
+async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
+ GFunc free_func, gpointer user_data)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->pop_flushing = TRUE;
+ /* let's unblock any remaining pops */
+ if (queue->waiting_threads > 0)
+ g_cond_broadcast (queue->cond);
+ /* free data from queue */
+ g_queue_foreach (queue->queue, free_func, user_data);
+}
+
+/**
+ * async_jitter_queue_unset_flushing_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @free_func: a function to call to free the elements
+ * @user_data: user data passed to @free_func
+ *
+ * This function is used to set/unset flushing. If flushing is set any
+ * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
+ * return NULL. Flushing is set by default.
+ */
+void
+async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->pop_flushing = FALSE;
+ /* let's unblock any remaining pops */
+ if (queue->waiting_threads > 0)
+ g_cond_broadcast (queue->cond);
+}
+
+/**
+ * async_jitter_queue_set_blocking_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @enabled: a boolean to enable/disable blocking
+ *
+ * This function is used to enable/disable blocking. If blocking is enabled any
+ * pops will be blocked until the queue is unblocked. The queue is blocked by
+ * default.
+ */
+void
+async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
+ gboolean blocking)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->pop_blocking = blocking;
+ /* let's unblock any remaining pops */
+ if (queue->waiting_threads > 0)
+ g_cond_broadcast (queue->cond);
+}