-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce component status reporting
This is an alternate to #6550 ## Summary of changes - Add component status concept. Components can report their status via Host.ReportComponentStatus(). Interested extensions can watch status events if they implement StatusWatcher interface. This is similar to how PipelineWatcher works today. - Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus(). ## TODO after this is merged - healthcheck extension must implement StatusWatcher. - Replace all ReportFatalError() calls by ReportComponentStatus() calls in core and contrib. ## Open Questions - StatusWatchers need to be able to tell if all current components are healthy. It is assumed that the listeners need to maintain a map of components and track the status of each component. This works only if we assume that the set of components cannot change during the lifetime of the listener. This assumption is true today but can change in the future if we introduce partial pipeline restarts where only modified/added/removed components are recreated (this will break listener's assumption and the map will become invalid). Should we instead keep track of this entire status map in the Host and broadcast the entire status to the listeners as a whole instead of (or in addition to) individual component events?
- Loading branch information
1 parent
a8d6280
commit 2ac7d38
Showing
17 changed files
with
517 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package componenttest // import "go.opentelemetry.io/collector/component/componenttest" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
) | ||
|
||
// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. | ||
func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { | ||
return component.ExtensionCreateSettings{ | ||
TelemetrySettings: NewNopTelemetrySettings(), | ||
BuildInfo: component.NewDefaultBuildInfo(), | ||
} | ||
} | ||
|
||
type statusWatcherExtensionConfig struct { | ||
config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct | ||
} | ||
|
||
// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. | ||
func NewStatusWatcherExtensionFactory( | ||
onStatusChanged func(source component.StatusSource, event *component.StatusEvent), | ||
) component.ExtensionFactory { | ||
return component.NewExtensionFactory( | ||
"statuswatcher", | ||
func() component.ExtensionConfig { | ||
return &statusWatcherExtensionConfig{ | ||
ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), | ||
} | ||
}, | ||
func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { | ||
return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil | ||
}, | ||
component.StabilityLevelStable) | ||
} | ||
|
||
// statusWatcherExtension stores consumed traces and metrics for testing purposes. | ||
type statusWatcherExtension struct { | ||
nopComponent | ||
onStatusChanged func(source component.StatusSource, event *component.StatusEvent) | ||
} | ||
|
||
func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { | ||
e.onStatusChanged(source, event) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package componenttest // import "go.opentelemetry.io/collector/component/componenttest" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/consumer/consumertest" | ||
) | ||
|
||
// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. | ||
func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { | ||
return component.ProcessorCreateSettings{ | ||
TelemetrySettings: NewNopTelemetrySettings(), | ||
BuildInfo: component.NewDefaultBuildInfo(), | ||
} | ||
} | ||
|
||
type unhealthyProcessorConfig struct { | ||
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct | ||
} | ||
|
||
// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. | ||
func NewUnhealthyProcessorFactory() component.ProcessorFactory { | ||
return component.NewProcessorFactory( | ||
"unhealthy", | ||
func() component.ProcessorConfig { | ||
return &unhealthyProcessorConfig{ | ||
ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), | ||
} | ||
}, | ||
component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), | ||
component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), | ||
component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), | ||
) | ||
} | ||
|
||
func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { | ||
return unhealthyProcessorInstance, nil | ||
} | ||
|
||
func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { | ||
return unhealthyProcessorInstance, nil | ||
} | ||
|
||
func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { | ||
return unhealthyProcessorInstance, nil | ||
} | ||
|
||
var unhealthyProcessorInstance = &unhealthyProcessor{ | ||
Consumer: consumertest.NewNop(), | ||
} | ||
|
||
// unhealthyProcessor stores consumed traces and metrics for testing purposes. | ||
type unhealthyProcessor struct { | ||
nopComponent | ||
consumertest.Consumer | ||
} | ||
|
||
func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error { | ||
go func() { | ||
evt, _ := component.NewStatusEvent(component.StatusError) | ||
host.ReportComponentStatus(evt) | ||
}() | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package component | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
type Status int32 | ||
|
||
const ( | ||
StatusOK Status = iota | ||
StatusError | ||
) | ||
|
||
// StatusSource component that reports a status about itself. | ||
// The implementation of this interface must be comparable to be useful as a map key. | ||
type StatusSource interface { | ||
ID() ID | ||
} | ||
|
||
type StatusEvent struct { | ||
status Status | ||
err error | ||
} | ||
|
||
func (ev *StatusEvent) Status() Status { | ||
return ev.status | ||
} | ||
|
||
// Err returns the error associated with the ComponentEvent. | ||
func (ev *StatusEvent) Err() error { | ||
return ev.err | ||
} | ||
|
||
// statusEventOption applies options to a StatusEvent. | ||
type statusEventOption func(*StatusEvent) error | ||
|
||
// WithError sets the error object of the Event. It is optional | ||
// and should only be applied to an Event of type ComponentError. | ||
func WithError(err error) statusEventOption { | ||
return func(o *StatusEvent) error { | ||
if o.status == StatusOK { | ||
return errors.New("event with ComponentOK cannot have an error") | ||
} | ||
o.err = err | ||
return nil | ||
} | ||
} | ||
|
||
// NewStatusEvent creates and returns a StatusEvent with default and provided | ||
// options. Will return an error if an error is provided for a non-error event | ||
// type (status.ComponentOK). | ||
// If the timestamp is not provided will set it to time.Now(). | ||
func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { | ||
ev := StatusEvent{ | ||
status: status, | ||
} | ||
|
||
for _, opt := range options { | ||
if err := opt(&ev); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return &ev, nil | ||
} | ||
|
||
// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry | ||
// Collector that is to be implemented by extensions interested in changes to component | ||
// status. | ||
type StatusWatcher interface { | ||
// ComponentStatusChanged notifies about a change in the source component status. | ||
ComponentStatusChanged(source StatusSource, event *StatusEvent) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package component | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"unsafe" | ||
) | ||
|
||
func TestStatusEventSize(t *testing.T) { | ||
fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{})) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.