xpath
The XPathProcessor class defined in spark-xml-utils provides methods that enable processing of XPath expressions (filter/evaluate) against a record. The record is assumed to be a string of XML.
The following imports are required for the XPathProcessor. The JavaConverters and HashMap are only required if the XPath expression contains namespace prefixes.
```scala
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
```
First, define the XPath expression you want to apply to the XML records. Below is an example of a simple XPath filter expression that filters on content of type 'JL' with a publication year greater than 2012 but less than 2015.
```scala
val xpathFilter = "exists(/xocs:doc[xocs:meta[xocs:content-type='JL' and xocs:cover-date-year > xs:int(2012) and xocs:cover-date-year < xs:int(2015)]])"
```
It is also possible to evaluate an XPath expression (returning a serialized string). Below is an example that returns the srctitle for a record.
```scala
val xpathEvaluate = "/xocs:doc/xocs:meta/xocs:srctitle/text()"
```
If the XPath expression contains namespace prefixes, the XPathProcessor needs to be told about these mappings. This simply involves providing a HashMap of prefix to namespace URI mappings. Below is a simple example.
```scala
val namespaces = new HashMap[String,String](Map(
  "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
).asJava)
```
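When an expression uses more than one prefix, the map simply gets one entry per prefix. The sketch below wraps the same `HashMap`/`asJava` construction in a small helper so any number of pairs can be passed. The helper name `namespaceMap` and the second prefix `ex` with its URI are hypothetical and only illustrate multiple mappings.

```scala
import java.util.HashMap
import scala.collection.JavaConverters._

// Hypothetical helper: builds the prefix -> namespace URI map passed to XPathProcessor.getInstance.
def namespaceMap(prefixes: (String, String)*): HashMap[String, String] =
  new HashMap[String, String](prefixes.toMap.asJava)

val multiNamespaces = namespaceMap(
  "xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
  // Hypothetical second prefix, shown only to illustrate several mappings.
  "ex"   -> "http://example.com/hypothetical-ns"
)
```

On Scala 2.13, `scala.collection.JavaConverters` still works but is deprecated in favor of `scala.jdk.CollectionConverters`, which provides the same `asJava` conversion.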
Putting it all together, the XPathProcessor would be created as follows:
```scala
val filterProc = XPathProcessor.getInstance(xpathFilter,namespaces)
val evalProc = XPathProcessor.getInstance(xpathEvaluate,namespaces)
```
If the XPath expression does not contain namespace prefixes, such as
```scala
val xpath = "exists(/*:doc)"
```
or
```scala
val xpath = "exists(/doc)"
```
the XPathProcessor would be created as follows:
```scala
val xpathProc = XPathProcessor.getInstance(xpath)
```
The result of a filter operation is a boolean (true or false). The filter operation applies an XPath expression against a string (assumed to be XML). The filter can occur locally on the driver (if you have returned records to the driver) or on the workers. In practice, the filter will typically occur on the workers, but I will show examples of both. In addition to the filterString method, there is an equivalent filter() method that accepts either a String or an InputStream.
When filtering locally on the driver, the code would be something like the following. In the example below, local is an Array of (String,String) where the first item is the key and the second item is the string of XML.
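If your records are strings but you want to use the `InputStream` form of `filter()`, one way is to wrap the string in a `ByteArrayInputStream`. This is a minimal sketch: `asStream` is a hypothetical helper, and UTF-8 is an assumption about how your records are encoded.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

// Hypothetical helper: exposes an XML string as an InputStream.
// UTF-8 is assumed; use whatever charset your records were actually written with.
def asStream(xml: String): InputStream =
  new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))
```

With a processor in scope, `proc.filter(asStream(rec._2))` would then be the stream-based counterpart of `proc.filterString(rec._2)`.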
```scala
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val local = xmlKeyPair.take(10)
val xpath = "exists(/xocs:doc[xocs:meta[xocs:content-type='JL' and xocs:cover-date-year > xs:int(2012) and xocs:cover-date-year < xs:int(2015)]])"
val namespaces = new HashMap[String,String](Map(
  "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
).asJava)
val proc = XPathProcessor.getInstance(xpath,namespaces)
val localFiltered = local.filter(rec => proc.filterString(rec._2))
```
When filtering on the workers, the code would be something like the following. In the example below xmlKeyPair is an RDD of (String,String) where the first item is the key and the second item is the string of XML. We use mapPartitions to initialize the processor for XPath once per partition for optimal performance. We then use an iterator to process each record in the partition.
```scala
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val filtered = xmlKeyPair.mapPartitions(recsIter => {
  // Build the processor once per partition rather than once per record
  val xpath = "exists(/xocs:doc[xocs:meta[xocs:content-type='JL' and xocs:cover-date-year > xs:int(2012) and xocs:cover-date-year < xs:int(2015)]])"
  val namespaces = new HashMap[String,String](Map(
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
  ).asJava)
  val proc = XPathProcessor.getInstance(xpath,namespaces)
  recsIter.filter(rec => proc.filterString(rec._2))
})
```
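The benefit of `mapPartitions` is that `getInstance` runs once per partition instead of once per record. The plain-Scala sketch below simulates that pattern without Spark: each inner `Iterator` stands in for one partition, and the hypothetical `makeProcessor` stands in for `XPathProcessor.getInstance`, counting how often it is called.

```scala
// Counts how many "processors" get created (stand-in for getInstance calls).
var processorsCreated = 0

// Hypothetical stand-in for XPathProcessor.getInstance; the "filter" is a simple prefix check.
def makeProcessor(): String => Boolean = {
  processorsCreated += 1
  xml => xml.startsWith("<doc")
}

// Two simulated partitions of (key, xml) records.
val partitions: Seq[Iterator[(String, String)]] = Seq(
  Iterator(("k1", "<doc/>"), ("k2", "<other/>")),
  Iterator(("k3", "<doc><x/></doc>"))
)

// Same shape as the worker example: one processor per partition, then filter each record.
val filteredPartitions = partitions.map { recsIter =>
  val proc = makeProcessor()
  recsIter.filter(rec => proc(rec._2)).toList
}
```

Three records are processed, but only two processors are created, one per partition.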
If there is an error encountered during the operation, the error will be logged and an exception will be raised.
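Because an error raises an exception, one malformed record can fail the whole task. If you would rather treat such records as non-matching, a minimal sketch is to wrap the predicate in `scala.util.Try`, which catches non-fatal exceptions. `lenientFilter` and the stand-in predicate `strict` are hypothetical; in practice the predicate would be `proc.filterString`.

```scala
import scala.util.Try

// Hypothetical helper: a record whose filter call throws is treated as non-matching.
def lenientFilter(pred: String => Boolean): String => Boolean =
  xml => Try(pred(xml)).getOrElse(false)

// Stand-in predicate for demonstration only: throws on non-XML input, otherwise checks for "JL".
val strict: String => Boolean = xml =>
  if (!xml.startsWith("<")) throw new IllegalArgumentException("not XML")
  else xml.contains("JL")

val safe = lenientFilter(strict)
```

Inside `mapPartitions`, this might look like `val safeFilter = lenientFilter(xml => proc.filterString(xml))` followed by `recsIter.filter(rec => safeFilter(rec._2))`. The trade-off is that bad records disappear silently, so this is only appropriate when skipping them is acceptable.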
The result of an evaluation operation is the result of the XPath expression (serialized as a string). Like the filter operation, the evaluation operation applies an XPath expression against a string. The evaluation can occur locally on the driver (if you have returned records to the driver) or on the workers. In practice, the evaluation will typically occur on the workers, but I will show examples of both. In addition to the evaluateString method, there is an equivalent evaluate() method that accepts either a String or an InputStream.
When evaluating locally on the driver, the code would be something like the following. In the example below, local is an Array of (String,String) where the first item is the key and the second item is the string of XML.
```scala
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val local = xmlKeyPair.take(10)
val xpath = "/xocs:doc/xocs:meta/xocs:srctitle/text()"
val namespaces = new HashMap[String,String](Map(
  "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
).asJava)
val proc = XPathProcessor.getInstance(xpath,namespaces)
val localEval = local.map(rec => proc.evaluateString(rec._2))
```
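The example above keeps only the evaluated strings, so the record keys are lost. If you need to trace each result back to its record, one option is to map each (key, xml) pair to (key, result). `withKey` and the stand-in evaluator `fakeEval` are hypothetical; in practice the evaluator would call `proc.evaluateString`.

```scala
// Hypothetical helper: applies an evaluator to the XML value while keeping the key.
def withKey(eval: String => String): ((String, String)) => (String, String) = {
  case (key, xml) => (key, eval(xml))
}

// Stand-in evaluator for demonstration only: returns the length of the XML string.
val fakeEval: String => String = xml => xml.length.toString

val sample = Array(("doc-1", "<a/>"), ("doc-2", "<bb/>"))
val keyed = sample.map(withKey(fakeEval))
```

With a real processor, `local.map(withKey(xml => proc.evaluateString(xml)))` would produce an Array of (key, srctitle) pairs.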
When evaluating on the workers, the code would be something like the following. In the example below xmlKeyPair is an RDD of (String,String) where the first item is the key and the second item is the string of XML. Once again, we use mapPartitions to initialize the processor for XPath once per partition for optimal performance. We then use an iterator to process each record in the partition.
```scala
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val srctitles = xmlKeyPair.mapPartitions(recsIter => {
  // Build the processor once per partition rather than once per record
  val xpath = "/xocs:doc/xocs:meta/xocs:srctitle/text()"
  val namespaces = new HashMap[String,String](Map(
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
  ).asJava)
  val proc = XPathProcessor.getInstance(xpath,namespaces)
  recsIter.map(rec => proc.evaluateString(rec._2))
})
```
If there is an error encountered during the operation, the error will be logged and an exception will be raised.
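If you would rather drop records whose evaluation fails than fail the job, a minimal sketch is to turn each evaluation into an `Option` with `scala.util.Try` and keep only the successes. `lenientEval` and the stand-in `strictEval` are hypothetical; in practice the evaluator would call `proc.evaluateString`.

```scala
import scala.util.Try

// Hypothetical helper: Some(result) on success, None if the evaluation throws.
def lenientEval(eval: String => String): String => Option[String] =
  xml => Try(eval(xml)).toOption

// Stand-in evaluator for demonstration only: fails on empty input.
val strictEval: String => String = xml =>
  if (xml.isEmpty) throw new IllegalStateException("empty record")
  else xml.toUpperCase

val safeEval = lenientEval(strictEval)

// Failed records are dropped; successful results are kept in order.
val results = Seq("<t>a</t>", "", "<t>b</t>").flatMap(xml => safeEval(xml).toList)
```

Inside `mapPartitions`, the equivalent would be `recsIter.flatMap(rec => safeEval(rec._2).toList)` with `safeEval = lenientEval(xml => proc.evaluateString(xml))`.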
I have successfully used com.elsevier.spark_xml_utils.xpath.XPathProcessor from the spark-shell and notebook environments (such as Databricks and Zeppelin). Depending on the environment, you just need to get the spark-xml-utils.jar installed and available to the driver and workers. For the spark-shell, something like the following would be done.
```
cd {spark-install-dir}
./bin/spark-shell --jars lib/uber-spark-xml-utils-1.4.0.jar
```
You can also use the `--packages` option.
```
cd {spark-install-dir}
./bin/spark-shell --packages elsevierlabs-os:spark-xml-utils:1.4.0
```