文档章节

SequoiaDB Spark Yarn部署及案例演示

巨杉数据库
 巨杉数据库
发布于 2017/02/06 12:31
字数 2238
阅读 17
收藏 0

 

1、 背景

由于MRv1在扩展性、可靠性、资源利用率和多框架等方面存在明显的不足,在Hadoop MRv2中引入了资源管理和调度系统YARN。YARN是 Hadoop MRv2计算机框架中构建的一个独立的、通用的资源管理系统可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率资源统一管理和数据共享等方面带来了巨大好处。主要体现在以下几个方面:

(1)资源利用率大大提高。一种计算框架一个集群,往往会由于应用程序数量和资源需求的不均衡性,使得在某段时间有些计算框架集群资源紧张,而另外一些集群资源空闲。共享集群模式则通过多种框架共享资源,使得集群中的资源得到更加充分的利用;

(2)运维成本大大降低。共享集群模式使得少数管理员就可以完成多个框架的统一管理;

(3)共享集群的模式也让多种框架共享数据和硬件资源更为方便。

2、 产品介绍

巨杉数据库SequoiaDB是一款分布式非关系型文档数据库,可以被用来存取海量非关系型的数据,其底层主要基于分布式,高可用,高性能与动态数据类型设计,它兼顾了关系型数据库中众多的优秀设计:如索引、动态查询和更新等,同时以文档记录为基础更好地处理了动态灵活的数据类型。并且为了用户能够使用常见的分布式计算框架,SequoiaDB可以和常见分布式计算框架如Spark、Hadoop、HBase进行整合。本文主要讲解SequoiaDB与Spark、YARN的整合以及通过一个案例来演示MapReduce分析存储在SequoiaDB中的业务数据。

3、 环境搭建

3.1、 服务器分布

   
   
   
   

 

3.2、 软件配置

操作系统:RedHat6.5

JDK版本:1.7.0_80 64位

Scala版本:

Hadoop版本:2.7.2

Spark版本:2.0

SequoiaDB版本:2.0

 

3.3、 安装步骤

1、JDK安装

tar -xvf jdk-7u45-linux-x64.tar.gz –C /usr/local

cd /usr/local 

ln -s jdk1.7.0_45 jdk

 

配置环境变量

vim ~/.bash_profile 

export JAVA_HOME=/usr/local/jdk 

export CLASS_PATH=$JAVA_HOME/lib:$JAVA_HOME/jre/lib 

export PATH=$PATH:$JAVA_HOME/bin

source /etc/profile 

2、Scala安装

tar -xvf scala-2.11.8.tgz –C /usr/local

cd /usr/local 

ln -s scala-2.11.8 scala

 

配置环境变量

vim ~/.bash_profile 

export SCALA_HOME=/usr/local/scala 

export PATH=$PATH:$SCALA_HOME/bin 

3、修改主机hosts文件配置

在每台主机上修改host文件

vim /etc/hosts

192.168.1.46 node01

192.168.1.47 node02

192.168.1.48  master

4、 SSH免密钥登录

在master节点中执行ssh-keygen按回车键

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 

 

将master节点中的授权文件authorized_keys传输到slave节点中

scp ~/.ssh/id_rsa.pub root@master:~/.ssh/

 

在slave节点中执行

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

 

在slave节点中验证SSH免密钥登录

ssh master

 

5、Hadoop集群安装

拷贝hadoop文件hadoop-2.7.2.tar.gz到/opt目录中

解压hadoop安装包

tar –xvf hadoop-2.7.2.tar.gz

mv hadoop-2.7.2 /opt/cloud/hadoop

创建hadoop数据存储及临时目录

mkdir –p /opt/hadoop/data

mkdir –p /opt/hadoop/tmp

配置Hadoop jdk环境变量

vim hadoop-env.sh

export JAVA_HOME=/usr/local/jdk

编辑core.xml文件
 

<configuration>

   <property>

      <name>fs.defaultFS</name>

      <value>hdfs://master:9000</value>

   </property>

   <property>

      <name>hadoop.tmp.dir</name>

      <value>/opt/data/tmp</value>

   </property>

   <property>

      <name>io.file.buffer.size</name>

      <value>4096</value>

   </property>

</configuration>

编辑mapred-site.xml

<configuration>



   <property>

      <name>mapreduce.framework.name</name>

      <value>yarn</value>

   </property>

   <property>

      <name>mapreduce.jobtracker.http.address</name>

      <value> master:50030</value>

   </property>

   <property>

      <name>mapreduce.jobhistory.address</name>

      <value> master:10020</value>

   </property>

   <property>

      <name>mapreduce.jobhistory.webapp.address</name>

      <value>master:19888</value>

   </property>

</configuration>

 

编辑hdfs-site.xml

<configuration>

   <property>

      <name>dfs.nameservices</name>

      <value>master</value>

   </property>

   <property>

      <name>dfs.namenode.secondary.http-address</name>

      <value> master:50090</value>

   </property>

   <property>

      <name>dfs.namenode.name.dir</name>

      <value>file:///opt/hadoop/data/name</value>

   </property>

   <property>

      <name>dfs.datanode.data.dir</name>

      <value>file:///opt/hadoop/data</value>

   </property>

   <property>

      <name>dfs.replication</name>

      <value>3</value>

   </property>

   <property>

      <name>dfs.webhdfs.enabled</name>

      <value>true</value>

   </property>

</configuration>

编辑yarn-site.xml

<configuration>

   <property>

      <name>yarn.nodemanager.aux-services</name>

      <value>mapreduce_shuffle</value>

   </property>

   <property>

      <name>yarn.resourcemanager.address</name>

      <value> master:8032</value>

   </property>

   <property>

      <name>yarn.resourcemanager.scheduler.address</name>

      <value> master:8030</value>

   </property>

   <property>

      <name>yarn.resourcemanager.resource-tracker.address</name>

      <value> master:8031</value>

   </property>

   <property>

      <name>yarn.resourcemanager.admin.address</name>

      <value> master:8033</value>

   </property>

   <property>

      <name>yarn.resourcemanager.webapp.address</name>

      <value>master:8088</value>

   </property>

   <property>

      <name>yarn.nodemanager.resource.memory-mb</name>

      <value>12288</value>

   </property>

  <property>

      <name>yarn.nodemanager.log-dirs</name>

      <value>/opt/hadoop/tmp/userlogs</value>

   </property>

</configuration>

 

启动Hadoop

首次启动集群时,做如下操作

进入到/opt/cloud/hadoop/bin目录中执行./hdfs namenode –format格式化

 

hdfs文件系统

进入到/opt/cloud/hadoop/sbin目录中执行./start-all.sh启动hadoop集群

 

6、安装Spark集群

拷贝Spark安装包到/opt目录中,解压

tar –xvf spark-2.0.0-bin-hadoop2.7.tgz

mv spark-2.0.0-bin-hadoop2.7 /opt/cloud/spark

 

编辑spark-env.sh

vim spark-env.sh

JAVA_HOME="/usr/jdk1.7"

SPARK_DRIVER_MEMORY="1g"

SPARK_EXECUTOR_CORES=1

SPARK_EXECUTOR_MEMORY="512m"

SPARK_MASTER_PORT="7077"

SPARK_MASTER_WEBUI_PORT="8070"

SPARK_CLASSPATH="/opt/cloud/spark/jars/sequoiadb.jar:/opt/cloud/spark/jars/spark-sequoiadb_2.11-2.6.0.jar"

SPARK_MASTER_IP="node03"

SPARK_WORKER_MEMORY="712m"

SPARK_WORKER_CORES=1

SPARK_WORKER_INSTANCES=1

SPARK_WORKER_DIR="/opt/data/spark/work"

SPARK_LOCAL_DIRS="/opt/data/spark/tmp"

HADOOP_HOME="/opt/cloud/hadoop"

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

编辑 slaves

node02

node03

启动spark集群

进入到目录/opt/cloud/spark/sbin目录中

./start-all.sh

Spark成功启动后截图如下:

 

7、Spark Yarn连接SequoiaDB

在SequoiaDB中创建集合空间、集合

db.createCS('poc');

db.poc.createCL('test');

 

进入到spark安装目录bin中,执行./spark-sql –master yarn启动spark sql交互界面

 

创建表,映射到上述poc集合空间中test集合

CREATE TABLE `test` (`id` INT, `name` STRING)

USING com.sequoiadb.spark

OPTIONS (

  `collection` 'test',

  `host` 'node02:11810,node03:11810',

  `serialization.format` '1',

  `collectionspace` 'poc'

);

查询表test数据,执行:

Select * from test;

 

进入到yarn管理页面查看spark任务

 

5、 案例演示

为了配合司法部门的执法和银行内部的风险监管,部分商业银行对于存取款业务定制了相关预警方案,本案例以个人存取款业务高频交易来讲述MapReduce如何分析SequoiaDB中的个人交易明细数据。

具体场景为:分析同一实体柜员办理,1小时内同一账户连续3笔以上支取类金额的交易账户及明细。

本演示案例采用Hadoop Map Reduce实现,开发语言为Java语言。整个测试程序分为两个部分Map算法和Reduce算法。演示程序中Map算法负责将同一个账号的所有对应交易明细归并在一起并输出给Reduce端,Reduce端根据Map算法的结果运算具体的业务场景,最后将运算结果写入到SequoiaDB中。

具体架构如下:

Reduce端具体算法流程如下:

 

Map端算法代码如下:

static class TMapper extends Mapper<Object, BSONWritable,Text,BSONWritable>{

        @Override

        protected void map(Object key, BSONWritable value, Context context)

                throws IOException, InterruptedException {

                       BSONObject obj = value.getBson();

            String acct_no=(String) obj.get("ACCT_NO");

            context.write(new Text(acct_no), value);

        }

}

Reduce端算法代码如下:

static class TReducer extends Reducer<Text,BSONWritable,NullWritable,NullWritable>{

     private static String pattern = "yyyy-MM-dd HH:mm:ss";

     private DateFormat df = new SimpleDateFormat(pattern);

     private static int tradeNum1 = 3;

     private static int tradeTime1 = 3600;

    

     private static int tradeNum2 = 2;

     private static int tradeTime2 = 1800;

     private static int tradeAll = 100000;

    

     private Sequoiadb sdb = null;

     private CollectionSpace cs = null;

     private DBCollection cl_1 = null;

     private DBCollection cl_2 = null;

     private static String CS_NAME="";

     private static String CL_NAME_1="";

     private static String CL_NAME_2="";

    

     public TReducer(){

     if (null == sdb) {

     sdb = ConnectionPool.getInstance().getConnection();

     }


     if (sdb.isCollectionSpaceExist(CS_NAME)) {

     cs = sdb.getCollectionSpace(CS_NAME);

     } else {

     throw new BaseException("集合空间" + CS_NAME + "不存在!");

     }


     if (null == cs) {

     throw new BaseException("集合空间不能为null!");

     } else {

     this.cl_1 = cs.getCollection(CL_NAME_1);

    

     }

     if (null == cs) {

     throw new BaseException("集合空间不能为null!");

     } else {

     this.cl_2 = cs.getCollection(CL_NAME_2);

    

     }

     }

    

        @Override

        protected void reduce(Text key, Iterable<BSONWritable> values,

                Context context)

                throws IOException, InterruptedException{

            Iterator<BSONWritable> iterator=values.iterator();

            long sum=0;

            List<BSONWritable> oldList = new ArrayList<BSONWritable>();

            

            while(iterator.hasNext()){

             BSONWritable bsonWritable = iterator.next();

             oldList.add(bsonWritable);

            }

            //对values进行排序,排序字段为TRN_TIME(交易时间)

            Collections.sort(oldList, new Comparator<BSONWritable>() {

     @Override

     public int compare(BSONWritable o1, BSONWritable o2) {

     String trn_time1 = (String)o1.getBson().get("TRN_TIME");

     String trn_time2 = (String)o2.getBson().get("TRN_TIME");

     return trn_time2.compareTo(trn_time1);

     }

     });

            

            Map<String,BSONWritable> result = new HashMap<String,BSONWritable>();

            if(oldList != null && oldList.size() > 0){

             //记录同一账户满足条件的笔数

             Map<String,BSONWritable> tempMap = new HashMap<String,BSONWritable>();

             for(int i=0;i<oldList.size()-1;i++){

             BSONWritable bSONWritable1 = oldList.get(i);

             //交易代码

             String trn_cd = (String)bSONWritable1.getBson().get("TRN_CD");

             if(trn_cd.equals("000045") || trn_cd.equals("001045") 

             || trn_cd.equals("021031") || trn_cd.equals("020031") 

             || trn_cd.equals("001060") || trn_cd.equals("000060")){

             //交易柜员

                 String tran_teller_no1 = (String)bSONWritable1.getBson().get("TRAN_TELLER_NO");

                 //流水号

                 String jrnl_no = (String)bSONWritable1.getBson().get("JRNL_NO");

                 //交易日期

                 String trn_date1 = (String)bSONWritable1.getBson().get("TRN_DATE");

                 //交易时间

                 String trn_time1 = (String)bSONWritable1.getBson().get("TRN_TIME");

                 Date bigDate = null;

                 try {

     bigDate = df.parse(trn_date1+" "+trn_time1);

     } catch (ParseException e) {

     e.printStackTrace();

     }

                 tempMap.put(jrnl_no,bSONWritable1);

                 for(int j=i+1;j<oldList.size();j++){

                     BSONWritable bSONWritable2 = oldList.get(j);

                     //交易代码

                     String trn_cd1 = (String)bSONWritable2.getBson().get("TRN_CD");

                     if(trn_cd1.equals("000045") || trn_cd1.equals("001045") 

                     || trn_cd1.equals("021031") || trn_cd1.equals("020031") 

                     || trn_cd1.equals("001060") || trn_cd1.equals("000060")){

                     //交易柜员

                         String tran_teller_no2 = (String)bSONWritable2.getBson().get("TRAN_TELLER_NO");

                         //流水号

                         String jrnl_no2 = (String)bSONWritable2.getBson().get("JRNL_NO");

                         //交易日期

                         String trn_date2 = (String)bSONWritable2.getBson().get("TRN_DATE");

                         //交易时间

                         String trn_time2 = (String)bSONWritable2.getBson().get("TRN_TIME");

                         Date smallDate = null;

                         try {

             smallDate = df.parse(trn_date1+" "+trn_time1);

             } catch (ParseException e) {

             e.printStackTrace();

             }

                         //判断是否是同一实体{交易柜员}办理

                         if(!tran_teller_no1.equals(tran_teller_no2)){

                         continue;

                         }

                         //判断{交易日期}{交易时间}是否是[1小时]内

                         if((bigDate.getTime()-smallDate.getTime())/1000 > tradeTime1){

                         break;

                         }

                         tempMap.put(jrnl_no2,bSONWritable2);

                        

                     }else{ //end if TRN_CD1.equals("000045")

                     continue;

                     }

                     }//end for

                

                 if(tempMap.size() >= tradeNum1){

                 result.putAll(tempMap);

                 tempMap.clear();

                 }

             }else{

             continue;

             }//end if ||

             }//end for

            }

            

            Map<String,BSONWritable> result2 = new HashMap<String,BSONWritable>();

            

            List<BSONObject> cl_1_list = new ArrayList<BSONObject>();

            //结果写入sdb

            Iterator iter1 = result.keySet().iterator();

            while(iter1.hasNext()){

             String keyValue = (String)iter1.next();

             BSONWritable resultValue = result.get(keyValue);

             cl_1_list.add(resultValue.getBson());

             cl_1.insert(resultValue.getBson());

            }

            cl_1.bulkInsert(cl_1_list, DBCollection.FLG_INSERT_CONTONDUP);

            cl_1_list = null;

            List<BSONObject> cl_2_list = new ArrayList<BSONObject>();

            context.write(null,null);

        }

    }

 

 

SequoiaDB巨杉数据库2.6最新版下载

SequoiaDB巨杉数据库技术博客
SequoiaDB巨杉数据库社区

© 著作权归作者所有

巨杉数据库
粉丝 54
博文 109
码字总数 222243
作品 1
朝阳
数据库管理员
私信 提问
SequoiaDB 巨杉数据库 2.6 版本正式发布

2016年,SequoiaDB已经正式进入 “2.0时代”,并且正式发布了SequoiaDB 2.0 企业版。2.0版本以来,各个版本在众多的企业用户中得到了广泛的应用,通过实战的检验,也得到了众多企业客户的认可...

巨杉数据库
2016/11/10
2.7K
9
Spark官方Blog:SequoiaDB与Spark深度整合

这是一篇来自我们的技术合作伙伴,SequoiaDB巨杉数据库的博客。作者是SequoiaDB的联合创始人和CTO王涛先生,SequoiaDB是一款JSON文档型的事务型数据库。王涛带着技术上非凡的远见,带领Sequo...

ark43420
2015/08/04
1
0
SequoiaDB(巨杉数据库)成为国内首家Spark认证数据库

近日,Spark的官方博客中刊登了其全球战略合作伙伴SequoiaDB发布的技术博客,介绍SequoiaDB对于Spark的整合以及SequoiaDB+Spark的解决方案。目前,SequoiaDB也成为了Spark官方认证的全球合作...

ark43420
2015/08/04
1
0
SequoiaDB x Spark 新主流架构引领企业级应用

6月,汇集当今大数据界精英的Spark Summit 2017盛大召开,Spark作为当今最炙手可热的大数据技术框架,向全世界展示了最新的技术成果、生态体系及未来发展规划。 巨杉作为业内领先的分布式数据...

巨杉数据库
2017/07/03
8
0
SequoiaDB 受邀参加旧金山Spark 技术峰会,与Spark联手推动大数据企业级应用

巨杉将亮相Spark Summit大会 6月5日至7日,全球大数据界最顶尖的技术盛会,Spark Summit峰会将在加州旧金山召开。 巨杉数据库作为唯一来自中国的分布式数据库厂商,同时作为Spark的全球14家发...

巨杉数据库
2017/06/05
1
0

没有更多内容

加载失败,请刷新页面

加载更多

PostgreSQL 11.3 locking

rudi
今天
5
0
Mybatis Plus sql注入器

一、继承AbstractMethod /** * @author beth * @data 2019-10-23 20:39 */public class DeleteAllMethod extends AbstractMethod { @Override public MappedStatement injectMap......

一个yuanbeth
今天
10
1
一次写shell脚本的经历记录——特殊字符惹的祸

本文首发于微信公众号“我的小碗汤”,扫码文末二维码即可关注,欢迎一起交流! redis在容器化的过程中,涉及到纵向扩pod实例cpu、内存以及redis实例的maxmemory值,statefulset管理的pod需要...

码农实战
今天
4
0
为什么阿里巴巴Java开发手册中不建议在循环体中使用+进行字符串拼接?

之前在阅读《阿里巴巴Java开发手册》时,发现有一条是关于循环体中字符串拼接的建议,具体内容如下: 那么我们首先来用例子来看看在循环体中用 + 或者用 StringBuilder 进行字符串拼接的效率...

武培轩
今天
8
0
队列-链式(c/c++实现)

队列是在线性表功能稍作修改形成的,在生活中排队是不能插队的吧,先排队先得到对待,慢来得排在最后面,这样来就形成了”先进先出“的队列。作用就是通过伟大的程序员来实现算法解决现实生活...

白客C
今天
81
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部