diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 64cb38f7..7c133100 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -127,6 +127,9 @@ public interface ConfigurationOptions {
 
     String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
 
     boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;
-
+    /**
+     * Compression type for the stream load payload; currently only "gz" with CSV format is supported.
+     */
+    String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";
 }
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9fdf4c8f..08bc29d1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -39,6 +39,7 @@
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -50,6 +51,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -70,7 +72,7 @@ import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
+import java.util.zip.GZIPOutputStream;
 
 /**
  * DorisStreamLoad
 */
@@ -107,6 +109,7 @@ public class DorisStreamLoad implements Serializable {
     private final Integer txnRetries;
     private final Integer txnIntervalMs;
     private final boolean autoRedirect;
+    private final String compressType;
 
     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -147,6 +150,7 @@ public DorisStreamLoad(SparkSettings settings) {
 
         this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
                 ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
+        this.compressType = settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
     }
 
     public String getLoadUrlStr() {
@@ -222,13 +226,30 @@ public long load(Iterator rows, StructType schema)
             String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
             this.loadUrlStr = loadUrlStr;
             HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
-            RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
-                    .format(dataFormat)
-                    .sep(FIELD_DELIMITER)
-                    .delim(LINE_DELIMITER)
-                    .schema(schema)
-                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
-            httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
+            if (StringUtils.isNotEmpty(compressType)) {
+                if ("gz".equalsIgnoreCase(compressType) && DataFormat.CSV.equals(dataFormat)) {
+                    RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
+                            .format(dataFormat)
+                            .sep(FIELD_DELIMITER)
+                            .delim(LINE_DELIMITER)
+                            .schema(schema)
+                            .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+                    String content = recordBatchString.getContent();
+                    byte[] compressedData = compressByGZ(content);
+                    httpPut.setEntity(new ByteArrayEntity(compressedData));
+                } else {
+                    throw new StreamLoadException("Unsupported compress type [" + compressType
+                            + "] for data format [" + dataFormat + "]");
+                }
+            } else {
+                RecordBatchInputStream recordBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
+                        .format(dataFormat)
+                        .sep(FIELD_DELIMITER)
+                        .delim(LINE_DELIMITER)
+                        .schema(schema)
+                        .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+                httpPut.setEntity(new InputStreamEntity(recordBatchInputStream));
+            }
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
         } catch (IOException e) {
@@ -447,7 +468,7 @@ public BackendCacheLoader(SparkSettings settings) {
 
         @Override
         public List load(String key) throws Exception {
-            return RestService.getBackendRows(settings, LOG);
+            return RestService.getBackendRows(settings, LOG);
         }
 
     }
@@ -505,4 +526,18 @@ private void handleStreamPassThrough() {
 
     }
 
+    /**
+     * Compress data with the gzip algorithm.
+     */
+    public byte[] compressByGZ(String content) throws IOException {
+        byte[] compressedData;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos)) {
+            gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
+            gzipOutputStream.finish();
+            compressedData = baos.toByteArray();
+        }
+        return compressedData;
+    }
+
 }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
new file mode 100644
index 00000000..c8024eee
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.util.DataUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Assembles a record batch into a single string for batch load.
+ */
+public class RecordBatchString {
+
+    public static final Logger LOG = LoggerFactory.getLogger(RecordBatchString.class);
+
+    /**
+     * load record batch
+     */
+    private final RecordBatch recordBatch;
+
+    /**
+     * streaming mode passes data through without processing
+     */
+    private final boolean passThrough;
+
+    public RecordBatchString(RecordBatch recordBatch, boolean passThrough) {
+        this.recordBatch = recordBatch;
+        this.passThrough = passThrough;
+    }
+
+    public String getContent() throws IOException {
+        String delimStr = new String(recordBatch.getDelim(), StandardCharsets.UTF_8);
+        StringBuilder builder = new StringBuilder();
+        Iterator<InternalRow> iterator = recordBatch.getIterator();
+        while (iterator.hasNext()) {
+            try {
+                builder.append(rowToString(iterator.next()));
+            } catch (DorisException e) {
+                throw new IOException(e);
+            }
+            builder.append(delimStr);
+        }
+        // drop the trailing delimiter; the length check guards against an empty batch
+        if (builder.length() > 0) {
+            builder.setLength(builder.length() - delimStr.length());
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Convert Spark row data to string
+     *
+     * @param row row data
+     * @return row string
+     * @throws DorisException
+     */
+    private String rowToString(InternalRow row) throws DorisException {
+
+        if (passThrough) {
+            return row.getString(0);
+        }
+
+        switch (recordBatch.getFormat()) {
+            case CSV:
+                return DataUtil.rowToCsvString(row, recordBatch.getSchema(), recordBatch.getSep(),
+                        recordBatch.getAddDoubleQuotes());
+            case JSON:
+                try {
+                    return DataUtil.rowToJsonString(row, recordBatch.getSchema());
+                } catch (JsonProcessingException e) {
+                    throw new DorisException("parse row to json string failed", e);
+                }
+            default:
+                throw new IllegalArgumentException("Unsupported format: ", recordBatch.getFormat().toString());
+        }
+    }
+}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index f7218c34..5c4527e1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -40,7 +40,7 @@ public class DataUtil {
 
     public static final String NULL_VALUE = "\\N";
 
-    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+    public static String rowToCsvString(InternalRow row, StructType schema, String sep, boolean quote) {
         StructField[] fields = schema.fields();
         int n = row.numFields();
         if (n > 0) {
@@ -51,9 +51,13 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String se
                     value = "\"" + value + "\"";
                 }
                 return value.toString();
-            }).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8);
+            }).collect(Collectors.joining(sep));
         }
-        return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8);
+        return StringUtils.EMPTY;
+    }
+
+    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+        return rowToCsvString(row, schema, sep, quote).getBytes(StandardCharsets.UTF_8);
     }
 
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
@@ -65,4 +69,13 @@ public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws J
         return MAPPER.writeValueAsBytes(rowMap);
     }
 
+    public static String rowToJsonString(InternalRow row, StructType schema) throws JsonProcessingException {
+        StructField[] fields = schema.fields();
+        Map<String, Object> rowMap = new HashMap<>(row.numFields());
+        for (int i = 0; i < fields.length; i++) {
+            rowMap.put(fields[i].name(), SchemaUtils.rowColumnValue(row, i, fields[i].dataType()));
+        }
+        return MAPPER.writeValueAsString(rowMap);
+    }
+
 }
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java
new file mode 100644
index 00000000..7fdd88ff
--- /dev/null
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.spark.SparkConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+
+public class TestDorisStreamLoad {
+
+    @Test
+    public void compressByGZ() throws IOException {
+        // build 39 CSV rows of the form "i,aa,i", joined by '\n'
+        StringBuilder sb = new StringBuilder();
+        for (int i = 1; i <= 39; i++) {
+            if (i > 1) {
+                sb.append('\n');
+            }
+            sb.append(i).append(",aa,").append(i);
+        }
+        String content = sb.toString();
+
+        SparkSettings settings = new SparkSettings(new SparkConf().set("doris.table.identifier", "aa.bb"));
+        byte[] compressedBytes = new DorisStreamLoad(settings).compressByGZ(content);
+
+        // the repetitive payload must shrink under gzip
+        Assert.assertTrue(content.getBytes(StandardCharsets.UTF_8).length > compressedBytes.length);
+
+        // decompress and verify the round trip restores the original content
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (GZIPInputStream ungzip = new GZIPInputStream(new ByteArrayInputStream(compressedBytes))) {
+            byte[] buffer = new byte[1024];
+            int n;
+            while ((n = ungzip.read(buffer)) >= 0) {
+                out.write(buffer, 0, n);
+            }
+        }
+        Assert.assertEquals(content, new String(out.toByteArray(), StandardCharsets.UTF_8));
+    }
+}
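
A minimal usage sketch of the new option from the Spark side (not part of the patch; the FE address, table identifier, credentials, and input path are placeholders, while "doris.fenodes", "doris.table.identifier", "user", and "password" are the connector's existing write options):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GzCompressedSinkExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("doris-gz-sink").getOrCreate();

        // placeholder input; any DataFrame whose schema matches the target table works
        Dataset<Row> df = spark.read().json("/path/to/input.json");

        df.write().format("doris")
                .option("doris.fenodes", "127.0.0.1:8030")       // placeholder FE address
                .option("doris.table.identifier", "db.table")    // placeholder target table
                .option("user", "root")                          // placeholder credentials
                .option("password", "")
                // option added by this patch: gzip the stream load payload (CSV format only)
                .option("doris.sink.properties.compress_type", "gz")
                .save();

        spark.stop();
    }
}

When doris.sink.properties.compress_type is unset, the sink keeps the existing streaming InputStreamEntity path; any value other than gz, or gz combined with the JSON format, fails fast with a StreamLoadException.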