diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java index 4d72ad495f..76a64ebd56 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java @@ -361,6 +361,14 @@ public void updateTask(TaskModel task) { } } + @Override + public void updateTasksInBatch(List tasks) { + TaskModel task = tasks.get(0); + LOGGER.debug( + "Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask", + task.getTaskDefName(), task.getWorkflowInstanceId(), task.getTaskId(), task.getTaskType(), task.getStatus().name()); + } + /** * @method to verify the task status and update the task_in_progress table * also removes if its a terminal task diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index d881500058..077857e709 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -527,7 +527,8 @@ public void updateTask(TaskModel taskModel) { } public void updateTasks(List tasks) { - tasks.forEach(this::updateTask); + executionDAO.updateTasksInBatch(tasks); +// tasks.forEach(this::updateTask); } public void removeTask(String taskId) { diff --git a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java index 8e33cac29b..181beea6ec 100644 --- a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java @@ -52,6 +52,12 @@ public interface ExecutionDAO { */ void updateTask(TaskModel task); + + /** + * @param tasks Task to be updated + */ + void updateTasksInBatch(List tasks); + /** * Checks if the number of tasks in progress for the given taskDef will exceed the limit if the * task is scheduled to be in progress (given to the worker or for system tasks start() method diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java index 33902640d4..8496fa3b78 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java @@ -260,6 +260,18 @@ public void updateTask(TaskModel task) { } } + @Override + public void updateTasksInBatch(List tasks) { + TaskModel task = tasks.get(0); + LOGGER.debug( + "Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask", + nsKey(IN_PROGRESS_TASKS, task.getTaskDefName()), + task.getWorkflowInstanceId(), + task.getTaskId(), + task.getTaskType(), + task.getStatus().name()); + } + @Override public boolean exceedsLimit(TaskModel task) { Optional taskDefinition = task.getTaskDefinition(); diff --git a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java index 0e85dd54b6..77c83f3102 100644 --- a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java +++ b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java @@ -14,6 +14,7 @@ import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.querybuilder.QueryBuilder; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -310,22 +311,18 @@ public List createTasks(List tasks) { * @method to add the task_in_progress table with the status of the task if task is not already present */ public void addTaskInProgress(TaskModel task) { - ResultSet resultSet = - session.execute( - selectTaskInProgressStatement.bind(task.getTaskDefName(), - UUID.fromString(task.getTaskId()))); - if (resultSet.all().isEmpty() || resultSet.all().size()<1) { - session.execute( - insertTaskInProgressStatement.bind(task.getTaskDefName(), - UUID.fromString(task.getTaskId()), - UUID.fromString(task.getWorkflowInstanceId()), - true)); + ResultSet resultSet = session.execute( + QueryBuilder.insertInto(properties.getKeyspace(), TABLE_TASK_IN_PROGRESS) + .value(TASK_DEF_NAME_KEY, task.getTaskDefName()) + .value(TASK_ID_KEY, UUID.fromString(task.getTaskId())) + .value(WORKFLOW_ID_KEY, UUID.fromString(task.getWorkflowInstanceId())) + .value(TASK_IN_PROG_STATUS_KEY, true) + .ifNotExists() // Ensures this is a lightweight transaction + .getQueryString() + ); + if (!resultSet.wasApplied()) { + LOGGER.info("Task with defName {} and Id {} already exists, insert skipped.", task.getTaskDefName(), task.getTaskId()); } - else { - LOGGER.info("Task with defName {} and Id {} and status {} in addTaskInProgress NOT inserted as already exists " - ,task.getTaskDefName(), task.getTaskId(),task.getStatus()); - } - } /** @@ -382,6 +379,51 @@ public void updateTask(TaskModel task) { } } + + @Override + public void updateTasksInBatch(List tasks) { + if (tasks == null || tasks.isEmpty()) { + return; // No tasks to process + } + + long startBatch = System.currentTimeMillis(); + + try { + // Acquire a global lock for batch update if needed + // Create a batch statement to execute multiple updates in one call + BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED); + + // Loop through the list of tasks and prepare batch updates + for (TaskModel task : tasks) { + Integer correlationId = Objects.isNull(task.getCorrelationId()) ? 0 : Integer.parseInt(task.getCorrelationId()); + String taskPayload = toJson(task); + + // Add the update statement for each task to the batch + batchStatement.add( + insertTaskStatement.bind( + UUID.fromString(task.getWorkflowInstanceId()), + correlationId, + task.getTaskId(), + taskPayload + ) + ); + } + + // Execute the batch of updates + long tstart = System.currentTimeMillis(); + session.execute(batchStatement); + LOGGER.info("Batch execution of task updates completed in {} ms for {} tasks.", + (System.currentTimeMillis() - tstart), tasks.size()); + } catch (DriverException e) { + Monitors.error(CLASS_NAME, "updateTaskInBatch"); + String errorMsg = String.format("Error updating batch of tasks. Size: %d", tasks.size()); + LOGGER.error(errorMsg, e); + throw new TransientException(errorMsg, e); + } + LOGGER.info("[Conductor] [ScyllaExecutionDAO] Batch updateTask Time taken for {} tasks: {} ms", + tasks.size(), (System.currentTimeMillis() - startBatch)); + } + /** * @method to verify the task status and update the task_in_progress table * also removes if its a terminal task