Skip to content

Commit

Permalink
Cleanup service, combine small functions to better read the overall flow
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Sep 10, 2021
1 parent 82d1af4 commit 02d1d37
Showing 1 changed file with 57 additions and 111 deletions.
168 changes: 57 additions & 111 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,55 @@ func newService(set *svcSettings) (*service, error) {
return nil, fmt.Errorf("invalid configuration: %w", err)
}

if err := srv.buildExtensions(); err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
var err error
srv.builtExtensions, err = builder.BuildExtensions(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Extensions)
if err != nil {
return nil, fmt.Errorf("cannot build builtExtensions: %w", err)
}

if err := srv.buildPipelines(); err != nil {
// Pipeline is built backwards, starting from exporters, so that we create objects
// which are referenced before objects which reference them.

// First create exporters.
srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Exporters)
if err != nil {
return nil, fmt.Errorf("cannot build builtExporters: %w", err)
}

// Create pipelines and their processors and plug exporters to the end of the pipelines.
srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors)
if err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
}

// Create receivers and plug them into the start of the pipelines.
srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers)
if err != nil {
return nil, fmt.Errorf("cannot build receivers: %w", err)
}

return srv, nil
}

func (srv *service) Start(ctx context.Context) error {
if err := srv.startExtensions(ctx); err != nil {
return fmt.Errorf("cannot setup extensions: %w", err)
srv.logger.Info("Starting extensions...")
if err := srv.builtExtensions.StartAll(ctx, srv); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}

srv.logger.Info("Starting exporters...")
if err := srv.builtExporters.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start builtExporters: %w", err)
}

if err := srv.startPipelines(ctx); err != nil {
return fmt.Errorf("cannot setup pipelines: %w", err)
srv.logger.Info("Starting processors...")
if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil {
return fmt.Errorf("cannot start processors: %w", err)
}

srv.logger.Info("Starting receivers...")
if err := srv.builtReceivers.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start receivers: %w", err)
}

return srv.builtExtensions.NotifyPipelineReady()
Expand All @@ -90,11 +121,27 @@ func (srv *service) Shutdown(ctx context.Context) error {
errs = append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}

if err := srv.shutdownPipelines(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err))
// Pipeline shutdown order is the reverse of building/starting: first receivers, then flushing pipelines
// giving senders a chance to send all their data. This may take time, the allowed
// time should be part of configuration.

srv.logger.Info("Stopping receivers...")
if err := srv.builtReceivers.ShutdownAll(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to stop receivers: %w", err))
}

srv.logger.Info("Stopping processors...")
if err := srv.builtPipelines.ShutdownProcessors(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown processors: %w", err))
}

srv.logger.Info("Stopping exporters...")
if err := srv.builtExporters.ShutdownAll(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err))
}

if err := srv.shutdownExtensions(ctx); err != nil {
srv.logger.Info("Stopping extensions...")
if err := srv.builtExtensions.ShutdownAll(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

Expand Down Expand Up @@ -129,104 +176,3 @@ func (srv *service) GetExtensions() map[config.ComponentID]component.Extension {
func (srv *service) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return srv.builtExporters.ToMapByDataType()
}

func (srv *service) buildExtensions() error {
var err error
srv.builtExtensions, err = builder.BuildExtensions(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Extensions)
if err != nil {
return fmt.Errorf("cannot build builtExtensions: %w", err)
}
return nil
}

func (srv *service) startExtensions(ctx context.Context) error {
srv.logger.Info("Starting extensions...")
err := srv.builtExtensions.StartAll(ctx, srv)
if err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}
return nil
}

func (srv *service) shutdownExtensions(ctx context.Context) error {
srv.logger.Info("Stopping extensions...")
err := srv.builtExtensions.ShutdownAll(ctx)
if err != nil {
return fmt.Errorf("failed to shutdown extensions: %w", err)
}
return nil
}

func (srv *service) buildPipelines() error {
// Pipeline is built backwards, starting from exporters, so that we create objects
// which are referenced before objects which reference them.

// First create exporters.
var err error
srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Exporters)
if err != nil {
return fmt.Errorf("cannot build builtExporters: %w", err)
}

// Create pipelines and their processors and plug exporters to the
// end of the pipelines.
srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors)
if err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}

// Create receivers and plug them into the start of the pipelines.
srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers)
if err != nil {
return fmt.Errorf("cannot build receivers: %w", err)
}

return nil
}

func (srv *service) startPipelines(ctx context.Context) error {
srv.logger.Info("Starting exporters...")
if err := srv.builtExporters.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start builtExporters: %w", err)
}

srv.logger.Info("Starting processors...")
if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil {
return fmt.Errorf("cannot start processors: %w", err)
}

srv.logger.Info("Starting receivers...")
if err := srv.builtReceivers.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start receivers: %w", err)
}

return nil
}

func (srv *service) shutdownPipelines(ctx context.Context) error {
// Shutdown order is the reverse of building: first receivers, then flushing pipelines
// giving senders a chance to send all their data. This may take time, the allowed
// time should be part of configuration.

var errs []error

srv.logger.Info("Stopping receivers...")
err := srv.builtReceivers.ShutdownAll(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("failed to stop receivers: %w", err))
}

srv.logger.Info("Stopping processors...")
err = srv.builtPipelines.ShutdownProcessors(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown processors: %w", err))
}

srv.logger.Info("Stopping exporters...")
err = srv.builtExporters.ShutdownAll(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err))
}

return consumererror.Combine(errs)
}

0 comments on commit 02d1d37

Please sign in to comment.