diff --git a/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java b/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java index 208a287c4..e6a4ae7bf 100644 --- a/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java @@ -27,7 +27,6 @@ import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter; import com.hivemq.api.AbstractApi; import com.hivemq.api.adapters.AdapterConfigModel; -import com.hivemq.api.format.DataUrl; import com.hivemq.api.errors.AlreadyExistsError; import com.hivemq.api.errors.BadRequestError; import com.hivemq.api.errors.InternalServerError; @@ -39,6 +38,7 @@ import com.hivemq.api.errors.adapters.AdapterTypeNotFoundError; import com.hivemq.api.errors.adapters.AdapterTypeReadOnlyError; import com.hivemq.api.errors.adapters.DomainTagNotFoundError; +import com.hivemq.api.format.DataUrl; import com.hivemq.api.json.CustomConfigSchemaGenerator; import com.hivemq.api.model.ApiConstants; import com.hivemq.api.model.ApiErrorMessages; @@ -190,7 +190,8 @@ public ProtocolAdaptersResourceImpl( final Optional protocolAdapterType = protocolAdapterManager.getAdapterTypeById(adapterType); if (protocolAdapterType.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", adapterType))); + return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", + adapterType))); } final List adapters = protocolAdapterManager.getProtocolAdapters() .values() @@ -206,7 +207,8 @@ public ProtocolAdaptersResourceImpl( public @NotNull Response getAdapter(final @NotNull String adapterId) { final Optional instance = protocolAdapterManager.getAdapterById(adapterId); if (instance.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } return Response.ok(convertToAdapter(instance.get())).build(); } @@ -234,7 +236,8 @@ public ProtocolAdaptersResourceImpl( final Optional instance = protocolAdapterManager.getAdapterById(adapterId); if (instance.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } final ProtocolAdapterWrapper adapterInstance = instance.get(); @@ -283,7 +286,8 @@ public int getDepth() { final Optional protocolAdapterType = protocolAdapterManager.getAdapterTypeById(adapterType); if (protocolAdapterType.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", adapterType))); + return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", + adapterType))); } final ApiErrorMessages errorMessages = ApiErrorUtils.createErrorContainer(); final Optional instance = protocolAdapterManager.getAdapterById(adapter.getId()); @@ -316,7 +320,8 @@ public int getDepth() { public @NotNull Response updateAdapter(final @NotNull String adapterId, final @NotNull Adapter adapter) { final Optional instance = protocolAdapterManager.getAdapterById(adapterId); if (instance.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } final ApiErrorMessages errorMessages = ApiErrorUtils.createErrorContainer(); validateAdapterSchema(errorMessages, adapter); @@ -341,7 +346,8 @@ public int getDepth() { public @NotNull Response deleteAdapter(final @NotNull String adapterId) { final Optional instance = protocolAdapterManager.getAdapterById(adapterId); if (instance.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } if (logger.isDebugEnabled()) { logger.debug("Deleting adapter \"{}\".", adapterId); @@ -359,7 +365,8 @@ public int getDepth() { ApiErrorUtils.validateRequiredFieldRegex(errorMessages, "id", adapterId, HiveMQEdgeConstants.ID_REGEX); ApiErrorUtils.validateRequiredEntity(errorMessages, "command", command); if (protocolAdapterManager.getAdapterById(adapterId).isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } if (ApiErrorUtils.hasRequestErrors(errorMessages)) { return ErrorResponseUtil.errorResponse(new AdapterFailedSchemaValidationError(errorMessages.toErrorList())); @@ -391,7 +398,8 @@ public int getDepth() { return ErrorResponseUtil.errorResponse(new AdapterFailedSchemaValidationError(errorMessages.toErrorList())); } if (protocolAdapterManager.getAdapterById(adapterId).isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } return Response.ok(getStatusInternal(adapterId)).build(); } @@ -442,15 +450,18 @@ protected void validateAdapterSchema( @Override public @NotNull Response getDomainTagsForAdapter(final @NotNull String adapterId) { - return protocolAdapterManager.getTagsForAdapter(adapterId).map(tags -> { - if (tags.isEmpty()) { - return Response.ok(new DomainTagModelList(List.of())).build(); - } else { - final List domainTagModels = - tags.stream().map(DomainTagModel::fromDomainTag).collect(Collectors.toList()); - return Response.ok(new DomainTagModelList(domainTagModels)).build(); - } - }).orElse(ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId)))); + return protocolAdapterManager.getTagsForAdapter(adapterId) + .map(tags -> { + if (tags.isEmpty()) { + return Response.ok(new DomainTagModelList(List.of())).build(); + } else { + final List domainTagModels = + tags.stream().map(DomainTagModel::fromDomainTag).collect(Collectors.toList()); + return Response.ok(new DomainTagModelList(domainTagModels)).build(); + } + }) + .orElse(ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId)))); } @Override @@ -468,7 +479,8 @@ protected void validateAdapterSchema( "' cannot be created since another item already exists with the same id.")); case ADAPTER_MISSING: log.warn("Tags could not be added for adapter '{}' because the adapter was not found.", adapterId); - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); default: log.error("Unhandled PUT-status: {}", domainTagAddResult.getDomainTagPutStatus()); return ErrorResponseUtil.errorResponse(new InternalServerError(null)); @@ -529,7 +541,7 @@ protected void validateAdapterSchema( case ALREADY_USED_BY_ANOTHER_ADAPTER: //noinspection DataFlowIssue cant be null here. final @NotNull String tagName = domainTagUpdateResult.getErrorMessage(); - return ErrorResponseUtil.errorResponse(new DomainTagNotFoundError("The tag '" + + return ErrorResponseUtil.errorResponse(new AlreadyExistsError("The tag '" + tagName + "' cannot be created since another item already exists with the same id.")); case INTERNAL_ERROR: @@ -563,13 +575,14 @@ protected void validateAdapterSchema( public @NotNull Response getTagSchema(final @NotNull String protocolId) { return protocolAdapterManager.getAdapterTypeById(protocolId) .map(info -> Response.ok(new TagSchema(protocolId, - customConfigSchemaGenerator.generateJsonSchema(info.tagConfigurationClass()))) - .build()) + customConfigSchemaGenerator.generateJsonSchema(info.tagConfigurationClass()))).build()) .orElseGet(() -> { log.warn( "Json Schema for tags for protocols of type '{}' could not be generated because the protocol id is unknown ton this edge instance.", protocolId); - return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", protocolId))); + return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format( + "Adapter not found '%s'", + protocolId))); }); } @@ -581,7 +594,8 @@ protected void validateAdapterSchema( protocolAdapterManager.getAdapterById(adapterId); if (optionalProtocolAdapterWrapper.isEmpty()) { log.warn("The Json Schema for an adapter '{}' was requested, but the adapter does not exist.", adapterId); - return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId))); + return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", + adapterId))); } final com.hivemq.adapter.sdk.api.ProtocolAdapter adapter = optionalProtocolAdapterWrapper.get().getAdapter(); @@ -589,7 +603,9 @@ protected void validateAdapterSchema( if (!(adapter instanceof WritingProtocolAdapter)) { log.warn("The Json Schema for an adapter '{}' was requested, which does not support writing to PLCs.", adapterId); - return ErrorResponseUtil.errorResponse(new AdapterTypeReadOnlyError("The adapter with id '" + adapterId + "' exists, but it does not support writing to PLCs.")); + return ErrorResponseUtil.errorResponse(new AdapterTypeReadOnlyError("The adapter with id '" + + adapterId + + "' exists, but it does not support writing to PLCs.")); } final TagSchemaCreationOutputImpl tagSchemaCreationOutput = new TagSchemaCreationOutputImpl(); @@ -603,14 +619,21 @@ protected void validateAdapterSchema( log.debug("Original exception: ", e); return ErrorResponseUtil.errorResponse(new InternalServerError(null)); } catch (final ExecutionException e) { - if (e.getCause() instanceof UnsupportedOperationException) { - return ErrorResponseUtil.errorResponse(new AdapterOperationNotSupportedError(String.format("Operation not supported '%s'", e.getCause().getMessage()))); - } else if (e.getCause() instanceof IllegalStateException) { - return ErrorResponseUtil.errorResponse(new AdapterOperationNotSupportedError(String.format("Adapter not started '%s'", e.getCause().getMessage()))); - } else { - log.warn("Exception was raised during creation of json schema for writing to PLCs."); - log.debug("Original exception: ", e); - return ErrorResponseUtil.errorResponse(new InternalServerError(null)); + switch (tagSchemaCreationOutput.getStatus()) { + case NOT_SUPPORTED: + return ErrorResponseUtil.errorResponse(new AdapterOperationNotSupportedError(String.format( + "Operation not supported '%s'", + e.getCause().getMessage()))); + case ADAPTER_NOT_STARTED: + return ErrorResponseUtil.errorResponse(new AdapterOperationNotSupportedError(String.format( + "Adapter not started '%s'", + e.getCause().getMessage()))); + case TAG_NOT_FOUND: + return ErrorResponseUtil.errorResponse(new DomainTagNotFoundError(tagName)); + default: + log.warn("Exception was raised during creation of json schema for writing to PLCs."); + log.debug("Original exception: ", e); + return ErrorResponseUtil.errorResponse(new InternalServerError(null)); } } } @@ -624,7 +647,8 @@ protected void validateAdapterSchema( final Optional protocolAdapterInformation = protocolAdapterManager.getAdapterTypeById(adapterType); if (protocolAdapterInformation.isEmpty()) { - return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", adapterType))); + return ErrorResponseUtil.errorResponse(new AdapterTypeNotFoundError(String.format("Adapter not found '%s'", + adapterType))); } final ApiErrorMessages errorMessages = ApiErrorUtils.createErrorContainer(); final String adapterId = adapter.getAdapter().getId(); @@ -693,7 +717,9 @@ protected void validateAdapterSchema( .collect(Collectors.toList())) .map(NorthboundMappingListModel::new) .map(mappingsList -> Response.ok(mappingsList).build()) - .orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId)))); + .orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format( + "Adapter not found '%s'", + adapterId)))); } @@ -721,34 +747,41 @@ protected void validateAdapterSchema( @Override public Response updateNorthboundMappingsForAdapter( final @NotNull String adapterId, final @NotNull NorthboundMappingListModel northboundMappingListModel) { - return protocolAdapterManager.getAdapterById(adapterId).map(adapter -> { - final Set requiredTags = new HashSet<>(); - final List converted = northboundMappingListModel.getItems().stream().map(mapping -> { - requiredTags.add(mapping.getTagName()); - return mapping.to(); - }).collect(Collectors.toList()); - adapter.getTags().forEach(tag -> requiredTags.remove(tag.getName())); - - // TODO for now simulation does not need tags - if (adapter.getProtocolAdapterInformation().getProtocolId().equals("simulation")) { - requiredTags.clear(); - } + return protocolAdapterManager.getAdapterById(adapterId) + .map(adapter -> { + final Set requiredTags = new HashSet<>(); + final List converted = + northboundMappingListModel.getItems().stream().map(mapping -> { + requiredTags.add(mapping.getTagName()); + return mapping.to(); + }).collect(Collectors.toList()); + adapter.getTags().forEach(tag -> requiredTags.remove(tag.getName())); + + // TODO for now simulation does not need tags + if (adapter.getProtocolAdapterInformation().getProtocolId().equals("simulation")) { + requiredTags.clear(); + } - if (requiredTags.isEmpty()) { - if (protocolAdapterManager.updateAdapterFromMappings(adapterId, converted)) { - log.info("Successfully updated northbound mappings for adapter '{}'.", adapterId); - return Response.ok(northboundMappingListModel).build(); - } else { - log.error("Something went wrong updating the adapter {}", adapterId); - return ErrorResponseUtil.errorResponse(new InternalServerError(null)); - } - } else { - log.error("The following tags were missing for updating the northbound mappings for adapter {}: {}", - adapterId, - requiredTags); - return ErrorResponseUtil.errorResponse(new BadRequestError("Tags were missing for updating the northbound mappings" + requiredTags)); - } - }).orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId)))); + if (requiredTags.isEmpty()) { + if (protocolAdapterManager.updateAdapterFromMappings(adapterId, converted)) { + log.info("Successfully updated northbound mappings for adapter '{}'.", adapterId); + return Response.ok(northboundMappingListModel).build(); + } else { + log.error("Something went wrong updating the adapter {}", adapterId); + return ErrorResponseUtil.errorResponse(new InternalServerError(null)); + } + } else { + log.error( + "The following tags were missing for updating the northbound mappings for adapter {}: {}", + adapterId, + requiredTags); + return ErrorResponseUtil.errorResponse(new BadRequestError( + "Tags were missing for updating the northbound mappings" + requiredTags)); + } + }) + .orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format( + "Adapter not found '%s'", + adapterId)))); } @Override @@ -760,35 +793,44 @@ public Response updateNorthboundMappingsForAdapter( .collect(Collectors.toList())) .map(SouthboundMappingListModel::new) .map(mappingsList -> Response.ok(mappingsList).build()) - .orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId)))); + .orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format( + "Adapter not found '%s'", + adapterId)))); } @Override public @NotNull Response updateSouthboundMappingsForAdapter( final @NotNull String adapterId, final @NotNull SouthboundMappingListModel southboundMappingListModel) { - return protocolAdapterManager.getAdapterById(adapterId).map(adapter -> { - final Set requiredTags = new HashSet<>(); - final List converted = southboundMappingListModel.getItems().stream().map(mapping -> { - requiredTags.add(mapping.getTagName()); - return parseAndEnrichWithSchema(mapping); - }).collect(Collectors.toList()); - adapter.getTags().forEach(tag -> requiredTags.remove(tag.getName())); - - if (requiredTags.isEmpty()) { - if (protocolAdapterManager.updateAdapterToMappings(adapterId, converted)) { - log.info("Successfully updated fromMappings for adapter '{}'.", adapterId); - return Response.ok(southboundMappingListModel).build(); - } else { - log.error("Something went wrong updating the adapter {}", adapterId); - return ErrorResponseUtil.errorResponse(new InternalServerError(null)); - } - } else { - log.error("The following tags were missing for updating the southbound mappings for adapter {}: {}", - adapterId, - requiredTags); - return ErrorResponseUtil.errorResponse(new BadRequestError("Tags were missing for updating the southbound mappings" + requiredTags)); - } - }).orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'", adapterId)))); + return protocolAdapterManager.getAdapterById(adapterId) + .map(adapter -> { + final Set requiredTags = new HashSet<>(); + final List converted = + southboundMappingListModel.getItems().stream().map(mapping -> { + requiredTags.add(mapping.getTagName()); + return parseAndEnrichWithSchema(mapping); + }).collect(Collectors.toList()); + adapter.getTags().forEach(tag -> requiredTags.remove(tag.getName())); + + if (requiredTags.isEmpty()) { + if (protocolAdapterManager.updateAdapterToMappings(adapterId, converted)) { + log.info("Successfully updated fromMappings for adapter '{}'.", adapterId); + return Response.ok(southboundMappingListModel).build(); + } else { + log.error("Something went wrong updating the adapter {}", adapterId); + return ErrorResponseUtil.errorResponse(new InternalServerError(null)); + } + } else { + log.error( + "The following tags were missing for updating the southbound mappings for adapter {}: {}", + adapterId, + requiredTags); + return ErrorResponseUtil.errorResponse(new BadRequestError( + "Tags were missing for updating the southbound mappings" + requiredTags)); + } + }) + .orElseGet(() -> ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format( + "Adapter not found '%s'", + adapterId)))); } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientWrapper.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientWrapper.java index 8a2c00627..02f5ff898 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientWrapper.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientWrapper.java @@ -62,16 +62,16 @@ public class OpcUaClientWrapper { private final String adapterId; public final @NotNull OpcUaClient client; - public final @NotNull Optional jsonToOpcUAConverter; - public final @NotNull Optional jsonSchemaGenerator; + public final @NotNull JsonToOpcUAConverter jsonToOpcUAConverter; + public final @NotNull JsonSchemaGenerator jsonSchemaGenerator; public final @NotNull OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle; public OpcUaClientWrapper( final @NotNull String adapterId, final @NotNull OpcUaClient client, final @NotNull OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle, - final @NotNull Optional jsonToOpcUAConverter, - final @NotNull Optional jsonSchemaGenerator) { + final @NotNull JsonToOpcUAConverter jsonToOpcUAConverter, + final @NotNull JsonSchemaGenerator jsonSchemaGenerator) { this.adapterId = adapterId; this.client = client; this.jsonToOpcUAConverter = jsonToOpcUAConverter; @@ -88,14 +88,7 @@ public void createMqttPayloadJsonSchema( final @NotNull OpcuaTag tag, final @NotNull TagSchemaCreationOutput output) { final String nodeId = tag.getDefinition().getNode(); - jsonSchemaGenerator.ifPresentOrElse(gen -> gen.createJsonSchema(NodeId.parse(nodeId)) - .whenComplete((jsonNode, throwable) -> { - if (throwable != null) { - output.fail(throwable, null); - } else { - output.finish(jsonNode); - } - }), () -> output.fail(new IllegalArgumentException("Missing JSON Schema generator"), null)); + jsonSchemaGenerator.createJsonSchema(NodeId.parse(nodeId), output); } public void discoverValues( @@ -145,23 +138,19 @@ public void write( final NodeId nodeId = NodeId.parse(opcuaTag.getDefinition().getNode()); try { - jsonToOpcUAConverter.map(conv -> conv.convertToOpcUAValue(opcUAWritePayload.getValue(), nodeId)) - .ifPresentOrElse(opcUaObject -> { - final Variant variant = new Variant(opcUaObject); - final DataValue dataValue = new DataValue(variant, null, null); - final CompletableFuture writeFuture = client.writeValue(nodeId, dataValue); - writeFuture.whenComplete((statusCode, throwable) -> { - if (throwable != null) { - log.error("Exception while writing to opcua node '{}'", - writeContext.getTagName(), - throwable); - writingOutput.fail(throwable, null); - } else { - log.info("Wrote '{}' to nodeId={}", variant, nodeId); - writingOutput.finish(); - } - }); - }, () -> writingOutput.fail("JsonToOpcUaConverter not available")); + final Object opcuaObject = jsonToOpcUAConverter.convertToOpcUAValue(opcUAWritePayload.getValue(), nodeId); + final Variant variant = new Variant(opcuaObject); + final DataValue dataValue = new DataValue(variant, null, null); + final CompletableFuture writeFuture = client.writeValue(nodeId, dataValue); + writeFuture.whenComplete((statusCode, throwable) -> { + if (throwable != null) { + log.error("Exception while writing to opcua node '{}'", writeContext.getTagName(), throwable); + writingOutput.fail(throwable, null); + } else { + log.info("Wrote '{}' to nodeId={}", variant, nodeId); + writingOutput.finish(); + } + }); } catch (final Exception e) { writingOutput.fail(e, null); } @@ -281,8 +270,7 @@ public void onSessionActive(final @NotNull UaSession session) { }); return opcUaClient.connect().thenCompose(uaClient -> { - final OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle = new OpcUaSubscriptionLifecycle( - opcUaClient, + final OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle = new OpcUaSubscriptionLifecycle(opcUaClient, adapterId, protocolId, protocolAdapterMetricsService, @@ -294,17 +282,16 @@ public void onSessionActive(final @NotNull UaSession session) { opcUaClient.getSubscriptionManager().addSubscriptionListener(opcUaSubscriptionLifecycle); try { - final Optional jsonToOpcUAConverterOpt = - Optional.of(new JsonToOpcUAConverter(opcUaClient)); - final Optional jsonSchemaGeneratorOpt = - Optional.of(new JsonSchemaGenerator(opcUaClient, new ObjectMapper())); + final JsonToOpcUAConverter jsonToOpcUAConverter = new JsonToOpcUAConverter(opcUaClient); + final JsonSchemaGenerator jsonSchemaGenerator = + new JsonSchemaGenerator(opcUaClient, new ObjectMapper()); if (adapterConfig.getOpcuaToMqttConfig() != null) { return opcUaSubscriptionLifecycle.subscribeAll(northboundsMappings) .thenApply(ignored -> new OpcUaClientWrapper(adapterId, opcUaClient, opcUaSubscriptionLifecycle, - jsonToOpcUAConverterOpt, - jsonSchemaGeneratorOpt)); + jsonToOpcUAConverter, + jsonSchemaGenerator)); } else { return CompletableFuture.completedFuture(null); } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index d18e39f22..5e9146d70 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -88,17 +88,16 @@ public void start( synchronized (this) { if (opcUaClientWrapper == null) { try { - OpcUaClientWrapper.createAndConnect( - adapterId, + OpcUaClientWrapper.createAndConnect(adapterId, adapterConfig, - tags, northboundMappings, + tags, + northboundMappings, protocolAdapterState, moduleServices.eventService(), moduleServices.adapterPublishService(), adapterInformation.getProtocolId(), protocolAdapterMetricsService, - output) - .thenApply(wrapper -> { + output).thenApply(wrapper -> { protocolAdapterState.setConnectionStatus(CONNECTED); output.startedSuccessfully(); opcUaClientWrapper = wrapper; @@ -188,8 +187,14 @@ public void createTagSchema( .findFirst() .ifPresentOrElse(def -> opcUaClientWrapperTemp.createMqttPayloadJsonSchema((OpcuaTag) def, output), () -> { - //FIXME needs logging - output.adapterNotStarted(); + log.warn( + "The tag '{}' was not found during creation of schema for tags on remote plc. Available tags: {}", + input.getTagName(), + tags); + output.tagNotFound(String.format( + "The tag '%s' was not found during creation of schema for tags on remote plc. Available tags: '%s'", + input.getTagName(), + tags)); }); } else { output.adapterNotStarted(); diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java index 335b50387..17844331f 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java @@ -69,7 +69,7 @@ public class OpcUaSpecificAdapterConfig implements ProtocolSpecificAdapterConfig @JsonProperty(value = "opcuaToMqtt") @ModuleConfigField(title = "OPC UA To MQTT Config", description = "The configuration for a data stream from OPC UA to MQTT") - private final @Nullable OpcUaToMqttConfig opcuaToMqttConfig; + private final @NotNull OpcUaToMqttConfig opcuaToMqttConfig; @JsonCreator public OpcUaSpecificAdapterConfig( @@ -85,7 +85,6 @@ public OpcUaSpecificAdapterConfig( this.tls = requireNonNullElse(tls, new Tls(false, null, null)); this.opcuaToMqttConfig = Objects.requireNonNullElseGet(opcuaToMqttConfig, () -> new OpcUaToMqttConfig(null, null)); - this.security = requireNonNullElse(security, new Security(SecPolicy.DEFAULT)); } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java index 31130a1c5..b40426432 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java @@ -41,4 +41,9 @@ public boolean equals(final @Nullable Object o) { public int hashCode() { return node.hashCode(); } + + @Override + public @NotNull String toString() { + return "OpcuaTagDefinition{" + "node='" + node + '\'' + '}'; + } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/mqtt2opcua/JsonSchemaGenerator.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/mqtt2opcua/JsonSchemaGenerator.java index 34570a746..90d5a3523 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/mqtt2opcua/JsonSchemaGenerator.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/mqtt2opcua/JsonSchemaGenerator.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; +import com.hivemq.adapter.sdk.api.schema.TagSchemaCreationOutput; import org.eclipse.milo.opcua.binaryschema.AbstractCodec; import org.eclipse.milo.opcua.sdk.client.DataTypeTreeBuilder; import org.eclipse.milo.opcua.sdk.client.OpcUaClient; @@ -56,26 +57,31 @@ public JsonSchemaGenerator(final @NotNull OpcUaClient client, final @NotNull Obj this.builtinJsonSchema = new BuiltinJsonSchema(); } - public @NotNull CompletableFuture<@NotNull JsonNode> createJsonSchema(final @NotNull NodeId destinationNodeId) { + public void createJsonSchema( + final @NotNull NodeId destinationNodeId, final @NotNull TagSchemaCreationOutput output) { final CompletableFuture variableNodeFuture = client.getAddressSpace().getVariableNodeAsync(destinationNodeId); - return variableNodeFuture.thenApply(uaVariableNode -> { + variableNodeFuture.whenComplete((uaVariableNode, throwable) -> { + if (throwable != null) { + // no node was found for the given nodeId + output.tagNotFound("No node was found for the given node id '" + destinationNodeId + "'"); + return; + } final NodeId dataTypeNodeId = uaVariableNode.getDataType(); final DataTypeTree.DataType dataType = tree.getDataType(dataTypeNodeId); if (dataType == null) { - throw new RuntimeException("No data type was found in the DataTypeTree for node id '" + - dataTypeNodeId + - "'"); + output.fail("Unable to find the data type for the given node id '" + destinationNodeId + "'."); + return; } final BuiltinDataType builtinType = tree.getBuiltinType(dataType.getNodeId()); if (builtinType != BuiltinDataType.ExtensionObject) { - return builtinJsonSchema.getJsonSchema(builtinType); + output.finish(builtinJsonSchema.getJsonSchema(builtinType)); } else { final NodeId binaryEncodingId = dataType.getBinaryEncodingId(); if (binaryEncodingId == null) { - throw new RuntimeException("No encoding was present for data type: '" + dataType + "'"); + output.fail("No encoding was present for the complex data type: '" + dataType + "'."); } - return jsonSchemaFromNodeId(binaryEncodingId); + output.finish(jsonSchemaFromNodeId(binaryEncodingId)); } }); }