RxSwift шпаргалка по операторам (+ PDF)

4121ee0577d34944969a7512a4e92172.png

Заинтересовавшись темой функционального программирования я встал на распутье, — какой фреймворк выбрать для ознакомления. ReactiveCocoa — ветеран в iOS кругах, по нему вдоволь информации. Но он вырос с Objective-C, и хотя это не является проблемой, но все же в данный момент я в основном пишу именно на Swift, — хотелось бы взять решение изначально спроектированное с учетом всех плюшек языка. RxSwift же порт Reactive Extensions, имеющего долгую историю, но сам порт свежий и написанный именно под Swift. На нем я и решил остановиться.
Но специфика документации по RxSwift в том, что описание всех команд ведет на reactivex.io, а там в основном дается общая информация, руки у разработчиков не дошли еще сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, есть те, о которых в общей документации нет ничего кроме упоминания.
Прочитав все главы вики с RxSwift гитхаба, я сразу решил поразбираться с официальными примерами, тут то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка с копипастом гранатой. Я начал разбирать самые сложные для понимания команды, потом те, что вроде понятны, но задав себе вопросы по ним я понял, что лишь догадываюсь на то как верно ответить, но не уверен.
В общем ничтоже сумняшеся я решил проработать все операторы RxSwift. Лучший способ что то понять в программировании — запустить код и посмотреть как он отработает. Затем учитывая специфику реактивного программирования — очень полезны схемы, ну и краткое описание на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.
Много картинок и текста под катом, очень много!

Предварительно я рекомендую просмотреть официальную документацию, у меня передана основная суть и специфика RxSwift команд, а не основы.
Так же можно «поиграться» с шариками в схемах, так называемые RxMarbles, есть бесплатная версия под iPhone/iPad

Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, по каждой я дам краткое описание, схему (если это имеет смысл), код, результат выполнения, при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.

Вот ссылка на гитхаб, куда я склонировал официальный репозиторий RxSwift, и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
А вот и ссылка конкретно на PDF где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того чтобы увидеть как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — иметь под рукой документ в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.

Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.
Что ж, надеюсь моя работа кому то пригодится. Приступим.


Заметки

Создание Observable


asObservable
create
deferred
empty
error
interval
just
never
of
range
repeatElement
timer

Комбинирование Observable


amb
combineLatest
concat
merge
startWith
switchLatest
withLatestFrom
zip

Фильтрация


distinctUntilChanged
elementAt
filter
ignoreElements
sample
single
skip
skip (duration)
skipUntil
skipWhile
skipWhileWithIndex
take
take (duration)
takeLast
takeUntil
takeWhile
takeWhileWithIndex
throttle

Трансформация


buffer
flatMap
flatMapFirst
flatMapLatest
flatMapWithIndex
map
mapWithIndex
window

Операторы математические и агрегирования


reduce
scan
toArray

Работа с ошибками


catchError
catchErrorJustReturn
retry
retryWhen

Операторы для работы с Connectable Observable


multicast
publish
refCount
reply
replayAll
debug
doOn / doOnNext
delaySubscription
observeOn
subscribe
subscribeOn
timeout
using

В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.

В качестве вспомогательного кода я буду пользоваться функцией createSequenceWithWait, она создает Observable из массива элементов с указанным интервалом между элементами.

public enum ResultType {
    case Infinite
    case Completed
    case Error
}

public func createSequenceWithWait(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable {
    return Observable.create{ observer  in
        for (idx, letter) in array.enumerate() {
            let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(idx) * Int64(NSEC_PER_SEC))
            dispatch_after(time, dispatch_get_main_queue()) {
                if let describer = describer {
                    let value = describer(value: letter)
                    observer.on(.Next(value))
                } else {
                    observer.on(.Next(letter as! U))
                }
                
            }
        }
        
        if resultType != .Infinite {
            let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(array.count) * Int64(NSEC_PER_SEC))
            dispatch_after(allTime, dispatch_get_main_queue()) {
                switch resultType {
                case .Completed:
                    observer.onCompleted()
                case .Error:
                    observer.onError(RxError.Unknown)
                default:
                    break
                }
                
            }
        }
        
        return NopDisposable.instance
    }
}


Функция example — просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift)

public func example(description: String, action: () -> ()) {
    print("\n--- \(description) example ---")
    action()
}


Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице — необходимо прописать

import XCPlayground
XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

Так же подразумевается, что читатель имеет общее представление о том, что такое реактивное программирование в общем и о RxSwift в частности. Не знаю есть ли смысл городить очередную вводную.



asObservable


Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver

example("as Observable") {
    let variable = Variable(0)
    
    variable.asObservable().subscribe { e in
        print(e)
    }
    variable.value = 1
}

Консоль:

--- as Observable example ---
Next(0)
Next(1)
Completed

В данном примере мы Variable преобразовали в Observable и подписались на его события


create

Этот метод позволяет создавать Observable с нуля, полностью контролируя какие элементы и когда он будет генерировать.

example("create") {
    let firstSequence = Observable.of(1, 2, 3)
    let secondSequence = Observable.of("A", "B", "C")
    
    let multipleSequence = Observable>.create { observer in
        observer.on(.Next(firstSequence))
        observer.on(.Next(secondSequence))
        return NopDisposable.instance
    }
    let concatSequence = multipleSequence.concat()
    concatSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- create example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)

В данном примере мы создали Observable вручную, он сгенерирует два Observable, элементы которых мы затем объединим командой concat, на получившийся Observable мы и подпишемся


deferred


Этот оператор позволяет отложить создание Observable, до момента подписки с помощью subscribe

example("without deferred") {
    var i = 1
    let justObservable = Observable.just(i)
    i = 2
    _ = justObservable.subscribeNext{ print ("i = \($0)") }
}

example("with deferred") {
    var i = 1
    let deferredJustObservable = Observable.deferred{
        Observable.just(i)
    }
    i = 2
    _ = deferredJustObservable.subscribeNext{ print ("i = \($0)") }
}

Консоль:

--- without deferred example ---
i = 1

--- with deferred example ---
i = 2

В первом случае Observable создается сразу, с помощью Observable.just (i), и изменение значение i уже не влияет на генерируемый элемент это последовательностью. Во втором же случае мы создаем с помощью deferred, и мы можем поменять значение i перед subscribe


empty


Пустая последовательность, заканчивающаяся Completed

bab24735e0de416792a959cdcd519180.png

example("empty") {
    let sequence = Observable.empty()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- empty example ---
Completed


error


Создаст последовательность, которая состоит из одного события — Error

b3439cb513de4be9996f48bebfb6aaf3.png

example("error") {
    let sequence = Observable
        .error(RxError.Unknown)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- error example ---
Error(Unknown error occured.)


interval


Создает бесконечную последовательность, возрастающую с 0 с шагом 1, с указанной периодичностью

011ba04fda0a478c9e0a225fb36748ec.png

example("interval") {
    let sequence = Observable.interval(1, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- interval example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
....


just


Создает последовательность из любого значения, которая завершается Completed

0e8fac44dcb44bbc86a8a0ed452aec7b.png

example("just") {
    let sequence = Observable.just(100)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- just example ---
Next(100)
Completed


never


Пустая последовательность, чьи observer«ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие

503a867579524903afe5594b3c1fc6dd.png

example("never") {
    let sequence = Observable.never()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- never example ---


of


Последовательность из variadic переменной, после всех элементов генерируется Completed

96235583734c4667a247f7279f62609c.png

example("simple of") {
    let sequence = Observable.of(1, 2)

    sequence.subscribe { e in
        print(e)
    }
}

example("of for Observables") {
    let firstSequence = Observable.of(1, 2, 3)
    let secondSequence = Observable.of("A", "B", "C")
    
    let bothSequence = Observable.of(firstSequence, secondSequence)
    let mergedSequence = bothSequence.merge()
    
    mergedSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- simple of example ---
Next(1)
Next(2)
Completed

--- of for Observables example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed

В первом случае мы создали последовательность из двух чисел. Во втором из двух Observable, а затем их объединили между собой с помощью оператора merge


range


Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed

b8c645c248954839b6ba4e6c86a56979.png

example("range") {
    let sequence = Observable.range(start: 5, count: 3)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- range example ---
Next(5)
Next(6)
Next(7)
Completed


Сгенерировались элементы начиная с 5, 3 раза с шагом 1

repeatElement


Бесконечно создавать указанный элемент, без задержек. Никогда не будет сгенерированы события Completed или Error

0e8bde18055743ad87475af402ab60f9.png

example("repeatElement") {
    let sequence = Observable.repeatElement(1, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- repeatElement example ---
Next(1)
Next(2)
Next(3)
.....


timer


Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможность задержки при старте. Никогда не будет сгенерированы события Completed или Error

f8e230c64e5a4f2999f9fc10eb7b0815.png

example("timer") {
    let sequence = Observable.timer(2, period: 3, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- timer example ---
Next(0)
Next(1)
Next(2)

В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды


amb

SO = [Observable] или SO1, SO2 = Observable
RO = Observable

Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются

6e68723996bd488a830ffb4b6cf35ea2.png

example("amb") {
    let subjectA = PublishSubject()
    let subjectB = PublishSubject()
    let subjectC = PublishSubject()
    let subjectD = PublishSubject()
    
    let ambSequence = [subjectA, subjectB, subjectC, subjectD].amb()
    ambSequence.subscribe { e in
        print(e)
    }
    
    subjectC.onNext(0)
    subjectA.onNext(3)
    subjectB.onNext(102)
    subjectC.onNext(1)
    subjectD.onNext(45)
}

Консоль:

--- amb example ---
Next(0)
Next(1)

Т.к. первым сгенерировал элемент subjectC, — лишь его элементы дублируются в RO, остальные игнорируются


combineLatest

SO = SO1, SO2,... SON = Observable
RO = Observable 

Как только все Observable сгенерировали хотя бы по одному элементу — эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации любым Observable элемента — генерируется новый результат функции с последними элементами из всех комбинируемых Observable

d1e398ec43d44c08ab5670293a15d853.png

example("combineLatest") {
    let firstSequence = createSequenceWithWait([1,2,3], waitTime: 2) { element in
        "\(element)"
    }.debug("firstSequence")
    let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in
        "\(element)"
        }
        .delaySubscription(3, scheduler: MainScheduler.instance)
        .debug("secondSequence")
    
    let concatSequence = Observable.combineLatest(firstSequence, secondSequence) {
        first, second -> String in
        "\(first) - \(second)"
    }
    concatSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- combineLatest example ---
2016-04-12 16:59:35.421: firstSequence -> subscribed
2016-04-12 16:59:35.422: secondSequence -> subscribed
2016-04-12 16:59:35.434: firstSequence -> Event Next(1)
2016-04-12 16:59:37.423: firstSequence -> Event Next(2)
2016-04-12 16:59:38.423: secondSequence -> Event Next(A)
Next(2 - A)
2016-04-12 16:59:39.423: firstSequence -> Event Next(3)
Next(3 - A)
2016-04-12 16:59:39.522: secondSequence -> Event Next(B)
Next(3 - B)
2016-04-12 16:59:40.622: secondSequence -> Event Next(C)
Next(3 - C)
2016-04-12 16:59:41.722: firstSequence -> Event Completed
2016-04-12 16:59:41.722: firstSequence -> disposed
2016-04-12 16:59:41.722: secondSequence -> Event Completed
2016-04-12 16:59:41.722: secondSequence -> disposed
Completed

В этом примере я создал Observable с помощью createSequenceWithWait, чтобы элементы генерировались с разной задержкой, чтобы было видно как перемешиваются элементы.
firstSequence успел сгенерировать 1 и 2, прежде чем secondSequence сгенерировал A, поэтому 1 отбросили, и первым выводом стало 2 — A


concat

SO = Observable> или SO1, SO2 = Observable
RO = Observable

В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, — элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO

125e09b9c0df42f39a1792e0192e8b9a.png

example("concat object method") {
    let firstSequence = Observable.of(1, 2, 3)
    let secondSequence = Observable.of("A", "B", "C")
    let concatSequence = firstSequence.concat(secondSequence)
    concatSequence.subscribe { e in
        print(e)
    }
}

example("concat from array") {
    let firstSequence = Observable.of(1,2,3)
    let secondSequence = Observable.of(4,5,6)
    let concatSequence = Observable.of(firstSequence, secondSequence)
        .concat()
    
    concatSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- concat object method example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed

--- concat from array example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(5)
Next(6)
Completed

В первом примере мы присоединяем второй Observable к первому.
Во втором генерируем последовательность из массива.


merge

SO = Observable>
RO = Observable

Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable

8c24ab2ed59a41a6afd992e149d2c4fb.png

example("simple merge") {
    let firstSequence = Observable.of(1, 2, 3)
    let secondSequence = Observable.of("A", "B", "C")

    let bothSequence = Observable.of(firstSequence, secondSequence)
    let mergedSequence = bothSequence.merge()
    
    mergedSequence.subscribe { e in
        print(e)
    }
}

example("merge with wait") {
    let firstSequence = createSequenceWithWait([1,2,3]) { element in
        "\(element)"
    }
    let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in
        "\(element)"
    }

    let bothSequence = Observable.of(firstSequence, secondSequence)
    let mergedSequence = bothSequence.merge()
    
    mergedSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- simple merge example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed

--- merge with wait example ---
Next(1)
Next(A)
Next(2)
Next(3)
Next(B)
Next(C)
Completed

В первом примере мы сливаем две последовательности созданные без задержки, в итоге первый Observable успевает сгенерировать все свои элементы перед тем как это начнет делать второй, результат оказывается идентичен concat
Во втором же случае последовательности сделаны с задержкой в генерации, и видно что элементы в RO теперь вперемешку, в том порядке в котором они были сгенерированы в исходных Observable


startWith

SO = Observable
RO = Observable

В начало SO добавляются элементы переданные в качестве аргумента

0c23f3479a1b453788863898b10126ca.png

example("startWith") {
    let sequence = Observable.of(1, 2, 3).startWith(0)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- startWith example ---
Next(0)
Next(1)
Next(2)
Next(3)
Completed


switchLatest

SO = Observable>
RO = Observable 

Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable

822951742ec64c7396cf8fb92151cbba.png

example("switchLatest") {
    let varA = Variable(0)
    let varB = Variable(100)
    
    let proxyVar = Variable(varA.asObservable())
    let concatSequence = proxyVar.asObservable().switchLatest()
    
    concatSequence.subscribe { e in
        print(e)
    }
    varA.value = 1
    varA.value = 2
    varB.value = 3
    proxyVar.value = varB.asObservable()
    varB.value = 4
    varA.value = 5
}


example("switchLatest") {
    let observableA = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(10))
        }
        delay(3) {
            observer.on(.Next(20))
        }
        delay(5) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("oA")
    let observableB = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(100))
        }
        delay(1) {
            observer.on(.Next(200))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("oB")
    
    let observableC = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(1000))
        }
        delay(1) {
            observer.on(.Next(2000))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("oC")
    
        let subjects = [observableA, observableB, observableC]
        let sequence:Observable> = createSequenceWithWait([observableA, observableB, observableC],waitTime:1) {$0}
        let switchLatestSequence:Observable = sequence.switchLatest()
        switchLatestSequence.subscribe { e in
            print(e)
        }
}

Консоль:


--- switchLatest example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
Completed

--- switchLatest example ---
2016-04-12 17:15:22.710: oA -> subscribed
2016-04-12 17:15:22.711: oA -> Event Next(10)
Next(10)
2016-04-12 17:15:23.797: oA -> disposed // происходит отписка как только сгенерировался oB
2016-04-12 17:15:23.797: oB -> subscribed
2016-04-12 17:15:23.797: oB -> Event Next(100)
Next(100)
2016-04-12 17:15:24.703: oB -> disposed // происходит отписка как только сгенерировался oC
2016-04-12 17:15:24.703: oC -> subscribed 
2016-04-12 17:15:24.703: oC -> Event Next(1000)
Next(1000)
2016-04-12 17:15:25.800: oC -> Event Next(2000)
Next(2000)
2016-04-12 17:15:26.703: oC -> Event Completed
2016-04-12 17:15:26.703: oC -> disposed
Completed

В первом примере показано как команда работает в статике, когда мы руками переподключаем Observable.
Во втором же у нас последовательности с задержками. observableA, observableB, observableC из SO генерируются раз в 1 секунду. Их же элементы генерируются с различными задержками.


withLatestFrom

SO1, SO2 = Observable
RO = Observable

Как только O1 генерирует элемент проверяется сгенерирован ли хоть один элемент в O2, если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента

cb8c5be5a30442e7b16e3eac78651a02.png

example("withLatestFrom") {
    let varA = Variable(0)
    let varB = Variable(10)
    
    let withLatestFromSequence = varA.asObservable().withLatestFrom(varB.asObservable()) {
        "\($0) - \($1)"
    }
    withLatestFromSequence.subscribe { e in
        print(e)
    }
    varA.value = 1
    varA.value = 2 
    varB.value = 20
    
    varB.value = 30
    varA.value = 5
    varA.value = 6
}

Консоль:

--- withLatestFrom example ---
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
Next(5 - 30)
Next(6 - 30)
Completed


zip

SO = Observable>
RO = Observable 

Элементы RO представляют собой комбинацию из элементов сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента

b9fa76866cf64559bb7d1bb024adb35c.png

example("zip with simple Variable") {
    let varA = Variable(0)
    let varB = Variable(10)
    
    let zippedSequence = Observable.zip(varA.asObservable(), varB.asObservable()) { "\($0) - \($1)"
    }
    
    zippedSequence.subscribe { e in
        print(e)
    }
    varA.value = 1
    varA.value = 2
    varB.value = 20
    
    varB.value = 30
    varA.value = 3
    varA.value = 4
}

example("zip with PublishSubject") {
    let subjectA = PublishSubject()
    let subjectB = PublishSubject()

    let zippedSequence = Observable.zip(subjectA, subjectB) { "\($0) - \($1)"
    }
    
    zippedSequence.subscribe { e in
        print(e)
    }
    subjectA.onNext(0)
    subjectA.onNext(1)
    subjectA.onNext(2)
    subjectB.onNext(100)
    subjectB.onNext(101)
    subjectA.onNext(3)
    subjectB.onNext(102)
    subjectA.onNext(4)
}

Консоль:

--- zip with simple Variable example ---
Next(0 - 10)
Next(1 - 20)
Next(2 - 30)
Completed

--- zip with PublishSubject example ---
Next(0 - 100)
Next(1 - 101)
Next(2 - 102)

Из примеров видно, что элементы комбинируются в том порядке, в каком они были сгенерированы в исходных Observable


distinctUntilChanged


Пропускаем все повторяющиеся подряд идущие элементы

3838bfaba1b345069682fd5561e8f24e.png

example("distinctUntilChanged") {
    let sequence = Observable.of(1, 2, 2, 3, 4, 4, 4, 1).distinctUntilChanged()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- distinctUntilChanged example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(1)
Completed

Здесь тонкий момент, что отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.


elementAt

В RO попадает лишь элемент выпущенный N по счету

2058c5dfd4c44783846cb3142afb3e3e.png

example("elementAt") {
    let sequence = Observable.of(0, 10, 20, 30, 40)
        .elementAt(2)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- elementAt example ---
Next(20)
Completed


filter

Отбрасываются все элементы, которые не удовлетворяют заданным условиям

ff1a054945cb42ab9093f7b68b3645e8.png

example("filter") {
    let sequence = Observable.of(1, 20, 3, 40)
        .filter{ $0 > 10}
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- filter example ---
Next(20)
Next(40)
Completed


ignoreElements

Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error

0870adbdffa44870a1213f08d7b29b8b.png

example("ignoreElements") {
    let sequence = Observable.of(1, 2, 3, 4)
        .ignoreElements()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- ignoreElements example ---
Completed


sample

При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее

f692683e77ca44089e106d7ac704393d.png

example("sampler") {
    let sampler = Observable.interval(1, scheduler: MainScheduler.instance).debug("sampler")
    
    let sequence:Observable = createSequenceWithWait([1,2,3,4], waitTime: 3).sample(sampler)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- sampler example ---
2016-04-12 18:28:20.322: sampler -> subscribed
2016-04-12 18:28:21.323: sampler -> Event Next(0)
Next(1)
2016-04-12 18:28:22.324: sampler -> Event Next(1) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:23.323: sampler -> Event Next(2)
Next(2)
2016-04-12 18:28:24.323: sampler -> Event Next(3) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:25.323: sampler -> Event Next(4) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:26.323: sampler -> Event Next(5)
Next(3)
...


single

Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом

2126d02199a8426d9617a7e5a3c8ad96.png

example("single generate error") {
    let sequence = Observable.of(1, 2, 3, 4).single()
    sequence.subscribe { e in
        print(e)
    }
}

example("single") {
    let sequence = Observable.of(1, 2, 3, 5).single {
        $0 % 2 == 0
    }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- single generate error example ---
Next(1)
Error(Sequence contains more than one element.)

--- single example ---
Next(2)
Completed

В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента
Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было


skip


Из SO отбрасываем первые N элементов

19470a3a89e941949aa7a5c9a8a87e7a.png

example("skip") {
    let sequence = Observable.of(1, 2, 3, 4).skip(2)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skip example ---
Next(3)
Next(4)
Completed


skip (duration)

Из SO отбрасываем первые элементы, которые были сгенерированы в первые N

82e042f1d6b04781beb1f172c5489f36.png

example("skip duration with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }.skip(2, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skip duration with wait example ---
Next(3)
Next(4)
Completed


skipUntil

Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью переданной в качестве параметра

e9fa880557014f69865c887368ae896f.png

example("skipUntil") {
    let firstSequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let secondSequence = Observable.just(1)
        .delaySubscription(1, scheduler: MainScheduler.instance)
    let skippedSequence = firstSequence.skipUntil(secondSequence)
    
    skippedSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skipUntil example ---
Next(3)
Next(4)
Completed

Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду


skipWhile

Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией

373863a24f6b4cb6b9cbf82f858f241f.png

example("skipWhile") {
    let firstSequence = [1,2,3,4,0].toObservable()
    let skipSequence = firstSequence.skipWhile { $0 < 3 }
    
    skipSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skipWhile example ---
Next(3)
Next(4)
Next(0)
Completed


skipWhileWithIndex


Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией. Отличие от skipWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента

a7d9690cd41d4ab59a77e620e80740d7.png

example("skipWhileWithIndex") {
    let firstSequence = [1,2,5,0,7].toObservable()
    let skipSequence = firstSequence.skipWhileWithIndex{ value, idx in
        value < 4 || idx < 2
    }
    skipSequence.subscribe { e in
        print(e)
    }
}


Консоль:

--- skipWhileWithIndex example ---
Next(5)
Next(0)
Next(7)
Completed


take


Из SO берутся лишь первые N элементов

65c0b9b983894312a9f5b147d5bef422.png

example("take") {
    let sequence = Observable.of(1, 2, 3, 4).take(2)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- take example ---
Next(1)
Next(2)
Completed


take (duration)

Из SO берутся лишь элементы сгенерированные в первые N секунд

b799656eaf11412187d8dcf5697e616a.png

example("take duration with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let takeSequence = sequence.take(2, scheduler: MainScheduler.instance)
    takeSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- take duration with wait example ---
Next(1)
Next(2)
Completed


takeLast

Из SO берутся лишь последние N элементов. Что означает, если SO никогда не закончит генерировать элементы — в RO не попадет ни одного элемента.

4b197f2530e043b0bd5ee13dd43cc445.png

example("takeLast") {
    let sequence = Observable.of(1, 2, 3, 4).takeLast(2)
    sequence.subscribe { e in
        print(e)
    }
}

example("takeLast with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let takeSequence = sequence.takeLast(2)
    takeSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeLast example ---
Next(3)
Next(4)
Completed

--- takeLast with wait example ---
Next(3)
Next(4)
Completed


Второй пример приведен для иллюстрации в задержке генерации элементов в RO из за ожидания завершения генерации элементов в SO

takeUntil

Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью переданной в качестве параметра

f62c4c834e63461da88d8b208c037e14.png

example("takeUntil") {
    let stopSequence = Observable.just(1)
        .delaySubscription(2, scheduler: MainScheduler.instance)
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        .takeUntil(stopSequence)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeUntil example ---
Next(1)
Next(2)
Completed


takeWhile

Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией

2b6e59c4c0fb42c3a7f11da4cdf85a36.png

example("takeWhile") {
    let sequence = [1,2,3,4].toObservable().takeWhile{ $0 < 3 }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeWhile example ---
Next(1)
Next(2)
Completed


takeWhileWithIndex

Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией. Отличие от takeWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента

445d0b259f054d8bbe21218fba5a4f18.png

example("takeWhileWithIndex") {
    let sequence = [1,2,3,4,5,6].toObservable()
        .takeWhileWithIndex{ (val, idx) in
            val % 2 == 0 || idx < 3
    }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeWhileWithIndex example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed


throttle

Из SO берутся лишь элементы, после которых не было новых элементов N секунд.

da65c79c1e6a4630acb8eafd50a9495e.png

example("throttle") {
    let sequence = Observable.of(1, 2, 3, 4)
        .throttle(1, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

example("throttle with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        .throttle(0.5, scheduler: MainScheduler.instance)
    
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- throttle example ---
Next(4)
Completed

--- throttle with wait example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed

В первом случае SO генерирует элементы без задержек, поэтому лишь последний элемент не имеет после себя новых элементов.
Во втором примере элементы генерируются медленнее, чем N секунд переданные в throttle, поэтому за каждым генерируемым элементом достаточный временной промежуток.


buffer

SO = Observable<>>
RO = Observable<[T]>

Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передается count, — максимальное число элементов в массиве, и timeSpan время максимального ожидания наполнения текущего массива из элементов SO. Таким образом элемент RO, являет собой массив [T], длинной от 0 до count.

caa2896ac5104866a1c405c3e803aa84.png

example("buffer") {
    let varA = Variable(0)
    
    let bufferSequence = varA.asObservable()
        .buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
    bufferSequence.subscribe { e in
        print("\(NSDate()) - \(e)")
    }
    varA.value = 1
    varA.value = 2
    varA.value = 3
    delay(3) {
        varA.value = 4
        varA.value = 5
        delay(5) {
            varA.value = 6
        }
    }
}

Консоль:

--- buffer example ---
2016-04-12 16:10:58 +0000 - Next([0, 1, 2])
2016-04-12 16:11:01 +0000 - Next([3]) 
2016-04-12 16:11:04 +0000 - Next([4, 5])
2016-04-12 16:11:07 +0000 - Next([6])
2016-04-12 16:11:07 +0000 - Completed



Длина массива была указана как 3, — как только были сгенерированы 3 элемента — в RO сгенерировался элемент [0, 1, 2]
После генерации элемента 3, — была задержка в 3 секунды, сработал таймаут, и массив не был заполнен полностью.
То же касается и задержки после генерации элемента 5

flatMap

Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge)

6d75e2dce2bf42978ce604aae5c2217f.png

example("flatMap with wait") {
    let sequence:Observable = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
    let flatMapSequence:Observable = sequence.flatMap{val in
        createSequenceWithWait([10,11,12], waitTime: 2) { element in
            "\(element) - \(val)"
        }
    }
    flatMapSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- flatMap with wait example ---
Next(10 - 0)
Next(10 - 1)
Next(11 - 0)
Next(10 - 2)
Next(11 - 1)
Next(12 - 0)
Next(11 - 2)
Next(12 - 1)
Next(12 - 2)
Completed


flatMapFirst

Каждый элемент SO превращается в отдельный Observable.
1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы — все последующие Observable сгенерированные из SO отбрасываются, на них не подписываемся.
2) как только O1 оканчивается, — если будет сгенерирован новый Observable — на него подпишутся и его элементы будут дублироваться в RO.
Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable

d32de4c093da4668a1a6bb78ba4ef1c1.png

example("flatMapFirst") {
    let sequence:Observable = Observable.of(10, 20, 30)
        .debug("sequence")
    let flatMapSequence:Observable = sequence
        .flatMapFirst{val in
            Observable.of(0, 1, 2)
                .map{"\($0) - \(val)"
            }
    }
    flatMapSequence.subscribe { e in
        print(e)
    }
}


example("flatMapFirst with delay") {
    let subjectA = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(10))
        }
        delay(1) {
            observer.on(.Next(20))
        }
        delay(7) {
            observer.onCompleted()
        }
        return NopDisposable.instance
    }.debug("sA")
    let subjectB = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(100))
        }
        delay(1) {
            observer.on(.Next(200))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
    }.debug("sB")

    let subjectC = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(1000))
        }
        delay(1) {
            observer.on(.Next(2000))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
    }.debug("sC")

    let subjects = [subjectA, subjectB, subjectC]
    let sequence:Observable = createSequenceWithWait([0, 1, 2],waitTime:4){$0}
        .debug("sequence")
    let flatMapSequence:Observable = sequence.flatMapFirst{val in
        return subjects[val].asObservable()
        }.debug("flatMapSequence")
    flatMapSequence.subscribe { e in
        print(e)
    }
}

Консоль:


--- flatMapFirst example ---
2016-04-12 19:19:46.915: sequence -> subscribed
2016-04-12 19:19:46.916: sequence -> Event Next(10)
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
2016-04-12 19:19:46.918: sequence -> Event Next(20)
Next(0 - 20)
Next(1 - 20)
Next(2 - 20)
2016-04-12 19:19:46.919: sequence -> Event Next(30)
Next(0 - 30)
Next(1 - 30)
Next(2 - 30)
2016-04-12 19:19:46.921: sequence -> Event Completed
Completed
2016-04-12 19:19:46.921: sequence -> disposed

--- flatMapFirst with delay example ---
2016-04-12 19:19:46.925: flatMapSequence -> subscribed
2016-04-12 19:19:46.926: sequence -> subscribed
2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO генерирует 1й элемент
2016-04-12 19:19:46.935: sA -> subscribed // на его основе генерируется Observable sA, на  который мы подписываемся
2016-04-12 19:19:46.936: sA -> Event Next(10)
2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10)
Next(10)
2016-04-12 19:19:47.936: sA -> Event Next(20)
2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20)
Next(20)
2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO генерирует 2й элемент, но на этот момент sA еще генерирует элементы, поэтому подписки на sB не произошло, он просто отбрасывается
2016-04-12 19:19:53.935: sA -> Event Completed 
2016-04-12 19:19:53.936: sA -> disposed // sA закончил генерировать элементы, от него отписались
2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO генерирует 3й элемент
2016-04-12 19:19:55.137: sC -> subscribed // т.к. на этот момент нет активных Observable (от sA мы отписались, sB - мы отбросили) - мы можем подписаться на него
2016-04-12 19:19:55.137: sC -> Event Next(1000)
2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000)
Next(1000)
2016-04-12 19:19:56.236: sC -> Event Next(2000)
2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000)
Next(2000)
2016-04-12 19:19:57.335: sC -> Event Completed
2016-04-12 19:19:57.336: sC -> disposed
2016-04-12 19:19:58.926: sequence -> Event Completed
2016-04-12 19:19:58.926: flatMapSequence -> Event Completed
Completed
2016-04-12 19:19:58.926: sequence -> disposed

Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — это уже разрешено, поэтому в RO попадают элементы со всех Observable
А вот второй пример очень объемный, но позволят в подробностях наблюдать как происходят подписка/отписка и как это влияет на генерацию элементов


flatMapLatest

Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable

6648c2e110b14a6faa31ba7b9ecb72a7.png

example("flatMapLatest") {
    let sequence:Observable = Observable.of(10, 20, 30)
    let flatMapSequence = sequence.flatMapLatest{val in Observable.of(0, 1, 2)
        .map{"\($0) - \(val)"
        }
    }
    flatMapSequence.subscribe { e in
        print(e)
    }
}

example("flatMapLatest with delay") {
    let subjectA = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(10))
        }
        delay(3) {
            observer.on(.Next(20))
        }
        delay(5) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("sA")
    let subjectB = Observable.create{ observer in
        delay(0) {
            observer.on(.Next(100))
        }
        delay(1) {
            observer.on(.Next(200))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposabl
    
            

© Habrahabr.ru