티스토리 뷰

Android/RxJava

Android RxJava - Observable

강태종 2022. 3. 1. 15:31

Observable

RxJava의 가장 핵심적인 요소입니다. Observable은 데이터를 흐름에 맞게 만들어서 Observer에게 보내는 역할을 합니다.

RxJava는 이벤트를 통한 Observer Pattern을 기반으로 만들어졌습니다.


Observable

가장 기본적인 형태, N개의 데이터를 발행할 수 있습니다. onComplete를 통해 완료를 알릴 수 있으며 onError를 통해 에러를 처리할 수 있습니다. onComplete, onError가 호출되면 이후에 발행하는 onNext는 무시합니다.

fun observable() {
    Observable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
        emitter.onNext(4)
    }.subscribe {
        println(it)
    }
}
1
2
3

 

Floawable

Observable과 비슷한 성격을 가집니다. Observable과 차이점은 Backpressure를 지원합니다.

fun flowable() {
    Flowable.just(1, 2, 3)
        .subscribe {
            println(it)
        }
}
1
2
3

 

Maybe

Observer와 다른 점은 데이터를 단 한번만 발행할 수 있습니다.

fun maybe() {
    Maybe.create<Int> { emitter ->
        emitter.onSuccess(1)
        emitter.onSuccess(2)
        emitter.onSuccess(3)
        emitter.onComplete()
    }.subscribe {
        println(it)
    }
}
1

 

Single

Maybe와 다른 점은 onComplete 함수가 없습니다. 즉 Maybe는 데이터를 발행하지 않아도 완료할 수 있지만, Single은 데이터를 발행하지 않으면 완료할 수 없습니다.

fun single() {
    Single.create<Int> { emitter ->
        emitter.onSuccess(1)
        emitter.onSuccess(2)
        emitter.onSuccess(3)
    }.subscribe { value ->
        println(value)
    }
}
1

 

Completable

성공 혹은 실패만 결과를 제공할 수 있습니다.

fun complete() {
    Completable.create { emitter -> 
        emitter.onComplete()
    }.subscribe { 
        // doSomething
    }
}

생성하기

Observable 객체를 직접 생성하는 것이 아닌 just, create 등 팩토리 함수를 통해 생성합니다. Observable마다 구현된 팩토리 함수가 조금씩 차이가 있습니다. (Single, Maybe는 매개 변수를 여러개 갖는 just 함수가 없습니다.)

 

just

기본적인 형태입니다. Single, Maybe는 하나의 데이터만 발행할 수 있지만, Observable, Flowable은 최대 10개의 데이터를 발행할 수 있습니다. 자동으로 onSubscribe, onComplete가 호출되며 Operator를 사용하여 데이터 스트림을 변환할 수 있습니다.

fun just() {
    Observable.just(1, 2, 3)
        .subscribe(
            object : Observer<Int> {
                override fun onSubscribe(d: Disposable) {
                    println("onSubscribe")
                }

                override fun onNext(t: Int) {
                    println("onNext : $t")
                }

                override fun onError(e: Throwable) {
                    println("onError $e")
                }

                override fun onComplete() {
                    println("onComplete")
                }
            }
        )
}
onSubscribe
onNext : 1
onNext : 2
onNext : 3
onComplete

 

내부 구현부를 확인하면 매개변수가 1개인 함수부터 10개인 함수까지 구현되어 있으며 null을 가질 수 없고 fromArray로 묶어서 반환합니다.

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Observable<T> just(@NonNull T item1, @NonNull T item2, @NonNull T item3, @NonNull T item4, @NonNull T item5, @NonNull T item6, @NonNull T item7, @NonNull T item8, @NonNull T item9, @NonNull T item10) {
    Objects.requireNonNull(item1, "item1 is null");
    Objects.requireNonNull(item2, "item2 is null");
    Objects.requireNonNull(item3, "item3 is null");
    Objects.requireNonNull(item4, "item4 is null");
    Objects.requireNonNull(item5, "item5 is null");
    Objects.requireNonNull(item6, "item6 is null");
    Objects.requireNonNull(item7, "item7 is null");
    Objects.requireNonNull(item8, "item8 is null");
    Objects.requireNonNull(item9, "item9 is null");
    Objects.requireNonNull(item10, "item10 is null");

    return fromArray(item1, item2, item3, item4, item5, item6, item7, item8, item9, item10);
}

 

 

create

just와 다르게 직접 onNext, onComplete, onError 등 함수를 호출해야 합니다.

fun createTest() {
    Observable.create<Int> { emitter ->
        println("onCreate Start")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onComplete()
        println("onCreate Finish")
    }.subscribe(
        object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                println("onSubscribe")
            }

            override fun onNext(t: Int) {
                println("onNext : $t")
            }

            override fun onError(e: Throwable) {
                println("onError $e")
            }

            override fun onComplete() {
                println("onComplete")
            }
        }
    )
}
onSubscribe
onCreate Start
onNext : 1
onNext : 2
onComplete
onCreate Finish

 

 

fromXXX

fromArray, fromIterator 등 다양한 형태를 Observable로 만들 수 있습니다.

fun from() {
    Observable.fromArray(1, 2, 3)
        .subscribe { 
            println(it)
        }
    
    Observable.fromIterable(listOf(4, 5, 6))
        .subscribe {
            println(it)
        }
}
1
2
3
4
5
6

 

기타 (Link)

  • Defer : Observer가 구독을 시작하면 Observable 생성
  • Empty : 데이터를 발행하지 않고 종료
  • Never : 데이터를 발행하지 않으며, 종료하지도 않는다.
  • Throw : 데이터를 발행하지 않으며 에러를 발생한다.
  • Interval : 주기별로 데이터를 발행한다.
  • Range : 범위의 정수를 발행한다.
  • Repeat : 특정 횟수만큼 반복하여 데이터를 발행한다.
  • Start : 연산 후 특정 값을 반환한다.
  • Timer : 지정된 시간 후에 데이터를 발행한다.

Observable vs Flowable

MissingBackpressureException, OutOfMemoryError를 피하기 위해 ObservableFlowable를 고민한다면 아래의 사항을 확인해야 합니다. (원문)

 

Observable

  • 1000개 미만의 데이터 또는 OOME가 발생할 가능성이 적으면 Observable를 사용합니다.
  • GUI Event(Mouse event, Touch event)를 다룰 때 사용합니다. 하지만 sampling, debouncing을 고려하는 것도 좋은 방법입니다.
  • 일반적인 상황에서 Observable은 Flowable보다 좋은 성능을 보여줍니다.

Flowable

  • 만개 이상의 데이터를 핸들링할 때 사용합니다.
  • 파일을 읽으면서 Thread를 차단하는 경우
  • 데이터베이스를 읽으면서 Thread를 차단하는 경우
  • Network IO를 통해 Thread를 차단하는 경우

'Android > RxJava' 카테고리의 다른 글

Android RxJava - Scheduler  (0) 2022.03.05
Android RxJava - Backpressure  (0) 2022.03.02
Android RxJava - Hot Observable, Cold Observable  (0) 2022.03.02
Android RxJava - Operator  (0) 2022.03.01
Android RxJava - Reactive Programming  (0) 2022.02.28
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/07   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
글 보관함