-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a116226
commit ccf2d83
Showing
15 changed files
with
291 additions
and
65 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
151 changes: 151 additions & 0 deletions
151
esgrpc/src/main/java/org/fuin/esc/esgrpc/GrpcProjectionAdminEventStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package org.fuin.esc.esgrpc; | ||
|
||
import com.eventstore.dbclient.CreateProjectionOptions; | ||
import com.eventstore.dbclient.DeleteProjectionOptions; | ||
import com.eventstore.dbclient.EventStoreDBProjectionManagementClient; | ||
import io.grpc.Status; | ||
import io.grpc.StatusRuntimeException; | ||
import jakarta.validation.constraints.NotNull; | ||
import org.fuin.esc.api.*; | ||
import org.fuin.esc.spi.ProjectionJavaScriptBuilder; | ||
import org.fuin.esc.spi.TenantStreamId; | ||
import org.fuin.objects4j.common.ConstraintViolationException; | ||
import org.fuin.objects4j.common.Contract; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
/** | ||
* GRPC based eventstore projection admin implementation. | ||
*/ | ||
public final class GrpcProjectionAdminEventStore implements ProjectionAdminEventStore { | ||
|
||
private final EventStoreDBProjectionManagementClient es; | ||
|
||
private final TenantId tenantId; | ||
|
||
/** | ||
* Constructor with mandatory data. | ||
* | ||
* @param es Eventstore client to use. | ||
*/ | ||
public GrpcProjectionAdminEventStore(EventStoreDBProjectionManagementClient es) { | ||
this(es, null); | ||
} | ||
|
||
/** | ||
* Constructor with all data. | ||
* | ||
* @param es Eventstore client to use. | ||
* @param tenantId Tenant ID or {@literal null}. | ||
*/ | ||
public GrpcProjectionAdminEventStore(EventStoreDBProjectionManagementClient es, TenantId tenantId) { | ||
Contract.requireArgNotNull("es", es); | ||
this.es = es; | ||
this.tenantId = tenantId; | ||
} | ||
|
||
@Override | ||
public ProjectionAdminEventStore open() { | ||
// Do nothing | ||
return this; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
// Do nothing | ||
} | ||
|
||
@Override | ||
public boolean projectionExists(StreamId projectionId) { | ||
Contract.requireArgNotNull("projectionId", projectionId); | ||
requireProjection(projectionId); | ||
|
||
try { | ||
es.getStatus(new TenantStreamId(tenantId, projectionId).asString()).get(); | ||
return true; | ||
} catch (final InterruptedException | ExecutionException ex) { // NOSONAR | ||
if (ex.getCause() instanceof StatusRuntimeException sre) { | ||
if (sre.getStatus().getCode().equals(Status.UNKNOWN.getCode()) | ||
&& sre.getMessage() != null && sre.getMessage().contains("NotFound")) { | ||
return false; | ||
} | ||
} | ||
throw new RuntimeException("Error waiting for getStatus(..) result", ex); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public void enableProjection(StreamId projectionId) throws StreamNotFoundException { | ||
Contract.requireArgNotNull("projectionId", projectionId); | ||
requireProjection(projectionId); | ||
|
||
try { | ||
es.enable(new TenantStreamId(tenantId, projectionId).asString()).get(); | ||
} catch (final InterruptedException | ExecutionException ex) { // NOSONAR | ||
throw new RuntimeException("Error waiting for enable(..) result", ex); | ||
} | ||
} | ||
|
||
@Override | ||
public void disableProjection(StreamId projectionId) throws StreamNotFoundException { | ||
Contract.requireArgNotNull("projectionId", projectionId); | ||
requireProjection(projectionId); | ||
|
||
try { | ||
es.disable(new TenantStreamId(tenantId, projectionId).asString()).get(); | ||
} catch (final InterruptedException | ExecutionException ex) { // NOSONAR | ||
throw new RuntimeException("Error waiting for disable(..) result", ex); | ||
} | ||
} | ||
|
||
@Override | ||
public void createProjection(StreamId projectionId, boolean enable, @NotNull TypeName... eventType) throws StreamAlreadyExistsException { | ||
createProjection(projectionId, enable, Arrays.asList(eventType)); | ||
} | ||
|
||
@Override | ||
public void createProjection(StreamId projectionId, boolean enable, List<TypeName> eventTypes) throws StreamAlreadyExistsException { | ||
Contract.requireArgNotNull("projectionId", projectionId); | ||
requireProjection(projectionId); | ||
|
||
final TenantStreamId pid = new TenantStreamId(tenantId, projectionId); | ||
final String javascript = new ProjectionJavaScriptBuilder(pid).types(eventTypes).build(); | ||
try { | ||
es.create(pid.asString(), javascript, CreateProjectionOptions.get().emitEnabled(false).trackEmittedStreams(true)).get(); | ||
} catch (final InterruptedException | ExecutionException ex) { // NOSONAR | ||
throw new RuntimeException("Error waiting for create(..) result", ex); | ||
} | ||
if (enable) { | ||
enableProjection(pid); | ||
} else { | ||
// Workaround for https://github.com/EventStore/EventStoreDB-Client-Java/issues/259 (not a perfect one...) | ||
disableProjection(pid); | ||
} | ||
} | ||
|
||
@Override | ||
public void deleteProjection(StreamId projectionId) throws StreamNotFoundException { | ||
Contract.requireArgNotNull("projectionId", projectionId); | ||
requireProjection(projectionId); | ||
|
||
disableProjection(projectionId); | ||
|
||
final TenantStreamId pid = new TenantStreamId(tenantId, projectionId); | ||
try { | ||
es.delete(pid.asString(), DeleteProjectionOptions.get().deleteCheckpointStream().deleteStateStream().deleteEmittedStreams()).get(); | ||
} catch (final InterruptedException | ExecutionException ex) { // NOSONAR | ||
throw new RuntimeException("Error waiting for delete(..) result", ex); | ||
} | ||
|
||
} | ||
|
||
static void requireProjection(final StreamId projectionId) { | ||
if (!projectionId.isProjection()) { | ||
throw new ConstraintViolationException("The stream identifier is not a projection id"); | ||
} | ||
} | ||
|
||
} |
88 changes: 88 additions & 0 deletions
88
esgrpc/src/test/java/org/fuin/esc/esgrpc/GrpcProjectionAdminEventStoreIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package org.fuin.esc.esgrpc; | ||
|
||
import com.eventstore.dbclient.EventStoreDBClientSettings; | ||
import com.eventstore.dbclient.EventStoreDBConnectionString; | ||
import com.eventstore.dbclient.EventStoreDBProjectionManagementClient; | ||
import org.fuin.esc.api.ProjectionStreamId; | ||
import org.fuin.esc.api.TypeName; | ||
import org.junit.jupiter.api.BeforeAll; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.net.MalformedURLException; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
/** | ||
* Tests the {@link GrpcProjectionAdminEventStore} class. | ||
*/ | ||
class GrpcProjectionAdminEventStoreIT { | ||
|
||
private static EventStoreDBProjectionManagementClient client; | ||
|
||
private GrpcProjectionAdminEventStore testee; | ||
|
||
@BeforeAll | ||
static void beforeAll() { | ||
final EventStoreDBClientSettings setts = EventStoreDBConnectionString | ||
.parseOrThrow("esdb://localhost:2113?tls=false"); | ||
client = EventStoreDBProjectionManagementClient.create(setts); | ||
} | ||
|
||
@BeforeEach | ||
void beforeEach() throws MalformedURLException { | ||
testee = new GrpcProjectionAdminEventStore(client, null); | ||
} | ||
|
||
@Test | ||
void testProjectionNotExists() { | ||
assertThat(testee.projectionExists(new ProjectionStreamId("grpc-test-not-existing"))).isFalse(); | ||
} | ||
|
||
@Test | ||
void testEnableDisableProjection() { | ||
|
||
// GIVEN | ||
final ProjectionStreamId projectionId = new ProjectionStreamId("grpc-test-disabled"); | ||
testee.createProjection(projectionId, false, new TypeName("one"), new TypeName("two")); | ||
|
||
// WHEN | ||
testee.enableProjection(projectionId); | ||
|
||
// THEN | ||
// TODO assertThat(testee.projectionEnabled()).isTrue(); | ||
|
||
} | ||
|
||
@Test | ||
void testCreateAndExistsProjection() { | ||
|
||
// GIVEN | ||
final ProjectionStreamId projectionId = new ProjectionStreamId("grpc-test-create"); | ||
assertThat(testee.projectionExists(projectionId)).isFalse(); | ||
|
||
// WHEN | ||
testee.createProjection(projectionId, true, new TypeName("one"), new TypeName("two")); | ||
|
||
// THEN | ||
assertThat(testee.projectionExists(projectionId)).isTrue(); | ||
|
||
} | ||
|
||
@Test | ||
void testDeleteProjection() { | ||
|
||
// GIVEN | ||
final ProjectionStreamId projectionId = new ProjectionStreamId("grpc-test-delete"); | ||
testee.createProjection(projectionId, false, new TypeName("one"), new TypeName("two")); | ||
assertThat(testee.projectionExists(projectionId)).isTrue(); | ||
|
||
// WHEN | ||
testee.deleteProjection(projectionId); | ||
|
||
// THEN | ||
assertThat(testee.projectionExists(projectionId)).isFalse(); | ||
|
||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.