Skip to content

Commit

Permalink
Handle case, when the same continuation was added multiple times
Browse files Browse the repository at this point in the history
This may happen, because continuation is added outside of an outer
transaction, when AwaitingState is applied. If an outer transaction was failed,
when continuation has added, and retry was performed, the same continuation
may be added several times.
  • Loading branch information
odinserj committed Mar 13, 2017
1 parent f4b1266 commit 44ec05d
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions src/Hangfire.Core/ContinuationsSupportAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,19 @@ private void AddContinuation(ElectStateContext context, AwaitingState awaitingSt
using (connection.AcquireDistributedJobLock(parentId, AddJobLockTimeout))
{
var continuations = GetContinuations(connection, parentId);
continuations.Add(new Continuation { JobId = context.BackgroundJob.Id, Options = awaitingState.Options });

// Continuation may be already added. This may happen, when outer transaction
// was failed after adding a continuation last time, since the addition is
// performed outside of an outer transaction.
if (!continuations.Exists(x => x.JobId == context.BackgroundJob.Id))
{
continuations.Add(new Continuation { JobId = context.BackgroundJob.Id, Options = awaitingState.Options });

// Set continuation only after ensuring that parent job still
// exists. Otherwise we could create add non-expiring (garbage)
// parameter for the parent job.
SetContinuations(connection, parentId, continuations);
}

var jobData = connection.GetJobData(parentId);
if (jobData == null)
Expand All @@ -111,11 +123,6 @@ private void AddContinuation(ElectStateContext context, AwaitingState awaitingSt

var currentState = connection.GetStateData(parentId);

// Set continuation only after ensuring that parent job still
// exists. Otherwise we could create add non-expiring (garbage)
// parameter for the parent job.
SetContinuations(connection, parentId, continuations);

if (currentState != null && _knownFinalStates.Contains(currentState.Name))
{
var startImmediately = !awaitingState.Options.HasFlag(JobContinuationOptions.OnlyOnSucceededState) ||
Expand Down Expand Up @@ -151,30 +158,37 @@ private void ExecuteContinuationsIfExist(ElectStateContext context)
// the state of a continuation, we should simply skip it.
if (currentState.Name != AwaitingState.StateName) continue;

if (continuation.Options.HasFlag(JobContinuationOptions.OnlyOnSucceededState) &&
context.CandidateState.Name != SucceededState.StateName)
{
nextStates.Add(continuation.JobId, new DeletedState { Reason = "Continuation condition was not met" });
continue;
}

IState nextState;

try
if (continuation.Options.HasFlag(JobContinuationOptions.OnlyOnSucceededState) &&
context.CandidateState.Name != SucceededState.StateName)
{
nextState = JsonConvert.DeserializeObject<IState>(
currentState.Data["NextState"],
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects });
nextState = new DeletedState { Reason = "Continuation condition was not met" };
}
catch (Exception ex)
else
{
nextState = new FailedState(ex)
try
{
nextState = JsonConvert.DeserializeObject<IState>(
currentState.Data["NextState"],
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects });
}
catch (Exception ex)
{
Reason = "An error occurred while deserializing the continuation"
};
nextState = new FailedState(ex)
{
Reason = "An error occurred while deserializing the continuation"
};
}
}

nextStates.Add(continuation.JobId, nextState);
if (!nextStates.ContainsKey(continuation.JobId))
{
// Duplicate continuations possible, when they were added before version 1.6.10.
// Please see details in comments for the AddContinuation method near the line
// with checking for existence (continuations.Exists).
nextStates.Add(continuation.JobId, nextState);
}
}

foreach (var tuple in nextStates)
Expand Down

0 comments on commit 44ec05d

Please sign in to comment.