Skip to content

Commit

Permalink
Adding Read many api support in the Ctl Workload (#25491)
Browse files Browse the repository at this point in the history
* Adding Read many api support in the Ctl Workload

* Adding Read many api support in the Ctl Workload

* Resolving Comments

Co-authored-by: Aayush Kataria <[email protected]>
  • Loading branch information
aayush3011 and Aayush Kataria authored Nov 22, 2021
1 parent fb7b3b1 commit 89f6eee
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 28 deletions.
10 changes: 5 additions & 5 deletions sdk/cosmos/azure-cosmos-benchmark/ctl/run_benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ else
throughput=$ctl_throughput
fi

if [ -z "$ctl_read_write_query_pct" ]; then
read_write_query_pct="90,9,1"
if [ -z "$ctl_read_write_query_readmany_pct" ]; then
read_write_query_readmany_pct="90,8,1,1"
else
read_write_query_pct=$ctl_read_write_query_pct
read_write_query_readmany_pct=$ctl_read_write_query_readmany_pct
fi

if [ -z "$ctl_number_of_operations" ]; then
Expand Down Expand Up @@ -95,9 +95,9 @@ additional_benchmark_options="-documentDataFieldSize 10 -documentDataFieldCount
additional_benchmark_options="$additional_benchmark_options -maxConnectionPoolSize $gateway_connection_poolsize"

if [ -z "$ctl_graphite_endpoint" ]; then
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -DCOSMOS.ENVIRONMENT_NAME=$ctl_env -DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$ctl_client_telemetry_endpoint -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$db_name" -collectionId "$col_name" -readWriteQueryPct "$read_write_query_pct" -diagnosticsThresholdDuration "$diagnostics_threshold_duration" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -preferredRegionsList "$ctl_preferred_regions" $additional_benchmark_options 2>&1 | tee -a "$log_filename"
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -DCOSMOS.ENVIRONMENT_NAME=$ctl_env -DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$ctl_client_telemetry_endpoint -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$db_name" -collectionId "$col_name" -readWriteQueryReadManyPct "$read_write_query_readmany_pct" -diagnosticsThresholdDuration "$diagnostics_threshold_duration" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -preferredRegionsList "$ctl_preferred_regions" $additional_benchmark_options 2>&1 | tee -a "$log_filename"
else
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -DCOSMOS.ENVIRONMENT_NAME=$ctl_env -DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$ctl_client_telemetry_endpoint -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$db_name" -collectionId "$col_name" -readWriteQueryPct "$read_write_query_pct" -diagnosticsThresholdDuration "$diagnostics_threshold_duration" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -preferredRegionsList "$ctl_preferred_regions" $ctl_accountNameInGraphiteReporter $additional_benchmark_options 2>&1 | tee -a "$log_filename"
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -DCOSMOS.ENVIRONMENT_NAME=$ctl_env -DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$ctl_client_telemetry_endpoint -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$db_name" -collectionId "$col_name" -readWriteQueryReadManyPct "$read_write_query_readmany_pct" -diagnosticsThresholdDuration "$diagnostics_threshold_duration" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -preferredRegionsList "$ctl_preferred_regions" $ctl_accountNameInGraphiteReporter $additional_benchmark_options 2>&1 | tee -a "$log_filename"
fi

end=$(date +%s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public class Configuration {
@Parameter(names = "-numberOfCollectionForCtl", description = "Number of collections for ctl load")
private int numberOfCollectionForCtl = 4;

@Parameter(names = "-readWriteQueryPct", description = "Comma separated read write query workload percent")
private String readWriteQueryPct = "90,9,1";
@Parameter(names = "-readWriteQueryReadManyPct", description = "Comma separated read write query readMany workload percent")
private String readWriteQueryReadManyPct = "90,8,1,1";

@Parameter(names = "-manageDatabase", description = "Control switch for creating/deleting underlying database resource")
private boolean manageDatabase = false;
Expand Down Expand Up @@ -438,8 +438,8 @@ public int getNumberOfCollectionForCtl(){
return this.numberOfCollectionForCtl;
}

public String getReadWriteQueryPct() {
return this.readWriteQueryPct;
public String getReadWriteQueryReadManyPct() {
return this.readWriteQueryReadManyPct;
}

public boolean shouldManageDatabase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ThroughputProperties;
Expand Down Expand Up @@ -54,7 +55,7 @@


public class AsyncCtlWorkload {
private final String PERCENT_PARSING_ERROR = "Unable to parse user provided readWriteQueryPct ";
private final String PERCENT_PARSING_ERROR = "Unable to parse user provided readWriteQueryReadManyPct ";
private final String prefixUuidForCreate;
private final String dataFieldValue;
private final String partitionKey;
Expand All @@ -63,12 +64,14 @@ public class AsyncCtlWorkload {
private final CosmosAsyncClient cosmosClient;
private final Configuration configuration;
private final Map<String, List<PojoizedJson>> docsToRead = new HashMap<>();
private final Map<String, List<CosmosItemIdentity>> itemIdentityMap = new HashMap<>();
private final Semaphore concurrencyControlSemaphore;
private final Random random;

private Timer readLatency;
private Timer writeLatency;
private Timer queryLatency;
private Timer readManyLatency;
private ScheduledReporter reporter;

private Meter readSuccessMeter;
Expand All @@ -77,6 +80,8 @@ public class AsyncCtlWorkload {
private Meter writeFailureMeter;
private Meter querySuccessMeter;
private Meter queryFailureMeter;
private Meter readManySuccessMeter;
private Meter readManyFailureMeter;

private CosmosAsyncDatabase cosmosAsyncDatabase;
private List<CosmosAsyncContainer> containers = new ArrayList<>();
Expand All @@ -85,6 +90,7 @@ public class AsyncCtlWorkload {
private int readPct;
private int writePct;
private int queryPct;
private int readManyPct;

public AsyncCtlWorkload(Configuration cfg) {
CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
Expand All @@ -105,7 +111,7 @@ public AsyncCtlWorkload(Configuration cfg) {
configuration = cfg;
logger = LoggerFactory.getLogger(this.getClass());

parsedReadWriteQueryPct(configuration.getReadWriteQueryPct());
parsedReadWriteQueryReadManyPct(configuration.getReadWriteQueryReadManyPct());

createDatabaseAndContainers(configuration);

Expand All @@ -117,6 +123,7 @@ public AsyncCtlWorkload(Configuration cfg) {
logger.info("PRE-populating {} documents ....", cfg.getNumberOfPreCreatedDocuments());
dataFieldValue = RandomStringUtils.randomAlphabetic(configuration.getDocumentDataFieldSize());
createPrePopulatedDocs(configuration.getNumberOfPreCreatedDocuments());
createItemIdentityMap(docsToRead);

if (configuration.isEnableJvmStats()) {
metricsRegistry.register("gc", new GarbageCollectorMetricSet());
Expand Down Expand Up @@ -154,7 +161,7 @@ public void shutdown() {
cosmosClient.close();
}

private void performWorkload(BaseSubscriber<Object> documentSubscriber, OperationType type, long i) throws Exception {
private void performWorkload(BaseSubscriber<Object> documentSubscriber, OperationType type, long i, boolean isReadMany) throws Exception {
Flux<? extends Object> obs;
CosmosAsyncContainer container = containers.get((int) i % containers.size());
if (type.equals(OperationType.Create)) {
Expand All @@ -163,11 +170,11 @@ private void performWorkload(BaseSubscriber<Object> documentSubscriber, Operatio
partitionKey,
configuration.getDocumentDataFieldCount());
obs = container.createItem(data).flux();
} else if (type.equals(OperationType.Query)) {
} else if (type.equals(OperationType.Query) && !isReadMany) {
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
String sqlQuery = "Select top 100 * from c order by c._ts";
obs = container.queryItems(sqlQuery, options, PojoizedJson.class).byPage(10);
} else {
} else if (type.equals(OperationType.Read)){
int index = random.nextInt(docsToRead.get(container.getId()).size());
RequestOptions options = new RequestOptions();
String partitionKeyValue = docsToRead.get(container.getId()).get(index).getId();
Expand All @@ -176,6 +183,10 @@ private void performWorkload(BaseSubscriber<Object> documentSubscriber, Operatio
new PartitionKey(partitionKeyValue),
PojoizedJson.class)
.flux();
} else {
List<CosmosItemIdentity> itemIdentityList = itemIdentityMap.get(container.getId());
obs = container.readMany(itemIdentityList,
PojoizedJson.class).flux();
}

concurrencyControlSemaphore.acquire();
Expand All @@ -190,16 +201,20 @@ public void run() throws Exception {
writeFailureMeter = metricsRegistry.meter("#Write Unsuccessful Operations");
querySuccessMeter = metricsRegistry.meter("#Query Successful Operations");
queryFailureMeter = metricsRegistry.meter("#Query Unsuccessful Operations");
readManySuccessMeter = metricsRegistry.meter("#Read Many Successful Operations");
readManyFailureMeter = metricsRegistry.meter("#Read Many Unsuccessful Operations");
readLatency = metricsRegistry.register("Read Latency", new Timer(new HdrHistogramResetOnSnapshotReservoir()));
writeLatency = metricsRegistry.register("Write Latency", new Timer(new HdrHistogramResetOnSnapshotReservoir()));
queryLatency = metricsRegistry.register("Query Latency", new Timer(new HdrHistogramResetOnSnapshotReservoir()));
readManyLatency = metricsRegistry.register("Read Many Latency", new Timer(new HdrHistogramResetOnSnapshotReservoir()));

reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);
long startTime = System.currentTimeMillis();

AtomicLong count = new AtomicLong(0);
long i;
int writeRange = readPct + writePct;
int queryRange = readPct + writePct + queryPct;
for (i = 0; BenchmarkHelper.shouldContinue(startTime, i, configuration); i++) {
int index = (int) i % 100;
if (index < readPct) {
Expand All @@ -209,24 +224,32 @@ public void run() throws Exception {
count,
configuration.getDiagnosticsThresholdDuration());
readSubscriber.context = readLatency.time();
performWorkload(readSubscriber, OperationType.Read, i);
performWorkload(readSubscriber, OperationType.Read, i, false);
} else if (index < writeRange) {
BenchmarkRequestSubscriber<Object> writeSubscriber = new BenchmarkRequestSubscriber<>(writeSuccessMeter,
writeFailureMeter,
concurrencyControlSemaphore,
count,
configuration.getDiagnosticsThresholdDuration());
writeSubscriber.context = writeLatency.time();
performWorkload(writeSubscriber, OperationType.Create, i);
performWorkload(writeSubscriber, OperationType.Create, i, false);

} else {
} else if (index < queryRange){
BenchmarkRequestSubscriber<Object> querySubscriber = new BenchmarkRequestSubscriber<>(querySuccessMeter,
queryFailureMeter,
concurrencyControlSemaphore,
count,
configuration.getDiagnosticsThresholdDuration());
querySubscriber.context = queryLatency.time();
performWorkload(querySubscriber, OperationType.Query, i);
performWorkload(querySubscriber, OperationType.Query, i, false);
} else {
BenchmarkRequestSubscriber<Object> readManySubscriber = new BenchmarkRequestSubscriber<>(readManySuccessMeter,
readManyFailureMeter,
concurrencyControlSemaphore,
count,
configuration.getDiagnosticsThresholdDuration());
readManySubscriber.context = readManyLatency.time();
performWorkload(readManySubscriber, OperationType.Query, i, true);
}
}

Expand All @@ -244,22 +267,23 @@ public void run() throws Exception {
reporter.close();
}

private void parsedReadWriteQueryPct(String readWriteQueryPct) {
String[] readWriteQueryPctList = readWriteQueryPct.split(",");
if (readWriteQueryPctList.length == 3) {
private void parsedReadWriteQueryReadManyPct(String readWriteQueryReadManyPct) {
String[] readWriteQueryReadManyPctList = readWriteQueryReadManyPct.split(",");
if (readWriteQueryReadManyPctList.length == 4) {
try {
if (Integer.valueOf(readWriteQueryPctList[0]) + Integer.valueOf(readWriteQueryPctList[1]) + Integer.valueOf(readWriteQueryPctList[2]) == 100) {
readPct = Integer.valueOf(readWriteQueryPctList[0]);
writePct = Integer.valueOf(readWriteQueryPctList[1]);
queryPct = Integer.valueOf(readWriteQueryPctList[2]);
if (Integer.valueOf(readWriteQueryReadManyPctList[0]) + Integer.valueOf(readWriteQueryReadManyPctList[1]) + Integer.valueOf(readWriteQueryReadManyPctList[2]) + Integer.valueOf(readWriteQueryReadManyPctList[3]) == 100) {
readPct = Integer.valueOf(readWriteQueryReadManyPctList[0]);
writePct = Integer.valueOf(readWriteQueryReadManyPctList[1]);
queryPct = Integer.valueOf(readWriteQueryReadManyPctList[2]);
readManyPct = Integer.valueOf(readWriteQueryReadManyPctList[3]);
} else {
throw new IllegalArgumentException(PERCENT_PARSING_ERROR + readWriteQueryPct);
throw new IllegalArgumentException(PERCENT_PARSING_ERROR + readWriteQueryReadManyPct);
}
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(PERCENT_PARSING_ERROR + readWriteQueryPct);
throw new IllegalArgumentException(PERCENT_PARSING_ERROR + readWriteQueryReadManyPct);
}
} else {
throw new IllegalArgumentException(PERCENT_PARSING_ERROR + readWriteQueryPct);
throw new IllegalArgumentException(PERCENT_PARSING_ERROR + readWriteQueryReadManyPct);
}
}

Expand Down Expand Up @@ -299,6 +323,14 @@ private void createPrePopulatedDocs(int numberOfPreCreatedDocuments) {
}
}

/**
 * Populates {@code itemIdentityMap} with one {@link CosmosItemIdentity} per pre-created document,
 * grouped under the same key the document is stored under in {@code docsToRead}.
 *
 * <p>Each identity uses the document id both as the partition key value and as the item id.
 * A key whose document list is empty gets no entry in {@code itemIdentityMap}, because the
 * target list is only created when the first identity for that key is added.
 *
 * @param docsToRead pre-created documents, keyed the same way the workload looks them up
 *                   (by container id)
 */
private void createItemIdentityMap(Map<String, List<PojoizedJson>> docsToRead) {
    for (Map.Entry<String, List<PojoizedJson>> containerDocs : docsToRead.entrySet()) {
        String containerKey = containerDocs.getKey();
        for (PojoizedJson document : containerDocs.getValue()) {
            // Lazily create the per-key list so keys without documents are never registered.
            List<CosmosItemIdentity> identities =
                itemIdentityMap.computeIfAbsent(containerKey, unused -> new ArrayList<>());
            String documentId = document.getId();
            identities.add(new CosmosItemIdentity(new PartitionKey(documentId), documentId));
        }
    }
}

private void createDatabaseAndContainers(Configuration cfg) {
try {
cosmosAsyncDatabase = cosmosClient.getDatabase(this.configuration.getDatabaseId());
Expand Down

0 comments on commit 89f6eee

Please sign in to comment.