Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add namespace test in destination acceptance test #10793

Merged
merged 41 commits into from
Mar 20, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2075116
Add namespace test for snowflake
tuliren Mar 2, 2022
53e035b
Enable namespace test for bigquery
tuliren Mar 2, 2022
72cbd76
Format code
tuliren Mar 2, 2022
2292000
Capitalize test case id
tuliren Mar 2, 2022
7c6fdc2
Update exception message to point to test case file
tuliren Mar 2, 2022
668e0fe
Merge branch 'master' into liren/destination-namespace-charset
tuliren Mar 18, 2022
82b4f58
Update snowflake name transformer to prepend underscore
tuliren Mar 18, 2022
e2d3506
Override convertStreamName instead of getIdentifier
tuliren Mar 18, 2022
e46e0f2
Add missing state message
tuliren Mar 18, 2022
1a5e73d
Remove unused import
tuliren Mar 18, 2022
d7eda39
Disable more namespace test cases
tuliren Mar 18, 2022
1435264
Dry method that mutates namespace
tuliren Mar 18, 2022
e698272
Pass through null
tuliren Mar 18, 2022
241455a
Normalize namespace
tuliren Mar 18, 2022
cc7da57
Fix test case
tuliren Mar 18, 2022
7cd900b
Revert consumer factory changes
tuliren Mar 18, 2022
21bb4d4
Normalize namespace in catalog
tuliren Mar 18, 2022
bae6f39
Revert catalog normalization
tuliren Mar 18, 2022
acb6613
Enable namespace test for all snowflake destination tests
tuliren Mar 18, 2022
8c4334b
Test namespace for both bigquery destination tests
tuliren Mar 18, 2022
8fd8030
Add unit test for bigquery name transformer
tuliren Mar 18, 2022
66f0c6e
Transform bigquery schema name
tuliren Mar 18, 2022
64c359e
Fix avro name transformer
tuliren Mar 18, 2022
fcedace
Normalize avro namespace
tuliren Mar 18, 2022
8f63404
Standardize namespace in gcs utils
tuliren Mar 18, 2022
019d64a
Bump version for snowflake and bigquery
tuliren Mar 18, 2022
c14e92d
Enable namespace test for bigquery denormalized
tuliren Mar 18, 2022
3f9111b
Dry bigquery denormalized acceptance test
tuliren Mar 18, 2022
e4be015
Revert some of the variable scope change
tuliren Mar 18, 2022
6412b7b
Fix unit test
tuliren Mar 18, 2022
dcb8f98
Bump version
tuliren Mar 18, 2022
89f9f35
Introduce getNamespace method
tuliren Mar 18, 2022
bf159c0
Implement getNamespace method for bigquery
tuliren Mar 18, 2022
b758324
Switch to getNamespace methods
tuliren Mar 18, 2022
7b8708a
Update comments
tuliren Mar 18, 2022
7ca043d
Fix bigquery denormalized acceptance test
tuliren Mar 18, 2022
9e93a4a
Format code
tuliren Mar 18, 2022
c226dfc
Dry bigquery destination test
tuliren Mar 19, 2022
b9caead
Skip partition test for gcs mode
tuliren Mar 19, 2022
b4d789c
Merge branch 'master' into liren/destination-namespace-charset
tuliren Mar 19, 2022
c359284
Bump version
tuliren Mar 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class DataArgumentsProvider implements ArgumentsProvider {
new CatalogMessageTestConfigPair("exchange_rate_catalog.json", "exchange_rate_messages.txt");
public static final CatalogMessageTestConfigPair EDGE_CASE_CONFIG =
new CatalogMessageTestConfigPair("edge_case_catalog.json", "edge_case_messages.txt");
public static final CatalogMessageTestConfigPair NAMESPACE_CONFIG =
new CatalogMessageTestConfigPair("namespace_catalog.json", "namespace_messages.txt");

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.JobGetSpecConfig;
Expand All @@ -27,6 +28,7 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -64,16 +66,21 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -915,6 +922,55 @@ void testSyncWriteSameTableNameDifferentNamespace() throws Exception {
retrieveRawRecordsAndAssertSameMessages(catalog, allMessages, defaultSchema);
}

public static class NamespaceTestCaseProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
final JsonNode testCases =
Jsons.deserialize(MoreResources.readResource("namespace_test_cases.json"));
return MoreIterators.toList(testCases.elements()).stream()
.filter(testCase -> testCase.get("enabled").asBoolean())
.map(testCase -> Arguments.of(
testCase.get("id").asText(),
testCase.get("namespace").asText(),
testCase.get("normalized").asText()));
}

}

@ParameterizedTest
@ArgumentsSource(NamespaceTestCaseProvider.class)
public void testNamespaces(final String testCaseId, final String namespace, final String normalizedNamespace) throws Exception {
final Optional<NamingConventionTransformer> nameTransformer = getNameTransformer();
nameTransformer.ifPresent(namingConventionTransformer -> assertNamespaceNormalization(testCaseId, normalizedNamespace,
namingConventionTransformer.getIdentifier(namespace)));

if (!implementsNamespaces() || !supportNamespaceTest()) {
return;
}

final AirbyteCatalog catalog = Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.catalogFile), AirbyteCatalog.class);
catalog.getStreams().forEach(stream -> stream.setNamespace(namespace));
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final List<AirbyteMessage> messages = MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
messages.forEach(
message -> {
if (message.getRecord() != null) {
message.getRecord().setNamespace(namespace);
}
});

final JsonNode config = getConfig();
try {
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
} catch (final Exception e) {
throw new IOException(String.format("[%s] Destination failed to sync data to namespace %s", testCaseId, namespace), e);
}
}

/**
* In order to launch a source on Kubernetes in a pod, we need to be able to wrap the entrypoint.
* The source connector must specify its entrypoint in the AIRBYTE_ENTRYPOINT variable. This test
Expand All @@ -933,6 +989,32 @@ public void testEntrypointEnvVar() throws Exception {
assertFalse(entrypoint.isBlank());
}

/**
* Whether the destination should be tested against different namespaces.
*/
protected boolean supportNamespaceTest() {
return false;
}

/**
* Set up the name transformer used by a destination to test it against a variety of namespaces.
*/
protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.empty();
}

/**
* Override this method if the normalized namespace is different from the default one. E.g. S3 does
* not allow a name starting with a number. So it should change the expected normalized namespace
* when testCaseId = "s3a-1". Find the testCaseId in "namespace_test_cases.json".
*/
protected void assertNamespaceNormalization(final String testCaseId,
final String expectedNormalizedNamespace,
final String actualNormalizedNamespace) {
assertEquals(expectedNormalizedNamespace, actualNormalizedNamespace,
String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId));
}

private ConnectorSpecification runSpec() throws WorkerException {
return new DefaultGetSpecWorker(
workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"streams": [
{
"name": "data_stream",
"json_schema": {
"properties": {
"field1": {
"type": "boolean"
}
}
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type": "RECORD", "record": {"stream": "data_stream", "emitted_at": 1602637589000, "data": { "field1" : true }}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[
{
"id": "s1-1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: can these IDs be a little more descriptive (e.g. all_caps) to make them easier to cross-reference? right now if a test fails, it'll just log that case s1-1 failed, and then the developer would need to find this file to understand why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I also updated the exception message to ask people to look into the namespace_test_case.json for details.

"description": "namespace are converted to lowercase",
"namespace": "NAMESPACE",
"enabled": true,
"normalized": "namespace"
},
{
"id": "s2-1",
"description": "namespace allows alphabets, numbers, and underscore",
"namespace": "Dest_1001_Namespace",
"enabled": true,
"normalized": "dest_1001_namespace"
},
{
"id": "s2a-1",
"description": "namespace romanization",
"namespace": "namespace_with_spécial_character",
"enabled": true,
"normalized": "namespace_with_special_character"
},
{
"id": "s2a-2",
"description": "namespace romanization (japanese)",
"namespace": "namespace_こんにちは",
"enabled": false,
"normalized": "namespace_konnichiwa"
},
{
"id": "s3a-1",
"description": "namespace starting with a number",
"namespace": "99namespace",
"enabled": true,
"normalized": "99namespace"
},
{
"id": "s3b-1",
"description": "long namespace (300 characters)",
"namespace": "a_300_characters_looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo_namespace",
"enabled": false,
"normalized": ""
},
{
"id": "s3c-1",
"description": "reserved word (for future testing)",
"namespace": "select",
"enabled": false,
"normalized": ""
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class BigQueryGcsDestinationAcceptanceTest extends BigQueryDestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");

@Override
Expand Down Expand Up @@ -60,4 +64,14 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
setupBigQuery(bigqueryConfigFromSecretFile);
}

@Override
protected boolean supportNamespaceTest() {
return true;
}

@Override
protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.of(NAME_TRANSFORMER);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import java.nio.file.Path;
import java.util.Optional;

public class SnowflakeInternalStagingDestinatiomAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
public class SnowflakeInternalStagingDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new SnowflakeSQLNameTransformer();

public JsonNode getStaticConfig() {
final JsonNode internalStagingConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/internal_staging_config.json")));
Expand All @@ -19,4 +23,14 @@ public JsonNode getStaticConfig() {
return internalStagingConfig;
}

@Override
protected boolean supportNamespaceTest() {
return true;
}

@Override
protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.of(NAME_TRANSFORMER);
}

}