Skip to content

Commit

Permalink
wip: switched client and server to CreateDestroy
Browse files Browse the repository at this point in the history
Related #501
Related #502

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 18, 2023
1 parent 15043d7 commit 4be338a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 41 deletions.
22 changes: 10 additions & 12 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import type {
} from './types';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { JSONValue, POJO } from 'types';
import { StartStop } from '@matrixai/async-init/dist/StartStop';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';

interface RPCClient extends StartStop {}
@StartStop()
interface RPCClient extends CreateDestroy {}
@CreateDestroy()
class RPCClient {
static async createRPCClient({
streamPairCreateCallback,
Expand All @@ -27,7 +27,6 @@ class RPCClient {
streamPairCreateCallback,
logger,
});
await rpcClient.start();
logger.info(`Created ${this.name}`);
return rpcClient;
}
Expand All @@ -47,22 +46,18 @@ class RPCClient {
this.streamPairCreateCallback = streamPairCreateCallback;
}

public async start(): Promise<void> {
this.logger.info(`Starting ${this.constructor.name}`);
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop(): Promise<void> {
this.logger.info(`Stopping ${this.constructor.name}`);
public async destroy(): Promise<void> {
this.logger.info(`Destroying ${this.constructor.name}`);
for await (const [stream] of this.activeStreams.entries()) {
stream.cancel(new rpcErrors.ErrorRpcStopping());
}
for await (const [stream] of this.activeStreams.entries()) {
await stream;
}
this.logger.info(`Stopped ${this.constructor.name}`);
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async duplexStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
_metadata: POJO,
Expand Down Expand Up @@ -134,6 +129,7 @@ class RPCClient {
};
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async serverStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
Expand All @@ -156,6 +152,7 @@ class RPCClient {
};
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async clientStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
metadata: POJO,
Expand All @@ -182,6 +179,7 @@ class RPCClient {
};
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
Expand Down
38 changes: 10 additions & 28 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,15 @@ import type { JSONValue, POJO } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { UnaryHandler } from './types';
import { ReadableStream } from 'stream/web';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
import * as grpcUtils from '../grpc/utils';

// FIXME: Might need to be StartStop. Won't know for sure until it's used.
interface RPCServer extends CreateDestroyStartStop {}
@CreateDestroyStartStop(
new rpcErrors.ErrorRpcRunning(),
new rpcErrors.ErrorRpcDestroyed(),
)
interface RPCServer extends CreateDestroy {}
@CreateDestroy()
class RPCServer {
static async createRPCServer({
container,
Expand All @@ -41,7 +34,6 @@ class RPCServer {
container,
logger,
});
await rpcServer.start();
logger.info(`Created ${this.name}`);
return rpcServer;
}
Expand All @@ -64,35 +56,25 @@ class RPCServer {
this.logger = logger;
}

public async start(): Promise<void> {
this.logger.info(`Starting ${this.constructor.name}`);
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop(): Promise<void> {
this.logger.info(`Stopping ${this.constructor.name}`);
public async destroy(): Promise<void> {
this.logger.info(`Destroying ${this.constructor.name}`);
// Stopping any active steams
const activeStreams = this.activeStreams;
for await (const [activeStream] of activeStreams.entries()) {
activeStream.cancel(new rpcErrors.ErrorRpcStopping());
}
this.logger.info(`Stopped ${this.constructor.name}`);
}

public async destroy(): Promise<void> {
this.logger.info(`Destroying ${this.constructor.name}`);
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new rpcErrors.ErrorRpcNotRunning())
@ready(new rpcErrors.ErrorRpcDestroyed())
public registerDuplexStreamHandler<I extends JSONValue, O extends JSONValue>(
method: string,
handler: DuplexStreamHandler<I, O>,
) {
this.handlerMap.set(method, handler);
}

@ready(new rpcErrors.ErrorRpcNotRunning())
@ready(new rpcErrors.ErrorRpcDestroyed())
public registerUnaryHandler<I extends JSONValue, O extends JSONValue>(
method: string,
handler: UnaryHandler<I, O>,
Expand All @@ -111,7 +93,7 @@ class RPCServer {
this.handlerMap.set(method, wrapperDuplex);
}

@ready(new rpcErrors.ErrorRpcNotRunning())
@ready(new rpcErrors.ErrorRpcDestroyed())
public registerServerStreamHandler<I extends JSONValue, O extends JSONValue>(
method: string,
handler: ServerStreamHandler<I, O>,
Expand All @@ -130,7 +112,7 @@ class RPCServer {
this.handlerMap.set(method, wrapperDuplex);
}

@ready(new rpcErrors.ErrorRpcNotRunning())
@ready(new rpcErrors.ErrorRpcDestroyed())
public registerClientStreamHandler<I extends JSONValue, O extends JSONValue>(
method: string,
handler: ClientStreamHandler<I, O>,
Expand All @@ -146,7 +128,7 @@ class RPCServer {
this.handlerMap.set(method, wrapperDuplex);
}

@ready(new rpcErrors.ErrorRpcNotRunning())
@ready(new rpcErrors.ErrorRpcDestroyed())
public handleStream(
streamPair: ReadableWritablePair<Buffer, Buffer>,
connectionInfo: ConnectionInfo,
Expand Down
1 change: 0 additions & 1 deletion src/RPC/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
import * as utils from '../utils';
import * as validationErrors from '../validation/errors';
import { ErrorRpcMessageLength } from "./errors";
const jsonStreamParsers = require('@streamparser/json');

class JsonToJsonMessage implements Transformer<Buffer, JsonRpcMessage> {
Expand Down

0 comments on commit 4be338a

Please sign in to comment.