diff --git a/.changelog/5553.internal.md b/.changelog/5553.internal.md new file mode 100644 index 00000000000..782c6fa2dc8 --- /dev/null +++ b/.changelog/5553.internal.md @@ -0,0 +1,4 @@ +go/worker/compute: Simplify I/O root commit + +This also avoids an intermediate committed IO root which complicates the +required database layout. diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 2083414cc8a..e1c23279bf5 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -238,7 +238,6 @@ func (n *Node) transitionStateToProcessingFailure( proposal: proposal, rank: rank, computed: nil, - raw: nil, } } @@ -436,21 +435,6 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { }() } -func (n *Node) storeTransactions(ctx context.Context, blk *block.Block, inputWriteLog storage.WriteLog, inputRoot hash.Hash) error { - var emptyRoot hash.Hash - emptyRoot.Empty() - - return n.storage.Apply(ctx, &storage.ApplyRequest{ - Namespace: blk.Header.Namespace, - RootType: storage.RootTypeIO, - SrcRound: blk.Header.Round + 1, - SrcRoot: emptyRoot, - DstRound: blk.Header.Round + 1, - DstRoot: inputRoot, - WriteLog: inputWriteLog, - }) -} - func (n *Node) publishProposal(ctx context.Context, proposal *commitment.Proposal) error { if err := proposal.Sign(n.commonNode.Identity.NodeSigner, n.commonNode.Runtime.ID()); err != nil { return fmt.Errorf("failed to sign proposal header: %w", err) @@ -528,115 +512,6 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue } } -func (n *Node) startLocalStorageReplication( - ctx context.Context, - blk *block.Block, - ioRootHash hash.Hash, - batch transaction.RawBatch, -) <-chan error { - ch := make(chan error, 1) - - ioRoot := storage.Root{ - Namespace: blk.Header.Namespace, - Version: blk.Header.Round + 1, - Type: storage.RootTypeIO, - Hash: ioRootHash, - } - - // If we have a local storage node, replicate batch locally so we will be able to Apply - // locally later when proposing a batch. This also avoids needless replication for things - // that we already have. - replicateIO := make(chan error) - go func() { - defer close(replicateIO) - - // Check if the root is already present as in this case no replication is needed. - if n.storage.NodeDB().HasRoot(ioRoot) { - replicateIO <- nil - return - } - - n.logger.Debug("replicating I/O root locally", - "io_root", ioRoot, - ) - - emptyRoot := ioRoot - emptyRoot.Hash.Empty() - - ioTree := transaction.NewTree(nil, emptyRoot) - defer ioTree.Close() - - for idx, tx := range batch { - if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil { - n.logger.Error("failed to create I/O tree", - "err", err, - ) - replicateIO <- err - return - } - } - - ioWriteLog, ioRootHashCheck, err := ioTree.Commit(ctx) - if err != nil { - n.logger.Error("failed to create I/O tree", - "err", err, - ) - replicateIO <- err - return - } - if !ioRootHashCheck.Equal(&ioRootHash) { - n.logger.Error("inconsistent I/O root", - "io_root_hash", ioRootHashCheck, - "expected", ioRootHash, - ) - replicateIO <- fmt.Errorf("inconsistent I/O root") - return - } - - err = n.storage.Apply(ctx, &storage.ApplyRequest{ - Namespace: ioRoot.Namespace, - RootType: ioRoot.Type, - SrcRound: ioRoot.Version, - SrcRoot: emptyRoot.Hash, - DstRound: ioRoot.Version, - DstRoot: ioRoot.Hash, - WriteLog: ioWriteLog, - }) - if err != nil { - n.logger.Error("failed to apply I/O tree locally", - "err", err, - ) - replicateIO <- err - return - } - - replicateIO <- nil - }() - - // Wait for replication to complete. - go func() { - defer close(ch) - - var combinedErr error - select { - case <-ctx.Done(): - combinedErr = ctx.Err() - case err := <-replicateIO: - if err != nil { - combinedErr = fmt.Errorf("failed to replicate I/O root: %w", err) - } - } - - n.logger.Debug("local storage replication done", - "io_root", ioRoot, - ) - - ch <- combinedErr - }() - - return ch -} - func (n *Node) runtimeExecuteTxBatch( ctx context.Context, rt host.RichRuntime, @@ -747,9 +622,6 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr "batch_size", len(batch), ) - // Optionally start local storage replication in parallel to batch dispatch. - replicateCh := n.startLocalStorageReplication(ctx, n.blockInfo.RuntimeBlock, proposal.Header.BatchHash, batch) - // Ask the runtime to execute the batch. rsp, err := n.runtimeExecuteTxBatch( ctx, @@ -770,26 +642,12 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr return } - // Wait for replication to complete before proposing a batch to ensure that we can cleanly - // apply any updates. - select { - case <-ctx.Done(): - return - case err = <-replicateCh: - if err != nil { - n.logger.Error("local storage replication failed", - "err", err, - ) - return - } - } - // Submit response to the round worker. n.processedBatchCh <- &processedBatch{ - proposal: proposal, - rank: rank, - computed: &rsp.Batch, - raw: batch, + proposal: proposal, + rank: rank, + computed: &rsp.Batch, + txInputWriteLog: rsp.TxInputWriteLog, } } @@ -822,7 +680,7 @@ func (n *Node) proposeBatch( n.logger.Debug("proposing batch", "scheduler_id", processed.proposal.NodeID, "node_id", n.commonNode.Identity.NodeSigner.Public(), - "batch_size", len(processed.raw), + "batch_size", len(processed.proposal.Batch), "io_root", *batch.Header.IORoot, "state_root", *batch.Header.StateRoot, "messages_hash", *batch.Header.MessagesHash, @@ -845,8 +703,6 @@ func (n *Node) proposeBatch( ec.Messages = batch.Messages } - inputRoot := processed.proposal.Header.BatchHash - // Commit I/O and state write logs to storage. storageErr := func() error { start := time.Now() @@ -858,14 +714,17 @@ func (n *Node) proposeBatch( defer cancel() // Store final I/O root. + var emptyRoot hash.Hash + emptyRoot.Empty() + err := n.storage.Apply(ctx, &storage.ApplyRequest{ Namespace: lastHeader.Namespace, RootType: storage.RootTypeIO, SrcRound: lastHeader.Round + 1, - SrcRoot: inputRoot, + SrcRoot: emptyRoot, DstRound: lastHeader.Round + 1, DstRoot: *batch.Header.IORoot, - WriteLog: batch.IOWriteLog, + WriteLog: append(processed.txInputWriteLog, batch.IOWriteLog...), }) if err != nil { return err @@ -926,20 +785,10 @@ func (n *Node) proposeBatch( return } - // Due to backwards compatibility with runtimes that don't provide transaction hashes as output - // we need to manually compute them here. - txHashes := processed.proposal.Batch - if len(processed.raw) > 0 && len(txHashes) == 0 { - txHashes = make([]hash.Hash, 0, len(processed.raw)) - for _, tx := range processed.raw { - txHashes = append(txHashes, hash.NewFromBytes(tx)) - } - } - n.proposedBatch = &proposedBatch{ batchStartTime: state.batchStartTime, proposedIORoot: *ec.Header.Header.IORoot, - txHashes: txHashes, + txHashes: processed.proposal.Batch, } n.transitionState(StateWaitingForBatch{}) @@ -1232,16 +1081,9 @@ func (n *Node) handleProcessedBatch(ctx context.Context, batch *processedBatch) "input_root", batch.proposal.Header.BatchHash, "tx_hashes", batch.proposal.Batch, ) - err := n.storeTransactions(ctx, n.blockInfo.RuntimeBlock, batch.txInputWriteLog, batch.proposal.Header.BatchHash) - if err != nil { - n.logger.Error("failed to store transaction", - "err", err, - ) - return - } // Sign and submit the proposal to P2P network. - err = n.publishProposal(ctx, batch.proposal) + err := n.publishProposal(ctx, batch.proposal) if err != nil { n.logger.Error("failed to sign and publish proposal", "err", err, diff --git a/go/worker/compute/executor/committee/state.go b/go/worker/compute/executor/committee/state.go index 85bbbd50b16..0c6dd5630bc 100644 --- a/go/worker/compute/executor/committee/state.go +++ b/go/worker/compute/executor/committee/state.go @@ -167,7 +167,6 @@ type processedBatch struct { rank uint64 computed *protocol.ComputedBatch - raw transaction.RawBatch txInputWriteLog storage.WriteLog }