diff --git a/src/main/java/io/kcache/kwack/KwackEngine.java b/src/main/java/io/kcache/kwack/KwackEngine.java index c6abb5b..075c83b 100644 --- a/src/main/java/io/kcache/kwack/KwackEngine.java +++ b/src/main/java/io/kcache/kwack/KwackEngine.java @@ -33,6 +33,7 @@ import io.kcache.kwack.translator.avro.AvroTranslator; import io.kcache.kwack.util.Jackson; import io.vavr.control.Either; +import java.io.PrintWriter; import java.io.UncheckedIOException; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -207,6 +208,7 @@ public void init() { public void start() throws IOException { if (query != null && !query.isEmpty()) { try { + PrintWriter pw = new PrintWriter(System.out); try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(query)) { ResultSetMetaData md = rs.getMetaData(); @@ -227,10 +229,10 @@ public void start() throws IOException { row.put(name, toJson(rs.getObject(i + 1))); } String s = MAPPER.writeValueAsString(row); - // TODO use PrintWriter - System.out.println(s); + pw.println(s); } } + pw.flush(); } catch (SQLException e) { throw new IOException(e); } @@ -571,7 +573,7 @@ private void initTable(DuckDBConnection conn, String topic) { } ddl += valueDdl; if (rowInfoSize > 0) { - ddl += ROWINFO + " " + ROWINFO; + ddl += ROWINFO + " " + rowInfoDef.toDdl(); } ddl += ")"; try {