diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a22895e..754b116c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ most recent version is listed first. - bugfix; `asyncio.streams.StreamWriter.drain` should not be called concurrently by multiple coroutines - when shutting down, `naz` now tries to make sure that write buffers are properly flushed. - replace naz json config file with a python file: https://github.com/komuw/naz/pull/123 +- bugfix: `naz` would attempt to send messages(`submit_sm`) out before it had even connected to SMSC: https://github.com/komuw/naz/pull/124 + ## **version:** v0.6.0-beta.1 - Bug fix: https://github.com/komuw/naz/pull/98 diff --git a/cli/cli.py b/cli/cli.py index ba4212af..3d02ecb4 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -13,49 +13,6 @@ os.environ["PYTHONASYNCIODEBUG"] = "1" -def load_class(dotted_path): - """ - taken from: https://github.com/coleifer/huey/blob/4138d454cc6fd4d252c9350dbd88d74dd3c67dcb/huey/utils.py#L44 - huey is released under MIT license a copy of which can be found at: https://github.com/coleifer/huey/blob/master/LICENSE - - The license is also included below: - - Copyright (c) 2017 Charles Leifer - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. - """ - try: - path, klass = dotted_path.rsplit(".", 1) - __import__(path) - mod = sys.modules[path] - attttr = getattr(mod, klass) - return attttr - except Exception: - cur_dir = os.getcwd() - if cur_dir not in sys.path: - sys.path.insert(0, cur_dir) - return load_class(dotted_path) - err_mesage = "Error importing {0}".format(dotted_path) - sys.stderr.write("\033[91m{0}\033[0m\n".format(err_mesage)) - raise - - def make_parser(): """ this is abstracted into its own method so that it is easier to test it. diff --git a/documentation/sphinx-docs/naz-sentry.png b/documentation/sphinx-docs/naz-sentry.png new file mode 100644 index 00000000..57076d2c Binary files /dev/null and b/documentation/sphinx-docs/naz-sentry.png differ diff --git a/naz/client.py b/naz/client.py index e432de18..10acf18c 100644 --- a/naz/client.py +++ b/naz/client.py @@ -831,7 +831,22 @@ async def enquire_link(self, TESTING: bool = False) -> typing.Union[bytes, None] TESTING: indicates whether this method is been called while running tests. """ # sleep during startup so that `naz` can have had time to connect & bind - await asyncio.sleep(self.enquire_link_interval) + while self.current_session_state != SmppSessionState.BOUND_TRX: + retry_after = self.connect_timeout / 10 + self._log( + logging.DEBUG, + { + "event": "naz.Client.enquire_link", + "stage": "start", + "current_session_state": self.current_session_state, + "state": "awaiting naz to change session state to `BOUND_TRX`. sleeping for {0}minutes".format( + retry_after / 60 + ), + }, + ) + await asyncio.sleep(retry_after) + if TESTING: + return None smpp_command = SmppCommand.ENQUIRE_LINK while True: @@ -1514,6 +1529,24 @@ async def dequeue_messages( Parameters: TESTING: indicates whether this method is been called while running tests. """ + # sleep during startup so that `naz` can have had time to connect & bind + while self.current_session_state != SmppSessionState.BOUND_TRX: + retry_after = self.connect_timeout / 10 + self._log( + logging.DEBUG, + { + "event": "naz.Client.dequeue_messages", + "stage": "start", + "current_session_state": self.current_session_state, + "state": "awaiting naz to change session state to `BOUND_TRX`. sleeping for {0}minutes".format( + retry_after / 60 + ), + }, + ) + await asyncio.sleep(retry_after) + if TESTING: + return {"state": "awaiting naz to change session state to `BOUND_TRX`"} + retry_count = 0 while True: self._log(logging.INFO, {"event": "naz.Client.dequeue_messages", "stage": "start"}) @@ -1664,7 +1697,7 @@ async def dequeue_messages( async def receive_data(self, TESTING: bool = False) -> typing.Union[bytes, None]: """ - In loop; read bytes from the network connected to SMSC and hand them over to the :func:`throparserttled `. + In loop; read bytes from the network connected to SMSC and hand them over to the :func:`throparserttled `. Parameters: TESTING: indicates whether this method is been called while running tests. @@ -1764,21 +1797,21 @@ async def receive_data(self, TESTING: bool = False) -> typing.Union[bytes, None] chunks.append(chunk) bytes_recd = bytes_recd + len(chunk) full_pdu_data = command_length_header_data + b"".join(chunks) - await self.parse_response_pdu(full_pdu_data) + await self._parse_response_pdu(full_pdu_data) self._log(logging.INFO, {"event": "naz.Client.receive_data", "stage": "end"}) if TESTING: # offer escape hatch for tests to come out of endless loop return full_pdu_data - async def parse_response_pdu(self, pdu: bytes) -> None: + async def _parse_response_pdu(self, pdu: bytes) -> None: """ Take the bytes that have been read from network and parse them into their corresponding PDU. - The resulting PDU is then handed over to :func:`speficic_handlers ` + The resulting PDU is then handed over to :func:`command_handlers ` Parameters: pdu: PDU in bytes, that have been read from network """ - self._log(logging.DEBUG, {"event": "naz.Client.parse_response_pdu", "stage": "start"}) + self._log(logging.DEBUG, {"event": "naz.Client._parse_response_pdu", "stage": "start"}) header_data = pdu[:16] body_data = pdu[16:] @@ -1795,7 +1828,7 @@ async def parse_response_pdu(self, pdu: bytes) -> None: self._log( logging.ERROR, { - "event": "naz.Client.parse_response_pdu", + "event": "naz.Client._parse_response_pdu", "stage": "end", "log_id": "", "state": "command_id:{0} is unknown.".format(command_id), @@ -1813,7 +1846,7 @@ async def parse_response_pdu(self, pdu: bytes) -> None: self._log( logging.ERROR, { - "event": "naz.Client.parse_response_pdu", + "event": "naz.Client._parse_response_pdu", "stage": "start", "log_id": log_id, "state": "correlater get error", @@ -1821,7 +1854,7 @@ async def parse_response_pdu(self, pdu: bytes) -> None: }, ) - await self.speficic_handlers( + await self.command_handlers( body_data=body_data, smpp_command=smpp_command, command_status_value=command_status, @@ -1832,7 +1865,7 @@ async def parse_response_pdu(self, pdu: bytes) -> None: self._log( logging.DEBUG, { - "event": "naz.Client.parse_response_pdu", + "event": "naz.Client._parse_response_pdu", "stage": "end", "smpp_command": smpp_command, "log_id": log_id, @@ -1840,7 +1873,7 @@ async def parse_response_pdu(self, pdu: bytes) -> None: }, ) - async def speficic_handlers( + async def command_handlers( self, body_data: bytes, smpp_command: str, @@ -1867,7 +1900,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "start", "smpp_command": smpp_command, "log_id": log_id, @@ -1879,7 +1912,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "start", "smpp_command": smpp_command, "log_id": log_id, @@ -1891,7 +1924,7 @@ async def speficic_handlers( self._log( logging.INFO, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "start", "smpp_command": smpp_command, "log_id": log_id, @@ -1910,7 +1943,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "end", "error": str(e), "smpp_command": smpp_command, @@ -1961,7 +1994,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "end", "smpp_command": smpp_command, "log_id": log_id, @@ -2034,7 +2067,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "start", "log_id": log_id, "state": "correlater get error", @@ -2049,7 +2082,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "end", "smpp_command": smpp_command, "log_id": log_id, @@ -2073,7 +2106,7 @@ async def speficic_handlers( self._log( logging.ERROR, { - "event": "naz.Client.speficic_handlers", + "event": "naz.Client.command_handlers", "stage": "end", "smpp_command": smpp_command, "log_id": log_id, diff --git a/tests/test_client.py b/tests/test_client.py index 8c3f6339..613908bc 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -5,7 +5,7 @@ import json import struct import asyncio -from unittest import TestCase, mock +from unittest import TestCase, mock, skip import naz import docker @@ -65,6 +65,7 @@ def setUp(self): system_id="smppclient1", password=os.getenv("password", "password"), outboundqueue=self.outboundqueue, + loglevel="DEBUG", # run tests with debug so as to debug what is going on ) self.docker_client = docker.from_env() @@ -248,28 +249,28 @@ def test_submit_sm_sending(self): def test_parse_response_pdu(self): with mock.patch( - "naz.Client.speficic_handlers", new=AsyncMock() - ) as mock_naz_speficic_handlers: + "naz.Client.command_handlers", new=AsyncMock() + ) as mock_naz_command_handlers: self._run( - self.cli.parse_response_pdu( + self.cli._parse_response_pdu( pdu=b"\x00\x00\x00\x18\x80\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\x06SMPPSim\x00" ) ) - self.assertTrue(mock_naz_speficic_handlers.mock.called) - self.assertEqual(mock_naz_speficic_handlers.mock.call_count, 1) + self.assertTrue(mock_naz_command_handlers.mock.called) + self.assertEqual(mock_naz_command_handlers.mock.call_count, 1) self.assertEqual( - mock_naz_speficic_handlers.mock.call_args[1]["smpp_command"], + mock_naz_command_handlers.mock.call_args[1]["smpp_command"], naz.SmppCommand.BIND_TRANSCEIVER_RESP, ) - def test_speficic_handlers(self): + def test_command_handlers(self): with mock.patch( "naz.Client.enquire_link_resp", new=AsyncMock() ) as mock_naz_enquire_link_resp: sequence_number = 3 self._run( - self.cli.speficic_handlers( + self.cli.command_handlers( body_data=b"body_data", smpp_command=naz.SmppCommand.ENQUIRE_LINK, command_status_value=0, @@ -284,11 +285,11 @@ def test_speficic_handlers(self): mock_naz_enquire_link_resp.mock.call_args[1]["sequence_number"], sequence_number ) - def test_speficic_handlers_unbind(self): + def test_command_handlers_unbind(self): with mock.patch("naz.Client.send_data", new=AsyncMock()) as mock_naz_send_data: sequence_number = 7 self._run( - self.cli.speficic_handlers( + self.cli.command_handlers( body_data=b"body_data", smpp_command=naz.SmppCommand.UNBIND, command_status_value=0, @@ -303,11 +304,11 @@ def test_speficic_handlers_unbind(self): mock_naz_send_data.mock.call_args[1]["smpp_command"], naz.SmppCommand.UNBIND_RESP ) - def test_speficic_handlers_deliver_sm(self): + def test_command_handlers_deliver_sm(self): with mock.patch("naz.q.SimpleOutboundQueue.enqueue", new=AsyncMock()) as mock_naz_enqueue: sequence_number = 7 self._run( - self.cli.speficic_handlers( + self.cli.command_handlers( body_data=b"body_data", smpp_command=naz.SmppCommand.DELIVER_SM, command_status_value=0, @@ -333,6 +334,7 @@ def test_unbind(self): def test_enquire_link(self): with mock.patch("naz.Client.send_data", new=AsyncMock()) as mock_naz_send_data: + self.cli.current_session_state = naz.SmppSessionState.BOUND_TRX self._run(self.cli.enquire_link(TESTING=True)) self.assertTrue(mock_naz_send_data.mock.called) self.assertEqual(mock_naz_send_data.mock.call_count, 1) @@ -353,6 +355,7 @@ def test_no_sending_if_throttler(self): password=os.getenv("password", "password"), outboundqueue=self.outboundqueue, throttle_handler=throttle_handler, + loglevel="DEBUG", ) log_id = "12345" @@ -366,12 +369,12 @@ def test_no_sending_if_throttler(self): "destination_addr": "254711999999", } self._run(cli.connect()) + cli.current_session_state = naz.SmppSessionState.BOUND_TRX # mock SMSC throttling naz for _ in range(0, int(sample_size) * 2): self._run(cli.throttle_handler.throttled()) self._run(cli.dequeue_messages(TESTING=True)) - self.assertFalse(mock_naz_dequeue.mock.called) def test_okay_smsc_response(self): @@ -382,7 +385,7 @@ def test_okay_smsc_response(self): ) as mock_throttled: sequence_number = 7 self._run( - self.cli.speficic_handlers( + self.cli.command_handlers( body_data=b"body_data", smpp_command=naz.SmppCommand.DELIVER_SM, command_status_value=0, @@ -403,7 +406,7 @@ def test_throttling_smsc_response(self): ) as mock_throttled: sequence_number = 7 self._run( - self.cli.speficic_handlers( + self.cli.command_handlers( body_data=b"body_data", smpp_command=naz.SmppCommand.DELIVER_SM, command_status_value=0x00000058, @@ -419,7 +422,7 @@ def test_throttling_smsc_response(self): def test_response_hook_called(self): with mock.patch("naz.hooks.SimpleHook.response", new=AsyncMock()) as mock_hook_response: self._run( - self.cli.parse_response_pdu( + self.cli._parse_response_pdu( pdu=b"\x00\x00\x00\x12\x80\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x030\x00" ) ) @@ -451,7 +454,7 @@ def test_hook_called_with_metadata(self): self._run(self.cli.connect()) # hack to allow sending submit_sm even when state is wrong - self.cli.current_session_state = "BOUND_TRX" + self.cli.current_session_state = naz.SmppSessionState.BOUND_TRX self._run(self.cli.dequeue_messages(TESTING=True)) self.assertTrue(mock_hook_request.mock.called) @@ -486,7 +489,7 @@ def test_enquire_link_resp(self): with mock.patch("naz.q.SimpleOutboundQueue.enqueue", new=AsyncMock()) as mock_naz_enqueue: sequence_number = 7 self._run( - self.cli.speficic_handlers( + self.cli.command_handlers( body_data=b"body_data", smpp_command=naz.SmppCommand.ENQUIRE_LINK, command_status_value=0, @@ -501,7 +504,7 @@ def test_enquire_link_resp(self): naz.SmppCommand.ENQUIRE_LINK_RESP, ) - def test__retry_after(self): + def test_retry_after(self): self.assertEqual(self.cli._retry_after(current_retries=-23) / 60, 1) self.assertEqual(self.cli._retry_after(current_retries=0) / 60, 1) self.assertEqual(self.cli._retry_after(current_retries=1) / 60, 2) @@ -545,6 +548,7 @@ def test_broken_broker(self): ) as mock_naz_dequeue, mock.patch("asyncio.streams.StreamWriter.write") as mock_naz_writer: mock_naz_dequeue.mock.side_effect = ValueError("This test broker has 99 Problems") self._run(self.cli.connect()) + self.cli.current_session_state = naz.SmppSessionState.BOUND_TRX res = self._run(self.cli.dequeue_messages(TESTING=True)) self.assertEqual(res, {"broker_error": "broker_error"}) self.assertFalse(mock_naz_writer.called) @@ -568,6 +572,7 @@ def test_session_state(self): } self._run(self.cli.connect()) + self.cli.current_session_state = naz.SmppSessionState.OPEN self._run(self.cli.dequeue_messages(TESTING=True)) self.assertFalse(mock_naz_writer.called) @@ -577,7 +582,9 @@ def test_submit_with_session_state_closed(self): """ with mock.patch( "naz.q.SimpleOutboundQueue.dequeue", new=AsyncMock() - ) as mock_naz_dequeue, mock.patch("asyncio.streams.StreamWriter.write") as mock_naz_writer: + ) as mock_naz_dequeue, mock.patch( + "naz.client.asyncio.sleep", new=AsyncMock() + ) as mock_sleep: log_id = "12345" short_message = "hello smpp" mock_naz_dequeue.mock.return_value = { @@ -589,7 +596,8 @@ def test_submit_with_session_state_closed(self): "destination_addr": "254711999999", } self._run(self.cli.dequeue_messages(TESTING=True)) - self.assertFalse(mock_naz_writer.called) + self.assertTrue(mock_sleep.mock.called) + self.assertEqual(mock_sleep.mock.call_args[0][0], self.cli.connect_timeout / 10) def test_correlater_put_called(self): with mock.patch( @@ -613,7 +621,7 @@ def test_correlater_put_called(self): self._run(self.cli.connect()) # hack to allow sending submit_sm even when state is wrong - self.cli.current_session_state = "BOUND_TRX" + self.cli.current_session_state = naz.SmppSessionState.BOUND_TRX self._run(self.cli.dequeue_messages(TESTING=True)) self.assertTrue(mock_correlater_put.mock.called) @@ -627,9 +635,9 @@ def test_correlater_get_called(self): with mock.patch( "naz.correlater.SimpleCorrelater.get", new=AsyncMock() ) as mock_correlater_get: - mock_correlater_get.return_value = "log_id", "hook_metadata" + mock_correlater_get.mock.return_value = "log_id", "hook_metadata" self._run( - self.cli.parse_response_pdu( + self.cli._parse_response_pdu( pdu=b"\x00\x00\x00\x18\x80\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\x06SMPPSim\x00" ) ) @@ -640,19 +648,19 @@ def test_logger_called(self): with mock.patch("naz.logger.SimpleLogger.log") as mock_logger_log: mock_logger_log.return_value = None self._run( - self.cli.parse_response_pdu( + self.cli._parse_response_pdu( pdu=b"\x00\x00\x00\x18\x80\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\x06SMPPSim\x00" ) ) self.assertTrue(mock_logger_log.called) self.assertEqual( - mock_logger_log.call_args[0][1]["event"], "naz.Client.parse_response_pdu" + mock_logger_log.call_args[0][1]["event"], "naz.Client._parse_response_pdu" ) def test_parse_deliver_sm(self): with mock.patch( - "naz.Client.speficic_handlers", new=AsyncMock() - ) as mock_naz_speficic_handlers: + "naz.Client.command_handlers", new=AsyncMock() + ) as mock_naz_command_handlers: # see: https://github.com/mozes/smpp.pdu deliver_sm_pdu = ( b"\x00\x00\x00M\x00\x00\x00\x05\x00\x00" @@ -661,12 +669,12 @@ def test_parse_deliver_sm(self): b"\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00" b"\x11id:123456 sub:SSS dlvrd:DDD blah blah" ) - self._run(self.cli.parse_response_pdu(pdu=deliver_sm_pdu)) + self._run(self.cli._parse_response_pdu(pdu=deliver_sm_pdu)) - self.assertTrue(mock_naz_speficic_handlers.mock.called) - self.assertEqual(mock_naz_speficic_handlers.mock.call_count, 1) + self.assertTrue(mock_naz_command_handlers.mock.called) + self.assertEqual(mock_naz_command_handlers.mock.call_count, 1) self.assertEqual( - mock_naz_speficic_handlers.mock.call_args[1]["smpp_command"], + mock_naz_command_handlers.mock.call_args[1]["smpp_command"], naz.SmppCommand.DELIVER_SM, ) @@ -696,7 +704,7 @@ def test_submit_sm_AND_deliver_sm_correlation(self): # 1. SEND SUBMIT_SM self._run(self.cli.connect()) # hack to allow sending submit_sm even when state is wrong - self.cli.current_session_state = "BOUND_TRX" + self.cli.current_session_state = naz.SmppSessionState.BOUND_TRX self._run(self.cli.dequeue_messages(TESTING=True)) self.assertTrue(self.cli.correlation_handler.store[mock_sequence_number]) self.assertEqual( @@ -718,7 +726,7 @@ def test_submit_sm_AND_deliver_sm_correlation(self): ">IIII", command_length, command_id, command_status, mock_sequence_number ) # SUBMIT_SM_RESP should have same sequence_number as SUBMIT_SM submit_sm_resp_full_pdu = header + body - self._run(self.cli.parse_response_pdu(pdu=submit_sm_resp_full_pdu)) + self._run(self.cli._parse_response_pdu(pdu=submit_sm_resp_full_pdu)) # assert message_id was stored self.assertTrue(self.cli.correlation_handler.store[submit_sm_resp_smsc_message_id]) @@ -745,7 +753,7 @@ def test_submit_sm_AND_deliver_sm_correlation(self): b"\x00\x00\x03\x00\x11id:1618Z-0102G-2333M-25FJF sub:SSS dlvrd:DDD blah blah" ) deliver_sm_pdu = deliver_sm_pdu + tag_n_len + value - self._run(self.cli.parse_response_pdu(pdu=deliver_sm_pdu)) + self._run(self.cli._parse_response_pdu(pdu=deliver_sm_pdu)) self.assertTrue(mock_hook_response.mock.called) self.assertEqual( @@ -802,3 +810,20 @@ def test_issues_67(self): ) ) self.assertTrue(mock_naz_tranceiver_bind.mock.called) + + @skip("TODO:fix this. It does not work.") + def test_issues_112(self): + """ + test to prove we have fixed: https://github.com/komuw/naz/issues/112 + + Run `Client.enquire_link`. Check if `StreamWriter.write` was called twice(one for `tranceiver_bind` and another for `enquire_link`) + If `StreamWriter.write` was called, it means that our `enquire_link` call didnt get a: + enquire_link cannot be sent to SMSC when the client session state is: OPEN error. + """ + with mock.patch("asyncio.streams.StreamWriter.write") as mock_naz_writer: + self._run(self.cli.connect()) + self._run(self.cli.tranceiver_bind()) + # self.cli.current_session_state = naz.SmppSessionState.BOUND_TRX + self._run(self.cli.enquire_link(TESTING=True)) + self.assertTrue(mock_naz_writer.called) + self.assertEqual(mock_naz_writer.call_count, 2)