-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add shuffle metrics for parallel indexing #10359
Changes from 3 commits
7e0fc0a
b73677f
fcd8cb2
bf23cbc
80b2949
e258dd3
e205c1d
d70529d
394b473
23312bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.worker.shuffle; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.errorprone.annotations.concurrent.GuardedBy; | ||
|
||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* Shuffle metrcis for middleManagers and indexers. This class is thread-safe because shuffle can be performed by | ||
* multiple HTTP threads while a monitoring thread periodically emits the snapshot of metrics. | ||
* | ||
* @see ShuffleResource | ||
* @see org.apache.druid.java.util.metrics.MonitorScheduler | ||
*/ | ||
public class ShuffleMetrics | ||
{ | ||
/** | ||
* This lock is used to synchronize accesses to the reference to {@link #datasourceMetrics} and the | ||
* {@link PerDatasourceShuffleMetrics} values of the map. This means, | ||
* | ||
* - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key) should be synchronized under this lock. | ||
* - Any updates on the reference to datasourceMetrics should be synchronized under this lock. | ||
*/ | ||
private final Object lock = new Object(); | ||
|
||
/** | ||
* A map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. This map is replaced with an empty map | ||
* whenever a snapshot is taken since the map can keep growing over time otherwise. For concurrent access pattern, | ||
* see {@link #shuffleRequested} and {@link #snapshotAndReset()}. | ||
*/ | ||
@GuardedBy("lock") | ||
private Map<String, PerDatasourceShuffleMetrics> datasourceMetrics = new HashMap<>(); | ||
|
||
/** | ||
* This method is called whenever a new shuffle is requested. Multiple tasks can request shuffle at the same time, | ||
* while the monitoring thread takes a snapshot of the metrics. There is a happens-before relationship between | ||
* shuffleRequested and {@link #snapshotAndReset()}. | ||
*/ | ||
public void shuffleRequested(String supervisorTaskId, long fileLength) | ||
{ | ||
synchronized (lock) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since there is a risk of the locking introducing a slow down here because of contention, can we update this to include a feature flag check? This way, if there are some unforeseen issues with locking, we can disable metric computation and reporting. I think a static feature flag - like a system property would be good enough for this use case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this locking would introduce any noticeable slow down, but feature flag sounds good. Now, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this approach a lot 🤘 |
||
datasourceMetrics.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()) | ||
.accumulate(fileLength); | ||
} | ||
} | ||
|
||
/** | ||
* This method is called whenever the monitoring thread takes a snapshot of the current metrics. The map inside | ||
* AtomicReference will be reset to an empty map after this call. This is to return the snapshot metrics collected | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment needs an update after the latest changes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Fixed. |
||
* during the monitornig period. There is a happens-before relationship between snapshotAndReset() and | ||
* {@link #shuffleRequested}. | ||
*/ | ||
public Map<String, PerDatasourceShuffleMetrics> snapshotAndReset() | ||
{ | ||
synchronized (lock) { | ||
final Map<String, PerDatasourceShuffleMetrics> snapshot = Collections.unmodifiableMap(datasourceMetrics); | ||
datasourceMetrics = new HashMap<>(); | ||
return snapshot; | ||
} | ||
} | ||
|
||
/** | ||
* This method is visible only for testing. Use {@link #snapshotAndReset()} instead to get the current snapshot. | ||
*/ | ||
@VisibleForTesting | ||
Map<String, PerDatasourceShuffleMetrics> getDatasourceMetrics() | ||
{ | ||
synchronized (lock) { | ||
return datasourceMetrics; | ||
} | ||
} | ||
|
||
/** | ||
* This class represents shuffle metrics of one datasource. This class is not thread-safe and should never be accessed | ||
* by multiple threads at the same time. | ||
*/ | ||
public static class PerDatasourceShuffleMetrics | ||
{ | ||
private long shuffleBytes; | ||
private int shuffleRequests; | ||
|
||
private void accumulate(long shuffleBytes) | ||
{ | ||
this.shuffleBytes += shuffleBytes; | ||
this.shuffleRequests++; | ||
} | ||
|
||
public long getShuffleBytes() | ||
{ | ||
return shuffleBytes; | ||
} | ||
|
||
public int getShuffleRequests() | ||
{ | ||
return shuffleRequests; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.worker.shuffle; | ||
|
||
import com.google.inject.Binder; | ||
import com.google.inject.Module; | ||
import org.apache.druid.guice.Jerseys; | ||
import org.apache.druid.guice.LazySingleton; | ||
import org.apache.druid.server.metrics.MetricsModule; | ||
|
||
public class ShuffleModule implements Module | ||
{ | ||
@Override | ||
public void configure(Binder binder) | ||
{ | ||
Jerseys.addResource(binder, ShuffleResource.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a ModuleTest that validates the |
||
|
||
binder.bind(ShuffleMetrics.class).in(LazySingleton.class); | ||
binder.bind(ShuffleMonitor.class).in(LazySingleton.class); | ||
MetricsModule.register(binder, ShuffleMonitor.class); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.worker.shuffle; | ||
|
||
import com.google.inject.Inject; | ||
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; | ||
import org.apache.druid.java.util.emitter.service.ServiceEmitter; | ||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; | ||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder; | ||
import org.apache.druid.java.util.metrics.AbstractMonitor; | ||
|
||
import java.util.Map; | ||
|
||
public class ShuffleMonitor extends AbstractMonitor | ||
{ | ||
private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId"; | ||
private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes"; | ||
private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. other ingestion related metrics start with "ingest/" any thoughts on whether these metrics fall under the ingestion metrics category? I was thinking about where the metrics would live in the docs which is why I was asking this question. I thought maybe it belonged here https://druid.apache.org/docs/latest/operations/metrics.html#ingestion-metrics-realtime-process ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. The new metrics don't seem to belong to any existing section, so I added a new one. But our current doc doesn't seem organized well (for example, the metrics in the above link are not only for realtime processes, but for all task types as well), maybe we need to tidy up at some point after #10352 is done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I modified the metrics to start with |
||
|
||
private final ShuffleMetrics shuffleMetrics; | ||
|
||
@Inject | ||
public ShuffleMonitor(ShuffleMetrics shuffleMetrics) | ||
{ | ||
this.shuffleMetrics = shuffleMetrics; | ||
} | ||
|
||
@Override | ||
public boolean doMonitor(ServiceEmitter emitter) | ||
{ | ||
final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshotAndReset(); | ||
snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> { | ||
final Builder metricBuilder = ServiceMetricEvent | ||
.builder() | ||
.setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId); | ||
emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes())); | ||
emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests())); | ||
}); | ||
|
||
return true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add unit tests for this function? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, I thought I added one already. Added now. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious - why did you choose to use the guarded by pattern instead of a ConcurrentMap?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was some prior discussion about it. It was mainly because not only updating the
datasourceMetrics
map, but also updatingPerDatasourceShuffleMetrics
should be synchronized as well. For example, if it was updatingPerDatasourceShuffleMetrics
whensnapshotAndReset()
is called, it should guarantee that the updating will be done beforesnapshotAndReset()
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah - that makes sense. Thanks for the explanation