Skip to content

Commit

Permalink
Optimize tag query by avoiding multi-query (#223)
Browse files Browse the repository at this point in the history
* Optimize tag query, avoid multi-query

* Fix NRE bug
  • Loading branch information
Arkatufus authored Apr 18, 2023
1 parent 599346e commit 35eca99
Showing 1 changed file with 30 additions and 41 deletions.
71 changes: 30 additions & 41 deletions src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
using Akka.Streams.Dsl;
using Akka.Util;
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.Tools;
using TaskExtensions = LanguageExt.TaskExtensions;

namespace Akka.Persistence.Sql.Query.Dao
{
Expand Down Expand Up @@ -118,11 +115,7 @@ where lp.OrderingId > input.offset &&
orderby r.Ordering
select r;
var mainRows = await query.ToListAsync();
await AddTagDataFromTagTable(mainRows, connection);
return mainRows;
return await AddTagDataFromTagTable(query, connection);
})
.Via(_deserializeFlow),

Expand All @@ -141,21 +134,17 @@ public override Source<Try<ReplayCompletion>, NotUsed> Messages(
new { connection, persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) },
async state =>
{
var mainRows = await connection
var query = connection
.GetTable<JournalRow>()
.Where(r =>
r.PersistenceId == state.persistenceId &&
r.SequenceNumber >= state.fromSequenceNr &&
r.SequenceNumber <= state.toSequenceNr &&
r.Deleted == false)
.OrderBy(r => r.SequenceNumber)
.Take(state.toTake)
.ToListAsync();
.Take(state.toTake);
if (_readJournalConfig.PluginConfig.TagMode == TagMode.TagTable)
await AddTagDataFromTagTable(mainRows, connection);
return mainRows;
return await AddTagDataIfNeeded(query, connection);
})
.Via(_deserializeFlow)
.Select(
Expand Down Expand Up @@ -223,51 +212,51 @@ private static int MaxTake(long max)
{
await using var connection = input._connectionFactory.GetConnection();
var events = await connection
var query = connection
.GetTable<JournalRow>()
.Where(r =>
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(input.maxTake)
.ToListAsync();
.Take(input.maxTake);
return await AddTagDataIfNeeded(events, connection);
return await AddTagDataIfNeeded(query, connection);
}
).Via(_deserializeFlow);
}

private async Task<List<JournalRow>> AddTagDataIfNeeded(List<JournalRow> toAdd, AkkaDataConnection connection)
private async Task<List<JournalRow>> AddTagDataIfNeeded(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection)
{
if (_readJournalConfig.PluginConfig.TagMode == TagMode.TagTable)
await AddTagDataFromTagTable(toAdd, connection);

return toAdd;
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable)
return await rowQuery.ToListAsync();
return await AddTagDataFromTagTable(rowQuery, connection);
}

private static async Task AddTagDataFromTagTable(List<JournalRow> toAdd, AkkaDataConnection connection)
private static async Task<List<JournalRow>> AddTagDataFromTagTable(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection)
{
if (toAdd.Count == 0)
return;

var tagRows = await connection
.GetTable<JournalTagRow>()
.Where(r => r.OrderingId.In(toAdd.Select(row => row.Ordering).Distinct()))
.Select(r => new TagRow
var tagTable = connection.GetTable<JournalTagRow>();
var q =
from jr in rowQuery
select new
{
OrderingId = r.OrderingId,
TagValue = r.TagValue
})
.ToListAsync();
row = jr,
tags = tagTable
.Where(r => r.OrderingId == jr.Ordering)
.StringAggregate(";", r => r.TagValue)
.ToValue()
};

var res = await q.ToListAsync();

foreach (var journalRow in toAdd)
var result = new List<JournalRow>();
foreach (var row in res)
{
journalRow.TagArr = tagRows
.Where(r => r.OrderingId == journalRow.Ordering)
.Select(r => r.TagValue)
.ToArray();
row.row.TagArr = row.tags?.Split(';') ?? Array.Empty<string>();
result.Add(row.row);
}
return result;
}
}
}

0 comments on commit 35eca99

Please sign in to comment.