文档章节

SPARK全栈 全流程 大数据实战 之 在线异常检测(一)

hblt-j
 hblt-j
发布于 2017/01/02 19:12
字数 1977
阅读 135
收藏 3

###在线异常检测模块设计

好这里开始 ,不用在管所有关于环境的事了,现在可以直面业务主题,直奔逻辑实现,这里以一个千万数据在线实时异常检测的例子,如广告恶意点击排名过滤,网络异常流量安全检测,或电商或其它在线应用中异常行为检测或是黑名单实时坐标跟踪等,来全面展示spark做对接各种数据源,到etl,数据预处理,数据建模,数据集市输出,ods/olap多维展示,或用mlib常用聚分类逻辑回归实现简单模型训练预测分析过程

为了更好展示这个完整的过程,我们做一个特殊设计,分为如下几大块:

  • 第一块、离线处理,整合不同来源或多系统的人员登记信息(etl,预处理,DW,DM)进行分类统计和预测(模型),并展示不同信用分级人在员不同区域的分布情况(OLAP),并建立黑名单共享中 心(ODS),为其它系统提供依据和支持

  • 第二块、实时处理,据黑名单在实时监控系统,或工共出行或地图等应用中,实时监控他们 的坐标和行踪,实现大屏实时动态跟踪展现

  • 第三块、简单演示以智能决策中心为主的BI应用或BI生态链系统的思路,如 对应用 中无记录和信用人员的实时预测并对异常行为进行预警或提示,并对其它辅助系统发送消息,行成联合智能协同 的商业智能系统和数据智能生态

####模块一:离线分类统计不同信 用人员的分布情况

  • 信息采集整合预处理,如下为常用总结:

                          数据来源:                                                   采集方法:                       最终存储介质和对接方法:
         业务数据:主要是以mysql等关系数据库为主                     sqoop                             hbase,hive,hdfs
         日志:主要是各种服务器系统收集来的Log                         fulme/fluent                  接口,hbase,hdfs,文本
         消息:有些理点数据或是智能硬件或采集器等                    kafka                              接口
         爬虫:其它网站论坛app等                             spider,screpy,还有java的hiretrix等    接口,文本,nosql                             
         其它文件:人工收集整理                                                   略                                    excel,文本,本地数据库等
    
      总结:
                到数据处理端无非三种形式:                                                    spark处理方法及函数:
     1>高层接口直接对接(KAFKA,restful API)                              kafkaUtils;flumeUtils;java web apI 
     2>本地和远程(分步式),结构或非数据库                                 sqlconten;jdbc/jdis/mogo等
     3>分步式文件,日志和其它待清理的文本数据                              textFile(file:///...|hdfs://...|http://...);jsonFile(...);reade.json(); read.format()
    

    这里这些辅助采集就不实际安装部署演示了,网上例子也很多,按照样例设计,先2数据库系统和3文本系统模拟数据,演示离线模块各种处理,这里先看整合和预处理

####user数据准备

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
----------------------------------------------------------------------------------------------------
>sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1e15a466
>import sqlContext.implicits._
//模拟业务系统user表:uid,cardid,name,age,sex,province,city,district  结构性的
case class User(uid:Long,cardid:String,name: String,age:Long,sex:Long)//,Province:String,city:String,district:String) extends java.io.Serializable

----------------------------------------------------------------------------------------------------
>defined class User
val data = Seq.fill(50) {
  val uid=scala.util.Random.nextInt(100)
  val cardid="140581193210922000"+scala.util.Random.nextInt(100)
  val name="p"+scala.util.Random.nextInt(10) 
  val age=scala.util.Random.nextInt(10) 
  val sex=scala.util.Random.nextInt(2)
  val province=scala.util.Random.nextInt(5)
  val city=scala.util.Random.nextInt(10)
  val district=scala.util.Random.nextInt(50)
  User(uid,cardid,name,age,sex)//,province,city,district)
}
val user = sparkContext.parallelize(data).toDF()
user.registerTempTable("user")
user.show()

----------------------------------------------------------------------------------------------------

+---+--------------------+----+---+---+
|uid|              cardid|name|age|sex|
+---+--------------------+----+---+---+
| 59|14058119321092200025|  p0|  3|  1|
| 89|14058119321092200083|  p2|  8|  1|
| 13| 1405811932109220004|  p9|  7|  0|
| 82|14058119321092200043|  p8|  5|  1|
| 74|14058119321092200075|  p0|  4|  1|
| 97|14058119321092200029|  p5|  0|  0|
| 41|14058119321092200080|  p6|  3|  1|
| 20|14058119321092200050|  p4|  9|  1|
| 32|14058119321092200074|  p9|  3|  0|
| 89|14058119321092200066|  p5|  5|  0|
| 50|14058119321092200019|  p8|  3|  0|
| 28|14058119321092200085|  p6|  3|  1|
| 73|14058119321092200074|  p3|  8|  0|
| 98| 1405811932109220008|  p8|  3|  1|
|  1|14058119321092200045|  p1|  7|  1|
|  7|14058119321092200028|  p4|  2|  0|
| 90|14058119321092200099|  p0|  7|  0|
| 68|14058119321092200052|  p6|  8|  1|
| 42|14058119321092200098|  p0|  7|  0|
| 40|14058119321092200027|  p3|  1|  0|
+---+--------------------+----+---+---+
only showing top 20 rows

data: Seq[User1] = List(User1(59,14058119321092200025,p0,3,1), User1(89,14058119321092200083,p2,8,1), User1(13,1405811932109220004,p9,7,0), User1(82,14058119321092200043,p8,5,1), User1(74,14058119321092200075,p0,4,1), User1(97,14058119321092200029,p5,0,0), User1(41,14058119321092200080,p6,3,1), User1(20,14058119321092200050,p4,9,1), User1(32,14058119321092200074,p9,3,0), User1(89,14058119321092200066,p5,5,0), User1(50,14058119321092200019,p8,3,0), User1(28,14058119321092200085,p6,3,1), User1(73,14058119321092200074,p3,8,0), User1(98,1405811932109220008,p8,3,1), User1(1,14058119321092200045,p1,7,1), User1(7,14058119321092200028,p4,2,0), User1(90,14058119321092200099,p0,7,0), User1(68,14058119321092200052,p6,8,1), User1(42,14058119321092200098,p0,7,0), User1(40,14058119321092200027,p3,1,0...
display(user, maxPoints=50)

-----------------------------------------------------------------------------------
uid	cardid	name	age	sex
59	14058119321092200025	p0	3	1
89	14058119321092200083	p2	8	1
13	1405811932109220004	p9	7	0
82	14058119321092200043	p8	5	1
74	14058119321092200075	p0	4	1
97	14058119321092200029	p5	0	0
41	14058119321092200080	p6	3	1
20	14058119321092200050	p4	9	1
32	14058119321092200074	p9	3	0
89	14058119321092200066	p5	5	0
```
```
BarChart(user, Some(("name", "age")), maxPoints=40)

---------------------------------------------------------------------------
res20: notebook.front.widgets.charts.BarChart[org.apache.spark.sql.DataFrame] = <BarChart widget>

40 or more entries total

(Warning: showing only first 40 rows)

```

####**模拟三方证信接口对接**
以下均为模拟未经测试
```scala
//模拟三方证信给的restful api:{cardid::scores::...} 半结构性的
val path="http://www.xxx.com/api/"
val cardid="140581193210922111"
sparkContext.jsonFile(path+cardid).createOrReplaceTempView("scores")

```

####**模拟黑名单不规整文本并预处理**

```scala
//模拟有关部门给的部分黑名单不规整文本报告信息:记录有名字,严重程度和相关不良记录  非结构性的
case class blackManTop(cardid:String,scores:Int,levels:Int)
case class blackMan(cardid:String,levels:Int)
val blackMandf = sc.textFile("examples/src/main/resources/blackMan.txt").filter(_.trim!="")//清理空行
.map(_.split(",")).filter(_.size()==3)//过滤不规整行
.filter(_(1).trim.toInt==0).map(_(1)=10)//补缺省值
.map(p => blackMan(p(0), p(1).trim.toInt)).toDF()
blackMandf.registerTempTable("blackMan")

```

####**整合查询信用分低于100,不同恶劣等级最差的黑名单分步信息**

```scala

// SQL statements can be run by using the sql methods provided by sqlContext.
val blackMans = sqlContext.sql("SELECT u.name,u.age,u.sex,b.levels,s.scores FROM user u, scores s,blackMan b WHERE s.scores<= 100 group by b.levels order by b.levels asc,s.scores desc")
blackMans.registerTempTable("blackManTop")
```

```
display(blackManTop.where(".scores<= 100").groupBy("levels").count(), maxPoints=50)

---------------------------------------------------------------------------------------------
levels	count
0	3
1	6
2	3
3	11
4	4
5	5
6	1
7	5
```
####**统计,多维展现(OLAP)**
事实上多维分析,还有个重要的过程,就是数据建模,需要据需要做特征提取,降维,维度表维护,数据字典建立 等特征管理或治理

####**数据仓库DW,数据集市DM**

  咱们这里一个系统中用的一张表,其实dw仓库是将不同系统的多个数据库或其它相关数据源数据,汇集后按一定总体规划划分成一个个仓库中心,放在分步式文件系统中,使之可以从全局中管理所有来源的数据,如业务系统,渠道,上下游厂商,或地区 等,来总体的存放管理 所有数据,方便管理,统计和分析
    其实数仓和集市有很多架构和治理需要做的事,如仓储规划,维度表管理,元数据管理 ,还有相关辅助模块,如数据补录,增量数据等,这需业务和架构综合考量,这里不在例举
    数据集市是其的问一相关业务和部门下的一子集,也可以是个联合的新子集,如我们这里要建立黑名单信息dm和黑名单信用dm,一般是已经处理过的数据,常放在hbase等用于OLAP分析等

####**黑名单共享中心(ODS)**
ods一般是不同系统间差异导致的标识不统一而产生的一个转换中心,便于不同厂商系统之间共享数据,如电商的sku,不同系统的上游供应商ID,或是不同单位箱框袋个的转换,甚至是统一的日期时间和货币格式等,也可以是一个综合信息格式如这里的最差信用最严重程度的黑名单top10,最为其它系统间共用信息

```scala

// SQL statements can be run by using the sql methods provided by sqlContext.
val blackMantops = sqlContext.sql("SELECT u.name,u.age,u.sex,b.levels,s.scores FROM user u, scores s,blackMan b WHERE s.scores<= 100  order by b.levels ,s.scores limit 10")
blackMantops.registerTempTable("blackManTop10")
```

```
display(blackManTop10.where("age>8").groupBy("age").couter(), maxPoints=50)

---------------------------------------------------------------------------------------------
age	count
0	3
1	6
2	3
3	11
4	4
5	5
6	1
7	5
```

© 著作权归作者所有

hblt-j
粉丝 24
博文 218
码字总数 73000
作品 0
海淀
架构师
私信 提问
SequoiaDB x Spark 新主流架构引领企业级应用

6月,汇集当今大数据界精英的Spark Summit 2017盛大召开,Spark作为当今最炙手可热的大数据技术框架,向全世界展示了最新的技术成果、生态体系及未来发展规划。 巨杉作为业内领先的分布式数据...

巨杉数据库
2017/07/03
6
0
OSC 第 65 期高手问答 — Spark 企业级实战

OSCHINA 本期高手问答(3月23日-3月29日)我们请来了 @王家林 (王家林)为大家解答关于 Spark 开发方面的问题。 王 家林,Spark 亚太研究院院长和首席专家,当今云计算领域最火爆的技术Docke...

叶秀兰
2015/03/23
6.4K
22
Spark(二) -- Spark简单介绍

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45648737 spark是什么? spark开源的类Hadoop MapReduce的通用的并行计算框架 ...

jchubby
2015/05/11
0
0
上海大数据实战开发转型—程序员未来的筹码

活动将长期(每周六)举行,报名后我们会第一时间与您联系 活动流程 13:30 签到 14:00 老师分享 16:00 互动交流 16:30 活动结束 (Hadoop) (含项目实战) NoSQL专题(含HBase) Hadoop企业项...

简直是天才
2018/05/15
20
0
上海大数据实战开发转型—程序员未来的筹码

活动将长期(每周六)举行,报名后我们会第一时间与您联系 活动流程 13:30 签到 14:00 老师分享 16:00 互动交流 16:30 活动结束 (Hadoop) (含项目实战) NoSQL专题(含HBase) Hadoop企业项...

简直是天才
2018/05/15
84
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
2.3K
15
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
39
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
40
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
61
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部