用rocketMQ写文件传输有意义吗?

原创
03/04 10:56
阅读数 177

用rocketMQ写文件传输有意义吗?

我不明白为什么会有人考虑用消息队列写文件同步!!!!?

rt,我也不清楚用rocketMQ写文件传输是否有意义,突然觉得可以试一下,于是这么写了。我不清楚rocketMQ在这些场景是否存在优势,或者存在什么劣势,我只管写,写完之后我去对比,尝试,然后得出结论:我再也不会考虑使用rocketMQ来写文件同步了。

功能需求

本demo实现的功能是监听本地某个文件夹的文件创建和修改,并实现和服务器的同步。

环境配置

本文采用的环境如下:

Redis-x64-3.0.504

Apache RocketMQ-4.2.0

pom配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ezsyncxz</groupId>
    <artifactId>efficiency</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>efficiency</name>
    <description>How to improve transmission efficiency</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.directory.studio/org.apache.commons.io -->
        <dependency>
            <groupId>org.apache.directory.studio</groupId>
            <artifactId>org.apache.commons.io</artifactId>
            <version>2.4</version>
        </dependency>



        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.9</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

efficiency流程图

efficiency流程图.jpg

文件监听

参考代码:

package com.ezsyncxz.efficiency.fileMonitor;

import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import java.io.File;

public class FileMonitor {

    FileAlterationMonitor monitor = null;

    public FileMonitor(long interval) {
        monitor = new FileAlterationMonitor(interval);
    }

    public void monitor(String path, FileAlterationListener listener) {
        FileAlterationObserver observer = new FileAlterationObserver(new File(path));
        monitor.addObserver(observer);
        observer.addListener(listener);
    }

    public void stop() throws Exception {
        monitor.stop();
    }

    public void start() throws Exception {
        monitor.start();
    }
}
package com.ezsyncxz.efficiency.fileMonitor;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.ezsyncxz.efficiency.data.DataCollection;
import com.ezsyncxz.efficiency.utils.ApplicationContextUtil;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;

public class FileListener implements FileAlterationListener {

    private static final Logger logger = LoggerFactory.getLogger(FileListener.class);

    FileMonitor monitor = null;

    @Override
    public void onStart(FileAlterationObserver observer) {
//        logger.warn("正在监控文件 文件夹:{}", observer.getDirectory().getAbsolutePath());
    }

    @Override
    public void onDirectoryCreate(File directory) {
        logger.warn("监控到文件夹创建动作,开始同步数据 文件夹:{}", directory.getName());
    }

    @Override
    public void onDirectoryChange(File directory) {
        logger.warn("监听到文件夹变化动作,开始增量同步 文件夹:{}", directory.getName());
    }

    @Override
    public void onDirectoryDelete(File directory) {
        logger.warn("监听到文件夹删除动作 文件夹:{}", directory.getName());
    }

    @Override
    public void onFileCreate(File file) {
        logger.warn("监听到文件新建动作,启动同步任务,开始文件同步 文件名:{}", file.getName());
        try {
            DataCollection dataCollection = ApplicationContextUtil.getBean(DataCollection.class);
            dataCollection.collect(file.getAbsolutePath());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onFileChange(File file) {
        logger.warn("监听到文件变化动作,开始增量同步 文件名:{}", file.getName());
    }

    @Override
    public void onFileDelete(File file) {
        logger.warn("监听到文件删除动作 文件名:{}", file.getName());
    }

    @Override
    public void onStop(FileAlterationObserver observer) {
//        logger.warn("关闭文件监控");
    }

}

监听文件夹,我们用到的是Apache的commons.io包。通过这个包,我们可以监听到文件、文件夹的创建和修改,这样我们就能针对文件的创建进行全量同步,文件的修改利用Rsync算法进行增量同步,对文件夹我们可以进行归档压缩的方式进行同步,当然也可以对文件夹进行递归同步,但是我觉得这样实现的话可能会比较麻烦。这里暂时是只实现了文件创建时候的同步动作。

文件采集

参考代码:

package com.ezsyncxz.efficiency.data;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.ezsyncxz.efficiency.entity.FileFragment;
import com.ezsyncxz.efficiency.utils.ByteUtils;
import com.ezsyncxz.efficiency.utils.CompressUtils;
import com.ezsyncxz.efficiency.utils.FileUtils;
import com.ezsyncxz.efficiency.utils.MD5Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;

/**
 * @ClassName DataCollection
 * @Description 采集指定文件夹数据,将数据读取,归档,分块写入消息队列中,rocketMq能够存入的消息大小最大为4MB,因此我们需要对过大的数据进行切割
 * @Author chenwj
 * @Date 2020/2/24 15:46
 * @Version 1.0
 **/
@Component
public class DataCollection {

    private static final Logger logger = LoggerFactory.getLogger(DataCollection.class);

    @Autowired
    private DefaultMQProducer producer;

    public static final String tar = "D:\\chenwj\\dev\\test\\efficiency_tar\\";

    /**
     * 采集文件夹下所有的文件,包括文件夹
     * @param src
     */
    public void collect(String src) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        File file = new File(src);

        String filename = file.getName();

        // 文件不存在则返回
        if(!file.exists()) {
            logger.error("不存在该文件路径: {}", src);
            return;
        }

        boolean needCompress = false;

        // 读取文件为字节数组
        byte[] message = FileUtils.File2byte(src);
        int msgTotalSize = message.length;
        int orderID = 0;
        int maxSize = 3000000;
        String tag = MD5Utils.MD54bytes(message);
        int msgCount = (int) Math.ceil((message.length / maxSize));

        // 切割消息,rocket能够接受的消息大小为4mb
        while (message.length > maxSize) {
            byte[] subBytes = ByteUtils.subBytes(message, 0, maxSize);
            // 调用消息队列进行传输
            FileFragment fileFragment = FileFragment.newBuilder()
                    .body(subBytes)
                    .msgCount(msgCount)
                    .needCompress(needCompress)
                    .tarPath(tar)
                    .filename(filename)
                    .score(orderID)
                    .build();
            Message sendMessage = new Message("DemoTopic", tag, JSONObject.toJSONString(fileFragment).getBytes());
            message = ByteUtils.subBytes(message, maxSize, message.length - maxSize);
            producer.send(sendMessage, (mqs, msg, arg) -> {
                int o = (int) arg;
                int index = o % mqs.size();
                return mqs.get(index);
            }, orderID);
//            logger.warn("发送的消息id为: {}", orderID);
            orderID += 1;
        }

        // 传输最后一段消息
        if(message.length > 0) {
            // 调用消息队列进行传输
            FileFragment fileFragment = FileFragment.newBuilder()
                    .needCompress(needCompress)
                    .body(message)
                    .msgCount(msgCount)
                    .tarPath(tar)
                    .filename(filename)
                    .score(orderID)
                    .build();
            Message sendMessage = new Message("DemoTopic", tag, JSONObject.toJSONString(fileFragment).getBytes());
            producer.send(sendMessage, (mqs, msg, arg) -> {
                int o = (int) arg;
                int index = o % mqs.size();
                return mqs.get(index);
            }, orderID);
//            logger.warn("发送的消息id为:{}", orderID);
        }

        logger.warn("消息传输完毕 消息总大小:{}字节 消息总数:{} 消息哈希:{} 消息目标路径: {}", msgTotalSize, msgCount, tag, tar + filename);

    }
}

文件采集程序实际上是将本地文件读取成字节数组,切割字节数组为一个个文件片段,因为本文采用的是rocketMq,rocketMq不支持大文件传输,他支持的每个消息大小为4mb,预保留对象结构可能占用的大小,我这里保守地每次传输3000000个字节,对于文件稍微大一点的数据传输可能会产生巨多的消息。这里发送顺序消息或者普通消息对结果不会有影响,只要对文件片段编号,消费者会根据编号顺序重组消息,所以消费者可以并发地消费消息,生产者也可以并发地生产消息。

消费端重组文件

package com.ezsyncxz.efficiency.consumer;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.ezsyncxz.efficiency.entity.FileFragment;
import com.ezsyncxz.efficiency.mq.annotation.MQConsumeService;
import com.ezsyncxz.efficiency.mq.entity.MQConsumeResult;
import com.ezsyncxz.efficiency.mq.processor.AbstractMQMsgProcessor;
import com.ezsyncxz.efficiency.redis.RedisUtil;
import com.ezsyncxz.efficiency.utils.ByteUtils;
import com.ezsyncxz.efficiency.utils.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.Set;

/**
 * @ClassName DataCollectionConsumer
 * @Description TODO
 * @Author chenwj
 * @Date 2020/2/25 15:49
 * @Version 1.0
 **/

@MQConsumeService(topic = "DemoTopic", tags = {"*"})
public class DataCollectionConsumer extends AbstractMQMsgProcessor {

    @Autowired
    private RedisUtil redisUtil;

    @Override
    protected MQConsumeResult consumeMessage(String tag, List<String> keys, MessageExt messageExt) {
//        logger.warn("{}接收到来自{}的消息,开始处理...", Thread.currentThread().getName(), tag);
        try {
            byte[] body = messageExt.getBody();
            String bodyString = new String(body);
            FileFragment fileFragment = JSONObject.parseObject(bodyString, FileFragment.class);
//            logger.warn("接收到消息 消息编号:{}", fileFragment.getScore());
            redisUtil.zsSetAndSorte(tag, bodyString, fileFragment.getScore());
//            logger.warn("消息已写入缓存!");

            // 当文件的所有片段读完,开始写入磁盘
            byte[] fileBody = new byte[0];
            long size = redisUtil.zsGetSize(tag);
            if (size == fileFragment.getMsgCount()) {
                Set<Object> objects = redisUtil.zsGetAsc(tag);
                for (Object object : objects) {
                    String fragmentString = (String) object;
                    FileFragment fragment = JSONObject.parseObject(fragmentString, FileFragment.class);
                    fileBody = ByteUtils.concateBytes(fileBody, fragment.getBody());
                }
                FileUtils.byte2File(fileBody, fileFragment.getTarPath(), fileFragment.getFilename());
                redisUtil.del(tag);
                logger.warn("文件写入完毕!已删除缓存 文件路径:{}", fileFragment.getTarPath() + fileFragment.getFilename());
            }
            return MQConsumeResult.newBuilder().isSuccess(true).build();
        }catch (Exception e) {
            logger.warn("文件写入异常,删除缓存");
            e.printStackTrace();
            redisUtil.del(tag);
        }
        return MQConsumeResult.newBuilder().isSuccess(true).build();
    }
}

消费端接收到文件片段之后会暂存到redis缓存,以tag,也就是文件哈希为键,片段编号为score存入到有序集合中,当文件全部读取完毕则从缓存中按正序取出拼接后写入磁盘,再删除缓存。

测试结果

第一次测试结果.png

从测试结果不难看出,用redis发生了并发的问题。问题不难分析,消费者并发地消费消息,没有加锁,所以在if (size == fileFragment.getMsgCount()) {这里发生了并发问题,导致将文件多次写入磁盘,这种并发问题不会影响结果,因为不论文件多少次写入磁盘,结果都是一样的,但是它造成了不必要的浪费。

这里用到了redis应该是一大败笔,以前在工作中用到redis能明显的感觉到redis的维护是一个十分艰巨的过程,一不留心在redis中留下一点脏数据,长久之后就会变得难以维护,此外,redis是一个作为缓存数据库,他并不考虑被用作文件的中转缓存,因为文件太大了,他更多地被考虑缓存维护一些对象的中间状态,或者说作为影子设备的存在。因此在实现了以redis作为文件中转的方案后,我听取了一些大佬的意见,用RandomAccessFile对这一块进行改进。

用RandomAccessFile改进

改进后的DataCollection:

package com.ezsyncxz.efficiency.data;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.ezsyncxz.efficiency.entity.FileFragment;
import com.ezsyncxz.efficiency.utils.ByteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.UUID;

/**
 * @ClassName DataCollection
 * @Description 采集指定文件夹数据,将数据读取,归档,分块写入消息队列中,rocketMq能够存入的消息大小最大为4MB,因此我们需要对过大的数据进行切割
 * @Author chenwj
 * @Date 2020/2/24 15:46
 * @Version 1.0
 **/
@Component
public class DataCollection {

    private static final Logger logger = LoggerFactory.getLogger(DataCollection.class);

    @Autowired
    private DefaultMQProducer producer;

    public static final String tar = "D:\\chenwj\\dev\\test\\efficiency_tar\\";

    /**
     * 采集文件夹下所有的文件,包括文件夹
     *
     * @param src
     */
    public void collect(String src) throws IOException, InterruptedException, RemotingException, MQClientException, MQBrokerException {

        File file = new File(src);

        String filename = file.getName();

        // 文件不存在则返回
        if (!file.exists()) {
            logger.error("不存在该文件路径: {}", src);
            return;
        }

        // 用文件随机读写的方式读取文件片段
        int len = 3000000; // 每个消息文件片段的大小
        int off = 0; // 每个消息片段的偏移量
        byte[] bytes = new byte[len]; // 缓冲接收文件
        long length = file.length(); // 文件大小
        RandomAccessFile r = new RandomAccessFile(src, "r");
        int rLen = 0; // 每次读取的字节数
        String tag = UUID.randomUUID().toString();
        long startTime = System.currentTimeMillis();
        while ((rLen = r.read(bytes)) > 0) {

            if(rLen != bytes.length) {
                bytes = ByteUtils.subBytes(bytes, 0, rLen);
                logger.warn("最后一个文件不足{}B", len);
            }

            FileFragment fileFragment = FileFragment.newBuilder()
                    .filename(filename)
                    .tarPath(tar)
                    .body(bytes)
                    .needCompress(false)
                    .length(length)
                    .off(off)
                    .startTime(startTime)
                    .build();

            Message sendMessage = new Message("DemoTopic", tag, JSONObject.toJSONString(fileFragment).getBytes());
            producer.send(sendMessage);
            off += rLen;
        }
        r.close();
    }

    public static void main(String[] args){

    }
}

改进后的DataCollectionConsumer:

package com.ezsyncxz.efficiency.consumer;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.ezsyncxz.efficiency.entity.FileFragment;
import com.ezsyncxz.efficiency.mq.annotation.MQConsumeService;
import com.ezsyncxz.efficiency.mq.entity.MQConsumeResult;
import com.ezsyncxz.efficiency.mq.processor.AbstractMQMsgProcessor;
import com.ezsyncxz.efficiency.redis.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.*;
import java.util.List;

/**
 * @ClassName DataCollectionConsumer
 * @Description TODO
 * @Author chenwj
 * @Date 2020/2/25 15:49
 * @Version 1.0
 **/

@MQConsumeService(topic = "DemoTopic", tags = {"*"})
public class DataCollectionConsumer extends AbstractMQMsgProcessor {

    @Autowired
    private RedisUtil redisUtil;

    @Override
    protected MQConsumeResult consumeMessage(String tag, List<String> keys, MessageExt messageExt) {
        // 用RandomAccessFile直接将拿到的片段写入磁盘
        byte[] body = messageExt.getBody();
        String bodyString = new String(body);
        FileFragment fileFragment = JSONObject.parseObject(bodyString, FileFragment.class);
        String tarPath = fileFragment.getTarPath();
        String filename = fileFragment.getFilename();
        String path = tarPath + File.separator + filename;
        File file = new File(path);
        if (!file.exists()) {
            try {
                file.createNewFile();
                logger.warn("创建新文件 文件名: {}", file.getName());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        try {
            RandomAccessFile w = new RandomAccessFile(path, "rw");
            w.seek(fileFragment.getOff());
            w.write(fileFragment.getBody());
            logger.warn("写入文件片段 文件名:{} 偏移量:{}", filename, fileFragment.getOff());
            w.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        long incr = redisUtil.incr(tag, fileFragment.getBody().length);
        if(incr == fileFragment.getLength()) {
            long endTime = System.currentTimeMillis();
            logger.warn("文件写入完毕 总耗时:{}ms 文件:{}", endTime - fileFragment.getStartTime(), tarPath + File.separator + filename);
            redisUtil.del(tag);
        }
        return MQConsumeResult.newBuilder().isSuccess(true).build();
    }
}

之后测试文件同步,同步之后的文件对比文件哈希是没有任何问题的,证明同步是可以的。但是在同步一个800多MB的文件时,基于消息队列的同步方式花费了差不多40s,而直接建立socket连接的方式则差不多是8s左右。原因大概是通过消息队列的方式同步文件,存在对文件的多次io,在消费端,频繁地开启和关闭和本地磁盘的通道,花费了大量时间,以及多个消息在生产消费的过程中也存在时间消耗。此外,还测试了多个小文件同步,我原本想法是mq并发消费,在处理多个小文件同步上或许能够优于socket传输,事实证明,在多个小文件的传输上还是用socket更有优势,在776个小文件的传输中,mq的方式花费了22s左右,而socket的方式花费了8s左右。

用socket替换掉mq

替换后的DataCollection

package com.ezsyncxz.efficiency.data;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.ezsyncxz.efficiency.utils.ByteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * @ClassName DataCollection
 * @Description 采集指定文件夹数据,将数据读取,归档,分块写入消息队列中,rocketMq能够存入的消息大小最大为4MB,因此我们需要对过大的数据进行切割
 * @Author chenwj
 * @Date 2020/2/24 15:46
 * @Version 1.0
 **/
@Component
public class DataCollection {

    private static final Logger logger = LoggerFactory.getLogger(DataCollection.class);

    @Autowired
    private DefaultMQProducer producer;

    public static final String tar = "D:\\chenwj\\dev\\test\\efficiency_tar\\";

    /**
     * 采集文件夹下所有的文件,包括文件夹
     *
     * @param src
     */
    public void collect(String src) throws IOException, InterruptedException, RemotingException, MQClientException, MQBrokerException {

        File file = new File(src);

        String filename = file.getName();

        // 文件不存在则返回
        if (!file.exists()) {
            logger.error("不存在该文件路径: {}", src);
            return;
        }

        RandomAccessFile accessFile = new RandomAccessFile(src, "r");
        int length = 0;
        double sumL = 0 ;
        byte[] sendBytes = null;
        Socket socket = null;
        DataOutputStream dos = null;
        boolean bool = false;
        try {
            long l = file.length();
            socket = new Socket();
            socket.connect(new InetSocketAddress("127.0.0.1", 48123));
            dos = new DataOutputStream(socket.getOutputStream());
            sendBytes = new byte[1024];

            //传输文件路径,前4个字节是长度
            String fileName = file.getName();
            String filePath = tar + File.separator + fileName;
            int len = filePath.getBytes().length;
            byte[] lenBytes = ByteUtils.intToByteArray(len);
            byte[] bytes = ByteUtils.concateBytes(lenBytes, filePath.getBytes());
            dos.write(bytes);
            dos.flush();

            // 传输文件内容
            while ((length = accessFile.read(sendBytes)) > 0) {
                sumL += length;
                logger.warn("已传输:{}", ((sumL/l)*100)+"%");
                dos.write(sendBytes, 0, length);
                dos.flush();
            }
            if(sumL==l){
                bool = true;
            }
        }catch (Exception e) {
            System.out.println("客户端文件传输异常");
            bool = false;
            e.printStackTrace();
        } finally{
            if (dos != null)
                dos.close();
            if (socket != null)
                socket.close();
        }
        if(bool) {
            logger.warn("传输完毕!");
        } else {
            logger.warn("文件传输失败!");
        }
    }
}

新增服务端BxServerSocket代码:

package com.ezsyncxz.efficiency.socket;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;


/**
 * 接收文件服务
 * @author admin_Hzw
 *
 */
public class BxServerSocket {
	
	/**
	 * 工程main方法
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			final ServerSocket server = new ServerSocket(48123);
			Thread th = new Thread(new Runnable() {
				public void run() {
					while (true) {
						try {  
							System.out.println("开始监听...");
							/*
							 * 如果没有访问它会自动等待
							 */
							Socket socket = server.accept();
							System.out.println("有链接");
							receiveFile(socket);
						} catch (Exception e) {
							System.out.println("服务器异常");
							e.printStackTrace();
						}
					}
				}
			});
			th.run(); //启动线程运行
		} catch (Exception e) {
			e.printStackTrace();
		}     
	}
 
	public void run() {
	}
 
	/**
	 * 接收文件方法
	 * @param socket
	 * @throws IOException
	 */
	public static void receiveFile(Socket socket) throws IOException {
		byte[] inputByte = null;
		int length = 0;
		DataInputStream dis = null;
		FileOutputStream fos = null;
		try {
			try {
				dis = new DataInputStream(socket.getInputStream());
				/*
				 * 文件存储位置  
				 */
				int len = dis.readInt();
				byte[] bytes = new byte[len];
				dis.read(bytes);
				java.lang.String filePath = new java.lang.String(bytes);

				fos = new FileOutputStream(new File(filePath));
				inputByte = new byte[1024];   
				System.out.println("开始接收数据...");  
				while ((length = dis.read(inputByte)) > 0) {
					fos.write(inputByte, 0, length);
					fos.flush();    
				}
				System.out.println("完成接收:"+"");
			} finally {
				if (fos != null)
					fos.close();
				if (dis != null)
					dis.close();
				if (socket != null)
					socket.close(); 
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

取消了队列的生产者消费者模式,而是直接在服务端开启一个socket端口,直接传输数据。

上面相关的代码,我已上传到github,有兴趣的可以下载来看看。

参考

SpringBoot整合Redis及Redis工具类撰写

RocketMq消息监听程序中消除大量的if..else

对我的文章感兴趣的话,请关注我的公众号 公众号二维码

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部