This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Commit

IPFS announcer: announces batches on ipfs as parquet file (#28)
* add announcements to payload

* park some work

* add parquet placeholders

* some placeholders

* add more logic for IPFS announcement

* cleanup

* add dependencies

* remove helia

* setup helia

* breaking

* comment out multiformats for now

* set pinning note for announcer

* cleanup

* cleanup: todo, multiformats issue

* use multiformats 0.9.9

* placeholders

* set more placeholders

* base 32

* cleanup

* fill in the blanks

* add a test

* cleanup batchAnnouncer/extract dnspConverter

* revert

* cleanup

* cleanup

* address feedback

* cleanup and fix tests

* revert

* use cache for schemas

* rename
saraswatpuneet authored Sep 6, 2023
1 parent cbfeece commit b2db31c
Showing 16 changed files with 700 additions and 57 deletions.
4 changes: 4 additions & 0 deletions apps/api/src/config/config.service.spec.ts
@@ -38,6 +38,10 @@ describe('ContentPublishingConfigService', () => {
const ALL_ENV: { [key: string]: string | undefined } = {
REDIS_URL: undefined,
FREQUENCY_URL: undefined,
IPFS_ENDPOINT: undefined,
IPFS_GATEWAY_URL: undefined,
IPFS_BASIC_AUTH_USER: undefined,
IPFS_BASIC_AUTH_SECRET: undefined,
PROVIDER_ID: undefined,
BLOCKCHAIN_SCAN_INTERVAL_MINUTES: undefined,
QUEUE_HIGH_WATER: undefined,
4 changes: 4 additions & 0 deletions apps/api/src/config/env.config.ts
@@ -5,6 +5,10 @@ import { mnemonicValidate } from '@polkadot/util-crypto';
export const configModuleOptions: ConfigModuleOptions = {
isGlobal: true,
validationSchema: Joi.object({
IPFS_ENDPOINT: Joi.string().uri().required(),
IPFS_GATEWAY_URL: Joi.string().required(), // Parsed as a plain string because the required format is not a valid URI; check env.template
IPFS_BASIC_AUTH_USER: Joi.string().allow('').default(''),
IPFS_BASIC_AUTH_SECRET: Joi.string().allow('').default(''),
REDIS_URL: Joi.string().uri().required(),
FREQUENCY_URL: Joi.string().uri().required(),
PROVIDER_ID: Joi.required().custom((value: string, helpers) => {
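Note: IPFS_GATEWAY_URL stays a plain string because it carries a [CID] placeholder rather than a literal URI. The getIpfsCidPlaceholder helper referenced by BatchAnnouncer and the test mocks below is not part of this diff; a minimal sketch of the likely substitution, written as a free-standing function (the fallback behaviour is an assumption):

// Sketch only: the real ConfigService getter is not shown in this commit.
// Assumes IPFS_GATEWAY_URL looks like "https://ipfs.io/ipfs/[CID]" (see env.template below).
function getIpfsCidPlaceholder(gatewayUrl: string, cid: string): string {
  if (!gatewayUrl || !gatewayUrl.includes('[CID]')) {
    return `https://ipfs.io/ipfs/${cid}`; // assumed fallback to the public gateway
  }
  return gatewayUrl.replace('[CID]', cid);
}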
9 changes: 6 additions & 3 deletions apps/worker/src/batch_announcer/batch.announcer.module.ts
@@ -9,12 +9,15 @@ import { RedisModule } from '@liaoliaots/nestjs-redis';
import { BatchAnnouncementService } from './batch.announcer.service';
import { ConfigModule } from '../../../api/src/config/config.module';
import { ConfigService } from '../../../api/src/config/config.service';
import { IPFSAnnouncer } from './ipfs.announcer';
import { BatchAnnouncer } from './batch.announcer';
import { QueueConstants } from '../../../../libs/common/src';
import { BlockchainModule } from '../blockchain/blockchain.module';
import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client';

@Module({
imports: [
ConfigModule,
BlockchainModule,
EventEmitterModule,
RedisModule.forRootAsync(
{
@@ -75,7 +78,7 @@ import { QueueConstants } from '../../../../libs/common/src';
),
],
controllers: [],
providers: [BatchAnnouncementService, IPFSAnnouncer],
exports: [BullModule, BatchAnnouncementService, IPFSAnnouncer],
providers: [BatchAnnouncementService, BatchAnnouncer, IpfsService],
exports: [BullModule, BatchAnnouncementService, BatchAnnouncer, IpfsService],
})
export class BatchAnnouncerModule {}
10 changes: 5 additions & 5 deletions apps/worker/src/batch_announcer/batch.announcer.service.ts
@@ -6,7 +6,7 @@ import Redis from 'ioredis';
import { SchedulerRegistry } from '@nestjs/schedule';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { ConfigService } from '../../../api/src/config/config.service';
import { IPFSAnnouncer } from './ipfs.announcer';
import { BatchAnnouncer } from './batch.announcer';
import { CAPACITY_EPOCH_TIMEOUT_NAME } from '../../../../libs/common/src/constants';
import { IBatchAnnouncerJobData } from '../interfaces/batch-announcer.job.interface';
import { QueueConstants } from '../../../../libs/common/src';
@@ -18,13 +18,11 @@ import { QueueConstants } from '../../../../libs/common/src';
export class BatchAnnouncementService extends WorkerHost implements OnApplicationBootstrap, OnModuleDestroy {
private logger: Logger;

private capacityExhausted = false;

constructor(
@InjectRedis() private cacheManager: Redis,
@InjectQueue(QueueConstants.PUBLISH_QUEUE_NAME) private publishQueue: Queue,
private configService: ConfigService,
private ipfsPublisher: IPFSAnnouncer,
private ipfsPublisher: BatchAnnouncer,
private schedulerRegistry: SchedulerRegistry,
private eventEmitter: EventEmitter2,
) {
@@ -47,7 +45,9 @@ export class BatchAnnouncementService extends WorkerHost implements OnApplicatio
async process(job: Job<IBatchAnnouncerJobData, any, string>): Promise<any> {
this.logger.log(`Processing job ${job.id} of type ${job.name}`);
try {
await this.ipfsPublisher.announce(job.data);
const publisherJob = await this.ipfsPublisher.announce(job.data);

await this.publishQueue.add(publisherJob.id, publisherJob);
this.logger.log(`Completed job ${job.id} of type ${job.name}`);
return job.data;
} catch (e) {
88 changes: 88 additions & 0 deletions apps/worker/src/batch_announcer/batch.announcer.spec.ts
@@ -0,0 +1,88 @@
import { expect, describe, jest, it, beforeEach } from '@jest/globals';
import assert from 'assert';
import { FrequencyParquetSchema } from '@dsnp/frequency-schemas/types/frequency';
import Redis from 'ioredis-mock';
import { BatchAnnouncer } from './batch.announcer';

// Create a mock for the dependencies
const mockConfigService = {
getIpfsCidPlaceholder: jest.fn(),
};

const mockBlockchainService = {
getSchema: jest.fn(),
};

const mockIpfsService = {
getPinned: jest.fn(),
ipfsPin: jest.fn(),
};

describe('BatchAnnouncer', () => {
let ipfsAnnouncer: BatchAnnouncer;

const broadcast: FrequencyParquetSchema = [
{
name: 'announcementType',
column_type: {
INTEGER: {
bit_width: 32,
sign: true,
},
},
compression: 'GZIP',
bloom_filter: false,
},
{
name: 'contentHash',
column_type: 'BYTE_ARRAY',
compression: 'GZIP',
bloom_filter: true,
},
{
name: 'fromId',
column_type: {
INTEGER: {
bit_width: 64,
sign: false,
},
},
compression: 'GZIP',
bloom_filter: true,
},
{
name: 'url',
column_type: 'STRING',
compression: 'GZIP',
bloom_filter: false,
},
];
const mockClient = new Redis();

beforeEach(async () => {
ipfsAnnouncer = new BatchAnnouncer(mockClient, mockConfigService as any, mockBlockchainService as any, mockIpfsService as any);
});
it('should be defined', () => {
expect(ipfsAnnouncer).toBeDefined();
});

// Write your test cases here
it('should announce a batch to IPFS', async () => {
// Mock the necessary dependencies' behavior
mockConfigService.getIpfsCidPlaceholder.mockReturnValue('mockIpfsUrl');
mockBlockchainService.getSchema.mockReturnValue({ model: JSON.stringify(broadcast) });
mockIpfsService.getPinned.mockReturnValue(Buffer.from('mockContentBuffer'));
mockIpfsService.ipfsPin.mockReturnValue({ cid: 'mockCid', size: 'mockSize' });

const batchJob = {
batchId: 'mockBatchId',
schemaId: 123,
announcements: [],
};

const result = await ipfsAnnouncer.announce(batchJob);
assert(result);
expect(mockConfigService.getIpfsCidPlaceholder).toHaveBeenCalledWith('mockCid');
expect(mockBlockchainService.getSchema).toHaveBeenCalledWith(123);
});
});
89 changes: 89 additions & 0 deletions apps/worker/src/batch_announcer/batch.announcer.ts
@@ -0,0 +1,89 @@
import { Injectable, Logger } from '@nestjs/common';
import { PassThrough } from 'node:stream';
import { ParquetWriter } from '@dsnp/parquetjs';
import { fromFrequencySchema } from '@dsnp/frequency-schemas/parquet';
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import Redis from 'ioredis';
import { PalletSchemasSchema } from '@polkadot/types/lookup';
import { BlockchainService } from '../blockchain/blockchain.service';
import { ConfigService } from '../../../api/src/config/config.service';
import { IBatchAnnouncerJobData } from '../interfaces/batch-announcer.job.interface';
import { IPublisherJob } from '../interfaces/publisher-job.interface';
import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client';

@Injectable()
export class BatchAnnouncer {
private logger: Logger;

constructor(
@InjectRedis() private cacheManager: Redis,
private configService: ConfigService,
private blockchainService: BlockchainService,
private ipfsService: IpfsService,
) {
this.logger = new Logger(BatchAnnouncer.name);
}

public async announce(batchJob: IBatchAnnouncerJobData): Promise<IPublisherJob> {
this.logger.debug(`Announcing batch ${batchJob.batchId} on IPFS`);
const { batchId, schemaId, announcements } = batchJob;

let frequencySchema: PalletSchemasSchema;

const schemaCacheKey = `schema:${schemaId}`;
const cachedSchema = await this.cacheManager.get(schemaCacheKey);
if (cachedSchema) {
frequencySchema = JSON.parse(cachedSchema);
} else {
frequencySchema = await this.blockchainService.getSchema(schemaId);
await this.cacheManager.set(schemaCacheKey, JSON.stringify(frequencySchema));
}

const schema = JSON.parse(frequencySchema.model.toString());
if (!schema) {
throw new Error(`Unable to parse schema for schemaId ${schemaId}`);
}

const [parquetSchema, writerOptions] = fromFrequencySchema(schema);
const publishStream = new PassThrough();

const writer = await ParquetWriter.openStream(parquetSchema, publishStream as any, writerOptions);

announcements.forEach(async (announcement) => {
writer.appendRow(announcement);
});

await writer.close();
const buffer = await this.bufferPublishStream(publishStream);
const [cid, hash] = await this.pinStringToIPFS(buffer);
const ipfsUrl = await this.formIpfsUrl(cid);
this.logger.debug(`Batch ${batchId} published to IPFS at ${ipfsUrl}`);
this.logger.debug(`Batch ${batchId} hash: ${hash}`);
return { id: batchId, schemaId, data: { cid, payloadLength: buffer.length } };
}

private async bufferPublishStream(publishStream: PassThrough): Promise<Buffer> {
this.logger.debug('Buffering publish stream');
return new Promise((resolve, reject) => {
const buffers: Buffer[] = [];
publishStream.on('data', (data) => {
buffers.push(data);
});
publishStream.on('end', () => {
resolve(Buffer.concat(buffers));
});
publishStream.on('error', (err) => {
reject(err);
});
});
}

private async pinStringToIPFS(buf: Buffer): Promise<[string, string]> {
const { cid, size } = await this.ipfsService.ipfsPin('application/octet-stream', buf);
return [cid.toString(), size.toString()];
}

private async formIpfsUrl(cid: string): Promise<string> {
return this.configService.getIpfsCidPlaceholder(cid);
}
}
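The IPublisherJob interface imported at the top of batch.announcer.ts is not included in this diff. Judging from the object announce() returns and BatchAnnouncementService enqueues, a plausible shape is the following (field names inferred from the return statement above, not confirmed by this commit):

// Inferred from `return { id: batchId, schemaId, data: { cid, payloadLength: buffer.length } };`
export interface IPublisherJob {
  id: string;
  schemaId: number;
  data: {
    cid: string;
    payloadLength: number;
  };
}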
13 changes: 0 additions & 13 deletions apps/worker/src/batch_announcer/ipfs.announcer.spec.ts

This file was deleted.

16 changes: 0 additions & 16 deletions apps/worker/src/batch_announcer/ipfs.announcer.ts

This file was deleted.

9 changes: 7 additions & 2 deletions apps/worker/src/blockchain/blockchain.service.ts
@@ -7,8 +7,8 @@ import { KeyringPair } from '@polkadot/keyring/types';
import { BlockHash, BlockNumber } from '@polkadot/types/interfaces';
import { SubmittableExtrinsic } from '@polkadot/api/types';
import { AnyNumber, ISubmittableResult } from '@polkadot/types/types';
import { u32, Option, u128 } from '@polkadot/types';
import { PalletCapacityCapacityDetails, PalletCapacityEpochInfo } from '@polkadot/types/lookup';
import { u32, Option } from '@polkadot/types';
import { PalletCapacityCapacityDetails, PalletCapacityEpochInfo, PalletSchemasSchema } from '@polkadot/types/lookup';
import { ConfigService } from '../../../api/src/config/config.service';
import { Extrinsic } from './extrinsic';

@@ -139,4 +139,9 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
public async capacityBatchLimit(): Promise<number> {
return this.api.consts.frequencyTxPayment.maximumCapacityBatchLength.toNumber();
}

public async getSchema(schemaId: number): Promise<PalletSchemasSchema> {
const schema: PalletSchemasSchema = await this.query('schemas', 'schemas', schemaId);
return schema;
}
}
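The query() helper that getSchema relies on is defined elsewhere in this service. For orientation, a rough stand-alone equivalent using polkadot-js directly would look like this; the schemas/schemas pallet and storage names are taken from the call above, and the connection boilerplate is illustrative only:

import { ApiPromise, WsProvider } from '@polkadot/api';

// Rough equivalent of this.query('schemas', 'schemas', schemaId).
async function fetchSchema(frequencyUrl: string, schemaId: number) {
  const api = await ApiPromise.create({ provider: new WsProvider(frequencyUrl) });
  // Storage path assumed from the wrapper call; returns the on-chain schema record.
  return api.query.schemas.schemas(schemaId);
}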
3 changes: 3 additions & 0 deletions apps/worker/src/interfaces/batch-announcer.job.interface.ts
@@ -1,4 +1,7 @@
import { Announcement } from '../../../../libs/common/src/interfaces/dsnp';

export interface IBatchAnnouncerJobData {
batchId: string;
schemaId: number;
announcements: Announcement[];
}
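The Announcement type imported here lives in libs/common and is not shown in this commit. Based on the parquet columns used in batch.announcer.spec.ts above (announcementType, contentHash, fromId, url), a broadcast-style row would carry roughly the following fields; this is an inference for illustration, not the actual DSNP definition:

// Illustrative only; the real Announcement union is defined in libs/common/src/interfaces/dsnp.
interface BroadcastAnnouncementRow {
  announcementType: number; // 32-bit signed integer column
  fromId: bigint;           // 64-bit unsigned integer column
  contentHash: string;      // BYTE_ARRAY column
  url: string;              // STRING column
}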
4 changes: 4 additions & 0 deletions env.template
@@ -1,4 +1,8 @@
# Copy this file to ".env.dev" and ".env.docker.dev", and then tweak values for local development
IPFS_ENDPOINT="https://ipfs.infura.io:5001"
IPFS_BASIC_AUTH_USER="Infura Project ID Here or Blank for Kubo RPC"
IPFS_BASIC_AUTH_SECRET="Infura Secret Here or Blank for Kubo RPC"
IPFS_GATEWAY_URL="https://ipfs.io/ipfs/[CID]"
FREQUENCY_URL=ws://0.0.0.0:9944
PROVIDER_ID=1
REDIS_URL=redis://0.0.0.0:6379
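The IpfsService in libs/common/src/utils/ipfs.client.ts that consumes these variables is not part of this diff. As an illustration only, HTTP Basic auth for an Infura or Kubo RPC endpoint is typically assembled as below; the helper name and the /api/v0/add usage line are assumptions, not code from this repository:

// Sketch under assumptions: builds the Authorization header an IPFS RPC client would send.
function buildIpfsAuthHeaders(user: string, secret: string): Record<string, string> {
  if (!user && !secret) {
    return {}; // local Kubo node: no auth required
  }
  const token = Buffer.from(`${user}:${secret}`).toString('base64');
  return { Authorization: `Basic ${token}` };
}

// Example: fetch(`${process.env.IPFS_ENDPOINT}/api/v0/add`, { method: 'POST', headers: buildIpfsAuthHeaders(user, secret), body: form });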
