Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] Fix reading arrays #1800

Merged
merged 3 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm

private TableSchema schema;

private ClickHouseColumn[] columns;

private volatile boolean hasNext = true;

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
Expand Down Expand Up @@ -85,7 +87,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
*/
public boolean readRecord(Map<String, Object> record) throws IOException {
boolean firstColumn = true;
for (ClickHouseColumn column : getSchema().getColumns()) {
for (ClickHouseColumn column : columns) {
try {
Object val = binaryStreamReader.readValue(column);
if (val != null) {
Expand Down Expand Up @@ -170,6 +172,9 @@ protected void endReached() {

protected void setSchema(TableSchema schema) {
this.schema = schema;
if (schema != null) {
columns = schema.getColumns().toArray(new ClickHouseColumn[0]);
}
}

@Override
Expand Down Expand Up @@ -371,8 +376,8 @@ public ClickHouseGeoMultiPolygonValue getGeoMultiPolygon(String colName) {

@Override
public <T> List<T> getList(String colName) {
ClickHouseArrayValue<?> array = readValue(colName);
return null;
BinaryStreamReader.ArrayValue array = readValue(colName);
return array.asList();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
Expand Down Expand Up @@ -146,8 +148,10 @@ private <T> T readValueImpl(ClickHouseColumn column) throws IOException {
case IntervalNanosecond:
return (T) readBigIntegerLE(input, 8, true);
case IPv4:
return (T) Inet4Address.getByAddress(readNBytes(input, 4));
// https://clickhouse.com/docs/en/sql-reference/data-types/ipv4
return (T) Inet4Address.getByAddress(readNBytesLE(input, 4));
case IPv6:
// https://clickhouse.com/docs/en/sql-reference/data-types/ipv6
return (T) Inet6Address.getByAddress(readNBytes(input, 16));
case UUID:
return (T) new UUID(readLongLE(input), readLongLE(input));
Expand Down Expand Up @@ -290,6 +294,22 @@ public static byte[] readNBytes(InputStream inputStream, int len) throws IOExcep
return bytes;
}

public static byte[] readNBytesLE(InputStream input, int len) throws IOException {
byte[] bytes = readNBytes(input, len);

int s = 0;
int i = len - 1;
while (s < i) {
byte b = bytes[s];
bytes[s] = bytes[i];
bytes[i] = b;
s++;
i--;
}

return bytes;
}

private ArrayValue readArray(ClickHouseColumn column) throws IOException {
Class<?> itemType = column.getArrayBaseColumn().getDataType().getWiderPrimitiveClass();
int len = readVarInt(input);
Expand Down Expand Up @@ -345,6 +365,24 @@ public void set(int index, Object value) {
" value " + value + " of class " + value.getClass().getName(), e);
}
}

private List<?> list = null;

public synchronized <T> List<T> asList() {
if (list == null) {
ArrayList<T> list = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
Object item = get(i);
if (item instanceof ArrayValue) {
list.add((T) ((ArrayValue) item).asList());
} else {
list.add((T) item);
}
}
this.list = list;
}
return (List<T>) list;
}
}

private Map<?,?> readMap(ClickHouseColumn column) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class TableSchema {

private List<ClickHouseColumn> columns;

private List<ClickHouseColumn> columnsView;
Paultagoras marked this conversation as resolved.
Show resolved Hide resolved

private Map<String, Map<String, Object>> metadata;

private Map<String, Integer> colIndex;
Expand All @@ -25,6 +27,7 @@ public class TableSchema {
public TableSchema() {
this.metadata = new HashMap<>();
this.columns = new ArrayList<>();
this.columnsView = Collections.unmodifiableList(this.columns);
this.colIndex = new HashMap<>();
}

Expand All @@ -34,7 +37,7 @@ public TableSchema() {
* @return - collection of columns in the table
*/
public List<ClickHouseColumn> getColumns() {
return Collections.unmodifiableList(columns);
return columnsView;
}

public String getDatabaseName() {
Expand Down
107 changes: 104 additions & 3 deletions client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
import java.io.OutputStreamWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand Down Expand Up @@ -181,6 +185,39 @@ public void testBigUnsignedInt() throws Exception {
Assert.assertEquals(firstRecord.getBigInteger("i256"), expected256);
}

@Test(groups = {"integration"})
public void testEndianReadingNumbers() throws Exception {

byte[][] numbers = new byte[][] {
new byte[] {0x00, 0x02, 0x00, 0x01},
new byte[] {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08},
new byte[] {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10},
};


for (byte[] number : numbers) {
String typeName = "UInt32";
if (number.length == 8) {
typeName = "UInt64";
} else if (number.length == 16) {
typeName = "UInt128";
}
BigInteger expected = new BigInteger(number);
String sqlQuery = "SELECT to" + typeName + "('" + expected + "') as value1";
System.out.println(sqlQuery);
Records records = client.queryRecords(sqlQuery).get(3, TimeUnit.SECONDS);
GenericRecord firstRecord = records.iterator().next();

if (number.length == 4) {
System.out.println(firstRecord.getLong("value1"));
Assert.assertEquals(firstRecord.getLong("value1"), expected.longValue());
} else {
System.out.println(firstRecord.getBigInteger("value1"));
Assert.assertEquals(firstRecord.getBigInteger("value1"), expected);
}
}
}

@Test(groups = {"integration"})
public void testReadRecordsWithStreamAPI() throws Exception {
final int tables = 10;
Expand Down Expand Up @@ -426,7 +463,8 @@ record = reader.next();

private final static List<Function<String, Object>> ARRAY_VALUE_GENERATORS = Arrays.asList(
c ->
RANDOM.ints(10, 0, 100),
RANDOM.ints(10, 0, 100)
.asLongStream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll),
c -> {
List<List<Integer>> values = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Expand All @@ -453,9 +491,13 @@ public void testArrayValues() throws Exception {

Map<String, Object> record = reader.next();
Assert.assertNotNull(record);
Map<String, Object> datasetRecord = data.get(0);
long[] col1Values = reader.getLongArray("col1");
System.out.println("col1: " + Arrays.toString(col1Values));
System.out.println("Record: " + record);
Assert.assertEquals(Arrays.stream(col1Values).collect(ArrayList<Long>::new, ArrayList::add,
ArrayList::addAll), datasetRecord.get("col1"));
Assert.assertEquals(reader.getList("col1"), datasetRecord.get("col1"));
List<List<Long>> col2Values = reader.getList("col2");
Assert.assertEquals(col2Values, data.get(0).get("col2"));
}

private final static List<String> MAP_COLUMNS = Arrays.asList(
Expand Down Expand Up @@ -574,6 +616,65 @@ public void testNullValues() throws Exception {
}
}

@Test
public void testIPAddresses() throws Exception {

final List<String> columns = Arrays.asList(
"srcV4 IPv4",
"targetV4 IPv4",
"srcV6 IPv6",
"targetV6 IPv6"

);

Random random = new Random();
byte[] ipv4 = new byte[4];
random.nextBytes(ipv4);
InetAddress ipv4src = Inet4Address.getByAddress(ipv4);
random.nextBytes(ipv4);
InetAddress ipv4target = Inet4Address.getByAddress(ipv4);
byte[] ipv6 = new byte[16];
random.nextBytes(ipv6);
InetAddress ipv6src = Inet6Address.getByAddress(ipv6);
random.nextBytes(ipv6);
InetAddress ipv6target = Inet6Address.getByAddress(ipv6);


final List<Supplier<String>> valueGenerators = Arrays.asList(
() -> sq(ipv4src.getHostAddress()),
() -> sq(ipv4target.getHostAddress()),
() -> sq(ipv6src.getHostAddress()),
() -> sq(ipv6target.getHostAddress())
);

final List<Consumer<ClickHouseBinaryFormatReader>> verifiers = new ArrayList<>();
verifiers.add(r -> {
Assert.assertTrue(r.hasValue("srcV4"), "No value for column srcV4 found");
Assert.assertEquals(r.getInet4Address("srcV4"), ipv4src);
Assert.assertEquals(r.getInet4Address(1), ipv4src);
});

verifiers.add(r -> {
Assert.assertTrue(r.hasValue("targetV4"), "No value for column targetV4 found");
Assert.assertEquals(r.getInet4Address("targetV4"), ipv4target);
Assert.assertEquals(r.getInet4Address(2), ipv4target);
});

verifiers.add(r -> {
Assert.assertTrue(r.hasValue("srcV6"), "No value for column src6 found");
Assert.assertEquals(r.getInet6Address("srcV6"), ipv6src);
Assert.assertEquals(r.getInet6Address(3), ipv6src);
});

verifiers.add(r -> {
Assert.assertTrue(r.hasValue("targetV6"), "No value for column targetV6 found");
Assert.assertEquals(r.getInet6Address("targetV6"), ipv6target);
Assert.assertEquals(r.getInet6Address(4), ipv6target);
});

testDataTypes(columns, valueGenerators, verifiers);
}

@Test
public void testDateTimeDataTypes() {
final List<String> columns = Arrays.asList(
Expand Down
Loading