feat: Accept PubSub events after idle #599
Conversation
timestamp.toEpochMilli,
eventMetadata = None,
"", // FIXME entityType
0, // FIXME slice
the slice should be the minSlice of the query
val event: Any = 0 // FIXME could be populated with idle slices, represented as bitmap 0-1023
new EventEnvelope(
  TimestampOffset(timestamp, Map.empty),
  "", // FIXME persistenceId
might need some fake persistenceId
"", // FIXME entityType | ||
0, // FIXME slice | ||
filtered = true, | ||
source = EnvelopeOrigin.SourceHeartbeat, |
probably more places downstream where we need to handle this new source type
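To make that concrete, here is a minimal sketch of what downstream recognition of the new source could look like. Only `SourceHeartbeat` ("HB") appears in this PR's diff; the simplified `Envelope`, the other source constants, and the `fromHeartbeat` helper are illustrative stand-ins, not the library's actual API.

```scala
// Simplified stand-ins for illustration; only SourceHeartbeat is from the PR diff.
object EnvelopeOrigin {
  val SourceQuery = ""
  val SourceBacktracking = "BT"
  val SourcePubSub = "PS"
  val SourceHeartbeat = "HB" // the new source type introduced here

  // hypothetical helper a downstream stage could use to detect heartbeats
  def fromHeartbeat(source: String): Boolean = source == SourceHeartbeat
}

final case class Envelope(persistenceId: String, source: String)

// e.g. a projection would process real events, but for heartbeats only advance offsets
def isRealEvent(env: Envelope): Boolean = !EnvelopeOrigin.fromHeartbeat(env.source)
```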
@@ -149,6 +154,22 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
      tags = row.tags)
  }

  def createEventEnvelopeHeartbeat(timestamp: Instant): EventEnvelope[Any] = {
    val event: Any = 0 // FIXME could be populated with idle slices, represented as bitmap 0-1023
a next step would be to handle this in the OffsetStore, and thereby make progress per slice even when it's idle and even if there are no events for certain slices
def heeartbeat(state: QueryState): Option[Envelope] = {
  if (state.idleCount >= 1) {
    val timestamp = state.latestBacktracking.timestamp.plusMillis(
      settings.querySettings.refreshInterval.toMillis * state.idleCount)
this is how time progresses; it still requires a first backtracking event to have a starting point (otherwise it starts from 1970)
better to keep track of wall clock since the latestBacktracking
sounds good to just keep track of wall clock time since
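The wall-clock variant could look roughly like this (a sketch with illustrative names, not the PR's code): remember the wall clock instant at which the latest backtracking event was seen, and on each idle tick offset that event's db timestamp by the wall clock time elapsed since then.

```scala
import java.time.{ Clock, Duration => JDuration, Instant }

// Illustrative state; the PR's actual QueryState has more fields.
final case class QueryState(
    latestBacktrackingTimestamp: Instant, // db timestamp of the latest backtracking event
    latestBacktrackingWallClock: Instant) // wall clock instant when that event was seen

// heartbeat timestamp = event timestamp + wall clock time elapsed since it was seen
def heartbeatTimestamp(state: QueryState, clock: Clock): Instant =
  state.latestBacktrackingTimestamp.plus(
    JDuration.between(state.latestBacktrackingWallClock, clock.instant()))
```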
@@ -150,8 +153,12 @@ final private[r2dbc] class ContinuousQuery[S, T](
        next()
      }
    })
  val sourceWithHeartbeat = heartbeat(newState) match {
    case None    => source
    case Some(h) => Source.single(h).concat(source)
maybe a little strange to emit it before a real query, but I didn't see an easy way to do it after a query with a fully updated state
👍🏼 direction looks good to me
@@ -502,12 +503,22 @@ import org.slf4j.Logger
      .via(deserializeAndAddOffset(newState.currentOffset)))
  }

  def heeartbeat(state: QueryState): Option[Envelope] = {
- def heeartbeat(state: QueryState): Option[Envelope] = {
+ def heartbeat(state: QueryState): Option[Envelope] = {
LGTM. Don't know if we can improve that persistence id generation.
@tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = {
  if (n < 1000000) {
    val pid = PersistenceId.concat(entityType, s"_ht-$n")
Should we use _hb here, to align with the HB for the source name?
sure, do you think it is unique enough to not have collision with a user's pid? Otherwise we could make it longer by adding a uuid, either a fixed one or a new one per R2dbcReadJournal instance (same lifecycle as the cache)
I guess we want to keep it short, but would be good with something signalling that it is an akka pid if it shows up in some error somewhere, so maybe _hb-akka-$n?
@tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = {
  if (n < 1000000) {
    val pid = PersistenceId.concat(entityType, s"_ht-$n")
    if (persistenceExt.sliceForPersistenceId(pid) == slice)
      pid
    else
      generateHeartbeatPersistenceId(entityType, slice, n + 1)
  } else
    throw new IllegalStateException(s"Couldn't find heartbeat persistenceId for [$entityType] with slice [$slice]")
}
This looks like the most awkward part of the solution. Needing to find a dummy persistence id that maps to the slice.
Could we have a completely custom persistence id instead, that doesn't include the entityType? So that the persistence id still hashes to the min slice correctly, and the entityType is in the envelope already. Then it could be generated upfront as a constant. Or do we rely on extracting the entity type from the persistence id in too many places?
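To make the suggestion concrete: if the slice is derived from the persistence id's String hashCode modulo the number of slices (as Akka's slice derivation does), then a heartbeat pid per slice could be precomputed once, upfront, without involving the entityType. A sketch under that assumption; `heartbeatPidForSlice` and the `_hb-` candidate format are illustrative, not the library's API.

```scala
val NumberOfSlices = 1024

// Assumed to match Akka's slice derivation: hashCode modulo the number of slices.
def sliceForPersistenceId(pid: String): Int =
  math.abs(pid.hashCode % NumberOfSlices)

// Hypothetical: search once, upfront, for a constant pid that hashes to the slice.
def heartbeatPidForSlice(slice: Int): String =
  Iterator.from(1).map(n => s"_hb-$n").find(pid => sliceForPersistenceId(pid) == slice).get
```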
Probably not frequent enough to worry about though.
I'm worried that the entityType extraction (and also the slice from pid) is used in many places, since everything is scoped around the entityType, such as the query itself and sharding. Might work, but high risk.
Since it's cached with the same lifecycle as R2dbcReadJournal instance (typically one per ActorSystem) it will not be very many.
There are some tricks we could do (around how string hashcode is calculated) to avoid needing to brute force it. But that's something we could come back to later on. No problem to change what the heartbeat persistence id is, providing it still maps to the right slice.
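For reference, the property such tricks would build on (an illustration, not code from the PR): String.hashCode is computed left to right as h = 31*h + c over the characters, so the hash of a pid with one more character appended is fully determined by the prefix hash. That makes it possible to solve for suffix characters that land on a target slice, rather than brute forcing a counter.

```scala
// Java/Scala String hashCode recurrence: appending a char c to a prefix p gives
// (p + c).hashCode == 31 * p.hashCode + c (Int overflow wraps identically on both sides).
def appendHash(prefixHash: Int, c: Char): Int = 31 * prefixHash + c
```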
// using wall clock to measure duration since the last backtracking event
val timestamp =
  state.latestBacktracking.timestamp.plus(JDuration.between(state.latestBacktrackingWallClock, clock.instant()))
createHeartbeat(timestamp)
Ok, so rather than just wall clock time, we make this relative to what could be the db timestamp. Makes sense.
So this can create some quite old timestamps. If replaying older events, then the latest backtracking timestamp can be from some time ago, while latest backtracking wall clock is from when that offset was seen. So whatever the distance is between those, we end up that far back from the current time.
So I assume we wanted to avoid using the current wall clock instant, in case it's skewed from the database timestamps for events. But if we have a big enough gap between the event timestamp and when it was seen by backtracking, then the heartbeats will be outside the backtracking window and published events will need to be filtered. Won't resolve itself until new backtracking envelopes are seen, as before.
Could be tricky to have a relative timestamp for all cases, as it needs to be from an event that was not originally written too long ago. With backtracking events, this only works when it's up to date, and these are seen close enough to their write times. To have a more accurate marker for the latest db timestamp, maybe this could use published events instead, since they'll be arriving soon after their writes. If we use the last seen published event + wall clock difference, then it could give an accurate relative heartbeat time.
I guess basing it all on published events would need to be quite different. Mostly tracking it together in the pubsub event filtering, with just an empty heartbeat on idle coming from the query here. And maybe it opens up the possibility of heartbeats actually getting ahead of later published events?
Maybe this should be quite simple. Get the current timestamp from the database when the query starts, and then just create relative timestamps from that start timestamp for each heartbeat on idle.
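That simpler scheme could be sketched as follows (illustrative names only; the real implementation landed in #612): read the database's current timestamp once when the query starts, pair it with the local wall clock instant at that moment, and derive each idle heartbeat from that fixed base.

```scala
import java.time.{ Clock, Duration => JDuration, Instant }

// Illustrative: dbStartTimestamp would come from the database clock when the
// query starts (e.g. transaction_timestamp() in Postgres).
final case class HeartbeatBase(dbStartTimestamp: Instant, wallClockAtStart: Instant)

// each idle heartbeat is the db start timestamp plus elapsed wall clock time
def idleHeartbeatTimestamp(base: HeartbeatBase, clock: Clock): Instant =
  base.dbStartTimestamp.plus(JDuration.between(base.wallClockAtStart, clock.instant()))
```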
Switched to using an initial database timestamp in #612.
LGTM, but I'm having a hard time understanding potential bad consequences.
@tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = {
  if (n < 1000000) {
    // including a uuid to make sure it is not the same as any persistence id of the application
    val pid = PersistenceId.concat(entityType, s"_hb-$heartbeatUuid-$n")
I added a uuid in the pid. There shouldn't be a high frequency of these. Now it's one per idle tick (every 3 seconds), but we could emit less often. Could be nice to be able to track correlation via a uuid as well. Not a big deal; could change to s"_hb-akka-$n".
Seems ok. I guess we could also make it more unique with its format and mark as akka internal. But can be revisited at any time.
Yeah. I think needing to extend it within the existing constraints, with dummy events and a special persistence id, rather than being able to have a different envelope type or something, makes it more difficult.
This should probably not go into a patch release since there could be cases in Akka Projection where it's not filtered out completely. akka/akka-projection#1200 fixes that, and we can include this and that in the next minor versions.
* PubSub events are ignored if they are too far ahead of backtracking.
* That means that they will always be ignored after an idle period.
* This emits heartbeat events when the query is idle and thereby progresses the backtracking timestamp in the PubSub filter.
* including a fake persistenceId corresponding to the slice
Force-pushed from 1be050a to f125c73
Included in #614
This is just a sketch so far. I haven't tried it at all yet.