Skip to content

Commit

Permalink
fix: Upgrade grpc to JS implementation (#591)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnarea authored Aug 18, 2021
1 parent 8ba33b9 commit 612dbf7
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 547 deletions.
535 changes: 44 additions & 491 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
},
"homepage": "https://docs.relaycorp.tech/relaynet-internet-gateway/",
"dependencies": {
"@relaycorp/cogrpc": "^1.3.19",
"@grpc/grpc-js": "^1.3.7",
"@relaycorp/cogrpc": "^1.3.23",
"@relaycorp/keystore-vault": "^1.2.6",
"@relaycorp/object-storage": "^1.4.5",
"@relaycorp/pino-cloud": "^1.0.4",
Expand All @@ -82,8 +83,7 @@
"env-var": "^7.0.1",
"fastify": "^3.20.2",
"fastify-mongoose": "^0.3.0",
"grpc": "^1.24.11",
"grpc-health-check": "^1.8.0",
"grpc-js-health-check": "^1.0.2",
"it-pipe": "^1.1.0",
"mongoose": "^5.13.7",
"node-nats-streaming": "^0.3.2",
Expand Down
2 changes: 1 addition & 1 deletion src/functionalTests/cogrpc_server.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as grpc from '@grpc/grpc-js';
import { CogRPCClient, CogRPCError } from '@relaycorp/cogrpc';
import {
Cargo,
Expand All @@ -9,7 +10,6 @@ import {
} from '@relaycorp/relaynet-core';
import { deliverParcel } from '@relaycorp/relaynet-pohttp';
import bufferToArray from 'buffer-to-arraybuffer';
import grpc from 'grpc';
import { Message, Stan, Subscription } from 'node-nats-streaming';

import {
Expand Down
2 changes: 1 addition & 1 deletion src/services/cogrpc/_test_utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import grpc from 'grpc';
import * as grpc from '@grpc/grpc-js';
import { Duplex } from 'stream';

export class MockGrpcBidiCall<Input, Output> extends Duplex {
Expand Down
34 changes: 18 additions & 16 deletions src/services/cogrpc/server.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as grpc from '@grpc/grpc-js';
import { CargoRelayService } from '@relaycorp/cogrpc';
import * as grpc from 'grpc';
import * as grpcHealthCheck from 'grpc-health-check';
import * as grpcHealthCheck from 'grpc-js-health-check';
import { Logger } from 'pino';
import selfsigned from 'selfsigned';

Expand All @@ -17,11 +17,11 @@ const makeServiceImplementationSpy = mockSpy(
);
const mockServer = {
addService: mockSpy(jest.fn()),
bind: mockSpy(jest.fn(), () => 1),
bindAsync: mockSpy(jest.fn(), (_netloc, _credentials, cb) => cb()),
start: mockSpy(jest.fn()),
};
jest.mock('grpc', () => {
const grpcOriginal = jest.requireActual('grpc');
jest.mock('@grpc/grpc-js', () => {
const grpcOriginal = jest.requireActual('@grpc/grpc-js');
return {
...grpcOriginal,
Server: jest.fn().mockImplementation(() => mockServer),
Expand Down Expand Up @@ -161,9 +161,8 @@ describe('runServer', () => {
expect.anything(),
expect.objectContaining({
statusMap: {
'': grpcHealthCheck.messages.HealthCheckResponse.ServingStatus.SERVING,
'relaynet.cogrpc.CargoRelay':
grpcHealthCheck.messages.HealthCheckResponse.ServingStatus.SERVING,
'': grpcHealthCheck.servingStatus.SERVING,
'relaynet.cogrpc.CargoRelay': grpcHealthCheck.servingStatus.SERVING,
},
}),
);
Expand All @@ -172,24 +171,27 @@ describe('runServer', () => {
test('Server should listen on 0.0.0.0:8080', async () => {
await runServer();

expect(mockServer.bind).toBeCalledTimes(1);
expect(mockServer.bind).toBeCalledWith('0.0.0.0:8080', expect.anything());
expect(mockServer.bindAsync).toBeCalledTimes(1);
expect(mockServer.bindAsync).toBeCalledWith(
'0.0.0.0:8080',
expect.anything(),
expect.anything(),
);
});

test('Failing to listen on specified port should result in error', async () => {
mockServer.bind.mockReturnValueOnce(-1);
const bindError = new Error('Port is apparently taken');
mockServer.bindAsync.mockImplementation((_netloc, _credentials, cb) => cb(bindError));

await expect(() => runServer()).rejects.toMatchObject({
message: 'Failed to listen on 0.0.0.0:8080',
});
await expect(() => runServer()).rejects.toBe(bindError);
});

test('Server should use TLS with a self-issued certificate', async () => {
const spiedCreateSsl = jest.spyOn(grpc.ServerCredentials, 'createSsl');

await runServer();

expect(mockServer.bind).toBeCalledTimes(1);
expect(mockServer.bindAsync).toBeCalledTimes(1);
expect(spiedCreateSsl).toBeCalledWith(null, [
{
cert_chain: Buffer.from(mockSelfSignedOutput.cert),
Expand All @@ -215,7 +217,7 @@ describe('runServer', () => {

expect(mockServer.start).toBeCalledTimes(1);
expect(mockServer.start).toBeCalledWith();
expect(mockServer.start).toHaveBeenCalledAfter(mockServer.bind as jest.Mock);
expect(mockServer.start).toHaveBeenCalledAfter(mockServer.bindAsync as jest.Mock);
});

test('A log should be produced when the server is ready', async () => {
Expand Down
30 changes: 16 additions & 14 deletions src/services/cogrpc/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { KeyCertPair, Server, ServerCredentials } from '@grpc/grpc-js';
import { CargoRelayService } from '@relaycorp/cogrpc';
import { get as getEnvVar } from 'env-var';
import { KeyCertPair, Server, ServerCredentials } from 'grpc';
import grpcHealthCheck from 'grpc-health-check';
import * as grpcHealthCheck from 'grpc-js-health-check';
import { Logger } from 'pino';
import * as selfsigned from 'selfsigned';
import { configureExitHandling } from '../../utilities/exitHandling';
Expand Down Expand Up @@ -46,23 +46,25 @@ export async function runServer(logger?: Logger): Promise<void> {
parcelStoreBucket,
publicAddress,
});
server.addService(CargoRelayService, serviceImplementation);
server.addService(CargoRelayService, serviceImplementation as any);

// TODO: Health checks should be probing backing services
const healthCheckService = new grpcHealthCheck.Implementation({
'': grpcHealthCheck.messages.HealthCheckResponse.ServingStatus.SERVING,
'relaynet.cogrpc.CargoRelay':
grpcHealthCheck.messages.HealthCheckResponse.ServingStatus.SERVING,
'': grpcHealthCheck.servingStatus.SERVING,
'relaynet.cogrpc.CargoRelay': grpcHealthCheck.servingStatus.SERVING,
});
server.addService(grpcHealthCheck.service, healthCheckService);
server.addService(grpcHealthCheck.service, healthCheckService as any);

const bindResult = server.bind(
NETLOC,
ServerCredentials.createSsl(null, [await selfIssueCertificate()]),
);
if (bindResult < 0) {
throw new Error(`Failed to listen on ${NETLOC}`);
}
const certificate = await selfIssueCertificate();
await new Promise((resolve, reject) => {
server.bindAsync(NETLOC, ServerCredentials.createSsl(null, [certificate]), (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
server.start();

baseLogger.info('Ready to receive requests');
Expand Down
2 changes: 1 addition & 1 deletion src/services/cogrpc/service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as grpc from '@grpc/grpc-js';
import { CargoDelivery, CargoDeliveryAck, CargoRelayServerMethodSet } from '@relaycorp/cogrpc';
import {
Cargo,
Expand All @@ -23,7 +24,6 @@ import {
import * as typegoose from '@typegoose/typegoose';
import bufferToArray from 'buffer-to-arraybuffer';
import { EventEmitter } from 'events';
import * as grpc from 'grpc';
import mongoose from 'mongoose';

import {
Expand Down
5 changes: 2 additions & 3 deletions src/services/cogrpc/service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as grpc from '@grpc/grpc-js';
import { CargoDelivery, CargoDeliveryAck, CargoRelayServerMethodSet } from '@relaycorp/cogrpc';
import { VaultPrivateKeyStore } from '@relaycorp/keystore-vault';
import {
Expand All @@ -8,11 +9,9 @@ import {
Gateway,
} from '@relaycorp/relaynet-core';
import bufferToArray from 'buffer-to-arraybuffer';
import * as grpc from 'grpc';
import pipe from 'it-pipe';
import { Connection } from 'mongoose';
import { Logger } from 'pino';
import * as streamToIt from 'stream-to-it';
import uuid from 'uuid-random';

import { createMongooseConnectionFromEnv, initMongoDBKeyStore } from '../../backingServices/mongo';
Expand Down Expand Up @@ -135,7 +134,7 @@ async function deliverCargo(
}

try {
await pipe(streamToIt.source(call), validateDelivery, natsPublisher, ackDelivery);
await pipe(call, validateDelivery, natsPublisher, ackDelivery);
} catch (err) {
logger.error({ err }, 'Failed to store cargo');
call.emit('error', INTERNAL_SERVER_ERROR); // Also ends the call
Expand Down
17 changes: 0 additions & 17 deletions src/types/grpc-health-check.d.ts

This file was deleted.

13 changes: 13 additions & 0 deletions src/types/grpc-js-health-check.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
declare module 'grpc-js-health-check' {
import { ServiceDefinition } from '@grpc/grpc-js';

export const service: ServiceDefinition<any>;
export class Implementation {
constructor(map: { readonly [key: string]: string });
}

export const servingStatus: {
readonly NOT_SERVING: any;
readonly SERVING: any;
};
}

0 comments on commit 612dbf7

Please sign in to comment.