Basic Concurrency and Parallelism in Common Lisp – Part 4a (Parallelism using lparallel – fundamentals)


In these concluding parts of this mini-series, we will have a taste of parallel programming in Common Lisp using the lparallel library.

It is important to note that lparallel also provides extensive support for asynchronous programming, and is not a purely parallel programming library. As stated before, parallelism is merely an abstract concept in which tasks are conceptually independent of one another.

Installation

lparallel can be installed using Quicklisp. In case you are not sure about how Quicklisp works, please check my previous post on how to set up a Common Lisp environment.

Let’s check if lparallel is available for download using Quicklisp:

CL-USER> (ql:system-apropos "lparallel")
#<SYSTEM lparallel / lparallel-20160825-git / quicklisp 2016-08-25>
#<SYSTEM lparallel-bench / lparallel-20160825-git / quicklisp 2016-08-25>
#<SYSTEM lparallel-test / lparallel-20160825-git / quicklisp 2016-08-25>
; No value

Looks like it is. Let’s go ahead and install it:

CL-USER> (ql:quickload :lparallel)
To load "lparallel":
  Load 2 ASDF systems:
    alexandria bordeaux-threads
  Install 1 Quicklisp release:
    lparallel
; Fetching #<URL "http://beta.quicklisp.org/archive/lparallel/2016-08-25/lparallel-20160825-git.tgz">
; 76.71KB
==================================================
78,551 bytes in 0.62 seconds (124.33KB/sec)
; Loading "lparallel"
[package lparallel.util]..........................
[package lparallel.thread-util]...................
[package lparallel.raw-queue].....................
[package lparallel.cons-queue]....................
[package lparallel.vector-queue]..................
[package lparallel.queue].........................
[package lparallel.counter].......................
[package lparallel.spin-queue]....................
[package lparallel.kernel]........................
[package lparallel.kernel-util]...................
[package lparallel.promise].......................
[package lparallel.ptree].........................
[package lparallel.slet]..........................
[package lparallel.defpun]........................
[package lparallel.cognate].......................
[package lparallel]
(:LPARALLEL)

And that’s all it took! Now let’s see how this library actually works.

The lparallel library

The lparallel library is built on top of the Bordeaux threading library (see previous post for more on this library).

As mentioned in the previous post, parallelism and concurrency can be (and usually are) implemented using the same means (threads, processes, etc.). The difference between them is conceptual.

Note that not all the examples shown in this post are necessarily parallel. Asynchronous constructs such as Promises and Futures are, in particular, more suited to concurrent programming than parallel programming.

The modus operandi of using the lparallel library (for a basic use case) is as follows:

  • Create an instance of what the library calls a kernel using lparallel:make-kernel. The kernel is the component that schedules and executes tasks.
  • Design the code in terms of futures, promises and other higher level functional concepts. To this end, lparallel provides support for channels, promises, futures, and cognates.
  • Perform operations using what the library calls cognates, which are simply functions which have equivalents in the Common Lisp language itself. For instance, the lparallel:pmap function is the parallel equivalent of the Common Lisp map function.
  • Finally, close the kernel created in the first step using lparallel:end-kernel.

Note that the onus of ensuring that the tasks being carried out are logically parallelisable as well as taking care of all mutable state is on the developer.
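
The four steps above can be sketched in a few lines. This is only a minimal illustration, assuming Quicklisp is available; the helper name run-four-steps, the kernel name and the worker count of 4 are arbitrary choices made for this sketch, not anything prescribed by the library.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

(defun run-four-steps (numbers)
  "Hypothetical helper that walks through the four steps for a list of NUMBERS."
  ;; Step 1: create the kernel (4 workers is an arbitrary choice for this sketch).
  (setf lparallel:*kernel* (lparallel:make-kernel 4 :name "sketch-kernel"))
  (unwind-protect
       ;; Steps 2 and 3: express the work through a cognate; pmap mirrors map
       ;; and returns its results in the same order as the input.
       (lparallel:pmap 'list (lambda (x) (* x x)) numbers)
    ;; Step 4: shut the kernel down, even if the work signals an error.
    (lparallel:end-kernel :wait t)))

(run-four-steps '(1 2 3 4 5)) ; => (1 4 9 16 25)
```

The unwind-protect is a design choice of the sketch: it makes sure the worker threads are released even when the mapped function fails.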

Demos

First, let’s get hold of the number of threads that we are going to use for our parallel examples. Ideally, we’d like to have a 1:1 match between the number of worker threads and the number of available cores.

We can use the wonderful cffi library to this end. I plan to have a detailed blog post for this extremely useful library soon, but for now, let’s get on with it:

Install CFFI:

CL-USER> (ql:quickload :cffi)
To load "cffi":
  Load 4 ASDF systems:
    alexandria babel trivial-features uiop
  Install 1 Quicklisp release:
    cffi
; Fetching #<URL "http://beta.quicklisp.org/archive/cffi/2016-03-18/cffi_0.17.1.tgz">
; 234.48KB
==================================================
240,107 bytes in 5.98 seconds (39.22KB/sec)
; Loading "cffi"
[package cffi-sys]................................
[package cffi]....................................
..................................................
[package cffi-features]
(:CFFI)

Write C code to get the number of logical cores on the machine:

#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/sysctl.h>

/* Returns the number of logical CPU cores, or -1 if the query fails. */
int32_t get_core_count(void);

int main(void)
{
    printf("%d\n", get_core_count());

    return 0;
}

int32_t get_core_count(void)
{
    const char* s = "hw.logicalcpu";  /* macOS-specific sysctl name */
    int32_t core_count;
    size_t len = sizeof(core_count);

    if (sysctlbyname(s, &core_count, &len, NULL, 0) != 0)
        return -1;

    return core_count;
}

Bundle the C code into a shared library (note, I am using Mac OS X which comes bundled with Clang. For pure gcc, refer to the relevant documentation):

Timmys-MacBook-Pro:Parallelism z0ltan$ clang -dynamiclib get_core_count.c -o libcorecount.dylib

Invoke the function from Common Lisp:

CL-USER> (cffi:use-foreign-library "libcorecount.dylib")
#<CFFI:FOREIGN-LIBRARY LIBCORECOUNT.DYLIB-853 "libcorecount.dylib">
CL-USER> (cffi:foreign-funcall "get_core_count" :int)
8

We can see that the result is 8 cores on the machine (which is correct) and can be verified from the command line as well:

Timmys-MacBook-Pro:Parallelism z0ltan$ sysctl -n "hw.logicalcpu"
8

Common Setup

In this example, we will go through the initial setup bit, and also show some useful information once the setup is done.

Load the library:

CL-USER> (ql:quickload :lparallel)
To load "lparallel":
  Load 1 ASDF system:
    lparallel
; Loading "lparallel"

(:LPARALLEL)

Initialise the lparallel kernel:

CL-USER> (setf lparallel:*kernel* (lparallel:make-kernel 8 :name "custom-kernel"))
#<LPARALLEL.KERNEL:KERNEL :NAME "custom-kernel" :WORKER-COUNT 8 :USE-CALLER NIL :ALIVE T :SPIN-COUNT 2000 {1003141F03}>

Note that the *kernel* global variable can be rebound — this allows multiple kernels to co-exist during the same run. Now, some useful information about the kernel:

CL-USER> (defun show-kernel-info ()
           (let ((name (lparallel:kernel-name))
                 (count (lparallel:kernel-worker-count))
                 (context (lparallel:kernel-context))
                 (bindings (lparallel:kernel-bindings)))
             (format t "Kernel name = ~a~%" name)
             (format t "Worker threads count = ~d~%" count)
             (format t "Kernel context = ~a~%" context)
             (format t "Kernel bindings = ~a~%" bindings)))
           
             
WARNING: redefining COMMON-LISP-USER::SHOW-KERNEL-INFO in DEFUN
SHOW-KERNEL-INFO

CL-USER> (show-kernel-info)
Kernel name = custom-kernel
Worker threads count = 8
Kernel context = #<FUNCTION FUNCALL>
Kernel bindings = ((*STANDARD-OUTPUT* . #<SLIME-OUTPUT-STREAM {10044EEEA3}>)
                   (*ERROR-OUTPUT* . #<SLIME-OUTPUT-STREAM {10044EEEA3}>))
NIL

End the kernel (this is important since *kernel* does not get garbage collected until we explicitly end it):

CL-USER> (lparallel:end-kernel :wait t)
(#<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {100723FA83}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {100723FE23}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {10072581E3}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007258583}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007258923}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007258CC3}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007259063}>
 #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007259403}>)
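
The earlier remark that *kernel* can be rebound deserves a small illustration. The sketch below, assuming Quicklisp is available, binds *kernel* with let so that two kernels exist side by side; the function name kernel-names-demo and both kernel names are made up for this example.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

(defun kernel-names-demo ()
  "Hypothetical demo: return the kernel names seen inside and outside a LET rebinding."
  (let ((outer (lparallel:make-kernel 2 :name "outer-kernel"))
        (inner (lparallel:make-kernel 2 :name "inner-kernel"))
        (names '()))
    (let ((lparallel:*kernel* outer))
      ;; Inside this LET, lparallel calls use the inner kernel.
      (let ((lparallel:*kernel* inner))
        (push (lparallel:kernel-name) names)
        (lparallel:end-kernel :wait t))
      ;; The outer binding is visible again once the inner LET exits.
      (push (lparallel:kernel-name) names)
      (lparallel:end-kernel :wait t))
    (nreverse names)))

(kernel-names-demo) ; => ("inner-kernel" "outer-kernel")
```

Because *kernel* is a special variable, the rebinding is only visible within the dynamic extent of the let, so code elsewhere keeps seeing whatever kernel it was already using.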

Let’s move on to some more examples of different aspects of the lparallel library.

For these demos, we will be using the following initial setup from a coding perspective:

(require 'lparallel)
(require 'bt-semaphore)

(defpackage :lparallel-user
  (:use :cl :lparallel :lparallel.queue :bt-semaphore))

(in-package :lparallel-user)

;;; initialise the kernel
(defun init ()
  (setf *kernel* (make-kernel 8 :name "channel-queue-kernel")))

(init)

So we will be using a kernel with 8 worker threads (one for each CPU core on the machine).

And once we’re done with all the examples, the following code will be run to close the kernel and free all used system resources:

;;; shut the kernel down
(defun shutdown ()
  (end-kernel :wait t))

(shutdown)

Using channels and queues

First some definitions are in order.

A task is a job that is submitted to the kernel. It is simply a function object along with its arguments.

A channel in lparallel is similar to the same concept in Go. A channel is simply a means of communication with a worker thread. In our case, it is one particular way of submitting tasks to the kernel.

A channel is created in lparallel using lparallel:make-channel. A task is submitted using lparallel:submit-task, and the results received via lparallel:receive-result.

For instance, we can calculate the square of a number as:

(defun calculate-square (n)
  (let* ((channel (lparallel:make-channel))
         (res nil))
    (lparallel:submit-task channel #'(lambda (x)
                                       (* x x))
                           n)
    (setf res (lparallel:receive-result channel))
    (format t "Square of ~d = ~d~%" n res)))

And the output:

LPARALLEL-USER> (calculate-square 100)
Square of 100 = 10000
NIL

Now let’s try submitting multiple tasks to the same channel. In this simple example, we are simply creating three tasks that square, cube, and raise to the fourth power the supplied input respectively.

Note that in case of multiple tasks, the output will be in non-deterministic order:

(defun test-basic-channel-multiple-tasks ()
  (let ((channel (make-channel))
        (res '()))
    (submit-task channel #'(lambda (x)
                             (* x x))
                 10)
    (submit-task channel #'(lambda (y)
                             (* y y y))
                 10)
    (submit-task channel #'(lambda (z)
                             (* z z z z))
                 10)
    (dotimes (i 3 res)
      (push (receive-result channel) res))))

And the output:

LPARALLEL-USER> (dotimes (i 3)
                        	  (print (test-basic-channel-multiple-tasks)))

(100 1000 10000) 
(100 1000 10000) 
(10000 1000 100) 
NIL

lparallel also provides support for creating a blocking queue in order to enable message passing between worker threads. A queue is created using lparallel.queue:make-queue.

Some useful functions for using queues are:

  • lparallel.queue:make-queue: create a FIFO blocking queue
  • lparallel.queue:push-queue: insert an element into the queue
  • lparallel.queue:pop-queue: pop an item from the queue
  • lparallel.queue:peek-queue: inspect value without popping it
  • lparallel.queue:queue-count: the number of entries in the queue
  • lparallel.queue:queue-full-p: check if the queue is full
  • lparallel.queue:queue-empty-p: check if the queue is empty
  • lparallel.queue:with-locked-queue: lock the queue during access

A basic demo showing basic queue properties:

    (defun test-queue-properties ()
      (let ((queue (make-queue :fixed-capacity 5)))
        (loop
           when (queue-full-p queue)
           do (return)
           do (push-queue (random 100) queue))
         (print (queue-full-p queue))
        (loop
           when (queue-empty-p queue)
           do (return)
           do (print (pop-queue queue)))
        (print (queue-empty-p queue)))
      nil)
    

    Which produces:

    LPARALLEL-USER> (test-queue-properties)
    
    T 
    17 
    51 
    55 
    42 
    82 
    T 
    NIL
    

    Note: lparallel.queue:make-queue is a generic interface which is actually backed by different types of queues. For instance, in the previous example, the actual type of the queue is lparallel.vector-queue since we specified it to be of fixed size using the :fixed-capacity keyword argument.

    The documentation doesn’t actually specify what keyword arguments we can pass to lparallel.queue:make-queue, so let’s find that out in a different way:

    LPARALLEL-USER> (describe 'lparallel.queue:make-queue)
    LPARALLEL.QUEUE:MAKE-QUEUE
      [symbol]
    
    MAKE-QUEUE names a compiled function:
      Lambda-list: (&REST ARGS)
      Derived type: FUNCTION
      Documentation:
        Create a queue.
        
        The queue contents may be initialized with the keyword argument
        `initial-contents'.
        
        By default there is no limit on the queue capacity. Passing a
        `fixed-capacity' keyword argument limits the capacity to the value
        passed. `push-queue' will block for a full fixed-capacity queue.
      Source file: /Users/z0ltan/quicklisp/dists/quicklisp/software/lparallel-20160825-git/src/queue.lisp
    
    MAKE-QUEUE has a compiler-macro:
      Source file: /Users/z0ltan/quicklisp/dists/quicklisp/software/lparallel-20160825-git/src/queue.lisp
    ; No value
    

    So, as we can see, it supports the following keyword arguments: :fixed-capacity and :initial-contents.

    Now, if we do specify :fixed-capacity, then the actual type of the queue will be lparallel.vector-queue, and if we skip that keyword argument, the queue will be of type lparallel.cons-queue (which is a queue of unlimited size), as can be seen from the output of the following snippet:

    (defun check-queue-types ()
      (let ((queue-one (make-queue :fixed-capacity 5))
            (queue-two (make-queue)))
        (format t "queue-one is of type: ~a~%" (type-of queue-one))
        (format t "queue-two is of type: ~a~%" (type-of queue-two))))
    
    
    LPARALLEL-USER> (check-queue-types)
    queue-one is of type: VECTOR-QUEUE
    queue-two is of type: CONS-QUEUE
    NIL
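
The :initial-contents argument seen in the docstring can be tried out in the same way. The following is a minimal sketch, assuming Quicklisp is available; the function name queue-initial-contents-demo is made up for this example, and the expected result in the usage note assumes the queue hands items back in the order of the initial list.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

(defun queue-initial-contents-demo ()
  "Hypothetical demo: build a queue from a list, then inspect and drain it."
  (let ((queue (lparallel.queue:make-queue :initial-contents '(1 2 3))))
    (list
     ;; Number of entries before anything is popped.
     (lparallel.queue:queue-count queue)
     ;; Head of the queue; peek-queue leaves the item in place.
     (lparallel.queue:peek-queue queue)
     ;; Drain the queue in FIFO order.
     (loop until (lparallel.queue:queue-empty-p queue)
           collect (lparallel.queue:pop-queue queue)))))
```

Calling (queue-initial-contents-demo) should return (3 1 (1 2 3)): three entries, the head 1 still in place after the peek, and the items popped in insertion order.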
    

    Of course, you can always create instances of the specific queue types yourself, but it is always better, when you can, to stick to the generic interface and let the library create the proper type of queue for you.

    Now, let’s just see the queue in action!

    (defun test-basic-queue ()
      (let ((queue (make-queue))
            (channel (make-channel))
            (res '()))
        (submit-task channel #'(lambda ()
                         (loop for entry = (pop-queue queue)
                            when (queue-empty-p queue)
                            do (return)
                            do (push (* entry entry) res))))
        (dotimes (i 100)
          (push-queue i queue))
        (receive-result channel)
        (format t "~{~d ~}~%" res)))
    

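    Here we submit a single task that repeatedly pops values from the queue and pushes their squares onto the res list, stopping as soon as the queue is empty. Note that the emptiness check runs after each pop but before the push, so the last value popped (99) is discarded, which is why the output below starts at 9604 (the square of 98). It also means that the task could exit early if it ever drained the queue faster than the main thread filled it.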

    And the output:

    LPARALLEL-USER> (test-basic-queue)
    9604 9409 9216 9025 8836 8649 8464 8281 8100 7921 7744 7569 7396 7225 7056 6889 6724 6561 6400 6241 6084 5929 5776 5625 5476 5329 5184 5041 4900 4761 4624 4489 4356 4225 4096 3969 3844 3721 3600 3481 3364 3249 3136 3025 2916 2809 2704 2601 2500 2401 2304 2209 2116 2025 1936 1849 1764 1681 1600 1521 1444 1369 1296 1225 1156 1089 1024 961 900 841 784 729 676 625 576 529 484 441 400 361 324 289 256 225 196 169 144 121 100 81 64 49 36 25 16 9 4 1 0 
    NIL
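
One common way to make such a consumer independent of timing is to end the stream with an explicit marker instead of testing for emptiness. The sketch below is a hypothetical variant of test-basic-queue, assuming Quicklisp is available; the :done marker, the function name and the fallback kernel of 2 workers are choices made only for this illustration.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

;; Reuse the current kernel if there is one, otherwise create a small one.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 2 :name "sentinel-kernel")))

(defun test-basic-queue-with-sentinel (&optional (count 100))
  "Hypothetical variant of test-basic-queue that stops on a :done marker."
  (let ((queue (lparallel.queue:make-queue))
        (channel (lparallel:make-channel)))
    ;; The consumer blocks in pop-queue until an item arrives and stops
    ;; only when it sees the marker, so no value is dropped.
    (lparallel:submit-task
     channel
     (lambda ()
       (loop for entry = (lparallel.queue:pop-queue queue)
             until (eq entry :done)
             collect (* entry entry))))
    ;; Producer: push the numbers, then the marker that ends the stream.
    (dotimes (i count)
      (lparallel.queue:push-queue i queue))
    (lparallel.queue:push-queue :done queue)
    ;; The task's return value is the list of squares in queue order.
    (lparallel:receive-result channel)))

(test-basic-queue-with-sentinel 4) ; => (0 1 4 9)
```

Returning the collected list from the task, rather than mutating a variable shared with the main thread, also keeps all the state inside the worker.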
    

    Killing tasks

    A small note mentioning the lparallel:kill-tasks function would be apropos at this juncture. This function is useful in those cases when tasks are unresponsive. The lparallel documentation clearly states that this must only be used as a last resort.

    All tasks which are created are by default assigned a category of :default. The special variable *task-category* holds this value, and can be dynamically bound to different values (as we shall see).

    ;;; kill default tasks
    (defun test-kill-all-tasks ()
      (let ((channel (make-channel))
            (stream *query-io*))
        (dotimes (i 10)
          (submit-task channel #'(lambda (x)
                                   (sleep (random 10))
                                   (format stream "~d~%" (* x x))) (random 10)))
        (sleep (random 2))
        (kill-tasks :default)))
    

    Sample run:

    LPARALLEL-USER> (test-kill-all-tasks)
    16
    1
    8
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    

    Since we had created 10 tasks, all the 8 kernel worker threads were presumably busy with a task each. When we killed tasks of category :default, all these threads were killed as well and had to be regenerated (which is an expensive operation). This is part of the reason why lparallel:kill-tasks must be avoided.

    Now, in the example above, all running tasks were killed since all of them belonged to the :default category. Suppose we wish to kill only specific tasks, we can do that by binding *task-category* when we create those tasks, and then specifying the category when we invoke lparallel:kill-tasks.

    For example, suppose we have two categories of tasks – tasks which square their arguments, and tasks which cube theirs. Let’s assign them categories 'squaring-tasks and 'cubing-tasks respectively. Let’s then kill tasks of a randomly chosen category, 'squaring-tasks or 'cubing-tasks.

    Here is the code:

    ;;; kill tasks of a randomly chosen category
    (defun test-kill-random-tasks ()
      (let ((channel (make-channel))
            (stream *query-io*))
        (let ((*task-category* 'squaring-tasks))
          (dotimes (i 5)
            (submit-task channel #'(lambda (x)
                                     (sleep (random 5))
                                     (format stream "~%[Squaring] ~d = ~d" x (* x x))) i)))
        (let ((*task-category* 'cubing-tasks))
          (dotimes (i 5)
            (submit-task channel #'(lambda (x)
                                     (sleep (random 5))
                                     (format stream "~%[Cubing] ~d = ~d" x (* x x x))) i)))
        (sleep 1)
        (if (evenp (random 10))
            (progn
              (print "Killing squaring tasks")
              (kill-tasks 'squaring-tasks))
            (progn
              (print "Killing cubing tasks")
              (kill-tasks 'cubing-tasks)))))
    

    And here is a sample run:

    LPARALLEL-USER> (test-kill-random-tasks)
    
    [Cubing] 2 = 8
    [Squaring] 4 = 16
    [Cubing] 4
     = [Cubing] 643 = 27
    "Killing squaring tasks" 
    4
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    
    [Cubing] 1 = 1
    [Cubing] 0 = 0
    
    LPARALLEL-USER> (test-kill-random-tasks)
    
    [Squaring] 1 = 1
    [Squaring] 3 = 9
    "Killing cubing tasks" 
    5
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    
    [Squaring] 2 = 4
    WARNING: lparallel: Replacing lost or dead worker.
    WARNING: lparallel: Replacing lost or dead worker.
    
    [Squaring] 0 = 0
    [Squaring] 4 = 16
    

    Using promises and futures

    Promises and Futures provide support for Asynchronous Programming.

    In lparallel-speak, a lparallel:promise is a placeholder for a result which is fulfilled by providing it with a value. The promise object itself is created using lparallel:promise, and the promise is given a value using the lparallel:fulfill macro.

    To check whether the promise has been fulfilled yet or not, we can use the lparallel:fulfilledp predicate function.
    Finally, the lparallel:force function is used to extract the value out of the promise. Note that this function blocks until the operation is complete.

    Let’s solidify these concepts with a very simple example first:

    (defun test-promise ()
      (let ((p (promise)))
        (loop
           do (if (evenp (read))
                  (progn
                    (fulfill p 'even-received!)
                    (return))))
        (force p)))
    

    Which generates the output:

    LPARALLEL-USER> (test-promise)
    5
    1
    3
    10
    EVEN-RECEIVED!

    Explanation: This simple example simply keeps looping forever until an even number has been entered. The promise is fulfilled inside the loop using lparallel:fulfill, and the value is then returned from the function by forcing it with lparallel:force.

    
    Now, let’s take a bigger example. Assuming that we don’t want to have to wait for the promise to be fulfilled, and instead want the current thread to do some useful work, we can explicitly delegate the promise fulfillment to another thread, as seen in the next example.

    Suppose we have a function that squares its argument and, for the sake of argument, consumes a lot of time doing so. From our client code, we want to invoke it and wait until the squared value is available.
    
    
    (defun promise-with-threads ()
      (let ((p (promise))
            (stream *query-io*)
            (n (progn
                 (princ "Enter a number: ")
                 (read))))
        (format t "In main function...~%")
        (bt:make-thread
         #'(lambda ()
             (sleep (random 10))
             (format stream "Inside thread... fulfilling promise~%")
             (fulfill p (* n n))))
        (bt:make-thread
         #'(lambda ()
             (loop
                when (fulfilledp p)
                do (return)
                do (progn
                     (format stream "~d~%" (random 100))
                     (sleep (* 0.01 (random 100)))))))
        (format t "Inside main function, received value: ~d~%" (force p))))
    

    And the output:

    LPARALLEL-USER> (promise-with-threads)
    Enter a number: 19
    In main function...
    44
    59
    90
    34
    30
    76
    Inside thread... fulfilling promise
    Inside main function, received value: 361
    NIL
    

    Explanation: There is nothing much in this example. We create a promise object p, and we spawn off a thread that sleeps for some random time and then fulfills the promise by giving it a value.

    Meanwhile, in the main thread, we spawn off another thread that keeps checking if the promise has been fulfilled or not. If not, it prints some random number and continues checking. Once the promise has been fulfilled, we can extract the value using lparallel:force in the main thread as shown.

    This shows that promises can be fulfilled by different threads while the code that created the promise need not wait for the promise to be fulfilled. This is especially important since, as mentioned before, lparallel:force is a blocking call. We want to delay forcing the promise until the value is actually available.

    Another point to note when using promises is that once a promise has been fulfilled, invoking force on the same object will always return the same value. That is to say, a promise can be successfully fulfilled only once.

    For instance:

    (defun multiple-fulfilling ()
      (let ((p (promise)))
        (dotimes (i 10)
          (fulfill p (random 100))
          (format t "~d~%" (force p)))))
    

    Which produces:

    LPARALLEL-USER> (multiple-fulfilling)
    15
    15
    15
    15
    15
    15
    15
    15
    15
    15
    NIL
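
The return value of lparallel:fulfill also tells us whether a given attempt succeeded. Here is a minimal sketch, assuming Quicklisp is available; the function name fulfill-return-values is made up for this example.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

(defun fulfill-return-values ()
  "Hypothetical demo: collect what fulfill returns on a first and a second attempt."
  (let* ((p (lparallel:promise))
         ;; The first attempt stores the value and returns true.
         (first-attempt (lparallel:fulfill p 1))
         ;; The second attempt is ignored and returns false.
         (second-attempt (lparallel:fulfill p 2)))
    (list first-attempt second-attempt (lparallel:force p))))
```

In the returned list, the first element should be true, the second NIL, and the third 1, since only the first value was ever stored in the promise.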
    

    So how does a future differ from a promise?

    A lparallel:future is simply a promise that is run in parallel, and as such, it does not block the main thread like a default use of lparallel:promise would. It is executed in its own thread (by the lparallel library, of course).

    Here is a simple example of a future:

    (defun test-future ()
      (let ((f (future
                 (sleep (random 5))
                 (print "Hello from future!"))))
        (loop
           when (fulfilledp f)
           do (return)
           do (sleep (* 0.01 (random 100)))
             (format t "~d~%" (random 100)))
        (format t "~d~%" (force f))))
    

    And the output:

    LPARALLEL-USER> (test-future)
    5
    19
    91
    11
    Hello from future!
    NIL
    

    Explanation: This is very similar to the promise-with-threads example. Observe two differences, however. First of all, the lparallel:future macro has a body as well. This allows the future to fulfill itself! What this means is that as soon as the body of the future is done executing, lparallel:fulfilledp will always return true for the future object.

    Secondly, the future itself is spawned off on a separate thread by the library, so it does not interfere much with the execution of the current thread. Contrast this with the promise-with-threads example, which needed an explicit thread for the fulfilling code in order to avoid blocking the current thread.

    The most interesting bit is that (even in terms of the actual theory propounded by Dan Friedman and others), a Future is conceptually something that fulfills a Promise. That is to say, a promise is a contract that some value will be generated sometime in the future, and a future is precisely that “something” that does that job.

    What this means is that even when using the lparallel library, the basic use of a future would be to fulfill a promise. This means that hacks like promise-with-threads need not be made by the user.

    Let’s take a small example to demonstrate this point (a pretty contrived example, I must admit!).

    Here’s the scenario: we want to read in a number and calculate its square. So we offload this work to another function, and continue with our own work. When the result is ready, we want it to be printed on the console without any intervention from us.

    Here’s how the code looks:

    ;;; Callback example using promises and futures
    (defun callback-promise-future-demo ()
      (let* ((p (promise))
             (stream *query-io*)
             (n (progn
                  (princ "Enter a number: ")
                  (read)))
             (f (future
                  (sleep (random 10))
                  (fulfill p (* n n))
                  (force (future
                           (format stream "Square of ~d = ~d~%" n (force p)))))))
        (loop
           when (fulfilledp f)
           do (return)
           do (sleep (* 0.01 (random 100))))))
    

    And the output:

    LPARALLEL-USER> (callback-promise-future-demo)
    Enter a number: 19
    Square of 19 = 361
    NIL
    

    Explanation: All right, so first off, we create a promise to hold the squared value when it is generated. This is the p object. The input value is stored in the local variable n.

    Then we create a future object f. This future simply squares the input value and fulfills the promise with this value. Finally, since we want to print the output in its own time, we force an anonymous future which simply prints the output string as shown.

    Note that this is very similar to the situation in an environment like Node, where we pass callback functions to other functions with the understanding that the callback will be called when the invoked function is done with its work.

    Finally, note that the following snippet is still fine, even though it uses the blocking lparallel:force call, because it runs on a separate thread:

    (force (future
             (format stream "Square of ~d = ~d~%" n (force p))))

    To summarise, the general idiom of usage is: define objects which will hold the results of asynchronous computations in promises, and use futures to fulfill those promises.
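
Stripped of the I/O and the sleeping, the idiom can be sketched in its smallest form as follows. This assumes Quicklisp is available; the function name square-via-promise-and-future and the fallback kernel of 2 workers are choices made only for this illustration.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

;; Futures need a kernel; reuse the current one or create a small one.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 2 :name "idiom-kernel")))

(defun square-via-promise-and-future (n)
  "Hypothetical helper: a promise holds the result, a future fulfills it."
  (let ((p (lparallel:promise)))
    ;; The future's body runs on a kernel worker and fulfills the promise.
    (lparallel:future
      (lparallel:fulfill p (* n n)))
    ;; force blocks the calling thread until the promise has a value.
    (lparallel:force p)))

(square-via-promise-and-future 19) ; => 361
```

Here the caller forces the promise immediately, so nothing is gained over a plain function call; in real code the force would be delayed until the value is actually needed.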

    Using cognates

    Cognates are arguably the raison d’etre of the lparallel library. These constructs are what truly provide parallelism in lparallel. Note, however, that most (if not all) of these constructs are built on top of futures and promises.

    To put it in a nutshell, cognates are simply functions that are intended to be the parallel equivalents of their Common Lisp counterparts. However, there are a few extra lparallel cognates that have no Common Lisp equivalents.

    At this juncture, it is important to know that cognates come in two basic flavours:

    1. Constructs for fine-grained parallelism: defpun, plet, plet-if, etc.
    2. Explicit functions and macros for performing parallel operations - pmap, preduce, psort, pdotimes, etc.

    In the first case we don’t have much explicit control over the operations themselves. We mostly rely on the fact that the library itself will optimise and parallelise the forms to whatever extent it can. In this post, we will focus on the second category of cognates.

    Take, for instance, the cognate function lparallel:pmap: it behaves exactly like its Common Lisp equivalent, map, but it runs in parallel. Let’s demonstrate that through an example.

    Suppose we had a list of random strings of length varying from 3 to 10, and we wished to collect their lengths in a vector.

    Let’s first set up the helper functions that will generate the random strings:

    (defvar *chars*
      (remove-duplicates
       (sort
        (loop for c across "The quick brown fox jumps over the lazy dog"
           when (alpha-char-p c)
           collect (char-downcase c))
        #'char<)))   
    
    (defun get-random-strings (&optional (count 100000))
      "generate random strings between lengths 3 and 10"
      (loop repeat count
         collect
           (concatenate 'string  (loop repeat (+ 3 (random 8))
                               collect (nth (random 26) *chars*)))))
    

    And here’s how the Common Lisp map version of the solution might look:

    ;;; map demo
    (defun test-map ()
      (map 'vector #'length (get-random-strings 10)))
    

    And let’s have a test run:

    LPARALLEL-USER> (test-map)
    #(7 5 10 8 7 5 3 4 4 10)
    

    And here’s the lparallel:pmap equivalent:

    ;;;pmap demo
    (defun test-pmap ()
      (pmap 'vector #'length (get-random-strings 10)))
    

    which produces:

    LPARALLEL-USER> (test-pmap)
    #(8 7 6 7 6 4 5 6 5 7)
    LPARALLEL-USER> 
    

    As you can see from the definitions of test-map and test-pmap, the syntax of the map and lparallel:pmap functions is exactly the same (well, almost: lparallel:pmap has a few more optional arguments).
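
Since the two sample runs above use different random strings, they do not show that both functions compute the same thing. A quick check on a fixed input could look like the sketch below, which assumes Quicklisp is available; the function name map-and-pmap-agree-p and the fallback kernel of 2 workers are made up for this example.

```lisp
;; Assumes Quicklisp is installed; evaluate form by form at the REPL or LOAD the file.
(ql:quickload :lparallel)

;; pmap needs a kernel; reuse the current one or create a small one.
(unless lparallel:*kernel*
  (setf lparallel:*kernel* (lparallel:make-kernel 2 :name "pmap-check-kernel")))

(defun map-and-pmap-agree-p (strings)
  "Hypothetical check: true if map and pmap give the same lengths for STRINGS."
  (equalp (map 'vector #'length strings)
          (lparallel:pmap 'vector #'length strings)))

(map-and-pmap-agree-p '("abc" "hello" "lisp")) ; => T
```

The comparison uses equalp because the two calls return distinct vector objects with the same contents.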

    Here are some useful cognate functions and macros (all of them are functions unless explicitly marked as macros). Note that there are quite a few cognates, and I have chosen a few to try and represent every category through an example:

    • lparallel:pmap:
      Parallel version of map.

      Note that all the mapping functions (lparallel:pmap, lparallel:pmapc, lparallel:pmapcar, etc.) take two special keyword arguments: :size, specifying the number of elements of the input sequence(s) to process, and :parts, which specifies the number of parallel parts to divide the sequence(s) into.

      ;;; pmap - function
      (defun test-pmap ()
        (let ((numbers (loop for i below 10
                          collect i)))
          (pmap 'vector #'(lambda (x)
                            (* x x))
                :parts (length numbers)
                numbers)))
      

      Sample run:

      LPARALLEL-USER> (test-pmap)
      
      #(0 1 4 9 16 25 36 49 64 81)
      
    • lparallel:por:
      Parallel version of or. It returns a non-nil value from amongst its arguments, if there is one. However, because the arguments may be evaluated in parallel, which non-nil value is returned can vary from run to run.

      ;;; por - macro
      (defun test-por ()
        (let ((a 100)
              (b 200)
              (c nil)
              (d 300))
          (por a b c d)))
      

      Sample run:

      LPARALLEL-USER> (dotimes (i 10)
                        (print (test-por)))
      
      300 
      300 
      100 
      100 
      100 
      300 
      100 
      100 
      100 
      100 
      NIL
      

      In the case of the normal or operator, it would always have returned the first non-nil element viz. 100.

    • lparallel:pdotimes:
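      Parallel version of dotimes. Note that this macro also takes an optional parts argument, which is supplied positionally after the optional result form, as in (pdotimes (var count result parts) ...), rather than as a keyword.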

      ;;; pdotimes - macro
      (defun test-pdotimes ()
        (pdotimes (i 5)
          (declare (ignore i))
          (print (random 100))))
      

      Sample run:

      LPARALLEL-USER> (test-pdotimes)
      
      39 
      29 
      81 
      42 
      56 
      NIL
      
    • lparallel:pfuncall:
      Parallel version of funcall. The argument forms may be evaluated in parallel before the function is applied to their values.

      ;;; pfuncall - macro
      (defun test-pfuncall ()
        (pfuncall #'* 1 2 3 4 5))
      

      Sample run:

      LPARALLEL-USER> (test-pfuncall)
      
      120
      
    • lparallel:preduce:
      Parallel version of reduce.

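      This very important function also takes two optional keyword arguments: :parts (same meaning as explained above) and :recurse. preduce divides the sequence into parts, reduces each part in parallel using reduce, and then combines the partial results. If :recurse is non-nil, the partial results are themselves combined using lparallel:preduce; otherwise it defaults to using reduce. Because the parts are reduced independently of one another, the reducing function should be associative.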

      ;;; preduce - function
      (defun test-preduce ()
        (let ((numbers (loop for i from 1 to 100
                          collect i)))
          (preduce #'+ 
                   numbers
                   :parts (length numbers)
                   :recurse t)))
      

      Sample run:

      LPARALLEL-USER> (test-preduce)
      
      5050
      
    • lparallel:premove-if-not:
      Parallel version of remove-if-not. This is essentially equivalent to “filter” in Functional Programming parlance.

      ;;; premove-if-not - function
      (defun test-premove-if-not ()
        (let ((numbers (loop for i from 1 to 100
                          collect i)))
          (premove-if-not #'evenp numbers)))
      

      Sample run:

      LPARALLEL-USER> (test-premove-if-not)
      
      (2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54
       56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 100)
      
    • lparallel:pevery:
      Parallel version of every.

      ;;; pevery - function
      (defun test-pevery ()
        (let ((numbers (loop for i from 1 to 100
                          collect i)))
          (list (pevery #'evenp numbers)
                (pevery #'integerp numbers))))
      

      Sample run:

      LPARALLEL-USER> (test-pevery)
      
      (NIL T)
      

      In this example, we are performing two checks - firstly, whether all the numbers in the range [1,100] are even, and secondly, whether all the numbers in the same range are integers.

    • lparallel:pcount:
      Parallel version of count.

      ;;; pcount - function
      (defun test-pcount ()
        (let ((chars "The quick brown fox jumps over the lazy dog"))
          (pcount #\e chars)))
      

      Sample run:

      LPARALLEL-USER> (test-pcount)
      
      3
      
    • lparallel:psort:
      Parallel version of sort.

      ;;; psort - function
      (defstruct person
        name
        age)
      
      (defun test-psort ()
        (let* ((names (list "Rich" "Peter" "Sybil" "Basil" "Candy" "Slava" "Olga"))
               (people (loop for name in names
                          collect (make-person :name name :age (+ (random 20) 20)))))
          (print "Before sorting...")
          (print people)
          (fresh-line)
          (print "After sorting...")
          ;; Sort by age. Like sort, psort accepts a :key argument; it has no
          ;; :test argument.
          (psort people #'< :key #'person-age)))
      

      Sample run:

      LPARALLEL-USER> (test-psort)
      
      "Before sorting..." 
      (#S(PERSON :NAME "Rich" :AGE 38) #S(PERSON :NAME "Peter" :AGE 24)
       #S(PERSON :NAME "Sybil" :AGE 20) #S(PERSON :NAME "Basil" :AGE 22)
       #S(PERSON :NAME "Candy" :AGE 23) #S(PERSON :NAME "Slava" :AGE 37)
       #S(PERSON :NAME "Olga" :AGE 33)) 
      
      "After sorting..." 
      (#S(PERSON :NAME "Sybil" :AGE 20) #S(PERSON :NAME "Basil" :AGE 22)
       #S(PERSON :NAME "Candy" :AGE 23) #S(PERSON :NAME "Peter" :AGE 24)
       #S(PERSON :NAME "Olga" :AGE 33) #S(PERSON :NAME "Slava" :AGE 37)
       #S(PERSON :NAME "Rich" :AGE 38))
      

      In this example, we first define a structure of type person for storing information about people. Then we create a list of 7 people with randomly generated ages (between 20 and 39). Finally, we sort them by age in non-decreasing order.
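
    The pmap example earlier in this list uses :parts but not the :size keyword described alongside it. Here is a minimal sketch that uses both. It assumes that Quicklisp is available and creates a kernel only if none exists yet; the worker count of 4, the value 2 for :parts, and the name squares-of-first are arbitrary choices made for this illustration:

    ```lisp
    ;;; Assumes Quicklisp is set up as in the installation section.
    (ql:quickload :lparallel)

    ;; Create a kernel only if none exists; 4 workers is an assumption.
    (unless lparallel:*kernel*
      (setf lparallel:*kernel* (lparallel:make-kernel 4)))

    ;; squares-of-first is a hypothetical helper, not part of lparallel.
    (defun squares-of-first (n numbers)
      "Square only the first N elements of NUMBERS, using two parallel parts."
      (lparallel:pmap 'vector
                      (lambda (x) (* x x))
                      :size n     ; process only the first N elements
                      :parts 2    ; split the work into two parts
                      numbers))

    (squares-of-first 5 (loop for i below 10 collect i))
    ```

    Under these assumptions, the call should yield a five-element vector holding the squares of 0 through 4, even though the input list has ten elements.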

    References

    There are, of course, a lot more functions, objects, and idiomatic ways of performing parallel computations using the lparallel library. This post barely scratches the surface on those. However, the general flow of operation is amply demonstrated here, and for further reading, you may find the following resources useful:

    In the final part of this series, we will discuss an extremely important topic common to all that was covered in this post - error handling.
