JAVA流控及超流控后的延迟处理实例

本文实例讲述了JAVA流控及超流控后的延迟处理方法。分享给大家供大家参考。具体实现方法如下:

流控检查(每半秒累计一次,因此最小流控阈值只能做到每秒2条):

import java.text.SimpleDateFormat;

import java.util.Date;

import java.lang.Thread;

 

/**
 * Flow control for a single link.
 *
 * <p>Sends are counted in windows of 500 ms. The configured per-second
 * threshold is split into two half-second quotas, so the smallest usable
 * threshold is 2 per second (1 per half second).
 *
 * <p>{@link #isOverflow(int)} is synchronized on this instance.
 *
 * @author chenx
 */
public class OverflowController {

    /** Length of one counting window, in milliseconds. */
    private static final long WINDOW_MILLIS = 500L;

    private final int maxSendCountPerSecend; // per-second flow-control threshold for this link

    private long windowStartMillis = System.currentTimeMillis(); // start of the current counting window

    private int sendCount = 0; // amount admitted on this link in the current window

    /**
     * Creates a controller with the given per-second threshold.
     *
     * @param maxSendCountPerSecend per-second threshold; values below 2 are raised to 2
     */
    public OverflowController(int maxSendCountPerSecend) {
        if (maxSendCountPerSecend < 2) {
            maxSendCountPerSecend = 2;
        }

        this.maxSendCountPerSecend = maxSendCountPerSecend;
    }

    /**
     * Returns the quota that applies to the current half second.
     *
     * <p>Despite the name, this is not the per-second threshold: the first half
     * of each wall-clock second gets {@code max - max / 2} and the second half
     * gets {@code max / 2}, so that an odd threshold still adds up to
     * {@code max} over the two halves.
     *
     * @return the half-second quota in effect right now
     */
    public int getMaxSendCountPerSecend() {
        return quotaAt(System.currentTimeMillis());
    }

    /**
     * Checks whether sending {@code sendNum} more items would exceed the quota
     * of the current window, and records the items when it would not.
     *
     * <p>When 500 ms or more have passed since the current window started, a
     * new window is opened with an empty count before the check is made. The
     * request is then checked against the quota like any other, so a batch
     * larger than the half-second quota is always reported as overflow.
     *
     * @param sendNum number of items the caller wants to send
     * @return {@code true} if the request exceeds the quota (nothing is
     *         recorded), {@code false} if it was admitted and counted
     */
    public boolean isOverflow(int sendNum) {
        synchronized (this) {
            long now = System.currentTimeMillis();

            if (now - windowStartMillis >= WINDOW_MILLIS) {
                // Open a fresh window; the request below is still subject to the quota.
                windowStartMillis = now;
                sendCount = 0;
            }

            // Widen to long so a huge sendNum cannot wrap around and slip under the quota.
            long wanted = (long) sendCount + sendNum;
            if (wanted > quotaAt(now)) {
                return true;
            }

            sendCount = (int) wanted;
            return false;
        }
    }

    /**
     * Computes the half-second quota for the given instant.
     *
     * @param nowMillis time in milliseconds since the epoch
     * @return {@code max / 2} when the millisecond-of-second is 500 or more,
     *         otherwise {@code max - max / 2}
     */
    private int quotaAt(long nowMillis) {
        int secondHalfQuota = maxSendCountPerSecend / 2;
        if (nowMillis % 1000L >= WINDOW_MILLIS) {
            return secondHalfQuota;
        }

        return maxSendCountPerSecend - secondHalfQuota;
    }

    /**
     * Demo: tries one send every 10 ms against a threshold of 50 per second
     * and prints whether each attempt was admitted or hit the flow control.
     */
    public static void main(String[] args) throws InterruptedException {
        OverflowController oc = new OverflowController(50);
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

        for (int i = 0; i <= 100; i++) {
            if (oc.isOverflow(1)) {
                System.out.println(i + "-isOverflow-" + df.format(new Date()));
            } else {
                System.out.println(i + "-sendOk-" + df.format(new Date()));
            }

            Thread.sleep(10);
        }
    }
}

超流控后的延迟处理:Java中没有.NET里类似下面这种“延迟委托”的机制:
ThreadPool.RegisterWaitForSingleObject(

 WaitHandle waitObject,

      WaitOrTimerCallback callBack,

      Object state,

     int millisecondsTimeOutInterval,

     bool executeOnlyOnce

)

因此,在Java下需要自行实现一个简单的延迟队列:

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

 

/**
 * A {@link Delayed} element that becomes available a fixed number of
 * milliseconds after it is constructed.
 *
 * <p>The release time is computed from {@link System#currentTimeMillis()},
 * so it follows the wall clock.
 */
public class DelayEntry implements Delayed {

    private int count; // caller-defined payload

    private long dequeuedTimeMillis; // time (ms since the epoch) at which the entry may leave the queue

    /**
     * Creates an entry that expires {@code delayMillis} milliseconds from now.
     *
     * @param delayMillis delay before the entry may be taken, in milliseconds
     */
    public DelayEntry(long delayMillis) {
        dequeuedTimeMillis = System.currentTimeMillis() + delayMillis;
    }

    /** Returns the payload value. */
    public int getCount() {
        return count;
    }

    /** Sets the payload value. */
    public void setCount(int count) {
        this.count = count;
    }

    /** Returns the release time in milliseconds since the epoch. */
    public long getDequeuedTimeMillis() {
        return dequeuedTimeMillis;
    }

    /**
     * Orders entries by release time, earliest first.
     *
     * <p>Another {@code DelayEntry} is compared by its stored release time.
     * Any other {@link Delayed} is compared by remaining delay instead of
     * being cast, so mixing implementations does not throw.
     *
     * @param o the element to compare with
     * @return a negative value, zero, or a positive value when this entry is
     *         released before, together with, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }

        if (o instanceof DelayEntry) {
            return compareLongs(dequeuedTimeMillis, ((DelayEntry) o).dequeuedTimeMillis);
        }

        return compareLongs(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Returns the remaining delay expressed in the requested unit.
     *
     * <p>The value is zero or negative once the release time has passed.
     *
     * @param unit the unit the caller wants the delay in
     * @return remaining delay converted from milliseconds to {@code unit}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = dequeuedTimeMillis - System.currentTimeMillis();
        // The caller chooses the unit, so the millisecond value must be converted.
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /** Three-way comparison that avoids the overflow of {@code a - b}. */
    private static int compareLongs(long a, long b) {
        if (a < b) {
            return -1;
        }
        return a > b ? 1 : 0;
    }
}

 
import java.util.concurrent.DelayQueue;

 

public class DelayService {

 

    public void run() {

        DelayQueue<DelayEntry> queue = new DelayQueue<DelayEntry>();

        DelayConsumer delayConsumer = new DelayConsumer(queue);

        delayConsumer.start();

 

        for (int i = 0; i < 100; i++) {

            DelayEntry de = new DelayEntry(5000);

            de.setCount(i);

            System.out.println(System.currentTimeMillis() + "--------" + de.getCount());

            queue.add(de);

        }

    }

 

    class DelayConsumer extends Thread {

        DelayQueue<DelayEntry> queue;

        public DelayConsumer(DelayQueue<DelayEntry> queue) {

            this.queue = queue;

        }

 

        public void run() {

            while (true) {

                try {

                    DelayEntry de = queue.take();

                    System.out.println("queue size=" + queue.size());

                    System.out.println(de.getCount());

                    System.out.println(System.currentTimeMillis());

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

 

    public static void main(String[] args) {

        DelayService ds = new DelayService();

        ds.run();

    }

}

希望本文所述对大家的Java程序设计有所帮助。