Skip to content

Commit

Permalink
Status implemented as a finite state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Aug 23, 2023
1 parent 4386622 commit 5dcb80b
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 38 deletions.
2 changes: 1 addition & 1 deletion component/componenttest/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewNopHost() component.Host {

func (nh *nopHost) ReportFatalError(_ error) {}

func (nh *nopHost) ReportComponentStatus(_ *component.StatusEvent) {}
func (nh *nopHost) ReportComponentStatus(_ component.Status, _ ...component.StatusEventOption) {}

func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
return nil
Expand Down
2 changes: 1 addition & 1 deletion component/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Host interface {
// May be called by the component any time after Component.Start is called or while
// Component.Start call is executing.
// May be called concurrently with itself.
ReportComponentStatus(event *StatusEvent)
ReportComponentStatus(status Status, options ...StatusEventOption)

// GetFactory of the specified kind. Returns the factory for a component type.
// This allows components to create other components. For example:
Expand Down
80 changes: 72 additions & 8 deletions component/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,104 @@ package component // import "go.opentelemetry.io/collector/component"

import (
"errors"
"fmt"
"time"
)

type Status int32

// Enumeration of possible component statuses
const (
StatusOK Status = iota
StatusError
StatusStarting Status = iota
StatusOK
StatusRecoverableError
StatusPermanentError
StatusFatalError
StatusStopping
StatusStopped
)

// String returns a string representation of a Status
func (s Status) String() string {
switch s {
case StatusStarting:
return "StatusStarting"
case StatusOK:
return "StatusOK"
case StatusRecoverableError:
return "StatusRecoverableError"
case StatusPermanentError:
return "StatusPermanentError"
case StatusFatalError:
return "StatusFatalError"
case StatusStopping:
return "StatusStopping"
case StatusStopped:
return "StatusStopped"
}
return "StatusUnknown"
}

// errorStatuses is a set of statuses that can have associated errors
var errorStatuses = map[Status]struct{}{
StatusRecoverableError: {},
StatusPermanentError: {},
StatusFatalError: {},
}

// StatusEvent contains a status and timestamp, and can contain an error
type StatusEvent struct {
status Status
err error
status Status
err error
timestamp time.Time
}

// Status returns the Status (enum) associated with the StatusEvent
func (ev *StatusEvent) Status() Status {
return ev.status
}

// Err returns the error associated with the ComponentEvent.
// Err returns the error associated with the StatusEvent.
func (ev *StatusEvent) Err() error {
return ev.err
}

// Timestamp returns the timestamp associated with the StatusEvent
func (ev *StatusEvent) Timestamp() time.Time {
return ev.timestamp
}

// StatusEventOption applies options to a StatusEvent.
type StatusEventOption func(*StatusEvent) error

// WithError sets the error object of the Event. It is optional
// errStatusEventInvalidArgument indicates an invalid option was specified when creating a status
// event. This will happen when using WithError for a non-error status.
var errStatusEventInvalidArgument = errors.New("status event argument error")

// WithError sets the error object of the StatusEvent. It is optional
// and should only be applied to an Event of type ComponentError.
func WithError(err error) StatusEventOption {
return func(o *StatusEvent) error {
if o.status == StatusOK {
return errors.New("event with ComponentOK cannot have an error")
if _, ok := errorStatuses[o.status]; !ok {
return fmt.Errorf(
"event with %s cannot have an error: %w",
o.status,
errStatusEventInvalidArgument,
)
}
o.err = err
return nil
}
}

// WithTimestamp is optional, when used it sets the timestamp of the StatusEvent.
func WithTimestamp(t time.Time) StatusEventOption {
return func(o *StatusEvent) error {
o.timestamp = t
return nil
}
}

// NewStatusEvent creates and returns a StatusEvent with default and provided
// options. Will return an error if an error is provided for a non-error event
// type (status.ComponentOK).
Expand All @@ -58,6 +118,10 @@ func NewStatusEvent(status Status, options ...StatusEventOption) (*StatusEvent,
}
}

if ev.timestamp.IsZero() {
ev.timestamp = time.Now()
}

return &ev, nil
}

Expand Down
78 changes: 63 additions & 15 deletions component/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,77 @@
package component

import (
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStatusEventOK(t *testing.T) {
event, err := NewStatusEvent(StatusOK)
require.NoError(t, err)
require.Equal(t, StatusOK, event.Status())
require.Nil(t, event.Err())
func TestStatusEventWithoutError(t *testing.T) {
statuses := []Status{
StatusStarting,
StatusOK,
StatusRecoverableError,
StatusPermanentError,
StatusFatalError,
StatusStopping,
StatusStopped,
}

for _, status := range statuses {
t.Run(fmt.Sprintf("%s without error", status), func(t *testing.T) {
ev, err := NewStatusEvent(status)
require.NoError(t, err)
require.Equal(t, status, ev.Status())
require.Nil(t, ev.Err())
require.False(t, ev.Timestamp().IsZero())
})
}
}

func TestStatusEventWithError(t *testing.T) {
statuses := []Status{
StatusRecoverableError,
StatusRecoverableError,
StatusFatalError,
}

for _, status := range statuses {
t.Run(fmt.Sprintf("error status: %s with error", status), func(t *testing.T) {
ev, err := NewStatusEvent(status, WithError(assert.AnError))
require.NoError(t, err)
require.Equal(t, status, ev.Status())
require.Equal(t, assert.AnError, ev.Err())
require.False(t, ev.Timestamp().IsZero())
})
}
}

func TestStatusEventOKWithError(t *testing.T) {
event, err := NewStatusEvent(StatusOK, WithError(errors.New("an error")))
require.Error(t, err)
require.Nil(t, event)
func TestNonErrorStatusWithError(t *testing.T) {
statuses := []Status{
StatusStarting,
StatusOK,
StatusStopping,
StatusStopped,
}

for _, status := range statuses {
t.Run(fmt.Sprintf("non error status: %s with error", status), func(t *testing.T) {
ev, err := NewStatusEvent(status, WithError(assert.AnError))
require.Error(t, err)
require.ErrorIs(t, err, errStatusEventInvalidArgument)
require.Nil(t, ev)
})
}
}

func TestStatusEventError(t *testing.T) {
eventErr := errors.New("an error")
event, err := NewStatusEvent(StatusError, WithError(eventErr))
func TestStatusEventWithTimestamp(t *testing.T) {
ts := time.Now()
ev, err := NewStatusEvent(StatusOK, WithTimestamp(ts))
require.NoError(t, err)
require.Equal(t, StatusError, event.Status())
require.Equal(t, eventErr, event.Err())
require.Equal(t, StatusOK, ev.Status())
require.Nil(t, ev.Err())
require.Equal(t, ts, ev.Timestamp())
}
6 changes: 5 additions & 1 deletion otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func TestComponentStatusWatcher(t *testing.T) {
changedComponents := map[*component.InstanceID]component.Status{}
var mux sync.Mutex
onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) {
// skip the startup notifications
if event.Status() == component.StatusStarting {
return
}
mux.Lock()
defer mux.Unlock()
changedComponents[source] = event.Status()
Expand Down Expand Up @@ -201,7 +205,7 @@ func TestComponentStatusWatcher(t *testing.T) {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
// And all must be in StatusError
assert.EqualValues(t, component.StatusError, v)
assert.EqualValues(t, component.StatusRecoverableError, v)
}
// We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml
// We must have exactly 3 items in our map. This ensures that the "source" argument
Expand Down
3 changes: 1 addition & 2 deletions processor/processortest/unhealthy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ type unhealthyProcessor struct {

func (unhealthyProcessor) Start(_ context.Context, host component.Host) error {
go func() {
evt, _ := component.NewStatusEvent(component.StatusError)
host.ReportComponentStatus(evt)
host.ReportComponentStatus(component.StatusRecoverableError)
}()
return nil
}
20 changes: 15 additions & 5 deletions service/internal/components/host_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/internal/status"
)

// hostWrapper adds behavior on top of the component.Host being passed when starting the built components.
// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose.
type hostWrapper struct {
servicehost.Host
component *component.InstanceID
component *component.InstanceID
statusNotifier status.Notifier
*zap.Logger
}

func NewHostWrapper(host servicehost.Host, component *component.InstanceID, logger *zap.Logger) component.Host {
func NewHostWrapper(host servicehost.Host, instanceID *component.InstanceID, logger *zap.Logger) component.Host {
return &hostWrapper{
host,
component,
instanceID,
status.NewNotifier(host, instanceID),
logger,
}
}
Expand All @@ -34,8 +37,15 @@ func (hw *hostWrapper) ReportFatalError(err error) {
hw.Host.ReportFatalError(err) // nolint:staticcheck
}

func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) {
hw.Host.ReportComponentStatus(hw.component, event)
func (hw *hostWrapper) ReportComponentStatus(status component.Status, options ...component.StatusEventOption) {
// The following can return an error. The two cases that would result in an error would be:
// - An invalid state transition
// - Invalid arguments (basically providing a component.WithError option to a non-error status)
// The latter is a programming error and should be corrected. The former, is something that is
// likely to happen, but not something the programmer should be concerned about. An example would be
// reporting StatusRecoverableError multiple times, which, could happen while recovering, however,
// only the first invocation would result in a successful status transition.
_ = hw.statusNotifier.Event(status, options...)
}

// RegisterZPages is used by zpages extension to register handles from service.
Expand Down
7 changes: 2 additions & 5 deletions service/internal/components/host_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,14 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

func Test_newHostWrapper(t *testing.T) {
func Test_newHostWrapper(_ *testing.T) {
hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop())
hw.ReportFatalError(errors.New("test error"))
ev, err := component.NewStatusEvent(component.StatusOK)
assert.NoError(t, err)
hw.ReportComponentStatus(ev)
hw.ReportComponentStatus(component.StatusOK)
}
Loading

0 comments on commit 5dcb80b

Please sign in to comment.