[RxJava] RxJava 이해하기 - 7. Backpressure와 Flowable

 


Backpressure와 Flowable

RxJava에는 Backpressure라는 개념과 이를 처리하는 Flowable class가 존재한다.
Backpressure가 무엇이고 Flowable은 어떻게 쓰는 것인지 알아보자.


배압(Backpressure)

배압이란 데이터 생산과 소비가 불균형적일 때 일어나는 현상이다. 만약 10,000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 된다. Observable이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것이다. 이는 결국 메모리가 overflow되고 OutOfMemoryError로 이어져 앱이 터질 것이다. 이러한 현상을 배압(Backpressure)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공한다.

img


Flowable

기존의 Observable이 배압 현상을 제어하지 못하는 반면, Flowable은 배압 현상을 스스로 제어할 수 있다. 다음의 두 코드를 살펴보자.


Observable

Observable.range(1, 10000)
        .doOnNext(integer -> System.out.println("Emit Data : "+integer))
        .observeOn(Schedulers.io())
        .subscribe(integer -> {
            System.out.println("Consume Data : "+integer);
            Thread.sleep(100);
        });
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 9998
Emit Data : 9999
Emit Data : 10000
Consume Data : 2
Consume Data : 3
Consume Data : 4
...


Flowable

Flowable.range(1, 10000)
        .doOnNext(integer -> System.out.println("Emit Data : "+integer))
        .observeOn(Schedulers.io())
        .subscribe(integer -> {
            System.out.println("Consume Data : "+integer);
            Thread.sleep(100);
        });
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 126
Emit Data : 127
Emit Data : 128
Consume Data : 2
Consume Data : 3
Consume Data : 4
...

두 예제 모두 10,000개의 데이터를 발행하면서, 소비는 100ms의 delay를 두었다.
Observable을 사용한 경우에는 데이터 발행과 소비가 균형적으로 일어나지 않으며 데이터는 소비와 상관없이 스트림에 계속 쌓이게 된다.
반면 Flowable을 사용한 경우에는 데이터가 일정량 누적되면 데이터를 더이상 발행하지 않는 것을 확인할 수 있다.
이와 같이, Flowable은 스트림에 끊임없이 쌓이는 데이터의 양을 제어할 수 있는 Observable의 또다른 형태이다.


When to use Observable? When to use Flowable?

그렇다면 언제 Observable을, 언제 Flowable을 사용해야할까? RxJava Wiki에 Observable과 Flowable을 선택하는 기준이 포스트되어있다.

Observable을 사용해야하는 경우

  • 1,000개 미만의 데이터 흐름이 발생하는 경우
  • 적은 데이터 소스만을 활용하여 OutOfMemoryException이 발생할 확률이 적은 경우
  • 마우스 이벤트나 터치 이벤트와 같은 GUI 프로그래밍을 하는 경우 (초당 1,000회 이하의 이벤트는 Observable의 sample()이나 debounce()로 핸들링 가능)
  • 동기적인 프로그래밍이 필요하지만 플랫폼에서 Java Streams을 지원하지 않는 경우

Flowable을 사용해야하는 경우

  • 10,000개 이상의 데이터 흐름이 발생하는 경우
  • 디스크에서 파일을 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
  • JDBC에서 데이터베이스를 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
  • 네트워크 IO 실행 시
  • Blocking/Pull-based 방식을 사용하고 있는데 나중에 Non-Blocking 방식의 Reactive API/드라이버에서 데이터를 가져올 일이 있는 경우


배압 전략

Flowable에도 배압을 제어하지 못해 MissingBackpressureException이 발생할 수 있는 예외상황1이 존재한다. 따라서 Flowable에 배압 전략을 명시함으로써 배압을 제어할 수 있다. 5가지의 배압 전략이 존재하며 각각의 내용은 다음과 같다.

📌 1. Flowable과 interval()을 같이 사용하는 경우. interval 연산자는 스케줄러와 관계없이 시간에 의존해 데이터를 발행하므로 에러가 발생한다.


이름 enum 내용
MISSING BackpressureStrategy.MISSING 배압 전략을 구현하지 않음
ERROR BackpressureStrategy.ERROR 소비 속도가 발행 속도를 따라가지 못하는 경우 MissingBackpressureException 발생
BUFFER BackpressureStrategy.BUFFER 데이터를 소비할 때까지 데이터를 버퍼에 넣어둠. 무한한 크기의 큐이지만 OOME이 발생할 수 있음.
DROP BackpressureStrategy.DROP 소비 속도가 발행 속도를 따라가지 못하는 경우 발행된 데이터를 모두 버림
LATEST BackpressureStrategy.LATEST 구독자가 데이터를 받을 준비가 될 때까지 최신 데이터만 유지하고 나머지는 버림


예제

create() 연산자를 통해 Flowable을 생성하는 경우 배압 전략을 명시해주어야 한다.

Flowable.create(emitter -> {
    for (int i = 0; i < 10000; i++) emitter.onNext(i);
    emitter.onComplete();
}, BackpressureStrategy.DROP)
        .observeOn(Schedulers.io())
        .subscribe();


배압 제어 연산자

RxJava의 연산자 중에는 생성된 Flowable에 배압 전략을 적용할 수 있는 3가지 연산자를 제공한다.


onBackPressureBuffer()

BackpressureStrategy.BUFFER 전략을 적용한다. 매개변수로 버퍼의 용량, 버퍼 overflow 발생 시의 동작 등을 함께 전달할 수 있다.


onBackPressureDrop()

BackpressureStrategy.DROP 전략을 적용한다. 매개변수로 데이터를 버릴 때의 동작을 정의할 수 있다.


onBackPressureLatest()

BackpressureStrategy.LATEST 전략을 적용한다.


예제

Flowable.range(1, 1000)
        .onBackpressureLatest()
        .doOnNext(integer -> System.out.println("Emit Data : "+integer))
        .observeOn(Schedulers.io())
        .subscribe(integer -> {
            System.out.println("Consume Data : "+integer);
            Thread.sleep(100);
        });
Thread.sleep(100*1000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Consume Data : 1
Emit Data : 5
...
Emit Data : 128
Consume Data : 2
Consume Data : 3
...
Consume Data : 95
Consume Data : 96
Emit Data : 1000
Consume Data : 97
Consume Data : 98
...
Consume Data : 128
Consume Data : 1000




:bookmark: REFERENCE
유동환, 박정준, 「RxJava 프로그래밍」, 한빛미디어
옥수환, 「아키텍처를 알아야 앱 개발이 보인다」, 비제이퍼블릭
What does the term “backpressure” mean in Rxjava?
RxJava Wiki | Which type to use?