diff --git a/service/service.go b/service/service.go index de08cc615ff..466d82cc0a7 100644 --- a/service/service.go +++ b/service/service.go @@ -59,24 +59,55 @@ func newService(set *svcSettings) (*service, error) { return nil, fmt.Errorf("invalid configuration: %w", err) } - if err := srv.buildExtensions(); err != nil { + var err error + srv.builtExtensions, err = builder.BuildExtensions(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Extensions) + if err != nil { return nil, fmt.Errorf("cannot build extensions: %w", err) } - if err := srv.buildPipelines(); err != nil { + // Pipeline is built backwards, starting from exporters, so that we create objects + // which are referenced before objects which reference them. + + // First create exporters. + srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Exporters) + if err != nil { + return nil, fmt.Errorf("cannot build exporters: %w", err) + } + + // Create pipelines and their processors and plug exporters to the end of the pipelines. + srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors) + if err != nil { return nil, fmt.Errorf("cannot build pipelines: %w", err) } + // Create receivers and plug them into the start of the pipelines. + srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers) + if err != nil { + return nil, fmt.Errorf("cannot build receivers: %w", err) + } + return srv, nil } func (srv *service) Start(ctx context.Context) error { - if err := srv.startExtensions(ctx); err != nil { - return fmt.Errorf("cannot setup extensions: %w", err) + srv.logger.Info("Starting extensions...") + if err := srv.builtExtensions.StartAll(ctx, srv); err != nil { + return fmt.Errorf("failed to start extensions: %w", err) + } + + srv.logger.Info("Starting exporters...") + if err := srv.builtExporters.StartAll(ctx, srv); err != nil { + return fmt.Errorf("cannot start exporters: %w", err) } - if err := srv.startPipelines(ctx); err != nil { - return fmt.Errorf("cannot setup pipelines: %w", err) + srv.logger.Info("Starting processors...") + if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil { + return fmt.Errorf("cannot start processors: %w", err) + } + + srv.logger.Info("Starting receivers...") + if err := srv.builtReceivers.StartAll(ctx, srv); err != nil { + return fmt.Errorf("cannot start receivers: %w", err) } return srv.builtExtensions.NotifyPipelineReady() @@ -90,11 +121,27 @@ func (srv *service) Shutdown(ctx context.Context) error { errs = append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } - if err := srv.shutdownPipelines(ctx); err != nil { - errs = append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err)) + // Pipeline shutdown order is the reverse of building/starting: first receivers, then flushing pipelines + // giving senders a chance to send all their data. This may take time, the allowed + // time should be part of configuration. + + srv.logger.Info("Stopping receivers...") + if err := srv.builtReceivers.ShutdownAll(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to shutdown receivers: %w", err)) + } + + srv.logger.Info("Stopping processors...") + if err := srv.builtPipelines.ShutdownProcessors(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to shutdown processors: %w", err)) } - if err := srv.shutdownExtensions(ctx); err != nil { + srv.logger.Info("Stopping exporters...") + if err := srv.builtExporters.ShutdownAll(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err)) + } + + srv.logger.Info("Stopping extensions...") + if err := srv.builtExtensions.ShutdownAll(ctx); err != nil { errs = append(errs, fmt.Errorf("failed to shutdown extensions: %w", err)) } @@ -129,104 +176,3 @@ func (srv *service) GetExtensions() map[config.ComponentID]component.Extension { func (srv *service) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter { return srv.builtExporters.ToMapByDataType() } - -func (srv *service) buildExtensions() error { - var err error - srv.builtExtensions, err = builder.BuildExtensions(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Extensions) - if err != nil { - return fmt.Errorf("cannot build builtExtensions: %w", err) - } - return nil -} - -func (srv *service) startExtensions(ctx context.Context) error { - srv.logger.Info("Starting extensions...") - err := srv.builtExtensions.StartAll(ctx, srv) - if err != nil { - return fmt.Errorf("failed to start extensions: %w", err) - } - return nil -} - -func (srv *service) shutdownExtensions(ctx context.Context) error { - srv.logger.Info("Stopping extensions...") - err := srv.builtExtensions.ShutdownAll(ctx) - if err != nil { - return fmt.Errorf("failed to shutdown extensions: %w", err) - } - return nil -} - -func (srv *service) buildPipelines() error { - // Pipeline is built backwards, starting from exporters, so that we create objects - // which are referenced before objects which reference them. - - // First create exporters. - var err error - srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.factories.Exporters) - if err != nil { - return fmt.Errorf("cannot build builtExporters: %w", err) - } - - // Create pipelines and their processors and plug exporters to the - // end of the pipelines. - srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors) - if err != nil { - return fmt.Errorf("cannot build pipelines: %w", err) - } - - // Create receivers and plug them into the start of the pipelines. - srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.tracerProvider, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers) - if err != nil { - return fmt.Errorf("cannot build receivers: %w", err) - } - - return nil -} - -func (srv *service) startPipelines(ctx context.Context) error { - srv.logger.Info("Starting exporters...") - if err := srv.builtExporters.StartAll(ctx, srv); err != nil { - return fmt.Errorf("cannot start builtExporters: %w", err) - } - - srv.logger.Info("Starting processors...") - if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil { - return fmt.Errorf("cannot start processors: %w", err) - } - - srv.logger.Info("Starting receivers...") - if err := srv.builtReceivers.StartAll(ctx, srv); err != nil { - return fmt.Errorf("cannot start receivers: %w", err) - } - - return nil -} - -func (srv *service) shutdownPipelines(ctx context.Context) error { - // Shutdown order is the reverse of building: first receivers, then flushing pipelines - // giving senders a chance to send all their data. This may take time, the allowed - // time should be part of configuration. - - var errs []error - - srv.logger.Info("Stopping receivers...") - err := srv.builtReceivers.ShutdownAll(ctx) - if err != nil { - errs = append(errs, fmt.Errorf("failed to stop receivers: %w", err)) - } - - srv.logger.Info("Stopping processors...") - err = srv.builtPipelines.ShutdownProcessors(ctx) - if err != nil { - errs = append(errs, fmt.Errorf("failed to shutdown processors: %w", err)) - } - - srv.logger.Info("Stopping exporters...") - err = srv.builtExporters.ShutdownAll(ctx) - if err != nil { - errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err)) - } - - return consumererror.Combine(errs) -}