From 44b8ae145b505811775f5af915dd19198d556d55 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 10:23:04 +0200 Subject: [PATCH 1/3] etcdserver: Move datadir and wal to storage package --- CHANGELOG-3.6.md | 2 ++ contrib/raftexample/raft.go | 4 ++-- etcdutl/etcdutl/backup_command.go | 6 +++--- etcdutl/etcdutl/defrag_command.go | 2 +- etcdutl/etcdutl/snapshot_command.go | 2 +- etcdutl/snapshot/v3_snapshot.go | 4 ++-- server/config/config.go | 2 +- server/embed/util.go | 2 +- server/etcdserver/api/snap/snapshotter.go | 2 +- server/etcdserver/api/snap/snapshotter_test.go | 2 +- server/etcdserver/bootstrap.go | 4 ++-- server/etcdserver/storage.go | 4 ++-- server/{ => storage}/datadir/datadir.go | 0 server/{ => storage}/datadir/datadir_test.go | 2 +- server/{ => storage}/datadir/doc.go | 0 server/{ => storage}/wal/decoder.go | 2 +- server/{ => storage}/wal/doc.go | 0 server/{ => storage}/wal/encoder.go | 2 +- server/{ => storage}/wal/file_pipeline.go | 0 server/{ => storage}/wal/file_pipeline_test.go | 0 server/{ => storage}/wal/metrics.go | 0 server/{ => storage}/wal/record_test.go | 2 +- server/{ => storage}/wal/repair.go | 2 +- server/{ => storage}/wal/repair_test.go | 2 +- server/{ => storage}/wal/util.go | 0 server/{ => storage}/wal/wal.go | 2 +- server/{ => storage}/wal/wal_bench_test.go | 0 server/{ => storage}/wal/wal_test.go | 2 +- server/{ => storage}/wal/walpb/record.go | 0 server/{ => storage}/wal/walpb/record.pb.go | 0 server/{ => storage}/wal/walpb/record.proto | 0 server/{ => storage}/wal/walpb/record_test.go | 0 server/verify/verify.go | 6 +++--- tests/e2e/etcd_corrupt_test.go | 2 +- tools/etcd-dump-logs/etcd-dump-log_test.go | 2 +- tools/etcd-dump-logs/main.go | 4 ++-- 36 files changed, 34 insertions(+), 32 deletions(-) rename server/{ => storage}/datadir/datadir.go (100%) rename server/{ => storage}/datadir/datadir_test.go (94%) rename server/{ => storage}/datadir/doc.go (100%) rename server/{ => storage}/wal/decoder.go (99%) rename server/{ => storage}/wal/doc.go (100%) rename server/{ => storage}/wal/encoder.go (98%) rename server/{ => storage}/wal/file_pipeline.go (100%) rename server/{ => storage}/wal/file_pipeline_test.go (100%) rename server/{ => storage}/wal/metrics.go (100%) rename server/{ => storage}/wal/record_test.go (98%) rename server/{ => storage}/wal/repair.go (98%) rename server/{ => storage}/wal/repair_test.go (99%) rename server/{ => storage}/wal/util.go (100%) rename server/{ => storage}/wal/wal.go (99%) rename server/{ => storage}/wal/wal_bench_test.go (100%) rename server/{ => storage}/wal/wal_test.go (99%) rename server/{ => storage}/wal/walpb/record.go (100%) rename server/{ => storage}/wal/walpb/record.pb.go (100%) rename server/{ => storage}/wal/walpb/record.proto (100%) rename server/{ => storage}/wal/walpb/record_test.go (100%) diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md index 36d0f7fa69b..a0e98aa91df 100644 --- a/CHANGELOG-3.6.md +++ b/CHANGELOG-3.6.md @@ -21,3 +21,5 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Package `mvcc` was moved to `storage/mvcc` - Package `mvcc/backend` was moved to `storage/backend` - Package `mvcc/buckets` was moved to `storage/schema` +- Package `wal` was moved to `storage/wal` +- Package `datadir` was moved to `storage/datadir` diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index b2bcb25626a..439a08375cd 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -31,8 +31,8 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats" - "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index bf8ede334ed..54fa68e1de0 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -27,15 +27,15 @@ import ( "go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/datadir" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/schema" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.etcd.io/etcd/server/v3/verify" - "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" bolt "go.etcd.io/bbolt" "go.uber.org/zap" diff --git a/etcdutl/etcdutl/defrag_command.go b/etcdutl/etcdutl/defrag_command.go index 67880c0c269..fe4f8430d50 100644 --- a/etcdutl/etcdutl/defrag_command.go +++ b/etcdutl/etcdutl/defrag_command.go @@ -21,8 +21,8 @@ import ( "github.com/spf13/cobra" "go.etcd.io/etcd/pkg/v3/cobrautl" - "go.etcd.io/etcd/server/v3/datadir" "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/datadir" ) var ( diff --git a/etcdutl/etcdutl/snapshot_command.go b/etcdutl/etcdutl/snapshot_command.go index 0ff6183399e..1b3f5ef6b9f 100644 --- a/etcdutl/etcdutl/snapshot_command.go +++ b/etcdutl/etcdutl/snapshot_command.go @@ -20,7 +20,7 @@ import ( "go.etcd.io/etcd/etcdutl/v3/snapshot" "go.etcd.io/etcd/pkg/v3/cobrautl" - "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/storage/datadir" "github.com/spf13/cobra" ) diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 36fc9ee8cdc..f6445498c1f 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -42,9 +42,9 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.etcd.io/etcd/server/v3/verify" - "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" ) diff --git a/server/config/config.go b/server/config/config.go index 9a8a6881f7d..c9e7d3aa3f0 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -25,7 +25,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/netutil" - "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/storage/datadir" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" bolt "go.etcd.io/bbolt" diff --git a/server/embed/util.go b/server/embed/util.go index ad461534551..269fbc80b20 100644 --- a/server/embed/util.go +++ b/server/embed/util.go @@ -17,7 +17,7 @@ package embed import ( "path/filepath" - "go.etcd.io/etcd/server/v3/wal" + "go.etcd.io/etcd/server/v3/storage/wal" ) func isMemberInitialized(cfg *Config) bool { diff --git a/server/etcdserver/api/snap/snapshotter.go b/server/etcdserver/api/snap/snapshotter.go index 52cc0ae2670..70e946d5675 100644 --- a/server/etcdserver/api/snap/snapshotter.go +++ b/server/etcdserver/api/snap/snapshotter.go @@ -31,7 +31,7 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/server/etcdserver/api/snap/snapshotter_test.go b/server/etcdserver/api/snap/snapshotter_test.go index 40ad084dd12..52a378dc793 100644 --- a/server/etcdserver/api/snap/snapshotter_test.go +++ b/server/etcdserver/api/snap/snapshotter_test.go @@ -25,7 +25,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 497d06ac1cc..14e1b28d158 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -43,8 +43,8 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" - "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index 24e0c6d541c..8170c29db53 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -17,8 +17,8 @@ package etcdserver import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) type Storage interface { diff --git a/server/datadir/datadir.go b/server/storage/datadir/datadir.go similarity index 100% rename from server/datadir/datadir.go rename to server/storage/datadir/datadir.go diff --git a/server/datadir/datadir_test.go b/server/storage/datadir/datadir_test.go similarity index 94% rename from server/datadir/datadir_test.go rename to server/storage/datadir/datadir_test.go index f6fe19b1c01..3933839c307 100644 --- a/server/datadir/datadir_test.go +++ b/server/storage/datadir/datadir_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/storage/datadir" ) func TestToBackendFileName(t *testing.T) { diff --git a/server/datadir/doc.go b/server/storage/datadir/doc.go similarity index 100% rename from server/datadir/doc.go rename to server/storage/datadir/doc.go diff --git a/server/wal/decoder.go b/server/storage/wal/decoder.go similarity index 99% rename from server/wal/decoder.go rename to server/storage/wal/decoder.go index 0251a72133d..7cc634a2ea0 100644 --- a/server/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -24,7 +24,7 @@ import ( "go.etcd.io/etcd/pkg/v3/crc" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) const minSectorSize = 512 diff --git a/server/wal/doc.go b/server/storage/wal/doc.go similarity index 100% rename from server/wal/doc.go rename to server/storage/wal/doc.go diff --git a/server/wal/encoder.go b/server/storage/wal/encoder.go similarity index 98% rename from server/wal/encoder.go rename to server/storage/wal/encoder.go index 61b4c20efb1..d9e221ff20c 100644 --- a/server/wal/encoder.go +++ b/server/storage/wal/encoder.go @@ -23,7 +23,7 @@ import ( "go.etcd.io/etcd/pkg/v3/crc" "go.etcd.io/etcd/pkg/v3/ioutil" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) // walPageBytes is the alignment for flushing records to the backing Writer. diff --git a/server/wal/file_pipeline.go b/server/storage/wal/file_pipeline.go similarity index 100% rename from server/wal/file_pipeline.go rename to server/storage/wal/file_pipeline.go diff --git a/server/wal/file_pipeline_test.go b/server/storage/wal/file_pipeline_test.go similarity index 100% rename from server/wal/file_pipeline_test.go rename to server/storage/wal/file_pipeline_test.go diff --git a/server/wal/metrics.go b/server/storage/wal/metrics.go similarity index 100% rename from server/wal/metrics.go rename to server/storage/wal/metrics.go diff --git a/server/wal/record_test.go b/server/storage/wal/record_test.go similarity index 98% rename from server/wal/record_test.go rename to server/storage/wal/record_test.go index d28807ebb8c..1de390dd8d5 100644 --- a/server/wal/record_test.go +++ b/server/storage/wal/record_test.go @@ -23,7 +23,7 @@ import ( "reflect" "testing" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) var ( diff --git a/server/wal/repair.go b/server/storage/wal/repair.go similarity index 98% rename from server/wal/repair.go rename to server/storage/wal/repair.go index 122ee49a6a4..b6b9b49f220 100644 --- a/server/wal/repair.go +++ b/server/storage/wal/repair.go @@ -21,7 +21,7 @@ import ( "time" "go.etcd.io/etcd/client/pkg/v3/fileutil" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/server/wal/repair_test.go b/server/storage/wal/repair_test.go similarity index 99% rename from server/wal/repair_test.go rename to server/storage/wal/repair_test.go index e2e3294acda..089d5a770bc 100644 --- a/server/wal/repair_test.go +++ b/server/storage/wal/repair_test.go @@ -22,7 +22,7 @@ import ( "testing" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/server/wal/util.go b/server/storage/wal/util.go similarity index 100% rename from server/wal/util.go rename to server/storage/wal/util.go diff --git a/server/wal/wal.go b/server/storage/wal/wal.go similarity index 99% rename from server/wal/wal.go rename to server/storage/wal/wal.go index 3c940e0cdeb..83c10d46f87 100644 --- a/server/wal/wal.go +++ b/server/storage/wal/wal.go @@ -30,7 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/server/wal/wal_bench_test.go b/server/storage/wal/wal_bench_test.go similarity index 100% rename from server/wal/wal_bench_test.go rename to server/storage/wal/wal_bench_test.go diff --git a/server/wal/wal_test.go b/server/storage/wal/wal_test.go similarity index 99% rename from server/wal/wal_test.go rename to server/storage/wal/wal_test.go index 05014086c26..d20d561c76c 100644 --- a/server/wal/wal_test.go +++ b/server/storage/wal/wal_test.go @@ -31,7 +31,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap/zaptest" "go.uber.org/zap" diff --git a/server/wal/walpb/record.go b/server/storage/wal/walpb/record.go similarity index 100% rename from server/wal/walpb/record.go rename to server/storage/wal/walpb/record.go diff --git a/server/wal/walpb/record.pb.go b/server/storage/wal/walpb/record.pb.go similarity index 100% rename from server/wal/walpb/record.pb.go rename to server/storage/wal/walpb/record.pb.go diff --git a/server/wal/walpb/record.proto b/server/storage/wal/walpb/record.proto similarity index 100% rename from server/wal/walpb/record.proto rename to server/storage/wal/walpb/record.proto diff --git a/server/wal/walpb/record_test.go b/server/storage/wal/walpb/record_test.go similarity index 100% rename from server/wal/walpb/record_test.go rename to server/storage/wal/walpb/record_test.go diff --git a/server/verify/verify.go b/server/verify/verify.go index fe34f7a2c09..f1de10b5da7 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -19,11 +19,11 @@ import ( "os" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/datadir" "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/schema" - wal2 "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" + wal2 "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go index edc95c0101b..0fd7be37369 100644 --- a/tests/e2e/etcd_corrupt_test.go +++ b/tests/e2e/etcd_corrupt_test.go @@ -25,7 +25,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/storage/datadir" ) // TODO: test with embedded etcd in integration package diff --git a/tools/etcd-dump-logs/etcd-dump-log_test.go b/tools/etcd-dump-logs/etcd-dump-log_test.go index 372f7597bfc..e78e896272f 100644 --- a/tools/etcd-dump-logs/etcd-dump-log_test.go +++ b/tools/etcd-dump-logs/etcd-dump-log_test.go @@ -29,7 +29,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/wal" + "go.etcd.io/etcd/server/v3/storage/wal" "go.uber.org/zap" ) diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index b5635634430..15c76a40ad7 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -34,8 +34,8 @@ import ( "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/wal" - "go.etcd.io/etcd/server/v3/wal/walpb" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.uber.org/zap" ) From 23b742cfd327ee9e20f8cad49fad12eec1b61a98 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 16:58:42 +0200 Subject: [PATCH 2/3] server: Remove Quota direct dependency on EtcdServer --- server/etcdserver/api/v3rpc/quota.go | 4 ++-- server/etcdserver/apply.go | 2 +- server/etcdserver/quota.go | 34 +++++++++++++++------------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/server/etcdserver/api/v3rpc/quota.go b/server/etcdserver/api/v3rpc/quota.go index 7f53bd966bd..c660df52942 100644 --- a/server/etcdserver/api/v3rpc/quota.go +++ b/server/etcdserver/api/v3rpc/quota.go @@ -52,7 +52,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error { func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer { return "aKVServer{ NewKVServer(s), - quotaAlarmer{etcdserver.NewBackendQuota(s, "kv"), s, s.ID()}, + quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()}, } } @@ -85,6 +85,6 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer { return "aLeaseServer{ NewLeaseServer(s), - quotaAlarmer{etcdserver.NewBackendQuota(s, "lease"), s, s.ID()}, + quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()}, } } diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 5235d61cd8d..69cd8c8b30d 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -953,7 +953,7 @@ type quotaApplierV3 struct { } func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { - return "aApplierV3{app, NewBackendQuota(s, "v3-applier")} + return "aApplierV3{app, NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")} } func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { diff --git a/server/etcdserver/quota.go b/server/etcdserver/quota.go index 33c06e61900..e999abb59bc 100644 --- a/server/etcdserver/quota.go +++ b/server/etcdserver/quota.go @@ -18,6 +18,8 @@ import ( "sync" pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/config" + "go.etcd.io/etcd/server/v3/storage/backend" humanize "github.com/dustin/go-humanize" "go.uber.org/zap" @@ -51,7 +53,7 @@ func (*passthroughQuota) Cost(interface{}) int { return 0 } func (*passthroughQuota) Remaining() int64 { return 1 } type backendQuota struct { - s *EtcdServer + be backend.Backend maxBackendBytes int64 } @@ -71,23 +73,23 @@ var ( ) // NewBackendQuota creates a quota layer with the given storage limit. -func NewBackendQuota(s *EtcdServer, name string) Quota { - lg := s.Logger() - quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes)) +func NewBackendQuota(cfg config.ServerConfig, be backend.Backend, name string) Quota { + lg := cfg.Logger + quotaBackendBytes.Set(float64(cfg.QuotaBackendBytes)) - if s.Cfg.QuotaBackendBytes < 0 { + if cfg.QuotaBackendBytes < 0 { // disable quotas if negative quotaLogOnce.Do(func() { lg.Info( "disabled backend quota", zap.String("quota-name", name), - zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), + zap.Int64("quota-size-bytes", cfg.QuotaBackendBytes), ) }) return &passthroughQuota{} } - if s.Cfg.QuotaBackendBytes == 0 { + if cfg.QuotaBackendBytes == 0 { // use default size if no quota size given quotaLogOnce.Do(func() { if lg != nil { @@ -100,16 +102,16 @@ func NewBackendQuota(s *EtcdServer, name string) Quota { } }) quotaBackendBytes.Set(float64(DefaultQuotaBytes)) - return &backendQuota{s, DefaultQuotaBytes} + return &backendQuota{be, DefaultQuotaBytes} } quotaLogOnce.Do(func() { - if s.Cfg.QuotaBackendBytes > MaxQuotaBytes { + if cfg.QuotaBackendBytes > MaxQuotaBytes { lg.Warn( "quota exceeds the maximum value", zap.String("quota-name", name), - zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), - zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), + zap.Int64("quota-size-bytes", cfg.QuotaBackendBytes), + zap.String("quota-size", humanize.Bytes(uint64(cfg.QuotaBackendBytes))), zap.Int64("quota-maximum-size-bytes", MaxQuotaBytes), zap.String("quota-maximum-size", maxQuotaSize), ) @@ -117,16 +119,16 @@ func NewBackendQuota(s *EtcdServer, name string) Quota { lg.Info( "enabled backend quota", zap.String("quota-name", name), - zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes), - zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))), + zap.Int64("quota-size-bytes", cfg.QuotaBackendBytes), + zap.String("quota-size", humanize.Bytes(uint64(cfg.QuotaBackendBytes))), ) }) - return &backendQuota{s, s.Cfg.QuotaBackendBytes} + return &backendQuota{be, cfg.QuotaBackendBytes} } func (b *backendQuota) Available(v interface{}) bool { // TODO: maybe optimize backend.Size() - return b.s.Backend().Size()+int64(b.Cost(v)) < b.maxBackendBytes + return b.be.Size()+int64(b.Cost(v)) < b.maxBackendBytes } func (b *backendQuota) Cost(v interface{}) int { @@ -168,5 +170,5 @@ func costTxn(r *pb.TxnRequest) int { } func (b *backendQuota) Remaining() int64 { - return b.maxBackendBytes - b.s.Backend().Size() + return b.maxBackendBytes - b.be.Size() } From 83a325ac4687289180b3dcae6360d9df026fe840 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 17:27:36 +0200 Subject: [PATCH 3/3] server: Move all functions needed for storage bootstrap to storage package This is prerequestite to move storage bootstrap, splitted to separate PR to make it easier to review. --- server/embed/etcd.go | 3 +- server/etcdserver/api/v3rpc/quota.go | 7 +- server/etcdserver/apply.go | 7 +- server/etcdserver/bootstrap.go | 19 +-- server/etcdserver/metrics.go | 8 +- server/etcdserver/raft.go | 108 ---------------- server/etcdserver/raft_test.go | 5 +- server/etcdserver/server.go | 54 +------- server/etcdserver/server_test.go | 17 +-- server/{etcdserver => storage}/backend.go | 18 +-- server/storage/hooks.go | 60 +++++++++ server/storage/metrics.go | 30 +++++ server/{etcdserver => storage}/quota.go | 18 +-- server/storage/util.go | 150 ++++++++++++++++++++++ tests/integration/metrics_test.go | 6 +- 15 files changed, 298 insertions(+), 212 deletions(-) rename server/{etcdserver => storage}/backend.go (82%) create mode 100644 server/storage/hooks.go create mode 100644 server/storage/metrics.go rename server/{etcdserver => storage}/quota.go (92%) create mode 100644 server/storage/util.go diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 37ea5fb17db..af27e353ed9 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -42,6 +42,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2v3" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/verify" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -303,7 +304,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized quota := ec.QuotaBackendBytes if quota == 0 { - quota = etcdserver.DefaultQuotaBytes + quota = storage.DefaultQuotaBytes } lg.Info( diff --git a/server/etcdserver/api/v3rpc/quota.go b/server/etcdserver/api/v3rpc/quota.go index c660df52942..fd41bc13339 100644 --- a/server/etcdserver/api/v3rpc/quota.go +++ b/server/etcdserver/api/v3rpc/quota.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/storage" ) type quotaKVServer struct { @@ -29,7 +30,7 @@ type quotaKVServer struct { } type quotaAlarmer struct { - q etcdserver.Quota + q storage.Quota a Alarmer id types.ID } @@ -52,7 +53,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error { func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer { return "aKVServer{ NewKVServer(s), - quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()}, + quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()}, } } @@ -85,6 +86,6 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer { return "aLeaseServer{ NewLeaseServer(s), - quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()}, + quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()}, } } diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 69cd8c8b30d..2f4d9f33225 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -32,6 +32,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/lease" + serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/mvcc" "github.com/gogo/protobuf/proto" @@ -770,7 +771,7 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) type applierV3Capped struct { applierV3 - q backendQuota + q serverstorage.BackendQuota } // newApplierV3Capped creates an applyV3 that will reject Puts and transactions @@ -949,11 +950,11 @@ func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequ type quotaApplierV3 struct { applierV3 - q Quota + q serverstorage.Quota } func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { - return "aApplierV3{app, NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")} + return "aApplierV3{app, serverstorage.NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")} } func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 14e1b28d158..9a31f659cad 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -41,6 +41,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" + serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/storage/wal" @@ -117,7 +118,7 @@ type bootstrappedServer struct { st v2store.Store be backend.Backend ss *snap.Snapshotter - beHooks *backendHooks + beHooks *serverstorage.BackendHooks } func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter { @@ -141,11 +142,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter { return snap.New(cfg.Logger, cfg.SnapDir()) } -func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) { +func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) { beExist = fileutil.Exist(cfg.BackendPath()) ci = cindex.NewConsistentIndex(nil) - beHooks = &backendHooks{lg: cfg.Logger, indexer: ci} - be = openBackend(cfg, beHooks) + beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci) + be = serverstorage.OpenBackend(cfg, beHooks) ci.SetBackend(be) schema.CreateMetaBucket(be.BatchTx()) if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { @@ -249,7 +250,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st }, nil } -func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) { +func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -282,7 +283,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) } - if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil { + if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil { cfg.Logger.Error("illegal v2store content", zap.Error(err)) return nil, err } @@ -293,7 +294,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), ) - if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { + if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } s1, s2 := be.Size(), be.SizeInUse() @@ -578,9 +579,9 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry { } func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry { - return createConfigChangeEnts( + return serverstorage.CreateConfigChangeEnts( wal.lg, - getIDs(wal.lg, wal.snapshot, wal.ents), + serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents), uint64(wal.id), wal.st.Term, wal.st.Commit, diff --git a/server/etcdserver/metrics.go b/server/etcdserver/metrics.go index 06263a9cd29..33ee02747fc 100644 --- a/server/etcdserver/metrics.go +++ b/server/etcdserver/metrics.go @@ -124,12 +124,7 @@ var ( Name: "lease_expired_total", Help: "The total number of expired leases.", }) - quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "etcd", - Subsystem: "server", - Name: "quota_backend_bytes", - Help: "Current backend storage quota size in bytes.", - }) + currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "server", @@ -191,7 +186,6 @@ func init() { prometheus.MustRegister(slowReadIndex) prometheus.MustRegister(readIndexFailed) prometheus.MustRegister(leaseExpired) - prometheus.MustRegister(quotaBackendBytes) prometheus.MustRegister(currentVersion) prometheus.MustRegister(currentGoVersion) prometheus.MustRegister(serverID) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 8b30b650fb8..4fb8da6ed28 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -15,21 +15,16 @@ package etcdserver import ( - "encoding/json" "expvar" "fmt" "log" - "sort" "sync" "time" "go.etcd.io/etcd/client/pkg/v3/logutil" - "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/contention" - "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.uber.org/zap" ) @@ -415,106 +410,3 @@ func (r *raftNode) advanceTicks(ticks int) { r.tick() } } - -// getIDs returns an ordered set of IDs included in the given snapshot and -// the entries. The given snapshot/entries can contain three kinds of -// ID-related entry: -// - ConfChangeAddNode, in which case the contained ID will be added into the set. -// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set. -// - ConfChangeAddLearnerNode, in which the contained ID will be added into the set. -func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { - ids := make(map[uint64]bool) - if snap != nil { - for _, id := range snap.Metadata.ConfState.Voters { - ids[id] = true - } - } - for _, e := range ents { - if e.Type != raftpb.EntryConfChange { - continue - } - var cc raftpb.ConfChange - pbutil.MustUnmarshal(&cc, e.Data) - switch cc.Type { - case raftpb.ConfChangeAddLearnerNode: - ids[cc.NodeID] = true - case raftpb.ConfChangeAddNode: - ids[cc.NodeID] = true - case raftpb.ConfChangeRemoveNode: - delete(ids, cc.NodeID) - case raftpb.ConfChangeUpdateNode: - // do nothing - default: - lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String())) - } - } - sids := make(types.Uint64Slice, 0, len(ids)) - for id := range ids { - sids = append(sids, id) - } - sort.Sort(sids) - return []uint64(sids) -} - -// createConfigChangeEnts creates a series of Raft entries (i.e. -// EntryConfChange) to remove the set of given IDs from the cluster. The ID -// `self` is _not_ removed, even if present in the set. -// If `self` is not inside the given ids, it creates a Raft entry to add a -// default member with the given `self`. -func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry { - found := false - for _, id := range ids { - if id == self { - found = true - } - } - - var ents []raftpb.Entry - next := index + 1 - - // NB: always add self first, then remove other nodes. Raft will panic if the - // set of voters ever becomes empty. - if !found { - m := membership.Member{ - ID: types.ID(self), - RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}}, - } - ctx, err := json.Marshal(m) - if err != nil { - lg.Panic("failed to marshal member", zap.Error(err)) - } - cc := &raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: self, - Context: ctx, - } - e := raftpb.Entry{ - Type: raftpb.EntryConfChange, - Data: pbutil.MustMarshal(cc), - Term: term, - Index: next, - } - ents = append(ents, e) - next++ - } - - for _, id := range ids { - if id == self { - continue - } - cc := &raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, - NodeID: id, - } - e := raftpb.Entry{ - Type: raftpb.EntryConfChange, - Data: pbutil.MustMarshal(cc), - Term: term, - Index: next, - } - ents = append(ents, e) - next++ - } - - return ents -} diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index 3eb5345dc25..49de844b5ca 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -28,6 +28,7 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/mock/mockstorage" + serverstorage "go.etcd.io/etcd/server/v3/storage" "go.uber.org/zap" ) @@ -66,7 +67,7 @@ func TestGetIDs(t *testing.T) { if tt.confState != nil { snap.Metadata.ConfState = *tt.confState } - idSet := getIDs(testLogger, &snap, tt.ents) + idSet := serverstorage.GetIDs(testLogger, &snap, tt.ents) if !reflect.DeepEqual(idSet, tt.widSet) { t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet) } @@ -146,7 +147,7 @@ func TestCreateConfigChangeEnts(t *testing.T) { } for i, tt := range tests { - gents := createConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index) + gents := serverstorage.CreateConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index) if !reflect.DeepEqual(gents, tt.wents) { t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 95db2a29b03..590c98c8e4b 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -62,6 +62,7 @@ import ( serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" + serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" @@ -258,7 +259,7 @@ type EtcdServer struct { lessor lease.Lessor bemu sync.Mutex be backend.Backend - beHooks *backendHooks + beHooks *serverstorage.BackendHooks authStore auth.AuthStore alarmStore *v3alarm.AlarmStore @@ -296,36 +297,6 @@ type EtcdServer struct { updateStorageSchema sync.Once } -type backendHooks struct { - indexer cindex.ConsistentIndexer - lg *zap.Logger - - // confState to be written in the next submitted backend transaction (if dirty) - confState raftpb.ConfState - // first write changes it to 'dirty'. false by default, so - // not initialized `confState` is meaningless. - confStateDirty bool - confStateLock sync.Mutex -} - -func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { - bh.indexer.UnsafeSave(tx) - bh.confStateLock.Lock() - defer bh.confStateLock.Unlock() - if bh.confStateDirty { - schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) - // save bh.confState - bh.confStateDirty = false - } -} - -func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) { - bh.confStateLock.Lock() - defer bh.confStateLock.Unlock() - bh.confState = *confState - bh.confStateDirty = true -} - // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { @@ -462,23 +433,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return srv, nil } -// assertNoV2StoreContent -> depending on the deprecation stage, warns or report an error -// if the v2store contains custom content. -func assertNoV2StoreContent(lg *zap.Logger, st v2store.Store, deprecationStage config.V2DeprecationEnum) error { - metaOnly, err := membership.IsMetaStoreOnly(st) - if err != nil { - return err - } - if metaOnly { - return nil - } - if deprecationStage.IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) { - return fmt.Errorf("detected disallowed custom content in v2store for stage --v2-deprecation=%s", deprecationStage) - } - lg.Warn("detected custom v2store content. Etcd v3.5 is the last version allowing to access it using API v2. Please remove the content.") - return nil -} - func (s *EtcdServer) Logger() *zap.Logger { s.lgMu.RLock() l := s.lg @@ -1006,7 +960,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { // wait for raftNode to persist snapshot onto the disk <-apply.notifyc - newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks) + newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) } @@ -1069,7 +1023,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore v2 store", zap.Error(err)) } - if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil { + if err := serverstorage.AssertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil { lg.Panic("illegal v2store content", zap.Error(err)) } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 4f2e5d0b3c3..4524cd83215 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -51,6 +51,7 @@ import ( "go.etcd.io/etcd/server/v3/mock/mockstorage" "go.etcd.io/etcd/server/v3/mock/mockstore" "go.etcd.io/etcd/server/v3/mock/mockwait" + serverstorage "go.etcd.io/etcd/server/v3/storage" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" @@ -614,7 +615,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { id: 1, r: *r, cluster: cl, - beHooks: &backendHooks{lg: lg}, + beHooks: serverstorage.NewBackendHooks(lg, nil), } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -662,7 +663,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { cluster: cl, w: wait.New(), consistIndex: ci, - beHooks: &backendHooks{lg: lg, indexer: ci}, + beHooks: serverstorage.NewBackendHooks(lg, ci), } // create EntryConfChange entry @@ -746,7 +747,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { cluster: cl, w: wait.New(), consistIndex: ci, - beHooks: &backendHooks{lg: lg, indexer: ci}, + beHooks: serverstorage.NewBackendHooks(lg, ci), } ents := []raftpb.Entry{} for i := 1; i <= 4; i++ { @@ -1120,7 +1121,7 @@ func TestSnapshotOrdering(t *testing.T) { cluster: cl, SyncTicker: &time.Ticker{}, consistIndex: ci, - beHooks: &backendHooks{lg: lg, indexer: ci}, + beHooks: serverstorage.NewBackendHooks(lg, ci), } s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} @@ -1273,7 +1274,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { cluster: cl, SyncTicker: &time.Ticker{}, consistIndex: ci, - beHooks: &backendHooks{lg: lg, indexer: ci}, + beHooks: serverstorage.NewBackendHooks(lg, ci), } s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} @@ -1358,7 +1359,7 @@ func TestAddMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), - beHooks: &backendHooks{lg: lg}, + beHooks: serverstorage.NewBackendHooks(lg, nil), } s.start() m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} @@ -1405,7 +1406,7 @@ func TestRemoveMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), - beHooks: &backendHooks{lg: lg}, + beHooks: serverstorage.NewBackendHooks(lg, nil), } s.start() _, err := s.RemoveMember(context.Background(), 1234) @@ -1451,7 +1452,7 @@ func TestUpdateMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), - beHooks: &backendHooks{lg: lg}, + beHooks: serverstorage.NewBackendHooks(lg, nil), } s.start() wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} diff --git a/server/etcdserver/backend.go b/server/storage/backend.go similarity index 82% rename from server/etcdserver/backend.go rename to server/storage/backend.go index e0da65c6ce2..abbbf889d7d 100644 --- a/server/etcdserver/backend.go +++ b/server/storage/backend.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package storage import ( "fmt" @@ -55,8 +55,8 @@ func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { return backend.New(bcfg) } -// openSnapshotBackend renames a snapshot db to the current etcd db and opens it. -func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) { +// OpenSnapshotBackend renames a snapshot db to the current etcd db and opens it. +func OpenSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks *BackendHooks) (backend.Backend, error) { snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) if err != nil { return nil, fmt.Errorf("failed to find database snapshot file (%v)", err) @@ -64,11 +64,11 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot if err := os.Rename(snapPath, cfg.BackendPath()); err != nil { return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err) } - return openBackend(cfg, hooks), nil + return OpenBackend(cfg, hooks), nil } -// openBackend returns a backend using the current etcd db. -func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { +// OpenBackend returns a backend using the current etcd db. +func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { fn := cfg.BackendPath() now, beOpened := time.Now(), make(chan backend.Backend) @@ -92,11 +92,11 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { return <-beOpened } -// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes +// RecoverSnapshotBackend recovers the DB from a snapshot in case etcd crashes // before updating the backend db after persisting raft snapshot to disk, // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // case, replace the db with the snapshot db sent by the leader. -func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { +func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx()) @@ -105,5 +105,5 @@ func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap return oldbe, nil } oldbe.Close() - return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks) + return OpenSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks) } diff --git a/server/storage/hooks.go b/server/storage/hooks.go new file mode 100644 index 00000000000..e9a9f250d41 --- /dev/null +++ b/server/storage/hooks.go @@ -0,0 +1,60 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "sync" + + "go.uber.org/zap" + + "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/etcdserver/cindex" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/schema" +) + +type BackendHooks struct { + indexer cindex.ConsistentIndexer + lg *zap.Logger + + // confState to Be written in the next submitted Backend transaction (if dirty) + confState raftpb.ConfState + // first write changes it to 'dirty'. false by default, so + // not initialized `confState` is meaningless. + confStateDirty bool + confStateLock sync.Mutex +} + +func NewBackendHooks(lg *zap.Logger, indexer cindex.ConsistentIndexer) *BackendHooks { + return &BackendHooks{lg: lg, indexer: indexer} +} + +func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { + bh.indexer.UnsafeSave(tx) + bh.confStateLock.Lock() + defer bh.confStateLock.Unlock() + if bh.confStateDirty { + schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) + // save bh.confState + bh.confStateDirty = false + } +} + +func (bh *BackendHooks) SetConfState(confState *raftpb.ConfState) { + bh.confStateLock.Lock() + defer bh.confStateLock.Unlock() + bh.confState = *confState + bh.confStateDirty = true +} diff --git a/server/storage/metrics.go b/server/storage/metrics.go new file mode 100644 index 00000000000..cb7f87057f9 --- /dev/null +++ b/server/storage/metrics.go @@ -0,0 +1,30 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "quota_backend_bytes", + Help: "Current backend storage quota size in bytes.", +}) + +func init() { + prometheus.MustRegister(quotaBackendBytes) +} diff --git a/server/etcdserver/quota.go b/server/storage/quota.go similarity index 92% rename from server/etcdserver/quota.go rename to server/storage/quota.go index e999abb59bc..e15079d453d 100644 --- a/server/etcdserver/quota.go +++ b/server/storage/quota.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package storage import ( "sync" @@ -52,7 +52,7 @@ func (*passthroughQuota) Available(interface{}) bool { return true } func (*passthroughQuota) Cost(interface{}) int { return 0 } func (*passthroughQuota) Remaining() int64 { return 1 } -type backendQuota struct { +type BackendQuota struct { be backend.Backend maxBackendBytes int64 } @@ -60,7 +60,7 @@ type backendQuota struct { const ( // leaseOverhead is an estimate for the cost of storing a lease leaseOverhead = 64 - // kvOverhead is an estimate for the cost of storing a key's metadata + // kvOverhead is an estimate for the cost of storing a key's Metadata kvOverhead = 256 ) @@ -102,7 +102,7 @@ func NewBackendQuota(cfg config.ServerConfig, be backend.Backend, name string) Q } }) quotaBackendBytes.Set(float64(DefaultQuotaBytes)) - return &backendQuota{be, DefaultQuotaBytes} + return &BackendQuota{be, DefaultQuotaBytes} } quotaLogOnce.Do(func() { @@ -123,15 +123,15 @@ func NewBackendQuota(cfg config.ServerConfig, be backend.Backend, name string) Q zap.String("quota-size", humanize.Bytes(uint64(cfg.QuotaBackendBytes))), ) }) - return &backendQuota{be, cfg.QuotaBackendBytes} + return &BackendQuota{be, cfg.QuotaBackendBytes} } -func (b *backendQuota) Available(v interface{}) bool { - // TODO: maybe optimize backend.Size() +func (b *BackendQuota) Available(v interface{}) bool { + // TODO: maybe optimize Backend.Size() return b.be.Size()+int64(b.Cost(v)) < b.maxBackendBytes } -func (b *backendQuota) Cost(v interface{}) int { +func (b *BackendQuota) Cost(v interface{}) int { switch r := v.(type) { case *pb.PutRequest: return costPut(r) @@ -169,6 +169,6 @@ func costTxn(r *pb.TxnRequest) int { return sizeSuccess } -func (b *backendQuota) Remaining() int64 { +func (b *BackendQuota) Remaining() int64 { return b.maxBackendBytes - b.be.Size() } diff --git a/server/storage/util.go b/server/storage/util.go new file mode 100644 index 00000000000..bdac72ec17c --- /dev/null +++ b/server/storage/util.go @@ -0,0 +1,150 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "encoding/json" + "fmt" + "sort" + + "go.uber.org/zap" + + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" +) + +// AssertNoV2StoreContent -> depending on the deprecation stage, warns or report an error +// if the v2store contains custom content. +func AssertNoV2StoreContent(lg *zap.Logger, st v2store.Store, deprecationStage config.V2DeprecationEnum) error { + metaOnly, err := membership.IsMetaStoreOnly(st) + if err != nil { + return err + } + if metaOnly { + return nil + } + if deprecationStage.IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) { + return fmt.Errorf("detected disallowed custom content in v2store for stage --v2-deprecation=%s", deprecationStage) + } + lg.Warn("detected custom v2store content. Etcd v3.5 is the last version allowing to access it using API v2. Please remove the content.") + return nil +} + +// CreateConfigChangeEnts creates a series of Raft entries (i.e. +// EntryConfChange) to remove the set of given IDs from the cluster. The ID +// `self` is _not_ removed, even if present in the set. +// If `self` is not inside the given ids, it creates a Raft entry to add a +// default member with the given `self`. +func CreateConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry { + found := false + for _, id := range ids { + if id == self { + found = true + } + } + + var ents []raftpb.Entry + next := index + 1 + + // NB: always add self first, then remove other nodes. Raft will panic if the + // set of voters ever becomes empty. + if !found { + m := membership.Member{ + ID: types.ID(self), + RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}}, + } + ctx, err := json.Marshal(m) + if err != nil { + lg.Panic("failed to marshal member", zap.Error(err)) + } + cc := &raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: self, + Context: ctx, + } + e := raftpb.Entry{ + Type: raftpb.EntryConfChange, + Data: pbutil.MustMarshal(cc), + Term: term, + Index: next, + } + ents = append(ents, e) + next++ + } + + for _, id := range ids { + if id == self { + continue + } + cc := &raftpb.ConfChange{ + Type: raftpb.ConfChangeRemoveNode, + NodeID: id, + } + e := raftpb.Entry{ + Type: raftpb.EntryConfChange, + Data: pbutil.MustMarshal(cc), + Term: term, + Index: next, + } + ents = append(ents, e) + next++ + } + + return ents +} + +// GetIDs returns an ordered set of IDs included in the given snapshot and +// the entries. The given snapshot/entries can contain three kinds of +// ID-related entry: +// - ConfChangeAddNode, in which case the contained ID will Be added into the set. +// - ConfChangeRemoveNode, in which case the contained ID will Be removed from the set. +// - ConfChangeAddLearnerNode, in which the contained ID will Be added into the set. +func GetIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { + ids := make(map[uint64]bool) + if snap != nil { + for _, id := range snap.Metadata.ConfState.Voters { + ids[id] = true + } + } + for _, e := range ents { + if e.Type != raftpb.EntryConfChange { + continue + } + var cc raftpb.ConfChange + pbutil.MustUnmarshal(&cc, e.Data) + switch cc.Type { + case raftpb.ConfChangeAddLearnerNode: + ids[cc.NodeID] = true + case raftpb.ConfChangeAddNode: + ids[cc.NodeID] = true + case raftpb.ConfChangeRemoveNode: + delete(ids, cc.NodeID) + case raftpb.ConfChangeUpdateNode: + // do nothing + default: + lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String())) + } + } + sids := make(types.Uint64Slice, 0, len(ids)) + for id := range ids { + sids = append(sids, id) + } + sort.Sort(sids) + return []uint64(sids) +} diff --git a/tests/integration/metrics_test.go b/tests/integration/metrics_test.go index 61276fc94b6..86636ce0691 100644 --- a/tests/integration/metrics_test.go +++ b/tests/integration/metrics_test.go @@ -24,7 +24,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/transport" - "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/storage" ) // TestMetricDbSizeBoot checks that the db size metric is set on boot. @@ -175,8 +175,8 @@ func TestMetricQuotaBackendBytes(t *testing.T) { if err != nil { t.Fatal(err) } - if int64(qv) != etcdserver.DefaultQuotaBytes { - t.Fatalf("expected %d, got %f", etcdserver.DefaultQuotaBytes, qv) + if int64(qv) != storage.DefaultQuotaBytes { + t.Fatalf("expected %d, got %f", storage.DefaultQuotaBytes, qv) } }