diff --git a/addons/jobs/jobs-api/pom.xml b/addons/jobs/jobs-api/pom.xml
index 90a19e98e35..8e52661f5b8 100644
--- a/addons/jobs/jobs-api/pom.xml
+++ b/addons/jobs/jobs-api/pom.xml
@@ -18,4 +18,20 @@
provided
+
+
+
+ org.jboss.jandex
+ jandex-maven-plugin
+
+
+ make-index
+
+ jandex
+
+
+
+
+
+
\ No newline at end of file
diff --git a/addons/jobs/jobs-api/resources/META-INF/beans.xml b/addons/jobs/jobs-api/resources/META-INF/beans.xml
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/addons/jobs/jobs-service/.gitignore b/addons/jobs/jobs-service/.gitignore
index bc4ac54309a..142cb5de994 100644
--- a/addons/jobs/jobs-service/.gitignore
+++ b/addons/jobs/jobs-service/.gitignore
@@ -8,6 +8,7 @@ target/
/*.ipr
/*.iws
*.iml
-*.log
+/*.log**
+
diff --git a/addons/jobs/jobs-service/pom.xml b/addons/jobs/jobs-service/pom.xml
index 80222ae130e..f50f80674ea 100644
--- a/addons/jobs/jobs-service/pom.xml
+++ b/addons/jobs/jobs-service/pom.xml
@@ -8,9 +8,6 @@
8.0.0-SNAPSHOT
jobs-service
- Jobs Service
- Jobs Service (Timers and Async Jobs)
-
org.kie.kogito
@@ -36,6 +33,12 @@
io.quarkus
quarkus-resteasy-jackson
+
+
+ io.quarkus
+ quarkus-jackson
+
+
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
@@ -45,6 +48,23 @@
quarkus-smallrye-openapi
+
+
+ io.quarkus
+ quarkus-smallrye-reactive-messaging
+
+
+
+
+ io.quarkus
+ quarkus-infinispan-client
+
+
+ org.infinispan
+ infinispan-query-dsl
+
+
+
io.quarkus
@@ -57,7 +77,32 @@
test
+
+ org.infinispan
+ infinispan-server-hotrod
+ test
+
+
+
+ org.infinispan
+ infinispan-remote-query-server
+ test
+
+
+ Jobs Service
+ Jobs Service (Timers and Async Jobs)
+
+
+
+ io.quarkus
+ quarkus-bom
+ ${version.io.quarkus}
+ pom
+ import
+
+
+
@@ -124,6 +169,18 @@
+
+ org.jboss.jandex
+ jandex-maven-plugin
+
+
+ make-index
+
+ jandex
+
+
+
+
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/HttpJobExecutor.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/HttpJobExecutor.java
index c073f32cc46..187347437ea 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/HttpJobExecutor.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/HttpJobExecutor.java
@@ -24,12 +24,20 @@
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
+import io.smallrye.reactive.messaging.annotations.Channel;
+import io.smallrye.reactive.messaging.annotations.Emitter;
+import io.smallrye.reactive.messaging.annotations.OnOverflow;
import io.vertx.axle.core.Vertx;
+import io.vertx.axle.core.buffer.Buffer;
+import io.vertx.axle.ext.web.client.HttpResponse;
import io.vertx.axle.ext.web.client.WebClient;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.Job;
+import org.kie.kogito.jobs.service.stream.AvailableStreams;
import org.kie.kogito.jobs.service.converters.HttpConverters;
import org.kie.kogito.jobs.service.model.HTTPRequestCallback;
-import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,27 +54,64 @@ public class HttpJobExecutor implements JobExecutor {
@Inject
HttpConverters httpConverters;
+ /**
+ * Publish on Stream of Job Error events
+ */
@Inject
- ReactiveJobRepository jobRepository;
+ @Channel(AvailableStreams.JOB_ERROR)
+ @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
+ Emitter jobErrorEmitter;
+
+ /**
+ * Publish on Stream of Job Success events
+ */
+ @Inject
+ @Channel(AvailableStreams.JOB_SUCCESS)
+ @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
+ Emitter jobSuccessEmitter;
@PostConstruct
void initialize() {
this.client = WebClient.create(vertx);
}
- private CompletionStage executeCallback(HTTPRequestCallback request) {
+ private CompletionStage> executeCallback(HTTPRequestCallback request) {
LOGGER.info("Executing callback {}", request);
final URL url = httpConverters.convertURL(request.getUrl());
return client.request(httpConverters.convertHttpMethod(request.getMethod()),
url.getPort(),
url.getHost(),
url.getPath())
- .send()
- .thenApplyAsync(response -> Optional
- .ofNullable(response.statusCode())
- .filter(new Integer(200)::equals)
- .map(code -> Boolean.TRUE)
- .orElse(Boolean.FALSE));
+ .send();
+ }
+
+ private String getResponseCode(HttpResponse response) {
+ return Optional.ofNullable(response.statusCode())
+ .map(String::valueOf)
+ .orElse(null);
+ }
+
+ private PublisherBuilder handleResponse(JobExecutionResponse response) {
+ LOGGER.info("handle response {}", response);
+ return ReactiveStreams.of(response)
+ .map(JobExecutionResponse::getCode)
+ .flatMap(code -> code.equals("200")
+ ? handleSuccess(response)
+ : handleError(response));
+ }
+
+ private PublisherBuilder handleError(JobExecutionResponse response) {
+ LOGGER.info("handle error {}", response);
+ return ReactiveStreams.of(response)
+ .peek(jobErrorEmitter::send)
+ .peek(r -> LOGGER.info("Error executing job {}.", r));
+ }
+
+ private PublisherBuilder handleSuccess(JobExecutionResponse response) {
+ LOGGER.info("handle success {}", response);
+ return ReactiveStreams.of(response)
+ .peek(jobSuccessEmitter::send)
+ .peek(r -> LOGGER.info("Success executing job {}.", r));
}
@Override
@@ -77,16 +122,25 @@ public CompletionStage execute(Job job) {
.method(HTTPRequestCallback.HTTPMethod.POST)
.build();
- return executeCallback(callback)
- .thenApply(result -> {
- LOGGER.info("Response of executed job {} {}", result, job);
- jobRepository.delete(job.getId());
- return job;
- })
- //handle error
- .exceptionally(ex -> {
- LOGGER.error("Error executing job " + job, ex);
+ return ReactiveStreams.fromCompletionStage(executeCallback(callback))
+ .map(response -> JobExecutionResponse.builder()
+ .message(response.statusMessage())
+ .code(getResponseCode(response))
+ .now()
+ .jobId(job.getId())
+ .build())
+ .flatMap(this::handleResponse)
+ .findFirst()
+ .run()
+ .thenApply(response -> job)
+ .exceptionally((ex) -> {
+ LOGGER.error("Generic error executing job {}", job, ex);
+ jobErrorEmitter.send(JobExecutionResponse.builder()
+ .message(ex.getMessage())
+ .now()
+ .jobId(job.getId())
+ .build());
return job;
});
}
-}
+}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
index 9fb11ac548f..38c21007263 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
@@ -17,22 +17,28 @@
package org.kie.kogito.jobs.service.json;
import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Produces;
+import javax.inject.Singleton;
import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-
import io.quarkus.jackson.ObjectMapperCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ApplicationScoped
-public class JacksonConfiguration implements ObjectMapperCustomizer {
-
-
- @Override
- public void customize(ObjectMapper objectMapper) {
- objectMapper.registerModule(new JavaTimeModule());
- objectMapper.findAndRegisterModules();
- objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
+public class JacksonConfiguration {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JacksonConfiguration.class);
+
+ @Singleton
+ @Produces
+ public ObjectMapperCustomizer customizer() {
+ return (objectMapper) -> {
+ LOGGER.info("Jackson customization initialized.");
+ objectMapper.registerModule(new JavaTimeModule());
+ objectMapper.findAndRegisterModules();
+ objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
+ };
}
-
-}
+}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/JobExecutionResponse.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/JobExecutionResponse.java
new file mode 100644
index 00000000000..f0f3dac7a5c
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/JobExecutionResponse.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.model;
+
+import java.time.ZonedDateTime;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+public class JobExecutionResponse {
+
+ private String message;
+ private String code;
+ private ZonedDateTime timestamp;
+ private String jobId;
+
+ public JobExecutionResponse() {
+ }
+
+ public JobExecutionResponse(String message, String code, ZonedDateTime timestamp, String jobId) {
+ this.message = message;
+ this.code = code;
+ this.timestamp = timestamp;
+ this.jobId = jobId;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public String getCode() {
+ return code;
+ }
+
+ public ZonedDateTime getTimestamp() {
+ return timestamp;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JobExecutionResponse)) {
+ return false;
+ }
+ JobExecutionResponse that = (JobExecutionResponse) o;
+ return Objects.equals(getMessage(), that.getMessage()) &&
+ Objects.equals(getCode(), that.getCode()) &&
+ Objects.equals(getTimestamp(), that.getTimestamp()) &&
+ Objects.equals(getJobId(), that.getJobId());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getMessage(), getCode(), getTimestamp(), getJobId());
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", JobExecutionResponse.class.getSimpleName() + "[", "]")
+ .add("message='" + message + "'")
+ .add("code='" + code + "'")
+ .add("timestamp=" + timestamp)
+ .add("jobId='" + jobId + "'")
+ .toString();
+ }
+
+ public static JobExecutionResponseBuilder builder(){
+ return new JobExecutionResponseBuilder();
+ }
+
+ public static class JobExecutionResponseBuilder {
+
+ private String message;
+ private String code;
+ private ZonedDateTime timestamp;
+ private String jobId;
+
+ public JobExecutionResponseBuilder message(String message) {
+ this.message = message;
+ return this;
+ }
+
+ public JobExecutionResponseBuilder code(String code) {
+ this.code = code;
+ return this;
+ }
+
+ public JobExecutionResponseBuilder timestamp(ZonedDateTime timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public JobExecutionResponseBuilder now() {
+ this.timestamp = ZonedDateTime.now();
+ return this;
+ }
+
+ public JobExecutionResponseBuilder jobId(String jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public JobExecutionResponse build() {
+ return new JobExecutionResponse(message, code, timestamp, jobId);
+ }
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
new file mode 100644
index 00000000000..deb09c5609e
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.model;
+
+public enum JobStatus {
+ ERROR,//final
+ EXECUTED,//final
+ SCHEDULED,//active
+ RETRY,//active
+ CANCELED//final
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/ScheduledJob.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/ScheduledJob.java
index 9cd33debbbb..e0c313327d7 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/ScheduledJob.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/model/ScheduledJob.java
@@ -16,20 +16,32 @@
package org.kie.kogito.jobs.service.model;
+import java.time.ZonedDateTime;
import java.util.Optional;
-import java.util.StringJoiner;
import org.kie.kogito.jobs.api.Job;
+import org.kie.kogito.jobs.service.utils.DateUtil;
public class ScheduledJob {
private Job job;
private String scheduledId;
private Integer retries;
+ private JobStatus status;
+ private ZonedDateTime lastUpdate;
+ private JobExecutionResponse executionResponse;
- public ScheduledJob(Job job, String scheduledId) {
+ public ScheduledJob() {
+ }
+
+ public ScheduledJob(Job job, String scheduledId, Integer retries, JobStatus status, ZonedDateTime lastUpdate,
+ JobExecutionResponse executionResponse) {
this.job = job;
this.scheduledId = scheduledId;
+ this.retries = retries;
+ this.status = status;
+ this.lastUpdate = lastUpdate;
+ this.executionResponse = executionResponse;
}
public Job getJob() {
@@ -44,12 +56,91 @@ public Integer getRetries() {
return retries;
}
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public ZonedDateTime getLastUpdate() {
+ return lastUpdate;
+ }
+
+ public JobExecutionResponse getExecutionResponse() {
+ return executionResponse;
+ }
+
+ public static ScheduledJobBuilder builder() {
+ return new ScheduledJobBuilder();
+ }
+
@Override
public String toString() {
- return new StringJoiner(", ", ScheduledJob.class.getSimpleName() + "[", "]")
- .add("job=" + Optional.ofNullable(job).map(Job::getId).orElse(null))
- .add("scheduledId='" + scheduledId + "'")
- .add("retries=" + retries)
- .toString();
+ final StringBuilder sb = new StringBuilder("ScheduledJob{");
+ sb.append("job=").append(job);
+ sb.append(", scheduledId='").append(scheduledId).append('\'');
+ sb.append(", retries=").append(retries);
+ sb.append(", status=").append(status);
+ sb.append(", lastUpdate=").append(lastUpdate);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public static class ScheduledJobBuilder {
+
+ private Job job;
+ private String scheduledId;
+ private Integer retries = 0;
+ private JobStatus status;
+ private ZonedDateTime lastUpdate;
+ private JobExecutionResponse executionResponse;
+
+ public ScheduledJobBuilder job(Job job) {
+ this.job = job;
+ return this;
+ }
+
+ public ScheduledJobBuilder scheduledId(String scheduledId) {
+ this.scheduledId = scheduledId;
+ return this;
+ }
+
+ public ScheduledJobBuilder retries(Integer retries) {
+ this.retries = retries;
+ return this;
+ }
+
+ public ScheduledJobBuilder incrementRetries() {
+ this.retries++;
+ return this;
+ }
+
+ public ScheduledJobBuilder of(ScheduledJob scheduledJob) {
+ return job(scheduledJob.getJob())
+ .scheduledId(scheduledJob.getScheduledId())
+ .retries(scheduledJob.getRetries())
+ .status(scheduledJob.getStatus());
+ }
+
+ public ScheduledJobBuilder status(JobStatus status) {
+ this.status = status;
+ return this;
+ }
+
+ public ScheduledJobBuilder lastUpdate(ZonedDateTime time) {
+ this.lastUpdate = time;
+ return this;
+ }
+
+ public ScheduledJobBuilder lastUpdate(JobExecutionResponse executionResponse) {
+ this.executionResponse = executionResponse;
+ return this;
+ }
+
+ public ScheduledJob build() {
+ return new ScheduledJob(job, scheduledId, retries, status, getLastUpdate(), executionResponse);
+ }
+
+ private ZonedDateTime getLastUpdate() {
+ return Optional.ofNullable(lastUpdate).orElseGet(() -> DateUtil.now());
+ }
}
}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/qualifier/Repository.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/qualifier/Repository.java
new file mode 100644
index 00000000000..10823544e04
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/qualifier/Repository.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.qualifier;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import javax.enterprise.util.AnnotationLiteral;
+import javax.inject.Qualifier;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Qualifier
+@Retention(RUNTIME)
+@Target({TYPE, METHOD, FIELD, PARAMETER})
+public @interface Repository {
+
+ String value();
+
+ final class Literal extends AnnotationLiteral implements Repository {
+
+ private String value;
+
+ public Literal(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String value() {
+ return value;
+ }
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java
index 39d243af21b..41b13501dc1 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java
@@ -16,11 +16,11 @@
package org.kie.kogito.jobs.service.repository;
-import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
-import org.reactivestreams.Publisher;
public interface ReactiveJobRepository {
@@ -28,9 +28,12 @@ public interface ReactiveJobRepository {
CompletionStage get(String id);
- Publisher getByTime(ZonedDateTime expirationTime);
-
CompletionStage exists(String id);
CompletionStage delete(String id);
+
+ PublisherBuilder findByStatus(JobStatus... status);
+
+ PublisherBuilder findAll();
+
}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
new file mode 100644
index 00000000000..a76caf025e5
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.impl;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import io.vertx.core.Vertx;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+
+public abstract class BaseReactiveJobRepository implements ReactiveJobRepository {
+
+ private Vertx vertx;
+
+ public BaseReactiveJobRepository(Vertx vertx) {
+ this.vertx = vertx;
+ }
+
+ public CompletionStage runAsync(Supplier function) {
+ final CompletableFuture future = new CompletableFuture<>();
+ vertx.runOnContext((v) -> future.complete(function.get()));
+ return future;
+ }
+
+ @Override
+ public PublisherBuilder findByStatus(JobStatus... status) {
+ return findAll()
+ .filter(job -> Objects.nonNull(job.getStatus()))
+ .filter(job -> Arrays.stream(status).anyMatch(job.getStatus()::equals));
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
index 7704729d7ad..e97ff2c60f3 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
@@ -16,41 +16,39 @@
package org.kie.kogito.jobs.service.repository.impl;
-import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import io.vertx.core.Vertx;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.qualifier.Repository;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.reactivestreams.Publisher;
@ApplicationScoped
-public class InMemoryJobRepository implements ReactiveJobRepository {
+@Repository("in-memory")
+public class InMemoryJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository {
private final Map jobMap = new ConcurrentHashMap<>();
- private final Map> jobTimeMap = new ConcurrentHashMap<>();
+
+ public InMemoryJobRepository() {
+ super(null);
+ }
@Inject
- Vertx vertx;
+ public InMemoryJobRepository(Vertx vertx) {
+ super(vertx);
+ }
@Override
public CompletionStage save(ScheduledJob job) {
return runAsync(() -> {
jobMap.put(job.getJob().getId(), job);
- jobTimeMap.putIfAbsent(getExpirationTime(job), new ArrayList<>());
- jobTimeMap.get(getExpirationTime(job)).add(job.getJob().getId());
return job;
});
}
@@ -67,28 +65,11 @@ public CompletionStage exists(String key) {
@Override
public CompletionStage delete(String key) {
- return runAsync(() -> {
- Optional.ofNullable(jobTimeMap.get(getExpirationTime(jobMap.get(key))))
- .map(jobs-> jobs.remove(key));
- return jobMap.remove(key);
- });
- }
-
- private ZonedDateTime getExpirationTime(ScheduledJob job) {
- return job.getJob().getExpirationTime();
+ return runAsync(() -> jobMap.remove(key));
}
@Override
- public Publisher getByTime(ZonedDateTime expirationTime) {
- return ReactiveStreams.fromIterable(jobTimeMap.getOrDefault(expirationTime, Collections.emptyList()))
- .map(this::get)
- .flatMapCompletionStage(j -> j)
- .buildRs();
- }
-
- private CompletionStage runAsync(Supplier function) {
- final CompletableFuture future = new CompletableFuture<>();
- vertx.runOnContext((v) -> future.complete(function.get()));
- return future;
+ public PublisherBuilder findAll() {
+ return ReactiveStreams.fromIterable(jobMap.values());
}
}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/JobRepositoryDelegate.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/JobRepositoryDelegate.java
new file mode 100644
index 00000000000..a073b5f36e5
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/impl/JobRepositoryDelegate.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.impl;
+
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Any;
+import javax.enterprise.inject.Default;
+import javax.enterprise.inject.Instance;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.qualifier.Repository;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.infinispan.InfinispanConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Default
+@ApplicationScoped
+public class JobRepositoryDelegate implements ReactiveJobRepository {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobRepositoryDelegate.class);
+
+ private ReactiveJobRepository delegate;
+
+ JobRepositoryDelegate() {
+ }
+
+ @Inject
+ public JobRepositoryDelegate(@Any Instance instances,
+ @ConfigProperty(name = InfinispanConfiguration.PERSISTENCE_CONFIG_KEY)
+ Optional persistence) {
+ delegate = instances.select(new Repository.Literal(persistence.orElse("in-memory"))).get();
+ LOGGER.info("JobRepository selected {}", delegate.getClass());
+ }
+
+ @Override
+ public CompletionStage save(ScheduledJob job) {
+ return delegate.save(job);
+ }
+
+ @Override
+ public CompletionStage get(String id) {
+ return delegate.get(id);
+ }
+
+ @Override
+ public CompletionStage exists(String id) {
+ return delegate.exists(id);
+ }
+
+ @Override
+ public CompletionStage delete(String id) {
+ return delegate.delete(id);
+ }
+
+ @Override
+ public PublisherBuilder findByStatus(JobStatus... status) {
+ return delegate.findByStatus(status);
+ }
+
+ @Override
+ public PublisherBuilder findAll() {
+ return delegate.findAll();
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanConfiguration.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanConfiguration.java
new file mode 100644
index 00000000000..a09e3291f06
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanConfiguration.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.infinispan;
+
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.enterprise.inject.Instance;
+import javax.inject.Inject;
+
+import io.quarkus.runtime.StartupEvent;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+public class InfinispanConfiguration {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanConfiguration.class);
+ public static final String PERSISTENCE_CONFIG_KEY = "kogito.job-service.persistence";
+ private final Configuration config = new ConfigurationBuilder().build();
+
+ /**
+ * Constants for Caches
+ */
+ public static class Caches {
+
+ public static final String SCHEDULED_JOBS = "SCHEDULED_JOBS";
+
+ public static String[] ALL() {
+ return new String[]{SCHEDULED_JOBS};
+ }
+ }
+
+ private Optional cacheManager;
+
+ @Inject
+ public InfinispanConfiguration(Instance cacheManagerInstance,
+ @ConfigProperty(name = PERSISTENCE_CONFIG_KEY)
+ Optional persistence) {
+
+ LOGGER.info("Persistence config {}", persistence);
+ this.cacheManager = persistence
+ .filter("infinispan"::equals)
+ .map(p -> cacheManagerInstance.get());
+ }
+
+ CompletionStage onStart(@Observes StartupEvent startupEvent) {
+ return ReactiveStreams.of(Caches.ALL())
+ .forEach(name -> cacheManager
+ .map(RemoteCacheManager::administration)
+ .ifPresent(adm -> adm.getOrCreateCache(name, config)))
+ .run()
+ .thenAccept(c -> LOGGER.info("Executed Infinispan configuration"));
+ }
+}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
new file mode 100644
index 00000000000..01ac1cb5589
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.infinispan;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import io.quarkus.infinispan.client.Remote;
+import io.vertx.core.Vertx;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.Search;
+import org.infinispan.query.dsl.QueryFactory;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.qualifier.Repository;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
+
+import static org.kie.kogito.jobs.service.repository.infinispan.InfinispanConfiguration.Caches.SCHEDULED_JOBS;
+
+@ApplicationScoped
+@Repository("infinispan")
+public class InfinispanJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository {
+
+ private RemoteCache cache;
+ private QueryFactory queryFactory;
+
+ InfinispanJobRepository() {
+ super(null);
+ }
+
+ @Inject
+ public InfinispanJobRepository(Vertx vertx,
+ @Remote(SCHEDULED_JOBS) RemoteCache cache) {
+ super(vertx);
+ this.cache = cache;
+ this.queryFactory = Search.getQueryFactory(cache);
+ }
+
+ @Override
+ public CompletionStage save(ScheduledJob job) {
+ return runAsync(() -> cache.put(job.getJob().getId(), job))
+ .thenCompose(j -> get(job.getJob().getId()));
+ }
+
+ @Override
+ public CompletionStage get(String id) {
+ return runAsync(() -> cache.get(id));
+ }
+
+ @Override
+ public CompletionStage exists(String id) {
+ return runAsync(() -> cache.containsKey(id));
+ }
+
+ @Override
+ public CompletionStage delete(String id) {
+ return runAsync(() -> cache
+ .withFlags(Flag.FORCE_RETURN_VALUE)
+ .remove(id));
+ }
+
+ @Override
+ public PublisherBuilder findAll() {
+ return ReactiveStreams
+ .fromIterable(Optional.ofNullable(cache)
+ .>map(RemoteCache::values)
+ .orElse(Collections.emptyList()));
+ }
+
+ @Override
+ public PublisherBuilder findByStatus(JobStatus... status) {
+ return ReactiveStreams.fromIterable(queryFactory.from(ScheduledJob.class)
+ .having("status")
+ .in(Arrays.stream(status)
+ .map(JobStatus::name)
+ .collect(Collectors.toList()))
+ .maxResults(50000)
+ .build()
+ .list());
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/BaseMarshaller.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/BaseMarshaller.java
new file mode 100644
index 00000000000..a9c32bf1b7c
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/BaseMarshaller.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.infinispan.marshaller;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+
+import org.infinispan.protostream.MessageMarshaller;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+public abstract class BaseMarshaller implements MessageMarshaller {
+
+ public String getPackage() {
+ return "job.service";
+ }
+
+ protected Instant zonedDateTimeToInstant(ZonedDateTime dateTime) {
+ return Optional.ofNullable(dateTime).map(ZonedDateTime::toInstant).orElse(null);
+ }
+
+ protected ZonedDateTime instantToZonedDateTime(Instant instant) throws IOException {
+ return Optional.ofNullable(instant)
+ .map(i -> ZonedDateTime.ofInstant(i, DateUtil.DEFAULT_ZONE))
+ .orElse(null);
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobMarshaller.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobMarshaller.java
new file mode 100644
index 00000000000..e57fc8556f0
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobMarshaller.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.infinispan.marshaller;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import org.kie.kogito.jobs.api.Job;
+import org.kie.kogito.jobs.api.JobBuilder;
+
+public class JobMarshaller extends BaseMarshaller {
+
+ @Override
+ public String getTypeName() {
+ return getPackage() + ".Job";
+ }
+
+ @Override
+ public Class extends Job> getJavaClass() {
+ return Job.class;
+ }
+
+ @Override
+ public void writeTo(ProtoStreamWriter writer, Job job) throws IOException {
+ writer.writeString("id", job.getId());
+ writer.writeString("callbackEndpoint", job.getCallbackEndpoint());
+ writer.writeInstant("expirationTime", zonedDateTimeToInstant(job.getExpirationTime()));
+ writer.writeInt("priority", job.getPriority());
+ writer.writeString("processId", job.getProcessId());
+ writer.writeString("processInstanceId", job.getProcessInstanceId());
+ writer.writeString("rootProcessId", job.getRootProcessId());
+ writer.writeString("rootProcessInstanceId", job.getRootProcessInstanceId());
+ }
+
+ @Override
+ public Job readFrom(ProtoStreamReader reader) throws IOException {
+ String id = reader.readString("id");
+ String callbackEndpoint = reader.readString("callbackEndpoint");
+ ZonedDateTime expirationTime = instantToZonedDateTime(reader.readInstant("expirationTime"));
+ Integer priority = reader.readInt("priority");
+ String processId = reader.readString("processId");
+ String processInstanceId = reader.readString("processInstanceId");
+ String rootProcessId = reader.readString("rootProcessId");
+ String rootProcessInstanceId = reader.readString("rootProcessInstanceId");
+ return JobBuilder.builder()
+ .callbackEndpoint(callbackEndpoint)
+ .id(id)
+ .expirationTime(expirationTime)
+ .priority(priority)
+ .processId(processId)
+ .processInstanceId(processInstanceId)
+ .rootProcessId(rootProcessId)
+ .rootProcessInstanceId(rootProcessInstanceId)
+ .build();
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/MarshallersProducer.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/MarshallersProducer.java
new file mode 100644
index 00000000000..7014acbe286
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/MarshallersProducer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.infinispan.marshaller;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Produces;
+
+import org.infinispan.protostream.MessageMarshaller;
+
+@ApplicationScoped
+public class MarshallersProducer {
+
+ @Produces
+ public MessageMarshaller scheduledJobMarshaller() {
+ return new ScheduledJobMarshaller();
+ }
+
+ @Produces
+ public MessageMarshaller jobMarshaller() {
+ return new JobMarshaller();
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/ScheduledJobMarshaller.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/ScheduledJobMarshaller.java
new file mode 100644
index 00000000000..f0a8871580e
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/ScheduledJobMarshaller.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.infinispan.marshaller;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import org.kie.kogito.jobs.api.Job;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+
+public class ScheduledJobMarshaller extends BaseMarshaller {
+
+ @Override
+ public String getTypeName() {
+ return getPackage() + ".ScheduledJob";
+ }
+
+ @Override
+ public Class extends ScheduledJob> getJavaClass() {
+ return ScheduledJob.class;
+ }
+
+ @Override
+ public void writeTo(ProtoStreamWriter writer, ScheduledJob scheduledJob) throws IOException {
+ writer.writeString("scheduledId", scheduledJob.getScheduledId());
+ writer.writeObject("job", scheduledJob.getJob(), Job.class);
+ writer.writeInt("retries", scheduledJob.getRetries());
+ writer.writeString("status", scheduledJob.getStatus().name());
+ writer.writeInstant("lastUpdate", zonedDateTimeToInstant(scheduledJob.getLastUpdate()));
+
+ }
+
+ @Override
+ public ScheduledJob readFrom(ProtoStreamReader reader) throws IOException {
+ String scheduledId = reader.readString("scheduledId");
+ Job job = reader.readObject("job", Job.class);
+ Integer retries = reader.readInt("retries");
+ JobStatus status = JobStatus.valueOf(reader.readString("status"));
+ ZonedDateTime lastUpdate = instantToZonedDateTime(reader.readInstant("lastUpdate"));
+ return ScheduledJob.builder()
+ .scheduledId(scheduledId)
+ .retries(retries)
+ .status(status)
+ .job(job)
+ .lastUpdate(lastUpdate)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/JobResource.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/JobResource.java
index 725f5a5bfcd..613402aa899 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/JobResource.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/JobResource.java
@@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.concurrent.CompletionStage;
+import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -37,6 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@ApplicationScoped
@Path("/job")
public class JobResource {
@@ -46,22 +48,18 @@ public class JobResource {
VertxJobScheduler scheduler;
@Inject
- ReactiveJobRepository reactiveJobRepository;
+ ReactiveJobRepository jobRepository;
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public CompletionStage create(Job job) {
LOGGER.debug("REST create {}", job);
- final CompletionStage response = ReactiveStreams.fromPublisher(scheduler.schedule(job))
+ return ReactiveStreams.fromPublisher(scheduler.schedule(job))
.map(ScheduledJob::getJob)
.findFirst()
.run()
- .thenApply(j -> j.orElseThrow(() -> new RuntimeException("Failed to schedule job " + job.getId())))
- .whenCompleteAsync((r, t) -> Optional
- .ofNullable(t)
- .ifPresent(ex -> LOGGER.error("Error Scheduling Job: {}. Details: {}", job, t)));
- return response;
+ .thenApply(j -> j.orElseThrow(() -> new RuntimeException("Failed to schedule job " + job)));
}
@DELETE
@@ -69,12 +67,12 @@ public CompletionStage create(Job job) {
@Path("/{id}")
public CompletionStage delete(@PathParam("id") String id) {
LOGGER.debug("REST delete id {}", id);
- return scheduler
+ return scheduler
.cancel(id)
.thenApply(result -> Optional
.ofNullable(result)
.map(ScheduledJob::getJob)
- .orElseThrow(() -> new RuntimeException("Failed to cancel job scheduling")));
+ .orElseThrow(() -> new RuntimeException("Failed to cancel job scheduling for jobId " + id)));
}
@GET
@@ -82,8 +80,17 @@ public CompletionStage delete(@PathParam("id") String id) {
@Path("/{id}")
public CompletionStage get(@PathParam("id") String id) {
LOGGER.debug("REST get {}", id);
- return reactiveJobRepository
+ return jobRepository
.get(id)
.thenApply(ScheduledJob::getJob);
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/scheduled/{id}")
+ public CompletionStage getScheduledJob(@PathParam("id") String id) {
+ LOGGER.debug("REST get {}", id);
+ return jobRepository
+ .get(id);
+ }
}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/error/DefaultErrorMapper.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/error/DefaultErrorMapper.java
index 42b52908511..76efc918d3f 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/error/DefaultErrorMapper.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/resource/error/DefaultErrorMapper.java
@@ -25,12 +25,17 @@
import javax.ws.rs.ext.Provider;
import org.kie.kogito.jobs.service.model.ErrorResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Provider
public class DefaultErrorMapper implements ExceptionMapper {
+ private final static Logger LOGGER = LoggerFactory.getLogger(DefaultErrorMapper.class);
+
@Override
public Response toResponse(Exception exception) {
+ LOGGER.error("Sending error response", exception);
return Response.status(Optional.ofNullable(exception)
.filter(WebApplicationException.class::isInstance)
.map(WebApplicationException.class::cast)
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
index 11459a990e8..8cb61b19fa6 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
@@ -17,19 +17,23 @@
package org.kie.kogito.jobs.service.scheduler;
import java.time.Duration;
-import java.time.ZoneId;
import java.time.ZonedDateTime;
-import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.inject.Inject;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.service.executor.JobExecutor;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.utils.DateUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +44,13 @@
*/
public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler {
- private Logger logger = LoggerFactory.getLogger(BaseTimerJobScheduler.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseTimerJobScheduler.class);
+
+ @ConfigProperty(name = "kogito.job-service.backoffRetryMillis", defaultValue = "1000")
+ long backoffRetryMillis;
+
+ @ConfigProperty(name = "kogito.job-service.maxIntervalLimitToRetryMillis", defaultValue = "60000")
+ long maxIntervalLimitToRetryMillis;
@Inject
JobExecutor jobExecutor;
@@ -50,42 +60,171 @@ public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler schedule(Job job) {
+ LOGGER.debug("Scheduling {}", job);
return ReactiveStreams
//1- check if the job is already scheduled
.fromCompletionStage(jobRepository.exists(job.getId()))
- .flatMapCompletionStage(exists -> exists
- ? cancel(job.getId()).thenApply(Objects::nonNull)
- : CompletableFuture.completedFuture(Boolean.TRUE))
+ .flatMap(exists -> exists
+ ? handleExistingJob(job)
+ : ReactiveStreams.of(Boolean.TRUE))
.filter(Boolean.TRUE::equals)
//2- calculate the delay (when the job should be executed)
.map(checked -> job.getExpirationTime())
- .map(expirationTime -> Duration.between(ZonedDateTime.now(ZoneId.of("UTC")), expirationTime))
+ .map(expirationTime -> calculateDelay(expirationTime))
//3- schedule the job
.map(delay -> doSchedule(delay, job))
.flatMapRsPublisher(p -> p)
+ .map(scheduleId -> ScheduledJob
+ .builder()
+ .job(job)
+ .scheduledId(scheduleId)
+ .status(JobStatus.SCHEDULED)
+ .build())
.map(scheduledJob -> jobRepository.save(scheduledJob))
.flatMapCompletionStage(p -> p)
.buildRs();
}
- public abstract Publisher doSchedule(Duration delay, Job job);
+ private PublisherBuilder handleExistingJob(Job job) {
+ //always returns true, canceling in case the job is already schedule
+ return ReactiveStreams.fromCompletionStage(jobRepository.get(job.getId()))
+ //handle scheduled and retry cases
+ .flatMap(
+ j -> {
+ switch (j.getStatus()) {
+ case SCHEDULED:
+ return handleExpirationTime(j)
+ .map(CompletableFuture::completedFuture)
+ .map(this::cancel);
+ case RETRY:
+ return handleRetry(CompletableFuture.completedFuture(j));
+ default:
+ //empty to break the stream processing
+ return ReactiveStreams.empty();
+ }
+ })
+ .map(j -> Boolean.TRUE)
+ .onErrorResumeWith(t -> ReactiveStreams.empty());
+ }
+
+ private Duration calculateDelay(ZonedDateTime expirationTime) {
+ return Duration.between(DateUtil.now(), expirationTime);
+ }
+
+ @Override
+ public PublisherBuilder handleJobExecutionSuccess(JobExecutionResponse response) {
+ return ReactiveStreams.of(response)
+ .map(JobExecutionResponse::getJobId)
+ .flatMapCompletionStage(jobRepository::get)
+ .map(scheduledJob -> ScheduledJob
+ .builder()
+ .of(scheduledJob)
+ .status(JobStatus.EXECUTED)
+ .build())
+ //final state, removing the job
+ .map(ScheduledJob::getJob)
+ .map(Job::getId)
+ .flatMapCompletionStage(jobRepository::delete);
+ }
+
+ private boolean notExpired(ZonedDateTime expirationTime) {
+ return !isExpired(expirationTime);
+ }
+
+ private boolean isExpired(ZonedDateTime expirationTime) {
+ final Duration limit = Duration.ofMillis(maxIntervalLimitToRetryMillis);
+ return calculateDelay(expirationTime).plus(limit).isNegative();
+ }
+
+ private PublisherBuilder handleExpirationTime(ScheduledJob scheduledJob) {
+ return ReactiveStreams.of(scheduledJob)
+ .map(ScheduledJob::getJob)
+ .map(Job::getExpirationTime)
+ .flatMapCompletionStage(time -> isExpired(time)
+ ? handleExpiredJob(scheduledJob)
+ : CompletableFuture.completedFuture(scheduledJob));
+ }
+
+ /**
+ * Retries to schedule the job execution with a backoff time of {@link BaseTimerJobScheduler#backoffRetryMillis}
+ * between retries and a limit of max interval of {@link BaseTimerJobScheduler#maxIntervalLimitToRetryMillis}
+ * to retry, after this interval it the job it the job is not successfully executed it will remain in error
+ * state, with no more retries.
+ * @param errorResponse
+ * @return
+ */
+ @Override
+ public PublisherBuilder handleJobExecutionError(JobExecutionResponse errorResponse) {
+ return handleRetry(jobRepository.get(errorResponse.getJobId()));
+ }
+
+ private PublisherBuilder handleRetry(CompletionStage futureJob) {
+ return ReactiveStreams.fromCompletionStage(futureJob)
+ .flatMap(scheduledJob -> handleExpirationTime(scheduledJob)
+ .map(ScheduledJob::getStatus)
+ .filter(s -> !JobStatus.ERROR.equals(s))
+ .map(time -> doSchedule(Duration.ofMillis(backoffRetryMillis), scheduledJob.getJob()))
+ .flatMapRsPublisher(p -> p)
+ .map(scheduleId -> ScheduledJob
+ .builder()
+ .of(scheduledJob)
+ .scheduledId(scheduleId)
+ .status(JobStatus.RETRY)
+ .incrementRetries()
+ .build())
+ .map(jobRepository::save)
+ .flatMapCompletionStage(p -> p));
+ }
+
+ private CompletionStage handleExpiredJob(ScheduledJob scheduledJob) {
+ return Optional.of(ScheduledJob.builder()
+ .of(scheduledJob)
+ .status(JobStatus.ERROR)
+ .build())
+ //final state, removing the job
+ .map(j -> Optional.of(j)
+ .map(ScheduledJob::getJob)
+ .map(Job::getId)
+ .map(id -> jobRepository
+ .delete(id)
+ .thenApply(deleted -> j)))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .orElse(null);
+ }
+
+ public abstract Publisher doSchedule(Duration delay, Job job);
protected CompletionStage execute(Job job) {
- logger.info("Job executed ! {}", job);
+ LOGGER.info("Job executed ! {}", job);
return jobExecutor.execute(job);
}
- @Override
- public CompletionStage cancel(String jobId) {
- logger.debug("Cancel Job Scheduling {}", jobId);
+ public CompletionStage cancel(CompletionStage futureJob) {
return ReactiveStreams
- .fromCompletionStageNullable(jobRepository.get(jobId))
- .flatMapRsPublisher(this::doCancel)
- .filter(Boolean.TRUE::equals)
- .map(r -> jobRepository.delete(jobId))
+ .fromCompletionStageNullable(futureJob)
+ .peek(job -> LOGGER.debug("Cancel Job Scheduling {}", job))
+ .filter(scheduledJob -> JobStatus.SCHEDULED.equals(scheduledJob.getStatus()))
+ .flatMap(scheduledJob -> ReactiveStreams.of(scheduledJob)
+ .flatMapRsPublisher(this::doCancel)
+ .filter(Boolean.TRUE::equals)
+ .map(c -> ScheduledJob
+ .builder()
+ .of(scheduledJob)
+ .status(JobStatus.CANCELED)
+ .build())
+ //final state, removing the job
+ .map(ScheduledJob::getJob)
+ .map(Job::getId)
+ .flatMapCompletionStage(jobRepository::delete))
.findFirst()
.run()
- .thenCompose(job -> job.orElseThrow(()-> new RuntimeException("Failed to cancel job scheduling " + jobId)));
+ .thenApply(job -> job.orElse(null));
+ }
+
+ @Override
+ public CompletionStage cancel(String jobId) {
+ return cancel(jobRepository.get(jobId));
}
public abstract Publisher doCancel(ScheduledJob scheduledJob);
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
new file mode 100644
index 00000000000..7db1a2789ad
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.scheduler;
+
+import java.util.concurrent.CompletionStage;
+
+import javax.enterprise.event.Observes;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import io.quarkus.runtime.StartupEvent;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.scheduler.impl.VertxJobScheduler;
+import org.kie.kogito.jobs.service.utils.ErrorHandling;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class JobSchedulerManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedulerManager.class);
+
+ @Inject
+ VertxJobScheduler scheduler;
+
+ @Inject
+ ReactiveJobRepository repository;
+
+ CompletionStage loadScheduledJobs(@Observes StartupEvent startupEvent) {
+ LOGGER.info("Loading scheduled jobs");
+ return repository.findByStatus(JobStatus.SCHEDULED, JobStatus.RETRY)
+ .map(ScheduledJob::getJob)
+ //is is necessary to skip error on the publisher to continue processing, otherwise the subscribe
+ // terminated
+ .flatMapRsPublisher(t -> ErrorHandling.skipErrorPublisher(scheduler::schedule, t))
+ .onError(ex -> LOGGER.error("Error loading jobs", ex))
+ .forEach(a -> {
+ LOGGER.info("Loaded and scheduled job {}", a);
+ })
+ .run()
+ .thenAccept(c -> LOGGER.info("Loading scheduled jobs completed !"));
+ }
+}
+
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java
index 63948fc2930..77d9f92bbd8 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java
@@ -18,7 +18,9 @@
import java.util.concurrent.CompletionStage;
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.api.Job;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.reactivestreams.Publisher;
@@ -27,4 +29,9 @@ public interface ReactiveJobScheduler extends JobScheduler, Comp
Publisher schedule(Job job);
CompletionStage cancel(String jobId);
+
+ PublisherBuilder handleJobExecutionError(JobExecutionResponse errorResponse);
+
+ PublisherBuilder handleJobExecutionSuccess(JobExecutionResponse errorResponse);
+
}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxJobScheduler.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxJobScheduler.java
index 47cb9a941ab..5f38dea7852 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxJobScheduler.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxJobScheduler.java
@@ -36,21 +36,21 @@
@ApplicationScoped
public class VertxJobScheduler extends BaseTimerJobScheduler {
- private Logger logger = LoggerFactory.getLogger(VertxJobScheduler.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(VertxJobScheduler.class);
@Inject
Vertx vertx;
@Override
- public Publisher doSchedule(Duration delay, Job job) {
- logger.debug("Job Scheduling {}", job);
+ public Publisher doSchedule(Duration delay, Job job) {
+ LOGGER.debug("Job Scheduling {}", job);
return ReactiveStreams
.of(setTimer(delay, job))
- .map(id -> new ScheduledJob(job, String.valueOf(id)))
+ .map(String::valueOf)
.buildRs();
}
- private long setTimer(Duration delay, Job job) {
+ private long setTimer(Duration delay, Job job) {vertx.setPeriodic(1000, i->{});
return vertx.setTimer(delay.toMillis(), scheduledId -> execute(job));
}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/stream/AvailableStreams.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/stream/AvailableStreams.java
new file mode 100644
index 00000000000..a136d4359c9
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/stream/AvailableStreams.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.stream;
+
+public final class AvailableStreams {
+
+ public static final String JOB_ERROR = "job-error";
+ public static final String JOB_SUCCESS = "job-success";
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
new file mode 100644
index 00000000000..f34224cace8
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.stream;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that configure the Consumers for Job Streams,like Job Executed, Job Error... and execute the actions for each
+ * received item.
+ */
+@ApplicationScoped
+public class JobStreams {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobStreams.class);
+
+ private ReactiveJobScheduler jobScheduler;
+
+ @Inject
+ public JobStreams(ReactiveJobScheduler jobScheduler) {
+ this.jobScheduler = jobScheduler;
+ }
+
+ @Incoming(AvailableStreams.JOB_ERROR)
+ @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
+ public void jobErrorProcessor(JobExecutionResponse error) {
+ LOGGER.warn("Error received {}", error);
+ jobScheduler.handleJobExecutionError(error)
+ .findFirst()
+ .run()
+ .thenAccept(job -> LOGGER.info("Rescheduled {}", job.orElse(null)));
+ }
+
+ @Incoming(AvailableStreams.JOB_SUCCESS)
+ @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
+ public void jobSuccessProcessor(JobExecutionResponse response) {
+ LOGGER.warn("Success received {}", response);
+ jobScheduler.handleJobExecutionSuccess(response)
+ .findFirst()
+ .run();
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/RetryJobExecutor.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/utils/DateUtil.java
similarity index 69%
rename from addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/RetryJobExecutor.java
rename to addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/utils/DateUtil.java
index b74a118f965..982cdb40964 100644
--- a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/executor/RetryJobExecutor.java
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/utils/DateUtil.java
@@ -14,16 +14,16 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.executor;
+package org.kie.kogito.jobs.service.utils;
-import java.util.concurrent.CompletionStage;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
-import org.kie.kogito.jobs.api.Job;
+public class DateUtil {
-public class RetryJobExecutor implements JobExecutor {
+ public static final ZoneId DEFAULT_ZONE = ZoneId.of("UTC");
- @Override
- public CompletionStage execute(Job job) {
- return null;
+ public static ZonedDateTime now() {
+ return ZonedDateTime.now(DEFAULT_ZONE);
}
}
diff --git a/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java
new file mode 100644
index 00000000000..4f362d6d861
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.utils;
+
+import java.util.function.Function;
+
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ErrorHandling {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ErrorHandling.class);
+
+ /**
+ * Utility method that receives execute function that returns a {@link Publisher} and skip any the error element,
+ * returning an empty item. It can be used while processing Reactive Streams when it is necessary to continue the
+ * processing even with an error on some operation on the Stream.
+ *
+ *
+ * @param function Function to be executed
+ * @param input Input object
+ * @param return type
+ * @param input type
+ * @return
+ */
+ public static Publisher skipErrorPublisher(Function super T, Publisher> function, T input) {
+ return ReactiveStreams
+ .fromPublisher(function.apply(input))
+ .onError(t -> LOGGER.warn("Error skipped when processing {}.", input, t))
+ .onErrorResumeWithRsPublisher((t) -> ReactiveStreams.empty().buildRs())
+ .buildRs();
+ }
+}
diff --git a/addons/jobs/jobs-service/src/main/resources/META-INF/beans.xml b/addons/jobs/jobs-service/src/main/resources/META-INF/beans.xml
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/addons/jobs/jobs-service/src/main/resources/META-INF/hotrod-client.properties b/addons/jobs/jobs-service/src/main/resources/META-INF/hotrod-client.properties
new file mode 100644
index 00000000000..491c70f9241
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/resources/META-INF/hotrod-client.properties
@@ -0,0 +1,20 @@
+#
+# Copyright 2019 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Docker 4 Mac workaround
+infinispan.client.hotrod.client_intelligence=BASIC
+# https://docs.jboss.org/infinispan/9.4/apidocs/org/infinispan/client/hotrod/configuration/package-summary.html
+# https://infinispan.org/docs/dev/user_guide/user_guide.html#configuration_10
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/resources/META-INF/library.proto b/addons/jobs/jobs-service/src/main/resources/META-INF/library.proto
new file mode 100644
index 00000000000..841bbd58173
--- /dev/null
+++ b/addons/jobs/jobs-service/src/main/resources/META-INF/library.proto
@@ -0,0 +1,20 @@
+package job.service;
+
+message ScheduledJob {
+ optional string scheduledId = 1;
+ optional Job job = 2;
+ optional int32 retries = 3; // no native Date type available in Protobuf
+ required string status = 4;
+ optional int64 lastUpdate = 5;
+}
+
+message Job {
+ required string id = 1;
+ optional string callbackEndpoint = 2;
+ optional int64 expirationTime = 3;
+ optional int32 priority = 4;
+ optional string processId = 5;
+ optional string processInstanceId = 6;
+ optional string rootProcessId = 7;
+ optional string rootProcessInstanceId = 8;
+}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/main/resources/application.properties b/addons/jobs/jobs-service/src/main/resources/application.properties
index dd004efa9eb..720715c9daa 100644
--- a/addons/jobs/jobs-service/src/main/resources/application.properties
+++ b/addons/jobs/jobs-service/src/main/resources/application.properties
@@ -15,30 +15,45 @@
#
#Log Config
-quarkus.log.level=ERROR
+quarkus.log.level=INFO
quarkus.log.category."org.kie.kogito.jobs".level=DEBUG
##Console
quarkus.log.console.enable=true
-quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %h %-5p [%c] (%t) %s%e%n
+quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %h %-5p [%c:%L] (%t) %s%e%n
quarkus.log.console.color=true
-quarkus.log.console.async=false
+quarkus.log.console.async=true
##File
quarkus.log.file.enable=true
-quarkus.log.file.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %h %-5p [%c] (%t) %s%e%n
+quarkus.log.file.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %h %-5p [%c:%L] (%t) %s%e%n
quarkus.log.file.path=jobs-service.log
quarkus.log.file.async=true
quarkus.log.file.rotation.file-suffix=yyyy-MM-dd
#Web Config
quarkus.ssl.native=true
-quarkus.resteasy.gzip.enabled=true
-quarkus.resteasy.gzip.max-input=10M
+quarkus.resteasy.gzip.enabled=true
+quarkus.resteasy.gzip.max-input=10M
quarkus.http.cors=true
quarkus.http.cors.methods=GET,PUT,POST,DELETE
quarkus.http.limits.max-body-size=10M
+quarkus.http.port=8080
#Swagger
quarkus.smallrye-openapi.path=/swagger
quarkus.swagger-ui.always-include=true
-quarkus.swagger-ui.path=/swagger-ui
\ No newline at end of file
+quarkus.swagger-ui.path=/swagger-ui
+
+#Infinispan - more specific configs on hotrod-client.properties file.
+quarkus.infinispan-client.server-list=localhost:11222
+quarkus.infinispan-client.auth-username=${infinispan_username:}
+quarkus.infinispan-client.auth-password=${infinispan_password:}
+quarkus.infinispan-client.use-auth=${infinispan_useauth:false}
+quarkus.infinispan-client.auth-realm=${infinispan_authrealm:}
+quarkus.infinispan-client.sasl-mechanism=${infinispan_saslmechanism:}
+
+#Job Service
+#Persistence values = in-memory, infinispan
+kogito.job-service.persistence=in-memory
+kogito.job-service.maxIntervalLimitToRetryMillis=60000
+kogito.job-service.backoffRetryMillis=1000
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/InfinispanServerTestResource.java b/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/InfinispanServerTestResource.java
new file mode 100644
index 00000000000..9801d8c5734
--- /dev/null
+++ b/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/InfinispanServerTestResource.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.resource;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.infinispan.commons.dataconversion.MediaType;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.cache.Index;
+import org.infinispan.configuration.global.GlobalConfiguration;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.server.core.admin.embeddedserver.EmbeddedServerAdminOperationHandler;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.server.hotrod.configuration.HotRodServerConfiguration;
+import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder;
+import org.kie.kogito.jobs.service.repository.infinispan.InfinispanConfiguration;
+
+public class InfinispanServerTestResource implements QuarkusTestResourceLifecycleManager {
+
+ private HotRodServer hotRodServer;
+ private EmbeddedCacheManager cacheManager;
+ private static final Integer PORT = 11232;
+
+ @Override
+ public Map start() {
+ Configuration configuration = new ConfigurationBuilder()
+ .encoding()
+ .key()
+ .mediaType(MediaType.APPLICATION_PROTOSTREAM_TYPE)
+ .encoding()
+ .value()
+ .mediaType(MediaType.APPLICATION_PROTOSTREAM_TYPE)
+ .indexing().index(Index.PRIMARY_OWNER).addProperty("default.directory_provider", "local-heap")
+ .build();
+
+ GlobalConfiguration globalConfig = new GlobalConfigurationBuilder()
+ .defaultCacheName("default")
+ .serialization()
+ .build();
+
+ cacheManager = new DefaultCacheManager(globalConfig, configuration);
+
+ hotRodServer = new HotRodServer();
+ HotRodServerConfiguration cfg = new HotRodServerConfigurationBuilder()
+ .host("localhost")
+ .proxyHost("localhost")
+ .port(PORT)
+ .proxyPort(PORT)
+ .adminOperationsHandler(new EmbeddedServerAdminOperationHandler())
+ .build();
+ hotRodServer.start(cfg, cacheManager);
+
+ Stream.of(InfinispanConfiguration.Caches.SCHEDULED_JOBS)
+ .forEach(name -> cacheManager.administration().getOrCreateCache(name, configuration));
+
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void stop() {
+ Optional.ofNullable(hotRodServer)
+ .ifPresent(HotRodServer::stop);
+ }
+}
diff --git a/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/JobResourceTest.java b/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/JobResourceTest.java
index 6574dc2563d..1956734e3ac 100644
--- a/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/JobResourceTest.java
+++ b/addons/jobs/jobs-service/src/test/java/org/kie/kogito/jobs/service/resource/JobResourceTest.java
@@ -16,42 +16,53 @@
package org.kie.kogito.jobs.service.resource;
-import static io.restassured.RestAssured.given;
-import static org.hamcrest.CoreMatchers.equalTo;
-
+import java.io.IOException;
+import java.time.ZoneId;
import java.time.ZonedDateTime;
import javax.inject.Inject;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.kie.kogito.jobs.api.Job;
-import org.kie.kogito.jobs.api.JobBuilder;
-
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-
+import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import io.restassured.response.ValidatableResponse;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.kie.kogito.jobs.api.Job;
+import org.kie.kogito.jobs.api.JobBuilder;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import static io.restassured.RestAssured.given;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
@QuarkusTest
-class JobResourceTest {
+@QuarkusTestResource(InfinispanServerTestResource.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class JobResourceTest {
- @Inject
- private ObjectMapper mapper;
+ private JobResourceTest() {
- @BeforeEach
- void setUp() {
}
+ @Inject
+ private ObjectMapper objectMapper;
+
@Test
void create() throws Exception {
- final String body = getJob("1");
- create(body);
+ final Job job = getJob("1");
+ final Job response = create(jobToJson(job))
+ .extract()
+ .as(Job.class);
+ assertEquals(job, response);
}
- private ValidatableResponse create(String body) {
+ private ValidatableResponse create(String body) throws IOException {
return given()
.contentType(ContentType.JSON)
.body(body)
@@ -59,48 +70,45 @@ private ValidatableResponse create(String body) {
.post("/job")
.then()
.statusCode(200)
- .contentType(ContentType.JSON)
- .assertThat()
- .body(equalTo(body))
- .log()
- .body();
+ .contentType(ContentType.JSON);
+ }
+
+ private String jobToJson(Job job) throws JsonProcessingException {
+ return objectMapper.writeValueAsString(job);
}
- private String getJob(String id) throws JsonProcessingException {
- final Job job = JobBuilder
+ private Job getJob(String id) {
+ return JobBuilder
.builder()
.id(id)
- .expirationTime(ZonedDateTime.now().plusMinutes(1))
+ .expirationTime(DateUtil.now().plusSeconds(10))
+ .callbackEndpoint("http://localhost:8081/callback")
.priority(1)
.build();
- String str = mapper.writeValueAsString(job);
- System.out.println(str);
- return str;
}
@Test
void deleteAfterCreate() throws Exception {
final String id = "2";
- final String body = getJob(id);
- create(body);
- given().pathParam("id", id)
+ final Job job = getJob(id);
+ create(jobToJson(job));
+ final Job response = given().pathParam("id", id)
.when()
.delete("/job/{id}")
.then()
.statusCode(200)
.contentType(ContentType.JSON)
- .assertThat()
- .body(equalTo(body))
- .log()
- .body();
+ .extract()
+ .as(Job.class);
+ assertEquals(job, response);
}
@Test
void getAfterCreate() throws Exception {
final String id = "3";
- final String body = getJob(id);
- create(body);
- given()
+ final Job job = getJob(id);
+ create(jobToJson(job));
+ final Job scheduledJob = given()
.pathParam("id", id)
.when()
.get("/job/{id}")
@@ -108,8 +116,30 @@ void getAfterCreate() throws Exception {
.statusCode(200)
.contentType(ContentType.JSON)
.assertThat()
- .body(equalTo(body))
- .log()
- .body();
+ .extract()
+ .as(Job.class);
+ assertEquals(scheduledJob, job);
+
+ }
+
+ @Test
+ void executeTest() throws Exception {
+ final String id = "4";
+ final Job job = getJob(id);
+ create(jobToJson(job));
+ final ScheduledJob scheduledJob = given()
+ .pathParam("id", id)
+ .when()
+ .get("/job/scheduled/{id}")
+ .then()
+ .statusCode(200)
+ .contentType(ContentType.JSON)
+ .assertThat()
+ .extract()
+ .as(ScheduledJob.class);
+ assertEquals(scheduledJob.getJob(), job);
+ assertEquals(scheduledJob.getRetries(), 0);
+ assertEquals(scheduledJob.getStatus(), JobStatus.SCHEDULED);
+ assertNotNull(scheduledJob.getScheduledId());
}
}
\ No newline at end of file
diff --git a/addons/jobs/jobs-service/src/test/resources/application.properties b/addons/jobs/jobs-service/src/test/resources/application.properties
new file mode 100644
index 00000000000..e0c04ccb250
--- /dev/null
+++ b/addons/jobs/jobs-service/src/test/resources/application.properties
@@ -0,0 +1,18 @@
+#
+# Copyright 2019 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#Infinispan
+quarkus.infinispan-client.server-list=localhost:11232