From 429579092b98d46e8aaa6162283aee6e179e8ce0 Mon Sep 17 00:00:00 2001 From: bncriju Date: Mon, 9 Dec 2024 20:41:51 +0530 Subject: [PATCH] introducing Intermediate class --- .../devui/runtime/rpc/DataIndexCounter.java | 80 +++++++ .../runtime/rpc/JBPMDevUIEventPublisher.java | 118 +++++----- .../runtime/rpc/JBPMDevuiJsonRPCService.java | 219 ++++++++---------- 3 files changed, 234 insertions(+), 183 deletions(-) create mode 100644 packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java new file mode 100644 index 00000000000..912bb8fdf23 --- /dev/null +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jbpm.quarkus.devui.runtime.rpc; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.WebClient; +import jakarta.enterprise.context.ApplicationScoped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class DataIndexCounter { + private static final String DATA_INDEX_URL = "kogito.data-index.url"; + private static final Logger LOGGER = LoggerFactory.getLogger(DataIndexCounter.class); + + private Multi multi; + private WebClient dataIndexWebClient; + + public DataIndexCounter(String query, String graphname, BroadcastProcessor stream,WebClient dataIndexWebClient, JBPMDevUIEventPublisher eventPublisher) { + this.dataIndexWebClient = dataIndexWebClient; + + Vertx vertx = Vertx.vertx(); + this.multi = Multi.createFrom().emitter(emitter -> { + vertx.setTimer(1000, id -> emitter.emit("Initial data emitted")); + }); + + + vertx.setTimer(1000, id -> refreshData(stream,query,graphname)); + } + + private void refreshData(BroadcastProcessor stream, String query, String graphname ) { + LOGGER.info("Refreshing data for query: {}", query); + + doQuery(query, graphname).toCompletionStage() + .thenAccept(result -> { + stream.onNext(result); + }); + } + + private Future doQuery(String query, String graphModelName) { + if(dataIndexWebClient == null) { + LOGGER.warn("Cannot perform '{}' query, dataIndexWebClient couldn't be set. Is DataIndex correctly? Please verify '{}' value", graphModelName, DATA_INDEX_URL); + return Future.succeededFuture("-"); + } + return this.dataIndexWebClient.post("/graphql") + .putHeader("content-type", "application/json") + .sendJson(new JsonObject(query)) + .map(response -> { + if(response.statusCode() == 200) { + JsonObject responseData = response.bodyAsJsonObject().getJsonObject("data"); + return String.valueOf(responseData.getJsonArray(graphModelName).size()); + } + return "-"; + }); + } + + public Multi getMulti() { + return multi; + } +} \ No newline at end of file diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java index d451e002bc5..2166133e70c 100644 --- a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java @@ -17,60 +17,64 @@ * under the License. */ -package org.jbpm.quarkus.devui.runtime.rpc; -import jakarta.enterprise.context.ApplicationScoped; -import org.kie.kogito.event.DataEvent; -import org.kie.kogito.event.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Objects; - -@ApplicationScoped -public class JBPMDevUIEventPublisher implements EventPublisher { - - private static final Logger LOGGER = LoggerFactory.getLogger(JBPMDevUIEventPublisher.class); - private Runnable onProcessEvent; - private Runnable onTaskEvent; - private Runnable onJobEvent; - - @Override - public void publish(DataEvent event) { - switch (event.getType()) { - case "ProcessInstanceStateDataEvent": - maybeRun(onProcessEvent); - break; - case "UserTaskInstanceStateDataEvent": - maybeRun(onTaskEvent); - break; - case "JobEvent": - maybeRun(onJobEvent); - break; - default: - LOGGER.debug("Unknown type of event '{}', ignoring for this publisher", event.getType()); - } - } - - @Override - public void publish(Collection> events) { - events.forEach(this::publish); - } - - private void maybeRun(Runnable runnable) { - if(Objects.nonNull(runnable)) { - runnable.run(); - } - } - - public void setOnProcessEventListener(Runnable onProcessEvent) { - this.onProcessEvent = onProcessEvent; - } - - public void setOnTaskEventListener(Runnable onTaskEvent) { - this.onTaskEvent = onTaskEvent; - } - - public void setOnJobEventListener(Runnable onJobEvent) { - this.onJobEvent = onJobEvent; - } -} + package org.jbpm.quarkus.devui.runtime.rpc; + import jakarta.enterprise.context.ApplicationScoped; + import org.kie.kogito.event.DataEvent; + import org.kie.kogito.event.EventPublisher; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import io.quarkus.arc.profile.IfBuildProfile; + + import java.util.Collection; + import java.util.Objects; + + @ApplicationScoped + @IfBuildProfile("dev") + public class JBPMDevUIEventPublisher implements EventPublisher { + + private static final Logger LOGGER = LoggerFactory.getLogger(JBPMDevUIEventPublisher.class); + private Runnable onProcessEvent; + private Runnable onTaskEvent; + private Runnable onJobEvent; + + @Override + public void publish(DataEvent event) { + switch (event.getType()) { + case "ProcessInstanceStateDataEvent": + maybeRun(onProcessEvent); + break; + case "UserTaskInstanceStateDataEvent": + maybeRun(onTaskEvent); + break; + case "JobEvent": + maybeRun(onJobEvent); + break; + default: + LOGGER.debug("Unknown type of event '{}', ignoring for this publisher", event.getType()); + } + } + + @Override + public void publish(Collection> events) { + events.forEach(this::publish); + } + + private void maybeRun(Runnable runnable) { + if(Objects.nonNull(runnable)) { + runnable.run(); + } + } + + public void setOnProcessEventListener(Runnable onProcessEvent) { + this.onProcessEvent = onProcessEvent; + } + + public void setOnTaskEventListener(Runnable onTaskEvent) { + this.onTaskEvent = onTaskEvent; + } + + public void setOnJobEventListener(Runnable onJobEvent) { + this.onJobEvent = onJobEvent; + } + } \ No newline at end of file diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java index dcf73367b88..67d826a5214 100644 --- a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java @@ -17,137 +17,104 @@ * under the License. */ -package org.jbpm.quarkus.devui.runtime.rpc; + package org.jbpm.quarkus.devui.runtime.rpc; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Optional; - -import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.Multi; + import java.net.MalformedURLException; + import java.net.URL; + import java.util.Optional; + + import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; -import jakarta.annotation.PostConstruct; -import org.eclipse.microprofile.config.ConfigProvider; -import org.jbpm.quarkus.devui.runtime.forms.FormsStorage; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@ApplicationScoped -public class JBPMDevuiJsonRPCService { - private static final String DATA_INDEX_URL = "kogito.data-index.url"; +import io.smallrye.mutiny.Multi; + import io.vertx.core.Vertx; + import io.vertx.ext.web.client.WebClient; + import io.vertx.ext.web.client.WebClientOptions; + import jakarta.annotation.PostConstruct; + import org.eclipse.microprofile.config.ConfigProvider; + import org.jbpm.quarkus.devui.runtime.forms.FormsStorage; + + import jakarta.enterprise.context.ApplicationScoped; + import jakarta.inject.Inject; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + @ApplicationScoped + public class JBPMDevuiJsonRPCService { + private static final String DATA_INDEX_URL = "kogito.data-index.url"; private final BroadcastProcessor processesStream = BroadcastProcessor.create(); private final BroadcastProcessor userTaskStream = BroadcastProcessor.create(); private final BroadcastProcessor jobsStreams = BroadcastProcessor.create(); - - private static final Logger LOGGER = LoggerFactory.getLogger(JBPMDevuiJsonRPCService.class); - - public static final String PROCESS_INSTANCES = "ProcessInstances"; - public static final String USER_TASKS = "UserTaskInstances"; - public static final String JOBS = "Jobs"; - - public static final String ALL_TASKS_IDS_QUERY = "{ \"operationName\": \"getAllTasksIds\", \"query\": \"query getAllTasksIds{ UserTaskInstances{ id } }\" }"; - public static final String ALL_PROCESS_INSTANCES_IDS_QUERY = "{ \"operationName\": \"getAllProcessesIds\", \"query\": \"query getAllProcessesIds{ ProcessInstances{ id } }\" }"; - public static final String ALL_JOBS_IDS_QUERY = "{ \"operationName\": \"getAllJobsIds\", \"query\": \"query getAllJobsIds{ Jobs{ id } }\" }"; - - private WebClient dataIndexWebClient; - - private final Vertx vertx; - private final JBPMDevUIEventPublisher eventPublisher; - private final FormsStorage formsStorage; - - @Inject + + private static final Logger LOGGER = LoggerFactory.getLogger(JBPMDevuiJsonRPCService.class); + + public static final String PROCESS_INSTANCES = "ProcessInstances"; + public static final String USER_TASKS = "UserTaskInstances"; + public static final String JOBS = "Jobs"; + + public static final String ALL_TASKS_IDS_QUERY = "{ \"operationName\": \"getAllTasksIds\", \"query\": \"query getAllTasksIds{ UserTaskInstances{ id } }\" }"; + public static final String ALL_PROCESS_INSTANCES_IDS_QUERY = "{ \"operationName\": \"getAllProcessesIds\", \"query\": \"query getAllProcessesIds{ ProcessInstances{ id } }\" }"; + public static final String ALL_JOBS_IDS_QUERY = "{ \"operationName\": \"getAllJobsIds\", \"query\": \"query getAllJobsIds{ Jobs{ id } }\" }"; + + private WebClient dataIndexWebClient; + + private final Vertx vertx; + private final JBPMDevUIEventPublisher eventPublisher; + private final FormsStorage formsStorage; + DataIndexCounter processesCounter; + DataIndexCounter tasksCounter; + DataIndexCounter jobsCounter; + + @Inject public JBPMDevuiJsonRPCService(Vertx vertx, JBPMDevUIEventPublisher eventPublisher, FormsStorage formsStorage) { - this.vertx = vertx; - this.eventPublisher = eventPublisher; - this.formsStorage = formsStorage; - this.eventPublisher.setOnProcessEventListener(this::onProcessEvent); - this.eventPublisher.setOnTaskEventListener(this::onUserTaskEvent); - this.eventPublisher.setOnJobEventListener(this::onJobEvent); - } - - @PostConstruct - public void init() { - Optional dataIndexURL = ConfigProvider.getConfig().getOptionalValue(DATA_INDEX_URL, String.class); - dataIndexURL.ifPresent(this::initDataIndexWebClient); - this.onProcessEvent(); - this.onUserTaskEvent(); - this.onJobEvent(); - } - - private void initDataIndexWebClient(String dataIndexURL) { - try { - this.dataIndexWebClient = WebClient.create(vertx, buildWebClientOptions(dataIndexURL)); - } catch (Exception ex) { - LOGGER.warn("Cannot configure dataIndexWebClient with 'kogito.data-index.url'='{}':", dataIndexURL, ex); - } - } - - protected WebClientOptions buildWebClientOptions(String dataIndexURL) throws MalformedURLException { - URL url = new URL(dataIndexURL); - return new WebClientOptions() - .setDefaultHost(url.getHost()) - .setDefaultPort((url.getPort() != -1 ? url.getPort() : url.getDefaultPort())) - .setSsl(url.getProtocol().compareToIgnoreCase("https") == 0); - } - - public Multi queryProcessInstancesCount() { - return processesStream; - } - - public Multi queryTasksCount() { - return userTaskStream; - } - - public Multi queryJobsCount() { - return jobsStreams; - } - - private void onProcessEvent() { - doQuery(processesStream, ALL_PROCESS_INSTANCES_IDS_QUERY, PROCESS_INSTANCES); - } - private void onUserTaskEvent() { - doQuery(userTaskStream, ALL_TASKS_IDS_QUERY, USER_TASKS); - } - private void onJobEvent() { - doQuery(jobsStreams, ALL_JOBS_IDS_QUERY, JOBS); - } - - private void doQuery(BroadcastProcessor stream, String query, String graphModelName) { - LOGGER.warn("Query: " + graphModelName); - doQuery(query, graphModelName).toCompletionStage() - .thenAccept(result -> { - LOGGER.warn("Query: " + graphModelName + ". Received response: " + result); - stream.onNext(result); - }); - } - - private Future doQuery(String query, String graphModelName) { - if(dataIndexWebClient == null) { - LOGGER.warn("Cannot perform '{}' query, dataIndexWebClient couldn't be set. Is DataIndex correctly? Please verify '{}' value", graphModelName, DATA_INDEX_URL); - return Future.succeededFuture("-"); - } - return this.dataIndexWebClient.post("/graphql") - .putHeader("content-type", "application/json") - .sendJson(new JsonObject(query)) - .map(response -> { - if(response.statusCode() == 200) { - JsonObject responseData = response.bodyAsJsonObject().getJsonObject("data"); - return String.valueOf(responseData.getJsonArray(graphModelName).size()); - } - return "-"; - }); - } - + this.vertx = vertx; + this.eventPublisher = eventPublisher; + this.formsStorage = formsStorage; + + processesCounter = new DataIndexCounter(ALL_PROCESS_INSTANCES_IDS_QUERY, PROCESS_INSTANCES, processesStream, dataIndexWebClient,eventPublisher); + tasksCounter = new DataIndexCounter(ALL_TASKS_IDS_QUERY, USER_TASKS, userTaskStream, dataIndexWebClient,eventPublisher); + jobsCounter = new DataIndexCounter(ALL_JOBS_IDS_QUERY, JOBS, jobsStreams, dataIndexWebClient,eventPublisher); + + this.eventPublisher.setOnJobEventListener(this::queryProcessInstancesCount); + this.eventPublisher.setOnTaskEventListener(this::queryTasksCount); + this.eventPublisher.setOnJobEventListener(this::queryJobsCount); + } + + @PostConstruct + public void init() { + Optional dataIndexURL = ConfigProvider.getConfig().getOptionalValue(DATA_INDEX_URL, String.class); + dataIndexURL.ifPresent(this::initDataIndexWebClient); + } + + private void initDataIndexWebClient(String dataIndexURL) { + try { + this.dataIndexWebClient = WebClient.create(vertx, buildWebClientOptions(dataIndexURL)); + } catch (Exception ex) { + LOGGER.warn("Cannot configure dataIndexWebClient with 'kogito.data-index.url'='{}':", dataIndexURL, ex); + } + } + + protected WebClientOptions buildWebClientOptions(String dataIndexURL) throws MalformedURLException { + URL url = new URL(dataIndexURL); + return new WebClientOptions() + .setDefaultHost(url.getHost()) + .setDefaultPort((url.getPort() != -1 ? url.getPort() : url.getDefaultPort())) + .setSsl(url.getProtocol().compareToIgnoreCase("https") == 0); + } + + public Multi queryProcessInstancesCount() { + return processesCounter.getMulti(); + } + + public Multi queryTasksCount() { + return tasksCounter.getMulti(); + } + + public Multi queryJobsCount() { + return jobsCounter.getMulti(); + } + public Uni getFormsCount() { - return Uni.createFrom().item(String.valueOf(this.formsStorage.getFormsCount())); - } -} + return Uni.createFrom().item(String.valueOf(this.formsStorage.getFormsCount())); + } +} \ No newline at end of file