152 lines
4.6 KiB
Swift
152 lines
4.6 KiB
Swift
|
//
|
||
|
// DemandBuffer.swift
|
||
|
// CombineExt
|
||
|
//
|
||
|
// Created by Shai Mishali on 21/02/2020.
|
||
|
// Copyright © 2020 Combine Community. All rights reserved.
|
||
|
//
|
||
|
|
||
|
#if canImport(Combine)
|
||
|
import Combine
|
||
|
import class Foundation.NSRecursiveLock
|
||
|
|
||
|
/// A buffer responsible for managing the demand of a downstream
|
||
|
/// subscriber for an upstream publisher
|
||
|
///
|
||
|
/// It buffers values and completion events and forwards them dynamically
|
||
|
/// according to the demand requested by the downstream
|
||
|
///
|
||
|
/// In a sense, the subscription only relays the requests for demand, as well
|
||
|
/// the events emitted by the upstream — to this buffer, which manages
|
||
|
/// the entire behavior and backpressure contract
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
class DemandBuffer<S: Subscriber> {
|
||
|
private let lock = NSRecursiveLock()
|
||
|
private var buffer = [S.Input]()
|
||
|
private let subscriber: S
|
||
|
private var completion: Subscribers.Completion<S.Failure>?
|
||
|
private var demandState = Demand()
|
||
|
|
||
|
/// Initialize a new demand buffer for a provided downstream subscriber
|
||
|
///
|
||
|
/// - parameter subscriber: The downstream subscriber demanding events
|
||
|
init(subscriber: S) {
|
||
|
self.subscriber = subscriber
|
||
|
}
|
||
|
|
||
|
/// Buffer an upstream value to later be forwarded to
|
||
|
/// the downstream subscriber, once it demands it
|
||
|
///
|
||
|
/// - parameter value: Upstream value to buffer
|
||
|
///
|
||
|
/// - returns: The demand fulfilled by the bufferr
|
||
|
func buffer(value: S.Input) -> Subscribers.Demand {
|
||
|
precondition(self.completion == nil,
|
||
|
"How could a completed publisher sent values?! Beats me 🤷♂️")
|
||
|
|
||
|
switch demandState.requested {
|
||
|
case .unlimited:
|
||
|
return subscriber.receive(value)
|
||
|
default:
|
||
|
buffer.append(value)
|
||
|
return flush()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Complete the demand buffer with an upstream completion event
|
||
|
///
|
||
|
/// This method will deplete the buffer immediately,
|
||
|
/// based on the currently accumulated demand, and relay the
|
||
|
/// completion event down as soon as demand is fulfilled
|
||
|
///
|
||
|
/// - parameter completion: Completion event
|
||
|
func complete(completion: Subscribers.Completion<S.Failure>) {
|
||
|
precondition(self.completion == nil,
|
||
|
"Completion have already occured, which is quite awkward 🥺")
|
||
|
|
||
|
self.completion = completion
|
||
|
_ = flush()
|
||
|
}
|
||
|
|
||
|
/// Signal to the buffer that the downstream requested new demand
|
||
|
///
|
||
|
/// - note: The buffer will attempt to flush as many events rqeuested
|
||
|
/// by the downstream at this point
|
||
|
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
|
||
|
flush(adding: demand)
|
||
|
}
|
||
|
|
||
|
/// Flush buffered events to the downstream based on the current
|
||
|
/// state of the downstream's demand
|
||
|
///
|
||
|
/// - parameter newDemand: The new demand to add. If `nil`, the flush isn't the
|
||
|
/// result of an explicit demand change
|
||
|
///
|
||
|
/// - note: After fulfilling the downstream's request, if completion
|
||
|
/// has already occured, the buffer will be cleared and the
|
||
|
/// completion event will be sent to the downstream subscriber
|
||
|
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
|
||
|
lock.lock()
|
||
|
defer { lock.unlock() }
|
||
|
|
||
|
if let newDemand = newDemand {
|
||
|
demandState.requested += newDemand
|
||
|
}
|
||
|
|
||
|
// If buffer isn't ready for flushing, return immediately
|
||
|
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
|
||
|
|
||
|
while !buffer.isEmpty && demandState.processed < demandState.requested {
|
||
|
demandState.requested += subscriber.receive(buffer.remove(at: 0))
|
||
|
demandState.processed += 1
|
||
|
}
|
||
|
|
||
|
if let completion = completion {
|
||
|
// Completion event was already sent
|
||
|
buffer = []
|
||
|
demandState = .init()
|
||
|
self.completion = nil
|
||
|
subscriber.receive(completion: completion)
|
||
|
return .none
|
||
|
}
|
||
|
|
||
|
let sentDemand = demandState.requested - demandState.sent
|
||
|
demandState.sent += sentDemand
|
||
|
return sentDemand
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// MARK: - Private Helpers
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
private extension DemandBuffer {
|
||
|
/// A model that tracks the downstream's
|
||
|
/// accumulated demand state
|
||
|
struct Demand {
|
||
|
var processed: Subscribers.Demand = .none
|
||
|
var requested: Subscribers.Demand = .none
|
||
|
var sent: Subscribers.Demand = .none
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// MARK: - Internally-scoped helpers
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
extension Subscription {
|
||
|
/// Reqeust demand if it's not empty
|
||
|
///
|
||
|
/// - parameter demand: Requested demand
|
||
|
func requestIfNeeded(_ demand: Subscribers.Demand) {
|
||
|
guard demand > .none else { return }
|
||
|
request(demand)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||
|
extension Optional where Wrapped == Subscription {
|
||
|
/// Cancel the Optional subscription and nullify it
|
||
|
mutating func kill() {
|
||
|
self?.cancel()
|
||
|
self = nil
|
||
|
}
|
||
|
}
|
||
|
#endif
|