[HIVE] How UDFs Work and How to Write a Custom UDF

Original post on OSCHINA, 2018/11/21 11:48

I. UDF

Hive's built-in UDFs generally fall into two kinds: classes that extend UDF and classes that extend GenericUDF.

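GenericUDF vs. UDF: compared with UDF, a GenericUDF

  1. can accept complex argument types and return complex types;
  2. can accept a variable number of arguments (passed in as an array).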

II. GenericUDF

UDF is comparatively simple, so only GenericUDF is covered here.

A class that extends GenericUDF must implement the following three methods.

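It is also good practice to annotate the class with @Description, which supplies the usage text that DESCRIBE FUNCTION displays for the UDF.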

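/*
 Called once per UDF instance, before any call to evaluate(). It validates the
 number and types of the arguments and declares the return type.

 Input: ObjectInspector[], the types of the function's arguments. Checks such as
 the argument count and the argument types belong here.
 Output: an ObjectInspector describing the type of the function's return value.
*/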
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException{
}

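/*
 Called once for every upstream row.

 Input: DeferredObject[] arguments, holding the actual argument values. Call get()
 on each one and read the value through the matching ObjectInspector saved in
 initialize() (ListObjectInspector, MapObjectInspector, ...).

 Output: the actual return value, which must be readable by the ObjectInspector
 returned from initialize().
*/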
public Object evaluate(DeferredObject[] arguments) throws HiveException{
}

/*
 Returns the text used to display this function call, for example in EXPLAIN output.
*/
public String getDisplayString(String[] children){
}

III. Example: ScoreUDF

  Table score:
  name (string)   score_list (array<map<string,int>>)
  A               [{"math":100,"english":90,"history":85}]
  B               [{"math":95,"english":80,"history":100}]
  C               [{"math":80,"english":90,"history":100}]

The goal is to compute each person's total score: A -> 275, B -> 275, C -> 270.
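To double-check the expected totals outside Hive, here is a small plain-Java sketch of the same aggregation. It models score_list as a List<Map<String, Integer>> instead of reading it through Hive's ObjectInspectors; the class ScoreTotals and its helper methods are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ScoreTotals {
    // Hypothetical helper: adds up every value of every map in one score_list.
    // Each map is visited exactly once, so a list holding several maps is summed correctly.
    public static int totalScore(List<Map<String, Integer>> scoreList) {
        int total = 0;
        for (Map<String, Integer> scores : scoreList) {
            for (Integer value : scores.values()) {
                if (value != null) { // a NULL score contributes nothing
                    total += value;
                }
            }
        }
        return total;
    }

    // Hypothetical helper: builds one map from alternating subject/score pairs.
    public static Map<String, Integer> scores(Object... pairs) {
        Map<String, Integer> map = new LinkedHashMap<>();
        for (int i = 0; i + 1 < pairs.length; i += 2) {
            map.put((String) pairs[i], (Integer) pairs[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        // The three rows of the sample table, keyed by name
        Map<String, List<Map<String, Integer>>> table = new LinkedHashMap<>();
        table.put("A", Collections.singletonList(scores("math", 100, "english", 90, "history", 85)));
        table.put("B", Collections.singletonList(scores("math", 95, "english", 80, "history", 100)));
        table.put("C", Collections.singletonList(scores("math", 80, "english", 90, "history", 100)));

        for (Map.Entry<String, List<Map<String, Integer>>> row : table.entrySet()) {
            System.out.println(row.getKey() + " -> " + totalScore(row.getValue()));
        }
    }
}
```

Running main prints A -> 275, B -> 275 and C -> 270. A row whose list holds several maps, for example [{"math":100},{"english":90}], should total 190, because each map is counted once.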

 

 


import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ScoreUDF extends GenericUDF {
    // 1. Inspectors for the input arguments, captured once in initialize()
    private StringObjectInspector nameOI;
    private ListObjectInspector listOI;
    private MapObjectInspector mapOI;
    private PrimitiveObjectInspector valueOI;

    // Output struct {name, totalScore}; reused for every row to avoid allocating objects in evaluate()
    private final Object[] result = new Object[]{new Text(), new IntWritable()};

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the number of arguments
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("ScoreUDF() requires 2 arguments, got " + arguments.length);
        }

        // Argument 0 must be a primitive string
        if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory()
                        != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0, "Only string arguments are accepted but "
                    + arguments[0].getTypeName() + " is passed.");
        }
        nameOI = (StringObjectInspector) arguments[0];

        // Argument 1 must be array<map<...>> (note: the error position is 1, not 0)
        if (arguments[1].getCategory() != ObjectInspector.Category.LIST) {
            throw new UDFArgumentTypeException(1, "Only list type arguments are accepted but "
                    + arguments[1].getTypeName() + " is passed.");
        }
        listOI = (ListObjectInspector) arguments[1];
        ObjectInspector elementOI = listOI.getListElementObjectInspector();
        if (elementOI.getCategory() != ObjectInspector.Category.MAP) {
            throw new UDFArgumentTypeException(1, "Only array<map<...>> arguments are accepted but "
                    + arguments[1].getTypeName() + " is passed.");
        }
        mapOI = (MapObjectInspector) elementOI;

        // Map values must be primitive so they can be converted to int
        ObjectInspector mapValueOI = mapOI.getMapValueObjectInspector();
        if (mapValueOI.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(1, "Map values must be a primitive type but "
                    + mapValueOI.getTypeName() + " is passed.");
        }
        valueOI = (PrimitiveObjectInspector) mapValueOI;

        // Return type: struct<name:string, totalScore:int>; the field names are arbitrary
        List<String> structFieldNames = new ArrayList<>();
        List<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldNames.add("name");
        structFieldNames.add("totalScore");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Read argument 0 through its inspector instead of casting to LazyString,
        // so the UDF also works with Text/String inputs (e.g. ORC tables or constants)
        Object nameObj = arguments[0].get();
        if (nameObj == null) {
            return null;
        }
        String strName = nameOI.getPrimitiveJavaObject(nameObj);

        // Read argument 1; getListLength() returns -1 for a NULL list, which skips the loop
        Object listObj = arguments[1].get();
        int nElements = listOI.getListLength(listObj);

        int nTotalScore = 0;
        // Business logic: add up every value of every map exactly once
        for (int i = 0; i < nElements; i++) {
            Map<?, ?> scores = mapOI.getMap(listOI.getListElement(listObj, i));
            if (scores == null) {
                continue;
            }
            for (Object value : scores.values()) {
                if (value != null) {
                    nTotalScore += PrimitiveObjectInspectorUtils.getInt(value, valueOI);
                }
            }
        }

        // Fill in the reused output objects
        ((Text) result[0]).set(strName);
        ((IntWritable) result[1]).set(nTotalScore);
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        // Text shown for this call in EXPLAIN output, e.g. ScoreUDF(name, score_list)
        return "ScoreUDF(" + String.join(", ", children) + ")";
    }
}

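IV. Where UDFs are executed

At execution time a UDF is typically evaluated inside a SelectOperator, which calls evaluate() on each UDF in its select expressions for every row it processes.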
