Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature retry cypher query #97

Merged
merged 8 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,6 @@ e2e/fixtures/config/configstore/
e2e/fixtures/config/Electron/
e2e/fixtures/cache/neo4j-relate/dbmss/
e2e/fixtures/cache/neo4j-relate/extensions/
e2e/fixtures/data/neo4j-relate/relate.secret.key

known_connections
1 change: 1 addition & 0 deletions packages/cli/docs/dbms.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ARGUMENTS
DBMS Name or ID of a Neo4j instance

OPTIONS
-c, --credentials=credentials (required)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make this testable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer we mock STDIN than adding a flag for credentials. Passing credentials as flags/arguments is not a good practice and I think we should avoid it as much as we can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree. Do you have any example on doing this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just looked at the built-in mocking of STDIN that oclif provides. They don't close the stream after sending data, which makes it useless in our case.

An alternative to that would be mocking the passwordPrompt module using jest.

jest.mock('../../prompts', () => {
    return {
        passwordPrompt: (): Promise<string> => Promise.resolve(TestDbmss.DBMS_CREDENTIALS),
    };
});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great suggestion, updated!

-e, --environment=environment Name of the environment to run the command against
-u, --user=user [default: neo4j] Neo4j DBMS user to create the token for
```
Expand Down
7 changes: 6 additions & 1 deletion packages/cli/src/commands/dbms/access-token.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {flags} from '@oclif/command';

import BaseCommand from '../../base.command';
import {ARGS, FLAGS} from '../../constants';
import {ARGS, FLAGS, REQUIRED_FOR_SCRIPTS} from '../../constants';
import {AccessTokenModule} from '../../modules/dbms/access-token.module';

export default class AccessTokenCommand extends BaseCommand {
Expand All @@ -20,5 +20,10 @@ export default class AccessTokenCommand extends BaseCommand {
default: 'neo4j',
description: 'Neo4j DBMS user to create the token for',
}),
// @todo: change before merge
credentials: flags.string({
char: 'c',
required: REQUIRED_FOR_SCRIPTS,
}),
};
}
55 changes: 30 additions & 25 deletions packages/cli/src/modules/app/app.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ import AccessTokenCommand from '../../commands/dbms/access-token';
import OpenCommand from '../../commands/app/open';
import StartCommand from '../../commands/dbms/start';

const appRoot = 'fakeRoot';

jest.mock('cli-ux', () => {
return {
open: (): Promise<void> => Promise.resolve(),
};
});

jest.mock('node-fetch', () => {
return () =>
Promise.resolve({
ok: true,
json: () => ({appRoot}),
});
});

const JWT_REGEX = /^[A-Za-z0-9-_=]+\.[A-Za-z0-9-_=]+\.?[A-Za-z0-9-_.+/=]*$/m;
const TEST_ENVIRONMENT_ID = 'test';
let TEST_DB_NAME: string;
Expand All @@ -31,29 +41,24 @@ describe('$relate app', () => {
await Promise.all([extensions.teardown(), dbmss.teardown()]);
});

test.skip()
.stdout()
.it('logs app launch token', async (ctx) => {
await StartCommand.run([TEST_DB_NAME, '--environment', TEST_ENVIRONMENT_ID]);

// arbitrary wait for Neo4j to come online
await new Promise((resolve) => setTimeout(resolve, 25000));

await AccessTokenCommand.run([
TEST_DB_NAME,
'--principal=neo4j',
`--credentials=${TestDbmss.DBMS_CREDENTIALS}`,
`--environment=${TEST_ENVIRONMENT_ID}`,
]);

await OpenCommand.run([
testExtension.name,
`--dbmsId=${TEST_DB_NAME}`,
'--principal=neo4j',
`--environment=${TEST_ENVIRONMENT_ID}`,
'-L',
]);

expect(ctx.stdout).toEqual(expect.stringMatching(JWT_REGEX));
});
test.stdout().it('logs app launch token', async (ctx) => {
await StartCommand.run([TEST_DB_NAME, '--environment', TEST_ENVIRONMENT_ID]);

await AccessTokenCommand.run([
TEST_DB_NAME,
'--user=neo4j',
`--credentials=${TestDbmss.DBMS_CREDENTIALS}`,
`--environment=${TEST_ENVIRONMENT_ID}`,
]);

await OpenCommand.run([
testExtension.name,
`--dbmsId=${TEST_DB_NAME}`,
'--user=neo4j',
`--environment=${TEST_ENVIRONMENT_ID}`,
'-L',
]);

expect(ctx.stdout).toEqual(expect.stringMatching(JWT_REGEX));
});
});
2 changes: 2 additions & 0 deletions packages/cli/src/modules/app/open.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ export class OpenModule implements OnApplicationBootstrap {

private async getServerAppRoot(environment: Environment): Promise<string> {
let res: Response;

const error = new NotFoundError(`Could not connect to the @relate/web server`, [
'If you are connecting locally, run "relate-web start" and try again.',
'If you are connecting to a remote, ensure the "@relate/web" package is installed and running.',
]);

try {
res = await fetch(`${environment.httpOrigin}/health`);

if (!res.ok) {
throw error;
}
Expand Down
18 changes: 16 additions & 2 deletions packages/cli/src/modules/dbms/access-token.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Inject, Module, OnApplicationBootstrap} from '@nestjs/common';
import {AuthTokenModel, SystemModule, SystemProvider} from '@relate/common';
import {AuthTokenModel, HOOK_EVENTS, registerHookListener, SystemModule, SystemProvider} from '@relate/common';

import AccessTokenCommand from '../../commands/dbms/access-token';
import {selectDbmsPrompt, passwordPrompt, inputPrompt} from '../../prompts';
Expand All @@ -25,7 +25,8 @@ export class AccessTokenModule implements OnApplicationBootstrap {
const dbmsId =
args.dbms || (await selectDbmsPrompt('Select a DBMS to create an access token for', environment));
const principal = flags.user || (await inputPrompt('Enter a Neo4j DBMS user'));
const credentials = await passwordPrompt('Enter passphrase');
// eslint-disable-next-line no-restricted-properties
const credentials = flags.credentials || (await passwordPrompt('Enter passphrase'));

const dbms = await environment.dbmss.get(dbmsId);
const authToken = new AuthTokenModel({
Expand All @@ -34,6 +35,19 @@ export class AccessTokenModule implements OnApplicationBootstrap {
scheme: 'basic',
});

registerHookListener(HOOK_EVENTS.RUN_QUERY_RETRY, ({retry}) => {
if (retry < 1) {
return;
}

if (retry === 1) {
this.utils.log('DBMS connection not available, retrying...');
return;
}

this.utils.log('still retrying...');
});

return environment.dbmss
.createAccessToken(AccessTokenModule.DEFAULT_APP_ID, dbms.id, authToken)
.then((accessToken) =>
Expand Down
14 changes: 9 additions & 5 deletions packages/cli/src/modules/dbms/dbms.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ describe('$relate dbms', () => {
expect(ctx.stdout).toContain(DBMS_STATUS.STARTED);
});

test.skip()
.stdout()
// arbitrary wait for Neo4j to come online
.do(() => new Promise((resolve) => setTimeout(resolve, 25000)))
test.stdout()
.stdin(TestDbmss.DBMS_CREDENTIALS)
.it('logs access token', async (ctx) => {
await AccessTokenCommand.run([TEST_DB_NAME, '--principal=neo4j', '--environment', TEST_ENVIRONMENT_ID]);
await AccessTokenCommand.run([
TEST_DB_NAME,
'--user=neo4j',
'--environment',
TEST_ENVIRONMENT_ID,
'--credentials',
TestDbmss.DBMS_CREDENTIALS,
]);
expect(ctx.stdout).toEqual(expect.stringMatching(JWT_REGEX));
});

Expand Down
16 changes: 7 additions & 9 deletions packages/cli/src/modules/extension/extension.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ describe('$relate extension', () => {
expect(ctx.stdout).toContain(testExtension.name);
});

test.skip()
.stderr()
.it('throws when extension not in cache', async () => {
try {
await InstallCommand.run(['missing-extension', '-V', '1.0.0', '--environment', TEST_ENVIRONMENT_NAME]);
} catch (e) {
expect(e).toEqual(new NotFoundError('fetch and install [email protected]'));
}
});
test.stderr().it('throws when extension not in cache', async () => {
try {
await InstallCommand.run(['missing-extension', '-V', '1.0.0', '--environment', TEST_ENVIRONMENT_NAME]);
} catch (e) {
expect(e).toEqual(new NotFoundError(`Unable to find the requested extension: missing-extension online`));
}
});

test.stdout().it('uninstalls extension', async (ctx) => {
await UninstallCommand.run([testExtension.name, '--environment', TEST_ENVIRONMENT_NAME]);
Expand Down
19 changes: 15 additions & 4 deletions packages/common/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {SignOptions} from 'jsonwebtoken';

export const RELATE_IS_TESTING = process.env.NODE_ENV === 'test';

export const JSON_FILE_EXTENSION = '.json';
export const DOWNLOADING_FILE_EXTENSION = '.rdownload';

Expand All @@ -10,7 +12,7 @@ export const DBMS_DIR_NAME = 'dbmss';
export const NEW_LINE = '\n';
export const PROPERTIES_SEPARATOR = '=';
// @todo: this should be generated when installing daedalus instance
export const JWT_INSTANCE_TOKEN_SALT = 'hello world!';
export const RELATE_TOKEN_SALT_FILE_NAME = 'relate.secret.key';
export const TWENTY_FOUR_HOURS_SECONDS = 24 * 60 * 60;

export const EXTENSION_DIR_NAME = 'extensions';
Expand Down Expand Up @@ -43,9 +45,6 @@ export enum DBMS_TLS_LEVEL {
ENABLED = 'ENABLED',
}

export type Listener<T = any> = (eventData: T) => void | Promise<void>;
export type Actor<T = any> = (eventData: T) => T | Promise<T>;

export enum HOOK_EVENTS {
ELECTRON_WINDOW_OPTIONS = 'ELECTRON_WINDOW_OPTIONS',
ELECTRON_WINDOW_CREATED = 'ELECTRON_WINDOW_CREATED',
Expand All @@ -66,8 +65,16 @@ export enum HOOK_EVENTS {
RELATE_EXTENSION_DEPENDENCIES_INSTALL_START = 'RELATE_EXTENSION_DEPENDENCIES_INSTALL_START',
RELATE_EXTENSION_DEPENDENCIES_INSTALL_STOP = 'RELATE_EXTENSION_DEPENDENCIES_INSTALL_STOP',
DOWNLOAD_PROGRESS = 'DOWNLOAD_PROGRESS',
RUN_QUERY_RETRY = 'RUN_QUERY_RETRY',
}

export interface IHookEventPayloads {
[HOOK_EVENTS.RUN_QUERY_RETRY]: {query: string; params: any; retry: number};
[key: string]: any;
}
export type Listener<E extends HOOK_EVENTS> = (eventData: IHookEventPayloads[E]) => void | Promise<void>;
export type Actor<E extends HOOK_EVENTS, T = IHookEventPayloads[E]> = (eventData: T) => T | Promise<T>;

export const DEFAULT_JWT_SIGN_OPTIONS: SignOptions = {expiresIn: TWENTY_FOUR_HOURS_SECONDS};

export const AUTH_TOKEN_KEY = 'X-Auth-Token';
Expand Down Expand Up @@ -101,3 +108,7 @@ export enum PUBLIC_GRAPHQL_METHODS {
APP_LAUNCH_DATA = 'appLaunchData',
CREATE_APP_LAUNCH_TOKEN = 'createAppLaunchToken',
}

// seconds
export const CONNECTION_RETRY_STEP = RELATE_IS_TESTING ? 10 : 4;
export const MAX_CONNECTION_RETRIES = 5;
99 changes: 97 additions & 2 deletions packages/common/src/entities/dbmss/dbmss.abstract.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import {List} from '@relate/types';
import {IAuthToken} from '@huboneo/tapestry';
import {List, Str} from '@relate/types';
import {Driver, DRIVER_HEADERS, DRIVER_RESULT_TYPE, IAuthToken, JsonUnpacker} from '@huboneo/tapestry';
import * as rxjs from 'rxjs';
import * as rxjsOps from 'rxjs/operators';

import {IDbms, IDbmsInfo, IDbmsVersion} from '../../models';

import {EnvironmentAbstract} from '../environments';
import {PropertiesFile} from '../../system/files';
import {DbmsQueryError, ErrorAbstract, InvalidConfigError, NotAllowedError, NotFoundError} from '../../errors';
import {CONNECTION_RETRY_STEP, DBMS_STATUS, HOOK_EVENTS, MAX_CONNECTION_RETRIES} from '../../constants';
import {emitHookEvent} from '../../utils';

export type TapestryJSONResponse<Res = any> = {
header: {fields: string[]};
type: DRIVER_RESULT_TYPE;
data: Res;
};

export abstract class DbmssAbstract<Env extends EnvironmentAbstract> {
public dbmss: {[id: string]: IDbms} = {};
Expand All @@ -30,4 +41,88 @@ export abstract class DbmssAbstract<Env extends EnvironmentAbstract> {
abstract createAccessToken(appName: string, dbmsId: string, authToken: IAuthToken): Promise<string>;

abstract getDbmsConfig(dbmsId: string): Promise<PropertiesFile>;

runQuery<Res = any>(
driver: Driver<TapestryJSONResponse<Res>>,
query: string,
params: any = {},
retry = 0,
): rxjs.Observable<TapestryJSONResponse<Res>> {
return driver.query(query, params).pipe(
rxjsOps.tap((res) => {
if (res.type === DRIVER_RESULT_TYPE.FAILURE) {
throw new DbmsQueryError(`Failed to execute query`, res);
}
}),
rxjsOps.filter(({type}) => type === DRIVER_RESULT_TYPE.RECORD),
rxjsOps.catchError((e) => {
if (typeof retry !== 'number' || e instanceof ErrorAbstract) {
throw e;
}

const message = Str.from(e.message);

if (!message.includes('ECONNREFUSED') && retry > 2) {
throw new NotAllowedError('Unable to connect to DBMS');
}

if (retry < MAX_CONNECTION_RETRIES) {
const seconds = CONNECTION_RETRY_STEP * retry;

return rxjs.from([1]).pipe(
rxjsOps.flatMap(() =>
rxjs.from(
emitHookEvent(HOOK_EVENTS.RUN_QUERY_RETRY, {
query,
params,
retry: retry + 1,
}),
),
),
rxjsOps.delay(1000 * seconds),
rxjsOps.flatMap((update) => this.runQuery(driver, update.query, update.params, update.retry)),
);
}

throw new NotAllowedError('Unable to connect to DBMS');
}),
);
}

protected async getJSONDriverInstance(
dbmsId: string,
authToken: IAuthToken,
): Promise<Driver<TapestryJSONResponse>> {
const dbmsInfo = (await this.info([dbmsId])).first.getOrElse(() => {
throw new NotFoundError(`Could not find Dbms ${dbmsId}`);
});

if (dbmsInfo.status !== DBMS_STATUS.STARTED) {
throw new NotAllowedError(`Dbms ${dbmsId} is not started`);
}

try {
// @todo: add support for encrypted connections
const {hostname, port} = new URL(dbmsInfo.connectionUri);
const driver = new Driver<TapestryJSONResponse>({
connectionConfig: {
authToken,
host: hostname,
port: Number(port),
unpacker: JsonUnpacker,
getResponseHeader: (data): DRIVER_HEADERS => data[0] || DRIVER_HEADERS.FAILURE,
getResponseData: (data): any => data[1] || [],
},
mapToResult: (headerRecord, type, data) => ({
header: headerRecord,
type,
data,
}),
});

return driver;
} catch (e) {
throw new InvalidConfigError('Unable to connect to DBMS');
}
}
}
Loading