From 6ca71bcdd156d719240abdb98d302b70378712e0 Mon Sep 17 00:00:00 2001
From: James Bebbington <jbebbington@google.com>
Date: Mon, 15 Jun 2020 17:22:30 +1000
Subject: [PATCH] Enable to Collector to be run as a Windows service

---
 cmd/otelcol/main.go                          |  32 +++--
 cmd/otelcol/main_others.go                   |  23 ++++
 cmd/otelcol/main_windows.go                  |  46 +++++++
 service/logger.go                            |   4 +-
 service/service.go                           |  22 +--
 service/service_test.go                      |  34 ++++-
 service/service_windows.go                   | 135 +++++++++++++++++++
 service/service_windows_test.go              |  56 ++++++++
 service/telemetry.go                         |  15 ++-
 service/testdata/otelcol-config-minimal.yaml |  13 ++
 10 files changed, 357 insertions(+), 23 deletions(-)
 create mode 100644 cmd/otelcol/main_others.go
 create mode 100644 cmd/otelcol/main_windows.go
 create mode 100644 service/service_windows.go
 create mode 100644 service/service_windows_test.go
 create mode 100644 service/testdata/otelcol-config-minimal.yaml

diff --git a/cmd/otelcol/main.go b/cmd/otelcol/main.go
index f40a39532f0..32ffbb51786 100644
--- a/cmd/otelcol/main.go
+++ b/cmd/otelcol/main.go
@@ -17,6 +17,7 @@
 package main
 
 import (
+	"fmt"
 	"log"
 
 	"go.opentelemetry.io/collector/internal/version"
@@ -25,14 +26,19 @@ import (
 )
 
 func main() {
-	handleErr := func(message string, err error) {
-		if err != nil {
-			log.Fatalf("%s: %v", message, err)
-		}
+	params, err := applicationParameters()
+	if err != nil {
+		log.Fatal(err)
 	}
 
+	run(params)
+}
+
+func applicationParameters() (service.Parameters, error) {
 	factories, err := defaultcomponents.Components()
-	handleErr("Failed to build default components", err)
+	if err != nil {
+		return service.Parameters{}, fmt.Errorf("failed to build default components: %v", err)
+	}
 
 	info := service.ApplicationStartInfo{
 		ExeName:  "otelcol",
@@ -41,9 +47,17 @@ func main() {
 		GitHash:  version.GitHash,
 	}
 
-	svc, err := service.New(service.Parameters{ApplicationStartInfo: info, Factories: factories})
-	handleErr("Failed to construct the application", err)
+	return service.Parameters{Factories: factories, ApplicationStartInfo: info}, nil
+}
 
-	err = svc.Start()
-	handleErr("Application run finished with error", err)
+func runInteractive(params service.Parameters) {
+	app, err := service.New(params)
+	if err != nil {
+		log.Fatalf("failed to construct the application: %v", err)
+	}
+
+	err = app.Start()
+	if err != nil {
+		log.Fatalf("application run finished with error: %v", err)
+	}
 }
diff --git a/cmd/otelcol/main_others.go b/cmd/otelcol/main_others.go
new file mode 100644
index 00000000000..72ea43e6514
--- /dev/null
+++ b/cmd/otelcol/main_others.go
@@ -0,0 +1,23 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !windows
+
+package main
+
+import "go.opentelemetry.io/collector/service"
+
+func run(params service.Parameters) {
+	runInteractive(params)
+}
diff --git a/cmd/otelcol/main_windows.go b/cmd/otelcol/main_windows.go
new file mode 100644
index 00000000000..5cdcfb02494
--- /dev/null
+++ b/cmd/otelcol/main_windows.go
@@ -0,0 +1,46 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build windows
+
+package main
+
+import (
+	"log"
+
+	"golang.org/x/sys/windows/svc"
+
+	"go.opentelemetry.io/collector/service"
+)
+
+func run(params service.Parameters) {
+	isInteractive, err := svc.IsAnInteractiveSession()
+	if err != nil {
+		log.Fatalf("failed to determine if we are running in an interactive session: %v", err)
+	}
+
+	if isInteractive {
+		runInteractive(params)
+	} else {
+		runService(params)
+	}
+}
+
+func runService(params service.Parameters) {
+	// do not need to supply service name when startup is invoked through Service Control Manager directly
+	err := svc.Run("", service.NewWindowsService(params))
+	if err != nil {
+		log.Fatalf("failed to start service: %v", err)
+	}
+}
diff --git a/service/logger.go b/service/logger.go
index 38b22c911e5..1246181611c 100644
--- a/service/logger.go
+++ b/service/logger.go
@@ -39,7 +39,7 @@ func loggerFlags(flags *flag.FlagSet) {
 	loggerProfilePtr = flags.String(logProfileCfg, "", "Logging profile to use (dev, prod)")
 }
 
-func newLogger() (*zap.Logger, error) {
+func newLogger(hooks ...func(zapcore.Entry) error) (*zap.Logger, error) {
 	var level zapcore.Level
 	err := (&level).UnmarshalText([]byte(*loggerLevelPtr))
 	if err != nil {
@@ -62,5 +62,5 @@ func newLogger() (*zap.Logger, error) {
 	}
 
 	conf.Level.SetLevel(level)
-	return conf.Build()
+	return conf.Build(zap.Hooks(hooks...))
 }
diff --git a/service/service.go b/service/service.go
index 2da897deda9..e1aadc9aab1 100644
--- a/service/service.go
+++ b/service/service.go
@@ -32,6 +32,7 @@ import (
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenterror"
@@ -83,6 +84,9 @@ type Application struct {
 	// stopTestChan is used to terminate the application in end to end tests.
 	stopTestChan chan struct{}
 
+	// signalsChannel is used to receive termination signals from the OS.
+	signalsChannel chan os.Signal
+
 	// asyncErrorChannel is used to signal a fatal error from any component.
 	asyncErrorChannel chan error
 }
@@ -118,6 +122,8 @@ type Parameters struct {
 	// If it is not provided the default factory (FileLoaderConfigFactory) is used.
 	// The default factory loads the configuration specified as a command line flag.
 	ConfigFactory ConfigFactory
+	// LoggingHooks provides a way to supply a hook into logging events
+	LoggingHooks []func(zapcore.Entry) error
 }
 
 // ConfigFactory creates config.
@@ -156,7 +162,7 @@ func New(params Parameters) (*Application, error) {
 		Use:  params.ApplicationStartInfo.ExeName,
 		Long: params.ApplicationStartInfo.LongName,
 		RunE: func(cmd *cobra.Command, args []string) error {
-			err := app.init()
+			err := app.init(params.LoggingHooks...)
 			if err != nil {
 				return err
 			}
@@ -237,8 +243,8 @@ func (app *Application) SignalTestComplete() {
 	close(app.stopTestChan)
 }
 
-func (app *Application) init() error {
-	l, err := newLogger()
+func (app *Application) init(hooks ...func(zapcore.Entry) error) error {
+	l, err := newLogger(hooks...)
 	if err != nil {
 		return errors.Wrap(err, "failed to get logger")
 	}
@@ -261,19 +267,17 @@ func (app *Application) setupTelemetry(ballastSizeBytes uint64) error {
 func (app *Application) runAndWaitForShutdownEvent() {
 	app.logger.Info("Everything is ready. Begin running and processing data.")
 
-	// Plug SIGTERM signal into a channel.
-	signalsChannel := make(chan os.Signal, 1)
-	signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
+	// plug SIGTERM signal into a channel.
+	app.signalsChannel = make(chan os.Signal, 1)
+	signal.Notify(app.signalsChannel, os.Interrupt, syscall.SIGTERM)
 
 	// set the channel to stop testing.
 	app.stopTestChan = make(chan struct{})
-	// notify tests that it is ready.
-
 	app.stateChannel <- Running
 	select {
 	case err := <-app.asyncErrorChannel:
 		app.logger.Error("Asynchronous error received, terminating process", zap.Error(err))
-	case s := <-signalsChannel:
+	case s := <-app.signalsChannel:
 		app.logger.Info("Received signal from OS", zap.String("signal", s.String()))
 	case <-app.stopTestChan:
 		app.logger.Info("Received stop test request")
diff --git a/service/service_test.go b/service/service_test.go
index 77026c2a48d..b6fa3064035 100644
--- a/service/service_test.go
+++ b/service/service_test.go
@@ -26,10 +26,12 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/pkg/errors"
 	"github.com/spf13/viper"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config"
@@ -43,7 +45,13 @@ func TestApplication_Start(t *testing.T) {
 	factories, err := defaultcomponents.Components()
 	require.NoError(t, err)
 
-	app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
+	loggingHookCalled := false
+	hook := func(entry zapcore.Entry) error {
+		loggingHookCalled = true
+		return nil
+	}
+
+	app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}, LoggingHooks: []func(entry zapcore.Entry) error{hook}})
 	require.NoError(t, err)
 	assert.Equal(t, app.rootCmd, app.Command())
 
@@ -66,6 +74,7 @@ func TestApplication_Start(t *testing.T) {
 	assert.Equal(t, Running, <-app.GetStateChannel())
 	require.True(t, isAppAvailable(t, "http://localhost:13133"))
 	assert.Equal(t, app.logger, app.GetLogger())
+	assert.True(t, loggingHookCalled)
 
 	assertMetricsPrefix(t, testPrefix, metricsPort)
 
@@ -75,6 +84,29 @@ func TestApplication_Start(t *testing.T) {
 	assert.Equal(t, Closed, <-app.GetStateChannel())
 }
 
+func TestApplication_ReportError(t *testing.T) {
+	factories, err := defaultcomponents.Components()
+	require.NoError(t, err)
+
+	app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
+	require.NoError(t, err)
+
+	app.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})
+
+	appDone := make(chan struct{})
+	go func() {
+		defer close(appDone)
+		assert.NoError(t, app.Start())
+	}()
+
+	assert.Equal(t, Starting, <-app.GetStateChannel())
+	assert.Equal(t, Running, <-app.GetStateChannel())
+	app.ReportFatalError(errors.New("err1"))
+	<-appDone
+	assert.Equal(t, Closing, <-app.GetStateChannel())
+	assert.Equal(t, Closed, <-app.GetStateChannel())
+}
+
 func TestApplication_StartAsGoRoutine(t *testing.T) {
 	factories, err := defaultcomponents.Components()
 	require.NoError(t, err)
diff --git a/service/service_windows.go b/service/service_windows.go
new file mode 100644
index 00000000000..ce9c58f9594
--- /dev/null
+++ b/service/service_windows.go
@@ -0,0 +1,135 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build windows
+
+package service
+
+import (
+	"fmt"
+	"log"
+	"syscall"
+
+	"go.uber.org/zap/zapcore"
+	"golang.org/x/sys/windows/svc"
+	"golang.org/x/sys/windows/svc/eventlog"
+)
+
+type WindowsService struct {
+	app    *Application
+	params Parameters
+}
+
+func NewWindowsService(params Parameters) *WindowsService {
+	return &WindowsService{params: params}
+}
+
+func (s *WindowsService) Execute(args []string, requests <-chan svc.ChangeRequest, changes chan<- svc.Status) (ssec bool, errno uint32) {
+	if len(args) == 0 {
+		log.Fatal("expected first argument supplied to service.Execute to be the service name")
+	}
+
+	changes <- svc.Status{State: svc.StartPending}
+	s.start(args[0], s.params)
+	changes <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptShutdown}
+
+	for req := range requests {
+		switch req.Cmd {
+		case svc.Interrogate:
+			changes <- req.CurrentStatus
+		case svc.Stop, svc.Shutdown:
+			changes <- svc.Status{State: svc.StopPending}
+			s.stop()
+			changes <- svc.Status{State: svc.Stopped}
+			return
+		default:
+			log.Fatalf(fmt.Sprintf("unexpected control request #%d", req))
+		}
+	}
+
+	return
+}
+
+func (s *WindowsService) start(logSourceName string, params Parameters) {
+	var err error
+	s.app, err = newWithEventViewerLoggingHook(logSourceName, params)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// app.Start blocks until receiving a SIGTERM signal, so we need to start it asynchronously
+	go func() {
+		err = s.app.Start()
+		if err != nil {
+			log.Fatalf("application run finished with error: %v", err)
+		}
+	}()
+
+	// wait until the app is in the Running state
+	for state := range s.app.GetStateChannel() {
+		if state == Running {
+			break
+		}
+	}
+}
+
+func (s *WindowsService) stop() {
+	// simulate a SIGTERM signal to terminate the application
+	s.app.signalsChannel <- syscall.SIGTERM
+
+	// wait until the app is in the Closed state
+	for state := range s.app.GetStateChannel() {
+		if state == Closed {
+			break
+		}
+	}
+
+	s.app = nil
+}
+
+func newWithEventViewerLoggingHook(serviceName string, params Parameters) (*Application, error) {
+	elog, err := eventlog.Open(serviceName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open event log: %v", err)
+	}
+
+	params.LoggingHooks = append(
+		params.LoggingHooks,
+		func(entry zapcore.Entry) error {
+			msg := fmt.Sprintf("%v\r\n\r\nStack Trace:\r\n%v", entry.Message, entry.Stack)
+
+			switch entry.Level {
+			case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel:
+				// golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs
+				return elog.Error(3, msg)
+			case zapcore.ErrorLevel:
+				return elog.Error(3, msg)
+			case zapcore.WarnLevel:
+				return elog.Warning(2, msg)
+			case zapcore.InfoLevel:
+				return elog.Info(1, msg)
+			}
+
+			// ignore Debug level logs
+			return nil
+		},
+	)
+
+	app, err := New(params)
+	if err != nil {
+		return nil, err
+	}
+
+	return app, nil
+}
diff --git a/service/service_windows_test.go b/service/service_windows_test.go
new file mode 100644
index 00000000000..45de112b2d9
--- /dev/null
+++ b/service/service_windows_test.go
@@ -0,0 +1,56 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build windows
+
+package service
+
+import (
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sys/windows/svc"
+
+	"go.opentelemetry.io/collector/service/defaultcomponents"
+)
+
+func TestWindowsService_Execute(t *testing.T) {
+	os.Args = []string{"otelcol", "--config", "testdata/otelcol-config-minimal.yaml"}
+
+	factories, err := defaultcomponents.Components()
+	require.NoError(t, err)
+
+	s := NewWindowsService(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
+
+	appDone := make(chan struct{})
+	requests := make(chan svc.ChangeRequest)
+	changes := make(chan svc.Status)
+	go func() {
+		defer close(appDone)
+		b, errno := s.Execute([]string{"svc name"}, requests, changes)
+		assert.Equal(t, uint32(0), errno)
+		assert.Equal(t, false, b)
+	}()
+
+	assert.Equal(t, svc.StartPending, (<-changes).State)
+	assert.Equal(t, svc.Running, (<-changes).State)
+	requests <- svc.ChangeRequest{Cmd: svc.Interrogate, CurrentStatus: svc.Status{State: svc.Running}}
+	assert.Equal(t, svc.Running, (<-changes).State)
+	requests <- svc.ChangeRequest{Cmd: svc.Stop}
+	assert.Equal(t, svc.StopPending, (<-changes).State)
+	assert.Equal(t, svc.Stopped, (<-changes).State)
+	<-appDone
+}
diff --git a/service/telemetry.go b/service/telemetry.go
index 6d924bbfdc3..b71966a3e9a 100644
--- a/service/telemetry.go
+++ b/service/telemetry.go
@@ -40,7 +40,8 @@ var (
 )
 
 type appTelemetry struct {
-	views []*view.View
+	views  []*view.View
+	server *http.Server
 }
 
 func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error {
@@ -103,7 +104,13 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
 	go func() {
 		mux := http.NewServeMux()
 		mux.Handle("/metrics", pe)
-		serveErr := http.ListenAndServe(metricsAddr, mux)
+
+		tel.server = &http.Server{
+			Addr:    metricsAddr,
+			Handler: mux,
+		}
+
+		serveErr := tel.server.ListenAndServe()
 		if serveErr != nil && serveErr != http.ErrServerClosed {
 			asyncErrorChannel <- serveErr
 		}
@@ -114,6 +121,10 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
 
 func (tel *appTelemetry) shutdown() {
 	view.Unregister(tel.views...)
+
+	if tel.server != nil {
+		_ = tel.server.Close()
+	}
 }
 
 func sanitizePrometheusKey(str string) string {
diff --git a/service/testdata/otelcol-config-minimal.yaml b/service/testdata/otelcol-config-minimal.yaml
new file mode 100644
index 00000000000..448973ae172
--- /dev/null
+++ b/service/testdata/otelcol-config-minimal.yaml
@@ -0,0 +1,13 @@
+receivers:
+  otlp:
+
+exporters:
+  otlp:
+    endpoint: "locahost:14250"
+
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      exporters: [otlp]
+