Skip to content

Commit

Permalink
server: Move setting storage version to schema
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jul 12, 2021
1 parent 5b6f457 commit f6534f1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
8 changes: 4 additions & 4 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ type EtcdServer struct {

*AccessController

// Ensure that storage version is updated only once.
storageVersionUpdated sync.Once
// Ensure that storage schema is updated only once.
updateStorageSchema sync.Once
}

type backendHooks struct {
Expand Down Expand Up @@ -2136,8 +2136,8 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
s.storageVersionUpdated.Do(func() {
err := serverversion.UpdateStorageVersion(s.lg, s.be.BatchTx())
s.updateStorageSchema.Do(func() {
err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx())
if err != nil {
s.lg.Warn("failed to update storage version", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package version
package schema

import (
"fmt"
Expand All @@ -21,16 +21,15 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)

var (
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
)

// UpdateStorageVersion updates storage version.
func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
// UpdateStorageSchema updates storage version.
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
v, err := detectStorageVersion(lg, tx)
Expand All @@ -41,7 +40,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
schema.UnsafeSetStorageVersion(tx, &V3_6)
UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
Expand All @@ -50,17 +49,17 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
}

func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
v := schema.UnsafeReadStorageVersion(tx)
v := UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
}
confstate := schema.UnsafeConfStateFromBackend(lg, tx)
confstate := UnsafeConfStateFromBackend(lg, tx)
if confstate == nil {
return nil, fmt.Errorf("missing %q key", schema.MetaConfStateName)
return nil, fmt.Errorf("missing %q key", MetaConfStateName)
}
_, term := schema.UnsafeReadConsistentIndex(tx)
_, term := UnsafeReadConsistentIndex(tx)
if term == 0 {
return nil, fmt.Errorf("missing %q key", schema.MetaTermKeyName)
return nil, fmt.Errorf("missing %q key", MetaTermKeyName)
}
copied := V3_5
return &copied, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package version
package schema

import (
"testing"
Expand All @@ -24,7 +24,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)

Expand All @@ -47,33 +46,33 @@ func TestUpdateStorageVersion(t *testing.T) {
{
name: `Backend before 3.6 without "term" should be rejected`,
version: "",
metaKeys: [][]byte{schema.MetaConfStateName},
metaKeys: [][]byte{MetaConfStateName},
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
},
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName},
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName},
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName},
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName, []byte("future-key")},
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
},
}
Expand All @@ -86,34 +85,34 @@ func TestUpdateStorageVersion(t *testing.T) {
t.Fatal("batch tx is nil")
}
tx.Lock()
schema.UnsafeCreateMetaBucket(tx)
UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
switch string(k) {
case string(schema.MetaConfStateName):
schema.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(schema.MetaTermKeyName):
schema.UnsafeUpdateConsistentIndex(tx, 1, 1, false)
case string(MetaConfStateName):
MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(MetaTermKeyName):
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
default:
tx.UnsafePut(schema.Meta, k, []byte{})
tx.UnsafePut(Meta, k, []byte{})
}
}
if tc.version != "" {
schema.UnsafeSetStorageVersion(tx, semver.New(tc.version))
UnsafeSetStorageVersion(tx, semver.New(tc.version))
}
tx.Unlock()
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
err := UpdateStorageVersion(lg, b.BatchTx())
err := UpdateStorageSchema(lg, b.BatchTx())
if (err != nil) != tc.expectError {
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
}
if err != nil && err.Error() != tc.expectedErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
}
v := schema.UnsafeReadStorageVersion(b.BatchTx())
v := UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v)
})
}
Expand Down

0 comments on commit f6534f1

Please sign in to comment.