Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/compute: Simplify I/O root commit #5553

Merged
merged 1 commit into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/5553.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/compute: Simplify I/O root commit

This also avoids an intermediate committed IO root which complicates the
required database layout.
182 changes: 12 additions & 170 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (n *Node) transitionStateToProcessingFailure(
proposal: proposal,
rank: rank,
computed: nil,
raw: nil,
}
}

Expand Down Expand Up @@ -436,21 +435,6 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) {
}()
}

func (n *Node) storeTransactions(ctx context.Context, blk *block.Block, inputWriteLog storage.WriteLog, inputRoot hash.Hash) error {
var emptyRoot hash.Hash
emptyRoot.Empty()

return n.storage.Apply(ctx, &storage.ApplyRequest{
Namespace: blk.Header.Namespace,
RootType: storage.RootTypeIO,
SrcRound: blk.Header.Round + 1,
SrcRoot: emptyRoot,
DstRound: blk.Header.Round + 1,
DstRoot: inputRoot,
WriteLog: inputWriteLog,
})
}

func (n *Node) publishProposal(ctx context.Context, proposal *commitment.Proposal) error {
if err := proposal.Sign(n.commonNode.Identity.NodeSigner, n.commonNode.Runtime.ID()); err != nil {
return fmt.Errorf("failed to sign proposal header: %w", err)
Expand Down Expand Up @@ -528,115 +512,6 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue
}
}

func (n *Node) startLocalStorageReplication(
ctx context.Context,
blk *block.Block,
ioRootHash hash.Hash,
batch transaction.RawBatch,
) <-chan error {
ch := make(chan error, 1)

ioRoot := storage.Root{
Namespace: blk.Header.Namespace,
Version: blk.Header.Round + 1,
Type: storage.RootTypeIO,
Hash: ioRootHash,
}

// If we have a local storage node, replicate batch locally so we will be able to Apply
// locally later when proposing a batch. This also avoids needless replication for things
// that we already have.
replicateIO := make(chan error)
go func() {
defer close(replicateIO)

// Check if the root is already present as in this case no replication is needed.
if n.storage.NodeDB().HasRoot(ioRoot) {
replicateIO <- nil
return
}

n.logger.Debug("replicating I/O root locally",
"io_root", ioRoot,
)

emptyRoot := ioRoot
emptyRoot.Hash.Empty()

ioTree := transaction.NewTree(nil, emptyRoot)
defer ioTree.Close()

for idx, tx := range batch {
if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
)
replicateIO <- err
return
}
}

ioWriteLog, ioRootHashCheck, err := ioTree.Commit(ctx)
if err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
)
replicateIO <- err
return
}
if !ioRootHashCheck.Equal(&ioRootHash) {
n.logger.Error("inconsistent I/O root",
"io_root_hash", ioRootHashCheck,
"expected", ioRootHash,
)
replicateIO <- fmt.Errorf("inconsistent I/O root")
return
}

err = n.storage.Apply(ctx, &storage.ApplyRequest{
Namespace: ioRoot.Namespace,
RootType: ioRoot.Type,
SrcRound: ioRoot.Version,
SrcRoot: emptyRoot.Hash,
DstRound: ioRoot.Version,
DstRoot: ioRoot.Hash,
WriteLog: ioWriteLog,
})
if err != nil {
n.logger.Error("failed to apply I/O tree locally",
"err", err,
)
replicateIO <- err
return
}

replicateIO <- nil
}()

// Wait for replication to complete.
go func() {
defer close(ch)

var combinedErr error
select {
case <-ctx.Done():
combinedErr = ctx.Err()
case err := <-replicateIO:
if err != nil {
combinedErr = fmt.Errorf("failed to replicate I/O root: %w", err)
}
}

n.logger.Debug("local storage replication done",
"io_root", ioRoot,
)

ch <- combinedErr
}()

return ch
}

func (n *Node) runtimeExecuteTxBatch(
ctx context.Context,
rt host.RichRuntime,
Expand Down Expand Up @@ -747,9 +622,6 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr
"batch_size", len(batch),
)

// Optionally start local storage replication in parallel to batch dispatch.
replicateCh := n.startLocalStorageReplication(ctx, n.blockInfo.RuntimeBlock, proposal.Header.BatchHash, batch)

// Ask the runtime to execute the batch.
rsp, err := n.runtimeExecuteTxBatch(
ctx,
Expand All @@ -770,26 +642,12 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr
return
}

// Wait for replication to complete before proposing a batch to ensure that we can cleanly
// apply any updates.
select {
case <-ctx.Done():
return
case err = <-replicateCh:
if err != nil {
n.logger.Error("local storage replication failed",
"err", err,
)
return
}
}

// Submit response to the round worker.
n.processedBatchCh <- &processedBatch{
proposal: proposal,
rank: rank,
computed: &rsp.Batch,
raw: batch,
proposal: proposal,
rank: rank,
computed: &rsp.Batch,
txInputWriteLog: rsp.TxInputWriteLog,
}
}

Expand Down Expand Up @@ -822,7 +680,7 @@ func (n *Node) proposeBatch(
n.logger.Debug("proposing batch",
"scheduler_id", processed.proposal.NodeID,
"node_id", n.commonNode.Identity.NodeSigner.Public(),
"batch_size", len(processed.raw),
"batch_size", len(processed.proposal.Batch),
"io_root", *batch.Header.IORoot,
"state_root", *batch.Header.StateRoot,
"messages_hash", *batch.Header.MessagesHash,
Expand All @@ -845,8 +703,6 @@ func (n *Node) proposeBatch(
ec.Messages = batch.Messages
}

inputRoot := processed.proposal.Header.BatchHash

// Commit I/O and state write logs to storage.
storageErr := func() error {
start := time.Now()
Expand All @@ -858,14 +714,17 @@ func (n *Node) proposeBatch(
defer cancel()

// Store final I/O root.
var emptyRoot hash.Hash
emptyRoot.Empty()

err := n.storage.Apply(ctx, &storage.ApplyRequest{
Namespace: lastHeader.Namespace,
RootType: storage.RootTypeIO,
SrcRound: lastHeader.Round + 1,
SrcRoot: inputRoot,
SrcRoot: emptyRoot,
DstRound: lastHeader.Round + 1,
DstRoot: *batch.Header.IORoot,
WriteLog: batch.IOWriteLog,
WriteLog: append(processed.txInputWriteLog, batch.IOWriteLog...),
})
if err != nil {
return err
Expand Down Expand Up @@ -926,20 +785,10 @@ func (n *Node) proposeBatch(
return
}

// Due to backwards compatibility with runtimes that don't provide transaction hashes as output
// we need to manually compute them here.
txHashes := processed.proposal.Batch
if len(processed.raw) > 0 && len(txHashes) == 0 {
txHashes = make([]hash.Hash, 0, len(processed.raw))
for _, tx := range processed.raw {
txHashes = append(txHashes, hash.NewFromBytes(tx))
}
}

n.proposedBatch = &proposedBatch{
batchStartTime: state.batchStartTime,
proposedIORoot: *ec.Header.Header.IORoot,
txHashes: txHashes,
txHashes: processed.proposal.Batch,
}

n.transitionState(StateWaitingForBatch{})
Expand Down Expand Up @@ -1232,16 +1081,9 @@ func (n *Node) handleProcessedBatch(ctx context.Context, batch *processedBatch)
"input_root", batch.proposal.Header.BatchHash,
"tx_hashes", batch.proposal.Batch,
)
err := n.storeTransactions(ctx, n.blockInfo.RuntimeBlock, batch.txInputWriteLog, batch.proposal.Header.BatchHash)
if err != nil {
n.logger.Error("failed to store transaction",
"err", err,
)
return
}

// Sign and submit the proposal to P2P network.
err = n.publishProposal(ctx, batch.proposal)
err := n.publishProposal(ctx, batch.proposal)
if err != nil {
n.logger.Error("failed to sign and publish proposal",
"err", err,
Expand Down
1 change: 0 additions & 1 deletion go/worker/compute/executor/committee/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ type processedBatch struct {
rank uint64

computed *protocol.ComputedBatch
raw transaction.RawBatch

txInputWriteLog storage.WriteLog
}
Expand Down
Loading