Skip to content

Commit

Permalink
Avoid error on duplicate view name (#80)
Browse files Browse the repository at this point in the history
On rare occasions I have seen errors where Spark complains about
creating two temporary tables with the same name. In the loader we
create table names based on the window's start time. The error was
unexpected because each window should have a different start time.

I believe this is the fix. It ensures view name is computed right at the
start of the window, and not after waiting for the table to be
initialized. It prevents consecutive windows from picking the same
timestamp in the case when the table is very slow to initialize.
  • Loading branch information
istreeter authored Sep 9, 2024
1 parent e186cf5 commit 972e471
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ object Processing {
deferredTableExists: F[Unit]
): EventProcessor[F] = { in =>
val resources = for {
_ <- Stream.eval(deferredTableExists)
windowState <- Stream.eval(WindowState.build[F])
_ <- Stream.eval(deferredTableExists)
stateRef <- Stream.eval(Ref[F].of(windowState))
_ <- manageDataFrame(env, windowState.viewName)
} yield stateRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 4),
Action.CommittedToTheLake("v19700101000010"),
Action.AppendedRowsToDataFrame("v19700101000000", 4),
Action.CommittedToTheLake("v19700101000000"),
Action.AddedCommittedCountMetric(4),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand All @@ -76,7 +76,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AddedBadCountMetric(2),
Action.SentToBad(2),
Expand All @@ -87,7 +87,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Action.AddedBadCountMetric(2),
Action.SentToBad(2),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand All @@ -111,14 +111,14 @@ class ProcessingSpec extends Specification with CatsEffect {
Action.CreatedTable,

/* window 1 */
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 2),
Action.CommittedToTheLake("v19700101000010"),
Action.AppendedRowsToDataFrame("v19700101000000", 2),
Action.CommittedToTheLake("v19700101000000"),
Action.AddedCommittedCountMetric(2),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed(window1.map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010"),
Action.RemovedDataFrameFromDisk("v19700101000000"),

/* window 2 */
Action.InitializedLocalDataFrame("v19700101000052"),
Expand Down Expand Up @@ -160,18 +160,18 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 2),
Action.AppendedRowsToDataFrame("v19700101000000", 2),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 2),
Action.AppendedRowsToDataFrame("v19700101000000", 2),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 2),
Action.CommittedToTheLake("v19700101000010"),
Action.AppendedRowsToDataFrame("v19700101000000", 2),
Action.CommittedToTheLake("v19700101000000"),
Action.AddedCommittedCountMetric(6),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand All @@ -191,7 +191,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AddedBadCountMetric(2),
Action.SentToBad(2),
Expand All @@ -208,12 +208,12 @@ class ProcessingSpec extends Specification with CatsEffect {
Action.AddedBadCountMetric(2),
Action.SentToBad(2),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 8),
Action.CommittedToTheLake("v19700101000010"),
Action.AppendedRowsToDataFrame("v19700101000000", 8),
Action.CommittedToTheLake("v19700101000000"),
Action.AddedCommittedCountMetric(8),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed((bads1 ::: goods1 ::: bads2 ::: goods2).map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand All @@ -239,17 +239,17 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v20231024100042"),
Action.InitializedLocalDataFrame("v20231024100032"),
Action.SetLatencyMetric(42123.millis),
Action.AddedReceivedCountMetric(2),
Action.SetLatencyMetric(42123.millis),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v20231024100042", 4),
Action.CommittedToTheLake("v20231024100042"),
Action.AppendedRowsToDataFrame("v20231024100032", 4),
Action.CommittedToTheLake("v20231024100032"),
Action.AddedCommittedCountMetric(4),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v20231024100042")
Action.RemovedDataFrameFromDisk("v20231024100032")
)
)

Expand Down Expand Up @@ -279,14 +279,14 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v19700101000010", 2),
Action.CommittedToTheLake("v19700101000010"),
Action.AppendedRowsToDataFrame("v19700101000000", 2),
Action.CommittedToTheLake("v19700101000000"),
Action.AddedCommittedCountMetric(2),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand Down Expand Up @@ -316,16 +316,16 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.AddedBadCountMetric(1),
Action.SentToBad(1),
Action.AppendedRowsToDataFrame("v19700101000010", 1),
Action.CommittedToTheLake("v19700101000010"),
Action.AppendedRowsToDataFrame("v19700101000000", 1),
Action.CommittedToTheLake("v19700101000000"),
Action.AddedCommittedCountMetric(1),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand Down Expand Up @@ -356,10 +356,10 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v19700101000010"),
Action.InitializedLocalDataFrame("v19700101000000"),
Action.AddedReceivedCountMetric(2),
Action.BecameUnhealthy(RuntimeService.Iglu),
Action.RemovedDataFrameFromDisk("v19700101000010")
Action.RemovedDataFrameFromDisk("v19700101000000")
)
)

Expand Down

0 comments on commit 972e471

Please sign in to comment.