Reactor는 Spring Framwork의 주도하에 개발된 리액티브 스트림즈 구현체로 Spring 5버전부터 리액티브 스택에 포함되었다.
Spring WebFlux 기반의 리액티브 어플리케이션 제작의 핵심 역할이다.
따라서 Reactor Core 라이브러리는 Spring WebFlux에 라이브러리로 포함되어 있다.
우선 마블 다이어그램에 대해서 익히고 이를 기반삼아 Reactor를 공부하는게 좋다.
1. 마블 다이어그램
마블은 구슬을 의미한다. 즉 여러 가지 구슬 도형으로 구성된 도표를 의미하고 Reactor에서 지원하는 Operator를 이해하는데 중요한 역할을 한다.
기본 구성
1. 두개의 타임라인이 위아래로 존재하는데 위부분은 Upstream이 데이터를 emit하는 타임라인이고 아래 부분 데이터가 Operator를 통해 가공처리되고 출력되는 타임라인이다. 타임라인은 왼쪽에서 오른쪽으로 시간이 흐르는 것을 의미한다.
2. 타임라인 위에 구슬은 emit되거나 가공된 데이터들을 말한다. 왼쪽에 있는 구슬 데이터가 시간상 먼저 emit되거나 가공된 데이터이다.
3. 위 부분 타임라인 가장 끝에 수직으로 된 바는 데이터의 emit이 정상적으로 끝났음(onComplete)을 의미한다.
4. 가운데 네모 박스는 Operator 함수를 의미한다.
5. 위 부분 타임라인에서 아래 부분 타임라인으로 점선으로 연결된 구슬끼리는 operator를 거치기 전후를 말한다.
6. X 표시는 에러가 발생해서 데이터 처리가 종료된 것을 의미한다.
2. Project Reactor
2-1 특징
1. Publisher와 Subscriber 간의 상호 작용은 Java의 함수형 프로그래밍 API 를 통해서 이뤄진다.
2. Flux[N] : Publisher 타입은 크게 두가지이고 그 중 하나가 Flux이다. 0~N의 데이터를 emit한다.
3. Mono[0|1] : Publisher 타입 중 하나이고 0~1개의 데이터를 emit한다. 즉 하나도 emit하지 않거나 하나만 emit하는 단발성 데이터 emit에 특화된 Publisher 타입이다.
4. Backpressure : emit된 데이터에 있어 과부하가 걸리지 않도록 제어하는 Backpressure를 지원한다.
2-2 Cold Sequence와 Hot Sequence
1. Cold Sequence
- Sequence는 Publisher가 데이터를 emit하는 데이터의 연속적인 흐름을 말한다. Cold Sequence는 Subscriber가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequence를 말한다. 위의 마블 다이어그램을 보면 구독 시점이 달라도 Publisher는 동일 데이터를 emit한다. 즉 Subscriber의 구독 시점이 달라도 구독 할 때마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름을 Cold Sequence라고 한다.
2. Hot Sequence
- 구독이 발생한 시점 이전에 Publisher가 emit한 데이터는 Subscriber가 전달받지 못하고 구독이 발생한 시점 이후 데이터만 전달 받을 수 있다. 즉 Subscriber는 구독 시점 이후 Publisher가 emit한 데이터만 전달받는 것이다.즉 타임라인이 하나만 생긴다고 볼 수 있다.
3. Backpressure
이를 우리말로 하면 배압 또는 역압이라고 하며, 배관에서 액체나 기체의 흐름을 제어하기 위해 역으로 가해지는 압력을 의미한다.
Publisher가 데이터1을 emit 하고 Subscriber가 데이터1을 처리하기 전에 date2..3..4 를 끊임없이 emit하면 데이터가 지속적으로 쌓이면서 오버플로가 발생하거나 시스템 장애가 발생할 수 있다. 이런 문제를 해결해주는게 Backpressure이다.
3-1 데이터 수 제어
Subscriber가 적절하게 처리할 수 있는 데이터 수를 Publisher에게 요청하는 것이다.
Flux.range(1, 10)
.doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<Integer>() {
@Override
public void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
public void hookOnNext(Integer integer) {
System.out.println("Cancelling after having received " + integer);
}
});
위 소스는 Reactor 공식 문서에서 설명하는 내용이다. hookOnSubscribe() 메서드는 최초 데이터 요청 개수를 제어하는 역할을 하고
hookOnNext() 메서드는 Publisher가 emit 한 데이터를 처리 후 Publisher에게 또다시 데이터를 요청하는 역할을 한다.
위 소스는 한번 Publisher가 emit하는 데이터 수를 1개로 지정해서 요청하는 소스이다.
3-2 Backpressure 전략 사용
Reactor에서 제공하는 Backpressure 전략을 사용하는 것이다.
- IGNORE 전략 : Backpressure을 적용하지 않는 전략이다. Subscriber가 처리할 수 있는 데이터 양보다 emit된 데이터의 수가 발행되면 더 많은 데이터를 처리할 수 없다는 신호로 IllegalStateException를 발생시킬 수 있다.
- ERROR 전략 : Downstream의 처리 속도가 emit 되는 데이터 속도보다 느려서 버퍼가 가득 찰 경우 IllegalStateException를 발생시키고 이 때 Publisher는 error signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.
@Slf4j
public class Example8_2 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureError()
.doOnNext(data -> log.info("# doOnNext: {}", data))
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
Error전략을 사용하기 위해 onBackpressureError()를 사용하고 interval()로 0부터 1씩 증가하는 숫자를 0.001초에 한번씩 아주 빠르게 emit하도록 정의하고 Subcrriber는 전달받은 데이터를 처리하는데 Thread.sleep(5L)로 지연시켜 실행시키면 Downstream의 처리 속도가 emit 되는 데이터 속도보다 느려서 실행되다가 에러가 발생하게 된다. 에러가 발생하면 Sequence가 종료된다.
- DROP 전략 : Publisher가 Downstream으로 데이터를 emit 할 때, 버퍼가 가득찰 경우 버퍼 밖에서 대기중인 데이터 중 가장 먼저 emit된 데이터부터 폐기한다.
여기서 주목해야 하는건 버퍼 밖에서 대기중인 데이터가 폐기된다는 점이다.
@Slf4j
public class Example8_2 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
dropped 로그로 찍히는건 폐기된 데이터를 말한다.
- LATEST 전략 : 버퍼가 가득찰 경우 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에 emit된 데이터를 버퍼에 채우는 전략이다. DROP과 다른점은 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남기고 나머지 데이터는 폐기한다.
@Slf4j
public class Example8_2 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureLatest()
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
- BUFFER 전략 : 버퍼의 데이터를 폐기하지 않고 버퍼링을 하는 전략이다. 버퍼가 가득차면 데이터를 폐기하거나 에러를 발생시키는 전략등을 지원한다. 폐기 전략은 DROP과 LATEST전략과 달리 버퍼 안에 있는 데이터를 폐기하는것을 의미한다. 폐기 전략에는 DROP_LATEST, DROP_OLDEST 전략이 있다.
- DROP_LATEST :버퍼가 가득 찰 경우 가장 최근에 버퍼 안에 채워진 데이터를 폐기한다.
- DROP_OLDEST : 버퍼가 가득 찰 경우 가장 마지막에 버퍼 안에 채워진 데이터를 폐기한다.
'개발 > Spring' 카테고리의 다른 글
#4) Spring WebFlux(기술 스택 차이, 요청 처리 흐름, 핵심 컴포넌트, 프로세스 구조) (0) | 2024.06.05 |
---|---|
#2) Spring WebFlux(리액티브 프로그래밍 특징, 리액티브 스트림즈) (0) | 2024.01.30 |
#1) Spring WebFlux (함수 호출 관점 동기/비동기 Blocking/Non-Blocking, 함수형 인터페이스, WebFlux 장점) (2) | 2023.12.03 |
#2) Spring Security Oauth2 Client + Apple 로그인 연동하기 (7) | 2023.03.27 |
#1) Spring Security Oauth2 Client + Apple 로그인 연동하기 (0) | 2023.03.27 |