指标圈选在数据应用平台的实现

原创
04/20 09:47
阅读数 595

1

业务背景

58营销平台从58主战网站或其他来源收集到的联系过58的企业,可能联系58的服务付费的企业用户,对于这样的企业信息在营销平台中称为商机。一个商机通过一系列过滤补充及数仓计算后入到营销的数仓中。其中商机包含很多对应属性,类似于:商机名称、企业所在行业、企业规模、企业地址、商机最后联系时间、商机近7天/30天/60天/90天联系次数。

平台相关运营依据自定义配置规则,条件组合选出所需商机,对所选出的商机完成所需操作。类似于:商机打标签(商机增加标签标识,销售人员依据标签选出商机做对应跟进)、商机入沉寂(业务系统中数据状态转为沉寂,减少跟进)、商机回流(将沉寂库商机转入公共库使销售人员持续跟进)......等等类似操作。

针对以上需求及场景,开发数据应用平台基于数仓的商机表输出的es索引,配置自定义数据圈选规则,并对圈选出的指定商机创建数据应用任务。数据应用任务根据场景配置调用时间窗口等规则定时调用,处理后的数据应用至各个业务场景。


2

平台架构


数据应用平台整体架构如图所示:

  • 应用层:用户配置任务规则、创建标签等与交互层直接操作

  • 配置层:配置圈选规则时用到的指标字段配置、对应字典配置、圈选所查询的数据集索引配置、任务配置的具体执行动作配置

  • 交互层:标签创建,按设置的在业务系统中展示的位置设置应用位置并创建业务标签;任务创建,创建的标签在打标任务时选择,创建的任务按配置的执行时间段执行;任务调度及任务查看为具体执行任务及监控

  • 执行层:创建任务时配置圈选数据的规则,组装SQL;任务执行时按照配置的规则生成的SQL查询数据并调用 打标/清洗接口;数据对比差集计算为打标时计算前一天结果与当天圈选结果差集

  • 存储层:数据圈选所用数据集使用es/mysql;差集计算使用spark查询hive得到;任务执行队列存储在redis的list中;mysql中持久化相关任务圈选规则及其他配置


3

核心功能与整体流程

3.1 数据圈选


对于这样的条件配置数据圈选,运营及业务方根据es表所输出的字段查出所需数据,并对圈选出的数据做对应操作的任务创建,则涉及到where条件的拼接。

本身单表的多个单指某个字段的条件查询,其实是个以 and/or为头的二叉树,两边是表达式。类似于像商机(企业信息)的这种查询,查询企业名称(customer)包含 装修 或包含 餐饮,并且产品线(productLine)为招聘,并且最后发帖时间(lastTime)为7天以内,则写SQL会表达为==>

(customer like '%装修%' or customer like '%餐饮%') and productLine = 56 and lastTime >= DATE_FORMAT(DATE_SUB(now(),INTERVAL 1 DAY),'%Y-%m-%d')


要表达这样一个关系,和前端商定对应的协议和参数,是用到了json形式中间包含数组属性来做到,同时涉及到不同类型的字段处理,程序中再具体用到策略模式在不同实现类中处理等


3.2 任务调度

对于任务的调度,此处涉及到用户创建的任务需要在一天的不同时间段执行,并且可能在一段时间内每天执行或每周执行。此时便涉及到将任务下发到集群各个节点并行执行,并且多任务在一定时间内很多的情况下,需排队等待机器资源。想到过的解决方案

  • 用公司封装的wjob(xxl-job)组件的分片广播策略完成

  • 把任务当成消息作为消息队列实现

此两种方式的实现会遇到不同程度的问题,xxl-job来分发任务分片广播策略会发到各个节点但不会是同一时间会有时间和资源上的浪费;消息队列则因为这种场景会遇到任务少量并且长时间执行的情况不完全使用消息发送的场景。同时等待过程中会需要观察监控任务,队列中的数据不容易完整查看。

针对此场景,采用redis的list作为队列存放任务,wjob的任务每小时扫描当前时间可放入队列的任务数据,封装放入list。每个节点的机器开启每分钟的定时任务pop队列的第一个并执行。这样放在redis的list中的任务数据,随时使用lrange命令查看,并且控制好命令下标做到分页查看。此方式和开源组件resque的实现方式不谋而合。


3.3 整体功能流程


  • 任务不同频率处理-一次性/每日/每周

    • 采用策略模式处理对应每天job执行扫描到对应任务时判断是否需要放入队列

  • 选择数据源(es、mysql、...)时配置在表中,通过模板方法执行相应步骤

    • 连接数据源-获取配置sql-查询数据-执行

  • 执行调用数据接口过程,不同动作对应参数通过同一接口的map类型 ext额外参数字段控制

    • 下游对应不同业务时,不同主题数据字段不同,通过ext的map参数扩展

  • 执行动作通过表配置,并对应策略模式不同实现类对应不同调用方式


4

难点与迭代

4.1 商机打标-大数据量数据集对比

商机打标操作为当天任务配置的圈选条件查出的商机数据与前一天的做对比操作

  • 对比结果中需新增的做对应操作,需解绑的做解绑操作

  • 少量数据的情况下,100万以下时在内存中对比,总量300万以上预计占用内存接近10g

  • 大数据量时,使用配置的查询条件生成的hive_sql,通过spark任务查询商机hive表与标签商机绑定关系hive表,对比结果存为2张表存入mysql

  • 操作商机量计数与任务关系一张表

  • 商机id与标签对应关系的一张表,需新增及需解绑的数据在此表中

问题解决方案与迭代

  • 数据侧

  • 启动spark环境

  • 获取所有任务信息,遍历所有任务,每个任务提交一个spark sql job

  • 查询任务对应规则,生成对于过滤sql,获得任务圈选商机

  • 任务圈选商机 和 标签绑定商机关联,获得 标签新增商机 和解绑商机 

  • 结果写入对应的表

  • 结果表写入mysql

后端处理

  • 读取数据同学spark任务输出的mysql表或内存中自主计算差集通过任务类别标识等判断

  • 读取spark任务输出的mysql表数据时分页处理

  • 分开读取需新增标签、解绑标签

  • 读取表时使用不同条件读取并处理

  • 分页处理一批后清除内存中相应list,节省内存

  • 生成hive_sql 的where条件时,类似map接口等类型数据相应按sql配置生成时的策略模式实现类中处理

4.2 数据调用接口限流处理

问题分析

  • 圈选商机->商机对应调用下游接口->接口设置商机状态至 沉寂库/公海/商机删除...

  • 调用下游接口时需限流,每分钟只能调用2000次,超出接口调用上限时,scf云平台会限流抛弃请求并报警,导致数据未处理

  • 相对应的,数据应用平台作为上游调用方,对各个场景做主动限流,每分钟只调用对应量数据,超出部门排队到下一分钟调用

功能迭代与问题解决

  • 使用redis+lua脚本作为限流key计数,发送请求前计数并查询

  • 超出计数时排队等待至下一分钟

  • 出现相应问题:

  • 统计每分钟计数,例:一分钟的第50秒开始执行任务,执行2000条到下一分钟

  • 此时到下一分钟后当前分钟计数为0,执行2000条

  • 实际情况则前一分钟50秒至当前分钟10秒见,20秒调用接口4000次,超出限流量

  • 针对此问题使用滑动窗口计数

  • 每10秒一个窗口,每次统计限流量时,统计当前窗口+前5个窗口,合计60秒6个窗口



5

总结

整体功能对数据圈选在业务方自定义配置下创建任务并定时调整数据,中间经过迭代对任务执行调度及数据对比等功能根据问题调整。整体达到预期,后续继续接入新的数据应用业务时持续迭代改进功能。

本文分享自微信公众号 - 58技术(architects_58)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部