RedisRateLimiter의 내부 동작

RedisRateLimiter 의 내부 동작

RequestRateLimiterGatewayFilterFactory

참고


생성자

RequestRateLimiterGatewayFilterFactory 는 GatewayFilter 객체를 생성하는 역할을 합니다. RequestRateLimiterGatewayFilterFactory 의 생성자 코드는 아래와 같습니다.

// ...
 
@ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory
		extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
    // ...
    private final RateLimiter defaultRateLimiter;
	private final KeyResolver defaultKeyResolver;
    
    public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
		super(Config.class);
		this.defaultRateLimiter = defaultRateLimiter;
		this.defaultKeyResolver = defaultKeyResolver;
	}
}

defaultRateLimiter, defaultKeyResolver 는 사용자가 별도로 defaultRateLimiter, defaultKeyResolver 를 주입하지 않으면 Spring Cloud Gateway 내부적으로 의존성 주입을 합니다.

만약 RequestRateLimitGatewayFilterFactory 의 생성자를 이용해서 @Bean 설정을 직접 한다면, 위의 생성자를 통해서 객체를 생성해서 @Bean 으로 등록하며 defaultRateLimiter, defaultKeyResolver 에 해당하는 인자값에 실제 객체를 주입해주면 됩니다.


apply(Config)

이번에는 apply(Config) 메서드를 살펴봅니다.

뭔가 굉장히 어려운 내용인 것 같지만 그렇게 대단한 코드는 없습니다.

@ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory
		extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
    // ...
    
    @SuppressWarnings("unchecked")
	@Override
	public GatewayFilter apply(Config config) {
		KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
		RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
		boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
		HttpStatusHolder emptyKeyStatus = HttpStatusHolder
				.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
 
        // 1)
		return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
			if (EMPTY_KEY.equals(key)) {
				if (denyEmpty) {
					setResponseStatus(exchange, emptyKeyStatus);
					return exchange.getResponse().setComplete();
				}
				return chain.filter(exchange);
			}
			String routeId = config.getRouteId();
            // 2) 
			if (routeId == null) {
				Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
				routeId = route.getId();
			}
            // 3)
			return limiter.isAllowed(routeId, key).flatMap(response -> {
 
                // 3-1)
				for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
					exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
				}
                // 3-2)
				if (response.isAllowed()) {
					return chain.filter(exchange);
				}
                // 3-3)
				setResponseStatus(exchange, config.getStatusCode());
				return exchange.getResponse().setComplete();
			});
		});
	}
    
    private <T> T getOrDefault(T configValue, T defaultValue) {
		return (configValue != null) ? configValue : defaultValue;
	}
    
    // ...
    
}

참고


1)

  • Exchange 를 KeyResolver 로 resolve() 한 결과가 null 이면 EMPTY_KEY 를 return 하고, 그 외의 경우는 flatMap(...) 을 통해 다음 처리로 넘어갑니다.

2) routeId 가 null 일 경우에 대한 설정입니다.

  • ServerWebExchangeUtils 에 정의된 키에 대해 Spring Webflux 의 ServerWebExchange 에 기본설정된 Gateway Router 에 대한 어트리뷰트 값(Value)을 얻어냅니다. 그리고 이 값은 Route 객체인데 이 기본 객체에 대한 routeId 를 얻어냅니다.

3) RateLimiter::isAllowed(routerId, key) 를 통해 얻어낸 Mono<Response> 를 flatMap 처리합니다. RateLimiter::isAllowed(routerId, key) 메서드는 뒤에서 정리합니다.

  • 3-1) RateLimiter::isAllowed(routerId, key) 에서 만들어낸 Response 객체 내의 header 의 Key, Value 의 쌍 들을 모두 복사해서 Exchange 내의 header 에 복제합니다.
  • 3-2) RateLimiter::isAllowed(routerId, key) 에서 만들어낸 Response 객체가 isAllowed 된 객체라면 chain.filter(exchange) 를 통해 다음 Filter 처리를 하도록 DispatcherHandler 에게 전달합니다.
  • 3-3) RateLimiter::isAllowed(routerId, key) 에서 만들어낸 Response 객체가 isAllowed가 아니라면 별도로 responseStatus 를 세팅하고, exchange 의 response 의 상태를 complete 로 변경합니다.

RateLimiter


RateLimiter 는 interface 입니다. 이 interface 를 구현해서 필요에 맞게 다양한 구현체를 적용가능합니다. Spring Cloud Gateway 는 Redis 를 사용하는데, Spring Cloud Gateway 는 RateLimiter 에 대한 구현체로 RedisRateLimiter 를 채택하고 있습니다.

AbstractRateLimiter 에서는 RateLimiter 를 구현하는 데에 있어서 필요한 필수적인 로직들을 구현하고 있는 abstract 클래스입니다.

RateLimiter interface 의 내용은 아래와 같습니다.

package org.springframework.cloud.gateway.filter.ratelimit;
 
public interface RateLimiter<C> extends StatefulConfigurable<C> {
 
	Mono<Response> isAllowed(String routeId, String id);
 
	class Response {
 
		private final boolean allowed;
 
		private final long tokensRemaining;
 
		private final Map<String, String> headers;
 
		public Response(boolean allowed, Map<String, String> headers) {
			this.allowed = allowed;
			this.tokensRemaining = -1;
			Assert.notNull(headers, "headers may not be null");
			this.headers = headers;
		}
 
		public boolean isAllowed() {
			return allowed;
		}
 
		public Map<String, String> getHeaders() {
			return Collections.unmodifiableMap(headers);
		}
 
		@Override
		public String toString() {
			final StringBuffer sb = new StringBuffer("Response{");
			sb.append("allowed=").append(allowed);
			sb.append(", headers=").append(headers);
			sb.append(", tokensRemaining=").append(tokensRemaining);
			sb.append('}');
			return sb.toString();
		}
 
	}
 
}

Response 라고 하는 Response 데이터를 담는용도의 클래스가 정의되어 있으며, isAllowed(routeId, id) 를 메서드로 제공하고 있습니다.


RedisRateLimiter


RedisRateLimiter 의 내용 중 이번 문서에서는 isAllowed(), lua 스크립트 를 알아볼 예정입니다.

  • isAllowed() 메서드
  • lua 스크립트

RedisRateLimiter::isAllowed() 메서드

RedisRateLimiter::isAllowed() 메서드 (opens in a new tab) 는 아래와 같이 정의되어 있습니다. 꽤 복잡해보이지만 크게 복잡한 내용은 없습니다.

package org.springframework.cloud.gateway.filter.ratelimit;
// ...
 
@ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter")
public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config> implements ApplicationContextAware {
    
    // ...
    private RedisScript<List<Long>> script;
    
    // ...
    public RedisRateLimiter(ReactiveStringRedisTemplate redisTemplate, RedisScript<List<Long>> script,
			ConfigurationService configurationService) {
		super(Config.class, CONFIGURATION_PROPERTY_NAME, configurationService);
		this.redisTemplate = redisTemplate;
		this.script = script;
		this.initialized.compareAndSet(false, true);
	}
    
    // ...
    
    @Override
	@SuppressWarnings("unchecked")
	public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}
 
		Config routeConfig = loadConfiguration(routeId);
 
        // (0)
		// How many requests per second do you want a user to be allowed to do?
		int replenishRate = routeConfig.getReplenishRate();
 
		// How much bursting do you want to allow?
		int burstCapacity = routeConfig.getBurstCapacity();
 
		// How many tokens are requested per request?
		int requestedTokens = routeConfig.getRequestedTokens();
 
		try {
			List<String> keys = getKeys(id);
 
            // (1)
			// The arguments to the LUA script. time() returns unixtime in seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
					Instant.now().getEpochSecond() + "", requestedTokens + "");
            
            // (2)
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
			// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> {
				if (log.isDebugEnabled()) {
					log.debug("Error calling rate limiter lua", throwable);
				}
				return Flux.just(Arrays.asList(1L, -1L));
			}).reduce(new ArrayList<Long>(), (longs, l) -> {
				longs.addAll(l);
				return longs;
			}).map(results -> {
				boolean allowed = results.get(0) == 1L;
				Long tokensLeft = results.get(1);
 
				Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
 
				if (log.isDebugEnabled()) {
					log.debug("response: " + response);
				}
				return response;
			});
		}
		catch (Exception e) {
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
			 * an alert so you know if this is happening too much. Stripe's observed
			 * failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
	}
}

(0)

  • replenishRate, burstCapacity, requestedTokens 와 같은 값들을 routeConfig 에서 가져옵니다. burstCapacity, replenishRate, requestedTokens 등과 같은 값들은 모두 개발자가 선언한 application.yml 파일에 정의되어 있는 내용입니다.

(1)

  • (0) 에서 가져온 burstCapacity, replenishRate, requestedTokens 와 같은 인자값들을 문자열로 변형해서 List로 만듭니다.

(2)

  • (2) 에서는 redisTemplate 을 이용해서 lua script 를 실행시키는데, 이때 Argument 는 (1) 에서 초기화한 List<String> 입니다.
  • this.script 는 RedisRateLimiter 내에 정의된 RedisScript<List<Long>> 타입의 lua script 객체입니다.

lua 스크립트

Mac OS 에서는 brew, Linux 에서는 apt-get 등과 같은 패키지 매니저를 통해서 설치 가능합니다. 제 경우에는 개발 PC를 깔끔하게 유지하는것을 선호하기에 온라인 에디터를 사용했고 온라인 에디터에 대해서는 https://replit.com/languages/lua (opens in a new tab) 을 참고해주시기 바랍니다.

Lua 스크립트는 레디스 내의 인터프리터는 Lua 스크립트를 읽을수 있으며 실행 가능합니다. Lua 스크립트를 사용하면, 레디스 머신 내에서 레디스 실행엔진이 Lua 스크립트를 인터프리터로 해석해서 스크립트를 실행합니다.

클라이언트인 각각의 개별 서비스에서 레디스의 List,Set 의 데이터를 복사한 후 열어서 접근할 때 레디스 입장에서는 동일한 값이 복제되었을 때에 대해 데이터가 모호해질 수 있습니다.

이런 경우에는 Redis 내에 구성한 List, Set 등과 같은 자료구조에 접근하는 주체가 Redis 머신이 되도록 바꿔주면, 데이터의 불일치 현상을 방지할 수 있습니다.

Redis 에 Script 를 통해 접근하면 Redis 서버 내에서의 Atomic 연산을 보장해줍니다. 따라서 Rate Limiter, Coupon 발급기 같은 작업 대기열 기반 트래픽 처리를 할 때 유용합니다.

Lua 라는 언어는 브라질에서 1993년도애 개발된 프로그래밍 언어인데, C/C++ 프로그램 내부에 포함시키기 쉬우면서 깔끔하고 쉬운 문법의 스크립트 언어입니다. Redis 는 C/C++ 기반으로 작성된 데이터 플랫폼입니다. 그리고 Lua 스크립트를 실행할 수 있도록 Lua 스크립트와 관련된 기능을 지원하고 있습니다.

Lua 언어의 설치는 https://www.lua.org/download.html (opens in a new tab) 에서 다운로드 받아서 설치 가능합니다.


request_rate_limiter.lua

만약 우리가 RedisLimiter 를 의존성 주입하지 않는다면 Spring Cloud Gateway 에서 기본으로 제공해주는 request_rate_limiter.lua (opens in a new tab)가 선택되어 사용하게 됩니다.

request_rate_limiter.lua 의 경로는 spring-cloud-gateway-server/src/main/resources/META-INF/scripts/request_rate_limiter.lua 입니다.

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
 
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
 
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
 
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
 
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
 
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
 
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end
 
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
 
if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end
 
-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

자바 코드에서는 아래의 인자값을 넘겨줬었습니다.

  • replenishRate
  • burstCapacity
  • Instant.now().getEpochSecond() : 초 단위로 구분된 Epoch Time
  • requestedTokens

위의 스크립트에서는 위의 인자값들을 ARGV 라는 배열로 받습니다.

위의 코드는 Token Bucket Algorithm 이 구현된 것인데, Token Bucket Algorithm 에 대해서는 문서가 길어지게 되어서 별도의 문서에서 따로 정리하기로 결정했습니다.