Skip to content

Commit

Permalink
Merge pull request #13200 from serathius/downgrade-storage
Browse files Browse the repository at this point in the history
Implement schema migration and panic when trying to downgrade storage
  • Loading branch information
ptabor authored Sep 10, 2021
2 parents 58fb625 + 79f6faa commit c2937d7
Show file tree
Hide file tree
Showing 20 changed files with 1,361 additions and 169 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/

See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

### Breaking Changes

- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command.

### etcdctl v3

- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).
Expand Down
27 changes: 14 additions & 13 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,29 +103,30 @@ func migrateCommandFunc(c *migrateConfig) error {
defer c.be.Close()
lg := GetLogger()
tx := c.be.BatchTx()
tx.Lock()
current, err := schema.DetectSchemaVersion(lg, tx)
if err != nil {
tx.Unlock()
lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
return err
}
if *current == *c.targetVersion {
tx.Unlock()
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(current)))
if current == *c.targetVersion {
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}
if c.force {
unsafeMigrateForce(lg, tx, c.targetVersion)
tx.Unlock()
c.be.ForceCommit()
return nil
err = schema.Migrate(lg, tx, *c.targetVersion)
if err != nil {
if !c.force {
return err
}
lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(lg, tx, c.targetVersion)
}
tx.Unlock()
return fmt.Errorf("storage version migration is not yet supported")
c.be.ForceCommit()
return nil
}

func unsafeMigrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
tx.Lock()
defer tx.Unlock()
// Storage version is only supported since v3.6
if target.LessThan(schema.V3_6) {
schema.UnsafeClearStorageVersion(tx)
Expand Down
52 changes: 52 additions & 0 deletions pkg/notify/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notify

import (
"sync"
)

// Notifier is a thread safe struct that can be used to send notification about
// some event to multiple consumers.
type Notifier struct {
mu sync.RWMutex
channel chan struct{}
}

// NewNotifier returns new notifier
func NewNotifier() *Notifier {
return &Notifier{
channel: make(chan struct{}),
}
}

// Receive returns channel that can be used to wait for notification.
// Consumers will be informed by closing the channel.
func (n *Notifier) Receive() <-chan struct{} {
n.mu.RLock()
defer n.mu.RUnlock()
return n.channel
}

// Notify closes the channel passed to consumers and creates new channel to used
// for next notification.
func (n *Notifier) Notify() {
newChannel := make(chan struct{})
n.mu.Lock()
channelToClose := n.channel
n.channel = newChannel
n.mu.Unlock()
close(channelToClose)
}
43 changes: 43 additions & 0 deletions server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
Expand All @@ -28,6 +30,14 @@ import (
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
type serverVersionAdapter struct {
*EtcdServer
tx backend.BatchTx
}

func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter {
return &serverVersionAdapter{
EtcdServer: s,
tx: nil,
}
}

var _ serverversion.Server = (*serverVersionAdapter)(nil)
Expand Down Expand Up @@ -56,3 +66,36 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
}

func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx)
if err != nil {
return nil
}
return &v
}

func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
err := schema.UnsafeMigrate(s.lg, s.tx, target)
if err != nil {
s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
}
}

func (s *serverVersionAdapter) Lock() {
s.tx = s.be.BatchTx()
s.tx.Lock()
}

func (s *serverVersionAdapter) Unlock() {
s.tx.Unlock()
s.tx = nil
}
11 changes: 10 additions & 1 deletion server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/pkg/v3/notify"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
Expand Down Expand Up @@ -57,7 +58,8 @@ type RaftCluster struct {
// removed id cannot be reused.
removed map[types.ID]bool

downgradeInfo *DowngradeInfo
downgradeInfo *DowngradeInfo
versionChanged *notify.Notifier
}

// ConfigChangeContext represents a context for confChange.
Expand Down Expand Up @@ -247,6 +249,10 @@ func (c *RaftCluster) SetBackend(be MembershipBackend) {
c.be.MustCreateBackendBuckets()
}

func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
c.versionChanged = n
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -545,6 +551,9 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
}
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1)
if c.versionChanged != nil {
c.versionChanged.Notify()
}
onSet(c.lg, ver)
}

Expand Down
17 changes: 15 additions & 2 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,29 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
ci = cindex.NewConsistentIndex(nil)
beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
be = serverstorage.OpenBackend(cfg, beHooks)
defer func() {
if err != nil && be != nil {
be.Close()
}
}()
ci.SetBackend(be)
schema.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
err := maybeDefragBackend(cfg, be)
err = maybeDefragBackend(cfg, be)
if err != nil {
be.Close()
return nil, nil, false, nil, err
}
}
cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))

// TODO(serathius): Implement schema setup in fresh storage
if beExist {
err = schema.Validate(cfg.Logger, be.BatchTx())
if err != nil {
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
return nil, nil, false, nil, err
}
}
return be, ci, beExist, beHooks, nil
}

Expand Down
Loading

0 comments on commit c2937d7

Please sign in to comment.