
Storm + HBase Real-Time Ad Statistics

飓风2000
Published 06/24 19:27 · 1,871 words

This article describes a real-time ad statistics system built with Kafka + Storm + HBase. The frontend is built with SpringBoot + Vue + ElementUI + ECharts.

Contents:

  • 1. Requirements
  • 2. Log format
  • 3. HBase table design
  • 4. Writing the Storm topology
  • 5. Kafka
  • 6. Querying HBase data
  • 7. References

1. Requirements

  • 1. Current impressions of a given ad in a given province
  • 2. Current impressions of a given ad in a given city
  • 3. Current impressions of a given ad on a given user's client
  • 4. Historical impression trend of a given ad in a given province over a cumulative period
  • 5. Historical impression trend of a given ad in a given city over a cumulative period
  • 6. Historical impression trend of a given ad on a given client over a cumulative period
  • 7. Current clicks of a given ad
  • 8. Click trend of a given ad over a cumulative period

2. Log format

2014-01-13\t19:11:55\t{"adid":"31789","uid":"9871","action":"view"}\t63.237.239.3\t北京\t北京

Date: 2014-01-13
Time: 19:11:55
JSON (kept as JSON for easy extension):
  adid: ad ID
  uid: user ID
  action: user action, click or view
IP: 63.237.239.3
Province: 北京
City: 北京
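The line layout above can be sketched with a minimal parser in plain Java (a sketch; the class and method names here are illustrative, and the JSON field is left unparsed since the real pipeline uses Gson for that step):

```java
public class LogLineDemo {
    // Split one tab-separated log line into its six fields:
    // date, time, JSON payload, IP, province, city.
    static String[] parse(String line) {
        return line.split("\t", -1);
    }

    public static void main(String[] args) {
        String line = "2014-01-13\t19:11:55\t{\"adid\":\"31789\",\"uid\":\"9871\",\"action\":\"view\"}\t63.237.239.3\t北京\t北京";
        String[] f = parse(line);
        System.out.println("date=" + f[0] + " time=" + f[1] + " province=" + f[4] + " city=" + f[5]);
    }
}
```

The `-1` limit to `split` keeps trailing empty fields, so a malformed line still yields a predictable field count to validate against.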

3. HBase table design

Table name: realtime_ad_stat
Row key patterns: ADID_Province_20181212, ADID_City_20181212, ADID_UID_20181212
Column family: stat
Columns: view_cnt, click_cnt
# Create the table
create 'realtime_ad_stat',{NAME => 'stat',VERSIONS => 2147483647}

# List tables
list

# Truncate (clear all data)
truncate 'realtime_ad_stat'

# Drop the table
disable 'realtime_ad_stat'
drop 'realtime_ad_stat'
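All three row-key patterns share one shape: ad ID, a dimension value (province, city, or uid), and a `yyyyMMdd` date. A small helper makes the convention explicit (a sketch; `buildRowKey` is not part of the original code):

```java
public class RowKeyDemo {
    // Row key convention: ADID_<dimension>_<yyyyMMdd>
    static String buildRowKey(String adid, String dimension, String date) {
        return adid + "_" + dimension + "_" + date.replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println(buildRowKey("31789", "北京", "2014-01-13")); // 31789_北京_20140113
    }
}
```

Because the date is the row-key suffix, a day's counters for one ad and dimension land in a single row, and a date-range scan over one prefix answers the trend queries in the requirements.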

4. Writing the Storm topology

4.1.AdTopology

public class AdTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        KafkaSpoutConfig<String, String> kafkaSpoutConfig =
                KafkaSpoutConfig.builder("hadoop1:9092,hadoop2:9092,hadoop3:9092", "AD")
                        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "STORM_AD_GROUP")
                        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                        .build();
        topologyBuilder.setSpout("KafkaSpout", new KafkaSpout(kafkaSpoutConfig), 2);
        topologyBuilder.setBolt("me.jinkun.ad.storm.LogToModelBolt", new LogToModelBolt(), 2).localOrShuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("me.jinkun.ad.storm.ToHbaseBolt", new ToHbaseBolt(), 4).localOrShuffleGrouping("me.jinkun.ad.storm.LogToModelBolt");

        StormTopology topology = topologyBuilder.createTopology();
        Config config = new Config();
        config.setDebug(false);

        if (args != null && args.length > 0) {
            // cluster mode
            config.setNumWorkers(4);
            StormSubmitter.submitTopology(args[0], config, topology);
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("AdTopology", config, topology);
        }
    }
}

Reads the latest log messages from the Kafka topic AD and emits them to LogToModelBolt.

4.2.LogToModelBolt

public class LogToModelBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(LogToModelBolt.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        // 2014-01-13   19:11:55    {"adid":"31789","uid":"9871","action":"view"}    63.237.239.3    北京 北京
        String line = input.getStringByField("value");
        if (LOG.isInfoEnabled()) {
            LOG.info("line:[{}]", line);
        }
        String[] arr = line.split("\t", -1);
        if (arr.length == 6) {
            String date = arr[0].trim().replace("-", "");
            String time = arr[1].trim();
            String json = arr[2].trim();
            String ip = arr[3].trim();
            String province = arr[4].trim();
            String city = arr[5].trim();

            if (StringUtils.isNotEmpty(json)) {
                Ad ad = new Gson().fromJson(json, Ad.class);
                if (null != ad && StringUtils.isNotEmpty(ad.getAdid())) {
                    // province
                    if (StringUtils.isNotEmpty(province)) {
                        String rowkey = ad.getAdid() + "_" + province + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }

                    // city
                    if (StringUtils.isNotEmpty(city)) {
                        String rowkey = ad.getAdid() + "_" + city + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }

                    // client (per-user)
                    if (StringUtils.isNotEmpty(ad.getUid())) {
                        String rowkey = ad.getAdid() + "_" + ad.getUid() + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }
                }
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("action", "rowkey", "cnt"));
    }
}

Parses each log line into a model and emits it to ToHbaseBolt.

4.3.ToHbaseBolt

public class ToHbaseBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(ToHbaseBolt.class);

    private Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
            Connection conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf("realtime_ad_stat"));
        } catch (IOException e) {
            LOG.error("failed to create HBase connection", e);
        }
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String action = input.getStringByField("action");
        String rowkey = input.getStringByField("rowkey");
        Long pv = input.getLongByField("cnt");

        try {
            if ("view".equals(action)) {
                table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"), pv);
            }
            if ("click".equals(action)) {
                table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"), pv);
            }
        } catch (IOException e) {
            LOG.error("failed to increment counter for rowkey [{}]", rowkey, e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

ToHbaseBolt writes the processed counts into the HBase table.
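`incrementColumnValue` is an atomic, server-side increment, so the four parallel ToHbaseBolt tasks can update the same counter cell without losing updates. Conceptually the counter behaves like an atomic read-modify-write on a map (a sketch using `ConcurrentHashMap` in place of HBase; names are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;

public class CounterDemo {
    // Stand-in for the HBase table: key = rowkey + column, value = counter.
    static final ConcurrentHashMap<String, Long> table = new ConcurrentHashMap<>();

    // Atomic read-modify-write, analogous to Table.incrementColumnValue.
    static long increment(String key, long amount) {
        return table.merge(key, amount, Long::sum);
    }

    public static void main(String[] args) {
        increment("1_北京_20180810/stat:view_cnt", 1L);
        increment("1_北京_20180810/stat:view_cnt", 1L);
        System.out.println(table.get("1_北京_20180810/stat:view_cnt")); // 2
    }
}
```

Doing the same with a get-then-put against HBase would race between bolt tasks; the server-side increment is what makes the counts correct under parallelism.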

5. Kafka

5.1.Create a topic named AD

# Describe topics
kafka-topics.sh --describe \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

# Create topic AD
kafka-topics.sh --create \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD \
--partitions 3 \
--replication-factor 3

# Console consumer for AD
kafka-console-consumer.sh \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD \
--from-beginning

# Delete topic AD
kafka-topics.sh --delete \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD

5.2.Simulating messages

public class ProducerClient {

    private static final Logger LOG = LoggerFactory.getLogger(ProducerClient.class);
    private static final String[] PROVINCES_CITIES = new String[]{
            "山东\t济南",
            "河北\t石家庄",
            "吉林\t长春",
            "黑龙江\t哈尔滨",
            "辽宁\t沈阳",
            "内蒙古\t呼和浩特",
            "新疆\t乌鲁木齐",
            "甘肃\t兰州",
            "宁夏\t银川",
            "山西\t太原",
            "陕西\t西安",
            "河南\t郑州",
            "安徽\t合肥",
            "江苏\t南京",
            "浙江\t杭州",
            "福建\t福州",
            "广东\t广州",
            "江西\t南昌",
            "海南\t海口",
            "广西\t南宁",
            "贵州\t贵阳",
            "湖南\t长沙",
            "湖北\t武汉",
            "四川\t成都",
            "云南\t昆明",
            "西藏\t拉萨",
            "青海\t西宁",
            "天津\t天津",
            "上海\t上海",
            "重庆\t重庆",
            "北京\t北京",
            "台湾\t台北",
            "香港\t香港",
            "澳门\t澳门"
    };
    private static final String[] ACTIONS = new String[]{
            "view", "click"
    };
    private static final String[] ADIDS = new String[]{
            "1", "2", "3", "4", "5"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        boolean flag = true;
        if (flag) {
            for (int i = 0; i < 2000; i++) {
                // 3. build and send one log line, e.g.:
                // 2014-01-13   19:11:55    {"adid":"31789","uid":"9871"}    63.237.239.3    北京 北京
                StringBuilder sb = new StringBuilder();
                //sb.append(new SimpleDateFormat("yyyy-MM-dd").format(date));
                sb.append("2018-08-10");
                sb.append("\t");
                sb.append("12:00:00");
                sb.append("\t");
                sb.append("{\"adid\":\"" + ADIDS[new Random().nextInt(ADIDS.length)] + "\",\"uid\":\"" + new Random().nextInt(200) + "\",\"action\":\"" + ACTIONS[new Random().nextInt(ACTIONS.length)] + "\"}");
                sb.append("\t");
                sb.append(new Random().nextInt(255) + "." + new Random().nextInt(255) + "." + new Random().nextInt(255) + "." + new Random().nextInt(255));
                sb.append("\t");
                sb.append(PROVINCES_CITIES[new Random().nextInt(PROVINCES_CITIES.length)]);
                kafkaProducer.send(new ProducerRecord("AD", sb.toString()));
            }
            Thread.sleep(1000);
            kafkaProducer.flush();

            if (LOG.isInfoEnabled()) {
                LOG.info("{}", "done sending messages");
            }
        }

        kafkaProducer.close();
    }
}
 
 
(Screenshot of sample logs omitted.)

6. Querying HBase data

public Map<String, Object> get(Table table, String adid, String date, String province) {
  try {
    if (StringUtils.isNotEmpty(date)) {
      date = date.replace("-", "");
    }

    Map<String, Object> map = Maps.newHashMapWithExpectedSize(5);
    map.put("adid", adid);
    map.put("date", date);
    map.put("province", province);

    // adid_province_date or adid_city_date
    String rowKey = adid + "_" + province + "_" + date;

    Get get = new Get(Bytes.toBytes(rowKey));
    Result result = table.get(get);

    // read stat:view_cnt
    long viewCnt = 0L;
    byte[] viewBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"));
    if (viewBytes != null) {
      viewCnt = Bytes.toLong(viewBytes);
    }
    map.put("view", viewCnt);

    // read stat:click_cnt
    long clickCnt = 0L;
    byte[] clickBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"));
    if (clickBytes != null) {
      clickCnt = Bytes.toLong(clickBytes);
    }
    map.put("click", clickCnt);
    return map;
  } catch (IOException e) {
    e.printStackTrace();
    throw new ServiceException("failed to query stats");
  }
}

The HBase client reads rows from the realtime_ad_stat table, wraps them into Map objects, and returns them as JSON for the frontend to render.
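HBase counter cells written by `incrementColumnValue` are stored as 8-byte big-endian longs, which is what `Bytes.toLong` decodes in the query code above. A stdlib-only equivalent of that round trip (a sketch; the class name is illustrative, and the big-endian assumption matches the HBase `Bytes` utility):

```java
import java.nio.ByteBuffer;

public class CounterBytesDemo {
    // Equivalent of org.apache.hadoop.hbase.util.Bytes.toLong for counter cells.
    static long toLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong(); // ByteBuffer is big-endian by default
    }

    // Equivalent of Bytes.toBytes(long): 8 bytes, big-endian.
    static byte[] toBytes(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    public static void main(String[] args) {
        byte[] cell = toBytes(42L);
        System.out.println(toLong(cell)); // 42
    }
}
```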

{
    "data":[
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"山东",
            "click":4
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"河北",
            "click":8
        },
        {
            "date":"20180810",
            "view":2,
            "adid":"1",
            "province":"吉林",
            "click":4
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"黑龙江",
            "click":2
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"辽宁",
            "click":7
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"内蒙古",
            "click":5
        },
        {
            "date":"20180810",
            "view":10,
            "adid":"1",
            "province":"新疆",
            "click":6
        },
        {
            "date":"20180810",
            "view":12,
            "adid":"1",
            "province":"甘肃",
            "click":5
        },
        {
            "date":"20180810",
            "view":11,
            "adid":"1",
            "province":"宁夏",
            "click":5
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"山西",
            "click":5
        },
        {
            "date":"20180810",
            "view":7,
            "adid":"1",
            "province":"陕西",
            "click":5
        },
        {
            "date":"20180810",
            "view":3,
            "adid":"1",
            "province":"河南",
            "click":6
        },
        {
            "date":"20180810",
            "view":1,
            "adid":"1",
            "province":"安徽",
            "click":8
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"江苏",
            "click":10
        },
        {
            "date":"20180810",
            "view":12,
            "adid":"1",
            "province":"浙江",
            "click":5
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"福建",
            "click":2
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"广东",
            "click":13
        },
        {
            "date":"20180810",
            "view":8,
            "adid":"1",
            "province":"江西",
            "click":6
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"海南",
            "click":1
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"广西",
            "click":7
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"贵州",
            "click":11
        },
        {
            "date":"20180810",
            "view":8,
            "adid":"1",
            "province":"湖南",
            "click":8
        },
        {
            "date":"20180810",
            "view":9,
            "adid":"1",
            "province":"湖北",
            "click":4
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"四川",
            "click":8
        },
        {
            "date":"20180810",
            "view":2,
            "adid":"1",
            "province":"云南",
            "click":7
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"西藏",
            "click":4
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"青海",
            "click":3
        },
        {
            "date":"20180810",
            "view":16,
            "adid":"1",
            "province":"天津",
            "click":4
        },
        {
            "date":"20180810",
            "view":12,
            "adid":"1",
            "province":"上海",
            "click":12
        },
        {
            "date":"20180810",
            "view":10,
            "adid":"1",
            "province":"重庆",
            "click":16
        },
        {
            "date":"20180810",
            "view":10,
            "adid":"1",
            "province":"北京",
            "click":14
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"台湾",
            "click":4
        },
        {
            "date":"20180810",
            "view":18,
            "adid":"1",
            "province":"香港",
            "click":10
        },
        {
            "date":"20180810",
            "view":8,
            "adid":"1",
            "province":"澳门",
            "click":12
        }
    ],
    "message":"操作成功!",
    "resultCode":"00000"
}

7. References

ECharts
《HBase企业应用开发实战》, Chapter 8
Hadoop cluster setup (three nodes)
ZooKeeper cluster installation
Storm WordCount
HBase environment setup
Kafka cluster installation

Reprinted from: https://www.jianshu.com/p/161c90229809
