complexexamples
Much more complex scenarios are also possible. For example, suppose you want to keep only documents where the record is of type ‘journal’, the stage is ‘S300’, the publication year is after 2010 and before 2014, the abstract contains ‘heart’, ‘brain’, ‘body’, or ‘number’, and a section contains ‘red’ or ‘black’. While the following example is not a robust full-text search (no stemming, very simple tokenization, etc.), it highlights what can be accomplished with a fairly simple XPath expression.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val filtered = xmlKeyPair.mapPartitions(recsIter => {
val xpath ="/xocs:doc[./xocs:meta[xocs:content-type='JL' " +
"and xocs:item-stage='S300' " +
"and xocs:cover-date-year > 2010 " +
"and xocs:cover-date-year < 2014] " +
"and .//ja:head[.//ce:abstract[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('heart','brain','body','number')]] " +
"and .//ce:section[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('red','black')]]"
val namespaces = new HashMap[String,String](Map(
"xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
"ja" -> "http://www.elsevier.com/xml/ja/dtd",
"ce" ->"http://www.elsevier.com/xml/common/dtd"
).asJava)
val proc = XPathProcessor.getInstance(xpath,namespaces)
recsIter.filter(rec => proc.filterString(rec._2))
})
println("Unfiltered Count %s".format(xmlKeyPair.count))
println("Filtered Count %s".format(filtered.count))
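To make the token test in the expression above more concrete, here is a rough plain-Java analogue. It is only a sketch and not part of spark-xml-utils: the class and method names are hypothetical, and Java's `\W` is ASCII-based by default, so it only approximates the XPath regular expression. The point it illustrates is that an XPath general comparison (`=`) between two sequences is true as soon as any token equals any search term.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class TokenMatch {
    // Rough analogue of: tokenize(lower-case($text), '\W+') = ('heart', 'brain', ...)
    // Lower-case the text, split on runs of non-word characters, and
    // report whether ANY resulting token is one of the search terms.
    public static boolean containsAny(String text, Set<String> terms) {
        return Arrays.stream(text.toLowerCase().split("\\W+"))
                     .anyMatch(terms::contains);
    }

    public static void main(String[] args) {
        Set<String> terms = new HashSet<>(Arrays.asList("heart", "brain", "body", "number"));
        System.out.println(containsAny("The Heart, in brief.", terms)); // true
        System.out.println(containsAny("Heartfelt thanks", terms));     // false: whole tokens only
    }
}
```

Because whole tokens are compared, ‘heartfelt’ does not match ‘heart’, which is the "no stemming" limitation mentioned above.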
Note that a document can contain many ‘sections’. XPath, XQuery, and XSLT let us go beyond normal full-text search capabilities, for example by limiting the results to documents where ‘red’ or ‘black’ occurs in the first section.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val filtered = xmlKeyPair.mapPartitions(recsIter => {
val xpath ="/xocs:doc[./xocs:meta[xocs:content-type='JL' " +
"and xocs:item-stage='S300' " +
"and xocs:cover-date-year > 2010 " +
"and xocs:cover-date-year < 2014] " +
"and .//ja:head[.//ce:abstract[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('heart','brain','body','number')]] " +
"and (.//ce:section)[1][tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = ('red','black')]]"
val namespaces = new HashMap[String,String](Map(
"xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
"ja" -> "http://www.elsevier.com/xml/ja/dtd",
"ce" ->"http://www.elsevier.com/xml/common/dtd"
).asJava)
val proc = XPathProcessor.getInstance(xpath,namespaces)
recsIter.filter(rec => proc.filterString(rec._2))
})
println("Unfiltered Count %s".format(xmlKeyPair.count))
println("Filtered Count %s".format(filtered.count))
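One subtlety with positional predicates is worth spelling out. A predicate written inside a step, such as `.//ce:section[position()=1]`, is evaluated relative to each parent, so it also selects the first nested subsection of any section. Wrapping the path in parentheses, as in `(.//ce:section)[1]`, picks the single first section in document order. The sketch below shows the difference using the JDK's built-in XPath 1.0 engine rather than spark-xml-utils; the miniature document and the class name are hypothetical.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class FirstSection {
    // Hypothetical document: two top-level sections, the first one with a nested subsection.
    static final String XML =
        "<doc><section id='s1'><section id='s1.1'/></section><section id='s2'/></doc>";

    // Returns how many nodes the given XPath 1.0 expression selects from XML.
    public static int count(String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xpath.evaluate(
                expression, new InputSource(new StringReader(XML)), XPathConstants.NODESET);
            return nodes.getLength();
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(count("//section[position()=1]")); // 2: s1 and s1.1
        System.out.println(count("(//section)[1]"));          // 1: s1 only
    }
}
```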
One current shortcoming is that the result of an XPath or XQuery evaluation is a string and not a sequence of nodes. For example, suppose you would like to return a List of authors for each record, with each author as a separate entry in the List, sorted by author surname. Within each entry, the author's given name and surname should be kept separate.
Below is an approach that we have used. In this scenario, the expression inserts a '^' between the given name and surname and a '|' between authors, and we then use split to separate the authors and their names. While not ideal, this does help us handle these types of situations. Note that we could also have used an XQuery expression (with an order by clause) to further simplify the code.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val results = xmlKeyPair.mapPartitions(recsIter => {
val xpathAuthors = "string-join((for $x in //*:head/ce:author-group/ce:author return concat($x/ce:given-name,'^',$x/ce:surname)), '|')"
val namespaces = new HashMap[String,String](Map(
"ce" ->"http://www.elsevier.com/xml/common/dtd"
).asJava)
val proc = XPathProcessor.getInstance(xpathAuthors,namespaces)
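recsIter.map(rec => proc.evaluateString(rec._2).split('|')
// skip the empty entry produced when a record has no authors
.filter(_.nonEmpty)
.map(author => {
// a limit of -1 keeps a trailing empty surname instead of dropping it
val toks = author.split("\\^", -1)
(toks(0),toks(1))})
.toList
.sortWith((a, b) => (a._2 < b._2))
)})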
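The delimiter-and-split step can also be looked at in isolation. The following plain-Java sketch parses a string of the form produced by the expression above (`Given^Surname|Given^Surname`) into pairs sorted by surname. The class name and sample names are hypothetical, and skipping malformed entries is a design choice of this sketch, not something the library does. Note that Java's `String.split` takes a regular expression, so both delimiters have to be quoted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

class AuthorSplit {
    // Parses "Given^Surname|Given^Surname" into [given, surname] pairs sorted by surname.
    public static List<String[]> parse(String delimited) {
        List<String[]> authors = new ArrayList<>();
        for (String entry : delimited.split(Pattern.quote("|"))) {
            if (entry.isEmpty()) {
                continue; // a record without authors yields one empty entry
            }
            // limit -1 keeps a trailing empty surname rather than dropping it
            String[] names = entry.split(Pattern.quote("^"), -1);
            if (names.length == 2) {
                authors.add(names); // entries without exactly one '^' are skipped
            }
        }
        authors.sort(Comparator.comparing((String[] a) -> a[1]));
        return authors;
    }

    public static void main(String[] args) {
        for (String[] a : parse("Marie^Curie|Niels^Bohr")) {
            System.out.println(a[1] + ", " + a[0]);
        }
        // prints:
        // Bohr, Niels
        // Curie, Marie
    }
}
```

This only works if neither delimiter can appear inside a name, which is the main weakness of the approach.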
While many examples show only a single XPath or XQuery expression, it is possible (and common) to apply multiple expressions. Simply provide each expression and get a processor for each expression.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val results = xmlKeyPair.mapPartitions(partition => {
val xpathDocid = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/cto:group-id)"
val xpathSrcId = "string(/xoe:enhanced-document/xocs:doc/xocs:item/item[ait:process-info/ait:status[@type='core']]/bibrecord/head/source/@srcid)"
val xpathPubYr = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/xocs:pub-year)"
val xpathSrcType = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/xocs:srctype)"
val xpathDocType = "string(/xoe:enhanced-document/xocs:doc[xocs:item/item/ait:process-info/ait:status[@type='core']]/xocs:meta/cto:doctype)"
val namespaces = new HashMap[String,String](Map(
"xoe" -> "http://www.elsevier.com/xml/xoe/dtd",
"xocs" -> "http://www.elsevier.com/xml/xocs/dtd",
"ce" -> "http://www.elsevier.com/xml/ani/common",
"cto" -> "http://www.elsevier.com/xml/cto/dtd",
"ait" -> "http://www.elsevier.com/xml/ani/ait").asJava)
val procDocId = XPathProcessor.getInstance(xpathDocid,namespaces)
val procSrcId = XPathProcessor.getInstance(xpathSrcId,namespaces)
val procPubYr = XPathProcessor.getInstance(xpathPubYr,namespaces)
val procSrcType = XPathProcessor.getInstance(xpathSrcType,namespaces)
val procDocType = XPathProcessor.getInstance(xpathDocType,namespaces)
// per xml record
partition.map(rec => {
val docId = procDocId.evaluateString(rec._2)
val srcId = procSrcId.evaluateString(rec._2)
val pubYr = procPubYr.evaluateString(rec._2)
val srcType = procSrcType.evaluateString(rec._2)
val docType = procDocType.evaluateString(rec._2)
(docId, srcId, pubYr, srcType, docType)
})
})
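The pattern above (compile each expression once, then evaluate all of them for every record) can be sketched independently of Spark and spark-xml-utils. The example below uses the JDK's built-in XPath 1.0 engine; the class name, field names, and the small namespace-free record are hypothetical and chosen only to keep the sketch short.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class MultiExtract {
    private final Map<String, XPathExpression> compiled = new LinkedHashMap<>();

    // Compile every expression once, up front (analogous to one processor per expression).
    MultiExtract(Map<String, String> expressions) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            for (Map.Entry<String, String> e : expressions.entrySet()) {
                compiled.put(e.getKey(), xpath.compile(e.getValue()));
            }
        } catch (XPathExpressionException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    // Evaluate each compiled expression against one XML record.
    // The record is parsed once per expression, as in the example above.
    Map<String, String> extract(String xml) {
        Map<String, String> fields = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, XPathExpression> e : compiled.entrySet()) {
                fields.put(e.getKey(),
                    e.getValue().evaluate(new InputSource(new StringReader(xml))));
            }
        } catch (XPathExpressionException ex) {
            throw new IllegalArgumentException(ex);
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> expressions = new LinkedHashMap<>();
        expressions.put("pubYr", "string(/doc/meta/pub-year)");
        expressions.put("srcType", "string(/doc/meta/srctype)");
        MultiExtract extractor = new MultiExtract(expressions);
        String record = "<doc><meta><pub-year>2012</pub-year><srctype>j</srctype></meta></doc>";
        System.out.println(extractor.extract(record)); // {pubYr=2012, srcType=j}
    }
}
```

Compiling outside the per-record loop mirrors why the processors above are created once per partition rather than once per record.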
We also provide the ability to set 'processor' features (through a getInstance variant that accepts a feature map). This helps when the underlying XML references a DTD in a DOCTYPE declaration but you do not have access to that DTD and would like it to be ignored; otherwise, parsing the document fails with an error. Use this with care, because the DTD can also define entities, which would then not be expanded. Processor features are useful in other scenarios as well. Here is a list of the features that can be set. Because we use the Saxon s9api Processor under the covers, only features relevant to it make sense here.
To complete the example for ignoring the DTD, the following code shows how to set the feature map for the processor and how to use the getInstance method when creating the processor.
import java.util.HashMap;
import com.elsevier.spark_xml_utils.xpath.XPathProcessor;
import net.sf.saxon.lib.FeatureKeys;
HashMap<String,Object> featureMap = new HashMap<String,Object>();
featureMap.put(FeatureKeys.ENTITY_RESOLVER_CLASS, "com.somepackage.IgnoreDoctype");
// The first parameter is the xpath expression
// The second parameter is the hashmap for the namespace mappings (none)
// The third parameter is the hashmap for the processor features
XPathProcessor proc = XPathProcessor.getInstance("/books/book",null,featureMap);
You would then define the following class (making sure it is included in your application) and include the saxon9he jar file (the open source edition of Saxon). This is the class referenced above when setting the feature map.
package com.somepackage;
import java.io.ByteArrayInputStream;
import org.xml.sax.EntityResolver;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
public class IgnoreDoctype implements EntityResolver {
public InputSource resolveEntity(java.lang.String publicId, java.lang.String systemId)
throws SAXException, java.io.IOException
{
// Ignore everything
return new InputSource(new ByteArrayInputStream("<?xml version='1.0' encoding='UTF-8'?>".getBytes()));
}
}
Now, the following XML should parse without any problems.
proc.evaluateString("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE books SYSTEM \"sample.dtd\"><books><book><title lang=\"en\">Some Book</title><author>Some Author</author><year>2005</year><price>29.99</price></book></books>");
This solution should work for XPath, XQuery, and XSLT.
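The effect of such an entity resolver can be tried without Saxon or spark-xml-utils. The sketch below uses the JDK's own DOM parser with a resolver that answers every external entity request, including the missing DTD, with an empty stream. The class and method names are hypothetical; only the `EntityResolver` idea is taken from the example above.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

class IgnoreDtdDemo {
    // Parses the XML while ignoring any external DTD, then returns the first <title> text.
    public static String firstTitle(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // Resolve every external entity (here: sample.dtd) to empty content,
            // so the parser never tries to fetch the missing file.
            builder.setEntityResolver(
                (publicId, systemId) -> new InputSource(new StringReader("")));
            Document doc = builder.parse(new InputSource(new StringReader(xml)));
            return doc.getElementsByTagName("title").item(0).getTextContent();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<!DOCTYPE books SYSTEM \"sample.dtd\">"
            + "<books><book><title lang=\"en\">Some Book</title></book></books>";
        System.out.println(firstTitle(xml)); // Some Book
    }
}
```

As noted above, any entities that the real DTD would have defined are not available with this approach, so a document that references them will still fail to parse.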
We are just scratching the surface of what we would like to provide with spark-xml-utils and of what is possible. Within Labs, we have been using it for more than nine months and have had great success. If you have any questions or ideas for other complex examples, please let us know and we will add them.