-
-
Notifications
You must be signed in to change notification settings - Fork 522
/
Copy pathReaggregation.cs
148 lines (121 loc) · 5.56 KB
/
Reaggregation.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
using FluentAssertions;
using Marten.Events.Projections;
using Marten.Integration.Tests.TestsInfrastructure;
using Xunit;
namespace Marten.Integration.Tests.EventStore.Aggregate
{
/// <summary>
/// Event carrying the identifier and the description of a newly created issue.
/// </summary>
/// <param name="IssueId">Identifier of the issue the event refers to.</param>
/// <param name="Description">Description text carried by the event.</param>
public record IssueCreated(Guid IssueId, string Description);
/// <summary>
/// Event carrying a new description for an existing issue.
/// </summary>
/// <param name="IssueId">Identifier of the issue the event refers to.</param>
/// <param name="Description">Description text carried by the event.</param>
public record IssueUpdated(Guid IssueId, string Description);
namespace OldVersion
{
    /// <summary>
    /// Issue aggregate using the original aggregation logic: descriptions are
    /// copied from the events unchanged.
    /// </summary>
    public class Issue
    {
        public Guid Id { get; set; }

        public string Description { get; set; } = default!;

        /// <summary>
        /// Takes both the identifier and the description from the creation event.
        /// </summary>
        public void Apply(IssueCreated @event) =>
            (Id, Description) = (@event.IssueId, @event.Description);

        /// <summary>
        /// Replaces the description with the one carried by the update event.
        /// </summary>
        public void Apply(IssueUpdated @event) =>
            Description = @event.Description;
    }
}
namespace NewVersion
{
    /// <summary>
    /// Issue aggregate using the changed aggregation logic: every description
    /// taken from an event is prefixed with "New Logic: ".
    /// </summary>
    public class Issue
    {
        public Guid Id { get; set; }

        public string Description { get; set; } = default!;

        /// <summary>
        /// Takes the identifier from the creation event and stores its
        /// description with the new-logic prefix.
        /// </summary>
        public void Apply(IssueCreated @event) =>
            (Id, Description) = (@event.IssueId, "New Logic: " + @event.Description);

        /// <summary>
        /// Replaces the description with the prefixed one from the update event.
        /// </summary>
        public void Apply(IssueUpdated @event) =>
            Description = "New Logic: " + @event.Description;
    }
}
/// <summary>
/// Walks through what happens to a stored inline snapshot when the aggregation
/// logic changes: the stored snapshot keeps the old state, online aggregation
/// reflects the new logic, and the snapshot can be refreshed by storing the
/// online aggregate manually.
/// </summary>
public class Reaggregation(MartenFixture fixture): MartenTest(fixture.PostgreSqlContainer, false)
{
    /// <summary>
    /// Creates a session that has both issue events registered and an inline
    /// snapshot configured for <typeparamref name="TIssue"/>.
    /// </summary>
    /// <typeparam name="TIssue">Aggregate type to snapshot inline.</typeparam>
    /// <returns>The session created by the base test class with those options applied.</returns>
    public IDocumentSession CreateSessionWithInlineAggregationFor<TIssue>() where TIssue : class, new() =>
        base.CreateSession(options =>
        {
            var issueEventTypes = new[] { typeof(IssueCreated), typeof(IssueUpdated) };
            options.Events.AddEventTypes(issueEventTypes);

            // Inline aggregation has to be switched on explicitly for the aggregate type.
            options.Projections.Snapshot<TIssue>(SnapshotLifecycle.Inline);
        });

    /// <summary>
    /// Reads the stored (inline) snapshot first and then the online aggregate
    /// of the same stream, in that order.
    /// </summary>
    private static (TIssue Inline, TIssue Online) ReadInlineAndOnlineAggregate<TIssue>(
        IDocumentSession session,
        Guid streamId
    ) where TIssue : class, new()
    {
        var inline = session.Load<TIssue>(streamId)!;
        var online = session.Events.AggregateStream<TIssue>(streamId)!;

        return (inline, online);
    }

    [Fact]
    public void Given_When_Then()
    {
        const string descriptionFromOldLogic = "Issue 1 Updated";
        const string descriptionFromNewLogic = "New Logic: Issue 1 Updated";

        var issueId = Guid.NewGuid();
        object[] history =
        {
            new IssueCreated(issueId, "Issue 1"),
            new IssueUpdated(issueId, descriptionFromOldLogic),
        };

        OldVersion.Issue v1Inline;
        OldVersion.Issue v1Online;

        using (var session = CreateSessionWithInlineAggregationFor<OldVersion.Issue>())
        {
            // 1. Publish the events.
            session.Events.StartStream<OldVersion.Issue>(issueId, history);
            session.SaveChanges();

            (v1Inline, v1Online) = ReadInlineAndOnlineAggregate<OldVersion.Issue>(session, issueId);
        }

        // 2. Inline and online aggregation of the same type must agree.
        v1Inline.Description.Should().Be(descriptionFromOldLogic);
        v1Inline.Description.Should().Be(v1Online.Description);

        // 3. Simulate a change of the aggregation logic.
        NewVersion.Issue v2Inline;
        NewVersion.Issue v2Online;

        using (var session = CreateSessionWithInlineAggregationFor<NewVersion.Issue>())
        {
            (v2Inline, v2Online) = ReadInlineAndOnlineAggregate<NewVersion.Issue>(session, issueId);
        }

        // 4. The inline snapshot does not change on its own.
        v2Inline.Description.Should().Be(v1Inline.Description);
        v2Inline.Description.Should().NotBe(descriptionFromNewLogic);

        // 5. Online aggregation applies the new logic right away.
        v2Online.Description.Should().NotBe(v1Online.Description);
        v2Online.Description.Should().Be(descriptionFromNewLogic);

        // 6. Reaggregation.
        using (var session = CreateSessionWithInlineAggregationFor<NewVersion.Issue>())
        {
            // 7. Reuse the online aggregate read in step 3.
            // 8. Store it manually so that it replaces the inline snapshot.
            session.Store(v2Online);
            session.SaveChanges();

            var reaggregated = session.Load<NewVersion.Issue>(issueId)!;

            reaggregated.Description.Should().NotBe(v1Online.Description);
            reaggregated.Description.Should().Be(v2Online.Description);
            reaggregated.Description.Should().Be(descriptionFromNewLogic);
        }

        using (var session = CreateSessionWithInlineAggregationFor<NewVersion.Issue>())
        {
            // 9. Append one more event; it is meant to be applied to the inline snapshot.
            session.Events.Append(issueId, new IssueUpdated(issueId, "Completely New text"));
            session.SaveChanges();
        }

        // TODO: The verification below is disabled - something has changed here in Marten v7.
        // using (var session = CreateSessionWithInlineAggregationFor<NewVersion.Issue>())
        // {
        //     var taskFromV2NewInlineAggregation = session.Load<NewVersion.Issue>(taskId)!;
        //     taskFromV2NewInlineAggregation.Description.Should().Be("New Logic: Completely New text");
        // }
    }
}
}