Skip to content

Commit

Permalink
Revert "Refactor Index Management Job (#2232)"
Browse files Browse the repository at this point in the history
This reverts commit 193178e.
  • Loading branch information
ykadowak committed Nov 30, 2023
1 parent acb3934 commit 9016a8e
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 1,141 deletions.
17 changes: 17 additions & 0 deletions internal/config/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,20 @@ func (c *Corrector) Bind() *Corrector {
}
return c
}

// GetStreamListConcurrency returns the StreamListConcurrency field value if set, otherwise 200 is set,
// since not setting this could use up all the available momory
func (c *Corrector) GetStreamListConcurrency() int {
if c != nil {
return c.StreamListConcurrency
}
return 200 //nolint:gomnd
}

// GetBboltAsyncWriteConcurrency returns 2048 when not specified since not setting this could use up all the available momory
func (c *Corrector) GetBboltAsyncWriteConcurrency() int {
if c != nil {
return c.BboltAsyncWriteConcurrency
}
return 2048 //nolint:gomnd
}
250 changes: 246 additions & 4 deletions internal/config/corrector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ package config
// AgentName string
// AgentNamespace string
// AgentDNS string
// CreationPoolSize uint32
// NodeName string
// StreamListConcurrency int
// BboltAsyncWriteConcurrency int
// IndexReplica int
// Discoverer *DiscovererClient
// }
// type want struct {
Expand Down Expand Up @@ -54,10 +54,10 @@ package config
// AgentName:"",
// AgentNamespace:"",
// AgentDNS:"",
// CreationPoolSize:0,
// NodeName:"",
// StreamListConcurrency:0,
// BboltAsyncWriteConcurrency:0,
// IndexReplica:0,
// Discoverer:DiscovererClient{},
// },
// want: want{},
Expand All @@ -81,10 +81,10 @@ package config
// AgentName:"",
// AgentNamespace:"",
// AgentDNS:"",
// CreationPoolSize:0,
// NodeName:"",
// StreamListConcurrency:0,
// BboltAsyncWriteConcurrency:0,
// IndexReplica:0,
// Discoverer:DiscovererClient{},
// },
// want: want{},
Expand Down Expand Up @@ -120,10 +120,10 @@ package config
// AgentName: test.fields.AgentName,
// AgentNamespace: test.fields.AgentNamespace,
// AgentDNS: test.fields.AgentDNS,
// CreationPoolSize: test.fields.CreationPoolSize,
// NodeName: test.fields.NodeName,
// StreamListConcurrency: test.fields.StreamListConcurrency,
// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency,
// IndexReplica: test.fields.IndexReplica,
// Discoverer: test.fields.Discoverer,
// }
//
Expand All @@ -135,3 +135,245 @@ package config
// })
// }
// }
//
// func TestCorrector_GetStreamListConcurrency(t *testing.T) {
// type fields struct {
// AgentPort int
// AgentName string
// AgentNamespace string
// AgentDNS string
// CreationPoolSize uint32
// NodeName string
// StreamListConcurrency int
// BboltAsyncWriteConcurrency int
// Discoverer *DiscovererClient
// }
// type want struct {
// want int
// }
// type test struct {
// name string
// fields fields
// want want
// checkFunc func(want, int) error
// beforeFunc func(*testing.T)
// afterFunc func(*testing.T)
// }
// defaultCheckFunc := func(w want, got int) error {
// if !reflect.DeepEqual(got, w.want) {
// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
// }
// return nil
// }
// tests := []test{
// // TODO test cases
// /*
// {
// name: "test_case_1",
// fields: fields {
// AgentPort:0,
// AgentName:"",
// AgentNamespace:"",
// AgentDNS:"",
// CreationPoolSize:0,
// NodeName:"",
// StreamListConcurrency:0,
// BboltAsyncWriteConcurrency:0,
// Discoverer:DiscovererClient{},
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T,) {
// t.Helper()
// },
// afterFunc: func(t *testing.T,) {
// t.Helper()
// },
// },
// */
//
// // TODO test cases
// /*
// func() test {
// return test {
// name: "test_case_2",
// fields: fields {
// AgentPort:0,
// AgentName:"",
// AgentNamespace:"",
// AgentDNS:"",
// CreationPoolSize:0,
// NodeName:"",
// StreamListConcurrency:0,
// BboltAsyncWriteConcurrency:0,
// Discoverer:DiscovererClient{},
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T,) {
// t.Helper()
// },
// afterFunc: func(t *testing.T,) {
// t.Helper()
// },
// }
// }(),
// */
// }
//
// for _, tc := range tests {
// test := tc
// t.Run(test.name, func(tt *testing.T) {
// tt.Parallel()
// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
// if test.beforeFunc != nil {
// test.beforeFunc(tt)
// }
// if test.afterFunc != nil {
// defer test.afterFunc(tt)
// }
// checkFunc := test.checkFunc
// if test.checkFunc == nil {
// checkFunc = defaultCheckFunc
// }
// c := &Corrector{
// AgentPort: test.fields.AgentPort,
// AgentName: test.fields.AgentName,
// AgentNamespace: test.fields.AgentNamespace,
// AgentDNS: test.fields.AgentDNS,
// CreationPoolSize: test.fields.CreationPoolSize,
// NodeName: test.fields.NodeName,
// StreamListConcurrency: test.fields.StreamListConcurrency,
// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency,
// Discoverer: test.fields.Discoverer,
// }
//
// got := c.GetStreamListConcurrency()
// if err := checkFunc(test.want, got); err != nil {
// tt.Errorf("error = %v", err)
// }
//
// })
// }
// }
//
// func TestCorrector_GetBboltAsyncWriteConcurrency(t *testing.T) {
// type fields struct {
// AgentPort int
// AgentName string
// AgentNamespace string
// AgentDNS string
// CreationPoolSize uint32
// NodeName string
// StreamListConcurrency int
// BboltAsyncWriteConcurrency int
// Discoverer *DiscovererClient
// }
// type want struct {
// want int
// }
// type test struct {
// name string
// fields fields
// want want
// checkFunc func(want, int) error
// beforeFunc func(*testing.T)
// afterFunc func(*testing.T)
// }
// defaultCheckFunc := func(w want, got int) error {
// if !reflect.DeepEqual(got, w.want) {
// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
// }
// return nil
// }
// tests := []test{
// // TODO test cases
// /*
// {
// name: "test_case_1",
// fields: fields {
// AgentPort:0,
// AgentName:"",
// AgentNamespace:"",
// AgentDNS:"",
// CreationPoolSize:0,
// NodeName:"",
// StreamListConcurrency:0,
// BboltAsyncWriteConcurrency:0,
// Discoverer:DiscovererClient{},
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T,) {
// t.Helper()
// },
// afterFunc: func(t *testing.T,) {
// t.Helper()
// },
// },
// */
//
// // TODO test cases
// /*
// func() test {
// return test {
// name: "test_case_2",
// fields: fields {
// AgentPort:0,
// AgentName:"",
// AgentNamespace:"",
// AgentDNS:"",
// CreationPoolSize:0,
// NodeName:"",
// StreamListConcurrency:0,
// BboltAsyncWriteConcurrency:0,
// Discoverer:DiscovererClient{},
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T,) {
// t.Helper()
// },
// afterFunc: func(t *testing.T,) {
// t.Helper()
// },
// }
// }(),
// */
// }
//
// for _, tc := range tests {
// test := tc
// t.Run(test.name, func(tt *testing.T) {
// tt.Parallel()
// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
// if test.beforeFunc != nil {
// test.beforeFunc(tt)
// }
// if test.afterFunc != nil {
// defer test.afterFunc(tt)
// }
// checkFunc := test.checkFunc
// if test.checkFunc == nil {
// checkFunc = defaultCheckFunc
// }
// c := &Corrector{
// AgentPort: test.fields.AgentPort,
// AgentName: test.fields.AgentName,
// AgentNamespace: test.fields.AgentNamespace,
// AgentDNS: test.fields.AgentDNS,
// CreationPoolSize: test.fields.CreationPoolSize,
// NodeName: test.fields.NodeName,
// StreamListConcurrency: test.fields.StreamListConcurrency,
// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency,
// Discoverer: test.fields.Discoverer,
// }
//
// got := c.GetBboltAsyncWriteConcurrency()
// if err := checkFunc(test.want, got); err != nil {
// tt.Errorf("error = %v", err)
// }
//
// })
// }
// }
2 changes: 1 addition & 1 deletion pkg/index/job/correction/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewConfig(path string) (cfg *Data, err error) {
if cfg.Corrector != nil {
cfg.Corrector = cfg.Corrector.Bind()
} else {
return nil, errors.ErrInvalidConfig
cfg.Corrector = new(config.Corrector).Bind()
}

return cfg, nil
Expand Down
Loading

0 comments on commit 9016a8e

Please sign in to comment.