diff --git a/.changeset/friendly-birds-happen.md b/.changeset/friendly-birds-happen.md new file mode 100644 index 000000000..0fc98209a --- /dev/null +++ b/.changeset/friendly-birds-happen.md @@ -0,0 +1,6 @@ +--- +'@segment/analytics-plugin-validation': major +'@segment/analytics-core': minor +--- + +Migrate common code into core. diff --git a/constraints.pro b/constraints.pro index 6a11c8717..c05b8cea3 100644 --- a/constraints.pro +++ b/constraints.pro @@ -26,7 +26,12 @@ gen_enforced_dependency(WorkspaceCwd, DependencyIdent, DependencyRange2, Depende \+ member(DependencyIdent, [ % Allow examples to use different versions of react and 'react', 'react-dom', - '@types/react' + '@types/react', + % Allow the usage of workspace^ -- there is a better way to do this =) + '@segment/analytics-next', + '@segment/analytics-node', + '@segment/analytics-core', + '@internal/config' ]). % Enforces that a dependency doesn't appear in both `dependencies` and `devDependencies` diff --git a/internal/README.md b/internal/README.md new file mode 100644 index 000000000..66f3860c5 --- /dev/null +++ b/internal/README.md @@ -0,0 +1,3 @@ +# @internal/* + +Internal packages are private and never published to npm. Internal packages may _only_ be installed as devDependencies (if installed in a public package). diff --git a/internal/config/package.json b/internal/config/package.json new file mode 100644 index 000000000..2a7e750b4 --- /dev/null +++ b/internal/config/package.json @@ -0,0 +1,7 @@ +{ + "name": "@internal/config", + "version": "0.0.0", + "private": true, + "main": "./src", + "packageManager": "yarn@3.2.1" +} diff --git a/internal/config/src/index.js b/internal/config/src/index.js new file mode 100644 index 000000000..824089237 --- /dev/null +++ b/internal/config/src/index.js @@ -0,0 +1,4 @@ +module.exports = { + createJestTSConfig: require('./jest/config').createJestTSConfig, + lintStagedConfig: require('./lint-staged/config'), +} diff --git a/internal/config/src/jest/config.js b/internal/config/src/jest/config.js new file mode 100644 index 000000000..847313b5c --- /dev/null +++ b/internal/config/src/jest/config.js @@ -0,0 +1,34 @@ +const { getJestModuleMap } = require('./get-module-map') + +/** + * Create Config + * @param {import('jest').Config} Overrides. + * @param {object} getJestModuleMap options. + * @returns {import('jest').Config} + */ +const createJestTSConfig = ( + { modulePathIgnorePatterns, testMatch, ...overridesToMerge } = {}, + { packageRoot, skipPackageMap } = {} +) => { + return { + moduleNameMapper: getJestModuleMap(packageRoot, skipPackageMap), + preset: 'ts-jest', + modulePathIgnorePatterns: [ + '/dist/', + ...(modulePathIgnorePatterns || []), + ], + testEnvironment: 'node', + testMatch: ['**/?(*.)+(test).[jt]s?(x)', ...(testMatch || [])], + clearMocks: true, + globals: { + 'ts-jest': { + isolatedModules: true, + }, + }, + ...(overridesToMerge || {}), + } +} + +module.exports = { + createJestTSConfig, +} diff --git a/internal/config/src/jest/get-module-map.js b/internal/config/src/jest/get-module-map.js new file mode 100644 index 000000000..68eeb4feb --- /dev/null +++ b/internal/config/src/jest/get-module-map.js @@ -0,0 +1,31 @@ +const getPackages = require('get-monorepo-packages') + +// do not map modules in CI to catch any package install bugs (slower)... not in use ATM +const doNotMapPackages = process.env.JEST_SKIP_PACKAGE_MAP === 'true' + +/** + * Allows ts-jest to dynamically resolve packages so "build" + */ +const getJestModuleMap = ( + packageRoot = '../../', + skipPackageMap = doNotMapPackages +) => { + // get listing of packages in the mono repo + const createLocation = (name) => { + return `/./${name}/src/$1` + } + const moduleNameMapper = getPackages(packageRoot).reduce( + (acc, el) => ({ + ...acc, + [`${el.package.name}(.*)$`]: createLocation(el.location), + }), + {} + ) + + return { + '@/(.+)': '/../../src/$1', + ...(skipPackageMap ? {} : moduleNameMapper), + } +} + +module.exports = { getJestModuleMap } diff --git a/internal/config/src/lint-staged/config.js b/internal/config/src/lint-staged/config.js new file mode 100644 index 000000000..ed9279df6 --- /dev/null +++ b/internal/config/src/lint-staged/config.js @@ -0,0 +1,4 @@ +module.exports = { + '*.{js,jsx,ts,tsx}': ['eslint --fix'], + '*.json*': ['prettier --write'], +} diff --git a/package.json b/package.json index 283bc07e6..01a49da36 100644 --- a/package.json +++ b/package.json @@ -4,31 +4,31 @@ "version": "0.0.0", "workspaces": [ "examples/*", - "packages/*" + "packages/*", + "internal/*" ], "engines": { "node": "^14.15.0" }, "scripts": { - "test": "turbo run test", + "test": "FORCE_COLOR=1 turbo run test --filter='./packages/*'", "test:scripts": "jest --config scripts/jest.config.js", "lint": "yarn constraints && turbo run lint", - "build": "turbo run build", - "build:packages": "turbo run build --filter='./packages/*'", + "build": "turbo run build --filter='./packages/*'", "watch": "turbo run watch --filter='./packages/*'", "dev": "yarn workspace with-next-js run dev", "postinstall": "husky install", "changeset": "changeset", "update-versions-and-changelogs": "changeset version && yarn version-run-all && bash scripts/update-lockfile.sh", - "release": "yarn build:packages --force && changeset publish && yarn postpublish-run-all && git push origin --tags --no-verify", + "release": "yarn build --force && changeset publish && yarn postpublish-run-all && git push origin --tags --no-verify", "postpublish-run-all": "yarn workspaces foreach -vpt --no-private run postpublish", "version-run-all": "yarn workspaces foreach -vpt --no-private run version", "core": "yarn workspace @segment/analytics-core", - "core+deps": "turbo run --filter='analytics-core'", + "core+deps": "turbo run --filter=@segment/analytics-core...", "browser": "yarn workspace @segment/analytics-next", - "browser+deps": "turbo run --filter='analytics-next'", + "browser+deps": "turbo run --filter=@segment/analytics-next...", "node": "yarn workspace @segment/analytics-node", - "node+deps": "turbo run --filter='analytics-node'", + "node+deps": "turbo run --filter=@segment/analytics-node...", "clean": "bash scripts/clean.sh" }, "packageManager": "yarn@3.2.1", diff --git a/packages/browser/.lintstagedrc.js b/packages/browser/.lintstagedrc.js index 92a200a01..bc1f1c780 100644 --- a/packages/browser/.lintstagedrc.js +++ b/packages/browser/.lintstagedrc.js @@ -1,3 +1 @@ -module.exports = { - '*.{js,jsx,ts,tsx}': ['eslint --fix'], -} +module.exports = require("@internal/config").lintStagedConfig diff --git a/packages/browser/jest.config.js b/packages/browser/jest.config.js index 8baecf944..4d75f7273 100644 --- a/packages/browser/jest.config.js +++ b/packages/browser/jest.config.js @@ -1,26 +1,9 @@ -module.exports = { - preset: 'ts-jest', - modulePathIgnorePatterns: [ - '/dist/', - '/e2e-tests', - '/qa', - ], - testEnvironment: 'jsdom', - testMatch: ["**/?(*.)+(test).[jt]s?(x)"], - clearMocks: true, - testEnvironmentOptions: { - resources: 'usable', - }, - moduleNameMapper: { - '@/(.+)': '/../../src/$1', - }, +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig({ + modulePathIgnorePatterns: ['/e2e-tests', '/qa'], setupFilesAfterEnv: ['./jest.setup.js'], - globals: { - 'ts-jest': { - isolatedModules: true, - }, - }, - reporters: ['default'], + testEnvironment: 'jsdom', coverageThreshold: { global: { branches: 80.91, @@ -29,4 +12,4 @@ module.exports = { statements: 87.25, }, }, -} +}) diff --git a/packages/browser/package.json b/packages/browser/package.json index 5c985d94a..fc26947dd 100644 --- a/packages/browser/package.json +++ b/packages/browser/package.json @@ -31,7 +31,7 @@ "tsc": "yarn run -T tsc", "jest": "yarn run -T jest", "concurrently": "yarn run -T concurrently", - "watch": "yarn concurrently 'NODE_ENV=production yarn umd --watch' 'yarn pkg --watch' 'yarn cjs --watch'", + "watch": "yarn concurrently 'NODE_ENV=production yarn umd --watch' 'yarn pkg --watch --incremental'", "build": "yarn clean && yarn build-prep && yarn concurrently 'NODE_ENV=production yarn umd' 'yarn pkg' 'yarn cjs'", "postpublish": "echo 'running postpublish build step...' && NODE_ENV=production PROD_RELEASE=true bash scripts/release.sh", "pkg": "yarn tsc -p tsconfig.build.json", @@ -60,6 +60,7 @@ "unfetch": "^4.1.0" }, "devDependencies": { + "@internal/config": "0.0.0", "@segment/inspector-webext": "^1.1.0", "@size-limit/preset-big-lib": "^7.0.8", "@types/flat": "^5.0.1", diff --git a/packages/core-integration-tests/.eslintrc.js b/packages/core-integration-tests/.eslintrc.js new file mode 100644 index 000000000..b266f16dd --- /dev/null +++ b/packages/core-integration-tests/.eslintrc.js @@ -0,0 +1,4 @@ +/** @type { import('eslint').Linter.Config } */ +module.exports = { + extends: ["../../.eslintrc"], +} diff --git a/packages/core-integration-tests/.lintstagedrc.js b/packages/core-integration-tests/.lintstagedrc.js new file mode 100644 index 000000000..bc1f1c780 --- /dev/null +++ b/packages/core-integration-tests/.lintstagedrc.js @@ -0,0 +1 @@ +module.exports = require("@internal/config").lintStagedConfig diff --git a/packages/core-integration-tests/README.md b/packages/core-integration-tests/README.md new file mode 100644 index 000000000..08ec468c6 --- /dev/null +++ b/packages/core-integration-tests/README.md @@ -0,0 +1,2 @@ +Core tests that require AnalyticsBrowser, etc. +This exists because we can't create circular dependencies -- so, for example, installing AnalyticsBrowser as a dev dependency on core. \ No newline at end of file diff --git a/packages/core-integration-tests/jest.config.js b/packages/core-integration-tests/jest.config.js new file mode 100644 index 000000000..43505965a --- /dev/null +++ b/packages/core-integration-tests/jest.config.js @@ -0,0 +1,3 @@ +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig() diff --git a/packages/core-integration-tests/package.json b/packages/core-integration-tests/package.json new file mode 100644 index 000000000..ad0d5f317 --- /dev/null +++ b/packages/core-integration-tests/package.json @@ -0,0 +1,20 @@ +{ + "name": "@internal/core-integration-tests", + "version": "0.0.0", + "private": true, + "scripts": { + "test": "yarn jest", + "lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'", + "watch:test": "yarn test --watch", + "tsc": "yarn run -T tsc", + "eslint": "yarn run -T eslint", + "concurrently": "yarn run -T concurrently", + "jest": "yarn run -T jest" + }, + "packageManager": "yarn@3.2.1", + "devDependencies": { + "@internal/config": "workspace:^", + "@segment/analytics-core": "workspace:^", + "@segment/analytics-next": "workspace:^" + } +} diff --git a/packages/core-integration-tests/src/plugins/middleware/index.test.ts b/packages/core-integration-tests/src/plugins/middleware/index.test.ts new file mode 100644 index 000000000..38e674d2a --- /dev/null +++ b/packages/core-integration-tests/src/plugins/middleware/index.test.ts @@ -0,0 +1,236 @@ +/** + * @jest-environment jsdom + */ +import { + MiddlewareFunction, + sourceMiddlewarePlugin, + CoreContext, + CorePlugin, +} from '@segment/analytics-core' +import { Analytics } from '@segment/analytics-next' + +describe(sourceMiddlewarePlugin, () => { + const simpleMiddleware: MiddlewareFunction = ({ payload, next }) => { + if (!payload.obj.context) { + payload.obj.context = {} + } + + payload.obj.context.hello = 'from the other side' + + next(payload) + } + + const xt = sourceMiddlewarePlugin(simpleMiddleware, {}) + + it('creates a source middleware', () => { + expect(xt.name).toEqual('Source Middleware simpleMiddleware') + expect(xt.version).toEqual('0.1.0') + }) + + it('is loaded automatically', async () => { + // @ts-expect-error + expect(await xt.load(CoreContext.system())).toBeTruthy() + expect(xt.isLoaded()).toBe(true) + }) + + describe('Middleware', () => { + it('allows for changing the event', async () => { + const changeProperties: MiddlewareFunction = ({ payload, next }) => { + if (!payload.obj.properties) { + payload.obj.properties = {} + } + payload.obj.properties.hello = 'from the other side' + next(payload) + } + + const xt = sourceMiddlewarePlugin(changeProperties, {}) + + expect( + ( + await xt.track!( + new CoreContext({ + type: 'track', + }) + ) + ).event.properties + ).toEqual({ + hello: 'from the other side', + }) + }) + + it('uses a segment facade object', async () => { + let type = '' + const facadeMiddleware: MiddlewareFunction = ({ payload, next }) => { + type = payload.type() + next(payload) + } + + const xt = sourceMiddlewarePlugin(facadeMiddleware, {}) + + await xt.track!( + new CoreContext({ + type: 'track', + }) + ) + + expect(type).toEqual(type) + }) + + it('cancels the event if `next` is not called', async () => { + const hangs: MiddlewareFunction = () => {} + const hangsXT = sourceMiddlewarePlugin(hangs, {}) + + const doesNotHang: MiddlewareFunction = ({ next, payload }) => { + next(payload) + } + + const doesNotHangXT = sourceMiddlewarePlugin(doesNotHang, {}) + const toReturn = new CoreContext({ type: 'track' }) + const returnedCtx = await doesNotHangXT.track!(toReturn) + + expect(returnedCtx).toBe(toReturn) + + const toCancel = new CoreContext({ type: 'track' }) + await Promise.resolve(hangsXT.track!(toCancel)).catch((err) => { + expect(err).toMatchInlineSnapshot(` + ContextCancelation { + "reason": "Middleware \`next\` function skipped", + "retry": false, + "type": "middleware_cancellation", + } + `) + }) + }) + }) + + describe('Common use cases', () => { + it('can be used to cancel an event altogether', async () => { + const blowUp: MiddlewareFunction = () => { + // do nothing + // do not invoke next + } + + const ajs = new Analytics({ + writeKey: 'abc', + }) + + const ctx = await ajs.page('hello') + expect(ctx.logs().map((l) => l.message)).toContain('Delivered') + + // TODO: make browser use CoreContext + // @ts-ignore + await ajs.addSourceMiddleware(blowUp) + const notDelivered = await ajs.page('hello') + expect(notDelivered.logs().map((l) => l.message)).not.toContain( + 'Delivered' + ) + }) + + it('can be used to re-route/cancel destinations', async () => { + let middlewareInvoked = false + const pageMock = jest.fn() + + const skipGA: MiddlewareFunction = ({ payload, next }) => { + if (!payload.obj.integrations) { + payload.obj.integrations = {} + } + + payload.obj.integrations['Google Analytics'] = false + middlewareInvoked = true + next(payload) + } + + const gaDestination: CorePlugin = { + name: 'Google Analytics', + isLoaded: () => true, + load: async () => {}, + type: 'destination', + version: '1.0', + page: async (ctx) => { + pageMock() + return ctx + }, + } + + const ajs = new Analytics({ + writeKey: 'abc', + }) + + // TODO: make browser use CoreContext + // @ts-ignore + await ajs.register(gaDestination) + + await ajs.page('hello') + expect(pageMock).toHaveBeenCalled() + + // TODO: make browser use CoreContext + // @ts-ignore + await ajs.addSourceMiddleware(skipGA) + await ajs.page('hello') + expect(middlewareInvoked).toBe(true) + expect(pageMock).toHaveBeenCalledTimes(1) + }) + }) + + describe('Event API', () => { + it('wraps track', async () => { + const evt = new CoreContext({ + type: 'track', + }) + + expect((await xt.track!(evt)).event.context).toMatchInlineSnapshot(` + Object { + "hello": "from the other side", + } + `) + }) + + it('wraps identify', async () => { + const evt = new CoreContext({ + type: 'identify', + }) + + expect((await xt.identify!(evt)).event.context).toMatchInlineSnapshot(` + Object { + "hello": "from the other side", + } + `) + }) + + it('wraps page', async () => { + const evt = new CoreContext({ + type: 'page', + }) + + expect((await xt.page!(evt)).event.context).toMatchInlineSnapshot(` + Object { + "hello": "from the other side", + } + `) + }) + + it('wraps group', async () => { + const evt = new CoreContext({ + type: 'group', + }) + + expect((await xt.group!(evt)).event.context).toMatchInlineSnapshot(` + Object { + "hello": "from the other side", + } + `) + }) + + it('wraps alias', async () => { + const evt = new CoreContext({ + type: 'alias', + }) + + expect((await xt.alias!(evt)).event.context).toMatchInlineSnapshot(` + Object { + "hello": "from the other side", + } + `) + }) + }) +}) diff --git a/packages/core-integration-tests/tsconfig.json b/packages/core-integration-tests/tsconfig.json new file mode 100644 index 000000000..9936980ac --- /dev/null +++ b/packages/core-integration-tests/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../tsconfig.json", + "exclude": ["node_modules", "dist"], + "compilerOptions": { + "module": "esnext", + "target": "ES5", + "moduleResolution": "node", + "lib": ["es2020"] + } +} diff --git a/packages/core/.lintstagedrc.js b/packages/core/.lintstagedrc.js index 4374de952..bc1f1c780 100644 --- a/packages/core/.lintstagedrc.js +++ b/packages/core/.lintstagedrc.js @@ -1,4 +1 @@ -module.exports = { - '*.{js,ts}': ['eslint --fix'], - '*.json*': ['prettier --write'], -} +module.exports = require("@internal/config").lintStagedConfig diff --git a/packages/core/jest.config.js b/packages/core/jest.config.js index b5b90775c..6662f48b4 100644 --- a/packages/core/jest.config.js +++ b/packages/core/jest.config.js @@ -1,14 +1,5 @@ -module.exports = { - preset: 'ts-jest', - modulePathIgnorePatterns: [ - '/dist/', - ], - testEnvironment: 'node', - testMatch: ["**/?(*.)+(test).[jt]s?(x)"], - clearMocks: true, - globals: { - 'ts-jest': { - isolatedModules: true, - }, - }, -} +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig({ + projects: ['', '/../core-integration-tests'], +}) diff --git a/packages/core/jest.setup.js b/packages/core/jest.setup.js new file mode 100644 index 000000000..600a1cc54 --- /dev/null +++ b/packages/core/jest.setup.js @@ -0,0 +1,10 @@ +const { TextEncoder, TextDecoder } = require('util') +const { setImmediate } = require('timers') + +// fix: "ReferenceError: TextEncoder is not defined" after upgrading JSDOM +global.TextEncoder = TextEncoder +global.TextDecoder = TextDecoder +// fix: jsdom uses setImmediate under the hood for preflight XHR requests, +// and jest removed setImmediate, so we need to provide it to prevent console +// logging ReferenceErrors made by integration tests that call Amplitude. +global.setImmediate = setImmediate diff --git a/packages/core/package.json b/packages/core/package.json index 8ee7975a4..afc5b3ace 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -13,16 +13,17 @@ "files": [ "dist/", "src/", - "!**/__tests__/**" + "!**/__tests__/**", + "!*.tsbuildinfo" ], "sideEffects": false, "scripts": { "test": "yarn jest", "lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'", - "build": "rm -rf dist && yarn concurrently 'yarn:build:*'", + "build": "yarn concurrently 'yarn:build:*'", "build:esm": "yarn tsc -p tsconfig.build.json", "build:cjs": "yarn tsc -p tsconfig.build.json --outDir ./dist/cjs --module commonjs", - "watch": "yarn concurrently 'yarn:build:cjs --watch' 'yarn:build:esm --watch'", + "watch": "yarn build:esm --watch --incremental", "watch:test": "yarn test --watch", "tsc": "yarn run -T tsc", "eslint": "yarn run -T eslint", @@ -31,6 +32,8 @@ }, "packageManager": "yarn@3.2.1", "dependencies": { + "@lukeed/uuid": "^2.0.0", + "dset": "^3.1.2", "tslib": "^2.4.0" } } diff --git a/packages/core/src/analytics/dispatch.ts b/packages/core/src/analytics/dispatch.ts new file mode 100644 index 000000000..3645d02af --- /dev/null +++ b/packages/core/src/analytics/dispatch.ts @@ -0,0 +1,59 @@ +import { CoreContext } from '../context' +import { CoreSegmentEvent, Callback } from '../events/interfaces' +import { EventQueue } from '../queue/event-queue' +import { isOffline } from '../connection' +import { Emitter } from '../emitter' +import { invokeCallback } from '../callback' + +type DispatchOptions = { + timeout?: number + debug?: boolean + callback?: Callback + retryQueue?: boolean +} + +/** + * Push an event into the dispatch queue and invoke any callbacks. + * + * @param event - Segment event to enqueue. + * @param queue - Queue to dispatch against. + * @param emitter - This is typically an instance of "Analytics" -- used for metrics / progress information. + * @param options + */ +export async function dispatch( + event: CoreSegmentEvent, + queue: EventQueue, + emitter: Emitter, + options?: DispatchOptions +): Promise { + const ctx = new CoreContext(event) + emitter.emit('message_dispatch_pending', ctx) // TODO: inspectorHost.triggered?.(ctx as any) + + if (isOffline() && !options?.retryQueue) { + return ctx + } + + const startTime = Date.now() + let dispatched: CoreContext + if (queue.isEmpty()) { + dispatched = await queue.dispatchSingle(ctx) + } else { + dispatched = await queue.dispatch(ctx) + } + const elapsedTime = Date.now() - startTime + const timeoutInMs = options?.timeout + + if (options?.callback) { + dispatched = await invokeCallback( + dispatched, + options.callback, + Math.max((timeoutInMs ?? 300) - elapsedTime, 0), + timeoutInMs + ) + } + if (options?.debug) { + dispatched.flush() + } + + return dispatched +} diff --git a/packages/core/src/analytics/index.ts b/packages/core/src/analytics/index.ts new file mode 100644 index 000000000..75cbf89d4 --- /dev/null +++ b/packages/core/src/analytics/index.ts @@ -0,0 +1,13 @@ +import { CoreContext } from '../context' + +export interface CoreAnalytics { + track(...args: unknown[]): Promise + page(...args: unknown[]): Promise + identify(...args: unknown[]): Promise + group(...args: unknown[]): Promise + alias(...args: unknown[]): Promise + screen(...args: unknown[]): Promise + register(...plugins: unknown[]): Promise + deregister(...plugins: unknown[]): Promise + readonly VERSION: string +} diff --git a/packages/core/src/arguments-resolver/__tests__/page.test.ts b/packages/core/src/arguments-resolver/__tests__/page.test.ts new file mode 100644 index 000000000..0ddd87abb --- /dev/null +++ b/packages/core/src/arguments-resolver/__tests__/page.test.ts @@ -0,0 +1,166 @@ +import { resolvePageArguments } from '../page' + +const bananaPhone = { + banana: 'phone', +} + +const baseOptions = { + integrations: { + amplitude: false, + }, +} + +describe(resolvePageArguments, () => { + test('should accept (category, name, properties, options, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + 'category', + 'name', + bananaPhone, + baseOptions, + fn + ) + + expect(category).toEqual('category') + expect(name).toEqual('name') + expect(properties).toEqual(bananaPhone) + expect(options).toEqual(baseOptions) + expect(cb).toEqual(fn) + }) + + test('empty strings ("", "", "", { integrations })', () => { + const [category, name, properties, options] = resolvePageArguments( + '', + '', + null, + { + integrations: { + Amplitude: { + sessionId: '123', + }, + }, + } + ) + + expect(category).toEqual('') + expect(name).toEqual('') + expect(properties).toEqual({}) + expect(options).toEqual({ + integrations: { + Amplitude: { + sessionId: '123', + }, + }, + }) + }) + + test('should accept (category, name, properties, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + 'category', + 'name', + bananaPhone, + fn + ) + + expect(category).toEqual('category') + expect(name).toEqual('name') + expect(properties).toEqual(bananaPhone) + expect(cb).toEqual(fn) + + expect(options).toEqual({}) + }) + + it('should accept (category, name, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + 'category', + 'name', + fn + ) + + expect(category).toEqual('category') + expect(name).toEqual('name') + expect(properties).toEqual({}) + expect(cb).toEqual(fn) + + expect(options).toEqual({}) + }) + + it('should accept (name, properties, options, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + 'name', + bananaPhone, + baseOptions, + fn + ) + + expect(category).toEqual(null) + expect(name).toEqual('name') + expect(properties).toEqual(bananaPhone) + expect(options).toEqual(baseOptions) + expect(cb).toEqual(fn) + }) + + it('should accept (name, properties, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + 'name', + bananaPhone, + fn + ) + + expect(category).toEqual(null) + expect(name).toEqual('name') + expect(properties).toEqual(bananaPhone) + expect(cb).toEqual(fn) + expect(options).toEqual({}) + }) + + it('should accept (name, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + 'name', + fn + ) + + expect(name).toEqual('name') + expect(cb).toEqual(fn) + + expect(category).toEqual(null) + expect(properties).toEqual({}) + expect(options).toEqual({}) + }) + + it('should accept (properties, options, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + bananaPhone, + baseOptions, + fn + ) + + expect(cb).toEqual(fn) + expect(properties).toEqual(bananaPhone) + expect(options).toEqual(baseOptions) + + expect(name).toEqual(null) + expect(category).toEqual(null) + }) + + it('should accept (properties, callback)', () => { + const fn = jest.fn() + const [category, name, properties, options, cb] = resolvePageArguments( + bananaPhone, + fn + ) + + expect(properties).toEqual(bananaPhone) + expect(cb).toEqual(fn) + + expect(options).toEqual({}) + expect(name).toEqual(null) + expect(category).toEqual(null) + }) +}) diff --git a/packages/core/src/arguments-resolver/index.ts b/packages/core/src/arguments-resolver/index.ts new file mode 100644 index 000000000..feed53c7e --- /dev/null +++ b/packages/core/src/arguments-resolver/index.ts @@ -0,0 +1 @@ +export * from './page' diff --git a/packages/core/src/arguments-resolver/page.ts b/packages/core/src/arguments-resolver/page.ts new file mode 100644 index 000000000..714008e5a --- /dev/null +++ b/packages/core/src/arguments-resolver/page.ts @@ -0,0 +1,57 @@ +import { isFunction, isPlainObject, isString } from '../validation' +import { JSONObject, EventProperties, CoreOptions } from '../events' +import { Callback } from '../events/interfaces' + +/** + * Helper for page, screen methods + */ +export function resolvePageArguments( + category?: string | object, + name?: string | object | Callback, + properties?: EventProperties | CoreOptions | Callback | null, + options?: CoreOptions | Callback, + callback?: Callback +): [ + string | null, + string | null, + EventProperties, + CoreOptions, + Callback | undefined +] { + let resolvedCategory: string | undefined | null = null + let resolvedName: string | undefined | null = null + const args = [category, name, properties, options, callback] + + const strings = args.filter(isString) + if (strings[0] !== undefined && strings[1] !== undefined) { + resolvedCategory = strings[0] + resolvedName = strings[1] + } + + if (strings.length === 1) { + resolvedCategory = null + resolvedName = strings[0] + } + + const resolvedCallback = args.find(isFunction) as Callback | undefined + + const objects = args.filter((obj) => { + if (resolvedName === null) { + return isPlainObject(obj) + } + return isPlainObject(obj) || obj === null + }) as Array + + const resolvedProperties = (objects[0] ?? {}) as EventProperties + const resolvedOptions = (objects[1] ?? {}) as CoreOptions + + return [ + resolvedCategory, + resolvedName, + resolvedProperties, + resolvedOptions, + resolvedCallback, + ] +} + +export type PageParams = Parameters diff --git a/packages/core/src/callback/__tests__/index.test.ts b/packages/core/src/callback/__tests__/index.test.ts new file mode 100644 index 000000000..dc6d562d8 --- /dev/null +++ b/packages/core/src/callback/__tests__/index.test.ts @@ -0,0 +1,85 @@ +import { invokeCallback } from '..' +import { CoreContext } from '../../context' + +describe(invokeCallback, () => { + afterEach(() => { + jest.useRealTimers() + }) + + it('invokes a callback asynchronously', async () => { + const ctx = new CoreContext({ + type: 'track', + }) + + const fn = jest.fn() + const returned = await invokeCallback(ctx, fn, 0) + + expect(fn).toHaveBeenCalledWith(ctx) + expect(returned).toBe(ctx) + }) + + // Fixes GitHub issue: https://github.com/segmentio/analytics-next/issues/409 + // A.JS classic waited for the timeout before invoking callback, + // so keep same behavior in A.JS next. + it('calls the callback after a timeout', async () => { + const ctx = new CoreContext({ + type: 'track', + }) + + const fn = jest.fn() + const timeout = 100 + + const startTime = Date.now() + const returned = await invokeCallback(ctx, fn, timeout) + const endTime = Date.now() + + expect(fn).toHaveBeenCalled() + expect(endTime - startTime).toBeGreaterThanOrEqual(timeout - 1) + expect(returned).toBe(ctx) + }) + + it('ignores the callback after a timeout', async () => { + const ctx = new CoreContext({ + type: 'track', + }) + + const slow = (_ctx: CoreContext): Promise => { + return new Promise((resolve) => { + setTimeout(resolve, 200) + }) + } + + const returned = await invokeCallback(ctx, slow, 0, 50) + expect(returned).toBe(ctx) + + const logs = returned.logs() + expect(logs[0].extras).toMatchInlineSnapshot(` + Object { + "error": [Error: Promise timed out], + } + `) + + expect(logs[0].level).toEqual('warn') + }) + + it('does not crash if the callback crashes', async () => { + const ctx = new CoreContext({ + type: 'track', + }) + + const boo = (_ctx: CoreContext): Promise => { + throw new Error('👻 boo!') + } + + const returned = await invokeCallback(ctx, boo, 0) + expect(returned).toBe(ctx) + + const logs = returned.logs() + expect(logs[0].extras).toMatchInlineSnapshot(` + Object { + "error": [Error: 👻 boo!], + } + `) + expect(logs[0].level).toEqual('warn') + }) +}) diff --git a/packages/core/src/callback/index.ts b/packages/core/src/callback/index.ts new file mode 100644 index 000000000..cdad20627 --- /dev/null +++ b/packages/core/src/callback/index.ts @@ -0,0 +1,52 @@ +import { CoreContext } from '../context' +import type { Callback } from '../events' + +export function pTimeout( + cb: Promise, + timeout: number +): Promise { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(Error('Promise timed out')) + }, timeout) + + cb.then((val) => { + clearTimeout(timeoutId) + return resolve(val) + }).catch(reject) + }) +} + +function sleep(timeoutInMs: number): Promise { + return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) +} + +/** + * @param delayTimeout - The amount of time in ms to wait before invoking the callback. + * @param timeout - The maximum amount of time in ms to allow the callback to run for. + */ +export function invokeCallback( + ctx: CoreContext, + callback: Callback, + delayTimeout: number, + timeout?: number +): Promise { + const cb = () => { + try { + return Promise.resolve(callback(ctx)) + } catch (err) { + return Promise.reject(err) + } + } + + return ( + sleep(delayTimeout) + // pTimeout ensures that the callback can't cause the context to hang + .then(() => pTimeout(cb(), timeout ?? 1000)) + .catch((err) => { + ctx?.log('warn', 'Callback Error', { error: err }) + ctx?.stats?.increment('callback_error') + }) + .then(() => ctx) + ) +} diff --git a/packages/core/src/connection/__tests__/index.test.ts b/packages/core/src/connection/__tests__/index.test.ts new file mode 100644 index 000000000..8f988a4e0 --- /dev/null +++ b/packages/core/src/connection/__tests__/index.test.ts @@ -0,0 +1,32 @@ +/** + * @jest-environment jsdom + */ + +import { isOffline, isOnline } from '..' + +describe('connection', () => { + let online = true + + beforeEach(() => { + Object.defineProperty(window.navigator, 'onLine', { + configurable: true, + get() { + return online + }, + }) + }) + + test('checks that the browser is online', () => { + online = true + + expect(isOnline()).toBe(true) + expect(isOffline()).toBe(false) + }) + + test('checks that the browser is offline', () => { + online = false + + expect(isOnline()).toBe(false) + expect(isOffline()).toBe(true) + }) +}) diff --git a/packages/core/src/connection/index.ts b/packages/core/src/connection/index.ts new file mode 100644 index 000000000..dac05aca1 --- /dev/null +++ b/packages/core/src/connection/index.ts @@ -0,0 +1,13 @@ +import { isBrowser } from '../utils/environment' + +export function isOnline(): boolean { + if (isBrowser()) { + return window.navigator.onLine + } + + return true +} + +export function isOffline(): boolean { + return !isOnline() +} diff --git a/packages/core/src/context/index.ts b/packages/core/src/context/index.ts new file mode 100644 index 000000000..3834da010 --- /dev/null +++ b/packages/core/src/context/index.ts @@ -0,0 +1,121 @@ +import { CoreSegmentEvent } from '../events/interfaces' + +import { v4 as uuid } from '@lukeed/uuid' +import { dset } from 'dset' +import { CoreLogger, LogLevel, LogMessage } from '../logger' +import Stats, { Metric } from '../stats' + +export interface SerializedContext { + id: string + event: CoreSegmentEvent + logs: LogMessage[] + metrics?: Metric[] +} + +export interface ContextFailedDelivery { + reason: unknown +} + +export interface CancelationOptions { + retry?: boolean + reason?: string + type?: string +} + +export class ContextCancelation { + retry: boolean + type: string + reason?: string + + constructor(options: CancelationOptions) { + this.retry = options.retry ?? true + this.type = options.type ?? 'plugin Error' + this.reason = options.reason ?? '' + } +} + +export class CoreContext { + event: Event + logger: CoreLogger + stats?: Stats + attempts = 0 + + private _failedDelivery?: ContextFailedDelivery + private _id: string + + constructor( + event: Event, + id = uuid(), + stats?: Stats, + logger: CoreLogger = new CoreLogger() + ) { + this.event = event + this._id = id + this.logger = logger + this.stats = stats + } + + static system(): CoreContext { + return new CoreContext({ type: 'track', event: 'system' }) + } + + isSame(other: CoreContext): boolean { + return other.id === this.id + } + + cancel = (error?: Error | ContextCancelation): never => { + if (error) { + throw error + } + + throw new ContextCancelation({ reason: 'Context Cancel' }) + } + + log(level: LogLevel, message: string, extras?: object): void { + this.logger.log(level, message, extras) + } + + get id(): string { + return this._id + } + + updateEvent(path: string, val: unknown): Event { + // Don't allow integrations that are set to false to be overwritten with integration settings. + if (path.split('.')[0] === 'integrations') { + const integrationName = path.split('.')[1] + + if (this.event.integrations?.[integrationName] === false) { + return this.event + } + } + + dset(this.event, path, val) + return this.event + } + + failedDelivery(): ContextFailedDelivery | undefined { + return this._failedDelivery + } + + setFailedDelivery(options: ContextFailedDelivery) { + this._failedDelivery = options + } + + logs(): LogMessage[] { + return this.logger.logs + } + + flush(): void { + this.logger.flush() + this.stats?.flush() + } + + toJSON(): SerializedContext { + return { + id: this._id, + event: this.event, + logs: this.logger.logs, + metrics: this.stats?.metrics, + } + } +} diff --git a/packages/core/src/emitter/index.ts b/packages/core/src/emitter/index.ts index 377285b51..3dcb8361e 100644 --- a/packages/core/src/emitter/index.ts +++ b/packages/core/src/emitter/index.ts @@ -1,12 +1,12 @@ -export class Emitter { - private callbacks: Record = {} +export class Emitter { + private callbacks: Partial> = {} - on(event: string, callback: Function): this { + on(event: EventName, callback: Function): this { this.callbacks[event] = [...(this.callbacks[event] ?? []), callback] return this } - once(event: string, fn: Function): this { + once(event: EventName, fn: Function): this { const on = (...args: unknown[]): void => { this.off(event, on) fn.apply(this, args) @@ -16,14 +16,14 @@ export class Emitter { return this } - off(event: string, callback: Function): this { + off(event: EventName, callback: Function): this { const fns = this.callbacks[event] ?? [] const without = fns.filter((fn) => fn !== callback) this.callbacks[event] = without return this } - emit(event: string, ...args: unknown[]): this { + emit(event: EventName, ...args: unknown[]): this { const callbacks = this.callbacks[event] ?? [] callbacks.forEach((callback) => { callback.apply(this, args) diff --git a/packages/core/src/events/index.ts b/packages/core/src/events/index.ts new file mode 100644 index 000000000..c85ef4d01 --- /dev/null +++ b/packages/core/src/events/index.ts @@ -0,0 +1,251 @@ +export * from './interfaces' +import { v4 as uuid } from '@lukeed/uuid' +import { dset } from 'dset' +import { ID, User } from '../user' +import { + Integrations, + EventProperties, + Traits, + CoreSegmentEvent, + CoreOptions, +} from './interfaces' +import md5 from 'spark-md5' + +export class EventFactory { + user?: User + + constructor(user?: User) { + this.user = user + } + + track( + event: string, + properties?: EventProperties, + options?: CoreOptions, + globalIntegrations?: Integrations + ) { + return this.normalize({ + ...this.baseEvent(), + event, + type: 'track', + properties, + options: { ...options }, + integrations: { ...globalIntegrations }, + }) + } + + page( + category: string | null, + page: string | null, + properties?: EventProperties, + options?: CoreOptions, + globalIntegrations?: Integrations + ): CoreSegmentEvent { + const event: CoreSegmentEvent = { + type: 'page', + properties: { ...properties }, + options: { ...options }, + integrations: { ...globalIntegrations }, + } + + if (category !== null) { + event.category = category + event.properties = event.properties ?? {} + event.properties.category = category + } + + if (page !== null) { + event.name = page + } + + return this.normalize({ + ...this.baseEvent(), + ...event, + }) + } + + screen( + category: string | null, + screen: string | null, + properties?: EventProperties, + options?: CoreOptions, + globalIntegrations?: Integrations + ): CoreSegmentEvent { + const event: CoreSegmentEvent = { + type: 'screen', + properties: { ...properties }, + options: { ...options }, + integrations: { ...globalIntegrations }, + } + + if (category !== null) { + event.category = category + } + + if (screen !== null) { + event.name = screen + } + + return this.normalize({ + ...this.baseEvent(), + ...event, + }) + } + + identify( + userId: ID, + traits?: Traits, + options?: CoreOptions, + globalIntegrations?: Integrations + ): CoreSegmentEvent { + return this.normalize({ + ...this.baseEvent(), + type: 'identify', + userId, + traits, + options, + integrations: globalIntegrations, + }) + } + + group( + groupId: ID, + traits?: Traits, + options?: CoreOptions, + globalIntegrations?: Integrations + ): CoreSegmentEvent { + return this.normalize({ + ...this.baseEvent(), + type: 'group', + traits, + options: { ...options }, + integrations: { ...globalIntegrations }, + groupId, + }) + } + + alias( + to: string, + from: string | null, + options?: CoreOptions, + globalIntegrations?: Integrations + ): CoreSegmentEvent { + const base: CoreSegmentEvent = { + userId: to, + type: 'alias', + options: { ...options }, + integrations: { ...globalIntegrations }, + } + + if (from !== null) { + base.previousId = from + } + + if (to === undefined) { + return this.normalize({ + ...base, + ...this.baseEvent(), + }) + } + + return this.normalize({ + ...this.baseEvent(), + ...base, + }) + } + + private baseEvent(): Partial { + const base: Partial = { + integrations: {}, + options: {}, + } + + if (!this.user) return base + + const user = this.user + + if (user.id()) { + base.userId = user.id() + } + + if (user.anonymousId()) { + base.anonymousId = user.anonymousId() + } + + return base + } + + /** + * Builds the context part of an event based on "foreign" keys that + * are provided in the `Options` parameter for an Event + */ + private context(event: CoreSegmentEvent): [object, object] { + const optionsKeys = ['integrations', 'anonymousId', 'timestamp', 'userId'] + + const options = event.options ?? {} + delete options['integrations'] + + const providedOptionsKeys = Object.keys(options) + + const context = event.options?.context ?? {} + const overrides = {} + + providedOptionsKeys.forEach((key) => { + if (key === 'context') { + return + } + + if (optionsKeys.includes(key)) { + dset(overrides, key, options[key]) + } else { + dset(context, key, options[key]) + } + }) + + return [context, overrides] + } + + public normalize(event: CoreSegmentEvent): CoreSegmentEvent { + const integrationBooleans = Object.keys(event.integrations ?? {}).reduce( + (integrationNames, name) => { + return { + ...integrationNames, + [name]: Boolean(event.integrations?.[name]), + } + }, + {} as Record + ) + + // This is pretty trippy, but here's what's going on: + // - a) We don't pass initial integration options as part of the event, only if they're true or false + // - b) We do accept per integration overrides (like integrations.Amplitude.sessionId) at the event level + // Hence the need to convert base integration options to booleans, but maintain per event integration overrides + const allIntegrations = { + // Base config integrations object as booleans + ...integrationBooleans, + + // Per event overrides, for things like amplitude sessionId, for example + ...event.options?.integrations, + } + + const [context, overrides] = this.context(event) + const { options, ...rest } = event + + const body = { + timestamp: new Date(), + ...rest, + context, + integrations: allIntegrations, + ...overrides, + } + + const messageId = 'ajs-next-' + md5.hash(JSON.stringify(body) + uuid()) + + const evt: CoreSegmentEvent = { + ...body, + messageId, + } + + return evt + } +} diff --git a/packages/core/src/events/interfaces.ts b/packages/core/src/events/interfaces.ts new file mode 100644 index 000000000..5f25ee1b7 --- /dev/null +++ b/packages/core/src/events/interfaces.ts @@ -0,0 +1,210 @@ +import { CoreContext } from '../context' +import { ID } from '../user' + +export type Callback = (ctx: CoreContext) => Promise | unknown + +export type SegmentEventType = + | 'track' + | 'page' + | 'identify' + | 'group' + | 'alias' + | 'screen' + +export type JSONPrimitive = string | number | boolean | null +export type JSONValue = JSONPrimitive | JSONObject | JSONArray +export type JSONObject = { [member: string]: JSONValue } +export type JSONArray = JSONValue[] + +export type Traits = Record + +export type EventProperties = Record + +export type Integrations = { + All?: boolean + [integration: string]: boolean | JSONObject | undefined +} + +// renamed +export type CoreOptions = { + integrations?: Integrations + timestamp?: Date | string + context?: CoreAnalyticsContext + traits?: Traits + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [key: string]: any +} + +interface CoreAnalyticsContext { + page?: { + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L151} + */ + path?: string + referrer?: string + search?: string + title?: string + url?: string + } + + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L285} + */ + userAgent?: string + + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L286-L289} + */ + locale?: string + + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L290-L291} + */ + library?: { + name: string + version: string + } + + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L292-L301} + */ + traits?: { + crossDomainId?: string + } + + /** + * utm params + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L303-L305} + * {@link https://github.com/segmentio/utm-params/blob/master/lib/index.js#L49} + */ + campaign?: { + /** + * This can also come from the "utm_campaign" param + * + * {@link https://github.com/segmentio/utm-params/blob/master/lib/index.js#L40} + */ + name: string + term: string + source: string + medium: string + content: string + } + + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L415} + */ + referrer?: { + btid?: string + urid?: string + } + + /** + * {@link https://github.com/segmentio/analytics.js-integrations/blob/2d5c637c022d2661c23449aed237d0d546bf062d/integrations/segmentio/lib/index.js#L322} + */ + amp?: { + id: string + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [key: string]: any +} + +export interface CoreSegmentEvent { + messageId?: string + type: SegmentEventType + + // page specific + category?: string + name?: string + + properties?: EventProperties + + traits?: Traits + + integrations?: Integrations + context?: CoreAnalyticsContext | CoreOptions + options?: CoreOptions + + userId?: ID + anonymousId?: ID + groupId?: ID + previousId?: ID + + event?: string + + writeKey?: string + + sentAt?: Date + + _metadata?: SegmentEventMetadata + + timestamp?: Date | string +} + +export interface SegmentEventMetadata { + failedInitializations?: unknown[] + bundled?: string[] + unbundled?: string[] + nodeVersion?: string + bundledConfigIds?: string[] + unbundledConfigIds?: string[] + bundledIds?: string[] +} + +export type SegmentEventTimestamp = Date | string + +export interface Plan { + track?: TrackPlan +} + +export interface TrackPlan { + [key: string]: PlanEvent | undefined + // __default SHOULD always exist, but marking as optional for extra safety. + __default?: PlanEvent +} + +interface PlanEvent { + /** + * Whether or not this plan event is enabled + */ + enabled: boolean + /** + * Which integrations the plan event applies to + */ + integrations: { + [key: string]: boolean + } +} + +export interface ReservedTraits { + address: Partial<{ + city: string + country: string + postalCode: string + state: string + street: string + }> + age: number + avatar: string + birthday: Date + company: Partial<{ + name: string + id: string | number + industry: string + employee_count: number + }> + plan: string + createdAt: Date + description: string + email: string + firstName: string + gender: string + id: string + lastName: string + name: string + phone: string + title: string + username: string + website: string +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 7af4f69eb..7b134220e 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1 +1,15 @@ -export { Emitter } from './emitter' +export * from './emitter' +export * from './plugins' +export * from './plugins/middleware' +export * from './events/interfaces' +export * from './events' +export * from './priority-queue' +export * from './context' +export * from './queue/event-queue' +export * from './analytics' +export * from './analytics/dispatch' +export * from './arguments-resolver' +export * from './validation' +export * from './utils/to-facade' +export * from './utils/bind-all' +export * from './stats' diff --git a/packages/core/src/logger/__tests__/index.test.ts b/packages/core/src/logger/__tests__/index.test.ts new file mode 100644 index 000000000..079c84578 --- /dev/null +++ b/packages/core/src/logger/__tests__/index.test.ts @@ -0,0 +1,66 @@ +import { CoreLogger } from '..' + +describe(CoreLogger, () => { + let logger: CoreLogger + + beforeEach(() => { + logger = new CoreLogger() + }) + + it('logs events at different levels', () => { + logger.log('debug', 'Debugging', { test: 'debug', emoji: '🐛' }) + logger.log('info', 'Info', { test: 'info', emoji: '📰' }) + logger.log('warn', 'Warning', { test: 'warn', emoji: '⚠️' }) + logger.log('error', 'Error', { test: 'error', emoji: '💥' }) + + expect(logger.logs).toEqual([ + { + extras: { + emoji: '🐛', + test: 'debug', + }, + level: 'debug', + message: 'Debugging', + time: expect.any(Date), + }, + { + extras: { + emoji: '📰', + test: 'info', + }, + level: 'info', + message: 'Info', + time: expect.any(Date), + }, + { + extras: { + emoji: '⚠️', + test: 'warn', + }, + level: 'warn', + message: 'Warning', + time: expect.any(Date), + }, + { + extras: { + emoji: '💥', + test: 'error', + }, + level: 'error', + message: 'Error', + time: expect.any(Date), + }, + ]) + }) + + it('flushes logs to the console', () => { + jest.spyOn(console, 'table').mockImplementationOnce(() => {}) + + logger.log('info', 'my log') + logger.log('debug', 'my log') + + logger.flush() + expect(console.table).toHaveBeenCalled() + expect(logger.logs).toEqual([]) + }) +}) diff --git a/packages/core/src/logger/index.ts b/packages/core/src/logger/index.ts new file mode 100644 index 000000000..fb1f6a33a --- /dev/null +++ b/packages/core/src/logger/index.ts @@ -0,0 +1,75 @@ +export type LogLevel = 'debug' | 'info' | 'warn' | 'error' +export type LogMessage = { + level: LogLevel + message: string + time?: Date + extras?: Record +} + +// interface is just for clarity +export interface Logger { + log(level: LogLevel, message: string, extras?: object): void + flush(): void + logs: LogMessage[] +} + +export class CoreLogger implements Logger { + private _logs: LogMessage[] = [] + + log = (level: LogLevel, message: string, extras?: object): void => { + const time = new Date() + this._logs.push({ + level, + message, + time, + extras, + }) + } + + public get logs(): LogMessage[] { + return this._logs + } + + public flush(): void { + if (this.logs.length > 1) { + const formatted = this._logs.reduce((logs, log) => { + const line = { + ...log, + json: JSON.stringify(log.extras, null, ' '), + extras: log.extras, + } + + delete line['time'] + + let key = log.time?.toISOString() ?? '' + if (logs[key]) { + key = `${key}-${Math.random()}` + } + + return { + ...logs, + [key]: line, + } + }, {} as Record) + + // ie doesn't like console.table + if (console.table) { + console.table(formatted) + } else { + console.log(formatted) + } + } else { + this.logs.forEach((logEntry) => { + const { level, message, extras } = logEntry + + if (level === 'info' || level === 'debug') { + console.log(message, extras ?? '') + } else { + console[level](message, extras ?? '') + } + }) + } + + this._logs = [] + } +} diff --git a/packages/core/src/plugins/index.ts b/packages/core/src/plugins/index.ts new file mode 100644 index 000000000..602c950e1 --- /dev/null +++ b/packages/core/src/plugins/index.ts @@ -0,0 +1,43 @@ +import { CoreAnalytics } from '../analytics' +import { CoreContext } from '../context' + +interface CorePluginConfig { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + options: any + priority: 'critical' | 'non-critical' // whether AJS should expect this plugin to be loaded before starting event delivery +} + +export type PluginType = + | 'before' + | 'after' + | 'destination' + | 'enrichment' + | 'utility' + +// enrichment - modifies the event. Enrichment can happen in parallel, by reducing all changes in the final event. Failures in this stage could halt event delivery. +// destination - runs in parallel at the end of the lifecycle. Cannot modify the event, can fail and not halt execution. +// utility - do not affect lifecycle. Should be run and executed once. Their `track/identify` calls don't really do anything. example + +export interface CorePlugin< + Analytics extends CoreAnalytics = CoreAnalytics, + Ctx extends CoreContext = CoreContext +> { + name: string + version: string + type: PluginType + isLoaded: () => boolean + load: ( + ctx: Ctx, + instance: Analytics, + config?: CorePluginConfig + ) => Promise + + unload?: (ctx: Ctx, instance: Analytics) => Promise | unknown + ready?: () => Promise + track?: (ctx: Ctx) => Promise | Ctx + identify?: (ctx: Ctx) => Promise | Ctx + page?: (ctx: Ctx) => Promise | Ctx + group?: (ctx: Ctx) => Promise | Ctx + alias?: (ctx: Ctx) => Promise | Ctx + screen?: (ctx: Ctx) => Promise | Ctx +} diff --git a/packages/core/src/plugins/middleware/index.ts b/packages/core/src/plugins/middleware/index.ts new file mode 100644 index 000000000..c9ca33f0c --- /dev/null +++ b/packages/core/src/plugins/middleware/index.ts @@ -0,0 +1,127 @@ +import { CoreContext, ContextCancelation } from '../../context' +import { CoreSegmentEvent as SegmentEvent } from '../../events' +import { CorePlugin } from '../' +import { SegmentFacade, toFacade } from '../../utils/to-facade' + +export interface MiddlewareParams { + payload: SegmentFacade + + integrations?: SegmentEvent['integrations'] + next: (payload: MiddlewareParams['payload'] | null) => void +} + +export interface DestinationMiddlewareParams { + payload: SegmentFacade + integration: string + next: (payload: MiddlewareParams['payload'] | null) => void +} + +export type MiddlewareFunction = (middleware: MiddlewareParams) => void +export type DestinationMiddlewareFunction = ( + middleware: DestinationMiddlewareParams +) => void + +export async function applyDestinationMiddleware( + destination: string, + evt: SegmentEvent, + middleware: DestinationMiddlewareFunction[] +): Promise { + async function applyMiddleware( + event: SegmentEvent, + fn: DestinationMiddlewareFunction + ): Promise { + let nextCalled = false + let returnedEvent: SegmentEvent | null = null + + await Promise.resolve( + fn({ + payload: toFacade(event, { + clone: true, + traverse: false, + }), + integration: destination, + next(evt) { + nextCalled = true + + if (evt === null) { + returnedEvent = null + } + + if (evt) { + returnedEvent = evt.obj + } + }, + }) + ) + + if (!nextCalled && returnedEvent !== null) { + returnedEvent = returnedEvent as SegmentEvent + returnedEvent.integrations = { + ...event.integrations, + [destination]: false, + } + } + + return returnedEvent + } + + for (const md of middleware) { + const result = await applyMiddleware(evt, md) + if (result === null) { + return null + } + evt = result + } + + return evt +} + +export function sourceMiddlewarePlugin( + fn: MiddlewareFunction, + integrations: SegmentEvent['integrations'] +): CorePlugin { + async function apply(ctx: CoreContext): Promise { + let nextCalled = false + + await Promise.resolve( + fn({ + payload: toFacade(ctx.event, { + clone: true, + traverse: false, + }), + integrations: integrations ?? {}, + next(evt) { + nextCalled = true + if (evt) { + ctx.event = evt.obj + } + }, + }) + ) + + if (!nextCalled) { + throw new ContextCancelation({ + retry: false, + type: 'middleware_cancellation', + reason: 'Middleware `next` function skipped', + }) + } + + return ctx + } + + return { + name: `Source Middleware ${fn.name}`, + type: 'before', + version: '0.1.0', + + isLoaded: (): boolean => true, + load: (ctx) => Promise.resolve(ctx), + + track: apply, + page: apply, + identify: apply, + alias: apply, + group: apply, + } +} diff --git a/packages/core/src/priority-queue/__tests__/backoff.test.ts b/packages/core/src/priority-queue/__tests__/backoff.test.ts new file mode 100644 index 000000000..3c6beac2f --- /dev/null +++ b/packages/core/src/priority-queue/__tests__/backoff.test.ts @@ -0,0 +1,23 @@ +import { backoff } from '../backoff' + +describe('backoff', () => { + it('increases with the number of attempts', () => { + expect(backoff({ attempt: 1 })).toBeGreaterThan(1000) + expect(backoff({ attempt: 2 })).toBeGreaterThan(2000) + expect(backoff({ attempt: 3 })).toBeGreaterThan(3000) + expect(backoff({ attempt: 4 })).toBeGreaterThan(4000) + }) + + it('accepts a max timeout', () => { + expect(backoff({ attempt: 1, maxTimeout: 3000 })).toBeGreaterThan(1000) + expect(backoff({ attempt: 3, maxTimeout: 3000 })).toBeLessThanOrEqual(3000) + expect(backoff({ attempt: 4, maxTimeout: 3000 })).toBeLessThanOrEqual(3000) + }) + + it('accepts a growth factor', () => { + const f2 = backoff({ attempt: 2, factor: 2 }) + const f3 = backoff({ attempt: 2, factor: 3 }) + + expect(f3).toBeGreaterThan(f2) + }) +}) diff --git a/packages/core/src/priority-queue/__tests__/index.test.ts b/packages/core/src/priority-queue/__tests__/index.test.ts new file mode 100644 index 000000000..ccf327664 --- /dev/null +++ b/packages/core/src/priority-queue/__tests__/index.test.ts @@ -0,0 +1,158 @@ +import { PriorityQueue } from '..' + +type Item = { + id: string +} + +describe('RetryQueue', () => { + it('accepts new work', () => { + const queue = new PriorityQueue(10, []) + queue.push({ id: 'abc' }, { id: 'cde' }) + expect(queue.length).toBe(2) + }) + + it('pops items off the queue', () => { + const queue = new PriorityQueue(10, []) + queue.push({ id: 'abc' }, { id: 'cde' }) + + expect(queue.pop()).toEqual({ id: 'abc' }) + expect(queue.length).toBe(1) + + expect(queue.pop()).toEqual({ id: 'cde' }) + expect(queue.length).toBe(0) + }) + + it('deprioritizes repeated items', () => { + const queue = new PriorityQueue(10, []) + queue.push({ id: 'abc' }) + queue.push({ id: 'abc' }) + + queue.push({ id: 'cde' }) + + // deprioritizes 'abc' because it was seen twice + expect(queue.pop()).toEqual({ id: 'cde' }) + }) + + it('deprioritizes repeated items even though they have been popped before', () => { + const queue = new PriorityQueue(10, []) + queue.push({ id: 'abc' }) + queue.pop() + + queue.push({ id: 'abc' }) + queue.push({ id: 'cde' }) + + // a queue does not forget + expect(queue.pop()).toEqual({ id: 'cde' }) + }) + + it('stops accepting an item after attempts have been exausted', () => { + const queue = new PriorityQueue(3, []) + queue.push({ id: 'abc' }) + expect(queue.length).toBe(1) + queue.pop() + + queue.push({ id: 'abc' }) + expect(queue.length).toBe(1) + queue.pop() + + queue.push({ id: 'abc' }) + expect(queue.length).toBe(1) + queue.pop() + + queue.push({ id: 'abc' }) + // does not accept it anymore + expect(queue.length).toBe(0) + }) +}) + +describe('backoffs', () => { + afterEach(() => { + jest.clearAllTimers() + }) + + it('accepts new work', () => { + const queue = new PriorityQueue(10, []) + + queue.pushWithBackoff({ id: 'abc' }) + queue.pushWithBackoff({ id: 'cde' }) + + expect(queue.length).toBe(2) + expect(queue.todo).toBe(2) + }) + + it('ignores when item has not been worked on', () => { + const queue = new PriorityQueue(10, []) + + expect(queue.pushWithBackoff({ id: 'abc' })).toBe(true) + expect(queue.pushWithBackoff({ id: 'abc' })).toBe(false) + expect(queue.length).toBe(1) + expect(queue.todo).toBe(1) + }) + + it('schedules as future work when item returns to the queue', () => { + const queue = new PriorityQueue(10, []) + + queue.pushWithBackoff({ id: 'abc' }) + queue.pop() + + // accepted work + expect(queue.pushWithBackoff({ id: 'abc' })).toBe(true) + + // not in the main queue yet + expect(queue.length).toBe(0) + + // present in future work + expect(queue.todo).toBe(1) + expect(queue.includes({ id: 'abc' })).toBe(true) + }) + + it('schedules as future work for later', () => { + jest.useFakeTimers() + const spy = jest.spyOn(global, 'setTimeout') + + const queue = new PriorityQueue(10, []) + + queue.pushWithBackoff({ id: 'abc' }) + expect(spy).not.toHaveBeenCalled() + + queue.pop() + + queue.pushWithBackoff({ id: 'abc' }) + expect(spy).toHaveBeenCalled() + + const delay = spy.mock.calls[0][1] + expect(delay).toBeGreaterThan(1000) + }) + + it('increases the delay as work gets requeued', () => { + jest.useFakeTimers() + const spy = jest.spyOn(global, 'setTimeout') + + const queue = new PriorityQueue(10, []) + + queue.pushWithBackoff({ id: 'abc' }) + jest.advanceTimersToNextTimer() + queue.pop() + + queue.pushWithBackoff({ id: 'abc' }) + jest.advanceTimersToNextTimer() + queue.pop() + + queue.pushWithBackoff({ id: 'abc' }) + jest.advanceTimersToNextTimer() + queue.pop() + + queue.pushWithBackoff({ id: 'abc' }) + jest.advanceTimersToNextTimer() + queue.pop() + + const firstDelay = spy.mock.calls[0][1] + expect(firstDelay).toBeGreaterThan(1000) + + const secondDelay = spy.mock.calls[1][1] + expect(secondDelay).toBeGreaterThan(2000) + + const thirdDelay = spy.mock.calls[2][1] + expect(thirdDelay).toBeGreaterThan(3000) + }) +}) diff --git a/packages/core/src/priority-queue/backoff.ts b/packages/core/src/priority-queue/backoff.ts new file mode 100644 index 000000000..5ef3e4552 --- /dev/null +++ b/packages/core/src/priority-queue/backoff.ts @@ -0,0 +1,24 @@ +type BackoffParams = { + /** The number of milliseconds before starting the first retry. Default is 500 */ + minTimeout?: number + + /** The maximum number of milliseconds between two retries. Default is Infinity */ + maxTimeout?: number + + /** The exponential factor to use. Default is 2. */ + factor?: number + + /** The current attempt */ + attempt: number +} + +export function backoff(params: BackoffParams): number { + const random = Math.random() + 1 + const { + minTimeout = 500, + factor = 2, + attempt, + maxTimeout = Infinity, + } = params + return Math.min(random * minTimeout * Math.pow(factor, attempt), maxTimeout) +} diff --git a/packages/core/src/priority-queue/index.ts b/packages/core/src/priority-queue/index.ts new file mode 100644 index 000000000..16c611af9 --- /dev/null +++ b/packages/core/src/priority-queue/index.ts @@ -0,0 +1,99 @@ +import { Emitter } from '../emitter' +import { backoff } from './backoff' + +/** + * @internal + */ +export const ON_REMOVE_FROM_FUTURE = 'onRemoveFromFuture' + +export interface WithID { + id: string +} + +export class PriorityQueue extends Emitter { + protected future: T[] = [] + protected queue: T[] + protected seen: Record + + public maxAttempts: number + + constructor(maxAttempts: number, queue: T[], seen?: Record) { + super() + this.maxAttempts = maxAttempts + this.queue = queue + this.seen = seen ?? {} + } + + push(...operations: T[]): boolean[] { + const accepted = operations.map((operation) => { + const attempts = this.updateAttempts(operation) + + if (attempts > this.maxAttempts || this.includes(operation)) { + return false + } + + this.queue.push(operation) + return true + }) + + this.queue = this.queue.sort( + (a, b) => this.getAttempts(a) - this.getAttempts(b) + ) + return accepted + } + + pushWithBackoff(operation: T): boolean { + if (this.getAttempts(operation) === 0) { + return this.push(operation)[0] + } + + const attempt = this.updateAttempts(operation) + + if (attempt > this.maxAttempts || this.includes(operation)) { + return false + } + + const timeout = backoff({ attempt: attempt - 1 }) + + setTimeout(() => { + this.queue.push(operation) + // remove from future list + this.future = this.future.filter((f) => f.id !== operation.id) + // Lets listeners know that a 'future' message is now available in the queue + this.emit(ON_REMOVE_FROM_FUTURE) + }, timeout) + + this.future.push(operation) + return true + } + + public getAttempts(operation: T): number { + return this.seen[operation.id] ?? 0 + } + + public updateAttempts(operation: T): number { + this.seen[operation.id] = this.getAttempts(operation) + 1 + return this.getAttempts(operation) + } + + includes(operation: T): boolean { + return ( + this.queue.includes(operation) || + this.future.includes(operation) || + Boolean(this.queue.find((i) => i.id === operation.id)) || + Boolean(this.future.find((i) => i.id === operation.id)) + ) + } + + pop(): T | undefined { + return this.queue.shift() + } + + public get length(): number { + return this.queue.length + } + + public get todo(): number { + return this.queue.length + this.future.length + } +} diff --git a/packages/core/src/priority-queue/persisted.ts b/packages/core/src/priority-queue/persisted.ts new file mode 100644 index 000000000..8aeeb2397 --- /dev/null +++ b/packages/core/src/priority-queue/persisted.ts @@ -0,0 +1,136 @@ +import { PriorityQueue } from '.' +import { CoreContext, SerializedContext } from '../context' + +const nullStorage = (): Storage => ({ + getItem: () => null, + setItem: () => null, + removeItem: () => null, + length: 0, + clear: () => null, + key: () => null, +}) + +function persisted(loc: Storage, key: string): CoreContext[] { + const items = loc.getItem(key) + return (items ? JSON.parse(items) : []).map( + (p: SerializedContext) => new CoreContext(p.event, p.id) + ) +} + +function persistItems(loc: Storage, key: string, items: CoreContext[]): void { + const existing = persisted(loc, key) + const all = [...items, ...existing] + + const merged = all.reduce((acc, item) => { + return { + ...acc, + [item.id]: item, + } + }, {} as Record) + + loc.setItem(key, JSON.stringify(Object.values(merged))) +} + +function seen(loc: Storage, key: string): Record { + const stored = loc.getItem(key) + return stored ? JSON.parse(stored) : {} +} + +function persistSeen( + loc: Storage, + key: string, + memory: Record +): void { + const stored = seen(loc, key) + + loc.setItem( + key, + JSON.stringify({ + ...stored, + ...memory, + }) + ) +} + +function remove(loc: Storage, key: string): void { + loc.removeItem(key) +} + +const now = (): number => new Date().getTime() + +function mutex( + loc: Storage, + key: string, + onUnlock: Function, + attempt = 0 +): void { + const lockTimeout = 50 + const lockKey = `persisted-queue:v1:${key}:lock` + + const expired = (lock: number): boolean => new Date().getTime() > lock + const rawLock = loc.getItem(lockKey) + const lock = rawLock ? (JSON.parse(rawLock) as number) : null + + const allowed = lock === null || expired(lock) + if (allowed) { + loc.setItem(lockKey, JSON.stringify(now() + lockTimeout)) + onUnlock() + loc.removeItem(lockKey) + return + } + + if (!allowed && attempt < 3) { + setTimeout(() => { + mutex(loc, key, onUnlock, attempt + 1) + }, lockTimeout) + } else { + console.error('Unable to retrieve lock') + } +} + +export class PersistedPriorityQueue extends PriorityQueue { + loc: Storage + constructor(maxAttempts: number, key: string) { + super(maxAttempts, []) + + if (typeof window === undefined) { + throw new Error('must be run in browser.') + } + + this.loc = window.localStorage ? window.localStorage : nullStorage() + + const itemsKey = `persisted-queue:v1:${key}:items` + const seenKey = `persisted-queue:v1:${key}:seen` + + let saved: CoreContext[] = [] + let lastSeen: Record = {} + + mutex(this.loc, key, () => { + try { + saved = persisted(this.loc, itemsKey) + lastSeen = seen(this.loc, seenKey) + remove(this.loc, itemsKey) + remove(this.loc, seenKey) + + this.queue = [...saved, ...this.queue] + this.seen = { ...lastSeen, ...this.seen } + } catch (err) { + console.error(err) + } + }) + + window.addEventListener('beforeunload', () => { + if (this.todo > 0) { + const items = [...this.queue, ...this.future] + try { + mutex(this.loc, key, () => { + persistItems(this.loc, itemsKey, items) + persistSeen(this.loc, seenKey, this.seen) + }) + } catch (err) { + console.error(err) + } + } + }) + } +} diff --git a/packages/core/src/queue/__tests__/event-queue.test.ts b/packages/core/src/queue/__tests__/event-queue.test.ts new file mode 100644 index 000000000..68d58c2f7 --- /dev/null +++ b/packages/core/src/queue/__tests__/event-queue.test.ts @@ -0,0 +1,766 @@ +/* eslint-disable @typescript-eslint/no-floating-promises */ +import { noop } from 'lodash' +import { CoreAnalytics } from '../../analytics' +import { pWhile } from '../../utils/p-while' +import * as timer from '../../priority-queue/backoff' +import { + MiddlewareFunction, + sourceMiddlewarePlugin, +} from '../../plugins/middleware' +import { CoreContext, ContextCancelation } from '../../context' +import { CorePlugin } from '../../plugins' +import { pTimeout } from '../../callback' +import { EventQueue as EQ } from '../event-queue' +import { PriorityQueue } from '../../priority-queue' + +class EventQueue extends EQ { + constructor() { + super(new PriorityQueue(4, [])) + } +} + +async function flushAll(eq: EventQueue): Promise { + const flushSpy = jest.spyOn(eq, 'flush') + await pWhile( + () => eq.queue.length > 0, + async () => { + return new Promise((r) => setTimeout(r, 0)) + } + ) + const results = flushSpy.mock.results.map((r) => r.value) + flushSpy.mockClear() + + const flushed = await Promise.all(results) + return flushed.reduce((prev, cur) => { + return prev.concat(cur) + }, []) +} + +const testPlugin: CorePlugin = { + name: 'test', + type: 'before', + version: '0.1.0', + load: () => Promise.resolve(), + isLoaded: () => true, +} + +const ajs = {} as CoreAnalytics + +let fruitBasket: CoreContext, basketView: CoreContext, shopper: CoreContext + +beforeEach(() => { + fruitBasket = new CoreContext({ + type: 'track', + event: 'Fruit Basket', + properties: { + banana: '🍌', + apple: '🍎', + grape: '🍇', + }, + }) + + basketView = new CoreContext({ + type: 'page', + }) + + shopper = new CoreContext({ + type: 'identify', + traits: { + name: 'Netto Farah', + }, + }) +}) + +test('can send events', async () => { + const eq = new EventQueue() + const evt = await eq.dispatch(fruitBasket) + expect(evt).toBe(fruitBasket) +}) + +test('delivers events out of band', async () => { + jest.useFakeTimers() + + const eq = new EventQueue() + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + eq.dispatch(fruitBasket) + + expect(jest.getTimerCount()).toBe(1) + expect(eq.queue.includes(fruitBasket)).toBe(true) + + // run timers and deliver events + jest.runAllTimers() + await eq.flush() + + expect(eq.queue.length).toBe(0) +}) + +test('does not enqueue multiple flushes at once', async () => { + jest.useFakeTimers() + + const eq = new EventQueue() + + const anothaOne = new CoreContext({ + type: 'page', + }) + + eq.dispatch(fruitBasket) + eq.dispatch(anothaOne) + + expect(jest.getTimerCount()).toBe(1) + expect(eq.queue.length).toBe(2) + + // Ensure already enqueued tasks are executed + jest.runAllTimers() + + // reset the world to use the real timers + jest.useRealTimers() + await flushAll(eq) + + expect(eq.queue.length).toBe(0) +}) + +describe('Flushing', () => { + beforeEach(() => { + jest.useRealTimers() + }) + + test('works until the queue is empty', async () => { + const eq = new EventQueue() + + eq.dispatch(fruitBasket) + eq.dispatch(basketView) + eq.dispatch(shopper) + + expect(eq.queue.length).toBe(3) + + const flushed = await flushAll(eq) + + expect(eq.queue.length).toBe(0) + expect(flushed).toEqual([fruitBasket, basketView, shopper]) + }) + + test('re-queues failed events', async () => { + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: (ctx) => { + if (ctx === fruitBasket) { + throw new Error('aaay') + } + + return Promise.resolve(ctx) + }, + }, + ajs + ) + + eq.dispatch(fruitBasket) + eq.dispatch(basketView) + eq.dispatch(shopper) + + expect(eq.queue.length).toBe(3) + + const flushed = await flushAll(eq) + + // flushed good events + expect(flushed).toEqual([basketView, shopper]) + + // attempted to deliver multiple times + expect(eq.queue.getAttempts(fruitBasket)).toEqual(2) + }) + + test('waits for critical tasks to finish before performing event deliveries', async () => { + jest.useRealTimers() + + const eq = new EventQueue() + + let finishCriticalTask: () => void = noop + const startTask = () => + new Promise((res) => (finishCriticalTask = res)) + + // some preceding events that've been scheduled + const p1 = eq.dispatch(fruitBasket) + const p2 = eq.dispatch(basketView) + // a critical task has been kicked off + eq.criticalTasks.run(startTask) + // a succeeding event + const p3 = eq.dispatch(shopper) + + // even after a good amount of time, none of the events should be delivered + await expect(pTimeout(Promise.race([p1, p2, p3]), 1000)).rejects.toThrow() + + // give the green light + finishCriticalTask() + + // now that the task is complete, the delivery should resume + expect(await Promise.all([p1, p2, p3])).toMatchObject([ + fruitBasket, + basketView, + shopper, + ]) + }) + + test('delivers events on retry', async () => { + jest.useRealTimers() + + // make sure all backoffs return immediatelly + jest.spyOn(timer, 'backoff').mockImplementationOnce(() => 100) + + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: (ctx) => { + // only fail first attempt + if (ctx === fruitBasket && ctx.attempts === 1) { + throw new Error('aaay') + } + + return Promise.resolve(ctx) + }, + }, + ajs + ) + + eq.dispatch(fruitBasket) + eq.dispatch(basketView) + eq.dispatch(shopper) + + expect(eq.queue.length).toBe(3) + + let flushed = await flushAll(eq) + // delivered both basket and shopper + expect(flushed).toEqual([basketView, shopper]) + + // wait for the exponential backoff + await new Promise((res) => setTimeout(res, 100)) + + // second try + flushed = await flushAll(eq) + expect(eq.queue.length).toBe(0) + + expect(flushed).toEqual([fruitBasket]) + expect(flushed[0].attempts).toEqual(2) + }) + + test('does not retry non retriable cancelations', async () => { + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: async (ctx) => { + if (ctx === fruitBasket) { + throw new ContextCancelation({ retry: false, reason: 'Test!' }) + } + return ctx + }, + }, + ajs + ) + + const dispatches = [ + eq.dispatch(fruitBasket), + eq.dispatch(basketView), + eq.dispatch(shopper), + ] + + expect(eq.queue.length).toBe(3) + + const flushed = await Promise.all(dispatches) + + // delivered both basket and shopper + expect(flushed).toEqual([fruitBasket, basketView, shopper]) + + // nothing was retried + expect(basketView.attempts).toEqual(1) + expect(shopper.attempts).toEqual(1) + expect(fruitBasket.attempts).toEqual(1) + expect(eq.queue.length).toBe(0) + }) + + test('does not retry non retriable cancelations (dispatchSingle)', async () => { + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: async (ctx) => { + if (ctx === fruitBasket) { + throw new ContextCancelation({ retry: false, reason: 'Test!' }) + } + return ctx + }, + }, + ajs + ) + + const context = await eq.dispatchSingle(fruitBasket) + + expect(context.attempts).toEqual(1) + }) + + test('retries retriable cancelations', async () => { + // make sure all backoffs return immediatelly + jest.spyOn(timer, 'backoff').mockImplementationOnce(() => 100) + + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: (ctx) => { + // only fail first attempt + if (ctx === fruitBasket && ctx.attempts === 1) { + ctx.cancel(new ContextCancelation({ retry: true })) + } + + return Promise.resolve(ctx) + }, + }, + ajs + ) + + eq.dispatch(fruitBasket) + eq.dispatch(basketView) + eq.dispatch(shopper) + + expect(eq.queue.length).toBe(3) + + let flushed = await flushAll(eq) + // delivered both basket and shopper + expect(flushed).toEqual([basketView, shopper]) + + // wait for the exponential backoff + await new Promise((res) => setTimeout(res, 100)) + + // second try + flushed = await flushAll(eq) + expect(eq.queue.length).toBe(0) + + expect(flushed).toEqual([fruitBasket]) + expect(flushed[0].attempts).toEqual(2) + }) + + test('client: can block on delivery', async () => { + jest.useRealTimers() + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: (ctx) => { + // only fail first attempt + if (ctx === fruitBasket && ctx.attempts === 1) { + throw new Error('aaay') + } + + return Promise.resolve(ctx) + }, + }, + ajs + ) + + const fruitBasketDelivery = eq.dispatch(fruitBasket) + const basketViewDelivery = eq.dispatch(basketView) + const shopperDelivery = eq.dispatch(shopper) + + expect(eq.queue.length).toBe(3) + + const [fruitBasketCtx, basketViewCtx, shopperCtx] = await Promise.all([ + fruitBasketDelivery, + basketViewDelivery, + shopperDelivery, + ]) + + expect(eq.queue.length).toBe(0) + + expect(fruitBasketCtx.attempts).toBe(2) + expect(basketViewCtx.attempts).toBe(1) + expect(shopperCtx.attempts).toBe(1) + }) + + describe('denyList permutations', () => { + const amplitude = { + ...testPlugin, + name: 'Amplitude', + type: 'destination' as const, + track: (ctx: CoreContext): Promise | CoreContext => { + return Promise.resolve(ctx) + }, + } + + const mixPanel = { + ...testPlugin, + name: 'Mixpanel', + type: 'destination' as const, + track: (ctx: CoreContext): Promise | CoreContext => { + return Promise.resolve(ctx) + }, + } + + const segmentio = { + ...testPlugin, + name: 'Segment.io', + type: 'after' as const, + track: (ctx: CoreContext): Promise | CoreContext => { + return Promise.resolve(ctx) + }, + } + + test('does not delivery to destinations on denyList', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(mixPanel, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + Mixpanel: false, + 'Segment.io': false, + }, + } + + const ctx = new CoreContext(evt) + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), mixPanel, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + + eq.dispatch(ctx) + + expect(eq.queue.length).toBe(1) + + const flushed = await flushAll(eq) + + expect(flushed).toEqual([ctx]) + + expect(mixPanel.track).not.toHaveBeenCalled() + expect(amplitude.track).toHaveBeenCalled() + expect(segmentio.track).not.toHaveBeenCalled() + }) + + test('does not deliver to any destination except Segment.io if All: false ', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(mixPanel, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + All: false, + }, + } + + const ctx = new CoreContext(evt) + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), mixPanel, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + + eq.dispatch(ctx) + + expect(eq.queue.length).toBe(1) + const flushed = await flushAll(eq) + + expect(flushed).toEqual([ctx]) + + expect(mixPanel.track).not.toHaveBeenCalled() + expect(amplitude.track).not.toHaveBeenCalled() + expect(segmentio.track).toHaveBeenCalled() + }) + + test('does not deliver when All: false and destination is also explicitly false', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(mixPanel, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + All: false, + Amplitude: false, + 'Segment.io': false, + }, + } + + const ctx = new CoreContext(evt) + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), mixPanel, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + + eq.dispatch(ctx) + + expect(eq.queue.length).toBe(1) + const flushed = await flushAll(eq) + + expect(flushed).toEqual([ctx]) + + expect(mixPanel.track).not.toHaveBeenCalled() + expect(amplitude.track).not.toHaveBeenCalled() + expect(segmentio.track).not.toHaveBeenCalled() + }) + + test('delivers to destinations if All: false but the destination is allowed', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(mixPanel, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + All: false, + Amplitude: true, + 'Segment.io': true, + }, + } + + const ctx = new CoreContext(evt) + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), mixPanel, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + + eq.dispatch(ctx) + + expect(eq.queue.length).toBe(1) + const flushed = await flushAll(eq) + + expect(flushed).toEqual([ctx]) + + expect(mixPanel.track).not.toHaveBeenCalled() + expect(amplitude.track).toHaveBeenCalled() + expect(segmentio.track).toHaveBeenCalled() + }) + + test('delivers to Segment.io if All: false but Segment.io is not specified', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(mixPanel, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + All: false, + Amplitude: true, + }, + } + + const ctx = new CoreContext(evt) + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), mixPanel, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + + eq.dispatch(ctx) + + expect(eq.queue.length).toBe(1) + const flushed = await flushAll(eq) + + expect(flushed).toEqual([ctx]) + + expect(mixPanel.track).not.toHaveBeenCalled() + expect(amplitude.track).toHaveBeenCalled() + expect(segmentio.track).toHaveBeenCalled() + }) + + test('delivers to destinations that exist as an object', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + All: false, + Amplitude: { + amplitudeKey: 'foo', + }, + 'Segment.io': {}, + }, + } + + const ctx = new CoreContext(evt) + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + + eq.dispatch(ctx) + + expect(eq.queue.length).toBe(1) + const flushed = await flushAll(eq) + + expect(flushed).toEqual([ctx]) + + expect(amplitude.track).toHaveBeenCalled() + expect(segmentio.track).toHaveBeenCalled() + }) + + test('respect deny lists generated by other plugin', async () => { + const eq = new EventQueue() + + jest.spyOn(amplitude, 'track') + jest.spyOn(mixPanel, 'track') + jest.spyOn(segmentio, 'track') + + const evt = { + type: 'track' as const, + integrations: { + Amplitude: true, + MixPanel: true, + 'Segment.io': true, + }, + } + + const ctx = new CoreContext(evt) + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), mixPanel, ajs) + await eq.register(CoreContext.system(), segmentio, ajs) + await eq.dispatch(ctx) + + const skipAmplitudeAndSegment: MiddlewareFunction = ({ + payload, + next, + }) => { + if (!payload.obj.integrations) { + payload.obj.integrations = {} + } + + payload.obj.integrations['Amplitude'] = false + payload.obj.integrations['Segment.io'] = false + next(payload) + } + + await eq.register( + CoreContext.system(), + sourceMiddlewarePlugin(skipAmplitudeAndSegment, {}), + ajs + ) + + await eq.dispatch(ctx) + + expect(mixPanel.track).toHaveBeenCalledTimes(2) + expect(amplitude.track).toHaveBeenCalledTimes(1) + expect(segmentio.track).toHaveBeenCalledTimes(1) + }) + }) +}) + +describe('deregister', () => { + it('remove plugin from plugins list', async () => { + const eq = new EventQueue() + const toBeRemoved = { ...testPlugin, name: 'remove-me' } + const plugins = [testPlugin, toBeRemoved] + + const promises = plugins.map((p) => + eq.register(CoreContext.system(), p, ajs) + ) + await Promise.all(promises) + + await eq.deregister(CoreContext.system(), toBeRemoved, ajs) + expect(eq.plugins.length).toBe(1) + expect(eq.plugins[0]).toBe(testPlugin) + }) + + it('invokes plugin.unload', async () => { + const eq = new EventQueue() + const toBeRemoved = { ...testPlugin, name: 'remove-me', unload: jest.fn() } + const plugins = [testPlugin, toBeRemoved] + + const promises = plugins.map((p) => + eq.register(CoreContext.system(), p, ajs) + ) + await Promise.all(promises) + + await eq.deregister(CoreContext.system(), toBeRemoved, ajs) + expect(toBeRemoved.unload).toHaveBeenCalled() + expect(eq.plugins.length).toBe(1) + expect(eq.plugins[0]).toBe(testPlugin) + }) +}) + +describe('dispatchSingle', () => { + it('dispatches events without placing them on the queue', async () => { + const eq = new EventQueue() + const promise = eq.dispatchSingle(fruitBasket) + + expect(eq.queue.length).toBe(0) + await promise + expect(eq.queue.length).toBe(0) + }) + + it.skip('records delivery metrics', async () => { + // Skip because we don't support metrics atm + const eq = new EventQueue() + const ctx = await eq.dispatchSingle( + new CoreContext({ + type: 'track', + }) + ) + + expect(ctx.logs().map((l) => l.message)).toMatchInlineSnapshot(` + Array [ + "Dispatching", + "Delivered", + ] + `) + + expect(ctx.stats?.metrics.map((m) => m.metric)).toMatchInlineSnapshot(` + Array [ + "message_dispatched", + "message_delivered", + "delivered", + ] + `) + }) + + test('retries retriable cancelations', async () => { + // make sure all backoffs return immediatelly + jest.spyOn(timer, 'backoff').mockImplementationOnce(() => 100) + + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + track: (ctx) => { + // only fail first attempt + if (ctx === fruitBasket && ctx.attempts === 1) { + ctx.cancel(new ContextCancelation({ retry: true })) + } + + return Promise.resolve(ctx) + }, + }, + ajs + ) + + expect(eq.queue.length).toBe(0) + + const attempted = await eq.dispatchSingle(fruitBasket) + expect(attempted.attempts).toEqual(2) + }) +}) diff --git a/packages/core/src/queue/__tests__/extension-flushing.test.ts b/packages/core/src/queue/__tests__/extension-flushing.test.ts new file mode 100644 index 000000000..4e2138e8c --- /dev/null +++ b/packages/core/src/queue/__tests__/extension-flushing.test.ts @@ -0,0 +1,411 @@ +import { shuffle } from 'lodash' +import { CoreAnalytics } from '../../analytics' +import { PriorityQueue } from '../../priority-queue' +import { CoreContext } from '../../context' +import { CorePlugin as Plugin } from '../../plugins' +import { EventQueue as EQ } from '../event-queue' + +class EventQueue extends EQ { + constructor() { + super(new PriorityQueue(4, [])) + } +} +const fruitBasket = new CoreContext({ + type: 'track', + event: 'Fruit Basket', + properties: { + banana: '🍌', + apple: '🍎', + grape: '🍇', + }, +}) + +const testPlugin: Plugin = { + name: 'test', + type: 'before', + version: '0.1.0', + load: () => Promise.resolve(), + isLoaded: () => true, +} + +const ajs = {} as CoreAnalytics + +describe('Registration', () => { + test('can register plugins', async () => { + const eq = new EventQueue() + const load = jest.fn() + + const plugin: Plugin = { + name: 'test', + type: 'before', + version: '0.1.0', + load, + isLoaded: () => true, + } + + const ctx = CoreContext.system() + await eq.register(ctx, plugin, ajs) + + expect(load).toHaveBeenCalledWith(ctx, ajs) + }) + + test('fails if plugin cant be loaded', async () => { + const eq = new EventQueue() + + const plugin: Plugin = { + name: 'test', + type: 'before', + version: '0.1.0', + load: () => Promise.reject(new Error('👻')), + isLoaded: () => false, + } + + const ctx = CoreContext.system() + await expect( + eq.register(ctx, plugin, ajs) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"👻"`) + }) + + test('allows for destinations to fail registration', async () => { + const eq = new EventQueue() + + const plugin: Plugin = { + name: 'test', + type: 'destination', + version: '0.1.0', + load: () => Promise.reject(new Error('👻')), + isLoaded: () => false, + } + + const ctx = CoreContext.system() + await eq.register(ctx, plugin, ajs) + + expect(ctx.logs()[0].level).toEqual('warn') + expect(ctx.logs()[0].message).toEqual('Failed to load destination') + }) +}) + +describe('Plugin flushing', () => { + test('ensures `before` plugins are run', async () => { + const eq = new EventQueue() + const queue = new PriorityQueue(1, []) + + eq.queue = queue + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + type: 'before', + }, + ajs + ) + + const flushed = await eq.dispatch(fruitBasket) + expect(flushed.logs().map((l) => l.message)).toContain('Delivered') + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + name: 'Faulty before', + type: 'before', + track: () => { + throw new Error('aaay') + }, + }, + ajs + ) + + const failedFlush: CoreContext = await eq + .dispatch( + new CoreContext({ + type: 'track', + }) + ) + .catch((ctx) => ctx) + + const messages = failedFlush.logs().map((l) => l.message) + expect(messages).not.toContain('Delivered') + }) + + test('atempts `enrichment` plugins', async () => { + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + name: 'Faulty enrichment', + type: 'enrichment', + track: () => { + throw new Error('aaay') + }, + }, + ajs + ) + + const flushed = await eq.dispatch( + new CoreContext({ + type: 'track', + }) + ) + + const messages = flushed.logs().map((l) => l.message) + expect(messages).toContain('Delivered') + }) + + test('attempts `destination` plugins', async () => { + const eq = new EventQueue() + + const amplitude: Plugin = { + ...testPlugin, + name: 'Amplitude', + type: 'destination', + track: async () => { + throw new Error('Boom!') + }, + } + + const fullstory: Plugin = { + ...testPlugin, + name: 'FullStory', + type: 'destination', + } + + await eq.register(CoreContext.system(), amplitude, ajs) + await eq.register(CoreContext.system(), fullstory, ajs) + + const flushed = await eq.dispatch( + new CoreContext({ + type: 'track', + }) + ) + + const messages = flushed + .logs() + .map((l) => ({ message: l.message, extras: l.extras })) + + expect(messages).toMatchInlineSnapshot(` + Array [ + Object { + "extras": undefined, + "message": "Dispatching", + }, + Object { + "extras": Object { + "plugin": "Amplitude", + }, + "message": "plugin", + }, + Object { + "extras": Object { + "plugin": "FullStory", + }, + "message": "plugin", + }, + Object { + "extras": Object { + "error": [Error: Boom!], + "plugin": "Amplitude", + }, + "message": "plugin Error", + }, + Object { + "extras": Object { + "type": "track", + }, + "message": "Delivered", + }, + ] + `) + }) + + test('attempts `after` plugins', async () => { + const eq = new EventQueue() + + const afterFailed: Plugin = { + ...testPlugin, + name: 'after-failed', + type: 'after', + track: async () => { + throw new Error('Boom!') + }, + } + + const after: Plugin = { + ...testPlugin, + name: 'after', + type: 'after', + } + + await eq.register(CoreContext.system(), afterFailed, ajs) + await eq.register(CoreContext.system(), after, ajs) + + const flushed = await eq.dispatch( + new CoreContext({ + type: 'track', + }) + ) + + const messages = flushed + .logs() + .map((l) => ({ message: l.message, extras: l.extras })) + expect(messages).toMatchInlineSnapshot(` + Array [ + Object { + "extras": undefined, + "message": "Dispatching", + }, + Object { + "extras": Object { + "plugin": "after-failed", + }, + "message": "plugin", + }, + Object { + "extras": Object { + "plugin": "after", + }, + "message": "plugin", + }, + Object { + "extras": Object { + "error": [Error: Boom!], + "plugin": "after-failed", + }, + "message": "plugin Error", + }, + Object { + "extras": Object { + "type": "track", + }, + "message": "Delivered", + }, + ] + `) + }) + + test('runs `enrichment` and `before` inline', async () => { + const eq = new EventQueue() + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + name: 'Kiwi', + type: 'enrichment', + track: async (ctx) => { + ctx.updateEvent('properties.kiwi', '🥝') + return ctx + }, + }, + ajs + ) + + await eq.register( + CoreContext.system(), + { + ...testPlugin, + name: 'Watermelon', + type: 'enrichment', + track: async (ctx) => { + ctx.updateEvent('properties.watermelon', '🍉') + return ctx + }, + }, + ajs + ) + let trackCalled = false + await eq.register( + CoreContext.system(), + { + ...testPlugin, + name: 'Before', + type: 'before', + track: async (ctx) => { + trackCalled = true + return ctx + }, + }, + ajs + ) + + const flushed = await eq.dispatch( + new CoreContext({ + type: 'track', + }) + ) + + expect(flushed.event.properties).toEqual({ + watermelon: '🍉', + kiwi: '🥝', + }) + + expect(trackCalled).toBeTruthy() + }) + + test('respects execution order', async () => { + const eq = new EventQueue() + + let trace = 0 + + const before: Plugin = { + ...testPlugin, + name: 'Before', + type: 'before', + track: async (ctx) => { + trace++ + expect(trace).toBe(1) + return ctx + }, + } + + const enrichment: Plugin = { + ...testPlugin, + name: 'Enrichment', + type: 'enrichment', + track: async (ctx) => { + trace++ + expect(trace === 2 || trace === 3).toBe(true) + return ctx + }, + } + + const enrichmentTwo: Plugin = { + ...testPlugin, + name: 'Enrichment 2', + type: 'enrichment', + track: async (ctx) => { + trace++ + expect(trace === 2 || trace === 3).toBe(true) + return ctx + }, + } + + const destination: Plugin = { + ...testPlugin, + name: 'Destination', + type: 'destination', + track: async (ctx) => { + trace++ + expect(trace).toBe(4) + return ctx + }, + } + + // shuffle plugins so we can verify order + const plugins = shuffle([before, enrichment, enrichmentTwo, destination]) + for (const xt of plugins) { + await eq.register(CoreContext.system(), xt, ajs) + } + + await eq.dispatch( + new CoreContext({ + type: 'track', + }) + ) + + expect(trace).toBe(4) + }) +}) diff --git a/packages/core/src/queue/delivery.ts b/packages/core/src/queue/delivery.ts new file mode 100644 index 000000000..31825a15a --- /dev/null +++ b/packages/core/src/queue/delivery.ts @@ -0,0 +1,74 @@ +import { CoreContext, ContextCancelation } from '../context' +import { CorePlugin } from '../plugins' +async function tryOperation( + op: () => CoreContext | Promise +): Promise { + try { + return await op() + } catch (err) { + return Promise.reject(err) + } +} + +export function attempt( + ctx: CoreContext, + plugin: CorePlugin +): Promise { + ctx.log('debug', 'plugin', { plugin: plugin.name }) + const start = new Date().getTime() + + const hook = plugin[ctx.event.type] + if (hook === undefined) { + return Promise.resolve(ctx) + } + + const newCtx = tryOperation(() => hook.apply(plugin, [ctx])) + .then((ctx) => { + const done = new Date().getTime() - start + ctx.stats?.gauge('plugin_time', done, [`plugin:${plugin.name}`]) + + return ctx + }) + .catch((err) => { + if ( + err instanceof ContextCancelation && + err.type === 'middleware_cancellation' + ) { + throw err + } + + if (err instanceof ContextCancelation) { + ctx.log('warn', err.type, { + plugin: plugin.name, + error: err, + }) + + return err + } + + ctx.log('error', 'plugin Error', { + plugin: plugin.name, + error: err, + }) + ctx.stats?.increment('plugin_error', 1, [`plugin:${plugin.name}`]) + + return err as Error + }) + + return newCtx +} + +export function ensure( + ctx: CoreContext, + plugin: CorePlugin +): Promise { + return attempt(ctx, plugin).then((newContext) => { + if (newContext instanceof CoreContext) { + return newContext + } + + ctx.log('debug', 'Context canceled') + ctx.stats?.increment('context_canceled') + ctx.cancel(newContext) + }) +} diff --git a/packages/core/src/queue/event-queue.ts b/packages/core/src/queue/event-queue.ts new file mode 100644 index 000000000..fa1191362 --- /dev/null +++ b/packages/core/src/queue/event-queue.ts @@ -0,0 +1,320 @@ +import { CoreAnalytics } from '../analytics' +import { groupBy } from '../utils/group-by' +import { ON_REMOVE_FROM_FUTURE, PriorityQueue } from '../priority-queue' + +import { CoreContext, ContextCancelation } from '../context' +import { Emitter } from '../emitter' +import { Integrations } from '../events/interfaces' +import { CorePlugin } from '../plugins' +import { createTaskGroup, TaskGroup } from '../task/task-group' +import { attempt, ensure } from './delivery' +import { isOffline } from '../connection' + +type CorePluginsByType = { + before: CorePlugin[] + after: CorePlugin[] + enrichment: CorePlugin[] + destinations: CorePlugin[] +} + +type EmittedEventNames = + | 'message_dispatched' + | 'message_delivered' + | 'message_enriched' + | 'delivery_success' + | 'delivery_failure' + | 'flush' + | 'plugin_loaded' + | 'plugin_error' + | 'context_cancelled' + +export class EventQueue extends Emitter { + /** + * All event deliveries get suspended until all the tasks in this task group are complete. + * For example: a middleware that augments the event object should be loaded safely as a + * critical task, this way, event queue will wait for it to be ready before sending events. + * + * This applies to all the events already in the queue, and the upcoming ones + */ + criticalTasks: TaskGroup = createTaskGroup() + queue: PriorityQueue + plugins: CorePlugin[] = [] + failedInitializations: string[] = [] + private flushing = false + + constructor(priorityQueue: PriorityQueue) { + super() + + this.queue = priorityQueue + this.queue.on(ON_REMOVE_FROM_FUTURE, () => { + this.scheduleFlush(0) + }) + } + + async register( + ctx: CoreContext, + plugin: CorePlugin, + instance: CoreAnalytics + ): Promise { + await Promise.resolve(plugin.load(ctx, instance)) + .then(() => { + this.plugins.push(plugin) + }) + .catch((err) => { + if (plugin.type === 'destination') { + this.failedInitializations.push(plugin.name) + console.warn(plugin.name, err) + + ctx.log('warn', 'Failed to load destination', { + plugin: plugin.name, + error: err, + }) + + return + } + + throw err + }) + } + + async deregister( + ctx: CoreContext, + plugin: CorePlugin, + instance: CoreAnalytics + ): Promise { + try { + if (plugin.unload) { + await Promise.resolve(plugin.unload(ctx, instance)) + } + + this.plugins = this.plugins.filter((p) => p.name !== plugin.name) + } catch (e) { + ctx.log('warn', 'Failed to unload destination', { + plugin: plugin.name, + error: e, + }) + } + } + + async dispatch(ctx: CoreContext): Promise { + ctx.log('debug', 'Dispatching') + this.emit('message_dispatched') + ctx.stats?.increment('message_dispatched') + + this.queue.push(ctx) + const willDeliver = this.subscribeToDelivery(ctx) + this.scheduleFlush(0) + return willDeliver + } + + private async subscribeToDelivery(ctx: CoreContext): Promise { + return new Promise((resolve) => { + const onDeliver = (flushed: CoreContext, delivered: boolean): void => { + if (flushed.isSame(ctx)) { + this.off('flush', onDeliver) + if (delivered) { + resolve(flushed) + } else { + resolve(flushed) + } + } + } + + this.on('flush', onDeliver) + }) + } + + async dispatchSingle(ctx: CoreContext): Promise { + ctx.log('debug', 'Dispatching') + this.emit('message_dispatched') + + this.queue.updateAttempts(ctx) + ctx.attempts = 1 + + return this.deliver(ctx).catch((err) => { + if (err instanceof ContextCancelation && err.retry === false) { + ctx.setFailedDelivery({ reason: err }) + return ctx + } + + const accepted = this.enqueuRetry(err, ctx) + if (!accepted) { + ctx.setFailedDelivery({ reason: err }) + return ctx + } + + return this.subscribeToDelivery(ctx) + }) + } + + isEmpty(): boolean { + return this.queue.length === 0 + } + + private scheduleFlush(timeout = 500): void { + if (this.flushing) { + return + } + + this.flushing = true + + setTimeout(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.flush().then(() => { + setTimeout(() => { + this.flushing = false + + if (this.queue.length) { + this.scheduleFlush(0) + } + }, 0) + }) + }, timeout) + } + + private async deliver(ctx: CoreContext): Promise { + await this.criticalTasks.done() + + const start = Date.now() + try { + ctx = await this.flushOne(ctx) + const done = Date.now() - start + this.emit('delivery_success', done) + ctx.stats?.gauge('delivered', done) + ctx.log('debug', 'Delivered', ctx.event) + return ctx + } catch (err) { + ctx.log('error', 'Failed to deliver', err as object) + this.emit('delivery_failure') + ctx.stats?.increment('delivery_failed') + throw err + } + } + + private enqueuRetry(err: Error, ctx: CoreContext): boolean { + const notRetriable = + err instanceof ContextCancelation && err.retry === false + const retriable = !notRetriable + + if (retriable) { + const accepted = this.queue.pushWithBackoff(ctx) + return accepted + } + + return false + } + + async flush(): Promise { + if (this.queue.length === 0 || isOffline()) { + return [] + } + + let ctx = this.queue.pop() + if (!ctx) { + return [] + } + + ctx.attempts = this.queue.getAttempts(ctx) + + try { + ctx = await this.deliver(ctx) + this.emit('flush', ctx, true) + } catch (err: any) { + const accepted = this.enqueuRetry(err, ctx) + + if (!accepted) { + ctx.setFailedDelivery({ reason: err }) + this.emit('flush', ctx, false) + } + + return [] + } + + return [ctx] + } + + private isReady(): boolean { + // return this.plugins.every((p) => p.isLoaded()) + // should we wait for every plugin to load? + return true + } + + private availableExtensions(denyList: Integrations): CorePluginsByType { + const available = this.plugins.filter((p) => { + // Only filter out destination plugins or the Segment.io plugin + if (p.type !== 'destination' && p.name !== 'Segment.io') { + return true + } + + // Explicit integration option takes precedence, `All: false` does not apply to Segment.io + return ( + denyList[p.name] ?? + (p.name === 'Segment.io' ? true : denyList.All) !== false + ) + }) + + const { + before = [], + enrichment = [], + destination = [], + after = [], + } = groupBy(available, 'type') + + return { + before, + enrichment, + destinations: destination, + after, + } + } + + private async flushOne(ctx: CoreContext): Promise { + if (!this.isReady()) { + throw new Error('Not ready') + } + + const { before, enrichment } = this.availableExtensions( + ctx.event.integrations ?? {} + ) + + for (const beforeWare of before) { + const temp: CoreContext | undefined = await ensure(ctx, beforeWare) + if (temp instanceof CoreContext) { + ctx = temp + } + } + + for (const enrichmentWare of enrichment) { + const temp = await attempt(ctx, enrichmentWare) + if (temp instanceof CoreContext) { + ctx = temp + } + } + + this.emit('message_enriched', ctx) // TODO: inspectorHost.enriched?.(ctx as any) + + // Enrichment and before plugins can re-arrange the deny list dynamically + // so we need to pluck them at the end + const { destinations, after } = this.availableExtensions( + ctx.event.integrations ?? {} + ) + + await new Promise((resolve, reject) => { + setTimeout(() => { + const attempts = destinations.map((destination) => + attempt(ctx, destination) + ) + Promise.all(attempts).then(resolve).catch(reject) + }, 0) + }) + + this.emit('message_delivered', ctx) + ctx.stats?.increment('message_delivered') + // should emit to: TODO: inspectorHost.delivered?.(ctx as any, ['segment.io']) -- can we move all this inspector stuff to a plugin that listens to these events? + + const afterCalls = after.map((after) => attempt(ctx, after)) + await Promise.all(afterCalls) + + return ctx + } +} diff --git a/packages/core/src/stats/index.ts b/packages/core/src/stats/index.ts new file mode 100644 index 000000000..6c6d22fb1 --- /dev/null +++ b/packages/core/src/stats/index.ts @@ -0,0 +1,89 @@ +import { RemoteMetrics } from './remote-metrics' + +type MetricType = 'gauge' | 'counter' +type CompactMetricType = 'g' | 'c' + +export interface Metric { + metric: string + value: number + type: MetricType + tags: string[] + timestamp: number // unit milliseconds +} + +export interface CompactMetric { + m: string // metric name + v: number // value + k: CompactMetricType + t: string[] // tags + e: number // timestamp in unit milliseconds +} + +const compactMetricType = (type: MetricType): CompactMetricType => { + const enums: Record = { + gauge: 'g', + counter: 'c', + } + return enums[type] +} + +export default class Stats { + metrics: Metric[] = [] + + private remoteMetrics?: RemoteMetrics + + constructor(remoteMetrics?: RemoteMetrics) { + this.remoteMetrics = remoteMetrics + } + + increment(metric: string, by = 1, tags?: string[]): void { + this.metrics.push({ + metric, + value: by, + tags: tags ?? [], + type: 'counter', + timestamp: Date.now(), + }) + + void this.remoteMetrics?.increment(metric, tags ?? []) + } + + gauge(metric: string, value: number, tags?: string[]): void { + this.metrics.push({ + metric, + value, + tags: tags ?? [], + type: 'gauge', + timestamp: Date.now(), + }) + } + + flush(): void { + const formatted = this.metrics.map((m) => ({ + ...m, + tags: m.tags.join(','), + })) + // ie doesn't like console.table + if (console.table) { + console.table(formatted) + } else { + console.log(formatted) + } + this.metrics = [] + } + + /** + * compact keys for smaller payload + */ + serialize(): CompactMetric[] { + return this.metrics.map((m) => { + return { + m: m.metric, + v: m.value, + t: m.tags, + k: compactMetricType(m.type), + e: m.timestamp, + } + }) + } +} diff --git a/packages/core/src/stats/remote-metrics.ts b/packages/core/src/stats/remote-metrics.ts new file mode 100644 index 000000000..41f0e7cce --- /dev/null +++ b/packages/core/src/stats/remote-metrics.ts @@ -0,0 +1,16 @@ +type MetricType = 'gauge' | 'counter' + +export interface Metric { + metric: string + value: number + type: MetricType + tags: string[] + timestamp: number // unit milliseconds +} + +export interface RemoteMetrics { + sampleRate: number + increment(metric: string, tags: string[]): Promise + queue: Metric[] + flush: Promise +} diff --git a/packages/core/src/task/task-group.ts b/packages/core/src/task/task-group.ts new file mode 100644 index 000000000..5ada27160 --- /dev/null +++ b/packages/core/src/task/task-group.ts @@ -0,0 +1,31 @@ +import { isThenable } from '../utils/is-thenable' + +export type TaskGroup = { + done: () => Promise + run: any>( + op: Operation + ) => ReturnType +} + +export const createTaskGroup = (): TaskGroup => { + let taskCompletionPromise: Promise + let resolvePromise: () => void + let count = 0 + + return { + done: () => taskCompletionPromise, + run: (op) => { + const returnValue = op() + + if (isThenable(returnValue)) { + if (++count === 1) { + taskCompletionPromise = new Promise((res) => (resolvePromise = res)) + } + + returnValue.finally(() => --count === 0 && resolvePromise()) + } + + return returnValue + }, + } +} diff --git a/packages/core/src/user/index.ts b/packages/core/src/user/index.ts new file mode 100644 index 000000000..31c472d2d --- /dev/null +++ b/packages/core/src/user/index.ts @@ -0,0 +1,7 @@ +export type ID = string | null | undefined + +// TODO: this is a base user +export interface User { + id(): ID + anonymousId(): ID +} diff --git a/packages/core/src/utils/__tests__/group-by.test.ts b/packages/core/src/utils/__tests__/group-by.test.ts new file mode 100644 index 000000000..432723df6 --- /dev/null +++ b/packages/core/src/utils/__tests__/group-by.test.ts @@ -0,0 +1,96 @@ +import { groupBy } from '../group-by' + +describe('groupBy', () => { + it('groups a collection by key', () => { + const collection = [ + { + id: 'go', + type: 'server', + }, + { + id: 'ruby', + type: 'server', + }, + { + id: 'js', + type: 'browser', + }, + { + id: 'react', + type: 'app', + }, + ] + + expect(groupBy(collection, 'type')).toEqual({ + app: [ + { + id: 'react', + type: 'app', + }, + ], + browser: [ + { + id: 'js', + type: 'browser', + }, + ], + server: [ + { + id: 'go', + type: 'server', + }, + { + id: 'ruby', + type: 'server', + }, + ], + }) + }) + + it('accepts a function', () => { + const collection = [ + { + id: 1, + type: 'server', + }, + { + id: 2, + type: 'server', + }, + { + id: 3, + type: 'browser', + }, + { + id: 4, + type: 'app', + }, + ] + + const oddEven = groupBy(collection, (c) => + c.id % 2 === 0 ? 'even' : 'odd' + ) + expect(oddEven).toEqual({ + even: [ + { + id: 2, + type: 'server', + }, + { + id: 4, + type: 'app', + }, + ], + odd: [ + { + id: 1, + type: 'server', + }, + { + id: 3, + type: 'browser', + }, + ], + }) + }) +}) diff --git a/packages/core/src/utils/__tests__/is-thenable.test.ts b/packages/core/src/utils/__tests__/is-thenable.test.ts new file mode 100644 index 000000000..8c61439bd --- /dev/null +++ b/packages/core/src/utils/__tests__/is-thenable.test.ts @@ -0,0 +1,39 @@ +import { isThenable } from '../is-thenable' + +describe('isThenable', () => { + test('es6 promises', () => { + const p = Promise.resolve(1) + expect(isThenable(p)).toBeTruthy() + }) + + test('on the prototype', () => { + class Foo { + then() { + return '123' + } + } + const p = new Foo() + expect(isThenable(p)).toBeTruthy() + }) + + test('on the pojo', () => { + const p = { + then: () => { + return '123' + }, + } + expect(isThenable(p)).toBeTruthy() + }) + + test('unhappy path', () => { + expect(isThenable(null)).toBeFalsy() + expect(isThenable(undefined)).toBeFalsy() + expect(isThenable({})).toBeFalsy() + expect( + isThenable({ + then: true, + }) + ).toBeFalsy() + expect(isThenable(new (class Foo {})())).toBeFalsy() + }) +}) diff --git a/packages/core/src/utils/bind-all.ts b/packages/core/src/utils/bind-all.ts new file mode 100644 index 000000000..410d64468 --- /dev/null +++ b/packages/core/src/utils/bind-all.ts @@ -0,0 +1,19 @@ +export function bindAll< + ObjType extends { [key: string]: any }, + KeyType extends keyof ObjType +>(obj: ObjType): ObjType { + const proto = obj.constructor.prototype + for (const key of Object.getOwnPropertyNames(proto)) { + if (key !== 'constructor') { + const desc = Object.getOwnPropertyDescriptor( + obj.constructor.prototype, + key + ) + if (!!desc && typeof desc.value === 'function') { + obj[key as KeyType] = obj[key].bind(obj) + } + } + } + + return obj +} diff --git a/packages/core/src/utils/environment.ts b/packages/core/src/utils/environment.ts new file mode 100644 index 000000000..48f5c6f95 --- /dev/null +++ b/packages/core/src/utils/environment.ts @@ -0,0 +1,7 @@ +export function isBrowser(): boolean { + return typeof window !== 'undefined' +} + +export function isServer(): boolean { + return !isBrowser() +} diff --git a/packages/core/src/utils/get-global.ts b/packages/core/src/utils/get-global.ts new file mode 100644 index 000000000..9d88259d0 --- /dev/null +++ b/packages/core/src/utils/get-global.ts @@ -0,0 +1,16 @@ +// This an imperfect polyfill for globalThis +export const getGlobal = () => { + if (typeof globalThis !== 'undefined') { + return globalThis + } + if (typeof self !== 'undefined') { + return self + } + if (typeof window !== 'undefined') { + return window + } + if (typeof global !== 'undefined') { + return global + } + return null +} diff --git a/packages/core/src/utils/group-by.ts b/packages/core/src/utils/group-by.ts new file mode 100644 index 000000000..4bebc4579 --- /dev/null +++ b/packages/core/src/utils/group-by.ts @@ -0,0 +1,30 @@ +type Grouper = (obj: T) => string | number + +export function groupBy( + collection: T[], + grouper: keyof T | Grouper +): Record { + const results: Record = {} + + collection.forEach((item) => { + let key: string | number | undefined = undefined + + if (typeof grouper === 'string') { + const suggestedKey = item[grouper] + key = + typeof suggestedKey !== 'string' + ? JSON.stringify(suggestedKey) + : suggestedKey + } else if (grouper instanceof Function) { + key = grouper(item) + } + + if (key === undefined) { + return + } + + results[key] = [...(results[key] ?? []), item] + }) + + return results +} diff --git a/packages/core/src/utils/has-properties.ts b/packages/core/src/utils/has-properties.ts new file mode 100644 index 000000000..ba9a946b0 --- /dev/null +++ b/packages/core/src/utils/has-properties.ts @@ -0,0 +1,7 @@ +export function hasProperties( + obj: T, + ...keys: K[] +): obj is T & { [J in K]: unknown } { + // eslint-disable-next-line no-prototype-builtins + return !!obj && keys.every((key) => obj.hasOwnProperty(key)) +} diff --git a/packages/core/src/utils/is-thenable.ts b/packages/core/src/utils/is-thenable.ts new file mode 100644 index 000000000..fc85d3291 --- /dev/null +++ b/packages/core/src/utils/is-thenable.ts @@ -0,0 +1,9 @@ +/** + * Check if thenable + * (instanceof Promise doesn't respect realms) + */ +export const isThenable = (value: unknown): boolean => + typeof value === 'object' && + value !== null && + 'then' in value && + typeof (value as any).then === 'function' diff --git a/packages/core/src/utils/p-while.ts b/packages/core/src/utils/p-while.ts new file mode 100644 index 000000000..c28095658 --- /dev/null +++ b/packages/core/src/utils/p-while.ts @@ -0,0 +1,12 @@ +export const pWhile = async ( + condition: (value: T | undefined) => boolean, + action: () => T | PromiseLike +): Promise => { + const loop = async (actionResult: T | undefined): Promise => { + if (condition(actionResult)) { + return loop(await action()) + } + } + + return loop(undefined) +} diff --git a/packages/core/src/utils/to-facade.ts b/packages/core/src/utils/to-facade.ts new file mode 100644 index 000000000..97e7e2d31 --- /dev/null +++ b/packages/core/src/utils/to-facade.ts @@ -0,0 +1,53 @@ +import { + Alias, + Facade, + Group, + Identify, + Options, + Page, + Screen, + Track, +} from '@segment/facade' +import { CoreSegmentEvent } from '../events' + +export type SegmentFacade = Facade & { + obj: CoreSegmentEvent +} + +export function toFacade( + evt: CoreSegmentEvent, + options?: Options +): SegmentFacade { + let fcd = new Facade(evt, options) + + if (evt.type === 'track') { + fcd = new Track(evt, options) + } + + if (evt.type === 'identify') { + fcd = new Identify(evt, options) + } + + if (evt.type === 'page') { + fcd = new Page(evt, options) + } + + if (evt.type === 'alias') { + fcd = new Alias(evt, options) + } + + if (evt.type === 'group') { + fcd = new Group(evt, options) + } + + if (evt.type === 'screen') { + fcd = new Screen(evt, options) + } + + Object.defineProperty(fcd, 'obj', { + value: evt, + writable: true, + }) + + return fcd as SegmentFacade +} diff --git a/packages/core/src/validation/index.ts b/packages/core/src/validation/index.ts new file mode 100644 index 000000000..013a4d0d2 --- /dev/null +++ b/packages/core/src/validation/index.ts @@ -0,0 +1,27 @@ +import { CoreSegmentEvent } from '../events' + +export function isString(obj: unknown): obj is string { + return typeof obj === 'string' +} + +export function isNumber(obj: unknown): obj is number { + return typeof obj === 'number' +} + +export function isFunction(obj: unknown): obj is Function { + return typeof obj === 'function' +} + +export function isPlainObject( + obj: unknown +): obj is Record { + return ( + Object.prototype.toString.call(obj).slice(8, -1).toLowerCase() === 'object' + ) +} + +export function hasUser(event: CoreSegmentEvent): boolean { + const id = + event.userId ?? event.anonymousId ?? event.groupId ?? event.previousId + return isString(id) +} diff --git a/packages/core/tsconfig.build.json b/packages/core/tsconfig.build.json index 235d92029..19eb743ae 100644 --- a/packages/core/tsconfig.build.json +++ b/packages/core/tsconfig.build.json @@ -3,7 +3,7 @@ "include": ["src"], "exclude": ["**/__tests__/**", "**/*.test.*"], "compilerOptions": { - "incremental": false, + "incremental": true, "outDir": "./dist/esm", "importHelpers": true, // publish sourceMaps diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json index 9936980ac..4e403a914 100644 --- a/packages/core/tsconfig.json +++ b/packages/core/tsconfig.json @@ -5,6 +5,7 @@ "module": "esnext", "target": "ES5", "moduleResolution": "node", + "resolveJsonModule": true, "lib": ["es2020"] } } diff --git a/packages/node/.eslintrc.js b/packages/node/.eslintrc.js index ac6dfcd26..a88041a34 100644 --- a/packages/node/.eslintrc.js +++ b/packages/node/.eslintrc.js @@ -3,5 +3,8 @@ module.exports = { extends: ["../../.eslintrc"], env: { node: true, + }, + "rules": { + "@typescript-eslint/no-empty-interface": "off" // since this is a lib, sometimes we want to use interfaces rather than types for the ease of declaration merging. } } diff --git a/packages/node/README.md b/packages/node/README.md index f87f5c14c..5f8ecc4bf 100644 --- a/packages/node/README.md +++ b/packages/node/README.md @@ -1 +1,26 @@ -# TODO \ No newline at end of file +# TODO: API Documentation is out of date + +https://segment.com/docs/connections/sources/catalog/libraries/server/node/ + + +NOTE: @segment/analytics-node is unstable! do not use. + + +```ts +// TODO: finalize API +import { load } from '@segment/analytics-node' + + +// some file +export const ajsPromise = load({ writeKey: 'abc123' }).then(([ajs]) => ajs) + +// some other file +import { ajsPromise } from '../analytics' + +router.post('/user', async (req, res) => { + ajsPromise.then((ajs) => ajs.track(user.id, "some event")) + res.json(user) +}) + + +``` diff --git a/packages/node/jest.config.js b/packages/node/jest.config.js index 38155dd11..43505965a 100644 --- a/packages/node/jest.config.js +++ b/packages/node/jest.config.js @@ -1,17 +1,3 @@ -module.exports = { - preset: 'ts-jest', - modulePathIgnorePatterns: [ - '/dist/', - ], - testEnvironment: 'node', - testMatch: ["**/?(*.)+(test).[jt]s?(x)"], - clearMocks: true, - moduleNameMapper: { - '@/(.+)': '/../../src/$1', - }, - globals: { - 'ts-jest': { - isolatedModules: true, - }, - }, -} +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig() diff --git a/packages/node/package.json b/packages/node/package.json index fe86108ad..fd5e4ccc7 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -1,6 +1,6 @@ { "name": "@segment/analytics-node", - "version": "1.0.0", + "version": "0.0.0", "private": true, "main": "./dist/cjs/index.js", "module": "./dist/esm/index.js", @@ -12,7 +12,8 @@ "files": [ "dist/", "src/", - "!**/__tests__/**" + "!**/__tests__/**", + "!*.tsbuildinfo" ], "engines": { "node": ">=12" @@ -20,10 +21,10 @@ "scripts": { "test": "yarn jest", "lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'", - "build": "rm -rf dist && yarn concurrently 'yarn:build:*'", + "build": "yarn concurrently 'yarn:build:*'", "build:cjs": "yarn tsc -p tsconfig.build.json --outDir ./dist/cjs --module commonjs", "build:esm": "yarn tsc -p tsconfig.build.json", - "watch": "yarn concurrently 'yarn:build:cjs --watch' 'yarn:build:esm --watch'", + "watch": "yarn build:esm --watch --incremental", "watch:test": "yarn test --watch", "tsc": "yarn run -T tsc", "eslint": "yarn run -T eslint", @@ -31,10 +32,13 @@ "jest": "yarn run -T jest" }, "dependencies": { + "@segment/analytics-core": "1.0.1", + "@segment/analytics-plugin-validation": "0.0.0", "node-fetch": "^2.6.7", "tslib": "^2.4.0" }, "devDependencies": { + "@internal/config": "0.0.0", "@types/node": "^12.12.14" }, "packageManager": "yarn@3.2.1" diff --git a/packages/node/src/__tests__/example.test.ts b/packages/node/src/__tests__/example.test.ts deleted file mode 100644 index 176912dcb..000000000 --- a/packages/node/src/__tests__/example.test.ts +++ /dev/null @@ -1,5 +0,0 @@ -describe('example', () => { - it('should work', () => { - expect(true).toBeTruthy() - }) -}) diff --git a/packages/node/src/__tests__/integration.test.ts b/packages/node/src/__tests__/integration.test.ts new file mode 100644 index 000000000..84fd38f7e --- /dev/null +++ b/packages/node/src/__tests__/integration.test.ts @@ -0,0 +1,267 @@ +import { load, AnalyticsNode } from '../index' + +import { CorePlugin as Plugin } from '@segment/analytics-core' +const writeKey = 'foo' + +const testPlugin: Plugin = { + isLoaded: jest.fn().mockReturnValue(true), + load: jest.fn().mockResolvedValue(undefined), + unload: jest.fn().mockResolvedValue(undefined), + name: 'Test Plugin', + type: 'destination', + version: '0.1.0', + alias: jest.fn((ctx) => Promise.resolve(ctx)), + group: jest.fn((ctx) => Promise.resolve(ctx)), + identify: jest.fn((ctx) => Promise.resolve(ctx)), + page: jest.fn((ctx) => Promise.resolve(ctx)), + screen: jest.fn((ctx) => Promise.resolve(ctx)), + track: jest.fn((ctx) => Promise.resolve(ctx)), +} + +describe('AnalyticsNode', () => { + describe('Initialization', () => { + it('loads analytics-node-next plugin', async () => { + const [analytics] = await load({ + writeKey, + }) + + expect((analytics as any as AnalyticsNode).queue.plugins.length).toBe(2) + + const ajsNodeXt = (analytics as any as AnalyticsNode).queue.plugins.find( + (xt) => xt.name === 'analytics-node-next' + ) + expect(ajsNodeXt).toBeDefined() + expect(ajsNodeXt?.isLoaded()).toBeTruthy() + }) + }) + + describe('alias', () => { + it('generates alias events', async () => { + const [analytics] = await load({ writeKey }) + + const ctx = await analytics.alias('chris radek', 'chris') + + expect(ctx.event.userId).toEqual('chris radek') + expect(ctx.event.previousId).toEqual('chris') + expect(ctx.event.anonymousId).toBeUndefined() + }) + + it('populates anonymousId if provided', async () => { + const [analytics] = await load({ writeKey }) + + const ctx = await analytics.alias('chris radek', 'chris', { + anonymousId: 'foo', + }) + + expect(ctx.event.userId).toEqual('chris radek') + expect(ctx.event.previousId).toEqual('chris') + expect(ctx.event.anonymousId).toEqual('foo') + }) + }) + + describe('group', () => { + it('generates group events', async () => { + const [analytics] = await load({ writeKey }) + + const ctx = await analytics.group( + 'coolKids', + { coolKids: true }, + { + userId: 'foo', + } + ) + + expect(ctx.event.groupId).toEqual('coolKids') + expect(ctx.event.traits).toEqual({ coolKids: true }) + expect(ctx.event.userId).toEqual('foo') + expect(ctx.event.anonymousId).toBeUndefined() + }) + + it('invocations are isolated', async () => { + const [analytics] = await load({ writeKey }) + + const ctx1 = await analytics.group( + 'coolKids', + { foo: 'foo' }, + { + anonymousId: 'unknown', + } + ) + + const ctx2 = await analytics.group( + 'coolKids', + { bar: 'bar' }, + { + userId: 'me', + } + ) + + expect(ctx1.event.traits).toEqual({ foo: 'foo' }) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toBeUndefined() + + expect(ctx2.event.traits).toEqual({ bar: 'bar' }) + expect(ctx2.event.anonymousId).toBeUndefined() + expect(ctx2.event.userId).toEqual('me') + }) + }) + + describe('identify', () => { + it('generates identify events', async () => { + const [analytics] = await load({ writeKey }) + + const ctx1 = await analytics.identify('user-id', { + name: 'Chris Radek', + }) + + expect(ctx1.event.userId).toEqual('user-id') + expect(ctx1.event.anonymousId).toBeUndefined() + expect(ctx1.event.traits).toEqual({ name: 'Chris Radek' }) + + const ctx2 = await analytics.identify( + 'user-id', + {}, + { + anonymousId: 'unknown', + } + ) + + expect(ctx2.event.userId).toEqual('user-id') + expect(ctx2.event.anonymousId).toEqual('unknown') + expect(ctx2.event.traits).toEqual({}) + }) + }) + + describe('page', () => { + it('generates page events', async () => { + const [analytics] = await load({ writeKey }) + + const category = 'Docs' + const name = 'How to write a test' + + const ctx1 = await analytics.page( + category, + name, + {}, + { anonymousId: 'unknown' } + ) + + expect(ctx1.event.type).toEqual('page') + expect(ctx1.event.name).toEqual(name) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toBeUndefined() + expect(ctx1.event.properties).toEqual({ category }) + + const ctx2 = await analytics.page( + name, + { title: 'wip' }, + { userId: 'user-id' } + ) + + expect(ctx2.event.type).toEqual('page') + expect(ctx2.event.name).toEqual(name) + expect(ctx2.event.anonymousId).toBeUndefined() + expect(ctx2.event.userId).toEqual('user-id') + expect(ctx2.event.properties).toEqual({ title: 'wip' }) + + const ctx3 = await analytics.page( + { title: 'invisible' }, + { userId: 'user-id' } + ) + + expect(ctx3.event.type).toEqual('page') + expect(ctx3.event.name).toBeUndefined() + expect(ctx3.event.anonymousId).toBeUndefined() + expect(ctx3.event.userId).toEqual('user-id') + expect(ctx3.event.properties).toEqual({ title: 'invisible' }) + }) + }) + + describe('screen', () => { + it('generates screen events', async () => { + const [analytics] = await load({ writeKey }) + + const name = 'Home Screen' + + const ctx1 = await analytics.screen( + name, + { title: 'wip' }, + { userId: 'user-id' } + ) + + expect(ctx1.event.type).toEqual('screen') + expect(ctx1.event.name).toEqual(name) + expect(ctx1.event.anonymousId).toBeUndefined() + expect(ctx1.event.userId).toEqual('user-id') + expect(ctx1.event.properties).toEqual({ title: 'wip' }) + + const ctx2 = await analytics.screen( + { title: 'invisible' }, + { userId: 'user-id' } + ) + + expect(ctx2.event.type).toEqual('screen') + expect(ctx2.event.name).toBeUndefined() + expect(ctx2.event.anonymousId).toBeUndefined() + expect(ctx2.event.userId).toEqual('user-id') + expect(ctx2.event.properties).toEqual({ title: 'invisible' }) + }) + }) + + describe('track', () => { + it('generates track events', async () => { + const [analytics] = await load({ writeKey }) + + const eventName = 'Test Event' + + const ctx1 = await analytics.track( + eventName, + {}, + { + anonymousId: 'unknown', + userId: 'known', + } + ) + + expect(ctx1.event.type).toEqual('track') + expect(ctx1.event.event).toEqual(eventName) + expect(ctx1.event.properties).toEqual({}) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toEqual('known') + + const ctx2 = await analytics.track( + eventName, + { foo: 'bar' }, + { + userId: 'known', + } + ) + + expect(ctx2.event.type).toEqual('track') + expect(ctx2.event.event).toEqual(eventName) + expect(ctx2.event.properties).toEqual({ foo: 'bar' }) + expect(ctx2.event.anonymousId).toBeUndefined() + expect(ctx2.event.userId).toEqual('known') + }) + }) + + describe('register', () => { + it('registers a plugin', async () => { + const [analytics] = await load({ writeKey }) + + await analytics.register(testPlugin) + + expect(testPlugin.load).toHaveBeenCalledTimes(1) + }) + }) + + describe('deregister', () => { + it('deregisters a plugin given its name', async () => { + const [analytics] = await load({ writeKey }) + await analytics.register(testPlugin) + + await analytics.deregister(testPlugin.name) + expect(testPlugin.unload).toHaveBeenCalledTimes(1) + }) + }) +}) diff --git a/packages/node/src/__tests__/more-integration.test.ts b/packages/node/src/__tests__/more-integration.test.ts new file mode 100644 index 000000000..4929f0022 --- /dev/null +++ b/packages/node/src/__tests__/more-integration.test.ts @@ -0,0 +1,80 @@ +// I found both of these test files, and there doesn't seem to be a difference. + +const fetcher = jest.fn() +jest.mock('node-fetch', () => fetcher) + +import { load, AnalyticsNode } from '..' + +const myDate = new Date('2016') +const _Date = Date + +describe('Analytics Node', () => { + let ajs: AnalyticsNode + + beforeEach(async () => { + jest.resetAllMocks() + + const [analytics] = await load({ + writeKey: 'abc123', + }) + + ajs = analytics + + // @ts-ignore + global.Date = jest.fn(() => myDate) + /* eslint-disable @typescript-eslint/unbound-method */ + global.Date.UTC = _Date.UTC + global.Date.parse = _Date.parse + global.Date.now = _Date.now + }) + + test('fireEvent instantiates the right event types', async () => { + await ajs.identify('identify') + expect(fetcher).toHaveBeenCalledWith( + 'https://api.segment.io/v1/identify', + expect.anything() + ) + + await ajs.track( + 'track', + {}, + { + anonymousId: 'foo', + } + ) + expect(fetcher).toHaveBeenCalledWith( + 'https://api.segment.io/v1/track', + expect.anything() + ) + + await ajs.page( + 'page', + {}, + { + anonymousId: 'foo', + } + ) + expect(fetcher).toHaveBeenCalledWith( + 'https://api.segment.io/v1/page', + expect.anything() + ) + + await ajs.group('group', {}, { anonymousId: 'foo' }) + expect(fetcher).toHaveBeenCalledWith( + 'https://api.segment.io/v1/group', + expect.anything() + ) + + await ajs.alias('alias', 'previous') + expect(fetcher).toHaveBeenCalledWith( + 'https://api.segment.io/v1/alias', + expect.anything() + ) + + await ajs.screen('screen', {}, { anonymousId: 'foo' }) + expect(fetcher).toHaveBeenCalledWith( + 'https://api.segment.io/v1/screen', + expect.anything() + ) + }) +}) diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts new file mode 100644 index 000000000..1294a4421 --- /dev/null +++ b/packages/node/src/app/analytics-node.ts @@ -0,0 +1,345 @@ +import { + EventProperties, + Traits, + Emitter, + CoreAnalytics, + CoreContext, + CorePlugin, + EventFactory, + EventQueue, + dispatch, + resolvePageArguments, + PageParams, + Integrations, + CoreOptions, + Callback, + CoreSegmentEvent, + bindAll, +} from '@segment/analytics-core' + +import { version } from '../../package.json' + +/** create a derived class since we may want to add node specific things to Context later */ +export class NodeContext extends CoreContext {} + +export type NodeSegmentEventOptions = CoreOptions & Identity + +export type Identity = + | { userId: string; anonymousId?: string } + | { userId?: string; anonymousId: string } + +type NodeSegmentEventType = 'track' | 'page' | 'identify' | 'alias' | 'screen' + +export interface NodeSegmentEvent extends CoreSegmentEvent { + type: NodeSegmentEventType + options?: NodeSegmentEventOptions +} + +export interface AnalyticsSettings { + writeKey: string + timeout?: number + plugins?: CorePlugin[] +} + +export interface InitOptions { + integrations?: Integrations + retryQueue?: boolean +} + +export class AnalyticsNode extends Emitter implements CoreAnalytics { + private eventFactory: EventFactory + protected settings: AnalyticsSettings + integrations: Integrations + options: InitOptions + queue: EventQueue + get VERSION() { + return version + } + constructor( + settings: AnalyticsSettings, + options: InitOptions, + queue: EventQueue + ) { + super() + this.settings = settings + this.eventFactory = new EventFactory() + this.integrations = options?.integrations ?? {} + this.options = options ?? {} + this.queue = queue + bindAll(this) + } + + /** + * Combines two unassociated user identities. + * @param userId - The new user id you want to associate with the user. + * @param previousId - The previous id that the user was recognized by. + * @param options + * @param callback + */ + alias( + userId: string, + previousId: string, + options?: NodeSegmentEventOptions, + callback?: Callback + ): Promise { + const segmentEvent = this.eventFactory.alias( + userId, + previousId, + options, + this.integrations + ) + return dispatch(segmentEvent, this.queue, this, { + callback: callback, + retryQueue: this.options.retryQueue, + }) + .then((ctx) => { + this.emit('alias', userId, previousId, ctx.event.options) + return ctx + }) + .catch((ctx) => ctx) + } + + /** + * Associates an identified user with a collective. + * @param groupId - The group id to associate with the provided user. + * @param traits - A dictionary of traits for the group. + * @param options - A dictionary of options including the user id. + * @param callback + */ + group( + groupId: string, + traits: Traits, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise { + const segmentEvent = this.eventFactory.group( + groupId, + traits, + options, + this.integrations + ) + + return dispatch(segmentEvent, this.queue, this, { callback }) + .then((ctx) => { + this.emit('group', groupId, ctx.event.traits, ctx.event.options) + return ctx + }) + .catch((ctx) => ctx) + } + + /** + * Includes a unique userId and/or anonymousId and any optional traits you know about them. + * @param userId + * @param traits + * @param options + * @param callback + */ + identify( + userId: string, + traits: Traits = {}, + options?: NodeSegmentEventOptions, + callback?: Callback + ): Promise { + const segmentEvent = this.eventFactory.identify( + userId, + traits, + options, + this.integrations + ) + + return dispatch(segmentEvent, this.queue, this, { callback }) + .then((ctx) => { + this.emit('identify', userId, ctx.event.traits, ctx.event.options) + return ctx + }) + .catch((ctx) => ctx) + } + + /** + * Records page views on your website, along with optional extra information + * about the page viewed by the user. + * @param properties + * @param options + * @param callback + */ + page( + properties: EventProperties, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise + /** + * Records page views on your website, along with optional extra information + * about the page viewed by the user. + * @param name - The name of the page. + * @param properties - A dictionary of properties of the page. + * @param options + * @param callback + */ + page( + name: string, + properties: EventProperties, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise + /** + * Records page views on your website, along with optional extra information + * about the page viewed by the user. + * @param category - The category of the page. + * Useful for cases like ecommerce where many pages might live under a single category. + * @param name - The name of the page. + * @param properties - A dictionary of properties of the page. + * @param options + * @param callback + */ + page( + category: string, + name: string, + properties: EventProperties, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise + + page(...args: PageParams): Promise { + const [category, page, properties, options, callback] = + resolvePageArguments(...args) + + const segmentEvent = this.eventFactory.page( + category, + page, + properties, + options, + this.integrations + ) + + return dispatch(segmentEvent, this.queue, this, { callback }) + .then((ctx) => { + this.emit( + 'page', + category, + page, + ctx.event.properties, + ctx.event.options + ) + return ctx + }) + .catch((ctx) => ctx) + } + + /** + * Records screen views on your app, along with optional extra information + * about the screen viewed by the user. + * @param properties + * @param options + * @param callback + */ + screen( + properties: object, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise + /** + * Records screen views on your app, along with optional extra information + * about the screen viewed by the user. + * @param name - The name of the screen. + * @param properties + * @param options + * @param callback + */ + screen( + name: string, + properties: object, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise + + screen(...args: PageParams): Promise { + const [category, page, properties, options, callback] = + resolvePageArguments(...args) + + const segmentEvent = this.eventFactory.screen( + category, + page, + properties, + options, + this.integrations + ) + + return dispatch(segmentEvent, this.queue, this, { callback }) + .then((ctx) => { + this.emit( + 'page', + category, + page, + ctx.event.properties, + ctx.event.options + ) + return ctx + }) + .catch((ctx) => ctx) + } + /** + * Records actions your users perform. + * @param event - The name of the event you're tracking. + * @param properties - A dictionary of properties for the event. + * @param options + * @param callback + */ + track( + event: string, + properties: object, + options: NodeSegmentEventOptions, + callback?: Callback + ): Promise { + const segmentEvent = this.eventFactory.track( + event, + properties as EventProperties, + options, + this.integrations + ) + + return dispatch(segmentEvent, this.queue, this, { + callback, + }) + .then((ctx) => { + this.emit('track', event, ctx.event.properties, ctx.event.options) + return ctx + }) + .catch((ctx) => ctx) + } + + /** + * Registers one or more plugins to augment Analytics functionality. + * @param plugins + */ + async register(...plugins: CorePlugin[]): Promise { + const ctx = NodeContext.system() + + const registrations = plugins.map((xt) => + this.queue.register(ctx, xt, this) + ) + await Promise.all(registrations) + + return ctx + } + + /** + * Deregisters one or more plugins based on their names. + * @param pluginNames - The names of one or more plugins to deregister. + */ + async deregister(...pluginNames: string[]): Promise { + const ctx = CoreContext.system() + + const deregistrations = pluginNames.map(async (pl) => { + const plugin = this.queue.plugins.find((p) => p.name === pl) + if (plugin) { + return this.queue.deregister(ctx, plugin, this) + } else { + ctx.log('warn', `plugin ${pl} not found`) + } + }) + + await Promise.all(deregistrations) + + return ctx + } +} diff --git a/packages/node/src/app/load.ts b/packages/node/src/app/load.ts new file mode 100644 index 000000000..7a778edc8 --- /dev/null +++ b/packages/node/src/app/load.ts @@ -0,0 +1,32 @@ +import { validation } from '@segment/analytics-plugin-validation' +import { PriorityQueue, EventQueue } from '@segment/analytics-core' +import { + AnalyticsNode, + NodeContext, + InitOptions, + AnalyticsSettings, +} from './analytics-node' +import { analyticsNode, AnalyticsNodePluginSettings } from './plugin' + +export async function load( + settings: AnalyticsSettings, + options: InitOptions = {} +): Promise<[AnalyticsNode, NodeContext]> { + const queue = new EventQueue(new PriorityQueue(3, [])) + + const analytics = new AnalyticsNode(settings, options, queue) + + const nodeSettings: AnalyticsNodePluginSettings = { + name: 'analytics-node-next', + type: 'after', + version: 'latest', + writeKey: settings.writeKey, + } + + // TODO: this shouldn't be asnyc??? this is a regression + const ctx = await analytics.register(validation, analyticsNode(nodeSettings)) + + analytics.emit('initialize', settings) + + return [analytics, ctx] +} diff --git a/packages/node/src/app/plugin.ts b/packages/node/src/app/plugin.ts new file mode 100644 index 000000000..ab73b67dd --- /dev/null +++ b/packages/node/src/app/plugin.ts @@ -0,0 +1,67 @@ +import { + CorePlugin, + CoreSegmentEvent, + PluginType, + CoreContext, +} from '@segment/analytics-core' +import fetch from 'node-fetch' +import { version } from '../../package.json' + +export interface AnalyticsNodePluginSettings { + writeKey: string + name: string + type: PluginType + version: string +} + +const btoa = (val: string): string => Buffer.from(val).toString('base64') + +export async function post( + event: CoreSegmentEvent, + writeKey: string +): Promise { + const res = await fetch(`https://api.segment.io/v1/${event.type}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-node-next/latest', + Authorization: `Basic ${btoa(writeKey)}`, + }, + body: JSON.stringify(event), + }) + + if (!res.ok) { + throw new Error('Message Rejected') + } + + return event +} + +export function analyticsNode( + settings: AnalyticsNodePluginSettings +): CorePlugin { + const send = async (ctx: CoreContext): Promise => { + ctx.updateEvent('context.library.name', 'analytics-node-next') + ctx.updateEvent('context.library.version', version) + ctx.updateEvent('_metadata.nodeVersion', process.versions.node) + + await post(ctx.event, settings.writeKey) + return ctx + } + + return { + name: settings.name, + type: settings.type, + version: settings.version, + + load: (ctx) => Promise.resolve(ctx), + isLoaded: () => true, + + track: send, + identify: send, + page: send, + alias: send, + group: send, + screen: send, + } +} diff --git a/packages/node/src/example.ts b/packages/node/src/example.ts deleted file mode 100644 index e9ee08177..000000000 --- a/packages/node/src/example.ts +++ /dev/null @@ -1 +0,0 @@ -export const foo = (num: number) => console.log(num) diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index 608f8f231..504ee3b8e 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -1,5 +1,6 @@ -import { foo } from './example' -// TODO - -console.log('hello world') -foo(123) +export { load } from './app/load' +export { + AnalyticsNode, + NodeContext, + AnalyticsSettings, +} from './app/analytics-node' diff --git a/packages/node/tsconfig.build.json b/packages/node/tsconfig.build.json index 2e368b3ff..580714a7e 100644 --- a/packages/node/tsconfig.build.json +++ b/packages/node/tsconfig.build.json @@ -3,7 +3,6 @@ "include": ["src"], "exclude": ["**/__tests__/**"], "compilerOptions": { - "incremental": false, "outDir": "./dist/esm", "importHelpers": true, // publish sourceMaps diff --git a/packages/node/tsconfig.json b/packages/node/tsconfig.json index ab91e611d..5b0fc837a 100644 --- a/packages/node/tsconfig.json +++ b/packages/node/tsconfig.json @@ -2,6 +2,7 @@ "extends": "../../tsconfig.json", "exclude": ["node_modules", "dist"], "compilerOptions": { + "resolveJsonModule": true, "module": "esnext", "target": "ES5", "moduleResolution": "node", diff --git a/packages/plugin-validation/.eslintrc.js b/packages/plugin-validation/.eslintrc.js new file mode 100644 index 000000000..b266f16dd --- /dev/null +++ b/packages/plugin-validation/.eslintrc.js @@ -0,0 +1,4 @@ +/** @type { import('eslint').Linter.Config } */ +module.exports = { + extends: ["../../.eslintrc"], +} diff --git a/packages/plugin-validation/.lintstagedrc.js b/packages/plugin-validation/.lintstagedrc.js new file mode 100644 index 000000000..bc1f1c780 --- /dev/null +++ b/packages/plugin-validation/.lintstagedrc.js @@ -0,0 +1 @@ +module.exports = require("@internal/config").lintStagedConfig diff --git a/packages/plugin-validation/LICENSE.md b/packages/plugin-validation/LICENSE.md new file mode 100644 index 000000000..fc7da3d74 --- /dev/null +++ b/packages/plugin-validation/LICENSE.md @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright © 2021 Segment + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/packages/plugin-validation/jest.config.js b/packages/plugin-validation/jest.config.js new file mode 100644 index 000000000..43505965a --- /dev/null +++ b/packages/plugin-validation/jest.config.js @@ -0,0 +1,3 @@ +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig() diff --git a/packages/plugin-validation/package.json b/packages/plugin-validation/package.json new file mode 100644 index 000000000..ad1bfa55a --- /dev/null +++ b/packages/plugin-validation/package.json @@ -0,0 +1,38 @@ +{ + "name": "@segment/analytics-plugin-validation", + "version": "0.0.0", + "repository": { + "type": "git", + "url": "https://github.com/segmentio/analytics-next", + "directory": "packages/plugin-validation" + }, + "license": "MIT", + "main": "./dist/cjs/index.js", + "module": "./dist/esm/index.js", + "types": "./dist/types/index.d.ts", + "files": [ + "dist/", + "src/", + "!**/__tests__/**", + "!*.tsbuildinfo" + ], + "sideEffects": false, + "scripts": { + "test": "yarn jest", + "lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'", + "build": "yarn concurrently 'yarn:build:*'", + "build:esm": "yarn tsc -p tsconfig.build.json", + "build:cjs": "yarn tsc -p tsconfig.build.json --outDir ./dist/cjs --module commonjs", + "watch": "yarn build:esm --watch --incremental", + "watch:test": "yarn test --watch", + "tsc": "yarn run -T tsc", + "eslint": "yarn run -T eslint", + "concurrently": "yarn run -T concurrently", + "jest": "yarn run -T jest" + }, + "packageManager": "yarn@3.2.1", + "dependencies": { + "@segment/analytics-core": "1.0.1", + "tslib": "^2.4.0" + } +} diff --git a/packages/plugin-validation/src/__tests__/index.test.ts b/packages/plugin-validation/src/__tests__/index.test.ts new file mode 100644 index 000000000..b2ad3ae46 --- /dev/null +++ b/packages/plugin-validation/src/__tests__/index.test.ts @@ -0,0 +1,81 @@ +import { validation } from '..' +import { CoreContext, CoreSegmentEvent } from '@segment/analytics-core' + +const validEvent: CoreSegmentEvent = { + type: 'track', + anonymousId: 'abc', + event: 'test', + properties: {}, + traits: {}, +} + +describe('validation', () => { + ;['track', 'identify', 'group', 'page', 'alias'].forEach((method) => { + describe(method, () => { + it('validates that the `event` exists', async () => { + const val = async () => + // @ts-ignore + validation[method]( + // @ts-ignore + new CoreContext() + ) + + await expect(val()).rejects.toMatchInlineSnapshot( + `[Error: Event is missing]` + ) + }) + + it('validates that `event.event` exists', async () => { + const val = async () => + // @ts-ignore + validation[method]( + new CoreContext({ + ...validEvent, + event: undefined, + }) + ) + + if (method === 'track') { + await expect(val()).rejects.toMatchInlineSnapshot( + `[Error: Event is not a string]` + ) + } + }) + + it('validates that `properties` or `traits` are objects', async () => { + if (method === 'alias') { + return + } + const val = async () => + // @ts-ignore + validation[method]( + new CoreContext({ + ...validEvent, + properties: undefined, + traits: undefined, + }) + ) + + await expect(val()).rejects.toMatchInlineSnapshot( + `[Error: properties is not an object]` + ) + }) + + it('validates that it contains an user', async () => { + const val = async () => + // @ts-ignore + validation[method]( + new CoreContext({ + ...validEvent, + userId: undefined, + anonymousId: undefined, + }) + ) + + await expect(val()).rejects.toMatchInlineSnapshot( + `[Error: Missing userId or anonymousId]` + ) + }) + }) + }) +}) diff --git a/packages/plugin-validation/src/index.ts b/packages/plugin-validation/src/index.ts new file mode 100644 index 000000000..bbb89f8c2 --- /dev/null +++ b/packages/plugin-validation/src/index.ts @@ -0,0 +1,60 @@ +import { + CoreContext, + isString, + isPlainObject, + hasUser, + CorePlugin, +} from '@segment/analytics-core' + +class ValidationError extends Error { + field: string + + constructor(field: string, message: string) { + super(message) + this.field = field + } +} + +function validate(ctx: CoreContext): CoreContext { + const eventType: unknown = ctx && ctx.event && ctx.event.type + const event = ctx.event + + if (event === undefined) { + throw new ValidationError('event', 'Event is missing') + } + + if (!isString(eventType)) { + throw new ValidationError('event', 'Event is not a string') + } + + if (eventType === 'track' && !isString(event.event)) { + throw new ValidationError('event', 'Event is not a string') + } + + const props = event.properties ?? event.traits + if (eventType !== 'alias' && !isPlainObject(props)) { + throw new ValidationError('properties', 'properties is not an object') + } + + if (!hasUser(event)) { + throw new ValidationError('userId', 'Missing userId or anonymousId') + } + + return ctx +} + +export const validation: CorePlugin = { + name: 'Event Validation', + type: 'before', + version: '1.0.0', + + isLoaded: () => true, + load: () => Promise.resolve(), + + track: validate, + identify: validate, + page: validate, + alias: validate, + group: validate, + screen: validate, +} diff --git a/packages/plugin-validation/tsconfig.build.json b/packages/plugin-validation/tsconfig.build.json new file mode 100644 index 000000000..19eb743ae --- /dev/null +++ b/packages/plugin-validation/tsconfig.build.json @@ -0,0 +1,17 @@ +{ + "extends": "./tsconfig.json", + "include": ["src"], + "exclude": ["**/__tests__/**", "**/*.test.*"], + "compilerOptions": { + "incremental": true, + "outDir": "./dist/esm", + "importHelpers": true, + // publish sourceMaps + "sourceMap": true, + // publish declarationMaps (enable go-to-definition in IDE) + "declarationMap": true, + // add type declarations to "types" folder + "declaration": true, + "declarationDir": "./dist/types" + } +} diff --git a/packages/plugin-validation/tsconfig.json b/packages/plugin-validation/tsconfig.json new file mode 100644 index 000000000..9936980ac --- /dev/null +++ b/packages/plugin-validation/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../tsconfig.json", + "exclude": ["node_modules", "dist"], + "compilerOptions": { + "module": "esnext", + "target": "ES5", + "moduleResolution": "node", + "lib": ["es2020"] + } +} diff --git a/turbo.json b/turbo.json index 07bbd0eb3..35ad31a22 100644 --- a/turbo.json +++ b/turbo.json @@ -3,29 +3,19 @@ "pipeline": { "build": { "dependsOn": ["^build"], + "inputs": ["src/**/*.tsx", "src/**/*.ts", ":!__tests__/**"], "outputs": ["dist/**", ".next/**"] }, - "build:packages": { - "dependsOn": [ - "@segment/analytics-next#build", - "@segment/analytics-node#build", - "@segment/analytics-core#build" - ], - "outputs": ["dist/**"] - }, "test": { - "dependsOn": ["build:packages"], - "outputs": [], "inputs": ["src/**", "test*/**"] }, "watch": { "cache": false, - "dependsOn": ["build:packages"] + "outputs": ["dist/**"] }, "lint": { - "dependsOn": ["build:packages"], - "outputs": [], - "inputs": ["**/*.ts", "**/*.tsx", "**/*.js"] + "inputs": ["**/*.ts", "**/*.tsx", "**/*.js"], + "outputs": [] } } } diff --git a/yarn.lock b/yarn.lock index 499fb879c..da4c30b62 100644 --- a/yarn.lock +++ b/yarn.lock @@ -939,6 +939,22 @@ __metadata: languageName: node linkType: hard +"@internal/config@0.0.0, @internal/config@workspace:^, @internal/config@workspace:internal/config": + version: 0.0.0-use.local + resolution: "@internal/config@workspace:internal/config" + languageName: unknown + linkType: soft + +"@internal/core-integration-tests@workspace:packages/core-integration-tests": + version: 0.0.0-use.local + resolution: "@internal/core-integration-tests@workspace:packages/core-integration-tests" + dependencies: + "@internal/config": "workspace:^" + "@segment/analytics-core": "workspace:^" + "@segment/analytics-next": "workspace:^" + languageName: unknown + linkType: soft + "@istanbuljs/load-nyc-config@npm:^1.0.0": version: 1.1.0 resolution: "@istanbuljs/load-nyc-config@npm:1.1.0" @@ -1493,10 +1509,12 @@ __metadata: languageName: node linkType: hard -"@segment/analytics-core@1.0.1, @segment/analytics-core@workspace:packages/core": +"@segment/analytics-core@1.0.1, @segment/analytics-core@workspace:^, @segment/analytics-core@workspace:packages/core": version: 0.0.0-use.local resolution: "@segment/analytics-core@workspace:packages/core" dependencies: + "@lukeed/uuid": ^2.0.0 + dset: ^3.1.2 tslib: ^2.4.0 languageName: unknown linkType: soft @@ -1505,6 +1523,7 @@ __metadata: version: 0.0.0-use.local resolution: "@segment/analytics-next@workspace:packages/browser" dependencies: + "@internal/config": 0.0.0 "@lukeed/uuid": ^2.0.0 "@segment/analytics-core": 1.0.1 "@segment/analytics.js-video-plugins": ^0.2.1 @@ -1562,12 +1581,24 @@ __metadata: version: 0.0.0-use.local resolution: "@segment/analytics-node@workspace:packages/node" dependencies: + "@internal/config": 0.0.0 + "@segment/analytics-core": 1.0.1 + "@segment/analytics-plugin-validation": 0.0.0 "@types/node": ^12.12.14 node-fetch: ^2.6.7 tslib: ^2.4.0 languageName: unknown linkType: soft +"@segment/analytics-plugin-validation@0.0.0, @segment/analytics-plugin-validation@workspace:packages/plugin-validation": + version: 0.0.0-use.local + resolution: "@segment/analytics-plugin-validation@workspace:packages/plugin-validation" + dependencies: + "@segment/analytics-core": 1.0.1 + tslib: ^2.4.0 + languageName: unknown + linkType: soft + "@segment/analytics.js-video-plugins@npm:^0.2.1": version: 0.2.1 resolution: "@segment/analytics.js-video-plugins@npm:0.2.1"