Commit 3109237

fix: requested changes

nitinmittal23 committed Aug 12, 2023
1 parent 3ca67b2 commit 3109237

Showing 19 changed files with 109 additions and 246 deletions.
44 changes: 22 additions & 22 deletions README.md
@@ -11,21 +11,21 @@ ChainFlow, is a powerful framework designed to facilitate the development of fle

In today's rapidly evolving blockchain ecosystem, the need for efficient and reliable data processing is paramount. EVM (Ethereum Virtual Machine) blockchains, such as Ethereum itself and its compatible networks, have gained significant traction due to their smart contract capabilities and decentralized nature. However, working with blockchain data at scale and in real time presents unique challenges.

ChainFlow addresses these challenges by providing a comprehensive set of Node.js packages that simplify the development of event-driven data pipelines. With its intuitive design and seamless integration with Kafka, one of the most popular and battle-tested distributed streaming platforms, ChainFlow offers a robust and reliable infrastructure for processing blockchain events efficiently.
ChainFlow addresses these challenges by providing a comprehensive Node.js package that simplifies the development of event-driven data pipelines. With its intuitive design and seamless integration with Kafka, one of the most popular and battle-tested distributed streaming platforms, ChainFlow offers a robust and reliable infrastructure for processing blockchain events efficiently.

## Key Features

- **Event-driven Architecture:** ChainFlow embraces the power of event-driven architecture, allowing developers to create pipelines that react to blockchain events in real time. By leveraging this approach, applications built with ChainFlow can easily respond to changes on the blockchain, enabling near-instantaneous data processing.
- **Flexible Data Pipelines:** ChainFlow offers a flexible and extensible framework for building data pipelines that suit your specific needs. Developers can easily define their desired data flow, including event filtering, transformation, and aggregation, by utilizing the rich set of ChainFlow's features and packages.
- **Flexible Data Pipelines:** ChainFlow offers a flexible and extensible framework for building data pipelines that suit your specific needs. Developers can easily define their desired data flow, including event filtering, transformation, and aggregation, by utilizing the feature set of ChainFlow.
- **Seamless Integration with Kafka:** As the backbone of ChainFlow, Kafka provides the necessary infrastructure for handling high-throughput, fault-tolerant, and scalable data streams. ChainFlow's integration with Kafka ensures reliable data processing and enables seamless interoperability with other Kafka-based systems, further enhancing the versatility of your data pipelines.
- **EVM Blockchain Compatibility:** ChainFlow is specifically designed for EVM blockchains, enabling developers to harness the power of smart contracts and decentralized applications. Whether you are working with Ethereum or any other EVM-compatible blockchain, ChainFlow provides a unified and consistent approach to processing blockchain events across different networks. ChainFlow packages can also be used for other chains with custom implementations.
- **EVM Blockchain Compatibility:** ChainFlow is specifically designed for EVM blockchains, enabling developers to harness the power of smart contracts and decentralized applications. Whether you are working with Ethereum or any other EVM-compatible blockchain, ChainFlow provides a unified and consistent approach to processing blockchain events across different networks. ChainFlow can also be used for other chains with custom implementations of the provided interfaces and abstract classes.
- **Extensive Package Ecosystem:** ChainFlow offers a rich ecosystem of Node.js packages that cater to various aspects of building event-driven data pipelines. From connecting to blockchain networks, managing Kafka topics, to implementing data processing logic, the ChainFlow package ecosystem provides a comprehensive toolkit to expedite your development process.

With ChainFlow, you can unlock the true potential of EVM blockchains by seamlessly integrating them into your data infrastructure. Whether you are building real-time analytics, decentralized applications, or any other data-driven solution, this documentation will guide you through the intricacies of using ChainFlow's packages and assist you in developing robust and efficient event-driven data pipelines on EVM blockchains.

## Installation

You can install the package using [NPM](https://www.npmjs.com/package/web3) or using [Yarn](https://yarnpkg.com/package/web3)
You can install the package using [NPM](https://www.npmjs.com/package/@maticnetwork/chainflow) or using [Yarn](https://yarnpkg.com/package/@maticnetwork/chainflow)

### Using NPM

@@ -41,7 +41,7 @@ yarn add @maticnetwork/chainflow

## Usage

```js
```typescript
// Import the chainflow module
const chainflow = require('@maticnetwork/chainflow');
```
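
The snippet above uses the CommonJS `require` form. In a TypeScript/ESM project, the equivalent imports look like the sketch below; the deep import paths are the ones used by the consumer example later in this README, and your module resolution settings are assumed to support them.

```typescript
// Equivalent ES module imports (assumes an ESM/TypeScript project setup).
import * as chainflow from "@maticnetwork/chainflow";

// Deep imports follow the package layout, e.g. the asynchronous consumer and
// protobuf coder used later in this README:
import { AsynchronousConsumer } from "@maticnetwork/chainflow/kafka/consumer/asynchronous_consumer";
import { Coder } from "@maticnetwork/chainflow/coder/protobuf_coder";
```
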
@@ -69,6 +69,9 @@ By leveraging the events generated by the transformers, consumers can react in r

Together, these three layers form the foundation of ChainFlow, providing a comprehensive framework for building flexible and scalable event-driven data pipelines on EVM blockchains. Whether it's building real-time analytics, decentralized applications, or any other data-driven solution, ChainFlow's architecture offers the necessary tools and abstractions to streamline the development process and unlock the full potential of EVM blockchains.
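
To make the layering concrete, here is a minimal, self-contained sketch of the producer → transformer → consumer flow, with a plain in-memory queue standing in for Kafka. All names and types below are illustrative assumptions and are not part of ChainFlow's API.

```typescript
// Illustrative only: an in-memory stand-in for the Kafka-backed pipeline.
// None of these identifiers come from the @maticnetwork/chainflow package.

interface RawBlockEvent {
  blockNumber: number;
  logs: { address: string; data: string }[];
}

interface TransferEvent {
  blockNumber: number;
  token: string;
  valueWei: bigint;
}

// Stand-ins for Kafka topics.
const rawBlocksTopic: RawBlockEvent[] = [];
const transfersTopic: TransferEvent[] = [];

// Producer layer: publishes raw chain data to the first "topic".
function produceBlock(event: RawBlockEvent): void {
  rawBlocksTopic.push(event);
}

// Transformer layer: consumes raw events, reshapes them into domain events,
// and publishes them to the next "topic".
function transformBlocks(): void {
  while (rawBlocksTopic.length > 0) {
    const block = rawBlocksTopic.shift()!;
    for (const log of block.logs) {
      transfersTopic.push({
        blockNumber: block.blockNumber,
        token: log.address,
        valueWei: BigInt(log.data),
      });
    }
  }
}

// Consumer layer: reacts to the transformed, application-specific events.
function consumeTransfers(handle: (t: TransferEvent) => void): void {
  while (transfersTopic.length > 0) {
    handle(transfersTopic.shift()!);
  }
}

// Wire the three layers together.
produceBlock({ blockNumber: 100, logs: [{ address: "0xToken", data: "0x0de0b6b3a7640000" }] });
transformBlocks();
consumeTransfers((t) => console.log(`block ${t.blockNumber}: ${t.valueWei} wei on ${t.token}`));
```

In ChainFlow itself the hand-off between layers happens through Kafka topics rather than an in-memory queue, which is what provides the fault tolerance and scalability described above.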

## Examples
To gain a clearer understanding of the entire process, let's consider a straightforward example that involves indexing MATIC transfer events from the Ethereum blockchain. This [example](./example/README.md) encompasses all the layers involved, starting from producers, moving through transformers, and concluding with consumers.
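
As a warm-up for that example, the sketch below shows the core transformation step in isolation: recognising and decoding ERC-20 `Transfer` logs for the MATIC token. It is a standalone illustration rather than code from the example; the log shape mirrors a raw Ethereum receipt log, the event-signature hash is the standard ERC-20 `Transfer` topic, and the MATIC contract address is an assumption you should verify independently.

```typescript
// Standalone sketch: filter and decode ERC-20 Transfer logs for MATIC.
// Shapes and constants are stated assumptions, not ChainFlow APIs.

interface EthLog {
  address: string;   // contract that emitted the log
  topics: string[];  // topics[0] is the event signature hash
  data: string;      // ABI-encoded non-indexed fields (here: the transfer value)
}

// keccak256("Transfer(address,address,uint256)") – the standard ERC-20 event signature.
const TRANSFER_TOPIC =
  "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

// Assumed MATIC ERC-20 contract on Ethereum mainnet (verify before relying on it).
const MATIC_ADDRESS = "0x7d1afa7b718fb893db30a3abc0cfc608aacfebb0";

interface MaticTransfer {
  from: string;
  to: string;
  valueWei: bigint;
}

function decodeMaticTransfer(log: EthLog): MaticTransfer | null {
  if (log.address.toLowerCase() !== MATIC_ADDRESS) return null;
  if (log.topics[0] !== TRANSFER_TOPIC) return null;

  return {
    // Indexed address parameters are left-padded to 32 bytes inside the topics.
    from: "0x" + log.topics[1].slice(26),
    to: "0x" + log.topics[2].slice(26),
    valueWei: BigInt(log.data),
  };
}
```

In pipeline terms, a producer would feed raw receipt logs into Kafka, a transformer would apply a function like `decodeMaticTransfer` to each log and publish the non-null results, and a consumer would then index or react to those transfer events.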

## Producers

### Block Producers
@@ -478,7 +481,7 @@ consume(

The **Asynchronous Consumer** class is designed for scenarios where the speed of data consumption takes precedence over potential data loss. If the timely processing of events is critical and the occasional loss of some events is acceptable within defined limits, the asynchronous approach offers enhanced performance. By consuming events in a non-blocking manner, it allows for faster processing and higher throughput, albeit with a higher risk of occasional data loss.

```js
```typescript
// Import the required modules
import { AsynchronousConsumer } from "@maticnetwork/chainflow/kafka/consumer/asynchronous_consumer";
import { Coder } from "@maticnetwork/chainflow/coder/protobuf_coder";
@@ -626,22 +629,7 @@ consume(consumerConfig, {

```

## Examples
To gain a clearer understanding of the entire process, let's consider a straightforward example that involves indexing MATIC transfer events from the Ethereum blockchain. This [example](./example/README.md) encompasses all the layers involved, starting from producers, moving through transformers, and concluding with consumers.

## Conclusion

This horizontal scaling approach handles increasing workloads by adding more machines, rather than scaling vertically by adding resources to existing servers. It enables the system to maintain performance and handle larger volumes of data and traffic as demand grows. Data in Kafka is organized into topics: producers write data to specific topics, while consumers subscribe to topics and process the data in real time. This decoupling of producers and consumers allows for a flexible and scalable architecture, as more producers and consumers can be added without affecting the existing ones.

Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records. It is designed to handle high-throughput, real-time data feeds and provides features such as fault tolerance, scalability, and message persistence, making it well-suited for handling blockchain data.

Here's how abstracting historic blockchain data through Kafka provides reliability and performance benefits:

1. **Separating data producers and consumers**: By using Kafka, you can decouple the blockchain data producers (e.g., blockchain nodes) from the consumers (e.g., application-specific services). This allows each component to work independently, reducing the risk of data loss or delays.
2. **Reliable message delivery**: If a consumer fails, Kafka lets it resume from where it stopped once it recovers, avoiding data loss. This is important when working with historical blockchain data that needs to be processed accurately.
3. **Multiple consumers and data replay**: Kafka allows the same topic to be consumed by multiple consumers any number of times, which avoids producing the same blockchain data into the stream more than once. A consumer can even replay a topic from the beginning if reprocessing is required.

## Additional Information
## Helpers

### Database

@@ -787,6 +775,18 @@ console.log(`Is Topic in Bloom Filter? ${isTopicInBloom}`); // true or false

```

## Conclusion

This horizontal scaling approach handles increasing workloads by adding more machines, rather than scaling vertically by adding resources to existing servers. It enables the system to maintain performance and handle larger volumes of data and traffic as demand grows. Data in Kafka is organized into topics: producers write data to specific topics, while consumers subscribe to topics and process the data in real time. This decoupling of producers and consumers allows for a flexible and scalable architecture, as more producers and consumers can be added without affecting the existing ones.

Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records. It is designed to handle high-throughput, real-time data feeds and provides features such as fault tolerance, scalability, and message persistence, making it well-suited for handling blockchain data.

Here's how abstracting historic blockchain data through Kafka provides reliability and performance benefits:

1. **Separating data producers and consumers**: By using Kafka, you can decouple the blockchain data producers (e.g., blockchain nodes) from the consumers (e.g., application-specific services). This allows each component to work independently, reducing the risk of data loss or delays.
2. **Reliable message delivery**: If a consumer fails, Kafka lets it resume from where it stopped once it recovers, avoiding data loss. This is important when working with historical blockchain data that needs to be processed accurately.
3. **Multiple consumers and data replay**: Kafka allows the same topic to be consumed by multiple consumers any number of times, which avoids producing the same blockchain data into the stream more than once. A consumer can even replay a topic from the beginning if reprocessing is required (see the sketch below).
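
As a concrete illustration of points 2 and 3, the sketch below uses the general-purpose `kafkajs` client rather than ChainFlow's own consumer classes: two consumer groups read the same topic independently, and one of them replays it from the beginning. Broker addresses, topic and group names are placeholders.

```typescript
// Illustration with kafkajs (not ChainFlow): two consumer groups read the same
// topic independently, and one of them replays it from offset 0.
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "chainflow-docs", brokers: ["localhost:9092"] });

async function run(): Promise<void> {
  // Group A: real-time processing; resumes from its committed offset after a crash.
  const realtime = kafka.consumer({ groupId: "transfers-realtime" });
  await realtime.connect();
  await realtime.subscribe({ topics: ["matic.transfers"] });
  await realtime.run({
    eachMessage: async ({ message }) => {
      console.log("realtime:", message.value?.toString());
    },
  });

  // Group B: an independent backfill that replays the topic from the beginning,
  // without affecting group A or requiring the data to be produced again.
  const backfill = kafka.consumer({ groupId: "transfers-backfill" });
  await backfill.connect();
  await backfill.subscribe({ topics: ["matic.transfers"], fromBeginning: true });
  await backfill.run({
    eachMessage: async ({ message }) => {
      console.log("backfill:", message.value?.toString());
    },
  });
}

run().catch(console.error);
```
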

## Building

### Requirements
79 changes: 0 additions & 79 deletions example/transformer/tests/matic_transfer_data_transformer.test.ts

This file was deleted.

8 changes: 0 additions & 8 deletions internal/interfaces/data_transformer_config.ts

This file was deleted.

5 changes: 0 additions & 5 deletions internal/interfaces/event_producer.ts

This file was deleted.

6 changes: 0 additions & 6 deletions internal/interfaces/event_transformer.ts

This file was deleted.

4 changes: 0 additions & 4 deletions internal/interfaces/index.ts
@@ -12,12 +12,9 @@ export * from "./common_kafka_events.js";
export * from "./config.js";
export * from "./consumer_config.js";
export * from "./consumer_queue_object.js";
export * from "./data_transformer_config.js";
export * from "./deposit.js";
export * from "./deserialised_kafka_message.js";
export * from "./event_log.js";
export * from "./event_producer.js";
export * from "./event_transformer.js";
export * from "./kafka_coder_config.js";
export * from "./logger_config.js";
export * from "./mapper.js";
@@ -35,6 +32,5 @@ export * from "./synchronous_producer.js";
export * from "./transaction_receipt.js";
export * from "./transaction.js";
export * from "./transformed_block.js";
export * from "./transformer_config.js";
export * from "./web3_transaction_receipt.js";
export * from "./web3_transaction.js";
8 changes: 0 additions & 8 deletions internal/interfaces/transformer_config.ts

This file was deleted.

35 changes: 21 additions & 14 deletions public/data_transformation/transform.ts
@@ -1,7 +1,7 @@
import { AsynchronousDataTransformer } from "./asynchronous_data_transformer.js";
import { SynchronousDataTransformer } from "./synchronous_data_transformer.js";
import { ITransformerConfig } from "@internal/interfaces/transformer_config.js";
import { IEventTransformer } from "@internal/interfaces/event_transformer.js";
import { ITransformerConfig } from "../interfaces/transformer_config.js";
import { IEventTransformer } from "../interfaces/event_transformer.js";
import { KafkaError } from "@internal/errors/kafka_error.js";

/**
@@ -23,27 +23,34 @@ export function transform<T, G>(

let transformer: AsynchronousDataTransformer<T, G> | SynchronousDataTransformer<T, G> | null = null;

if (type === "asynchronous") {
//@ts-ignore
transformer = new AsynchronousDataTransformer<T, G>(consumerConfig, producerConfig);
}

if (type === "synchronous") {
//@ts-ignore
transformer = new SynchronousDataTransformer<T, G>(consumerConfig, producerConfig);
}

if (!transformer) {
throw new Error("Invalid type");
switch (type) {
case "asynchronous": {
//@ts-ignore
transformer = new AsynchronousDataTransformer<T, G>(consumerConfig, producerConfig);
break;
}

case "synchronous": {
//@ts-ignore
transformer = new SynchronousDataTransformer<T, G>(consumerConfig, producerConfig);
break;
}

default: {
throw new Error("Invalid type");
}
}

//@ts-ignore
transformer.transform = eventTransformer.transform;

//@ts-ignore
transformer.on("dataTransformer.fatalError", eventTransformer.error);

//@ts-ignore
transformer.start();

//@ts-ignore
return transformer;

}
1 change: 0 additions & 1 deletion public/interfaces/data_transformer_config.ts

This file was deleted.

6 changes: 5 additions & 1 deletion public/interfaces/event_producer.ts
@@ -1 +1,5 @@
export * from "@internal/interfaces/event_producer.js";
export interface IEventProducer<E> {
    subscribe: () => Promise<void> | void
    error: (value: E) => void
    closed: () => void
}
7 changes: 6 additions & 1 deletion public/interfaces/event_transformer.ts
@@ -1 +1,6 @@
export * from "@internal/interfaces/event_transformer.js";
import { ITransformedBlock } from "@internal/interfaces/transformed_block.js";

export interface IEventTransformer<G, T, E> {
    transform: (value: G) => Promise<ITransformedBlock<T>>
    error: (error: E) => void
}
3 changes: 3 additions & 0 deletions public/interfaces/index.ts
@@ -15,6 +15,8 @@ export * from "./consumer_queue_object.js";
export * from "./deposit.js";
export * from "./deserialised_kafka_message.js";
export * from "./event_log.js";
export * from "./event_producer.js";
export * from "./event_transformer.js";
export * from "./kafka_coder_config.js";
export * from "./logger_config.js";
export * from "./mapper.js";
@@ -32,5 +34,6 @@ export * from "./synchronous_producer.js";
export * from "./transaction_receipt.js";
export * from "./transaction.js";
export * from "./transformed_block.js";
export * from "./transformer_config.js";
export * from "./web3_transaction_receipt.js";
export * from "./web3_transaction.js";
9 changes: 8 additions & 1 deletion public/interfaces/transformer_config.ts
@@ -1 +1,8 @@
export * from "@internal/interfaces/transformer_config.js";
import { IConsumerConfig } from "@internal/interfaces/consumer_config.js";
import { IProducerConfig } from "@internal/interfaces/producer_config.js";

export interface ITransformerConfig {
    consumerConfig: IConsumerConfig,
    producerConfig: IProducerConfig
    type: string,
}
