
feat!: iterable pinning #231

Merged: 25 commits merged into ipfs:main on Jan 7, 2024
Conversation

@saul-jb (Contributor) commented Aug 14, 2023

This PR reworks pinning to use async generators instead of promises, with support for batching, giving users more fine-grained control over the pinning/downloading of blocks.

As expected, a higher batch number gives much better performance than the old pinning system, since it does more fetching in parallel. A batch number of 1 replicates the old system of a simple queue.

This is a basic example of throttling the pin download; more advanced systems can control how many large pins are downloaded at once, and at what speeds, by calling next on the iterables.

// Pin the first 10 blocks of the CID.
for await (const cids of helia.pins.add(cid, { batch: 10 })) {
  // Wait for a second before pinning the next 10 blocks.
  await new Promise(resolve => setTimeout(resolve, 1000))
}

// The CID is now pinned.

Another example, altering the batch size:

const itr = helia.pins.add(cid, { batch: 8 })

// Pin the first 8 blocks.
await itr.next()

// Pin the next 5 blocks.
await itr.next(5)
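The `batch` option and the argument to `next()` are part of the proposed Helia API. As a rough, hypothetical sketch of the underlying generator technique (the `batched` helper and its shape are invented for illustration, not Helia internals), a generator can read the value passed to `next()` to vary its batch size:

```javascript
// Hypothetical sketch: yield chunks of `items`, letting the consumer
// override the chunk size by passing a number to next().
async function * batched (items, defaultBatch) {
  let i = 0
  let size = defaultBatch

  while (i < items.length) {
    const chunk = items.slice(i, i + size)
    i += chunk.length
    // the value passed to the following next() call becomes the new size
    size = (yield chunk) ?? defaultBatch
  }
}
```

Calling `itr.next()` yields chunks of the default size, while `itr.next(5)` makes the following chunk contain up to 5 items.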

Adding pins:

// Before:
await helia.pins.add(cid)

// After (`all` comes from the it-all module):
await all(helia.pins.add(cid))

Removing pins:

// Before:
await helia.pins.rm(cid)

// After:
await all(helia.pins.rm(cid))

Getting the pinned value:

// Before:
const pin = await helia.pins.add(cid)

// After:
const itr = helia.pins.add(cid)

let next = await itr.next()

while (!next.done) {
  next = await itr.next()
}

const pin = next.value
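The manual loop above amounts to "drain the iterator and keep the final return value". A small runnable sketch of that pattern, using an invented stand-in generator in place of helia.pins.add (the `fakePin` name and returned shape are illustrative only):

```javascript
// Drain an async iterator and return the value carried by its final
// (done: true) result, mirroring the loop shown above.
async function drainToReturnValue (itr) {
  let next = await itr.next()

  while (next.done !== true) {
    next = await itr.next()
  }

  return next.value
}

// Stand-in for helia.pins.add: yields block CIDs, returns the pin record.
async function * fakePin () {
  yield 'cid-1'
  yield 'cid-2'
  return { cid: 'root-cid' }
}
```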

@saul-jb marked this pull request as ready for review August 14, 2023 01:54
@saul-jb requested a review from a team as a code owner August 14, 2023 01:54

@saul-jb (Contributor, Author) commented Aug 14, 2023

It might be desirable to change the default behavior of the first iteration call so that it pulls all the local blocks without a batch limit. This would let pins quickly catch up to where they were if the program was interrupted, without having to await repeated iteration calls, giving the user an easy way to get 'resume download'-type behavior.

Something like this:

const itr = helia.pins.add(cid, { skipLocal: true })

// Fetches all the blocks that are in the local blockstore not limited to batch numbers:
const localCids = await itr.next()

// Fetches the next blocks from the network according to the batch number:
const downloadedCids = await itr.next()

@achingbrain (Member) commented:

This is interesting, but I wonder if we could simplify the internal implementation by having it be a generator that pins blocks as fast as possible. If the user wants to batch them up (or parallelise, or parallel batch), they can use existing modules like it-batch, it-parallel, it-parallel-batch or a combination of things from streaming-iterables.

That would give them a lot more control over the performance characteristics of pinning, and we'll have a simpler codebase to maintain.
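For illustration, batching a plain generator needs nothing Helia-specific. This hand-rolled helper is a minimal stand-in showing the idea (it is not the actual it-batch implementation):

```javascript
// Collect items from any (sync or async) iterable into arrays of at most
// `size` items, yielding a final short batch if items don't divide evenly.
async function * batch (source, size) {
  let buf = []

  for await (const item of source) {
    buf.push(item)

    if (buf.length === size) {
      yield buf
      buf = []
    }
  }

  if (buf.length > 0) {
    yield buf
  }
}
```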

@saul-jb (Contributor, Author) commented Aug 16, 2023

That would give them a lot more control over the performance characteristics of pinning, and we'll have a simpler codebase to maintain.

I agree and have updated the code.

Adding pins:

// Before:
await helia.pins.add(cid)

// After (`all` from it-all, `parallel` from it-parallel):
await all(parallel(helia.pins.add(cid), { concurrency: 1 }))

Removing pins:

// Before:
await helia.pins.rm(cid)

// After:
await all(parallel(helia.pins.rm(cid), { concurrency: 1 }))

Getting the pinned value:

// Before:
const pin = await helia.pins.add(cid)

// After:
const itr = helia.pins.add(cid)

let output = await itr.next()

while (output.done === false) {
  await output.value()
  output = await itr.next()
}

const pin = output.value
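The loop above relies on the generator yielding functions (thunks) rather than values, so the consumer decides when each unit of work runs. A runnable sketch of that pattern with invented stand-ins (`pinSteps` plays the role of helia.pins.add, and the returned shapes are illustrative only):

```javascript
// Stand-in generator: yields one thunk per block to pin, and returns the
// pin record once iteration completes.
async function * pinSteps () {
  for (const cid of ['cid-a', 'cid-b', 'cid-c']) {
    yield async () => cid // calling the thunk performs the work
  }

  return 'pin-record'
}

// Consume the thunks one at a time (concurrency 1), as in the loop above.
async function runSequentially (itr) {
  const pinned = []
  let output = await itr.next()

  while (output.done === false) {
    pinned.push(await output.value())
    output = await itr.next()
  }

  return { pinned, pin: output.value }
}
```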

One thing this is still missing that I would like to see is the ability to continue a pin where it left off, or to catch up without rate-limiting the pinning of local blocks. A simple use case that illustrates this: you are rate-limiting the pinning of a large DAG, the program crashes part way through, and you want it to continue where it left off.

Do you think a skipLocal option is the best way to implement this or is there a better way?

@achingbrain achingbrain mentioned this pull request Dec 5, 2023
3 tasks
@SgtPooki SgtPooki added this to the v3 milestone Dec 14, 2023
@SgtPooki previously approved these changes Dec 14, 2023

@SgtPooki (Member) left a comment:

Self review, plus approving prior code. Should be looked at again by @saul-jb and @achingbrain

Comment on lines +38 to +47
/**
 * Callback for updating a {@link DatastorePinnedBlock}'s properties when
 * calling `#updatePinnedBlock`
 *
 * The callback should return `false` to prevent any pinning modifications to
 * the block, and `true` in all other cases.
 */
interface WithPinnedBlockCallback {
  (pinnedBlock: DatastorePinnedBlock): boolean
}
@SgtPooki (Member): @achingbrain I pulled this out into a locally defined interface and added some context.

await this.#updatePinnedBlock(cid, (pinnedBlock: DatastorePinnedBlock) => {
  // do not update pinned block if this block is already pinned by this CID
  if (pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null) {
    return false
  }

@SgtPooki (Member): Note: we return `false` to say that we do not want to process the pinning of this content any further.

  pinnedBlock.pinCount++
  pinnedBlock.pinnedBy.push(cid.bytes)
  return true
})

@SgtPooki (Member): we should be returning `true` by default inside a WithPinnedBlockCallback

Comment on lines 188 to 191

const shouldContinue = withPinnedBlock(pinnedBlock)

if (!shouldContinue) {
  return
}

@SgtPooki (Member): @achingbrain @saul-jb this is how I've implemented the skipLocal option that was mentioned. We already had the logic to skip content if it was pinned; we just needed a way to exit early.

All tests pass, but I'm not sure of any implications of returning prior to checking for pinnedBlock.pinCount === 0. However, since only .add returns false when pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null, and everything else returns true, I believe this should continue working as it had previously.

@@ -85,4 +87,35 @@ describe('pins (recursive)', () => {
      }
    }
  })

  it('can resume an interrupted pinning operation', async () => {
@SgtPooki (Member): Added a test to simulate an interrupted pinning event by only iterating over it partially, and then attempting to pin from the root again.

@SgtPooki (Member): We could optimize this further, but a worthy optimization would likely require a drastic change to the pinning response and input types.

We could try to prevent calls to #updatePinnedBlock entirely by asking users to provide CIDs to skip (which would have been returned by the first .add call). But pinnedBy checks the block content explicitly with uint8ArrayEquals, not just CID strings, so I assume this would cause more work in cases where we're not resuming.

Open to ideas.

@whizzzkid (Contributor) left a comment:

a few thoughts

    depth
  })
})
for await (const promise of this.#walkDag(cid, depth, options)) {

@whizzzkid (Contributor): calling this `promise` sounds a bit weird.

const dagWalker = this.dagWalkers[cid.code]
const enqueue = (cid: CID, depth: number): void => {
  queue.push(async () => {
    const promise = Promise.resolve().then(async () => {

@whizzzkid (Contributor): I am not sure I follow: why does this need to be Promise.resolve().then(...)?

@saul-jb (Contributor, Author): That is just another way to write ;(async () => {})() without needing the leading semicolon.
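Both forms start the async function and produce a promise for its result; a quick demonstration of the equivalence:

```javascript
// Promise.resolve().then(fn) defers fn to the microtask queue and resolves
// with its result; the IIFE starts its synchronous part immediately. Both
// yield a promise for the async function's return value.
const viaThen = Promise.resolve().then(async () => 'done')
const viaIife = (async () => 'done')()
```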

Comment on lines 156 to 166

while (queue.length + promises.length !== 0) {
  const func = queue.shift()

  if (func == null) {
    await promises.shift()

    continue
  }

  yield func
}
@whizzzkid (Contributor): this logic is quite loaded; having an explanation here would help. Under what circumstances would func be null? Is it when the queue is exhausted?

@saul-jb (Contributor, Author): Yes. When the queue is exhausted but there are still promises running, we wait until the next one has resolved before trying to pull more from the queue.
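That answer can be made concrete with a runnable sketch of the same loop (the queue/promises names follow the snippet under discussion; the surrounding demo values are invented):

```javascript
// Yield queued thunks while any exist; when the queue is momentarily empty
// but work is still in flight, await the oldest promise (which may enqueue
// more thunks) before checking the queue again.
async function * drain (queue, promises) {
  while (queue.length + promises.length !== 0) {
    const func = queue.shift()

    if (func == null) {
      // queue exhausted but promises still running: wait for one to settle
      await promises.shift()
      continue
    }

    yield func
  }
}
```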

@SgtPooki dismissed their stale review December 14, 2023 17:42

Performance regression. See #346

@SgtPooki (Member) commented Dec 14, 2023

Dismissed my review and pointed to the regression seen when updating benchmarks/gc in #346.

Some tasks for this PR:

  • remove "resumable pinning" changes (very slight changes, so not hard work)
  • address performance regression

@achingbrain (Member) commented:

I'm going to merge this so we have the API changes in and the v3 release is unblocked; we can address the performance characteristics in a follow-up.

The existing "resumable pinning" change is useful, though it's more "restartable pinning" than "resumable". I'll open another issue to discuss.

@achingbrain achingbrain merged commit c15c774 into ipfs:main Jan 7, 2024
18 checks passed