Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issues/112 #124

Merged
merged 29 commits into from
Jun 1, 2019
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ most recent version is listed first.
- bugfix; `asyncio.streams.StreamWriter.drain` should not be called concurrently by multiple coroutines
- when shutting down, `naz` now tries to make sure that write buffers are properly flushed.
- replace naz json config file with a python file: https://github.com/komuw/naz/pull/123
- bugfix: `naz` would attempt to send messages(`submit_sm`) out before it had even connected to SMSC: https://github.com/komuw/naz/pull/124


## **version:** v0.6.0-beta.1
- Bug fix: https://github.com/komuw/naz/pull/98
Expand Down
43 changes: 0 additions & 43 deletions cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,6 @@
os.environ["PYTHONASYNCIODEBUG"] = "1"


def load_class(dotted_path):
"""
taken from: https://github.com/coleifer/huey/blob/4138d454cc6fd4d252c9350dbd88d74dd3c67dcb/huey/utils.py#L44
huey is released under MIT license a copy of which can be found at: https://github.com/coleifer/huey/blob/master/LICENSE

The license is also included below:

Copyright (c) 2017 Charles Leifer

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
try:
path, klass = dotted_path.rsplit(".", 1)
__import__(path)
mod = sys.modules[path]
attttr = getattr(mod, klass)
return attttr
except Exception:
cur_dir = os.getcwd()
if cur_dir not in sys.path:
sys.path.insert(0, cur_dir)
return load_class(dotted_path)
err_mesage = "Error importing {0}".format(dotted_path)
sys.stderr.write("\033[91m{0}\033[0m\n".format(err_mesage))
raise


def make_parser():
"""
this is abstracted into its own method so that it is easier to test it.
Expand Down
Binary file added documentation/sphinx-docs/naz-sentry.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
71 changes: 52 additions & 19 deletions naz/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,22 @@ async def enquire_link(self, TESTING: bool = False) -> typing.Union[bytes, None]
TESTING: indicates whether this method is been called while running tests.
"""
# sleep during startup so that `naz` can have had time to connect & bind
await asyncio.sleep(self.enquire_link_interval)
while self.current_session_state != SmppSessionState.BOUND_TRX:
retry_after = self.connect_timeout / 10
self._log(
logging.DEBUG,
{
"event": "naz.Client.enquire_link",
"stage": "start",
"current_session_state": self.current_session_state,
"state": "awaiting naz to change session state to `BOUND_TRX`. sleeping for {0}minutes".format(
retry_after / 60
),
},
)
await asyncio.sleep(retry_after)
if TESTING:
return None

smpp_command = SmppCommand.ENQUIRE_LINK
while True:
Expand Down Expand Up @@ -1514,6 +1529,24 @@ async def dequeue_messages(
Parameters:
TESTING: indicates whether this method is been called while running tests.
"""
# sleep during startup so that `naz` can have had time to connect & bind
while self.current_session_state != SmppSessionState.BOUND_TRX:
retry_after = self.connect_timeout / 10
self._log(
logging.DEBUG,
{
"event": "naz.Client.dequeue_messages",
"stage": "start",
"current_session_state": self.current_session_state,
"state": "awaiting naz to change session state to `BOUND_TRX`. sleeping for {0}minutes".format(
retry_after / 60
),
},
)
await asyncio.sleep(retry_after)
if TESTING:
return {"state": "awaiting naz to change session state to `BOUND_TRX`"}

retry_count = 0
while True:
self._log(logging.INFO, {"event": "naz.Client.dequeue_messages", "stage": "start"})
Expand Down Expand Up @@ -1664,7 +1697,7 @@ async def dequeue_messages(

async def receive_data(self, TESTING: bool = False) -> typing.Union[bytes, None]:
"""
In loop; read bytes from the network connected to SMSC and hand them over to the :func:`throparserttled <Client.parse_response_pdu>`.
In loop; read bytes from the network connected to SMSC and hand them over to the :func:`throparserttled <Client._parse_response_pdu>`.

Parameters:
TESTING: indicates whether this method is been called while running tests.
Expand Down Expand Up @@ -1764,21 +1797,21 @@ async def receive_data(self, TESTING: bool = False) -> typing.Union[bytes, None]
chunks.append(chunk)
bytes_recd = bytes_recd + len(chunk)
full_pdu_data = command_length_header_data + b"".join(chunks)
await self.parse_response_pdu(full_pdu_data)
await self._parse_response_pdu(full_pdu_data)
self._log(logging.INFO, {"event": "naz.Client.receive_data", "stage": "end"})
if TESTING:
# offer escape hatch for tests to come out of endless loop
return full_pdu_data

async def parse_response_pdu(self, pdu: bytes) -> None:
async def _parse_response_pdu(self, pdu: bytes) -> None:
"""
Take the bytes that have been read from network and parse them into their corresponding PDU.
The resulting PDU is then handed over to :func:`speficic_handlers <Client.speficic_handlers>`
The resulting PDU is then handed over to :func:`command_handlers <Client.command_handlers>`

Parameters:
pdu: PDU in bytes, that have been read from network
"""
self._log(logging.DEBUG, {"event": "naz.Client.parse_response_pdu", "stage": "start"})
self._log(logging.DEBUG, {"event": "naz.Client._parse_response_pdu", "stage": "start"})

header_data = pdu[:16]
body_data = pdu[16:]
Expand All @@ -1795,7 +1828,7 @@ async def parse_response_pdu(self, pdu: bytes) -> None:
self._log(
logging.ERROR,
{
"event": "naz.Client.parse_response_pdu",
"event": "naz.Client._parse_response_pdu",
"stage": "end",
"log_id": "",
"state": "command_id:{0} is unknown.".format(command_id),
Expand All @@ -1813,15 +1846,15 @@ async def parse_response_pdu(self, pdu: bytes) -> None:
self._log(
logging.ERROR,
{
"event": "naz.Client.parse_response_pdu",
"event": "naz.Client._parse_response_pdu",
"stage": "start",
"log_id": log_id,
"state": "correlater get error",
"error": str(e),
},
)

await self.speficic_handlers(
await self.command_handlers(
body_data=body_data,
smpp_command=smpp_command,
command_status_value=command_status,
Expand All @@ -1832,15 +1865,15 @@ async def parse_response_pdu(self, pdu: bytes) -> None:
self._log(
logging.DEBUG,
{
"event": "naz.Client.parse_response_pdu",
"event": "naz.Client._parse_response_pdu",
"stage": "end",
"smpp_command": smpp_command,
"log_id": log_id,
"command_status": command_status,
},
)

async def speficic_handlers(
async def command_handlers(
self,
body_data: bytes,
smpp_command: str,
Expand All @@ -1867,7 +1900,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "start",
"smpp_command": smpp_command,
"log_id": log_id,
Expand All @@ -1879,7 +1912,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "start",
"smpp_command": smpp_command,
"log_id": log_id,
Expand All @@ -1891,7 +1924,7 @@ async def speficic_handlers(
self._log(
logging.INFO,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "start",
"smpp_command": smpp_command,
"log_id": log_id,
Expand All @@ -1910,7 +1943,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "end",
"error": str(e),
"smpp_command": smpp_command,
Expand Down Expand Up @@ -1961,7 +1994,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "end",
"smpp_command": smpp_command,
"log_id": log_id,
Expand Down Expand Up @@ -2034,7 +2067,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "start",
"log_id": log_id,
"state": "correlater get error",
Expand All @@ -2049,7 +2082,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "end",
"smpp_command": smpp_command,
"log_id": log_id,
Expand All @@ -2073,7 +2106,7 @@ async def speficic_handlers(
self._log(
logging.ERROR,
{
"event": "naz.Client.speficic_handlers",
"event": "naz.Client.command_handlers",
"stage": "end",
"smpp_command": smpp_command,
"log_id": log_id,
Expand Down
Loading