Skip to content

Commit

Permalink
Changes of incremental import billing (#3354)
Browse files Browse the repository at this point in the history
* Changes of incremental import billing

* code review comments addressed

* removed JobStatus arg in SendImportMetricsNotification method

* had to revert change to fix ImportOrchestratorJobTests
  • Loading branch information
rajithaalurims authored Jun 19, 2023
1 parent 38f901e commit 7d60b61
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public ImportJobMetricsNotification(
DateTimeOffset endTime,
long? dataSize,
long? succeededCount,
long? failedCount)
long? failedCount,
ImportMode importMode)
{
FhirOperation = AuditEventSubType.Import;
ResourceType = null;
Expand All @@ -30,6 +31,7 @@ public ImportJobMetricsNotification(
DataSize = dataSize;
SucceededCount = succeededCount;
FailedCount = failedCount;
ImportMode = importMode;
}

public string FhirOperation { get; }
Expand All @@ -49,5 +51,7 @@ public ImportJobMetricsNotification(
public long? SucceededCount { get; }

public long? FailedCount { get; }

public ImportMode ImportMode { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,5 @@ public class ImportOrchestratorJobResult
public long SucceedImportCount { get; set; } // TODO: remove in stage 3

public long FailedImportCount { get; set; } // TODO: remove in stage 3

public int CreatedJobCount { get; set; } // TODO: remove in stage 3

public long? TotalSizeInBytes { get; set; } // TODO: remove in stage 3
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre

// Processing jobs has been cancelled by CancelImportRequestHandler
await WaitCancelledJobCompletedAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode);
}
catch (OperationCanceledException canceledEx)
{
Expand All @@ -157,7 +157,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre

// Processing jobs has been cancelled by CancelImportRequestHandler
await WaitCancelledJobCompletedAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode);
}
catch (IntegrationDataStoreException integrationDataStoreEx)
{
Expand All @@ -169,7 +169,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre
ErrorMessage = integrationDataStoreEx.Message,
};

await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode);
}
catch (ImportFileEtagNotMatchException eTagEx)
{
Expand All @@ -181,7 +181,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre
ErrorMessage = eTagEx.Message,
};

await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode);
}
catch (ImportProcessingException processingEx)
{
Expand All @@ -196,7 +196,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre

// Cancel other processing jobs
await CancelProcessingJobsAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode);
}
catch (RetriableJobException ex)
{
Expand All @@ -217,7 +217,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre

// Cancel processing jobs for critical error in orchestrator job
await CancelProcessingJobsAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode);
}

// Post-process operation cannot be cancelled.
Expand Down Expand Up @@ -246,7 +246,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre
throw new JobExecutionException(errorResult.ErrorMessage, errorResult);
}

await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult);
await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult, inputData.ImportMode);
return JsonConvert.SerializeObject(currentResult);
}

Expand All @@ -265,7 +265,7 @@ private async Task ValidateResourcesAsync(ImportOrchestratorJobDefinition inputD
});
}

private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult)
private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode)
{
var importJobMetricsNotification = new ImportJobMetricsNotification(
jobInfo.Id.ToString(),
Expand All @@ -274,7 +274,8 @@ private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jo
Clock.UtcNow,
currentResult.TotalBytes,
currentResult.SucceededResources,
currentResult.FailedResources);
currentResult.FailedResources,
importMode);

await _mediator.Publish(importJobMetricsNotification, CancellationToken.None);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using System.Threading.Tasks;
using Hl7.Fhir.Model;
using MediatR;
using Microsoft.Health.Fhir.Api.Features.Operations.Import;
using Microsoft.Health.Fhir.Client;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
Expand Down Expand Up @@ -145,6 +146,104 @@ public async Task GivenIncrementalLoad_MultipleInputVersions_ResourceNotExisting
Assert.Equal(GetLastUpdated("2002"), result.Resource.Meta.LastUpdated);
}

[Fact]
public async Task GivenIncrementalImportInvalidResource__WhenImportData_ThenErrorLogsShouldBeOutputAndFailedCountShouldMatch()
{
_metricHandler?.ResetCount();
string patientNdJsonResource = Samples.GetNdJson("Import-InvalidPatient");
patientNdJsonResource = Regex.Replace(patientNdJsonResource, "##PatientID##", m => Guid.NewGuid().ToString("N"));
(Uri location, string etag) = await ImportTestHelper.UploadFileAsync(patientNdJsonResource, _fixture.CloudStorageAccount);

var request = new ImportRequest()
{
InputFormat = "application/fhir+ndjson",
InputSource = new Uri("https://other-server.example.org"),
StorageDetail = new ImportRequestStorageDetail() { Type = "azure-blob" },
Input = new List<InputResource>()
{
new InputResource()
{
Url = location,
Etag = etag,
Type = "Patient",
},
},
Mode = ImportMode.IncrementalLoad.ToString(),
};

Uri checkLocation = await ImportTestHelper.CreateImportTaskAsync(_client, request);

HttpResponseMessage response;
while ((response = await _client.CheckImportAsync(checkLocation, CancellationToken.None)).StatusCode == System.Net.HttpStatusCode.Accepted)
{
await Task.Delay(TimeSpan.FromSeconds(5));
}

Assert.Equal(System.Net.HttpStatusCode.OK, response.StatusCode);
ImportJobResult result = JsonConvert.DeserializeObject<ImportJobResult>(await response.Content.ReadAsStringAsync());
Assert.NotEmpty(result.Output);
Assert.Equal(1, result.Error.Count);
Assert.NotEmpty(result.Request);

string errorLocation = result.Error.ToArray()[0].Url;
string[] errorContents = (await ImportTestHelper.DownloadFileAsync(errorLocation, _fixture.CloudStorageAccount)).Split("\r\n", StringSplitOptions.RemoveEmptyEntries);
Assert.True(errorContents.Count() >= 1); // when run locally there might be duplicates. no idea why.

// Only check metric for local tests
if (_fixture.IsUsingInProcTestServer)
{
var resourceCount = Regex.Matches(patientNdJsonResource, "{\"resourceType\":").Count;
var notificationList = _metricHandler.NotificationMapping[typeof(ImportJobMetricsNotification)];
Assert.Single(notificationList);
var notification = notificationList.First() as ImportJobMetricsNotification;
Assert.Equal(JobStatus.Completed.ToString(), notification.Status);
Assert.NotNull(notification.DataSize);
Assert.Equal(resourceCount, notification.SucceededCount);
Assert.Equal(1, notification.FailedCount);
Assert.Equal(ImportMode.IncrementalLoad, notification.ImportMode);
}
}

[Fact]
[Trait(Traits.Category, Categories.Authorization)]
public async Task GivenAUserWithoutImportPermissions_WhenImportData_ThenServerShouldReturnForbidden_WithNoImportNotification()
{
_metricHandler?.ResetCount();
TestFhirClient tempClient = _client.CreateClientForUser(TestUsers.ReadOnlyUser, TestApplications.NativeClient);
string patientNdJsonResource = Samples.GetNdJson("Import-Patient");
(Uri location, string etag) = await ImportTestHelper.UploadFileAsync(patientNdJsonResource, _fixture.CloudStorageAccount);

var request = new ImportRequest()
{
InputFormat = "application/fhir+ndjson",
InputSource = new Uri("https://other-server.example.org"),
StorageDetail = new ImportRequestStorageDetail() { Type = "azure-blob" },
Input = new List<InputResource>()
{
new InputResource()
{
Url = location,
Type = "Patient",
},
},
Mode = ImportMode.IncrementalLoad.ToString(),
};

request.Mode = ImportMode.IncrementalLoad.ToString();
request.Force = true;
FhirClientException fhirException = await Assert.ThrowsAsync<FhirClientException>(async () => await tempClient.ImportAsync(request.ToParameters(), CancellationToken.None));
Assert.StartsWith(ForbiddenMessage, fhirException.Message);
Assert.Equal(HttpStatusCode.Forbidden, fhirException.StatusCode);

// Only check metric for local tests
if (_fixture.IsUsingInProcTestServer)
{
List<INotification> notificationList;
_metricHandler.NotificationMapping.TryGetValue(typeof(ImportJobMetricsNotification), out notificationList);
Assert.Null(notificationList);
}
}

[Fact]
public async Task GivenIncrementalLoad_WhenOutOfOrder_ThenCurrentDatabaseVersionShouldRemain()
{
Expand Down

0 comments on commit 7d60b61

Please sign in to comment.