Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linux Refactor: Episode 2: Attack of the Overlays #2670

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/services/ardupilot_manager/ArduPilotManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def get_default_params_cmdline(self, platform: Platform) -> str:

async def start_linux_board(self, board: LinuxFlightController) -> None:
self._current_board = board
board.setup_board()
if not self.firmware_manager.is_firmware_installed(self._current_board):
if board.platform == Platform.Navigator:
self.firmware_manager.install_firmware_from_file(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// This is a custom device tree overlay for the spi0 peripheral on the
// Raspberry Pi 4. It will configure only the spi0 mosi pin
// (The other spi0 pins will not be driven by the spi0 peripheral,
// and can be used for other functions). This is to be used with
// the Blue Robotics Navigator autopilot hat, where the RGB
// 'neopixel' led data pin is connected to the spi0 mosi pin on the
// Raspberry Pi 4.

/dts-v1/;
/plugin/;


/ {
compatible = "brcm,bcm2835";

fragment@0 {
target = <&spi0_cs_pins>;
frag0: __overlay__ {
brcm,pins = <>;
};
};

fragment@1 {
target = <&spi0>;
frag1: __overlay__ {
cs-gpios = <>;
status = "okay";
};
};

fragment@2 {
target = <&spidev1>;
__overlay__ {
status = "disabled";
};
};

fragment@3 {
target = <&spi0_pins>;
__overlay__ {
brcm,pins = <10>;
};
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import re
import shlex
import subprocess
import time
from dataclasses import dataclass
from typing import List, Optional

all_dtparams = [
"i2c_vc=on",
"i2c_arm_baudrate=1000000",
"spi=on",
"enable_uart=1",
]


other_overlays = [
"dwc2 dr_mode=otg",
]
devices = {
"ADS1115": (0x48, 1),
"AK09915": (0x0C, 1),
"BME280": (0x76, 1),
"PCA9685": (0x40, 4),
}


class SetupError(Exception):
pass


@dataclass
class I2cDevice:
device: str
overlay: str
pins: dict[int, str]


@dataclass
class SpiDevice:
device: str
overlay: str
pins: dict[int, str]


@dataclass
class SerialDevice:
device: str
overlay: str
pins: dict[int, str]


@dataclass
class GpioSetup:
number: int
function: str
pull: str
value: str


i2c_module = "i2c-dev"
i2c_dtparams = [
"i2c_vc=on",
"i2c_arm_baudrate=1000000",
]

i2c_devices: List[I2cDevice] = [
I2cDevice(device="i2c-6", overlay="i2c6 pins_22_23=true baudrate=400000", pins={22: "SDA6", 23: "SCL6"}),
I2cDevice(device="i2c-1", overlay="i2c1", pins={2: "SDA1", 3: "SCL1"}),
I2cDevice(device="i2c-4", overlay="i2c4 pins_6_7=true baudrate=400000", pins={6: "SDA4", 7: "SCL4"}),
]

spi_devices: List[SpiDevice] = [
SpiDevice(device="spidev1.0", overlay="spi1-3cs", pins={19: "SPI1_MISO", 20: "SPI1_MOSI", 21: "SPI1_SCLK"}),
SpiDevice(
device="spidev0.0",
overlay="spi0-led",
pins={
10: "SPI0_MOSI",
},
),
]

gpios = [
GpioSetup(number=11, function="OUT", pull="UP", value="HIGH"),
GpioSetup(number=24, function="OUT", pull="UP", value="HIGH"),
GpioSetup(number=25, function="OUT", pull="UP", value="HIGH"),
GpioSetup(number=37, function="OUT", pull="DOWN", value="LOW"),
]


def enable_i2c_module() -> None:
modules = subprocess.check_output("lsmod")
if "i2c_dev" in str(modules):
return
print(f"loading module {i2c_module}...")
output = subprocess.check_output(shlex.split(f"modprobe {i2c_module}"))
print(output)


def enable_spi_module() -> None:
modules = subprocess.check_output("lsmod")
if "spi_dev" in str(modules):
return
print(f"loading module {i2c_module}...")
output = subprocess.check_output(shlex.split(f"modprobe {i2c_module}"))
print(output)


@dataclass
class GpioState:
number: int
level: int
fsel: int
alt: Optional[int]
func: str
pull: str


def get_gpios_state() -> dict[int, GpioState]:
output = subprocess.check_output(["raspi-gpio", "get"]).decode("utf-8")
pattern = r"GPIO (?P<gpio>\d+): level=(?P<level>\d) fsel=(?P<fsel>\d)(?: alt=(?P<alt>\d))? func=(?P<func>[\w\d_]+) pull=(?P<pull>\w+)"

# Using findall to extract all matches
matches = re.finditer(pattern, output)
gpio_states = {}
# Print each match
for match in matches:
gpio_states[int(match.group("gpio"))] = GpioState(
number=int(match.group("gpio")),
level=int(match.group("level")),
fsel=int(match.group("fsel")),
alt=int(match.group("alt")) if match.group("alt") else None,
func=match.group("func"),
pull=match.group("pull"),
)
return gpio_states


def load_overlay(overlay: str) -> None:
output = subprocess.check_output(shlex.split(f"dtoverlay {overlay}")).decode("utf-8")
print(output)


enable_i2c_module()
enable_spi_module()

states = get_gpios_state()
all_devices: List[I2cDevice | SpiDevice] = [*i2c_devices, *spi_devices]
for device in all_devices:
needs_reload = False
for gpio, function in device.pins.items():
if states[gpio].func != function:
print(f"GPIO {gpio} is not configured as {function}, instad it is {states[gpio].func}")
print(f"{device.overlay} needs to be loaded")
needs_reload = True
if needs_reload:
load_overlay(device.overlay)
time.sleep(2)
new_state = get_gpios_state()
for gpio, function in device.pins.items():
if states[gpio].func != function:
print(f"GPIO {gpio} is STILL not configured as {function}, instad it is {states[gpio].func}")
raise SetupError("Failed to configure device")
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def get_serial_cmdlines(self) -> str:
def get_serials(self) -> List[Serial]:
raise NotImplementedError

def setup_board(self) -> None:
raise NotImplementedError

def check_for_i2c_device(self, bus_number: int, address: int) -> bool:
try:
bus = SMBus(bus_number)
Expand Down
Loading