From b7f36435a66859fde62186aad1499f0882be4b23 Mon Sep 17 00:00:00 2001 From: kartik2k Date: Thu, 3 Oct 2024 20:45:50 +0530 Subject: [PATCH 1/4] updated --- .../java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java index c17f50d4a3..0b11c9dcc9 100644 --- a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java +++ b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java @@ -206,7 +206,7 @@ private String getCreateTaskInProgressTableStatement() { return SchemaBuilder.createTable(properties.getKeyspace(), TABLE_TASK_IN_PROGRESS) .ifNotExists() .addPartitionKey(TASK_DEF_NAME_KEY, DataType.text()) - .addClusteringColumn(TASK_ID_KEY, DataType.uuid()) + .addPartitionKey(TASK_ID_KEY, DataType.uuid()) .addColumn(WORKFLOW_ID_KEY, DataType.uuid()) .addColumn(TASK_IN_PROG_STATUS_KEY, DataType.cboolean()) .getQueryString(); From e3da263725abe2708d30c26891e1dcd127a4107a Mon Sep 17 00:00:00 2001 From: kartik2k Date: Fri, 4 Oct 2024 00:23:42 +0530 Subject: [PATCH 2/4] updated --- .../cassandra/dao/CassandraExecutionDAO.java | 8 ++++ .../core/dal/ExecutionDAOFacade.java | 3 +- .../netflix/conductor/dao/ExecutionDAO.java | 6 +++ .../conductor/scylla/dao/ScyllaBaseDAO.java | 2 +- .../scylla/dao/ScyllaExecutionDAO.java | 45 +++++++++++++++++++ 5 files changed, 62 insertions(+), 2 deletions(-) diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java index 4d72ad495f..76a64ebd56 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java +++ 
b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java @@ -361,6 +361,14 @@ public void updateTask(TaskModel task) { } } + @Override + public void updateTasksInBatch(List tasks) { + // No batched write path for Cassandra yet: persist every task through updateTask + if (tasks != null) { + tasks.forEach(this::updateTask); + } + } + /** * @method to verify the task status and update the task_in_progress table * also removes if its a terminal task diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index d881500058..077857e709 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -527,7 +527,8 @@ public void updateTask(TaskModel taskModel) { } public void updateTasks(List tasks) { - tasks.forEach(this::updateTask); + executionDAO.updateTasksInBatch(tasks); +// tasks.forEach(this::updateTask); } public void removeTask(String taskId) { diff --git a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java index 8e33cac29b..181beea6ec 100644 --- a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java @@ -52,6 +52,12 @@ public interface ExecutionDAO { */ void updateTask(TaskModel task); + + /** + * @param tasks Task to be updated + */ + void updateTasksInBatch(List tasks); + /** * Checks if the number of tasks in progress for the given taskDef will exceed the limit if the * task is scheduled to be in progress (given to the worker or for system tasks start() method diff --git 
a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java index 0b11c9dcc9..c17f50d4a3 100644 --- a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java +++ b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaBaseDAO.java @@ -206,7 +206,7 @@ private String getCreateTaskInProgressTableStatement() { return SchemaBuilder.createTable(properties.getKeyspace(), TABLE_TASK_IN_PROGRESS) .ifNotExists() .addPartitionKey(TASK_DEF_NAME_KEY, DataType.text()) - .addPartitionKey(TASK_ID_KEY, DataType.uuid()) + .addClusteringColumn(TASK_ID_KEY, DataType.uuid()) .addColumn(WORKFLOW_ID_KEY, DataType.uuid()) .addColumn(TASK_IN_PROG_STATUS_KEY, DataType.cboolean()) .getQueryString(); diff --git a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java index 0e85dd54b6..a8a6447243 100644 --- a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java +++ b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java @@ -382,6 +382,51 @@ public void updateTask(TaskModel task) { } } + + @Override + public void updateTasksInBatch(List tasks) { + if (tasks == null || tasks.isEmpty()) { + return; // No tasks to process + } + + long startBatch = System.currentTimeMillis(); + + try { + // Acquire a global lock for batch update if needed + // Create a batch statement to execute multiple updates in one call + BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED); + + // Loop through the list of tasks and prepare batch updates + for (TaskModel task : tasks) { + Integer correlationId = Objects.isNull(task.getCorrelationId()) ? 
0 : Integer.parseInt(task.getCorrelationId()); + String taskPayload = toJson(task); + + // Add the update statement for each task to the batch + batchStatement.add( + insertTaskStatement.bind( + UUID.fromString(task.getWorkflowInstanceId()), + correlationId, + task.getTaskId(), + taskPayload + ) + ); + } + + // Execute the batch of updates + long tstart = System.currentTimeMillis(); + session.execute(batchStatement); + LOGGER.info("Batch execution of task updates completed in {} ms for {} tasks.", + (System.currentTimeMillis() - tstart), tasks.size()); + } catch (DriverException e) { + Monitors.error(CLASS_NAME, "updateTaskInBatch"); + String errorMsg = String.format("Error updating batch of tasks. Size: %d", tasks.size()); + LOGGER.error(errorMsg, e); + throw new TransientException(errorMsg, e); + } + LOGGER.info("[Conductor] [ScyllaExecutionDAO] Batch updateTask Time taken for {} tasks: {} ms", + tasks.size(), (System.currentTimeMillis() - startBatch)); + } + /** * @method to verify the task status and update the task_in_progress table * also removes if its a terminal task From 263298a6ab1fb899b870f3d9fe27a0b89de69938 Mon Sep 17 00:00:00 2001 From: kartik2k Date: Fri, 4 Oct 2024 00:25:46 +0530 Subject: [PATCH 3/4] updated --- .../conductor/redis/dao/RedisExecutionDAO.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java index 33902640d4..8496fa3b78 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java @@ -260,6 +260,18 @@ public void updateTask(TaskModel task) { } } + @Override + public void updateTasksInBatch(List tasks) { 
+ // No pipelined write path for Redis yet: persist every task through updateTask so that + // ExecutionDAOFacade.updateTasks does not silently drop the updates. + if (tasks == null || tasks.isEmpty()) { + return; + } + for (TaskModel task : tasks) { + updateTask(task); + } + } + @Override public boolean exceedsLimit(TaskModel task) { Optional taskDefinition = task.getTaskDefinition(); From 256f32447de8bbe68ad060476874cab82071f488 Mon Sep 17 00:00:00 2001 From: kartik2k Date: Fri, 4 Oct 2024 00:31:52 +0530 Subject: [PATCH 4/4] add task in progresss query optimise --- .../scylla/dao/ScyllaExecutionDAO.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java index a8a6447243..77c83f3102 100644 --- a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java +++ b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java @@ -14,6 +14,7 @@ import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.querybuilder.QueryBuilder; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -310,22 +311,18 @@ public List createTasks(List tasks) { * @method to add the task_in_progress table with the status of the task if task is not already present */ public void addTaskInProgress(TaskModel task) { - ResultSet resultSet = - session.execute( - selectTaskInProgressStatement.bind(task.getTaskDefName(), - UUID.fromString(task.getTaskId()))); - if (resultSet.all().isEmpty() || resultSet.all().size()<1) { - session.execute( - insertTaskInProgressStatement.bind(task.getTaskDefName(), - UUID.fromString(task.getTaskId()), - UUID.fromString(task.getWorkflowInstanceId()), 
- true)); + ResultSet resultSet = session.execute( + QueryBuilder.insertInto(properties.getKeyspace(), TABLE_TASK_IN_PROGRESS) + .value(TASK_DEF_NAME_KEY, task.getTaskDefName()) + .value(TASK_ID_KEY, UUID.fromString(task.getTaskId())) + .value(WORKFLOW_ID_KEY, UUID.fromString(task.getWorkflowInstanceId())) + .value(TASK_IN_PROG_STATUS_KEY, true) + .ifNotExists() // Ensures this is a lightweight transaction + // Execute the built statement itself: getQueryString() would return '?' markers without the values + ); + if (!resultSet.wasApplied()) { + LOGGER.info("Task with defName {} and Id {} already exists, insert skipped.", task.getTaskDefName(), task.getTaskId()); } - else { - LOGGER.info("Task with defName {} and Id {} and status {} in addTaskInProgress NOT inserted as already exists " - ,task.getTaskDefName(), task.getTaskId(),task.getStatus()); - } - } /**