xpath
The XPathProcessor class defined in spark-xml-utils provides a collection of static methods that enable relatively easy processing of xpath expressions against a record. The record is assumed to be a string of xml.
If any of the xpath expressions contain namespace prefixes, the XPathProcessor must be initialized first. Initialization consists of providing a HashMap that maps each prefix to its namespace uri. Below is a simple example.
import java.util.HashMap;
import com.elsevier.xml.XPathProcessor;
HashMap<String,String> pfxUriMap = new HashMap<String,String>();
pfxUriMap.put("xocs", "http://www.elsevier.com/xml/xocs/dtd");
pfxUriMap.put("ja", "http://www.elsevier.com/xml/ja/dtd");
pfxUriMap.put("si", "http://www.elsevier.com/xml/si/dtd");
pfxUriMap.put("ehs", "http://www.elsevier.com/xml/ehs-book/dtd");
pfxUriMap.put("bk", "http://www.elsevier.com/xml/bk/dtd");
pfxUriMap.put("ce", "http://www.elsevier.com/xml/common/dtd");
pfxUriMap.put("sb", "http://www.elsevier.com/xml/common/struct-bib/dtd");
pfxUriMap.put("tb", "http://www.elsevier.com/xml/common/table/dtd");
pfxUriMap.put("xlink", "http://www.w3.org/1999/xlink");
pfxUriMap.put("mml", "http://www.w3.org/1998/Math/MathML");
pfxUriMap.put("cals", "http://www.elsevier.com/xml/common/cals/dtd");
XPathProcessor.init(pfxUriMap);
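To illustrate what a prefix-to-uri map is for, here is a minimal sketch that adapts such a map to the JDK's javax.xml.namespace.NamespaceContext interface, which is how the standard javax.xml.xpath API resolves prefixes. The class name MapNamespaceContext is hypothetical and not part of spark-xml-utils; this page does not show how XPathProcessor consumes the map internally, so treat this only as an analogy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;

// Hypothetical helper (not part of spark-xml-utils): resolves prefixes from a map.
class MapNamespaceContext implements NamespaceContext {
    private final Map<String, String> prefixToUri;

    MapNamespaceContext(Map<String, String> prefixToUri) {
        // Defensive copy so later changes to the caller's map have no effect here.
        this.prefixToUri = new HashMap<>(prefixToUri);
    }

    @Override
    public String getNamespaceURI(String prefix) {
        if (prefix == null) {
            throw new IllegalArgumentException("prefix must not be null");
        }
        String uri = prefixToUri.get(prefix);
        // Unknown prefixes map to the empty string, as the interface contract requires.
        return uri != null ? uri : XMLConstants.NULL_NS_URI;
    }

    @Override
    public String getPrefix(String namespaceUri) {
        for (Map.Entry<String, String> entry : prefixToUri.entrySet()) {
            if (entry.getValue().equals(namespaceUri)) {
                return entry.getKey();
            }
        }
        return null;
    }

    @Override
    public Iterator<String> getPrefixes(String namespaceUri) {
        // Simplification: reports at most one prefix per namespace uri.
        String prefix = getPrefix(namespaceUri);
        return prefix == null
                ? Collections.<String>emptyIterator()
                : Collections.singletonList(prefix).iterator();
    }
}
```

An instance built from pfxUriMap could be passed to javax.xml.xpath.XPath.setNamespaceContext so that prefixed expressions resolve against the same mappings.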
The result of a filter operation is a boolean TRUE/FALSE. The filter operation applies an xpath expression against a string of xml. In the example below, the string being evaluated is "<name>john</name>" and the xpath expression applied is "/name[.='john']". In this example, the result of filterString is TRUE.
XPathProcessor.filterString("<name>john</name>", "/name[.='john']")
If there is an error encountered during the operation, the error will be logged but an exception will not be raised. Instead, a value of FALSE will be returned.
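The filter contract described above (a boolean result, with FALSE returned instead of an exception on error) can be sketched with the JDK's own javax.xml.xpath API. FilterSketch and its filter method are hypothetical names used for illustration only; this is not the implementation inside spark-xml-utils.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical stand-in for the filter behavior, built on the JDK XPath engine.
class FilterSketch {
    static boolean filter(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            // Asking for BOOLEAN converts a node-set to true when it is non-empty.
            Object result = xpath.evaluate(
                    expression,
                    new InputSource(new StringReader(xml)),
                    XPathConstants.BOOLEAN);
            return (Boolean) result;
        } catch (Exception e) {
            // Mirror the documented contract: log the problem and return false.
            System.err.println("XPath filter failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(filter("<name>john</name>", "/name[.='john']")); // prints true
        System.out.println(filter("<name>john</name>", "/name[.='jane']")); // prints false
        System.out.println(filter("<name>john", "/name[.='john']"));        // malformed xml, prints false
    }
}
```

Returning false on error keeps a Spark filter from failing a whole job on one bad record, at the cost of silently dropping that record, so the logs are worth checking.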
The result of an evaluation operation is the result of the xpath expression (serialized as a string). Like the filter operation, the evaluation operation applies an xpath expression against a string of xml. In the example below, the string being evaluated is "<name>john</name>" and the xpath expression applied is "/name[.='john']". In this example, the result of evaluateString is "john".
XPathProcessor.evaluateString("<name>john</name>", "/name[.='john']")
If there is an error encountered during the operation, the error will be logged but an exception will not be raised. Instead, a value of "<error/>" will be returned.
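The evaluation contract (a string result, with "<error/>" returned on failure) can be sketched in the same way with the JDK's javax.xml.xpath API. EvaluateSketch is a hypothetical name, and this sketch returns the XPath string-value of the first selected node; how spark-xml-utils serializes element results may differ, so this is an approximation rather than the library's behavior.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical stand-in for the evaluate behavior, built on the JDK XPath engine.
class EvaluateSketch {
    static final String ERROR_RESULT = "<error/>";

    static String evaluate(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            // The two-argument form returns the result converted to a string.
            return xpath.evaluate(expression, new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            // Mirror the documented contract: log the problem and return a sentinel.
            System.err.println("XPath evaluation failed: " + e.getMessage());
            return ERROR_RESULT;
        }
    }

    public static void main(String[] args) {
        System.out.println(evaluate("<name>john</name>", "/name[.='john']")); // prints john
        System.out.println(evaluate("<name>john", "/name[.='john']"));        // prints <error/>
    }
}
```

Because the error sentinel is itself a string, downstream code that needs to distinguish failures has to compare against "<error/>" explicitly.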
XPathProcessor.clear() is a helper method that clears any namespace prefix/uri mappings that have been cached. Typically, it is followed by a call to XPathProcessor.init() to re-initialize the mappings.
XPathProcessor.clear()
A more complete XPath example is provided in the code samples. The XPath code sample consists of an XPath driver class (executed on the master) and an XPath worker class (executed on the workers). Using these classes as a template, it should be straightforward to adapt them to your needs.
Copy the spark-xml-utils.jar to the master node. We are assuming you are in the installation directory for Spark on the master and that you have copied the .jar file to the 'lib' folder under this location. Once this is done, execute the following command.
cd spark-install-dir
./bin/spark-shell --jars lib/spark-xml-utils.jar
Let's assume we have created a PairRDD by loading in a hadoop sequence file by executing a command like the following.
scala> val xmlKeyPair = sc.sequenceFile[String,String]("s3n://els-ats/darin/sd-xml/part*").cache()
Since our xpath expression will contain namespaces, we now need to initialize the Spark partitions associated with the xmlKeyPair RDD. This is done by executing the following commands. In the example below, the HashMap pfxUriMap contains the mappings of namespace prefixes to namespace uris.
scala> import com.elsevier.xml.XPathProcessor
scala> import java.util.HashMap
scala> var pfxUriMap : HashMap[String,String] = new HashMap[String,String]()
scala> pfxUriMap.put("xocs", "http://www.elsevier.com/xml/xocs/dtd")
scala> XPathProcessor.init(pfxUriMap)
scala> xmlKeyPair.foreachPartition(i => {XPathProcessor.init(pfxUriMap)})
To create a new PairRDD from xmlKeyPair where the content type is 'JL' (journals), the stage is 'S300', and the cover date year is after 2000 and before 2014, we could use filterString and execute the following command.
scala> val filteredXmlKeyPair = xmlKeyPair.filter(pair => XPathProcessor.filterString(pair._2,"xocs:doc[./xocs:meta[xocs:content-type='JL' and xocs:item-stage='S300' and xocs:cover-date-year > 2000 and xocs:cover-date-year < 2014]]"))
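To see what the filter expression above selects, the following sketch evaluates the same expression against a tiny hand-made document using the JDK's javax.xml.xpath API with a namespace context that knows only the xocs prefix. The class NamespacedFilterSketch, the sampleDoc helper, and the sample document itself are hypothetical; real xocs documents contain far more than these three elements, and this is not how spark-xml-utils evaluates the expression internally.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical illustration of the namespaced filter expression.
class NamespacedFilterSketch {
    static final String XOCS_URI = "http://www.elsevier.com/xml/xocs/dtd";

    // Same expression as in the Spark filter above.
    static final String EXPRESSION =
            "xocs:doc[./xocs:meta[xocs:content-type='JL' and xocs:item-stage='S300'"
            + " and xocs:cover-date-year > 2000 and xocs:cover-date-year < 2014]]";

    // Minimal namespace context that resolves only the "xocs" prefix.
    static final NamespaceContext XOCS_ONLY = new NamespaceContext() {
        @Override
        public String getNamespaceURI(String prefix) {
            return "xocs".equals(prefix) ? XOCS_URI : XMLConstants.NULL_NS_URI;
        }

        @Override
        public String getPrefix(String uri) {
            return XOCS_URI.equals(uri) ? "xocs" : null;
        }

        @Override
        public Iterator<String> getPrefixes(String uri) {
            String prefix = getPrefix(uri);
            return prefix == null
                    ? Collections.<String>emptyIterator()
                    : Collections.singletonList(prefix).iterator();
        }
    };

    // Made-up, heavily reduced document containing only the fields the filter reads.
    static String sampleDoc(String year) {
        return "<xocs:doc xmlns:xocs=\"" + XOCS_URI + "\"><xocs:meta>"
                + "<xocs:content-type>JL</xocs:content-type>"
                + "<xocs:item-stage>S300</xocs:item-stage>"
                + "<xocs:cover-date-year>" + year + "</xocs:cover-date-year>"
                + "</xocs:meta></xocs:doc>";
    }

    static boolean matches(String xml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(XOCS_ONLY);
            return (Boolean) xpath.evaluate(
                    EXPRESSION,
                    new InputSource(new StringReader(xml)),
                    XPathConstants.BOOLEAN);
        } catch (Exception e) {
            System.err.println("XPath filter failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(sampleDoc("2005"))); // prints true
        System.out.println(matches(sampleDoc("2014"))); // prints false, upper bound is exclusive
        System.out.println(matches(sampleDoc("2000"))); // prints false, lower bound is exclusive
    }
}
```

Note that both year comparisons are strict, so documents dated exactly 2000 or 2014 are excluded; use >= and <= in the expression if the endpoints should be kept.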
If we then wanted to only get the xocs:meta section for these documents, we could use evaluateString and execute the following command.
scala> val metaFilteredXmlKeyPair = filteredXmlKeyPair.mapValues(v => XPathProcessor.evaluateString(v,"/xocs:doc/xocs:meta"))
Keep in mind that the above statements (since they are transformations) will not execute until an action occurs. For example, simply adding a count will force an action.
scala> filteredXmlKeyPair.count
scala> metaFilteredXmlKeyPair.count