Skip to content

Commit

Permalink
Validation support for properties kafka file
Browse files Browse the repository at this point in the history
Fixes jlandersen#152

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and fbricon committed Apr 15, 2021
1 parent 9e18baa commit 161f21a
Show file tree
Hide file tree
Showing 20 changed files with 1,172 additions and 167 deletions.
8 changes: 8 additions & 0 deletions docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ Completion is available for

![Topic completion](assets/kafka-file-consumer-topic-completion.png)

#### Validation

Validation will help you write valid consumers in .kafka files.

Here is an example of topic validation:

![Topic syntax validation](assets/kafka-file-consumer-topic-syntax-validation.png)

### Start Consumer command

![Start Consumer from command palette](assets/start-consumer-from-command.png)
Expand Down
8 changes: 8 additions & 0 deletions docs/Producing.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ Completion is available for

![Topic completion](assets/kafka-file-producer-topic-completion.png)

### Validation

Validation will help you write valid producers in .kafka files.

Here is an example of value validation:

![Empty value](assets/kafka-file-producer-empty-value-validation.png)

## Randomized content

Record content can be randomized by injecting mustache-like placeholders of [faker.js properties](https://github.com/Marak/faker.js#api-methods), like ``{{name.lastName}}`` or ``{{random.number}}``. Some randomized properties can be localized via the `kafka.producers.fakerjs.locale` setting.
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 3 additions & 20 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import * as vscode from "vscode";

import { pickClient, pickConsumerGroupId, pickTopic } from "./common";
import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, parsePartitions, ConsumerLaunchState } from "../client";
import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, ConsumerLaunchState } from "../client";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";
import { ProgressLocation, window } from "vscode";
import { getErrorMessage } from "../errors";
import { ConsumerValidator } from "../validators/consumer";

export interface LaunchConsumerCommand extends ConsumerInfoUri {

Expand Down Expand Up @@ -63,8 +64,7 @@ abstract class LaunchConsumerCommandHandler {
}

// Validate start command
validatePartitions(command.partitions);
validateOffset(command.fromOffset);
ConsumerValidator.validate(command);

// Open the document which tracks consumer messages.
const consumeUri = createConsumerUri(command);
Expand Down Expand Up @@ -112,23 +112,6 @@ export class StopConsumerCommandHandler extends LaunchConsumerCommandHandler {
}
}

// Validates the optional partitions expression of a consumer start command.
// An absent/empty value is valid (it means "consume from all partitions").
// Otherwise this relies on parsePartitions to throw on a malformed
// expression — the parsed result itself is discarded; only the side effect
// (throw vs. no throw) matters here.
function validatePartitions(partitions?: string) {
    if (!partitions) {
        return;
    }
    parsePartitions(partitions);
}

// Validates the optional "from" offset of a consumer start command.
// Accepted values: absent/empty, the keywords 'earliest' / 'latest',
// or a non-negative integer. Throws on anything else.
function validateOffset(offset?: string) {
    const isKeyword = offset === 'earliest' || offset === 'latest';
    if (!offset || isKeyword) {
        return;
    }
    const numericOffset = parseInt(offset, 10);
    const isValidNumber = !isNaN(numericOffset) && numericOffset >= 0;
    if (!isValidNumber) {
        throw new Error(`from must be a positive number or equal to 'earliest' or 'latest'.`);
    }
}

export class ToggleConsumerCommandHandler {

public static commandId = 'vscode-kafka.consumer.toggle';
Expand Down
79 changes: 44 additions & 35 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";
import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer";
import { ProducerRecord } from "kafkajs";
import { ProducerValidator } from "../validators/producer";
import { getErrorMessage } from "../errors";

export interface ProduceRecordCommand extends ProducerInfoUri {
messageKeyFormat?: MessageFormat;
Expand All @@ -35,47 +37,54 @@ export class ProduceRecordCommandHandler {
return;
}

const { topicId, key, value } = command;
const channel = this.channelProvider.getChannel("Kafka Producer Log");
if (topicId === undefined) {
channel.appendLine("No topic");
return;
}
if (this.settings.producerFakerJSEnabled) {
faker.setLocale(this.settings.producerFakerJSLocale);
}
try {
ProducerValidator.validate(command);

const messages = [...Array(times).keys()].map(() => {
const { topicId, key, value } = command;
const channel = this.channelProvider.getChannel("Kafka Producer Log");
if (topicId === undefined) {
channel.appendLine("No topic");
return;
}
if (this.settings.producerFakerJSEnabled) {
//Use same seed for key and value so we can generate content like
// key: customer-{{random.uuid}} // same value as in id
// {"id": "{{random.uuid}}"} // same value as in key
const seed = Math.floor(Math.random() * 1000000);
faker.seed(seed);
const randomizedKey = (key) ? faker.fake(key) : key;
faker.seed(seed);
const randomizedValue = faker.fake(value);
faker.setLocale(this.settings.producerFakerJSLocale);
}

const messages = [...Array(times).keys()].map(() => {
if (this.settings.producerFakerJSEnabled) {
//Use same seed for key and value so we can generate content like
// key: customer-{{random.uuid}} // same value as in id
// {"id": "{{random.uuid}}"} // same value as in key
const seed = Math.floor(Math.random() * 1000000);
faker.seed(seed);
const randomizedKey = (key) ? faker.fake(key) : key;
faker.seed(seed);
const randomizedValue = faker.fake(value);
return {
key: serialize(randomizedKey, command.messageKeyFormat),
value: serialize(randomizedValue, command.messageValueFormat)
};
}

// Return key/value message as-is
return {
key: serialize(randomizedKey, command.messageKeyFormat),
value: serialize(randomizedValue, command.messageValueFormat)
key: serialize(key, command.messageKeyFormat),
value: serialize(value, command.messageValueFormat)
};
}
});

// Return key/value message as-is
return {
key: serialize(key, command.messageKeyFormat),
value: serialize(value, command.messageValueFormat)
command.clusterId = client.cluster.id;
const producerUri = createProducerUri(command);
const record = {
topic: topicId,
messages: messages,
};
});

command.clusterId = client.cluster.id;
const producerUri = createProducerUri(command);
const record = {
topic: topicId,
messages: messages,
};
// Start the producer
await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer);
// Start the producer
await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer);
}
catch (e) {
vscode.window.showErrorMessage(`Error while producing: ${getErrorMessage(e)}`);
}
}
}

Expand Down
85 changes: 71 additions & 14 deletions src/kafka-file/kafkaFileClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ConsumerLaunchStateProvider, getLanguageService, LanguageService, Produ
import { runSafeAsync } from "./utils/runner";
import { TopicItem } from "../explorer";
import { KafkaModelProvider } from "../explorer/models/kafka";
import { ThrottledDelayer } from "./utils/async";

export function startLanguageClient(
clusterSettings: ClusterSettings,
Expand All @@ -25,20 +26,6 @@ export function startLanguageClient(
// Create the Kafka file language service.
const languageService = createLanguageService(clusterSettings, producerCollection, consumerCollection, modelProvider);

// Open / Close document
context.subscriptions.push(vscode.workspace.onDidOpenTextDocument(e => {
if (e.languageId === 'kafka') {
openedDocuments.set(e.uri.toString(), e);
}
}));

context.subscriptions.push(vscode.workspace.onDidCloseTextDocument(e => {
if (e.languageId === 'kafka') {
openedDocuments.delete(e.uri.toString());
kafkaFileDocuments.onDocumentRemoved(e);
}
}));

const documentSelector = [
{ language: "kafka", scheme: "file" },
{ language: "kafka", scheme: "untitled" },
Expand Down Expand Up @@ -70,6 +57,32 @@ export function startLanguageClient(
new KafkaFileCompletionItemProvider(kafkaFileDocuments, languageService),
':', '{', '.'));

// Validation
const diagnostics = new KafkaFileDiagnostics(kafkaFileDocuments, languageService);
context.subscriptions.push(diagnostics);

// Open / Close document
context.subscriptions.push(vscode.workspace.onDidOpenTextDocument(e => {
if (e.languageId === 'kafka') {
openedDocuments.set(e.uri.toString(), e);
diagnostics.triggerValidate(e);
}
}));

context.subscriptions.push(vscode.workspace.onDidChangeTextDocument(e => {
if (e.document.languageId === 'kafka') {
diagnostics.triggerValidate(e.document);
}
}));

context.subscriptions.push(vscode.workspace.onDidCloseTextDocument(e => {
if (e.languageId === 'kafka') {
openedDocuments.delete(e.uri.toString());
kafkaFileDocuments.onDocumentRemoved(e);
diagnostics.delete(e);
}
}));

return {
dispose() {
kafkaFileDocuments.dispose();
Expand Down Expand Up @@ -173,3 +186,47 @@ class KafkaFileCompletionItemProvider extends AbstractKafkaFileFeature implement

}

/**
 * Publishes validation diagnostics for .kafka documents.
 *
 * Each document gets its own ThrottledDelayer (250 ms) so rapid edits
 * collapse into a single validation pass per document.
 */
class KafkaFileDiagnostics extends AbstractKafkaFileFeature implements vscode.Disposable {

    private diagnosticCollection: vscode.DiagnosticCollection;
    // Per-document delayers, keyed by document URI. Initialized eagerly so no
    // non-null assertions are needed at the use sites.
    private delayers: { [key: string]: ThrottledDelayer<void> } = Object.create(null);

    constructor(
        kafkaFileDocuments: LanguageModelCache<KafkaFileDocument>,
        languageService: LanguageService
    ) {
        super(kafkaFileDocuments, languageService);
        this.diagnosticCollection = vscode.languages.createDiagnosticCollection('kafka');
    }

    /**
     * Removes all state for a closed document: its published diagnostics and
     * its pending delayer (previously the delayer entry was leaked).
     */
    delete(textDocument: vscode.TextDocument) {
        const key = textDocument.uri.toString();
        delete this.delayers[key];
        this.diagnosticCollection.delete(textDocument.uri);
    }

    /**
     * Schedules a (debounced) validation of the given document.
     */
    public triggerValidate(textDocument: vscode.TextDocument): void {
        const key = textDocument.uri.toString();
        let delayer = this.delayers[key];
        if (!delayer) {
            delayer = new ThrottledDelayer<void>(250);
            this.delayers[key] = delayer;
        }
        delayer.trigger(() => this.doValidate(textDocument));
    }

    /**
     * Runs the language service diagnostics synchronously and publishes the
     * result. Returns a resolved promise to satisfy the delayer's task type.
     */
    private doValidate(document: vscode.TextDocument): Promise<void> {
        const kafkaFileDocument = this.getKafkaFileDocument(document);
        const diagnostics = this.languageService.doDiagnostics(document, kafkaFileDocument);
        this.diagnosticCollection.set(document.uri, diagnostics);
        return Promise.resolve();
    }

    dispose(): void {
        this.delayers = Object.create(null);
        this.diagnosticCollection.clear();
        this.diagnosticCollection.dispose();
    }
}
23 changes: 17 additions & 6 deletions src/kafka-file/languageservice/kafkaFileLanguageService.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { CodeLens, CompletionList, Position, TextDocument, Uri } from "vscode";
import { CodeLens, CompletionList, Diagnostic, Position, TextDocument, Uri } from "vscode";
import { ConsumerLaunchState } from "../../client";
import { ProducerLaunchState } from "../../client/producer";
import { KafkaFileDocument, parseKafkaFile } from "./parser/kafkaFileParser";
import { KafkaFileCodeLenses } from "./services/codeLensProvider";
import { KafkaFileCompletion } from "./services/completion";
import { KafkaFileDiagnostics } from "./services/diagnostics";

/**
* Provider API which gets the state for a given producer.
Expand Down Expand Up @@ -70,7 +71,15 @@ export interface LanguageService {
* @param kafkaFileDocument the parsed AST.
* @param position the position where the completion was triggered.
*/
doComplete(document: TextDocument, kafkaFileDocument: KafkaFileDocument, position: Position): Promise<CompletionList | undefined>
doComplete(document: TextDocument, kafkaFileDocument: KafkaFileDocument, position: Position): Promise<CompletionList | undefined>;

/**
* Returns the diagnostics result for the given text document and parsed AST.
*
* @param document the text document.
* @param kafkaFileDocument the parsed AST.
*/
doDiagnostics(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Diagnostic[];
}

/**
Expand All @@ -83,11 +92,13 @@ export interface LanguageService {
*/
export function getLanguageService(producerLaunchStateProvider: ProducerLaunchStateProvider, consumerLaunchStateProvider: ConsumerLaunchStateProvider, selectedClusterProvider: SelectedClusterProvider, topicProvider: TopicProvider): LanguageService {

const kafkaFileCodeLenses = new KafkaFileCodeLenses(producerLaunchStateProvider, consumerLaunchStateProvider, selectedClusterProvider);
const kafkaFileCompletion = new KafkaFileCompletion(selectedClusterProvider, topicProvider);
const codeLenses = new KafkaFileCodeLenses(producerLaunchStateProvider, consumerLaunchStateProvider, selectedClusterProvider);
const completion = new KafkaFileCompletion(selectedClusterProvider, topicProvider);
const diagnostics = new KafkaFileDiagnostics();
return {
parseKafkaFileDocument: (document: TextDocument) => parseKafkaFile(document),
getCodeLenses: kafkaFileCodeLenses.getCodeLenses.bind(kafkaFileCodeLenses),
doComplete: kafkaFileCompletion.doComplete.bind(kafkaFileCompletion)
getCodeLenses: codeLenses.getCodeLenses.bind(codeLenses),
doComplete: completion.doComplete.bind(completion),
doDiagnostics: diagnostics.doDiagnostics.bind(diagnostics)
};
}
Loading

0 comments on commit 161f21a

Please sign in to comment.