Skip to content

Commit

Permalink
Refatoring code for PR
Browse files Browse the repository at this point in the history
  • Loading branch information
hr2904 committed Dec 2, 2024
1 parent 340fd36 commit c88de8e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,61 +78,45 @@ public void shutdown() {
public void run() {
boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean();
if (isMaintenanceMode) {
LOG.info("TaskQueueWatcher: Maintenance mode is enabled. New tasks will not be loaded into the queue until next restart.");
LOG.info("TaskQueueWatcher: Maintenance mode is enabled, new tasks will not be loaded into the queue until next restart");
return;
}
shouldRun.set(true);

if (LOG.isDebugEnabled()) {
LOG.debug("TaskQueueWatcher: Starting thread {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing.");
TasksFetcher fetcher = new TasksFetcher(registry);
try {
LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK);
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS);
Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
continue;
}
LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK);

List<AtlasTask> tasks = fetcher.getTasks();
LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0);

if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size());
submitAll(tasks, latch);

LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete.");
LOG.info("Submitted {} tasks to the queue", tasks.size());
waitForTasksToComplete(latch);
LOG.info("TaskQueueWatcher: All tasks have been processed.");
} else {
LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}

LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval);
Thread.sleep(pollInterval);
} catch (InterruptedException interruptedException) {
LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException);
LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart");
break;
} catch (Exception e) {
LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e);
LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e);
} finally {
LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
fetcher.clearTasks();
LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher.");
}
}

LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false.");
}


private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException {
if (latch.getCount() != 0) {
LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount());
Expand Down
46 changes: 12 additions & 34 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,47 +385,38 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
DirectIndexQueryResult indexQueryResult = null;
List<AtlasTask> ret = new ArrayList<>();

int size = 1000; // Batch size for fetching tasks
int from = 0; // Pagination offset
int totalFetched = 0; // Tracks the total number of tasks fetched

LOG.info("Starting fetch of PENDING and IN_PROGRESS tasks. Queue size limit: {}", queueSize);
int size = 1000;
int from = 0;

IndexSearchParams indexSearchParams = new IndexSearchParams();

// Build query for PENDING and IN_PROGRESS tasks
List<Map<String, Object>> statusClauseList = new ArrayList<>();
List statusClauseList = new ArrayList();
statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));
statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.PENDING.toString())));

Map<String, Object> dsl = mapOf(
"query", mapOf("bool", mapOf("should", statusClauseList))
);
Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);

int totalFetched = 0;
while (true) {
int fetched = 0;
try {
if (totalFetched + size > queueSize) {
size = queueSize - totalFetched; // Adjust size to not exceed queue size
LOG.info("Adjusting fetch size to {} based on queue size constraint.", size);
size = queueSize - totalFetched;
}

dsl.put("from", from);
dsl.put("size", size);

LOG.debug("DSL Query for iteration: {}", dsl);
indexSearchParams.setDsl(dsl);

LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size);
AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams);

try {
indexQueryResult = indexQuery.vertices(indexSearchParams);
LOG.info("Query executed successfully for from: {} with size: {}", from, size);
} catch (AtlasBaseException e) {
LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e);
LOG.error("Failed to fetch pending/in-progress task vertices to re-que");
e.printStackTrace();
break;
}

Expand All @@ -437,47 +428,34 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {

if (vertex != null) {
AtlasTask atlasTask = toAtlasTask(vertex);

LOG.debug("Processing fetched task: {}", atlasTask);
if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) ||
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) {
LOG.info("Adding task to the result list: {}", atlasTask);
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){
LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString()));
ret.add(atlasTask);
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());

// Repair mismatched task
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
LOG.info("Repairing mismatched task with docId: {}", docId);
repairMismatchedTask(atlasTask, docId);
}
} else {
LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched);
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
}

fetched++;
}
LOG.info("Fetched {} tasks in this iteration.", fetched);
} else {
LOG.warn("Index query result is null for from: {} and size: {}", from, size);
}

totalFetched += fetched;
LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size);

from += size;
if (fetched < size || totalFetched >= queueSize) {
LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize);
break;
}
} catch (Exception e) {
LOG.error("Exception occurred during task fetching process. Exiting loop.", e);
} catch (Exception e){
break;
}
}

LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched);
return ret;
}

Expand Down

0 comments on commit c88de8e

Please sign in to comment.