Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZEPPELIN-1395] Local or Remote Interpreter by Configuration #1385

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public class SparkInterpreter extends Interpreter {


public SparkInterpreter(Properties property) {
super(property);
super(property);
out = new SparkOutputStream(logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,6 @@ public static class RegisteredInterpreter {
private Map<String, InterpreterProperty> properties;
private String path;

public RegisteredInterpreter(String name, String group, String className,
Map<String, InterpreterProperty> properties) {
this(name, group, className, false, properties);
}

public RegisteredInterpreter(String name, String group, String className,
boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ <h4>Create new interpreter</h4>
</div>
<br />

<div class="col-md-12" style="padding-left:0px">
<div class="checkbox">
<span class="input-group" style="line-height:30px;">
<label><input type="checkbox" style="width:20px" ng-model="newInterpreterSetting.option.remote"/> Remote Interpreter </label>
</span>
</div>
</div>
<div class="col-md-12" style="padding-left:0px">
<div class="checkbox">
<span class="input-group" style="line-height:30px;">
Expand Down
3 changes: 3 additions & 0 deletions zeppelin-web/src/app/interpreter/interpreter.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl',
if (!setting.option) {
setting.option = {};
}
if (setting.option.remote === undefined) {
setting.option.remote = true;
}
if (setting.option.isExistingProcess === undefined) {
setting.option.isExistingProcess = false;
}
Expand Down
12 changes: 11 additions & 1 deletion zeppelin-web/src/app/interpreter/interpreter.html
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,22 @@ <h5>Option</h5>
<span>Interpreter for note</span>
</div>

<br />
<div class="col-md-12">
<div class="checkbox">
<span class="input-group" style="line-height:30px;">
<label><input type="checkbox" style="width:20px" id="isRemote" ng-model="setting.option.remote" ng-disabled="!valueform.$visible"/>
Remote Interpreter </label>
</span>
</div>
</div>

<br />
<div class="col-md-12">
<div class="checkbox">
<span class="input-group" style="line-height:30px;">
<label><input type="checkbox" style="width:20px" id="isExistingProcess" ng-model="setting.option.isExistingProcess" ng-disabled="!valueform.$visible"/>
Connect to existing process </label>
Connect to existing process </label>
</span>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ public String getShiroPath() {
return getRelativeDir(String.format("%s/shiro.ini", getConfDir()));
}

public boolean getInterpreterRemote() {
return getBoolean(ConfVars.ZEPPELIN_INTERPRETER_REMOTE);
}

public String getInterpreterRemoteRunnerPath() {
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
}
Expand Down Expand Up @@ -548,6 +552,7 @@ public static enum ConfVars {
ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
ZEPPELIN_INTERPRETER_REMOTE("zeppelin.interpreter.remote", true),
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
System.getProperty("os.name")
.startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Set;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
Expand Down Expand Up @@ -157,10 +158,12 @@ public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultO
}

private void init() throws InterpreterException, IOException, RepositoryException {

String interpreterJson = conf.getInterpreterJson();
ClassLoader cl = Thread.currentThread().getContextClassLoader();

Path interpretersDir = Paths.get(conf.getInterpreterDir());

if (Files.exists(interpretersDir)) {
for (Path interpreterDir : Files
.newDirectoryStream(interpretersDir, new DirectoryStream.Filter<Path>() {
Expand All @@ -169,18 +172,22 @@ public boolean accept(Path entry) throws IOException {
return Files.exists(entry) && Files.isDirectory(entry);
}
})) {

String interpreterDirString = interpreterDir.toString();

registerInterpreterFromPath(interpreterDirString, interpreterJson);
URL[] urls = recursiveBuildLibList(new File(interpreterDirString));
URLClassLoader urlClassLoader = new URLClassLoader(urls, cl);

registerInterpreterFromResource(cl, interpreterDirString, interpreterJson);
registerInterpreterFromPath(urlClassLoader, interpreterDir, interpreterJson);

registerInterpreterFromResource(urlClassLoader, interpreterDir, interpreterJson);

/*
* TODO(jongyoul)
* - Remove these codes below because of legacy code
* - Support ThreadInterpreter
*/
URLClassLoader ccl = new URLClassLoader(recursiveBuildLibList(interpreterDir.toFile()), cl);
URL[] urls = recursiveBuildLibList(interpreterDir.toFile());
URLClassLoader ccl = new URLClassLoader(urls, cl);
for (String className : interpreterClassList) {
try {
// Load classes
Expand All @@ -192,13 +199,14 @@ public boolean accept(Path entry) throws IOException {
Interpreter.registeredInterpreters.get(interpreterKey)
.setPath(interpreterDirString);
logger.info("Interpreter " + interpreterKey + " found. class=" + className);
cleanCl.put(interpreterDirString, ccl);
}
}
cleanCl.put(interpreterDirString, ccl);
} catch (Throwable t) {
// nothing to do
}
}
*/
}
}

Expand Down Expand Up @@ -277,31 +285,33 @@ private Properties convertInterpreterProperties(Map<String, InterpreterProperty>
return properties;
}

private void registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
private void registerInterpreterFromResource(URLClassLoader cl, Path interpreterDir,
String interpreterJson) throws IOException, RepositoryException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);

InputStream inputStream = tempClassLoader.getResourceAsStream(interpreterJson);
InputStream inputStream = cl.getResourceAsStream(interpreterJson);

if (null != inputStream) {
logger.debug("Reading {} from resources in {}", interpreterJson, interpreterDir);
List<RegisteredInterpreter> registeredInterpreterList =
getInterpreterListFromJson(inputStream);
registerInterpreters(registeredInterpreterList, interpreterDir);
registerInterpreters(cl, registeredInterpreterList, interpreterDir);
}

}

private void registerInterpreterFromPath(String interpreterDir, String interpreterJson)
private void registerInterpreterFromPath(URLClassLoader cl,
Path interpreterDir, String interpreterJson)
throws IOException, RepositoryException {

Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
Path interpreterJsonPath = Paths.get(interpreterDir.toString(), interpreterJson);

if (Files.exists(interpreterJsonPath)) {
logger.debug("Reading {}", interpreterJsonPath);
List<RegisteredInterpreter> registeredInterpreterList =
getInterpreterListFromJson(interpreterJsonPath);
registerInterpreters(registeredInterpreterList, interpreterDir);
registerInterpreters(cl, registeredInterpreterList, interpreterDir);
}

}

private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
Expand All @@ -315,8 +325,11 @@ private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream strea
return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType);
}

private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
String absolutePath) throws IOException, RepositoryException {
private void registerInterpreters(URLClassLoader cl,
List<RegisteredInterpreter> registeredInterpreters,
Path interpreterDir) throws IOException, RepositoryException {

cleanCl.put(interpreterDir.toFile().getAbsolutePath(), cl);

for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
InterpreterInfo interpreterInfo =
Expand All @@ -331,7 +344,8 @@ private void registerInterpreters(List<RegisteredInterpreter> registeredInterpre
}
}

add(registeredInterpreter.getGroup(), interpreterInfo, properties, absolutePath);
add(registeredInterpreter.getGroup(),
interpreterInfo, properties, interpreterDir.toFile().getAbsolutePath());
}

}
Expand Down Expand Up @@ -361,12 +375,6 @@ private void loadFromFile() throws IOException {
for (String k : info.interpreterSettings.keySet()) {
InterpreterSetting setting = info.interpreterSettings.get(k);

// Always use separate interpreter process
// While we decided to turn this feature on always (without providing
// enable/disable option on GUI).
// previously created setting should turn this feature on here.
setting.getOption().setRemote(true);

// Update transient information from InterpreterSettingRef
interpreterSettingObject = interpreterSettingsRef.get(setting.getGroup());
if (interpreterSettingObject == null) {
Expand Down Expand Up @@ -918,9 +926,11 @@ public void run() {

private Interpreter createRepl(String dirName, String className, Properties property)
throws InterpreterException {

logger.info("Create repl {} from {}", className, dirName);

ClassLoader oldcl = Thread.currentThread().getContextClassLoader();

try {

URLClassLoader ccl = cleanCl.get(dirName);
Expand All @@ -929,32 +939,17 @@ private Interpreter createRepl(String dirName, String className, Properties prop
ccl = URLClassLoader.newInstance(new URL[] {}, oldcl);
}

boolean separateCL = true;
try { // check if server's classloader has driver already.
Class cls = this.getClass().forName(className);
if (cls != null) {
separateCL = false;
}
} catch (Exception e) {
logger.error("exception checking server classloader driver", e);
}

URLClassLoader cl;
Thread.currentThread().setContextClassLoader(ccl);

if (separateCL == true) {
cl = URLClassLoader.newInstance(new URL[] {}, ccl);
} else {
cl = ccl;
}
Thread.currentThread().setContextClassLoader(cl);

Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
Class<Interpreter> replClass = (Class<Interpreter>) ccl.loadClass(className);
Constructor<Interpreter> constructor =
replClass.getConstructor(new Class[] {Properties.class});
Interpreter repl = constructor.newInstance(property);
repl.setClassloaderUrls(ccl.getURLs());
LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, ccl));

return intp;

} catch (SecurityException e) {
throw new InterpreterException(e);
} catch (NoSuchMethodException e) {
Expand Down Expand Up @@ -1018,15 +1013,19 @@ public List<InterpreterSetting> getInterpreterSettings(String noteId) {
List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
LinkedList<InterpreterSetting> settings = new LinkedList<>();
synchronized (interpreterSettingIds) {
List<String> idsToBeRemoved = Lists.newArrayList();
for (String id : interpreterSettingIds) {
InterpreterSetting setting = get(id);
if (setting == null) {
// interpreter setting is removed from factory. remove id from here, too
interpreterSettingIds.remove(id);
idsToBeRemoved.add(id);
} else {
settings.add(setting);
}
}
for (String id: idsToBeRemoved) {
interpreterSettingIds.remove(id);
}
}
return settings;
}
Expand Down Expand Up @@ -1245,8 +1244,7 @@ public void setEnv(Map<String, String> env) {

private Interpreter getDevInterpreter() {
if (devInterpreter == null) {
InterpreterOption option = new InterpreterOption();
option.setRemote(true);
InterpreterOption option = new InterpreterOption(conf.getInterpreterRemote());

InterpreterGroup interpreterGroup = createInterpreterGroup("dev", option);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/**
* Information of interpreters in this interpreter setting.
* this will be serialized for conf/interpreter.json and REST api response.
* this will be serialized for conf/interpreter-setting.json and REST api response.
*/
public class InterpreterInfo {
private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class InterpreterOption {
boolean setPermission;
List<String> users;

private InterpreterOption() {}

public InterpreterOption(boolean remote) {
this.remote = remote;
}

public boolean isExistingProcess() {
return isExistingProcess;
}
Expand Down Expand Up @@ -61,14 +67,6 @@ public List<String> getUsers() {
return users;
}

public InterpreterOption() {
remote = false;
}

public InterpreterOption(boolean remote) {
this.remote = remote;
}

public boolean isRemote() {
return remote;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.google.gson.annotations.SerializedName;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.Dependency;

import static org.apache.zeppelin.notebook.utility.IdHashes.generateId;
Expand All @@ -48,6 +49,7 @@ public class InterpreterSetting {
private List<Dependency> dependencies;
private InterpreterOption option;
private transient String path;
private transient ZeppelinConfiguration conf = ZeppelinConfiguration.create();

@Deprecated private transient InterpreterGroupFactory interpreterGroupFactory;

Expand Down Expand Up @@ -79,7 +81,7 @@ public InterpreterSetting(String name, String group, List<InterpreterInfo> inter
*
* @param o interpreterSetting from interpreterSettingRef
*/
public InterpreterSetting(InterpreterSetting o) {
public InterpreterSetting(InterpreterSetting o, ZeppelinConfiguration conf) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It remains ZeppelinConfiguration. Could you check it again?

this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(),
o.getDependencies(), o.getOption(), o.getPath());
}
Expand Down Expand Up @@ -164,7 +166,7 @@ public void setDependencies(List<Dependency> dependencies) {

public InterpreterOption getOption() {
if (option == null) {
option = new InterpreterOption();
option = new InterpreterOption(conf.getInterpreterRemote());
}

return option;
Expand Down