Merge pull request #3800 from atlanhq/plt529test-beta
DG-1942 Fix task fetching inconsistency causing excessive lock churn and memory heap buildup in pods
hr2904 authored Dec 2, 2024
2 parents d395a6d + 5047e77 commit f6321f0
Showing 3 changed files with 141 additions and 22 deletions.
@@ -174,6 +174,19 @@ private Map<String, Object> runQueryWithLowLevelClient(String query) throws AtlasBaseException {
}
}

private Map<String, LinkedHashMap> runUpdateByQueryWithLowLevelClient(String query) throws AtlasBaseException {
try {
String responseString = performDirectUpdateByQuery(query);

Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);
return responseMap;

} catch (IOException e) {
LOG.error("Failed to execute direct query on ES {}", e.getMessage());
throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage());
}
}
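
For context, a successful _update_by_query call returns a JSON body whose top-level fields include took, updated, and failures, so the raw-typed map above exposes the result without further conversion. A minimal caller-side sketch (hypothetical, not part of this commit; queryDsl is a placeholder, and directUpdateByQuery is the public wrapper added later in this diff):

// Hypothetical usage sketch: inspect the raw _update_by_query response.
// "updated" and "failures" are standard Elasticsearch response fields.
Map<String, LinkedHashMap> response = directUpdateByQuery(queryDsl);
// Assigning to Object avoids an unchecked cast; at runtime "updated" is an
// Integer and "failures" is a List, despite the map's declared value type.
Object updated  = response.get("updated");
Object failures = response.get("failures");
LOG.info("update_by_query result: updated={}, failures={}", updated, failures);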

private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery");
DirectIndexQueryResult result = null;
@@ -444,6 +457,30 @@ private String performDirectIndexQuery(String query, boolean source) throws AtlasBaseException, IOException {
return EntityUtils.toString(response.getEntity());
}

private String performDirectUpdateByQuery(String query) throws AtlasBaseException, IOException {
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
String endPoint = index + "/_update_by_query";

Request request = new Request("POST", endPoint);
request.setEntity(entity);

Response response;
try {
response = lowLevelRestClient.performRequest(request);
} catch (ResponseException rex) {
if (rex.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.warn(String.format("ES index with name %s not found", index));
throw new AtlasBaseException(INDEX_NOT_FOUND, index);
} else {
throw new AtlasBaseException(String.format("Error in executing elastic query: %s", EntityUtils.toString(entity)), rex);
}
}

return EntityUtils.toString(response.getEntity());
}
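
As an aside, the same endpoint also accepts Elasticsearch's standard _update_by_query query-string options. A hedged variant (a sketch, not part of this change) that forces a refresh so an updated document is immediately visible to the next search, and proceeds past version conflicts instead of failing, could look like:

// Hypothetical variant of the request construction above, adding standard
// _update_by_query options: "refresh" makes affected shards refresh before
// the call returns; "conflicts=proceed" skips version conflicts.
Request request = new Request("POST", index + "/_update_by_query");
request.addParameter("refresh", "true");
request.addParameter("conflicts", "proceed");
request.setEntity(new NStringEntity(query, ContentType.APPLICATION_JSON));
Response response = lowLevelRestClient.performRequest(request);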

private DirectIndexQueryResult getResultFromResponse(String responseString, boolean async) throws IOException {
Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);
return getResultFromResponse(responseMap.get("response"));
Expand Down Expand Up @@ -495,6 +532,10 @@ public Map<String, Object> directIndexQuery(String query) throws AtlasBaseExcept
return runQueryWithLowLevelClient(query);
}

public Map<String, LinkedHashMap> directUpdateByQuery(String query) throws AtlasBaseException {
return runUpdateByQueryWithLowLevelClient(query);
}

@Override
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices() {
SearchRequest searchRequest = getSearchRequest(index, sourceBuilder);
@@ -87,40 +87,52 @@ public void run() {
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing.");
TasksFetcher fetcher = new TasksFetcher(registry);
try {
LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK);
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS);
Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
continue;
}

TasksFetcher fetcher = new TasksFetcher(registry);

Thread tasksFetcherThread = new Thread(fetcher);
tasksFetcherThread.start();
tasksFetcherThread.join();
LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK);

List<AtlasTask> tasks = fetcher.getTasks();
LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0);

if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());

LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete.");
waitForTasksToComplete(latch);
LOG.info("TaskQueueWatcher: All tasks have been processed.");
} else {
LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}
fetcher.clearTasks();

LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval);
Thread.sleep(pollInterval);
} catch (InterruptedException interruptedException) {
LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart");
LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException);
break;
} catch (Exception e) {
LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e);
LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e);
} finally {
LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
fetcher.clearTasks();
LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher.");
}
}

LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false.");
}
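
Condensed, each iteration now performs exactly one acquire/release cycle: the fetcher is constructed before the try so the finally block can always clear it, and because a continue inside a try still executes finally, the release call runs on every path. A simplified sketch of the lifecycle (logging and the early release in the empty-fetch branch elided):

// Simplified sketch of the loop above; logging elided.
while (shouldRun.get()) {
    TasksFetcher fetcher = new TasksFetcher(registry);   // outside try: finally can always clear it
    try {
        if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
            Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
            continue;                                    // finally below still runs
        }
        List<AtlasTask> tasks = fetcher.getTasks();      // fetched inline, no helper thread
        if (CollectionUtils.isNotEmpty(tasks)) {
            CountDownLatch latch = new CountDownLatch(tasks.size());
            submitAll(tasks, latch);
            waitForTasksToComplete(latch);
        }
        Thread.sleep(pollInterval);
    } catch (InterruptedException ie) {
        break;                                           // watcher is shutting down
    } finally {
        redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
        fetcher.clearTasks();                            // drop task references between polls
    }
}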


private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException {
if (latch.getCount() != 0) {
LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount());
@@ -146,15 +158,14 @@ private void submitAll(List<AtlasTask> tasks, CountDownLatch latch) {
}
}

static class TasksFetcher implements Runnable {
static class TasksFetcher {
private TaskRegistry registry;
private List<AtlasTask> tasks = new ArrayList<>();

public TasksFetcher(TaskRegistry registry) {
this.registry = registry;
}

@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("TasksFetcher: Fetching tasks for queuing");
@@ -164,6 +175,7 @@ public void run() {
}

public List<AtlasTask> getTasks() {
run();
return tasks;
}
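
The net change in this class: TasksFetcher no longer implements Runnable, and getTasks() runs the fetch inline on the watcher thread instead of the caller spawning and joining a throwaway thread on every poll, which appears to be the churn and heap buildup the commit title describes. Roughly:

// Before this commit, each poll did:
//     Thread tasksFetcherThread = new Thread(fetcher);
//     tasksFetcherThread.start();
//     tasksFetcherThread.join();
//     List<AtlasTask> tasks = fetcher.getTasks();
// After, the fetch is a plain synchronous call:
List<AtlasTask> tasks = fetcher.getTasks();   // getTasks() invokes run() directly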

86 changes: 76 additions & 10 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.tasks;

import com.datastax.oss.driver.shaded.fasterxml.jackson.databind.ObjectMapper;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
@@ -31,10 +32,12 @@
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.janusgraph.util.encoding.LongEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -49,6 +52,7 @@
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@@ -61,6 +65,7 @@ public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));
public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index";

private AtlasGraph graph;
private TaskService taskService;
@@ -397,21 +402,24 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
int fetched = 0;
try {
if (totalFetched + size > queueSize) {
size = queueSize - totalFetched;
size = queueSize - totalFetched; // Adjust size to not exceed queue size
LOG.info("Adjusting fetch size to {} based on queue size constraint.", size);
}

dsl.put("from", from);
dsl.put("size", size);

LOG.info("DSL Query for iteration: {}", dsl);
indexSearchParams.setDsl(dsl);

LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size);
AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams);

try {
indexQueryResult = indexQuery.vertices(indexSearchParams);
LOG.info("Query executed successfully for from: {} with size: {}", from, size);
} catch (AtlasBaseException e) {
LOG.error("Failed to fetch pending/in-progress task vertices to re-que");
e.printStackTrace();
LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e);
break;
}

@@ -423,35 +431,93 @@

if (vertex != null) {
AtlasTask atlasTask = toAtlasTask(vertex);

LOG.info("Processing fetched task: {}", atlasTask);
if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) ||
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){
LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString()));
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) {
LOG.info("Adding task to the result list: {}", atlasTask);
ret.add(atlasTask);
}
else {
LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid()));
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());
// Repair mismatched task
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
LOG.info("Repairing mismatched task with docId: {}", docId);
repairMismatchedTask(atlasTask, docId);
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched);
}

fetched++;
}
LOG.info("Fetched {} tasks in this iteration.", fetched);
} else {
LOG.warn("Index query result is null for from: {} and size: {}", from, size);
}

totalFetched += fetched;
LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size);

from += size;
if (fetched < size || totalFetched >= queueSize) {
LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize);
break;
}
} catch (Exception e){
} catch (Exception e) {
LOG.error("Exception occurred during task fetching process. Exiting loop.", e);
break;
}
}

LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched);
return ret;
}
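
Stripped of logging, the paging above advances a from/size window until it sees a short page or hits the queue-size cap. A skeleton of the loop (a sketch; runPage is a hypothetical helper standing in for the Elasticsearch query-and-collect block):

// Skeleton of the from/size paging loop above (sketch).
int from = 0;
int totalFetched = 0;
while (totalFetched < queueSize) {
    int size = TASK_FETCH_BATCH_SIZE;
    if (totalFetched + size > queueSize) {
        size = queueSize - totalFetched;      // clamp the final page to the cap
    }
    int fetched = runPage(from, size);        // hypothetical: run query, return rows seen
    totalFetched += fetched;
    from += size;
    if (fetched < size) {
        break;                                // short page: no more results
    }
}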

private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
AtlasElasticsearchQuery indexQuery = null;

try {
// Create a map for the fields to be updated
Map<String, Object> fieldsToUpdate = new HashMap<>();
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString());
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp

// Convert fieldsToUpdate map to JSON using Jackson
ObjectMapper objectMapper = new ObjectMapper();
String fieldsToUpdateJson = objectMapper.writeValueAsString(fieldsToUpdate);

// Construct the Elasticsearch update by query DSL
String queryDsl = "{"
+ "\"script\": {"
+ " \"source\": \"for (entry in params.fields.entrySet()) { ctx._source[entry.getKey()] = entry.getValue() }\","
+ " \"params\": {"
+ " \"fields\": " + fieldsToUpdateJson
+ " }"
+ "},"
+ "\"query\": {"
+ " \"term\": {"
+ " \"_id\": \"" + docId + "\""
+ " }"
+ "}"
+ "}";

// Execute the Elasticsearch query
indexQuery = (AtlasElasticsearchQuery) graph.elasticsearchQuery(JANUSGRAPH_VERTEX_INDEX);
Map<String, LinkedHashMap> result = indexQuery.directUpdateByQuery(queryDsl);

if (result != null) {
LOG.info("Elasticsearch UpdateByQuery Result: " + result + "\nfor task : " + atlasTask.getGuid());
} else {
LOG.info("No documents updated in Elasticsearch for guid: " + atlasTask.getGuid());
}
} catch (Exception e) {
LOG.error("Error while repairing mismatched task with guid: {}", atlasTask.getGuid(), e);
}
}
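
Because the body above is assembled by concatenating JSON fragments around fieldsToUpdateJson and docId, one defensive alternative (a sketch, not what this commit does) is to let Jackson serialize the entire payload so every value is escaped consistently:

// Hypothetical alternative: build the whole update_by_query body as a Map and
// let Jackson serialize it, instead of concatenating JSON strings. Uses the
// same ObjectMapper and collection imports as the method above.
ObjectMapper mapper = new ObjectMapper();

Map<String, Object> script = new HashMap<>();
script.put("source", "for (entry in params.fields.entrySet()) { ctx._source[entry.getKey()] = entry.getValue() }");
script.put("params", Collections.singletonMap("fields", fieldsToUpdate));

Map<String, Object> body = new HashMap<>();
body.put("script", script);
body.put("query", Collections.singletonMap("term", Collections.singletonMap("_id", docId)));

String queryDsl = mapper.writeValueAsString(body);   // same DSL shape as above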

public void commit() {
this.graph.commit();
}
