From 6dfab4ec7b8d8702d425b2ec9b5f1aff5c8290cd Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 29 Sep 2004 17:38:45 +0000 Subject: renamed module-tunnel to module-tunnel-sink new module module-tunnel-source fix recording git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@248 fefdeb5f-60dc-0310-8127-8f9354f1896f --- polyp/Makefile.am | 14 ++-- polyp/module-tunnel.c | 173 ++++++++++++++++++++++++++++++++++++++++-------- polyp/protocol-native.c | 4 ++ polyp/source.c | 4 +- polyp/source.h | 2 +- 5 files changed, 164 insertions(+), 33 deletions(-) diff --git a/polyp/Makefile.am b/polyp/Makefile.am index 9f6fa1d5..29b388c9 100644 --- a/polyp/Makefile.am +++ b/polyp/Makefile.am @@ -107,7 +107,8 @@ modlib_LTLIBRARIES= \ module-esound-compat-spawnfd.la \ module-esound-compat-spawnpid.la \ module-match.la \ - module-tunnel.la + module-tunnel-sink.la \ + module-tunnel-source.la lib_LTLIBRARIES= \ libpolyp-@PA_MAJORMINOR@.la \ @@ -311,9 +312,14 @@ module_match_la_SOURCES = module-match.c module_match_la_LDFLAGS = -module -avoid-version module_match_la_LIBADD = $(AM_LIBADD) -module_tunnel_la_SOURCES = module-tunnel.c -module_tunnel_la_LDFLAGS = -module -avoid-version -module_tunnel_la_LIBADD = $(AM_LIBADD) libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la +module_tunnel_sink_la_SOURCES = module-tunnel.c +module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS) +module_tunnel_sink_la_LDFLAGS = -module -avoid-version +module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la + +module_tunnel_source_la_SOURCES = module-tunnel.c +module_tunnel_source_la_LDFLAGS = -module -avoid-version +module_tunnel_source_la_LIBADD = $(AM_LIBADD) libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la module_esound_compat_spawnfd_la_SOURCES = module-esound-compat-spawnfd.c module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version diff --git a/polyp/module-tunnel.c b/polyp/module-tunnel.c index a151b52a..1a720f3b 100644 --- a/polyp/module-tunnel.c +++ b/polyp/module-tunnel.c @@ -47,11 +47,18 @@ #include "socket-util.h" PA_MODULE_AUTHOR("Lennart Poettering") -PA_MODULE_DESCRIPTION("Tunnel module") -PA_MODULE_USAGE("server= sink= cookie= format= channels= rate= sink_name=") PA_MODULE_VERSION(PACKAGE_VERSION) +#ifdef TUNNEL_SINK +PA_MODULE_DESCRIPTION("Tunnel module for sinks") +PA_MODULE_USAGE("server= sink= cookie= format= channels= rate= sink_name=") +#else +PA_MODULE_DESCRIPTION("Tunnel module for sources") +PA_MODULE_USAGE("server= source= cookie= format= channels= rate= source_name=") +#endif + #define DEFAULT_SINK_NAME "tunnel" +#define DEFAULT_SOURCE_NAME "tunnel" #define DEFAULT_TLENGTH (44100*2*2/10) //(10240*8) #define DEFAULT_MAXLENGTH ((DEFAULT_TLENGTH*3)/2) @@ -65,20 +72,30 @@ PA_MODULE_VERSION(PACKAGE_VERSION) static const char* const valid_modargs[] = { "server", - "sink", "cookie", "format", "channels", "rate", +#ifdef TUNNEL_SINK "sink_name", + "sink", +#else + "source_name", + "source", +#endif NULL, }; static void command_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); + +#ifdef TUNNEL_SINK static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +#endif static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { +#ifdef TUNNEL_SINK [PA_COMMAND_REQUEST] = { command_request }, +#endif [PA_COMMAND_PLAYBACK_STREAM_KILLED] = { command_stream_killed }, [PA_COMMAND_RECORD_STREAM_KILLED] = { command_stream_killed }, }; @@ -88,9 +105,16 @@ struct userdata { struct pa_pstream *pstream; struct pa_pdispatch *pdispatch; - char *server_name, *sink_name; - + char *server_name; +#ifdef TUNNEL_SINK + char *sink_name; struct pa_sink *sink; + uint32_t requested_bytes; +#else + char *source_name; + struct pa_source *source; +#endif + struct pa_module *module; struct pa_core *core; @@ -98,9 +122,8 @@ struct userdata { uint32_t ctag; uint32_t device_index; - uint32_t requested_bytes; uint32_t channel; - + pa_usec_t host_latency; struct pa_time_event *time_event; @@ -124,11 +147,19 @@ static void close_stuff(struct userdata *u) { u->client = NULL; } +#ifdef TUNNEL_SINK if (u->sink) { pa_sink_disconnect(u->sink); pa_sink_unref(u->sink); u->sink = NULL; } +#else + if (u->source) { + pa_source_disconnect(u->source); + pa_source_unref(u->source); + u->source = NULL; + } +#endif if (u->time_event) { u->core->mainloop->time_free(u->time_event); @@ -142,6 +173,15 @@ static void die(struct userdata *u) { pa_module_unload_request(u->module); } +static void command_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct userdata *u = userdata; + assert(pd && t && u && u->pdispatch == pd); + + pa_log(__FILE__": stream killed\n"); + die(u); +} + +#ifdef TUNNEL_SINK static void send_prebuf_request(struct userdata *u) { struct pa_tagstruct *t; @@ -179,14 +219,6 @@ static void send_bytes(struct userdata *u) { } } -static void command_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { - struct userdata *u = userdata; - assert(pd && t && u && u->pdispatch == pd); - - pa_log(__FILE__": stream killed\n"); - die(u); -} - static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t bytes, channel; @@ -210,6 +242,8 @@ static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t send_bytes(u); } +#endif + static void stream_get_latency_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_usec_t buffer_usec, sink_usec, source_usec, transport_usec; @@ -242,13 +276,25 @@ static void stream_get_latency_callback(struct pa_pdispatch *pd, uint32_t comman gettimeofday(&now, NULL); - if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) + if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) { /* local and remote seem to have synchronized clocks */ +#ifdef TUNNEL_SINK transport_usec = pa_timeval_diff(&remote, &local); - else +#else + transport_usec = pa_timeval_diff(&now, &remote); +#endif + } else transport_usec = pa_timeval_diff(&now, &local)/2; - + +#ifdef TUNNEL_SINK u->host_latency = sink_usec + transport_usec; +#else + u->host_latency = source_usec + transport_usec; + if (u->host_latency > sink_usec) + u->host_latency -= sink_usec; + else + u->host_latency = 0; +#endif /* pa_log(__FILE__": estimated host latency: %0.0f usec\n", (double) u->host_latency); */ } @@ -260,7 +306,11 @@ static void request_latency(struct userdata *u) { assert(u); t = pa_tagstruct_new(NULL, 0); +#ifdef TUNNEL_SINK pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY); +#else + pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY); +#endif pa_tagstruct_putu32(t, tag = u->ctag++); pa_tagstruct_putu32(t, u->channel); @@ -286,7 +336,9 @@ static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, ui if (pa_tagstruct_getu32(t, &u->channel) < 0 || pa_tagstruct_getu32(t, &u->device_index) < 0 || +#ifdef TUNNEL_SINK pa_tagstruct_getu32(t, &u->requested_bytes) < 0 || +#endif !pa_tagstruct_eof(t)) { pa_log(__FILE__": invalid reply.\n"); die(u); @@ -294,7 +346,9 @@ static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, ui } request_latency(u); +#ifdef TUNNEL_SINK send_bytes(u); +#endif } static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { @@ -311,11 +365,17 @@ static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, u die(u); return; } - +#ifdef TUNNEL_SINK snprintf(name, sizeof(name), "Tunnel from host '%s', user '%s', sink '%s'", pa_get_host_name(hn, sizeof(hn)), pa_get_user_name(un, sizeof(un)), u->sink->name); +#else + snprintf(name, sizeof(name), "Tunnel from host '%s', user '%s', source '%s'", + pa_get_host_name(hn, sizeof(hn)), + pa_get_user_name(un, sizeof(un)), + u->source->name); +#endif reply = pa_tagstruct_new(NULL, 0); pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME); @@ -325,6 +385,7 @@ static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, u /* We ignore the server's reply here */ reply = pa_tagstruct_new(NULL, 0); +#ifdef TUNNEL_SINK pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM); pa_tagstruct_putu32(reply, tag = u->ctag++); pa_tagstruct_puts(reply, name); @@ -337,6 +398,17 @@ static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, u pa_tagstruct_putu32(reply, DEFAULT_PREBUF); pa_tagstruct_putu32(reply, DEFAULT_MINREQ); pa_tagstruct_putu32(reply, PA_VOLUME_NORM); +#else + pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM); + pa_tagstruct_putu32(reply, tag = u->ctag++); + pa_tagstruct_puts(reply, name); + pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec); + pa_tagstruct_putu32(reply, PA_INVALID_INDEX); + pa_tagstruct_puts(reply, u->source_name); + pa_tagstruct_putu32(reply, DEFAULT_MAXLENGTH); + pa_tagstruct_put_boolean(reply, 0); + pa_tagstruct_putu32(reply, DEFAULT_FRAGSIZE); +#endif pa_pstream_send_tagstruct(u->pstream, reply); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u); @@ -361,6 +433,21 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack } } +#ifndef TUNNEL_SINK +static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) { + struct userdata *u = userdata; + assert(p && chunk && u); + + if (channel != u->channel) { + pa_log(__FILE__": recieved memory block on bad channel.\n"); + die(u); + return; + } + + pa_source_post(u->source, chunk); +} +#endif + static void on_connection(struct pa_socket_client *sc, struct pa_iochannel *io, void *userdata) { struct userdata *u = userdata; struct pa_tagstruct *t; @@ -381,6 +468,9 @@ static void on_connection(struct pa_socket_client *sc, struct pa_iochannel *io, pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u); pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u); +#ifndef TUNNEL_SINK + pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u); +#endif t = pa_tagstruct_new(NULL, 0); pa_tagstruct_putu32(t, PA_COMMAND_AUTH); @@ -391,6 +481,7 @@ static void on_connection(struct pa_socket_client *sc, struct pa_iochannel *io, } +#ifdef TUNNEL_SINK static void sink_notify(struct pa_sink*sink) { struct userdata *u; assert(sink && sink->userdata); @@ -417,6 +508,15 @@ static pa_usec_t sink_get_latency(struct pa_sink *sink) { return usec; } +#else +static pa_usec_t source_get_latency(struct pa_source *source) { + struct userdata *u; + assert(source && source->userdata); + u = source->userdata; + + return u->host_latency; +} +#endif static void timeout_callback(struct pa_mainloop_api *m, struct pa_time_event*e, const struct timeval *tv, void *userdata) { struct userdata *u = userdata; @@ -449,11 +549,17 @@ int pa__init(struct pa_core *c, struct pa_module*m) { u->client = NULL; u->pdispatch = NULL; u->pstream = NULL; - u->server_name = u->sink_name = NULL; + u->server_name = NULL; +#ifdef TUNNEL_SINK + u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));; u->sink = NULL; + u->requested_bytes = 0; +#else + u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));; + u->source = NULL; +#endif u->ctag = 1; u->device_index = u->channel = PA_INVALID_INDEX; - u->requested_bytes = 0; u->host_latency = 0; if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), u->auth_cookie, sizeof(u->auth_cookie)) < 0) { @@ -466,8 +572,6 @@ int pa__init(struct pa_core *c, struct pa_module*m) { goto fail; } - u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); - ss = c->default_sample_spec; if (pa_modargs_get_sample_spec(ma, &ss) < 0) { pa_log(__FILE__": invalid sample format specification\n"); @@ -493,7 +597,8 @@ int pa__init(struct pa_core *c, struct pa_module*m) { goto fail; pa_socket_client_set_callback(u->client, on_connection, u); - + +#ifdef TUNNEL_SINK if (!(u->sink = pa_sink_new(c, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss))) { pa_log(__FILE__": failed to create sink.\n"); goto fail; @@ -504,12 +609,24 @@ int pa__init(struct pa_core *c, struct pa_module*m) { u->sink->userdata = u; u->sink->description = pa_sprintf_malloc("Tunnel to '%s%s%s'", u->sink_name ? u->sink_name : "", u->sink_name ? "@" : "", u->server_name); + pa_sink_set_owner(u->sink, m); +#else + if (!(u->source = pa_source_new(c, pa_modargs_get_value(ma, "source_name", DEFAULT_SOURCE_NAME), 0, &ss))) { + pa_log(__FILE__": failed to create source.\n"); + goto fail; + } + + u->source->get_latency = source_get_latency; + u->source->userdata = u; + u->source->description = pa_sprintf_malloc("Tunnel to '%s%s%s'", u->source_name ? u->source_name : "", u->source_name ? "@" : "", u->server_name); + + pa_source_set_owner(u->source, m); +#endif + gettimeofday(&ntv, NULL); ntv.tv_sec += LATENCY_INTERVAL; u->time_event = c->mainloop->time_new(c->mainloop, &ntv, timeout_callback, u); - pa_sink_set_owner(u->sink, m); - pa_modargs_free(ma); return 0; @@ -531,7 +648,11 @@ void pa__done(struct pa_core *c, struct pa_module*m) { close_stuff(u); +#ifdef TUNNEL_SINK pa_xfree(u->sink_name); +#else + pa_xfree(u->source_name); +#endif pa_xfree(u->server_name); pa_xfree(u); diff --git a/polyp/protocol-native.c b/polyp/protocol-native.c index aeccd504..3b816419 100644 --- a/polyp/protocol-native.c +++ b/polyp/protocol-native.c @@ -656,6 +656,7 @@ static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t comma struct pa_sample_spec ss; struct pa_tagstruct *reply; struct pa_source *source; + int corked; assert(c && t && c->protocol && c->protocol->core); if (pa_tagstruct_gets(t, &name) < 0 || !name || @@ -663,6 +664,7 @@ static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t comma pa_tagstruct_getu32(t, &source_index) < 0 || pa_tagstruct_gets(t, &source_name) < 0 || pa_tagstruct_getu32(t, &maxlength) < 0 || + pa_tagstruct_get_boolean(t, &corked) < 0 || pa_tagstruct_getu32(t, &fragment_size) < 0 || !pa_tagstruct_eof(t)) { protocol_error(c); @@ -688,6 +690,8 @@ static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t comma pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID); return; } + + pa_source_output_cork(s->source_output, corked); reply = pa_tagstruct_new(NULL, 0); assert(reply); diff --git a/polyp/source.c b/polyp/source.c index 23b8bf8a..7df432d5 100644 --- a/polyp/source.c +++ b/polyp/source.c @@ -131,7 +131,7 @@ void pa_source_notify(struct pa_source*s) { } static int do_post(void *p, uint32_t index, int *del, void*userdata) { - struct pa_memchunk *chunk = userdata; + const struct pa_memchunk *chunk = userdata; struct pa_source_output *o = p; assert(o && o->push && del && chunk); @@ -139,7 +139,7 @@ static int do_post(void *p, uint32_t index, int *del, void*userdata) { return 0; } -void pa_source_post(struct pa_source*s, struct pa_memchunk *chunk) { +void pa_source_post(struct pa_source*s, const struct pa_memchunk *chunk) { assert(s && s->ref >= 1 && chunk); pa_source_ref(s); diff --git a/polyp/source.h b/polyp/source.h index cda9e698..3cac2ad1 100644 --- a/polyp/source.h +++ b/polyp/source.h @@ -63,7 +63,7 @@ void pa_source_unref(struct pa_source *s); struct pa_source* pa_source_ref(struct pa_source *c); /* Pass a new memory block to all output streams */ -void pa_source_post(struct pa_source*s, struct pa_memchunk *b); +void pa_source_post(struct pa_source*s, const struct pa_memchunk *b); void pa_source_notify(struct pa_source *s); -- cgit