首页 > 代码库 > [JAVA]流控及超流控后的延迟处理

[JAVA]流控及超流控后的延迟处理

流控检查(每半秒累计,因此最小留空阀值只能做到每秒2条):

import java.text.SimpleDateFormat;import java.util.Date;import java.lang.Thread;/** * 流量控制 *  * @author chenx */public class OverflowController {	private int maxSendCountPerSecend; // 该条链路上流控阀值	private Date sendTime = new Date();	private int sendCount = 0; // 该条链路上发送的数量	public OverflowController(int maxSendCountPerSecend) {		if (maxSendCountPerSecend < 2) {			maxSendCountPerSecend = 2;		}		this.maxSendCountPerSecend = maxSendCountPerSecend;	}	public int getMaxSendCountPerSecend() {		if (getMilliseconds(new Date()) >= 500) {			return maxSendCountPerSecend / 2;		}		return maxSendCountPerSecend - (maxSendCountPerSecend / 2);	}	/**	 * 是否超流控	 */	public boolean isOverflow(int sendNum) {		synchronized (this) {			Date now = new Date();			if (now.getTime() - sendTime.getTime() >= 500) {				sendTime = now;				sendCount = sendNum;			} else {				if (sendCount + sendNum > getMaxSendCountPerSecend()) {					return true;				} else {					sendCount += sendNum;				}			}			return false;		}	}	/**	 * 获取指定时间的毫秒数	 */	private int getMilliseconds(Date date) {		SimpleDateFormat df = new SimpleDateFormat("SSS");		return Integer.valueOf(df.format(date));	}	public static void main(String[] args) throws InterruptedException {		OverflowController oc = new OverflowController(50);		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");		for (int i = 0; i <= 100; i++) {			if (oc.isOverflow(1)) {				System.out.println(i + "-isOverflow-" + df.format(new Date()));			} else {				System.out.println(i + "-sendOk-" + df.format(new Date()));			}			Thread.sleep(10);		}	}}

超流控后的延迟处理,由于java中没有.net的“延迟委托”一说:

ThreadPool.RegisterWaitForSingleObject(
 WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      Object state,
     int millisecondsTimeOutInterval,
     bool executeOnlyOnce

)

Java下需实现一个简单的延迟队列:

import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class DelayEntry implements Delayed {	private int count;	private long dequeuedTimeMillis; // 出队列时间	public int getCount() {		return count;	}	public void setCount(int count) {		this.count = count;	}	public long getDequeuedTimeMillis() {		return dequeuedTimeMillis;	}	public DelayEntry(long delayMillis) {		dequeuedTimeMillis = System.currentTimeMillis() + delayMillis;	}	@Override	public int compareTo(Delayed o) {		DelayEntry de = (DelayEntry) o;		long timeout = dequeuedTimeMillis - de.dequeuedTimeMillis;		return timeout > 0 ? 1 : timeout < 0 ? -1 : 0;	}	@Override	public long getDelay(TimeUnit unit) {		return dequeuedTimeMillis - System.currentTimeMillis();	}}

 

import java.util.concurrent.DelayQueue;public class DelayService {	public void run() {		DelayQueue<DelayEntry> queue = new DelayQueue<DelayEntry>();		DelayConsumer delayConsumer = new DelayConsumer(queue);		delayConsumer.start();		for (int i = 0; i < 100; i++) {			DelayEntry de = new DelayEntry(5000);			de.setCount(i);			System.out.println(System.currentTimeMillis() + "--------" + de.getCount());			queue.add(de);		}	}	class DelayConsumer extends Thread {		DelayQueue<DelayEntry> queue;		public DelayConsumer(DelayQueue<DelayEntry> queue) {			this.queue = queue;		}		public void run() {			while (true) {				try {					DelayEntry de = queue.take();					System.out.println("queue size=" + queue.size());					System.out.println(de.getCount());					System.out.println(System.currentTimeMillis());				} catch (InterruptedException e) {					e.printStackTrace();				}			}		}	}	public static void main(String[] args) {		DelayService ds = new DelayService();		ds.run();	}}

 

[JAVA]流控及超流控后的延迟处理