Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Commit

Permalink
Merge pull request #375 from 0xProject/feature/testnet-faucets/queue-…
Browse files Browse the repository at this point in the history
…by-network

Organize async task queues by network
  • Loading branch information
BMillman19 authored Feb 7, 2018
2 parents e443cb2 + 2404ff0 commit 852811b
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 169 deletions.
54 changes: 54 additions & 0 deletions packages/testnet-faucets/src/ts/dispatch_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { intervalUtils } from '@0xproject/utils';
import * as _ from 'lodash';

import { errorReporter } from './error_reporter';
import { utils } from './utils';

const MAX_QUEUE_SIZE = 500;
const DEFAULT_QUEUE_INTERVAL_MS = 1000;

export class DispatchQueue {
private _queueIntervalMs: number;
private _queue: Array<() => Promise<void>>;
private _queueIntervalIdIfExists?: NodeJS.Timer;
constructor() {
this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
this._queue = [];
this._start();
}
public add(taskAsync: () => Promise<void>): boolean {
if (this.isFull()) {
return false;
}
this._queue.push(taskAsync);
return true;
}
public size(): number {
return this._queue.length;
}
public isFull(): boolean {
return this.size() >= MAX_QUEUE_SIZE;
}
public stop() {
if (!_.isUndefined(this._queueIntervalIdIfExists)) {
intervalUtils.clearAsyncExcludingInterval(this._queueIntervalIdIfExists);
}
}
private _start() {
this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
async () => {
const taskAsync = this._queue.shift();
if (_.isUndefined(taskAsync)) {
return Promise.resolve();
}
await taskAsync();
},
this._queueIntervalMs,
(err: Error) => {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
// tslint:disable-next-line:no-floating-promises
errorReporter.reportAsync(err);
},
);
}
}
44 changes: 44 additions & 0 deletions packages/testnet-faucets/src/ts/dispense_asset_tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { ZeroEx } from '0x.js';
import { BigNumber, promisify } from '@0xproject/utils';
import * as _ from 'lodash';
import * as Web3 from 'web3';

import { configs } from './configs';
import { errorReporter } from './error_reporter';
import { utils } from './utils';

const DISPENSE_AMOUNT_ETHER = 0.1;
const DISPENSE_AMOUNT_TOKEN = 0.1;

export const dispenseAssetTasks = {
dispenseEtherTask(recipientAddress: string, web3: Web3) {
return async () => {
utils.consoleLog(`Processing ETH ${recipientAddress}`);
const sendTransactionAsync = promisify(web3.eth.sendTransaction);
const txHash = await sendTransactionAsync({
from: configs.DISPENSER_ADDRESS,
to: recipientAddress,
value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
});
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
};
},
dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) {
return async () => {
utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`);
const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN);
const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol);
if (_.isUndefined(token)) {
throw new Error(`Unsupported asset type: ${tokenSymbol}`);
}
const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
const txHash = await zeroEx.token.transferAsync(
token.address,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
};
},
};
3 changes: 0 additions & 3 deletions packages/testnet-faucets/src/ts/error_reporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ export const errorReporter = {
rollbar.init(configs.ROLLBAR_ACCESS_KEY, {
environment: configs.ENVIRONMENT,
});

rollbar.handleUncaughtExceptions(configs.ROLLBAR_ACCESS_KEY);

process.on('unhandledRejection', async (err: Error) => {
utils.consoleLog(`Uncaught exception ${err}. Stack: ${err.stack}`);
await this.reportAsync(err);
Expand All @@ -22,7 +20,6 @@ export const errorReporter = {
if (configs.ENVIRONMENT === 'development') {
return; // Do not log development environment errors
}

return new Promise((resolve, reject) => {
rollbar.handleError(err, req, (rollbarErr: Error) => {
if (rollbarErr) {
Expand Down
27 changes: 0 additions & 27 deletions packages/testnet-faucets/src/ts/ether_request_queue.ts

This file was deleted.

97 changes: 52 additions & 45 deletions packages/testnet-faucets/src/ts/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Order, SignedOrder, ZeroEx } from '0x.js';
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import { BigNumber } from '@0xproject/utils';
import * as express from 'express';
import * as _ from 'lodash';
Expand All @@ -10,17 +9,23 @@ import * as Web3 from 'web3';
// we are not running in a browser env.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
(global as any).XMLHttpRequest = undefined;
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import ProviderEngine = require('web3-provider-engine');
import HookedWalletSubprovider = require('web3-provider-engine/subproviders/hooked-wallet');
import RpcSubprovider = require('web3-provider-engine/subproviders/rpc');

import { configs } from './configs';
import { EtherRequestQueue } from './ether_request_queue';
import { DispatchQueue } from './dispatch_queue';
import { dispenseAssetTasks } from './dispense_asset_tasks';
import { idManagement } from './id_management';
import { RequestQueue } from './request_queue';
import { rpcUrls } from './rpc_urls';
import { utils } from './utils';
import { ZRXRequestQueue } from './zrx_request_queue';

interface NetworkConfig {
dispatchQueue: DispatchQueue;
web3: Web3;
zeroEx: ZeroEx;
}

interface ItemByNetworkId<T> {
[networkId: string]: T;
Expand All @@ -35,30 +40,7 @@ enum RequestedAssetType {
const FIVE_DAYS_IN_MS = 4.32e8; // TODO: make this configurable

export class Handler {
private _zeroExByNetworkId: ItemByNetworkId<ZeroEx> = {};
private _etherRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private _zrxRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private static _dispenseAsset(
req: express.Request,
res: express.Response,
requestQueueByNetworkId: ItemByNetworkId<RequestQueue>,
requestedAssetType: RequestedAssetType,
) {
const requestQueue = _.get(requestQueueByNetworkId, req.params.networkId);
if (_.isUndefined(requestQueue)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const didAddToQueue = requestQueue.add(req.params.recipient);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(
`Added ${req.params.recipient} to queue: ${requestedAssetType} networkId: ${req.params.networkId}`,
);
res.status(200).end();
}
private _networkConfigByNetworkId: ItemByNetworkId<NetworkConfig> = {};
private static _createProviderEngine(rpcUrl: string) {
const engine = new ProviderEngine();
engine.addProvider(new NonceTrackerSubprovider());
Expand All @@ -79,48 +61,73 @@ export class Handler {
networkId: +networkId,
};
const zeroEx = new ZeroEx(web3.currentProvider, zeroExConfig);
this._zeroExByNetworkId[networkId] = zeroEx;
this._etherRequestQueueByNetworkId[networkId] = new EtherRequestQueue(web3);
this._zrxRequestQueueByNetworkId[networkId] = new ZRXRequestQueue(web3, zeroEx);
const dispatchQueue = new DispatchQueue();
this._networkConfigByNetworkId[networkId] = {
dispatchQueue,
web3,
zeroEx,
};
});
}
public getQueueInfo(req: express.Request, res: express.Response) {
res.setHeader('Content-Type', 'application/json');
const queueInfo = _.mapValues(rpcUrls, (rpcUrl: string, networkId: string) => {
const etherRequestQueue = this._etherRequestQueueByNetworkId[networkId];
const zrxRequestQueue = this._zrxRequestQueueByNetworkId[networkId];
const dispatchQueue = this._networkConfigByNetworkId[networkId].dispatchQueue;
return {
ether: {
full: etherRequestQueue.isFull(),
size: etherRequestQueue.size(),
},
zrx: {
full: zrxRequestQueue.isFull(),
size: zrxRequestQueue.size(),
},
full: dispatchQueue.isFull(),
size: dispatchQueue.size(),
};
});
const payload = JSON.stringify(queueInfo);
res.status(200).send(payload);
}
public dispenseEther(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._etherRequestQueueByNetworkId, RequestedAssetType.ETH);
this._dispenseAsset(req, res, RequestedAssetType.ETH);
}
public dispenseZRX(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._zrxRequestQueueByNetworkId, RequestedAssetType.ZRX);
this._dispenseAsset(req, res, RequestedAssetType.ZRX);
}
public async dispenseWETHOrder(req: express.Request, res: express.Response) {
await this._dispenseOrder(req, res, RequestedAssetType.WETH);
}
public async dispenseZRXOrder(req: express.Request, res: express.Response, next: express.NextFunction) {
await this._dispenseOrder(req, res, RequestedAssetType.ZRX);
}
private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const networkId = req.params.networkId;
const recipient = req.params.recipient;
const networkConfig = this._networkConfigByNetworkId[networkId];
let dispenserTask;
switch (requestedAssetType) {
case RequestedAssetType.ETH:
dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3);
break;
case RequestedAssetType.WETH:
case RequestedAssetType.ZRX:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(
recipient,
requestedAssetType,
networkConfig.zeroEx,
);
break;
default:
throw new Error(`Unsupported asset type: ${requestedAssetType}`);
}
const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(`Added ${recipient} to queue: ${requestedAssetType} networkId: ${networkId}`);
res.status(200).end();
}
private async _dispenseOrder(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const zeroEx = _.get(this._zeroExByNetworkId, req.params.networkId);
if (_.isUndefined(zeroEx)) {
const networkConfig = _.get(this._networkConfigByNetworkId, req.params.networkId);
if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const zeroEx = networkConfig.zeroEx;
res.setHeader('Content-Type', 'application/json');
const makerTokenAddress = await zeroEx.tokenRegistry.getTokenAddressBySymbolIfExistsAsync(requestedAssetType);
if (_.isUndefined(makerTokenAddress)) {
Expand Down
51 changes: 0 additions & 51 deletions packages/testnet-faucets/src/ts/request_queue.ts

This file was deleted.

43 changes: 0 additions & 43 deletions packages/testnet-faucets/src/ts/zrx_request_queue.ts

This file was deleted.

0 comments on commit 852811b

Please sign in to comment.