Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Fix dropped events issue in Order-watcher and Contract-wrappers subscriptions #1080

Merged
merged 16 commits into from
Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions packages/contract-wrappers/CHANGELOG.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
[
{
"timestamp": 1537875740,
"version": "2.0.0",
"changes": [
{
"note":
"Fixes dropped events in subscriptions by fetching logs by blockHash instead of blockNumber. Support for fetching by blockHash was added in Geth > v1.8.13 and Parity > v2.1.0. Infura works too.",
"pr": 1080
},
{
"note":
"Fix misunderstanding about blockstream interface callbacks and pass the raw JSON RPC responses to it",
"pr": 1080
}
]
},
{
"version": "1.0.5",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537875740
},
{
"timestamp": 1537541580,
"version": "1.0.4",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537541580
},
{
"version": "1.0.3",
Expand Down
2 changes: 1 addition & 1 deletion packages/contract-wrappers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
"@0xproject/utils": "^1.0.10",
"@0xproject/web3-wrapper": "^3.0.0",
"ethereum-types": "^1.0.7",
"ethereumjs-blockstream": "5.0.0",
"ethereumjs-blockstream": "6.0.0",
"ethereumjs-util": "^5.1.1",
"ethers": "3.0.22",
"js-sha3": "^0.7.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { AbiDecoder, intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import { marshaller, Web3Wrapper } from '@0xproject/web3-wrapper';
import {
BlockParamLiteral,
BlockWithoutTransactionData,
ContractAbi,
ContractArtifact,
FilterObject,
LogEntry,
LogWithDecodedArgs,
RawLog,
RawLogEntry,
} from 'ethereum-types';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';
Expand Down Expand Up @@ -158,7 +158,8 @@ export abstract class ContractWrapper {
return addressIfExists;
}
}
private _onLogStateChanged<ArgsType extends ContractEventArgs>(isRemoved: boolean, log: LogEntry): void {
private _onLogStateChanged<ArgsType extends ContractEventArgs>(isRemoved: boolean, rawLog: RawLogEntry): void {
const log: LogEntry = marshaller.unmarshalLog(rawLog);
_.forEach(this._filters, (filter: FilterObject, filterToken: string) => {
if (filterUtils.matchesFilter(log, filter)) {
const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs<ArgsType>;
Expand All @@ -175,8 +176,8 @@ export abstract class ContractWrapper {
throw new Error(ContractWrappersError.SubscriptionAlreadyPresent);
}
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
this._getBlockOrNullAsync.bind(this),
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
this._blockstreamGetBlockOrNullAsync.bind(this),
this._blockstreamGetLogsAsync.bind(this),
ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
);
const catchAllLogFilter = {};
Expand All @@ -196,12 +197,30 @@ export abstract class ContractWrapper {
);
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _getBlockOrNullAsync(): Promise<BlockWithoutTransactionData | null> {
const blockIfExists = await this._web3Wrapper.getBlockIfExistsAsync.bind(this._web3Wrapper);
if (_.isUndefined(blockIfExists)) {
return null;
}
return blockIfExists;
private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByHash',
params: [hash, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByNumber',
params: [BlockParamLiteral.Latest, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> {
const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({
method: 'eth_getLogs',
params: [filterOptions],
});
return logs as RawLogEntry[];
}
// HACK: This should be a package-scoped method (which doesn't exist in TS)
// We don't want this method available in the public interface for all classes
Expand All @@ -221,14 +240,14 @@ export abstract class ContractWrapper {
delete this._blockAndLogStreamerIfExists;
}
private async _reconcileBlockAsync(): Promise<void> {
const latestBlockIfExists = await this._web3Wrapper.getBlockIfExistsAsync(BlockParamLiteral.Latest);
if (_.isUndefined(latestBlockIfExists)) {
const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync();
if (_.isNull(latestBlockOrNull)) {
return; // noop
}
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlockIfExists as any) as Block);
await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull);
}
}
}
1 change: 1 addition & 0 deletions packages/ethereum-types/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export interface CallData extends CallTxDataBase {
export interface FilterObject {
fromBlock?: number | string;
toBlock?: number | string;
blockHash?: string;
address?: string;
topics?: LogTopic[];
}
Expand Down
23 changes: 19 additions & 4 deletions packages/order-watcher/CHANGELOG.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
[
{
"timestamp": 1537875740,
"version": "2.0.0",
"changes": [
{
"note":
"Fixes dropped events issue by fetching logs by blockHash instead of blockNumber. Support for fetching by blockHash was added in Geth > v1.8.13 and Parity > v2.1.0. Infura works too.",
"pr": 1080
},
{
"note":
"Fix misunderstanding about blockstream interface callbacks and pass the raw JSON RPC responses to it",
"pr": 1080
}
]
},
{
"version": "1.0.5",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537875740
},
{
"timestamp": 1537541580,
"version": "1.0.4",
"changes": [
{
"note": "Dependencies updated"
}
]
],
"timestamp": 1537541580
},
{
"version": "1.0.3",
Expand Down
2 changes: 1 addition & 1 deletion packages/order-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
"@0xproject/web3-wrapper": "^3.0.0",
"bintrees": "^1.0.2",
"ethereum-types": "^1.0.7",
"ethereumjs-blockstream": "5.0.0",
"ethereumjs-blockstream": "6.0.0",
"ethers": "3.0.22",
"lodash": "^4.17.5"
},
Expand Down
49 changes: 33 additions & 16 deletions packages/order-watcher/src/order_watcher/event_watcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import { BlockParamLiteral, BlockWithoutTransactionData, LogEntry, Provider } from 'ethereum-types';
import { marshaller, Web3Wrapper } from '@0xproject/web3-wrapper';
import { BlockParamLiteral, FilterObject, LogEntry, Provider, RawLogEntry } from 'ethereum-types';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';

Expand All @@ -20,7 +20,6 @@ enum LogEventState {
*/
export class EventWatcher {
private readonly _web3Wrapper: Web3Wrapper;
private readonly _stateLayer: BlockParamLiteral;
private readonly _isVerbose: boolean;
private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
Expand All @@ -35,7 +34,6 @@ export class EventWatcher {
) {
this._isVerbose = isVerbose;
this._web3Wrapper = new Web3Wrapper(provider);
this._stateLayer = stateLayer;
this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
? DEFAULT_EVENT_POLLING_INTERVAL_MS
: pollingIntervalIfExistsMs;
Expand All @@ -62,8 +60,8 @@ export class EventWatcher {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
this._getBlockOrNullAsync.bind(this),
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
this._blockstreamGetBlockOrNullAsync.bind(this),
this._blockstreamGetLogsAsync.bind(this),
this._onBlockAndLogStreamerError.bind(this),
);
const catchAllLogFilter = {};
Expand All @@ -83,12 +81,30 @@ export class EventWatcher {
);
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _getBlockOrNullAsync(): Promise<BlockWithoutTransactionData | null> {
const blockIfExists = await this._web3Wrapper.getBlockIfExistsAsync.bind(this._web3Wrapper);
if (_.isUndefined(blockIfExists)) {
return null;
}
return blockIfExists;
private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByHash',
params: [hash, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> {
const shouldIncludeTransactionData = false;
const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
method: 'eth_getBlockByNumber',
params: [BlockParamLiteral.Latest, shouldIncludeTransactionData],
});
return blockOrNull;
}
// This method only exists in order to comply with the expected interface of Blockstream's constructor
private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> {
const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({
method: 'eth_getLogs',
params: [filterOptions],
});
return logs as RawLogEntry[];
}
private _stopBlockAndLogStream(): void {
if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
Expand All @@ -103,19 +119,20 @@ export class EventWatcher {
private async _onLogStateChangedAsync(
callback: EventWatcherCallback,
isRemoved: boolean,
log: LogEntry,
rawLog: RawLogEntry,
): Promise<void> {
const log: LogEntry = marshaller.unmarshalLog(rawLog);
await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
}
private async _reconcileBlockAsync(): Promise<void> {
const latestBlockIfExists = await this._web3Wrapper.getBlockIfExistsAsync(this._stateLayer);
if (_.isUndefined(latestBlockIfExists)) {
const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync();
if (_.isNull(latestBlockOrNull)) {
return; // noop
}
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlockIfExists as any) as Block);
await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull);
}
}
private async _emitDifferencesAsync(
Expand Down
5 changes: 5 additions & 0 deletions packages/web3-wrapper/CHANGELOG.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
"note":
"Rename `getBlockAsync` to `getBlockIfExistsAsync` and rather then throw if the requested block wasn't found, return undefined.",
"pr": 1082
},
{
"note":
"Expose `sendRawPayloadAsync` so one can easily extend `Web3Wrapper` with their own custom JSON RPC calls",
"pr": 1080
}
],
"timestamp": 1537875740
Expand Down
Loading