2020-07-23 16:27:54 -05:00

239 lines
9.2 KiB
Swift

//
// WithLatestFrom.swift
// CombineExt
//
// Created by Shai Mishali on 29/08/2019.
// Copyright © 2020 Combine Community. All rights reserved.
//
#if canImport(Combine)
import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
/// Merges two publishers into a single publisher by combining each value
/// from self with the latest value from the second publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> Publishers.WithLatestFrom<Self, Other, Result> {
return .init(upstream: self, second: other, resultSelector: resultSelector)
}
/// Merges three publishers into a single publisher by combining each value
/// from self with the latest value from the second and third publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second and third source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second and third publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Other1: Publisher, Result>(_ other: Other,
_ other1: Other1,
resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Self.Failure>, Result>
where Other.Failure == Failure, Other1.Failure == Failure {
let combined = other.combineLatest(other1)
.eraseToAnyPublisher()
return .init(upstream: self, second: combined, resultSelector: resultSelector)
}
/// Merges four publishers into a single publisher by combining each value
/// from self with the latest value from the second, third and fourth publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter other2: A fourth publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second, third and fourth source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second, third and fourth publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher, Result>(_ other: Other,
_ other1: Other1,
_ other2: Other2,
resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Self.Failure>, Result>
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
let combined = other.combineLatest(other1, other2)
.eraseToAnyPublisher()
return .init(upstream: self, second: combined, resultSelector: resultSelector)
}
/// Upon an emission from self, emit the latest value from the
/// second publisher, if any exists.
///
/// - parameter other: A second publisher source.
///
/// - returns: A publisher containing the latest value from the second publisher, if any.
func withLatestFrom<Other: Publisher>(_ other: Other)
-> Publishers.WithLatestFrom<Self, Other, Other.Output> {
return .init(upstream: self, second: other) { $1 }
}
/// Upon an emission from self, emit the latest value from the
/// second and third publisher, if any exists.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
///
/// - returns: A publisher containing the latest value from the second and third publisher, if any.
func withLatestFrom<Other: Publisher, Other1: Publisher>(_ other: Other,
_ other1: Other1)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Self.Failure>, (Other.Output, Other1.Output)>
where Other.Failure == Failure, Other1.Failure == Failure {
withLatestFrom(other, other1) { $1 }
}
/// Upon an emission from self, emit the latest value from the
/// second, third and forth publisher, if any exists.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter other2: A forth publisher source.
///
/// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher>(_ other: Other,
_ other1: Other1,
_ other2: Other2)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Self.Failure>, (Other.Output, Other1.Output, Other2.Output)>
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
withLatestFrom(other, other1, other2) { $1 }
}
}
// MARK: - Publisher
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
struct WithLatestFrom<Upstream: Publisher,
Other: Publisher,
Output>: Publisher where Upstream.Failure == Other.Failure {
public typealias Failure = Upstream.Failure
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output
private let upstream: Upstream
private let second: Other
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.resultSelector = resultSelector
}
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream,
downstream: subscriber,
second: second,
resultSelector: resultSelector))
}
}
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.WithLatestFrom {
class Subscription<Downstream: Subscriber>: Combine.Subscription, CustomStringConvertible where Downstream.Input == Output, Downstream.Failure == Failure {
private let resultSelector: ResultSelector
private var sink: Sink<Upstream, Downstream>?
private let upstream: Upstream
private let downstream: Downstream
private let second: Other
// Secondary (other) publisher
private var latestValue: Other.Output?
private var otherSubscription: Cancellable?
private var preInitialDemand = Subscribers.Demand.none
init(upstream: Upstream,
downstream: Downstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.downstream = downstream
self.resultSelector = resultSelector
trackLatestFromSecond { [weak self] in
guard let self = self else { return }
self.request(self.preInitialDemand)
self.preInitialDemand = .none
}
}
func request(_ demand: Subscribers.Demand) {
guard latestValue != nil else {
preInitialDemand += demand
return
}
self.sink?.demand(demand)
}
// Create an internal subscription to the `Other` publisher,
// constantly tracking its latest value
private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
var gotInitialValue = false
let subscriber = AnySubscriber<Other.Output, Other.Failure>(
receiveSubscription: { [weak self] subscription in
self?.otherSubscription = subscription
subscription.request(.unlimited)
},
receiveValue: { [weak self] value in
guard let self = self else { return .none }
self.latestValue = value
if !gotInitialValue {
// When getting initial value, start pulling values
// from upstream in the main sink
self.sink = Sink(upstream: self.upstream,
downstream: self.downstream,
transformOutput: { [weak self] value in
guard let self = self,
let other = self.latestValue else { return nil }
return self.resultSelector(value, other)
},
transformFailure: { $0 })
// Signal initial value to start fulfilling downstream demand
gotInitialValue = true
onInitialValue()
}
return .unlimited
},
receiveCompletion: nil)
self.second.subscribe(subscriber)
}
var description: String {
return "WithLatestFrom.Subscription<\(Output.self), \(Failure.self)>"
}
func cancel() {
sink = nil
otherSubscription?.cancel()
}
}
}
#endif