Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Organize async task queues by network #375

Merged
merged 4 commits into from
Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions packages/testnet-faucets/src/ts/dispatch_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { intervalUtils } from '@0xproject/utils';
import * as _ from 'lodash';

import { errorReporter } from './error_reporter';
import { utils } from './utils';

const MAX_QUEUE_SIZE = 500;
const DEFAULT_QUEUE_INTERVAL_MS = 1000;

export class DispatchQueue {
private _queueIntervalMs: number;
private _queue: Array<() => Promise<void>>;
private _queueIntervalIdIfExists?: NodeJS.Timer;
constructor() {
this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
this._queue = [];
this._start();
}
public add(taskAsync: () => Promise<void>): boolean {
if (this.isFull()) {
return false;
}
this._queue.push(taskAsync);
return true;
}
public size(): number {
return this._queue.length;
}
public isFull(): boolean {
return this.size() >= MAX_QUEUE_SIZE;
}
public stop() {
if (!_.isUndefined(this._queueIntervalIdIfExists)) {
intervalUtils.clearAsyncExcludingInterval(this._queueIntervalIdIfExists);
}
}
private _start() {
this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
async () => {
const taskAsync = this._queue.shift();
if (_.isUndefined(taskAsync)) {
return Promise.resolve();
}
await taskAsync();
},
this._queueIntervalMs,
(err: Error) => {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
// tslint:disable-next-line:no-floating-promises
errorReporter.reportAsync(err);
},
);
}
}
44 changes: 44 additions & 0 deletions packages/testnet-faucets/src/ts/dispense_asset_tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { ZeroEx } from '0x.js';
import { BigNumber, promisify } from '@0xproject/utils';
import * as _ from 'lodash';
import * as Web3 from 'web3';

import { configs } from './configs';
import { errorReporter } from './error_reporter';
import { utils } from './utils';

const DISPENSE_AMOUNT_ETHER = 0.1;
const DISPENSE_AMOUNT_TOKEN = 0.1;

export const dispenseAssetTasks = {
dispenseEtherTask(recipientAddress: string, web3: Web3) {
return async () => {
utils.consoleLog(`Processing ETH ${recipientAddress}`);
const sendTransactionAsync = promisify(web3.eth.sendTransaction);
const txHash = await sendTransactionAsync({
from: configs.DISPENSER_ADDRESS,
to: recipientAddress,
value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
});
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
};
},
dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) {
return async () => {
utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`);
const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN);
const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol);
if (_.isUndefined(token)) {
throw new Error(`Unsupported asset type: ${tokenSymbol}`);
}
const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
const txHash = await zeroEx.token.transferAsync(
token.address,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
};
},
};
3 changes: 0 additions & 3 deletions packages/testnet-faucets/src/ts/error_reporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ export const errorReporter = {
rollbar.init(configs.ROLLBAR_ACCESS_KEY, {
environment: configs.ENVIRONMENT,
});

rollbar.handleUncaughtExceptions(configs.ROLLBAR_ACCESS_KEY);

process.on('unhandledRejection', async (err: Error) => {
utils.consoleLog(`Uncaught exception ${err}. Stack: ${err.stack}`);
await this.reportAsync(err);
Expand All @@ -22,7 +20,6 @@ export const errorReporter = {
if (configs.ENVIRONMENT === 'development') {
return; // Do not log development environment errors
}

return new Promise((resolve, reject) => {
rollbar.handleError(err, req, (rollbarErr: Error) => {
if (rollbarErr) {
Expand Down
27 changes: 0 additions & 27 deletions packages/testnet-faucets/src/ts/ether_request_queue.ts

This file was deleted.

97 changes: 52 additions & 45 deletions packages/testnet-faucets/src/ts/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Order, SignedOrder, ZeroEx } from '0x.js';
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import { BigNumber } from '@0xproject/utils';
import * as express from 'express';
import * as _ from 'lodash';
Expand All @@ -10,17 +9,23 @@ import * as Web3 from 'web3';
// we are not running in a browser env.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
(global as any).XMLHttpRequest = undefined;
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import ProviderEngine = require('web3-provider-engine');
import HookedWalletSubprovider = require('web3-provider-engine/subproviders/hooked-wallet');
import RpcSubprovider = require('web3-provider-engine/subproviders/rpc');

import { configs } from './configs';
import { EtherRequestQueue } from './ether_request_queue';
import { DispatchQueue } from './dispatch_queue';
import { dispenseAssetTasks } from './dispense_asset_tasks';
import { idManagement } from './id_management';
import { RequestQueue } from './request_queue';
import { rpcUrls } from './rpc_urls';
import { utils } from './utils';
import { ZRXRequestQueue } from './zrx_request_queue';

interface NetworkConfig {
dispatchQueue: DispatchQueue;
web3: Web3;
zeroEx: ZeroEx;
}

interface ItemByNetworkId<T> {
[networkId: string]: T;
Expand All @@ -35,30 +40,7 @@ enum RequestedAssetType {
const FIVE_DAYS_IN_MS = 4.32e8; // TODO: make this configurable

export class Handler {
private _zeroExByNetworkId: ItemByNetworkId<ZeroEx> = {};
private _etherRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private _zrxRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private static _dispenseAsset(
req: express.Request,
res: express.Response,
requestQueueByNetworkId: ItemByNetworkId<RequestQueue>,
requestedAssetType: RequestedAssetType,
) {
const requestQueue = _.get(requestQueueByNetworkId, req.params.networkId);
if (_.isUndefined(requestQueue)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const didAddToQueue = requestQueue.add(req.params.recipient);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(
`Added ${req.params.recipient} to queue: ${requestedAssetType} networkId: ${req.params.networkId}`,
);
res.status(200).end();
}
private _networkConfigByNetworkId: ItemByNetworkId<NetworkConfig> = {};
private static _createProviderEngine(rpcUrl: string) {
const engine = new ProviderEngine();
engine.addProvider(new NonceTrackerSubprovider());
Expand All @@ -79,48 +61,73 @@ export class Handler {
networkId: +networkId,
};
const zeroEx = new ZeroEx(web3.currentProvider, zeroExConfig);
this._zeroExByNetworkId[networkId] = zeroEx;
this._etherRequestQueueByNetworkId[networkId] = new EtherRequestQueue(web3);
this._zrxRequestQueueByNetworkId[networkId] = new ZRXRequestQueue(web3, zeroEx);
const dispatchQueue = new DispatchQueue();
this._networkConfigByNetworkId[networkId] = {
dispatchQueue,
web3,
zeroEx,
};
});
}
public getQueueInfo(req: express.Request, res: express.Response) {
res.setHeader('Content-Type', 'application/json');
const queueInfo = _.mapValues(rpcUrls, (rpcUrl: string, networkId: string) => {
const etherRequestQueue = this._etherRequestQueueByNetworkId[networkId];
const zrxRequestQueue = this._zrxRequestQueueByNetworkId[networkId];
const dispatchQueue = this._networkConfigByNetworkId[networkId].dispatchQueue;
return {
ether: {
full: etherRequestQueue.isFull(),
size: etherRequestQueue.size(),
},
zrx: {
full: zrxRequestQueue.isFull(),
size: zrxRequestQueue.size(),
},
full: dispatchQueue.isFull(),
size: dispatchQueue.size(),
};
});
const payload = JSON.stringify(queueInfo);
res.status(200).send(payload);
}
public dispenseEther(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._etherRequestQueueByNetworkId, RequestedAssetType.ETH);
this._dispenseAsset(req, res, RequestedAssetType.ETH);
}
public dispenseZRX(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._zrxRequestQueueByNetworkId, RequestedAssetType.ZRX);
this._dispenseAsset(req, res, RequestedAssetType.ZRX);
}
public async dispenseWETHOrder(req: express.Request, res: express.Response) {
await this._dispenseOrder(req, res, RequestedAssetType.WETH);
}
public async dispenseZRXOrder(req: express.Request, res: express.Response, next: express.NextFunction) {
await this._dispenseOrder(req, res, RequestedAssetType.ZRX);
}
private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const networkId = req.params.networkId;
const recipient = req.params.recipient;
const networkConfig = this._networkConfigByNetworkId[networkId];
let dispenserTask;
switch (requestedAssetType) {
case RequestedAssetType.ETH:
dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3);
break;
case RequestedAssetType.WETH:
case RequestedAssetType.ZRX:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(
recipient,
requestedAssetType,
networkConfig.zeroEx,
);
break;
default:
throw new Error(`Unsupported asset type: ${requestedAssetType}`);
}
const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(`Added ${recipient} to queue: ${requestedAssetType} networkId: ${networkId}`);
res.status(200).end();
}
private async _dispenseOrder(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const zeroEx = _.get(this._zeroExByNetworkId, req.params.networkId);
if (_.isUndefined(zeroEx)) {
const networkConfig = _.get(this._networkConfigByNetworkId, req.params.networkId);
if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const zeroEx = networkConfig.zeroEx;
res.setHeader('Content-Type', 'application/json');
const makerTokenAddress = await zeroEx.tokenRegistry.getTokenAddressBySymbolIfExistsAsync(requestedAssetType);
if (_.isUndefined(makerTokenAddress)) {
Expand Down
51 changes: 0 additions & 51 deletions packages/testnet-faucets/src/ts/request_queue.ts

This file was deleted.

43 changes: 0 additions & 43 deletions packages/testnet-faucets/src/ts/zrx_request_queue.ts

This file was deleted.