Skip to content

Commit

Permalink
Added initial WebSocketProvider (#141).
Browse files Browse the repository at this point in the history
  • Loading branch information
ricmoo committed Mar 12, 2020
1 parent fe3b3fa commit 117a5dd
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 47 deletions.
2 changes: 1 addition & 1 deletion packages/ethers/src.ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import * as ethers from "./ethers";

try {
const anyGlobal = (window as any);
const anyGlobal = ((window || { }) as any);

if (anyGlobal._ethers == null) {
anyGlobal._ethers = ethers;
Expand Down
4 changes: 3 additions & 1 deletion packages/providers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"author": "Richard Moore <[email protected]>",
"browser": {
"./ipc-provider": "./lib/browser-ipc-provider.js",
"ws": "./lib/browser-ws.js",
"net": "./lib/browser-net.js"
},
"dependencies": {
Expand All @@ -19,7 +20,8 @@
"@ethersproject/rlp": ">=5.0.0-beta.126",
"@ethersproject/strings": ">=5.0.0-beta.130",
"@ethersproject/transactions": ">=5.0.0-beta.128",
"@ethersproject/web": ">=5.0.0-beta.129"
"@ethersproject/web": ">=5.0.0-beta.129",
"ws": "7.2.3"
},
"description": "Ethereum Providers for ethers.",
"devDependencies": {
Expand Down
97 changes: 58 additions & 39 deletions packages/providers/src.ts/base-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ function getTime() {
* - transaction hash
*/

class Event {
export class Event {
readonly listener: Listener;
readonly once: boolean;
readonly tag: string;
Expand All @@ -123,6 +123,27 @@ class Event {
defineReadOnly(this, "once", once);
}

get type(): string {
return this.tag.split(":")[0]
}

get hash(): string {
const comps = this.tag.split(":");
if (comps[0] !== "tx") { return null; }
return comps[1];
}

get filter(): Filter {
const comps = this.tag.split(":");
if (comps[0] !== "filter") { return null; }
const filter = {
address: comps[1],
topics: deserializeTopics(comps[2])
}
if (!filter.address || filter.address === "*") { delete filter.address; }
return filter;
}

pollable(): boolean {
return (this.tag.indexOf(":") >= 0 || this.tag === "block" || this.tag === "pending");
}
Expand Down Expand Up @@ -154,7 +175,7 @@ export class BaseProvider extends Provider {
_emitted: { [ eventName: string ]: number | "pending" };

_pollingInterval: number;
_poller: any; // @TODO: what does TypeScript think setInterval returns?
_poller: NodeJS.Timer;

_lastBlockNumber: number;

Expand Down Expand Up @@ -309,10 +330,9 @@ export class BaseProvider extends Provider {

// Find all transaction hashes we are waiting on
this._events.forEach((event) => {
const comps = event.tag.split(":");
switch (comps[0]) {
switch (event.type) {
case "tx": {
const hash = comps[1];
const hash = event.hash;
let runner = this.getTransactionReceipt(hash).then((receipt) => {
if (!receipt || receipt.blockNumber == null) { return null; }
this._emitted["t:" + hash] = receipt.blockNumber;
Expand All @@ -326,14 +346,9 @@ export class BaseProvider extends Provider {
}

case "filter": {
const topics = deserializeTopics(comps[2]);
const filter = {
address: comps[1],
fromBlock: this._lastBlockNumber + 1,
toBlock: blockNumber,
topics: topics
}
if (!filter.address || filter.address === "*") { delete filter.address; }
const filter = event.filter;
filter.fromBlock = this._lastBlockNumber + 1;
filter.toBlock = blockNumber;

const runner = this.getLogs(filter).then((logs) => {
if (logs.length === 0) { return; }
Expand All @@ -342,7 +357,6 @@ export class BaseProvider extends Provider {
this._emitted["t:" + log.transactionHash] = log.blockNumber;
this.emit(filter, log);
});
return null;
}).catch((error: Error) => { this.emit("error", error); });
runners.push(runner);

Expand Down Expand Up @@ -937,25 +951,19 @@ export class BaseProvider extends Provider {
return logger.throwError(method + " not implemented", Logger.errors.NOT_IMPLEMENTED, { operation: method });
}

_startPending(): void {
console.log("WARNING: this provider does not support pending events");
}

_stopPending(): void {
_startEvent(event: Event): void {
this.polling = (this._events.filter((e) => e.pollable()).length > 0);
}

// Returns true if there are events that still require polling
_checkPolling(): void {
_stopEvent(event: Event): void {
this.polling = (this._events.filter((e) => e.pollable()).length > 0);
}

_addEventListener(eventName: EventType, listener: Listener, once: boolean): this {
this._events.push(new Event(getEventTag(eventName), listener, once));

if (eventName === "pending") { this._startPending(); }
const event = new Event(getEventTag(eventName), listener, once)
this._events.push(event);

// Do we still now have any events that require polling?
this._checkPolling();
this._startEvent(event);

return this;
}
Expand All @@ -972,18 +980,27 @@ export class BaseProvider extends Provider {
emit(eventName: EventType, ...args: Array<any>): boolean {
let result = false;

let stopped: Array<Event> = [ ];

let eventTag = getEventTag(eventName);
this._events = this._events.filter((event) => {
if (event.tag !== eventTag) { return true; }

setTimeout(() => {
event.listener.apply(this, args);
}, 0);

result = true;
return !(event.once);

if (event.once) {
stopped.push(event);
return false;
}

return true;
});

// Do we still have any events that require polling? ("once" events remove themselves)
this._checkPolling();
stopped.forEach((event) => { this._stopEvent(event); });

return result;
}
Expand Down Expand Up @@ -1013,38 +1030,40 @@ export class BaseProvider extends Provider {
return this.removeAllListeners(eventName);
}

const stopped: Array<Event> = [ ];

let found = false;

let eventTag = getEventTag(eventName);
this._events = this._events.filter((event) => {
if (event.tag !== eventTag || event.listener != listener) { return true; }
if (found) { return true; }
found = true;
stopped.push(event);
return false;
});

if (eventName === "pending" && this.listenerCount("pending") === 0) { this._stopPending(); }

// Do we still have any events that require polling?
this._checkPolling();
stopped.forEach((event) => { this._stopEvent(event); });

return this;
}

removeAllListeners(eventName?: EventType): this {
let stopped: Array<Event> = [ ];
if (eventName == null) {
stopped = this._events;

this._events = [ ];
this._stopPending();
} else {
let eventTag = getEventTag(eventName);
const eventTag = getEventTag(eventName);
this._events = this._events.filter((event) => {
return (event.tag !== eventTag);
if (event.tag !== eventTag) { return true; }
stopped.push(event);
return false;
});
if (eventName === "pending") { this._stopPending(); }
}

// Do we still have any events that require polling?
this._checkPolling();
stopped.forEach((event) => { this._stopEvent(event); });

return this;
}
Expand Down
17 changes: 17 additions & 0 deletions packages/providers/src.ts/browser-ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"use strict";

import { Logger } from "@ethersproject/logger";
import { version } from "./_version";

let WS = (WebSocket as any);

if (WS == null) {
const logger = new Logger(version);
WS = function() {
logger.throwError("WebSockets not supported in this environment", Logger.errors.UNSUPPORTED_OPERATION, {
operation: "new WebSocket()"
});
}
}

module.exports = WS;
2 changes: 2 additions & 0 deletions packages/providers/src.ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { InfuraProvider } from "./infura-provider";
import { JsonRpcProvider, JsonRpcSigner } from "./json-rpc-provider";
import { NodesmithProvider } from "./nodesmith-provider";
import { Web3Provider } from "./web3-provider";
import { WebSocketProvider } from "./websocket-provider";

import { AsyncSendable } from "./web3-provider";

Expand Down Expand Up @@ -86,6 +87,7 @@ export {
JsonRpcProvider,
NodesmithProvider,
Web3Provider,
WebSocketProvider,

IpcProvider,

Expand Down
21 changes: 17 additions & 4 deletions packages/providers/src.ts/json-rpc-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Logger } from "@ethersproject/logger";
import { version } from "./_version";
const logger = new Logger(version);

import { BaseProvider } from "./base-provider";
import { BaseProvider, Event } from "./base-provider";


function timer(timeout: number): Promise<any> {
Expand Down Expand Up @@ -263,7 +263,7 @@ export class JsonRpcProvider extends BaseProvider {
}

// Default URL
if (!url) { url = "http:/" + "/localhost:8545"; }
if (!url) { url = getStatic<() => string>(this.constructor, "defaultUrl")(); }

if (typeof(url) === "string") {
this.connection = Object.freeze({
Expand All @@ -276,6 +276,10 @@ export class JsonRpcProvider extends BaseProvider {
this._nextId = 42;
}

static defaultUrl(): string {
return "http:/" + "/localhost:8545";
}

getSigner(addressOrIndex?: string | number): JsonRpcSigner {
return new JsonRpcSigner(_constructorGuard, this, addressOrIndex);
}
Expand Down Expand Up @@ -402,6 +406,11 @@ export class JsonRpcProvider extends BaseProvider {
return logger.throwError(method + " not implemented", Logger.errors.NOT_IMPLEMENTED, { operation: method });
}

_startEvent(event: Event): void {
if (event.tag === "pending") { this._startPending(); }
super._startEvent(event);
}

_startPending(): void {
if (this._pendingFilter != null) { return; }
let self = this;
Expand Down Expand Up @@ -445,10 +454,14 @@ export class JsonRpcProvider extends BaseProvider {
}).catch((error: Error) => { });
}

_stopPending(): void {
this._pendingFilter = null;
_stopEvent(event: Event): void {
if (event.tag === "pending" && this.listenerCount("pending") === 0) {
this._pendingFilter = null;
}
super._stopEvent(event);
}


// Convert an ethers.js transaction into a JSON-RPC transaction
// - gasLimit => gas
// - All values hexlified
Expand Down
Loading

0 comments on commit 117a5dd

Please sign in to comment.