文档章节

基于自定义日志打印的UDAF调试

_夜枫
 _夜枫
发布于 2017/04/05 22:56
字数 2284
阅读 34
收藏 0

看到最近有一些用户,代码在本地IDE环境里调试成功了后,到线上调试出现结果不符合预期的情况。因为IDE里无法模拟多个worker进行分布式调试UDAF的场景,所以有一些BUG可能需要到线上用一些简单的测试数据进行调试。这里用最简单的手工打印日志的方法,针对代码调试中最麻烦的UDAF的例子做一次调试。通过问题的定位和解决,希望能给大家在面对UDF的线上调试的时候提供一些思路。

初始化

首先,线上的真实数据可能非常多,千万不要直接对着上亿条数据直接调试,否则很难定位到原因。面对线上的问题,最好先根据数据情况,简化计算场景。比如我这里,就先把测试数据简化成:

drop table if exists testUDAF;
create table testUDAF(
    str string
) partitioned by (ds string);

--dual表是我早前已经创建好的就一条数据的表
insert overwrite table testUDAF partition (ds)
select str,ds from (
    select 'a' as str,1 as ds from dual union all
    select 'a' as str,1 as ds from dual union all
    select 'b' as str,1 as ds from dual union all
    select 'a' as str,2 as ds from dual union all
    select 'c' as str,2 as ds from dual union all
    select 'c' as str,2 as ds from dual
) a;

select * from testUDAF;

可以看到模拟数据是
screenshot
这样一共6一条记录,分布在2个不同的分区里。
我们希望UDAF的计算结果能类似:

SELECT wm_concat(',', concat(str, ':', cnt)) AS ret
FROM (
    SELECT str, COUNT(*) AS cnt
    FROM testUDAF
    GROUP BY str
) a;

screenshot

代码编写

在本地已经调试好的JAVA代码如下:

package com.aliyun.odps.udaf;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;

import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.annotation.Resolve;

@Resolve({"string->string"})
public class MySum extends Aggregator {
    private static final String rd = ":";
    private static final String fd = ",";

    private static class SumBuffer implements Writable {
        private HashMap<String, Long> dict = new HashMap<>();

        @Override
        public void write(DataOutput out) throws IOException {
            String dictStr = dict.toString();
            out.writeUTF(dictStr);
        }

        /* * 做了个简单的反序列化 * */
        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
        }
    }


    @Override
    public Writable newBuffer() {
        return new SumBuffer();
    }

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        SumBuffer iterateDictBuffer = (SumBuffer) buffer;
        String content;

        if (args[0] instanceof NullWritable) {
            content = "Null";
        } else {
            content = args[0].toString();
        }
        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;

        iterateDictBuffer.dict.put(content, count + 1);
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        SumBuffer p = (SumBuffer) partial;

        for (Entry<String, Long> entry : p.dict.entrySet()) {
            Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
            buf.dict.put(entry.getKey(), count);
        }
    }


    @Override
    public Writable terminate(Writable buffer) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;

        StringBuilder sb = new StringBuilder();
        for (Entry<String, Long> entry : buf.dict.entrySet()) {
            sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
        }

        Text resault = new Text();
        resault.set(sb.substring(0,sb.length()-1));
        return resault;
    }
}

因为逻辑不复杂,所以也没有添加更多的注释。可以看到用一个Map来存放中间数据,并用toString来做序列化,然后写了段简单的代码进行反序列化。到了terminate后,拼成需要的结果再返回。

打包后,注册一下函数并测试一下结果:

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >create function mysum as com.aliyun.odps.udaf.MySum using mysum.jar;
Success: Function 'mysum' have been created.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+

排查思路

可以看到,这里c的值不知道为什么变成了4,这个是在本地没有发现的问题。还好我们的数据量比较小,所以定位起来比较方便。目前的思路是,我们已经明确输入的数据是什么,也知道我们期望的结果是什么。那么我们首先需要知道,在中间数据的一步步流转的过程中,从哪里开始和我们预期的不一样。定位到是哪里开始数据和预期不符合后,再结合上下文的代码逻辑,定位到问题的原因。

首先我们给代码加上一些异常打印,看看流转过程中的数据分别是什么。通过System.err.println,我们把我们想要的信息打印到stderr里。

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        SumBuffer iterateDictBuffer = (SumBuffer) buffer;         String content;

        if (args[0] instanceof NullWritable) {
            content = "Null";
        } else {
            content = args[0].toString();
        }
        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;
        System.err.println("input in iterate:" + content+"\tdict:"+iterateDictBuffer.dict); //拿到原始的输入和当前的状态
        iterateDictBuffer.dict.put(content, count + 1);
        System.err.println("output in iterate:" + iterateDictBuffer.dict);                  //打印iterate输出的内容     }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;         SumBuffer p = (SumBuffer) partial;
        System.err.println("buffer in merge:" + buf.dict);                  //打印merge里的buffer的内容         System.err.println("partial in merge:" + p.dict); //打印merge里的partial的内容
        for (Entry<String, Long> entry : p.dict.entrySet()) {
            Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
            buf.dict.put(entry.getKey(), count);
        }
        System.err.println("output in merge:" + buf.dict);                  //打印merge里的输出的内容
    }


    @Override
    public Writable terminate(Writable buffer) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;         System.err.println("output in terminate:" + buf.dict);                  //打印terminate里的输入的内容
        StringBuilder sb = new StringBuilder();         for (Entry<String, Long> entry : buf.dict.entrySet()) {
            sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
        }
        System.err.println(sb.substring(0,sb.length()-1));                  //打印terminate里的输出的内容
        Text resault = new Text();
        resault.set(sb.substring(0,sb.length()-1));         return resault;
    }

先打印了这么几个方法里。这样打印的思路主要是,看看每次调用的时候的数据输入输出是什么。从而定位到是从哪里开始出现的问题。

打包,替换掉jar包,然后重新调用一下函数,可以看到

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+

结果数据是不变的,但是我们可以看下日志。打开里面的logview,可以看到:
screenshot
里面的日志,2个Map里的日志分别是:

Heap Size: 1024M
input in iterate:a  dict:{}
output in iterate:{a=1}
input in iterate:c  dict:{a=1}
output in iterate:{a=1, c=1}
input in iterate:c  dict:{a=1, c=1}
output in iterate:{a=1, c=2}

Heap Size: 1024M
input in iterate:a  dict:{}
output in iterate:{a=1}
input in iterate:a  dict:{a=1}
output in iterate:{a=2}
input in iterate:b  dict:{a=2}
output in iterate:{a=2, b=1}

看到都是对的,然后看下Reduce里的结果:

Heap Size: 1024M
buffer in merge:{}
partial in merge:{a=1, c=2}
output in merge:{a=1, c=2}
buffer in merge:{a=1, c=2}
partial in merge:{a=2, b=1, c=2}
output in merge:{a=3, b=1, c=4}
output in terminate:{a=3, b=1, c=4}
a:3,b:1,c:4

看一下,partial in merge:{a=2, b=1, c=2} 这条数据不符合预期。照道理说,我们前面输出的是output in iterate:{a=2, b=1},怎么到这里就变成了{a=2, b=1, c=2}了呢。

这种的变化,是在多个worker之间进行传递的时候,我们做了序列号和反序列化,于是我们在这里又打了一些日志:

        @Override
        public void write(DataOutput out) throws IOException {
            String dictStr = dict.toString();
            out.writeUTF(dictStr);
            System.err.println("dict in write:" + dictStr);                 //打印序列化输出
        }

        /* * 做了个简单的反序列化 * */
        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            System.err.println("dictStr in readFields:" + dictStr);                 //打印反序列化输出
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
            System.err.println("dict in readFields:" + dict);                   //打印反序列化输出
        }

重新打包跑一次,这次看到的日志是这样:

--map阶段:
dict in write:{a=1, c=2}
dict in write:{a=2, b=1}

--reduce阶段:
dictStr in readFields:{a=1, c=2}
dict in readFields:{a=1, c=2}
dictStr in readFields:{a=2, b=1}
dict in readFields:{a=2, b=1, c=2}

果然反序列化的时候输出的结果就有问题了。但是从这里还没有明确的证据说明是哪行代码出的问题。看到dict输出的结果不符合预期,我们先看看输入的时候是什么。于是再加一行日志:

        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            System.err.println("dictStr in readFields:" + dictStr);                 //打印反序列化输出
            System.err.println("dict in readFields before put:" + dict);                    //打印反序列化输出
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
            System.err.println("dict in readFields:" + dict);                   //打印反序列化输出
        }

看到这会的reduce阶段日志

dictStr in readFields:{a=1, c=2}
dict in readFields before put:{}
dict in readFields:{a=1, c=2}

dictStr in readFields:{a=2, b=1}
dict in readFields before put:{a=1, c=2}
dict in readFields:{a=2, b=1, c=2}

这下真相大白了。我们第二次调用readFields序列化{a=2, b=1}这个字符串的时候,发现本来应该为空的dict的内容竟然是上次计算后的结果。实际上,在readFields里,相同worker里的SumBuffer被复用了。这种情况下,为了保证计算的准确性,我们可以自己清空一下dict的内容

        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            dict = new HashMap<>();
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
        }
    }

这下终于对了

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:2 |
+-----+

总结

代码还有其他更多可以优化的地方。不过这次为了能简单说明调试的过程,简化代码逻辑,就没在这方面再多下功夫。实际的业务代码里还需要考虑到性能和异常捕捉等问题。

System.err.println这个办法虽然很笨,但是很有效,不是吗?

本文转载自:https://yq.aliyun.com/articles/68489

_夜枫
粉丝 10
博文 506
码字总数 0
作品 0
朝阳
后端工程师
私信 提问
Hive 随谈(六)– Hive 的扩展特性

Hive 是一个很开放的系统,很多内容都支持用户定制,包括: 文件格式:Text File,Sequence File 内存中的数据格式: Java Integer/String, Hadoop IntWritable/Text 用户提供的 map/reduce...

红薯
2010/04/21
3.1K
1
Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)

在Spark中,也支持Hive中的自定义函数。自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似tochar,todate等 UDAF(User- Defined Aggregation Funcation...

青夜之衫
2017/12/04
0
0
自己动手写Impala UDF

概述 出于对可扩展性和性能的考虑,UDF已变成大数据生态圈查询引擎的必备功能之一,无论是Calcite、Hive、Impala都对其进行支持,但是UDF的支持有利也有弊,好处在于它提供了对某些用户独有需...

Zero零_度
2016/10/12
172
0
14.Spark SQL:UDAF自定义聚合函数实战

UDAF自定义函数实战 UDAF:User Defined Aggregate Function。用户自定义聚合函数。是Spark 1.5.x引入的最新特性。 UDF,其实更多的是针对单行输入,返回一个输出 这里的UDAF,则可以针对多行...

weixin_32265569
2017/11/16
0
0
Apache Hivemall 0.5.2 发布,可扩展的机器学习库

Apache Hivemall 0.5.2 发布了,Apache Hivemall 基于 Hive UDF/UDAF/UDTF,是一个可扩展的机器学习库,运行基于 Hadoop 的数据处理框架,特别是 Apache Hive、Apache Spark 和 Apache Pig。...

h4cd
2018/12/11
648
0

没有更多内容

加载失败,请刷新页面

加载更多

centos7 linuxdeployqt qt5.13.1 打包程序

原文链接:https://www.cnblogs.com/linuxAndMcu/p/11016322.html 一、简介 linuxdeployqt 是Linux下的qt打包工具,可以将应用程序使用的资源(如库,图形和插件)复制到二进制运行文件所在的...

shzwork
16分钟前
3
0
IDEA 配置Springboot项目热部署

实现的方式概述 注意以下的热部署方式在IDEA是默认没有打开自动编译的,手动编译需要快捷键(Ctrl+Shift+F9),自动编译的修改配置如下:(注意刷新不要太快,会有1-2秒延迟) File-Settings-C...

小强的进阶之路
27分钟前
5
0
免费数据分析工具:secsoso

前段时间思考了理想数据分析平台,之后我们根据这个思路开发了spl语言并提供了一个数据分析平台,这个平台主要用在搜索ES,数据库索引中的数据。但后来发现对文件的事后处理也是个非常重要的...

赛克蓝德
29分钟前
4
0
暗黑2不能正常启动?带你轻松使用WIN10运行游戏

暗黑破坏神2这款游戏由于年代比较久远,所以设置启动这方面与现在的大部分游戏有很大差距,由于当初完美运行暗黑2是当年使用最多的XP系统,在使用现在大多数玩家使用的WIN7到WIN10系统常会出...

太空堡垒185
33分钟前
6
0
maven项目对象模型(二)

1.4.4.传递性依赖 一个传递性依赖就是一个依赖的依赖。如果project-a依赖于project-b,而后者接着依赖于project-c,那么project-c就是被认为是project-a的传递性依赖。如果project-c依赖于p...

万建宁
33分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部