diff --git a/app/services/flu_svc/flu_monitor/flu_monitor.go b/app/services/flu_svc/flu_monitor/flu_monitor.go index 039cb6f3acd9..5bbafbbf4c5d 100644 --- a/app/services/flu_svc/flu_monitor/flu_monitor.go +++ b/app/services/flu_svc/flu_monitor/flu_monitor.go @@ -55,12 +55,13 @@ func (fm *FluMonitor) getOrCreateProjectHandler(flu models.FeedLineUnit) Project pHandler := NewProjectHandler(pc) fm.bulkProcessor.AddJobManager(pHandler.jobManager) - fm.projectHandlers[flu.ProjectId] = pHandler go pHandler.startFeedLineProcessor() go pHandler.startCBUProcessor() + pHandler.jobManager.Run() + projectHandler = pHandler plog.Info("Flu Monitor", "Project Handler Set", projectHandler.config.ProjectId) } diff --git a/bulk_processor/bulk_processor_test.go b/bulk_processor/bulk_processor_test.go index 31276d3ac999..b413ac0e635a 100644 --- a/bulk_processor/bulk_processor_test.go +++ b/bulk_processor/bulk_processor_test.go @@ -39,6 +39,7 @@ func TestDispatcher_Start(t *testing.T) { for _, c := range clients { dispatcher.AddJobManager(c.jobManager) + c.jobManager.Run() } dispatcher.Start() diff --git a/bulk_processor/dispatcher.go b/bulk_processor/dispatcher.go index 3c796b76a57c..4257b5ef9286 100644 --- a/bulk_processor/dispatcher.go +++ b/bulk_processor/dispatcher.go @@ -28,8 +28,6 @@ func (d *Dispatcher) Start() (started bool) { d.startCheck() - runJobManagers(d.jobManagers) - d.startWorkers(d.maxWorkers) go d.dispatch()