Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Commit

Permalink
Merge pull request #1080 from 0xProject/upgradeBlockstream
Browse files Browse the repository at this point in the history
Fix dropped events issue in Order-watcher and Contract-wrappers subscriptions
  • Loading branch information
fabioberger authored Sep 25, 2018
2 parents 78ef98c + 977d55c commit 7570f3d
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 75 deletions.
23 changes: 19 additions & 4 deletions packages/contract-wrappers/CHANGELOG.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
[
{
"timestamp": 1537875740,
"version": "2.0.0",
"changes": [
{
"note":
"Fixes dropped events in subscriptions by fetching logs by blockHash instead of blockNumber. Support for fetching by blockHash was added in Geth > v1.8.13 and Parity > v2.1.0. Infura works too.",
"pr": 1080
},
{
"note":
"Fix misunderstanding about blockstream interface callbacks and pass the raw JSON RPC responses to it",
"pr": 1080
}
]
},
{
"version": "1.0.5",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537875740
},
{
"timestamp": 1537541580,
"version": "1.0.4",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537541580
},
{
"version": "1.0.3",
Expand Down
2 changes: 1 addition & 1 deletion packages/contract-wrappers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
"@0xproject/utils": "^1.0.10",
"@0xproject/web3-wrapper": "^3.0.0",
"ethereum-types": "^1.0.7",
"ethereumjs-blockstream": "5.0.0",
"ethereumjs-blockstream": "6.0.0",
"ethereumjs-util": "^5.1.1",
"ethers": "3.0.22",
"js-sha3": "^0.7.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { AbiDecoder, intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import { marshaller, Web3Wrapper } from '@0xproject/web3-wrapper';
import {
BlockParamLiteral,
BlockWithoutTransactionData,
ContractAbi,
ContractArtifact,
FilterObject,
LogEntry,
LogWithDecodedArgs,
RawLog,
RawLogEntry,
} from 'ethereum-types';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';
Expand Down Expand Up @@ -158,7 +158,8 @@ export abstract class ContractWrapper {
return addressIfExists;
}
}
private _onLogStateChanged<ArgsType extends ContractEventArgs>(isRemoved: boolean, log: LogEntry): void {
private _onLogStateChanged<ArgsType extends ContractEventArgs>(isRemoved: boolean, rawLog: RawLogEntry): void {
const log: LogEntry = marshaller.unmarshalLog(rawLog);
_.forEach(this._filters, (filter: FilterObject, filterToken: string) => {
if (filterUtils.matchesFilter(log, filter)) {
const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs<ArgsType>;
Expand All @@ -175,8 +176,8 @@ export abstract class ContractWrapper {
throw new Error(ContractWrappersError.SubscriptionAlreadyPresent);
}
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
this._getBlockOrNullAsync.bind(this),
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
this._blockstreamGetBlockOrNullAsync.bind(this),
this._blockstreamGetLogsAsync.bind(this),
ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
);
const catchAllLogFilter = {};
Expand All @@ -196,12 +197,30 @@ export abstract class ContractWrapper {
);
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _getBlockOrNullAsync(): Promise<BlockWithoutTransactionData | null> {
const blockIfExists = await this._web3Wrapper.getBlockIfExistsAsync.bind(this._web3Wrapper);
if (_.isUndefined(blockIfExists)) {
return null;
}
return blockIfExists;
private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByHash',
params: [hash, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByNumber',
params: [BlockParamLiteral.Latest, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> {
const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({
method: 'eth_getLogs',
params: [filterOptions],
});
return logs as RawLogEntry[];
}
// HACK: This should be a package-scoped method (which doesn't exist in TS)
// We don't want this method available in the public interface for all classes
Expand All @@ -221,14 +240,14 @@ export abstract class ContractWrapper {
delete this._blockAndLogStreamerIfExists;
}
private async _reconcileBlockAsync(): Promise<void> {
const latestBlockIfExists = await this._web3Wrapper.getBlockIfExistsAsync(BlockParamLiteral.Latest);
if (_.isUndefined(latestBlockIfExists)) {
const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync();
if (_.isNull(latestBlockOrNull)) {
return; // noop
}
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlockIfExists as any) as Block);
await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull);
}
}
}
1 change: 1 addition & 0 deletions packages/ethereum-types/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export interface CallData extends CallTxDataBase {
export interface FilterObject {
fromBlock?: number | string;
toBlock?: number | string;
blockHash?: string;
address?: string;
topics?: LogTopic[];
}
Expand Down
23 changes: 19 additions & 4 deletions packages/order-watcher/CHANGELOG.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
[
{
"timestamp": 1537875740,
"version": "2.0.0",
"changes": [
{
"note":
"Fixes dropped events issue by fetching logs by blockHash instead of blockNumber. Support for fetching by blockHash was added in Geth > v1.8.13 and Parity > v2.1.0. Infura works too.",
"pr": 1080
},
{
"note":
"Fix misunderstanding about blockstream interface callbacks and pass the raw JSON RPC responses to it",
"pr": 1080
}
]
},
{
"version": "1.0.5",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537875740
},
{
"timestamp": 1537541580,
"version": "1.0.4",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537541580
},
{
"version": "1.0.3",
Expand Down
2 changes: 1 addition & 1 deletion packages/order-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
"@0xproject/web3-wrapper": "^3.0.0",
"bintrees": "^1.0.2",
"ethereum-types": "^1.0.7",
"ethereumjs-blockstream": "5.0.0",
"ethereumjs-blockstream": "6.0.0",
"ethers": "3.0.22",
"lodash": "^4.17.5"
},
Expand Down
49 changes: 33 additions & 16 deletions packages/order-watcher/src/order_watcher/event_watcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import { BlockParamLiteral, BlockWithoutTransactionData, LogEntry, Provider } from 'ethereum-types';
import { marshaller, Web3Wrapper } from '@0xproject/web3-wrapper';
import { BlockParamLiteral, FilterObject, LogEntry, Provider, RawLogEntry } from 'ethereum-types';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';

Expand All @@ -20,7 +20,6 @@ enum LogEventState {
*/
export class EventWatcher {
private readonly _web3Wrapper: Web3Wrapper;
private readonly _stateLayer: BlockParamLiteral;
private readonly _isVerbose: boolean;
private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
Expand All @@ -35,7 +34,6 @@ export class EventWatcher {
) {
this._isVerbose = isVerbose;
this._web3Wrapper = new Web3Wrapper(provider);
this._stateLayer = stateLayer;
this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
? DEFAULT_EVENT_POLLING_INTERVAL_MS
: pollingIntervalIfExistsMs;
Expand All @@ -62,8 +60,8 @@ export class EventWatcher {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
this._getBlockOrNullAsync.bind(this),
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
this._blockstreamGetBlockOrNullAsync.bind(this),
this._blockstreamGetLogsAsync.bind(this),
this._onBlockAndLogStreamerError.bind(this),
);
const catchAllLogFilter = {};
Expand All @@ -83,12 +81,30 @@ export class EventWatcher {
);
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _getBlockOrNullAsync(): Promise<BlockWithoutTransactionData | null> {
const blockIfExists = await this._web3Wrapper.getBlockIfExistsAsync.bind(this._web3Wrapper);
if (_.isUndefined(blockIfExists)) {
return null;
}
return blockIfExists;
private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByHash',
params: [hash, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByNumber',
params: [BlockParamLiteral.Latest, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> {
const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({
method: 'eth_getLogs',
params: [filterOptions],
});
return logs as RawLogEntry[];
}
private _stopBlockAndLogStream(): void {
if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
Expand All @@ -103,19 +119,20 @@ export class EventWatcher {
private async _onLogStateChangedAsync(
callback: EventWatcherCallback,
isRemoved: boolean,
log: LogEntry,
rawLog: RawLogEntry,
): Promise<void> {
const log: LogEntry = marshaller.unmarshalLog(rawLog);
await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
}
private async _reconcileBlockAsync(): Promise<void> {
const latestBlockIfExists = await this._web3Wrapper.getBlockIfExistsAsync(this._stateLayer);
if (_.isUndefined(latestBlockIfExists)) {
const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync();
if (_.isNull(latestBlockOrNull)) {
return; // noop
}
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlockIfExists as any) as Block);
await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull);
}
}
private async _emitDifferencesAsync(
Expand Down
5 changes: 5 additions & 0 deletions packages/web3-wrapper/CHANGELOG.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
"note":
"Rename `getBlockAsync` to `getBlockIfExistsAsync` and rather then throw if the requested block wasn't found, return undefined.",
"pr": 1082
},
{
"note":
"Expose `sendRawPayloadAsync` so one can easily extend `Web3Wrapper` with their own custom JSON RPC calls",
"pr": 1080
}
],
"timestamp": 1537875740
Expand Down
Loading

0 comments on commit 7570f3d

Please sign in to comment.