From 619c4123d7aac0a8bab2625db4dd8708ff3c6a5d Mon Sep 17 00:00:00 2001 From: Kaili Zhu Date: Thu, 30 Nov 2023 20:52:14 +0900 Subject: [PATCH] Add error handling (#15) --- apis | 2 +- build.gradle | 16 +-- .../java/io/xcherry/core/client/Client.java | 69 +++++------ .../java/io/xcherry/core/context/Context.java | 4 +- .../xcherry/core/exception/HttpException.java | 6 +- .../xcherry/core/persistence/Persistence.java | 112 ++++++++---------- .../PersistenceTableRowToUpsert.java | 6 +- .../schema/PersistenceTableSchema.java | 16 +-- .../PersistenceTableSchemaToLoadData.java | 12 +- .../core/process/ProcessStartConfig.java | 4 +- .../io/xcherry/core/utils/ProcessUtil.java | 61 +++++----- .../io/xcherry/core/worker/WorkerService.java | 73 ++++++++++-- .../worker/WorkerServiceResponseEntity.java | 18 +++ .../integ/spring/WorkerApiController.java | 25 ++-- 14 files changed, 230 insertions(+), 194 deletions(-) create mode 100644 src/main/java/io/xcherry/core/worker/WorkerServiceResponseEntity.java diff --git a/apis b/apis index 9ff80b8..349f7e0 160000 --- a/apis +++ b/apis @@ -1 +1 @@ -Subproject commit 9ff80b8ba2c1bf8390f123e35707c63f56ef4d63 +Subproject commit 349f7e04593c11f4c1cdca552c1fe46ba2a811e4 diff --git a/build.gradle b/build.gradle index 27cccdd..68a9e70 100644 --- a/build.gradle +++ b/build.gradle @@ -28,17 +28,11 @@ java { withSourcesJar() } -//tasks.withType(Javadoc) { -// // Set the source files for which Javadoc will be generated -// source = sourceSets.main.allJava -// -// // Set the destination directory for the generated Javadoc -// destinationDir = file("$buildDir/docs/javadoc") -// -// // Specify the files or packages to be excluded from Javadoc generation -// exclude '**/AsyncStateOptions.java' -// exclude '**/ProcessOptions.java' -//} +tasks.withType(Javadoc) { + // Specify the files or packages to be excluded from Javadoc generation + exclude '**/AsyncStateOptions.java' + exclude '**/ProcessOptions.java' +} repositories { mavenCentral() diff --git a/src/main/java/io/xcherry/core/client/Client.java b/src/main/java/io/xcherry/core/client/Client.java index 4119214..0244700 100644 --- a/src/main/java/io/xcherry/core/client/Client.java +++ b/src/main/java/io/xcherry/core/client/Client.java @@ -14,17 +14,12 @@ import io.xcherry.core.rpc.RpcInterceptor; import io.xcherry.core.state.AsyncState; import io.xcherry.core.utils.ProcessUtil; -import io.xcherry.gen.models.GlobalAttributeConfig; -import io.xcherry.gen.models.GlobalAttributeTableConfig; import io.xcherry.gen.models.LocalQueueMessage; import io.xcherry.gen.models.ProcessExecutionDescribeResponse; import io.xcherry.gen.models.ProcessExecutionStartRequest; import io.xcherry.gen.models.ProcessExecutionStopType; import io.xcherry.gen.models.PublishToLocalQueueRequest; -import io.xcherry.gen.models.TableColumnValue; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; import net.bytebuddy.ByteBuddy; @@ -316,28 +311,28 @@ private io.xcherry.gen.models.ProcessStartConfig toApiModel( return null; } - final GlobalAttributeConfig globalAttributeConfig; - if (globalAttributesToUpsert.isEmpty()) { - globalAttributeConfig = null; - } else { - globalAttributeConfig = new GlobalAttributeConfig(); - - for (final PersistenceTableRowToUpsert tableRowToUpsert : globalAttributesToUpsert.values()) { - globalAttributeConfig.addTableConfigsItem( - new GlobalAttributeTableConfig() - .tableName(tableRowToUpsert.getTableName()) - // TODO - // .primaryKey(toApiModel(tableRowToUpsert.getPrimaryKeyColumns())) - .initialWrite(toApiModel(tableRowToUpsert.getOtherColumns())) - .initialWriteMode(tableRowToUpsert.getWriteConflictMode()) - ); - } - } + // final GlobalAttributeConfig globalAttributeConfig; + // if (globalAttributesToUpsert.isEmpty()) { + // globalAttributeConfig = null; + // } else { + // globalAttributeConfig = new GlobalAttributeConfig(); + // + // for (final PersistenceTableRowToUpsert tableRowToUpsert : globalAttributesToUpsert.values()) { + // globalAttributeConfig.addTableConfigsItem( + // new GlobalAttributeTableConfig() + // .tableName(tableRowToUpsert.getTableName()) + // // TODO + // // .primaryKey(toApiModel(tableRowToUpsert.getPrimaryKeyColumns())) + // .initialWrite(toApiModel(tableRowToUpsert.getOtherColumns())) + // .initialWriteMode(tableRowToUpsert.getWriteConflictMode()) + // ); + // } + // } return new io.xcherry.gen.models.ProcessStartConfig() .timeoutSeconds(processStartConfig.getTimeoutSeconds()) - .idReusePolicy(processStartConfig.getProcessIdReusePolicy()) - .globalAttributeConfig(globalAttributeConfig); + .idReusePolicy(processStartConfig.getProcessIdReusePolicy()); + // .globalAttributeConfig(globalAttributeConfig); } private void validateThePersistenceSchema( @@ -417,19 +412,19 @@ private void validateThePersistenceSchema( }); } - private List toApiModel(final Map columnNameToValueMap) { - final List columns = new ArrayList<>(); - - columnNameToValueMap.forEach((k, v) -> { - columns.add( - new TableColumnValue() - .dbColumn(k) - .dbQueryValue(clientOptions.getDatabaseStringEncoder().encodeToString(v)) - ); - }); - - return columns; - } + // private List toApiModel(final Map columnNameToValueMap) { + // final List columns = new ArrayList<>(); + // + // columnNameToValueMap.forEach((k, v) -> { + // columns.add( + // new TableColumnValue() + // .dbColumn(k) + // .dbQueryValue(clientOptions.getDatabaseStringEncoder().encodeToString(v)) + // ); + // }); + // + // return columns; + // } /** * Publish message(s) to the local queue for consumption by the process execution. diff --git a/src/main/java/io/xcherry/core/context/Context.java b/src/main/java/io/xcherry/core/context/Context.java index 8fb38bd..3533679 100644 --- a/src/main/java/io/xcherry/core/context/Context.java +++ b/src/main/java/io/xcherry/core/context/Context.java @@ -1,6 +1,6 @@ package io.xcherry.core.context; -import io.xcherry.gen.models.StateApiType; +import io.xcherry.gen.models.WorkerApiType; import lombok.Builder; import lombok.Getter; @@ -15,7 +15,7 @@ public class Context { private final long firstAttemptTimestamp; private final int attempt; private final String recoverFromStateExecutionId; - private final StateApiType recoverFromApi; + private final WorkerApiType recoverFromApi; public static Context fromApiModel(final io.xcherry.gen.models.Context context) { return Context diff --git a/src/main/java/io/xcherry/core/exception/HttpException.java b/src/main/java/io/xcherry/core/exception/HttpException.java index 5aabf91..fa5da2d 100644 --- a/src/main/java/io/xcherry/core/exception/HttpException.java +++ b/src/main/java/io/xcherry/core/exception/HttpException.java @@ -7,6 +7,7 @@ import io.xcherry.core.exception.status.ProcessNotFoundException; import io.xcherry.gen.models.ApiErrorResponse; import io.xcherry.gen.models.EncodedObject; +import io.xcherry.gen.models.ErrorSubType; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Optional; @@ -37,9 +38,10 @@ public HttpException(final ObjectEncoder objectEncoder, final FeignException.Fei } } - // TODO apiErrorResponse = - new ApiErrorResponse().detail("empty or unable to decode to apiErrorResponse: " + decodeErrorMessage); + new ApiErrorResponse() + .errorSubType(ErrorSubType.UNCATEGORIZED_ERROR) + .details("empty or unable to decode to apiErrorResponse: " + decodeErrorMessage); } public static HttpException fromFeignException( diff --git a/src/main/java/io/xcherry/core/persistence/Persistence.java b/src/main/java/io/xcherry/core/persistence/Persistence.java index ea82229..a27eb05 100644 --- a/src/main/java/io/xcherry/core/persistence/Persistence.java +++ b/src/main/java/io/xcherry/core/persistence/Persistence.java @@ -1,16 +1,10 @@ package io.xcherry.core.persistence; -import com.google.common.collect.ImmutableList; import io.xcherry.core.encoder.base.DatabaseStringEncoder; import io.xcherry.core.exception.persistence.GlobalAttributeNotFoundException; import io.xcherry.core.persistence.schema.PersistenceSchema; -import io.xcherry.gen.models.GlobalAttributeTableRowUpdate; -import io.xcherry.gen.models.LoadGlobalAttributeResponse; -import io.xcherry.gen.models.TableColumnValue; -import io.xcherry.gen.models.TableReadResponse; -import java.util.ArrayList; +import io.xcherry.gen.models.AppDatabaseReadResponse; import java.util.HashMap; -import java.util.List; import java.util.Map; public class Persistence { @@ -27,43 +21,42 @@ public class Persistence { private final DatabaseStringEncoder databaseStringEncoder; public Persistence( - final LoadGlobalAttributeResponse loadGlobalAttributeResponse, + final AppDatabaseReadResponse appDatabaseReadResponse, final PersistenceSchema persistenceSchema, final DatabaseStringEncoder databaseStringEncoder ) { this.databaseStringEncoder = databaseStringEncoder; - if (loadGlobalAttributeResponse == null) { + if (appDatabaseReadResponse == null) { return; } - - final List tableResponses = loadGlobalAttributeResponse.getTableResponses() == null - ? ImmutableList.of() - : loadGlobalAttributeResponse.getTableResponses(); - - for (final TableReadResponse tableResponse : tableResponses) { - if (!globalAttributes.containsKey(tableResponse.getTableName())) { - globalAttributes.put(tableResponse.getTableName(), new HashMap<>()); - } - - final List columns = tableResponse.getColumns() == null - ? ImmutableList.of() - : tableResponse.getColumns(); - - for (final TableColumnValue column : columns) { - final Class columnValueType = persistenceSchema.getGlobalAttributeColumnValueType( - tableResponse.getTableName(), - column.getDbColumn() - ); - - globalAttributes - .get(tableResponse.getTableName()) - .put( - column.getDbColumn(), - databaseStringEncoder.decodeFromString(column.getDbQueryValue(), columnValueType) - ); - } - } + // final List tableResponses = loadGlobalAttributeResponse.getTableResponses() == null + // ? ImmutableList.of() + // : loadGlobalAttributeResponse.getTableResponses(); + // + // for (final TableReadResponse tableResponse : tableResponses) { + // if (!globalAttributes.containsKey(tableResponse.getTableName())) { + // globalAttributes.put(tableResponse.getTableName(), new HashMap<>()); + // } + // + // final List columns = tableResponse.getColumns() == null + // ? ImmutableList.of() + // : tableResponse.getColumns(); + // + // for (final TableColumnValue column : columns) { + // final Class columnValueType = persistenceSchema.getGlobalAttributeColumnValueType( + // tableResponse.getTableName(), + // column.getDbColumn() + // ); + // + // globalAttributes + // .get(tableResponse.getTableName()) + // .put( + // column.getDbColumn(), + // databaseStringEncoder.decodeFromString(column.getDbQueryValue(), columnValueType) + // ); + // } + // } } /** @@ -107,27 +100,26 @@ public void upsertGlobalAttribute(final String tableName, final String columnNam globalAttributesToUpdate.get(tableName).put(columnName, columnValue); } - - public List getGlobalAttributesToUpsert(final PersistenceSchema schema) { - final List globalAttributes = new ArrayList<>(); - - globalAttributesToUpdate.forEach((tableName, columnsToUpdate) -> { - final List columns = new ArrayList<>(); - - columnsToUpdate.forEach((columnName, value) -> { - // Just trying to get the value type to make sure this is a valid column defined in the schema - final Class columnValueType = schema.getGlobalAttributeColumnValueType(tableName, columnName); - - columns.add( - new TableColumnValue() - .dbColumn(columnName) - .dbQueryValue(databaseStringEncoder.encodeToString(value)) - ); - }); - - globalAttributes.add(new GlobalAttributeTableRowUpdate().tableName(tableName).updateColumns(columns)); - }); - - return globalAttributes; - } + // public List getGlobalAttributesToUpsert(final PersistenceSchema schema) { + // final List globalAttributes = new ArrayList<>(); + // + // globalAttributesToUpdate.forEach((tableName, columnsToUpdate) -> { + // final List columns = new ArrayList<>(); + // + // columnsToUpdate.forEach((columnName, value) -> { + // // Just trying to get the value type to make sure this is a valid column defined in the schema + // final Class columnValueType = schema.getGlobalAttributeColumnValueType(tableName, columnName); + // + // columns.add( + // new TableColumnValue() + // .dbColumn(columnName) + // .dbQueryValue(databaseStringEncoder.encodeToString(value)) + // ); + // }); + // + // globalAttributes.add(new GlobalAttributeTableRowUpdate().tableName(tableName).updateColumns(columns)); + // }); + // + // return globalAttributes; + // } } diff --git a/src/main/java/io/xcherry/core/persistence/PersistenceTableRowToUpsert.java b/src/main/java/io/xcherry/core/persistence/PersistenceTableRowToUpsert.java index e81d932..71a705b 100644 --- a/src/main/java/io/xcherry/core/persistence/PersistenceTableRowToUpsert.java +++ b/src/main/java/io/xcherry/core/persistence/PersistenceTableRowToUpsert.java @@ -1,6 +1,6 @@ package io.xcherry.core.persistence; -import io.xcherry.gen.models.AttributeWriteConflictMode; +import io.xcherry.gen.models.WriteConflictMode; import java.util.Map; import lombok.AccessLevel; import lombok.Getter; @@ -19,7 +19,7 @@ public class PersistenceTableRowToUpsert { * column name: column value */ private final Map otherColumns; - private final AttributeWriteConflictMode writeConflictMode; + private final WriteConflictMode writeConflictMode; /** * Create a table row to upsert. @@ -34,7 +34,7 @@ public static PersistenceTableRowToUpsert create( final String tableName, final Map primaryKeyColumns, final Map otherColumns, - final AttributeWriteConflictMode writeConflictMode + final WriteConflictMode writeConflictMode ) { return new PersistenceTableRowToUpsert(tableName, primaryKeyColumns, otherColumns, writeConflictMode); } diff --git a/src/main/java/io/xcherry/core/persistence/schema/PersistenceTableSchema.java b/src/main/java/io/xcherry/core/persistence/schema/PersistenceTableSchema.java index 35b34ad..f7e192f 100644 --- a/src/main/java/io/xcherry/core/persistence/schema/PersistenceTableSchema.java +++ b/src/main/java/io/xcherry/core/persistence/schema/PersistenceTableSchema.java @@ -2,7 +2,7 @@ import io.xcherry.core.exception.persistence.GlobalAttributeNotFoundException; import io.xcherry.core.persistence.schema_to_load.PersistenceTableSchemaToLoadData; -import io.xcherry.gen.models.TableReadLockingPolicy; +import io.xcherry.gen.models.DatabaseLockingType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -24,7 +24,7 @@ public class PersistenceTableSchema { * column name : column schema */ private final Map otherColumns; - private final TableReadLockingPolicy tableReadLockingPolicy; + private final DatabaseLockingType databaseLockingType; /** * Create a table schema to be used in {@link PersistenceSchema} with the default NO_LOCKING reading policy. @@ -37,20 +37,20 @@ public static PersistenceTableSchema create( final String tableName, final PersistenceTableColumnSchema... tableColumnSchemas ) { - return PersistenceTableSchema.create(tableName, TableReadLockingPolicy.NO_LOCKING, tableColumnSchemas); + return PersistenceTableSchema.create(tableName, DatabaseLockingType.NO_LOCKING, tableColumnSchemas); } /** * Create a table schema to be used in {@link PersistenceSchema}. * * @param tableName table name. - * @param tableReadLockingPolicy locking policy when reading the table. - * @param tableColumnSchemas the schema of all the table columns. + * @param databaseLockingType locking policy when reading the database. + * @param tableColumnSchemas the schema of all the table columns. * @return the created table schema. */ public static PersistenceTableSchema create( final String tableName, - final TableReadLockingPolicy tableReadLockingPolicy, + final DatabaseLockingType databaseLockingType, final PersistenceTableColumnSchema... tableColumnSchemas ) { final Map primaryKeyColumns = new HashMap<>(); @@ -64,7 +64,7 @@ public static PersistenceTableSchema create( } } - return new PersistenceTableSchema(tableName, primaryKeyColumns, otherColumns, tableReadLockingPolicy); + return new PersistenceTableSchema(tableName, primaryKeyColumns, otherColumns, databaseLockingType); } public Class getColumnValueType(final String columnName) { @@ -95,6 +95,6 @@ public PersistenceTableSchemaToLoadData getPersistenceTableSchemaToLoadData() { } }); - return PersistenceTableSchemaToLoadData.create(tableName, columnsToLoadData, tableReadLockingPolicy); + return PersistenceTableSchemaToLoadData.create(tableName, columnsToLoadData, databaseLockingType); } } diff --git a/src/main/java/io/xcherry/core/persistence/schema_to_load/PersistenceTableSchemaToLoadData.java b/src/main/java/io/xcherry/core/persistence/schema_to_load/PersistenceTableSchemaToLoadData.java index 362865c..c288f9e 100644 --- a/src/main/java/io/xcherry/core/persistence/schema_to_load/PersistenceTableSchemaToLoadData.java +++ b/src/main/java/io/xcherry/core/persistence/schema_to_load/PersistenceTableSchemaToLoadData.java @@ -1,6 +1,6 @@ package io.xcherry.core.persistence.schema_to_load; -import io.xcherry.gen.models.TableReadLockingPolicy; +import io.xcherry.gen.models.DatabaseLockingType; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -14,7 +14,7 @@ public class PersistenceTableSchemaToLoadData { private final String tableName; private final List columnNames; - private final TableReadLockingPolicy tableReadLockingPolicy; + private final DatabaseLockingType databaseLockingType; /** * Create a table schema to load with the default NO_LOCKING reading policy. @@ -38,7 +38,7 @@ public static PersistenceTableSchemaToLoadData create(final String tableName, fi * @return the created table schema to load. */ public static PersistenceTableSchemaToLoadData create(final String tableName, final List columnNames) { - return PersistenceTableSchemaToLoadData.create(tableName, columnNames, TableReadLockingPolicy.NO_LOCKING); + return PersistenceTableSchemaToLoadData.create(tableName, columnNames, DatabaseLockingType.NO_LOCKING); } /** @@ -46,14 +46,14 @@ public static PersistenceTableSchemaToLoadData create(final String tableName, fi * * @param tableName table name. * @param columnNames the columns to load. - * @param tableReadLockingPolicy locking policy when reading the table. + * @param databaseLockingType locking policy when reading the database. * @return the created table schema to load. */ public static PersistenceTableSchemaToLoadData create( final String tableName, final List columnNames, - final TableReadLockingPolicy tableReadLockingPolicy + final DatabaseLockingType databaseLockingType ) { - return new PersistenceTableSchemaToLoadData(tableName, columnNames, tableReadLockingPolicy); + return new PersistenceTableSchemaToLoadData(tableName, columnNames, databaseLockingType); } } diff --git a/src/main/java/io/xcherry/core/process/ProcessStartConfig.java b/src/main/java/io/xcherry/core/process/ProcessStartConfig.java index ba1a780..bebc677 100644 --- a/src/main/java/io/xcherry/core/process/ProcessStartConfig.java +++ b/src/main/java/io/xcherry/core/process/ProcessStartConfig.java @@ -1,8 +1,8 @@ package io.xcherry.core.process; import io.xcherry.core.persistence.PersistenceTableRowToUpsert; -import io.xcherry.gen.models.AttributeWriteConflictMode; import io.xcherry.gen.models.ProcessIdReusePolicy; +import io.xcherry.gen.models.WriteConflictMode; import java.util.HashMap; import java.util.Map; import lombok.Builder; @@ -32,7 +32,7 @@ public ProcessStartConfig initializeGlobalAttributes( final String tableName, final Map primaryKeyColumns, final Map otherColumns, - final AttributeWriteConflictMode writeConflictMode + final WriteConflictMode writeConflictMode ) { globalAttributesToUpsert.put( tableName, diff --git a/src/main/java/io/xcherry/core/utils/ProcessUtil.java b/src/main/java/io/xcherry/core/utils/ProcessUtil.java index c702976..7f261a9 100644 --- a/src/main/java/io/xcherry/core/utils/ProcessUtil.java +++ b/src/main/java/io/xcherry/core/utils/ProcessUtil.java @@ -1,15 +1,9 @@ package io.xcherry.core.utils; import io.xcherry.core.persistence.schema_to_load.PersistenceSchemaToLoadData; -import io.xcherry.core.persistence.schema_to_load.PersistenceTableSchemaToLoadData; import io.xcherry.core.process.Process; import io.xcherry.core.state.AsyncState; import io.xcherry.gen.models.AsyncStateConfig; -import io.xcherry.gen.models.LoadGlobalAttributesRequest; -import io.xcherry.gen.models.TableColumnDef; -import io.xcherry.gen.models.TableReadRequest; -import java.util.ArrayList; -import java.util.List; public class ProcessUtil { @@ -46,7 +40,7 @@ public static AsyncStateConfig getAsyncStateConfig(final AsyncState state, final ? process.getPersistenceSchema().getPersistenceSchemaToLoadData() : state.getOptions().getPersistenceSchemaToLoadData(); - asyncStateConfig = asyncStateConfig.loadGlobalAttributesRequest(toApiModel(persistenceSchemaToLoadData)); + // asyncStateConfig = asyncStateConfig.loadGlobalAttributesRequest(toApiModel(persistenceSchemaToLoadData)); if (state.getOptions() == null) { return asyncStateConfig; @@ -62,31 +56,30 @@ public static AsyncStateConfig getAsyncStateConfig(final AsyncState state, final return asyncStateConfig; } - - private static LoadGlobalAttributesRequest toApiModel( - final PersistenceSchemaToLoadData persistenceSchemaToLoadData - ) { - if (persistenceSchemaToLoadData == null || persistenceSchemaToLoadData.getGlobalAttributes().isEmpty()) { - return null; - } - - final LoadGlobalAttributesRequest loadGlobalAttributesRequest = new LoadGlobalAttributesRequest(); - - for (final PersistenceTableSchemaToLoadData globalAttribute : persistenceSchemaToLoadData.getGlobalAttributes()) { - final List columns = new ArrayList<>(); - - for (final String columnName : globalAttribute.getColumnNames()) { - columns.add(new TableColumnDef().dbColumn(columnName)); - } - - loadGlobalAttributesRequest.addTableRequestsItem( - new TableReadRequest() - .tableName(globalAttribute.getTableName()) - .lockingPolicy(globalAttribute.getTableReadLockingPolicy()) - .columns(columns) - ); - } - - return loadGlobalAttributesRequest; - } + // private static LoadGlobalAttributesRequest toApiModel( + // final PersistenceSchemaToLoadData persistenceSchemaToLoadData + // ) { + // if (persistenceSchemaToLoadData == null || persistenceSchemaToLoadData.getGlobalAttributes().isEmpty()) { + // return null; + // } + // + // final LoadGlobalAttributesRequest loadGlobalAttributesRequest = new LoadGlobalAttributesRequest(); + // + // for (final PersistenceTableSchemaToLoadData globalAttribute : persistenceSchemaToLoadData.getGlobalAttributes()) { + // final List columns = new ArrayList<>(); + // + // for (final String columnName : globalAttribute.getColumnNames()) { + // columns.add(new TableColumnDef().dbColumn(columnName)); + // } + // + // loadGlobalAttributesRequest.addTableRequestsItem( + // new TableReadRequest() + // .tableName(globalAttribute.getTableName()) + // .lockingPolicy(globalAttribute.getTableReadLockingPolicy()) + // .columns(columns) + // ); + // } + // + // return loadGlobalAttributesRequest; + // } } diff --git a/src/main/java/io/xcherry/core/worker/WorkerService.java b/src/main/java/io/xcherry/core/worker/WorkerService.java index 5f413f5..a475390 100644 --- a/src/main/java/io/xcherry/core/worker/WorkerService.java +++ b/src/main/java/io/xcherry/core/worker/WorkerService.java @@ -1,5 +1,7 @@ package io.xcherry.core.worker; +import static io.xcherry.core.worker.WorkerServiceResponseEntity.HTTP_STATUS_FAILED_DEPENDENCY; + import com.google.common.collect.ImmutableList; import io.xcherry.core.command.BaseCommand; import io.xcherry.core.command.CommandResults; @@ -22,6 +24,9 @@ import io.xcherry.gen.models.StateDecision; import io.xcherry.gen.models.StateMovement; import io.xcherry.gen.models.TimerCommand; +import io.xcherry.gen.models.WorkerErrorResponse; +import java.io.PrintWriter; +import java.io.StringWriter; import java.lang.reflect.Method; import java.util.List; import java.util.stream.Collectors; @@ -37,7 +42,32 @@ public class WorkerService { private final Registry registry; private final WorkerServiceOptions workerServiceOptions; - public AsyncStateWaitUntilResponse handleAsyncStateWaitUntil(final AsyncStateWaitUntilRequest request) { + public WorkerServiceResponseEntity handleAsyncStateWaitUntil(final AsyncStateWaitUntilRequest request) { + try { + return WorkerServiceResponseEntity.ok(handleAsyncStateWaitUntilInternal(request)); + } catch (final Exception e) { + return processWorkerException(e, HTTP_STATUS_FAILED_DEPENDENCY); + } + } + + public WorkerServiceResponseEntity handleAsyncStateExecute(final AsyncStateExecuteRequest request) { + try { + // TODO: handling 406 + return WorkerServiceResponseEntity.ok(handleAsyncStateExecuteInternal(request)); + } catch (final Exception e) { + return processWorkerException(e, HTTP_STATUS_FAILED_DEPENDENCY); + } + } + + public WorkerServiceResponseEntity handleProcessRpc(final ProcessRpcWorkerRequest request) { + try { + return WorkerServiceResponseEntity.ok(handleProcessRpcInternal(request)); + } catch (final Exception e) { + return processWorkerException(e, HTTP_STATUS_FAILED_DEPENDENCY); + } + } + + private AsyncStateWaitUntilResponse handleAsyncStateWaitUntilInternal(final AsyncStateWaitUntilRequest request) { final AsyncState state = registry.getProcessState(request.getProcessType(), request.getStateId()); final Object input = workerServiceOptions .getObjectEncoder() @@ -56,7 +86,7 @@ public AsyncStateWaitUntilResponse handleAsyncStateWaitUntil(final AsyncStateWai .publishToLocalQueue(communication.getLocalQueueMessagesToPublish()); } - public AsyncStateExecuteResponse handleAsyncStateExecute(final AsyncStateExecuteRequest request) { + private AsyncStateExecuteResponse handleAsyncStateExecuteInternal(final AsyncStateExecuteRequest request) { final AsyncState state = registry.getProcessState(request.getProcessType(), request.getStateId()); final Object input = workerServiceOptions .getObjectEncoder() @@ -64,7 +94,7 @@ public AsyncStateExecuteResponse handleAsyncStateExecute(final AsyncStateExecute final Communication communication = new Communication(workerServiceOptions.getObjectEncoder()); final Persistence persistence = new Persistence( - request.getLoadedGlobalAttributes(), + request.getAppDatabaseReadResponse(), registry.getPersistenceSchema(request.getProcessType()), workerServiceOptions.getDatabaseStringEncoder() ); @@ -79,13 +109,13 @@ public AsyncStateExecuteResponse handleAsyncStateExecute(final AsyncStateExecute return new AsyncStateExecuteResponse() .stateDecision(toApiModel(request.getProcessType(), stateDecision)) - .publishToLocalQueue(communication.getLocalQueueMessagesToPublish()) - .writeToGlobalAttributes( - persistence.getGlobalAttributesToUpsert(registry.getPersistenceSchema(request.getProcessType())) - ); + .publishToLocalQueue(communication.getLocalQueueMessagesToPublish()); + // .writeToGlobalAttributes( + // persistence.getGlobalAttributesToUpsert(registry.getPersistenceSchema(request.getProcessType())) + // ); } - public ProcessRpcWorkerResponse handleProcessRpc(final ProcessRpcWorkerRequest request) { + private ProcessRpcWorkerResponse handleProcessRpcInternal(final ProcessRpcWorkerRequest request) { final Process process = registry.getProcess(request.getProcessType()); final Method rpcMethod = registry.getRpcMethod(request.getProcessType(), request.getRpcName()); @@ -101,7 +131,7 @@ public ProcessRpcWorkerResponse handleProcessRpc(final ProcessRpcWorkerRequest r final Communication communication = new Communication(workerServiceOptions.getObjectEncoder()); final Persistence persistence = new Persistence( - request.getLoadedGlobalAttributes(), + request.getAppDatabaseReadResponse(), registry.getPersistenceSchema(request.getProcessType()), workerServiceOptions.getDatabaseStringEncoder() ); @@ -118,10 +148,10 @@ public ProcessRpcWorkerResponse handleProcessRpc(final ProcessRpcWorkerRequest r return new ProcessRpcWorkerResponse() .output(workerServiceOptions.getObjectEncoder().encodeToEncodedObject(output)) .stateDecision(toApiModel(request.getProcessType(), communication.getStateDecision())) - .publishToLocalQueue(communication.getLocalQueueMessagesToPublish()) - .writeToGlobalAttributes( - persistence.getGlobalAttributesToUpsert(registry.getPersistenceSchema(request.getProcessType())) - ); + .publishToLocalQueue(communication.getLocalQueueMessagesToPublish()); + // .writeToGlobalAttributes( + // persistence.getGlobalAttributesToUpsert(registry.getPersistenceSchema(request.getProcessType())) + // ); } private StateDecision toApiModel( @@ -180,4 +210,21 @@ private CommandRequest toApiModel(final io.xcherry.core.command.CommandRequest c return apiCommandRequest; } + + private WorkerServiceResponseEntity processWorkerException(final Exception e, final int statusCode) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + + String stackTrace = sw.toString(); // stack trace as a string + if (stackTrace.length() > 2000) { + stackTrace = stackTrace.substring(0, 2000) + "...(truncated)"; + } + + final WorkerErrorResponse workerErrorResponse = new WorkerErrorResponse() + .detail(e.getMessage() + "\n stacktrace: \n" + stackTrace) + .errorType(e.getClass().getName()); + + return new WorkerServiceResponseEntity(statusCode, workerErrorResponse); + } } diff --git a/src/main/java/io/xcherry/core/worker/WorkerServiceResponseEntity.java b/src/main/java/io/xcherry/core/worker/WorkerServiceResponseEntity.java new file mode 100644 index 0000000..c457c16 --- /dev/null +++ b/src/main/java/io/xcherry/core/worker/WorkerServiceResponseEntity.java @@ -0,0 +1,18 @@ +package io.xcherry.core.worker; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public class WorkerServiceResponseEntity { + + public static final int HTTP_STATUS_FAILED_DEPENDENCY = 424; + + private final int statusCode; + private final Object body; + + public static final WorkerServiceResponseEntity ok(final Object body) { + return new WorkerServiceResponseEntity(200, body); + } +} diff --git a/src/test/java/integ/spring/WorkerApiController.java b/src/test/java/integ/spring/WorkerApiController.java index 28a7631..7f21285 100644 --- a/src/test/java/integ/spring/WorkerApiController.java +++ b/src/test/java/integ/spring/WorkerApiController.java @@ -5,12 +5,10 @@ import static io.xcherry.core.worker.WorkerService.API_PATH_PROCESS_RPC; import io.xcherry.core.worker.WorkerService; +import io.xcherry.core.worker.WorkerServiceResponseEntity; import io.xcherry.gen.models.AsyncStateExecuteRequest; -import io.xcherry.gen.models.AsyncStateExecuteResponse; import io.xcherry.gen.models.AsyncStateWaitUntilRequest; -import io.xcherry.gen.models.AsyncStateWaitUntilResponse; import io.xcherry.gen.models.ProcessRpcWorkerRequest; -import io.xcherry.gen.models.ProcessRpcWorkerResponse; import lombok.RequiredArgsConstructor; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; @@ -24,23 +22,20 @@ public class WorkerApiController { private final WorkerService workerService; @PostMapping(API_PATH_ASYNC_STATE_WAIT_UNTIL) - public ResponseEntity handleAsyncStateWaitUntil( - final @RequestBody AsyncStateWaitUntilRequest request - ) { - return ResponseEntity.ok(workerService.handleAsyncStateWaitUntil(request)); + public ResponseEntity handleAsyncStateWaitUntil(final @RequestBody AsyncStateWaitUntilRequest request) { + final WorkerServiceResponseEntity responseEntity = workerService.handleAsyncStateWaitUntil(request); + return ResponseEntity.status(responseEntity.getStatusCode()).body(responseEntity.getBody()); } @PostMapping(API_PATH_ASYNC_STATE_EXECUTE) - public ResponseEntity handleAsyncStateExecute( - final @RequestBody AsyncStateExecuteRequest request - ) { - return ResponseEntity.ok(workerService.handleAsyncStateExecute(request)); + public ResponseEntity handleAsyncStateExecute(final @RequestBody AsyncStateExecuteRequest request) { + final WorkerServiceResponseEntity responseEntity = workerService.handleAsyncStateExecute(request); + return ResponseEntity.status(responseEntity.getStatusCode()).body(responseEntity.getBody()); } @PostMapping(API_PATH_PROCESS_RPC) - public ResponseEntity handleProcessRpc( - final @RequestBody ProcessRpcWorkerRequest request - ) { - return ResponseEntity.ok(workerService.handleProcessRpc(request)); + public ResponseEntity handleProcessRpc(final @RequestBody ProcessRpcWorkerRequest request) { + final WorkerServiceResponseEntity responseEntity = workerService.handleProcessRpc(request); + return ResponseEntity.status(responseEntity.getStatusCode()).body(responseEntity.getBody()); } }