Skip to content

Commit

Permalink
KOGITO-530/531 Jobs persistence and retries on error (#2)
Browse files Browse the repository at this point in the history
* KOGITO-18 - Pluggable timer/jobs service that can be used as service

* Initial Job Service implementation

Initial Job Service implementation

inserting swagger

* Apply PR comments and some code refactoring, test

* Fix JobScheduler when cancelling job

* Fixing cancel already scheduled job when re-scheduling

* Job retry implementation

* Inserting Infinispan persistence on Job service

* Inserting Infinispan persistence on Job service

* fix infinispan repository

* Inserting persistence selector based on config property

* Fix scheduler manager and infinispan repository to work with config

* Fix JobResourceTest

* Removed fixed version on pom

* Update plugin on job-service pom

* Fix comments on the PR

* Job repository type by config, remove jobs on final state, add retry jobs when load job on startup
  • Loading branch information
tiagodolphine authored and mswiderski committed Nov 19, 2019
1 parent c31b617 commit 668bcf4
Show file tree
Hide file tree
Showing 37 changed files with 1,648 additions and 166 deletions.
16 changes: 16 additions & 0 deletions addons/jobs/jobs-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,20 @@
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Empty file.
3 changes: 2 additions & 1 deletion addons/jobs/jobs-service/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ target/
/*.ipr
/*.iws
*.iml
*.log
/*.log**



63 changes: 60 additions & 3 deletions addons/jobs/jobs-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>jobs-service</artifactId>
<name>Jobs Service</name>
<description>Jobs Service (Timers and Async Jobs)</description>

<dependencies>
<dependency>
<groupId>org.kie.kogito</groupId>
Expand All @@ -36,6 +33,12 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
Expand All @@ -45,6 +48,23 @@
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>

<!-- Messaging Streams -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
</dependency>

<!-- Persistence -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-infinispan-client</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-query-dsl</artifactId>
</dependency>


<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -57,7 +77,32 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-server-hotrod</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-remote-query-server</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
<name>Jobs Service</name>
<description>Jobs Service (Timers and Async Jobs)</description>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bom</artifactId>
<version>${version.io.quarkus}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -124,6 +169,18 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
import io.vertx.axle.core.Vertx;
import io.vertx.axle.core.buffer.Buffer;
import io.vertx.axle.ext.web.client.HttpResponse;
import io.vertx.axle.ext.web.client.WebClient;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.service.stream.AvailableStreams;
import org.kie.kogito.jobs.service.converters.HttpConverters;
import org.kie.kogito.jobs.service.model.HTTPRequestCallback;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,27 +54,64 @@ public class HttpJobExecutor implements JobExecutor {
@Inject
HttpConverters httpConverters;

/**
* Publish on Stream of Job Error events
*/
@Inject
ReactiveJobRepository jobRepository;
@Channel(AvailableStreams.JOB_ERROR)
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
Emitter<JobExecutionResponse> jobErrorEmitter;

/**
* Publish on Stream of Job Success events
*/
@Inject
@Channel(AvailableStreams.JOB_SUCCESS)
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
Emitter<JobExecutionResponse> jobSuccessEmitter;

@PostConstruct
void initialize() {
this.client = WebClient.create(vertx);
}

private CompletionStage<Boolean> executeCallback(HTTPRequestCallback request) {
private CompletionStage<HttpResponse<Buffer>> executeCallback(HTTPRequestCallback request) {
LOGGER.info("Executing callback {}", request);
final URL url = httpConverters.convertURL(request.getUrl());
return client.request(httpConverters.convertHttpMethod(request.getMethod()),
url.getPort(),
url.getHost(),
url.getPath())
.send()
.thenApplyAsync(response -> Optional
.ofNullable(response.statusCode())
.filter(new Integer(200)::equals)
.map(code -> Boolean.TRUE)
.orElse(Boolean.FALSE));
.send();
}

private String getResponseCode(HttpResponse<Buffer> response) {
return Optional.ofNullable(response.statusCode())
.map(String::valueOf)
.orElse(null);
}

private PublisherBuilder<JobExecutionResponse> handleResponse(JobExecutionResponse response) {
LOGGER.info("handle response {}", response);
return ReactiveStreams.of(response)
.map(JobExecutionResponse::getCode)
.flatMap(code -> code.equals("200")
? handleSuccess(response)
: handleError(response));
}

private PublisherBuilder<JobExecutionResponse> handleError(JobExecutionResponse response) {
LOGGER.info("handle error {}", response);
return ReactiveStreams.of(response)
.peek(jobErrorEmitter::send)
.peek(r -> LOGGER.info("Error executing job {}.", r));
}

private PublisherBuilder<JobExecutionResponse> handleSuccess(JobExecutionResponse response) {
LOGGER.info("handle success {}", response);
return ReactiveStreams.of(response)
.peek(jobSuccessEmitter::send)
.peek(r -> LOGGER.info("Success executing job {}.", r));
}

@Override
Expand All @@ -77,16 +122,25 @@ public CompletionStage<Job> execute(Job job) {
.method(HTTPRequestCallback.HTTPMethod.POST)
.build();

return executeCallback(callback)
.thenApply(result -> {
LOGGER.info("Response of executed job {} {}", result, job);
jobRepository.delete(job.getId());
return job;
})
//handle error
.exceptionally(ex -> {
LOGGER.error("Error executing job " + job, ex);
return ReactiveStreams.fromCompletionStage(executeCallback(callback))
.map(response -> JobExecutionResponse.builder()
.message(response.statusMessage())
.code(getResponseCode(response))
.now()
.jobId(job.getId())
.build())
.flatMap(this::handleResponse)
.findFirst()
.run()
.thenApply(response -> job)
.exceptionally((ex) -> {
LOGGER.error("Generic error executing job {}", job, ex);
jobErrorEmitter.send(JobExecutionResponse.builder()
.message(ex.getMessage())
.now()
.jobId(job.getId())
.build());
return job;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,28 @@
package org.kie.kogito.jobs.service.json;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import io.quarkus.jackson.ObjectMapperCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class JacksonConfiguration implements ObjectMapperCustomizer {


@Override
public void customize(ObjectMapper objectMapper) {
objectMapper.registerModule(new JavaTimeModule());
objectMapper.findAndRegisterModules();
objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
public class JacksonConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(JacksonConfiguration.class);

@Singleton
@Produces
public ObjectMapperCustomizer customizer() {
return (objectMapper) -> {
LOGGER.info("Jackson customization initialized.");
objectMapper.registerModule(new JavaTimeModule());
objectMapper.findAndRegisterModules();
objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
};
}

}
}
Loading

0 comments on commit 668bcf4

Please sign in to comment.