文档章节

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

_夜枫
 _夜枫
发布于 2017/03/21 11:38
字数 1355
阅读 37
收藏 0

Logstash是一款开源日志收集处理框架,有各种不同的input、filter、output插件,用户使用这些插件可以将各种数据源导入到其他系统。
logstash-output-datahub插件,实现将数据导入DataHub的功能,通过简单的配置即可完成数据采集和向DataHub的传输任务。
结合StreamCompute(Galaxy)用户可以方便的完成流式数据从采集,传输,开发到结果数据存储与展示的整套解决方案。
同时,还可以通过创建Collector同步任务将数据同步到MaxCompute(ODPS),之后在MaxCompute上进行完备的数据开发工作。

接下来,会将各个流程步骤在文章中作详细描述,以帮助用户使用Logstash+DataHub+StreamCompute/MaxCompute快速构建起自己的流式数据应用。

数据通道

DataHub服务是阿里云的基于飞天开发的pubsub服务;
创建用于数据采集与传输的DataHub Topic是我们的第一步。

Endpoint列表

公共云DataHub服务Endpoint列表:

公有网络 经典网络ECS Endpoint VPC ECS Endpoint
http://dh-cn-hangzhou.aliyuncs.com http://dh-cn-hangzhou.aliyun-inc.com http://dh-cn-hangzhou-vpc.aliyuncs.com

基本概念

首先,明确DataHub中的几个概念,具体可参见DataHub基本概念

  • Shard: Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态 : "Opening" - 启动中,"Active" - 启动完成可服务
  • Lifecycle: 表示一个Topic中写入数据可以保存的时间,以天为单位
  • Record: 用户数据和DataHub服务端交互的基本单位
  • Schema: 描述Record必须遵守的格式,以及每个字段的类型,包括:bigint、string、boolean、double和timestamp

创建Topic

目前DataHub提供的工具包括Datahub Java SDK和DataHub webconsole,另外console还处于试用阶段,若有需要可联系我们提供。

  • Webconsole

用户可在webconsole上完成对所属资源的基本操作,包括创建、查看、删除Topic以及数据抽样等。在webconsole中创建Topic如下所示:

创建Topic

  • SDK

依次调用以下接口来完成Project和Topic的创建,SDK的一些基本接口可参考SDK基本说明

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.3.0-public</version>
</dependency>

public class DatahubClient {
    /** * 初始化DatahubClient, * @param conf Datahub的配置信息,包括用户的账号信息和datahub endpoint */
    public DatahubClient(DatahubConfiguration conf);

    /** * 创建Datahub topic * @param projectName 该topic所属的project * @param topicName 要创建的topic名字 * @param shardCount 指定该topic的shard数量 * @param lifeCycle 数据回收时间 * @param recordType 该topic的record类型,包括TUPLE和BLOB * @param recordSchema 当recordType为TUPLE时,需要指定schema * @param desc topic的描述信息 */
    public createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String desc);
}


数据采集

由于DataHub提供创建具有schema的Topic的功能,所以用户在使用logstash将数据采集到datahub时,可同时完成对原始数据清洗工作。这样在后续的数据分析工作中,用户能更加方便的进行数据开发。

安装

$ {LOG_STASH_HOME}/bin/plugin install --local logstash-output-datahub-1.0.0.gem
  • 直接下载免安装版logstash(下载地址)。 解压即可使用。
$ tar -xzvf logstash-with-datahub-2.3.0.tar.gz
$ cd logstash-with-datahub-2.3.0

配置信息

我们以一条典型的日志为例,说明如何配置logstash和datahub topic.

示例日志为:

20:04:30.359 [qtp1453606810-20] INFO  AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

对应的Datahub Topic的schema定义为:

字段名称 字段类型
request_time STRING
thread_id STRING
log_level STRING
class_name STRING
request_id STRING
detail STRING

Logstash配置文件为:

input {
    file {
        path => "${APP_HOME}/log/bayes.log"
        start_position => "beginning"
    }
}

filter{
    # 对每一条日志message进行分割,并将各分片指定对应的tag
    # 若将整条日志作为Topic的一个字段,可创建只包含(message string)字段的Topic,从而不用配置grok filter
    grok {
        match => {
           "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-(?<detail>.+)"
        }
    }
}

output {
    datahub {
        access_id => ""
        access_key => ""
        endpoint => ""
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/u1/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}

启动logstash数据采集

使用命令启动logstash开始数据采集

logstash -f 上述配置文件地址

可使用参数 -b 指定每次batch大小,即每次请求的记录条数,可进行性能调试

# 缓存1000条数据后发送,不指定时默认为125(logstash的默认配置)
logstash -f 上述配置文件地址 -b 1000

数据分析

目前DataHub和计算引擎StreamCompute(Galaxy)和MaxCompute(ODPS)已打通。

在StreamCompute中,可以通过配置DataHub数据源,直接进行数据开发,写入DataHub的数据会被StreamCompute订阅并进行实时计算。

同时,通过创建同步到MaxCompute的Collector,可以将DataHub数据同步到MaxCompute,从而在MaxCompute中进行数据开发。

StreamCompute

在StreamCompute中注册DataHub数据源(帮助文档)

在StreamCompute中查看或使用DataHub数据(帮助文档)

MaxCompute

可以通过创建Connector,将DataHub数据导入到MaxCompute(ODPS).
在Webconsole创建Connector是一件方便的事情,(webconsole地址)。如果有很多topic或者topic的field很多,不方便在页面上手动操作,也可以使用SDK。

创建Connector

创建Connector之前,用户必须已创建好MaxCompute的Table,并且所使用的账号必须具备该MaxCompute Project的CreateInstance权限和归档ODPS表的Desc、Alter、Update权限。
在webconsole创建Connector步骤可参考创建Connector.

欢迎加入MaxCompute钉钉群讨论
42559c7dde62e4d333c90e02efdf416257a4be27_jpeg

本文转载自:https://yq.aliyun.com/articles/61766

_夜枫
粉丝 10
博文 506
码字总数 0
作品 0
朝阳
后端工程师
私信 提问
数据进入Maxcompute的N种方式,大数据实战Demo系统数据上云实践

2018 “MaxCompute开发者交流”钉钉群直播分享,由阿里云数据技术专家彬甫带来以“数据进入MaxCompute的N种方式”为题的演讲。本文讲述了在阿里云内部开发了一个实战Demo系统,它能够实现自动...

云迹九州
2018/07/27
0
0
【干货索引】阿里云大数据计算服务MaxCompute与生态系统的融合

摘要: MaxCompute大家都不陌生,之前产品名称叫ODPS,之后随国际化而更名。从支持阿里集团内部99%数据业务到计算能力对外输出,帮助政府、互联网公司、金融等进行大数据项目服务,使得数据变...

阿里云云栖社区
2018/01/12
25
0
阿里云大数据利器之-使用sql实现流计算做实时展现业务( flume故障转移版 )

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个...

上单
2017/07/25
0
0
【大数据】odps数据迁移方式

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/binggetong/article/details/82762289 1、按迁移工具分类 一共分为2种:Tunnel批量数据通道、DataHub实时通道...

Elsa晓冰
2018/09/18
0
0
玩转大数据系列之一:数据采集与同步

数据的采集和同步,是先将数据从设备、或者本地数据源采集、同步到阿里云上,然后在阿里云上对数据进行分析和处理,最终完成您的业务要求。本文向您介绍阿里云各产品的数据采集和同步的操作实...

阿里云云栖社区
01/07
162
0

没有更多内容

加载失败,请刷新页面

加载更多

Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部