//
//  WithLatestFrom.swift
//  CombineExt
//
//  Created by Shai Mishali on 29/08/2019.
//  Copyright © 2020 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine

// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
    /// Merges two publishers into a single publisher by combining each value
    /// from self with the latest value from the second publisher, if any.
    ///
    /// - parameter other: A second publisher source.
    /// - parameter resultSelector: Function to invoke for each value from the self combined
    ///                             with the latest value from the second source, if any.
    ///
    /// - returns: A publisher containing the result of combining each value of the self
    ///            with the latest value from the second publisher, if any, using the
    ///            specified result selector function.
    func withLatestFrom<Other: Publisher, Result>(
        _ other: Other,
        resultSelector: @escaping (Output, Other.Output) -> Result
    ) -> Publishers.WithLatestFrom<Self, Other, Result> {
        return .init(upstream: self, second: other, resultSelector: resultSelector)
    }

    /// Merges three publishers into a single publisher by combining each value
    /// from self with the latest value from the second and third publisher, if any.
    ///
    /// - parameter other: A second publisher source.
    /// - parameter other1: A third publisher source.
    /// - parameter resultSelector: Function to invoke for each value from the self combined
    ///                             with the latest value from the second and third source, if any.
    ///
    /// - returns: A publisher containing the result of combining each value of the self
    ///            with the latest value from the second and third publisher, if any, using the
    ///            specified result selector function.
    func withLatestFrom<Other: Publisher, Other1: Publisher, Result>(
        _ other: Other,
        _ other1: Other1,
        resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result
    ) -> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Failure>, Result>
    where Other.Failure == Failure, Other1.Failure == Failure {
        // The secondary sources are folded into a single tuple-emitting publisher,
        // so the two-publisher implementation can be reused as-is.
        let combined = other.combineLatest(other1)
            .eraseToAnyPublisher()

        return .init(upstream: self, second: combined, resultSelector: resultSelector)
    }

    /// Merges four publishers into a single publisher by combining each value
    /// from self with the latest value from the second, third and fourth publisher, if any.
    ///
    /// - parameter other: A second publisher source.
    /// - parameter other1: A third publisher source.
    /// - parameter other2: A fourth publisher source.
    /// - parameter resultSelector: Function to invoke for each value from the self combined
    ///                             with the latest value from the second, third and fourth source, if any.
    ///
    /// - returns: A publisher containing the result of combining each value of the self
    ///            with the latest value from the second, third and fourth publisher, if any, using the
    ///            specified result selector function.
    func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher, Result>(
        _ other: Other,
        _ other1: Other1,
        _ other2: Other2,
        resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result
    ) -> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Failure>, Result>
    where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
        // Same approach as the three-publisher overload: combine the secondary
        // sources into one tuple-emitting publisher.
        let combined = other.combineLatest(other1, other2)
            .eraseToAnyPublisher()

        return .init(upstream: self, second: combined, resultSelector: resultSelector)
    }

    /// Upon an emission from self, emit the latest value from the
    /// second publisher, if any exists.
    ///
    /// - parameter other: A second publisher source.
    ///
    /// - returns: A publisher containing the latest value from the second publisher, if any.
    func withLatestFrom<Other: Publisher>(
        _ other: Other
    ) -> Publishers.WithLatestFrom<Self, Other, Other.Output> {
        // `$1` is the latest value of `other`; the value emitted by self is discarded.
        return .init(upstream: self, second: other) { $1 }
    }

    /// Upon an emission from self, emit the latest value from the
    /// second and third publisher, if any exists.
    ///
    /// - parameter other: A second publisher source.
    /// - parameter other1: A third publisher source.
    ///
    /// - returns: A publisher containing the latest value from the second and third publisher, if any.
    func withLatestFrom<Other: Publisher, Other1: Publisher>(
        _ other: Other,
        _ other1: Other1
    ) -> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Failure>, (Other.Output, Other1.Output)>
    where Other.Failure == Failure, Other1.Failure == Failure {
        withLatestFrom(other, other1) { $1 }
    }

    /// Upon an emission from self, emit the latest value from the
    /// second, third and fourth publisher, if any exists.
    ///
    /// - parameter other: A second publisher source.
    /// - parameter other1: A third publisher source.
    /// - parameter other2: A fourth publisher source.
    ///
    /// - returns: A publisher containing the latest value from the second, third and fourth publisher, if any.
    func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher>(
        _ other: Other,
        _ other1: Other1,
        _ other2: Other2
    ) -> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Failure>, (Other.Output, Other1.Output, Other2.Output)>
    where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
        withLatestFrom(other, other1, other2) { $1 }
    }
}

// MARK: - Publisher
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
    /// A publisher that pairs each value of `Upstream` with the most recent
    /// value of `Other` and emits the result of a selector applied to both.
    struct WithLatestFrom<Upstream: Publisher, Other: Publisher, Output>: Publisher
    where Upstream.Failure == Other.Failure {
        public typealias Failure = Upstream.Failure
        public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output

        private let upstream: Upstream
        private let second: Other
        private let resultSelector: ResultSelector

        /// - parameter upstream: The primary publisher whose emissions trigger output.
        /// - parameter second: The publisher whose latest value is sampled.
        /// - parameter resultSelector: Combines an upstream value with the latest `second` value.
        init(upstream: Upstream,
             second: Other,
             resultSelector: @escaping ResultSelector) {
            self.upstream = upstream
            self.second = second
            self.resultSelector = resultSelector
        }

        /// Hands the subscriber a new `Subscription` that owns all per-subscriber state.
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(upstream: upstream,
                                                          downstream: subscriber,
                                                          second: second,
                                                          resultSelector: resultSelector))
        }
    }
}

// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.WithLatestFrom {
    /// Per-subscriber state for `WithLatestFrom`.
    ///
    /// The subscription starts tracking `second` on creation. The main sink over
    /// `upstream` is only created once `second` has produced its first value;
    /// demand requested before that point is accumulated and replayed afterwards.
    final class Subscription<Downstream: Subscriber>: Combine.Subscription, CustomStringConvertible
    where Downstream.Input == Output, Downstream.Failure == Failure {
        private let resultSelector: ResultSelector
        private var sink: Sink<Upstream, Downstream>?

        private let upstream: Upstream
        private let downstream: Downstream
        private let second: Other

        // Secondary (other) publisher
        private var latestValue: Other.Output?
        private var otherSubscription: Cancellable?
        // Demand requested by downstream before `second` emitted its first value
        private var preInitialDemand = Subscribers.Demand.none

        init(upstream: Upstream,
             downstream: Downstream,
             second: Other,
             resultSelector: @escaping ResultSelector) {
            self.upstream = upstream
            self.second = second
            self.downstream = downstream
            self.resultSelector = resultSelector

            trackLatestFromSecond { [weak self] in
                guard let self = self else { return }
                // Replay whatever demand was requested while waiting for
                // the first value of `second`, then reset the backlog.
                self.request(self.preInitialDemand)
                self.preInitialDemand = .none
            }
        }

        /// Forwards demand to the main sink, or records it while `second`
        /// has not yet produced a value.
        func request(_ demand: Subscribers.Demand) {
            guard latestValue != nil else {
                preInitialDemand += demand
                return
            }

            sink?.demand(demand)
        }

        // Create an internal subscription to the `Other` publisher,
        // constantly tracking its latest value
        private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
            var gotInitialValue = false

            // NOTE(review): no completion handler is installed, so a failure of
            // `second` is not forwarded downstream — confirm this is intended.
            let subscriber = AnySubscriber<Other.Output, Other.Failure>(
                receiveSubscription: { [weak self] subscription in
                    self?.otherSubscription = subscription
                    subscription.request(.unlimited)
                },
                receiveValue: { [weak self] value in
                    guard let self = self else { return .none }
                    self.latestValue = value

                    if !gotInitialValue {
                        // When getting initial value, start pulling values
                        // from upstream in the main sink
                        self.sink = Sink(upstream: self.upstream,
                                         downstream: self.downstream,
                                         transformOutput: { [weak self] value in
                                             guard let self = self,
                                                   let other = self.latestValue else { return nil }

                                             return self.resultSelector(value, other)
                                         },
                                         transformFailure: { $0 })

                        // Signal initial value to start fulfilling downstream demand
                        gotInitialValue = true
                        onInitialValue()
                    }

                    return .unlimited
                },
                receiveCompletion: nil)

            second.subscribe(subscriber)
        }

        var description: String {
            return "WithLatestFrom.Subscription<\(Output.self), \(Failure.self)>"
        }

        /// Drops the main sink and cancels the tracking of `second`.
        func cancel() {
            sink = nil
            otherSubscription?.cancel()
            // Release the cancelled subscription instead of holding it
            // for the remaining lifetime of this object.
            otherSubscription = nil
        }
    }
}
#endif