Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[Workload Service] Proof-of-concept: one row two cells (#6453)
Browse files Browse the repository at this point in the history
* v0

* atlasdb store

* fix tests

* stash

* stash

* working v0

* working v0

* refactor

* add more tests

* boo

* templates and runners

* Cleanup and tests: CTR

* CTR: concretions to abstractions

* boo

* rocket launch

* self-CR

* final

* dependencies

* proof-of-concept: one row two cells

* la campanella

* la campanella

* explanations

* bleh

* fixes

* boo

* Slash and burn

* workflow behaves differently

* fix

* boo

* cleanup

* final

* clair de lune

* CR checkpoint

* CR

* CR

* Add witnessed transaction actions

* fixes

---------

Co-authored-by: Sam Kramer <[email protected]>
  • Loading branch information
jeremyk-91 and Sam Kramer authored Mar 20, 2023
1 parent 868f3e7 commit 409a00b
Show file tree
Hide file tree
Showing 10 changed files with 420 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.workflow;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.atlasdb.workload.store.IsolationLevel;
import org.immutables.value.Value;

@Value.Immutable
@JsonSerialize(as = ImmutableTableConfiguration.class)
@JsonDeserialize(as = ImmutableTableConfiguration.class)
public interface TableConfiguration {
String tableName();

IsolationLevel isolationLevel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@

package com.palantir.atlasdb.workload.workflow;

import com.google.common.util.concurrent.ListeningExecutorService;
import org.immutables.value.Value;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@Value.Immutable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface WorkflowConfiguration {
int iterationCount();

/**
* Executor used to execute workload threads (i.e., transactions).
*/
ListeningExecutorService executionExecutor();
}
1 change: 1 addition & 0 deletions atlasdb-workload-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {

testImplementation project(':commons-executors')
testImplementation 'org.mockito:mockito-core'
testImplementation 'com.palantir.conjure.java.runtime:conjure-java-jackson-serialization'
testImplementation 'com.palantir.safe-logging:preconditions-assertj'
testImplementation('org.jmock:jmock') {
exclude group: 'org.hamcrest'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import com.palantir.atlasdb.table.description.NameMetadataDescription;
import com.palantir.atlasdb.table.description.TableMetadata;
import com.palantir.atlasdb.transaction.api.ConflictHandler;
import com.palantir.atlasdb.workload.store.IsolationLevel;
import com.palantir.atlasdb.workload.store.WorkloadCell;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;

public final class AtlasDbUtils {

Expand All @@ -50,6 +53,20 @@ public static byte[] tableMetadata(ConflictHandler conflictHandler) {
.persistToBytes();
}

public static byte[] tableMetadata(IsolationLevel isolationLevel) {
switch (isolationLevel) {
case SERIALIZABLE:
return tableMetadata(ConflictHandler.SERIALIZABLE);
case SNAPSHOT:
return tableMetadata(ConflictHandler.RETRY_ON_WRITE_WRITE);
case NONE:
return tableMetadata(ConflictHandler.IGNORE_ALL);
default:
throw new SafeIllegalStateException(
"Unknown isolation level", SafeArg.of("isolationLevel", isolationLevel));
}
}

public static byte[] indexMetadata(ConflictHandler baseTableConflictHandler) {
ConflictHandler conflictHandler = baseTableConflictHandler.checkReadWriteConflicts()
? ConflictHandler.SERIALIZABLE_INDEX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.palantir.atlasdb.workload.workflow;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.palantir.atlasdb.workload.store.ReadOnlyTransactionStore;
import com.palantir.atlasdb.workload.store.TransactionStore;
import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransaction;
Expand Down Expand Up @@ -45,9 +46,12 @@ private DefaultWorkflow(
}

public static Workflow create(
TransactionStore store, KeyedTransactionTask transactionTask, WorkflowConfiguration configuration) {
TransactionStore store,
KeyedTransactionTask transactionTask,
WorkflowConfiguration configuration,
ListeningExecutorService executionExecutor) {
return new DefaultWorkflow(
new ConcurrentTransactionRunner(store, configuration.executionExecutor()),
new ConcurrentTransactionRunner(store, executionExecutor),
transactionTask,
configuration,
new ReadOnlyTransactionStore(store));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.workflow;

import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.util.concurrent.RateLimiter;
import org.immutables.value.Value;

@Value.Immutable
@JsonSerialize(as = ImmutableSingleRowTwoCellsWorkflowConfiguration.class)
@JsonDeserialize(as = ImmutableSingleRowTwoCellsWorkflowConfiguration.class)
@JsonTypeName(SingleRowTwoCellsWorkflowConfiguration.TYPE)
public interface SingleRowTwoCellsWorkflowConfiguration extends WorkflowConfiguration {
String TYPE = "SINGLE_ROW_TWO_CELLS";

TableConfiguration tableConfiguration();

@Value.Default
default double rateLimit() {
return 100;
}

@Value.Lazy
default RateLimiter transactionRateLimiter() {
return RateLimiter.create(rateLimit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.workflow;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.palantir.atlasdb.workload.store.ImmutableWorkloadCell;
import com.palantir.atlasdb.workload.store.TransactionStore;
import com.palantir.atlasdb.workload.store.WorkloadCell;
import com.palantir.atlasdb.workload.transaction.DeleteTransactionAction;
import com.palantir.atlasdb.workload.transaction.ReadTransactionAction;
import com.palantir.atlasdb.workload.transaction.TransactionAction;
import com.palantir.atlasdb.workload.transaction.WriteTransactionAction;
import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransaction;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Consider a single row in a database that has two cells, which should between them maintain the invariant that
* at most one cell has a value (this workflow is extremely common in AtlasDB Proxy). This workflow maintains
* concurrent transactions that read the row (both cells), and then sends an update to one of the cells and attempts
* to delete the other.
*/
public final class SingleRowTwoCellsWorkflows {
private static final int SINGLE_ROW = 1;
private static final int FIRST_COLUMN = 1;
private static final int SECOND_COLUMN = 2;

@VisibleForTesting
static final WorkloadCell FIRST_CELL = ImmutableWorkloadCell.of(SINGLE_ROW, FIRST_COLUMN);

@VisibleForTesting
static final WorkloadCell SECOND_CELL = ImmutableWorkloadCell.of(SINGLE_ROW, SECOND_COLUMN);

private SingleRowTwoCellsWorkflows() {
// static factory
}

public static Workflow createSingleRowTwoCell(
TransactionStore store,
SingleRowTwoCellsWorkflowConfiguration singleRowTwoCellsWorkflowConfiguration,
ListeningExecutorService executionExecutor) {
return DefaultWorkflow.create(
store,
(txnStore, index) -> run(txnStore, index, singleRowTwoCellsWorkflowConfiguration),
singleRowTwoCellsWorkflowConfiguration,
executionExecutor);
}

private static Optional<WitnessedTransaction> run(
TransactionStore store, int taskIndex, SingleRowTwoCellsWorkflowConfiguration workflowConfiguration) {
workflowConfiguration.transactionRateLimiter().acquire();

List<TransactionAction> transactionActions = createTransactionActions(
taskIndex, workflowConfiguration.tableConfiguration().tableName());
return store.readWrite(transactionActions);
}

@VisibleForTesting
static List<TransactionAction> createTransactionActions(int taskIndex, String tableName) {
List<TransactionAction> cellReads = createCellReadActions(tableName);
List<TransactionAction> cellUpdates = createCellUpdateActions(taskIndex, tableName);
return Streams.concat(cellReads.stream(), cellUpdates.stream(), cellReads.stream())
.collect(Collectors.toList());
}

@VisibleForTesting
static boolean shouldWriteToFirstCell(int taskIndex) {
return taskIndex % 2 == 0;
}

private static List<TransactionAction> createCellUpdateActions(int taskIndex, String tableName) {
return shouldWriteToFirstCell(taskIndex)
? ImmutableList.of(
WriteTransactionAction.of(tableName, FIRST_CELL, taskIndex),
DeleteTransactionAction.of(tableName, SECOND_CELL))
: ImmutableList.of(
DeleteTransactionAction.of(tableName, FIRST_CELL),
WriteTransactionAction.of(tableName, SECOND_CELL, taskIndex));
}

private static List<TransactionAction> createCellReadActions(String tableName) {
return ImmutableList.of(
ReadTransactionAction.of(tableName, FIRST_CELL), ReadTransactionAction.of(tableName, SECOND_CELL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.atlasdb.workload.store.TransactionStore;
import com.palantir.atlasdb.workload.transaction.witnessed.ImmutableWitnessedTransaction;
Expand All @@ -38,13 +39,15 @@ public class DefaultWorkflowTest {
private final KeyedTransactionTask transactionTask = mock(KeyedTransactionTask.class);

private final ScheduledExecutorService scheduler = PTExecutors.newSingleThreadScheduledExecutor();
private final ListeningExecutorService executionExecutor = MoreExecutors.listeningDecorator(scheduler);

@Test
public void handlesExceptionsInUnderlyingTasks() {
RuntimeException transactionException = new RuntimeException("boo");
when(transactionTask.apply(eq(store), anyInt())).thenThrow(transactionException);

Workflow workflow = DefaultWorkflow.create(store, transactionTask, createWorkflowConfiguration(2));
Workflow workflow =
DefaultWorkflow.create(store, transactionTask, createWorkflowConfiguration(2), executionExecutor);
assertThatThrownBy(workflow::run)
.hasMessage("Error when running workflow task")
.hasCause(transactionException);
Expand Down Expand Up @@ -99,10 +102,12 @@ public void sortsReadAndWriteTransactionsByEffectiveTimestamp() {
}

private WorkflowConfiguration createWorkflowConfiguration(int iterationCount) {
return ImmutableWorkflowConfiguration.builder()
.iterationCount(iterationCount)
.executionExecutor(MoreExecutors.listeningDecorator(scheduler))
.build();
return new WorkflowConfiguration() {
@Override
public int iterationCount() {
return iterationCount;
}
};
}

private static WitnessedTransaction createReadOnlyWitnessedTransactionWithoutActions(long start) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.workflow;

import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import com.palantir.atlasdb.workload.store.IsolationLevel;
import com.palantir.atlasdb.workload.transaction.WorkloadTestHelpers;
import com.palantir.conjure.java.serialization.ObjectMappers;
import org.junit.Test;

public class SingleRowTwoCellsWorkflowConfigurationTest {
private static final TableConfiguration TABLE_CONFIGURATION = ImmutableTableConfiguration.builder()
.tableName(WorkloadTestHelpers.TABLE)
.isolationLevel(IsolationLevel.SERIALIZABLE)
.build();
private static final ObjectMapper OBJECT_MAPPER = ObjectMappers.newServerObjectMapper();

static {
OBJECT_MAPPER.registerSubtypes(SingleRowTwoCellsWorkflowConfiguration.class);
}

@Test
public void rateLimiterCreatedWithConfiguredNumberOfPermits() {
SingleRowTwoCellsWorkflowConfiguration configuration = createWithRateLimit(Double.MIN_NORMAL);
RateLimiter rateLimiter = configuration.transactionRateLimiter();
assertThat(rateLimiter.tryAcquire()).isTrue();
assertThat(rateLimiter.tryAcquire())
.as("the rate limiter granted two requests, but was not configured to accept more than one every"
+ " ~2.2 * 10^308 seconds")
.isFalse();
}

@Test
public void deserializationIsInverseOfSerialization() throws JsonProcessingException {
SingleRowTwoCellsWorkflowConfiguration configuration = createWithRateLimit(5);
assertThat(OBJECT_MAPPER.readValue(
OBJECT_MAPPER.writeValueAsString(configuration), WorkflowConfiguration.class))
.isEqualTo(configuration);
}

@Test
public void rateLimitAccountedForInSerializedForm() throws JsonProcessingException {
assertThat(OBJECT_MAPPER.writeValueAsString(createWithRateLimit(10)))
.isNotEqualTo(OBJECT_MAPPER.writeValueAsString(createWithRateLimit(20)));
}

private static SingleRowTwoCellsWorkflowConfiguration createWithRateLimit(double rateLimit) {
return ImmutableSingleRowTwoCellsWorkflowConfiguration.builder()
.tableConfiguration(TABLE_CONFIGURATION)
.rateLimit(rateLimit)
.iterationCount(10)
.build();
}
}
Loading

0 comments on commit 409a00b

Please sign in to comment.