PubSub subscription with ack_deadline set causes HTTP 400 #1169

Closed
gtaylor opened this issue Oct 1, 2015 · 8 comments · Fixed by #1182
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API.

Comments

@gtaylor
Contributor

gtaylor commented Oct 1, 2015

Page Name: pubsub-usage
Release: 0.7.1

This appears to return an API error:

subscription = topic.subscription('subscription_name', ack_deadline=600)

Here is what I am seeing:

...
  File "/home/greg.taylor/workspace/aclima/sig-cassandra-extractor/aclima/cass_extractor/queue.py", line 42, in run
    self.subscription.create()
  File "/home/greg.taylor/.virtualenvs/cas-e/lib/python3.4/site-packages/gcloud/pubsub/subscription.py", line 121, in create
    client.connection.api_request(method='PUT', path=self.path, data=data)
  File "/home/greg.taylor/.virtualenvs/cas-e/lib/python3.4/site-packages/gcloud/connection.py", line 419, in api_request
    error_info=method + ' ' + url)
gcloud.exceptions.BadRequest: 400 Invalid JSON payload received. Unknown name "ack_deadline": Cannot find field. (PUT https://pubsub.googleapis.com/v1/projects/aclima-gsa/subscriptions/cassandra_extractor)

If I remove the ack_deadline kwarg, all is well. We definitely want the ack_deadline, though.

@dhermes
Contributor

dhermes commented Oct 2, 2015

Thanks for filing @gtaylor!

@tseaver PTAL

@gtaylor
Contributor Author

gtaylor commented Oct 2, 2015

Whoops, forgot to include the most important detail (that removing ack_deadline causes things to work again).

dhermes changed the title from "[Documentation Issue] PubSub subscription with ack_deadline set causes HTTP 400" to "PubSub subscription with ack_deadline set causes HTTP 400" on Oct 2, 2015
dhermes added the "api: pubsub" label (Issues related to the Pub/Sub API.) on Oct 2, 2015
@dhermes
Contributor

dhermes commented Oct 2, 2015

@gtaylor Your snippet is only

subscription = topic.subscription('subscription_name', ack_deadline=600)

but in your stacktrace it looks like the error comes from

self.subscription.create()

presumably elsewhere, where a Subscription object has already been instantiated and set as self.subscription.

@gtaylor
Contributor Author

gtaylor commented Oct 2, 2015

Here's the whole class:

import logging

from gcloud import pubsub
from gcloud.exceptions import Conflict
from oauth2client.client import GoogleCredentials

logger = logging.getLogger(__name__)


class CassEQueue(object):
    """
    A convenience class that facilitates (but does not fully abstract away)
    Google Cloud PubSub.
    """
    def __init__(self, project_id, topic_name, subscription_name,
                 ack_deadline):
        """
        :param str project_id: The Google Cloud project name that the
            PubSub topic resides in.
        :param str topic_name: The name of the PubSub topic in Google Cloud.
        :param str subscription_name: The name of the PubSub topic
            subscription in Google Cloud.
        :param int ack_deadline: If a job isn't completed and acknowledged
            within this amount of time, the job ends up re-scheduled.
        """
        self.project_id = project_id
        self.topic_name = topic_name
        self.subscription_name = subscription_name
        self.ack_deadline = int(ack_deadline)
        self.client = None
        self.topic = None
        self.subscription = None

    def run(self):
        """
        Authenticates with Google Cloud, retrieves and/or creates the
        PubSub topics and subscriptions, and gets us ready for action.
        """
        credentials = GoogleCredentials.get_application_default()
        self.client = pubsub.Client(
            project=self.project_id, credentials=credentials)

        self.topic = self.client.topic(self.topic_name)
        try:
            self.topic.create()
        except Conflict:
            logger.info("Using existing PubSub topic '%s'.", self.topic_name)
        else:
            logger.info("Created new PubSub topic '%s'", self.topic_name)

        self.subscription = self.topic.subscription(
            name=self.subscription_name,
            # Was getting an HTTP 400 error with this set. Seems to be some
            # kind of bug. Re-add when this gets resolved.
            # ack_deadline=self.ack_deadline
        )
        try:
            self.subscription.create()
        except Conflict:
            logger.info(
                "Using existing PubSub subscription '%s'.",
                self.subscription_name)
        else:
            logger.info(
                "Created new PubSub subscription '%s'", self.subscription_name)

    def push_job(self, keyspace_name, table_name, loc_id, start_time):
        """
        Pushes a job to PubSub.

        :param str keyspace_name: The keyspace to work within.
        :param str table_name: The Cassandra table that we are
            extracting from.
        :param int loc_id: The location ID to extract from.
        :param datetime.datetime start_time: The start time to SELECT by.
        """
        self.topic.publish(
            message='extract-and-store'.encode('utf-8'),
            keyspace_name=keyspace_name,
            table_name=table_name,
            loc_id=str(loc_id),
            start_time=start_time.isoformat(),
        )

    def pull_job(self, block=True):
        """
        Pulls a job from PubSub.

        :keyword bool block: If True, execution will block on this call
            until a job is pulled. If False, we return immediately (even if
            empty-handed).
        :rtype: tuple or None
        :return: A tuple in the form of (ack_id, message), where ``ack_id``
            is a string ID that we must ack back upon job completion, and
            ``message`` is a ``gcloud.pubsub.message.Message`` instance.
            None if nothing was pulled.
        """
        jobs = self.subscription.pull(return_immediately=not block)
        # A non-blocking pull can come back empty; avoid an IndexError.
        return jobs[0] if jobs else None

    def acknowledge_job(self, ack_id):
        """
        When we have successfully processed a job, we must acknowledge
        completion so it doesn't get auto-requeued.

        :param str ack_id: The job's ack ID.
        """
        # Accept a single ack ID or a list; always pass a list along.
        if isinstance(ack_id, list):
            ack_ids = ack_id
        else:
            ack_ids = [ack_id]
        self.subscription.acknowledge(ack_ids)

    def health_check(self):
        """
        This gets called when we want to verify that authentication and
        connectivity is good between cass_extractor and Google PubSub.

        :rtype: bool
        :return: True if everything is fine.
        :raises: All sorts of scary stuff if something is amiss.
        """
        self.client.list_topics(page_size=1)
        return True

@dhermes
Contributor

dhermes commented Oct 2, 2015

Thanks. That confirms what I was asking.

I notice you're using except Conflict for existing objects. Do you find this a useful way to detect the conflict or would you prefer something else (i.e. maybe a feature)?

@gtaylor
Contributor Author

gtaylor commented Oct 2, 2015

I don't have strong feelings, as long as Conflict doesn't mean anything else in this context (I haven't verified that at all, so I may be hosed). If I have some kind of unique exception to catch if the topic/subscription already exists, I'm going to be pretty happy.

I recall boto silently eating the exception for stuff like create_bucket() if it already existed, but that makes it harder for a third party to come in and understand quickly.

@dhermes
Contributor

dhermes commented Oct 2, 2015

Conflict only happens when the backend returns a 409 (which is the Conflict status code) so no hosing going on here.
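
For illustration, here is a minimal sketch of the "create, and treat a 409 as already exists" pattern discussed above. It does not call the real library: the Conflict class is a local stand-in for gcloud.exceptions.Conflict, and FakeResource and ensure_created are hypothetical names made up for this example.

```python
class Conflict(Exception):
    """Local stand-in for gcloud.exceptions.Conflict (HTTP 409)."""
    code = 409


class FakeResource(object):
    """Hypothetical stand-in for a topic or subscription object."""

    def __init__(self, name, backend):
        self.name = name
        self._backend = backend  # set of names the fake backend already holds

    def create(self):
        # Mimic a backend that answers 409 when the name is already taken.
        if self.name in self._backend:
            raise Conflict(self.name)
        self._backend.add(self.name)


def ensure_created(resource):
    """Create the resource; return True if new, False if it already existed."""
    try:
        resource.create()
    except Conflict:
        return False
    return True


backend = set()
print(ensure_created(FakeResource('jobs', backend)))
print(ensure_created(FakeResource('jobs', backend)))
```

Because only the 409 case is caught, any other failure (bad request, auth error) still propagates to the caller instead of being silently swallowed.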

@tseaver
Contributor

tseaver commented Oct 14, 2015

@gtaylor I can see a bug (passing ackDeadline instead of ackDeadlineSeconds in the create API request), but it wouldn't produce the traceback you show above: is it possible you accidentally "fixed" it to show ack_deadline instead of ackDeadline?
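
To make the field naming concrete, here is a rough sketch of a request body for the subscription create call, assuming the v1 REST resource uses the keys topic and ackDeadlineSeconds as described above. The helper name subscription_resource and the example topic path are hypothetical and are not taken from the library source.

```python
def subscription_resource(topic_path, ack_deadline=None):
    """Build a JSON-serializable body for a subscription create request.

    Hypothetical helper: only the key names reflect the API discussion above.
    """
    body = {'topic': topic_path}
    if ack_deadline is not None:
        # The API field is ackDeadlineSeconds, not ackDeadline or ack_deadline.
        body['ackDeadlineSeconds'] = int(ack_deadline)
    return body


# Placeholder project and topic names, for illustration only.
print(subscription_resource('projects/my-project/topics/my-topic', 600))
```

Omitting the key entirely when no deadline is given leaves the choice of default to the backend.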
