Skip to content

Commit

Permalink
Add idempotency configuration (#80)
Browse files Browse the repository at this point in the history
Copied doc from
https://github.com/inngest/inngest/blob/6594ccb692121bfe62b11676fe002416df13300b/docs/SDK_SPEC.md?plain=1#L534-L538

https://www.inngest.com/docs/guides/handling-idempotency#at-the-function-level-the-consumer

The integration test validates idempotency by
- checking an observable side effect, in this case incrementing a static
  counter variable
- check that the second run of the same idempotency was ignored by the
  dev server
  • Loading branch information
albertchae authored Sep 10, 2024
1 parent 937f7af commit bf5da4f
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.*;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;

public class IdempotentFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("idempotent-fn")
.name("Idempotent Function")
.triggerEvent("test/idempotent")
.idempotency("event.data.companyId");
}

@Getter
private static int counter = 0;
@Override
public Integer execute(FunctionContext ctx, Step step) {
return step.run("increment-count", () -> counter++, Integer.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new ThrottledFunction());
addInngestFunction(functions, new DebouncedFunction());
addInngestFunction(functions, new PriorityFunction());
addInngestFunction(functions, new IdempotentFunction());

return functions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.inngest.springbootdemo;

import com.inngest.Inngest;
import com.inngest.springbootdemo.testfunctions.IdempotentFunction;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collections;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class IdempotentFunctionIntegrationTest {
@Autowired
private DevServerComponent devServer;

@Autowired
private Inngest client;

@Test
void testIdempotencyKey() throws Exception {
Map dataPayload = Collections.singletonMap("companyId", 42);
String eventWithIdempotencyKey = InngestFunctionTestHelpers.sendEvent(client, "test/idempotent", dataPayload).getIds()[0];
String eventWithSameIdempotencyKey = InngestFunctionTestHelpers.sendEvent(client, "test/idempotent", dataPayload).getIds()[0];

Thread.sleep(2000);

// With the same idempotency key, only one of the events should have run
RunEntry<Object> firstRun = devServer.runsByEvent(eventWithIdempotencyKey).first();
assertEquals(0, devServer.runsByEvent(eventWithSameIdempotencyKey).data.length);
assertEquals("Completed", firstRun.getStatus());

// This would be 2 if the function was not idempotent
assertEquals(1, IdempotentFunction.getCounter());


Map differentDataPayload = Collections.singletonMap("companyId", 43);
String eventWithDifferentIdempotencyKey = InngestFunctionTestHelpers.sendEvent(client, "test/idempotent", differentDataPayload).getIds()[0];

Thread.sleep(2000);

// Event with a different idempotency key will run once
RunEntry<Object> otherRun = devServer.runsByEvent(eventWithDifferentIdempotencyKey).first();
assertEquals("Completed", otherRun.getStatus());
assertEquals(2, IdempotentFunction.getCounter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import com.inngest.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class InngestFunctionTestHelpers {

static SendEventsResponse sendEvent(Inngest inngest, String eventName) {
InngestEvent event = new InngestEvent(eventName, new HashMap<String, String>());
return sendEvent(inngest, eventName, new HashMap());
}

static SendEventsResponse sendEvent(Inngest inngest, String eventName, Map<String, String> data) {
InngestEvent event = new InngestEvent(eventName, data);
SendEventsResponse response = inngest.send(event);

assert Objects.requireNonNull(response).getIds().length > 0;
Expand Down
3 changes: 3 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ internal class InternalFunctionConfig
val throttle: Throttle? = null,
@Json(serializeNull = false)
val debounce: Debounce? = null,
@Json(serializeNull = false)
val priority: Priority? = null,
@Json(serializeNull = false)
val idempotency: String? = null,
@Json(serializeNull = false)
val batchEvents: BatchEvents? = null,
val steps: Map<String, StepConfig>,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class InngestFunctionConfigBuilder {
private var throttle: Throttle? = null
private var debounce: Debounce? = null
private var priority: Priority? = null
private var idempotency: String? = null
private var batchEvents: BatchEvents? = null

/**
Expand Down Expand Up @@ -182,7 +183,6 @@ class InngestFunctionConfigBuilder {
): InngestFunctionConfigBuilder = apply { this.debounce = Debounce(period, key, timeout) }

/**
*
* Configure how the priority of a function run is decided when multiple
* functions are triggered at the same time.
*
Expand All @@ -194,6 +194,15 @@ class InngestFunctionConfigBuilder {
*/
fun priority(run: String): InngestFunctionConfigBuilder = apply { this.priority = Priority(run) }

/**
* Allow the specification of an idempotency key using event data. If
* specified, this overrides the `rateLimit` object.
*
* @param idempotencyKey An expression using event payload data for a
* unique string key for idempotency.
*/
fun idempotency(idempotencyKey: String): InngestFunctionConfigBuilder = apply { this.idempotency = idempotencyKey }

private fun buildSteps(serveUrl: String): Map<String, StepConfig> {
val scheme = serveUrl.split("://")[0]
return mapOf(
Expand Down Expand Up @@ -228,6 +237,7 @@ class InngestFunctionConfigBuilder {
throttle,
debounce,
priority,
idempotency,
batchEvents,
steps = buildSteps(serverUrl),
)
Expand Down

0 comments on commit bf5da4f

Please sign in to comment.