subscribeOn과 observeOn의 차이
RxJava는 operator를 통해 chaining 기법을 사용할 수 있다.
subscribeOn과 observeOn도 마찬가지로 chaining 기법을 적용할 수 있다.
먼저 두 연산자의 역할은 다음과 같다.
subscribeOn
은 Observable이 데이터 흐름을 발생시키고 연산하는 스레드를 지정할 수 있고,
observeOn
은 Observable이 Observer에게 알림을 보내는 스레드를 지정할 수 있다.
그럼 두 연산자의 차이는 무엇일까?
subscribeOn은 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관없다.
observeOn은 여러번 호출될 수 있으며 이후에 실행되는 연산에 영향을 주므로 위치가 중요하다.
예제
ArrayList<MyShape> shapes = new ArrayList<>();
shapes.add(new MyShape("Red","Ball"));
shapes.add(new MyShape("Green","Ball"));
shapes.add(new MyShape("Blue","Ball"));
Observable.fromIterable(shapes)
.subscribeOn(Schedulers.computation()) // (A)
.subscribeOn(Schedulers.io()) // (B)
// 1. 현재 스레드(main)에서 Observable을 구독
.doOnSubscribe(data -> MyUtil.printData("doOnSubscribe"))
// 2. (A)에 의해 computation 스케줄러에서 데이터 흐름 발생, (B)는 영향 X
.doOnNext(data -> MyUtil.printData("doOnNext", data))
// 3. (C)에 의해 map 연산이 new thread에서 실행
.observeOn(Schedulers.newThread()) // (C)
.map(data -> {data.shape = "Square"; return data;})
.doOnNext(data -> MyUtil.printData("map(Square)", data))
// 4. (D)에 의해 map 연산이 new thread에서 실행
.observeOn(Schedulers.newThread()) // (D)
.map(data -> {data.shape = "Triangle"; return data;})
.doOnNext(data -> MyUtil.printData("map(Triangle)", data))
// 5. (E)에 의해 new thread에서 데이터 소비(subscribe)
.observeOn(Schedulers.newThread()) // (E)
.subscribe(data -> MyUtil.printData("subscribe", data));
[실행결과]
main | doOnSubscribe |
RxComputationThreadPool-1 | doOnNext | MyShape{color='Red', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Green', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Blue', shape='Ball'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Red', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Green', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Blue', shape='Square'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Blue', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Blue', shape='Triangle'}
예제에서 사용한 MyShape과 MyUtil 클래스이다.
class MyShape{
String color;
String shape;
MyShape(String color, String shape) {
this.color = color;
this.shape = shape;
}
@Override
public String toString() {
return "MyShape{" +
"color='" + color + '\'' +
", shape='" + shape + '\'' +
'}';
}
}
class MyUtil {
static void printData(String message) {
System.out.println(""+Thread.currentThread().getName()+" | "+message+" | ");
}
static void printData(String message, Object obj) {
System.out.println(""+Thread.currentThread().getName()+" | "+message+" | " +obj.toString());
}
}
REFERENCE
유동환, 박정준, 「RxJava 프로그래밍」, 한빛미디어