Skip to content

Commit

Permalink
Add missing trace file prefix adjust to common order
Browse files Browse the repository at this point in the history
Fixes #816.

Signed-off-by: Pierre R. Mai <[email protected]>
  • Loading branch information
pmai committed Jun 10, 2024
1 parent af72665 commit ceeb575
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 113 deletions.
26 changes: 19 additions & 7 deletions doc/architecture/trace_file_naming.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,35 @@ The names of OSI trace files should have the following format:

**Types**

`sd`::
Trace file contains `SensorData` messages.

`sv`::
Trace file contains `SensorView` messages.

`svc`::
Trace file contains `SensorViewConfiguration` messages.

`gt`::
Trace file contains `GroundTruth` messages.

`tu`::
Trace file contains `TrafficUpdate` messages.
`hvd`::
Trace file contains `HostVehicleData` messages.

`sd`::
Trace file contains `SensorData` messages.

`tc`::
Trace file contains `TrafficCommand` messages.

`hvd`::
Trace file contains `HostVehicleData` messages.
`tcu`::
Trace file contains `TrafficCommandUpdate` messages.

`tu`::
Trace file contains `TrafficUpdate` messages.

`mr`::
Trace file contains `MotionRequest` messages.

`su`::
Trace file contains `StreamingUpdate` messages.

**Example**

Expand Down
8 changes: 4 additions & 4 deletions osi3trace/osi_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,26 @@
import struct

from osi3.osi_sensorview_pb2 import SensorView
from osi3.osi_sensorviewconfiguration_pb2 import SensorViewConfiguration
from osi3.osi_groundtruth_pb2 import GroundTruth
from osi3.osi_hostvehicledata_pb2 import HostVehicleData
from osi3.osi_sensordata_pb2 import SensorData
from osi3.osi_sensorviewconfiguration_pb2 import SensorViewConfiguration
from osi3.osi_trafficupdate_pb2 import TrafficUpdate
from osi3.osi_trafficcommand_pb2 import TrafficCommand
from osi3.osi_trafficcommandupdate_pb2 import TrafficCommandUpdate
from osi3.osi_trafficupdate_pb2 import TrafficUpdate
from osi3.osi_motionrequest_pb2 import MotionRequest
from osi3.osi_streamingupdate_pb2 import StreamingUpdate


MESSAGES_TYPE = {
"SensorView": SensorView,
"SensorViewConfiguration": SensorViewConfiguration,
"GroundTruth": GroundTruth,
"HostVehicleData": HostVehicleData,
"SensorData": SensorData,
"SensorViewConfiguration": SensorViewConfiguration,
"TrafficUpdate": TrafficUpdate,
"TrafficCommand": TrafficCommand,
"TrafficCommandUpdate": TrafficCommandUpdate,
"TrafficUpdate": TrafficUpdate,
"MotionRequest": MotionRequest,
"StreamingUpdate": StreamingUpdate,
}
Expand Down
204 changes: 102 additions & 102 deletions tests/test_osi_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

from osi3trace.osi_trace import OSITrace
from osi3.osi_sensorview_pb2 import SensorView
from osi3.osi_sensorviewconfiguration_pb2 import SensorViewConfiguration
from osi3.osi_groundtruth_pb2 import GroundTruth
from osi3.osi_hostvehicledata_pb2 import HostVehicleData
from osi3.osi_sensordata_pb2 import SensorData
from osi3.osi_sensorviewconfiguration_pb2 import SensorViewConfiguration
from osi3.osi_trafficupdate_pb2 import TrafficUpdate
from osi3.osi_trafficcommand_pb2 import TrafficCommand
from osi3.osi_trafficcommandupdate_pb2 import TrafficCommandUpdate
from osi3.osi_trafficupdate_pb2 import TrafficUpdate
from osi3.osi_motionrequest_pb2 import MotionRequest
from osi3.osi_streamingupdate_pb2 import StreamingUpdate

Expand All @@ -35,6 +35,23 @@ def test_osi_trace_sv(self):

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_svc(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_svc.txth")
path_input = os.path.join(tmpdirname, "input_svc.osi")
create_sample_svc(path_input)

trace = OSITrace(path_input, "SensorViewConfiguration")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, SensorViewConfiguration)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 1)
trace.close()

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_gt(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_gt.txth")
Expand Down Expand Up @@ -86,40 +103,6 @@ def test_osi_trace_sd(self):

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_svc(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_svc.txth")
path_input = os.path.join(tmpdirname, "input_svc.osi")
create_sample_svc(path_input)

trace = OSITrace(path_input, "SensorViewConfiguration")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, SensorViewConfiguration)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 1)
trace.close()

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_tu(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_tu.txth")
path_input = os.path.join(tmpdirname, "input_tu.osi")
create_sample_tu(path_input)

trace = OSITrace(path_input, "TrafficUpdate")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, TrafficUpdate)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_tc(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_tc.txth")
Expand Down Expand Up @@ -154,6 +137,23 @@ def test_osi_trace_tcu(self):

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_tu(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_tu.txth")
path_input = os.path.join(tmpdirname, "input_tu.osi")
create_sample_tu(path_input)

trace = OSITrace(path_input, "TrafficUpdate")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, TrafficUpdate)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))

def test_osi_trace_mr(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path_output = os.path.join(tmpdirname, "output_mr.txth")
Expand Down Expand Up @@ -257,6 +257,31 @@ def create_sample_sv(path):
f.close()


def create_sample_svc(path):
f = open(path, "ab")
sensorviewconfig = SensorViewConfiguration()

sensorviewconfig.version.version_major = 3
sensorviewconfig.version.version_minor = 0
sensorviewconfig.version.version_patch = 0

sensorviewconfig.sensor_id.value = 42

sensorviewconfig.mounting_position.position.x = 0.8
sensorviewconfig.mounting_position.position.y = 1.0
sensorviewconfig.mounting_position.position.z = 0.5

sensorviewconfig.mounting_position.orientation.roll = 0.10
sensorviewconfig.mounting_position.orientation.pitch = 0.15
sensorviewconfig.mounting_position.orientation.yaw = 0.25

"""Serialize"""
bytes_buffer = sensorviewconfig.SerializeToString()
f.write(struct.pack("<L", len(bytes_buffer)) + bytes_buffer)

f.close()


def create_sample_gt(path):
f = open(path, "ab")
ground_truth = GroundTruth()
Expand Down Expand Up @@ -379,72 +404,6 @@ def create_sample_sd(path):
f.close()


def create_sample_svc(path):
f = open(path, "ab")
sensorviewconfig = SensorViewConfiguration()

sensorviewconfig.version.version_major = 3
sensorviewconfig.version.version_minor = 0
sensorviewconfig.version.version_patch = 0

sensorviewconfig.sensor_id.value = 42

sensorviewconfig.mounting_position.position.x = 0.8
sensorviewconfig.mounting_position.position.y = 1.0
sensorviewconfig.mounting_position.position.z = 0.5

sensorviewconfig.mounting_position.orientation.roll = 0.10
sensorviewconfig.mounting_position.orientation.pitch = 0.15
sensorviewconfig.mounting_position.orientation.yaw = 0.25

"""Serialize"""
bytes_buffer = sensorviewconfig.SerializeToString()
f.write(struct.pack("<L", len(bytes_buffer)) + bytes_buffer)

f.close()


def create_sample_tu(path):
f = open(path, "ab")
trafficupdate = TrafficUpdate()

trafficupdate.version.version_major = 3
trafficupdate.version.version_minor = 0
trafficupdate.version.version_patch = 0

trafficupdate.timestamp.seconds = 0
trafficupdate.timestamp.nanos = 0

moving_object = trafficupdate.update.add()
moving_object.id.value = 114

# Generate 10 OSI messages for 9 seconds
for i in range(10):
# Increment the time
trafficupdate.timestamp.seconds += 1
trafficupdate.timestamp.nanos += 100000

moving_object.vehicle_classification.type = 2

moving_object.base.dimension.length = 5
moving_object.base.dimension.width = 2
moving_object.base.dimension.height = 1

moving_object.base.position.x = 0.0 + i
moving_object.base.position.y = 0.0
moving_object.base.position.z = 0.0

moving_object.base.orientation.roll = 0.0
moving_object.base.orientation.pitch = 0.0
moving_object.base.orientation.yaw = 0.0

"""Serialize"""
bytes_buffer = trafficupdate.SerializeToString()
f.write(struct.pack("<L", len(bytes_buffer)) + bytes_buffer)

f.close()


def create_sample_tc(path):
f = open(path, "ab")
trafficcommand = TrafficCommand()
Expand Down Expand Up @@ -508,6 +467,47 @@ def create_sample_tcu(path):
f.close()


def create_sample_tu(path):
f = open(path, "ab")
trafficupdate = TrafficUpdate()

trafficupdate.version.version_major = 3
trafficupdate.version.version_minor = 0
trafficupdate.version.version_patch = 0

trafficupdate.timestamp.seconds = 0
trafficupdate.timestamp.nanos = 0

moving_object = trafficupdate.update.add()
moving_object.id.value = 114

# Generate 10 OSI messages for 9 seconds
for i in range(10):
# Increment the time
trafficupdate.timestamp.seconds += 1
trafficupdate.timestamp.nanos += 100000

moving_object.vehicle_classification.type = 2

moving_object.base.dimension.length = 5
moving_object.base.dimension.width = 2
moving_object.base.dimension.height = 1

moving_object.base.position.x = 0.0 + i
moving_object.base.position.y = 0.0
moving_object.base.position.z = 0.0

moving_object.base.orientation.roll = 0.0
moving_object.base.orientation.pitch = 0.0
moving_object.base.orientation.yaw = 0.0

"""Serialize"""
bytes_buffer = trafficupdate.SerializeToString()
f.write(struct.pack("<L", len(bytes_buffer)) + bytes_buffer)

f.close()


def create_sample_mr(path):
f = open(path, "ab")
motionrequest = MotionRequest()
Expand Down

0 comments on commit ceeb575

Please sign in to comment.