xquery
The XQueryProcessor class defined in spark-xml-utils provides methods for evaluating XQuery expressions against a record. The record is assumed to be a string of XML.
The following imports are required for the XQueryProcessor. The JavaConverters and HashMap are only required if the XQuery expression contains namespace prefixes.
import com.elsevier.spark_xml_utils.xquery.XQueryProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
First, there is the XQuery expression that you will want to apply to the XML records. Below is an example of a simple XQuery expression that will return a JSON document with the journal source title and publication year.
val xquery= "for $x in /xocs:doc/xocs:meta return " +
"string-join(('{ \"srctitle\" :\"',$x/xocs:srctitle, '\",\"year\":',$x/xocs:cover-date-year,'}'),'')"
If the XQuery expression contains namespace prefixes, the XQueryProcessor will need to be told about these mappings. This simply involves providing a HashMap of the prefix to namespace URI mappings. Below is a simple example.
val namespaces = new HashMap[String,String](Map(
  "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
).asJava)
Putting it all together, the XQueryProcessor would be created as follows:
val xqueryProc = XQueryProcessor.getInstance(xquery, namespaces)
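As a quick sanity check, the processor can be applied directly to a single XML string. The fragment below is made up and contains only the elements the expression needs; the commented output is roughly what the expression above would produce for it.
val sampleXml = "<xocs:doc xmlns:xocs=\"http://www.elsevier.com/xml/xocs/dtd\">" +
  "<xocs:meta><xocs:srctitle>Some Journal</xocs:srctitle>" +
  "<xocs:cover-date-year>2015</xocs:cover-date-year></xocs:meta></xocs:doc>"
// Produces something like: { "srctitle" :"Some Journal","year":2015}
val sampleResult = xqueryProc.evaluateString(sampleXml)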
If the XQuery expression does not contain namespace prefixes, such as
val xquery= "for $x in /*:doc/*:meta return " +
"string-join(('{ \"srctitle\" :\"',$x/*:srctitle, '\",\"year\":',$x/*:cover-date-year,'}'),'')"
or
val xquery= "for $x in /doc/meta return " +
"string-join(('{ \"srctitle\" :\"',$x/srctitle, '\",\"year\":',$x/cover-date-year,'}'),'')"
the XQueryProcessor would be created as follows:
val xqueryProc = XQueryProcessor.getInstance(xquery)
An evaluation operation applies the XQuery expression against a string of XML and returns the result of the expression, serialized as a string. The evaluation can occur locally on the driver (if you have returned records to the driver) or on the workers. In practice, the evaluation will typically occur on the workers, but I will show examples of both. In addition to the evaluateString method, there is an equivalent evaluate() method that accepts either a String or an InputStream.
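As a rough sketch of the InputStream variant (reusing the xqueryProc created above; xmlString is just a placeholder for any string of XML you already have in hand), the call might look like this:
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
// xmlString is a placeholder for a string of XML
val xmlStream = new ByteArrayInputStream(xmlString.getBytes(StandardCharsets.UTF_8))
val streamResult = xqueryProc.evaluate(xmlStream)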
When evaluating locally on the driver, the code would be something like the following. In the example below, local is an Array of (String,String) where the first item is the key and the second item is the string of XML.
import com.elsevier.spark_xml_utils.xquery.XQueryProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val local = xmlKeyPair.take(10)
val xquery = "for $x in /xocs:doc/xocs:meta return " +
"string-join(('{ \"srctitle\" :\"',$x/xocs:srctitle, '\",\"year\":',$x/xocs:cover-date-year,'}'),'')"
val namespaces = new HashMap[String,String](Map(
"xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
).asJava)
val proc = XQueryProcessor.getInstance(xquery,namespaces)
val localEval = local.map(rec => proc.evaluateString(rec._2))
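To inspect the results, you can simply print them on the driver:
// Print the evaluated JSON string for each of the ten records
localEval.foreach(println)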
When evaluating on the workers, the code would be something like the following. In the example below, xmlKeyPair is an RDD of (String,String) where the first item is the key and the second item is the string of XML. We use mapPartitions to initialize the XQuery processor once per partition for optimal performance, and then use an iterator to process each record in the partition.
import com.elsevier.spark_xml_utils.xquery.XQueryProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val srcyearJson = xmlKeyPair.mapPartitions(recsIter => {
  val xquery = "for $x in /xocs:doc/xocs:meta return " +
    "string-join(('{ \"srctitle\" :\"',$x/xocs:srctitle, '\",\"year\":',$x/xocs:cover-date-year,'}'),'')"
  val namespaces = new HashMap[String,String](Map(
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
  ).asJava)
  val proc = XQueryProcessor.getInstance(xquery, namespaces)
  recsIter.map(rec => proc.evaluateString(rec._2))
})
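srcyearJson is a lazily evaluated RDD of JSON strings, so nothing runs until an action is invoked. For example (the output path below is just a placeholder):
// Trigger the evaluation and look at a few results on the driver
srcyearJson.take(5).foreach(println)
// Or write the full result set out (the path is only an example)
srcyearJson.saveAsTextFile("s3n://spark-xml-utils/output/srcyear-json")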
If an error is encountered during the operation, the error will be logged and an exception will be raised.
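The examples above will therefore fail fast on a bad record. If you would rather skip records that fail to evaluate, one option (a sketch reusing local and proc from the driver example above, and assuming skipping bad records is acceptable) is to wrap the call in scala.util.Try:
import scala.util.Try
// Keep only the records that evaluate successfully; failures have already been logged
val safeLocalEval = local.flatMap(rec => Try(proc.evaluateString(rec._2)).toOption)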
I have successfully used the XQueryProcessor from the spark-shell and from notebook environments (such as Databricks and Zeppelin). Depending on the environment, you just need to get the spark-xml-utils.jar installed and available to the driver and workers. For the spark-shell, something like the following would be done:
cd {spark-install-dir}
./bin/spark-shell --jars lib/uber-spark-xml-utils-1.4.0.jar
You can also use the --packages option:
cd {spark-install-dir}
./bin/spark-shell --packages elsevierlabs-os:spark-xml-utils:1.4.0