Skip to content

Commit

Permalink
[EventHubs] merge pyamqp to main (#27763)
Browse files Browse the repository at this point in the history
* [EventHub] basic receive event scenario with pyamqp (#19748)

* initial changes for receiving

* undo __init__ aio

* vendor

* remove c/pyx files

* adams comments

* [EventHubs & AMQP Python] Send Port (#19745)

* draft send port

* copy and paste code changes in amqp

* simple stress test scripts for sending and receiving

* review feedbacks

* [EventHubs&AMQP Python] Port amqp send large message (#19937)

* port amqp send large message

* send perf test in parallel

* use context manager for executor

* add throughput

* improve test code

* update test matrix

* update test

* handle server busy

* fix timeout

* precision to 2 decimal points

* update pyamqp changes and update tests

* [EventHubs] Pure Python AMQP Sycn Implementation Integration (#22397)

* copy amqp changes

* eh python amqp integration

* fix time unit

* rename module pyamqp to _pyamqp

* more pyamqp to _pyamqp

* simplify todo

* [EH Pyproto] Release preparation (#22433)

* cherry pick changes

* update docs

* cherry pick fixed retry PR

* minor fix

* fix mypy, pylint, brokenlink

* update doc

* opt out mypy/pylint/api stub

* try opt out checkpointstore in ci and test

* fix

* more fixes

* furuther opt out tests

* update tests

* bump version

* fix __str__

* add test play holder

* ignore azure checkpoinstore aio in ci

Co-authored-by: swathipil <[email protected]>

* update readme to drop uamqp

* revert async tests

* revert aio module in eventhub

* [EH Pyproto] Async support (#22957)

* async port

* add scripts for tests

* update async perf test scripts

* fix test scripts

* amqp implementation update + eh update + sync perf test scripts

* update pyamqp and eh async impl and test scripts

* update pyamqp async impl

* fix bug

* fix pyamqp transport ssl setting and asyncio exception module import

* use ensure future for 3.6

* update token generation to return bytes to avoid breaking changes

* update docs

* Increment version for eventhub releases (#22994)

Increment package version after release of azure-eventhub

* [EH Pyproto] Async recv perf improvement (#23122)

* stop spawning too much coroutines

* improve send

* async recv perf improvement

* async perf improve

* update version

* align with sync imple

* update method name

* remove redundant except catch

* [EH Pyproto] Release updates (#23349)

* update docs

* add todo

* Increment version for eventhub releases (#23420)

Increment package version after release of azure-eventhub

* AMQP websocket implementation (#23722)

* Initial implementation

* http proxy support

* change impl

* more changes

* working sol

* async impl

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* more changes

* sasl mixin

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py

* refactor

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* oops

* comments

* comment

* Apply suggestions from code review

Co-authored-by: swathipil <[email protected]>

* comments

* changes

* async test

* rasie

* lint

* changelog

* version

* comments

* move path to EH

Co-authored-by: swathipil <[email protected]>

* Revert "AMQP websocket implementation (#23722)" (#24344)

This reverts commit 0123f4d.

* AMQP websocket implementation (#24345)

* Initial implementation

* http proxy support

* change impl

* more changes

* working sol

* async impl

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* more changes

* sasl mixin

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py

* refactor

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* oops

* comments

* comment

* Apply suggestions from code review

Co-authored-by: swathipil <[email protected]>

* comments

* changes

* async test

* rasie

* lint

* changelog

* version

* comments

* move path to EH

* Fix typo

Co-authored-by: swathipil <[email protected]>

* [EventHub] basic receive event scenario with pyamqp (#19748)

* initial changes for receiving

* undo __init__ aio

* vendor

* remove c/pyx files

* adams comments

* [EventHubs & AMQP Python] Send Port (#19745)

* draft send port

* copy and paste code changes in amqp

* simple stress test scripts for sending and receiving

* review feedbacks

* [EventHubs&AMQP Python] Port amqp send large message (#19937)

* port amqp send large message

* send perf test in parallel

* use context manager for executor

* add throughput

* improve test code

* update test matrix

* update test

* handle server busy

* fix timeout

* precision to 2 decimal points

* update pyamqp changes and update tests

* [EventHubs] Pure Python AMQP Sycn Implementation Integration (#22397)

* copy amqp changes

* eh python amqp integration

* fix time unit

* rename module pyamqp to _pyamqp

* more pyamqp to _pyamqp

* simplify todo

* [EH Pyproto] Release preparation (#22433)

* cherry pick changes

* update docs

* cherry pick fixed retry PR

* minor fix

* fix mypy, pylint, brokenlink

* update doc

* opt out mypy/pylint/api stub

* try opt out checkpointstore in ci and test

* fix

* more fixes

* furuther opt out tests

* update tests

* bump version

* fix __str__

* add test play holder

* ignore azure checkpoinstore aio in ci

Co-authored-by: swathipil <[email protected]>

* update readme to drop uamqp

* revert async tests

* revert aio module in eventhub

* [EH Pyproto] Async support (#22957)

* async port

* add scripts for tests

* update async perf test scripts

* fix test scripts

* amqp implementation update + eh update + sync perf test scripts

* update pyamqp and eh async impl and test scripts

* update pyamqp async impl

* fix bug

* fix pyamqp transport ssl setting and asyncio exception module import

* use ensure future for 3.6

* update token generation to return bytes to avoid breaking changes

* update docs

* Increment version for eventhub releases (#22994)

Increment package version after release of azure-eventhub

* [EH Pyproto] Async recv perf improvement (#23122)

* stop spawning too much coroutines

* improve send

* async recv perf improvement

* async perf improve

* update version

* align with sync imple

* update method name

* remove redundant except catch

* [EH Pyproto] Release updates (#23349)

* update docs

* add todo

* Increment version for eventhub releases (#23420)

Increment package version after release of azure-eventhub

* AMQP websocket implementation (#23722)

* Initial implementation

* http proxy support

* change impl

* more changes

* working sol

* async impl

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* more changes

* sasl mixin

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py

* refactor

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* oops

* comments

* comment

* Apply suggestions from code review

Co-authored-by: swathipil <[email protected]>

* comments

* changes

* async test

* rasie

* lint

* changelog

* version

* comments

* move path to EH

Co-authored-by: swathipil <[email protected]>

* Revert "AMQP websocket implementation (#23722)" (#24344)

This reverts commit 0123f4d.

* AMQP websocket implementation (#24345)

* Initial implementation

* http proxy support

* change impl

* more changes

* working sol

* async impl

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* more changes

* sasl mixin

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py

* refactor

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

* oops

* comments

* comment

* Apply suggestions from code review

Co-authored-by: swathipil <[email protected]>

* comments

* changes

* async test

* rasie

* lint

* changelog

* version

* comments

* move path to EH

* Fix typo

Co-authored-by: swathipil <[email protected]>

* remove extra SR related code

* update docs + type hints

* fixing failing tests

* [EventHubs] merge working websocket changes to feature branch (#24444)

* adam's working changes

* Adding back Rakshith's websocket changes (#24410)

* Adding back Rakshith's sync websocket changes

* fix async send and receive

* fix transport bugs

* add websocket to dev reqs + async fix hostname

* thank you kashif

* fix tests + turn on websocket tests

* update consumer test timing

* fix merge bugs + remove pyamqp specific tests

* update sleep time in test

* enable live test for ws receive

* fix to create Batch properly

* [eventhub] Websocket timeout error exception thrown (#24504)

* sync websocket timeout to operationtimeout, not changing other transport types just yet

* async websocket to operation timeout

* upstream

* default timeout 1.0

* default timeout 1

* throwing a socket timeout, operationtimeout was throwing out a real error

* replacing socket.timeout with TimeoutError, added into except statments as well

* timeout is inherited from oserror, dont need both

* test timeoutexception throwing errors

* deafult timeout to 1

* [eventhub] websocket default timeout fix (#24565)

* websocket timeout fix

* timeout interval for both ssl and webscoket

* [eventhub] Custom Endpoint  (#24505)

* sync ce

* async ce

* add string ending

* only pass to transport

* running into same recieve issue with sync

* fixing async - needs to pass to sasl

* remove logger

* stopping here

* adding prefix to fix sample

* add in prefetch

* fixing transport remove print

* host being overriden

* removing trace

* fix to use url async

* aligning sync/async pattern

* removing uneeded hostname switch

* string formatting

* changelog

* adding docstrings for supported events

* pr comments refactoring sync

* mirroring on async

* pr comment docstring

* removing import

* missing _

* missing ssl

* if no port given, we use default set in config

* async of same ^

* add default port in connection stage if port is none

* adding in docstring to cliet/connection string constructor

* custom_endpoint_address in client base async to match sync

* fix import on websocket test

* fix import 2

* skipping tests

* removing import

* pytest.mark

* [EventHubs] pyproto - update release date + docs (#24723)

* add async doc rst file

* Increment version for eventhub releases (#24753)

Increment package version after release of azure-eventhub

* changes to update status (#25024)

* updating codeowners file in pyproto feature

* [Eventhub] pyamqp prefetch fix (#24890)

* prefetch fix

* adding async - sorry!

* async

* Use --no-cone in pipeline sparse checkout script (#25165) (#25208)

Co-authored-by: Ben Broderick Phillips <[email protected]>

Co-authored-by: Azure SDK Bot <[email protected]>
Co-authored-by: Ben Broderick Phillips <[email protected]>

* [AMQP Python] Eventhub Pyamqp tests (#24895)

* starting tests

* updates to websocket sync

* moving around format - unittest and live test

* live test + unittests starting

* websocket async passing

* eol

* assert not return

* assert not return

* fixed assert

* auth tests

* auth unittest pyamqp

* replicating uamqp tests

* keep_alive_thread

* skip for now - no keep alive

* pickle/deepcopy, might not want to keep all

* stopping here for now - need tls on rabbitmq

* cleaning up tests - pickle

* removing and editing uneeded tests

* removing unused test

* added receive amqp tests

* exceptions with pytest.raises, not live

* moving around tests

* testing mgmt calls like _start_producer

* Use --no-cone in pipeline sparse checkout script (#25165)

Co-authored-by: Ben Broderick Phillips <[email protected]>

* unused imports

Co-authored-by: Azure SDK Bot <[email protected]>
Co-authored-by: Ben Broderick Phillips <[email protected]>

* [Pyamqp] Fix network logging trace in client_base (#25218)

* pass in right kwarg for network tracing

* remove client changes. Another PR

* reverting link credit for now (#25310)

* [Pyamqp] Pyampq debug build Linkedin (#25296)

* enhanced logging for linkedin

* stuff

* fixes

* minor sample fix

* Changelog

* remove unused imports

* fix formatting changes

* change debug level

* update version info

* update changelog

* fix sample

* fix logging message for empty access token

* Increment version for eventhub releases (#25320)

Increment package version after release of azure-eventhub

* removing duplicate (#25321)

* add async unit tests (#25396)

* reset logging level (#25588)

* [Pyamqp] Remember Proxy Params (#25564)

* fix to keep proxy params

* async changes for proxy

* unit tests

* changes

* more changes

Co-authored-by: swathipil <[email protected]>

Co-authored-by: swathipil <[email protected]>

* [Pyamqp] Intial TODOS Clean Up (#25630)

* set default SSL version

* dont need it for EH & SB

* wont impact us, check mtg notes

* decode error wont affect us

* address in SB PR

* change language on logger

* it does close socket

* no other closes needed

* keep for better tracking

* close w/ error when max frame size is invalid

* detach links on session end

* clean up links on session outgoing_end

* reject link by detaching

* uncomment logging for later review

* reject the link that was set in the try

* [Pyamqp] test fixes for pipeline (#25749)

* test fixes for pipeline

* connect to EHerror

* [PyAMQP] Stress testing reform (#25770)

* reform stress test

* reform stress test

* updating stress test format

* can specify azure_identity on producer

* epoch level sync

* removing commands

* aligning sync/async

* pyamqp logging

* uamqp to pyamqp

* import remove

* fixing deploy commands

* values file

* checking eng/ file resources

* remove version

* moving location of this folder

* imagepullpolicy, azure_identity for consumer

* [PyAMQP] Updating pyamqp with SB changes  (#25804)

* pyamqp diff from anna's branch

* mgmt_request returns code, response, desc

* mgmt_request returns code, response, desc async

* handle error being thrown

* handle error being thrown 2

* handle error being thrown 3

* Fixed error path

* fix error path

Co-authored-by: Anna Tisch <[email protected]>
Co-authored-by: Kashif Khan <[email protected]>

* removing buff producer to add back in later

* update cspell

* fix uppercase link in doc/dev/issues/resolve_issues_effectively

* [PyAMQP] Connections TODO (#26018)

* protocol errors

* fix formatting

* [PyAMQP] Kashif Client refactoring changes (#25451)

Linked to
link #22051
for a TODO about typing in docstrings

* need to use pyamqp (#25895)

* [EventHubs] add amqp switch support (#25965)

This PR is for adding switch support to the `feature/eventhub/pyproto` changes including the uamqp switch from current `main`.
fixes #21246
Addressing Anna's comments from uamqp switch PR (#25193) + main changes:
- [x] Moving pyamqp logic out to the PyamqpTransport
- [x] **Confirmed: The size of encoded pyamqp.Message is larger than uamqp.Message.** I thought otherwise b/c I was adding the header/property objects by default when building the outgoing uamqp message, even if all values inside those are None. I've fixed this.
- [ ] Make BatchMessage transport agnostic: #25494 (comment)
  - Instead, updated EventDataBatch so that it takes an amqp_transport. If EventDataBatch is manually created and uses PyamqpTransport, inside `send_batch()`, if the producer client transport uses UamqpTransport, the BatchMessage corresponding to the client's amqp_transport will be built and sent.
- [X] Add `message` property to `EventData`/`EventDataBatch`, which return `LegacyMessage`/`LegacyMessageBatch` from `_pyamqp` for backcompat.
- [x] add `connection=None` parameter to `pyamqp.AMQPClient.open()` as per [this discussion](#25494 (comment))
- [x] Add an async SharedConnectionManager in pyamqp: #25494 (comment)

Issue created [[here](#25875)] to address the below TODOs in a separate PR:
- [x] add TODO in pyamqp that SenderClient should take msg_timeout: #25494 (comment)
- [x] add TODO in pyamqp that ReceiveClient should take timeout: #25494 (comment)
- [x] add both MAX_MESSAGE_LENGTH_BYTES and MAX_FRAME_SIZE_BYTES to pyamqp: #25494 (comment)

TODO:
- [ ] fix mypy/pylint issues
  - made partial progress. fix rest in separate PR for issue: #25936
- [x] investigate how to remove `message` property from public API.
  - mark as deprecated and log a deprecation warning if accessed

* prep release

* [Pyamqp] Exception Todos (#25893)

* throw proper exception and error condition on CBS

* error condition for ready

* close when open is on non-zero channel

* error condition is proper for timeout

* unattached handle

* max handles error condition

* fix strings

* fix

* end the session on unattached handle

* remove TODO

* conditions are proper

* raise link error on close or detach

* revert change

* remove TODO

* change to client error

* change to AmqpError

* change to AMQPError

* detach called

* detach the link, dont close the connection

* comments clean up + change condition

* handle invalid channel on end

* descriptive error message

* fix formatting

* detach err message on max handle

* [Pyamqp] Async WS implementation using a pure async library (#26234)

* init changes for aiohttp ws

* fixes + formatting

* fixes for context manager

* changes

* ssl options

* clean up

* move build opts in to mixin

* pass in proxy information

* attach port to proxy if given

* address comments

* remove self from proxy host

* [SB PyAMQP] Servicebus PyAMQP Working off of Anna's Branch (#24975)

* Added pyamqp

* Added message compatibility tests

* Start rewiring messages for pyamqp

* Added message backcompat layer

* Successful message send

* Started receiver

* Successful message receive

* Message settlement

* Fix other settlement outcomes

* Make tests live

* message partition_key if it can't be decoded - output value

* removing references to __future___ annotations for now - not supported in 3.6

* comparing name of transport - not the object

* passing in a dummy frame for new formatting of SBMessageReceived

* adding in fake frame for message in queue tests

* uamqp_mesage -> uamqp_message

* state should be auth_state

* switching this back - _message is Message

* Improved typing

* Revert "Improved typing"

This reverts commit aeffcb2.

* Fix TransportType enum

* Fix import statement

* Fix application property encoding

* Skip queue iterator tests

* Fix mgmt op timeout

* Fixes to mgmt link

* Fix frame decode tests

* More mgmt fixes

* Some message fixes

* Fix session filters

* Message tests

* Skip more iterator tests

* Update to retry policy

* adding in support for websockets  is CE supported?

* fixing up pylint-still some issues

* some more pylint/TODOs

* pylint changes

* fixing pylint

* more pylint connection

* More test fixes

* Fix scheduling

* Fix retry test

* Fix error handling

* Sender refactor for timeout

* Fix link detach

* Fixed receiver control flow

* Update pyamqp async code

* Updated sb async

* Typing fix

* Some async fixes

* Skip async iter tests

* Workaround socket timeout

* Literal import

* More async test fixes

* Added keepalive

* Pylint cleanup

* fix mypy errors in _pyamqp

* fix mypy sb layer

* fix bug

* unused import

* lint

* fix failing tests

* ignore sb iterator receive samples

Co-authored-by: antisch <[email protected]>
Co-authored-by: swathipil <[email protected]>

* [Pyamqp] Pyamqp fix conn (#26568)

* remove unnecessary pops

* fix var name + remove unnecessary pop

* fix

* Bring in changes to fix bandit from failing

* [EventHubs&ServiceBus] merge sb and eh pyamqp (#26548)

* merge sb and eh pyamqp

* reenable pylint for EH

* turn on mypy for EH

* fix mypy errors eh layer

* fix EH mypy/pylint

* fix SB failing tests

* fix more tests/mypy

* import literal from typing extensions

* remove whitespace

* fix typing cast bug in EH

* lint

* fix port url in async transport

* kashifs comments

* mypy/lint/kashifs comments

* [Pyamqp] Fix Async Invalid Host Error Test (#26595)

* resert transport to original state

* add in missing continue in except

* Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py

Co-authored-by: swathipil <[email protected]>

Co-authored-by: swathipil <[email protected]>

* fix merge conflict stress

* update cspell, ignore tables spelling

* skip connection verify tests sb for now

* fix logging formatting (#26682)

* copy over kashifs change to sb

* [ServiceBus&EventHubs] pyamqp - update EH/SB docs for release (#26741)

updating docs to prepare for release

* [ServiceBus&EventHubs] fix mypy/pylint (#26744)

* fix mypy/pylint

* bump SB version

* prep release alpha (#26755)

* Increment version for servicebus releases (#26762)

Increment package version after release of azure-servicebus

* Increment version for eventhub releases (#26766)

Increment package version after release of azure-eventhub

* [PyAMQP] Fix logging (#26785)

* make logging network trace a debug level log

* async changes

* await async sleep (#26853)

* fix for async socket (#26852)

* [Pyamqp] Possible solutions for network disruption using async websocket (#26856)

* heartbeat

* constant value for heartbeat

* address comments

* [Pyamqp] Fix to Improve Websocket Sync and Async Network Disruption Handling (#27006)

* changes

* fix lint

* remove prints

* remove unused import

* remove heartbeat from this PR

* [pyAMQP] Stress fixes for aiohttp and valueError (#27034)

* changes

* fix lint

* remove prints

* remove unused import

* remove heartbeat from this PR

* OS Error to catch client closed session error

* removing value error raise, change to logging -- causes error on network disconnect

* when looping on open clinet

* change to printf style

Co-authored-by: Kashif Khan <[email protected]>

* [Pyamqp] Changes for blocking exceptions (#27260)

* changes for blocking exceptions

* fix hang on unit test

* lint fixes

* fix bug for closes

* fixes

* lint issues

* Stress testing updates (#27456)

* can uncomment line 5 to run against git version of pyamqp

* can move line 6 into scripts/dev_requirement file

* test against newest version of pyamqp

* increase test time, get rid of unused tests

* removing test names

* change naming

* change naming

* raise logging level to catch only error level

* return logging to info

* adding resource requests

* message retention needs to last as long as the test

* websocket async test

* changing life of messages

* 32 partitions

* add uamqp flag - remove logging

* adding before trying matrix

* websocket dep

* helm ignore

* updating

* updating tests

* update consumer files

* remove log lines

* remove log lines

* remove commented

* update

* [EventHubs] kwargs/error testing (#27065)

* adding tests

* add auth/connection tests + fixes

* fix connection verify error handling

* revert consumer retry change

* call ws close in sync transport

* typo

* fix ws exc import

* fix async transport

* fix link detach vendor error exception parity

* add operationtimeouterror

* add more negative tests

* annas comments + lint

* lint + tests

* add ids for uamqp vs pyamqp tests

* update tests

* skip macos tests

* [EventHubs] check for any non-None values in amqp header/properties (#27444)

* add any method to amqp header/properties

* use count(None) to check non-None header/props vals

* [EventHubs] update sync receive client ready flow (#27411)

* update sync receive client ready flow

* lint + mypy

* fix reconnect test

* [pyamqp] os error add (#27351)

* os error add

* update except statement

* test mock of receive_bytes on pipeline

* client os error

* mock try 2

* pylint

* fix patch line

* fixed mock

* newline

* fixing spacing

* unused import

* fix error mssgs

* changes from perf run (#27703)

* Matrix Gen Stress Tests (#27754)

* stress matrix gen

* removing unused dockerfiles for now

* revert sb to main

* revert non-eh files

* restore samples/readmes to main

* restore ci/tests/shared reqs to main

* update to stable

* lint + fix tests for no uamqp import

* update test timeout

* re-organize changelog

Co-authored-by: Adam Ling (MSFT) <[email protected]>
Co-authored-by: Azure SDK Bot <[email protected]>
Co-authored-by: Rakshith Bhyravabhotla <[email protected]>
Co-authored-by: Kashif Khan <[email protected]>
Co-authored-by: Libba Lawrence <[email protected]>
Co-authored-by: Kashif Khan <[email protected]>
Co-authored-by: Ben Broderick Phillips <[email protected]>
Co-authored-by: Anna Tisch <[email protected]>
  • Loading branch information
9 people authored Dec 13, 2022
1 parent a8041f5 commit d111b55
Show file tree
Hide file tree
Showing 132 changed files with 17,954 additions and 1,057 deletions.
65 changes: 62 additions & 3 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 5.10.2 (Unreleased)
## 5.11.0 (Unreleased)

### Features Added

Expand Down Expand Up @@ -88,9 +88,66 @@ This version and all future versions will require Python 3.7+, Python 3.6 is no

## 5.9.0b1 (2022-02-09)

- The following features have been temporarily pulled out of async `EventHubProducerClient` and `EventHubConsumerClient` which will be added back in future previews as we work towards a stable release:
- Passing the following keyword arguments to the constructors and `from_connection_string` methods of the `EventHubProducerClient` and `EventHubConsumerClient` is not supported: `transport_type`, `http_proxy`, `custom_endpoint_address`, and `connection_verify`.

## 5.8.0b2 (2022-10-11)

### Features Added

- Updated the optional dependency for async transport using AMQP over WebSocket from `websocket-client` to `aiohttp` (Issue #24315, thanks @hansmbakker for the suggestion).

## 5.8.0b1 (2022-09-22)

This version and all future versions will require Python 3.7+. Python 3.6 is no longer supported.

### Other Changes

- Added the `uamqp_transport` optional parameter to the clients, to allow switching to the `uamqp` library as the transport.

## 5.8.0a5 (2022-07-19)

### Bugs Fixed

- Fixed bug that prevented token refresh at regular intervals.
- Fixed bug that was improperly passing the debug keyword argument, so that network trace debug logs are output when requested.

### Other Changes

- Added logging added in to track proper token refreshes & fetches, output exception reason for producer init failure.

## 5.8.0a4 (2022-06-07)

### Features Added

- Added support for connection using websocket and http proxy.
- Added support for custom endpoint connection over websocket.

## 5.8.0a3 (2022-03-08)

### Other Changes

- Improved the performance of async sending and receiving.

## 5.8.0a2 (2022-02-09)

### Features Added

- The classmethod `from_message_data` has been added to `EventData` for interoperability with the Schema Registry Avro Encoder library, and takes `data` and `content_type` as positional parameters.
- Added support for async `EventHubProducerClient` and `EventHubConsumerClient`.

## 5.8.0a1 (2022-01-13)

Version 5.8.0a1 is our first efforts to build an Azure Event Hubs client library based on pure python implemented AMQP stack.

### Breaking changes

- The following features have been temporarily pulled out which will be added back in future previews as we work towards a stable release:
- Async is not supported.
- Passing the following keyword arguments to the constructors and `from_connection_string` methods of the `EventHubProducerClient` and `EventHubConsumerClient` is not supported: `transport_type`, `http_proxy`, `custom_endpoint_address`, and `connection_verify`.

### Other Changes

- uAMQP dependency is removed.

## 5.7.0 (2022-01-12)

Expand Down Expand Up @@ -598,4 +655,6 @@ Version 5.0.0b1 is a preview of our efforts to create a client library that is u
- Further testing and minor bug fixes.


![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhub/HISTORY.png)
## 0.2.0a2 (2018-04-02)

- Updated uAQMP dependency.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
max_message_size_on_link: int,
executor: ThreadPoolExecutor,
*,
amqp_transport: AmqpTransport,
max_buffer_length: int,
max_wait_time: float = 1
):
Expand All @@ -50,10 +51,11 @@ def __init__(
self._max_message_size_on_link = max_message_size_on_link
self._check_max_wait_time_future = None
self.partition_id = partition_id
self._amqp_transport = amqp_transport

def start(self):
with self._lock:
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
self._running = True
if self._max_wait_time:
self._last_send_time = time.time()
Expand Down Expand Up @@ -113,12 +115,12 @@ def put_events(self, events, timeout_time=None):
self._buffered_queue.put(self._cur_batch)
self._buffered_queue.put(events)
# create a new batch for incoming events
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
except ValueError:
# add single event exceeds the cur batch size, create new batch
with self._lock:
self._buffered_queue.put(self._cur_batch)
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
self._cur_batch.add(events)
with self._lock:
self._cur_buffered_len += new_events_len
Expand Down Expand Up @@ -197,7 +199,7 @@ def flush(self, timeout_time=None, raise_error=True):
self._last_send_time = time.time()
#reset buffered count
self._cur_buffered_len = 0
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
_LOGGER.info("Partition %r finished flushing.", self.partition_id)

def check_max_wait_time_worker(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from ..exceptions import EventDataSendError, ConnectError, EventHubError

if TYPE_CHECKING:
from .._producer_client import SendEventTypes
from .._transport._base import AmqpTransport
from .._producer_client import SendEventTypes

_LOGGER = logging.getLogger(__name__)

Expand All @@ -31,6 +31,7 @@ def __init__(
eventhub_name: str,
max_message_size_on_link: int,
*,
amqp_transport: AmqpTransport,
max_buffer_length: int = 1500,
max_wait_time: float = 1,
executor: Optional[Union[ThreadPoolExecutor, int]] = None
Expand All @@ -47,6 +48,7 @@ def __init__(
self._max_wait_time = max_wait_time
self._max_buffer_length = max_buffer_length
self._existing_executor = False
self._amqp_transport = amqp_transport

if not executor:
self._executor = ThreadPoolExecutor()
Expand Down Expand Up @@ -88,6 +90,7 @@ def enqueue_events(
executor=self._executor,
max_wait_time=self._max_wait_time,
max_buffer_length=self._max_buffer_length,
amqp_transport = self._amqp_transport,
)
buffered_producer.start()
self._buffered_producers[pid] = buffered_producer
Expand Down
59 changes: 39 additions & 20 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import collections
from typing import Any, Dict, Tuple, List, Optional, TYPE_CHECKING, cast, Union
try:
from typing import TypeAlias
from typing import TypeAlias # type: ignore
except ImportError:
from typing_extensions import TypeAlias
from datetime import timedelta
Expand All @@ -25,11 +25,15 @@
from azure.core.utils import parse_connection_string as core_parse_connection_string
from azure.core.pipeline.policies import RetryMode


from ._transport._uamqp_transport import UamqpTransport
try:
from ._transport._uamqp_transport import UamqpTransport
except ImportError:
UamqpTransport = None # type: ignore
from ._transport._pyamqp_transport import PyamqpTransport
from .exceptions import ClientClosedError
from ._configuration import Configuration
from ._utils import utc_from_timestamp, parse_sas_credential, generate_sas_token
from ._utils import utc_from_timestamp, parse_sas_credential
from ._pyamqp.utils import generate_sas_token
from ._connection_manager import get_connection_manager
from ._constants import (
CONTAINER_PREFIX,
Expand All @@ -43,8 +47,14 @@

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential
from uamqp import Message as uamqp_Message
from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
try:
from uamqp import Message as uamqp_Message
from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
except ImportError:
uamqp_Message = None
uamqp_JWTTokenAuth = None
from ._pyamqp.message import Message
from ._pyamqp.authentication import JWTTokenAuth

_LOGGER = logging.getLogger(__name__)
_Address = collections.namedtuple("_Address", "hostname path")
Expand Down Expand Up @@ -165,7 +175,7 @@ def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
if retry_mode == RetryMode.Fixed:
backoff_value = backoff_factor
else:
backoff_value = backoff_factor * (2**retried_times)
backoff_value = backoff_factor * (2 ** retried_times)
return min(backoff_max, backoff_value)


Expand Down Expand Up @@ -262,6 +272,7 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
return AccessToken(signature, expiry)


# separate TYPE_CHECKING block here for EventHubSharedKeyCredential, o/w mypy raised error even with forward referencing
if TYPE_CHECKING:
from azure.core.credentials import TokenCredential

Expand All @@ -281,8 +292,10 @@ def __init__(
credential: CredentialTypes,
**kwargs: Any,
) -> None:
uamqp_transport = kwargs.pop("uamqp_transport", True)
self._amqp_transport = kwargs.pop("amqp_transport", UamqpTransport)
uamqp_transport = kwargs.pop("uamqp_transport", False)
if uamqp_transport and not UamqpTransport:
raise ValueError("To use the uAMQP transport, please install `uamqp>=1.6.0,<2.0.0`.")
self._amqp_transport = kwargs.pop("amqp_transport", UamqpTransport if uamqp_transport else PyamqpTransport)

self.eventhub_name = eventhub_name
if not eventhub_name:
Expand All @@ -305,7 +318,10 @@ def __init__(
**kwargs,
)
self._debug = self._config.network_tracing
self._conn_manager = get_connection_manager(**kwargs)
kwargs["custom_endpoint_address"] = self._config.custom_endpoint_address
self._conn_manager = get_connection_manager(
amqp_transport=self._amqp_transport,
**kwargs)
self._idle_timeout = kwargs.get("idle_timeout", None)

@staticmethod
Expand All @@ -322,7 +338,7 @@ def _from_connection_string(conn_str, **kwargs):
kwargs["credential"] = EventHubSharedKeyCredential(policy, key)
return kwargs

def _create_auth(self) -> uamqp_JWTTokenAuth:
def _create_auth(self) -> Union[uamqp_JWTTokenAuth, JWTTokenAuth]:
"""
Create an ~uamqp.authentication.SASTokenAuth instance
to authenticate the session.
Expand Down Expand Up @@ -381,7 +397,7 @@ def _backoff(
raise last_exception

def _management_request(
self, mgmt_msg: uamqp_Message, op_type: bytes
self, mgmt_msg: Union[uamqp_Message, Message], op_type: bytes
) -> Any:
# pylint:disable=assignment-from-none
retried_times = 0
Expand All @@ -401,26 +417,29 @@ def _management_request(
mgmt_msg.application_properties[
"security_token"
] = self._amqp_transport.get_updated_token(mgmt_auth)
response = self._amqp_transport.mgmt_client_request(
status_code, description, response = self._amqp_transport.mgmt_client_request(
mgmt_client,
mgmt_msg,
operation=READ_OPERATION,
operation_type=op_type,
status_code_field=MGMT_STATUS_CODE,
description_fields=MGMT_STATUS_DESC,
)
status_code = int(response.application_properties[MGMT_STATUS_CODE])
description = response.application_properties.get(
MGMT_STATUS_DESC
) # type: Optional[Union[str, bytes]]
status_code = int(status_code)
if description and isinstance(description, bytes):
description = description.decode("utf-8")
if status_code < 400:
return response
raise self._amqp_transport.get_error(status_code, description)
except Exception as exception: # pylint: disable=broad-except
# is_consumer=True passed in here, ALTHOUGH this method is shared by the producer and consumer.
# is_consumer will only be checked if FileNotFoundError is raised by self.mgmt_client.open() due to
# invalid/non-existent connection_verify filepath. The producer will encounter the FileNotFoundError
# when opening the SendClient, so is_consumer=True will not be passed to amqp_transport.handle_exception
# there. This is for uamqp exception parity, which raises FileNotFoundError in the consumer and
# EventHubError in the producer. TODO: Remove `is_consumer` kwarg when resolving issue #27128.
last_exception = self._amqp_transport._handle_exception( # pylint: disable=protected-access
exception, self
exception, self, is_consumer=True
)
self._backoff(
retried_times=retried_times, last_exception=last_exception
Expand Down Expand Up @@ -540,10 +559,10 @@ def _close_connection(self):
self._close_handler()
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception):
def _handle_exception(self, exception, *, is_consumer=False):
exception = self._amqp_transport.check_timeout_exception(self, exception)
return self._amqp_transport._handle_exception( # pylint: disable=protected-access
exception, self
exception, self, is_consumer=is_consumer
)

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
Expand Down
Loading

0 comments on commit d111b55

Please sign in to comment.