文档章节

Mapreduce任务实现邮件监控

闵开慧
 闵开慧
发布于 2015/08/18 11:51
字数 950
阅读 378
收藏 11

Mapreduce任务实现邮件监控


    这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里先将json转换成xml格式然后再发送到邮件。具体代码如下

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URI;
import java.util.Properties;
import java.util.StringTokenizer;

import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.xml.XMLSerializer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class Email {

    private static final String USERNAME = "123456@qq.com";//发送邮件的用户名
    private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码
    private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host

    public static void main(String args[]) {
        try {
            sendEmail("测试邮件", "测试邮件内容!", "test@qq.com");
            System.out.println("email ok !");
        } catch (MessagingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取
     * @param to 目标邮箱(可以多个邮箱,用,号隔开)
     * @param job 通过mapreduce的job获取jobID
     * @param time 通过时间戳访问错误日志路径
     * @throws Exception
     */
    public static void sendErrMail(String to, Job job, String time)
            throws Exception {
        String subject = job.getJobName();
        String message = getErr(job, time);
        LoginMail lm = new LoginMail(USERNAME, PASSWORD);
        // 创建session
        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.host", EMAIL_HOST);
        Session session = Session.getDefaultInstance(props, lm);

        // 创建 message
        Message msg = new MimeMessage(session);

        // 设置发送源地址
        msg.setFrom(new InternetAddress(USERNAME));

        // 多用户分解
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);

        // 设置邮件主题并发送邮件
        msg.setSubject(subject);
        msg.setContent(message, "text/html;charset=utf-8");
        Transport.send(msg);
    }

    /**
     * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写
     * @param subject 主题
     * @param body 内容
     * @param to 目标邮箱
     * @throws MessagingException
     */
    public static void sendEmail(String subject, String body, String to)
            throws MessagingException {
        LoginMail lm = new LoginMail(USERNAME, PASSWORD);
        // 创建session
        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.host", EMAIL_HOST);
        Session session = Session.getDefaultInstance(props, lm);

        // 创建 message
        Message msg = new MimeMessage(session);

        // 设置发送源地址
        msg.setFrom(new InternetAddress(USERNAME));

        // 多用户分解
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);

        // 设置邮件主题并发送邮件
        msg.setSubject(subject);
        msg.setContent(body, "text/html;charset=utf-8");
        Transport.send(msg);

    }

    /**
     * @category 获取日志文件
     * @param job
     * @param time
     * @return FSDataInputStream
     * @throws IOException
     */
    public static FSDataInputStream getFile(Job job, String time)
            throws IOException {
        String year = time.substring(0, 4);
        String month = time.substring(4, 6);
        String day = time.substring(6, 8);
        String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/"
                + year + "/" + month + "/" + day + "/000000";
        FileSystem fs = FileSystem.get(URI.create(dst), new Configuration());
        FileStatus[] status = fs.listStatus(new Path(dst));
        FSDataInputStream in = null;
        for (int i = 0; i < status.length; i++) {
            if (status[i].getPath().getName()
                    .contains(job.getJobID().toString())
                    && status[i].getPath().getName().endsWith("jhist")) {
                in = new FSDataInputStream(fs.open(status[i].getPath()));
            }
        }
        return in;
    }

    /**
     * @category 解析文件类容为xml
     * @param job
     * @param time
     * @return xml
     * @throws IOException
     * @throws InterruptedException
     */
    public static String getErr(Job job, String time) throws IOException,
            InterruptedException {
        FSDataInputStream in = getFile(job, time);
        Thread t1 = new Thread();
        while (in == null) {
            t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成
            t1.join();
            in = getFile(job, time);
        }
        BufferedReader br = new BufferedReader(new InputStreamReader(in));

        String line = "";
        JSONObject jo;
        JSONArray jsa = new JSONArray();
        String xml = "";
        XMLSerializer xmlSerializer = new XMLSerializer();
        while ((line = br.readLine()) != null) {
            if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) {
                jo = JSONObject.fromObject(line);
                jsa.add(jo);
            }
        }
        xml = xmlSerializer.write(jsa);
        in.close();
        br.close();
        return xml;

    }

    /**
     * @category 获取try-catch中的异常内容
     * @param e Exception
     * @return 异常内容
     */
    public static String getException(Exception e) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintStream pout = new PrintStream(out);
        e.printStackTrace(pout);
        String ret = new String(out.toByteArray());
        pout.close();
        try {
            out.close();
        } catch (Exception ex) {
        }
        return ret;
    }
}

class LoginMail extends Authenticator {

    private String username;
    private String password;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
        return new PasswordAuthentication(username, password);
    }

    public LoginMail(String username, String password) {
        this.username = username;
        this.password = password;
    }
}


© 著作权归作者所有

共有 人打赏支持
闵开慧
粉丝 334
博文 607
码字总数 266601
作品 0
青浦
高级程序员
Hadoop中的MapReduce(5)

在MapReduce中,它也是主从结构,主节点:JobTracker,从节点:TaskTracker。主节点只有一个从节点有很多个,主节点在主机上,从节点分布到其他机器上。 JobTracker: 作用: 1、负责接收用户...

肖鋭
2014/02/23
0
0
大数据开发 | MapReduce介绍

1. MapReduce 介绍 1.1MapReduce的作用 假设有一个计算文件中单词个数的需求,文件比较多也比较大,在单击运行的时候机器的内存受限,磁盘受限,运算能力受限,而一旦将单机版程序扩展到集群...

嘿你好夏天
04/18
0
0
hadoop 学习笔记:mapreduce框架详解

这个觉得写得特别的详细,有一些细节可能要去看书,会理解的更好点,,,   Mapreduce初析   Mapreduce是一个计算框架,既然是做计算的框架,那么表现形式就是有个输入(input),mapre...

LIPING234
2013/10/25
0
0
2014-11-12--Hadoop的基础学习(三)--Hadoop中MapReduce框架入门

1.MapReduce的简单概念 百度百科:MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的...

查封炉台
2014/11/16
0
8
Hadoop MapReduce计算框架

1、MapReduce理论 1.1、MapReduce是什么? MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据);MapReduce是谷歌公...

巴利奇
06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

MySQL 乱七八糟的可重复读隔离级别实现

MySQL 乱七八糟的可重复读隔离级别实现 摘要: 原文可阅读 http://www.iocoder.cn/Fight/MySQL-messy-implementation-of-repeatable-read-isolation-levels 「shimohq」欢迎转载,保留摘要,谢...

DemonsI
今天
2
0
Spring源码阅读——2

在阅读源码之前,先了解下Spring的整体架构: 1、Spring的整体架构 1. Ioc(控制反转) Spring核心模块实现了Ioc的功能,它将类与类之间的依赖从代码中脱离出来,用配置的方式进行依赖关系描...

叶枫啦啦
今天
1
0
jQuery.post() 函数格式详解

jquery的Post方法$.post() $.post是jquery自带的一个方法,使用前需要引入jquery.js 语法:$.post(url,data,callback,type); url(必须):发送请求的地址,String类型 data(可选):发送给后台的...

森火
今天
0
0
referer是什么意思?

看看下面这个回答(打不开网页可以把网址复制到搜索栏): https://zhidao.baidu.com/question/577842068.html

杉下
今天
1
0
使用U盘安装CentOS-解决U盘找不到源

1. 使用UltraISO制作CentOS安装盘 如果需要安装带界面的系统,为保证安装顺利,可选择Everything版本的ISO制作安装盘。 2. 在BIOS中选择使用U盘安装 系统启动后,进入安装选择界面,其中有三...

Houor
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部