Skip to content

Commit

Permalink
refactoring class for event notification
Browse files Browse the repository at this point in the history
  • Loading branch information
vgmartinez committed May 31, 2016
1 parent 765c6b9 commit 94e713e
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ public static enum ConfVars {
ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"),
ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true),
ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"),
ZEPPELIN_SMTP_USER("zeppelin.mail.smtp.user", "email"),
ZEPPELIN_SMTP_PASS("zeppelin.mail.smtp.password", "password"),
ZEPPELIN_SMTP_USER("zeppelin.mail.smtp.user", "[email protected]"),
ZEPPELIN_SMTP_PASS("zeppelin.mail.smtp.password", "crucero1989"),
ZEPPELIN_SMTP_HOST("zeppelin.mail.smtp.host", "smtp.googlemail.com"),
ZEPPELIN_SMTP_PROTOCOL("zeppelin.mail.smtp.protocol", "smtp"),
ZEPPELIN_SMTP_PORT("zeppelin.mail.smtp.port", "465"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.apache.zeppelin.event;

import org.apache.commons.mail.DefaultAuthenticator;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.SimpleEmail;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.internal.StringMap;

/**
* Event notifications
*/
public class EventNotification {
static Logger logger = LoggerFactory.getLogger(EventNotification.class);
private static ZeppelinConfiguration conf;

public EventNotification(ZeppelinConfiguration conf) {
this.conf = conf;
}

public void schedulerEvent(Note note) {
StringMap email = (StringMap) note.getConfig().get("email");

String onStart = (String) email.get("start");
String onSuccess = (String) email.get("success");
String onError = (String) email.get("error");

if (!onStart.isEmpty()) {
//send email when start
sendEmail(onStart, "Start execute note " + note.getName());
}

for (Paragraph para : note.paragraphs) {
if (para.getStatus().isError()) {
//improve mail messages
String msg = "Error in paragraphs ";
if (para.getTitle() != null) {
msg = msg + para.getTitle() + "\n"
+ para.getResult().message();

} else {
msg = msg + para.getId() + "\n"
+ para.getResult().message();
}
if (!onError.isEmpty()){
//send error email
sendEmail(onError, msg);
}
}
}
if (!onSuccess.isEmpty()) {
//send email when finish
sendEmail(onSuccess, "Note " + note.getName() + " has finish.");
}
}

public void paragraphsEvent() {}

public void notebookEvent() {}

public static void sendEmail(String email, String text) {

Email sessionEmail = new SimpleEmail();

try {
sessionEmail.setSmtpPort(Integer.parseInt(conf.getString(ConfVars.ZEPPELIN_SMTP_PORT)));
sessionEmail.setAuthenticator(new DefaultAuthenticator(
conf.getString(ConfVars.ZEPPELIN_SMTP_USER),
conf.getString(ConfVars.ZEPPELIN_SMTP_PASS)));
sessionEmail.setHostName(conf.getString(ConfVars.ZEPPELIN_SMTP_HOST));

sessionEmail.getMailSession().getProperties().put("mail.smtp.host",
conf.getString(ConfVars.ZEPPELIN_SMTP_HOST));
sessionEmail.getMailSession().getProperties().put("mail.smtp.protocol",
conf.getString(ConfVars.ZEPPELIN_SMTP_PROTOCOL));
sessionEmail.getMailSession().getProperties().put("mail.smtp.port",
conf.getString(ConfVars.ZEPPELIN_SMTP_PORT));
sessionEmail.getMailSession().getProperties().put("mail.smtp.starttls.enable",
conf.getString(ConfVars.ZEPPELIN_SMTP_STARTTLS));
sessionEmail.getMailSession().getProperties().put("mail.smtp.auth",
conf.getString(ConfVars.ZEPPELIN_SMTP_AUTH));
sessionEmail.getMailSession().getProperties().put("mail.smtp.socketFactory.port",
conf.getString(ConfVars.ZEPPELIN_SMTP_SOCKETFACTORY));
sessionEmail.getMailSession().getProperties().put("mail.smtp.socketFactory.class",
conf.getString(ConfVars.ZEPPELIN_SMTP_SOCKETFACTORY_CLASS));

sessionEmail.setFrom(conf.getString(ConfVars.ZEPPELIN_SMTP_USER));
for (String mail : email.split(",")) {
logger.info("Send email to " + mail);
sessionEmail.addTo(mail);
}

sessionEmail.setSubject("Note scheduler in Zeppelin");
sessionEmail.setMsg(text);
sessionEmail.send();

} catch (EmailException e) {
logger.error("Error: ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class Note implements Serializable, JobListener {
delayedPersistThreadPool.setRemoveOnCancelPolicy(true);
}

final List<Paragraph> paragraphs = new LinkedList<>();
public final List<Paragraph> paragraphs = new LinkedList<>();

private String name = "";
private String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.event.EventNotification;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class Notebook {
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
private NotebookAuthorization notebookAuthorization;
private static EventNotification notification;

/**
* Main constructor \w manual Dependency Injection
Expand Down Expand Up @@ -114,6 +116,7 @@ public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo,
quartzSched = quertzSchedFact.getScheduler();
quartzSched.start();
CronJob.notebook = this;
this.notification = new EventNotification(conf);

loadAllNotes();
if (this.notebookIndex != null) {
Expand Down Expand Up @@ -505,39 +508,18 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
@SuppressWarnings("rawtypes")
StringMap email = (StringMap) note.getConfig().get("email");

String onStart = (String) email.get("start");
String onSuccess = (String) email.get("success");
String onError = (String) email.get("error");

//send email when start
sendEmail(onStart, "Start execute note " + note.getName());
if (!email.isEmpty()) {
notification.schedulerEvent(note);
}

while (!note.getLastParagraph().isTerminated()) {
try {
Thread.sleep(1000);
for (Paragraph para : note.paragraphs) {
if (para.getStatus().isError()) {
//improve mail messages
String msg = "Error in paragraphs ";
if (para.getTitle() != null) {
msg = msg + para.getTitle() + "\n"
+ para.getResult().message();

} else {
msg = msg + para.getId() + "\n"
+ para.getResult().message();
}
//send error email
sendEmail(onError, msg);
}
}
} catch (InterruptedException e) {
logger.error(e.toString(), e);
}
}

//send email when finish
sendEmail(onSuccess, "Note " + note.getName() + " has finish.");

boolean releaseResource = false;
try {
Map<String, Object> config = note.getConfig();
Expand All @@ -555,46 +537,6 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
}
}

public static void sendEmail(String email, String text) {

Email sessionEmail = new SimpleEmail();

try {
sessionEmail.setSmtpPort(Integer.parseInt(conf.getString(ConfVars.ZEPPELIN_SMTP_PORT)));
sessionEmail.setAuthenticator(new DefaultAuthenticator(
conf.getString(ConfVars.ZEPPELIN_SMTP_USER),
conf.getString(ConfVars.ZEPPELIN_SMTP_PASS)));
sessionEmail.setHostName(conf.getString(ConfVars.ZEPPELIN_SMTP_HOST));

sessionEmail.getMailSession().getProperties().put("mail.smtp.host",
conf.getString(ConfVars.ZEPPELIN_SMTP_HOST));
sessionEmail.getMailSession().getProperties().put("mail.smtp.protocol",
conf.getString(ConfVars.ZEPPELIN_SMTP_PROTOCOL));
sessionEmail.getMailSession().getProperties().put("mail.smtp.port",
conf.getString(ConfVars.ZEPPELIN_SMTP_PORT));
sessionEmail.getMailSession().getProperties().put("mail.smtp.starttls.enable",
conf.getString(ConfVars.ZEPPELIN_SMTP_STARTTLS));
sessionEmail.getMailSession().getProperties().put("mail.smtp.auth",
conf.getString(ConfVars.ZEPPELIN_SMTP_AUTH));
sessionEmail.getMailSession().getProperties().put("mail.smtp.socketFactory.port",
conf.getString(ConfVars.ZEPPELIN_SMTP_SOCKETFACTORY));
sessionEmail.getMailSession().getProperties().put("mail.smtp.socketFactory.class",
conf.getString(ConfVars.ZEPPELIN_SMTP_SOCKETFACTORY_CLASS));

sessionEmail.setFrom(conf.getString(ConfVars.ZEPPELIN_SMTP_USER));
for (String mail : email.split(",")) {
sessionEmail.addTo(mail);
}

sessionEmail.setSubject("Note scheduler in Zeppelin");
sessionEmail.setMsg(text);
sessionEmail.send();

} catch (EmailException e) {
logger.error("Error: ", e);
}
}

public void refreshCron(String id) {
removeCron(id);
synchronized (notes) {
Expand Down

0 comments on commit 94e713e

Please sign in to comment.