Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker: remove eslint exceptions #245

Merged
merged 9 commits into from
Oct 13, 2021
7 changes: 1 addition & 6 deletions packages/broker/.eslintrc
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{
"extends": "eslint-config-streamr-ts",
"rules": {
"@typescript-eslint/ban-ts-comment": "warn",
"max-len": "warn",
"no-underscore-dangle": "warn"
}
"extends": "eslint-config-streamr-ts"
}
1 change: 1 addition & 0 deletions packages/broker/bin/broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ program
if (configFile == null) {
configFile = path.join(os.homedir(), '/.streamr/broker-config.json')
if (!fs.existsSync(configFile)) {
// eslint-disable-next-line max-len
console.error('Config file not found in the default location ~/.streamr/broker-config.json. You can run "streamr-broker-init" to generate a config file interactively, or specify the config file as argument: "streamr-broker path-to-config/file.json"')
process.exit(1)
}
Expand Down
1 change: 1 addition & 0 deletions packages/broker/bin/run-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const forkProcess = (processName, filePath, args, color) => {
})
}

/* eslint-disable max-len */
forkProcess('T1', './tracker.js', ['0xe5abc5ee43b8830e7b0f98d03efff5d6cae574d52a43204528eab7b52cd6408d', 'T1', '--port=30301'], chalk.hex('#66CC66')) // 0xb9e7cEBF7b03AE26458E32a059488386b05798e8
forkProcess('T2', './tracker.js', ['0x96de9d06f9e409119a2cd9b57dfc326f66d953a0418f3937b92c8930f930893c', 'T2', '--port=30302'], chalk.hex('#00FF66')) // 0x0540A3e144cdD81F402e7772C76a5808B71d2d30
forkProcess('T3', './tracker.js', ['0x6117b7a7cb8f3c8d40e3b7e87823c11af7f401515bc4fdf2bfdda70f1b833027', 'T3', '--port=30303'], chalk.hex('#66FFAA')) // 0xf2C195bE194a2C91e93Eacb1d6d55a00552a85E2
Expand Down
1 change: 1 addition & 0 deletions packages/broker/src/ConfigWizard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ const PRIVATE_KEY_PROMPTS: Array<inquirer.Question | inquirer.ListQuestion | inq
{
type: 'confirm',
name: 'revealGeneratedPrivateKey',
// eslint-disable-next-line max-len
message: 'We strongly recommend backing up your private key. It will be written into the config file, but would you also like to see this sensitive information on screen now?',
default: false,
when: (answers: inquirer.Answers): boolean => {
Expand Down
6 changes: 4 additions & 2 deletions packages/broker/src/RequestAuthenticatorMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ const logger = new Logger(module)
/**
* Middleware used to authenticate REST API requests
*/
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export const authenticator = (streamFetcher: StreamFetcher, permission = 'stream_subscribe') => (req: Request & { stream?: Todo }, res: Response, next: NextFunction) => {
export const authenticator = (
streamFetcher: StreamFetcher,
permission = 'stream_subscribe'
) => (req: Request & { stream?: Todo }, res: Response, next: NextFunction): void => {
let sessionToken

// Try to parse authorization header if defined
Expand Down
12 changes: 6 additions & 6 deletions packages/broker/src/StreamFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ export class StreamFetcher {

constructor(baseUrl: string) {
this.apiUrl = `${baseUrl}/api/v1`
this.fetch = memoize<StreamFetcher['_fetch']>(this._fetch, {
this.fetch = memoize<StreamFetcher['uncachedFetch']>(this.uncachedFetch, {
maxAge: MAX_AGE,
promise: true,
})
this.checkPermission = memoize<StreamFetcher['_checkPermission']>(this._checkPermission, {
this.checkPermission = memoize<StreamFetcher['uncachedCheckPermission']>(this.uncachedCheckPermission, {
maxAge: MAX_AGE,
promise: true,
})
this.authenticate = memoize<StreamFetcher['_authenticate']>(this._authenticate, {
this.authenticate = memoize<StreamFetcher['uncachedAuthenticate']>(this.uncachedAuthenticate, {
maxAge: MAX_AGE_MINUTE,
promise: true,
})
Expand All @@ -74,7 +74,7 @@ export class StreamFetcher {
return client.session.getSessionToken()
}

private async _authenticate(streamId: string, sessionToken: string|undefined, operation = 'stream_subscribe'): Promise<Todo> {
private async uncachedAuthenticate(streamId: string, sessionToken: string|undefined, operation = 'stream_subscribe'): Promise<Todo> {
await this.checkPermission(streamId, sessionToken, operation)
return this.fetch(streamId, sessionToken)
}
Expand All @@ -88,7 +88,7 @@ export class StreamFetcher {
* @returns {Promise.<TResult>}
* @private
*/
private async _fetch(streamId: string, sessionToken?: string): Promise<Todo> {
private async uncachedFetch(streamId: string, sessionToken?: string): Promise<Todo> {
const url = `${this.apiUrl}/streams/${encodeURIComponent(streamId)}`
const headers = formAuthorizationHeader(sessionToken)

Expand All @@ -115,7 +115,7 @@ export class StreamFetcher {
* @returns {Promise}
* @private
*/
private async _checkPermission(streamId: string, sessionToken: string | undefined | null, operation = 'stream_subscribe'): Promise<true> {
private async uncachedCheckPermission(streamId: string, sessionToken: string | undefined | null, operation = 'stream_subscribe'): Promise<true> {
if (streamId == null) {
throw new Error('_checkPermission: streamId can not be null!')
}
Expand Down
32 changes: 13 additions & 19 deletions packages/broker/src/StreamStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,8 @@ function getStreamLookupKey(streamId: string, streamPartition: number) {
}

export class StreamStateManager<C> {

_streams: Record<string,Stream<C>>
_timeouts: Record<string,NodeJS.Timeout>

constructor() {
this._streams = {}
this._timeouts = {}
}
private streams: Record<string,Stream<C>> = {}
private timeouts: Record<string,NodeJS.Timeout> = {}

getOrCreate(streamId: string, streamPartition: number, name = ''): Stream<C> {
const stream = this.get(streamId, streamPartition)
Expand All @@ -26,13 +20,13 @@ export class StreamStateManager<C> {
}

get(streamId: string, streamPartition: number): Stream<C> {
return this._streams[getStreamLookupKey(streamId, streamPartition)]
return this.streams[getStreamLookupKey(streamId, streamPartition)]
}

getByName(name: string): Stream<C>|null {
const streamId = Object.keys(this._streams)
.find((key) => { return this._streams[key].getName() === name })
return streamId ? this._streams[streamId] : null
const streamId = Object.keys(this.streams)
.find((key) => { return this.streams[key].getName() === name })
return streamId ? this.streams[streamId] : null
}

/**
Expand All @@ -44,12 +38,12 @@ export class StreamStateManager<C> {
}

const key = getStreamLookupKey(streamId, streamPartition)
if (this._streams[key]) {
if (this.streams[key]) {
throw new Error(`stream already exists for ${key}`)
}

const stream = new Stream<C>(streamId, streamPartition, name)
this._streams[key] = stream
this.streams[key] = stream

/*
* In normal conditions, the Stream object is cleaned when no more
Expand All @@ -61,7 +55,7 @@ export class StreamStateManager<C> {
* end up in subscribed state within one minute (for example, ill-behaving)
* clients only asking for resends and never subscribing.
*/
this._timeouts[key] = setTimeout(() => {
this.timeouts[key] = setTimeout(() => {
if (stream.state !== 'subscribed') {
logger.debug('Stream "%s:%d" never subscribed, cleaning..', streamId, streamPartition)
this.delete(streamId, streamPartition)
Expand All @@ -80,16 +74,16 @@ export class StreamStateManager<C> {
const stream = this.get(streamId, streamPartition)
if (stream) {
const key = getStreamLookupKey(streamId, streamPartition)
clearTimeout(this._timeouts[key])
delete this._timeouts[key]
delete this._streams[key]
clearTimeout(this.timeouts[key])
delete this.timeouts[key]
delete this.streams[key]
}

logger.debug('Stream object "%s" deleted', stream.toString())
}

close(): void {
Object.values(this._timeouts).forEach((timeout) => {
Object.values(this.timeouts).forEach((timeout) => {
clearTimeout(timeout)
})
}
Expand Down
6 changes: 3 additions & 3 deletions packages/broker/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export interface StorageNodeConfig {
registry: StorageNodeRegistryItem[] | NetworkSmartContract
}

export type ApiAuthenticationConfig = { keys: string[] } | null

export interface Config {
ethereumPrivateKey: string
generateSessionId: boolean
Expand All @@ -50,7 +52,5 @@ export interface Config {
storageNodeConfig: StorageNodeConfig,
httpServer: HttpServerConfig
plugins: Record<string,any>
apiAuthentication: {
keys: string[]
} | null
apiAuthentication: ApiAuthenticationConfig
}
6 changes: 5 additions & 1 deletion packages/broker/src/httpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ const createAuthenticatorMiddleware = (apiAuthenticator: ApiAuthenticator) => {
}
}

export const startServer = async (routers: express.Router[], config: HttpServerConfig, apiAuthenticator: ApiAuthenticator): Promise<HttpServer|https.Server> => {
export const startServer = async (
routers: express.Router[],
config: HttpServerConfig,
apiAuthenticator: ApiAuthenticator
): Promise<HttpServer|https.Server> => {
const app = express()
app.use(cors())
app.use(createAuthenticatorMiddleware(apiAuthenticator))
Expand Down
10 changes: 5 additions & 5 deletions packages/broker/src/plugins/legacyMqtt/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,24 @@ export class Connection extends events.EventEmitter {

// Connection refused, server unavailable
sendConnectionRefusedServerUnavailable(): void {
this._sendConnack(3)
this.sendConnack(3)
}

// Connection refused, bad user name or password
sendConnectionRefused(): void {
this._sendConnack(4)
this.sendConnack(4)
}

// Connection refused, not authorized
sendConnectionNotAuthorized(): void {
this._sendConnack(5)
this.sendConnack(5)
}

sendConnectionAccepted(): void {
this._sendConnack(0)
this.sendConnack(0)
}

_sendConnack(code = 0): void {
private sendConnack(code = 0): void {
try {
this.client.connack({
returnCode: code
Expand Down
34 changes: 15 additions & 19 deletions packages/broker/src/plugins/legacyWebsocket/RequestHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Protocol } from 'streamr-network'
const { ControlLayer, Utils, ControlMessageType } = Protocol
const { ControlLayer, Utils, ControlMessageType, ErrorCode } = Protocol
import { ArrayMultimap } from '@teppeis/multimaps'
import { HttpError } from '../../errors/HttpError'
import { FailedToPublishError } from '../../errors/FailedToPublishError'
Expand Down Expand Up @@ -84,8 +84,7 @@ export class RequestHandler {
version: request.version,
requestId: request.requestId,
errorMessage: `Unknown request type: ${request.type}`,
// @ts-expect-error
errorCode: 'INVALID_REQUEST',
errorCode: ErrorCode.INVALID_REQUEST,
}))
return Promise.resolve()
}
Expand All @@ -108,33 +107,32 @@ export class RequestHandler {
let errorCode
if (err instanceof HttpError && err.code === 401) {
errorMessage = `Authentication failed while trying to publish to stream ${streamMessage.getStreamId()}`
errorCode = 'AUTHENTICATION_FAILED'
errorCode = ErrorCode.AUTHENTICATION_FAILED
} else if (err instanceof HttpError && err.code === 403) {
errorMessage = `You are not allowed to write to stream ${streamMessage.getStreamId()}`
errorCode = 'PERMISSION_DENIED'
errorCode = ErrorCode.PERMISSION_DENIED
} else if (err instanceof HttpError && err.code === 404) {
errorMessage = `Stream ${streamMessage.getStreamId()} not found.`
errorCode = 'NOT_FOUND'
errorCode = ErrorCode.NOT_FOUND
} else if (err instanceof FailedToPublishError) {
errorMessage = err.message
errorCode = 'FUTURE_TIMESTAMP'
errorCode = ErrorCode.FUTURE_TIMESTAMP
} else {
errorMessage = `Publish request failed: ${err.message || err}`
errorCode = 'REQUEST_FAILED'
errorCode = ErrorCode.REQUEST_FAILED
}

connection.send(new ControlLayer.ErrorResponse({
version: request.version,
requestId: request.requestId,
errorMessage,
// @ts-expect-error
errorCode,
}))
}
}

private async resend(connection: Connection, request: ResendFromRequest|ResendLastRequest|ResendRangeRequest) {
await this._validateSubscribeOrResendRequest(request)
await this.validateSubscribeOrResendRequest(request)
let streamingStorageData
try {
const response = await createHistoricalDataResponse(request, this.storageNodeRegistry)
Expand Down Expand Up @@ -230,7 +228,7 @@ export class RequestHandler {

private async subscribe(connection: Connection, request: SubscribeRequest) {
try {
await this._validateSubscribeOrResendRequest(request)
await this.validateSubscribeOrResendRequest(request)

if (connection.isDead()) {
return
Expand Down Expand Up @@ -266,23 +264,22 @@ export class RequestHandler {
let errorCode
if (err instanceof HttpError && err.code === 401) {
errorMessage = `Authentication failed while trying to subscribe to stream ${request.streamId}`
errorCode = 'AUTHENTICATION_FAILED'
errorCode = ErrorCode.AUTHENTICATION_FAILED
} else if (err instanceof HttpError && err.code === 403) {
errorMessage = `You are not allowed to subscribe to stream ${request.streamId}`
errorCode = 'PERMISSION_DENIED'
errorCode = ErrorCode.PERMISSION_DENIED
} else if (err instanceof HttpError && err.code === 404) {
errorMessage = `Stream ${request.streamId} not found.`
errorCode = 'NOT_FOUND'
errorCode = ErrorCode.NOT_FOUND
} else {
errorMessage = `Subscribe request failed: ${err}`
errorCode = 'REQUEST_FAILED'
errorCode = ErrorCode.REQUEST_FAILED
}

connection.send(new ControlLayer.ErrorResponse({
version: request.version,
requestId: request.requestId,
errorMessage,
// @ts-expect-error
errorCode,
}))
}
Expand Down Expand Up @@ -335,14 +332,13 @@ export class RequestHandler {
version: request.version,
requestId: request.requestId,
errorMessage: `Not subscribed to stream ${request.streamId} partition ${request.streamPartition}!`,
// @ts-expect-error
errorCode: 'INVALID_REQUEST',
errorCode: ErrorCode.INVALID_REQUEST,
}))
}
}
}

private async _validateSubscribeOrResendRequest(request: SubscribeRequest|ResendFromRequest|ResendLastRequest|ResendRangeRequest) {
private async validateSubscribeOrResendRequest(request: SubscribeRequest|ResendFromRequest|ResendLastRequest|ResendRangeRequest) {
if (Utils.StreamMessageValidator.isKeyExchangeStream(request.streamId)) {
if (request.streamPartition !== 0) {
throw new Error(`Key exchange streams only have partition 0. Tried to subscribe to ${request.streamId}:${request.streamPartition}`)
Expand Down
7 changes: 5 additions & 2 deletions packages/broker/src/plugins/legacyWebsocket/historicalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const getDataQueryEndpointUrl = (request: ResendFromRequest|ResendLastRequest|Re
...query,
format: 'raw'
}, { skipNulls: true })
// eslint-disable-next-line max-len
return `${baseUrl}/streams/${encodeURIComponent(request.streamId)}/data/partitions/${request.streamPartition}/${endpointSuffix}?${queryParameters}`
}
let r
Expand All @@ -41,15 +42,17 @@ const getDataQueryEndpointUrl = (request: ResendFromRequest|ResendLastRequest|Re
r = request as ResendFromRequest
return createUrl('from', {
fromTimestamp: r.fromMsgRef.timestamp,
// TODO client should provide sequenceNumber, remove MIN_SEQUENCE_NUMBER_VALUE defaults when NET-267 have been implemented
// TODO client should provide sequenceNumber, remove MIN_SEQUENCE_NUMBER_VALUE defaults when NET-267
// have been implemented
fromSequenceNumber: r.fromMsgRef.sequenceNumber ?? MIN_SEQUENCE_NUMBER_VALUE,
publisherId: r.publisherId,
})
case ControlLayer.ControlMessage.TYPES.ResendRangeRequest:
r = request as ResendRangeRequest
return createUrl('range', {
fromTimestamp: r.fromMsgRef.timestamp,
// TODO client should provide sequenceNumber, remove MIN_SEQUENCE_NUMBER_VALUE&MAX_SEQUENCE_NUMBER_VALUE defaults when NET-267 have been implemented
// TODO client should provide sequenceNumber, remove MIN_SEQUENCE_NUMBER_VALUE&MAX_SEQUENCE_NUMBER_VALUE
// defaults when NET-267 have been implemented
fromSequenceNumber: r.fromMsgRef.sequenceNumber ?? MIN_SEQUENCE_NUMBER_VALUE,
toTimestamp: r.toMsgRef.timestamp,
toSequenceNumber: r.toMsgRef.sequenceNumber ?? MAX_SEQUENCE_NUMBER_VALUE,
Expand Down
Loading