diff --git a/docs/orleans/streaming/streams-programming-apis.md b/docs/orleans/streaming/streams-programming-apis.md index b68ea443b3df4..862cfaa33d2e3 100644 --- a/docs/orleans/streaming/streams-programming-apis.md +++ b/docs/orleans/streaming/streams-programming-apis.md @@ -164,20 +164,24 @@ Below are the guidelines on how to write the subscription logic for various case **Implicit subscriptions:** -For implicit subscriptions, the grain needs to subscribe to attach the processing logic. This should be done in the grain's `OnActivateAsync` method. The grain should simply execute `await stream.SubscribeAsync(OnNext ...)` in its `OnActivateAsync` method. That will cause this particular activation to attach the `OnNext` function to process that stream. The grain can optionally specify the `StreamSequenceToken` as an argument to `SubscribeAsync`, which will cause this implicit subscription to start consuming from that token. There is never a need for an implicit subscription to call `ResumeAsync`. +For implicit subscriptions, the grain still needs to subscribe to attach the processing logic. This can be done in the consumer grain by implementing the `IStreamSubscriptionObserver` and `IAsyncObserver` interfaces, allowing the grain to activate separately from subscribing. To subscribe to the stream, the grain creates a handle and calls `await handle.ResumeAsync(this)` in its `OnSubscribed(...)` method. + +To process messages, the `IAsyncObserver.OnNextAsync(...)` method is implemented to receive stream data and a sequence token. Alternatively, the `ResumeAsync` method may take a set of delegates representing the methods of the `IAsyncObserver` interface, `onNextAsync`, `onErrorAsync`, and `onCompletedAsync`. :::zone target="docs" pivot="orleans-7-0" ```csharp -public override async Task OnActivateAsync(CancellationToken cancellationToken) +public Task OnNextAsync(string item, StreamSequenceToken? token = null) { - var streamProvider = this.GetStreamProvider(PROVIDER_NAME); - var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey()); - var stream = streamProvider.GetStream(streamId); + _logger.LogInformation($"Received an item from the stream: {item}"); +} - await stream.SubscribeAsync(OnNextAsync); +public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory) +{ + var handle = handleFactory.Create(); + await handle.ResumeAsync(this); } ```