-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathconfig.go
153 lines (125 loc) · 4.52 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright 2020 CrowdStrike Holdings, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package egress
import (
"fmt"
"strings"
"time"
"github.com/CrowdStrike/kafka-replicator/pkg/core"
"github.com/CrowdStrike/kafka-replicator/pkg/utils"
"github.com/pkg/errors"
)
// DefaultTopicConfig is the default egress topic configuration.
//
// validateConfig uses it in full for topics whose entry in
// ControllerConfig.Topics is nil, and takes WorkerChanSize and
// WorkerStopTimeout from it for topics that supply their own configuration.
var DefaultTopicConfig = TopicConfig{
	MaxSegmentMessages: 1000 * 1000,
	MaxSegmentSize:     100 << 20, // 100 MiB
	MaxSegmentAge:      time.Minute * 5,
	WorkerChanSize:     10 * 1000,
	WorkerStopTimeout:  time.Second * 10,
}
// ControllerConfig represents the egress controller configuration.
//
// It is passed by value to New, which validates it before building the
// controller.
// NOTE(review): the `required` struct tags are presumably enforced by
// utils.Validate — confirm against that helper.
type ControllerConfig struct {
// Consumer is the factory used to obtain the core.Consumer that
// consumes topic messages.
//
// Field value is required.
Consumer core.Factory `required:"true"`
// SegmentStore is the factory used to obtain the core.SegmentStore
// that segments are written to.
//
// Field value is required.
SegmentStore core.Factory `required:"true"`
// Unique name that identifies the local region/data center/cloud.
//
// Field value is required.
LocalRegion string `required:"true"`
// Source Kafka topic names that will be consumed and written to the
// segment store, keyed by topic name. Empty topic names are rejected.
//
// A nil value selects DefaultTopicConfig for that topic. A non-nil
// value is used as given, except that WorkerChanSize and
// WorkerStopTimeout fall back to their DefaultTopicConfig values.
//
// Field value is required.
Topics map[string]*TopicConfig `required:"true"`
}
// TopicConfig represents the egress topic configuration.
//
// Each topic's configuration is validated by validateConfig after defaults
// have been applied.
// NOTE(review): the `min` struct tags are presumably lower bounds checked by
// utils.Validate — confirm against that helper.
type TopicConfig struct {
// Maximum number of messages written to a segment.
MaxSegmentMessages int `min:"2"`
// Maximum byte size of messages contained in a segment.
//
// The value is the raw Kafka message size before compression
// and other possible encoding performed by the formatter.
MaxSegmentSize uint64 `min:"1"`
// Maximum duration from segment creation to completion.
//
// A higher value results in fewer and larger segments being created
// at the expense of increased message delivery latency.
MaxSegmentAge time.Duration `min:"1ms"`
// Size of egress worker buffered channel.
//
// Defaults to DefaultTopicConfig.WorkerChanSize.
WorkerChanSize int `min:"1"`
// Egress worker shutdown grace period.
//
// Defaults to DefaultTopicConfig.WorkerStopTimeout.
WorkerStopTimeout time.Duration `min:"1ms"`
// Retrier instance used for Consumer operations.
//
// Defaults to core.DefaultKafkaRetrier.
ConsumerRetrier core.Retrier
// Retrier instance used for SegmentStore operations.
//
// Defaults to core.DefaultS3Retrier.
SegmentStoreRetrier core.Retrier
}
// New returns a new egress controller instance.
//
// The supplied config is validated and normalized by validateConfig first.
// The Consumer and SegmentStore factories are then invoked through
// utils.CallFactory to obtain the controller's dependencies. An error from
// either step is returned unchanged together with a nil controller.
func New(config ControllerConfig) (*Controller, error) {
	validated, err := validateConfig(config)
	if err != nil {
		return nil, err
	}

	// The order of the requests fixes the order of the returned instances.
	consumerRequest := utils.FactoryType{
		Factory: validated.Consumer,
		Type:    (*core.Consumer)(nil),
	}
	storeRequest := utils.FactoryType{
		Factory: validated.SegmentStore,
		Type:    (*core.SegmentStore)(nil),
	}

	created, err := utils.CallFactory(consumerRequest, storeRequest)
	if err != nil {
		return nil, err
	}

	controller := &Controller{
		config:       validated,
		consumer:     created[0].(core.Consumer),
		segmentStore: created[1].(core.SegmentStore),
		nextWorkerID: 1,
		workers:      make(map[workerKey]*worker),
		controlChan:  make(chan interface{}, 1),
	}
	return controller, nil
}
// validateConfig checks the controller configuration and every topic entry,
// and returns a copy of config whose Topics map holds the effective settings
// for each topic.
//
// A nil topic entry resolves to DefaultTopicConfig. A non-nil entry is copied
// and has WorkerChanSize and WorkerStopTimeout defaulted from
// DefaultTopicConfig. Retriers are defaulted in both cases. The caller's
// Topics map and the TopicConfig values it points to are never modified.
//
// All problems found are collected and reported together in a single error.
// In that case the returned ControllerConfig is the zero value.
func validateConfig(config ControllerConfig) (ControllerConfig, error) {
	var problems []string

	if msg := utils.Validate(config); msg != "" {
		problems = append(problems, msg)
	}

	resolved := make(map[string]*TopicConfig)
	for name, override := range config.Topics {
		if name == "" {
			problems = append(problems, "Empty topic name")
			continue
		}

		// effective is declared per iteration, so the address stored in
		// resolved below is unique to this topic.
		var effective TopicConfig
		switch override {
		case nil:
			effective = DefaultTopicConfig
		default:
			effective = *override
			utils.SetDefaultInt(&effective.WorkerChanSize, DefaultTopicConfig.WorkerChanSize)
			utils.SetDefaultDuration(&effective.WorkerStopTimeout, DefaultTopicConfig.WorkerStopTimeout)
		}

		utils.SetDefaultRetrier(&effective.ConsumerRetrier, core.DefaultKafkaRetrier)
		utils.SetDefaultRetrier(&effective.SegmentStoreRetrier, core.DefaultS3Retrier)

		if msg := utils.Validate(effective); msg != "" {
			problems = append(problems, fmt.Sprintf("Topic %s: %s", name, msg))
			continue
		}

		resolved[name] = &effective
	}

	if len(problems) != 0 {
		joined := strings.Join(problems, "; ")
		return ControllerConfig{}, errors.Errorf("Invalid egress config: %s", joined)
	}

	config.Topics = resolved
	return config, nil
}