Parallelizing processing with pmap
The easiest way to parallelize data processing is to take a loop we already have and handle each of its items in a separate thread.
That is essentially what pmap does. If we replace a call to map with pmap, it executes each call to the function argument in a thread pool. pmap is not completely lazy, but it's not completely strict, either: it stays just ahead of the output being consumed. So if the output is never used, it won't be fully realized.
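To make the swap concrete, here is a minimal, self-contained sketch. The slow-inc function is hypothetical, standing in for any expensive per-item computation:

```clojure
;; A hypothetical expensive task: sleep to simulate work.
(defn slow-inc [x]
  (Thread/sleep 100)
  (inc x))

;; Sequential: roughly 8 * 100 ms.
(time (doall (map slow-inc (range 8))))

;; Parallel: the identical call with pmap. With several cores, this
;; finishes in one or two batches of 100 ms instead of eight.
(time (doall (pmap slow-inc (range 8))))
```

The doall calls force the lazy sequences so that time measures the actual work rather than just building a lazy seq.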
For this recipe, we'll calculate the Mandelbrot set. Each point in the output takes enough time that this is a good candidate to parallelize. We can just swap map for pmap and immediately see a speedup.
How to do it...
The Mandelbrot set can be found by looking for the points that don't quickly settle on a value when passed repeatedly through the formula that defines the set.

We need a function that takes a point and the maximum number of iterations to try, and returns the iteration that the point escapes on. Escaping just means that the value gets above 4.
(defn get-escape-point
  [scaled-x scaled-y max-iterations]
  (loop [x 0, y 0, iteration 0]
    (let [x2 (* x x), y2 (* y y)]
      (if (and (< (+ x2 y2) 4)
               (< iteration max-iterations))
        (recur (+ (- x2 y2) scaled-x)
               (+ (* 2 x y) scaled-y)
               (inc iteration))
        iteration))))
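Before moving on, here are two quick sanity checks on this function, with the definition repeated so that the snippet stands alone. The sample points are arbitrary: a point inside the set never escapes and so returns the maximum iteration count, while a point far outside escapes almost immediately.

```clojure
(defn get-escape-point [scaled-x scaled-y max-iterations]
  (loop [x 0, y 0, iteration 0]
    (let [x2 (* x x), y2 (* y y)]
      (if (and (< (+ x2 y2) 4) (< iteration max-iterations))
        (recur (+ (- x2 y2) scaled-x)
               (+ (* 2 x y) scaled-y)
               (inc iteration))
        iteration))))

(get-escape-point 0.0 0.0 100)   ; the origin is in the set => 100
(get-escape-point 2.0 2.0 100)   ; escapes on the first iteration => 1
```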

The scaled points are the pixel points in the output, scaled to relative positions in the Mandelbrot set. Here are the functions that handle the scaling. Along with a particular x-y coordinate in the output, they're given the range of the set and the number of pixels in each direction.
(defn scale-to
  ([pixel maximum [lower upper]]
   (+ (* (/ pixel maximum)
         (Math/abs (- upper lower)))
      lower)))

(defn scale-point
  ([pixel-x pixel-y max-x max-y set-range]
   [(scale-to pixel-x max-x (:x set-range))
    (scale-to pixel-y max-y (:y set-range))]))

The function output-points returns a sequence of x, y values for each of the pixels in the final output.
(defn output-points
  ([max-x max-y]
   (let [range-y (range max-y)]
     (mapcat (fn [x] (map #(vector x %) range-y))
             (range max-x)))))

For each output pixel, we need to scale it to a location in the range of the Mandelbrot set and then get the escape point for that location.
(defn mandelbrot-pixel
  ([max-x max-y max-iterations set-range]
   (partial mandelbrot-pixel max-x max-y max-iterations set-range))
  ([max-x max-y max-iterations set-range [pixel-x pixel-y]]
   (let [[x y] (scale-point pixel-x pixel-y max-x max-y set-range)]
     (get-escape-point x y max-iterations))))

At this point, we can simply map mandelbrot-pixel over the results of output-points. We'll also pass in the mapping function to use (map or pmap).
(defn mandelbrot
  ([mapper max-iterations max-x max-y set-range]
   (doall (mapper (mandelbrot-pixel max-x max-y max-iterations set-range)
                  (output-points max-x max-y)))))

Finally, we have to define the range that the Mandelbrot set covers.
(def mandelbrot-range {:x [-2.5, 1.0], :y [-1.0, 1.0]})
How do these two compare? A lot depends on the parameters we pass them.
user=> (def m (time (mandelbrot map 500 1000 1000 mandelbrot-range)))
"Elapsed time: 28981.112 msecs"
#'user/m
user=> (def m (time (mandelbrot pmap 500 1000 1000 mandelbrot-range)))
"Elapsed time: 34205.122 msecs"
#'user/m
user=> (def m (time (mandelbrot map 1000 1000 1000 mandelbrot-range)))
"Elapsed time: 85308.706 msecs"
#'user/m
user=> (def m (time (mandelbrot pmap 1000 1000 1000 mandelbrot-range)))
"Elapsed time: 49067.584 msecs"
#'user/m
Refer to the following chart:
If we only iterate at most 500 times for each point, it's slightly faster to use map and work sequentially. However, if we iterate 1,000 times each, pmap is faster.
How it works...
This shows that parallelization is a balancing act. If each separate work item is small, the overhead of creating the threads, coordinating them, and passing data back and forth takes more time than doing the work itself. However, when each thread has enough to do to make it worth it, we can get nice speedups just by using pmap.
Behind the scenes, pmap takes each item and uses future to run it in a thread pool. It forces only a couple more items than you have processors, so it keeps your machine busy, without generating more work or data than you need.
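We can observe this semi-lazy behavior directly. In this small sketch (the atom and function names are our own), realizing only the first result starts a limited window of tasks rather than all of them:

```clojure
;; Count how many tasks have actually started.
(def started (atom 0))

(def results
  (pmap (fn [x] (swap! started inc) x)
        (range 1000)))

;; Realize just the first item...
(first results)

;; ...and only a window of tasks has been started, not all 1000.
;; (The exact number depends on core count and sequence chunking.)
@started
```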
There's more...
For an excellent, in-depth discussion of the nuts and bolts of pmap, along with pointers about things to watch out for, see David Liebke's talk, From Concurrency to Parallelism (http://blip.tv/clojure/david-liebke-from-concurrency-to-parallelism-4663526).
See also

The Partitioning Monte Carlo Simulations for better pmap performance recipe
Parallelizing processing with Incanter
Incanter is a Clojure library and environment for statistical computing and graphics. One of its nice features is that it uses the Parallel Colt Java library (http://sourceforge.net/projects/parallelcolt/) to actually handle its processing, so when you use a lot of the matrix, statistical, or other functions, they're automatically executed on multiple threads.
For this, we'll revisit the Virginia housing unit census data, and we'll fit a linear regression to it.
Getting ready
We'll need to add Incanter to our list of dependencies in our Leiningen project.clj file:
:dependencies [[org.clojure/clojure "1.5.0"] [incanter "1.3.0"]]
We'll also need to pull those libraries into our REPL or script:
(use '(incanter core datasets io optimize charts stats))
We can use the following filename:
(def data-file "data/all_160_in_51.P35.csv")
How to do it...
For this recipe, we'll extract the data to analyze and perform the linear regression. We'll then graph the data afterwards.

First, we'll read in the data and pull the population and housing unit columns into their own matrix.
(def data
  (to-matrix
    (sel (read-dataset data-file :header true)
         :cols [:POP100 :HU100])))

From this matrix, we can bind the population and the housing unit data to their own names.
(def population (sel data :cols 0))
(def housing-units (sel data :cols 1))

Now that we have those, we can use Incanter to fit the data.
(def lm (linear-model housing-units population))

Incanter also makes it easy to chart the data and the fitted line, so let's take a look at it.
(def plot (scatter-plot population housing-units :legend true))
(add-lines plot population (:fitted lm))
(view plot)
Here we can see that the graph of housing units against population makes a very straight line:
How it works…
Under the covers, Incanter takes the data matrix and partitions it into chunks. It then spreads those over the available CPUs to speed up processing. Of course, we don't have to worry about this. That's part of what makes Incanter so powerful.
Partitioning Monte Carlo simulations for better pmap performance
In the Parallelizing processing with pmap recipe, we found that while using pmap is easy enough, knowing when to use it is more complicated. Processing each task in the collection has to take enough time to make the costs of threading, coordinating processing, and communicating the data worth it. Otherwise, the program will spend more time concerned with how (parallelization) and not enough time with what (the task).
The way to get around this is to make sure that pmap has enough to do at each step that it parallelizes. The easiest way to do that is to partition the input collection into chunks and run pmap on groups of the input.
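As a generic sketch of this idea (the helper name is ours, not part of any library), we can partition the input with partition-all and hand pmap one chunk per task, combining the per-chunk results at the end:

```clojure
;; Sum (f x) over coll, but give each pmap task a whole chunk of
;; items instead of a single one.
(defn chunked-pmap-sum [f chunk-size coll]
  (reduce + 0
          (pmap (fn [chunk] (reduce + 0 (map f chunk)))
                (partition-all chunk-size coll))))

;; Same answer as (reduce + (map f coll)), but each thread now has
;; chunk-size items of work per task.
(chunked-pmap-sum inc 512 (range 1000))   ; => 500500
```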
For this recipe, we'll use Monte Carlo methods to approximate pi. We'll compare a serial version, a naïve parallel version, and a version that parallelizes over partitioned input.
Getting ready
We'll use Criterium to handle benchmarking, so we'll need to include it as a dependency in our Leiningen project.clj file, shown as follows:
:dependencies [[org.clojure/clojure "1.5.0"] [criterium "0.3.0"]]
We'll use these dependencies and the java.lang.Math class in our script or REPL.
(use 'criterium.core)
(import [java.lang Math])
How to do it…
To implement this, we'll define some core functions and then implement a Monte Carlo method for estimating pi that uses pmap.

We need to define the functions necessary for the simulation. We'll have one that generates a random two-dimensional point that will fall somewhere in the unit square.
(defn rand-point [] [(rand) (rand)])

Now, we need a function to return a point's distance from the origin.
(defn center-dist [[x y]]
  (Math/sqrt (+ (* x x) (* y y))))

Next, we'll define a function that takes a number of points to process and creates that many random points. It will return the number of points that fall inside a circle.
(defn count-in-circle [n]
  (->> (repeatedly n rand-point)
       (map center-dist)
       (filter #(<= % 1.0))
       count))

That simplifies our definition of the base (serial) version. This calls count-in-circle to get the proportion of random points in a unit square that fall inside a circle. It multiplies this by 4, which should approximate pi.
(defn mc-pi [n]
  (* 4.0 (/ (count-in-circle n) n)))

We'll use a different approach for the simple pmap version. The function that we'll parallelize will take a point and return 1 if it's in the circle, or 0 if not. Then we can add those up to find the number in the circle.
(defn in-circle-flag [p]
  (if (<= (center-dist p) 1.0) 1 0))

(defn mc-pi-pmap [n]
  (let [in-circle (->> (repeatedly n rand-point)
                       (pmap in-circle-flag)
                       (reduce + 0))]
    (* 4.0 (/ in-circle n))))

For the version that chunks the input, we'll do something different again. Instead of creating the sequence of random points and partitioning that, we'll have a sequence that tells how large each partition should be and have pmap walk across it, calling count-in-circle. This means that creating the larger sequences is also parallelized.
(defn mc-pi-part
  ([n] (mc-pi-part 512 n))
  ([chunk-size n]
   (let [step (int (Math/floor (float (/ n chunk-size))))
         remainder (mod n chunk-size)
         parts (lazy-seq (cons remainder (repeat step chunk-size)))
         in-circle (reduce + 0 (pmap count-in-circle parts))]
     (* 4.0 (/ in-circle n)))))
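One way to convince ourselves that the partitioning arithmetic is right: the remainder plus the full chunks should add up to n. Here is that check, with the same step and remainder computations inlined:

```clojure
;; The chunk sizes handed to pmap should sum to the total number of
;; points requested.
(let [n 1000000
      chunk-size 512
      step (int (Math/floor (float (/ n chunk-size))))
      remainder (mod n chunk-size)
      parts (cons remainder (repeat step chunk-size))]
  (reduce + parts))   ; => 1000000
```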
Now, how do these perform? We'll bind our parameters to names, and then we'll run one set of benchmarks before we look at a table of all of them. We'll discuss the results in the next section.
user=> (def chunk-size 4096)
#'user/chunk-size
user=> (def input-size 1000000)
#'user/input-size
user=> (quick-bench (mc-pi input-size))
WARNING: Final GC required 4.001679309213317 % of runtime
Evaluation count : 6 in 6 samples of 1 calls.
             Execution time mean : 634.387833 ms
    Execution time std-deviation : 33.222001 ms
   Execution time lower quantile : 606.122000 ms ( 2.5%)
   Execution time upper quantile : 677.273125 ms (97.5%)
nil
Here's all the information in the form of a table:
Function      Input Size   Chunk Size   Mean        Std Dev.    GC Time
mc-pi         1,000,000    NA           634.39 ms   33.22 ms    4.0%
mc-pi-pmap    1,000,000    NA           1.92 sec    888.52 ms   2.60%
mc-pi-part    1,000,000    4,096        455.94 ms   4.19 ms     8.75%
Here's a chart with the same information:
How it works…
There are a couple of things we should talk about here. Primarily, we'll need to look at chunking the inputs for pmap, but we should also discuss Monte Carlo methods.
Estimating with Monte Carlo simulations
Monte Carlo simulations work by throwing random data at a problem that is fundamentally deterministic but practically infeasible to solve in a more straightforward way. Calculating pi is one example of this. If we randomly fill in points in a unit square, the ratio of points that fall within a circle centered on (0, 0) will be approximately pi/4. The more random points we use, the better the approximation.
I should note that this makes a good demonstration of Monte Carlo methods, but it's a terrible way to calculate pi. It tends to be both slower and less accurate than the other methods.
Although not good for this task, Monte Carlo methods have been used for designing heat shields, simulating pollution, ray tracing, financial option pricing, evaluating business or financial products, and many, many more things.
For a more in-depth discussion, Wikipedia has a good introduction to Monte Carlo methods at http://en.wikipedia.org/wiki/Monte_Carlo_method.
Chunking data for pmap
The table we saw earlier makes it clear that partitioning helped: the partitioned version took just 72 percent of the time that the serial version did, while the naïve parallel version took more than three times longer. Based on the standard deviations, the results were also more consistent.
The speedup comes from each thread being able to spend longer on each task. There is a performance penalty for spreading the work over multiple threads. Context switching (that is, switching between threads) costs time, and so does coordinating between threads. However, we expect to make up that time, and more, by doing several things at once. If each task doesn't take long enough, though, the benefit won't outweigh the costs. Chunking the input, which effectively creates larger individual tasks for each thread, gets around this by giving each thread more to do, so that less of the total time is spent context switching and coordinating, relative to the time spent running.
Finding the optimal partition size with simulated annealing
In the last recipe, Partitioning Monte Carlo simulations for better pmap performance, we more or less guessed what would make a good partition size. We tried a few different values and saw which gave us the best result. However, that's still largely guesswork, since simply making the partitions larger or smaller doesn't give consistently better or worse results.
This is the type of task that computers are good at: searching a complex space to find the function parameters that result in an optimal output value. For this recipe, we'll use a fairly simple optimization algorithm called simulated annealing. Like many optimization algorithms, this one is based on a natural process: the way that molecules settle into low-energy configurations as the temperature drops to freezing. This is what allows water to form efficient crystal lattices as it freezes.
In simulated annealing, we feed a state to a cost function. At each point, we evaluate a random neighboring state, and possibly move to it. As the energy in the system (the temperature) goes down, we are less likely to jump to a new state, especially if that state is worse than the current one, according to the cost function. Finally, after either reaching a target output or iterating through a set number of steps, we take the best match found. Like many optimization algorithms, this doesn't guarantee that the result will be the absolute best match, but it should be a good one.
For this recipe, we'll use the Monte Carlo pi approximation function that we did in the Partitioning Monte Carlo simulations for better pmap performance recipe, and we'll use simulated annealing to find a better partition size.
Getting ready
We'll need to use the same dependencies, uses, imports, and functions as we did in the Partitioning Monte Carlo simulations for better pmap performance recipe. In addition, we'll also need the mcpipart function from that recipe.
How to do it...
For this recipe, we'll first define a generic simulated annealing system, and then we'll define some functions to pass it as parameters.

Everything will be driven by the annealing function, which takes all the function parameters for the process as arguments. We'll discuss them in more detail in a minute.
(defn annealing
  [initial max-iter max-cost neighbor-fn cost-fn p-fn temp-fn]
  (let [get-cost (memoize cost-fn)
        cost (get-cost initial)]
    (loop [state initial
           cost cost
           k 1
           best-seq [{:state state, :cost cost}]]
      (println '>>> 'sa k \. state \$ cost)
      (if (and (< k max-iter)
               (or (nil? max-cost) (> cost max-cost)))
        (let [t (temp-fn (/ k max-iter))
              next-state (neighbor-fn state)
              next-cost (get-cost next-state)
              next-place {:state next-state, :cost next-cost}]
          (if (> (p-fn cost next-cost t) (rand))
            (recur next-state next-cost (inc k)
                   (conj best-seq next-place))
            (recur state cost (inc k) best-seq)))
        best-seq))))

For parameters, annealing takes an initial state, a limit to the number of iterations, a target output value, and a series of functions. The first function takes the current state and returns a new neighboring state.
To write this function, we have to decide how best to handle the state for this problem. Often, if the function to evaluate has multiple parameters, we'd use a vector and randomly slide one of its values around. However, for this problem, we only have one input value: the partition size.
So for this problem, we'll instead use an integer between 0 and 20. The actual partition size will be 2 raised to that power. To find a neighbor, we just randomly slide the state value up or down at most five, within the range of 0 to 20.
(defn get-neighbor [state]
  (max 0 (min 20 (+ state (- (rand-int 11) 5)))))
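A quick property check, repeating the definition so that the snippet stands alone: every neighbor should stay within the 0 to 20 range and move at most five steps from the current state.

```clojure
(defn get-neighbor [state]
  (max 0 (min 20 (+ state (- (rand-int 11) 5)))))

;; Over many random states, every neighbor is in [0, 20] and at most
;; 5 away from where it started (clamping can only shrink the step).
(every? (fn [s]
          (let [n (get-neighbor s)]
            (and (<= 0 n 20)
                 (<= (Math/abs (- n s)) 5))))
        (repeatedly 1000 #(rand-int 21)))
```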

The next function parameter for annealing is the cost function. This takes the state and returns the value that we're trying to minimize. In this case, we benchmark mc-pi-part with the given partition size (2 raised to the state's power) and return the mean run time.
(defn get-pi-cost [n state]
  (let [chunk-size (long (Math/pow 2 state))]
    (first (:mean (quick-benchmark (mc-pi-part chunk-size n))))))

The next function takes the current state's cost, a potential new state's cost, and the current energy in the system (from 0 to 1). It returns the odds that the new state should be used. Currently, this will always move to an improved state, and to a worse state 25 percent of the time (both prorated by the temperature).
(defn should-move [c0 c1 t]
  (* t (if (< c0 c1) 0.25 1.0)))
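Repeating the definition, here are the probabilities it produces for a few sample costs and temperatures (the specific numbers are just for illustration):

```clojure
(defn should-move [c0 c1 t]
  (* t (if (< c0 c1) 0.25 1.0)))

(should-move 10.0 5.0 1.0)   ; better state, full temperature => 1.0
(should-move 5.0 10.0 1.0)   ; worse state => 0.25
(should-move 5.0 10.0 0.5)   ; worse state, half temperature => 0.125
```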

The final function parameter takes the current percent through the iteration count and returns the energy or temperature as a number from 0 to 1. This can use a number of easing functions, but for this we'll just use a simple linear one.
(defn get-temp [r] (- 1.0 (float r)))
That's it. We can let this find a good partition size. We'll start with the value that we used in the Partitioning Monte Carlo simulations for better pmap performance recipe. We'll only allow ten iterations, since the search space is relatively small.
user=> (annealing 12 10 nil get-neighbor
  #_=>            (partial get-pi-cost 1000000)
  #_=>            should-move get-temp)
>>> sa 1 . 12 $ 0.5805938333333334
>>> sa 2 . 8 $ 0.38975950000000004
>>> sa 3 . 8 $ 0.38975950000000004
>>> sa 4 . 8 $ 0.38975950000000004
>>> sa 5 . 8 $ 0.38975950000000004
>>> sa 6 . 8 $ 0.38975950000000004
>>> sa 7 . 6 $ 0.357514
>>> sa 8 . 6 $ 0.357514
>>> sa 9 . 6 $ 0.357514
>>> sa 10 . 6 $ 0.357514
[{:state 12, :cost 0.5805938333333334}
 {:state 8, :cost 0.38975950000000004}
 {:state 6, :cost 0.357514}]
We can see that a partition size of 64 (2^6) gives the best time, and rerunning the benchmarks verifies this.
How it works…
In practice, this algorithm won't help if we run it over the full input data. But if we can get a large enough sample, this can help us process the full dataset more efficiently by taking a lot of the guesswork out of picking the right partition size for the full evaluation.
What we did was kind of interesting. Let's take the annealing function apart to see how it works.

The process is handled by a loop inside the annealing function. Its parameters are a snapshot of the state of the annealing process.
(loop [state initial
       cost cost
       k 1
       best-seq [{:state state, :cost cost}]]

We only continue if we need more iterations or if we haven't bested the maximum cost.
(if (and (< k max-iter)
         (or (nil? max-cost) (> cost max-cost)))

If we continue, we calculate the next energy and get a potential state and cost to evaluate.
(let [t (temp-fn (/ k max-iter))
      next-state (neighbor-fn state)
      next-cost (get-cost next-state)
      next-place {:state next-state, :cost next-cost}]

If the probability function (shouldmove, in this case) indicates so, we move to the next state and loop. Otherwise, we stay at the current state and loop.
(if (> (p-fn cost next-cost t) (rand))
  (recur next-state next-cost (inc k)
         (conj best-seq next-place))
  (recur state cost (inc k) best-seq))

If we're done, we return the sequence of best states and costs seen.
best-seq))))
This provides a systematic way to explore the problem space: in this case to find a better partition size for this problem.
There's more…
Simulated annealing is one of a class of algorithms known as optimization algorithms. All of these take a function (the cost-fn function that we saw) and try to find the largest or smallest value for it. Other optimization algorithms include genetic algorithms, ant colony optimization, particle swarm optimization, and many others. This is a broad and interesting field, and being familiar with these algorithms can be helpful for anyone doing data analysis.
Parallelizing with reducers
Clojure 1.5 introduced the clojure.core.reducers library. This library provides a lot of interesting and exciting features, including composing multiple calls to map and other sequence-processing higher-order functions, and abstracting map and other functions over different types of collections while maintaining the collection type.
Looking at the following chart, initial operations on individual data items, such as map and filter, operate on items of the original dataset. The outputs of those operations are then combined using a reduce function. Finally, the outputs of the reduction step are progressively combined until the final result is produced. This could involve a reduce-type operation such as addition, or an accumulation, such as the into function.
Another feature of reducers is that they can automatically partition and parallelize the processing of tree-based data structures. This includes Clojure's native vectors and hash maps.
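Here's a minimal, self-contained example of that: summing the squares of a vector with r/fold. On a large vector, the fold is spread over the available cores; on a small one it simply reduces, but the answer matches the sequential equivalent either way.

```clojure
(require '[clojure.core.reducers :as r])

;; r/map produces a foldable "reducer" over the vector, and r/fold
;; reduces its chunks in parallel, combining the partial sums with +.
(r/fold + (r/map #(* % %) (vec (range 1001))))
;; => 333833500, the same as (reduce + (map #(* % %) (range 1001)))
```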
For this recipe, we'll continue the Monte Carlo simulation example that we started in the Partitioning Monte Carlo simulations for better pmap performance recipe. In this case, we'll write a version that uses reducers and see how it performs.
Getting ready
From the Partitioning Monte Carlo simulations for better pmap performance recipe, we'll use the same imports, as well as the randpoint function, the centerdist function, and the mcpi function.
Along with these, we'll also need to require the reducers and Criterium libraries:
(require '[clojure.core.reducers :as r]) (use 'criterium.core)
Also, if you're using Java 1.6, you'll need the ForkJoin library, which you can get by adding this to your project.clj dependencies:
[org.codehaus.jsr166mirror/jsr166y "1.7.0"]
How to do it…
This version of the Monte Carlo pi approximation algorithm will be structured similarly to how mc-pi was in the Partitioning Monte Carlo simulations for better pmap performance recipe. First, we'll define a count-in-circle-r function that uses the reducers library to compose the processing and spread it over the available cores.
(defn count-items [c _] (inc c))

(defn count-in-circle-r [n]
  (->> (repeatedly n rand-point)
       vec
       (r/map center-dist)
       (r/filter #(<= % 1.0))
       (r/fold + count-items)))

(defn mc-pi-r [n]
  (* 4.0 (/ (count-in-circle-r n) n)))
Now, we can use Criterium to compare the two functions.
user=> (quick-bench (mc-pi 1000000))
WARNING: Final GC required 3.023487696312759 % of runtime
Evaluation count : 6 in 6 samples of 1 calls.
             Execution time mean : 1.999605 sec
    Execution time std-deviation : 217.056295 ms
   Execution time lower quantile : 1.761563 sec ( 2.5%)
   Execution time upper quantile : 2.235991 sec (97.5%)
nil
user=> (quick-bench (mc-pi-r 1000000))
WARNING: Final GC required 6.398394257011045 % of runtime
Evaluation count : 6 in 6 samples of 1 calls.
             Execution time mean : 947.908000 ms
    Execution time std-deviation : 306.273266 ms
   Execution time lower quantile : 776.947000 ms ( 2.5%)
   Execution time upper quantile : 1.477590 sec (97.5%)

Found 1 outliers in 6 samples (16.6667 %)
	low-severe	 1 (16.6667 %)
 Variance from outliers : 81.6010 % Variance is severely inflated by outliers
nil
Not bad. The version with reducers is over 50 percent faster than the serial one. This is more impressive because we've made relatively minor changes to the original code, especially compared to the version of this algorithm that partitioned the input before passing it to pmap, which we also saw in the Partitioning Monte Carlo simulations for better pmap performance recipe.
How it works…
The reducers library does a couple of things in this recipe. Let's look at some lines from count-in-circle-r. Converting the input to a vector was important, because vectors can be parallelized, but generic sequences cannot.
Next, these two lines are composed into one reducer function that doesn't create an intermediate sequence between the calls to r/map and r/filter. This is a small but important optimization, especially if we'd stacked more functions into this stage of the process.
(r/map center-dist)
(r/filter #(<= % 1.0))
The bigger optimization is in the line with r/fold. r/reduce always processes serially, but if the input is a tree-based data structure, r/fold will employ a fork-join pattern to parallelize it. This line takes the place of a call to count by incrementing a counter for every item in the sequence seen so far.
(r/fold + count-items)
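The counting trick works the same way in isolation. In this small sketch, + combines the sub-counts from each chunk, while the reducing function ignores each item and just increments the tally:

```clojure
(require '[clojure.core.reducers :as r])

(defn count-items [c _] (inc c))

;; Counts the items in the vector: (+) provides the 0 seed for each
;; chunk, count-items tallies within a chunk, and + merges the chunks.
(r/fold + count-items (vec (range 100)))   ; => 100
```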
Graphically this process looks something like the following chart:
The reducers library is still fairly new to Clojure, but it has a lot of promise to automatically parallelize structured operations with a control and simplicity that we haven't seen elsewhere.
There's more...
For more about reducers, see Rich Hickey's blog posts at http://clojure.com/blog/2012/05/08/reducers-a-library-and-model-for-collection-processing.html and http://clojure.com/blog/2012/05/15/anatomy-of-reducer.html. Also, his presentation on reducers at EuroClojure 2012 (http://vimeo.com/45561411) has a lot of good information.
See also
 The Generating online summary statistics with reducers recipe
Generating online summary statistics with reducers
We can use reducers in a lot of different situations, but sometimes we'll need to change how we process data to do so.
For this example, we'll show how to compute summary statistics with reducers. We'll use some algorithms and formulas first proposed by Tony F. Chan, Gene H. Golub, and Randall J. LeVeque in 1979, and later extended by Timothy B. Terriberry in 2007. These allow us to compute the mean, standard deviation, and skew of online data, that is, streaming data that we may only see once, so we need to compute all the statistics in one pass without holding the full collection in memory.
The following formulae are a little complicated and difficult to read in Lisp notation, but there's a good overview of this process, with formulae, on the Wikipedia page for Algorithms for calculating variance (http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance). To simplify this example somewhat, we'll only calculate the mean and variance.
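Written out in conventional notation rather than Lisp, the two updates we'll implement below are the single-observation step and the pairwise combination:

```latex
% Adding one observation x to the running n, \bar{x}, M_2:
\delta = x - \bar{x}, \qquad
n' = n + 1, \qquad
\bar{x}' = \bar{x} + \frac{\delta}{n'}, \qquad
M_2' = M_2 + \delta\,(x - \bar{x}')

% Combining two partial results A and B (with n_{AB} = n_A + n_B):
\delta = \bar{x}_B - \bar{x}_A, \qquad
\bar{x}_{AB} = \bar{x}_A + \delta\,\frac{n_B}{n_{AB}}, \qquad
M_{2,AB} = M_{2,A} + M_{2,B} + \delta^2\,\frac{n_A\,n_B}{n_{AB}}
```

Here M_2 is the running sum of squared deviations from the mean; the sample variance we report at the end is M_2 / (n - 1).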
Getting ready
For this, we'll need to have easy access to the reducers library and the Java Math class.
(require '[clojure.core.reducers :as r]) (import '[java.lang Math])
How to do it…
For this recipe, first we'll define the accumulator data structures and then the accumulator functions. Finally, we'll put it all together.

We need to define a data structure to store all the data that we want to accumulate and keep track of.
(def zero-counts {:n (long 0), :s 0.0, :mean 0.0, :m2 0.0})

Now, we'll need some way to add a datum to the counts and accumulation. The function accum-counts will take care of this.
(defn accum-counts
  ([] zero-counts)
  ([{:keys [n mean m2 s] :as accum} x]
   (let [new-n (long (inc n))
         delta (- x mean)
         delta-n (/ delta new-n)
         term-1 (* delta delta-n n)
         new-mean (+ mean delta-n)]
     {:n new-n
      :mean new-mean
      :s (+ s x)
      :m2 (+ m2 term-1)})))

Next, we'll need a way to combine two accumulators. This uses the complete, unsimplified versions of the formulae from accum-counts. Because some of the numbers can get very large and overflow the range of the primitive Java types, we'll use *'. This is a variant of the multiplication operator that automatically promotes values into Java's BigInteger types instead of overflowing.
(defn op-fields
  "A utility function that calls a function on the
  values of a field from two maps."
  [op field item1 item2]
  (op (field item1) (field item2)))

(defn combine-counts
  ([] zero-counts)
  ([xa xb]
   (let [n (long (op-fields + :n xa xb))
         delta (op-fields - :mean xb xa)
         nxa*xb (*' (:n xa) (:n xb))]
     {:n n
      :mean (+ (:mean xa) (* delta (/ (:n xb) n)))
      :s (op-fields + :s xa xb)
      :m2 (+ (:m2 xa) (:m2 xb)
             (* delta delta (/ nxa*xb n)))})))

Now we need a way to take the accumulated counts and values and turn them into the final statistics.
(defn stats-from-sums
  [{:keys [n mean m2 s] :as sums}]
  {:mean (double (/ s n))
   :variance (/ m2 (dec n))})

Finally, we combine all these functions to produce results.
(defn summary-statistics [coll]
  (stats-from-sums (r/fold combine-counts accum-counts coll)))
For a pointless example, we can use this to find summary statistics on 1,000,000 random numbers:
user=> (summary-statistics (repeatedly 1000000 rand))
{:mean 0.5004908831693459, :variance 0.08346136740444697}
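As a final sanity check on the pairwise combination, here is a small, self-contained verification on toy data (the helper names are our own): combining M2 from two halves matches computing it over the whole collection.

```clojure
;; Two-pass M2: sum of squared deviations from the mean.
(defn m2 [xs]
  (let [n (count xs)
        mean (/ (reduce + xs) n)]
    (reduce + (map #(let [d (- % mean)] (* d d)) xs))))

(def a [1.0 2.0 3.0])
(def b [4.0 5.0 6.0 7.0])

;; M2 over the whole collection at once.
(def m2-whole (m2 (concat a b)))

;; M2 from the two halves, merged with the pairwise-combination formula.
(def m2-combined
  (let [na (count a), nb (count b), n (+ na nb)
        delta (- (/ (reduce + b) nb) (/ (reduce + a) na))]
    (+ (m2 a) (m2 b) (* delta delta (/ (* na nb) n)))))

;; m2-whole and m2-combined are both 28.0
```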
Harnessing your GPU with OpenCL and Calx
For calculations involving matrices and floating point math, the best option in today's computers is to execute them on the graphics processing unit, or GPU. Because GPUs have been so highly tuned for 3D shading and rendering, they can handle these operations very quickly, sometimes an order of magnitude more quickly than a general-purpose CPU can.
But programming GPUs is a little different than general programming. For the most part, we're stuck coding in a subset of C with very specific parameters for the parts of the process that are handled by the GPU. There are some projects that convert Java bytecode to GPU code (https://github.com/pcpratts/rootbeer1, http://www.jocl.org/, or http://code.google.com/p/aparapi/). Unfortunately, at this time, none of them support using a dynamic JVM language, such as Clojure.
For this recipe, we'll use the Calx library (https://github.com/ztellman/calx/). This project has a warning about not being under active development, but it's already usable. This is a wrapper around an OpenCL (http://www.khronos.org/opencl/) library, which supports a wide range of video card vendors.
In general, you'll see the biggest payoff from the GPU when doing floating point math, especially vector and matrix operations. Because of this, we'll once again calculate the Mandelbrot set. This is an example that should benefit from running on the GPU, and we have an existing implementation to compare against, which will let us see just how much of a speed increase we get.
Getting ready
We need to include a dependency on Calx in our project.clj file.
:dependencies [[org.clojure/clojure "1.5.0"] [calx "0.2.1"]]
And we'll need to import Calx and java.lang.Math into our REPL or script.
(use 'calx) (import [java.lang Math])
We'll also use the output-points function from the Parallelizing processing with pmap recipe.
How to do it...
Even using Calx, most of this has to be in C. This is encoded as a string that is processed and compiled by Calx.
(def src "
// scale from -2.5 to 1.
float scale_x(float x) {
  return (x / 1000.0) * 3.5 - 2.5;
}

// scale from -1 to 1.
float scale_y(float y) {
  return (y / 1000.0) * 2.0 - 1.0;
}

__kernel void escape(__global float *out) {
  int i = get_global_id(0);
  int j = get_global_id(1);
  int index = j * get_global_size(0) + i;

  float point_x = scale_x(i);
  float point_y = scale_y(j);
  int max_iterations = 1000;
  int iteration = 0;
  float x = 0.0;
  float y = 0.0;

  while (x*x + y*y <= 4 && iteration < max_iterations) {
    float tmp_x = (x*x - y*y) + point_x;
    y = (2 * x * y) + point_y;
    x = tmp_x;
    iteration++;
  }
  out[index] = iteration;
}")
We'll also need a function to handle compiling the source code and passing data around.
(defn -main []
  (let [max-x 1000, max-y 1000]
    (with-cl
      (with-program (compile-program src)
        (time
          (let [out (wrap (flatten (output-points max-x max-y))
                          :float32-le)]
            (enqueue-kernel :escape (* max-x max-y) out)
            (let [out-seq (vec @(enqueue-read out))]
              (spit "mandelbrot-out.txt" (prn-str out-seq))
              (println "Calculated on " (platform) "/" (best-device))
              (println "Output written to mandelbrot-out.txt"))))))))
If we run this, we'll see how fast it is.
user=> (-main)
Calculated on  #<CLPlatform Apple {vendor: Apple, version: OpenCL 1.0 (Dec 26 2010 12:52:21), profile: FULL_PROFILE, extensions: []}> / #<CLDevice ATI Radeon HD 6750M>
Output written to mandelbrot-out.txt
"Elapsed time: 9659.691 msecs"
nil
We can compare that 9.7 seconds to the parallel version of the Mandelbrot set program that we wrote in the Parallelizing processing with pmap recipe.
user=> (def mset
  #_=>   (time (mandelbrot pmap max-iterations max-x max-y
  #_=>                     mandelbrot-range)))
"Elapsed time: 19126.335 msecs"
#'user/mset
That's about twice as fast. This was a very invasive change, but the results are quite good.
How it works...
There are two parts to this. First, the C code and GPU processing in general, and then the Calx interface to the GPU.
Writing the GPU code in C
A full explanation of GPU programming, or for that matter even just an introduction, is beyond the scope of this recipe, so we'll just try to understand what's going on in this specific example. See the There's more... section of this recipe for where to go to learn more.
First, the code to run on the GPU needs to be marked __kernel. Notice that we're passing just one parameter into this function: the output array. The GPU function, escape, will be called many times, based on the shape of the data.
Each time the GPU calls escape, we have to figure out which index into the data it's executing on. We do that by calling get_global_id(dimension). Instead of scaling the pixel point to the Mandelbrot range in Clojure, I've moved those functions into C (scale_x and scale_y). Otherwise, the code is more or less the same as what we did before to calculate the Mandelbrot set, just in C.
This pattern—let the GPU handle the looping and only code the inner part of the loop—is generally how we'll structure everything that runs on the GPU.
Wrapping it in Calx
While this all may seem very low-level, Calx is still taking a lot of the pain out of GPU processing. Let's look at what else we have to do. After compiling everything in the call to compile-program, the -main function breaks the process down into three steps. First, we need to move the data over to the GPU. This is done by converting the list of pixel coordinates to a float array on the GPU.
(let [out (wrap (flatten (output-points max-x max-y))
                :float32-le)]
Next, we queue the escape function written in C for processing on the GPU. We tell it how many data items we'll need and the parameters to pass to it (in this case, only the float array we just allocated).
(enqueue-kernel :escape (* max-x max-y) out)
Finally, we queue a read operation on the out array, so that we can access its data. This returns a reference, which will block when we dereference it until the data is available.
(let [out-seq (vec @(enqueue-read out))]
GPU processing can be a bit different from the general-purpose programming we're probably used to, but as we saw, it also allows us to get some spectacular performance improvements.
There's more...
To go further with the GPU, you'll want to find some more material. Personally, I found AMD's tutorial to be helpful (http://developer.amd.com/tools/heterogeneous-computing/amd-accelerated-parallel-processing-app-sdk/introductory-tutorial-to-opencl/). There are a number of other tutorials out on the Internet, and there seems to be a lot of variance in them. Google can help you there.
And as usual, your mileage may vary, so try a few tutorials until you find one that makes sense to you.
Summary
In this article, we saw a number of recipes showing how parallel processing can deliver better performance, and how it influences the way we structure our programs. Parallelization pays off best when we're doing the same task many, many times, all at once.