Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Collector to be run as a Windows service #1120

Merged
merged 1 commit into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@ package main
import (
"log"

"github.com/pkg/errors"

"go.opentelemetry.io/collector/internal/version"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/defaultcomponents"
)

func main() {
handleErr := func(message string, err error) {
if err != nil {
log.Fatalf("%s: %v", message, err)
}
}

factories, err := defaultcomponents.Components()
handleErr("Failed to build default components", err)
if err != nil {
log.Fatalf("failed to build default components: %v", err)
}

info := service.ApplicationStartInfo{
ExeName: "otelcol",
Expand All @@ -41,9 +39,21 @@ func main() {
GitHash: version.GitHash,
}

svc, err := service.New(service.Parameters{ApplicationStartInfo: info, Factories: factories})
handleErr("Failed to construct the application", err)
if err := run(service.Parameters{ApplicationStartInfo: info, Factories: factories}); err != nil {
log.Fatal(err)
}
}

func runInteractive(params service.Parameters) error {
app, err := service.New(params)
if err != nil {
return errors.Wrap(err, "failed to construct the application")
}

err = app.Start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rant: we should really rename this function to Run (change #615 was rejected, but there were quite a few other breaking changes in the codebase since then).

Copy link
Member Author

@james-bebbington james-bebbington Jun 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea this is admittedly particularly confusing in this PR since we wait for the response of app.Start in the windowsservice.stop function. But if we rename this, I think it should be done in a separate PR

if err != nil {
return errors.Wrap(err, "application run finished with error: %v")
}

err = svc.Start()
handleErr("Application run finished with error", err)
return nil
}
23 changes: 23 additions & 0 deletions cmd/otelcol/main_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows

package main

import "go.opentelemetry.io/collector/service"

func run(params service.Parameters) error {
return runInteractive(params)
}
46 changes: 46 additions & 0 deletions cmd/otelcol/main_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build windows

package main

import (
"github.com/pkg/errors"
"golang.org/x/sys/windows/svc"

"go.opentelemetry.io/collector/service"
)

func run(params service.Parameters) error {
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return errors.Wrap(err, "failed to determine if we are running in an interactive session")
}

if isInteractive {
return runInteractive(params)
} else {
return runService(params)
}
}

func runService(params service.Parameters) error {
// do not need to supply service name when startup is invoked through Service Control Manager directly
if err := svc.Run("", service.NewWindowsService(params)); err != nil {
return errors.Wrap(err, "failed to start service")
}

return nil
}
4 changes: 2 additions & 2 deletions service/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func loggerFlags(flags *flag.FlagSet) {
loggerProfilePtr = flags.String(logProfileCfg, "", "Logging profile to use (dev, prod)")
}

func newLogger() (*zap.Logger, error) {
func newLogger(hooks ...func(zapcore.Entry) error) (*zap.Logger, error) {
var level zapcore.Level
err := (&level).UnmarshalText([]byte(*loggerLevelPtr))
if err != nil {
Expand All @@ -62,5 +62,5 @@ func newLogger() (*zap.Logger, error) {
}

conf.Level.SetLevel(level)
return conf.Build()
return conf.Build(zap.Hooks(hooks...))
}
27 changes: 17 additions & 10 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
Expand Down Expand Up @@ -83,6 +84,9 @@ type Application struct {
// stopTestChan is used to terminate the application in end to end tests.
stopTestChan chan struct{}

// signalsChannel is used to receive termination signals from the OS.
signalsChannel chan os.Signal

// asyncErrorChannel is used to signal a fatal error from any component.
asyncErrorChannel chan error
}
Expand Down Expand Up @@ -118,6 +122,8 @@ type Parameters struct {
// If it is not provided the default factory (FileLoaderConfigFactory) is used.
// The default factory loads the configuration specified as a command line flag.
ConfigFactory ConfigFactory
// LoggingHooks provides a way to supply a hook into logging events
LoggingHooks []func(zapcore.Entry) error
}

// ConfigFactory creates config.
Expand Down Expand Up @@ -156,7 +162,7 @@ func New(params Parameters) (*Application, error) {
Use: params.ApplicationStartInfo.ExeName,
Long: params.ApplicationStartInfo.LongName,
RunE: func(cmd *cobra.Command, args []string) error {
err := app.init()
err := app.init(params.LoggingHooks...)
if err != nil {
return err
}
Expand Down Expand Up @@ -237,8 +243,8 @@ func (app *Application) SignalTestComplete() {
close(app.stopTestChan)
}

func (app *Application) init() error {
l, err := newLogger()
func (app *Application) init(hooks ...func(zapcore.Entry) error) error {
l, err := newLogger(hooks...)
if err != nil {
return errors.Wrap(err, "failed to get logger")
}
Expand All @@ -261,19 +267,17 @@ func (app *Application) setupTelemetry(ballastSizeBytes uint64) error {
func (app *Application) runAndWaitForShutdownEvent() {
app.logger.Info("Everything is ready. Begin running and processing data.")

// Plug SIGTERM signal into a channel.
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
// plug SIGTERM signal into a channel.
app.signalsChannel = make(chan os.Signal, 1)
signal.Notify(app.signalsChannel, os.Interrupt, syscall.SIGTERM)

// set the channel to stop testing.
app.stopTestChan = make(chan struct{})
// notify tests that it is ready.

app.stateChannel <- Running
select {
case err := <-app.asyncErrorChannel:
app.logger.Error("Asynchronous error received, terminating process", zap.Error(err))
case s := <-signalsChannel:
case s := <-app.signalsChannel:
app.logger.Info("Received signal from OS", zap.String("signal", s.String()))
case <-app.stopTestChan:
app.logger.Info("Received stop test request")
Expand Down Expand Up @@ -462,7 +466,10 @@ func (app *Application) execute(ctx context.Context, factory ConfigFactory) erro
errs = append(errs, errors.Wrap(err, "failed to shutdown extensions"))
}

AppTelemetry.shutdown()
err = AppTelemetry.shutdown()
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to shutdown extensions"))
}

app.logger.Info("Shutdown complete.")
app.stateChannel <- Closed
Expand Down
52 changes: 50 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"sort"
"strconv"
"strings"
"syscall"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/common/expfmt"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand All @@ -43,7 +46,13 @@ func TestApplication_Start(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
loggingHookCalled := false
hook := func(entry zapcore.Entry) error {
loggingHookCalled = true
return nil
}

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}, LoggingHooks: []func(entry zapcore.Entry) error{hook}})
require.NoError(t, err)
assert.Equal(t, app.rootCmd, app.Command())

Expand All @@ -65,6 +74,7 @@ func TestApplication_Start(t *testing.T) {
assert.Equal(t, Running, <-app.GetStateChannel())
require.True(t, isAppAvailable(t, "http://localhost:13133"))
assert.Equal(t, app.logger, app.GetLogger())
assert.True(t, loggingHookCalled)

// All labels added to all collector metrics by default are listed below.
// These labels are hard coded here in order to avoid inadvertent changes:
Expand All @@ -77,7 +87,45 @@ func TestApplication_Start(t *testing.T) {
}
assertMetrics(t, testPrefix, metricsPort, mandatoryLabels)

close(app.stopTestChan)
app.signalsChannel <- syscall.SIGTERM
<-appDone
assert.Equal(t, Closing, <-app.GetStateChannel())
assert.Equal(t, Closed, <-app.GetStateChannel())
}

type mockAppTelemetry struct{}

func (tel *mockAppTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error {
return nil
}

func (tel *mockAppTelemetry) shutdown() error {
return errors.New("err1")
}

func TestApplication_ReportError(t *testing.T) {
// use a mock AppTelemetry struct to return an error on shutdown
preservedAppTelemetry := AppTelemetry
AppTelemetry = &mockAppTelemetry{}
defer func() { AppTelemetry = preservedAppTelemetry }()

factories, err := defaultcomponents.Components()
require.NoError(t, err)

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
require.NoError(t, err)

app.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})

appDone := make(chan struct{})
go func() {
defer close(appDone)
assert.EqualError(t, app.Start(), "failed to shutdown extensions: err1")
}()

assert.Equal(t, Starting, <-app.GetStateChannel())
assert.Equal(t, Running, <-app.GetStateChannel())
app.ReportFatalError(errors.New("err2"))
<-appDone
assert.Equal(t, Closing, <-app.GetStateChannel())
assert.Equal(t, Closed, <-app.GetStateChannel())
Expand Down
Loading