RxSwift Operators

Darren Sapalo

April 22nd, 2016

In the previous article, we talked about how the Rx framework for Swift helps with asynchronous tasks: creating an observable from a network request, dealing with streams of data, handling errors, and displaying successfully retrieved data on the main thread after fetching it on a background thread. This article shows how to take advantage of the operators on observables to transform data.

Hot and Cold Observables

There are different ways to create observables, and we saw one of them previously: the Observable.create method. Conveniently, RxSwift also provides an extension on sequences such as arrays and dictionaries: the toObservable method.

var data = ["alpha" : ["title":"Doctor Who"], "beta" : ["title":"One Punch Man"]]
var dataObservable = data.toObservable()

Note, however, that the code inside the closure passed to Observable.create does not run when you call the method. This is because it creates a Cold Observable: an observer must subscribe to the observable before the code segment defined in Observable.create runs. For the example in the previous article, this means that calling Observable.create won’t trigger the network query until an observer subscribes to the Observable. IntroToRx explains Hot and Cold Observables in more detail in its article.
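The subscribe-to-run behaviour can be illustrated without RxSwift at all. The sketch below is a plain-Swift stand-in, not RxSwift's implementation: ColdSource, requestCount, and the emitted titles are hypothetical names made up for illustration. The closure is stored when the source is created and only executed when subscribe is called.

```swift
// Hypothetical stand-in for a cold observable; this is not RxSwift's implementation.
struct ColdSource<Element> {
    // The work to perform for each subscriber. It is stored here, not executed.
    let onSubscribe: (@escaping (Element) -> Void) -> Void

    // Subscribing is what finally runs the stored work.
    func subscribe(_ onNext: @escaping (Element) -> Void) {
        onSubscribe(onNext)
    }
}

var requestCount = 0  // counts how often the pretend "network query" runs

let titles = ColdSource<String>(onSubscribe: { emit in
    requestCount += 1  // stands in for the side effect of a network call
    emit("Doctor Who")
    emit("One Punch Man")
})

let countBeforeSubscribe = requestCount  // creating the source ran nothing

var received: [String] = []
titles.subscribe { title in
    received.append(title)
}

let countAfterSubscribe = requestCount  // subscribing ran the stored closure once

print(countBeforeSubscribe, countAfterSubscribe, received)
```

Each additional call to subscribe would run the stored closure again, which is why a cold source repeats its work for every observer.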

Rx Operators

When you begin to work with observables, you’ll realize that RxSwift provides numerous functions that encourage you to think of processing data as streams or sequences. For example, you might want to filter an array of numbers to get only the even numbers. You can do this using the filter operator on an observable.

let data = [1, 2, 3, 4, 5, 6, 7, 8]
let dataObservable = data.toObservable().filter { (elem: Int) -> Bool in
  // Keep only the even numbers
  return elem % 2 == 0
}

dataObservable.subscribeNext { (elem: Int) in
  print("Element value: \(elem)")
}

Chaining Operators

These operators can be chained together, and the result is much more readable (and easier to debug) than the nested code caused by numerous callbacks. For example, I might want to query a list of news articles, keep only the ones after a certain date, and take only three to be displayed at a time.

API.rxGetAllNews()
   .filter { (elem: News) -> Bool in
     // Keep only articles dated after dateParam
     return elem.date.compare(dateParam) == .OrderedDescending
   }
   .take(3)
   .subscribe(
     onNext: { (elem: News) in
       print(elem.description)
     }
   )
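To see which items a filter followed by take(3) would select, the same chain can be tried on a plain Swift array first. This is only a sketch under assumptions: the NewsItem struct, its fields, and the sample timestamps are hypothetical, and Swift's own filter and prefix stand in for the Rx operators.

```swift
import Foundation

// Hypothetical model; the article's real News type is not shown.
struct NewsItem {
    let title: String
    let date: Date
}

// Sample data with made-up timestamps (seconds since 1970).
let allNews = [
    NewsItem(title: "A", date: Date(timeIntervalSince1970: 500)),
    NewsItem(title: "B", date: Date(timeIntervalSince1970: 1_500)),
    NewsItem(title: "C", date: Date(timeIntervalSince1970: 2_000)),
    NewsItem(title: "D", date: Date(timeIntervalSince1970: 800)),
    NewsItem(title: "E", date: Date(timeIntervalSince1970: 2_500)),
    NewsItem(title: "F", date: Date(timeIntervalSince1970: 3_000)),
]

let dateParam = Date(timeIntervalSince1970: 1_000)

// Keep items dated after dateParam, then take at most the first three.
let shownTitles = allNews
    .filter { $0.date > dateParam }
    .prefix(3)
    .map { $0.title }

print(shownTitles)
```

Here shownTitles holds B, C, and E: A and D are removed by the date filter, and F is dropped by the limit of three.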

Elegantly Handling Errors

Rx gives you control over your data streams so that you can handle errors more easily. For example, your network call might fail because there is no network connection. Some applications work better if they then fall back to the data available on the local device. You can check the type of error (e.g. no server response), substitute another Rx Observable for the failed stream, and keep the same observer code.

API.rxGetAllNews()
   .filter { (elem: News) -> Bool in
     return elem.date.compare(dateParam) == .OrderedDescending
   }
   .take(3)
   .catchError { (e: ErrorType) -> Observable<News> in
     // On failure, switch to the cached news; the fallback must also emit News
     return LocalData.rxGetAllNewsFromCache()
   }
   .subscribe(
     onNext: { (elem: News) in
       print(elem.description)
     }
   )
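The idea of checking the error type before falling back can be sketched in plain Swift with throws and do/catch, as a synchronous analogue of the stream replacement. Everything here is hypothetical and made up for illustration: FetchError, fetchRemoteTitles, loadCachedTitles, and the sample headlines are not part of RxSwift or of the article's API.

```swift
// Hypothetical error type and data sources, made up for illustration.
enum FetchError: Error {
    case noConnection
    case badResponse
}

// Pretend network call: fails when the device is offline.
func fetchRemoteTitles(online: Bool) throws -> [String] {
    guard online else { throw FetchError.noConnection }
    return ["Fresh headline"]
}

// Pretend local cache.
func loadCachedTitles() -> [String] {
    return ["Cached headline"]
}

// Try the network first; fall back to the cache only when offline.
func loadTitles(online: Bool) -> [String] {
    do {
        return try fetchRemoteTitles(online: online)
    } catch FetchError.noConnection {
        return loadCachedTitles()  // same element type, so callers are unaffected
    } catch {
        return []                  // other errors: nothing to show in this sketch
    }
}

let onlineTitles = loadTitles(online: true)
let offlineTitles = loadTitles(online: false)

print(onlineTitles, offlineTitles)
```

Because both sources return the same type, the code that consumes the titles does not need to know which one supplied them. The same constraint applies to the replacement stream in the Rx version.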

Cleaning up Your Data

One case where Rx proved useful to me was when I was retrieving JSON data from a server and the data had some items that needed to be merged.

The data looked something like below:

[ ["name": "apple", "count": 4],
  ["name": "orange", "count": 6],
  ["name": "grapes", "count": 4],
  ["name": "flour", "count": 2],
  ["name": "apple", "count": 7],
  ["name": "flour", "count": 1.3]
]

The problem was that I needed to update my local data based on the total of these quantities, not create multiple rows/instances in my database! First, I transformed the JSON array entries into an observable that emits each element.

class func dictToObservable(dict: [NSDictionary]) -> Observable<NSDictionary> {
  return Observable.create{ observer in
    dict.forEach({ (e:NSDictionary) -> () in
      observer.onNext(e)
    })
    observer.onCompleted()
    return NopDisposable.instance
  }
}

Afterwards, I created the observable and applied the reduce operator to merge the data.

class func mergeDuplicates(dict: [NSDictionary]) -> Observable<[NSMutableDictionary]> {
  let observable = dictToObservable(dict) as Observable<NSDictionary>

  return observable.reduce([],
  accumulator: { (var result, elem: NSDictionary) -> [NSMutableDictionary] in
    // Find entries already in the accumulator that share this element's name
    let filteredSet = result.filter({ (filteredElem: NSDictionary) -> Bool in
      return filteredElem.valueForKey("name") as! String == elem.valueForKey("name") as! String
    })

    if filteredSet.count > 0 {
      // Name already seen: add this count to the existing entry
      if let element = filteredSet.first {
        let a = NSDecimalNumber(decimal: (element.valueForKey("count") as! NSNumber).decimalValue)
        let b = NSDecimalNumber(decimal: (elem.valueForKey("count") as! NSNumber).decimalValue)
        element.setValue(a.decimalNumberByAdding(b), forKey: "count")
      }
    } else {
      // First occurrence: store a mutable copy with a decimal count
      let m = NSMutableDictionary(dictionary: elem)
      m.setValue(NSDecimalNumber(decimal: (elem.valueForKey("count") as! NSNumber).decimalValue), forKey: "count")
      result.append(m)
    }

    return result
  })
}

I initialized the accumulator to [], an empty array. Then, for each element emitted by the observable, I filtered the accumulator (result) to check whether an entry with the same name already exists.

If filteredSet contains at least one entry, the name already exists. In that case, element is the instance inside the result whose count should be updated. Because it is a reference to an NSMutableDictionary, updating it also updates my accumulator (result).

If it doesn’t exist, a new entry is added to the result. After each element is processed, the accumulator (result) is returned and passed in again with the next emission. After the last element, it becomes the final result of processing the data sequence.
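The merging logic itself does not depend on Rx, so it can be tried on a plain array. The following is a minimal sketch, assuming a hypothetical Item struct in place of the JSON dictionaries and using Swift's standard reduce(into:) rather than the Rx reduce operator. Since Item is a value type, the matching entry is updated by index instead of through a shared reference.

```swift
import Foundation

// Hypothetical value type standing in for the JSON dictionaries.
struct Item: Equatable {
    let name: String
    var count: Decimal
}

func mergeDuplicateItems(_ items: [Item]) -> [Item] {
    // Start from an empty accumulator and fold each item into it.
    return items.reduce(into: [Item]()) { result, item in
        if let index = result.firstIndex(where: { $0.name == item.name }) {
            // Name already seen: add this count to the existing entry.
            result[index].count += item.count
        } else {
            // First occurrence: append it as a new entry.
            result.append(item)
        }
    }
}

// The sample data from the JSON above; 1.3 is parsed from a string to stay exact.
let rawItems = [
    Item(name: "apple", count: 4),
    Item(name: "orange", count: 6),
    Item(name: "grapes", count: 4),
    Item(name: "flour", count: 2),
    Item(name: "apple", count: 7),
    Item(name: "flour", count: Decimal(string: "1.3")!),
]

let mergedItems = mergeDuplicateItems(rawItems)

for item in mergedItems {
    print(item.name, item.count)
}
```

The merged list keeps the order in which each name first appeared, with the two apple rows and the two flour rows each combined into one.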

Where Do I Go From Here?

The Rx community is slowly growing with more and more people contributing to the documentation and bringing it to their languages and platforms.

I highly suggest you go straight to their website and documentation for a more thorough introduction to their framework. This gentle introduction to Rx was meant to prepare you for the wealth of knowledge and great design patterns they have provided in the documentation!

If you’re having difficulty understanding streams, sequences, and what the operators do, RxMarbles.com provides interactive diagrams for some of the Rx operators. It’s an intuitive way to play with Rx at a high level without touching any code. Go check them out! RxMarbles is also available on the Android platform.

About the Author

Darren Sapalo is a software developer, an advocate for UX, and a student taking up his Master's degree in Computer Science. He enjoyed developing games in his free time when he was twelve. Having finished his undergraduate thesis on computer vision, he took up industry work with Apollo Technologies Inc., developing for both the Android and iOS platforms.