Skip to content

Commit

Permalink
feat(plugin-keychain-memory): add observability via RxJS ReplaySubjects
Browse files Browse the repository at this point in the history
1. This is an example of how to add observability to a plugin such as
if you had to somehow expose the stream of transaction execution requests
flowing through a connector plugin but did not feel like setting up Kafka
or RabbitMQ just for this and instead opted to do it with an in-process,
purely NodeJS/Javascript based solution.
2. The downside of this is of course that this doesn't work well in a
distributed computing environment just by itself, since if you were to
host a fleet of servers running the same connector plugin with horizontal
scaling, then this wouldn't be able to observe all the invocations across
the server fleet, but it would still make it easier to implement a functionality
like that.
3. The main purpose of this pull request is educational. The keychain memory
plugin is only used for testing and demonstration purposes and I wanted to
show to a few other contributors what I meant when I was explaining that
they could just use RxJS subjects to allow consumers of the connector plugins
to observe the stream of transactions flowing through said connector plugin
instance.

Signed-off-by: Peter Somogyvari <[email protected]>
  • Loading branch information
petermetz committed May 18, 2024
1 parent 710463c commit 9b41377
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 1 deletion.
1 change: 1 addition & 0 deletions packages/cactus-plugin-keychain-memory/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"axios": "1.6.0",
"express": "4.19.2",
"prom-client": "13.2.0",
"rxjs": "7.8.1",
"uuid": "9.0.1"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import { HasKeychainEntryV1Endpoint } from "./web-services/has-keychain-entry-en
import { DefaultService } from "./generated/crpc/services/default_service_connect";
import { KeychainMemoryCrpcSvcOpenApi } from "./crpc-services/keychain-memory-crpc-svc-openapi";
import { ServiceType } from "@bufbuild/protobuf";
import { Observable, ReplaySubject, Subject } from "rxjs";

export interface IPluginKeychainMemoryOptions extends ICactusPluginOptions {
logLevel?: LogLevelDesc;
backend?: Map<string, string>;
keychainId: string;
prometheusExporter?: PrometheusExporter;
readonly observabilityBufferSize?: number;
readonly observabilityTtlSeconds?: number;
}

export class PluginKeychainMemory
Expand All @@ -42,6 +45,22 @@ export class PluginKeychainMemory
private readonly backend: Map<string, string>;
private readonly log: Logger;
private readonly instanceId: string;
private readonly observabilityBufferSize: number;
private readonly observabilityTtlSeconds: number;

private readonly getSubject: Subject<{
readonly key: string;
readonly value: string;
}>;
private readonly setSubject: Subject<{
readonly key: string;
readonly value: string;
}>;
private readonly hasSubject: Subject<{
readonly key: string;
readonly isPresent: boolean;
}>;
private readonly deleteSubject: Subject<{ readonly key: string }>;
private endpoints: IWebServiceEndpoint[] | undefined;
public prometheusExporter: PrometheusExporter;

Expand All @@ -56,13 +75,19 @@ export class PluginKeychainMemory
Checks.truthy(opts.instanceId, `${fnTag} options.instanceId`);
Checks.nonBlankString(opts.keychainId, `${fnTag} options.keychainId`);

this.observabilityBufferSize = opts.observabilityBufferSize || 1;
this.observabilityTtlSeconds = opts.observabilityTtlSeconds || 1;

this.backend = opts.backend || new Map();
Checks.truthy(this.backend, `${fnTag} arg options.backend`);

const level = this.opts.logLevel || "INFO";
const label = this.className;
this.log = LoggerProvider.getOrCreate({ level, label });

this.log.debug("observabilityBufferSize=%o", this.observabilityBufferSize);
this.log.debug("observabilityTtlSeconds=%o", this.observabilityTtlSeconds);

this.instanceId = this.opts.instanceId;
this.prometheusExporter =
opts.prometheusExporter ||
Expand All @@ -78,6 +103,23 @@ export class PluginKeychainMemory
`Never use ${this.className} in production. ` +
`It does not support encryption. It stores everything in plain text.`,
);

this.getSubject = new ReplaySubject(
this.observabilityBufferSize,
this.observabilityTtlSeconds,
);
this.setSubject = new ReplaySubject(
this.observabilityBufferSize,
this.observabilityTtlSeconds,
);
this.hasSubject = new ReplaySubject(
this.observabilityBufferSize,
this.observabilityTtlSeconds,
);
this.deleteSubject = new ReplaySubject(
this.observabilityBufferSize,
this.observabilityTtlSeconds,
);
}

public getOpenApiSpec(): unknown {
Expand Down Expand Up @@ -181,23 +223,52 @@ export class PluginKeychainMemory
async get(key: string): Promise<string> {
const value = this.backend.get(key);
if (value) {
this.getSubject.next({ key, value });
return value;
} else {
throw new Error(`Keychain entry for "${key}" not found.`);
}
}

public observeGet(): Observable<{ readonly key: string }> {
return this.getSubject.asObservable();
}

async has(key: string): Promise<boolean> {
return this.backend.has(key);
const isPresent = this.backend.has(key);
this.hasSubject.next({ key, isPresent });
return isPresent;
}

public observeHas(): Observable<{
readonly key: string;
readonly isPresent: boolean;
}> {
return this.hasSubject.asObservable();
}

async set(key: string, value: string): Promise<void> {
this.backend.set(key, value);
this.setSubject.next({ key, value });
this.prometheusExporter.setTotalKeyCounter(this.backend.size);
}

public observeSet(): Observable<{
readonly key: string;
readonly value: string;
}> {
return this.setSubject.asObservable();
}

async delete(key: string): Promise<void> {
this.backend.delete(key);
this.deleteSubject.next({ key });
this.prometheusExporter.setTotalKeyCounter(this.backend.size);
}

public observeDelete(): Observable<{
readonly key: string;
}> {
return this.deleteSubject.asObservable();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import "jest-extended";
import { v4 as uuidV4 } from "uuid";

import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common";

import { PluginKeychainMemory } from "../../../main/typescript/public-api";

const logLevel: LogLevelDesc = "INFO";

describe("PluginKeychainMemory", () => {
const log = LoggerProvider.getOrCreate({
label: "plugin-keychain-memory-observability.test.ts",
level: logLevel,
});

test("can observe set operations", async () => {
const keychain = new PluginKeychainMemory({
instanceId: uuidV4(),
keychainId: uuidV4(),
logLevel,
});

let getCount = 0;
const stratedAt = new Date();

const taskPromise = new Promise<void>((resolve) => {
keychain.observeSet().subscribe({
next: (value) => {
getCount++;
log.debug("NEXT_SET: startedAt=%o value=%o", stratedAt, value);
if (getCount >= 5) {
resolve();
}
},
});

keychain.set("some-key-that-does-not-matter-1", uuidV4());
keychain.set("some-key-that-does-not-matter-2", uuidV4());
keychain.set("some-key-that-does-not-matter-3", uuidV4());
keychain.set("some-key-that-does-not-matter-4", uuidV4());
keychain.set("some-key-that-does-not-matter-5", uuidV4());
});
await expect(taskPromise).toResolve();
}, 500);

test("can observe set operations with buffer", async () => {
const keychain = new PluginKeychainMemory({
instanceId: uuidV4(),
keychainId: uuidV4(),
logLevel,
observabilityBufferSize: 5,
observabilityTtlSeconds: 1000,
});

let getCount = 0;
const stratedAt = new Date();

keychain.set("some-key-that-does-not-matter-1", uuidV4());
keychain.set("some-key-that-does-not-matter-2", uuidV4());
keychain.set("some-key-that-does-not-matter-3", uuidV4());
keychain.set("some-key-that-does-not-matter-4", uuidV4());
keychain.set("some-key-that-does-not-matter-5", uuidV4());

const taskPromise = new Promise<void>((resolve) => {
keychain.observeSet().subscribe({
next: (value) => {
getCount++;
log.debug("NEXT_SET_1: startedAt=%o value=%o", stratedAt, value);
},
});
keychain.observeSet().subscribe({
next: (value) => {
getCount++;
log.debug("NEXT_SET_2: startedAt=%o value=%o", stratedAt, value);
if (getCount >= 10) {
resolve();
}
},
});
});

await expect(taskPromise).toResolve();
}, 500);
});
1 change: 1 addition & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8457,6 +8457,7 @@ __metadata:
express: "npm:4.19.2"
npm-run-all2: "npm:6.1.2"
prom-client: "npm:13.2.0"
rxjs: "npm:7.8.1"
uuid: "npm:9.0.1"
languageName: unknown
linkType: soft
Expand Down

1 comment on commit 9b41377

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 0.05.

Benchmark suite Current: 9b41377 Previous: 8ff9b65 Ratio
plugin-ledger-connector-besu_HTTP_GET_getOpenApiSpecV1 725 ops/sec (±3.57%) 779 ops/sec (±2.52%) 1.07

This comment was automatically generated by workflow using github-action-benchmark.

CC: @petermetz

Please sign in to comment.