Android/RxJava

Android RxJava - Hot Observable, Cold Observable

강태종 2022. 3. 2. 00:25

Cold Observable

Cold Observable은 Observable를 생성하고 Observer가 subscribe를 호출할 때 데이터 발행을 시작합니다. 즉, Observer가 없으면 데이터를 발행하지 않습니다. 다른말로 Lazy하게 데이터를 발행한다고 표현합니다.

일반적으로 One Time 질의(DB 조회, API 호출, File IO 등)에 사용하고, 원하는 시점에 요청하여 결과를 받아오는 작업에 쓰입니다.

 

* 지금까지 예시로 사용했던 Single, Maybe 등이 Cold Observable에 속합니다.


Hot Observable

Cold와 반대로 Observable를 생성하면 Observer 유무의 상관없이 데이터를 발행합니다. 중간에 Observer가 등록되어 subscribe를 시작하면 이전에 발행됐던 데이터를 받는 것을 보장할 수 없으며, 이어서 발행되는 데이터를 수신하게 됩니다.

일반적으로 사용자 입력, 이벤트, 센서 데이터 등에 사용합니다.

ConnectableObservable

Cold Observable에서 publish()를 호출하게 되면 ConnectableObservable로 변환되며 HotObservable처럼 작동합니다. 데이터를 발행하기 위해서는 connect()를 호출하면 됩니다.

 

1. observer1를 만들고 3초 뒤에 observer2를 만듭니다.

- ConnectableObservable가 connect()를 호출하지 않았기 때문에 데이터를 발행하지 않습니다.

- connect()를 호출하면 데이터 발행을 시작하고 observer1, observer2는 동시에 데이터를 수신하게 됩니다.

fun connect1() {
    val observable = Observable.interval(1, TimeUnit.SECONDS)
        .publish()

    observable.subscribe {
        /*
        처음에는 observable.connect() 가 호출되지 않았기 때문에 데이터가 발행되지 않습니다.
         */
        println("Observer 1 -> $it")
    }
    Thread.sleep(3000L)
    observable.subscribe {
        println("Observer 2 -> $it")
    }

    /*
    observable.connect() 가 호출되면 데이터가 발행되기 시작합니다.
    observer1, observer2는 이후에 데이터를 수신하기 시작합니다.
     */
    observable.connect()

    Thread.sleep(3000L)
}
Observer 1 -> 0
Observer 2 -> 0
Observer 1 -> 1
Observer 2 -> 1
Observer 1 -> 2
Observer 2 -> 2

 

2. observer1를 만들고 connect()를 호출합니다. 그리고 3초 뒤 observer2를 만듭니다.

- observer1는 처음부터 데이터를 수신합니다.

- observer2는 중간부터 데이터를 수신하며, 이전 데이터는 수신하지 않습니다.

fun connect2() {
    val observable = Observable.interval(1, TimeUnit.SECONDS)
        .publish()

    observable.subscribe {
        println("Observer 1 -> $it")
    }
    observable.connect()

    Thread.sleep(3000L)
    observable.subscribe {
        /*
        중간부터 값을 수신하며 기존에 발행됐던 데이터는 수신하지 않습니다.
         */
        println("Observer 2 -> $it")
    }

    Thread.sleep(3000L)
}
Observer 1 -> 0
Observer 1 -> 1
Observer 1 -> 2
Observer 1 -> 3
Observer 2 -> 3
Observer 1 -> 4
Observer 2 -> 4
Observer 1 -> 5
Observer 2 -> 5

 

3. refCount

refCount는 Observer 수의 따라 데이터를 발행하게 됩니다. Observe가 생기면 데이터를 발행하고, Observe가 없으면 데이터 발행을 중지합니다. 이때 처음으로 Observe가 생길 때 상태가 초기화됩니다.

 

- observe1, observe2를 관찰하다 종료합니다.

- observe3을 관찰합니다.

    fun refCount() {
        val observable = Observable.interval(1, TimeUnit.SECONDS)
            .publish()
            .refCount()

        val disposable1 = observable.subscribe {
            println("Observe1 -> $it")
        } // 1, 2, 3 ...

        Thread.sleep(3000L)

        val disposable2 = observable.subscribe {
            println("Observe2 -> $it")
        } // 4, 5, 6 ...

        Thread.sleep(3000L)
        disposable1.dispose()
        disposable2.dispose()

        observable.subscribe {
            println("Observe3 -> $it")
        } // 1, 2, 3

        Thread.sleep(3000L)
    }
Observe1 -> 0
Observe1 -> 1
Observe1 -> 2
Observe1 -> 3
Observe2 -> 3
Observe1 -> 4
Observe2 -> 4
Observe1 -> 5
Observe2 -> 5
Observe3 -> 0
Observe3 -> 1
Observe3 -> 2