Skip to content

Commit

Permalink
Merge pull request #816 from ChainSafe/tuyen/refactor-req-resp
Browse files Browse the repository at this point in the history
Refactor req/resp to better handle different method types
  • Loading branch information
wemeetagain authored Apr 27, 2020
2 parents 59600f8 + 6fac4c3 commit d112ae3
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 57 deletions.
43 changes: 43 additions & 0 deletions packages/lodestar/src/constants/network.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// gossip

import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Type} from "@chainsafe/ssz";

export const ATTESTATION_SUBNET_COUNT = 64;
export const ATTESTATION_PROPAGATION_SLOT_RANGE = 23;
export const MAXIMUM_GOSSIP_CLOCK_DISPARITY = 500;
Expand All @@ -16,6 +19,46 @@ export enum Method {
BeaconBlocksByRoot = "beacon_blocks_by_root",
}

export enum MethodResponseType {
SingleRespone = "SingleRespone",
NoResponse = "NoResponse",
Stream = "Stream",
}

export const Methods = {
[Method.Status]: {
requestSSZType: (config: IBeaconConfig) => config.types.Status,
responseSSZType: (config: IBeaconConfig) => config.types.Status,
responseType: MethodResponseType.SingleRespone
},
[Method.Goodbye]: {
requestSSZType: (config: IBeaconConfig) => config.types.Goodbye,
responseSSZType: (config: IBeaconConfig) => config.types.Goodbye,
responseType: MethodResponseType.NoResponse
},
[Method.Ping]: {
requestSSZType: (config: IBeaconConfig) => config.types.Ping,
responseSSZType: (config: IBeaconConfig) => config.types.Ping,
responseType: MethodResponseType.SingleRespone
},
[Method.Metadata]: {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
requestSSZType: (config: IBeaconConfig): Type<unknown> => undefined,
responseSSZType: (config: IBeaconConfig) => config.types.Metadata,
responseType: MethodResponseType.SingleRespone
},
[Method.BeaconBlocksByRange]: {
requestSSZType: (config: IBeaconConfig) => config.types.BeaconBlocksByRangeRequest,
responseSSZType: (config: IBeaconConfig) => config.types.SignedBeaconBlock,
responseType: MethodResponseType.Stream
},
[Method.BeaconBlocksByRoot]: {
requestSSZType: (config: IBeaconConfig) => config.types.BeaconBlocksByRootRequest,
responseSSZType: (config: IBeaconConfig) => config.types.SignedBeaconBlock,
responseType: MethodResponseType.Stream
}
};

export enum ReqRespEncoding {
SSZ = "ssz",
SSZ_SNAPPY = "ssz_snappy",
Expand Down
5 changes: 4 additions & 1 deletion packages/lodestar/src/network/encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ export class ReqRespEncoder {
}, data) as Buffer;
return this.writeLengthPrefixed(encodedPayload);
}

public decodeRequest(method: Method, data: Buffer): RequestBody {
if (!data) {
return undefined;
}
data = this.readLengthPrefixed(data);
const type = getRequestMethodSSZType(this.config, method);
//decoding is done backwards
Expand Down
31 changes: 15 additions & 16 deletions packages/lodestar/src/network/reqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import {
Status,
} from "@chainsafe/lodestar-types";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Method, ReqRespEncoding, RequestId, RESP_TIMEOUT, RpcErrorCode, TTFB_TIMEOUT,} from "../constants";
import {Method, ReqRespEncoding, RequestId, RESP_TIMEOUT, RpcErrorCode, TTFB_TIMEOUT} from "../constants";
import {ILogger} from "@chainsafe/lodestar-utils/lib/logger";
import {createResponseEvent, createRpcProtocol, randomRequestId} from "./util";
import {createResponseEvent, createRpcProtocol, randomRequestId, isRequestOnly, isRequestSingleChunk} from "./util";
import {IReqResp, ReqEventEmitter, RespEventEmitter, ResponseCallbackFn, ResponseChunk} from "./interface";
import {INetworkOptions} from "./options";
import PeerId from "peer-id";
Expand Down Expand Up @@ -131,7 +131,7 @@ export class ReqResp extends (EventEmitter as IReqEventEmitterClass) implements
}
public async goodbye(peerInfo: PeerInfo, request: Goodbye): Promise<void> {
try {
await this.sendRequest<Goodbye>(peerInfo, Method.Goodbye, request, true);
await this.sendRequest<Goodbye>(peerInfo, Method.Goodbye, request);
} catch (e) {
this.logger.warn("Failed to send goodbye request");
}
Expand Down Expand Up @@ -161,21 +161,20 @@ export class ReqResp extends (EventEmitter as IReqEventEmitterClass) implements
const getResponse = this.getResponse;
return (source: AsyncIterable<Buffer|BufferList>) => {
return (async function * () {
if (method === Method.Metadata) {
yield* getResponse(peerId, method, undefined);
return;
}
let data: Buffer;
// source is an array of 1 item or empty array
for await (const val of source) {
const data = Buffer.isBuffer(val) ? val : val.slice();
data = Buffer.isBuffer(val) ? val : val.slice();
yield* getResponse(peerId, method, data);
break;
return;
}
yield* getResponse(peerId, method, data);
})();
};
}

private getResponse = (peerId: PeerId, method: Method, data: Buffer): AsyncIterable<ResponseChunk> => {
const request = method === Method.Metadata ? undefined : this.encoder.decodeRequest(method, data);
const request = this.encoder.decodeRequest(method, data);
const requestId = randomRequestId();
this.logger.verbose(`${requestId} - receive ${method} request from ${peerId.toB58String()}`);
// eslint-disable-next-line
Expand All @@ -188,7 +187,7 @@ export class ReqResp extends (EventEmitter as IReqEventEmitterClass) implements
responseTimer = this.responseListener.waitForResponse(requestId, responseListenerFn);
this.emit("request", new PeerInfo(peerId), method, requestId, request);
});

return (async function * () {
const responseIter = await promise;
yield* responseIter;
Expand All @@ -199,8 +198,9 @@ export class ReqResp extends (EventEmitter as IReqEventEmitterClass) implements
peerInfo: PeerInfo,
method: Method,
body?: RequestBody,
requestOnly?: boolean
): Promise<T> {
const requestOnly = isRequestOnly(method);
const requestSingleChunk = isRequestSingleChunk(method);
return await new Promise((resolve, reject) => {
let responseTimer = setTimeout(() => reject(new RpcError(RpcErrorCode.ERR_RESP_TIMEOUT)), TTFB_TIMEOUT);
const renewTimer = (): void => {
Expand All @@ -220,13 +220,12 @@ export class ReqResp extends (EventEmitter as IReqEventEmitterClass) implements
responses.push(response);
}
cancelTimer();
if (!requestOnly && method === Method.Status && responses.length === 0) {
if (requestSingleChunk && responses.length === 0) {
// allow empty response for beacon blocks by range/root
reject(`No response returned for method ${method}`);
return;
}
const finalResponse =
[Method.Status, Method.Ping, Method.Metadata].includes(method) ? responses[0] : responses;
const finalResponse = requestSingleChunk ? responses[0] : responses;
this.logger.verbose(`receive ${method} response from ${peerInfo.id.toB58String()}`);
resolve(requestOnly? undefined : finalResponse as T);
});
Expand All @@ -243,7 +242,7 @@ export class ReqResp extends (EventEmitter as IReqEventEmitterClass) implements
body?: RequestBody,
): AsyncIterable<T> {
const {encoder, libp2p, logger} = this;

return (async function * () {
const protocol = createRpcProtocol(method, encoder.encoding);
const {stream} = await libp2p.dialProtocol(peerInfo, protocol) as {stream: Stream};
Expand Down
51 changes: 11 additions & 40 deletions packages/lodestar/src/network/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import PeerId from "peer-id";
import PeerInfo from "peer-info";
import {Type} from "@chainsafe/ssz";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Method, RequestId} from "../constants";
import {Method, RequestId, Methods, MethodResponseType} from "../constants";

// req/resp

Expand Down Expand Up @@ -56,48 +56,19 @@ export async function initializePeerInfo(peerId: PeerId, multiaddrs: string[]):
export function getRequestMethodSSZType(
config: IBeaconConfig, method: Method
): Type<any> {
let type: Type<any>;
switch (method) {
case Method.Status:
type = config.types.Status;
break;
case Method.Goodbye:
type = config.types.Goodbye;
break;
case Method.Ping:
type = config.types.Ping;
break;
case Method.BeaconBlocksByRange:
type = config.types.BeaconBlocksByRangeRequest;
break;
case Method.BeaconBlocksByRoot:
type = config.types.BeaconBlocksByRootRequest;
break;
}
return type;
return Methods[method].requestSSZType(config);
}

export function getResponseMethodSSZType(
config: IBeaconConfig, method: Method
): Type<any> {
let type: Type<any>;
switch (method) {
case Method.Status:
type = config.types.Status;
break;
case Method.Goodbye:
type = config.types.Goodbye;
break;
case Method.Ping:
type = config.types.Ping;
break;
case Method.Metadata:
type = config.types.Metadata;
break;
case Method.BeaconBlocksByRange:
case Method.BeaconBlocksByRoot:
type = config.types.SignedBeaconBlock;
break;
}
return type;
return Methods[method].responseSSZType(config);
}

export function isRequestOnly(method: Method): boolean {
return Methods[method].responseType === MethodResponseType.NoResponse;
}

export function isRequestSingleChunk(method: Method): boolean {
return Methods[method].responseType === MethodResponseType.SingleRespone;
}

0 comments on commit d112ae3

Please sign in to comment.