Hope is a good thing, and maybe the best thing of all

编程不止是一份工作,还是一种乐趣!!!

《微服务架构》:限流

限流是微服务架构下保障系统的主要手段之一。通常来说系统的吞吐量是可以提前预测的,当请求量超过预期的伐值时可以采取一些限制措施来保障系统的稳定运行,比如延迟处理、拒绝服务等。


常用的限流算法


常见的限流算法有:计数器、漏桶、令牌桶。

计数器算法

采用计数器实现限流比较简单粗爆,对于java语言我们可以使用AtomicLong或者Semaphore来实现。

// AtomicLong实现
try {
    if (atomic.incrementAndGet() > limit) {
        // 限流处理
    }

    // 处理请求
} finally {
    atomic.decrementAndGet();
}

// Semaphore实现
boolean allowed = false;
try {
    allowed = semaphore.tryAcquire();
    if (!allowed) {
        // 限流处理
    }

    // 处理请求
} finally {
    semaphore.release(1);
}

两种实现的区别在于Semaphore是阻塞式的,而AtomicLong是基于CAS的非阻塞实现,冲突不高的情况下AtomicLong更好一些。


漏桶算法

类似生活用到的漏斗,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出,算法描述如下:

  • 一个固定容量的漏桶,按照固定的速率流出水滴(请求)。
  • 如果桶是空的,则不需要流出水滴。
  • 可以以任意的速率流入水滴到桶中,如果超出了桶的容量,则流入的水滴溢出(拒绝请求)。
  • 桶的容量始终保持不变。

算法实现上,可以使用一个BlockingQueue表示漏桶,请求进来时放入这个BlockingQueue中。另起一个线程以固定的速率从BlockingQueue中取出请求,再提交给业务线程池处理。漏桶算法有个弊端:无法应对短时间的突发流量


令牌桶算法

令牌桶算法是对漏桶算法的一种改进,漏桶算法能够限制请求流出的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用,算法描述如下:

  • 有一个固定容量的桶,按照固定的速率向桶中添加令牌。
  • 如果桶满了,则新添加的令牌被丢弃。
  • 当请求进来时,必须从桶中获取一个令牌才能继续处理,否则拒绝请求,或者暂存到某个缓冲区等待可用令牌。

算法实现上,可以用一个BlockingQueue表示桶,起一个线程以固定的速率向桶中添加令牌。请求到达时需要先从桶中获取令牌,否则拒绝请求或等待。此外,Google开源的guava包也提供了很完善的令牌算法。


Redis + Lua实现基于时间窗口的计算器限流


static String lua = "local key = KEYS[1] " // 限流key
        + "local limit = tonumber(ARGV[1]) "// 限流伐值
        + "local current = tonumber(redis.call('get', key) or '0') "
        + "if (current + 1) > limit then "
        + "    return 0 "
        + "elseif current == 0 then "
        + "    redis.call('incrby', key, '1') "
        + "    redis.call('expire', key, '2') " // 设置2秒过期时间
        + "    return 1 "
        + "else "
        + "    redis.call('incrby', key, '1') "
        + "    return 1 "
        + "end";

public boolean isLimit() {
    Jedis jedis = null;
    try {
        jedis = pool.getResource();
        // 每秒限流1000次
        String key = "Limit.Key." + System.currentTimeMillis() / 1000;
        String limit = "1000";

        Object rlt = jedis.eval(lua, 1, key, limit);
        return Long.valueOf(0) == rlt;
    } finally {
        if (null != jedis) {
            jedis.close();
        }
    }
}


基于AtomicInteger实现令牌桶算法


public class TokenBucket {
    private final static int DEFAULT_SIZE = 100;
    private final static int DEFAULT_BATCH = 1;
    private final static int DEFAULT_FREQ  = 1;
    private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

    private AtomicInteger a = new AtomicInteger(0);
    private final int size;

    public TokenBucket() {
        this(DEFAULT_SIZE, DEFAULT_BATCH, DEFAULT_FREQ, DEFAULT_TIME_UNIT);
    }

    public TokenBucket(int size, int batch, int freq, TimeUnit unit) {
        this.size = size;

        new Thread(new TokenProducer(batch, freq, unit)).start();
    }

    public boolean acquire() {
        while (true) {
            int current = a.get();
            if (current == 0) {
                return false;
            }

            if (a.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    public void release(int n) {
        while (true) {
            int current = a.get();
            int next = current + n;
            if (next > size) {
                next = size;
            }

            if (a.compareAndSet(current, next)) {
                return;
            }
        }
    }

    public int current() {
        return a.get();
    }

    class TokenProducer implements Runnable {

        private int batch;
        private int freq;
        private TimeUnit unit;

        public TokenProducer(int batch, int freq, TimeUnit unit) {
            this.batch = batch;
            this.freq = freq;
            this.unit = unit;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    TokenBucket.this.release(this.batch);

                    try {
                        unit.sleep(freq);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // ignore
                }
            }
        }

    }
}