
Commit

Added more logs.
hr2904 committed Dec 2, 2024
1 parent 0da6164 commit 5047e77
Showing 2 changed files with 42 additions and 11 deletions.
@@ -87,36 +87,52 @@ public void run() {
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing.");
TasksFetcher fetcher = new TasksFetcher(registry);
try {
LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK);
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS);
Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
continue;
}
LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK);

List<AtlasTask> tasks = fetcher.getTasks();
LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0);

if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());

LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete.");
waitForTasksToComplete(latch);
LOG.info("TaskQueueWatcher: All tasks have been processed.");
} else {
LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}

LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval);
Thread.sleep(pollInterval);
} catch (InterruptedException interruptedException) {
LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart");
LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException);
break;
} catch (Exception e) {
LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e);
LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e);
} finally {
LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
fetcher.clearTasks();
LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher.");
}
}

LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false.");
}


private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException {
if (latch.getCount() != 0) {
LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount());
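The TaskQueueWatcher hunk above only adds logging around an existing control flow: acquire a Redis-backed distributed lock, fetch pending tasks, submit them and wait on a CountDownLatch, then release the lock and sleep for the poll interval. The sketch below is a minimal, self-contained illustration of that poll-under-lock shape; the LockService interface, LOCK_KEY and WAIT_MS constants, fetchTasks(), and the plain-thread submission are hypothetical stand-ins for the Atlas classes (RedisService, TasksFetcher, the executor behind submitAll), not the actual implementation.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the Redis-backed lock service used by the watcher.
interface LockService {
    boolean acquireDistributedLock(String key);
    void releaseDistributedLock(String key);
}

public class PollUnderLockSketch {
    private static final String LOCK_KEY = "atlas:task:lock"; // placeholder for ATLAS_TASK_LOCK
    private static final long WAIT_MS = 1_000L;               // placeholder for TASK_WAIT_TIME_MS

    private final AtomicBoolean shouldRun = new AtomicBoolean(true);
    private final LockService lockService;
    private final long pollIntervalMs;

    public PollUnderLockSketch(LockService lockService, long pollIntervalMs) {
        this.lockService = lockService;
        this.pollIntervalMs = pollIntervalMs;
    }

    public void run() {
        while (shouldRun.get()) {
            boolean locked = false;
            try {
                // Only one watcher across the cluster should drain the task queue at a time.
                locked = lockService.acquireDistributedLock(LOCK_KEY);
                if (!locked) {
                    Thread.sleep(WAIT_MS); // lock held elsewhere: back off and retry
                    continue;
                }

                List<Runnable> tasks = fetchTasks();
                if (!tasks.isEmpty()) {
                    // One latch count per task; each worker counts down when it finishes.
                    CountDownLatch latch = new CountDownLatch(tasks.size());
                    for (Runnable task : tasks) {
                        new Thread(() -> { task.run(); latch.countDown(); }).start();
                    }
                    latch.await(); // block until every submitted task completes
                }

                Thread.sleep(pollIntervalMs); // pause before the next poll
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop the watcher when interrupted
            } catch (Exception e) {
                // keep polling on unexpected errors, mirroring the watcher's catch-all
            } finally {
                if (locked) {
                    lockService.releaseDistributedLock(LOCK_KEY); // never hold the lock across iterations
                }
            }
        }
    }

    private List<Runnable> fetchTasks() {
        return List.of(); // stand-in for TasksFetcher.getTasks()
    }
}
```

Tracking the acquired state in a local flag keeps the release in finally from firing when the lock was never obtained; the sketch simplifies the watcher's early-release path for the empty-task case.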
repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java (31 changes: 23 additions & 8 deletions)
@@ -402,21 +402,24 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
int fetched = 0;
try {
if (totalFetched + size > queueSize) {
size = queueSize - totalFetched;
size = queueSize - totalFetched; // Adjust size to not exceed queue size
LOG.info("Adjusting fetch size to {} based on queue size constraint.", size);
}

dsl.put("from", from);
dsl.put("size", size);

LOG.info("DSL Query for iteration: {}", dsl);
indexSearchParams.setDsl(dsl);

LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size);
AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams);

try {
indexQueryResult = indexQuery.vertices(indexSearchParams);
LOG.info("Query executed successfully for from: {} with size: {}", from, size);
} catch (AtlasBaseException e) {
LOG.error("Failed to fetch pending/in-progress task vertices to re-que");
e.printStackTrace();
LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e);
break;
}

@@ -428,34 +431,46 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {

if (vertex != null) {
AtlasTask atlasTask = toAtlasTask(vertex);

LOG.info("Processing fetched task: {}", atlasTask);
if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) ||
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){
LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString()));
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) {
LOG.info("Adding task to the result list: {}", atlasTask);
ret.add(atlasTask);
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());
// Repair mismatched task
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
LOG.info("Repairing mismatched task with docId: {}", docId);
repairMismatchedTask(atlasTask, docId);
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched);
}

fetched++;
}
LOG.info("Fetched {} tasks in this iteration.", fetched);
} else {
LOG.warn("Index query result is null for from: {} and size: {}", from, size);
}

totalFetched += fetched;
LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size);

from += size;
if (fetched < size || totalFetched >= queueSize) {
LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize);
break;
}
} catch (Exception e){
} catch (Exception e) {
LOG.error("Exception occurred during task fetching process. Exiting loop.", e);
break;
}
}

LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched);
return ret;
}
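The getTasksForReQueueIndexSearch() changes likewise add logging around an existing pagination loop: page through the vertex index with from/size, cap the total at queueSize, and stop when a page comes back short. The sketch below shows that loop in isolation; fetchPage, pageSize, and queueSize are hypothetical parameters standing in for the Elasticsearch index query and the Atlas constants, not the actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Minimal sketch of a from/size pagination loop; fetchPage stands in for the index query.
public class PaginatedFetchSketch {

    public static <T> List<T> fetchAll(BiFunction<Integer, Integer, List<T>> fetchPage,
                                       int pageSize,
                                       int queueSize) {
        List<T> ret = new ArrayList<>();
        int from = 0;
        int totalFetched = 0;
        int size = pageSize;

        while (true) {
            // Never request more than the remaining queue capacity.
            if (totalFetched + size > queueSize) {
                size = queueSize - totalFetched;
            }
            if (size <= 0) {
                break;
            }

            List<T> page = fetchPage.apply(from, size); // e.g. a query carrying "from"/"size"
            int fetched = page.size();
            ret.addAll(page);

            totalFetched += fetched;
            from += size;

            // A short page or a full queue means there is nothing left to pull.
            if (fetched < size || totalFetched >= queueSize) {
                break;
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        // Usage example against an in-memory "index" of 25 items, page size 10, queue cap 100.
        List<Integer> index = new ArrayList<>();
        for (int i = 0; i < 25; i++) { index.add(i); }

        List<Integer> all = fetchAll(
                (from, size) -> index.subList(from, Math.min(from + size, index.size())),
                10, 100);
        System.out.println(all.size()); // prints 25
    }
}
```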

@@ -494,7 +509,7 @@ private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
Map<String, LinkedHashMap> result = indexQuery.directUpdateByQuery(queryDsl);

if (result != null) {
LOG.info("Elasticsearch UpdateByQuery Result: " + result);
LOG.info("Elasticsearch UpdateByQuery Result: " + result + "\nfor task : " + atlasTask.getGuid());
} else {
LOG.info("No documents updated in Elasticsearch for guid: " + atlasTask.getGuid());
}
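A side note on the last hunk: the updated log message builds its string by concatenation inside LOG.info. With SLF4J, which the surrounding code appears to use, placeholder arguments defer message assembly until the INFO level is known to be enabled. A hypothetical parameterized version is sketched below; the class and method names are illustrative only.

```java
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: SLF4J placeholders avoid building the message when INFO is disabled.
public class ParameterizedLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

    void logUpdateResult(Map<String, ?> result, String guid) {
        LOG.info("Elasticsearch UpdateByQuery Result: {} for task: {}", result, guid);
    }
}
```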
