Skip to content

Commit

Permalink
[Improve]Improve import speed by compressing data at low internet spe…
Browse files Browse the repository at this point in the history
…eds (#180)
  • Loading branch information
fluozhiye authored Jan 23, 2024
1 parent ade6a3c commit 1fcf275
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public interface ConfigurationOptions {
String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;


/**
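* Compression type applied to the stream load payload (compress_type), e.g. "gz"; only supported together with the CSV data format.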
*/
String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
Expand All @@ -50,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
Expand All @@ -70,7 +72,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import java.util.zip.GZIPOutputStream;

/**
* DorisStreamLoad
Expand Down Expand Up @@ -107,6 +109,7 @@ public class DorisStreamLoad implements Serializable {
private final Integer txnRetries;
private final Integer txnIntervalMs;
private final boolean autoRedirect;
private final String compressType;

public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
Expand Down Expand Up @@ -147,6 +150,7 @@ public DorisStreamLoad(SparkSettings settings) {

this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
compressType=settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
}

public String getLoadUrlStr() {
Expand Down Expand Up @@ -222,13 +226,30 @@ public long load(Iterator<InternalRow> rows, StructType schema)
String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));

if(StringUtils.isNotEmpty(compressType)){
if("gz".equals(compressType.toLowerCase()) && dataFormat.equals(DataFormat.CSV) ){
RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
String content = recordBatchString.getContent();
byte[] compressedData = compressByGZ(content);
httpPut.setEntity(new ByteArrayEntity(compressedData));
}else{
throw new StreamLoadException("Not support the compress type [" + compressType + "] for the dataformat [" + dataFormat + "]");
}
}else{
RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
}
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
} catch (IOException e) {
Expand Down Expand Up @@ -447,7 +468,7 @@ public BackendCacheLoader(SparkSettings settings) {

/**
 * Loads the backend rows by asking {@code RestService} with this loader's settings.
 * The {@code key} argument is not used by the lookup.
 *
 * @param key cache key (ignored)
 * @return backend rows returned by {@code RestService.getBackendRows}
 * @throws Exception whatever the underlying REST lookup throws
 */
@Override
public List<BackendV2.BackendRowV2> load(String key) throws Exception {
    // A single return only: a second statement after it would be unreachable and fail to compile.
    return RestService.getBackendRows(settings, LOG);
}
}

Expand Down Expand Up @@ -505,4 +526,19 @@ private void handleStreamPassThrough() {

}

/**
 * Compresses text with the GZIP algorithm.
 *
 * @param content text to compress; it is encoded as UTF-8 before compression
 * @return the complete GZIP stream (header, deflated data and trailer) as a byte array
 * @throws IOException if writing to the GZIP stream fails
 */
public byte[] compressByGZ(String content) throws IOException{
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(sink)) {
        gzip.write(content.getBytes(StandardCharsets.UTF_8));
    }
    // Closing the GZIP stream above finished it, so the trailer is already in the sink.
    return sink.toByteArray();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.load;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.util.DataUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * Renders a whole {@link RecordBatch} as one in-memory string.
 *
 * <p>Each row of the batch is converted to text and the rows are joined with the batch's
 * delimiter. No delimiter is written after the last row.
 */
public class RecordBatchString {

    public static final Logger LOG = LoggerFactory.getLogger(RecordBatchString.class);

    /**
     * Load record batch
     */
    private final RecordBatch recordBatch;

    /**
     * Delimiter bytes taken from the record batch, placed between consecutive rows
     */
    private final byte[] delim;

    /**
     * streaming mode pass through data without process
     */
    private final boolean passThrough;

    /**
     * @param recordBatch batch supplying the rows, format, schema and delimiters
     * @param passThrough when true, each row is emitted as its first column's string, unprocessed
     */
    public RecordBatchString(RecordBatch recordBatch, boolean passThrough) {
        this.recordBatch = recordBatch;
        this.passThrough = passThrough;
        this.delim = recordBatch.getDelim();
    }

    /**
     * Drains the batch iterator and joins the rendered rows with the delimiter.
     *
     * @return the joined rows without a trailing delimiter; an empty string for an empty batch
     * @throws IOException if a row cannot be converted (wraps the {@link DorisException})
     */
    public String getContent() throws IOException {
        // Decode with an explicit charset instead of the platform default.
        // NOTE(review): assumes the delimiter bytes were produced as UTF-8 - confirm against RecordBatch.
        String delimStr = new String(delim, StandardCharsets.UTF_8);
        StringBuilder builder = new StringBuilder();
        Iterator<InternalRow> iterator = recordBatch.getIterator();
        boolean firstRow = true;
        while (iterator.hasNext()) {
            // Write the delimiter before every row except the first, so nothing has to be
            // trimmed afterwards and an empty batch simply yields "".
            if (!firstRow) {
                builder.append(delimStr);
            }
            firstRow = false;
            try {
                builder.append(rowToString(iterator.next()));
            } catch (DorisException e) {
                throw new IOException(e);
            }
        }
        return builder.toString();
    }


    /**
     * Convert Spark row data to string
     *
     * @param row row data
     * @return the row rendered as text: the first column as-is in pass-through mode,
     *         otherwise a CSV or JSON string depending on the batch format
     * @throws DorisException if JSON serialization fails or the batch format is not supported
     */
    private String rowToString(InternalRow row) throws DorisException {

        if (passThrough) {
            return row.getString(0);
        }

        String str;

        switch (recordBatch.getFormat()) {
            case CSV:
                str = DataUtil.rowToCsvString(row, recordBatch.getSchema(), recordBatch.getSep(), recordBatch.getAddDoubleQuotes());
                break;
            case JSON:
                try {
                    str = DataUtil.rowToJsonString(row, recordBatch.getSchema());
                } catch (JsonProcessingException e) {
                    throw new DorisException("parse row to json bytes failed", e);
                }
                break;
            default:
                throw new IllegalArgumentException("Unsupported format: ", recordBatch.getFormat().toString());
        }

        return str;

    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class DataUtil {

public static final String NULL_VALUE = "\\N";

public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
public static String rowToCsvString(InternalRow row, StructType schema, String sep, boolean quote) {
StructField[] fields = schema.fields();
int n = row.numFields();
if (n > 0) {
Expand All @@ -51,9 +51,13 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String se
value = "\"" + value + "\"";
}
return value.toString();
}).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8);
}).collect(Collectors.joining(sep));
}
return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8);
return StringUtils.EMPTY;
}

/**
 * Renders a row as a CSV line and encodes it as UTF-8 bytes.
 *
 * @param row    row data
 * @param schema schema passed through to {@code rowToCsvString}
 * @param sep    column separator passed through to {@code rowToCsvString}
 * @param quote  quoting flag passed through to {@code rowToCsvString}
 * @return UTF-8 bytes of the string produced by {@code rowToCsvString}
 */
public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
    final String csvLine = rowToCsvString(row, schema, sep, quote);
    return csvLine.getBytes(StandardCharsets.UTF_8);
}

public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
Expand All @@ -65,4 +69,13 @@ public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws J
return MAPPER.writeValueAsBytes(rowMap);
}

/**
 * Serializes a row as a JSON object string keyed by column name.
 *
 * @param row    row data
 * @param schema schema whose fields supply the column names and data types
 * @return JSON text produced by the shared {@code MAPPER}
 * @throws JsonProcessingException if the mapper cannot serialize the collected values
 */
public static String rowToJsonString(InternalRow row, StructType schema) throws JsonProcessingException {
    final StructField[] columns = schema.fields();
    final Map<String, Object> valuesByColumn = new HashMap<>(row.numFields());
    int ordinal = 0;
    for (StructField column : columns) {
        // The ordinal follows the field's position in the schema.
        valuesByColumn.put(column.name(), SchemaUtils.rowColumnValue(row, ordinal, column.dataType()));
        ordinal++;
    }
    return MAPPER.writeValueAsString(valuesByColumn);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.load;

import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.spark.SparkConf;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class TestDorisStreamLoad {

    /**
     * Checks that {@code compressByGZ} shrinks a repetitive CSV payload and that its output
     * is a valid gzip stream which decompresses back to the original text.
     */
    @Test
    public void compressByGZ() throws IOException {
        // Rows "1,aa,1" .. "39,aa,39" joined by '\n', with no trailing newline.
        StringBuilder rows = new StringBuilder();
        for (int i = 1; i <= 39; i++) {
            if (i > 1) {
                rows.append('\n');
            }
            rows.append(i).append(",aa,").append(i);
        }
        String content = rows.toString();

        byte[] compressByte = new DorisStreamLoad(new SparkSettings(new SparkConf().set("doris.table.identifier", "aa.bb"))).compressByGZ(content);

        byte[] contentBytes = content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Assert.assertTrue(contentBytes.length > compressByte.length);

        java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
        // try-with-resources so the inflater behind the gzip stream is always released
        try (java.util.zip.GZIPInputStream ungzip =
                     new java.util.zip.GZIPInputStream(new java.io.ByteArrayInputStream(compressByte))) {
            byte[] buffer = new byte[1024];
            int n;
            while ((n = ungzip.read(buffer)) >= 0) {
                out.write(buffer, 0, n);
            }
        }
        byte[] unGzipByte = out.toByteArray();

        // Compare the raw bytes first, then the text decoded with an explicit charset
        // (JUnit argument order is expected, actual).
        Assert.assertArrayEquals(contentBytes, unGzipByte);
        Assert.assertEquals(content, new String(unGzipByte, java.nio.charset.StandardCharsets.UTF_8));
    }
}

0 comments on commit 1fcf275

Please sign in to comment.