Google PubSub: PubSub subscriber not consuming when in K8S (works locally) #4737

Closed
anorth2 opened this issue Jan 11, 2018 · 8 comments
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. release blocking Required feature/issue must be fixed prior to next release. triaged for GA type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@anorth2
anorth2 commented Jan 11, 2018

Possibly related to this: googleapis/nodejs-pubsub#11
Note that this issue occurs most of the time but not always. Sometimes it works with no rhyme or reason as to why.

  1. OS type and version
    Alpine Linux Docker image deployed in K8S
    Docker image works when deployed locally on Mac (High Sierra)

  2. Python version and virtual environment information python --version
    Python 3.6.3
    google-cloud-pubsub==0.30.1

  3. Stacktrace if available
    K8S: CI logs

2018-01-11 18:21:25,870 | root |  INFO: Listening for messages on projects/oem-services-dev/subscriptions/consumer-python-integration-test-20180111172948

locally:

pubsub-consumer_1  | 2018-01-11 18:01:24,214 | root |  INFO: Listening for messages on projects/oem-services-dev/subscriptions/consumer-python-integration-test-20180111172948
pubsub-consumer_1  | 2018-01-11 18:01:26,485 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,499 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,502 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,504 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,515 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,519 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,529 | root |  INFO: Response from server: Received message: data=test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,529 | root |  INFO: Received message: test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,536 | root |  INFO: Response from server: Received message: data=test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,537 | root |  INFO: Response from server: Received message: data=test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,539 | root |  INFO: Response from server: Received message: data=test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,543 | root |  INFO: Response from server: Received message: data=test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,545 | root |  INFO: Response from server: Received message: data=test_message
pubsub-consumer_1  | 2018-01-11 18:01:26,547 | root |  INFO: Received message: test_message

  4. Steps to reproduce
    Deploy subscriber to K8S, watch it fail.

  5. Code example
    callback code:

import logging

import requests
from google.cloud import pubsub_v1

# callback_time, messages_received, messages_sent and exception_count are
# metric objects defined elsewhere in the reporter's code (not shown), as are
# destination_address, destination_port, destination_endpoint and
# subscription_path.
@callback_time.time()
def callback(message):
    try:
        messages_received.inc()
        message_data = message.data.decode('utf-8')

        url = f"http://{destination_address}:{destination_port}/{destination_endpoint}"
        logging.info('Received message: {}'.format(message_data))
        logging.debug(f"Sending to {url}")

        response = requests.post(url, data={"data": message_data})
        if not 200 <= response.status_code < 300:
            # Note: this path returns without ack() or nack().
            logging.error('Failed to send message')
            return
        logging.info(f"Response from server: {response.text}")
        messages_sent.inc()
        message.ack()
    except Exception as err:
        logging.exception(err)
        exception_count.inc()


logging.info('Listening for messages on {}'.format(subscription_path))

flow_control = pubsub_v1.types.FlowControl(max_messages=300)

Using GitHub flavored markdown can help make your request clearer.
See: https://guides.github.com/features/mastering-markdown/
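
A side note on the callback above, offered as a hedged sketch rather than a diagnosis of this issue: when the HTTP call returns a non-2xx status, the callback returns without calling `ack()` or `nack()`, so the message stays outstanding and keeps counting against `max_messages` for as long as the client holds its lease. The variant below always settles the message one way or the other. `make_callback`, `send` and `FakeMessage` are hypothetical names introduced only for illustration; just `data`, `ack()` and `nack()` mirror the Pub/Sub message interface.

```python
import logging


def make_callback(send):
    """Build a subscriber callback that always acks or nacks.

    `send` is a hypothetical callable that takes the decoded payload and
    returns an HTTP-like status code (e.g. a wrapper around requests.post).
    """
    def callback(message):
        try:
            payload = message.data.decode('utf-8')
            status = send(payload)
        except Exception:
            # Any failure while decoding or sending: hand the message back.
            logging.exception('Failed to process message')
            message.nack()
            return
        if 200 <= status < 300:
            message.ack()
        else:
            logging.error('Destination returned status %s', status)
            message.nack()
    return callback


class FakeMessage:
    """Stand-in for a Pub/Sub message, used only to exercise the callback."""

    def __init__(self, data):
        self.data = data
        self.outcome = None

    def ack(self):
        self.outcome = 'ack'

    def nack(self):
        self.outcome = 'nack'
```

Nacking makes the message eligible for redelivery, so whether that is preferable to dropping it depends on whether the downstream failure is expected to be transient.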

@chemelnucfin chemelnucfin added api: pubsub Issues related to the Pub/Sub API. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. labels Jan 11, 2018
@gylu
gylu commented Jan 18, 2018

I might be experiencing the same thing. If I wait long enough on OS X, it happens there too. On both K8s and OS X it can take several hours to occur, depending on what I set for flow_control. The symptom: after several hours of running, my code stops consuming and _load is at 0.0, even though Stackdriver still reports millions of messages in the PubSub subscription. If I restart the pod, it starts working again.
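
Since restarting the pod clears the symptom, one possible stopgap is to detect the stall from inside the process and let Kubernetes do the restart. The sketch below only records when the callback last ran; `StallDetector` and its parameters are hypothetical names, and idle time alone cannot distinguish a stalled subscriber from an empty subscription, so it would need to be combined with a backlog signal such as the Stackdriver metric mentioned above.

```python
import threading
import time


class StallDetector:
    """Tracks when a callback last ran and reports suspiciously long gaps."""

    def __init__(self, max_idle_seconds, clock=time.monotonic):
        self._max_idle = max_idle_seconds
        self._clock = clock  # injectable so the class can be tested
        self._lock = threading.Lock()
        self._last_seen = clock()

    def mark(self):
        """Call at the top of the subscriber callback."""
        with self._lock:
            self._last_seen = self._clock()

    def is_stalled(self):
        """Return True if no callback has run for longer than max_idle_seconds."""
        with self._lock:
            return self._clock() - self._last_seen > self._max_idle
```

The main `while True` loop could check `is_stalled()` on each iteration and exit with a non-zero status, so that the pod is restarted instead of sitting idle.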

@danoscarmike danoscarmike added triaged for GA release blocking Required feature/issue must be fixed prior to next release. labels Jan 19, 2018
@opyate
Contributor

opyate commented Jan 20, 2018

@anorth2 please also show your subscriber setup. Mine is pretty much like the examples:

import json
import time
import traceback
from sys import stderr

from google.cloud import pubsub_v1

# `config` and `processor` are the poster's own modules (not shown).


def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    # check if the subscription exists
    response = subscriber.get_subscription(subscription_path)
    if response:
        print('subscription exists', response.name, flush=True)
    else:
        print('no subscription', subscription_path, flush=True)

    def callback(message):
        # TODO modify ack deadline based on estimated message processing time
        message.modify_ack_deadline(600)
        payload = json.loads(message.data)
        print('subscription={}, payload={}, message={}'.format(subscription_name, payload, message), flush=True)
        assert payload['type'] == config.WORKER_TYPE
        try:
            processor.process(payload)
            message.ack()
        except Exception:
            # Setting the deadline to 0 makes the message available for redelivery.
            message.modify_ack_deadline(0)
            print('Error, worker_type={}, payload={}'.format(config.WORKER_TYPE, payload), flush=True)
            traceback.print_exc(file=stderr)
            stderr.flush()

    flow_control = pubsub_v1.types.FlowControl(max_messages=1)
    subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path), flush=True)
    while True:
        time.sleep(60)

Deployed to K8S, and not receiving/processing anything.

@opyate
Contributor

opyate commented Jan 20, 2018

Digging through code; trying to see if there's debug mode to tell me if messages are being dropped, or if the consumer is stuck in pause.

UPDATE

     flow_control = pubsub_v1.types.FlowControl(max_messages=1)
-    subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
+    consumer = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)

     # The subscriber is non-blocking, so we must keep the main thread from
     # exiting to allow it to process messages in the background.
     print('Listening for messages on {}'.format(subscription_path), flush=True)
     while True:
+        print('consumer paused={}, load={}'.format(consumer._consumer.paused, consumer._load), flush=True)
         time.sleep(60)
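
On the question of a debug mode: the client library logs through Python's standard `logging` module, so one hedged option is to raise the level of its loggers rather than reading private attributes. The logger name `google.cloud.pubsub_v1` below assumes the library names its loggers after its modules (the usual `getLogger(__name__)` convention); that was not confirmed in this thread, and `enable_pubsub_debug_logging` is a hypothetical helper.

```python
import logging
import sys


def enable_pubsub_debug_logging(logger_name='google.cloud.pubsub_v1'):
    """Send DEBUG-level records from the given logger tree to stderr."""
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s | %(name)s | %(threadName)s | %(levelname)s: %(message)s'))
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return logger
```

Calling this once before `subscriber.subscribe(...)` should surface whatever the library logs from its background threads; including `threadName` in the format helps tell those threads apart.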

@opyate
Contributor

opyate commented Jan 20, 2018

Possibly related to grpc/grpc/issues/13746 ?

@anorth2
Author

anorth2 commented Jan 30, 2018

So...any updates on this?

@ekhaydarov

@opyate it looks like you're mixing the old and new pubsub APIs. According to the latest docs, you need to actually call .open() after instantiating your subscriber:
https://googlecloudplatform.github.io/google-cloud-python/latest/pubsub/index.html
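
For readers following along, the pattern being referred to looks roughly like the sketch below. It assumes, based on the docs of that period, that `subscriber.subscribe(path)` returns an object with an `open(callback)` method which in turn returns a future; later releases of the library changed this interface, so treat the shape as an assumption. `start_subscriber` is a hypothetical helper, and the subscriber is passed in so the snippet does not depend on a particular library version.

```python
def start_subscriber(subscriber, subscription_path, callback):
    """Subscribe, then explicitly open the stream with a callback.

    Assumes `subscriber.subscribe(path)` returns an object exposing
    `open(callback)`, and that `open` returns a future-like object.
    """
    subscription = subscriber.subscribe(subscription_path)
    future = subscription.open(callback)
    return future
```

Blocking on the returned future with `future.result()` in the main thread, instead of sleeping in a loop, would also surface any exception raised by the background consumer.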

@opyate
Contributor

opyate commented Mar 1, 2018

@ekhaydarov the API must've changed since my comment. Thanks for flagging up the changes.

@theacodes
Contributor

The research in #4978 was carried out on k8s. I'm closing this issue for now as not reproducible, but if it still appears when using grpcio 1.10.0rc2, let us know.


8 participants