Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Release the state lock before calling the publish api #8234

Merged
merged 2 commits into from
Jun 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 57 additions & 49 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,56 +187,63 @@ def _commit(self):
_LOGGER.debug("Batch is already in progress, exiting commit")
return

# Sanity check: If there are no messages, no-op.
if not self._messages:
_LOGGER.debug("No messages to publish, exiting commit")
self._status = base.BatchStatus.SUCCESS
return

# Begin the request to publish these messages.
# Log how long the underlying request takes.
start = time.time()

try:
response = self._client.api.publish(self._topic, self._messages)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, set the exception on all futures and
# exit.
self._status = base.BatchStatus.ERROR

for future in self._futures:
future.set_exception(exc)
# Once in the IN_PROGRESS state, no other thread can publish additional
# messages or initiate a commit (those operations become a no-op), thus
# it is safe to release the state lock here. Releasing the lock avoids
# blocking other threads in case api.publish() below takes a long time
# to complete.
plamut marked this conversation as resolved.
Show resolved Hide resolved
# https://github.com/googleapis/google-cloud-python/issues/8036

# Sanity check: If there are no messages, no-op.
if not self._messages:
_LOGGER.debug("No messages to publish, exiting commit")
self._status = base.BatchStatus.SUCCESS
return

# Begin the request to publish these messages.
# Log how long the underlying request takes.
start = time.time()

try:
response = self._client.api.publish(self._topic, self._messages)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, set the exception on all futures and
# exit.
self._status = base.BatchStatus.ERROR

for future in self._futures:
future.set_exception(exc)

_LOGGER.exception("Failed to publish %s messages.", len(self._futures))
return

end = time.time()
_LOGGER.debug("gRPC Publish took %s seconds.", end - start)

if len(response.message_ids) == len(self._futures):
# Iterate over the futures on the queue and return the response
# IDs. We are trusting that there is a 1:1 mapping, and raise
# an exception if not.
self._status = base.BatchStatus.SUCCESS
zip_iter = six.moves.zip(response.message_ids, self._futures)
for message_id, future in zip_iter:
future.set_result(message_id)
else:
# Sanity check: If the number of message IDs is not equal to
# the number of futures I have, then something went wrong.
self._status = base.BatchStatus.ERROR
exception = exceptions.PublishError(
"Some messages were not successfully published."
)

_LOGGER.exception("Failed to publish %s messages.", len(self._futures))
return
for future in self._futures:
future.set_exception(exception)

end = time.time()
_LOGGER.debug("gRPC Publish took %s seconds.", end - start)

if len(response.message_ids) == len(self._futures):
# Iterate over the futures on the queue and return the response
# IDs. We are trusting that there is a 1:1 mapping, and raise
# an exception if not.
self._status = base.BatchStatus.SUCCESS
zip_iter = six.moves.zip(response.message_ids, self._futures)
for message_id, future in zip_iter:
future.set_result(message_id)
else:
# Sanity check: If the number of message IDs is not equal to
# the number of futures I have, then something went wrong.
self._status = base.BatchStatus.ERROR
exception = exceptions.PublishError(
"Some messages were not successfully published."
)

for future in self._futures:
future.set_exception(exception)

_LOGGER.error(
"Only %s of %s messages were published.",
len(response.message_ids),
len(self._futures),
)
_LOGGER.error(
"Only %s of %s messages were published.",
len(response.message_ids),
len(self._futures),
)

def monitor(self):
"""Commit this batch after sufficient time has elapsed.
Expand All @@ -258,7 +265,8 @@ def publish(self, message):

Add the given message to this object; this will cause it to be
published once the batch either has enough messages or a sufficient
period of time has elapsed.
period of time has elapsed. If the batch is full or the commit is
already in progress, the method does not do anything.

This method is called by :meth:`~.PublisherClient.publish`.

Expand Down
33 changes: 32 additions & 1 deletion pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import threading
import time

import mock
import pytest

import google.api_core.exceptions
from google.auth import credentials
Expand All @@ -39,7 +41,7 @@ def create_batch(autocommit=False, **batch_settings):
autocommit (bool): Whether the batch should commit after
``max_latency`` seconds. By default, this is ``False``
for unit testing.
kwargs (dict): Arguments passed on to the
batch_settings (dict): Arguments passed on to the
:class:``~.pubsub_v1.types.BatchSettings`` constructor.

Returns:
Expand Down Expand Up @@ -149,6 +151,35 @@ def test_blocking__commit():
assert futures[1].result() == "b"


def test_client_api_publish_not_blocking_additional_publish_calls():
batch = create_batch(max_messages=1)
api_publish_called = threading.Event()

def api_publish_delay(_, messages):
api_publish_called.set()
time.sleep(1.0)
message_ids = [str(i) for i in range(len(messages))]
return types.PublishResponse(message_ids=message_ids)

api_publish_patch = mock.patch.object(
type(batch.client.api), "publish", side_effect=api_publish_delay
)

with api_publish_patch:
batch.publish({"data": b"first message"})

start = datetime.datetime.now()
event_set = api_publish_called.wait(timeout=1.0)
if not event_set:
pytest.fail("API publish was not called in time")
batch.publish({"data": b"second message"})
plamut marked this conversation as resolved.
Show resolved Hide resolved
end = datetime.datetime.now()

# While a batch commit in progress, waiting for the API publish call to
# complete should not unnecessariliy delay other calls to batch.publish().
assert (end - start).total_seconds() < 1.0


@mock.patch.object(thread, "_LOGGER")
def test_blocking__commit_starting(_LOGGER):
batch = create_batch()
Expand Down