Skip to content

Commit

Permalink
feat(clickhouse): make telementry send interval configurable (#40)
Browse files Browse the repository at this point in the history
Co-authored-by: kseniyakuzina <[email protected]>
  • Loading branch information
kseniya57 and kseniyakuzina authored Oct 16, 2023
1 parent 230911a commit cd4a9eb
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ To configure standard telemetry it is enough to provide following option:
, other options:
`appTelemetryChPort` – default 8443
`appTelemetryChTables` – object with description of custom tables (see explanation below)
`appTelemetryChSendInterval` – interval sending batch requests in milliseconds (default 3s)
`appTelemetryChBatchSize` – count of rows to send within a batch requests (default 30)
`appTelemetryChBacklogSize` – queue size (default 500)

Expand Down
9 changes: 8 additions & 1 deletion src/lib/telemetry/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import axios, {AxiosError} from 'axios';

import {Dict, TelemetryClickhouseTableDescription} from '../../types';
import type {AppContext} from '../context';
import {DEFAULT_BACKLOG_SIZE, DEFAULT_BATCH_SIZE, prepareBatchedQueue} from '../utils/batch';
import {
DEFAULT_BACKLOG_SIZE,
DEFAULT_BATCH_SIZE,
DEFAULT_TICK_INTERVAL,
prepareBatchedQueue,
} from '../utils/batch';

function escape(input = '') {
return input.replace(/\\/g, '\\').replace(/'/g, "\\'");
Expand Down Expand Up @@ -52,6 +57,7 @@ export function prepareClickhouseClient(ctx: Pick<AppContext, 'config' | 'log' |
const dbName = config.appTelemetryChDatabase;
const tables = Object.assign({}, DEFAULT_TABLES, config.appTelemetryChTables);

const tickInterval = config.appTelemetryChSendInterval || DEFAULT_TICK_INTERVAL;
const batchSize = config.appTelemetryChBatchSize || DEFAULT_BATCH_SIZE;
const backlogSize = config.appTelemetryChBacklogSize || DEFAULT_BACKLOG_SIZE;

Expand Down Expand Up @@ -131,6 +137,7 @@ export function prepareClickhouseClient(ctx: Pick<AppContext, 'config' | 'log' |
}
ctx.logError(message, error, extra);
},
tickInterval,
backlogSize,
batchSize,
});
Expand Down
6 changes: 4 additions & 2 deletions src/lib/utils/batch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type {AppContext} from '../context';

export const TICK_INTERVAL = 3000;
export const DEFAULT_TICK_INTERVAL = 3000;

export const DEFAULT_BACKLOG_SIZE = 500;
export const DEFAULT_BATCH_SIZE = 50;
Expand All @@ -9,6 +9,7 @@ export const DEFAULT_RETRIES_NUMBER = 3;
interface BatchedQueueArgs {
fn: Function;
logError?: AppContext['logError'];
tickInterval?: number;
backlogSize?: number;
batchSize?: number;
retriesNumber?: number;
Expand All @@ -28,6 +29,7 @@ export function prepareBatchedQueue({
fn,
// eslint-disable-next-line no-console
logError = console.error,
tickInterval = DEFAULT_TICK_INTERVAL,
backlogSize = DEFAULT_BACKLOG_SIZE,
batchSize = DEFAULT_BATCH_SIZE,
retriesNumber = DEFAULT_RETRIES_NUMBER,
Expand Down Expand Up @@ -81,7 +83,7 @@ export function prepareBatchedQueue({
const tickIntervalTimer = setInterval(() => {
cleanup();
send();
}, TICK_INTERVAL);
}, tickInterval);
process.on('SIGTERM', () => clearInterval(tickIntervalTimer));

function getBacklogSize() {
Expand Down
18 changes: 9 additions & 9 deletions src/tests/batch.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

import {TICK_INTERVAL, prepareBatchedQueue} from '../lib/utils/batch';
import {DEFAULT_TICK_INTERVAL, prepareBatchedQueue} from '../lib/utils/batch';

jest.useFakeTimers({legacyFakeTimers: true});

Expand Down Expand Up @@ -43,7 +43,7 @@ it('successfully sends payloads', async () => {

messages.forEach(push);

jest.advanceTimersByTime(TICK_INTERVAL);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL);

expect(sum(getSent())).toEqual(correctSum);
});
Expand All @@ -61,15 +61,15 @@ it('correctly manages backlog of messages', async () => {
messages.forEach(push);
expect(getBacklogSize()).toEqual(150);

jest.advanceTimersByTime(TICK_INTERVAL);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL);
expect(getBacklogSize()).toEqual(100);
expect(sum(getSent())).toEqual(50);

jest.advanceTimersByTime(TICK_INTERVAL);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL);
expect(getBacklogSize()).toEqual(50);
expect(sum(getSent())).toEqual(100);

jest.advanceTimersByTime(TICK_INTERVAL);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL);
expect(getBacklogSize()).toEqual(0);
expect(sum(getSent())).toEqual(150);
});
Expand All @@ -84,14 +84,14 @@ it('retries to send failed payloads', async () => {
setSendToFail();
messages.forEach(push);

jest.advanceTimersByTime(TICK_INTERVAL);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL);
expect(getSent().length).toEqual(0);

// advanceTimersByTime не работает с промисами
await flushPromises();

setSendToSucceed();
jest.advanceTimersByTime(TICK_INTERVAL);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL);
expect(sum(getSent())).toEqual(correctSum);
});

Expand All @@ -104,7 +104,7 @@ it('does not retry more than three times', async () => {
setSendToFail();
messages.forEach(push);

jest.advanceTimersByTime(TICK_INTERVAL * 5);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL * 5);
expect(getSent().length).toEqual(0);
expect(getBacklogSize()).toEqual(0);
});
Expand All @@ -123,6 +123,6 @@ it('does not overflow backlog', async () => {
messages.forEach(push);

// Ждем подольше, чтобы успели отработать все батчи
jest.advanceTimersByTime(TICK_INTERVAL * 5);
jest.advanceTimersByTime(DEFAULT_TICK_INTERVAL * 5);
expect(sum(getSent())).toEqual(LIMITED_BACKLOG_SIZE);
});
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export interface AppConfig {
appTelemetryChAuth?: string;
appTelemetryChDatabase?: string;
appTelemetryChTables?: {[name: string]: {[name: string]: 'number' | 'string' | 'timestamp'}};
appTelemetryChSendInterval?: number;
appTelemetryChBatchSize?: number;
appTelemetryChBacklogSize?: number;
appTelemetryChMirrorToLogs?: boolean;
Expand Down

0 comments on commit cd4a9eb

Please sign in to comment.