go/worker/storage: Force checkpoint sync when block info unavailable #4080

Merged
7 commits merged on Jun 25, 2021
1 change: 1 addition & 0 deletions .changelog/4080.bugfix.md
@@ -0,0 +1 @@
go/worker/storage: Force checkpoint sync when block info unavailable
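A minimal, self-contained sketch of the fallback this bugfix describes; all names below are illustrative assumptions, not the actual go/worker/storage code. The idea: when the block info needed for incremental round-by-round sync cannot be fetched, force a checkpoint sync instead.

package main

import (
	"context"
	"errors"
	"fmt"
)

type syncStrategy int

const (
	syncIncremental syncStrategy = iota
	syncForceCheckpoint
)

var errBlockInfoUnavailable = errors.New("block info unavailable")

// getBlockInfo stands in for fetching the block metadata needed for
// incremental sync; it fails for rounds the node has no history for.
func getBlockInfo(_ context.Context, round uint64) error {
	if round < 100 { // pretend history before round 100 is unavailable
		return errBlockInfoUnavailable
	}
	return nil
}

func decideSyncStrategy(ctx context.Context, round uint64) syncStrategy {
	if err := getBlockInfo(ctx, round); err != nil {
		// Block info is unavailable, so force checkpoint sync rather than
		// failing the incremental sync.
		return syncForceCheckpoint
	}
	return syncIncremental
}

func main() {
	fmt.Println(decideSyncStrategy(context.Background(), 42))  // 1 (force checkpoint)
	fmt.Println(decideSyncStrategy(context.Background(), 500)) // 0 (incremental)
}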
1 change: 1 addition & 0 deletions .changelog/4080.feature.md
@@ -0,0 +1 @@
go/worker/storage: Synchronize checkpoints with consensus layer
1 change: 1 addition & 0 deletions .changelog/4080.internal.1.md
@@ -0,0 +1 @@
go/consensus: Expose consensus state checkpointer
1 change: 1 addition & 0 deletions .changelog/4080.internal.2.md
@@ -0,0 +1 @@
go/storage/mkvs/checkpoint: Add ForceCheckpoint and WatchCheckpoints
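A rough usage sketch for the two new entry points named here. The interface below is a local stand-in with assumed signatures; the real ones live in go/storage/mkvs/checkpoint and may differ.

package sketch

import "context"

// checkpointer is a hypothetical stand-in for checkpoint.Checkpointer; the
// method signatures are assumptions based only on the method names.
type checkpointer interface {
	// ForceCheckpoint asks for a checkpoint of the given version to be
	// created immediately instead of waiting for the periodic interval.
	ForceCheckpoint(ctx context.Context, version uint64) error
	// WatchCheckpoints subscribes to newly created checkpoint versions;
	// the returned func cancels the subscription.
	WatchCheckpoints() (<-chan uint64, func())
}

// waitForCheckpoint forces a checkpoint at version v and blocks until the
// watcher reports that it (or a later one) has been created.
func waitForCheckpoint(ctx context.Context, cp checkpointer, v uint64) error {
	ch, cancel := cp.WatchCheckpoints()
	defer cancel()

	if err := cp.ForceCheckpoint(ctx, v); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case version := <-ch:
			if version >= v {
				return nil
			}
		}
	}
}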
5 changes: 5 additions & 0 deletions .changelog/4080.internal.3.md
@@ -0,0 +1,5 @@
go/runtime: Delay subscriptions until after consensus sync

Previously if the node used consensus state sync it would fail to receive any
updates for the various descriptors until the descriptors were updated after
the state sync checkpoint.
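A compact sketch of the ordering this entry describes; the parameters below (syncedCh, getLatest, updates) are assumed stand-ins for the real consensus and registry plumbing: wait for consensus sync, query the current descriptor directly, then rely on the subscription for later updates.

package sketch

import "context"

// descriptor stands in for a registry runtime/entity/node descriptor.
type descriptor struct{ Version uint64 }

// watchDescriptor illustrates "subscribe only after consensus sync": without
// the initial wait, a node that used consensus state sync would see no
// updates until the descriptor happened to change again on-chain.
func watchDescriptor(
	ctx context.Context,
	syncedCh <-chan struct{}, // closed once consensus (state) sync is done
	getLatest func(context.Context) (*descriptor, error), // direct state query
	updates <-chan *descriptor, // subscription for subsequent updates
	apply func(*descriptor),
) error {
	// 1. Wait for the consensus layer to be fully synced.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-syncedCh:
	}

	// 2. Fetch the current descriptor from the checkpoint-restored state.
	d, err := getLatest(ctx)
	if err != nil {
		return err
	}
	apply(d)

	// 3. Only now process incremental updates from the subscription.
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case d := <-updates:
			apply(d)
		}
	}
}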
6 changes: 6 additions & 0 deletions go/consensus/api/api.go
@@ -24,6 +24,7 @@ import (
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
mkvsNode "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
)

@@ -228,6 +229,11 @@ type Backend interface {

// GetAddresses returns the consensus backend addresses.
GetAddresses() ([]node.ConsensusAddress, error)

// Checkpointer returns the checkpointer associated with consensus state.
//
// This may be nil in case checkpoints are disabled.
Checkpointer() checkpoint.Checkpointer
}

// HaltHook is a function that gets called when consensus needs to halt for some reason.
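Since the comment above documents that Checkpointer() may return nil when checkpoints are disabled, callers of the post-merge API would guard for that; a minimal sketch against the interface added here (the helper name is ours):

package sketch

import (
	"fmt"

	consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
	"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
)

// consensusCheckpointer turns the documented "nil when checkpoints are
// disabled" case into an explicit error.
func consensusCheckpointer(backend consensus.Backend) (checkpoint.Checkpointer, error) {
	cp := backend.Checkpointer()
	if cp == nil {
		return nil, fmt.Errorf("consensus state checkpoints are disabled")
	}
	return cp, nil
}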
4 changes: 4 additions & 0 deletions go/consensus/tendermint/abci/state.go
@@ -120,6 +120,10 @@ func (s *applicationState) Storage() storage.LocalBackend {
return s.storage
}

func (s *applicationState) Checkpointer() checkpoint.Checkpointer {
return s.checkpointer
}

func (s *applicationState) InitialHeight() int64 {
return int64(s.initialHeight)
}
10 changes: 10 additions & 0 deletions go/consensus/tendermint/api/state.go
@@ -17,6 +17,7 @@ import (
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
)

@@ -77,6 +78,11 @@ type ApplicationQueryState interface {
// Storage returns the storage backend.
Storage() storage.LocalBackend

// Checkpointer returns the checkpointer associated with the application state.
//
// This may be nil in case checkpoints are disabled.
Checkpointer() checkpoint.Checkpointer

// BlockHeight returns the last committed block height.
BlockHeight() int64

@@ -125,6 +131,10 @@ func (ms *mockApplicationState) Storage() storage.LocalBackend {
panic("not implemented")
}

func (ms *mockApplicationState) Checkpointer() checkpoint.Checkpointer {
return nil
}

func (ms *mockApplicationState) InitialHeight() int64 {
return ms.cfg.Genesis.Height
}
5 changes: 5 additions & 0 deletions go/consensus/tendermint/full/full.go
@@ -72,6 +72,7 @@ import (
roothashAPI "github.com/oasisprotocol/oasis-core/go/roothash/api"
schedulerAPI "github.com/oasisprotocol/oasis-core/go/scheduler/api"
stakingAPI "github.com/oasisprotocol/oasis-core/go/staking/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
upgradeAPI "github.com/oasisprotocol/oasis-core/go/upgrade/api"
)

@@ -318,6 +319,10 @@ func (t *fullService) GetAddresses() ([]node.ConsensusAddress, error) {
return []node.ConsensusAddress{addr}, nil
}

func (t *fullService) Checkpointer() checkpoint.Checkpointer {
return t.mux.State().Checkpointer()
}

func (t *fullService) StateToGenesis(ctx context.Context, blockHeight int64) (*genesisAPI.Document, error) {
blk, err := t.GetTendermintBlock(ctx, blockHeight)
if err != nil {
6 changes: 6 additions & 0 deletions go/consensus/tendermint/seed/seed.go
@@ -36,6 +36,7 @@ import (
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer"
)

@@ -173,6 +174,11 @@ func (srv *seedService) GetAddresses() ([]node.ConsensusAddress, error) {
return []node.ConsensusAddress{addr}, nil
}

// Implements Backend.
func (srv *seedService) Checkpointer() checkpoint.Checkpointer {
return nil
}

// Implements Backend.
func (srv *seedService) SubmitEvidence(ctx context.Context, evidence *consensus.Evidence) error {
return consensus.ErrUnsupported
15 changes: 6 additions & 9 deletions go/keymanager/client/client.go
@@ -37,8 +37,7 @@ var ErrKeyManagerNotAvailable = errors.New("keymanager/client: key manager not available")
type Client struct {
runtime runtimeRegistry.Runtime

backend api.Backend
registry registry.Backend
consensus consensus.Backend

ctx context.Context
initCh chan struct{}
@@ -116,7 +115,7 @@ func (c *Client) CallRemote(ctx context.Context, data []byte) ([]byte, error) {
}

func (c *Client) worker() {
stCh, stSub := c.backend.WatchStatuses()
stCh, stSub := c.consensus.KeyManager().WatchStatuses()
defer stSub.Close()

rtCh, rtSub, err := c.runtime.WatchRegistryDescriptor()
@@ -155,7 +154,7 @@ func (c *Client) worker() {
}

// Fetch current key manager status.
st, err := c.backend.GetStatus(c.ctx, &registry.NamespaceQuery{
st, err := c.consensus.KeyManager().GetStatus(c.ctx, &registry.NamespaceQuery{
ID: *kmID,
Height: consensus.HeightLatest,
})
@@ -208,11 +207,10 @@ func (c *Client) updateState(status *api.Status) {
func New(
ctx context.Context,
runtime runtimeRegistry.Runtime,
backend api.Backend,
registry registry.Backend,
consensus consensus.Backend,
identity *identity.Identity,
) (*Client, error) {
committeeNodes, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, registry)
committeeNodes, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, consensus)
if err != nil {
return nil, fmt.Errorf("keymanager/client: failed to create node descriptor watcher: %w", err)
}
@@ -228,8 +226,7 @@

c := &Client{
runtime: runtime,
backend: backend,
registry: registry,
consensus: consensus,
ctx: ctx,
initCh: make(chan struct{}),
committeeWatcher: committeeNodes,
5 changes: 5 additions & 0 deletions go/oasis-node/cmd/debug/dumpdb/dumpdb.go
@@ -39,6 +39,7 @@ import (
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
storageDB "github.com/oasisprotocol/oasis-core/go/storage/database"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
)

const (
@@ -385,6 +386,10 @@ func (qs *dumpQueryState) Storage() storage.LocalBackend {
return qs.ldb
}

func (qs *dumpQueryState) Checkpointer() checkpoint.Checkpointer {
return nil
}

func (qs *dumpQueryState) BlockHeight() int64 {
return qs.height
}
2 changes: 1 addition & 1 deletion go/oasis-node/node_test.go
@@ -519,7 +519,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) {
require.NoError(t, err, "GetRuntime")
localBackend := rt.Storage().(storageAPI.LocalBackend)

client, err := storageClient.NewStatic(ctx, node.Identity, node.Consensus.Registry(), node.Identity.NodeSigner.Public())
client, err := storageClient.NewStatic(ctx, node.Identity, node.Consensus, node.Identity.NodeSigner.Public())
require.NoError(t, err, "NewStatic")

// Determine the current round. This is required so that we can commit into
89 changes: 82 additions & 7 deletions go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
@@ -2,10 +2,12 @@ package runtime

import (
"context"
"encoding/hex"
"fmt"
"strings"
"time"

control "github.com/oasisprotocol/oasis-core/go/control/api"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
@@ -45,6 +47,11 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) {
// new node registers.
f.Network.SetMockEpoch()

// Enable consensus layer checkpoints.
f.Network.Consensus.Parameters.StateCheckpointInterval = 10
f.Network.Consensus.Parameters.StateCheckpointNumKept = 2
f.Network.Consensus.Parameters.StateCheckpointChunkSize = 1024 * 1024

// Make the first storage worker check for checkpoints more often.
f.StorageWorkers[0].CheckpointCheckInterval = 1 * time.Second
// Configure runtime to allow a smaller replication factor as otherwise execution may fail when
@@ -69,11 +76,22 @@
CheckpointSyncEnabled: true,
LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
})
// And one more storage worker that will sync the consensus layer via state sync.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
NoAutoStart: true,
CheckpointSyncEnabled: true,
LogWatcherHandlerFactories: []log.WatcherHandlerFactory{
oasis.LogAssertCheckpointSync(),
oasis.LogEventABCIStateSyncComplete(),
},
})

return f, nil
}

func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
func (sc *storageSyncImpl) Run(childEnv *env.Env) error { //nolint: gocyclo
clientErrCh, cmd, err := sc.runtimeImpl.start(childEnv)
if err != nil {
return err
@@ -196,15 +214,72 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
}
}

// Now spin up the last storage worker and check if it syncs with a checkpoint.
sc.Logger.Info("running first late storage worker")

// Now spin up the first late storage worker and check if it syncs with a checkpoint.
lateWorker := sc.Net.StorageWorkers()[3]
err = lateWorker.Start()
if err != nil {
return fmt.Errorf("can't start last storage worker: %w", err)
if err = lateWorker.Start(); err != nil {
return fmt.Errorf("can't start first late storage worker: %w", err)
}
if err := lateWorker.WaitReady(ctx); err != nil {
return fmt.Errorf("error waiting for late storage worker to become ready: %w", err)
if err = lateWorker.WaitReady(ctx); err != nil {
return fmt.Errorf("error waiting for first late storage worker to become ready: %w", err)
}

sc.Logger.Info("running second late storage worker")

// Get the TLS public key from the validators.
var (
consensusNodes []string
trustHeight uint64
trustHash string
)
for _, v := range sc.Net.Validators() {
var ctrl *oasis.Controller
ctrl, err = oasis.NewController(v.SocketPath())
if err != nil {
return fmt.Errorf("failed to create controller for validator %s: %w", v.Name, err)
}

var status *control.Status
status, err = ctrl.GetStatus(ctx)
if err != nil {
return fmt.Errorf("failed to get status for validator %s: %w", v.Name, err)
}

if status.Registration.Descriptor == nil {
return fmt.Errorf("validator %s has not registered", v.Name)
}
if len(status.Registration.Descriptor.TLS.Addresses) == 0 {
return fmt.Errorf("validator %s has no TLS addresses", v.Name)
}

var rawAddress []byte
tlsAddress := status.Registration.Descriptor.TLS.Addresses[0]
rawAddress, err = tlsAddress.MarshalText()
if err != nil {
return fmt.Errorf("failed to marshal TLS address: %w", err)
}
consensusNodes = append(consensusNodes, string(rawAddress))

trustHeight = uint64(status.Consensus.LatestHeight)
trustHash = hex.EncodeToString(status.Consensus.LatestHash)
}

// Configure state sync for the last storage node.
lateWorker = sc.Net.StorageWorkers()[4]
lateWorker.SetConsensusStateSync(&oasis.ConsensusStateSyncCfg{
ConsensusNodes: consensusNodes,
TrustHeight: trustHeight,
TrustHash: trustHash,
})

if err = lateWorker.Start(); err != nil {
return fmt.Errorf("can't start second late storage worker: %w", err)
}
if err = lateWorker.WaitReady(ctx); err != nil {
return fmt.Errorf("error waiting for second late storage worker to become ready: %w", err)
}

// Wait a bit to give the logger in the node time to sync; the message has already been
// logged by this point, it just might not be on disk yet.
<-time.After(1 * time.Second)
2 changes: 1 addition & 1 deletion go/runtime/client/client.go
@@ -550,7 +550,7 @@ func (c *runtimeClient) CallEnclave(ctx context.Context, request *enclaverpc.Cal
if km = c.kmClients[rt.ID()]; km == nil {
c.logger.Debug("creating new key manager client instance")

km, err = keymanager.New(c.common.ctx, rt, c.common.consensus.KeyManager(), c.common.consensus.Registry(), nil)
km, err = keymanager.New(c.common.ctx, rt, c.common.consensus, nil)
if err != nil {
c.Unlock()
c.logger.Error("failed to create key manager client instance",