文档章节

Java IO类库之管道流PipeInputStream与PipeOutputStream

 老韭菜
发布于 07/23 00:53
字数 2340
阅读 588
收藏 14

一、java管道流介绍

    在java多线程通信中管道通信是一种重要的通信方式,在java中我们通过配套使用管道输出流PipedOutputStream和管道输入流PipedInputStream完成线程间通信。多线程管道通信的主要流程是在一个线程中向PipedOutputStream写入数据,这些数据会自动传送到对应的管道输入流PipedInputStream中,其他线程通过读取PipeInputStream中缓冲的数据实现多线程间通信。

二、PipedInputStream

 1 - PipedInputStream介绍

    PipeInputStream是管道输入流,继承自InputStream,连接到一个管道输出流PipedOutputStream。可以缓存连接的管道输出流PipedOutputStream写入的字节数据。通常在一个线程使用PipedInputStream读取数据,在其他线程使用PipedOutputStream写入字节数据。不推荐在一个线程中使用PipedInputStream和PipedOutputStream可能会在线程中造成死锁,管道输入流包含一个缓冲区buff用于读操作和写操作。

2 - PipeInputStream源码分析

1)成员变量

package java.io;
public class PipedInputStream extends InputStream {
    //管道输出流是否关闭标记
    boolean closedByWriter = false;
    //管道输入流是否标记
    volatile boolean closedByReader = false;
    //管道输入流与管道输出流是否建立连接
    boolean connected = false;
    //读取“管道”数据即PipedInputStream线程
    Thread readSide;
    //向管道写入数据即PipedOutputStream线程
    Thread writeSide;

    //管道默认大小
    private static final int DEFAULT_PIPE_SIZE = 1024;

    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
 
    // 缓冲区   
    protected byte buffer[];

    //下一个写入字节的位置
    protected int in = -1;

    //下一个读取字节的位置。若out==in说明管道输出流写入的数据全部被读取
    protected int out = 0;
}

2)构造方法

    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
         initPipe(pipeSize);
         connect(src);
    }

    public PipedInputStream() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    从源码我们可以知道管道输入流PipedInputStream构造方法做了两件事,按照指定大小pipeSize初始化缓冲区,如果还指定了关联的管道输出流PipedOutputStream,那么调用connect方法连接它。如果指定的pipeSize小于等于0那么抛出IllegalArgumentException异常,如果当前的管道输入流已经和指定的管道输出流建立连接那么抛出IOException异常提示连接已经建立。

3)其他成员方法

    //根据指定大小pipeSize初始化缓冲区
    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

    //绑定管道输入流与管道输出流
    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }

    //接收绑定的管道输出流PipedOutputStream的write(int b)方法写入的int类型数据
    protected synchronized void receive(int b) throws IOException {
        //检查管道状态
        checkStateForReceive();
        //获取“写入管道“即PipedOutputStream的线程
        writeSide = Thread.currentThread();
        //若管道输出流写入的数据全部被读取则等待
        if (in == out)
            awaitSpace();
        if (in < 0) {
            in = 0;
            out = 0;
        }
        //将读取的字节b保存到缓冲区
        buffer[in++] = (byte)(b & 0xFF);
        //如果当前写入的字节数大于缓冲区大小那么从头覆盖之前写入的字节数据
        if (in >= buffer.length) {
            in = 0;
        }
    }

    //接收管道输出流的write(byte b[],int off, int len)方法调用写入的字节数组b
    synchronized void receive(byte b[], int off, int len)  throws IOException {
        //检查管道状态
        checkStateForReceive();
        //获取”写入管道“线程
        writeSide = Thread.currentThread();
        //获取写入字节长度
        int bytesToTransfer = len;
        //循环将字节数组b写入管道输入流内部缓冲数组buffer
        while (bytesToTransfer > 0) {
            //若写入管道的字节长度in等于读取字节长度长度out,则等待
            if (in == out)
                awaitSpace();
            int nextTransferAmount = 0;
            //若管道中读取的字节数out小于写入的字节数in,nextTransferAmount等于buffer.length-in
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            }
            //若管道中写入的字节数小于读取的字节数,
            else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            //将数据复制到缓冲区buffer中
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            //缓冲区溢出继续写入则覆盖原有字节数据
            if (in >= buffer.length) {
                in = 0;
            }
        }
    }
    
    //检查管道状态
    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }
    
    //等待。若”写入管道“的数据被全部读取完,则唤醒”读取管道“线程继续读取字节数据以让缓冲区空出空间继续写入数据,等待
    //1000ms
    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    //连接的管道输出流关闭时被调用用于更新管道输入流关闭状态,并唤醒读取管道线程
    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();
    }

    //从管道输入流更确切的说缓冲区buffer中读取一个字节并转化为int值
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        //如果写入字节数小于0,且非写入管道线程异常终止或者管道输出流写入结束正常关闭,那么唤醒写入管道等待1s
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }

        return ret;
    }

    //从管道输入流缓冲数组buffer中读取字节数据并填入数组b中逻辑与read()类似
    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }

    //返回可不受阻塞地从此输入流中读取的字节数,注意available返回是缓冲数组中的可读字节长度
    public synchronized int available() throws IOException {
        if(in < 0)
            return 0;
        else if(in == out)
            return buffer.length;
        else if (in > out)
            return in - out;
        else
            return in + buffer.length - out;
    }

    //关闭输出流
    public void close()  throws IOException {
        closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }

三、PipedOutputStream

1 - PipedOutputStream介绍

    PipedOutputStream是管道输出流,继承自OutputStream。通过与一个管道输入流PipedInputStream建立连接往管道输入流中写入数据,其实质是往绑定的管道输入流的内部缓存区中填入数据。PipedInputStream和PipedOutputStream主要用于线程间通信,不建议在单线程中使用PipedInputStream和PipedOutputStream可能造成死锁。

2 - PipedOutputStream数据结构

public
class PipedOutputStream extends OutputStream {

    //绑定的管道输入流对象
    private PipedInputStream sink;
}

3 - PipedOutputSteam源码分析

1)构造函数

    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

    public PipedOutputStream() {

    }

    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;
    }

    PipedOutputStream类内部定义了两个构造函数,一个无参构造函数没做啥,我们分析下入参是管道输入流对象引用的构造函数。由源码可知与PipedInputStream构造函数做的一样其实PipedInputStream带参构造函数方法内部调用的也是本方法,参数判空,判断是否此前是否已建立连接,内部管道输入流对象引用指向指定对象,初始化对象的读取写入位置,设置连接状态为已连接。

 

2)void write(byte b[], int off, int len)写入字节数据

    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        sink.receive(b, off, len);
    }

    这个方法首先对当前绑定的管道输入流对象进行判空,为空抛出一个IO异常提示未建立管道连接,对写入的字节数组b判空为空抛出NullException异常,接着对待写入字节数组b写入起点off,写入长度len进行范围合法性校验,校验失败抛出IndexOutOfBoundsException数组越界异常,如果写入长度为0直接返回,下面就是调用绑定的管道输入流对象sink.receive(b, off, len)方法,源码解析在前面关于PipedInputStream源码的解析部分。从这边我们知道PipedOutputStream写入数据其实就是调用绑定输入流的receive方法往内部缓冲数组buffer中填入字节数据。

3)其他成员方法

    //写入单个字节,实质就是往绑定的管道输入流内部缓冲数组填入一个字节数据
    public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(b);
    }
   
   //释放输入流对象锁,继续读取缓冲区数据
   public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }

    //调用管道输入流的receiveLast关闭通道
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }
}

 

© 著作权归作者所有

共有 人打赏支持
粉丝 11
博文 64
码字总数 107681
作品 0
杭州
后端工程师
加载中

评论(1)

chally
chally
继续啊,写完啊,希望写的详细点。:stuck_out_tongue:
JAVA 字符流与字节流的区别

Java 流在处理上分为字符流和字节流。字符流处理的单元为 2 个字节的 Unicode 字符,分别操作字符、字符数组或字符串,而字节流处理单元为 1 个字节,操作字节和字节数组。 Java 内用 Unicod...

owensliu
2014/03/25
0
1
JAVA IO 设计模式彻底分析

一。引子(概括地介绍Java的IO)   无论是哪种编程语言,输入跟输出都是重要的一部分,Java也不例外,而且Java将输入/输出的功能和使用范畴做了很大的扩充。它采用了流的 机制来实现输入...

2k10
2015/03/13
0
0
做几道基础的Java测试题,看看最近有进步吗?欢迎来学习

Java是一种可以撰写跨平台应用软件的面向对象的程序设计语言。Java 技术具有卓越的通用性、高效性、平台移植性和安全性,广泛应用于PC、数据中心、游戏控制台、科学超级计算机、移动电话和互...

启示录是真的
05/24
0
0
java编程学习:文件流操作!

Java是一种可以撰写跨平台应用软件的面向对象的程序设计语言。Java 技术具有卓越的通用性、高效性、平台移植性和安全性,广泛应用于PC、数据中心、游戏控制台、科学超级计算机、移动电话和互...

Java小辰
06/04
0
0
Effective Java 第三版——48. 谨慎使用流并行

Tips 《Effective Java, Third Edition》一书英文版已经出版,这本书的第二版想必很多人都读过,号称Java四大名著之一,不过第二版2009年出版,到现在已经将近8年的时间,但随着Java 6,7,8...

M104
10/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

初级开发-编程题

` public static void main(String[] args) { System.out.println(changeStrToUpperCase("user_name_abc")); System.out.println(changeStrToLowerCase(changeStrToUpperCase("user_name_abc......

小池仔
今天
4
0
现场看路演了!

HiBlock
昨天
14
0
Rabbit MQ基本概念介绍

RabbitMQ介绍 • RabbitMQ是一个消息中间件,是一个很好用的消息队列框架。 • ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的s...

寰宇01
昨天
9
0
官方精简版Windows10:微软自己都看不过去了

微软宣布,该公司正在寻求解决方案,以减轻企业客户的Windows 10规模。该公司声称,企业客户下载整个Windows 10文件以更新设备既费钱又费时。 微软宣布,该公司正在寻求解决方案,以减轻企业...

linux-tao
昨天
19
0
TypeScript基础入门之JSX(二)

转发 TypeScript基础入门之JSX(二) 属性类型检查 键入检查属性的第一步是确定元素属性类型。 内在元素和基于价值的元素之间略有不同。 对于内部元素,它是JSX.IntrinsicElements上的属性类型...

durban
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部