122 lines
3.8 KiB
Swift
122 lines
3.8 KiB
Swift
|
//
|
|||
|
// ReplaySubject.swift
|
|||
|
// CombineExt
|
|||
|
//
|
|||
|
// Created by Jasdev Singh on 13/04/2020.
|
|||
|
// Copyright © 2020 Combine Community. All rights reserved.
|
|||
|
//
|
|||
|
|
|||
|
#if canImport(Combine)
|
|||
|
import Combine
|
|||
|
|
|||
|
/// A `ReplaySubject` is a subject that can buffer one or more values. It stores the most recent value events,
/// up to its `bufferSize`, in a first-in-first-out manner, replays them to every future subscriber, and also
/// forwards (and replays) the completion event.
///
/// Once the subject has completed, further `send(_:)` and `send(completion:)` calls are ignored, and the subject
/// releases its references to all subscriptions so that they (and their downstream subscribers) are not kept
/// alive for the remaining lifetime of the subject.
///
/// The implementation borrows heavily from [Entwine’s](https://github.com/tcldr/Entwine/blob/b839c9fcc7466878d6a823677ce608da998b95b9/Sources/Entwine/Operators/ReplaySubject.swift).
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public final class ReplaySubject<Output, Failure: Error>: Subject {
    public typealias Output = Output
    public typealias Failure = Failure

    /// Maximum number of values kept for replay.
    private let bufferSize: Int

    /// The most recent values, oldest first.
    private var buffer = [Output]()

    // Keeping track of all live subscriptions, so `send` events can be forwarded to them.
    private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()

    /// The terminal event, once one has been sent.
    private var completion: Subscribers.Completion<Failure>?

    /// `true` until a completion event has been sent.
    private var isActive: Bool { completion == nil }

    /// Create a `ReplaySubject`, buffering up to `bufferSize` values and replaying them to new subscribers
    /// - Parameter bufferSize: The maximum number of value events to buffer and replay to all future subscribers.
    ///                         With a value of zero or less, nothing is retained for replay.
    public init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    /// Buffers `value` for future subscribers and forwards it to all current subscriptions.
    ///
    /// Does nothing once the subject has completed.
    /// - Parameter value: The value to send.
    public func send(_ value: Output) {
        guard isActive else { return }

        buffer.append(value)

        // Evict the oldest value once the buffer grows past its capacity.
        if buffer.count > bufferSize {
            buffer.removeFirst()
        }

        subscriptions.forEach { $0.forwardValueToBuffer(value) }
    }

    /// Records the terminal event, forwards it to all current subscriptions, and releases them.
    ///
    /// Only the first completion is honored; later calls are ignored.
    /// - Parameter completion: The completion event to send.
    public func send(completion: Subscribers.Completion<Failure>) {
        guard isActive else { return }

        self.completion = completion

        subscriptions.forEach { $0.forwardCompletionToBuffer(completion) }

        // Nothing will ever be forwarded again after completion, so drop the strong references
        // instead of holding every subscription (and its downstream) until the subject deallocates.
        subscriptions.removeAll()
    }

    /// Requests unlimited demand from the given upstream subscription.
    /// - Parameter subscription: The upstream subscription handed to this subject.
    public func send(subscription: Combine.Subscription) {
        subscription.request(.unlimited)
    }

    /// Attaches `subscriber`, hands it a subscription, and replays the buffered values
    /// (plus the completion event, if the subject has already completed).
    /// - Parameter subscriber: The subscriber to attach.
    public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input {
        let subscriberIdentifier = subscriber.combineIdentifier

        // On cancellation, remove this subscriber's subscription from the list of live subscriptions.
        let subscription = Subscription(downstream: AnySubscriber(subscriber)) { [weak self] in
            guard let self = self,
                  let subscriptionIndex = self.subscriptions
                    .firstIndex(where: { $0.innerSubscriberIdentifier == subscriberIdentifier }) else { return }

            self.subscriptions.remove(at: subscriptionIndex)
        }

        // A subject that has already completed never forwards anything again, so a late
        // subscription only needs the replay below and must not be retained by the subject.
        if isActive {
            subscriptions.append(subscription)
        }

        subscriber.receive(subscription: subscription)
        subscription.replay(buffer, completion: completion)
    }
}
|
|||
|
|
|||
|
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension ReplaySubject {
    /// The subscription handed to each subscriber of a `ReplaySubject`.
    ///
    /// Values and completion events are routed through a `DemandBuffer` wrapping the downstream
    /// subscriber. After `cancel()` the buffer is released and every forwarding call becomes a no-op.
    final class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        /// Buffer sitting between the subject and the downstream subscriber; `nil` after cancellation.
        private var demandBuffer: DemandBuffer<Downstream>?

        /// Invoked at most once, when the subscription is cancelled.
        private var cancellationHandler: (() -> Void)?

        /// Identifier of the downstream subscriber, used by the subject to locate this subscription.
        fileprivate let innerSubscriberIdentifier: CombineIdentifier

        /// Creates a subscription for `downstream`.
        /// - Parameters:
        ///   - downstream: The subscriber that receives values and completion.
        ///   - cancellationHandler: Closure to run when the subscription is cancelled.
        init(downstream: Downstream, cancellationHandler: (() -> Void)?) {
            demandBuffer = DemandBuffer(subscriber: downstream)
            innerSubscriberIdentifier = downstream.combineIdentifier
            self.cancellationHandler = cancellationHandler
        }

        /// Forwards every value in `buffer`, in order, followed by `completion` when one is provided.
        /// - Parameters:
        ///   - buffer: Previously sent values to replay.
        ///   - completion: The terminal event to replay, if any.
        func replay(_ buffer: [Output], completion: Subscribers.Completion<Failure>?) {
            for replayedValue in buffer {
                forwardValueToBuffer(replayedValue)
            }

            guard let terminalEvent = completion else { return }
            forwardCompletionToBuffer(terminalEvent)
        }

        /// Hands `value` to the demand buffer; does nothing after cancellation.
        func forwardValueToBuffer(_ value: Output) {
            guard let activeBuffer = demandBuffer else { return }
            _ = activeBuffer.buffer(value: value)
        }

        /// Hands `completion` to the demand buffer; does nothing after cancellation.
        func forwardCompletionToBuffer(_ completion: Subscribers.Completion<Failure>) {
            guard let activeBuffer = demandBuffer else { return }
            activeBuffer.complete(completion: completion)
        }

        /// Passes the subscriber's additional `demand` on to the demand buffer.
        func request(_ demand: Subscribers.Demand) {
            guard let activeBuffer = demandBuffer else { return }
            _ = activeBuffer.demand(demand)
        }

        /// Runs the cancellation handler (if still present), then releases the handler and the demand buffer.
        func cancel() {
            if let notifyCancellation = cancellationHandler {
                notifyCancellation()
            }

            cancellationHandler = nil
            demandBuffer = nil
        }
    }
}
|
|||
|
#endif
|