From 4cad04b15bf02f4a001c2bebcca20879ad3a5811 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 23 Jun 2020 12:31:10 +0200 Subject: [PATCH] go/consensus/tendermint: Add support for state sync --- .changelog/2880.feature.md | 1 + go/consensus/genesis/genesis.go | 3 +- go/consensus/tendermint/abci/mux.go | 218 +++++++++++++++ go/consensus/tendermint/abci/state.go | 18 ++ go/consensus/tendermint/light/client.go | 254 ++++++++++++++++++ go/consensus/tendermint/light/light.go | 26 ++ go/consensus/tendermint/statesync.go | 123 +++++++++ go/consensus/tendermint/tendermint.go | 57 ++++ go/oasis-node/cmd/genesis/genesis.go | 18 +- go/oasis-test-runner/oasis/args.go | 16 ++ go/oasis-test-runner/oasis/oasis.go | 25 ++ .../scenario/e2e/consensus_state_sync.go | 146 ++++++++++ go/oasis-test-runner/scenario/e2e/e2e.go | 2 + 13 files changed, 897 insertions(+), 10 deletions(-) create mode 100644 .changelog/2880.feature.md create mode 100644 go/consensus/tendermint/light/client.go create mode 100644 go/consensus/tendermint/light/light.go create mode 100644 go/consensus/tendermint/statesync.go create mode 100644 go/oasis-test-runner/scenario/e2e/consensus_state_sync.go diff --git a/.changelog/2880.feature.md b/.changelog/2880.feature.md new file mode 100644 index 00000000000..ea8ee75a650 --- /dev/null +++ b/.changelog/2880.feature.md @@ -0,0 +1 @@ +go/consensus/tendermint: Add support for state sync diff --git a/go/consensus/genesis/genesis.go b/go/consensus/genesis/genesis.go index 73b6044c7a2..3c031bf8cb1 100644 --- a/go/consensus/genesis/genesis.go +++ b/go/consensus/genesis/genesis.go @@ -7,6 +7,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" + "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" ) // Genesis contains various consensus config flags that should be part of the genesis state. @@ -54,7 +55,7 @@ func (g *Genesis) SanityCheck() error { return fmt.Errorf("consensus: sanity check failed: timeout commit must be >= 1ms") } - if params.StateCheckpointInterval > 0 { + if params.StateCheckpointInterval > 0 && !flags.DebugDontBlameOasis() { if params.StateCheckpointInterval < 1000 { return fmt.Errorf("consensus: sanity check failed: state checkpoint interval must be >= 1000") } diff --git a/go/consensus/tendermint/abci/mux.go b/go/consensus/tendermint/abci/mux.go index baca38f13b4..70ca2059b03 100644 --- a/go/consensus/tendermint/abci/mux.go +++ b/go/consensus/tendermint/abci/mux.go @@ -31,6 +31,7 @@ import ( abciState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/abci/state" "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api" epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" ) @@ -797,6 +798,223 @@ func (mux *abciMux) Commit() types.ResponseCommit { } } +func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots { + // Get a list of all current checkpoints. 
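+	// NOTE: Each checkpoint is exposed as a Tendermint snapshot: the checkpoint's root
+	// version becomes the snapshot height, the checkpoint format version becomes the
+	// snapshot format, and the full CBOR-serialized checkpoint metadata is attached so
+	// that OfferSnapshot can validate a snapshot and start a restore from it directly.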
+ cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{ + Version: 1, + }) + if err != nil { + mux.logger.Error("failed to get checkpoints", + "err", err, + ) + return types.ResponseListSnapshots{} + } + + var rsp types.ResponseListSnapshots + for _, cp := range cps { + cpHash := cp.EncodedHash() + + rsp.Snapshots = append(rsp.Snapshots, &types.Snapshot{ + Height: cp.Root.Version, + Format: uint32(cp.Version), + Chunks: uint32(len(cp.Chunks)), + Hash: cpHash[:], + Metadata: cbor.Marshal(cp), + }) + } + + return rsp +} + +func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { + if req.Snapshot == nil { + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + if req.Snapshot.Format != 1 { + mux.logger.Warn("received snapshot with unsupported version", + "version", req.Snapshot.Format, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT} + } + + // Decode checkpoint metadata hash and sanity check against the request. + var metadataHash hash.Hash + metadataHash.FromBytes(req.Snapshot.Metadata) + var h hash.Hash + if err := h.UnmarshalBinary(req.Snapshot.Hash); err != nil { + mux.logger.Warn("received snapshot with malformed hash", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + if !metadataHash.Equal(&h) { + mux.logger.Warn("received snapshot with mismatching hash", + "expected_hash", h, + "hash", metadataHash, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Decode checkpoint metadata. + var cp checkpoint.Metadata + if err := cbor.Unmarshal(req.Snapshot.Metadata, &cp); err != nil { + mux.logger.Warn("received snapshot with malformed metadata", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Number of chunks must match. + if int(req.Snapshot.Chunks) != len(cp.Chunks) { + mux.logger.Warn("received snapshot with mismatching number of chunks", + "expected_chunks", len(cp.Chunks), + "chunks", req.Snapshot.Chunks, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + // Root hash must match. + var appHash hash.Hash + if err := appHash.UnmarshalBinary(req.AppHash); err != nil { + // NOTE: This should never happen as it indicates a problem with Tendermint. + mux.logger.Error("received request with malformed hash", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} + } + if !cp.Root.Hash.Equal(&appHash) { + mux.logger.Warn("received snapshot with mismatching root hash", + "expected_root", appHash, + "root", cp.Root.Hash, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Snapshot seems correct (e.g., it is for the correct root), start the restoration process. + if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &cp); err != nil { + mux.logger.Error("failed to start restore", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} + } + + mux.logger.Info("started state restore process", + "root", cp.Root, + ) + + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT} +} + +func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk { + // Fetch the metadata for the specified checkpoint. 
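+	// NOTE: The (height, format) pair uniquely identifies a checkpoint, so exactly one
+	// result is expected; anything else means the requested snapshot is no longer
+	// available (e.g., it has since been pruned) and the chunk cannot be served.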
+ cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{ + Version: uint16(req.Format), + RootVersion: &req.Height, + }) + if err != nil { + mux.logger.Error("failed to get checkpoints", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + if len(cps) != 1 { + mux.logger.Error("failed to get checkpoints", + "cps", len(cps), + ) + return types.ResponseLoadSnapshotChunk{} + } + + // Fetch the chunk itself. + chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk)) + if err != nil { + mux.logger.Error("failed to get chunk metadata", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + var buf bytes.Buffer + if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil { + mux.logger.Error("failed to get chunk", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + + return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()} +} + +func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { + cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint() + + mux.logger.Debug("attempting to restore a chunk", + "root", cp.Root, + "index", req.Index, + ) + + buf := bytes.NewBuffer(req.Chunk) + done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf) + switch { + case err == nil: + case errors.Is(err, checkpoint.ErrNoRestoreInProgress): + // This should never happen. + mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync") + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + case errors.Is(err, checkpoint.ErrChunkAlreadyRestored): + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT} + case errors.Is(err, checkpoint.ErrChunkCorrupted): + // Corrupted chunk, refetch. + mux.logger.Warn("received corrupted chunk", + "sender", req.Sender, + "index", req.Index, + "err", err, + ) + + return types.ResponseApplySnapshotChunk{ + RefetchChunks: []uint32{req.Index}, + // TODO: Consider banning the sender. + Result: types.ResponseApplySnapshotChunk_RETRY, + } + case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed): + // Chunk was as specified in the manifest but did not match the reported root. In this case + // we need to abort processing the given snapshot. + mux.logger.Warn("chunk contains invalid proof, snapshot is bad", + "err", err, + ) + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT} + default: + // Unspecified error during restoration. + mux.logger.Error("error during chunk restoration, aborting state sync", + "err", err, + ) + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + // Check if we are done with the restoration. In this case, finalize the root. 
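+	// Finalizing marks the restored version as final in the node database, after which
+	// doApplyStateSync re-opens the ABCI state trees at the restored root so that block
+	// processing can continue from the synced state.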
+ if done { + err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash}) + if err != nil { + mux.logger.Error("failed to finalize restored root", + "err", err, + ) + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + if err = mux.state.doApplyStateSync(cp.Root); err != nil { + mux.logger.Error("failed to apply state sync root", + "err", err, + ) + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + mux.logger.Info("successfully synced state", + "root", cp.Root, + ) + } + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT} +} + func (mux *abciMux) doCleanup() { mux.state.doCleanup() diff --git a/go/consensus/tendermint/abci/state.go b/go/consensus/tendermint/abci/state.go index 960592b4b9a..3bdf20b50a2 100644 --- a/go/consensus/tendermint/abci/state.go +++ b/go/consensus/tendermint/abci/state.go @@ -252,6 +252,24 @@ func (s *applicationState) doInitChain(now time.Time) error { return s.doCommitOrInitChainLocked(now) } +func (s *applicationState) doApplyStateSync(root storage.Root) error { + s.blockLock.Lock() + defer s.blockLock.Unlock() + + s.stateRoot = root + + s.deliverTxTree.Close() + s.deliverTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog()) + s.checkTxTree.Close() + s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog()) + + if err := s.doCommitOrInitChainLocked(time.Time{}); err != nil { + return err + } + + return nil +} + func (s *applicationState) doCommit(now time.Time) (uint64, error) { s.blockLock.Lock() defer s.blockLock.Unlock() diff --git a/go/consensus/tendermint/light/client.go b/go/consensus/tendermint/light/client.go new file mode 100644 index 00000000000..47b7ac9ae78 --- /dev/null +++ b/go/consensus/tendermint/light/client.go @@ -0,0 +1,254 @@ +package light + +import ( + "bytes" + "context" + "errors" + "fmt" + "time" + + "google.golang.org/grpc" + + tmlight "github.com/tendermint/tendermint/light" + tmlightprovider "github.com/tendermint/tendermint/light/provider" + tmlightdb "github.com/tendermint/tendermint/light/store/db" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" + tmdb "github.com/tendermint/tm-db" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" + cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc" + "github.com/oasisprotocol/oasis-core/go/common/identity" + "github.com/oasisprotocol/oasis-core/go/common/node" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer" +) + +// ClientConfig is the configuration for the light client. +type ClientConfig struct { + // GenesisDocument is the Tendermint genesis document. + GenesisDocument *tmtypes.GenesisDoc + + // ConsensusNodes is a list of nodes exposing the Oasis Core public consensus services that are + // used to fetch data required for syncing light clients. The first node is considered the + // primary and at least two nodes must be specified. + ConsensusNodes []node.TLSAddress + + // TrustOptions are Tendermint light client trust options. + TrustOptions tmlight.TrustOptions +} + +// lightClientProvider implements Tendermint's light client provider interface using the Oasis Core +// light client API. 
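+//
+// Each provider wraps a gRPC connection to a single node exposing the public consensus
+// services and translates its responses into the corresponding Tendermint light client
+// types.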
+type lightClientProvider struct { + ctx context.Context // XXX: Hack needed because tmlightprovider.Provider doesn't pass contexts. + + chainID string + client consensus.LightClientBackend +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) ChainID() string { + return lp.chainID +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) SignedHeader(height int64) (*tmtypes.SignedHeader, error) { + shdr, err := lp.client.GetSignedHeader(lp.ctx, height) + switch { + case err == nil: + case errors.Is(err, consensus.ErrVersionNotFound): + return nil, tmlightprovider.ErrSignedHeaderNotFound + default: + return nil, fmt.Errorf("failed to fetch signed header: %w", err) + } + + // Decode Tendermint-specific signed header. + var protoSigHdr tmproto.SignedHeader + if err = protoSigHdr.Unmarshal(shdr.Meta); err != nil { + return nil, fmt.Errorf("received malformed header: %w", err) + } + sh, err := tmtypes.SignedHeaderFromProto(&protoSigHdr) + if err != nil { + return nil, fmt.Errorf("received malformed header: %w", err) + } + + if lp.chainID != sh.ChainID { + return nil, fmt.Errorf("incorrect chain ID (expected: %s got: %s)", + lp.chainID, + sh.ChainID, + ) + } + + return sh, nil +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) ValidatorSet(height int64) (*tmtypes.ValidatorSet, error) { + vs, err := lp.client.GetValidatorSet(lp.ctx, height) + switch { + case err == nil: + case errors.Is(err, consensus.ErrVersionNotFound): + return nil, tmlightprovider.ErrValidatorSetNotFound + default: + return nil, fmt.Errorf("failed to fetch validator set: %w", err) + } + + // Decode Tendermint-specific validator set. + var protoVals tmproto.ValidatorSet + if err = protoVals.Unmarshal(vs.Meta); err != nil { + return nil, fmt.Errorf("received malformed validator set: %w", err) + } + vals, err := tmtypes.ValidatorSetFromProto(&protoVals) + if err != nil { + return nil, fmt.Errorf("received malformed validator set: %w", err) + } + + return vals, nil +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) ReportEvidence(ev tmtypes.Evidence) error { + // TODO: Implement SubmitEvidence. + return fmt.Errorf("not yet implemented") +} + +// newLightClientProvider creates a new provider for the Tendermint's light client. +// +// The provided chain ID must be the Tendermint chain ID. +func newLightClientProvider( + ctx context.Context, + chainID string, + address node.TLSAddress, +) (tmlightprovider.Provider, error) { + // Create TLS credentials. + opts := cmnGrpc.ClientOptions{ + CommonName: identity.CommonName, + ServerPubKeys: map[signature.PublicKey]bool{ + address.PubKey: true, + }, + } + creds, err := cmnGrpc.NewClientCreds(&opts) + if err != nil { + return nil, fmt.Errorf("failed to create TLS client credentials: %w", err) + } + + conn, err := cmnGrpc.Dial(address.Address.String(), grpc.WithTransportCredentials(creds)) + if err != nil { + return nil, fmt.Errorf("failed to dial public consensus service endpoint %s: %w", address, err) + } + + return &lightClientProvider{ + ctx: ctx, + chainID: chainID, + client: consensus.NewConsensusLightClient(conn), + }, nil +} + +type lightClient struct { + // tmc is the Tendermint light client used for verifying headers. + tmc *tmlight.Client +} + +// Implements consensus.LightClientBackend. 
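+//
+// NOTE: The raw LightClientBackend methods (GetSignedHeader, GetValidatorSet, GetParameters,
+// State) proxy requests to the current primary provider without verification; the
+// GetVerified* methods go through the Tendermint light client and verify the results.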
+func (lc *lightClient) GetSignedHeader(ctx context.Context, height int64) (*consensus.SignedHeader, error) { + return lc.getPrimary().GetSignedHeader(ctx, height) +} + +// Implements consensus.LightClientBackend. +func (lc *lightClient) GetValidatorSet(ctx context.Context, height int64) (*consensus.ValidatorSet, error) { + return lc.getPrimary().GetValidatorSet(ctx, height) +} + +// Implements consensus.LightClientBackend. +func (lc *lightClient) GetParameters(ctx context.Context, height int64) (*consensus.Parameters, error) { + return lc.getPrimary().GetParameters(ctx, height) +} + +// Implements consensus.LightClientBackend. +func (lc *lightClient) State() syncer.ReadSyncer { + return lc.getPrimary().State() +} + +// Implements Client. +func (lc *lightClient) GetVerifiedSignedHeader(ctx context.Context, height int64) (*tmtypes.SignedHeader, error) { + return lc.tmc.VerifyHeaderAtHeight(height, time.Now()) +} + +func (lc *lightClient) GetVerifiedValidatorSet(ctx context.Context, height int64) (*tmtypes.ValidatorSet, int64, error) { + return lc.tmc.TrustedValidatorSet(height) +} + +// Implements Client. +func (lc *lightClient) GetVerifiedParameters(ctx context.Context, height int64) (*tmproto.ConsensusParams, error) { + p, err := lc.getPrimary().GetParameters(ctx, height) + if err != nil { + return nil, err + } + if p.Height <= 0 { + return nil, fmt.Errorf("malformed height in response: %d", p.Height) + } + + // Decode Tendermint-specific parameters. + var params tmproto.ConsensusParams + if err = params.Unmarshal(p.Meta); err != nil { + return nil, fmt.Errorf("malformed parameters: %w", err) + } + if err = tmtypes.ValidateConsensusParams(params); err != nil { + return nil, fmt.Errorf("malformed parameters: %w", err) + } + + // Fetch the header from the light client. + h, err := lc.tmc.VerifyHeaderAtHeight(p.Height, time.Now()) + if err != nil { + return nil, fmt.Errorf("failed to fetch header %d from light client: %w", p.Height, err) + } + + // Verify hash. + if localHash := tmtypes.HashConsensusParams(params); !bytes.Equal(localHash, h.ConsensusHash) { + return nil, fmt.Errorf("mismatched parameters hash (expected: %X got: %X)", + h.ConsensusHash, + localHash, + ) + } + + return ¶ms, nil +} + +func (lc *lightClient) getPrimary() consensus.LightClientBackend { + return lc.tmc.Primary().(*lightClientProvider).client +} + +// NewClient creates a new light client. +func NewClient(ctx context.Context, cfg ClientConfig) (Client, error) { + if numNodes := len(cfg.ConsensusNodes); numNodes < 2 { + return nil, fmt.Errorf("at least two consensus nodes must be provided (got %d)", numNodes) + } + + var providers []tmlightprovider.Provider + for _, address := range cfg.ConsensusNodes { + p, err := newLightClientProvider(ctx, cfg.GenesisDocument.ChainID, address) + if err != nil { + return nil, fmt.Errorf("failed to create light client provider: %w", err) + } + providers = append(providers, p) + } + + tmc, err := tmlight.NewClient( + cfg.GenesisDocument.ChainID, + cfg.TrustOptions, + providers[0], // Primary provider. + providers[1:], // Witnesses. + tmlightdb.New(tmdb.NewMemDB(), ""), // TODO: Make the database configurable. + tmlight.MaxRetryAttempts(5), // TODO: Make this configurable. + // TODO: Refactor log adapter to tendermint common. 
+ // tmlight.Logger(newLogAdapter(!viper.GetBool(cfgLogDebug))), + ) + if err != nil { + return nil, fmt.Errorf("failed to create light client: %w", err) + } + + return &lightClient{ + tmc: tmc, + }, nil +} diff --git a/go/consensus/tendermint/light/light.go b/go/consensus/tendermint/light/light.go new file mode 100644 index 00000000000..f31f381b2d7 --- /dev/null +++ b/go/consensus/tendermint/light/light.go @@ -0,0 +1,26 @@ +// Package light provides a light Tendermint consensus backend implementation. +package light + +import ( + "context" + + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" + + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" +) + +// Client is a Tendermint consensus light client that talks with a remote oasis-node that is using +// the Tendermint consensus backend and verifies responses. +type Client interface { + consensus.LightClientBackend + + // GetVerifiedSignedHeader returns a verified signed header. + GetVerifiedSignedHeader(ctx context.Context, height int64) (*tmtypes.SignedHeader, error) + + // GetVerifiedValidatorSet returns a verified validator set. + GetVerifiedValidatorSet(ctx context.Context, height int64) (*tmtypes.ValidatorSet, int64, error) + + // GetVerifiedParameters returns verified consensus parameters. + GetVerifiedParameters(ctx context.Context, height int64) (*tmproto.ConsensusParams, error) +} diff --git a/go/consensus/tendermint/statesync.go b/go/consensus/tendermint/statesync.go new file mode 100644 index 00000000000..f1b8b45c494 --- /dev/null +++ b/go/consensus/tendermint/statesync.go @@ -0,0 +1,123 @@ +package tendermint + +import ( + "context" + "fmt" + "sync" + + tmstate "github.com/tendermint/tendermint/state" + tmstatesync "github.com/tendermint/tendermint/statesync" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/light" +) + +type stateProvider struct { + sync.Mutex + + ctx context.Context + lc light.Client + genesisDocument *tmtypes.GenesisDoc + + logger *logging.Logger +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) AppHash(height uint64) ([]byte, error) { + sp.Lock() + defer sp.Unlock() + + // We have to fetch the next height, which contains the app hash for the previous height. + header, err := sp.lc.GetVerifiedSignedHeader(sp.ctx, int64(height+1)) + if err != nil { + return nil, err + } + return header.AppHash, nil +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) Commit(height uint64) (*tmtypes.Commit, error) { + sp.Lock() + defer sp.Unlock() + + header, err := sp.lc.GetVerifiedSignedHeader(sp.ctx, int64(height)) + if err != nil { + return nil, err + } + return header.Commit, nil +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) State(height uint64) (tmstate.State, error) { + sp.Lock() + defer sp.Unlock() + + state := tmstate.State{ + ChainID: sp.genesisDocument.ChainID, + Version: tmstate.InitStateVersion, + } + // XXX: This will fail in case an upgrade happened inbetween. + state.Version.Consensus.App = version.ConsensusProtocol.ToU64() + + // We need to verify up until h+2, to get the validator set. This also prefetches the headers + // for h and h+1 in the typical case where the trusted header is after the snapshot height. 
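+	// The header at h+1 carries the AppHash and LastResultsHash for height h, while the
+	// validator set verified at h+2 becomes NextValidators, which is why verification has
+	// to reach two heights past the snapshot height.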
+ _, err := sp.lc.GetVerifiedSignedHeader(sp.ctx, int64(height+2)) + if err != nil { + return tmstate.State{}, err + } + header, err := sp.lc.GetVerifiedSignedHeader(sp.ctx, int64(height)) + if err != nil { + return tmstate.State{}, err + } + nextHeader, err := sp.lc.GetVerifiedSignedHeader(sp.ctx, int64(height+1)) + if err != nil { + return tmstate.State{}, err + } + state.LastBlockHeight = header.Height + state.LastBlockTime = header.Time + state.LastBlockID = header.Commit.BlockID + state.AppHash = nextHeader.AppHash + state.LastResultsHash = nextHeader.LastResultsHash + + state.LastValidators, _, err = sp.lc.GetVerifiedValidatorSet(sp.ctx, int64(height)) + if err != nil { + return tmstate.State{}, err + } + state.Validators, _, err = sp.lc.GetVerifiedValidatorSet(sp.ctx, int64(height+1)) + if err != nil { + return tmstate.State{}, err + } + state.NextValidators, _, err = sp.lc.GetVerifiedValidatorSet(sp.ctx, int64(height+2)) + if err != nil { + return tmstate.State{}, err + } + state.LastHeightValidatorsChanged = int64(height) + + // Fetch consensus parameters with light client verification. + params, err := sp.lc.GetVerifiedParameters(sp.ctx, nextHeader.Height) + if err != nil { + return tmstate.State{}, fmt.Errorf("failed to fetch consensus parameters for height %d: %w", + nextHeader.Height, + err, + ) + } + state.ConsensusParams = *params + + return state, nil +} + +func newStateProvider(ctx context.Context, cfg light.ClientConfig) (tmstatesync.StateProvider, error) { + lc, err := light.NewClient(ctx, cfg) + if err != nil { + return nil, err + } + + return &stateProvider{ + ctx: ctx, + lc: lc, + genesisDocument: cfg.GenesisDocument, + logger: logging.GetLogger("consensus/tendermint/stateprovider"), + }, nil +} diff --git a/go/consensus/tendermint/tendermint.go b/go/consensus/tendermint/tendermint.go index 3e383a162b2..6c01dbbc366 100644 --- a/go/consensus/tendermint/tendermint.go +++ b/go/consensus/tendermint/tendermint.go @@ -19,6 +19,7 @@ import ( tmconfig "github.com/tendermint/tendermint/config" tmlog "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmlight "github.com/tendermint/tendermint/light" tmmempool "github.com/tendermint/tendermint/mempool" tmnode "github.com/tendermint/tendermint/node" tmp2p "github.com/tendermint/tendermint/p2p" @@ -26,6 +27,7 @@ import ( tmcli "github.com/tendermint/tendermint/rpc/client/local" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmstate "github.com/tendermint/tendermint/state" + tmstatesync "github.com/tendermint/tendermint/statesync" tmtypes "github.com/tendermint/tendermint/types" tmdb "github.com/tendermint/tm-db" @@ -54,6 +56,7 @@ import ( tmepochtime "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/epochtime" tmepochtimemock "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/epochtime_mock" tmkeymanager "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/keymanager" + "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/light" tmregistry "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/registry" tmroothash "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/roothash" tmscheduler "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/scheduler" @@ -134,6 +137,18 @@ const ( // CfgSupplementarySanityInterval configures the supplementary sanity check interval. CfgSupplementarySanityInterval = "consensus.tendermint.supplementarysanity.interval" + // CfgConsensusStateSyncEnabled enabled consensus state sync. 
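+	//
+	// When enabled, the node configures Tendermint state sync and uses a light client
+	// backed by the configured consensus nodes to obtain verified headers against which
+	// the fetched state snapshot is checked, instead of replaying all blocks from genesis.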
+ CfgConsensusStateSyncEnabled = "consensus.tendermint.state_sync.enabled" + // CfgConsensusStateSyncConsensusNode specifies nodes exposing public consensus services which + // are used to sync a light client. + CfgConsensusStateSyncConsensusNode = "consensus.tendermint.state_sync.consensus_node" + // CfgConsensusStateSyncTrustPeriod is the light client trust period. + CfgConsensusStateSyncTrustPeriod = "consensus.tendermint.state_sync.trust_period" + // CfgConsensusStateSyncTrustHeight is the known trusted height for the light client. + CfgConsensusStateSyncTrustHeight = "consensus.tendermint.state_sync.trust_height" + // CfgConsensusStateSyncTrustHash is the known trusted block header hash for the light client. + CfgConsensusStateSyncTrustHash = "consensus.tendermint.state_sync.trust_hash" + // StateDir is the name of the directory located inside the node's data // directory which contains the tendermint state. StateDir = "tendermint" @@ -1258,6 +1273,40 @@ func (t *tendermintService) lazyInit() error { return db, nil } + // Configure state sync if enabled. + var stateProvider tmstatesync.StateProvider + if viper.GetBool(CfgConsensusStateSyncEnabled) { + t.Logger.Info("state sync enabled") + + // Enable state sync in the configuration. + tenderConfig.StateSync.Enable = true + tenderConfig.StateSync.TrustHash = viper.GetString(CfgConsensusStateSyncTrustHash) + + // Create new state sync state provider. + cfg := light.ClientConfig{ + GenesisDocument: tmGenDoc, + TrustOptions: tmlight.TrustOptions{ + Period: viper.GetDuration(CfgConsensusStateSyncTrustPeriod), + Height: int64(viper.GetUint64(CfgConsensusStateSyncTrustHeight)), + Hash: tenderConfig.StateSync.TrustHashBytes(), + }, + } + for _, rawAddr := range viper.GetStringSlice(CfgConsensusStateSyncConsensusNode) { + var addr node.TLSAddress + if err = addr.UnmarshalText([]byte(rawAddr)); err != nil { + return fmt.Errorf("failed to parse state sync consensus node address (%s): %w", rawAddr, err) + } + + cfg.ConsensusNodes = append(cfg.ConsensusNodes, addr) + } + if stateProvider, err = newStateProvider(t.ctx, cfg); err != nil { + t.Logger.Error("failed to create state sync state provider", + "err", err, + ) + return fmt.Errorf("failed to create state sync state provider: %w", err) + } + } + // HACK: tmnode.NewNode() triggers block replay and or ABCI chain // initialization, instead of t.node.Start(). This is a problem // because at the time that lazyInit() is called, none of the ABCI @@ -1287,6 +1336,7 @@ func (t *tendermintService) lazyInit() error { wrapDbProvider, tmnode.DefaultMetricsProvider(tenderConfig.Instrumentation), newLogAdapter(!viper.GetBool(cfgLogDebug)), + tmnode.StateProvider(stateProvider), ) if err != nil { return fmt.Errorf("tendermint: failed to create node: %w", err) @@ -1615,6 +1665,13 @@ func init() { Flags.Bool(CfgDebugDisableCheckTx, false, "do not perform CheckTx on incoming transactions (UNSAFE)") Flags.Bool(CfgDebugUnsafeReplayRecoverCorruptedWAL, false, "Enable automatic recovery from corrupted WAL during replay (UNSAFE).") + // State sync. 
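+	// NOTE: At least two consensus node addresses must be configured (the first is used as
+	// the primary and the rest as witnesses), otherwise light client creation fails. Each
+	// address uses the node TLS address text encoding.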
+ Flags.Bool(CfgConsensusStateSyncEnabled, false, "enable state sync") + Flags.StringSlice(CfgConsensusStateSyncConsensusNode, []string{}, "consensus node to use for syncing the light client") + Flags.Duration(CfgConsensusStateSyncTrustPeriod, 24*time.Hour, "light client trust period") + Flags.Uint64(CfgConsensusStateSyncTrustHeight, 0, "light client trusted height") + Flags.String(CfgConsensusStateSyncTrustHash, "", "light client trusted consensus header hash") + Flags.Bool(CfgSupplementarySanityEnabled, false, "enable supplementary sanity checks (slows down consensus)") Flags.Uint64(CfgSupplementarySanityInterval, 10, "supplementary sanity check interval (in blocks)") diff --git a/go/oasis-node/cmd/genesis/genesis.go b/go/oasis-node/cmd/genesis/genesis.go index e52446e9fbb..abbd39596e4 100644 --- a/go/oasis-node/cmd/genesis/genesis.go +++ b/go/oasis-node/cmd/genesis/genesis.go @@ -88,9 +88,9 @@ const ( cfgConsensusMaxBlockGas = "consensus.tendermint.max_block_gas" cfgConsensusMaxEvidenceAgeBlocks = "consensus.tendermint.max_evidence_age_blocks" cfgConsensusMaxEvidenceAgeTime = "consensus.tendermint.max_evidence_age_time" - cfgConsensusStateCheckpointInterval = "consensus.state_checkpoint.interval" - cfgConsensusStateCheckpointNumKept = "consensus.state_checkpoint.num_kept" - cfgConsensusStateCheckpointChunkSize = "consensus.state_checkpoint.chunk_size" + CfgConsensusStateCheckpointInterval = "consensus.state_checkpoint.interval" + CfgConsensusStateCheckpointNumKept = "consensus.state_checkpoint.num_kept" + CfgConsensusStateCheckpointChunkSize = "consensus.state_checkpoint.chunk_size" CfgConsensusGasCostsTxByte = "consensus.gas_costs.tx_byte" cfgConsensusBlacklistPublicKey = "consensus.blacklist_public_key" @@ -243,9 +243,9 @@ func doInitGenesis(cmd *cobra.Command, args []string) { MaxBlockGas: transaction.Gas(viper.GetUint64(cfgConsensusMaxBlockGas)), MaxEvidenceAgeBlocks: viper.GetUint64(cfgConsensusMaxEvidenceAgeBlocks), MaxEvidenceAgeTime: viper.GetDuration(cfgConsensusMaxEvidenceAgeTime), - StateCheckpointInterval: viper.GetUint64(cfgConsensusStateCheckpointInterval), - StateCheckpointNumKept: viper.GetUint64(cfgConsensusStateCheckpointNumKept), - StateCheckpointChunkSize: uint64(viper.GetSizeInBytes(cfgConsensusStateCheckpointChunkSize)), + StateCheckpointInterval: viper.GetUint64(CfgConsensusStateCheckpointInterval), + StateCheckpointNumKept: viper.GetUint64(CfgConsensusStateCheckpointNumKept), + StateCheckpointChunkSize: uint64(viper.GetSizeInBytes(CfgConsensusStateCheckpointChunkSize)), GasCosts: transaction.Costs{ consensusGenesis.GasOpTxByte: transaction.Gas(viper.GetUint64(CfgConsensusGasCostsTxByte)), }, @@ -698,9 +698,9 @@ func init() { initGenesisFlags.Uint64(cfgConsensusMaxBlockGas, 0, "tendermint max gas used per block") initGenesisFlags.Uint64(cfgConsensusMaxEvidenceAgeBlocks, 100000, "tendermint max evidence age (in blocks)") initGenesisFlags.Duration(cfgConsensusMaxEvidenceAgeTime, 48*time.Hour, "tendermint max evidence age (in time)") - initGenesisFlags.Uint64(cfgConsensusStateCheckpointInterval, 10000, "consensus state checkpoint interval (in blocks)") - initGenesisFlags.Uint64(cfgConsensusStateCheckpointNumKept, 2, "number of kept consensus state checkpoints") - initGenesisFlags.String(cfgConsensusStateCheckpointChunkSize, "8mb", "consensus state checkpoint chunk size (in bytes)") + initGenesisFlags.Uint64(CfgConsensusStateCheckpointInterval, 10000, "consensus state checkpoint interval (in blocks)") + initGenesisFlags.Uint64(CfgConsensusStateCheckpointNumKept, 2, 
"number of kept consensus state checkpoints") + initGenesisFlags.String(CfgConsensusStateCheckpointChunkSize, "8mb", "consensus state checkpoint chunk size (in bytes)") initGenesisFlags.Uint64(CfgConsensusGasCostsTxByte, 1, "consensus gas costs: each transaction byte") initGenesisFlags.StringSlice(cfgConsensusBlacklistPublicKey, nil, "blacklist public key") diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index ec69a6d6adb..5048478af0a 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -174,6 +174,22 @@ func (args *argBuilder) tendermintDebugAllowDuplicateIP() *argBuilder { return args } +func (args *argBuilder) tendermintStateSync( + consensusNodes []string, + trustHeight uint64, + trustHash string, +) *argBuilder { + args.vec = append(args.vec, + "--"+tendermint.CfgConsensusStateSyncEnabled, + "--"+tendermint.CfgConsensusStateSyncTrustHeight, strconv.FormatUint(trustHeight, 10), + "--"+tendermint.CfgConsensusStateSyncTrustHash, trustHash, + ) + for _, address := range consensusNodes { + args.vec = append(args.vec, "--"+tendermint.CfgConsensusStateSyncConsensusNode, address) + } + return args +} + func (args *argBuilder) storageBackend(backend string) *argBuilder { args.vec = append(args.vec, []string{ "--" + storage.CfgBackend, backend, diff --git a/go/oasis-test-runner/oasis/oasis.go b/go/oasis-test-runner/oasis/oasis.go index e7cceac2198..1e55ac0d034 100644 --- a/go/oasis-test-runner/oasis/oasis.go +++ b/go/oasis-test-runner/oasis/oasis.go @@ -57,6 +57,13 @@ const ( maxNodes = 32 // Arbitrary ) +// ConsensusStateSyncCfg is a node's consensus state sync configuration. +type ConsensusStateSyncCfg struct { + ConsensusNodes []string + TrustHeight uint64 + TrustHash string +} + // Node defines the common fields for all node types. type Node struct { // nolint: maligned sync.Mutex @@ -80,6 +87,7 @@ type Node struct { // nolint: maligned logWatcherHandlerFactories []log.WatcherHandlerFactory consensus ConsensusFixture + consensusStateSync *ConsensusStateSyncCfg customGrpcSocketPath string } @@ -191,6 +199,14 @@ func (n *Node) handleExit(cmdErr error) error { } } +// SetConsensusStateSync configures wheteher a node should perform +func (n *Node) SetConsensusStateSync(cfg *ConsensusStateSyncCfg) { + n.Lock() + defer n.Unlock() + + n.consensusStateSync = cfg +} + // NodeCfg defines the common node configuration options. 
type NodeCfg struct { // nolint: maligned AllowEarlyTermination bool @@ -735,6 +751,13 @@ func (net *Network) startOasisNode( extraArgs = extraArgs.debugDontBlameOasis() extraArgs = extraArgs.grpcDebugGrpcInternalSocketPath(node.customGrpcSocketPath) } + if node.consensusStateSync != nil { + extraArgs = extraArgs.tendermintStateSync( + node.consensusStateSync.ConsensusNodes, + node.consensusStateSync.TrustHeight, + node.consensusStateSync.TrustHash, + ) + } if viper.IsSet(metrics.CfgMetricsAddr) { extraArgs = extraArgs.appendNodeMetrics(node) } @@ -802,6 +825,8 @@ func (net *Network) makeGenesis() error { "--" + genesis.CfgRegistryDebugAllowTestRuntimes, "true", "--scheduler.max_validators_per_entity", strconv.Itoa(len(net.Validators())), "--" + genesis.CfgConsensusGasCostsTxByte, strconv.FormatUint(uint64(net.cfg.Consensus.Parameters.GasCosts[consensusGenesis.GasOpTxByte]), 10), + "--" + genesis.CfgConsensusStateCheckpointInterval, strconv.FormatUint(net.cfg.Consensus.Parameters.StateCheckpointInterval, 10), + "--" + genesis.CfgConsensusStateCheckpointNumKept, strconv.FormatUint(net.cfg.Consensus.Parameters.StateCheckpointNumKept, 10), "--" + genesis.CfgStakingTokenSymbol, genesisTestHelpers.TestStakingTokenSymbol, "--" + genesis.CfgStakingTokenValueExponent, strconv.FormatUint( uint64(genesisTestHelpers.TestStakingTokenValueExponent), 10), diff --git a/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go new file mode 100644 index 00000000000..a598b167c9c --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go @@ -0,0 +1,146 @@ +package e2e + +import ( + "context" + "encoding/hex" + "fmt" + "time" + + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + control "github.com/oasisprotocol/oasis-core/go/control/api" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" +) + +// ConsensusStateSync is the consensus state sync scenario. +var ConsensusStateSync scenario.Scenario = &consensusStateSyncImpl{ + E2E: *NewE2E("consensus-state-sync"), +} + +type consensusStateSyncImpl struct { + E2E +} + +func (sc *consensusStateSyncImpl) Clone() scenario.Scenario { + return &consensusStateSyncImpl{ + E2E: sc.E2E.Clone(), + } +} + +func (sc *consensusStateSyncImpl) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.E2E.Fixture() + if err != nil { + return nil, err + } + + // Enable checkpoints. + f.Network.Consensus.Parameters.StateCheckpointInterval = 10 + f.Network.Consensus.Parameters.StateCheckpointNumKept = 100 + f.Network.Consensus.Parameters.StateCheckpointChunkSize = 1024 * 1024 + // Add an extra validator. + f.Validators = append(f.Validators, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + ) + + return f, nil +} + +func (sc *consensusStateSyncImpl) Run(childEnv *env.Env) error { + if err := sc.Net.Start(); err != nil { + return err + } + + sc.Logger.Info("waiting for network to come up") + ctx := context.Background() + if err := sc.Net.Controller().WaitNodesRegistered(ctx, len(sc.Net.Validators())); err != nil { + return err + } + + // Stop one of the validators. + val := sc.Net.Validators()[2] + if err := val.Stop(); err != nil { + return fmt.Errorf("failed to stop validator: %w", err) + } + + // Let the network run for 50 blocks. This should generate some checkpoints. 
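+	// With StateCheckpointInterval set to 10 in the fixture, this is enough for several
+	// checkpoints to exist; the block reached here is also used as the light client trust
+	// root (its height and hash are passed to the restarted validator below).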
+ blockCh, blockSub, err := sc.Net.Controller().Consensus.WatchBlocks(ctx) + if err != nil { + return err + } + defer blockSub.Close() + + sc.Logger.Info("waiting for some blocks") + var blk *consensus.Block + for { + select { + case blk = <-blockCh: + if blk.Height < 50 { + continue + } + case <-time.After(30 * time.Second): + return fmt.Errorf("timed out waiting for blocks") + } + + break + } + + sc.Logger.Info("got some blocks, starting the validator back", + "trust_height", blk.Height, + "trust_hash", hex.EncodeToString(blk.Hash), + ) + + // Get the TLS public key from the validators. + var consensusNodes []string + for _, v := range sc.Net.Validators()[:2] { + var ctrl *oasis.Controller + ctrl, err = oasis.NewController(v.SocketPath()) + if err != nil { + return fmt.Errorf("failed to create controller for validator %s: %w", v.Name, err) + } + + var status *control.Status + status, err = ctrl.GetStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get status for validator %s: %w", v.Name, err) + } + + if status.Registration.Descriptor == nil { + return fmt.Errorf("validator %s has not registered", v.Name) + } + if len(status.Registration.Descriptor.TLS.Addresses) == 0 { + return fmt.Errorf("validator %s has no TLS addresses", v.Name) + } + + var rawAddress []byte + tlsAddress := status.Registration.Descriptor.TLS.Addresses[0] + rawAddress, err = tlsAddress.MarshalText() + if err != nil { + return fmt.Errorf("failed to marshal TLS address: %w", err) + } + consensusNodes = append(consensusNodes, string(rawAddress)) + } + + // Configure state sync for the consensus validator. + val.SetConsensusStateSync(&oasis.ConsensusStateSyncCfg{ + ConsensusNodes: consensusNodes, + TrustHeight: uint64(blk.Height), + TrustHash: hex.EncodeToString(blk.Hash), + }) + + if err = val.Start(); err != nil { + return fmt.Errorf("failed to start validator back: %w", err) + } + + // Wait for the validator to finish syncing. + sc.Logger.Info("waiting for the validator to sync") + valCtrl, err := oasis.NewController(val.SocketPath()) + if err != nil { + return err + } + if err = valCtrl.WaitSync(ctx); err != nil { + return err + } + + return nil +} diff --git a/go/oasis-test-runner/scenario/e2e/e2e.go b/go/oasis-test-runner/scenario/e2e/e2e.go index fdcd8fd7e10..ac53a0e27bb 100644 --- a/go/oasis-test-runner/scenario/e2e/e2e.go +++ b/go/oasis-test-runner/scenario/e2e/e2e.go @@ -328,6 +328,8 @@ func RegisterScenarios() error { Debond, // Early query test. EarlyQuery, + // Consensus state sync. + ConsensusStateSync, } { if err := cmd.Register(s); err != nil { return err