Skip to content

Commit

Permalink
Shallow copying EnvironmentVariables map before modification (#5182) (#…
Browse files Browse the repository at this point in the history
…186)

## Overview
Cherry-picking the [OSS PR](#5182). This fixes issues with concurrent map writes in ArrayNodes where the `ExecutionContext` has environment variables (outside of those injected by ArrayNode). 

## Test Plan
Testing locally and against known repro workflow.

## Rollout Plan (if applicable)
Should be no compatibility issues.

## Upstream Changes
Pulled from upstream.

## Jira Issue
_NA_

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
hamersaw authored Apr 5, 2024
1 parent a688479 commit b0ec5c0
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions flytepropeller/pkg/controller/nodes/array/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ func (a *arrayExecutionContext) GetExecutionConfig() v1alpha1.ExecutionConfig {

func newArrayExecutionContext(executionContext executors.ExecutionContext, subNodeIndex int) *arrayExecutionContext {
executionConfig := executionContext.GetExecutionConfig()
if executionConfig.EnvironmentVariables == nil {
executionConfig.EnvironmentVariables = make(map[string]string)

// since maps are all reference types in golang if we are going to modify the
// EnvironmentVariables for each subNode we need to at least shallow copy the map to ensure
// there are no concurrent modifications during parallelized evaluation of subNodes
environmentVariables := executionConfig.EnvironmentVariables
executionConfig.EnvironmentVariables = make(map[string]string)
for key, value := range environmentVariables {
executionConfig.EnvironmentVariables[key] = value
}
executionConfig.EnvironmentVariables[JobIndexVarName] = FlyteK8sArrayIndexVarName
executionConfig.EnvironmentVariables[FlyteK8sArrayIndexVarName] = strconv.Itoa(subNodeIndex)

executionConfig.MaxParallelism = 0 // hardcoded to 0 because parallelism is handled by the array node

return &arrayExecutionContext{
Expand Down

0 comments on commit b0ec5c0

Please sign in to comment.