-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Eventhubs blobstorage checkpointstore merge to preview3 (#7109)
- Loading branch information
1 parent
8e7e1c1
commit 13a8fe7
Showing
15 changed files
with
295 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
21 changes: 21 additions & 0 deletions
21
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
MIT License | ||
|
||
Copyright (c) Microsoft Corporation. All rights reserved. | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
SOFTWARE |
2 changes: 2 additions & 0 deletions
2
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
include *.md | ||
include azure/__init__.py |
Empty file.
5 changes: 5 additions & 0 deletions
5
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
# Extend this package's search path via pkgutil.extend_path (pkgutil-style namespace package).
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
5 changes: 5 additions & 0 deletions
5
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
# Extend this package's search path via pkgutil.extend_path (pkgutil-style namespace package).
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
5 changes: 5 additions & 0 deletions
5
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
# Extend this package's search path via pkgutil.extend_path (pkgutil-style namespace package).
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
12 changes: 12 additions & 0 deletions
12
...hubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
|
||
# Package version; setup.py extracts this assignment with a regular expression.
__version__ = "1.0.0b1"

from .blobstoragepm import BlobPartitionManager

# Public API of this package.
__all__ = ["BlobPartitionManager"]
123 changes: 123 additions & 0 deletions
123
...checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
from typing import Iterable, Dict, Any | ||
import logging | ||
from collections import defaultdict | ||
import asyncio | ||
from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError # type: ignore | ||
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore | ||
from azure.storage.blob.aio import ContainerClient # type: ignore | ||
|
||
logger = logging.getLogger(__name__)
# Ownership and checkpoint state is carried in blob metadata, so the uploaded blob content is an empty string.
UPLOAD_DATA = ""


class BlobPartitionManager(PartitionManager):
    """A PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data.

    This class implements methods list_ownership, claim_ownership, and update_checkpoint that are defined in class
    azure.eventhub.aio.eventprocessor.PartitionManager of package azure-eventhub.

    One blob per partition is kept in the container; the blob name is the partition id and the blob metadata
    holds ``owner_id`` and, once checkpointed, ``offset`` and ``sequence_number``.
    """

    def __init__(self, container_client: ContainerClient):
        """
        :param container_client: The Azure Blob Storage Container client.
        """
        self._container_client = container_client
        # partition id -> last known ownership record (including etag and last_modified_time)
        self._cached_ownership_dict = defaultdict(dict)  # type: Dict[str, Dict[str, Any]]
        # lock each partition so that the cached etag doesn't get out of sync when list_ownership,
        # claim_ownership and update_checkpoint are running concurrently
        self._cached_ownership_locks = defaultdict(asyncio.Lock)  # type: Dict[str, asyncio.Lock]

    async def _upload_blob(self, ownership, metadata):
        """Write ``metadata`` to the partition's blob and refresh the etag stored in ``ownership``.

        :param ownership: Ownership record; must contain ``partition_id`` and may contain ``etag``.
         It is updated in place with the new ``etag`` and ``last_modified_time``.
        :param metadata: Blob metadata to store.

        The upload is conditional: ``if_match`` with the known etag, or ``if_none_match='*'`` when no etag
        is known. Exceptions raised by the storage client propagate to the caller.
        """
        etag = ownership.get("etag")
        if etag:
            etag_match = {"if_match": etag}
        else:
            etag_match = {"if_none_match": '*'}
        partition_id = ownership["partition_id"]
        blob_client = await self._container_client.upload_blob(
            name=partition_id, data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match
        )
        uploaded_blob_properties = await blob_client.get_blob_properties()
        ownership["etag"] = uploaded_blob_properties.etag
        ownership["last_modified_time"] = uploaded_blob_properties.last_modified.timestamp()

    async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
        """Refresh the ownership cache from the container and return all cached ownership records.

        :param eventhub_name: Event hub name recorded in each returned ownership.
        :param consumer_group_name: Consumer group name recorded in each returned ownership.
        :return: The cached ownership records, one per partition blob seen so far.
        :raises: Any exception raised while listing blobs is logged and re-raised.
        """
        try:
            blobs = self._container_client.list_blobs(include=['metadata'])
            # The listing is paged lazily, so service errors surface while iterating; keep the loop
            # inside the try block so they are logged as well.
            async for blob in blobs:
                async with self._cached_ownership_locks[blob.name]:
                    modified_time = blob.last_modified.timestamp() if blob.last_modified else None
                    # .get() avoids creating an empty defaultdict entry for an unknown partition
                    cached_time = self._cached_ownership_dict.get(blob.name, {}).get("last_modified_time")
                    if cached_time is not None and modified_time is not None and modified_time < cached_time:
                        continue  # the cached record is newer than the listed blob
                    metadata = blob.metadata
                    ownership = {
                        "eventhub_name": eventhub_name,
                        "consumer_group_name": consumer_group_name,
                        "partition_id": blob.name,
                        "owner_id": metadata["owner_id"],
                        "etag": blob.etag,
                        "last_modified_time": modified_time
                    }
                    ownership.update(metadata)
                    self._cached_ownership_dict[blob.name] = ownership
        except Exception as err:  # pylint:disable=broad-except
            logger.warning("An exception occurred during list_ownership for eventhub %r consumer group %r. "
                           "Exception is %r", eventhub_name, consumer_group_name, err)
            raise
        return self._cached_ownership_dict.values()

    async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
        """Try to claim each ownership in ``ownership_list`` and return those successfully claimed.

        :param ownership_list: Ownership records to claim. Each must contain ``partition_id``,
         ``eventhub_name``, ``consumer_group_name`` and ``owner_id``.
        :return: The ownership records whose blob upload succeeded.

        A failed claim is logged and left out of the result; it is never raised.
        """
        result = []
        for ownership in ownership_list:
            partition_id = ownership["partition_id"]
            eventhub_name = ownership["eventhub_name"]
            consumer_group_name = ownership["consumer_group_name"]
            owner_id = ownership["owner_id"]

            async with self._cached_ownership_locks[partition_id]:
                metadata = {"owner_id": owner_id}
                # carry an existing checkpoint over to the new owner
                if "offset" in ownership:
                    metadata["offset"] = ownership["offset"]
                if "sequence_number" in ownership:
                    metadata["sequence_number"] = ownership["sequence_number"]
                try:
                    await self._upload_blob(ownership, metadata)
                    self._cached_ownership_dict[partition_id] = ownership
                    result.append(ownership)
                except (ResourceModifiedError, ResourceExistsError):
                    logger.info(
                        "EventProcessor instance %r of eventhub %r consumer group %r lost ownership to partition %r",
                        owner_id, eventhub_name, consumer_group_name, partition_id)
                except Exception as err:  # pylint:disable=broad-except
                    logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for "
                                   "eventhub %r consumer group %r partition %r. The ownership is now lost. Exception "
                                   "is %r", owner_id, eventhub_name, consumer_group_name, partition_id, err)
        return result

    async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
                                offset, sequence_number) -> None:
        """Store a checkpoint for ``partition_id`` in the partition's blob metadata.

        :param eventhub_name: Event hub name (used for logging).
        :param consumer_group_name: Consumer group name (used for logging).
        :param partition_id: Partition whose checkpoint is updated.
        :param owner_id: Id of the EventProcessor instance writing the checkpoint.
        :param offset: Offset to store.
        :param sequence_number: Sequence number to store (stored as a string).
        :raises OwnershipLostError: If no ownership is cached for the partition or the blob was
         modified by someone else since the cached etag was obtained.
        """
        metadata = {
            "owner_id": owner_id,
            "offset": offset,
            "sequence_number": str(sequence_number)
        }
        async with self._cached_ownership_locks[partition_id]:
            # Read the cache under the lock so a concurrent claim_ownership/list_ownership cannot swap the
            # record after it was fetched; .get() avoids inserting an empty record for an unknown partition.
            cached_ownership = self._cached_ownership_dict.get(partition_id)
            if not cached_ownership:
                logger.info(
                    "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
                    "partition %r because it holds no ownership of the partition",
                    owner_id, eventhub_name, consumer_group_name, partition_id)
                raise OwnershipLostError()
            try:
                await self._upload_blob(cached_ownership, metadata)
            except (ResourceModifiedError, ResourceExistsError):
                logger.info(
                    "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
                    "partition %r because the ownership has been stolen",
                    owner_id, eventhub_name, consumer_group_name, partition_id)
                raise OwnershipLostError()
            except Exception as err:
                logger.warning(
                    "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
                    "partition %r because of unexpected error. Exception is %r",
                    owner_id, eventhub_name, consumer_group_name, partition_id, err)
                raise  # EventProcessor will catch the exception and handle it
Empty file.
1 change: 1 addition & 0 deletions
1
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
-e ../../eventhub/azure-eventhubs |
42 changes: 42 additions & 0 deletions
42
.../azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
from azure.eventhub.aio import EventHubClient | ||
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor | ||
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager | ||
from azure.storage.blob.aio import ContainerClient | ||
|
||
# Timeout in seconds for a receiving operation. 0 or None means no timeout.
RECEIVE_TIMEOUT = 5
# Max number of retries for receive operations within the receive timeout.
# The actual number of retries could be less if RECEIVE_TIMEOUT is too small.
RETRY_TOTAL = 3
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]

logging.basicConfig(level=logging.INFO)
|
||
|
||
async def do_operation(event):
    """Handle a single received event; this example just prints it.

    Replace the body with real sync or async work. If the work is i/o intensive,
    async operations will give better performance.
    """
    print(event)
|
||
|
||
class MyPartitionProcessor(PartitionProcessor):
    """Example processor: handles each batch concurrently, then checkpoints at the last event."""

    async def process_events(self, events, partition_context):
        """Process one batch of events received for the partition described by ``partition_context``."""
        if not events:
            print("empty events received", "partition:", partition_context.partition_id)
            return
        # run the per-event operations concurrently and wait for all of them
        await asyncio.gather(*(do_operation(event) for event in events))
        last_event = events[-1]
        await partition_context.update_checkpoint(last_event.offset, last_event.sequence_number)
|
||
|
||
if __name__ == '__main__':
    # Wire an EventProcessor to a blob-backed partition manager and run it until interrupted.
    loop = asyncio.get_event_loop()
    client = EventHubClient.from_connection_string(
        CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL
    )
    container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, container="eventprocessor")
    partition_manager = BlobPartitionManager(container_client=container_client)
    event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=10)
    try:
        loop.run_until_complete(event_processor.start())
    except KeyboardInterrupt:
        # Ctrl-C: give the processor a chance to shut down before exiting
        loop.run_until_complete(event_processor.stop())
    finally:
        # run_until_complete() has returned, so the loop is no longer running and stop() would clean
        # up nothing; close() is what releases the loop's resources.
        loop.close()
2 changes: 2 additions & 0 deletions
2
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
[packaging] | ||
auto_update = false |
2 changes: 2 additions & 0 deletions
2
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
[bdist_wheel] | ||
universal=1 |
75 changes: 75 additions & 0 deletions
75
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
#!/usr/bin/env python

#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import re
import os.path
import sys
from io import open
from setuptools import find_packages, setup

if sys.version_info < (3, 5, 3):
    raise RuntimeError('Only python 3.5.3 or above is supported')

# Change the PACKAGE_NAME only to change folder and different name
PACKAGE_NAME = "azure-eventhub-checkpointstoreblob-aio"
PACKAGE_PPRINT_NAME = "Event Hubs checkpointer implementation with Blob Storage"

package_folder_path = "azure/eventhub/extensions/checkpointstoreblobaio"
namespace_name = "azure.eventhub.extensions.checkpointstoreblobaio"

# Version extraction inspired from 'requests'
with open(os.path.join(package_folder_path, '__init__.py'), 'r', encoding='utf-8') as fd:
    version_match = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
                              fd.read(), re.MULTILINE)

# re.search returns None when nothing matches, so test the match before calling group();
# otherwise a missing __version__ surfaces as an AttributeError instead of the error below.
version = version_match.group(1) if version_match else None
if not version:
    raise RuntimeError('Cannot find version information')

# Read the long description explicitly as UTF-8 so the build does not depend on the locale encoding.
with open('README.md', encoding='utf-8') as f:
    readme = f.read()
with open('HISTORY.md', encoding='utf-8') as f:
    history = f.read()

exclude_packages = [
    'tests',
    'examples',
    # Exclude packages that will be covered by PEP420 or nspkg
    'azure',
    'azure.eventhub',
    'azure.eventhub.extensions',
]

setup(
    name=PACKAGE_NAME,
    version=version,
    description='Microsoft Azure {} Client Library for Python'.format(PACKAGE_PPRINT_NAME),
    long_description=readme + '\n\n' + history,
    long_description_content_type='text/markdown',
    license='MIT License',
    author='Microsoft Corporation',
    # NOTE(review): this value looks like an e-mail obfuscation placeholder, not a real address -
    # confirm the intended contact address before publishing.
    author_email='[email protected]',
    # The folder name must match the package directory in the repository.
    url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio',
    classifiers=[
        'Development Status :: 3 - Alpha',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: 3.7',
        'License :: OSI Approved :: MIT License',
    ],
    zip_safe=False,
    packages=find_packages(exclude=exclude_packages),
    install_requires=[
        'azure-storage-blob<13.0.0,>=12.0.0b2',
        'azure-eventhub<6.0.0,>=5.0.0b2',
    ],
    extras_require={
    }
)