Skip to content

Commit

Permalink
Introduce component status reporting
Browse files Browse the repository at this point in the history
This is an alternate to #6550

## Summary of changes

- Add component status concept. Components can report their status via
  Host.ReportComponentStatus(). Interested extensions can watch status events
  if they implement StatusWatcher interface.
  This is similar to how PipelineWatcher works today.
- Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus().

## TODO after this is merged

- healthcheck extension must implement StatusWatcher.
- Replace all ReportFatalError() calls by ReportComponentStatus() calls in core and contrib.

## Open Questions

- StatusWatchers need to be able to tell if all current components are healthy.
  It is assumed that the listeners need to maintain a map of components and track
  the status of each component. This works only if we assume that the set of
  components cannot change during the lifetime of the listener. This assumption
  is true today but can change in the future if we introduce partial pipeline
  restarts where only modified/added/removed components are recreated (this will
  break listener's assumption and the map will become invalid). Should we
  instead keep track of this entire status map in the Host and broadcast the
  entire status to the listeners as a whole instead of (or in addition to)
  individual component events?
  • Loading branch information
tigrannajaryan committed Nov 16, 2022
1 parent a8d6280 commit 9d9b647
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 18 deletions.
2 changes: 2 additions & 0 deletions component/componenttest/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func NewNopHost() component.Host {

func (nh *nopHost) ReportFatalError(_ error) {}

func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {}

func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions component/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ type Host interface {
//
// ReportFatalError should be called by the component anytime after Component.Start() ends and
// before Component.Shutdown() begins.
// Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError)
ReportFatalError(err error)

// ReportComponentStatus can be used by a component to communicate its status to the Host.
// The Host implementations may broadcast this information to interested parties via
// StatusWatcher interface.
ReportComponentStatus(event *StatusEvent)

// GetFactory of the specified kind. Returns the factory for a component type.
// This allows components to create other components. For example:
// func (r MyReceiver) Start(host component.Host) error {
Expand Down
87 changes: 87 additions & 0 deletions component/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"errors"
)

type Status int32

const (
StatusOK Status = iota
StatusError
)

// StatusSource component that reports a status about itself.
// The implementation of this interface must be comparable to be useful as a map key.
type StatusSource interface {
ID() ID
}

type StatusEvent struct {
status Status
err error
}

func (ev *StatusEvent) Status() Status {
return ev.status
}

// Err returns the error associated with the ComponentEvent.
func (ev *StatusEvent) Err() error {
return ev.err
}

// statusEventOption applies options to a StatusEvent.
type statusEventOption func(*StatusEvent) error

// WithError sets the error object of the Event. It is optional
// and should only be applied to an Event of type ComponentError.
func WithError(err error) statusEventOption {
return func(o *StatusEvent) error {
if o.status == StatusOK {
return errors.New("event with ComponentOK cannot have an error")
}
o.err = err
return nil
}
}

// NewStatusEvent creates and returns a StatusEvent with default and provided
// options. Will return an error if an error is provided for a non-error event
// type (status.ComponentOK).
// If the timestamp is not provided will set it to time.Now().
func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) {
ev := StatusEvent{
status: status,
}

for _, opt := range options {
if err := opt(&ev); err != nil {
return nil, err
}
}

return &ev, nil
}

// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry
// Collector that is to be implemented by extensions interested in changes to component
// status.
type StatusWatcher interface {
// StatusChanged notifies about a change in the source component status.
ComponentStatusChanged(source StatusSource, event *StatusEvent)
}
11 changes: 11 additions & 0 deletions component/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package component

import (
"fmt"
"testing"
"unsafe"
)

func TestStatusEventSize(t *testing.T) {
fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{}))
}
4 changes: 2 additions & 2 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/grpclog"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

// State defines Collector's state.
Expand Down Expand Up @@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) {
col.state.Store(int32(state))
}

func getBallastSize(host component.Host) uint64 {
func getBallastSize(host servicehost.Host) uint64 {
var ballastSize uint64
extensions := host.GetExtensions()
for _, extension := range extensions {
Expand Down
28 changes: 26 additions & 2 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/internal/zpages"
)

Expand All @@ -36,13 +37,26 @@ type Extensions struct {
extMap map[component.ID]component.Extension
}

type statusReportingExtension struct {
id component.ID
}

func (s *statusReportingExtension) GetKind() component.Kind {
return component.KindExtension
}

func (s *statusReportingExtension) ID() component.ID {
return s.id
}

// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for extID, ext := range bes.extMap {
extLogger := extensionLogger(bes.telemetry.Logger, extID)
extLogger.Info("Extension is starting...")
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
statusSource := &statusReportingExtension{extID}
if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil {
return err
}
extLogger.Info("Extension started.")
Expand Down Expand Up @@ -83,6 +97,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error {
var errs error
for _, ext := range bes.extMap {
if pw, ok := ext.(component.StatusWatcher); ok {
pw.ComponentStatusChanged(source, event)
}
}
return errs
}

func (bes *Extensions) GetExtensions() map[component.ID]component.Extension {
result := make(map[component.ID]component.Extension, len(bes.extMap))
for extID, v := range bes.extMap {
Expand Down
8 changes: 7 additions & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

var _ component.Host = (*serviceHost)(nil)
var _ servicehost.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
Expand All @@ -34,10 +35,15 @@ type serviceHost struct {
// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
// Deprecated: [0.65.0] Replaced by ReportComponentStatus
func (host *serviceHost) ReportFatalError(err error) {
host.asyncErrorChannel <- err
}

func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) {
host.extensions.NotifyComponentStatusChange(source, event)
}

func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
Expand Down
14 changes: 12 additions & 2 deletions service/internal/components/host_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

// hostWrapper adds behavior on top of the component.Host being passed when starting the built components.
// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose.
type hostWrapper struct {
component.Host
servicehost.Host
component component.StatusSource
*zap.Logger
}

func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host {
func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host {
return &hostWrapper{
host,
component,
logger,
}
}
Expand All @@ -41,6 +45,12 @@ func (hw *hostWrapper) ReportFatalError(err error) {
hw.Host.ReportFatalError(err)
}

var emptyComponentID = component.ID{}

func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) {
hw.Host.ReportComponentStatus(hw.component, event)
}

// RegisterZPages is used by zpages extension to register handles from service.
// When the wrapper is passed to the extension it won't be successful when casting
// the interface, for the time being expose the interface here.
Expand Down
9 changes: 7 additions & 2 deletions service/internal/components/host_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

func Test_newHostWrapper(t *testing.T) {
hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop())
hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop())
hw.ReportFatalError(errors.New("test error"))
ev, err := component.NewStatusEvent(component.StatusOK)
assert.NoError(t, err)
hw.ReportComponentStatus(ev)
}
29 changes: 24 additions & 5 deletions service/internal/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/internal/zpages"
)

Expand Down Expand Up @@ -65,18 +66,32 @@ type Pipelines struct {
pipelines map[component.ID]*builtPipeline
}

type statusReportingComponent struct {
kind component.Kind
id component.ID
}

func (s *statusReportingComponent) GetKind() component.Kind {
return s.kind
}

func (s *statusReportingComponent) ID() component.ID {
return s.id
}

// StartAll starts all pipelines.
//
// Start with exporters, processors (in reverse configured order), then receivers.
// This is important so that components that are earlier in the pipeline and reference components that are
// later in the pipeline do not start sending data to later components which are not yet started.
func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error {
func (bps *Pipelines) StartAll(ctx context.Context, host servicehost.Host) error {
bps.telemetry.Logger.Info("Starting exporters...")
for dt, expByID := range bps.allExporters {
for expID, exp := range expByID {
expLogger := exporterLogger(bps.telemetry.Logger, expID, dt)
expLogger.Info("Exporter is starting...")
if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil {
statusSource := &statusReportingComponent{component.KindExporter, expID}
if err := exp.Start(ctx, components.NewHostWrapper(host, statusSource, expLogger)); err != nil {
return err
}
expLogger.Info("Exporter started.")
Expand All @@ -86,9 +101,12 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error {
bps.telemetry.Logger.Info("Starting processors...")
for pipelineID, bp := range bps.pipelines {
for i := len(bp.processors) - 1; i >= 0; i-- {
procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID)
processor := bp.processors[i]
procID := processor.id
procLogger := processorLogger(bps.telemetry.Logger, procID, pipelineID)
procLogger.Info("Processor is starting...")
if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil {
statusSource := &statusReportingComponent{component.KindProcessor, procID}
if err := processor.comp.Start(ctx, components.NewHostWrapper(host, statusSource, procLogger)); err != nil {
return err
}
procLogger.Info("Processor started.")
Expand All @@ -100,7 +118,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error {
for recvID, recv := range recvByID {
recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt)
recvLogger.Info("Receiver is starting...")
if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil {
statusSource := &statusReportingComponent{component.KindReceiver, recvID}
if err := recv.Start(ctx, components.NewHostWrapper(host, statusSource, recvLogger)); err != nil {
return err
}
recvLogger.Info("Receiver started.")
Expand Down
9 changes: 5 additions & 4 deletions service/internal/pipelines/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/internal/testcomponents"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func TestBuild(t *testing.T) {
pipelines, err := Build(context.Background(), toSettings(factories, cfg))
assert.NoError(t, err)

assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost()))

// Verify exporters created, started and empty.
for _, expID := range test.exporterIDs {
Expand Down Expand Up @@ -311,7 +312,7 @@ func TestFailToStartAndShutdown(t *testing.T) {
}
pipelines, err := Build(context.Background(), set)
assert.NoError(t, err)
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost()))
assert.Error(t, pipelines.ShutdownAll(context.Background()))
})

Expand All @@ -325,7 +326,7 @@ func TestFailToStartAndShutdown(t *testing.T) {
}
pipelines, err := Build(context.Background(), set)
assert.NoError(t, err)
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost()))
assert.Error(t, pipelines.ShutdownAll(context.Background()))
})

Expand All @@ -339,7 +340,7 @@ func TestFailToStartAndShutdown(t *testing.T) {
}
pipelines, err := Build(context.Background(), set)
assert.NoError(t, err)
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost()))
assert.Error(t, pipelines.ShutdownAll(context.Background()))
})
}
Expand Down
Loading

0 comments on commit 9d9b647

Please sign in to comment.