From 92c348c46b2b820df9653819de57c8b9614df524 Mon Sep 17 00:00:00 2001
From: felen <1489341561@qq.com>
Date: Thu, 18 Jan 2024 09:18:17 +0800
Subject: [PATCH 1/2] compress data before stream_load

---
 .../doris/spark/cfg/ConfigurationOptions.java |   5 +-
 .../doris/spark/load/DorisStreamLoad.java     |  58 +++++++--
 .../doris/spark/load/RecordBatchString.java   | 111 ++++++++++++++++++
 .../org/apache/doris/spark/util/DataUtil.java |  19 ++-
 4 files changed, 180 insertions(+), 13 deletions(-)
 create mode 100644 spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 64cb38f7..28e8fdd7 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -127,6 +127,9 @@ public interface ConfigurationOptions {
     String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";

     boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;
-
+    /**
+     * Use automatic redirection of fe without explicitly obtaining the be list
+     */
+    String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";
 }
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9fdf4c8f..ee65d45b 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -39,6 +39,7 @@
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -50,6 +51,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -70,7 +72,7 @@
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
+import java.util.zip.GZIPOutputStream;

 /**
  * DorisStreamLoad
@@ -107,6 +109,7 @@ public class DorisStreamLoad implements Serializable {
     private final Integer txnRetries;
     private final Integer txnIntervalMs;
     private final boolean autoRedirect;
+    private final String compressType;

     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -147,6 +150,7 @@ public DorisStreamLoad(SparkSettings settings) {
         this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
                 ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
+        this.compressType = settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
     }

     public String getLoadUrlStr() {
@@ -222,13 +226,34 @@ public long load(Iterator<InternalRow> rows, StructType schema)
             String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
             this.loadUrlStr = loadUrlStr;
             HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
-            RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
-                    .format(dataFormat)
-                    .sep(FIELD_DELIMITER)
-                    .delim(LINE_DELIMITER)
-                    .schema(schema)
-                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
-            httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
+
+            if (StringUtils.isNotEmpty(compressType)) {
+                if ("gz".equalsIgnoreCase(compressType)) {
+                    if (DataFormat.CSV.equals(dataFormat)) {
+                        RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
+                                .format(dataFormat)
+                                .sep(FIELD_DELIMITER)
+                                .delim(LINE_DELIMITER)
+                                .schema(schema)
+                                .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+                        String content = recordBatchString.getContent();
+                        byte[] compressedData = compressByGZ(content);
+                        httpPut.setEntity(new ByteArrayEntity(compressedData));
+                    } else {
+                        throw new StreamLoadException("compressing data in JSON format is not supported");
+                    }
+                } else {
+                    throw new StreamLoadException("unsupported compress type: " + compressType);
+                }
+            } else {
+                RecordBatchInputStream recordBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
+                        .format(dataFormat)
+                        .sep(FIELD_DELIMITER)
+                        .delim(LINE_DELIMITER)
+                        .schema(schema)
+                        .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+                httpPut.setEntity(new InputStreamEntity(recordBatchInputStream));
+            }
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
         } catch (IOException e) {
@@ -447,7 +472,7 @@ public BackendCacheLoader(SparkSettings settings) {

         @Override
         public List<String> load(String key) throws Exception {
-            return RestService.getBackendRows(settings, LOG);
+            return RestService.getBackendRows(settings, LOG);
         }

     }
@@ -505,4 +530,19 @@ private void handleStreamPassThrough() {

     }

+    /**
+     * Compress data with the gzip algorithm.
+     */
+    private byte[] compressByGZ(String content) throws IOException {
+        byte[] compressedData;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos)) {
+            gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
+            gzipOutputStream.finish();
+            compressedData = baos.toByteArray();
+        }
+        return compressedData;
+    }
+
 }
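
A standalone round-trip sketch of the gzip path (not part of the patch; it mirrors compressByGZ with java.util.zip and checks that the compressed bytes decode back to the original CSV payload):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzRoundTrip {

    // Mirrors compressByGZ above: gzip a UTF-8 string into a byte array.
    static byte[] compress(String content) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        }
        return baos.toByteArray();
    }

    // Inverse direction, which is what the Doris BE does on receipt.
    static String decompress(byte[] data) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data));
             ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gzip.read(buf)) != -1) {
                baos.write(buf, 0, n);
            }
            return baos.toString(StandardCharsets.UTF_8.name());
        }
    }

    public static void main(String[] args) throws IOException {
        String csv = "1,doris\n2,spark";
        byte[] packed = compress(csv);
        System.out.println(csv.equals(decompress(packed))); // true
    }
}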
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
new file mode 100644
index 00000000..c8024eee
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.util.DataUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Builds a whole record batch as a single string for stream load
+ */
+public class RecordBatchString {
+
+    public static final Logger LOG = LoggerFactory.getLogger(RecordBatchString.class);
+
+    /**
+     * load record batch
+     */
+    private final RecordBatch recordBatch;
+
+    /**
+     * line delimiter between records
+     */
+    private final byte[] delim;
+
+    /**
+     * streaming mode passes data through without processing
+     */
+    private final boolean passThrough;
+
+    public RecordBatchString(RecordBatch recordBatch, boolean passThrough) {
+        this.recordBatch = recordBatch;
+        this.passThrough = passThrough;
+        this.delim = recordBatch.getDelim();
+    }
+
+    public String getContent() throws IOException {
+        String delimStr = new String(delim, StandardCharsets.UTF_8);
+        StringBuilder builder = new StringBuilder();
+        Iterator<InternalRow> iterator = recordBatch.getIterator();
+        while (iterator.hasNext()) {
+            try {
+                builder.append(rowToString(iterator.next()));
+            } catch (DorisException e) {
+                throw new IOException(e);
+            }
+            // join with the delimiter only between records, so an empty batch
+            // yields an empty string instead of a negative substring index
+            if (iterator.hasNext()) {
+                builder.append(delimStr);
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Convert Spark row data to string
+     *
+     * @param row row data
+     * @return row string
+     * @throws DorisException
+     */
+    private String rowToString(InternalRow row) throws DorisException {
+
+        String str;
+
+        if (passThrough) {
+            str = row.getString(0);
+            return str;
+        }
+
+        switch (recordBatch.getFormat()) {
+            case CSV:
+                str = DataUtil.rowToCsvString(row, recordBatch.getSchema(), recordBatch.getSep(),
+                        recordBatch.getAddDoubleQuotes());
+                break;
+            case JSON:
+                try {
+                    str = DataUtil.rowToJsonString(row, recordBatch.getSchema());
+                } catch (JsonProcessingException e) {
+                    throw new DorisException("failed to serialize row to JSON", e);
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported format: ", recordBatch.getFormat().toString());
+        }
+
+        return str;
+
+    }
+}
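
A minimal sketch of getContent's joining contract (a hypothetical helper, not part of the patch): the line delimiter appears only between records, there is no trailing delimiter, and an empty batch produces an empty string:

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

public class JoinContractDemo {

    // Same joining contract as getContent above, with rows reduced to strings.
    static String join(Iterator<String> rows, String delim) {
        StringBuilder sb = new StringBuilder();
        while (rows.hasNext()) {
            sb.append(rows.next());
            if (rows.hasNext()) {
                sb.append(delim);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("a,1", "b,2").iterator(), "\n")); // a,1<newline>b,2
        System.out.println(join(Collections.<String>emptyList().iterator(), "\n").isEmpty()); // true
    }
}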
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index f7218c34..5c4527e1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -40,7 +40,7 @@ public class DataUtil {

     public static final String NULL_VALUE = "\\N";

-    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+    public static String rowToCsvString(InternalRow row, StructType schema, String sep, boolean quote) {
         StructField[] fields = schema.fields();
         int n = row.numFields();
         if (n > 0) {
@@ -51,9 +51,13 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
                 value = "\"" + value + "\"";
             }
             return value.toString();
-            }).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8);
+            }).collect(Collectors.joining(sep));
         }
-        return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8);
+        return StringUtils.EMPTY;
+    }
+
+    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+        return rowToCsvString(row, schema, sep, quote).getBytes(StandardCharsets.UTF_8);
     }

     public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
@@ -65,4 +69,13 @@ public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
         return MAPPER.writeValueAsBytes(rowMap);
     }

+    public static String rowToJsonString(InternalRow row, StructType schema) throws JsonProcessingException {
+        StructField[] fields = schema.fields();
+        Map<String, Object> rowMap = new HashMap<>(row.numFields());
+        for (int i = 0; i < fields.length; i++) {
+            rowMap.put(fields[i].name(), SchemaUtils.rowColumnValue(row, i, fields[i].dataType()));
+        }
+        return MAPPER.writeValueAsString(rowMap);
+    }
+
 }
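
A plain-Jackson mirror of the two new string variants (a standalone sketch; it assumes SchemaUtils.rowColumnValue has already unwrapped Spark's internal column values into plain Java objects, and all names here are illustrative):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RowStringDemo {
    public static void main(String[] args) throws JsonProcessingException {
        Map<String, Object> rowMap = new LinkedHashMap<>();
        rowMap.put("id", 1);
        rowMap.put("name", "doris");

        // rowToJsonString: one JSON object per row
        String json = new ObjectMapper().writeValueAsString(rowMap);
        System.out.println(json); // {"id":1,"name":"doris"}

        // rowToCsvString: column values joined by the field separator
        String csv = rowMap.values().stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        System.out.println(csv); // 1,doris
    }
}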
From f7c9368fd5565ca5fb4cae73e3bed21a6a3f764e Mon Sep 17 00:00:00 2001
From: felen <1489341561@qq.com>
Date: Thu, 18 Jan 2024 09:24:06 +0800
Subject: [PATCH 2/2] fix the comment of doris.sink.properties.compress_type

---
 .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 28e8fdd7..7c133100 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -128,7 +128,7 @@ public interface ConfigurationOptions {
     boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;

     /**
-     * Use automatic redirection of fe without explicitly obtaining the be list
+     * Compression type applied to the stream load payload; currently only "gz", and only for the CSV format
      */
     String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";
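
Finally, a hedged end-to-end sketch of how a job would opt in to compression once this series is merged. The FE address, credentials, and table identifier are placeholders; the "doris" format name and the pre-existing doris.* option keys follow the connector's existing configuration surface, and CSV format is pinned because the load path above rejects compressed JSON:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CompressedLoadJob {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("doris-gz-load").getOrCreate();

        // placeholder source data
        Dataset<Row> df = spark.read().option("header", "true").csv("/tmp/input.csv");

        df.write()
                .format("doris")
                // placeholder cluster coordinates and credentials
                .option("doris.fenodes", "127.0.0.1:8030")
                .option("doris.table.identifier", "db.tbl")
                .option("user", "root")
                .option("password", "")
                // the gz path only supports CSV, so pin the format explicitly
                .option("doris.sink.properties.format", "csv")
                // option introduced by this patch series
                .option("doris.sink.properties.compress_type", "gz")
                .mode(SaveMode.Append)
                .save();

        spark.stop();
    }
}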