Skip to content

Commit

Permalink
[Edge] Remove re-connect from nnstreamer element
Browse files Browse the repository at this point in the history
 - Since the reconnection has been changed to be done inside the nns-edge, nns_edge_connect for reconnection is deleted from the nnstreamer element.
 - Add state change function and set `playing` prop.

Signed-off-by: gichan2-jang <[email protected]>
  • Loading branch information
gichan-jang authored and jaeyun-jung committed Nov 16, 2023
1 parent 7b96b20 commit 8d0a7d4
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 39 deletions.
56 changes: 45 additions & 11 deletions gst/edge/edge_src.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ static void gst_edgesrc_set_dest_port (GstEdgeSrc * self,
static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self);
static void gst_edgesrc_set_connect_type (GstEdgeSrc * self,
const nns_edge_connect_type_e connect_type);
static GstStateChangeReturn gst_edgesrc_change_state (GstElement *
element, GstStateChange transition);

/**
* @brief initialize the class
Expand Down Expand Up @@ -117,6 +119,7 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass)

gstbasesrc_class->start = gst_edgesrc_start;
gstbasesrc_class->create = gst_edgesrc_create;
gstelement_class->change_state = gst_edgesrc_change_state;

GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
GST_EDGE_ELEM_NAME_SRC, 0, "Edge src");
Expand All @@ -138,6 +141,7 @@ gst_edgesrc_init (GstEdgeSrc * self)
self->topic = NULL;
self->msg_queue = g_async_queue_new ();
self->connect_type = DEFAULT_CONNECT_TYPE;
self->playing = FALSE;
}

/**
Expand Down Expand Up @@ -222,6 +226,7 @@ gst_edgesrc_class_finalize (GObject * object)
GstEdgeSrc *self = GST_EDGESRC (object);
nns_edge_data_h data_h;

self->playing = FALSE;
g_free (self->dest_host);
self->dest_host = NULL;

Expand All @@ -243,6 +248,39 @@ gst_edgesrc_class_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}


/**
* @brief Change state of edgesrc.
*/
static GstStateChangeReturn
gst_edgesrc_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
GstEdgeSrc *self = GST_EDGESRC (element);

switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
GST_INFO_OBJECT (self, "State changed from PAUSED to PLAYING.");
self->playing = TRUE;
break;
default:
break;
}

ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
{
GST_INFO_OBJECT (self, "State changed from PLAYING to PAUSED.");
self->playing = FALSE;
break;
}
default:
break;
}
return ret;
}

/**
* @brief nnstreamer-edge event callback.
*/
Expand All @@ -269,13 +307,7 @@ _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
}
case NNS_EDGE_EVENT_CONNECTION_CLOSED:
{
nns_edge_disconnect (self->edge_h);
ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_data_h data_h;
nns_edge_data_create (&data_h);
g_async_queue_push (self->msg_queue, data_h);
}
self->playing = FALSE;
break;
}
default:
Expand Down Expand Up @@ -333,6 +365,7 @@ gst_edgesrc_start (GstBaseSrc * basesrc)
nns_loge ("Failed to connect to edge server!");
return FALSE;
}
self->playing = TRUE;

return TRUE;
}
Expand All @@ -345,20 +378,21 @@ gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
GstBuffer ** out_buf)
{
GstEdgeSrc *self = GST_EDGESRC (basesrc);

nns_edge_data_h data_h;
nns_edge_data_h data_h = NULL;
GstBuffer *buffer = NULL;
guint i, num_data;
int ret;

UNUSED (offset);
UNUSED (size);

data_h = g_async_queue_pop (self->msg_queue);
while (self->playing && !data_h) {
data_h = g_async_queue_timeout_pop (self->msg_queue, G_USEC_PER_SEC);
}

if (!data_h) {
nns_loge ("Failed to get message from the edgesrc message queue.");
goto done;
return GST_FLOW_ERROR;
}

ret = nns_edge_data_get_count (data_h, &num_data);
Expand Down
2 changes: 2 additions & 0 deletions gst/edge/edge_src.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ struct _GstEdgeSrc
nns_edge_connect_type_e connect_type;
nns_edge_h edge_h;
GAsyncQueue *msg_queue;

gboolean playing;
};

/**
Expand Down
30 changes: 2 additions & 28 deletions gst/nnstreamer/tensor_query/tensor_query_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,26 +359,6 @@ gst_tensor_query_client_update_caps (GstTensorQueryClient * self,
return ret;
}

/**
* @brief Retry to connect to available server.
*/
static gboolean
_client_retry_connection (GstTensorQueryClient * self)
{
if (NNS_EDGE_ERROR_NONE != nns_edge_disconnect (self->edge_h)) {
nns_loge ("Failed to retry connection, disconnection failure");
return FALSE;
}

if (NNS_EDGE_ERROR_NONE != nns_edge_connect (self->edge_h,
self->dest_host, self->dest_port)) {
nns_loge ("Failed to retry connection, connection failure");
return FALSE;
}

return TRUE;
}

/**
* @brief Parse caps from received event data.
*/
Expand Down Expand Up @@ -690,8 +670,8 @@ gst_tensor_query_client_chain (GstPad * pad,
g_free (val);

if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) {
nns_logw ("Failed to publish to server node, retry connection.");
goto retry;
nns_logi ("Failed to publish to server node.");
goto done;
}

nns_edge_data_destroy (data_h);
Expand Down Expand Up @@ -723,13 +703,7 @@ gst_tensor_query_client_chain (GstPad * pad,

res = gst_pad_push (self->srcpad, out_buf);
}
goto done;

retry:
if (!self->topic || !_client_retry_connection (self)) {
nns_loge ("Failed to retry connection");
res = GST_FLOW_ERROR;
}
done:
if (data_h) {
nns_edge_data_destroy (data_h);
Expand Down

0 comments on commit 8d0a7d4

Please sign in to comment.