From e35dcb38afea380140600ae3825f6a4584428b97 Mon Sep 17 00:00:00 2001
From: WEI Xikai
Date: Mon, 9 Oct 2023 05:03:47 -0500
Subject: [PATCH] feat: support shard affinity (#245)

## Rationale
In some cases, specific shards may contain tables that receive massive query or write traffic, and it is desirable to migrate such tables to dedicated nodes.

## Detailed Changes
- Provide a shard affinity mechanism when generating the shard topology
- Support manipulating the affinity configuration through the debug API
- Regroup the packages of the different schedulers

## Test Plan
New unit tests are added.
---
 go.mod | 18 +-
 go.sum | 29 ++-
 pkg/coderr/code.go | 15 +-
 server/cluster/cluster.go | 8 +-
 server/coordinator/scheduler/manager/error.go | 7 +
 .../{ => manager}/scheduler_manager.go | 126 ++++++++---
 .../{ => manager}/scheduler_manager_test.go | 10 +-
 .../coordinator/scheduler/nodepicker/error.go | 7 +
 .../nodepicker}/hash/consistent_uniform.go | 207 +++++++++++++++---
 .../hash/consistent_uniform_test.go | 164 +++++++++++---
 .../{ => scheduler/nodepicker}/node_picker.go | 42 +++-
 .../nodepicker}/node_picker_test.go | 23 +-
 .../scheduler.go} | 119 +++++++---
 .../scheduler_test.go} | 7 +-
 .../scheduler.go} | 45 ++--
 .../scheduler_test.go} | 6 +-
 server/coordinator/scheduler/scheduler.go | 15 +-
 .../scheduler/{ => static}/error.go | 4 +-
 .../scheduler.go} | 53 +++--
 .../scheduler_test.go} | 7 +-
 server/service/http/api.go | 91 +++++++-
 server/service/http/error.go | 41 ++--
 server/service/http/types.go | 4 +
 23 files changed, 804 insertions(+), 244 deletions(-)
 create mode 100644 server/coordinator/scheduler/manager/error.go
 rename server/coordinator/scheduler/{ => manager}/scheduler_manager.go (59%)
 rename server/coordinator/scheduler/{ => manager}/scheduler_manager_test.go (75%)
 create mode 100644 server/coordinator/scheduler/nodepicker/error.go
 rename server/{ => coordinator/scheduler/nodepicker}/hash/consistent_uniform.go (55%)
 rename server/{ => coordinator/scheduler/nodepicker}/hash/consistent_uniform_test.go (56%)
 rename server/coordinator/{ => scheduler/nodepicker}/node_picker.go (58%)
 rename server/coordinator/{ => scheduler/nodepicker}/node_picker_test.go (82%)
 rename server/coordinator/scheduler/{rebalanced_shard_scheduler.go => rebalanced/scheduler.go} (56%)
 rename server/coordinator/scheduler/{rebalanced_shard_scheduler_test.go => rebalanced/scheduler_test.go} (79%)
 rename server/coordinator/scheduler/{reopen_shard_scheduler.go => reopen/scheduler.go} (60%)
 rename server/coordinator/scheduler/{reopen_shard_scheduler_test.go => reopen/scheduler_test.go} (91%)
 rename server/coordinator/scheduler/{ => static}/error.go (52%)
 rename server/coordinator/scheduler/{static_topology_shard_scheduler.go => static/scheduler.go} (65%)
 rename server/coordinator/scheduler/{static_topology_shard_scheduler_test.go => static/scheduler_test.go} (81%)

diff --git a/go.mod b/go.mod
index 61c5347d..7739d328 100644
--- a/go.mod
+++ b/go.mod
@@ -19,9 +19,10 @@ require (
 	go.etcd.io/etcd/client/v3 v3.5.4
 	go.etcd.io/etcd/server/v3 v3.5.4
 	go.uber.org/zap v1.21.0
-	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+	golang.org/x/exp v0.0.0-20231006140011-7918f672742d
+	golang.org/x/sync v0.4.0
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
-	golang.org/x/tools v0.1.10
+	golang.org/x/tools v0.14.0
 	google.golang.org/grpc v1.47.0
 	google.golang.org/protobuf v1.28.0
 	gotest.tools/gotestsum v1.8.1
@@ -89,13 +90,12 @@ require (
 	go.opentelemetry.io/proto/otlp v0.7.0 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect - golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect - golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect - golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect - golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect - golang.org/x/text v0.3.7 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/mod v0.13.0 // indirect + golang.org/x/net v0.16.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 9dfb1cae..01bd3a04 100644 --- a/go.sum +++ b/go.sum @@ -154,8 +154,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -422,13 +422,16 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= 
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -447,8 +450,9 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= +golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= +golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -472,8 +476,9 @@ golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= +golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -487,8 +492,9 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -523,18 +529,21 @@ golang.org/x/sys 
v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE= @@ -568,12 +577,12 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= +golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= +golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= 
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= diff --git a/pkg/coderr/code.go b/pkg/coderr/code.go index 2ce43305..42bc636c 100644 --- a/pkg/coderr/code.go +++ b/pkg/coderr/code.go @@ -7,13 +7,14 @@ import "net/http" type Code int const ( - Invalid Code = -1 - Ok = 0 - InvalidParams = http.StatusBadRequest - BadRequest = http.StatusBadRequest - NotFound = http.StatusNotFound - TooManyRequests = http.StatusTooManyRequests - Internal = http.StatusInternalServerError + Invalid Code = -1 + Ok = 0 + InvalidParams = http.StatusBadRequest + BadRequest = http.StatusBadRequest + NotFound = http.StatusNotFound + TooManyRequests = http.StatusTooManyRequests + Internal = http.StatusInternalServerError + ErrNotImplemented = http.StatusNotImplemented // HTTPCodeUpperBound is a bound under which any Code should have the same meaning with the http status code. HTTPCodeUpperBound = Code(1000) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 192521dc..de5ae346 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -9,7 +9,7 @@ import ( "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" - "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/manager" "github.com/CeresDB/ceresmeta/server/id" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" @@ -27,7 +27,7 @@ type Cluster struct { procedureFactory *coordinator.Factory procedureManager procedure.Manager - schedulerManager scheduler.Manager + schedulerManager manager.SchedulerManager } func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string) (*Cluster, error) { @@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client * dispatch := eventdispatch.NewDispatchImpl() procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage) - schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize()) + schedulerManager := manager.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize()) return &Cluster{ logger: logger, @@ -82,7 +82,7 @@ func (c *Cluster) GetProcedureFactory() *coordinator.Factory { return c.procedureFactory } -func (c *Cluster) GetSchedulerManager() scheduler.Manager { +func (c *Cluster) GetSchedulerManager() manager.SchedulerManager { return c.schedulerManager } diff --git a/server/coordinator/scheduler/manager/error.go b/server/coordinator/scheduler/manager/error.go new file mode 100644 index 00000000..ac54187f --- /dev/null +++ b/server/coordinator/scheduler/manager/error.go @@ -0,0 +1,7 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
+
+package manager
+
+import "github.com/CeresDB/ceresmeta/pkg/coderr"
+
+var ErrInvalidTopologyType = coderr.NewCodeError(coderr.InvalidParams, "invalid topology type")
diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/manager/scheduler_manager.go
similarity index 59%
rename from server/coordinator/scheduler/scheduler_manager.go
rename to server/coordinator/scheduler/manager/scheduler_manager.go
index 7e3f93d6..55abdccd 100644
--- a/server/coordinator/scheduler/scheduler_manager.go
+++ b/server/coordinator/scheduler/manager/scheduler_manager.go
@@ -1,6 +1,6 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
-package scheduler
+package manager
 
 import (
 	"context"
@@ -10,9 +10,15 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/CeresDB/ceresmeta/pkg/log"
 	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
 	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler/rebalanced"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler/reopen"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler/static"
 	"github.com/CeresDB/ceresmeta/server/coordinator/watch"
 	"github.com/CeresDB/ceresmeta/server/storage"
 	"github.com/pkg/errors"
@@ -24,11 +30,11 @@ const (
 	schedulerInterval = time.Second * 5
 )
 
-// Manager used to manage schedulers, it will register all schedulers when it starts.
+// SchedulerManager is used to manage schedulers, and it will register all schedulers when it starts.
 //
 // Each registered scheduler will generate procedures if the cluster topology matches the scheduling condition.
-type Manager interface {
-	ListScheduler() []Scheduler
+type SchedulerManager interface {
+	ListScheduler() []scheduler.Scheduler
 
 	Start(ctx context.Context) error
 
@@ -43,32 +49,42 @@ type Manager interface {
 	// GetDeployMode can only be used in dynamic mode, it will throw error when topology type is static.
 	GetDeployMode(ctx context.Context) (bool, error)
 
+	// AddShardAffinityRule adds a shard affinity rule to the manager, and then applies it to the underlying schedulers.
+	AddShardAffinityRule(ctx context.Context, rule scheduler.ShardAffinityRule) error
+
+	// RemoveShardAffinityRule removes the affinity rule applied to the specific shard.
+	RemoveShardAffinityRule(ctx context.Context, shardID storage.ShardID) error
+
+	// ListShardAffinityRules lists all the rules about shard affinity of all the registered schedulers.
+	ListShardAffinityRules(ctx context.Context) (map[string]scheduler.ShardAffinityRule, error)
+
 	// Scheduler will be called when received new heartbeat, every scheduler registered in schedulerManager will be called to generate procedures.
 	// Scheduler cloud be schedule with fix time interval or heartbeat.
-	Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult
+	Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []scheduler.ScheduleResult
 }
 
-type ManagerImpl struct {
+type schedulerManagerImpl struct {
 	logger           *zap.Logger
 	procedureManager procedure.Manager
 	factory          *coordinator.Factory
-	nodePicker       coordinator.NodePicker
+	nodePicker       nodepicker.NodePicker
 	client           *clientv3.Client
 	clusterMetadata  *metadata.ClusterMetadata
 	rootPath         string
 
 	// This lock is used to protect the following field.
lock sync.RWMutex - registerSchedulers []Scheduler + registerSchedulers []scheduler.Scheduler shardWatch watch.ShardWatch isRunning atomic.Bool enableSchedule bool topologyType storage.TopologyType procedureExecutingBatchSize uint32 deployMode bool + shardAffinities map[storage.ShardID]scheduler.ShardAffinityRule } -func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager { +func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) SchedulerManager { var shardWatch watch.ShardWatch switch topologyType { case storage.TopologyTypeDynamic: @@ -78,11 +94,11 @@ func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory shardWatch = watch.NewNoopShardWatch() } - return &ManagerImpl{ + return &schedulerManagerImpl{ procedureManager: procedureManager, - registerSchedulers: []Scheduler{}, + registerSchedulers: []scheduler.Scheduler{}, factory: factory, - nodePicker: coordinator.NewConsistentUniformHashNodePicker(logger), + nodePicker: nodepicker.NewConsistentUniformHashNodePicker(logger), clusterMetadata: clusterMetadata, client: client, shardWatch: shardWatch, @@ -91,10 +107,11 @@ func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory topologyType: topologyType, procedureExecutingBatchSize: procedureExecutingBatchSize, logger: logger, + shardAffinities: make(map[storage.ShardID]scheduler.ShardAffinityRule), } } -func (m *ManagerImpl) Stop(ctx context.Context) error { +func (m *schedulerManagerImpl) Stop(ctx context.Context) error { m.lock.Lock() defer m.lock.Unlock() @@ -109,7 +126,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error { return nil } -func (m *ManagerImpl) Start(ctx context.Context) error { +func (m *schedulerManagerImpl) Start(ctx context.Context) error { m.lock.Lock() defer m.lock.Unlock() @@ -185,8 +202,8 @@ func (callback *schedulerWatchCallback) OnShardExpired(ctx context.Context, even } // Schedulers should to be initialized and registered here. 
-func (m *ManagerImpl) initRegister() { - var schedulers []Scheduler +func (m *schedulerManagerImpl) initRegister() { + var schedulers []scheduler.Scheduler switch m.topologyType { case storage.TopologyTypeDynamic: schedulers = m.createDynamicTopologySchedulers() @@ -198,33 +215,33 @@ func (m *ManagerImpl) initRegister() { } } -func (m *ManagerImpl) createStaticTopologySchedulers() []Scheduler { - staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize) - reopenShardScheduler := NewReopenShardScheduler(m.factory, m.procedureExecutingBatchSize) - return []Scheduler{staticTopologyShardScheduler, reopenShardScheduler} +func (m *schedulerManagerImpl) createStaticTopologySchedulers() []scheduler.Scheduler { + staticTopologyShardScheduler := static.NewShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize) + reopenShardScheduler := reopen.NewShardScheduler(m.factory, m.procedureExecutingBatchSize) + return []scheduler.Scheduler{staticTopologyShardScheduler, reopenShardScheduler} } -func (m *ManagerImpl) createDynamicTopologySchedulers() []Scheduler { - rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker, m.procedureExecutingBatchSize) - reopenShardScheduler := NewReopenShardScheduler(m.factory, m.procedureExecutingBatchSize) - return []Scheduler{rebalancedShardScheduler, reopenShardScheduler} +func (m *schedulerManagerImpl) createDynamicTopologySchedulers() []scheduler.Scheduler { + rebalancedShardScheduler := rebalanced.NewShardScheduler(m.logger, m.factory, m.nodePicker, m.procedureExecutingBatchSize) + reopenShardScheduler := reopen.NewShardScheduler(m.factory, m.procedureExecutingBatchSize) + return []scheduler.Scheduler{rebalancedShardScheduler, reopenShardScheduler} } -func (m *ManagerImpl) registerScheduler(scheduler Scheduler) { +func (m *schedulerManagerImpl) registerScheduler(scheduler scheduler.Scheduler) { m.logger.Info("register new scheduler", zap.String("schedulerName", reflect.TypeOf(scheduler).String()), zap.Int("totalSchedulerLen", len(m.registerSchedulers))) m.registerSchedulers = append(m.registerSchedulers, scheduler) } -func (m *ManagerImpl) ListScheduler() []Scheduler { +func (m *schedulerManagerImpl) ListScheduler() []scheduler.Scheduler { m.lock.RLock() defer m.lock.RUnlock() return m.registerSchedulers } -func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult { +func (m *schedulerManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []scheduler.ScheduleResult { // TODO: Every scheduler should run in an independent goroutine. 
- results := make([]ScheduleResult, 0, len(m.registerSchedulers)) + results := make([]scheduler.ScheduleResult, 0, len(m.registerSchedulers)) for _, scheduler := range m.registerSchedulers { result, err := scheduler.Schedule(ctx, clusterSnapshot) if err != nil { @@ -236,7 +253,7 @@ func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Sn return results } -func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) { +func (m *schedulerManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) { m.lock.Lock() m.enableSchedule = enableSchedule m.lock.Unlock() @@ -244,12 +261,12 @@ func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule boo m.logger.Info("scheduler manager update enableSchedule", zap.Bool("enableSchedule", enableSchedule)) } -func (m *ManagerImpl) UpdateDeployMode(ctx context.Context, enable bool) error { +func (m *schedulerManagerImpl) UpdateDeployMode(ctx context.Context, enable bool) error { m.lock.Lock() defer m.lock.Unlock() if m.topologyType != storage.TopologyTypeDynamic { - return errors.WithMessage(ErrInvalidTopologyType, "deploy mode could only update when topology type is dynamic") + return ErrInvalidTopologyType.WithCausef("deploy mode could only update when topology type is dynamic") } m.deployMode = enable @@ -260,13 +277,54 @@ func (m *ManagerImpl) UpdateDeployMode(ctx context.Context, enable bool) error { return nil } -func (m *ManagerImpl) GetDeployMode(_ context.Context) (bool, error) { +func (m *schedulerManagerImpl) GetDeployMode(_ context.Context) (bool, error) { m.lock.RLock() defer m.lock.RUnlock() if m.topologyType != storage.TopologyTypeDynamic { - return false, errors.WithMessage(ErrInvalidTopologyType, "deploy mode could only get when topology type is dynamic") + return false, ErrInvalidTopologyType.WithCausef("deploy mode could only get when topology type is dynamic") } return m.deployMode, nil } + +func (m *schedulerManagerImpl) AddShardAffinityRule(ctx context.Context, rule scheduler.ShardAffinityRule) error { + var lastErr error + for _, scheduler := range m.registerSchedulers { + if err := scheduler.AddShardAffinityRule(ctx, rule); err != nil { + log.Error("failed to add shard affinity rule of a scheduler", zap.String("scheduler", scheduler.Name()), zap.Error(err)) + lastErr = err + } + } + + return lastErr +} + +func (m *schedulerManagerImpl) RemoveShardAffinityRule(ctx context.Context, shardID storage.ShardID) error { + var lastErr error + for _, scheduler := range m.registerSchedulers { + if err := scheduler.RemoveShardAffinityRule(ctx, shardID); err != nil { + log.Error("failed to remove shard affinity rule of a scheduler", zap.String("scheduler", scheduler.Name()), zap.Error(err)) + lastErr = err + } + } + + return lastErr +} + +func (m *schedulerManagerImpl) ListShardAffinityRules(ctx context.Context) (map[string]scheduler.ShardAffinityRule, error) { + rules := make(map[string]scheduler.ShardAffinityRule, len(m.registerSchedulers)) + var lastErr error + + for _, scheduler := range m.registerSchedulers { + rule, err := scheduler.ListShardAffinityRule(ctx) + if err != nil { + log.Error("failed to list shard affinity rule of a scheduler", zap.String("scheduler", scheduler.Name()), zap.Error(err)) + lastErr = err + } + + rules[scheduler.Name()] = rule + } + + return rules, lastErr +} diff --git a/server/coordinator/scheduler/scheduler_manager_test.go b/server/coordinator/scheduler/manager/scheduler_manager_test.go similarity index 75% rename from 
server/coordinator/scheduler/scheduler_manager_test.go rename to server/coordinator/scheduler/manager/scheduler_manager_test.go index 98c2df57..f8bda05a 100644 --- a/server/coordinator/scheduler/scheduler_manager_test.go +++ b/server/coordinator/scheduler/manager/scheduler_manager_test.go @@ -1,6 +1,6 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -package scheduler_test +package manager_test import ( "context" @@ -9,7 +9,7 @@ import ( "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/coordinator/procedure/test" - "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/manager" "github.com/CeresDB/ceresmeta/server/etcdutil" "github.com/CeresDB/ceresmeta/server/storage" "github.com/stretchr/testify/require" @@ -31,14 +31,14 @@ func TestSchedulerManager(t *testing.T) { _, client, _ := etcdutil.PrepareEtcdServerAndClient(t) // Create scheduler manager with enableScheduler equal to false. - schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1) + schedulerManager := manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1) err = schedulerManager.Start(ctx) re.NoError(err) err = schedulerManager.Stop(ctx) re.NoError(err) // Create scheduler manager with static topology. - schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1) + schedulerManager = manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1) err = schedulerManager.Start(ctx) re.NoError(err) schedulers := schedulerManager.ListScheduler() @@ -47,7 +47,7 @@ func TestSchedulerManager(t *testing.T) { re.NoError(err) // Create scheduler manager with dynamic topology. - schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1) + schedulerManager = manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1) err = schedulerManager.Start(ctx) re.NoError(err) schedulers = schedulerManager.ListScheduler() diff --git a/server/coordinator/scheduler/nodepicker/error.go b/server/coordinator/scheduler/nodepicker/error.go new file mode 100644 index 00000000..f5e668d5 --- /dev/null +++ b/server/coordinator/scheduler/nodepicker/error.go @@ -0,0 +1,7 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
+ +package nodepicker + +import "github.com/CeresDB/ceresmeta/pkg/coderr" + +var ErrNoAliveNodes = coderr.NewCodeError(coderr.InvalidParams, "no alive nodes is found") diff --git a/server/hash/consistent_uniform.go b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go similarity index 55% rename from server/hash/consistent_uniform.go rename to server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go index d629b4a7..816a3741 100644 --- a/server/hash/consistent_uniform.go +++ b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go @@ -38,8 +38,12 @@ import ( "sort" "github.com/CeresDB/ceresmeta/pkg/assert" + "github.com/CeresDB/ceresmeta/pkg/log" + "go.uber.org/zap" + "golang.org/x/exp/slices" ) +// TODO: Modify these error definitions to coderr. var ( // ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task. ErrInsufficientMemberCount = errors.New("insufficient member count") @@ -60,6 +64,10 @@ var ( ErrEmptyMembers = errors.New("at least one member is required") ) +// hashSeparator is used to building the virtual node name for member. +// With this special separator, it will be hard to generate duplicate virtual node names. +const hashSeparator = "@$" + type Hasher interface { Sum64([]byte) uint64 } @@ -69,6 +77,11 @@ type Member interface { String() string } +type PartitionAffinity struct { + PartitionID int + NumAllowedOtherPartitions uint +} + // Config represents a structure to control consistent package. type Config struct { // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice. @@ -78,6 +91,9 @@ type Config struct { // distribute keys uniformly. Select a big PartitionCount if you have // too many keys. ReplicationFactor int + + // The rule describes the partition affinity. + PartitionAffinities []PartitionAffinity } type virtualNode uint64 @@ -86,14 +102,14 @@ type virtualNode uint64 // consistent as possible while the members has some tiny changes. type ConsistentUniformHash struct { config Config - minLoad float64 - maxLoad float64 + minLoad int + maxLoad int numPartitions uint32 // Member name => Member members map[string]Member - // Member name => Member's load - memLoads map[string]float64 - // partition id => index of the virtualNode in the sortedRing + // Member name => Partitions allocated to this member + memPartitions map[string]map[int]struct{} + // Partition ID => index of the virtualNode in the sortedRing partitionDist map[int]int // The nodeToMems contains all the virtual nodes nodeToMems map[virtualNode]Member @@ -126,24 +142,37 @@ func BuildConsistentUniformHash(numPartitions int, members []Member, config Conf numReplicatedNodes := len(members) * config.ReplicationFactor avgLoad := float64(numPartitions) / float64(len(members)) + minLoad := int(math.Floor(avgLoad)) + maxLoad := int(math.Ceil(avgLoad)) + + memPartitions := make(map[string]map[int]struct{}, len(members)) + for _, mem := range members { + memPartitions[mem.String()] = make(map[int]struct{}, maxLoad) + } + + // Sort the affinity rule to ensure consistency. 
+ sort.Slice(config.PartitionAffinities, func(i, j int) bool { + return config.PartitionAffinities[i].PartitionID < config.PartitionAffinities[j].PartitionID + }) c := &ConsistentUniformHash{ config: config, - minLoad: math.Floor(avgLoad), - maxLoad: math.Ceil(avgLoad), + minLoad: minLoad, + maxLoad: maxLoad, numPartitions: uint32(numPartitions), sortedRing: make([]virtualNode, 0, numReplicatedNodes), + memPartitions: memPartitions, members: make(map[string]Member, len(members)), - memLoads: make(map[string]float64, len(members)), partitionDist: make(map[int]int, numPartitions), nodeToMems: make(map[virtualNode]Member, numReplicatedNodes), } c.initializeVirtualNodes(members) c.distributePartitions() + c.ensureAffinity() return c, nil } -func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeIdx int, allowedLoad float64) bool { +func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeIdx int, allowedLoad int) bool { // A fast path to avoid unnecessary loop. if allowedLoad == 0 { return false @@ -157,10 +186,12 @@ func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeI } i := c.sortedRing[virtualNodeIdx] member := c.nodeToMems[i] - load := c.memLoads[member.String()] - if load+1 <= allowedLoad { + partitions, ok := c.memPartitions[member.String()] + assert.Assert(ok) + + if len(partitions)+1 <= allowedLoad { c.partitionDist[partID] = virtualNodeIdx - c.memLoads[member.String()]++ + partitions[partID] = struct{}{} return true } virtualNodeIdx++ @@ -171,12 +202,12 @@ func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeI } func (c *ConsistentUniformHash) distributePartition(partID, virtualNodeIdx int) { - ok := c.distributePartitionWithLoad(partID, virtualNodeIdx, c.MinLoad()) + ok := c.distributePartitionWithLoad(partID, virtualNodeIdx, c.minLoad) if ok { return } - ok = c.distributePartitionWithLoad(partID, virtualNodeIdx, c.MaxLoad()) + ok = c.distributePartitionWithLoad(partID, virtualNodeIdx, c.maxLoad) assert.Assertf(ok, "not enough room to distribute partitions") } @@ -195,20 +226,52 @@ func (c *ConsistentUniformHash) distributePartitions() { } } -func (c *ConsistentUniformHash) MinLoad() float64 { - return c.minLoad +func (c *ConsistentUniformHash) MinLoad() uint { + return uint(c.minLoad) +} + +func (c *ConsistentUniformHash) MaxLoad() uint { + return uint(c.maxLoad) +} + +// LoadDistribution exposes load distribution of members. +func (c *ConsistentUniformHash) LoadDistribution() map[string]uint { + loads := make(map[string]uint, len(c.memPartitions)) + for member, partitions := range c.memPartitions { + loads[member] = uint(len(partitions)) + } + return loads } -func (c *ConsistentUniformHash) MaxLoad() float64 { - return c.maxLoad +// GetPartitionOwner returns the owner of the given partition. +func (c *ConsistentUniformHash) GetPartitionOwner(partID int) Member { + virtualNodeIdx, ok := c.partitionDist[partID] + if !ok { + return nil + } + virtualNode := c.sortedRing[virtualNodeIdx] + mem, ok := c.nodeToMems[virtualNode] + assert.Assertf(ok, "member must exist for the virtual node") + return mem } func (c *ConsistentUniformHash) initializeVirtualNodes(members []Member) { + // Ensure the order of members to avoid inconsistency caused by hash collisions. 
+ sort.Slice(members, func(i, j int) bool { + return members[i].String() < members[j].String() + }) + for _, mem := range members { for i := 0; i < c.config.ReplicationFactor; i++ { // TODO: Shall use a more generic hasher which receives multiple slices or string? - key := []byte(fmt.Sprintf("%s%d", mem.String(), i)) + key := []byte(fmt.Sprintf("%s%s%d", mem.String(), hashSeparator, i)) h := virtualNode(c.config.Hasher.Sum64(key)) + + oldMem, ok := c.nodeToMems[h] + if ok { + log.Warn("found hash collision", zap.String("oldMem", oldMem.String()), zap.String("newMem", mem.String())) + } + c.nodeToMems[h] = mem c.sortedRing = append(c.sortedRing, h) } @@ -220,24 +283,98 @@ func (c *ConsistentUniformHash) initializeVirtualNodes(members []Member) { }) } -// LoadDistribution exposes load distribution of members. -func (c *ConsistentUniformHash) LoadDistribution() map[string]float64 { - // Create a thread-safe copy - res := make(map[string]float64) - for member, load := range c.memLoads { - res[member] = load +func (c *ConsistentUniformHash) ensureAffinity() { + offloadedMems := make(map[string]struct{}, len(c.config.PartitionAffinities)) + + for _, affinity := range c.config.PartitionAffinities { + partID := affinity.PartitionID + vNodeIdx := c.partitionDist[partID] + vNode := c.sortedRing[vNodeIdx] + mem, ok := c.nodeToMems[vNode] + assert.Assert(ok) + offloadedMems[mem.String()] = struct{}{} + + allowedLoad := int(affinity.NumAllowedOtherPartitions) + 1 + memPartIDs, ok := c.memPartitions[mem.String()] + assert.Assert(ok) + memLoad := len(memPartIDs) + if memLoad > allowedLoad { + c.offloadMember(mem, memPartIDs, partID, allowedLoad, offloadedMems) + } } - return res } -// GetPartitionOwner returns the owner of the given partition. -func (c *ConsistentUniformHash) GetPartitionOwner(partID int) Member { - virtualNodeIdx, ok := c.partitionDist[partID] - if !ok { - return nil +// offloadMember tries to offload the given member by moving its partitions to other members. +func (c *ConsistentUniformHash) offloadMember(mem Member, memPartitions map[int]struct{}, retainedPartID, numAllowedParts int, offloadedMems map[string]struct{}) { + assert.Assertf(numAllowedParts >= 1, "At least the partition itself should be allowed") + partIDsToOffload := make([]int, 0, len(memPartitions)-numAllowedParts) + // The `retainedPartID` must be retained. + numRetainedParts := 1 + for partID := range memPartitions { + if partID == retainedPartID { + continue + } + + if numRetainedParts < numAllowedParts { + numRetainedParts++ + continue + } + + partIDsToOffload = append(partIDsToOffload, partID) } - virtualNode := c.sortedRing[virtualNodeIdx] - mem, ok := c.nodeToMems[virtualNode] - assert.Assertf(ok, "member must exist for the virtual node") - return mem + + slices.Sort(partIDsToOffload) + for _, partID := range partIDsToOffload { + c.offloadPartition(partID, mem, offloadedMems) + } +} + +func (c *ConsistentUniformHash) offloadPartition(sourcePartID int, sourceMem Member, blackedMembers map[string]struct{}) { + // Ensure all members' load smaller than the max load as much as possible. 
+ loadUpperBound := c.numPartitions + for load := c.maxLoad; load < int(loadUpperBound); load++ { + if done := c.offloadPartitionWithAllowedLoad(sourcePartID, sourceMem, load, blackedMembers); done { + return + } + } + + log.Warn("failed to offload partition") +} + +func (c *ConsistentUniformHash) offloadPartitionWithAllowedLoad(sourcePartID int, sourceMem Member, allowedMaxLoad int, blackedMembers map[string]struct{}) bool { + vNodeIdx := c.partitionDist[sourcePartID] + // Skip the first member which must not be the target to move. + for loopCnt := 1; loopCnt < len(c.sortedRing); loopCnt++ { + vNodeIdx++ + if vNodeIdx == len(c.sortedRing) { + vNodeIdx = 0 + } + + vNode := c.sortedRing[vNodeIdx] + mem, ok := c.nodeToMems[vNode] + assert.Assert(ok) + + // Check whether this member is blacked. + if _, blacked := blackedMembers[mem.String()]; blacked { + continue + } + + memPartitions, ok := c.memPartitions[mem.String()] + assert.Assert(ok) + memLoad := len(memPartitions) + // Check whether the member's load is too allowed. + if memLoad+1 > allowedMaxLoad { + continue + } + + // The member meets the requirement, let's move the `sourcePartID` to this member. + memPartitions[sourcePartID] = struct{}{} + c.partitionDist[sourcePartID] = vNodeIdx + sourceMemPartitions, ok := c.memPartitions[sourceMem.String()] + assert.Assert(ok) + delete(sourceMemPartitions, sourcePartID) + return true + } + + return false } diff --git a/server/hash/consistent_uniform_test.go b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go similarity index 56% rename from server/hash/consistent_uniform_test.go rename to server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go index d4ffd569..3eb1c3cb 100644 --- a/server/hash/consistent_uniform_test.go +++ b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go @@ -32,34 +32,36 @@ import ( "github.com/stretchr/testify/assert" ) -func newConfig() Config { - return Config{ - ReplicationFactor: 127, - Hasher: hasher{}, - } -} - type testMember string func (tm testMember) String() string { return string(tm) } -type hasher struct{} +type testHasher struct{} -func (hs hasher) Sum64(data []byte) uint64 { +func (hs testHasher) Sum64(data []byte) uint64 { h := fnv.New64() _, _ = h.Write(data) return h.Sum64() } -func checkUniform(t *testing.T, numPartitions, numMembers int) { +func buildTestMembers(n int) []Member { members := []Member{} - for i := 0; i < numMembers; i++ { + for i := 0; i < n; i++ { member := testMember(fmt.Sprintf("node-%d", i)) members = append(members, member) } - cfg := newConfig() + + return members +} + +func checkUniform(t *testing.T, numPartitions, numMembers int) { + members := buildTestMembers(numMembers) + cfg := Config{ + ReplicationFactor: 127, + Hasher: testHasher{}, + } c, err := BuildConsistentUniformHash(numPartitions, members, cfg) assert.NoError(t, err) @@ -80,7 +82,7 @@ func checkUniform(t *testing.T, numPartitions, numMembers int) { func TestZeroReplicationFactor(t *testing.T) { cfg := Config{ ReplicationFactor: 0, - Hasher: hasher{}, + Hasher: testHasher{}, } _, err := BuildConsistentUniformHash(0, []Member{testMember("")}, cfg) assert.Error(t, err) @@ -98,7 +100,7 @@ func TestEmptyHasher(t *testing.T) { func TestEmptyMembers(t *testing.T) { cfg := Config{ ReplicationFactor: 127, - Hasher: hasher{}, + Hasher: testHasher{}, } _, err := BuildConsistentUniformHash(0, []Member{}, cfg) assert.Error(t, err) @@ -107,7 +109,7 @@ func TestEmptyMembers(t *testing.T) { func TestNegativeNumPartitions(t 
*testing.T) { cfg := Config{ ReplicationFactor: 127, - Hasher: hasher{}, + Hasher: testHasher{}, } _, err := BuildConsistentUniformHash(-1, []Member{testMember("")}, cfg) assert.Error(t, err) @@ -137,12 +139,11 @@ func computeDiffBetweenDist(t *testing.T, oldDist, newDist map[int]string) int { } func checkConsistent(t *testing.T, numPartitions, numMembers, maxDiff int) { - members := make([]Member, 0, numMembers) - for i := 0; i < numMembers; i++ { - member := testMember(fmt.Sprintf("node-%d", i)) - members = append(members, member) + members := buildTestMembers(numMembers) + cfg := Config{ + ReplicationFactor: 127, + Hasher: testHasher{}, } - cfg := newConfig() c, err := BuildConsistentUniformHash(numPartitions, members, cfg) assert.NoError(t, err) @@ -150,10 +151,23 @@ func checkConsistent(t *testing.T, numPartitions, numMembers, maxDiff int) { for partID := 0; partID < numPartitions; partID++ { distribution[partID] = c.GetPartitionOwner(partID).String() } + sortedRing := c.sortedRing + nodeToMems := c.nodeToMems { - c, err := BuildConsistentUniformHash(numPartitions, members, cfg) + newMembers := make([]Member, 0, numMembers) + for i := numMembers - 1; i >= 0; i-- { + newMembers = append(newMembers, members[i]) + } + c, err := BuildConsistentUniformHash(numPartitions, newMembers, cfg) assert.NoError(t, err) + + newSortedRing := c.sortedRing + assert.Equal(t, sortedRing, newSortedRing) + + newNodeToMems := c.nodeToMems + assert.Equal(t, nodeToMems, newNodeToMems) + newDistribution := make(map[int]string, numPartitions) for partID := 0; partID < numPartitions; partID++ { newDistribution[partID] = c.GetPartitionOwner(partID).String() @@ -186,9 +200,107 @@ func checkConsistent(t *testing.T, numPartitions, numMembers, maxDiff int) { } func TestConsistency(t *testing.T) { - checkConsistent(t, 120, 20, 30) - checkConsistent(t, 100, 20, 25) - checkConsistent(t, 128, 70, 26) - checkConsistent(t, 256, 30, 70) - checkConsistent(t, 17, 5, 7) + checkConsistent(t, 120, 20, 12) + checkConsistent(t, 100, 20, 11) + checkConsistent(t, 128, 70, 10) + checkConsistent(t, 256, 30, 42) + checkConsistent(t, 17, 5, 10) +} + +func checkAffinity(t *testing.T, numPartitions, numMembers int, affinities []PartitionAffinity, revisedMaxLoad uint) { + members := buildTestMembers(numMembers) + cfg := Config{ + ReplicationFactor: 127, + Hasher: testHasher{}, + PartitionAffinities: affinities, + } + c, err := BuildConsistentUniformHash(numPartitions, members, cfg) + assert.NoError(t, err) + + minLoad := c.MinLoad() + maxLoad := c.MaxLoad() + if maxLoad < revisedMaxLoad { + maxLoad = revisedMaxLoad + } + loadDistribution := c.LoadDistribution() + for _, mem := range members { + load, ok := loadDistribution[mem.String()] + if !ok { + assert.Equal(t, 0.0, minLoad) + } + assert.LessOrEqual(t, load, maxLoad) + } + + for _, affinity := range affinities { + mem := c.GetPartitionOwner(affinity.PartitionID) + load := loadDistribution[mem.String()] + allowedMaxLoad := affinity.NumAllowedOtherPartitions + 1 + assert.LessOrEqual(t, load, allowedMaxLoad) + } + + distribution := make(map[int]string, numPartitions) + for partID := 0; partID < numPartitions; partID++ { + distribution[partID] = c.GetPartitionOwner(partID).String() + } + { + newMembers := make([]Member, 0, numMembers) + for i := numMembers - 1; i >= 0; i-- { + newMembers = append(newMembers, members[i]) + } + c, err := BuildConsistentUniformHash(numPartitions, newMembers, cfg) + assert.NoError(t, err) + + newDistribution := make(map[int]string, numPartitions) + for partID 
:= 0; partID < numPartitions; partID++ { + newDistribution[partID] = c.GetPartitionOwner(partID).String() + } + numDiffs := computeDiffBetweenDist(t, distribution, newDistribution) + assert.Equal(t, numDiffs, 0) + } +} + +func TestAffinity(t *testing.T) { + rule := []PartitionAffinity{} + checkAffinity(t, 120, 72, rule, 0) + checkAffinity(t, 0, 72, rule, 0) + + rule = []PartitionAffinity{ + {0, 0}, + {1, 0}, + {2, 120}, + } + checkAffinity(t, 3, 72, rule, 0) + checkAffinity(t, 72, 72, rule, 0) + + rule = []PartitionAffinity{ + {7, 0}, + {31, 0}, + {41, 0}, + {45, 0}, + {58, 0}, + {81, 0}, + {87, 0}, + {88, 0}, + {89, 0}, + } + checkAffinity(t, 128, 72, rule, 0) +} + +func TestInvalidAffinity(t *testing.T) { + // This affinity rule requires at least 4 member, but it should work too. + rule := []PartitionAffinity{ + {0, 0}, + {1, 0}, + {2, 0}, + {3, 0}, + } + + members := buildTestMembers(3) + cfg := Config{ + ReplicationFactor: 127, + Hasher: testHasher{}, + PartitionAffinities: rule, + } + _, err := BuildConsistentUniformHash(4, members, cfg) + assert.NoError(t, err) } diff --git a/server/coordinator/node_picker.go b/server/coordinator/scheduler/nodepicker/node_picker.go similarity index 58% rename from server/coordinator/node_picker.go rename to server/coordinator/scheduler/nodepicker/node_picker.go index 91e27db6..89fc3c20 100644 --- a/server/coordinator/node_picker.go +++ b/server/coordinator/scheduler/nodepicker/node_picker.go @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -package coordinator +package nodepicker import ( "context" @@ -8,14 +8,33 @@ import ( "github.com/CeresDB/ceresmeta/pkg/assert" "github.com/CeresDB/ceresmeta/server/cluster/metadata" - "github.com/CeresDB/ceresmeta/server/hash" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker/hash" "github.com/CeresDB/ceresmeta/server/storage" "github.com/spaolacci/murmur3" "go.uber.org/zap" ) +type Config struct { + NumTotalShards uint32 + ShardAffinityRule map[storage.ShardID]scheduler.ShardAffinity +} + +func (c Config) genPartitionAffinities() []hash.PartitionAffinity { + affinities := make([]hash.PartitionAffinity, 0, len(c.ShardAffinityRule)) + for shardID, affinity := range c.ShardAffinityRule { + partitionID := int(shardID) + affinities = append(affinities, hash.PartitionAffinity{ + PartitionID: partitionID, + NumAllowedOtherPartitions: affinity.NumAllowedOtherShards, + }) + } + + return affinities +} + type NodePicker interface { - PickNode(ctx context.Context, shardIDs []storage.ShardID, shardTotalNum uint32, registerNodes []metadata.RegisteredNode) (map[storage.ShardID]metadata.RegisteredNode, error) + PickNode(ctx context.Context, config Config, shardIDs []storage.ShardID, registerNodes []metadata.RegisteredNode) (map[storage.ShardID]metadata.RegisteredNode, error) } type ConsistentUniformHashNodePicker struct { @@ -56,10 +75,10 @@ func filterExpiredNodes(nodes []metadata.RegisteredNode) map[string]metadata.Reg return aliveNodes } -func (p *ConsistentUniformHashNodePicker) PickNode(_ context.Context, shardIDs []storage.ShardID, shardTotalNum uint32, registerNodes []metadata.RegisteredNode) (map[storage.ShardID]metadata.RegisteredNode, error) { +func (p *ConsistentUniformHashNodePicker) PickNode(_ context.Context, config Config, shardIDs []storage.ShardID, registerNodes []metadata.RegisteredNode) (map[storage.ShardID]metadata.RegisteredNode, error) { aliveNodes := filterExpiredNodes(registerNodes) if 
len(aliveNodes) == 0 { - return nil, ErrPickNode.WithCausef("no alive node in cluster") + return nil, ErrNoAliveNodes.WithCausef("registerNodes:%+v", registerNodes) } mems := make([]hash.Member, 0, len(aliveNodes)) @@ -69,18 +88,19 @@ func (p *ConsistentUniformHashNodePicker) PickNode(_ context.Context, shardIDs [ } } - conf := hash.Config{ - ReplicationFactor: uniformHashReplicationFactor, - Hasher: hasher{}, + hashConf := hash.Config{ + ReplicationFactor: uniformHashReplicationFactor, + Hasher: hasher{}, + PartitionAffinities: config.genPartitionAffinities(), } - h, err := hash.BuildConsistentUniformHash(int(shardTotalNum), mems, conf) + h, err := hash.BuildConsistentUniformHash(int(config.NumTotalShards), mems, hashConf) if err != nil { - return nil, ErrPickNode.WithCause(err) + return nil, err } shardNodes := make(map[storage.ShardID]metadata.RegisteredNode, len(registerNodes)) for _, shardID := range shardIDs { - assert.Assert(shardID < storage.ShardID(shardTotalNum)) + assert.Assert(shardID < storage.ShardID(config.NumTotalShards)) partID := int(shardID) nodeName := h.GetPartitionOwner(partID).String() node, ok := aliveNodes[nodeName] diff --git a/server/coordinator/node_picker_test.go b/server/coordinator/scheduler/nodepicker/node_picker_test.go similarity index 82% rename from server/coordinator/node_picker_test.go rename to server/coordinator/scheduler/nodepicker/node_picker_test.go index 465773b3..a0e47fcc 100644 --- a/server/coordinator/node_picker_test.go +++ b/server/coordinator/scheduler/nodepicker/node_picker_test.go @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -package coordinator +package nodepicker_test import ( "context" @@ -10,6 +10,7 @@ import ( "time" "github.com/CeresDB/ceresmeta/server/cluster/metadata" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker" "github.com/CeresDB/ceresmeta/server/storage" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -25,10 +26,13 @@ func TestNodePicker(t *testing.T) { re := require.New(t) ctx := context.Background() - nodePicker := NewConsistentUniformHashNodePicker(zap.NewNop()) + nodePicker := nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()) var nodes []metadata.RegisteredNode - _, err := nodePicker.PickNode(ctx, []storage.ShardID{0}, defaultTotalShardNum, nodes) + config := nodepicker.Config{ + NumTotalShards: defaultTotalShardNum, + } + _, err := nodePicker.PickNode(ctx, config, []storage.ShardID{0}, nodes) re.Error(err) for i := 0; i < nodeLength; i++ { @@ -37,7 +41,7 @@ func TestNodePicker(t *testing.T) { ShardInfos: nil, }) } - _, err = nodePicker.PickNode(ctx, []storage.ShardID{0}, defaultTotalShardNum, nodes) + _, err = nodePicker.PickNode(ctx, config, []storage.ShardID{0}, nodes) re.Error(err) nodes = nodes[:0] @@ -47,7 +51,7 @@ func TestNodePicker(t *testing.T) { ShardInfos: nil, }) } - _, err = nodePicker.PickNode(ctx, []storage.ShardID{0}, defaultTotalShardNum, nodes) + _, err = nodePicker.PickNode(ctx, config, []storage.ShardID{0}, nodes) re.NoError(err) nodes = nodes[:0] @@ -58,7 +62,7 @@ func TestNodePicker(t *testing.T) { }) } nodes[selectOnlineNodeIndex].Node.LastTouchTime = uint64(time.Now().UnixMilli()) - shardNodeMapping, err := nodePicker.PickNode(ctx, []storage.ShardID{0}, defaultTotalShardNum, nodes) + shardNodeMapping, err := nodePicker.PickNode(ctx, config, []storage.ShardID{0}, nodes) re.NoError(err) re.Equal(strconv.Itoa(selectOnlineNodeIndex), shardNodeMapping[0].Node.Name) } @@ -67,7 +71,7 @@ func TestUniformity(t 
*testing.T) { re := require.New(t) ctx := context.Background() - nodePicker := NewConsistentUniformHashNodePicker(zap.NewNop()) + nodePicker := nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()) mapping := allocShards(ctx, nodePicker, 30, 256, re) maxShardNum := 256/30 + 1 for _, shards := range mapping { @@ -118,7 +122,7 @@ func TestUniformity(t *testing.T) { } } -func allocShards(ctx context.Context, nodePicker NodePicker, nodeNum int, shardNum int, re *require.Assertions) map[string][]int { +func allocShards(ctx context.Context, nodePicker nodepicker.NodePicker, nodeNum int, shardNum int, re *require.Assertions) map[string][]int { var nodes []metadata.RegisteredNode for i := 0; i < nodeNum; i++ { nodes = append(nodes, metadata.RegisteredNode{ @@ -131,7 +135,8 @@ func allocShards(ctx context.Context, nodePicker NodePicker, nodeNum int, shardN for i := 0; i < shardNum; i++ { shardIDs = append(shardIDs, storage.ShardID(i)) } - shardNodeMapping, err := nodePicker.PickNode(ctx, shardIDs, uint32(shardNum), nodes) + config := nodepicker.Config{NumTotalShards: uint32(shardNum)} + shardNodeMapping, err := nodePicker.PickNode(ctx, config, shardIDs, nodes) re.NoError(err) for shardID, node := range shardNodeMapping { mapping[node.Node.Name] = append(mapping[node.Node.Name], int(shardID)) diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced/scheduler.go similarity index 56% rename from server/coordinator/scheduler/rebalanced_shard_scheduler.go rename to server/coordinator/scheduler/rebalanced/scheduler.go index 0ecb3bed..eab45504 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced/scheduler.go @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -package scheduler +package rebalanced import ( "context" @@ -12,61 +12,98 @@ import ( "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker" "github.com/CeresDB/ceresmeta/server/storage" "go.uber.org/zap" + "golang.org/x/exp/maps" ) -type RebalancedShardScheduler struct { +type schedulerImpl struct { logger *zap.Logger factory *coordinator.Factory - nodePicker coordinator.NodePicker + nodePicker nodepicker.NodePicker procedureExecutingBatchSize uint32 - // Mutex is used to protect following fields. + // The lock is used to protect following fields. lock sync.Mutex // latestShardNodeMapping is used to record last stable shard topology, // when deployMode is true, rebalancedShardScheduler will recover cluster according to the topology. latestShardNodeMapping map[storage.ShardID]metadata.RegisteredNode - deployMode bool + // The `latestShardNodeMapping` will be used directly, if deployMode is set. + deployMode bool + // shardAffinityRule is used to control the shard distribution. 
+	shardAffinityRule map[storage.ShardID]scheduler.ShardAffinity
 }
 
-func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
-	return &RebalancedShardScheduler{
+func NewShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker nodepicker.NodePicker, procedureExecutingBatchSize uint32) scheduler.Scheduler {
+	return &schedulerImpl{
 		logger:                      logger,
 		factory:                     factory,
 		nodePicker:                  nodePicker,
 		procedureExecutingBatchSize: procedureExecutingBatchSize,
+		lock:                        sync.Mutex{},
+		shardAffinityRule:           make(map[storage.ShardID]scheduler.ShardAffinity),
+		latestShardNodeMapping:      make(map[storage.ShardID]metadata.RegisteredNode),
 	}
 }
 
-func (r *RebalancedShardScheduler) UpdateDeployMode(_ context.Context, enable bool) {
+func (r *schedulerImpl) Name() string {
+	return "rebalanced_scheduler"
+}
+
+func (r *schedulerImpl) UpdateDeployMode(_ context.Context, enable bool) {
 	r.updateDeployMode(enable)
 }
 
-func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
+func (r *schedulerImpl) AddShardAffinityRule(_ context.Context, rule scheduler.ShardAffinityRule) error {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	for _, shardAffinity := range rule.Affinities {
+		r.shardAffinityRule[shardAffinity.ShardID] = shardAffinity
+	}
+
+	return nil
+}
+
+func (r *schedulerImpl) RemoveShardAffinityRule(_ context.Context, shardID storage.ShardID) error {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	delete(r.shardAffinityRule, shardID)
+
+	return nil
+}
+
+func (r *schedulerImpl) ListShardAffinityRule(_ context.Context) (scheduler.ShardAffinityRule, error) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	affinities := make([]scheduler.ShardAffinity, 0, len(r.shardAffinityRule))
+	for _, affinity := range r.shardAffinityRule {
+		affinities = append(affinities, affinity)
+	}
+
+	return scheduler.ShardAffinityRule{Affinities: affinities}, nil
+}
+
+func (r *schedulerImpl) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (scheduler.ScheduleResult, error) {
 	// RebalancedShardScheduler can only be scheduled when the cluster is not empty.
 	if clusterSnapshot.Topology.ClusterView.State == storage.ClusterStateEmpty {
-		return ScheduleResult{}, nil
+		return scheduler.ScheduleResult{}, nil
 	}
 
 	var procedures []procedure.Procedure
 	var reasons strings.Builder
 
-	// TODO: Improve scheduling efficiency and verify whether the topology changes.
-	shardIDs := make([]storage.ShardID, 0, len(clusterSnapshot.Topology.ShardViewsMapping))
-	for shardID := range clusterSnapshot.Topology.ShardViewsMapping {
-		shardIDs = append(shardIDs, shardID)
-	}
-	numShards := uint32(len(clusterSnapshot.Topology.ShardViewsMapping))
 
 	// ShardNodeMapping only update when deployMode is false.
-	if !r.deployMode {
-		newShardNodeMapping, err := r.nodePicker.PickNode(ctx, shardIDs, numShards, clusterSnapshot.RegisteredNodes)
-		if err != nil {
-			return ScheduleResult{}, err
-		}
-		r.updateShardNodeMapping(newShardNodeMapping)
+	shardNodeMapping, err := r.generateLatestShardNodeMapping(ctx, clusterSnapshot)
+	if err != nil {
+		return scheduler.ScheduleResult{}, err
 	}
+	numShards := uint32(len(clusterSnapshot.Topology.ShardViewsMapping))
 
 	// Generate assigned shards mapping and transfer leader if node is changed.
assignedShardIDs := make(map[storage.ShardID]struct{}, numShards) for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes { @@ -77,7 +114,7 @@ func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot // Mark the shard assigned. assignedShardIDs[shardNode.ID] = struct{}{} - newLeaderNode, ok := r.latestShardNodeMapping[shardNode.ID] + newLeaderNode, ok := shardNodeMapping[shardNode.ID] assert.Assert(ok) if newLeaderNode.Node.Name != shardNode.NodeName { r.logger.Info("rebalanced shard scheduler try to assign shard to another node", zap.Uint64("shardID", uint64(shardNode.ID)), zap.String("originNode", shardNode.NodeName), zap.String("newNode", newLeaderNode.Node.Name)) @@ -88,7 +125,7 @@ func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot NewLeaderNodeName: newLeaderNode.Node.Name, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } procedures = append(procedures, p) @@ -116,7 +153,7 @@ func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot NewLeaderNodeName: node.Node.Name, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } procedures = append(procedures, p) @@ -125,7 +162,7 @@ func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot } if len(procedures) == 0 { - return ScheduleResult{}, nil + return scheduler.ScheduleResult{}, nil } batchProcedure, err := r.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ @@ -133,20 +170,40 @@ func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot BatchType: procedure.TransferLeader, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } - return ScheduleResult{batchProcedure, reasons.String()}, nil + return scheduler.ScheduleResult{Procedure: batchProcedure, Reason: reasons.String()}, nil } -func (r *RebalancedShardScheduler) updateShardNodeMapping(newShardNodeMapping map[storage.ShardID]metadata.RegisteredNode) { +func (r *schedulerImpl) generateLatestShardNodeMapping(ctx context.Context, snapshot metadata.Snapshot) (map[storage.ShardID]metadata.RegisteredNode, error) { + numShards := uint32(len(snapshot.Topology.ShardViewsMapping)) + // TODO: Improve scheduling efficiency and verify whether the topology changes. 
+ shardIDs := make([]storage.ShardID, 0, numShards) + for shardID := range snapshot.Topology.ShardViewsMapping { + shardIDs = append(shardIDs, shardID) + } + r.lock.Lock() defer r.lock.Unlock() + var err error + shardNodeMapping := r.latestShardNodeMapping + if !r.deployMode { + pickConfig := nodepicker.Config{ + NumTotalShards: numShards, + ShardAffinityRule: maps.Clone(r.shardAffinityRule), + } + shardNodeMapping, err = r.nodePicker.PickNode(ctx, pickConfig, shardIDs, snapshot.RegisteredNodes) + if err != nil { + return nil, err + } + r.latestShardNodeMapping = shardNodeMapping + } - r.latestShardNodeMapping = newShardNodeMapping + return shardNodeMapping, nil } -func (r *RebalancedShardScheduler) updateDeployMode(deployMode bool) { +func (r *schedulerImpl) updateDeployMode(deployMode bool) { r.lock.Lock() defer r.lock.Unlock() diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced/scheduler_test.go similarity index 79% rename from server/coordinator/scheduler/rebalanced_shard_scheduler_test.go rename to server/coordinator/scheduler/rebalanced/scheduler_test.go index 5691d4a7..8015b364 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go +++ b/server/coordinator/scheduler/rebalanced/scheduler_test.go @@ -1,6 +1,6 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -package scheduler_test +package rebalanced_test import ( "context" @@ -8,7 +8,8 @@ import ( "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure/test" - "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/rebalanced" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -19,7 +20,7 @@ func TestRebalancedScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) + s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) diff --git a/server/coordinator/scheduler/reopen_shard_scheduler.go b/server/coordinator/scheduler/reopen/scheduler.go similarity index 60% rename from server/coordinator/scheduler/reopen_shard_scheduler.go rename to server/coordinator/scheduler/reopen/scheduler.go index 31e72ab0..e008b34f 100644 --- a/server/coordinator/scheduler/reopen_shard_scheduler.go +++ b/server/coordinator/scheduler/reopen/scheduler.go @@ -1,6 +1,6 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -package scheduler +package reopen import ( "context" @@ -11,30 +11,47 @@ import ( "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" "github.com/CeresDB/ceresmeta/server/storage" ) -// ReopenShardScheduler used to reopen shards in status PartitionOpen. -type ReopenShardScheduler struct { +// schedulerImpl used to reopen shards in status PartitionOpen. 
+type schedulerImpl struct { factory *coordinator.Factory procedureExecutingBatchSize uint32 } -func NewReopenShardScheduler(factory *coordinator.Factory, procedureExecutingBatchSize uint32) ReopenShardScheduler { - return ReopenShardScheduler{ +func NewShardScheduler(factory *coordinator.Factory, procedureExecutingBatchSize uint32) scheduler.Scheduler { + return schedulerImpl{ factory: factory, procedureExecutingBatchSize: procedureExecutingBatchSize, } } -func (r ReopenShardScheduler) UpdateDeployMode(_ context.Context, _ bool) { +func (r schedulerImpl) Name() string { + return "reopen_scheduler" +} + +func (r schedulerImpl) UpdateDeployMode(_ context.Context, _ bool) { // ReopenShardScheduler do not need deployMode. } -func (r ReopenShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) { +func (r schedulerImpl) AddShardAffinityRule(_ context.Context, _ scheduler.ShardAffinityRule) error { + return nil +} + +func (r schedulerImpl) RemoveShardAffinityRule(_ context.Context, _ storage.ShardID) error { + return nil +} + +func (r schedulerImpl) ListShardAffinityRule(_ context.Context) (scheduler.ShardAffinityRule, error) { + return scheduler.ShardAffinityRule{Affinities: []scheduler.ShardAffinity{}}, nil +} + +func (r schedulerImpl) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (scheduler.ScheduleResult, error) { // ReopenShardScheduler can only be scheduled when the cluster is stable. if !clusterSnapshot.Topology.IsStable() { - return ScheduleResult{}, nil + return scheduler.ScheduleResult{}, nil } now := time.Now() @@ -57,7 +74,7 @@ func (r ReopenShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta NewLeaderNodeName: registeredNode.Node.Name, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } procedures = append(procedures, p) @@ -69,7 +86,7 @@ func (r ReopenShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta } if len(procedures) == 0 { - return ScheduleResult{}, nil + return scheduler.ScheduleResult{}, nil } batchProcedure, err := r.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ @@ -77,12 +94,12 @@ func (r ReopenShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta BatchType: procedure.TransferLeader, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } - return ScheduleResult{ - batchProcedure, - reasons.String(), + return scheduler.ScheduleResult{ + Procedure: batchProcedure, + Reason: reasons.String(), }, nil } diff --git a/server/coordinator/scheduler/reopen_shard_scheduler_test.go b/server/coordinator/scheduler/reopen/scheduler_test.go similarity index 91% rename from server/coordinator/scheduler/reopen_shard_scheduler_test.go rename to server/coordinator/scheduler/reopen/scheduler_test.go index 744fec52..000fc288 100644 --- a/server/coordinator/scheduler/reopen_shard_scheduler_test.go +++ b/server/coordinator/scheduler/reopen/scheduler_test.go @@ -1,6 +1,6 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
-package scheduler_test +package reopen_test import ( "context" @@ -9,7 +9,7 @@ import ( "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure/test" - "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/reopen" "github.com/CeresDB/ceresmeta/server/storage" "github.com/stretchr/testify/require" ) @@ -20,7 +20,7 @@ func TestReopenShardScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewReopenShardScheduler(procedureFactory, 1) + s := reopen.NewShardScheduler(procedureFactory, 1) emptyCluster := test.InitEmptyCluster(ctx, t) // ReopenShardScheduler should not schedule when cluster is not stable. diff --git a/server/coordinator/scheduler/scheduler.go b/server/coordinator/scheduler/scheduler.go index eee9ca71..2494855f 100644 --- a/server/coordinator/scheduler/scheduler.go +++ b/server/coordinator/scheduler/scheduler.go @@ -7,6 +7,7 @@ import ( "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/storage" ) type ScheduleResult struct { @@ -15,11 +16,23 @@ type ScheduleResult struct { Reason string } +type ShardAffinity struct { + ShardID storage.ShardID `json:"shardID"` + NumAllowedOtherShards uint `json:"numAllowedOtherShards"` +} + +type ShardAffinityRule struct { + Affinities []ShardAffinity +} + type Scheduler interface { + Name() string // Schedule will generate procedure based on current cluster snapshot, which will be submitted to ProcedureManager, and whether it is actually executed depends on the current state of ProcedureManager. Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) - // UpdateDeployMode is used to update deployMode for scheduler, // DeployMode means that the cluster topology is locked and the mapping between shards and nodes cannot be changed. UpdateDeployMode(ctx context.Context, enable bool) + AddShardAffinityRule(ctx context.Context, rule ShardAffinityRule) error + RemoveShardAffinityRule(ctx context.Context, shardID storage.ShardID) error + ListShardAffinityRule(ctx context.Context) (ShardAffinityRule, error) } diff --git a/server/coordinator/scheduler/error.go b/server/coordinator/scheduler/static/error.go similarity index 52% rename from server/coordinator/scheduler/error.go rename to server/coordinator/scheduler/static/error.go index 81963752..3c732279 100644 --- a/server/coordinator/scheduler/error.go +++ b/server/coordinator/scheduler/static/error.go @@ -1,7 +1,7 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
-package scheduler
+package static
 
 import "github.com/CeresDB/ceresmeta/pkg/coderr"
 
-var ErrInvalidTopologyType = coderr.NewCodeError(coderr.Internal, "invalid topology type")
+var ErrNotImplemented = coderr.NewCodeError(coderr.ErrNotImplemented, "not implemented")
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static/scheduler.go
similarity index 65%
rename from server/coordinator/scheduler/static_topology_shard_scheduler.go
rename to server/coordinator/scheduler/static/scheduler.go
index a804143b..ac35fb47 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler.go
+++ b/server/coordinator/scheduler/static/scheduler.go
@@ -1,6 +1,6 @@
 // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
-package scheduler
+package static
 
 import (
 	"context"
@@ -11,31 +11,49 @@ import (
 	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
 	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler"
+	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker"
 	"github.com/CeresDB/ceresmeta/server/storage"
 	"github.com/pkg/errors"
 )
 
-type StaticTopologyShardScheduler struct {
+type schedulerImpl struct {
 	factory                     *coordinator.Factory
-	nodePicker                  coordinator.NodePicker
+	nodePicker                  nodepicker.NodePicker
 	procedureExecutingBatchSize uint32
 }
 
-func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
-	return StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize}
+func NewShardScheduler(factory *coordinator.Factory, nodePicker nodepicker.NodePicker, procedureExecutingBatchSize uint32) scheduler.Scheduler {
+	return schedulerImpl{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize}
 }
 
-func (s StaticTopologyShardScheduler) UpdateDeployMode(_ context.Context, _ bool) {
+func (s schedulerImpl) Name() string {
+	return "static_scheduler"
+}
+
+func (s schedulerImpl) UpdateDeployMode(_ context.Context, _ bool) {
 	// StaticTopologyShardScheduler do not need deployMode.
} -func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) { +func (s schedulerImpl) AddShardAffinityRule(_ context.Context, _ scheduler.ShardAffinityRule) error { + return ErrNotImplemented.WithCausef("static topology scheduler doesn't support shard affinity") +} + +func (s schedulerImpl) RemoveShardAffinityRule(_ context.Context, _ storage.ShardID) error { + return ErrNotImplemented.WithCausef("static topology scheduler doesn't support shard affinity") +} + +func (s schedulerImpl) ListShardAffinityRule(_ context.Context) (scheduler.ShardAffinityRule, error) { + return scheduler.ShardAffinityRule{}, ErrNotImplemented.WithCausef("static topology scheduler doesn't support shard affinity") +} + +func (s schedulerImpl) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (scheduler.ScheduleResult, error) { var procedures []procedure.Procedure var reasons strings.Builder switch clusterSnapshot.Topology.ClusterView.State { case storage.ClusterStateEmpty: - return ScheduleResult{}, nil + return scheduler.ScheduleResult{}, nil case storage.ClusterStatePrepare: unassignedShardIds := make([]storage.ShardID, 0, len(clusterSnapshot.Topology.ShardViewsMapping)) for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping { @@ -45,10 +63,13 @@ func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnaps } unassignedShardIds = append(unassignedShardIds, shardView.ShardID) } + pickConfig := nodepicker.Config{ + NumTotalShards: uint32(len(clusterSnapshot.Topology.ShardViewsMapping)), + } // Assign shards - shardNodeMapping, err := s.nodePicker.PickNode(ctx, unassignedShardIds, uint32(len(clusterSnapshot.Topology.ShardViewsMapping)), clusterSnapshot.RegisteredNodes) + shardNodeMapping, err := s.nodePicker.PickNode(ctx, pickConfig, unassignedShardIds, clusterSnapshot.RegisteredNodes) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } for shardID, node := range shardNodeMapping { // Shard exists and ShardNode not exists. @@ -59,7 +80,7 @@ func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnaps NewLeaderNodeName: node.Node.Name, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } procedures = append(procedures, p) reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardID, node.Node.Name)) @@ -72,7 +93,7 @@ func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnaps shardNode := clusterSnapshot.Topology.ClusterView.ShardNodes[i] node, err := findOnlineNodeByName(shardNode.NodeName, clusterSnapshot.RegisteredNodes) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } if !containsShard(node.ShardInfos, shardNode.ID) { // Shard need to be reopened @@ -83,7 +104,7 @@ func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnaps NewLeaderNodeName: node.Node.Name, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } procedures = append(procedures, p) reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. 
", shardNode.ID, node.Node.Name)) @@ -95,7 +116,7 @@ func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnaps } if len(procedures) == 0 { - return ScheduleResult{}, nil + return scheduler.ScheduleResult{}, nil } batchProcedure, err := s.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ @@ -103,10 +124,10 @@ func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnaps BatchType: procedure.TransferLeader, }) if err != nil { - return ScheduleResult{}, err + return scheduler.ScheduleResult{}, err } - return ScheduleResult{batchProcedure, reasons.String()}, nil + return scheduler.ScheduleResult{Procedure: batchProcedure, Reason: reasons.String()}, nil } func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) { diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static/scheduler_test.go similarity index 81% rename from server/coordinator/scheduler/static_topology_shard_scheduler_test.go rename to server/coordinator/scheduler/static/scheduler_test.go index 9660a5a6..7c4addc3 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go +++ b/server/coordinator/scheduler/static/scheduler_test.go @@ -1,6 +1,6 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -package scheduler_test +package static_test import ( "context" @@ -8,7 +8,8 @@ import ( "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure/test" - "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler/static" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -19,7 +20,7 @@ func TestStaticTopologyScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) + s := static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) // EmptyCluster would be scheduled an empty procedure. 
emptyCluster := test.InitEmptyCluster(ctx, t) diff --git a/server/service/http/api.go b/server/service/http/api.go index 53cd211c..361b792c 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go @@ -18,6 +18,7 @@ import ( "github.com/CeresDB/ceresmeta/server/config" "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/scheduler" "github.com/CeresDB/ceresmeta/server/limiter" "github.com/CeresDB/ceresmeta/server/member" "github.com/CeresDB/ceresmeta/server/status" @@ -55,6 +56,9 @@ func (a *API) NewAPIRouter() *Router { router.Post("/cluster", wrap(a.createCluster, true, a.forwardClient)) router.Put(fmt.Sprintf("/cluster/:%s", clusterNameParam), wrap(a.updateCluster, true, a.forwardClient)) router.Get(fmt.Sprintf("/cluster/:%s/procedure", clusterNameParam), wrap(a.listProcedures, true, a.forwardClient)) + router.Get(fmt.Sprintf("/cluster/:%s/shardAffinities", clusterNameParam), wrap(a.listShardAffinities, true, a.forwardClient)) + router.Post(fmt.Sprintf("/cluster/:%s/shardAffinities", clusterNameParam), wrap(a.addShardAffinities, true, a.forwardClient)) + router.Del(fmt.Sprintf("/cluster/:%s/shardAffinities", clusterNameParam), wrap(a.removeShardAffinities, true, a.forwardClient)) router.Post("/table/query", wrap(a.queryTable, true, a.forwardClient)) router.Get(fmt.Sprintf("/cluster/:%s/deployMode", clusterNameParam), wrap(a.getDeployMode, true, a.forwardClient)) router.Put(fmt.Sprintf("/cluster/:%s/deployMode", clusterNameParam), wrap(a.updateDeployMode, true, a.forwardClient)) @@ -298,7 +302,7 @@ func (a *API) createCluster(req *http.Request) apiFuncResult { func (a *API) updateCluster(req *http.Request) apiFuncResult { clusterName := Param(req.Context(), clusterNameParam) if len(clusterName) == 0 { - return errResult(ErrParseRequest, "clusterName cloud not be empty") + return errResult(ErrParseRequest, "clusterName could not be empty") } var updateClusterRequest UpdateClusterRequest @@ -366,7 +370,7 @@ func (a *API) listProcedures(req *http.Request) apiFuncResult { ctx := req.Context() clusterName := Param(ctx, clusterNameParam) if len(clusterName) == 0 { - return errResult(ErrParseRequest, "clusterName cloud not be empty") + return errResult(ErrParseRequest, "clusterName could not be empty") } c, err := a.clusterManager.GetCluster(ctx, clusterName) @@ -383,6 +387,89 @@ func (a *API) listProcedures(req *http.Request) apiFuncResult { return okResult(infos) } +func (a *API) listShardAffinities(req *http.Request) apiFuncResult { + ctx := req.Context() + clusterName := Param(ctx, clusterNameParam) + if len(clusterName) == 0 { + return errResult(ErrParseRequest, "clusterName could not be empty") + } + + c, err := a.clusterManager.GetCluster(ctx, clusterName) + if err != nil { + return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error())) + } + + affinityRules, err := c.GetSchedulerManager().ListShardAffinityRules(ctx) + if err != nil { + return errResult(ErrListAffinityRules, fmt.Sprintf("err: %v", err)) + } + + return okResult(affinityRules) +} + +func (a *API) addShardAffinities(req *http.Request) apiFuncResult { + ctx := req.Context() + clusterName := Param(ctx, clusterNameParam) + if len(clusterName) == 0 { + return errResult(ErrParseRequest, "clusterName could not be empty") + } + + var affinities []scheduler.ShardAffinity + err := json.NewDecoder(req.Body).Decode(&affinities) + if err != nil { + log.Error("decode request body 
failed", zap.Error(err)) + return errResult(ErrParseRequest, err.Error()) + } + + log.Info("try to apply shard affinity rule", zap.String("cluster", clusterName), zap.String("affinity", fmt.Sprintf("%+v", affinities))) + + c, err := a.clusterManager.GetCluster(ctx, clusterName) + if err != nil { + return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error())) + } + + err = c.GetSchedulerManager().AddShardAffinityRule(ctx, scheduler.ShardAffinityRule{Affinities: affinities}) + if err != nil { + log.Error("failed to apply shard affinity rule", zap.String("cluster", clusterName), zap.String("affinity", fmt.Sprintf("%+v", affinities))) + return errResult(ErrAddAffinityRule, fmt.Sprintf("err: %v", err)) + } + + log.Info("finish applying shard affinity rule", zap.String("cluster", clusterName), zap.String("rules", fmt.Sprintf("%+v", affinities))) + + return okResult(nil) +} + +func (a *API) removeShardAffinities(req *http.Request) apiFuncResult { + ctx := req.Context() + clusterName := Param(ctx, clusterNameParam) + if len(clusterName) == 0 { + return errResult(ErrParseRequest, "clusterName could not be empty") + } + + var decodedReq RemoveShardAffinitiesRequest + err := json.NewDecoder(req.Body).Decode(&decodedReq) + if err != nil { + log.Error("decode request body failed", zap.Error(err)) + return errResult(ErrParseRequest, err.Error()) + } + + c, err := a.clusterManager.GetCluster(ctx, clusterName) + if err != nil { + return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error())) + } + + for _, shardID := range decodedReq.ShardIDs { + log.Info("try to remove shard affinity rule", zap.String("cluster", clusterName), zap.Int("shardID", int(shardID))) + err := c.GetSchedulerManager().RemoveShardAffinityRule(ctx, shardID) + if err != nil { + log.Error("failed to remove shard affinity rule", zap.String("cluster", clusterName), zap.Int("shardID", int(shardID)), zap.Error(err)) + return errResult(ErrRemoveAffinityRule, fmt.Sprintf("err: %s", err)) + } + } + + return okResult(nil) +} + func (a *API) queryTable(r *http.Request) apiFuncResult { var req QueryTableRequest err := json.NewDecoder(r.Body).Decode(&req) diff --git a/server/service/http/error.go b/server/service/http/error.go index c60e9fd1..473630ca 100644 --- a/server/service/http/error.go +++ b/server/service/http/error.go @@ -5,23 +5,26 @@ package http import "github.com/CeresDB/ceresmeta/pkg/coderr" var ( - ErrParseRequest = coderr.NewCodeError(coderr.BadRequest, "parse request params") - ErrTable = coderr.NewCodeError(coderr.Internal, "table") - ErrRoute = coderr.NewCodeError(coderr.Internal, "route table") - ErrGetNodeShards = coderr.NewCodeError(coderr.Internal, "get node shards") - ErrCreateProcedure = coderr.NewCodeError(coderr.Internal, "create procedure") - ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit procedure") - ErrGetCluster = coderr.NewCodeError(coderr.Internal, "get cluster") - ErrAllocShardID = coderr.NewCodeError(coderr.Internal, "alloc shard id") - ErrForwardToLeader = coderr.NewCodeError(coderr.Internal, "forward to leader") - ErrParseLeaderAddr = coderr.NewCodeError(coderr.Internal, "parse leader addr") - ErrHealthCheck = coderr.NewCodeError(coderr.Internal, "server health check") - ErrParseTopology = coderr.NewCodeError(coderr.Internal, "parse topology type") - ErrUpdateFlowLimiter = coderr.NewCodeError(coderr.Internal, "update flow limiter") - ErrGetDeployMode = coderr.NewCodeError(coderr.Internal, "get deploy mode") - 
ErrUpdateDeployMode = coderr.NewCodeError(coderr.Internal, "update deploy mode") - ErrAddLearner = coderr.NewCodeError(coderr.Internal, "add member as learner") - ErrListMembers = coderr.NewCodeError(coderr.Internal, "get member list") - ErrRemoveMembers = coderr.NewCodeError(coderr.Internal, "remove member") - ErrGetMember = coderr.NewCodeError(coderr.Internal, "get member") + ErrParseRequest = coderr.NewCodeError(coderr.BadRequest, "parse request params") + ErrTable = coderr.NewCodeError(coderr.Internal, "table") + ErrRoute = coderr.NewCodeError(coderr.Internal, "route table") + ErrGetNodeShards = coderr.NewCodeError(coderr.Internal, "get node shards") + ErrCreateProcedure = coderr.NewCodeError(coderr.Internal, "create procedure") + ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit procedure") + ErrGetCluster = coderr.NewCodeError(coderr.Internal, "get cluster") + ErrAllocShardID = coderr.NewCodeError(coderr.Internal, "alloc shard id") + ErrForwardToLeader = coderr.NewCodeError(coderr.Internal, "forward to leader") + ErrParseLeaderAddr = coderr.NewCodeError(coderr.Internal, "parse leader addr") + ErrHealthCheck = coderr.NewCodeError(coderr.Internal, "server health check") + ErrParseTopology = coderr.NewCodeError(coderr.Internal, "parse topology type") + ErrUpdateFlowLimiter = coderr.NewCodeError(coderr.Internal, "update flow limiter") + ErrGetDeployMode = coderr.NewCodeError(coderr.Internal, "get deploy mode") + ErrUpdateDeployMode = coderr.NewCodeError(coderr.Internal, "update deploy mode") + ErrAddLearner = coderr.NewCodeError(coderr.Internal, "add member as learner") + ErrListMembers = coderr.NewCodeError(coderr.Internal, "get member list") + ErrRemoveMembers = coderr.NewCodeError(coderr.Internal, "remove member") + ErrGetMember = coderr.NewCodeError(coderr.Internal, "get member") + ErrListAffinityRules = coderr.NewCodeError(coderr.Internal, "list affinity rules") + ErrAddAffinityRule = coderr.NewCodeError(coderr.Internal, "add affinity rule") + ErrRemoveAffinityRule = coderr.NewCodeError(coderr.Internal, "remove affinity rule") ) diff --git a/server/service/http/types.go b/server/service/http/types.go index a352839c..c6f0f4b7 100644 --- a/server/service/http/types.go +++ b/server/service/http/types.go @@ -141,3 +141,7 @@ type UpdateFlowLimiterRequest struct { type UpdateDeployModeRequest struct { Enable bool `json:"enable"` } + +type RemoveShardAffinitiesRequest struct { + ShardIDs []storage.ShardID `json:"shardIDs"` +}
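
For reviewers, a minimal sketch of how the new affinity plumbing is exercised, written only against the signatures introduced in this patch (nodepicker.Config, NodePicker.PickNode, scheduler.ShardAffinity). The package name, shard counts and the helper pickWithAffinity are illustrative assumptions, and the meaning attributed to NumAllowedOtherShards is inferred from the field name rather than stated anywhere in the change:

// Example only (not part of the patch): handing an affinity rule to the
// consistent-uniform-hash node picker.
package example

import (
	"context"

	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler"
	"github.com/CeresDB/ceresmeta/server/coordinator/scheduler/nodepicker"
	"github.com/CeresDB/ceresmeta/server/storage"
	"go.uber.org/zap"
)

// pickWithAffinity is a hypothetical helper; `nodes` is assumed to come from a
// cluster snapshot, as in the rebalanced scheduler above.
func pickWithAffinity(ctx context.Context, nodes []metadata.RegisteredNode) (map[storage.ShardID]metadata.RegisteredNode, error) {
	picker := nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop())

	// Judging by the field name, NumAllowedOtherShards bounds how many other
	// shards may share the node; 0 asks the picker to keep shard 0 alone.
	rule := map[storage.ShardID]scheduler.ShardAffinity{
		0: {ShardID: 0, NumAllowedOtherShards: 0},
	}

	config := nodepicker.Config{
		NumTotalShards:    4,
		ShardAffinityRule: rule,
	}
	shardIDs := []storage.ShardID{0, 1, 2, 3}

	return picker.PickNode(ctx, config, shardIDs, nodes)
}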
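
The new debug endpoints exchange JSON shaped by scheduler.ShardAffinity and RemoveShardAffinitiesRequest above. A hypothetical client-side walk-through follows; the host, port, route prefix and cluster name are placeholders that depend on how the API router is mounted, and only the /cluster/{name}/shardAffinities suffix and the JSON field names come from this patch:

// Example only (not part of the patch): exercising the shardAffinities debug API.
package example

import (
	"bytes"
	"fmt"
	"net/http"
)

// base is a placeholder; replace host, port, prefix and cluster name with the
// address the ceresmeta HTTP service actually listens on.
const base = "http://127.0.0.1:8080/api/v1/cluster/defaultCluster/shardAffinities"

func manageAffinities() error {
	// Add a rule: shard 3 should not share its node with any other shard.
	addBody := []byte(`[{"shardID": 3, "numAllowedOtherShards": 0}]`)
	resp, err := http.Post(base, "application/json", bytes.NewReader(addBody))
	if err != nil {
		return err
	}
	resp.Body.Close()

	// List the rules currently known to the scheduler manager.
	resp, err = http.Get(base)
	if err != nil {
		return err
	}
	fmt.Println("list:", resp.Status)
	resp.Body.Close()

	// Remove the rule again; the DELETE body names the shard IDs to drop.
	delBody := []byte(`{"shardIDs": [3]}`)
	req, err := http.NewRequest(http.MethodDelete, base, bytes.NewReader(delBody))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	fmt.Println("delete:", resp.Status)
	resp.Body.Close()

	return nil
}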