Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: modify task completed to notify job track but discrete overseer (MAPCO-4798) #167

Merged
merged 4 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions MergerLogic/Clients/HeartbeatClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using MergerLogic.Utils;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Reflection;
using System.Text;
using System.Timers;
using System.Net.Http.Json;

namespace MergerLogic.Clients
{
Expand All @@ -27,14 +30,16 @@ public HeartbeatClient(ILogger<GpkgClient> logger, IConfigurationManager configu
this._timer.Elapsed += this.Send;
}

~HeartbeatClient() {
~HeartbeatClient()
{
this._timer.Elapsed -= this.Send;
this._timer.Dispose();
}

public void Start(string taskId)
{
if (this._timer.Enabled) {
if (this._timer.Enabled)
{
this.Stop();
}
this._logger.LogInformation($"[{MethodBase.GetCurrentMethod().Name}] Starting heartbeat for task={taskId}");
Expand All @@ -50,7 +55,23 @@ public void Stop()
}
this._logger.LogInformation($"[{MethodBase.GetCurrentMethod().Name}] Stops heartbeats for taskId={this._taskId}");
this._timer.Stop();
this._taskId = null;
try
{
string relativeUri = $"heartbeat/remove";
string url = new Uri(new Uri(this._baseUrl), relativeUri).ToString();
var content = JsonContent.Create(new[] { this._taskId });
this._httpClient.PostData(url, content);
}
catch (Exception e)
{
string message = $"[{MethodBase.GetCurrentMethod().Name}] Could not delete heartbeat for task={this._taskId}, {e.Message}";
this._logger.LogError(message);
throw new Exception(message, e);
}
finally
{
this._taskId = null;
}
}

public void Send(object? sender, ElapsedEventArgs elapsedEventArgs)
Expand All @@ -66,7 +87,6 @@ public void Send(object? sender, ElapsedEventArgs elapsedEventArgs)
this._logger.LogError($"[{MethodBase.GetCurrentMethod().Name}] Could not send heartbeat for task={this._taskId}, {e.Message}");
throw;
}

}
}
}
}
10 changes: 5 additions & 5 deletions MergerService/Models/Jobs/JobMergeMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ public class JobMergeMetadata
[JsonInclude] public string[] FileNames { get; }
[JsonInclude] public string OriginDirectory { get; }
[JsonInclude] public string LayerRelativePath { get; }
[JsonInclude] public string? ManagerCallbackUrl { get; }
[JsonInclude] public AdditionalParams? AdditionalParams { get; }

[System.Text.Json.Serialization.JsonIgnore]
private JsonSerializerSettings _jsonSerializerSettings;

public JobMergeMetadata(JobMetadata metadata, string[] fileNames,string originDirectory, string layerRelativePath,
string managerCallbackUrl)
public JobMergeMetadata(JobMetadata metadata, string[] fileNames, string originDirectory, string layerRelativePath,
AdditionalParams additionalParams)
{
this.Metadata = metadata;
this.FileNames = fileNames;
this.OriginDirectory = originDirectory;
this.LayerRelativePath = layerRelativePath;
this.ManagerCallbackUrl = managerCallbackUrl;
this.AdditionalParams = additionalParams;

this._jsonSerializerSettings = new JsonSerializerSettings();
this._jsonSerializerSettings.Converters.Add(new StringEnumConverter());
Expand All @@ -38,5 +38,5 @@ public override string ToString()
return JsonConvert.SerializeObject(this, this._jsonSerializerSettings)!;
}
}

}
23 changes: 23 additions & 0 deletions MergerService/Models/Jobs/JobParamersAdditiomalParams.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System.Text.Json.Serialization;

namespace MergerService.Models.Jobs
{
public class AdditionalParams
{
[JsonInclude] public string? JobTrackerServiceURL { get; }

[System.Text.Json.Serialization.JsonIgnore]
private JsonSerializerSettings _jsonSerializerSettings;

public AdditionalParams(string jobTrackerServiceURL)
{
this.JobTrackerServiceURL = jobTrackerServiceURL;

this._jsonSerializerSettings = new JsonSerializerSettings();
this._jsonSerializerSettings.Converters.Add(new StringEnumConverter());
}
}

}
2 changes: 1 addition & 1 deletion MergerService/Runners/TaskRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public bool RunTask(MergeTask? task)
}

this._logger.LogInformation($"[{methodName}] Run Task: jobId {task.JobId}, taskId {task.Id}");
string? managerCallbackUrl = this._jobUtils.GetJob(task.JobId)?.Parameters.ManagerCallbackUrl;
string? managerCallbackUrl = this._jobUtils.GetJob(task.JobId)?.Parameters.AdditionalParams?.JobTrackerServiceURL;
string log = managerCallbackUrl == null ? "managerCallbackUrl not provided as job parameter" : $"managerCallback url: {managerCallbackUrl}";
this._logger.LogDebug($"[{methodName}]{log}");

Expand Down
10 changes: 5 additions & 5 deletions MergerService/Utils/TaskUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ public TaskUtils(MergerLogic.Utils.IConfigurationManager configuration, IHttpReq
}
}

private void NotifyOnStatusChange(string jobId, string taskId, string managerCallbackUrl)
private void NotifyOnStatusChange(string taskId, string managerCallbackUrl)
{
using (this._activitySource.StartActivity("notify Manager on task completion"))
{
// Notify Manager on task completion
this._logger.LogInformation($"[{MethodBase.GetCurrentMethod().Name}] Notifying Manager on completion, job: {jobId}, task: {taskId}");
string relativeUri = $"jobs/{jobId}/{taskId}/completed";
this._logger.LogInformation($"[{MethodBase.GetCurrentMethod().Name}] Notifying Manager on completion, task: {taskId}");
string relativeUri = $"tasks/{taskId}/notify";
string url = new Uri(new Uri(managerCallbackUrl), relativeUri).ToString();
_ = this._httpClient.PostData(url, null);
}
Expand Down Expand Up @@ -116,7 +116,7 @@ public void UpdateCompletion(string jobId, string taskId, string? managerCallbac
if (managerCallbackUrl is not null)
{
// Update Manager on task completion
NotifyOnStatusChange(jobId, taskId, managerCallbackUrl);
NotifyOnStatusChange(taskId, managerCallbackUrl);
}
}

Expand Down Expand Up @@ -160,7 +160,7 @@ private void UpdateFailed(string jobId, string taskId, int attempts, string reas
if (managerCallbackUrl is not null)
{
// Notify Manager on task failure
NotifyOnStatusChange(jobId, taskId, managerCallbackUrl);
NotifyOnStatusChange(taskId, managerCallbackUrl);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ apiVersion: v2
name: gpkg-merger
description: A Helm chart for gpkg-merger service # refers to MergerService
type: application
version: 1.4.1
appVersion: 1.4.1
version: 2.0.0
appVersion: 2.0.0
Loading