RxJava를 공부하면서 핵심 개념들을 직접 코드로 써봤다. 사용한 버전은 RxJava 3.1.1이다.
RxJava란
ReactiveX의 Java 구현체다. 비동기 데이터 스트림을 Observer 패턴, Iterator 패턴, 함수형 프로그래밍을 조합해 처리한다. 생산자(Producer)가 데이터를 통지하면 소비자(Consumer)가 구독해서 처리하는 구조다.
생산자 타입
RxJava 3에서 생산자 타입은 크게 다섯 가지다.
Observable
가장 기본적인 타입. 0개 이상의 데이터를 통지하고, 완료(onComplete) 또는 에러(onError)로 끝난다. 백프레셔(backpressure)를 지원하지 않는다.
Observable<String> observable = Observable.create(emitter -> {
String[] datas = {"Hello, World", "안녕 RxJava"};
for (String data : datas) {
if (emitter.isDisposed()) return;
emitter.onNext(data);
}
emitter.onComplete();
});
observable
.observeOn(Schedulers.computation())
.subscribe(new Observer<>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(String item) {
System.out.println(Thread.currentThread().getName() + ": " + item);
}
@Override public void onComplete() {
System.out.println(Thread.currentThread().getName() + ": 완료");
}
@Override public void onError(Throwable e) { e.printStackTrace(); }
});observeOn(Schedulers.computation())으로 소비자 측 처리를 별도 스레드에서 실행할 수 있다.
Flowable
Observable과 구조는 같지만 백프레셔를 지원한다. 생산 속도가 소비 속도보다 빠를 때 데이터를 어떻게 처리할지 BackpressureStrategy로 지정한다. Subscriber가 request(n)으로 받을 데이터 개수를 직접 제어한다.
Flowable<String> flowable = Flowable.create(emitter -> {
String[] datas = {"Hello, World", "안녕 RxJava"};
for (String data : datas) {
if (emitter.isCancelled()) return;
emitter.onNext(data);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable
.observeOn(Schedulers.computation())
.subscribe(new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1L);
}
@Override
public void onNext(String data) {
System.out.println(Thread.currentThread().getName() + ": " + data);
this.subscription.request(1L);
}
@Override public void onComplete() { ... }
@Override public void onError(Throwable e) { ... }
});Observable의 Disposable 대신 Subscription을 사용하고, 구독 해지는 cancel()로 한다.
Single
데이터를 딱 하나만 통지하는 타입. onSuccess 또는 onError로 끝난다. onComplete가 없다.
Single<DayOfWeek> single = Single.create(emitter -> {
emitter.onSuccess(LocalDate.now().getDayOfWeek());
});
single.subscribe(new SingleObserver<>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onSuccess(DayOfWeek dayOfWeek) {
System.out.println(dayOfWeek);
}
@Override public void onError(Throwable e) { System.out.println("에러: " + e); }
});Maybe
0개 또는 1개의 데이터를 통지한다. 데이터가 있으면 onSuccess, 없으면 onComplete, 오류면 onError다. Single과 달리 빈 결과를 자연스럽게 표현할 수 있다.
Maybe<DayOfWeek> maybe = Maybe.create(emitter -> {
emitter.onSuccess(LocalDate.now().getDayOfWeek());
});데이터 없이 emitter.onComplete()만 호출하면 MaybeObserver.onComplete()가 불린다.
Completable
데이터를 통지하지 않고 완료 여부만 알린다. 결과값이 필요 없는 작업(저장, 삭제 등)에 적합하다.
Completable completable = Completable.create(CompletableEmitter::onComplete);
completable.subscribe(new CompletableObserver() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onComplete() { System.out.println("완료"); }
@Override public void onError(Throwable e) { System.out.println("에러: " + e); }
});오퍼레이터
메서드 체이닝으로 데이터를 변환하거나 필터링한다.
Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter(data -> data % 2 == 0)
.map(data -> data * 100)
.subscribe(data -> System.out.println("data=" + data));filter로 짝수만 남기고, map으로 100을 곱했다. 결과는 200, 400, 600, 800, 1000이다.
scan은 누적 연산자다. 이전 결과와 현재 데이터를 조합해서 새 값을 만든다. 아래 예제는 중간에 외부 상태(calsMethod)를 바꿔서 계산 방식을 더하기에서 곱하기로 전환한다.
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(7)
.scan((sum, data) -> {
if (calsMethod == State.ADD) return sum + data;
else return sum * data;
})
.subscribe(data -> System.out.println("data=" + data));interval은 일정 주기로 0부터 순서대로 값을 통지한다. take(n)으로 개수를 제한할 수 있다.
구독 해지
Disposable.dispose()(Observable) 또는 Subscription.cancel()(Flowable)로 구독을 해지한다. 아래는 구독 시작 후 500밀리초가 지나면 스스로 해지하는 예제다.
Flowable.interval(200L, TimeUnit.MILLISECONDS)
.subscribe(new Subscriber<>() {
private Subscription subscription;
private long startTime;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.startTime = System.currentTimeMillis();
this.subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long data) {
if ((System.currentTimeMillis() - startTime) > 500) {
this.subscription.cancel();
System.out.println("구독 해지");
return;
}
System.out.println("data=" + data);
}
...
});내 생각
RxJava의 핵심은 결국 데이터 흐름을 선언적으로 표현하는 것이다. Observable과 Flowable의 차이, Disposable과 Subscription의 차이처럼 타입 간 미묘한 구분이 처음에는 헷갈렸다. 직접 코드를 작성해보니 백프레셔가 왜 필요한지, request(n) 패턴이 어떤 의미인지 조금 더 감이 잡혔다.