Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Percent encode opensearch index name #2564

Merged
merged 4 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.apache.commons.lang3.StringUtils.strip;

import java.util.Set;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -19,6 +20,9 @@ public class IndexQueryDetails {

public static final String STRIP_CHARS = "`";

private static final Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
Expand Down Expand Up @@ -103,6 +107,21 @@ public String openSearchIndexName() {
indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
break;
}
return indexName.toLowerCase();
return percentEncode(indexName).toLowerCase();
}

/*
* Percent-encode invalid OpenSearch index name characters.
*/
private String percentEncode(String indexName) {
StringBuilder builder = new StringBuilder(indexName.length());
for (char ch : indexName.toCharArray()) {
if (INVALID_INDEX_NAME_CHARS.contains(ch)) {
builder.append(String.format("%%%02X", (int) ch));
} else {
builder.append(ch);
}
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,20 +334,31 @@ public class FlintDatasetMock {
final FlintIndexType indexType;
final String indexName;
boolean isLegacy = false;
boolean isSpecialCharacter = false;
String latestId;

FlintDatasetMock isLegacy(boolean isLegacy) {
this.isLegacy = isLegacy;
return this;
}

FlintDatasetMock isSpecialCharacter(boolean isSpecialCharacter) {
this.isSpecialCharacter = isSpecialCharacter;
return this;
}

FlintDatasetMock latestId(String latestId) {
this.latestId = latestId;
return this;
}

public void createIndex() {
String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1";
if (isSpecialCharacter) {
createIndexWithMappings(
indexName, loadMappings(pathPrefix + "/" + "flint_special_character_index.json"));
return;
}
switch (indexType) {
case SKIPPING:
createIndexWithMappings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import org.opensearch.sql.spark.rest.model.LangType;

public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
private final String specialName = "`test ,:\"+/\\|?#><`";
private final String encodedName = "test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c";

public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs";
public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs";
public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mys3.default.http_logs_metrics";
public final String REFRESH_SCI = "REFRESH SKIPPING INDEX on mys3.default." + specialName;

public final FlintDatasetMock LEGACY_SKIPPING =
new FlintDatasetMock(
Expand All @@ -53,6 +57,15 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
"flint_mys3_default_http_logs_metrics")
.isLegacy(true);

public final FlintDatasetMock LEGACY_SPECIAL_CHARACTERS =
new FlintDatasetMock(
"DROP SKIPPING INDEX ON mys3.default." + specialName,
REFRESH_SCI,
FlintIndexType.SKIPPING,
"flint_mys3_default_" + encodedName + "_skipping_index")
.isLegacy(true)
.isSpecialCharacter(true);

public final FlintDatasetMock SKIPPING =
new FlintDatasetMock(
"DROP SKIPPING INDEX ON mys3.default.http_logs",
Expand All @@ -74,6 +87,16 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
FlintIndexType.MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics")
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz");
public final FlintDatasetMock SPECIAL_CHARACTERS =
new FlintDatasetMock(
"DROP SKIPPING INDEX ON mys3.default." + specialName,
REFRESH_SCI,
FlintIndexType.SKIPPING,
"flint_mys3_default_" + encodedName + "_skipping_index")
.isSpecialCharacter(true)
.latestId(
"ZmxpbnRfbXlzM19kZWZhdWx0X3Rlc3QlMjAlMmMlM2ElMjIlMmIlMmYlNWMlN2MlM2YlMjMlM2UlM2Nfc2tpcHBpbmdfaW5kZXg=");

public final String CREATE_SI_AUTO =
"CREATE SKIPPING INDEX ON mys3.default.http_logs"
+ "(l_orderkey VALUE_SET) WITH (auto_refresh = true)";
Expand All @@ -93,7 +116,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
*/
@Test
public void legacyBasicDropAndFetchAndCancel() {
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING)
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
LocalEMRSClient emrsClient =
Expand Down Expand Up @@ -141,7 +164,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
*/
@Test
public void legacyDropIndexNoJobRunning() {
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV)
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
LocalEMRSClient emrsClient =
Expand Down Expand Up @@ -178,7 +201,7 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
*/
@Test
public void legacyDropIndexCancelJobTimeout() {
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV)
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
// Mock EMR-S always return running.
Expand Down Expand Up @@ -209,14 +232,48 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
});
}

/**
* Legacy Test, without state index support. Not EMR-S job running. expectation is
*
* <p>(1) Drop Index response is SUCCESS
*/
@Test
public void legacyDropIndexSpecialCharacter() {
FlintDatasetMock mockDS = LEGACY_SPECIAL_CHARACTERS;
LocalEMRSClient emrsClient =
new LocalEMRSClient() {
@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
throw new IllegalArgumentException("Job run is not in a cancellable state");
}
};
EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient;
AsyncQueryExecutorService asyncQueryExecutorService =
createAsyncQueryExecutorService(emrServerlessClientFactory);

// Mock flint index
mockDS.createIndex();

// 1.drop index
CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null));

// 2.fetch result.
AsyncQueryExecutionResponse asyncQueryResults =
asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId());
assertEquals("SUCCESS", asyncQueryResults.getStatus());
assertNull(asyncQueryResults.getError());
}

/**
* Happy case. expectation is
*
* <p>(1) Drop Index response is SUCCESS (2) change index state to: DELETED
*/
@Test
public void dropAndFetchAndCancel() {
ImmutableList.of(SKIPPING, COVERING, MV)
ImmutableList.of(SKIPPING, COVERING, MV, SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
LocalEMRSClient emrsClient =
Expand Down Expand Up @@ -584,6 +641,47 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
});
}

/**
* Cancel EMR-S job, but not job running. expectation is
*
* <p>(1) Drop Index response is SUCCESS (2) change index state to: DELETED
*/
@Test
public void dropIndexSpecialCharacter() {
FlintDatasetMock mockDS = SPECIAL_CHARACTERS;
// Mock EMR-S job is not running
LocalEMRSClient emrsClient =
new LocalEMRSClient() {
@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
throw new IllegalArgumentException("Job run is not in a cancellable state");
}
};
EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient;
AsyncQueryExecutorService asyncQueryExecutorService =
createAsyncQueryExecutorService(emrServerlessClientFactory);

// Mock flint index
mockDS.createIndex();
// Mock index state in refresh state.
MockFlintSparkJob flintIndexJob =
new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE);
flintIndexJob.refreshing();

// 1.drop index
CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null));

// 2.fetch result.
AsyncQueryExecutionResponse asyncQueryResults =
asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId());
assertEquals("SUCCESS", asyncQueryResults.getStatus());
assertNull(asyncQueryResults.getError());

flintIndexJob.assertState(FlintIndexState.DELETED);
}

/**
* No Job running, expectation is
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec {
mockDataset(
"VACUUM MATERIALIZED VIEW mys3.default.http_logs_metrics",
MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics"));
"flint_mys3_default_http_logs_metrics"),
mockDataset(
"VACUUM SKIPPING INDEX ON mys3.default.`test ,:\"+/\\|?#><`",
SKIPPING,
"flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index")
.isSpecialCharacter(true));

@Test
public void shouldVacuumIndexInRefreshingState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,19 @@ public void materializedViewIndexNameNotFullyQualified() {
.build()
.openSearchIndexName());
}

@Test
public void sanitizedIndexName() {
assertEquals(
"flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
IndexQueryDetails.builder()
.indexName("invalid")
.fullyQualifiedTableName(
new FullyQualifiedTableName("mys3.default.`test ,:\"+/\\|?#><`"))
.indexOptions(new FlintIndexOptions())
.indexQueryActionType(IndexQueryActionType.DROP)
.indexType(FlintIndexType.SKIPPING)
.build()
.openSearchIndexName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
"options": {},
"source": "mys3.default.`test ,:\"+/\\|?#><`",
"version": "0.1.0",
"properties": {
"env": {
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p",
"SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q"
}
},
"latestId": "ZmxpbnRfbXlzM19kZWZhdWx0X3Rlc3QlMjAlMmMlM2ElMjIlMmIlMmYlNWMlN2MlM2YlMjMlM2UlM2Nfc2tpcHBpbmdfaW5kZXg="
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
"options": {},
"source": "mys3.default.`test ,:\"+/\\|?#><`",
"version": "0.1.0",
"properties": {
"env": {
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p",
"SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q"
}
}
}
}
Loading