Skip to content

Commit

Permalink
v.0.9.8
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Achihăei committed Mar 7, 2018
1 parent 4be9be4 commit b9913e0
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ RUN bash -c "if [ ! -f /opt/ros_hadoop/master/dist/HMB_4.bag ] ; then wget --qui
java -jar "$ROSIF_JAR" -f /opt/ros_hadoop/master/dist/HMB_4.bag

RUN bash -c "/start_hadoop.sh" && \
/opt/apache/hadoop/bin/hdfs dfsadmin -report && \
/opt/apache/hadoop/bin/hdfs dfsadmin -safemode wait && \
/opt/apache/hadoop/bin/hdfs dfsadmin -report && \
/opt/apache/hadoop/bin/hdfs dfs -mkdir /user && \
/opt/apache/hadoop/bin/hdfs dfs -mkdir /user/root && \
/opt/apache/hadoop/bin/hdfs dfs -put /opt/ros_hadoop/master/dist/HMB_4.bag && \
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lazy val root = (project in file(".")).
inThisBuild(List(
organization := "org.apache.spark.input",
scalaVersion := "2.11.8",
version := "0.9.5"
version := "0.9.8"
)),
name := "RosbagInputFormat",
libraryDependencies ++= Seq(
Expand Down
211 changes: 211 additions & 0 deletions doc/Rosbag larger than 2 GB.ipynb

Large diffs are not rendered by default.

110 changes: 78 additions & 32 deletions doc/Tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
"Example data can be found for instance at https://github.com/udacity/self-driving-car/tree/master/datasets published under MIT License.\n",
"\n",
"## Check that the rosbag file version is V2.0\n",
"The code you cloned is located in ```/opt/ros_hadoop/master``` while the latest release is in ```/opt/ros_hadoop/latest```\n",
"\n",
"../lib/rosbaginputformat.jar is a symlink to a recent version. You can replace it with the version you would like to test.\n",
"\n",
"```bash\n",
"java -jar lib/rosbaginputformat_2.11-0.9.3.jar --version -f HMB_1.bag\n",
"java -jar ../lib/rosbaginputformat.jar --version -f HMB_1.bag\n",
"```\n",
"\n",
"## Extract the index as configuration\n",
Expand All @@ -24,45 +28,62 @@
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Current working directory: /opt/ros_hadoop/latest/doc\n",
"\n",
"\n",
"/opt/ros_hadoop/\n",
"├── latest\n",
"│   ├── doc\n",
"│   ├── lib\n",
"│   ├── project\n",
"│   └── src\n",
"└── master\n",
" ├── bin\n",
" ├── dist\n",
" ├── doc\n",
" ├── lib\n",
" ├── project\n",
" ├── src\n",
" └── target\n",
"\n",
"13 directories\n"
]
}
],
"source": [
"%%bash\n",
"# assuming you start the notebook in the doc/ folder \n",
"java -jar ../lib/rosbaginputformat_2.11-0.9.3.jar -f /srv/data/HMB_4.bag"
"echo -e \"Current working directory: $(pwd)\\n\\n\"\n",
"\n",
"tree -d -L 2 /opt/ros_hadoop/"
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"total 56\n",
"drwxrwxr-x 3 root root 4096 Jun 23 15:07 src/\n",
"drwxrwxr-x 2 root root 4096 Jun 23 15:07 project/\n",
"drwxrwxr-x 2 root root 4096 Jun 23 15:07 lib/\n",
"drwxrwxr-x 2 root root 4096 Jun 23 15:07 doc/\n",
"-rw-rw-r-- 1 root root 650 Jun 23 15:07 build.sbt\n",
"-rw-rw-r-- 1 root root 2776 Jun 23 15:07 README.md\n",
"-rw-rw-r-- 1 root root 638 Jun 23 15:07 NOTICE\n",
"-rw-rw-r-- 1 root root 11476 Jun 23 15:07 LICENSE\n",
"-rw-rw-r-- 1 root root 380 Jun 23 15:07 AUTHORS\n",
"-rw-rw-r-- 1 root root 63 Jun 23 15:07 .gitignore\n",
"drwxrwxr-x 6 root root 4096 Jun 23 15:07 ./\n",
"drwxr-xr-x 3 root root 4096 Jul 14 14:36 ../\n"
"min: 347674885\n",
"idxpos: 347442336 347674885\n",
"Found: 421 chunks\n",
"\n"
]
}
],
"source": [
"%%bash\n",
"ls -tralF /srv/data/"
"# assuming you start the notebook in the doc/ folder of master (default Dockerfile build)\n",
"java -jar ../lib/rosbaginputformat.jar -f /opt/ros_hadoop/master/dist/HMB_4.bag"
]
},
{
Expand All @@ -77,33 +98,58 @@
"\n",
"**Note:** keep the index JSON file as configuration for your jobs; **do not** put small files in HDFS.\n",
"\n",
"For convenience we already provide an example file (/srv/data/HMB_4.bag) in the HDFS under /user/root/\n",
"For convenience we already provide an example file (/opt/ros_hadoop/master/dist/HMB_4.bag) in HDFS under /user/root/\n",
"\n",
"```bash\n",
"hdfs dfs -put /srv/data/HMB_4.bag\n",
"hdfs dfs -put /opt/ros_hadoop/master/dist/HMB_4.bag\n",
"hdfs dfs -ls\n",
"```\n",
"\n",
"## Process the ros bag file in Spark using the RosbagInputFormat\n",
"\n",
"![images/header.png](images/header.png)\n",
"\n",
"## Create the Spark Session or get an existing one"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext, SparkConf\n",
"from pyspark.sql import SparkSession\n",
"\n",
"sparkConf = SparkConf()\n",
"sparkConf.setMaster(\"local[*]\")\n",
"sparkConf.setAppName(\"ros_hadoop\")\n",
"sparkConf.set(\"spark.jars\", \"../lib/protobuf-java-3.3.0.jar,../lib/rosbaginputformat.jar,../lib/scala-library-2.11.8.jar\")\n",
"\n",
"spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()\n",
"sc = spark.sparkContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create an RDD from the Rosbag file\n",
"**Note:** your HDFS address might differ."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"fin = sc.newAPIHadoopFile(\n",
" path = \"hdfs://127.0.0.1:9000/user/root/HMB_4.bag\",\n",
" inputFormatClass = \"de.valtech.foss.RosbagMapInputFormat\",\n",
" keyClass = \"org.apache.hadoop.io.LongWritable\",\n",
" valueClass = \"org.apache.hadoop.io.MapWritable\",\n",
" conf = {\"RosbagInputFormat.chunkIdx\":\"/srv/data/HMB_4.bag.idx.bin\"})"
" conf = {\"RosbagInputFormat.chunkIdx\":\"/opt/ros_hadoop/master/dist/HMB_4.bag.idx.bin\"})"
]
},
{
Expand All @@ -120,7 +166,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 3,
"metadata": {},
"outputs": [
{
Expand Down Expand Up @@ -164,7 +210,7 @@
" '/vehicle/dbw_enabled']"
]
},
"execution_count": 5,
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
Expand Down
2 changes: 1 addition & 1 deletion lib/rosbaginputformat.jar
Binary file not shown.
9 changes: 6 additions & 3 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ${RESET}By default will just create the protobuf idx file needed for configurati
def process(): Unit = {
val fin = new File(pargs("file").asInstanceOf[String])
use(new FileInputStream(fin)) { stream => {
printf("min: %s\n", Math.min(1073741824, fin.length) )
//printf("min: %s\n", Math.min(1073741824, fin.length) )
val buffer = stream.getChannel.map(READ_ONLY, 0, Math.min(1073741824, fin.length)).order(LITTLE_ENDIAN)
val p:RosbagParser = new RosbagParser(buffer)
val version = p.read_version()
Expand All @@ -90,12 +90,15 @@ ${RESET}By default will just create the protobuf idx file needed for configurati
return
}
val idxpos = h.header.fields("index_pos").asInstanceOf[Long]
printf("idxpos: %s %s\n", idxpos, Math.min(1073741824, fin.length) )
//printf("idxpos: %s %s\n", idxpos, Math.min(1073741824, fin.length) )
val b = stream.getChannel.map(READ_ONLY, idxpos, Math.min(1073741824, fin.length - idxpos)).order(LITTLE_ENDIAN)
val pp:RosbagParser = new RosbagParser(b)
val c = pp.read_connections(h.header, Nil)
val chunk_idx = pp.read_chunk_infos(c)
println("Found: "+chunk_idx.size+" chunks\n")
Console.err.printf(s"""${RESET}${GREEN}Found: """
+ chunk_idx.size
+s""" chunks\n${RESET}It should be the same number reported by rosbag tool.\nIf you encounter any issues try reindexing your file and submit an issue.
${RESET}\n""")
val fout = new FileOutputStream(pargs("file").asInstanceOf[String] + ".idx.bin")
val builder = RosbagIdx.newBuilder
for(i <- chunk_idx) builder.addArray(i)
Expand Down

0 comments on commit b9913e0

Please sign in to comment.