Skip to content

Commit

Permalink
Introduce component status reporting
Browse files Browse the repository at this point in the history
This is an alternate to #6550

## Summary of changes

- Add component status concept. Components can report their status via
  Host.ReportComponentStatus(). Interested extensions can watch status events
  if they implement StatusWatcher interface.
  This is similar to how PipelineWatcher works today.
- Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus().

## TODO after this is merged

- healthcheck extension must implement StatusWatcher.
- Replace all ReportFatalError() calls by ReportComponentStatus() calls in core and contrib.

## Open Questions

- StatusWatchers need to be able to tell if all current components are healthy.
  It is assumed that the listeners need to maintain a map of components and track
  the status of each component. This works only if we assume that the set of
  components cannot change during the lifetime of the listener. This assumption
  is true today but can change in the future if we introduce partial pipeline
  restarts where only modified/added/removed components are recreated (this will
  break listener's assumption and the map will become invalid). Should we
  instead keep track of this entire status map in the Host and broadcast the
  entire status to the listeners as a whole instead of (or in addition to)
  individual component events?
  • Loading branch information
tigrannajaryan committed Nov 21, 2022
1 parent a8d6280 commit 444ed5d
Show file tree
Hide file tree
Showing 17 changed files with 523 additions and 18 deletions.
2 changes: 2 additions & 0 deletions component/componenttest/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func NewNopHost() component.Host {

func (nh *nopHost) ReportFatalError(_ error) {}

func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {}

func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
return nil
}
Expand Down
61 changes: 61 additions & 0 deletions component/componenttest/statuswatcher_extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
)

// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions.
func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings {
return component.ExtensionCreateSettings{
TelemetrySettings: NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

type statusWatcherExtensionConfig struct {
config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions.
func NewStatusWatcherExtensionFactory(
onStatusChanged func(source component.StatusSource, event *component.StatusEvent),
) component.ExtensionFactory {
return component.NewExtensionFactory(
"statuswatcher",
func() component.ExtensionConfig {
return &statusWatcherExtensionConfig{
ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")),
}
},
func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) {
return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil
},
component.StabilityLevelStable)
}

// statusWatcherExtension stores consumed traces and metrics for testing purposes.
type statusWatcherExtension struct {
nopComponent
onStatusChanged func(source component.StatusSource, event *component.StatusEvent)
}

func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) {
e.onStatusChanged(source, event)
}
81 changes: 81 additions & 0 deletions component/componenttest/unhealthy_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)

// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions.
func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings {
return component.ProcessorCreateSettings{
TelemetrySettings: NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

type unhealthyProcessorConfig struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors.
func NewUnhealthyProcessorFactory() component.ProcessorFactory {
return component.NewProcessorFactory(
"unhealthy",
func() component.ProcessorConfig {
return &unhealthyProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")),
}
},
component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable),
component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable),
component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable),
)
}

func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) {
return unhealthyProcessorInstance, nil
}

func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) {
return unhealthyProcessorInstance, nil
}

func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) {
return unhealthyProcessorInstance, nil
}

var unhealthyProcessorInstance = &unhealthyProcessor{
Consumer: consumertest.NewNop(),
}

// unhealthyProcessor stores consumed traces and metrics for testing purposes.
type unhealthyProcessor struct {
nopComponent
consumertest.Consumer
}

func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error {
go func() {
evt, _ := component.NewStatusEvent(component.StatusError)
host.ReportComponentStatus(evt)
}()
return nil
}
9 changes: 9 additions & 0 deletions component/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,17 @@ type Host interface {
//
// ReportFatalError should be called by the component anytime after Component.Start() ends and
// before Component.Shutdown() begins.
// Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError)
ReportFatalError(err error)

// ReportComponentStatus can be used by a component to communicate its status to the Host.
// The Host implementations may broadcast this information to interested parties via
// StatusWatcher interface.
// May be called by the component any time after Component.Start is called or while
// Component.Start call is executing.
// May be called concurrently with itself.
ReportComponentStatus(event *StatusEvent)

// GetFactory of the specified kind. Returns the factory for a component type.
// This allows components to create other components. For example:
// func (r MyReceiver) Start(host component.Host) error {
Expand Down
90 changes: 90 additions & 0 deletions component/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"errors"
)

type Status int32

const (
StatusOK Status = iota
StatusError
)

// StatusSource component that reports a status about itself.
// The implementation of this interface must be comparable to be useful as a map key.
type StatusSource interface {
ID() ID
}

type StatusEvent struct {
status Status
err error
}

func (ev *StatusEvent) Status() Status {
return ev.status
}

// Err returns the error associated with the ComponentEvent.
func (ev *StatusEvent) Err() error {
return ev.err
}

// statusEventOption applies options to a StatusEvent.
type statusEventOption func(*StatusEvent) error

// WithError sets the error object of the Event. It is optional
// and should only be applied to an Event of type ComponentError.
func WithError(err error) statusEventOption {
return func(o *StatusEvent) error {
if o.status == StatusOK {
return errors.New("event with ComponentOK cannot have an error")
}
o.err = err
return nil
}
}

// NewStatusEvent creates and returns a StatusEvent with default and provided
// options. Will return an error if an error is provided for a non-error event
// type (status.ComponentOK).
// If the timestamp is not provided will set it to time.Now().
func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) {
ev := StatusEvent{
status: status,
}

for _, opt := range options {
if err := opt(&ev); err != nil {
return nil, err
}
}

return &ev, nil
}

// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry
// Collector that is to be implemented by extensions interested in changes to component
// status.
type StatusWatcher interface {
// ComponentStatusChanged notifies about a change in the source component status.
// Extensions that implement this interface must be ready that the ComponentStatusChanged
// may be called before, after or concurrently with Component.Shutdown() call.
// The function may be called concurrently with itself.
ComponentStatusChanged(source StatusSource, event *StatusEvent)
}
11 changes: 11 additions & 0 deletions component/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package component

import (
"fmt"
"testing"
"unsafe"
)

func TestStatusEventSize(t *testing.T) {
fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{}))
}
4 changes: 2 additions & 2 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/grpclog"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

// State defines Collector's state.
Expand Down Expand Up @@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) {
col.state.Store(int32(state))
}

func getBallastSize(host component.Host) uint64 {
func getBallastSize(host servicehost.Host) uint64 {
var ballastSize uint64
extensions := host.GetExtensions()
for _, extension := range extensions {
Expand Down
28 changes: 26 additions & 2 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/internal/zpages"
)

Expand All @@ -36,13 +37,26 @@ type Extensions struct {
extMap map[component.ID]component.Extension
}

type statusReportingExtension struct {
id component.ID
}

func (s *statusReportingExtension) GetKind() component.Kind {
return component.KindExtension
}

func (s *statusReportingExtension) ID() component.ID {
return s.id
}

// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for extID, ext := range bes.extMap {
extLogger := extensionLogger(bes.telemetry.Logger, extID)
extLogger.Info("Extension is starting...")
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
statusSource := &statusReportingExtension{extID}
if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil {
return err
}
extLogger.Info("Extension started.")
Expand Down Expand Up @@ -83,6 +97,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error {
var errs error
for _, ext := range bes.extMap {
if pw, ok := ext.(component.StatusWatcher); ok {
pw.ComponentStatusChanged(source, event)
}
}
return errs
}

func (bes *Extensions) GetExtensions() map[component.ID]component.Extension {
result := make(map[component.ID]component.Extension, len(bes.extMap))
for extID, v := range bes.extMap {
Expand Down
8 changes: 7 additions & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

var _ component.Host = (*serviceHost)(nil)
var _ servicehost.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
Expand All @@ -34,10 +35,15 @@ type serviceHost struct {
// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
// Deprecated: [0.65.0] Replaced by ReportComponentStatus
func (host *serviceHost) ReportFatalError(err error) {
host.asyncErrorChannel <- err
}

func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) {
host.extensions.NotifyComponentStatusChange(source, event)
}

func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
Expand Down
Loading

0 comments on commit 444ed5d

Please sign in to comment.