Skip to content

Commit

Permalink
PR 81903: Implement IPC between Worker and Agent
Browse files Browse the repository at this point in the history
- Implement IPC between Worker and Agent:- Add WorkerManager, Worker classes
  • Loading branch information
Stan Iliev committed Feb 23, 2016
2 parents 485c286 + ad8d1e7 commit 15cf059
Show file tree
Hide file tree
Showing 22 changed files with 1,041 additions and 203 deletions.
25 changes: 0 additions & 25 deletions src/Agent.Listener/MessageDispatcher.cs

This file was deleted.

131 changes: 69 additions & 62 deletions src/Agent.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Microsoft.VisualStudio.Services.Agent.Configuration;
Expand Down Expand Up @@ -77,82 +76,90 @@ public async Task<Boolean> CreateSessionAsync()
}

return false;
}
}

public async Task ListenAsync()
{
if (Session == null)
{
throw new InvalidOperationException("Must create a session before listening");
}

Debug.Assert(_settings != null, "settings should not be null");

var dispatcher = HostContext.GetService<IMessageDispatcher>();
Debug.Assert(_settings != null, "settings should not be null");
var taskServer = HostContext.GetService<ITaskServer>();

long? lastMessageId = null;
while (true)
//TODO: Interaction with the WorkerManager is the responsibility of the caller. Listener just returns the message.
using (var workerManager = HostContext.GetService<IWorkerManager>())
{
TaskAgentMessage message = null;
try
{
message = await taskServer.GetAgentMessageAsync(_settings.PoolId,
Session.SessionId,
lastMessageId,
HostContext.CancellationToken);
}
catch (TimeoutException)
{
Trace.Verbose("MessageListener.Listen - TimeoutException received.");
}
catch (TaskCanceledException)
{
Trace.Verbose("MessageListener.Listen - TaskCanceledException received.");
}
catch (TaskAgentSessionExpiredException)
{
Trace.Verbose("MessageListener.Listen - TaskAgentSessionExpiredException received.");
// TODO: Throw a specific exception so the caller can control the flow appropriately.
return;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
Trace.Verbose("MessageListener.Listen - Exception received.");
Trace.Error(ex);
// TODO: Throw a specific exception so the caller can control the flow appropriately.
return;
}

if (message == null)
{
Trace.Verbose("MessageListener.Listen - No message retrieved from session '{0}'.", this.Session.SessionId);
continue;
}

Trace.Verbose("MessageListener.Listen - Message '{0}' received from session '{1}'.", message.MessageId, this.Session.SessionId);
try
long? lastMessageId = null;
while (true)
{
// Check if refresh is required.
if (String.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
TaskAgentMessage message = null;
try
{
// Throw a specific exception so the caller can control the flow appropriately.
message = await taskServer.GetAgentMessageAsync(_settings.PoolId,
Session.SessionId,
lastMessageId,
HostContext.CancellationToken);
}
catch (TimeoutException)
{
Trace.Verbose("MessageListener.Listen - TimeoutException received.");
}
catch (TaskCanceledException)
{
Trace.Verbose("MessageListener.Listen - TaskCanceledException received.");
}
catch (TaskAgentSessionExpiredException)
{
Trace.Verbose("MessageListener.Listen - TaskAgentSessionExpiredException received.");
// TODO: Throw a specific exception so the caller can control the flow appropriately.
return;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
Trace.Warning("MessageListener.Listen - Exception received.");
Trace.Error(ex);
// TODO: Throw a specific exception so the caller can control the flow appropriately.
return;
}

if (message == null)
{
Trace.Verbose("MessageListener.Listen - No message retrieved from session '{0}'.", this.Session.SessionId);
continue;
}

dispatcher.Dispatch(message);
}
finally
{
lastMessageId = message.MessageId;
await taskServer.DeleteAgentMessageAsync(_settings.PoolId,
lastMessageId.Value,
Session.SessionId,
HostContext.CancellationToken);
Trace.Verbose("MessageListener.Listen - Message '{0}' received from session '{1}'.", message.MessageId, this.Session.SessionId);
try
{
// Check if refresh is required.
if (String.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
Trace.Warning("Referesh message received, but not yet handled by agent implementation.");
}
else if (String.Equals(message.MessageType, JobRequestMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
var newJobMessage = JsonUtility.FromString<JobRequestMessage>(message.Body);
await workerManager.Run(newJobMessage);
}
else if (String.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body);
await workerManager.Cancel(cancelJobMessage);
}
}
finally
{
lastMessageId = message.MessageId;
await taskServer.DeleteAgentMessageAsync(_settings.PoolId,
lastMessageId.Value,
Session.SessionId,
HostContext.CancellationToken);
}
}
}
}
Expand Down
27 changes: 8 additions & 19 deletions src/Agent.Listener/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static Int32 Main(String[] args)
Int32 rc = 0;
try
{
rc = ExecuteCommand(context, parser).Result;
rc = ExecuteCommand(context, parser).Result;
}
catch (Exception e)
{
Expand Down Expand Up @@ -108,30 +108,19 @@ private static async Task<Int32> ExecuteCommand(HostContext context, CommandLine
//_trace.Info("Worker.exe Exit: {0}", exitCode);

ICredentialProvider cred = configManager.AcquireCredentials(parser.Args, isUnattended);
return RunAsync(context).Result;
return await RunAsync(context);
}

public static async Task<Int32> RunAsync(IHostContext context)
{
/*
try
{
var listener = context.GetService<IMessageListener>();
if (await listener.CreateSessionAsync())
{
await listener.ListenAsync();
}
await listener.DeleteSessionAsync();
}
catch (Exception)
{
var listener = context.GetService<IMessageListener>();
if (await listener.CreateSessionAsync())
{
// TODO: Log exception.
return 1;
await listener.ListenAsync();
}
*/

return 0;
await listener.DeleteSessionAsync();
return 0;
}

private static void PrintUsage()
Expand Down
112 changes: 112 additions & 0 deletions src/Agent.Listener/Worker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
using System;
using System.Collections.Generic;
using Microsoft.VisualStudio.Services.Agent.Util;
using System.IO;

namespace Microsoft.VisualStudio.Services.Agent.Listener
{
public enum WorkerState
{
New,
Starting,
Finished,
}

[ServiceLocator(Default = typeof(Worker))]
public interface IWorker : IDisposable, IAgentService
{
event EventHandler StateChanged;
Guid JobId { get; set; }
IProcessChannel ProcessChannel { get; set; }
//TODO: instead of LaunchProcess, do something like Task RunAsync(...) and make sure you take a cancellation token. The way the IWorkerManager can handle cancelling the worker is to simply signal the cancellation token that it handed to the IWorker.RunAsync method.
void LaunchProcess(String pipeHandleOut, String pipeHandleIn, string workingFolder);
}

public class Worker : AgentService, IWorker
{
#if OS_WINDOWS
private const String WorkerProcessName = "Agent.Worker.exe";
#else
private const String WorkerProcessName = "Agent.Worker";
#endif

public event EventHandler StateChanged;
public Guid JobId { get; set; }
public IProcessChannel ProcessChannel { get; set; }
private IProcessInvoker _processInvoker;
private WorkerState _state;
public WorkerState State
{
get
{
return _state;
}
private set
{
if (value != _state)
{
_state = value;
if (null != StateChanged)
{
StateChanged(this, null);
}
}
}
}
public Worker()
{
State = WorkerState.New;
}

public void LaunchProcess(String pipeHandleOut, String pipeHandleIn, string workingFolder)
{
string workerFileName = Path.Combine(AssemblyUtil.AssemblyDirectory, WorkerProcessName);
_processInvoker = HostContext.GetService<IProcessInvoker>();
_processInvoker.Exited += _processInvoker_Exited;
State = WorkerState.Starting;
var environmentVariables = new Dictionary<String, String>();
_processInvoker.Execute(workingFolder, workerFileName, "spawnclient " + pipeHandleOut + " " + pipeHandleIn,
environmentVariables);
}

private void _processInvoker_Exited(object sender, EventArgs e)
{
_processInvoker.Exited -= _processInvoker_Exited;
if (null != ProcessChannel)
{
ProcessChannel.Dispose();
ProcessChannel = null;
}
State = WorkerState.Finished;
}

#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (null != ProcessChannel)
{
ProcessChannel.Dispose();
ProcessChannel = null;
}
if (null != _processInvoker)
{
_processInvoker.Dispose();
_processInvoker = null;
}
disposedValue = true;
}
}

// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
}
#endregion
}
}
Loading

0 comments on commit 15cf059

Please sign in to comment.