Merge pull request #1 from fluozhiye/V20240117
V20240117
fluozhiye authored Jan 18, 2024
2 parents 95c278d + f7c9368 commit 248eb7a
Showing 4 changed files with 180 additions and 13 deletions.
@@ -127,6 +127,9 @@ public interface ConfigurationOptions {
String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;


/**
 * Compression type applied to the stream load payload, e.g. "gz".
 */
String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";

}
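
For context, a minimal sketch of how a Spark job could enable this option (the option key is the one added above; the FE endpoint, table, credentials, and input path are illustrative assumptions, not part of this commit). Keys under the doris.sink.properties.* prefix are forwarded to stream load, so setting compress_type to "gz" makes the sink gzip the CSV payload as implemented in DorisStreamLoad below.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompressedSinkExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("doris-gz-sink").getOrCreate();
        Dataset<Row> df = spark.read().parquet("/path/to/input"); // hypothetical input
        df.write().format("doris")
                .option("doris.fenodes", "fe-host:8030")             // assumed FE endpoint
                .option("doris.table.identifier", "db.tbl")          // assumed target table
                .option("user", "root")                              // assumed credentials
                .option("password", "")
                .option("doris.sink.properties.compress_type", "gz") // new option from this change
                .save();
    }
}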
@@ -39,6 +39,7 @@
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -50,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -70,7 +72,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import java.util.zip.GZIPOutputStream;

/**
* DorisStreamLoad
@@ -107,6 +109,7 @@ public class DorisStreamLoad implements Serializable {
private final Integer txnRetries;
private final Integer txnIntervalMs;
private final boolean autoRedirect;
private final String compressType;

public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -147,6 +150,7 @@ public DorisStreamLoad(SparkSettings settings) {

this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
this.compressType = settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
}

public String getLoadUrlStr() {
@@ -222,13 +226,34 @@ public long load(Iterator<InternalRow> rows, StructType schema)
String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);

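// When compress_type is set, the whole batch is rendered to a single CSV string,
// gzip-compressed in memory, and sent as a ByteArrayEntity; otherwise rows are
// streamed to the backend through RecordBatchInputStream.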
if (StringUtils.isNotEmpty(compressType)) {
    if ("gz".equalsIgnoreCase(compressType)) {
        if (dataFormat.equals(DataFormat.CSV)) {
            RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
                    .format(dataFormat)
                    .sep(FIELD_DELIMITER)
                    .delim(LINE_DELIMITER)
                    .schema(schema)
                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
            String content = recordBatchString.getContent();
            byte[] compressedData = compressByGZ(content);
            httpPut.setEntity(new ByteArrayEntity(compressedData));
        } else {
            throw new StreamLoadException("compressing JSON-format data is not supported");
        }
    } else {
        throw new StreamLoadException("unsupported compress type: " + compressType);
    }
} else {
    RecordBatchInputStream recordBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
            .format(dataFormat)
            .sep(FIELD_DELIMITER)
            .delim(LINE_DELIMITER)
            .schema(schema)
            .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
    httpPut.setEntity(new InputStreamEntity(recordBatchInputStream));
}
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
} catch (IOException e) {
@@ -447,7 +472,7 @@ public BackendCacheLoader(SparkSettings settings) {

@Override
public List<BackendV2.BackendRowV2> load(String key) throws Exception {
return RestService.getBackendRows(settings, LOG);
}
}

@@ -505,4 +530,19 @@ private void handleStreamPassThrough() {

}

/**
 * Compress the given content with the GZIP algorithm.
 */
private byte[] compressByGZ(String content) throws IOException {
    byte[] compressedData;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
         GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos)) {
        gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
        gzipOutputStream.finish();
        compressedData = baos.toByteArray();
    }
    return compressedData;
}

}
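
As a quick sanity check on the helper above, here is a standalone round trip (hypothetical test code, not part of the commit): it compresses a string the same way compressByGZ does, then decompresses it to confirm the payload survives intact.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzRoundTrip {
    public static void main(String[] args) throws IOException {
        String content = "1,alice\n2,bob";
        // compress, mirroring compressByGZ
        byte[] compressed;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
            gzip.finish();
            compressed = baos.toByteArray();
        }
        // decompress and compare with the original
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[1024];
            for (int n; (n = in.read(buf)) != -1; ) {
                out.write(buf, 0, n);
            }
        }
        System.out.println(content.equals(new String(out.toByteArray(), StandardCharsets.UTF_8))); // true
    }
}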
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.load;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.util.DataUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * Serializes a batch of rows to a single String for batch load.
 */
public class RecordBatchString {

public static final Logger LOG = LoggerFactory.getLogger(RecordBatchString.class);

/**
* Load record batch
*/
private final RecordBatch recordBatch;

private final byte[] delim;

/**
* number of records read so far
*/
private int readCount = 0;

/**
* in streaming mode, data is passed through without processing
*/
private final boolean passThrough;

public RecordBatchString(RecordBatch recordBatch, boolean passThrough) {
this.recordBatch = recordBatch;
this.passThrough = passThrough;
this.delim = recordBatch.getDelim();
}

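/**
 * Joins every serialized row with the line delimiter, without a trailing
 * delimiter. For example, CSV rows "1,a" and "2,b" with a "\n" delimiter
 * produce "1,a\n2,b"; an empty batch produces an empty string.
 */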
public String getContent() throws IOException {
    String delimStr = new String(recordBatch.getDelim(), StandardCharsets.UTF_8);
    StringBuilder builder = new StringBuilder();
    Iterator<InternalRow> iterator = recordBatch.getIterator();
    while (iterator.hasNext()) {
        try {
            builder.append(rowToString(iterator.next()));
        } catch (DorisException e) {
            throw new IOException(e);
        }
        builder.append(delimStr);
    }
    // strip the trailing delimiter; an empty batch yields an empty string
    return builder.length() == 0 ? "" : builder.substring(0, builder.length() - delimStr.length());
}


/**
 * Convert a Spark row to a string in the configured format.
 *
 * @param row row data
 * @return the serialized row
 * @throws DorisException if the row cannot be converted
 */
private String rowToString(InternalRow row) throws DorisException {

String str;

if (passThrough) {
str = row.getString(0);
return str;
}

switch (recordBatch.getFormat()) {
case CSV:
str = DataUtil.rowToCsvString(row, recordBatch.getSchema(), recordBatch.getSep(), recordBatch.getAddDoubleQuotes());
break;
case JSON:
try {
str = DataUtil.rowToJsonString(row, recordBatch.getSchema());
} catch (JsonProcessingException e) {
throw new DorisException("failed to serialize row to JSON", e);
}
break;
default:
throw new IllegalArgumentException("Unsupported format: ", recordBatch.getFormat().toString());
}

return str;

}
}
@@ -40,7 +40,7 @@ public class DataUtil {

public static final String NULL_VALUE = "\\N";

public static String rowToCsvString(InternalRow row, StructType schema, String sep, boolean quote) {
StructField[] fields = schema.fields();
int n = row.numFields();
if (n > 0) {
@@ -51,9 +51,13 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
value = "\"" + value + "\"";
}
return value.toString();
}).collect(Collectors.joining(sep));
}
return StringUtils.EMPTY;
}

public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
return rowToCsvString(row, schema, sep, quote).getBytes(StandardCharsets.UTF_8);
}

public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
@@ -65,4 +69,13 @@ public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
return MAPPER.writeValueAsBytes(rowMap);
}

public static String rowToJsonString(InternalRow row, StructType schema) throws JsonProcessingException {
StructField[] fields = schema.fields();
Map<String, Object> rowMap = new HashMap<>(row.numFields());
for (int i = 0; i < fields.length; i++) {
rowMap.put(fields[i].name(), SchemaUtils.rowColumnValue(row, i, fields[i].dataType()));
}
return MAPPER.writeValueAsString(rowMap);
}

}
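
For illustration, a small sketch of the two new helpers (hypothetical example that assumes the connector's classpath; the schema and values are made up):

import org.apache.doris.spark.util.DataUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class DataUtilExample {
    public static void main(String[] args) throws Exception {
        StructType schema = new StructType()
                .add("id", DataTypes.IntegerType)
                .add("name", DataTypes.StringType);
        InternalRow row = new GenericInternalRow(new Object[]{1, UTF8String.fromString("alice")});
        System.out.println(DataUtil.rowToCsvString(row, schema, ",", false)); // 1,alice
        System.out.println(DataUtil.rowToJsonString(row, schema));            // e.g. {"id":1,"name":"alice"} (key order not guaranteed)
    }
}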
