Skip to content

Commit

Permalink
Use a proper yield enumerator in `NumericRangeEntry
Browse files Browse the repository at this point in the history
  • Loading branch information
to11mtm committed May 2, 2021
1 parent 9b48301 commit 9251484
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -134,13 +135,38 @@ private async Task<NotUsed> QueueWriteJournalRows(Seq<JournalRow> xs)

private async Task WriteJournalRows(Seq<JournalRow> xs)
{
using (var db = _connectionFactory.GetConnection())
{
//hot path:
//If we only have one row, penalty for BulkCopy
//Isn't worth it due to insert caching/etc.
if (xs.Count > 1)
{
await InsertMultiple(xs);
}
else if (xs.Count > 0)
{
await InsertSingle(xs);
}
}

}

private async Task InsertSingle(Seq<JournalRow> xs)
{
using (var db = _connectionFactory.GetConnection())
{
await db.InsertAsync(xs.Head);
}
}

private async Task InsertMultiple(Seq<JournalRow> xs)
{
using (var db = _connectionFactory.GetConnection())
{
try
{
await db.BeginTransactionAsync(IsolationLevel
.ReadCommitted);
await db.GetTable<JournalRow>()
.BulkCopyAsync(
new BulkCopyOptions()
Expand All @@ -150,22 +176,32 @@ await db.GetTable<JournalRow>()
.MaxRowByRowSize
? BulkCopyType.Default
: BulkCopyType.MultipleRows,
UseInternalTransaction = true
//TODO: When Parameters are allowed,
//Make a Config Option
//Or default to true
//UseParameters = false
}, xs);
await db.CommitTransactionAsync();
}
else if (xs.Count > 0)
catch (Exception e)
{
await db.InsertAsync(xs.Head);
try
{
await db.RollbackTransactionAsync();
}
catch (Exception exception)
{
throw e;
}

throw;
}
}

}

public async Task<IImmutableList<Exception>> AsyncWriteMessages(
IEnumerable<AtomicWrite> messages, long timeStamp = 0)
{
new Either<Exception,List<JournalRow>>()
Either<Exception,System.Collections.Generic.List<JournalRow>>.Bottom
var serializedTries = Serializer.Serialize(messages, timeStamp);

//Just a little bit of magic here;
Expand Down
5 changes: 0 additions & 5 deletions src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ public JournalRow()
{

}

public JournalRow(long ordering)
{
this.ordering = ordering;
}
public long ordering { get; set; }

public long Timestamp { get; set; } = 0;
Expand Down
15 changes: 1 addition & 14 deletions src/Akka.Persistence.Sql.Linq2Db/Query/NumericRangeEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,10 @@ public bool InRange(long number)

public IEnumerable<long> ToEnumerable()
{
var itemCount = until - from;
List<long> returnList;
if (itemCount < Int32.MaxValue)
{
returnList = new List<long>();
}
else
{
returnList = new List<long>();
}

for (long i = from; i < until; i++)
{
returnList.Add(i);
yield return i;
}

return returnList;
}

public IEnumerator<long> GetEnumerator()
Expand Down

0 comments on commit 9251484

Please sign in to comment.