首页 > 代码库 > Java 并发编程之测试

Java 并发编程之测试

并发程序测试的要点

  • 吞吐量
  • 响应性
  • 可伸缩性

正确性测试

首先需要一个可供测试的程序做为栗子。就是下面这个了。一个固定长度的 队列,其中定义可阻塞的put和take方法,并通过两个计数器进行控制。

import java.util.concurrent.Semaphore;

public class BoundedBuffer<E> {

	private final Semaphore availableItems, availableSpaces;
	private final E[] Items;
	private int putPosition = 0, takePosition = 0;

	public BoundedBuffer(int capacity) {
		availableItems = new Semaphore(0);
		availableSpaces = new Semaphore(capacity);
		Items = (E[]) new Object[capacity];
	}

	public boolean isEmpty() {
		return availableItems.availablePermits() == 0;
	}

	public boolean isFull() {
		return availableSpaces.availablePermits() == 0;
	}

	public void put(E x) throws InterruptedException {
		availableSpaces.acquire();
		doInsert(x);
		availableItems.release();
	}

	public E take() throws InterruptedException {
		availableItems.acquire();
		E item = doExtra();
		availableSpaces.release();
		return item;
	}

	private synchronized void doInsert(E x) {
		// TODO Auto-generated method stub
		int i = putPosition;
		Items[i] = x;
		putPosition = (++i == Items.length) ? 0 : i;
	}

	private synchronized E doExtra() {
		int i = takePosition;
		E x = Items[i];
		Items[i] = null;
		takePosition = (++i == Items.length) ? 0 : i;
		return x;
	}
}

基本的单元测试

使用JUnit就可以了

import static org.junit.Assert.*;

import org.junit.Test;

public class BoundedBufferTest {

	@Test
	public void test() {
		testIsEmptyWhenConstructed();
		try {
			testIsFullAfterPuts();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	void testIsEmptyWhenConstructed() {
		BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
		assertTrue(bb.isEmpty());
		assertFalse(bb.isFull());
	}

	void testIsFullAfterPuts() throws InterruptedException {
		BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
		for (int i = 0; i < 10; i++) {
			bb.put(i);
		}
		assertFalse(bb.isEmpty());
		assertTrue(bb.isFull());
	}
}

主要是测试边界情况。

对阻塞操作的测试

测试代码尝试从空的缓存中take一个元素,如果能成功那么就测试失败了。然后再等待一段时间 ,再中断该线程。如果获取线程正确的在take中阻塞,那么将抛出interruptedException。捕获到异常的catch块将此试为测试成功并让线程退出,然后主测试线程尝试与获取线程合并,通过调用isAlive方法验证Join方法是否成功返回。如果获取线程可以中断线程,那么 join能很快完成。

	void testTakeBlocksWhenEmpty() {
		final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
		Thread taker = new Thread() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					int unused = bb.take();
					fail();
				} catch (InterruptedException success) {
				}
			}
		};
		try {
			taker.start();
			Thread.sleep(LOCKUP_DETECT_TIMEOUT);
			taker.interrupt();
			taker.join(LOCKUP_DETECT_TIMEOUT);
			assertFalse(taker.isAlive());
		} catch (Exception unException) {
			fail();
		}
	}

安全性测试

测试在数据竞争条件下是否会发生错误。这就需要一个并发的测试程序。或许比编写本身要测试的类更加困难。

通过一个对顺序敏感的校验和计算函数来计算 所有入列的元素以及出列元素的检验和,并进行比较。如果二者相等,那么测试就是成功的。如果只有一个生产者一个消费者,那么 这种方法能发挥最大的作用。因为它不仅能测试出是否取出了正确的元素,而且还能测试出元素被取出的顺序是否正确、

如果要将这种方法扩展到多生产者多消费者的情况时,就需要一个对元素入列和出列顺序不敏感的校验和函数。从而在测试程序运行完以后,可以将多个检验和以不同的顺序组合起来,如果不是这样,多个线程就需要访问 同一个共享的检验和变量 ,因此就需要同步,这将成为一个并发的瓶颈。

要确保测试程序能够正确地测试所有要点,就一定不能让编译器可以预先猜测到检验 和的值。那么会对许多 其他 的测试造成影响。由于大多数随机类生成器都是线程安全的。并且会带来额外的同步开销。所以还不如用一个简单的伪随机函数 。

	static int XorShift(int y) {
		y ^= (y << 6);
		y ^= (y >>> 21);
		y ^= (y << 7);
		return y;
	}

参数用nanotime();我想用上面这个伪随机数的作用可能就是拖延一点时间 ,因为nanotime是获取纳秒级别的时间 。通过运算拖延一点时间 后,再产生的数据必然不一样。所以就基本达到了获得不同的数据的需求。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;

import org.junit.Test;

public class PutTakeTest {
	private static final ExecutorService pool = Executors.newCachedThreadPool();
	private final AtomicInteger putSum = new AtomicInteger(0);
	private final AtomicInteger takeSum = new AtomicInteger(0);
	private final CyclicBarrier barrier;
	private final BoundedBuffer<Integer> bb;
	private final int Ntrials, nPairs;

	public PutTakeTest(int capacity, int ntrials, int nPairs) {
		this.bb = new BoundedBuffer<Integer>(capacity);
		Ntrials = ntrials;
		this.nPairs = nPairs;
		this.barrier = new CyclicBarrier(nPairs * 2 + 1);
	}

	public static void main(String[] args) {
		new PutTakeTest(10, 100000, 10).test();
		pool.shutdown();
	}

	void test() {
		try {
			for (int i = 0; i < nPairs; i++) {
				pool.execute(new Producer());
				pool.execute(new Consumer());

			}
			barrier.await();
			barrier.await();
			assertEquals(putSum.get(), takeSum.get());
		} catch (Exception e) {
			// TODO: handle exception
			throw new RuntimeException(e);
		}
	}

	class Producer implements Runnable {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			try {
				int seed = (this.hashCode() ^ (int) System.nanoTime());
				int sum = 0;
				barrier.await();
				for (int i = Ntrials; i > 0; i--) {
					bb.put(seed);
					sum += seed;
					seed = XorShift(seed);
				}
				putSum.getAndAdd(sum);
				barrier.await();
			} catch (Exception e) {
				// TODO: handle exception
				throw new RuntimeException(e);
			}
		}
	}

	class Consumer implements Runnable {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			try {
				barrier.await();
				int sum = 0;
				for (int i = Ntrials; i > 0; i--) {
					sum += bb.take();
				}
				takeSum.getAndAdd(sum);
				barrier.await();
			} catch (Exception e) {
				// TODO: handle exception
				throw new RuntimeException(e);
			}
		}

	}

	static int XorShift(int y) {
		y ^= (y << 6);
		y ^= (y >>> 21);
		y ^= (y << 7);
		return y;
	}
}

这种测试应该放在多处理器的系统上运行。要最大程序地检测出一些对执行时序敏感的数据竞争,那么测试中的线程数量应该多于CPU数量,这样在任意时刻都会有一些线程在运行,而另一些被交换出去,从而可以检查线程是交替行为的可预测性。








Java 并发编程之测试