Skip to content

Commit

Permalink
Add Reconnection Logic (#43)
Browse files Browse the repository at this point in the history
* ✨ Re-export with Pharo 12 format

* ✨ Add RabbitMQPublisher

* ✅ Add RabbitMQPublisher tests

* ✨ Add support for reconnection logic and also enhance logging

* ✨ Add support for client reconnection

* ♻️ Refactor RabbitMQ code

* 💡 Added RabbitMQPublisher class comment

* ♻️ Refactor RabbitMQWorker and its tests

* 🧪 Add rabbit disconnection test

* ♻️ RabbitMQ client, worker , publisher refactors

* ♻️ AmqpChannel refactors

* ✅ Add RabbitMQWorker reconnection test

* upgrade dependencies

* add pharo 12 to the CI actions

* 🎨 Add support to handle nextFrame

* 🎨 Add waitForReply handle

* 🧪 Updated test to allow Pharo retrocompatibility

* 📝 Added Pharo 12 support

* Update references of LaunchpadLogRecord to LogRecord

* Refactor the RabbitMQClient hierarchy to make the worker and publisher subclasses of it.

Push up the publisher's logic to reconnect to RabbitMQClient

* handle unexpected errors in heartbeat process

* Refactor worker to be used by composition instead of inheritance

* Include Bell-SUnit as dependency to use LoggingAsserter

* Refactor RabbitMQPublisherTest to use LoggingAsserter
Improve AMQPChannel and AMQPConnection printOn

* Refactor RabbitMQTextReverserText to use LoggingAsserter

* Add messages ack callback in RabbitMQPublisher

* Make queue durability configurable in RabbitMQWorker

* Refactor AMQPRabbitTest to user RabbitMQPublisher and Worker

* Make AmqpConnection printOn more robust to disconnections

* Upgrade actions to use checkout@v4

* Make RabbitMQPublisherTest more portable to previous versions

* Remove redundant error handled

* 🗑️ Deprecate method #confirmMessagesPublicationWith: andThoseNotProcessedWith: for: #confirmPublicationWith:otherwise:

* 📝 Add migration guide doc

* ♻️ Refactor RabbitMQClientTest

* 🔧 Add extra client properties to AmqpConnection

* ✅ Add RabbitMQ reconnection test

* Extract asserts in testPublisherConfirmationWhenMessageProcessed

* Homogeneize AmqpConnection closing logic

* Increment heartbeat priority to highIO
Also improve printOn

* Use ZdcSocketStream because it's flush handles better the disconnections across pharo versions
SocketStream>>#flush don't detect them properly in Pharo 11 and previous versions

* Create an AmqpSocketStream with a #flush implementation consistent for every pharo version

* Move the fixed #flush implementation to an extension loaded only on Pharo < 12 versions

* Correct the deprecated message

* Rename RabbitMQWorker instantiation

Refactor RabbitMQPublisher delivery mode message

* Update migration guide and docs with new RabbitMQ Clients

* Update timeframeBetweenAttempts default value

* Add support of debugging info in RabbitMQWorker

* Fix markdown linter errors

* Include deliveryTag in the worker debugging logs

* Refactor AmqpChannelHandler instance variables

* Refactor the SocketConnectionStatus hierarchy and improve some handling on socket status

* Improve the reconnection logic when the failure is in the client's socket

* Add SocketError as a possible connectivity error

* Add PrimitiveFailed as a possible connectivity error for compatibility with pharo older versions

* Improve AmqpConnection description

* Homogeneize the exchange and queue declaration using a builder
Deprecate older messages

* Create a global to wrap the socket error across different pharo versions

* Use Smalltalk>>#includesKey: to make it portable to Pharo8

* Avoid Heartbeat process termination infinite loop in some Pharo versions

* Use ifTrue:ifFalse: in postLoadInitialization
At the moment of executing the code, that extension is not yet loaded

* Release waiting processes on socket when that one is destroyed

This fixes the issue pharo-project/pharo#15975 presented in older version of pharo

* Add another expected error for compatibility with previous version of Pharo 12

* Stop heartbeat after socket was closed

* Rename ensure to assert in messages that only checks something and raise error

---------

Co-authored-by: Agustin Salvidio <[email protected]>
  • Loading branch information
jvanecek and AgusSalvidio authored Sep 2, 2024
1 parent 5de0394 commit e2390e7
Show file tree
Hide file tree
Showing 61 changed files with 1,997 additions and 1,029 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/loading-groups.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
fail-fast: false
matrix:
smalltalk: [ Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
smalltalk: [ Pharo64-12, Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
load-spec: [ deployment, tests, development, tools ]
name: ${{ matrix.smalltalk }} + ${{ matrix.load-spec }}
services:
Expand All @@ -18,7 +18,7 @@ jobs:
- 5672:5672
options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: hpi-swa/setup-smalltalkCI@v1
with:
smalltalk-image: ${{ matrix.smalltalk }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/markdown-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
name: runner / markdownlint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: markdownlint
uses: reviewdog/action-markdownlint@v0
with:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
fail-fast: false
matrix:
smalltalk: [ Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
smalltalk: [ Pharo64-12, Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
name: ${{ matrix.smalltalk }}
services:
rabbitmq:
Expand All @@ -18,7 +18,7 @@ jobs:
- 5672:5672
options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: hpi-swa/setup-smalltalkCI@v1
with:
smalltalk-image: ${{ matrix.smalltalk }}
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Ansible is a AMQP client library for Smalltalk supporting 0-8 and 0-9-1
[![Pharo 8.0](https://img.shields.io/badge/Pharo-8.0-informational)](https://pharo.org)
[![Pharo 9.0](https://img.shields.io/badge/Pharo-9.0-informational)](https://pharo.org)
[![Pharo 10](https://img.shields.io/badge/Pharo-10-informational)](https://pharo.org)
[![Pharo 11](https://img.shields.io/badge/Pharo-10-informational)](https://pharo.org)
[![Pharo 11](https://img.shields.io/badge/Pharo-11-informational)](https://pharo.org)
[![Pharo 12](https://img.shields.io/badge/Pharo-12-informational)](https://pharo.org)

An [Ansible](https://en.wikipedia.org/wiki/Ansible) is a fictional device
capable of near-instantaneous communication. It can send and receive message
Expand Down
70 changes: 70 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Migration Guide

## Migration from v2 to v3

* Manually load the package `Ansible-Deprecated-V3` that has transformation of
deprecated messages.

* `RabbitMQWorker` has been refactored to be used by composition instead of
inheritance: The classes that subclassified it implemented the subclass
responsibility `#configureConnection:` and `#process: payload` should be
changed to instantiate `RabbitMQWorker` as to pass that process logic to the
`processingPayloadWith:` collaborator.

For instance,

```smalltalk
Class {
#name : 'RabbitMQTextReverser',
#superclass : 'RabbitMQWorker',
#instVars : [
'testCase',
]
}
{ #category : 'initialization' }
RabbitMQTextReverser >> initializeWorkingWith: aTestCase
testCase := aTestCase
{ #category : 'private' }
RabbitMQTextReverser >> #configureConnection: builder
builder hostname: 'localhost'.
builder portNumber: 5672.
builder username: 'guest'.
builder password: 'guest'.
{ #category : 'private' }
RabbitMQTextReverser >> process: payload
testCase storeText: payload utf8Decoded reversed
```

Should be refactored to

```smalltalk
Class {
#name : 'RabbitMQTextReverser',
#superclass : 'Object',
#instVars : [
'worker'
]
}
RabbitMQTextReverser >> initializeWorkingWith: aTestCase
worker := RabbitMQWorker
configuredBy: [ :options |
options
at: #hostname 'localhost';.
at: #port 5672;.
at: #username 'guest';.
at: #password 'guest'
]
processingMessagesWith: [ :payload |
aTestCase storeText: payload utf8Decoded reversed
].
worker start
```
8 changes: 8 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ They are not a rewrite but rather my interpretation.
reading it to get a complete understanding. They also provide this [great tool](http://tryrabbitmq.com)
to help you explore different messaging patterns.

## Use RabbitMQ clients reifications

We provide two objects to simplify the instantiations of a publisher and a
consumer:

1. [RabbitMQPublisher](tutorials/RabbitMQPublisher.md)
2. [RabbitMQWorker](tutorials/RabbitMQWorker.md)

---

To use the project as a dependency of your project, take a look at:
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorials/PublishSubscribe.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ logger name: 'Transcript logger'.
logger resume
```

## Receiveing notifications
## Receiving notifications

Here's the script to spawn a process that will pop up a toast notification on
every log message received
Expand Down
19 changes: 19 additions & 0 deletions docs/tutorials/RabbitMQPublisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# RabbitMQPublisher

This object will connect to an AMQP channel and knows how to publish messages
to the specified queue for further processing.

Accepts the following options:

<!-- markdownlint-disable MD013 -->
| Attribute name | Description | Optional/Mandatory | Default value |
| ---------------|-------------|--------------------|---------------|
| #hostname | Hostname of the rabbitmq broker | Optional | localhost |
| #port | Port numbre of the rabbitmq broker | Optional | 5672 |
| #username | Username of the rabbitmq broker | Optional | guest |
| #password | Username of the rabbitmq broker | Optional | guest |
| #maximumConnectionAttemps | Amount of retries when connecting to the broker fails | Optional | 3 |
| #timeSlotBetweenConnectionRetriesInMs | Time duration between retry attempts determined by using the exponential backoff algorithm | Optional | 3000 |
| #enableDebuggingLogs | A boolean indicating whether to log debugging events | Optional | false |
| #extraClientProperties | A dictionary with keys and values to set the [client properties](https://www.rabbitmq.com/docs/connections#capabilities) |Optional | Empty |
| #retry | A block that can configure the internal `Retry` instance | Optional | `[]` |
21 changes: 21 additions & 0 deletions docs/tutorials/RabbitMQWorker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# RabbitMQWorker

This object will connect to an AMQP channel and knows how to consume messages
from a specified queue for processing.

Accepts the following options:

<!-- markdownlint-disable MD013 -->
| Attribute name | Description | Optional/Mandatory | Default value |
| ---------------|-------------|--------------------|---------------|
| #hostname | Hostname of the rabbitmq broker | Optional | localhost |
| #port | Port numbre of the rabbitmq broker | Optional | 5672 |
| #username | Username of the rabbitmq broker | Optional | guest |
| #password | Username of the rabbitmq broker | Optional | guest |
| #maximumConnectionAttemps | Amount of retries when connecting to the broker fails | Optional | 3 |
| #timeSlotBetweenConnectionRetriesInMs | Time duration between retry attempts determined by using the exponential backoff algorithm | Optional | 3000 |
| #enableDebuggingLogs | A boolean indicating whether to log debugging events | Optional | false |
| #extraClientProperties | A dictionary with keys and values to set the [client properties](https://www.rabbitmq.com/docs/connections#capabilities)| Optional | Empty |
| #retry | A block that can configure the internal `Retry` instance | Optional | `[]` |
| #queueName | Queue name where to consume from | Mandatory | |
| #queueDurable | When false sets the [queue durability](https://www.rabbitmq.com/docs/queues#durability) to transient, otherwise will be durable | Optional | true |
110 changes: 110 additions & 0 deletions source/Ansible-Deprecated-v3/AmqpChannel.extension.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
Extension { #name : 'AmqpChannel' }

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> confirmMessagesPublicationWith: anAckBlock andThoseNotProcessedWith: aNackBlock [

self
deprecated: 'Use confirmPublicationWith:otherwise:'
transformWith:
'`@receiver confirmMessagesPublicationWith: `@anAckBlock andThoseNotProcessedWith: `@aNackBlock'
-> '`@receiver onPublicationConfirmationDo: `@anAckBlock onRejectionDo: `@aNackBlock'.

self onPublicationConfirmationDo: anAckBlock onRejectionDo: aNackBlock
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self
exchangeDeclare: exchangeName
type: typeString
durable: false
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString durable: durable [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self
exchangeDeclare: exchangeName
type: typeString
durable: durable
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString durable: durable autoDelete: autoDelete passive: passive [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self
exchangeDeclare: exchangeName
type: typeString
durable: durable
autoDelete: autoDelete
passive: passive
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString durable: durable autoDelete: autoDelete passive: passive arguments: aDictionary [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self declareExchangeNamed: exchangeName of: typeString applying: [ :builder |
passive then: [ builder bePassive ].
durable then: [ builder beDurable ].
autoDelete then: [ builder autoDelete ].
builder useAsArguments: aDictionary
]
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> queueDeclare: queueName [

self deprecated: 'Use #declareQueueApplying: directly'.
^ self
queueDeclare: queueName
durable: false
exclusive: false
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> queueDeclare: queueName durable: durable [

self deprecated: 'Use #declareQueueApplying: directly'.
^ self
queueDeclare: queueName
durable: durable
exclusive: false
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> queueDeclare: queueName durable: durable exclusive: exclusive autoDelete: autoDelete passive: passive arguments: aDictionary [

self deprecated: 'Use #declareQueueApplying: directly'.

^ self declareQueueApplying: [ :builder |
builder
name: queueName;
useAsArguments: aDictionary.
passive then: [ builder bePassive ].
durable then: [ builder beDurable ].
exclusive then: [ builder beExclusive ].
autoDelete then: [ builder autoDelete ]
]
]
1 change: 1 addition & 0 deletions source/Ansible-Deprecated-v3/package.st
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Package { #name : 'Ansible-Deprecated-v3' }
10 changes: 10 additions & 0 deletions source/Ansible-Pharo-Pending-Patches/Semaphore.extension.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Extension { #name : 'Semaphore' }

{ #category : '*Ansible-Pharo-Pending-Patches' }
Semaphore >> waitTimeoutMilliseconds: anInteger [
"Wait on this semaphore for up to the given number of milliseconds, then timeout.
Return true if the deadline expired, false otherwise."
| d |
d := DelayWaitTimeout new setDelay: (anInteger max: 0) forSemaphore: self.
^d wait
]
Loading

0 comments on commit e2390e7

Please sign in to comment.