Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource pools api #822

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
cbdb22d
Added serialization to resourcepoolutils.
fireboy1919 Mar 15, 2016
ffd092b
Added pool persistance.
fireboy1919 Mar 15, 2016
84cabe2
Added test.
fireboy1919 Mar 16, 2016
ffce0d9
Fixed merge conflict.
fireboy1919 Mar 17, 2016
187e45e
Merge remote-tracking branch 'apache/master' into persist_pool
fireboy1919 Mar 17, 2016
f1f0d91
Changed resource pool utils so that it will work anywhere.
fireboy1919 Mar 28, 2016
a19c48e
Removed unnecessary serialization.
fireboy1919 Mar 28, 2016
8fa84a8
Added explicit save to the serializer.
fireboy1919 Mar 28, 2016
1f2d2ec
Added vfs resource pool.
fireboy1919 Mar 21, 2016
5ab2962
Added pluggable resource pool.
fireboy1919 Apr 5, 2016
47e0ee5
Added missing rename.
fireboy1919 Apr 6, 2016
3c0e0c2
Added an API to access resource pools.
fireboy1919 Apr 6, 2016
cb4c167
Added a constructor to DistributedResourcePools to fix tests.
fireboy1919 Apr 6, 2016
cf658e3
Switched to java 7 method for java.util.properties.
fireboy1919 Apr 6, 2016
cb5519a
Removed the need for resource pools during tests.
fireboy1919 Apr 6, 2016
c0cd60d
Adding missing license file.
fireboy1919 Apr 6, 2016
f3fb719
Set mock interpreter to register properly.
fireboy1919 Apr 11, 2016
85a48c1
Merge branch 'pool_pluggable' into resource_pools_api
fireboy1919 Apr 11, 2016
8e2fe6a
Fixed typo and addressed issues with resource pool connector.
fireboy1919 Apr 11, 2016
e422d07
Removed requirement for spark dependency from abstract test rest api.
fireboy1919 Apr 12, 2016
1c8c53c
Fixed error with the JSON response.
fireboy1919 Apr 12, 2016
43fa7b9
Added serialization to resourcepoolutils.
fireboy1919 Mar 15, 2016
5de5756
Added pool persistance.
fireboy1919 Mar 15, 2016
e633c44
Added test.
fireboy1919 Mar 16, 2016
71609fa
Changed resource pool utils so that it will work anywhere.
fireboy1919 Mar 28, 2016
6a22e3b
Removed unnecessary serialization.
fireboy1919 Mar 28, 2016
17fed3d
Added explicit save to the serializer.
fireboy1919 Mar 28, 2016
067da37
Added vfs resource pool.
fireboy1919 Mar 21, 2016
0cfeb33
Added pluggable resource pool.
fireboy1919 Apr 5, 2016
52cf1be
Added missing rename.
fireboy1919 Apr 6, 2016
a1b0bbd
Added a constructor to DistributedResourcePools to fix tests.
fireboy1919 Apr 6, 2016
f241272
Switched to java 7 method for java.util.properties.
fireboy1919 Apr 6, 2016
3ed89b0
Removed the need for resource pools during tests.
fireboy1919 Apr 6, 2016
0fa4b32
Adding missing license file.
fireboy1919 Apr 6, 2016
69d11d3
Set mock interpreter to register properly.
fireboy1919 Apr 11, 2016
df46376
Switched interpreter to use default configuration.
fireboy1919 Apr 15, 2016
1c29288
Merge remote-tracking branch 'apache/master' into pool_pluggable_rebase
fireboy1919 Apr 18, 2016
bbeacac
Fixed missing comma.
fireboy1919 Apr 18, 2016
da75ca3
Merge branch 'pool_pluggable_rebase' into resource_pools_api
fireboy1919 Apr 18, 2016
8fef1ca
Added if-modified-by header support to notebookrestapi.
fireboy1919 Apr 18, 2016
3993c0d
Added fixes to tests
fireboy1919 Apr 19, 2016
075170a
Merge branch 'pool_pluggable_rebase' into resource_pools_api
fireboy1919 Apr 20, 2016
462cde8
Fixes a style error.
fireboy1919 Apr 20, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion zeppelin-interpreter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,18 @@
<exclusions>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.0</version>
<exclusions>
<exclusion>
<artifactId>plexus-utils</artifactId>
<groupId>org.codehaus.plexus</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,48 @@ public static void main(String[] args)
System.exit(0);
}


private DistributedResourcePool getResourcePool()
/* InterpreterGroup group,
Properties prop,
RemoteInterpreterEventClient client) */
throws TException {
if (resourcePool != null)
return resourcePool;
try {
Properties prop = interpreterGroup.getProperty();
//Happens during tests.
if (prop == null)
prop = new Properties();
String resourcePoolClassName = (String) prop.getProperty(
"zeppelin.interpreter.resourcePoolClass");
logger.debug("Getting resource pool {}", resourcePoolClassName);
Class resourcePoolClass = Class.forName(resourcePoolClassName);

Constructor<ResourcePool> constructor = resourcePoolClass
.getConstructor(new Class[] {String.class,
ResourcePoolConnector.class,
Properties.class });
resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(),
this.eventClient,
prop);
} catch (Exception e) {
logger.error("Did not find resource pool. Using DistributedResourcePool");
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), this.eventClient);
// throw new TException(e);
} finally {
interpreterGroup.setResourcePool(resourcePool);
return resourcePool;
}
}

@Override
public void createInterpreter(String interpreterGroupId, String noteId, String
className,
Map<String, String> properties) throws TException {
Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
interpreterGroup.setResourcePool(resourcePool);
}

try {
Expand All @@ -171,15 +202,19 @@ public void createInterpreter(String interpreterGroupId, String noteId, String
}

logger.info("Instantiate interpreter {}", className);

interpreterGroup.setResourcePool(getResourcePool());

repl.setInterpreterGroup(interpreterGroup);

//setResourcePool(interpreterGroup, p, eventClient);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException
| InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
logger.error(e.toString(), e);
throw new TException(e);
}
}

private void setSystemProperty(Properties properties) {
for (Object key : properties.keySet()) {
if (!RemoteInterpreter.isEnvString((String) key)) {
Expand Down Expand Up @@ -367,11 +402,13 @@ protected Object jobRun() throws Throwable {
}

// put result into resource pool
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ParagraphResult.toString(),
combinedResult);
if (context.getResourcePool() != null) {
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ParagraphResult.toString(),
combinedResult);
}
return combinedResult;
} finally {
InterpreterContext.remove();
Expand Down Expand Up @@ -402,7 +439,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int

@Override
public int getProgress(String noteId, String className,
RemoteInterpreterContext interpreterContext)
RemoteInterpreterContext interpreterContext)
throws TException {
Interpreter intp = getInterpreter(noteId, className);
return intp.getProgress(convert(interpreterContext));
Expand All @@ -425,7 +462,7 @@ public List<String> completion(String noteId, String className, String buf, int
private InterpreterContext convert(RemoteInterpreterContext ric) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
new TypeToken<List<RemoteInterpreterContextRunner>>() {
new TypeToken<List<RemoteInterpreterContextRunner>>() {
}.getType());

for (InterpreterContextRunner r : runners) {
Expand Down Expand Up @@ -586,7 +623,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId,
if (value == null) {
try {
value = gson.fromJson(object,
new TypeToken<Map<String, Object>>() {
new TypeToken<Map<String, Object>>() {
}.getType());
} catch (Exception e) {
// it's not a generic json object, too. okay, proceed to threat as a string type
Expand Down Expand Up @@ -622,7 +659,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str
try {
value = gson.fromJson(object,
new TypeToken<Map<String, Object>>() {
}.getType());
}.getType());
} catch (Exception e) {
// it's okay. proceed to treat object as a string
logger.debug(e.getMessage(), e);
Expand All @@ -638,7 +675,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str

@Override
public void angularObjectRemove(String name, String noteId, String paragraphId) throws
TException {
TException {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
registry.remove(name, noteId, paragraphId, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@
*/
package org.apache.zeppelin.resource;

import java.util.Properties;

/**
* distributed resource pool
*/
public class DistributedResourcePool extends LocalResourcePool {

private final ResourcePoolConnector connector;

protected Properties property;

public DistributedResourcePool(String id, ResourcePoolConnector connector) {
super(id);
this.connector = connector;
this.property = new Properties();
}


public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) {
super(id);
this.connector = connector;
this.property = property;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;

import com.google.gson.Gson;
/**
* Makes a remote interpreter service client act as a resource pool connector.
*/
public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector {

private Client client;
private ResourceSet resources;

public RemoteInterpreterProcessResourcePoolConnector(Client client) {
this.client = client;
}

@Override
public ResourceSet getAllResources() {
try {
List<String> resourceList = client.resourcePoolGetAll();
resources = new ResourceSet();
Gson gson = new Gson();

for (String res : resourceList) {
RemoteResource r = gson.fromJson(res, RemoteResource.class);
r.setResourcePoolConnector(this);
resources.add(r);
}

return resources;
} catch (TException e) {
throw new RuntimeException(e);
}
}

@Override
public Object readResource(ResourceId id) {
try {
ByteBuffer buffer = client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName());
for (Resource r: resources) {
if (r.getResourceId().equals(id))
return r.deserializeObject(buffer);
}
return null;
} catch (TException | ClassNotFoundException | IOException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.zeppelin.resource;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
Expand Down Expand Up @@ -54,10 +56,11 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resourcePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
RemoteInterpreterProcessResourcePoolConnector remoteConnector =
new RemoteInterpreterProcessResourcePoolConnector(client);

for (Resource r: remoteConnector.getAllResources()) {
resourceSet.add(r);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;

import java.lang.reflect.Type;

import com.google.gson.Gson;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
/**
* Serializes and Deserializes resources if they are serializable.
*/
public class ResourceSerializer implements JsonDeserializer<Resource>, JsonSerializer<Resource> {

public ResourceSerializer() {
}

@Override
public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) {
// This is straightforward at the moment.
Gson gson = new Gson();
JsonElement elem = gson.toJsonTree(src);
JsonObject obj = elem.getAsJsonObject();
if (src.isSerializable()) {
obj.add("r", gson.toJsonTree(src.get()));
}
return obj;
}

@Override
public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
// This requires that we use the class that's stored in the element to deserialize.
JsonObject obj = json.getAsJsonObject();
String className = obj.getAsJsonPrimitive("className").getAsString();

Gson gson = new Gson();
Object r;
try {
r = gson.fromJson(obj.get("r"), Class.forName(className));
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to deserialize the resource");
}
ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class);

return new Resource(id, r);
}

}
Loading