From c88de8e631e070573a94665f0cf085e8a2a963ad Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 2 Dec 2024 10:51:56 +0530 Subject: [PATCH] Refactoring code for PR --- .../apache/atlas/tasks/TaskQueueWatcher.java | 26 ++--------- .../org/apache/atlas/tasks/TaskRegistry.java | 46 +++++-------------- 2 files changed, 17 insertions(+), 55 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index f6165a639d..c212aad09e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -78,61 +78,45 @@ public void shutdown() { public void run() { boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean(); if (isMaintenanceMode) { - LOG.info("TaskQueueWatcher: Maintenance mode is enabled. New tasks will not be loaded into the queue until next restart."); + LOG.info("TaskQueueWatcher: Maintenance mode is enabled, new tasks will not be loaded into the queue until next restart"); return; } shouldRun.set(true); if (LOG.isDebugEnabled()) { - LOG.debug("TaskQueueWatcher: Starting thread {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); + LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { - LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); TasksFetcher fetcher = new TasksFetcher(registry); try { - LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { - LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. 
Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK); List tasks = fetcher.getTasks(); - LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0); - if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); - LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size()); submitAll(tasks, latch); - - LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete."); + LOG.info("Submitted {} tasks to the queue", tasks.size()); waitForTasksToComplete(latch); - LOG.info("TaskQueueWatcher: All tasks have been processed."); } else { - LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } - - LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); + LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); break; } catch (Exception e) { - LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); + LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); } finally { - LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); fetcher.clearTasks(); - LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher."); } } - - LOG.info("TaskQueueWatcher: Thread has stopped. 
shouldRun is now set to false."); } - private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException { if (latch.getCount() != 0) { LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount()); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 8ddda944e5..d405a900c6 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -385,47 +385,38 @@ public List getTasksForReQueueIndexSearch() { DirectIndexQueryResult indexQueryResult = null; List ret = new ArrayList<>(); - int size = 1000; // Batch size for fetching tasks - int from = 0; // Pagination offset - int totalFetched = 0; // Tracks the total number of tasks fetched - - LOG.info("Starting fetch of PENDING and IN_PROGRESS tasks. Queue size limit: {}", queueSize); + int size = 1000; + int from = 0; IndexSearchParams indexSearchParams = new IndexSearchParams(); - // Build query for PENDING and IN_PROGRESS tasks - List> statusClauseList = new ArrayList<>(); + List statusClauseList = new ArrayList(); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.PENDING.toString()))); - Map dsl = mapOf( - "query", mapOf("bool", mapOf("should", statusClauseList)) - ); + Map dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList))); dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); dsl.put("size", size); - + int totalFetched = 0; while (true) { int fetched = 0; try { if (totalFetched + size > queueSize) { - size = queueSize - totalFetched; // Adjust size to not exceed queue size - LOG.info("Adjusting fetch size to {} based on queue size constraint.", size); + size = queueSize - totalFetched; } 
dsl.put("from", from); dsl.put("size", size); - LOG.debug("DSL Query for iteration: {}", dsl); indexSearchParams.setDsl(dsl); - LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size); AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams); try { indexQueryResult = indexQuery.vertices(indexSearchParams); - LOG.info("Query executed successfully for from: {} with size: {}", from, size); } catch (AtlasBaseException e) { - LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e); + LOG.error("Failed to fetch pending/in-progress task vertices to re-que"); + e.printStackTrace(); break; } @@ -437,47 +428,34 @@ public List getTasksForReQueueIndexSearch() { if (vertex != null) { AtlasTask atlasTask = toAtlasTask(vertex); - - LOG.debug("Processing fetched task: {}", atlasTask); if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) || - atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) { - LOG.info("Adding task to the result list: {}", atlasTask); + atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){ + LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString())); ret.add(atlasTask); } else { LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}", atlasTask.getGuid(), atlasTask.getStatus()); - - // Repair mismatched task String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); - LOG.info("Repairing mismatched task with docId: {}", docId); repairMismatchedTask(atlasTask, docId); } } else { - LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched); + LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); } fetched++; } - LOG.info("Fetched {} tasks in this iteration.", fetched); - } else { - LOG.warn("Index query result is null for from: {} and size: {}", from, size); } totalFetched += fetched; - LOG.info("Total tasks fetched so far: {}. 
Incrementing offset by {}.", totalFetched, size); - from += size; if (fetched < size || totalFetched >= queueSize) { - LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize); break; } - } catch (Exception e) { - LOG.error("Exception occurred during task fetching process. Exiting loop.", e); + } catch (Exception e){ break; } } - LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched); return ret; }