Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor python tests #1325

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions bindings/util/estimator_kalman_emulator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import math
import cffirmware





class EstimatorKalmanEmulator:
"""
This class emulates the behavior of estimator_kalman.c and is used as a helper to enable testing of the kalman
core functionatlity. Estimator_kalman.c is tightly coupled to FreeRTOS (using
tasks for instance) and can not really be tested on this level, instead this class can be used to drive the
kalman core functionality.

The class emulates the measurement queue, the main loop in the task and the various calls to kalman core.

Methods are named in a similar way to the functions in estimator_kalman.c to make it easier to understand
how they are connected.

"""
def __init__(self, anchor_positions) -> None:
self.anchor_positions = anchor_positions
self.accSubSampler = cffirmware.Axis3fSubSampler_t()
self.gyroSubSampler = cffirmware.Axis3fSubSampler_t()
self.coreData = cffirmware.kalmanCoreData_t()
self.outlierFilterState = cffirmware.OutlierFilterTdoaState_t()

self.TDOA_ENGINE_MEASUREMENT_NOISE_STD = 0.30
self.PREDICT_RATE = 100
self.PREDICT_STEP_MS = 1000 / self.PREDICT_RATE

self._is_initialized = False

def run_one_1khz_iteration(self, sensor_samples) -> tuple[float, cffirmware.state_t]:
"""
Run one iteration of the estimation loop (runs at 1kHz)

Args:
sensor_samples : a list of samples to be consumed. The samples with time stamps that are used in this
iteration will be popped from the list.

Returns:
tuple[float, cffirmware.state_t]: A tuple containing the time stamp of this iteration and the
estimated state
"""
if not self._is_initialized:
first_sample = sensor_samples[0]
time_ms = int(first_sample[1]['timestamp'])
self._lazy_init(time_ms)

# Simplification, assume always flying
quad_is_flying = True

if self.now_ms > self.next_prediction_ms:
cffirmware.axis3fSubSamplerFinalize(self.accSubSampler)
cffirmware.axis3fSubSamplerFinalize(self.gyroSubSampler)

cffirmware.kalmanCorePredict(self.coreData, self.accSubSampler.subSample, self.gyroSubSampler.subSample,
self.now_ms, quad_is_flying)

self.next_prediction_ms += self.PREDICT_STEP_MS

cffirmware.kalmanCoreAddProcessNoise(self.coreData, self.coreParams, self.now_ms)

self._update_queued_measurements(self.now_ms, sensor_samples)

cffirmware.kalmanCoreFinalize(self.coreData)

# Main loop called at 1000 Hz in the firmware
self.now_ms += 1

external_state = cffirmware.state_t()
acc_latest = cffirmware.Axis3f()
cffirmware.kalmanCoreExternalizeState(self.coreData, external_state, acc_latest)

return self.now_ms, external_state

def _lazy_init(self, sample_time_ms):
self.now_ms = sample_time_ms
self.next_prediction_ms = self.now_ms + self.PREDICT_STEP_MS

GRAVITY_MAGNITUDE = 9.81
DEG_TO_RAD = math.pi / 180.0
cffirmware.axis3fSubSamplerInit(self.accSubSampler, GRAVITY_MAGNITUDE)
cffirmware.axis3fSubSamplerInit(self.gyroSubSampler, DEG_TO_RAD)

self.coreParams = cffirmware.kalmanCoreParams_t()
cffirmware.kalmanCoreDefaultParams(self.coreParams)
cffirmware.outlierFilterTdoaReset(self.outlierFilterState)
cffirmware.kalmanCoreInit(self.coreData, self.coreParams, self.now_ms)

self._is_initialized = True

def _update_queued_measurements(self, now_ms: int, sensor_samples):
done = False

while len(sensor_samples):
sample = sensor_samples.pop(0)
time_ms = int(sample[1]['timestamp'])
if time_ms <= now_ms:
self._update_with_sample(sample, now_ms)
else:
return

def _update_with_sample(self, sample, now_ms):
if sample[0] == 'estTDOA':
tdoa_data = sample[1]
tdoa = cffirmware.tdoaMeasurement_t()

tdoa.anchorIdA = int(tdoa_data['idA'])
tdoa.anchorIdB = int(tdoa_data['idB'])
tdoa.anchorPositionA = self.anchor_positions[tdoa.anchorIdA]
tdoa.anchorPositionB = self.anchor_positions[tdoa.anchorIdB]
tdoa.distanceDiff = float(tdoa_data['distanceDiff'])
tdoa.stdDev = self.TDOA_ENGINE_MEASUREMENT_NOISE_STD

cffirmware.kalmanCoreUpdateWithTdoa(self.coreData, tdoa, now_ms, self.outlierFilterState)

if sample[0] == 'estAcceleration':
acc_data = sample[1]

acc = cffirmware.Axis3f()
acc.x = float(acc_data['acc.x'])
acc.y = float(acc_data['acc.y'])
acc.z = float(acc_data['acc.z'])

cffirmware.axis3fSubSamplerAccumulate(self.accSubSampler, acc)

if sample[0] == 'estGyroscope':
gyro_data = sample[1]

gyro = cffirmware.Axis3f()
gyro.x = float(gyro_data['gyro.x'])
gyro.y = float(gyro_data['gyro.y'])
gyro.z = float(gyro_data['gyro.z'])

cffirmware.axis3fSubSamplerAccumulate(self.gyroSubSampler, gyro)
25 changes: 25 additions & 0 deletions bindings/util/loco_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import cffirmware
import yaml

def read_loco_anchor_positions(file_name: str) -> dict[int, cffirmware.vec3_s]:
"""
Read anchor position data from a file exported from the client.

Args:
file_name (str): The name of the file

Returns:
dict[int, cffirmware.vec3_s]: A dictionary from anchor id to a 3D-vector
"""
result = {}

with open(file_name, 'r') as file:
data = yaml.safe_load(file)
for id, vals in data.items():
point = cffirmware.vec3_s()
point.x = vals['x']
point.y = vals['y']
point.z = vals['z']
result[id] = point

return result
51 changes: 51 additions & 0 deletions bindings/util/sd_card_file_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import tools.usdlog.cfusdlog as cfusdlog
from bindings.util.estimator_kalman_emulator import EstimatorKalmanEmulator

class SdCardFileRunner:
"""
This class is used to read data from a file and feed it to a kalman estimator emulator, usually for testing
purposes.
"""

def __init__(self, file_name: str) -> None:
"""
Read sampled data from a file and feed to a kalman estimator emulator.
The supported file format is what is recorded using a uSD-card deck on the Crazyflie.
In the common use case the file should contain accelerometer and gyro data, as well as some other sensor
data that is to be fed to the kalman estimator.

Args:
file_name (str): Name of the file
"""
self.samples = self._read_sensor_data_sorted(file_name)

def run_estimator_loop(self, emulator: EstimatorKalmanEmulator):
result = []
while len(self.samples):
now_ms, external_state = emulator.run_one_1khz_iteration(self.samples)
result.append((now_ms, (external_state.position.x, external_state.position.y, external_state.position.z)))

return result

def _read_sensor_data_sorted(self, file_name: str):
"""Read sensor data from a file recorded using the uSD-card on a Crazyflie

Args:
file_name: The name of the file with recorded data

Returns:
A list of sensor samples, sorted in time order. The first field of each row identifies the sensor
that produced the sample
"""
log_data = cfusdlog.decode(file_name)

samples = []
for log_type, data in log_data.items():
for i in range(len(data['timestamp'])):
sample_data = {}
for name, data_list in data.items():
sample_data[name] = data_list[i]
samples.append((log_type, sample_data))

samples.sort(key=lambda x: x[1]['timestamp'])
return samples
Loading