From 75b7b0fb1dda20049d8840a1319868f07d2f04be Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Thu, 30 Jul 2009 23:14:46 +0200 Subject: pulse: split up handling of outstanding structs into part where the lock must be taken and where not --- src/pulse.c | 213 ++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 122 insertions(+), 91 deletions(-) diff --git a/src/pulse.c b/src/pulse.c index 958fcf0..2a61c8f 100644 --- a/src/pulse.c +++ b/src/pulse.c @@ -61,7 +61,8 @@ struct outstanding { void *userdata; ca_sound_file *file; int error; - ca_bool_t clean_up; + unsigned clean_up:1; /* Handler needs to clean up the outstanding struct */ + unsigned finished:1; /* finished playing */ }; struct private { @@ -80,18 +81,25 @@ struct private { static void context_state_cb(pa_context *pc, void *userdata); static void context_subscribe_cb(pa_context *pc, pa_subscription_event_type_t t, uint32_t idx, void *userdata); -static void outstanding_free(struct outstanding *o) { +static void outstanding_disconnect(struct outstanding *o) { ca_assert(o); - if (o->file) - ca_sound_file_close(o->file); - if (o->stream) { pa_stream_set_write_callback(o->stream, NULL, NULL); pa_stream_set_state_callback(o->stream, NULL, NULL); pa_stream_disconnect(o->stream); pa_stream_unref(o->stream); + o->stream = NULL; } +} + +static void outstanding_free(struct outstanding *o) { + ca_assert(o); + + outstanding_disconnect(o); + + if (o->file) + ca_sound_file_close(o->file); ca_free(o); } @@ -245,7 +253,6 @@ static void context_state_cb(pa_context *pc, void *userdata) { p = PRIVATE(c); state = pa_context_get_state(pc); - if (state == PA_CONTEXT_FAILED || state == PA_CONTEXT_TERMINATED) { struct outstanding *out; int ret; @@ -259,7 +266,9 @@ static void context_state_cb(pa_context *pc, void *userdata) { while ((out = p->outstanding)) { + outstanding_disconnect(out); CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); + ca_mutex_unlock(p->outstanding_mutex); if (out->callback) @@ -312,10 +321,12 @@ static void context_subscribe_cb(pa_context *pc, pa_subscription_event_type_t t, for (out = p->outstanding; out; out = n) { n = out->next; - if (out->type != OUTSTANDING_SAMPLE || out->sink_input != idx) + if (!out->clean_up ||out->type != OUTSTANDING_SAMPLE || out->sink_input != idx) continue; + outstanding_disconnect(out); CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); + CA_LLIST_PREPEND(struct outstanding, l, out); } @@ -550,30 +561,46 @@ static void play_sample_cb(pa_context *c, uint32_t idx, void *userdata) { static void stream_state_cb(pa_stream *s, void *userdata) { struct private *p; struct outstanding *out = userdata; + pa_stream_state_t state; ca_assert(s); ca_assert(out); p = PRIVATE(out->context); - if (out->clean_up) { - pa_stream_state_t state; + state = pa_stream_get_state(s); - state = pa_stream_get_state(s); + switch (state) { + case PA_STREAM_CREATING: + case PA_STREAM_UNCONNECTED: + break; - if (state == PA_STREAM_FAILED || state == PA_STREAM_TERMINATED) { - int err; + case PA_STREAM_READY: + out->sink_input = pa_stream_get_index(out->stream); + break; - ca_mutex_lock(p->outstanding_mutex); - CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); - ca_mutex_unlock(p->outstanding_mutex); + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: { + int err; err = state == PA_STREAM_FAILED ? translate_error(pa_context_errno(pa_stream_get_context(s))) : CA_ERROR_DESTROYED; - if (out->callback) - out->callback(out->context, out->id, err, out->userdata); + if (out->clean_up) { + ca_mutex_lock(p->outstanding_mutex); + outstanding_disconnect(out); + CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); + ca_mutex_unlock(p->outstanding_mutex); - outstanding_free(out); + if (out->callback) + out->callback(out->context, out->id, out->error, out->userdata); + + outstanding_free(out); + } else { + out->finished = TRUE; + out->error = err; + } + + break; } } @@ -583,27 +610,33 @@ static void stream_state_cb(pa_stream *s, void *userdata) { static void stream_drain_cb(pa_stream *s, int success, void *userdata) { struct private *p; struct outstanding *out = userdata; + int err; ca_assert(s); ca_assert(out); + ca_assert(out->type == OUTSTANDING_STREAM); p = PRIVATE(out->context); + err = success ? CA_SUCCESS : translate_error(pa_context_errno(p->context)); - ca_assert(out->type == OUTSTANDING_STREAM); - ca_assert(out->clean_up); + if (out->clean_up) { + ca_mutex_lock(p->outstanding_mutex); + outstanding_disconnect(out); + CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); + ca_mutex_unlock(p->outstanding_mutex); - ca_mutex_lock(p->outstanding_mutex); - CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); - ca_mutex_unlock(p->outstanding_mutex); + if (out->callback) + out->callback(out->context, out->id, err, out->userdata); - if (out->callback) { - int err; + outstanding_free(out); - err = success ? CA_SUCCESS : translate_error(pa_context_errno(p->context)); - out->callback(out->context, out->id, err, out->userdata); + } else { + pa_stream_disconnect(s); + out->error = err; + out->finished = TRUE; } - outstanding_free(out); + pa_threaded_mainloop_signal(p->mainloop, FALSE); } static void stream_write_cb(pa_stream *s, size_t bytes, void *userdata) { @@ -686,6 +719,7 @@ finish: if (out->clean_up) { ca_mutex_lock(p->outstanding_mutex); + outstanding_disconnect(out); CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); ca_mutex_unlock(p->outstanding_mutex); @@ -693,11 +727,14 @@ finish: out->callback(out->context, out->id, ret, out->userdata); outstanding_free(out); + } else { pa_stream_disconnect(s); - pa_threaded_mainloop_signal(p->mainloop, FALSE); out->error = ret; + out->finished = TRUE; } + + pa_threaded_mainloop_signal(p->mainloop, FALSE); } static const pa_sample_format_t sample_type_table[] = { @@ -779,7 +816,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal if (!(out = ca_new0(struct outstanding, 1))) { ret = CA_ERROR_OOM; - goto finish; + goto finish_unlocked; } out->type = OUTSTANDING_SAMPLE; @@ -790,12 +827,12 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal out->userdata = userdata; if ((ret = convert_proplist(&l, proplist)) < 0) - goto finish; + goto finish_unlocked; if ((n = pa_proplist_gets(l, CA_PROP_EVENT_ID))) if (!(name = ca_strdup(n))) { ret = CA_ERROR_OOM; - goto finish; + goto finish_unlocked; } if ((vol = pa_proplist_gets(l, CA_PROP_CANBERRA_VOLUME))) { @@ -806,7 +843,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal dvol = strtod(vol, &e); if (errno != 0 || !e || *e) { ret = CA_ERROR_INVALID; - goto finish; + goto finish_unlocked; } v = pa_sw_volume_from_dB(dvol); @@ -816,7 +853,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal if ((ct = pa_proplist_gets(l, CA_PROP_CANBERRA_CACHE_CONTROL))) if ((ret = ca_parse_cache_control(&cache_control, ct)) < 0) { ret = CA_ERROR_INVALID; - goto finish; + goto finish_unlocked; } if ((channel = pa_proplist_gets(l, CA_PROP_CANBERRA_FORCE_CHANNEL))) { @@ -825,7 +862,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal if (!pa_channel_map_parse(&t, channel) || t.channels != 1) { ret = CA_ERROR_INVALID; - goto finish; + goto finish_unlocked; } position = t.map[0]; @@ -834,7 +871,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal * shall be used */ if (cache_control != CA_CACHE_CONTROL_NEVER) { ret = CA_ERROR_NOTSUPPORTED; - goto finish; + goto finish_unlocked; } } @@ -842,7 +879,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal add_common(l); if ((ret = subscribe(c)) < 0) - goto finish; + goto finish_unlocked; if (name && cache_control != CA_CACHE_CONTROL_NEVER) { @@ -855,15 +892,13 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal if (!p->context) { ret = CA_ERROR_STATE; - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } /* Let's try to play the sample */ if (!(o = pa_context_play_sample_with_proplist(p->context, name, c->device, v, l, play_sample_cb, out))) { ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } for (;;) { @@ -887,13 +922,13 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal /* The operation might have been canceled due to connection termination */ if (canceled) { ret = CA_ERROR_DISCONNECTED; - goto finish; + goto finish_unlocked; } /* Did we manage to play the sample or did some other error occur? */ if (out->error != CA_ERROR_NOTFOUND) { ret = out->error; - goto finish; + goto finish_unlocked; } /* Hmm, we need to play it directly */ @@ -906,7 +941,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal /* Let's upload the sample and retry playing */ if ((ret = driver_cache(c, proplist)) < 0) - goto finish; + goto finish_unlocked; } } @@ -914,7 +949,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal /* Let's stream the sample directly */ if ((ret = ca_lookup_sound(&out->file, &sp, &p->theme, c->props, proplist)) < 0) - goto finish; + goto finish_unlocked; if (sp) if (!pa_proplist_contains(l, CA_PROP_MEDIA_FILENAME)) @@ -950,14 +985,12 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal if (!p->context) { ret = CA_ERROR_STATE; - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } if (!(out->stream = pa_stream_new_with_proplist(p->context, name, &ss, cm_good ? &cm : NULL, l))) { ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } pa_stream_set_state_callback(out->stream, stream_state_cb, out); @@ -975,8 +1008,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal | (position != PA_CHANNEL_POSITION_INVALID ? PA_STREAM_NO_REMIX_CHANNELS : 0) , volume_set ? &cvol : NULL, NULL) < 0) { ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } for (;;) { @@ -984,8 +1016,7 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal if (!p->context) { ret = CA_ERROR_STATE; - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } /* Stream sucessfully created */ @@ -995,33 +1026,24 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal /* Check for failure */ if (state == PA_STREAM_FAILED) { ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } + /* Prematurely ended */ if (state == PA_STREAM_TERMINATED) { ret = out->error; - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } pa_threaded_mainloop_wait(p->mainloop); } - if ((out->sink_input = pa_stream_get_index(out->stream)) == PA_INVALID_INDEX) { - ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; - } - - pa_threaded_mainloop_unlock(p->mainloop); - ret = CA_SUCCESS; -finish: +finish_locked: - /* We keep the outstanding struct around if we need clean up later to */ - if (ret == CA_SUCCESS) { + /* We keep the outstanding struct around to clean up later if the sound din't finish yet*/ + if (ret == CA_SUCCESS && !out->finished) { out->clean_up = TRUE; ca_mutex_lock(p->outstanding_mutex); @@ -1030,6 +1052,15 @@ finish: } else outstanding_free(out); + out = NULL; + + pa_threaded_mainloop_unlock(p->mainloop); + +finish_unlocked: + + if (out) + outstanding_free(out); + if (l) pa_proplist_free(l); @@ -1087,6 +1118,7 @@ int driver_cancel(ca_context *c, uint32_t id) { if (out->callback) out->callback(c, out->id, CA_ERROR_CANCELED, out->userdata); + outstanding_disconnect(out); CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); outstanding_free(out); } @@ -1121,7 +1153,7 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { if (!(out = ca_new0(struct outstanding, 1))) { ret = CA_ERROR_OOM; - goto finish; + goto finish_unlocked; } out->type = OUTSTANDING_UPLOAD; @@ -1129,32 +1161,32 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { out->sink_input = PA_INVALID_INDEX; if ((ret = convert_proplist(&l, proplist)) < 0) - goto finish; + goto finish_unlocked; if (!(n = pa_proplist_gets(l, CA_PROP_EVENT_ID))) { ret = CA_ERROR_INVALID; - goto finish; + goto finish_unlocked; } if (!(name = ca_strdup(n))) { ret = CA_ERROR_OOM; - goto finish; + goto finish_unlocked; } if ((ct = pa_proplist_gets(l, CA_PROP_CANBERRA_CACHE_CONTROL))) if ((ret = ca_parse_cache_control(&cache_control, ct)) < 0) { ret = CA_ERROR_INVALID; - goto finish; + goto finish_unlocked; } if (cache_control != CA_CACHE_CONTROL_PERMANENT) { ret = CA_ERROR_INVALID; - goto finish; + goto finish_unlocked; } if ((ct = pa_proplist_gets(l, CA_PROP_CANBERRA_FORCE_CHANNEL))) { ret = CA_ERROR_NOTSUPPORTED; - goto finish; + goto finish_unlocked; } strip_prefix(l, "canberra."); @@ -1164,7 +1196,7 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { /* Let's stream the sample directly */ if ((ret = ca_lookup_sound(&out->file, &sp, &p->theme, c->props, proplist)) < 0) - goto finish; + goto finish_unlocked; if (sp) if (!pa_proplist_contains(l, CA_PROP_MEDIA_FILENAME)) @@ -1181,16 +1213,13 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { pa_threaded_mainloop_lock(p->mainloop); if (!p->context) { - pa_threaded_mainloop_unlock(p->mainloop); ret = CA_ERROR_STATE; - goto finish; + goto finish_locked; } if (!(out->stream = pa_stream_new_with_proplist(p->context, name, &ss, cm_good ? &cm : NULL, l))) { ret = translate_error(pa_context_errno(p->context)); - - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } pa_stream_set_state_callback(out->stream, stream_state_cb, out); @@ -1198,8 +1227,7 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { if (pa_stream_connect_upload(out->stream, (size_t) ca_sound_file_get_size(out->file)) < 0) { ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } for (;;) { @@ -1207,8 +1235,7 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { if (!p->context) { ret = CA_ERROR_STATE; - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } /* Stream sucessfully created and uploaded */ @@ -1218,20 +1245,24 @@ int driver_cache(ca_context *c, ca_proplist *proplist) { /* Check for failure */ if (state == PA_STREAM_FAILED) { ret = translate_error(pa_context_errno(p->context)); - pa_threaded_mainloop_unlock(p->mainloop); - goto finish; + goto finish_locked; } pa_threaded_mainloop_wait(p->mainloop); } - pa_threaded_mainloop_unlock(p->mainloop); - ret = CA_SUCCESS; -finish: - +finish_locked: outstanding_free(out); + out = NULL; + + pa_threaded_mainloop_unlock(p->mainloop); + +finish_unlocked: + + if (out) + outstanding_free(out); if (l) pa_proplist_free(l); -- cgit