Presto UDF自定义函数开发

原创
2017/02/09 11:36
阅读数 6.7K

1.初识Presto函数

在 Presto 中,函数大体分为三种:scalar,aggregation 和 window 类型。

scalar 就是标量函数,简单来说就是 Java 中的一个静态方法,本身没有任何状态.

aggregation 就是需要累积状态的函数,例如 COUNT 、 AVG 等。开发 Aggregation 函数的难点在于维护状态。

window就是窗口函数,可以理解为分析型函数,即将某一列多行中的值按照一定的聚合规则进行计算。

2.如何开发UDF

2.1scalar函数

 scalar 就是标量函数,简单来说就是 Java 中的一个静态方法,本身没有任何状态。例如,如下函数将 timestamp 类型的时间戳换成 类似 2016-08-16 的字符串: 

public class HujjFunctions {

    public static final String DATE_FORMAT = "yyyy-MM-dd";

    @ScalarFunction("demo_to_date")//函数名称
    @Description("hive to_date function")//函数描述
    @SqlType(StandardTypes.VARCHAR)//函数返回类型
    public static Slice to_date_bigint(@SqlType(StandardTypes.BIGINT) long input) {  //函数入参类型
        final DateFormat format = new SimpleDateFormat(DATE_FORMAT);
 return Slices.utf8Slice(format.format(new Date(input)));
 }

}

开发过程很简单,

  • 首先,定义一个 Java 类, 用 @ScalarFunction 的 Annotation 标记实现业务逻辑的静态方法

  • 使用 @Description 描述函数的作用,这里的内容会在  SHOW FUNCTIONS 中显示

  • 使用 @SqlType 标记函数的返回值类型,这里返回字符串,因此是  StandardTypes.VARCHAR

  • Java 方法的返回值必须使用 Presto 内部的序列化方式,因此字符串类型必须返回 Slice , 使用  Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型 

2.2.aggregation 函数

开发 Aggregation 函数的难点在于维护状态。设想在单机状态下,我们仅仅使用一个变量就可以存储状态。但 Presto 是一个分布式的计算引擎,数据分布在多个节点,状态数据需要在节点间传输,因此状态数据需要被序列化成 Presto 的内部格式才可以被传输。Presto 把 Aggregation 函数分解成三个步骤执行:

input(state, data)
combine(state1, state2)
output(final_state, out)

假设一次 Aggregation 计算仅涉及三个节点,如下图所示:

  • 首先,在节点1、2、3分别执行多次 input(state, data) 函数,分别得到  state1 、  state2 、 state3 。

  • 在 state1state2 上执行  combine(state1, state2) 函数,得到  merged_state1 的中间结果。

  • 在 merged_state1state3 上执行  combine 函数,得到最终的计算结果: final_state 。

  • 最后,调用 output(final_state, out) 函数,将最终结果序列化,Aggregation 函数执行完成。 

 

接下来,我们来实现一个简单的avg函数。

首先需要一个保存状态的state类,注意:貌似不能直接使用presto-main包里面的LongAndDoubleState 类,会有错误,需要自定义。

最后,说一说 Aggregation 函数的状态的保存。在 Presto 的 Aggregation 函数中,状态的保存必须使用 AccumulatorState 接口的子接口,并使用  @AccumulatorStateMetadata 提供序列化(stateSerializerClass指定)和 Factory 类信息(stateFactoryClass指定)。

public interface LongAndDoubleState extends AccumulatorState {

    long getLong();

 void setLong(long value);

 double getDouble();

 void setDouble(double value);
}

其次,实现自己的函数 

  •  首先,定义一个 Java 类,使用 @AggregationFunction 标记为 Aggregation 函数

  • 使用 @InputFunction 、  @CombineFunction 、 @OutputFunction 分别标记计算函数、合并结果函数和最终输出函数 

@AggregationFunction("demo_avg")//函数名称
@Description("demo avg")
public class HujjDemoAggregationFunction {

    @InputFunction
 public static void input(LongAndDoubleState state, @SqlType(StandardTypes.BIGINT) long value){
        //记录计算平均值所需内容
 //总记录次数
 state.setLong(state.getLong() + 1);
 //总和
 state.setDouble(state.getDouble() + value);
 }

    @CombineFunction
 public static void combine(LongAndDoubleState state,LongAndDoubleState otherState){
        state.setLong(state.getLong() + otherState.getLong());
 state.setDouble(state.getDouble() + otherState.getDouble());
 }

    @OutputFunction(StandardTypes.DOUBLE)
    public static void output(LongAndDoubleState state, BlockBuilder out){
        long count = state.getLong();
 if(count == 0){
            out.appendNull();
 }else{
            double value = state.getDouble();
 out.writeLong((long) (value/count));
 }
    }

}

3.如何注册UDF

3.1presto的插件机制

Presto 的插件(Plugin)机制,是 Presto 能够整合多种数据源的核心。通过实现不同的 Plugin,Presto 允许用户在不同类型的数据源之间进行 JOIN 等计算。Presto 内部的所有数据源都是通过插件机制实现, 例如 MySQL、Hive、HBase等。Presto 插件机制不仅通过加载 Connector 来实现不同数据源的访问,还通过加载 FunctionFactory 来实现 UDF 的加载。 Presto 的 Plugin 遵循 Java 中的 ServiceLoader 规范, 实现非常简单。 

  • 实现一个 Plugin 接口的子类,例如:

public class HujjFunctionsPlugin implements Plugin{

    private static Logger logger = LoggerFactory.getLogger(HujjFunctionsPlugin.class);

 @Override
 public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                .add(HujjFunctions.class)
                .add(HujjDemoAggregationFunction.class)
                .build();
 }
}
  • 在 src/main/resources/META-INF/services 中新建名为  com.facebook.presto.spi.Plugin 的文本文件,内容就是之前实现  Plugin 接口的类名

  • 打包成 jar 时记得将 META-INF/services 打包进去。

  • 将代码连同项目的依赖,打包成一个文件夹,放到 ${PRESTO_HOME}/plugin/${你的插件名称}/ 中重启 Presto 即可。启动日志中会看到类似 :

表示插件部署成功,注意:所以worker节点都要部署。

4.验证udf

4.1验证demo_to_date

SQL:

select task_id,demo_to_date(createtime) from hive.ods.mk_task limit 10

RESULT:

 

4.2验证demo_avg

SQL:

select demo_avg(sign_num) from hive.ods.mk_task limit 10

RESULT:

 

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部