Skip to content

Commit

Permalink
Move error out of ReportComponentStatus function signature, use `Re…
Browse files Browse the repository at this point in the history
…portStatus` instead (#9175)

Fixes #9148
  • Loading branch information
atoulme authored Jan 9, 2024
1 parent 6ed7ad1 commit c5a2c78
Show file tree
Hide file tree
Showing 17 changed files with 184 additions and 122 deletions.
25 changes: 25 additions & 0 deletions .chloggen/statuschange_invalidtransition.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: status

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate `ReportComponentStatus` in favor of `ReportStatus`. This new function does not return an error.

# One or more tracking issues or pull requests related to the change
issues: [9148]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 1 addition & 2 deletions component/componenttest/nop_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func NewNopTelemetrySettings() component.TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.StatusEvent) error {
return nil
ReportStatus: func(*component.StatusEvent) {
},
}
}
16 changes: 8 additions & 8 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ type TelemetrySettings struct {

// ReportComponentStatus allows a component to report runtime changes in status. The service
// will automatically report status for a component during startup and shutdown. Components can
// use this method to report status after start and before shutdown. ReportComponentStatus
// will only return errors if the API used incorrectly. The two scenarios where an error will
// be returned are:
//
// - An illegal state transition
// - Calling this method before component startup
//
// If the API is being used properly, these errors are safe to ignore.
// use this method to report status after start and before shutdown.
// Deprecated: [v0.92.0] This function will be removed in a future release.
// Use ReportStatus instead.
ReportComponentStatus func(*StatusEvent) error

// ReportStatus allows a component to report runtime changes in status. The service
// will automatically report status for a component during startup and shutdown. Components can
// use this method to report status after start and before shutdown.
ReportStatus func(*StatusEvent)
}
23 changes: 10 additions & 13 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,14 @@ type Map[K comparable, V component.Component] struct {
func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) {
if c, ok := m.components[key]; ok {
// If we haven't already seen this telemetry settings, this shared component represents
// another instance. Wrap ReportComponentStatus to report for all instances this shared
// another instance. Wrap ReportStatus to report for all instances this shared
// component represents.
if _, ok := c.seenSettings[telemetrySettings]; !ok {
c.seenSettings[telemetrySettings] = struct{}{}
prev := c.telemetry.ReportComponentStatus
c.telemetry.ReportComponentStatus = func(ev *component.StatusEvent) error {
err := telemetrySettings.ReportComponentStatus(ev)
if prevErr := prev(ev); prevErr != nil {
err = prevErr
}
return err
prev := c.telemetry.ReportStatus
c.telemetry.ReportStatus = func(ev *component.StatusEvent) {
telemetrySettings.ReportStatus(ev)
prev(ev)
}
}
return c, nil
Expand Down Expand Up @@ -89,9 +86,9 @@ func (c *Component[V]) Start(ctx context.Context, host component.Host) error {
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// status reporting in graph a no-op.
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
c.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStarting))
if err = c.component.Start(ctx, host); err != nil {
_ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
c.telemetry.ReportStatus(component.NewPermanentErrorEvent(err))
}
})
return err
Expand All @@ -105,12 +102,12 @@ func (c *Component[V]) Shutdown(ctx context.Context) error {
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// status reporting in graph a no-op.
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
c.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopping))
err = c.component.Shutdown(ctx)
if err != nil {
_ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
c.telemetry.ReportStatus(component.NewPermanentErrorEvent(err))
} else {
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
c.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))
}
c.removeFunc()
})
Expand Down
38 changes: 14 additions & 24 deletions internal/sharedcomponent/sharedcomponent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,13 @@ func TestSharedComponent(t *testing.T) {
}
func TestSharedComponentsReportStatus(t *testing.T) {
reportedStatuses := make(map[*component.InstanceID][]component.Status)
newStatusFunc := func() func(*component.StatusEvent) error {
newStatusFunc := func() func(*component.StatusEvent) {
instanceID := &component.InstanceID{}
return func(ev *component.StatusEvent) error {
// Use an event with component.StatusNone to simulate an error.
return func(ev *component.StatusEvent) {
if ev.Status() == component.StatusNone {
return assert.AnError
return
}
reportedStatuses[instanceID] = append(reportedStatuses[instanceID], ev.Status())
return nil
}
}

Expand All @@ -128,10 +126,10 @@ func TestSharedComponentsReportStatus(t *testing.T) {
// make a shared component that represents three instances
for i := 0; i < 3; i++ {
telemetrySettings = newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
telemetrySettings.ReportStatus = newStatusFunc()
// The initial settings for the shared component need to match the ones passed to the first
// invocation of LoadOrStore so that underlying telemetry settings reference can be used to
// wrap ReportComponentStatus for subsequently added "instances".
// wrap ReportStatus for subsequently added "instances".
if i == 0 {
comp.telemetry = telemetrySettings
}
Expand All @@ -152,24 +150,18 @@ func TestSharedComponentsReportStatus(t *testing.T) {
telemetrySettings,
)

err := comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
require.NoError(t, err)
comp.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStarting))

// ok
err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusOK))
require.NoError(t, err)
comp.telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK))

// simulate an error
err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusNone))
require.ErrorIs(t, err, assert.AnError)
comp.telemetry.ReportStatus(component.NewStatusEvent(component.StatusNone))

// stopping
err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
require.NoError(t, err)
comp.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopping))

// stopped
err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
require.NoError(t, err)
comp.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))

// The shared component represents 3 component instances. Reporting status for the shared
// component should report status for each of the instances it represents.
Expand Down Expand Up @@ -227,11 +219,10 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
reportedStatuses := make(map[*component.InstanceID][]component.Status)
newStatusFunc := func() func(*component.StatusEvent) error {
newStatusFunc := func() func(*component.StatusEvent) {
instanceID := &component.InstanceID{}
return func(ev *component.StatusEvent) error {
return func(ev *component.StatusEvent) {
reportedStatuses[instanceID] = append(reportedStatuses[instanceID], ev.Status())
return nil
}
}
base := &baseComponent{}
Expand All @@ -250,7 +241,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
var err error
for i := 0; i < 3; i++ {
telemetrySettings := newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
telemetrySettings.ReportStatus = newStatusFunc()
if i == 0 {
base.telemetry = telemetrySettings
}
Expand All @@ -266,8 +257,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
require.Equal(t, tc.startErr, err)

if tc.startErr == nil {
err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusOK))
require.NoError(t, err)
comp.telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK))

err = comp.Shutdown(context.Background())
require.Equal(t, tc.shutdownErr, err)
Expand Down
2 changes: 1 addition & 1 deletion processor/processortest/unhealthy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type unhealthyProcessor struct {

func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error {
go func() {
_ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError))
p.telemetry.ReportStatus(component.NewStatusEvent(component.StatusRecoverableError))
}()
return nil
}
12 changes: 6 additions & 6 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
return err
}
_ = bes.telemetry.Status.ReportComponentOKIfStarting(instanceID)
bes.telemetry.Status.ReportOKIfStarting(instanceID)
extLogger.Info("Extension started.")
}
return nil
Expand All @@ -62,19 +62,19 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
Expand Down
2 changes: 2 additions & 0 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
var actualStatuses []*component.StatusEvent
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
}, func(err error) {
require.NoError(t, err)
})
extensions.telemetry.Status = rep
rep.Ready()
Expand Down
12 changes: 6 additions & 6 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,20 +388,20 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)

if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
return compErr
}

_ = g.telemetry.Status.ReportComponentOKIfStarting(instanceID)
g.telemetry.Status.ReportOKIfStarting(instanceID)
}
return nil
}
Expand All @@ -427,21 +427,21 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
continue
}

_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
Expand Down
1 change: 1 addition & 0 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses[id] = append(actualStatuses[id], ev)
}, func(err error) {
})

pg.telemetry.Status = rep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ func NewNopTelemetrySettings() TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}, func(err error) {}),
}
}
11 changes: 5 additions & 6 deletions service/internal/servicetelemetry/nop_telemetry_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ func TestNewNopSettings(t *testing.T) {
require.Equal(t, noopmetric.NewMeterProvider(), set.MeterProvider)
require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel)
require.Equal(t, pcommon.NewResource(), set.Resource)
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
set.Status.ReportStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
set.Status.ReportOKIfStarting(&component.InstanceID{})

}
17 changes: 11 additions & 6 deletions service/internal/servicetelemetry/telemetry_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ type TelemetrySettings struct {
// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from
// this service level Settings object.
func (s TelemetrySettings) ToComponentTelemetrySettings(id *component.InstanceID) component.TelemetrySettings {
statusFunc := status.NewReportStatusFunc(id, s.Status.ReportStatus)
return component.TelemetrySettings{
Logger: s.Logger,
TracerProvider: s.TracerProvider,
MeterProvider: s.MeterProvider,
MetricsLevel: s.MetricsLevel,
Resource: s.Resource,
ReportComponentStatus: status.NewComponentStatusFunc(id, s.Status.ReportComponentStatus),
Logger: s.Logger,
TracerProvider: s.TracerProvider,
MeterProvider: s.MeterProvider,
MetricsLevel: s.MetricsLevel,
Resource: s.Resource,
ReportComponentStatus: func(event *component.StatusEvent) error {
statusFunc(event)
return nil
},
ReportStatus: statusFunc,
}
}
16 changes: 8 additions & 8 deletions service/internal/servicetelemetry/telemetry_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ func TestSettings(t *testing.T) {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
Status: status.NewReporter(
func(*component.InstanceID, *component.StatusEvent) {},
func(err error) { require.NoError(t, err) }),
}
set.Status.Ready()
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
set.Status.ReportStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
set.Status.ReportOKIfStarting(&component.InstanceID{})

compSet := set.ToComponentTelemetrySettings(&component.InstanceID{})
require.NoError(t, compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)))
compSet.ReportStatus(component.NewStatusEvent(component.StatusStarting))
}
Loading

0 comments on commit c5a2c78

Please sign in to comment.