[WIP] ZEPPELIN-1571. Support pig udf interpreter #1545

Closed
wants to merge 5 commits into from
2 changes: 1 addition & 1 deletion conf/zeppelin-site.xml.template
@@ -190,7 +190,7 @@

<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter, org.apache.zeppelin.pig.PigUDFInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

23 changes: 23 additions & 0 deletions docs/interpreter/pig.md
@@ -23,6 +23,10 @@ group: manual

Almost the same as `%pig.script`. The only difference is that you don't need to add alias in the last statement. And the display type is table.

- `%pig.udf`

Java editor for writing Pig UDFs. The Java UDFs are compiled and built into jars, which are registered with Pig automatically.

## Supported runtime mode
- Local
- MapReduce
@@ -95,3 +99,22 @@ foreach c generate group as category, COUNT($1) as count;
```

Data is shared between `%pig` and `%pig.query`, so that you can do some common work in `%pig`, and do different kinds of query based on the data of `%pig`.

##### pig.udf

```
%pig.udf

import org.apache.pig.data.Tuple;
import org.apache.pig.EvalFunc;
import java.io.IOException;

class UDF1 extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
return "1";
}
}
```

If your UDF depends on third-party libraries, specify them as dependencies of the pig interpreter on the interpreter setting page. These dependencies are registered with Pig automatically.
16 changes: 16 additions & 0 deletions pig/pom.xml
@@ -114,6 +114,12 @@
<version>${tez.version}</version>
</dependency>

<dependency>
<groupId>com.thoughtworks.qdox</groupId>
<artifactId>qdox</artifactId>
<version>2.0-M3</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -135,6 +141,16 @@
</executions>
</plugin>

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<systemPropertyVariables>
<zeppelin.pig.localRepo>${basedir}/src/test/resources</zeppelin.pig.localRepo>
</systemPropertyVariables>
</configuration>
</plugin>

Contributor Author

This is for test in PigUDFInterpreterTest
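
For context, here is a minimal sketch of how a test could pick up this system property and hand it to the interpreter. It assumes the test copies `zeppelin.pig.localRepo` into `zeppelin.interpreter.localRepo` (the key that `open()` reads); the helper class `LocalRepoProps` and its method are hypothetical and not part of this PR.

```java
import java.util.Properties;

// Hypothetical helper, not part of this PR.
final class LocalRepoProps {
  private LocalRepoProps() {}

  // Reads the system property set by surefire and stores it under the key
  // that PigUDFInterpreter.open() looks up. `fallback` must be non-null; it
  // is used when the system property is absent (e.g. when run from an IDE).
  static Properties fromSystem(String fallback) {
    String repo = System.getProperty("zeppelin.pig.localRepo", fallback);
    Properties props = new Properties();
    props.setProperty("zeppelin.interpreter.localRepo", repo);
    return props;
  }
}
```

The resulting `Properties` object could then be passed to the `PigUDFInterpreter` constructor in the test setup.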

<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
268 changes: 268 additions & 0 deletions pig/src/main/java/org/apache/zeppelin/pig/PigUDFInterpreter.java
@@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.pig;

import com.thoughtworks.qdox.JavaProjectBuilder;
Member

What's the license of this qdox library?

Contributor Author

Member

Do we need to add that to the LICENSE file?

import com.thoughtworks.qdox.model.JavaClass;
import com.thoughtworks.qdox.model.JavaSource;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pig.PigServer;
import org.apache.zeppelin.interpreter.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.tools.*;
import java.io.*;
import java.net.URI;
import java.util.*;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;

/**
* Interpreter for Pig UDF
*/
public class PigUDFInterpreter extends Interpreter {

private static final Logger LOGGER = LoggerFactory.getLogger(PigUDFInterpreter.class);

private PigServer pigServer;
private String udfBuildClasspath;

public PigUDFInterpreter(Properties property) {
super(property);
}

@Override
public void open() {
pigServer = getPigInterpreter().getPigServer();
// register dependency jars
String localRepo = getProperty("zeppelin.interpreter.localRepo");
if (localRepo != null && new File(localRepo).exists()) {
File[] jars = new File(localRepo).listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.isFile() && pathname.getName().endsWith(".jar");
}
});
StringBuilder classPathBuilder = new StringBuilder(System.getProperty("java.class.path"));
for (File jar : jars) {
try {
pigServer.registerJar(jar.getAbsolutePath());
classPathBuilder.append(File.pathSeparator).append(jar.getAbsolutePath());
LOGGER.debug("Register dependency jar:" + jar.getAbsolutePath());
} catch (IOException e) {
LOGGER.error("Fail to register dependency jar", e);
}
}
this.udfBuildClasspath = classPathBuilder.toString();
LOGGER.debug("udfBuildClasspath:" + udfBuildClasspath);
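} else {
// Fall back to the JVM classpath so compile() never passes a null -classpath value
this.udfBuildClasspath = System.getProperty("java.class.path");
LOGGER.error("localRepo is missing or doesn't exist, " +
"zeppelin.interpreter.localRepo=" + localRepo);
}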
}
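
A minimal sketch of the classpath assembly done in `open()`, pulled out as a pure function. It joins entries with `File.pathSeparator` and tolerates a `null` result from `listFiles`. The class `JarClasspath` and its method are hypothetical names used for illustration, not part of this PR.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of this PR.
final class JarClasspath {
  private JarClasspath() {}

  // Returns baseClasspath followed by every *.jar file directly under dir,
  // joined with the platform path separator (":" on Unix, ";" on Windows).
  static String build(String baseClasspath, File dir) {
    List<String> entries = new ArrayList<>();
    if (baseClasspath != null && !baseClasspath.isEmpty()) {
      entries.add(baseClasspath);
    }
    // listFiles returns null when dir is missing or is not a directory.
    File[] jars = dir.listFiles(f -> f.isFile() && f.getName().endsWith(".jar"));
    if (jars != null) {
      Arrays.sort(jars); // deterministic order; listFiles order is unspecified
      for (File jar : jars) {
        entries.add(jar.getAbsolutePath());
      }
    }
    return String.join(File.pathSeparator, entries);
  }
}
```

Keeping this step free of `PigServer` calls would make it easy to unit test without a running Pig instance.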

@Override
public void close() {

}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
try {
CompiledClass compiledClass = compile(st);
File jarFile = buildJar(compiledClass);
pigServer.registerJar(jarFile.getAbsolutePath());

return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Build successfully");
} catch (Exception e) {
LOGGER.error("Fail to compile/build udf", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}

@Override
public void cancel(InterpreterContext context) {

}

@Override
public FormType getFormType() {
return FormType.SIMPLE;
}

@Override
public int getProgress(InterpreterContext context) {
return 0;
}


private PigInterpreter getPigInterpreter() {
LazyOpenInterpreter lazy = null;
PigInterpreter pig = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());

while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
pig = (PigInterpreter) p;

if (lazy != null) {
lazy.open();
}
return pig;
}

private CompiledClass compile(String code) throws Exception {

JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();

// Java parsing
JavaProjectBuilder builder = new JavaProjectBuilder();
JavaSource src = builder.addSource(new StringReader(code));

// get all classes in code (paragraph)
List<JavaClass> classes = src.getClasses();
if (classes.size() != 1) {
throw new Exception("Either no class or multiple classes are defined " +
"in one paragraph.");
}
String className = classes.get(0).getName();
String packageName = classes.get(0).getPackageName();
JavaFileObject file = new JavaSourceFromString(className, code.toString());
Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);

ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();

// Creating new stream to get the output data
PrintStream newOut = new PrintStream(baosOut);
PrintStream newErr = new PrintStream(baosErr);
// Save the old System.out!
PrintStream oldOut = System.out;
PrintStream oldErr = System.err;
// Tell Java to use your special stream
System.setOut(newOut);
System.setErr(newErr);

List<String> options = new ArrayList<>();
options.addAll(Arrays.asList("-classpath", udfBuildClasspath));
JavaCompiler.CompilationTask task = compiler.getTask(null, null, diagnostics, options, null,
compilationUnits);

// executing the compilation process
boolean success = task.call();

// if success is false will get error
if (!success) {
for (Diagnostic<? extends JavaFileObject> diagnostic : diagnostics.getDiagnostics()) {
if (diagnostic.getLineNumber() == -1) {
continue;
}
System.err.println("line " + diagnostic.getLineNumber() + " : "
+ diagnostic.getMessage(null));
}
System.out.flush();
System.err.flush();

System.setOut(oldOut);
System.setErr(oldErr);
LOGGER.error("Exception in Interpreter while compilation: {}", baosErr.toString());
throw new Exception(baosErr.toString());
} else {
System.out.flush();
System.err.flush();

// set the stream to old stream
System.setOut(oldOut);
System.setErr(oldErr);
return new CompiledClass(packageName, new File(className + ".class"));
}
}
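
A possible alternative to redirecting `System.out` and `System.err`, sketched under assumptions: the diagnostics are read straight from the `DiagnosticCollector`, and the `-d` option sends the `.class` file to a chosen directory instead of the working directory. The class `StringCompiler` and its method are hypothetical and not part of this PR; only standard `javax.tools` calls are used.

```java
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.tools.Diagnostic;
import javax.tools.DiagnosticCollector;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;

// Hypothetical helper, not part of this PR.
final class StringCompiler {
  private StringCompiler() {}

  // Compiles one top-level class given as a string and writes the .class
  // file(s) under outDir (which must exist). Returns the error messages;
  // an empty list means the compilation succeeded.
  static List<String> compile(String className, String code, File outDir) {
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
    if (compiler == null) {
      // Happens when running on a runtime without the compiler.
      return Collections.singletonList("no system Java compiler available");
    }
    JavaFileObject source = new SimpleJavaFileObject(
        URI.create("string:///" + className.replace('.', '/') + ".java"),
        JavaFileObject.Kind.SOURCE) {
      @Override
      public CharSequence getCharContent(boolean ignoreEncodingErrors) {
        return code;
      }
    };
    DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
    List<String> options = Arrays.asList("-d", outDir.getAbsolutePath());
    boolean ok = compiler
        .getTask(null, null, diagnostics, options, null, Collections.singletonList(source))
        .call();
    List<String> errors = new ArrayList<>();
    if (!ok) {
      for (Diagnostic<? extends JavaFileObject> d : diagnostics.getDiagnostics()) {
        errors.add("line " + d.getLineNumber() + " : " + d.getMessage(null));
      }
      if (errors.isEmpty()) {
        errors.add("compilation failed");
      }
    }
    return errors;
  }
}
```

Because no global streams are swapped, this variant would not interfere with other interpreters writing to stdout or stderr in the same JVM.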

private File buildJar(CompiledClass clazz) throws IOException {
File tmpJarFile = File.createTempFile("zeppelin_pig", ".jar");
FileOutputStream fOut = null;
JarOutputStream jarOut = null;
try {
fOut = new FileOutputStream(tmpJarFile);
jarOut = new JarOutputStream(fOut);
String entryPath = null;
if (clazz.packageName.isEmpty()) {
entryPath = clazz.classFile.getName();
} else {
entryPath = clazz.packageName.replace(".", "/") + "/" + clazz.classFile.getName();
}
jarOut.putNextEntry(new JarEntry(entryPath));
jarOut.write(FileUtils.readFileToByteArray(clazz.classFile));
jarOut.closeEntry();
LOGGER.debug("pig udf jar is created under " + tmpJarFile.getAbsolutePath());
return tmpJarFile;
} catch (IOException e) {
throw e;
Member

add LOGGER.error

} finally {
if (jarOut != null) {
jarOut.close();
}
if (fOut != null) {
fOut.close();
}
}
}
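
One way the jar-building step could look with try-with-resources and an error log before rethrowing, as a sketch only. It uses `java.util.logging` as a stand-in for the slf4j `LOGGER` so that the example is self-contained; the class `SingleClassJar` and its method are hypothetical and not part of this PR.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of this PR.
final class SingleClassJar {
  // java.util.logging is used here only to keep the sketch dependency-free.
  private static final Logger LOG = Logger.getLogger(SingleClassJar.class.getName());

  private SingleClassJar() {}

  // Writes classBytes into a new temp jar under the entry path derived from
  // packageName and simpleClassName, e.g. "com/example/UDF1.class".
  static File build(String packageName, String simpleClassName, byte[] classBytes)
      throws IOException {
    String fileName = simpleClassName + ".class";
    String entryPath = packageName.isEmpty()
        ? fileName
        : packageName.replace('.', '/') + "/" + fileName;
    File jar = File.createTempFile("zeppelin_pig", ".jar");
    // try-with-resources closes both streams even if a write fails.
    try (FileOutputStream fOut = new FileOutputStream(jar);
         JarOutputStream jarOut = new JarOutputStream(fOut)) {
      jarOut.putNextEntry(new JarEntry(entryPath));
      jarOut.write(classBytes);
      jarOut.closeEntry();
    } catch (IOException e) {
      LOG.log(Level.SEVERE, "Fail to build udf jar " + jar.getAbsolutePath(), e);
      throw e;
    }
    return jar;
  }
}
```

Taking the class bytes as a parameter, rather than a `File`, is a design choice made here so the sketch can be exercised without a prior compilation step.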

/**
* In-memory Java source file whose content is the given code string.
*/
public static class JavaSourceFromString extends SimpleJavaFileObject {
final String code;

JavaSourceFromString(String name, String code) {
super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE);
this.code = code;
}

@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) {
return code;
}
}

/**
* Holds the package name and the generated .class file of a compiled UDF.
*/
public static class CompiledClass {
public final String packageName;
public final File classFile;

public CompiledClass(String packageName, File classFile) {
this.packageName = packageName;
this.classFile = classFile;
}
}
}


12 changes: 8 additions & 4 deletions pig/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
@@ -16,9 +16,6 @@
"defaultValue": "false",
"description": "flag to include job stats in output"
}
},
"editor": {
"language": "pig"
}
},
{
@@ -38,9 +35,16 @@
"defaultValue": "1000",
"description": "max row number for %pig.query"
}
}
},
{
"group": "pig",
"name": "udf",
"className": "org.apache.zeppelin.pig.PigUDFInterpreter",
"properties": {
},
"editor": {
"language": "pig"
"language": "java"
}
}
]