Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make #16284 backward compatible #16334

Merged
merged 6 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ The following table outlines the configuration properties related to the `lagBas
|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000|
|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
|`lagStatsType`|The stat ("MAX"/"TOTAL"/"AVG") to choose from the partitions lag for scaling decisions|No|Default provided by the supervisor|
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved

The following example shows a supervisor spec with `lagBased` autoscaler:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,6 @@ protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetad
);
}

@Override
public long computeLagForAutoScaler()
{
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getMaxLag();
}

private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> terminatedPartitionIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -154,8 +155,15 @@
LOCK.lock();
try {
if (!spec.isSuspended()) {
long lag = supervisor.computeLagForAutoScaler();
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
LagStats lagStats = supervisor.computeLagStats();
if (lagStats != null) {
long lag = lagBasedAutoScalerConfig.getLagStatsType() != null ?
lagStats.getMetric(lagBasedAutoScalerConfig.getLagStatsType()) :
lagStats.getPrefferedScalingMetric();
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
lagMetricsQueue.offer(lag > 0 ? lag : 0L);

Check notice

Code scanning / CodeQL

Ignored error status of call Note

Method run ignores exceptional return value of CircularFifoQueue.offer.
} else {
lagMetricsQueue.offer(0L);

Check notice

Code scanning / CodeQL

Ignored error status of call Note

Method run ignores exceptional return value of CircularFifoQueue.offer.
}
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScalingMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand All @@ -45,6 +46,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
private final int scaleOutStep;
private final boolean enableTaskAutoScaler;
private final long minTriggerScaleActionFrequencyMillis;
private final ScalingMetric lagStatsType;

@JsonCreator
public LagBasedAutoScalerConfig(
Expand All @@ -61,7 +63,8 @@ public LagBasedAutoScalerConfig(
@Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis,
@Nullable @JsonProperty("lagStatsType") ScalingMetric lagStatsType
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
Expand All @@ -73,6 +76,7 @@ public LagBasedAutoScalerConfig(
this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000;
this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
this.lagStatsType = lagStatsType;

// Only do taskCountMax and taskCountMin check when autoscaler is enabled. So that users left autoConfig empty{} will not throw any exception and autoscaler is disabled.
// If autoscaler is disabled, no matter what configs are set, they are not used.
Expand Down Expand Up @@ -186,6 +190,12 @@ public long getMinTriggerScaleActionFrequencyMillis()
return minTriggerScaleActionFrequencyMillis;
}

@JsonProperty
public ScalingMetric getLagStatsType()
{
return lagStatsType;
}

@Override
public String toString()
{
Expand All @@ -204,6 +214,7 @@ public String toString()
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", scaleInStep=" + scaleInStep +
", scaleOutStep=" + scaleOutStep +
", lagStatsType=" + lagStatsType +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,7 @@ private static Map<String, Object> getScaleOutProperties(int maxTaskCount)
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
autoScalerConfig.put("lagStatsType", "MAX");
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
return autoScalerConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,5 @@ default Boolean isHealthy()
*/
LagStats computeLagStats();

/**
* Used by AutoScaler to make scaling decisions.
*/
default long computeLagForAutoScaler()
{
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getTotalLag();
}

int getActiveTaskGroupsCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ public class LagStats
private final long maxLag;
private final long totalLag;
private final long avgLag;
private final ScalingMetric preferredScalingMetric;
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved

public LagStats(long maxLag, long totalLag, long avgLag)
{
this(maxLag, totalLag, avgLag, ScalingMetric.TOTAL);
}

public LagStats(long maxLag, long totalLag, long avgLag, ScalingMetric preferredScalingMetric)
{
this.maxLag = maxLag;
this.totalLag = totalLag;
this.avgLag = avgLag;
this.preferredScalingMetric = preferredScalingMetric;
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
}

public long getMaxLag()
Expand All @@ -46,4 +53,22 @@ public long getAvgLag()
{
return avgLag;
}

public long getPrefferedScalingMetric()
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
{
return getMetric(preferredScalingMetric);
}

public long getMetric(ScalingMetric metric)
{
switch (metric) {
case MAX:
return getMaxLag();
case TOTAL:
return getTotalLag();
case AVG:
return getAvgLag();
}
throw new IllegalStateException("Unknown scale metric");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.supervisor.autoscaler;

public enum ScalingMetric
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
{
MAX,
TOTAL,
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
AVG
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@
package org.apache.druid.indexing.overlord.supervisor;

import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScalingMetric;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class SupervisorTest
public class LagStatsTest
{
@Test
public void testAutoScalerLagComputation()
{
Supervisor supervisor = Mockito.spy(Supervisor.class);
LagStats lagStats = new LagStats(1, 2, 3);

Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3));
Assert.assertEquals(2, supervisor.computeLagForAutoScaler());
Assert.assertEquals(1, lagStats.getMetric(ScalingMetric.MAX));
Assert.assertEquals(2, lagStats.getMetric(ScalingMetric.TOTAL));
Assert.assertEquals(3, lagStats.getMetric(ScalingMetric.AVG));
Assert.assertEquals(2, lagStats.getPrefferedScalingMetric());

Mockito.when(supervisor.computeLagStats()).thenReturn(null);
Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
lagStats = new LagStats(1, 2, 3, ScalingMetric.MAX);
Assert.assertEquals(1, lagStats.getPrefferedScalingMetric());
}
}
Loading