Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Organize async task queues by network #375

Merged
merged 4 commits into from
Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions packages/testnet-faucets/src/ts/dispatch_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { intervalUtils } from '@0xproject/utils';
import * as _ from 'lodash';

const MAX_QUEUE_SIZE = 500;
const DEFAULT_QUEUE_INTERVAL_MS = 1000;

export class DispatchQueue {
private _queueIntervalMs: number;
private _queue: Array<() => Promise<void>>;
private _queueIntervalIdIfExists?: NodeJS.Timer;
constructor() {
this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
this._queue = [];
this._start();
}
public add(task: () => Promise<void>): boolean {
if (this.isFull()) {
return false;
}
this._queue.push(task);
return true;
}
public size(): number {
return this._queue.length;
}
public isFull(): boolean {
return this.size() >= MAX_QUEUE_SIZE;
}
public stop() {
if (!_.isUndefined(this._queueIntervalIdIfExists)) {
intervalUtils.clearAsyncExcludingInterval(this._queueIntervalIdIfExists);
}
}
private _start() {
this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
async () => {
const task = this._queue.shift();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename task to taskAsync

if (_.isUndefined(task)) {
return Promise.resolve();
}
await task();
},
this._queueIntervalMs,
_.noop,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should actually handle the error here. If we encounter one, let's log the error to Rollbar and stop the thrown error from crashing the process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see this is done in the individual actions. It would be cleaner to move it up here, then it only lives in one place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm its a bit tricky because the error reporting is asynchronous, let's sync up if theres a way to fix this

);
}
}
54 changes: 54 additions & 0 deletions packages/testnet-faucets/src/ts/dispense_asset_tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { ZeroEx } from '0x.js';
import { BigNumber, promisify } from '@0xproject/utils';
import * as _ from 'lodash';
import * as Web3 from 'web3';

import { configs } from './configs';
import { errorReporter } from './error_reporter';
import { utils } from './utils';

const DISPENSE_AMOUNT_ETHER = 0.1;
const DISPENSE_AMOUNT_TOKEN = 0.1;

export const dispenseAssetTasks = {
dispenseEtherTask(recipientAddress: string, web3: Web3) {
return async () => {
utils.consoleLog(`Processing ETH ${recipientAddress}`);
const sendTransactionAsync = promisify(web3.eth.sendTransaction);
try {
const txHash = await sendTransactionAsync({
from: configs.DISPENSER_ADDRESS,
to: recipientAddress,
value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
});
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
} catch (err) {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
await errorReporter.reportAsync(err);
}
};
},
dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) {
return async () => {
utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`);
const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN);
try {
const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol);
if (_.isUndefined(token)) {
throw new Error(`Unsupported asset type: ${tokenSymbol}`);
}
const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
const txHash = await zeroEx.token.transferAsync(
token.address,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
} catch (err) {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
await errorReporter.reportAsync(err);
}
};
},
};
27 changes: 0 additions & 27 deletions packages/testnet-faucets/src/ts/ether_request_queue.ts

This file was deleted.

100 changes: 55 additions & 45 deletions packages/testnet-faucets/src/ts/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Order, SignedOrder, ZeroEx } from '0x.js';
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import { BigNumber } from '@0xproject/utils';
import * as express from 'express';
import * as _ from 'lodash';
Expand All @@ -10,17 +9,23 @@ import * as Web3 from 'web3';
// we are not running in a browser env.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
(global as any).XMLHttpRequest = undefined;
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import ProviderEngine = require('web3-provider-engine');
import HookedWalletSubprovider = require('web3-provider-engine/subproviders/hooked-wallet');
import RpcSubprovider = require('web3-provider-engine/subproviders/rpc');

import { configs } from './configs';
import { EtherRequestQueue } from './ether_request_queue';
import { DispatchQueue } from './dispatch_queue';
import { dispenseAssetTasks } from './dispense_asset_tasks';
import { idManagement } from './id_management';
import { RequestQueue } from './request_queue';
import { rpcUrls } from './rpc_urls';
import { utils } from './utils';
import { ZRXRequestQueue } from './zrx_request_queue';

interface NetworkConfig {
dispatchQueue: DispatchQueue;
web3: Web3;
zeroEx: ZeroEx;
}

interface ItemByNetworkId<T> {
[networkId: string]: T;
Expand All @@ -35,30 +40,7 @@ enum RequestedAssetType {
const FIVE_DAYS_IN_MS = 4.32e8; // TODO: make this configurable

export class Handler {
private _zeroExByNetworkId: ItemByNetworkId<ZeroEx> = {};
private _etherRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private _zrxRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private static _dispenseAsset(
req: express.Request,
res: express.Response,
requestQueueByNetworkId: ItemByNetworkId<RequestQueue>,
requestedAssetType: RequestedAssetType,
) {
const requestQueue = _.get(requestQueueByNetworkId, req.params.networkId);
if (_.isUndefined(requestQueue)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const didAddToQueue = requestQueue.add(req.params.recipient);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(
`Added ${req.params.recipient} to queue: ${requestedAssetType} networkId: ${req.params.networkId}`,
);
res.status(200).end();
}
private _networkConfigByNetworkId: ItemByNetworkId<NetworkConfig> = {};
private static _createProviderEngine(rpcUrl: string) {
const engine = new ProviderEngine();
engine.addProvider(new NonceTrackerSubprovider());
Expand All @@ -79,48 +61,76 @@ export class Handler {
networkId: +networkId,
};
const zeroEx = new ZeroEx(web3.currentProvider, zeroExConfig);
this._zeroExByNetworkId[networkId] = zeroEx;
this._etherRequestQueueByNetworkId[networkId] = new EtherRequestQueue(web3);
this._zrxRequestQueueByNetworkId[networkId] = new ZRXRequestQueue(web3, zeroEx);
const dispatchQueue = new DispatchQueue();
this._networkConfigByNetworkId[networkId] = {
dispatchQueue,
web3,
zeroEx,
};
});
}
public getQueueInfo(req: express.Request, res: express.Response) {
res.setHeader('Content-Type', 'application/json');
const queueInfo = _.mapValues(rpcUrls, (rpcUrl: string, networkId: string) => {
const etherRequestQueue = this._etherRequestQueueByNetworkId[networkId];
const zrxRequestQueue = this._zrxRequestQueueByNetworkId[networkId];
const dispatchQueue = this._networkConfigByNetworkId[networkId].dispatchQueue;
return {
ether: {
full: etherRequestQueue.isFull(),
size: etherRequestQueue.size(),
},
zrx: {
full: zrxRequestQueue.isFull(),
size: zrxRequestQueue.size(),
},
full: dispatchQueue.isFull(),
size: dispatchQueue.size(),
};
});
const payload = JSON.stringify(queueInfo);
res.status(200).send(payload);
}
public dispenseEther(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._etherRequestQueueByNetworkId, RequestedAssetType.ETH);
this._dispenseAsset(req, res, RequestedAssetType.ETH);
}
public dispenseZRX(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._zrxRequestQueueByNetworkId, RequestedAssetType.ZRX);
this._dispenseAsset(req, res, RequestedAssetType.ZRX);
}
public async dispenseWETHOrder(req: express.Request, res: express.Response) {
await this._dispenseOrder(req, res, RequestedAssetType.WETH);
}
public async dispenseZRXOrder(req: express.Request, res: express.Response, next: express.NextFunction) {
await this._dispenseOrder(req, res, RequestedAssetType.ZRX);
}
private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const networkId = req.params.networkId;
const recipient = req.params.recipient;
const networkConfig = _.get(this._networkConfigByNetworkId, networkId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's simply index in here rather then use _.get.

if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
let dispenserTask;
switch (requestedAssetType) {
case RequestedAssetType.ETH:
dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3);
break;
case RequestedAssetType.WETH:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these be defaulted to the same case statement body?

case RequestedAssetType.WETH:
case RequestedAssetType.ZRX:
    dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx);
    break;

break;
case RequestedAssetType.ZRX:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx);
break;
default:
throw new Error(`Unsupported asset type: ${requestedAssetType}`);
}

const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(`Added ${recipient} to queue: ${requestedAssetType} networkId: ${networkId}`);
res.status(200).end();
}
private async _dispenseOrder(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const zeroEx = _.get(this._zeroExByNetworkId, req.params.networkId);
if (_.isUndefined(zeroEx)) {
const networkConfig = _.get(this._networkConfigByNetworkId, req.params.networkId);
if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const zeroEx = networkConfig.zeroEx;
res.setHeader('Content-Type', 'application/json');
const makerTokenAddress = await zeroEx.tokenRegistry.getTokenAddressBySymbolIfExistsAsync(requestedAssetType);
if (_.isUndefined(makerTokenAddress)) {
Expand Down
56 changes: 0 additions & 56 deletions packages/testnet-faucets/src/ts/request_queue.ts

This file was deleted.

43 changes: 0 additions & 43 deletions packages/testnet-faucets/src/ts/zrx_request_queue.ts

This file was deleted.

Loading