Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add layer HICP #4075

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 278 additions & 0 deletions scapy/contrib/hicp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
# SPDX-License-Identifier: GPL-2.0-or-later
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2023 - Claire VACHEROT <clairelex[at]pm.me>

"""HICP

Support for HICP (Host IP Control Protocol).

This protocol is used by HMS Anybus software for device discovery and
configuration.

Note : As the specification is not public, this layer was built based on the
Wireshark dissector and HMS's HICP DLL. It was tested with a Anybus X-gateway
device. Therefore, this implementation may differ from what is written in the
standard.
"""

# scapy.contrib.name = HICP
# scapy.contrib.description = HMS Anybus Host IP Control Protocol
# scapy.contrib.status = loads

from re import match

from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.fields import StrField, MACField, IPField, ByteField, RawVal
from scapy.layers.inet import UDP

# HICP command codes
CMD_MODULESCAN = b"Module scan"
CMD_MSRESPONSE = b"Module scan response"
CMD_CONFIGURE = b"Configure"
CMD_RECONFIGURED = b"Reconfigured"
CMD_INVALIDCONF = b"Invalid Configuration"
CMD_INVALIDPWD = b"Invalid Password"
CMD_WINK = b"Wink"
# These commands are implemented in the DLL but never seen in use
CMD_START = b"Start"
CMD_STOP = b"Stop"

# Most of the fields have the format "KEY = value" for each field
KEYS = {
"protocol_version": "Protocol version",
"fieldbus_type": "FB type",
"module_version": "Module version",
"mac_address": "MAC",
"new_password": "New password",
"password": "PSWD",
"ip_address": "IP",
"subnet_mask": "SN",
"gateway_address": "GW",
"dhcp": "DHCP",
"hostname": "HN",
"dns1": "DNS1",
"dns2": "DNS2"
}

# HICP MAC format is xx-xx-xx-xx-xx-xx (not with :) as str.
FROM_MACFIELD = lambda x: x.replace(":", "-")
TO_MACFIELD = lambda x: x.replace("-", ":")

# Note on building and dissecting: Since the protocol is primarily text-based
# but also highly inconsistent in terms of message format, most of the
# dissection and building process must be reworked for each message type.


class HICPConfigure(Packet):
name = "Configure request"
fields_desc = [
MACField("target", "ff:ff:ff:ff:ff:ff"),
StrField("password", ""),
StrField("new_password", ""),
IPField("ip_address", "255.255.255.255"),
IPField("subnet_mask", "255.255.255.0"),
IPField("gateway_address", "0.0.0.0"),
StrField("dhcp", "OFF"), # ON or OFF
StrField("hostname", ""),
IPField("dns1", "0.0.0.0"),
IPField("dns2", "0.0.0.0"),
ByteField("padding", 0)
]

def post_build(self, p, pay):
p = ["{0}: {1};".format(CMD_CONFIGURE.decode('utf-8'),
FROM_MACFIELD(self.target))]
for field in self.fields_desc[1:]:
if field.name in KEYS:
value = getattr(self, field.name)
if isinstance(value, bytes):
value = value.decode('utf-8')
if field.name in ["password", "new_password"] and not value:
continue
key = KEYS[field.name]
# The key for password is not the same as usual...
if field.name == "password":
key = "Password"
p.append("{0} = {1};".format(key, value))
return "".join(p).encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(".*: ([^;]+);", s.decode('utf-8'))
if res:
self.target = TO_MACFIELD(res.group(1))
s = s[len(self.target) + 3:]
for arg in s.split(b";"):
kv = [x.strip().replace(b"\x00", b"") for x in arg.split(b"=")]
if len(kv) != 2 or not kv[1]:
continue
kv[0] = kv[0].decode('utf-8')
if kv[0] in KEYS.values():
field = [x for x, y in KEYS.items() if y == kv[0]][0]
setattr(self, field, kv[1])


class HICPReconfigured(Packet):
name = "Reconfigured"
fields_desc = [
MACField("source", "ff:ff:ff:ff:ff:ff")
]

def post_build(self, p, pay):
p = "{0}: {1}".format(CMD_RECONFIGURED.decode('utf-8'),
FROM_MACFIELD(self.source))
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(r".*: ([a-fA-F0-9\-\:]+)", s.decode('utf-8'))
if res:
self.source = TO_MACFIELD(res.group(1))
return None


class HICPInvalidConfiguration(Packet):
name = "Invalid configuration"
fields_desc = [
MACField("source", "ff:ff:ff:ff:ff:ff")
]

def post_build(self, p, pay):
p = "{0}: {1}".format(CMD_INVALIDCONF.decode('utf-8'),
FROM_MACFIELD(self.source))
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(r".*: ([a-fA-F0-9\-\:]+)", s.decode('utf-8'))
if res:
self.source = TO_MACFIELD(res.group(1))
return None


class HICPInvalidPassword(Packet):
name = "Invalid password"
fields_desc = [
MACField("source", "ff:ff:ff:ff:ff:ff")
]

def post_build(self, p, pay):
p = "{0}: {1}".format(CMD_INVALIDPWD.decode('utf-8'),
FROM_MACFIELD(self.source))
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(r".*: ([a-fA-F0-9\-\:]+)", s.decode('utf-8'))
if res:
self.source = TO_MACFIELD(res.group(1))
return None


class HICPWink(Packet):
name = "Wink"
fields_desc = [
MACField("target", "ff:ff:ff:ff:ff:ff"),
ByteField("padding", 0)
]

def post_build(self, p, pay):
p = "To: {0};{1};".format(FROM_MACFIELD(self.target),
CMD_WINK.decode('utf-8').upper())
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match("^To: ([^;]+);", s.decode('utf-8'))
if res:
self.target = TO_MACFIELD(res.group(1))


class HICPModuleScanResponse(Packet):
name = "Module scan response"
fields_desc = [
StrField("protocol_version", "1.00"),
StrField("fieldbus_type", ""),
StrField("module_version", ""),
MACField("mac_address", "ff:ff:ff:ff:ff:ff"),
IPField("ip_address", "255.255.255.255"),
IPField("subnet_mask", "255.255.255.0"),
IPField("gateway_address", "0.0.0.0"),
StrField("dhcp", "OFF"), # ON or OFF
StrField("password", "OFF"), # ON or OFF
StrField("hostname", ""),
IPField("dns1", "0.0.0.0"),
IPField("dns2", "0.0.0.0"),
ByteField("padding", 0)
]

def post_build(self, p, pay):
p = []
for field in self.fields_desc:
if field.name in KEYS:
value = getattr(self, field.name)
if isinstance(value, bytes):
value = value.decode('utf-8')
p.append("{0} = {1};".format(KEYS[field.name], value))
return "".join(p).encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
for arg in s.split(b";"):
kv = [x.strip().replace(b"\x00", b"") for x in arg.split(b"=")]
if len(kv) != 2 or not kv[1]:
continue
kv[0] = kv[0].decode('utf-8')
if kv[0] in KEYS.values():
field = [x for x, y in KEYS.items() if y == kv[0]][0]
if field == "mac_address":
kv[1] = TO_MACFIELD(kv[1].decode('utf-8'))
setattr(self, field, kv[1])


class HICPModuleScan(Packet):
name = "Module scan request"
fields_desc = [
StrField("hicp_command", CMD_MODULESCAN),
ByteField("padding", 0)
]

def do_dissect(self, s):
if len(s) > len(CMD_MODULESCAN):
self.hicp_command = s[:len(CMD_MODULESCAN)]
self.padding = s[len(CMD_MODULESCAN):]
else:
self.padding = RawVal(s)

def post_build(self, p, pay):
return p.upper() + pay


class HICP(Packet):
name = "HICP"
fields_desc = [
StrField("hicp_command", "")
]

def do_dissect(self, s):
for cmd in [CMD_MODULESCAN, CMD_CONFIGURE, CMD_RECONFIGURED,
CMD_INVALIDCONF, CMD_INVALIDPWD]:
if s[:len(cmd)] == cmd:
self.hicp_command = cmd
return s[len(cmd):]
if s[:len("To:")] == b"To:":
self.hicp_command = CMD_WINK
else:
self.hicp_command = CMD_MSRESPONSE
return s

def post_build(self, p, pay):
p = p[len(self.hicp_command):]
return p + pay


bind_bottom_up(UDP, HICP, dport=3250)
bind_bottom_up(UDP, HICP, sport=3250)
bind_layers(UDP, HICP, sport=3250, dport=3250)
bind_layers(HICP, HICPModuleScan, hicp_command=CMD_MODULESCAN)
bind_layers(HICP, HICPModuleScanResponse, hicp_command=CMD_MSRESPONSE)
bind_layers(HICP, HICPWink, hicp_command=CMD_WINK)
bind_layers(HICP, HICPConfigure, hicp_command=CMD_CONFIGURE)
bind_layers(HICP, HICPReconfigured, hicp_command=CMD_RECONFIGURED)
bind_layers(HICP, HICPInvalidConfiguration, hicp_command=CMD_INVALIDCONF)
bind_layers(HICP, HICPInvalidPassword, hicp_command=CMD_INVALIDPWD)
113 changes: 113 additions & 0 deletions test/contrib/hicp.uts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
% HICP test campaign

#
# execute test:
# > test/run_tests -t test/contrib/hicp.uts
#

+ Syntax check
= Import the HICP layer
from scapy.contrib.hicp import *

+ HICP Module scan request
= Build and dissect module scan
pkt = HICPModuleScan()
assert(pkt.hicp_command == b"Module scan")
assert(raw(pkt) == b"MODULE SCAN\x00")
pkt = HICP(b"Module scan\x00")
assert(pkt.hicp_command == b"Module scan")

+ HICP Module scan response
= Build and dissect device description
pkt=HICPModuleScanResponse(fieldbus_type="kwack")
assert(pkt.protocol_version == b"1.00")
assert(pkt.fieldbus_type == b"kwack")
assert(pkt.mac_address == "ff:ff:ff:ff:ff:ff")
pkt=HICP(
b"\x50\x72\x6f\x74\x6f\x63\x6f\x6c\x20\x76\x65\x72\x73\x69\x6f\x6e" \
b"\x20\x3d\x20\x31\x2e\x30\x30\x3b\x46\x42\x20\x74\x79\x70\x65\x20" \
b"\x3d\x20\x3b\x4d\x6f\x64\x75\x6c\x65\x20\x76\x65\x72\x73\x69\x6f" \
b"\x6e\x20\x3d\x20\x3b\x4d\x41\x43\x20\x3d\x20\x65\x65\x3a\x65\x65" \
b"\x3a\x65\x65\x3a\x65\x65\x3a\x65\x65\x3a\x65\x65\x3b\x49\x50\x20" \
b"\x3d\x20\x32\x35\x35\x2e\x32\x35\x35\x2e\x32\x35\x35\x2e\x32\x35" \
b"\x35\x3b\x53\x4e\x20\x3d\x20\x32\x35\x35\x2e\x32\x35\x35\x2e\x32" \
b"\x35\x35\x2e\x30\x3b\x47\x57\x20\x3d\x20\x30\x2e\x30\x2e\x30\x2e" \
b"\x30\x3b\x44\x48\x43\x50\x20\x3d\x20\x4f\x46\x46\x3b\x48\x4e\x20" \
b"\x3d\x20\x3b\x44\x4e\x53\x31\x20\x3d\x20\x30\x2e\x30\x2e\x30\x2e" \
b"\x30\x3b\x44\x4e\x53\x32\x20\x3d\x20\x30\x2e\x30\x2e\x30\x2e\x30" \
b"\x3b\x00"
)
assert(pkt.hicp_command == b"Module scan response")
assert(pkt.protocol_version == b"1.00")
assert(pkt.mac_address == "ee:ee:ee:ee:ee:ee")
assert(pkt.subnet_mask == "255.255.255.0")
pkt=HICP(b"Protocol version = 2; FB type = TEST;Module version = 1.0.0;MAC = cc:cc:cc:cc:cc:cc;IP = 192.168.1.1;SN = 255.255.255.0;GW = 192.168.1.254;DHCP=ON;HN = bonjour;DNS1 = 1.1.1.1;DNS2 = 2.2.2.2")
assert(pkt.hicp_command == b"Module scan response")
assert(pkt.protocol_version == b"2")
assert(pkt.fieldbus_type == b"TEST")
assert(pkt.module_version == b"1.0.0")
assert(pkt.mac_address == "cc:cc:cc:cc:cc:cc")
assert(pkt.ip_address == "192.168.1.1")
assert(pkt.subnet_mask == "255.255.255.0")
assert(pkt.gateway_address == "192.168.1.254")
assert(pkt.dhcp == b"ON")
assert(pkt.hostname == b"bonjour")
assert(pkt.dns1 == "1.1.1.1")
assert(pkt.dns2 == "2.2.2.2")

+ HICP Wink request
= Build and dissect Winks
pkt = HICPWink(target="dd:dd:dd:dd:dd:dd")
assert(pkt.target == "dd:dd:dd:dd:dd:dd")
pkt = HICP(b"To: bb:bb:bb:bb:bb:bb;WINK;\x00")
assert(pkt.target == "bb:bb:bb:bb:bb:bb")

+ HICP Configure request
= Build and dissect new network settings
pkt = HICPConfigure(target="aa:aa:aa:aa:aa:aa", hostname="llama")
assert(pkt.target == "aa:aa:aa:aa:aa:aa")
assert(pkt.ip_address == "255.255.255.255")
assert(pkt.hostname == b"llama")
assert(raw(pkt) == b"Configure: aa-aa-aa-aa-aa-aa;IP = 255.255.255.255;SN = 255.255.255.0;GW = 0.0.0.0;DHCP = OFF;HN = llama;DNS1 = 0.0.0.0;DNS2 = 0.0.0.0;\x00")
pkt = HICP(b"Configure: aa-aa-aa-aa-aa-aa;IP = 255.255.255.255;SN = 255.255.255.0;GW = 0.0.0.0;DHCP = OFF;HN = llama;DNS1 = 0.0.0.0;DNS2 = 0.0.0.0;\x00")
assert(pkt.hicp_command == b"Configure")
assert(pkt.target == "aa:aa:aa:aa:aa:aa")
assert(pkt.ip_address == "255.255.255.255")
assert(pkt.hostname == b"llama")

+ HICP Configure response
= Build and dissect successful response to configure request

pkt = HICPReconfigured(source="11:00:00:00:00:00")
assert(pkt.source == "11:00:00:00:00:00")
assert(raw(pkt) == b"Reconfigured: 11-00-00-00-00-00\x00")
pkt = HICP(b"\x52\x65\x63\x6f\x6e\x66\x69\x67\x75\x72\x65\x64\x3a\x20\x31\x31" \
b"\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x00")
assert(pkt.hicp_command == b"Reconfigured")
assert(pkt.source == "11:00:00:00:00:00")

+ HICP Configure error
= Build and dissect error response to configure request

pkt = HICPInvalidConfiguration(source="00:11:00:00:00:00")
assert(pkt.source == "00:11:00:00:00:00")
assert(raw(pkt) == b"Invalid Configuration: 00-11-00-00-00-00\x00")
pkt = HICP(
b"\x49\x6e\x76\x61\x6c\x69\x64\x20\x43\x6f\x6e\x66\x69\x67\x75\x72" \
b"\x61\x74\x69\x6f\x6e\x3a\x20\x30\x30\x2d\x31\x31\x2d\x30\x30\x2d" \
b"\x30\x30\x2d\x30\x30\x2d\x30\x30\x00"
)
assert(pkt.hicp_command == b"Invalid Configuration")
assert(pkt.source == "00:11:00:00:00:00")

+ HICP Configure invalid password
= Build and dissect invalid password response to configure request

pkt = HICPInvalidPassword(source="00:00:11:00:00:00")
assert(pkt.source == "00:00:11:00:00:00")
assert(raw(pkt) == b"Invalid Password: 00-00-11-00-00-00\x00")
pkt = HICP(b"\x49\x6e\x76\x61\x6c\x69" \
b"\x64\x20\x50\x61\x73\x73\x77\x6f\x72\x64\x3a\x20\x30\x30\x2d\x30" \
b"\x30\x2d\x31\x31\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x00")
assert(pkt.hicp_command == b"Invalid Password")
assert(pkt.source == "00:00:11:00:00:00")