diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index 649e06884b01..3518d584ac02 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -17,7 +17,7 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} -func (nh *nopHost) ReportComponentStatus(_ *component.StatusEvent) {} +func (nh *nopHost) ReportComponentStatus(_ component.Status, _ ...component.StatusEventOption) {} func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil diff --git a/component/host.go b/component/host.go index 79e653a19a2e..3939ce57a3f7 100644 --- a/component/host.go +++ b/component/host.go @@ -21,7 +21,7 @@ type Host interface { // May be called by the component any time after Component.Start is called or while // Component.Start call is executing. // May be called concurrently with itself. - ReportComponentStatus(event *StatusEvent) + ReportComponentStatus(status Status, options ...StatusEventOption) // GetFactory of the specified kind. Returns the factory for a component type. // This allows components to create other components. For example: diff --git a/component/status.go b/component/status.go index 91aed56236b3..cb56812ad64b 100644 --- a/component/status.go +++ b/component/status.go @@ -5,44 +5,104 @@ package component // import "go.opentelemetry.io/collector/component" import ( "errors" + "fmt" + "time" ) type Status int32 +// Enumeration of possible component statuses const ( - StatusOK Status = iota - StatusError + StatusStarting Status = iota + StatusOK + StatusRecoverableError + StatusPermanentError + StatusFatalError + StatusStopping + StatusStopped ) +// String returns a string representation of a Status +func (s Status) String() string { + switch s { + case StatusStarting: + return "StatusStarting" + case StatusOK: + return "StatusOK" + case StatusRecoverableError: + return "StatusRecoverableError" + case StatusPermanentError: + return "StatusPermanentError" + case StatusFatalError: + return "StatusFatalError" + case StatusStopping: + return "StatusStopping" + case StatusStopped: + return "StatusStopped" + } + return "StatusUnknown" +} + +// errorStatuses is a set of statuses that can have associated errors +var errorStatuses = map[Status]struct{}{ + StatusRecoverableError: {}, + StatusPermanentError: {}, + StatusFatalError: {}, +} + +// StatusEvent contains a status and timestamp, and can contain an error type StatusEvent struct { - status Status - err error + status Status + err error + timestamp time.Time } +// Status returns the Status (enum) associated with the StatusEvent func (ev *StatusEvent) Status() Status { return ev.status } -// Err returns the error associated with the ComponentEvent. +// Err returns the error associated with the StatusEvent. func (ev *StatusEvent) Err() error { return ev.err } +// Timestamp returns the timestamp associated with the StatusEvent +func (ev *StatusEvent) Timestamp() time.Time { + return ev.timestamp +} + // StatusEventOption applies options to a StatusEvent. type StatusEventOption func(*StatusEvent) error -// WithError sets the error object of the Event. It is optional +// errStatusEventInvalidArgument indicates an invalid option was specified when creating a status +// event. This will happen when using WithError for a non-error status. +var errStatusEventInvalidArgument = errors.New("status event argument error") + +// WithError sets the error object of the StatusEvent. It is optional // and should only be applied to an Event of type ComponentError. func WithError(err error) StatusEventOption { return func(o *StatusEvent) error { - if o.status == StatusOK { - return errors.New("event with ComponentOK cannot have an error") + if _, ok := errorStatuses[o.status]; !ok { + return fmt.Errorf( + "event with %s cannot have an error: %w", + o.status, + errStatusEventInvalidArgument, + ) } o.err = err return nil } } +// WithTimestamp is optional, when used it sets the timestamp of the StatusEvent. +func WithTimestamp(t time.Time) StatusEventOption { + return func(o *StatusEvent) error { + o.timestamp = t + return nil + } +} + // NewStatusEvent creates and returns a StatusEvent with default and provided // options. Will return an error if an error is provided for a non-error event // type (status.ComponentOK). @@ -58,6 +118,10 @@ func NewStatusEvent(status Status, options ...StatusEventOption) (*StatusEvent, } } + if ev.timestamp.IsZero() { + ev.timestamp = time.Now() + } + return &ev, nil } diff --git a/component/status_test.go b/component/status_test.go index 1faed3e33283..44870d3c9dc2 100644 --- a/component/status_test.go +++ b/component/status_test.go @@ -3,29 +3,77 @@ package component import ( - "errors" + "fmt" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestStatusEventOK(t *testing.T) { - event, err := NewStatusEvent(StatusOK) - require.NoError(t, err) - require.Equal(t, StatusOK, event.Status()) - require.Nil(t, event.Err()) +func TestStatusEventWithoutError(t *testing.T) { + statuses := []Status{ + StatusStarting, + StatusOK, + StatusRecoverableError, + StatusPermanentError, + StatusFatalError, + StatusStopping, + StatusStopped, + } + + for _, status := range statuses { + t.Run(fmt.Sprintf("%s without error", status), func(t *testing.T) { + ev, err := NewStatusEvent(status) + require.NoError(t, err) + require.Equal(t, status, ev.Status()) + require.Nil(t, ev.Err()) + require.False(t, ev.Timestamp().IsZero()) + }) + } +} + +func TestStatusEventWithError(t *testing.T) { + statuses := []Status{ + StatusRecoverableError, + StatusRecoverableError, + StatusFatalError, + } + + for _, status := range statuses { + t.Run(fmt.Sprintf("error status: %s with error", status), func(t *testing.T) { + ev, err := NewStatusEvent(status, WithError(assert.AnError)) + require.NoError(t, err) + require.Equal(t, status, ev.Status()) + require.Equal(t, assert.AnError, ev.Err()) + require.False(t, ev.Timestamp().IsZero()) + }) + } } -func TestStatusEventOKWithError(t *testing.T) { - event, err := NewStatusEvent(StatusOK, WithError(errors.New("an error"))) - require.Error(t, err) - require.Nil(t, event) +func TestNonErrorStatusWithError(t *testing.T) { + statuses := []Status{ + StatusStarting, + StatusOK, + StatusStopping, + StatusStopped, + } + + for _, status := range statuses { + t.Run(fmt.Sprintf("non error status: %s with error", status), func(t *testing.T) { + ev, err := NewStatusEvent(status, WithError(assert.AnError)) + require.Error(t, err) + require.ErrorIs(t, err, errStatusEventInvalidArgument) + require.Nil(t, ev) + }) + } } -func TestStatusEventError(t *testing.T) { - eventErr := errors.New("an error") - event, err := NewStatusEvent(StatusError, WithError(eventErr)) +func TestStatusEventWithTimestamp(t *testing.T) { + ts := time.Now() + ev, err := NewStatusEvent(StatusOK, WithTimestamp(ts)) require.NoError(t, err) - require.Equal(t, StatusError, event.Status()) - require.Equal(t, eventErr, event.Err()) + require.Equal(t, StatusOK, ev.Status()) + require.Nil(t, ev.Err()) + require.Equal(t, ts, ev.Timestamp()) } diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index 17174cd10d63..968e138219b5 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -166,6 +166,10 @@ func TestComponentStatusWatcher(t *testing.T) { changedComponents := map[*component.InstanceID]component.Status{} var mux sync.Mutex onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) { + // skip the startup notifications + if event.Status() == component.StatusStarting { + return + } mux.Lock() defer mux.Unlock() changedComponents[source] = event.Status() @@ -201,7 +205,7 @@ func TestComponentStatusWatcher(t *testing.T) { // All processors must report a status change with the same ID assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) // And all must be in StatusError - assert.EqualValues(t, component.StatusError, v) + assert.EqualValues(t, component.StatusRecoverableError, v) } // We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml // We must have exactly 3 items in our map. This ensures that the "source" argument diff --git a/processor/processortest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go index 3bf6e86e790d..9b6daab115f8 100644 --- a/processor/processortest/unhealthy_processor.go +++ b/processor/processortest/unhealthy_processor.go @@ -59,8 +59,7 @@ type unhealthyProcessor struct { func (unhealthyProcessor) Start(_ context.Context, host component.Host) error { go func() { - evt, _ := component.NewStatusEvent(component.StatusError) - host.ReportComponentStatus(evt) + host.ReportComponentStatus(component.StatusRecoverableError) }() return nil } diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 056664673afa..92ba2d1c82c9 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -10,20 +10,23 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/servicehost" + "go.opentelemetry.io/collector/service/internal/status" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. // TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { servicehost.Host - component *component.InstanceID + component *component.InstanceID + statusNotifier status.Notifier *zap.Logger } -func NewHostWrapper(host servicehost.Host, component *component.InstanceID, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, instanceID *component.InstanceID, logger *zap.Logger) component.Host { return &hostWrapper{ host, - component, + instanceID, + status.NewNotifier(host, instanceID), logger, } } @@ -34,8 +37,15 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) // nolint:staticcheck } -func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { - hw.Host.ReportComponentStatus(hw.component, event) +func (hw *hostWrapper) ReportComponentStatus(status component.Status, options ...component.StatusEventOption) { + // The following can return an error. The two cases that would result in an error would be: + // - An invalid state transition + // - Invalid arguments (basically providing a component.WithError option to a non-error status) + // The latter is a programming error and should be corrected. The former, is something that is + // likely to happen, but not something the programmer should be concerned about. An example would be + // reporting StatusRecoverableError multiple times, which, could happen while recovering, however, + // only the first invocation would result in a successful status transition. + _ = hw.statusNotifier.Event(status, options...) } // RegisterZPages is used by zpages extension to register handles from service. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 282701e6a213..194c62c4a2ea 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -7,17 +7,14 @@ import ( "errors" "testing" - "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/servicehost" ) -func Test_newHostWrapper(t *testing.T) { +func Test_newHostWrapper(_ *testing.T) { hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) - ev, err := component.NewStatusEvent(component.StatusOK) - assert.NoError(t, err) - hw.ReportComponentStatus(ev) + hw.ReportComponentStatus(component.StatusOK) } diff --git a/service/internal/status/status.go b/service/internal/status/status.go new file mode 100644 index 000000000000..ac0c6736fd9f --- /dev/null +++ b/service/internal/status/status.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status // import "go.opentelemetry.io/collector/service/internal/status" + +import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" +) + +// onTransitionFunc receives a component.StatusEvent on a successful state transition +type onTransitionFunc func(*component.StatusEvent) + +var errInvalidStateTransition = errors.New("invalid state transition") + +// fsm is a finite state machine that models transitions for component status +type fsm struct { + current *component.StatusEvent + transitions map[component.Status]map[component.Status]struct{} + onTransition onTransitionFunc +} + +// Event will attempt to execute a state transition. If successful, it calls the onTransitionFunc +// with a StatusEvent representing the new state. Returns an error if the arguments result in an +// invalid status, or if the state transition is not valid. +func (m *fsm) Event(status component.Status, options ...component.StatusEventOption) error { + if _, ok := m.transitions[m.current.Status()][status]; !ok { + return fmt.Errorf( + "cannot transition from %s to %s: %w", + m.current.Status(), + status, + errInvalidStateTransition, + ) + } + + ev, err := component.NewStatusEvent(status, options...) + if err != nil { + return err + } + + m.current = ev + m.onTransition(ev) + + return nil +} + +// newStatusFSM creates a state machine with all valid transitions for component.Status. +// It sets the initial state to component.StatusStarting and triggers the onTransitionFunc +// for the initial state. +func newStatusFSM(onTransition onTransitionFunc) *fsm { + starting, _ := component.NewStatusEvent(component.StatusStarting) + m := &fsm{ + current: starting, + onTransition: onTransition, + transitions: map[component.Status]map[component.Status]struct{}{ + component.StatusStarting: { + component.StatusOK: {}, + component.StatusRecoverableError: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: {}, + component.StatusStopped: {}, + }, + component.StatusOK: { + component.StatusRecoverableError: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: {}, + component.StatusStopped: {}, + }, + component.StatusRecoverableError: { + component.StatusOK: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: {}, + component.StatusStopped: {}, + }, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: { + component.StatusRecoverableError: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopped: {}, + }, + component.StatusStopped: {}, + }, + } + + // fire initial starting event + m.onTransition(starting) + return m +} + +// A Notifier emits status events +type Notifier interface { + Event(status component.Status, options ...component.StatusEventOption) error +} + +// NewNotifier returns a status.Notifier that reports component status through the given +// servicehost. The underlying implementation is a finite state machine. +func NewNotifier(host servicehost.Host, instanceID *component.InstanceID) Notifier { + return newStatusFSM( + func(ev *component.StatusEvent) { + host.ReportComponentStatus(instanceID, ev) + }, + ) +} diff --git a/service/internal/status/status_test.go b/service/internal/status/status_test.go new file mode 100644 index 000000000000..b5ba41bf793a --- /dev/null +++ b/service/internal/status/status_test.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" +) + +func TestStatusFSM(t *testing.T) { + for _, tc := range []struct { + name string + reportedStatuses []component.Status + expectedStatuses []component.Status + expectedErrorCount int + }{ + { + name: "successful startup and shutdown", + reportedStatuses: []component.Status{ + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + }, + { + name: "component recovered", + reportedStatuses: []component.Status{ + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + }, + { + name: "repeated events are errors", + reportedStatuses: []component.Status{ + component.StatusOK, + component.StatusRecoverableError, + component.StatusRecoverableError, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedErrorCount: 2, + }, + { + name: "PermanentError is terminal", + reportedStatuses: []component.Status{ + component.StatusOK, + component.StatusPermanentError, + component.StatusOK, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusPermanentError, + }, + expectedErrorCount: 1, + }, + { + name: "FatalError is terminal", + reportedStatuses: []component.Status{ + component.StatusOK, + component.StatusFatalError, + component.StatusOK, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusFatalError, + }, + expectedErrorCount: 1, + }, + { + name: "Stopped is terminal", + reportedStatuses: []component.Status{ + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + component.StatusOK, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedErrorCount: 1, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var receivedStatuses []component.Status + fsm := newStatusFSM( + func(ev *component.StatusEvent) { + receivedStatuses = append(receivedStatuses, ev.Status()) + }, + ) + + errorCount := 0 + for _, status := range tc.reportedStatuses { + if err := fsm.Event(status); err != nil { + errorCount++ + require.ErrorIs(t, err, errInvalidStateTransition) + } + } + + require.Equal(t, tc.expectedErrorCount, errorCount) + require.Equal(t, tc.expectedStatuses, receivedStatuses) + }) + } +} + +func TestNewNotifier(t *testing.T) { + notifier := NewNotifier(servicehost.NewNopHost(), &component.InstanceID{}) + require.NoError(t, notifier.Event(component.StatusOK)) +}