
Commit

Inspection
rayokota committed Jul 3, 2024
1 parent 6c0548a commit a2a85a8
Showing 2 changed files with 10 additions and 21 deletions.
28 changes: 10 additions & 18 deletions src/main/java/io/kcache/kwack/KwackEngine.java
@@ -25,6 +25,7 @@
import io.kcache.caffeine.CaffeineCache;
import io.kcache.kwack.KwackConfig.RowAttribute;
import io.kcache.kwack.KwackConfig.SerdeType;
+import io.kcache.kwack.loader.json.JsonLoader;
import io.kcache.kwack.schema.ColumnDef;
import io.kcache.kwack.schema.MapColumnDef;
import io.kcache.kwack.schema.StructColumnDef;
@@ -54,22 +55,14 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
-import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
-import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
-import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.duckdb.DuckDBArray;
@@ -106,11 +99,8 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
-import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
-import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import sqlline.BuiltInProperty;
import sqlline.SqlLine;
import sqlline.SqlLine.Status;
@@ -355,7 +345,7 @@ public Optional<ParsedSchema> parseSchema(String schemaType, String schema,
parsedSchema.validate(false);
return Optional.of(parsedSchema);
} catch (Exception e) {
LOG.error("Could not parse schema " + schema, e);
LOG.error("Could not parse schema {}", schema, e);
return Optional.empty();
}
}
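Most of the KwackEngine changes in this commit follow the pattern shown above: string concatenation in log statements is replaced with SLF4J parameterized logging. A minimal, standalone sketch of that pattern (the class name and sample values are illustrative, not taken from the commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    public static void main(String[] args) {
        String schema = "{\"type\": \"string\"}";
        Exception e = new RuntimeException("parse failure");

        // Before: concatenation builds the message string even when ERROR is disabled.
        LOG.error("Could not parse schema " + schema, e);

        // After: the {} placeholder defers formatting, and SLF4J (1.6+) still treats a
        // trailing Throwable as the exception, so the stack trace is preserved.
        LOG.error("Could not parse schema {}", schema, e);
    }
}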
@@ -368,7 +358,7 @@ public Optional<ParsedSchema> getLatestSchema(String subject) {
SchemaMetadata schema = getSchemaRegistry().getLatestSchemaMetadata(subject);
return getSchemaRegistry().parseSchema(new Schema(null, schema));
} catch (Exception e) {
LOG.error("Could not find latest schema for subject " + subject, e);
LOG.error("Could not find latest schema for subject {}", subject, e);
return Optional.empty();
}
}
@@ -378,7 +368,7 @@ public Optional<ParsedSchema> getSchemaById(int id) {
ParsedSchema schema = getSchemaRegistry().getSchemaById(id);
return Optional.of(schema);
} catch (Exception e) {
LOG.error("Could not find schema with id " + id, e);
LOG.error("Could not find schema with id {}", id, e);
return Optional.empty();
}
}
@@ -407,6 +397,7 @@ private Object deserialize(boolean isKey, String topic, byte[] bytes) throws IOE
loader = new AvroLoader();
break;
case "JSON":
+loader = new JsonLoader();
break;
case "PROTOBUF":
break;
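The other functional change in this commit is wiring JsonLoader into the "JSON" branch of this dispatch (and the matching branch in toColumnDef further down), alongside the existing AvroLoader. A self-contained, hypothetical sketch of that kind of dispatch; only the AvroLoader and JsonLoader names come from the diff, while the Loader interface, its method, and the enclosing class are invented here purely for illustration:

// Hypothetical types standing in for the io.kcache.kwack.loader.* classes.
interface Loader {
    String schemaType();
}

class AvroLoader implements Loader {
    public String schemaType() { return "AVRO"; }
}

class JsonLoader implements Loader {
    public String schemaType() { return "JSON"; }
}

public class LoaderDispatchSketch {
    // Picks a loader from the schema type string, mirroring the switch above.
    static Loader loaderFor(String schemaType) {
        switch (schemaType) {
            case "AVRO":
                return new AvroLoader();
            case "JSON":
                return new JsonLoader(); // the branch this commit fills in
            default:
                return null; // the PROTOBUF branch is still left empty by this commit
        }
    }

    public static void main(String[] args) {
        System.out.println(loaderFor("JSON").schemaType()); // prints JSON
    }
}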
@@ -490,7 +481,7 @@ private void initTable(DuckDBConnection conn, String topic) {
try {
conn.createStatement().execute(ddl);
} catch (SQLException e) {
LOG.warn("Could not execute DDL: " + e.getMessage());
LOG.warn("Could not execute DDL: {}", e.getMessage());
}
}

@@ -506,7 +497,7 @@ private void initTable(DuckDBConnection conn, String topic) {
try {
conn.createStatement().execute(ddl);
} catch (SQLException e) {
LOG.error("Could not execute DDL: " + ddl, e);
LOG.error("Could not execute DDL: {}", ddl, e);
throw new RuntimeException(e);
}
}
@@ -520,6 +511,7 @@ private ColumnDef toColumnDef(boolean isKey, Either<SerdeType, ParsedSchema> sch
loader = new AvroLoader();
break;
case "JSON":
+loader = new JsonLoader();
break;
case "PROTOBUF":
break;
@@ -774,7 +766,7 @@ public void sync() {
try {
value.sync();
} catch (Exception e) {
LOG.warn("Could not sync cache for " + key);
LOG.warn("Could not sync cache for {}", key);
}
});
}
@@ -789,7 +781,7 @@ public void close() throws IOException {
try {
value.close();
} catch (IOException e) {
LOG.warn("Could not close cache for " + key);
LOG.warn("Could not close cache for {}", key);
}
});
resetSchemaRegistry(config.getSchemaRegistryUrls(), schemaRegistry);
3 changes: 0 additions & 3 deletions src/main/java/io/kcache/kwack/KwackMain.java
@@ -25,9 +25,6 @@
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
-import sqlline.BuiltInProperty;
-import sqlline.SqlLine;
-import sqlline.SqlLine.Status;

@Command(name = "kwack", mixinStandardHelpOptions = true,
versionProvider = KwackMain.ManifestVersionProvider.class,
