# Basic Concurrency and Parallelism in Common Lisp – Part 4b (Parallelism using lparallel – Error Handling)

In this final part, we will discuss the very important topic of error handling, how lparallel handles it, and cap off the series with a small benchmarking example that will tie in all the concepts covered thus far.

The demo will help check actual core usage on the machine when using the lparallel library.

## Initial Setup

There is no additional setup required for this tutorial from the last tutorial.

In case you missed it, please check out the previous post – Parallelism fundamentals using lparallel.

## 5-minute Error Handling refresher

Before we jump headlong into the demos, here is a quick refresher on Conditions and Restarts in Common Lisp (error handling). If you are already comfortable with this topic, please skip ahead to the next section.

In case you are a novice interested in getting a more comprehensive treatment of Conditions and Restarts in Common Lisp, I recommend two things – firstly, check out my detailed post on the fundamentals of Conditions and Restarts in Common Lisp, and secondly, check out the links in the References section at the end of this post.

For our refresher, let’s take a simple example. We have a custom square root function. To keep things simple, let us have a single check to ensure that the argument is zero or positive. We will forego all other validation.

First we define the relevant error condition:

(defpackage :positive-sqrt-user
  (:use :cl))

(in-package :positive-sqrt-user)

;;; define the error condition
(define-condition negative-error (error)
  ((message :initarg :message :reader error-message)))


Now let’s define the square root function itself. It is a simple implementation of the Newton-Raphson algorithm for finding the square root of a positive number (or zero). We take the first approximation/guess as 1.0d0:

(defconstant +eps+ 1e-9)

(defun square-root (n)
  "Find the square root using the Newton-Raphson method."
  (if (< n 0)
      (error 'negative-error :message "number must be zero or positive"))
  (let ((f 1.0d0))
    (loop
      when (< (abs (- (* f f) n)) +eps+)
        do (return f)
      do (setf f (/ (+ f (/ n f)) 2.0d0)))))


Nothing special there. The function simply loops until the square of the candidate root is within +eps+ of the argument. For the sake of completeness, the key step in the algorithm is the following:

(setf f (/ (+ f (/ n f)) 2.0d0))

This follows the formula for calculating the next square root approximation at each stage, where $n$ is the argument and $f_{k}$ is the $k$-th guess:

$f_{k} = \frac{1}{2}\left(f_{k-1} + \frac{n}{f_{k-1}}\right)$
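To make the update rule concrete, here is a small sketch that collects the first few guesses it produces. The helper name `newton-steps` and its `count` parameter are made up for illustration and are not part of the code in this post.

```lisp
;; Illustrative helper (not from the original post): collect the first
;; COUNT Newton-Raphson approximations of (sqrt N), starting from 1.0d0.
(defun newton-steps (n count)
  (loop for f = 1.0d0 then (/ (+ f (/ n f)) 2.0d0)
        repeat count
        collect f))

;; Each step averages the current guess F with N/F.
(newton-steps 4 4)
```

For an argument of 4, the guesses move from 1.0 to 2.5, then to roughly 2.05, and keep closing in on 2.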

In terms of error handling, we can handle the error in three different canonical ways (amongst others).

First, we can catch and process the error directly (similar to the try-catch-finally construct in some other languages):

;;; handle the error directly
(defun test-sqrt-handler-case ()
  (let ((n (progn
             (princ "Enter a number: ")
             (read))))
    (unwind-protect
         (handler-case (square-root n)
           (negative-error (o)
             (format t "Caught ~a~%" (error-message o))
             nil))
      (format t "Nothing to clean up!"))))


Testing it out:

POSITIVE-SQRT-USER> (test-sqrt-handler-case)
Enter a number: 200
Nothing to clean up!
14.142135623730955d0

POSITIVE-SQRT-USER> (test-sqrt-handler-case)
Enter a number: -200
Caught number must be zero or positive
Nothing to clean up!
NIL


Or, we could handle it automatically using a restart. Suppose we want to automatically return 1.0d0 as the result if we encounter an invalid argument to square-root. We could do something like this:

;;; automatic restart
(defun test-sqrt-handler-bind ()
  (let ((n (progn
             (princ "Enter a number: ")
             (read))))
    (handler-bind
        ((negative-error #'(lambda (c)
                             (format t "Caught: ~a~%" (error-message c))
                             (invoke-restart 'return-one))))
      (restart-case (square-root n)
        (return-one () 1.0d0)))))


Test run:

POSITIVE-SQRT-USER> (test-sqrt-handler-bind)
Enter a number: 200

14.142135623730955d0
POSITIVE-SQRT-USER> (test-sqrt-handler-bind)
Enter a number: -200
Caught: number must be zero or positive
1.0d0


Of course, the real usefulness of this scheme is realised when we have more restart cases available than these trivial ones.

And finally, we could handle it interactively, which allows us to enter a new value for the argument to square-root. (This interactive mode of development/operation is unique to the Lisp world).

(defun read-new-value ()
  (format *query-io* "Enter a new value: ")
  (force-output *query-io*)
  (list (read *query-io*)))

;;; Interactive restart
(defun test-sqrt-interactive ()
  (let ((n (progn
             (princ "Enter a number: ")
             (read))))
    (restart-case (square-root n)
      (return-nil () nil)
      (enter-new-value (num)
        :report "Try entering a positive number."
        :interactive read-new-value
        (square-root num)))))


Test drive!

POSITIVE-SQRT-USER> (test-sqrt-interactive)
Enter a number: 200

14.142135623730955d0

POSITIVE-SQRT-USER> (test-sqrt-interactive)
Enter a number: -200

Condition POSITIVE-SQRT-USER::NEGATIVE-ERROR was signalled.
[Condition of type NEGATIVE-ERROR]

Restarts:
0: [RETURN-NIL] RETURN-NIL
1: [ENTER-NEW-VALUE] Try entering a positive number.
2: [RETRY] Retry SLIME REPL evaluation request.

Enter a new value: 200

14.142135623730955d0


## Error Handling in lparallel

lparallel provides the lparallel:task-handler-bind construct. This is, for all intents and purposes, equivalent to the handler-bind construct in Common Lisp. However, it is meant for handling errors signalled inside parallel tasks launched using the lparallel library.

### The problem

Why is this important? Well, take the following example for instance:

(define-condition foo (error) ())

;;; error handling with handler-bind
(defun test-errors-normal ()
  (handler-bind
      ((foo #'(lambda (c)
                (declare (ignore c))
                (invoke-restart 'print-error-message))))
    (pmap 'vector #'(lambda (x)
                      (declare (ignore x))
                      (restart-case (error 'foo)
                        (print-error-message () "error!")))
          '(1 2 3 4 5))))


We declare a handler-bind in the current thread, and we invoke the restart print-error-message when we encounter an error of type foo.

Then we have a single pmap task inside the handler-bind. Notice that we define the restart-case inside the lambda function passed to pmap.

Now, inside the lambda function, we explicitly signal foo. Our expectation then is that the result of the operation is a vector of size 5, with each element being “error!”, right? Wrong! Here’s what we get instead:

Condition CONDS-RESTARTS-USER::FOO was signalled.
[Condition of type CONDS-RESTARTS-USER::FOO]

Restarts:
0: [PRINT-ERROR-MESSAGE] CONDS-RESTARTS-USER::PRINT-ERROR-MESSAGE
1: [TRANSFER-ERROR] Transfer this error to a dependent thread, if one exists.
2: [KILL-ERRORS] Kill errors in workers (remove debugger instances).


So what happened? The transfer-error restart presents a clue. The code didn’t work because the error was signalled in a different context (inside a task running on a worker thread), whereas we are trying to handle it in the current thread. To fix this, we can modify the code so that handler-bind is placed inside the lambda function itself, in the same thread context:

;;; error handling with handler-bind modified
(defun test-errors-normal-modified ()
  (pmap 'vector #'(lambda (x)
                    (declare (ignore x))
                    (handler-bind
                        ((foo #'(lambda (c)
                                  (declare (ignore c))
                                  (invoke-restart 'print-error-message))))
                      (restart-case (error 'foo)
                        (print-error-message () "error!"))))
        '(1 2 3 4 5)))


Take it for a spin:

CONDS-RESTARTS-USER> (test-errors-normal-modified)

#("error!" "error!" "error!" "error!" "error!")


And now we see the correct output! However, this approach does not scale. Imagine having 100 tasks, each with its own handler-bind! This is one of the compelling reasons we should use what the library provides us – lparallel:task-handler-bind as we shall see next.

### The solution

The lparallel:task-handler-bind version of the code looks like this:

;;; error handling with task-handler-bind
(defun test-errors-lparallel ()
  (task-handler-bind
      ((foo #'(lambda (c)
                (declare (ignore c))
                (invoke-restart 'print-error-message))))
    (pmap 'vector #'(lambda (x)
                      (declare (ignore x))
                      (restart-case (error 'foo)
                        (print-error-message () "error!")))
          '(1 2 3 4 5))))


And the output is exactly what we expect:

CONDS-RESTARTS-USER> (test-errors-lparallel)

#("error!" "error!" "error!" "error!" "error!")


All we did was to replace handler-bind with lparallel:task-handler-bind in the original code!

Note: You can still override the behaviour per task. Using (lparallel:task-handler-bind ((error #'lparallel:invoke-transfer-error)) …) automatically transfers the error to a thread capable of providing a proper restart for the error condition (if available), while (lparallel:task-handler-bind ((error #'invoke-debugger)) …) always triggers the debugger (good for interactive mode).
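As a rough sketch of the transfer behaviour described in the note, the following assumes Quicklisp is available for loading lparallel. The function name, the kernel size of 2, and the `:caught-in-caller` marker are invented for this example. The division by zero happens inside a worker, the handler transfers it, and the `handler-case` in the calling thread is expected to catch it.

```lisp
;; Assumes Quicklisp is installed; loads lparallel if it is not loaded yet.
(ql:quickload :lparallel)

(defun transfer-error-sketch ()
  ;; A small private kernel so the sketch does not depend on global state.
  (let ((lparallel:*kernel* (lparallel:make-kernel 2 :name "transfer-sketch")))
    (unwind-protect
         (lparallel:task-handler-bind
             ((error #'lparallel:invoke-transfer-error))
           ;; The error is signalled in a worker, transferred, and then
           ;; re-signalled in this thread when PMAP collects its results.
           (handler-case
               (lparallel:pmap 'vector #'(lambda (x) (/ 10 x)) '(1 0 5))
             (division-by-zero () :caught-in-caller)))
      (lparallel:end-kernel :wait t))))
```

Handling the condition in the caller like this suits errors that no task-level restart can recover from.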

Let’s move on now to the demo to complete this whole series!

## Demos

The best way of observing performance differences between parallel and non-parallel operations is through a real example (albeit a simple one).

### Prime number generation


The code:

;;;; A benchmarking demo using prime number generation.

(defpackage :benchmarking-demo
  (:use :cl :lparallel))

(in-package :benchmarking-demo)

;;; error conditions
(define-condition prime-number-error (error) ())

(defun primep (x)
  (cond ((<= x 0)
         (error 'prime-number-error))
        ((= x 1)
         nil)
        ((= x 2)
         t)
        (t (loop for i from 2 to (floor (sqrt x))
                 when (zerop (mod x i))
                   do (return nil)
                 finally (return t)))))

;;; prime number generation
(defun gen-prime-numbers (start end)
  (premove-if-not #'(lambda (x)
                      (restart-case (if (primep x) t nil)
                        (just-continue () nil)))
                  (loop for i from start to end
                        collect i)))

(defun prime-client ()
  (task-handler-bind
      ((prime-number-error #'(lambda (c)
                               (declare (ignore c))
                               (invoke-restart 'just-continue))))
    ;; 1e6 chunks of 1e6 numbers each. The step is explicit because
    ;; modifying a DOTIMES variable has undefined consequences.
    (loop for i from 0 below 1000000000000 by 1000000
          do (gen-prime-numbers (1+ i) (+ i 1000000)))))


This is a direct implementation of the basic prime number generation algorithm: test from 2 up to sqrt(number) for divisibility. I’m basically creating 1e6 chunks of 1e6 numbers each for the prime number test.

premove-if-not is the parallel counterpart of remove-if-not: it keeps only the prime numbers from the list that is created from the start and end arguments to gen-prime-numbers.

The code took a long long time to run, and I could hear the poor machine hissing in protest (I just killed the process after 15 minutes), but on the bright side, all the cores were overloaded full time. Note that I don’t collect the generated numbers into a list because that would definitely have crashed SLIME in any case if I had let it run on.
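For a run that finishes quickly, a sketch along the following lines times the sequential and parallel filters on a small range and checks that they agree. It assumes Quicklisp is available. The names `small-prime-p` and `compare-filtering` and the default of 4 workers are assumptions made for this illustration, not part of the demo above.

```lisp
;; Assumes Quicklisp is installed; loads lparallel if it is not loaded yet.
(ql:quickload :lparallel)

;; Hypothetical helper for this sketch: trial division, NIL below 2.
(defun small-prime-p (x)
  (and (> x 1)
       (loop for i from 2 to (isqrt x)
             never (zerop (mod x i)))))

(defun compare-filtering (start end &optional (workers 4))
  "Time REMOVE-IF-NOT against PREMOVE-IF-NOT; return T if the results match."
  (let ((numbers (loop for i from start to end collect i))
        (lparallel:*kernel* (lparallel:make-kernel workers)))
    (unwind-protect
         (let ((sequential (time (remove-if-not #'small-prime-p numbers)))
               (parallel (time (lparallel:premove-if-not #'small-prime-p
                                                         numbers))))
           (equal sequential parallel))
      (lparallel:end-kernel :wait t))))
```

Calling something like `(compare-filtering 1 200000)` prints two timing reports. How much the parallel version gains, if anything, depends on the machine and on how expensive the predicate is.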

I had contemplated doing another demo with matrix multiplication, but from an educational perspective, this single demo seems to have done the job, so I’ll skip matrix multiplication for now.

## References

Some additional useful references (definitely check out the video in the second link. Patrick Stein’s tutorial using a simple range class example is most excellent):

That concludes this series on Concurrency and Parallelism using Common Lisp! Next up, we will discuss another extremely important topic – interop between languages. That will also be a mini-series of sorts, and I might throw in a random but useful post in between (depending on what interests me at that point!).

Till then, happy hacking!

# Basic Concurrency and Parallelism in Common Lisp – Part 4a (Parallelism using lparallel – fundamentals)

In these concluding parts of this mini-series, we will have a taste of parallel programming in Common Lisp using the lparallel library.

It is important to note that lparallel also provides extensive support for asynchronous programming, and is not a purely parallel programming library. As stated before, parallelism is merely an abstract concept in which tasks are conceptually independent of one another.

## Installation

lparallel can be installed using Quicklisp. In case you are not sure about how Quicklisp works, please check my previous post on how to set up a Common Lisp environment. First, let’s check whether the library is available:

CL-USER> (ql:system-apropos "lparallel")
#<SYSTEM lparallel / lparallel-20160825-git / quicklisp 2016-08-25>
#<SYSTEM lparallel-bench / lparallel-20160825-git / quicklisp 2016-08-25>
#<SYSTEM lparallel-test / lparallel-20160825-git / quicklisp 2016-08-25>
; No value


Looks like it is. Let’s go ahead and install it:

CL-USER> (ql:quickload :lparallel)
Install 1 Quicklisp release:
lparallel
; Fetching #<URL "http://beta.quicklisp.org/archive/lparallel/2016-08-25/lparallel-20160825-git.tgz">
; 76.71KB
==================================================
78,551 bytes in 0.62 seconds (124.33KB/sec)
[package lparallel.util]..........................
[package lparallel.raw-queue].....................
[package lparallel.cons-queue]....................
[package lparallel.vector-queue]..................
[package lparallel.queue].........................
[package lparallel.counter].......................
[package lparallel.spin-queue]....................
[package lparallel.kernel]........................
[package lparallel.kernel-util]...................
[package lparallel.promise].......................
[package lparallel.ptree].........................
[package lparallel.slet]..........................
[package lparallel.defpun]........................
[package lparallel.cognate].......................
[package lparallel]
(:LPARALLEL)


And that’s all it took! Now let’s see how this library actually works.

## The lparallel library

The lparallel library is built on top of the Bordeaux threading library (see previous post for more on this library).

As mentioned in the previous post, parallelism and concurrency can be (and usually are) implemented using the same means: threads, processes, etc. The difference between them is conceptual.

Note that not all the examples shown in this post are necessarily parallel. Asynchronous constructs such as Promises and Futures are, in particular, more suited to concurrent programming than parallel programming.

The modus operandi of using the lparallel library (for a basic use case) is as follows:

• Create an instance of what the library calls a kernel using lparallel:make-kernel. The kernel is the component that schedules and executes tasks.
• Design the code in terms of futures, promises and other higher level functional concepts. To this end, lparallel provides support for channels, promises, futures, and cognates.
• Perform operations using what the library calls cognates, which are simply functions which have equivalents in the Common Lisp language itself. For instance, the lparallel:pmap function is the parallel equivalent of the Common Lisp map function.
• Finally, close the kernel created in the first step using lparallel:end-kernel.

Note that the onus of ensuring that the tasks being carried out are logically parallelisable as well as taking care of all mutable state is on the developer.
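The steps above can be sketched in a few lines. This assumes Quicklisp is available for loading the library. The function name, the kernel name, and the worker count of 4 are arbitrary choices for illustration.

```lisp
;; Assumes Quicklisp is installed; loads lparallel if it is not loaded yet.
(ql:quickload :lparallel)

(defun kernel-lifecycle-sketch ()
  ;; Step 1: create a kernel. Binding *KERNEL* makes it the current one.
  (let ((lparallel:*kernel* (lparallel:make-kernel 4 :name "sketch-kernel")))
    (unwind-protect
         ;; Steps 2 and 3: do the work with a cognate, here the parallel
         ;; counterpart of MAP.
         (lparallel:pmap 'vector #'(lambda (x) (* x x)) '(1 2 3 4 5))
      ;; Step 4: shut the kernel down, even if the work signalled an error.
      (lparallel:end-kernel :wait t))))
```

Wrapping the work in `unwind-protect` is a design choice of this sketch: it keeps worker threads from leaking if the body exits abnormally.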


First, let’s get hold of the number of threads that we are going to use for our parallel examples. Ideally, we’d like to have a 1:1 match between the number of worker threads and the number of available cores.

We can use the wonderful cffi library to this end. I plan to have a detailed blog post for this extremely useful library soon, but for now, let’s get on with it:

Install CFFI:

CL-USER> (ql:quickload :cffi)
alexandria babel trivial-features uiop
Install 1 Quicklisp release:
cffi
; Fetching #<URL "http://beta.quicklisp.org/archive/cffi/2016-03-18/cffi_0.17.1.tgz">
; 234.48KB
==================================================
240,107 bytes in 5.98 seconds (39.22KB/sec)
[package cffi-sys]................................
[package cffi]....................................
..................................................
[package cffi-features]
(:CFFI)


Write C code to get the number of logical cores on the machine:

#include <stdint.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/sysctl.h>

/* The prototype must match the definition below. */
int32_t get_core_count(void);

int main()
{
    printf("%d\n", get_core_count());

    return 0;
}

/* Query the number of logical cores via sysctl (Mac OS X). */
int32_t get_core_count(void)
{
    const char* s = "hw.logicalcpu";
    int32_t core_count;
    size_t len = sizeof(core_count);

    sysctlbyname(s, &core_count, &len, NULL, 0);

    return core_count;
}


Bundle the C code into a shared library (note, I am using Mac OS X which comes bundled with Clang. For pure gcc, refer to the relevant documentation):

Timmys-MacBook-Pro:Parallelism z0ltan$ clang -dynamiclib get_core_count.c -o libcorecount.dylib

Invoke the function from Common Lisp:

CL-USER> (cffi:use-foreign-library "libcorecount.dylib")
#<CFFI:FOREIGN-LIBRARY LIBCORECOUNT.DYLIB-853 "libcorecount.dylib">
CL-USER> (cffi:foreign-funcall "get_core_count" :int)
8

We can see that the result is 8 cores on the machine (which is correct) and can be verified from the command line as well:

Timmys-MacBook-Pro:Parallelism z0ltan$ sysctl -n "hw.logicalcpu"
8


### Common Setup

In this example, we will go through the initial setup bit, and also show some useful information once the setup is done.

CL-USER> (ql:quickload :lparallel)
lparallel

(:LPARALLEL)


Initialise the lparallel kernel:

CL-USER> (setf lparallel:*kernel* (lparallel:make-kernel 8 :name "custom-kernel"))
#<LPARALLEL.KERNEL:KERNEL :NAME "custom-kernel" :WORKER-COUNT 8 :USE-CALLER NIL :ALIVE T :SPIN-COUNT 2000 {1003141F03}>


Note that the *kernel* global variable can be rebound — this allows multiple kernels to co-exist during the same run. Now, some useful information about the kernel:

CL-USER> (defun show-kernel-info ()
           (let ((name (lparallel:kernel-name))
                 (count (lparallel:kernel-worker-count))
                 (context (lparallel:kernel-context))
                 (bindings (lparallel:kernel-bindings)))
             (format t "Kernel name = ~a~%" name)
             (format t "Worker threads count = ~d~%" count)
             (format t "Kernel context = ~a~%" context)
             (format t "Kernel bindings = ~a~%" bindings)))

WARNING: redefining COMMON-LISP-USER::SHOW-KERNEL-INFO in DEFUN
SHOW-KERNEL-INFO

CL-USER> (show-kernel-info)
Kernel name = custom-kernel
Worker threads count = 8
Kernel context = #<FUNCTION FUNCALL>
Kernel bindings = ((*STANDARD-OUTPUT* . #<SLIME-OUTPUT-STREAM {10044EEEA3}>)
(*ERROR-OUTPUT* . #<SLIME-OUTPUT-STREAM {10044EEEA3}>))
NIL
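To illustrate the earlier remark that *kernel* can be rebound, here is a minimal sketch with two kernels alive at once. It assumes Quicklisp is available. The kernel names and worker counts are arbitrary, and `two-kernels-sketch` is a made-up name.

```lisp
;; Assumes Quicklisp is installed; loads lparallel if it is not loaded yet.
(ql:quickload :lparallel)

(defun two-kernels-sketch ()
  (let ((first-kernel (lparallel:make-kernel 2 :name "first-kernel"))
        (second-kernel (lparallel:make-kernel 2 :name "second-kernel")))
    (unwind-protect
         ;; KERNEL-NAME reports on whichever kernel *KERNEL* is bound to.
         (list (let ((lparallel:*kernel* first-kernel))
                 (lparallel:kernel-name))
               (let ((lparallel:*kernel* second-kernel))
                 (lparallel:kernel-name)))
      ;; END-KERNEL also acts on the current binding of *KERNEL*.
      (dolist (kernel (list first-kernel second-kernel))
        (let ((lparallel:*kernel* kernel))
          (lparallel:end-kernel :wait t))))))
```

Each `let` affects only the code running inside it, so the two kernels do not interfere with one another.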


End the kernel (this is important since *kernel* does not get garbage collected until we explicitly end it):

CL-USER> (lparallel:end-kernel :wait t)


Let’s move on to some more examples of different aspects of the lparallel library.

For these demos, we will be using the following initial setup from a coding perspective:

(require 'lparallel)
(require 'bt-semaphore)

(defpackage :lparallel-user
(:use :cl :lparallel :lparallel.queue :bt-semaphore))

(in-package :lparallel-user)

;;; initialise the kernel
(defun init ()
(setf *kernel* (make-kernel 8 :name "channel-queue-kernel")))

(init)


So we will be using a kernel with 8 worker threads (one for each CPU core on the machine).

And once we’re done with all the examples, the following code will be run to close the kernel and free all used system resources:

;;; shut the kernel down
(defun shutdown ()
(end-kernel :wait t))

(shutdown)



First some definitions are in order.

A task is a job that is submitted to the kernel. It is simply a function object along with its arguments.

A channel in lparallel is similar to the same concept in Go. A channel is simply a means of communication with a worker thread. In our case, it is one particular way of submitting tasks to the kernel.

For instance, we can calculate the square of a number as:

(defun calculate-square (n)
  (let* ((channel (lparallel:make-channel))
         (res nil))
    (lparallel:submit-task channel
                           #'(lambda (x)
                               (* x x))
                           n)
    (setf res (lparallel:receive-result channel))
    (format t "Square of ~d = ~d~%" n res)))


And the output:

LPARALLEL-USER> (calculate-square 100)
Square of 100 = 10000
NIL


Now let’s try submitting multiple tasks to the same channel. In this simple example, we are simply creating three tasks that square, cube, and raise to the fourth power the supplied input, respectively.

Note that in case of multiple tasks, the output will be in non-deterministic order:

(defun test-basic-channel-multiple-tasks ()
  (let ((channel (make-channel))
        (res '()))
    (submit-task channel #'(lambda (x)
                             (* x x))
                 10)
    (submit-task channel #'(lambda (y)
                             (* y y y))
                 10)
    (submit-task channel #'(lambda (z)
                             (* z z z z))
                 10)
    (dotimes (i 3 res)
      (push (receive-result channel) res))))


And the output:

LPARALLEL-USER> (dotimes (i 3)
                  (print (test-basic-channel-multiple-tasks)))
(100 1000 10000)
(100 1000 10000)
(10000 1000 100)
NIL


lparallel also provides support for creating a blocking queue in order to enable message passing between worker threads. A queue is created using lparallel.queue:make-queue.

Some useful functions for using queues are:

• lparallel.queue:make-queue: create a FIFO blocking queue
• lparallel.queue:push-queue: insert an element into the queue
• lparallel.queue:pop-queue: pop an item from the queue
• lparallel.queue:peek-queue: inspect value without popping it
• lparallel.queue:queue-count: the number of entries in the queue
• lparallel.queue:queue-full-p: check if the queue is full
• lparallel.queue:queue-empty-p: check if the queue is empty
• lparallel.queue:with-locked-queue: lock the queue during access

A basic demo showing basic queue properties:

(defun test-queue-properties ()
  (let ((queue (make-queue :fixed-capacity 5)))
    (loop
      when (queue-full-p queue)
        do (return)
      do (push-queue (random 100) queue))
    (print (queue-full-p queue))
    (loop
      when (queue-empty-p queue)
        do (return)
      do (print (pop-queue queue)))
    (print (queue-empty-p queue)))
  nil)


Which produces:

LPARALLEL-USER> (test-queue-properties)

T
17
51
55
42
82
T
NIL


Note: lparallel.queue:make-queue is a generic interface which is actually backed by different types of queues. For instance, in the previous example, the actual type of the queue is lparallel.vector-queue since we specified it to be of fixed size using the :fixed-capacity keyword argument.

The documentation doesn’t actually specify what keyword arguments we can pass to lparallel.queue:make-queue, so let’s find that out in a different way:

LPARALLEL-USER> (describe 'lparallel.queue:make-queue)
LPARALLEL.QUEUE:MAKE-QUEUE
[symbol]

MAKE-QUEUE names a compiled function:
Lambda-list: (&REST ARGS)
Derived type: FUNCTION
Documentation:
Create a queue.

The queue contents may be initialized with the keyword argument
`initial-contents'.

By default there is no limit on the queue capacity. Passing a
`fixed-capacity' keyword argument limits the capacity to the value
passed. `push-queue' will block for a full fixed-capacity queue.
Source file: /Users/z0ltan/quicklisp/dists/quicklisp/software/lparallel-20160825-git/src/queue.lisp

MAKE-QUEUE has a compiler-macro:
Source file: /Users/z0ltan/quicklisp/dists/quicklisp/software/lparallel-20160825-git/src/queue.lisp
; No value


So, as we can see, it supports the following keyword arguments: :fixed-capacity and :initial-contents.
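As a small sketch of the :initial-contents argument together with a couple of the accessors listed earlier, consider the following. It assumes Quicklisp is available, and the function name is invented for illustration. No kernel is needed, because queues work independently of tasks.

```lisp
;; Assumes Quicklisp is installed; loads lparallel if it is not loaded yet.
(ql:quickload :lparallel)

(defun queue-initial-contents-sketch ()
  ;; Pre-populate an unbounded queue with three items.
  (let ((queue (lparallel.queue:make-queue :initial-contents '(1 2 3))))
    (list (lparallel.queue:queue-count queue)   ; count before any pop
          (lparallel.queue:peek-queue queue)    ; look without removing
          (lparallel.queue:pop-queue queue)     ; remove the head item
          (lparallel.queue:queue-count queue)))) ; count after one pop
```

Since the queue is FIFO, the peek and the pop both see the first element of the initial contents.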

Now, if we do specify :fixed-capacity, then the actual type of the queue will be lparallel.vector-queue, and if we skip that keyword argument, the queue will be of type lparallel.cons-queue (which is a queue of unlimited size), as can be seen from the output of the following snippet:

(defun check-queue-types ()
  (let ((queue-one (make-queue :fixed-capacity 5))
        (queue-two (make-queue)))
    (format t "queue-one is of type: ~a~%" (type-of queue-one))
    (format t "queue-two is of type: ~a~%" (type-of queue-two))))

LPARALLEL-USER> (check-queue-types)
queue-one is of type: VECTOR-QUEUE
queue-two is of type: CONS-QUEUE
NIL


Of course, you can always create instances of the specific queue types yourself, but it is always better, when you can, to stick to the generic interface and let the library create the proper type of queue for you.

Now, let’s just see the queue in action!

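(defun test-basic-queue ()
  (let ((queue (make-queue))
        (channel (make-channel))
        (res '()))
    (submit-task channel #'(lambda ()
                             (loop for entry = (pop-queue queue)
                                   when (queue-empty-p queue)
                                     do (return)
                                   do (push (* entry entry) res))))
    (dotimes (i 100)
      (push-queue i queue))
    (receive-result channel)
    (format t "~{~d ~}~%" res)))


Here we submit a single task that repeatedly pops values from the queue until it finds the queue empty, pushing the square of each value into the res list. Note that the emptiness check happens right after each pop, so the last value popped is dropped (99 in the run below), and the result depends on the producer staying ahead of the consumer.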

And the output:

LPARALLEL-USER> (test-basic-queue)
9604 9409 9216 9025 8836 8649 8464 8281 8100 7921 7744 7569 7396 7225 7056 6889 6724 6561 6400 6241 6084 5929 5776 5625 5476 5329 5184 5041 4900 4761 4624 4489 4356 4225 4096 3969 3844 3721 3600 3481 3364 3249 3136 3025 2916 2809 2704 2601 2500 2401 2304 2209 2116 2025 1936 1849 1764 1681 1600 1521 1444 1369 1296 1225 1156 1089 1024 961 900 841 784 729 676 625 576 529 484 441 400 361 324 289 256 225 196 169 144 121 100 81 64 49 36 25 16 9 4 1 0
NIL


A small note on the lparallel:kill-tasks function would be apropos at this juncture. This function is useful in those cases when tasks are unresponsive. The lparallel documentation clearly states that this must only be used as a last resort.

All tasks which are created are by default assigned a category of :default. The dynamic variable *task-category* holds this value, and can be dynamically bound to different values (as we shall see).

;;; kill default tasks
(defun test-kill-all-tasks ()
  (let ((channel (make-channel))
        (stream *query-io*))
    (dotimes (i 10)
      (submit-task channel #'(lambda (x)
                               (sleep (random 10))
                               (format stream "~d~%" (* x x))) (random 10)))
    (sleep (random 2))
    (kill-tasks :default)))


Sample run:

LPARALLEL-USER> (test-kill-all-tasks)
16
1
8
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.


Since we had created 10 tasks, all the 8 kernel worker threads were presumably busy with a task each. When we killed tasks of category :default, all these threads were killed as well and had to be regenerated (which is an expensive operation). This is part of the reason why lparallel:kill-tasks must be avoided.

Now, in the example above, all running tasks were killed since all of them belonged to the :default category. Suppose we wish to kill only specific tasks, we can do that by binding *task-category* when we create those tasks, and then specifying the category when we invoke lparallel:kill-tasks.

For example, suppose we have two categories of tasks: tasks which square their arguments, and tasks which cube theirs. Let’s assign them categories 'squaring-tasks and 'cubing-tasks respectively. Let’s then kill tasks of a randomly chosen category, 'squaring-tasks or 'cubing-tasks.

Here is the code:

;;; kill tasks of a randomly chosen category
(defun test-kill-random-tasks ()
  (let ((channel (make-channel))
        (stream *query-io*))
    (let ((*task-category* 'squaring-tasks))
      (dotimes (i 5)
        (submit-task channel #'(lambda (x)
                                 (sleep (random 5))
                                 (format stream "~%[Squaring] ~d = ~d" x (* x x))) i)))
    (let ((*task-category* 'cubing-tasks))
      (dotimes (i 5)
        (submit-task channel #'(lambda (x)
                                 (sleep (random 5))
                                 (format stream "~%[Cubing] ~d = ~d" x (* x x x))) i)))
    (sleep 1)
    (if (evenp (random 10))
        (progn
          (print "Killing squaring tasks")
          (kill-tasks 'squaring-tasks))
        (progn
          (print "Killing cubing tasks")
          (kill-tasks 'cubing-tasks)))))


And here is a sample run:

LPARALLEL-USER> (test-kill-random-tasks)

[Cubing] 2 = 8
[Squaring] 4 = 16
[Cubing] 4
= [Cubing] 643 = 27
4
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.

[Cubing] 1 = 1
[Cubing] 0 = 0

[Squaring] 1 = 1
[Squaring] 3 = 9
5
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.

[Squaring] 2 = 4
WARNING: lparallel: Replacing lost or dead worker.
WARNING: lparallel: Replacing lost or dead worker.

[Squaring] 0 = 0
[Squaring] 4 = 16



Promises and Futures provide support for Asynchronous Programming.

In lparallel-speak, a lparallel:promise is a placeholder for a result which is fulfilled by providing it with a value. The promise object itself is created using lparallel:promise, and the promise is given a value using the lparallel:fulfill macro.

To check whether the promise has been fulfilled yet or not, we can use the lparallel:fulfilledp predicate function.
Finally, the lparallel:force function is used to extract the value out of the promise. Note that this function blocks until the operation is complete.

Let’s solidify these concepts with a very simple example first:

(defun test-promise ()
  (let ((p (promise)))
    (loop
      do (if (evenp (read))
             (progn
               (fulfill p 'even-received!)
               (return))))
    (force p)))


Which generates the output:

LPARALLEL-USER> (test-promise)
5
1
3
10

Explanation: This example keeps looping until an even number has been entered. The promise is fulfilled inside the loop using lparallel:fulfill, and the value is then returned from the function by forcing it with lparallel:force.


Now, let’s take a bigger example. Assuming that we don’t want to have to wait for the promise to be fulfilled, and instead have the current thread do some useful work, we can explicitly delegate the promise fulfillment to an external thread, as seen in the next example.

Suppose we have a function that squares its argument. And, for the sake of argument, it consumes a lot of time doing so. From our client code, we want to invoke it, and wait till the squared value is available.

(defun promise-with-threads ()
  (let ((p (promise))
        (stream *query-io*)
        (n (progn
             (princ "Enter a number: ")
             (read))))
    (format t "In main function...~%")
    (bt:make-thread
     #'(lambda ()
         (sleep (random 10))
         (format stream "Inside thread... fulfilling promise~%")
         (fulfill p (* n n))))
    (bt:make-thread
     #'(lambda ()
         (loop
           when (fulfilledp p)
             do (return)
           do (progn
                (format stream "~d~%" (random 100))
                (sleep (* 0.01 (random 100)))))))
    (format t "Inside main function, received value: ~d~%" (force p))))


And the output:

LPARALLEL-USER> (promise-with-threads)
Enter a number: 19
In main function...
44
59
90
34
30
76
Inside main function, received value: 361
NIL


Explanation: There is nothing much in this example. We create a promise object p, and we spawn off a thread that sleeps for some random time and then fulfills the promise by giving it a value.

Meanwhile, in the main thread, we spawn off another thread that keeps checking if the promise has been fulfilled or not. If not, it prints some random number and continues checking. Once the promise has been fulfilled, we can extract the value using lparallel:force in the main thread as shown.

This shows that promises can be fulfilled by different threads while the code that created the promise need not wait for the promise to be fulfilled. This is especially important since, as mentioned before, lparallel:force is a blocking call. We want to delay forcing the promise until the value is actually available.

Another point to note when using promises is that once a promise has been fulfilled, invoking force on the same object will always return the same value. That is to say, a promise can be successfully fulfilled only once.

For instance:

(defun multiple-fulfilling ()
  (let ((p (promise)))
    (dotimes (i 10)
      (fulfill p (random 100))
      (format t "~d~%" (force p)))))


Which produces:

LPARALLEL-USER> (multiple-fulfilling)
15
15
15
15
15
15
15
15
15
15
NIL


So how does a future differ from a promise?

A lparallel:future is simply a promise that is run in parallel, and as such, it does not block the main thread like a default use of lparallel:promise would. It is executed in its own thread (by the lparallel library, of course).

Here is a simple example of a future:

(defun test-future ()
  (let ((f (future
             (sleep (random 5))
             (print "Hello from future!"))))
    (loop
      when (fulfilledp f)
        do (return)
      do (sleep (* 0.01 (random 100)))
         (format t "~d~%" (random 100)))
    (format t "~d~%" (force f))))


And the output:

LPARALLEL-USER> (test-future)
5
19
91
11
Hello from future!
NIL


Explanation: This is very similar to the promise-with-threads example. Observe two differences, however. First of all, the lparallel:future macro has a body as well. This allows the future to fulfill itself! What this means is that as soon as the body of the future is done executing, lparallel:fulfilledp will always return true for the future object.

Secondly, the future itself is spawned off on a separate thread by the library, so it does not interfere with the execution of the current thread very much unlike promises as could be seen in the promise-with-threads example (which needed an explicit thread for the fulfilling code in order to avoid blocking the current thread).
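As a minimal sketch of futures running independently of the current thread, the following starts two futures and combines their values. It assumes Quicklisp is available. The function name and the private two-worker kernel are choices made for this illustration (the post’s own examples use the kernel created by init instead).

```lisp
;; Assumes Quicklisp is installed; loads lparallel if it is not loaded yet.
(ql:quickload :lparallel)

(defun sum-of-futures-sketch ()
  (let ((lparallel:*kernel* (lparallel:make-kernel 2 :name "future-sketch")))
    (unwind-protect
         ;; Both bodies are handed to the kernel as soon as the futures
         ;; are created; FORCE blocks only until each value is ready.
         (let ((a (lparallel:future (* 6 7)))
               (b (lparallel:future (+ 1 2))))
           (+ (lparallel:force a) (lparallel:force b)))
      (lparallel:end-kernel :wait t))))
```

No explicit thread is created here, which is the practical difference from the promise-with-threads example.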

The most interesting bit is that (even in terms of the actual theory propounded by Dan Friedman and others), a Future is conceptually something that fulfills a Promise. That is to say, a promise is a contract that some value will be generated sometime in the future, and a future is precisely that “something” that does that job.

What this means is that even when using the lparallel library, the basic use of a future would be to fulfill a promise. This means that hacks like promise-with-threads need not be made by the user.

Let’s take a small example to demonstrate this point (a pretty contrived example, I must admit!).

Here’s the scenario: we want to read in a number and calculate its square. So we offload this work to another function, and continue with our own work. When the result is ready, we want it to be printed on the console without any intervention from us.

Here’s how the code looks:

;;; Callback example using promises and futures
(defun callback-promise-future-demo ()
  (let* ((p (promise))
         (stream *query-io*)
         ;; prompt for and read in the number to be squared
         (n (progn
              (princ "Enter a number: ")
              (read)))
         (f (future
              (sleep (random 10))
              (fulfill p (* n n))
              (force (future
                       (format stream "Square of ~d = ~d~%" n (force p)))))))
    (loop
      when (fulfilledp f)
        do (return)
      do (sleep (* 0.01 (random 100))))))


And the output:

LPARALLEL-USER> (callback-promise-future-demo)
Enter a number: 19
Square of 19 = 361
NIL


Explanation: All right, so first off, we create a promise to hold the squared value when it is generated. This is the p object. The input value is stored in the local variable n.

Then we create a future object f. This future simply squares the input value and fulfills the promise with this value. Finally, since we want to print the output in its own time, we force an anonymous future which simply prints the output string as shown.

Note that this is very similar to the situation in an environment like Node, where we pass callback functions to other functions with the understanding that the callback will be called when the invoked function is done with its work.

Finally, note that the following snippet is still fine, even though it uses the blocking lparallel:force call, because it runs on a separate thread:

 (force (future (format stream "Square of ~d = ~d~%" n (force p)))) 

To summarise, the general idiom of usage is: define objects which will hold the results of asynchronous computations in promises, and use futures to fulfill those promises.

Cognates are arguably the raison d’etre of the lparallel library. These constructs are what truly provide parallelism in lparallel. Note, however, that most (if not all) of these constructs are built on top of futures and promises.

To put it in a nutshell, cognates are simply functions that are intended to be the parallel equivalents of their Common Lisp counterparts. However, there are a few extra lparallel cognates that have no Common Lisp equivalents.

At this juncture, it is important to know that cognates come in two basic flavours:

1. Constructs for fine-grained parallelism: defpun, plet, plet-if, etc.
2. Explicit functions and macros for performing parallel operations - pmap, preduce, psort, pdotimes, etc.

In the first case we don’t have much explicit control over the operations themselves. We mostly rely on the fact that the library itself will optimise and parallelise the forms to whatever extent it can. In this post, we will focus on the second category of cognates.
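
Although the rest of this post concentrates on the second category, a tiny sketch of the first may be useful for comparison. It uses plet, which looks like let but may evaluate its binding forms in parallel. The helper slow-square, the function plet-demo, and the four-worker kernel are assumptions made for illustration, as is the availability of Quicklisp:

```lisp
;; Assumption: Quicklisp is installed; it is used here only to load lparallel.
(ql:quickload :lparallel :silent t)

;; plet needs a kernel. Create one unless one is already set up.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 4)))

(defun slow-square (x)
  "Square X after a short pause that stands in for real work."
  (sleep 0.2)
  (* x x))

(defun plet-demo ()
  "Compute two squares, possibly in parallel, and add them."
  (lparallel:plet ((a (slow-square 3))
                   (b (slow-square 4)))
    (+ a b)))

(plet-demo) ; => 25
```

With at least two workers, the two bindings can be computed at the same time, so the call would be expected to take roughly one pause rather than two. The result is the same either way.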

Take, for instance, the cognate function lparallel:pmap: it behaves exactly like its Common Lisp equivalent, map, but it runs in parallel. Let’s demonstrate that through an example.

Suppose we had a list of random strings of length varying from 3 to 10, and we wished to collect their lengths in a vector.

Let’s first set up the helper functions that will generate the random strings:

(defvar *chars*
  (remove-duplicates
   (sort
    (loop for c across "The quick brown fox jumps over the lazy dog"
          when (alpha-char-p c)
            collect (char-downcase c))
    #'char<)))

(defun get-random-strings (&optional (count 100000))
  "generate random strings between lengths 3 and 10"
  (loop repeat count
        collect
        (concatenate 'string (loop repeat (+ 3 (random 8))
                                   collect (nth (random 26) *chars*)))))


And here’s what the Common Lisp map version of the solution might look like:

;;; map demo
(defun test-map ()
  (map 'vector #'length (get-random-strings 10)))


And let’s have a test run:

LPARALLEL-USER> (test-map)
#(7 5 10 8 7 5 3 4 4 10)


And here’s the lparallel:pmap equivalent:

;;; pmap demo
(defun test-pmap ()
  (pmap 'vector #'length (get-random-strings 10)))


which produces:

LPARALLEL-USER> (test-pmap)
#(8 7 6 7 6 4 5 6 5 7)
LPARALLEL-USER>


As you can see from the definitions of test-map and test-pmap, the syntax of map and lparallel:pmap is exactly the same (well, almost - lparallel:pmap has a few more optional arguments).
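
Since length is far too cheap for the parallel version to pay off, a rough way to observe the difference is to make each call artificially slow and time both versions. The sketch below is only an illustration under stated assumptions: Quicklisp is available, the kernel has four workers, and slow-length, elapsed-seconds, and compare-map-and-pmap are made-up helper names. The actual timings depend on the machine and the worker count:

```lisp
;; Assumption: Quicklisp is installed; it is used here only to load lparallel.
(ql:quickload :lparallel :silent t)

;; pmap needs a kernel. Create one unless one is already set up.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 4)))

(defun slow-length (string)
  "Return the length of STRING after a pause that stands in for real work."
  (sleep 0.05)
  (length string))

(defun elapsed-seconds (thunk)
  "Call THUNK and return the wall-clock time it took, in seconds."
  (let ((start (get-internal-real-time)))
    (funcall thunk)
    (float (/ (- (get-internal-real-time) start)
              internal-time-units-per-second))))

(defun compare-map-and-pmap ()
  "Time map and pmap over the same 20 strings."
  (let ((strings (loop repeat 20 collect "abc")))
    (list :map (elapsed-seconds
                (lambda () (map 'vector #'slow-length strings)))
          :pmap (elapsed-seconds
                 (lambda () (lparallel:pmap 'vector #'slow-length strings))))))
```

Calling (compare-map-and-pmap) returns a property list of the two timings. With several workers, the :pmap figure would be expected to be noticeably smaller than the :map figure, because the pauses overlap.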

Here are some useful cognate functions and macros (all of them are functions unless explicitly marked as macros). Note that there are quite a few cognates, and I have chosen a few to try and represent every category through an example:

• lparallel:pmap:
Parallel version of map.

Note that all the mapping functions (lparallel:pmap, lparallel:pmapc, lparallel:pmapcar, etc.) take two special keyword arguments - :size, specifying the number of elements of the input sequence(s) to process, and :parts, which specifies the number of parallel parts to divide the sequence(s) into.

;;; pmap - function
(defun test-pmap ()
(let ((numbers (loop for i below 10
collect i)))
(pmap 'vector #'(lambda (x)
(* x x))
:parts (length numbers)
numbers)))


Sample run:

LPARALLEL-USER> (test-pmap)

#(0 1 4 9 16 25 36 49 64 81)

• lparallel:por:
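Parallel version of or. Like or, it returns a non-nil value from amongst its arguments, if there is one. However, since the arguments may be evaluated in parallel, it returns whichever non-nil value is produced first, which need not be the leftmost one, so the result can vary from run to run.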

;;; por - macro
(defun test-por ()
(let ((a 100)
(b 200)
(c nil)
(d 300))
(por a b c d)))


Sample run:

LPARALLEL-USER> (dotimes (i 10)
(print (test-por)))

300
300
100
100
100
300
100
100
100
100
NIL


In the case of the normal or operator, it would always have returned the first non-nil element viz. 100.
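
The race between the arguments becomes easier to see when they take different amounts of time. The following is a sketch only, assuming Quicklisp is available and a kernel with at least two workers; the function name por-race and the sleep durations are arbitrary:

```lisp
;; Assumption: Quicklisp is installed; it is used here only to load lparallel.
(ql:quickload :lparallel :silent t)

;; por needs a kernel. Create one unless one is already set up.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 4)))

(defun por-race ()
  "Race a slow form against a fast one and return whichever por picks."
  (lparallel:por (progn (sleep 0.5) :slow)
                 (progn (sleep 0.05) :fast)))
```

The ordinary or would evaluate the first form to completion and return :SLOW every time. With por and enough workers, (por-race) would usually be expected to return :FAST, though nothing guarantees which non-nil value wins.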

• lparallel:pdotimes:
Parallel version of dotimes. Note that this macro also takes an optional parts argument, given positionally after the optional result form rather than as a :parts keyword.

;;; pdotimes - macro
(defun test-pdotimes ()
(pdotimes (i 5)
(declare (ignore i))
(print (random 100))))


Sample run:

LPARALLEL-USER> (test-pdotimes)

39
29
81
42
56
NIL

• lparallel:pfuncall:
Parallel version of funcall.

;;; pfuncall - macro
(defun test-pfuncall ()
(pfuncall #'* 1 2 3 4 5))


Sample run:

LPARALLEL-USER> (test-pfuncall)

120

• lparallel:preduce:
Parallel version of reduce.

This very important function also takes two optional keyword arguments - :parts (with the same meaning as explained above), and :recurse. The sequence is divided into parts, and each part is reduced separately. If :recurse is non-nil, lparallel:preduce is applied again to the partial results; otherwise it defaults to using reduce to combine them.

;;; preduce - function
(defun test-preduce ()
(let ((numbers (loop for i from 1 to 100
collect i)))
(preduce #'+
numbers
:parts (length numbers)
:recurse t)))


Sample run:

LPARALLEL-USER> (test-preduce)

5050
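
Because the sequence is split into parts that are reduced independently before being combined, the result can only be relied upon to match reduce when the function is associative, as + and max are. A quick sanity check along those lines might look like the sketch below, which assumes Quicklisp is available and a four-worker kernel; the function name preduce-matches-reduce is made up for illustration:

```lisp
;; Assumption: Quicklisp is installed; it is used here only to load lparallel.
(ql:quickload :lparallel :silent t)

;; preduce needs a kernel. Create one unless one is already set up.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 4)))

(defun preduce-matches-reduce ()
  "Compare reduce and preduce for two associative functions."
  (let ((numbers (loop for i from 1 to 1000 collect i)))
    (list (reduce #'+ numbers)
          (lparallel:preduce #'+ numbers :parts 4)
          (lparallel:preduce #'max numbers :parts 4 :recurse t))))

(preduce-matches-reduce) ; => (500500 500500 1000)
```

A non-associative function such as - is a poor fit for preduce, since the way the parts are grouped changes the answer.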

• lparallel:premove-if-not:
Parallel version of remove-if-not. This is essentially equivalent to “filter” in Functional Programming parlance.

;;; premove-if-not
(defun test-premove-if-not ()
(let ((numbers (loop for i from 1 to 100
collect i)))
(premove-if-not #'evenp numbers)))


Sample run:

LPARALLEL-USER> (test-premove-if-not)

(2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54
56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 100)

• lparallel:pevery:
Parallel version of every.

;;; pevery - function
(defun test-pevery ()
(let ((numbers (loop for i from 1 to 100
collect i)))
(list (pevery #'evenp numbers)
(pevery #'integerp numbers))))


Sample run:

LPARALLEL-USER> (test-pevery)

(NIL T)


In this example, we are performing two checks - firstly, whether all the numbers in the range [1,100] are even, and secondly, whether all the numbers in the same range are integers.

• lparallel:pcount:
Parallel version of count.

;;; pcount - function
(defun test-pcount ()
(let ((chars "The quick brown fox jumps over the lazy dog"))
(pcount #\e chars)))


Sample run:

LPARALLEL-USER> (test-pcount)

3

• lparallel:psort:
Parallel version of sort.

;;; psort - function
(defstruct person
name
age)

(defun test-psort ()
  (let* ((names (list "Rich" "Peter" "Sybil" "Basil" "Candy" "Slava" "Olga"))
         (people (loop for name in names
                       collect (make-person :name name :age (+ (random 20) 20)))))
    (print "Before sorting...")
    (print people)
    (fresh-line)
    (print "After sorting...")
    ;; Sort by age. Like sort, psort takes a predicate and an optional :key
    ;; (sorting has no :test argument).
    (psort people #'< :key #'person-age)))


Sample run:

LPARALLEL-USER> (test-psort)

"Before sorting..."
(#S(PERSON :NAME "Rich" :AGE 38) #S(PERSON :NAME "Peter" :AGE 24)
#S(PERSON :NAME "Sybil" :AGE 20) #S(PERSON :NAME "Basil" :AGE 22)
#S(PERSON :NAME "Candy" :AGE 23) #S(PERSON :NAME "Slava" :AGE 37)
#S(PERSON :NAME "Olga" :AGE 33))

"After sorting..."
(#S(PERSON :NAME "Sybil" :AGE 20) #S(PERSON :NAME "Basil" :AGE 22)
#S(PERSON :NAME "Candy" :AGE 23) #S(PERSON :NAME "Peter" :AGE 24)
#S(PERSON :NAME "Olga" :AGE 33) #S(PERSON :NAME "Slava" :AGE 37)
#S(PERSON :NAME "Rich" :AGE 38))


In this example, we first define a structure of type person for storing information about people. Then we create a list of 7 people with randomly generated ages (between 20 and 39). Finally, we sort them by age in non-decreasing order.

There are, of course, a lot more functions, objects, and idiomatic ways of performing parallel computations using the lparallel library. This post barely scratches the surface on those. However, the general flow of operation is amply demonstrated here, and for further reading, you may find the following resources useful:

In the final part of this series, we will discuss an extremely important topic common to all that was covered in this post - error handling.