首页 > 代码库 > jdk并发包ReentrantLock 源码导读

jdk并发包ReentrantLock 源码导读

1,ReentrantLock实现了Lock接口,下面是Lock接口。定义了一些Lock的基本操作。


2,ReentrantLock根据在高并发下获取锁的算法分为FairSync和NonfairSync两种。默认为NonfairSync。

3,FairSync和NonfairSync继承了Sync。而Sync是锁的基础控制类。FairSync依然需要检查当前线程是否是等待队列的第一个,NonfairSync则不需要直接从列表中取一个。实际中公平锁吞吐量比非公平锁小很多。

4,Sync通过AbstractQueuedSynchronizer(AQS)来管理对Lock的使用。AQS内部通过维护一个lock queue来维护对锁的争用。改lock queue是CLH locks 的一个变种。下面是节点的定义

       static final class Node {        /** waitStatus value to indicate thread has cancelled */        static final int CANCELLED =  1;        /** waitStatus value to indicate successor's thread needs unparking */        static final int SIGNAL    = -1;        /** waitStatus value to indicate thread is waiting on condition */        static final int CONDITION = -2;        /** Marker to indicate a node is waiting in shared mode */        static final Node SHARED = new Node();        /** Marker to indicate a node is waiting in exclusive mode */        static final Node EXCLUSIVE = null;        /**         * Status field, taking on only the values:         *   SIGNAL:     The successor of this node is (or will soon be)         *               blocked (via park), so the current node must         *               unpark its successor when it releases or         *               cancels. To avoid races, acquire methods must         *               first indicate they need a signal,         *               then retry the atomic acquire, and then,         *               on failure, block.         *   CANCELLED:  This node is cancelled due to timeout or interrupt.         *               Nodes never leave this state. In particular,         *               a thread with cancelled node never again blocks.         *   CONDITION:  This node is currently on a condition queue.         *               It will not be used as a sync queue node until         *               transferred. (Use of this value here         *               has nothing to do with the other uses         *               of the field, but simplifies mechanics.)         *   0:          None of the above         *         * The values are arranged numerically to simplify use.         * Non-negative values mean that a node doesn't need to         * signal. So, most code doesn't need to check for particular         * values, just for sign.         *         * The field is initialized to 0 for normal sync nodes, and         * CONDITION for condition nodes.  It is modified only using         * CAS.         */        volatile int waitStatus;        /**         * Link to predecessor node that current node/thread relies on         * for checking waitStatus. Assigned during enqueing, and nulled         * out (for sake of GC) only upon dequeuing.  Also, upon         * cancellation of a predecessor, we short-circuit while         * finding a non-cancelled one, which will always exist         * because the head node is never cancelled: A node becomes         * head only as a result of successful acquire. A         * cancelled thread never succeeds in acquiring, and a thread only         * cancels itself, not any other node.         */        volatile Node prev;        /**         * Link to the successor node that the current node/thread         * unparks upon release. Assigned once during enqueuing, and         * nulled out (for sake of GC) when no longer needed.  Upon         * cancellation, we cannot adjust this field, but can notice         * status and bypass the node if cancelled.  The enq operation         * does not assign next field of a predecessor until after         * attachment, so seeing a null next field does not         * necessarily mean that node is at end of queue. However, if         * a next field appears to be null, we can scan prev's from         * the tail to double-check.         */        volatile Node next;        /**         * The thread that enqueued this node.  Initialized on         * construction and nulled out after use.         */        volatile Thread thread;        /**         * Link to next node waiting on condition, or the special         * value SHARED.  Because condition queues are accessed only         * when holding in exclusive mode, we just need a simple         * linked queue to hold nodes while they are waiting on         * conditions. They are then transferred to the queue to         * re-acquire. And because conditions can only be exclusive,         * we save a field by using special value to indicate shared         * mode.         */        Node nextWaiter;        /**         * Returns true if node is waiting in shared mode         */        final boolean isShared() {            return nextWaiter == SHARED;        }        /**         * Returns previous node, or throws NullPointerException if         * null.  Use when predecessor cannot be null.         * @return the predecessor of this node         */        final Node predecessor() throws NullPointerException {            Node p = prev;            if (p == null)                throw new NullPointerException();            else                return p;        }        Node() {    // Used to establish initial head or SHARED marker        }        Node(Thread thread, Node mode) {     // Used by addWaiter            this.nextWaiter = mode;            this.thread = thread;        }        Node(Thread thread, int waitStatus) { // Used by Condition            this.waitStatus = waitStatus;            this.thread = thread;        }    }

5,ReentrantLock中对锁的调用实际调的都是Sync的子类。比如:

    public void lock() {
        sync.lock();
    }

6,在中有一个重要的方法如下。通过该方法我们可以获取Condition.通过条件变量Condition,我们更灵活的使用锁。可以实现类似Object.wait/notify/notifyAll的应用场景

    public Condition newCondition() {
        return sync.newCondition();
    }

7,下面是condition的例子

package com.j2se.concurrence.lockcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Lock test
 * 
 * @author joeyon
 * @date Aug 14, 2014 8:04:57 PM
 */
public class LockTest {
	private static int i;
	private static Lock lk = new ReentrantLock();

	public static void test() {
		List<Thread> list = new ArrayList<Thread>();
		int tcount = 3;
		// prepare threads
		for (int i = 0; i < tcount; i++) {
			list.add(new Thread(new TmpRunnable(), "t-" + i));
		}
		// start threads
		for (int i = 0; i < tcount; i++) {
			list.get(i).start();
		}
	}

	private static class TmpRunnable implements Runnable {
		@Override
		public void run() {
			lk.lock();
			try {
				printTime("begin");
				Thread.sleep(1000 * 1); // sleep a while, for test purpose
				printTime("end");
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				lk.unlock();
			}
		}
	}

	public static void printTime() {
		printTime("");
	}

	/**
	 * print thread name & time
	 * 
	 * @param info
	 *            additional info to print
	 */
	public synchronized static void printTime(String info) {
		System.out.printf("%s:\t%d,\t,%d,\t%s\n", Thread.currentThread().getName(), ++i, System.currentTimeMillis() / 1000, info);
	}

	public static void main(String[] args) {
		test();
	}
}

8,read/write lock例子 :

package com.j2se.concurrence.lockcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Read Write Lock test
 * 
 * @author joeyon
 * @date Aug 14, 2014 10:34:08 PM
 */
public class ReadWriteLockTest {
	private static int i;

	private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
	private static Lock rlk = lock.readLock();
	private static Lock wlk = lock.writeLock();

	private static String data = "";

	private static volatile long lastUpdate; // track last publish date

	/**
	 * publish data, use write lock,
	 * 
	 * @param newData
	 */
	public static void publish(String newData) {
		wlk.lock();
		try {
			printTime("begin publish");
			data = newData;
			lastUpdate = System.currentTimeMillis(); // modify last update date
			printTime("data:\n\t" + data);
			printTime("end publish");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			wlk.unlock();
		}
	}

	/**
	 * view data, use read lock
	 * 
	 * @param previousView
	 *            last viewed publish date
	 * @return date of new publish, or -1 if no new publish
	 */
	public static long view(long previousView) {
		if (previousView < lastUpdate) { // new publish
			rlk.lock();
			try {
				printTime("view data:\n\t" + data);
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				rlk.unlock();
			}
			return lastUpdate;
		} else { // no new publish
			printTime("no new publish yet");
			return -1;
		}
	}

	public static void test() throws InterruptedException {
		Thread tPublish = new Thread(new Runnable() {
			@Override
			public void run() {
				while (true) {
					publish("hi, xxxxxx, data_" + i + "_xxxxxx");
					try {
						Thread.sleep(1000 * 10); // update interval
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}, "t-publish");

		// prepare view threads
		int tViewCount = 3; // count of view thread
		List<Thread> tViewList = new ArrayList<Thread>();
		final List<Long> tLastView = new ArrayList<Long>(); // keep track of last viewed publish date
		for (int i = 0; i < tViewCount; i++) {
			final int _index = i;
			tViewList.add(new Thread(new Runnable() {
				@Override
				public void run() {
					while (true) {
						long _lastDate = view(tLastView.get(_index));
						if (_lastDate > 0) {
							tLastView.set(_index, _lastDate); // update last viewed publish date, if has new publish
						}
						try {
							Thread.sleep(1000 * 4); // view interval
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}, "t-view-" + i));
			tLastView.add(0L);
		}

		tPublish.start();
		for (int i = 0; i < tViewCount; i++) {
			tViewList.get(i).start();
			Thread.sleep(1000 * 5); // start interval of view threads
		}
	}

	public static void printTime() {
		printTime("");
	}

	/**
	 * print thread name & time
	 * 
	 * @param info
	 *            additional info to print
	 */
	public synchronized static void printTime(String info) {
		System.out.printf("%s:\t%d,\t,%d,\t%s\n", Thread.currentThread().getName(), ++i, System.currentTimeMillis() / 1000, info);
	}

	public static void main(String[] args) throws InterruptedException {
		test();
	}
}

参考:http://kuchaguangjie.iteye.com/blog/1632154

            http://www.blogjava.net/xylz/archive/2010/07/15/326152.html

            http://www.cnblogs.com/MichaelPeng/archive/2010/02/12/1667947.html

            http://flyfoxs.iteye.com/blog/2101244







jdk并发包ReentrantLock 源码导读