diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/InvokeFailureFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/InvokeFailureFunction.java new file mode 100644 index 00000000..d1c8c951 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/InvokeFailureFunction.java @@ -0,0 +1,36 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.*; +import org.jetbrains.annotations.NotNull; + +import java.util.LinkedHashMap; + +public class InvokeFailureFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("invoke-failure-fn") + .name("Invoke Function") + .triggerEvent("test/invoke.failure"); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + try { + step.invoke( + "failing-function", + "spring_test_demo", + "non-retriable-fn", + new LinkedHashMap(), + null, + Object.class); + } catch (StepError e) { + return e.getMessage(); + } + + return "An error should have been thrown and this message should not be returned"; + } +} diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java index b6f9527a..16d5464e 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java @@ -17,7 +17,7 @@ public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) @Override public String execute(FunctionContext ctx, Step step) { step.run("fail-step", () -> { - throw new NonRetriableError("something fatally went wrong"); + throw new NonRetriableError("Something fatally went wrong"); }, String.class); return "Success"; diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TryCatchRunFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TryCatchRunFunction.java new file mode 100644 index 00000000..d76473b2 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TryCatchRunFunction.java @@ -0,0 +1,36 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.*; +import org.jetbrains.annotations.NotNull; + +class CustomException extends RuntimeException { + public CustomException(String message) { + super(message); + } +} + +public class TryCatchRunFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("try-catch-run-fn") + .name("Try catch run") + .triggerEvent("test/try.catch.run") + .retries(0); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + try { + step.run("fail-step", () -> { + throw new CustomException("Something fatally went wrong"); + }, String.class); + } catch (StepError e) { + return e.getMessage(); + } + + return "An error should have been thrown and this message should not be returned"; + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index cd23d5f7..e7d0c2f9 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -21,6 +21,8 @@ protected HashMap functions() { addInngestFunction(functions, new NonRetriableErrorFunction()); addInngestFunction(functions, new RetriableErrorFunction()); addInngestFunction(functions, new ZeroRetriesFunction()); + addInngestFunction(functions, new InvokeFailureFunction()); + addInngestFunction(functions, new TryCatchRunFunction()); return functions; } diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ErrorsInStepsIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ErrorsInStepsIntegrationTest.java index a2042384..d54f3ae5 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ErrorsInStepsIntegrationTest.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ErrorsInStepsIntegrationTest.java @@ -35,7 +35,7 @@ void testNonRetriableShouldFail() throws Exception { assertNotNull(run.getEnded_at()); assert output.get("name").contains("NonRetriableError"); assert output.get("stack").contains("NonRetriableErrorFunction.lambda$execute"); - assertEquals(output.get("message"), "something fatally went wrong"); + assertEquals(output.get("message"), "Something fatally went wrong"); } @Test diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/StepErrorsIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/StepErrorsIntegrationTest.java new file mode 100644 index 00000000..ea81986f --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/StepErrorsIntegrationTest.java @@ -0,0 +1,53 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class StepErrorsIntegrationTest { + @Autowired + private DevServerComponent devServer; + + static int sleepTime = 5000; + + @Autowired + private Inngest client; + + @Test + void testShouldCatchStepErrorWhenInvokeThrows() throws Exception { + String eventId = InngestFunctionTestHelpers.sendEvent(client, "test/invoke.failure").first(); + + Thread.sleep(sleepTime); + + RunEntry run = devServer.runsByEvent(eventId).first(); + String output = (String) run.getOutput(); + + assertEquals("Completed", run.getStatus() ); + assertNotNull(run.getEnded_at()); + + assertEquals("Something fatally went wrong", output); + } + + @Test + void testShouldCatchStepErrorWhenRunThrows() throws Exception { + String eventId = InngestFunctionTestHelpers.sendEvent(client, "test/try.catch.run").first(); + + Thread.sleep(sleepTime); + + RunEntry run = devServer.runsByEvent(eventId).first(); + String output = (String) run.getOutput(); + + assertEquals("Completed", run.getStatus()); + assertNotNull(run.getEnded_at()); + + assertEquals("Something fatally went wrong", output); + } + +} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt index f378e066..30cf8e52 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt @@ -26,6 +26,8 @@ fun Application.module() { RestoreFromGlacier(), ProcessUserSignup(), TranscodeVideo(), + ImageFromPrompt(), + PushToSlackChannel(), ), ) } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ImageFromPrompt.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ImageFromPrompt.kt new file mode 100644 index 00000000..55b599af --- /dev/null +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ImageFromPrompt.kt @@ -0,0 +1,47 @@ +package com.inngest.testserver + +import com.inngest.* + +class ImageFromPrompt : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("ImageFromPrompt") + .name("Image from Prompt") + .triggerEvent("media/prompt.created") + + override fun execute( + ctx: FunctionContext, + step: Step, + ): String { + val imageURL = + try { + step.run("generate-image-dall-e") { + // Call the DALL-E model to generate an image + throw Exception("Failed to generate image") + + "example.com/image-dall-e.jpg" + } + } catch (e: StepError) { + // Fall back to a different image generation model + step.run("generate-image-midjourney") { + // Call the MidJourney model to generate an image + "example.com/image-midjourney.jpg" + } + } + + try { + step.invoke>( + "push-to-slack-channel", + "ktor-dev", + "PushToSlackChannel", + mapOf("image" to imageURL), + null, + ) + } catch (e: StepError) { + // Pushing to Slack is not critical, so we can ignore the error, log it + // or handle it in some other way. + } + + return imageURL + } +} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/PushToSlackChannel.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/PushToSlackChannel.kt new file mode 100644 index 00000000..f72a09cc --- /dev/null +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/PushToSlackChannel.kt @@ -0,0 +1,22 @@ +package com.inngest.testserver + +import com.inngest.* + +class PushToSlackChannel : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("PushToSlackChannel") + .name("Push to Slack Channel") + .triggerEvent("media/image.generated") + + override fun execute( + ctx: FunctionContext, + step: Step, + ): String = + step.run("push-to-slack-channel") { + // Call Slack API to push the image to a channel + throw NonRetriableError("Failed to push image to Slack channel ${ctx.event.data["image"]}") + + "Image pushed to Slack channel" + } +} diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index f817d251..33d3ddd7 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -52,6 +52,8 @@ data class CommError( val __serialized: Boolean = true, ) +private val stepTerminalStatusCodes = setOf(ResultStatusCode.StepComplete, ResultStatusCode.StepError) + class CommHandler( functions: Map, val client: Inngest, @@ -81,7 +83,7 @@ class CommHandler( val result = function.call(ctx = ctx, client = client, requestBody) var body: Any? = null - if (result.statusCode == ResultStatusCode.StepComplete || result is StepOptions) { + if (result.statusCode in stepTerminalStatusCodes || result is StepOptions) { body = listOf(result) } if (result is StepResult && result.statusCode == ResultStatusCode.FunctionComplete) { @@ -94,7 +96,8 @@ class CommHandler( ) } catch (e: Exception) { val retryDecision = RetryDecision.fromException(e) - val statusCode = if (retryDecision.shouldRetry) ResultStatusCode.RetriableError else ResultStatusCode.NonRetriableError + val statusCode = + if (retryDecision.shouldRetry) ResultStatusCode.RetriableError else ResultStatusCode.NonRetriableError val err = CommError( diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index f994eb61..7f5bcabc 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -6,6 +6,7 @@ import java.util.function.BiFunction // TODO - Add an abstraction layer between the Function call response and the comm handler response enum class OpCode { StepRun, + StepError, Sleep, StepStateFailed, // TODO Step, @@ -21,6 +22,7 @@ enum class ResultStatusCode( val message: String, ) { StepComplete(206, "Step Complete"), + StepError(206, "Step Error"), FunctionComplete(200, "Function Complete"), NonRetriableError(400, "Bad Request"), RetriableError(500, "Function Error"), @@ -40,6 +42,7 @@ data class StepResult( override val op: OpCode, override val statusCode: ResultStatusCode, val data: Any? = null, + val error: Exception? = null, ) : StepOp(id, name, op, statusCode) data class StepOptions( @@ -202,6 +205,14 @@ internal open class InternalInngestFunction( } }, ) + } catch (e: StepInterruptErrorException) { + return StepResult( + id = e.hashedId, + name = e.id, + op = OpCode.StepError, + statusCode = ResultStatusCode.StepError, + error = e.error, + ) } catch (e: StepInterruptException) { // NOTE - Currently this error could be caught in the user's own function // that wraps a diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index 616bb375..50e98942 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -38,8 +38,8 @@ class State( val dataNode = stepResult.get(fieldName) return mapper.treeToValue(dataNode, type) } else if (stepResult.has("error")) { - // TODO - Parse the error and throw it - return null + val error = mapper.treeToValue(stepResult.get("error"), StepError::class.java) + throw error } // NOTE - Sleep steps will be stored as null // TODO - Investigate if sendEvents stores null as well. diff --git a/inngest/src/main/kotlin/com/inngest/Step.kt b/inngest/src/main/kotlin/com/inngest/Step.kt index 60dc022e..8ffe17f6 100644 --- a/inngest/src/main/kotlin/com/inngest/Step.kt +++ b/inngest/src/main/kotlin/com/inngest/Step.kt @@ -71,6 +71,12 @@ class StepInterruptWaitForEventException( val ifExpression: String?, ) : StepInterruptException(id, hashedId, null) +class StepInterruptErrorException( + id: String, + hashedId: String, + val error: Exception, +) : StepInterruptException(id, hashedId, null) + class Step( private val state: State, val client: Inngest, @@ -80,6 +86,8 @@ class Step( * * @param id unique step id for memoization * @param fn the function to run + * + * @exception StepError if the function throws an [Exception]. */ inline fun run( id: String, @@ -100,15 +108,34 @@ class Step( } } catch (e: StateNotFound) { // If there is no existing result, run the lambda - val data = fn() - throw StepInterruptException(id, hashedId, data) + executeStep(id, hashedId, fn) + } catch (e: StepError) { + throw e } - // TODO - Catch Step Error here and throw it when error parsing is added to getState // TODO - handle invalidly stored step types properly throw Exception("step state incorrect type") } + private fun executeStep( + id: String, + hashedId: String, + fn: () -> T, + ) { + try { + val data = fn() + throw StepInterruptException(id, hashedId, data) + } catch (exception: Exception) { + when (exception) { + is RetryAfterError, + is NonRetriableError, + -> throw exception + + else -> throw StepInterruptErrorException(id, hashedId, exception) + } + } + } + /** * Invoke another Inngest function as a step * @@ -118,6 +145,8 @@ class Step( * @param data the data to pass within `event.data` to the function * @param timeout an optional timeout for the invoked function. If the invoked function does * not finish within this time, the invoked function will be marked as failed. + * + * @exception StepError if the invoked function fails. */ inline fun invoke( id: String, @@ -143,7 +172,10 @@ class Step( } } catch (e: StateNotFound) { throw StepInterruptInvokeException(id, hashedId, appId, fnId, data, timeout) + } catch (e: StepError) { + throw e } + // TODO - handle invalidly stored step types properly throw Exception("step state incorrect type") } diff --git a/inngest/src/main/kotlin/com/inngest/StepError.kt b/inngest/src/main/kotlin/com/inngest/StepError.kt new file mode 100644 index 00000000..e56a449e --- /dev/null +++ b/inngest/src/main/kotlin/com/inngest/StepError.kt @@ -0,0 +1,19 @@ +package com.inngest + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + +/** + * Wraps user-space errors that occurred during the execution of a step. + * + * @param message The user-space error message + * @param name The name of the user-space error + * @param stack The original stack trace of the user-space error + */ +@JsonIgnoreProperties(ignoreUnknown = true) +class StepError + @JvmOverloads + constructor( + message: String, + val name: String = "", + val stack: String = "", + ) : RuntimeException(message)