Skip to content

Commit

Permalink
Fix issue where Persist failures were being reported as rejects. (#29)
Browse files Browse the repository at this point in the history
* Fix issue where Persist failures were being reported as rejects.

Use an enum instead of a class on flowcontrol.

* fix syntax that makes build server angry
  • Loading branch information
to11mtm authored Aug 31, 2021
1 parent 644f270 commit 61fb205
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Immutable;
using System.Data;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -138,21 +139,18 @@ private async Task WriteJournalRows(Seq<JournalRow> xs)
{
//hot path:
//If we only have one row, penalty for BulkCopy
//Isn't worth it due to insert caching/etc.
if (xs.Count > 1)
{
//Isn't worth it due to insert caching/transaction/etc.
var count = xs.Count;
if (count > 1)
await InsertMultiple(xs);
}
else if (xs.Count > 0)
{
await InsertSingle(xs);
}
else if (count == 1) await InsertSingle(xs);
}

}

private async Task InsertSingle(Seq<JournalRow> xs)
{
//If we are writing a single row,
//we don't need to worry about transactions.
using (var db = _connectionFactory.GetConnection())
{
await db.InsertAsync(xs.Head);
Expand All @@ -176,9 +174,6 @@ await db.GetTable<JournalRow>()
.MaxRowByRowSize
? BulkCopyType.Default
: BulkCopyType.MultipleRows,
//TODO: When Parameters are allowed,
//Make a Config Option
//Or default to true
UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert,
MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize
}, xs);
Expand All @@ -200,45 +195,61 @@ await db.GetTable<JournalRow>()
}
}

public async Task<IImmutableList<Exception>> AsyncWriteMessages(
IEnumerable<AtomicWrite> messages, long timeStamp = 0)
{
var serializedTries = Serializer.Serialize(messages, timeStamp);

//Just a little bit of magic here;
//.ToList() keeps it all working later for whatever reason
//while still keeping our allocations in check.
//By using a custom flatten here, we avoid an Enumerable/LINQ allocation
//And are able to have a little more control over default capacity of array.
static List<JournalRow> FlattenListOfListsToList(List<Akka.Util.Try<List<JournalRow>>> source) {

/*var trySet = new List<JournalRow>();
foreach (var serializedTry in serializedTries)
//List<JournalRow> ResultSet(
// Akka.Util.Try<List<JournalRow>> item)
//{
// return item.Success.GetOrElse(new List<JournalRow>(0));
//}

List<JournalRow> rows = new List<JournalRow>(source.Count > 4 ? source.Count:4);
for (var index = 0; index < source.Count; index++)
{
trySet.AddRange(serializedTry.Success.GetOrElse(new List<JournalRow>(0)));
var item = source[index].Success.Value;
if (item != null)
{
rows.AddRange(item);
}
//rows.AddRange(ResultSet(source[index]));
}

var rows = Seq(trySet);*/
var rows = Seq(serializedTries.SelectMany(serializedTry =>
serializedTry.Success.GetOrElse(new List<JournalRow>(0)))
.ToList());
//
return rows;
}

public async Task<IImmutableList<Exception>> AsyncWriteMessages(
IEnumerable<AtomicWrite> messages, long timeStamp = 0)
{
var serializedTries = Serializer.Serialize(messages, timeStamp);

return await QueueWriteJournalRows(rows).ContinueWith(task =>
{
//We actually are trying to interleave our tasks here...
//Basically, if serialization failed our task will likely
//Show success
//But we instead should display the serialization failure
return serializedTries.Select(r =>
r.IsSuccess
? (task.IsFaulted
? TryUnwrapException(task.Exception)
: null)
: r.Failure.Value).ToImmutableList();
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
//Fold our List of Lists into a single sequence
var rows = Seq(FlattenListOfListsToList(serializedTries));
//Wait for the write to go through. If Task fails, write will be captured
//As WriteMessagesFailure.
await QueueWriteJournalRows(rows);
//If we get here, we build an ImmutableList containing our rejections.
//These will be captured as WriteMessagesRejected
return BaseByteArrayJournalDao
.BuildWriteRejections(serializedTries);
}

protected static ImmutableList<Exception> BuildWriteRejections(
List<Akka.Util.Try<List<JournalRow>>> serializedTries)
{
Exception[] builderEx =
new Exception[serializedTries.Count];
for (int i = 0; i < serializedTries.Count; i++)
{
builderEx[i] = (serializedTries[i].Failure.Value);
}
return ImmutableList.CreateRange<Exception>(builderEx);
}
protected static ImmutableList<Exception> FailWriteThrowHelper(Exception e)
{
throw TryUnwrapException(e);
}
protected static Exception TryUnwrapException(Exception e)
{
var aggregateException = e as AggregateException;
Expand Down Expand Up @@ -346,18 +357,31 @@ protected IQueryable<long> MaxMarkedForDeletionMaxPersistenceIdQuery(DataConnect
.OrderByDescending(r => r.sequenceNumber)
.Select(r => r.sequenceNumber).Take(1);
}

static readonly Expression<Func<JournalMetaData, PersistenceIdAndSequenceNumber>> metaDataSelector =

md =>
new PersistenceIdAndSequenceNumber()
{
SequenceNumber = md.SequenceNumber,
PersistenceId = md.PersistenceId
};

static readonly Expression<Func<JournalRow, PersistenceIdAndSequenceNumber>> rowDataSelector =
md =>
new PersistenceIdAndSequenceNumber()
{
SequenceNumber = md.sequenceNumber,
PersistenceId = md.persistenceId
};
private IQueryable<long> MaxSeqNumberForPersistenceIdQuery(
DataConnection db, string persistenceId, long minSequenceNumber = 0)
{



var queryable = db.GetTable<JournalRow>()
.Where(r => r.persistenceId == persistenceId).Select(r =>
new
{
SequenceNumber = r.sequenceNumber,
PersistenceId = r.persistenceId
});
.Where(r => r.persistenceId == persistenceId).Select(rowDataSelector);
if (minSequenceNumber != 0)
{
queryable = queryable.Where(r =>
Expand All @@ -370,18 +394,19 @@ private IQueryable<long> MaxSeqNumberForPersistenceIdQuery(
.Where(r =>
r.SequenceNumber > minSequenceNumber &&
r.PersistenceId == persistenceId);
queryable = queryable.Union(nextQuery.Select(md =>
new
{
SequenceNumber = md.SequenceNumber,
PersistenceId = md.PersistenceId
}));
queryable = queryable.Union(nextQuery.Select(metaDataSelector));
}

return queryable.OrderByDescending(r => r.SequenceNumber)
.Select(r => r.SequenceNumber).Take(1);
return queryable.OrderByDescending(sequenceNumberSelector)
.Select(sequenceNumberSelector).Take(1);
}

private static readonly
Expression<Func<PersistenceIdAndSequenceNumber, long>>
sequenceNumberSelector =
r => r.SequenceNumber;


public async Task<Done> Update(string persistenceId, long sequenceNr,
object payload)
{
Expand Down Expand Up @@ -477,4 +502,10 @@ public override
}
};
}

public class PersistenceIdAndSequenceNumber
{
public long SequenceNumber { get; set; }
public string PersistenceId { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,91 +33,90 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
public Source<Util.Try<ReplayCompletion>, NotUsed> MessagesWithBatch(string persistenceId, long fromSequenceNr,
long toSequenceNr, int batchSize, Util.Option<(TimeSpan,IScheduler)> refreshInterval)
{
var src = Source
.UnfoldAsync<(long, FlowControl),
return Source
.UnfoldAsync<(long, FlowControlEnum),
Seq<Util.Try<ReplayCompletion>>>(
(Math.Max(1, fromSequenceNr),
FlowControl.Continue.Instance),
FlowControlEnum.Continue),
async opt =>
{
async Task<Util.Option<((long, FlowControl), Seq<Util.Try<ReplayCompletion>>)>>
async Task<Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)>>
RetrieveNextBatch()
{
Seq<
Util.Try<ReplayCompletion>> msg;
using (var conn =
_connectionFactory.GetConnection())
{
var waited = Messages(conn, persistenceId,
opt.Item1,
toSequenceNr, batchSize);
msg = await waited
msg = await Messages(conn, persistenceId,
opt.Item1,
toSequenceNr, batchSize)
.RunWith(
ExtSeq.Seq<Util.Try<ReplayCompletion>>(), mat);
}
var hasMoreEvents = msg.Count == batchSize;
var lastMsg = msg.LastOrDefault();
//var lastMsg = msg.IsEmpty.LastOrDefault();
Util.Option<long> lastSeq = Util.Option<long>.None;
if (lastMsg != null && lastMsg.IsSuccess)
if (msg.IsEmpty == false)
{
lastSeq = lastMsg.Success.Select(r => r.Repr.SequenceNr);
}
else if (lastMsg != null && lastMsg.Failure.HasValue)
{
throw lastMsg.Failure.Value;
lastSeq = msg.Last.Get().Repr.SequenceNr;
}
var hasLastEvent =
lastSeq.HasValue &&
lastSeq.Value >= toSequenceNr;
FlowControl nextControl = null;
if (hasLastEvent || opt.Item1 > toSequenceNr)
FlowControlEnum nextControl = FlowControlEnum.Unknown;
if ((lastSeq.HasValue &&
lastSeq.Value >= toSequenceNr) || opt.Item1 > toSequenceNr)
{
nextControl = FlowControl.Stop.Instance;
nextControl = FlowControlEnum.Stop;
}
else if (hasMoreEvents)
{
nextControl = FlowControl.Continue.Instance;
nextControl = FlowControlEnum.Continue;
}
else if (refreshInterval.HasValue == false)
{
nextControl = FlowControl.Stop.Instance;
nextControl = FlowControlEnum.Stop;
}
else
{
nextControl = FlowControl.ContinueDelayed
.Instance;
nextControl = FlowControlEnum.ContinueDelayed;
}
long nextFrom = 0;
long nextFrom = opt.Item1;
if (lastSeq.HasValue)
{
nextFrom = lastSeq.Value + 1;
}
else
{
nextFrom = opt.Item1;
}
return new Util.Option<((long, FlowControl), Seq<Util.Try<ReplayCompletion>>)>((
return new Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)>((
(nextFrom, nextControl), msg));
}
switch (opt.Item2)
{
case FlowControl.Stop _:
return Util.Option<((long, FlowControl), Seq<Util.Try<ReplayCompletion>>)>.None;
case FlowControl.Continue _:
case FlowControlEnum.Stop:
return Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)>.None;
case FlowControlEnum.Continue:
return await RetrieveNextBatch();
case FlowControl.ContinueDelayed _ when refreshInterval.HasValue:
case FlowControlEnum.ContinueDelayed when refreshInterval.HasValue:
return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2, RetrieveNextBatch);
default:
throw new Exception($"Got invalid FlowControl from Queue! Type : {opt.Item2.GetType()}");
return InvalidFlowThrowHelper(opt);
}
});
}).SelectMany(r => r);;
}

return src.SelectMany(r => r);
private static Util.Option<long> MessagesWithBatchThrowHelper(Util.Try<ReplayCompletion> lastMsg)
{
throw lastMsg.Failure.Value;
}

private static Util.Option<((long, FlowControlEnum), Seq<Util.Try<ReplayCompletion>>)> InvalidFlowThrowHelper((long, FlowControlEnum) opt)
{
throw new Exception(
$"Got invalid FlowControl from Queue! Type : {opt.Item2.ToString()}");
}
}
}
32 changes: 5 additions & 27 deletions src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/FlowControl.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,10 @@
namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO
{
public class FlowControl
public enum FlowControlEnum
{
public class Continue : FlowControl
{
private Continue()
{
}

public static Continue Instance = new Continue();
}

public class ContinueDelayed : FlowControl
{
private ContinueDelayed()
{
}

public static ContinueDelayed Instance = new ContinueDelayed();
}

public class Stop : FlowControl
{
private Stop()
{
}

public static Stop Instance = new Stop();
}
Unknown,
Continue,
Stop,
ContinueDelayed
}
}
Loading

0 comments on commit 61fb205

Please sign in to comment.