From 8ae35970811f29846443f9549c5d416a6a6876d4 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 23 Aug 2023 10:46:06 +0200 Subject: [PATCH] perf: flush import batches in parallel (backport #793) (#820) Co-authored-by: Elias Naur <103319121+elias-orijtech@users.noreply.github.com> --- import.go | 22 ++++++++++++++++++++-- import_test.go | 12 ++++++++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/import.go b/import.go index fe9cb6bab..44802170f 100644 --- a/import.go +++ b/import.go @@ -28,6 +28,9 @@ type Importer struct { batchSize uint32 stack []*Node nonces []uint32 + + // inflightCommit tracks a batch commit, if any. + inflightCommit <-chan error } // newImporter creates a new Importer for an empty MutableTree. @@ -78,10 +81,21 @@ func (i *Importer) writeNode(node *Node) error { i.batchSize++ if i.batchSize >= maxBatchSize { - if err := i.batch.Write(); err != nil { + // Wait for previous batch. + var err error + if i.inflightCommit != nil { + err = <-i.inflightCommit + i.inflightCommit = nil + } + if err != nil { return err } - i.batch.Close() + result := make(chan error) + i.inflightCommit = result + go func(batch db.Batch) { + defer batch.Close() + result <- batch.Write() + }(i.batch) i.batch = i.tree.ndb.db.NewBatch() i.batchSize = 0 } @@ -92,6 +106,10 @@ func (i *Importer) writeNode(node *Node) error { // Close frees all resources. It is safe to call multiple times. Uncommitted nodes may already have // been flushed to the database, but will not be visible. func (i *Importer) Close() { + if i.inflightCommit != nil { + <-i.inflightCommit + i.inflightCommit = nil + } if i.batch != nil { i.batch.Close() } diff --git a/import_test.go b/import_test.go index a3dc4071f..c027b99cc 100644 --- a/import_test.go +++ b/import_test.go @@ -233,9 +233,17 @@ func TestImporter_Commit_Empty(t *testing.T) { } func BenchmarkImport(b *testing.B) { + benchmarkImport(b, 4096) +} + +func BenchmarkImportBatch(b *testing.B) { + benchmarkImport(b, maxBatchSize*10) +} + +func benchmarkImport(b *testing.B, nodes int) { b.StopTimer() - tree := setupExportTreeSized(b, 4096) - exported := make([]*ExportNode, 0, 4096) + tree := setupExportTreeSized(b, nodes) + exported := make([]*ExportNode, 0, nodes) exporter, err := tree.Export() require.NoError(b, err) for {