From 3109237b3816d129fc2881952db62ff66409d2d8 Mon Sep 17 00:00:00 2001 From: Nitin Mittal Date: Sat, 12 Aug 2023 22:15:04 +0400 Subject: [PATCH] fix: requested changes --- README.md | 44 +++++------ .../matic_transfer_data_transformer.test.ts | 79 ------------------- .../interfaces/data_transformer_config.ts | 8 -- internal/interfaces/event_producer.ts | 5 -- internal/interfaces/event_transformer.ts | 6 -- internal/interfaces/index.ts | 4 - internal/interfaces/transformer_config.ts | 8 -- public/data_transformation/transform.ts | 35 ++++---- public/interfaces/data_transformer_config.ts | 1 - public/interfaces/event_producer.ts | 6 +- public/interfaces/event_transformer.ts | 7 +- public/interfaces/index.ts | 3 + public/interfaces/transformer_config.ts | 9 ++- .../kafka/consumer/asynchronous_consumer.ts | 4 +- public/kafka/consumer/consume.ts | 62 ++++----------- public/kafka/consumer/synchronous_consumer.ts | 4 +- .../kafka/producer/asynchronous_producer.ts | 12 ++- public/kafka/producer/produce.ts | 44 ++++------- public/kafka/producer/synchronous_producer.ts | 14 ++-- 19 files changed, 109 insertions(+), 246 deletions(-) delete mode 100644 example/transformer/tests/matic_transfer_data_transformer.test.ts delete mode 100644 internal/interfaces/data_transformer_config.ts delete mode 100644 internal/interfaces/event_producer.ts delete mode 100644 internal/interfaces/event_transformer.ts delete mode 100644 internal/interfaces/transformer_config.ts delete mode 100644 public/interfaces/data_transformer_config.ts diff --git a/README.md b/README.md index 2acb845..8f45e23 100644 --- a/README.md +++ b/README.md @@ -11,21 +11,21 @@ ChainFlow, is a powerful framework designed to facilitate the development of fle In today's rapidly evolving blockchain ecosystem, the need for efficient and reliable data processing is paramount. EVM (Ethereum Virtual Machine) blockchains, such as Ethereum itself and its compatible networks, have gained significant traction due to their smart contract capabilities and decentralized nature. However, working with blockchain data at scale and in real time presents unique challenges. -ChainFlow addresses these challenges by providing a comprehensive set of Node.js packages that simplify the development of event-driven data pipelines. With its intuitive design and seamless integration with Kafka, one of the most popular and battle-tested distributed streaming platforms, ChainFlow offers a robust and reliable infrastructure for processing blockchain events efficiently. +ChainFlow addresses these challenges by providing a comprehensive Node.js package that simplifies the development of event-driven data pipelines. With its intuitive design and seamless integration with Kafka, one of the most popular and battle-tested distributed streaming platforms, ChainFlow offers a robust and reliable infrastructure for processing blockchain events efficiently. ## Key Features - **Event-driven Architecture:** ChainFlow embraces the power of event-driven architecture, allowing developers to create pipelines that react to blockchain events in real time. By leveraging this approach, applications built with ChainFlow can easily respond to changes on the blockchain, enabling near-instantaneous data processing. -- **Flexible Data Pipelines:** ChainFlow offers a flexible and extensible framework for building data pipelines that suit your specific needs. 
Developers can easily define their desired data flow, including event filtering, transformation, and aggregation, by utilizing the rich set of ChainFlow's features and packages. +- **Flexible Data Pipelines:** ChainFlow offers a flexible and extensible framework for building data pipelines that suit your specific needs. Developers can easily define their desired data flow, including event filtering, transformation, and aggregation, by utilizing the feature set of ChainFlow. - **Seamless Integration with Kafka:** As the backbone of ChainFlow, Kafka provides the necessary infrastructure for handling high-throughput, fault-tolerant, and scalable data streams. ChainFlow's integration with Kafka ensures reliable data processing and enables seamless interoperability with other Kafka-based systems, further enhancing the versatility of your data pipelines. -- **EVM Blockchain Compatibility:** ChainFlow is specifically designed for EVM blockchains, enabling developers to harness the power of smart contracts and decentralized applications. Whether you are working with Ethereum or any other EVM-compatible blockchain, ChainFlow provides a unified and consistent approach to processing blockchain events across different networks. ChainFlow packages can also be used for other chains with custom implementations. +- **EVM Blockchain Compatibility:** ChainFlow is specifically designed for EVM blockchains, enabling developers to harness the power of smart contracts and decentralized applications. Whether you are working with Ethereum or any other EVM-compatible blockchain, ChainFlow provides a unified and consistent approach to processing blockchain events across different networks. ChainFlow can also be used for other chains with custom implementations of the provided interfaces and abstract classes. - **Extensive Package Ecosystem:** ChainFlow offers a rich ecosystem of Node.js packages that cater to various aspects of building event-driven data pipelines. From connecting to blockchain networks, managing Kafka topics, to implementing data processing logic, the ChainFlow package ecosystem provides a comprehensive toolkit to expedite your development process. With ChainFlow, you can unlock the true potential of EVM blockchains by seamlessly integrating them into your data infrastructure. Whether you are building real-time analytics, decentralized applications, or any other data-driven solution, this documentation will guide you through the intricacies of using ChainFlow's packages and assist you in developing robust and efficient event-driven data pipelines on EVM blockchains. ## Installation -You can install the package using [NPM](https://www.npmjs.com/package/web3) or using [Yarn](https://yarnpkg.com/package/web3) +You can install the package using [NPM](https://www.npmjs.com/package/@maticnetwork/chainflow) or using [Yarn](https://yarnpkg.com/package/@maticnetwork/chainflow) ### Using NPM @@ -41,7 +41,7 @@ yarn add @maticnetwork/chainflow ## Usage -```js +```typescript // Import the chainflow module const chainflow = require('@maticnetwork/chainflow'); ``` @@ -69,6 +69,9 @@ By leveraging the events generated by the transformers, consumers can react in r Together, these three layers form the foundation of ChainFlow, providing a comprehensive framework for building flexible and scalable event-driven data pipelines on EVM blockchains. 
Whether it's building real-time analytics, decentralized applications, or any other data-driven solution, ChainFlow's architecture offers the necessary tools and abstractions to streamline the development process and unlock the full potential of EVM blockchains. +## Examples +To gain a clearer understanding of the entire process, let's consider a straightforward example that involves indexing MATIC transfer events from the Ethereum blockchain. This [example](./example/README.md) encompasses all the layers involved, starting from producers, moving through transformers, and concluding with consumers. + ## Producers ### Block Producers @@ -478,7 +481,7 @@ consume( The **Asynchronous Consumer** class is designed for scenarios where the speed of data consumption takes precedence over potential data loss. If the timely processing of events is critical and the occasional loss of some events is acceptable within defined limits, the asynchronous approach offers enhanced performance. By consuming events in a non-blocking manner, it allows for faster processing and higher throughput, albeit with a higher risk of occasional data loss. -```js +```typescript // Import the required modules import { AsynchronousConsumer } from "@maticnetwork/chainflow/kafka/consumer/asynchronous_consumer"; import { Coder } from "@maticnetwork/chainflow/coder/protobuf_coder"; @@ -626,22 +629,7 @@ consume(consumerConfig, { ``` -## Examples -To gain a clearer understanding of the entire process, let's consider a straightforward example that involves indexing MATIC transfer events from the Ethereum blockchain. This [example](./example/README.md) encompasses all the layers involved, starting from producers, moving through transformers, and concluding with consumers. - -## Conclusion - -This horizontal architectural approach allows to handle increasing workloads by adding more resources and not like vertical where you increase the resources of existing servers. It enables the system to maintain performance and handle larger amounts of data and traffic as the demands grow.Data in Kafka is organized into topics, and producers write data to specific topics. Consumers subscribe to topics and process the data in real-time. This decoupling of producers and consumers allows for a flexible and scalable architecture, as more producers and consumers can be added without affecting the existing ones. - -Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records. It is designed to handle high-throughput, real-time data feeds and provides features such as fault tolerance, scalability, and message persistence, making it well-suited for handling blockchain data. - -Here's how abstracting historic blockchain data through Kafka is providing reliability and performance benefits: - -1. **Separating data producers and consumers**: By using Kafka, you can separate the blockchain data producers (e.g., blockchain nodes) from the consumers (e.g., applications specific data). This allows each component to work independently and reducing the risk of data loss or any delay. -2. **Reliable message delivery**: If a consumer fails in kafka, it can resume from where it got stopped once it recovers, avoiding any data loss. This is important when working with historical blockchain data that needs to be processed accurately. -3. **Multiple consumers and data replay**: Kafka allows same topics from the producer to be consumed by multiple consumers any number of times. 
this helps in preventing producing of blockchain data multiple times into the kafka stream. A consumer can even replay the processing of same topic if any change in required.
-
-## Additional Information
+## Helpers

### Database

@@ -787,6 +775,18 @@ console.log(`Is Topic in Bloom Filter? ${isTopicInBloom}`); // true or false

```

+## Conclusion
+
+This horizontal architectural approach handles increasing workloads by adding more resources, rather than scaling vertically by adding resources to existing servers. It enables the system to maintain performance and handle larger amounts of data and traffic as demands grow. Data in Kafka is organized into topics, and producers write data to specific topics. Consumers subscribe to topics and process the data in real time. This decoupling of producers and consumers allows for a flexible and scalable architecture, as more producers and consumers can be added without affecting the existing ones.
+
+Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records. It is designed to handle high-throughput, real-time data feeds and provides features such as fault tolerance, scalability, and message persistence, making it well-suited for handling blockchain data.
+
+Here's how abstracting historic blockchain data through Kafka provides reliability and performance benefits:
+
+1. **Separating data producers and consumers**: By using Kafka, you can separate the blockchain data producers (e.g., blockchain nodes) from the consumers (e.g., application-specific data consumers). This allows each component to work independently and reduces the risk of data loss or delays.
+2. **Reliable message delivery**: If a consumer fails, it can resume from where it stopped once it recovers, avoiding any data loss. This is important when working with historical blockchain data that needs to be processed accurately.
+3. **Multiple consumers and data replay**: Kafka allows the same topics from a producer to be consumed by multiple consumers any number of times. This avoids producing the same blockchain data into the Kafka stream more than once, and a consumer can even replay the processing of a topic if any change is required, as shown in the sketch below.
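
To make the decoupling concrete, here is a minimal, illustrative sketch of a consumer attaching to an existing topic through the functional `consume` API. The import path, topic name, and coder definition are placeholders assumed for this example rather than values defined in this repository; the remaining Kafka settings depend on your deployment.

```typescript
// Illustrative sketch only: the topic name and protobuf coder values below are
// assumed placeholders; `type`, `topic` and `coders` are the fields consume() expects.
import { consume } from "@maticnetwork/chainflow/kafka/consumer/consume";

const consumer = consume({
    type: "asynchronous",           // or "synchronous" when strict ordering matters more than speed
    topic: "apps.1.matic.transfer", // assumed topic produced earlier in the pipeline
    coders: {
        fileName: "matic_transfer",         // assumed .proto definition used to deserialise messages
        packageName: "matictransferpackage",
        messageType: "MaticTransferBlock",
    },
});
```

Because consumers are decoupled from producers, a second service can consume the same topic with its own consumer group without any change on the producing side.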
+ ## Building ### Requirements diff --git a/example/transformer/tests/matic_transfer_data_transformer.test.ts b/example/transformer/tests/matic_transfer_data_transformer.test.ts deleted file mode 100644 index 6f96906..0000000 --- a/example/transformer/tests/matic_transfer_data_transformer.test.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { ITransformedBlock } from "@maticnetwork/chainflow/interfaces/transformed_block"; -import { Coder } from "@maticnetwork/chainflow/coder/protobuf_coder"; -import { ABICoder } from "@maticnetwork/chainflow/coder/abi_coder"; -import { IBlock } from "@maticnetwork/chainflow/interfaces/block"; -import { IConsumerConfig } from "@maticnetwork/chainflow/interfaces/consumer_config"; -import { IProducerConfig } from "@maticnetwork/chainflow/interfaces/producer_config"; -import { ICoder } from "@maticnetwork/chainflow/interfaces/coder"; -import { IKafkaCoderConfig } from "@maticnetwork/chainflow/interfaces/kafka_coder_config"; -import { BloomFilter } from "@maticnetwork/chainflow/filter"; -import { MaticTransferMapper } from "../dist/mappers/matic_transfer_mapper.js"; -import { MaticTransferDataTransformer } from "../dist/matic_transfer_data_transformer.js"; -import IMaticTransferTx from "../dist/interfaces/matic_transfer_tx.js"; -import ethereumFullBlock from "./mock_data/ethereum_full_block.json"; - -jest.mock("../dist/mappers/matic_transfer_mapper.js"); -jest.mock("@maticnetwork/chainflow/coder/abi_coder"); -jest.mock("@maticnetwork/chainflow/filter"); - -describe("MaticTransferDataTransformer", () => { - class ExtendedTransformer extends MaticTransferDataTransformer { - public transform(block: IBlock): Promise> { - return super.transform(block); - } - } - - let - mockedMaticTransferMapperObject: jest.MockedObject, - mockedBloomFilter: jest.MockedClass, - mockedABICoder: jest.MockedClass, - mockedCoderClass: jest.MockedClass, - extendedTransformer: ExtendedTransformer; - - beforeEach(() => { - mockedBloomFilter = BloomFilter as jest.MockedClass; - mockedABICoder = ABICoder as jest.MockedClass; - mockedCoderClass = Coder as jest.MockedClass; - }); - - describe("transform", () => { - beforeEach(() => { - mockedMaticTransferMapperObject = new MaticTransferMapper() as jest.MockedObject - - extendedTransformer = new ExtendedTransformer( - {topic: "mocktopic", coders: {} as IKafkaCoderConfig} as IConsumerConfig, - {coder: {} as ICoder} as IProducerConfig, - mockedMaticTransferMapperObject - - ) - }); - - test("mapper.map must be called with transaction details", async () => { - //@ts-ignore - await extendedTransformer.transform(ethereumFullBlock); - - expect(mockedMaticTransferMapperObject.map).toBeCalledWith( - ethereumFullBlock.transactions[0] - ); - }); - - test("must return transformed block with matic transfer events.", async () => { - mockedMaticTransferMapperObject.map.mockReturnValueOnce([{ - transactionHash: "mock" - }] as IMaticTransferTx[]) - - - await expect( - //@ts-ignore - extendedTransformer.transform(ethereumFullBlock) - ).resolves.toEqual({ - blockNumber: 17855673, - timestamp: 1691321963, - data: [{ - transactionHash: "mock" - }] - }); - }); - - }); -}) diff --git a/internal/interfaces/data_transformer_config.ts b/internal/interfaces/data_transformer_config.ts deleted file mode 100644 index b384107..0000000 --- a/internal/interfaces/data_transformer_config.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { IConsumerConfig } from "./consumer_config.js"; -import { IProducerConfig } from "./producer_config.js"; - -export interface IDataTransformerConfig { - 
consumerConfig: IConsumerConfig, - producerConfig: IProducerConfig, - type: string, -} diff --git a/internal/interfaces/event_producer.ts b/internal/interfaces/event_producer.ts deleted file mode 100644 index 969610d..0000000 --- a/internal/interfaces/event_producer.ts +++ /dev/null @@ -1,5 +0,0 @@ -export interface IEventProducer { - subscribe: () => Promise | void - error: (value: E) => void - closed: () => void -} diff --git a/internal/interfaces/event_transformer.ts b/internal/interfaces/event_transformer.ts deleted file mode 100644 index 3728302..0000000 --- a/internal/interfaces/event_transformer.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { ITransformedBlock } from "./transformed_block.js"; - -export interface IEventTransformer { - transform: (value: G) => Promise> - error: (error: E) => void -} diff --git a/internal/interfaces/index.ts b/internal/interfaces/index.ts index 9d023aa..c071d2d 100644 --- a/internal/interfaces/index.ts +++ b/internal/interfaces/index.ts @@ -12,12 +12,9 @@ export * from "./common_kafka_events.js"; export * from "./config.js"; export * from "./consumer_config.js"; export * from "./consumer_queue_object.js"; -export * from "./data_transformer_config.js"; export * from "./deposit.js"; export * from "./deserialised_kafka_message.js"; export * from "./event_log.js"; -export * from "./event_producer.js"; -export * from "./event_transformer.js"; export * from "./kafka_coder_config.js"; export * from "./logger_config.js"; export * from "./mapper.js"; @@ -35,6 +32,5 @@ export * from "./synchronous_producer.js"; export * from "./transaction_receipt.js"; export * from "./transaction.js"; export * from "./transformed_block.js"; -export * from "./transformer_config.js"; export * from "./web3_transaction_receipt.js"; export * from "./web3_transaction.js"; diff --git a/internal/interfaces/transformer_config.ts b/internal/interfaces/transformer_config.ts deleted file mode 100644 index de5f3a5..0000000 --- a/internal/interfaces/transformer_config.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { IConsumerConfig } from "./consumer_config.js"; -import { IProducerConfig } from "./producer_config.js"; - -export interface ITransformerConfig { - consumerConfig: IConsumerConfig, - producerConfig: IProducerConfig - type: string, -} diff --git a/public/data_transformation/transform.ts b/public/data_transformation/transform.ts index d23bfbd..551162f 100644 --- a/public/data_transformation/transform.ts +++ b/public/data_transformation/transform.ts @@ -1,7 +1,7 @@ import { AsynchronousDataTransformer } from "./asynchronous_data_transformer.js"; import { SynchronousDataTransformer } from "./synchronous_data_transformer.js"; -import { ITransformerConfig } from "@internal/interfaces/transformer_config.js"; -import { IEventTransformer } from "@internal/interfaces/event_transformer.js"; +import { ITransformerConfig } from "../interfaces/transformer_config.js"; +import { IEventTransformer } from "../interfaces/event_transformer.js"; import { KafkaError } from "@internal/errors/kafka_error.js"; /** @@ -23,27 +23,34 @@ export function transform( let transformer: AsynchronousDataTransformer | SynchronousDataTransformer | null = null; - if (type === "asynchronous") { - //@ts-ignore - transformer = new AsynchronousDataTransformer(consumerConfig, producerConfig); - } - - if (type === "synchronous") { - //@ts-ignore - transformer = new SynchronousDataTransformer(consumerConfig, producerConfig); - } - - if (!transformer) { - throw new Error("Invalid type"); + switch (type) { + case "asynchronous": { + 
//@ts-ignore + transformer = new AsynchronousDataTransformer(consumerConfig, producerConfig); + break; + } + + case "synchronous": { + //@ts-ignore + transformer = new SynchronousDataTransformer(consumerConfig, producerConfig); + break; + } + + default: { + throw new Error("Invalid type"); + } } //@ts-ignore transformer.transform = eventTransformer.transform; + //@ts-ignore transformer.on("dataTransformer.fatalError", eventTransformer.error); + //@ts-ignore transformer.start(); + //@ts-ignore return transformer; } diff --git a/public/interfaces/data_transformer_config.ts b/public/interfaces/data_transformer_config.ts deleted file mode 100644 index 29c2d60..0000000 --- a/public/interfaces/data_transformer_config.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "@internal/interfaces/data_transformer_config.js"; diff --git a/public/interfaces/event_producer.ts b/public/interfaces/event_producer.ts index ea8acc6..969610d 100644 --- a/public/interfaces/event_producer.ts +++ b/public/interfaces/event_producer.ts @@ -1 +1,5 @@ -export * from "@internal/interfaces/event_producer.js"; +export interface IEventProducer { + subscribe: () => Promise | void + error: (value: E) => void + closed: () => void +} diff --git a/public/interfaces/event_transformer.ts b/public/interfaces/event_transformer.ts index 3a763a8..b103804 100644 --- a/public/interfaces/event_transformer.ts +++ b/public/interfaces/event_transformer.ts @@ -1 +1,6 @@ -export * from "@internal/interfaces/event_transformer.js"; +import { ITransformedBlock } from "@internal/interfaces/transformed_block.js"; + +export interface IEventTransformer { + transform: (value: G) => Promise> + error: (error: E) => void +} diff --git a/public/interfaces/index.ts b/public/interfaces/index.ts index c071d2d..67105c4 100644 --- a/public/interfaces/index.ts +++ b/public/interfaces/index.ts @@ -15,6 +15,8 @@ export * from "./consumer_queue_object.js"; export * from "./deposit.js"; export * from "./deserialised_kafka_message.js"; export * from "./event_log.js"; +export * from "./event_producer.js"; +export * from "./event_transformer.js"; export * from "./kafka_coder_config.js"; export * from "./logger_config.js"; export * from "./mapper.js"; @@ -32,5 +34,6 @@ export * from "./synchronous_producer.js"; export * from "./transaction_receipt.js"; export * from "./transaction.js"; export * from "./transformed_block.js"; +export * from "./transformer_config.js"; export * from "./web3_transaction_receipt.js"; export * from "./web3_transaction.js"; diff --git a/public/interfaces/transformer_config.ts b/public/interfaces/transformer_config.ts index 8ab4c70..866f2f1 100644 --- a/public/interfaces/transformer_config.ts +++ b/public/interfaces/transformer_config.ts @@ -1 +1,8 @@ -export * from "@internal/interfaces/transformer_config.js"; +import { IConsumerConfig } from "@internal/interfaces/consumer_config.js"; +import { IProducerConfig } from "@internal/interfaces/producer_config.js"; + +export interface ITransformerConfig { + consumerConfig: IConsumerConfig, + producerConfig: IProducerConfig + type: string, +} diff --git a/public/kafka/consumer/asynchronous_consumer.ts b/public/kafka/consumer/asynchronous_consumer.ts index 42aaaa0..92bc7d4 100644 --- a/public/kafka/consumer/asynchronous_consumer.ts +++ b/public/kafka/consumer/asynchronous_consumer.ts @@ -34,7 +34,7 @@ export class AsynchronousConsumer extends InternalAsynchronousConsumer { } if (Array.isArray(coders) || "fileName" in coders) { - const coderConfig = coders as ICoderConfig | ICoderConfig[]; + const 
coderConfig = coders; coders = {}; if (Array.isArray(topic) && Array.isArray(coderConfig)) { for (let topicIndex = 0; topicIndex < topic.length; topicIndex++) { @@ -44,7 +44,7 @@ export class AsynchronousConsumer extends InternalAsynchronousConsumer { coderConfig[topicIndex].messageType ); } - } else if (!Array.isArray(topic) && !Array.isArray(coders)) { + } else if (!Array.isArray(topic) && !Array.isArray(coderConfig)) { coders[topic] = new Coder( (coderConfig as ICoderConfig).fileName, (coderConfig as ICoderConfig).packageName, diff --git a/public/kafka/consumer/consume.ts b/public/kafka/consumer/consume.ts index a1f409a..3f45b63 100644 --- a/public/kafka/consumer/consume.ts +++ b/public/kafka/consumer/consume.ts @@ -1,12 +1,9 @@ import { IConsumerConfig } from "@internal/interfaces/consumer_config.js"; -import { SynchronousConsumer } from "@internal/kafka/consumer/synchronous_consumer.js"; -import { AsynchronousConsumer } from "@internal/kafka/consumer/asynchronous_consumer.js"; -import { Coder } from "@internal/coder/protobuf_coder.js"; +import { SynchronousConsumer } from "./synchronous_consumer.js"; +import { AsynchronousConsumer } from "./asynchronous_consumer.js"; import { IObserver } from "@internal/interfaces/observer.js"; import { DeserialisedMessage } from "public/index.js"; import { BaseError } from "@internal/errors/base_error.js"; -import { IKafkaCoderConfig } from "@internal/interfaces/kafka_coder_config.js"; -import { ICoderConfig } from "@internal/interfaces/coder_config.js"; /** * Function to be used as functional implementation for the consumer classes for asynchronous @@ -21,56 +18,25 @@ import { ICoderConfig } from "@internal/interfaces/coder_config.js"; export function consume( config: IConsumerConfig, observer?: IObserver ): AsynchronousConsumer | SynchronousConsumer { - let coders = config.coders; const type = config.type; - const topic = config.topic; delete config.type; - delete config.topic; - delete config.coders; - - if (!topic) { - throw new Error("Please provide topic"); - } - - if (!coders) { - throw new Error("Please provide coders"); - } - - if (Array.isArray(coders) || "fileName" in coders) { - const coderConfig = coders as ICoderConfig | ICoderConfig[]; - coders = {}; - if (Array.isArray(topic) && Array.isArray(coderConfig)) { - for (let topicIndex = 0; topicIndex < topic.length; topicIndex++) { - coders[topic[topicIndex]] = new Coder( - coderConfig[topicIndex].fileName, - coderConfig[topicIndex].packageName, - coderConfig[topicIndex].messageType - ); - } - } else if (!Array.isArray(topic) && !Array.isArray(coders)) { - coders[topic] = new Coder( - (coderConfig as ICoderConfig).fileName, - (coderConfig as ICoderConfig).packageName, - (coderConfig as ICoderConfig).messageType, - (coderConfig as ICoderConfig).fileDirectory, - ); - } else { - throw new Error("Please provide valid coder config or topic"); - } - } let consumer: AsynchronousConsumer | SynchronousConsumer | null = null; - if (type === "asynchronous") { - consumer = new AsynchronousConsumer(topic, coders as IKafkaCoderConfig, config); - } + switch (type) { + case "asynchronous": { + consumer = new AsynchronousConsumer(config); + break; + } - if (type === "synchronous") { - consumer = new SynchronousConsumer(topic, coders as IKafkaCoderConfig, config); - } + case "synchronous": { + consumer = new SynchronousConsumer(config); + break; + } - if (!consumer) { - throw new Error("Invalid type"); + default: { + throw new Error("Invalid type"); + } } if (observer) { diff --git 
a/public/kafka/consumer/synchronous_consumer.ts b/public/kafka/consumer/synchronous_consumer.ts index c74cecb..cf5c8b9 100644 --- a/public/kafka/consumer/synchronous_consumer.ts +++ b/public/kafka/consumer/synchronous_consumer.ts @@ -34,7 +34,7 @@ export class SynchronousConsumer extends InternalSynchronousConsumer { } if (Array.isArray(coders) || "fileName" in coders) { - const coderConfig = coders as ICoderConfig | ICoderConfig[]; + const coderConfig = coders; coders = {}; if (Array.isArray(topic) && Array.isArray(coderConfig)) { for (let topicIndex = 0; topicIndex < topic.length; topicIndex++) { @@ -44,7 +44,7 @@ export class SynchronousConsumer extends InternalSynchronousConsumer { coderConfig[topicIndex].messageType ); } - } else if (!Array.isArray(topic) && !Array.isArray(coders)) { + } else if (!Array.isArray(topic) && !Array.isArray(coderConfig)) { coders[topic] = new Coder( (coderConfig as ICoderConfig).fileName, (coderConfig as ICoderConfig).packageName, diff --git a/public/kafka/producer/asynchronous_producer.ts b/public/kafka/producer/asynchronous_producer.ts index f09e67f..2559501 100644 --- a/public/kafka/producer/asynchronous_producer.ts +++ b/public/kafka/producer/asynchronous_producer.ts @@ -1,8 +1,6 @@ import { AsynchronousProducer as InternalAsynchronousProducer } from "@internal/kafka/producer/asynchronous_producer.js"; import { IProducerConfig } from "@internal/interfaces/producer_config.js"; -import { ICoder } from "@internal/interfaces/coder.js"; import { Coder } from "@internal/coder/protobuf_coder.js"; -import { ICoderConfig } from "@internal/interfaces/coder_config.js"; /** * AsynchronousProducer class entends InternalAsynchronousProducer which creates an instance of AsynchronousProducer @@ -26,15 +24,15 @@ export class AsynchronousProducer extends InternalAsynchronousProducer { if ("fileName" in coder) { coder = new Coder( - (coder as ICoderConfig).fileName, - (coder as ICoderConfig).packageName, - (coder as ICoderConfig).messageType, - (coder as ICoderConfig).fileDirectory, + coder.fileName, + coder.packageName, + coder.messageType, + coder.fileDirectory, ); } super( - coder as ICoder, + coder, config ); } diff --git a/public/kafka/producer/produce.ts b/public/kafka/producer/produce.ts index 6b0e425..6b3a4da 100644 --- a/public/kafka/producer/produce.ts +++ b/public/kafka/producer/produce.ts @@ -1,11 +1,8 @@ -import { SynchronousProducer } from "@internal/kafka/producer/synchronous_producer.js"; -import { AsynchronousProducer } from "@internal/kafka/producer/asynchronous_producer.js"; +import { SynchronousProducer } from "./synchronous_producer.js"; +import { AsynchronousProducer } from "./asynchronous_producer.js"; import { IProducerConfig } from "@internal/interfaces/producer_config.js"; -import { Coder } from "@internal/coder/protobuf_coder.js"; -import { ICoder } from "@internal/interfaces/coder.js"; -import { ICoderConfig } from "@internal/interfaces/coder_config.js"; -import { IEventProducer } from "@internal/interfaces/event_producer.js"; import { KafkaError } from "@internal/errors/kafka_error.js"; +import { IEventProducer } from "../../interfaces/event_producer.js"; /** * Function to be used as functional implementation for the producer classes for asynchronous @@ -22,35 +19,24 @@ export function produce( eventProducer: IEventProducer ): AsynchronousProducer | SynchronousProducer { const type = config.type; - let coder = config.coder; delete config.type; - delete config.coder; - - if (!coder) { - throw new Error("Please provide coder"); - } - - if 
("fileName" in coder) { - coder = new Coder( - (coder as ICoderConfig).fileName, - (coder as ICoderConfig).packageName, - (coder as ICoderConfig).messageType, - (coder as ICoderConfig).fileDirectory, - ); - } let producer: AsynchronousProducer | SynchronousProducer | null = null; - if (type === "asynchronous") { - producer = new AsynchronousProducer(coder as ICoder, config); - } + switch (type) { + case "asynchronous": { + producer = new AsynchronousProducer(config); + break; + } - if (type === "synchronous") { - producer = new SynchronousProducer(coder as ICoder, config); - } + case "synchronous": { + producer = new SynchronousProducer(config); + break; + } - if (!producer) { - throw new Error("Invalid type"); + default: { + throw new Error("Invalid type"); + } } producer.start(); diff --git a/public/kafka/producer/synchronous_producer.ts b/public/kafka/producer/synchronous_producer.ts index 141cd82..e104b66 100644 --- a/public/kafka/producer/synchronous_producer.ts +++ b/public/kafka/producer/synchronous_producer.ts @@ -1,8 +1,6 @@ import { SynchronousProducer as InternalSynchronousProducer } from "@internal/kafka/producer/synchronous_producer.js"; import { IProducerConfig } from "@internal/interfaces/producer_config.js"; -import { ICoder } from "@internal/interfaces/coder.js"; import { Coder } from "@internal/coder/protobuf_coder.js"; -import { ICoderConfig } from "@internal/interfaces/coder_config.js"; /** * SynchronousProducer class entends InternalSynchronousProducer which creates an instance of SynchronousProducer @@ -21,20 +19,20 @@ export class SynchronousProducer extends InternalSynchronousProducer { delete config.coder; if (!coder) { - throw new Error("Please provide coder"); + throw new Error("Please provide coder"); } if ("fileName" in coder) { coder = new Coder( - (coder as ICoderConfig).fileName, - (coder as ICoderConfig).packageName, - (coder as ICoderConfig).messageType, - (coder as ICoderConfig).fileDirectory, + coder.fileName, + coder.packageName, + coder.messageType, + coder.fileDirectory, ); } super( - coder as ICoder, + coder, config ); }