Skip to content

Commit

Permalink
add corrector logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Aug 14, 2023
1 parent 089d824 commit bb037ab
Show file tree
Hide file tree
Showing 10 changed files with 652 additions and 30 deletions.
26 changes: 26 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2631,3 +2631,29 @@ manager:
net:
dialer:
keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual
# @schema {"name": "manager.index.corrector", "type": "object"}
corrector:
# @schema {"name": "manager.index.corrector.agent_namespace", "type": "string"}
# manager.index.corrector.agent_namespace -- namespace of agent pods to manage
agent_namespace: _MY_POD_NAMESPACE_
# @schema {"name": "manager.index.corrector.node_name", "type": "string"}
# manager.index.corrector.node_name -- node name
node_name: "" # _MY_NODE_NAME_
# @schema {"name": "manager.index.corrector.concurrency", "type": "integer", "minimum": 1}
# manager.index.corrector.concurrency -- concurrency
concurrency: 1
# @schema {"name": "manager.index.corrector.discoverer", "type": "object"}
discoverer:
# @schema {"name": "manager.index.corrector.discoverer.duration", "type": "string"}
# manager.index.corrector.discoverer.duration -- refresh duration to discover
duration: 500ms
# @schema {"name": "manager.index.corrector.discoverer.client", "alias": "grpc.client"}
# manager.index.corrector.discoverer.client -- gRPC client for discoverer (overrides defaults.grpc.client)
client: {}
# @schema {"name": "manager.index.corrector.discoverer.agent_client_options", "alias": "grpc.client"}
# manager.index.corrector.discoverer.agent_client_options -- gRPC client options for agents (overrides defaults.grpc.client)
agent_client_options:
dial_option:
net:
dialer:
keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual
20 changes: 15 additions & 5 deletions cmd/index/job/correction/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
Expand All @@ -8,10 +21,8 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/index/job/correction/config"
"github.com/vdaas/vald/pkg/index/job/correction/usecase"


"github.com/vdaas/vald/pkg/manager/index/config" // FIXME: あとで独自のconfigに切り替え
)

const (
Expand All @@ -21,7 +32,6 @@ const (
)

func main() {
// FIXME: this assumes a daemon, so it basically never stops. Do we need to build a dedicated runner?
if err := safety.RecoverFunc(func() error {
return runner.Do(
context.Background(),
Expand All @@ -35,7 +45,7 @@ func main() {
return cfg, &cfg.GlobalConfig, nil
}),
runner.WithDaemonInitializer(func(cfg interface{}) (runner.Runner, error) {
return usecase.New()
return usecase.New(cfg.(*config.Data))
}),
)
})(); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/index/job/correction/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ server_config:
cert: /path/to/cert
enabled: false
key: /path/to/key
gateway:
index_replica: 3
corrector:
agent_port: 8081
agent_name: "vald-agent-ngt"
agent_dns: vald-agent-ngt.default.svc.cluster.local
agent_namespace: "_MY_POD_NAMESPACE_"
agent_namespace: "default"
node_name: ""
discoverer:
duration: 500ms
Expand Down Expand Up @@ -197,4 +199,3 @@ corrector:
cert: /path/to/cert
enabled: false
key: /path/to/key

18 changes: 9 additions & 9 deletions internal/config/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ type Corrector struct {
}

// Bind binds the actual data from the Corrector receiver field.
func (im *Corrector) Bind() *Corrector {
im.AgentName = GetActualValue(im.AgentName)
im.AgentNamespace = GetActualValue(im.AgentNamespace)
im.AgentDNS = GetActualValue(im.AgentDNS)
im.NodeName = GetActualValue(im.NodeName)

if im.Discoverer != nil {
im.Discoverer = im.Discoverer.Bind()
func (c *Corrector) Bind() *Corrector {
c.AgentName = GetActualValue(c.AgentName)
c.AgentNamespace = GetActualValue(c.AgentNamespace)
c.AgentDNS = GetActualValue(c.AgentDNS)
c.NodeName = GetActualValue(c.NodeName)

if c.Discoverer != nil {
c.Discoverer = c.Discoverer.Bind()
}
return im
return c
}
20 changes: 20 additions & 0 deletions internal/errors/corrector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package errors provides error types and functions.
package errors

// ErrIndexReplicaOne represents an error indicating that there is nothing to correct when the index replica is 1.
var ErrIndexReplicaOne = New("nothing to correct when index replica is 1")
2 changes: 2 additions & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
emap[err.Error()]++
}

// wait for all the goroutines to finish.
// this errgroup is global across the program
err = errgroup.Wait()
if err != nil &&
!errors.Is(err, context.DeadlineExceeded) &&
Expand Down
1 change: 0 additions & 1 deletion internal/servers/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err erro
s.mu.RUnlock()
log.Infof("%s server %s stopped", s.mode.String(), s.name)
}
return nil
}))
}
return nil
Expand Down
11 changes: 11 additions & 0 deletions pkg/index/job/correction/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Data struct {

// Corrector represents the corrector service configuration
Corrector *config.Corrector `json:"corrector" yaml:"corrector"`

// FIXME: when loading this from here, how do we guarantee consistency with the LB-side configuration?
// Gateway represents agent gateway service configuration
Gateway *config.LB `json:"gateway" yaml:"gateway"`
}

func NewConfig(path string) (cfg *Data, err error) {
Expand Down Expand Up @@ -71,5 +75,12 @@ func NewConfig(path string) (cfg *Data, err error) {
} else {
cfg.Corrector = new(config.Corrector).Bind()
}

if cfg.Gateway != nil {
cfg.Gateway = cfg.Gateway.Bind()
} else {
cfg.Gateway = new(config.LB).Bind()
}

return cfg, nil
}
Loading

0 comments on commit bb037ab

Please sign in to comment.