From 07c4f3736d0b7a54b81fe4d3cd2b5770462ce19c Mon Sep 17 00:00:00 2001 From: Benoit Orihuela Date: Wed, 16 Oct 2024 09:58:48 +0100 Subject: [PATCH] fix: improve logs / skip entities with no values to insert (#43) Signed-off-by: Benoit Orihuela --- .../nifi/processors/ngsild/NgsiLdToPostgreSQL.java | 12 ++++++------ .../io/nifi/processors/ngsild/PostgreSQLBackend.java | 6 +++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/NgsiLdToPostgreSQL.java b/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/NgsiLdToPostgreSQL.java index 2e1200d..a3c6283 100755 --- a/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/NgsiLdToPostgreSQL.java +++ b/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/NgsiLdToPostgreSQL.java @@ -235,18 +235,18 @@ public class NgsiLdToPostgreSQL extends AbstractSessionFactoryProcessor { final PreparedStatement stmt = enclosure.getCachedStatement(conn); JdbcCommon.setParameters(stmt, flowFile.getAttributes()); try { - getLogger().info("Gonna create schema {}", schemaName); + getLogger().debug("Gonna create schema {}", schemaName); conn.createStatement().execute(postgres.createSchema(schemaName)); - getLogger().info("Gonna create table {} with columns {}", tableName, updatedListOfTypedFields); + getLogger().debug("Gonna create table {} with columns {}", tableName, updatedListOfTypedFields); conn.createStatement().execute(postgres.createTable(schemaName, tableName, updatedListOfTypedFields)); ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName)); Map newColumns = postgres.getNewColumns(rs, updatedListOfTypedFields); - if (newColumns.size() > 0) { - getLogger().info("Identified new columns to create: {}", newColumns); + if (!newColumns.isEmpty()) { + getLogger().debug("Identified new columns to create: {}", newColumns); conn.createStatement().execute(postgres.addColumns(schemaName, tableName, newColumns)); } } catch (SQLException s) { - getLogger().error(s.toString(), s); + getLogger().error("Error when preparing schema: {}", s.toString(), s); } stmt.addBatch(); }, onFlowFileError(context, session, result))) { @@ -255,7 +255,7 @@ public class NgsiLdToPostgreSQL extends AbstractSessionFactoryProcessor { enclosure.addFlowFile(flowFile); } } catch (Exception e) { - getLogger().error(e.toString(), e); + getLogger().error("Unexpected exception processing flow file: {}", e.toString(), e); } } }; diff --git a/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/PostgreSQLBackend.java b/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/PostgreSQLBackend.java index 34d5c6c..ecfb4dd 100755 --- a/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/PostgreSQLBackend.java +++ b/nifi-ngsild-postgresql-processors/src/main/java/egm/io/nifi/processors/ngsild/PostgreSQLBackend.java @@ -401,7 +401,11 @@ public String insertQuery( List valuesForInsert = this.getValuesForInsert(entity, listOfFields, creationTime, datasetIdPrefixToTruncate, exportSysAttrs, ignoreEmptyObservedAt, flattenObservations); - return "insert into " + schemaName + "." + tableName + " " + this.getFieldsForInsert(listOfFields.keySet()) + " values " + String.join(",", valuesForInsert) + ";"; + if (valuesForInsert.isEmpty()) { + logger.warn("Unable to get values to insert for {}, returning fake statement", entity.entityId); + return "select 1;"; + } else + return "insert into " + schemaName + "." + tableName + " " + this.getFieldsForInsert(listOfFields.keySet()) + " values " + String.join(",", valuesForInsert) + ";"; } public String checkColumnNames(String tableName) {