Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for one-way FPGA advertisement pin #22

Merged
merged 8 commits into from
Mar 7, 2024
77 changes: 68 additions & 9 deletions apollo_fpga/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# Copyright (c) 2020 Great Scott Gadgets <[email protected]>
# SPDX-License-Identifier: BSD-3-Clause

import os
import time
import usb.core

from .jtag import JTAGChain
Expand Down Expand Up @@ -32,13 +34,18 @@ def create_ila_frontend(ila, *, use_cs_multiplexing=False):
class ApolloDebugger:
""" Class representing a link to an Apollo Debug Module. """

# This VID/PID pair is unique to development LUNA boards.
# TODO: potentially change this to an OpenMoko VID, like other LUNA boards.
USB_IDS = [(0x1d50, 0x615c), (0x16d0, 0x05a5)]
# VID/PID pairs for Apollo and gateware.
APOLLO_USB_IDS = [(0x1d50, 0x615c)]
LUNA_USB_IDS = [(0x1d50, 0x615b)]

# If we have a LUNA_USB_IDS variable, we can use it to find the LUNA device.
if os.getenv("LUNA_USB_IDS"):
LUNA_USB_IDS += [tuple([int(x, 16) for x in os.getenv("LUNA_USB_IDS").split(":")])]

REQUEST_SET_LED_PATTERN = 0xa1
REQUEST_RECONFIGURE = 0xc0
REQUEST_FORCE_FPGA_OFFLINE = 0xc1
REQUEST_HONOR_FPGA_ADV = 0xc2

LED_PATTERN_IDLE = 500
LED_PATTERN_UPLOAD = 50
Expand Down Expand Up @@ -69,14 +76,30 @@ def __init__(self):
""" Sets up a connection to the debugger. """

# Try to create a connection to our Apollo debug firmware.
for vid, pid in self.USB_IDS:
device = usb.core.find(idVendor=vid, idProduct=pid)
if device is not None:
break
device = self._find_device(self.APOLLO_USB_IDS)

# If we couldn't find an Apollo device, bail out.
# If Apollo VID/PID is not found, try to find a gateware VID/PID with a valid Apollo stub
# interface. If found, request the gateware to liberate the USB port. In devices with a
# shared port, this effectively hands off the USB port to Apollo.
if device is None:
raise DebuggerNotFound()

# First, find the candidate device...
fpga_device = self._find_device(self.LUNA_USB_IDS, custom_match=self._device_has_stub_iface)
if fpga_device is None:
raise DebuggerNotFound("No Apollo or valid LUNA device found. "
"The LUNA_USB_IDS environment variable can be used to add custom VID:PID pairs.")

# ... and now request a USB handoff to Apollo
try:
self._request_handoff(fpga_device)
except usb.USBError as e:
raise DebuggerNotFound(f"Handoff request failed: {e.strerror}")

# Wait for Apollo to enumerate and try again
time.sleep(2)
device = self._find_device(self.APOLLO_USB_IDS)
if device is None:
raise DebuggerNotFound("Handoff was requested, but Apollo is not available")

self.device = device
self.major, self.minor = self.get_hardware_revision()
Expand All @@ -92,7 +115,39 @@ def __init__(self):
self.spi = DebugSPIConnection(self)
self.registers = self.spi

@classmethod
def _request_handoff(cls, device):
""" Requests the gateware to liberate the USB port. """
# Find the Apollo stub interface first
stub_if = cls._device_has_stub_iface(device, return_iface=True)
if stub_if is None:
raise DebuggerNotFound("No Apollo stub interface found")

# Send the request
intf_number = stub_if.bInterfaceNumber
REQUEST_APOLLO_ADV_STOP = 0xF0
request_type = usb.ENDPOINT_OUT | usb.RECIP_INTERFACE | usb.TYPE_VENDOR
device.ctrl_transfer(request_type, REQUEST_APOLLO_ADV_STOP, wIndex=intf_number, timeout=5000)

@staticmethod
def _find_device(ids, custom_match=None):
for vid, pid in ids:
device = usb.core.find(idVendor=vid, idProduct=pid, custom_match=custom_match)
if device is not None:
return device
return None

@staticmethod
def _device_has_stub_iface(device, return_iface=False):
""" Checks if a device has an Apollo stub interface present.

Optionally return the interface itself.
"""
for cfg in device:
stub_if = usb.util.find_descriptor(cfg, bInterfaceClass=0xFF, bInterfaceSubClass=0x00)
if stub_if is not None:
return stub_if if return_iface else True
return None if return_iface else False

def detect_connected_version(self):
""" Attempts to determine the revision of the connected hardware.
Expand Down Expand Up @@ -235,6 +290,10 @@ def force_fpga_offline(self):
""" Resets the target (FPGA/etc) connected to the debug controller. """
self.out_request(self.REQUEST_FORCE_FPGA_OFFLINE)

def honor_fpga_adv(self):
""" Tell Apollo to honor requests from FPGA_ADV again. Useful after reconfiguration. """
self.out_request(self.REQUEST_HONOR_FPGA_ADV)

def close(self):
""" Closes the USB device so it can be reused, possibly by another ApolloDebugger """

Expand Down
6 changes: 6 additions & 0 deletions apollo_fpga/commands/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def configure_fpga(device, args):

programmer.configure(bitstream)

# Let the LUNA gateware take over in devices with shared USB port
device.honor_fpga_adv()


def ensure_unconfigured(device):
with device.jtag as jtag:
Expand Down Expand Up @@ -210,6 +213,9 @@ def reconfigure_fpga(device, args):
""" Command that requests the attached ECP5 reconfigure itself from its SPI flash. """
device.soft_reset()

# Let the LUNA gateware take over in devices with shared USB port
device.honor_fpga_adv()


def force_fpga_offline(device, args):
""" Command that requests the attached ECP5 be held unconfigured. """
Expand Down
1 change: 1 addition & 0 deletions apollo_fpga/gateware/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .advertiser import ApolloAdvertiser
101 changes: 101 additions & 0 deletions apollo_fpga/gateware/advertiser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#
# This file is part of LUNA.
#
# Copyright (c) 2023 Great Scott Gadgets <[email protected]>
# SPDX-License-Identifier: BSD-3-Clause

""" Controllers for communicating with Apollo through the FPGA_ADV pin """

from amaranth import Elaboratable, Module, Signal, Mux
from amaranth_stdio.serial import AsyncSerialTX

from luna.gateware.usb.usb2.request import USBRequestHandler
from usb_protocol.types import USBRequestType, USBRequestRecipient


class ApolloAdvertiser(Elaboratable):
""" Gateware that implements a periodic announcement to Apollo using the FPGA_ADV pin.

Currently it is used to tell Apollo that the gateware wants to use the CONTROL port.
Apollo will keep the port switch connected to the FPGA after a reset as long as this
message is received periodically.
Once the port is lost, Apollo will ignore further messages until a specific vendor
request is called.

I/O ports:
I: stop -- Advertisement messages are stopped if this line is asserted.
"""
def __init__(self):
self.stop = Signal()

def default_request_handler(self, if_number):
return ApolloAdvertiserRequestHandler(if_number, self.stop)

def elaborate(self, platform):
m = Module()

clk_freq = platform.DEFAULT_CLOCK_FREQUENCIES_MHZ["sync"] * 1e6

# Communication is done with a serial transmitter (unidirectional)
baudrate = 9600
divisor = int(clk_freq // baudrate)
fpga_adv = AsyncSerialTX(divisor=divisor, data_bits=8, parity="even")
m.submodules += fpga_adv

# Counter with 50ms period
period = int(clk_freq * 50e-3)
timer = Signal(range(period))
m.d.sync += timer.eq(Mux(timer == period-1, 0, timer+1))

# Trigger announcement when the counter overflows
m.d.comb += [
fpga_adv.data .eq(ord('A')),
fpga_adv.ack .eq((timer == 0) & ~self.stop),
]

# Drive the FPGA_ADV pin with the serial transmitter
m.d.comb += platform.request("int").o.eq(fpga_adv.o)

return m


class ApolloAdvertiserRequestHandler(USBRequestHandler):
""" Request handler for ApolloAdvertiser.

Implements default vendor requests related to ApolloAdvertiser.
"""
REQUEST_APOLLO_ADV_STOP = 0xF0

def __init__(self, if_number, stop_pin):
super().__init__()
self.if_number = if_number
self.stop_pin = stop_pin

def elaborate(self, platform):
m = Module()

interface = self.interface
setup = self.interface.setup

#
# Vendor request handlers.

with m.If((setup.type == USBRequestType.VENDOR) & \
(setup.recipient == USBRequestRecipient.INTERFACE) & \
(setup.index == self.if_number)):

with m.If(setup.request == self.REQUEST_APOLLO_ADV_STOP):

# Notify that we want to manage this request
m.d.comb += interface.claim.eq(1)

# Once the receive is complete, respond with an ACK.
with m.If(interface.rx_ready_for_response):
m.d.comb += interface.handshakes_out.ack.eq(1)

# If we reach the status stage, send a ZLP.
with m.If(interface.status_requested):
m.d.comb += self.send_zlp()
m.d.usb += self.stop_pin.eq(1)

return m
154 changes: 154 additions & 0 deletions firmware/src/boards/cynthion_d11/fpga_adv.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* FPGA advertisement pin handling code.
*
* This file is part of Apollo.
*
* Copyright (c) 2023 Great Scott Gadgets <[email protected]>
* SPDX-License-Identifier: BSD-3-Clause
*/

#include "fpga_adv.h"
#include "usb_switch.h"
#include "apollo_board.h"
#include <hal/include/hal_gpio.h>

#include <bsp/board_api.h>
#include <hpl/pm/hpl_pm_base.h>
#include <hpl/gclk/hpl_gclk_base.h>
#include <peripheral_clk_config.h>

#ifdef BOARD_HAS_USB_SWITCH

// Store the timestamp of the last physical port advertisement
#define TIMEOUT 100UL
static uint32_t last_phy_adv = 0;

// Create a reference to our SERCOM object.
typedef Sercom sercom_t;
static sercom_t *sercom = SERCOM1;

static void fpga_adv_byte_received_cb(uint8_t byte, int parity_error);

#endif

/**
* Initialize FPGA_ADV receive-only serial port
*/
void fpga_adv_init(void)
{
#ifdef BOARD_HAS_USB_SWITCH
gpio_set_pin_direction(FPGA_ADV, GPIO_DIRECTION_IN);
gpio_set_pin_pull_mode(FPGA_ADV, GPIO_PULL_UP);

// Disable the SERCOM before configuring it, to 1) ensure we're not transacting
// during configuration; and 2) as many of the registers are R/O when the SERCOM is enabled.
while(sercom->USART.SYNCBUSY.bit.ENABLE);
sercom->USART.CTRLA.bit.ENABLE = 0;

// Software reset the SERCOM to restore initial values.
while(sercom->USART.SYNCBUSY.bit.SWRST);
sercom->USART.CTRLA.bit.SWRST = 1;

// The SWRST bit becomes accessible again once the software reset is
// complete -- we'll use this to wait for the reset to be finshed.
while(sercom->USART.SYNCBUSY.bit.SWRST);

// Ensure we can work with the full SERCOM.
while(sercom->USART.SYNCBUSY.bit.SWRST || sercom->USART.SYNCBUSY.bit.ENABLE);

// Pinmux the relevant pins to be used for the SERCOM.
gpio_set_pin_function(PIN_PA09, MUX_PA09C_SERCOM1_PAD3);

// Set up clocking for the SERCOM peripheral.
_pm_enable_bus_clock(PM_BUS_APBC, SERCOM1);
_gclk_enable_channel(SERCOM1_GCLK_ID_CORE, GCLK_CLKCTRL_GEN_GCLK0_Val);

// Configure the SERCOM for UART mode.
sercom->USART.CTRLA.reg =
SERCOM_USART_CTRLA_DORD | // LSB first
SERCOM_USART_CTRLA_RXPO(3) | // RX on PA09 (PAD[3])
SERCOM_USART_CTRLA_SAMPR(0) | // use 16x oversampling
SERCOM_USART_CTRLA_FORM(1) | // enable parity
SERCOM_USART_CTRLA_RUNSTDBY | // don't autosuspend the clock
SERCOM_USART_CTRLA_MODE_USART_INT_CLK; // use internal clock

// Configure our baud divisor.
const uint32_t baudrate = 9600;
const uint32_t baud = (((uint64_t)CONF_CPU_FREQUENCY << 16) - ((uint64_t)baudrate << 20)) / CONF_CPU_FREQUENCY;
sercom->USART.BAUD.reg = baud;

// Configure TX/RX and framing.
sercom->USART.CTRLB.reg =
SERCOM_USART_CTRLB_CHSIZE(0) | // 8-bit words
SERCOM_USART_CTRLB_RXEN; // Enable RX.

// Wait for our changes to apply.
while (sercom->USART.SYNCBUSY.bit.CTRLB);

// Enable our receive interrupt, as we want to asynchronously dump data into
// the UART console.
sercom->USART.INTENSET.reg = SERCOM_USART_INTENSET_RXC;

// Enable the UART IRQ.
NVIC_EnableIRQ(SERCOM1_IRQn);

// Finally, enable the SERCOM.
sercom->USART.CTRLA.bit.ENABLE = 1;
while(sercom->USART.SYNCBUSY.bit.ENABLE);

// Update timestamp
last_phy_adv = board_millis();
#endif
}

/**
* Task for things related with the advertisement pin
*/
void fpga_adv_task(void)
{
#ifdef BOARD_HAS_USB_SWITCH
// Take over USB after timeout
if (board_millis() - last_phy_adv >= TIMEOUT) {
take_over_usb();
}
#endif
}

/**
* Honor requests from FPGA_ADV again
*/
void honor_fpga_adv(void)
{
#ifdef BOARD_HAS_USB_SWITCH
if (board_millis() - last_phy_adv < TIMEOUT) {
hand_off_usb();
}
#endif
}


#ifdef BOARD_HAS_USB_SWITCH
/**
* FPGA_ADV interrupt handler.
*/
void SERCOM1_Handler(void)
{
// If we've just received a character, handle it.
if (sercom->USART.INTFLAG.bit.RXC)
{
// Read the relevant character, which marks this interrupt as serviced.
uint16_t byte = sercom->USART.DATA.reg;
fpga_adv_byte_received_cb(byte, sercom->USART.STATUS.bit.PERR);
}
}

static void fpga_adv_byte_received_cb(uint8_t byte, int parity_error) {
if (parity_error) {
return;
}

if (byte == 'A') {
last_phy_adv = board_millis();
}
}
#endif
Loading
Loading