Skip to content

Commit

Permalink
go/consensus/tendermint: Add support for state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Aug 4, 2020
1 parent d9b6c61 commit b3912eb
Show file tree
Hide file tree
Showing 13 changed files with 904 additions and 10 deletions.
1 change: 1 addition & 0 deletions .changelog/2880.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Add support for state sync
3 changes: 2 additions & 1 deletion go/consensus/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
)

// Genesis contains various consensus config flags that should be part of the genesis state.
Expand Down Expand Up @@ -53,7 +54,7 @@ func (g *Genesis) SanityCheck() error {
return fmt.Errorf("consensus: sanity check failed: timeout commit must be >= 1ms")
}

if params.StateCheckpointInterval > 0 {
if params.StateCheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if params.StateCheckpointInterval < 1000 {
return fmt.Errorf("consensus: sanity check failed: state checkpoint interval must be >= 1000")
}
Expand Down
218 changes: 218 additions & 0 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
abciState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/abci/state"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
)

Expand Down Expand Up @@ -797,6 +798,223 @@ func (mux *abciMux) Commit() types.ResponseCommit {
}
}

// ListSnapshots handles a ListSnapshots request by advertising every locally
// available version 1 checkpoint as a snapshot.
//
// Each snapshot carries the checkpoint's root version as its height, the
// checkpoint version as its format, the number of chunks, the checkpoint's
// encoded hash and the CBOR-serialized checkpoint metadata.
//
// If the checkpoints cannot be listed the failure is logged and an empty
// response is returned.
func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots {
	var response types.ResponseListSnapshots

	// Only version 1 checkpoints are requested from the checkpointer.
	listReq := &checkpoint.GetCheckpointsRequest{
		Version: 1,
	}
	available, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, listReq)
	if err != nil {
		mux.logger.Error("failed to get checkpoints",
			"err", err,
		)
		return response
	}

	for _, meta := range available {
		digest := meta.EncodedHash()

		snapshot := &types.Snapshot{
			Height:   meta.Root.Version,
			Format:   uint32(meta.Version),
			Chunks:   uint32(len(meta.Chunks)),
			Hash:     digest[:],
			Metadata: cbor.Marshal(meta),
		}
		response.Snapshots = append(response.Snapshots, snapshot)
	}

	return response
}

// OfferSnapshot handles an offered snapshot and decides whether a state
// restore should be started from it.
//
// The offer is answered with:
//   - REJECT when the snapshot is missing, its hash is malformed or does not
//     match the hash of the supplied metadata, the metadata cannot be decoded,
//     or the advertised chunk count, height or root hash disagree with the
//     decoded metadata / the requested application hash,
//   - REJECT_FORMAT when the snapshot format is not 1,
//   - ABORT when the requested application hash is malformed or the restore
//     cannot be started,
//   - ACCEPT once the restore has been started.
func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot {
	if req.Snapshot == nil {
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}
	if req.Snapshot.Format != 1 {
		mux.logger.Warn("received snapshot with unsupported version",
			"version", req.Snapshot.Format,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT}
	}

	// Decode checkpoint metadata hash and sanity check against the request.
	var metadataHash hash.Hash
	metadataHash.FromBytes(req.Snapshot.Metadata)
	var h hash.Hash
	if err := h.UnmarshalBinary(req.Snapshot.Hash); err != nil {
		mux.logger.Warn("received snapshot with malformed hash",
			"err", err,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}
	if !metadataHash.Equal(&h) {
		mux.logger.Warn("received snapshot with mismatching hash",
			"expected_hash", h,
			"hash", metadataHash,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}

	// Decode checkpoint metadata.
	var cp checkpoint.Metadata
	if err := cbor.Unmarshal(req.Snapshot.Metadata, &cp); err != nil {
		mux.logger.Warn("received snapshot with malformed metadata",
			"err", err,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}

	// Number of chunks must match.
	if int(req.Snapshot.Chunks) != len(cp.Chunks) {
		mux.logger.Warn("received snapshot with mismatching number of chunks",
			"expected_chunks", len(cp.Chunks),
			"chunks", req.Snapshot.Chunks,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}
	// Height must match the version of the checkpointed root. ListSnapshots
	// always advertises the root version as the snapshot height, so anything
	// else means the offer and its metadata disagree. Reject it up front
	// instead of restoring a root for a different version.
	if req.Snapshot.Height != cp.Root.Version {
		mux.logger.Warn("received snapshot with mismatching height",
			"expected_height", cp.Root.Version,
			"height", req.Snapshot.Height,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}
	// Root hash must match.
	var appHash hash.Hash
	if err := appHash.UnmarshalBinary(req.AppHash); err != nil {
		// NOTE: This should never happen as it indicates a problem with Tendermint.
		mux.logger.Error("received request with malformed hash",
			"err", err,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT}
	}
	if !cp.Root.Hash.Equal(&appHash) {
		mux.logger.Warn("received snapshot with mismatching root hash",
			"expected_root", appHash,
			"root", cp.Root.Hash,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
	}

	// Snapshot seems correct (e.g., it is for the correct root), start the restoration process.
	if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &cp); err != nil {
		mux.logger.Error("failed to start restore",
			"err", err,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT}
	}

	mux.logger.Info("started state restore process",
		"root", cp.Root,
	)

	return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT}
}

// LoadSnapshotChunk handles a request for a single chunk of a locally stored
// checkpoint, identified by snapshot format, height and chunk index.
//
// On any failure (unsupported format, checkpoint lookup error, no unique
// matching checkpoint, unknown chunk index or chunk read error) the problem is
// logged and an empty response is returned. On success the response carries
// the raw chunk bytes.
func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk {
	// The checkpoint lookup below takes a 16-bit version. Refuse formats that
	// do not fit instead of letting the conversion silently truncate them
	// (e.g. format 65537 would otherwise be looked up as version 1).
	const maxCheckpointVersion = 1<<16 - 1
	if req.Format > maxCheckpointVersion {
		mux.logger.Warn("received chunk request with unsupported format",
			"format", req.Format,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Fetch the metadata for the specified checkpoint.
	cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{
		Version:     uint16(req.Format),
		RootVersion: &req.Height,
	})
	if err != nil {
		mux.logger.Error("failed to get checkpoints",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}
	// Exactly one checkpoint must match the requested format and height.
	if len(cps) != 1 {
		mux.logger.Error("failed to get checkpoints",
			"cps", len(cps),
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Fetch the chunk itself.
	chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk))
	if err != nil {
		mux.logger.Error("failed to get chunk metadata",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}
	var buf bytes.Buffer
	if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil {
		mux.logger.Error("failed to get chunk",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()}
}

// ApplySnapshotChunk handles a chunk received for the snapshot that is
// currently being restored.
//
// The chunk is handed to the checkpointer and the result is mapped as follows:
//   - ACCEPT when the chunk was restored or had already been restored,
//   - RETRY (with the chunk index scheduled for refetch) when the chunk is
//     corrupted,
//   - REJECT_SNAPSHOT when the chunk's proof fails verification,
//   - ABORT when no restore is in progress or on any other error.
//
// Once the checkpointer reports that the last chunk has been restored, the
// restored root is finalized in the node database and applied as the new
// application state; a failure in either step aborts state sync.
func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk {
	cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint()
	if cp == nil {
		// NOTE(review): GetCurrentCheckpoint is expected to return nil when no
		// restore is in progress. Without this guard, the cp.Root accesses
		// below would panic before RestoreChunk gets a chance to report
		// ErrNoRestoreInProgress.
		mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	}

	mux.logger.Debug("attempting to restore a chunk",
		"root", cp.Root,
		"index", req.Index,
	)

	buf := bytes.NewBuffer(req.Chunk)
	done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf)
	switch {
	case err == nil:
	case errors.Is(err, checkpoint.ErrNoRestoreInProgress):
		// This should never happen.
		mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	case errors.Is(err, checkpoint.ErrChunkAlreadyRestored):
		// Duplicate delivery of an already restored chunk is harmless.
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
	case errors.Is(err, checkpoint.ErrChunkCorrupted):
		// Corrupted chunk, refetch.
		mux.logger.Warn("received corrupted chunk",
			"sender", req.Sender,
			"index", req.Index,
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{
			RefetchChunks: []uint32{req.Index},
			// TODO: Consider banning the sender.
			Result: types.ResponseApplySnapshotChunk_RETRY,
		}
	case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed):
		// Chunk was as specified in the manifest but did not match the reported root. In this case
		// we need to abort processing the given snapshot.
		mux.logger.Warn("chunk contains invalid proof, snapshot is bad",
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT}
	default:
		// Unspecified error during restoration.
		mux.logger.Error("error during chunk restoration, aborting state sync",
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	}

	// Check if we are done with the restoration. In this case, finalize the root.
	if done {
		err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash})
		if err != nil {
			mux.logger.Error("failed to finalize restored root",
				"err", err,
			)
			return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
		}

		if err = mux.state.doApplyStateSync(cp.Root); err != nil {
			mux.logger.Error("failed to apply state sync root",
				"err", err,
			)
			return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
		}

		mux.logger.Info("successfully synced state",
			"root", cp.Root,
		)
	}

	return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
}

func (mux *abciMux) doCleanup() {
mux.state.doCleanup()

Expand Down
18 changes: 18 additions & 0 deletions go/consensus/tendermint/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,24 @@ func (s *applicationState) doInitChain(now time.Time) error {
return s.doCommitOrInitChainLocked(now)
}

// doApplyStateSync makes the given root the current application state root.
//
// While holding the block lock it records the root, closes the existing
// DeliverTx and CheckTx trees and reopens both on top of the new root (without
// write logs), and then runs the shared commit/init-chain step with a zero
// timestamp. Any error from that step is returned unchanged.
func (s *applicationState) doApplyStateSync(root storage.Root) error {
	s.blockLock.Lock()
	defer s.blockLock.Unlock()

	s.stateRoot = root

	// Swap the DeliverTx tree over to the new root.
	s.deliverTxTree.Close()
	s.deliverTxTree = mkvs.NewWithRoot(
		nil,
		s.storage.NodeDB(),
		root,
		mkvs.WithoutWriteLog(),
	)

	// Swap the CheckTx tree over to the new root.
	s.checkTxTree.Close()
	s.checkTxTree = mkvs.NewWithRoot(
		nil,
		s.storage.NodeDB(),
		root,
		mkvs.WithoutWriteLog(),
	)

	var zeroTime time.Time
	return s.doCommitOrInitChainLocked(zeroTime)
}

func (s *applicationState) doCommit(now time.Time) (uint64, error) {
s.blockLock.Lock()
defer s.blockLock.Unlock()
Expand Down
57 changes: 57 additions & 0 deletions go/consensus/tendermint/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
tmabcitypes "github.com/tendermint/tendermint/abci/types"
tmconfig "github.com/tendermint/tendermint/config"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmlight "github.com/tendermint/tendermint/light"
tmmempool "github.com/tendermint/tendermint/mempool"
tmnode "github.com/tendermint/tendermint/node"
tmp2p "github.com/tendermint/tendermint/p2p"
tmproxy "github.com/tendermint/tendermint/proxy"
tmcli "github.com/tendermint/tendermint/rpc/client/local"
tmrpctypes "github.com/tendermint/tendermint/rpc/core/types"
tmstate "github.com/tendermint/tendermint/state"
tmstatesync "github.com/tendermint/tendermint/statesync"
tmtypes "github.com/tendermint/tendermint/types"
tmdb "github.com/tendermint/tm-db"

Expand Down Expand Up @@ -55,6 +57,7 @@ import (
tmepochtime "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/epochtime"
tmepochtimemock "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/epochtime_mock"
tmkeymanager "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/keymanager"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/light"
tmregistry "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/registry"
tmroothash "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/roothash"
tmscheduler "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/scheduler"
Expand Down Expand Up @@ -122,6 +125,18 @@ const (
CfgSupplementarySanityEnabled = "consensus.tendermint.supplementarysanity.enabled"
// CfgSupplementarySanityInterval configures the supplementary sanity check interval.
CfgSupplementarySanityInterval = "consensus.tendermint.supplementarysanity.interval"

// CfgConsensusStateSyncEnabled enables consensus state sync.
CfgConsensusStateSyncEnabled = "consensus.tendermint.state_sync.enabled"
// CfgConsensusStateSyncConsensusNode specifies nodes exposing public consensus services which
// are used to sync a light client.
CfgConsensusStateSyncConsensusNode = "consensus.tendermint.state_sync.consensus_node"
// CfgConsensusStateSyncTrustPeriod is the light client trust period.
CfgConsensusStateSyncTrustPeriod = "consensus.tendermint.state_sync.trust_period"
// CfgConsensusStateSyncTrustHeight is the known trusted height for the light client.
CfgConsensusStateSyncTrustHeight = "consensus.tendermint.state_sync.trust_height"
// CfgConsensusStateSyncTrustHash is the known trusted block header hash for the light client.
CfgConsensusStateSyncTrustHash = "consensus.tendermint.state_sync.trust_hash"
)

const (
Expand Down Expand Up @@ -1217,6 +1232,40 @@ func (t *fullService) lazyInit() error {
return db, nil
}

// Configure state sync if enabled.
var stateProvider tmstatesync.StateProvider
if viper.GetBool(CfgConsensusStateSyncEnabled) {
t.Logger.Info("state sync enabled")

// Enable state sync in the configuration.
tenderConfig.StateSync.Enable = true
tenderConfig.StateSync.TrustHash = viper.GetString(CfgConsensusStateSyncTrustHash)

// Create new state sync state provider.
cfg := light.ClientConfig{
GenesisDocument: tmGenDoc,
TrustOptions: tmlight.TrustOptions{
Period: viper.GetDuration(CfgConsensusStateSyncTrustPeriod),
Height: int64(viper.GetUint64(CfgConsensusStateSyncTrustHeight)),
Hash: tenderConfig.StateSync.TrustHashBytes(),
},
}
for _, rawAddr := range viper.GetStringSlice(CfgConsensusStateSyncConsensusNode) {
var addr node.TLSAddress
if err = addr.UnmarshalText([]byte(rawAddr)); err != nil {
return fmt.Errorf("failed to parse state sync consensus node address (%s): %w", rawAddr, err)
}

cfg.ConsensusNodes = append(cfg.ConsensusNodes, addr)
}
if stateProvider, err = newStateProvider(t.ctx, cfg); err != nil {
t.Logger.Error("failed to create state sync state provider",
"err", err,
)
return fmt.Errorf("failed to create state sync state provider: %w", err)
}
}

// HACK: tmnode.NewNode() triggers block replay and/or ABCI chain
// initialization, instead of t.node.Start(). This is a problem
// because at the time that lazyInit() is called, none of the ABCI
Expand Down Expand Up @@ -1246,6 +1295,7 @@ func (t *fullService) lazyInit() error {
wrapDbProvider,
tmnode.DefaultMetricsProvider(tenderConfig.Instrumentation),
tmcommon.NewLogAdapter(!viper.GetBool(tmcommon.CfgLogDebug)),
tmnode.StateProvider(stateProvider),
)
if err != nil {
return fmt.Errorf("tendermint: failed to create node: %w", err)
Expand Down Expand Up @@ -1454,6 +1504,13 @@ func init() {
Flags.Bool(CfgSupplementarySanityEnabled, false, "enable supplementary sanity checks (slows down consensus)")
Flags.Uint64(CfgSupplementarySanityInterval, 10, "supplementary sanity check interval (in blocks)")

// State sync.
Flags.Bool(CfgConsensusStateSyncEnabled, false, "enable state sync")
Flags.StringSlice(CfgConsensusStateSyncConsensusNode, []string{}, "state sync: consensus node to use for syncing the light client")
Flags.Duration(CfgConsensusStateSyncTrustPeriod, 24*time.Hour, "state sync: light client trust period")
Flags.Uint64(CfgConsensusStateSyncTrustHeight, 0, "state sync: light client trusted height")
Flags.String(CfgConsensusStateSyncTrustHash, "", "state sync: light client trusted consensus header hash")

_ = Flags.MarkHidden(CfgDebugP2PAllowDuplicateIP)
_ = Flags.MarkHidden(CfgDebugDisableCheckTx)
_ = Flags.MarkHidden(CfgDebugUnsafeReplayRecoverCorruptedWAL)
Expand Down
Loading

0 comments on commit b3912eb

Please sign in to comment.