diff --git a/samples/basic-chat/README.md b/samples/basic-chat/README.md new file mode 100644 index 000000000..35348b0fb --- /dev/null +++ b/samples/basic-chat/README.md @@ -0,0 +1,40 @@ +# Ballerina WebSockets Test Examples + +## Overview + +This is a set of Ballerina WebSocket test examples. The examples demonstrates how to use the AsyncAPI tools to generate the specification from a Ballerina WebSocket service and how to use the generated specification to create a Ballerina WebSocket client. Moreover, the examples demonstrate how to use the generated WebSockets client to write a simple client application. + +## Generating the AsyncAPI specification + +You can generate a demo AsyncAPI specification using the `server-demo` package. A pregenerated specification already exists in the `specs` directory. If you want to generate the specification again, you can use the following command, where `main.bal` is the Ballerina file that contains the entry point to the WebSocket service. + +```bash +bal asyncapi --protocol ws -i main.bal -o specs +``` + +## A client-server example + +The `client` package and the `server` package are a simple example of a client-server application. The client sends a message to the server, which responds with a message, and the client logs the response. The client continuously listens for messages from the server. + +You can have two instances of the client to pass messages between them through the server. + +In the `client` package, the files `client.bal`, `types.bal`, and `utils.bal` are generated by the AsyncAPI tool, whereas the `main.bal` file is an example usage of the generated client. + +In the `server` package, the `messages.md` file contains the message examples that the server expects. The `main.bal` file is the WebSocket server implementation. + +You can regenerate the client using the following command: + +```bash +bal asyncapi --protocol ws -i asyncapi.yaml +``` + +You can run the server and client using the following commands: + +```bash +bal run server +bal run client +``` + +## A client-server example with a dispatcher stream id + +The same application can be run with a dispatcher stream id using the `client-with-dispatcherStreamId` package and the `server-with-dispatcherStreamId` package. \ No newline at end of file diff --git a/samples/basic-chat/client-with-dispatcherStreamId/Ballerina.toml b/samples/basic-chat/client-with-dispatcherStreamId/Ballerina.toml new file mode 100644 index 000000000..529d692b8 --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/Ballerina.toml @@ -0,0 +1,8 @@ +[package] +org = "wso2" +name = "test_websocket" +version = "0.1.0" +distribution = "2201.9.0" + +[build-options] +observabilityIncluded = true diff --git a/samples/basic-chat/client-with-dispatcherStreamId/Dependencies.toml b/samples/basic-chat/client-with-dispatcherStreamId/Dependencies.toml new file mode 100644 index 000000000..30980c398 --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/Dependencies.toml @@ -0,0 +1,348 @@ +# AUTO-GENERATED FILE. DO NOT MODIFY. + +# This file is auto-generated by Ballerina for managing dependency versions. +# It should not be modified by hand. + +[ballerina] +dependencies-toml-version = "2" +distribution-version = "2201.9.0" + +[[package]] +org = "ballerina" +name = "auth" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"} +] + +[[package]] +org = "ballerina" +name = "cache" +version = "3.8.0" +dependencies = [ + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "task"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "constraint" +version = "1.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "crypto" +version = "2.7.2" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "file" +version = "1.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "http" +version = "2.11.2" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "file"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.decimal"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.regexp"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "mime"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "observe"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + +[[package]] +org = "ballerina" +name = "jballerina.java" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "jwt" +version = "2.12.1" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "lang.__internal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.array" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"} +] + +[[package]] +org = "ballerina" +name = "lang.decimal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.int" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.object" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "lang.regexp" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.string" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.regexp"} +] + +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "log" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerina", packageName = "log", moduleName = "log"} +] + +[[package]] +org = "ballerina" +name = "mime" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"} +] + +[[package]] +org = "ballerina" +name = "oauth2" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "observe" +version = "1.2.3" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "os" +version = "1.8.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "task" +version = "2.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "time" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "url" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "uuid" +version = "1.8.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "uuid", moduleName = "uuid"} +] + +[[package]] +org = "ballerina" +name = "websocket" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "http"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "websocket", moduleName = "websocket"} +] + +[[package]] +org = "ballerinai" +name = "observe" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerinai", packageName = "observe", moduleName = "observe"} +] + +[[package]] +org = "haritha" +name = "test_websocket" +version = "0.1.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "uuid"}, + {org = "ballerina", name = "websocket"}, + {org = "ballerinai", name = "observe"}, + {org = "xlibb", name = "pipe"} +] +modules = [ + {org = "haritha", packageName = "test_websocket", moduleName = "test_websocket"} +] + +[[package]] +org = "xlibb" +name = "pipe" +version = "1.4.1" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] +modules = [ + {org = "xlibb", packageName = "pipe", moduleName = "pipe"} +] + diff --git a/samples/basic-chat/client-with-dispatcherStreamId/asyncapi.yaml b/samples/basic-chat/client-with-dispatcherStreamId/asyncapi.yaml new file mode 100644 index 000000000..697a5b914 --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/asyncapi.yaml @@ -0,0 +1,124 @@ +asyncapi: 2.5.0 +info: + title: User + version: 0.1.0 +servers: + development: + url: "{server}:{port}/user" + protocol: ws + protocolVersion: "13" + variables: + server: + default: ws://localhost + port: + default: "9092" +channels: + /: + description: Allows clients to get real-time data on users and chat with them. + subscribe: + message: + $ref: '#/components/messages/Response' + publish: + message: + oneOf: + - $ref: '#/components/messages/Subscribe' + - $ref: '#/components/messages/Unsubscribe' + - $ref: '#/components/messages/Chat' +components: + schemas: + Subscribe: + type: object + required: + - event + - name + - gender + - id + properties: + event: + type: string + const: Subscribe + description: type of event + name: + type: string + description: name of the user + gender: + type: string + description: gender of the user + id: + type: string + description: dispathcherStreamId of the message + description: Representation of a subscription. + Response: + type: object + required: + - event + - message + - id + properties: + event: + type: string + description: dispatcher key + message: + type: string + description: message to be sent + id: + type: string + description: dispathcherStreamId of the message + description: Representation of a response + Unsubscribe: + type: object + required: + - event + - id + properties: + event: + type: string + const: Unsubscribe + description: dispatcher key + id: + type: string + description: dispathcherStreamId of the message + description: Representation of an unsubscribe message. + Chat: + type: object + required: + - message + - event + - toUserId + - id + properties: + message: + type: string + description: 'message to be sent ' + event: + type: string + const: Chat + description: dispatcher key + toUserId: + type: string + description: user id to send the message + id: + type: string + description: dispathcherStreamId of the message + description: Repersentation of a message. + messages: + Response: + payload: + $ref: '#/components/schemas/Response' + Subscribe: + payload: + $ref: '#/components/schemas/Subscribe' + x-response: + $ref: '#/components/messages/Response' + x-response-type: server-streaming + Unsubscribe: + payload: + $ref: '#/components/schemas/Unsubscribe' + Chat: + payload: + $ref: '#/components/schemas/Chat' + x-response: + $ref: '#/components/messages/Response' + x-response-type: simple-rpc +x-dispatcherKey: event +x-dispatcherStreamId: id \ No newline at end of file diff --git a/samples/basic-chat/client-with-dispatcherStreamId/client.bal b/samples/basic-chat/client-with-dispatcherStreamId/client.bal new file mode 100644 index 000000000..02a573823 --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/client.bal @@ -0,0 +1,184 @@ +import ballerina/log; +import ballerina/websocket; + +import xlibb/pipe; + +public client isolated class UserClient { + private final websocket:Client clientEp; + private final pipe:Pipe writeMessageQueue; + private final PipesMap pipes; + private final StreamGeneratorsMap streamGenerators; + private boolean isActive; + + # Gets invoked to initialize the `connector`. + # + # + config - The configurations to be used when initializing the `connector` + # + serviceUrl - URL of the target service + # + return - An error if connector initialization failed + public isolated function init(websocket:ClientConfiguration clientConfig = {}, string serviceUrl = "ws://localhost:9092/user") returns error? { + self.pipes = new (); + self.streamGenerators = new (); + self.writeMessageQueue = new (1000); + websocket:Client websocketEp = check new (serviceUrl, clientConfig); + self.clientEp = websocketEp; + self.isActive = true; + self.startMessageWriting(); + self.startMessageReading(); + return; + } + + # Used to write messages to the websocket. + # + private isolated function startMessageWriting() { + worker writeMessage { + while true { + lock { + if !self.isActive { + break; + } + } + Message|pipe:Error message = self.writeMessageQueue.consume(5); + if message is pipe:Error { + if message.message() == "Operation has timed out" { + continue; + } + log:printError("PipeError: Failed to consume message from the pipe", message); + self.attemptToCloseConnection(); + return; + } + websocket:Error? wsErr = self.clientEp->writeMessage(message); + if wsErr is websocket:Error { + log:printError("WsError: Failed to write message to the client", wsErr); + self.attemptToCloseConnection(); + return; + } + } + } + } + + # Used to read messages from the websocket. + # + private isolated function startMessageReading() { + worker readMessage { + while true { + lock { + if !self.isActive { + break; + } + } + Message|websocket:Error message = self.clientEp->readMessage(Message); + if message is websocket:Error { + log:printError("WsError: Failed to read message from the client", message); + self.attemptToCloseConnection(); + return; + } + pipe:Pipe pipe; + MessageWithId|error messageWithId = message.cloneWithType(MessageWithId); + if messageWithId is MessageWithId { + pipe = self.pipes.getPipe(messageWithId.id); + } else { + pipe = self.pipes.getPipe(message.event); + } + pipe:Error? pipeErr = pipe.produce(message, 5); + if pipeErr is pipe:Error { + log:printError("PipeError: Failed to produce message to the pipe", pipeErr); + self.attemptToCloseConnection(); + return; + } + } + } + } + + remote isolated function doSubscribe(Subscribe subscribe, decimal timeout) returns stream|error { + lock { + if !self.isActive { + return error("ConnectionError: Connection has been closed"); + } + } + Message|error message = subscribe.cloneWithType(); + if message is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", message); + } + pipe:Error? pipeErr = self.writeMessageQueue.produce(message, timeout); + if pipeErr is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in producing message", pipeErr); + } + stream streamMessages; + lock { + ResponseStreamGenerator streamGenerator = new (self.pipes, subscribe.id, timeout); + self.streamGenerators.addStreamGenerator(streamGenerator); + streamMessages = new (streamGenerator); + } + return streamMessages; + } + + remote isolated function doUnsubscribe(Unsubscribe unsubscribe, decimal timeout) returns error? { + lock { + if !self.isActive { + return error("ConnectionError: Connection has been closed"); + } + } + Message|error message = unsubscribe.cloneWithType(); + if message is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", message); + } + pipe:Error? pipeErr = self.writeMessageQueue.produce(message, timeout); + if pipeErr is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in producing message", pipeErr); + } + } + + remote isolated function doChat(Chat chat, decimal timeout) returns Response|error { + lock { + if !self.isActive { + return error("ConnectionError: Connection has been closed"); + } + } + Message|error message = chat.cloneWithType(); + if message is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", message); + } + pipe:Error? pipeErr = self.writeMessageQueue.produce(message, timeout); + if pipeErr is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in producing message", pipeErr); + } + Message|pipe:Error responseMessage = self.pipes.getPipe(chat.id).consume(timeout); + if responseMessage is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in consuming message", responseMessage); + } + error? pipeCloseError = self.pipes.removePipe(chat.id); + if pipeCloseError is error { + log:printDebug("PipeError: Error in closing pipe.", pipeCloseError); + } + Response|error response = responseMessage.cloneWithType(); + if response is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", response); + } + return response; + } + + isolated function attemptToCloseConnection() { + error? connectionClose = self->connectionClose(); + if connectionClose is error { + log:printError("ConnectionError", connectionClose); + } + } + + remote isolated function connectionClose() returns error? { + lock { + self.isActive = false; + check self.writeMessageQueue.immediateClose(); + check self.pipes.removePipes(); + check self.streamGenerators.removeStreamGenerators(); + check self.clientEp->close(); + } + }; +} diff --git a/samples/basic-chat/client-with-dispatcherStreamId/main.bal b/samples/basic-chat/client-with-dispatcherStreamId/main.bal new file mode 100644 index 000000000..bdf5ad6ad --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/main.bal @@ -0,0 +1,43 @@ +import ballerina/io; +import ballerina/uuid; + +UserClient chatClient = check new (); + +public function main() returns error? { + worker subscribe returns error? { + io:println("Subscribing to the chat service"); + stream subscription = check chatClient->doSubscribe({"event":"subscribe","name":"Ballerina","gender":"Female", "id":uuid:createType1AsString()}, 10); + // the server will send two responses to the subscription request immediately + printSingleResponse(subscription); + printSingleResponse(subscription); + //to notify the subscription is done + true ->> function; + while true { + printSingleResponse(subscription); + } + } + boolean waitForSubscribe = check <- subscribe; + if waitForSubscribe is true { + io:print("Enter your message: "); + string message = io:readln(); + io:print("Enter to whom you want to send the message: "); + string toUser = io:readln(); + Response|error response = chatClient->doChat({"event":"chat","message":message, "toUserId": toUser, "id":uuid:createType1AsString()}, 10); + if response is error { + io:println("Error occurred: " + response.message()); + } + io:println("RESPONSE: ", response); + } + check wait subscribe; +} + +function printSingleResponse(stream subscription) { + record {|Response value;|}|error? message = subscription.next(); + if message !is error? { + io:println(message.value); + } else if message is error { + io:println("Error occurred at worker: " + message.message()); + } else { + io:println("NILL"); + } +} diff --git a/samples/basic-chat/client-with-dispatcherStreamId/types.bal b/samples/basic-chat/client-with-dispatcherStreamId/types.bal new file mode 100644 index 000000000..e42962e7e --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/types.bal @@ -0,0 +1,45 @@ +public type Message readonly & record {string event;}; + +public type MessageWithId readonly & record {string event; string id;}; + +# Representation of a subscription. +public type Subscribe record { + # type of event + string event; + # name of the user + string name; + # gender of the user + string gender; + # dispathcherStreamId of the message + string id; +}; + +# Representation of a response +public type Response record { + # dispatcher key + string event; + # message to be sent + string message; + # dispathcherStreamId of the message + string id; +}; + +# Representation of an unsubscribe message. +public type Unsubscribe record { + # dispatcher key + string event; + # dispathcherStreamId of the message + string id; +}; + +# Repersentation of a message. +public type Chat record { + # message to be sent + string message; + # dispatcher key + string event; + # user id to send the message + string toUserId; + # dispathcherStreamId of the message + string id; +}; diff --git a/samples/basic-chat/client-with-dispatcherStreamId/utils.bal b/samples/basic-chat/client-with-dispatcherStreamId/utils.bal new file mode 100644 index 000000000..c5ca466ec --- /dev/null +++ b/samples/basic-chat/client-with-dispatcherStreamId/utils.bal @@ -0,0 +1,105 @@ +import xlibb/pipe; + +# Stream generator class for Response return type +public client isolated class ResponseStreamGenerator { + *Generator; + private final PipesMap pipes; + private final string pipeId; + private final decimal timeout; + + # StreamGenerator + # + # + pipe - Pipe to hold stream messages + # + timeout - Waiting time + public isolated function init(PipesMap pipes, string pipeId, decimal timeout) { + self.pipes = pipes; + self.pipeId = pipeId; + self.timeout = timeout; + } + + public isolated function next() returns record {|Response value;|}|error { + while true { + anydata|error? message = self.pipes.getPipe(self.pipeId).consume(self.timeout); + if message is error? { + continue; + } + Response response = check message.cloneWithType(); + return {value: response}; + } + } + + public isolated function close() returns error? { + check self.pipes.removePipe(self.pipeId); + } +} + +# PipesMap class to handle generated pipes +public isolated class PipesMap { + private final map pipes; + + public isolated function init() { + self.pipes = {}; + } + + public isolated function addPipe(string id, pipe:Pipe pipe) { + lock { + self.pipes[id] = pipe; + } + } + + public isolated function getPipe(string id) returns pipe:Pipe { + lock { + if (self.pipes.hasKey(id)) { + return self.pipes.get(id); + } + pipe:Pipe pipe = new (100); + self.addPipe(id, pipe); + return pipe; + } + } + + public isolated function removePipe(string id) returns error? { + lock { + _ = check self.getPipe(id).gracefulClose(); + _ = self.pipes.remove(id); + } + } + + public isolated function removePipes() returns error? { + lock { + foreach pipe:Pipe pipe in self.pipes { + check pipe.gracefulClose(); + } + self.pipes.removeAll(); + } + } +} + +# StreamGeneratorsMap class to handle generated stream generators +public isolated class StreamGeneratorsMap { + private final Generator[] streamGenerators; + + public isolated function init() { + self.streamGenerators = []; + } + + public isolated function addStreamGenerator(Generator streamGenerator) { + lock { + self.streamGenerators.push(streamGenerator); + } + } + + public isolated function removeStreamGenerators() returns error? { + lock { + foreach Generator streamGenerator in self.streamGenerators { + check streamGenerator.close(); + } + } + } +} + +# Generator object type for type inclusion +public type Generator isolated object { + public isolated function next() returns record {|anydata value;|}|error; + public isolated function close() returns error?; +}; diff --git a/samples/basic-chat/client/Ballerina.toml b/samples/basic-chat/client/Ballerina.toml new file mode 100644 index 000000000..529d692b8 --- /dev/null +++ b/samples/basic-chat/client/Ballerina.toml @@ -0,0 +1,8 @@ +[package] +org = "wso2" +name = "test_websocket" +version = "0.1.0" +distribution = "2201.9.0" + +[build-options] +observabilityIncluded = true diff --git a/samples/basic-chat/client/Dependencies.toml b/samples/basic-chat/client/Dependencies.toml new file mode 100644 index 000000000..fab3f420d --- /dev/null +++ b/samples/basic-chat/client/Dependencies.toml @@ -0,0 +1,333 @@ +# AUTO-GENERATED FILE. DO NOT MODIFY. + +# This file is auto-generated by Ballerina for managing dependency versions. +# It should not be modified by hand. + +[ballerina] +dependencies-toml-version = "2" +distribution-version = "2201.9.0" + +[[package]] +org = "ballerina" +name = "auth" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"} +] + +[[package]] +org = "ballerina" +name = "cache" +version = "3.8.0" +dependencies = [ + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "task"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "constraint" +version = "1.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "crypto" +version = "2.7.2" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "file" +version = "1.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "http" +version = "2.11.2" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "file"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.decimal"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.regexp"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "mime"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "observe"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + +[[package]] +org = "ballerina" +name = "jballerina.java" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "jwt" +version = "2.12.1" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "lang.__internal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.array" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"} +] + +[[package]] +org = "ballerina" +name = "lang.decimal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.int" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.object" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "lang.regexp" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.string" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.regexp"} +] + +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "log" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerina", packageName = "log", moduleName = "log"} +] + +[[package]] +org = "ballerina" +name = "mime" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"} +] + +[[package]] +org = "ballerina" +name = "oauth2" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "observe" +version = "1.2.3" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "os" +version = "1.8.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "task" +version = "2.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "time" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "url" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "websocket" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "http"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "websocket", moduleName = "websocket"} +] + +[[package]] +org = "ballerinai" +name = "observe" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerinai", packageName = "observe", moduleName = "observe"} +] + +[[package]] +org = "haritha" +name = "test_websocket" +version = "0.1.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "websocket"}, + {org = "ballerinai", name = "observe"}, + {org = "xlibb", name = "pipe"} +] +modules = [ + {org = "haritha", packageName = "test_websocket", moduleName = "test_websocket"} +] + +[[package]] +org = "xlibb" +name = "pipe" +version = "1.4.1" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] +modules = [ + {org = "xlibb", packageName = "pipe", moduleName = "pipe"} +] + diff --git a/samples/basic-chat/client/asyncapi.yaml b/samples/basic-chat/client/asyncapi.yaml new file mode 100644 index 000000000..d0b08496f --- /dev/null +++ b/samples/basic-chat/client/asyncapi.yaml @@ -0,0 +1,107 @@ +asyncapi: 2.5.0 +info: + title: User + version: 0.1.0 +servers: + development: + url: "{server}:{port}/user" + protocol: ws + protocolVersion: "13" + variables: + server: + default: ws://localhost + port: + default: "9092" +channels: + /: + description: Allows clients to get real-time data on users and chat with them. + subscribe: + message: + $ref: '#/components/messages/Response' + publish: + message: + oneOf: + - $ref: '#/components/messages/Subscribe' + - $ref: '#/components/messages/Unsubscribe' + - $ref: '#/components/messages/Chat' +components: + schemas: + Subscribe: + type: object + required: + - event + - name + - gender + properties: + event: + type: string + const: Subscribe + description: type of event + name: + type: string + description: name of the user + gender: + type: string + description: gender of the user + description: Representation of a subscription. + Response: + type: object + required: + - event + - message + properties: + event: + type: string + description: dispatcher key + message: + type: string + description: message to be sent + description: Representation of a response + Unsubscribe: + type: object + required: + - event + properties: + event: + type: string + const: Unsubscribe + description: dispatcher key + description: Representation of an unsubscribe message. + Chat: + type: object + required: + - message + - event + - toUserId + properties: + message: + type: string + description: 'message to be sent ' + event: + type: string + const: Chat + description: dispatcher key + toUserId: + type: string + description: user id to send the message + description: Repersentation of a message. + messages: + Response: + payload: + $ref: '#/components/schemas/Response' + Subscribe: + payload: + $ref: '#/components/schemas/Subscribe' + x-response: + $ref: '#/components/messages/Response' + x-response-type: server-streaming + Unsubscribe: + payload: + $ref: '#/components/schemas/Unsubscribe' + Chat: + payload: + $ref: '#/components/schemas/Chat' + x-response: + $ref: '#/components/messages/Response' + x-response-type: simple-rpc +x-dispatcherKey: event diff --git a/samples/basic-chat/client/client.bal b/samples/basic-chat/client/client.bal new file mode 100644 index 000000000..de86b74be --- /dev/null +++ b/samples/basic-chat/client/client.bal @@ -0,0 +1,174 @@ +import ballerina/log; +import ballerina/websocket; + +import xlibb/pipe; + +public client isolated class UserClient { + private final websocket:Client clientEp; + private final pipe:Pipe writeMessageQueue; + private final PipesMap pipes; + private final StreamGeneratorsMap streamGenerators; + private boolean isActive; + + # Gets invoked to initialize the `connector`. + # + # + config - The configurations to be used when initializing the `connector` + # + serviceUrl - URL of the target service + # + return - An error if connector initialization failed + public isolated function init(websocket:ClientConfiguration clientConfig = {}, string serviceUrl = "ws://localhost:9092/user") returns error? { + self.pipes = new (); + self.streamGenerators = new (); + self.writeMessageQueue = new (1000); + websocket:Client websocketEp = check new (serviceUrl, clientConfig); + self.clientEp = websocketEp; + self.isActive = true; + self.startMessageWriting(); + self.startMessageReading(); + return; + } + + # Used to write messages to the websocket. + # + private isolated function startMessageWriting() { + worker writeMessage { + while true { + lock { + if !self.isActive { + break; + } + } + Message|pipe:Error message = self.writeMessageQueue.consume(5); + if message is pipe:Error { + if message.message() == "Operation has timed out" { + continue; + } + log:printError("PipeError: Failed to consume message from the pipe", message); + self.attemptToCloseConnection(); + return; + } + websocket:Error? wsErr = self.clientEp->writeMessage(message); + if wsErr is websocket:Error { + log:printError("WsError: Failed to write message to the client", wsErr); + self.attemptToCloseConnection(); + return; + } + } + } + } + + # Used to read messages from the websocket. + # + private isolated function startMessageReading() { + worker readMessage { + while true { + lock { + if !self.isActive { + break; + } + } + Message|websocket:Error message = self.clientEp->readMessage(Message); + if message is websocket:Error { + log:printError("WsError: Failed to read message from the client", message); + self.attemptToCloseConnection(); + return; + } + pipe:Pipe pipe = self.pipes.getPipe(message.event); + pipe:Error? pipeErr = pipe.produce(message, 5); + if pipeErr is pipe:Error { + log:printError("PipeError: Failed to produce message to the pipe", pipeErr); + self.attemptToCloseConnection(); + return; + } + } + } + } + + remote isolated function doSubscribe(Subscribe subscribe, decimal timeout) returns stream|error { + lock { + if !self.isActive { + return error("ConnectionError: Connection has been closed"); + } + } + Message|error message = subscribe.cloneWithType(); + if message is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", message); + } + pipe:Error? pipeErr = self.writeMessageQueue.produce(message, timeout); + if pipeErr is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in producing message", pipeErr); + } + stream streamMessages; + lock { + ResponseStreamGenerator streamGenerator = new (self.pipes, "subscribe", timeout); + self.streamGenerators.addStreamGenerator(streamGenerator); + streamMessages = new (streamGenerator); + } + return streamMessages; + } + + remote isolated function doUnsubscribe(Unsubscribe unsubscribe, decimal timeout) returns error? { + lock { + if !self.isActive { + return error("ConnectionError: Connection has been closed"); + } + } + Message|error message = unsubscribe.cloneWithType(); + if message is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", message); + } + pipe:Error? pipeErr = self.writeMessageQueue.produce(message, timeout); + if pipeErr is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in producing message", pipeErr); + } + } + + remote isolated function doChat(Chat chat, decimal timeout) returns Response|error { + lock { + if !self.isActive { + return error("ConnectionError: Connection has been closed"); + } + } + Message|error message = chat.cloneWithType(); + if message is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", message); + } + pipe:Error? pipeErr = self.writeMessageQueue.produce(message, timeout); + if pipeErr is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in producing message", pipeErr); + } + Message|pipe:Error responseMessage = self.pipes.getPipe("chat").consume(timeout); + if responseMessage is pipe:Error { + self.attemptToCloseConnection(); + return error("PipeError: Error in consuming message", responseMessage); + } + Response|error response = responseMessage.cloneWithType(); + if response is error { + self.attemptToCloseConnection(); + return error("DataBindingError: Error in cloning message", response); + } + return response; + } + + isolated function attemptToCloseConnection() { + error? connectionClose = self->connectionClose(); + if connectionClose is error { + log:printError("ConnectionError", connectionClose); + } + } + + remote isolated function connectionClose() returns error? { + lock { + self.isActive = false; + check self.writeMessageQueue.immediateClose(); + check self.pipes.removePipes(); + check self.streamGenerators.removeStreamGenerators(); + check self.clientEp->close(); + } + }; +} diff --git a/samples/basic-chat/client/main.bal b/samples/basic-chat/client/main.bal new file mode 100644 index 000000000..2c4bb2716 --- /dev/null +++ b/samples/basic-chat/client/main.bal @@ -0,0 +1,42 @@ +import ballerina/io; + +UserClient chatClient = check new (); + +public function main() returns error? { + worker subscribe returns error? { + io:println("Subscribing to the chat service"); + stream subscription = check chatClient->doSubscribe({"event":"subscribe","name":"Ballerina","gender":"Female"}, 10); + // the server will send two responses to the subscription request immediately + printSingleResponse(subscription); + printSingleResponse(subscription); + //to notify the subscription is done + true ->> function; + while true { + printSingleResponse(subscription); + } + } + boolean waitForSubscribe = check <- subscribe; + if waitForSubscribe is true { + io:print("Enter your message: "); + string message = io:readln(); + io:print("Enter to whom you want to send the message: "); + string toUser = io:readln(); + Response|error response = chatClient->doChat({"event":"chat","message":message, "toUserId": toUser}, 10); + if response is error { + io:println("Error occurred: " + response.message()); + } + io:println("RESPONSE: ", response); + } + check wait subscribe; +} + +function printSingleResponse(stream subscription) { + record {|Response value;|}|error? message = subscription.next(); + if message !is error? { + io:println(message.value); + } else if message is error { + io:println("Error occurred at worker: " + message.message()); + } else { + io:println("NILL"); + } +} \ No newline at end of file diff --git a/samples/basic-chat/client/types.bal b/samples/basic-chat/client/types.bal new file mode 100644 index 000000000..47bd50fe6 --- /dev/null +++ b/samples/basic-chat/client/types.bal @@ -0,0 +1,35 @@ +public type Message readonly & record {string event;}; + +# Representation of a subscription. +public type Subscribe record { + # type of event + string event; + # name of the user + string name; + # gender of the user + string gender; +}; + +# Representation of a response +public type Response record { + # dispatcher key + string event; + # message to be sent + string message; +}; + +# Representation of an unsubscribe message. +public type Unsubscribe record { + # dispatcher key + string event; +}; + +# Repersentation of a message. +public type Chat record { + # message to be sent + string message; + # dispatcher key + string event; + # user id to send the message + string toUserId; +}; diff --git a/samples/basic-chat/client/utils.bal b/samples/basic-chat/client/utils.bal new file mode 100644 index 000000000..c5ca466ec --- /dev/null +++ b/samples/basic-chat/client/utils.bal @@ -0,0 +1,105 @@ +import xlibb/pipe; + +# Stream generator class for Response return type +public client isolated class ResponseStreamGenerator { + *Generator; + private final PipesMap pipes; + private final string pipeId; + private final decimal timeout; + + # StreamGenerator + # + # + pipe - Pipe to hold stream messages + # + timeout - Waiting time + public isolated function init(PipesMap pipes, string pipeId, decimal timeout) { + self.pipes = pipes; + self.pipeId = pipeId; + self.timeout = timeout; + } + + public isolated function next() returns record {|Response value;|}|error { + while true { + anydata|error? message = self.pipes.getPipe(self.pipeId).consume(self.timeout); + if message is error? { + continue; + } + Response response = check message.cloneWithType(); + return {value: response}; + } + } + + public isolated function close() returns error? { + check self.pipes.removePipe(self.pipeId); + } +} + +# PipesMap class to handle generated pipes +public isolated class PipesMap { + private final map pipes; + + public isolated function init() { + self.pipes = {}; + } + + public isolated function addPipe(string id, pipe:Pipe pipe) { + lock { + self.pipes[id] = pipe; + } + } + + public isolated function getPipe(string id) returns pipe:Pipe { + lock { + if (self.pipes.hasKey(id)) { + return self.pipes.get(id); + } + pipe:Pipe pipe = new (100); + self.addPipe(id, pipe); + return pipe; + } + } + + public isolated function removePipe(string id) returns error? { + lock { + _ = check self.getPipe(id).gracefulClose(); + _ = self.pipes.remove(id); + } + } + + public isolated function removePipes() returns error? { + lock { + foreach pipe:Pipe pipe in self.pipes { + check pipe.gracefulClose(); + } + self.pipes.removeAll(); + } + } +} + +# StreamGeneratorsMap class to handle generated stream generators +public isolated class StreamGeneratorsMap { + private final Generator[] streamGenerators; + + public isolated function init() { + self.streamGenerators = []; + } + + public isolated function addStreamGenerator(Generator streamGenerator) { + lock { + self.streamGenerators.push(streamGenerator); + } + } + + public isolated function removeStreamGenerators() returns error? { + lock { + foreach Generator streamGenerator in self.streamGenerators { + check streamGenerator.close(); + } + } + } +} + +# Generator object type for type inclusion +public type Generator isolated object { + public isolated function next() returns record {|anydata value;|}|error; + public isolated function close() returns error?; +}; diff --git a/samples/basic-chat/server-demo/Ballerina.toml b/samples/basic-chat/server-demo/Ballerina.toml new file mode 100644 index 000000000..2c61b61bc --- /dev/null +++ b/samples/basic-chat/server-demo/Ballerina.toml @@ -0,0 +1,8 @@ +[package] +org = "wso2" +name = "websocket_test" +version = "0.1.0" +distribution = "2201.9.0" + +[build-options] +observabilityIncluded = true diff --git a/samples/basic-chat/server-demo/Dependencies.toml b/samples/basic-chat/server-demo/Dependencies.toml new file mode 100644 index 000000000..66545b816 --- /dev/null +++ b/samples/basic-chat/server-demo/Dependencies.toml @@ -0,0 +1,318 @@ +# AUTO-GENERATED FILE. DO NOT MODIFY. + +# This file is auto-generated by Ballerina for managing dependency versions. +# It should not be modified by hand. + +[ballerina] +dependencies-toml-version = "2" +distribution-version = "2201.9.0" + +[[package]] +org = "ballerina" +name = "auth" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"} +] + +[[package]] +org = "ballerina" +name = "cache" +version = "3.8.0" +dependencies = [ + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "task"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "constraint" +version = "1.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "crypto" +version = "2.7.2" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "file" +version = "1.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "http" +version = "2.11.2" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "file"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.decimal"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.regexp"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "mime"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "observe"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + +[[package]] +org = "ballerina" +name = "jballerina.java" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "jwt" +version = "2.12.1" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "lang.__internal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.array" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"} +] + +[[package]] +org = "ballerina" +name = "lang.decimal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.int" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.object" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "lang.regexp" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.string" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.regexp"} +] + +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "log" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "observe"} +] + +[[package]] +org = "ballerina" +name = "mime" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"} +] + +[[package]] +org = "ballerina" +name = "oauth2" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "observe" +version = "1.2.3" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "os" +version = "1.8.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "task" +version = "2.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "time" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "url" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "websocket" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "http"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "websocket", moduleName = "websocket"} +] + +[[package]] +org = "ballerinai" +name = "observe" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerinai", packageName = "observe", moduleName = "observe"} +] + +[[package]] +org = "haritha" +name = "websocket_test" +version = "0.1.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "websocket"}, + {org = "ballerinai", name = "observe"} +] +modules = [ + {org = "haritha", packageName = "websocket_test", moduleName = "websocket_test"}, + {org = "haritha", packageName = "websocket_test", moduleName = "websocket_test.types"} +] + diff --git a/samples/basic-chat/server-demo/main.bal b/samples/basic-chat/server-demo/main.bal new file mode 100644 index 000000000..4c8778ca3 --- /dev/null +++ b/samples/basic-chat/server-demo/main.bal @@ -0,0 +1,94 @@ +import ballerina/websocket; +import websocket_test.types; +import ballerina/io; + +listener websocket:Listener websocketListener = check new(9092); +map users = {}; + +@websocket:ServiceConfig{dispatcherKey: "event"} +service / on websocketListener { + # An echo service that echoes the messages sent by the client. + # + return - User status + resource function get .() returns websocket:Service|websocket:UpgradeError { + return new WsService(); + } + +} + +@websocket:ServiceConfig{dispatcherKey: "event"} +service /user on websocketListener { + # Allows clients to get real-time data on users and chat with them. + # + return - websocket service + resource function get .() returns websocket:Service|websocket:UpgradeError { + return new WsServiceUser(); + } + +} + +service class WsService { + *websocket:Service; + + remote function onHello(types:Hello clientData) returns types:Response? { + return {message:"You sent: " + clientData.message}; + } + +} + +service class WsServiceUser { + *websocket:Service; + + remote function onSubscribe(websocket:Caller caller, types:Subscribe sub) returns stream|error { + string callerId = caller.getConnectionId(); + io:println(sub.name + " subscribed: " + callerId); + types:User user = {caller: caller, gender: sub.gender, name: sub.name, id: callerId}; + users[callerId] = user; + broadcast("System: User " + user.name + " (" + callerId + ")" + " has joined the chat"); + types:Response[] response = [{message: "System: Welcome to the chat!", event:"subscribe"}]; + return response.toStream(); + } + + remote function onUnsubscribe(websocket:Caller caller, types:Unsubscribe unsubscribe) returns error? { + string callerId = caller.getConnectionId(); + broadcast("System: User " + users.get(callerId).name + " has left the chat"); + _ = users.remove(callerId); + } + + // remote function onClose(websocket:Caller caller) returns websocket:Error? { + // string callerId = caller.getConnectionId(); + // if (users.hasKey(callerId)) { + // _ = users.remove(callerId); + // } + // } + + remote function onChat(websocket:Caller caller, types:Chat message) returns types:Response|error { + string callerId = caller.getConnectionId(); + if (!users.hasKey(callerId)) { + return { message: "Please subscribe first to send messages"}; + } + types:User sender = users.get(callerId); + if (!users.hasKey(message.toUserId)) { + return {message:"User not found"}; + } + websocket:Caller? receiver = users.get(message.toUserId).caller; + if (receiver is ()) { + return {message:"User not found"}; + } + _ = check receiver->writeMessage({message: sender.name + ": " + message.message, event:"subscribe"}); + return {message:"You: " + message.message, event: "chat"}; + } + +} + +function broadcast(string message) { + users.forEach(function (types:User user) { + websocket:Caller? caller = user.caller; + if (caller is ()) { + return; + } + types:Response response = {message: message, event: "subscribe"}; + error? err = caller->writeMessage(response); + if (err is error) { + io:println("Error broadcasting message: " + err.message()); + } + }); +} diff --git a/samples/basic-chat/server-demo/messages.md b/samples/basic-chat/server-demo/messages.md new file mode 100644 index 000000000..fa4a3fad3 --- /dev/null +++ b/samples/basic-chat/server-demo/messages.md @@ -0,0 +1,17 @@ +## Subscribe Message Example + +```json +{"event":"subscribe","name":"Gopi","gender":"Male"} +``` + +## Unsubscribe Message Example + +```json +{"event":"unsubscribe"} +``` + +## Chat Message Example + +```json +{"event":"chat","message":"Hello ballerina", "toUserId": "cc6b1efffe42ea75-0001e3b1-00000002-237f98c8d2877f67-d7aa765b"} +``` \ No newline at end of file diff --git a/samples/basic-chat/server-demo/modules/types/types.bal b/samples/basic-chat/server-demo/modules/types/types.bal new file mode 100644 index 000000000..3e486d00b --- /dev/null +++ b/samples/basic-chat/server-demo/modules/types/types.bal @@ -0,0 +1,88 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/websocket; + +# Representation of a response +# +# + event - dispatcher key +# + message - message to be sent +public type Response record {| + string event = "just a message"; + string message; +|}; + +# Representation of an unsubscribe message. +# +# + event - dispatcher key +public type Unsubscribe record {| + string event; +|}; + +# Representation of an info message. +# +# + event - dispatcher key +public type Info record {| + string event; +|}; + +# Representation of a subscription. +# +# + event - type of event +# + name - name of the user +# + gender - gender of the user +public type Subscribe record {| + string event; + string name; + string gender; +|}; + +# Representation of a user. +# +# + name - name of the user +# + gender - gender of the user +# + id - id of the user (connection id) +# + caller - websocket caller object +public type User record {| + string name; + string gender; + string id; + websocket:Caller caller; +|}; + +# Repersentation of a message. +# +# + message - message to be sent +# + event - dispatcher key +# + toUserId - user id to send the message +public type Chat record {| + string message; + string event; + string toUserId; +|}; + +public type Hello record {| + string message; + string event; +|}; + +# Representation of a basic event. +# +# + event - type of event +# + id - id of the event +public type ClientData record {| + string event; + string id; +|}; diff --git a/samples/basic-chat/server-demo/specs/main_asyncapi.yaml b/samples/basic-chat/server-demo/specs/main_asyncapi.yaml new file mode 100644 index 000000000..fdbcd3095 --- /dev/null +++ b/samples/basic-chat/server-demo/specs/main_asyncapi.yaml @@ -0,0 +1,61 @@ +asyncapi: 2.5.0 +info: + title: / + version: 0.1.0 +servers: + development: + url: "{server}:{port}/" + protocol: ws + protocolVersion: "13" + variables: + server: + default: ws://localhost + port: + default: "9092" +channels: + /: + description: An echo service that echoes the messages sent by the client. + subscribe: + message: + $ref: '#/components/messages/Response' + publish: + message: + $ref: '#/components/messages/Hello' +components: + schemas: + Hello: + type: object + required: + - message + - event + properties: + message: + type: string + event: + type: string + const: Hello + Response: + type: object + required: + - event + - message + properties: + event: + type: string + description: dispatcher key + message: + type: string + description: message to be sent + description: Representation of a response + messages: + Response: + payload: + $ref: '#/components/schemas/Response' + Hello: + payload: + $ref: '#/components/schemas/Hello' + x-response: + $ref: '#/components/messages/Response' + x-required: false + x-response-type: simple-rpc +x-dispatcherKey: event diff --git a/samples/basic-chat/server-demo/specs/user_asyncapi.yaml b/samples/basic-chat/server-demo/specs/user_asyncapi.yaml new file mode 100644 index 000000000..d0b08496f --- /dev/null +++ b/samples/basic-chat/server-demo/specs/user_asyncapi.yaml @@ -0,0 +1,107 @@ +asyncapi: 2.5.0 +info: + title: User + version: 0.1.0 +servers: + development: + url: "{server}:{port}/user" + protocol: ws + protocolVersion: "13" + variables: + server: + default: ws://localhost + port: + default: "9092" +channels: + /: + description: Allows clients to get real-time data on users and chat with them. + subscribe: + message: + $ref: '#/components/messages/Response' + publish: + message: + oneOf: + - $ref: '#/components/messages/Subscribe' + - $ref: '#/components/messages/Unsubscribe' + - $ref: '#/components/messages/Chat' +components: + schemas: + Subscribe: + type: object + required: + - event + - name + - gender + properties: + event: + type: string + const: Subscribe + description: type of event + name: + type: string + description: name of the user + gender: + type: string + description: gender of the user + description: Representation of a subscription. + Response: + type: object + required: + - event + - message + properties: + event: + type: string + description: dispatcher key + message: + type: string + description: message to be sent + description: Representation of a response + Unsubscribe: + type: object + required: + - event + properties: + event: + type: string + const: Unsubscribe + description: dispatcher key + description: Representation of an unsubscribe message. + Chat: + type: object + required: + - message + - event + - toUserId + properties: + message: + type: string + description: 'message to be sent ' + event: + type: string + const: Chat + description: dispatcher key + toUserId: + type: string + description: user id to send the message + description: Repersentation of a message. + messages: + Response: + payload: + $ref: '#/components/schemas/Response' + Subscribe: + payload: + $ref: '#/components/schemas/Subscribe' + x-response: + $ref: '#/components/messages/Response' + x-response-type: server-streaming + Unsubscribe: + payload: + $ref: '#/components/schemas/Unsubscribe' + Chat: + payload: + $ref: '#/components/schemas/Chat' + x-response: + $ref: '#/components/messages/Response' + x-response-type: simple-rpc +x-dispatcherKey: event diff --git a/samples/basic-chat/server-with-dispatcherStreamId/Ballerina.toml b/samples/basic-chat/server-with-dispatcherStreamId/Ballerina.toml new file mode 100644 index 000000000..2c61b61bc --- /dev/null +++ b/samples/basic-chat/server-with-dispatcherStreamId/Ballerina.toml @@ -0,0 +1,8 @@ +[package] +org = "wso2" +name = "websocket_test" +version = "0.1.0" +distribution = "2201.9.0" + +[build-options] +observabilityIncluded = true diff --git a/samples/basic-chat/server-with-dispatcherStreamId/Dependencies.toml b/samples/basic-chat/server-with-dispatcherStreamId/Dependencies.toml new file mode 100644 index 000000000..66545b816 --- /dev/null +++ b/samples/basic-chat/server-with-dispatcherStreamId/Dependencies.toml @@ -0,0 +1,318 @@ +# AUTO-GENERATED FILE. DO NOT MODIFY. + +# This file is auto-generated by Ballerina for managing dependency versions. +# It should not be modified by hand. + +[ballerina] +dependencies-toml-version = "2" +distribution-version = "2201.9.0" + +[[package]] +org = "ballerina" +name = "auth" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"} +] + +[[package]] +org = "ballerina" +name = "cache" +version = "3.8.0" +dependencies = [ + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "task"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "constraint" +version = "1.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "crypto" +version = "2.7.2" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "file" +version = "1.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "http" +version = "2.11.2" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "file"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.decimal"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.regexp"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "mime"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "observe"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + +[[package]] +org = "ballerina" +name = "jballerina.java" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "jwt" +version = "2.12.1" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "lang.__internal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.array" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"} +] + +[[package]] +org = "ballerina" +name = "lang.decimal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.int" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.object" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "lang.regexp" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.string" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.regexp"} +] + +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "log" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "observe"} +] + +[[package]] +org = "ballerina" +name = "mime" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"} +] + +[[package]] +org = "ballerina" +name = "oauth2" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "observe" +version = "1.2.3" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "os" +version = "1.8.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "task" +version = "2.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "time" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "url" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "websocket" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "http"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "websocket", moduleName = "websocket"} +] + +[[package]] +org = "ballerinai" +name = "observe" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerinai", packageName = "observe", moduleName = "observe"} +] + +[[package]] +org = "haritha" +name = "websocket_test" +version = "0.1.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "websocket"}, + {org = "ballerinai", name = "observe"} +] +modules = [ + {org = "haritha", packageName = "websocket_test", moduleName = "websocket_test"}, + {org = "haritha", packageName = "websocket_test", moduleName = "websocket_test.types"} +] + diff --git a/samples/basic-chat/server-with-dispatcherStreamId/main.bal b/samples/basic-chat/server-with-dispatcherStreamId/main.bal new file mode 100644 index 000000000..91ba45f8f --- /dev/null +++ b/samples/basic-chat/server-with-dispatcherStreamId/main.bal @@ -0,0 +1,94 @@ +import ballerina/websocket; +import websocket_test.types; +import ballerina/io; + +listener websocket:Listener websocketListener = check new(9092); +map users = {}; + +@websocket:ServiceConfig{dispatcherKey: "event"} +service / on websocketListener { + # An echo service that echoes the messages sent by the client. + # + return - User status + resource function get .() returns websocket:Service|websocket:UpgradeError { + return new WsService(); + } + +} + +@websocket:ServiceConfig{dispatcherKey: "event"} +service /user on websocketListener { + # Allows clients to get real-time data on users and chat with them. + # + return - websocket service + resource function get .() returns websocket:Service|websocket:UpgradeError { + return new WsServiceUser(); + } + +} + +service class WsService { + *websocket:Service; + + remote function onHello(types:Hello clientData) returns types:Response? { + return {message:"You sent: " + clientData.message, id: "null"}; + } + +} + +service class WsServiceUser { + *websocket:Service; + + remote function onSubscribe(websocket:Caller caller, types:Subscribe sub) returns types:Response { + string callerId = caller.getConnectionId(); + io:println(sub.name + " subscribed: " + callerId); + types:User user = {caller: caller, gender: sub.gender, name: sub.name, id: callerId, streamId: sub.id}; + users[callerId] = user; + broadcast("System: User " + user.name + " (" + callerId + ")" + " has joined the chat"); + return {message: "System: Welcome to the chat!", event:"chat", id: sub.id}; + } + + remote function onUnsubscribe(websocket:Caller caller, types:Unsubscribe unsubscribe) returns error? { + string callerId = caller.getConnectionId(); + broadcast("System: User " + users.get(callerId).name + " has left the chat"); + _ = users.remove(callerId); + } + + remote function onClose(websocket:Caller caller) returns websocket:Error? { + string callerId = caller.getConnectionId(); + if (users.hasKey(callerId)) { + _ = users.remove(callerId); + } + } + + remote function onChat(websocket:Caller caller, types:Chat message) returns types:Response|error { + string callerId = caller.getConnectionId(); + if (!users.hasKey(callerId)) { + return { message: "Please subscribe first to send messages", id: message.id, event:"chat"}; + } + types:User sender = users.get(callerId); + if (!users.hasKey(message.toUserId)) { + return {message:"User not found", id: message.id, event:"chat"}; + } + types:User receiver = users.get(message.toUserId); + websocket:Caller? receiverCaller = receiver.caller; + if (receiverCaller is ()) { + return {message:"User not found", id: message.id, event:"chat"}; + } + _ = check receiverCaller->writeMessage({message: sender.name + ": " + message.message, event:"chat", id: receiver.streamId}); + return {message:"You: " + message.message, event: "chat", id: message.id}; + } + +} + +function broadcast(string message) { + users.forEach(function (types:User user) { + websocket:Caller? caller = user.caller; + if (caller is ()) { + return; + } + types:Response response = {message: message, event: "chat", id: user.streamId}; + error? err = caller->writeMessage(response); + if (err is error) { + io:println("Error broadcasting message: " + err.message()); + } + }); +} diff --git a/samples/basic-chat/server-with-dispatcherStreamId/messages.md b/samples/basic-chat/server-with-dispatcherStreamId/messages.md new file mode 100644 index 000000000..bb094b6b1 --- /dev/null +++ b/samples/basic-chat/server-with-dispatcherStreamId/messages.md @@ -0,0 +1,17 @@ +## Subscribe Message Example + +```json +{"event":"subscribe","name":"Gopi","gender":"Male", "id":"ALOHA"} +``` + +## Unsubscribe Message Example + +```json +{"event":"unsubscribe", id:"Hellooooo"} +``` + +## Chat Message Example + +```json +{"event":"chat","message":"Hello ballerina", "toUserId": "cc6b1efffe42ea75-000420c8-00000002-3ff441ce1aa91480-ef8175ca", "id":"CHAT_ID"} +``` \ No newline at end of file diff --git a/samples/basic-chat/server-with-dispatcherStreamId/modules/types/types.bal b/samples/basic-chat/server-with-dispatcherStreamId/modules/types/types.bal new file mode 100644 index 000000000..8925c7a37 --- /dev/null +++ b/samples/basic-chat/server-with-dispatcherStreamId/modules/types/types.bal @@ -0,0 +1,98 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/websocket; + +# Representation of a response +# +# + event - dispatcher key +# + message - message to be sent +# + id - dispatcher stream id of the event +public type Response record {| + string event = "just a message"; + string message; + string id; +|}; + +# Representation of an unsubscribe message. +# +# + event - dispatcher key +# + id - dispatcher stream id of the event +public type Unsubscribe record {| + string event; + string id; +|}; + +# Representation of an info message. +# +# + event - dispatcher key +public type Info record {| + string event; +|}; + +# Representation of a subscription. +# +# + event - type of event +# + name - name of the user +# + gender - gender of the user +# + id - dispatcher stream id of the event +public type Subscribe record {| + string event; + string name; + string gender; + string id; +|}; + +# Representation of a user. +# +# + name - name of the user +# + gender - gender of the user +# + id - id of the user (connection id) +# + caller - websocket caller object +# + streamId - dispatcher stream id of the event subscription +public type User record {| + string name; + string gender; + string id; + websocket:Caller caller; + string streamId; +|}; + +# Repersentation of a message. +# +# + message - message to be sent +# + event - dispatcher key +# + toUserId - user id to send the message +# + id - dispatcher stream id of the event +public type Chat record {| + string message; + string event; + string toUserId; + string id; +|}; + +public type Hello record {| + string message; + string event; +|}; + +# Representation of a basic event. +# +# + event - type of event +# + id - id of the event +public type ClientData record {| + string event; + string id; +|}; diff --git a/samples/basic-chat/server/Ballerina.toml b/samples/basic-chat/server/Ballerina.toml new file mode 100644 index 000000000..2c61b61bc --- /dev/null +++ b/samples/basic-chat/server/Ballerina.toml @@ -0,0 +1,8 @@ +[package] +org = "wso2" +name = "websocket_test" +version = "0.1.0" +distribution = "2201.9.0" + +[build-options] +observabilityIncluded = true diff --git a/samples/basic-chat/server/Dependencies.toml b/samples/basic-chat/server/Dependencies.toml new file mode 100644 index 000000000..66545b816 --- /dev/null +++ b/samples/basic-chat/server/Dependencies.toml @@ -0,0 +1,318 @@ +# AUTO-GENERATED FILE. DO NOT MODIFY. + +# This file is auto-generated by Ballerina for managing dependency versions. +# It should not be modified by hand. + +[ballerina] +dependencies-toml-version = "2" +distribution-version = "2201.9.0" + +[[package]] +org = "ballerina" +name = "auth" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"} +] + +[[package]] +org = "ballerina" +name = "cache" +version = "3.8.0" +dependencies = [ + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "task"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "constraint" +version = "1.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "crypto" +version = "2.7.2" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "file" +version = "1.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "http" +version = "2.11.2" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "file"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.decimal"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.regexp"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "mime"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "observe"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + +[[package]] +org = "ballerina" +name = "jballerina.java" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "jwt" +version = "2.12.1" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "lang.__internal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.array" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"} +] + +[[package]] +org = "ballerina" +name = "lang.decimal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.int" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.object" +version = "0.0.0" + +[[package]] +org = "ballerina" +name = "lang.regexp" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.string" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.regexp"} +] + +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "log" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "observe"} +] + +[[package]] +org = "ballerina" +name = "mime" +version = "2.9.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"} +] + +[[package]] +org = "ballerina" +name = "oauth2" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "cache"}, + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "url"} +] + +[[package]] +org = "ballerina" +name = "observe" +version = "1.2.3" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "os" +version = "1.8.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "task" +version = "2.5.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + +[[package]] +org = "ballerina" +name = "time" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "url" +version = "2.4.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "websocket" +version = "2.11.0" +dependencies = [ + {org = "ballerina", name = "auth"}, + {org = "ballerina", name = "constraint"}, + {org = "ballerina", name = "http"}, + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "jwt"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, + {org = "ballerina", name = "lang.value"}, + {org = "ballerina", name = "log"}, + {org = "ballerina", name = "oauth2"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "websocket", moduleName = "websocket"} +] + +[[package]] +org = "ballerinai" +name = "observe" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "observe"} +] +modules = [ + {org = "ballerinai", packageName = "observe", moduleName = "observe"} +] + +[[package]] +org = "haritha" +name = "websocket_test" +version = "0.1.0" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "websocket"}, + {org = "ballerinai", name = "observe"} +] +modules = [ + {org = "haritha", packageName = "websocket_test", moduleName = "websocket_test"}, + {org = "haritha", packageName = "websocket_test", moduleName = "websocket_test.types"} +] + diff --git a/samples/basic-chat/server/main.bal b/samples/basic-chat/server/main.bal new file mode 100644 index 000000000..aa0185aff --- /dev/null +++ b/samples/basic-chat/server/main.bal @@ -0,0 +1,93 @@ +import ballerina/websocket; +import websocket_test.types; +import ballerina/io; + +listener websocket:Listener websocketListener = check new(9092); +map users = {}; + +@websocket:ServiceConfig{dispatcherKey: "event"} +service / on websocketListener { + # An echo service that echoes the messages sent by the client. + # + return - User status + resource function get .() returns websocket:Service|websocket:UpgradeError { + return new WsService(); + } + +} + +@websocket:ServiceConfig{dispatcherKey: "event"} +service /user on websocketListener { + # Allows clients to get real-time data on users and chat with them. + # + return - websocket service + resource function get .() returns websocket:Service|websocket:UpgradeError { + return new WsServiceUser(); + } + +} + +service class WsService { + *websocket:Service; + + remote function onHello(types:Hello clientData) returns types:Response? { + return {message:"You sent: " + clientData.message}; + } + +} + +service class WsServiceUser { + *websocket:Service; + + remote function onSubscribe(websocket:Caller caller, types:Subscribe sub) returns types:Response { + string callerId = caller.getConnectionId(); + io:println(sub.name + " subscribed: " + callerId); + types:User user = {caller: caller, gender: sub.gender, name: sub.name, id: callerId}; + users[callerId] = user; + broadcast("System: User " + user.name + " (" + callerId + ")" + " has joined the chat"); + return {message: "System: Welcome to the chat!", event:"subscribe"}; + } + + remote function onUnsubscribe(websocket:Caller caller, types:Unsubscribe unsubscribe) returns error? { + string callerId = caller.getConnectionId(); + broadcast("System: User " + users.get(callerId).name + " has left the chat"); + _ = users.remove(callerId); + } + + remote function onClose(websocket:Caller caller) returns websocket:Error? { + string callerId = caller.getConnectionId(); + if (users.hasKey(callerId)) { + _ = users.remove(callerId); + } + } + + remote function onChat(websocket:Caller caller, types:Chat message) returns types:Response|error { + string callerId = caller.getConnectionId(); + if (!users.hasKey(callerId)) { + return { message: "Please subscribe first to send messages"}; + } + types:User sender = users.get(callerId); + if (!users.hasKey(message.toUserId)) { + return {message:"User not found"}; + } + websocket:Caller? receiver = users.get(message.toUserId).caller; + if (receiver is ()) { + return {message:"User not found"}; + } + _ = check receiver->writeMessage({message: sender.name + ": " + message.message, event:"subscribe"}); + return {message:"You: " + message.message, event: "chat"}; + } + +} + +function broadcast(string message) { + users.forEach(function (types:User user) { + websocket:Caller? caller = user.caller; + if (caller is ()) { + return; + } + types:Response response = {message: message, event: "subscribe"}; + error? err = caller->writeMessage(response); + if (err is error) { + io:println("Error broadcasting message: " + err.message()); + } + }); +} diff --git a/samples/basic-chat/server/messages.md b/samples/basic-chat/server/messages.md new file mode 100644 index 000000000..56903e809 --- /dev/null +++ b/samples/basic-chat/server/messages.md @@ -0,0 +1,17 @@ +## Subscribe Message Example + +```json +{"event":"subscribe","name":"Gopi","gender":"Male"} +``` + +## Unsubscribe Message Example + +```json +{"event":"unsubscribe"} +``` + +## Chat Message Example + +```json +{"event":"chat","message":"Hello ballerina", "toUserId": "cc6b1efffe42ea75-0003c7e2-00000002-aa19eabfbcf62d1d-48179b7d"} +``` \ No newline at end of file diff --git a/samples/basic-chat/server/modules/types/types.bal b/samples/basic-chat/server/modules/types/types.bal new file mode 100644 index 000000000..3e486d00b --- /dev/null +++ b/samples/basic-chat/server/modules/types/types.bal @@ -0,0 +1,88 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/websocket; + +# Representation of a response +# +# + event - dispatcher key +# + message - message to be sent +public type Response record {| + string event = "just a message"; + string message; +|}; + +# Representation of an unsubscribe message. +# +# + event - dispatcher key +public type Unsubscribe record {| + string event; +|}; + +# Representation of an info message. +# +# + event - dispatcher key +public type Info record {| + string event; +|}; + +# Representation of a subscription. +# +# + event - type of event +# + name - name of the user +# + gender - gender of the user +public type Subscribe record {| + string event; + string name; + string gender; +|}; + +# Representation of a user. +# +# + name - name of the user +# + gender - gender of the user +# + id - id of the user (connection id) +# + caller - websocket caller object +public type User record {| + string name; + string gender; + string id; + websocket:Caller caller; +|}; + +# Repersentation of a message. +# +# + message - message to be sent +# + event - dispatcher key +# + toUserId - user id to send the message +public type Chat record {| + string message; + string event; + string toUserId; +|}; + +public type Hello record {| + string message; + string event; +|}; + +# Representation of a basic event. +# +# + event - type of event +# + id - id of the event +public type ClientData record {| + string event; + string id; +|};