2020-07-23 16:27:54 -05:00

152 lines
4.6 KiB
Swift

//
// DemandBuffer.swift
// CombineExt
//
// Created by Shai Mishali on 21/02/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//
#if canImport(Combine)
import Combine
import class Foundation.NSRecursiveLock
/// A buffer responsible for managing the demand of a downstream
/// subscriber for an upstream publisher
///
/// It buffers values and completion events and forwards them dynamically
/// according to the demand requested by the downstream
///
/// In a sense, the subscription only relays the requests for demand, as well
/// the events emitted by the upstream to this buffer, which manages
/// the entire behavior and backpressure contract
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
class DemandBuffer<S: Subscriber> {
private let lock = NSRecursiveLock()
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()
/// Initialize a new demand buffer for a provided downstream subscriber
///
/// - parameter subscriber: The downstream subscriber demanding events
init(subscriber: S) {
self.subscriber = subscriber
}
/// Buffer an upstream value to later be forwarded to
/// the downstream subscriber, once it demands it
///
/// - parameter value: Upstream value to buffer
///
/// - returns: The demand fulfilled by the bufferr
func buffer(value: S.Input) -> Subscribers.Demand {
precondition(self.completion == nil,
"How could a completed publisher sent values?! Beats me 🤷‍♂️")
switch demandState.requested {
case .unlimited:
return subscriber.receive(value)
default:
buffer.append(value)
return flush()
}
}
/// Complete the demand buffer with an upstream completion event
///
/// This method will deplete the buffer immediately,
/// based on the currently accumulated demand, and relay the
/// completion event down as soon as demand is fulfilled
///
/// - parameter completion: Completion event
func complete(completion: Subscribers.Completion<S.Failure>) {
precondition(self.completion == nil,
"Completion have already occured, which is quite awkward 🥺")
self.completion = completion
_ = flush()
}
/// Signal to the buffer that the downstream requested new demand
///
/// - note: The buffer will attempt to flush as many events rqeuested
/// by the downstream at this point
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
flush(adding: demand)
}
/// Flush buffered events to the downstream based on the current
/// state of the downstream's demand
///
/// - parameter newDemand: The new demand to add. If `nil`, the flush isn't the
/// result of an explicit demand change
///
/// - note: After fulfilling the downstream's request, if completion
/// has already occured, the buffer will be cleared and the
/// completion event will be sent to the downstream subscriber
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
lock.lock()
defer { lock.unlock() }
if let newDemand = newDemand {
demandState.requested += newDemand
}
// If buffer isn't ready for flushing, return immediately
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
demandState.processed += 1
}
if let completion = completion {
// Completion event was already sent
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
}
let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand
}
}
// MARK: - Private Helpers
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension DemandBuffer {
/// A model that tracks the downstream's
/// accumulated demand state
struct Demand {
var processed: Subscribers.Demand = .none
var requested: Subscribers.Demand = .none
var sent: Subscribers.Demand = .none
}
}
// MARK: - Internally-scoped helpers
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Subscription {
/// Reqeust demand if it's not empty
///
/// - parameter demand: Requested demand
func requestIfNeeded(_ demand: Subscribers.Demand) {
guard demand > .none else { return }
request(demand)
}
}
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Optional where Wrapped == Subscription {
/// Cancel the Optional subscription and nullify it
mutating func kill() {
self?.cancel()
self = nil
}
}
#endif