xpath
The XPathProcessor class defined in spark-xml-utils provides a collection of static methods that make it relatively easy to evaluate XPath expressions against a record. The record can be a string or an object stored in an S3 bucket.
If any of the XPath expressions contain namespace prefixes, the XPathProcessor must be initialized first. Initialization consists of providing the S3 bucket name and the object key of a file that contains the namespace prefix to namespace URI mappings, one prefix=uri pair per line. Below is a sample file.
xocs=http://www.elsevier.com/xml/xocs/dtd
ja=http://www.elsevier.com/xml/ja/dtd
si=http://www.elsevier.com/xml/si/dtd
ehs=http://www.elsevier.com/xml/ehs-book/dtd
bk=http://www.elsevier.com/xml/bk/dtd
ce=http://www.elsevier.com/xml/common/dtd
sb=http://www.elsevier.com/xml/common/struct-bib/dtd
tb=http://www.elsevier.com/xml/common/table/dtd
xlink=http://www.w3.org/1999/xlink
mml=http://www.w3.org/1998/Math/MathML
cals=http://www.elsevier.com/xml/common/cals/dtd
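To make the file format concrete, here is a minimal sketch of how such prefix=uri lines could be read into a map. The class and method names (NamespaceMappings, parse) are hypothetical and are not part of spark-xml-utils; how XPathProcessor actually parses the file is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of spark-xml-utils): parses "prefix=uri" lines.
final class NamespaceMappings {

    // Returns prefix -> URI in file order; blank or malformed lines are skipped.
    static Map<String, String> parse(String content) {
        Map<String, String> mappings = new LinkedHashMap<>();
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            // Split on the first '=' only, since URIs may contain '=' themselves.
            int eq = trimmed.indexOf('=');
            if (eq <= 0 || eq == trimmed.length() - 1) {
                continue; // no prefix, no URI, or no separator at all
            }
            mappings.put(trimmed.substring(0, eq).trim(),
                         trimmed.substring(eq + 1).trim());
        }
        return mappings;
    }

    public static void main(String[] args) {
        String sample = "xlink=http://www.w3.org/1999/xlink\n"
                + "mml=http://www.w3.org/1998/Math/MathML\n";
        // Print each prefix with the URI it maps to.
        parse(sample).forEach((prefix, uri) -> System.out.println(prefix + " -> " + uri));
    }
}
```

Splitting on the first '=' keeps the whole URI intact, which matters because the URI is the part that must match the namespace declared in the XML.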
Since S3 is being used to store the namespace prefix/uri mappings in a bucket, the AWS access keys must be set in the environment.
export AWS_ACCESS_KEY_ID="put-your-value-here"
export AWS_SECRET_ACCESS_KEY="put-your-value-here"
Assuming the above sample file is stored in the S3 bucket 'spark-xml-namespaces' and assigned a key of 'sd-namespaces', the following line of code will initialize the XPathProcessor. Keep in mind that the AWS access keys must have 'read' access for the AWS bucket/file.
XPathProcessor.init("spark-xml-namespaces","sd-namespaces");
The filterString operation applies an XPath expression to a string and returns a boolean TRUE/FALSE. In the example below, the string being evaluated is "<name>john</name>" and the XPath expression applied is "/name[.='john']". In this example, the result of filterString is TRUE.
XPathProcessor.filterString("<name>john</name>", "/name[.='john']")
If there is an error encountered during the operation, the error will be logged but an exception will not be raised. Instead, a value of FALSE will be returned.
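The behavior described above (a boolean result, with errors logged and mapped to FALSE) can be sketched with the JDK's built-in javax.xml.xpath API. This is an illustration under that assumption only: the class FilterSketch is hypothetical, and it is not the implementation used by XPathProcessor.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical stand-in that mimics the documented filterString contract.
final class FilterSketch {

    // True if the expression evaluates to true; false on no match or any error.
    static boolean filter(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            InputSource source = new InputSource(new StringReader(xml));
            return (Boolean) xpath.evaluate(expression, source, XPathConstants.BOOLEAN);
        } catch (Exception e) {
            // Log and swallow the error, as the documentation describes.
            System.err.println("XPath filter failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(filter("<name>john</name>", "/name[.='john']")); // prints true
    }
}
```

One consequence of this contract is that a malformed record and a record that simply does not match are indistinguishable to the caller, so the logs are the only place to spot bad input.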
The evaluateString operation returns the result of the XPath expression, serialized as a string. Like the filter operation, it applies an XPath expression to a string. In the example below, the string being evaluated is "<name>john</name>" and the XPath expression applied is "/name[.='john']". In this example, the result of evaluateString is "john".
XPathProcessor.evaluateString("<name>john</name>", "/name[.='john']")
If there is an error encountered during the operation, the error will be logged but an exception will not be raised. Instead, a value of "" will be returned.
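The error handling described above (log the error, return "") can be sketched with the JDK's javax.xml.xpath API. The class EvaluateSketch is hypothetical and is not the XPathProcessor implementation. Note one assumption in particular: this sketch returns the XPath string value of the result, which may differ from how XPathProcessor serializes a matched element that has child elements.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical stand-in that mimics the documented evaluateString contract.
final class EvaluateSketch {

    // Returns the string value of the XPath result, or "" on any error.
    static String evaluate(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            InputSource source = new InputSource(new StringReader(xml));
            return (String) xpath.evaluate(expression, source, XPathConstants.STRING);
        } catch (Exception e) {
            // Log and swallow the error, as the documentation describes.
            System.err.println("XPath evaluation failed: " + e.getMessage());
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(evaluate("<name>john</name>", "/name[.='john']")); // prints john
    }
}
```

Because an expression that matches nothing also yields an empty string in this sketch, an empty result alone does not tell you whether an error occurred.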
XPathProcessor.clear() is a helper method that clears any cached namespace prefix/uri mappings. Typically, it is followed by a call to XPathProcessor.init() to re-initialize the mappings.
XPathProcessor.clear()
There are other filter and evaluation methods that retrieve an object from an S3 bucket before applying the XPath expression. Since these are likely to be used only rarely, they are not covered here; feel free to look at the code for their usage. If you decide to use this approach and have not already called XPathProcessor.init() with bucket/key parameters, you will need to call XPathProcessor.init() without any parameters. You will also need to export the AWS keys (as shown for initialization above). This is necessary to initialize the underlying S3 client so it can retrieve the object from the S3 bucket.
A more complete XPath example is provided in the code samples. The XPath code sample will consist of an XPath driver class (that will be executed on the master) and an XPath worker class (that will be executed on the worker). By using these classes as a template, it should be straightforward to apply modifications to meet your needs.
Prior to starting the shell, we will want to export the AWS access keys (on the master node). Since we are using namespaces in the xpath expressions, this is required.
export AWS_ACCESS_KEY_ID="put-your-value-here"
export AWS_SECRET_ACCESS_KEY="put-your-value-here"
Next, copy spark-xml-utils.jar to the master node. The commands below assume that you are in the Spark installation directory on the master and that you have copied the .jar file to the 'lib' folder under this location. Once this is done, execute the following commands.
cd spark-install-dir
./bin/spark-shell --jars lib/spark-xml-utils.jar
Let's assume we have created a PairRDD by loading in a hadoop sequence file by executing a command like the following.
scala> val xmlKeyPair = sc.sequenceFile[String,String]("s3n://els-ats/darin/sd-xml/part*").cache()
We now need to initialize the XPathProcessor on each of the Spark partitions associated with the xmlKeyPair RDD, so that the namespace mappings are available wherever the filter and evaluate calls run. This is done by executing the following commands. In the example below, "els-ats" is the S3 bucket and "Namespaces/SDNamespaceContext" is the key for the file containing the namespace prefix/uri mappings.
scala> import com.elsevier.xml.XPathProcessor
scala> val awsid = System.getenv("AWS_ACCESS_KEY_ID")
scala> val awskey = System.getenv("AWS_SECRET_ACCESS_KEY")
scala> xmlKeyPair.foreachPartition(i => {System.setProperty("AWS_ACCESS_KEY_ID",awsid); System.setProperty("AWS_SECRET_ACCESS_KEY",awskey); XPathProcessor.init("els-ats", "Namespaces/SDNamespaceContext"); })
We are now ready to use filterString and evaluateString. To create a new PairRDD from xmlKeyPair that contains only documents where the content type is 'JL' (journals), the stage is 'S300', and the cover date year is greater than 2000 and less than 2014, we could use the following command.
scala> val filteredXmlKeyPair = xmlKeyPair.filter(pair => XPathProcessor.filterString(pair._2,"xocs:doc[./xocs:meta[xocs:content-type='JL' and xocs:item-stage='S300' and xocs:cover-date-year > 2000 and xocs:cover-date-year < 2014]]"))
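To show how the prefix mappings and the predicate in the filter above fit together, here is a standalone sketch that evaluates the same expression with the JDK's javax.xml.xpath API. Everything in it is an assumption for illustration: the class NamespaceFilterSketch, its helpers, and the tiny sample document are hypothetical, and this is not how XPathProcessor is implemented.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical sketch: namespace-aware boolean XPath filter using only the JDK.
final class NamespaceFilterSketch {

    // Resolves prefixes in the XPath expression from a prefix -> URI map.
    static NamespaceContext contextFrom(Map<String, String> mappings) {
        return new NamespaceContext() {
            @Override
            public String getNamespaceURI(String prefix) {
                return mappings.getOrDefault(prefix, XMLConstants.NULL_NS_URI);
            }

            @Override
            public String getPrefix(String uri) {
                return null; // reverse lookup is not needed for evaluation
            }

            @Override
            public Iterator<String> getPrefixes(String uri) {
                return Collections.emptyIterator();
            }
        };
    }

    // True if the expression holds for the document; false on no match or error.
    static boolean filter(String xml, String expression, Map<String, String> mappings) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for prefixed names to match
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(contextFrom(mappings));
            return (Boolean) xpath.evaluate(expression, doc, XPathConstants.BOOLEAN);
        } catch (Exception e) {
            System.err.println("XPath filter failed: " + e.getMessage());
            return false;
        }
    }

    // Invented minimal document with only the three fields the predicate tests.
    static String sampleDoc(String year) {
        return "<xocs:doc xmlns:xocs=\"http://www.elsevier.com/xml/xocs/dtd\"><xocs:meta>"
                + "<xocs:content-type>JL</xocs:content-type>"
                + "<xocs:item-stage>S300</xocs:item-stage>"
                + "<xocs:cover-date-year>" + year + "</xocs:cover-date-year>"
                + "</xocs:meta></xocs:doc>";
    }

    public static void main(String[] args) {
        Map<String, String> ns =
                Collections.singletonMap("xocs", "http://www.elsevier.com/xml/xocs/dtd");
        String expr = "xocs:doc[./xocs:meta[xocs:content-type='JL' and xocs:item-stage='S300'"
                + " and xocs:cover-date-year > 2000 and xocs:cover-date-year < 2014]]";
        System.out.println(filter(sampleDoc("2010"), expr, ns));
    }
}
```

The prefix used in the expression only has to map to the same URI as the one declared in the document. Note also that both year comparisons are strict, so documents dated 2000 or 2014 are excluded.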
If we then wanted to get only the xocs:meta section for these documents, we could use evaluateString and execute the following command.
scala> val metaFilteredXmlKeyPair = filteredXmlKeyPair.mapValues(v => XPathProcessor.evaluateString(v,"/xocs:doc/xocs:meta"))
Keep in mind that the above statements (since they are transformations) will not execute until an action occurs. For example, simply adding a count will force an action.
scala> filteredXmlKeyPair.count
scala> metaFilteredXmlKeyPair.count