Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: correlation test fail except for external event with extended session #430

Merged
merged 7 commits into from
Nov 2, 2020
16 changes: 13 additions & 3 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
() =>
{
var isReplaying = session.RuntimeState.ExecutionStartedEvent?.IsPlayed ?? false;
TraceContextBase parentTraceContext = GetParentTraceContext(session.CurrentMessageBatch);
TraceContextBase parentTraceContext = GetParentTraceContext(session);
currentRequestTraceContext = GetRequestTraceContext(isReplaying, parentTraceContext);
});

Expand Down Expand Up @@ -763,8 +763,9 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
}
}

static TraceContextBase GetParentTraceContext(IList<MessageData> messages)
TraceContextBase GetParentTraceContext(OrchestrationSession session)
{
var messages = session.CurrentMessageBatch;
TraceContextBase parentTraceContext = null;
foreach(var message in messages)
{
Expand Down Expand Up @@ -803,7 +804,16 @@ static TraceContextBase GetParentTraceContext(IList<MessageData> messages)

break;
}
}
} else
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
{
if (message.TaskMessage.Event is EventRaisedEvent)
{
var history = session.RuntimeState.Events;

var traceContextString = history.Where(p => p.EventType == EventType.ExecutionStarted).Select(p => (ExecutionStartedEvent)p).FirstOrDefault().Correlation;
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
parentTraceContext = TraceContextBase.Restore(traceContextString);
}
}
}

return parentTraceContext ?? TraceContextFactory.Empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class AzureTableTrackingStore : TrackingStoreBase
OutputProperty,
"Reason",
"Details",
"Correlation"
};

readonly string storageAccountName;
Expand Down Expand Up @@ -946,7 +947,7 @@ public override async Task<string> UpdateStateAsync(
["LastUpdatedTime"] = new EntityProperty(newEvents.Last().Timestamp),
}
};

for (int i = 0; i < newEvents.Count; i++)
{
bool isFinalEvent = i == newEvents.Count - 1;
Expand Down Expand Up @@ -980,6 +981,12 @@ public override async Task<string> UpdateStateAsync(
instanceEntity.Properties["Version"] = new EntityProperty(executionStartedEvent.Version);
instanceEntity.Properties["CreatedTime"] = new EntityProperty(executionStartedEvent.Timestamp);
instanceEntity.Properties["RuntimeStatus"] = new EntityProperty(OrchestrationStatus.Running.ToString());
CorrelationTraceClient.Propagate(() =>
{
historyEntity.Properties["Correlation"] = new EntityProperty(executionStartedEvent.Correlation);
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
estimatedBytes += System.Text.ASCIIEncoding.ASCII.GetByteCount(executionStartedEvent.Correlation);
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
});

this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
Expand Down
19 changes: 19 additions & 0 deletions src/DurableTask.Core/CorrelationTraceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace DurableTask.Core
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using DurableTask.Core.Settings;

/// <summary>
Expand Down Expand Up @@ -113,6 +114,24 @@ public static void Propagate(Action action)
Execute(action);
}

/// <summary>
/// Execute Aysnc Function for propagete correlation information
/// It suppresses the execution when <see cref="CorrelationSettings"/>.DisablePropagation is true.
/// </summary>
/// <param name="func"></param>
/// <returns></returns>
public static Task PropagateAsync(Func<Task> func)
{
if (CorrelationSettings.Current.EnableDistributedTracing)
{
return func();
}
else
{
return Task.CompletedTask;
}
}

static void Tracking(Action tracking)
{
Execute(tracking);
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.Core/History/ExecutionStartedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,11 @@ internal ExecutionStartedEvent()
/// </summary>
[DataMember]
public IDictionary<string, string> Tags { get; set; }

/// <summary>
/// Gets or sets the serialized end-to-end correlation state.
/// </summary>
[DataMember]
public string Correlation { get; set; }
}
}
1 change: 1 addition & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ protected HistoryEvent(int eventId)
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
public ExtensionDataObject ExtensionData { get; set; }

}
}
4 changes: 3 additions & 1 deletion src/DurableTask.Core/TaskHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,8 @@ async Task<OrchestrationInstance> InternalCreateOrchestrationInstanceWithRaisedE
};

this.logHelper.SchedulingOrchestration(startedEvent);

CorrelationTraceClient.Propagate(() => CreateAndTrackDependencyTelemetry(requestTraceContext));

// Raised events and create orchestration calls use different methods so get handled separately
await this.ServiceClient.CreateTaskOrchestrationAsync(startMessage, dedupeStatuses);
Expand All @@ -559,10 +561,10 @@ async Task<OrchestrationInstance> InternalCreateOrchestrationInstanceWithRaisedE
},
Event = eventRaisedEvent,
};

await this.ServiceClient.SendTaskOrchestrationMessageAsync(eventMessage);
}

CorrelationTraceClient.Propagate(() => CreateAndTrackDependencyTelemetry(requestTraceContext));

return orchestrationInstance;
}
Expand Down
12 changes: 12 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,13 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
}
}

// correlation
CorrelationTraceClient.Propagate(() => {
if (runtimeState.ExecutionStartedEvent != null)
runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved


// finish up processing of the work item
if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
{
Expand All @@ -416,6 +423,11 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
workItem.InstanceId,
"Updating state for continuation");

// correlation
CorrelationTraceClient.Propagate(() => {
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
continueAsNewExecutionStarted.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});

runtimeState = new OrchestrationRuntimeState();
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(continueAsNewExecutionStarted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace DurableTask.AzureStorage.Tests.Correlation
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.DataContracts;
using Microsoft.ApplicationInsights.Extensibility.Implementation;
using Microsoft.ApplicationInsights.W3C;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;

Expand Down Expand Up @@ -519,9 +518,10 @@ public async Task MultipleParentScenarioAsync(Protocol protocol, bool enableExte

while (IsNotReadyForRaiseEvent(host.Client))
{
await Task.Delay(TimeSpan.FromMilliseconds(300));
await Task.Delay(TimeSpan.FromMilliseconds(100));
}

await Task.Delay(TimeSpan.FromSeconds(1));
tasks.Add(host.Client.RaiseEventAsync("someEvent", "hi"));
await Task.WhenAll(tasks);

Expand Down Expand Up @@ -575,6 +575,146 @@ internal static void Reset()
}
}

[DataTestMethod]
[DataRow(Protocol.W3CTraceContext, false)]
[DataRow(Protocol.HttpCorrelationProtocol, false)]
[DataRow(Protocol.W3CTraceContext, true)]
[DataRow(Protocol.HttpCorrelationProtocol, true)]
public async Task MultipleParentMultiLayerScenarioAsync(Protocol protocol, bool enableExtendedSessions)
{
MultiParentOrchestrator.Reset();
CorrelationSettings.Current.Protocol = protocol;
CorrelationSettings.Current.EnableDistributedTracing = true;
var host = new TestCorrelationOrchestrationHost();
var tasks = new List<Task>();
tasks.Add(host.ExecuteOrchestrationAsync(typeof(MultiParentMultiLayeredOrchestrator), "world", 30, enableExtendedSessions));

while (IsNotReadyForTwoRaiseEvents(host.Client))
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
}

await Task.Delay(TimeSpan.FromSeconds(1));
foreach(string instanceId in MultiParentChildOrchestrator.InstanceIds)
{
tasks.Add(host.Client.RaiseEventAsync(instanceId, "someEvent", "hi"));
}
await Task.WhenAll(tasks);

List<OperationTelemetry> actual = Convert(tasks[0]);

Assert.AreEqual(11, actual.Count);
CollectionAssert.AreEqual(
new (Type, string)[]
{
(typeof(RequestTelemetry), TraceConstants.Client),
(typeof(DependencyTelemetry), TraceConstants.Client),
(typeof(RequestTelemetry), $"{TraceConstants.Orchestrator} MultiParentMultiLayeredOrchestrator"),
(typeof(DependencyTelemetry), $"{TraceConstants.Orchestrator} {typeof(MultiParentChildOrchestrator).FullName}"),
(typeof(RequestTelemetry), $"{TraceConstants.Orchestrator} MultiParentChildOrchestrator"),
(typeof(DependencyTelemetry), $"{TraceConstants.Orchestrator} {typeof(Hello).FullName}"),
(typeof(RequestTelemetry), $"{TraceConstants.Activity} Hello"),
(typeof(DependencyTelemetry), $"{TraceConstants.Orchestrator} {typeof(MultiParentChildOrchestrator).FullName}"),
(typeof(RequestTelemetry), $"{TraceConstants.Orchestrator} MultiParentChildOrchestrator"),
(typeof(DependencyTelemetry), $"{TraceConstants.Orchestrator} {typeof(Hello).FullName}"),
(typeof(RequestTelemetry), $"{TraceConstants.Activity} Hello")
}, actual.Select(x => (x.GetType(), x.Name)).ToList());
MultiParentChildOrchestrator.Reset();

}

bool IsNotReadyForTwoRaiseEvents(TestOrchestrationClient client)
{
return client == null || !(MultiParentChildOrchestrator.ReadyForExternalEvent == 2);
}

[KnownType(typeof(MultiParentChildOrchestrator))]
[KnownType(typeof(Hello))]
internal class MultiParentMultiLayeredOrchestrator : TaskOrchestration<string, string>
{
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
var tasks = new List<Task<string>>();
tasks.Add(context.CreateSubOrchestrationInstance<string>(typeof(MultiParentChildOrchestrator), "foo"));
tasks.Add(context.CreateSubOrchestrationInstance<string>(typeof(MultiParentChildOrchestrator), "bar"));
await Task.WhenAll(tasks);
return $"{tasks[0].Result}:{tasks[1].Result}";
}
}
[KnownType(typeof(Hello))]
internal class MultiParentChildOrchestrator : TaskOrchestration<string, string>
{
private static object lockExternalEvent = new object();
private static object lockId = new object();
private static int readyCountForExternalEvent = 0;
private static List<string> orchestrationIds = new List<string>();
public static int ReadyForExternalEvent
{
get
{
return readyCountForExternalEvent;
}

set
{
lock (lockExternalEvent)
{
readyCountForExternalEvent = value;
}
}
}
public static IEnumerable<string> InstanceIds
{
get
{
IEnumerable<string> result;
lock(lockId)
{
result = orchestrationIds.ToList<string>();
}
return result;
}
}

public static void AddOrchestrationId(string orchestrationId)
{
lock(lockId)
{
orchestrationIds.Add(orchestrationId);
}
}

public static void IncrementReadyForExternalEvent()
{
lock (lockExternalEvent)
{
readyCountForExternalEvent++;
}
}

readonly TaskCompletionSource<object> receiveEvent = new TaskCompletionSource<object>();

public async override Task<string> RunTask(OrchestrationContext context, string input)
{
AddOrchestrationId(context.OrchestrationInstance.InstanceId);
IncrementReadyForExternalEvent();
await this.receiveEvent.Task;
await context.ScheduleTask<string>(typeof(Hello), input);
return "done";
}

public override void OnEvent(OrchestrationContext context, string name, string input)
{
this.receiveEvent.SetResult(null);
}

internal static void Reset()
{
ReadyForExternalEvent = 0;
orchestrationIds = new List<string>();
}
}

[DataTestMethod]
[DataRow(Protocol.W3CTraceContext, false)]
[DataRow(Protocol.HttpCorrelationProtocol, false)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ public Task RaiseEventAsync(string eventName, object eventData)
return this.client.RaiseEventAsync(instance, eventName, eventData);
}

public Task RaiseEventAsync(string instanceId, string eventName, object eventData)
{
Trace.TraceInformation($"Raising event to instance {instanceId} with name = {eventName}.");
var instance = new OrchestrationInstance { InstanceId = instanceId };
return this.client.RaiseEventAsync(instance, eventName, eventData);
}

public Task TerminateAsync(string reason)
{
Trace.TraceInformation($"Terminating instance {this.instanceId} with reason = {reason}.");
Expand Down