-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbag2img.py
210 lines (175 loc) · 6.93 KB
/
bag2img.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#!/usr/bin/env python
"""Extract images from a rosbag.
"""
import os
import argparse
import cv2
import rosbag
from sensor_msgs.msg import Image, NavSatFix, TimeReference
from cv_bridge import CvBridge
#for testing
#import std_msgs
#import rospy
import piexif
from fractions import Fraction
from datetime import datetime
def to_deg(value, loc):
"""convert decimal coordinates into degrees, munutes and seconds tuple
Keyword arguments: value is float gps-value, loc is direction list ["S", "N"] or ["W", "E"]
return: tuple like (25, 13, 48.343 ,'N')
"""
if value < 0:
loc_value = loc[0]
elif value > 0:
loc_value = loc[1]
else:
loc_value = ""
abs_value = abs(value)
deg = int(abs_value)
t1 = (abs_value-deg)*60
min = int(t1)
sec = round((t1 - min)* 60, 5)
return (deg, min, sec, loc_value)
def change_to_rational(number):
"""convert a number to rantional
Keyword arguments: number
return: tuple like (1, 2), (numerator, denominator)
"""
f = Fraction(str(number))
return (f.numerator, f.denominator)
def set_gps_location(file_name, lat, lng, altitude, gpsTime):
"""Adds GPS position as EXIF metadata
Keyword arguments:
file_name -- image file
lat -- latitude (as float)
lng -- longitude (as float)
altitude -- altitude (as float)
"""
lat_deg = to_deg(lat, ["S", "N"])
lng_deg = to_deg(lng, ["W", "E"])
exiv_lat = (change_to_rational(lat_deg[0]), change_to_rational(lat_deg[1]), change_to_rational(lat_deg[2]))
exiv_lng = (change_to_rational(lng_deg[0]), change_to_rational(lng_deg[1]), change_to_rational(lng_deg[2]))
gps_ifd = {
piexif.GPSIFD.GPSVersionID: (2, 0, 0, 0),
piexif.GPSIFD.GPSAltitudeRef: 0,
piexif.GPSIFD.GPSAltitude: change_to_rational(round(altitude + 50)),
piexif.GPSIFD.GPSLatitudeRef: lat_deg[3],
piexif.GPSIFD.GPSLatitude: exiv_lat,
piexif.GPSIFD.GPSLongitudeRef: lng_deg[3],
piexif.GPSIFD.GPSLongitude: exiv_lng,
piexif.GPSIFD.GPSDateStamp: gpsTime
}
exif_dict = {"GPS": gps_ifd}
exif_bytes = piexif.dump(exif_dict)
piexif.insert(exif_bytes, file_name)
def getNearestData(data,stamp):
"""Get nearest datapoint for query stamp from a dataarray with stamps
Keyword arguments:
data -- a dataarray to query from
stamp -- query stamp
"""
i = 0
for oneData in data:
if stamp.to_time() < oneData.header.stamp.to_time() and i > 0:
deltaPrev = abs(data[i-1].header.stamp - stamp)
deltaNext= abs(data[i].header.stamp - stamp)
if deltaPrev < deltaNext:
return data[i-1]
else:
return data[i]
i=i+1
return data[-1]
def rostime2floatSecs(rostime):
return rostime.secs + (rostime.nsecs / 1000000000.0)
def main():
"""Extract a folder of images with metadata from a rosbag.
"""
parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
parser.add_argument("bag_file", help="Input ROS bag.")
parser.add_argument("output_dir", help="Output directory.")
parser.add_argument("compressed", help="image compressed \"1\" or not \"0\"")
args = parser.parse_args()
bag_file = args.bag_file #"_2019-03-27-22-30-28.bag"
output_dir = args.output_dir #"extractedImages/"
compressed = args.compressed #true false
if compressed=="1":
image_topic = "/teli_camera/image_raw/compressed"
# "/toshiba/usb/image_raw/compressed"
else:
image_topic = "/camera/image_color"
gpsPos_topic = "/dji_osdk_ros/gps_position"
# "/mavros/global_position/global" # /mavros/global_position/raw/fix
gpsTime_topic = "/mavros/time_reference"
useGpsTime = False
jpg_quality = 100
compr = cv2.IMWRITE_JPEG_QUALITY
quality = jpg_quality # jpg quality is in [0,100] range, png [0,9]
params = [compr, quality]
bag = rosbag.Bag(bag_file, "r")
bridge = CvBridge()
print("Parse GPS Data...")
gpsPosData=[]
for topic, msg, t in bag.read_messages(topics=[gpsPos_topic]):
gpsPosData.append(msg)
if len(gpsPosData) == 0:
print("No GPS position data found -> dont save pos to exif")
print(len(gpsPosData))
gpsTimeData=[]
if useGpsTime:
for topic, msg, t in bag.read_messages(topics=[gpsTime_topic]):
gpsTimeData.append(msg)
if useGpsTime == True and len(gpsTimeData)== 0:
print("No GPS Time data found -> use unix recording timestamps instead")
useGpsTime = False
print(len(gpsTimeData))
print("Save images...")
count = 0
denom = 3
for topic, msg, t in bag.read_messages(topics=[image_topic]):
if count % denom == 0:
if compressed=="1":
cv_img = bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
else:
cv_img = bridge.imgmsg_to_cv2(msg, "bgr8") # desired_encoding="passthrough"
if cv_img is None:
print("nnonon")
continue
print(topic, t, cv_img)
if useGpsTime:
nearestPicGpsTimeData = getNearestData(gpsTimeData,msg.header.stamp)
picGpsTimeFlt = rostime2floatSecs(nearestPicGpsTimeData.time_ref) - (rostime2floatSecs(nearestPicGpsTimeData.header.stamp) - rostime2floatSecs(msg.header.stamp))
else:
picGpsTimeFlt = rostime2floatSecs(msg.header.stamp)
picNameTimeString = datetime.utcfromtimestamp(picGpsTimeFlt).strftime('%Y_%m_%d_%H_%M_%S_%.1f')
picSaveName=""
picSaveName = picNameTimeString + ".jpg"
print("save image " + picSaveName)
picPath = os.path.join(output_dir, picSaveName)
cv2.imwrite(picPath, cv_img, params)
if len(gpsPosData) > 0:
picGpsPos = getNearestData(gpsPosData, msg.header.stamp)
gpsTimeString = datetime.utcfromtimestamp(picGpsTimeFlt).strftime('%Y:%m:%d %H:%M:%S')
set_gps_location(picPath,lat=picGpsPos.latitude, lng=picGpsPos.longitude,altitude=picGpsPos.altitude, gpsTime=gpsTimeString )
count += 1
bag.close()
return
if __name__ == '__main__':
main()
"""
#Test gps data assignment to pic based on stamp
headerMsg = NavSatFix()
headerMsg.header.stamp = rospy.rostime.Time.from_seconds(1553698847.23123)
picGpsPos = getNearestData(gpsPosData,headerMsg.header.stamp )
picGpsTime = getNearestData(gpsTimeData,headerMsg.header.stamp )
picGpsTimeFlt = picGpsTime.time_ref.secs + (picGpsTime.time_ref.nsecs / 1000000000.0)
picGpsTimeString = datetime.utcfromtimestamp(picGpsTimeFlt).strftime('%Y-%m-%dT%H:%M:%S.%f')
print(picGpsTime.time_ref)
print(GpsTimeString)
"""
"""
#EXIF Reader test
exif_dict = piexif.load("")
for ifd in ("0th", "Exif", "GPS", "1st"):
for tag in exif_dict[ifd]:
print(piexif.TAGS[ifd][tag]["name"], exif_dict[ifd][tag])
"""