Skip to content

Commit

Permalink
Add error handling (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
zklgame authored Nov 30, 2023
1 parent 0c595e0 commit 619c412
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 194 deletions.
2 changes: 1 addition & 1 deletion apis
Submodule apis updated 354 files
16 changes: 5 additions & 11 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,11 @@ java {
withSourcesJar()
}

//tasks.withType(Javadoc) {
// // Set the source files for which Javadoc will be generated
// source = sourceSets.main.allJava
//
// // Set the destination directory for the generated Javadoc
// destinationDir = file("$buildDir/docs/javadoc")
//
// // Specify the files or packages to be excluded from Javadoc generation
// exclude '**/AsyncStateOptions.java'
// exclude '**/ProcessOptions.java'
//}
tasks.withType(Javadoc) {
// Specify the files or packages to be excluded from Javadoc generation
exclude '**/AsyncStateOptions.java'
exclude '**/ProcessOptions.java'
}

repositories {
mavenCentral()
Expand Down
69 changes: 32 additions & 37 deletions src/main/java/io/xcherry/core/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,12 @@
import io.xcherry.core.rpc.RpcInterceptor;
import io.xcherry.core.state.AsyncState;
import io.xcherry.core.utils.ProcessUtil;
import io.xcherry.gen.models.GlobalAttributeConfig;
import io.xcherry.gen.models.GlobalAttributeTableConfig;
import io.xcherry.gen.models.LocalQueueMessage;
import io.xcherry.gen.models.ProcessExecutionDescribeResponse;
import io.xcherry.gen.models.ProcessExecutionStartRequest;
import io.xcherry.gen.models.ProcessExecutionStopType;
import io.xcherry.gen.models.PublishToLocalQueueRequest;
import io.xcherry.gen.models.TableColumnValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.bytebuddy.ByteBuddy;
Expand Down Expand Up @@ -316,28 +311,28 @@ private io.xcherry.gen.models.ProcessStartConfig toApiModel(
return null;
}

final GlobalAttributeConfig globalAttributeConfig;
if (globalAttributesToUpsert.isEmpty()) {
globalAttributeConfig = null;
} else {
globalAttributeConfig = new GlobalAttributeConfig();

for (final PersistenceTableRowToUpsert tableRowToUpsert : globalAttributesToUpsert.values()) {
globalAttributeConfig.addTableConfigsItem(
new GlobalAttributeTableConfig()
.tableName(tableRowToUpsert.getTableName())
// TODO
// .primaryKey(toApiModel(tableRowToUpsert.getPrimaryKeyColumns()))
.initialWrite(toApiModel(tableRowToUpsert.getOtherColumns()))
.initialWriteMode(tableRowToUpsert.getWriteConflictMode())
);
}
}
// final GlobalAttributeConfig globalAttributeConfig;
// if (globalAttributesToUpsert.isEmpty()) {
// globalAttributeConfig = null;
// } else {
// globalAttributeConfig = new GlobalAttributeConfig();
//
// for (final PersistenceTableRowToUpsert tableRowToUpsert : globalAttributesToUpsert.values()) {
// globalAttributeConfig.addTableConfigsItem(
// new GlobalAttributeTableConfig()
// .tableName(tableRowToUpsert.getTableName())
// // TODO
// // .primaryKey(toApiModel(tableRowToUpsert.getPrimaryKeyColumns()))
// .initialWrite(toApiModel(tableRowToUpsert.getOtherColumns()))
// .initialWriteMode(tableRowToUpsert.getWriteConflictMode())
// );
// }
// }

return new io.xcherry.gen.models.ProcessStartConfig()
.timeoutSeconds(processStartConfig.getTimeoutSeconds())
.idReusePolicy(processStartConfig.getProcessIdReusePolicy())
.globalAttributeConfig(globalAttributeConfig);
.idReusePolicy(processStartConfig.getProcessIdReusePolicy());
// .globalAttributeConfig(globalAttributeConfig);
}

private void validateThePersistenceSchema(
Expand Down Expand Up @@ -417,19 +412,19 @@ private void validateThePersistenceSchema(
});
}

private List<TableColumnValue> toApiModel(final Map<String, Object> columnNameToValueMap) {
final List<TableColumnValue> columns = new ArrayList<>();

columnNameToValueMap.forEach((k, v) -> {
columns.add(
new TableColumnValue()
.dbColumn(k)
.dbQueryValue(clientOptions.getDatabaseStringEncoder().encodeToString(v))
);
});

return columns;
}
// private List<TableColumnValue> toApiModel(final Map<String, Object> columnNameToValueMap) {
// final List<TableColumnValue> columns = new ArrayList<>();
//
// columnNameToValueMap.forEach((k, v) -> {
// columns.add(
// new TableColumnValue()
// .dbColumn(k)
// .dbQueryValue(clientOptions.getDatabaseStringEncoder().encodeToString(v))
// );
// });
//
// return columns;
// }

/**
* Publish message(s) to the local queue for consumption by the process execution.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/xcherry/core/context/Context.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.xcherry.core.context;

import io.xcherry.gen.models.StateApiType;
import io.xcherry.gen.models.WorkerApiType;
import lombok.Builder;
import lombok.Getter;

Expand All @@ -15,7 +15,7 @@ public class Context {
private final long firstAttemptTimestamp;
private final int attempt;
private final String recoverFromStateExecutionId;
private final StateApiType recoverFromApi;
private final WorkerApiType recoverFromApi;

public static Context fromApiModel(final io.xcherry.gen.models.Context context) {
return Context
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/xcherry/core/exception/HttpException.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.xcherry.core.exception.status.ProcessNotFoundException;
import io.xcherry.gen.models.ApiErrorResponse;
import io.xcherry.gen.models.EncodedObject;
import io.xcherry.gen.models.ErrorSubType;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
Expand Down Expand Up @@ -37,9 +38,10 @@ public HttpException(final ObjectEncoder objectEncoder, final FeignException.Fei
}
}

// TODO
apiErrorResponse =
new ApiErrorResponse().detail("empty or unable to decode to apiErrorResponse: " + decodeErrorMessage);
new ApiErrorResponse()
.errorSubType(ErrorSubType.UNCATEGORIZED_ERROR)
.details("empty or unable to decode to apiErrorResponse: " + decodeErrorMessage);
}

public static HttpException fromFeignException(
Expand Down
112 changes: 52 additions & 60 deletions src/main/java/io/xcherry/core/persistence/Persistence.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package io.xcherry.core.persistence;

import com.google.common.collect.ImmutableList;
import io.xcherry.core.encoder.base.DatabaseStringEncoder;
import io.xcherry.core.exception.persistence.GlobalAttributeNotFoundException;
import io.xcherry.core.persistence.schema.PersistenceSchema;
import io.xcherry.gen.models.GlobalAttributeTableRowUpdate;
import io.xcherry.gen.models.LoadGlobalAttributeResponse;
import io.xcherry.gen.models.TableColumnValue;
import io.xcherry.gen.models.TableReadResponse;
import java.util.ArrayList;
import io.xcherry.gen.models.AppDatabaseReadResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Persistence {
Expand All @@ -27,43 +21,42 @@ public class Persistence {
private final DatabaseStringEncoder databaseStringEncoder;

public Persistence(
final LoadGlobalAttributeResponse loadGlobalAttributeResponse,
final AppDatabaseReadResponse appDatabaseReadResponse,
final PersistenceSchema persistenceSchema,
final DatabaseStringEncoder databaseStringEncoder
) {
this.databaseStringEncoder = databaseStringEncoder;

if (loadGlobalAttributeResponse == null) {
if (appDatabaseReadResponse == null) {
return;
}

final List<TableReadResponse> tableResponses = loadGlobalAttributeResponse.getTableResponses() == null
? ImmutableList.of()
: loadGlobalAttributeResponse.getTableResponses();

for (final TableReadResponse tableResponse : tableResponses) {
if (!globalAttributes.containsKey(tableResponse.getTableName())) {
globalAttributes.put(tableResponse.getTableName(), new HashMap<>());
}

final List<TableColumnValue> columns = tableResponse.getColumns() == null
? ImmutableList.of()
: tableResponse.getColumns();

for (final TableColumnValue column : columns) {
final Class<?> columnValueType = persistenceSchema.getGlobalAttributeColumnValueType(
tableResponse.getTableName(),
column.getDbColumn()
);

globalAttributes
.get(tableResponse.getTableName())
.put(
column.getDbColumn(),
databaseStringEncoder.decodeFromString(column.getDbQueryValue(), columnValueType)
);
}
}
// final List<TableReadResponse> tableResponses = loadGlobalAttributeResponse.getTableResponses() == null
// ? ImmutableList.of()
// : loadGlobalAttributeResponse.getTableResponses();
//
// for (final TableReadResponse tableResponse : tableResponses) {
// if (!globalAttributes.containsKey(tableResponse.getTableName())) {
// globalAttributes.put(tableResponse.getTableName(), new HashMap<>());
// }
//
// final List<TableColumnValue> columns = tableResponse.getColumns() == null
// ? ImmutableList.of()
// : tableResponse.getColumns();
//
// for (final TableColumnValue column : columns) {
// final Class<?> columnValueType = persistenceSchema.getGlobalAttributeColumnValueType(
// tableResponse.getTableName(),
// column.getDbColumn()
// );
//
// globalAttributes
// .get(tableResponse.getTableName())
// .put(
// column.getDbColumn(),
// databaseStringEncoder.decodeFromString(column.getDbQueryValue(), columnValueType)
// );
// }
// }
}

/**
Expand Down Expand Up @@ -107,27 +100,26 @@ public void upsertGlobalAttribute(final String tableName, final String columnNam

globalAttributesToUpdate.get(tableName).put(columnName, columnValue);
}

public List<GlobalAttributeTableRowUpdate> getGlobalAttributesToUpsert(final PersistenceSchema schema) {
final List<GlobalAttributeTableRowUpdate> globalAttributes = new ArrayList<>();

globalAttributesToUpdate.forEach((tableName, columnsToUpdate) -> {
final List<TableColumnValue> columns = new ArrayList<>();

columnsToUpdate.forEach((columnName, value) -> {
// Just trying to get the value type to make sure this is a valid column defined in the schema
final Class<?> columnValueType = schema.getGlobalAttributeColumnValueType(tableName, columnName);

columns.add(
new TableColumnValue()
.dbColumn(columnName)
.dbQueryValue(databaseStringEncoder.encodeToString(value))
);
});

globalAttributes.add(new GlobalAttributeTableRowUpdate().tableName(tableName).updateColumns(columns));
});

return globalAttributes;
}
// public List<GlobalAttributeTableRowUpdate> getGlobalAttributesToUpsert(final PersistenceSchema schema) {
// final List<GlobalAttributeTableRowUpdate> globalAttributes = new ArrayList<>();
//
// globalAttributesToUpdate.forEach((tableName, columnsToUpdate) -> {
// final List<TableColumnValue> columns = new ArrayList<>();
//
// columnsToUpdate.forEach((columnName, value) -> {
// // Just trying to get the value type to make sure this is a valid column defined in the schema
// final Class<?> columnValueType = schema.getGlobalAttributeColumnValueType(tableName, columnName);
//
// columns.add(
// new TableColumnValue()
// .dbColumn(columnName)
// .dbQueryValue(databaseStringEncoder.encodeToString(value))
// );
// });
//
// globalAttributes.add(new GlobalAttributeTableRowUpdate().tableName(tableName).updateColumns(columns));
// });
//
// return globalAttributes;
// }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.xcherry.core.persistence;

import io.xcherry.gen.models.AttributeWriteConflictMode;
import io.xcherry.gen.models.WriteConflictMode;
import java.util.Map;
import lombok.AccessLevel;
import lombok.Getter;
Expand All @@ -19,7 +19,7 @@ public class PersistenceTableRowToUpsert {
* column name: column value
*/
private final Map<String, Object> otherColumns;
private final AttributeWriteConflictMode writeConflictMode;
private final WriteConflictMode writeConflictMode;

/**
* Create a table row to upsert.
Expand All @@ -34,7 +34,7 @@ public static PersistenceTableRowToUpsert create(
final String tableName,
final Map<String, Object> primaryKeyColumns,
final Map<String, Object> otherColumns,
final AttributeWriteConflictMode writeConflictMode
final WriteConflictMode writeConflictMode
) {
return new PersistenceTableRowToUpsert(tableName, primaryKeyColumns, otherColumns, writeConflictMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.xcherry.core.exception.persistence.GlobalAttributeNotFoundException;
import io.xcherry.core.persistence.schema_to_load.PersistenceTableSchemaToLoadData;
import io.xcherry.gen.models.TableReadLockingPolicy;
import io.xcherry.gen.models.DatabaseLockingType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -24,7 +24,7 @@ public class PersistenceTableSchema {
* column name : column schema
*/
private final Map<String, PersistenceTableColumnSchema> otherColumns;
private final TableReadLockingPolicy tableReadLockingPolicy;
private final DatabaseLockingType databaseLockingType;

/**
* Create a table schema to be used in {@link PersistenceSchema} with the default NO_LOCKING reading policy.
Expand All @@ -37,20 +37,20 @@ public static PersistenceTableSchema create(
final String tableName,
final PersistenceTableColumnSchema... tableColumnSchemas
) {
return PersistenceTableSchema.create(tableName, TableReadLockingPolicy.NO_LOCKING, tableColumnSchemas);
return PersistenceTableSchema.create(tableName, DatabaseLockingType.NO_LOCKING, tableColumnSchemas);
}

/**
* Create a table schema to be used in {@link PersistenceSchema}.
*
* @param tableName table name.
* @param tableReadLockingPolicy locking policy when reading the table.
* @param tableColumnSchemas the schema of all the table columns.
* @param databaseLockingType locking policy when reading the database.
* @param tableColumnSchemas the schema of all the table columns.
* @return the created table schema.
*/
public static PersistenceTableSchema create(
final String tableName,
final TableReadLockingPolicy tableReadLockingPolicy,
final DatabaseLockingType databaseLockingType,
final PersistenceTableColumnSchema... tableColumnSchemas
) {
final Map<String, PersistenceTableColumnSchema> primaryKeyColumns = new HashMap<>();
Expand All @@ -64,7 +64,7 @@ public static PersistenceTableSchema create(
}
}

return new PersistenceTableSchema(tableName, primaryKeyColumns, otherColumns, tableReadLockingPolicy);
return new PersistenceTableSchema(tableName, primaryKeyColumns, otherColumns, databaseLockingType);
}

public Class<?> getColumnValueType(final String columnName) {
Expand Down Expand Up @@ -95,6 +95,6 @@ public PersistenceTableSchemaToLoadData getPersistenceTableSchemaToLoadData() {
}
});

return PersistenceTableSchemaToLoadData.create(tableName, columnsToLoadData, tableReadLockingPolicy);
return PersistenceTableSchemaToLoadData.create(tableName, columnsToLoadData, databaseLockingType);
}
}
Loading

0 comments on commit 619c412

Please sign in to comment.