Skip to content

Commit

Permalink
fix(peering): Don't use null partition on local db (#3466)
Browse files Browse the repository at this point in the history
For historical reasons, `partition=null` is set on all current executions (since sqlrepo runs without a partition).
So when checking if an execution is local we need to check if `parition == MY_PARTITION` OR `partition is null`.
However, this check is not correct when copying from the peer. From the peer we need to get `partition == peerId or partition is null`,
but locally it must always be `partition == peerId` because we force set the partition during copy.
Without that we would think that we need to delete all our local executions since they would not show up in the peer query
This change fixes this issue
  • Loading branch information
marchello2000 authored Feb 26, 2020
1 parent 1321898 commit 3d3117a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,37 @@ open class MySqlRawAccess(
maxPacketSize -= 8192
}

override fun getCompletedExecutionIds(executionType: Execution.ExecutionType, partitionName: String, updatedAfter: Long): List<ExecutionDiffKey> {
override fun getCompletedExecutionIds(executionType: Execution.ExecutionType, partitionName: String?, updatedAfter: Long): List<ExecutionDiffKey> {
val partitionConstraint = if (partitionName == null) {
field("`partition`").isNull
} else {
field("`partition`").eq(partitionName)
}

return withPool(poolName) {
jooq
.select(field("id"), field("updated_at"))
.from(getExecutionTable(executionType))
.where(field("status").`in`(*completedStatuses.toTypedArray())
.and(field("updated_at").gt(updatedAfter))
.and(field("`partition`").eq(partitionName).or(field("`partition`").isNull)))
.and(partitionConstraint))
.fetchInto(ExecutionDiffKey::class.java)
}
}

override fun getActiveExecutionIds(executionType: Execution.ExecutionType, partitionName: String): List<String> {
override fun getActiveExecutionIds(executionType: Execution.ExecutionType, partitionName: String?): List<String> {
val partitionConstraint = if (partitionName == null) {
field("`partition`").isNull
} else {
field("`partition`").eq(partitionName)
}

return withPool(poolName) {
jooq
.select(field("id"))
.from(getExecutionTable(executionType))
.where(field("status").notIn(*completedStatuses.toTypedArray())
.and(field("`partition`").eq(partitionName).or(field("`partition`").isNull)))
.and(partitionConstraint))
.fetch(field("id"), String::class.java)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class PeeringAgent(
log.debug("Starting active $executionType copy for peering")

val activePipelineIds = srcDB.getActiveExecutionIds(executionType, peeredId)
.plus(srcDB.getActiveExecutionIds(executionType, null))

if (activePipelineIds.isNotEmpty()) {
log.debug("Found ${activePipelineIds.size} active $executionType, copying all")
Expand Down Expand Up @@ -139,6 +140,7 @@ class PeeringAgent(

// Compute diff
val completedPipelineKeys = srcDB.getCompletedExecutionIds(executionType, peeredId, updatedAfter)
.plus(srcDB.getCompletedExecutionIds(executionType, null, updatedAfter))
val migratedPipelineKeys = destDB.getCompletedExecutionIds(executionType, peeredId, updatedAfter)

val completedPipelineKeysMap = completedPipelineKeys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ abstract class SqlRawAccess(
/**
* Returns a list of execution IDs and their update_at times for completed executions
*/
abstract fun getCompletedExecutionIds(executionType: Execution.ExecutionType, partitionName: String, updatedAfter: Long): List<ExecutionDiffKey>
abstract fun getCompletedExecutionIds(executionType: Execution.ExecutionType, partitionName: String?, updatedAfter: Long): List<ExecutionDiffKey>

/**
* Returns a list of execution IDs for active (not completed) executions
*/
abstract fun getActiveExecutionIds(executionType: Execution.ExecutionType, partitionName: String): List<String>
abstract fun getActiveExecutionIds(executionType: Execution.ExecutionType, partitionName: String?): List<String>

/**
* Returns a list of stage IDs that belong to the given executions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class PeeringAgentSpec extends Specification {
return false
}
0 * src.getCompletedExecutionIds(_, _, _)
0 * dest.getCompletedExecutionIds(_, _, _)

when: 'disabled for a given agent only'
peeringAgent.tick()
Expand Down Expand Up @@ -88,6 +89,7 @@ class PeeringAgentSpec extends Specification {

then:
1 * src.getCompletedExecutionIds(executionType, "peeredId", mostRecentTimeStamp) >> srcKeys
1 * src.getCompletedExecutionIds(executionType, null, mostRecentTimeStamp) >> srcKeysNull
1 * dest.getCompletedExecutionIds(executionType, "peeredId", mostRecentTimeStamp) >> destKeys

callCount * dest.deleteExecutions(executionType, toDelete)
Expand All @@ -97,26 +99,30 @@ class PeeringAgentSpec extends Specification {
new ExecutionCopier.MigrationChunkResult(30, 2, false)

if (executionType == PIPELINE) {
peeringAgent.completedPipelinesMostRecentUpdatedTime == srcKeys.max { it.updated_at }?.updated_at ?: 1
peeringAgent.completedPipelinesMostRecentUpdatedTime == (srcKeys + srcKeysNull).max { it.updated_at }?.updated_at ?: 1
peeringAgent.completedOrchestrationsMostRecentUpdatedTime == 2
} else {
peeringAgent.completedPipelinesMostRecentUpdatedTime == 1
peeringAgent.completedOrchestrationsMostRecentUpdatedTime == srcKeys.max { it.updated_at }?.updated_at ?: 2
peeringAgent.completedOrchestrationsMostRecentUpdatedTime == (srcKeys + srcKeysNull).max { it.updated_at }?.updated_at ?: 2
}

where:
// Note: since the logic for executions and orchestrations should be the same, it's overkill to have the same set of tests for each
// but it's easy so why not?
executionType | mostRecentTimeStamp | srcKeys | destKeys || toDelete | toCopy
PIPELINE | 1 | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 10), key("ID4", 10)] || ["ID4"] | ["ID2", "ID3"]
PIPELINE | 1 | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || [] | []
PIPELINE | 1 | [] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || ["ID1", "ID2", "ID3"] | []
PIPELINE | 1 | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] | [] || [] | ["ID1", "ID2", "ID3"]

ORCHESTRATION | 2 | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 10), key("ID4", 10)] || ["ID4"] | ["ID2", "ID3"]
ORCHESTRATION | 2 | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || [] | []
ORCHESTRATION | 2 | [] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || ["ID1", "ID2", "ID3"] | []
ORCHESTRATION | 2 | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] | [] || [] | ["ID1", "ID2", "ID3"]
executionType | mostRecentTimeStamp | srcKeys | srcKeysNull | destKeys || toDelete | toCopy
PIPELINE | 1 | [key("ID1", 10)] | [key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 10), key("ID4", 10)] || ["ID4"] | ["ID2", "ID3"]
PIPELINE | 1 | [key("ID1", 10)] | [key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || [] | []
PIPELINE | 1 | [] | [] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || ["ID1", "ID2", "ID3"] | []
PIPELINE | 1 | [key("ID1", 10)] | [key("ID2", 20), key("ID3", 30)] | [] || [] | ["ID1", "ID2", "ID3"]
PIPELINE | 1 | [] | [key("ID2", 20)] | [] || [] | ["ID2"]
PIPELINE | 1 | [key("ID1", 10)] | [] | [] || [] | ["ID1"]

ORCHESTRATION | 2 | [key("ID1", 10)] | [key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 10), key("ID4", 10)] || ["ID4"] | ["ID2", "ID3"]
ORCHESTRATION | 2 | [key("ID1", 10)] | [key("ID2", 20), key("ID3", 30)] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || [] | []
ORCHESTRATION | 2 | [] | [] | [key("ID1", 10), key("ID2", 20), key("ID3", 30)] || ["ID1", "ID2", "ID3"] | []
ORCHESTRATION | 2 | [key("ID1", 10)] | [key("ID2", 20), key("ID3", 30)] | [] || [] | ["ID1", "ID2", "ID3"]
ORCHESTRATION | 2 | [] | [key("ID2", 20)] | [] || [] | ["ID2"]
ORCHESTRATION | 2 | [key("ID1", 10)] | [] | [] || [] | ["ID1"]
}

def "copies all running executions of #executionType"() {
Expand All @@ -128,17 +134,18 @@ class PeeringAgentSpec extends Specification {

then:
1 * src.getActiveExecutionIds(executionType, "peeredId") >> activeIds
copyCallCount * copier.copyInParallel(executionType, activeIds, ExecutionState.ACTIVE) >>
1 * src.getActiveExecutionIds(executionType, null) >> activeIdsNull
copyCallCount * copier.copyInParallel(executionType, activeIds + activeIdsNull, ExecutionState.ACTIVE) >>
new ExecutionCopier.MigrationChunkResult(30, 2, false)

where:
executionType | activeIds | copyCallCount
PIPELINE | [] | 0
PIPELINE | ["ID1"] | 1
PIPELINE | ["ID1", "ID4"] | 1
ORCHESTRATION | [] | 0
ORCHESTRATION | ["ID1"] | 1
ORCHESTRATION | ["ID1", "ID4"] | 1
executionType | activeIds | activeIdsNull | copyCallCount
PIPELINE | [] | [] | 0
PIPELINE | ["ID1"] | [] | 1
PIPELINE | ["ID1", "ID4"] | ["ID5"] | 1
ORCHESTRATION | [] | [] | 0
ORCHESTRATION | ["ID1"] | [] | 1
ORCHESTRATION | ["ID1", "ID4"] | ["ID5"] | 1
}

def "doesn't delete the world"() {
Expand All @@ -159,6 +166,7 @@ class PeeringAgentSpec extends Specification {

then:
1 * src.getCompletedExecutionIds(executionType, "peeredId", 1) >> srcKeys
1 * src.getCompletedExecutionIds(executionType, null, 1) >> []
1 * dest.getCompletedExecutionIds(executionType, "peeredId", 1) >> destKeys

deleteCallCount * dest.deleteExecutions(executionType, toDelete)
Expand All @@ -179,7 +187,7 @@ class PeeringAgentSpec extends Specification {
ORCHESTRATION | [key("ID3", 30)] | [key("IDx", 10), key("ID3", 10)] || ["IDx"] | ["ID3"]
}

private static def key(id, updatedat) {
return new SqlRawAccess.ExecutionDiffKey(id, updatedat)
private static def key(id, updated_at) {
return new SqlRawAccess.ExecutionDiffKey(id, updated_at)
}
}

0 comments on commit 3d3117a

Please sign in to comment.