This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Publisher Processor #14

Merged
merged 15 commits into main from processors_publisher on Aug 29, 2023

Conversation

saraswatpuneet
Collaborator

@saraswatpuneet saraswatpuneet commented Aug 24, 2023

Requirement

The content publisher is intended to process a publishQueue and post IPFS messages to the Frequency blockchain.

Closes #6

Details

  • PublishingService: processes the publishQueue
  • Publisher: the IPFS batch processor; executes publication transactions to Frequency via a publish function that takes batches
  • PublisherModule: module exporting the publisher processor
  • Refactored the Blockchain module into the worker and removed it from the api
  • Added placeholder test files for the blockchain and publisher modules

Acceptance Criteria:

  • The publisher is generic over data types, since IPFS-type messages are data-agnostic
  • Capacity used per epoch by the publisher is tracked
  • Placeholder files for unit tests are added
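The per-epoch capacity accounting could be sketched as a small tracker. This is only an illustration: `CapacityTracker` and its fields are hypothetical names, not code from this PR.

```typescript
// Hypothetical sketch of per-epoch capacity accounting (not code from this PR).
class CapacityTracker {
  private epoch = -1; // capacity epoch currently being tracked
  private used = 0n; // capacity consumed so far in that epoch

  // Record capacity consumed by a published batch; the counter resets
  // automatically when a new epoch is observed.
  record(epoch: number, amount: bigint): void {
    if (epoch !== this.epoch) {
      this.epoch = epoch;
      this.used = 0n;
    }
    this.used += amount;
  }

  // Capacity used in the given epoch (0 if it is not the tracked epoch).
  usedInEpoch(epoch: number): bigint {
    return epoch === this.epoch ? this.used : 0n;
  }
}
```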

Tests

  • Set an IPublishJob in the publishQueue and ensure a message is registered on Frequency
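The test above could be sketched against an in-memory stand-in for the queue. Only the `IPublishJob` and `publishQueue` names come from this PR; the job fields and the queue class here are assumptions for illustration.

```typescript
// Assumed shape of a publish job (only the IPublishJob name is from the PR).
interface IPublishJob {
  id: string;
  schemaId: number;
  data: { cid: string; payloadLength: number }; // IPFS payload reference (assumed)
}

// In-memory stand-in for publishQueue, for testing the publish flow.
class InMemoryPublishQueue {
  private jobs: IPublishJob[] = [];

  add(job: IPublishJob): void {
    this.jobs.push(job);
  }

  // Drain the queue through a publish callback, batching up to 10 jobs per
  // call (10 per batch, per the review discussion in this PR).
  async process(publish: (batch: IPublishJob[]) => Promise<void>): Promise<void> {
    while (this.jobs.length > 0) {
      await publish(this.jobs.splice(0, 10));
    }
  }
}
```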

@saraswatpuneet saraswatpuneet marked this pull request as ready for review August 25, 2023 13:40
@saraswatpuneet saraswatpuneet changed the title [WIP] Publisher Processor Publisher Processor Aug 28, 2023
@saraswatpuneet saraswatpuneet requested a review from aramikm August 28, 2023 21:54
try {
const currentEpoch = await this.blockchainService.getCurrentCapacityEpoch();
const [event, eventMap] = await this.blockchainService
.createExtrinsic({ pallet: 'frequencyTxPayment', extrinsic: 'payWithCapacityBatchAll' }, { eventPallet: 'utility', event: 'BatchCompleted' }, providerKeys, batch)
Collaborator Author

Maybe not in MVP 1, but we would rather pass a list of expected events to the blockchain service to catch specific errors, instead of relying on try-catch.

Collaborator

I'm not totally sure about the benefits of using batch here, since we are already batching in prior steps. Using payWithCapacityBatchAll might complicate the exact events and errors we get from the chain. Since the batching is already done I wouldn't want to change it, but if it complicates error handling in the future we might need to revisit it.

Collaborator Author

This is capacity batching: instead of sending one IPFS message to Frequency per transaction, we send as many as capacity accepts in one batch (10 for now).

Collaborator Author

Also, currently we are only sending one message at a time; I kept it like the reconnection service in case we want to batch more than one.

Collaborator

I see. I think what I might have been missing is the benefit of capacity batching for our scenario. I know we can use it transactionally when we want a few calls to apply at the same time, but that is not relevant for us since each message is treated independently. Are there any other benefits to capacity batching besides the transactional use?
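One non-transactional benefit discussed here is amortizing per-transaction overhead: grouping queued messages so a single payWithCapacityBatchAll extrinsic carries up to 10 messages instead of one transaction each. A minimal sketch of that grouping step, assuming a batch size of 10 as stated in this thread (the helper name is hypothetical):

```typescript
// Hypothetical helper: group queued messages into capacity batches so one
// payWithCapacityBatchAll extrinsic carries up to `size` messages, rather
// than submitting one transaction per message.
function toBatches<T>(jobs: T[], size = 10): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < jobs.length; i += size) {
    batches.push(jobs.slice(i, i + size));
  }
  return batches;
}
```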

Collaborator

@aramikm aramikm left a comment

Overall looks good. Added some questions and suggestions.

apps/worker/src/publisher/publisher.module.ts (outdated; thread resolved)
private async handleCapacityExhausted() {
this.logger.debug('Received capacity.exhausted event');
this.capacityExhausted = true;
await this.publishQueue.pause();
Collaborator

Is this pause applied in Redis, or only in the current worker instance?

Collaborator Author

It's only applied to the publishQueue, not to any other queues, I believe.

Collaborator

Is that applied to the queue representation inside Redis, or inside the worker's memory? To put it another way: if we have two workers, does the other worker also pause its processing of that queue?
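For reference, assuming this service uses Bull/BullMQ (the thread doesn't say explicitly): `Queue#pause()` stores a paused marker in Redis that every worker attached to that queue observes, while pausing a single `Worker` stops only that instance. A toy model of that distinction, with no Redis and all names illustrative:

```typescript
// Toy model (no Redis) of the global-vs-local pause distinction asked about
// in the review. In BullMQ, Queue#pause() persists a paused flag that all
// workers observe, while Worker#pause() stops only that one worker.
class ToyQueue {
  paused = false; // stands in for the paused marker kept in Redis

  pause(): void {
    this.paused = true;
  }
}

class ToyWorker {
  private locallyPaused = false;

  constructor(private queue: ToyQueue) {}

  pause(): void {
    this.locallyPaused = true;
  }

  // A worker processes jobs only if neither the shared queue nor the worker
  // itself is paused.
  isProcessing(): boolean {
    return !this.queue.paused && !this.locallyPaused;
  }
}
```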

apps/worker/src/publisher/publishing.service.ts (outdated; thread resolved)
@OnEvent('capacity.refilled', { async: true, promisify: true })
private async handleCapacityRefilled() {
this.logger.debug('Received capacity.refilled event');
this.capacityExhausted = false;
Collaborator

where do we unpause a paused queue? Is it only this flag?

Collaborator Author

This is the refilled event; the queue is paused in the capacity.exhausted and unknown.error events, and the pause lasts until the next epoch.

Collaborator

How do we find out about the next epoch? Is the scheduler used for that?
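The epoch-boundary arithmetic a scheduler could use for this is simple to sketch. The function and parameter names below are hypothetical, not names from this PR; the idea is only that "until next epoch" is derivable from the current block, the epoch's starting block, and the epoch length in blocks:

```typescript
// Hypothetical helper: given the current block number, the block at which the
// current capacity epoch started, and the epoch length in blocks, compute how
// many blocks remain before capacity refills at the next epoch boundary.
function blocksUntilNextEpoch(currentBlock: number, epochStart: number, epochLength: number): number {
  const elapsed = currentBlock - epochStart;
  return epochLength - (elapsed % epochLength);
}
```

A scheduler could translate that block count into a delay (blocks times the chain's block time) before emitting a capacity.refilled event and unpausing the queue.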

package.json (thread resolved)
@saraswatpuneet
Collaborator Author

Merging this to unblock other stories. Things that will be tackled in future stories:

  • Error handling
  • More tests
  • Better event handling from chain

@saraswatpuneet saraswatpuneet merged commit 2b74399 into main Aug 29, 2023
@saraswatpuneet saraswatpuneet deleted the processors_publisher branch August 29, 2023 13:33