文档章节

spark map reduce和spark sql

flyking
 flyking
发布于 2017/03/14 18:40
字数 670
阅读 4
收藏 0

map/reduce example

# -*- coding: UTF-8 -*-

from pyspark import SparkConf, SparkContext, StorageLevel
import json

conf = SparkConf().setAppName('SimpleApp').setMaster('yarn')
sc = SparkContext(conf=conf)

# Report-201702271315-201702271329.gz => 一个json一行的gzip压缩文件
rdd_file = sc.textFile('hdfs://master:9000/user/ubuntu/work/Report-201702271315-201702271329.gz')
# rdd_file = sc.textFile('s3a://my-bucket/tmp/Report-201702271315-201702271329.gz')

rdd_dict_list = rdd_file.map(json.loads)
rdd_dict_list.persist(StorageLevel.MEMORY_AND_DISK)  # MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY


def lambda_map_page(d):
    event_type = d['event_type']
    page_id = d.get('resolve_page_id') or d.get('page_id', '')
    if page_id in ['23', '25', '33', '34', '39']:
        if event_type == 3:
            return page_id, 1
        else:
            return page_id, 0
    else:
        return '', 0


rdd_page_kv = rdd_dict_list.map(lambda_map_page).reduceByKey(lambda a, b: a + b)

print rdd_page_kv.collect()
# 打印出每个page_id的install数
# [('', 0), (u'34', 0), (u'23', 61), (u'39', 3), (u'33', 30)]


# 几个常用的 Transformation
# => map,filter,flatMap,sample,union,distinct,reduceByKey,groupByKey,sortByKey,join
# 几个常用的 Actions
# => reduce,collect,count,first,take,countByKey,saveAsTextFile,saveAsSequenceFile

spark sql

# -*- coding: UTF-8 -*-

from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import Row, SQLContext
from collections import OrderedDict
import json

conf = SparkConf().setAppName('SimpleApp').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 这个虽然可能不使用,但是必须要有
sql_context = SQLContext(sc)  # Not to mention you need a SQLContext to work with DataFrames anyway.

# rdd_file = sc.textFile('hdfs://master:9000/user/ubuntu/work/Report-2887718663-20031-201702271315-201702271329Ath.gz')
tmp_urls = ['stafmt/Report-2887712797-20010-201610010000-201610010014Ath.gz',
            'stafmt/Report-2887712797-20010-201610010015-201610010029Ath.gz',
            'stafmt/Report-2887712797-20010-201610010030-201610010044Ath.gz',
            'stafmt/Report-2887712797-20010-201610010045-201610010059Ath.gz',
            'stafmt/Report-2887712797-20010-201610010100-201610010114Ath.gz',
            'stafmt/Report-2887712797-20010-201610010115-201610010129Ath.gz',
            'stafmt/Report-2887712797-20010-201610020015-201610020029Ath.gz',
            'stafmt/Report-2887712797-20010-201610010130-201610010144Ath.gz']
s_urls = ','.join(['s3a://aff.parallel-app.com/{url}'.format(url=url) for url in tmp_urls])
rdd_file = sc.textFile(s_urls, 2)

rdd_dict_list = rdd_file.map(json.loads)


# rdd_dict_list.persist(StorageLevel.MEMORY_AND_DISK)  # MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY

# print rdd_dict_list.take(2)


def convert_dict_to_row_old(d):
    return Row(**OrderedDict(sorted(d.items())))


def convert_dict_to_row(d):
    page_id = d.get('resolve_page_id') or d.get('page_id', '')
    event_type = d.get('event_type', 0)
    ymd = d.get('create_time_ymd', '')
    offer_src = d.get('resolve_offer_src') or d.get('offer_src', '')
    tmp_dict = {'page_id': page_id, 'event_type': event_type, 'ymd': ymd, 'offer_src': offer_src}
    return Row(**OrderedDict(sorted(tmp_dict.items())))


df = rdd_dict_list.map(convert_dict_to_row).toDF()

# 方式1(ORM)
df.show()
print df.where('page_id=33').groupBy(['offer_src', 'event_type', 'ymd']).agg({'*': 'count'}).collect()

# 方式2(RAW SQL)
df.registerTempTable("offer_report")
sql = """
select ymd,offer_src,event_type,count(*)
from offer_report where page_id='33' and event_type in (31,32,33,34)
group by offer_src,event_type,ymd
"""
sql_context.sql(sql).show()

一个spark sql实际用例

# -*- coding: UTF-8 -*-

from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import Row, SQLContext
from collections import OrderedDict
import json

conf = SparkConf().setAppName('SimpleApp').setMaster('local[*]')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)  # Not to mention you need a SQLContext to work with DataFrames anyway.
values = {}


def query_from_s3(ymd):
    rdd_file1 = sc.textFile('s3a://aff.parallel-app.com/fluentd/track/{ymd}/*'.format(ymd=ymd), 2)
    rdd_file2 = sc.textFile('s3a://aff.parallel-app.com/fluentd/track/20170428/track_2017042800*'.format(ymd=ymd), 2)
    rdd_file3 = sc.textFile('s3a://aff.parallel-app.com/fluentd/track/20170428/track_2017042801*'.format(ymd=ymd), 2)
    rdd_files = [rdd_file1, rdd_file2, rdd_file3]

    rdd_file = sc.union(rdd_files)

    def convert_line_to_row(line):
        line = line.strip()
        idx = line.find('{')
        if idx != -1:
            try:
                d = json.loads(line[idx:])
            except:
                d = {}
        else:
            d = {}
        ymd = d.get('ymd', '')
        hms = d.get('hms', '')
        aid = d.get('aid', '')
        host_package = d.get('host_package', '')
        ver = d.get('ver', 0)
        ad_type = d.get('ad_type', 0)
        ad_packages = d.get('ad_packages', '')
        ad_names = d.get('ad_names', '')
        label_reasons = d.get('label_reasons', '')

        trigger_event = d.get('trigger_event', 0)
        match_return = d.get('match_return', {})
        channel = d.get('channel', '')

        if match_return:
            match_return = 1
        else:
            match_return = 0

        tmp_dict = {'aid': aid, 'host_package': host_package, 'ver': ver, 'ad_type': ad_type, 'ymd': ymd, 'hms': hms,
                    'ad_packages': ad_packages, 'trigger_event': trigger_event, 'match_return': match_return,
                    'channel': channel, 'ad_names': ad_names, 'label_reasons': label_reasons}
        return Row(**OrderedDict(sorted(tmp_dict.items())))

    df = rdd_file.map(convert_line_to_row).toDF()
    df.persist(StorageLevel.DISK_ONLY)  # MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY

    df.registerTempTable("tbl_track")

    sql_active = """
    SELECT aid, COUNT(*) AS cnt, COUNT(*) OVER() AS active_users
    FROM tbl_track
    GROUP BY aid
    LIMIT 10
    """.format(ymd=ymd)
    sql_context.sql(sql_active).show()
    active_rows = sql_context.sql(sql_active).collect()
    print active_rows


query_from_s3('20170427')

© 著作权归作者所有

flyking
粉丝 64
博文 218
码字总数 75579
作品 0
东城
程序员
私信 提问
spark和hive storm mapreduce的比较

Spark Streaming与Storm都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一 就是,Spank Streaming和Stom的计算模型完全不一样,Spark Streaming是基于RDD的,因此需要将...

necther
2018/04/28
0
0
Spark(二) -- Spark简单介绍

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45648737 spark是什么? spark开源的类Hadoop MapReduce的通用的并行计算框架 ...

jchubby
2015/05/11
0
0
Spark2.1.0之模块设计

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80386736 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》和《Spark...

泰山不老生
2018/06/05
0
0
Apache Spark 1.6.2 发布,集群计算环境

Apache Spark 1.6.2 发布了,Apache Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句...

愚_者
2016/06/28
3.8K
1
Spark中文python文档

East 2015 (Nov 26, 2014) Spark wins Daytona Gray Sort 100TB Benchmark (Nov 05, 2014) Archive Download Spark Speed Run programs up to 100x faster than Hadoop MapReduce in memory,......

BryanYang
2015/01/22
6.8K
1

没有更多内容

加载失败,请刷新页面

加载更多

MySQL8.0.17 - Multi-Valued Indexes 简述

本文主要简单介绍下8.0.17新引入的功能multi-valued index, 顾名思义,索引上对于同一个Primary key, 可以建立多个二级索引项,实际上已经对array类型的基础功能做了支持 (感觉官方未来一定...

阿里云官方博客
41分钟前
4
0
make4.1降级 make-3.81、2错误

在编译 make-3.82 的时候出现如下错误提示 glob/glob.c:xxx: undefined reference to `__alloca'` 修改 /glob/glob.c // #if !defined __alloca && !defined __GNU_LIBRARY__ # ifdef __GNUC......

Domineering
42分钟前
7
0
Rainbond集群的安装和运维的原理

本文将解读Rainbond集群的安装和运维的原理,使用户基本了解Rainbond的安装机制和运维重点,便于用户搭建大型Rainbond集群。 1.Rainbond集群节点概述 1.1 节点分类 属性 类型 说明 manage 管...

好雨云帮
54分钟前
8
0
好程序员大数据学习路线分享UDF函数

1.为什么需要UDF? 1)、因为内部函数没法满足需求。 2)、hive它本身就是一个灵活框架,允许用自定义模块功能,如可以自定义UDF、serde、输入输出等。 2.UDF是什么? UDF:user difine fun...

好程序员官方
56分钟前
6
0
Groovy中 Base64 URL和文件名安全编码

Base64 URL和文件名安全编码 Groovy支持Base64编码很长一段时间。 从Groovy 2.5.0开始,我们还可以使用Base64 URL和Filename Safe编码来使用encodeBase64Url方法对字节数组进行编码。 结果是...

白石
59分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部