diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportOrchestratorJobTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportOrchestratorJobTests.cs index 40cd1c0872..449dab4d20 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportOrchestratorJobTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportOrchestratorJobTests.cs @@ -57,12 +57,30 @@ public async Task GivenAnOrchestratorJob_WhenResumeFromFailure_ThenJobShouldBeCo await VerifyCommonOrchestratorJobAsync(105, 6, 10); } + [Fact] + public async Task GivenAnOrchestratorJob_WhenAllResumeFromFailure_ThenJobShouldBeCompleted() + { + await VerifyCommonOrchestratorJobAsync(105, 6, 105); + } + [Fact] public async Task GivenAnOrchestratorJob_WhenResumeFromFailureSomeJobStillRunning_ThenJobShouldBeCompleted() { await VerifyCommonOrchestratorJobAsync(105, 6, 10, 5); } + [Fact] + public async Task GivenAnOrchestratorJob_WhenSomeJobsCancelled_ThenOperationCanceledExceptionShouldBeThrowAndWaitForOtherSubJobsCompleted() + { + await VerifyJobStatusChangedAsync(100, 1, JobStatus.Cancelled, 20, 20); + } + + [Fact] + public async Task GivenAnOrchestratorJob_WhenSomeJobsFailed_ThenImportProcessingExceptionShouldBeThrowAndWaitForOtherSubJobsCompleted() + { + await VerifyJobStatusChangedAsync(100, 1, JobStatus.Failed, 14, 14); + } + [Fact] public async Task GivenAnOrchestratorJobAndWrongEtag_WhenOrchestratorJobStart_ThenJobShouldFailedWithDetails() { @@ -100,7 +118,7 @@ public async Task GivenAnOrchestratorJobAndWrongEtag_WhenOrchestratorJobStart_Th fhirDataBulkImportOperation, integrationDataStoreClient, testQueueClient, - Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1}), + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), loggerFactory); JobExecutionException jobExecutionException = await Assert.ThrowsAsync(async () => 
await orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); @@ -153,7 +171,7 @@ public async Task GivenAnOrchestratorJob_WhenIntegrationExceptionThrow_ThenJobSh fhirDataBulkImportOperation, integrationDataStoreClient, testQueueClient, - Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1}), + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), loggerFactory); JobExecutionException jobExecutionException = await Assert.ThrowsAsync(async () => await orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); @@ -295,6 +313,408 @@ public async Task GivenAnOrchestratorJob_WhenRetriableExceptionThrow_ThenJobExec Arg.Any()); } + [Fact] + public async Task GivenAnOrchestratorJob_WhenLastSubJobFailed_ThenImportProcessingExceptionShouldBeThrowAndWaitForOtherSubJobsCancelledAndCompleted() + { + IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IIntegrationDataStoreClient integrationDataStoreClient = Substitute.For(); + ISequenceIdGenerator sequenceIdGenerator = Substitute.For>(); + IMediator mediator = Substitute.For(); + ImportOrchestratorJobInputData importOrchestratorInputData = new ImportOrchestratorJobInputData(); + ImportOrchestratorJobResult importOrchestratorJobResult = new ImportOrchestratorJobResult(); + TestQueueClient testQueueClient = new TestQueueClient(); + bool getJobByGroupIdCalledTime = false; + + testQueueClient.GetJobByIdFunc = (queueClient, id, _) => + { + JobInfo jobInfo = queueClient.JobInfos.First(t => t.Id == id); + if (jobInfo.Id == 3) + { + jobInfo.Status = JobStatus.Failed; + jobInfo.Result = JsonConvert.SerializeObject(new ImportProcessingJobErrorResult() { Message = "Job Failed" }); + } + + return jobInfo; + }; + 
testQueueClient.GetJobByGroupIdFunc = (queueClient, groupId, _) => + { + IEnumerable jobInfos = queueClient.JobInfos.Where(t => t.GroupId == groupId); + if (!getJobByGroupIdCalledTime) + { + foreach (JobInfo jobInfo in jobInfos) + { + if (jobInfo.Status == JobStatus.Running) + { + jobInfo.Status = JobStatus.Completed; + } + } + } + + getJobByGroupIdCalledTime = true; + return jobInfos.ToList(); + }; + importOrchestratorInputData.CreateTime = Clock.UtcNow; + importOrchestratorInputData.BaseUri = new Uri("http://dummy"); + var inputs = new List(); + + importOrchestratorJobResult.Progress = ImportOrchestratorJobProgress.PreprocessCompleted; + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri($"http://dummy/3") }); + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri($"http://dummy/4") }); + importOrchestratorInputData.Input = inputs; + importOrchestratorInputData.InputFormat = "ndjson"; + importOrchestratorInputData.InputSource = new Uri("http://dummy"); + importOrchestratorInputData.RequestUri = new Uri("http://dummy"); + JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorInputData) }, 1, false, false, CancellationToken.None)).First(); + + integrationDataStoreClient.GetPropertiesAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Dictionary properties = new Dictionary(); + properties[IntegrationDataStoreClientConstants.BlobPropertyETag] = "test"; + properties[IntegrationDataStoreClientConstants.BlobPropertyLength] = 1000L; + return properties; + }); + + sequenceIdGenerator.GetCurrentSequenceId().Returns(_ => 0L); + + ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob( + mediator, + contextAccessor, + fhirDataBulkImportOperation, + integrationDataStoreClient, + testQueueClient, + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 3 }), + loggerFactory); + orchestratorJob.PollingFrequencyInSeconds = 0; + 
var jobExecutionException = await Assert.ThrowsAnyAsync(() => orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); + + ImportOrchestratorJobErrorResult resultDetails = (ImportOrchestratorJobErrorResult)jobExecutionException.Error; + Assert.Equal(HttpStatusCode.BadRequest, resultDetails.HttpStatusCode); + Assert.Equal(1, testQueueClient.JobInfos.Count(t => t.Status == JobStatus.Failed)); + Assert.Equal(2, testQueueClient.JobInfos.Count(t => t.Status == JobStatus.Cancelled)); + + _ = mediator.Received().Publish( + Arg.Is( + notification => notification.Id == orchestratorJobInfo.Id.ToString() && + notification.Status == JobStatus.Failed.ToString() && + notification.CreatedTime == importOrchestratorInputData.CreateTime), + Arg.Any()); + } + + [Fact] + public async Task GivenAnOrchestratorJob_WhenSubJobFailedAndOthersRunning_ThenImportProcessingExceptionShouldBeThrowAndContextUpdated() + { + IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IIntegrationDataStoreClient integrationDataStoreClient = Substitute.For(); + ISequenceIdGenerator sequenceIdGenerator = Substitute.For>(); + IMediator mediator = Substitute.For(); + ImportOrchestratorJobInputData importOrchestratorInputData = new ImportOrchestratorJobInputData(); + TestQueueClient testQueueClient = new TestQueueClient(); + bool getJobByGroupIdCalledTime = false; + testQueueClient.GetJobByIdFunc = (queueClient, id, _) => + { + if (id > 10) + { + return new JobInfo() + { + Id = id, + Status = JobManagement.JobStatus.Failed, + Result = JsonConvert.SerializeObject(new ImportProcessingJobErrorResult() { Message = "error" }), + }; + } + + JobInfo jobInfo = testQueueClient.JobInfos.First(t => t.Id == id); + if (jobInfo.Status == JobStatus.Created) + { + jobInfo.Status = JobStatus.Running; + return jobInfo; + } + + return 
jobInfo; + }; + testQueueClient.GetJobByGroupIdFunc = (queueClient, groupId, _) => + { + IEnumerable jobInfos = queueClient.JobInfos.Where(t => t.GroupId == groupId); + if (!getJobByGroupIdCalledTime) + { + foreach (JobInfo jobInfo in jobInfos) + { + if (jobInfo.Status == JobStatus.Running) + { + jobInfo.Status = JobStatus.Completed; + } + } + } + + getJobByGroupIdCalledTime = true; + return jobInfos.ToList(); + }; + + importOrchestratorInputData.CreateTime = Clock.UtcNow; + importOrchestratorInputData.BaseUri = new Uri("http://dummy"); + + var inputs = new List(); + for (int i = 0; i < 100; i++) + { + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri($"http://dummy/{i}") }); + } + + importOrchestratorInputData.Input = inputs; + importOrchestratorInputData.InputFormat = "ndjson"; + importOrchestratorInputData.InputSource = new Uri("http://dummy"); + importOrchestratorInputData.RequestUri = new Uri("http://dummy"); + JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorInputData) }, 1, false, false, CancellationToken.None)).First(); + + integrationDataStoreClient.GetPropertiesAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Dictionary properties = new Dictionary(); + properties[IntegrationDataStoreClientConstants.BlobPropertyETag] = "test"; + properties[IntegrationDataStoreClientConstants.BlobPropertyLength] = 1000L; + return properties; + }); + + sequenceIdGenerator.GetCurrentSequenceId().Returns(_ => 0L); + ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob( + mediator, + contextAccessor, + fhirDataBulkImportOperation, + integrationDataStoreClient, + testQueueClient, + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 30 }), + loggerFactory); + orchestratorJob.PollingFrequencyInSeconds = 0; + + var jobExecutionException = await Assert.ThrowsAnyAsync(() => orchestratorJob.ExecuteAsync(orchestratorJobInfo, new 
Progress(), CancellationToken.None)); + ImportOrchestratorJobErrorResult resultDetails = (ImportOrchestratorJobErrorResult)jobExecutionException.Error; + + Assert.Equal(HttpStatusCode.BadRequest, resultDetails.HttpStatusCode); + + _ = mediator.Received().Publish( + Arg.Is( + notification => notification.Id == orchestratorJobInfo.Id.ToString() && + notification.Status == JobStatus.Failed.ToString() && + notification.CreatedTime == importOrchestratorInputData.CreateTime && + notification.SucceedCount == 0 && + notification.FailedCount == 0), + Arg.Any()); + } + + [Fact] + public async Task GivenAnOrchestratorJob_WhenSubJobCancelledAfterThreeCalls_ThenOperationCanceledExceptionShouldBeThrowAndContextUpdated() + { + IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IIntegrationDataStoreClient integrationDataStoreClient = Substitute.For(); + ISequenceIdGenerator sequenceIdGenerator = Substitute.For>(); + IMediator mediator = Substitute.For(); + ImportOrchestratorJobInputData importOrchestratorJobInputData = new ImportOrchestratorJobInputData(); + TestQueueClient testQueueClient = new TestQueueClient(); + int callTime = 0; + testQueueClient.GetJobByIdFunc = (queueClient, id, _) => + { + JobInfo jobInfo = queueClient.JobInfos.First(t => t.Id == id); + if (++callTime > 3) + { + jobInfo.Status = JobStatus.Cancelled; + jobInfo.Result = JsonConvert.SerializeObject(new ImportProcessingJobErrorResult() { Message = "Job Cancelled" }); + } + + return jobInfo; + }; + importOrchestratorJobInputData.CreateTime = Clock.UtcNow; + importOrchestratorJobInputData.BaseUri = new Uri("http://dummy"); + + var inputs = new List(); + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri($"http://dummy") }); + + importOrchestratorJobInputData.Input = inputs; + importOrchestratorJobInputData.InputFormat = "ndjson"; + 
importOrchestratorJobInputData.InputSource = new Uri("http://dummy"); + importOrchestratorJobInputData.RequestUri = new Uri("http://dummy"); + + integrationDataStoreClient.GetPropertiesAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Dictionary properties = new Dictionary(); + properties[IntegrationDataStoreClientConstants.BlobPropertyETag] = "test"; + properties[IntegrationDataStoreClientConstants.BlobPropertyLength] = 1000L; + return properties; + }); + + sequenceIdGenerator.GetCurrentSequenceId().Returns(_ => 0L); + JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorJobInputData) }, 1, false, false, CancellationToken.None)).First(); + ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob( + mediator, + contextAccessor, + fhirDataBulkImportOperation, + integrationDataStoreClient, + testQueueClient, + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), + loggerFactory); + orchestratorJob.PollingFrequencyInSeconds = 0; + + var jobExecutionException = await Assert.ThrowsAnyAsync(() => orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); + ImportOrchestratorJobErrorResult resultDetails = (ImportOrchestratorJobErrorResult)jobExecutionException.Error; + + Assert.Equal(HttpStatusCode.BadRequest, resultDetails.HttpStatusCode); + + _ = mediator.Received().Publish( + Arg.Is( + notification => notification.Id == orchestratorJobInfo.Id.ToString() && + notification.Status == JobStatus.Cancelled.ToString() && + notification.CreatedTime == importOrchestratorJobInputData.CreateTime && + notification.SucceedCount == 0 && + notification.FailedCount == 0), + Arg.Any()); + } + + [Fact] + public async Task GivenAnOrchestratorJob_WhenSubJobFailedAfterThreeCalls_ThenImportProcessingExceptionShouldBeThrowAndContextUpdated() + { + IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = 
Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IIntegrationDataStoreClient integrationDataStoreClient = Substitute.For(); + ISequenceIdGenerator sequenceIdGenerator = Substitute.For>(); + IMediator mediator = Substitute.For(); + ImportOrchestratorJobInputData importOrchestratorJobInputData = new ImportOrchestratorJobInputData(); + TestQueueClient testQueueClient = new TestQueueClient(); + int callTime = 0; + testQueueClient.GetJobByIdFunc = (queueClient, id, _) => + { + JobInfo jobInfo = queueClient.JobInfos.First(t => t.Id == id); + if (++callTime > 3) + { + jobInfo.Status = JobStatus.Failed; + jobInfo.Result = JsonConvert.SerializeObject(new ImportProcessingJobErrorResult() { Message = "error" }); + } + + return jobInfo; + }; + importOrchestratorJobInputData.CreateTime = Clock.UtcNow; + importOrchestratorJobInputData.BaseUri = new Uri("http://dummy"); + + var inputs = new List(); + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri($"http://dummy") }); + + importOrchestratorJobInputData.Input = inputs; + importOrchestratorJobInputData.InputFormat = "ndjson"; + importOrchestratorJobInputData.InputSource = new Uri("http://dummy"); + importOrchestratorJobInputData.RequestUri = new Uri("http://dummy"); + + integrationDataStoreClient.GetPropertiesAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Dictionary properties = new Dictionary(); + properties[IntegrationDataStoreClientConstants.BlobPropertyETag] = "test"; + properties[IntegrationDataStoreClientConstants.BlobPropertyLength] = 1000L; + return properties; + }); + + sequenceIdGenerator.GetCurrentSequenceId().Returns(_ => 0L); + JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorJobInputData) }, 1, false, false, CancellationToken.None)).First(); + ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob( + mediator, 
+ contextAccessor, + fhirDataBulkImportOperation, + integrationDataStoreClient, + testQueueClient, + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), + loggerFactory); + orchestratorJob.PollingFrequencyInSeconds = 0; + + var jobExecutionException = await Assert.ThrowsAnyAsync(() => orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); + ImportOrchestratorJobErrorResult resultDetails = (ImportOrchestratorJobErrorResult)jobExecutionException.Error; + + Assert.Equal(HttpStatusCode.BadRequest, resultDetails.HttpStatusCode); + Assert.Equal("error", resultDetails.ErrorMessage); + + _ = mediator.Received().Publish( + Arg.Is( + notification => notification.Id == orchestratorJobInfo.Id.ToString() && + notification.Status == JobStatus.Failed.ToString() && + notification.CreatedTime == importOrchestratorJobInputData.CreateTime && + notification.SucceedCount == 0 && + notification.FailedCount == 0), + Arg.Any()); + } + + [Fact] + public async Task GivenAnOrchestratorJob_WhenSubJobCancelled_ThenOperationCancelledExceptionShouldBeThrowAndContextUpdated() + { + IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IIntegrationDataStoreClient integrationDataStoreClient = Substitute.For(); + ISequenceIdGenerator sequenceIdGenerator = Substitute.For>(); + IMediator mediator = Substitute.For(); + ImportOrchestratorJobInputData importOrchestratorInputData = new ImportOrchestratorJobInputData(); + TestQueueClient testQueueClient = new TestQueueClient(); + testQueueClient.GetJobByIdFunc = (queueClient, id, _) => + { + JobInfo jobInfo = new JobInfo() + { + Status = JobManagement.JobStatus.Cancelled, + Result = JsonConvert.SerializeObject(new ImportProcessingJobErrorResult() { Message = "error" }), + }; + + return jobInfo; + }; + + 
importOrchestratorInputData.CreateTime = Clock.UtcNow; + importOrchestratorInputData.BaseUri = new Uri("http://dummy"); + + var inputs = new List(); + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri($"http://dummy") }); + + importOrchestratorInputData.Input = inputs; + importOrchestratorInputData.InputFormat = "ndjson"; + importOrchestratorInputData.InputSource = new Uri("http://dummy"); + importOrchestratorInputData.RequestUri = new Uri("http://dummy"); + + integrationDataStoreClient.GetPropertiesAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Dictionary properties = new Dictionary(); + properties[IntegrationDataStoreClientConstants.BlobPropertyETag] = "test"; + properties[IntegrationDataStoreClientConstants.BlobPropertyLength] = 1000L; + return properties; + }); + + sequenceIdGenerator.GetCurrentSequenceId().Returns(_ => 0L); + JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorInputData) }, 1, false, false, CancellationToken.None)).First(); + + ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob( + mediator, + contextAccessor, + fhirDataBulkImportOperation, + integrationDataStoreClient, + testQueueClient, + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), + loggerFactory); + orchestratorJob.PollingFrequencyInSeconds = 0; + + var jobExecutionException = await Assert.ThrowsAnyAsync(() => orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); + ImportOrchestratorJobErrorResult resultDetails = (ImportOrchestratorJobErrorResult)jobExecutionException.Error; + + Assert.Equal(HttpStatusCode.BadRequest, resultDetails.HttpStatusCode); + + _ = mediator.Received().Publish( + Arg.Is( + notification => notification.Id == orchestratorJobInfo.Id.ToString() && + notification.Status == JobStatus.Cancelled.ToString() && + notification.CreatedTime == 
importOrchestratorInputData.CreateTime && + notification.SucceedCount == 0 && + notification.FailedCount == 0), + Arg.Any()); + } + [Fact] public async Task GivenAnOrchestratorJob_WhenSubJobFailed_ThenImportProcessingExceptionShouldBeThrowAndContextUpdated() { @@ -440,7 +860,7 @@ public async Task GivenAnOrchestratorJob_WhenFailedAtPostProcessStep_ThenRetrabl fhirDataBulkImportOperation, integrationDataStoreClient, testQueueClient, - Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1}), + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), loggerFactory); orchestratorJob.PollingFrequencyInSeconds = 0; @@ -508,7 +928,7 @@ public async Task GivenAnOrchestratorJob_WhenCancelledBeforeCompleted_ThenProces fhirDataBulkImportOperation, integrationDataStoreClient, testQueueClient, - Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1}), + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = 1 }), loggerFactory); orchestratorJob.PollingFrequencyInSeconds = 0; @@ -519,6 +939,147 @@ public async Task GivenAnOrchestratorJob_WhenCancelledBeforeCompleted_ThenProces Assert.True(testQueueClient.JobInfos.All(t => t.Status != JobStatus.Cancelled && !t.CancelRequested)); } + private static async Task VerifyJobStatusChangedAsync(int inputFileCount, int concurrentCount, JobStatus jobStatus, int succeedCount, int failedCount, int resumeFrom = -1, int completedCount = 0) + { + IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IIntegrationDataStoreClient integrationDataStoreClient = Substitute.For(); + ISequenceIdGenerator sequenceIdGenerator = Substitute.For>(); + IMediator mediator = Substitute.For(); + ImportOrchestratorJobInputData importOrchestratorJobInputData = new 
ImportOrchestratorJobInputData(); + ImportOrchestratorJobResult importOrchestratorJobResult = new ImportOrchestratorJobResult(); + + TestQueueClient testQueueClient = new TestQueueClient(); + List<(long begin, long end)> surrogatedIdRanges = new List<(long begin, long end)>(); + testQueueClient.GetJobByIdFunc = (testQueueClient, id, _) => + { + JobInfo jobInfo = testQueueClient.JobInfos.First(t => t.Id == id); + + if (jobInfo == null) + { + return null; + } + + if (jobInfo.Status == JobManagement.JobStatus.Completed) + { + return jobInfo; + } + + if (jobInfo.Id > succeedCount + 1) + { + return new JobInfo() + { + Id = jobInfo.Id, + Status = jobStatus, + Result = JsonConvert.SerializeObject(new ImportProcessingJobErrorResult() { Message = "error" }), + }; + } + + ImportProcessingJobInputData processingInput = JsonConvert.DeserializeObject(jobInfo.Definition); + ImportProcessingJobResult processingResult = new ImportProcessingJobResult(); + processingResult.ResourceType = processingInput.ResourceType; + processingResult.SucceedCount = 1; + processingResult.FailedCount = 1; + processingResult.ErrorLogLocation = "http://dummy/error"; + surrogatedIdRanges.Add((processingInput.BeginSequenceId, processingInput.EndSequenceId)); + + jobInfo.Result = JsonConvert.SerializeObject(processingResult); + jobInfo.Status = JobManagement.JobStatus.Completed; + return jobInfo; + }; + + importOrchestratorJobInputData.CreateTime = Clock.UtcNow; + importOrchestratorJobInputData.BaseUri = new Uri("http://dummy"); + var inputs = new List(); + + bool resumeMode = resumeFrom >= 0; + for (int i = 0; i < inputFileCount; ++i) + { + string location = $"http://dummy/{i}"; + inputs.Add(new InputResource() { Type = "Resource", Url = new Uri(location) }); + + if (resumeMode) + { + if (i <= resumeFrom) + { + ImportProcessingJobInputData processingInput = new ImportProcessingJobInputData() + { + ResourceLocation = "http://test", + BeginSequenceId = i, + EndSequenceId = i + 1, + }; + + JobInfo jobInfo 
= (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(processingInput) }, 1, false, false, CancellationToken.None)).First(); + + ImportProcessingJobResult processingResult = new ImportProcessingJobResult(); + processingResult.ResourceType = "Resource"; + processingResult.SucceedCount = 1; + processingResult.FailedCount = 1; + processingResult.ErrorLogLocation = "http://dummy/error"; + processingResult.ResourceLocation = location; + + jobInfo.Result = JsonConvert.SerializeObject(processingResult); + if (i < completedCount) + { + jobInfo.Status = JobManagement.JobStatus.Completed; + importOrchestratorJobResult.SucceedImportCount += 1; + importOrchestratorJobResult.FailedImportCount += 1; + } + else + { + jobInfo.Status = JobManagement.JobStatus.Running; + importOrchestratorJobResult.RunningJobIds.Add(jobInfo.Id); + } + + importOrchestratorJobResult.CreatedJobCount += 1; + importOrchestratorJobResult.CurrentSequenceId += 1; + } + + importOrchestratorJobResult.Progress = ImportOrchestratorJobProgress.PreprocessCompleted; + } + } + + importOrchestratorJobInputData.Input = inputs; + importOrchestratorJobInputData.InputFormat = "ndjson"; + importOrchestratorJobInputData.InputSource = new Uri("http://dummy"); + importOrchestratorJobInputData.RequestUri = new Uri("http://dummy"); + JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorJobInputData) }, 1, false, false, CancellationToken.None)).First(); + + integrationDataStoreClient.GetPropertiesAsync(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Dictionary properties = new Dictionary(); + properties[IntegrationDataStoreClientConstants.BlobPropertyETag] = "test"; + properties[IntegrationDataStoreClientConstants.BlobPropertyLength] = 1000L; + return properties; + }); + + sequenceIdGenerator.GetCurrentSequenceId().Returns(_ => 0L); + + ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob( + mediator, + 
contextAccessor, + fhirDataBulkImportOperation, + integrationDataStoreClient, + testQueueClient, + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = concurrentCount }), + loggerFactory); + orchestratorJob.PollingFrequencyInSeconds = 0; + var jobExecutionException = await Assert.ThrowsAnyAsync(() => orchestratorJob.ExecuteAsync(orchestratorJobInfo, new Progress(), CancellationToken.None)); + ImportOrchestratorJobErrorResult resultDetails = (ImportOrchestratorJobErrorResult)jobExecutionException.Error; + + Assert.Equal(HttpStatusCode.BadRequest, resultDetails.HttpStatusCode); + _ = mediator.Received().Publish( + Arg.Is( + notification => notification.Id.Equals(orchestratorJobInfo.Id.ToString()) && + notification.Status == jobStatus.ToString() && + notification.CreatedTime == importOrchestratorJobInputData.CreateTime && + notification.SucceedCount == succeedCount && + notification.FailedCount == failedCount), + Arg.Any()); + } + private static async Task VerifyCommonOrchestratorJobAsync(int inputFileCount, int concurrentCount, int resumeFrom = -1, int completedCount = 0) { IImportOrchestratorJobDataStoreOperation fhirDataBulkImportOperation = Substitute.For(); @@ -636,7 +1197,7 @@ private static async Task VerifyCommonOrchestratorJobAsync(int inputFileCount, i fhirDataBulkImportOperation, integrationDataStoreClient, testQueueClient, - Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = concurrentCount}), + Options.Create(new Configs.ImportTaskConfiguration() { MaxRunningProcessingJobCount = concurrentCount }), loggerFactory) { PollingFrequencyInSeconds = 0, diff --git a/src/Microsoft.Health.TaskManagement.UnitTests/TestQueueClient.cs b/src/Microsoft.Health.TaskManagement.UnitTests/TestQueueClient.cs index 525321aa0f..783499c176 100644 --- a/src/Microsoft.Health.TaskManagement.UnitTests/TestQueueClient.cs +++ b/src/Microsoft.Health.TaskManagement.UnitTests/TestQueueClient.cs @@ -24,6 +24,8 @@ public 
class TestQueueClient : IQueueClient public Func GetJobByIdFunc { get; set; } + public Func> GetJobByGroupIdFunc { get; set; } + public List JobInfos { get { return jobInfos; } @@ -152,6 +154,11 @@ public Task> EnqueueAsync(byte queueType, string[] defini public Task> GetJobByGroupIdAsync(byte queueType, long groupId, bool returnDefinition, CancellationToken cancellationToken) { + if (GetJobByGroupIdFunc != null) + { + return Task.FromResult(GetJobByGroupIdFunc(this, groupId, cancellationToken)); + } + IReadOnlyList result = jobInfos.Where(t => t.GroupId == groupId).ToList(); return Task.FromResult(result); } @@ -175,7 +182,7 @@ public Task> GetJobsByIdsAsync(byte queueType, long[] job } IReadOnlyList result = jobInfos.Where(t => jobIds.Contains(t.Id)).ToList(); - return Task.FromResult>(result); + return Task.FromResult(result); } public bool IsInitialized() diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs index 9eb13f8ff6..038e75c48d 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs @@ -35,6 +35,7 @@ internal enum TestQueueType : byte GivenGroupJobs_WhenCompleteJob_ThenJobsShouldBeCompleted, GivenGroupJobs_WhenCancelJobsByGroupId_ThenAllJobsShouldBeCancelled, GivenGroupJobs_WhenCancelJobsById_ThenOnlySingleJobShouldBeCancelled, + GivenGroupJobs_WhenCancelJobsByGroupIdCalledTwice_ThenJobStatusShouldNotChange, GivenGroupJobs_WhenOneJobFailedAndRequestCancellation_ThenAllJobsShouldBeCancelled, ExecuteWithHeartbeat, ExecuteWithHeartbeatsHeavy, @@ -263,6 +264,42 @@ public async Task GivenGroupJobs_WhenCancelJobsByGroupId_ThenAllJobsShouldBeCanc Assert.Equal(jobInfo2.Result, jobInfo.Result); } + [Fact] + public async Task GivenGroupJobs_WhenCancelJobsByGroupIdCalledTwice_ThenJobStatusShouldNotChange() + 
{ + byte queueType = (byte)TestQueueType.GivenGroupJobs_WhenCancelJobsByGroupIdCalledTwice_ThenJobStatusShouldNotChange; + await _queueClient.EnqueueAsync(queueType, new string[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); + + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + IEnumerable jobs = await _queueClient.GetJobByGroupIdAsync(queueType, jobInfo1.GroupId, false, CancellationToken.None); + JobInfo jobInfo3 = jobs.First(t => t.Id != jobInfo1.Id && t.Id != jobInfo2.Id); + + await _queueClient.CancelJobByIdAsync(queueType, jobInfo1.Id, CancellationToken.None); + + jobInfo1.Status = JobStatus.Failed; + jobInfo1.Result = "job failed"; + await _queueClient.CompleteJobAsync(jobInfo1, false, CancellationToken.None); + + await _queueClient.CancelJobByGroupIdAsync(queueType, jobInfo2.GroupId, CancellationToken.None); + Assert.True((await _queueClient.GetJobByGroupIdAsync(queueType, jobInfo2.GroupId, false, CancellationToken.None)).All(t => t.Status == JobStatus.Cancelled || t.Status == JobStatus.Failed || (t.Status == JobStatus.Running && t.CancelRequested))); + jobInfo1 = await _queueClient.GetJobByIdAsync(queueType, jobInfo1.Id, false, CancellationToken.None); + jobInfo2 = await _queueClient.GetJobByIdAsync(queueType, jobInfo2.Id, false, CancellationToken.None); + jobInfo3 = await _queueClient.GetJobByIdAsync(queueType, jobInfo3.Id, false, CancellationToken.None); + Assert.Equal(JobStatus.Failed, jobInfo1.Status); + Assert.Equal(JobStatus.Running, jobInfo2.Status); + Assert.Equal(JobStatus.Cancelled, jobInfo3.Status); + + await _queueClient.CancelJobByGroupIdAsync(queueType, jobInfo2.GroupId, CancellationToken.None); + Assert.True((await _queueClient.GetJobByGroupIdAsync(queueType, jobInfo2.GroupId, false, CancellationToken.None)).All(t => t.Status == JobStatus.Cancelled || 
t.Status == JobStatus.Failed || (t.Status == JobStatus.Running && t.CancelRequested))); + jobInfo1 = await _queueClient.GetJobByIdAsync(queueType, jobInfo1.Id, false, CancellationToken.None); + jobInfo2 = await _queueClient.GetJobByIdAsync(queueType, jobInfo2.Id, false, CancellationToken.None); + jobInfo3 = await _queueClient.GetJobByIdAsync(queueType, jobInfo3.Id, false, CancellationToken.None); + Assert.Equal(JobStatus.Failed, jobInfo1.Status); + Assert.Equal(JobStatus.Running, jobInfo2.Status); + Assert.Equal(JobStatus.Cancelled, jobInfo3.Status); + } + [Fact] public async Task GivenGroupJobs_WhenCancelJobsById_ThenOnlySingleJobShouldBeCancelled() {