Claypoole: Threadpool tools for Clojure
The claypoole library provides threadpool-based parallel versions of Clojure functions such as
Claypoole gives us tools to deal with common parallelism issues by letting us use and manage our own threadpools (a.k.a. thread pools). Our blog post gives a nice overview of the project and its motivations.
Also, if you like learning via video, Leon gave a talk at Clojure West 2015. It gives an intro to how Clojure parallelism works, including describing some of the motivations for Claypoole. It describes what advantages Claypoole gives and also talks about some of the other useful tools for Clojure parallelism (e.g. core.async, reducers, and Tesser).
Clojure has some nice tools for simple parallelism, but they're not a complete solution for doing complex things (such as controlling the level of parallelism). Instead, people tend to fall back on Java. For instance, on http://clojure.org/concurrent_programming, the recommendation is to create an
ExecutorServiceand call its
reducersis elegant, but we can't control its level of parallelism.
Essentially, we wanted a
pmapfunction that improves on the original in several ways:
pmapuses, so it would be tunable for non-CPU-bound tasks like network requests.
pmaps to control the amount of simultaneous work we're doing.
com.climate.claypoole.lazy. See the section Lazy below.
pmap, so that we can start handling the first response as fast as possible.
To use claypoole, make a threadpool via
threadpooland use it with claypoole's version of one of these standard Clojure functions:
Instead of lazy sequences, our functions will return eagerly streaming sequences. Such a sequence basically looks like
(map deref futures)with a thread in the background running
doallon it, so all the requested work is being done behind the scenes. Reading the sequence will only block on uncompleted work.
Note that these functions are eager! That's on purpose--we like to be able to start work going and know it'll get done. But if you try to
(range), you're going to have a bad time.
As of Claypoole 0.4.0, there are lazy functions in
com.climate.claypoole.lazy. See the section Lazy below.
(require '[com.climate.claypoole :as cp]) ;; A threadpool with 2 threads. (def pool (cp/threadpool 2)) ;; Future (def fut (cp/future pool (myfn myinput))) ;; Completable Future (def cfut (cp/completable-future pool (myfn myinput))) ;; Ordered pmap (def intermediates (cp/pmap pool myfn1 myinput-a myinput-b)) ;; We can feed the streaming sequence right into another parallel function. (def output (cp/pmap pool myfn2 intermediates)) ;; We can read the output from the stream as it is available. (doseq [o output] (prn o)) ;; NOTE: The JVM doesn't automatically clean up threads for us. (cp/shutdown pool)
Claypoole also contains functions for unordered parallelism. We found that for minimizing latency, we often wanted to deal with results as soon as they became available. This works well with our eager streaming, so we can chain together streams using the same or different threadpools to get work done as quickly as possible.
The unordered versions of core functions are: *
Here is an example of them in use.
(require '[com.climate.claypoole :as cp]) ;; We'll use the with-shutdown! form to guarantee that pools are cleaned up. (cp/with-shutdown! [net-pool (cp/threadpool 100) cpu-pool (cp/threadpool (cp/ncpus))] ;; Unordered pmap doesn't return output in the same order as the input(!), ;; but that means we can start using service2 as soon as possible. (def service1-resps (cp/upmap net-pool service1-request myinputs)) (def service2-resps (cp/upmap net-pool service2-request service1-resps)) (def results (cp/upmap cpu-pool handle-response service2-resps)) ;; ...eventually... ;; Make sure sure the computation is complete before we shutdown the pools. (doall results))
Claypoole also provides ordered and unordered parallel
formacros. Note that only the bodies of the for loops will be run in parallel; everything in the for binding will be done in the calling thread.
(def ordered (cp/pfor pool [x xs y ys] (myfn x y))) (def unordered (cp/upfor pool [x xs ;; This let is done in the calling thread. :let [ys (range x)] y ys] (myfn x y)))
Claypoole also lets you prioritize your backlog of tasks. Higher-priority tasks will be assigned to threads first. Here's an example; there is a more detailed description below.
(def pool (cp/priority-threadpool 10)) (def task1 (cp/future (cp/with-priority pool 1000) (myfn 1))) (def task2 (cp/future (cp/with-priority pool 0) (myfn 2))) (def moretasks (cp/pmap (cp/with-priority pool 10) myfn (range 3 10)))
It's worth mentioning that both
prun!block. Since they don't create a streaming sequence, their work doesn't happen in the "background". Instead, the calling thread will wait for all the tasks to complete. If you want to do work without blocking, use something that makes a sequence, like
You don't need to specifically declare a threadpool. Instead, you can just give a parallel function a number. Then, it will create its own private threadpool and shut it down safely when all its tasks are complete.
;; Use a temporary threadpool with 4 threads. (cp/pmap 4 myfunction myinput) ;; Use a temporary threadpool with ncpus + 2 threads. (cp/pmap (+ 2 (cp/ncpus)) myfunction myinput)
You can also pass the keyword
:builtinas a threadpool. In that case, the function will be run using Clojure's built-in, dynamically-scaling threadpool. This is equivalent to running each task in its own
;; Use built-in parallelism. (def f (cp/future :builtin (myfn myinput))) (def results (cp/pfor :builtin [x xs] (myfn x)))
You can also pass the keyword
:serialas a threadpool. In that case, the function will be run eagerly in series in the calling thread, not in parallel, so the parallel function will not return until all the work is done. We find this is helpful for testing and benchmarking. (See also about
;; Use no parallelism at all; blocks until all work is complete. (cp/pmap :serial myfunction myinput)
As of Claypoole version 0.3, by default all threadpools are daemon threadpools, so they should shut down when your main thread exits. But it's still good practice to clean up these resources. Neither the OS nor the JVM will take care of them for you!
The JVM does not automatically garbage collect threads for you. Instead, when you're done with your threadpool, you should use
shutdownto gently shut down the pool, or
shutdown!to kill the threads forcibly.
Of course, we have provided a convenience macro
leta threadpool and clean it up automatically:
(cp/with-shutdown! [pool (cp/threadpool 3)] ;; Use the pool, confident it will be shut down when you're done. But be ;; careful--if work is still going on in the pool when you reach the end of ;; the body of with-shutdown!, it will be forcibly killed! ...)
Alternately, daemon threads will be automatically killed when the JVM process exits. By default all threadpools are daemon threadpools and will exit when the main thread exits! You can create a non-daemon threadpool via:
(def pool (cp/threadpool 10 :daemon false))
To construct a threadpool, use the
threadpoolfunction. It takes optional keyword arguments allowing you to change the thread names, their daemon status, and their priority. (NOTE: Thread priority is a system-level property that depends on the OS; it is not the same as the task priority, described below.)
(def pool (cp/threadpool (cp/ncpus) :daemon false :thread-priority 3 :name "my-pool"))
We have found that benchmarking and some other tests are most easily done in series. You can do that in one of two ways.
First, you can just pass the keyword
:serialto a parallel function.
(def results (cp/pmap :serial myfn inputs))
Second, you can bind or
cp/*parallel*to false, like so:
(binding [cp/*parallel* false] (with-shutdown! [pool (cp/threadpool 2)] ;; This is in series; we block until all work is complete! (cp/pmap pool myfn inputs)))
As of Claypoole 0.4.0, there is a namespace
com.climate.claypoole.lazythat contains lazy versions of all the parallel functions. These lazy versions do not compute work until forced by something like
(doall), just like core
pmap, they will only run the outputs you realize, plus a few more (a buffer). The buffer is used to help keep the threadpool busy. Unlike core
pmap, the buffer size is not fixed at
ncpus + 2; instead, it defaults to the size of the threadpool to keep the pool full. Each parallel function also comes with a -buffer variant that has an extra argument, allowing you to specify the buffer size.
For instance, these will both cause 10 items to be realized:
(require '[com.climate.claypoole :as cp]) (require '[com.climate.claypoole.lazy :as lazy]) (cp/with-shutdown! [pool 2] (doall (take 8 (lazy/pmap pool inc (range)))) (doall (take 4 (lazy/pmap-buffer pool 6 inc (range)))))
The lazy functions work well with sequences too large to fit in memory. Furthermore, the lazy functions work well with chained maps that operate at different speeds over large amounts of data. If an eager map that runs quickly feeds into an eager map that runs slowly, the buffer between them will tend to grow, possibly running the system out of memory. Lazy functions will avoid this by only performing work as needed.
As a rule of thumb, if you are working with data too large to fit into memory, you probably want a lazy operation.
The disadvantage of the lazy functions is that they may not keep the threadpool fully busy. For instance, this pmap will take 6 seconds to run:
(doall (lazy/pmap 2 #(Thread/sleep (* % 1000)) [4 3 2 1]))
That's because it will not realize the 2 task until the 4 task is complete, so one thread in the pool will sit idle for 1 second. On the other hand, an eager function would only take 5 seconds, since the two threads would be assigned tasks as follows:
Note also that an unordered map (
upmap) would also take 5 seconds here, since as soon as 3 is complete, it would be returned and the next item would be forced. In general, to use the threadpool most efficiently with these lazy functions, prefer the unordered versions.
Exceptions are a little tricky in a
pmapfor two reasons.
First, exceptions in
pmapsare tricky because the task was run in a future, so Java "helpfully" rethrows the exception as a
java.util.concurrent.ExecutionException. Claypoole unwraps that
ExecutionExceptionso your code only sees the original exception (as of Claypoole version 0.4.0). For instance:
;; Core map throws the expected NullPointerException. (try (first (map inc [nil])) (catch NullPointerException e)) ;; Core pmap throws a java.util.concurrent.ExecutionException. (try (first (pmap inc [nil])) (catch java.util.concurrent.ExecutionException e)) ;; Claypoole pmap throws the expected NullPointerException. (try (first (cp/pmap 2 inc [nil])) (catch NullPointerException e))
Second, exceptions in
pmapsare tricky because there are other tasks running, and it's not quite clear what to do with those. Should they be aborted? Should they be allowed to continue? How many should be allowed to continue?
Claypoole (as of 0.4.0) works just like core
pmap: it will not kill queued tasks, but it will stop adding tasks to the queue. Like core
pmap, Claypoole's functions have a buffer of tasks enqueued to keep the thread pool busy. Whereas core
pmaphas a buffer of ncpus + 2, Claypoole's eager functions use a buffer size of twice the pool size (to keep the pool busy), and its lazy functions use a buffer size equal to the pool size (so that they minimize unneeded work).
(let [slow (fn [x] (Thread/sleep 100) x) ; we slow the work so the buffer fills prn+1 (comp prn inc slow) data (cons nil (iterate inc 0))] ; we use iterate inc to avoid chunking ;; Core map does no work after an exception, so no numbers will be printed. (dorun (map prn+1 data)) ;; Core pmap does ncpus + 2 work after an exception, so on a quad-core ;; computer, 6 numbers will be printed. (doall (pmap prn+1 data)) ;; Claypoole eager pmap does pool size * 2 - 1 work after an exception, since ;; the exceptional task is part of the buffer, so 5 numbers will be printed. (doall (cp/pmap 3 prn+1 data)) ;; Claypoole lazy pmap does pool size work after an exception, so 3 numbers ;; will be printed. (doall (lazy/pmap 3 prn+1 data)))
Note that if tasks are still running when the pool is force-shutdown with
shutdown!, those tasks will be aborted.
(cp/with-shutdown! [pool 2] (doall (cp/pmap pool inc (cons nil (range 100))))) ;; Some of those inc tasks will have been aborted because the sequence was ;; truncated by the exception, so the doall finished, and then the pool was ;; immediately shutdown! while a few tasks were still running.
(Note: Before Claypoole 0.4.0, its behavior was to kill later tasks if one task failed. In addition to possibly surprising users, that required more code complexity than was wise.)
You can create a threadpool that respects task priorities by creating a
(def p1 (cp/priority-threadpool 5)) (def p2 (cp/priority-threadpool 5 :default-priority -10))
Then, use functions
with-priority-fnto set the priority of your tasks, or just set the
(cp/future (cp/with-priority p1 100) (myfn)) ;; Nothing bad happens if you nest with-priority. The outermost one "wins"; ;; this task runs at priority 2. (cp/future (cp/with-priority (cp-with-priority p1 1) 2) (myfn)) ;; For pmaps, you can use a priority function, which is called with your ;; arguments. This will run 3 tasks at priorities 6, 5, and 4, respectively. (cp/upmap (cp/with-priority-fn p1 (fn [x _] x)) + [6 5 4] [1 2 3]) ;; For for loops, you can use the special :priority binding, which must be the ;; last for binding. (cp/upfor p1 [i (range 10) :priority (- i)] (myfn i))
Under the hood, threadpools are just instances of
java.util.concurrent.ExecutorService. You can use any
ExecutorServicein place of a threadpool, and you can use a threadpool just as you would an
ExecutorService. This means you can create custom threadpools and use them easily.
You might also like using dirigiste threadpools with Claypoole. Dirigiste provides a fast, instrumented ExecutorService that scales in a controllable way. Note that dirigiste by default creates thread pools that throw exceptions when full, which makes it hard for Claypoole to enqueue tasks. Instead, you can use the Executor constructor directly, though it's a bit ugly:
(def pool (io.aleph.dirigiste.Executor. (java.util.concurrent.Executors/defaultThreadFactory) ;; Here's where we insert our non-error queue (java.util.concurrent.LinkedBlockingQueue.) (io.aleph.dirigiste.Executors/fixedController n-threads) n-threads ;; Metrics to capture (java.util.EnumSet/noneOf io.aleph.dirigiste.Stats$Metric) ;; Dirigiste default sample period 25 ;; Dirigiste default control period 10000 java.util.concurrent.TimeUnit/MILLISECONDS))
There are a few cases where threadpools will stop your program from exiting that can surprise users. We have endeavored to minimize them, but they can still be problems.
Claypoole actually uses some
clojure.core/futures. Unfortunately, those threads are from the agent threadpool, and they are not daemon threads. Those threads will automatically die 60 seconds after they're used. So if your main thread doesn't call either
System/exit, those threads will keep going for a while!
You'll experience the same thing if you start a
pmap--those extra threads have to be explicitly shut down, or they'll naturally die after 60 seconds.
The answer is basically to call
(shutdown-agents)before your main thread exits.
In a future version of Claypoole, we may switch to using a separate daemon threadpool for those futures. However, that could have other complications, so we are deferring that decision.
Claypoole now (as of version 0.3) defaults to daemon threadpools, but before that threadpools were not daemons. If you have any non-daemon threadpools running and your main process exits, those threadpools will never die, and your program will hang indefinitely!
The answer is: either use daemon threadpools (which is now the default) or be very careful to shut down your threadpools when you're done with them. See the above section on disposing of threadpools.
The claypoole library is named after John Claypoole (Betsy Ross's third husband) for reasons that are at best obscure.
Copyright (C) 2014 The Climate Corporation. Distributed under the Apache License, Version 2.0. You may not use this library except in compliance with the License. You may obtain a copy of the License at
See the NOTICE file distributed with this work for additional information regarding copyright ownership. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.