
DataFlows: Overview

Michael Gehling edited this page Jun 17, 2020 · 2 revisions

DataFlows

The DataFlows facet provides a unified concept for communication that is both flexible and performant. It can be used equally within a component, between components, or remotely over different transport protocols.

Core Features

  • anything can be a message
  • n:m-connectivity for flexible routing
  • thread-safety
  • auto-disconnection
  • sync and async sending and receiving
  • high performance
  • simple to use
  • many predefined connectors and endpoints

Description

A DataFlow element that sends messages is called a source; an element that receives messages is called a sink. Connectors are combined DataFlow elements (both sink and source) that forward messages from a source to a sink. To communicate via DataFlows, a route is defined from a source to a sink with an arbitrary number of connectors in between. The first and last elements of a route are called endpoints.

DataFlow messages can be anything: an object, a struct or a primitive type. You don't have to implement any interface or inherit from a special base type. In principle, any route can transport messages of all types in parallel. That makes DataFlows extremely simple and flexible to use. Additionally, sending messages over a DataFlow route is very efficient: it costs only a little more than a direct method call.

Because every source can connect to many sinks and every sink can be connected to many sources (n:m-connectivity), multiple routes can share some of their elements. The following example shows three routes from two different senders to two different receivers. One route directly connects sender1 with receiver1, while the other two routes start from different senders and lead over the same filter connector to receiver2. The filter forwards only string messages that begin with "Route2".

var sender1 = new Sender();
var sender2 = new Sender();
var receiver1 = new QueueReceiver<string>();
var receiver2 = new QueueReceiver<string>();
var filter = new Filter<string>(msg=>msg.StartsWith("Route2"));
sender1.ConnectTo(receiver1);
sender1.ConnectTo(filter).ConnectTo(receiver2);
sender2.ConnectTo(filter);
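To make the routing behaviour above concrete, here is a minimal, self-contained model of the same wiring. It is only a sketch: `IMiniSink`, `MiniSource`, `MiniSender`, `MiniFilter<T>` and `MiniQueue<T>` are hypothetical stand-ins written for this illustration, not FeatureFlowFramework types, and the real elements are considerably more capable.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; not FeatureFlowFramework types.
public interface IMiniSink { void Post(object message); }

public class MiniSource
{
    private readonly List<IMiniSink> sinks = new List<IMiniSink>();

    // Returns the connected element so that calls can be chained through connectors.
    public T ConnectTo<T>(T sink) where T : IMiniSink
    {
        lock (sinks) { sinks.Add(sink); }
        return sink;
    }

    protected void Forward(object message)
    {
        // Copy the sink list under the lock, then deliver outside of it.
        IMiniSink[] snapshot;
        lock (sinks) { snapshot = sinks.ToArray(); }
        foreach (var sink in snapshot) sink.Post(message);
    }
}

public class MiniSender : MiniSource
{
    public void Send(object message) => Forward(message);
}

// A connector: both sink and source. Forwards only matching messages of type T.
public class MiniFilter<T> : MiniSource, IMiniSink
{
    private readonly Func<T, bool> predicate;
    public MiniFilter(Func<T, bool> predicate) { this.predicate = predicate; }

    public void Post(object message)
    {
        if (message is T typed && predicate(typed)) Forward(message);
    }
}

// An endpoint that queues messages of type T and ignores everything else.
public class MiniQueue<T> : IMiniSink
{
    public readonly Queue<T> Items = new Queue<T>();

    public void Post(object message)
    {
        if (message is T typed) lock (Items) { Items.Enqueue(typed); }
    }
}

public static class RoutingDemo
{
    public static (string[] first, string[] second) Run()
    {
        var sender1 = new MiniSender();
        var sender2 = new MiniSender();
        var receiver1 = new MiniQueue<string>();
        var receiver2 = new MiniQueue<string>();
        var filter = new MiniFilter<string>(msg => msg.StartsWith("Route2"));

        sender1.ConnectTo(receiver1);
        sender1.ConnectTo(filter).ConnectTo(receiver2);
        sender2.ConnectTo(filter);

        sender1.Send("Route1: hello");
        sender1.Send("Route2: from sender1");
        sender2.Send("Route2: from sender2");
        sender2.Send(42); // not a string, so the filter drops it

        return (receiver1.Items.ToArray(), receiver2.Items.ToArray());
    }

    public static void Main()
    {
        var (first, second) = Run();
        Console.WriteLine("receiver1: " + string.Join(" | ", first));
        Console.WriteLine("receiver2: " + string.Join(" | ", second));
    }
}
```

In this model, receiver1 sees everything sender1 sends as a string, while receiver2 sees only the "Route2" strings from either sender, because both routes pass through the shared filter.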

Routes are often predefined in code to wire up components, but they can also be created dynamically at runtime. All DataFlow elements are fully thread-safe, so elements of a route can be connected or disconnected while messages are being sent over the very same route from different threads.

Additionally, DataFlow sources provide an auto-disconnection feature, realized via the option to use a weak reference when connecting a sink. When no other references to the target are left, the DataFlow connection will not keep it alive. This reduces the risk of memory leaks and the effort needed to actively manage all connections.

sender.ConnectTo(receiver, weakReference:true);
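The general idea behind weak-reference connections can be sketched with .NET's `WeakReference<T>`. The following is an assumption about how such a feature could work, not the framework's actual implementation; `WeakAwareSource`, `IMiniSink` and `CountingSink` are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

// Hypothetical stand-ins for illustration only; not FeatureFlowFramework types.
public interface IMiniSink { void Post(object message); }

public class CountingSink : IMiniSink
{
    public int Count;
    public void Post(object message) { Count++; }
}

public class WeakAwareSource
{
    // Each entry holds either a strong or a weak reference to its sink.
    private sealed class Entry
    {
        private readonly IMiniSink strong;
        private readonly WeakReference<IMiniSink> weak;

        public Entry(IMiniSink sink, bool weakReference)
        {
            if (weakReference) weak = new WeakReference<IMiniSink>(sink);
            else strong = sink;
        }

        public bool TryGetSink(out IMiniSink sink)
        {
            if (weak == null) { sink = strong; return true; }
            return weak.TryGetTarget(out sink);
        }
    }

    private readonly List<Entry> entries = new List<Entry>();

    public int ConnectionCount { get { lock (entries) { return entries.Count; } } }

    public void ConnectTo(IMiniSink sink, bool weakReference = false)
    {
        lock (entries) { entries.Add(new Entry(sink, weakReference)); }
    }

    public void Send(object message)
    {
        var alive = new List<IMiniSink>();
        lock (entries)
        {
            // Collect live sinks and drop entries whose target has been collected.
            int i = 0;
            while (i < entries.Count)
            {
                if (entries[i].TryGetSink(out var sink)) { alive.Add(sink); i++; }
                else entries.RemoveAt(i);
            }
        }
        foreach (var sink in alive) sink.Post(message);
    }
}

public static class WeakDemo
{
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static void ConnectTemporarySink(WeakAwareSource source)
    {
        // Nothing else references this sink once the method returns.
        source.ConnectTo(new CountingSink(), weakReference: true);
    }

    public static void Main()
    {
        var source = new WeakAwareSource();
        var kept = new CountingSink();
        source.ConnectTo(kept, weakReference: true);
        ConnectTemporarySink(source);

        // Collection timing is up to the runtime, so the temporary sink
        // may or may not be gone at this point.
        GC.Collect();
        GC.WaitForPendingFinalizers();

        source.Send("hello");
        Console.WriteLine("kept sink received: " + kept.Count);
        Console.WriteLine("connections left: " + source.ConnectionCount);
        GC.KeepAlive(kept);
    }
}
```

The design trade-off is that pruning happens lazily on the next send, so a collected sink costs nothing until then, but a weakly connected sink must be referenced from somewhere else for as long as it should keep receiving messages.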

All DataFlow elements support synchronous and asynchronous programming equally, and the two styles can be mixed.
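As a rough illustration of mixed synchronous and asynchronous use, the sketch below shows a queue that can be drained with a non-blocking call or awaited with a timeout. `MiniAsyncQueue<T>` is a hypothetical class, and the method shapes are assumptions: this page names `TryReceive()` and `TryReceiveAsync()` but does not give their signatures.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for illustration only; signatures are assumed.
public class MiniAsyncQueue<T>
{
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    // The semaphore count mirrors the number of queued messages.
    private readonly SemaphoreSlim available = new SemaphoreSlim(0);

    public void Post(T message)
    {
        queue.Enqueue(message);
        available.Release();
    }

    // Synchronous, non-blocking receive.
    public bool TryReceive(out T message)
    {
        if (available.Wait(0)) return queue.TryDequeue(out message);
        message = default;
        return false;
    }

    // Asynchronous receive that waits up to the given timeout for a message.
    public async Task<(bool ok, T message)> TryReceiveAsync(TimeSpan timeout)
    {
        if (await available.WaitAsync(timeout).ConfigureAwait(false)
            && queue.TryDequeue(out var message))
        {
            return (true, message);
        }
        return (false, default);
    }
}

public static class AsyncDemo
{
    public static async Task Main()
    {
        var receiver = new MiniAsyncQueue<string>();

        // A producer posts synchronously from another thread after a short delay.
        var producer = Task.Run(async () =>
        {
            await Task.Delay(50);
            receiver.Post("from producer");
        });

        // The consumer awaits the message without blocking a thread.
        var (ok, message) = await receiver.TryReceiveAsync(TimeSpan.FromSeconds(5));
        Console.WriteLine(ok ? "received: " + message : "timed out");
        await producer;

        // The same receiver can also be polled synchronously.
        Console.WriteLine("more messages pending: " + receiver.TryReceive(out _));
    }
}
```

Here the sender side stays synchronous while the receiving side awaits, which is the kind of mixing the sentence above refers to.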

The FeatureFlowFramework already provides a large variety of predefined DataFlow elements, but it is also easy to implement custom DataFlow elements.

Predefined DataFlow Elements

Basic Endpoints

  • Sender: Usually, the starting point of a route. Send a message using Send() or SendAsync().
  • QueueReceiver<T>: The most common type of receiver. Queues all messages in the incoming order. Receive the messages by using TryReceive(), TryReceiveAsync() or ReceiveAll().
  • PriorityQueueReceiver<T>: Queues all messages in the order of priority. Receive the messages by using TryReceive(), TryReceiveAsync() or ReceiveAll().
  • LatestMessageReceiver<T>: Keeps only the latest incoming message. Receive the message by using TryReceive(), TryReceiveAsync() or ReceiveAll().
  • PriorityMessageReceiver<T>: Keeps only one incoming message with the highest priority. Receive the message by using TryReceive(), TryReceiveAsync() or ReceiveAll().
  • MessageTrigger: Is triggered by an incoming message. It can be reset manually, toggled by further messages, or reset instantly so that it only interrupts waiting.
  • ConditionalTrigger: On an incoming message, it is triggered if the trigger condition is met or reset if the reset condition is met.

Connectors

  • Forwarder: Simply forwards each incoming message to the connected sinks.
  • DeactivatableForwarder: Simply forwards incoming messages, but can be deactivated to stop forwarding. Optionally, the activation state can be based on a condition that is checked each time a message comes in.
  • ActiveForwarder: Queues incoming messages and forwards them to the connected sinks using one or more thread-pool-threads.
  • ActivePriorityForwarder<T>: Queues incoming messages in the order of priority and forwards them to the connected sinks using one or more thread-pool-threads.
  • BufferingForwarder: Forwards all incoming messages, but also keeps them in a ring-buffer, to provide the message history to each newly connected sink.
  • Filter<T>: Forwards only messages that are of the specified type and, optionally, meet a condition. If a message is filtered out, it can be received over an alternative interface.
  • DuplicateMessageSuppressor: Forwards messages, but drops any message that is judged to be a duplicate of an earlier message sent within a defined duration.
  • MessageConverter<I,O>: Converts a message from one type to another. If the message's type doesn't match the input type, it is forwarded unchanged.
  • Splitter<T>: Converts a message into multiple other messages that can be of different types. If the message's type doesn't match the input type, it is forwarded unchanged.
  • Selector<T>: Accepts an ordered list of options, each consisting of a DataFlow source with a condition. A message is forwarded only to the first option whose condition it meets. Optionally, the message can be forwarded to ALL options with a matching condition. A message that doesn't meet any condition is forwarded to an alternative exit.
  • Hub: Accepts a number of sockets, each being a sink and source. Messages sent over one socket are forwarded to all other sockets of the hub, but not looped back to the sending socket.
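The hub behaviour described in the list above (forward to every socket except the one the message came in on) can be sketched as follows. `MiniHub` and its nested `Socket` class are hypothetical and only model the no-loopback rule; in the framework, sockets are full sink and source elements.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature hub for illustration only; not the framework's Hub.
public class MiniHub
{
    private readonly List<Socket> sockets = new List<Socket>();

    public Socket CreateSocket(string name)
    {
        var socket = new Socket(this, name);
        lock (sockets) { sockets.Add(socket); }
        return socket;
    }

    private void Broadcast(Socket origin, object message)
    {
        Socket[] snapshot;
        lock (sockets) { snapshot = sockets.ToArray(); }
        foreach (var socket in snapshot)
        {
            // Skip the socket the message was sent over: no loopback.
            if (!ReferenceEquals(socket, origin)) socket.Deliver(message);
        }
    }

    public class Socket
    {
        private readonly MiniHub hub;
        public string Name { get; }
        public List<object> Received { get; } = new List<object>();

        internal Socket(MiniHub hub, string name)
        {
            this.hub = hub;
            Name = name;
        }

        // Sending over this socket hands the message to the hub.
        public void Post(object message) => hub.Broadcast(this, message);

        internal void Deliver(object message)
        {
            lock (Received) { Received.Add(message); }
        }
    }
}

public static class HubDemo
{
    public static void Main()
    {
        var hub = new MiniHub();
        var a = hub.CreateSocket("a");
        var b = hub.CreateSocket("b");
        var c = hub.CreateSocket("c");

        a.Post("hello from a");
        b.Post(123);

        foreach (var socket in new[] { a, b, c })
        {
            Console.WriteLine(socket.Name + ": " + string.Join(", ", socket.Received));
        }
    }
}
```

In this model, socket a receives only the message posted via b, socket b receives only the message posted via a, and socket c receives both.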

Test and Diagnostic

  • DataFlowProbe<T1,T2>: Creates statistics about the occurrence of specific messages, based on time slices. Can also buffer timestamps and message data in a ring buffer.
  • CountingForwarder: Forwards all messages while counting them. Can wait until a specific number of messages have been forwarded.
  • DelayingForwarder: Delays a message for a defined time before it is forwarded.

TCP Endpoints

  • TCPServerEndpoint: Sets up a TCP/IP server waiting for clients to connect. Supports TLS for server authentication and encryption. Messages forwarded to the TCPServerEndpoint are serialized and sent to the connected TCP clients. Messages received from connected TCP clients are deserialized and forwarded to the connected DataFlow sinks.
  • TCPClientEndpoint: (Re-)connects automatically to the configured TCP/IP server. Supports TLS for server authentication and encryption. Messages forwarded to the TCPClientEndpoint are serialized and sent to the connected TCP server. Messages received from the connected TCP server are deserialized and forwarded to the connected DataFlow sinks.

RPC Endpoints

  • RpcCallee: Can register multiple methods that can be called remotely via their specified name. When the RpcRequest message arrives, the corresponding method is executed and the result is sent back to the caller. Accepts RpcRequest objects and JSON-encoded RpcRequests.
  • QueuingRpcCallee: Can register multiple methods that can be called remotely via their specified name. When the RpcRequest message arrives, it is queued. Queued requests can then be executed successively in the context of the callee, and the results are sent back to their callers. Accepts RpcRequest objects and JSON-encoded RpcRequests.
  • RpcCaller: Can call any method that was registered at a remote RpcCallee. A method can either be called asynchronously to wait for a response, or it can be called synchronously, ignoring any response. Optionally, the caller can also wait for multiple responses in case the call is sent to multiple callees in parallel.
  • StringRpcCaller: Can call any method that was registered at a remote RpcCallee via a string-based API. The request message is sent in JSON-encoded form. A method can either be called asynchronously to wait for a response, or it can be called synchronously, ignoring any response. Optionally, the caller can also wait for multiple responses in case the call is sent to multiple callees in parallel.

Web Endpoints

  • HttpServerFetchProvider: A DataFlow endpoint that serializes incoming messages and keeps them in a ring buffer to provide them via the web server. Multiple messages can be fetched with an HTTP GET request using consecutive numbering. Also supports long polling.
  • HttpServerReceiver: Takes messages sent via HTTP POST request to the web server, deserializes them and forwards them to connected DataFlow sinks.
  • HttpServerRpcAdapter: Takes RPC commands in a string format (see StringRpcCaller) sent via HTTP POST request to the web server and forwards them to connected RpcCallee elements. The RPC responses are returned to the client as JSON strings in the HTTP results.