summaryrefslogtreecommitdiffstats
path: root/src/pulse/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pulse/stream.c')
-rw-r--r--src/pulse/stream.c169
1 files changed, 117 insertions, 52 deletions
diff --git a/src/pulse/stream.c b/src/pulse/stream.c
index 339a89e5..5baf5c2c 100644
--- a/src/pulse/stream.c
+++ b/src/pulse/stream.c
@@ -30,13 +30,14 @@
#include <pulse/def.h>
#include <pulse/timeval.h>
+#include <pulse/rtclock.h>
#include <pulse/xmalloc.h>
#include <pulsecore/pstream-util.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/macro.h>
-#include <pulsecore/rtclock.h>
+#include <pulsecore/core-rtclock.h>
#include "fork-detect.h"
#include "internal.h"
@@ -143,12 +144,13 @@ pa_stream *pa_stream_new_with_proplist(
s->suspended = FALSE;
s->corked = FALSE;
+ s->write_memblock = NULL;
+ s->write_data = NULL;
+
pa_memchunk_reset(&s->peek_memchunk);
s->peek_data = NULL;
-
s->record_memblockq = NULL;
-
memset(&s->timing_info, 0, sizeof(s->timing_info));
s->timing_info_valid = FALSE;
@@ -220,6 +222,11 @@ static void stream_free(pa_stream *s) {
stream_unlink(s);
+ if (s->write_memblock) {
+ pa_memblock_release(s->write_memblock);
+ pa_memblock_unref(s->write_data);
+ }
+
if (s->peek_memchunk.memblock) {
if (s->peek_data)
pa_memblock_release(s->peek_memchunk.memblock);
@@ -319,14 +326,10 @@ static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
}
if (s->auto_timing_update_event) {
- struct timeval next;
-
if (force)
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
- pa_gettimeofday(&next);
- pa_timeval_add(&next, s->auto_timing_interval_usec);
- s->mainloop->time_restart(s->auto_timing_update_event, &next);
+ pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
@@ -373,7 +376,7 @@ static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t
if (!s->smoother)
return;
- x = pa_rtclock_usec();
+ x = pa_rtclock_now();
if (s->timing_info_valid) {
if (aposteriori)
@@ -800,7 +803,7 @@ static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
request_auto_timing_update(s, TRUE);
}
-static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
+static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
pa_stream *s = userdata;
pa_assert(s);
@@ -822,12 +825,9 @@ static void create_stream_complete(pa_stream *s) {
s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
- struct timeval tv;
- pa_gettimeofday(&tv);
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
- pa_timeval_add(&tv, s->auto_timing_interval_usec);
pa_assert(!s->auto_timing_update_event);
- s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
+ s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
request_auto_timing_update(s, TRUE);
}
@@ -1057,7 +1057,7 @@ static int create_stream(
if (flags & PA_STREAM_INTERPOLATE_TIMING) {
pa_usec_t x;
- x = pa_rtclock_usec();
+ x = pa_rtclock_now();
pa_assert(!s->smoother);
s->smoother = pa_smoother_new(
@@ -1193,20 +1193,60 @@ int pa_stream_connect_record(
return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
}
+int pa_stream_begin_write(
+ pa_stream *s,
+ void **data,
+ size_t *nbytes) {
+
+ pa_assert(s);
+ pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+ PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
+
+ if (!s->write_memblock) {
+ s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
+ s->write_data = pa_memblock_acquire(s->write_memblock);
+ }
+
+ *data = s->write_data;
+ *nbytes = pa_memblock_get_length(s->write_memblock);
+
+ return 0;
+}
+
+int pa_stream_cancel_write(
+ pa_stream *s) {
+
+ pa_assert(s);
+ pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+ PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
+
+ pa_assert(s->write_data);
+
+ pa_memblock_release(s->write_memblock);
+ pa_memblock_unref(s->write_memblock);
+ s->write_memblock = NULL;
+ s->write_data = NULL;
+
+ return 0;
+}
+
int pa_stream_write(
pa_stream *s,
const void *data,
size_t length,
- void (*free_cb)(void *p),
+ pa_free_cb_t free_cb,
int64_t offset,
pa_seek_mode_t seek) {
- pa_memchunk chunk;
- pa_seek_mode_t t_seek;
- int64_t t_offset;
- size_t t_length;
- const void *t_data;
-
pa_assert(s);
pa_assert(PA_REFCNT_VALUE(s) >= 1);
pa_assert(data);
@@ -1216,46 +1256,71 @@ int pa_stream_write(
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context,
+ !s->write_memblock ||
+ ((data >= s->write_data) &&
+ ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
+ PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
- if (length <= 0)
- return 0;
+ if (s->write_memblock) {
+ pa_memchunk chunk;
- t_seek = seek;
- t_offset = offset;
- t_length = length;
- t_data = data;
+ /* pa_stream_write_begin() was called before */
- while (t_length > 0) {
+ pa_memblock_release(s->write_memblock);
- chunk.index = 0;
+ chunk.memblock = s->write_memblock;
+ chunk.index = (const char *) data - (const char *) s->write_data;
+ chunk.length = length;
- if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
- chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
- chunk.length = t_length;
- } else {
- void *d;
+ s->write_memblock = NULL;
+ s->write_data = NULL;
- chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
- chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
+ pa_memblock_unref(chunk.memblock);
- d = pa_memblock_acquire(chunk.memblock);
- memcpy(d, t_data, chunk.length);
- pa_memblock_release(chunk.memblock);
- }
+ } else {
+ pa_seek_mode_t t_seek = seek;
+ int64_t t_offset = offset;
+ size_t t_length = length;
+ const void *t_data = data;
- pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
+ /* pa_stream_write_begin() was not called before */
- t_offset = 0;
- t_seek = PA_SEEK_RELATIVE;
+ while (t_length > 0) {
+ pa_memchunk chunk;
- t_data = (const uint8_t*) t_data + chunk.length;
- t_length -= chunk.length;
+ chunk.index = 0;
- pa_memblock_unref(chunk.memblock);
- }
+ if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
+ chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
+ chunk.length = t_length;
+ } else {
+ void *d;
+
+ chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
+ chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
+
+ d = pa_memblock_acquire(chunk.memblock);
+ memcpy(d, t_data, chunk.length);
+ pa_memblock_release(chunk.memblock);
+ }
- if (free_cb && pa_pstream_get_shm(s->context->pstream))
- free_cb((void*) data);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
+
+ t_offset = 0;
+ t_seek = PA_SEEK_RELATIVE;
+
+ t_data = (const uint8_t*) t_data + chunk.length;
+ t_length -= chunk.length;
+
+ pa_memblock_unref(chunk.memblock);
+ }
+
+ if (free_cb && pa_pstream_get_shm(s->context->pstream))
+ free_cb((void*) data);
+ }
/* This is obviously wrong since we ignore the seeking index . But
* that's OK, the server side applies the same error */
@@ -1594,7 +1659,7 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
if (o->stream->smoother) {
pa_usec_t u, x;
- u = x = pa_rtclock_usec() - i->transport_usec;
+ u = x = pa_rtclock_now() - i->transport_usec;
if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
pa_usec_t su;
@@ -2103,7 +2168,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
if (s->smoother)
- usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
+ usec = pa_smoother_get(s->smoother, pa_rtclock_now());
else
usec = calc_time(s, FALSE);