首页 > 代码库 > 秒杀多线程第十一篇 读者写者问题(续)

秒杀多线程第十一篇 读者写者问题(续)

java实现:

本问题的关键是读者与写者之间的同步,下面用 Java 的并发工具类来实现。

1.等待读者,使用CountDownLatch mReaderLatch, 但是CountDownLatch只能使用一次,所以需要每次都new 一个。

或者可以考虑使用semaphore代替,但是semaphore需要acquire(READ_THREAD_SIZE)才能等待所有读者线程结束。

2.等待写入操作。使用semaphore来控制:写者写完后调用 mWriteSema.release(READ_THREAD_SIZE); 一次释放 READ_THREAD_SIZE 个许可,每个读者各 acquire() 一个许可,因此所有读者可以同时读取。

关键代码如下:

public void waitReaderEnd()    {        //多个read End,using countdownlatch        try {            mReaderLatch.await();            mReaderLatch = new CountDownLatch(READ_THREAD_SIZE);        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                    }        public void singalWriteEnd(){        mWriteSema.release(READ_THREAD_SIZE);    }                public void waitWriteEnd(){        try {            mWriteSema.acquire();        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }    }        public void singalReadEnd(){                mReaderLatch.countDown();    }        public void initReadNone(){        try {            mWriteSema.acquire(READ_THREAD_SIZE);            for(int i=0;i<READ_THREAD_SIZE;i++)            {                mReaderLatch.countDown();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                    }

 

详细代码如下:

package com.multithread.readwrite;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import com.multithread.main.ExampleInterface;
import com.multithread.prosumer.ProducerThread;

/**
 * Reader/writer demo with one writer and {@link #READ_THREAD_SIZE} readers.
 *
 * <ol>
 *   <li>The writer waits until all readers have finished reading before it writes again.</li>
 *   <li>All readers wait until the writer has finished writing before they read.</li>
 *   <li>Readers may read the shared data at the same time.</li>
 * </ol>
 *
 * <p>The writer waits on {@link #mReaderLatch}; readers wait on {@link #mWriteSema}.
 *
 * <p>NOTE(review): {@link #waitWriteEnd()} takes a single permit and nothing ties a
 * permit to a particular reader, so one reader may take two permits in the same
 * round (reading the same data twice) while another reader skips that round.
 */
public class ReaderWriterExample extends ExampleInterface {

    /** Number of reader threads. */
    public static final int READ_THREAD_SIZE = 4;
    /** Total number of values the writer produces before the demo stops. */
    public static final int BUFFER_LENGTH = 100;

    /** Shared buffer: cleared and refilled by the writer, iterated by the readers. */
    public List<Integer> g_productor = new ArrayList<Integer>();

    /** Counted down once per finished read; replaced with a new latch every round. */
    public CountDownLatch mReaderLatch = new CountDownLatch(READ_THREAD_SIZE);
    /** Permits handed to the readers after each write. */
    public Semaphore mWriteSema = new Semaphore(READ_THREAD_SIZE);

    /** Set by the writer when the last batch has been written. */
    public boolean bStopFlag = false;
    /** Counted down by every worker when it terminates. */
    public CountDownLatch mLatchDown = new CountDownLatch(1 + READ_THREAD_SIZE);
    /** Counted down by every worker when it starts running. */
    public CountDownLatch mLatchStart = new CountDownLatch(1 + READ_THREAD_SIZE);

    /**
     * Runs the demo: starts one writer and {@code READ_THREAD_SIZE} readers on a
     * fixed thread pool, waits until all of them have started and then until all
     * of them have finished.
     *
     * <p>The pool is always shut down afterwards; without that its non-daemon
     * worker threads would keep the JVM alive after the demo has completed.
     */
    @Override
    public void startDemo() {
        initReadNone();
        bStopFlag = false;

        ExecutorService executor = Executors.newFixedThreadPool(1 + READ_THREAD_SIZE);
        try {
            executor.execute(new WriteThread(this, "Writer"));
            for (int i = 1; i <= READ_THREAD_SIZE; i++) {
                executor.execute(new ReadThread(this, "Reader" + i));
            }

            mLatchStart.await();
            System.out.println("All Thread is runnning");

            mLatchDown.await();
            System.out.println("All Thread is Down");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever called startDemo().
            Thread.currentThread().interrupt();
        } finally {
            // Already-submitted tasks are allowed to finish; idle workers are released.
            executor.shutdown();
        }
    }

    /**
     * Blocks the caller (the writer) until the current reader latch reaches zero,
     * then installs a fresh latch for the next round, because a
     * {@link CountDownLatch} cannot be reset once it has opened.
     *
     * <p>NOTE(review): on interruption the stack trace is printed, the latch is
     * not replaced and the interrupt flag is not restored; the worker loops do
     * not check the flag, so restoring it here would make them spin.
     */
    public void waitReaderEnd() {
        try {
            mReaderLatch.await();
            mReaderLatch = new CountDownLatch(READ_THREAD_SIZE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Signals that the writer is done: releases {@code READ_THREAD_SIZE} permits
     * so that many {@link #waitWriteEnd()} calls can proceed.
     */
    public void singalWriteEnd() {
        mWriteSema.release(READ_THREAD_SIZE);
    }

    /**
     * Blocks the caller (a reader) until one write permit is available and takes it.
     * On interruption the stack trace is printed and the method returns without a permit.
     */
    public void waitWriteEnd() {
        try {
            mWriteSema.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Signals that one reader has finished its current read. */
    public void singalReadEnd() {
        mReaderLatch.countDown();
    }

    /**
     * Puts the synchronizers into the "nobody is reading" start state: takes all
     * {@code READ_THREAD_SIZE} write permits so readers block, and counts the
     * reader latch down {@code READ_THREAD_SIZE} times so the writer's first
     * {@link #waitReaderEnd()} returns immediately.
     */
    public void initReadNone() {
        try {
            mWriteSema.acquire(READ_THREAD_SIZE);
            for (int i = 0; i < READ_THREAD_SIZE; i++) {
                mReaderLatch.countDown();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.multithread.readwrite;public class ReadThread extends Thread {        ReaderWriterExample mRwExample = null;    String name = null;    boolean flag = true;    public ReadThread(ReaderWriterExample re,String name)    {        mRwExample = re;        this.name = name;    }        @Override    public void run() {        mRwExample.mLatchStart.countDown();        while(flag)        {            //等待写入结束            mRwExample.waitWriteEnd();                        //读取文件,直到本次末尾 //check 文件写入操作是否已经彻底结束,结束read线程。            int mReadLength = mRwExample.g_productor.size();            String mReadStr = "";            if(mReadLength>0)            {                for(Integer a:mRwExample.g_productor)                {                    mReadStr+=String.format("%x", a);                }                System.out.println(name+"读取数据:"+mReadStr);            }                        if(mRwExample.bStopFlag)            {                flag = false;            }                        //通知本次读者结束。            mRwExample.singalReadEnd();        }                System.out.println(name+"读取数据结束");        mRwExample.mLatchDown.countDown();    }    }
package com.multithread.readwrite;public class WriteThread extends Thread {        ReaderWriterExample mRwExample = null;    String name = null;    int iFlag = 0;        public WriteThread(ReaderWriterExample re,String name)    {        mRwExample = re;        this.name = name;    }            @Override    public void run() {        mRwExample.mLatchStart.countDown();        int index = 0;        while(index<ReaderWriterExample.BUFFER_LENGTH)        {            //等待所有读者结束读取操作。            mRwExample.waitReaderEnd();                        int mWriteLength = (int) (Math.random()*9)+1;//1-10;            if(mWriteLength >(ReaderWriterExample.BUFFER_LENGTH - index))            {                mWriteLength = ReaderWriterExample.BUFFER_LENGTH - index;            }                            //写入数据到文件最后  //check 写入数据到最后。            mRwExample.g_productor.clear();            int mParam = 0;            String writeline = "";            for(int i=0;i<mWriteLength;i++)            {                mParam = (int) (Math.random()*14)+1;//0-E                mRwExample.g_productor.add(mParam);                writeline+= String.format("%1$x", mParam);            }                        index = index +mWriteLength;            System.out.println(name+"写入数据:"+writeline+"\t 当前index:"+index);            if(index>=ReaderWriterExample.BUFFER_LENGTH)            {                mRwExample.bStopFlag = true;            }                        //通知写入操作结束,可以读取。            mRwExample.singalWriteEnd();                                    iFlag++;        }                System.out.println(name+"线程操作结束");        mRwExample.mLatchDown.countDown();    }    }

 

秒杀多线程第十一篇 读者写者问题(续)