From c55f771ecb603d8a5b9808a0c52da4fca7a0e0e0 Mon Sep 17 00:00:00 2001 From: rajithaaluri Date: Thu, 15 Jun 2023 13:02:17 -0500 Subject: [PATCH 1/4] Changes of incremental import billing --- .../Import/ImportJobMetricsNotification.cs | 6 +- .../Import/ImportOrchestratorJobResult.cs | 5 + .../Import/ImportOrchestratorJob.cs | 4 +- .../Rest/Import/ImportTests.cs | 98 +++++++++++++++++++ 4 files changed, 111 insertions(+), 2 deletions(-) diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportJobMetricsNotification.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportJobMetricsNotification.cs index 5e3513fb11..55f5d6031a 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportJobMetricsNotification.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportJobMetricsNotification.cs @@ -18,7 +18,8 @@ public ImportJobMetricsNotification( DateTimeOffset endTime, long? dataSize, long? succeededCount, - long? failedCount) + long? failedCount, + ImportMode importMode) { FhirOperation = AuditEventSubType.Import; ResourceType = null; @@ -30,6 +31,7 @@ public ImportJobMetricsNotification( DataSize = dataSize; SucceededCount = succeededCount; FailedCount = failedCount; + ImportMode = importMode; } public string FhirOperation { get; } @@ -49,5 +51,7 @@ public ImportJobMetricsNotification( public long? SucceededCount { get; } public long? FailedCount { get; } + + public ImportMode ImportMode { get; } } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs index a2555635f0..8e9b1df2ee 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs @@ -54,5 +54,10 @@ public class ImportOrchestratorJobResult public int CreatedJobCount { get; set; } // TODO: remove in stage 3 public long? TotalSizeInBytes { get; set; } // TODO: remove in stage 3 + + /// + /// Import mode. + /// + public ImportMode ImportMode { get; set; } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs index 6c404f5f48..12875cb7cf 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs @@ -91,6 +91,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre _contextAccessor.RequestContext = fhirRequestContext; currentResult.Request = inputData.RequestUri.ToString(); + currentResult.ImportMode = inputData.ImportMode; ImportOrchestratorJobErrorResult errorResult = null; @@ -273,7 +274,8 @@ private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jo Clock.UtcNow, currentResult.TotalBytes, currentResult.SucceededResources, - currentResult.FailedResources); + currentResult.FailedResources, + currentResult.ImportMode); await _mediator.Publish(importJobMetricsNotification, CancellationToken.None); } diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs index 6c89f6e913..55b1c784e2 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs @@ -12,6 +12,7 @@ using System.Threading; using System.Threading.Tasks; using Hl7.Fhir.Model; +using MediatR; using Microsoft.Health.Fhir.Api.Features.Operations.Import; using Microsoft.Health.Fhir.Client; using Microsoft.Health.Fhir.Core.Features.Operations.Import; @@ -145,6 +146,103 @@ public async Task GivenIncrementalLoad_MultipleInputVersions_ResourceNotExisting Assert.Equal(GetLastUpdated("2002"), result.Resource.Meta.LastUpdated); } + [Fact] + public async Task GivenIncrementalImportInvalidResource_ThenErrorLogsShouldBeOutput_FailedRCountShouldMatch() + { + _metricHandler?.ResetCount(); + string patientNdJsonResource = Samples.GetNdJson("Import-InvalidPatient"); + patientNdJsonResource = Regex.Replace(patientNdJsonResource, "##PatientID##", m => Guid.NewGuid().ToString("N")); + (Uri location, string etag) = await ImportTestHelper.UploadFileAsync(patientNdJsonResource, _fixture.CloudStorageAccount); + + var request = new ImportRequest() + { + InputFormat = "application/fhir+ndjson", + InputSource = new Uri("https://other-server.example.org"), + StorageDetail = new ImportRequestStorageDetail() { Type = "azure-blob" }, + Input = new List() + { + new InputResource() + { + Url = location, + Etag = etag, + Type = "Patient", + }, + }, + Mode = ImportMode.IncrementalLoad.ToString(), + }; + + Uri checkLocation = await ImportTestHelper.CreateImportTaskAsync(_client, request); + + HttpResponseMessage response; + while ((response = await _client.CheckImportAsync(checkLocation, CancellationToken.None)).StatusCode == System.Net.HttpStatusCode.Accepted) + { + await Task.Delay(TimeSpan.FromSeconds(5)); + } + + Assert.Equal(System.Net.HttpStatusCode.OK, response.StatusCode); + ImportJobResult result = JsonConvert.DeserializeObject(await response.Content.ReadAsStringAsync()); + Assert.NotEmpty(result.Output); + Assert.Equal(1, result.Error.Count); + Assert.NotEmpty(result.Request); + + string errorLocation = result.Error.ToArray()[0].Url; + string[] errorContents = (await ImportTestHelper.DownloadFileAsync(errorLocation, _fixture.CloudStorageAccount)).Split("\r\n", StringSplitOptions.RemoveEmptyEntries); + Assert.True(errorContents.Count() >= 1); // when run locally there might be duplicates. no idea why. + + // Only check metric for local tests + if (_fixture.IsUsingInProcTestServer) + { + var resourceCount = Regex.Matches(patientNdJsonResource, "{\"resourceType\":").Count; + var notificationList = _metricHandler.NotificationMapping[typeof(ImportJobMetricsNotification)]; + Assert.Single(notificationList); + var notification = notificationList.First() as ImportJobMetricsNotification; + Assert.Equal(JobStatus.Completed.ToString(), notification.Status); + Assert.NotNull(notification.DataSize); + Assert.Equal(resourceCount, notification.SucceededCount); + Assert.Equal(1, notification.FailedCount); + Assert.Equal(ImportMode.IncrementalLoad, notification.ImportMode); + } + } + + [Fact] + [Trait(Traits.Category, Categories.Authorization)] + public async Task GivenAUserWithoutImportPermissions_WhenImportData_ThenServerShouldReturnForbidden_WithNoImportNotification() + { + TestFhirClient tempClient = _client.CreateClientForUser(TestUsers.ReadOnlyUser, TestApplications.NativeClient); + string patientNdJsonResource = Samples.GetNdJson("Import-Patient"); + (Uri location, string etag) = await ImportTestHelper.UploadFileAsync(patientNdJsonResource, _fixture.CloudStorageAccount); + + var request = new ImportRequest() + { + InputFormat = "application/fhir+ndjson", + InputSource = new Uri("https://other-server.example.org"), + StorageDetail = new ImportRequestStorageDetail() { Type = "azure-blob" }, + Input = new List() + { + new InputResource() + { + Url = location, + Type = "Patient", + }, + }, + Mode = ImportMode.IncrementalLoad.ToString(), + }; + + request.Mode = ImportMode.IncrementalLoad.ToString(); + request.Force = true; + FhirClientException fhirException = await Assert.ThrowsAsync(async () => await tempClient.ImportAsync(request.ToParameters(), CancellationToken.None)); + Assert.StartsWith(ForbiddenMessage, fhirException.Message); + Assert.Equal(HttpStatusCode.Forbidden, fhirException.StatusCode); + + // Only check metric for local tests + if (_fixture.IsUsingInProcTestServer) + { + List notificationList; + _metricHandler.NotificationMapping.TryGetValue(typeof(ImportJobMetricsNotification), out notificationList); + Assert.Null(notificationList); + } + } + [Fact] public async Task GivenIncrementalLoad_WhenOutOfOrder_ThenCurrentDatabaseVersionShouldRemain() { From e59b3624a1eb6f6aa690b689baee086512e4e122 Mon Sep 17 00:00:00 2001 From: rajithaaluri Date: Fri, 16 Jun 2023 12:29:30 -0500 Subject: [PATCH 2/4] code review comments addressed --- .../Import/ImportOrchestratorJobResult.cs | 9 --------- .../Import/ImportOrchestratorJob.cs | 19 +++++++++---------- .../Rest/Import/ImportTests.cs | 3 ++- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs index 8e9b1df2ee..4894770413 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorJobResult.cs @@ -50,14 +50,5 @@ public class ImportOrchestratorJobResult public long SucceedImportCount { get; set; } // TODO: remove in stage 3 public long FailedImportCount { get; set; } // TODO: remove in stage 3 - - public int CreatedJobCount { get; set; } // TODO: remove in stage 3 - - public long? TotalSizeInBytes { get; set; } // TODO: remove in stage 3 - - /// - /// Import mode. - /// - public ImportMode ImportMode { get; set; } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs index 12875cb7cf..905d745097 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs @@ -91,7 +91,6 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre _contextAccessor.RequestContext = fhirRequestContext; currentResult.Request = inputData.RequestUri.ToString(); - currentResult.ImportMode = inputData.ImportMode; ImportOrchestratorJobErrorResult errorResult = null; @@ -143,7 +142,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Processing jobs has been cancelled by CancelImportRequestHandler await WaitCancelledJobCompletedAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode); } catch (OperationCanceledException canceledEx) { @@ -157,7 +156,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Processing jobs has been cancelled by CancelImportRequestHandler await WaitCancelledJobCompletedAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode); } catch (IntegrationDataStoreException integrationDataStoreEx) { @@ -169,7 +168,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre ErrorMessage = integrationDataStoreEx.Message, }; - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } catch (ImportFileEtagNotMatchException eTagEx) { @@ -181,7 +180,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre ErrorMessage = eTagEx.Message, }; - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } catch (ImportProcessingException processingEx) { @@ -196,7 +195,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Cancel other processing jobs await CancelProcessingJobsAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } catch (RetriableJobException ex) { @@ -217,7 +216,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Cancel processing jobs for critical error in orchestrator job await CancelProcessingJobsAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } // Post-process operation cannot be cancelled. @@ -246,7 +245,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre throw new JobExecutionException(errorResult.ErrorMessage, errorResult); } - await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult); + await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult, inputData.ImportMode); return JsonConvert.SerializeObject(currentResult); } @@ -265,7 +264,7 @@ private async Task ValidateResourcesAsync(ImportOrchestratorJobDefinition inputD }); } - private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult) + private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode) { var importJobMetricsNotification = new ImportJobMetricsNotification( jobInfo.Id.ToString(), @@ -275,7 +274,7 @@ private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jo currentResult.TotalBytes, currentResult.SucceededResources, currentResult.FailedResources, - currentResult.ImportMode); + importMode); await _mediator.Publish(importJobMetricsNotification, CancellationToken.None); } diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs index 55b1c784e2..94b8a3869e 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs @@ -147,7 +147,7 @@ public async Task GivenIncrementalLoad_MultipleInputVersions_ResourceNotExisting } [Fact] - public async Task GivenIncrementalImportInvalidResource_ThenErrorLogsShouldBeOutput_FailedRCountShouldMatch() + public async Task GivenIncrementalImportInvalidResource__WhenImportData_ThenErrorLogsShouldBeOutputAndFailedCountShouldMatch() { _metricHandler?.ResetCount(); string patientNdJsonResource = Samples.GetNdJson("Import-InvalidPatient"); @@ -208,6 +208,7 @@ public async Task GivenIncrementalImportInvalidResource_ThenErrorLogsShouldBeOut [Trait(Traits.Category, Categories.Authorization)] public async Task GivenAUserWithoutImportPermissions_WhenImportData_ThenServerShouldReturnForbidden_WithNoImportNotification() { + _metricHandler?.ResetCount(); TestFhirClient tempClient = _client.CreateClientForUser(TestUsers.ReadOnlyUser, TestApplications.NativeClient); string patientNdJsonResource = Samples.GetNdJson("Import-Patient"); (Uri location, string etag) = await ImportTestHelper.UploadFileAsync(patientNdJsonResource, _fixture.CloudStorageAccount); From 9af61659defee358e435e83d4b9c1f2a66dfd582 Mon Sep 17 00:00:00 2001 From: rajithaaluri Date: Fri, 16 Jun 2023 14:23:38 -0500 Subject: [PATCH 3/4] removed JobStatus arg in SendImportMetricsNotification method --- .../Import/ImportOrchestratorJob.cs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs index 905d745097..28a458ffc7 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs @@ -142,7 +142,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Processing jobs has been cancelled by CancelImportRequestHandler await WaitCancelledJobCompletedAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Cancelled; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); } catch (OperationCanceledException canceledEx) { @@ -156,7 +157,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Processing jobs has been cancelled by CancelImportRequestHandler await WaitCancelledJobCompletedAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Cancelled; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); } catch (IntegrationDataStoreException integrationDataStoreEx) { @@ -167,8 +169,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre HttpStatusCode = integrationDataStoreEx.StatusCode, ErrorMessage = integrationDataStoreEx.Message, }; - - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Failed; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); } catch (ImportFileEtagNotMatchException eTagEx) { @@ -179,8 +181,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre HttpStatusCode = HttpStatusCode.BadRequest, ErrorMessage = eTagEx.Message, }; - - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Failed; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); } catch (ImportProcessingException processingEx) { @@ -195,7 +197,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Cancel other processing jobs await CancelProcessingJobsAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Failed; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); } catch (RetriableJobException ex) { @@ -216,7 +219,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Cancel processing jobs for critical error in orchestrator job await CancelProcessingJobsAsync(jobInfo); - await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Failed; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); } // Post-process operation cannot be cancelled. @@ -245,7 +249,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre throw new JobExecutionException(errorResult.ErrorMessage, errorResult); } - await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult, inputData.ImportMode); + jobInfo.Status = JobStatus.Completed; + await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); return JsonConvert.SerializeObject(currentResult); } @@ -264,11 +269,11 @@ private async Task ValidateResourcesAsync(ImportOrchestratorJobDefinition inputD }); } - private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode) + private async Task SendImportMetricsNotification(JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode) { var importJobMetricsNotification = new ImportJobMetricsNotification( jobInfo.Id.ToString(), - jobStatus.ToString(), + jobInfo.Status.ToString(), jobInfo.CreateDate, Clock.UtcNow, currentResult.TotalBytes, From e35297996d5d7498a5a44d419c51c366d96d1988 Mon Sep 17 00:00:00 2001 From: rajithaaluri Date: Mon, 19 Jun 2023 10:50:24 -0500 Subject: [PATCH 4/4] had to revert change to fix ImportOrchestratorJobTests --- .../Import/ImportOrchestratorJob.cs | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs index 28a458ffc7..905d745097 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/ImportOrchestratorJob.cs @@ -142,8 +142,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Processing jobs has been cancelled by CancelImportRequestHandler await WaitCancelledJobCompletedAsync(jobInfo); - jobInfo.Status = JobStatus.Cancelled; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode); } catch (OperationCanceledException canceledEx) { @@ -157,8 +156,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Processing jobs has been cancelled by CancelImportRequestHandler await WaitCancelledJobCompletedAsync(jobInfo); - jobInfo.Status = JobStatus.Cancelled; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode); } catch (IntegrationDataStoreException integrationDataStoreEx) { @@ -169,8 +167,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre HttpStatusCode = integrationDataStoreEx.StatusCode, ErrorMessage = integrationDataStoreEx.Message, }; - jobInfo.Status = JobStatus.Failed; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } catch (ImportFileEtagNotMatchException eTagEx) { @@ -181,8 +179,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre HttpStatusCode = HttpStatusCode.BadRequest, ErrorMessage = eTagEx.Message, }; - jobInfo.Status = JobStatus.Failed; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } catch (ImportProcessingException processingEx) { @@ -197,8 +195,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Cancel other processing jobs await CancelProcessingJobsAsync(jobInfo); - jobInfo.Status = JobStatus.Failed; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } catch (RetriableJobException ex) { @@ -219,8 +216,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre // Cancel processing jobs for critical error in orchestrator job await CancelProcessingJobsAsync(jobInfo); - jobInfo.Status = JobStatus.Failed; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode); } // Post-process operation cannot be cancelled. @@ -249,8 +245,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre throw new JobExecutionException(errorResult.ErrorMessage, errorResult); } - jobInfo.Status = JobStatus.Completed; - await SendImportMetricsNotification(jobInfo, currentResult, inputData.ImportMode); + await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult, inputData.ImportMode); return JsonConvert.SerializeObject(currentResult); } @@ -269,11 +264,11 @@ private async Task ValidateResourcesAsync(ImportOrchestratorJobDefinition inputD }); } - private async Task SendImportMetricsNotification(JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode) + private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode) { var importJobMetricsNotification = new ImportJobMetricsNotification( jobInfo.Id.ToString(), - jobInfo.Status.ToString(), + jobStatus.ToString(), jobInfo.CreateDate, Clock.UtcNow, currentResult.TotalBytes,