Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update streams-programming-apis.md implicit subscription section to b… #41988

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions docs/orleans/streaming/streams-programming-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,24 @@ Below are the guidelines on how to write the subscription logic for various case

**Implicit subscriptions:**

For implicit subscriptions, the grain needs to subscribe to attach the processing logic. This should be done in the grain's `OnActivateAsync` method. The grain should simply execute `await stream.SubscribeAsync(OnNext ...)` in its `OnActivateAsync` method. That will cause this particular activation to attach the `OnNext` function to process that stream. The grain can optionally specify the `StreamSequenceToken` as an argument to `SubscribeAsync`, which will cause this implicit subscription to start consuming from that token. There is never a need for an implicit subscription to call `ResumeAsync`.
For implicit subscriptions, the grain still needs to subscribe to attach the processing logic. This can be done in the consumer grain by implementing the `IStreamSubscriptionObserver` and `IAsyncObserver<T>` interfaces, allowing the grain to activate separately from subscribing. To subscribe to the stream, the grain creates a handle and calls `await handle.ResumeAsync(this)` in its `OnSubscribed(...)` method.

To process messages, the `IAsyncObserver<T>.OnNextAsync(...)` method is implemented to receive stream data and a sequence token. Alternatively, the `ResumeAsync` method may take a set of delegates representing the methods of the `IAsyncObserver<T>` interface, `onNextAsync`, `onErrorAsync`, and `onCompletedAsync`.

<!-- markdownlint-disable MD044 -->
:::zone target="docs" pivot="orleans-7-0"
<!-- markdownlint-enable MD044 -->

```csharp
public override async Task OnActivateAsync(CancellationToken cancellationToken)
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
_logger.LogInformation($"Received an item from the stream: {item}");
}

await stream.SubscribeAsync(OnNextAsync);
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
jsedlak marked this conversation as resolved.
Show resolved Hide resolved
}
```

Expand Down
Loading