
Fixed: Redis-backed queues do not consume further messages once a queue is empty #365

Workflow file for this run

name: Integration Test WF
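# Integration test workflow for the KEDA HTTP connectors in this repository.
# The "check" job uses dorny/paths-filter to detect which connector directories
# changed; each connector then runs its own integration job (kafka, rabbitmq,
# sqs, redis, nats, natsjetstream) only when its files were touched.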
on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main
jobs:
  check:
    runs-on: ubuntu-latest
    outputs:
      packages: ${{ steps.filter.outputs.changes }}
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2
        id: filter
        with:
          filters: |
            kafkachanges:
              - 'kafka-http-connector/**'
            rabbitmqchanges:
              - 'rabbitmq-http-connector/**'
            sqschanges:
              - 'aws-sqs-http-connector/**'
            redischanges:
              - 'redis-http-connector/**'
            natschanges:
              - 'nats-streaming-http-connector/**'
            natsjetstreamchanges:
              - 'nats-jetstream-http-connector/**'
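  # Kafka: deploys a Strimzi-managed cluster in the "kafka" namespace, creates the
  # request/response/error topics, deploys the Kafka KEDA connector and ScaledObject,
  # runs a producer Job, then tails the consumer pod logs to verify messages arrived.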
  kafka:
    needs: check
    if: contains(needs.check.outputs.packages, 'kafkachanges')
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - name: Setup Keda
        uses: ./.github/actions/setup-keda
      - name: Deploy Kafka Cluster
        run: |
          kubectl create namespace kafka
          curl -L http://strimzi.io/install/latest | sed 's/namespace: .*/namespace: kafka/' | kubectl create -f - -n kafka
          sleep 10s
          kubectl create -f kafka-http-connector/test/kubernetes/kafka-cluster.yaml
          echo "Waiting for the Kafka cluster to come up."
          kubectl wait -f kafka-http-connector/test/kubernetes/kafka-cluster.yaml --for=condition=ready --timeout=-1s -n kafka
          sleep 10s
          kubectl get pods -n kafka
          kubectl wait pod -l app.kubernetes.io/name=zookeeper --for=condition=ready --timeout=-1s -n kafka
      - name: Create Kafka topics
        run: |
          kubectl apply -f kafka-http-connector/test/kubernetes/kafka-req-topic.yaml
          kubectl apply -f kafka-http-connector/test/kubernetes/kafka-err-topic.yaml
          kubectl apply -f kafka-http-connector/test/kubernetes/kafka-res-topic.yaml
      - name: Deploy Kafka Keda Connector and Keda ScaledObject
        run: |
          ko apply -f kafka-http-connector/test/kubernetes/keda-deployment.yml
          kubectl get pods -n kafka
          sleep 15s
          kubectl get pods -n kafka
          kubectl wait pod -l keda=kafka --for=condition=ready --timeout=30s -n kafka
          kubectl apply -f kafka-http-connector/test/kubernetes/Keda-ScaledObj.yml -n kafka
      - name: Produce Kafka messages using the producer
        run: |
          ko apply -f kafka-http-connector/test/kubernetes/kafka-produer.yaml
          kubectl wait job -l app=pi --for=condition=complete --timeout=-1s -n kafka
          kubectl delete job pi -n kafka
      - name: Collect Kafka consumer messages
        run: |
          kubectl get pods -n kafka
          ko apply -f kafka-http-connector/test/consumer/consumer-deployment.yaml
          kubectl wait pod -l app=consumer --for=condition=ready --timeout=-1s -n kafka
          kubectl logs -l app=consumer --all-containers=true -n kafka
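  # Rabbitmq: stands up a RabbitMQ StatefulSet cluster in the "rabbits" namespace,
  # publishes test messages with a publisher Job, brings up the consumer, deploys the
  # connector and ScaledObject, and finally reads the consumer deployment logs.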
  rabbitmq:
    needs: check
    if: contains(needs.check.outputs.packages, 'rabbitmqchanges')
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - name: Setup Keda
        uses: ./.github/actions/setup-keda
      - name: Deploy Rabbitmq deployment files
        run: |
          cd rabbitmq-http-connector/test/kubernetes/
          kubectl create ns rabbits
          kubectl apply -n rabbits -f rabbit-rbac.yaml
          kubectl apply -n rabbits -f rabbit-configmap.yaml
          kubectl apply -n rabbits -f rabbit-secret.yaml
          kubectl apply -n rabbits -f rabbit-statefulset.yaml
          bash rabbitmq-cluster-instances.sh
      - name: Publish Rabbitmq messages to a queue
        run: |
          cd rabbitmq-http-connector/test/publisher/
          kubectl apply -f deployment.yaml
          ko apply -f publisher-job.yaml
      - name: Bring up the Rabbitmq consumer and listen for incoming messages
        run: |
          cd rabbitmq-http-connector/test/consumer/
          ko apply -f consumer-deployment.yaml
      - name: Deploy Rabbitmq Keda Connector and Keda ScaledObject
        run: |
          cd rabbitmq-http-connector/test/kubernetes/
          ko apply -f keda-deployment.yml
          kubectl wait pod -l keda=rabbitmq --for=condition=ready --timeout=-1s -n rabbits
          kubectl apply -f Keda-ScaledObj.yml
      - name: Get consumed Rabbitmq messages from the queue
        run: |
          sleep 10s
          kubectl logs -n rabbits deployment.apps/rabbitmq-consumer
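  # SQS: uses Localstack (installed from its Helm chart) as a local AWS endpoint,
  # deploys the SQS connector, ScaledObject and a test queue workload, then waits
  # for the queue pod and checks its logs for consumed messages.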
  sqs:
    needs: check
    if: contains(needs.check.outputs.packages, 'sqschanges')
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - name: Setup Keda
        uses: ./.github/actions/setup-keda
      - name: Deploy Localstack with Helm
        run: |
          helm repo add localstack-charts https://localstack.github.io/helm-charts
          helm repo update
          helm install localstack localstack-charts/localstack
      - name: Deploy SQS Keda Connector, Keda ScaledObject and SQS test queue
        run: |
          cd aws-sqs-http-connector/test/kubernetes
          kubectl apply -f aws-secret.yaml
          ko apply -f keda-deployment.yml
          kubectl apply -f keda-scaledObj.yml
          ko apply -f test.yaml
          sleep 10s
      - name: Wait for the SQS test queue pod to be ready
        run: |
          kubectl get pods
          kubectl wait pod -l app=queue --for=condition=ready --timeout=200s
      - name: Get consumed SQS messages from the queue
        run: |
          sleep 30s
          kubectl logs -l app=queue
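  # Redis: installs the ot-container-kit Redis operator and a Redis instance via Helm,
  # deploys the Redis connector and ScaledObject, publishes messages with a publisher
  # deployment, brings up the consumer, and greps its logs for "consumed".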
  redis:
    needs: check
    if: contains(needs.check.outputs.packages, 'redischanges')
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - name: Setup Keda
        uses: ./.github/actions/setup-keda
      - name: Deploy Redis cluster
        run: |
          helm repo add ot-helm https://ot-container-kit.github.io/helm-charts/
          kubectl create namespace redis-operator
          helm upgrade redis-operator ot-helm/redis-operator --install --namespace redis-operator --version 0.7.0
          sleep 50s
          cd redis-http-connector/test/kubernetes
          kubectl create namespace ot-operators
          helm upgrade redis ot-helm/redis -f redis-values.yaml --install --namespace ot-operators --version 0.7.0
          sleep 50s
          kubectl get pods -n ot-operators
      - name: Deploy Redis Keda Connector and Keda ScaledObject
        run: |
          cd redis-http-connector/test/kubernetes/
          ko apply -f keda-deployment.yaml
          sleep 30s
          kubectl apply -f Keda-ScaledObj.yaml
          sleep 30s
      - name: Publish Redis messages to a queue
        run: |
          cd redis-http-connector/test/publisher/
          ko apply -f publisher-deployment.yaml
          sleep 30s
      - name: Bring up the Redis consumer and listen for incoming messages
        run: |
          cd redis-http-connector/test/consumer/
          ko apply -f consumer-deployment.yaml
      - name: Get consumed Redis messages from the queue
        run: |
          sleep 30s
          kubectl logs deployment.apps/consumer-deployment
          kubectl logs deployment.apps/consumer-deployment | grep -q "consumed"
          echo $?
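  # Nats Streaming: installs NATS and NATS Streaming (STAN) via Helm, deploys the
  # connector and ScaledObject, runs the producer and consumer deployments, and
  # greps the consumer logs for "consumed".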
  nats:
    needs: check
    if: contains(needs.check.outputs.packages, 'natschanges')
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - name: Setup Keda
        uses: ./.github/actions/setup-keda
      - name: Deploy Nats Streaming cluster
        run: |
          helm repo add nats https://nats-io.github.io/k8s/helm/charts/
          helm repo update
          helm install nats nats/nats
          sleep 50s
          helm install my-stan nats/stan --set stan.nats.url=nats://nats:4222
          sleep 30s
          kubectl get pods
      - name: Deploy Nats Keda Connector and Keda ScaledObject
        run: |
          cd nats-streaming-http-connector/test/kubernetes/
          ko apply -f Keda-deployment.yaml
          sleep 30s
          kubectl apply -f keda-ScaledObject.yaml
          sleep 30s
      - name: Publish Nats messages to a queue
        run: |
          cd nats-streaming-http-connector/test/producer/
          ko apply -f deployment.yaml
          sleep 30s
      - name: Bring up the Nats consumer and listen for incoming messages
        run: |
          cd nats-streaming-http-connector/test/consumer/
          ko apply -f deployment.yaml
      - name: Get consumed Nats messages from the queue
        run: |
          sleep 30s
          kubectl logs deployment.apps/consumer-deployment
          kubectl logs deployment.apps/consumer-deployment | grep -q "consumed"
          echo $?
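  # Nats JetStream: applies a JetStream server manifest, runs the producer and
  # consumer, deploys the connector and ScaledObject, then waits for the consumer
  # pod and greps its logs for "consumed".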
  natsjetstream:
    needs: check
    if: contains(needs.check.outputs.packages, 'natsjetstreamchanges')
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
      - name: Setup Keda
        uses: ./.github/actions/setup-keda
      - name: Deploy Nats JetStream cluster
        run: |
          cd nats-jetstream-http-connector/test/jetstream
          kubectl apply -f jetstream-server.yaml
      - name: Publish Nats messages to a stream
        run: |
          cd nats-jetstream-http-connector/test/producer/
          ko apply -f deployment.yaml
          sleep 30s
      - name: Bring up the Nats consumer and listen for incoming messages
        run: |
          cd nats-jetstream-http-connector/test/consumer/
          ko apply -f deployment.yaml
      - name: Deploy Nats Keda Connector and Keda ScaledObject
        run: |
          cd nats-jetstream-http-connector/test/kubernetes/
          ko apply -f Keda-deployment.yaml
          sleep 30s
          kubectl apply -f keda-ScaledObject.yaml
          sleep 30s
      - name: Get consumed Nats messages from the stream
        run: |
          # sleep 90s
          kubectl wait pod -l app=consumer --for=condition=ready --timeout=-1s
          kubectl logs deployment.apps/consumer-deployment | grep -q "consumed"
          echo $?
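      # A possible hardening, sketched here only as a comment and not part of this run:
      # poll the consumer logs a few times before giving up, instead of relying on a
      # single fixed sleep followed by one grep.
      #   for i in $(seq 1 10); do
      #     kubectl logs deployment.apps/consumer-deployment | grep -q "consumed" && exit 0
      #     sleep 10
      #   done
      #   echo "consumer logs never reported a consumed message" >&2
      #   exit 1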