diff --git a/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/TableConfiguration.java b/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/TableConfiguration.java new file mode 100644 index 00000000000..f3b4e04f4d4 --- /dev/null +++ b/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/TableConfiguration.java @@ -0,0 +1,31 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.workload.workflow; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.palantir.atlasdb.workload.store.IsolationLevel; +import org.immutables.value.Value; + +@Value.Immutable +@JsonSerialize(as = ImmutableTableConfiguration.class) +@JsonDeserialize(as = ImmutableTableConfiguration.class) +public interface TableConfiguration { + String tableName(); + + IsolationLevel isolationLevel(); +} diff --git a/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/WorkflowConfiguration.java b/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/WorkflowConfiguration.java index 576b587f50e..a62866eedfc 100644 --- a/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/WorkflowConfiguration.java +++ b/atlasdb-workload-server-api/src/main/java/com/palantir/atlasdb/workload/workflow/WorkflowConfiguration.java @@ -16,15 +16,9 @@ package com.palantir.atlasdb.workload.workflow; -import com.google.common.util.concurrent.ListeningExecutorService; -import org.immutables.value.Value; +import com.fasterxml.jackson.annotation.JsonTypeInfo; -@Value.Immutable +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") public interface WorkflowConfiguration { int iterationCount(); - - /** - * Executor used to execute workload threads (i.e., transactions). - */ - ListeningExecutorService executionExecutor(); } diff --git a/atlasdb-workload-server/build.gradle b/atlasdb-workload-server/build.gradle index 79750fff000..8f30fb237a5 100644 --- a/atlasdb-workload-server/build.gradle +++ b/atlasdb-workload-server/build.gradle @@ -17,6 +17,7 @@ dependencies { testImplementation project(':commons-executors') testImplementation 'org.mockito:mockito-core' + testImplementation 'com.palantir.conjure.java.runtime:conjure-java-jackson-serialization' testImplementation 'com.palantir.safe-logging:preconditions-assertj' testImplementation('org.jmock:jmock') { exclude group: 'org.hamcrest' diff --git a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/util/AtlasDbUtils.java b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/util/AtlasDbUtils.java index 94a22213ba8..bf29ecb36d3 100644 --- a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/util/AtlasDbUtils.java +++ b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/util/AtlasDbUtils.java @@ -24,7 +24,10 @@ import com.palantir.atlasdb.table.description.NameMetadataDescription; import com.palantir.atlasdb.table.description.TableMetadata; import com.palantir.atlasdb.transaction.api.ConflictHandler; +import com.palantir.atlasdb.workload.store.IsolationLevel; import com.palantir.atlasdb.workload.store.WorkloadCell; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; public final class AtlasDbUtils { @@ -50,6 +53,20 @@ public static byte[] tableMetadata(ConflictHandler conflictHandler) { .persistToBytes(); } + public static byte[] tableMetadata(IsolationLevel isolationLevel) { + switch (isolationLevel) { + case SERIALIZABLE: + return tableMetadata(ConflictHandler.SERIALIZABLE); + case SNAPSHOT: + return tableMetadata(ConflictHandler.RETRY_ON_WRITE_WRITE); + case NONE: + return tableMetadata(ConflictHandler.IGNORE_ALL); + default: + throw new SafeIllegalStateException( + "Unknown isolation level", SafeArg.of("isolationLevel", isolationLevel)); + } + } + public static byte[] indexMetadata(ConflictHandler baseTableConflictHandler) { ConflictHandler conflictHandler = baseTableConflictHandler.checkReadWriteConflicts() ? ConflictHandler.SERIALIZABLE_INDEX diff --git a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflow.java b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflow.java index 66ec2c1b9a5..cdafd0cea6d 100644 --- a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflow.java +++ b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflow.java @@ -17,6 +17,7 @@ package com.palantir.atlasdb.workload.workflow; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListeningExecutorService; import com.palantir.atlasdb.workload.store.ReadOnlyTransactionStore; import com.palantir.atlasdb.workload.store.TransactionStore; import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransaction; @@ -45,9 +46,12 @@ private DefaultWorkflow( } public static Workflow create( - TransactionStore store, KeyedTransactionTask transactionTask, WorkflowConfiguration configuration) { + TransactionStore store, + KeyedTransactionTask transactionTask, + WorkflowConfiguration configuration, + ListeningExecutorService executionExecutor) { return new DefaultWorkflow( - new ConcurrentTransactionRunner(store, configuration.executionExecutor()), + new ConcurrentTransactionRunner(store, executionExecutor), transactionTask, configuration, new ReadOnlyTransactionStore(store)); diff --git a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowConfiguration.java b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowConfiguration.java new file mode 100644 index 00000000000..05d4f3bfef3 --- /dev/null +++ b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowConfiguration.java @@ -0,0 +1,43 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.workload.workflow; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.util.concurrent.RateLimiter; +import org.immutables.value.Value; + +@Value.Immutable +@JsonSerialize(as = ImmutableSingleRowTwoCellsWorkflowConfiguration.class) +@JsonDeserialize(as = ImmutableSingleRowTwoCellsWorkflowConfiguration.class) +@JsonTypeName(SingleRowTwoCellsWorkflowConfiguration.TYPE) +public interface SingleRowTwoCellsWorkflowConfiguration extends WorkflowConfiguration { + String TYPE = "SINGLE_ROW_TWO_CELLS"; + + TableConfiguration tableConfiguration(); + + @Value.Default + default double rateLimit() { + return 100; + } + + @Value.Lazy + default RateLimiter transactionRateLimiter() { + return RateLimiter.create(rateLimit()); + } +} diff --git a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflows.java b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflows.java new file mode 100644 index 00000000000..a051b975092 --- /dev/null +++ b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflows.java @@ -0,0 +1,103 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.workload.workflow; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.palantir.atlasdb.workload.store.ImmutableWorkloadCell; +import com.palantir.atlasdb.workload.store.TransactionStore; +import com.palantir.atlasdb.workload.store.WorkloadCell; +import com.palantir.atlasdb.workload.transaction.DeleteTransactionAction; +import com.palantir.atlasdb.workload.transaction.ReadTransactionAction; +import com.palantir.atlasdb.workload.transaction.TransactionAction; +import com.palantir.atlasdb.workload.transaction.WriteTransactionAction; +import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransaction; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Consider a single row in a database that has two cells, which should between them maintain the invariant that + * at most one cell has a value (this workflow is extremely common in AtlasDB Proxy). This workflow maintains + * concurrent transactions that read the row (both cells), and then sends an update to one of the cells and attempts + * to delete the other. + */ +public final class SingleRowTwoCellsWorkflows { + private static final int SINGLE_ROW = 1; + private static final int FIRST_COLUMN = 1; + private static final int SECOND_COLUMN = 2; + + @VisibleForTesting + static final WorkloadCell FIRST_CELL = ImmutableWorkloadCell.of(SINGLE_ROW, FIRST_COLUMN); + + @VisibleForTesting + static final WorkloadCell SECOND_CELL = ImmutableWorkloadCell.of(SINGLE_ROW, SECOND_COLUMN); + + private SingleRowTwoCellsWorkflows() { + // static factory + } + + public static Workflow createSingleRowTwoCell( + TransactionStore store, + SingleRowTwoCellsWorkflowConfiguration singleRowTwoCellsWorkflowConfiguration, + ListeningExecutorService executionExecutor) { + return DefaultWorkflow.create( + store, + (txnStore, index) -> run(txnStore, index, singleRowTwoCellsWorkflowConfiguration), + singleRowTwoCellsWorkflowConfiguration, + executionExecutor); + } + + private static Optional run( + TransactionStore store, int taskIndex, SingleRowTwoCellsWorkflowConfiguration workflowConfiguration) { + workflowConfiguration.transactionRateLimiter().acquire(); + + List transactionActions = createTransactionActions( + taskIndex, workflowConfiguration.tableConfiguration().tableName()); + return store.readWrite(transactionActions); + } + + @VisibleForTesting + static List createTransactionActions(int taskIndex, String tableName) { + List cellReads = createCellReadActions(tableName); + List cellUpdates = createCellUpdateActions(taskIndex, tableName); + return Streams.concat(cellReads.stream(), cellUpdates.stream(), cellReads.stream()) + .collect(Collectors.toList()); + } + + @VisibleForTesting + static boolean shouldWriteToFirstCell(int taskIndex) { + return taskIndex % 2 == 0; + } + + private static List createCellUpdateActions(int taskIndex, String tableName) { + return shouldWriteToFirstCell(taskIndex) + ? ImmutableList.of( + WriteTransactionAction.of(tableName, FIRST_CELL, taskIndex), + DeleteTransactionAction.of(tableName, SECOND_CELL)) + : ImmutableList.of( + DeleteTransactionAction.of(tableName, FIRST_CELL), + WriteTransactionAction.of(tableName, SECOND_CELL, taskIndex)); + } + + private static List createCellReadActions(String tableName) { + return ImmutableList.of( + ReadTransactionAction.of(tableName, FIRST_CELL), ReadTransactionAction.of(tableName, SECOND_CELL)); + } +} diff --git a/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflowTest.java b/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflowTest.java index b1d358eeb4d..74c98c744a2 100644 --- a/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflowTest.java +++ b/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/DefaultWorkflowTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.palantir.atlasdb.workload.store.TransactionStore; import com.palantir.atlasdb.workload.transaction.witnessed.ImmutableWitnessedTransaction; @@ -38,13 +39,15 @@ public class DefaultWorkflowTest { private final KeyedTransactionTask transactionTask = mock(KeyedTransactionTask.class); private final ScheduledExecutorService scheduler = PTExecutors.newSingleThreadScheduledExecutor(); + private final ListeningExecutorService executionExecutor = MoreExecutors.listeningDecorator(scheduler); @Test public void handlesExceptionsInUnderlyingTasks() { RuntimeException transactionException = new RuntimeException("boo"); when(transactionTask.apply(eq(store), anyInt())).thenThrow(transactionException); - Workflow workflow = DefaultWorkflow.create(store, transactionTask, createWorkflowConfiguration(2)); + Workflow workflow = + DefaultWorkflow.create(store, transactionTask, createWorkflowConfiguration(2), executionExecutor); assertThatThrownBy(workflow::run) .hasMessage("Error when running workflow task") .hasCause(transactionException); @@ -99,10 +102,12 @@ public void sortsReadAndWriteTransactionsByEffectiveTimestamp() { } private WorkflowConfiguration createWorkflowConfiguration(int iterationCount) { - return ImmutableWorkflowConfiguration.builder() - .iterationCount(iterationCount) - .executionExecutor(MoreExecutors.listeningDecorator(scheduler)) - .build(); + return new WorkflowConfiguration() { + @Override + public int iterationCount() { + return iterationCount; + } + }; } private static WitnessedTransaction createReadOnlyWitnessedTransactionWithoutActions(long start) { diff --git a/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowConfigurationTest.java b/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowConfigurationTest.java new file mode 100644 index 00000000000..08764ef8040 --- /dev/null +++ b/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowConfigurationTest.java @@ -0,0 +1,72 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.workload.workflow; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.RateLimiter; +import com.palantir.atlasdb.workload.store.IsolationLevel; +import com.palantir.atlasdb.workload.transaction.WorkloadTestHelpers; +import com.palantir.conjure.java.serialization.ObjectMappers; +import org.junit.Test; + +public class SingleRowTwoCellsWorkflowConfigurationTest { + private static final TableConfiguration TABLE_CONFIGURATION = ImmutableTableConfiguration.builder() + .tableName(WorkloadTestHelpers.TABLE) + .isolationLevel(IsolationLevel.SERIALIZABLE) + .build(); + private static final ObjectMapper OBJECT_MAPPER = ObjectMappers.newServerObjectMapper(); + + static { + OBJECT_MAPPER.registerSubtypes(SingleRowTwoCellsWorkflowConfiguration.class); + } + + @Test + public void rateLimiterCreatedWithConfiguredNumberOfPermits() { + SingleRowTwoCellsWorkflowConfiguration configuration = createWithRateLimit(Double.MIN_NORMAL); + RateLimiter rateLimiter = configuration.transactionRateLimiter(); + assertThat(rateLimiter.tryAcquire()).isTrue(); + assertThat(rateLimiter.tryAcquire()) + .as("the rate limiter granted two requests, but was not configured to accept more than one every" + + " ~2.2 * 10^308 seconds") + .isFalse(); + } + + @Test + public void deserializationIsInverseOfSerialization() throws JsonProcessingException { + SingleRowTwoCellsWorkflowConfiguration configuration = createWithRateLimit(5); + assertThat(OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(configuration), WorkflowConfiguration.class)) + .isEqualTo(configuration); + } + + @Test + public void rateLimitAccountedForInSerializedForm() throws JsonProcessingException { + assertThat(OBJECT_MAPPER.writeValueAsString(createWithRateLimit(10))) + .isNotEqualTo(OBJECT_MAPPER.writeValueAsString(createWithRateLimit(20))); + } + + private static SingleRowTwoCellsWorkflowConfiguration createWithRateLimit(double rateLimit) { + return ImmutableSingleRowTwoCellsWorkflowConfiguration.builder() + .tableConfiguration(TABLE_CONFIGURATION) + .rateLimit(rateLimit) + .iterationCount(10) + .build(); + } +} diff --git a/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowsTest.java b/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowsTest.java new file mode 100644 index 00000000000..4bc5be54fe2 --- /dev/null +++ b/atlasdb-workload-server/src/test/java/com/palantir/atlasdb/workload/workflow/SingleRowTwoCellsWorkflowsTest.java @@ -0,0 +1,135 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.workload.workflow; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.palantir.atlasdb.factory.TransactionManagers; +import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.workload.store.AtlasDbTransactionStore; +import com.palantir.atlasdb.workload.store.IsolationLevel; +import com.palantir.atlasdb.workload.store.ReadOnlyTransactionStore; +import com.palantir.atlasdb.workload.store.TransactionStore; +import com.palantir.atlasdb.workload.transaction.DeleteTransactionAction; +import com.palantir.atlasdb.workload.transaction.ReadTransactionAction; +import com.palantir.atlasdb.workload.transaction.WriteTransactionAction; +import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedDeleteTransactionAction; +import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedReadTransactionAction; +import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedWriteTransactionAction; +import com.palantir.atlasdb.workload.util.AtlasDbUtils; +import com.palantir.common.concurrent.PTExecutors; +import java.util.Optional; +import org.junit.Test; + +public class SingleRowTwoCellsWorkflowsTest { + private static final String TABLE_NAME = "my.coffee"; + private static final SingleRowTwoCellsWorkflowConfiguration CONFIGURATION = + ImmutableSingleRowTwoCellsWorkflowConfiguration.builder() + .tableConfiguration(ImmutableTableConfiguration.builder() + .tableName(TABLE_NAME) + .isolationLevel(IsolationLevel.SERIALIZABLE) + .build()) + .rateLimit(Double.MAX_VALUE) + .iterationCount(1) + .build(); + + @Test + public void shouldWriteToFirstCellOnEvenIndices() { + assertThat(SingleRowTwoCellsWorkflows.shouldWriteToFirstCell(0)).isTrue(); + assertThat(SingleRowTwoCellsWorkflows.shouldWriteToFirstCell(2)).isTrue(); + assertThat(SingleRowTwoCellsWorkflows.shouldWriteToFirstCell(24682468)).isTrue(); + } + + @Test + public void shouldNotWriteToFirstCellOnOddIndices() { + assertThat(SingleRowTwoCellsWorkflows.shouldWriteToFirstCell(1)).isFalse(); + assertThat(SingleRowTwoCellsWorkflows.shouldWriteToFirstCell(3)).isFalse(); + assertThat(SingleRowTwoCellsWorkflows.shouldWriteToFirstCell(35793579)).isFalse(); + } + + @Test + public void createsReadsThenMutationsThenReads() { + assertThat(SingleRowTwoCellsWorkflows.createTransactionActions(0, TABLE_NAME)) + .containsExactly( + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + WriteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL, 0), + DeleteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL)); + assertThat(SingleRowTwoCellsWorkflows.createTransactionActions(1, TABLE_NAME)) + .containsExactly( + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + DeleteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + WriteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL, 1), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL)); + } + + @Test + public void writesValueCorrespondingToTaskIndexInRelevantCell() { + assertThat(SingleRowTwoCellsWorkflows.createTransactionActions(31415926, TABLE_NAME)) + .containsExactly( + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + WriteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL, 31415926), + DeleteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL)); + assertThat(SingleRowTwoCellsWorkflows.createTransactionActions(6021023, TABLE_NAME)) + .containsExactly( + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + DeleteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + WriteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL, 6021023), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL), + ReadTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL)); + } + + @Test + public void workflowHistoryPreservesStoreAndCapturesWitnessedActions() { + TransactionStore memoryStore = AtlasDbTransactionStore.create( + TransactionManagers.createInMemory(ImmutableSet.of()), + ImmutableMap.of( + TableReference.createWithEmptyNamespace(TABLE_NAME), + AtlasDbUtils.tableMetadata(IsolationLevel.SERIALIZABLE))); + WorkflowHistory history = SingleRowTwoCellsWorkflows.createSingleRowTwoCell( + memoryStore, CONFIGURATION, MoreExecutors.listeningDecorator(PTExecutors.newFixedThreadPool(1))) + .run(); + + assertThat(history.transactionStore()) + .as("should return a read only tranasction store") + .isInstanceOf(ReadOnlyTransactionStore.class); + assertThat(Iterables.getOnlyElement(history.history()).actions()) + .containsExactly( + WitnessedReadTransactionAction.of( + TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL, Optional.empty()), + WitnessedReadTransactionAction.of( + TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL, Optional.empty()), + WitnessedWriteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL, 0), + WitnessedDeleteTransactionAction.of(TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL), + WitnessedReadTransactionAction.of( + TABLE_NAME, SingleRowTwoCellsWorkflows.FIRST_CELL, Optional.of(0)), + WitnessedReadTransactionAction.of( + TABLE_NAME, SingleRowTwoCellsWorkflows.SECOND_CELL, Optional.empty())); + } +}