文档章节

用MapReduce 向Hbase 中插入数据

xiaozhou18
 xiaozhou18
发布于 2016/12/04 19:25
字数 643
阅读 126
收藏 0

首先要保证hbase中有要插入的表

package hbasemapperreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HbaseMapper extends Mapper<LongWritable, Text, Text, Text>{
    @Override
    protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
        String line=value.toString();
        String[] splited=line.split("\t");
SimpleDateFormat simpleDateFormatimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        String format = simpleDateFormatimpleDateFormat.format(new Date(Long.parseLong(splited[0].trim())));
        String rowKey=splited[1]+"_"+format;
        context.write(new Text(rowKey), new Text(rowKey+"\t"+line));
    }
}

package hbasemapperreduce;
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;;

public class HbaseReduce extends TableReducer<Text, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> arg1,Context arg2)throws IOException, InterruptedException {
        for (Text text : arg1) {
            String value=text.toString();
            String row=key.toString();
            String tel=value.split("\t")[2];
            String apmac=value.split("\t")[3];
            String acmac=value.split("\t")[4];
            String host=value.split("\t")[5];
            String sitetype=value.split("\t")[6];
            String uppack=value.split("\t")[7];
            String downpack=value.split("\t")[8];
            String upload=value.split("\t")[9];
            String download=value.split("\t")[10];
            String httpstatus=value.split("\t")[11];
            Put put=new Put(row.getBytes());
            put.addColumn("dianxin".getBytes(), "tel".getBytes(), tel.getBytes());
            put.addColumn("dianxin".getBytes(), "apmac".getBytes(), apmac.getBytes());
            put.addColumn("dianxin".getBytes(), "acmac".getBytes(), acmac.getBytes());
            put.addColumn("dianxin".getBytes(), "host".getBytes(), host.getBytes());
            put.addColumn("dianxin".getBytes(), "sitetype".getBytes(), sitetype.getBytes());
            put.addColumn("dianxin".getBytes(), "uppack".getBytes(),uppack.getBytes());
            put.addColumn("dianxin".getBytes(), "downpack".getBytes(), downpack.getBytes());
            put.addColumn("dianxin".getBytes(), "upload".getBytes(), upload.getBytes());
            put.addColumn("dianxin".getBytes(), "download".getBytes(), download.getBytes());
            put.addColumn("dianxin".getBytes(), "httpstatus".getBytes(), httpstatus.getBytes());
            arg2.write(NullWritable.get(), put);
        }
    }
}

package hbasemapperreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class HbaseJob {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        //在本地直接调用,执行过程在服务器上(真正企业运行环境)  在src下加hadoop的配置文件
        conf.set("mapred.jar", "E:\\hbase.jar");
        conf.set("hbase.zookeeper.quorum", "node22:2181,node33:2181,node44:2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "dianhuajilu");
        Job job;
        try {
            job = Job.getInstance(conf);
            TableMapReduceUtil.addDependencyJars(job);
            job.setJarByClass(HbaseJob.class);
            job.setJobName("hbasemapper");
            job.setMapperClass(HbaseMapper.class);
            job.setReducerClass(HbaseReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path("/dianxinin/"));
            boolean b = job.waitForCompletion(true);
            if(b){
                System.out.println("执行成功");
            }else{
                System.out.println("执行失败");
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        
        
    }

}

 

mapreduce 生成hfile 代码

public class HBaseMR extends Configured implements Tool{

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new HBaseMR(), args);
        System.exit(status);

    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = this.getConf();
        Job job=Job.getInstance(configuration);
        job.setJarByClass(HBaseMR.class);
        job.setJobName(this.getClass().getSimpleName());
        // set input and set mapper
        FileInputFormat.setInputPaths(job, new Path("hdfs://bigdata01:8020/user/xiaozhou/csv"));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(HBaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        //set reduce
        job.setReducerClass(PutSortReducer.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        //生成hfile路径
        FileOutputFormat.setOutputPath(job, new Path("/user/xiaozhou/hbase1/hfileoutput"));
        HTable table=new HTable(configuration, "person4".getBytes());
        
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        job.setNumReduceTasks(1);   // at least one, adjust as required
                
        boolean b = job.waitForCompletion(true);
        
        return b?0:1;
    }

}

public class HBaseMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        String rowkey = split[0];
        String name = split[1];
        String age = split[2];
        String sex = split[3];
        String address = split[4];
        String phone = split[5];
        Put put=new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
    }
}

用completebulkload 把hfile加载到hbase中

./yarn jar /opt/moduels/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2-tests.jar completebulkload hdfs://bigdata01:8020/user/xiaozhou/hbase1/hfileoutput person4
 

© 著作权归作者所有

xiaozhou18
粉丝 4
博文 44
码字总数 89781
作品 0
哈尔滨
程序员
私信 提问
利用BulkLoad导入Hbase表

1、插入HBase表传统方法具有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是TableOutputFormat方式,在map/reduce中直接生成put对象写入HBase,该方式在大量数据...

混绅士
2018/06/28
0
0
Hive Hbase区别 整理

Hive是为了简化编写MapReduce程序而生的,使用MapReduce做过数据分析的人都知道,很多分析程序除业务逻辑不同外,程序流程基本一样。在这种情况下,就需要Hive这样的用戶编程接口。Hive本身不...

李超
2015/04/17
334
0
Apache HBase 1.2.7 发布,分布式数据库

Apache HBase 1.2.7 发布了,HBase 1.2.7 是 HBase 1.2 系列中的最新维护版本,继续以向 Hadoop 和 NoSQL 社区提供稳定、可靠的数据库为目标。此版本包括 250 多个自 1.2.6 以来的错误修复。...

局长
2018/09/27
1K
1
mapreduce操作hbase报错NoClassDefFoundError

下面是具体的报错信息,执行方式:/opt/cloudera/parcels/CDH/bin/hadoop jar mapreduce.jar 查了官方文档,可以通过制定hadoop_classpath实现,但是我效,操作如下: 第一步:指定环境变量 ...

skysky
2015/05/27
3.1K
2
hadoop--Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别

Pig 一种操作hadoop的轻量级脚本语言,最初又雅虎公司推出,不过现在正在走下坡路了。当初雅虎自己慢慢退出pig的维护之后将它开源贡献到开源社区由所有爱好者来维护。不过现在还是有些公司在...

寒月谷
2018/08/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

WOW.js 使用教程 页面动画

使用教程 wow.js依赖于animate.css,首先需要在 head内引入animate.css或者animate.min.css。 <link rel="stylesheet" href="css/animate.css"> 引入wow.js或者wow.min.js,然后js文件里再写......

东东笔记
22分钟前
4
0
JavaMail实现简单邮件开发——163邮箱

1、打开 P0P3/SMTP/IMAP 2、开发工具类 CodeUtils.java package com.demo.mymail.utils;import java.util.UUID;public class CodeUtils { public static String generateUn......

开源中国首席碉堡了
23分钟前
5
0
Windows上安装Docker

Windows Docker 安装 win7、win8 系统 win7、win8 等需要利用 docker toolbox 来安装,国内可以使用阿里云的镜像来下载,下载地址:http://mirrors.aliyun.com/docker-toolbox/windows/dock...

城市之雾
28分钟前
3
0
jdk 设置setConnectionTimeout和SetReadTimeout的意义(详细)

先看例子吧 客户端: package top.quartz.util;/** * description: //connect timeout 是建立连接的超时时间; * //read timeout,是传递数据的超时时间。 * ...

hexiaoming123
28分钟前
17
0
Bootstrap Table含有按钮时表格不对齐

使用Bootstrap Table的时候,在含有按钮的情况,有时候表格的线垂直方向上不对齐 解决方法: (1)调整表格的高度小些或者大些(至于为什么,并不清楚,调整高度后就对齐了) (2)加setTime...

tianyawhl
34分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部