Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/consensus/tendermint: Add support for state sync #2882

Merged
merged 5 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/2880.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Add support for state sync
1 change: 1 addition & 0 deletions .changelog/2882.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Bump Tendermint Core to 0.34
9 changes: 9 additions & 0 deletions .changelog/2882.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Remove explicit evidence-related consensus parameters

The following evidence-related consensus parameters have been removed as they
are now derived based on the debonding period and other parameters:

- `max_evidence_age_blocks`
- `max_evidence_age_time`

Make sure to update the genesis file.
2 changes: 1 addition & 1 deletion go/common/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func NewServer(config *ServerConfig) (*Server, error) {
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.MaxSendMsgSize(maxSendMsgSize),
grpc.KeepaliveParams(serverKeepAliveParams),
grpc.CustomCodec(&CBORCodec{}),
grpc.CustomCodec(&CBORCodec{}), // nolint: staticcheck
}
if config.Identity != nil && config.Identity.GetTLSCertificate() != nil {
tlsConfig := &tls.Config{
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestAccessPolicy(t *testing.T) {
Name: host,
Port: port,
Identity: &identity.Identity{},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&cmnGrpc.CBORCodec{})},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&cmnGrpc.CBORCodec{})}, // nolint: staticcheck
}
serverConfig.Identity.SetTLSCertificate(serverTLSCert)
grpcServer, err := cmnGrpc.NewServer(serverConfig)
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestGRPCProxy(t *testing.T) {
Name: host,
Port: port,
Identity: &identity.Identity{},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&commonGrpc.CBORCodec{})},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&commonGrpc.CBORCodec{})}, // nolint: staticcheck
}
serverConfig.Identity.SetTLSCertificate(serverTLSCert)
grpcServer, err := commonGrpc.NewServer(serverConfig)
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestGrpcWrapper(t *testing.T) {
serverConfig := &ServerConfig{
Name: host,
Port: port,
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&CBORCodec{})},
CustomOptions: []grpc.ServerOption{grpc.CustomCodec(&CBORCodec{})}, // nolint: staticcheck
InstallWrapper: true,
}
grpcServer, err := NewServer(serverConfig)
Expand Down
14 changes: 7 additions & 7 deletions go/consensus/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
)

// Genesis contains various consensus config flags that should be part of the genesis state.
Expand All @@ -16,16 +17,15 @@ type Genesis struct {
}

// Parameters are the consensus parameters.
type Parameters struct {
type Parameters struct { // nolint: maligned
TimeoutCommit time.Duration `json:"timeout_commit"`
SkipTimeoutCommit bool `json:"skip_timeout_commit"`
EmptyBlockInterval time.Duration `json:"empty_block_interval"`

MaxTxSize uint64 `json:"max_tx_size"`
MaxBlockSize uint64 `json:"max_block_size"`
MaxBlockGas transaction.Gas `json:"max_block_gas"`
MaxEvidenceAgeBlocks uint64 `json:"max_evidence_age_blocks"`
MaxEvidenceAgeTime time.Duration `json:"max_evidence_age_time"`
MaxTxSize uint64 `json:"max_tx_size"`
MaxBlockSize uint64 `json:"max_block_size"`
MaxBlockGas transaction.Gas `json:"max_block_gas"`
MaxEvidenceNum uint32 `json:"max_evidence_num"`

// StateCheckpointInterval is the expected state checkpoint interval (in blocks).
StateCheckpointInterval uint64 `json:"state_checkpoint_interval"`
Expand Down Expand Up @@ -54,7 +54,7 @@ func (g *Genesis) SanityCheck() error {
return fmt.Errorf("consensus: sanity check failed: timeout commit must be >= 1ms")
}

if params.StateCheckpointInterval > 0 {
if params.StateCheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if params.StateCheckpointInterval < 1000 {
return fmt.Errorf("consensus: sanity check failed: state checkpoint interval must be >= 1000")
}
Expand Down
226 changes: 226 additions & 0 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
Expand All @@ -30,6 +31,7 @@ import (
abciState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/abci/state"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
)

Expand Down Expand Up @@ -759,6 +761,13 @@ func (mux *abciMux) EndBlock(req types.RequestEndBlock) types.ResponseEndBlock {
// Update tags.
resp.Events = ctx.GetEvents()

// Update version to what we are actually running.
resp.ConsensusParamUpdates = &types.ConsensusParams{
Version: &tmproto.VersionParams{
AppVersion: version.ConsensusProtocol.ToU64(),
},
}

// Clear block context.
mux.state.blockCtx = nil

Expand Down Expand Up @@ -789,6 +798,223 @@ func (mux *abciMux) Commit() types.ResponseCommit {
}
}

// ListSnapshots handles the ABCI ListSnapshots request by advertising every
// version 1 checkpoint reported by the storage checkpointer as a snapshot.
//
// For each checkpoint the snapshot height is the checkpoint root version, the
// format is the checkpoint version, the hash is the checkpoint's encoded hash
// and the metadata is the CBOR-serialized checkpoint descriptor.
//
// If the checkpointer query fails, the error is logged and an empty response
// is returned.
func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots {
	var rsp types.ResponseListSnapshots

	// Only version 1 checkpoints are queried.
	query := &checkpoint.GetCheckpointsRequest{
		Version: 1,
	}
	available, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, query)
	if err != nil {
		mux.logger.Error("failed to get checkpoints",
			"err", err,
		)
		return rsp
	}

	for i := range available {
		meta := available[i]
		digest := meta.EncodedHash()

		snapshot := &types.Snapshot{
			Height:   meta.Root.Version,
			Format:   uint32(meta.Version),
			Chunks:   uint32(len(meta.Chunks)),
			Hash:     digest[:],
			Metadata: cbor.Marshal(meta),
		}
		rsp.Snapshots = append(rsp.Snapshots, snapshot)
	}

	return rsp
}

// OfferSnapshot handles the ABCI OfferSnapshot request. It validates the
// offered snapshot and, when every check passes, starts a checkpoint restore.
//
// Checks are performed in this order:
//   - the snapshot must be present (otherwise REJECT),
//   - its format must be 1 (otherwise REJECT_FORMAT),
//   - the advertised hash must decode and equal the hash of the metadata bytes
//     (otherwise REJECT),
//   - the metadata must decode as a checkpoint descriptor (otherwise REJECT),
//   - the advertised chunk count must equal the descriptor's chunk count
//     (otherwise REJECT),
//   - the request's app hash must decode (otherwise ABORT) and equal the
//     descriptor's root hash (otherwise REJECT).
//
// A failure to start the restore results in ABORT; success results in ACCEPT.
func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot {
	var (
		rejected = types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT}
		aborted  = types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT}
	)

	snapshot := req.Snapshot
	switch {
	case snapshot == nil:
		return rejected
	case snapshot.Format != 1:
		mux.logger.Warn("received snapshot with unsupported version",
			"version", snapshot.Format,
		)
		return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT}
	}

	// Hash the raw metadata and compare it against the hash the peer advertised.
	var computed, advertised hash.Hash
	computed.FromBytes(snapshot.Metadata)
	if err := advertised.UnmarshalBinary(snapshot.Hash); err != nil {
		mux.logger.Warn("received snapshot with malformed hash",
			"err", err,
		)
		return rejected
	}
	if !computed.Equal(&advertised) {
		mux.logger.Warn("received snapshot with mismatching hash",
			"expected_hash", advertised,
			"hash", computed,
		)
		return rejected
	}

	// Deserialize the checkpoint descriptor carried in the metadata.
	var meta checkpoint.Metadata
	if err := cbor.Unmarshal(snapshot.Metadata, &meta); err != nil {
		mux.logger.Warn("received snapshot with malformed metadata",
			"err", err,
		)
		return rejected
	}

	// The advertised chunk count has to agree with the descriptor.
	if expected := len(meta.Chunks); int(snapshot.Chunks) != expected {
		mux.logger.Warn("received snapshot with mismatching number of chunks",
			"expected_chunks", expected,
			"chunks", snapshot.Chunks,
		)
		return rejected
	}

	// The descriptor's root has to be the app hash supplied in the request.
	var trustedRoot hash.Hash
	if err := trustedRoot.UnmarshalBinary(req.AppHash); err != nil {
		// NOTE: This should never happen as it indicates a problem with Tendermint.
		mux.logger.Error("received request with malformed hash",
			"err", err,
		)
		return aborted
	}
	if !meta.Root.Hash.Equal(&trustedRoot) {
		mux.logger.Warn("received snapshot with mismatching root hash",
			"expected_root", trustedRoot,
			"root", meta.Root.Hash,
		)
		return rejected
	}

	// All checks passed (the snapshot is for the expected root), so begin restoring.
	if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &meta); err != nil {
		mux.logger.Error("failed to start restore",
			"err", err,
		)
		return aborted
	}

	mux.logger.Info("started state restore process",
		"root", meta.Root,
	)

	return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT}
}

// LoadSnapshotChunk handles the ABCI LoadSnapshotChunk request by returning
// the bytes of the requested chunk of a locally stored checkpoint.
//
// The checkpoint is looked up by format (checkpoint version) and height (root
// version); exactly one checkpoint must match. Any failure is logged and
// results in an empty response.
func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk {
	// The ABCI snapshot format is 32 bits wide while the checkpoint version used
	// for the lookup is 16 bits wide. Refuse formats that do not survive the
	// narrowing conversion instead of silently serving a different format.
	format := uint16(req.Format)
	if uint32(format) != req.Format {
		mux.logger.Warn("received chunk request with unsupported snapshot format",
			"format", req.Format,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Fetch the metadata for the specified checkpoint.
	cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{
		Version:     format,
		RootVersion: &req.Height,
	})
	if err != nil {
		mux.logger.Error("failed to get checkpoints",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}
	// Anything other than a single match (none, or an ambiguous result) is
	// treated as a failure.
	if len(cps) != 1 {
		mux.logger.Error("failed to get checkpoints",
			"cps", len(cps),
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	// Resolve the chunk descriptor, then read the chunk contents into memory.
	chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk))
	if err != nil {
		mux.logger.Error("failed to get chunk metadata",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}
	var buf bytes.Buffer
	if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil {
		mux.logger.Error("failed to get chunk",
			"err", err,
		)
		return types.ResponseLoadSnapshotChunk{}
	}

	return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()}
}

// ApplySnapshotChunk handles the ABCI ApplySnapshotChunk request by feeding the
// received chunk into the checkpoint restore that is currently in progress.
//
// Results:
//   - ACCEPT when the chunk was restored or had already been restored,
//   - RETRY (with the chunk index queued for refetch) when the chunk is corrupted,
//   - REJECT_SNAPSHOT when the chunk's proof does not verify,
//   - ABORT when no restore is in progress, on any other restore error, or when
//     finalizing/applying the restored root fails.
//
// Once the restorer reports that all chunks are done, the restored root is
// finalized in the node database and applied as the new application state.
func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk {
	cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint()
	// NOTE(review): this guard assumes GetCurrentCheckpoint returns a nil pointer
	// when no restore is in progress. Without it, the cp.Root accesses below
	// would panic before RestoreChunk gets the chance to report
	// ErrNoRestoreInProgress.
	if cp == nil {
		mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	}

	mux.logger.Debug("attempting to restore a chunk",
		"root", cp.Root,
		"index", req.Index,
	)

	buf := bytes.NewBuffer(req.Chunk)
	done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf)
	switch {
	case err == nil:
		// Chunk restored; fall through to the completion check below.
	case errors.Is(err, checkpoint.ErrNoRestoreInProgress):
		// This should never happen.
		mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync")
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	case errors.Is(err, checkpoint.ErrChunkAlreadyRestored):
		// Duplicate delivery of a chunk is not an error.
		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
	case errors.Is(err, checkpoint.ErrChunkCorrupted):
		// Corrupted chunk, refetch.
		mux.logger.Warn("received corrupted chunk",
			"sender", req.Sender,
			"index", req.Index,
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{
			RefetchChunks: []uint32{req.Index},
			// TODO: Consider banning the sender.
			Result: types.ResponseApplySnapshotChunk_RETRY,
		}
	case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed):
		// Chunk was as specified in the manifest but did not match the reported root. In this case
		// we need to abort processing the given snapshot.
		mux.logger.Warn("chunk contains invalid proof, snapshot is bad",
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT}
	default:
		// Unspecified error during restoration.
		mux.logger.Error("error during chunk restoration, aborting state sync",
			"err", err,
		)

		return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
	}

	// Check if we are done with the restoration. In this case, finalize the root.
	if done {
		err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash})
		if err != nil {
			mux.logger.Error("failed to finalize restored root",
				"err", err,
			)
			return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
		}

		// Point the application state at the freshly restored root.
		if err = mux.state.doApplyStateSync(cp.Root); err != nil {
			mux.logger.Error("failed to apply state sync root",
				"err", err,
			)
			return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
		}

		mux.logger.Info("successfully synced state",
			"root", cp.Root,
		)
	}

	return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
}

func (mux *abciMux) doCleanup() {
mux.state.doCleanup()

Expand Down
18 changes: 18 additions & 0 deletions go/consensus/tendermint/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,24 @@ func (s *applicationState) doInitChain(now time.Time) error {
return s.doCommitOrInitChainLocked(now)
}

func (s *applicationState) doApplyStateSync(root storage.Root) error {
s.blockLock.Lock()
defer s.blockLock.Unlock()

s.stateRoot = root

s.deliverTxTree.Close()
s.deliverTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())
s.checkTxTree.Close()
s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())

if err := s.doCommitOrInitChainLocked(time.Time{}); err != nil {
Yawning marked this conversation as resolved.
Show resolved Hide resolved
return err
}

return nil
}

func (s *applicationState) doCommit(now time.Time) (uint64, error) {
s.blockLock.Lock()
defer s.blockLock.Unlock()
Expand Down
Loading