Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to write a local Redis or Garent based pub/sub ? #49

Open
zfchai opened this issue Apr 7, 2024 · 0 comments
Open

How to write a local Redis or Garent based pub/sub ? #49

zfchai opened this issue Apr 7, 2024 · 0 comments
Labels
kind/question A question about the SDK

Comments

@zfchai
Copy link

zfchai commented Apr 7, 2024

About redis pub/sub

  • the redis-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: pollIntervalSeconds
    value: 5
- name: maxMessages
    value: <max messages to receive per poll>
  • How do I write a pub/sub that uses the redis component?
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.PubSub;
using Dapr.Proto.Components.V1;
using Microsoft.Extensions.Logging;

namespace Test.Dapr.WebApp3.Components;

internal sealed class RedisQueuesPubSub(ILogger<RedisQueuesPubSub> logger) : IPubSub
{
    private readonly ILogger<RedisQueuesPubSub> _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    private string? redisHost;
    private string? redisPassword;
    private TimeSpan pollInterval = TimeSpan.FromSeconds(5);
    private int maxMessages = 5;

    // Called to initialize the component with its configured metadata...
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Init request");

        this.redisHost = request.Properties["redisHost"];
        this.redisPassword = request.Properties["redisPassword"];

        if (request.Properties.TryGetValue("pollIntervalSeconds", out var pollIntervalString))
        {
            this.pollInterval = TimeSpan.FromSeconds(int.Parse(pollIntervalString));
        }

        if (request.Properties.TryGetValue("maxMessages", out var maxMessagesString))
        {
            this.maxMessages = int.Parse(maxMessagesString);
        }

        return Task.CompletedTask;
    }

    // Send the message to the "topic"...
    public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Publish request");


        throw new NotImplementedException();
    }

    // Until canceled, check the topic for messages and deliver them to the Dapr runtime...
    public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation($"Pull messages request for topic {topic.Name}");

        // Poll the topic until canceled...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // Poll topic for messages...

            foreach (var message in messages)
            {
                // Deliver the message to the Dapr runtime...
                await deliveryHandler(
                    new PubSubPullMessagesResponse(topicName)
                    {
                        // Set the message content...
                    },
                    // Callback invoked when application acknowledges the message...
                    async errorMessage => {
                        // An empty message indicates the application successfully processed the message...
                        if (string.IsNullOrWhiteSpace(errorMessage))
                        {
                            // Delete the message from the topic...
                        }
                    });
            }

            // Wait for the next poll (or cancellation)...
            await Task.Delay(this.pollInterval, cancellationToken);
        }

    }
}

The redis component is deployed in a local docker environment, how do I implement the specific code, looking forward to your guidance!

@zfchai zfchai added the kind/question A question about the SDK label Apr 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/question A question about the SDK
Projects
None yet
Development

No branches or pull requests

1 participant