Android/RxJava
Android RxJava - Subject
강태종
2022. 3. 8. 01:10
Subject(링크)
Subject는 데이터 발행과 구독을 한번에 할 수 있는 클래스입니다. onNext, onError, onComplete 함수로 데이터를 발행할 수 있으며, subscribe 함수로 데이터 구독을 할 수 있습니다.
fun subject() {
val subject = PublishSubject.create<Int>()
subject.subscribe {
println(it)
}
subject.onNext(1)
subject.onError(Exception())
subject.onComplete()
}
AsyncSubject
Subject에서 마지막으로 발행한 데이터만 받아옵니다. 완료되기 전 발행한 데이터는 무시합니다. 만약 오류가 발생하면 아무 값도 발행하지 않고 오류를 보냅니다.
fun asyncSubject() {
val subject = AsyncSubject.create<Int>()
subject.subscribe {
println("Observer1 : $it")
}
subject.onNext(1)
subject.onNext(2)
subject.subscribe {
println("Observer2 : $it")
}
subject.onNext(3)
subject.onComplete()
}
Observer1 : 3
Observer2 : 3
BehaviorSubject
Observer가 구독을 시작하면 가장 최근의 값을 받아온다. 만약 오류가 발생한다면 아무 값도 발행하지 않고 오류를 보냅니다.
fun behaviorSubject() {
val subject = BehaviorSubject.create<Int>()
subject.subscribe {
println("Observer1 : $it")
}
subject.onNext(1)
subject.onNext(2)
subject.subscribe {
println("Observer2 : $it")
}
subject.onNext(3)
subject.onComplete()
}
PublishSubject
가장 기본 형태의 Subject입니다. 구독을 시작하고 그 이후에 발행되는 값만 받아옵니다. 주의할 점은 Hot Observable로 작동할 때 구독전에 발행한 값을 저장하지 않으므로 잃어버립니다.
fun publishSubject() {
val subject = PublishSubject.create<Int>()
subject.subscribe {
println("Observer1 : $it")
}
subject.onNext(1)
subject.onNext(2)
subject.subscribe {
println("Observer2 : $it")
}
subject.onNext(3)
subject.onComplete()
}
Observer1 : 1
Observer1 : 2
Observer1 : 3
Observer2 : 3
ReplaySubject
Observer가 구독을 시작하면 발행됐던 모든 값을 수신합니다. Publish와 다르게 발행한 값을 잃어버릴 위험이 적습니다. 또한 OOME를 피하기 위해 적절한 Backpressure 전략이 필요합니다.
Cold, Hot, Subject 차이
Observable vs Subject
Subject는 데이터 발행과 구독을 할 수 있는 클래스입니다. Subject는 Observable과 다르게 상태를 저장합니다. 발행된 데이터를 저장하고 Observer를 관리합니다.
// Subject는 Observable을 상속받고, Observe를 구현하기 때문에 두 기능을 수행할 수 있다.
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
Hot Observable vs Cold Observable
실제 Hot, Cold을 나누는 Class는 존재하지 않습니다. 의미상 분류입니다.
Hot Observable
- 옵저버에 상관없이 데이터를 발행한다.
- 멀티 캐스트 방식이다. (하나의 스트림을 사용하고, 이벤트가 발생했을 때 스트림을 구독하는 모든 옵저버에 이벤트를 보낸다.)
- 옵저버는 구독한 시점부터 이벤트를 받는다. (HotObservable의 성격마다 가장 최근 데이터부터 데이터를 받거나 처음부터 발행된 모든 데이터를 받기도 한다.)
Cold Observable
- 구독을 했을 때 데이터 발행을 시작한다.
- 유니 캐스트 방식이다. (Observable과 Observer가 1:1로 대응하는 스트림이 있다.)
- 데이터를 처음부터 발행하여 모든 이벤트를 받는다.
@Test
fun coldObservableStream() {
val observable = Observable.create<Int> {
val value = Random.nextInt(100)
println("Create : $value")
it.onNext(value)
}
observable.subscribe {
println("Observer1 : $it")
}
observable.subscribe {
println("Observer2 : $it")
}
}
Create : 83
Observer1 : 83
Create : 98
Observer2 : 98
@Test
fun hotObservableStream() {
val observable = Observable.create<Int> {
val value = Random.nextInt(100)
println("Create : $value")
it.onNext(value)
}.publish()
observable.subscribe {
println("Observer1 : $it")
}
observable.subscribe {
println("Observer2 : $it")
}
observable.connect()
}
Create : 97
Observer1 : 97
Observer2 : 97