diff --git a/cmd/otelcol/main.go b/cmd/otelcol/main.go index f40a39532f0..32ffbb51786 100644 --- a/cmd/otelcol/main.go +++ b/cmd/otelcol/main.go @@ -17,6 +17,7 @@ package main import ( + "fmt" "log" "go.opentelemetry.io/collector/internal/version" @@ -25,14 +26,19 @@ import ( ) func main() { - handleErr := func(message string, err error) { - if err != nil { - log.Fatalf("%s: %v", message, err) - } + params, err := applicationParameters() + if err != nil { + log.Fatal(err) } + run(params) +} + +func applicationParameters() (service.Parameters, error) { factories, err := defaultcomponents.Components() - handleErr("Failed to build default components", err) + if err != nil { + return service.Parameters{}, fmt.Errorf("failed to build default components: %v", err) + } info := service.ApplicationStartInfo{ ExeName: "otelcol", @@ -41,9 +47,17 @@ func main() { GitHash: version.GitHash, } - svc, err := service.New(service.Parameters{ApplicationStartInfo: info, Factories: factories}) - handleErr("Failed to construct the application", err) + return service.Parameters{Factories: factories, ApplicationStartInfo: info}, nil +} - err = svc.Start() - handleErr("Application run finished with error", err) +func runInteractive(params service.Parameters) { + app, err := service.New(params) + if err != nil { + log.Fatalf("failed to construct the application: %v", err) + } + + err = app.Start() + if err != nil { + log.Fatalf("application run finished with error: %v", err) + } } diff --git a/cmd/otelcol/main_others.go b/cmd/otelcol/main_others.go new file mode 100644 index 00000000000..72ea43e6514 --- /dev/null +++ b/cmd/otelcol/main_others.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +package main + +import "go.opentelemetry.io/collector/service" + +func run(params service.Parameters) { + runInteractive(params) +} diff --git a/cmd/otelcol/main_windows.go b/cmd/otelcol/main_windows.go new file mode 100644 index 00000000000..5cdcfb02494 --- /dev/null +++ b/cmd/otelcol/main_windows.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package main + +import ( + "log" + + "golang.org/x/sys/windows/svc" + + "go.opentelemetry.io/collector/service" +) + +func run(params service.Parameters) { + isInteractive, err := svc.IsAnInteractiveSession() + if err != nil { + log.Fatalf("failed to determine if we are running in an interactive session: %v", err) + } + + if isInteractive { + runInteractive(params) + } else { + runService(params) + } +} + +func runService(params service.Parameters) { + // do not need to supply service name when startup is invoked through Service Control Manager directly + err := svc.Run("", service.NewWindowsService(params)) + if err != nil { + log.Fatalf("failed to start service: %v", err) + } +} diff --git a/service/logger.go b/service/logger.go index 38b22c911e5..1246181611c 100644 --- a/service/logger.go +++ b/service/logger.go @@ -39,7 +39,7 @@ func loggerFlags(flags *flag.FlagSet) { loggerProfilePtr = flags.String(logProfileCfg, "", "Logging profile to use (dev, prod)") } -func newLogger() (*zap.Logger, error) { +func newLogger(hooks ...func(zapcore.Entry) error) (*zap.Logger, error) { var level zapcore.Level err := (&level).UnmarshalText([]byte(*loggerLevelPtr)) if err != nil { @@ -62,5 +62,5 @@ func newLogger() (*zap.Logger, error) { } conf.Level.SetLevel(level) - return conf.Build() + return conf.Build(zap.Hooks(hooks...)) } diff --git a/service/service.go b/service/service.go index 2da897deda9..e1aadc9aab1 100644 --- a/service/service.go +++ b/service/service.go @@ -32,6 +32,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" @@ -83,6 +84,9 @@ type Application struct { // stopTestChan is used to terminate the application in end to end tests. stopTestChan chan struct{} + // signalsChannel is used to receive termination signals from the OS. + signalsChannel chan os.Signal + // asyncErrorChannel is used to signal a fatal error from any component. asyncErrorChannel chan error } @@ -118,6 +122,8 @@ type Parameters struct { // If it is not provided the default factory (FileLoaderConfigFactory) is used. // The default factory loads the configuration specified as a command line flag. ConfigFactory ConfigFactory + // LoggingHooks provides a way to supply a hook into logging events + LoggingHooks []func(zapcore.Entry) error } // ConfigFactory creates config. @@ -156,7 +162,7 @@ func New(params Parameters) (*Application, error) { Use: params.ApplicationStartInfo.ExeName, Long: params.ApplicationStartInfo.LongName, RunE: func(cmd *cobra.Command, args []string) error { - err := app.init() + err := app.init(params.LoggingHooks...) if err != nil { return err } @@ -237,8 +243,8 @@ func (app *Application) SignalTestComplete() { close(app.stopTestChan) } -func (app *Application) init() error { - l, err := newLogger() +func (app *Application) init(hooks ...func(zapcore.Entry) error) error { + l, err := newLogger(hooks...) if err != nil { return errors.Wrap(err, "failed to get logger") } @@ -261,19 +267,17 @@ func (app *Application) setupTelemetry(ballastSizeBytes uint64) error { func (app *Application) runAndWaitForShutdownEvent() { app.logger.Info("Everything is ready. Begin running and processing data.") - // Plug SIGTERM signal into a channel. - signalsChannel := make(chan os.Signal, 1) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + // plug SIGTERM signal into a channel. + app.signalsChannel = make(chan os.Signal, 1) + signal.Notify(app.signalsChannel, os.Interrupt, syscall.SIGTERM) // set the channel to stop testing. app.stopTestChan = make(chan struct{}) - // notify tests that it is ready. - app.stateChannel <- Running select { case err := <-app.asyncErrorChannel: app.logger.Error("Asynchronous error received, terminating process", zap.Error(err)) - case s := <-signalsChannel: + case s := <-app.signalsChannel: app.logger.Info("Received signal from OS", zap.String("signal", s.String())) case <-app.stopTestChan: app.logger.Info("Received stop test request") diff --git a/service/service_test.go b/service/service_test.go index 77026c2a48d..b6fa3064035 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -26,10 +26,12 @@ import ( "strings" "testing" + "github.com/pkg/errors" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -43,7 +45,13 @@ func TestApplication_Start(t *testing.T) { factories, err := defaultcomponents.Components() require.NoError(t, err) - app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}}) + loggingHookCalled := false + hook := func(entry zapcore.Entry) error { + loggingHookCalled = true + return nil + } + + app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}, LoggingHooks: []func(entry zapcore.Entry) error{hook}}) require.NoError(t, err) assert.Equal(t, app.rootCmd, app.Command()) @@ -66,6 +74,7 @@ func TestApplication_Start(t *testing.T) { assert.Equal(t, Running, <-app.GetStateChannel()) require.True(t, isAppAvailable(t, "http://localhost:13133")) assert.Equal(t, app.logger, app.GetLogger()) + assert.True(t, loggingHookCalled) assertMetricsPrefix(t, testPrefix, metricsPort) @@ -75,6 +84,29 @@ func TestApplication_Start(t *testing.T) { assert.Equal(t, Closed, <-app.GetStateChannel()) } +func TestApplication_ReportError(t *testing.T) { + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}}) + require.NoError(t, err) + + app.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"}) + + appDone := make(chan struct{}) + go func() { + defer close(appDone) + assert.NoError(t, app.Start()) + }() + + assert.Equal(t, Starting, <-app.GetStateChannel()) + assert.Equal(t, Running, <-app.GetStateChannel()) + app.ReportFatalError(errors.New("err1")) + <-appDone + assert.Equal(t, Closing, <-app.GetStateChannel()) + assert.Equal(t, Closed, <-app.GetStateChannel()) +} + func TestApplication_StartAsGoRoutine(t *testing.T) { factories, err := defaultcomponents.Components() require.NoError(t, err) diff --git a/service/service_windows.go b/service/service_windows.go new file mode 100644 index 00000000000..ce9c58f9594 --- /dev/null +++ b/service/service_windows.go @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package service + +import ( + "fmt" + "log" + "syscall" + + "go.uber.org/zap/zapcore" + "golang.org/x/sys/windows/svc" + "golang.org/x/sys/windows/svc/eventlog" +) + +type WindowsService struct { + app *Application + params Parameters +} + +func NewWindowsService(params Parameters) *WindowsService { + return &WindowsService{params: params} +} + +func (s *WindowsService) Execute(args []string, requests <-chan svc.ChangeRequest, changes chan<- svc.Status) (ssec bool, errno uint32) { + if len(args) == 0 { + log.Fatal("expected first argument supplied to service.Execute to be the service name") + } + + changes <- svc.Status{State: svc.StartPending} + s.start(args[0], s.params) + changes <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptShutdown} + + for req := range requests { + switch req.Cmd { + case svc.Interrogate: + changes <- req.CurrentStatus + case svc.Stop, svc.Shutdown: + changes <- svc.Status{State: svc.StopPending} + s.stop() + changes <- svc.Status{State: svc.Stopped} + return + default: + log.Fatalf(fmt.Sprintf("unexpected control request #%d", req)) + } + } + + return +} + +func (s *WindowsService) start(logSourceName string, params Parameters) { + var err error + s.app, err = newWithEventViewerLoggingHook(logSourceName, params) + if err != nil { + log.Fatal(err) + } + + // app.Start blocks until receiving a SIGTERM signal, so we need to start it asynchronously + go func() { + err = s.app.Start() + if err != nil { + log.Fatalf("application run finished with error: %v", err) + } + }() + + // wait until the app is in the Running state + for state := range s.app.GetStateChannel() { + if state == Running { + break + } + } +} + +func (s *WindowsService) stop() { + // simulate a SIGTERM signal to terminate the application + s.app.signalsChannel <- syscall.SIGTERM + + // wait until the app is in the Closed state + for state := range s.app.GetStateChannel() { + if state == Closed { + break + } + } + + s.app = nil +} + +func newWithEventViewerLoggingHook(serviceName string, params Parameters) (*Application, error) { + elog, err := eventlog.Open(serviceName) + if err != nil { + return nil, fmt.Errorf("failed to open event log: %v", err) + } + + params.LoggingHooks = append( + params.LoggingHooks, + func(entry zapcore.Entry) error { + msg := fmt.Sprintf("%v\r\n\r\nStack Trace:\r\n%v", entry.Message, entry.Stack) + + switch entry.Level { + case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel: + // golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs + return elog.Error(3, msg) + case zapcore.ErrorLevel: + return elog.Error(3, msg) + case zapcore.WarnLevel: + return elog.Warning(2, msg) + case zapcore.InfoLevel: + return elog.Info(1, msg) + } + + // ignore Debug level logs + return nil + }, + ) + + app, err := New(params) + if err != nil { + return nil, err + } + + return app, nil +} diff --git a/service/service_windows_test.go b/service/service_windows_test.go new file mode 100644 index 00000000000..45de112b2d9 --- /dev/null +++ b/service/service_windows_test.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package service + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sys/windows/svc" + + "go.opentelemetry.io/collector/service/defaultcomponents" +) + +func TestWindowsService_Execute(t *testing.T) { + os.Args = []string{"otelcol", "--config", "testdata/otelcol-config-minimal.yaml"} + + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + s := NewWindowsService(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}}) + + appDone := make(chan struct{}) + requests := make(chan svc.ChangeRequest) + changes := make(chan svc.Status) + go func() { + defer close(appDone) + b, errno := s.Execute([]string{"svc name"}, requests, changes) + assert.Equal(t, uint32(0), errno) + assert.Equal(t, false, b) + }() + + assert.Equal(t, svc.StartPending, (<-changes).State) + assert.Equal(t, svc.Running, (<-changes).State) + requests <- svc.ChangeRequest{Cmd: svc.Interrogate, CurrentStatus: svc.Status{State: svc.Running}} + assert.Equal(t, svc.Running, (<-changes).State) + requests <- svc.ChangeRequest{Cmd: svc.Stop} + assert.Equal(t, svc.StopPending, (<-changes).State) + assert.Equal(t, svc.Stopped, (<-changes).State) + <-appDone +} diff --git a/service/telemetry.go b/service/telemetry.go index 6d924bbfdc3..b71966a3e9a 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -40,7 +40,8 @@ var ( ) type appTelemetry struct { - views []*view.View + views []*view.View + server *http.Server } func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error { @@ -103,7 +104,13 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u go func() { mux := http.NewServeMux() mux.Handle("/metrics", pe) - serveErr := http.ListenAndServe(metricsAddr, mux) + + tel.server = &http.Server{ + Addr: metricsAddr, + Handler: mux, + } + + serveErr := tel.server.ListenAndServe() if serveErr != nil && serveErr != http.ErrServerClosed { asyncErrorChannel <- serveErr } @@ -114,6 +121,10 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u func (tel *appTelemetry) shutdown() { view.Unregister(tel.views...) + + if tel.server != nil { + _ = tel.server.Close() + } } func sanitizePrometheusKey(str string) string { diff --git a/service/testdata/otelcol-config-minimal.yaml b/service/testdata/otelcol-config-minimal.yaml new file mode 100644 index 00000000000..448973ae172 --- /dev/null +++ b/service/testdata/otelcol-config-minimal.yaml @@ -0,0 +1,13 @@ +receivers: + otlp: + +exporters: + otlp: + endpoint: "locahost:14250" + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [otlp] +