Observable
Observable이란
RxJava의 가장 핵심적인 요소는 Observable이다. Observable은 데이터 흐름에 맞게 알림을 보내 Observer가 데이터를 사용할 수 있도록 한다. 즉, Observable을 이용해 데이터를 회수하고 변환하는 메커니즘을 정의하고, Observer는 이를 구독해 데이터가 준비되면 이에 반응한다.
이 패턴을 Oberver Pattern이라고 하며, Reactive Programming은 이 Oberver Pattern에 기반을 둔다.
Observable은 Collections(List, ArrayList, …)를 사용할 때와 같은 방식으로 비동기 이벤트 스트림을 처리할 수 있다. 다만 Collections의 Iterable이 Push 방식이라면, Observable은 Iterable의 Pull 버전이다.
Iterable은 Consumer(데이터를 소비하는 곳)가 값을 Pull한 후 값이 도착할 때까지 기다리며 Thread를 차단한다면, Observable은 Thread를 차단하지 않고 값이 사용가능하면 Consumer에게 값을 Push한다.
graph TB;
A[Observable]
B[Observer 1]
C[Observer 2]
D[??]
E[??]
A--data emit-->B;
B--subscribe-->A;
C--subscribe-->A;
A--data emit-->C;
B--do something-->D;
C--do something-->E;
정리하자면,
- Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
- 데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받는다.
- Observer는 수신한 데이터를 가지고 어떠한 일을 한다.
Observer를 Subscriber, Wather, Reactor 등 사람마다 각기 다른 이름으로 불러서 처음에 배울 때 매우 헷갈렸다. 본 포스트에서는 ReactiveX가 부르는 이름인 Observer를 주로 사용하겠다.
어떻게 Subscribe를 하는가?
Observable이 데이터를 발행하고 알림(Event)을 보내면 Observer는 Observable을 구독(Subscribe)해 데이터를 소비(Consume)한다. 실제로는 Observable이 데이터 흐름을 정의하고 알림을 보낸 뒤 Observer가 Subscribe를 해야 데이터가 발행되고 소비된다.
그럼 Subscribe를 어떻게 할까? 코드를 통해 알아보자.
Observable의 데이터 발행
Observable이 데이터를 발행 한 후 보내는 알림에는 세 가지 종류가 있다.
// Emitter를 통해 알림을 보낸다고 생각하면 된다
public interface Emitter<@NonNull T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
-
onNext
: 데이터의 발행을 알림 -
onComplete
: 모든 데이터의 발행이 완료되었음을 알림, 딱 한 번만 발생하며 이후에 onNext가 발생하면 안됨 -
onError
: 오류가 발생했음을 알림, 이후에 onNext와 onComplete가 발생하지 않음
Observer의 Subscribe
구독(Subscribe)이란 단순하게 수신한 데이터를 가지고 할 행동을 정의하는 것이다.
Observer는 subsribe()
메소드에서 수신한 각각의 알림에 대해 실행할 내용을 지정한다.
public final Disposable subscribe()
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete)
public final void subscribe(@NonNull Observer<? super T> observer)
여러 메소드가 overload 되어있다.
그 중 인자가 없는 subsribe()
는 주로 테스트나 디버깅할 때 사용되며, onError 이벤트가 발생했을 때에만 onErrorNotImplementedException를 throw하고 끝낸다.
Disposable class는 구독의 정상적인 해지를 돕는다.
onComplete 이벤트가 발생하면 dispose()를 호출해 Observable이 더 이상 데이터를 발행하지 않도록 구독을 해지한다.
또한 isDisposed()를 통해 구독이 해지되었는지 확인할 수 있다.
예제
Observable을 생성하고 구독하는 과정을 짜보면 다음과 같다.
//Observable 생성
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
// 데이터 흐름 정의
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// onComplete() 이후의 데이터는 발행되지 않음
emitter.onNext(3);
}
});
// subscribe 함수를 통해 실제로 데이터를 발행하여 소비함
observable.subscribe(
// onNext
new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("onNext : " + integer);
}
},
// onError
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
System.out.println("onError : " + throwable);
}
},
// onComplete
new Action() {
@Override
public void run() throws Throwable {
System.out.println("onComplete");
}
}
);
[실행결과]
onNext : 1
onNext : 2
onComplete
인터페이스를 확인하기 위해 익명객체를 생성하였지만, 람다식을 활용하면 코드가 훨씬 더 간결해진다!!
Observable의 특징
공식문서에 따르면 Observable을 채택한 이유는 다음과 같다.
Observables Are Composable
Java의 Futures는 비동기 실행을 도와준다. 하지만 여러 제약사항이 존재하여 적절한 흐름을 구성하기 어렵다. 이에 비해 Observable은 비동기 데이터의 적절한 흐름과 순서를 구성할 수 있다.
Observables Are Flexible
Observable은 단순한 scalar 값 뿐만 아니라 무한한 스트림을 차례로 발행할 수 있다. 또한 어떠한 유스케이스에서도 사용할 수 있다.
Observables Are Less Opinionated
Observable은 데이터 소스가 Concurrency이던 Asynchronicity이던 상관없다. Observable은 Thread-pool, Event Loop, Non-Blocking I/O 등 다양한 방식을 통해 구현될 수 있다.
Observable의 종류
RxJava2와 RxJava3에는 데이터 소스를 나타내는 5가지의 기본 클래스가 있다.
-
Observable
: 가장 기본적인 형태, 0개~N개의 데이터 발행, BackPressure 없음 -
Single
: 단 1개의 데이터, 혹은 오류 발행 -
Completable
: 성공 혹은 실패했다는 결과만 발행 -
Maybe
: 0개 또는 1개 완료, 오류 -
Flowable
: 0개~N개의 데이터 발행, BackPressure 존재
Observable 생성하기
Observable을 생성할 때에는 직접 인스턴스를 만들지 않고 정적 팩토리 함수(생성 연산자)를 호출한다. 이 중 가장 기본적인 팩토리 함수인 just()
, create()
, fromXXX()
를 알아볼 것이다.
just()
가장 간단한 생성 방식이다. 함수에 인자로 넣은 데이터를 차례로 발행한다. 인자로 10개까지 전달할 수 있다.
자동으로 onNext, onComplete 혹은 onError가 호출된다.
데이터가 그대로 발행되므로 다르게 변경하고 싶으면 map
과 같은 연산자를 통해 변환해야한다.
//그대로 발행
Observable.just(1, 2, 3)
.subscribe(System.out::println);
// 변환하고 싶은 경우
Observable.just(1, 2, 3)
.map(x -> x * 10)
.subscribe(System.out::println);
[실행결과]
1
2
3
[실행결과]
10
20
30
create()
Observable을 생성하지만 just()와는 다르게 프로그래머가 직접 onNext, onComplete, onError를 호출해야한다. 위에서 본 예제에서 쓰인 방식이다.
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).subscribe(System.out::println);
[실행결과]
1
2
3
fromXXX()
여러 데이터를 다뤄야 하는 경우 사용한다. 정의된 메소드의 종류는 다음과 같으며 특정 타입의 데이터를 Observable로 바꿔주는 메소드이다. 이 중 몇개만 대표적으로 보겠다.
- fromArray()
: 배열을 Observable로 변환해준다.
Integer [] array = {1, 2, 3, 4, 5};
Observable.fromArray(array)
.subscribe(System.out::println);
- fromIterable()
: Iterable Interface가 구현되어 있는 Class를 Observable로 변환한다. Iterable이 구현되어 있는 Class에는 대표적으로 ArrayList, Stack, Vector 등이 있다.
ArrayList<Integer> arrayList = new ArrayList<>();
arrayList.add(1);
arrayList.add(2);
arrayList.add(3);
Observable.fromIterable(arrayList)
.subscribe(System.out::println);
- fromSingle(), fromMaybe(), fromCompletable()
: RxJava의 데이터 소스 클래스인 Single, Maybe, Completable을 Observable로 변환해준다. 다음 포스트에서 알아볼 예정이다.
기타
이외에도 다른 생성 연산자들이 많은데, 간단히 정리하자면 다음과 같다.
Method | 설명 |
---|---|
Create |
onNext, onError, onComplete를 일일이 명시하며 Observable 생성 |
Defer |
Observer가 구독할 때까지 기다렸다가 구독하면 그 때 Observable 생성 |
Empty /Never /Throw
|
아이템을 0개 방출한 후 종료/종료하지않음/에러발생 |
From |
다른 객체를 Observable로 변환 |
Interval |
시간 간격을 두고 데이터를 방출하는 Observable 생성 |
Just |
parameter로 전달한 아이템을 그대로 발행하는 Observable 생성 |
Range |
특정 범위 내 Integer 형태의 아이템 발행하는 Observable 생성 |
Repeat |
아이템을 지정한 횟수만큼, 혹은 무한히 반복하여 발행 |
Start |
연산 후 특정 값을 반환, 함수처럼 작용함 |
Timer |
지정한 시간 delay 이후 아이템 발행 |
REFERENCE
유동환, 박정준, 「RxJava 프로그래밍」, 한빛미디어
ReactiveX Introduction
ReactiveX Operators