Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
aboitreaud committed Dec 20, 2024
1 parent ad9ff10 commit c9cd812
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
30 changes: 16 additions & 14 deletions pkg/fleet/installer/setup/djm/emr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

const (
emrInjectorVersion = "0.26.0-1"
emrJavaVersion = "1.42.2-1"
emrJavaTracerVersion = "1.42.2-1"
emrAgentVersion = "7.58.2-1"
commandTimeoutDuration = 10 * time.Second
)
Expand Down Expand Up @@ -75,22 +75,24 @@ func SetupEmr(s *common.Setup) error {

s.Packages.Install(common.DatadogAgentPackage, emrAgentVersion)
s.Packages.Install(common.DatadogAPMInjectPackage, emrInjectorVersion)
s.Packages.Install(common.DatadogAPMLibraryJavaPackage, emrJavaVersion)
s.Packages.Install(common.DatadogAPMLibraryJavaPackage, emrJavaTracerVersion)

hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("failed to get hostname: %w", err)
}
s.Config.DatadogYAML.Hostname = hostname
s.Config.DatadogYAML.DJM.Enabled = true
s.Config.InjectTracerYAML.AdditionalEnvironmentVariables = tracerEnvConfigEmr

// Ensure tags are always attached with the metrics
s.Config.DatadogYAML.ExpectedTagsDuration = "10m"
isMaster, clusterName, err := setupCommonEmrHostTags(s)
if err != nil {
return fmt.Errorf("failed to set tags: %w", err)
}
if isMaster {
setupEmrDriver(s, clusterName)
setupEmrResourceManager(s, clusterName)
}
return nil
}
Expand Down Expand Up @@ -122,32 +124,32 @@ func setupCommonEmrHostTags(s *common.Setup) (bool, string, error) {
}
setHostTag(s, "job_flow_id", extraInfo.JobFlowID)
setHostTag(s, "cluster_id", extraInfo.JobFlowID)
setHostTag(s, "emr_version", extraInfo.ReleaseLabel)
s.Span.SetTag("emr_version", extraInfo.ReleaseLabel)

emrResponseRaw, err := executeCommandWithTimeout("aws", "emr", "describe-cluster", "--cluster-id", extraInfo.JobFlowID)
if err != nil {
return info.IsMaster, extraInfo.JobFlowID, fmt.Errorf("error describing emr cluster, using cluster id as name: %w", err)
log.Warn("error describing emr cluster, using cluster id as name")
return info.IsMaster, extraInfo.JobFlowID, nil
}
var response emrResponse
if err = json.Unmarshal(emrResponseRaw, &response); err != nil {
return info.IsMaster, extraInfo.JobFlowID, fmt.Errorf("error unmarshalling AWS EMR response, using cluster id as name: %w", err)
}

setHostTag(s, "cluster_name", response.Cluster.Name)
return info.IsMaster, response.Cluster.Name, nil
clusterName := response.Cluster.Name
if clusterName == "" {
log.Warn("clusterName is empty, using cluster id as name")
clusterName = extraInfo.JobFlowID
}
setHostTag(s, "cluster_name", clusterName)
return info.IsMaster, clusterName, nil
}

func setupEmrDriver(s *common.Setup, clusterName string) {

s.Config.InjectTracerYAML.AdditionalEnvironmentVariables = tracerEnvConfigEmr
func setupEmrResourceManager(s *common.Setup, clusterName string) {

var sparkIntegration common.IntegrationConfig
var yarnIntegration common.IntegrationConfig

if clusterName == "" {
log.Warn("clusterName is empty, Spark and yarn integrations are not set up")
return
}
sparkIntegration.Instances = []any{
common.IntegrationConfigInstanceSpark{
SparkURL: "http://127.0.0.1:8088",
Expand Down
3 changes: 1 addition & 2 deletions pkg/fleet/installer/setup/djm/emr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func TestSetupCommonEmrHostTags(t *testing.T) {
span, _ := telemetry.StartSpanFromContext(context.Background(), "test")
s := &common.Setup{Span: span}

isMaster, clusterName, err := setupCommonEmrHostTags(s)
fmt.Println(isMaster, clusterName, err)
_, _, err := setupCommonEmrHostTags(s)
assert.Nil(t, err)
assert.ElementsMatch(t, tt.wantTags, s.Config.DatadogYAML.Tags)
})
Expand Down

0 comments on commit c9cd812

Please sign in to comment.