文档章节

大数据量txt文本数据分块去重

风吹屁屁凉
 风吹屁屁凉
发布于 2017/01/11 17:17
字数 1481
阅读 233
收藏 1
package cn;
import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by fanling.meng
 */
public class PartFile {
    // 内存监控
    final static Runtime currRuntime = Runtime.getRuntime();
    // 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
    final static long MEMERY_LIMIT = 1024 * 1024 * 3;
    // 内存限制,我内存最多容纳的文件大小
    static final long FILE_LIMIT_SIZE = 1024 * 1024 * 20;
    // 文件写入缓冲区 ,我默认1M
    static final int CACHE_SIZE = 1024 * 1024;
    // 默认文件后缀
    static final String FILE_SUFFIX = ".txt";
    // 临时分割的文件目录,可以删除~。~
    static final String FILE_PREFIX = "test/";
    // 汇总的文件名
    static final String REQUST_FILE_NAME = "resultFile.txt";
    // 存放大文件 引用,以及分割位置
    static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
    // 存放小文件的,驱除重复数据
    static Map<String, String> fileLinesMap = new HashMap<String, String>(10000);
    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        new PartFile().partFile(new File("bigData.txt"), Integer.MAX_VALUE,
                Integer.MIN_VALUE);
        long result = System.currentTimeMillis() - begin;
        System.out.println("除去重复时间为:" + result + " 毫秒");
    }
    // 按hashCode 范围分割
    public void partFile(File origFile, long maxNum, long minNum) {
        String line = null;
        long hashCode = 0;
        long max_left_hashCode = 0;
        long min_left_hashCode = 0;
        long max_right_hashCode = 0;
        long min_right_hashCode = 0;
        BufferedWriter rightWriter = null;
        BufferedWriter leftWriter = null;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(origFile));
            long midNum = (maxNum + minNum) / 2;
            // 以文件hashCode 范围作为子文件名
            File leftFile = new File(FILE_PREFIX + minNum + "_" + midNum
                    + FILE_SUFFIX);
            File rightFile = new File(FILE_PREFIX + midNum + "_" + maxNum
                    + FILE_SUFFIX);
            leftWriter = new BufferedWriter(new FileWriter(leftFile),
                    CACHE_SIZE);
            rightWriter = new BufferedWriter(new FileWriter(rightFile),
                    CACHE_SIZE);
            ChildFile leftChild = new ChildFile(leftFile);
            ChildFile rightChild = new ChildFile(rightFile);
            // 字符串 组合写入也行
            // StringBuilder leftStr = new StringBuilder(100000);
            // StringBuilder rightStr = new StringBuilder(100000);
            // hashCode 的范围作为分割线
            while ((line = reader.readLine()) != null) {
                hashCode = line.hashCode();
                if (hashCode > midNum) {
                    if (max_right_hashCode < hashCode
                            || max_right_hashCode == 0) {
                        max_right_hashCode = hashCode;
                    } else if (min_right_hashCode > hashCode
                            || min_right_hashCode == 0) {
                        min_right_hashCode = hashCode;
                    }
                    // 按行写入缓存
                    writeToFile(rightWriter, line);
                } else {
                    if (max_left_hashCode < hashCode || max_left_hashCode == 0) {
                        max_left_hashCode = hashCode;
                    } else if (min_left_hashCode > hashCode
                            || min_left_hashCode == 0) {
                        min_left_hashCode = hashCode;
                    }
                    writeToFile(leftWriter, line);
                }
            }
            // 保存子文件信息
            leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
            rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
            closeWriter(rightWriter);
            closeWriter(leftWriter);
            closeReader(reader);
            // 删除原始文件,保留最原始的文件
            if (!origFile.getName().equals("bigData.txt")) {
                origFile.delete();
            }
            // 分析子文件信息,是否写入或者迭代
            analyseChildFile(rightChild, leftChild);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // 分析子文件信息
    public void analyseChildFile(ChildFile rightChild, ChildFile leftChild) {
        // 将分割后 还是大于内存的文件保存 继续分割
        File rightFile = rightChild.getChildFile();
        if (isSurpassFileSize(rightFile)) {
            bigChildFiles.add(rightChild);
        } else if (rightFile.length() > 0) {
            orderAndWriteToFiles(rightFile);
        }
        File leftFile = leftChild.getChildFile();
        if (isSurpassFileSize(leftFile)) {
            bigChildFiles.add(leftChild);
        } else if (leftFile.length() > 0) {
            orderAndWriteToFiles(leftFile);
        }
        // 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
        if (bigChildFiles.size() > 0) {
            ChildFile e = bigChildFiles.get(bigChildFiles.size() - 1);
            bigChildFiles.remove(e);
            // 迭代分割
            partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
        }
    }
    // 将小文件读到内存排序除重复
    public void orderAndWriteToFiles(File file) {
        BufferedReader reader = null;
        String line = null;
        BufferedWriter totalWriter = null;
        StringBuilder sb = new StringBuilder(1000000);
        try {
            totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,
                    true), CACHE_SIZE);
            reader = new BufferedReader(new FileReader(file));
            while ((line = reader.readLine()) != null) {
                if (!fileLinesMap.containsKey(line)) {
                    fileLinesMap.put(line, null);
                    sb.append(line + "\r\n");
                    // totalWriter.write(line+"\r\n");
                }
            }
            totalWriter.write(sb.toString());
            fileLinesMap.clear();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeReader(reader);
            closeWriter(totalWriter);
            // 删除子文件
            file.delete();
        }
    }
    // 判断该文件是否超过 内存限制
    public boolean isSurpassFileSize(File file) {
        return FILE_LIMIT_SIZE < file.length();
    }
    // 将数据写入文件
    public void writeToFile(BufferedWriter writer, String writeInfo) {
        try {
            writer.write(writeInfo + "\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // 关闭流
    public void closeReader(Reader reader) {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // 关闭流
    public void closeWriter(Writer writer) {
        if (writer != null) {
            try {
                writer.flush();
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // 内部类,记录子文件信息
    class ChildFile {
        // 文件 和 内容 hash 分布
        File childFile;
        long maxHashCode;
        long minHashCode;
        public ChildFile(File childFile) {
            this.childFile = childFile;
        }
        public ChildFile(File childFile, long maxHashCode, long minHashCode) {
            super();
            this.childFile = childFile;
            this.maxHashCode = maxHashCode;
            this.minHashCode = minHashCode;
        }
        public File getChildFile() {
            return childFile;
        }
        public void setChildFile(File childFile) {
            this.childFile = childFile;
        }
        public long getMaxHashCode() {
            return maxHashCode;
        }
        public void setMaxHashCode(long maxHashCode) {
            this.maxHashCode = maxHashCode;
        }
        public long getMinHashCode() {
            return minHashCode;
        }
        public void setMinHashCode(long minHashCode) {
            this.minHashCode = minHashCode;
        }
        public void setHashCode(long minHashCode, long maxHashCode) {
            this.setMaxHashCode(maxHashCode);
            this.setMinHashCode(minHashCode);
        }
    }
}

方法分析:

       1.采用hashCode 范围迭代分割的方式,可以分割成内存可以容纳的小文件,然后完成功能

       2.我们发现每次迭代,相当于重复读取里面的文件,然后再进行分割,这样浪费了很多时间,那么有没有更好的方式呢? 我们可以这样设计,假设我们知道文件的总大小,已经大概的行数,比如2G,1亿行,我们一开始就分配区间,在分配完全均匀的情况下,1亿行数据,最多占用1亿个空间,那么可以这样分配,用hashCode 的范围,也就是Integer的最大值和最小值进行模拟分配。分配范围 根据内存进行,比如:读取第一行的的hashCode 值为100,那么,我们可以分配到1-1000000,(这里以100W 为单位),也就是说只要hashCode 范围在这个区间的都分配到这里,同理,读到任何一个hashCode 值,除以单位(100W),就能找到你的区间,比如hasCode 是 2345678,那么就是200W-300W的区间。这里有些区间可能空的,就删除了,有些区间很多,就用上面的迭代,空间足够 就直接写入汇总文件。当然区间单位的颗粒度划分,根据内存和数据总量 自己弄,这样下来就会一次读取 ,就能尽量的分配均匀,就不会大量迭代读取浪费时间了。

       3.我们发现分割的时候是直接写入,没有进行任何排序或者其他操作,如果我们在分割的时候保存一部分到集合内存,发现有重复的,就不写入分割文件了,如果写入量超过集合限制了,就清空集合,这样能保证单个小文件 一次达到除重复的效果,大文件部分除重复,减少子文件的数据量,如果重复数据较多,可以采用,极端的情况下完全不重复,那么集合会浪费你的空间,并且除重复的时候会浪费你的时间,这里自己对数据进行整体考虑。

       4.这里内存的控制是我测试进行控制,用JDK 的方法进行内存监控,因为涉及到回收频率 以及时间上的问题,我没有动态的对集合进行监控,占了多少内存,还剩多少内存等等,可以尝试。

© 著作权归作者所有

风吹屁屁凉

风吹屁屁凉

粉丝 11
博文 50
码字总数 11160
作品 0
海淀
程序员
私信 提问
数据分析:基于Python的自定义文件格式转换系统

数据分析:基于Python的自定义文件格式转换系统 (白宁超 2018年7月16日14:47:41 ) 导读:随着大数据的快速发展,自然语言处理、数据挖掘、机器学习技术应用愈加广泛。针对大数据的预处理工作...

伏草惟存
2018/07/18
0
0
关于C++实现内存数据库的问题-为大数据打基础

C++的KV内存数据库类可达到300万次/秒的查询量,难怪有大数据离不开内存数据库的说法。在大数据的技术中,其中一项是“流式计算”。要在数据传输流转的过程中完成相关的计算,必须有高效响应...

hac2009
2014/09/28
818
2
存储大量爬虫数据的数据库,了解一下?

"当然, 并不是所有数据都适合" 在学习爬虫的过程中, 遇到过不少坑. 今天这个坑可能以后你也会遇到, 随着爬取数据量的增加, 以及爬取的网站数据字段的变化, 以往在爬虫入门时使用的方法局限性...

fesoncn
2018/04/09
0
0
《Greenplum:从大数据战略到实践》读者技术沙龙 第一场

活动介绍 Greenplum官方技术教程、《数字化三部曲》之二--《Greenplum:从大数据战略到实践》即将全新发布。这部以全媒体思维打造的技术著作,凝聚了Greenplum研发团队的创新智慧。为了给技...

Greenplum中文社区
04/24
56
0
毕业设计紧急求助!万分感谢!

请问各位,我现在需要将txt文本里的一组数(数据量很大,且数量不定)导入python里,然后把这组数转化为数组。请问这个过程怎么实现?另外,我之后对这组数进行操作,形成一个新的数组,我怎...

刘歪歪爱读书
2014/03/14
101
4

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
7
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
7
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
9
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部