Skip to content

Commit

Permalink
fix: duplicate entry exception for correlation_ids table. (#4521)
Browse files Browse the repository at this point in the history
* fix: duplicate entry exception for correlation_ids table.

* feat: add unit test.

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
(cherry picked from commit 8523343)
  • Loading branch information
armory-abedonik authored and mergify[bot] committed Sep 22, 2023
1 parent c68cc87 commit 557899d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,28 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
status << ExecutionStatus.values()
}

def "should return task ref for currently running pipeline by correlation id"() {
given:
def execution = pipeline {
trigger = new DefaultTrigger("manual", "covfefe")
}
repository().store(execution)
repository().updateStatus(execution.type, execution.id, RUNNING)

when:
def result = repository().retrievePipelineForCorrelationId('covfefe')

then:
result.id == execution.id

when:
repository().updateStatus(execution.type, execution.id, SUCCEEDED)
repository().retrievePipelineForCorrelationId('covfefe')

then:
thrown(ExecutionNotFoundException)
}

def "should return task ref for currently running orchestration by correlation id"() {
given:
def execution = orchestration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ public PipelineExecution retrieveByCorrelationId(
@Override
public PipelineExecution retrievePipelineForCorrelationId(@Nonnull String correlationId)
throws ExecutionNotFoundException {
String key = format("pipelineCorrelation:%s", correlationId);
String key = format("correlation:%s", correlationId);
return getRedisDelegate(key)
.withCommandsClient(
correlationRedis -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,18 +893,12 @@ class SqlExecutionRepository(
}

withPool(poolName) {
val exists = ctx.fetchExists(
ctx.select()
.from("correlation_ids")
.where(field("id").eq(execution.trigger.correlationId))
.and(executionIdField.eq(execution.id))
)
if (!exists) {
ctx.insertInto(table("correlation_ids"))
.columns(field("id"), executionIdField)
.values(execution.trigger.correlationId, execution.id)
.execute()
}
ctx.insertInto(table("correlation_ids"))
.columns(field("id"), executionIdField)
.values(execution.trigger.correlationId, execution.id)
.onConflict()
.doNothing()
.execute()
}
}
}
Expand Down

0 comments on commit 557899d

Please sign in to comment.