From 54d87d35552b163ac168c1148eaf6ad0ef972ae9 Mon Sep 17 00:00:00 2001 From: lucian Date: Fri, 19 May 2023 11:03:58 +0300 Subject: [PATCH] fix server shutdown requested (#41) --- examples/components/comp-nats.yaml | 27 ++++++++++--------- examples/components/config-api-pipeline.yaml | 11 -------- examples/components/config-node-pipeline.yaml | 18 ++++++------- pkg/api/runtime/grpc/grpc_api.go | 2 +- 4 files changed, 24 insertions(+), 34 deletions(-) delete mode 100644 examples/components/config-api-pipeline.yaml diff --git a/examples/components/comp-nats.yaml b/examples/components/comp-nats.yaml index f22b662..17775f2 100644 --- a/examples/components/comp-nats.yaml +++ b/examples/components/comp-nats.yaml @@ -6,16 +6,17 @@ spec: type: pubsub.natsstreaming version: v1 metadata: - - name: natsURL - value: "nats://kube-worker1.totalsoft.local:31291" - - name: natsStreamingClusterID - value: "faas-cluster" - # below are subscription configuration. - - name: subscriptionType - value: queue # Required. Allowed values: topic, queue. - - name: ackWaitTime - value: "" # Optional. - - name: maxInFlight - value: "1" # Optional. - - name: durableSubscriptionName - value: "" # Optional. \ No newline at end of file + - name: natsURL + value: "nats://kube-worker1:31291" + - name: natsStreamingClusterID + value: faas-cluster + - name: subscriptionType + value: queue + - name: connectWait + value: 10s + - name: ackWaitTime + value: 50s + - name: maxInFlight + value: '1' + - name: durableSubscriptionName + value: durable diff --git a/examples/components/config-api-pipeline.yaml b/examples/components/config-api-pipeline.yaml deleted file mode 100644 index 2db3aa2..0000000 --- a/examples/components/config-api-pipeline.yaml +++ /dev/null @@ -1,11 +0,0 @@ -apiVersion: rusi.io/v1alpha1 -kind: Configuration -metadata: - name: api-pipeline-config -spec: - publisherPipeline: - handlers: - - name: pubsub-uppercase - type: middleware.pubsub.uppercase - pubSub: - name: natsstreaming-pubsub diff --git a/examples/components/config-node-pipeline.yaml b/examples/components/config-node-pipeline.yaml index bea194f..c4b3883 100644 --- a/examples/components/config-node-pipeline.yaml +++ b/examples/components/config-node-pipeline.yaml @@ -5,14 +5,14 @@ metadata: spec: metric: enabled: true - tracing: - samplingRate: '1' - jaeger: - useAgent: false - collectorEndpointAddress: 'http://kube-worker1.totalsoft.local:31034/api/traces' - subscriberPipeline: - handlers: - - name: pubsub-uppercase - type: middleware.pubsub.uppercase +# tracing: +# samplingRate: '1' +# jaeger: +# useAgent: false +# collectorEndpointAddress: 'http://kube-worker1.totalsoft.local:31034/api/traces' +# subscriberPipeline: +# handlers: +# - name: pubsub-uppercase +# type: middleware.pubsub.uppercase pubSub: name: natsstreaming-pubsub diff --git a/pkg/api/runtime/grpc/grpc_api.go b/pkg/api/runtime/grpc/grpc_api.go index b820261..940ff15 100644 --- a/pkg/api/runtime/grpc/grpc_api.go +++ b/pkg/api/runtime/grpc/grpc_api.go @@ -166,7 +166,7 @@ func (srv *rusiServerImpl) Subscribe(stream v1.Rusi_SubscribeServer) error { if exit { klog.V(4).InfoS("closing subscription stream", "topic", request.Topic, "error", err) if errors.Is(err, context.Canceled) { - return nil + err = status.Error(codes.Aborted, "Cancellation requested") } return err }