Solving joined-table queries and bulk-importing data into Solr Cloud

haoqidemao · published 2016/12/09 18:39

    MapReduce works best when the data is staged up front: the mapper processes records one at a time and the reducer aggregates the results. As the saying goes, supplies should move before the troops do; with the input fully prepared, MapReduce can grind out its result smoothly. The problem here, however, is that while a record is in the mapper it needs several database queries plus a write to Solr Cloud. Processed one by one, each record becomes very expensive to produce. One way out is to cache records inside the mapper and handle them in batches.
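The buffer-and-flush idea above can be sketched independently of Hadoop. This is a minimal illustration, not the article's actual mapper; the class and method names are hypothetical, and the flush action here just records batch sizes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchBuffer<T> {
    private final int capacity;
    private final List<T> buffer;
    private final Consumer<List<T>> flushAction;

    public BatchBuffer(int capacity, Consumer<List<T>> flushAction) {
        this.capacity = capacity;
        this.buffer = new ArrayList<>(capacity);
        this.flushAction = flushAction;
    }

    // Called once per record, like map(); flushes when the buffer fills up.
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    // Called at the end, like cleanup(), to drain the remainder.
    public void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<Integer> b = new BatchBuffer<>(100, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            b.add(i);
        }
        b.flush(); // drain the last partial batch
        System.out.println(batchSizes); // two full batches of 100, then a remainder of 50
    }
}
```

The mapper below follows the same shape: `map()` plays the role of `add()`, and `cleanup()` plays the role of the final `flush()`.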

    Without further ado, here is the code.

The ServiceRecord class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class ServiceRecord implements Writable, DBWritable {

    // Base fields, read from table xx_service
    int saleId;
    String g_sid;
    String title;
    String content;

    // Region fields, read from table xx_service_info
    String provinceName;
    String cityName;
    String townName;

    // clone() is implemented here because the mapper reuses (recycles) the
    // ServiceRecord object while deserializing records one at a time
    public ServiceRecord clone() {
        ServiceRecord serviceRecord = new ServiceRecord();
        serviceRecord.saleId = saleId;
        serviceRecord.g_sid = g_sid;
        serviceRecord.title = title;
        serviceRecord.content = content;
        serviceRecord.provinceName = provinceName;
        serviceRecord.cityName = cityName;
        serviceRecord.townName = townName;
        return serviceRecord;
    }

    public void readFields(DataInput dataInput) throws IOException {
    }

    public void write(DataOutput dataOutput) throws IOException {
    }

    public void write(PreparedStatement statement) throws SQLException {
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        saleId = resultSet.getInt("saleId");
        title = resultSet.getString("title");
        content = resultSet.getString("content");
        g_sid = UUID.nameUUIDFromBytes((saleId + "").getBytes()).toString();
    }

    public void readPositionFields(ResultSet resultSet) throws SQLException {
        provinceName = resultSet.getString("provinceName");
        cityName = resultSet.getString("cityName");
        townName = resultSet.getString("townName");
    }
}

The ServiceMapper class:


import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ServiceMapper extends Mapper<LongWritable, ServiceRecord, Text, IntWritable>{

    // After every MAX_CAPACITY cached records, batch-query the joined table and submit to Solr
    private static final int MAX_CAPACITY = 100;
    
    // Total number of records submitted to Solr so far
    private int commitCount;
    // Cache of ServiceRecord objects, keyed by saleId
    private Map<Integer, ServiceRecord> serviceRecordMap;

    @Override
    protected void setup(Mapper<LongWritable, ServiceRecord, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        System.out.println("setup");
        
        commitCount = 0;
        serviceRecordMap = new HashMap<Integer, ServiceRecord>(MAX_CAPACITY);
        
        Configuration conf = context.getConfiguration();
        
        // init SQLUtil
        String sqlUrl = conf.get("mysql.url");
        String sqlUser = conf.get("mysql.user");
        String sqlPassword = conf.get("mysql.password");
        SQLUtil.instance().setDbUrl(sqlUrl);
        SQLUtil.instance().setDbUser(sqlUser);
        SQLUtil.instance().setDbPass(sqlPassword);
        SQLUtil.instance().init();
        
        // init SolrUtil
        String solrUrl = conf.get("solr.url");
        SolrUtil.instance().setUrl(solrUrl);
        SolrUtil.instance().init();
    }
    
    @Override
    protected void map(LongWritable key, ServiceRecord value,
            Mapper<LongWritable, ServiceRecord, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Note: value is cloned here, because the framework recycles the value object
        serviceRecordMap.put(value.saleId, value.clone());
        if (serviceRecordMap.size() >= MAX_CAPACITY) {
            // Batch-process the cached records
            batchProcess();
        }
    }
    
    @Override
    protected void cleanup(Mapper<LongWritable, ServiceRecord, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Process whatever is left in the cache
        if (!serviceRecordMap.isEmpty()) {
            batchProcess();
        }
        // solr hard commit
        SolrUtil.instance().commit(false);
        super.cleanup(context);
    }
    
    private boolean batchProcess() {
        // 1. Query the joined table
        getPositionInfo();
        // 2. Push the batch to Solr
        updateSolr();
        // 3. solr soft commit
        SolrUtil.instance().commit(true);
        // 4. increase commitCount
        commitCount += serviceRecordMap.size();
        // 5. Clear the cache
        serviceRecordMap.clear();
        System.out.println("commit: " + commitCount);
        return true;
    }
    
    private boolean getPositionInfo() {
        if (serviceRecordMap.isEmpty()) {
            return true;
        }
        // fetch connection
        Connection connection = SQLUtil.instance().getConnection();
        String sql = "SELECT service_id as saleId, province_name as provinceName, city_name as cityName, "
                + "town_name as townName from xx_service_info where service_id in ("
                + StringUtils.join(serviceRecordMap.keySet(), ',') + ");";
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            statement = connection.createStatement();
            statement.execute(sql);
            resultSet = statement.getResultSet();
            if (resultSet != null) {
                while (resultSet.next()) {
                    int saleID = resultSet.getInt("saleId");
                    ServiceRecord serviceRecord = serviceRecordMap.get(saleID);
                    if (serviceRecord != null) {
                        serviceRecord.readPositionFields(resultSet);
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        } finally {
            SQLUtil.instance().releaseResourceNoThrow(resultSet, statement, connection);
        }
        return true;
    }
    
    private boolean updateSolr() {
        if (serviceRecordMap.isEmpty()) {
            return true;
        }
        List<ServiceRecord> serviceRecords = new ArrayList<ServiceRecord>(serviceRecordMap.values());
        return SolrUtil.instance().indexAsJson(serviceRecords);
    }

}
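A side note on `getPositionInfo()`: the `IN` clause is built by string concatenation, which works for integer keys but is brittle in general. A common alternative, sketched below under the assumption that the SQL template matches the mapper's query, is to generate `?` placeholders and bind the keys through a `PreparedStatement`. The helper class name is illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class InClauseBuilder {

    // Builds "SELECT ... WHERE service_id IN (?,?,?)" for the given key count.
    public static String buildSql(int keyCount) {
        StringJoiner placeholders = new StringJoiner(",");
        for (int i = 0; i < keyCount; i++) {
            placeholders.add("?");
        }
        return "SELECT service_id AS saleId, province_name AS provinceName, "
                + "city_name AS cityName, town_name AS townName "
                + "FROM xx_service_info WHERE service_id IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(11, 42, 99);
        String sql = buildSql(keys.size());
        System.out.println(sql.endsWith("IN (?,?,?)"));
        // With a live connection one would then bind the keys:
        // PreparedStatement ps = connection.prepareStatement(sql);
        // for (int i = 0; i < keys.size(); i++) ps.setInt(i + 1, keys.get(i));
    }
}
```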

The SQLUtil class:

import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Wrapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import javax.sql.DataSource;

import com.mchange.v2.c3p0.ComboPooledDataSource;

public class SQLUtil {

    // volatile is required for double-checked locking to be safe
    private static volatile SQLUtil instance = null;

    // Connection pool
    private DataSource dataSource = null;

    private String dbUrl = null;
    private String dbUser = null;
    private String dbPass = null;

    private SQLUtil() {
    }

    public static SQLUtil instance() {
        if (instance == null) {
            synchronized (SQLUtil.class) {
                if (instance == null) {
                    instance = new SQLUtil();
                }
            }
        }
        return instance;
    }

    public void init() {
        Objects.requireNonNull(dbUrl);
        Objects.requireNonNull(dbUser);
        Objects.requireNonNull(dbPass);
        create();
    }

    private void create() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e1) {
            e1.printStackTrace();
        }
        ComboPooledDataSource dataSource = new ComboPooledDataSource();
        dataSource.setJdbcUrl(dbUrl);
        dataSource.setUser(dbUser);
        dataSource.setPassword(dbPass);
        try {
            dataSource.setDriverClass("com.mysql.jdbc.Driver");
        } catch (PropertyVetoException e) {
            e.printStackTrace();
        }
        dataSource.setDataSourceName("SQLUtil");
        dataSource.setAutoCommitOnClose(false);
        dataSource.setPreferredTestQuery("select 1");
        dataSource.setMaxConnectionAge(3600);
        dataSource.setTestConnectionOnCheckout(true);
        dataSource.setIdleConnectionTestPeriod(300);
        dataSource.setDebugUnreturnedConnectionStackTraces(true);
        dataSource.setUnreturnedConnectionTimeout(120);

        this.dataSource = dataSource;
    }

    public Connection getConnection() {
        try {
            Connection connection = dataSource.getConnection();
            return connection;
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void releaseResourceNoThrow(Wrapper... wrapperSet) {

        // 1. close ResultSet
        for (Wrapper wrapper : wrapperSet) {
            if (wrapper instanceof ResultSet) {
                try {
                    ((ResultSet) wrapper).close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        // 2. close Statement or PreparedStatement
        for (Wrapper wrapper : wrapperSet) {
            if (wrapper instanceof PreparedStatement) {
                try {
                    ((PreparedStatement) wrapper).close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            } else if (wrapper instanceof Statement) {
                try {
                    ((Statement) wrapper).close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        // 3. close Connection
        for (Wrapper wrapper : wrapperSet) {
            if (wrapper instanceof Connection) {
                try {
                    ((Connection) wrapper).close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }

    public void setDbPass(String dbPass) {
        this.dbPass = dbPass;
    }
}
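Double-checked locking, as used in `instance()` above, is only correct when the `instance` field is `volatile`. An alternative that sidesteps the issue entirely is the initialization-on-demand holder idiom; a sketch with an illustrative class name:

```java
public class LazySingleton {

    private LazySingleton() {
    }

    // The JVM guarantees the Holder class is initialized exactly once,
    // on first access, with full visibility to all threads.
    private static class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    public static LazySingleton instance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(instance() == instance()); // same object on every call
    }
}
```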

The SolrUtil class:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class SolrUtil {
    private Gson gson = null;
    private List<NameValuePair> softCommitPairs = null;
    private List<NameValuePair> hardCommitPairs = null;
    
    private String url = null;
    
    // volatile is required for double-checked locking to be safe
    private static volatile SolrUtil instance = null;
    
    private SolrUtil() {
        gson = new GsonBuilder().create();
        
        NameValuePair softCommitPair = new BasicNameValuePair("softCommit", "true");
        softCommitPairs = new ArrayList<NameValuePair>();
        softCommitPairs.add(softCommitPair);
        
        NameValuePair hardCommitPair = new BasicNameValuePair("commit", "true");
        hardCommitPairs = new ArrayList<NameValuePair>();
        hardCommitPairs.add(hardCommitPair);
    }
    
    public static SolrUtil instance() {
        if (instance == null) {
            synchronized (SolrUtil.class) {
                if (instance == null) {
                    instance = new SolrUtil();
                }
            }
        }
        return instance;
    }
    
    public void init() {
        Objects.requireNonNull(url);
    }
    
    public boolean indexAsJson(ServiceRecord serviceRecord) {
        List<ServiceRecord> serviceRecords = new ArrayList<ServiceRecord>(1);
        serviceRecords.add(serviceRecord);
        return indexAsJson(serviceRecords);
    }
    
    public boolean indexAsJson(List<ServiceRecord> serviceRecords) {
        
        HttpClient httpclient = new DefaultHttpClient();
        try {
            HttpPost httppost = new HttpPost(url);
            InputStreamEntity reqEntity = new InputStreamEntity(
                    new ByteArrayInputStream(gson.toJson(serviceRecords).getBytes("UTF-8")), -1);
            reqEntity.setContentType("application/json");
            httppost.setEntity(reqEntity);
            HttpResponse response = httpclient.execute(httppost);
            if (response.getStatusLine().getStatusCode() != 200) {
                System.out.println(String.format("commit:%s with error:%s-%s",
                        httppost.getRequestLine(), 
                        response.getStatusLine().toString(), 
                        EntityUtils.toString(response.getEntity())));
                return false;
            }
        } catch (ClientProtocolException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // When HttpClient instance is no longer needed,
            // shut down the connection manager to ensure
            // immediate deallocation of all system resources
            httpclient.getConnectionManager().shutdown();
        }
        return true;
    }
    
    public boolean commit(boolean isSoftCommit) {
        HttpClient httpclient = new DefaultHttpClient();
        try {
            HttpPost httppost = new HttpPost(url);
            UrlEncodedFormEntity entity = null;
            if (isSoftCommit) {
                entity = new UrlEncodedFormEntity(softCommitPairs, HTTP.UTF_8);
            } else {
                entity = new UrlEncodedFormEntity(hardCommitPairs, HTTP.UTF_8);
            }
            httppost.setEntity(entity);
            HttpResponse response = httpclient.execute(httppost);
            if (response.getStatusLine().getStatusCode() != 200) {
                System.out.println(String.format("commit:%s with error:%s-%s",
                        httppost.getRequestLine(), 
                        response.getStatusLine().toString(), 
                        EntityUtils.toString(response.getEntity())));
            }
        } catch (ClientProtocolException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // When HttpClient instance is no longer needed,
            // shut down the connection manager to ensure
            // immediate deallocation of all system resources
            httpclient.getConnectionManager().shutdown();
        }
        return true;
    }
    
    public void setUrl(String url) {
        this.url = url;
    }
    
}
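For reference, the two commit modes above differ only in the parameter sent to the `/update` handler: `softCommit=true` makes new documents visible to searches without flushing segments to stable storage, while `commit=true` performs a durable hard commit. The code above sends the parameter as a form-encoded POST body, which Solr treats the same as a query parameter; the equivalent URLs can be sketched as plain strings (the base URL is a placeholder):

```java
public class SolrCommitUrl {

    // Builds the commit request as a query parameter; the mapper's code sends
    // the same pair in the POST body instead, which Solr accepts equivalently.
    static String commitUrl(String updateUrl, boolean soft) {
        return updateUrl + "?" + (soft ? "softCommit=true" : "commit=true");
    }

    public static void main(String[] args) {
        String base = "http://localhost:8983/solr/mycollection/update";
        System.out.println(commitUrl(base, true));
        System.out.println(commitUrl(base, false));
    }
}
```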

Finally, IndexDriver creates and submits the job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IndexDriver {
    
    public static void main(String []args) throws Exception {
        String driverClass = "com.mysql.jdbc.Driver";
        String dbUrl = "jdbc:mysql://<ip>:<port>/<database>";
        String userName = "<user>";
        String password = "<password>";

        String solrUrl = "http://<ip>:<port>/solr/<collection>/update";
        
        long time_start, time_end;
        
        Configuration conf = new Configuration();
        conf.set("mysql.url", dbUrl);
        conf.set("mysql.user", userName);
        conf.set("mysql.password", password);
        conf.set("solr.url", solrUrl);
        // When run on a single machine this setting seemed to have no effect:
        // only one mapper instance was created
        conf.setInt(MRJobConfig.NUM_MAPS, 5);
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, password);
        
        Job job = Job.getInstance(conf, IndexDriver.class.getName());
        job.setJarByClass(IndexDriver.class);
        DBInputFormat.setInput(job, ServiceRecord.class, "xx_service", null, null, "service_id as saleId", "subject as title", "cont as content");
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/user/tanghuan/output"));
        job.setMapperClass(ServiceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        time_start = System.currentTimeMillis();
        job.waitForCompletion(true);
        time_end = System.currentTimeMillis();
        System.out.println("finish job, take time: " + (time_end - time_start) + " ms");
        
    }

}

Single-machine test result: commit: 572026. finish job, take time: 661384 ms

That is roughly 865 records/s.

Here each record queries the MySQL tables twice (the initial read by DBInputFormat plus the batched join lookup) and updates Solr once; Solr does a soft commit after every 100 records and a single hard commit at the end.
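The throughput figure follows directly from the two counters; integer division gives 864, i.e. the roughly 865 records/s quoted above (864.9 exactly):

```java
public class Throughput {
    public static void main(String[] args) {
        long records = 572026;   // total committed records from the test run
        long elapsedMs = 661384; // job wall-clock time in milliseconds
        long perSecond = records * 1000 / elapsedMs;
        System.out.println(perSecond + " records/s");
    }
}
```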

Summary

  1. ServiceRecord must implement a clone method.
  2. Mapper provides no built-in batch-processing hook, so records have to be cached and then processed in batches by hand; the scheme also benefits from the fact that DBInputFormat streams rows from the database after the query runs.
The next article will cover using Docker to build a virtual Hadoop cluster on a single machine, and re-running the performance test there.


 

© Copyright belongs to the author
