首页 > 代码库 > java并发读&写文件
java并发读&写文件
最近在看Brian Goetz 的<<Java并发实战>>,这本书有两个版本,电子工业出版社的译本很糟糕,建议使用机械工业出版社出版出版的书籍.
在看到第三四章的时候突然想到了多线程读写文件,同时遇到一些书中没有的问题
1, 如何保证组合对象的安全性?
2, 如何判断不变性的约束条件
3, 如何不通过synchronized关键字和锁进行同步处理?
下面是一段代码, 用来从source 读取数据,通过多线程写入target文件中
思路:
1, 如何read/write文件?
2, 如何设计Reader类?
3, Reader类的是否需要状态来描述? 状态变量如何同步?
4, 如何保证当前Thread可以准确地读取当前segment?
代码综述:
package org.mushroom.multithread; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; /** * 要点: * 1, 将一个文件分为 ,大小为 {@link Reader#segmentLength} 的 {@link Reader#segments} 个文件 * ,每个线程通过读取pure block来读取数据,被访问的块会通过{@link Reader#segment}标记 * ,自增1. * 2, 多个线程的全局变量是final {@link Reader#source}, final {@link Reader#target}和 * {@link Reader#segment},需要保证这些变量的安全性 */ public class Reader implements Runnable { private static final int BYTE = 1024; private static final long NEGATIVE_ONE = -1L; private static final long ZERO = 0L; private static final long ONE = 1L; // 全局变量 供多线程访问块 private final AtomicLong segment = new AtomicLong(NEGATIVE_ONE); // 单个文件大小 private final int segmentLength = 30 * BYTE * BYTE; // 原始文件 private final File source; // 复制后的文件 private final File target; // 文件被分割后的块数 private final long segments; // 最后一块文件实际大小 private final long remains; public Reader(String sourcePath, String targetPath) throws IOException { this.source = new File(sourcePath); this.target = new File(targetPath); if (!this.target.exists()) { this.target.createNewFile(); } this.remains = (this.source.length() % segmentLength); //如果余数不为0, 则需要多一个块来存储多余的bytes,否则会丢失 if (this.remains != ZERO) { this.segments = this.source.length() / segmentLength + ONE; } else { this.segments = sourcePath.length() / segmentLength; } } /** * run: * 1, while true: 当前块未被访问, 从{@link Reader#segment = 0}开始第一次访问 * 2, {@link Reader#readBlock(RandomAccessFile, long)}从文件中读取数据,并返回 byte[] * 3, {@link Reader#writeBlock(RandomAccessFile, byte[], long)},设置position后将缓冲写入文件 */ public void run() { RandomAccessFile reader = null; RandomAccessFile writer = null; try { reader = new RandomAccessFile(source, "r"); writer = new RandomAccessFile(target, "rw"); long position = -1L; //循环计数当前segment, 多个线程均可修改 while ((position = segment.incrementAndGet()) < segments) { final byte[] bytes = readBlock(reader, position); writeBlock(writer, bytes, position); } } catch (IOException e) { e.printStackTrace(); } finally { close(writer); close(reader); } } private void writeBlock(RandomAccessFile writer, byte[] bytes, long position) throws IOException { writer.seek(position * segmentLength); writer.write(bytes); } /** * 1, reader设置position * 2, 创建缓冲数组 * 3, 将数据写入byte[] * 4, 返回缓冲数组 * * @return position 供 {@link RandomAccessFile#write(byte[])}使用 */ private byte[] readBlock(RandomAccessFile reader, long position) throws IOException { reader.seek(position * segmentLength); final byte[] bytes = new byte[getWriteLength(position)]; reader.read(bytes); return bytes; } /** * 获得当前byte[]实际可写入长度可能是{@link Reader#segmentLength} 或者 {@link Reader#remains} */ private int getWriteLength(long position) throws IOException { if (position == segments + NEGATIVE_ONE && remains > ZERO) { return (int) remains; } return segmentLength; } /** * 关闭流的通用接口方法 * * @param closeable */ private void close(Closeable closeable) { try { if (Objects.nonNull(closeable)) { closeable.close(); } } catch (IOException e) { e.printStackTrace(); } } }
测试代码:注意junit对多线程测试无能为力,建议使用groboutils或者简单地main方法
package org.mushroom.multithread; import java.io.IOException; public class ReaderTest { public static void main(String[] args) throws IOException { final String source = ""; final String target = ""; Reader reader = new Reader(source, target); new Thread(reader).start(); new Thread(reader).start(); new Thread(reader).start(); new Thread(reader).start(); } }
解决方式:
1, 多线程读取文件需要从不同的地方读取数据,所以普通的stream不符合要求,所以选择java.io.RandomAccessFile它可任意设置文件cursor.并且包含read&write方式
2, ①reader&writer设计
多线程执行的是reader类的run方法,所以每个线程都必须有自己独立的reader&writer,所以reader&writer必须是线程私有的,也就是说需要把reader&writer封装在线程中,线程之间的reader&writer不能共享.
②如何处理source&target?
每个线程处理自己所负责的块,最终的块合在一起就是一个完整的target.也就是将source分为大小为segment length的segment块,但是注意,最后一个segment块的实际有效数据长度并非segment length.
③其他域设计
需要两个File来描述source⌖
需要两个变量描述source被分成多少个segments以及segment的大小segmentLength;
需要一个变量描述最后一块remain的大小;
需要一个共享变量segment来描述source文件的文件状态;
3, source文件需要一个segment变量来描述它被多线程处理的状态,也就是当前线程处理到哪里了.还有那些尚未处理
4,Thread读取过的segment可以使用一个java.util.concurrent.atomic.AtomicLong类型的变量来描述它(也就是主要描述了当前块是第几个块), AtomicLong类的增/减操作是线程安全的.所以每一次读取的块的位置都是安全的.
本文出自 “小蘑菇博客大世界” 博客,请务必保留此出处http://smallmushroom.blog.51cto.com/9474297/1862018
java并发读&写文件