[RxJava] RxJava 이해하기 - 5. 스케줄러

 


스케줄러

스케줄러란

RxJava의 스케줄러는 RxJava의 코드가 어느 스레드에서 실행될 것인지 지정하는 역할을 한다. RxJava만 사용한다고 비동기 처리가 되는 것이 아니라, 스케줄러를 통해 스레드를 분리해주어야 비동기 작업이 가능한 것이다. 스케줄러의 지정은 RxJava의 subscribeOnobserveOn 연산자를 통해 가능하다.

subscribeOn은 Observable이 데이터 흐름을 발생시키고 연산하는 스레드를 지정할 수 있고, observeOn은 Observable이 Observer에게 알림을 보내는 스레드를 지정할 수 있다.

RxJava의 큰 장점은 특정 스케줄러를 사용하다가 다른 스케줄러로 변경하기 쉽다는 점이다. 다음 그림을 보자.

(1) 세번째 subscribeOn 연산자에 의해 처음 데이터 발행은 파란색 스레드에서 일어난다.
(2) 첫번째 observeOn 연산자에 의해 그 다음의 map 연산이 주황색 스레드에서 일어난다.
(3) 마지막 observeOn 연산자에 의해 그 다음의 작업이 핑크색 스레드에서 일어난다.


스케줄러의 종류

스케줄러는 RxJava의 Schedulers 클래스의 정적 팩토리 메소드를 통해 생성할 수 있으며 총 5개의 스케줄러가 존재한다.

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;
}


각각을 정리해보면 다음과 같다.

스케줄러 생성 방법 내용
SINGLE Schedulers.newThread() 단일 스레드를 생성해 계속 재사용
COMPUTATION Schedulers.computation() 내부적으로 스레드 풀 생성, 스레드 개수=프로세서 개수
IO Schedulers.io() 필요할때마다 스레드를 계속 생성
TRAMPOLINE Schedulers.trampoline() 현재 스레드에 무한한 크기의 대기 큐 생성
NEW_THREAD Schedulers.single() 매번 새로운 스레드 생성

:bulb: RxJava에서는 이 중 Computation, IO, Trampoline 세 가지의 스케줄러를 권장한다.


Single Thread Scheduler

Single 스레드 스케줄러는 단일 스레드를 계속 재사용한다. RxJava 내부에서 스레드를 별도로 생성하며, 한 번 생성된 스레드로 여러 작업을 처리한다. 비동기 처리를 지향한다면 Single 스레드 스케줄러를 사용할 일은 거의 없다.

String[] source = {"First", "Second", "Third"};
Observable.fromArray(source)
        .observeOn(Schedulers.single())
        .subscribe( data -> {
            System.out.println("Observe On : "+Thread.currentThread().getName()+" | "+"value : "+data);
        });
Thread.sleep(100);
[실행결과]
Observe On : RxSingleScheduler-1 | value : First
Observe On : RxSingleScheduler-1 | value : Second
Observe On : RxSingleScheduler-1 | value : Third


Computation Thread Scheduler

Computation 스레드 스케줄러는 CPU에 대응하는 계산용 스케줄러이다. IO 작업을 하지 않고 일반적인 계산/연산 작업을 할 때 사용한다. 내부적으로 스레드 풀을 생성하고 생성된 스레드를 이용한다. 기본적으로 스레드의 개수는 프로세서의 개수와 같다.

String[] source = {"First", "Second", "Third"};
Observable.fromArray(source)
        .observeOn(Schedulers.computation())
        .subscribe( data -> {
            System.out.println("Observe On : "+Thread.currentThread().getName()+" | "+"value : "+data);
        });
Thread.sleep(100);
[실행결과]
Observe On : RxComputationThreadPool-1 | value : First
Observe On : RxComputationThreadPool-1 | value : Second
Observe On : RxComputationThreadPool-1 | value : Third


IO Thread Scheduler

파일 입출력 등의 IO 작업을 하거나 네트워크 요청 처리 시에 사용하는 스케줄러이다. Computation 스케줄러와 다르게 필요할 때마다 스레드를 계속 생성한다.

String[] source = {"First", "Second", "Third"};
Observable.fromArray(source)
        .observeOn(Schedulers.io())
        .subscribe( data -> {
            System.out.println("Observe On : "+Thread.currentThread().getName()+" | "+"value : "+data);
        });
Thread.sleep(100);
[실행결과]
Observe On : RxCachedThreadScheduler-1 | value : First
Observe On : RxCachedThreadScheduler-1 | value : Second
Observe On : RxCachedThreadScheduler-1 | value : Third


Trampoline Thread Scheduler

트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 사용하고 있는 현재 스레드에 무한한 크기의 대기 큐를 생성한다.

String[] source = {"First", "Second", "Third"};
Observable.fromArray(source)
        .observeOn(Schedulers.trampoline())
        .subscribe( data -> {
            System.out.println("Observe On : "+Thread.currentThread().getName()+" | "+"value : "+data);
        });
Thread.sleep(100);
[실행결과]
Observe On : main | value : First
Observe On : main | value : Second
Observe On : main | value : Third


New Thread Scheduler

New Thread 스케줄러는 다른 스케줄러와 달리 요청을 받을 때 마다 매번 새로운 스레드를 생성한다.

String[] source = {"First", "Second", "Third"};
Observable.fromArray(source)
        .observeOn(Schedulers.newThread())
        .subscribe( data -> {
            System.out.println("Observe On : "+Thread.currentThread().getName()+" | "+"value : "+data);
        });
Thread.sleep(100);
[실행결과]
Observe On : RxNewThreadScheduler-1 | value : First
Observe On : RxNewThreadScheduler-1 | value : Second
Observe On : RxNewThreadScheduler-1 | value : Third


Thread Pool을 직접 생성하는 방법

Java에서는 Executor를 통해 스레드 풀을 직접 생성할 수 있으며, 생성한 스레드 풀을 Rxjava에서 사용할 수 있다.

String[] source = {"First", "Second", "Third"};
Executor executor = Executors.newFixedThreadPool(10);
Observable.fromArray(source)
        .observeOn(Schedulers.from(executor))
        .subscribe( data -> {
            System.out.println("Observe On : "+Thread.currentThread().getName()+" | "+"value : "+data);
        });
Thread.sleep(100);
[실행결과]
Observe On : pool-1-thread-1 | value : First
Observe On : pool-1-thread-1 | value : Second
Observe On : pool-1-thread-1 | value : Third



:bookmark: REFERENCE
유동환, 박정준, 「RxJava 프로그래밍」, 한빛미디어
ReactiveX Scheduler