Skip to content

Commit

Permalink
Separate metrics collection from WorkerSpawnRunner to distinct class.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 452041457
Change-Id: Id9bedc382a2224756e21bd1e94b3b32dbd974b73
  • Loading branch information
Googler authored and copybara-github committed May 31, 2022
1 parent 053fb77 commit fcaa247
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 216 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ java_library(
"WorkerParser.java",
"WorkerSpawnStrategy.java",
"WorkerKey.java",
"WorkerMetricsCollector.java",
],
),
deps = [
Expand Down Expand Up @@ -62,6 +63,7 @@ java_library(
java_library(
name = "worker_spawn_runner",
srcs = [
"WorkerMetricsCollector.java",
"WorkerParser.java",
"WorkerSpawnRunner.java",
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2022 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.worker;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.buildtool.CollectMetricsEvent;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.util.OS;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Collects and populates system metrics about the workers.
 *
 * <p>An instance registers itself on the {@link EventBus} passed to the constructor. On every
 * {@link CollectMetricsEvent} it asks {@code ps} for the resident set size of each tracked worker
 * process, records the sample on the matching {@link WorkerMetric}, and posts a {@link
 * WorkerMetricsEvent} to the reporter.
 */
class WorkerMetricsCollector {
  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  private final ExtendedEventHandler reporter;

  /**
   * Mapping of worker ids to their metrics.
   *
   * <p>The map is mutated in place and never reassigned, so it stays a {@link ConcurrentHashMap}
   * for the lifetime of the collector.
   */
  private final Map<Integer, WorkerMetric> workerIdToWorkerMetric = new ConcurrentHashMap<>();

  /**
   * Creates a collector and subscribes it to {@code eventBus}.
   *
   * @param reporter handler that receives the {@link WorkerMetricsEvent}s produced by this class
   * @param eventBus bus on which {@link CollectMetricsEvent}s are expected to be delivered
   */
  public WorkerMetricsCollector(ExtendedEventHandler reporter, EventBus eventBus) {
    this.reporter = reporter;
    eventBus.register(this);
  }

  /**
   * Collects process stats for each of the given worker processes.
   *
   * <p>Only {@link OS#LINUX} and {@link OS#DARWIN} are supported; for any other OS an empty map is
   * returned without running any command. Failures to start or parse {@code ps} are logged and
   * result in a partial (possibly empty) map rather than an exception.
   *
   * @param os the operating system the processes are running on
   * @param processIds ids of the processes to inspect
   * @return a mutable map from process id to the stat parsed for it; processes that {@code ps} did
   *     not report are absent
   */
  @VisibleForTesting
  public Map<Long, WorkerMetric.WorkerStat> collectStats(OS os, List<Long> processIds) {
    Map<Long, WorkerMetric.WorkerStat> pidResults = new HashMap<>();

    if (os != OS.LINUX && os != OS.DARWIN) {
      return pidResults;
    }

    Process psProcess;
    try {
      psProcess = this.buildPsProcess(processIds);
    } catch (IOException e) {
      logger.atWarning().withCause(e).log("Error while executing command for pids: %s", processIds);
      return pidResults;
    }

    // try-with-resources guarantees the reader (and the underlying process stdout) is closed on
    // every path, including parse failures.
    try (BufferedReader psOutput =
        new BufferedReader(new InputStreamReader(psProcess.getInputStream(), UTF_8))) {
      // The output of the above ps command looks similar to this:
      // PID RSS
      // 211706 222972
      // 2612333 6180
      // We skip over the first line (the header) and then parse the PID and the resident memory
      // size.
      Instant now = Instant.now();
      psOutput.readLine(); // Discard the header line.

      String output;
      while ((output = psOutput.readLine()) != null) {
        List<String> line = Splitter.on(" ").trimResults().omitEmptyStrings().splitToList(output);
        if (line.size() != 2) {
          logger.atWarning().log("Unexpected length of split line %s %d", output, line.size());
          continue;
        }

        long pid = Long.parseLong(line.get(0));
        // NOTE(review): ps reports RSS in kilobytes, so dividing by 1000 yields roughly megabytes
        // despite the variable name. Confirm which unit WorkerStat expects before changing this.
        int memoryInKb = Integer.parseInt(line.get(1)) / 1000;

        pidResults.put(pid, new WorkerMetric.WorkerStat(memoryInKb, now));
      }
    } catch (IllegalArgumentException | IOException e) {
      // NumberFormatException is an IllegalArgumentException, so malformed numbers land here too.
      // Entries parsed before the failure are still returned.
      logger.atWarning().withCause(e).log(
          "Error while parsing ps output for pids: %s", processIds);
    }
    return pidResults;
  }

  /**
   * Starts a {@code ps -o pid,rss -p <pids>} process for the given process ids.
   *
   * <p>Non-positive ids are dropped before the comma-separated pid list is built.
   *
   * @param processIds ids of the processes to query
   * @return the started {@code ps} process
   * @throws IOException if the process cannot be started
   */
  @VisibleForTesting
  public Process buildPsProcess(List<Long> processIds) throws IOException {
    ImmutableList<Long> filteredProcessIds =
        processIds.stream().filter(p -> p > 0).collect(toImmutableList());
    String pids = Joiner.on(",").join(filteredProcessIds);
    return new ProcessBuilder("ps", "-o", "pid,rss", "-p", pids).start();
  }

  /**
   * Samples all tracked workers, reports the metrics, and forgets workers that could not be
   * measured.
   *
   * <p>A worker whose process id is missing from the {@code ps} result is marked as not
   * measurable. It is still included in the posted {@link WorkerMetricsEvent}, and only afterwards
   * removed from the tracked set. Measurable workers stay tracked for the next event.
   *
   * @param event the trigger; its contents are not read
   */
  @SuppressWarnings("unused")
  @Subscribe
  public void onCollectMetricsEvent(CollectMetricsEvent event) {
    ImmutableList<Long> trackedProcessIds =
        workerIdToWorkerMetric.values().stream()
            .map(WorkerMetric::getProcessId)
            .collect(toImmutableList());
    Map<Long, WorkerMetric.WorkerStat> workerStats =
        collectStats(OS.getCurrent(), trackedProcessIds);

    for (WorkerMetric workerMetric : workerIdToWorkerMetric.values()) {
      WorkerMetric.WorkerStat workerStat = workerStats.get(workerMetric.getProcessId());
      if (workerStat == null) {
        workerMetric.setIsMeasurable(false);
        continue;
      }
      workerMetric.addWorkerStat(workerStat);
    }

    // Post a snapshot so later pruning does not affect what the event carries.
    reporter.post(new WorkerMetricsEvent(new ArrayList<>(workerIdToWorkerMetric.values())));

    // Remove dead workers from the metrics map. This must happen in place and without clearing
    // the map first: clearing would discard the measurable workers as well, and swapping in a new
    // map would lose the concurrent implementation.
    workerIdToWorkerMetric.values().removeIf(workerMetric -> !workerMetric.getIsMeasurable());
  }

  /**
   * Initializes metricsSet for workers. If worker metrics already exists for this worker, does
   * nothing.
   *
   * @param workerKey key supplying the mnemonic, multiplex and sandbox flags for the metric
   * @param worker worker supplying the worker id and process id
   */
  public void initializeMetricsSet(WorkerKey workerKey, Worker worker) {
    // computeIfAbsent makes the "create only if missing" step a single atomic map operation
    // instead of a separate containsKey/put pair.
    workerIdToWorkerMetric.computeIfAbsent(
        worker.getWorkerId(),
        unusedWorkerId ->
            new WorkerMetric(
                worker.getWorkerId(),
                worker.getProcessId(),
                workerKey.getMnemonic(),
                workerKey.isMultiplex(),
                workerKey.isSandboxed()));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public void registerSpawnStrategies(
RunfilesTreeUpdater.INSTANCE,
env.getOptions().getOptions(WorkerOptions.class),
env.getEventBus(),
Runtime.getRuntime(),
env.getXattrProvider());
ExecutionOptions executionOptions =
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class));
Expand Down
Loading

0 comments on commit fcaa247

Please sign in to comment.