Skip to content

Commit

Permalink
Ksql engine refactor (#1897)
Browse files Browse the repository at this point in the history
* Start consolidating Rest / Standalone
  • Loading branch information
big-andy-coates authored Sep 26, 2018
1 parent 0c01ba2 commit 20afd28
Show file tree
Hide file tree
Showing 30 changed files with 492 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -326,10 +327,8 @@ private static String getStructString(final Schema schema) {
.collect(Collectors.joining(", ", "STRUCT <", ">"));
}

public static String buildAvroSchema(final Schema schema, final String name) {

final org.apache.avro.SchemaBuilder.FieldAssembler fieldAssembler =
org.apache.avro.SchemaBuilder
static org.apache.avro.Schema buildAvroSchema(final Schema schema, final String name) {
final FieldAssembler<org.apache.avro.Schema> fieldAssembler = org.apache.avro.SchemaBuilder
.record(name).namespace("ksql")
.fields();

Expand All @@ -340,7 +339,7 @@ public static String buildAvroSchema(final Schema schema, final String name) {
.withDefault(null);
}

return fieldAssembler.endRecord().toString();
return fieldAssembler.endRecord();
}

private static org.apache.avro.Schema getAvroSchemaForField(final Schema fieldSchema) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -147,7 +147,7 @@ public void shouldCreateCorrectAvroSchemaWithNullableFields() {
.field("orderunits", Schema.OPTIONAL_FLOAT64_SCHEMA)
.field("arraycol", SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build())
.field("mapcol", SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA)).optional().build();
final String avroSchemaString = SchemaUtil.buildAvroSchema(schemaBuilder.build(), "orders");
final String avroSchemaString = SchemaUtil.buildAvroSchema(schemaBuilder.build(), "orders").toString();
assertThat(avroSchemaString, equalTo(
"{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"ksql\",\"fields\":"
+ "[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":"
Expand Down Expand Up @@ -212,7 +212,7 @@ public void shouldGetTheCorrectJavaTypeForMap() {
public void shouldFailForCorrectJavaType() {

try {
final Class class8 = SchemaUtil.getJavaType(Schema.BYTES_SCHEMA);
SchemaUtil.getJavaType(Schema.BYTES_SCHEMA);
Assert.fail();
} catch (final KsqlException ksqlException) {
assertThat("Invalid type retured.",ksqlException.getMessage(), equalTo("Type is not "
Expand Down Expand Up @@ -295,7 +295,7 @@ public void shouldGetTheCorrectSchemaForMap() {
public void shouldFailForIncorrectSchema() {

try {
final Schema schema8 = SchemaUtil.getTypeSchema("BYTES");
SchemaUtil.getTypeSchema("BYTES");
Assert.fail();
} catch (final Exception e) {
assertThat(e.getMessage(), equalTo("Unsupported type: BYTES"));
Expand Down Expand Up @@ -421,6 +421,7 @@ public void shouldReturnFieldNameWithoutAliasAsIs() {
equalTo(schema.fields().get(0).name()));
}

@Test
public void shouldResolveIntAndLongSchemaToLong() {
assertThat(
SchemaUtil.resolveArithmeticType(Schema.Type.INT64, Schema.Type.INT32).type(),
Expand Down Expand Up @@ -558,8 +559,10 @@ public void shouldSetDocOnGetSchemaFromType() {
}

// Following methods not invoked but used to test conversion from Type -> Schema
@SuppressWarnings("unused")
private void mapType(final Map<String, Integer> map) {}

@SuppressWarnings("unused")
private void listType(final List<Double> list) {}
}

Loading

0 comments on commit 20afd28

Please sign in to comment.