[RxJava] RxJava 이해하기 - 6. subscribeOn과 observeOn의 차이

 


subscribeOn과 observeOn의 차이

RxJava는 operator를 통해 chaining 기법을 사용할 수 있다. subscribeOn과 observeOn도 마찬가지로 chaining 기법을 적용할 수 있다.

먼저 두 연산자의 역할은 다음과 같다.

subscribeOn은 Observable이 데이터 흐름을 발생시키고 연산하는 스레드를 지정할 수 있고, observeOn은 Observable이 Observer에게 알림을 보내는 스레드를 지정할 수 있다.

그럼 두 연산자의 차이는 무엇일까?

subscribeOn은 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관없다.
observeOn은 여러번 호출될 수 있으며 이후에 실행되는 연산에 영향을 주므로 위치가 중요하다.


예제

ArrayList<MyShape> shapes = new ArrayList<>();
shapes.add(new MyShape("Red","Ball"));
shapes.add(new MyShape("Green","Ball"));
shapes.add(new MyShape("Blue","Ball"));

Observable.fromIterable(shapes)
        .subscribeOn(Schedulers.computation())             // (A)
        .subscribeOn(Schedulers.io())                      // (B)
        // 1. 현재 스레드(main)에서 Observable을 구독
        .doOnSubscribe(data -> MyUtil.printData("doOnSubscribe"))
        // 2. (A)에 의해 computation 스케줄러에서 데이터 흐름 발생, (B)는 영향 X
        .doOnNext(data -> MyUtil.printData("doOnNext", data))
        // 3. (C)에 의해 map 연산이 new thread에서 실행
        .observeOn(Schedulers.newThread())                  // (C)
        .map(data -> {data.shape = "Square"; return data;})
        .doOnNext(data -> MyUtil.printData("map(Square)", data))
        // 4. (D)에 의해 map 연산이 new thread에서 실행
        .observeOn(Schedulers.newThread())                  // (D)
        .map(data -> {data.shape = "Triangle"; return data;})
        .doOnNext(data -> MyUtil.printData("map(Triangle)", data))
        // 5. (E)에 의해 new thread에서 데이터 소비(subscribe)
        .observeOn(Schedulers.newThread())                  // (E)
        .subscribe(data -> MyUtil.printData("subscribe", data));

[실행결과]
main | doOnSubscribe |
RxComputationThreadPool-1 | doOnNext | MyShape{color='Red', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Green', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Blue', shape='Ball'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Red', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Green', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Blue', shape='Square'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Blue', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Blue', shape='Triangle'}

예제에서 사용한 MyShape과 MyUtil 클래스이다.

class MyShape{
    String color;
    String shape;

    MyShape(String color, String shape) {
        this.color = color;
        this.shape = shape;
    }

    @Override
    public String toString() {
        return "MyShape{" +
                "color='" + color + '\'' +
                ", shape='" + shape + '\'' +
                '}';
    }
}

class MyUtil {
    static void printData(String message) {
        System.out.println(""+Thread.currentThread().getName()+" | "+message+" | ");
    }

    static void printData(String message, Object obj) {
        System.out.println(""+Thread.currentThread().getName()+" | "+message+" | " +obj.toString());
    }
}



:bookmark: REFERENCE
유동환, 박정준, 「RxJava 프로그래밍」, 한빛미디어