diff --git a/.changelog/2880.feature.md b/.changelog/2880.feature.md new file mode 100644 index 00000000000..ea8ee75a650 --- /dev/null +++ b/.changelog/2880.feature.md @@ -0,0 +1 @@ +go/consensus/tendermint: Add support for state sync diff --git a/go/consensus/tendermint/abci/mux.go b/go/consensus/tendermint/abci/mux.go index 2de7bcfa45b..e346812bb6a 100644 --- a/go/consensus/tendermint/abci/mux.go +++ b/go/consensus/tendermint/abci/mux.go @@ -31,6 +31,7 @@ import ( "github.com/oasislabs/oasis-core/go/consensus/tendermint/api" epochtime "github.com/oasislabs/oasis-core/go/epochtime/api" genesis "github.com/oasislabs/oasis-core/go/genesis/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/checkpoint" upgrade "github.com/oasislabs/oasis-core/go/upgrade/api" ) @@ -877,6 +878,223 @@ func (mux *abciMux) Commit() types.ResponseCommit { } } +func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots { + // Get a list of all current checkpoints. + cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{ + Version: 1, + }) + if err != nil { + mux.logger.Error("failed to get checkpoints", + "err", err, + ) + return types.ResponseListSnapshots{} + } + + var rsp types.ResponseListSnapshots + for _, cp := range cps { + cpHash := cp.EncodedHash() + + rsp.Snapshots = append(rsp.Snapshots, &types.Snapshot{ + Height: cp.Root.Version, + Format: uint32(cp.Version), + Chunks: uint32(len(cp.Chunks)), + Hash: cpHash[:], + Metadata: cbor.Marshal(cp), + }) + } + + return rsp +} + +func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { + if req.Snapshot == nil { + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + if req.Snapshot.Format != 1 { + mux.logger.Warn("received snapshot with unsupported version", + "version", req.Snapshot.Format, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT} + } + + // Decode checkpoint metadata hash and sanity check against the request. + var metadataHash hash.Hash + metadataHash.FromBytes(req.Snapshot.Metadata) + var h hash.Hash + if err := h.UnmarshalBinary(req.Snapshot.Hash); err != nil { + mux.logger.Warn("received snapshot with malformed hash", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + if !metadataHash.Equal(&h) { + mux.logger.Warn("received snapshot with mismatching hash", + "expected_hash", h, + "hash", metadataHash, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Decode checkpoint metadata. + var cp checkpoint.Metadata + if err := cbor.Unmarshal(req.Snapshot.Metadata, &cp); err != nil { + mux.logger.Warn("received snapshot with malformed metadata", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Number of chunks must match. + if int(req.Snapshot.Chunks) != len(cp.Chunks) { + mux.logger.Warn("received snapshot with mismatching number of chunks", + "expected_chunks", len(cp.Chunks), + "chunks", req.Snapshot.Chunks, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + // Root hash must match. + var appHash hash.Hash + if err := appHash.UnmarshalBinary(req.AppHash); err != nil { + // NOTE: This should never happen as it indicates a problem with Tendermint. + mux.logger.Error("received request with malformed hash", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} + } + if !cp.Root.Hash.Equal(&appHash) { + mux.logger.Warn("received snapshot with mismatching root hash", + "expected_root", appHash, + "root", cp.Root.Hash, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Snapshot seems correct (e.g., it is for the correct root), start the restoration process. + if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &cp); err != nil { + mux.logger.Error("failed to start restore", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} + } + + mux.logger.Info("started state restore process", + "root", cp.Root, + ) + + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT} +} + +func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk { + // Fetch the metadata for the specified checkpoint. + cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{ + Version: uint16(req.Format), + RootVersion: &req.Height, + }) + if err != nil { + mux.logger.Error("failed to get checkpoints", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + if len(cps) != 1 { + mux.logger.Error("failed to get checkpoints", + "cps", len(cps), + ) + return types.ResponseLoadSnapshotChunk{} + } + + // Fetch the chunk itself. + chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk)) + if err != nil { + mux.logger.Error("failed to get chunk metadata", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + var buf bytes.Buffer + if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil { + mux.logger.Error("failed to get chunk", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + + return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()} +} + +func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { + cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint() + + mux.logger.Debug("attempting to restore a chunk", + "root", cp.Root, + "index", req.Index, + ) + + buf := bytes.NewBuffer(req.Chunk) + done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf) + switch { + case err == nil: + case errors.Is(err, checkpoint.ErrNoRestoreInProgress): + // This should never happen. + mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync") + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + case errors.Is(err, checkpoint.ErrChunkAlreadyRestored): + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT} + case errors.Is(err, checkpoint.ErrChunkCorrupted): + // Corrupted chunk, refetch. + mux.logger.Warn("received corrupted chunk", + "sender", req.Sender, + "index", req.Index, + "err", err, + ) + + return types.ResponseApplySnapshotChunk{ + RefetchChunks: []uint32{req.Index}, + // TODO: Consider banning the sender. + Result: types.ResponseApplySnapshotChunk_RETRY, + } + case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed): + // Chunk was as specified in the manifest but did not match the reported root. In this case + // we need to abort processing the given snapshot. + mux.logger.Warn("chunk contains invalid proof, snapshot is bad", + "err", err, + ) + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT} + default: + // Unspecified error during restoration. + mux.logger.Error("error during chunk restoration, aborting state sync", + "err", err, + ) + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + // Check if we are done with the restoration. In this case, finalize the root. + if done { + err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash}) + if err != nil { + mux.logger.Error("failed to finalize restored root", + "err", err, + ) + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + if err = mux.state.doApplyStateSync(cp.Root); err != nil { + mux.logger.Error("failed to apply state sync root", + "err", err, + ) + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + mux.logger.Info("successfully synced state", + "root", cp.Root, + ) + } + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT} +} + func (mux *abciMux) doCleanup() { mux.state.doCleanup() diff --git a/go/consensus/tendermint/abci/state.go b/go/consensus/tendermint/abci/state.go index ff3f5e62395..0fce01d2e85 100644 --- a/go/consensus/tendermint/abci/state.go +++ b/go/consensus/tendermint/abci/state.go @@ -24,6 +24,7 @@ import ( storage "github.com/oasislabs/oasis-core/go/storage/api" storageDB "github.com/oasislabs/oasis-core/go/storage/database" "github.com/oasislabs/oasis-core/go/storage/mkvs" + "github.com/oasislabs/oasis-core/go/storage/mkvs/checkpoint" ) var _ api.ApplicationState = (*applicationState)(nil) @@ -46,6 +47,8 @@ type applicationState struct { // nolint: maligned prunerClosedCh chan struct{} prunerNotifyCh *channels.RingChannel + checkpointer checkpoint.Checkpointer + blockLock sync.RWMutex blockTime time.Time blockCtx *api.BlockContext @@ -238,6 +241,24 @@ func (s *applicationState) doInitChain(now time.Time) error { return s.doCommitOrInitChainLocked(now) } +func (s *applicationState) doApplyStateSync(root storage.Root) error { + s.blockLock.Lock() + defer s.blockLock.Unlock() + + s.stateRoot = root + + s.deliverTxTree.Close() + s.deliverTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog()) + s.checkTxTree.Close() + s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog()) + + if err := s.doCommitOrInitChainLocked(time.Time{}); err != nil { + return err + } + + return nil +} + func (s *applicationState) doCommit(now time.Time) (uint64, error) { s.blockLock.Lock() defer s.blockLock.Unlock() @@ -262,10 +283,12 @@ func (s *applicationState) doCommit(now time.Time) (uint64, error) { s.checkTxTree.Close() s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), s.stateRoot, mkvs.WithoutWriteLog()) - // Notify pruner of a new block. + // Notify pruner and checkpointer of a new block. s.prunerNotifyCh.In() <- s.stateRoot.Version // Discover the version below which all versions can be discarded from block history. lastRetainedVersion := s.statePruner.GetLastRetainedVersion() + // Notify the checkpointer of the new version. + s.checkpointer.NotifyNewVersion(s.stateRoot.Version) return lastRetainedVersion, nil } @@ -430,6 +453,21 @@ func newApplicationState(ctx context.Context, cfg *ApplicationConfig) (*applicat return nil, fmt.Errorf("state: failed to create pruner: %w", err) } + // Initialize the checkpointer. + checkpointerCfg := checkpoint.CheckpointerConfig{ + CheckInterval: 1 * time.Minute, // XXX: Make this configurable. + RootsPerVersion: 1, + Parameters: &checkpoint.CreationParameters{ // XXX: Make these configurable. + Interval: 10, + NumKept: 1, + ChunkSize: 1024 * 1024, + }, + } + checkpointer, err := checkpoint.NewCheckpointer(ctx, ndb, ldb.Checkpointer(), checkpointerCfg) + if err != nil { + return nil, fmt.Errorf("state: failed to create checkpointer: %w", err) + } + var minGasPrice quantity.Quantity if err = minGasPrice.FromInt64(int64(cfg.MinGasPrice)); err != nil { return nil, fmt.Errorf("state: invalid minimum gas price: %w", err) @@ -445,6 +483,7 @@ func newApplicationState(ctx context.Context, cfg *ApplicationConfig) (*applicat checkTxTree: checkTxTree, stateRoot: stateRoot, storage: ldb, + checkpointer: checkpointer, statePruner: statePruner, prunerClosedCh: make(chan struct{}), prunerNotifyCh: channels.NewRingChannel(1), diff --git a/go/consensus/tendermint/light.go b/go/consensus/tendermint/light.go new file mode 100644 index 00000000000..8439b9ba8ce --- /dev/null +++ b/go/consensus/tendermint/light.go @@ -0,0 +1,169 @@ +package tendermint + +import ( + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + tmlite "github.com/tendermint/tendermint/lite2" + tmliteprovider "github.com/tendermint/tendermint/lite2/provider" + tmtypes "github.com/tendermint/tendermint/types" + + cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc" + consensusAPI "github.com/oasislabs/oasis-core/go/consensus/api" +) + +// lightClientProvider implements Tendermint's light client provider interface using the Oasis Core +// light client API. +type lightClientProvider struct { + ctx context.Context + + chainID string + client consensusAPI.LightClientBackend +} + +// Implements tmliteprovider.Provider. +func (lp *lightClientProvider) ChainID() string { + return lp.chainID +} + +// Implements tmliteprovider.Provider. +func (lp *lightClientProvider) SignedHeader(height int64) (*tmtypes.SignedHeader, error) { + shdr, err := lp.client.GetSignedHeader(lp.ctx, height) + switch { + case err == nil: + case errors.Is(err, consensusAPI.ErrVersionNotFound): + return nil, tmliteprovider.ErrSignedHeaderNotFound + default: + return nil, fmt.Errorf("failed to fetch signed header: %w", err) + } + + // Decode Tendermint-specific signed header. + var sh tmtypes.SignedHeader + if err = aminoCodec.UnmarshalBinaryBare(shdr.Meta, &sh); err != nil { + return nil, fmt.Errorf("received malformed header: %w", err) + } + + if lp.chainID != sh.ChainID { + return nil, fmt.Errorf("incorrect chain ID (expected: %s got: %s)", + lp.chainID, + sh.ChainID, + ) + } + + return &sh, nil +} + +// Implements tmliteprovider.Provider. +func (lp *lightClientProvider) ValidatorSet(height int64) (*tmtypes.ValidatorSet, error) { + vs, err := lp.client.GetValidatorSet(lp.ctx, height) + switch { + case err == nil: + case errors.Is(err, consensusAPI.ErrVersionNotFound): + return nil, tmliteprovider.ErrValidatorSetNotFound + default: + return nil, fmt.Errorf("failed to fetch validator set: %w", err) + } + + // Decode Tendermint-specific validator set. + var vals tmtypes.ValidatorSet + if err = aminoCodec.UnmarshalBinaryBare(vs.Meta, &vals); err != nil { + return nil, fmt.Errorf("received malformed validator set: %w", err) + } + + return &vals, nil +} + +// Implements tmliteprovider.Provider. +func (lp *lightClientProvider) ReportEvidence(ev tmtypes.Evidence) error { + // TODO: Implement SubmitEvidence. + return fmt.Errorf("not yet implemented") +} + +// newLightClientProvider creates a new provider for the Tendermint's light client. +// +// The provided chain ID must be the Tendermint chain ID. +func newLightClientProvider( + ctx context.Context, + chainID string, + address string, +) (tmliteprovider.Provider, error) { + creds := credentials.NewTLS(&tls.Config{ + // XXX: As a defense in depth measure this should validate certificates. However the light + // endpoint is untrusted and is verified through other means. + // + // This should be easier once we start support validating only by Ed25519 public keys + // as those are easier to input than full certificates (see oasis-core#2556). + InsecureSkipVerify: true, + }) + conn, err := cmnGrpc.Dial(address, grpc.WithTransportCredentials(creds)) + if err != nil { + return nil, fmt.Errorf("failed to dial public consensus service endpoint %s: %w", address, err) + } + + return &lightClientProvider{ + ctx: ctx, + chainID: chainID, + client: consensusAPI.NewConsensusLightClient(conn), + }, nil +} + +// lightService is a Tendermint consensus service that uses the light client API to talk with a +// remote Tendermint node and verify responses. +// +// This should eventually become a replacement for the full node tendermintService. +type lightService struct { + // lc is the Tendermint light client used for verifying headers. + lc *tmlite.Client + // client is the consensus light client backend connected to a remote node. + client consensusAPI.LightClientBackend +} + +func (ls *lightService) getParameters(ctx context.Context, height int64) (*tmtypes.ConsensusParams, error) { + p, err := ls.client.GetParameters(ctx, height) + if err != nil { + return nil, err + } + if p.Height <= 0 { + return nil, fmt.Errorf("malformed height in response: %d", p.Height) + } + + // Decode Tendermint-specific parameters. + var params tmtypes.ConsensusParams + if err = aminoCodec.UnmarshalBinaryBare(p.Meta, ¶ms); err != nil { + return nil, fmt.Errorf("malformed parameters: %w", err) + } + if err = params.Validate(); err != nil { + return nil, fmt.Errorf("malformed parameters: %w", err) + } + + // Fetch the header from the light client. + h, err := ls.lc.VerifyHeaderAtHeight(p.Height, time.Now()) + if err != nil { + return nil, fmt.Errorf("failed to fetch header %d from light client: %w", p.Height, err) + } + + // Verify hash. + if localHash := params.Hash(); !bytes.Equal(localHash, h.ConsensusHash) { + return nil, fmt.Errorf("mismatched parameters hash (expected: %X got: %X)", + h.ConsensusHash, + localHash, + ) + } + + return ¶ms, nil +} + +// newLightService creates a light Tendermint consensus service. +func newLightService(client consensusAPI.LightClientBackend, lc *tmlite.Client) (*lightService, error) { + return &lightService{ + lc: lc, + client: client, + }, nil +} diff --git a/go/consensus/tendermint/statesync.go b/go/consensus/tendermint/statesync.go new file mode 100644 index 00000000000..428275d4def --- /dev/null +++ b/go/consensus/tendermint/statesync.go @@ -0,0 +1,163 @@ +package tendermint + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/spf13/viper" + tmlite "github.com/tendermint/tendermint/lite2" + tmliteprovider "github.com/tendermint/tendermint/lite2/provider" + tmlitedb "github.com/tendermint/tendermint/lite2/store/db" + tmstate "github.com/tendermint/tendermint/state" + tmstatesync "github.com/tendermint/tendermint/statesync" + tmtypes "github.com/tendermint/tendermint/types" + tmdb "github.com/tendermint/tm-db" + + "github.com/oasislabs/oasis-core/go/common/logging" +) + +// stateProviderConfig is the configuration for the state provider. +type stateProviderConfig struct { + // ChainID is the Tendermint chain ID. + ChainID string + + // ConsensusNodes is a list of nodes exposing the Oasis Core public consensus services that are + // used to fetch data required for syncing light clients. The first node is considered the + // primary and at least two nodes must be specified. + ConsensusNodes []string + + // TrustOptions are Tendermint light client trust options. + TrustOptions tmlite.TrustOptions +} + +type stateProvider struct { + sync.Mutex + + ctx context.Context + lc *tmlite.Client + + logger *logging.Logger +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) AppHash(height uint64) ([]byte, error) { + sp.Lock() + defer sp.Unlock() + + // We have to fetch the next height, which contains the app hash for the previous height. + header, err := sp.lc.VerifyHeaderAtHeight(int64(height+1), time.Now()) + if err != nil { + return nil, err + } + return header.AppHash, nil +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) Commit(height uint64) (*tmtypes.Commit, error) { + sp.Lock() + defer sp.Unlock() + + header, err := sp.lc.VerifyHeaderAtHeight(int64(height), time.Now()) + if err != nil { + return nil, err + } + return header.Commit, nil +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) State(height uint64) (tmstate.State, error) { + sp.Lock() + defer sp.Unlock() + + state := tmstate.State{ + ChainID: sp.lc.ChainID(), + Version: tmstate.InitStateVersion, + } + + // We need to verify up until h+2, to get the validator set. This also prefetches the headers + // for h and h+1 in the typical case where the trusted header is after the snapshot height. + _, err := sp.lc.VerifyHeaderAtHeight(int64(height+2), time.Now()) + if err != nil { + return tmstate.State{}, err + } + header, err := sp.lc.VerifyHeaderAtHeight(int64(height), time.Now()) + if err != nil { + return tmstate.State{}, err + } + nextHeader, err := sp.lc.VerifyHeaderAtHeight(int64(height+1), time.Now()) + if err != nil { + return tmstate.State{}, err + } + state.LastBlockHeight = header.Height + state.LastBlockTime = header.Time + state.LastBlockID = header.Commit.BlockID + state.AppHash = nextHeader.AppHash + state.LastResultsHash = nextHeader.LastResultsHash + + state.LastValidators, _, err = sp.lc.TrustedValidatorSet(int64(height)) + if err != nil { + return tmstate.State{}, err + } + state.Validators, _, err = sp.lc.TrustedValidatorSet(int64(height + 1)) + if err != nil { + return tmstate.State{}, err + } + state.NextValidators, _, err = sp.lc.TrustedValidatorSet(int64(height + 2)) + if err != nil { + return tmstate.State{}, err + } + state.LastHeightValidatorsChanged = int64(height) + + // Fetch consensus parameters with light client verification. + primaryClient := sp.lc.Primary().(*lightClientProvider).client + ls, err := newLightService(primaryClient, sp.lc) + if err != nil { + return tmstate.State{}, fmt.Errorf("failed to create a new light service: %w", err) + } + params, err := ls.getParameters(sp.ctx, nextHeader.Height) + if err != nil { + return tmstate.State{}, fmt.Errorf("failed to fetch consensus parameters for height %d: %w", + nextHeader.Height, + err, + ) + } + state.ConsensusParams = *params + + return state, nil +} + +func newStateProvider(ctx context.Context, cfg stateProviderConfig) (tmstatesync.StateProvider, error) { + if numNodes := len(cfg.ConsensusNodes); numNodes < 2 { + return nil, fmt.Errorf("at least two consensus nodes must be provided (got %d)", numNodes) + } + + var providers []tmliteprovider.Provider + for _, address := range cfg.ConsensusNodes { + p, err := newLightClientProvider(ctx, cfg.ChainID, address) + if err != nil { + return nil, fmt.Errorf("failed to create light client provider: %w", err) + } + providers = append(providers, p) + } + + lc, err := tmlite.NewClient( + cfg.ChainID, + cfg.TrustOptions, + providers[0], // Primary provider. + providers[1:], // Witnesses. + tmlitedb.New(tmdb.NewMemDB(), ""), + tmlite.MaxRetryAttempts(5), + tmlite.Logger(newLogAdapter(!viper.GetBool(cfgLogDebug))), + ) + if err != nil { + return nil, fmt.Errorf("failed to create light client: %w", err) + } + + return &stateProvider{ + ctx: ctx, + lc: lc, + logger: logging.GetLogger("consensus/tendermint/stateprovider"), + }, nil +} diff --git a/go/consensus/tendermint/tendermint.go b/go/consensus/tendermint/tendermint.go index bf7b136b55c..5db1f8e754b 100644 --- a/go/consensus/tendermint/tendermint.go +++ b/go/consensus/tendermint/tendermint.go @@ -18,12 +18,14 @@ import ( tmconfig "github.com/tendermint/tendermint/config" tmlog "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmlite "github.com/tendermint/tendermint/lite2" tmmempool "github.com/tendermint/tendermint/mempool" tmnode "github.com/tendermint/tendermint/node" tmp2p "github.com/tendermint/tendermint/p2p" tmproxy "github.com/tendermint/tendermint/proxy" tmcli "github.com/tendermint/tendermint/rpc/client/local" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" + tmstatesync "github.com/tendermint/tendermint/statesync" tmtypes "github.com/tendermint/tendermint/types" tmdb "github.com/tendermint/tm-db" @@ -124,6 +126,18 @@ const ( // CfgConsensusDebugDisableCheckTx disables CheckTx. CfgConsensusDebugDisableCheckTx = "consensus.tendermint.debug.disable_check_tx" + // CfgConsensusStateSyncEnabled enabled consensus state sync. + CfgConsensusStateSyncEnabled = "consensus.tendermint.state_sync.enabled" + // CfgConsensusStateSyncConsensusNode specifies nodes exposing public consensus services which + // are used to sync a light client. + CfgConsensusStateSyncConsensusNode = "consensus.tendermint.state_sync.consensus_node" + // CfgConsensusStateSyncTrustPeriod is the light client trust period. + CfgConsensusStateSyncTrustPeriod = "consensus.tendermint.state_sync.trust_period" + // CfgConsensusStateSyncTrustHeight is the known trusted height for the light client. + CfgConsensusStateSyncTrustHeight = "consensus.tendermint.state_sync.trust_height" + // CfgConsensusStateSyncTrustHash is the known trusted block header hash for the light client. + CfgConsensusStateSyncTrustHash = "consensus.tendermint.state_sync.trust_hash" + // StateDir is the name of the directory located inside the node's data // directory which contains the tendermint state. StateDir = "tendermint" @@ -1056,6 +1070,33 @@ func (t *tendermintService) lazyInit() error { return db, nil } + // Configure state sync if enabled. + var stateProvider tmstatesync.StateProvider + if viper.GetBool(CfgConsensusStateSyncEnabled) { + t.Logger.Info("state sync enabled") + + // Enable state sync in the configuration. + tenderConfig.StateSync.Enable = true + tenderConfig.StateSync.TrustHash = viper.GetString(CfgConsensusStateSyncTrustHash) + + // Create new state sync state provider. + cfg := stateProviderConfig{ + ChainID: tmGenDoc.ChainID, + ConsensusNodes: viper.GetStringSlice(CfgConsensusStateSyncConsensusNode), + TrustOptions: tmlite.TrustOptions{ + Period: viper.GetDuration(CfgConsensusStateSyncTrustPeriod), + Height: int64(viper.GetUint64(CfgConsensusStateSyncTrustHeight)), + Hash: tenderConfig.StateSync.TrustHashBytes(), + }, + } + if stateProvider, err = newStateProvider(t.ctx, cfg); err != nil { + t.Logger.Error("failed to create state sync state provider", + "err", err, + ) + return fmt.Errorf("failed to create state sync state provider: %w", err) + } + } + // HACK: tmnode.NewNode() triggers block replay and or ABCI chain // initialization, instead of t.node.Start(). This is a problem // because at the time that lazyInit() is called, none of the ABCI @@ -1072,6 +1113,7 @@ func (t *tendermintService) lazyInit() error { wrapDbProvider, tmnode.DefaultMetricsProvider(tenderConfig.Instrumentation), newLogAdapter(!viper.GetBool(cfgLogDebug)), + tmnode.StateProvider(stateProvider), ) if err != nil { return fmt.Errorf("tendermint: failed to create node: %w", err) @@ -1472,6 +1514,13 @@ func init() { Flags.Bool(CfgConsensusDebugDisableCheckTx, false, "do not perform CheckTx on incoming transactions (UNSAFE)") Flags.Bool(CfgDebugUnsafeReplayRecoverCorruptedWAL, false, "Enable automatic recovery from corrupted WAL during replay (UNSAFE).") + // State sync. + Flags.Bool(CfgConsensusStateSyncEnabled, false, "enable state sync") + Flags.StringSlice(CfgConsensusStateSyncConsensusNode, []string{}, "consensus node to use for syncing the light client") + Flags.Duration(CfgConsensusStateSyncTrustPeriod, 24*time.Hour, "light client trust period") + Flags.Uint64(CfgConsensusStateSyncTrustHeight, 0, "light client trusted height") + Flags.String(CfgConsensusStateSyncTrustHash, "", "light client trusted consensus header hash") + _ = Flags.MarkHidden(cfgLogDebug) _ = Flags.MarkHidden(CfgDebugP2PAddrBookLenient) _ = Flags.MarkHidden(CfgDebugP2PAllowDuplicateIP) diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index 6b2c0eb4501..7a40c7b6aa5 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -176,6 +176,22 @@ func (args *argBuilder) tendermintDebugAllowDuplicateIP() *argBuilder { return args } +func (args *argBuilder) tendermintStateSync( + consensusNodes []string, + trustHeight uint64, + trustHash string, +) *argBuilder { + args.vec = append(args.vec, + "--"+tendermint.CfgConsensusStateSyncEnabled, + "--"+tendermint.CfgConsensusStateSyncTrustHeight, strconv.FormatUint(trustHeight, 10), + "--"+tendermint.CfgConsensusStateSyncTrustHash, trustHash, + ) + for _, address := range consensusNodes { + args.vec = append(args.vec, "--"+tendermint.CfgConsensusStateSyncConsensusNode, address) + } + return args +} + func (args *argBuilder) storageBackend(backend string) *argBuilder { args.vec = append(args.vec, []string{ "--" + storage.CfgBackend, backend, diff --git a/go/oasis-test-runner/oasis/oasis.go b/go/oasis-test-runner/oasis/oasis.go index 77b86cd3035..ba1182076e8 100644 --- a/go/oasis-test-runner/oasis/oasis.go +++ b/go/oasis-test-runner/oasis/oasis.go @@ -52,6 +52,13 @@ const ( maxNodes = 32 // Arbitrary ) +// ConsensusStateSyncCfg is a node's consensus state sync configuration. +type ConsensusStateSyncCfg struct { + ConsensusNodes []string + TrustHeight uint64 + TrustHash string +} + // Node defines the common fields for all node types. type Node struct { // nolint: maligned sync.Mutex @@ -74,6 +81,7 @@ type Node struct { // nolint: maligned logWatcherHandlerFactories []log.WatcherHandlerFactory consensus ConsensusFixture + consensusStateSync *ConsensusStateSyncCfg customGrpcSocketPath string } @@ -163,6 +171,14 @@ func (n *Node) handleExit(cmdErr error) error { } } +// SetConsensusStateSync configures wheteher a node should perform +func (n *Node) SetConsensusStateSync(cfg *ConsensusStateSyncCfg) { + n.Lock() + defer n.Unlock() + + n.consensusStateSync = cfg +} + // NodeCfg defines the common node configuration options. type NodeCfg struct { // nolint: maligned AllowEarlyTermination bool @@ -662,6 +678,13 @@ func (net *Network) startOasisNode( extraArgs = extraArgs.debugDontBlameOasis() extraArgs = extraArgs.grpcDebugGrpcInternalSocketPath(node.customGrpcSocketPath) } + if node.consensusStateSync != nil { + extraArgs = extraArgs.tendermintStateSync( + node.consensusStateSync.ConsensusNodes, + node.consensusStateSync.TrustHeight, + node.consensusStateSync.TrustHash, + ) + } if viper.IsSet(metrics.CfgMetricsAddr) { extraArgs = extraArgs.appendNodeMetrics(node) } diff --git a/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go new file mode 100644 index 00000000000..30b39bb00b9 --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go @@ -0,0 +1,125 @@ +package e2e + +import ( + "context" + "encoding/hex" + "fmt" + "time" + + consensus "github.com/oasislabs/oasis-core/go/consensus/api" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/env" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/scenario" +) + +var ( + // ConsensusStateSync is the consensus state sync scenario. + ConsensusStateSync scenario.Scenario = &consensusStateSyncImpl{ + runtimeImpl: *newRuntimeImpl("consensus-state-sync", "", nil), + } +) + +type consensusStateSyncImpl struct { + runtimeImpl +} + +func (sc *consensusStateSyncImpl) Clone() scenario.Scenario { + return &consensusStateSyncImpl{ + runtimeImpl: *sc.runtimeImpl.Clone().(*runtimeImpl), + } +} + +func (sc *consensusStateSyncImpl) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.runtimeImpl.Fixture() + if err != nil { + return nil, err + } + + // We don't need any runtimes. + f.Runtimes = nil + f.Keymanagers = nil + f.StorageWorkers = nil + f.ComputeWorkers = nil + f.Sentries = nil + + // Enable public consensus RPC services worker so that the nodes can be used for light clients. + f.Validators = []oasis.ValidatorFixture{ + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + } + + return f, nil +} + +func (sc *consensusStateSyncImpl) Run(childEnv *env.Env) error { + if err := sc.net.Start(); err != nil { + return err + } + + sc.logger.Info("waiting for network to come up") + ctx := context.Background() + if err := sc.net.Controller().WaitNodesRegistered(ctx, len(sc.net.Validators())); err != nil { + return err + } + + // Stop one of the validators. + val := sc.net.Validators()[2] + if err := val.Stop(); err != nil { + return fmt.Errorf("failed to stop validator: %w", err) + } + + // Let the network run for 50 blocks. This should generate some checkpoints. + blockCh, blockSub, err := sc.net.Controller().Consensus.WatchBlocks(ctx) + if err != nil { + return err + } + defer blockSub.Close() + + sc.logger.Info("waiting for some blocks") + var blk *consensus.Block + for { + select { + case blk = <-blockCh: + if blk.Height < 50 { + continue + } + case <-time.After(30 * time.Second): + return fmt.Errorf("timed out waiting for blocks") + } + + break + } + + sc.logger.Info("got some blocks, starting the validator back", + "trust_height", blk.Height, + "trust_hash", hex.EncodeToString(blk.Hash), + ) + + // Configure state sync for the consensus validator. + val.SetConsensusStateSync(&oasis.ConsensusStateSyncCfg{ + ConsensusNodes: []string{ + sc.net.Validators()[0].ExternalGRPCAddress(), + sc.net.Validators()[1].ExternalGRPCAddress(), + }, + TrustHeight: uint64(blk.Height), + TrustHash: hex.EncodeToString(blk.Hash), + }) + + if err := val.Start(); err != nil { + return fmt.Errorf("failed to start validator back: %w", err) + } + + // Wait for the validator to finish syncing. + sc.logger.Info("waiting for the validator to sync") + valCtrl, err := oasis.NewController(val.SocketPath()) + if err != nil { + return err + } + if err = valCtrl.WaitSync(ctx); err != nil { + return err + } + + return nil +} diff --git a/go/oasis-test-runner/scenario/e2e/e2e.go b/go/oasis-test-runner/scenario/e2e/e2e.go index d7426c1d80c..1498b669979 100644 --- a/go/oasis-test-runner/scenario/e2e/e2e.go +++ b/go/oasis-test-runner/scenario/e2e/e2e.go @@ -124,6 +124,8 @@ func RegisterScenarios() error { Debond, // Late start test. LateStart, + // Consensus state sync. + ConsensusStateSync, } { if err := cmd.Register(s); err != nil { return err