From 0e436a6926af56f37a74a03bb5e143e078ca0d55 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 18 Aug 2006 19:55:18 +0000 Subject: Rework memory management to allow shared memory data transfer. The central idea is to allocate all audio memory blocks from a per-process memory pool which is available as read-only SHM segment to other local processes. Then, instead of writing the actual audio data to the socket just write references to this shared memory pool. To work optimally all memory blocks should now be of type PA_MEMBLOCK_POOL or PA_MEMBLOCK_POOL_EXTERNAL. The function pa_memblock_new() now generates memory blocks of this type by default. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1266 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/pstream.c | 396 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 332 insertions(+), 64 deletions(-) (limited to 'src/pulsecore/pstream.c') diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 7096d65a..421f5de9 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -49,28 +49,45 @@ #include "pstream.h" +/* We piggyback information if audio data blocks are stored in SHM on the seek mode */ +#define PA_FLAG_SHMDATA 0x80000000LU +#define PA_FLAG_SHMRELEASE 0x40000000LU +#define PA_FLAG_SHMREVOKE 0xC0000000LU +#define PA_FLAG_SHMMASK 0xFF000000LU +#define PA_FLAG_SEEKMASK 0x000000FFLU + +/* The sequence descriptor header consists of 5 32bit integers: */ enum { PA_PSTREAM_DESCRIPTOR_LENGTH, PA_PSTREAM_DESCRIPTOR_CHANNEL, PA_PSTREAM_DESCRIPTOR_OFFSET_HI, PA_PSTREAM_DESCRIPTOR_OFFSET_LO, - PA_PSTREAM_DESCRIPTOR_SEEK, + PA_PSTREAM_DESCRIPTOR_FLAGS, PA_PSTREAM_DESCRIPTOR_MAX }; +/* If we have an SHM block, this info follows the descriptor */ +enum { + PA_PSTREAM_SHM_BLOCKID, + PA_PSTREAM_SHM_SHMID, + PA_PSTREAM_SHM_INDEX, + PA_PSTREAM_SHM_LENGTH, + PA_PSTREAM_SHM_MAX +}; + typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX]; #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */ struct item_info { - enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type; + enum { + PA_PSTREAM_ITEM_PACKET, + PA_PSTREAM_ITEM_MEMBLOCK, + PA_PSTREAM_ITEM_SHMRELEASE, + PA_PSTREAM_ITEM_SHMREVOKE + } type; - /* memblock info */ - pa_memchunk chunk; - uint32_t channel; - int64_t offset; - pa_seek_mode_t seek_mode; /* packet info */ pa_packet *packet; @@ -78,6 +95,15 @@ struct item_info { int with_creds; pa_creds creds; #endif + + /* memblock info */ + pa_memchunk chunk; + uint32_t channel; + int64_t offset; + pa_seek_mode_t seek_mode; + + /* release/revoke info */ + uint32_t block_id; }; struct pa_pstream { @@ -91,20 +117,26 @@ struct pa_pstream { int dead; struct { - struct item_info* current; pa_pstream_descriptor descriptor; + struct item_info* current; + uint32_t shm_info[PA_PSTREAM_SHM_MAX]; void *data; size_t index; } write; struct { + pa_pstream_descriptor descriptor; pa_memblock *memblock; pa_packet *packet; - pa_pstream_descriptor descriptor; + uint32_t shm_info[PA_PSTREAM_SHM_MAX]; void *data; size_t index; } read; + int use_shm; + pa_memimport *import; + pa_memexport *export; + pa_pstream_packet_cb_t recieve_packet_callback; void *recieve_packet_callback_userdata; @@ -117,7 +149,7 @@ struct pa_pstream { pa_pstream_notify_cb_t die_callback; void *die_callback_userdata; - pa_memblock_stat *memblock_stat; + pa_mempool *mempool; #ifdef HAVE_CREDS pa_creds read_creds, write_creds; @@ -178,16 +210,19 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) do_something(p); } -pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) { +static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata); + +pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) { pa_pstream *p; + + assert(m); assert(io); + assert(pool); p = pa_xnew(pa_pstream, 1); - p->ref = 1; p->io = io; pa_iochannel_set_callback(io, io_callback, p); - p->dead = 0; p->mainloop = m; @@ -199,24 +234,24 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta p->write.current = NULL; p->write.index = 0; - p->read.memblock = NULL; p->read.packet = NULL; p->read.index = 0; p->recieve_packet_callback = NULL; p->recieve_packet_callback_userdata = NULL; - p->recieve_memblock_callback = NULL; p->recieve_memblock_callback_userdata = NULL; - p->drain_callback = NULL; p->drain_callback_userdata = NULL; - p->die_callback = NULL; p->die_callback_userdata = NULL; - p->memblock_stat = s; + p->mempool = pool; + + p->use_shm = 0; + p->export = NULL; + p->import = NULL; pa_iochannel_socket_set_rcvbuf(io, 1024*8); pa_iochannel_socket_set_sndbuf(io, 1024*8); @@ -235,8 +270,7 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) { if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) { assert(i->chunk.memblock); pa_memblock_unref(i->chunk.memblock); - } else { - assert(i->type == PA_PSTREAM_ITEM_PACKET); + } else if (i->type == PA_PSTREAM_ITEM_PACKET) { assert(i->packet); pa_packet_unref(i->packet); } @@ -265,16 +299,18 @@ static void pstream_free(pa_pstream *p) { void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) { struct item_info *i; - assert(p && packet && p->ref >= 1); + + assert(p); + assert(p->ref >= 1); + assert(packet); if (p->dead) return; -/* pa_log(__FILE__": push-packet %p", packet); */ - i = pa_xnew(struct item_info, 1); i->type = PA_PSTREAM_ITEM_PACKET; i->packet = pa_packet_ref(packet); + #ifdef HAVE_CREDS if ((i->with_creds = !!creds)) i->creds = *creds; @@ -286,13 +322,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) { struct item_info *i; - assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1); + + assert(p); + assert(p->ref >= 1); + assert(channel != (uint32_t) -1); + assert(chunk); if (p->dead) return; - -/* pa_log(__FILE__": push-memblock %p", chunk); */ - + i = pa_xnew(struct item_info, 1); i->type = PA_PSTREAM_ITEM_MEMBLOCK; i->chunk = *chunk; @@ -309,6 +347,52 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa p->mainloop->defer_enable(p->defer_event, 1); } +static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) { + struct item_info *item; + pa_pstream *p = userdata; + + assert(p); + assert(p->ref >= 1); + + if (p->dead) + return; + +/* pa_log(__FILE__": Releasing block %u", block_id); */ + + item = pa_xnew(struct item_info, 1); + item->type = PA_PSTREAM_ITEM_SHMRELEASE; + item->block_id = block_id; +#ifdef HAVE_CREDS + item->with_creds = 0; +#endif + + pa_queue_push(p->send_queue, item); + p->mainloop->defer_enable(p->defer_event, 1); +} + +static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) { + struct item_info *item; + pa_pstream *p = userdata; + + assert(p); + assert(p->ref >= 1); + + if (p->dead) + return; + +/* pa_log(__FILE__": Revoking block %u", block_id); */ + + item = pa_xnew(struct item_info, 1); + item->type = PA_PSTREAM_ITEM_SHMREVOKE; + item->block_id = block_id; +#ifdef HAVE_CREDS + item->with_creds = 0; +#endif + + pa_queue_push(p->send_queue, item); + p->mainloop->defer_enable(p->defer_event, 1); +} + static void prepare_next_write_item(pa_pstream *p) { assert(p); @@ -316,27 +400,77 @@ static void prepare_next_write_item(pa_pstream *p) { return; p->write.index = 0; + p->write.data = NULL; + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0; if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) { - /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/ assert(p->write.current->packet); p->write.data = p->write.current->packet->data; p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0; - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0; + } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) { + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id); + + } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) { + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id); } else { - assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock); - p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index; - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length); + uint32_t flags; + int send_payload = 1; + + assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); + assert(p->write.current->chunk.memblock); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32)); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset)); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode); + + flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK; + + if (p->use_shm) { + uint32_t block_id, shm_id; + size_t offset, length; + + assert(p->export); + + if (pa_memexport_put(p->export, + p->write.current->chunk.memblock, + &block_id, + &shm_id, + &offset, + &length) >= 0) { + + flags |= PA_FLAG_SHMDATA; + send_payload = 0; + + p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); + p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); + p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); + p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info)); + p->write.data = p->write.shm_info; + } +/* else */ +/* pa_log_warn(__FILE__": Failed to export memory block."); */ + } + + if (send_payload) { + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length); + p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index; + } + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags); } #ifdef HAVE_CREDS @@ -344,7 +478,6 @@ static void prepare_next_write_item(pa_pstream *p) { p->write_creds = p->write.current->creds; #endif - } static int do_write(pa_pstream *p) { @@ -359,16 +492,18 @@ static int do_write(pa_pstream *p) { if (!p->write.current) return 0; - assert(p->write.data); - if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) { d = (uint8_t*) p->write.descriptor + p->write.index; l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index; } else { + assert(p->write.data); + d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE; l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE); } + assert(l > 0); + #ifdef HAVE_CREDS if (p->send_creds_now) { @@ -384,7 +519,7 @@ static int do_write(pa_pstream *p) { p->write.index += r; - if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { + if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { assert(p->write.current); item_free(p->write.current, (void *) 1); p->write.current = NULL; @@ -428,27 +563,87 @@ static int do_read(pa_pstream *p) { p->read.index += r; if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) { + uint32_t flags, length, channel; /* Reading of frame descriptor complete */ - /* Frame size too large */ - if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) { - pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX); + flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + + if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { + pa_log_warn(__FILE__": Recieved SHM frame on a socket where SHM is disabled."); + return -1; + } + + if (flags == PA_FLAG_SHMRELEASE) { + + /* This is a SHM memblock release frame with no payload */ + +/* pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ + + assert(p->export); + pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + + goto frame_done; + + } else if (flags == PA_FLAG_SHMREVOKE) { + + /* This is a SHM memblock revoke frame with no payload */ + +/* pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ + + assert(p->import); + pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + + goto frame_done; + } + + length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); + + if (length > FRAME_SIZE_MAX) { + pa_log_warn(__FILE__": Recieved invalid frame size : %lu", (unsigned long) length); return -1; } assert(!p->read.packet && !p->read.memblock); - if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) { + channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); + + if (channel == (uint32_t) -1) { + + if (flags != 0) { + pa_log_warn(__FILE__": Received packet frame with invalid flags value."); + return -1; + } + /* Frame is a packet frame */ - p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])); + p->read.packet = pa_packet_new(length); p->read.data = p->read.packet->data; + } else { - /* Frame is a memblock frame */ - p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat); - p->read.data = p->read.memblock->data; - if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) { - pa_log_warn(__FILE__": Invalid seek mode"); + if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { + pa_log_warn(__FILE__": Received memblock frame with invalid seek mode."); + return -1; + } + + if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { + + if (length != sizeof(p->read.shm_info)) { + pa_log_warn(__FILE__": Recieved SHM memblock frame with Invalid frame length."); + return -1; + } + + /* Frame is a memblock frame referencing an SHM memblock */ + p->read.data = p->read.shm_info; + + } else if ((flags & PA_FLAG_SHMMASK) == 0) { + + /* Frame is a memblock frame */ + + p->read.memblock = pa_memblock_new(p->mempool, length); + p->read.data = p->read.memblock->data; + } else { + + pa_log_warn(__FILE__": Recieved memblock frame with invalid flags value."); return -1; } } @@ -456,7 +651,9 @@ static int do_read(pa_pstream *p) { } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) { /* Frame payload available */ - if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */ + if (p->read.memblock && p->recieve_memblock_callback) { + + /* Is this memblock data? Than pass it to the user */ l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r; if (l > 0) { @@ -477,13 +674,13 @@ static int do_read(pa_pstream *p) { p, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), offset, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]), + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, &chunk, p->recieve_memblock_callback_userdata); } /* Drop seek info for following callbacks */ - p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = + p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; } @@ -491,13 +688,13 @@ static int do_read(pa_pstream *p) { /* Frame complete */ if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { + if (p->read.memblock) { - assert(!p->read.packet); - + + /* This was a memblock frame. We can unref the memblock now */ pa_memblock_unref(p->read.memblock); - p->read.memblock = NULL; - } else { - assert(p->read.packet); + + } else if (p->read.packet) { if (p->recieve_packet_callback) #ifdef HAVE_CREDS @@ -507,17 +704,63 @@ static int do_read(pa_pstream *p) { #endif pa_packet_unref(p->read.packet); - p->read.packet = NULL; + } else { + pa_memblock *b; + + assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); + + assert(p->import); + + if (!(b = pa_memimport_get(p->import, + ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]), + ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]), + ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) { + + pa_log_warn(__FILE__": Failed to import memory block."); + return -1; + } + + if (p->recieve_memblock_callback) { + int64_t offset; + pa_memchunk chunk; + + chunk.memblock = b; + chunk.index = 0; + chunk.length = b->length; + + offset = (int64_t) ( + (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | + (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); + + p->recieve_memblock_callback( + p, + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + offset, + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, + &chunk, + p->recieve_memblock_callback_userdata); + } + + pa_memblock_unref(b); } - p->read.index = 0; -#ifdef HAVE_CREDS - p->read_creds_valid = 0; -#endif + goto frame_done; } } - return 0; + return 0; + +frame_done: + p->read.memblock = NULL; + p->read.packet = NULL; + p->read.index = 0; + +#ifdef HAVE_CREDS + p->read_creds_valid = 0; +#endif + + return 0; } void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { @@ -583,6 +826,16 @@ void pa_pstream_close(pa_pstream *p) { p->dead = 1; + if (p->import) { + pa_memimport_free(p->import); + p->import = NULL; + } + + if (p->export) { + pa_memexport_free(p->export); + p->export = NULL; + } + if (p->io) { pa_iochannel_free(p->io); p->io = NULL; @@ -597,4 +850,19 @@ void pa_pstream_close(pa_pstream *p) { p->drain_callback = NULL; p->recieve_packet_callback = NULL; p->recieve_memblock_callback = NULL; + + +} + +void pa_pstream_use_shm(pa_pstream *p, int enable) { + assert(p); + assert(p->ref >= 1); + + p->use_shm = enable; + + if (!p->import) + p->import = pa_memimport_new(p->mempool, memimport_release_cb, p); + + if (!p->export) + p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p); } -- cgit