文档章节

hive指定多个字符作为列分隔符

秋叶不落
 秋叶不落
发布于 2014/02/18 14:58
字数 571
阅读 7956
收藏 2

hive创建表指定分隔符,不支持多个字符作为分隔符,如果想使用多个字符作为分割符的话就需要实现InputFormat.主要重写next方法,代码如下


package hiveStream;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
public class MyHiveInputFormat extends TextInputFormat implements
        JobConfigurable {

    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new MyRecordReader((FileSplit) genericSplit, job);
    }
}



package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    // 构造方法
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // 创建压缩器
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // 打开文件系统
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.end = endOffset;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            // 把字符串中的"##"转变为"#"
            String strReplace = value.toString().replace("##", "#");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            if (newSize == 0)
                return false;
            pos += newSize;
            if (newSize < maxLineLength)
                return true;

        }
        return false;
    }
}


建表语句:自定义 outputformat/inputformat 后,在建表时需要指定 outputformat/inputformat

create external table testHiveInput( id int,name string,age int) row format delimited fields terminated by '|' stored as INPUTFORMAT 'hiveStream.MyHiveInputFormat'  
OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' location '/user/hdfs/hiveInput';


测试数据:

1##Tom##22
2##Jerry##22
3##Jeny##22


测试代码( 通过jdbc来查询数据):

public static void testHive() throws Exception {
        String sql = "select id,name,age from testHiveInput";
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String url = "jdbc:hive2://xxx.xxx.xxx.xxx:10000";
        Connection conn = DriverManager.getConnection(url, "hive", "passwd");
        Statement stmt = conn.createStatement();
        stmt.execute("add jar /usr/lib/hive/lib/hiveInput.jar");
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getString("id")+" "+ rs.getString("name") + " " + rs.getString("age"));
        }
    }





© 著作权归作者所有

秋叶不落
粉丝 0
博文 1
码字总数 571
作品 0
广州
私信 提问
加载中

评论(1)

cherish_DWJ
cherish_DWJ
请问hive单表字段又上限吗?最多支持多少?
详解hive的列分隔符和行分隔符的使用

hive中在创建表时,一般会根据导入的数据格式来指定字段分隔符和列分隔符。 一般导入的文本数据字段分隔符多为逗号分隔符或者制表符(但是实际开发中一般不用着这种容易在文本内容中出现的的...

小兔纸乖乖
2018/08/11
640
0
sqoop实战(五)

1 Importing Data Directly into Hive 关系型数据库-----hive sqoop import --connect jdbc:mysql://192.168.130.221/sqoop --username root --password root --table tblplace --hive-impor......

发光体
2016/02/26
176
0
Pig、Hive 自定义输入输出分隔符以及Map、Array嵌套分隔符冲突问题

PIG中输入输出分隔符默认是制表符t,而到了hive中,默认变成了八进制的001, 也就是ASCII: ctrl - A Oct Dec Hex ASCIIChar 001 1 01 SOH (start of heading) 官方的解释说是尽量不和文中的...

大数据之路
2012/10/21
5.7K
0
《Hadoop权威指南》书摘-关于Hive

转载请注明出处 独立博客:http://wangnan.tech 简书:http://www.jianshu.com/u/244399b1d776** 知乎:https://zhuanlan.zhihu.com/ghoststories 简介 hive是一个构建在hadoop上的数据仓框架...

GhostStories
2018/08/08
0
0
hive官方文档翻译

概念 Hive是什么 Hive是一个基于Apache Hadoop的数据仓库。对于数据存储与处理,Hadoop提供了主要的扩展和容错能力。 Hive设计的初衷是:对于大量的数据,使得数据汇总,查询和分析更加简单。...

qi49125
2017/09/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

编程作业20190210900169

1编写一个程序,提示用户输入名和姓,然后以“名,姓”的格式打印出来。 #include <stdio.h>#include <stdlib.h> int main(){ char firstName[20]; char lastName[20]; print......

1李嘉焘1
6分钟前
0
0
补码的优点及原理分析

只讨论整数 1.计算机内部为什么没有减法器? 减法运算本身其实就是加法,如x - y即x +(-y),所以只需要将负数成功表示出来并可以参加加法运算,那加法器就可同时实现“+”和“-”的运算。这...

清自以敬
21分钟前
54
0
Docker 可视化管理 portainer

官网安装指南: https://portainer.readthedocs.io/en/latest/deployment.html docker-compose.yml 位置,下载地址:https://downloads.portainer.io/docker-compose.yml...

Moks角木
48分钟前
5
0
Spring Security 实战干货:必须掌握的一些内置 Filter

1. 前言 上一文我们使用 Spring Security 实现了各种登录聚合的场面。其中我们是通过在 UsernamePasswordAuthenticationFilter 之前一个自定义的过滤器实现的。我怎么知道自定义过滤器要加在...

码农小胖哥
51分钟前
7
0
常见分布式事务解决方案

1 微服务的发展 微服务倡导将复杂的单体应用拆分为若干个功能简单、松耦合的服务,这样可以降低开发难度、增强扩展性、便于敏捷开发。当前被越来越多的开发者推崇,很多互联网行业巨头、开源...

asdf08442a
52分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部