Skip to content

Commit

Permalink
🎉 Source Mysql - added basic MySql performance tests (#7820)
Browse files Browse the repository at this point in the history
* [ticket 7489, PR 7820] Source Mysql - added skeleton for MySql performance tests
  • Loading branch information
etsybaev authored Nov 15, 2021
1 parent 63c6249 commit 3a0eaaf
Show file tree
Hide file tree
Showing 8 changed files with 571 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.DefaultCheckConnectionWorker;
Expand All @@ -32,6 +34,8 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

Expand Down Expand Up @@ -155,4 +159,31 @@ protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog catalog, f
return messages;
}

protected Map<String, Integer> runReadVerifyNumberOfReceivedMsgs(final ConfiguredAirbyteCatalog catalog,
final JsonNode state,
final Map<String, Integer> mapOfExpectedRecordsCount)
throws Exception {

final WorkerSourceConfig sourceConfig = new WorkerSourceConfig()
.withSourceConnectionConfiguration(getConfig())
.withState(state == null ? null : new State().withState(state))
.withCatalog(catalog);

final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory));
source.start(sourceConfig, jobRoot);

while (!source.isFinished()) {
Optional<AirbyteMessage> airbyteMessageOptional = source.attemptRead();
if (airbyteMessageOptional.isPresent() && airbyteMessageOptional.get().getType().equals(Type.RECORD)) {
AirbyteMessage airbyteMessage = airbyteMessageOptional.get();
AirbyteRecordMessage record = airbyteMessage.getRecord();

final String streamName = record.getStream();
mapOfExpectedRecordsCount.put(streamName, mapOfExpectedRecordsCount.get(streamName) - 1);
}
}
source.close();
return mapOfExpectedRecordsCount;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.source;

import static org.junit.jupiter.api.Assertions.fail;

import com.google.common.collect.Lists;
import io.airbyte.db.Database;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This abstract class contains common helpers and boilerplate for comprehensively testing that all
* data types in a source can be read and handled correctly by the connector and within Airbyte's
* type system.
*/
public abstract class AbstractSourcePerformanceTest extends AbstractSourceConnectorTest {

protected static final Logger c = LoggerFactory.getLogger(AbstractSourcePerformanceTest.class);
private static final String TEST_VALUE_TEMPLATE = "\"Some test value %s\"";
protected String databaseName = "test";

protected int numberOfColumns; // 240 is near the max value for varchar(8) type
// 200 is near the max value for 1 batch call,if need more - implement multiple batching for single
// stream
protected int numberOfDummyRecords; // 200;
protected int numberOfStreams;

/**
* The column name will be used for a PK column in the test tables. Override it if default name is
* not valid for your source.
*
* @return Id column name
*/
protected String getIdColumnName() {
return "id";
}

/**
* The column name will be used for a test column in the test tables. Override it if default name is
* not valid for your source.
*
* @return Test column name
*/
protected String getTestColumnName() {
return "test_column";
}

/**
* The stream name template will be used for a test tables. Override it if default name is not valid
* for your source.
*
* @return Test steam name template
*/
protected String getTestStreamNameTemplate() {
return "test_%S";
}

/**
* Setup the test database. All tables and data described in the registered tests will be put there.
*
* @return configured test database
* @throws Exception - might throw any exception during initialization.
*/
protected abstract Database setupDatabase() throws Exception;

/**
* Get a create table template for a DB
*
* @return a create tabple template, ex. "CREATE TABLE test.%s(id INTEGER PRIMARY KEY, %s)"
*/
protected abstract String getCreateTableTemplate();

/**
* Get a INSERT query template for a DB
*
* @return an INSERT into table query template, ex. "INSERT INTO test.%s (%s) VALUES %s"
*/
protected abstract String getInsertQueryTemplate();

/**
* Get a test field'stype that will be used in DB for table creation.
*
* @return a test's field type. Ex: varchar(8)
*/
protected abstract String getTestFieldType();

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
// DO NOTHING. Mandatory to override. DB will be setup as part of each test
}

/**
* Provide a source namespace. It's allocated place for table creation. It also known ask "Database
* Schema" or "Dataset"
*
* @return source name space
*/
protected abstract String getNameSpace();

protected void validateNumberOfReceivedMsgs(final Map<String, Integer> checkStatusMap) {
// Iterate through all streams map and check for streams where
Map<String, Integer> failedStreamsMap = checkStatusMap.entrySet().stream()
.filter(el -> el.getValue() != 0).collect(Collectors.toMap(Entry::getKey, Entry::getValue));

if (!failedStreamsMap.isEmpty()) {
fail("Non all messages were delivered. " + failedStreamsMap.toString());
}
c.info("Finished all checks, no issues found for {} of streams", checkStatusMap.size());
}

protected Map<String, Integer> prepareMapWithExpectedRecords(final int streamNumber,
final int expectedRecordsNumberInEachStream) {
Map<String, Integer> resultMap = new HashMap<>(); // streamName&expected records in stream

for (int currentStream = 0; currentStream < streamNumber; currentStream++) {
final String streamName = String.format(getTestStreamNameTemplate(), currentStream);
resultMap.put(streamName, expectedRecordsNumberInEachStream);
}
return resultMap;
}

protected String prepareCreateTableQuery(final int numberOfColumns,
final String currentTableName) {

StringJoiner sj = new StringJoiner(",");
for (int i = 0; i < numberOfColumns; i++) {
sj.add(String.format(" %s%s %s", getTestColumnName(), i, getTestFieldType()));
}

return String.format(getCreateTableTemplate(), databaseName, currentTableName, sj.toString());
}

// ex. INSERT INTO test.test_1 (id, test_column0, test_column1) VALUES (101,"zzz0", "sss0"), ("102",
// "zzzz1", "sss1");
protected String prepareInsertQueryTemplate(final int numberOfColumns, final int recordsNumber) {

StringJoiner fieldsNames = new StringJoiner(",");
fieldsNames.add("id");

StringJoiner baseInsertQuery = new StringJoiner(",");
baseInsertQuery.add("%s");

for (int i = 0; i < numberOfColumns; i++) {
fieldsNames.add(getTestColumnName() + i);
baseInsertQuery.add(String.format(TEST_VALUE_TEMPLATE, i));
}

StringJoiner insertGroupValuesJoiner = new StringJoiner(",");

for (int currentRecordNumber = 0; currentRecordNumber < recordsNumber; currentRecordNumber++) {
insertGroupValuesJoiner
.add("(" + String.format(baseInsertQuery.toString(), currentRecordNumber) + ")");
}

return String
.format(getInsertQueryTemplate(), databaseName, "%s", fieldsNames.toString(),
insertGroupValuesJoiner.toString());
}

/**
* Configures streams for all registered data type tests.
*
* @return configured catalog
*/
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
List<ConfiguredAirbyteStream> streams = new ArrayList<>();

for (int currentStream = 0; currentStream < numberOfStreams; currentStream++) {

// CREATE TABLE test.test_1_int(id INTEGER PRIMARY KEY, test_column int)
List<Field> fields = new ArrayList<>();

fields.add(Field.of(getIdColumnName(), JsonSchemaPrimitive.NUMBER));
for (int currentColumnNumber = 0;
currentColumnNumber < numberOfColumns;
currentColumnNumber++) {
fields.add(Field.of(getTestColumnName() + currentColumnNumber, JsonSchemaPrimitive.STRING));
}

AirbyteStream airbyteStream = CatalogHelpers
.createAirbyteStream(String.format(getTestStreamNameTemplate(), currentStream),
getNameSpace(), fields)
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of(getIdColumnName())))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));

ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList(getIdColumnName()))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(airbyteStream);

streams.add(configuredAirbyteStream);

}

return new ConfiguredAirbyteCatalog().withStreams(streams);
}

}
39 changes: 39 additions & 0 deletions airbyte-integrations/connectors/source-mysql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# MySQL Source

## Documentation
This is the repository for the MySQL only source connector in Java.
For information about how to use this connector within Airbyte, see [User Documentation](https://docs.airbyte.io/integrations/sources/mysql)

## Local development

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-mysql:build
```

### Locally running the connector docker image

#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-mysql:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

## Testing
We use `JUnit` for Java tests.

### Test Configuration
#### Acceptance Tests
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
```

#### Performance Tests
To run performance tests:
```
./gradlew :airbyte-integrations:connectors:source-mysql:performanceTest
```
5 changes: 5 additions & 0 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
id 'airbyte-performance-test-java'
}

application {
Expand All @@ -28,6 +29,10 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mysql')

performanceTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
performanceTestJavaImplementation project(':airbyte-integrations:connectors:source-mysql')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
performanceTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
Loading

0 comments on commit 3a0eaaf

Please sign in to comment.