Skip to content

Commit

Permalink
go/consensus/tendermint: Add support for state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed May 11, 2020
1 parent 1181332 commit 5629f67
Show file tree
Hide file tree
Showing 10 changed files with 806 additions and 1 deletion.
1 change: 1 addition & 0 deletions .changelog/2880.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Add support for state sync
218 changes: 218 additions & 0 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
genesis "github.com/oasislabs/oasis-core/go/genesis/api"
"github.com/oasislabs/oasis-core/go/storage/mkvs/checkpoint"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

Expand Down Expand Up @@ -877,6 +878,223 @@ func (mux *abciMux) Commit() types.ResponseCommit {
}
}

// ListSnapshots handles the ABCI ListSnapshots request by advertising every
// locally available version 1 checkpoint as a state sync snapshot.
//
// Each snapshot carries the checkpoint's root version as its height, the
// checkpoint's encoded hash, and the CBOR-serialized checkpoint metadata. If
// the checkpoints cannot be enumerated the error is logged and an empty
// response is returned.
func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots {
	var response types.ResponseListSnapshots

	// Ask the checkpointer for everything it currently has on disk.
	query := &checkpoint.GetCheckpointsRequest{
		Version: 1,
	}
	checkpoints, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, query)
	if err != nil {
		mux.logger.Error("failed to get checkpoints",
			"err", err,
		)
		return response
	}

	// Translate each checkpoint into its snapshot descriptor.
	for i := range checkpoints {
		meta := checkpoints[i]
		encodedHash := meta.EncodedHash()

		snapshot := &types.Snapshot{
			Height:   meta.Root.Version,
			Format:   uint32(meta.Version),
			Chunks:   uint32(len(meta.Chunks)),
			Hash:     encodedHash[:],
			Metadata: cbor.Marshal(meta),
		}
		response.Snapshots = append(response.Snapshots, snapshot)
	}

	return response
}

// OfferSnapshot handles the ABCI OfferSnapshot request, deciding whether a
// snapshot advertised by a peer should be restored.
//
// The snapshot is validated in this order, and the first failing check
// determines the result:
//   - a missing snapshot is rejected;
//   - a format other than 1 is rejected with REJECT_FORMAT;
//   - the advertised hash must parse and must equal the hash of the metadata;
//   - the metadata must decode as checkpoint metadata;
//   - the advertised chunk count must equal the count in the metadata;
//   - the checkpoint root hash must equal the application hash in the request
//     (a malformed application hash aborts state sync).
//
// When all checks pass the restore is started; a failure to start aborts
// state sync, otherwise the snapshot is accepted.
func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot {
	var (
		rejected = types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
		aborted  = types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT}
	)

	snapshot := req.Snapshot
	if snapshot == nil {
		return rejected
	}
	if snapshot.Format != 1 {
		mux.logger.Warn("received snapshot with unsupported version",
			"version", snapshot.Format,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT}
	}

	// Hash the supplied metadata and compare it with the hash the peer advertised.
	var computedHash, advertisedHash hash.Hash
	computedHash.FromBytes(snapshot.Metadata)
	if err := advertisedHash.UnmarshalBinary(snapshot.Hash); err != nil {
		mux.logger.Warn("received snapshot with malformed hash",
			"err", err,
		)
		return rejected
	}
	if !computedHash.Equal(&advertisedHash) {
		mux.logger.Warn("received snapshot with mismatching hash",
			"expected_hash", advertisedHash,
			"hash", computedHash,
		)
		return rejected
	}

	// The metadata itself must be a well-formed checkpoint descriptor.
	var meta checkpoint.Metadata
	if err := cbor.Unmarshal(snapshot.Metadata, &meta); err != nil {
		mux.logger.Warn("received snapshot with malformed metadata",
			"err", err,
		)
		return rejected
	}

	// The advertised chunk count has to agree with the metadata.
	if int(snapshot.Chunks) != len(meta.Chunks) {
		mux.logger.Warn("received snapshot with mismatching number of chunks",
			"expected_chunks", len(meta.Chunks),
			"chunks", snapshot.Chunks,
		)
		return rejected
	}

	// The checkpoint must be for the root the caller says is the application hash.
	var trustedRoot hash.Hash
	if err := trustedRoot.UnmarshalBinary(req.AppHash); err != nil {
		// NOTE: This should never happen as it indicates a problem with Tendermint.
		mux.logger.Error("received request with malformed hash",
			"err", err,
		)
		return aborted
	}
	if !meta.Root.Hash.Equal(&trustedRoot) {
		mux.logger.Warn("received snapshot with mismatching root hash",
			"expected_root", trustedRoot,
			"root", meta.Root.Hash,
		)
		return rejected
	}

	// Everything checks out, kick off the restoration process.
	if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &meta); err != nil {
		mux.logger.Error("failed to start restore",
			"err", err,
		)
		return aborted
	}

	mux.logger.Info("started state restore process",
		"root", meta.Root,
	)

	return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT}
}

// LoadSnapshotChunk handles the ABCI LoadSnapshotChunk request by returning
// the raw bytes of one chunk of a locally stored checkpoint.
//
// The checkpoint is selected by the requested format (checkpoint version) and
// height (root version); exactly one checkpoint must match. Any failure is
// logged and answered with an empty response, i.e. no chunk data.
func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk {
	// The checkpoint version used for the lookup is 16 bits wide while the
	// requested format is wider. Refuse formats that do not fit instead of
	// letting the conversion truncate them onto an unrelated (possibly
	// existing) checkpoint version.
	const maxCheckpointVersion = 1<<16 - 1
	if req.Format > maxCheckpointVersion {
		mux.logger.Warn("received chunk request with unsupported snapshot format",
			"format", req.Format,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Fetch the metadata for the specified checkpoint.
	cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{
		Version:     uint16(req.Format),
		RootVersion: &req.Height,
	})
	if err != nil {
		mux.logger.Error("failed to get checkpoints",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}
	// The lookup succeeded but must identify exactly one checkpoint; zero or
	// several matches cannot be served.
	if len(cps) != 1 {
		mux.logger.Error("failed to get checkpoints",
			"cps", len(cps),
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Resolve the chunk descriptor for the requested index.
	chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk))
	if err != nil {
		mux.logger.Error("failed to get chunk metadata",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Fetch the chunk itself.
	var buf bytes.Buffer
	if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil {
		mux.logger.Error("failed to get chunk",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()}
}

// ApplySnapshotChunk handles the ABCI ApplySnapshotChunk request by feeding a
// received chunk into the restore that is in progress.
//
// Results:
//   - ACCEPT when the chunk was restored or had already been restored;
//   - RETRY (with the chunk index listed for refetching) when the chunk is
//     corrupted;
//   - REJECT_SNAPSHOT when the chunk's proof does not verify;
//   - ABORT when no restore is in progress, on any other restore error, or
//     when finalizing/applying the restored root fails.
//
// Once the restorer reports that the restore is complete, the restored root is
// finalized in the node database and the application state is switched to it.
func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk {
	// Capture the checkpoint before restoring the chunk; its root is needed
	// below to finalize once the restore completes.
	cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint()
	if cp == nil {
		// NOTE(review): assumes GetCurrentCheckpoint returns nil when no
		// restore is in progress — confirm against the checkpoint package.
		// Without this guard the log statement below would dereference a nil
		// checkpoint and panic before the ErrNoRestoreInProgress case could
		// abort gracefully.
		mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	}

	mux.logger.Debug("attempting to restore a chunk",
		"root", cp.Root,
		"index", req.Index,
	)

	buf := bytes.NewBuffer(req.Chunk)
	done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf)
	switch {
	case err == nil:
		// Chunk restored, continue below.
	case errors.Is(err, checkpoint.ErrNoRestoreInProgress):
		// This should never happen.
		mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	case errors.Is(err, checkpoint.ErrChunkAlreadyRestored):
		// Duplicate delivery of a chunk is harmless.
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
	case errors.Is(err, checkpoint.ErrChunkCorrupted):
		// Corrupted chunk, refetch.
		mux.logger.Warn("received corrupted chunk",
			"sender", req.Sender,
			"index", req.Index,
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{
			RefetchChunks: []uint32{req.Index},
			// TODO: Consider banning the sender.
			Result: types.ResponseApplySnapshotChunk_RETRY,
		}
	case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed):
		// Chunk was as specified in the manifest but did not match the reported root. In this case
		// we need to abort processing the given snapshot.
		mux.logger.Warn("chunk contains invalid proof, snapshot is bad",
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT}
	default:
		// Unspecified error during restoration.
		mux.logger.Error("error during chunk restoration, aborting state sync",
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	}

	// Check if we are done with the restoration. In this case, finalize the root.
	if done {
		err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash})
		if err != nil {
			mux.logger.Error("failed to finalize restored root",
				"err", err,
			)
			return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
		}

		// Point the application state at the freshly restored root.
		if err = mux.state.doApplyStateSync(cp.Root); err != nil {
			mux.logger.Error("failed to apply state sync root",
				"err", err,
			)
			return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
		}

		mux.logger.Info("successfully synced state",
			"root", cp.Root,
		)
	}

	return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
}

func (mux *abciMux) doCleanup() {
mux.state.doCleanup()

Expand Down
41 changes: 40 additions & 1 deletion go/consensus/tendermint/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
storage "github.com/oasislabs/oasis-core/go/storage/api"
storageDB "github.com/oasislabs/oasis-core/go/storage/database"
"github.com/oasislabs/oasis-core/go/storage/mkvs"
"github.com/oasislabs/oasis-core/go/storage/mkvs/checkpoint"
)

var _ api.ApplicationState = (*applicationState)(nil)
Expand All @@ -46,6 +47,8 @@ type applicationState struct { // nolint: maligned
prunerClosedCh chan struct{}
prunerNotifyCh *channels.RingChannel

checkpointer checkpoint.Checkpointer

blockLock sync.RWMutex
blockTime time.Time
blockCtx *api.BlockContext
Expand Down Expand Up @@ -238,6 +241,24 @@ func (s *applicationState) doInitChain(now time.Time) error {
return s.doCommitOrInitChainLocked(now)
}

// doApplyStateSync switches the application state over to the given root,
// which is expected to have just been restored via state sync.
//
// While holding the block lock it records the root as the current state root,
// closes the existing DeliverTx and CheckTx trees and reopens both on the new
// root (without write logs), and then runs the shared commit/init-chain step
// with a zero timestamp. The error from that step, if any, is returned as is.
func (s *applicationState) doApplyStateSync(root storage.Root) error {
	s.blockLock.Lock()
	defer s.blockLock.Unlock()

	s.stateRoot = root

	// Reopen the DeliverTx tree on the restored root.
	s.deliverTxTree.Close()
	s.deliverTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())

	// Reopen the CheckTx tree on the restored root.
	s.checkTxTree.Close()
	s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())

	return s.doCommitOrInitChainLocked(time.Time{})
}

func (s *applicationState) doCommit(now time.Time) (uint64, error) {
s.blockLock.Lock()
defer s.blockLock.Unlock()
Expand All @@ -262,10 +283,12 @@ func (s *applicationState) doCommit(now time.Time) (uint64, error) {
s.checkTxTree.Close()
s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), s.stateRoot, mkvs.WithoutWriteLog())

// Notify pruner of a new block.
// Notify pruner and checkpointer of a new block.
s.prunerNotifyCh.In() <- s.stateRoot.Version
// Discover the version below which all versions can be discarded from block history.
lastRetainedVersion := s.statePruner.GetLastRetainedVersion()
// Notify the checkpointer of the new version.
s.checkpointer.NotifyNewVersion(s.stateRoot.Version)

return lastRetainedVersion, nil
}
Expand Down Expand Up @@ -430,6 +453,21 @@ func newApplicationState(ctx context.Context, cfg *ApplicationConfig) (*applicat
return nil, fmt.Errorf("state: failed to create pruner: %w", err)
}

// Initialize the checkpointer.
checkpointerCfg := checkpoint.CheckpointerConfig{
CheckInterval: 1 * time.Minute, // XXX: Make this configurable.
RootsPerVersion: 1,
Parameters: &checkpoint.CreationParameters{ // XXX: Make these configurable.
Interval: 10,
NumKept: 1,
ChunkSize: 1024 * 1024,
},
}
checkpointer, err := checkpoint.NewCheckpointer(ctx, ndb, ldb.Checkpointer(), checkpointerCfg)
if err != nil {
return nil, fmt.Errorf("state: failed to create checkpointer: %w", err)
}

var minGasPrice quantity.Quantity
if err = minGasPrice.FromInt64(int64(cfg.MinGasPrice)); err != nil {
return nil, fmt.Errorf("state: invalid minimum gas price: %w", err)
Expand All @@ -445,6 +483,7 @@ func newApplicationState(ctx context.Context, cfg *ApplicationConfig) (*applicat
checkTxTree: checkTxTree,
stateRoot: stateRoot,
storage: ldb,
checkpointer: checkpointer,
statePruner: statePruner,
prunerClosedCh: make(chan struct{}),
prunerNotifyCh: channels.NewRingChannel(1),
Expand Down
Loading

0 comments on commit 5629f67

Please sign in to comment.