首页 > 代码库 > java并发读&写文件

java并发读&写文件

最近在看Brian Goetz 的<<Java并发实战>>,这本书有两个版本,电子工业出版社的译本很糟糕,建议使用机械工业出版社出版出版的书籍.

在看到第三四章的时候突然想到了多线程读写文件,同时遇到一些书中没有的问题

1, 如何保证组合对象的安全性?

2, 如何判断不变性的约束条件

3, 如何不通过synchronized关键字和锁进行同步处理?

下面是一段代码, 用来从source 读取数据,通过多线程写入target文件中

思路:

1, 如何read/write文件?

2, 如何设计Reader类?

3, Reader类的是否需要状态来描述? 状态变量如何同步?

4, 如何保证当前Thread可以准确地读取当前segment?

代码综述:


package org.mushroom.multithread;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 要点:
 * 1, 将一个文件分为 ,大小为 {@link Reader#segmentLength} 的 {@link Reader#segments} 个文件
 * ,每个线程通过读取pure block来读取数据,被访问的块会通过{@link Reader#segment}标记
 * ,自增1.
 * 2, 多个线程的全局变量是final {@link Reader#source}, final {@link Reader#target}和
 * {@link Reader#segment},需要保证这些变量的安全性
 */
public class Reader implements Runnable {
    private static final int BYTE = 1024;
    private static final long NEGATIVE_ONE = -1L;
    private static final long ZERO = 0L;
    private static final long ONE = 1L;
    // 全局变量 供多线程访问块
    private final AtomicLong segment = new AtomicLong(NEGATIVE_ONE);
    // 单个文件大小
    private final int segmentLength = 30 * BYTE * BYTE;
    // 原始文件
    private final File source;
    // 复制后的文件
    private final File target;
    // 文件被分割后的块数
    private final long segments;
    // 最后一块文件实际大小
    private final long remains;

    public Reader(String sourcePath, String targetPath) throws IOException {
        this.source = new File(sourcePath);
        this.target = new File(targetPath);
        if (!this.target.exists()) {
            this.target.createNewFile();
        }
        this.remains = (this.source.length() % segmentLength);
        //如果余数不为0, 则需要多一个块来存储多余的bytes,否则会丢失
        if (this.remains != ZERO) {
            this.segments = this.source.length() / segmentLength + ONE;
        } else {
            this.segments = sourcePath.length() / segmentLength;
        }

    }

    /**
     * run:
     * 1, while true: 当前块未被访问, 从{@link Reader#segment = 0}开始第一次访问
     * 2, {@link Reader#readBlock(RandomAccessFile, long)}从文件中读取数据,并返回 byte[]
     * 3, {@link Reader#writeBlock(RandomAccessFile, byte[], long)},设置position后将缓冲写入文件
     */
    public void run() {
        RandomAccessFile reader = null;
        RandomAccessFile writer = null;
        try {
            reader = new RandomAccessFile(source, "r");
            writer = new RandomAccessFile(target, "rw");
            long position = -1L;
            //循环计数当前segment, 多个线程均可修改
            while ((position = segment.incrementAndGet()) < segments) {
                final byte[] bytes = readBlock(reader, position);
                writeBlock(writer, bytes, position);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(writer);
            close(reader);
        }
    }

    private void writeBlock(RandomAccessFile writer, byte[] bytes, long position) throws IOException {
        writer.seek(position * segmentLength);
        writer.write(bytes);
    }

    /**
     * 1, reader设置position
     * 2, 创建缓冲数组
     * 3, 将数据写入byte[]
     * 4, 返回缓冲数组
     *
     * @return position 供 {@link RandomAccessFile#write(byte[])}使用
     */
    private byte[] readBlock(RandomAccessFile reader, long position) throws IOException {
        reader.seek(position * segmentLength);
        final byte[] bytes = new byte[getWriteLength(position)];
        reader.read(bytes);
        return bytes;
    }

    /**
     * 获得当前byte[]实际可写入长度可能是{@link Reader#segmentLength} 或者 {@link Reader#remains}
     */
    private int getWriteLength(long position) throws IOException {
        if (position == segments + NEGATIVE_ONE && remains > ZERO) {
            return (int) remains;
        }
        return segmentLength;
    }

    /**
     * 关闭流的通用接口方法
     *
     * @param closeable
     */

    private void close(Closeable closeable) {
        try {
            if (Objects.nonNull(closeable)) {
                closeable.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

测试代码:注意junit对多线程测试无能为力,建议使用groboutils或者简单地main方法

package org.mushroom.multithread;

import java.io.IOException;

public class ReaderTest {

    public static void main(String[] args) throws IOException {

        final String source = "";
        final String target = "";
        Reader reader = new Reader(source, target);
        new Thread(reader).start();
        new Thread(reader).start();
        new Thread(reader).start();
        new Thread(reader).start();
    }
}

解决方式:

1, 多线程读取文件需要从不同的地方读取数据,所以普通的stream不符合要求,所以选择java.io.RandomAccessFile它可任意设置文件cursor.并且包含read&write方式

2, ①reader&writer设计

多线程执行的是reader类的run方法,所以每个线程都必须有自己独立的reader&writer,所以reader&writer必须是线程私有的,也就是说需要把reader&writer封装在线程中,线程之间的reader&writer不能共享.

   ②如何处理source&target?

每个线程处理自己所负责的块,最终的块合在一起就是一个完整的target.也就是将source分为大小为segment length的segment块,但是注意,最后一个segment块的实际有效数据长度并非segment length.

  ③其他域设计

需要两个File来描述source&target;

需要两个变量描述source被分成多少个segments以及segment的大小segmentLength;

需要一个变量描述最后一块remain的大小;

需要一个共享变量segment来描述source文件的文件状态;

3, source文件需要一个segment变量来描述它被多线程处理的状态,也就是当前线程处理到哪里了.还有那些尚未处理

4,Thread读取过的segment可以使用一个java.util.concurrent.atomic.AtomicLong类型的变量来描述它(也就是主要描述了当前块是第几个块), AtomicLong类的增/减操作是线程安全的.所以每一次读取的块的位置都是安全的.


本文出自 “小蘑菇博客大世界” 博客,请务必保留此出处http://smallmushroom.blog.51cto.com/9474297/1862018

java并发读&写文件