summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohan Hedberg <johan.hedberg@nokia.com>2007-08-31 10:25:18 +0000
committerJohan Hedberg <johan.hedberg@nokia.com>2007-08-31 10:25:18 +0000
commitfc0d501d82773718d0f2d040f786136332c39813 (patch)
tree44b5cc14b72a8fd393f1c67917a7d10c9985f1a9
parent03490d81bbe11641c47cd8e7e0c64a45db4f80f4 (diff)
Cleanup and fixes of stream handling
-rw-r--r--audio/a2dp.c88
-rw-r--r--audio/a2dp.h8
-rw-r--r--audio/avdtp.c117
-rw-r--r--audio/avdtp.h2
-rw-r--r--audio/headset.c4
-rw-r--r--audio/headset.h4
-rw-r--r--audio/sink.c8
-rw-r--r--audio/unix.c71
8 files changed, 146 insertions, 156 deletions
diff --git a/audio/a2dp.c b/audio/a2dp.c
index 689e9da6..fa8d1175 100644
--- a/audio/a2dp.c
+++ b/audio/a2dp.c
@@ -58,7 +58,6 @@ struct a2dp_sep {
struct avdtp_local_sep *sep;
struct avdtp *session;
struct avdtp_stream *stream;
- struct device *used_by;
guint suspend_timer;
gboolean locked;
gboolean suspending;
@@ -73,7 +72,6 @@ struct a2dp_stream_cb {
struct a2dp_stream_setup {
struct avdtp *session;
- struct device *dev;
struct a2dp_sep *sep;
struct avdtp_stream *stream;
struct avdtp_service_capability *media_codec;
@@ -102,16 +100,23 @@ static void stream_setup_free(struct a2dp_stream_setup *s)
g_free(s);
}
+static struct device *a2dp_get_dev(struct avdtp *session)
+{
+ bdaddr_t addr;
+
+ avdtp_get_peers(session, NULL, &addr);
+
+ return manager_device_connected(&addr, A2DP_SOURCE_UUID);
+}
+
static void setup_callback(struct a2dp_stream_cb *cb,
struct a2dp_stream_setup *s)
{
- cb->cb(s->session, s->dev, s->stream, cb->user_data);
+ cb->cb(s->session, s->sep, s->stream, cb->user_data);
}
static gboolean finalize_stream_setup(struct a2dp_stream_setup *s)
{
- if (!s->stream && s->sep)
- s->sep->used_by = NULL;
g_slist_foreach(s->cb, (GFunc) setup_callback, s);
stream_setup_free(s);
return FALSE;
@@ -137,8 +142,9 @@ static struct a2dp_stream_setup *find_setup_by_dev(struct device *dev)
for (l = setups; l != NULL; l = l->next) {
struct a2dp_stream_setup *setup = l->data;
+ struct device *setup_dev = a2dp_get_dev(setup->session);
- if (setup->dev == dev)
+ if (setup_dev == dev)
return setup;
}
@@ -365,16 +371,13 @@ static gboolean setconf_ind(struct avdtp *session,
{
struct a2dp_sep *a2dp_sep = user_data;
struct device *dev;
- bdaddr_t addr;
if (a2dp_sep->type == AVDTP_SEP_TYPE_SINK)
debug("SBC Sink: Set_Configuration_Ind");
else
debug("SBC Source: Set_Configuration_Ind");
- avdtp_get_peers(session, NULL, &addr);
-
- dev = manager_device_connected(&addr, A2DP_SOURCE_UUID);
+ dev = a2dp_get_dev(session);
if (!dev) {
*err = AVDTP_UNSUPPORTED_CONFIGURATION;
*category = 0x00;
@@ -383,7 +386,6 @@ static gboolean setconf_ind(struct avdtp *session,
avdtp_stream_add_cb(session, stream, stream_state_changed, a2dp_sep);
a2dp_sep->stream = stream;
- a2dp_sep->used_by = dev;
if (a2dp_sep->type == AVDTP_SEP_TYPE_SOURCE)
sink_new_stream(dev, session, stream);
@@ -452,6 +454,7 @@ static void setconf_cfm(struct avdtp *session, struct avdtp_local_sep *sep,
{
struct a2dp_sep *a2dp_sep = user_data;
struct a2dp_stream_setup *setup;
+ struct device *dev;
int ret;
if (a2dp_sep->type == AVDTP_SEP_TYPE_SINK)
@@ -473,8 +476,10 @@ static void setconf_cfm(struct avdtp *session, struct avdtp_local_sep *sep,
if (!setup)
return;
+ dev = a2dp_get_dev(session);
+
/* Notify sink.c of the new stream */
- sink_new_stream(setup->dev, session, setup->stream);
+ sink_new_stream(dev, session, setup->stream);
ret = avdtp_open(session, stream);
if (ret < 0) {
@@ -728,7 +733,7 @@ static gboolean abort_ind(struct avdtp *session, struct avdtp_local_sep *sep,
else
debug("SBC Source: Abort_Ind");
- a2dp_sep->used_by = NULL;
+ a2dp_sep->stream = NULL;
return TRUE;
}
@@ -1021,7 +1026,6 @@ gboolean a2dp_source_cancel_stream(struct device *dev, unsigned int id)
if (!setup->cb) {
setup->canceled = TRUE;
- setup->sep->used_by = NULL;
setup->sep = NULL;
}
@@ -1029,11 +1033,9 @@ gboolean a2dp_source_cancel_stream(struct device *dev, unsigned int id)
}
unsigned int a2dp_source_request_stream(struct avdtp *session,
- struct device *dev,
gboolean start,
a2dp_stream_cb_t cb,
void *user_data,
- struct a2dp_sep **ret,
struct avdtp_service_capability *media_codec)
{
struct a2dp_stream_cb *cb_data;
@@ -1048,7 +1050,7 @@ unsigned int a2dp_source_request_stream(struct avdtp *session,
if (tmp->locked)
continue;
- if (tmp->used_by == NULL || tmp->used_by == dev) {
+ if (!tmp->stream || avdtp_has_stream(session, tmp->stream)) {
sep = tmp;
break;
}
@@ -1061,8 +1063,6 @@ unsigned int a2dp_source_request_stream(struct avdtp *session,
setup = find_setup_by_session(session);
- sep->used_by = dev;
-
debug("a2dp_source_request_stream: selected SEP %p", sep);
cb_data = g_new(struct a2dp_stream_cb, 1);
@@ -1082,7 +1082,6 @@ unsigned int a2dp_source_request_stream(struct avdtp *session,
setup = g_new0(struct a2dp_stream_setup, 1);
setup->session = avdtp_ref(session);
setup->sep = sep;
- setup->dev = dev;
setup->cb = g_slist_append(setup->cb, cb_data);
setup->start = start;
setup->stream = sep->stream;
@@ -1135,9 +1134,6 @@ unsigned int a2dp_source_request_stream(struct avdtp *session,
goto failed;
}
- if (ret)
- *ret = sep;
-
setups = g_slist_append(setups, setup);
return cb_data->id;
@@ -1148,52 +1144,24 @@ failed:
return 0;
}
-gboolean a2dp_source_lock(struct device *dev, struct avdtp *session)
+gboolean a2dp_sep_lock(struct a2dp_sep *sep, struct avdtp *session)
{
- GSList *l;
-
- for (l = sources; l != NULL; l = l->next) {
- struct a2dp_sep *sep = l->data;
-
- if (sep->locked)
- continue;
-
- if (sep->used_by != dev)
- continue;
+ if (sep->locked)
+ return FALSE;
- debug("SBC Source SEP %p locked", sep);
- sep->locked = TRUE;
- return TRUE;
- }
+ debug("SBC Source SEP %p locked", sep);
+ sep->locked = TRUE;
- return FALSE;
+ return TRUE;
}
-gboolean a2dp_source_unlock(struct device *dev, struct avdtp *session)
+gboolean a2dp_sep_unlock(struct a2dp_sep *sep, struct avdtp *session)
{
avdtp_state_t state;
- GSList *l;
- struct a2dp_sep *sep = NULL;
-
- for (l = sources; l != NULL; l = l->next) {
- struct a2dp_sep *tmp = l->data;
-
- if (!tmp->locked)
- continue;
-
- if (tmp->sep && tmp->used_by == dev) {
- sep = tmp;
- break;
- }
- }
-
- if (!sep)
- return FALSE;
state = avdtp_sep_get_state(sep->sep);
sep->locked = FALSE;
- sep->used_by = NULL;
debug("SBC Source SEP %p unlocked", sep);
@@ -1224,7 +1192,7 @@ gboolean a2dp_source_suspend(struct device *dev, struct avdtp *session)
for (l = sources; l != NULL; l = l->next) {
struct a2dp_sep *tmp = l->data;
- if (tmp->sep && tmp->used_by == dev) {
+ if (tmp->session && tmp->session == session) {
sep = tmp;
break;
}
@@ -1255,7 +1223,7 @@ gboolean a2dp_source_start_stream(struct device *dev, struct avdtp *session)
for (l = sources; l != NULL; l = l->next) {
struct a2dp_sep *tmp = l->data;
- if (tmp->sep && tmp->used_by == dev) {
+ if (tmp->session && tmp->session == session) {
sep = tmp;
break;
}
diff --git a/audio/a2dp.h b/audio/a2dp.h
index ede0c70b..6a966302 100644
--- a/audio/a2dp.h
+++ b/audio/a2dp.h
@@ -60,7 +60,7 @@ struct sbc_codec_cap {
struct a2dp_sep;
-typedef void (*a2dp_stream_cb_t) (struct avdtp *session, struct device *dev,
+typedef void (*a2dp_stream_cb_t) (struct avdtp *session, struct a2dp_sep *sep,
struct avdtp_stream *stream,
void *user_data);
@@ -68,14 +68,12 @@ int a2dp_init(DBusConnection *conn, int sources, int sinks);
void a2dp_exit(void);
unsigned int a2dp_source_request_stream(struct avdtp *session,
- struct device *dev,
gboolean start, a2dp_stream_cb_t cb,
void *user_data,
- struct a2dp_sep **sep,
struct avdtp_service_capability *media_codec);
gboolean a2dp_source_cancel_stream(struct device *dev, unsigned int id);
-gboolean a2dp_source_lock(struct device *dev, struct avdtp *session);
-gboolean a2dp_source_unlock(struct device *dev, struct avdtp *session);
+gboolean a2dp_sep_lock(struct a2dp_sep *sep, struct avdtp *session);
+gboolean a2dp_sep_unlock(struct a2dp_sep *sep, struct avdtp *session);
gboolean a2dp_source_suspend(struct device *dev, struct avdtp *session);
gboolean a2dp_source_start_stream(struct device *dev, struct avdtp *session);
diff --git a/audio/avdtp.c b/audio/avdtp.c
index d03afbae..a50b82b3 100644
--- a/audio/avdtp.c
+++ b/audio/avdtp.c
@@ -387,6 +387,9 @@ static gboolean avdtp_parse_rej(struct avdtp *session,
struct avdtp_header *header, int size);
static int process_queue(struct avdtp *session);
static void connection_lost(struct avdtp *session, int err);
+static void avdtp_sep_set_state(struct avdtp *session,
+ struct avdtp_local_sep *sep,
+ avdtp_state_t state);
static const char *avdtp_statestr(avdtp_state_t state)
{
@@ -572,6 +575,60 @@ static gboolean stream_timeout(struct avdtp_stream *stream)
return FALSE;
}
+static gboolean transport_cb(GIOChannel *chan, GIOCondition cond,
+ gpointer data)
+{
+ struct avdtp_stream *stream = data;
+ struct avdtp_local_sep *sep = stream->lsep;
+
+ if (stream->close_int && sep->cfm && sep->cfm->close)
+ sep->cfm->close(stream->session, sep, stream, NULL,
+ sep->user_data);
+
+ stream->io = 0;
+
+ avdtp_sep_set_state(stream->session, sep, AVDTP_STATE_IDLE);
+
+ return FALSE;
+}
+
+static void handle_transport_connect(struct avdtp *session, int sock,
+ uint16_t mtu)
+{
+ struct avdtp_stream *stream = session->pending_open;
+ struct avdtp_local_sep *sep = stream->lsep;
+ GIOChannel *channel;
+
+ session->pending_open = NULL;
+
+ if (stream->timer) {
+ g_source_remove(stream->timer);
+ stream->timer = 0;
+ }
+
+ if (sock < 0) {
+ if (!stream->open_acp && sep->cfm && sep->cfm->open) {
+ struct avdtp_error err;
+ avdtp_error_init(&err, AVDTP_ERROR_ERRNO, EIO);
+ sep->cfm->open(session, sep, NULL, &err,
+ sep->user_data);
+ }
+ return;
+ }
+
+ stream->sock = sock;
+ stream->mtu = mtu;
+
+ if (!stream->open_acp && sep->cfm && sep->cfm->open)
+ sep->cfm->open(session, sep, stream, NULL, sep->user_data);
+
+ channel = g_io_channel_unix_new(stream->sock);
+
+ stream->io = g_io_add_watch(channel, G_IO_ERR | G_IO_HUP | G_IO_NVAL,
+ (GIOFunc) transport_cb, stream);
+ g_io_channel_unref(channel);
+}
+
static void avdtp_sep_set_state(struct avdtp *session,
struct avdtp_local_sep *sep,
avdtp_state_t state)
@@ -623,6 +680,8 @@ static void avdtp_sep_set_state(struct avdtp *session,
stream->idle_timer = 0;
}
session->streams = g_slist_remove(session->streams, stream);
+ if (session->pending_open == stream)
+ handle_transport_connect(session, -1, 0);
stream_free(stream);
if (session->ref == 1 && !session->streams)
set_disconnect_timer(session);
@@ -1311,50 +1370,6 @@ static gboolean avdtp_parse_cmd(struct avdtp *session,
}
}
-static gboolean transport_cb(GIOChannel *chan, GIOCondition cond,
- gpointer data)
-{
- struct avdtp_stream *stream = data;
- struct avdtp_local_sep *sep = stream->lsep;
-
- if (stream->close_int && sep->cfm && sep->cfm->close)
- sep->cfm->close(stream->session, sep, stream, NULL,
- sep->user_data);
-
- stream->io = 0;
-
- avdtp_sep_set_state(stream->session, sep, AVDTP_STATE_IDLE);
-
- return FALSE;
-}
-
-static void handle_transport_connect(struct avdtp *session, int sock,
- uint16_t mtu)
-{
- struct avdtp_stream *stream = session->pending_open;
- struct avdtp_local_sep *sep = stream->lsep;
- GIOChannel *channel;
-
- session->pending_open = NULL;
-
- if (stream->timer) {
- g_source_remove(stream->timer);
- stream->timer = 0;
- }
-
- stream->sock = sock;
- stream->mtu = mtu;
-
- if (!stream->open_acp && sep->cfm && sep->cfm->open)
- sep->cfm->open(session, sep, stream, NULL, sep->user_data);
-
- channel = g_io_channel_unix_new(stream->sock);
-
- stream->io = g_io_add_watch(channel, G_IO_ERR | G_IO_HUP | G_IO_NVAL,
- (GIOFunc) transport_cb, stream);
- g_io_channel_unref(channel);
-}
-
static void init_request(struct avdtp_header *header, int request_id)
{
static int transaction = 0;
@@ -1488,12 +1503,13 @@ static gboolean l2cap_connect_cb(GIOChannel *chan, GIOCondition cond,
return FALSE;
}
+ sk = g_io_channel_unix_get_fd(chan);
+
if (cond & (G_IO_ERR | G_IO_HUP)) {
err = EIO;
goto failed;
}
- sk = g_io_channel_unix_get_fd(chan);
len = sizeof(ret);
if (getsockopt(sk, SOL_SOCKET, SO_ERROR, &ret, &len) < 0) {
err = errno;
@@ -1533,12 +1549,18 @@ static gboolean l2cap_connect_cb(GIOChannel *chan, GIOCondition cond,
}
else if (session->pending_open)
handle_transport_connect(session, sk, l2o.imtu);
+ else {
+ err = -EIO;
+ goto failed;
+ }
process_queue(session);
return FALSE;
failed:
+ close(sk);
+
if (session->pending_open) {
avdtp_sep_set_state(session, session->pending_open->lsep,
AVDTP_STATE_IDLE);
@@ -2897,3 +2919,8 @@ void avdtp_exit(void)
g_io_channel_unref(avdtp_server);
avdtp_server = NULL;
}
+
+gboolean avdtp_has_stream(struct avdtp *session, struct avdtp_stream *stream)
+{
+ return g_slist_find(session->streams, stream) ? TRUE : FALSE;
+}
diff --git a/audio/avdtp.h b/audio/avdtp.h
index d429b435..d8408ac0 100644
--- a/audio/avdtp.h
+++ b/audio/avdtp.h
@@ -179,6 +179,8 @@ struct avdtp_service_capability *avdtp_get_codec(struct avdtp_remote_sep *sep);
int avdtp_discover(struct avdtp *session, avdtp_discover_cb_t cb,
void *user_data);
+gboolean avdtp_has_stream(struct avdtp *session, struct avdtp_stream *stream);
+
unsigned int avdtp_stream_add_cb(struct avdtp *session,
struct avdtp_stream *stream,
avdtp_stream_state_cb cb, void *data);
diff --git a/audio/headset.c b/audio/headset.c
index 5a626c17..25a5ebc6 100644
--- a/audio/headset.c
+++ b/audio/headset.c
@@ -1604,7 +1604,7 @@ gboolean headset_is_active(struct device *dev)
return FALSE;
}
-gboolean headset_lock(struct device *dev, void *data)
+gboolean headset_lock(struct device *dev)
{
struct headset *hs = dev->headset;
@@ -1616,7 +1616,7 @@ gboolean headset_lock(struct device *dev, void *data)
return TRUE;
}
-gboolean headset_unlock(struct device *dev, void *data)
+gboolean headset_unlock(struct device *dev)
{
struct headset *hs = dev->headset;
diff --git a/audio/headset.h b/audio/headset.h
index 3f762815..6824e0ce 100644
--- a/audio/headset.h
+++ b/audio/headset.h
@@ -74,7 +74,7 @@ int headset_get_sco_fd(struct device *dev);
gboolean headset_is_active(struct device *dev);
-gboolean headset_lock(struct device *dev, void *data);
-gboolean headset_unlock(struct device *dev, void *data);
+gboolean headset_lock(struct device *dev);
+gboolean headset_unlock(struct device *dev);
gboolean headset_suspend(struct device *dev, void *data);
gboolean headset_play(struct device *dev, void *data);
diff --git a/audio/sink.c b/audio/sink.c
index 285dc45c..ea95bcf7 100644
--- a/audio/sink.c
+++ b/audio/sink.c
@@ -133,11 +133,11 @@ static void stream_state_changed(struct avdtp_stream *stream,
sink->state = new_state;
}
-static void stream_setup_complete(struct avdtp *session, struct device *dev,
+static void stream_setup_complete(struct avdtp *session, struct a2dp_sep *sep,
struct avdtp_stream *stream,
void *user_data)
{
- struct sink *sink = dev->sink;
+ struct sink *sink = user_data;
struct pending_request *pending;
pending = sink->connect;
@@ -185,8 +185,8 @@ static DBusHandlerResult sink_connect(DBusConnection *conn,
pending->msg = dbus_message_ref(msg);
sink->connect = pending;
- id = a2dp_source_request_stream(sink->session, dev, FALSE,
- stream_setup_complete, pending, NULL,
+ id = a2dp_source_request_stream(sink->session, FALSE,
+ stream_setup_complete, sink,
NULL);
if (id == 0) {
pending_request_free(pending);
diff --git a/audio/unix.c b/audio/unix.c
index cee56f8a..3e8c7469 100644
--- a/audio/unix.c
+++ b/audio/unix.c
@@ -66,7 +66,6 @@ struct a2dp_data {
struct unix_client {
struct device *dev;
- struct avdtp_local_sep *sep;
struct avdtp_service_capability *media_codec;
service_type_t type;
char *interface;
@@ -77,9 +76,6 @@ struct unix_client {
int sock;
unsigned int req_id;
unsigned int cb_id;
- notify_cb_t disconnect;
- notify_cb_t suspend;
- notify_cb_t play;
gboolean (*cancel_stream) (struct device *dev, unsigned int id);
};
@@ -98,6 +94,8 @@ static void client_free(struct unix_client *client)
if (client->cb_id > 0)
avdtp_stream_remove_cb(a2dp->session, a2dp->stream,
client->cb_id);
+ if (a2dp->sep)
+ a2dp_sep_unlock(a2dp->sep, a2dp->session);
if (a2dp->session)
avdtp_unref(a2dp->session);
break;
@@ -178,7 +176,10 @@ static void stream_state_changed(struct avdtp_stream *stream,
switch (new_state) {
case AVDTP_STATE_IDLE:
- a2dp_source_unlock(client->dev, a2dp->session);
+ if (a2dp->sep) {
+ a2dp_sep_unlock(a2dp->sep, a2dp->session);
+ a2dp->sep = NULL;
+ }
client->dev = NULL;
if (a2dp->session) {
avdtp_unref(a2dp->session);
@@ -254,7 +255,7 @@ static void headset_setup_complete(struct device *dev, void *user_data)
return;
}
- headset_lock(dev, NULL);
+ headset_lock(dev);
memset(&cfg, 0, sizeof(cfg));
@@ -268,13 +269,9 @@ static void headset_setup_complete(struct device *dev, void *user_data)
fd = headset_get_sco_fd(dev);
unix_send_cfg(client->sock, &cfg, fd);
-
- client->disconnect = (notify_cb_t) headset_unlock;
- client->suspend = (notify_cb_t) headset_suspend;
- client->play = (notify_cb_t) headset_play;
}
-static void a2dp_setup_complete(struct avdtp *session, struct device *dev,
+static void a2dp_setup_complete(struct avdtp *session, struct a2dp_sep *sep,
struct avdtp_stream *stream,
void *user_data)
{
@@ -294,15 +291,12 @@ static void a2dp_setup_complete(struct avdtp *session, struct device *dev,
if (!stream)
goto failed;
- if (!a2dp_source_lock(dev, session)) {
+ if (!a2dp_sep_lock(sep, session)) {
error("Unable to lock A2DP source SEP");
goto failed;
}
- client->disconnect = (notify_cb_t) a2dp_source_unlock;
- client->suspend = (notify_cb_t) a2dp_source_suspend;
- client->play = (notify_cb_t) a2dp_source_start_stream;
-
+ a2dp->sep = sep;
a2dp->stream = stream;
if (!avdtp_stream_get_transport(stream, &fd, &cfg->pkt_len, &caps)) {
@@ -376,8 +370,10 @@ static void a2dp_setup_complete(struct avdtp *session, struct device *dev,
failed:
error("stream setup failed");
- if (a2dp->stream)
- a2dp_source_unlock(dev, session);
+ if (a2dp->sep) {
+ a2dp_sep_unlock(a2dp->sep, a2dp->session);
+ a2dp->sep = NULL;
+ }
unix_send_cfg(client->sock, NULL, -1);
avdtp_unref(a2dp->session);
@@ -407,9 +403,9 @@ static void create_stream(struct device *dev, struct unix_client *client)
/* FIXME: The provided media_codec breaks bitpool
selection. So disable it. This needs fixing */
- id = a2dp_source_request_stream(a2dp->session, dev,
+ id = a2dp_source_request_stream(a2dp->session,
TRUE, a2dp_setup_complete,
- client, &a2dp->sep,
+ client,
NULL/*client->media_codec*/);
client->cancel_stream = a2dp_source_cancel_stream;
break;
@@ -621,30 +617,29 @@ static gboolean client_cb(GIOChannel *chan, GIOCondition cond, gpointer data)
struct ipc_packet *pkt = (void *) buf;
struct unix_client *client = data;
int len, len_check;
- void *cb_data;
+ struct a2dp_data *a2dp = &client->d.a2dp;
if (cond & G_IO_NVAL)
return FALSE;
- switch (client->type) {
- case TYPE_HEADSET:
- cb_data = client->d.data;
- break;
- case TYPE_SINK:
- case TYPE_SOURCE:
- cb_data = client->d.a2dp.session;
- break;
- default:
- cb_data = NULL;
- break;
- }
-
if (cond & (G_IO_HUP | G_IO_ERR)) {
debug("Unix client disconnected (fd=%d)", client->sock);
- if (!client->dev)
- goto failed;
- if (client->disconnect)
- client->disconnect(client->dev, cb_data);
+ switch (client->type) {
+ case TYPE_HEADSET:
+ if (client->dev)
+ headset_unlock(client->dev);
+ break;
+ case TYPE_SOURCE:
+ case TYPE_SINK:
+ if (a2dp->sep) {
+ a2dp_sep_unlock(a2dp->sep, a2dp->session);
+ a2dp->sep = NULL;
+ }
+ break;
+ default:
+ break;
+ }
+
if (client->cancel_stream && client->req_id > 0)
client->cancel_stream(client->dev, client->req_id);
goto failed;