Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup service, combine small functions to better read the overall flow #4006

Merged
merged 3 commits into from
Sep 11, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 56 additions & 110 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,55 @@ func newService(set *svcSettings) (*service, error) {
return nil, fmt.Errorf("invalid configuration: %w", err)
}

if err := srv.buildExtensions(); err != nil {
var err error
srv.builtExtensions, err = builder.BuildExtensions(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Extensions)
if err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
}

if err := srv.buildPipelines(); err != nil {
// Pipeline is built backwards, starting from exporters, so that we create objects
// which are referenced before objects which reference them.

// First create exporters.
srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Exporters)
if err != nil {
return nil, fmt.Errorf("cannot build exporters: %w", err)
}

// Create pipelines and their processors and plug exporters to the end of the pipelines.
srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors)
if err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
}

// Create receivers and plug them into the start of the pipelines.
srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers)
if err != nil {
return nil, fmt.Errorf("cannot build receivers: %w", err)
}

return srv, nil
}

func (srv *service) Start(ctx context.Context) error {
if err := srv.startExtensions(ctx); err != nil {
return fmt.Errorf("cannot setup extensions: %w", err)
srv.logger.Info("Starting extensions...")
if err := srv.builtExtensions.StartAll(ctx, srv); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}

srv.logger.Info("Starting exporters...")
if err := srv.builtExporters.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start builtExporters: %w", err)
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
}

if err := srv.startPipelines(ctx); err != nil {
return fmt.Errorf("cannot setup pipelines: %w", err)
srv.logger.Info("Starting processors...")
if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil {
return fmt.Errorf("cannot start processors: %w", err)
}

srv.logger.Info("Starting receivers...")
if err := srv.builtReceivers.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start receivers: %w", err)
}

return srv.builtExtensions.NotifyPipelineReady()
Expand All @@ -90,11 +121,27 @@ func (srv *service) Shutdown(ctx context.Context) error {
errs = append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}

if err := srv.shutdownPipelines(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err))
// Pipeline shutdown order is the reverse of building/starting: first receivers, then flushing pipelines
// giving senders a chance to send all their data. This may take time, the allowed
// time should be part of configuration.

srv.logger.Info("Stopping receivers...")
if err := srv.builtReceivers.ShutdownAll(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to stop receivers: %w", err))
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
}

srv.logger.Info("Stopping processors...")
if err := srv.builtPipelines.ShutdownProcessors(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown processors: %w", err))
}

srv.logger.Info("Stopping exporters...")
if err := srv.builtExporters.ShutdownAll(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err))
}

if err := srv.shutdownExtensions(ctx); err != nil {
srv.logger.Info("Stopping extensions...")
if err := srv.builtExtensions.ShutdownAll(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

Expand Down Expand Up @@ -129,104 +176,3 @@ func (srv *service) GetExtensions() map[config.ComponentID]component.Extension {
func (srv *service) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return srv.builtExporters.ToMapByDataType()
}

func (srv *service) buildExtensions() error {
var err error
srv.builtExtensions, err = builder.BuildExtensions(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Extensions)
if err != nil {
return fmt.Errorf("cannot build builtExtensions: %w", err)
}
return nil
}

func (srv *service) startExtensions(ctx context.Context) error {
srv.logger.Info("Starting extensions...")
err := srv.builtExtensions.StartAll(ctx, srv)
if err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}
return nil
}

func (srv *service) shutdownExtensions(ctx context.Context) error {
srv.logger.Info("Stopping extensions...")
err := srv.builtExtensions.ShutdownAll(ctx)
if err != nil {
return fmt.Errorf("failed to shutdown extensions: %w", err)
}
return nil
}

func (srv *service) buildPipelines() error {
// Pipeline is built backwards, starting from exporters, so that we create objects
// which are referenced before objects which reference them.

// First create exporters.
var err error
srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Exporters)
if err != nil {
return fmt.Errorf("cannot build builtExporters: %w", err)
}

// Create pipelines and their processors and plug exporters to the
// end of the pipelines.
srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors)
if err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}

// Create receivers and plug them into the start of the pipelines.
srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers)
if err != nil {
return fmt.Errorf("cannot build receivers: %w", err)
}

return nil
}

func (srv *service) startPipelines(ctx context.Context) error {
srv.logger.Info("Starting exporters...")
if err := srv.builtExporters.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start builtExporters: %w", err)
}

srv.logger.Info("Starting processors...")
if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil {
return fmt.Errorf("cannot start processors: %w", err)
}

srv.logger.Info("Starting receivers...")
if err := srv.builtReceivers.StartAll(ctx, srv); err != nil {
return fmt.Errorf("cannot start receivers: %w", err)
}

return nil
}

func (srv *service) shutdownPipelines(ctx context.Context) error {
// Shutdown order is the reverse of building: first receivers, then flushing pipelines
// giving senders a chance to send all their data. This may take time, the allowed
// time should be part of configuration.

var errs []error

srv.logger.Info("Stopping receivers...")
err := srv.builtReceivers.ShutdownAll(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("failed to stop receivers: %w", err))
}

srv.logger.Info("Stopping processors...")
err = srv.builtPipelines.ShutdownProcessors(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown processors: %w", err))
}

srv.logger.Info("Stopping exporters...")
err = srv.builtExporters.ShutdownAll(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err))
}

return consumererror.Combine(errs)
}