diff --git a/gst/edge/edge_src.c b/gst/edge/edge_src.c index 2ed2b4b5c6..24c9e0c45a 100644 --- a/gst/edge/edge_src.c +++ b/gst/edge/edge_src.c @@ -65,6 +65,8 @@ static void gst_edgesrc_set_dest_port (GstEdgeSrc * self, static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self); static void gst_edgesrc_set_connect_type (GstEdgeSrc * self, const nns_edge_connect_type_e connect_type); +static GstStateChangeReturn gst_edgesrc_change_state (GstElement * + element, GstStateChange transition); /** * @brief initialize the class @@ -117,6 +119,7 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass) gstbasesrc_class->start = gst_edgesrc_start; gstbasesrc_class->create = gst_edgesrc_create; + gstelement_class->change_state = gst_edgesrc_change_state; GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_EDGE_ELEM_NAME_SRC, 0, "Edge src"); @@ -138,6 +141,7 @@ gst_edgesrc_init (GstEdgeSrc * self) self->topic = NULL; self->msg_queue = g_async_queue_new (); self->connect_type = DEFAULT_CONNECT_TYPE; + self->playing = FALSE; } /** @@ -222,6 +226,7 @@ gst_edgesrc_class_finalize (GObject * object) GstEdgeSrc *self = GST_EDGESRC (object); nns_edge_data_h data_h; + self->playing = FALSE; g_free (self->dest_host); self->dest_host = NULL; @@ -243,6 +248,39 @@ gst_edgesrc_class_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } + +/** + * @brief Change state of edgesrc. + */ +static GstStateChangeReturn +gst_edgesrc_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + GstEdgeSrc *self = GST_EDGESRC (element); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + GST_INFO_OBJECT (self, "State changed from PAUSED to PLAYING."); + self->playing = TRUE; + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + { + GST_INFO_OBJECT (self, "State changed from PLAYING to PAUSED."); + self->playing = FALSE; + break; + } + default: + break; + } + return ret; +} + /** * @brief nnstreamer-edge event callback. */ @@ -269,13 +307,7 @@ _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data) } case NNS_EDGE_EVENT_CONNECTION_CLOSED: { - nns_edge_disconnect (self->edge_h); - ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port); - if (NNS_EDGE_ERROR_NONE != ret) { - nns_edge_data_h data_h; - nns_edge_data_create (&data_h); - g_async_queue_push (self->msg_queue, data_h); - } + self->playing = FALSE; break; } default: @@ -333,6 +365,7 @@ gst_edgesrc_start (GstBaseSrc * basesrc) nns_loge ("Failed to connect to edge server!"); return FALSE; } + self->playing = TRUE; return TRUE; } @@ -345,8 +378,7 @@ gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size, GstBuffer ** out_buf) { GstEdgeSrc *self = GST_EDGESRC (basesrc); - - nns_edge_data_h data_h; + nns_edge_data_h data_h = NULL; GstBuffer *buffer = NULL; guint i, num_data; int ret; @@ -354,11 +386,13 @@ gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size, UNUSED (offset); UNUSED (size); - data_h = g_async_queue_pop (self->msg_queue); + while (self->playing && !data_h) { + data_h = g_async_queue_timeout_pop (self->msg_queue, G_USEC_PER_SEC); + } if (!data_h) { nns_loge ("Failed to get message from the edgesrc message queue."); - goto done; + return GST_FLOW_ERROR; } ret = nns_edge_data_get_count (data_h, &num_data); diff --git a/gst/edge/edge_src.h b/gst/edge/edge_src.h index 8ccfce4be7..40671bcd4d 100644 --- a/gst/edge/edge_src.h +++ b/gst/edge/edge_src.h @@ -49,6 +49,8 @@ struct _GstEdgeSrc nns_edge_connect_type_e connect_type; nns_edge_h edge_h; GAsyncQueue *msg_queue; + + gboolean playing; }; /** diff --git a/gst/nnstreamer/tensor_query/tensor_query_client.c b/gst/nnstreamer/tensor_query/tensor_query_client.c index b493d33a01..46bf7c5d67 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_client.c +++ b/gst/nnstreamer/tensor_query/tensor_query_client.c @@ -359,26 +359,6 @@ gst_tensor_query_client_update_caps (GstTensorQueryClient * self, return ret; } -/** - * @brief Retry to connect to available server. - */ -static gboolean -_client_retry_connection (GstTensorQueryClient * self) -{ - if (NNS_EDGE_ERROR_NONE != nns_edge_disconnect (self->edge_h)) { - nns_loge ("Failed to retry connection, disconnection failure"); - return FALSE; - } - - if (NNS_EDGE_ERROR_NONE != nns_edge_connect (self->edge_h, - self->dest_host, self->dest_port)) { - nns_loge ("Failed to retry connection, connection failure"); - return FALSE; - } - - return TRUE; -} - /** * @brief Parse caps from received event data. */ @@ -690,8 +670,8 @@ gst_tensor_query_client_chain (GstPad * pad, g_free (val); if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) { - nns_logw ("Failed to publish to server node, retry connection."); - goto retry; + nns_logi ("Failed to publish to server node."); + goto done; } nns_edge_data_destroy (data_h); @@ -723,13 +703,7 @@ gst_tensor_query_client_chain (GstPad * pad, res = gst_pad_push (self->srcpad, out_buf); } - goto done; -retry: - if (!self->topic || !_client_retry_connection (self)) { - nns_loge ("Failed to retry connection"); - res = GST_FLOW_ERROR; - } done: if (data_h) { nns_edge_data_destroy (data_h);