Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples/contrib/explain.py leveraging Rapid SCADA #1665

Merged
merged 6 commits into from
Jul 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions examples/contrib/explain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""
How to explain pymodbus logs using https://rapidscada.net/modbus/ and requests.

Created on 7/19/2023 to support Python 3.8 to 3.11 on macOS, Ubuntu, or Windows.
"""

import contextlib
import os
import shutil
import tempfile
from dataclasses import dataclass
from html.parser import HTMLParser
from typing import List, Optional, Tuple, Union


with contextlib.suppress:
import requests


RAPID_SCADA_URL = "https://rapidscada.net/modbus/"


@dataclass(frozen=True)
class ParsedModbusResult: # pylint: disable=too-many-instance-attributes
"""Simple data structure to hold post response of Rapid SCADA."""

transaction_id: int
length: int
unit_id: int
func_code: int
is_receive: bool
zero_index_reg: Optional[int] = None
quantity: Optional[int] = None
byte_count: Optional[int] = None
registers: Optional[List[int]] = None

def summarize(self) -> dict:
"""Get a summary representation for readability."""
summary = {"is_receive": self.is_receive}
if self.zero_index_reg is not None:
summary["one_index_reg"] = self.zero_index_reg + 1
if self.registers is not None:
summary["registers"] = self.registers
return summary


def explain_with_rapid_scada(
packet: str,
is_modbus_tcp: bool = True,
is_receive: bool = False,
timeout: Union[float, Tuple[float, float], None] = 15.0,
) -> ParsedModbusResult:
"""
Explain a Modbus packet using https://rapidscada.net/modbus/.

Args:
packet: Packet from pymodbus logs.
is_modbus_tcp: Set True (default) for Modbus TCP or False for Modbus RTU.
is_receive: Set True if pymodbus log says RECV, otherwise False for SEND.
timeout: Optional timeout (sec) for the HTTP post, defaulted to 15-sec.

Returns:
Parsed data from Rapid SCADA Modbus Parser.
"""

class NonEmptyDataFromHTML(HTMLParser):
"""Aggregate all data from an HTML blob."""

def __init__(self, *, convert_charrefs=True):
super().__init__(convert_charrefs=convert_charrefs)
self._data = []

@property
def data(self) -> List[str]:
return self._data

def handle_data(self, data: str) -> None:
if not data.strip():
return
self._data.append(data.strip())

data_packet = "+".join(
[f"{int(hex_str, base=16):02X}" for hex_str in packet.split(" ")],
)
raw_response: requests.Response = requests.post(
f"{RAPID_SCADA_URL}?ModbusMode={int(is_modbus_tcp)}"
f"&DataDirection={int(is_receive)}&DataPackage={data_packet}",
timeout=timeout,
)
raw_response.raise_for_status()
parser = NonEmptyDataFromHTML()
parser.feed(raw_response.text)

# pylint: disable-next=dangerous-default-value
def get_next_field(prior_field: str, data: List[str] = parser.data) -> str:
return data[data.index(prior_field) + 1]

def parse_next_field(prior_field: str, split_index: int = 0) -> int:
return int(get_next_field(prior_field).split(" ")[split_index], base=16)

base_result_data = {
"transaction_id": parse_next_field("Transaction identifier"),
"length": parse_next_field("Length"),
"unit_id": parse_next_field("Unit identifier"),
"func_code": parse_next_field("Function code"),
"is_receive": is_receive,
}
is_receive_fn_code: Tuple[bool, int] = is_receive, base_result_data["func_code"]
if is_receive_fn_code in [(False, 0x03), (True, 0x10)]:
return ParsedModbusResult(
**base_result_data,
zero_index_reg=parse_next_field("Starting address", split_index=1),
quantity=parse_next_field("Quantity"),
)
if is_receive_fn_code in [(False, 0x10), (True, 0x03)]:
next_field = "Register value" if is_receive else "Registers value"
return ParsedModbusResult(
**base_result_data,
byte_count=parse_next_field("Byte count"),
registers=[
int(raw_value.split(" ")[0], base=16)
for raw_value in get_next_field(next_field).split(", ")
],
)
raise NotImplementedError(
f"Unhandled case with {is_receive=} and {parser.data=}.",
)


def annotate_pymodbus_logs(file: Union[str, os.PathLike]) -> None:
"""Annotate a pymodbus log file in-place with explanations."""
with open(file, encoding="utf-8") as in_file, tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", delete=False
) as out_file:
for i, line in enumerate(in_file):
if "Running transaction" in line and i > 0:
out_file.write("\n")
out_file.write(line)
if "SEND:" in line:
explained = explain_with_rapid_scada(
packet=line.split("SEND:")[1].strip(),
)
out_file.write(
f"Send explained: {explained}\n"
f"Send summary: {explained.summarize()}\n",
)
if "RECV:" in line:
explained = explain_with_rapid_scada(
packet=line.split("RECV:")[1].strip(),
is_receive=True,
)
out_file.write(
f"Receive explained: {explained}\n"
f"Receive summary: {explained.summarize()}\n",
)
# NOTE: per NamedTemporaryFile docs, the name cannot be reused on Windows
# while the file is still open. So we have to use delete=False followed by
# manually removing the temp file
shutil.copyfile(out_file.name, file)
with contextlib.suppress(FileNotFoundError):
os.remove(out_file.name)