Declarative Reactive Programming with Combine
13 December 2021

Any non-trivial application uses some form of data, whether retrieved from a web service, from a database, or generated by user interaction. A well-behaved application properly responds to data changing over time while preserving a delightful, uninterrupted user experience. Achieving this result can be a challenge, though, as most applications rely on several data sources whose changes might require a user interface update at any time.
Reactive programming offers an elegant solution to dealing with heterogeneous data sources, as it provides a comprehensive toolbox with which their results can be processed and consolidated. Though reactive programming is not new in the Apple ecosystem, it historically required integrating third-party libraries like ReactiveSwift or RxSwift, which could be a tough decision.
This is why Apple's official introduction of Combine in 2019 is a huge step forward: it makes reactive programming immediately available to any project targeting iOS 13, tvOS 13, watchOS 6 or macOS 10.15. Adopting Combine or reactive programming might sometimes be frowned upon, though, as the initial learning curve can be steep. This is true but, as this article will attempt to illustrate, Combine and reactive programming can be invaluable tools to help you manage data sources in a way that is both expressive and scalable.
This article assumes some familiarity with Combine and reactive programming in general. If you are new to Combine you should probably watch the corresponding WWDC talk for a gentle introduction first. You can also peek at Joseph Heck's excellent Using Combine book at any time to better understand the code snippets appearing in this article.
- Foreword
- Data Emitters
- Data Pipelines
- Triggers and Signals
- Paginated Publishers
- Refresh Publishers
- Accumulators
- Advanced Pipeline Design
- Declarative Data Pipelines and View Models
- User-driven Data Emitters
- Conclusion
- Sample Code
Remark
For the sake of simplicity, code samples appearing in this article often define types and methods at global scope. In practice these should of course be part of a narrower scope for better encapsulation.
Foreword
Async/await and actors draw a lot of attention nowadays, and Combine has not received any significant updates since its introduction in 2019, merely a few stability fixes and a couple of API improvements. The question of whether Combine might soon be discontinued entirely in favor of async/await, or even whether Combine is still relevant now that async/await is available with the same version requirements,1 has regularly appeared in articles and on Twitter since WWDC 2021.
I am pretty confident that Combine is here to stay, especially considering its tight relationship with SwiftUI. The Combine API was already comprehensive in 2019, and I think the absence of significant updates is proof of maturity rather than the worrying sign of a decaying framework.
This article might also convince you that async/await and Combine are complementary rather than mutually exclusive. Combine is a reactive framework with the concept of back pressure at its heart, while async/await and actors provide language constructs for structured concurrency. Both approaches might address common concerns like scalability or avoiding concurrent access to mutable state, but in general they serve different purposes.
A significant part of the time I spend every week as an app developer involves writing or updating code that aggregates data from several sources. I have found reactive programming and Combine to be invaluable tools for such tasks. It would certainly be possible to achieve the same results using other approaches but, as this article will hopefully illustrate, Combine provides a formalism with which the resulting code can be both expressive and scalable, whereas other approaches usually quickly lead to code which is hard to maintain. If I were to implement a database access layer or a cache, though, I would rather use actors, since safe access to a shared resource matters more in that case than reactiveness or back pressure. Different purposes, different tools.
Some people might of course argue that Combine and reactive programming are not easy and that compiler support is not always great, and they would certainly be right. But async/await and actors, or even GCD, are not easy either. Concurrency in general is a hard problem, and it is only natural that approaches dealing with it come with their own learning curves, advantages and drawbacks. Just stay open-minded about which options are available and where they work best, so you can pick the one that will help you write better code. We have never had as many great options as we do nowadays, and it would be a shame not to use the one that works best in a specific case.
Data Emitters
It usually does not matter how a data source actually fetches data internally. This might happen through a network request or a database operation, but how the associated network or data access layers are internally implemented is not important. Instead you should treat data sources in your application as opaque data emitters with a well-defined API contract. This contract essentially boils down to what kind of data is emitted, how the process might fail, and whether an emitter delivers data once or several times until possible exhaustion.
With Combine any data emitter can be exposed as a publisher, either because this is how its API was designed in the first place or because some asynchronous API call was wrapped into a `Future`. The specifics of data delivery are expressed through output and failure generic types, for example:

```swift
func profilePublisher(forUserId: String) -> AnyPublisher<Profile, Error>
func filesPublisher(inDirectoryAt url: URL) -> AnyPublisher<[File], Error>
func lifecycleEventPublisher() -> AnyPublisher<NSNotification, Never>
```
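Any callback-based API can be bridged this way; here is a minimal sketch, assuming a hypothetical completion-based `fetchProfile(forUserId:completion:)` network call:

```swift
import Combine

// Hypothetical completion-based API, for illustration only.
func fetchProfile(forUserId userId: String, completion: @escaping (Result<Profile, Error>) -> Void) {
    // Perform the network request and call the completion handler...
}

func profilePublisher(forUserId userId: String) -> AnyPublisher<Profile, Error> {
    Deferred {
        Future { promise in
            fetchProfile(forUserId: userId) { result in
                promise(result)   // Result<Profile, Error> matches the promise signature
            }
        }
    }
    .eraseToAnyPublisher()
}
```

Wrapping the `Future` into `Deferred` ensures the request only starts once the publisher is subscribed to, since `Future` otherwise executes its body eagerly upon creation.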
Data Pipelines
Once you have identified the data emitters you need, you can connect them together, ultimately forming a tree-shaped structure describing the entire data delivery process. No matter how complex this tree gets, it can be seen as a data pipeline assembling data from various emitters, transforming and consolidating it along the way, before delivering it to a single endpoint.
Combine provides a rich toolset to build pipelines:
- Operators with which you can shape the data, e.g. extracting or formatting a value, or wrapping it into a different type (`map`, `filter`, etc.).
- Operators with which you can tweak the delivery process (`throttle`, `debounce`, etc.).
- Operators which gather data before delivering it further (`scan`, `reduce`).
- Publishers that aggregate other publishers (`Publishers.Merge`, `Publishers.Zip`, `Publishers.CombineLatest`).
- And many more…
Using publishers like `Publishers.Merge` or `Publishers.CombineLatest`, as well as combinations of operators like `map` and `switchToLatest`, a trunk is able to grow branches, which themselves can grow other branches and so forth, allowing you to create arbitrarily complex data delivery trees, e.g. to consolidate data associated with a user id:
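A minimal sketch of such a consolidated emitter, assuming hypothetical `documentsPublisher(forUserId:)` and `addressesPublisher(forUserId:)` emitters alongside the `profilePublisher(forUserId:)` introduced earlier, and a `UserData` container type:

```swift
// Hypothetical container for all data associated with a user.
struct UserData {
    let profile: Profile
    let documents: [Document]
    let addresses: [Address]
}

func userDataPublisher(forUserId userId: String) -> AnyPublisher<UserData, Error> {
    Publishers.CombineLatest3(
        profilePublisher(forUserId: userId),
        documentsPublisher(forUserId: userId),    // Hypothetical emitter
        addressesPublisher(forUserId: userId)     // Hypothetical emitter
    )
    .map { UserData(profile: $0, documents: $1, addresses: $2) }
    .eraseToAnyPublisher()
}
```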
The process of building a pipeline this way is inherently declarative, as you focus on the description of the data flow rather than on the intricacies of its implementation. Moreover, since any pipeline is itself a publisher, arbitrarily complex pipelines can be treated as opaque data emitters and inserted into other pipelines, in a composite fashion. For example the above pipeline reduces to a single user data emitter and can therefore be used in a pipeline fetching several user ids and consolidating the results as a view model state.
Triggers and Signals
Some data emitters in a pipeline might deliver data in pages of results. Traditional examples include network or database requests, as usually only partial results can be retrieved at a time rather than the whole result set at once. When additional pages of results need to be loaded on-demand (e.g. when the user reaches the bottom of a list), it would be tempting to use a secondary pipeline for retrieving the subsequent pages of results. This approach, often applied in imperative block-based network code, can of course be replicated with reactive pipelines, but it suffers from similar drawbacks:
- A pointer to the next retrievable page must be stored as a mutable state external to the pipelines.
- The results must be consolidated between the two pipelines, again via an external mutable state.
- Cancellation must be properly coordinated. In our example, if a reload is triggered for the first pipeline while the secondary pipeline is still retrieving the next page of results, then both pipelines must likely be reset so that the overall state remains consistent.
This approach does not scale well, since more states need to be added and synchronized as the number of paginated emitters increases. To make things worse, and due to the usually concurrent nature of data retrieval, extra care is required when accessing all these shared mutable states.
With the help of a few additions, though, reactive programming and Combine make it possible to eliminate these issues entirely. If we keep only the first pipeline describing the whole data delivery process (without pagination), we can build pagination directly into this pipeline, provided we are able to ask any publisher for more results when needed. Updated results will then flow through the usual pipeline and deliver a consolidated update consistently.
For example, if the document and address publishers in the above example both support pagination, all we need for pagination support is a way to contact these two data emitters directly when more data is required.
The idea of contacting a publisher directly can be neatly implemented using signals and triggers. A signal is a publisher whose values we are not interested in (`Void`), which never fails, and whose only purpose is therefore to signal that something happened, for example:

```swift
func networkReachableAgainSignal() -> AnyPublisher<Void, Never>
```
A trigger, on the other hand, is a communication device with a set of associated signals, which it can contact on-demand so that they emit a value. Being publishers, signals associated with a trigger can be inserted into any pipeline at design time to define control points which can be activated on-demand later on.
Implementing triggers and associated signals is straightforward:
```swift
struct Trigger {
    typealias Index = Int
    typealias Signal = AnyPublisher<Void, Never>

    private let sender = PassthroughSubject<Index, Never>()

    /// Signal emitting a value each time the trigger is activated for the provided index.
    func signal(activatedBy index: Index) -> Signal {
        return sender
            .filter { $0 == index }
            .map { _ in }
            .eraseToAnyPublisher()
    }

    /// Make all signals associated with the provided index emit a value.
    func activate(for index: Index) {
        sender.send(index)
    }
}
```
By leveraging publishers to implement communication between triggers and signals we ensure that no state or multithreading issues arise, as all the heavy lifting is entirely managed by Combine itself.
Now assume we are equipped with a trigger:
```swift
let trigger = Trigger()
```
We can create an associated signal publisher for some identifier:
```swift
let signal = trigger.signal(activatedBy: 1)
```
which will emit a value when activated by the trigger:
```swift
trigger.activate(for: 1)
```
Note that we used integers as identifiers here, but any hashable type can be used as well with the help of the following extension:
```swift
extension Trigger {
    func signal<T>(activatedBy t: T) -> Signal where T: Hashable {
        return signal(activatedBy: t.hashValue)
    }

    func activate<T>(for t: T) where T: Hashable {
        activate(for: t.hashValue)
    }
}
```
With triggers and signals now defined, let us see how they can be used to implement publishers supporting pagination.
Paginated Publishers
Assume we have some paginated publisher which returns items for a specific page, as well as a `next` page which can be used to load the next page of results (if any):

```swift
func itemsPublisher(at page: Page?) -> AnyPublisher<(items: [Item], next: Page?), Error>
```

The first page of results can be obtained with `page` set to `nil`, and the `next` page is `nil` when all results have been returned. The publisher might internally retrieve results from a web service or from a database, and the next page might be extracted from the data itself, from a database cursor or from HTTP response headers, but this is not important to our present discussion.
We cannot use this publisher as is in a complex declarative data pipeline, as the page parameter would require some state to be stored externally, as discussed in the previous section. But what if the publisher could manage this state internally itself?
This is exactly what we can achieve with triggers and signals. Let us define a helper publisher which takes a signal bound to a trigger as parameter. Since pagination is not necessarily desired in all cases we define this parameter as optional:

```swift
func itemsPublisher(at page: Page?, paginatedBy paginator: Trigger.Signal?) -> AnyPublisher<(items: [Item], next: Page?), Error>
```
If we could find a way to have this publisher enter a dormant state each time a page of results has been emitted, we could wake it up when a new page of results is desired, so that pagination can be entirely managed by the publisher implementation, eliminating the need for external states. This is actually possible if we introduce a `wait(untilOutputFrom:)` operator which, as its name suggests, simply waits for another signal to emit a value:
```swift
extension Publisher {
    func wait<S>(untilOutputFrom signal: S) -> AnyPublisher<Self.Output, Self.Failure> where S: Publisher, S.Failure == Never {
        return prepend(
            // This prepended publisher never emits and only completes once the
            // signal fires; the receiver is subscribed to only afterwards.
            Empty(completeImmediately: false)
                .prefix(untilOutputFrom: signal)
        )
        .eraseToAnyPublisher()
    }
}
```
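To get a feel for this operator, here is a quick illustration using the `Trigger` type from the previous section:

```swift
let trigger = Trigger()

let cancellable = Just("Ready")
    .wait(untilOutputFrom: trigger.signal(activatedBy: 1))
    .sink { print($0) }

// Nothing has been printed so far; waking up the publisher delivers its value:
trigger.activate(for: 1)   // Prints "Ready"
```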
Using this new operator we can manage pagination entirely within our helper publisher implementation:
```swift
func itemsPublisher(at page: Page?, paginatedBy paginator: Trigger.Signal?) -> AnyPublisher<(items: [Item], next: Page?), Error> {
    return itemsPublisher(at: page)
        .map { result -> AnyPublisher<(items: [Item], next: Page?), Error> in
            if let paginator = paginator, let next = result.next {
                return itemsPublisher(at: next, paginatedBy: paginator)
                    .wait(untilOutputFrom: paginator)
                    .retry(.max)
                    .prepend(result)
                    .eraseToAnyPublisher()
            }
            else {
                return Just(result)
                    .setFailureType(to: Error.self)
                    .eraseToAnyPublisher()
            }
        }
        .switchToLatest()
        .eraseToAnyPublisher()
}
```
This implementation deserves a few words of explanation:
- If a paginator has been provided and a next page is available, we return the result immediately (`prepend`) and recursively prepare the request for the subsequent page of results, whose delivery is controlled by a `wait` (the pipeline must be read from the bottom to the top here). We also insert a `retry(.max)` so that, in the event a request fails, the trigger can be used to perform new attempts.
- Otherwise we just return the result we have and the publisher completes.
This helper publisher can finally be used to implement the paginated item publisher we need, which can be inserted into any pipeline and controlled externally when more results are needed:
```swift
func itemsPublisher(paginatedBy paginator: Trigger.Signal? = nil) -> AnyPublisher<[Item], Error> {
    return itemsPublisher(at: nil, paginatedBy: paginator)
        .map(\.items)
        .eraseToAnyPublisher()
}
```
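Here is how such a publisher might be used in practice; a sketch where `scan` consolidates the successive pages into a single growing list:

```swift
let trigger = Trigger()

let cancellable = itemsPublisher(paginatedBy: trigger.signal(activatedBy: 1))
    .scan([]) { $0 + $1 }   // Each new page is appended to the items retrieved so far
    .sink(
        receiveCompletion: { _ in },
        receiveValue: { items in
            // Update the user interface with the consolidated item list
        }
    )

// Later, e.g. when the user reaches the bottom of the displayed list:
trigger.activate(for: 1)
```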
Now that we have discussed how triggers and signals can be used to insert control points into any declarative pipeline, let us discuss how signals can also be used to refresh an entire pipeline or a subset thereof.
Refresh Publishers
Let us assume we have some pipeline delivering data to a view model the first time its associated view is displayed, and that this process can fail (e.g. because network requests are somehow involved):
```swift
func dataPublisher() -> AnyPublisher<Data, Error>
```
An application should in general be able to execute this pipeline again, most notably:
- When the user manually reloads the screen (e.g. pull-to-refresh).
- When the application returns to the foreground, so that data is up to date.
- When the network is reachable again, so that the application can automatically recover from network failure or ensure data is up to date.
It is tempting to simply recreate the publisher in such cases, but there is in fact a better way. If we realize that the above events can be translated into corresponding signal publishers, we can build these events right into our original reactive pipeline.
Assume we have wrapped the above events into publishers with the following signatures:
```swift
func reloadSignal() -> AnyPublisher<Void, Never>
func foregroundSignal() -> AnyPublisher<Void, Never>
func networkReachableAgainSignal() -> AnyPublisher<Void, Never>
```
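The exact implementation of these signals is not important, but as an illustration `foregroundSignal()` could be derived from UIKit notifications; a minimal sketch:

```swift
import Combine
import UIKit

func foregroundSignal() -> AnyPublisher<Void, Never> {
    NotificationCenter.default.publisher(for: UIApplication.willEnterForegroundNotification)
        .map { _ in }   // Discard the notification itself, only the event matters
        .eraseToAnyPublisher()
}
```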
We can use any of these signals to force our pipeline or a subset thereof to be executed or repeated when they emit a value, provided we introduce the following publisher helpers:
```swift
extension Publishers {
    static func Publish<S, P>(onOutputFrom signal: S, _ publisher: @escaping () -> P) -> AnyPublisher<P.Output, P.Failure> where S: Publisher, P: Publisher, S.Failure == Never {
        return signal
            .map { _ in }
            .setFailureType(to: P.Failure.self)   // Required for iOS 13, can be removed for iOS 14+
            .map { _ in
                return publisher()
            }
            .switchToLatest()
            .eraseToAnyPublisher()
    }

    static func PublishAndRepeat<S, P>(onOutputFrom signal: S, _ publisher: @escaping () -> P) -> AnyPublisher<P.Output, P.Failure> where S: Publisher, P: Publisher, S.Failure == Never {
        return signal
            .map { _ in }
            .prepend(())
            .setFailureType(to: P.Failure.self)   // Required for iOS 13, can be removed for iOS 14+
            .map { _ in
                return publisher()
            }
            .switchToLatest()
            .eraseToAnyPublisher()
    }
}
```
The first publisher only executes the wrapped publisher when a signal emits a value, while the second one always executes the wrapped publisher at least once (a value is prepended to the pipeline), repeating the process each time a signal emits a value.

Equipped with these publishers it is straightforward to have `dataPublisher()` execute once and repeat when, for example, a reload is made:

```swift
Publishers.PublishAndRepeat(onOutputFrom: reloadSignal()) {
    return dataPublisher()
}
```
We can even respond to any of the above listed signals by merging them together first:
```swift
func consolidatedReloadSignal() -> AnyPublisher<Void, Never> {
    return Publishers.MergeMany(
        reloadSignal(),
        foregroundSignal(),
        networkReachableAgainSignal()
    )
    .eraseToAnyPublisher()
}
```
then using the consolidated signal instead:
```swift
Publishers.PublishAndRepeat(onOutputFrom: consolidatedReloadSignal()) {
    return dataPublisher()
}
```
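Note that `reloadSignal()` itself can simply be backed by the `Trigger` type introduced earlier; one possible sketch (the integer identifier is arbitrary):

```swift
let reloadTrigger = Trigger()

func reloadSignal() -> AnyPublisher<Void, Never> {
    return reloadTrigger.signal(activatedBy: 1)
}

// Called from e.g. a refresh control action:
// reloadTrigger.activate(for: 1)
```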
Accumulators
Suppose we want to build a Netflix-like homepage whose main structure is made of different topic rows (e.g. Comedy, Drama, Documentaries, etc.), each one presenting a list of associated medias. The topic list and the medias for each topic are retrieved from a web service through associated data emitters:

```swift
func topics() -> AnyPublisher<[Topic], Error>
func medias(forTopicId topicId: String) -> AnyPublisher<[Media], Error>
```
delivering instances of the following types:
```swift
struct Topic {
    var id: String
    var title: String
    // ...
}

struct Media {
    var id: String
    var title: String
    // ...
}
```
We want to design a pipeline retrieving topics and their associated medias, consolidating them into rows:
```swift
struct Row {
    let topic: Topic
    let medias: [Media]
}
```
Since the number of topics is not known beforehand we cannot directly use `Publishers.CombineLatest` to retrieve medias once the topic list is known, as `Publishers.CombineLatest` does not support arbitrary lists of publishers (currently only 2, 3 or 4 publishers are supported, though variadic generics might lift this restriction in the future).
With simple bisection, though, we can implement a `Publishers.AccumulateLatestMany` publisher which supports an arbitrary number of publishers, returning their output in order as an array rather than as a tuple:
```swift
extension Publishers {
    static func AccumulateLatestMany<Upstream>(_ publishers: Upstream...) -> AnyPublisher<[Upstream.Output], Upstream.Failure> where Upstream: Publisher {
        return AccumulateLatestMany(publishers)
    }

    static func AccumulateLatestMany<Upstream, S>(_ publishers: S) -> AnyPublisher<[Upstream.Output], Upstream.Failure> where Upstream: Publisher, S: Swift.Sequence, S.Element == Upstream {
        let publishersArray = Array(publishers)
        switch publishersArray.count {
        case 0:
            return Just([])
                .setFailureType(to: Upstream.Failure.self)
                .eraseToAnyPublisher()
        case 1:
            return publishersArray[0]
                .map { [$0] }
                .eraseToAnyPublisher()
        case 2:
            return Publishers.CombineLatest(publishersArray[0], publishersArray[1])
                .map { t1, t2 in
                    return [t1, t2]
                }
                .eraseToAnyPublisher()
        case 3:
            return Publishers.CombineLatest3(publishersArray[0], publishersArray[1], publishersArray[2])
                .map { t1, t2, t3 in
                    return [t1, t2, t3]
                }
                .eraseToAnyPublisher()
        default:
            // Bisect: combine both halves recursively, then concatenate their results
            let half = publishersArray.count / 2
            return Publishers.CombineLatest(
                AccumulateLatestMany(Array(publishersArray[0..<half])),
                AccumulateLatestMany(Array(publishersArray[half..<publishersArray.count]))
            )
            .map { array1, array2 in
                return array1 + array2
            }
            .eraseToAnyPublisher()
        }
    }
}
```
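As a quick sanity check, here is the behavior of this publisher with a few trivial upstreams, with all values delivered in order as a single array:

```swift
let cancellable = Publishers.AccumulateLatestMany(Just(1), Just(2), Just(3), Just(4), Just(5))
    .sink { print($0) }   // Prints [1, 2, 3, 4, 5]
```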
With the help of `Publishers.AccumulateLatestMany` we can now write a publisher for our Netflix-like homepage:
```swift
func rowsPublisher() -> AnyPublisher<[Row], Error> {
    return topics()
        .map { topics in
            return Publishers.AccumulateLatestMany(topics.map { topic in
                return medias(forTopicId: topic.id)
                    .map { Row(topic: topic, medias: $0) }
            })
        }
        .switchToLatest()
        .eraseToAnyPublisher()
}
```
This publisher is kept as simple as possible but could be improved in several ways:
- Since `Publishers.AccumulateLatestMany` is based on `Publishers.CombineLatest`, a result is only delivered by the pipeline once all rows could be loaded. In general, though, we would prefer delivering rows as they are retrieved. This behavior can be implemented by immediately emitting a row before performing the `medias(forTopicId:)` request, using the `prepend` operator.
- Similarly, if a media request fails for some topic, the entire pipeline fails. Again this can be improved by using `replaceError` to emit a row instead (both improvements are sketched below).
A good row value to use in both cases is a list of placeholder medias, or a list of previously retrieved medias. Please have a look at the sample code associated with this article to see how this can be achieved in practice.
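For illustration, here is a minimal sketch of both improvements applied to a single row publisher, using an empty media list as placeholder value:

```swift
func rowPublisher(for topic: Topic) -> AnyPublisher<Row, Never> {
    let placeholderRow = Row(topic: topic, medias: [])
    return medias(forTopicId: topic.id)
        .map { Row(topic: topic, medias: $0) }
        .replaceError(with: placeholderRow)   // A failing media request no longer fails the whole pipeline
        .prepend(placeholderRow)              // Deliver a row immediately, before medias are retrieved
        .eraseToAnyPublisher()
}
```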
Advanced Pipeline Design
With triggers, signals, refresh publishers, accumulators and a recipe to implement paginated publishers, we can now write a single declarative pipeline for our Netflix-like homepage which not only retrieves topics and their medias in order, but also supports global reloads and individual pagination per topic.
Let us assume that media lists per topic support pagination. We can augment the API contract to support pagination via triggers and signals as described in the Paginated Publishers section:

```swift
func topics() -> AnyPublisher<[Topic], Error>
func medias(forTopicId topicId: String, paginatedBy paginator: Trigger.Signal? = nil) -> AnyPublisher<[Media], Error>
```
On a Netflix-like homepage, where media lists are displayed as horizontally scrollable rows for each topic, we want to be able to load more content when the user scrolls to the end of a row. Moreover, we want a pull-to-refresh to be available so that the user can reload the entire screen when desired. Let us introduce a hashable enum describing these use cases:
```swift
enum TriggerId: Hashable {
    case reload
    case loadMore(topicId: String)
}
```
Also assume we have a trigger stored somewhere:2
```swift
let trigger = Trigger()
```
We can now enhance our `rowsPublisher()` pipeline to add support for global reloads and pagination in each section:
```swift
func rowsPublisher() -> AnyPublisher<[Row], Error> {
    return Publishers.PublishAndRepeat(onOutputFrom: trigger.signal(activatedBy: TriggerId.reload)) { [trigger] in
        return topics()
            .map { topics in
                return Publishers.AccumulateLatestMany(topics.map { topic in
                    return medias(forTopicId: topic.id, paginatedBy: trigger.signal(activatedBy: TriggerId.loadMore(topicId: topic.id)))
                        .scan([]) { $0 + $1 }
                        .map { Row(topic: topic, medias: $0) }
                })
            }
            .switchToLatest()
            .eraseToAnyPublisher()
    }
}
```
Only three changes were required in comparison to the pipeline we obtained at the end of the Accumulators section:
- `Publishers.PublishAndRepeat` ensures the pipeline is executed once and repeated when a reload is triggered.
- A paginator signal is provided to `medias(forTopicId:paginatedBy:)` so that more medias can be loaded for any topic, independently and on-demand.
- `scan` is used to gather the pages of medias returned by the paginated `medias(forTopicId:paginatedBy:)` publisher.
We proceeded here like you would in practice, namely by starting with a basic pipeline implementing the nominal case, then inserting signals where appropriate to deliver more data or repeat part or the entirety of the pipeline. This approach is especially nice, as it allows a pipeline to be gradually enhanced with surgical code changes. Think about the code you would have written if you had to do the same with block-based APIs and you will probably better understand the advantages of this approach.
Even better, you can very easily throttle data delivery (e.g. to avoid signals triggering too many unnecessary reloads in part of the pipeline) or use debouncing to add some delay (e.g. if a signal is bound to keyboard input), with only a few additional operator insertions. This is an area where reactive programming really shines in comparison to more imperative approaches which would require much more convoluted code to be written.
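For instance, throttling the consolidated reload signal from the previous section requires a single operator insertion; a sketch:

```swift
Publishers.PublishAndRepeat(
    onOutputFrom: consolidatedReloadSignal()
        .throttle(for: .seconds(1), scheduler: DispatchQueue.main, latest: true)   // At most one reload per second
) {
    return dataPublisher()
}
```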
Declarative Data Pipelines and View Models
Declarative data pipelines are especially useful when writing view models conforming to `ObservableObject`. Since pipelines deliver consolidated results they can be immediately wired to a published `state` property so that the view automatically reflects its changes.
For example our Netflix-like homepage could be driven by the following view model, which is based on our `rowsPublisher()` from the previous section, whose result is wrapped into a `State`:
```swift
final class HomepageViewModel: ObservableObject {
    enum State {
        case loading
        case loaded(rows: [Row])
        case failure(error: Error)
    }

    @Published private(set) var state: State = .loading

    private let trigger = Trigger()

    init() {
        Publishers.PublishAndRepeat(onOutputFrom: consolidatedReloadSignal()) { [trigger] in
            return topics()
                .map { topics in
                    return Publishers.AccumulateLatestMany(topics.map { topic in
                        return medias(forTopicId: topic.id, paginatedBy: trigger.signal(activatedBy: TriggerId.loadMore(topicId: topic.id)))
                            .scan([]) { $0 + $1 }
                            .map { Row(topic: topic, medias: $0) }
                    })
                }
                .switchToLatest()
                .map { State.loaded(rows: $0) }
                .catch { error in
                    return Just(State.failure(error: error))
                }
                .eraseToAnyPublisher()
        }
        .receive(on: DispatchQueue.main)
        .assign(to: &$state)
    }

    func reload() {
        trigger.activate(for: TriggerId.reload)
    }

    func loadMore(for topic: Topic) {
        trigger.activate(for: TriggerId.loadMore(topicId: topic.id))
    }

    private func consolidatedReloadSignal() -> AnyPublisher<Void, Never> {
        return Publishers.MergeMany(
            trigger.signal(activatedBy: TriggerId.reload),
            networkReachableAgainSignal(),
            foregroundSignal()
        )
        .eraseToAnyPublisher()
    }
}
```
By assigning the publisher to the `state` property using the `assign(to: &$state)` operator, we bind the lifetime of our pipeline to the lifetime of the `state` property, and therefore to the `HomepageViewModel` instance itself. This ensures correct resource management without the need for explicit cancellables. We also catch errors and replace them with a new publisher so that the pipeline never finishes, even in case of failure.
Here is a visual representation of the pipeline, with reloads and pagination indicated:
Just take a deep breath and consider which features the above view model provides in ~50 lines of code:
- The model delivers state updates in a reactive way, with loading and failure states, as well as a loaded state with the relevant content attached.
- It performs a request for topics and, for each one, performs another request to gather medias associated with it.
- It provides support for reloading, either in response to user interaction (e.g. pull-to-refresh) or in response to application environment changes.
- More medias can be individually loaded for each topic (as the user scrolls, for example).
- If a reload occurs while more medias are still being loaded for one or several rows, all pipelines are properly cancelled accordingly.
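To see the view model in action, here is a hedged sketch of a minimal SwiftUI view driven by it (`HomepageView` is hypothetical, and `@StateObject` assumes iOS 14):

```swift
import SwiftUI

struct HomepageView: View {
    @StateObject private var model = HomepageViewModel()

    var body: some View {
        switch model.state {
        case .loading:
            ProgressView()
        case let .loaded(rows):
            List(rows, id: \.topic.id) { row in
                // A real implementation would display the row medias and call
                // model.loadMore(for: row.topic) as the user scrolls near the end.
                Text(row.topic.title)
            }
        case let .failure(error):
            VStack {
                Text(error.localizedDescription)
                Button("Retry") { model.reload() }
            }
        }
    }
}
```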
Remark
There is no way to cancel subscriptions made with the `assign(to:)` operator since no `AnyCancellable` is returned. This means that associated subscriptions remain valid until the parent `ObservableObject` is deinitialized.

Also note that calling `assign(to:)` several times on the same published property does not replace existing subscriptions. New subscriptions pile up instead, which is why you should avoid code that calls `assign(to:)` unnecessarily.

For these reasons declaring a reactive pipeline from a designated initializer is a good practice, especially when this pipeline is wired to a published property using `assign(to:)`.
Sometimes a pipeline might depend on parameters supplied by the user, though, for example a search criterion or a username and password pair. Such parameters are not known at initialization time, and you might wonder how their updated values can be provided to an existing pipeline. The next section shows how this can be achieved.
User-driven Data Emitters
Assume our example web service provides an additional endpoint to search medias using some query and additional settings. We introduce a corresponding data emitter:

```swift
func medias(matchingQuery query: String, settings: Settings) -> AnyPublisher<[Media], Error>
```
with settings letting the user pick a date range or a sort order, for example:

```swift
struct Settings {
    let dateRange: DateRange?
    let ascending: Bool
    // ...
}
```
In most scenarios the above emitter would likely support pagination as described in the Paginated Publishers section, but this is left as an exercise for the reader. As usual the details of the data emitter implementation are not important, only its signature is.
Similar to what we did in the previous section we can now implement the view model of a basic search screen. This view model must support query and setting updates made by the user. We declare a pipeline in the view model initializer, starting our implementation with constant query and settings:
```swift
final class SearchViewModel: ObservableObject {
    enum State {
        case loading
        case loaded(medias: [Media])
        case failure(error: Error)
    }

    let query = ""
    let settings = Settings()

    @Published private(set) var state: State = .loading

    init() {
        Publishers.PublishAndRepeat(onOutputFrom: consolidatedReloadSignal()) { [query, settings] in
            return medias(matchingQuery: query, settings: settings)
                .map { State.loaded(medias: $0) }
                .catch { error in
                    return Just(State.failure(error: error))
                }
                .eraseToAnyPublisher()
        }
        .receive(on: DispatchQueue.main)
        .assign(to: &$state)
    }

    // ...
}
```
We implement pull-to-refresh and respond to the application being woken up by using the same `consolidatedReloadSignal()` signal introduced in the previous section (details are therefore omitted here). Note that the use of `Publishers.PublishAndRepeat(onOutputFrom:)` requires the query and settings to be accessible within the associated closure, which is here achieved through a capture list.
The query and settings must support updates made by the user, but simply turning the associated properties into `var`s does not work:

- Since `String` and `Settings` are value types, their initial values are captured. Updates will never be visible in the closure.
- There is no way to reload the pipeline in response to these values being changed.
The need to respond to change is a hint about how we can solve both problems. Instead of simple mutable properties, let us introduce `@Published` properties:
```swift
final class SearchViewModel: ObservableObject {
    // ...

    @Published var query = ""
    @Published var settings = Settings()

    // ...
}
```
The publishers associated with these properties can now be captured and inserted into the pipeline so that search results are updated when the query or settings change:
```swift
final class SearchViewModel: ObservableObject {
    enum State {
        case loading
        case loaded(medias: [Media])
        case failure(error: Error)
    }

    @Published var query = ""
    @Published var settings = Settings()

    @Published private(set) var state: State = .loading

    init() {
        Publishers.PublishAndRepeat(onOutputFrom: consolidatedReloadSignal()) { [$query, $settings] in
            Publishers.CombineLatest($query, $settings)
                .map { query, settings in
                    return medias(matchingQuery: query, settings: settings)
                }
                .switchToLatest()
                .map { State.loaded(medias: $0) }
                .prepend(State.loading)
                .catch { error in
                    return Just(State.failure(error: error))
                }
                .eraseToAnyPublisher()
        }
        .receive(on: DispatchQueue.main)
        .assign(to: &$state)
    }

    // ...
}
```
The above code deserves a brief explanation:
- Query and setting publishers are assembled using `Publishers.CombineLatest` so that any update received from them triggers a new search. Note that `Publishers.CombineLatest` requires each involved publisher to emit a value before it delivers its first result, which is here guaranteed since `@Published` publishers provide their initial value automatically upon subscription. The pipeline therefore always executes at least once.
- A usual `map` and `switchToLatest` combination is used to produce a single request matching the latest search parameters, ensuring prior requests are properly discarded.
- The `prepend` operator is used to reset the state to `State.loading` before each new search attempt.
As usual we can easily tweak this pipeline further, for example to avoid performing a new request if the query did not change, or to ensure requests are not immediately sent while the user is still typing or updating search settings:
```swift
final class SearchViewModel: ObservableObject {
    enum State {
        case loading
        case loaded(medias: [Media])
        case failure(error: Error)
    }

    @Published var query = ""
    @Published var settings = Settings()

    @Published private(set) var state: State = .loading

    init() {
        Publishers.PublishAndRepeat(onOutputFrom: consolidatedReloadSignal()) { [$query, $settings] in
            Publishers.CombineLatest($query.removeDuplicates(), $settings)
                .debounce(for: 0.3, scheduler: DispatchQueue.main)
                .map { query, settings in
                    return medias(matchingQuery: query, settings: settings)
                }
                .switchToLatest()
                .map { State.loaded(medias: $0) }
                .prepend(State.loading)
                .catch { error in
                    return Just(State.failure(error: error))
                }
                .eraseToAnyPublisher()
        }
        .receive(on: DispatchQueue.main)
        .assign(to: &$state)
    }

    // ...
}
```
Conclusion
In this article we illustrated how reactive programming and Combine can be used to build data delivery pipelines in a declarative way. The strategies we elaborated are not only scalable, but also eliminate challenges associated with shared mutable state. This neatly avoids issues commonly encountered when aggregating several asynchronous data sources using imperative block-based or async/await-based approaches.
Key concepts we introduced are signals and triggers, which can be inserted as control points into any pipeline, as well as a `wait(untilOutputFrom:)` operator which can be used to implement publishers natively supporting signal-based pagination. We also introduced `Publishers.AccumulateLatestMany` to work around limitations of `Publishers.CombineLatest`, especially when an arbitrary number of publishers must deliver their results in a specific order.
Finally, we applied this declarative formalism to view model building, opening the door to implementations where views and view models are both built declaratively.3 Exactly like a SwiftUI `View` is a view recipe, focusing on the desired result rather than on its actual implementation, a declarative pipeline is a data delivery recipe, and as such a perfect fit for view models.
The approach discussed in this article has of course a few drawbacks. Combine and reactive programming have a steep learning curve, and compiler messages might be cryptic, though experience definitely helps. Pipeline design also requires special care, as inserting the right operators at the right places can be quite surgical work. There is definitely an entry fee involved, but hopefully this article helped you realize the return might be worth the investment.
Finally, and though this discussion was focused on reactive programming with Combine, all the concepts introduced in this article could likely be transposed to any other declarative framework. For this reason I hope this article can be insightful outside Apple or Swift development communities as well, or useful to people using reactive frameworks like ReactiveSwift or RxSwift.
Sample Code
Sample code is provided on GitHub. The Combine toolset built in this article is provided as a Swift package, and the associated project implements a basic Netflix-like homepage as described in this article, built using SwiftUI. The underlying view model includes a few improvements in comparison to the one discussed here, mostly to display placeholders while content is loading or when a media list cannot be retrieved for some reason.
1. As of Xcode 13.2, async/await is backward compatible with the same minimum OS versions as Combine. ↩
2. Recall that we use a global scope for simplicity; in practice the trigger and row publishers would likely be stored in a class (hence the capture list used to make `trigger` accessible within the block). ↩
3. SwiftUI or UIKit diffable data sources and compositional layouts, for example, nicely support this philosophy. ↩