Android/RxJava

Android RxJava - Backpressure

강태종 2022. 3. 2. 10:21

Observable

데이터 소비와 상관없이 데이터를 계속 발행합니다.

fun observableTest() {
    Observable.range(1, 10000)
        .doOnNext {
            println("emit : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            Thread.sleep(100L)
            println("comsume : $it")
        }

    Thread.sleep(100000)
}
emit : 1
emit : 2
emit : 3
...
emit : 9998
emit : 9999
emit : 10000
comsume : 1
comsume : 2
comsume : 3
...

 

Flowable

FlowableObservable과 다르게 Backpressure를 지원하며, 데이터 소비에 맞춰서 데이터를 발행합니다.

fun flowableTest() {
    Flowable.range(1, 10000)
        .doOnNext {
            println("emit : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            Thread.sleep(100L)
            println("comsume : $it")
        }

    Thread.sleep(100000)
}
emit : 1
emit : 2
emit : 3
...
emit : 126
emit : 127
emit : 128
comsume : 1
comsume : 2
comsume : 3
...
// 소비가 불균형하면 데이터 발행을 멈추고, 적당히 소비가 이뤄지면 다시 발행을 시작합니다.
comsume : 94
comsume : 95
comsume : 96
emit : 129
emit : 130
...

 

Observable는 데이터 소비와 상관없이 데이터 발행을 계속합니다. 이는 OOME의 위험이 있습니다. 반면 Flowable은 데이터 소비가 느려지면 데이터 발행을 잠시 멈추고, 적당히 소비가 진행되면 다시 발행을 시작합니다.


Backpressure

배압이란 데이터 발행과 소비가 불균형적으로 일어날 때 발생합니다. 데이터 발행이 매우 빠르고 데이터 소비가 매우 느린 경우, 데이터는 스트림에 계속 쌓이며 OOME가 발생합니다.

 

이러한 현상을 배압으로 제어할 수 있도록 RxJava는 제공합니다.

  • BackpressureStrategy.MISSING : 배압 전력을 구현하지 않습니다.
  • BackpressureStrategy.ERROR : 소비가 불균형할 때 MissingBackpressureException을 발생하여 처리합니다. (Observable의 기본값)
  • BackpressureStrategy.BUFFER : 데이터를 버퍼에 저장합니다. OOME가 발생할 수 있습니다. (Flowable의 기본값)
  • BackpressureStrategy.DROP : 소비가 불균형할 때 데이터를 버립니다.
  • BackpressureStrategy.LATEST : 소비가 불균형할 때 최신 데이터만 유지합니다.

onBackPressureBuffer

발행된 데이터를 버퍼에 저장하며, Overflow 발생 시 동작을 설정할 수 있습니다.

fun onBackpressureBuffer() {
    Flowable.range(1, 10000)
        .doOnNext {
            println("emit : $it")
        }.onBackpressureBuffer(BUFFER_SIZE) {
            println("OnOverFlow")
        }.observeOn(Schedulers.io())
        .subscribe {
            Thread.sleep(1000L)
            println("consume : $it")
        }
}
...
// 데이터 발행중 Overflow가 발생합니다
emit : 128
emit : 129
emit : 130
OnOverFlow
...

 

onBackPressureDrop()

Drop할 때 액션을 정할 수 있습니다.

fun onBackpressureDrop() {
    Flowable.range(1, 10000)
        .doOnNext {
            println("emit : $it")
        }
        .onBackpressureDrop {
            println("onDrop : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            Thread.sleep(100L)
            println("consume : $it")
        }

    Thread.sleep(2000L)
}
// 정상적으로 데이터를 발행합니다
emit : 1
emit : 2
emit : 3
emit : 4
emit : 5
...
// 데이터 소비가 불균형하여 버립니다.
emit : 129
onDrop : 129
emit : 130
onDrop : 130
emit : 131
onDrop : 131
emit : 132
onDrop : 132
emit : 133
onDrop : 133
...
emit : 9999
onDrop : 9999
emit : 10000
onDrop : 10000
consume : 1
consume : 2
consume : 3
...

 

onBackpressureLastest

기존의 발행한 데이터를 버립니다.

fun onBackpressureLatest() {
    Flowable.range(1, 10000)
        .onBackpressureLatest()
        .doOnNext {
            println("emit : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            Thread.sleep(10L)
            println("consume : $it")
        }

    Thread.sleep(100000)
}
// 정상적으로 데이터를 발행합니다.
emit : 1
emit : 2
emit : 3
...
emit : 125
emit : 126
emit : 127
emit : 128
...
// 소비가 시작하지만, 발행 속도를 따라가지 못하고 있습니다.
consume : 1
consume : 2
consume : 3
...
// 결국 이전에 발행한 데이터를 버리고 마지막 데이터(10000)를 발행합니다.
consume : 95
consume : 96
emit : 10000
consume : 97
consume : 98
...
// 129 ~ 9999 데이터가 버려졌습니다.
consume : 127
consume : 128
consume : 10000