Skip to content

Commit

Permalink
refactor(experimental): add cluster level subscriptions API for library
Browse files Browse the repository at this point in the history
This change adds the cluster-level APIs to the main library's subscriptions.
  • Loading branch information
buffalojoec authored Feb 2, 2024
1 parent cbf8f38 commit 0e03ca9
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 30 deletions.
169 changes: 164 additions & 5 deletions packages/library/src/__typetests__/create-rpc-subscriptions.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,176 @@
import { SolanaRpcSubscriptions, SolanaRpcSubscriptionsUnstable } from '@solana/rpc-core';
import { createJsonSubscriptionRpc } from '@solana/rpc-transport';
import type { RpcSubscriptions } from '@solana/rpc-types';
import {
createWebSocketTransport,
IRpcWebSocketTransport,
IRpcWebSocketTransportDevnet,
IRpcWebSocketTransportMainnet,
IRpcWebSocketTransportTestnet,
RpcSubscriptionsDevnet,
RpcSubscriptionsMainnet,
RpcSubscriptionsTestnet,
} from '@solana/rpc-transport';
import { devnet, mainnet, RpcSubscriptions, testnet } from '@solana/rpc-types';

import { createSolanaRpcSubscriptions, createSolanaRpcSubscriptions_UNSTABLE } from '../rpc';
import { createDefaultRpcSubscriptionsTransport } from '../rpc-websocket-transport';

const config = null as unknown as Omit<Parameters<typeof createJsonSubscriptionRpc>[0], 'api'>;
// Creating default websocket transports

createSolanaRpcSubscriptions(config) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
const genericUrl = 'http://localhost:8899';
const devnetUrl = devnet('https://api.devnet.solana.com');
const testnetUrl = testnet('https://api.testnet.solana.com');
const mainnetUrl = mainnet('https://api.mainnet-beta.solana.com');

// No cluster specified should be generic `IRpcWebSocketTransport`
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransport;
//@ts-expect-error Should not be a devnet transport
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransportDevnet;
//@ts-expect-error Should not be a testnet transport
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransportTestnet;
//@ts-expect-error Should not be a mainnet transport
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransportMainnet;

// Devnet cluster should be `IRpcWebSocketTransportDevnet`
createDefaultRpcSubscriptionsTransport({ url: devnetUrl }) satisfies IRpcWebSocketTransportDevnet;
//@ts-expect-error Should not be a testnet transport
createDefaultRpcSubscriptionsTransport({ url: devnetUrl }) satisfies IRpcWebSocketTransportTestnet;
//@ts-expect-error Should not be a mainnet transport
createDefaultRpcSubscriptionsTransport({ url: devnetUrl }) satisfies IRpcWebSocketTransportMainnet;

// Testnet cluster should be `IRpcWebSocketTransportTestnet`
createDefaultRpcSubscriptionsTransport({ url: testnetUrl }) satisfies IRpcWebSocketTransportTestnet;
//@ts-expect-error Should not be a devnet transport
createDefaultRpcSubscriptionsTransport({ url: testnetUrl }) satisfies IRpcWebSocketTransportDevnet;
//@ts-expect-error Should not be a mainnet transport
createDefaultRpcSubscriptionsTransport({ url: testnetUrl }) satisfies IRpcWebSocketTransportMainnet;

// Mainnet cluster should be `IRpcWebSocketTransportMainnet`
createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }) satisfies IRpcWebSocketTransportMainnet;
//@ts-expect-error Should not be a devnet transport
createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }) satisfies IRpcWebSocketTransportDevnet;
//@ts-expect-error Should not be a testnet transport
createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }) satisfies IRpcWebSocketTransportTestnet;

// Creating JSON Subscription RPC clients

const genericWebSocketTransport = createWebSocketTransport({
sendBufferHighWatermark: 0,
url: genericUrl,
});
const devnetWebSocketTransport = createWebSocketTransport({
sendBufferHighWatermark: 0,
url: devnetUrl,
});
const testnetWebSocketTransport = createWebSocketTransport({
sendBufferHighWatermark: 0,
url: testnetUrl,
});
const mainnetWebSocketTransport = createWebSocketTransport({
sendBufferHighWatermark: 0,
url: mainnetUrl,
});

// Checking stable vs unstable subscriptions

createSolanaRpcSubscriptions({
transport: genericWebSocketTransport,
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
// @ts-expect-error Should not have unstable subscriptions
createSolanaRpcSubscriptions(config) satisfies RpcSubscriptions<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;

createSolanaRpcSubscriptions_UNSTABLE(config) satisfies RpcSubscriptions<
createSolanaRpcSubscriptions_UNSTABLE({ transport: genericWebSocketTransport }) satisfies RpcSubscriptions<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;

// Checking cluster-level subscriptions API

// No cluster specified should be generic `RpcSubscriptions`
createSolanaRpcSubscriptions({
transport: genericWebSocketTransport,
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: genericWebSocketTransport,
//@ts-expect-error Should not be a devnet RPC
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: genericWebSocketTransport,
//@ts-expect-error Should not be a testnet RPC
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: genericWebSocketTransport,
//@ts-expect-error Should not be a mainnet RPC
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;

// Devnet cluster should be `RpcSubscriptionsDevnet`
createSolanaRpcSubscriptions({
transport: devnetWebSocketTransport,
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: devnetWebSocketTransport,
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: devnetWebSocketTransport,
//@ts-expect-error Should not be a testnet RPC
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: devnetWebSocketTransport,
//@ts-expect-error Should not be a mainnet RPC
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
//@ts-expect-error Should not have unstable subscriptions
createSolanaRpcSubscriptions({ transport: devnetWebSocketTransport }) satisfies RpcSubscriptionsDevnet<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;
// Unstable methods present with proper initializer
createSolanaRpcSubscriptions_UNSTABLE({ transport: devnetWebSocketTransport }) satisfies RpcSubscriptionsDevnet<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;

// Testnet cluster should be `RpcSubscriptionsTestnet`
createSolanaRpcSubscriptions({
transport: testnetWebSocketTransport,
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: testnetWebSocketTransport,
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: testnetWebSocketTransport,
//@ts-expect-error Should not be a devnet RPC
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: testnetWebSocketTransport,
//@ts-expect-error Should not be a mainnet RPC
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
//@ts-expect-error Should not have unstable subscriptions
createSolanaRpcSubscriptions({ transport: testnetWebSocketTransport }) satisfies RpcSubscriptionsTestnet<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;
// Unstable methods present with proper initializer
createSolanaRpcSubscriptions_UNSTABLE({ transport: testnetWebSocketTransport }) satisfies RpcSubscriptionsTestnet<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;

// Mainnet cluster should be `RpcSubscriptionsMainnet`
createSolanaRpcSubscriptions({
transport: mainnetWebSocketTransport,
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: mainnetWebSocketTransport,
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: mainnetWebSocketTransport,
//@ts-expect-error Should not be a devnet RPC
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
createSolanaRpcSubscriptions({
transport: mainnetWebSocketTransport,
//@ts-expect-error Should not be a testnet RPC
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
//@ts-expect-error Should not have unstable subscriptions
createSolanaRpcSubscriptions({ transport: mainnetWebSocketTransport }) satisfies RpcSubscriptionsMainnet<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;
// Unstable methods present with proper initializer
createSolanaRpcSubscriptions_UNSTABLE({ transport: mainnetWebSocketTransport }) satisfies RpcSubscriptionsMainnet<
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
>;
13 changes: 8 additions & 5 deletions packages/library/src/rpc-websocket-autopinger.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import type { IRpcWebSocketTransport } from '@solana/rpc-transport';

type Config = Readonly<{
type Config<TTransport extends IRpcWebSocketTransport> = Readonly<{
intervalMs: number;
transport: IRpcWebSocketTransport;
transport: TTransport;
}>;

const PING_PAYLOAD = {
jsonrpc: '2.0',
method: 'ping',
} as const;

export function getWebSocketTransportWithAutoping({ intervalMs, transport }: Config): IRpcWebSocketTransport {
export function getWebSocketTransportWithAutoping<TTransport extends IRpcWebSocketTransport>({
intervalMs,
transport,
}: Config<TTransport>): TTransport {
const pingableConnections = new Map<
Awaited<ReturnType<IRpcWebSocketTransport>>,
Awaited<ReturnType<IRpcWebSocketTransport>>
>();
return async (...args) => {
return (async (...args) => {
const connection = await transport(...args);
let intervalId: number | undefined;
function sendPing() {
Expand Down Expand Up @@ -72,5 +75,5 @@ export function getWebSocketTransportWithAutoping({ intervalMs, transport }: Con
}
}
return pingableConnections.get(connection)!;
};
}) as TTransport;
}
11 changes: 7 additions & 4 deletions packages/library/src/rpc-websocket-connection-sharding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ import type { IRpcWebSocketTransport } from '@solana/rpc-transport';

import { getCachedAbortableIterableFactory } from './cached-abortable-iterable';

type Config = Readonly<{
type Config<TTransport extends IRpcWebSocketTransport> = Readonly<{
/**
* You might like to open more subscriptions per connection than your RPC provider allows for.
* Using the initial payload as input, return a shard key from this method to assign
* subscriptions to separate connections. One socket will be opened per shard key.
*/
getShard?: (payload: unknown) => string | symbol;
transport: IRpcWebSocketTransport;
transport: TTransport;
}>;

const NULL_SHARD_CACHE_KEY = Symbol(
__DEV__ ? 'Cache key to use when there is no connection sharding strategy' : undefined,
);

export function getWebSocketTransportWithConnectionSharding({ getShard, transport }: Config): IRpcWebSocketTransport {
export function getWebSocketTransportWithConnectionSharding<TTransport extends IRpcWebSocketTransport>({
getShard,
transport,
}: Config<TTransport>): TTransport {
return getCachedAbortableIterableFactory({
getAbortSignalFromInputArgs: ({ signal }) => signal,
getCacheEntryMissingError(shardKey) {
Expand All @@ -30,5 +33,5 @@ export function getWebSocketTransportWithConnectionSharding({ getShard, transpor
...config,
signal: abortSignal,
}),
});
}) as TTransport;
}
15 changes: 10 additions & 5 deletions packages/library/src/rpc-websocket-transport.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { pipe } from '@solana/functional';
import { createWebSocketTransport, type IRpcWebSocketTransport } from '@solana/rpc-transport';
import { createWebSocketTransport } from '@solana/rpc-transport';
import { ClusterUrl } from '@solana/rpc-types';

import { getWebSocketTransportWithAutoping } from './rpc-websocket-autopinger';
import { getWebSocketTransportWithConnectionSharding } from './rpc-websocket-connection-sharding';

export function createDefaultRpcSubscriptionsTransport(
config: Omit<Parameters<typeof createWebSocketTransport>[0], 'sendBufferHighWatermark'> & {
type Config<TClusterUrl extends ClusterUrl> = Readonly<{
url: TClusterUrl;
}>;

export function createDefaultRpcSubscriptionsTransport<TClusterUrl extends ClusterUrl>(
config: Config<TClusterUrl> & {
/**
* You might like to open more subscriptions per connection than your RPC provider allows
* for. Using the initial payload as input, return a shard key from this method to assign
Expand All @@ -15,10 +20,10 @@ export function createDefaultRpcSubscriptionsTransport(
intervalMs?: number;
sendBufferHighWatermark?: number;
},
): IRpcWebSocketTransport {
) {
const { getShard, intervalMs, ...rest } = config;
return pipe(
createWebSocketTransport({
createWebSocketTransport<TClusterUrl>({
...rest,
sendBufferHighWatermark:
config.sendBufferHighWatermark ??
Expand Down
35 changes: 24 additions & 11 deletions packages/library/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ import {
SolanaRpcSubscriptions,
SolanaRpcSubscriptionsUnstable,
} from '@solana/rpc-core';
import { createJsonRpc, createJsonSubscriptionRpc, IRpcTransport, type RpcFromTransport } from '@solana/rpc-transport';
import { IRpcTransportWithCluster } from '@solana/rpc-transport/dist/types/transports/transport-types';
import type { RpcSubscriptions } from '@solana/rpc-types';
import {
createJsonRpc,
createJsonSubscriptionRpc,
IRpcTransport,
IRpcTransportWithCluster,
IRpcWebSocketTransport,
IRpcWebSocketTransportWithCluster,
RpcFromTransport,
RpcSubscriptionsFromTransport,
} from '@solana/rpc-transport';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import fastStableStringify from 'fast-stable-stringify';
Expand All @@ -31,9 +38,13 @@ export function createSolanaRpc<TTransport extends IRpcTransport | IRpcTransport
}) as RpcFromTransport<SolanaRpcMethodsFromTransport<TTransport>, TTransport>;
}

export function createSolanaRpcSubscriptions(
config: Omit<Parameters<typeof createJsonSubscriptionRpc>[0], 'api'>,
): RpcSubscriptions<SolanaRpcSubscriptions> {
type RpcSubscriptionsConfig<TTransport extends IRpcWebSocketTransport | IRpcWebSocketTransportWithCluster> = Readonly<{
transport: TTransport;
}>;

export function createSolanaRpcSubscriptions<
TTransport extends IRpcWebSocketTransport | IRpcWebSocketTransportWithCluster,
>(config: RpcSubscriptionsConfig<TTransport>): RpcSubscriptionsFromTransport<SolanaRpcSubscriptions, TTransport> {
return pipe(
createJsonSubscriptionRpc({
...config,
Expand All @@ -44,14 +55,16 @@ export function createSolanaRpcSubscriptions(
getDeduplicationKey: (...args) => fastStableStringify(args),
rpcSubscriptions,
}),
);
) as RpcSubscriptionsFromTransport<SolanaRpcSubscriptions, TTransport>;
}

export function createSolanaRpcSubscriptions_UNSTABLE(
config: Omit<Parameters<typeof createJsonSubscriptionRpc>[0], 'api'>,
): RpcSubscriptions<SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable> {
export function createSolanaRpcSubscriptions_UNSTABLE<
TTransport extends IRpcWebSocketTransport | IRpcWebSocketTransportWithCluster,
>(
config: RpcSubscriptionsConfig<TTransport>,
): RpcSubscriptionsFromTransport<SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable, TTransport> {
return createJsonSubscriptionRpc({
...config,
api: createSolanaRpcSubscriptionsApi_UNSTABLE(DEFAULT_RPC_CONFIG),
});
}) as RpcSubscriptionsFromTransport<SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable, TTransport>;
}

0 comments on commit 0e03ca9

Please sign in to comment.