Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kie-issues#1669: jBPM Quarkus DevUI seems to not update the # of items #2814

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,21 @@ public CardPageBuildItem pages(
.metadata("page", "Processes")
.title("Process Instances")
.icon("font-awesome-solid:diagram-project")
.dynamicLabelJsonRPCMethodName("queryProcessInstancesCount"));
.streamingLabelJsonRPCMethodName("queryProcessInstancesCount"));

cardPageBuildItem.addPage(Page.webComponentPageBuilder()
.componentLink("qwc-jbpm-quarkus-devui.js")
.metadata("page", "Tasks")
.title("Tasks")
.icon("font-awesome-solid:bars-progress")
.dynamicLabelJsonRPCMethodName("queryTasksCount"));
.streamingLabelJsonRPCMethodName("queryTasksCount"));

cardPageBuildItem.addPage(Page.webComponentPageBuilder()
.componentLink("qwc-jbpm-quarkus-devui.js")
.metadata("page", "Jobs")
.title("Jobs")
.icon("font-awesome-solid:clock")
.dynamicLabelJsonRPCMethodName("queryJobsCount"));
.streamingLabelJsonRPCMethodName("queryJobsCount"));

cardPageBuildItem.addPage(Page.webComponentPageBuilder()
.componentLink("qwc-jbpm-quarkus-devui.js")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@
<artifactId>vertx-web-client</artifactId>
</dependency>

<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-api</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jbpm.quarkus.devui.runtime.rpc;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataIndexCounter {
private static final Logger LOGGER = LoggerFactory.getLogger(DataIndexCounter.class);
private final Vertx vertx;
private final Multi<String> multi;
private final WebClient dataIndexWebClient;

private String query;
private String field;
private String count = "-";
private MultiEmitter<? super String> emitter;
private long vertxTimer;

public DataIndexCounter(String query, String graphField, WebClient dataIndexWebClient) {
if (dataIndexWebClient == null) {
throw new IllegalArgumentException("dataIndexWebClient is null");
}
this.query = query;
this.field = graphField;
this.dataIndexWebClient = dataIndexWebClient;
this.vertx = Vertx.vertx();

this.multi = Multi.createFrom().emitter(emitter -> {
this.emitter = emitter;
vertxTimer = vertx.setPeriodic(500, id -> {
this.emit();
});
this.emit();
});
refreshCount();
}

public void refresh() {
vertx.setTimer(500, id -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timer looks like to short, in slow laptops (like main) this may not work, but I think we can go with it for now.

refreshCount();
});
}

public void stop() {
vertx.cancelTimer(vertxTimer);
}

private void emit() {
emitter.emit(count);
}

private void refreshCount() {
LOGGER.info("Refreshing data for query: {}", query);

doQuery(query, field).toCompletionStage()
.thenAccept(result -> {
this.count = result;
this.emit();
});
}

private Future<String> doQuery(String query, String graphModelName) {
return this.dataIndexWebClient.post("/graphql")
.putHeader("content-type", "application/json")
.sendJson(new JsonObject(query))
.map(response -> {
if (response.statusCode() == 200) {
JsonObject responseData = response.bodyAsJsonObject().getJsonObject("data");
return String.valueOf(responseData.getJsonArray(graphModelName).size());
}
return "-";
});
}

public Multi<String> getMulti() {
return multi;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.jbpm.quarkus.devui.runtime.rpc;

import jakarta.enterprise.context.ApplicationScoped;

import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.quarkus.arc.profile.IfBuildProfile;

import java.util.Collection;
import java.util.Objects;

@ApplicationScoped
@IfBuildProfile("dev")
public class JBPMDevUIEventPublisher implements EventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(JBPMDevUIEventPublisher.class);
private Runnable onProcessEvent;
private Runnable onTaskEvent;
private Runnable onJobEvent;

@Override
public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceStateDataEvent":
maybeRun(onProcessEvent);
break;
case "UserTaskInstanceStateDataEvent":
maybeRun(onTaskEvent);
break;
case "JobEvent":
maybeRun(onJobEvent);
break;
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can get rid of the default

LOGGER.debug("Unknown type of event '{}', ignoring for this publisher", event.getType());
}
}

@Override
public void publish(Collection<DataEvent<?>> events) {
events.forEach(this::publish);
}

private void maybeRun(Runnable runnable) {
if (Objects.nonNull(runnable)) {
runnable.run();
}
}

public void setOnProcessEventListener(Runnable onProcessEvent) {
this.onProcessEvent = onProcessEvent;
}

public void setOnTaskEventListener(Runnable onTaskEvent) {
this.onTaskEvent = onTaskEvent;
}

public void setOnJobEventListener(Runnable onJobEvent) {
this.onJobEvent = onJobEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Optional;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;

import org.eclipse.microprofile.config.ConfigProvider;
import org.jbpm.quarkus.devui.runtime.forms.FormsStorage;

Expand All @@ -54,11 +56,16 @@ public class JBPMDevuiJsonRPCService {
private WebClient dataIndexWebClient;

private final Vertx vertx;
private final JBPMDevUIEventPublisher eventPublisher;
private final FormsStorage formsStorage;
private DataIndexCounter processesCounter;
private DataIndexCounter tasksCounter;
private DataIndexCounter jobsCounter;

@Inject
public JBPMDevuiJsonRPCService(Vertx vertx, FormsStorage formsStorage) {
public JBPMDevuiJsonRPCService(Vertx vertx, JBPMDevUIEventPublisher eventPublisher, FormsStorage formsStorage) {
this.vertx = vertx;
this.eventPublisher = eventPublisher;
this.formsStorage = formsStorage;
}

Expand All @@ -71,6 +78,14 @@ public void init() {
private void initDataIndexWebClient(String dataIndexURL) {
try {
this.dataIndexWebClient = WebClient.create(vertx, buildWebClientOptions(dataIndexURL));
this.processesCounter = new DataIndexCounter(ALL_PROCESS_INSTANCES_IDS_QUERY, PROCESS_INSTANCES,
dataIndexWebClient);
this.tasksCounter = new DataIndexCounter(ALL_TASKS_IDS_QUERY, USER_TASKS, dataIndexWebClient);
this.jobsCounter = new DataIndexCounter(ALL_JOBS_IDS_QUERY, JOBS, dataIndexWebClient);

this.eventPublisher.setOnProcessEventListener(processesCounter::refresh);
this.eventPublisher.setOnTaskEventListener(tasksCounter::refresh);
this.eventPublisher.setOnJobEventListener(jobsCounter::refresh);
} catch (Exception ex) {
LOGGER.warn("Cannot configure dataIndexWebClient with 'kogito.data-index.url'='{}':", dataIndexURL, ex);
}
Expand All @@ -84,36 +99,26 @@ protected WebClientOptions buildWebClientOptions(String dataIndexURL) throws Mal
.setSsl(url.getProtocol().compareToIgnoreCase("https") == 0);
}

public Uni<String> queryProcessInstancesCount() {
return doQuery(ALL_PROCESS_INSTANCES_IDS_QUERY, PROCESS_INSTANCES);
}

public Uni<String> queryTasksCount() {
return doQuery(ALL_TASKS_IDS_QUERY, USER_TASKS);
public Multi<String> queryProcessInstancesCount() {
return Multi.createFrom().deferred(() -> processesCounter.getMulti());
}

public Uni<String> queryJobsCount() {
return doQuery(ALL_JOBS_IDS_QUERY, JOBS);
public Multi<String> queryTasksCount() {
return Multi.createFrom().deferred(() -> tasksCounter.getMulti());
}

private Uni<String> doQuery(String query, String graphModelName) {
if(dataIndexWebClient == null) {
LOGGER.warn("Cannot perform '{}' query, dataIndexWebClient couldn't be set. Is DataIndex correctly? Please verify '{}' value", graphModelName, DATA_INDEX_URL);
return Uni.createFrom().item("-");
}
return Uni.createFrom().completionStage(this.dataIndexWebClient.post("/graphql")
.putHeader("content-type", "application/json")
.sendJson(new JsonObject(query))
.map(response -> {
if(response.statusCode() == 200) {
JsonObject responseData = response.bodyAsJsonObject().getJsonObject("data");
return String.valueOf(responseData.getJsonArray(graphModelName).size());
}
return "-";
}).toCompletionStage());
public Multi<String> queryJobsCount() {
return Multi.createFrom().deferred(() -> jobsCounter.getMulti());
}

public Uni<String> getFormsCount() {
return Uni.createFrom().item(String.valueOf(this.formsStorage.getFormsCount()));
}
}

@PreDestroy
public void destroy() {
processesCounter.stop();
tasksCounter.stop();
jobsCounter.stop();
}
}
Loading