From 09d0295110cb7e571deaf26647b3364679fa51dc Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 15 Nov 2022 16:02:53 -0500 Subject: [PATCH] Introduce component status reporting This is an alternative to https://github.com/open-telemetry/opentelemetry-collector/pull/6550 ## Summary of changes - Add component status concept. Components can report their status via Host.ReportComponentStatus(). Interested extensions can watch status events if they implement the StatusWatcher interface. This is similar to how PipelineWatcher works today. - Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus(). ## TODO after this is merged - The healthcheck extension must implement StatusWatcher. - Replace all ReportFatalError() calls with ReportComponentStatus() calls in core and contrib. ## Open Questions - StatusWatchers need to be able to tell if all current components are healthy. It is assumed that the listeners need to maintain a map of components and track the status of each component. This works only if we assume that the set of components cannot change during the lifetime of the listener. This assumption is true today but can change in the future if we introduce partial pipeline restarts where only modified/added/removed components are recreated (this will break the listener's assumption and the map will become invalid). Should we instead keep track of this entire status map in the Host and broadcast the entire status to the listeners as a whole instead of (or in addition to) individual component events? 
--- component/componenttest/nop_host.go | 2 + component/host.go | 6 + component/status.go | 106 ++++++++++++++++++ service/collector.go | 4 +- service/extensions/extensions.go | 29 ++++- service/host.go | 10 +- service/internal/components/host_wrapper.go | 14 ++- .../internal/components/host_wrapper_test.go | 9 +- service/internal/pipelines/pipelines.go | 29 ++++- service/internal/pipelines/pipelines_test.go | 9 +- service/internal/servicehost/host.go | 38 +++++++ service/internal/servicehost/nop_host.go | 45 ++++++++ 12 files changed, 283 insertions(+), 18 deletions(-) create mode 100644 component/status.go create mode 100644 service/internal/servicehost/host.go create mode 100644 service/internal/servicehost/nop_host.go diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index b535b674af6..c24584b2b2a 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -28,6 +28,8 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} +func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {} + func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/host.go b/component/host.go index 92c395b0306..12d703cf542 100644 --- a/component/host.go +++ b/component/host.go @@ -23,8 +23,14 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError) ReportFatalError(err error) + // ReportComponentStatus can be used by a component to communicate its status to the Host. + // The Host implementations may broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(event *StatusEvent) + // GetFactory of the specified kind. 
Returns the factory for a component type. // This allows components to create other components. For example: // func (r MyReceiver) Start(host component.Host) error { diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..a4667929809 --- /dev/null +++ b/component/status.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "errors" + "time" +) + +type Status int + +const ( + StatusOK Status = iota + StatusError +) + +// StatusSource component that reports a status about itself. +// The implementation of this interface must be comparable to be useful as a map key. +type StatusSource interface { + ID() ID +} + +type StatusEvent struct { + timestamp time.Time + status Status + err error +} + +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Timestamp returns the time of the event. +func (ev *StatusEvent) Timestamp() time.Time { + return ev.timestamp +} + +// Err returns the error associated with the ComponentEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// statusEventOption applies options to a StatusEvent. +type statusEventOption func(*StatusEvent) error + +// WithTimestamp sets the timestamp for a ComponentEvent. 
+func WithTimestamp(timestamp time.Time) statusEventOption { + return func(o *StatusEvent) error { + o.timestamp = timestamp + return nil + } +} + +// WithError sets the error object of the Event. It is optional +// and should only be applied to an Event of type ComponentError. +func WithError(err error) statusEventOption { + return func(o *StatusEvent) error { + if o.status == StatusOK { + return errors.New("event with ComponentOK cannot have an error") + } + o.err = err + return nil + } +} + +// NewStatusEvent creates and returns a StatusEvent with default and provided +// options. Will return an error if an error is provided for a non-error event +// type (status.ComponentOK). +// If the timestamp is not provided will set it to time.Now(). +func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { + ev := StatusEvent{ + status: status, + } + + for _, opt := range options { + if err := opt(&ev); err != nil { + return nil, err + } + } + + if ev.timestamp.IsZero() { + ev.timestamp = time.Now() + } + + return &ev, nil +} + +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // StatusChanged notifies about a change in the source component status. + StatusChanged(timestamp time.Time, source StatusSource, event *StatusEvent) +} diff --git a/service/collector.go b/service/collector.go index 4250e9874a5..abcaeaa4422 100644 --- a/service/collector.go +++ b/service/collector.go @@ -28,9 +28,9 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service/internal/grpclog" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // State defines Collector's state. 
@@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) { col.state.Store(int32(state)) } -func getBallastSize(host component.Host) uint64 { +func getBallastSize(host servicehost.Host) uint64 { var ballastSize uint64 extensions := host.GetExtensions() for _, extension := range extensions { diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1e20f430290..9c352f2b00f 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -19,12 +19,14 @@ import ( "fmt" "net/http" "sort" + "time" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -36,13 +38,26 @@ type Extensions struct { extMap map[component.ID]component.Extension } +type statusReportingExtension struct { + id component.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() component.ID { + return s.id +} + // Start starts all extensions. 
-func (bes *Extensions) Start(ctx context.Context, host component.Host) error { +func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := extensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -83,6 +98,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } +func (bes *Extensions) NotifyStatusChange(timestamp time.Time, source component.StatusSource, event *component.StatusEvent) error { + var errs error + for _, ext := range bes.extMap { + if pw, ok := ext.(component.StatusWatcher); ok { + pw.StatusChanged(timestamp, source, event) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Extension { result := make(map[component.ID]component.Extension, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/host.go b/service/host.go index 31be010608d..76c4d5b74d1 100644 --- a/service/host.go +++ b/service/host.go @@ -15,12 +15,15 @@ package service // import "go.opentelemetry.io/collector/service" import ( + "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/pipelines" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -var _ component.Host = (*serviceHost)(nil) +var _ servicehost.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error @@ -34,10 +37,15 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't 
recover from) after // its start function has already returned. +// Deprecated: [0.65.0] Replaced by ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } +func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { + host.extensions.NotifyStatusChange(time.Now(), source, event) +} + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 78428e4e2f0..67beaf0b981 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -20,17 +20,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. +// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { - component.Host + servicehost.Host + component component.StatusSource *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -41,6 +45,12 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = component.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { + hw.Host.ReportComponentStatus(hw.component, event) +} + // RegisterZPages is used by zpages extension to register handles from service. 
// When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 720669bd847..0738df63600 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -18,12 +18,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) func Test_newHostWrapper(t *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) + hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := component.NewStatusEvent(component.StatusOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 24907012a86..bcb6e0879a0 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/fanoutconsumer" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -65,18 +66,32 @@ type Pipelines struct { pipelines map[component.ID]*builtPipeline } +type statusReportingComponent struct { + kind component.Kind + id component.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() component.ID { + return s.id +} + // StartAll starts all pipelines. 
// // Start with exporters, processors (in reverse configured order), then receivers. // This is important so that components that are earlier in the pipeline and reference components that are // later in the pipeline do not start sending data to later components which are not yet started. -func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { +func (bps *Pipelines) StartAll(ctx context.Context, host servicehost.Host) error { bps.telemetry.Logger.Info("Starting exporters...") for dt, expByID := range bps.allExporters { for expID, exp := range expByID { expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindExporter, expID} + if err := exp.Start(ctx, components.NewHostWrapper(host, statusSource, expLogger)); err != nil { return err } expLogger.Info("Exporter started.") @@ -86,9 +101,12 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { bps.telemetry.Logger.Info("Starting processors...") for pipelineID, bp := range bps.pipelines { for i := len(bp.processors) - 1; i >= 0; i-- { - procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + processor := bp.processors[i] + procID := processor.id + procLogger := processorLogger(bps.telemetry.Logger, procID, pipelineID) procLogger.Info("Processor is starting...") - if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindProcessor, procID} + if err := processor.comp.Start(ctx, components.NewHostWrapper(host, statusSource, procLogger)); err != nil { return err } procLogger.Info("Processor started.") @@ -100,7 +118,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { for recvID, recv := range recvByID { recvLogger := 
receiverLogger(bps.telemetry.Logger, recvID, dt) recvLogger.Info("Receiver is starting...") - if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindReceiver, recvID} + if err := recv.Start(ctx, components.NewHostWrapper(host, statusSource, recvLogger)); err != nil { return err } recvLogger.Info("Receiver started.") diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index 2bc2ed63096..d92f05db19f 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -32,6 +32,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/service/internal/configunmarshaler" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/testcomponents" ) @@ -95,7 +96,7 @@ func TestBuild(t *testing.T) { pipelines, err := Build(context.Background(), toSettings(factories, cfg)) assert.NoError(t, err) - assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) // Verify exporters created, started and empty. 
for _, expID := range test.exporterIDs { @@ -311,7 +312,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -325,7 +326,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -339,7 +340,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go new file mode 100644 index 00000000000..754f483e49c --- /dev/null +++ b/service/internal/servicehost/host.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// Host mirrors component.Host interface, with one important difference: servicehost.Host +// is not associated with a component and thus ReportComponentStatus() requires the source +// component to be explicitly specified. +type Host interface { + // ReportComponentStatus is used to communicate the status of a source component to the Host. + // The Host implementations will broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + + // See component.Host for the documentation of the rest of the functions. + + // Deprecated: [0.65.0] Replaced by ReportComponentStatus. + ReportFatalError(err error) + + GetFactory(kind component.Kind, componentType component.Type) component.Factory + GetExtensions() map[component.ID]component.Extension + GetExporters() map[component.DataType]map[component.ID]component.Exporter +} diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go new file mode 100644 index 00000000000..7a5717624fb --- /dev/null +++ b/service/internal/servicehost/nop_host.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// nopHost mocks a receiver.ReceiverHost for test purposes. +type nopHost struct{} + +func (n nopHost) ReportFatalError(err error) { +} + +func (n nopHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +} + +func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + return nil +} + +func (n nopHost) GetExtensions() map[component.ID]component.Extension { + return nil +} + +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { + return nil +} + +// NewNopHost returns a new instance of nopHost with proper defaults for most tests. +func NewNopHost() Host { + return &nopHost{} +}