Skip to content

Commit

Permalink
Merge pull request #2882 from oasisprotocol/kostko/feature/tm-state-s…
Browse files Browse the repository at this point in the history
…ync-test

go/consensus/tendermint: Add support for state sync
  • Loading branch information
kostko authored Aug 4, 2020
2 parents 3a0b2f3 + b3912eb commit 3c03fc2
Show file tree
Hide file tree
Showing 39 changed files with 1,159 additions and 174 deletions.
1 change: 1 addition & 0 deletions .changelog/2880.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Add support for state sync
1 change: 1 addition & 0 deletions .changelog/2882.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Bump Tendermint Core to 0.34
9 changes: 9 additions & 0 deletions .changelog/2882.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Remove explicit evidence-related consensus parameters

The following evidence-related consensus parameters have been removed as they
are now derived based on the debonding period and other parameters:

- `max_evidence_age_blocks`
- `max_evidence_age_time`

Make sure to update the genesis file.
2 changes: 1 addition & 1 deletion go/common/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func NewServer(config *ServerConfig) (*Server, error) {
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.MaxSendMsgSize(maxSendMsgSize),
grpc.KeepaliveParams(serverKeepAliveParams),
grpc.CustomCodec(&CBORCodec{}),
grpc.CustomCodec(&CBORCodec{}), // nolint: staticcheck
}
if config.Identity != nil && config.Identity.GetTLSCertificate() != nil {
tlsConfig := &tls.Config{
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestAccessPolicy(t *testing.T) {
Name: host,
Port: port,
Identity: &identity.Identity{},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&cmnGrpc.CBORCodec{})},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&cmnGrpc.CBORCodec{})}, // nolint: staticcheck
}
serverConfig.Identity.SetTLSCertificate(serverTLSCert)
grpcServer, err := cmnGrpc.NewServer(serverConfig)
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestGRPCProxy(t *testing.T) {
Name: host,
Port: port,
Identity: &identity.Identity{},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&commonGrpc.CBORCodec{})},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&commonGrpc.CBORCodec{})}, // nolint: staticcheck
}
serverConfig.Identity.SetTLSCertificate(serverTLSCert)
grpcServer, err := commonGrpc.NewServer(serverConfig)
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestGrpcWrapper(t *testing.T) {
serverConfig := &ServerConfig{
Name: host,
Port: port,
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&CBORCodec{})},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&CBORCodec{})}, // nolint: staticcheck
InstallWrapper: true,
}
grpcServer, err := NewServer(serverConfig)
Expand Down
14 changes: 7 additions & 7 deletions go/consensus/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
)

// Genesis contains various consensus config flags that should be part of the genesis state.
Expand All @@ -16,16 +17,15 @@ type Genesis struct {
}

// Parameters are the consensus parameters.
type Parameters struct {
type Parameters struct { // nolint: maligned
TimeoutCommit time.Duration `json:"timeout_commit"`
SkipTimeoutCommit bool `json:"skip_timeout_commit"`
EmptyBlockInterval time.Duration `json:"empty_block_interval"`

MaxTxSize uint64 `json:"max_tx_size"`
MaxBlockSize uint64 `json:"max_block_size"`
MaxBlockGas transaction.Gas `json:"max_block_gas"`
MaxEvidenceAgeBlocks uint64 `json:"max_evidence_age_blocks"`
MaxEvidenceAgeTime time.Duration `json:"max_evidence_age_time"`
MaxTxSize uint64 `json:"max_tx_size"`
MaxBlockSize uint64 `json:"max_block_size"`
MaxBlockGas transaction.Gas `json:"max_block_gas"`
MaxEvidenceNum uint32 `json:"max_evidence_num"`

// StateCheckpointInterval is the expected state checkpoint interval (in blocks).
StateCheckpointInterval uint64 `json:"state_checkpoint_interval"`
Expand Down Expand Up @@ -54,7 +54,7 @@ func (g *Genesis) SanityCheck() error {
return fmt.Errorf("consensus: sanity check failed: timeout commit must be >= 1ms")
}

if params.StateCheckpointInterval > 0 {
if params.StateCheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if params.StateCheckpointInterval < 1000 {
return fmt.Errorf("consensus: sanity check failed: state checkpoint interval must be >= 1000")
}
Expand Down
226 changes: 226 additions & 0 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
Expand All @@ -30,6 +31,7 @@ import (
abciState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/abci/state"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
)

Expand Down Expand Up @@ -759,6 +761,13 @@ func (mux *abciMux) EndBlock(req types.RequestEndBlock) types.ResponseEndBlock {
// Update tags.
resp.Events = ctx.GetEvents()

// Update version to what we are actually running.
resp.ConsensusParamUpdates = &types.ConsensusParams{
Version: &tmproto.VersionParams{
AppVersion: version.ConsensusProtocol.ToU64(),
},
}

// Clear block context.
mux.state.blockCtx = nil

Expand Down Expand Up @@ -789,6 +798,223 @@ func (mux *abciMux) Commit() types.ResponseCommit {
}
}

func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots {
// Get a list of all current checkpoints.
cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{
Version: 1,
})
if err != nil {
mux.logger.Error("failed to get checkpoints",
"err", err,
)
return types.ResponseListSnapshots{}
}

var rsp types.ResponseListSnapshots
for _, cp := range cps {
cpHash := cp.EncodedHash()

rsp.Snapshots = append(rsp.Snapshots, &types.Snapshot{
Height: cp.Root.Version,
Format: uint32(cp.Version),
Chunks: uint32(len(cp.Chunks)),
Hash: cpHash[:],
Metadata: cbor.Marshal(cp),
})
}

return rsp
}

func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot {
if req.Snapshot == nil {
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
}
if req.Snapshot.Format != 1 {
mux.logger.Warn("received snapshot with unsupported version",
"version", req.Snapshot.Format,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT}
}

// Decode checkpoint metadata hash and sanity check against the request.
var metadataHash hash.Hash
metadataHash.FromBytes(req.Snapshot.Metadata)
var h hash.Hash
if err := h.UnmarshalBinary(req.Snapshot.Hash); err != nil {
mux.logger.Warn("received snapshot with malformed hash",
"err", err,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
}
if !metadataHash.Equal(&h) {
mux.logger.Warn("received snapshot with mismatching hash",
"expected_hash", h,
"hash", metadataHash,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
}

// Decode checkpoint metadata.
var cp checkpoint.Metadata
if err := cbor.Unmarshal(req.Snapshot.Metadata, &cp); err != nil {
mux.logger.Warn("received snapshot with malformed metadata",
"err", err,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
}

// Number of chunks must match.
if int(req.Snapshot.Chunks) != len(cp.Chunks) {
mux.logger.Warn("received snapshot with mismatching number of chunks",
"expected_chunks", len(cp.Chunks),
"chunks", req.Snapshot.Chunks,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
}
// Root hash must match.
var appHash hash.Hash
if err := appHash.UnmarshalBinary(req.AppHash); err != nil {
// NOTE: This should never happen as it indicates a problem with Tendermint.
mux.logger.Error("received request with malformed hash",
"err", err,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT}
}
if !cp.Root.Hash.Equal(&appHash) {
mux.logger.Warn("received snapshot with mismatching root hash",
"expected_root", appHash,
"root", cp.Root.Hash,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
}

// Snapshot seems correct (e.g., it is for the correct root), start the restoration process.
if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &cp); err != nil {
mux.logger.Error("failed to start restore",
"err", err,
)
return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT}
}

mux.logger.Info("started state restore process",
"root", cp.Root,
)

return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT}
}

func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk {
// Fetch the metadata for the specified checkpoint.
cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{
Version: uint16(req.Format),
RootVersion: &req.Height,
})
if err != nil {
mux.logger.Error("failed to get checkpoints",
"err", err,
)
return types.ResponseLoadSnapshotChunk{}
}
if len(cps) != 1 {
mux.logger.Error("failed to get checkpoints",
"cps", len(cps),
)
return types.ResponseLoadSnapshotChunk{}
}

// Fetch the chunk itself.
chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk))
if err != nil {
mux.logger.Error("failed to get chunk metadata",
"err", err,
)
return types.ResponseLoadSnapshotChunk{}
}
var buf bytes.Buffer
if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil {
mux.logger.Error("failed to get chunk",
"err", err,
)
return types.ResponseLoadSnapshotChunk{}
}

return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()}
}

func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk {
cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint()

mux.logger.Debug("attempting to restore a chunk",
"root", cp.Root,
"index", req.Index,
)

buf := bytes.NewBuffer(req.Chunk)
done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf)
switch {
case err == nil:
case errors.Is(err, checkpoint.ErrNoRestoreInProgress):
// This should never happen.
mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
case errors.Is(err, checkpoint.ErrChunkAlreadyRestored):
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
case errors.Is(err, checkpoint.ErrChunkCorrupted):
// Corrupted chunk, refetch.
mux.logger.Warn("received corrupted chunk",
"sender", req.Sender,
"index", req.Index,
"err", err,
)

return types.ResponseApplySnapshotChunk{
RefetchChunks: []uint32{req.Index},
// TODO: Consider banning the sender.
Result: types.ResponseApplySnapshotChunk_RETRY,
}
case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed):
// Chunk was as specified in the manifest but did not match the reported root. In this case
// we need to abort processing the given snapshot.
mux.logger.Warn("chunk contains invalid proof, snapshot is bad",
"err", err,
)

return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT}
default:
// Unspecified error during restoration.
mux.logger.Error("error during chunk restoration, aborting state sync",
"err", err,
)

return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

// Check if we are done with the restoration. In this case, finalize the root.
if done {
err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash})
if err != nil {
mux.logger.Error("failed to finalize restored root",
"err", err,
)
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

if err = mux.state.doApplyStateSync(cp.Root); err != nil {
mux.logger.Error("failed to apply state sync root",
"err", err,
)
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

mux.logger.Info("successfully synced state",
"root", cp.Root,
)
}

return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
}

func (mux *abciMux) doCleanup() {
mux.state.doCleanup()

Expand Down
18 changes: 18 additions & 0 deletions go/consensus/tendermint/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,24 @@ func (s *applicationState) doInitChain(now time.Time) error {
return s.doCommitOrInitChainLocked(now)
}

func (s *applicationState) doApplyStateSync(root storage.Root) error {
s.blockLock.Lock()
defer s.blockLock.Unlock()

s.stateRoot = root

s.deliverTxTree.Close()
s.deliverTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())
s.checkTxTree.Close()
s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())

if err := s.doCommitOrInitChainLocked(time.Time{}); err != nil {
return err
}

return nil
}

func (s *applicationState) doCommit(now time.Time) (uint64, error) {
s.blockLock.Lock()
defer s.blockLock.Unlock()
Expand Down
Loading

0 comments on commit 3c03fc2

Please sign in to comment.