Skip to content

Commit

Permalink
Controller enable-folded-pipe argument.
Browse files Browse the repository at this point in the history
Now the controller does not enable the folded pipeline by default. This
allows to use 2-pipes switches, but limits the processable packet
payload to 256B. To support a 1024B payload, a switch with 4 pipes is
needed, and the enable-folded-pipe argument will set all ports in pipes
1,2,3 in loopback mode (only ports in pipe 0 can be used to connect
servers).
  • Loading branch information
AmedeoSapio committed Mar 11, 2022
1 parent 00beb26 commit 7ded628
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 103 deletions.
1 change: 1 addition & 0 deletions dev_root/controller/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The optional arguments are the following:
| --switch-mac SWITCH_MAC | MAC address of the switch | 00:11:22:33:44:55 |
| --switch-ip SWITCH_IP | IP address of switch | 10.0.0.254 |
| --ports FILE | YAML file describing machines connected to ports | ports.yaml |
| --enable-folded-pipe | Enable the folded pipeline (requires a 4 pipes switch) | disabled |
| --log-level LEVEL | Logging level: ERROR, WARNING, INFO, DEBUG | INFO |


Expand Down
10 changes: 9 additions & 1 deletion dev_root/controller/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
class GRPCServer(switchml_pb2_grpc.SessionServicer,
switchml_pb2_grpc.SyncServicer):

def __init__(self, ip='[::]', port=50099):
def __init__(self, ip='[::]', port=50099, folded_pipe=False):

self.log = logging.getLogger(__name__)

self.ip = ip
self.port = port
self.folded_pipe = folded_pipe

# Event to stop the server
self._stopped = asyncio.Event()
Expand Down Expand Up @@ -213,6 +214,13 @@ def RdmaSession(self, request, context):
str(PacketSize(request.packet_size)).split('.')[1][4:],
request.message_size, request.qpns, request.psns))

if not self.folded_pipe and PacketSize(
request.packet_size) == PacketSize.MTU_1024:
self.log.error(
"Processing 1024B per packet requires a folded pipeline. Using 256B payload."
)
request.packet_size = int(PacketSize.MTU_256)

if not self.ctrl:
# This is a test, return the received parameters
return switchml_pb2.RdmaSessionResponse(
Expand Down
77 changes: 41 additions & 36 deletions dev_root/controller/next_step_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Flag(IntEnum):

class NextStepSelector(Control):

def __init__(self, target, gc, bfrt_info):
def __init__(self, target, gc, bfrt_info, folded_pipe):
# Set up base class
super(NextStepSelector, self).__init__(target, gc)

Expand All @@ -39,6 +39,8 @@ def __init__(self, target, gc, bfrt_info):
]
self.table = self.tables[0]

self.folded_pipe = folded_pipe

# Counters
self.broadcast_counter = bfrt_info.table_get(
'pipe.Ingress.next_step_selector.broadcast_counter')
Expand All @@ -61,7 +63,7 @@ def add_default_entries(self):
''' Add default entries '''

# Special recirculation ports used for harvest passes
port = {1: 452, 2: 324, 3: 448, 4: 196, 5: 192, 6: 68, 7: 64}
port = {1: 452, 2: 324, 3: 448, 4: 196, 5: 192, 6: 64, 7: 68}

# yapf: disable
entries = [
Expand All @@ -83,41 +85,44 @@ def add_default_entries(self):
## 512B packets not supported at the moment
( PacketSize.MTU_512, None, None, None, None, 5, 'drop', None)]

## 1024B packets
entries.extend([
## Pipe 0, packet not seen before -> pipe 1 (load balance on worker id)
(PacketSize.MTU_1024, i, PacketType.CONSUME0, None, 0, 6, 'recirculate_for_CONSUME1', (1 << 7) + i * 4)
for i in range(16)])
entries.extend([
## Pipe 0, retransmitted packet to a full slot -> pipe 1
## Run through the same path as novel packets (do not skip to harvest) to ensure ordering
(PacketSize.MTU_1024, i, PacketType.CONSUME0, Flag.FIRST, None, 7, 'recirculate_for_CONSUME1', (1 << 7) + i * 4)
for i in range(16)])
if self.folded_pipe:
## 1024B packets
entries.extend([
## Pipe 0, packet not seen before -> pipe 1 (load balance on worker id)
(PacketSize.MTU_1024, i, PacketType.CONSUME0, None, 0, 6, 'recirculate_for_CONSUME1', (1 << 7) + i * 4)
for i in range(16)])
entries.extend([
## Pipe 0, retransmitted packet to a full slot -> pipe 1
## Run through the same path as novel packets (do not skip to harvest) to ensure ordering
(PacketSize.MTU_1024, i, PacketType.CONSUME0, Flag.FIRST, None, 7, 'recirculate_for_CONSUME1', (1 << 7) + i * 4)
for i in range(16)])
entries.extend([
## Drop other CONSUME0 packets
(PacketSize.MTU_1024, None, PacketType.CONSUME0, None, None, 8, 'drop', None),
## Pipe 1 -> pipe 2
(PacketSize.MTU_1024, None, PacketType.CONSUME1, None, None, 9, 'recirculate_for_CONSUME2_same_port_next_pipe', None),
## Pipe 2 -> pipe 3
(PacketSize.MTU_1024, None, PacketType.CONSUME2, None, None, 10, 'recirculate_for_CONSUME3_same_port_next_pipe', None),
## Pipe 4
## For CONSUME3 packets that are the last packet, recirculate for harvest
## The last pass is a combined consume/harvest pass, so skip directly to HARVEST1
(PacketSize.MTU_1024, None, PacketType.CONSUME3, Flag.LAST, None, 11, 'recirculate_for_HARVEST1', port[1]),
## Just consume any CONSUME3 packets if they're not last and we haven't seen them before
(PacketSize.MTU_1024, None, PacketType.CONSUME3, None, 0, 12, 'finish_consume', None),
## CONSUME3 packets that are retransmitted packets to a full slot -> recirculate for harvest
(PacketSize.MTU_1024, None, PacketType.CONSUME3, Flag.FIRST, None, 13, 'recirculate_for_HARVEST1', port[1]),
## Drop others
(PacketSize.MTU_1024, None, PacketType.CONSUME3, None, None, 14, 'drop', None),
## Harvesting 128B at a time
(PacketSize.MTU_1024, None, PacketType.HARVEST0, None, None, 15, 'recirculate_for_HARVEST1', port[1]),
(PacketSize.MTU_1024, None, PacketType.HARVEST1, None, None, 16, 'recirculate_for_HARVEST2', port[2]),
(PacketSize.MTU_1024, None, PacketType.HARVEST2, None, None, 17, 'recirculate_for_HARVEST3', port[3]),
(PacketSize.MTU_1024, None, PacketType.HARVEST3, None, None, 18, 'recirculate_for_HARVEST4', port[4]),
(PacketSize.MTU_1024, None, PacketType.HARVEST4, None, None, 19, 'recirculate_for_HARVEST5', port[5]),
(PacketSize.MTU_1024, None, PacketType.HARVEST5, None, None, 20, 'recirculate_for_HARVEST6', port[6]),
(PacketSize.MTU_1024, None, PacketType.HARVEST6, None, None, 21, 'recirculate_for_HARVEST7', port[7])])

entries.extend([
## Drop other CONSUME0 packets
(PacketSize.MTU_1024, None, PacketType.CONSUME0, None, None, 8, 'drop', None),
## Pipe 1 -> pipe 2
(PacketSize.MTU_1024, None, PacketType.CONSUME1, None, None, 9, 'recirculate_for_CONSUME2_same_port_next_pipe', None),
## Pipe 2 -> pipe 3
(PacketSize.MTU_1024, None, PacketType.CONSUME2, None, None, 10, 'recirculate_for_CONSUME3_same_port_next_pipe', None),
## Pipe 4
## For CONSUME3 packets that are the last packet, recirculate for harvest
## The last pass is a combined consume/harvest pass, so skip directly to HARVEST1
(PacketSize.MTU_1024, None, PacketType.CONSUME3, Flag.LAST, None, 11, 'recirculate_for_HARVEST1', port[1]),
## Just consume any CONSUME3 packets if they're not last and we haven't seen them before
(PacketSize.MTU_1024, None, PacketType.CONSUME3, None, 0, 12, 'finish_consume', None),
## CONSUME3 packets that are retransmitted packets to a full slot -> recirculate for harvest
(PacketSize.MTU_1024, None, PacketType.CONSUME3, Flag.FIRST, None, 13, 'recirculate_for_HARVEST1', port[1]),
## Drop others
(PacketSize.MTU_1024, None, PacketType.CONSUME3, None, None, 14, 'drop', None),
## Harvesting 128B at a time
(PacketSize.MTU_1024, None, PacketType.HARVEST0, None, None, 15, 'recirculate_for_HARVEST1', port[1]),
(PacketSize.MTU_1024, None, PacketType.HARVEST1, None, None, 16, 'recirculate_for_HARVEST2', port[2]),
(PacketSize.MTU_1024, None, PacketType.HARVEST2, None, None, 17, 'recirculate_for_HARVEST3', port[3]),
(PacketSize.MTU_1024, None, PacketType.HARVEST3, None, None, 18, 'recirculate_for_HARVEST4', port[4]),
(PacketSize.MTU_1024, None, PacketType.HARVEST4, None, None, 19, 'recirculate_for_HARVEST5', port[5]),
(PacketSize.MTU_1024, None, PacketType.HARVEST5, None, None, 20, 'recirculate_for_HARVEST6', port[6]),
(PacketSize.MTU_1024, None, PacketType.HARVEST6, None, None, 21, 'recirculate_for_HARVEST7', port[7]),
## Harvest pass 7: final pass
## Read final 128B and broadcast any HARVEST packets that are not
## retransmitted and are the last packet
Expand Down
17 changes: 1 addition & 16 deletions dev_root/controller/pre.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

class PRE(Control):

def __init__(self, target, gc, bfrt_info, cpu_port):
def __init__(self, target, gc, bfrt_info):
# Set up base class
super(PRE, self).__init__(target, gc)

Expand All @@ -39,11 +39,8 @@ def __init__(self, target, gc, bfrt_info, cpu_port):
self.node = self.tables[1]
self.port = self.tables[5]

self.cpu_port = cpu_port

# Clear tables and add defaults
self._clear()
self.add_default_entries()

def _clear(self):
''' Remove all existing multicast groups and nodes '''
Expand All @@ -57,18 +54,6 @@ def _clear(self):
for _, k in resp:
self.node.entry_del(self.target, [k])

def add_default_entries(self):
''' Set CPU port '''

self.log.info('CPU port: {}'.format(self.cpu_port))

self.port.entry_add(self.target, [
self.port.make_key([self.gc.KeyTuple('$DEV_PORT', self.cpu_port)])
], [
self.port.make_data(
[self.gc.DataTuple('$COPY_TO_CPU_PORT_ENABLE', bool_val=True)])
])

def add_multicast_group(self, mgid):
''' Add an empty multicast group.
Expand Down
124 changes: 74 additions & 50 deletions dev_root/controller/switchml.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,22 @@ def critical_error(self, msg):
#sys.exit(1)
os.kill(os.getpid(), signal.SIGTERM)

def setup(
self,
program,
switch_mac,
switch_ip,
bfrt_ip,
bfrt_port,
ports_file,
):
def setup(self,
program,
switch_mac,
switch_ip,
bfrt_ip,
bfrt_port,
ports_file,
folded_pipe=False):

# Device 0
self.dev = 0
# Target all pipes
self.target = gc.Target(self.dev, pipe_id=0xFFFF)
# Folded pipe
self.folded_pipe = folded_pipe

# Connect to BFRT server
try:
interface = gc.ClientInterface('{}:{}'.format(bfrt_ip, bfrt_port),
Expand All @@ -118,47 +120,61 @@ def setup(
# Ports table
self.ports = Ports(self.target, gc, self.bfrt_info)

# Enable loopback on front panel ports
loopback_ports = (
[64] + # Pipe 0 CPU ethernet port
# Pipe 0: all 16 front-panel ports
#list(range( 0, 0+64,4)) +
# Pipe 1: all 16 front-panel ports
list(range(128, 128 + 64, 4)) +
# Pipe 2: all 16 front-panel ports
list(range(256, 256 + 64, 4)) +
# Pipe 3: all 16 front-panel ports
list(range(384, 384 + 64, 4)))
print('Setting {} front panel ports in loopback mode'.format(
len(loopback_ports)))
self.ports.set_loopback_mode(loopback_ports)

# Enable loopback on PktGen ports
pktgen_ports = [192, 448]

if not self.ports.get_loopback_mode_pktgen(pktgen_ports):
# Not all PktGen ports are in loopback mode

print('\nYou must \'remove\' the ports in the BF ucli:\n')
for p in pktgen_ports:
print(' bf-sde> dvm rmv_port 0 {}'.format(p))
input('\nPress Enter to continue...')

if not self.ports.set_loopback_mode_pktgen(pktgen_ports):
if self.folded_pipe:
try:
# Enable loopback on front panel ports
loopback_ports = (
[64] + # Pipe 0 CPU ethernet port
# Pipe 0: all 16 front-panel ports
#list(range( 0, 0+64,4)) +
# Pipe 1: all 16 front-panel ports
list(range(128, 128 + 64, 4)) +
# Pipe 2: all 16 front-panel ports
list(range(256, 256 + 64, 4)) +
# Pipe 3: all 16 front-panel ports
list(range(384, 384 + 64, 4)))
print(
'Setting {} front panel ports in loopback mode'.format(
len(loopback_ports)))
self.ports.set_loopback_mode(loopback_ports)

# Enable loopback on PktGen ports
pktgen_ports = [192, 448]

if not self.ports.get_loopback_mode_pktgen(pktgen_ports):
# Not all PktGen ports are in loopback mode

print(
'\nYou must \'remove\' the ports in the BF ucli:\n')
for p in pktgen_ports:
print(' bf-sde> dvm rmv_port 0 {}'.format(p))
input('\nPress Enter to continue...')

if not self.ports.set_loopback_mode_pktgen(
pktgen_ports):
self.critical_error(
'Failed setting front panel ports in loopback mode'
)

print('\nAdd the ports again:\n')
for p in pktgen_ports:
print(
' bf-sde> dvm add_port 0 {} 100 0'.format(p))
input('\nPress Enter to continue...')

if not self.ports.get_loopback_mode_pktgen(
pktgen_ports):
self.critical_error(
'Front panel ports are not in loopback mode')

except gc.BfruntimeReadWriteRpcException:
self.critical_error(
'Failed setting front panel ports in loopback mode')

print('\nAdd the ports again:\n')
for p in pktgen_ports:
print(' bf-sde> dvm add_port 0 {} 100 0'.format(p))
input('\nPress Enter to continue...')

if not self.ports.get_loopback_mode_pktgen(pktgen_ports):
self.critical_error(
'Front panel ports are not in loopback mode')
'Error while setting ports in loopback mode. \
If the switch has only 2 pipes, the folded pipeline cannot be enabled.'
)

# Packet Replication Engine table
self.pre = PRE(self.target, gc, self.bfrt_info, self.cpu_port)
self.pre = PRE(self.target, gc, self.bfrt_info)

# Setup tables
# Forwarder
Expand Down Expand Up @@ -187,7 +203,8 @@ def setup(
self.processors.append(p)
# Next step selector
self.next_step_selector = NextStepSelector(self.target, gc,
self.bfrt_info)
self.bfrt_info,
self.folded_pipe)
# RDMA sender
self.rdma_sender = RDMASender(self.target, gc, self.bfrt_info)
# UDP sender
Expand All @@ -209,7 +226,9 @@ def setup(
self.cli.setup(self, prompt='SwitchML', name='SwitchML controller')

# Set up gRPC server
self.grpc_server = GRPCServer(ip='[::]', port=50099)
self.grpc_server = GRPCServer(ip='[::]',
port=50099,
folded_pipe=self.folded_pipe)

# Run event loop for gRPC server in a separate thread
# limit concurrency to 1 to avoid synchronization problems in the BFRT interface
Expand Down Expand Up @@ -620,6 +639,11 @@ def run(self):
default='ports.yaml',
help=
'YAML file describing machines connected to ports. Default: ports.yaml')
argparser.add_argument(
'--enable-folded-pipe',
default=False,
action='store_true',
help='Enable the folded pipeline (requires a 4 pipes switch)')
argparser.add_argument('--log-level',
default='INFO',
choices=['ERROR', 'WARNING', 'INFO', 'DEBUG'],
Expand Down Expand Up @@ -652,7 +676,7 @@ def run(self):

ctrl = SwitchML()
ctrl.setup(args.program, args.switch_mac, args.switch_ip, args.bfrt_ip,
args.bfrt_port, args.ports)
args.bfrt_port, args.ports, args.enable_folded_pipe)

# Start controller
ctrl.run()
Expand Down

0 comments on commit 7ded628

Please sign in to comment.