Skip to content

Commit

Permalink
Add support for alternate client (#193)
Browse files Browse the repository at this point in the history
* Add support for alternate client

* Refactor test case

* Fix replacement logic in case other parts of url contain "api"
  • Loading branch information
boma96 authored Aug 14, 2024
1 parent 2f789bf commit 32c5cdb
Show file tree
Hide file tree
Showing 20 changed files with 236 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@ public class WorkflowController : ControllerBase
private readonly IMetadataService _metadataService;
private readonly IWorkflowService _workflowService;
private readonly ITaskService _taskService;
private readonly IMetadataService _alternateMetadataService;
private const string NotificationWorfklowName = "NOTIFICATION_send_to_customer";

public WorkflowController(IMetadataService metadataService, IWorkflowService workflowService, ITaskService taskService)
public WorkflowController(
IMetadataService metadataService,
IWorkflowService workflowService,
ITaskService taskService,
[FromKeyedServices("Alternate")] IMetadataService alternateMetadataService
)
{
_metadataService = metadataService;
_workflowService = workflowService;
_taskService = taskService;
_alternateMetadataService = alternateMetadataService;
}

[HttpGet("get-workflows")]
Expand All @@ -45,4 +52,12 @@ await _workflowService.StartAsync(
Input = new Dictionary<string, object> { { "task_to_execute", "CUSTOMER_get" }, { "customer_id", request.CustomerId } }
}
);

[HttpGet("get-workflows/{discriminator}")]
public async Task<ICollection<WorkflowDef>> GetRegisteredWorkflows(string discriminator) =>
discriminator switch
{
"alternate" => await _alternateMetadataService.ListWorkflowsAsync(),
_ => await _metadataService.ListWorkflowsAsync()
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static IServiceCollection ConfigureApiEnabled(this IServiceCollection hos
{
hostBuilder
.AddConductorSharp(baseUrl: configuration.GetValue<string>("Conductor:BaseUrl"))
.AddAlternateClient(baseUrl: configuration.GetValue<string>("Conductor:AlternateUrl"), "Alternate", "api/workflow", true)
.AddExecutionManager(
maxConcurrentWorkers: configuration.GetValue<int>("Conductor:MaxConcurrentWorkers"),
sleepInterval: configuration.GetValue<int>("Conductor:SleepInterval"),
Expand Down
1 change: 1 addition & 0 deletions examples/ConductorSharp.ApiEnabled/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"Conductor": {
"BaseUrl": "http://localhost:8080",
"AlternateUrl": "https://fm-dev.10.7.6.124.nip.io",
"LongPollInterval": 100,
"MaxConcurrentWorkers": 10,
"SleepInterval": 500
Expand Down
3 changes: 2 additions & 1 deletion src/ConductorSharp.Client/ConductorSharp.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<PackageId>ConductorSharp.Client</PackageId>
<Version>3.0.1-beta8</Version>
<Version>3.0.1-beta9</Version>
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
<PackageTags>netflix;conductor</PackageTags>
Expand All @@ -17,6 +17,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
Expand Down
14 changes: 8 additions & 6 deletions src/ConductorSharp.Client/Service/AdminService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@

namespace ConductorSharp.Client.Service
{
public class AdminService(ConductorClient client) : IAdminService
public class AdminService(IHttpClientFactory httpClientFactory, string clientName) : IAdminService
{
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<string> QueueRunningWorkflowsForSweepAsync(string workflowId, CancellationToken cancellationToken = default) =>
await client.RequeueSweepAsync(workflowId, cancellationToken);
await _client.RequeueSweepAsync(workflowId, cancellationToken);

public async Task<string> VerifyAndRepairWorkflowConsistencyAsync(string workflowId, CancellationToken cancellationToken = default) =>
await client.VerifyAndRepairWorkflowConsistencyAsync(workflowId, cancellationToken);
await _client.VerifyAndRepairWorkflowConsistencyAsync(workflowId, cancellationToken);

public async Task<ICollection<Generated.Task>> ListPendingTasksAsync(
string taskType,
int? start,
int? count,
CancellationToken cancellationToken = default
) => await client.ViewAsync(taskType, start, count, cancellationToken);
) => await _client.ViewAsync(taskType, start, count, cancellationToken);

public async Task<IDictionary<string, object>> GetEventQueueMapAsync(bool? verbose, CancellationToken cancellationToken = default) =>
await client.GetEventQueuesAsync(verbose, cancellationToken);
await _client.GetEventQueuesAsync(verbose, cancellationToken);

public async Task<IDictionary<string, object>> GetConfigMapAsync(CancellationToken cancellationToken = default) =>
await client.GetAllConfigAsync(cancellationToken);
await _client.GetAllConfigAsync(cancellationToken);
}
}
14 changes: 8 additions & 6 deletions src/ConductorSharp.Client/Service/EventService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@

namespace ConductorSharp.Client.Service
{
public class EventService(ConductorClient client) : IEventService
public class EventService(IHttpClientFactory httpClientFactory, string clientName) : IEventService
{
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<ICollection<EventHandler>> ListAsync(CancellationToken cancellationToken = default) =>
await client.GetEventHandlersAsync(cancellationToken);
await _client.GetEventHandlersAsync(cancellationToken);

public async Task UpdateAsync(EventHandler eventHandler, CancellationToken cancellationToken = default) =>
await client.UpdateEventHandlerAsync(eventHandler, cancellationToken);
await _client.UpdateEventHandlerAsync(eventHandler, cancellationToken);

public async Task AddAsync(EventHandler eventHandler, CancellationToken cancellationToken = default) =>
await client.AddEventHandlerAsync(eventHandler, cancellationToken);
await _client.AddEventHandlerAsync(eventHandler, cancellationToken);

public async Task<ICollection<EventHandler>> ListForEventAsync(
string @event,
bool? activeOnly = null,
CancellationToken cancellationToken = default
) => await client.GetEventHandlersForEventAsync(@event, activeOnly, cancellationToken);
) => await _client.GetEventHandlersForEventAsync(@event, activeOnly, cancellationToken);

public async Task RemoveEventHandlerStatusAsync(string name, CancellationToken cancellationToken = default) =>
await client.RemoveEventHandlerStatusAsync(name, cancellationToken);
await _client.RemoveEventHandlerStatusAsync(name, cancellationToken);
}
}
9 changes: 6 additions & 3 deletions src/ConductorSharp.Client/Service/ExternalPayloadService.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using ConductorSharp.Client.Generated;
using System.Net.Http;
using ConductorSharp.Client.Generated;

namespace ConductorSharp.Client.Service
{
public class ExternalPayloadService(ConductorClient client) : IExternalPayloadService
public class ExternalPayloadService(IHttpClientFactory httpClientFactory, string clientName) : IExternalPayloadService
{
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<FileResponse> GetExternalStorageDataAsync(string externalPayloadPath, CancellationToken cancellationToken = default) =>
await client.GetExternalStorageDataAsync(externalPayloadPath, cancellationToken);
await _client.GetExternalStorageDataAsync(externalPayloadPath, cancellationToken);
}
}
9 changes: 5 additions & 4 deletions src/ConductorSharp.Client/Service/HealthService.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using ConductorSharp.Client.Generated;
using System.Net.Http;
using ConductorSharp.Client.Generated;

namespace ConductorSharp.Client.Service
{
public class HealthService(ConductorClient client) : IHealthService
public class HealthService(IHttpClientFactory httpClientFactory, string clientName) : IHealthService
{
private readonly ConductorClient _conductorClient = client;
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<HealthCheckStatus> CheckHealthAsync(CancellationToken cancellationToken = default) =>
await _conductorClient.DoCheckAsync(cancellationToken);
await _client.DoCheckAsync(cancellationToken);
}
}
30 changes: 16 additions & 14 deletions src/ConductorSharp.Client/Service/MetadataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,47 @@

namespace ConductorSharp.Client.Service
{
public class MetadataService(ConductorClient client) : IMetadataService
public class MetadataService(IHttpClientFactory httpClientFactory, string clientName) : IMetadataService
{
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<ICollection<WorkflowDef>> ListWorkflowsAsync(CancellationToken cancellationToken = default) =>
await client.GetAllAsync(cancellationToken);
await _client.GetAllAsync(cancellationToken);

public async Task<BulkResponse> UpdateWorkflowsAsync(IEnumerable<WorkflowDef> workflows, CancellationToken cancellationToken = default) =>
await client.UpdateAsync(workflows, cancellationToken);
await _client.UpdateAsync(workflows, cancellationToken);

public async Task AddWorkflowAsync(WorkflowDef workflowDef, CancellationToken cancellationToken = default) =>
await client.CreateAsync(workflowDef, cancellationToken);
await _client.CreateAsync(workflowDef, cancellationToken);

public async Task<ICollection<TaskDef>> ListTasksAsync(CancellationToken cancellationToken = default) =>
await client.GetTaskDefsAsync(cancellationToken);
await _client.GetTaskDefsAsync(cancellationToken);

public async Task AddTaskAsync(TaskDef taskDef, CancellationToken cancellationToken = default) =>
await client.RegisterTaskDefAsync(taskDef, cancellationToken);
await _client.RegisterTaskDefAsync(taskDef, cancellationToken);

public async Task AddTasksAsync(IEnumerable<TaskDef> taskDefs, CancellationToken cancellationToken = default) =>
await client.RegisterTaskDef_1Async(taskDefs, cancellationToken);
await _client.RegisterTaskDef_1Async(taskDefs, cancellationToken);

public async Task ValidateWorkflowAsync(WorkflowDef workflowDef, CancellationToken cancellationToken = default) =>
await client.ValidateAsync(workflowDef, cancellationToken);
await _client.ValidateAsync(workflowDef, cancellationToken);

public async Task<WorkflowDef> GetWorkflowAsync(string name, int? version = null, CancellationToken cancellationToken = default) =>
await client.GetAsync(name, version, cancellationToken);
await _client.GetAsync(name, version, cancellationToken);

public async Task<IDictionary<string, object>> GetWorkflowNamesAndVersionsAsync(CancellationToken cancellationToken = default) =>
await client.GetWorkflowNamesAndVersionsAsync(cancellationToken);
await _client.GetWorkflowNamesAndVersionsAsync(cancellationToken);

public async Task<ICollection<WorkflowDef>> GetAllWorkflowsWithLatestVersionsAsync(CancellationToken cancellationToken = default) =>
await client.GetAllWorkflowsWithLatestVersionsAsync(cancellationToken);
await _client.GetAllWorkflowsWithLatestVersionsAsync(cancellationToken);

public async Task<TaskDef> GetTaskAsync(string taskType, CancellationToken cancellationToken = default) =>
await client.GetTaskDefAsync(taskType, cancellationToken);
await _client.GetTaskDefAsync(taskType, cancellationToken);

public async Task DeleteTaskAsync(string taskType, CancellationToken cancellationToken = default) =>
await client.UnregisterTaskDefAsync(taskType, cancellationToken);
await _client.UnregisterTaskDefAsync(taskType, cancellationToken);

public async Task DeleteWorkflowAsync(string name, int version, CancellationToken cancellationToken = default) =>
await client.UnregisterWorkflowDefAsync(name, version, cancellationToken);
await _client.UnregisterWorkflowDefAsync(name, version, cancellationToken);
}
}
15 changes: 9 additions & 6 deletions src/ConductorSharp.Client/Service/QueueAdminService.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
using ConductorSharp.Client.Generated;
using System.Net.Http;
using ConductorSharp.Client.Generated;

namespace ConductorSharp.Client.Service;

public class QueueAdminService(ConductorClient client) : IQueueAdminService
public class QueueAdminService(IHttpClientFactory httpClientFactory, string clientName) : IQueueAdminService
{
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task MarkWaitTaskCompletedAsync(
string workflowId,
string taskRefName,
Status2 status,
IDictionary<string, object> output,
CancellationToken cancellationToken = default
) => await client.Update_1Async(workflowId, taskRefName, status, output, cancellationToken);
) => await _client.Update_1Async(workflowId, taskRefName, status, output, cancellationToken);

public async Task MarkWaitTaskCompletedAsync(
string workflowId,
string taskId,
Generated.TaskStatus status,
IDictionary<string, object> output,
CancellationToken cancellationToken = default
) => await client.UpdateByTaskIdAsync(workflowId, taskId, status, output, cancellationToken);
) => await _client.UpdateByTaskIdAsync(workflowId, taskId, status, output, cancellationToken);

public async Task<IDictionary<string, long>> GetQueueLengthAsync(CancellationToken cancellationToken = default) =>
await client.Size_1Async(cancellationToken);
await _client.Size_1Async(cancellationToken);

public async Task<IDictionary<string, string>> GetQueueNamesAsync(CancellationToken cancellationToken = default) =>
await client.NamesAsync(cancellationToken);
await _client.NamesAsync(cancellationToken);
}
4 changes: 2 additions & 2 deletions src/ConductorSharp.Client/Service/TaskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace ConductorSharp.Client.Service
{
public class TaskService(ConductorClient client) : ITaskService
public class TaskService(IHttpClientFactory httpClientFactory, string clientName) : ITaskService
{
private readonly ConductorClient _client = client;
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<string> UpdateAsync(TaskResult updateRequest, CancellationToken cancellationToken = default) =>
await _client.UpdateTaskAsync(updateRequest, cancellationToken);
Expand Down
14 changes: 8 additions & 6 deletions src/ConductorSharp.Client/Service/WorkflowBulkService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@

namespace ConductorSharp.Client.Service;

public class WorkflowBulkService(ConductorClient client) : IWorkflowBulkService
public class WorkflowBulkService(IHttpClientFactory httpClientFactory, string clientName) : IWorkflowBulkService
{
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

public async Task<BulkResponse> ResumeAsync(IEnumerable<string> workflowIds, CancellationToken cancellationToken = default) =>
await client.ResumeWorkflow_1Async(workflowIds, cancellationToken);
await _client.ResumeWorkflow_1Async(workflowIds, cancellationToken);

public async Task<BulkResponse> PauseAsync(IEnumerable<string> workflowIds, CancellationToken cancellationToken = default) =>
await client.PauseWorkflow_1Async(workflowIds, cancellationToken);
await _client.PauseWorkflow_1Async(workflowIds, cancellationToken);

public async Task<BulkResponse> TerminateAsync(
IEnumerable<string> worklowIds,
string? reason = null,
CancellationToken cancellationToken = default
) => await client.TerminateAsync(reason, worklowIds, cancellationToken);
) => await _client.TerminateAsync(reason, worklowIds, cancellationToken);

public async Task<BulkResponse> RetryAsync(IEnumerable<string> workflowIds, CancellationToken cancellationToken = default) =>
await client.Retry_1Async(workflowIds, cancellationToken);
await _client.Retry_1Async(workflowIds, cancellationToken);

public async Task<BulkResponse> RestartAsync(
IEnumerable<string> workflowIds,
bool? useLatestDefinition = null,
CancellationToken cancellationToken = default
) => await client.Restart_1Async(useLatestDefinition, workflowIds, cancellationToken);
) => await _client.Restart_1Async(useLatestDefinition, workflowIds, cancellationToken);
}
7 changes: 4 additions & 3 deletions src/ConductorSharp.Client/Service/WorkflowService.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using ConductorSharp.Client.Generated;
using System.Net.Http;
using ConductorSharp.Client.Generated;

namespace ConductorSharp.Client.Service
{
public class WorkflowService(ConductorClient client) : IWorkflowService
public class WorkflowService(IHttpClientFactory httpClientFactory, string clientName) : IWorkflowService
{
private readonly ConductorClient _client = client;
private readonly ConductorClient _client = new(httpClientFactory.CreateClient(clientName));

/// <summary>
/// Skips a given task from a current running workflow
Expand Down
2 changes: 1 addition & 1 deletion src/ConductorSharp.Engine/ConductorSharp.Engine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<PackageId>ConductorSharp.Engine</PackageId>
<Version>3.0.1-beta8</Version>
<Version>3.0.1-beta9</Version>
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
<PackageTags>netflix;conductor</PackageTags>
Expand Down
28 changes: 28 additions & 0 deletions src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Net.Http;
using System.Reflection;
using ConductorSharp.Client.Service;
using ConductorSharp.Engine.Behaviors;
using ConductorSharp.Engine.Health;
using ConductorSharp.Engine.Interface;
Expand Down Expand Up @@ -86,5 +88,31 @@ public IConductorSharpBuilder SetBuildConfiguration(BuildConfiguration buildConf
Builder.AddSingleton(buildConfiguration);
return this;
}

public IConductorSharpBuilder AddAlternateClient(string baseUrl, string key, string apiPath = "api", bool ignoreInvalidCertificate = false)
{
var clientBuilder = builder
.AddHttpClient(key, client => client.BaseAddress = new(baseUrl))
.AddHttpMessageHandler(() => new ApiPathOverrideHttpHandler(apiPath));

if (ignoreInvalidCertificate)
{
clientBuilder.ConfigurePrimaryHttpMessageHandler(
() => new HttpClientHandler { ServerCertificateCustomValidationCallback = (_, _, _, _) => true }
);
}

builder.AddKeyedTransient<IAdminService, AdminService>(key, ((sp, _) => new(sp.GetService<IHttpClientFactory>(), key)));
builder.AddKeyedTransient<IEventService, EventService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<IExternalPayloadService, ExternalPayloadService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<IQueueAdminService, QueueAdminService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<IWorkflowBulkService, WorkflowBulkService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<ITaskService, TaskService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<IHealthService, HealthService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<IMetadataService, MetadataService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));
builder.AddKeyedTransient<IWorkflowService, WorkflowService>(key, (sp, _) => new(sp.GetService<IHttpClientFactory>(), key));

return this;
}
}
}
Loading

0 comments on commit 32c5cdb

Please sign in to comment.