diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index b535b674af6..c24584b2b2a 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -28,6 +28,8 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} +func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {} + func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/host.go b/component/host.go index 92c395b0306..12d703cf542 100644 --- a/component/host.go +++ b/component/host.go @@ -23,8 +23,14 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError) ReportFatalError(err error) + // ReportComponentStatus can be used by a component to communicate its status to the Host. + // The Host implementations may broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(event *StatusEvent) + // GetFactory of the specified kind. Returns the factory for a component type. // This allows components to create other components. For example: // func (r MyReceiver) Start(host component.Host) error { diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..a4667929809 --- /dev/null +++ b/component/status.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "errors" + "time" +) + +type Status int + +const ( + StatusOK Status = iota + StatusError +) + +// StatusSource component that reports a status about itself. +// The implementation of this interface must be comparable to be useful as a map key. +type StatusSource interface { + ID() ID +} + +type StatusEvent struct { + timestamp time.Time + status Status + err error +} + +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Timestamp returns the time of the event. +func (ev *StatusEvent) Timestamp() time.Time { + return ev.timestamp +} + +// Err returns the error associated with the ComponentEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// statusEventOption applies options to a StatusEvent. +type statusEventOption func(*StatusEvent) error + +// WithTimestamp sets the timestamp for a ComponentEvent. +func WithTimestamp(timestamp time.Time) statusEventOption { + return func(o *StatusEvent) error { + o.timestamp = timestamp + return nil + } +} + +// WithError sets the error object of the Event. It is optional +// and should only be applied to an Event of type ComponentError. +func WithError(err error) statusEventOption { + return func(o *StatusEvent) error { + if o.status == StatusOK { + return errors.New("event with ComponentOK cannot have an error") + } + o.err = err + return nil + } +} + +// NewStatusEvent creates and returns a StatusEvent with default and provided +// options. Will return an error if an error is provided for a non-error event +// type (status.ComponentOK). +// If the timestamp is not provided will set it to time.Now(). +func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { + ev := StatusEvent{ + status: status, + } + + for _, opt := range options { + if err := opt(&ev); err != nil { + return nil, err + } + } + + if ev.timestamp.IsZero() { + ev.timestamp = time.Now() + } + + return &ev, nil +} + +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // StatusChanged notifies about a change in the source component status. + StatusChanged(timestamp time.Time, source StatusSource, event *StatusEvent) +} diff --git a/service/collector.go b/service/collector.go index 4250e9874a5..abcaeaa4422 100644 --- a/service/collector.go +++ b/service/collector.go @@ -28,9 +28,9 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service/internal/grpclog" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // State defines Collector's state. @@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) { col.state.Store(int32(state)) } -func getBallastSize(host component.Host) uint64 { +func getBallastSize(host servicehost.Host) uint64 { var ballastSize uint64 extensions := host.GetExtensions() for _, extension := range extensions { diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1e20f430290..9c352f2b00f 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -19,12 +19,14 @@ import ( "fmt" "net/http" "sort" + "time" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -36,13 +38,26 @@ type Extensions struct { extMap map[component.ID]component.Extension } +type statusReportingExtension struct { + id component.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() component.ID { + return s.id +} + // Start starts all extensions. -func (bes *Extensions) Start(ctx context.Context, host component.Host) error { +func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := extensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -83,6 +98,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } +func (bes *Extensions) NotifyStatusChange(timestamp time.Time, source component.StatusSource, event *component.StatusEvent) error { + var errs error + for _, ext := range bes.extMap { + if pw, ok := ext.(component.StatusWatcher); ok { + pw.StatusChanged(timestamp, source, event) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Extension { result := make(map[component.ID]component.Extension, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/host.go b/service/host.go index 31be010608d..76c4d5b74d1 100644 --- a/service/host.go +++ b/service/host.go @@ -15,12 +15,15 @@ package service // import "go.opentelemetry.io/collector/service" import ( + "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/pipelines" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -var _ component.Host = (*serviceHost)(nil) +var _ servicehost.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error @@ -34,10 +37,15 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. +// Deprecated: [0.65.0] Replaced by ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } +func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { + host.extensions.NotifyStatusChange(time.Now(), source, event) +} + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 78428e4e2f0..67beaf0b981 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -20,17 +20,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. +// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { - component.Host + servicehost.Host + component component.StatusSource *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -41,6 +45,12 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = component.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { + hw.Host.ReportComponentStatus(hw.component, event) +} + // RegisterZPages is used by zpages extension to register handles from service. // When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 720669bd847..0738df63600 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -18,12 +18,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) func Test_newHostWrapper(t *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) + hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := component.NewStatusEvent(component.StatusOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 24907012a86..bcb6e0879a0 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/fanoutconsumer" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -65,18 +66,32 @@ type Pipelines struct { pipelines map[component.ID]*builtPipeline } +type statusReportingComponent struct { + kind component.Kind + id component.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() component.ID { + return s.id +} + // StartAll starts all pipelines. // // Start with exporters, processors (in reverse configured order), then receivers. // This is important so that components that are earlier in the pipeline and reference components that are // later in the pipeline do not start sending data to later components which are not yet started. -func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { +func (bps *Pipelines) StartAll(ctx context.Context, host servicehost.Host) error { bps.telemetry.Logger.Info("Starting exporters...") for dt, expByID := range bps.allExporters { for expID, exp := range expByID { expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindExporter, expID} + if err := exp.Start(ctx, components.NewHostWrapper(host, statusSource, expLogger)); err != nil { return err } expLogger.Info("Exporter started.") @@ -86,9 +101,12 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { bps.telemetry.Logger.Info("Starting processors...") for pipelineID, bp := range bps.pipelines { for i := len(bp.processors) - 1; i >= 0; i-- { - procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + processor := bp.processors[i] + procID := processor.id + procLogger := processorLogger(bps.telemetry.Logger, procID, pipelineID) procLogger.Info("Processor is starting...") - if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindProcessor, procID} + if err := processor.comp.Start(ctx, components.NewHostWrapper(host, statusSource, procLogger)); err != nil { return err } procLogger.Info("Processor started.") @@ -100,7 +118,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { for recvID, recv := range recvByID { recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt) recvLogger.Info("Receiver is starting...") - if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindReceiver, recvID} + if err := recv.Start(ctx, components.NewHostWrapper(host, statusSource, recvLogger)); err != nil { return err } recvLogger.Info("Receiver started.") diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index 2bc2ed63096..d92f05db19f 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -32,6 +32,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/service/internal/configunmarshaler" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/testcomponents" ) @@ -95,7 +96,7 @@ func TestBuild(t *testing.T) { pipelines, err := Build(context.Background(), toSettings(factories, cfg)) assert.NoError(t, err) - assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) // Verify exporters created, started and empty. for _, expID := range test.exporterIDs { @@ -311,7 +312,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -325,7 +326,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -339,7 +340,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go new file mode 100644 index 00000000000..754f483e49c --- /dev/null +++ b/service/internal/servicehost/host.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// Host mirrors component.Host interface, with one important difference: servicehost.Host +// is not associated with a component and thus ReportComponentStatus() requires the source +// component to be explicitly specified. +type Host interface { + // ReportComponentStatus is used to communicate the status of a source component to the Host. + // The Host implementations will broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + + // See component.Host for the documentation of the rest of the functions. + + // Deprecated: [0.65.0] Replaced by ReportComponentStatus. + ReportFatalError(err error) + + GetFactory(kind component.Kind, componentType component.Type) component.Factory + GetExtensions() map[component.ID]component.Extension + GetExporters() map[component.DataType]map[component.ID]component.Exporter +} diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go new file mode 100644 index 00000000000..7a5717624fb --- /dev/null +++ b/service/internal/servicehost/nop_host.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// nopHost mocks a receiver.ReceiverHost for test purposes. +type nopHost struct{} + +func (n nopHost) ReportFatalError(err error) { +} + +func (n nopHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +} + +func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + return nil +} + +func (n nopHost) GetExtensions() map[component.ID]component.Extension { + return nil +} + +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { + return nil +} + +// NewNopHost returns a new instance of nopHost with proper defaults for most tests. +func NewNopHost() Host { + return &nopHost{} +}