Skip to content

Commit

Permalink
Add layer HICP (#4075)
Browse files Browse the repository at this point in the history
  • Loading branch information
claire-lex authored Jul 26, 2023
1 parent 5ac3c28 commit 204605b
Show file tree
Hide file tree
Showing 2 changed files with 391 additions and 0 deletions.
278 changes: 278 additions & 0 deletions scapy/contrib/hicp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
# SPDX-License-Identifier: GPL-2.0-or-later
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2023 - Claire VACHEROT <clairelex[at]pm.me>

"""HICP
Support for HICP (Host IP Control Protocol).
This protocol is used by HMS Anybus software for device discovery and
configuration.
Note : As the specification is not public, this layer was built based on the
Wireshark dissector and HMS's HICP DLL. It was tested with a Anybus X-gateway
device. Therefore, this implementation may differ from what is written in the
standard.
"""

# scapy.contrib.name = HICP
# scapy.contrib.description = HMS Anybus Host IP Control Protocol
# scapy.contrib.status = loads

from re import match

from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.fields import StrField, MACField, IPField, ByteField, RawVal
from scapy.layers.inet import UDP

# HICP command codes
CMD_MODULESCAN = b"Module scan"
CMD_MSRESPONSE = b"Module scan response"
CMD_CONFIGURE = b"Configure"
CMD_RECONFIGURED = b"Reconfigured"
CMD_INVALIDCONF = b"Invalid Configuration"
CMD_INVALIDPWD = b"Invalid Password"
CMD_WINK = b"Wink"
# These commands are implemented in the DLL but never seen in use
CMD_START = b"Start"
CMD_STOP = b"Stop"

# Most of the fields have the format "KEY = value" for each field
KEYS = {
"protocol_version": "Protocol version",
"fieldbus_type": "FB type",
"module_version": "Module version",
"mac_address": "MAC",
"new_password": "New password",
"password": "PSWD",
"ip_address": "IP",
"subnet_mask": "SN",
"gateway_address": "GW",
"dhcp": "DHCP",
"hostname": "HN",
"dns1": "DNS1",
"dns2": "DNS2"
}

# HICP MAC format is xx-xx-xx-xx-xx-xx (not with :) as str.
FROM_MACFIELD = lambda x: x.replace(":", "-")
TO_MACFIELD = lambda x: x.replace("-", ":")

# Note on building and dissecting: Since the protocol is primarily text-based
# but also highly inconsistent in terms of message format, most of the
# dissection and building process must be reworked for each message type.


class HICPConfigure(Packet):
name = "Configure request"
fields_desc = [
MACField("target", "ff:ff:ff:ff:ff:ff"),
StrField("password", ""),
StrField("new_password", ""),
IPField("ip_address", "255.255.255.255"),
IPField("subnet_mask", "255.255.255.0"),
IPField("gateway_address", "0.0.0.0"),
StrField("dhcp", "OFF"), # ON or OFF
StrField("hostname", ""),
IPField("dns1", "0.0.0.0"),
IPField("dns2", "0.0.0.0"),
ByteField("padding", 0)
]

def post_build(self, p, pay):
p = ["{0}: {1};".format(CMD_CONFIGURE.decode('utf-8'),
FROM_MACFIELD(self.target))]
for field in self.fields_desc[1:]:
if field.name in KEYS:
value = getattr(self, field.name)
if isinstance(value, bytes):
value = value.decode('utf-8')
if field.name in ["password", "new_password"] and not value:
continue
key = KEYS[field.name]
# The key for password is not the same as usual...
if field.name == "password":
key = "Password"
p.append("{0} = {1};".format(key, value))
return "".join(p).encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(".*: ([^;]+);", s.decode('utf-8'))
if res:
self.target = TO_MACFIELD(res.group(1))
s = s[len(self.target) + 3:]
for arg in s.split(b";"):
kv = [x.strip().replace(b"\x00", b"") for x in arg.split(b"=")]
if len(kv) != 2 or not kv[1]:
continue
kv[0] = kv[0].decode('utf-8')
if kv[0] in KEYS.values():
field = [x for x, y in KEYS.items() if y == kv[0]][0]
setattr(self, field, kv[1])


class HICPReconfigured(Packet):
name = "Reconfigured"
fields_desc = [
MACField("source", "ff:ff:ff:ff:ff:ff")
]

def post_build(self, p, pay):
p = "{0}: {1}".format(CMD_RECONFIGURED.decode('utf-8'),
FROM_MACFIELD(self.source))
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(r".*: ([a-fA-F0-9\-\:]+)", s.decode('utf-8'))
if res:
self.source = TO_MACFIELD(res.group(1))
return None


class HICPInvalidConfiguration(Packet):
name = "Invalid configuration"
fields_desc = [
MACField("source", "ff:ff:ff:ff:ff:ff")
]

def post_build(self, p, pay):
p = "{0}: {1}".format(CMD_INVALIDCONF.decode('utf-8'),
FROM_MACFIELD(self.source))
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(r".*: ([a-fA-F0-9\-\:]+)", s.decode('utf-8'))
if res:
self.source = TO_MACFIELD(res.group(1))
return None


class HICPInvalidPassword(Packet):
name = "Invalid password"
fields_desc = [
MACField("source", "ff:ff:ff:ff:ff:ff")
]

def post_build(self, p, pay):
p = "{0}: {1}".format(CMD_INVALIDPWD.decode('utf-8'),
FROM_MACFIELD(self.source))
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match(r".*: ([a-fA-F0-9\-\:]+)", s.decode('utf-8'))
if res:
self.source = TO_MACFIELD(res.group(1))
return None


class HICPWink(Packet):
name = "Wink"
fields_desc = [
MACField("target", "ff:ff:ff:ff:ff:ff"),
ByteField("padding", 0)
]

def post_build(self, p, pay):
p = "To: {0};{1};".format(FROM_MACFIELD(self.target),
CMD_WINK.decode('utf-8').upper())
return p.encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
res = match("^To: ([^;]+);", s.decode('utf-8'))
if res:
self.target = TO_MACFIELD(res.group(1))


class HICPModuleScanResponse(Packet):
name = "Module scan response"
fields_desc = [
StrField("protocol_version", "1.00"),
StrField("fieldbus_type", ""),
StrField("module_version", ""),
MACField("mac_address", "ff:ff:ff:ff:ff:ff"),
IPField("ip_address", "255.255.255.255"),
IPField("subnet_mask", "255.255.255.0"),
IPField("gateway_address", "0.0.0.0"),
StrField("dhcp", "OFF"), # ON or OFF
StrField("password", "OFF"), # ON or OFF
StrField("hostname", ""),
IPField("dns1", "0.0.0.0"),
IPField("dns2", "0.0.0.0"),
ByteField("padding", 0)
]

def post_build(self, p, pay):
p = []
for field in self.fields_desc:
if field.name in KEYS:
value = getattr(self, field.name)
if isinstance(value, bytes):
value = value.decode('utf-8')
p.append("{0} = {1};".format(KEYS[field.name], value))
return "".join(p).encode('utf-8') + b"\x00" + pay

def do_dissect(self, s):
for arg in s.split(b";"):
kv = [x.strip().replace(b"\x00", b"") for x in arg.split(b"=")]
if len(kv) != 2 or not kv[1]:
continue
kv[0] = kv[0].decode('utf-8')
if kv[0] in KEYS.values():
field = [x for x, y in KEYS.items() if y == kv[0]][0]
if field == "mac_address":
kv[1] = TO_MACFIELD(kv[1].decode('utf-8'))
setattr(self, field, kv[1])


class HICPModuleScan(Packet):
name = "Module scan request"
fields_desc = [
StrField("hicp_command", CMD_MODULESCAN),
ByteField("padding", 0)
]

def do_dissect(self, s):
if len(s) > len(CMD_MODULESCAN):
self.hicp_command = s[:len(CMD_MODULESCAN)]
self.padding = s[len(CMD_MODULESCAN):]
else:
self.padding = RawVal(s)

def post_build(self, p, pay):
return p.upper() + pay


class HICP(Packet):
name = "HICP"
fields_desc = [
StrField("hicp_command", "")
]

def do_dissect(self, s):
for cmd in [CMD_MODULESCAN, CMD_CONFIGURE, CMD_RECONFIGURED,
CMD_INVALIDCONF, CMD_INVALIDPWD]:
if s[:len(cmd)] == cmd:
self.hicp_command = cmd
return s[len(cmd):]
if s[:len("To:")] == b"To:":
self.hicp_command = CMD_WINK
else:
self.hicp_command = CMD_MSRESPONSE
return s

def post_build(self, p, pay):
p = p[len(self.hicp_command):]
return p + pay


bind_bottom_up(UDP, HICP, dport=3250)
bind_bottom_up(UDP, HICP, sport=3250)
bind_layers(UDP, HICP, sport=3250, dport=3250)
bind_layers(HICP, HICPModuleScan, hicp_command=CMD_MODULESCAN)
bind_layers(HICP, HICPModuleScanResponse, hicp_command=CMD_MSRESPONSE)
bind_layers(HICP, HICPWink, hicp_command=CMD_WINK)
bind_layers(HICP, HICPConfigure, hicp_command=CMD_CONFIGURE)
bind_layers(HICP, HICPReconfigured, hicp_command=CMD_RECONFIGURED)
bind_layers(HICP, HICPInvalidConfiguration, hicp_command=CMD_INVALIDCONF)
bind_layers(HICP, HICPInvalidPassword, hicp_command=CMD_INVALIDPWD)
113 changes: 113 additions & 0 deletions test/contrib/hicp.uts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
% HICP test campaign

#
# execute test:
# > test/run_tests -t test/contrib/hicp.uts
#

+ Syntax check
= Import the HICP layer
from scapy.contrib.hicp import *

+ HICP Module scan request
= Build and dissect module scan
pkt = HICPModuleScan()
assert(pkt.hicp_command == b"Module scan")
assert(raw(pkt) == b"MODULE SCAN\x00")
pkt = HICP(b"Module scan\x00")
assert(pkt.hicp_command == b"Module scan")

+ HICP Module scan response
= Build and dissect device description
pkt=HICPModuleScanResponse(fieldbus_type="kwack")
assert(pkt.protocol_version == b"1.00")
assert(pkt.fieldbus_type == b"kwack")
assert(pkt.mac_address == "ff:ff:ff:ff:ff:ff")
pkt=HICP(
b"\x50\x72\x6f\x74\x6f\x63\x6f\x6c\x20\x76\x65\x72\x73\x69\x6f\x6e" \
b"\x20\x3d\x20\x31\x2e\x30\x30\x3b\x46\x42\x20\x74\x79\x70\x65\x20" \
b"\x3d\x20\x3b\x4d\x6f\x64\x75\x6c\x65\x20\x76\x65\x72\x73\x69\x6f" \
b"\x6e\x20\x3d\x20\x3b\x4d\x41\x43\x20\x3d\x20\x65\x65\x3a\x65\x65" \
b"\x3a\x65\x65\x3a\x65\x65\x3a\x65\x65\x3a\x65\x65\x3b\x49\x50\x20" \
b"\x3d\x20\x32\x35\x35\x2e\x32\x35\x35\x2e\x32\x35\x35\x2e\x32\x35" \
b"\x35\x3b\x53\x4e\x20\x3d\x20\x32\x35\x35\x2e\x32\x35\x35\x2e\x32" \
b"\x35\x35\x2e\x30\x3b\x47\x57\x20\x3d\x20\x30\x2e\x30\x2e\x30\x2e" \
b"\x30\x3b\x44\x48\x43\x50\x20\x3d\x20\x4f\x46\x46\x3b\x48\x4e\x20" \
b"\x3d\x20\x3b\x44\x4e\x53\x31\x20\x3d\x20\x30\x2e\x30\x2e\x30\x2e" \
b"\x30\x3b\x44\x4e\x53\x32\x20\x3d\x20\x30\x2e\x30\x2e\x30\x2e\x30" \
b"\x3b\x00"
)
assert(pkt.hicp_command == b"Module scan response")
assert(pkt.protocol_version == b"1.00")
assert(pkt.mac_address == "ee:ee:ee:ee:ee:ee")
assert(pkt.subnet_mask == "255.255.255.0")
pkt=HICP(b"Protocol version = 2; FB type = TEST;Module version = 1.0.0;MAC = cc:cc:cc:cc:cc:cc;IP = 192.168.1.1;SN = 255.255.255.0;GW = 192.168.1.254;DHCP=ON;HN = bonjour;DNS1 = 1.1.1.1;DNS2 = 2.2.2.2")
assert(pkt.hicp_command == b"Module scan response")
assert(pkt.protocol_version == b"2")
assert(pkt.fieldbus_type == b"TEST")
assert(pkt.module_version == b"1.0.0")
assert(pkt.mac_address == "cc:cc:cc:cc:cc:cc")
assert(pkt.ip_address == "192.168.1.1")
assert(pkt.subnet_mask == "255.255.255.0")
assert(pkt.gateway_address == "192.168.1.254")
assert(pkt.dhcp == b"ON")
assert(pkt.hostname == b"bonjour")
assert(pkt.dns1 == "1.1.1.1")
assert(pkt.dns2 == "2.2.2.2")

+ HICP Wink request
= Build and dissect Winks
pkt = HICPWink(target="dd:dd:dd:dd:dd:dd")
assert(pkt.target == "dd:dd:dd:dd:dd:dd")
pkt = HICP(b"To: bb:bb:bb:bb:bb:bb;WINK;\x00")
assert(pkt.target == "bb:bb:bb:bb:bb:bb")

+ HICP Configure request
= Build and dissect new network settings
pkt = HICPConfigure(target="aa:aa:aa:aa:aa:aa", hostname="llama")
assert(pkt.target == "aa:aa:aa:aa:aa:aa")
assert(pkt.ip_address == "255.255.255.255")
assert(pkt.hostname == b"llama")
assert(raw(pkt) == b"Configure: aa-aa-aa-aa-aa-aa;IP = 255.255.255.255;SN = 255.255.255.0;GW = 0.0.0.0;DHCP = OFF;HN = llama;DNS1 = 0.0.0.0;DNS2 = 0.0.0.0;\x00")
pkt = HICP(b"Configure: aa-aa-aa-aa-aa-aa;IP = 255.255.255.255;SN = 255.255.255.0;GW = 0.0.0.0;DHCP = OFF;HN = llama;DNS1 = 0.0.0.0;DNS2 = 0.0.0.0;\x00")
assert(pkt.hicp_command == b"Configure")
assert(pkt.target == "aa:aa:aa:aa:aa:aa")
assert(pkt.ip_address == "255.255.255.255")
assert(pkt.hostname == b"llama")

+ HICP Configure response
= Build and dissect successful response to configure request

pkt = HICPReconfigured(source="11:00:00:00:00:00")
assert(pkt.source == "11:00:00:00:00:00")
assert(raw(pkt) == b"Reconfigured: 11-00-00-00-00-00\x00")
pkt = HICP(b"\x52\x65\x63\x6f\x6e\x66\x69\x67\x75\x72\x65\x64\x3a\x20\x31\x31" \
b"\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x00")
assert(pkt.hicp_command == b"Reconfigured")
assert(pkt.source == "11:00:00:00:00:00")

+ HICP Configure error
= Build and dissect error response to configure request

pkt = HICPInvalidConfiguration(source="00:11:00:00:00:00")
assert(pkt.source == "00:11:00:00:00:00")
assert(raw(pkt) == b"Invalid Configuration: 00-11-00-00-00-00\x00")
pkt = HICP(
b"\x49\x6e\x76\x61\x6c\x69\x64\x20\x43\x6f\x6e\x66\x69\x67\x75\x72" \
b"\x61\x74\x69\x6f\x6e\x3a\x20\x30\x30\x2d\x31\x31\x2d\x30\x30\x2d" \
b"\x30\x30\x2d\x30\x30\x2d\x30\x30\x00"
)
assert(pkt.hicp_command == b"Invalid Configuration")
assert(pkt.source == "00:11:00:00:00:00")

+ HICP Configure invalid password
= Build and dissect invalid password response to configure request

pkt = HICPInvalidPassword(source="00:00:11:00:00:00")
assert(pkt.source == "00:00:11:00:00:00")
assert(raw(pkt) == b"Invalid Password: 00-00-11-00-00-00\x00")
pkt = HICP(b"\x49\x6e\x76\x61\x6c\x69" \
b"\x64\x20\x50\x61\x73\x73\x77\x6f\x72\x64\x3a\x20\x30\x30\x2d\x30" \
b"\x30\x2d\x31\x31\x2d\x30\x30\x2d\x30\x30\x2d\x30\x30\x00")
assert(pkt.hicp_command == b"Invalid Password")
assert(pkt.source == "00:00:11:00:00:00")

0 comments on commit 204605b

Please sign in to comment.