From 7be2c445f1a01f40f35ec5962952e612c33a3897 Mon Sep 17 00:00:00 2001 From: psainics Date: Wed, 23 Oct 2024 14:59:17 +0530 Subject: [PATCH] Escape column names --- .../cloudsql/mysql/CloudSQLMySQLSink.java | 25 +++++++++++++ .../cloudsql/mysql/CloudSQLMySQLSinkTest.java | 35 +++++++++++++++++++ .../io/cdap/plugin/mysql/MysqlDBRecord.java | 9 +++++ .../java/io/cdap/plugin/mysql/MysqlSink.java | 24 +++++++++++++ .../io/cdap/plugin/mysql/MysqlSinkTest.java | 35 +++++++++++++++++++ 5 files changed, 128 insertions(+) create mode 100644 cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java create mode 100644 mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java index 271012f7e..6149c114b 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java @@ -16,6 +16,7 @@ package io.cdap.plugin.cloudsql.mysql; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.cdap.cdap.api.annotation.Description; @@ -25,6 +26,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSink; @@ -40,7 +42,11 @@ import io.cdap.plugin.util.CloudSQLUtil; import io.cdap.plugin.util.DBUtils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.StringJoiner; import javax.annotation.Nullable; /** Sink support for a CloudSQL MySQL database. */ @@ -52,6 +58,7 @@ public class CloudSQLMySQLSink extends AbstractDBSink { private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig; + private static final Character ESCAPE_CHAR = '`'; public CloudSQLMySQLSink(CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig) { super(cloudsqlMysqlSinkConfig); @@ -78,6 +85,24 @@ protected DBRecord getDBRecord(StructuredRecord output) { return new MysqlDBRecord(output, columnTypes); } + @Override + protected void setColumnsInfo(List fields) { + List columnsList = new ArrayList<>(); + StringJoiner columnsJoiner = new StringJoiner(","); + for (Schema.Field field : fields) { + columnsList.add(field.getName()); + columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR); + } + + super.columns = Collections.unmodifiableList(columnsList); + super.dbColumns = columnsJoiner.toString(); + } + + @VisibleForTesting + String getDbColumns() { + return dbColumns; + } + @Override protected LineageRecorder getLineageRecorder(BatchSinkContext context) { String host; diff --git a/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java new file mode 100644 index 000000000..65a14502e --- /dev/null +++ b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cloudsql.mysql; + +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +public class CloudSQLMySQLSinkTest { + @Test + public void testSetColumnsInfo() { + Schema outputSchema = Schema.recordOf("output", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), + Schema.Field.of("insert", Schema.of(Schema.Type.STRING))); + CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig()); + Assert.assertNotNull(outputSchema.getFields()); + cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields()); + Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns()); + } +} diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java index 0560b10c3..94b711786 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java @@ -93,4 +93,13 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex); } + + @Override + protected void insertOperation(PreparedStatement stmt) throws SQLException { + for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) { + ColumnType columnType = columnTypes.get(fieldIndex); + Schema.Field field = record.getSchema().getField(columnType.getName(), true); + writeToDB(stmt, field, fieldIndex); + } + } } diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java index c839cb12b..f71371026 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java @@ -16,6 +16,7 @@ package io.cdap.plugin.mysql; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; @@ -24,6 +25,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; @@ -39,9 +41,12 @@ import io.cdap.plugin.db.sink.FieldsValidator; import io.cdap.plugin.util.DBUtils; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.StringJoiner; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -54,6 +59,7 @@ public class MysqlSink extends AbstractDBSink { private final MysqlSinkConfig mysqlSinkConfig; + private static final Character ESCAPE_CHAR = '`'; public MysqlSink(MysqlSinkConfig mysqlSinkConfig) { super(mysqlSinkConfig); @@ -85,6 +91,24 @@ protected SchemaReader getSchemaReader() { return new MysqlSchemaReader(null); } + @Override + protected void setColumnsInfo(List fields) { + List columnsList = new ArrayList<>(); + StringJoiner columnsJoiner = new StringJoiner(","); + for (Schema.Field field : fields) { + columnsList.add(field.getName()); + columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR); + } + + super.columns = Collections.unmodifiableList(columnsList); + super.dbColumns = columnsJoiner.toString(); + } + + @VisibleForTesting + String getDbColumns() { + return dbColumns; + } + /** * MySQL action configuration. */ diff --git a/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java new file mode 100644 index 000000000..1dd4e809e --- /dev/null +++ b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mysql; + +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +public class MysqlSinkTest { + @Test + public void testSetColumnsInfo() { + Schema outputSchema = Schema.recordOf("output", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), + Schema.Field.of("insert", Schema.of(Schema.Type.STRING))); + MysqlSink mySQLSink = new MysqlSink(new MysqlSink.MysqlSinkConfig()); + Assert.assertNotNull(outputSchema.getFields()); + mySQLSink.setColumnsInfo(outputSchema.getFields()); + Assert.assertEquals("`id`,`name`,`insert`", mySQLSink.getDbColumns()); + } +}