Skip to content

Commit

Permalink
add thread override option
Browse files Browse the repository at this point in the history
- allows per event override of threading. Should allow for lowest possible latency handling of certain time sensitive events (e.g. speech/streaming rt data)
- default to native default option (game thread for components)
- change namespace default option to TEXT()
  • Loading branch information
getnamo committed Sep 1, 2019
1 parent 681536f commit 3d287d7
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 28 deletions.
2 changes: 1 addition & 1 deletion SocketIOClient.uplugin
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"FileVersion": 3,
"Version": 1,
"VersionName": "1.0.18",
"VersionName": "1.0.19",
"EngineVersion" : "4.22.0",
"FriendlyName": "Socket.IO Client",
"Description": "Real-time networking library Socket.IO Client usable from blueprints and c++.",
Expand Down
16 changes: 12 additions & 4 deletions Source/SocketIOClient/Private/SocketIOClientComponent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,12 @@ void USocketIOClientComponent::BindEvent(const FString& EventName, const FString
}, Namespace);
}

void USocketIOClientComponent::BindEventToFunction(const FString& EventName, const FString& FunctionName, UObject* Target, const FString& Namespace /*= FString(TEXT("/"))*/, UObject* WorldContextObject /*= nullptr*/)
void USocketIOClientComponent::BindEventToFunction(const FString& EventName,
const FString& FunctionName,
UObject* Target,
const FString& Namespace /*= FString(TEXT("/"))*/,
ESIOThreadOverrideOption ThreadOverride /*= USE_DEFAULT*/,
UObject* WorldContextObject /*= nullptr*/)
{
if (!FunctionName.IsEmpty())
{
Expand All @@ -582,7 +587,7 @@ void USocketIOClientComponent::BindEventToFunction(const FString& EventName, con
OnNativeEvent(EventName, [&, FunctionName, Target](const FString& Event, const TSharedPtr<FJsonValue>& Message)
{
CallBPFunctionWithMessage(Target, FunctionName, Message);
}, Namespace);
}, Namespace, ThreadOverride);
}
else
{
Expand All @@ -591,9 +596,12 @@ void USocketIOClientComponent::BindEventToFunction(const FString& EventName, con
}
}

void USocketIOClientComponent::OnNativeEvent(const FString& EventName, TFunction< void(const FString&, const TSharedPtr<FJsonValue>&)> CallbackFunction, const FString& Namespace /*= FString(TEXT("/"))*/)
void USocketIOClientComponent::OnNativeEvent(const FString& EventName,
TFunction< void(const FString&, const TSharedPtr<FJsonValue>&)> CallbackFunction,
const FString& Namespace /*= FString(TEXT("/"))*/,
ESIOThreadOverrideOption ThreadOverride /*= USE_DEFAULT*/)
{
NativeClient->OnEvent(EventName, CallbackFunction, Namespace);
NativeClient->OnEvent(EventName, CallbackFunction, Namespace, ThreadOverride);
}

#if PLATFORM_WINDOWS
Expand Down
33 changes: 28 additions & 5 deletions Source/SocketIOClient/Private/SocketIONative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ void FSocketIONative::EmitRawBinary(const FString& EventName, uint8* Data, int32
PrivateClient->socket(USIOMessageConvert::StdString(Namespace))->emit(USIOMessageConvert::StdString(EventName), std::make_shared<std::string>((char*)Data, DataLength));
}

void FSocketIONative::OnEvent(const FString& EventName, TFunction< void(const FString&, const TSharedPtr<FJsonValue>&)> CallbackFunction, const FString& Namespace /*= FString(TEXT("/"))*/)
void FSocketIONative::OnEvent(const FString& EventName,
TFunction< void(const FString&, const TSharedPtr<FJsonValue>&)> CallbackFunction,
const FString& Namespace /*= FString(TEXT("/"))*/,
ESIOThreadOverrideOption CallbackThread /*= USE_DEFAULT*/)
{
//Keep track of all the bound native JsonValue functions
FSIOBoundEvent BoundEvent;
Expand All @@ -234,21 +237,41 @@ void FSocketIONative::OnEvent(const FString& EventName, TFunction< void(const FS

OnRawEvent(EventName, [&, CallbackFunction](const FString& Event, const sio::message::ptr& RawMessage) {
CallbackFunction(Event, USIOMessageConvert::ToJsonValue(RawMessage));
}, Namespace);
}, Namespace, CallbackThread);
}

void FSocketIONative::OnRawEvent(const FString& EventName, TFunction< void(const FString&, const sio::message::ptr&)> CallbackFunction, const FString& Namespace /*= FString(TEXT("/"))*/)
void FSocketIONative::OnRawEvent(const FString& EventName,
TFunction< void(const FString&, const sio::message::ptr&)> CallbackFunction,
const FString& Namespace /*= FString(TEXT("/"))*/,
ESIOThreadOverrideOption CallbackThread /*= USE_DEFAULT*/)
{
const TFunction< void(const FString&, const sio::message::ptr&)> SafeFunction = CallbackFunction; //copy the function so it remains in context

//determine thread override option
bool bCallbackThisEventOnGameThread = bCallbackOnGameThread;
switch (CallbackThread)
{
case USE_DEFAULT:
break;
case GAME_THREAD:
bCallbackThisEventOnGameThread = true;
break;
case NETWORK_THREAD:
bCallbackThisEventOnGameThread = false;
break;
default:
break;
}

PrivateClient->socket(USIOMessageConvert::StdString(Namespace))->on(
USIOMessageConvert::StdString(EventName),
sio::socket::event_listener_aux(
[&, SafeFunction](std::string const& name, sio::message::ptr const& data, bool isAck, sio::message::list &ack_resp)
[&, SafeFunction, bCallbackThisEventOnGameThread](std::string const& name, sio::message::ptr const& data, bool isAck, sio::message::list &ack_resp)
{
const FString SafeName = USIOMessageConvert::FStringFromStd(name);

if (bCallbackOnGameThread)

if (bCallbackThisEventOnGameThread)
{
FLambdaRunnable::RunShortLambdaOnGameThread([&, SafeFunction, SafeName, data]
{
Expand Down
37 changes: 21 additions & 16 deletions Source/SocketIOClient/Public/SocketIOClientComponent.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
* @param Namespace Namespace within socket.io
*/
UFUNCTION(BlueprintCallable, Category = "SocketIO Functions")
void Emit(const FString& EventName, USIOJsonValue* Message = nullptr, const FString& Namespace = FString(TEXT("/")));
void Emit(const FString& EventName, USIOJsonValue* Message = nullptr, const FString& Namespace = TEXT("/"));

/**
* Emit an event with a JsonValue message with a callback function defined by CallBackFunctionName
Expand All @@ -173,7 +173,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
USIOJsonValue* Message = nullptr,
const FString& CallbackFunctionName = FString(""),
UObject* Target = nullptr,
const FString& Namespace = FString(TEXT("/")),
const FString& Namespace = TEXT("/"),
UObject* WorldContextObject = nullptr);


Expand All @@ -191,7 +191,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
struct FLatentActionInfo LatentInfo,
USIOJsonValue*& Result,
USIOJsonValue* Message = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* Bind an event, then respond to it with 'OnEvent' multi-cast delegate
Expand All @@ -200,7 +200,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
* @param Namespace Optional namespace, defaults to default namespace
*/
UFUNCTION(BlueprintCallable, Category = "SocketIO Functions")
void BindEvent(const FString& EventName, const FString& Namespace = FString(TEXT("/")));
void BindEvent(const FString& EventName, const FString& Namespace = TEXT("/"));


/**
Expand All @@ -210,12 +210,15 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
* @param EventName Event name
* @param FunctionName The function that gets called when the event is received
* @param Target Optional, defaults to caller self. Change to delegate to another class.
* @param Namespace Optional namespace, defaults to default namespace
* @param ThreadOverride Optional override to receive event on specified thread. Note NETWORK thread is lower latency but unsafe for a lot of blueprint use. Use with CAUTION.
*/
UFUNCTION(BlueprintCallable, Category = "SocketIO Functions", meta = (WorldContext = "WorldContextObject"))
void BindEventToFunction( const FString& EventName,
const FString& FunctionName,
UObject* Target,
const FString& Namespace = FString(TEXT("/")),
const FString& Namespace = TEXT("/"),
ESIOThreadOverrideOption ThreadOverride = USE_DEFAULT,
UObject* WorldContextObject = nullptr);
//
//C++ functions
Expand Down Expand Up @@ -246,7 +249,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
const TSharedPtr<FJsonValue>& Message = nullptr,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with a Json Object message
Expand All @@ -259,7 +262,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
const TSharedPtr<FJsonObject>& ObjectMessage = nullptr,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with a string message
Expand All @@ -272,7 +275,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
const FString& StringMessage = FString(),
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with a string message
Expand All @@ -285,7 +288,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
const SIO_TEXT_TYPE StringMessage = TEXT(""),
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with a number (double) message
Expand All @@ -298,7 +301,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
double NumberMessage,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with a bool message
Expand All @@ -311,7 +314,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
bool BooleanMessage,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with a binary message
Expand All @@ -324,7 +327,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
const TArray<uint8>& BinaryMessage,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with an array message
Expand All @@ -337,7 +340,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
void EmitNative(const FString& EventName,
const TArray<TSharedPtr<FJsonValue>>& ArrayMessage,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* (Overloaded) Emit an event with an UStruct message
Expand All @@ -352,18 +355,20 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
UStruct* Struct,
const void* StructPtr,
TFunction< void(const TArray<TSharedPtr<FJsonValue>>&)> CallbackFunction = nullptr,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));

/**
* Call function callback on receiving socket event. C++ only.
*
* @param EventName Event name
* @param TFunction Lambda callback, JSONValue
* @param Namespace Optional namespace, defaults to default namespace
* @param ThreadOverride Optional override to receive event on specified thread. Note NETWORK thread is lower latency but unsafe for a lot of blueprint use. Use with CAUTION.
*/
void OnNativeEvent( const FString& EventName,
TFunction< void(const FString&, const TSharedPtr<FJsonValue>&)> CallbackFunction,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"),
ESIOThreadOverrideOption ThreadOverride = USE_DEFAULT);

/**
* Call function callback on receiving binary event. C++ only.
Expand All @@ -374,7 +379,7 @@ class SOCKETIOCLIENT_API USocketIOClientComponent : public UActorComponent
*/
void OnBinaryEvent( const FString& EventName,
TFunction< void(const FString&, const TArray<uint8>&)> CallbackFunction,
const FString& Namespace = FString(TEXT("/")));
const FString& Namespace = TEXT("/"));


/** Called by SocketIOFunctionLibrary to initialize statically constructed components. */
Expand Down
16 changes: 14 additions & 2 deletions Source/SocketIOClient/Public/SocketIONative.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ enum ESIOConnectionCloseReason
CLOSE_REASON_DROP
};

UENUM(BlueprintType)
enum ESIOThreadOverrideOption
{
USE_DEFAULT,
GAME_THREAD,
NETWORK_THREAD
};

//Wrapper function for TFunctions which can be hashed based on pointers. I.e. no duplicate functions allowed
//NB: Not currently used
template <typename T>
Expand Down Expand Up @@ -310,23 +318,27 @@ class SOCKETIOCLIENT_API FSocketIONative
* @param EventName Event name
* @param TFunction Lambda callback, JSONValue
* @param Namespace Optional namespace, defaults to default namespace
* @param CallbackThread Override default bCallbackOnGameThread option to specified option for this event
*/
void OnEvent(
const FString& EventName,
TFunction< void(const FString&, const TSharedPtr<FJsonValue>&)> CallbackFunction,
const FString& Namespace = TEXT("/"));
const FString& Namespace = TEXT("/"),
ESIOThreadOverrideOption CallbackThread = USE_DEFAULT);

/**
* Call function callback on receiving raw event. C++ only.
*
* @param EventName Event name
* @param TFunction Lambda callback, raw flavor
* @param Namespace Optional namespace, defaults to default namespace
* @param CallbackThread Override default bCallbackOnGameThread option to specified option for this event
*/
void OnRawEvent(
const FString& EventName,
TFunction< void(const FString&, const sio::message::ptr&)> CallbackFunction,
const FString& Namespace = TEXT("/"));
const FString& Namespace = TEXT("/"),
ESIOThreadOverrideOption CallbackThread = USE_DEFAULT);
/**
* Call function callback on receiving binary event. C++ only.
*
Expand Down

0 comments on commit 3d287d7

Please sign in to comment.