diff --git a/packages/thrift-client/package.json b/packages/thrift-client/package.json index 0040a66c..e6840b9d 100644 --- a/packages/thrift-client/package.json +++ b/packages/thrift-client/package.json @@ -42,7 +42,7 @@ "request": "^2.83.0" }, "devDependencies": { - "@creditkarma/dynamic-config": "~0.4.3", + "@creditkarma/dynamic-config": "~0.5.0", "@creditkarma/thrift-server-core": "~0.4.3", "@creditkarma/thrift-server-express": "~0.4.3", "@creditkarma/thrift-server-hapi": "~0.4.3", @@ -61,6 +61,7 @@ "hapi": "^16.6.2", "lab": "^14.3.1", "request": "^2.83.0", + "request-promise-native": "^1.0.5", "rimraf": "^2.6.2", "ts-node": "^3.3.0", "tslint": "^5.7.0", diff --git a/packages/thrift-client/src/main/connections/HttpConnection.ts b/packages/thrift-client/src/main/connections/HttpConnection.ts index 511393b6..fdcdecfa 100644 --- a/packages/thrift-client/src/main/connections/HttpConnection.ts +++ b/packages/thrift-client/src/main/connections/HttpConnection.ts @@ -5,16 +5,28 @@ import { ThriftConnection, } from '@creditkarma/thrift-server-core' +import { + CoreOptions, + Request, + RequestAPI, + RequestResponse, + RequiredUriUrl, + UrlOptions, +} from 'request' + import { IHttpConnectionOptions, - IRequestMiddleware, IRequestResponse, - IResponseMiddleware, IThriftMiddleware, + IThriftMiddlewareConfig, RequestHandler, - ThriftMiddleware, + ThriftContext, } from '../types' +import { + deepMerge, +} from '../utils' + function normalizePath(path: string = '/'): string { if (path.startsWith('/')) { return path @@ -27,34 +39,8 @@ function normalizePath(path: string = '/'): string { export type HttpProtocol = 'http' | 'https' -export interface IMiddlewareMap { - response: Array - request: Array> -} - -export type IPromisedFunction = (val: T) => Promise - -async function reducePromises( - promises: Array>, - initial: T, -): Promise { - if (promises.length === 0) { - return initial - - } else { - const [head, ...tail] = promises - const nextValue: T = await head(initial) - if (tail.length === 0) { - return nextValue - - } else { - return reducePromises(tail, nextValue) - } - } -} - -function filterByMethod(method: string): (middleware: IThriftMiddleware) => boolean { - return (middleware: IThriftMiddleware): boolean => { +export function filterByMethod(method: string): (middleware: IThriftMiddleware) => boolean { + return (middleware: IThriftMiddleware): boolean => { return ( middleware.methods.length === 0 || middleware.methods.indexOf(method) > -1 @@ -62,94 +48,120 @@ function filterByMethod(method: string): (middleware: IThriftMiddleware) => bool } } -function getHandler(middleware: IRequestMiddleware): RequestHandler { +export function getHandler(middleware: IThriftMiddleware): RequestHandler { return middleware.handler } -export abstract class HttpConnection extends ThriftConnection { +export type RequestInstance = + RequestAPI + +export class HttpConnection extends ThriftConnection> { protected readonly port: number protected readonly hostName: string protected readonly path: string protected readonly url: string protected readonly protocol: HttpProtocol - protected readonly middleware: IMiddlewareMap + protected readonly middleware: Array> + private readonly request: RequestAPI - constructor(options: IHttpConnectionOptions) { + // constructor(request: RequestInstance, options: IHttpConnectionOptions) { + // super(options) + // this.request = request + // } + + constructor(request: RequestInstance, options: IHttpConnectionOptions) { super( getTransport(options.transport), getProtocol(options.protocol), ) + this.request = request this.port = options.port this.hostName = options.hostName this.path = normalizePath(options.path || '/thrift') this.protocol = ((options.https === true) ? 'https' : 'http') this.url = `${this.protocol}://${this.hostName}:${this.port}${this.path}` - this.middleware = { - response: [], - request: [], - } + this.middleware = [] } - // Provides an empty context for outgoing middleware - public abstract emptyContext(): Context - - public abstract write(dataToWrite: Buffer, context?: Context): Promise - - public register(...middleware: Array>): void { - middleware.forEach((next: ThriftMiddleware) => { - switch (next.type) { - case 'request': - return this.middleware.request.push({ - type: 'request', - methods: next.methods || [], - handler: next.handler, - }) - - default: - return this.middleware.response.push({ - type: 'response', - methods: next.methods || [], - handler: next.handler, - }) - } + public register(...middleware: Array>): void { + middleware.forEach((next: IThriftMiddlewareConfig) => { + this.middleware.push({ + methods: next.methods || [], + handler: next.handler, + }) }) } + // export type NextFunction = + // (data?: Buffer, options?: Options) => Promise + public send( dataToSend: Buffer, - context: Context = this.emptyContext(), + context: ThriftContext = this.emptyContext(), ): Promise { const requestMethod: string = readThriftMethod(dataToSend, this.Transport, this.Protocol) + const handlers: Array> = this.handlersForMethod(requestMethod) + + const applyHandlers = ( + data: Buffer, + currentContext: ThriftContext, + [head, ...tail]: Array>, + ): Promise => { + if (head === undefined) { + return this.write(data, currentContext) + + } else { + return head(data, currentContext, (nextData?: Buffer, nextContext?: CoreOptions): Promise => { + const resolvedContext = deepMerge(currentContext, (nextContext || {})) + return applyHandlers((nextData || data), resolvedContext, tail) + }) + } + } - return reducePromises( - this.handlersForMethod(requestMethod), - context, - ).then((resolvedContext: Context | undefined) => { - return this.write(dataToSend, resolvedContext).then((res: IRequestResponse) => { - return this.middleware.response - .filter(filterByMethod(requestMethod)) - .reduce((acc: Promise, next: IResponseMiddleware): Promise => { - return acc.then(next.handler) - }, Promise.resolve(res.body)) + return applyHandlers(dataToSend, context, handlers).then((res: IRequestResponse) => { + return res.body + }) + } + + public emptyContext(): ThriftContext { + return {} + } + + public write(dataToWrite: Buffer, options: CoreOptions = {}): Promise { + // Merge user options with required options + const requestOptions: CoreOptions & UrlOptions = deepMerge(options, { + method: 'POST', + body: dataToWrite, + encoding: null, // Needs to be explicitly set to null to get Buffer in response body + url: this.url, + headers: { + 'content-length': dataToWrite.length, + 'content-type': 'application/octet-stream', + }, + }) + + return new Promise((resolve, reject) => { + this.request(requestOptions, (err: any, response: RequestResponse, body: Buffer) => { + if (err !== null) { + reject(err) + + } else if (response.statusCode && (response.statusCode < 200 || response.statusCode > 299)) { + reject(response) + + } else { + resolve({ + statusCode: response.statusCode, + headers: response.headers, + body, + }) + } }) }) } - private handlersForMethod(name: string): Array> { - return this.middleware.request + private handlersForMethod(name: string): Array> { + return this.middleware .filter(filterByMethod(name)) .map(getHandler) } } - -/** - * interface NextFunction { - * (data?: Buffer, context?: IThriftContext): Promise - * } - * - * interface RequestHandler { - * (data: Buffer, context: IThriftContext, next: NextFunction): Promise - * } - * - * - */ diff --git a/packages/thrift-client/src/main/connections/RequestConnection.ts b/packages/thrift-client/src/main/connections/RequestConnection.ts deleted file mode 100644 index 6cd6cfb3..00000000 --- a/packages/thrift-client/src/main/connections/RequestConnection.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { - CoreOptions, - Request, - RequestAPI, - RequestResponse, - RequiredUriUrl, - UrlOptions, -} from 'request' - -import { - HttpConnection, -} from './HttpConnection' - -import { - IHttpConnectionOptions, - IRequestResponse, -} from '../types' - -import { - deepMerge, -} from '../utils' - -export type RequestInstance = - RequestAPI - -export class RequestConnection extends HttpConnection { - private readonly request: RequestAPI - - constructor(request: RequestInstance, options: IHttpConnectionOptions) { - super(options) - this.request = request - } - - public emptyContext(): CoreOptions { - return {} - } - - public write(dataToWrite: Buffer, options: CoreOptions = {}): Promise { - // Merge user options with required options - const requestOptions: CoreOptions & UrlOptions = deepMerge(options, { - method: 'POST', - body: dataToWrite, - encoding: null, // Needs to be explicitly set to null to get Buffer in response body - url: this.url, - headers: { - 'content-length': dataToWrite.length, - 'content-type': 'application/octet-stream', - }, - }) - - return new Promise((resolve, reject) => { - this.request(requestOptions, (err: any, response: RequestResponse, body: Buffer) => { - if (err !== null) { - reject(err) - - } else if (response.statusCode && (response.statusCode < 200 || response.statusCode > 299)) { - reject(response) - - } else { - resolve({ - statusCode: response.statusCode, - headers: response.headers, - body, - }) - } - }) - }) - } -} diff --git a/packages/thrift-client/src/main/connections/index.ts b/packages/thrift-client/src/main/connections/index.ts index b0c76ead..3cd1d2bd 100644 --- a/packages/thrift-client/src/main/connections/index.ts +++ b/packages/thrift-client/src/main/connections/index.ts @@ -2,26 +2,26 @@ import { IClientConstructor } from '@creditkarma/thrift-server-core' import * as request from 'request' import { - RequestConnection, + HttpConnection, RequestInstance, -} from './RequestConnection' +} from './HttpConnection' import { - ICreateClientOptions, + ICreateHttpClientOptions, + ThriftContext, } from '../types' export * from './HttpConnection' -export * from './RequestConnection' export function createClient( - ServiceClient: IClientConstructor, - options: ICreateClientOptions, + ServiceClient: IClientConstructor>, + options: ICreateHttpClientOptions, ): TClient { const requestClient: RequestInstance = request.defaults(options.requestOptions || {}) - const connection: RequestConnection = - new RequestConnection(requestClient, options) + const connection: HttpConnection = + new HttpConnection(requestClient, options) // Register optional middleware connection.register(...(options.register || [])) @@ -30,8 +30,8 @@ export function createClient( } export function createHttpClient( - ServiceClient: IClientConstructor, - options: ICreateClientOptions, + ServiceClient: IClientConstructor>, + options: ICreateHttpClientOptions, ): TClient { return createClient(ServiceClient, options) } diff --git a/packages/thrift-client/src/main/observability/zipkin.ts b/packages/thrift-client/src/main/observability/zipkin.ts index 909ead38..2dffa53b 100644 --- a/packages/thrift-client/src/main/observability/zipkin.ts +++ b/packages/thrift-client/src/main/observability/zipkin.ts @@ -1,83 +1,76 @@ -// import { Tracer, Instrumentation } from 'zipkin' import { - // getTracerForService + Instrumentation, + TraceId, + Tracer, +} from 'zipkin' + +import { + addL5Dheaders, + asyncScope, + getTracerForService, + hasL5DHeader, + IRequestContext, + IRequestHeaders, + IZipkinPluginOptions, } from '@creditkarma/thrift-server-core' + import { CoreOptions } from 'request' import { - IRequestMiddleware, - IThriftContext, + IRequestResponse, + IThriftMiddleware, + NextFunction, + ThriftContext, } from '../types' -export interface IZipkinPluginOptions { - serviceName: string - port?: number - debug?: boolean - endpoint?: string - sampleRate?: number +function applyL5DHeaders(incomingHeaders: IRequestHeaders, headers: IRequestHeaders): IRequestHeaders { + if (hasL5DHeader(incomingHeaders)) { + return addL5Dheaders(headers) + } else { + return headers + } } export function ZipkinTracePlugin({ - serviceName, + localServiceName, + remoteServiceName, port = 0, debug = false, endpoint, sampleRate, -}: IZipkinPluginOptions): IRequestMiddleware { +}: IZipkinPluginOptions): IThriftMiddleware { return { - type: 'request', methods: [], - handler(context: IThriftContext): Promise { - console.log('context: ', context) - // const tracer: Tracer = getTracerForService(serviceName, { debug, endpoint, sampleRate }) - // const instrumentation = new Instrumentation.HttpClient({ tracer, remoteServiceName: serviceName }) - // if (context.headers !== undefined) { - - // } - - return Promise.resolve(context.options || {}) - }, - } -} + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + const tracer: Tracer = getTracerForService(localServiceName, { debug, endpoint, sampleRate }) + const requestContext: IRequestContext | null = asyncScope.get('requestContext') + if (requestContext !== null) { + const traceId: TraceId = requestContext.traceId + const incomingHeaders: IRequestHeaders = requestContext.requestHeaders + tracer.setId(traceId) + return tracer.scoped(() => { + const instrumentation = new Instrumentation.HttpClient({ tracer, remoteServiceName }) + let { headers } = instrumentation.recordRequest({ headers: {} }, '', 'post') + headers = applyL5DHeaders(incomingHeaders, headers) -/** - * -const { - Instrumentation -} = require('zipkin'); + return next(data, { headers }).then((res: IRequestResponse) => { + tracer.scoped(() => { + instrumentation.recordResponse((traceId as any), `${res.statusCode}`) + }) -function wrapFetch(fetch, { tracer, serviceName, remoteServiceName }) { - const instrumentation = - new Instrumentation.HttpClient({ - tracer, - serviceName, - remoteServiceName - }); + return res - return function zipkinfetch(url, opts = {}) { - return new Promise((resolve, reject) => { - tracer.scoped(() => { - const method = opts.method || 'GET'; - const zipkinOpts = instrumentation.recordRequest(opts, url, method); - const traceId = tracer.id; + }, (err: any) => { + tracer.scoped(() => { + instrumentation.recordError((traceId as any), err) + }) - fetch(url, zipkinOpts).then(res => { - tracer.scoped(() => { - instrumentation.recordResponse(traceId, res.status); - }); - resolve(res); - - }).catch(err => { - tracer.scoped(() => { - instrumentation.recordError(traceId, err); - }); - reject(err); - }); - }); - }); - }; + return Promise.reject(err) + }) + }) + } else { + return next() + } + }, + } } - -module.exports = wrapFetch; - -*/ diff --git a/packages/thrift-client/src/main/types.ts b/packages/thrift-client/src/main/types.ts index 38650d21..5bb88f06 100644 --- a/packages/thrift-client/src/main/types.ts +++ b/packages/thrift-client/src/main/types.ts @@ -1,79 +1,54 @@ import { + IRequestHeaders, ProtocolType, TransportType, } from '@creditkarma/thrift-server-core' -import { TraceId } from 'zipkin' - import * as request from 'request' -export interface IRequestHeaders { - [name: string]: any -} - export interface IRequestResponse { statusCode: number headers: IRequestHeaders body: Buffer } -export interface IThriftContext { - options?: Options - traceId?: TraceId - headers?: IRequestHeaders -} +export type ThriftContext = + Options & { request?: { headers: IRequestHeaders} } + +export type ClientOptionsFunction = + () => ThriftContext -export interface IHttpConnectionOptions { +export interface IHttpConnectionOptions { hostName: string port: number path?: string https?: boolean transport?: TransportType protocol?: ProtocolType + context?: ThriftContext | ClientOptionsFunction } -export interface ICreateClientOptions extends IHttpConnectionOptions { +export interface ICreateHttpClientOptions extends IHttpConnectionOptions { serviceName?: string - register?: Array> + register?: Array> requestOptions?: request.CoreOptions } -export type MiddlewareType = - 'request' | 'response' - -export type ThriftMiddleware = - IResponseMiddlewareConfig | IRequestMiddlewareConfig - -export interface IThriftMiddlewareConfig { - type?: MiddlewareType - methods?: Array -} +export type NextFunction = + (data?: Buffer, options?: Options) => Promise -export type ResponseHandler = (data: Buffer) => Promise +export type RequestHandler = ( + data: Buffer, + context: ThriftContext, + next: NextFunction, +) => Promise -export type RequestHandler = (context: IThriftContext) => Promise - -export interface IResponseMiddlewareConfig extends IThriftMiddlewareConfig { - type?: 'response' - handler: ResponseHandler -} - -export interface IRequestMiddlewareConfig extends IThriftMiddlewareConfig { - type: 'request' - handler: RequestHandler -} - -export interface IThriftMiddleware { - type: MiddlewareType +export interface IThriftMiddleware { methods: Array + handler: RequestHandler } -export interface IResponseMiddleware extends IThriftMiddleware { - type: 'response' - handler: ResponseHandler -} - -export interface IRequestMiddleware extends IThriftMiddleware { - type: 'request' - handler: RequestHandler +export interface IThriftMiddlewareConfig { + methods?: Array + handler: RequestHandler } diff --git a/packages/thrift-client/src/tests/add-service.ts b/packages/thrift-client/src/tests/add-service.ts new file mode 100644 index 00000000..dc6bae03 --- /dev/null +++ b/packages/thrift-client/src/tests/add-service.ts @@ -0,0 +1,75 @@ +import { Int64 } from '@creditkarma/thrift-server-core' + +import { + createThriftServer, + zipkinPlugin, +} from '@creditkarma/thrift-server-hapi' + +import * as Hapi from 'hapi' + +import { ADD_SERVER_CONFIG } from './config' + +import { + AddService, +} from './generated/calculator/add-service' + +export function createServer(sampleRate: number = 0): Hapi.Server { + /** + * Implementation of our thrift service. + * + * Notice the second parameter, "context" - this is the Hapi request object, + * passed along to our service by the Hapi thrift plugin. Thus, you have access to + * all HTTP request data from within your service implementation. + */ + const impl = new AddService.Processor({ + ping(): void { + return + }, + add(a: number, b: number, context?: Hapi.Request): number { + return a + b + }, + addInt64(a: Int64, b: Int64, context?: Hapi.Request): Int64 { + return new Int64(a.toNumber() + b.toNumber()) + }, + }) + + /** + * Creates Hapi server with thrift endpoint. + */ + const server: Hapi.Server = createThriftServer({ + port: ADD_SERVER_CONFIG.port, + path: ADD_SERVER_CONFIG.path, + thriftOptions: { + serviceName: 'add-service', + handler: impl, + }, + }) + + server.register( + zipkinPlugin({ + localServiceName: 'add-service', + endpoint: 'http://localhost:9411/api/v1/spans', + sampleRate, + }), + (err: any) => { + if (err) { + console.log('error: ', err) + throw err + } + }, + ) + + /** + * The Hapi server can process requests that are not targeted to the thrift + * service + */ + server.route({ + method: 'GET', + path: '/control', + handler(request: Hapi.Request, reply: Hapi.ReplyWithContinue) { + reply('PASS') + }, + }) + + return server +} diff --git a/packages/thrift-client/src/tests/server.ts b/packages/thrift-client/src/tests/calculator-service.ts similarity index 75% rename from packages/thrift-client/src/tests/server.ts rename to packages/thrift-client/src/tests/calculator-service.ts index 3d8058c4..4073df8d 100644 --- a/packages/thrift-client/src/tests/server.ts +++ b/packages/thrift-client/src/tests/calculator-service.ts @@ -1,13 +1,19 @@ import { Int64 } from '@creditkarma/thrift-server-core' + import { createThriftServer, zipkinPlugin, } from '@creditkarma/thrift-server-hapi' + import * as Hapi from 'hapi' +import { CoreOptions } from 'request' import { SharedStruct, SharedUnion } from './generated/shared/shared' -import { SERVER_CONFIG } from './config' +import { + ADD_SERVER_CONFIG, + CALC_SERVER_CONFIG, +} from './config' import { Calculator, @@ -16,9 +22,30 @@ import { Work, } from './generated/calculator/calculator' -import { createClient } from '../main/index' +import { + AddService, +} from './generated/calculator/add-service' + +import { + createClient, + ThriftContext, + ZipkinTracePlugin, +} from '../main/index' + +export function createServer(sampleRate: number = 0): Hapi.Server { + // Create thrift client + const addServiceClient: AddService.Client> = + createClient(AddService.Client, { + hostName: ADD_SERVER_CONFIG.hostName, + port: ADD_SERVER_CONFIG.port, + register: [ ZipkinTracePlugin({ + localServiceName: 'calculator-service', + remoteServiceName: 'add-service', + endpoint: 'http://localhost:9411/api/v1/spans', + sampleRate, + }) ], + }) -export function createServer(): Hapi.Server { /** * Implementation of our thrift service. * @@ -30,11 +57,11 @@ export function createServer(): Hapi.Server { ping(): void { return }, - add(a: number, b: number): number { - return a + b + add(a: number, b: number, context?: Hapi.Request): Promise { + return addServiceClient.add(a, b) }, - addInt64(a: Int64, b: Int64, context?: Hapi.Request): Int64 { - return new Int64(a.toNumber() + b.toNumber()) + addInt64(a: Int64, b: Int64, context?: Hapi.Request): Promise { + return addServiceClient.addInt64(a, b) }, addWithContext(a: number, b: number, context?: Hapi.Request): number { if ( @@ -46,10 +73,10 @@ export function createServer(): Hapi.Server { throw new Error('Unauthorized') } }, - calculate(logId: number, work: Work): number { + calculate(logId: number, work: Work, context?: Hapi.Request): number | Promise { switch (work.op) { case Operation.ADD: - return work.num1 + work.num2 + return addServiceClient.add(work.num1, work.num2) case Operation.SUBTRACT: return work.num1 - work.num2 case Operation.DIVIDE: @@ -117,8 +144,8 @@ export function createServer(): Hapi.Server { * Creates Hapi server with thrift endpoint. */ const server: Hapi.Server = createThriftServer({ - port: SERVER_CONFIG.port, - path: SERVER_CONFIG.path, + port: CALC_SERVER_CONFIG.port, + path: CALC_SERVER_CONFIG.path, thriftOptions: { serviceName: 'calculator-service', handler: impl, @@ -127,8 +154,9 @@ export function createServer(): Hapi.Server { server.register( zipkinPlugin({ - serviceName: 'calculator-service', - sampleRate: 1, + localServiceName: 'calculator-service', + endpoint: 'http://localhost:9411/api/v1/spans', + sampleRate, }), (err: any) => { if (err) { @@ -140,9 +168,9 @@ export function createServer(): Hapi.Server { const client: Calculator.Client = createClient(Calculator.Client, { serviceName: 'calculator-service', - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, - path: SERVER_CONFIG.path, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, + path: CALC_SERVER_CONFIG.path, }) server.route({ diff --git a/packages/thrift-client/src/tests/client.ts b/packages/thrift-client/src/tests/client.ts index e56260a6..52641bc7 100644 --- a/packages/thrift-client/src/tests/client.ts +++ b/packages/thrift-client/src/tests/client.ts @@ -2,13 +2,12 @@ import { zipkinMiddleware } from '@creditkarma/thrift-server-express' import { createClient, - IThriftContext, + ThriftContext, ZipkinTracePlugin, } from '../main/' import * as express from 'express' import * as net from 'net' -import * as path from 'path' import { CoreOptions } from 'request' import { @@ -18,27 +17,30 @@ import { } from './generated/calculator/calculator' import { + CALC_SERVER_CONFIG, CLIENT_CONFIG, - SERVER_CONFIG, } from './config' -export function createClientServer(): Promise { +export function createClientServer(sampleRate: number = 0): Promise { // Get express instance const app = express() app.use(zipkinMiddleware({ - serviceName: 'calculator-client', - sampleRate: 1, + localServiceName: 'calculator-client', + endpoint: 'http://localhost:9411/api/v1/spans', + sampleRate, })) // Create thrift client - const thriftClient: Calculator.Client> = + const thriftClient: Calculator.Client> = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ ZipkinTracePlugin({ - serviceName: 'calculator-service', - sampleRate: 1, + localServiceName: 'calculator-client', + remoteServiceName: 'calculator-service', + endpoint: 'http://localhost:9411/api/v1/spans', + sampleRate, }) ], }) @@ -57,10 +59,6 @@ export function createClientServer(): Promise { } } - app.get('/', (req: express.Request, res: express.Response): void => { - res.sendFile(path.join(__dirname, './index.html')) - }) - app.get('/ping', (req: express.Request, res: express.Response): void => { thriftClient.ping().then(() => { res.send('success') diff --git a/packages/thrift-client/src/tests/config.ts b/packages/thrift-client/src/tests/config.ts index eefbd21c..f778b654 100644 --- a/packages/thrift-client/src/tests/config.ts +++ b/packages/thrift-client/src/tests/config.ts @@ -2,7 +2,12 @@ import { IHttpConnectionOptions, } from '../main' -export const SERVER_CONFIG: IHttpConnectionOptions = { +export const CALC_SERVER_CONFIG: IHttpConnectionOptions = { + hostName: 'localhost', + port: 8090, +} + +export const ADD_SERVER_CONFIG: IHttpConnectionOptions = { hostName: 'localhost', port: 8080, } diff --git a/packages/thrift-client/src/tests/connections/RequestConnection.spec.ts b/packages/thrift-client/src/tests/connections/HttpConnection.spec.ts similarity index 68% rename from packages/thrift-client/src/tests/connections/RequestConnection.spec.ts rename to packages/thrift-client/src/tests/connections/HttpConnection.spec.ts index 696e8ef6..1506ae35 100644 --- a/packages/thrift-client/src/tests/connections/RequestConnection.spec.ts +++ b/packages/thrift-client/src/tests/connections/HttpConnection.spec.ts @@ -1,17 +1,24 @@ import { readThriftMethod } from '@creditkarma/thrift-server-core' import * as Hapi from 'hapi' -import { RequestConnection, RequestInstance } from '../../main' +import { + HttpConnection, + IRequestResponse, + NextFunction, + RequestInstance, + ThriftContext, +} from '../../main' import * as request from 'request' import { CoreOptions } from 'request' -import { SERVER_CONFIG } from '../config' +import { CALC_SERVER_CONFIG } from '../config' import { expect } from 'code' import * as Lab from 'lab' -import { createServer } from '../server' +import { createServer as addService } from '../add-service' +import { createServer as calculatorService } from '../calculator-service' import { Calculator } from '../generated/calculator/calculator' @@ -22,29 +29,37 @@ const it = lab.it const before = lab.before const after = lab.after -describe('RequestConnection', () => { - let server: Hapi.Server +describe('HttpConnection', () => { + let calcServer: Hapi.Server + let addServer: Hapi.Server before(async () => { - server = createServer() - return server.start().then((err) => { + calcServer = calculatorService() + addServer = addService() + return Promise.all([ + calcServer.start(), + addServer.start(), + ]).then((err) => { console.log('Thrift server running') }) }) after(async () => { - return server.stop().then(() => { + return Promise.all([ + calcServer.stop(), + addServer.stop(), + ]).then((err) => { console.log('Thrift server stopped') }) }) describe('Basic Usage', () => { - let connection: RequestConnection - let client: Calculator.Client + let connection: HttpConnection + let client: Calculator.Client> before(async () => { const requestClient: RequestInstance = request.defaults({}) - connection = new RequestConnection(requestClient, SERVER_CONFIG) + connection = new HttpConnection(requestClient, CALC_SERVER_CONFIG) client = new Calculator.Client(connection) }) @@ -97,17 +112,16 @@ describe('RequestConnection', () => { it('should reject for a 500 server response', async () => { const requestClient: RequestInstance = request.defaults({}) - const badConnection: RequestConnection = new RequestConnection( + const badConnection: HttpConnection = new HttpConnection( requestClient, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, path: '/return500', }, ) - const badClient: Calculator.Client< - CoreOptions - > = new Calculator.Client(badConnection) + const badClient: Calculator.Client> = + new Calculator.Client(badConnection) return badClient.add(5, 7).then( (response: number) => { @@ -121,15 +135,15 @@ describe('RequestConnection', () => { it('should reject for a 400 server response', async () => { const requestClient: RequestInstance = request.defaults({}) - const badConnection: RequestConnection = new RequestConnection( + const badConnection: HttpConnection = new HttpConnection( requestClient, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, path: '/return400', }, ) - const badClient: Calculator.Client = + const badClient: Calculator.Client> = new Calculator.Client(badConnection) return badClient.add(5, 7).then( @@ -146,14 +160,14 @@ describe('RequestConnection', () => { const requestClient: RequestInstance = request.defaults({ timeout: 5000, }) - const badConnection: RequestConnection = new RequestConnection( + const badConnection: HttpConnection = new HttpConnection( requestClient, { hostName: 'fakehost', port: 8080, }, ) - const badClient: Calculator.Client = + const badClient: Calculator.Client> = new Calculator.Client(badConnection) return badClient.add(5, 7).then( @@ -172,16 +186,16 @@ describe('RequestConnection', () => { describe('IncomingMiddleware', () => { it('should resolve when middleware allows', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ - handler(data: Buffer): Promise { + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { if (readThriftMethod(data) === 'add') { - return Promise.resolve(data) + return next() } else { return Promise.reject( new Error( @@ -201,17 +215,17 @@ describe('RequestConnection', () => { it('should resolve when middleware passes method filter', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ methods: ['add'], - handler(data: Buffer): Promise { + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { if (readThriftMethod(data) === 'add') { - return Promise.resolve(data) + return next() } else { return Promise.reject( new Error( @@ -231,16 +245,16 @@ describe('RequestConnection', () => { it('should reject when middleware rejects', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ - handler(data: Buffer): Promise { + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { if (readThriftMethod(data) === 'nope') { - return Promise.resolve(data) + return next() } else { return Promise.reject( new Error( @@ -269,15 +283,15 @@ describe('RequestConnection', () => { it('should skip handler when middleware fails method filter', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ methods: ['nope'], - handler(data: Buffer): Promise { + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { return Promise.reject( new Error( `Unrecognized method name: ${readThriftMethod( @@ -297,22 +311,19 @@ describe('RequestConnection', () => { describe('OutgoingMiddleware', () => { it('should resolve when middleware adds auth token', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ - type: 'request', - handler(context: CoreOptions): Promise { - return Promise.resolve( - Object.assign({}, context, { - headers: { - 'X-Fake-Token': 'fake-token', - }, - }), - ) + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next(data, { + headers: { + 'X-Fake-Token': 'fake-token', + }, + }) }, }) @@ -323,23 +334,20 @@ describe('RequestConnection', () => { it('should resolve when middleware passes method filter', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ - type: 'request', methods: ['addWithContext'], - handler(context: CoreOptions): Promise { - return Promise.resolve( - Object.assign({}, context, { - headers: { - 'X-Fake-Token': 'fake-token', - }, - }), - ) + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next(data, { + headers: { + 'X-Fake-Token': 'fake-token', + }, + }) }, }) @@ -350,11 +358,11 @@ describe('RequestConnection', () => { it('should reject when middleware does not add auth token', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) return client.addWithContext(5, 7).then( (response: number) => { @@ -368,25 +376,22 @@ describe('RequestConnection', () => { ) }) - it('should resolve when middleware fails method filter', async () => { + it('should reject when middleware fails method filter', async () => { const requestClient: RequestInstance = request.defaults({}) - const connection: RequestConnection = new RequestConnection( + const connection: HttpConnection = new HttpConnection( requestClient, - SERVER_CONFIG, + CALC_SERVER_CONFIG, ) - const client = new Calculator.Client(connection) + const client = new Calculator.Client>(connection) connection.register({ - type: 'request', methods: ['add'], - handler(context: CoreOptions): Promise { - return Promise.resolve( - Object.assign({}, context, { + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next(data, { headers: { 'X-Fake-Token': 'fake-token', }, - }), - ) + }) }, }) diff --git a/packages/thrift-client/src/tests/connections/index.spec.ts b/packages/thrift-client/src/tests/connections/index.spec.ts index 86518a23..7471fb05 100644 --- a/packages/thrift-client/src/tests/connections/index.spec.ts +++ b/packages/thrift-client/src/tests/connections/index.spec.ts @@ -1,25 +1,31 @@ import { readThriftMethod } from '@creditkarma/thrift-server-core' import * as Hapi from 'hapi' -import { createClient } from '../../main' +import { + createClient, + IRequestResponse, + NextFunction, + ThriftContext, +} from '../../main' import { CoreOptions } from 'request' -import { SERVER_CONFIG } from '../config' +import { CALC_SERVER_CONFIG } from '../config' import { expect } from 'code' import * as Lab from 'lab' import { Calculator, - Choice, - FirstName, - LastName, + // Choice, + // FirstName, + // LastName, } from '../generated/calculator/calculator' -import { SharedStruct } from '../generated/shared/shared' +// import { SharedStruct } from '../generated/shared/shared' -import { createServer } from '../server' +import { createServer as addService } from '../add-service' +import { createServer as calculatorService } from '../calculator-service' export const lab = Lab.script() @@ -29,224 +35,234 @@ const before = lab.before const after = lab.after describe('createClient', () => { - let server: Hapi.Server + let calcServer: Hapi.Server + let addServer: Hapi.Server before(async () => { - server = createServer() - return server.start().then((err) => { + calcServer = calculatorService() + addServer = addService() + return Promise.all([ + calcServer.start(), + addServer.start(), + ]).then((err) => { console.log('Thrift server running') }) }) after(async () => { - return server.stop().then(() => { + return Promise.all([ + calcServer.stop(), + addServer.stop(), + ]).then((err) => { console.log('Thrift server stopped') }) }) - describe('Basic Usage', () => { - let client: Calculator.Client - - before(async () => { - client = createClient(Calculator.Client, SERVER_CONFIG) - }) - - it('should corrently handle a service client request', async () => { - return client.add(5, 7).then((response: number) => { - expect(response).to.equal(12) - }) - }) - - it('should corrently handle a void service client request', async () => { - return client.ping().then((response: any) => { - expect(response).to.equal(undefined) - }) - }) - - it('should corrently call endpoint with binary data', async () => { - const word: string = 'test_binary' - const data: Buffer = Buffer.from(word, 'utf-8') - return client.echoBinary(data).then((response: string) => { - expect(response).to.equal(word) - }) - }) - - it('should corrently call endpoint that string data', async () => { - const word: string = 'test_string' - return client.echoString(word).then((response: string) => { - expect(response).to.equal(word) - }) - }) - - it('should correctly call endpoint with lists as parameters', async () => { - return client - .mapOneList([1, 2, 3, 4]) - .then((response: Array) => { - expect>(response).to.equal([2, 3, 4, 5]) - }) - }) - - it('should correctly call endpoint with maps as parameters', async () => { - return client - .mapValues(new Map([['key1', 6], ['key2', 5]])) - .then((response: Array) => { - expect>(response).to.equal([6, 5]) - }) - }) - - it('should correctly call endpoint that returns a map', async () => { - return client - .listToMap([['key_1', 'value_1'], ['key_2', 'value_2']]) - .then((response: Map) => { - expect(response).to.equal( - new Map([['key_1', 'value_1'], ['key_2', 'value_2']]), - ) - }) - }) - - it('should call an endpoint with union arguments', async () => { - const firstName: Choice = new Choice({ - firstName: new FirstName({ name: 'Louis' }), - }) - const lastName: Choice = new Choice({ - lastName: new LastName({ name: 'Smith' }), - }) - - return Promise.all([ - client.checkName(firstName), - client.checkName(lastName), - ]).then((val: Array) => { - expect(val[0]).to.equal('FirstName: Louis') - expect(val[1]).to.equal('LastName: Smith') - }) - }) - - it('should call an endpoint with optional parameters', async () => { - return Promise.all([ - client.checkOptional('test_\nfirst'), - client.checkOptional(), - ]).then((val: Array) => { - expect(val[0]).to.equal('test_\nfirst') - expect(val[1]).to.equal('undefined') - }) - }) - - it('should corrently handle a service client request that returns a struct', async () => { - return client.getStruct(5).then((response: SharedStruct) => { - expect(response).to.equal( - new SharedStruct({ key: 0, value: 'test' }), - ) - }) - }) - - it('should corrently handle a service client request that returns a union', async () => { - return client.getUnion(1).then((response: any) => { - expect(response).to.equal({ option1: 'foo' }) - }) - }) - - it('should allow passing of a request context', async () => { - return client - .addWithContext(5, 7, { - headers: { 'X-Fake-Token': 'fake-token' }, - }) - .then((response: number) => { - expect(response).to.equal(12) - }) - }) - - it('should reject auth request without context', async () => { - return client.addWithContext(5, 7).then( - (response: number) => { - expect(false).to.equal(true) - }, - (err: any) => { - expect(err.message).to.equal('Unauthorized') - }, - ) - }) - - it('should reject for a 500 server response', async () => { - const badClient: Calculator.Client = createClient( - Calculator.Client, - { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, - path: '/return500', - }, - ) - - return badClient.add(5, 7).then( - (response: number) => { - throw new Error('Should reject with status 500') - }, - (err: any) => { - expect(err.statusCode).to.equal(500) - }, - ) - }) - - it('should reject for a 400 server response', async () => { - const badClient: Calculator.Client = createClient( - Calculator.Client, - { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, - path: '/return400', - }, - ) - - return badClient.add(5, 7).then( - (response: number) => { - throw new Error('Should reject with status 400') - }, - (err: any) => { - expect(err.statusCode).to.equal(400) - }, - ) - }) - - it('should reject for a request to a missing service', async () => { - const badClient: Calculator.Client = createClient( - Calculator.Client, - { - hostName: 'fakehost', - port: 8080, - }, - ) - - return badClient.add(5, 7).then( - (response: number) => { - throw new Error('Should reject with host not found') - }, - (err: any) => { - expect(err.message).to.equal( - 'getaddrinfo ENOTFOUND fakehost fakehost:8080', - ) - }, - ) - }) - }) + // describe('Basic Usage', () => { + // let client: Calculator.Client + + // before(async () => { + // client = createClient(Calculator.Client, SERVER_CONFIG) + // }) + + // it('should corrently handle a service client request', async () => { + // return client.add(5, 7).then((response: number) => { + // expect(response).to.equal(12) + // }) + // }) + + // it('should corrently handle a void service client request', async () => { + // return client.ping().then((response: any) => { + // expect(response).to.equal(undefined) + // }) + // }) + + // it('should corrently call endpoint with binary data', async () => { + // const word: string = 'test_binary' + // const data: Buffer = Buffer.from(word, 'utf-8') + // return client.echoBinary(data).then((response: string) => { + // expect(response).to.equal(word) + // }) + // }) + + // it('should corrently call endpoint that string data', async () => { + // const word: string = 'test_string' + // return client.echoString(word).then((response: string) => { + // expect(response).to.equal(word) + // }) + // }) + + // it('should correctly call endpoint with lists as parameters', async () => { + // return client + // .mapOneList([1, 2, 3, 4]) + // .then((response: Array) => { + // expect>(response).to.equal([2, 3, 4, 5]) + // }) + // }) + + // it('should correctly call endpoint with maps as parameters', async () => { + // return client + // .mapValues(new Map([['key1', 6], ['key2', 5]])) + // .then((response: Array) => { + // expect>(response).to.equal([6, 5]) + // }) + // }) + + // it('should correctly call endpoint that returns a map', async () => { + // return client + // .listToMap([['key_1', 'value_1'], ['key_2', 'value_2']]) + // .then((response: Map) => { + // expect(response).to.equal( + // new Map([['key_1', 'value_1'], ['key_2', 'value_2']]), + // ) + // }) + // }) + + // it('should call an endpoint with union arguments', async () => { + // const firstName: Choice = new Choice({ + // firstName: new FirstName({ name: 'Louis' }), + // }) + // const lastName: Choice = new Choice({ + // lastName: new LastName({ name: 'Smith' }), + // }) + + // return Promise.all([ + // client.checkName(firstName), + // client.checkName(lastName), + // ]).then((val: Array) => { + // expect(val[0]).to.equal('FirstName: Louis') + // expect(val[1]).to.equal('LastName: Smith') + // }) + // }) + + // it('should call an endpoint with optional parameters', async () => { + // return Promise.all([ + // client.checkOptional('test_\nfirst'), + // client.checkOptional(), + // ]).then((val: Array) => { + // expect(val[0]).to.equal('test_\nfirst') + // expect(val[1]).to.equal('undefined') + // }) + // }) + + // it('should corrently handle a service client request that returns a struct', async () => { + // return client.getStruct(5).then((response: SharedStruct) => { + // expect(response).to.equal( + // new SharedStruct({ key: 0, value: 'test' }), + // ) + // }) + // }) + + // it('should corrently handle a service client request that returns a union', async () => { + // return client.getUnion(1).then((response: any) => { + // expect(response).to.equal({ option1: 'foo' }) + // }) + // }) + + // it('should allow passing of a request context', async () => { + // return client + // .addWithContext(5, 7, { + // headers: { 'X-Fake-Token': 'fake-token' }, + // }) + // .then((response: number) => { + // expect(response).to.equal(12) + // }) + // }) + + // it('should reject auth request without context', async () => { + // return client.addWithContext(5, 7).then( + // (response: number) => { + // expect(false).to.equal(true) + // }, + // (err: any) => { + // expect(err.message).to.equal('Unauthorized') + // }, + // ) + // }) + + // it('should reject for a 500 server response', async () => { + // const badClient: Calculator.Client = createClient( + // Calculator.Client, + // { + // hostName: SERVER_CONFIG.hostName, + // port: SERVER_CONFIG.port, + // path: '/return500', + // }, + // ) + + // return badClient.add(5, 7).then( + // (response: number) => { + // throw new Error('Should reject with status 500') + // }, + // (err: any) => { + // expect(err.statusCode).to.equal(500) + // }, + // ) + // }) + + // it('should reject for a 400 server response', async () => { + // const badClient: Calculator.Client = createClient( + // Calculator.Client, + // { + // hostName: SERVER_CONFIG.hostName, + // port: SERVER_CONFIG.port, + // path: '/return400', + // }, + // ) + + // return badClient.add(5, 7).then( + // (response: number) => { + // throw new Error('Should reject with status 400') + // }, + // (err: any) => { + // expect(err.statusCode).to.equal(400) + // }, + // ) + // }) + + // it('should reject for a request to a missing service', async () => { + // const badClient: Calculator.Client = createClient( + // Calculator.Client, + // { + // hostName: 'fakehost', + // port: 8080, + // }, + // ) + + // return badClient.add(5, 7).then( + // (response: number) => { + // throw new Error('Should reject with host not found') + // }, + // (err: any) => { + // expect(err.message).to.equal( + // 'getaddrinfo ENOTFOUND fakehost fakehost:8080', + // ) + // }, + // ) + // }) + // }) describe('IncomingMiddleware', () => { it('should resolve when middleware allows', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { - handler(data: Buffer): Promise { - if (readThriftMethod(data) === 'add') { - return Promise.resolve(data) - } else { - return Promise.reject( - new Error( - `Unrecognized method name: ${readThriftMethod( - data, - )}`, - ), - ) - } + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next().then((res: IRequestResponse): Promise => { + if (readThriftMethod(res.body) === 'add') { + return Promise.resolve(res) + } else { + return Promise.reject( + new Error( + `Unrecognized method name: ${readThriftMethod( + data, + )}`, + ), + ) + } + }) }, }, ], @@ -259,23 +275,21 @@ describe('createClient', () => { it('should resolve when middleware passes method filter', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { methods: ['add'], - handler(data: Buffer): Promise { - if (readThriftMethod(data) === 'add') { - return Promise.resolve(data) - } else { - return Promise.reject( - new Error( - `Unrecognized method name: ${readThriftMethod( - data, - )}`, - ), - ) - } + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next().then((res: IRequestResponse) => { + if (readThriftMethod(res.body) === 'add') { + return Promise.resolve(res) + } else { + return Promise.reject( + new Error(`Unrecognized method name: ${readThriftMethod(res.body)}`), + ) + } + }) }, }, ], @@ -288,22 +302,20 @@ describe('createClient', () => { it('should reject when middleware rejects', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { - handler(data: Buffer): Promise { - if (readThriftMethod(data) === 'nope') { - return Promise.resolve(data) - } else { - return Promise.reject( - new Error( - `Unrecognized method name: ${readThriftMethod( - data, - )}`, - ), - ) - } + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next().then((res: IRequestResponse) => { + if (readThriftMethod(res.body) === 'nope') { + return Promise.resolve(res) + } else { + return Promise.reject( + new Error(`Unrecognized method name: ${readThriftMethod(res.body)}`), + ) + } + }) }, }, ], @@ -325,19 +337,21 @@ describe('createClient', () => { it('should skip handler when middleware fails method filter', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { methods: ['nope'], - handler(data: Buffer): Promise { - return Promise.reject( - new Error( - `Unrecognized method name: ${readThriftMethod( - data, - )}`, - ), - ) + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next().then(() => { + return Promise.reject( + new Error( + `Unrecognized method name: ${readThriftMethod( + data, + )}`, + ), + ) + }) }, }, ], @@ -352,19 +366,16 @@ describe('createClient', () => { describe('OutgoingMiddleware', () => { it('should resolve when middleware adds auth token', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { - type: 'request', - handler(context: CoreOptions): Promise { - return Promise.resolve( - Object.assign({}, context, { - headers: { - 'X-Fake-Token': 'fake-token', - }, - }), - ) + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next(data, { + headers: { + 'X-Fake-Token': 'fake-token', + }, + }) }, }, ], @@ -377,20 +388,17 @@ describe('createClient', () => { it('should resolve when middleware passes method filter', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { - type: 'request', methods: ['addWithContext'], - handler(context: CoreOptions): Promise { - return Promise.resolve( - Object.assign({}, context, { - headers: { - 'X-Fake-Token': 'fake-token', - }, - }), - ) + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next(data, { + headers: { + 'X-Fake-Token': 'fake-token', + }, + }) }, }, ], @@ -402,7 +410,7 @@ describe('createClient', () => { }) it('should reject when middleware does not add auth token', async () => { - const client = createClient(Calculator.Client, SERVER_CONFIG) + const client = createClient(Calculator.Client, CALC_SERVER_CONFIG) return client.addWithContext(5, 7).then( (response: number) => { @@ -418,20 +426,17 @@ describe('createClient', () => { it('should resolve when middleware fails method filter', async () => { const client = createClient(Calculator.Client, { - hostName: SERVER_CONFIG.hostName, - port: SERVER_CONFIG.port, + hostName: CALC_SERVER_CONFIG.hostName, + port: CALC_SERVER_CONFIG.port, register: [ { - type: 'request', methods: ['add'], - handler(context: CoreOptions): Promise { - return Promise.resolve( - Object.assign({}, context, { - headers: { - 'X-Fake-Token': 'fake-token', - }, - }), - ) + handler(data: Buffer, context: ThriftContext, next: NextFunction): Promise { + return next(data, { + headers: { + 'X-Fake-Token': 'fake-token', + }, + }) }, }, ], diff --git a/packages/thrift-client/src/tests/index.spec.ts b/packages/thrift-client/src/tests/index.spec.ts deleted file mode 100644 index e6044d30..00000000 --- a/packages/thrift-client/src/tests/index.spec.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { exec } from 'child_process' -import { expect } from 'code' -import * as Hapi from 'hapi' -import * as Lab from 'lab' -import * as net from 'net' - -import { - CLIENT_CONFIG, -} from './config' - -import { - createServer, -} from './server' - -import { - createClientServer, -} from './client' - -export const lab = Lab.script() - -const describe = lab.describe -const it = lab.it -const before = lab.before -const after = lab.after - -describe('Thrift Integration', () => { - let appServer: Hapi.Server - let clientServer: net.Server - - before(async () => { - appServer = createServer() - clientServer = await createClientServer() - - return appServer.start().then((err) => { - console.log('Thrift server running') - }) - }) - - after(async () => { - clientServer.close() - return appServer.stop().then(() => { - console.log('Thrift server stopped') - }) - }) - - it('should handle requests not pointed to thrift service', (done: any) => { - exec(`curl -G http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate --data-urlencode "left=5" --data-urlencode "op=add" --data-urlencode "right=9"`, (err, stout, sterr) => { - expect(stout).to.equal('result: 14') - done() - }) - }) -}) diff --git a/packages/thrift-client/src/tests/observability/config.ts b/packages/thrift-client/src/tests/observability/config.ts new file mode 100644 index 00000000..c11b934d --- /dev/null +++ b/packages/thrift-client/src/tests/observability/config.ts @@ -0,0 +1,4 @@ +export const COLLECTOR_CONFIG = { + host: 'localhost', + port: 9411, +} diff --git a/packages/thrift-client/src/tests/observability/index.spec.ts b/packages/thrift-client/src/tests/observability/index.spec.ts new file mode 100644 index 00000000..12c2b734 --- /dev/null +++ b/packages/thrift-client/src/tests/observability/index.spec.ts @@ -0,0 +1,241 @@ +import { + randomTraceId, + serializeLinkerdHeader, + traceIdFromTraceId, + ITraceId, +} from '@creditkarma/thrift-server-core' +import { expect } from 'code' +import * as Hapi from 'hapi' +import * as Lab from 'lab' +import * as net from 'net' +import * as rp from 'request-promise-native' + +import { + CLIENT_CONFIG, +} from '../config' + +import { createServer as addService } from '../add-service' +import { createServer as calculatorService } from '../calculator-service' +import { createServer as mockCollector } from './mock-collector' + +import { + createClientServer, +} from '../client' + +export const lab = Lab.script() + +const describe = lab.describe +const it = lab.it +const before = lab.before +const after = lab.after + +describe('Observability', () => { + let calcServer: Hapi.Server + let addServer: Hapi.Server + let clientServer: net.Server + let collectServer: net.Server + + before(async () => { + calcServer = calculatorService(1) + addServer = addService(1) + clientServer = await createClientServer(1) + collectServer = await mockCollector() + return Promise.all([ + calcServer.start(), + addServer.start(), + ]).then((err) => { + console.log('Thrift server running') + }) + }) + + after(async () => { + clientServer.close() + collectServer.close() + return Promise.all([ + calcServer.stop(), + addServer.stop(), + ]).then((err) => { + console.log('Thrift server stopped') + }) + }) + + it('should correctly trace request using B3 headers', (done: any) => { + const traceId_1: string = randomTraceId() + const traceId_2: string = randomTraceId() + Promise.all([ + rp(`http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate`, { + qs: { + left: 5, + op: 'add', + right: 9, + }, + headers: { + 'x-b3-traceid': traceId_1, + 'x-b3-spanid': traceId_1, + 'x-b3-parentspanid': traceId_1, + 'x-b3-sampled': true, + }, + }), + rp(`http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate`, { + qs: { + left: 7, + op: 'add', + right: 22, + }, + headers: { + 'x-b3-traceid': traceId_2, + 'x-b3-spanid': traceId_2, + 'x-b3-parentspanid': traceId_2, + 'x-b3-sampled': true, + }, + }), + ]).then((val: any) => { + expect(val).to.equal(['result: 14', 'result: 29']) + setTimeout(() => { + rp('http://localhost:9411/api/v1/spans').then((traces: any) => { + const result = JSON.parse(traces) + expect(Object.keys(result).length).to.equal(2) + expect(result[traceId_1]).to.exist() + expect(result[traceId_2]).to.exist() + expect(Object.keys(result[traceId_1]).length).to.equal(3) + expect(Object.keys(result[traceId_2]).length).to.equal(3) + done() + }) + }, 3000) + }) + }) + + it('should correctly trace request using L5D headers', (done: any) => { + const traceId_1: string = randomTraceId() + const traceId_2: string = randomTraceId() + Promise.all([ + rp(`http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate`, { + qs: { + left: 5, + op: 'add', + right: 9, + }, + headers: { + 'l5d-ctx-trace': serializeLinkerdHeader(traceIdFromTraceId({ + traceId: traceId_1, + spanId: traceId_1, + parentId: traceId_1, + sampled: true, + })), + }, + }), + rp(`http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate`, { + qs: { + left: 7, + op: 'add', + right: 22, + }, + headers: { + 'l5d-ctx-trace': serializeLinkerdHeader(traceIdFromTraceId({ + traceId: traceId_2, + spanId: traceId_2, + parentId: traceId_2, + sampled: true, + })), + }, + }), + ]).then((val: any) => { + expect(val).to.equal(['result: 14', 'result: 29']) + setTimeout(() => { + rp('http://localhost:9411/api/v1/spans').then((traces: any) => { + const result = JSON.parse(traces) + expect(Object.keys(result).length).to.equal(2) + expect(result[traceId_1]).to.exist() + expect(result[traceId_2]).to.exist() + expect(Object.keys(result[traceId_1]).length).to.equal(3) + expect(Object.keys(result[traceId_2]).length).to.equal(3) + done() + }) + }, 3000) + }) + }) + + it('should use B3 headers if traceIds do not match', (done: any) => { + const traceId_1: string = randomTraceId() + const traceId_2: string = randomTraceId() + Promise.all([ + rp(`http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate`, { + qs: { + left: 5, + op: 'add', + right: 9, + }, + headers: { + 'l5d-ctx-trace': serializeLinkerdHeader(traceIdFromTraceId({ + traceId: traceId_1, + spanId: traceId_1, + parentId: traceId_1, + sampled: true, + })), + 'x-b3-traceid': traceId_2, + 'x-b3-spanid': traceId_2, + 'x-b3-parentspanid': traceId_2, + 'x-b3-sampled': true, + }, + }), + ]).then((val: any) => { + expect(val).to.equal(['result: 14']) + setTimeout(() => { + rp('http://localhost:9411/api/v1/spans').then((traces: any) => { + const result = JSON.parse(traces) + expect(Object.keys(result).length).to.equal(1) + expect(Object.keys(result)[0]).to.equal(traceId_2) + expect(Object.keys(result[traceId_2]).length).to.equal(3) + done() + }) + }, 3000) + }) + }) + + it('should use L5D headers if traceIds do match', (done: any) => { + const traceId_1: string = randomTraceId() + const trace_1: ITraceId = { + traceId: traceId_1, + spanId: randomTraceId(), + parentId: randomTraceId(), + sampled: true, + } + const trace_2: ITraceId = { + traceId: traceId_1, + spanId: randomTraceId(), + parentId: randomTraceId(), + sampled: true, + } + Promise.all([ + rp(`http://${CLIENT_CONFIG.hostName}:${CLIENT_CONFIG.port}/calculate`, { + qs: { + left: 5, + op: 'add', + right: 9, + }, + headers: { + 'l5d-ctx-trace': serializeLinkerdHeader( + traceIdFromTraceId(trace_1) + ), + 'x-b3-traceid': trace_2.traceId, + 'x-b3-spanid': trace_2.spanId, + 'x-b3-parentspanid': trace_2.parentId, + 'x-b3-sampled': true, + }, + }), + ]).then((val: any) => { + expect(val).to.equal(['result: 14']) + setTimeout(() => { + rp('http://localhost:9411/api/v1/spans').then((traces: any) => { + const result = JSON.parse(traces) + const piece = result[trace_1.traceId][trace_1.spanId] + expect(Object.keys(result).length).to.equal(1) + expect(piece.traceId).to.equal(trace_1.traceId) + expect(piece.id).to.equal(trace_1.spanId) + expect(piece.parentId).to.equal(trace_1.parentId) + done() + }) + }, 3000) + }) + }) +}) diff --git a/packages/thrift-client/src/tests/observability/mock-collector.ts b/packages/thrift-client/src/tests/observability/mock-collector.ts new file mode 100644 index 00000000..a6b00827 --- /dev/null +++ b/packages/thrift-client/src/tests/observability/mock-collector.ts @@ -0,0 +1,65 @@ +import * as bodyParser from 'body-parser' +import * as express from 'express' +import * as net from 'net' + +import { + COLLECTOR_CONFIG, +} from './config' + +// http://localhost:9411/api/v1/spans + +export function serviceName(span: any): string | undefined { + if (span.annotations && span.annotations.length) { + if (span.annotations[0].endpoint) { + return span.annotations[0].endpoint.serviceName + } + } +} + +export function createServer(): Promise { + // Get express instance + const app = express() + + app.use(bodyParser.json()) + + let traces: any = {} + + app.post('/api/v1/spans', (req: express.Request, res: express.Response): void => { + if (req.body && req.body.length) { + req.body.forEach((next: any) => { + const traceId = next.traceId + const id = next.id + if (traces[traceId] === undefined) { + traces[traceId] = {} + } + + // traces[traceId][id] = next + traces[traceId][id] = { + traceId: next.traceId, + id: next.id, + parentId: next.parentId, + duration: next.duration, + serviceName: serviceName(next), + } + }) + } + + res.sendStatus(202) + }) + + app.get('/api/v1/spans', (req: express.Request, res: express.Response): void => { + res.send(traces) + traces = {} + }) + + return new Promise((resolve, reject) => { + const server: net.Server = app.listen(COLLECTOR_CONFIG.port, (err: any) => { + if (err) { + reject(err) + } else { + console.log(`MockCollector listening on port[${COLLECTOR_CONFIG.port}]`) + resolve(server) + } + }) + }) +} diff --git a/packages/thrift-client/src/tests/observability/mockCollector.ts b/packages/thrift-client/src/tests/observability/mockCollector.ts deleted file mode 100644 index e69de29b..00000000 diff --git a/packages/thrift-client/src/tests/thrift/add-service.thrift b/packages/thrift-client/src/tests/thrift/add-service.thrift new file mode 100644 index 00000000..7f6b518d --- /dev/null +++ b/packages/thrift-client/src/tests/thrift/add-service.thrift @@ -0,0 +1,26 @@ +/** + * Thrift files can namespace, package, or prefix their output in various + * target languages. + */ +namespace cpp calculator +namespace d calculator +namespace dart calculator +namespace java calculator +namespace php calculator +namespace perl calculator +namespace haxe calculator +namespace netcore calculator + +/** + * Ahh, now onto the cool part, defining a service. Services just need a name + * and can optionally inherit from another service using the extends keyword. + */ +service AddService { + + void ping(), + + i32 add(1: i32 num1, 2: i32 num2), + + i64 addInt64(1: i64 num1, 2: i64 num2), + +} diff --git a/packages/thrift-client/tslint.json b/packages/thrift-client/tslint.json index 1540592c..22a2243a 100644 --- a/packages/thrift-client/tslint.json +++ b/packages/thrift-client/tslint.json @@ -1,6 +1,7 @@ { "extends": "tslint:recommended", "rules": { + "variable-name": false, "max-line-length": false, "jsdoc-format": false, "object-literal-sort-keys": false, diff --git a/packages/thrift-server-core/package.json b/packages/thrift-server-core/package.json index be246f6b..c84c4e72 100644 --- a/packages/thrift-server-core/package.json +++ b/packages/thrift-server-core/package.json @@ -22,7 +22,7 @@ "pretest": "npm run build", "test": "npm run test:only --", "test:watch": "watch 'npm run test:only' ./dist", - "test:only": "lab --verbose -l -S -P spec dist/tests/unit" + "test:only": "lab --timeout 15000 --verbose -l -S -P spec dist/tests/unit" }, "author": "Credit Karma", "license": "Apache-2.0", diff --git a/packages/thrift-server-core/src/main/async-hooks/AsyncHook.ts b/packages/thrift-server-core/src/main/async-hooks/AsyncHook.ts new file mode 100644 index 00000000..abe34c1c --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/AsyncHook.ts @@ -0,0 +1,44 @@ +import { IAsyncHook, IHookCallbacks } from './types' + +export class AsyncHook implements IAsyncHook { + private enabled: boolean = false + private callbacks: IHookCallbacks + + constructor(callbacks: IHookCallbacks) { + this.callbacks = callbacks + } + + public init(asyncId: number, type: string, triggerAsyncId: number, resource: object): void { + if (this.enabled && this.callbacks.init) { + this.callbacks.init(asyncId, type, triggerAsyncId, resource) + } + } + + public before(asyncId: number): void { + if (this.enabled && this.callbacks.before) { + this.callbacks.before(asyncId) + } + } + + public after(asyncId: number): void { + if (this.enabled && this.callbacks.after) { + this.callbacks.after(asyncId) + } + } + + public destroy(asyncId: number): void { + if (this.enabled && this.callbacks.destroy) { + this.callbacks.destroy(asyncId) + } + } + + public enable(): this { + this.enabled = true + return this + } + + public disable(): this { + this.enabled = false + return this + } +} diff --git a/packages/thrift-server-core/src/main/async-hooks/AsyncHooks.ts b/packages/thrift-server-core/src/main/async-hooks/AsyncHooks.ts new file mode 100644 index 00000000..1617f5ae --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/AsyncHooks.ts @@ -0,0 +1,97 @@ +import { AsyncHook } from './AsyncHook' +import { Hooks } from './Hooks' +import { patches } from './patches' +import { State } from './State' +import { IAsyncHook, IHookCallbacks } from './types' +const asyncWrap: any = (process as any).binding('async_wrap') + +const state = new State() +const hooks = new Hooks(state) + +export const version: number = require('../../../package.json').version + +for (const key of Object.keys(patches)) { + patches[key](hooks, state) +} + +asyncWrap.setupHooks({ + init: hooks.init, + pre: hooks.pre, + post: hooks.post, + destroy: hooks.destroy, +}) + +asyncWrap.enable() + +export function createHook(callbacks: IHookCallbacks): IAsyncHook { + const hook: AsyncHook = new AsyncHook(callbacks) + hooks.add(hook) + return hook +} + +export function executionAsyncId(): number { + return state.currentId +} + +export function triggerAsyncId(): number { + return state.parentId +} + +// export class AsyncHooks { +// public version: number +// private state: State +// private hooks: Hooks +// // private providers: any + +// constructor() { +// this.state = new State() +// this.hooks = new Hooks(this.state) + +// // expose version for conflict detection +// this.version = require('../../../package.json').version + +// // expose the Providers map +// // this.providers = asyncWrap.Providers + +// // apply patches +// for (const key of Object.keys(patches)) { +// patches[key](this.hooks, this.state) +// } + +// // setup async wrap +// // if (!process.env.hasOwnProperty('NODE_ASYNC_HOOK_NO_WARNING')) { +// // console.error('warning: you are using async-hook which is unstable.') +// // } + +// asyncWrap.setupHooks({ +// init: this.hooks.init, +// pre: this.hooks.pre, +// post: this.hooks.post, +// destroy: this.hooks.destroy, +// }) + +// asyncWrap.enable() +// } + +// public addHooks(hooks: any) { +// this.hooks.add(hooks) +// } + +// public removeHooks(hooks: any) { +// this.hooks.remove(hooks) +// } + +// public enable() { +// this.state.enabled = true +// // asyncWrap.enable(); +// } + +// public disable() { +// this.state.enabled = false +// asyncWrap.disable() +// } + +// public executionAsyncId(): number { +// return this.state.currentId +// } +// } diff --git a/packages/thrift-server-core/src/main/async-hooks/Hooks.ts b/packages/thrift-server-core/src/main/async-hooks/Hooks.ts new file mode 100644 index 00000000..06931c17 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/Hooks.ts @@ -0,0 +1,102 @@ +// import { debug } from './debug' +import { State } from './State' + +import { + AsyncHook, +} from './AsyncHook' + +const asyncWrap: any = (process as any).binding('async_wrap') +const TIMERWRAP: number = asyncWrap.Providers.TIMERWRAP +const ignoreUIDs = new Set() + +export class Hooks { + public hooks: Array = [] + + public init: (uid: number, provider: any, parentId: number, parentHandle: any) => void + public pre: (uid: number) => void + public post: (uid: number, didThrow: boolean) => void + public destroy: (uid: number) => void + + private state: State + + constructor(state: State) { + this.state = state + + this.init = (uid: number, provider: any, parentUid: number, parentHandle: any) => { + // Ignore TIMERWRAP, since setTimeout etc. is monkey patched + if (provider === TIMERWRAP) { + ignoreUIDs.add(uid) + return + } + + const thisId = (this.state.nextId += 1) + this.state.idMap.set(uid, thisId) + + // debug(`init: id: ${uid}`) + // debug(`init: parent: ${parentUid}`) + // debug(`init: provider: `, provider) + // debug(`init: handle: `, parentHandle) + + // call hooks + for (const hook of this.hooks) { + hook.init(thisId, provider, this.state.currentId, parentHandle) + } + } + + this.pre = (uid: number) => { + this.state.previousIds.push(this.state.currentId) + this.state.currentId = this.state.idMap.get(uid) || 0 + if (ignoreUIDs.has(uid)) { + return + } + + // debug(`pre: id: ${this.state.currentId}`) + + // call hooks + for (const hook of this.hooks) { + hook.before(this.state.currentId) + } + } + + this.post = (uid: number, didThrow: boolean) => { + const thisId = this.state.idMap.get(uid) || 0 + this.state.currentId = this.state.previousIds.pop() || 0 + if (ignoreUIDs.has(uid)) { return } + + // debug(`post: id: ${thisId}`) + + // call hooks + for (const hook of this.hooks) { + hook.after(thisId) + } + } + + this.destroy = (uid: number) => { + // Cleanup the ignore list if this uid should be ignored + if (ignoreUIDs.has(uid)) { + ignoreUIDs.delete(uid) + return + } + + if (this.state.idMap.has(uid)) { + const thisId = this.state.idMap.get(uid) || 0 + this.state.idMap.delete(uid) + + // debug(`destroy: id: ${thisId}`) + + // call hooks + for (const hook of this.hooks) { + hook.destroy(thisId) + } + } + } + } + + public add(hook: AsyncHook) { + this.hooks.push(hook) + } + + public remove(hook: AsyncHook) { + this.hooks = this.hooks.filter((next: AsyncHook) => next !== hook) + } +} diff --git a/packages/thrift-server-core/src/main/async-hooks/State.ts b/packages/thrift-server-core/src/main/async-hooks/State.ts new file mode 100644 index 00000000..3fdfa295 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/State.ts @@ -0,0 +1,8 @@ +export class State { + public enabled: boolean = true + public previousIds: Array = [] + public nextId: number = 0 + public currentId: number = 0 + public parentId: number = 0 + public idMap: Map = new Map() +} diff --git a/packages/thrift-server-core/src/main/async-hooks/debug.ts b/packages/thrift-server-core/src/main/async-hooks/debug.ts new file mode 100644 index 00000000..43e6a4e0 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/debug.ts @@ -0,0 +1,11 @@ +import * as fs from 'fs' +import * as util from 'util' + +/** + * Using `console.log` causes new AsyncResources to be created, so using `console.log` in + * the AsyncHooks callbacks will cause infinite stack growth. Using this function for debugging + * is synchronous and avoids this issue. + */ +export function debug(msg: string, ...args: Array): void { + fs.writeSync(1, `${util.format(msg, ...args)}\n`) +} diff --git a/packages/thrift-server-core/src/main/async-hooks/index.ts b/packages/thrift-server-core/src/main/async-hooks/index.ts new file mode 100644 index 00000000..ee8b010b --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/index.ts @@ -0,0 +1,41 @@ +import { IAsyncHooks } from './types' +import { packageExists } from './utils' + +let instance: IAsyncHooks + +// If a another copy (same version or not) of stack-chain exists it will result +// in wrong stack traces (most likely dublicate callSites). +if ((global as any)._asyncHook !== undefined) { + // In case the version match, we can simply return the first initialized copy + if ((global as any)._asyncHook.version === require('./package.json').version) { + instance = (global as any)._asyncHook + + } else { + throw new Error('Conflicting version of async-hook found') + } + +} else if (packageExists('async_hooks')) { + instance = require('async_hooks') + +} else { + const stackChain: any = require('stack-chain') + + // Remove callSites from this module. AsyncWrap doesn't have any callSites + // and the hooks are expected to be completely transparent. + stackChain.filter.attach((error: any, frames: any) => { + return frames.filter((callSite: any) => { + const filename = callSite.getFileName() + // filename is not always a string, for example in case of eval it is + // undefined. So check if the filename is defined. + return !(filename && filename.slice(0, __dirname.length) === __dirname) + }) + }) + + instance = require('./AsyncHooks'); + + (global as any)._asyncHook = instance! +} + +export * from './debug' +export * from './types' +export const asyncHooks = instance diff --git a/packages/thrift-server-core/src/main/async-hooks/patches/index.ts b/packages/thrift-server-core/src/main/async-hooks/patches/index.ts new file mode 100644 index 00000000..be51a5f3 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/patches/index.ts @@ -0,0 +1,14 @@ +import { patchNextTick } from './next-tick' +import { patchPromise } from './promise' +import { patchTimers } from './timers' +import { Patch } from './types' + +export interface IPatchMap { + [name: string]: Patch +} + +export const patches: IPatchMap = { + nextTick: patchNextTick, + promise: patchPromise, + timers: patchTimers, +} diff --git a/packages/thrift-server-core/src/main/async-hooks/patches/next-tick.ts b/packages/thrift-server-core/src/main/async-hooks/patches/next-tick.ts new file mode 100644 index 00000000..475882e2 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/patches/next-tick.ts @@ -0,0 +1,54 @@ +import { Hooks } from '../Hooks' +import { State } from '../State' + +class NextTickWrap {} + +export function patchNextTick(hooks: Hooks, state: State) { + const oldNextTick = process.nextTick + process.nextTick = function tick() { + if (!state.enabled) { + return oldNextTick.apply(process, arguments) + } + + const args = Array.from(arguments) + const callback = args[0] + + if (typeof callback !== 'function') { + throw new TypeError('callback is not a function') + } + + const handle = new NextTickWrap() + const uid = state.nextId += 1 + + // call the init hook + hooks.init.call(handle, uid, 0, state.currentId, null) + + // overwrite callback + args[0] = function() { + // call the pre hook + hooks.pre.call(handle, uid) + + let didThrow = true + try { + callback.apply(this, arguments) + didThrow = false + } finally { + // If `callback` threw and there is an uncaughtException handler + // then call the `post` and `destroy` hook after the uncaughtException + // user handlers have been invoked. + if (didThrow && process.listenerCount('uncaughtException') > 0) { + process.once('uncaughtException', () => { + hooks.post.call(handle, uid, true) + hooks.destroy.call(null, uid) + }) + } + } + + // callback done successfully + hooks.post.call(handle, uid, false) + hooks.destroy.call(null, uid) + } + + return oldNextTick.apply(process, args) + } +} diff --git a/packages/thrift-server-core/src/main/async-hooks/patches/promise.ts b/packages/thrift-server-core/src/main/async-hooks/patches/promise.ts new file mode 100644 index 00000000..cfc82c54 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/patches/promise.ts @@ -0,0 +1,64 @@ +import { Hooks } from '../Hooks' +import { State } from '../State' + +class PromiseWrap {} + +export function patchPromise(hooks: Hooks, state: State) { + const Promise = global.Promise + + /* As per ECMAScript 2015, .catch must be implemented by calling .then, as + * such we need needn't patch .catch as well. see: + * http://www.ecma-international.org/ecma-262/6.0/#sec-promise.prototype.catch + */ + const oldThen = Promise.prototype.then + Promise.prototype.then = wrappedThen + + function makeWrappedHandler(fn: any, handle: any, uid: number, isOnFulfilled: boolean) { + if ('function' !== typeof fn) { + return isOnFulfilled + ? makeUnhandledResolutionHandler(uid) + : makeUnhandledRejectionHandler(uid) + } + + return function wrappedHandler(this: Promise) { + hooks.pre.call(handle, uid) + try { + return fn.apply(this, arguments) + } finally { + hooks.post.call(handle, uid, false) + hooks.destroy.call(null, uid) + } + } + } + + function makeUnhandledResolutionHandler(uid: number) { + return function unhandledResolutionHandler(val: any) { + hooks.destroy.call(null, uid) + return val + } + } + + function makeUnhandledRejectionHandler(uid: number) { + return function unhandledRejectedHandler(val: any) { + hooks.destroy.call(null, uid) + throw val + } + } + + function wrappedThen(this: Promise, onFulfilled: any, onRejected: any) { + if (!state.enabled) { + return oldThen.call(this, onFulfilled, onRejected) + } + + const handle = new PromiseWrap() + const uid = state.nextId += 1 + + hooks.init.call(handle, uid, 0, state.currentId, null) + + return oldThen.call( + this, + makeWrappedHandler(onFulfilled, handle, uid, true), + makeWrappedHandler(onRejected, handle, uid, false), + ) + } +} diff --git a/packages/thrift-server-core/src/main/async-hooks/patches/timers.ts b/packages/thrift-server-core/src/main/async-hooks/patches/timers.ts new file mode 100644 index 00000000..75158476 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/patches/timers.ts @@ -0,0 +1,122 @@ +import * as timers from 'timers' +// import { debug } from '../debug' +import { Hooks } from '../Hooks' +import { State } from '../State' + +function TimeoutWrap() {} +function IntervalWrap() {} +function ImmediateWrap() {} + +const timeoutMap = new Map() +const intervalMap = new Map() +const ImmediateMap = new Map() + +let activeCallback: any = null +let clearedInCallback: boolean = false + +export function patchTimers(hooks: Hooks, state: State) { + patchTimer(hooks, state, 'setTimeout', 'clearTimeout', TimeoutWrap, timeoutMap, true) + patchTimer(hooks, state, 'setInterval', 'clearInterval', IntervalWrap, intervalMap, false) + patchTimer(hooks, state, 'setImmediate', 'clearImmediate', ImmediateWrap, ImmediateMap, true) + + global.setTimeout = timers.setTimeout + global.setInterval = timers.setInterval + global.setImmediate = timers.setImmediate + + global.clearTimeout = timers.clearTimeout + global.clearInterval = timers.clearInterval + global.clearImmediate = timers.clearImmediate +} + +function patchTimer( + hooks: Hooks, + state: State, + setFn: string, + clearFn: string, + Handle: any, + timerMap: any, + singleCall: boolean, +): void { + const oldSetFn = (timers as any)[setFn] + const oldClearFn = (timers as any)[clearFn]; + + // overwrite set[Timeout] + (timers as any)[setFn] = function() { + if (!state.enabled) { + return oldSetFn.apply(timers, arguments) + } + + const args = Array.from(arguments) + const callback = args[0] + + if (typeof callback !== 'function') { + throw new TypeError('"callback" argument must be a function') + } + + const handle = new Handle() + const uid = state.nextId += 1 + let timerId: number + + // call the init hook + hooks.init.call(handle, uid, 0, state.currentId, null) + + // overwrite callback + args[0] = function() { + // call the pre hook + activeCallback = timerId + hooks.pre.call(handle, uid) + + let didThrow = true + try { + callback.apply(this, arguments) + didThrow = false + } finally { + // If `callback` threw and there is an uncaughtException handler + // then call the `post` and `destroy` hook after the uncaughtException + // user handlers have been invoked. + if (didThrow && process.listenerCount('uncaughtException') > 0) { + process.once('uncaughtException', () => { + // call the post hook + hooks.post.call(handle, uid, true) + // setInterval won't continue + timerMap.delete(timerId) + hooks.destroy.call(null, uid) + }) + } + } + + // callback done successfully + hooks.post.call(handle, uid, false) + activeCallback = null + + // call the destroy hook if the callback will only be called once + if (singleCall || clearedInCallback) { + clearedInCallback = false + timerMap.delete(timerId) + hooks.destroy.call(null, uid) + } + } + + timerId = oldSetFn.apply(timers, args) + // Bind the timerId and uid for later use, in case the clear* function is + // called. + timerMap.set(timerId, uid) + + return timerId + }; + + // overwrite clear[Timeout] + (timers as any)[clearFn] = (timerId: number) => { + // If clear* was called within the timer callback, then delay the destroy + // event to after the post event has been called. + if (activeCallback === timerId && timerId !== null) { + clearedInCallback = true + } else if (timerMap.has(timerId)) { + const uid = timerMap.get(timerId) + timerMap.delete(timerId) + hooks.destroy.call(null, uid) + } + + oldClearFn.apply(timers, arguments) + } +} diff --git a/packages/thrift-server-core/src/main/async-hooks/patches/types.ts b/packages/thrift-server-core/src/main/async-hooks/patches/types.ts new file mode 100644 index 00000000..f55f5e64 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/patches/types.ts @@ -0,0 +1,4 @@ +import { Hooks } from '../Hooks' +import { State } from '../State' + +export type Patch = (hooks: Hooks, state: State) => void diff --git a/packages/thrift-server-core/src/main/async-hooks/types.ts b/packages/thrift-server-core/src/main/async-hooks/types.ts new file mode 100644 index 00000000..b96a5137 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/types.ts @@ -0,0 +1,62 @@ +export interface IAsyncHooks { + createHook(options: IHookCallbacks): IAsyncHook + executionAsyncId(): number + triggerAsyncId(): number +} + +export interface IAsyncHook { + enable(): this + disable(): this +} + +export type InitCallback = + (asyncId: number, type: string, triggerAsyncId: number, resource: object) => void + +export type BeforeCallback = + (asyncId: number) => void + +export type AfterCallback = + (asyncId: number) => void + +export type PromiseCallback = + (asyncId: number) => void + +export type DestroyCallback = + (asyncId: number) => void + +export interface IHookCallbacks { + /** + * Called when a class is constructed that has the possibility to emit an asynchronous event. + * @param asyncId a unique ID for the async resource + * @param type the type of the async resource + * @param triggerAsyncId the unique ID of the async resource in whose execution context this async resource was created + * @param resource reference to the resource representing the async operation, needs to be released during destroy + */ + init?: InitCallback + + /** + * When an asynchronous operation is initiated or completes a callback is called to notify the user. + * The before callback is called just before said callback is executed. + * @param asyncId the unique identifier assigned to the resource about to execute the callback. + */ + before?: BeforeCallback + + /** + * Called immediately after the callback specified in before is completed. + * @param asyncId the unique identifier assigned to the resource which has executed the callback. + */ + after?: AfterCallback + + /** + * Called when a promise has resolve() called. This may not be in the same execution id + * as the promise itself. + * @param asyncId the unique id for the promise that was resolve()d. + */ + promiseResolve?: PromiseCallback + + /** + * Called after the resource corresponding to asyncId is destroyed + * @param asyncId a unique ID for the async resource + */ + destroy?: DestroyCallback +} diff --git a/packages/thrift-server-core/src/main/async-hooks/utils.ts b/packages/thrift-server-core/src/main/async-hooks/utils.ts new file mode 100644 index 00000000..54300506 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-hooks/utils.ts @@ -0,0 +1,8 @@ +export function packageExists(name: string): boolean { + try { + require.resolve(name) + return true + } catch (e) { + return false + } +} diff --git a/packages/thrift-server-core/src/main/async-scope/index.ts b/packages/thrift-server-core/src/main/async-scope/index.ts new file mode 100644 index 00000000..777e37d9 --- /dev/null +++ b/packages/thrift-server-core/src/main/async-scope/index.ts @@ -0,0 +1,141 @@ +import { + asyncHooks, + // debug, +} from '../async-hooks' + +export interface IAsyncScope { + get(key: string): T | null + set(key: string, value: T): void + delete(key: string): void +} + +interface IDictionary { + [key: string]: any +} + +interface IAsyncNode { + id: number + parentId: number + exited: boolean + data: IDictionary + children: Array +} + +type AsyncMap = Map + +function cleanUpParents(asyncId: number, parentId: number, asyncMap: AsyncMap): void { + if (asyncMap.has(parentId)) { + asyncMap.get(parentId)!.children = asyncMap.get(parentId)!.children.filter((next: number) => { + return next !== asyncId + }) + + if (asyncMap.get(parentId)!.exited && asyncMap.get(parentId)!.children.length === 0) { + const nextParentId: number = asyncMap.get(parentId)!.parentId + asyncMap.delete(parentId) + cleanUpParents(parentId, nextParentId, asyncMap) + } + } +} + +function recursiveGet(key: string, asyncId: number, asyncMap: AsyncMap): T | null { + if (asyncMap.has(asyncId)) { + if (asyncMap.get(asyncId)!.data[key] !== undefined) { + return asyncMap.get(asyncId)!.data[key] + } else { + return recursiveGet(key, asyncMap.get(asyncId)!.parentId, asyncMap) + } + } else { + return null + } +} + +function recursiveDelete(key: string, asyncId: number, asyncMap: AsyncMap): void { + if (asyncMap.has(asyncId)) { + const parentId: number = asyncMap.get(asyncId)!.parentId + + if (asyncMap.get(asyncId)!.data[key] !== undefined) { + delete asyncMap.get(asyncId)!.data[key] + } + + recursiveDelete(key, parentId, asyncMap) + } +} + +export class AsyncScope implements IAsyncScope { + private asyncMap: Map + + constructor() { + const self = this + this.asyncMap = new Map() + + asyncHooks.createHook({ + init(asyncId, type, triggerAsyncId, resource) { + // debug('init: ', arguments) + if (!self.asyncMap.has(triggerAsyncId)) { + self.asyncMap.set(triggerAsyncId, { + id: triggerAsyncId, + parentId: -1, + exited: false, + data: {}, + children: [], + }) + } + + self.asyncMap.get(triggerAsyncId)!.children.push(asyncId) + + self.asyncMap.set(asyncId, { + id: asyncId, + parentId: triggerAsyncId, + exited: false, + data: {}, + children: [], + }) + }, + before(asyncId) { + // debug('before: ', asyncId) + }, + after(asyncId) { + // debug('after: ', asyncId) + }, + promiseResolve(asyncId) { + // debug('promiseResolve: ', asyncId) + }, + destroy(asyncId) { + // debug('destroy: ', asyncId) + if (self.asyncMap.has(asyncId)) { + // Only delete if the the child scopes are not still active + if (self.asyncMap.get(asyncId)!.children.length === 0) { + const parentId: number = self.asyncMap.get(asyncId)!.parentId + self.asyncMap.delete(asyncId) + + cleanUpParents(asyncId, parentId, self.asyncMap) + + // If child scopes are still active mark this scope as exited so we can clean up + // when child scopes do exit. + } else { + self.asyncMap.get(asyncId)!.exited = true + } + } + }, + }).enable() + } + + public get(key: string): T | null { + const activeId: number = asyncHooks.executionAsyncId() + return recursiveGet(key, activeId, this.asyncMap) + } + + public set(key: string, value: T): void { + const activeId: number = asyncHooks.executionAsyncId() + if (this.asyncMap.has(activeId)) { + this.asyncMap.get(activeId)!.data[key] = value + } + } + + public delete(key: string): void { + const activeId: number = asyncHooks.executionAsyncId() + recursiveDelete(key, activeId, this.asyncMap) + } +} + +export const asyncScope: AsyncScope = new AsyncScope() diff --git a/packages/thrift-server-core/src/main/index.ts b/packages/thrift-server-core/src/main/index.ts index de90d745..5b9fc644 100644 --- a/packages/thrift-server-core/src/main/index.ts +++ b/packages/thrift-server-core/src/main/index.ts @@ -12,6 +12,7 @@ export * from './transports' export * from './observability' export * from './errors' export * from './utils' +export * from './async-scope' export function process(args: { processor: IThriftProcessor, diff --git a/packages/thrift-server-core/src/main/observability/AsyncContext.ts b/packages/thrift-server-core/src/main/observability/AsyncContext.ts new file mode 100644 index 00000000..51f2cae7 --- /dev/null +++ b/packages/thrift-server-core/src/main/observability/AsyncContext.ts @@ -0,0 +1,24 @@ +import { Context, TraceId } from 'zipkin' +import { asyncScope } from '../async-scope' + +export class AsyncContext implements Context { + public setContext(ctx: TraceId): void { + // console.log('setContext') + asyncScope.set('traceId', ctx) + } + + public getContext(): TraceId { + return asyncScope.get('traceId')! + } + + public scoped(callable: () => V): V { + return callable() + } + + public letContext(ctx: TraceId, callable: () => V): V { + return this.scoped(() => { + this.setContext(ctx) + return callable() + }) + } +} diff --git a/packages/thrift-server-core/src/main/observability/constants.ts b/packages/thrift-server-core/src/main/observability/constants.ts index ae83e72c..16a21294 100644 --- a/packages/thrift-server-core/src/main/observability/constants.ts +++ b/packages/thrift-server-core/src/main/observability/constants.ts @@ -1,8 +1,10 @@ export const L5D_TRACE_HDR: string = 'l5d-ctx-trace' const B3Prefix: string = 'x-b3' -export const B3_TRACE_HDR: string = `${B3Prefix}-traceid` -export const B3_SPAN_HDR: string = `${B3Prefix}-spanid` -export const B3_PARENT_SPAN_HDR: string = `${B3Prefix}-parentspanid` -export const B3_SAMPLED_HDR: string = `${B3Prefix}-sampled` -export const B3_FLAGS_HDR: string = `${B3Prefix}-flags` +export const ZipkinHeaders = { + TraceId: `${B3Prefix}-traceid`, + SpanId: `${B3Prefix}-spanid`, + ParentId: `${B3Prefix}-parentspanid`, + Sampled: `${B3Prefix}-sampled`, + Flags: `${B3Prefix}-flags`, +} diff --git a/packages/thrift-server-core/src/main/observability/types.ts b/packages/thrift-server-core/src/main/observability/types.ts index 543097f9..02a2893a 100644 --- a/packages/thrift-server-core/src/main/observability/types.ts +++ b/packages/thrift-server-core/src/main/observability/types.ts @@ -6,6 +6,23 @@ export interface ITraceId { traceIdHigh?: boolean } -export interface IHeaderMap { +export interface IRequestHeaders { [name: string]: string | Array | undefined } + +export interface IZipkinPluginOptions { + localServiceName: string + remoteServiceName?: string + port?: number + debug?: boolean + endpoint?: string + sampleRate?: number + httpInterval?: number +} + +export interface IZipkinTracerConfig { + debug?: boolean + endpoint?: string + sampleRate?: number + httpInterval?: number +} diff --git a/packages/thrift-server-core/src/main/observability/utils.ts b/packages/thrift-server-core/src/main/observability/utils.ts index a47b3bd8..a59ef1f8 100644 --- a/packages/thrift-server-core/src/main/observability/utils.ts +++ b/packages/thrift-server-core/src/main/observability/utils.ts @@ -1,6 +1,15 @@ import ByteBuffer = require('bytebuffer') +import { option, TraceId } from 'zipkin' -import { ITraceId } from './types' +import { + IRequestHeaders, + ITraceId, +} from './types' + +import { + L5D_TRACE_HDR, + ZipkinHeaders, +} from './constants' function isFlagSet(flags: number, field: number): boolean { return (flags & field) === field @@ -22,25 +31,31 @@ export function randomTraceId(): string { return traceId } -function getFlagBytes(traceId: ITraceId): Uint8Array { - if (traceId.sampled !== true) { +function getFlagBytes(traceId: TraceId): Uint8Array { + const value = traceId.sampled.getOrElse(false) + if (value !== true) { return new Uint8Array([0, 0, 0, 0, 0, 0, 0, 0]) + } else { return new Uint8Array([0, 0, 0, 0, 0, 0, 0, 6]) } } +function is128bit(traceId: TraceId): boolean { + return traceId.traceId.length === 32 +} + /** * For LinkerD l5d headers the extra bits for a 128-bit trace id are appended to the * end of the serialized header. */ -export function serializeLinkerdHeader(traceId: ITraceId): string { +export function serializeLinkerdHeader(traceId: TraceId): string { const serialized: ByteBuffer = ByteBuffer.concat([ ByteBuffer.fromHex(traceId.spanId), ByteBuffer.fromHex(traceId.parentId), ByteBuffer.fromHex(traceId.traceId.substring(0, 16)), getFlagBytes(traceId), - (traceId.traceIdHigh === true) ? + (is128bit(traceId)) ? ByteBuffer.fromHex(traceId.traceId.substring(16, 32)) : new Uint8Array([]), ]) @@ -48,11 +63,12 @@ export function serializeLinkerdHeader(traceId: ITraceId): string { return serialized.toBase64() } -export function deserializeLinkerdHeader(trace: string): ITraceId { +export function deserializeLinkerdHeader(trace: string): TraceId { const bytes: string = Buffer.from(trace, 'base64').toString('hex') if (bytes.length !== 64 && bytes.length !== 80) { throw new Error('TraceId headers must be 64 or 128-bit') + } else { const spanId = bytes.substring(0, 16) const parentId = bytes.substring(16, 32) @@ -68,6 +84,81 @@ export function deserializeLinkerdHeader(trace: string): ITraceId { traceId += bytes.substring(64, 80) } - return { spanId, parentId, traceId, sampled, traceIdHigh } + return traceIdFromTraceId({ + spanId, + parentId, + traceId, + sampled, + traceIdHigh, + }) + } +} + +function fromNullable(nullable: any): option.IOption { + if (nullable !== null && nullable !== undefined) { + return new option.Some(nullable) + } else { + return option.None } } + +export function traceIdValues(traceId: TraceId): ITraceId { + return { + traceId: traceId.traceId, + spanId: traceId.spanId, + parentId: traceId.parentId, + sampled: traceId.sampled.getOrElse(false), + traceIdHigh: (traceId.traceId.length > 16), + } +} + +export function traceIdFromTraceId(trace: ITraceId): TraceId { + return new TraceId({ + traceId: fromNullable(trace.traceId), + parentId: fromNullable(trace.parentId), + spanId: trace.spanId, + sampled: fromNullable(trace.sampled), + }) +} + +export function normalizeHeaders(headers: IRequestHeaders): IRequestHeaders { + if (headers[L5D_TRACE_HDR] !== undefined) { + const linkTrace = deserializeLinkerdHeader(headers[L5D_TRACE_HDR] as string) + if ( + headers[ZipkinHeaders.TraceId] !== undefined && + headers[ZipkinHeaders.TraceId] !== linkTrace.traceId + ) { + return headers + + } else { + headers[ZipkinHeaders.TraceId] = linkTrace.traceId + headers[ZipkinHeaders.SpanId] = linkTrace.spanId + headers[ZipkinHeaders.ParentId] = linkTrace.parentId + headers[ZipkinHeaders.Sampled] = linkTrace.sampled ? '1' : '0' + return headers + } + + } else { + return headers + } +} + +export function hasL5DHeader(headers: IRequestHeaders): boolean { + return headers[L5D_TRACE_HDR] !== undefined +} + +export function addL5Dheaders(headers: IRequestHeaders): IRequestHeaders { + const newHeaders = Object.keys(headers).reduce((acc: any, next: string) => { + acc[next.toLowerCase()] = headers[next] + return acc + }, {}) + + newHeaders[L5D_TRACE_HDR] = serializeLinkerdHeader(traceIdFromTraceId({ + traceId: newHeaders[ZipkinHeaders.TraceId] as string, + spanId: newHeaders[ZipkinHeaders.SpanId] as string, + parentId: newHeaders[ZipkinHeaders.ParentId] as string, + sampled: (newHeaders[ZipkinHeaders.Sampled] === '1') ? true : false, + })) + + return newHeaders +} diff --git a/packages/thrift-server-core/src/main/observability/zipkin.ts b/packages/thrift-server-core/src/main/observability/zipkin.ts index 50007a6c..7ebe3491 100644 --- a/packages/thrift-server-core/src/main/observability/zipkin.ts +++ b/packages/thrift-server-core/src/main/observability/zipkin.ts @@ -9,45 +9,62 @@ import { Tracer, } from 'zipkin' +import { + ZipkinHeaders, +} from './constants' + import { HttpLogger } from 'zipkin-transport-http' +import { + IZipkinTracerConfig, +} from './types' + const TRACER_CACHE: Map = new Map() /** * `http://localhost:9411/api/v1/spans` */ -export interface IZipkinPluginOptions { - serviceName: string - port?: number - debug?: boolean - endpoint?: string - sampleRate?: number -} - -export interface IZipkinConfig { - debug?: boolean - endpoint?: string - sampleRate?: number -} - -function recorderForOptions(options: IZipkinConfig): Recorder { +function recorderForOptions(options: IZipkinTracerConfig): Recorder { if (options.endpoint !== undefined) { return new BatchRecorder({ logger: new HttpLogger({ endpoint: options.endpoint, + httpInterval: options.httpInterval, }), }) + } else { return new ConsoleRecorder() } } -export function getTracerForService(name: string, options: IZipkinConfig): Tracer { - const maybeTracer = TRACER_CACHE.get(name) +export function getHeadersForTraceId(traceId?: TraceId): { [name: string]: any } { + if (traceId !== null && traceId !== undefined) { + const headers: { [name: string]: any } = {} + headers[ZipkinHeaders.TraceId] = traceId.traceId + headers[ZipkinHeaders.SpanId] = traceId.spanId + + traceId._parentId.ifPresent((val: string) => { + headers[ZipkinHeaders.ParentId] = val + }) + + traceId.sampled.ifPresent((sampled: boolean) => { + headers[ZipkinHeaders.Sampled] = sampled ? '1' : '0' + }) + + return headers + } else { + return {} + } +} + +export function getTracerForService(serviceName: string, options: IZipkinTracerConfig): Tracer { + const maybeTracer = TRACER_CACHE.get(serviceName) if (maybeTracer !== undefined) { return maybeTracer + } else { const ctxImpl: Context = new ExplicitContext() const recorder: Recorder = recorderForOptions(options) @@ -60,10 +77,10 @@ export function getTracerForService(name: string, options: IZipkinConfig): Trace options.sampleRate : 0.1, ), - localServiceName: name, // name of this application + localServiceName: serviceName, // name of this application }) - TRACER_CACHE.set(name, tracer) + TRACER_CACHE.set(serviceName, tracer) return tracer } diff --git a/packages/thrift-server-core/src/main/types.ts b/packages/thrift-server-core/src/main/types.ts index 6317aed1..d61e8592 100644 --- a/packages/thrift-server-core/src/main/types.ts +++ b/packages/thrift-server-core/src/main/types.ts @@ -1,4 +1,5 @@ import { EventEmitter } from 'events' +import { TraceId } from 'zipkin' import { TProtocol } from './protocols' import { TTransport } from './transports' @@ -6,6 +7,11 @@ export * from './Int64' export type LogFunction = (msg: string, data?: any) => void +export interface IRequestContext { + traceId: TraceId, + requestHeaders: { [name: string]: any } +} + /** * Options for any Thrift Server * diff --git a/packages/thrift-server-core/src/tests/unit/async-scope.spec.ts b/packages/thrift-server-core/src/tests/unit/async-scope.spec.ts new file mode 100644 index 00000000..7c53d255 --- /dev/null +++ b/packages/thrift-server-core/src/tests/unit/async-scope.spec.ts @@ -0,0 +1,138 @@ +import { expect } from 'code' +import * as Lab from 'lab' + +import { setTimeout } from 'timers' +import { asyncScope } from '../../main/async-scope' + +export const lab = Lab.script() + +const describe = lab.describe +const it = lab.it + +describe('AsyncStore', () => { + describe('set', () => { + it('should add value to current execution scope', (done) => { + asyncScope.set('set_test', 45) + expect(asyncScope.get('set_test')).to.equal(45) + done() + }) + }) + + describe('get', () => { + it('should allow fetching of values set on the same call tree', (done) => { + setTimeout(() => { + function childFunction() { + expect(asyncScope.get('foo')).to.equal(6) + asyncScope.set('bar', 89) + } + + function parentFunction() { + asyncScope.set('foo', 6) + setTimeout(() => { + childFunction() + }, 50) + + setTimeout(() => { + expect(asyncScope.get('bar')).to.equal(null) + done() + }, 250) + } + + parentFunction() + }, 500) + }) + + it('should not allow fetching of values set on different async call trees', (done) => { + setTimeout(() => { // runs first + asyncScope.set('boom', 109) + + function childFunction() { // runs fourth + expect(asyncScope.get('boo')).to.equal(null) + } + + function parentFunction() { // runs third + asyncScope.set('bam', 98) + setTimeout(childFunction, 200) + } + + parentFunction() + }, 100) + + setTimeout(() => { // runs second + asyncScope.set('boo', 37) + expect(asyncScope.get('boom')).to.equal(null) + + function childFunction() { // runs sixth + expect(asyncScope.get('bam')).to.equal(null) + done() + } + + function parentFunction() { // runs fifth + setTimeout(childFunction, 200) + } + + parentFunction() + }, 200) + }) + + it('should correctly fetch values across multiple sibling async contexts', (done) => { + const values: Array = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + asyncScope.set('all_siblings', 456) + values.forEach((next: number) => { + setTimeout(() => { + asyncScope.set(`key_${next}`, next) + setTimeout(() => { + expect(asyncScope.get(`key_${next}`)).to.equal(next) + expect(asyncScope.get('all_siblings')).to.equal(456) + values.forEach((v) => { + if (v !== next) { + expect(asyncScope.get(`key_${v}`)).to.equal(null) + } else { + expect(asyncScope.get(`key_${v}`)).to.equal(next) + } + }) + + if (next === 10) { + done() + } + }, (200 + (next * 200))) + }, (200 - (next * 18))) + }) + }) + }) + + describe('delete', () => { + it('should delete an existing value from the store', (done) => { + asyncScope.set('delete_test', 67) + expect(asyncScope.get('delete_test')).to.equal(67) + asyncScope.delete('delete_test') + expect(asyncScope.get('delete_test')).to.equal(null) + done() + }) + + it('should delete the value from all accessible scopes', (done) => { + setTimeout(() => { + asyncScope.set('test_val', 78) + expect(asyncScope.get('test_val')).to.equal(78) + setTimeout(() => { + expect(asyncScope.get('test_val')).to.equal(78) + asyncScope.set('test_val', 56) + expect(asyncScope.get('test_val')).to.equal(56) + setTimeout(() => { + expect(asyncScope.get('test_val')).to.equal(56) + asyncScope.set('test_val', 23) + expect(asyncScope.get('test_val')).to.equal(23) + setTimeout(() => { + expect(asyncScope.get('test_val')).to.equal(23) + asyncScope.set('test_val', 789) + expect(asyncScope.get('test_val')).to.equal(789) + asyncScope.delete('test_val') + expect(asyncScope.get('test_val')).to.equal(null) + done() + }, 100) + }, 100) + }, 100) + }, 100) + }) + }) +}) diff --git a/packages/thrift-server-core/src/tests/unit/zipkin.spec.ts b/packages/thrift-server-core/src/tests/unit/zipkin.spec.ts index 80588996..5a5ba870 100644 --- a/packages/thrift-server-core/src/tests/unit/zipkin.spec.ts +++ b/packages/thrift-server-core/src/tests/unit/zipkin.spec.ts @@ -1,15 +1,14 @@ import { expect } from 'code' import * as Lab from 'lab' -import { - // TraceId, - -} from 'zipkin' +import { TraceId } from 'zipkin' import { + addL5Dheaders, deserializeLinkerdHeader, - ITraceId, serializeLinkerdHeader, + traceIdFromTraceId, + traceIdValues, // randomTraceId, // SAMPLED, // SAMPLING_KNOWN, @@ -23,21 +22,21 @@ const it = lab.it // const after = lab.after describe('Zipkin', () => { - const traceId: ITraceId = { + const traceId: TraceId = traceIdFromTraceId({ traceId: '411d1802c9151ded', spanId: 'c3ba1a6560ca0c48', parentId: '2b5189ffa013ad73', sampled: true, traceIdHigh: false, - } + }) - const traceId128: ITraceId = { + const traceId128: TraceId = traceIdFromTraceId({ traceId: '411d1802c9151ded2b5189ffa013ad73', spanId: 'c3ba1a6560ca0c48', parentId: '2b5189ffa013ad73', sampled: true, traceIdHigh: true, - } + }) const serializedTraceId: string = 'w7oaZWDKDEgrUYn/oBOtc0EdGALJFR3tAAAAAAAAAAY=' @@ -62,4 +61,31 @@ describe('Zipkin', () => { expect(serializeLinkerdHeader(traceId128)).to.equal(serializedTraceId128) }) }) + + describe('addL5Dheaders', () => { + it('should add l5d header to headers', async () => { + const actual = addL5Dheaders({ + 'x-b3-traceId': '411d1802c9151ded', + 'x-b3-spanId': 'c3ba1a6560ca0c48', + 'x-b3-parentspanid': '2b5189ffa013ad73', + 'x-b3-sampled': '1', + }) + + expect(actual).to.equal({ + 'x-b3-traceid': '411d1802c9151ded', + 'x-b3-spanid': 'c3ba1a6560ca0c48', + 'x-b3-parentspanid': '2b5189ffa013ad73', + 'x-b3-sampled': '1', + 'l5d-ctx-trace': 'w7oaZWDKDEgrUYn/oBOtc0EdGALJFR3tAAAAAAAAAAY=', + }) + + expect(traceIdValues(deserializeLinkerdHeader(actual['l5d-ctx-trace'] as string))).to.equal({ + traceId: '411d1802c9151ded', + spanId: 'c3ba1a6560ca0c48', + parentId: '2b5189ffa013ad73', + sampled: true, + traceIdHigh: false, + }) + }) + }) }) diff --git a/packages/thrift-server-core/tslint.json b/packages/thrift-server-core/tslint.json index 2bae6853..86a8d1c2 100644 --- a/packages/thrift-server-core/tslint.json +++ b/packages/thrift-server-core/tslint.json @@ -1,6 +1,8 @@ { "extends": "tslint:recommended", "rules": { + "variable-name": false, + "only-arrow-functions": false, "max-classes-per-file": false, "max-line-length": false, "no-empty": false, diff --git a/packages/thrift-server-express/src/main/plugins/zipkin.ts b/packages/thrift-server-express/src/main/plugins/zipkin.ts index c49402b2..5eb1562d 100644 --- a/packages/thrift-server-express/src/main/plugins/zipkin.ts +++ b/packages/thrift-server-express/src/main/plugins/zipkin.ts @@ -1,6 +1,8 @@ import { + asyncScope, getTracerForService, IZipkinPluginOptions, + normalizeHeaders, } from '@creditkarma/thrift-server-core' import { @@ -24,21 +26,20 @@ function formatRequestUrl(req: express.Request): string { } export function zipkinMiddleware({ - serviceName, + localServiceName, port = 0, debug = false, endpoint, sampleRate, }: IZipkinPluginOptions): express.RequestHandler { - const tracer: Tracer = getTracerForService(serviceName, { debug, endpoint, sampleRate }) + const tracer: Tracer = getTracerForService(localServiceName, { debug, endpoint, sampleRate }) const instrumentation = new Instrumentation.HttpServer({ tracer, port }) return (req: express.Request, res: express.Response, next: express.NextFunction): void => { - console.log('zipkin request express') tracer.scoped(() => { - function readHeader(header: string): option.IOption { - const val = req.header(header) - console.log('val: ', val) - if (val != null) { + req.headers = normalizeHeaders(req.headers) + function readHeader(header: string): option.IOption> { + const val = req.headers[header.toLocaleLowerCase()] + if (val !== null && val !== undefined) { return new option.Some(val) } else { return option.None @@ -50,14 +51,17 @@ export function zipkinMiddleware({ req.method, formatRequestUrl(req), (readHeader as any), - ) as any as TraceId // Nasty but this method is incorectly typed + ) as any as TraceId // Nasty but this method is incorrectly typed - (req as any)._traceId = traceId + asyncScope.set('requestContext', { + traceId, + requestHeaders: req.headers, + }) res.on('finish', () => { tracer.scoped(() => { instrumentation.recordResponse( - (traceId as any as TraceId), + (traceId as any), // This method is also incorrectly typed `${res.statusCode}`, ) }) diff --git a/packages/thrift-server-hapi/src/main/index.ts b/packages/thrift-server-hapi/src/main/index.ts index 6974eb49..a262b39d 100644 --- a/packages/thrift-server-hapi/src/main/index.ts +++ b/packages/thrift-server-hapi/src/main/index.ts @@ -126,7 +126,7 @@ export function thirftServerHapi({ processor: options.handler, buffer: request.payload, Transport, diff --git a/packages/thrift-server-hapi/src/main/plugins/zipkin.ts b/packages/thrift-server-hapi/src/main/plugins/zipkin.ts index eb0f0eed..2995f170 100644 --- a/packages/thrift-server-hapi/src/main/plugins/zipkin.ts +++ b/packages/thrift-server-hapi/src/main/plugins/zipkin.ts @@ -1,6 +1,8 @@ import { + asyncScope, getTracerForService, IZipkinPluginOptions, + normalizeHeaders, } from '@creditkarma/thrift-server-core' import * as Hapi from 'hapi' import * as url from 'url' @@ -21,7 +23,7 @@ function readStatusCode({ response }: Hapi.Request): number { } export function zipkinPlugin({ - serviceName, + localServiceName, port = 0, debug = false, endpoint, @@ -29,12 +31,11 @@ export function zipkinPlugin({ }: IZipkinPluginOptions): Hapi.PluginRegistrationObject { const hapiZipkinPlugin: Hapi.PluginRegistrationObject = { register(server: Hapi.Server, nothing: never, next) { - const tracer = getTracerForService(serviceName, { debug, endpoint, sampleRate }) + const tracer = getTracerForService(localServiceName, { debug, endpoint, sampleRate }) const instrumentation = new Instrumentation.HttpServer({ tracer, port }) server.ext('onRequest', (request, reply) => { - console.log('zipkin request hapi') - const { headers } = request + (request.headers as any) = normalizeHeaders(request.headers) const plugins = request.plugins tracer.scoped(() => { @@ -42,8 +43,8 @@ export function zipkinPlugin({ request.method, url.format(request.url), (header: string): option.IOption => { - const val = headers[header.toLowerCase()] - if (val != null) { + const val = request.headers[header.toLowerCase()] + if (val !== null && val !== undefined) { return new option.Some(val) } else { return option.None @@ -52,6 +53,10 @@ export function zipkinPlugin({ ) plugins.zipkin = { traceId } + asyncScope.set('requestContext', { + traceId, + requestHeaders: request.headers, + }) return reply.continue() }) @@ -61,7 +66,8 @@ export function zipkinPlugin({ const statusCode = readStatusCode(request) tracer.scoped(() => { - instrumentation.recordResponse(request.plugins.zipkin.traceId, `${statusCode}`) + const traceId: any = request.plugins.zipkin.traceId + instrumentation.recordResponse(traceId, `${statusCode}`) }) return reply.continue()