239 lines
9.2 KiB
Swift
239 lines
9.2 KiB
Swift
|
//
|
||
|
// WithLatestFrom.swift
|
||
|
// CombineExt
|
||
|
//
|
||
|
// Created by Shai Mishali on 29/08/2019.
|
||
|
// Copyright © 2020 Combine Community. All rights reserved.
|
||
|
//
|
||
|
|
||
|
#if canImport(Combine)
|
||
|
import Combine
|
||
|
|
||
|
// MARK: - Operator methods
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
public extension Publisher {
|
||
|
/// Merges two publishers into a single publisher by combining each value
|
||
|
/// from self with the latest value from the second publisher, if any.
|
||
|
///
|
||
|
/// - parameter other: A second publisher source.
|
||
|
/// - parameter resultSelector: Function to invoke for each value from the self combined
|
||
|
/// with the latest value from the second source, if any.
|
||
|
///
|
||
|
/// - returns: A publisher containing the result of combining each value of the self
|
||
|
/// with the latest value from the second publisher, if any, using the
|
||
|
/// specified result selector function.
|
||
|
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
|
||
|
resultSelector: @escaping (Output, Other.Output) -> Result)
|
||
|
-> Publishers.WithLatestFrom<Self, Other, Result> {
|
||
|
return .init(upstream: self, second: other, resultSelector: resultSelector)
|
||
|
}
|
||
|
|
||
|
/// Merges three publishers into a single publisher by combining each value
|
||
|
/// from self with the latest value from the second and third publisher, if any.
|
||
|
///
|
||
|
/// - parameter other: A second publisher source.
|
||
|
/// - parameter other1: A third publisher source.
|
||
|
/// - parameter resultSelector: Function to invoke for each value from the self combined
|
||
|
/// with the latest value from the second and third source, if any.
|
||
|
///
|
||
|
/// - returns: A publisher containing the result of combining each value of the self
|
||
|
/// with the latest value from the second and third publisher, if any, using the
|
||
|
/// specified result selector function.
|
||
|
func withLatestFrom<Other: Publisher, Other1: Publisher, Result>(_ other: Other,
|
||
|
_ other1: Other1,
|
||
|
resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result)
|
||
|
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Self.Failure>, Result>
|
||
|
where Other.Failure == Failure, Other1.Failure == Failure {
|
||
|
let combined = other.combineLatest(other1)
|
||
|
.eraseToAnyPublisher()
|
||
|
return .init(upstream: self, second: combined, resultSelector: resultSelector)
|
||
|
}
|
||
|
|
||
|
/// Merges four publishers into a single publisher by combining each value
|
||
|
/// from self with the latest value from the second, third and fourth publisher, if any.
|
||
|
///
|
||
|
/// - parameter other: A second publisher source.
|
||
|
/// - parameter other1: A third publisher source.
|
||
|
/// - parameter other2: A fourth publisher source.
|
||
|
/// - parameter resultSelector: Function to invoke for each value from the self combined
|
||
|
/// with the latest value from the second, third and fourth source, if any.
|
||
|
///
|
||
|
/// - returns: A publisher containing the result of combining each value of the self
|
||
|
/// with the latest value from the second, third and fourth publisher, if any, using the
|
||
|
/// specified result selector function.
|
||
|
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher, Result>(_ other: Other,
|
||
|
_ other1: Other1,
|
||
|
_ other2: Other2,
|
||
|
resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result)
|
||
|
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Self.Failure>, Result>
|
||
|
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
|
||
|
let combined = other.combineLatest(other1, other2)
|
||
|
.eraseToAnyPublisher()
|
||
|
return .init(upstream: self, second: combined, resultSelector: resultSelector)
|
||
|
}
|
||
|
|
||
|
/// Upon an emission from self, emit the latest value from the
|
||
|
/// second publisher, if any exists.
|
||
|
///
|
||
|
/// - parameter other: A second publisher source.
|
||
|
///
|
||
|
/// - returns: A publisher containing the latest value from the second publisher, if any.
|
||
|
func withLatestFrom<Other: Publisher>(_ other: Other)
|
||
|
-> Publishers.WithLatestFrom<Self, Other, Other.Output> {
|
||
|
return .init(upstream: self, second: other) { $1 }
|
||
|
}
|
||
|
|
||
|
/// Upon an emission from self, emit the latest value from the
|
||
|
/// second and third publisher, if any exists.
|
||
|
///
|
||
|
/// - parameter other: A second publisher source.
|
||
|
/// - parameter other1: A third publisher source.
|
||
|
///
|
||
|
/// - returns: A publisher containing the latest value from the second and third publisher, if any.
|
||
|
func withLatestFrom<Other: Publisher, Other1: Publisher>(_ other: Other,
|
||
|
_ other1: Other1)
|
||
|
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Self.Failure>, (Other.Output, Other1.Output)>
|
||
|
where Other.Failure == Failure, Other1.Failure == Failure {
|
||
|
withLatestFrom(other, other1) { $1 }
|
||
|
}
|
||
|
|
||
|
/// Upon an emission from self, emit the latest value from the
|
||
|
/// second, third and forth publisher, if any exists.
|
||
|
///
|
||
|
/// - parameter other: A second publisher source.
|
||
|
/// - parameter other1: A third publisher source.
|
||
|
/// - parameter other2: A forth publisher source.
|
||
|
///
|
||
|
/// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
|
||
|
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher>(_ other: Other,
|
||
|
_ other1: Other1,
|
||
|
_ other2: Other2)
|
||
|
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Self.Failure>, (Other.Output, Other1.Output, Other2.Output)>
|
||
|
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
|
||
|
withLatestFrom(other, other1, other2) { $1 }
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// MARK: - Publisher
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
public extension Publishers {
|
||
|
struct WithLatestFrom<Upstream: Publisher,
|
||
|
Other: Publisher,
|
||
|
Output>: Publisher where Upstream.Failure == Other.Failure {
|
||
|
public typealias Failure = Upstream.Failure
|
||
|
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output
|
||
|
|
||
|
private let upstream: Upstream
|
||
|
private let second: Other
|
||
|
private let resultSelector: ResultSelector
|
||
|
private var latestValue: Other.Output?
|
||
|
|
||
|
init(upstream: Upstream,
|
||
|
second: Other,
|
||
|
resultSelector: @escaping ResultSelector) {
|
||
|
self.upstream = upstream
|
||
|
self.second = second
|
||
|
self.resultSelector = resultSelector
|
||
|
}
|
||
|
|
||
|
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
|
||
|
subscriber.receive(subscription: Subscription(upstream: upstream,
|
||
|
downstream: subscriber,
|
||
|
second: second,
|
||
|
resultSelector: resultSelector))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// MARK: - Subscription
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
private extension Publishers.WithLatestFrom {
|
||
|
class Subscription<Downstream: Subscriber>: Combine.Subscription, CustomStringConvertible where Downstream.Input == Output, Downstream.Failure == Failure {
|
||
|
private let resultSelector: ResultSelector
|
||
|
private var sink: Sink<Upstream, Downstream>?
|
||
|
|
||
|
private let upstream: Upstream
|
||
|
private let downstream: Downstream
|
||
|
private let second: Other
|
||
|
|
||
|
// Secondary (other) publisher
|
||
|
private var latestValue: Other.Output?
|
||
|
private var otherSubscription: Cancellable?
|
||
|
private var preInitialDemand = Subscribers.Demand.none
|
||
|
|
||
|
init(upstream: Upstream,
|
||
|
downstream: Downstream,
|
||
|
second: Other,
|
||
|
resultSelector: @escaping ResultSelector) {
|
||
|
self.upstream = upstream
|
||
|
self.second = second
|
||
|
self.downstream = downstream
|
||
|
self.resultSelector = resultSelector
|
||
|
|
||
|
trackLatestFromSecond { [weak self] in
|
||
|
guard let self = self else { return }
|
||
|
self.request(self.preInitialDemand)
|
||
|
self.preInitialDemand = .none
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func request(_ demand: Subscribers.Demand) {
|
||
|
guard latestValue != nil else {
|
||
|
preInitialDemand += demand
|
||
|
return
|
||
|
}
|
||
|
|
||
|
self.sink?.demand(demand)
|
||
|
}
|
||
|
|
||
|
// Create an internal subscription to the `Other` publisher,
|
||
|
// constantly tracking its latest value
|
||
|
private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
|
||
|
var gotInitialValue = false
|
||
|
|
||
|
let subscriber = AnySubscriber<Other.Output, Other.Failure>(
|
||
|
receiveSubscription: { [weak self] subscription in
|
||
|
self?.otherSubscription = subscription
|
||
|
subscription.request(.unlimited)
|
||
|
},
|
||
|
receiveValue: { [weak self] value in
|
||
|
guard let self = self else { return .none }
|
||
|
self.latestValue = value
|
||
|
|
||
|
if !gotInitialValue {
|
||
|
// When getting initial value, start pulling values
|
||
|
// from upstream in the main sink
|
||
|
self.sink = Sink(upstream: self.upstream,
|
||
|
downstream: self.downstream,
|
||
|
transformOutput: { [weak self] value in
|
||
|
guard let self = self,
|
||
|
let other = self.latestValue else { return nil }
|
||
|
|
||
|
return self.resultSelector(value, other)
|
||
|
},
|
||
|
transformFailure: { $0 })
|
||
|
|
||
|
// Signal initial value to start fulfilling downstream demand
|
||
|
gotInitialValue = true
|
||
|
onInitialValue()
|
||
|
}
|
||
|
|
||
|
return .unlimited
|
||
|
},
|
||
|
receiveCompletion: nil)
|
||
|
|
||
|
self.second.subscribe(subscriber)
|
||
|
}
|
||
|
|
||
|
var description: String {
|
||
|
return "WithLatestFrom.Subscription<\(Output.self), \(Failure.self)>"
|
||
|
}
|
||
|
|
||
|
func cancel() {
|
||
|
sink = nil
|
||
|
otherSubscription?.cancel()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
#endif
|