Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test #45

Open
wants to merge 6 commits into
base: lt_test_load_gowtham
Choose a base branch
from
Open

test #45

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,14 @@ public void updateTask(TaskModel task) {
}
}

@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
TaskModel task = tasks.get(0);
LOGGER.debug(
"Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask",
task.getTaskDefName(), task.getWorkflowInstanceId(), task.getTaskId(), task.getTaskType(), task.getStatus().name());
}

/**
* @method to verify the task status and update the task_in_progress table
* also removes if its a terminal task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ public void updateTask(TaskModel taskModel) {
}

public void updateTasks(List<TaskModel> tasks) {
tasks.forEach(this::updateTask);
executionDAO.updateTasksInBatch(tasks);
// tasks.forEach(this::updateTask);
}

public void removeTask(String taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface ExecutionDAO {
*/
void updateTask(TaskModel task);


/**
* @param tasks Task to be updated
*/
void updateTasksInBatch(List<TaskModel> tasks);

/**
* Checks if the number of tasks in progress for the given taskDef will exceed the limit if the
* task is scheduled to be in progress (given to the worker or for system tasks start() method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ public void updateTask(TaskModel task) {
}
}

@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
TaskModel task = tasks.get(0);
LOGGER.debug(
"Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask",
nsKey(IN_PROGRESS_TASKS, task.getTaskDefName()),
task.getWorkflowInstanceId(),
task.getTaskId(),
task.getTaskType(),
task.getStatus().name());
}

@Override
public boolean exceedsLimit(TaskModel task) {
Optional<TaskDef> taskDefinition = task.getTaskDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -310,22 +311,18 @@ public List<TaskModel> createTasks(List<TaskModel> tasks) {
* @method to add the task_in_progress table with the status of the task if task is not already present
*/
public void addTaskInProgress(TaskModel task) {
ResultSet resultSet =
session.execute(
selectTaskInProgressStatement.bind(task.getTaskDefName(),
UUID.fromString(task.getTaskId())));
if (resultSet.all().isEmpty() || resultSet.all().size()<1) {
session.execute(
insertTaskInProgressStatement.bind(task.getTaskDefName(),
UUID.fromString(task.getTaskId()),
UUID.fromString(task.getWorkflowInstanceId()),
true));
ResultSet resultSet = session.execute(
QueryBuilder.insertInto(properties.getKeyspace(), TABLE_TASK_IN_PROGRESS)
.value(TASK_DEF_NAME_KEY, task.getTaskDefName())
.value(TASK_ID_KEY, UUID.fromString(task.getTaskId()))
.value(WORKFLOW_ID_KEY, UUID.fromString(task.getWorkflowInstanceId()))
.value(TASK_IN_PROG_STATUS_KEY, true)
.ifNotExists() // Ensures this is a lightweight transaction
.getQueryString()
);
if (!resultSet.wasApplied()) {
LOGGER.info("Task with defName {} and Id {} already exists, insert skipped.", task.getTaskDefName(), task.getTaskId());
}
else {
LOGGER.info("Task with defName {} and Id {} and status {} in addTaskInProgress NOT inserted as already exists "
,task.getTaskDefName(), task.getTaskId(),task.getStatus());
}

}

/**
Expand Down Expand Up @@ -382,6 +379,51 @@ public void updateTask(TaskModel task) {
}
}


@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
if (tasks == null || tasks.isEmpty()) {
return; // No tasks to process
}

long startBatch = System.currentTimeMillis();

try {
// Acquire a global lock for batch update if needed
// Create a batch statement to execute multiple updates in one call
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED);

// Loop through the list of tasks and prepare batch updates
for (TaskModel task : tasks) {
Integer correlationId = Objects.isNull(task.getCorrelationId()) ? 0 : Integer.parseInt(task.getCorrelationId());
String taskPayload = toJson(task);

// Add the update statement for each task to the batch
batchStatement.add(
insertTaskStatement.bind(
UUID.fromString(task.getWorkflowInstanceId()),
correlationId,
task.getTaskId(),
taskPayload
)
);
}

// Execute the batch of updates
long tstart = System.currentTimeMillis();
session.execute(batchStatement);
LOGGER.info("Batch execution of task updates completed in {} ms for {} tasks.",
(System.currentTimeMillis() - tstart), tasks.size());
} catch (DriverException e) {
Monitors.error(CLASS_NAME, "updateTaskInBatch");
String errorMsg = String.format("Error updating batch of tasks. Size: %d", tasks.size());
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
}
LOGGER.info("[Conductor] [ScyllaExecutionDAO] Batch updateTask Time taken for {} tasks: {} ms",
tasks.size(), (System.currentTimeMillis() - startBatch));
}

/**
* @method to verify the task status and update the task_in_progress table
* also removes if its a terminal task
Expand Down