Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make #16284 backward compatible #16334

Merged
merged 6 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ The following table outlines the configuration properties related to the `lagBas
|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000|
|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
|`lagAggregate`|The aggregate function used to compute the lag metric for scaling decisions. Possible values are `MAX`, `SUM` and `AVERAGE`. |No|`SUM`|

The following example shows a supervisor spec with `lagBased` autoscaler:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,6 @@ protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetad
);
}

@Override
public long computeLagForAutoScaler()
{
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getMaxLag();
}

private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> terminatedPartitionIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -154,8 +156,17 @@
LOCK.lock();
try {
if (!spec.isSuspended()) {
long lag = supervisor.computeLagForAutoScaler();
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
LagStats lagStats = supervisor.computeLagStats();

if (lagStats != null) {
AggregateFunction aggregate = lagBasedAutoScalerConfig.getLagAggregate() == null ?
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
lagStats.getAggregateForScaling() :
lagBasedAutoScalerConfig.getLagAggregate();
long lag = lagStats.getMetric(aggregate);
lagMetricsQueue.offer(lag > 0 ? lag : 0L);

Check notice

Code scanning / CodeQL

Ignored error status of call Note

Method run ignores exceptional return value of CircularFifoQueue.offer.
} else {
lagMetricsQueue.offer(0L);

Check notice

Code scanning / CodeQL

Ignored error status of call Note

Method run ignores exceptional return value of CircularFifoQueue.offer.
}
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand All @@ -45,6 +46,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
private final int scaleOutStep;
private final boolean enableTaskAutoScaler;
private final long minTriggerScaleActionFrequencyMillis;
private final AggregateFunction lagAggregate;

@JsonCreator
public LagBasedAutoScalerConfig(
Expand All @@ -61,7 +63,8 @@ public LagBasedAutoScalerConfig(
@Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis,
@Nullable @JsonProperty("lagAggregate") AggregateFunction lagAggregate
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
Expand All @@ -73,6 +76,7 @@ public LagBasedAutoScalerConfig(
this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000;
this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
this.lagAggregate = lagAggregate;

// Only do taskCountMax and taskCountMin check when autoscaler is enabled. So that users left autoConfig empty{} will not throw any exception and autoscaler is disabled.
// If autoscaler is disabled, no matter what configs are set, they are not used.
Expand Down Expand Up @@ -186,6 +190,13 @@ public long getMinTriggerScaleActionFrequencyMillis()
return minTriggerScaleActionFrequencyMillis;
}

@JsonProperty
@Nullable
public AggregateFunction getLagAggregate()
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
{
return lagAggregate;
}

@Override
public String toString()
{
Expand All @@ -204,6 +215,7 @@ public String toString()
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", scaleInStep=" + scaleInStep +
", scaleOutStep=" + scaleOutStep +
", lagAggregate=" + lagAggregate +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,5 @@ default Boolean isHealthy()
*/
LagStats computeLagStats();

/**
* Used by AutoScaler to make scaling decisions.
*/
default long computeLagForAutoScaler()
{
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getTotalLag();
}

int getActiveTaskGroupsCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.supervisor.autoscaler;

public enum AggregateFunction
{
MAX,
SUM,
AVERAGE
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ public class LagStats
private final long maxLag;
private final long totalLag;
private final long avgLag;
private final AggregateFunction aggregateForScaling;

public LagStats(long maxLag, long totalLag, long avgLag)
{
this(maxLag, totalLag, avgLag, AggregateFunction.SUM);
}

public LagStats(long maxLag, long totalLag, long avgLag, AggregateFunction aggregateForScaling)
{
this.maxLag = maxLag;
this.totalLag = totalLag;
this.avgLag = avgLag;
this.aggregateForScaling = aggregateForScaling == null ? AggregateFunction.SUM : aggregateForScaling;
}

public long getMaxLag()
Expand All @@ -46,4 +53,26 @@ public long getAvgLag()
{
return avgLag;
}

/**
* The preferred scaling metric that supervisor may specify to be used.
* This could be overrided by the autscaler.
*/
public AggregateFunction getAggregateForScaling()
{
return aggregateForScaling;
}

public long getMetric(AggregateFunction metric)
{
switch (metric) {
case MAX:
return getMaxLag();
case SUM:
return getTotalLag();
case AVERAGE:
return getAvgLag();
}
throw new IllegalStateException("Unknown scale metric");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@

package org.apache.druid.indexing.overlord.supervisor;

import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class SupervisorTest
public class LagStatsTest
{
@Test
public void testAutoScalerLagComputation()
{
Supervisor supervisor = Mockito.spy(Supervisor.class);
LagStats lagStats = new LagStats(1, 2, 3);

Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3));
Assert.assertEquals(2, supervisor.computeLagForAutoScaler());

Mockito.when(supervisor.computeLagStats()).thenReturn(null);
Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
Assert.assertEquals(1, lagStats.getMetric(AggregateFunction.MAX));
Assert.assertEquals(2, lagStats.getMetric(AggregateFunction.SUM));
Assert.assertEquals(3, lagStats.getMetric(AggregateFunction.AVERAGE));
Assert.assertEquals(AggregateFunction.SUM, lagStats.getAggregateForScaling());
}
}
Loading