Skip to content

Commit

Permalink
Added Auth to Importer Tool (#3641)
Browse files Browse the repository at this point in the history
* Added token to importer tool via defaultazurecredential

* Simplified handler

* fixed typo

* updated importer docs

* .NET 8 updates

* remove unneeded message

* formatting

* formatting
  • Loading branch information
mikaelweave authored Dec 19, 2023
1 parent 1dae2e5 commit 6787f2b
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 17 deletions.
5 changes: 3 additions & 2 deletions Microsoft.Health.Fhir.sln
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
CustomAnalysisRules.Test.ruleset = CustomAnalysisRules.Test.ruleset
Directory.Build.props = Directory.Build.props
Directory.Packages.props = Directory.Packages.props
build\docker\Dockerfile = build\docker\Dockerfile
GitVersion.yml = GitVersion.yml
global.json = global.json
testauthenvironment.json = testauthenvironment.json
xunit.runner.json = xunit.runner.json
build\docker\Dockerfile = build\docker\Dockerfile
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{B5F2D2DF-D0C7-4861-8259-F6A041DB9854}"
Expand Down Expand Up @@ -547,6 +547,7 @@ Global
{5E456FD7-E5A5-41F4-A0D3-7215585AEB7A} = {FCD51BAF-BFE5-476F-B562-C9AB36AA9839}
{A5DED132-32B1-4804-95F5-EBC6092EC8AE} = {85F39C13-BC62-49A0-9C06-3BBA724D35ED}
{D6C90E8C-50AF-45D8-B2D3-1B9B07E65F3E} = {1295CCC3-73FB-4376-AE95-F6F31A37B152}
{F4DE2945-80C5-48FE-B58A-4AD1264C9FEA} = {B70945F4-01A6-4351-955B-C4A2943B5E3B}
{73BC4365-2BFB-41C7-B0B1-610059F967AC} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{C834E05D-79CA-4983-8599-28AC098F755A} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
Expand All @@ -558,8 +559,8 @@ Global
{76C29222-8D35-43A2-89C5-43114D113C10} = {B70945F4-01A6-4351-955B-C4A2943B5E3B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
RESX_SortFileContentOnSave = True
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
RESX_SortFileContentOnSave = True
EndGlobalSection
GlobalSection(SharedMSBuildProjectFiles) = preSolution
test\Microsoft.Health.Fhir.Shared.Tests.E2E.Common\Microsoft.Health.Fhir.Shared.Tests.E2E.Common.projitems*{0478b687-7105-40f6-a2dc-81057890e944}*SharedItemsImports = 13
Expand Down
6 changes: 5 additions & 1 deletion tools/Importer/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<add key="ConnectionString" value="Connection string to Azure blob storage" />
<add key="ContainerName" value="Container name which contains files in ndjson format" />
<!-- FHIR endpoints. To add secondary endpoints use ";" separator. -->
<add key="FhirEndpoints" value="https:/(your fhir app service).azurewebsites.net;" />
<add key="FhirEndpoints" value="https://(your fhir app service).azurewebsites.net;" />
<!-- Code is processing blobs in ranges.
If blobs are grouped in subdirectories with close number of resources per subdirectory, set this value to number of blobs in subdirectory. -->
<add key="BlobRangeSize" value="19" />
Expand All @@ -44,5 +44,9 @@
<!-- UseStringJsonParserCompare allows to compare results of lightweight string parser with convert to JObject.
It takes effect only if string parser is enabled. Use true only for test purposes. -->
<add key="UseStringJsonParserCompare" value="false" />
<!-- UseFhirAuth causes requests to be sent with a bearer token generated by DefaultAzureCredential. -->
<add key="UseFhirAuth" value="false" />
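<!-- Optional scope override for custom auth. Leave empty to use the default "(endpoint)/.default" scope, which works with Azure FHIR managed services. To supply scopes use the ";" separator and provide exactly one scope per FHIR endpoint. Only used when UseFhirAuth is true. -->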
<add key="FhirScopes" value="" />
</appSettings>
</configuration>
156 changes: 156 additions & 0 deletions tools/Importer/BearerTokenHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Identity;

namespace Microsoft.Health.Fhir.Importer;

#pragma warning disable SA1010 // Opening square brackets should be spaced correctly. Fixed https://github.com/DotNetAnalyzers/StyleCopAnalyzers/pull/3745 but not available yet.

/// <summary>
/// A <see cref="DelegatingHandler"/> that adds a bearer token to outgoing requests whose authority
/// (scheme, host and port) matches one of the configured base addresses.
/// </summary>
/// <remarks>
/// One token cache is kept per base-address authority. Requests that already carry an Authorization header
/// are forwarded untouched, and requests to an authority that was not configured are forwarded without a token.
/// </remarks>
public class BearerTokenHandler : DelegatingHandler
{
    // Keyed by Uri.GetLeftPart(UriPartial.Authority) of each configured base address.
    private readonly Dictionary<string, AccessTokenCache> _accessTokenCaches = [];

    /// <summary>
    /// Creates a handler that refreshes tokens 5 minutes before they expire and waits 30 seconds
    /// before retrying a failed token acquisition.
    /// </summary>
    /// <param name="tokenCredential">Credential used to acquire access tokens.</param>
    /// <param name="baseAddresses">Base addresses of the endpoints that require a bearer token.</param>
    /// <param name="scopes">
    /// One scope per base address (same order), or an empty array to use <c>{authority}/.default</c> for each base address.
    /// </param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException">
    /// The number of scopes does not match the number of base addresses, or two base addresses share the same authority.
    /// </exception>
    public BearerTokenHandler(TokenCredential tokenCredential, Uri[] baseAddresses, string[] scopes)
        : this(tokenCredential, baseAddresses, scopes, TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(30))
    {
    }

    /// <summary>
    /// Creates a handler with explicit refresh timing.
    /// </summary>
    /// <param name="tokenCredential">Credential used to acquire access tokens.</param>
    /// <param name="baseAddresses">Base addresses of the endpoints that require a bearer token.</param>
    /// <param name="scopes">
    /// One scope per base address (same order), or an empty array to use <c>{authority}/.default</c> for each base address.
    /// </param>
    /// <param name="tokenRefreshOffset">How long before its expiration a cached token is considered stale.</param>
    /// <param name="tokenRefreshRetryDelay">Delay before the single retry that follows a failed token acquisition.</param>
    /// <exception cref="ArgumentNullException">Any reference argument is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException">
    /// The number of scopes does not match the number of base addresses, or two base addresses share the same authority.
    /// </exception>
    internal BearerTokenHandler(
        TokenCredential tokenCredential,
        Uri[] baseAddresses,
        string[] scopes,
        TimeSpan tokenRefreshOffset,
        TimeSpan tokenRefreshRetryDelay)
    {
        // Fail fast with a descriptive exception instead of a NullReferenceException here or on the first request.
        ArgumentNullException.ThrowIfNull(tokenCredential);
        ArgumentNullException.ThrowIfNull(baseAddresses);
        ArgumentNullException.ThrowIfNull(scopes);

        if (scopes.Length == 0)
        {
            // No explicit scopes: derive the default scope from each endpoint's authority.
            scopes = baseAddresses.Select(ba => $"{ba.GetLeftPart(UriPartial.Authority)}/.default").ToArray();
        }

        if (scopes.Length != baseAddresses.Length)
        {
            throw new ArgumentException("The number of scopes must match the number of base addresses.", nameof(scopes));
        }

        for (int i = 0; i < baseAddresses.Length; i++)
        {
            string authority = baseAddresses[i].GetLeftPart(UriPartial.Authority);
            var cache = new AccessTokenCache(tokenCredential, scopes[i], tokenRefreshOffset, tokenRefreshRetryDelay);

            // A second cache for the same authority could never be selected by SendAsync, so reject it
            // with a message that names the offending value.
            if (!_accessTokenCaches.TryAdd(authority, cache))
            {
                cache.Dispose();
                throw new ArgumentException($"More than one base address was supplied for '{authority}'.", nameof(baseAddresses));
            }
        }

        InnerHandler = new HttpClientHandler();
    }

    /// <summary>
    /// Adds a bearer token to the request when its authority is configured, then forwards it to the inner handler.
    /// </summary>
    /// <param name="request">The outgoing request.</param>
    /// <param name="cancellationToken">Token used to cancel token acquisition and the send.</param>
    /// <returns>The response produced by the inner handler.</returns>
    /// <exception cref="InvalidOperationException">
    /// The request has no Authorization header, is not https, and does not target <c>localhost</c>.
    /// </exception>
    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // Only add header for requests that don't already have one.
        if (request is null || request.Headers is null || request.Headers.Authorization is not null)
        {
            return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
        }

        Uri requestUri = request.RequestUri;

        if (requestUri.Scheme != Uri.UriSchemeHttps && requestUri.Host != "localhost")
        {
            throw new InvalidOperationException("Bearer token authentication is not permitted for non TLS protected (https) endpoints.");
        }

        if (_accessTokenCaches.TryGetValue(requestUri.GetLeftPart(UriPartial.Authority), out AccessTokenCache tokenCache))
        {
            AccessToken token = await tokenCache.GetTokenAsync(cancellationToken).ConfigureAwait(false);
            request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token.Token);
        }

        // Send the request.
        return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Disposes the per-authority token caches along with the handler.
    /// </summary>
    /// <param name="disposing"><see langword="true"/> when called from <c>Dispose()</c>.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            foreach (AccessTokenCache cache in _accessTokenCaches.Values)
            {
                cache.Dispose();
            }
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Caches the access token for a single scope and refreshes it once it is within the refresh offset of expiring.
    /// </summary>
    private sealed class AccessTokenCache(
        TokenCredential tokenCredential,
        string scope,
        TimeSpan tokenRefreshOffset,
        TimeSpan tokenRefreshRetryDelay) : IDisposable
    {
        // Serializes callers so that only one token acquisition runs at a time.
        private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
        private readonly TokenCredential _tokenCredential = tokenCredential;
        private readonly string _scope = scope;
        private readonly TimeSpan _tokenRefreshOffset = tokenRefreshOffset;
        private readonly TimeSpan _tokenRefreshRetryDelay = tokenRefreshRetryDelay;
        private AccessToken? _accessToken;
        private bool _disposed;

        /// <summary>
        /// Returns the cached token, acquiring a new one first when none is cached or the cached one is about to expire.
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel waiting for the lock, the retry delay and the acquisition.</param>
        /// <returns>A token that was valid beyond the refresh offset when it was last checked.</returns>
        /// <exception cref="ObjectDisposedException">The cache has been disposed.</exception>
        public async Task<AccessToken> GetTokenAsync(CancellationToken cancellationToken)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            await _semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                if (_accessToken is null || _accessToken.Value.ExpiresOn <= DateTimeOffset.UtcNow + _tokenRefreshOffset)
                {
                    _accessToken = await AcquireTokenAsync(cancellationToken).ConfigureAwait(false);
                }

                return _accessToken.Value;
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        }

        /// <summary>
        /// Releases the semaphore. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _semaphoreSlim.Dispose();
            _disposed = true;
        }

        /// <summary>
        /// Requests a token from the credential, retrying exactly once after the configured delay
        /// when the first attempt fails with <see cref="AuthenticationFailedException"/>.
        /// </summary>
        private async Task<AccessToken> AcquireTokenAsync(CancellationToken cancellationToken)
        {
            var context = new TokenRequestContext([_scope]);

            try
            {
                return await _tokenCredential.GetTokenAsync(context, cancellationToken).ConfigureAwait(false);
            }
            catch (AuthenticationFailedException)
            {
                // If the token acquisition fails, retry after the delay. A second failure propagates to the caller.
                await Task.Delay(_tokenRefreshRetryDelay, cancellationToken).ConfigureAwait(false);
                return await _tokenCredential.GetTokenAsync(context, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
32 changes: 29 additions & 3 deletions tools/Importer/Importer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using System.Net.Http;
using System.Text;
using System.Threading;
using Azure.Core;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Health.Fhir.Store.Utils;
Expand All @@ -22,7 +24,6 @@ namespace Microsoft.Health.Fhir.Importer
{
internal static class Importer
{
private static readonly HttpClient HttpClient = new HttpClient();
private static readonly string Endpoints = ConfigurationManager.AppSettings["FhirEndpoints"];
private static readonly string ConnectionString = ConfigurationManager.AppSettings["ConnectionString"];
private static readonly string ContainerName = ConfigurationManager.AppSettings["ContainerName"];
Expand All @@ -35,6 +36,8 @@ internal static class Importer
private static readonly int MaxRetries = int.Parse(ConfigurationManager.AppSettings["MaxRetries"]);
private static readonly bool UseStringJsonParser = bool.Parse(ConfigurationManager.AppSettings["UseStringJsonParser"]);
private static readonly bool UseStringJsonParserCompare = bool.Parse(ConfigurationManager.AppSettings["UseStringJsonParserCompare"]);
private static readonly bool UseFhirAuth = bool.Parse(ConfigurationManager.AppSettings["UseFhirAuth"]);
private static readonly string FhirScopes = ConfigurationManager.AppSettings["FhirScopes"];

private static long totalReads = 0L;
private static long readers = 0L;
Expand All @@ -44,6 +47,11 @@ internal static class Importer
private static long epCalls = 0L;
private static long waits = 0L;
private static List<string> endpoints;
private static TokenCredential credential;
private static HttpClient httpClient = new();
private static DelegatingHandler handler;

#pragma warning disable SA1010 // Opening square brackets should be spaced correctly. Fixed https://github.com/DotNetAnalyzers/StyleCopAnalyzers/pull/3745 but not available yet.

internal static void Run()
{
Expand All @@ -52,7 +60,25 @@ internal static void Run()
throw new ArgumentException("FhirEndpoints value is empty");
}

endpoints = Endpoints.Split(";", StringSplitOptions.RemoveEmptyEntries).ToList();
endpoints = [.. Endpoints.Split(";", StringSplitOptions.RemoveEmptyEntries)];

if (UseFhirAuth)
{
List<string> scopes = [.. FhirScopes.Split(";", StringSplitOptions.RemoveEmptyEntries)];
if (scopes.Count == 0)
{
scopes = endpoints.Select(x => $"{x}/.default").ToList();
}

if (scopes.Count != endpoints.Count)
{
throw new ArgumentException("FhirScopes and FhirEndpoints must have the same number of values.");
}

credential = new DefaultAzureCredential();
handler = new BearerTokenHandler(credential, endpoints.Select(x => new Uri(x)).ToArray(), [.. scopes]);
httpClient = new HttpClient(handler);
}

var globalPrefix = $"RequestedBlobRange=[{NumberOfBlobsToSkip + 1}-{MaxBlobIndexForImport}]";
Console.WriteLine($"{globalPrefix}: Starting...");
Expand Down Expand Up @@ -199,7 +225,7 @@ private static void PutResource(string jsonString, IndexIncrementor incrementor)
try
{
Thread.Sleep(40);
var response = HttpClient.PutAsync(uri, content).Result;
var response = httpClient.PutAsync(uri, content).Result;
switch (response.StatusCode)
{
case HttpStatusCode.OK:
Expand Down
3 changes: 2 additions & 1 deletion tools/Importer/Importer.csproj
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Newtonsoft.Json" />
<PackageReference Include="System.Configuration.ConfigurationManager" />
Expand Down
15 changes: 5 additions & 10 deletions tools/Importer/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The Importer is a .net console application that loads data to the FHIR server at

Note that the Importer works for service APIs with either a Cosmos DB or a SQL database backend. However, high throughput rates, as noted in the app.config file, can currently be obtained only with the Cosmos DB backend with scaled up/out resources. Similar throughput rates cannot be obtained with the SQL database backend, regardless of scaling, because resources in bundles are processed one by one through the API, which does not deliver the maximum performance with SQL database.

For SQL database backend, you can use the [bulk import](../../docs/BulkImport.md) ($import) feature for initial data loading.
For SQL database backend, you should use the [bulk import](../../docs/BulkImport.md) ($import) feature for initial data loading.

For more info on other data loading tools and options, check the [documentation](https://docs.microsoft.com/azure/healthcare-apis/fhir/bulk-importing-fhir-data).

Expand All @@ -19,18 +19,13 @@ The primary goal of the Importer is to help load data to the FHIR server as quic
- Define batches for read threads (e.g. 19 files in a batch) and write threads (e.g. 100 lines in a batch from an ndjson file).
- Report reads and writes data for a defined period time, e.g. 30 seconds.
- Retry if an error is encountered, e.g. 10 times.
- Auth using [DefaultAzureCredential](https://learn.microsoft.com/en-us/dotnet/api/overview/azure/identity-readme?view=azure-dotnet), which provides auth for managed identities, client credentials (via environment variables), Azure CLI, Azure PowerShell, and Visual Studio credentials.

## Scale for performance

The data processing speeds depend on resources of Azure App Service and Cosmos DB RU/s. You can scale up and/or scale out the App service in the app service plan.

## Known Limits
## Configuring Auth

The Importer is kept lightweight and works with the open-source FHIR server with security disabled. However, you can still use the tool when Azure AD based security is enabled. All you need to do is to complete the following steps.

- Complete application registration.
- Grant the client application with access permissions to the FHIR server.
- Add code to get an Azure Active Directory access token.
- Add the access token in your Put request.

For more details on the steps mentioned above, check [documentation here](https://docs.microsoft.com/azure/healthcare-apis/register-application).
1. Change the `UseFhirAuth` configuration setting to `true`. If your endpoint is a PaaS service that doesn't have a custom audience, you do not need to set the `FhirScopes` setting. For other scenarios, you must set `FhirScopes` to the scope your endpoint requires. Examples include: `https://servicename.azurehealthcareapis.com/.default` and `https://myappservice.azurewebsites.net/user_impersonation`.
1. Choose the principal you will use to load data and ensure it has access to the FHIR endpoint. For example, when loading data to PaaS from a Virtual Machine in Azure, you need to enable Managed Identity on the VM and assign the `FHIR Data Contributor` and `FHIR Importer` roles to the Managed Identity.

0 comments on commit 6787f2b

Please sign in to comment.