Add queue for cold entities #64

Closed
174 changes: 174 additions & 0 deletions src/main/java/org/opensearch/ad/ratelimit/ColdEntityQueue.java
@@ -0,0 +1,174 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ad.ratelimit;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_BATCH_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_SECS;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Random;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.ad.NodeStateManager;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

/**
 * A queue that slowly releases low-priority requests to CheckpointReadQueue.
 *
 * ColdEntityQueue absorbs cold entity requests. As with hot entities, we load a cold
 * entity's model checkpoint from disk, train models if the checkpoint is not found,
 * query for missed features to complete a shingle, use the models to check whether
 * the incoming feature is normal, update models, and save the detection results to disk.
 * Implementation-wise, we reuse the queues we have developed for hot entities.
 * There are two differences. First, we process hot entities as long as resources
 * (e.g., AD thread pool capacity) are available, while we release cold entity
 * requests to other queues at a slow, controlled pace. Second, cold entity requests
 * have low priority, so we process them only when there are no hot entity requests
 * to process.
 */
public class ColdEntityQueue extends RateLimitedQueue<EntityFeatureRequest> {
    private static final Logger LOG = LogManager.getLogger(ColdEntityQueue.class);

    private int batchSize;
Collaborator

Add volatile?

Collaborator Author

added
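
A minimal sketch of what the suggestion above amounts to, assuming the settings-update consumer can run on a different thread from the one that reads the batch size. `BatchSizeHolder` and `updateConsumer` are hypothetical names used only for illustration; they are not part of this PR.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a queue whose batch size can change at runtime. */
final class BatchSizeHolder {
    // volatile: the settings-update callback may run on a different thread
    // from the worker that reads the value, so the write must be visible to it.
    private volatile int batchSize;

    BatchSizeHolder(int initialBatchSize) {
        this.batchSize = initialBatchSize;
    }

    /** Returns a callback that could be registered as a settings-update consumer. */
    Consumer<Integer> updateConsumer() {
        return newValue -> this.batchSize = newValue;
    }

    int batchSize() {
        return batchSize;
    }
}
```

Because the field is only ever overwritten with a fresh value and never updated from its previous value, `volatile` is enough here and no lock is needed.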

    private final CheckpointReadQueue checkpointReadQueue;
    private boolean scheduled;
Collaborator

What does scheduled mean? Add some comments?

Collaborator Author

added comment "// indicate whether a future pull over cold entity queues is scheduled"

    private int expectedExecutionTimeInSecsPerRequest;

    public ColdEntityQueue(
        long heapSizeInBytes,
        int singleRequestSizeInBytes,
        Setting<Float> maxHeapPercentForQueueSetting,
Collaborator

Why not get this setting like line 109?

Collaborator Author

I did it in the super class to reduce redundant code.

        ClusterService clusterService,
        Random random,
        ADCircuitBreakerService adCircuitBreakerService,
        ThreadPool threadPool,
        Settings settings,
        float maxQueuedTaskRatio,
        Clock clock,
        float mediumSegmentPruneRatio,
        float lowSegmentPruneRatio,
        int maintenanceFreqConstant,
        CheckpointReadQueue checkpointReadQueue,
        Duration stateTtl,
        NodeStateManager nodeStateManager
    ) {
        super(
            "cold-entity",
            heapSizeInBytes,
            singleRequestSizeInBytes,
            maxHeapPercentForQueueSetting,
            clusterService,
            random,
            adCircuitBreakerService,
            threadPool,
            settings,
            maxQueuedTaskRatio,
            clock,
            mediumSegmentPruneRatio,
            lowSegmentPruneRatio,
            maintenanceFreqConstant,
            stateTtl,
            nodeStateManager
        );

        this.batchSize = CHECKPOINT_READ_QUEUE_BATCH_SIZE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(CHECKPOINT_READ_QUEUE_BATCH_SIZE, it -> this.batchSize = it);

        this.checkpointReadQueue = checkpointReadQueue;
        this.scheduled = false;

        this.expectedExecutionTimeInSecsPerRequest = AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_SECS.get(settings);
        clusterService
            .getClusterSettings()
            .addSettingsUpdateConsumer(EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_SECS, it -> this.expectedExecutionTimeInSecsPerRequest = it);
    }

    private int pullRequests() {
        int requestSize = 0;
        try {
            List<EntityFeatureRequest> requests = getRequests(batchSize);
            if (requests == null || requests.isEmpty()) {
                return 0;
            }
            checkpointReadQueue.putAll(requests);
            requestSize = requests.size();
        } catch (Exception e) {
            LOG.error("Error enqueuing cold entity requests", e);
        } finally {
            if (requestSize < batchSize) {
                scheduled = false;
            } else {
                // There might be more to fetch, so schedule another pull in a few seconds.
                // The delay includes randomness because we aim to execute at least one
                // request every three seconds but cannot guarantee that rate.
                schedulePulling(getScheduleDelay(requestSize));
                scheduled = true;
            }
        }
        return requestSize;
    }

    private synchronized void schedulePulling(TimeValue delay) {
        try {
            threadPool.schedule(this::pullRequests, delay, AnomalyDetectorPlugin.AD_THREAD_POOL_NAME);
        } catch (Exception e) {
            LOG.error("Fail to schedule cold entity pulling", e);
Collaborator

What happens if scheduling fails? It seems we would poll only part of the requests, put them into checkpointReadQueue, and ignore the remaining requests.

Collaborator Author

I am logging failures. threadPool.schedule is a basic API provided by OpenSearch. If it fails, I don't have a fallback, as something is fundamentally wrong.
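
For reference, one possible way to keep a scheduling failure recoverable is to clear the "scheduled" flag when the scheduler rejects the task, so that a later trigger can pull again. The sketch below is not what this PR does. It assumes `java.util.concurrent.ScheduledExecutorService` as a stand-in for the OpenSearch `ThreadPool`, and `SchedulingFallbackSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: reset the "scheduled" flag when scheduling is rejected. */
final class SchedulingFallbackSketch {
    // Stand-in for the OpenSearch ThreadPool (an assumption of this sketch).
    private final ScheduledExecutorService scheduler;
    // Indicates whether a future pull is currently scheduled.
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    SchedulingFallbackSketch(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Returns true if the pull was scheduled, false if the scheduler rejected it. */
    boolean schedulePulling(Runnable pull, long delayMillis) {
        scheduled.set(true);
        try {
            scheduler.schedule(pull, delayMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            // Clear the flag so the next trigger is allowed to pull again.
            scheduled.set(false);
            return false;
        }
    }

    boolean isScheduled() {
        return scheduled.get();
    }
}
```

With this shape, requests left in the queue after a rejected schedule are picked up by the next trigger rather than waiting on a run that was never scheduled.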

        }
    }

    /**
     * only pull requests to process when there's no other scheduled run
     */
    @Override
    protected void triggerProcess() {
        if (false == scheduled) {
            pullRequests();
        }
    }

    /**
     * @param requestSize requests to process
     * @return the delay for the next scheduled run
     */
    private TimeValue getScheduleDelay(int requestSize) {
        int expectedSingleRequestExecutionMillis = 1000 * expectedExecutionTimeInSecsPerRequest;
        int waitMilliSeconds = requestSize * expectedSingleRequestExecutionMillis;
        return TimeValue.timeValueMillis(waitMilliSeconds + random.nextInt(waitMilliSeconds));
    }
}
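
As a worked illustration of the delay computed in getScheduleDelay: with 2 requests and an expected execution time of 3 seconds per request, the base wait is 2 * 3 * 1000 = 6000 ms, and the added jitter is drawn from [0, 6000), so the total delay falls in [6000, 12000) ms. The sketch below restates that arithmetic in standalone form. `ScheduleDelaySketch` and `delayMillis` are hypothetical names, and the guard for a non-positive base is an addition of this sketch: `Random.nextInt(int)` throws `IllegalArgumentException` when its bound is not positive.

```java
import java.util.Random;

/** Hypothetical standalone version of the delay-with-jitter arithmetic. */
final class ScheduleDelaySketch {
    private ScheduleDelaySketch() {}

    static long delayMillis(int requestSize, int expectedSecsPerRequest, Random random) {
        // Compute in long so large batch sizes cannot overflow int.
        long base = 1000L * expectedSecsPerRequest * requestSize;
        if (base <= 0) {
            // Guard added in this sketch: nextInt requires a positive bound.
            return 0L;
        }
        int bound = (int) Math.min(base, Integer.MAX_VALUE);
        // Total delay lies in [base, base + bound).
        return base + random.nextInt(bound);
    }
}
```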