Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize tag query by avoiding multi-query #223

Merged
merged 2 commits into from
Apr 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 30 additions & 41 deletions src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
using Akka.Streams.Dsl;
using Akka.Util;
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.Tools;
using TaskExtensions = LanguageExt.TaskExtensions;

namespace Akka.Persistence.Sql.Query.Dao
{
Expand Down Expand Up @@ -118,11 +115,7 @@ where lp.OrderingId > input.offset &&
orderby r.Ordering
select r;

var mainRows = await query.ToListAsync();

await AddTagDataFromTagTable(mainRows, connection);

return mainRows;
return await AddTagDataFromTagTable(query, connection);
})
.Via(_deserializeFlow),

Expand All @@ -141,21 +134,17 @@ public override Source<Try<ReplayCompletion>, NotUsed> Messages(
new { connection, persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) },
async state =>
{
var mainRows = await connection
var query = connection
.GetTable<JournalRow>()
.Where(r =>
r.PersistenceId == state.persistenceId &&
r.SequenceNumber >= state.fromSequenceNr &&
r.SequenceNumber <= state.toSequenceNr &&
r.Deleted == false)
.OrderBy(r => r.SequenceNumber)
.Take(state.toTake)
.ToListAsync();
.Take(state.toTake);

if (_readJournalConfig.PluginConfig.TagMode == TagMode.TagTable)
await AddTagDataFromTagTable(mainRows, connection);

return mainRows;
return await AddTagDataIfNeeded(query, connection);
})
.Via(_deserializeFlow)
.Select(
Expand Down Expand Up @@ -223,51 +212,51 @@ private static int MaxTake(long max)
{
await using var connection = input._connectionFactory.GetConnection();

var events = await connection
var query = connection
.GetTable<JournalRow>()
.Where(r =>
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(input.maxTake)
.ToListAsync();
.Take(input.maxTake);

return await AddTagDataIfNeeded(events, connection);
return await AddTagDataIfNeeded(query, connection);
}
).Via(_deserializeFlow);
}

private async Task<List<JournalRow>> AddTagDataIfNeeded(List<JournalRow> toAdd, AkkaDataConnection connection)
private async Task<List<JournalRow>> AddTagDataIfNeeded(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection)
{
if (_readJournalConfig.PluginConfig.TagMode == TagMode.TagTable)
await AddTagDataFromTagTable(toAdd, connection);

return toAdd;
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable)
return await rowQuery.ToListAsync();
return await AddTagDataFromTagTable(rowQuery, connection);
}

private static async Task AddTagDataFromTagTable(List<JournalRow> toAdd, AkkaDataConnection connection)
private static async Task<List<JournalRow>> AddTagDataFromTagTable(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection)
{
if (toAdd.Count == 0)
return;

var tagRows = await connection
.GetTable<JournalTagRow>()
.Where(r => r.OrderingId.In(toAdd.Select(row => row.Ordering).Distinct()))
.Select(r => new TagRow
var tagTable = connection.GetTable<JournalTagRow>();
var q =
from jr in rowQuery
select new
{
OrderingId = r.OrderingId,
TagValue = r.TagValue
})
.ToListAsync();
row = jr,
tags = tagTable
.Where(r => r.OrderingId == jr.Ordering)
.StringAggregate(";", r => r.TagValue)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use SQL built-in string aggregate function to retrieve all tags in one go

.ToValue()
};

var res = await q.ToListAsync();

foreach (var journalRow in toAdd)
var result = new List<JournalRow>();
foreach (var row in res)
{
journalRow.TagArr = tagRows
.Where(r => r.OrderingId == journalRow.Ordering)
.Select(r => r.TagValue)
.ToArray();
row.row.TagArr = row.tags?.Split(';') ?? Array.Empty<string>();
result.Add(row.row);
}
return result;
}
}
}