Skip to content

Commit

Permalink
Enable to Collector to be run as a Windows service
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Jun 18, 2020
1 parent 8b9fe07 commit 9028089
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 22 deletions.
29 changes: 17 additions & 12 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@
package main

import (
"fmt"
"log"

"go.opentelemetry.io/collector/internal/version"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/defaultcomponents"
)

func main() {
handleErr := func(message string, err error) {
if err != nil {
log.Fatalf("%s: %v", message, err)
}
}

func applicationParameters() (service.Parameters, error) {
factories, err := defaultcomponents.Components()
handleErr("Failed to build default components", err)
if err != nil {
return service.Parameters{}, fmt.Errorf("failed to build default components: %v", err)
}

info := service.ApplicationStartInfo{
ExeName: "otelcol",
Expand All @@ -41,9 +38,17 @@ func main() {
GitHash: version.GitHash,
}

svc, err := service.New(service.Parameters{ApplicationStartInfo: info, Factories: factories})
handleErr("Failed to construct the application", err)
return service.Parameters{Factories: factories, ApplicationStartInfo: info}, nil
}

err = svc.Start()
handleErr("Application run finished with error", err)
func runInteractive(params service.Parameters) {
app, err := service.New(params)
if err != nil {
log.Fatalf("failed to construct the application: %v", err)
}

err = app.Start()
if err != nil {
log.Fatalf("application run finished with error: %v", err)
}
}
28 changes: 28 additions & 0 deletions cmd/otelcol/main_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows

package main

import "log"

func main() {
params, err := applicationParameters()
if err != nil {
log.Fatal(err)
}

runInteractive(params)
}
51 changes: 51 additions & 0 deletions cmd/otelcol/main_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build windows

package main

import (
"log"

"golang.org/x/sys/windows/svc"

"go.opentelemetry.io/collector/service"
)

func main() {
params, err := applicationParameters()
if err != nil {
log.Fatal(err)
}

isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
log.Fatalf("failed to determine if we are running in an interactive session: %v", err)
}

if isInteractive {
runInteractive(params)
} else {
runService(params)
}
}

func runService(params service.Parameters) {
// do not need to supply service name when startup is invoked through Service Control Manager directly
err := svc.Run("", service.NewWindowsService(params))
if err != nil {
log.Fatalf("failed to start service: %v", err)
}
}
4 changes: 2 additions & 2 deletions service/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func loggerFlags(flags *flag.FlagSet) {
loggerProfilePtr = flags.String(logProfileCfg, "", "Logging profile to use (dev, prod)")
}

func newLogger() (*zap.Logger, error) {
func newLogger(hooks ...func(zapcore.Entry) error) (*zap.Logger, error) {
var level zapcore.Level
err := (&level).UnmarshalText([]byte(*loggerLevelPtr))
if err != nil {
Expand All @@ -62,5 +62,5 @@ func newLogger() (*zap.Logger, error) {
}

conf.Level.SetLevel(level)
return conf.Build()
return conf.Build(zap.Hooks(hooks...))
}
20 changes: 13 additions & 7 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
Expand Down Expand Up @@ -80,6 +81,9 @@ type Application struct {
factories config.Factories
config *configmodels.Config

// signalsChannel is used to receive termination signals from the OS.
signalsChannel chan os.Signal

// stopTestChan is used to terminate the application in end to end tests.
stopTestChan chan struct{}

Expand Down Expand Up @@ -118,6 +122,8 @@ type Parameters struct {
// If it is not provided the default factory (FileLoaderConfigFactory) is used.
// The default factory loads the configuration specified as a command line flag.
ConfigFactory ConfigFactory
// LoggingHooks provides a way to supply a hook into logging events
LoggingHooks []func(zapcore.Entry) error
}

// ConfigFactory creates config.
Expand Down Expand Up @@ -156,7 +162,7 @@ func New(params Parameters) (*Application, error) {
Use: params.ApplicationStartInfo.ExeName,
Long: params.ApplicationStartInfo.LongName,
RunE: func(cmd *cobra.Command, args []string) error {
err := app.init()
err := app.init(params.LoggingHooks...)
if err != nil {
return err
}
Expand Down Expand Up @@ -228,8 +234,8 @@ func (app *Application) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.HandleFunc(path.Join(pathPrefix, extensionzPath), app.handleExtensionzRequest)
}

func (app *Application) init() error {
l, err := newLogger()
func (app *Application) init(hooks ...func(zapcore.Entry) error) error {
l, err := newLogger(hooks...)
if err != nil {
return errors.Wrap(err, "failed to get logger")
}
Expand All @@ -252,9 +258,9 @@ func (app *Application) setupTelemetry(ballastSizeBytes uint64) error {
func (app *Application) runAndWaitForShutdownEvent() {
app.logger.Info("Everything is ready. Begin running and processing data.")

// Plug SIGTERM signal into a channel.
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
// plug SIGTERM signal into a channel.
app.signalsChannel = make(chan os.Signal, 1)
signal.Notify(app.signalsChannel, os.Interrupt, syscall.SIGTERM)

// set the channel to stop testing.
app.stopTestChan = make(chan struct{})
Expand All @@ -264,7 +270,7 @@ func (app *Application) runAndWaitForShutdownEvent() {
select {
case err := <-app.asyncErrorChannel:
app.logger.Error("Asynchronous error received, terminating process", zap.Error(err))
case s := <-signalsChannel:
case s := <-app.signalsChannel:
app.logger.Info("Received signal from OS", zap.String("signal", s.String()))
case <-app.stopTestChan:
app.logger.Info("Received stop test request")
Expand Down
10 changes: 9 additions & 1 deletion service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand All @@ -42,7 +43,13 @@ func TestApplication_Start(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
loggingHookCalled := false
hook := func(entry zapcore.Entry) error {
loggingHookCalled = true
return nil
}

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}, LoggingHooks: []func(entry zapcore.Entry) error{hook}})
require.NoError(t, err)
assert.Equal(t, app.rootCmd, app.Command())

Expand All @@ -65,6 +72,7 @@ func TestApplication_Start(t *testing.T) {
assert.Equal(t, Running, <-app.GetStateChannel())
require.True(t, isAppAvailable(t, "http://localhost:13133"))
assert.Equal(t, app.logger, app.GetLogger())
assert.True(t, loggingHookCalled)

assertMetricsPrefix(t, testPrefix, metricsPort)

Expand Down
133 changes: 133 additions & 0 deletions service/service_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build windows

package service

import (
"fmt"
"log"
"syscall"

"go.uber.org/zap/zapcore"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/eventlog"
)

type WindowsService struct {
app *Application
params Parameters
}

func NewWindowsService(params Parameters) *WindowsService {
return &WindowsService{params: params}
}

func (s *WindowsService) Execute(args []string, requests <-chan svc.ChangeRequest, changes chan<- svc.Status) (ssec bool, errno uint32) {
if len(args) == 0 {
log.Fatal("expected first argument supplied to service.Execute to be the service name")
}

changes <- svc.Status{State: svc.StartPending}
s.start(args[0], s.params)
changes <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptShutdown}

for req := range requests {
switch req.Cmd {
case svc.Interrogate:
changes <- req.CurrentStatus
case svc.Stop, svc.Shutdown:
s.stop()
return
default:
log.Fatalf(fmt.Sprintf("unexpected control request #%d", req))
}
}

return
}

func (s *WindowsService) start(logSourceName string, params Parameters) {
var err error
s.app, err = newWithEventViewerLoggingHook(logSourceName, params)
if err != nil {
log.Fatal(err)
}

// app.Start blocks until receiving a SIGTERM signal, so we need to start it asynchronously
go func() {
err = s.app.Start()
if err != nil {
log.Fatalf("application run finished with error: %v", err)
}
}()

// wait until the app is in the Running state
for state := range s.app.GetStateChannel() {
if state == Running {
break
}
}
}

func (s *WindowsService) stop() {
// simulate a SIGTERM signal to terminate the application
s.app.signalsChannel <- syscall.SIGTERM

// wait until the app is in the Closed state
for state := range s.app.GetStateChannel() {
if state == Closed {
break
}
}

s.app = nil
}

func newWithEventViewerLoggingHook(serviceName string, params Parameters) (*Application, error) {
elog, err := eventlog.Open(serviceName)
if err != nil {
return nil, fmt.Errorf("failed to open event log: %v", err)
}

params.LoggingHooks = append(
params.LoggingHooks,
func(entry zapcore.Entry) error {
msg := fmt.Sprintf("%v\r\n\r\nStack Trace:\r\n%v", entry.Message, entry.Stack)

switch entry.Level {
case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel:
// golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs
return elog.Error(3, msg)
case zapcore.ErrorLevel:
return elog.Error(3, msg)
case zapcore.WarnLevel:
return elog.Warning(2, msg)
case zapcore.InfoLevel:
return elog.Info(1, msg)
}

// ignore Debug level logs
return nil
},
)

app, err := New(params)
if err != nil {
return nil, err
}

return app, nil
}

0 comments on commit 9028089

Please sign in to comment.