Skip to content

Commit

Permalink
Merge pull request #400 from ydb-platform/topic
Browse files Browse the repository at this point in the history
Add topic write retrier, test reader and continues reauth
  • Loading branch information
Alexey Zorkaltsev authored Sep 30, 2024
2 parents b09086f + bc22f6d commit d82ac26
Show file tree
Hide file tree
Showing 48 changed files with 1,506 additions and 786 deletions.
3 changes: 2 additions & 1 deletion .env.dev.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
YDB_ANONYMOUS_CREDENTIALS=1
YDB_SSL_ROOT_CERTIFICATES_FILE=../slo-tests/playground/data/ydb_certs/ca.pem
YDB_ENDPOINT=grpc://<localhost / fqdn / ip-address>:<2135 / 2136>
YDB_ENDPOINT=grpc://localhost:2136
YDB_LOG_LEVEL=debug
YDB_DETAILED_TRACE_STACK=true
1 change: 1 addition & 0 deletions examples/topic-service-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Due to a problem with a reference to json - TEMPORARY example is as md-file
70 changes: 70 additions & 0 deletions examples/topic-service-example/index.ts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {Driver as YDB} from '../../src';
import {AnonymousAuthService} from "../../src/credentials/anonymous-auth-service";
import {Ydb} from "ydb-sdk-proto";
import {SimpleLogger} from "../../src/logger/simple-logger";
import {Context} from "../../src/context";

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function main() {
const db = new YDB({
endpoint: ENDPOINT,
database: DATABASE,
authService: new AnonymousAuthService(),
logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
await db.topic.createTopic({
path: 'demoTopic',
consumers: [{
name: 'demo',
}],
});
const writer = await db.topic.createWriter({
path: 'demoTopic',
// producerId: '...', // will be genereted automatically
// messageGroupId: '...' // will be the same as producerId
getLastSeqNo: true, // seqNo will be assigned automatically
});
await writer.sendMessages({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from('Hello, world'),
uncompressedSize: 'Hello, world'.length,
}],
});
const promises = [];
for (let n = 0; n < 4; n++) {
// ((writer as any).innerWriteStream as TopicWriteStreamWithEvents).close(Context.createNew().ctx, new Error('Fake error'));

// await sleep(3000); // TODO:

promises.push(writer.sendMessages({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from(`Message N${n}`),
uncompressedSize: `Message N${n}`.length,
}],
}));
}
await Promise.all(promises);
const reader = await db.topic.createReader(Context.createNew({
timeout: 3000,
}).ctx, {
topicsReadSettings: [{
path: 'demoTopic',
}],
consumer: 'demo',
receiveBufferSizeInBytes: 10_000_000,
});
for await (const message of reader.messages) {
console.info(`Message: ${message.data!.toString()}`);
await message.commit();
}
await reader.close(); // graceful close() - complete when all messages are commited
}

main();
1 change: 1 addition & 0 deletions jest.config.dev.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ module.exports = {
},
testRegex: '(/__tests__/.*|(\\.|/)(test|spec))\\.tsx?$',
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
noStackTrace: true,
}
6 changes: 3 additions & 3 deletions slo-workload/DEV.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ in the _slo-workload_ folder

### Create the test database

`npx ts-node src/index.ts create grpcs://localhost:2135 local`
`npx ts-node src/index.ts.md create grpcs://localhost:2135 local`

### Run the test - for 5 min

`npx ts-node src/index.ts run grpcs://localhost:2135 local`
`npx ts-node src/index.ts.md run grpcs://localhost:2135 local`

### Clean the baseClean the base

`npx ts-node src/index.ts cleanup grpcs://localhost:2135 local`
`npx ts-node src/index.ts.md cleanup grpcs://localhost:2135 local`

### What to do in case of problems

Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {initDriver, destroyDriver} from "../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
Expand Down
4 changes: 4 additions & 0 deletions src/__tests__/e2e/query-service/method-execute.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import DiscoveryService from "../../../discovery/discovery-service";
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
Expand All @@ -10,6 +11,8 @@ import {Context} from "../../../context";
import {ctxSymbol} from "../../../query/symbols";
import StatsMode = Ydb.Query.StatsMode;
import ExecMode = Ydb.Query.ExecMode;
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

Expand Down Expand Up @@ -238,6 +241,7 @@ describe('Query.execute()', () => {
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
retrier: new RetryStrategy(new RetryParameters(), logger),
logger,
});
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/query-service/query-service-client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from "../../../driver";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import * as errors from "../../../errors";
Expand Down
4 changes: 4 additions & 0 deletions src/__tests__/e2e/query-service/rows-conversion.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import DiscoveryService from "../../../discovery/discovery-service";
import {QuerySession, RowType} from "../../../query";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
Expand All @@ -8,6 +9,8 @@ import {Ydb} from "ydb-sdk-proto";
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {ctxSymbol} from "../../../query/symbols";
import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

Expand Down Expand Up @@ -155,6 +158,7 @@ describe('Rows conversion', () => {
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
retrier: new RetryStrategy(new RetryParameters(), logger),
logger,
});

Expand Down
5 changes: 4 additions & 1 deletion src/__tests__/e2e/query-service/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import DiscoveryService from "../../../discovery/discovery-service";
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
Expand All @@ -7,6 +8,8 @@ import * as symbols from "../../../query/symbols";
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {ctxSymbol} from "../../../query/symbols";
import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

Expand Down Expand Up @@ -122,8 +125,8 @@ describe('Query service transactions', () => {
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
retrier: new RetryStrategy(new RetryParameters(), logger),
logger,

});

await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
Expand Down
19 changes: 12 additions & 7 deletions src/__tests__/e2e/retries.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
Unavailable,
Undetermined,
YdbError,
// ExternalError // TODO: Add test for this error
} from '../../errors';
import {retryable, RetryParameters} from '../../retries_obsoleted';
import {Endpoint} from "../../discovery";
Expand All @@ -26,12 +27,14 @@ import {LogLevel, SimpleLogger} from "../../logger/simple-logger";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const MAX_RETRIES = 3;

const logger = new SimpleLogger({level: LogLevel.error});
class ErrorThrower {
constructor(public endpoint: Endpoint) {}

@retryable(
new RetryParameters({maxRetries: 3, backoffCeiling: 3, backoffSlotDuration: 5}),
new RetryParameters({maxRetries: MAX_RETRIES, backoffCeiling: 3, backoffSlotDuration: 5}),
logger,
)
@pessimizable
Expand All @@ -40,6 +43,7 @@ class ErrorThrower {
}
}

// TODO: Remake for new retry policy - no attempts limit, only optional timeout
describe('Retries on errors', () => {
let driver: Driver;

Expand Down Expand Up @@ -71,20 +75,21 @@ describe('Retries on errors', () => {

createError(BadRequest);
createError(InternalError);
createError(Aborted, 3); // have retries
createError(Aborted, MAX_RETRIES); // have retries
createError(Unauthenticated);
createError(Unauthorized);
createError(Unavailable, 3); // have retries
createError(Unavailable, MAX_RETRIES); // have retries
createError(Undetermined); // TODO: have retries for idempotent queries
createError(Overloaded, 3); // have retries
// createError(ExternalError); // TODO: have retries for idempotent queries
createError(Overloaded, MAX_RETRIES); // have retries
createError(SchemeError);
createError(GenericError);
createError(Timeout); // TODO: have retries for idempotent queries
createError(BadSession); // WHY?
createError(PreconditionFailed);
// Transport/Client errors
createError(TransportUnavailable, 3); // TODO: have retries for idempotent queries, BUT now always have retries
createError(ClientResourceExhausted, 3);
createError(ClientDeadlineExceeded, 3);
createError(TransportUnavailable, MAX_RETRIES); // TODO: have retries for idempotent queries, BUT now always have retries
createError(ClientResourceExhausted, MAX_RETRIES);
createError(ClientDeadlineExceeded, MAX_RETRIES);
// TODO: Add EXTERNAL ERROR
});
3 changes: 2 additions & 1 deletion src/__tests__/e2e/table-service/alter-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import { Types } from '../../../types';
import {
Expand Down Expand Up @@ -128,7 +129,7 @@ describe('Alter table', () => {
alterTableDescription.addIndexes = [idxOverTestBool];

await session.alterTable(TABLE_NAME, alterTableDescription);
await new Promise((resolve) => setTimeout(resolve, 200)); // wait 200ms
await new Promise((resolve) => setTimeout(resolve, 1000)); // wait 1000ms
const alteredTableDescription = await session.describeTable(TABLE_NAME);

expect(JSON.stringify(alteredTableDescription.indexes)).toBe(
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/bulk-upsert.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {Ydb} from 'ydb-sdk-proto';
import Driver from '../../../driver';
import {TableSession} from "../../../table";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {declareType, TypedData, Types} from '../../../types';
import {withRetries} from '../../../retries_obsoleted';
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/create-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {TypedValues, Types} from '../../../types';
import Long from 'long';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import http from 'http';
import Driver from "../../../driver";

Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/read-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {TypedValues, TypedData} from '../../../types';
import {ReadTableSettings, TableSession} from "../../../table";
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/scan-query.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {TypedData} from '../../../types';
import {TableSession} from "../../../table";
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/types.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Long from 'long';
import {google, Ydb} from 'ydb-sdk-proto';
import Driver from '../../../driver';
Expand Down
Loading

0 comments on commit d82ac26

Please sign in to comment.