diff --git a/.chloggen/component-status.yaml b/.chloggen/component-status.yaml deleted file mode 100755 index f920bb8c897..00000000000 --- a/.chloggen/component-status.yaml +++ /dev/null @@ -1,16 +0,0 @@ -# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: enhancement - -# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) -component: core - -# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Adds the ability for components to report status and for extensions to subscribe to status events by implementing an optional StatusWatcher interface. - -# One or more tracking issues or pull requests related to the change -issues: [7682] - -# (Optional) One or more lines of additional information to render under the primary note. -# These lines will be padded with 2 spaces and then inserted directly into the document. -# Use pipe (|) for multiline entries. -subtext: diff --git a/component/component.go b/component/component.go index 3b3fe3fc7cf..9a6a95d798a 100644 --- a/component/component.go +++ b/component/component.go @@ -175,10 +175,3 @@ type CreateDefaultConfigFunc func() Config func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config { return f() } - -// InstanceID uniquely identifies a component instance -type InstanceID struct { - ID ID - Kind Kind - PipelineIDs map[ID]struct{} -} diff --git a/component/componenttest/nop_telemetry.go b/component/componenttest/nop_telemetry.go index a14abfb8978..438f9ec761a 100644 --- a/component/componenttest/nop_telemetry.go +++ b/component/componenttest/nop_telemetry.go @@ -21,8 +21,5 @@ func NewNopTelemetrySettings() component.TelemetrySettings { MeterProvider: noop.NewMeterProvider(), MetricsLevel: configtelemetry.LevelNone, Resource: pcommon.NewResource(), - ReportComponentStatus: func(*component.StatusEvent) error { - return nil - }, } } diff --git a/component/host.go b/component/host.go index 732e37c8c44..ea3825d743f 100644 --- a/component/host.go +++ b/component/host.go @@ -12,8 +12,6 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. - // Deprecated: [0.87.0] Use TelemetrySettings.ReportComponentStatus instead (with an event - // component.StatusFatalError) ReportFatalError(err error) // GetFactory of the specified kind. Returns the factory for a component type. diff --git a/component/status.go b/component/status.go deleted file mode 100644 index 36a449edc12..00000000000 --- a/component/status.go +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package component // import "go.opentelemetry.io/collector/component" - -import ( - "time" -) - -type Status int32 - -// Enumeration of possible component statuses -const ( - StatusNone Status = iota - StatusStarting - StatusOK - StatusRecoverableError - StatusPermanentError - StatusFatalError - StatusStopping - StatusStopped -) - -// String returns a string representation of a Status -func (s Status) String() string { - switch s { - case StatusStarting: - return "StatusStarting" - case StatusOK: - return "StatusOK" - case StatusRecoverableError: - return "StatusRecoverableError" - case StatusPermanentError: - return "StatusPermanentError" - case StatusFatalError: - return "StatusFatalError" - case StatusStopping: - return "StatusStopping" - case StatusStopped: - return "StatusStopped" - } - return "StatusNone" -} - -// StatusEvent contains a status and timestamp, and can contain an error -type StatusEvent struct { - status Status - err error - timestamp time.Time -} - -// Status returns the Status (enum) associated with the StatusEvent -func (ev *StatusEvent) Status() Status { - return ev.status -} - -// Err returns the error associated with the StatusEvent. -func (ev *StatusEvent) Err() error { - return ev.err -} - -// Timestamp returns the timestamp associated with the StatusEvent -func (ev *StatusEvent) Timestamp() time.Time { - return ev.timestamp -} - -// NewStatusEvent creates and returns a StatusEvent with the specified status and sets the timestamp -// time.Now(). To set an error on the event for an error status use one of the dedicated -// constructors (e.g. NewRecoverableErrorEvent, NewPermanentErrorEvent, NewFatalErrorEvent) -func NewStatusEvent(status Status) *StatusEvent { - return &StatusEvent{ - status: status, - timestamp: time.Now(), - } -} - -// NewRecoverableErrorEvent creates and returns a StatusEvent with StatusRecoverableError, the -// specified error, and a timestamp set to time.Now(). -func NewRecoverableErrorEvent(err error) *StatusEvent { - ev := NewStatusEvent(StatusRecoverableError) - ev.err = err - return ev -} - -// NewPermanentErrorEvent creates and returns a StatusEvent with StatusPermanentError, the -// specified error, and a timestamp set to time.Now(). -func NewPermanentErrorEvent(err error) *StatusEvent { - ev := NewStatusEvent(StatusPermanentError) - ev.err = err - return ev -} - -// NewFatalErrorEvent creates and returns a StatusEvent with StatusFatalError, the -// specified error, and a timestamp set to time.Now(). -func NewFatalErrorEvent(err error) *StatusEvent { - ev := NewStatusEvent(StatusFatalError) - ev.err = err - return ev -} - -// StatusFunc is the expected type of ReportComponentStatus for component.TelemetrySettings -type StatusFunc func(*StatusEvent) error - -// AggregateStatus will derive a status for the given input using the following rules in order: -// 1. If all instances have the same status, there is nothing to aggregate, return it. -// 2. If any instance encounters a fatal error, the component is in a Fatal Error state. -// 3. If any instance is in a Permanent Error state, the component status is Permanent Error. -// 4. If any instance is Stopping, the component is in a Stopping state. -// 5. An instance is Stopped, but not all instances are Stopped, we must be in the process of Stopping the component. -// 6. If any instance is in a Recoverable Error state, the component status is Recoverable Error. -// 7. By process of elimination, the only remaining state is starting. -func AggregateStatus[K comparable](eventMap map[K]*StatusEvent) Status { - seen := make(map[Status]struct{}) - for _, ev := range eventMap { - seen[ev.Status()] = struct{}{} - } - - // All statuses are the same. Note, this will handle StatusOK and StatusStopped as these two - // cases require all components be in the same state. - if len(seen) == 1 { - for st := range seen { - return st - } - } - - // Handle mixed status cases - if _, isFatal := seen[StatusFatalError]; isFatal { - return StatusFatalError - } - - if _, isPermanent := seen[StatusPermanentError]; isPermanent { - return StatusPermanentError - } - - if _, isStopping := seen[StatusStopping]; isStopping { - return StatusStopping - } - - if _, isStopped := seen[StatusStopped]; isStopped { - return StatusStopping - } - - if _, isRecoverable := seen[StatusRecoverableError]; isRecoverable { - return StatusRecoverableError - } - - // By process of elimination, this is the last possible status; no check necessary. - return StatusStarting -} - -// StatusIsError returns true for error statuses (e.g. StatusRecoverableError, -// StatusPermanentError, or StatusFatalError) -func StatusIsError(status Status) bool { - return status == StatusRecoverableError || - status == StatusPermanentError || - status == StatusFatalError -} - -// AggregateStatusEvent returns a status event where: -// - The status is set to the aggregate status of the events in the eventMap -// - The timestamp is set to the latest timestamp of the events in the eventMap -// - For an error status, the event will have same error as the most current event of the same -// error type from the eventMap -func AggregateStatusEvent[K comparable](eventMap map[K]*StatusEvent) *StatusEvent { - var lastEvent, lastMatchingEvent *StatusEvent - aggregateStatus := AggregateStatus[K](eventMap) - - for _, ev := range eventMap { - if lastEvent == nil || lastEvent.timestamp.Before(ev.timestamp) { - lastEvent = ev - } - if aggregateStatus == ev.Status() && - (lastMatchingEvent == nil || lastMatchingEvent.timestamp.Before(ev.timestamp)) { - lastMatchingEvent = ev - } - } - - // the effective status matches an existing event - if lastEvent.Status() == aggregateStatus { - return lastEvent - } - - // the effective status requires a synthetic event - aggregateEvent := &StatusEvent{ - status: aggregateStatus, - timestamp: lastEvent.timestamp, - } - if StatusIsError(aggregateStatus) { - aggregateEvent.err = lastMatchingEvent.err - } - - return aggregateEvent -} diff --git a/component/status_test.go b/component/status_test.go deleted file mode 100644 index 13755d078a5..00000000000 --- a/component/status_test.go +++ /dev/null @@ -1,330 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -package component - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestNewStatusEvent(t *testing.T) { - statuses := []Status{ - StatusStarting, - StatusOK, - StatusRecoverableError, - StatusPermanentError, - StatusFatalError, - StatusStopping, - StatusStopped, - } - - for _, status := range statuses { - t.Run(fmt.Sprintf("%s without error", status), func(t *testing.T) { - ev := NewStatusEvent(status) - require.Equal(t, status, ev.Status()) - require.Nil(t, ev.Err()) - require.False(t, ev.Timestamp().IsZero()) - }) - } -} - -func TestStatusEventsWithError(t *testing.T) { - statusConstructorMap := map[Status]func(error) *StatusEvent{ - StatusRecoverableError: NewRecoverableErrorEvent, - StatusPermanentError: NewPermanentErrorEvent, - StatusFatalError: NewFatalErrorEvent, - } - - for status, newEvent := range statusConstructorMap { - t.Run(fmt.Sprintf("error status constructor for: %s", status), func(t *testing.T) { - ev := newEvent(assert.AnError) - require.Equal(t, status, ev.Status()) - require.Equal(t, assert.AnError, ev.Err()) - require.False(t, ev.Timestamp().IsZero()) - }) - } -} - -func TestAggregateStatus(t *testing.T) { - for _, tc := range []struct { - name string - statusMap map[*InstanceID]*StatusEvent - expectedStatus Status - }{ - { - name: "aggregate status with fatal is FatalError", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusFatalError), - {}: NewStatusEvent(StatusRecoverableError), - }, - expectedStatus: StatusFatalError, - }, - { - name: "aggregate status with permanent is PermanentError", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusPermanentError), - {}: NewStatusEvent(StatusRecoverableError), - }, - expectedStatus: StatusPermanentError, - }, - { - name: "aggregate status with stopping is Stopping", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusRecoverableError), - {}: NewStatusEvent(StatusStopping), - }, - expectedStatus: StatusStopping, - }, - { - name: "aggregate status with stopped and non-stopped is Stopping", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusRecoverableError), - {}: NewStatusEvent(StatusStopped), - }, - expectedStatus: StatusStopping, - }, - { - name: "aggregate status with all stopped is Stopped", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStopped), - {}: NewStatusEvent(StatusStopped), - {}: NewStatusEvent(StatusStopped), - }, - expectedStatus: StatusStopped, - }, - { - name: "aggregate status with recoverable is RecoverableError", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusRecoverableError), - }, - expectedStatus: StatusRecoverableError, - }, - { - name: "aggregate status with starting is Starting", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - }, - expectedStatus: StatusStarting, - }, - { - name: "aggregate status with all ok is OK", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusOK), - }, - expectedStatus: StatusOK, - }, - } { - t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.expectedStatus, AggregateStatus(tc.statusMap)) - }) - } -} - -func TestStatusIsError(t *testing.T) { - for _, tc := range []struct { - status Status - isError bool - }{ - { - status: StatusStarting, - isError: false, - }, - { - status: StatusOK, - isError: false, - }, - { - status: StatusRecoverableError, - isError: true, - }, - { - status: StatusPermanentError, - isError: true, - }, - { - status: StatusFatalError, - isError: true, - }, - { - status: StatusStopping, - isError: false, - }, - { - status: StatusStopped, - isError: false, - }, - } { - name := fmt.Sprintf("StatusIsError(%s) is %t", tc.status, tc.isError) - t.Run(name, func(t *testing.T) { - assert.Equal(t, tc.isError, StatusIsError(tc.status)) - }) - } -} - -func TestAggregateStatusEvent(t *testing.T) { - // maxTime is used to make sure we select the event with the latest timestamp - maxTime := time.Unix(1<<63-62135596801, 999999999) - // latest sets the timestamp for an event to maxTime - latest := func(ev *StatusEvent) *StatusEvent { - ev.timestamp = maxTime - return ev - } - - for _, tc := range []struct { - name string - statusMap map[*InstanceID]*StatusEvent - expectedStatus *StatusEvent - }{ - { - name: "FatalError - existing event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: latest(NewFatalErrorEvent(assert.AnError)), - {}: NewStatusEvent(StatusRecoverableError), - }, - expectedStatus: &StatusEvent{ - status: StatusFatalError, - timestamp: maxTime, - err: assert.AnError, - }, - }, - { - name: "FatalError - synthetic event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewFatalErrorEvent(assert.AnError), - {}: latest(NewStatusEvent(StatusRecoverableError)), - }, - expectedStatus: &StatusEvent{ - status: StatusFatalError, - timestamp: maxTime, - err: assert.AnError, - }, - }, - { - name: "PermanentError - existing event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: latest(NewPermanentErrorEvent(assert.AnError)), - {}: NewStatusEvent(StatusRecoverableError), - }, - expectedStatus: &StatusEvent{ - status: StatusPermanentError, - timestamp: maxTime, - err: assert.AnError, - }, - }, - { - name: "PermanentError - synthetic event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewPermanentErrorEvent(assert.AnError), - {}: latest(NewStatusEvent(StatusRecoverableError)), - }, - expectedStatus: &StatusEvent{ - status: StatusPermanentError, - timestamp: maxTime, - err: assert.AnError, - }, - }, - { - name: "Stopping - existing event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusRecoverableError), - {}: latest(NewStatusEvent(StatusStopping)), - }, - expectedStatus: &StatusEvent{ - status: StatusStopping, - timestamp: maxTime, - }, - }, - { - name: "Stopping - synthetic event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: NewStatusEvent(StatusRecoverableError), - {}: latest(NewStatusEvent(StatusStopped)), - }, - expectedStatus: &StatusEvent{ - status: StatusStopping, - timestamp: maxTime, - }, - }, - { - name: "Stopped - existing event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStopped), - {}: latest(NewStatusEvent(StatusStopped)), - {}: NewStatusEvent(StatusStopped), - }, - expectedStatus: &StatusEvent{ - status: StatusStopped, - timestamp: maxTime, - }, - }, - { - name: "RecoverableError - existing event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: NewStatusEvent(StatusOK), - {}: latest(NewRecoverableErrorEvent(assert.AnError)), - }, - expectedStatus: &StatusEvent{ - status: StatusRecoverableError, - timestamp: maxTime, - err: assert.AnError, - }, - }, - { - name: "Starting - synthetic event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusStarting), - {}: latest(NewStatusEvent(StatusOK)), - }, - expectedStatus: &StatusEvent{ - status: StatusStarting, - timestamp: maxTime, - }, - }, - { - name: "OK - existing event", - statusMap: map[*InstanceID]*StatusEvent{ - {}: NewStatusEvent(StatusOK), - {}: latest(NewStatusEvent(StatusOK)), - {}: NewStatusEvent(StatusOK), - }, - expectedStatus: &StatusEvent{ - status: StatusOK, - timestamp: maxTime, - }, - }, - } { - t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.expectedStatus, AggregateStatusEvent(tc.statusMap)) - }) - } -} diff --git a/component/telemetry.go b/component/telemetry.go index 5eb6bcf457a..9617e456319 100644 --- a/component/telemetry.go +++ b/component/telemetry.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" ) -type TelemetrySettingsBase[T any] struct { +type TelemetrySettings struct { // Logger that the factory can use during creation and can pass to the created // component to be used later as well. Logger *zap.Logger @@ -29,20 +29,4 @@ type TelemetrySettingsBase[T any] struct { // Resource contains the resource attributes for the collector's telemetry. Resource pcommon.Resource - - // ReportComponentStatus allows a component to report runtime changes in status. The service - // will automatically report status for a component during startup and shutdown. Components can - // use this method to report status after start and before shutdown. ReportComponentStatus - // will only return errors if the API used incorrectly. The two scenarios where an error will - // be returned are: - // - // - An illegal state transition - // - Calling this method before component startup - // - // If the API is being used properly, these errors are safe to ignore. - ReportComponentStatus T } - -// TelemetrySettings and servicetelemetry.Settings differ in the method signature for -// ReportComponentStatus -type TelemetrySettings TelemetrySettingsBase[StatusFunc] diff --git a/extension/extension.go b/extension/extension.go index 2521fc65a18..6b8df571b81 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -40,17 +40,6 @@ type ConfigWatcher interface { NotifyConfig(ctx context.Context, conf *confmap.Conf) error } -// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry -// Collector that is to be implemented by extensions interested in changes to component -// status. -type StatusWatcher interface { - // ComponentStatusChanged notifies about a change in the source component status. - // Extensions that implement this interface must be ready that the ComponentStatusChanged - // may be called before, after or concurrently with calls to Component.Start() and Component.Shutdown(). - // The function may be called concurrently with itself. - ComponentStatusChanged(source *component.InstanceID, event *component.StatusEvent) -} - // CreateSettings is passed to Factory.Create(...) function. type CreateSettings struct { // ID returns the ID of the component that will be created. diff --git a/extension/extensiontest/statuswatcher_extension.go b/extension/extensiontest/statuswatcher_extension.go deleted file mode 100644 index e501353dcc6..00000000000 --- a/extension/extensiontest/statuswatcher_extension.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package extensiontest // import "go.opentelemetry.io/collector/extension/extensiontest" - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/extension" -) - -// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. -func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings { - return extension.CreateSettings{ - TelemetrySettings: componenttest.NewNopTelemetrySettings(), - BuildInfo: component.NewDefaultBuildInfo(), - } -} - -// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory to construct a status watcher extension. -func NewStatusWatcherExtensionFactory( - onStatusChanged func(source *component.InstanceID, event *component.StatusEvent), -) extension.Factory { - return extension.NewFactory( - "statuswatcher", - func() component.Config { - return &struct{}{} - }, - func(context.Context, extension.CreateSettings, component.Config) (component.Component, error) { - return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil - }, - component.StabilityLevelStable) -} - -// statusWatcherExtension receives status events reported via component status reporting for testing -// purposes. -type statusWatcherExtension struct { - component.StartFunc - component.ShutdownFunc - onStatusChanged func(source *component.InstanceID, event *component.StatusEvent) -} - -func (e statusWatcherExtension) ComponentStatusChanged(source *component.InstanceID, event *component.StatusEvent) { - e.onStatusChanged(source, event) -} diff --git a/extension/extensiontest/statuswatcher_extension_test.go b/extension/extensiontest/statuswatcher_extension_test.go deleted file mode 100644 index 0bfdf4f5eda..00000000000 --- a/extension/extensiontest/statuswatcher_extension_test.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package extensiontest - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/extension" -) - -func TestStatusWatcherExtension(t *testing.T) { - statusChanged := false - factory := NewStatusWatcherExtensionFactory( - func(*component.InstanceID, *component.StatusEvent) { - statusChanged = true - }, - ) - require.NotNil(t, factory) - assert.Equal(t, component.Type("statuswatcher"), factory.Type()) - cfg := factory.CreateDefaultConfig() - assert.Equal(t, &struct{}{}, cfg) - - ext, err := factory.CreateExtension(context.Background(), NewStatusWatcherExtensionCreateSettings(), cfg) - require.NoError(t, err) - assert.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) - assert.False(t, statusChanged) - - ext.(extension.StatusWatcher).ComponentStatusChanged(&component.InstanceID{}, &component.StatusEvent{}) - - assert.True(t, statusChanged) - assert.NoError(t, ext.Shutdown(context.Background())) -} diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index cddebb59902..2d1c74f355e 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -27,37 +27,19 @@ func NewSharedComponents[K comparable, V component.Component]() *SharedComponent // GetOrAdd returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. -func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) { +func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error)) (*SharedComponent[V], error) { if c, ok := scs.comps[key]; ok { - // If we haven't already seen this telemetry settings, this shared component represents - // another instance. Wrap ReportComponentStatus to report for all instances this shared - // component represents. - if _, ok := c.seenSettings[telemetrySettings]; !ok { - c.seenSettings[telemetrySettings] = struct{}{} - prev := c.telemetry.ReportComponentStatus - c.telemetry.ReportComponentStatus = func(ev *component.StatusEvent) error { - if err := telemetrySettings.ReportComponentStatus(ev); err != nil { - return err - } - return prev(ev) - } - } return c, nil } comp, err := create() if err != nil { return nil, err } - newComp := &SharedComponent[V]{ component: comp, removeFunc: func() { delete(scs.comps, key) }, - telemetry: telemetrySettings, - seenSettings: map[*component.TelemetrySettings]struct{}{ - telemetrySettings: {}, - }, } scs.comps[key] = newComp return newComp, nil @@ -71,9 +53,6 @@ type SharedComponent[V component.Component] struct { startOnce sync.Once stopOnce sync.Once removeFunc func() - - telemetry *component.TelemetrySettings - seenSettings map[*component.TelemetrySettings]struct{} } // Unwrap returns the original component. @@ -85,14 +64,7 @@ func (r *SharedComponent[V]) Unwrap() V { func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error { var err error r.startOnce.Do(func() { - // It's important that status for a sharedcomponent is reported through its - // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates - // and takes priority over the automated status reporting that happens in graph, making the - // status reporting in graph a no-op. - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) - if err = r.component.Start(ctx, host); err != nil { - _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) - } + err = r.component.Start(ctx, host) }) return err } @@ -101,17 +73,7 @@ func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) err func (r *SharedComponent[V]) Shutdown(ctx context.Context) error { var err error r.stopOnce.Do(func() { - // It's important that status for a sharedcomponent is reported through its - // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates - // and takes priority over the automated status reporting that happens in graph, making the - // the status reporting in graph a no-op. - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) err = r.component.Shutdown(ctx) - if err != nil { - _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) - } else { - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) - } r.removeFunc() }) return err diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 5ce081fa752..112f1d79d07 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -20,7 +20,6 @@ var id = component.NewID("test") type baseComponent struct { component.StartFunc component.ShutdownFunc - telemetry *component.TelemetrySettings } func TestNewSharedComponents(t *testing.T) { @@ -32,11 +31,7 @@ func TestNewSharedComponentsCreateError(t *testing.T) { comps := NewSharedComponents[component.ID, *baseComponent]() assert.Len(t, comps.comps, 0) myErr := errors.New("my error") - _, err := comps.GetOrAdd( - id, - func() (*baseComponent, error) { return nil, myErr }, - newNopTelemetrySettings(), - ) + _, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return nil, myErr }) assert.ErrorIs(t, err, myErr) assert.Len(t, comps.comps, 0) } @@ -45,31 +40,18 @@ func TestSharedComponentsGetOrAdd(t *testing.T) { nop := &baseComponent{} comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd( - id, - func() (*baseComponent, error) { return nop, nil }, - newNopTelemetrySettings(), - ) + got, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return nop, nil }) require.NoError(t, err) assert.Len(t, comps.comps, 1) assert.Same(t, nop, got.Unwrap()) - gotSecond, err := comps.GetOrAdd( - id, - func() (*baseComponent, error) { panic("should not be called") }, - newNopTelemetrySettings(), - ) - + gotSecond, err := comps.GetOrAdd(id, func() (*baseComponent, error) { panic("should not be called") }) require.NoError(t, err) assert.Same(t, got, gotSecond) // Shutdown nop will remove assert.NoError(t, got.Shutdown(context.Background())) assert.Len(t, comps.comps, 0) - gotThird, err := comps.GetOrAdd( - id, - func() (*baseComponent, error) { return nop, nil }, - newNopTelemetrySettings(), - ) + gotThird, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return nop, nil }) require.NoError(t, err) assert.NotSame(t, got, gotThird) } @@ -89,11 +71,7 @@ func TestSharedComponent(t *testing.T) { }} comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd( - id, - func() (*baseComponent, error) { return comp, nil }, - newNopTelemetrySettings(), - ) + got, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return comp, nil }) require.NoError(t, err) assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, 1, calledStart) @@ -106,184 +84,3 @@ func TestSharedComponent(t *testing.T) { assert.NoError(t, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) } - -func TestSharedComponentsReportStatus(t *testing.T) { - reportedStatuses := make(map[*component.InstanceID][]component.Status) - newStatusFunc := func() func(*component.StatusEvent) error { - instanceID := &component.InstanceID{} - return func(ev *component.StatusEvent) error { - // Use an event with component.StatusNone to simulate an error. - if ev.Status() == component.StatusNone { - return assert.AnError - } - reportedStatuses[instanceID] = append(reportedStatuses[instanceID], ev.Status()) - return nil - } - } - - comp := &baseComponent{} - comps := NewSharedComponents[component.ID, *baseComponent]() - var telemetrySettings *component.TelemetrySettings - - // make a shared component that represents three instances - for i := 0; i < 3; i++ { - telemetrySettings = newNopTelemetrySettings() - telemetrySettings.ReportComponentStatus = newStatusFunc() - // The initial settings for the shared component need to match the ones passed to the first - // invocation of GetOrAdd so that underlying telemetry settings reference can be used to - // wrap ReportComponentStatus for subsequently added "instances". - if i == 0 { - comp.telemetry = telemetrySettings - } - got, err := comps.GetOrAdd( - id, - func() (*baseComponent, error) { return comp, nil }, - telemetrySettings, - ) - require.NoError(t, err) - assert.Len(t, comps.comps, 1) - assert.Same(t, comp, got.Unwrap()) - } - - // make sure we don't try to represent a fourth instance if we reuse a telemetrySettings - _, _ = comps.GetOrAdd( - id, - func() (*baseComponent, error) { return comp, nil }, - telemetrySettings, - ) - - err := comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) - require.NoError(t, err) - - // ok - err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusOK)) - require.NoError(t, err) - - // simulate an error - err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusNone)) - require.ErrorIs(t, err, assert.AnError) - - // stopping - err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) - require.NoError(t, err) - - // stopped - err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) - require.NoError(t, err) - - // The shared component represents 3 component instances. Reporting status for the shared - // component should report status for each of the instances it represents. - expectedStatuses := []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - } - - require.Equal(t, 3, len(reportedStatuses)) - - for _, actualStatuses := range reportedStatuses { - require.Equal(t, expectedStatuses, actualStatuses) - } -} - -func TestReportStatusOnStartShutdown(t *testing.T) { - for _, tc := range []struct { - name string - startErr error - shutdownErr error - expectedStatuses []component.Status - }{ - { - name: "successful start/stop", - startErr: nil, - shutdownErr: nil, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - }, - { - name: "start error", - startErr: assert.AnError, - shutdownErr: nil, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusPermanentError, - }, - }, - { - name: "shutdown error", - shutdownErr: assert.AnError, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusPermanentError, - }, - }, - } { - t.Run(tc.name, func(t *testing.T) { - reportedStatuses := make(map[*component.InstanceID][]component.Status) - newStatusFunc := func() func(*component.StatusEvent) error { - instanceID := &component.InstanceID{} - return func(ev *component.StatusEvent) error { - reportedStatuses[instanceID] = append(reportedStatuses[instanceID], ev.Status()) - return nil - } - } - base := &baseComponent{} - if tc.startErr != nil { - base.StartFunc = func(context.Context, component.Host) error { - return tc.startErr - } - } - if tc.shutdownErr != nil { - base.ShutdownFunc = func(context.Context) error { - return tc.shutdownErr - } - } - comps := NewSharedComponents[component.ID, *baseComponent]() - var comp *SharedComponent[*baseComponent] - var err error - for i := 0; i < 3; i++ { - telemetrySettings := newNopTelemetrySettings() - telemetrySettings.ReportComponentStatus = newStatusFunc() - if i == 0 { - base.telemetry = telemetrySettings - } - comp, err = comps.GetOrAdd( - id, - func() (*baseComponent, error) { return base, nil }, - telemetrySettings, - ) - require.NoError(t, err) - } - - err = comp.Start(context.Background(), componenttest.NewNopHost()) - require.Equal(t, tc.startErr, err) - - if tc.startErr == nil { - err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusOK)) - require.NoError(t, err) - - err = comp.Shutdown(context.Background()) - require.Equal(t, tc.shutdownErr, err) - } - - require.Equal(t, 3, len(reportedStatuses)) - - for _, actualStatuses := range reportedStatuses { - require.Equal(t, tc.expectedStatuses, actualStatuses) - } - }) - } -} - -// newNopTelemetrySettings streamlines getting a pointer to a NopTelemetrySettings -func newNopTelemetrySettings() *component.TelemetrySettings { - set := componenttest.NewNopTelemetrySettings() - return &set -} diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index f1a59c54430..1a0ad8d0607 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -19,8 +19,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/converter/expandconverter" - "go.opentelemetry.io/collector/extension/extensiontest" - "go.opentelemetry.io/collector/processor/processortest" ) func TestStateString(t *testing.T) { @@ -153,85 +151,6 @@ func TestCollectorReportError(t *testing.T) { assert.Equal(t, StateClosed, col.GetState()) } -func TestComponentStatusWatcher(t *testing.T) { - factories, err := nopFactories() - assert.NoError(t, err) - - // Use a processor factory that creates "unhealthy" processor: one that - // always reports StatusRecoverableError after successful Start. - unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory() - factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory - - // Keep track of all status changes in a map. - changedComponents := map[*component.InstanceID][]component.Status{} - var mux sync.Mutex - onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) { - if source.ID.Type() != unhealthyProcessorFactory.Type() { - return - } - mux.Lock() - defer mux.Unlock() - changedComponents[source] = append(changedComponents[source], event.Status()) - } - - // Add a "statuswatcher" extension that will receive notifications when processor - // status changes. - factory := extensiontest.NewStatusWatcherExtensionFactory(onStatusChanged) - factories.Extensions[factory.Type()] = factory - - // Read config from file. This config uses 3 "unhealthy" processors. - validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")})) - require.NoError(t, err) - - // Create a collector - col, err := NewCollector(CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: factories, - ConfigProvider: validProvider, - }) - require.NoError(t, err) - - // Start the newly created collector. - wg := startCollector(context.Background(), t, col) - - // An unhealthy processor asynchronously reports a recoverable error. - expectedStatuses := []component.Status{ - component.StatusStarting, - component.StatusRecoverableError, - } - - // The "unhealthy" processors will now begin to asynchronously report StatusRecoverableError. - // We expect to see these reports. - assert.Eventually(t, func() bool { - mux.Lock() - defer mux.Unlock() - - for k, v := range changedComponents { - // All processors must report a status change with the same ID - assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) - // And all must have the expected statuses - assert.Equal(t, expectedStatuses, v) - } - // We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml - // We must have exactly 3 items in our map. This ensures that the "source" argument - // passed to status change func is unique per instance of source component despite - // components having the same IDs (having same ID for different component instances - // is a normal situation for processors). - return len(changedComponents) == 3 - }, 2*time.Second, time.Millisecond*100) - - col.Shutdown() - wg.Wait() - - // Check for additional statuses after Shutdown. - expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped) - for _, v := range changedComponents { - assert.Equal(t, expectedStatuses, v) - } - - assert.Equal(t, StateClosed, col.GetState()) -} - func TestCollectorSendSignal(t *testing.T) { factories, err := nopFactories() require.NoError(t, err) diff --git a/otelcol/testdata/otelcol-statuswatcher.yaml b/otelcol/testdata/otelcol-statuswatcher.yaml deleted file mode 100644 index 2dcc322d341..00000000000 --- a/otelcol/testdata/otelcol-statuswatcher.yaml +++ /dev/null @@ -1,31 +0,0 @@ -receivers: - nop: - -processors: - nop: - unhealthy: - -exporters: - nop: - -extensions: - statuswatcher: - -service: - telemetry: - metrics: - address: localhost:8888 - extensions: [statuswatcher] - pipelines: - traces: - receivers: [nop] - processors: [nop,unhealthy] - exporters: [nop] - metrics: - receivers: [nop] - processors: [nop,unhealthy] - exporters: [nop] - logs: - receivers: [nop] - processors: [nop,unhealthy] - exporters: [nop] diff --git a/processor/processortest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go deleted file mode 100644 index eeb2e1b8d87..00000000000 --- a/processor/processortest/unhealthy_processor.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package processortest // import "go.opentelemetry.io/collector/processor/processortest" - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/processor" -) - -// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. -func NewUnhealthyProcessorCreateSettings() processor.CreateSettings { - return processor.CreateSettings{ - TelemetrySettings: componenttest.NewNopTelemetrySettings(), - BuildInfo: component.NewDefaultBuildInfo(), - } -} - -// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. -func NewUnhealthyProcessorFactory() processor.Factory { - return processor.NewFactory( - "unhealthy", - func() component.Config { - return &struct{}{} - }, - processor.WithTraces(createUnhealthyTracesProcessor, component.StabilityLevelStable), - processor.WithMetrics(createUnhealthyMetricsProcessor, component.StabilityLevelStable), - processor.WithLogs(createUnhealthyLogsProcessor, component.StabilityLevelStable), - ) -} - -func createUnhealthyTracesProcessor(_ context.Context, set processor.CreateSettings, _ component.Config, _ consumer.Traces) (processor.Traces, error) { - return &unhealthyProcessor{ - Consumer: consumertest.NewNop(), - telemetry: set.TelemetrySettings, - }, nil -} - -func createUnhealthyMetricsProcessor(_ context.Context, set processor.CreateSettings, _ component.Config, _ consumer.Metrics) (processor.Metrics, error) { - return &unhealthyProcessor{ - Consumer: consumertest.NewNop(), - telemetry: set.TelemetrySettings, - }, nil -} - -func createUnhealthyLogsProcessor(_ context.Context, set processor.CreateSettings, _ component.Config, _ consumer.Logs) (processor.Logs, error) { - return &unhealthyProcessor{ - Consumer: consumertest.NewNop(), - telemetry: set.TelemetrySettings, - }, nil -} - -type unhealthyProcessor struct { - component.StartFunc - component.ShutdownFunc - consumertest.Consumer - telemetry component.TelemetrySettings -} - -func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error { - go func() { - _ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError)) - }() - return nil -} diff --git a/processor/processortest/unhealthy_processor_test.go b/processor/processortest/unhealthy_processor_test.go deleted file mode 100644 index adc80322f22..00000000000 --- a/processor/processortest/unhealthy_processor_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package processortest - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -func TestNewUnhealthyProcessorFactory(t *testing.T) { - factory := NewUnhealthyProcessorFactory() - require.NotNil(t, factory) - assert.Equal(t, component.Type("unhealthy"), factory.Type()) - cfg := factory.CreateDefaultConfig() - assert.Equal(t, &struct{}{}, cfg) - - traces, err := factory.CreateTracesProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) - require.NoError(t, err) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) - assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) - assert.NoError(t, traces.Shutdown(context.Background())) - - metrics, err := factory.CreateMetricsProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) - require.NoError(t, err) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) - assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) - assert.NoError(t, metrics.Shutdown(context.Background())) - - logs, err := factory.CreateLogsProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) - require.NoError(t, err) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) - assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) - assert.NoError(t, logs.Shutdown(context.Background())) -} diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index cce8b363cd4..11cf3dc6668 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -68,13 +68,9 @@ func createTraces( nextConsumer consumer.Traces, ) (receiver.Traces, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( - oCfg, - func() (*otlpReceiver, error) { - return newOtlpReceiver(oCfg, &set) - }, - &set.TelemetrySettings, - ) + r, err := receivers.GetOrAdd(oCfg, func() (*otlpReceiver, error) { + return newOtlpReceiver(oCfg, set) + }) if err != nil { return nil, err } @@ -93,13 +89,9 @@ func createMetrics( consumer consumer.Metrics, ) (receiver.Metrics, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( - oCfg, - func() (*otlpReceiver, error) { - return newOtlpReceiver(oCfg, &set) - }, - &set.TelemetrySettings, - ) + r, err := receivers.GetOrAdd(oCfg, func() (*otlpReceiver, error) { + return newOtlpReceiver(oCfg, set) + }) if err != nil { return nil, err } @@ -118,13 +110,9 @@ func createLog( consumer consumer.Logs, ) (receiver.Logs, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( - oCfg, - func() (*otlpReceiver, error) { - return newOtlpReceiver(oCfg, &set) - }, - &set.TelemetrySettings, - ) + r, err := receivers.GetOrAdd(oCfg, func() (*otlpReceiver, error) { + return newOtlpReceiver(oCfg, set) + }) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index c07fea243cb..faf9a68fe04 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -43,13 +43,13 @@ type otlpReceiver struct { obsrepGRPC *receiverhelper.ObsReport obsrepHTTP *receiverhelper.ObsReport - settings *receiver.CreateSettings + settings receiver.CreateSettings } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config, set *receiver.CreateSettings) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, error) { r := &otlpReceiver{ cfg: cfg, settings: set, @@ -62,7 +62,7 @@ func newOtlpReceiver(cfg *Config, set *receiver.CreateSettings) (*otlpReceiver, r.obsrepGRPC, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", - ReceiverCreateSettings: *set, + ReceiverCreateSettings: set, }) if err != nil { return nil, err @@ -70,7 +70,7 @@ func newOtlpReceiver(cfg *Config, set *receiver.CreateSettings) (*otlpReceiver, r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "http", - ReceiverCreateSettings: *set, + ReceiverCreateSettings: set, }) if err != nil { return nil, err diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index bb073f74cc5..54af9d6544c 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -23,9 +22,8 @@ const zExtensionName = "zextensionname" // Extensions is a map of extensions created from extension configs. type Extensions struct { - telemetry servicetelemetry.TelemetrySettings - extMap map[component.ID]extension.Extension - instanceIDs map[component.ID]*component.InstanceID + telemetry component.TelemetrySettings + extMap map[component.ID]extension.Extension } // Start starts all extensions. @@ -34,10 +32,7 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { for extID, ext := range bes.extMap { extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - instanceID := bes.instanceIDs[extID] - _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting)) if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { - _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err)) return err } extLogger.Info("Extension started.") @@ -49,15 +44,8 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { func (bes *Extensions) Shutdown(ctx context.Context) error { bes.telemetry.Logger.Info("Stopping extensions...") var errs error - for extID, ext := range bes.extMap { - instanceID := bes.instanceIDs[extID] - _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping)) - if err := ext.Shutdown(ctx); err != nil { - _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err)) - errs = multierr.Append(errs, err) - continue - } - _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped)) + for _, ext := range bes.extMap { + errs = multierr.Append(errs, ext.Shutdown(ctx)) } return errs @@ -96,14 +84,6 @@ func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) err return errs } -func (bes *Extensions) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) { - for _, ext := range bes.extMap { - if sw, ok := ext.(extension.StatusWatcher); ok { - sw.ComponentStatusChanged(source, event) - } - } -} - func (bes *Extensions) GetExtensions() map[component.ID]component.Component { result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { @@ -140,7 +120,7 @@ func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) { // Settings holds configuration for building Extensions. type Settings struct { - Telemetry servicetelemetry.TelemetrySettings + Telemetry component.TelemetrySettings BuildInfo component.BuildInfo // Extensions builder for extensions. @@ -150,18 +130,13 @@ type Settings struct { // New creates a new Extensions from Config. func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { exts := &Extensions{ - telemetry: set.Telemetry, - extMap: make(map[component.ID]extension.Extension), - instanceIDs: make(map[component.ID]*component.InstanceID), + telemetry: set.Telemetry, + extMap: make(map[component.ID]extension.Extension), } for _, extID := range cfg { - instanceID := &component.InstanceID{ - ID: extID, - Kind: component.KindExtension, - } extSet := extension.CreateSettings{ ID: extID, - TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID), + TelemetrySettings: set.Telemetry, BuildInfo: set.BuildInfo, } extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) @@ -177,7 +152,6 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { } exts.extMap[extID] = ext - exts.instanceIDs[extID] = instanceID } return exts, nil diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index 4b8a574f4f0..cbb3dd5238d 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -16,8 +16,6 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" - "go.opentelemetry.io/collector/service/internal/status" ) func TestBuildExtensions(t *testing.T) { @@ -83,7 +81,7 @@ func TestBuildExtensions(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := New(context.Background(), Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories), }, tt.config) @@ -169,7 +167,7 @@ func TestNotifyConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { extensions, err := New(context.Background(), Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories), }, tt.serviceExtensions) @@ -243,122 +241,3 @@ func newCreateErrorExtensionFactory() extension.Factory { component.StabilityLevelDevelopment, ) } - -func TestStatusReportedOnStartupShutdown(t *testing.T) { - // compare two slices of status events ignoring timestamp - assertEqualStatuses := func(t *testing.T, evts1, evts2 []*component.StatusEvent) { - assert.Equal(t, len(evts1), len(evts2)) - for i := 0; i < len(evts1); i++ { - ev1 := evts1[i] - ev2 := evts2[i] - assert.Equal(t, ev1.Status(), ev2.Status()) - assert.Equal(t, ev1.Err(), ev2.Err()) - } - } - - for _, tc := range []struct { - name string - expectedStatuses []*component.StatusEvent - startErr error - shutdownErr error - }{ - { - name: "successful startup/shutdown", - expectedStatuses: []*component.StatusEvent{ - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewStatusEvent(component.StatusStopped), - }, - startErr: nil, - shutdownErr: nil, - }, - { - name: "start error", - expectedStatuses: []*component.StatusEvent{ - component.NewStatusEvent(component.StatusStarting), - component.NewPermanentErrorEvent(assert.AnError), - }, - startErr: assert.AnError, - shutdownErr: nil, - }, - { - name: "shutdown error", - expectedStatuses: []*component.StatusEvent{ - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewPermanentErrorEvent(assert.AnError), - }, - startErr: nil, - shutdownErr: assert.AnError, - }, - } { - t.Run(tc.name, func(t *testing.T) { - compID := component.NewID("statustest") - factory := newStatusTestExtensionFactory("statustest", tc.startErr, tc.shutdownErr) - config := factory.CreateDefaultConfig() - extensionsConfigs := map[component.ID]component.Config{ - compID: config, - } - factories := map[component.Type]extension.Factory{ - "statustest": factory, - } - extensions, err := New( - context.Background(), - Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), - BuildInfo: component.NewDefaultBuildInfo(), - Extensions: extension.NewBuilder(extensionsConfigs, factories), - }, - []component.ID{compID}, - ) - - assert.NoError(t, err) - - var actualStatuses []*component.StatusEvent - init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) { - actualStatuses = append(actualStatuses, ev) - }) - extensions.telemetry.ReportComponentStatus = statusFunc - init() - - assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost())) - if tc.startErr == nil { - assert.Equal(t, tc.shutdownErr, extensions.Shutdown(context.Background())) - } - assertEqualStatuses(t, tc.expectedStatuses, actualStatuses) - }) - } -} - -type statusTestExtension struct { - startErr error - shutdownErr error -} - -func (ext *statusTestExtension) Start(_ context.Context, _ component.Host) error { - return ext.startErr -} - -func (ext *statusTestExtension) Shutdown(_ context.Context) error { - return ext.shutdownErr -} - -func newStatusTestExtension(startErr, shutdownErr error) *statusTestExtension { - return &statusTestExtension{ - startErr: startErr, - shutdownErr: shutdownErr, - } -} - -func newStatusTestExtensionFactory(name component.Type, startErr, shutdownErr error) extension.Factory { - return extension.NewFactory( - name, - func() component.Config { - return &struct{}{} - }, - func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) { - return newStatusTestExtension(startErr, shutdownErr), nil - }, - component.StabilityLevelDevelopment, - ) -} diff --git a/service/host.go b/service/host.go index b749564b0dd..d216ae94adb 100644 --- a/service/host.go +++ b/service/host.go @@ -33,10 +33,10 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. -// Deprecated: [0.87.0] Replaced by servicetelemetry.Settings.ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: @@ -66,10 +66,3 @@ func (host *serviceHost) GetExtensions() map[component.ID]component.Component { func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Component { return host.pipelines.GetExporters() } - -func (host *serviceHost) notifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) { - host.serviceExtensions.NotifyComponentStatusChange(source, event) - if event.Status() == component.StatusFatalError { - host.asyncErrorChannel <- event.Err() - } -} diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 902bc3a5afd..5b8a404d66f 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -22,13 +22,12 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/pipelines" ) // Settings holds configuration for building builtPipelines. type Settings struct { - Telemetry servicetelemetry.TelemetrySettings + Telemetry component.TelemetrySettings BuildInfo component.BuildInfo ReceiverBuilder *receiver.Builder @@ -46,19 +45,12 @@ type Graph struct { // Keep track of how nodes relate to pipelines, so we can declare edges in the graph. pipelines map[component.ID]*pipelineNodes - - // Keep track of status source per node - instanceIDs map[int64]*component.InstanceID - - telemetry servicetelemetry.TelemetrySettings } func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)), - instanceIDs: make(map[int64]*component.InstanceID), - telemetry: set.Telemetry, } for pipelineID := range set.PipelineConfigs { pipelines.pipelines[pipelineID] = &pipelineNodes{ @@ -90,15 +82,14 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } - rcvrNode := g.createReceiver(pipelineID, recvID) + rcvrNode := g.createReceiver(pipelineID.Type(), recvID) pipe.receivers[rcvrNode.ID()] = rcvrNode } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { - procNode := g.createProcessor(pipelineID, procID) - pipe.processors = append(pipe.processors, procNode) + pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID)) } pipe.fanOutNode = newFanOutNode(pipelineID) @@ -109,7 +100,7 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } - expNode := g.createExporter(pipelineID, exprID) + expNode := g.createExporter(pipelineID.Type(), exprID) pipe.exporters[expNode.ID()] = expNode } } @@ -165,7 +156,6 @@ func (g *Graph) createNodes(set Settings) error { continue } connNode := g.createConnector(eID, rID, connID) - g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode } @@ -174,70 +164,36 @@ func (g *Graph) createNodes(set Settings) error { return nil } -func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode { - rcvrNode := newReceiverNode(pipelineID.Type(), recvID) +func (g *Graph) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode { + rcvrNode := newReceiverNode(pipelineType, recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { - g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{} return node.(*receiverNode) } g.componentGraph.AddNode(rcvrNode) - g.instanceIDs[rcvrNode.ID()] = &component.InstanceID{ - ID: recvID, - Kind: component.KindReceiver, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } return rcvrNode } func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode { procNode := newProcessorNode(pipelineID, procID) g.componentGraph.AddNode(procNode) - g.instanceIDs[procNode.ID()] = &component.InstanceID{ - ID: procID, - Kind: component.KindProcessor, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } return procNode } -func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode { - expNode := newExporterNode(pipelineID.Type(), exprID) +func (g *Graph) createExporter(pipelineType component.DataType, exprID component.ID) *exporterNode { + expNode := newExporterNode(pipelineType, exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { - g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{} return node.(*exporterNode) } g.componentGraph.AddNode(expNode) - g.instanceIDs[expNode.ID()] = &component.InstanceID{ - ID: expNode.componentID, - Kind: component.KindExporter, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } return expNode } func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode { connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID) if node := g.componentGraph.Node(connNode.ID()); node != nil { - instanceID := g.instanceIDs[connNode.ID()] - instanceID.PipelineIDs[exprPipelineID] = struct{}{} - instanceID.PipelineIDs[rcvrPipelineID] = struct{}{} return node.(*connectorNode) } g.componentGraph.AddNode(connNode) - g.instanceIDs[connNode.ID()] = &component.InstanceID{ - ID: connNode.componentID, - Kind: component.KindConnector, - PipelineIDs: map[component.ID]struct{}{ - exprPipelineID: {}, - rcvrPipelineID: {}, - }, - } return connNode } @@ -271,22 +227,15 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { for i := len(nodes) - 1; i >= 0; i-- { node := nodes[i] - - // skipped for capabilitiesNodes and fanoutNodes as they are not assigned componentIDs. - var telemetrySettings component.TelemetrySettings - if instanceID, ok := g.instanceIDs[node.ID()]; ok { - telemetrySettings = set.Telemetry.ToComponentTelemetrySettings(instanceID) - } - switch n := node.(type) { case *receiverNode: - err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) + err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) case *processorNode: - err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) + err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) case *exporterNode: - err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder) + err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder) case *connectorNode: - err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID())) + err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID())) case *capabilitiesNode: capability := consumer.Capabilities{MutatesData: false} for _, proc := range g.pipelines[n.pipelineID].processors { @@ -377,19 +326,12 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error { // are started before upstream components. This ensures that each // component's consumer is ready to consume. for i := len(nodes) - 1; i >= 0; i-- { - node := nodes[i] - comp, ok := node.(component.Component) - + comp, ok := nodes[i].(component.Component) if !ok { // Skip capabilities/fanout nodes continue } - - instanceID := g.instanceIDs[node.ID()] - _ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting)) - if compErr := comp.Start(ctx, host); compErr != nil { - _ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr)) return compErr } } @@ -408,24 +350,12 @@ func (g *Graph) ShutdownAll(ctx context.Context) error { // before the consumer is stopped. var errs error for i := 0; i < len(nodes); i++ { - node := nodes[i] - comp, ok := node.(component.Component) - + comp, ok := nodes[i].(component.Component) if !ok { // Skip capabilities/fanout nodes continue } - - instanceID := g.instanceIDs[node.ID()] - _ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping)) - - if compErr := comp.Shutdown(ctx); compErr != nil { - errs = multierr.Append(errs, compErr) - _ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr)) - continue - } - - _ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped)) + errs = multierr.Append(errs, comp.Shutdown(ctx)) } return errs } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 4e6bc0256eb..1279ff145d5 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -27,8 +27,6 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" - "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/testcomponents" "go.opentelemetry.io/collector/service/pipelines" ) @@ -143,13 +141,8 @@ func TestGraphStartStop(t *testing.T) { } pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = servicetelemetry.NewNopTelemetrySettings() - pg.instanceIDs = make(map[int64]*component.InstanceID) - for _, edge := range tt.edges { f, t := &testNode{id: edge[0]}, &testNode{id: edge[1]} - pg.instanceIDs[f.ID()] = &component.InstanceID{} - pg.instanceIDs[t.ID()] = &component.InstanceID{} pg.componentGraph.SetEdge(simple.Edge{F: f, T: t}) } @@ -175,13 +168,6 @@ func TestGraphStartStopCycle(t *testing.T) { c1 := &testNode{id: component.NewIDWithName("c", "1")} e1 := &testNode{id: component.NewIDWithName("e", "1")} - pg.instanceIDs = map[int64]*component.InstanceID{ - r1.ID(): {}, - p1.ID(): {}, - c1.ID(): {}, - e1.ID(): {}, - } - pg.componentGraph.SetEdge(simple.Edge{F: r1, T: p1}) pg.componentGraph.SetEdge(simple.Edge{F: p1, T: c1}) pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1}) @@ -198,22 +184,15 @@ func TestGraphStartStopCycle(t *testing.T) { func TestGraphStartStopComponentError(t *testing.T) { pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = servicetelemetry.NewNopTelemetrySettings() - r1 := &testNode{ - id: component.NewIDWithName("r", "1"), - startErr: errors.New("foo"), - } - e1 := &testNode{ - id: component.NewIDWithName("e", "1"), - shutdownErr: errors.New("bar"), - } - pg.instanceIDs = map[int64]*component.InstanceID{ - r1.ID(): {}, - e1.ID(): {}, - } pg.componentGraph.SetEdge(simple.Edge{ - F: r1, - T: e1, + F: &testNode{ + id: component.NewIDWithName("r", "1"), + startErr: errors.New("foo"), + }, + T: &testNode{ + id: component.NewIDWithName("e", "1"), + shutdownErr: errors.New("bar"), + }, }) assert.EqualError(t, pg.StartAll(context.Background(), componenttest.NewNopHost()), "foo") assert.EqualError(t, pg.ShutdownAll(context.Background()), "bar") @@ -639,7 +618,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { t.Run(test.name, func(t *testing.T) { // Build the pipeline set := Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -905,7 +884,7 @@ func TestConnectorRouter(t *testing.T) { ctx := context.Background() set := Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -1949,7 +1928,7 @@ func TestGraphBuildErrors(t *testing.T) { t.Run(test.name, func(t *testing.T) { set := Settings{ BuildInfo: component.NewDefaultBuildInfo(), - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), ReceiverBuilder: receiver.NewBuilder( test.receiverCfgs, map[component.Type]receiver.Factory{ @@ -1996,7 +1975,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { nopConnectorFactory := connectortest.NewNopFactory() set := Settings{ - Telemetry: servicetelemetry.NewNopTelemetrySettings(), + Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -2103,152 +2082,6 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } } -func TestStatusReportedOnStartupShutdown(t *testing.T) { - - rNoErr := &testNode{id: component.NewIDWithName("r-no-err", "1")} - rStErr := &testNode{id: component.NewIDWithName("r-st-err", "1"), startErr: assert.AnError} - rSdErr := &testNode{id: component.NewIDWithName("r-sd-err", "1"), shutdownErr: assert.AnError} - - eNoErr := &testNode{id: component.NewIDWithName("e-no-err", "1")} - eStErr := &testNode{id: component.NewIDWithName("e-st-err", "1"), startErr: assert.AnError} - eSdErr := &testNode{id: component.NewIDWithName("e-sd-err", "1"), shutdownErr: assert.AnError} - - instanceIDs := map[*testNode]*component.InstanceID{ - rNoErr: {ID: rNoErr.id}, - rStErr: {ID: rStErr.id}, - rSdErr: {ID: rSdErr.id}, - eNoErr: {ID: eNoErr.id}, - eStErr: {ID: eStErr.id}, - eSdErr: {ID: eSdErr.id}, - } - - // compare two maps of status events ignoring timestamp - assertEqualStatuses := func(t *testing.T, evMap1, evMap2 map[*component.InstanceID][]*component.StatusEvent) { - assert.Equal(t, len(evMap1), len(evMap2)) - for id, evts1 := range evMap1 { - evts2 := evMap2[id] - assert.Equal(t, len(evts1), len(evts2)) - for i := 0; i < len(evts1); i++ { - ev1 := evts1[i] - ev2 := evts2[i] - assert.Equal(t, ev1.Status(), ev2.Status()) - assert.Equal(t, ev1.Err(), ev2.Err()) - } - } - - } - - for _, tc := range []struct { - name string - edge [2]*testNode - expectedStatuses map[*component.InstanceID][]*component.StatusEvent - startupErr error - shutdownErr error - }{ - { - name: "successful startup/shutdown", - edge: [2]*testNode{rNoErr, eNoErr}, - expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ - instanceIDs[rNoErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewStatusEvent(component.StatusStopped), - }, - instanceIDs[eNoErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewStatusEvent(component.StatusStopped), - }, - }, - }, - { - name: "early startup error", - edge: [2]*testNode{rNoErr, eStErr}, - expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ - instanceIDs[eStErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewPermanentErrorEvent(assert.AnError), - }, - }, - startupErr: assert.AnError, - }, - { - name: "late startup error", - edge: [2]*testNode{rStErr, eNoErr}, - expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ - instanceIDs[rStErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewPermanentErrorEvent(assert.AnError), - }, - instanceIDs[eNoErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewStatusEvent(component.StatusStopped), - }, - }, - startupErr: assert.AnError, - }, - { - name: "early shutdown error", - edge: [2]*testNode{rSdErr, eNoErr}, - expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ - instanceIDs[rSdErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewPermanentErrorEvent(assert.AnError), - }, - instanceIDs[eNoErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewStatusEvent(component.StatusStopped), - }, - }, - shutdownErr: assert.AnError, - }, - { - name: "late shutdown error", - edge: [2]*testNode{rNoErr, eSdErr}, - expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ - instanceIDs[rNoErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewStatusEvent(component.StatusStopped), - }, - instanceIDs[eSdErr]: { - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusStopping), - component.NewPermanentErrorEvent(assert.AnError), - }, - }, - shutdownErr: assert.AnError, - }, - } { - t.Run(tc.name, func(t *testing.T) { - pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = servicetelemetry.NewNopTelemetrySettings() - - actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent) - init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) { - actualStatuses[id] = append(actualStatuses[id], ev) - }) - - pg.telemetry.ReportComponentStatus = statusFunc - init() - - e0, e1 := tc.edge[0], tc.edge[1] - pg.instanceIDs = map[int64]*component.InstanceID{ - e0.ID(): instanceIDs[e0], - e1.ID(): instanceIDs[e1], - } - pg.componentGraph.SetEdge(simple.Edge{F: e0, T: e1}) - - assert.Equal(t, tc.startupErr, pg.StartAll(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, tc.shutdownErr, pg.ShutdownAll(context.Background())) - assertEqualStatuses(t, tc.expectedStatuses, actualStatuses) - }) - } -} - func (g *Graph) getReceivers() map[component.DataType]map[component.ID]component.Component { receiversMap := make(map[component.DataType]map[component.ID]component.Component) receiversMap[component.DataTypeTraces] = make(map[component.ID]component.Component) diff --git a/service/internal/servicetelemetry/nop_telemetry_settings.go b/service/internal/servicetelemetry/nop_telemetry_settings.go deleted file mode 100644 index e0ee305346d..00000000000 --- a/service/internal/servicetelemetry/nop_telemetry_settings.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" - -import ( - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" -) - -// NewNopTelemetrySettings returns a new nop settings for Create* functions. -func NewNopTelemetrySettings() TelemetrySettings { - return TelemetrySettings{ - Logger: zap.NewNop(), - TracerProvider: trace.NewNoopTracerProvider(), - MeterProvider: noop.NewMeterProvider(), - MetricsLevel: configtelemetry.LevelNone, - Resource: pcommon.NewResource(), - ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error { - return nil - }, - } -} diff --git a/service/internal/servicetelemetry/nop_telemetry_settings_test.go b/service/internal/servicetelemetry/nop_telemetry_settings_test.go deleted file mode 100644 index dd5014c7e0f..00000000000 --- a/service/internal/servicetelemetry/nop_telemetry_settings_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry - -import ( - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" -) - -func TestNewNopSettings(t *testing.T) { - set := NewNopTelemetrySettings() - - require.NotNil(t, set) - require.IsType(t, TelemetrySettings{}, set) - require.Equal(t, zap.NewNop(), set.Logger) - require.Equal(t, trace.NewNoopTracerProvider(), set.TracerProvider) - require.Equal(t, noop.NewMeterProvider(), set.MeterProvider) - require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel) - require.Equal(t, pcommon.NewResource(), set.Resource) - require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusStarting))) -} diff --git a/service/internal/servicetelemetry/telemetry_settings.go b/service/internal/servicetelemetry/telemetry_settings.go deleted file mode 100644 index 00062764d93..00000000000 --- a/service/internal/servicetelemetry/telemetry_settings.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/service/internal/status" -) - -// TelemetrySettings mirrors component.TelemetrySettings except for the method signature of -// ReportComponentStatus. The service level TelemetrySettings is not bound a specific component, and -// therefore takes a component.InstanceID as an argument. -type TelemetrySettings component.TelemetrySettingsBase[status.ServiceStatusFunc] - -// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from -// this service level Settings object. -func (s TelemetrySettings) ToComponentTelemetrySettings(id *component.InstanceID) component.TelemetrySettings { - return component.TelemetrySettings{ - Logger: s.Logger, - TracerProvider: s.TracerProvider, - MeterProvider: s.MeterProvider, - MetricsLevel: s.MetricsLevel, - Resource: s.Resource, - ReportComponentStatus: status.NewComponentStatusFunc(id, s.ReportComponentStatus), - } -} diff --git a/service/internal/servicetelemetry/telemetry_settings_test.go b/service/internal/servicetelemetry/telemetry_settings_test.go deleted file mode 100644 index 17300404d2f..00000000000 --- a/service/internal/servicetelemetry/telemetry_settings_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicetelemetry - -import ( - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/pdata/pcommon" -) - -func TestSettings(t *testing.T) { - set := TelemetrySettings{ - Logger: zap.NewNop(), - TracerProvider: trace.NewNoopTracerProvider(), - MeterProvider: noop.NewMeterProvider(), - MetricsLevel: configtelemetry.LevelNone, - Resource: pcommon.NewResource(), - ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error { - return nil - }, - } - require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusOK))) - - compSet := set.ToComponentTelemetrySettings(&component.InstanceID{}) - require.NoError(t, compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusOK))) -} diff --git a/service/internal/status/status.go b/service/internal/status/status.go deleted file mode 100644 index bbccc6939ae..00000000000 --- a/service/internal/status/status.go +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package status // import "go.opentelemetry.io/collector/service/internal/status" - -import ( - "errors" - "fmt" - "sync" - - "go.opentelemetry.io/collector/component" -) - -// onTransitionFunc receives a component.StatusEvent on a successful state transition -type onTransitionFunc func(*component.StatusEvent) - -// errInvalidStateTransition is returned for invalid state transitions -var errInvalidStateTransition = errors.New("invalid state transition") - -// fsm is a finite state machine that models transitions for component status -type fsm struct { - current *component.StatusEvent - transitions map[component.Status]map[component.Status]struct{} - onTransition onTransitionFunc -} - -// transition will attempt to execute a state transition. If it's successful, it calls the -// onTransitionFunc with a StatusEvent representing the new state. Returns an error if the arguments -// result in an invalid status, or if the state transition is not valid. -func (m *fsm) transition(ev *component.StatusEvent) error { - if _, ok := m.transitions[m.current.Status()][ev.Status()]; !ok { - return fmt.Errorf( - "cannot transition from %s to %s: %w", - m.current.Status(), - ev.Status(), - errInvalidStateTransition, - ) - } - m.current = ev - m.onTransition(ev) - return nil -} - -// newFSM creates a state machine with all valid transitions for component.Status. -// The initial state is set to component.StatusNone. -func newFSM(onTransition onTransitionFunc) *fsm { - return &fsm{ - current: component.NewStatusEvent(component.StatusNone), - onTransition: onTransition, - transitions: map[component.Status]map[component.Status]struct{}{ - component.StatusNone: { - component.StatusStarting: {}, - }, - component.StatusStarting: { - component.StatusOK: {}, - component.StatusRecoverableError: {}, - component.StatusPermanentError: {}, - component.StatusFatalError: {}, - component.StatusStopping: {}, - }, - component.StatusOK: { - component.StatusRecoverableError: {}, - component.StatusPermanentError: {}, - component.StatusFatalError: {}, - component.StatusStopping: {}, - }, - component.StatusRecoverableError: { - component.StatusOK: {}, - component.StatusPermanentError: {}, - component.StatusFatalError: {}, - component.StatusStopping: {}, - }, - component.StatusPermanentError: {}, - component.StatusFatalError: {}, - component.StatusStopping: { - component.StatusRecoverableError: {}, - component.StatusPermanentError: {}, - component.StatusFatalError: {}, - component.StatusStopped: {}, - }, - component.StatusStopped: {}, - }, - } -} - -// InitFunc can be used to toggle a ready flag to true -type InitFunc func() - -// readFunc can be used to check the value of a ready flag -type readyFunc func() bool - -// initAndReadyFuncs returns a pair of functions to set and check a boolean ready flag -func initAndReadyFuncs() (InitFunc, readyFunc) { - mu := sync.RWMutex{} - isReady := false - - init := func() { - mu.Lock() - defer mu.Unlock() - isReady = true - } - - ready := func() bool { - mu.RLock() - defer mu.RUnlock() - return isReady - } - - return init, ready -} - -// NotifyStatusFunc is the receiver of status events after successful state transitions -type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent) - -// ServiceStatusFunc is the expected type of ReportComponentStatus for servicetelemetry.Settings -type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent) error - -// errStatusNotReady is returned when trying to report status before service start -var errStatusNotReady = errors.New("report component status is not ready until service start") - -// NewServiceStatusFunc returns a function to be used as ReportComponentStatus for -// servicetelemetry.Settings, which differs from component.TelemetrySettings in that -// the service version does not correspond to a specific component, and thus needs -// the a component.InstanceID as a parameter. -func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, ServiceStatusFunc) { - init, isReady := initAndReadyFuncs() - // mu synchronizes access to the fsmMap and the underlying fsm during a state transition - mu := sync.Mutex{} - fsmMap := make(map[*component.InstanceID]*fsm) - return init, - func(id *component.InstanceID, ev *component.StatusEvent) error { - if !isReady() { - return errStatusNotReady - } - mu.Lock() - defer mu.Unlock() - fsm, ok := fsmMap[id] - if !ok { - fsm = newFSM(func(ev *component.StatusEvent) { - notifyStatusChange(id, ev) - }) - fsmMap[id] = fsm - } - return fsm.transition(ev) - } - -} - -// NewComponentStatusFunc returns a function to be used as ReportComponentStatus for -// component.TelemetrySettings, which differs from servicetelemetry.Settings in that -// the component version is tied to specific component instance. -func NewComponentStatusFunc(id *component.InstanceID, srvStatus ServiceStatusFunc) component.StatusFunc { - return func(ev *component.StatusEvent) error { - return srvStatus(id, ev) - } -} diff --git a/service/internal/status/status_test.go b/service/internal/status/status_test.go deleted file mode 100644 index c439cea39af..00000000000 --- a/service/internal/status/status_test.go +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package status - -import ( - "fmt" - "sync" - "testing" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" -) - -func TestStatusFSM(t *testing.T) { - for _, tc := range []struct { - name string - reportedStatuses []component.Status - expectedStatuses []component.Status - expectedErrorCount int - }{ - { - name: "successful startup and shutdown", - reportedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - }, - { - name: "component recovered", - reportedStatuses: []component.Status{ - component.StatusStarting, - component.StatusRecoverableError, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusRecoverableError, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - }, - { - name: "repeated events are errors", - reportedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusRecoverableError, - component.StatusRecoverableError, - component.StatusRecoverableError, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusRecoverableError, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - expectedErrorCount: 2, - }, - { - name: "PermanentError is terminal", - reportedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusPermanentError, - component.StatusOK, - }, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusPermanentError, - }, - expectedErrorCount: 1, - }, - { - name: "FatalError is terminal", - reportedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusFatalError, - component.StatusOK, - }, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusFatalError, - }, - expectedErrorCount: 1, - }, - { - name: "Stopped is terminal", - reportedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - component.StatusOK, - }, - expectedStatuses: []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - }, - expectedErrorCount: 1, - }, - } { - t.Run(tc.name, func(t *testing.T) { - var receivedStatuses []component.Status - fsm := newFSM( - func(ev *component.StatusEvent) { - receivedStatuses = append(receivedStatuses, ev.Status()) - }, - ) - - errorCount := 0 - for _, status := range tc.reportedStatuses { - if err := fsm.transition(component.NewStatusEvent(status)); err != nil { - errorCount++ - require.ErrorIs(t, err, errInvalidStateTransition) - } - } - - require.Equal(t, tc.expectedErrorCount, errorCount) - require.Equal(t, tc.expectedStatuses, receivedStatuses) - }) - } -} - -func TestValidSeqsToStopped(t *testing.T) { - events := []*component.StatusEvent{ - component.NewStatusEvent(component.StatusStarting), - component.NewStatusEvent(component.StatusOK), - component.NewStatusEvent(component.StatusRecoverableError), - component.NewStatusEvent(component.StatusPermanentError), - component.NewStatusEvent(component.StatusFatalError), - } - - for _, ev := range events { - name := fmt.Sprintf("transition from: %s to: %s invalid", ev.Status(), component.StatusStopped) - t.Run(name, func(t *testing.T) { - fsm := newFSM(func(*component.StatusEvent) {}) - if ev.Status() != component.StatusStarting { - require.NoError(t, fsm.transition(component.NewStatusEvent(component.StatusStarting))) - } - require.NoError(t, fsm.transition(ev)) - // skipping to stopped is not allowed - err := fsm.transition(component.NewStatusEvent(component.StatusStopped)) - require.ErrorIs(t, err, errInvalidStateTransition) - - // stopping -> stopped is allowed for non-fatal, non-permanent errors - err = fsm.transition(component.NewStatusEvent(component.StatusStopping)) - if ev.Status() == component.StatusPermanentError || ev.Status() == component.StatusFatalError { - require.ErrorIs(t, err, errInvalidStateTransition) - } else { - require.NoError(t, err) - require.NoError(t, fsm.transition(component.NewStatusEvent(component.StatusStopped))) - } - }) - } - -} - -func TestStatusFuncs(t *testing.T) { - id1 := &component.InstanceID{} - id2 := &component.InstanceID{} - - actualStatuses := make(map[*component.InstanceID][]component.Status) - statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) { - actualStatuses[id] = append(actualStatuses[id], ev.Status()) - } - - statuses1 := []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - } - - statuses2 := []component.Status{ - component.StatusStarting, - component.StatusOK, - component.StatusRecoverableError, - component.StatusOK, - component.StatusStopping, - component.StatusStopped, - } - - expectedStatuses := map[*component.InstanceID][]component.Status{ - id1: statuses1, - id2: statuses2, - } - - init, serviceStatusFn := NewServiceStatusFunc(statusFunc) - comp1Func := NewComponentStatusFunc(id1, serviceStatusFn) - comp2Func := NewComponentStatusFunc(id2, serviceStatusFn) - init() - - for _, st := range statuses1 { - require.NoError(t, comp1Func(component.NewStatusEvent(st))) - } - - for _, st := range statuses2 { - require.NoError(t, comp2Func(component.NewStatusEvent(st))) - } - - require.Equal(t, expectedStatuses, actualStatuses) -} - -func TestStatusFuncsConcurrent(t *testing.T) { - ids := []*component.InstanceID{{}, {}, {}, {}} - count := 0 - statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) { - count++ - } - init, serviceStatusFn := NewServiceStatusFunc(statusFunc) - init() - - wg := sync.WaitGroup{} - wg.Add(len(ids)) - - for _, id := range ids { - id := id - go func() { - compFn := NewComponentStatusFunc(id, serviceStatusFn) - _ = compFn(component.NewStatusEvent(component.StatusStarting)) - for i := 0; i < 1000; i++ { - _ = compFn(component.NewStatusEvent(component.StatusRecoverableError)) - _ = compFn(component.NewStatusEvent(component.StatusOK)) - } - wg.Done() - }() - } - - wg.Wait() - require.Equal(t, 8004, count) -} - -func TestStatusFuncReady(t *testing.T) { - statusFunc := func(*component.InstanceID, *component.StatusEvent) {} - init, serviceStatusFn := NewServiceStatusFunc(statusFunc) - id := &component.InstanceID{} - - err := serviceStatusFn(id, component.NewStatusEvent(component.StatusStarting)) - require.ErrorIs(t, err, errStatusNotReady) - - init() - - err = serviceStatusFn(id, component.NewStatusEvent(component.StatusStarting)) - require.NoError(t, err) -} diff --git a/service/service.go b/service/service.go index 477ed2266ef..ed9540ec948 100644 --- a/service/service.go +++ b/service/service.go @@ -29,8 +29,6 @@ import ( "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" - "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/telemetry" ) @@ -71,11 +69,10 @@ type Settings struct { type Service struct { buildInfo component.BuildInfo telemetry *telemetry.Telemetry - telemetrySettings servicetelemetry.TelemetrySettings + telemetrySettings component.TelemetrySettings host *serviceHost telemetryInitializer *telemetryInitializer collectorConf *confmap.Conf - statusInit status.InitFunc } func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { @@ -107,7 +104,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { res := buildResource(set.BuildInfo, cfg.Telemetry) pcommonRes := pdataFromSdk(res) - srv.telemetrySettings = servicetelemetry.TelemetrySettings{ + srv.telemetrySettings = component.TelemetrySettings{ Logger: srv.telemetry.Logger(), TracerProvider: srv.telemetry.TracerProvider(), MeterProvider: noop.NewMeterProvider(), @@ -122,8 +119,6 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { } srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp - srv.statusInit, srv.telemetrySettings.ReportComponentStatus = - status.NewServiceStatusFunc(srv.host.notifyComponentStatusChange) // process the configuration and initialize the pipeline if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil { @@ -145,9 +140,6 @@ func (srv *Service) Start(ctx context.Context) error { zap.Int("NumCPU", runtime.NumCPU()), ) - // enable status reporting - srv.statusInit() - if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index 8b47218dc6b..16c5fd6f82f 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -414,28 +414,6 @@ func TestServiceTelemetryLogger(t *testing.T) { assert.NotNil(t, srv.telemetrySettings.Logger) } -func TestServiceFatalError(t *testing.T) { - set := newNopSettings() - set.AsyncErrorChannel = make(chan error) - - srv, err := New(context.Background(), set, newNopConfig()) - require.NoError(t, err) - - assert.NoError(t, srv.Start(context.Background())) - t.Cleanup(func() { - assert.NoError(t, srv.Shutdown(context.Background())) - }) - - go func() { - ev := component.NewFatalErrorEvent(assert.AnError) - srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev) - }() - - err = <-srv.host.asyncErrorChannel - - require.ErrorIs(t, err, assert.AnError) -} - func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) { for key, labelValue := range expectedLabels { lookupKey, ok := prometheusToOtelConv[key] diff --git a/service/telemetry.go b/service/telemetry.go index bec39c8adf7..70fe22bc637 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -29,10 +29,10 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/telemetry" ) @@ -71,7 +71,7 @@ func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig b } } -func (tel *telemetryInitializer) init(res *resource.Resource, settings servicetelemetry.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { +func (tel *telemetryInitializer) init(res *resource.Resource, settings component.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) { settings.Logger.Info( "Skipping telemetry setup.", diff --git a/service/telemetry_test.go b/service/telemetry_test.go index e22c7c88fd4..a4144e27890 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/telemetry" ) @@ -273,7 +272,7 @@ func TestTelemetryInit(t *testing.T) { } otelRes := buildResource(buildInfo, *tc.cfg) res := pdataFromSdk(otelRes) - settings := servicetelemetry.TelemetrySettings{ + settings := component.TelemetrySettings{ Logger: zap.NewNop(), Resource: res, }