Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/maven/com.mysql-mysql-connector…
Browse files Browse the repository at this point in the history
…-j-9.1.0
  • Loading branch information
RustedBones authored Dec 11, 2024
2 parents 7b15d31 + 1f38aa8 commit 07443b4
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 176 deletions.
19 changes: 19 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,34 @@ updates:
- dependency-name: com.fasterxml.jackson.core:jackson-annotations
- dependency-name: com.fasterxml.jackson.core:jackson-core
- dependency-name: com.fasterxml.jackson.core:jackson-databind
- dependency-name: com.fasterxml.jackson:jackson-bom
- dependency-name: com.github.luben:zstd-jni
- dependency-name: com.google.api-client:google-api-client
- dependency-name: com.google.api.grpc:proto-google-iam-v1
- dependency-name: com.google.apis:google-api-services-storage
- dependency-name: com.google.auto.value:auto-value
- dependency-name: com.google.auto.value:auto-value-annotations
- dependency-name: com.google.cloud.sql:mysql-socket-factory
- dependency-name: com.google.cloud.sql:postgres-socket-factory
- dependency-name: com.google.cloud:libraries-bom
- dependency-name: com.google.errorprone:error_prone_annotations
- dependency-name: com.google.guava:guava-bom
- dependency-name: com.google.http-client:google-http-client
- dependency-name: com.google.http-client:google-http-client-test
- dependency-name: com.google.oauth-client:google-oauth-client
- dependency-name: com.google.protobuf:protobuf-java
- dependency-name: com.google.protobuf:protobuf-java-util
- dependency-name: commons-codec:commons-codec
- dependency-name: org.apache.beam:beam-sdks-java-bom # manually bump this
- dependency-name: org.apache.commons:commons-compress
- dependency-name: org.apache.httpcomponents:httpcore
- dependency-name: org.apache.httpcomponents:httpclient
- dependency-name: org.checkerframework:checker-qual
- dependency-name: org.slf4j:slf4j-api
- dependency-name: org.slf4j:slf4j-jdk14
- dependency-name: org.threeten:threetenbp
- dependency-name: io.grpc:grpc-bom
- dependency-name: io.netty:netty-bom
- dependency-name: joda-time:joda-time
# sync with libraries-bom
- dependency-name: io.opencensus:opencensus-contrib-grpc-metrics
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DBeam
Copyright 2017 Spotify AB
Copyright 2024 Spotify AB
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ static SqlFunction<ResultSet, Object> computeMapping(
} else {
return resultSet -> nullableBytes(resultSet.getBytes(column));
}
case ARRAY:
return resultSet -> resultSet.getArray(column);
case BINARY:
case VARBINARY:
case LONGVARBINARY:
case ARRAY:
case BLOB:
return resultSet -> nullableBytes(resultSet.getBytes(column));
case DOUBLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,39 @@ public ByteBuffer convertResultSetIntoAvroBytes() throws SQLException, IOExcepti
binaryEncoder.writeNull();
} else {
binaryEncoder.writeIndex(1);
if (value instanceof String) {
binaryEncoder.writeString((String) value);
} else if (value instanceof Long) {
binaryEncoder.writeLong((Long) value);
} else if (value instanceof Integer) {
binaryEncoder.writeInt((Integer) value);
} else if (value instanceof Boolean) {
binaryEncoder.writeBoolean((Boolean) value);
} else if (value instanceof ByteBuffer) {
binaryEncoder.writeBytes((ByteBuffer) value);
} else if (value instanceof Double) {
binaryEncoder.writeDouble((Double) value);
} else if (value instanceof Float) {
binaryEncoder.writeFloat((Float) value);
}
writeValue(value, binaryEncoder);
}
}
binaryEncoder.flush();
return ByteBuffer.wrap(out.getBufffer(), 0, out.size());
}

private void writeValue(Object value, BinaryEncoder binaryEncoder)
throws SQLException, IOException {
if (value instanceof String) {
binaryEncoder.writeString((String) value);
} else if (value instanceof Long) {
binaryEncoder.writeLong((Long) value);
} else if (value instanceof Integer) {
binaryEncoder.writeInt((Integer) value);
} else if (value instanceof Boolean) {
binaryEncoder.writeBoolean((Boolean) value);
} else if (value instanceof ByteBuffer) {
binaryEncoder.writeBytes((ByteBuffer) value);
} else if (value instanceof Double) {
binaryEncoder.writeDouble((Double) value);
} else if (value instanceof Float) {
binaryEncoder.writeFloat((Float) value);
} else if (value instanceof java.sql.Array) {
binaryEncoder.writeArrayStart();
Object[] array = (Object[]) ((java.sql.Array) value).getArray();
binaryEncoder.setItemCount(array.length);
for (Object arrayItem : array) {
binaryEncoder.startItem();
writeValue(arrayItem, binaryEncoder);
}

binaryEncoder.writeArrayEnd();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public static Schema createSchemaByReadingOneRow(
try (Statement statement = connection.createStatement()) {
final ResultSet resultSet = statement.executeQuery(queryBuilderArgs.sqlQueryWithLimitOne());

resultSet.next();

final Schema schema =
createAvroSchema(
resultSet,
Expand Down Expand Up @@ -107,7 +109,7 @@ public static Schema createAvroSchema(
.prop("tableName", tableName)
.prop("connectionUrl", connectionUrl)
.fields();
return createAvroFields(meta, builder, useLogicalTypes).endRecord();
return createAvroFields(resultSet, builder, useLogicalTypes).endRecord();
}

static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException {
Expand All @@ -123,11 +125,13 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep
}

private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final ResultSetMetaData meta,
final SchemaBuilder.FieldAssembler<Schema> builder,
final ResultSet resultSet,
final SchemaBuilder.FieldAssembler<Schema> builder,
final boolean useLogicalTypes)
throws SQLException {

ResultSetMetaData meta = resultSet.getMetaData();

for (int i = 1; i <= meta.getColumnCount(); i++) {

final String columnName;
Expand All @@ -140,7 +144,8 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final int columnType = meta.getColumnType(i);
final String typeName = JDBCType.valueOf(columnType).getName();
final String columnClassName = meta.getColumnClassName(i);
final SchemaBuilder.FieldBuilder<Schema> field =
final String columnTypeName = meta.getColumnTypeName(i);
SchemaBuilder.FieldBuilder<Schema> field =
builder
.name(normalizeForAvro(columnName))
.doc(String.format("From sqlType %d %s (%s)", columnType, typeName, columnClassName))
Expand All @@ -149,13 +154,21 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
.prop("typeName", typeName)
.prop("columnClassName", columnClassName);

if (columnTypeName != null) {
field = field.prop("columnTypeName", columnTypeName);
}

final SchemaBuilder.BaseTypeBuilder<
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();

Integer arrayItemType = resultSet.isFirst() && columnType == ARRAY
? resultSet.getArray(i).getBaseType() : null;

final SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>> schemaFieldAssembler =
setAvroColumnType(
columnType,
arrayItemType,
meta.getPrecision(i),
columnClassName,
useLogicalTypes,
Expand All @@ -181,6 +194,7 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
private static SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>
setAvroColumnType(
final int columnType,
final Integer arrayItemType,
final int precision,
final String columnClassName,
final boolean useLogicalTypes,
Expand Down Expand Up @@ -225,10 +239,12 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
} else {
return field.bytesType();
}
case ARRAY:
return setAvroColumnType(arrayItemType, null, precision, columnClassName,
useLogicalTypes, field.array().items());
case BINARY:
case VARBINARY:
case LONGVARBINARY:
case ARRAY:
case BLOB:
return field.bytesType();
case DOUBLE:
Expand Down
53 changes: 46 additions & 7 deletions dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import com.google.auto.value.AutoValue;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

// A fictitious DB model to test different SQL types
@AutoValue
Expand All @@ -42,7 +45,9 @@ public static Coffee create(
final java.sql.Timestamp created,
final Optional<java.sql.Timestamp> updated,
final UUID uid,
final Long rownum) {
final Long rownum,
final List<Integer> intArr,
final List<String> textArr) {
return new AutoValue_Coffee(
name,
supId,
Expand All @@ -55,7 +60,9 @@ public static Coffee create(
created,
updated,
uid,
rownum);
rownum,
new ArrayList<>(intArr),
new ArrayList<>(textArr));
}

public abstract String name();
Expand All @@ -82,10 +89,15 @@ public static Coffee create(

public abstract Long rownum();

public abstract List<Integer> intArr();

public abstract List<String> textArr();

public String insertStatement() {
return String.format(
Locale.ENGLISH,
"INSERT INTO COFFEES " + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d)",
"INSERT INTO COFFEES " + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d,"
+ " ARRAY [%s], ARRAY ['%s'])",
name(),
supId().orElse(null),
price().toString(),
Expand All @@ -97,7 +109,9 @@ public String insertStatement() {
created(),
updated().orElse(null),
uid(),
rownum());
rownum(),
String.join(",", intArr().stream().map(x -> (CharSequence) x.toString())::iterator),
String.join("','", textArr()));
}

public static String ddl() {
Expand All @@ -114,7 +128,9 @@ public static String ddl() {
+ "\"CREATED\" TIMESTAMP NOT NULL,"
+ "\"UPDATED\" TIMESTAMP,"
+ "\"UID\" UUID NOT NULL,"
+ "\"ROWNUM\" BIGINT NOT NULL);";
+ "\"ROWNUM\" BIGINT NOT NULL,"
+ "\"INT_ARR\" INTEGER ARRAY NOT NULL,"
+ "\"TEXT_ARR\" VARCHAR ARRAY NOT NULL);";
}

public static Coffee COFFEE1 =
Expand All @@ -130,7 +146,19 @@ public static String ddl() {
new java.sql.Timestamp(1488300933000L),
Optional.empty(),
UUID.fromString("123e4567-e89b-12d3-a456-426655440000"),
1L);
1L,
new ArrayList<Integer>() {{
add(5);
add(7);
add(11);
}},
new ArrayList<String>() {{
add("rock");
add("scissors");
add("paper");
}}
);

public static Coffee COFFEE2 =
create(
"colombian caffee",
Expand All @@ -144,5 +172,16 @@ public static String ddl() {
new java.sql.Timestamp(1488300723000L),
Optional.empty(),
UUID.fromString("123e4567-e89b-a456-12d3-426655440000"),
2L);
2L,
new ArrayList<Integer>() {{
add(7);
add(11);
add(23);
}},
new ArrayList<String>() {{
add("scissors");
add("paper");
add("rock");
}}
);
}
Loading

0 comments on commit 07443b4

Please sign in to comment.