Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file watch to support config reload on file change #4454

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

- Add semconv 1.7.0 and 1.8.0 (#4452)
- Added `feature-gates` CLI flag for controlling feature gate state. (#4368)
- Add file watch to support config reload on file change (#4454)

## v0.39.0 Beta

Expand Down
65 changes: 63 additions & 2 deletions config/configmapprovider/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"go.opentelemetry.io/collector/config"
)

var errNoOp = errors.New("no change")

type fileMapProvider struct {
fileName string
watching bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this? It does't seem to be set anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed this one. this is to ensure that we create a watch only once as Retrieve is called each time the onChange is invoked during a file change.

}

// NewFile returns a new Provider that reads the configuration from the given file.
Expand All @@ -33,7 +38,7 @@ func NewFile(fileName string) Provider {
}
}

func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Retrieved, error) {
func (fmp *fileMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
if fmp.fileName == "" {
return nil, errors.New("config file not specified")
}
Expand All @@ -43,9 +48,65 @@ func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (R
return nil, fmt.Errorf("error loading config file %q: %w", fmp.fileName, err)
}

// Ensure that watches are only created once as Retrieve is called each time that `onChange()` is invoked.
if !fmp.watching {
watchFile(ctx, fmp.fileName, onChange)
fmp.watching = true
}

return &simpleRetrieved{confMap: cp}, nil
Copy link
Member

@tigrannajaryan tigrannajaryan Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will work as expected by the Provider interface. The Retrieved that is returned is expected to implement a Close function that stops watching and guarantees that onChange will not be called after that. See

// Close signals that the configuration for which it was used to retrieve values is

}

func (*fileMapProvider) Shutdown(context.Context) error {
func (*fileMapProvider) Shutdown(_ context.Context) error {
return nil
}

// watchFile sets up a watch on a filename to detect changes and calls onChange() with a suitable ChangeEvent.
// The watch is cancelled when the context is done.
func watchFile(ctx context.Context, filename string, onChange func(*ChangeEvent)) {
var (
lastfi os.FileInfo
)

check := func() error {
currfi, err := os.Stat(filename)
modified := false
if err != nil {
if os.IsNotExist(err) && lastfi != nil {
return errNoOp
}
return err
}
if lastfi != nil && (currfi.Size() != lastfi.Size() || !currfi.ModTime().Equal(lastfi.ModTime())) {
modified = true
}

lastfi = currfi
if modified {
return nil
}

return errNoOp
}

// Check the file every second and if it's been updated then initiate a config reload.
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// If check returns a valid event, exit the loop. A new watch will be placed on the next Retrieve()
err := check()
if err == nil || err != errNoOp {
time.Sleep(time.Second * 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this will result in reading the file 2 seconds after the first modification. A slightly better approach is to wait 2 seconds after the last modification. The difference is small but may be visible if we have small writers. It is probably fine for now.

onChange(&ChangeEvent{
Error: err,
})
}
}
}
}()
}
81 changes: 81 additions & 0 deletions config/configmapprovider/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configmapprovider

import (
"context"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestWatchFile(t *testing.T) {
// Create a temporary file.
file, err := ioutil.TempFile("", "file_watcher_test")
require.NoError(t, err)
defer func() {
file.Close()
os.Remove(file.Name())
}()

received := atomic.Value{}
received.Store(false)

// Write some initial content.
_, err = file.WriteString("hello")
require.NoError(t, err)
require.NoError(t, file.Sync())

// Setup the watch.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watchFile(ctx, file.Name(), testingOnChange(t, &received))
time.Sleep(time.Second * 2)

// Update the file and verify we see the updated content.
_, err = file.WriteString(" world")
require.NoError(t, err)
require.NoError(t, file.Sync())

require.Eventually(t, func() bool {
return received.Load().(bool)
}, time.Second*10, time.Second)

// Cancel the context.
cancel()
}

func TestWatchFile_ReloadError(t *testing.T) {
// Create then delete a temporary file so we have a filename that we know can't be opened.
file, err := ioutil.TempFile("", "file_watcher_test")
require.NoError(t, err)
_ = file.Close()
require.NoError(t, os.Remove(file.Name()))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watchFile(ctx, file.Name(), func(event *ChangeEvent) {})
}

func testingOnChange(t *testing.T, r *atomic.Value) func(c *ChangeEvent) {
return func(c *ChangeEvent) {
require.Nil(t, c.Error)
r.Store(true)
}
}
2 changes: 1 addition & 1 deletion config/configmapprovider/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewProperties(properties []string) Provider {
}
}

func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
func (pmp *propertiesMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Retrieved, error) {
if len(pmp.properties) == 0 {
return &simpleRetrieved{confMap: config.NewMap()}, nil
}
Expand Down
48 changes: 29 additions & 19 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// State defines Collector's state.
type State int

const (
Expand Down Expand Up @@ -106,7 +106,6 @@ func New(set CollectorSettings) (*Collector, error) {
set: set,
state: state,
}, nil

}

// GetState returns current state of the collector server.
Expand Down Expand Up @@ -147,16 +146,24 @@ LOOP:
select {
case err := <-col.cfgW.watcher:
col.logger.Warn("Config updated", zap.Error(err))
if err != nil {
goto LOOP
}

col.setCollectorState(Closing)

if err = col.cfgW.close(ctx); err != nil {
return fmt.Errorf("failed to close config watcher: %w", err)
// Get the config and ensure that it is valid. If it is not, keep running in the last valid state and
// keep trying until the configuration is fixed.
cfg, err := col.cfgW.get()
if err != nil {
col.logger.Error("unable to parse configuration", zap.Error(err))
goto LOOP
}

col.setCollectorState(Closing)
if err = col.service.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown the retiring config: %w", err)
}
if err = col.setupConfigurationComponents(ctx); err != nil {

if err = col.setupConfigurationComponents(ctx, cfg); err != nil {
return fmt.Errorf("failed to setup configuration components: %w", err)
}
case err := <-col.asyncErrorChannel:
Expand All @@ -175,24 +182,20 @@ LOOP:

// setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it
// sets the col.service with the service currently running.
func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
col.setCollectorState(Starting)

func (col *Collector) setupConfigurationComponents(ctx context.Context, cfg *config.Config) error {
var err error
if col.cfgW, err = newConfigWatcher(ctx, col.set); err != nil {
return err
}

if col.logger, err = telemetrylogs.NewLogger(col.cfgW.cfg.Service.Telemetry.Logs, col.set.LoggingOptions); err != nil {
if col.logger, err = telemetrylogs.NewLogger(cfg.Service.Telemetry.Logs, col.set.LoggingOptions); err != nil {
return fmt.Errorf("failed to get logger: %w", err)
}

col.logger.Info("Applying configuration...")
col.setCollectorState(Starting)

col.service, err = newService(&svcSettings{
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: col.cfgW.cfg,
Config: cfg,
Telemetry: component.TelemetrySettings{
Logger: col.logger,
TracerProvider: col.tracerProvider,
Expand All @@ -210,6 +213,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

return nil

}

// Run starts the collector according to the given configuration given, and waits for it to complete.
Expand All @@ -228,7 +232,14 @@ func (col *Collector) Run(ctx context.Context) error {

col.asyncErrorChannel = make(chan error)

if err := col.setupConfigurationComponents(ctx); err != nil {
// set up config watcher
col.cfgW = newConfigWatcher(ctx, col.set)
cfg, err := col.cfgW.get()
if err != nil {
return fmt.Errorf("unable to get config: %w", err)
}

if err := col.setupConfigurationComponents(ctx, cfg); err != nil {
return err
}

Expand All @@ -254,9 +265,8 @@ func (col *Collector) shutdown(ctx context.Context) error {
// Begin shutdown sequence.
col.logger.Info("Starting shutdown...")

if err := col.cfgW.close(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to close config provider watcher: %w", err))
}
// Close the config watcher
col.cfgW.close()

if err := col.set.ConfigMapProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
Expand Down
45 changes: 31 additions & 14 deletions service/config_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,54 @@ import (
)

type configWatcher struct {
cfg *config.Config
ret configmapprovider.Retrieved
ctx context.Context
set CollectorSettings
watcher chan error
ret configmapprovider.Retrieved
}

func newConfigWatcher(ctx context.Context, set CollectorSettings) (*configWatcher, error) {
cm := &configWatcher{watcher: make(chan error, 1)}
func newConfigWatcher(ctx context.Context, set CollectorSettings) *configWatcher {
cm := &configWatcher{
ctx: ctx,
watcher: make(chan error, 1),
set: set,
}

return cm
}

ret, err := set.ConfigMapProvider.Retrieve(ctx, cm.onChange)
func (cm *configWatcher) get() (*config.Config, error) {
// Ensure that a previously existing Retrieved is closed out properly.
if cm.ret != nil {
err := cm.ret.Close(cm.ctx)
if err != nil {
return nil, fmt.Errorf("cannot close previously retrieved config: %w", err)
}
}

var (
cfg *config.Config
err error
)
cm.ret, err = cm.set.ConfigMapProvider.Retrieve(cm.ctx, cm.onChange)
if err != nil {
return nil, fmt.Errorf("cannot retrieve the configuration: %w", err)
}

var cfg *config.Config
m, err := ret.Get(ctx)
m, err := cm.ret.Get(cm.ctx)
if err != nil {
return nil, fmt.Errorf("cannot get the configuration: %w", err)
}
if cfg, err = set.ConfigUnmarshaler.Unmarshal(m, set.Factories); err != nil {

if cfg, err = cm.set.ConfigUnmarshaler.Unmarshal(m, cm.set.Factories); err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
}

if err = cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid configuration: %w", err)
}

cm.cfg = cfg
cm.ret = ret

return cm, nil
return cfg, nil
}

func (cm *configWatcher) onChange(event *configmapprovider.ChangeEvent) {
Expand All @@ -62,7 +80,6 @@ func (cm *configWatcher) onChange(event *configmapprovider.ChangeEvent) {
}
}

func (cm *configWatcher) close(ctx context.Context) error {
func (cm *configWatcher) close() {
close(cm.watcher)
return cm.ret.Close(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this deleted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have modified the code flow in a way that the config watcher is only created once in the lifecycle of the collector as compared to how it was originally implemented where a config watcher is created per change in the config file. once that change was made, it didnt make sense to close the retrieved and pass the error down. i moved that logic into a get() method that follows the Retrieve() -> watch -> Close() lifecycle.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have modified the code flow in a way that the config watcher is only created once in the lifecycle of the collector

The new rearranged code is harder to follow and understand. Please refactor it to clearly show that the code follows lifecycle described in the Provider comments:

// The typical usage is the following:
//
//		r := mapProvider.Retrieve()
//		r.Get()
//		// wait for onChange() to be called.
//		r.Close()
//		r = mapProvider.Retrieve()
//		r.Get()
//		// wait for onChange() to be called.
//		r.Close()
//		// repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process.
//		// ...
//		mapProvider.Shutdown()

It was more visible before this change, admittedly it was not ideal but was better than what we have now. Now it is even harder to see that we are actually following the required lifecycle. All the current loop in runAndWaitForShutdownEvent shows is a watch, followed by get().

}
Loading