编程不止是一份工作,还是一种乐趣!!!
限流是微服务架构下保障系统的主要手段之一。通常来说系统的吞吐量是可以提前预测的,当请求量超过预期的伐值时可以采取一些限制措施来保障系统的稳定运行,比如延迟处理、拒绝服务等。
常见的限流算法有:计数器、漏桶、令牌桶。
采用计数器实现限流比较简单粗爆,对于java语言我们可以使用AtomicLong或者Semaphore来实现。
// AtomicLong实现
try {
if (atomic.incrementAndGet() > limit) {
// 限流处理
}
// 处理请求
} finally {
atomic.decrementAndGet();
}
// Semaphore实现
boolean allowed = false;
try {
allowed = semaphore.tryAcquire();
if (!allowed) {
// 限流处理
}
// 处理请求
} finally {
semaphore.release(1);
}
两种实现的区别在于Semaphore是阻塞式的,而AtomicLong是基于CAS的非阻塞实现,冲突不高的情况下AtomicLong更好一些。
类似生活用到的漏斗,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出,算法描述如下:
算法实现上,可以使用一个BlockingQueue表示漏桶,请求进来时放入这个BlockingQueue中。另起一个线程以固定的速率从BlockingQueue中取出请求,再提交给业务线程池处理。漏桶算法有个弊端:无法应对短时间的突发流量。
令牌桶算法是对漏桶算法的一种改进,漏桶算法能够限制请求流出的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用,算法描述如下:
算法实现上,可以用一个BlockingQueue表示桶,起一个线程以固定的速率向桶中添加令牌。请求到达时需要先从桶中获取令牌,否则拒绝请求或等待。此外,Google开源的guava包也提供了很完善的令牌算法。
static String lua = "local key = KEYS[1] " // 限流key
+ "local limit = tonumber(ARGV[1]) "// 限流伐值
+ "local current = tonumber(redis.call('get', key) or '0') "
+ "if (current + 1) > limit then "
+ " return 0 "
+ "elseif current == 0 then "
+ " redis.call('incrby', key, '1') "
+ " redis.call('expire', key, '2') " // 设置2秒过期时间
+ " return 1 "
+ "else "
+ " redis.call('incrby', key, '1') "
+ " return 1 "
+ "end";
public boolean isLimit() {
Jedis jedis = null;
try {
jedis = pool.getResource();
// 每秒限流1000次
String key = "Limit.Key." + System.currentTimeMillis() / 1000;
String limit = "1000";
Object rlt = jedis.eval(lua, 1, key, limit);
return Long.valueOf(0) == rlt;
} finally {
if (null != jedis) {
jedis.close();
}
}
}
public class TokenBucket {
private final static int DEFAULT_SIZE = 100;
private final static int DEFAULT_BATCH = 1;
private final static int DEFAULT_FREQ = 1;
private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
private AtomicInteger a = new AtomicInteger(0);
private final int size;
public TokenBucket() {
this(DEFAULT_SIZE, DEFAULT_BATCH, DEFAULT_FREQ, DEFAULT_TIME_UNIT);
}
public TokenBucket(int size, int batch, int freq, TimeUnit unit) {
this.size = size;
new Thread(new TokenProducer(batch, freq, unit)).start();
}
public boolean acquire() {
while (true) {
int current = a.get();
if (current == 0) {
return false;
}
if (a.compareAndSet(current, current - 1)) {
return true;
}
}
}
public void release(int n) {
while (true) {
int current = a.get();
int next = current + n;
if (next > size) {
next = size;
}
if (a.compareAndSet(current, next)) {
return;
}
}
}
public int current() {
return a.get();
}
class TokenProducer implements Runnable {
private int batch;
private int freq;
private TimeUnit unit;
public TokenProducer(int batch, int freq, TimeUnit unit) {
this.batch = batch;
this.freq = freq;
this.unit = unit;
}
@Override
public void run() {
while (true) {
try {
TokenBucket.this.release(this.batch);
try {
unit.sleep(freq);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
// ignore
}
}
}
}
}