Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/process batch v2 #10

6 changes: 3 additions & 3 deletions src/idl/coproc_idl.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"namespace": "coproc",
"service_name": "management",
"service_name": "script_manager",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't we agree on deleting src/idl or maybe using symlinks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right, we agree to do that, but no in this series.

"includes": [
"coproc/types.h"
],
Expand All @@ -9,12 +9,12 @@
{
"name": "enable_copros",
"input_type": "enable_copros_request",
"output_type": "enable_copros_response"
"output_type": "enable_copros_reply"
},
{
"name": "disable_copros",
"input_type": "disable_copros_request",
"output_type": "disable_copros_response"
"output_type": "disable_copros_reply"
}
]
}
15 changes: 15 additions & 0 deletions src/idl/process_batch_coproc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"namespace": "coproc",
"service_name": "supervisor",
"includes": [
"coproc/types.h"
],
"js_include": "../../domain/generatedRpc/generatedClasses",
"methods": [
{
"name": "process_batch",
"input_type": "process_batch_request",
"output_type": "process_batch_reply"
}
]
}
5 changes: 5 additions & 0 deletions src/js/generate-entries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ cd "$root"/tools/ts-generator/rpc &&
python rpc_gen_js.py \
--server-define-file "$root"/src/idl/coproc_idl.json \
--output-file "$root"/src/js/modules/rpc/serverAndClients/server.ts

cd "$root"/tools/ts-generator/rpc &&
python rpc_gen_js.py \
--server-define-file "$root"/src/idl/process_batch_coproc.json \
--output-file "$root"/src/js/modules/rpc/serverAndClients/processBatch.ts
27 changes: 20 additions & 7 deletions src/js/modules/domain/generatedRpc/enableDisableCoproc.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
},
{
"name": "compression",
"type": "int8"
"type": "uint8"
},
{
"name": "payloadSize",
Expand All @@ -33,6 +33,19 @@
}
]
},
{
"className": "TopicEnableRequest",
"fields": [
{
"name": "topic",
"type": "string"
},
{
"name": "injectionPolicy",
"type": "int8"
}
]
},
{
"className": "EnableCoprocRequest",
"fields": [
Expand All @@ -42,7 +55,7 @@
},
{
"name": "topics",
"type": "Array<string>"
"type": "Array<TopicEnableRequest>"
}
]
},
Expand All @@ -56,7 +69,7 @@
]
},
{
"className": "EnableCoprocResponse",
"className": "EnableCoprocReply",
"fields": [
{
"name": "id",
Expand All @@ -69,16 +82,16 @@
]
},
{
"className": "EnableCoprosResponse",
"className": "EnableCoprosReply",
"fields": [
{
"name": "inputs",
"type": "Array<EnableCoprocResponse>"
"type": "Array<EnableCoprocReply>"
}
]
},
{
"className": "DisableCoprosResponse",
"className": "DisableCoprosReply",
"fields": [
{
"name": "inputs",
Expand All @@ -96,4 +109,4 @@
]
}
]
}
}
86 changes: 67 additions & 19 deletions src/js/modules/domain/generatedRpc/entitiesDefinition.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,6 @@
}
]
},
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't think it makes sense to check in generated code, it makes the PR bigger for no reason

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm I don't understand your comment, but I need to redefine this entity, for that reason I remove it but I create again after in the bottom file ln:212

"className": "ProcessBatchRequest",
"fields": [
{
"name": "npt",
"type": "Ntp"
},
{
"name": "recordBatch",
"type": "RecordBatch"
}
]
},
{
"className": "Ntp",
"fields": [
Expand All @@ -72,7 +59,8 @@
},
{
"name": "records",
"type": "Array<Record>"
"type": "Array<Record>",
"size": "header.recordCount"
}
]
},
Expand Down Expand Up @@ -130,6 +118,14 @@
{
"name": "recordCount",
"type": "int32"
},
{
"name": "term",
"type": "int64"
},
{
"name": "isCompressed",
"type": "int8"
}
]
},
Expand All @@ -138,31 +134,31 @@
"fields": [
{
"name": "length",
"type": "varint"
"type": "uint32"
},
{
"name": "attributes",
"type": "int8"
},
{
"name": "timestampDelta",
"type": "varint"
"type": "int64"
},
{
"name": "offsetDelta",
"type": "varint"
"type": "int32"
},
{
"name": "keyLength",
"type": "varint"
"type": "int32"
},
{
"name": "key",
"type": "buffer"
},
{
"name": "valueLen",
"type": "varint"
"type": "int32"
},
{
"name": "value",
Expand Down Expand Up @@ -194,6 +190,58 @@
"type": "buffer"
}
]
},
{
"className": "ProcessBatchRequestItem",
"fields": [
{
"name": "coprocessorIds",
"type": "Array<uint64>"
},
{
"name": "ntp",
"type": "Ntp"
},
{
"name": "recordBatch",
"type": "Array<RecordBatch>"
}
]
},
{
"className": "ProcessBatchRequest",
"fields": [
{
"name": "requests",
"type": "Array<ProcessBatchRequestItem>"
}
]
},
{
"className": "ProcessBatchReplyItem",
"fields": [
{
"name": "coprocessorId",
"type": "uint64"
},
{
"name": "ntp",
"type": "Ntp"
},
{
"name": "resultRecordBatch",
"type": "Array<RecordBatch>"
}
]
},
{
"className": "ProcessBatchReply",
"fields": [
{
"name": "result",
"type": "Array<ProcessBatchReplyItem>"
}
]
}
]
}
12 changes: 7 additions & 5 deletions src/js/modules/public/Coprocessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ interface RecordBatchHeader {
producerEpoch: number;
baseSequence: number;
recordCount: number;
term: bigint;
isCompressed: number;
}

interface Record {
length: bigint;
length: number;
attributes: number;
timestampDelta: bigint;
offsetDelta: bigint;
keyLength: bigint;
offsetDelta: number;
keyLength: number;
key: Buffer;
valueLen: bigint;
valueLen: number;
value: Buffer;
headers: Array<RecordHeader>;
}
Expand All @@ -55,7 +57,7 @@ interface Coprocessor {
inputTopics: string[];
policyError: PolicyError;
globalId: bigint;
apply(record: RecordBatch): RecordBatch;
apply: (record: RecordBatch) => Map<string, RecordBatch>;
}

export { RecordBatchHeader, RecordHeader, Record, RecordBatch, Coprocessor };
4 changes: 2 additions & 2 deletions src/js/modules/public/SimpleTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class SimpleTransform implements Coprocessor {
inputTopics: string[];
policyError: PolicyError;

apply(record: RecordBatch): RecordBatch {
apply = (record: RecordBatch): Map<string, RecordBatch> => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have an explicit named type for topic, so the code is self documenting: Map<topic, RecordBatch>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

throw Error("processRecord isn't implemented yet");
}
};
}
10 changes: 6 additions & 4 deletions src/js/modules/public/Utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const createHeader = (
recordBatchType: 0,
recordCount: 0,
sizeBytes: 0,
term: BigInt(0),
isCompressed: 0,
...header,
};
};
Expand All @@ -43,12 +45,12 @@ const createRecord = (record: Partial<Record>): Record => {
return {
attributes: 0,
key: Buffer.from(""),
keyLength: BigInt(0),
length: BigInt(0),
offsetDelta: BigInt(0),
keyLength: 0,
length: 0,
offsetDelta: 0,
timestampDelta: BigInt(0),
value: Buffer.from(""),
valueLen: BigInt(0),
valueLen: 0,
...record,
headers: headers.map(createRecordHeader),
};
Expand Down
Loading