From 543a4a2bf7b4e528be285ea15795e61c74dcd92b Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 22 Nov 2021 22:01:08 +0800 Subject: [PATCH] feat: dfdaemon get scheduler list dynamically from manager (#812) * feat: dfdaemon get scheduler list dynamically from manager Signed-off-by: Gaius --- api/manager/docs.go | 6 + api/manager/swagger.json | 6 + api/manager/swagger.yaml | 4 + client/config/dynconfig.go | 199 ++++++++ client/config/dynconfig_test.go | 475 ++++++++++++++++++ client/config/mocks/manager_client_mock.go | 122 +++++ client/config/peerhost.go | 33 +- client/config/peerhost_darwin.go | 5 + client/config/peerhost_linux.go | 5 + client/config/peerhost_test.go | 6 + client/config/testdata/config/daemon.yaml | 5 + client/daemon/daemon.go | 92 +++- client/daemon/peer/peertask_dummy.go | 4 + .../test/mock/scheduler/scheduler_client.go | 55 +- deploy/helm-charts | 2 +- docs/en/api-reference/api-reference.md | 2 + docs/en/deployment/configuration/dfget.yaml | 14 +- docs/en/deployment/configuration/manager.yaml | 23 +- docs/zh-CN/api-reference/api-reference.md | 2 + .../zh-CN/deployment/configuration/dfget.yaml | 16 +- .../deployment/configuration/manager.yaml | 39 +- internal/dynconfig/dynconfig_manager.go | 2 +- manager/cache/cache.go | 4 +- manager/job/preheat.go | 2 +- manager/searcher/searcher.go | 22 +- manager/service/cdn_cluster.go | 14 +- manager/service/service_grpc.go | 10 +- manager/types/cdn_cluster.go | 14 +- pkg/rpc/client.go | 2 + pkg/rpc/scheduler/client/client.go | 2 + scheduler/config/dynconfig.go | 12 +- scheduler/config/dynconfig_test.go | 6 +- scheduler/job/job.go | 2 +- scheduler/scheduler.go | 13 +- 34 files changed, 1106 insertions(+), 114 deletions(-) create mode 100644 client/config/dynconfig.go create mode 100644 client/config/dynconfig_test.go create mode 100644 client/config/mocks/manager_client_mock.go diff --git a/api/manager/docs.go b/api/manager/docs.go index e7744b9ac..cd0518388 100644 --- a/api/manager/docs.go +++ 
b/api/manager/docs.go @@ -3945,6 +3945,9 @@ var doc = `{ "config": { "$ref": "#/definitions/types.CDNClusterConfig" }, + "is_default": { + "type": "boolean" + }, "name": { "type": "string" } @@ -4348,6 +4351,9 @@ var doc = `{ "config": { "$ref": "#/definitions/types.CDNClusterConfig" }, + "is_default": { + "type": "boolean" + }, "name": { "type": "string" } diff --git a/api/manager/swagger.json b/api/manager/swagger.json index e5ece420e..829403a0c 100644 --- a/api/manager/swagger.json +++ b/api/manager/swagger.json @@ -3931,6 +3931,9 @@ "config": { "$ref": "#/definitions/types.CDNClusterConfig" }, + "is_default": { + "type": "boolean" + }, "name": { "type": "string" } @@ -4334,6 +4337,9 @@ "config": { "$ref": "#/definitions/types.CDNClusterConfig" }, + "is_default": { + "type": "boolean" + }, "name": { "type": "string" } diff --git a/api/manager/swagger.yaml b/api/manager/swagger.yaml index 52a3893d7..9f0434e34 100644 --- a/api/manager/swagger.yaml +++ b/api/manager/swagger.yaml @@ -317,6 +317,8 @@ definitions: type: string config: $ref: '#/definitions/types.CDNClusterConfig' + is_default: + type: boolean name: type: string required: @@ -591,6 +593,8 @@ definitions: type: string config: $ref: '#/definitions/types.CDNClusterConfig' + is_default: + type: boolean name: type: string type: object diff --git a/client/config/dynconfig.go b/client/config/dynconfig.go new file mode 100644 index 000000000..5f4586814 --- /dev/null +++ b/client/config/dynconfig.go @@ -0,0 +1,199 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "os" + "path/filepath" + "time" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/dfpath" + internaldynconfig "d7y.io/dragonfly/v2/internal/dynconfig" + "d7y.io/dragonfly/v2/manager/searcher" + "d7y.io/dragonfly/v2/pkg/rpc/manager" + managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" +) + +var ( + // cachePath is the dynconfig cache path + cachePath = filepath.Join(dfpath.DefaultCacheDir, "daemon_dynconfig") + + // watchInterval is the dynconfig watch interval + watchInterval = 5 * time.Second +) + +type DynconfigData struct { + Schedulers []*manager.Scheduler +} + +type Dynconfig interface { + // GetSchedulers gets the scheduler list of the dynamic config from manager. + GetSchedulers() ([]*manager.Scheduler, error) + + // Get the dynamic config from manager. + Get() (*DynconfigData, error) + + // Register allows an instance to register itself to listen/observe events. + Register(Observer) + + // Deregister allows an instance to remove itself from the collection of observers/listeners. + Deregister(Observer) + + // Notify publishes new events to listeners. + Notify() error + + // Serve the dynconfig listening service. + Serve() error + + // Stop the dynconfig listening service. + Stop() error +} + +type Observer interface { + // OnNotify allows an event to be "published" to interface implementations. 
+ OnNotify(*DynconfigData) +} + +type dynconfig struct { + *internaldynconfig.Dynconfig + observers map[Observer]struct{} + done chan bool +} + +func NewDynconfig(managerClient internaldynconfig.ManagerClient, expire time.Duration) (Dynconfig, error) { + client, err := internaldynconfig.New( + internaldynconfig.ManagerSourceType, + internaldynconfig.WithManagerClient(managerClient), + internaldynconfig.WithExpireTime(expire), + internaldynconfig.WithCachePath(cachePath), + ) + if err != nil { + return nil, err + } + + return &dynconfig{ + observers: map[Observer]struct{}{}, + done: make(chan bool), + Dynconfig: client, + }, nil +} + +func (d *dynconfig) GetSchedulers() ([]*manager.Scheduler, error) { + data, err := d.Get() + if err != nil { + return nil, err + } + + return data.Schedulers, nil +} + +func (d *dynconfig) Get() (*DynconfigData, error) { + var data DynconfigData + if err := d.Unmarshal(&data); err != nil { + return nil, err + } + + return &data, nil +} + +func (d *dynconfig) Register(l Observer) { + d.observers[l] = struct{}{} +} + +func (d *dynconfig) Deregister(l Observer) { + delete(d.observers, l) +} + +func (d *dynconfig) Notify() error { + data, err := d.Get() + if err != nil { + return err + } + + for o := range d.observers { + o.OnNotify(data) + } + + return nil +} + +func (d *dynconfig) Serve() error { + if err := d.Notify(); err != nil { + return err + } + + go d.watch() + + return nil +} + +func (d *dynconfig) watch() { + tick := time.NewTicker(watchInterval) + + for { + select { + case <-tick.C: + if err := d.Notify(); err != nil { + logger.Error("dynconfig notify failed", err) + } + case <-d.done: + return + } + } +} + +func (d *dynconfig) Stop() error { + close(d.done) + if err := os.Remove(cachePath); err != nil { + return err + } + + return nil +} + +type managerClient struct { + managerclient.Client + hostOption HostOption +} + +// New the manager client used by dynconfig +func NewManagerClient(client managerclient.Client, hostOption 
HostOption) internaldynconfig.ManagerClient { + return &managerClient{ + Client: client, + hostOption: hostOption, + } +} + +func (mc *managerClient) Get() (interface{}, error) { + schedulers, err := mc.ListSchedulers(&manager.ListSchedulersRequest{ + SourceType: manager.SourceType_CLIENT_SOURCE, + HostName: mc.hostOption.Hostname, + Ip: mc.hostOption.ListenIP, + HostInfo: map[string]string{ + searcher.ConditionSecurityDomain: mc.hostOption.SecurityDomain, + searcher.ConditionIDC: mc.hostOption.IDC, + searcher.ConditionNetTopology: mc.hostOption.NetTopology, + searcher.ConditionLocation: mc.hostOption.Location, + }, + }) + if err != nil { + return nil, err + } + + return schedulers, nil +} diff --git a/client/config/dynconfig_test.go b/client/config/dynconfig_test.go new file mode 100644 index 000000000..af73ae946 --- /dev/null +++ b/client/config/dynconfig_test.go @@ -0,0 +1,475 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package config + +import ( + "os" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/client/config/mocks" + "d7y.io/dragonfly/v2/pkg/rpc/manager" +) + +func TestDynconfigNewDynconfig(t *testing.T) { + tests := []struct { + name string + expire time.Duration + hostOption HostOption + cleanFileCache func(t *testing.T) + mock func(m *mocks.MockClientMockRecorder) + expect func(t *testing.T, err error) + }{ + { + name: "new dynconfig succeeded", + expire: 10 * time.Second, + hostOption: HostOption{ + Hostname: "foo", + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder) { + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "new dynconfig without empty host option", + expire: 10 * time.Millisecond, + hostOption: HostOption{}, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder) { + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "new dynconfig with list scheduler error", + expire: 10 * time.Millisecond, + hostOption: HostOption{}, + cleanFileCache: func(t *testing.T) {}, + mock: func(m *mocks.MockClientMockRecorder) { + m.ListSchedulers(gomock.Any()).Return(nil, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.Errorf(err, "foo") + }, + }, + { + name: "new dynconfig without expire time", + expire: 0, + hostOption: HostOption{ + Hostname: "foo", + }, + cleanFileCache: func(t *testing.T) {}, + mock: 
func(m *mocks.MockClientMockRecorder) {}, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.Errorf(err, "missing parameter Expire, use method WithExpireTime to assign") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockManagerClient := mocks.NewMockClient(ctl) + tc.mock(mockManagerClient.EXPECT()) + _, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire) + tc.expect(t, err) + tc.cleanFileCache(t) + }) + } +} + +func TestDynconfigGet(t *testing.T) { + tests := []struct { + name string + expire time.Duration + hostOption HostOption + data *DynconfigData + sleep func() + cleanFileCache func(t *testing.T) + mock func(m *mocks.MockClientMockRecorder, data *DynconfigData) + expect func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) + }{ + { + name: "get dynconfig cache data succeeded", + expire: 10 * time.Second, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler{ + { + HostName: "foo", + }, + }, + }, + sleep: func() {}, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{ + Schedulers: []*manager.Scheduler{ + { + HostName: data.Schedulers[0].HostName, + }, + }, + }, nil).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.Get() + assert.NoError(err) + assert.EqualValues(result, data) + }, + }, + { + name: "get dynconfig data succeeded", + expire: 10 * time.Millisecond, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler{ + { + HostName: "foo", + }, + }, + }, + sleep: func() { + time.Sleep(100 * 
time.Millisecond) + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1), + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{ + Schedulers: []*manager.Scheduler{ + { + HostName: data.Schedulers[0].HostName, + }, + }, + }, nil).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.Get() + assert.NoError(err) + assert.EqualValues(result, data) + }, + }, + { + name: "list schedulers error", + expire: 10 * time.Millisecond, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler{ + { + HostName: "foo", + }, + }, + }, + sleep: func() { + time.Sleep(100 * time.Millisecond) + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{ + Schedulers: []*manager.Scheduler{ + { + HostName: data.Schedulers[0].HostName, + }, + }, + }, nil).Times(1), + m.ListSchedulers(gomock.Any()).Return(nil, errors.New("foo")).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.Get() + assert.NoError(err) + assert.EqualValues(result, data) + }, + }, + { + name: "list schedulers empty", + expire: 10 * time.Millisecond, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler(nil), + }, + sleep: func() { + time.Sleep(100 * time.Millisecond) + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + 
}, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1), + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.Get() + assert.NoError(err) + assert.EqualValues(result, data) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockManagerClient := mocks.NewMockClient(ctl) + tc.mock(mockManagerClient.EXPECT(), tc.data) + dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire) + if err != nil { + t.Fatal(err) + } + + tc.sleep() + tc.expect(t, dynconfig, tc.data) + tc.cleanFileCache(t) + }) + } +} + +func TestDynconfigGetSchedulers(t *testing.T) { + tests := []struct { + name string + expire time.Duration + hostOption HostOption + data *DynconfigData + sleep func() + cleanFileCache func(t *testing.T) + mock func(m *mocks.MockClientMockRecorder, data *DynconfigData) + expect func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) + }{ + { + name: "get cache schedulers succeeded", + expire: 10 * time.Second, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler{ + { + HostName: "foo", + }, + }, + }, + sleep: func() {}, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{ + Schedulers: []*manager.Scheduler{ + { + HostName: data.Schedulers[0].HostName, + }, + }, + }, nil).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) 
+ result, err := dynconfig.GetSchedulers() + assert.NoError(err) + assert.EqualValues(result, data.Schedulers) + }, + }, + { + name: "get schedulers succeeded", + expire: 10 * time.Millisecond, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler{ + { + HostName: "foo", + }, + }, + }, + sleep: func() { + time.Sleep(100 * time.Millisecond) + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1), + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{ + Schedulers: []*manager.Scheduler{ + { + HostName: data.Schedulers[0].HostName, + }, + }, + }, nil).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.GetSchedulers() + assert.NoError(err) + assert.EqualValues(result, data.Schedulers) + }, + }, + { + name: "list schedulers error", + expire: 10 * time.Millisecond, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler{ + { + HostName: "foo", + }, + }, + }, + sleep: func() { + time.Sleep(100 * time.Millisecond) + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{ + Schedulers: []*manager.Scheduler{ + { + HostName: data.Schedulers[0].HostName, + }, + }, + }, nil).Times(1), + m.ListSchedulers(gomock.Any()).Return(nil, errors.New("foo")).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.GetSchedulers() + 
assert.NoError(err) + assert.EqualValues(result, data.Schedulers) + }, + }, + { + name: "list schedulers empty", + expire: 10 * time.Millisecond, + hostOption: HostOption{ + Hostname: "foo", + }, + data: &DynconfigData{ + Schedulers: []*manager.Scheduler(nil), + }, + sleep: func() { + time.Sleep(100 * time.Millisecond) + }, + cleanFileCache: func(t *testing.T) { + if err := os.Remove(cachePath); err != nil { + t.Fatal(err) + } + }, + mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) { + gomock.InOrder( + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1), + m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1), + ) + }, + expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) { + assert := assert.New(t) + result, err := dynconfig.GetSchedulers() + assert.NoError(err) + assert.EqualValues(result, data.Schedulers) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockManagerClient := mocks.NewMockClient(ctl) + tc.mock(mockManagerClient.EXPECT(), tc.data) + dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire) + if err != nil { + t.Fatal(err) + } + + tc.sleep() + tc.expect(t, dynconfig, tc.data) + tc.cleanFileCache(t) + }) + } +} diff --git a/client/config/mocks/manager_client_mock.go b/client/config/mocks/manager_client_mock.go new file mode 100644 index 000000000..45f5d10ec --- /dev/null +++ b/client/config/mocks/manager_client_mock.go @@ -0,0 +1,122 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/pkg/rpc/manager/client (interfaces: Client) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + time "time" + + manager "d7y.io/dragonfly/v2/pkg/rpc/manager" + gomock "github.com/golang/mock/gomock" +) + +// MockClient is a mock of Client interface. 
+type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) +} + +// GetScheduler mocks base method. +func (m *MockClient) GetScheduler(arg0 *manager.GetSchedulerRequest) (*manager.Scheduler, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetScheduler", arg0) + ret0, _ := ret[0].(*manager.Scheduler) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetScheduler indicates an expected call of GetScheduler. +func (mr *MockClientMockRecorder) GetScheduler(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScheduler", reflect.TypeOf((*MockClient)(nil).GetScheduler), arg0) +} + +// KeepAlive mocks base method. +func (m *MockClient) KeepAlive(arg0 time.Duration, arg1 *manager.KeepAliveRequest) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "KeepAlive", arg0, arg1) +} + +// KeepAlive indicates an expected call of KeepAlive. 
+func (mr *MockClientMockRecorder) KeepAlive(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeepAlive", reflect.TypeOf((*MockClient)(nil).KeepAlive), arg0, arg1) +} + +// ListSchedulers mocks base method. +func (m *MockClient) ListSchedulers(arg0 *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListSchedulers", arg0) + ret0, _ := ret[0].(*manager.ListSchedulersResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSchedulers indicates an expected call of ListSchedulers. +func (mr *MockClientMockRecorder) ListSchedulers(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSchedulers", reflect.TypeOf((*MockClient)(nil).ListSchedulers), arg0) +} + +// UpdateCDN mocks base method. +func (m *MockClient) UpdateCDN(arg0 *manager.UpdateCDNRequest) (*manager.CDN, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateCDN", arg0) + ret0, _ := ret[0].(*manager.CDN) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateCDN indicates an expected call of UpdateCDN. +func (mr *MockClientMockRecorder) UpdateCDN(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateCDN", reflect.TypeOf((*MockClient)(nil).UpdateCDN), arg0) +} + +// UpdateScheduler mocks base method. +func (m *MockClient) UpdateScheduler(arg0 *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateScheduler", arg0) + ret0, _ := ret[0].(*manager.Scheduler) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateScheduler indicates an expected call of UpdateScheduler. 
+func (mr *MockClientMockRecorder) UpdateScheduler(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateScheduler", reflect.TypeOf((*MockClient)(nil).UpdateScheduler), arg0) +} diff --git a/client/config/peerhost.go b/client/config/peerhost.go index 39a906b3f..8d3c599d0 100644 --- a/client/config/peerhost.go +++ b/client/config/peerhost.go @@ -100,6 +100,11 @@ func (p *DaemonOption) Convert() error { p.Host.AdvertiseIP = ip.String() } + // ScheduleTimeout should not be greater than AliveTime + if p.AliveTime.Duration > 0 && p.Scheduler.ScheduleTimeout.Duration > p.AliveTime.Duration { + p.Scheduler.ScheduleTimeout.Duration = p.AliveTime.Duration - time.Second + } + return nil } @@ -107,24 +112,40 @@ func (p *DaemonOption) Validate() error { if len(p.Scheduler.NetAddrs) == 0 && stringutils.IsBlank(p.ConfigServer) { return errors.New("empty schedulers and config server is not specified") } - // ScheduleTimeout should not great then AliveTime - if p.AliveTime.Duration > 0 && p.Scheduler.ScheduleTimeout.Duration > p.AliveTime.Duration { - p.Scheduler.ScheduleTimeout.Duration = p.AliveTime.Duration - time.Second + + if p.Scheduler.Manager.Enable { + if p.Scheduler.Manager.Addr == "" { + return errors.New("manager addr is not specified") + } + + if p.Scheduler.Manager.RefreshInterval == 0 { + return errors.New("manager refreshInterval is not specified") + } } + return nil } type SchedulerOption struct { + // Manager is to get the scheduler configuration remotely + Manager ManagerOption `mapstructure:"manager" yaml:"manager"` // NetAddrs is scheduler addresses. NetAddrs []dfnet.NetAddr `mapstructure:"netAddrs" yaml:"netAddrs"` - // ScheduleTimeout is request timeout. 
ScheduleTimeout clientutil.Duration `mapstructure:"scheduleTimeout" yaml:"scheduleTimeout"` - // DisableAutoBackSource indicates not back source normally, only scheduler says back source DisableAutoBackSource bool `mapstructure:"disableAutoBackSource" yaml:"disableAutoBackSource"` } +type ManagerOption struct { + // Enable enables getting configuration from manager + Enable bool `mapstructure:"enable" yaml:"enable"` + // Addr is the manager address + Addr string `mapstructure:"addr" yaml:"addr"` + // RefreshInterval is the refresh interval + RefreshInterval time.Duration `mapstructure:"refreshInterval" yaml:"refreshInterval"` +} + type HostOption struct { // SecurityDomain is the security domain SecurityDomain string `mapstructure:"securityDomain" yaml:"securityDomain"` @@ -134,6 +155,8 @@ type HostOption struct { IDC string `mapstructure:"idc" yaml:"idc"` // Peerhost net topology for scheduler NetTopology string `mapstructure:"netTopology" yaml:"netTopology"` + // Hostname is daemon host name + Hostname string `mapstructure:"hostname" yaml:"hostname"` // The listen ip for all tcp services of daemon ListenIP string `mapstructure:"listenIP" yaml:"listenIP"` // The ip report to scheduler, normal same with listen ip diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index e0d771399..58029cb20 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -44,6 +44,10 @@ var peerHostConfig = DaemonOption{ GCInterval: clientutil.Duration{Duration: DefaultGCInterval}, KeepStorage: false, Scheduler: SchedulerOption{ + Manager: ManagerOption{ + Enable: false, + RefreshInterval: 5 * time.Minute, + }, NetAddrs: []dfnet.NetAddr{ { Type: dfnet.TCP, @@ -53,6 +57,7 @@ var peerHostConfig = DaemonOption{ ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout}, }, Host: HostOption{ + Hostname: iputils.HostName, ListenIP: net.IPv4zero.String(), AdvertiseIP: iputils.HostIP, SecurityDomain: "", diff --git 
a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index 6109f63d1..be861e218 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -44,6 +44,10 @@ var peerHostConfig = DaemonOption{ GCInterval: clientutil.Duration{Duration: DefaultGCInterval}, KeepStorage: false, Scheduler: SchedulerOption{ + Manager: ManagerOption{ + Enable: false, + RefreshInterval: 5 * time.Minute, + }, NetAddrs: []dfnet.NetAddr{ { Type: dfnet.TCP, @@ -53,6 +57,7 @@ var peerHostConfig = DaemonOption{ ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout}, }, Host: HostOption{ + Hostname: iputils.HostName, ListenIP: "0.0.0.0", AdvertiseIP: iputils.HostIP, SecurityDomain: "", diff --git a/client/config/peerhost_test.go b/client/config/peerhost_test.go index 4f36df613..a452befd7 100644 --- a/client/config/peerhost_test.go +++ b/client/config/peerhost_test.go @@ -228,6 +228,11 @@ func TestPeerHostOption_Load(t *testing.T) { WorkHome: "/tmp/dragonfly/dfdaemon/", KeepStorage: false, Scheduler: SchedulerOption{ + Manager: ManagerOption{ + Enable: false, + Addr: "127.0.0.1:65003", + RefreshInterval: 5 * time.Minute, + }, NetAddrs: []dfnet.NetAddr{ { Type: dfnet.TCP, @@ -239,6 +244,7 @@ func TestPeerHostOption_Load(t *testing.T) { }, }, Host: HostOption{ + Hostname: "d7y.io", SecurityDomain: "d7y.io", Location: "0.0.0.0", IDC: "d7y", diff --git a/client/config/testdata/config/daemon.yaml b/client/config/testdata/config/daemon.yaml index 49a2a20ed..0bf23bf49 100644 --- a/client/config/testdata/config/daemon.yaml +++ b/client/config/testdata/config/daemon.yaml @@ -4,12 +4,17 @@ dataDir: /tmp/dragonfly/dfdaemon/ workHome: /tmp/dragonfly/dfdaemon/ keepStorage: false scheduler: + manager: + enable: false + addr: "127.0.0.1:65003" + refreshInterval: 5m netAddrs: - type: tcp addr: 127.0.0.1:8002 scheduleTimeout: 0 host: + hostname: d7y.io listenIP: 0.0.0.0 advertiseIP: 0.0.0.0 location: 0.0.0.0 diff --git a/client/daemon/daemon.go 
b/client/daemon/daemon.go index 7bd4327fa..104a9a44d 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -25,6 +25,7 @@ import ( "net" "net/http" "os" + "reflect" "runtime" "sync" "time" @@ -49,9 +50,10 @@ import ( "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" + "d7y.io/dragonfly/v2/pkg/rpc/manager" + managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" - "d7y.io/dragonfly/v2/pkg/util/net/iputils" ) type Daemon interface { @@ -80,9 +82,11 @@ type clientDaemon struct { PeerTaskManager peer.TaskManager PieceManager peer.PieceManager -} -var _ Daemon = (*clientDaemon)(nil) + dynconfig config.Dynconfig + schedulerAddrs []dfnet.NetAddr + schedulerClient schedulerclient.SchedulerClient +} func New(opt *config.DaemonOption) (Daemon, error) { host := &scheduler.PeerHost{ @@ -90,18 +94,46 @@ func New(opt *config.DaemonOption) (Daemon, error) { Ip: opt.Host.AdvertiseIP, RpcPort: int32(opt.Download.PeerGRPC.TCPListen.PortRange.Start), DownPort: 0, - HostName: iputils.HostName, + HostName: opt.Host.Hostname, SecurityDomain: opt.Host.SecurityDomain, Location: opt.Host.Location, Idc: opt.Host.IDC, NetTopology: opt.Host.NetTopology, } + var addrs []dfnet.NetAddr + var dynconfig config.Dynconfig + if opt.Scheduler.Manager.Enable == true { + // New manager client + managerClient, err := managerclient.New(opt.Scheduler.Manager.Addr) + if err != nil { + return nil, err + } + + // New dynconfig client + if dynconfig, err = config.NewDynconfig( + config.NewManagerClient(managerClient, opt.Host), + opt.Scheduler.Manager.RefreshInterval, + ); err != nil { + return nil, err + } + + // Get schedulers from manager + schedulers, err := dynconfig.GetSchedulers() + if err != nil { + return nil, err + } + + addrs = schedulersToNetAddrs(schedulers) + } else { + addrs = opt.Scheduler.NetAddrs + } + var opts 
[]grpc.DialOption if opt.Options.Telemetry.Jaeger != "" { opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor())) } - sched, err := schedulerclient.GetClientByAddr(opt.Scheduler.NetAddrs, opts...) + sched, err := schedulerclient.GetClientByAddr(addrs, opts...) if err != nil { return nil, errors.Wrap(err, "failed to get schedulers") } @@ -186,6 +218,9 @@ func New(opt *config.DaemonOption) (Daemon, error) { UploadManager: uploadManager, StorageManager: storageManager, GCManager: gc.NewManager(opt.GCInterval.Duration), + dynconfig: dynconfig, + schedulerAddrs: addrs, + schedulerClient: sched, }, nil } @@ -431,6 +466,22 @@ func (cd *clientDaemon) Serve() error { }) } + // serve dynconfig service + if cd.dynconfig != nil { + // register client daemon as a dynconfig observer + cd.dynconfig.Register(cd) + + // serve dynconfig + g.Go(func() error { + if err := cd.dynconfig.Serve(); err != nil { + logger.Errorf("dynconfig start failed %v", err) + return err + } + logger.Info("dynconfig start successfully") + return nil + }) + } + werr := g.Wait() cd.Stop() return werr @@ -455,9 +506,40 @@ func (cd *clientDaemon) Stop() { logger.Infof("keep storage disabled") cd.StorageManager.CleanUp() } + + if cd.dynconfig != nil { + if err := cd.dynconfig.Stop(); err != nil { + logger.Errorf("dynconfig client closed failed %s", err) + } + logger.Info("dynconfig client closed") + } }) } +func (cd *clientDaemon) OnNotify(data *config.DynconfigData) { + addrs := schedulersToNetAddrs(data.Schedulers) + if reflect.DeepEqual(cd.schedulerAddrs, addrs) { + return + } + + // Update scheduler client addresses + cd.schedulerClient.UpdateState(addrs) + cd.schedulerAddrs = addrs +} + +// schedulersToNetAddrs converts []*manager.Scheduler to []dfnet.NetAddr. 
+func schedulersToNetAddrs(schedulers []*manager.Scheduler) []dfnet.NetAddr { + netAddrs := make([]dfnet.NetAddr, 0, len(schedulers)) + for _, scheduler := range schedulers { + netAddrs = append(netAddrs, dfnet.NetAddr{ + Type: dfnet.TCP, + Addr: fmt.Sprintf("%s:%d", scheduler.HostName, scheduler.Port), + }) + } + + return netAddrs +} + func (cd *clientDaemon) ExportTaskManager() peer.TaskManager { return cd.PeerTaskManager } diff --git a/client/daemon/peer/peertask_dummy.go b/client/daemon/peer/peertask_dummy.go index 9504fa2f8..ef19144a7 100644 --- a/client/daemon/peer/peertask_dummy.go +++ b/client/daemon/peer/peertask_dummy.go @@ -23,6 +23,7 @@ import ( "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" ) @@ -50,6 +51,9 @@ func (d *dummySchedulerClient) Close() error { return nil } +func (d *dummySchedulerClient) UpdateState(addrs []dfnet.NetAddr) { +} + type dummyPeerPacketStream struct { } diff --git a/client/daemon/test/mock/scheduler/scheduler_client.go b/client/daemon/test/mock/scheduler/scheduler_client.go index 703d50aec..87fb54034 100644 --- a/client/daemon/test/mock/scheduler/scheduler_client.go +++ b/client/daemon/test/mock/scheduler/scheduler_client.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ../../../../../pkg/rpc/scheduler/client/client.go +// Source: d7y.io/dragonfly/v2/pkg/rpc/scheduler/client (interfaces: SchedulerClient) // Package mock_client is a generated GoMock package. 
package mock_client @@ -8,6 +8,7 @@ import ( context "context" reflect "reflect" + dfnet "d7y.io/dragonfly/v2/pkg/basic/dfnet" scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" gomock "github.com/golang/mock/gomock" @@ -52,10 +53,10 @@ func (mr *MockSchedulerClientMockRecorder) Close() *gomock.Call { } // LeaveTask mocks base method. -func (m *MockSchedulerClient) LeaveTask(ctx context.Context, pt *scheduler.PeerTarget, opts ...grpc.CallOption) error { +func (m *MockSchedulerClient) LeaveTask(arg0 context.Context, arg1 *scheduler.PeerTarget, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, pt} - for _, a := range opts { + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "LeaveTask", varargs...) @@ -64,17 +65,17 @@ func (m *MockSchedulerClient) LeaveTask(ctx context.Context, pt *scheduler.PeerT } // LeaveTask indicates an expected call of LeaveTask. -func (mr *MockSchedulerClientMockRecorder) LeaveTask(ctx, pt interface{}, opts ...interface{}) *gomock.Call { +func (mr *MockSchedulerClientMockRecorder) LeaveTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, pt}, opts...) + varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LeaveTask", reflect.TypeOf((*MockSchedulerClient)(nil).LeaveTask), varargs...) } // RegisterPeerTask mocks base method. 
-func (m *MockSchedulerClient) RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) { +func (m *MockSchedulerClient) RegisterPeerTask(arg0 context.Context, arg1 *scheduler.PeerTaskRequest, arg2 ...grpc.CallOption) (*scheduler.RegisterResult, error) { m.ctrl.T.Helper() - varargs := []interface{}{ctx, ptr} - for _, a := range opts { + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "RegisterPeerTask", varargs...) @@ -84,17 +85,17 @@ func (m *MockSchedulerClient) RegisterPeerTask(ctx context.Context, ptr *schedul } // RegisterPeerTask indicates an expected call of RegisterPeerTask. -func (mr *MockSchedulerClientMockRecorder) RegisterPeerTask(ctx, ptr interface{}, opts ...interface{}) *gomock.Call { +func (mr *MockSchedulerClientMockRecorder) RegisterPeerTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, ptr}, opts...) + varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterPeerTask", reflect.TypeOf((*MockSchedulerClient)(nil).RegisterPeerTask), varargs...) } // ReportPeerResult mocks base method. -func (m *MockSchedulerClient) ReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error { +func (m *MockSchedulerClient) ReportPeerResult(arg0 context.Context, arg1 *scheduler.PeerResult, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, pr} - for _, a := range opts { + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ReportPeerResult", varargs...) @@ -103,17 +104,17 @@ func (m *MockSchedulerClient) ReportPeerResult(ctx context.Context, pr *schedule } // ReportPeerResult indicates an expected call of ReportPeerResult. 
-func (mr *MockSchedulerClientMockRecorder) ReportPeerResult(ctx, pr interface{}, opts ...interface{}) *gomock.Call { +func (mr *MockSchedulerClientMockRecorder) ReportPeerResult(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, pr}, opts...) + varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportPeerResult", reflect.TypeOf((*MockSchedulerClient)(nil).ReportPeerResult), varargs...) } // ReportPieceResult mocks base method. -func (m *MockSchedulerClient) ReportPieceResult(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (client.PeerPacketStream, error) { +func (m *MockSchedulerClient) ReportPieceResult(arg0 context.Context, arg1 string, arg2 *scheduler.PeerTaskRequest, arg3 ...grpc.CallOption) (client.PeerPacketStream, error) { m.ctrl.T.Helper() - varargs := []interface{}{ctx, taskId, ptr} - for _, a := range opts { + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ReportPieceResult", varargs...) @@ -123,8 +124,20 @@ func (m *MockSchedulerClient) ReportPieceResult(ctx context.Context, taskId stri } // ReportPieceResult indicates an expected call of ReportPieceResult. -func (mr *MockSchedulerClientMockRecorder) ReportPieceResult(ctx, taskId, ptr interface{}, opts ...interface{}) *gomock.Call { +func (mr *MockSchedulerClientMockRecorder) ReportPieceResult(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, taskId, ptr}, opts...) + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportPieceResult", reflect.TypeOf((*MockSchedulerClient)(nil).ReportPieceResult), varargs...) } + +// UpdateState mocks base method. 
+func (m *MockSchedulerClient) UpdateState(arg0 []dfnet.NetAddr) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateState", arg0) +} + +// UpdateState indicates an expected call of UpdateState. +func (mr *MockSchedulerClientMockRecorder) UpdateState(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockSchedulerClient)(nil).UpdateState), arg0) +} diff --git a/deploy/helm-charts b/deploy/helm-charts index e3d9e72f6..30fa43099 160000 --- a/deploy/helm-charts +++ b/deploy/helm-charts @@ -1 +1 @@ -Subproject commit e3d9e72f64bb2328edeb574872d873d1336349b1 +Subproject commit 30fa43099a8568aa4d4ed2398c3175ed8f014985 diff --git a/docs/en/api-reference/api-reference.md b/docs/en/api-reference/api-reference.md index 6fd4145c2..d13240f52 100644 --- a/docs/en/api-reference/api-reference.md +++ b/docs/en/api-reference/api-reference.md @@ -3690,6 +3690,7 @@ delete role by uri config |---|---| |**bio**
*optional*|string| |**config**
*required*|[types.CDNClusterConfig](#types-cdnclusterconfig)| +|**is_default**
*optional*|boolean| |**name**
*required*|string| @@ -3902,6 +3903,7 @@ delete role by uri config |---|---| |**bio**
*optional*|string| |**config**
*optional*|[types.CDNClusterConfig](#types-cdnclusterconfig)| +|**is_default**
*optional*|boolean| |**name**
*optional*|string| diff --git a/docs/en/deployment/configuration/dfget.yaml b/docs/en/deployment/configuration/dfget.yaml index ce557b012..e2a4b193f 100644 --- a/docs/en/deployment/configuration/dfget.yaml +++ b/docs/en/deployment/configuration/dfget.yaml @@ -25,6 +25,13 @@ keepStorage: true # daemon will send tasks to a fixed scheduler by hashing the task url and meta data # caution: only tcp is supported scheduler: + manager: + # get scheduler list dynamically from manager + enable: false + # manager service address + addr: 127.0.0.1:65003 + # scheduler list refresh interval + refreshInterval: 5m # schedule timeout scheduleTimeout: 30s # when true, only scheduler says back source, daemon can back source @@ -49,11 +56,16 @@ host: # access ip for other peers # when local ip is different with access ip, advertiseIP should be set advertiseIP: 0.0.0.0 - # geographical location and network topology + # geographical location, separated by "|" characters location: "" + # idc deployed by daemon idc: "" + # security domain deployed by daemon, network isolation between different security domains securityDomain: "" + # network topology, separated by "|" characters netTopology: "" + # daemon hostname + # hostname: "" # download service option download: diff --git a/docs/en/deployment/configuration/manager.yaml b/docs/en/deployment/configuration/manager.yaml index 1235fdcbe..136fac28e 100644 --- a/docs/en/deployment/configuration/manager.yaml +++ b/docs/en/deployment/configuration/manager.yaml @@ -34,16 +34,15 @@ database: host: dragonfly port: 6379 db: 0 - # manager server cache -cache: - # redis cache configure - redis: - # cache ttl configure unit[nanosecond] - ttl: 30000000000 - # local cache configure - local: - # lfu cache size - size: 10000 - # cache ttl configure unit[nanosecond] - ttl: 30000000000 +# cache: +# # redis cache configure +# redis: +# # cache ttl configure +# ttl: 30s +# # local cache configure +# local: +# # lfu cache size +# size: 10000 +# # cache ttl 
configure +# ttl: 30s diff --git a/docs/zh-CN/api-reference/api-reference.md b/docs/zh-CN/api-reference/api-reference.md index e4f9b4aed..1a9fa135d 100644 --- a/docs/zh-CN/api-reference/api-reference.md +++ b/docs/zh-CN/api-reference/api-reference.md @@ -3690,6 +3690,7 @@ delete role by uri config |---|---| |**bio**
*可选*|string| |**config**
*必填*|[types.CDNClusterConfig](#types-cdnclusterconfig)| +|**is_default**
*可选*|boolean| |**name**
*必填*|string| @@ -3902,6 +3903,7 @@ delete role by uri config |---|---| |**bio**
*可选*|string| |**config**
*可选*|[types.CDNClusterConfig](#types-cdnclusterconfig)| +|**is_default**
*可选*|boolean| |**name**
*可选*|string| diff --git a/docs/zh-CN/deployment/configuration/dfget.yaml b/docs/zh-CN/deployment/configuration/dfget.yaml index 203e40195..da2c115a9 100644 --- a/docs/zh-CN/deployment/configuration/dfget.yaml +++ b/docs/zh-CN/deployment/configuration/dfget.yaml @@ -24,6 +24,13 @@ keepStorage: true # 尽量使用同一个地区的调度器. # daemon 将会根据 task id 来进行一致性 hash 来选择所有配置的调度器 scheduler: + manager: + # 通过 manager 接口动态获取 scheduler 列表 + enable: false + # manager 服务地址 + addr: 127.0.0.1:65003 + # scheduler 列表刷新时间 + refreshInterval: 5m # 调度超时 scheduleTimeout: 30s # 是否禁用回源,禁用回源后,在调度失败时不在 daemon 回源,直接返错 @@ -47,11 +54,16 @@ host: # 访问 IP 地址 # 其他 daemon 可以通过这个 IP 地址连接过来 advertiseIP: 0.0.0.0 - # 地理信息和网络地址 + # 地理信息, 通过 "|" 符号分隔 location: "" + # 机房信息 idc: "" + # 安全域信息,不同安全域之间网络隔离 securityDomain: "" + # 网络拓扑结构,通过 "|" 符号分隔 netTopology: "" + # 主机名称 + # hostname: "" # 下载服务选项 download: @@ -204,4 +216,4 @@ proxy: # 端口白名单 ports: # - 80 - # - 443 \ No newline at end of file + # - 443 diff --git a/docs/zh-CN/deployment/configuration/manager.yaml b/docs/zh-CN/deployment/configuration/manager.yaml index 001b8fe75..96afa9ca0 100644 --- a/docs/zh-CN/deployment/configuration/manager.yaml +++ b/docs/zh-CN/deployment/configuration/manager.yaml @@ -1,47 +1,46 @@ # 此文件是 manager 的配置文件模板,你可以通过根据需要改变对应的值来配置 manager 服务。 --- -# 当前的服务配置 +# 当前的服务配置 server: # grpc 服务配置 grpc: # 监听的 ip 地址 listen: 127.0.0.1 - # 监听的端口, manager 会从 start 到 end 之间的按顺序中选择一个可用端口 + # 监听的端口, manager 会从 start 到 end 之间的按顺序中选择一个可用端口 port: start: 65003 end: 65003 - # rest 服务配置 + # rest 服务配置 rest: - # 标准的 rest 服务地址: ip:port, ip 不配置则默认为0.0.0.0 + # 标准的 rest 服务地址: ip:port, ip 不配置则默认为0.0.0.0 addr: :8080 # 前端控制台资源路径 # publicPath: /dist -# 数据库配置, 当前只支持 mysql 以及 redis +# 数据库配置, 当前只支持 mysql 以及 redis database: - # mysql 配置 + # mysql 配置 mysql: user: dragonfly password: dragonfly host: dragonfly port: 3306 dbname: manager - # redis 配置 + # redis 配置 redis: password: dragonfly host: dragonfly port: 6379 db: 0 - -# 缓存配置 -cache: - # redis 缓存配置 - redis: - # ttl 
配置,单位[纳秒] - ttl: 30000000000 - # 本地缓存配置 - local: - # LFU 缓存大小 - size: 10000 - # ttl 配置,单位[纳秒] - ttl: 3000000000 +# 缓存配置 +# cache: +# # redis 缓存配置 +# redis: +# # ttl 配置 +# ttl: 30s +# # 本地缓存配置 +# local: +# # LFU 缓存大小 +# size: 10000 +# # ttl 配置 +# ttl: 30s diff --git a/internal/dynconfig/dynconfig_manager.go b/internal/dynconfig/dynconfig_manager.go index 00b611bc5..f682a6209 100644 --- a/internal/dynconfig/dynconfig_manager.go +++ b/internal/dynconfig/dynconfig_manager.go @@ -60,7 +60,7 @@ func (d *dynconfigManager) get() (interface{}, error) { // Cache has expired // Reload and ignore client request error if err := d.load(); err != nil { - logger.Warn("reload failed", err) + logger.Warn("reload failed ", err) } dynconfig, ok := d.cache.Get(defaultCacheKey) diff --git a/manager/cache/cache.go b/manager/cache/cache.go index 3f1023bb2..8e8a8a826 100644 --- a/manager/cache/cache.go +++ b/manager/cache/cache.go @@ -72,6 +72,6 @@ func MakeSchedulerCacheKey(hostname string, clusterID uint) string { return MakeCacheKey(SchedulerNamespace, fmt.Sprintf("%s-%d", hostname, clusterID)) } -func MakeSchedulersCacheKey(hostname string) string { - return MakeCacheKey(SchedulersNamespace, hostname) +func MakeSchedulersCacheKey(hostname, ip string) string { + return MakeCacheKey(SchedulersNamespace, fmt.Sprintf("%s-%s", hostname, ip)) } diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 864019466..cee9fb3a0 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -160,7 +160,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.Prehe return nil, err } - logger.Infof("create preheat group job successed, group uuid: %s, urls:%s", group.GroupUUID, urls) + logger.Infof("create preheat group job succeeded, group uuid: %s, urls: %s", group.GroupUUID, urls) return &internaljob.GroupJobState{ GroupUUID: group.GroupUUID, State: machineryv1tasks.StatePending, diff --git a/manager/searcher/searcher.go b/manager/searcher/searcher.go index 
4c5b5a142..5d7b4e9ab 100644 --- a/manager/searcher/searcher.go +++ b/manager/searcher/searcher.go @@ -28,17 +28,17 @@ import ( ) const ( - // Condition IDC key - conditionIDC = "idc" + // Condition security domain key + ConditionSecurityDomain = "security_domain" - // Condition location key - conditionLocation = "location" + // Condition IDC key + ConditionIDC = "idc" // Condition netTopology key - conditionNetTopology = "net_topology" + ConditionNetTopology = "net_topology" - // Condition security domain key - conditionSecurityDomain = "security_domain" + // Condition location key + ConditionLocation = "location" ) const ( @@ -96,7 +96,7 @@ func (s *searcher) FindSchedulerCluster(schedulerClusters []model.SchedulerClust // If the security domain condition does not exist, it will match all scheduler security domains. // Then use clusters sets to score according to scopes. var clusters []model.SchedulerCluster - securityDomain := conditions[conditionSecurityDomain] + securityDomain := conditions[ConditionSecurityDomain] if securityDomain == "" { logger.Infof("client %s %s have empty security domain", client.HostName, client.Ip) } @@ -146,9 +146,9 @@ func (s *searcher) FindSchedulerCluster(schedulerClusters []model.SchedulerClust // Evaluate the degree of matching between scheduler cluster and dfdaemon func evaluate(conditions map[string]string, scopes Scopes) float64 { - return idcAffinityWeight*calculateIDCAffinityScore(conditions[conditionIDC], scopes.IDC) + - locationAffinityWeight*calculateMultiElementAffinityScore(conditions[conditionLocation], scopes.Location) + - netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[conditionNetTopology], scopes.NetTopology) + return idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) + + locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location) + + 
netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionNetTopology], scopes.NetTopology) } // calculateIDCAffinityScore 0.0~1.0 larger and better diff --git a/manager/service/cdn_cluster.go b/manager/service/cdn_cluster.go index 596475e00..00e343894 100644 --- a/manager/service/cdn_cluster.go +++ b/manager/service/cdn_cluster.go @@ -31,9 +31,10 @@ func (s *rest) CreateCDNCluster(ctx context.Context, json types.CreateCDNCluster } cdnCluster := model.CDNCluster{ - Name: json.Name, - BIO: json.BIO, - Config: config, + Name: json.Name, + BIO: json.BIO, + Config: config, + IsDefault: json.IsDefault, } if err := s.db.WithContext(ctx).Create(&cdnCluster).Error; err != nil { @@ -64,9 +65,10 @@ func (s *rest) UpdateCDNCluster(ctx context.Context, id uint, json types.UpdateC cdnCluster := model.CDNCluster{} if err := s.db.WithContext(ctx).First(&cdnCluster, id).Updates(model.CDNCluster{ - Name: json.Name, - BIO: json.BIO, - Config: config, + Name: json.Name, + BIO: json.BIO, + Config: config, + IsDefault: json.IsDefault, }).Error; err != nil { return nil, err } diff --git a/manager/service/service_grpc.go b/manager/service/service_grpc.go index b295f0f4b..50b6ae20e 100644 --- a/manager/service/service_grpc.go +++ b/manager/service/service_grpc.go @@ -366,7 +366,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error) { var pbListSchedulersResponse manager.ListSchedulersResponse - cacheKey := cache.MakeSchedulersCacheKey(req.HostName) + cacheKey := cache.MakeSchedulersCacheKey(req.HostName, req.Ip) // Cache Hit if err := s.cache.Get(ctx, cacheKey, &pbListSchedulersResponse); err == nil { @@ -377,18 +377,14 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe // Cache Miss logger.Infof("%s cache miss", cacheKey) var schedulerClusters []model.SchedulerCluster - if err 
:= s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("Schedulers", "status = ?", "active").Find(&schedulerClusters).Error; err != nil { + if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { return nil, status.Error(codes.Unknown, err.Error()) } // Search optimal scheduler cluster schedulerCluster, ok := s.searcher.FindSchedulerCluster(schedulerClusters, req) if !ok { - if err := s.db.WithContext(ctx).Find(&schedulerCluster, &model.SchedulerCluster{ - IsDefault: true, - }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) - } + return nil, status.Error(codes.NotFound, "scheduler cluster not found") } schedulers := []model.Scheduler{} diff --git a/manager/types/cdn_cluster.go b/manager/types/cdn_cluster.go index 07ce8e6ec..c5959f84f 100644 --- a/manager/types/cdn_cluster.go +++ b/manager/types/cdn_cluster.go @@ -31,15 +31,17 @@ type AddSchedulerClusterToCDNClusterParams struct { } type CreateCDNClusterRequest struct { - Name string `json:"name" binding:"required"` - BIO string `json:"bio" binding:"omitempty"` - Config *CDNClusterConfig `json:"config" binding:"required"` + Name string `json:"name" binding:"required"` + BIO string `json:"bio" binding:"omitempty"` + Config *CDNClusterConfig `json:"config" binding:"required"` + IsDefault bool `json:"is_default" binding:"omitempty"` } type UpdateCDNClusterRequest struct { - Name string `json:"name" binding:"omitempty"` - BIO string `json:"bio" binding:"omitempty"` - Config *CDNClusterConfig `json:"config" binding:"omitempty"` + Name string `json:"name" binding:"omitempty"` + BIO string `json:"bio" binding:"omitempty"` + Config *CDNClusterConfig `json:"config" binding:"omitempty"` + IsDefault bool `json:"is_default" binding:"omitempty"` } type GetCDNClustersQuery struct { diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 9043a9dc8..37c041a64 100644 --- 
a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -436,4 +436,6 @@ func (conn *Connection) UpdateState(addrs []dfnet.NetAddr) { defer conn.rwMutex.Unlock() conn.serverNodes = addrs conn.hashRing = hashring.New(addresses) + + logger.GrpcLogger.Infof("update grpc client addresses %v", addresses) } diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index f08913262..c2d9d7b03 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -59,6 +59,8 @@ type SchedulerClient interface { LeaveTask(context.Context, *scheduler.PeerTarget, ...grpc.CallOption) error + UpdateState(addrs []dfnet.NetAddr) + Close() error } diff --git a/scheduler/config/dynconfig.go b/scheduler/config/dynconfig.go index ea934267b..3ad3af818 100644 --- a/scheduler/config/dynconfig.go +++ b/scheduler/config/dynconfig.go @@ -33,7 +33,7 @@ import ( ) var ( - DefaultDynconfigCachePath = filepath.Join(dfpath.DefaultCacheDir, "scheduler_dynconfig") + cachePath = filepath.Join(dfpath.DefaultCacheDir, "scheduler_dynconfig") ) var ( @@ -105,7 +105,7 @@ type DynconfigInterface interface { Serve() error // Stop the dynconfig listening service. - Stop() + Stop() error } type Observer interface { @@ -130,6 +130,7 @@ func NewDynconfig(sourceType dc.SourceType, cdnDirPath string, options ...dc.Opt sourceType: sourceType, } + options = append(options, dc.WithCachePath(cachePath)) client, err := dc.New(sourceType, options...) 
if err != nil { return nil, err @@ -310,8 +311,13 @@ func (d *dynconfig) watch() { } } -func (d *dynconfig) Stop() { +func (d *dynconfig) Stop() error { close(d.done) + if err := os.Remove(cachePath); err != nil { + return err + } + + return nil } type managerClient struct { diff --git a/scheduler/config/dynconfig_test.go b/scheduler/config/dynconfig_test.go index a72dc2e18..0718a3793 100644 --- a/scheduler/config/dynconfig_test.go +++ b/scheduler/config/dynconfig_test.go @@ -44,7 +44,7 @@ func TestDynconfigGet_ManagerSourceType(t *testing.T) { name: "get dynconfig success", expire: 10 * time.Second, cleanFileCache: func(t *testing.T) { - if err := os.Remove(DefaultDynconfigCachePath); err != nil { + if err := os.Remove(cachePath); err != nil { t.Fatal(err) } }, @@ -73,7 +73,7 @@ func TestDynconfigGet_ManagerSourceType(t *testing.T) { name: "client failed to return for the second time", expire: 10 * time.Millisecond, cleanFileCache: func(t *testing.T) { - if err := os.Remove(DefaultDynconfigCachePath); err != nil { + if err := os.Remove(cachePath); err != nil { t.Fatal(err) } }, @@ -114,7 +114,7 @@ func TestDynconfigGet_ManagerSourceType(t *testing.T) { d, err := NewDynconfig(dc.ManagerSourceType, "", []dc.Option{ dc.WithManagerClient(NewManagerClient(mockManagerClient, uint(1))), - dc.WithCachePath(DefaultDynconfigCachePath), + dc.WithCachePath(cachePath), dc.WithExpireTime(tc.expire), }...) 
if err != nil { diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 69d982354..a97709491 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -186,7 +186,7 @@ func (t *job) preheat(ctx context.Context, req string) error { } if piece.Done == true { - plogger.Info("preheat successed") + plogger.Info("preheat succeeded") return nil } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 153f7ec1c..bdaf22b79 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -60,7 +60,7 @@ type Server struct { managerClient managerclient.Client // Dynamic config - dynConfig config.DynconfigInterface + dynconfig config.DynconfigInterface // Async job job job.Job @@ -99,7 +99,6 @@ func New(cfg *config.Config) (*Server, error) { if s.managerClient != nil && cfg.DynConfig.Type == dynconfig.ManagerSourceType { options = append(options, dynconfig.WithManagerClient(config.NewManagerClient(s.managerClient, cfg.Manager.SchedulerClusterID)), - dynconfig.WithCachePath(config.DefaultDynconfigCachePath), dynconfig.WithExpireTime(cfg.DynConfig.ExpireTime), ) } @@ -107,7 +106,7 @@ func New(cfg *config.Config) (*Server, error) { if err != nil { return nil, err } - s.dynConfig = dynConfig + s.dynconfig = dynConfig // Initialize GC s.gc = gc.New(gc.WithLogger(logger.GcLogger)) @@ -153,7 +152,7 @@ func New(cfg *config.Config) (*Server, error) { func (s *Server) Serve() error { // Serve dynConfig go func() { - if err := s.dynConfig.Serve(); err != nil { + if err := s.dynconfig.Serve(); err != nil { logger.Fatalf("dynconfig start failed %v", err) } logger.Info("dynconfig start successfully") @@ -222,8 +221,10 @@ func (s *Server) Serve() error { } func (s *Server) Stop() { - // Stop dynamic server - s.dynConfig.Stop() + // Stop dynconfig server + if err := s.dynconfig.Stop(); err != nil { + logger.Errorf("dynconfig client closed failed %s", err) + } logger.Info("dynconfig client closed") // Stop manager client