A good reactive programming library takes a huge maintenance burden off some of the most commonly written, bug-prone code in applications.
In the previous article I presented my own library for reactive programming, CwlSignal, but simple syntax examples don’t really demonstrate how to use reactive programming to solve problems.
In this article, I’ll explain why I consider reactive programming to be one of the most important design patterns for application programming by looking at three scenarios that are common in application development, yet are a drain on development time, lead to frequent bugs and make design and refactoring hard. I’ll show how reactive programming addresses the verbosity, eliminates the unsafety and restructures the code to aid maintainability.
The specific scenarios I’ll examine will be:
- A button whose
isEnabled
status is dependent on two different state values - A thread-safe storage type
- An asynchronous task with a timeout
A description of reactive programming
Reactive programming addresses the following problem:
Any “getter” for mutable state causes problems because it gives you the current value without ensuring you respond to the changes.
by suggesting the following solution:
Never store mutable state on your types. Instead, when you generate a new value in response to a change, send the value into a channel. Any part of the program that relies upon that value must subscribe to the channel.
The idea is that we remove state from exposed parts of our program and encapsulate it inside channels instead. Channels clearly show data dependencies and effects, make changes easy to reason about and easy to maintain and otherwise simplify how we modify application state.
Reactive programming supports this underlying principle with an approach that centers on serial and parallel compositions of channels to transform streams of data as they are emitted and merge changes that may occur concurrently or in otherwise intersecting patterns.
Simply put: reactive programming manages asynchronous data flows between sources of data and components that need to react to that data.
A button dependent on two state values
Let’s start with a fundamental task in application programming: setting the isEnabled
status of a button.
Imagine your app as a screen to upload files to a server. To do this, you have an “Upload” button. The “Upload” button should be enabled if (and only if) both of the following are true:
- the app is connected to a server
- the selection is non-empty
Let’s include two complicating factors:
- the connection status is typically updated on a background thread
- either the server connection or selection objects could completely change or even be set to nil while we’re observing them
Assuming updates for relevant values are sent using Key-Value-Observing then our view controller might contain the following code:
serverObs = sharedServer.observe(\.currentServer, options: .initial) {
[uploadButton, serverStatusButton] object, change in
uploadButton.isEnabled =
object.currentServer != nil && sharedSelection.currentSelection?.selection.isEmpty == false
}
selectionObs = sharedSelection.observe(\.currentSelection, options: .initial) {
[filesSelectedLabel, uploadButton, weak self] object, change in
if let s = object.currentSelection {
self?.selectionNameObs = s.observe(\.selection, options: .initial) {
[filesSelectedLabel, uploadButton] object, change in
uploadButton.isEnabled =
sharedServer.currentServer != nil && object.selection.isEmpty == false
}
} else {
uploadButton.isEnabled = false
}
}
It’s exhausting just to look at. The logic around updating three controls takes 18 lines of densely written code. The isEnabled
flag on the uploadButton
is updated in three different locations with subtly different logic in each location. And it’s still not enough to do things properly.
This code includes a few subtle but common mistakes that occur, not because we’re bad programmers but because we code for the path of least resistance. We fix problems if they’re immediately apparent but if the code mostly works – as this code does – it’s likely to get through testing.
What problems are in this code that could cause headaches later?
- Thread unsafe: with the server connection updating on a background thread, its closure is called on the background thread, resulting in unsafe access to the
uploadButton
. - Not transactional: both observation methods need to access the value reported by the other observation method, meaning that you might process a change twice or you might process changes in an inconsistent order
There are some other issue that are not bugs but are maintenance risks:
- It’s difficult to refactor: We update the
uploadButton.isEnabled
value in 3 different places. If another dependency were added to this value, we would need to remember to update all three locations. - We access values directly: Since this approach requires accessing changing values directly when other values change, it is possible to forget to observe some values at all, leading to change propagation failure.
- Nested observations: When the current selection changes, we need to observe the new selection. Handling observations inside observations is really clumsy and easy to get wrong.
In reactive programming, assuming all state values send their changes through reactive programming channels, rather than Key-Value-Observing, you’d only need the following:
let latestSelection = FileSelection.currentSelection
.flatMapLatest { curSel in curSel?.selection.map(Optional.some) ?? .just(nil) }
lifetimes += Server.currentServer
.combineLatest(latestSelection) { server, selection in server != nil && selection?.isEmpty == false }
.subscribeValues(context: .main) { [uploadButton] canUpload in uploadButton.isEnabled = canUpload }
The above problems are fixed:
- this code is completely threadsafe
- all notifications are issued in sequence without repeated processing of values
- no repeated logic
- no values accessed outside observation
- nested observation is handled through the dedicated
flatMapLatest
function which handles the clumsy aspects
Maintaining a threadsafe dictionary of values
That was a look at the “subscriber” end of a dependency. Let’s look at the “sender” end by taking a look at a threadsafe dictionary of values.
You might use a dictionary as the “model” in a trivial app. Even if more sophisticated storage than a dictionary is required, the pattern of updating and notifying shown here should be the same in any well written app.
Solving the problem with standard Cocoa classes DispatchQueue
and NotificationCenter
might look like the following:
class DocumentValues {
typealias Dict = Dictionary<AnyHashable, Any>
typealias Tuple = (AnyHashable, Any?)
static let changed = Notification.Name("com.mycompany.mymodule.documentvalues.changed")
// Underlying storage protected by a `DispatchQueue` mutex
private var storage = Dict()
private let mutex = DispatchQueue(label: "")
init() {}
// Access to the storage involves copying out of the mutex
var values: Dict {
return mutex.sync {
return storage
}
}
// Remove a value and send a change notification
func removeValue(forKey key: AnyHashable) {
let latest = mutex.sync { () -> Dict in
storage.removeValue(forKey: key)
return storage
}
NotificationCenter.default.post(name: DocumentValues.changed, object: self,
userInfo: latest)
}
// Create/change a value and send a change notification
func setValue(_ value: Any, forKey key: AnyHashable) {
let latest = mutex.sync { () -> Dict in
storage[key] = value
return storage
}
NotificationCenter.default.post(name: DocumentValues.changed, object: self,
userInfo: latest)
}
}
I have been carefully copying values in and out of the mutex and it is memory safe in all cases. The class uses notifications so that other interfaces can receive updates.
What’s the problem?
Let’s look at the key failings, again:
- Bad behavior is encouraged: It is easy for another interface to access the current
values
property but it is additional work to properly observe theDocumentValues.changed
notification, so you’re encouraging dependent interfaces to forget to properly observe changes and fall out-of-sync. - There is no safe way to initialize and subscribe: If you get the
values
then start observing notifications, it’s possible that a change could occur between these two actions (causing you to lose a notification). If you observe notifications first, then get thevalues
, you might get a first notification before you’ve properly initialized.NSKeyValueObservingOptions.initial
could fix the problem for KVO but withNotification
s, you’d need some clever coding to work around this problem. - Prone to deadlocks: The
removeValue
function onstorage
deletes an arbitrary value inside a mutex. If there is adeinit
on this deleted value and thedeinit
tries to change theDocumentValues
(re-entering the mutex), you’ve created a deadlock. - It’s difficult to refactor: There’s no single point that all changes to the
storage
go through. If you need to add functionality in future – like writingDocumentValues
to a file on each change – you’d have to carefully integrate this change into multiple places. - No lifecycle notifications: If the
DocumentValues
object is deleted, it doesn’t notify this, by default.
Many of these are the same or similar problems to the previous example. As before, these problems can be solved through additional careful coding but as before, each solution would require additional code and additional complexity and more than that: you’d need to first realize that the problem exists; due to the subtlety of all of these problems, you might not notice any issues during testing.
An implementation using reactive programming would replace the values
property with a channel that emits the current value, followed by future updates, as a stream.
class DocumentValues {
typealias Dict = Dictionary<AnyHashable, Any>
typealias Tuple = (AnyHashable, Any?)
private let input: SignalInput<Tuple>
// Access to the data is via the signal.
public let signal: SignalMulti<Dict>
init() {
// Actual values storage is encapsulated within the signal
(self.input, self.signal) = Signal<Tuple>.channel()
// All updates pass through this single, common function.
.map(initialState: [:]) { (state: inout Dict, update: Tuple) in
switch update {
case (let key, .some(let value)): state[key] = value
case (let key, .none): state.removeValue(forKey: key)
}
return state
}
// Convert single `Signal` into multi-subscribable `SignalMulti` with `continuous`
.continuous(initialValue: [:]).tuple
}
func removeValue(forKey key: AnyHashable) {
input.send((key, nil))
}
func setValue(_ value: Any, forKey key: AnyHashable) {
input.send((key, value))
}
}
The code size is not significantly different (27 non-blank, non-comment lines before versus 23 after) but in this case, every problem mentioned above is solved implicitly.
- The same work is involved in accessing a value once or subscribing properly so good behavior is encouraged.
- If separate handling of initial value and subsequent values is required (e.g. using a
capture
andsubscribe
sequence as described in the previous article) the stream is correctly paused so you can’t miss a notification. - Everything is threadsafe (the
map
closure will never be concurrently invoked and re-entrancy is not possible) - All changes go through the
map
function and can be coordinated there. - A
SignalError.cancelled
message is automatically sent to subscribers ifinput
is released.
Associated with being “threadsafe”, notice that there are no longer any mutable variables in the class; state is encapsulated inside the Signal
.
Not only is this less code than the previous class but there are far fewer implementation mistakes you could make; code is more declarative and contained and there’s no mutex to carefully administer and work around.
An asynchronous task with a timeout
I originally gave the following code in a previous article, Testing Actions over Time. The code contains a class with a start
function that does two things:
- Invokes a
work
function, which takes a callback and invokes it on completion - Starts a timer, which, if it fires before the
work
function invokes its completion handler, cancels thework
function.
I’ve made the class a little complicated by allowing the user to call start
multiple times on the Service
class – possibly while a previous call to start
still has asynchronous tasks outstanding.
class TimeoutService {
struct Timeout: Error {}
var currentAction: Lifetime? = nil
// Define the interface for the underlying connection
typealias ResultHandler = (Result<String>) -> Void
typealias WorkFunction = (DispatchQueue, @escaping ResultHandler) -> Lifetime
// This is the configurable connection to the underlying service
let work: WorkFunction
// Every action for this service should occur in in this queue
let queue = DispatchQueue(label: "\(TimeoutService.self)")
// Construction of the Service lets us specify the underlying service
init(work: @escaping WorkFunction) {
self.work = work
}
// This `Service` invokes the `underlyingConnect` and starts a timer
func start(timeout seconds: Double, handler: @escaping ResultHandler) {
var previousAction: Lifetime? = nil
queue.sync {
previousAction = self.currentAction
let current = AggregateLifetime()
// Run the underlying connection
let underlyingAction = self.work(self.queue) { [weak current] result in
// Cancel the timer if the success occurs first
current?.cancel()
handler(result)
}
// Run the timeout timer
let timer = DispatchSource.singleTimer(interval: .interval(seconds), queue: self.queue) {
[weak current] in
// Cancel the connection if the timer fires first
current?.cancel()
handler(.failure(Timeout()))
} as! DispatchSource
current += timer
current += underlyingAction
self.currentAction = current
}
// Good rule of thumb: never release lifetime objects inside a mutex
withExtendedLifetime(previousAction) {}
}
}
Now, this code works and as far as I’m aware, there are no bugs. But it’s gigantic given that all it is doing is applying a timeout to an underlying function.
Most of the size is due to careful coding to avoid problems. After the start
function starts the work
and the timer
, it needs to aggregate the lifetimes of both and store in the currentAction
. There’s some careful handling of the previousAction
which is released outside the queue.sync
(to prevent re-entrancy deadlock problems). Both the underlyingAction
’s handler closure and the timer
’s handler closure need to access the current
lifetime (to cancel things properly) and since this lifetime retains the closures themselves, there’s some weak reference juggling going on too.
We shouldn’t need to carefully code around so many issues. We want the code smaller. We want it simpler.
Let’s try the whole thing with reactive programming.
struct TimeoutService {
let startWithTimeout: SignalMultiInput<DispatchTimeInterval>
let signal: SignalMulti<Result<String>>
init(asynchronousWork: @escaping () -> Signal<String>) {
(startWithTimeout, signal) = Signal<DispatchTimeInterval>.multiChannel()
.map { seconds in
asynchronousWork()
.timeout(interval: seconds)
.materialize()
}
.switchLatest()
.multicast()
.tuple
}
}
The difference is astounding; it doesn’t even resemble the original class. However, this class does the same thing, just over reactive programming channels, rather than over callbacks with state and mutexes. With reactive programming, all of the threading and lifetime management that caused a huge burden for the previous implementation are implicit – they’re still there but they’re just handled internally.
The two implementations aren’t identical. In this implementation, inputs and outputs follow reactive conventions. Instead of calling a start
function, the asynchronous work is started by sending a timeout time to the startWithTimeout
input. Instead of passing a callback function to the start
function to receive completion, we can subscribe to the signal
.
Getting comfortable with common reactive programming conventions and operators like switchLatest
and materialize
can take a little time but these functions follow the behavior of typical ReactiveX implementations so you can also consult that documentation to gain additional insight.
In case you think using a built-in function for the timeout
is cheating in this comparison, it is simple to replace the asynchronousWork().timeout(interval: seconds).materialize()
code with:
let timer = Signal<()>.timer(interval: .seconds(10))
return asynchronousWork().combine(second: timer) { eitherSignal, next in
switch eitherSignal {
case .result1(let r): next.send(result: r)
case .result2: next.send(error: MyErrors.timeout)
}
}.materialize()
It’s a little more verbose but still not complicated.
Conclusion
Reactive programming changes how data is stored, how it flows through your program and how the elements of your program are connected. The result is significant improvement across the following categories:
- Thread safety
- Coordinating concurrent asynchronous tasks
- Loose coupling of components
- Data dependencies
The biggest advantage comes when you realize that in applying a solution to just one of these problems, you’ve gained a solution to the other three for free.
For situations where the code already addresses these issues properly, reactive programming can deliver significant lines-of-code savings.
The end result is code that is easier to write and easier to maintain.
Looking forward…
There’s a lot of carefully written code in the CwlSignal library. Even if – despite all the discussion in this article – you’re not interested in reactive programming, there’s lots to learn about how CwlSignal solves problems internally like avoiding re-entrancy and integrating with Key-Value-Observing.
Appendix: A little bit of history
Even though the terms “reactive” and “reactive systems” have been in use in programming at least since Alan Kay’s 1969 paper The Reactive Engine, modern use of the term “reactive programming” really refers to ideas started in Conal Elliot and Paul Hudak’s 1997 paper Function Reactive Animation.
Functional reactive animation, later called functional reactive programming (FRP) is about “behaviors” (continuous, time-dependent equations like those describing the position of a moving object) and how they change and interact with “events” (one-off occurrences at an instantaneous time).
All of this might have been irrelevant to imperative programming languages since pull-driven, lazily evaluated “behaviors” (the primary output of functional reactive programming) are uninteresting for most imperative programming which can use push-driven stateful calculations to produce the same results in a simpler way.
It turns out, however, that the patterns that functional reactive programming provides for handling, transforming and responding to “events” (side effects that functional reactive programming tries to minimize) easily carry across to imperative languages and remain similarly useful for managing and responding to streams of discrete events – even when those streams are predominantly push driven, as in imperative programming, rather than lazily evaluated pull driven, as is typical in functional languages.
Thus we get “reactive programming” (note the absence of “functional”) which focuses on emitting, transforming and subscribing to streams of events.
The biggest difference between functional reactive and imperative reactive is ultimately “time”. In functional reactive programming, time is a parameter passed into each calculation. In imperative reactive programming, time may be pulled from the context but the actual time is usually less important than the order in which relevant mutexes are acquired. This means that functional reactive programming operates a bit more like a simulation (“if two objects should happen to interact in this way at this time…") whereas reactive programming is more like a simple ordering of blocks into a sequence by the order that they happen to be processed. One of the effect of this is that functional reactive programming can handle two events that are literally simultaneous whereas imperative reactive programming has no such notion of simultaneity.
While there may have been prior implementations of reactive programming in an imperative languages, the first popular implementation is the Reactive Extensions (Rx) for .NET which was first released in late 2009.
The Reactive Extensions largely mimic the .NET framework’s IObservable
as the primary metaphor. Along with integration of IDisposable
and ideas from IEnumerable
and LINQ. Many implementations of reactive programming on imperative platforms continue to employ terminology like “Observable” and “Disposable” for the core communication and lifetime protocols, respectively, revealing their lineage from the Reactive Extensions.
CwlSignal, a library for reactive programming
Reference counted releases in Swift