Skip to content

Commit

Permalink
[#21963] YSQL: New read restart workloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
pao214 committed Sep 25, 2024
1 parent 07e5c09 commit c50eabe
Show file tree
Hide file tree
Showing 6 changed files with 960 additions and 1 deletion.
229 changes: 229 additions & 0 deletions src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package com.yugabyte.sample.apps;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.ThreadLocalRandom;

import org.apache.log4j.Logger;

/*
* Money transfers across bank accounts is a common usecase for a OLTP
* database. Transfers are a commonly used example for discussing
* transactions in databases because of its strong requirements on
* consistency guarantees.
*
* Simulate money transfers. The most important constraint here
* is that the total amount of money across all accounts should remain
* invariant. However, aggregating money across all accounts involves
* a full table scan and this exposes the query to read restarts.
*
* This app helps understand whether the new clockbound clock
* helps improve the performance of this workload.
*
* Database Configuration:
* configure with wallclock and compare the metrics with
* a clockbound clock configuration.
*
* Setup:
* 1. Create a bank_accounts TABLE with columns (account_id INT, balance INT).
* 2. Insert 1000 accounts with account_id 0 to 999 initialized to 1000.
*
* Workload:
* There are two main operations in this workload:
* a. Transfer: Transfers a random amount money from one account to another.
* The amount must be <= the balance of the source account.
* b. Verify: Verifies that the total amount of money across all accounts
* is 1000 * 1000.
*
* Transfer Operation:
* 1. Pick a sender and a receiver pair at random (they must be different).
* 2. Start a repeatable read transaction.
* 3. Query the account balance of the sender.
* 4. If the balance is zero, abort the transaction.
* 5. Pick a random amount [1, balance].
* 6. Decrement the balance of the sender by the amount.
* 7. Increment the balance of the receiver by the amount.
* 8. Commit the transaction.
*
* Verify Operation:
* 1. Sum the balances of all accounts.
* 2. Verify that the sum is 1000 * 1000.
*/
public class SqlBankTransfers extends AppBase {
private static final Logger LOG = Logger.getLogger(SqlBankTransfers.class);

// Static initialization of this app's config.
static {
// Use 1 Verify thread and 10 Transfer threads.
appConfig.readIOPSPercentage = -1;
appConfig.numReaderThreads = 1;
appConfig.numWriterThreads = 10;
// Disable number of keys.
appConfig.numKeysToRead = -1;
appConfig.numKeysToWrite = -1;
// Run the app for 1 minute.
appConfig.runTimeSeconds = 60;
// Report restart read requests metric by default.
appConfig.restartReadsReported = true;
// Avoid load balancing errors.
appConfig.loadBalance = false;
appConfig.disableYBLoadBalancingPolicy = true;
}

// The default table name to create and use for ops.
private static final String DEFAULT_TABLE_NAME = "bank_accounts";

// The number of accounts in the bank.
private static final int NUM_ACCOUNTS = 1000;

// Initial balance of each account.
private static final int INIT_BALANCE = 1000;

// Shared counter to store the number of inconsistent reads.
private static AtomicLong numInconsistentReads = new AtomicLong(0);

@Override
public void createTablesIfNeeded(TableOp tableOp) throws Exception {
try (Connection connection = getPostgresConnection()) {
// Every run should start cleanly.
connection.createStatement().execute(
String.format("DROP TABLE IF EXISTS %s", getTableName()));
LOG.info("Dropping any table(s) left from previous runs if any");
connection.createStatement().execute(String.format(
"CREATE TABLE %s (account_id INT, balance INT)",
getTableName()));
LOG.info(String.format("Created table: %s", getTableName()));
int numRows = connection.createStatement().executeUpdate(String.format(
"INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), %d",
getTableName(), NUM_ACCOUNTS, INIT_BALANCE));
LOG.info(String.format(
"Inserted %d rows into %s", numRows, getTableName()));
}
}

@Override
public String getTableName() {
String tableName = appConfig.tableName != null ?
appConfig.tableName : DEFAULT_TABLE_NAME;
return tableName.toLowerCase();
}

// Executes the Verify operation.
@Override
public long doRead() {
try (Connection connection = getPostgresConnection();
Statement statement = connection.createStatement()) {
try {
ResultSet resultSet = statement.executeQuery(String.format(
"SELECT SUM(balance) FROM %s", getTableName()));
if (!resultSet.next()) {
throw new SQLException("No rows returned from sum query");
}
int totalBalance = resultSet.getInt(1);

// verify total balance.
if (totalBalance != NUM_ACCOUNTS * INIT_BALANCE) {
LOG.error(String.format("Total balance is %d", totalBalance));
numInconsistentReads.incrementAndGet();
}
} catch (Exception e) {
LOG.error("Error verifying balances ", e);
}
} catch (Exception e) {
LOG.error("Error creating a connection ", e);
}
return 1;
}

// Executes the Transfer operation.
@Override
public long doWrite(int threadIdx) {
// Pick two random distinct accounts.
int sender = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS);
int receiver;
do {
receiver = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS);
} while (receiver == sender);

try (Connection connection = getPostgresConnection();
Statement statement = connection.createStatement()) {
// Start a repeatable read transaction.
connection.setAutoCommit(false);
connection.setTransactionIsolation(
Connection.TRANSACTION_REPEATABLE_READ);
try {
// Retrieve the balance of the sender.
ResultSet rs = statement.executeQuery(String.format(
"SELECT balance FROM %s WHERE account_id = %d",
getTableName(), sender));
if (!rs.next()) {
throw new SQLException("No row found for account " + sender);
}
int senderBalance = rs.getInt("balance");

// If the sender has no money, abort the transaction.
if (senderBalance <= 0) {
if (senderBalance < 0) {
LOG.error(String.format(
"Sender %d has negative balance %d", sender, senderBalance));
numInconsistentReads.incrementAndGet();
}
throw new SQLException("Sender has no money");
}

// Pick a random amount to transfer [1, sendBalance].
int amount = ThreadLocalRandom.current().nextInt(1, senderBalance + 1);

// Decrement the sender's balance.
statement.executeUpdate(String.format(
"UPDATE %s SET balance = balance - %d WHERE account_id = %d",
getTableName(), amount, sender));

// Increment the receiver's balance.
statement.executeUpdate(String.format(
"UPDATE %s SET balance = balance + %d WHERE account_id = %d",
getTableName(), amount, receiver));

// Commit the transaction.
connection.commit();

// Transfer successful.
return 1;
} catch (Exception e) {
LOG.error("Error transferring money ", e);
connection.rollback();
return 0;
}
} catch (Exception e) {
LOG.error("Error creating a connection ", e);
return 0;
}
}

/*
* Appends the number of inconsistent reads to the metrics output.
*/
@Override
public void appendMessage(StringBuilder sb) {
sb.append("Inconsistent reads: ").append(
numInconsistentReads.get()).append(" total ops | ");
super.appendMessage(sb);
}
}
164 changes: 164 additions & 0 deletions src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package com.yugabyte.sample.apps;

import java.sql.Connection;
import java.sql.Statement;

import java.util.concurrent.ThreadLocalRandom;

import org.apache.log4j.Logger;

/*
* Consistent hashing is useful when you have a dynamic set of nodes and
* you need to send a key-value request to one of the nodes. Consistent
* hashing is great at load balancing without moving too many keys when
* nodes are added or removed.
*
* This app maintains a list of hashes one for each "virtual" node and
* supports two operations:
* a. Config change: Add or remove a node.
* b. Get node: Get the node for a given key.
*
* Config Change Operation:
* 1. At coin flip, choose whether to add or remove a node.
* 2. If adding a node, add a node with a random hash.
* 3. If removing a node, remove a random node.
*
* Get Node Operation:
* 1. Pick a random key.
* 2. Find the node with the smallest hash greater than the key.
* If no such node exists, return the smallest hash node.
*/
public class SqlConsistentHashing extends AppBase {
private static final Logger LOG = Logger.getLogger(SqlEventCounter.class);

// Static initialization of this app's config.
static {
// Use 10 Get Node threads and 10 Config Change threads.
appConfig.readIOPSPercentage = -1;
appConfig.numReaderThreads = 10;
appConfig.numWriterThreads = 10;
// Disable number of keys.
appConfig.numKeysToRead = -1;
appConfig.numKeysToWrite = -1;
// Run the app for 1 minute.
appConfig.runTimeSeconds = 60;
// Report restart read requests metric by default.
appConfig.restartReadsReported = true;
// Avoid load balancing errors.
appConfig.loadBalance = false;
appConfig.disableYBLoadBalancingPolicy = true;
}

// The default table name to create and use for ops.
private static final String DEFAULT_TABLE_NAME = "consistent_hashing";

// Initial number of nodes.
private static final int INITIAL_NODES = 100000;

@Override
public void createTablesIfNeeded(TableOp tableOp) throws Exception {
try (Connection connection = getPostgresConnection()) {
// Every run should start cleanly.
connection.createStatement().execute(
String.format("DROP TABLE IF EXISTS %s", getTableName()));
LOG.info("Dropping any table(s) left from previous runs if any");
connection.createStatement().execute(String.format(
"CREATE TABLE %s (node_hash INT) SPLIT INTO 24 TABLETS",
getTableName()));
LOG.info("Created table " + getTableName());
connection.createStatement().execute(String.format(
"INSERT INTO %s" +
" SELECT (RANDOM() * 1000000000)::INT" +
" FROM generate_series(1, %d)",
getTableName(), INITIAL_NODES));
LOG.info("Inserted " + INITIAL_NODES + " nodes into " + getTableName());
}
}

@Override
public String getTableName() {
String tableName = appConfig.tableName != null ?
appConfig.tableName : DEFAULT_TABLE_NAME;
return tableName.toLowerCase();
}

@Override
public long doRead() {
try (Connection connection = getPostgresConnection();
Statement statement = connection.createStatement()) {
int key = ThreadLocalRandom.current().nextInt();
try {
statement.executeQuery(String.format(
"SELECT COALESCE(" +
" (SELECT MIN(node_hash) FROM %s WHERE node_hash > %d)," +
" (SELECT MIN(node_hash) FROM %s)" +
")",
getTableName(), key, getTableName()));
return 1;
} catch (Exception e) {
LOG.error("Error retrieving node uuid", e);
return 0;
}
} catch (Exception e) {
LOG.error("Error creating a connection ", e);
return 0;
}
}

@Override
public long doWrite(int threadIdx) {
try (Connection connection = getPostgresConnection();
Statement statement = connection.createStatement()) {
int coinFlip = ThreadLocalRandom.current().nextInt(2);
if (coinFlip == 0) {
// Add a node.
return addNode(statement);
} else {
// Remove a node.
return removeNode(statement);
}
} catch (Exception e) {
LOG.error("Error creating a connection ", e);
return 0;
}
}

public long addNode(Statement statement) {
try {
int nodeHash = ThreadLocalRandom.current().nextInt();
statement.executeUpdate(String.format(
"INSERT INTO %s (node_hash) VALUES (%d)",
getTableName(), nodeHash));
return 1;
} catch (Exception e) {
LOG.error("Error adding a node " + e);
return 0;
}
}

public long removeNode(Statement statement) {
try {
statement.executeUpdate(String.format(
"DELETE FROM %s WHERE node_hash =" +
" (SELECT node_hash FROM %s ORDER BY RANDOM() LIMIT 1)",
getTableName(), getTableName()));
return 1;
} catch (Exception e) {
LOG.error("Error removing a node " + e);
return 0;
}
}
}
Loading

0 comments on commit c50eabe

Please sign in to comment.