Skip to content

Commit

Permalink
[Filter] Add suspend mode.
Browse files Browse the repository at this point in the history
When there is no input within suspend timeout, unload NN framework.
Reload the framework when the input comes again.

Signed-off-by: Gichan Jang <[email protected]>
  • Loading branch information
gichan-jang authored and myungjoo committed Nov 5, 2024
1 parent 5d8d198 commit 639921e
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 8 deletions.
2 changes: 2 additions & 0 deletions gst/nnstreamer/include/nnstreamer_plugin_api_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ typedef struct _GstTensorFilterProperties
int latency; /**< The average latency over the recent 10 inferences in microseconds */
int throughput; /**< The average throughput in the number of outputs per second */
int invoke_dynamic; /**< True for supporting invoke with flexible output. */

uint32_t suspend; /**< Timeout (ms) for suspend. (Unload the framework) */
} GstTensorFilterProperties;

/**
Expand Down
97 changes: 97 additions & 0 deletions gst/nnstreamer/tensor_filter/tensor_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,51 @@ _gst_tensor_filter_transform_update_outbuf (GstBaseTransform * trans,
}
}

/*
* Called when there is no input within suspend time specified by the user.
*/
static gboolean
gst_tensor_filter_watchdog_trigger (gpointer ptr)
{
GstTensorFilter *self = (GstTensorFilter *) ptr;
GstTensorFilterPrivate *priv = &self->priv;

ml_logd ("Suspend watchdog triggered. Unload the NN framework.");
gst_tensor_filter_common_unload_fw (priv);

return FALSE;
}

/*
* Release watchdog source. Call with OBJECT_LOCK.
*/
static void
gst_tensor_filter_watchdog_release (GstTensorFilter *self)
{
GstTensorFilterPrivate *priv = &self->priv;
if (priv->source) {
g_source_destroy (priv->source);
g_source_unref (priv->source);
priv->source = NULL;
}
}

/*
* Set watchdog timeout. Call with OBJECT_LOCK.
*/
static void
gst_tensor_filter_watchdog_feed (GstTensorFilter *self)
{
GstTensorFilterPrivate *priv = &self->priv;

if (priv->main_context) {
priv->source = g_timeout_source_new (priv->prop.suspend);
g_source_set_callback (priv->source, gst_tensor_filter_watchdog_trigger,
self, NULL);
g_source_attach (priv->source, priv->main_context);
}
}

/**
* @brief non-ip transform. required vmethod of GstBaseTransform.
*/
Expand All @@ -1085,6 +1130,14 @@ gst_tensor_filter_transform (GstBaseTransform * trans,
FilterTransformData *out_trans_data = NULL;
GstTensorMemory *invoke_tensors = NULL;

/** Reset suspend timeout */
if (priv->prop.suspend != 0) {
GST_OBJECT_LOCK (self);
gst_tensor_filter_watchdog_release (self);
GST_OBJECT_UNLOCK (self);
gst_tensor_filter_common_open_fw (priv);
}

/* 0. Check all properties. */
retval = _gst_tensor_filter_transform_validate (trans, inbuf, outbuf);
if (retval != GST_FLOW_OK)
Expand Down Expand Up @@ -1151,6 +1204,13 @@ gst_tensor_filter_transform (GstBaseTransform * trans,
}

done:
/** Set suspend timeout */
if (retval == GST_FLOW_OK && priv->prop.suspend != 0) {
GST_OBJECT_LOCK (self);
gst_tensor_filter_watchdog_feed (self);
GST_OBJECT_UNLOCK (self);
}

g_free (in_trans_data);
g_free (out_trans_data);
g_free (invoke_tensors);
Expand Down Expand Up @@ -1754,6 +1814,17 @@ gst_tensor_filter_src_event (GstBaseTransform * trans, GstEvent * event)
return GST_BASE_TRANSFORM_CLASS (parent_class)->src_event (trans, event);
}

static gpointer
_gst_tensor_filter_watchdog_thread (gpointer user_data)
{
GstTensorFilter *self = (GstTensorFilter *) user_data;
GstTensorFilterPrivate *priv = &self->priv;

g_main_loop_run (priv->main_loop);

return NULL;
}

/**
* @brief Called when the element starts processing. optional vmethod of BaseTransform
* @param trans "this" pointer
Expand All @@ -1771,6 +1842,15 @@ gst_tensor_filter_start (GstBaseTransform * trans)
return FALSE;
gst_tensor_filter_common_open_fw (priv);

if (priv->prop.suspend != 0) {
GST_OBJECT_LOCK (self);
priv->main_context = g_main_context_new ();
priv->main_loop = g_main_loop_new (priv->main_context, FALSE);
priv->thread = g_thread_new ("suspend_watchdog", _gst_tensor_filter_watchdog_thread, self);
gst_tensor_filter_watchdog_feed (self);
GST_OBJECT_UNLOCK (self);
}

return priv->prop.fw_opened;
}

Expand All @@ -1786,6 +1866,23 @@ gst_tensor_filter_stop (GstBaseTransform * trans)
GstTensorFilterPrivate *priv;
self = GST_TENSOR_FILTER_CAST (trans);
priv = &self->priv;

if (priv->prop.suspend != 0) {
GST_OBJECT_LOCK (self);
gst_tensor_filter_watchdog_release (self);

g_main_loop_quit (priv->main_loop);
g_thread_join (priv->thread);
priv->thread = NULL;

g_main_loop_unref (priv->main_loop);
priv->main_loop = NULL;

g_main_context_unref (priv->main_context);
priv->main_context = NULL;
GST_OBJECT_UNLOCK (self);
}
gst_tensor_filter_common_close_fw (priv);

return TRUE;
}
51 changes: 44 additions & 7 deletions gst/nnstreamer/tensor_filter/tensor_filter_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,14 @@ gst_tensor_filter_install_properties (GObjectClass * gobject_class)
g_param_spec_string ("config-file", "Configuration-file",
"Path to configuration file which contains plugins properties", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SUSPEND,
g_param_spec_uint ("suspend",
"Timeout for unloading the framework",
"Unload the framework if there is no new data within the timeout (ms). "
"When the data arrives, load the framework and run the timer again. "
"The state of the pipeline does not change. "
"Default 0 means no suspend.", 0, G_MAXUINT32, 0,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}

/**
Expand Down Expand Up @@ -1935,6 +1943,18 @@ _gtfc_setprop_PROP_INVOKE_DYNAMIC (GstTensorFilterPrivate * priv,
return 0;
}

/**
* @brief Handle "PROP_SUSPEND" for set-property
*/
static gint
_gtfc_setprop_SUSPEND (GstTensorFilterPrivate * priv,
const GValue * value)
{
priv->prop.suspend = g_value_get_uint (value);

return 0;
}

/**
* @brief Set the properties for tensor_filter
* @param[in] priv Struct containing the properties of the object
Expand Down Expand Up @@ -2021,6 +2041,9 @@ gst_tensor_filter_common_set_property (GstTensorFilterPrivate * priv,
case PROP_INVOKE_DYNAMIC:
status = _gtfc_setprop_PROP_INVOKE_DYNAMIC (priv, value);
break;
case PROP_SUSPEND:
status = _gtfc_setprop_SUSPEND (priv, value);
break;
default:
return FALSE;
}
Expand Down Expand Up @@ -2229,6 +2252,9 @@ gst_tensor_filter_common_get_property (GstTensorFilterPrivate * priv,
case PROP_INVOKE_DYNAMIC:
g_value_set_boolean (value, prop->invoke_dynamic);
break;
case PROP_SUSPEND:
g_value_set_uint (value, prop->suspend);
break;
default:
/* unknown property */
return FALSE;
Expand Down Expand Up @@ -2468,6 +2494,7 @@ gst_tensor_filter_common_open_fw (GstTensorFilterPrivate * priv)

if (!priv->prop.fw_opened && priv->fw) {
gint64 start_time, end_time;

start_time = g_get_monotonic_time ();
if (priv->fw->open) {
/* at least one model should be configured before opening fw */
Expand Down Expand Up @@ -2512,25 +2539,35 @@ gst_tensor_filter_common_open_fw (GstTensorFilterPrivate * priv)
}

/**
* @brief Close NN framework.
* @brief Unload NN framework.
*/
void
gst_tensor_filter_common_close_fw (GstTensorFilterPrivate * priv)
gst_tensor_filter_common_unload_fw (GstTensorFilterPrivate * priv)
{
if (priv->prop.fw_opened) {
if (priv->fw && priv->fw->close) {
priv->fw->close (&priv->prop, &priv->privateData);
}
priv->prop.input_configured = priv->prop.output_configured = FALSE;
priv->prop.fw_opened = FALSE;
g_free_const (priv->prop.fwname);
priv->prop.fwname = NULL;
priv->fw = NULL;
priv->privateData = NULL;
priv->configured = FALSE;
}
}

/**
* @brief Close NN framework.
*/
void
gst_tensor_filter_common_close_fw (GstTensorFilterPrivate * priv)
{
gst_tensor_filter_common_unload_fw (priv);

priv->prop.input_configured = priv->prop.output_configured = FALSE;
g_free_const (priv->prop.fwname);
priv->prop.fwname = NULL;
priv->fw = NULL;
priv->configured = FALSE;
}

/**
* @brief return accl_hw type from string
* @param key The key string value
Expand Down
12 changes: 11 additions & 1 deletion gst/nnstreamer/tensor_filter/tensor_filter_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ enum
PROP_SHARED_TENSOR_FILTER_KEY,
PROP_LATENCY_REPORT,
PROP_INVOKE_DYNAMIC,
PROP_CONFIG
PROP_CONFIG,
PROP_SUSPEND
};

/**
Expand Down Expand Up @@ -169,6 +170,10 @@ typedef struct _GstTensorFilterPrivate
gint64 latency_reported; /**< latency value reported (ns) in last LATENCY query */

GstTensorFilterCombination combi;
GMainContext *main_context;
GMainLoop *main_loop;
GThread *thread;
GSource *source;
} GstTensorFilterPrivate;

/**
Expand Down Expand Up @@ -273,6 +278,11 @@ gst_tensor_filter_load_tensor_info (GstTensorFilterPrivate * priv);
*/
extern void gst_tensor_filter_common_open_fw (GstTensorFilterPrivate * priv);

/**
* @brief Unload NN framework.
*/
extern void gst_tensor_filter_common_unload_fw (GstTensorFilterPrivate * priv);

/**
* @brief Close NN framework.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#include <gtest/gtest.h>
#include <glib.h>
#include <gst/app/gstappsrc.h>
#include <gst/gst.h>

#include <nnstreamer_util.h>
Expand Down Expand Up @@ -367,6 +368,50 @@ TEST (nnstreamerFilterTensorFlow2Lite, manyInOutModel)
g_free (model_file);
}

/**
* @brief Test for suspend mode.
*/
TEST (nnstreamerFilterTensorFlow2Lite, suspend)
{
gchar *pipeline;
GstElement *gstpipe;
GError *err = NULL;
gchar *model_file;

ASSERT_TRUE (_GetModelFilePath (&model_file, 0));

/* create a nnstreamer pipeline */
pipeline = g_strdup_printf ("appsrc name=srcx ! application/octet-stream ! tensor_converter input-dim=3:224:224 input-type=uint8 ! tensor_filter suspend=2000 framework=tensorflow2-lite model=\"%s\" ! tensor_sink name=sink async=false",
model_file);

gstpipe = gst_parse_launch (pipeline, &err);
ASSERT_TRUE (gstpipe != nullptr);

GstElement *src_handle = gst_bin_get_by_name (GST_BIN (gstpipe), "srcx");
ASSERT_TRUE (src_handle != nullptr);
EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);

GstBuffer *buf = gst_buffer_new ();
GstMemory *mem = gst_allocator_alloc (NULL, 3 * 224 * 224, NULL);
gst_buffer_append_memory (buf, mem);

buf = gst_buffer_ref (buf);
EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (src_handle), buf), GST_FLOW_OK);

/** Wait for unloading the framework. */
g_usleep (5000000);

EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (src_handle), buf), GST_FLOW_OK);
g_usleep (1000000);

EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);

gst_object_unref (src_handle);
gst_object_unref (gstpipe);
g_free (pipeline);
g_free (model_file);
}

/**
* @brief Main gtest
*/
Expand Down

0 comments on commit 639921e

Please sign in to comment.