Merge pull request #1759 from ClickHouse/fix_reading_binary
[client-v2] Fix reading binary streams
chernser authored Aug 2, 2024
2 parents 098772d + f7235db commit 5a49f48
Showing 12 changed files with 174 additions and 93 deletions.
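At the API surface, the practical effect is that Client.queryAll(...) and the binary readers no longer rely on InputStream.available() to decide whether more rows exist; iteration simply stops when the reader reports the end of the stream. A minimal consumer sketch against the public queryAll API (the GenericRecord import path is assumed for illustration; accessor calls are omitted because they are not part of this change):

    import com.clickhouse.client.api.Client;
    import com.clickhouse.client.api.query.GenericRecord; // package path assumed, not taken from this diff
    import java.util.List;

    public class QueryAllExample {
        // queryAll() drives the reader internally with next() != null (see the Client.java hunk below),
        // so an empty or exhausted response yields an empty list rather than a read past end-of-stream.
        static void dumpAll(Client client, String sql) {
            List<GenericRecord> records = client.queryAll(sql);
            for (GenericRecord record : records) {
                System.out.println(record); // printing only; GenericRecord accessors are not shown in this diff
            }
        }
    }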
9 changes: 5 additions & 4 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
@@ -392,7 +392,7 @@ public Builder setSocketLinger(int secondsToWait) {
* @param enabled - indicates if server response compression is enabled
*/
public Builder compressServerResponse(boolean enabled) {
this.configuration.put("compress", String.valueOf(enabled));
this.configuration.put(ClickHouseClientOption.COMPRESS.getKey(), String.valueOf(enabled));
return this;
}

@@ -403,7 +403,7 @@ public Builder compressServerResponse(boolean enabled) {
* @param enabled - indicates if client request compression is enabled
*/
public Builder compressClientRequest(boolean enabled) {
this.configuration.put("decompress", String.valueOf(enabled));
this.configuration.put(ClickHouseClientOption.DECOMPRESS.getKey(), String.valueOf(enabled));
return this;
}
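Both setters above now write their values under ClickHouseClientOption keys instead of hard-coded option strings. A hedged usage fragment — only compressServerResponse and compressClientRequest come from this diff; the rest of the Builder configuration is deliberately left out:

    import com.clickhouse.client.api.Client;

    public class CompressionConfigExample {
        // Returns the builder so the call can be chained into whatever endpoint/credential setup follows.
        static Client.Builder configureCompression(Client.Builder builder) {
            return builder
                    .compressServerResponse(true)   // stored under ClickHouseClientOption.COMPRESS
                    .compressClientRequest(false);  // stored under ClickHouseClientOption.DECOMPRESS
        }
    }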

@@ -1149,8 +1149,9 @@ public List<GenericRecord> queryAll(String sqlQuery) {
List<GenericRecord> records = new ArrayList<>();
if (response.getResultRows() > 0) {
ClickHouseBinaryFormatReader reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream());
while (reader.hasNext()) {
records.add(new MapBackedRecord(reader.next(), reader.getSchema()));
Map<String, Object> record;
while ((record = reader.next()) != null) {
records.add(new MapBackedRecord(record, reader.getSchema()));
}
}
return records;
@@ -52,7 +52,7 @@ public interface ClickHouseBinaryFormatReader {
/**
* Moves cursor to the next row. Must be called before reading the first row.
*
* @return true if there are more rows to read, false otherwise
* @return map filled with column values or null if no more records are available
*/
Map<String, Object> next();
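With this contract, the idiomatic way to drain a reader is to loop on next() until it returns null. A minimal sketch, assuming the reader was already constructed for the response stream (the data_formats package path is an assumption, not taken from this diff):

    import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; // package path assumed
    import java.util.Map;

    final class DrainReader {
        // Counts rows by relying solely on the null return value that marks the end of data.
        static int countRows(ClickHouseBinaryFormatReader reader) {
            int rows = 0;
            Map<String, Object> record;
            while ((record = reader.next()) != null) {
                rows++;
            }
            return rows;
        }
    }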

@@ -5,6 +5,7 @@
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -28,20 +29,30 @@ public NativeFormatReader(InputStream inputStream) {

public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
super(inputStream, settings, null);
readNextRecord();
}

@Override
protected void readRecord(Map<String, Object> record) throws IOException {
protected boolean readRecord(Map<String, Object> record) throws IOException {
if (currentBlock == null || blockRowIndex >= currentBlock.getnRows()) {
readBlock();
if (!readBlock()) {
return false;
}
}

currentBlock.fillRecord(blockRowIndex, record);
blockRowIndex++;
return true;
}

private void readBlock() throws IOException {
int nColumns = BinaryStreamReader.readVarInt(input);
private boolean readBlock() throws IOException {
int nColumns;
try {
nColumns = BinaryStreamReader.readVarInt(input);
} catch (EOFException e) {
endReached();
return false;
}
int nRows = BinaryStreamReader.readVarInt(input);

List<String> names = new ArrayList<>(nColumns);
@@ -61,6 +72,7 @@ private void readBlock() throws IOException {
currentBlock.add(values);
}
blockRowIndex = 0;
return true;
}

@Override
@@ -5,6 +5,7 @@
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
@@ -17,13 +18,27 @@ public RowBinaryFormatReader(InputStream inputStream, TableSchema schema) {

public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
super(inputStream, querySettings, schema);
readNextRecord();
}

@Override
public void readRecord(Map<String, Object> record) throws IOException {
public boolean readRecord(Map<String, Object> record) throws IOException {
boolean firstColumn = true;
for (ClickHouseColumn column : getSchema().getColumns()) {
record.put(column.getColumnName(), binaryStreamReader
.readValue(column));
try {
Object val = binaryStreamReader.readValue(column);
if (val != null) {
record.put(column.getColumnName(),val);
}
firstColumn = false;
} catch (EOFException e) {
if (firstColumn) {
endReached();
return false;
}
throw e;
}
}
return true;
}
}
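The catch block above encodes the end-of-stream rule used throughout this commit: an EOFException thrown before the first column of a row means the stream ended cleanly at a row boundary, while an EOFException on any later column means the row was truncated and the error must propagate. A self-contained sketch of that rule using plain java.io types rather than the client classes:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    final class RowBoundaryEofDemo {
        // Each "row" is two longs. EOF before the first long = clean end of data;
        // EOF between the first and second long = truncated row, so the exception propagates.
        static List<long[]> readPairs(DataInputStream in) throws IOException {
            List<long[]> rows = new ArrayList<>();
            while (true) {
                long first;
                try {
                    first = in.readLong();
                } catch (EOFException e) {
                    return rows;              // row boundary: treat as normal end of stream
                }
                long second = in.readLong();  // EOF here is a real error and is rethrown
                rows.add(new long[] {first, second});
            }
        }
    }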
@@ -30,7 +30,13 @@ private void readSchema() {
try {
TableSchema headerSchema = new TableSchema();
List<String> columns = new ArrayList<>();
int nCol = BinaryStreamReader.readVarInt(input);
int nCol;
try {
nCol = BinaryStreamReader.readVarInt(input);
} catch (EOFException e) {
endReached();
return;
}
for (int i = 0; i < nCol; i++) {
columns.add(BinaryStreamReader.readString(input));
}
@@ -44,21 +50,4 @@
throw new ClientException("Failed to read header", e);
}
}

/**
* Reads a row to a map using column definitions from the schema.
* If column type mismatch and cannot be converted, an exception will be thrown.
*
* @param record data destination
* @throws IOException
*/
@Override
public void readRecord(Map<String, Object> record) throws IOException {
for (ClickHouseColumn column : getSchema().getColumns()) {
Object val = binaryStreamReader.readValue(column);
if (val != null) {
record.put(column.getColumnName(),val);
}
}
}
}
@@ -6,6 +6,7 @@
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -19,28 +20,34 @@ public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {

public RowBinaryWithNamesFormatReader(InputStream inputStream, TableSchema schema) {
this(inputStream, null, schema);
readNextRecord();
}

    public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
        super(inputStream, querySettings, schema);
        int nCol = 0;
        try {
            nCol = BinaryStreamReader.readVarInt(input);
        } catch (EOFException e) {
            endReached();
            columns = Collections.emptyList();
        } catch (IOException e) {
            throw new RuntimeException("Failed to read header", e);
        }

        if (nCol > 0) {
            columns = new ArrayList<>(nCol);
            try {
                for (int i = 0; i < nCol; i++) {
                    columns.add(BinaryStreamReader.readString(input));
                }
            } catch (IOException e) {
                throw new RuntimeException("Failed to read header", e);
            }
            columns = Collections.unmodifiableList(columns);
        }

        readNextRecord();
    }

    @Override
    public void readRecord(Map<String, Object> record) throws IOException {
        if (columns == null) {
            columns = new ArrayList<>();
            int nCol = BinaryStreamReader.readVarInt(input);
            for (int i = 0; i < nCol; i++) {
                columns.add(BinaryStreamReader.readString(input));
            }
            columns = Collections.unmodifiableList(columns);
        }

        for (ClickHouseColumn column : getSchema().getColumns()) {
            record.put(column.getColumnName(), binaryStreamReader
                    .readValue(column));
        }
    }

public List<String> getColumns() {
@@ -48,7 +48,7 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm

private TableSchema schema;

protected volatile boolean hasNext = true;
private volatile boolean hasNext = true;

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
this.input = inputStream;
@@ -58,8 +58,28 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
}

protected Map<String, Object> currentRecord = new ConcurrentHashMap<>();
protected Map<String, Object> nextRecord = new ConcurrentHashMap<>();

protected abstract void readRecord(Map<String, Object> record) throws IOException;

protected boolean readRecord(Map<String, Object> record) throws IOException {
boolean firstColumn = true;
for (ClickHouseColumn column : getSchema().getColumns()) {
try {
Object val = binaryStreamReader.readValue(column);
if (val != null) {
record.put(column.getColumnName(),val);
}
firstColumn = false;
} catch (EOFException e) {
if (firstColumn) {
endReached();
return false;
}
throw e;
}
}
return true;
}

@Override
public <T> T readValue(int colIndex) {
@@ -77,37 +97,55 @@ public <T> T readValue(String colName) {

    @Override
    public boolean hasNext() {
        if (hasNext) {
            try {
                hasNext = input.available() > 0;
                return hasNext;
            } catch (IOException e) {
                hasNext = false;
                LOG.error("Failed to check if there is more data available", e);
                return false;
            }
        }
        return false;
    }

    @Override
    public boolean hasNext() {
        return hasNext;
    }

    protected void readNextRecord() {
        try {
            nextRecord.clear();
            if (!readRecord(nextRecord)) {
                hasNext = false;
            }
        } catch (IOException e) {
            hasNext = false;
            throw new ClientException("Failed to read next row", e);
        }
    }

    @Override
    public Map<String, Object> next() {
        if (!hasNext) {
            throw new NoSuchElementException();
        }

        try {
            readRecord(currentRecord);
            return currentRecord;
        } catch (EOFException e) {
            hasNext = false;
            return null;
        } catch (IOException e) {
            hasNext = false;
            throw new ClientException("Failed to read row", e);
        }
    }

    @Override
    public Map<String, Object> next() {
        if (!hasNext) {
            return null;
        }

        if (!nextRecord.isEmpty()) {
            Map<String, Object> tmp = currentRecord;
            currentRecord = nextRecord;
            nextRecord = tmp;
            readNextRecord();
            return currentRecord;
        } else {
            try {
                currentRecord.clear();
                if (readRecord(currentRecord)) {
                    readNextRecord();
                    return currentRecord;
                } else {
                    currentRecord = null;
                    return null;
                }
            } catch (IOException e) {
                hasNext = false;
                throw new ClientException("Failed to read row", e);
            }
        }
    }

protected void endReached() {
hasNext = false;
}

protected void setSchema(TableSchema schema) {
this.schema = schema;
}
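Taken together, readNextRecord() and the reworked next() give the reader one record of look-ahead: the format readers prefetch the first row in their constructors, next() swaps the current and prefetched buffers and fetches again, and hasNext() reports a flag instead of calling InputStream.available() as the removed code did. A simplified, generic sketch of that look-ahead pattern — not the library code; the Supplier stands in for readRecord() and returns null when the source is exhausted:

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Supplier;

    // One-element look-ahead iterator: the source is read eagerly so hasNext() never probes it.
    final class LookAheadIterator<T> implements Iterator<T> {
        private final Supplier<T> source; // returns null when exhausted, like readRecord() returning false
        private T next;

        LookAheadIterator(Supplier<T> source) {
            this.source = source;
            this.next = source.get();     // prefetch, mirroring readNextRecord() in the constructors
        }

        @Override
        public boolean hasNext() {
            return next != null;          // answered from state only
        }

        @Override
        public T next() {
            if (next == null) {
                // The Iterator contract throws here; the reader above returns null instead.
                throw new NoSuchElementException();
            }
            T current = next;
            next = source.get();          // prefetch the following element
            return current;
        }
    }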