글 작성자: HEROHJK

오늘은 Combine계열 함수들을 정리해보려고 합니다.

포스트들을 보니, Filter, Mapping 계열 함수들을 공부만해놓고 포스팅하진 않았네요.. 이것들도 조만간 포스팅하겠습니다..

StartWith

시퀀스를 방출하기 전, 특정 요소를 방출하기 위한 함수입니다.

example(of: "startWith") {
     let numbers = Observable.of(2, 3, 4)

     let observable = numbers.startWith(1)
     observable.subscribe(onNext: {
         print($0)
     })

     /* Prints:
      1
      2
      3
      4
     */
 }

2,3,4만 방출하도록 하지만, startWith를 이용하면, 해당 요소를 먼저 방출한 후 방출합니다.

특이한 점은, Cold ObservableHot Observable로 바꿔주는 역할도 하게 됩니다.

그래서 이런식으로 코드가 작동됩니다.

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()

subject.startWith("초기값")
    .startWith("두번째 초기값")
    .startWith("연속된 첫번째 초기값", "연속된 두번째 초기값", "연속된 세번째 초기값")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)


/*
연속된 첫번째 초기값
연속된 두번째 초기값
연속된 세번째 초기값
두번째 초기값
초기값
*/

이렇게... 따로 subject에서 onNext를 한적이 없음에도 저렇게 요소를 보내주게 됩니다.

그리고 잘 보면, 호출순이 아닌 호출의 역순으로 값이 방출이 되는것을 확인할수 있었습니다.

concat

요소들을 방출 후 Complete가 난 후 특정 요소를 방출하는, startWith의 반대개념입니다.
기본적으로 Observable두개를 연결하여, 첫번째 시퀀스가 종료 된 후 두번째 시퀀스를 방출하는 함수입니다.
다만 요소가 아닌 시퀀스(Observable)를 연결해줘야 하네요.
그리고 두번째부터 시퀀스가 Hot Observable 이라고 해도, 앞의 시퀀스가 Complete되지 않으면 방출되지 않습니다.

import RxSwift

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()

// BehaviorSubject는 대표적인 Hot Observable
let subject2 = BehaviorSubject<String>(value: "써")


Observable.concat([subject, subject2])
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject.onNext("퍼")
subject.onNext("블")
subject.onNext("리")
subject.onNext("쒸")

// subject.onComplete()

subject2.onNext("브")
subject2.onNext("젝")
subject2.onNext("트")
subject2.onNext("!")

/*
퍼
블
리
쒸
*/

이런식으로 중간에 subject.onComplete()가 주석처리가 되어서, 퍼블리쒸 까지밖에 출력이 되지 않습니다.
주석을 풀어주면 퍼블리쒸써브젝트! 가 방출이 되겠죠? 중간에 BehaviorSubject의 초기값이니깐요.

다만 아래처럼 코드가 작성되면 어떻게 될까요?

import RxSwift

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()
let subject2 = BehaviorSubject<String>(value: "써")

Observable.concat([subject, subject2])
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject.onNext("퍼")
subject.onNext("블")
subject.onNext("리")
subject.onNext("쒸")

subject2.onNext("브")
subject.onCompleted()
subject2.onNext("젝")
subject2.onNext("트")
subject2.onNext("!")

subject2의 초기값인 로 대체되지만, 대체되는 시점이전에는 방출될수가 없기때문에 출력이 되지 않습니다.

따라서 퍼블리쒸브젝트! 라고 출력이 되겠습니다.

concatMap

이건 flatMap과 비슷한 함수입니다.

똑같이 파이프를 합쳐주죠.

하지만 flatMap의 경우 시퀀스에서 이벤트를 방출하는 순서를 보장하지 않지만, concatMap같은 경우 이벤트를 방출하는 순서가 보장이 됩니다. 시퀀스에서 보장이 되는데요. 코드를 보시면 이해가 편합니다.

import RxSwift

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<PublishSubject<String>>()

subject3.concatMap { $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject3.onNext(subject)
subject3.onNext(subject2)

subject.onNext("퍼")
subject.onNext("블")
subject.onNext("리")
subject.onNext("쒸")
// subject.onNext(" ")

subject2.onNext("써") // subject의 complete가 나지 않았기 때문에,
subject2.onNext("브") // subject2의 방출항목은 무시된다.
subject2.onNext("젝")
subject2.onNext("트")
subject2.onNext("!")
/*
퍼
블
리
쒸
*/

위 함수처럼 subject에서 complete가 되지 않았기 때문에, subject2는 무시됩니다.

merge

결합계열 함수중에 가장 디폴트적인 함수입니다.

그냥 요소 혹은 시퀀스들을 합쳐주는 함수입니다.

Static 함수라는 특징이 있습니다.

RxMarble에는 나와있지 않지만, Error의 경우 시퀀스가 중단되지만 onComplete의 경우 연결되어있는 모든 시퀀스가 중단되는게 아닌 이상 중단되지 않습니다.

import RxSwift

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()
let subject2 = PublishSubject<String>()
// 어떻게 써도 상관 없지만, merge는 Static 함수이다.
// 따라서 subject3.merge(~~~~ 같은 코드는 불가능하다.

//PublishSubject<String>.merge(subject, subject2)
//    .subscribe(onNext: { print($0) })
//    .disposed(by: disposeBag)

Observable.of(subject, subject2).merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject.onNext("퍼")
subject.onNext("블")
subject.onNext("리")
subject.onNext("쒸")
subject.onNext("")

subject2.onNext("써")
subject2.onNext("브")
subject2.onNext("젝")
subject2.onNext("트")
subject2.onNext("!")

/*
퍼
블
리
쒸

써
브
젝
트
!
*/

combineLatest

N개의 시퀀스를 합쳐서, 어느 시퀀스에서든 최신 요소가 방출이 되면, 그것과 함께 나머지 시퀀스들에게서 최근 요소들을 방출하는 함수입니다.

이것 또한 Static 함수입니다

import RxSwift

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()
let subject2 = PublishSubject<String>()

// merge와 마찬가지로 Static 함수이다.
PublishSubject<String>
    .combineLatest(subject, subject2, 
                   resultSelector: { "\($0) \($1)" })
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//Observable.combineLatest(subject, subject2,
//                         resultSelector: { "\($0) \($1)" })
//    .subscribe(onNext: { print($0) })
//    .disposed(by: disposeBag)

subject.onNext("퍼")
subject2.onNext("블")
subject.onNext("리")
subject2.onNext("쒸")
subject.onNext("써")
subject2.onNext("브")
subject.onNext("젝")
subject2.onNext("트")

/*
퍼 블
리 블
리 쒸
써 쒸
써 브
젝 브
젝 트
*/

참고로 제가 예제로 작성한 resultSelector 같은 경우, 어떻게 매핑을 해서 내보낼지 정하는 파라미터입니다.

만약 resultSelector를 안붙인다면, 저 두가지 인수에 대한 처리는 subscribe에서 하게 됩니다.

회원가입 필수항목, 로그인화면 같은곳에서 여러가지의 Flag들을 일괄적으로 체크할때 사용할만 합니다.

zip

N개의 시퀀스들을 통으로 묶어주는 함수입니다.

combineLatest가 하나의 시퀀스에서 최신값들을 갱신할 때, 다른 시퀀스들은 기존 최근 값들을 가져다가 썼다면,

zip같은 경우 무조건 다같이 최신의 값들이 들어올 때에만 이벤트를 방출합니다.

일종의 시퀀스 동기적 함수라고 보면 됩니다.

import RxSwift

let disposeBag = DisposeBag()

let subject = PublishSubject<String>()
let subject2 = PublishSubject<String>()

PublishSubject<String>
    .zip(subject, subject2, resultSelector: { "\($0) \($1)" })
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject.onNext("퍼")
subject2.onNext("블")

subject.onNext("리")
subject2.onNext("쒸")

subject.onNext("써")
subject2.onNext("브")

subject.onNext("젝")
subject2.onNext("트")

subject2.onNext("!") // 얘는 subject에서 무언가를 방출해주지 않으면 절대 볼수 없다.

/*
퍼 블
리 쒸
써 브
젝 트
*/

withLatestFrom

A, B, C세가지 시퀀스가 있다고 가정해봅시다.

A는 버튼, B는 데이터, C가 그것들을 관리하는 파이프 시퀀스라고 할 때,

C에서 버튼 시퀀스인 A를 호출할 때 마다, 데이터 시퀀스인 B의 마지막 요소가 방출되는 일종의 트리거 함수입니다.

import RxSwift

let disposeBag = DisposeBag()

// 순수히 호출(버튼?)의 역할만 하기 때문에 Void, 뭐 딴걸 쓰려면 써도 괜찮다.
let subject = PublishSubject<Void>()
// 방출될 데이터를 가지고 있는 시퀀스
let subject2 = PublishSubject<String>()

// 두개를 엮어 처리할 파이프 시퀀스
let subject3 = subject.withLatestFrom(subject2)

subject3
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject2.onNext("펑")

subject.onNext(())

subject2.onNext("퉁")

subject.onNext(())

subject2.onNext("탕..!")

subject.onNext(())
subject.onNext(())
subject.onNext(())
subject.onNext(())

/*
펑
퉁
탕..!
탕..!
탕..!
탕..!
*/

sample

withLatestFrom과 유사하지만, 중복된 항목의 경우 방출하지 않습니다.

import RxSwift

let disposeBag = DisposeBag()

// 순수히 호출(버튼?)의 역할만 하기 때문에 Void, 뭐 딴걸 쓰려면 써도 괜찮다.
let subject = PublishSubject<Void>()
// 방출될 데이터를 가지고 있는 시퀀스
let subject2 = PublishSubject<String>()

// 두개를 엮어 처리할 파이프 시퀀스
// 이부분이 중요. 트리거를 인자로 넣어준다.
let subject3 = subject2.sample(subject)

subject3
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject2.onNext("펑")
subject.onNext(())
subject2.onNext("퉁")
subject.onNext(())
subject2.onNext("탕..!")
subject.onNext(())
subject.onNext(())
subject.onNext(())
subject.onNext(())

/*
펑
퉁
탕..!
*/

다만 예제코드에서 조금 다른부분이 한가지 더 있는데요, 파이프 시퀀스의 주객이 트리거버튼 시퀀스 -> 데이터시퀀스로 변경되는 점이 있습니다.

amb

두가지 시퀀스를 등록해놓고, 둘중 먼저 오는 시퀀스를 선택하여 방출하는 함수입니다.

선택받지 못한 시퀀스는 영영 방출되지 않습니다.

import RxSwift

let disposeBag = DisposeBag()

let rabbit = PublishSubject<String>()
let turtle = PublishSubject<String>()

let pipe = rabbit.amb(turtle)

pipe.subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

rabbit.onNext("껑충껑충")
turtle.onNext("엉금엉금")
turtle.onNext("엉금엉금")
turtle.onNext("엉금엉금")
turtle.onNext("엉금엉금")
turtle.onNext("엉금엉금")
turtle.onCompleted()
rabbit.onNext("껑충껑충")
rabbit.onNext("껑충껑충")
rabbit.onCompleted()

/*
껑충껑충
껑충껑충
껑충껑충
*/

switchOnNext

파이프 시퀀스, 스위치 시퀀스, 데이터 시퀀스 N개이상 총 4개 이상의 시퀀스가 들어갑니다.

스위치 시퀀스에서 선택한 시퀀스가 파이프시퀀스에 방출되는 개념입니다.

import RxSwift

let disposeBag = DisposeBag()

let trainLine1 = PublishSubject<String>()
let trainLine2 = PublishSubject<String>()

let lineSwitch = PublishSubject<Observable<String>>()

let pipe = lineSwitch.switchLatest()

pipe
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

lineSwitch.onNext(trainLine1)
trainLine1.onNext("칙칙폭폭")
trainLine1.onNext("칙칙폭폭")
trainLine1.onNext("칙칙폭폭")

lineSwitch.onNext(trainLine2)
trainLine1.onNext("칙칙폭폭")
trainLine2.onNext("우우우웅")
trainLine2.onNext("우우우웅")
trainLine2.onNext("우우우웅")

lineSwitch.onNext(trainLine1)
trainLine2.onNext("우우우웅")
trainLine1.onNext("칙칙폭폭")
trainLine1.onNext("칙칙폭폭")
trainLine1.onNext("칙칙폭폭")

/*
칙칙폭폭
칙칙폭폭
칙칙폭폭

우우우웅
우우우웅
우우우웅

칙칙폭폭
칙칙폭폭
칙칙폭폭
*/

reduce, scan

방출된 요소의 값들을 계속해서 연산해나가는 함수입니다.

더하거나, 빼거나, 나누거나, 곱하거나.. 혹은 Mapping으로 계산식을 만들어서 이용할 수 잇습니다.

Complete가 나올때 최종값만이 방출되는 함수는 reduce.

Complete에 상관없이 매번 계산된 값이 방출되는 함수는 scan입니다.

import RxSwift

let disposeBag = DisposeBag()

let scanSubject = PublishSubject<Int>()

let observable = scanSubject.scan(0, accumulator: +)

observable.subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

reduceSubject.onNext(1)
reduceSubject.onNext(1)
reduceSubject.onNext(1)
reduceSubject.onNext(1)
reduceSubject.onNext(1)

/*
1
2
3
4
5
*/
반응형