Coverage report: /home/ellis/comp/core/std/thread.lisp

KindCoveredAll%
expression1741490 11.7
branch252 3.8
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; threads.lisp --- Multi-thread utilities
2
 
3
 ;; Threading Macros
4
 
5
 ;;; Commentary:
6
 
7
 ;; mostly yoinked from sb-thread and lparallel
8
 
9
 #|
10
 ;; unix-getrusage  
11
 ;; 0,-1,-2
12
 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
13
 ;; (setf sb-unix::*on-dangerous-wait* :error)
14
 
15
 ;; TODO 2024-10-03: with-cas-lock?
16
 |#
17
 ;;; Code:
18
 (in-package :std/thread)
19
 
20
 ;; (sb-thread:thread-os-tid sb-thread:*current-thread*)
21
 ;; sb-thread:interrupt-thread
22
 ;;; Kernel Classes
23
 (defclass kernel-class ()
24
   ()
25
   (:metaclass funcallable-standard-class)
26
   (:documentation "Standard kernel class."))
27
 
28
 (defclass kernel-object (funcallable-standard-object)
29
   ()
30
   (:metaclass funcallable-standard-class)
31
   (:documentation "Standard kernel object."))
32
 
33
 (definline make-kernel (fn)
34
   "Return a new KERNEL-OBJECT and set the instance function to FN."
35
   (declare (function fn))
36
   (let ((fin (make-instance 'kernel-object)))
37
     (set-funcallable-instance-function fin (compile nil fn))
38
     fin))
39
 
40
 ;; (defmethod :before initialize-instance ((self kernel-object) &key)
41
 ;;   (when *kernel* (set-funcallable-instance-function self *kernel*)))
42
 
43
 ;; make-instance (set-funcallable-instance-function kernel)
44
 
45
 ;;; Types
46
 (deftype kernel () 
47
   "A type which specifies kernels. A kernel may be a list which is interpreted as
48
 a lambda expression, a symbol which names a function, or a compiled-function."
49
   '(or cons symbol compiled-function kernel-object))
50
 
51
 (deftype kernel-function ()
52
   "A function of at least one argument with no return value."
53
   '(function (t &rest args) (values)))
54
 
55
 (deftype worker-kernel-function (&optional (kind 'worker))
56
   "A function which is suitable as a kernel for KIND workers."
57
   `(function (,kind t t) (values)))
58
 (deftype channel-kernel-function (&optional (kind 'channel))
59
   "A function which is suitable as a kernel for KIND channels."
60
   `(function (,kind) (values)))
61
 (deftype pool-kernel-function (&optional (kind 'thread-pool))
62
   "A function which is suitable as a kernel for KIND thread-pools."
63
   `(function (,kind scheduler t t &rest args) (values)))
64
 
65
 ;;; Vars
66
 (defvar *default-special-bindings* nil
67
   "This variable holds an alist associating special variable symbols
68
   to forms to evaluate. Special variables named in this list will
69
   be locally bound in the new thread before it begins executing user code.
70
 
71
   This variable may be rebound around calls to MAKE-THREAD to
72
   add/alter default bindings. The effect of mutating this list is
73
   undefined, but earlier forms take precedence over later forms for
74
   the same symbol, so defaults may be overridden by consing to the
75
   head of the list.")
76
 
77
 (defvar *worker-class* 'worker
78
   "The default WORKER class used to initialize THREAD-POOLs.")
79
 (defvar *worker* nil
80
   "The current WORKER or nil.")
81
 (defvar *work-priority* :default
82
   "The default priority assigned to new work.")
83
 (defvar *scheduler-class* 'biased-scheduler
84
   "The default class of the scheduler used in THREAD-POOLs.")
85
 (defvar *thread-pool* nil
86
   "The current THREAD-POOL or nil.")
87
 ;; on core-i7 3.4ghz, a single spin takes ~ 2.5 microseconds.
88
 (defvar *default-spin-count* 2000
89
   "Default value of the 'spin-count' argument to MAKE-THREAD-POOL.")
90
 
91
 (defvar *debug-threads-p* t
92
   "When non-nil the debugger is invoked when an error goes unhandled in a
93
 threaded context.")
94
 
95
 (defvar *lisp-exiting-p* nil
96
   "True if the Lisp process is exiting - used for skipping auto-replacement of
97
 killed workers during shutdown.")
98
 
99
 ;; TODO 2025-06-17: 
100
 (defun %pool (state)
101
   "An empty pool kernel."
102
   (declare (ignore state)))
103
 
104
 (defvar *pool-kernel* (make-kernel #'%pool)
105
   "A function which drives THREAD-POOLs.")
106
 
107
 ;; TODO 2025-06-17: 
108
 (defun %work (work)
109
   "An empty worker kernel."
110
   (let ((work (or work (pop-spin-queue (work *worker*)))))
111
     (typecase work
112
       (function (funcall work))
113
       (cons (apply (car work) (cdr work)))
114
       (t work))))
115
 
116
 (defvar *kernel* nil
117
   "The current thread's kernel, or nil.")
118
 
119
 (defvar *worker-kernel* (make-kernel #'%work)
120
   "A kernel which drives WORKERs.")
121
 
122
 ;;; Globals
123
 (sb-ext:defglobal *worker-threads* nil
124
     "list of worker threads.")
125
 (sb-ext:defglobal *super-threads* nil
126
     "List of threads with supervisor privileges.")
127
 (sb-ext:defglobal *oracle-table* (make-hash-table)
128
     "Hashtable containining (ID . ORACLE-SCOPE).")
129
 (sb-ext:defglobal *thread-pool-table* (make-hash-table)
130
     "Hashtable containing (NAME . THREAD-POOL).")
131
 
132
 ;;; Conditions
133
 (defvar *error-workers* nil
134
   "Track debugger popups in order to kill them.")
135
 
136
 (defvar *error-workers-lock* (make-mutex :name "error workers")
137
   "Lock for *ERROR-WORKERS*.")
138
 
139
 (defun invoke-transfer-error (error)
140
   "Equivalent to (invoke-restart 'transfer-error error)."
141
   (invoke-restart 'transfer-error error))
142
 
143
 (defun transfer-error-report (stream)
144
   (format stream "Transfer this error to a dependent thread, if one exists."))
145
 
146
 (defun condition-handler (condition)
147
   "Mimic the CL handling mechanism, calling handlers until one assumes
148
 control (or not)."
149
   (loop for ((condition-type . handler) . rest) on *handlers*
150
         do (when (typep condition condition-type)
151
              (let ((*handlers* rest))
152
                (handler-bind ((condition #'condition-handler))
153
                  (funcall handler condition)))))
154
   (when (typep condition 'error)
155
     (invoke-transfer-error condition)))
156
 
157
 (defconstant +work-tag+ 'my-work)
158
 
159
 (defvar *debugger-error* nil
160
   "Track the error inside the debugger for the `transfer-error' restart.")
161
 
162
 (defvar *handler-active-p* nil
163
   "Non-nil when handlers have been established via `call-with-work-handler'.")
164
 
165
 (defun unwrap-result (result)
166
   "In `receive-result', this is called on the stored work result. The
167
 user receives the return value of this function."
168
   (typecase result
169
     (wrapped-error
170
      ;; A `wrapped-error' signals an error upon being unwrapped.
171
      (error (wrapped-condition-value result)))
172
     (t
173
      ;; Most objects unwrap to themselves.
174
      result)))
175
 
176
 (defmacro work-handler-bind (clauses &body body)
177
   "Like `handler-bind' but handles conditions signaled inside work
178
 that was created in `body'."
179
   (let ((forms (loop for clause in clauses
180
                      for (name fn . more) = clause
181
                      do (unless (and name (symbolp name) fn (not more))
182
                           (error "Ill-formed binding in `work-handler-bind': ~a"
183
                                  clause))
184
                      collect `(cons ',name ,fn))))
185
     `(let ((*handlers* (list* ,@forms *handlers*)))
186
        ,@body)))
187
 
188
 (defun transfer-error-restart (&optional (err *debugger-error*))
189
   (when err
190
     (throw '#.+work-tag+ (wrap-error err))))
191
 
192
 (defun call-with-tracked-error (condition body-fn)
193
   (when *worker*
194
     (with-mutex (*error-workers-lock*)
195
       (push *worker* *error-workers*)))
196
   (unwind-protect
197
        (let ((*debugger-error* condition))
198
          (funcall body-fn))
199
     (when *worker*
200
       (with-mutex (*error-workers-lock*)
201
         (setf *error-workers*
202
               (delete *worker* *error-workers*))))))
203
 
204
 (defmacro with-tracked-error (condition &body body)
205
   `(call-with-tracked-error ,condition (lambda () ,@body)))
206
 
207
 (defun make-debugger-hook ()
208
   "Record `*debugger-error*' for the `transfer-error' restart."
209
   (if *debugger-hook*
210
       (let ((previous-hook *debugger-hook*))
211
         (lambda (condition self)
212
           (with-tracked-error condition
213
             (funcall previous-hook condition self))))
214
       (lambda (condition self)
215
         (declare (ignore self))
216
         (with-tracked-error condition
217
           (invoke-debugger condition)))))
218
 
219
 (defmacro with-work-context (&body body)
220
   "Eval BODY in a context where throw to +WORK-TAG+ will be caught."
221
   `(catch +work-tag+ ,@body))
222
 
223
 (defun %call-with-work-handler (fn)
224
   "Call FN with worker conditions handled."
225
   (declare (function fn))
226
   (let ((*handler-active-p* t)
227
         (*debugger-hook* (make-debugger-hook)))
228
     (handler-bind ((condition #'condition-handler))
229
       (restart-bind ((transfer-error #'transfer-error-restart
230
                                      :report-function #'transfer-error-report))
231
         (funcall fn)))))
232
 
233
 (defun call-with-work-handler (fn)
234
   "Call FN in a worker context with conditions handled."
235
   (declare (function fn))
236
   (with-work-context
237
     (if *handler-active-p*
238
         (funcall fn)
239
         (%call-with-work-handler fn))))
240
 
241
 (define-condition worker-killed-error (error) ()
242
   (:report
243
    "The worker was killed.")
244
   (:documentation
245
    "Error signaled when attempting to obtain a result from a killed worker."))
246
 
247
 (define-condition kernel-init-error (error) ()
248
   (:report
249
    "The kernel failed to initialize.")
250
   (:documentation
251
    "Error signaled when a kernel object fails to initialize."))
252
 
253
 (define-condition no-kernel-error (error) ()
254
   (:report "invalid *KERNEL*")
255
   (:documentation
256
    "Error signaled when a kernel object is invalid."))
257
 
258
 (define-condition no-thread-pool-error () ()
259
   (:report
260
    "invalid *THREAD-POOL*")
261
   (:documentation
262
    "Error signaled when a kernel object is invalid."))
263
 
264
 ;;; Utils
265
 (defmacro mod-inc (k n)
266
   `(the array-index (mod (the array-index (1+ (the array-index ,k)))
267
                          (the array-index ,n))))
268
 
269
 (defmacro mod-dec (k n)
270
   `(the array-index (mod (the fixnum (1- (the array-index ,k)))
271
                          (the array-index ,n))))
272
 
273
 (defmacro mod-incf (place n)
274
   `(the array-index (setf ,place (mod-inc ,place ,n))))
275
 
276
 (defmacro mod-decf (place n)
277
   `(the array-index (setf ,place (mod-dec ,place ,n))))
278
 
279
 (defun thread-support-p () 
280
   "Return Non-nil if threads are supported on this system. (:THREAD-SUPPORT feature)"
281
   (member :thread-support *features*))
282
 
283
 (eval-always
284
   (defun print-top-level (msg)
285
     "Print MSG to the top-level *STANDARD-OUTPUT*."
286
     (let ((*standard-output* *standard-output*))
287
       (sb-thread:make-thread
288
        (lambda ()
289
          (format *standard-output* "~A" msg)))
290
       nil)))
291
 
292
 (defun println-top-level (msg)
293
   "Print MSG to the top-level *STANDARD-OUTPUT* followed by a newline."
294
   (let ((*standard-output* *standard-output*))
295
     (sb-thread:make-thread
296
      (lambda ()
297
        (format *standard-output* "~A~%" msg)))
298
     nil))
299
 
300
 (defun find-thread-by-id (id)
301
   "Search for thread by ID which must be an u64. On success returns the thread itself or nil."
302
   (find id (sb-thread::list-all-threads) :test '= :key 'thread-os-tid))
303
 
304
 (defun find-thread (name)
305
   "Find a thread by name."
306
   (find name (sb-thread::list-all-threads) :test 'equal :key 'thread-name))
307
 
308
 (defun thread-key-list ()
309
   "Return AVLNODE-KEYs associated with threads in *ALL-THREADS*."
310
   (sb-thread::avltree-filter #'sb-thread::avlnode-key sb-thread::*all-threads*))
311
 
312
 (defun thread-id-list ()
313
   "Return the THREAD-OS-TID associated with thread in *ALL-THREADS*."
314
   (sb-thread::avltree-filter (lambda (th) (thread-os-tid (sb-thread::avlnode-data th))) sb-thread::*all-threads*))
315
 
316
 (defun thread-count ()
317
   "Return the current count of threads in *ALL-THREADS*."
318
   (sb-thread::avl-count sb-thread::*all-threads*))
319
 
320
 (defun make-threads (n thunk &key (name "thread"))
321
   "Make N number of threads which each eval THUNK."
322
   (declare (type fixnum n))
323
   (loop for i below n
324
         collect (make-thread thunk :name (format nil "~A-~D" name i))))
325
 
326
 (defun make-ephemeral-thread (name)
327
   "Make a new 'ephemeral' thread called NAME."
328
   (sb-thread::%make-thread name t (make-semaphore :name name)))
329
 
330
 (defgeneric designate-oracle (host guest)
331
   (:documentation "Designate an oracle GUEST for HOST."))
332
 (defgeneric assign-supervisor (worker supervisor)
333
   (:documentation "Assign a SUPERVISOR for WORKER."))
334
 
335
 ;;; Threads
336
 (defmacro with-thread ((&key bindings name) &body body)
337
   "Eval BODY in a new thread with optional BINDINGS and NAME."
338
   `(with-default-special-bindings ,bindings
339
      (make-thread (lambda () ,@body)
340
                   ,@(when name `(:name ,name)))))
341
 
342
 (declaim (inline parse-lambda-list-names))
343
 (defun parse-lambda-list-names (ll)
344
   (multiple-value-bind (idx _ args) (sb-int:parse-lambda-list ll)
345
     (declare (ignore idx _))
346
     (loop for a in args
347
           collect
348
              (etypecase a
349
                (atom a)
350
                (cons (car a))))))
351
 
352
 (defmacro with-threads ((i n &key return bindings args name) &body body)
353
   "Eval BODY N times in a function with I bound to a new thread. Optional
354
 keywords modify the bindings in effect."
355
   `(with-default-special-bindings ,bindings
356
      (dotimes (,i ,n ,@(when return (list return)))
357
        (make-thread (lambda (,@args) ,@body)
358
                     ,@(when name `(:name (symbolicate ,name i)))))))
359
 
360
 (defun finish-threads (&rest threads)
361
   "Finish THREADS, attempting to join them, else calling TERMINATE-THREAD."
362
   (let ((threads (flatten threads)))
363
     (unwind-protect
364
          (mapc #'join-thread threads)
365
       (dolist (thread threads)
366
         (when (thread-alive-p thread)
367
           (terminate-thread thread))))))
368
 
369
 (defun timed-join-thread (thread timeout)
370
   "Join THREAD waiting at most TIMEOUT seconds."
371
   (declare (type thread thread) (type float timeout))
372
   (handler-case (sb-sys:with-deadline (:seconds timeout)
373
                   (join-thread thread :default :aborted))
374
     (sb-ext:timeout ()
375
       :timeout)))
376
 
377
 (defun hang ()
378
   "Attempt to join the current thread, causing it to hang. You should never call this."
379
   (join-thread *current-thread*))
380
 
381
 (defun kill-thread (thread)
382
   "Kill THREAD, ignoring all errors which may occur."
383
   (when (thread-alive-p thread)
384
     (ignore-errors
385
      (terminate-thread thread))))
386
 
387
 ;; (sb-vm::primitive-object-slots (sb-vm::primitive-object 'sb-vm::thread))
388
 ;; (defun init-session (&optional (thread *current-thread*)) (sb-thread::new-session thread))
389
 
390
 ;; (sb-thread::with-progressive-timeout (timet :seconds 4) (dotimes (i 4000) (print (timet))))
391
 
392
 ;; (describe sb-thread::*session*)
393
 
394
 ;; make-listener-thread 
395
 
396
 ;; with-progressive-timeout
397
 
398
 ;; (definline all-threads-sap ()
399
 ;;   (sb-vm::extern-alien "all_threads" sb-vm::system-area-pointer))
400
 
401
 ;; from sb-thread
402
 (defun dump-thread ()
403
   "Dump the contents of THREAD."
404
   (let* ((slots (sb-vm::primitive-object-slots #1=(sb-vm::primitive-object 'sb-vm::thread)))
405
          (sap (current-thread-sap))
406
          (thread-obj-len (sb-vm::primitive-object-length #1#))
407
          (names (make-array thread-obj-len :initial-element "")))
408
     (loop for slot across slots
409
           do
410
              (setf (aref names (sb-vm::slot-offset slot)) (sb-vm::slot-name slot)))
411
     (flet ((safely-read (sap offset &aux (bits (sb-vm::sap-ref-word sap offset)))
412
              (cond ((eql bits sb-vm:no-tls-value-marker) :no-tls-value)
413
                    ((eql (logand bits sb-vm:widetag-mask) sb-vm:unbound-marker-widetag) :unbound)
414
                    (t (sb-vm::sap-ref-lispobj sap offset))))
415
            (show (sym val)
416
              (declare (type fixnum sym))
417
              (let ((*print-right-margin* 128)
418
                    (*print-lines* 4))
419
                (format t " ~3d ~30a : ~s~%"
420
                        #+sb-thread (ash sym (- sb-vm:word-shift))
421
                        #-sb-thread 0
422
                        #+sb-thread (sb-vm:symbol-from-tls-index sym)
423
                        #-sb-thread sym
424
                        val))))
425
       (format t "~&TLS: (base=~x)~%" (sb-vm::sap-int sap))
426
       (loop for tlsindex from sb-vm:n-word-bytes below
427
                #+sb-thread (ash sb-vm::*free-tls-index* sb-vm:n-fixnum-tag-bits)
428
                #-sb-thread (ash thread-obj-len sb-vm:word-shift)
429
             by sb-vm:n-word-bytes
430
             do
431
                (unless (<= sb-vm::thread-allocator-histogram-slot
432
                            (ash tlsindex (- sb-vm:word-shift))
433
                            (1- sb-vm::thread-lisp-thread-slot))
434
                  (let ((thread-slot-name
435
                          (if (< tlsindex (ash thread-obj-len sb-vm:word-shift))
436
                              (aref names (ash tlsindex (- sb-vm:word-shift))))))
437
                    (if (and thread-slot-name (sb-vm::neq thread-slot-name 'sb-vm::lisp-thread))
438
                        (format t " ~3d ~30a : #x~x~%" (ash tlsindex (- sb-vm:word-shift))
439
                                thread-slot-name (sb-vm::sap-ref-word sap tlsindex))
440
                        (let ((val (safely-read sap tlsindex)))
441
                          (unless (eq val :no-tls-value)
442
                            (show tlsindex val)))))))
443
       (let ((from (sb-vm::descriptor-sap sb-vm:*binding-stack-start*))
444
             (to (sb-vm::binding-stack-pointer-sap)))
445
         (format t "~%Binding stack: (depth ~d)~%"
446
                 (/ (sb-vm::sap- to from) (* sb-vm:binding-size sb-vm:n-word-bytes)))
447
         (loop
448
           (when (sb-vm::sap>= from to) (return))
449
           (let ((val (safely-read from 0))
450
                 (sym #+sb-thread (sb-vm::sap-ref-word from sb-vm:n-word-bytes) ; a TLS index
451
                      #-sb-thread (sb-vm::sap-ref-lispobj from sb-vm:n-word-bytes)))
452
             (show sym val))
453
           (setq from (sb-vm::sap+ from (* sb-vm:binding-size sb-vm:n-word-bytes))))))))
454
 
455
 (definline wait-for-threads (threads)
456
   (map 'list (lambda (thread) (sb-thread:join-thread thread :default nil)) threads))
457
 
458
 (defun process-all-interrupts (&optional (thread sb-thread:*current-thread*))
459
   (sb-ext:wait-for (null (sb-thread::thread-interruptions thread))))
460
 
461
 ;;;; Thread Wrappers
462
 ;; BORDEAUX-THREADS version
463
 (defun condition-wait* (cvar lock &key timeout)
464
   (let ((success (condition-wait cvar lock :timeout timeout)))
465
     (when (not success)
466
       (grab-mutex lock))
467
     success))
468
 
469
 (sb-ext:defglobal .known-threads-lock. (make-mutex :name "known-threads-lock"))
470
 (sb-ext:defglobal .known-threads. (make-hash-table :weakness :key))
471
 
472
 (defun %get-thread-wrapper (native-thread)
473
   (multiple-value-bind (thread presentp)
474
       (with-mutex (.known-threads-lock.)
475
         (gethash native-thread .known-threads.))
476
     (if presentp
477
         thread
478
         (error "Thread wrapper is supposed to exist for ~S"
479
                native-thread))))
480
 
481
 (defun (setf thread-wrapper) (thread native-thread)
482
   (with-mutex (.known-threads-lock.)
483
     (setf (gethash native-thread .known-threads.) thread)))
484
 
485
 (defun remove-thread-wrapper (native-thread)
486
   (with-mutex (.known-threads-lock.)
487
     (remhash native-thread .known-threads.)))
488
 
489
 ;; Forms are evaluated in the new thread or in the calling thread?
490
 
491
 (macrolet
492
     ((defbindings (name docstring &body initforms)
493
          (check-type docstring string)
494
        `(std/macs:define-constant ,name
495
             (list
496
              ,@(loop for (special form) in initforms
497
                      collect `(cons ',special ',form)))
498
           :test #'equal
499
           :documentation ,docstring)))
500
   (defbindings +standard-io-bindings+
501
       "Standard bindings of printer/reader control variables as per
502
 CL:WITH-STANDARD-IO-SYNTAX. Forms are evaluated in the calling thread."
503
     (*package*                   (find-package :common-lisp-user))
504
     (*print-array*               t)
505
     (*print-base*                10)
506
     (*print-case*                :upcase)
507
     (*print-circle*              nil)
508
     (*print-escape*              t)
509
     (*print-gensym*              t)
510
     (*print-length*              nil)
511
     (*print-level*               nil)
512
     (*print-lines*               nil)
513
     (*print-miser-width*         nil)
514
     (*print-pprint-dispatch*     (copy-pprint-dispatch nil))
515
     (*print-pretty*              nil)
516
     (*print-radix*               nil)
517
     (*print-readably*            t)
518
     (*print-right-margin*        nil)
519
     (*random-state*              (make-random-state t))
520
     (*read-base*                 10)
521
     (*read-default-float-format* 'double-float)
522
     (*read-eval*                 nil)
523
     (*read-suppress*             nil)
524
     (*readtable*                 (copy-readtable nil))))
525
 
526
 (defun compute-special-bindings (bindings)
527
   (remove-duplicates (append bindings +standard-io-bindings+)
528
                      :from-end t :key #'car))
529
 
530
 (defvar *%current-thread*)
531
 
532
 (defun establish-dynamic-env (thread function special-bindings trap-conditions)
533
   "Return a closure that binds the symbols in SPECIAL-BINDINGS and calls
534
 FUNCTION."
535
   (let* ((bindings (compute-special-bindings special-bindings))
536
          (specials (mapcar #'car bindings))
537
          (values (mapcar (lambda (f) (eval (cdr f))) bindings)))
538
     (std/macs:named-lambda %establish-dynamic-env-wrapper ()
539
       (progv specials values
540
         (with-slots (%lock %return-values %exit-condition)
541
             thread
542
           (flet ((record-condition (c)
543
                    (with-mutex (%lock)
544
                      (setf %exit-condition c)))
545
                  (run-function ()
546
                    (let ((*%current-thread* nil))
547
                      ;; Wait until the thread creator has finished creating
548
                      ;; the wrapper.
549
                      (with-mutex (%lock)
550
                        (setf *%current-thread* (%get-thread-wrapper *%current-thread*)))
551
                      (let ((retval
552
                              (multiple-value-list (funcall function))))
553
                        (with-mutex (%lock)
554
                          (setf %return-values retval))
555
                        retval))))
556
             (unwind-protect
557
                  (if trap-conditions
558
                      (handler-case
559
                          (values-list (run-function))
560
                        (condition (c)
561
                          (record-condition c)))
562
                      (handler-bind
563
                          ((condition #'record-condition))
564
                        (values-list (run-function)))))))))))
565
 
566
 ;;; Queues
567
 ;;;; Biased Queue
568
 (defstruct (biased-queue (:conc-name queue-))
569
   (lock (make-mutex :name "queue-lock"))
570
   (cvar (make-waitqueue :name "queue-cvar"))
571
   (high (make-queue) :type queue)
572
   (low (make-queue) :type queue))
573
 
574
 (defun push-biased-queue (obj queue)
575
   (declare (biased-queue queue))
576
   (push-queue obj (queue-high queue))
577
   (condition-notify (queue-cvar queue))
578
   (values))
579
 
580
 (defun push-biased-queue-low (obj queue)
581
   (declare (biased-queue queue))
582
   (push-queue obj (queue-low queue))
583
   (condition-notify (queue-cvar queue))
584
   (values))
585
 
586
 ;;; Channel
587
 (defclass channel (funcallable-standard-object)
588
   ((queue :initform (make-queue) :type queue :initarg :queue :accessor channel-queue)
589
    (pool :initform *thread-pool* :type kernel :initarg :kernel :accessor channel-pool))
590
   (:metaclass funcallable-standard-class))
591
 
592
 ;;; Limiter
593
 (defclass thread-limiter ()
594
   ((accept-work-p :accessor accept-work-p :type boolean :initarg :accept-work-p)
595
    (limiter-lock :accessor limiter-lock :initarg :limiter-lock)
596
    (limiter-count :accessor limiter-count :initarg :limiter-count :type fixnum)))
597
 
598
 (defun initial-limiter-count (thread-count) (+ thread-count 1))
599
 
600
 ;;; Kill
601
 (defconstant +worker-suicide-tag+ 'worker-suicide-tag)
602
 
603
 (defun kill (pool)
604
   (assert pool)
605
   (let ((kill-count 0))
606
     (with-slots (lock workers) pool
607
       (with-mutex (lock)
608
         (sb-sequence:dosequence (worker workers)
609
           (when (not (eq (worker-thread worker) *current-thread*))
610
             ;; (eql category (running-category worker))
611
             (terminate-thread (worker-thread worker))
612
             (incf kill-count)))
613
         (when *worker*
614
           (assert (eq (worker-thread *worker*) *current-thread*))
615
           ;; (when (eql category (running-category *worker*))
616
           (throw '#.+worker-suicide-tag+ nil))))
617
     kill-count))
618
 
619
 (defun kill-errors ()
620
   (let ((suicide nil))
621
     (with-mutex (*error-workers-lock*)
622
       (dolist (worker *error-workers*)
623
         (if (and *worker* (eq worker *worker*))
624
             (setf suicide t)
625
             ;; user could possibly (though unlikely) destroy the
626
             ;; thread simultaneously, so ignore double-destroy error
627
             (ignore-errors (terminate-thread (worker-thread worker)))))
628
       (when suicide
629
         (assert (eq (worker-thread *worker*) *current-thread*))
630
         (throw '#.+worker-suicide-tag+ nil)))))
631
 
632
 (defun kill-errors-report (stream)
633
   (format stream "Kill errors in workers (remove debugger instances)."))
634
 
635
 (eval-always
636
   (defvar *worker-restarts* '((kill-errors #'kill-errors :report-function #'kill-errors-report))
637
     "A list of restarts available in the body of a WITH-WORKER-RESTARTS form."))
638
 
639
 (defmacro with-worker-restarts (&body body)
640
   "Eval BODY in a worker context with restarts and a catch for
641
 +WORKER-SUICIDE-TAG+. See variable *WORKER-RESTARTS*."
642
   `(catch +worker-suicide-tag+ 
643
      (restart-bind ,*worker-restarts*
644
        ,@body)))
645
 
646
 ;;; Worker
647
 (defvar *default-worker-tx-capacity* 8)
648
 (defclass worker-status ()
649
   ((%rx :initform (sb-concurrency:make-gate))
650
    (%tx :initform (make-queue :capacity *default-worker-tx-capacity*))))
651
 
652
 (defclass worker (worker-status)
653
   ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
654
            :accessor worker-thread
655
            :initarg :thread)
656
    (kernel :initform *worker-kernel* :accessor kernel)
657
    (work :accessor work :type spin-queue :initarg :work)
658
    (index :reader worker-index :type array-index :initarg :index :accessor index)
659
    (bind :type list :accessor worker-bind :initarg :bind :initform *default-special-bindings* :accessor bind)))
660
 
661
 (defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
662
   (push (worker-thread self) *worker-threads*))
663
 
664
 (defun make-worker* (&key thread kernel bind index)
665
   (apply #'make-instance *worker-class*
666
          `(,@(when thread `(:thread ,thread))
667
            ,@(when index `(:index ,index))
668
            ,@(when kernel `(:kernel ,kernel))
669
            ,@(when bind `(:bind ,bind)))))
670
 
671
 (defmacro with-default-special-bindings (bindings &body body)
672
   `(let ((*default-special-bindings* ,bindings))
673
      ,@body))
674
 
675
 ;; TODO 2024-10-03: pause/resume
676
 (declaim (inline kill-worker join-worker start-worker run-worker))
677
 (defun start-worker (worker &rest args)
678
   (with-default-special-bindings (worker-bind worker)
679
     (sb-thread::start-thread (worker-thread worker) (kernel worker) args)))
680
 
681
 (defun run-worker (worker &key bind wait)
682
   (when bind
683
     (setf (worker-bind worker) bind))
684
   (start-worker worker)
685
   (if wait (join-worker worker)
686
       worker))
687
 
688
 (defmethod run-object ((self worker) &key)
689
   (run-worker self))
690
 
691
 (defun run-with-worker (worker object &key wait)
692
   (run-worker worker :bind object :wait wait))
693
 
694
 (definline thread= (a b) (and a b (= (thread-os-tid a) (thread-os-tid b))))
695
 (definline worker= (a b) 
696
   (and a b
697
        (or
698
         (= (index a) (index b))
699
         (thread= (worker-thread a) (worker-thread b)))))
700
 
701
 (defun kill-worker (worker) 
702
   (declare (worker worker))
703
   (let ((th (worker-thread worker)))
704
     (unless (null th)
705
       (remove th *worker-threads* :test 'thread=)
706
       (kill-thread th))))
707
 
708
 (defun join-worker (worker)
709
   (declare (worker worker))
710
   (let ((th (worker-thread worker)))
711
     (remove th *worker-threads* :test 'thread=)
712
     (join-thread th)))
713
 
714
 (defun send-worker-start (worker)
715
   (assert (sb-concurrency:open-gate (slot-value worker '%rx)) nil "Failed to start worker ~A" worker))
716
 
717
 (defun receive-worker-start (worker)
718
   (print-top-level (format nil "worker ~A starting...~%" (worker-index worker)))
719
   (assert (sb-concurrency:gate-open-p (slot-value worker '%rx)) nil "Worker hijacked? ~A" worker))
720
 
721
 (defun receive-worker-status (worker)
722
   (ecase (pop-queue (slot-value worker '%tx))
723
     (ok (print-top-level (format nil "worker ~A OK.~%" (worker-index worker))))
724
     (error (error 'kernel-init-error))))
725
 
726
 (defun send-worker-status (worker status)
727
   (check-type status (member ok error))
728
   (print-top-level (format nil "worker ~A status: ~A~%" (worker-index worker) status))
729
   (push-queue status (slot-value worker '%tx)))
730
 
731
 (defun notify-exit (worker)
732
   (sb-concurrency:close-gate (slot-value worker '%rx)))
733
 
734
 (defun wait-for-worker (worker &optional timeout)
735
   (assert (sb-concurrency:wait-on-gate (slot-value worker '%rx) :timeout timeout)))
736
 
737
 ;;;; Worker Protocol
738
 (defgeneric workers (self))
739
 (defgeneric work (self))
740
 (defgeneric run-thread (self thunk &key name &allow-other-keys))
741
 
742
 (defun make-workers (count &key thread kernel bind (return-type 'vector))
743
   (let ((ret))
744
     (dotimes (i count)
745
       (push (make-worker* :thread thread :kernel kernel :bind bind) ret))
746
     (if return-type (coerce ret return-type) ret)))
747
 
748
 ;;; Scheduler
749
 (defclass scheduler ()
750
   ((workers :type simple-vector :accessor workers :initarg :workers)
751
    (wait-cvar :initform (make-waitqueue :name "wait-cvar"))
752
    (wait-lock :initform (make-mutex :name "wait-lock"))
753
    (wait-count :initform (make-counter) :type counter)
754
    (notify-count :initform 0 :type (integer 0))
755
    (spin-count :type array-index :initarg :spin-count :initform *default-spin-count*)
756
    ;; cursor?
757
    (index :initform 0 :type array-index :initarg :index :accessor scheduler-index))
758
   (:documentation
759
    "A scheduler is responsible for finding and sequencing work to be executed by
760
 WORKER threads."))
761
 
762
 (defclass biased-scheduler (scheduler)
763
   ((low-priority-work
764
     :initform (make-spin-queue) 
765
     :type spin-queue 
766
     :accessor low-priority-work
767
     :initarg :low-priority-work))
768
   (:documentation "A 'biased' scheduler with an additional spin-queue for 'low priority' work."))
769
 
770
 (defun make-scheduler (workers spin-count)
771
   (make-instance *scheduler-class* :workers workers :spin-count spin-count))
772
 
773
 (defmacro with-pop-success (sym queue &body body)
774
   (with-gensyms (presentp)
775
     `(multiple-value-bind (,sym ,presentp) (pop-spin-queue ,queue)
776
        (when ,presentp
777
          ,@body))))
778
 
779
 (defun push-to-random-worker (work sched)
780
   (declare (scheduler sched))
781
   (with-slots (workers) sched
782
     (push-spin-queue work (work (svref workers (mod-decf (scheduler-index sched) (length workers))))))
783
   (values))
784
 
785
 (defmacro with-mutex-p ((mutex predicate &key (wait-p t) timeout) &body body)
786
   ;; intentially eval PREDICATE twice
787
   `(when ,predicate
788
      (with-mutex (,mutex :wait-p ,wait-p :timeout ,timeout)
789
        (when ,predicate
790
          ,@body))))
791
 
792
 (defun maybe-wake-a-worker (sched)
793
   (declare (scheduler sched))
794
   (with-slots (wait-lock wait-cvar wait-count notify-count) sched
795
     (with-mutex-p (wait-lock (plusp (counter-value wait-count)))
796
       (incf notify-count)
797
       (condition-notify wait-cvar)))
798
   (values))
799
 
800
 (defun schedule-work (sched work &optional priority)
801
   (declare (scheduler sched))
802
   (ccase priority
803
     (:low (with-slots (low-priority-work) sched (push-spin-queue work low-priority-work)))
804
     (:default (push-to-random-worker work sched))
805
     (t (push-to-random-worker work sched)))
806
   (maybe-wake-a-worker sched)
807
   (values))
808
 
809
 (defmacro %repeat (count &body body)
810
   (with-gensyms (left)
811
     `(let ((,left (the fixnum ,count)))
812
        (declare (type fixnum ,left))
813
        (loop
814
          (when (zerop ,left)
815
            (return (values)))
816
          (decf ,left)
817
          ,@body))))
818
 
819
 (defmacro do-indexes ((ivar size hindex from-hindex-p) &body body)
820
   ;; size is positive
821
   (with-gensyms (svar hivar)
822
     `(let ((,ivar (the array-index ,hindex))
823
            (,svar (the array-index ,size))
824
            (,hivar (the array-index ,hindex)))
825
        (declare (type array-index ,ivar ,svar ,hivar))
826
        (loop
827
             ,(let ((next `(mod-incf ,ivar ,svar)))
828
                (if from-hindex-p
829
                    `(progn ,@body ,next)
830
                    `(progn ,next ,@body)))
831
             (when (= ,ivar ,hivar)
832
               (return (values)))))))
833
 
834
 (defmacro do-workers ((wvar workers hindex &optional from-hindex-p) &body body)
835
   (with-gensyms (wsvar ivar)
836
     `(let ((,wsvar ,workers))
837
        (declare (simple-vector ,wsvar))
838
        (do-indexes (,ivar (length (the simple-vector ,wsvar)) ,hindex ,from-hindex-p)
839
          (let ((,wvar (svref (the simple-vector ,wsvar) ,ivar)))
840
            (declare (worker ,wvar))
841
            ,@body)))))
842
 
843
 (defun find-work (sched w)
844
   (declare (scheduler sched) (worker w))
845
   (labels ((try-pop (queue)
846
              (declare (type spin-queue queue))
847
              (with-pop-success work queue
848
                (return-from find-work work))
849
              (values))
850
            (try-pop-all ()
851
              (with-slots (workers) sched
852
                (do-workers (w workers (worker-index w) nil)
853
                  (try-pop (work w))))
854
              (values))
855
            (maybe-sleep ()
856
              (with-slots (wait-cvar wait-lock wait-count
857
                           notify-count low-priority-work) sched
858
                (inc-counter wait-count)
859
                (unwind-protect 
860
                     (with-mutex (wait-lock)
861
                       (try-pop (work w))
862
                       (try-pop low-priority-work)
863
                       (loop until (plusp notify-count)
864
                             do (condition-wait wait-cvar wait-lock)
865
                             finally (decf notify-count)))
866
                  (dec-counter wait-count)))
867
              (values)))
868
     (declare (dynamic-extent #'try-pop #'try-pop-all #'maybe-sleep))
869
     (with-slots (spin-count) sched
870
       (loop
871
         (try-pop (work w))
872
         (try-pop-all)
873
         (%repeat spin-count
874
           (try-pop-all))
875
         (maybe-sleep)))))
876
 
877
 (defun steal-work (scheduler)
878
   (declare (scheduler scheduler))
879
   (with-slots (workers index low-priority-work) scheduler
880
     (let ((low-priority-work low-priority-work))
881
       (flet ((try-pop (work)
882
                (declare (spin-queue work low-priority-work))
883
                (with-pop-success w work
884
                  (when w ; don't steal nil, the end condition flag
885
                    (return-from steal-work w))
886
                  (push-spin-queue w low-priority-work))
887
                (values)))
888
         (declare (dynamic-extent #'try-pop))
889
         ;; Start with the worker that has the most recently submitted
890
         ;; work (approximately) and advance rightward.
891
         (do-workers (worker workers index t)
892
           ;; FIX 2025-07-14: 
893
           (try-pop (work worker)))
894
         (try-pop low-priority-work))))
895
   nil)
896
 
897
 (defun steal-work* (pool worker)
898
   (when-let ((w (steal-work (scheduler pool))))
899
     (if worker
900
         (exec-with-worker w worker)
901
         (exec-without-worker w))
902
     t))
903
 
904
 (defgeneric schedule (self &key &allow-other-keys))
905
 (defgeneric (setf schedule) (new self &key &allow-other-keys))
906
 
907
 ;;; Supervisor
908
 (defclass supervisor ()
909
   ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread)
910
    (domain)
911
    (scope))
912
   (:documentation "Supervisors are threads which are responsible for a set of worker threads
913
 within their DOMAIN and SCOPE."))
914
 
915
 (defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
916
   (push (supervisor-thread self) *super-threads*))
917
 
918
 ;;; Oracle
919
 (defstruct (oracle (:constructor %make-oracle (id thread)))
920
   "Oracles provide a tagged view into some threaded scope of work."
921
   (id 0 :type (unsigned-byte 32) :read-only t)
922
   (thread *current-thread* :read-only t))
923
 
924
 (defun oracle-of-id (id)
925
   (gethash id *oracle-table*))
926
 
927
 (defun make-oracle (&optional (thread *current-thread*))
928
   (let ((id (thread-os-tid thread)))
929
     (if-let ((found (oracle-of-id id)))
930
       (values id found)
931
       (let ((orc (%make-oracle id thread)))
932
         (setf (gethash id *oracle-table*) (make-array 0 :adjustable t :fill-pointer 0))
933
         (values id orc)))))
934
 
935
 ;;; Thread Pool
936
 (defclass thread-pool-context ()
937
   ((bind :initarg :bind :initform *default-special-bindings* :type list :accessor bind)
938
    (name :accessor name :initarg :name)))
939
 
940
 (defclass thread-pool (thread-limiter thread-pool-context)
941
   ((kernel :initform *pool-kernel* :type kernel :accessor kernel :initarg :kernel)
942
    (scheduler :initarg :scheduler :accessor scheduler)
943
    (workers :initarg :workers :accessor workers :type (simple-array worker))
944
    (lock :initarg :lock :initform (make-mutex :name "workers") :type mutex :accessor lock)
945
    (alive :initform t :reader alive :type boolean :initarg :alive))
946
   (:documentation "Thread pools are similar to LPARALLEL kernels - they encompass the scheduling
947
 and execution of concurrent work using a pool of 'worker' threads."))
948
 
949
 (definline register-thread-pool (name pool)
950
   (declare (thread-pool pool))
951
   (setf (gethash name *thread-pool-table*) pool))
952
 
953
 (defun find-thread-pool (name) (gethash name *thread-pool-table*))
954
 
955
 (defmethod initialize-instance :after ((self thread-pool) &key name &allow-other-keys)
956
   (when name (register-thread-pool name self)))
957
 
958
 (defmacro ensure-working-p (pool)
959
   `(locally (declare (optimize (speed 3) (safety 0)))
960
      (accept-work-p (the thread-pool ,pool))))
961
 
962
 (defun update-limiter-count* (pool delta)
963
   (declare (thread-pool pool) (fixnum delta) 
964
            (optimize (speed 3) (safety 0)))
965
   (incf (the fixnum (limiter-count pool)) delta)
966
   (setf (accept-work-p pool)
967
         (plusp (the fixnum (limiter-count pool))))
968
   (values))
969
 
970
 ;; REVIEW 2025-04-27: may need to add more to std/spin
971
 (defun update-limiter-count (pool delta)
972
   (declare (thread-pool pool) (fixnum delta)
973
            (optimize (speed 3) (safety 0)))
974
   (with-mutex ((limiter-lock pool))
975
     (update-limiter-count* pool delta))
976
   (values))
977
 
978
 ;;;; Core
979
 (defun exec-with-worker (work worker)
980
   (declare (worker worker))
981
   (funcall (kernel worker) work))
982
 
983
 (defun exec-without-worker (work)
984
   (check-kernel)
985
   (call-with-work-handler (funcall *kernel* work)))
986
 
987
 (defun replace-worker (pool worker &optional (kernel *worker-kernel*))
988
   (with-slots (workers lock) pool
989
     (with-mutex (lock)
990
       (let ((i (position worker workers :test #'worker=)))
991
         (assert i)
992
         (assert (eql i (worker-index worker)))
993
         (warn "Replacing lost or dead worker")
994
         (let ((new-worker 
995
                 (make-instance 'worker
996
                   :kernel kernel 
997
                   :index i 
998
                   :bind (worker-bind worker)
999
                   :work (make-spin-queue))))
1000
           (setf (svref workers i) new-worker)
1001
           (send-worker-start new-worker)
1002
           (receive-worker-start new-worker))))))
1003
 
1004
 (defun worker-loop (pool worker)
1005
   (declare (thread-pool pool) (worker worker))
1006
   (let ((sched (scheduler pool)))
1007
     (unwind-protect
1008
          (loop (let ((work (find-work sched worker)))
1009
                  (if work
1010
                      (exec-with-worker work worker)
1011
                      (return))))
1012
       (unless *lisp-exiting-p*
1013
         (replace-worker pool worker)))))
1014
 
1015
 (defun call-with-worker-context (fn context pool worker)
1016
   (receive-worker-start worker)
1017
   (unwind-protect
1018
        (funcall (print context)
1019
                 (lambda ()
1020
                   (let ((*worker* (find *current-thread* (workers pool)
1021
                                         :key #'worker-thread)))
1022
                     (assert *worker*)
1023
                     (send-worker-status worker 'ok)
1024
                     (with-worker-restarts
1025
                       (%call-with-work-handler fn)))))
1026
     ;; This error notification is seen when `worker-context' does not
1027
     ;; call its worker-loop parameter, otherwise it's ignored.
1028
     (send-worker-status worker 'error)))
1029
 
1030
 (defun enter-worker-loop (pool worker)
1031
   (call-with-worker-context
1032
    (lambda () (worker-loop pool worker))
1033
    (kernel worker)
1034
    pool
1035
    worker))
1036
 
1037
 (defun make-all-bindings (kernel bindings)
1038
   (append bindings (list (cons '*kernel* kernel))))
1039
 
1040
 (defun %make-worker (index class)
1041
   (make-instance class :index index :thread nil))
1042
 
1043
 (defun make-worker-thread (pool worker &optional bind)
1044
   (with-thread (:bindings (or bind (worker-bind worker)))
1045
     (unwind-protect (enter-worker-loop pool worker)
1046
       (notify-exit worker))))
1047
 
1048
 (defun make-worker (pool index &optional work (class *worker-class*))
1049
   (let* ((worker (%make-worker index class))
1050
          (bind (make-all-bindings *worker-kernel* (bind pool)))
1051
          (worker-thread (make-worker-thread pool worker bind)))
1052
     (setf (worker-thread worker) worker-thread
1053
           (worker-bind worker) bind)
1054
     (when work (setf (work worker) work))
1055
     worker))
1056
 
1057
 (defmacro with-fill-workers-handler (workers &body body)
1058
   `(unwind-protect (progn ,@body)
1059
      (map 'simple-vector
1060
           (lambda (w)
1061
             (when (typep w 'worker)
1062
               (terminate-thread (worker-thread w))))
1063
           ,workers)))
1064
 
1065
 (defun %fill-workers (workers pool)
1066
   (dotimes (i (length workers))
1067
     (setf (aref workers i) (make-worker pool i (make-spin-queue)))))
1068
 
1069
 (defun fill-workers (workers pool)
1070
   (with-fill-workers-handler workers
1071
     (%fill-workers workers pool)
1072
     (map nil #'send-worker-start workers)))
1073
 
1074
 ;; (map nil #'receive-worker-start workers)))
1075
 ;; (map nil #'receive-worker-start workers)))
1076
 
1077
 (defun make-thread-pool (worker-count &key (name :default)
1078
                                            (bind `((*standard-output* . ,*standard-output*)
1079
                                                    (*error-output* . ,*error-output*)))
1080
                                            (worker-kernel *worker-kernel*)
1081
                                            (spin-count *default-spin-count*)
1082
                                            (alive t)
1083
                                            (kernel *pool-kernel*)
1084
                                            (class 'thread-pool))
1085
   "Create a THREAD-POOL with WORKER-COUNT number of available worker threads.
1086
 
1087
 NAME when non-nil is an EQL-unique identifier associated with the thread-pool
1088
 in *THREAD-POOL-TABLE*.
1089
 
1090
 BIND is an alist for establishing thread-local dynamic bindings inside worker
1091
 threads.
1092
 
1093
 WORKER-KERNEL which begins the worker loop and returns when the worker exits.
1094
 
1095
 KERNEL is a function which drives the THREAD-POOL.
1096
 
1097
 CLASS is the designated class of the returned THREAD-POOL object.
1098
 
1099
 SPIN-COUNT is the number of work-searching iterations done by the worker
1100
 before going to sleep."
1101
   (check-type worker-count positive-fixnum)
1102
   (check-type spin-count array-index)
1103
   (let ((*worker-kernel* worker-kernel)
1104
         (*pool-kernel* kernel))
1105
     (let* ((workers (make-array worker-count))
1106
            (pool (make-instance class
1107
                    :name name
1108
                    :bind bind
1109
                    :kernel *pool-kernel*
1110
                    :accept-work-p alive
1111
                    :alive alive
1112
                    :workers workers
1113
                    :scheduler (make-scheduler workers spin-count)
1114
                    :limiter-count (initial-limiter-count worker-count)
1115
                    :limiter-lock (make-spin-lock))))
1116
       (fill-workers workers pool)
1117
       pool)))
1118
 
1119
 (defun check-kernel ()
1120
   "Check the current value of *KERNEL*, ensuring it is bound appropriately
1121
 according to the current thread (worker, pool, super). STORE-VALUE
1122
 restarts is provided. *KERNEL* is returned."
1123
   ;; TODO 2025-04-21: 
1124
   (or (and *worker* (eql *kernel* *worker-kernel*))
1125
       (and (not *worker*) (not *kernel*))
1126
       (and (eql *pool-kernel* *kernel*))
1127
       (restart-case (error 'no-kernel-error)
1128
         (store-value (value)
1129
           :report "Assign a value to *KERNEL*."
1130
           :interactive (lambda () (print "Value: ") (read t ))
1131
           (check-type value kernel)
1132
           (setf *kernel* value))))
1133
   *kernel*)
1134
 
1135
 (defun check-thread-pool ()
1136
   "Check the current value of *THREAD-POOL*, ensuring it is bound to a
1137
 THREAD-POOL object. STORE-VALUE and MAKE-THREAD-POOL restarts are
1138
 provided. *THREAD-POOL* is returned."
1139
   (or *thread-pool*
1140
       (restart-case (error 'no-thread-pool-error)
1141
         (make-thread-pool (worker-count)
1142
           :report "Make a thread-pool now, prompting for number of workers."
1143
           :interactive (lambda () (princ "Worker count: ") (list (read)))
1144
           (setf *thread-pool* (make-thread-pool worker-count)))
1145
         (store-value (value)
1146
           :report "Assigne a value to *THREAD-POOL*."
1147
           :interactive (lambda () (print "Value for *THREAD-POOL*: ") (read))
1148
           (check-type value thread-pool)
1149
           (setf *thread-pool* value)))))
1150
 
1151
 (defun worker-count (pool)
1152
   "Return the worker count of POOL."
1153
   (length (workers pool)))
1154
 
1155
 (defun worker-count* ()
1156
   "Return the worker count of *THREAD-POOL*."
1157
   (worker-count *thread-pool*))
1158
 
1159
 (defun worker-index* ()
1160
   "If called from inside a worker return the worker's assigned index, ranging from 0 to (worker-count*)."
1161
   (when-let ((worker *worker*))
1162
     (worker-index worker)))
1163
 
1164
 (defun workers* () (workers *thread-pool*))
1165
 
1166
 (defun scheduler* () (scheduler *thread-pool*))
1167
 
1168
 (defmacro work-lambda (&body body)
1169
   "Generate a 'work-lambda' with BODY. *HANDLERS* will be bound for the duration
1170
 of the returned lambda."
1171
   (with-gensyms (body-fn handlers)
1172
     `(flet ((,body-fn () ,@body))
1173
        (declare (optimize (speed 3) (safety 0)))
1174
        (let ((,handlers *handlers*))
1175
          (if ,handlers
1176
              (lambda ()
1177
                (let ((*handlers* ,handlers))
1178
                  (,body-fn)))
1179
              #',body-fn)))))
1180
 
1181
 ;; TODO 2025-04-30: 
1182
 (defmacro pool-lambda (state &body body)
1183
   "Generate a 'pool-lambda' with provided BODY. *THREAD-POOL* and *HANDLERS* are
1184
 bound for the duration of the returned lambda and STATE is the name of the
1185
 single required argument of the lambda. The lambda should run all code
1186
 assigned to the input state and then return two values."
1187
   (with-gensyms (body-fn handlers pool)
1188
     `(labels ((,body-fn (,state) ,@body))
1189
        (declare (optimize (speed 3) (safety 0)))
1190
        (let ((,handlers *handlers*)
1191
              (,pool *thread-pool*))
1192
          (if ,handlers
1193
              (lambda (,state)
1194
                (let ((*handlers* ,handlers)
1195
                      (*thread-pool* ,pool))
1196
                  (,body-fn ,state)))
1197
              (lambda (,state)
1198
                (let ((*thread-pool* ,pool))
1199
                  (,body-fn ,state))))))))
1200
 
1201
 ;; (defmacro super-lambda (&body body))
1202
 
1203
 ;; (defmacro channel-lambda (ch &body body))
1204
 
1205
 (defun make-channeled-work (channel fn args)
1206
   (declare (channel channel) (function fn) (list args))
1207
   (let ((queue (channel-queue channel)))
1208
     (work-lambda
1209
       (unwind-protect (push-queue (with-work-context (apply fn args)) queue)
1210
         (push-queue (wrap-error 'worker-killed-error) queue)))))
1211
 
1212
 ;; make-work 
1213
 (defun submit-raw-work (work pool &optional (priority *work-priority*))
1214
   (unless (alive pool)
1215
     (error "attempted to submit work to a dead thread-pool"))
1216
   (schedule-work (scheduler pool) work priority))
1217
 
1218
 (defun submit-work (ch fn &rest args)
1219
   (check-type ch channel)
1220
   (submit-raw-work
1221
    (make-channeled-work ch
1222
                         (std/curry:ensure-function fn)
1223
                         args)
1224
    (channel-pool ch)))
1225
 
1226
 (defun receive-result (channel)
1227
   "Remove a result from CHANNEL. If nothing is available the call will block
1228
 until a result is received."
1229
   (unwrap-result (pop-queue (channel-queue channel))))
1230
 
1231
 (defun try-receive-result (channel &key timeout)
1232
   "Attempt to remove a result from CHANNEL and return (values RESULT t).
1233
 
1234
 By default if the channel is empty return (values nil nil)
1235
 immediately. TIMEOUT, if non-nil is the number of seconds to wait for a result
1236
 to appear on the queue."
1237
   (multiple-value-bind (result presentp)
1238
       (try-pop-queue (channel-queue channel) :timeout timeout)
1239
     (if presentp
1240
         (values (unwrap-result result) t)
1241
         (values nil nil))))
1242
 
1243
 (defmacro! do-fast-receives ((ret o!ch o!n) &body body)
1244
   "Receive N results from channel CH, executing BODY each iteration with results
1245
 bound to RET."
1246
   `(loop for i below ,g!n
1247
          do (let ((,ret (receive-result ,g!ch)))
1248
               ,@body)))
1249
 
1250
 (defun steal-until-receive-result (channel worker fn)
1251
   (loop
1252
     (multiple-value-bind (result presentp) (try-receive-result channel)
1253
       (when presentp
1254
         (when fn
1255
           (locally (declare (type function fn))
1256
             (funcall fn result)))
1257
         (return)))
1258
     (steal-work* (channel-pool channel) worker)))
1259
 
1260
 (defun receive-results (channel count fn)
1261
   (let ((worker *worker*))
1262
     (if worker
1263
         (%repeat count
1264
           (steal-until-receive-result channel worker fn))
1265
         (if fn
1266
             (do-fast-receives (result channel count)
1267
               (locally (declare (type function fn))
1268
                 (funcall fn result)))
1269
             (do-fast-receives (result channel count)
1270
               (declare (ignore result)))))))
1271
 
1272
 (defmacro with-submit-counted (&body body)
1273
   (with-gensyms (count channel)
1274
     `(let ((,count 0)
1275
            (,channel (make-channel)))
1276
        (declare (fixnum ,count))
1277
        (flet ((submit-counted (&rest args)
1278
                 (declare (dynamic-extent args))
1279
                 (apply #'submit-work ,channel args)
1280
                 (incf ,count))
1281
               (receive-counted ()
1282
                 (receive-results ,channel ,count nil)))
1283
          (declare (inline submit-counted receive-counted))
1284
          ,@body))))
1285
 
1286
 (defun shutdown-channel (channel pool)
1287
   (let ((*work-priority* :low))
1288
     (submit-work channel (lambda ())))
1289
   (receive-result channel)
1290
   (with-slots (scheduler workers alive) pool
1291
     (loop for i below (length workers)
1292
           do (schedule-work scheduler nil :low))
1293
     (map nil #'wait-for-worker workers)
1294
     (setf alive nil)))
1295
 
1296
 (defun end-thread-pool (&key wait)
1297
   (when-let ((pool *thread-pool*))
1298
     (let ((name (when (slot-boundp pool 'name) (name pool))))
1299
       (when name (remhash name *thread-pool-table*))
1300
       (setf *thread-pool* nil
1301
             *worker-threads* (flatten *worker-threads*))
1302
       (when (alive pool)
1303
         (let ((channel (let ((*thread-pool* pool)) (make-instance 'channel)))
1304
               (threads (map 'list #'worker-thread (workers pool))))
1305
           (cond (wait
1306
                  (shutdown-channel channel pool)
1307
                  threads)
1308
                 (t
1309
                  (cons (with-thread (:name (format nil "%shutdown-~A" (or name "thread-pool")))
1310
                          (shutdown-channel channel pool))
1311
                        threads))))))))
1312
 
1313
 (defun thread-pool-info (pool)
1314
   (list :workers (worker-count pool)
1315
         :alive (alive pool)
1316
         :spin-count (slot-value (scheduler pool) 'spin-count)
1317
         :limiter-count (limiter-count pool)))
1318
 
1319
 (defmethod print-object ((pool thread-pool) stream)
1320
   (print-unreadable-object (pool stream :type t :identity t)
1321
     (format stream "~(~s ~^~)~{~s~^ ~}" (name pool) (thread-pool-info pool))))
1322
 
1323
 (defun broadcast-work (function &rest args)
1324
   "Wait for current and pending work to complete, if any, then
1325
 simultaneously execute the given work inside each worker. Wait until
1326
 this work is complete, then return the results in a vector.
1327
 
1328
 Calling `broadcast-work' from inside a worker is an error."
1329
   (when *worker*
1330
     (error "Cannot call `broadcast-work' from inside a worker."))
1331
   (let* ((function (ensure-function function))
1332
          (*thread-pool* (check-thread-pool))
1333
          (worker-count (worker-count*))
1334
          (channel (make-instance 'channel))
1335
          (from-workers (make-semaphore :name "from-workers"))
1336
          (to-workers (make-semaphore :name "to-workers")))
1337
     (loop repeat worker-count 
1338
           do (submit-work channel (lambda ()
1339
                                     (signal-semaphore from-workers)
1340
                                     (wait-on-semaphore to-workers)
1341
                                     (apply function args))))
1342
     (loop repeat worker-count
1343
           do (wait-on-semaphore from-workers))
1344
     (loop repeat worker-count 
1345
           do (signal-semaphore to-workers))
1346
     (map-into (make-array worker-count) (lambda () (receive-result channel)))))
1347
 
1348
 (defun %exit-threads ()
1349
   (setf *lisp-exiting-p* t))
1350
 
1351
 (pushnew '%exit-threads sb-ext:*exit-hooks*)
1352
 
1353
 ;;; Kernel
1354
 ;; kernel utils
1355
 (defun indexing-wrapper (array index function args)
1356
   (setf (aref array index) (apply function args)))
1357
 
1358
 (defmacro! with-submit-indexed (o!count o!array &body body)
1359
   (with-gensyms (channel)
1360
     `(let ((,channel (make-instance 'channel)))
1361
        (flet ((submit-indexed (index function &rest args)
1362
                 (submit-work
1363
                  ,channel #'indexing-wrapper ,g!array index function args))
1364
               (receive-indexed ()
1365
                 (receive-results ,channel ,g!count nil)
1366
                 ,g!array))
1367
          (declare (inline submit-indexed receive-indexed))
1368
          ,@body))))
1369
 
1370
 (defmacro with-submit-cancelable (&body body)
1371
   (with-gensyms (canceledp channel count)
1372
     `(let ((,canceledp nil)
1373
            (,count 0)
1374
            (,channel (make-channel)))
1375
        (flet ((submit-cancelable (fn &rest args)
1376
                 (submit-work ,channel
1377
                              (lambda ()
1378
                                (if ,canceledp
1379
                                    'task-canceled
1380
                                    (apply fn args))))
1381
                 (incf ,count)))
1382
          (macrolet ((receive-cancelables (result &body body)
1383
                       `(receive-results
1384
                         ,',channel ,',count (lambda (,result) ,@body))))
1385
            (unwind-protect (progn ,@body)
1386
              (setf ,canceledp t)))))))
1387
 
1388
 (defun call-with-temp-pool (fn &rest args)
1389
   ;; ensure that we end the same pool we create
1390
   (let ((pool (apply #'make-thread-pool args)))
1391
     (unwind-protect
1392
          (let ((*thread-pool* pool))
1393
            (funcall fn))
1394
       (let ((*thread-pool* pool))
1395
         (end-thread-pool :wait t)))))
1396
 
1397
 (defmacro with-temp-pool ((&rest make-pool-args) &body body)
1398
   "Create a temporary pool for the duration of `body', ensuring that
1399
 `end-thread-pool' is eventually called. `make-thread-pool' is given the
1400
 arguments `make-pool-args'.
1401
 
1402
 **NOTE**: Use this only if you understand its implications. Since
1403
 `*thread-pool*' is unaffected outside `body', the REPL will be useless with
1404
 respect to the temporary pool. For instance calling `kill'
1405
 from the REPL will not affect tasks that are running in the temporary
1406
 pool.
1407
 
1408
 Multiple uses of `with-temp-pool' within the same application are
1409
 prone to defeat the purpose and benefits of having a thread pool. This
1410
 is an especial risk if `with-temp-pool' appears inside a library,
1411
 which is likely to be a suboptimal situation.
1412
 
1413
 While using `with-temp-pool' is generally a bad idea, there are a
1414
 few valid uses, such as for testing, where the code is non-critical or
1415
 where convenience trumps other concerns."
1416
   `(call-with-temp-pool (lambda () ,@body) ,@make-pool-args))
1417
 
1418
 (defmacro defkernel (name-and-opts (&rest args) &body body)
1419
   "Define a new kernel function with NAME ARGS and BODY."
1420
   (typecase name-and-opts
1421
     (cons `(definline ,(car name-and-opts) ,args ,@body))
1422
     (t `(definline ,name-and-opts ,args ,@body))))
1423
 
1424
 ;;; Pipes
1425
 ;; From Shinmera's VERBOSE
1426
 (defstruct sync-message
1427
   (condition (make-waitqueue))
1428
   (lock (make-mutex)))
1429
 
1430
 (defmethod lock ((self sync-message)) (sync-message-lock self))
1431
 
1432
 (defmethod msg ((vector vector) (msg sync-message))
1433
   ;; ensure we're waiting on the condition..
1434
   (with-mutex ((sync-message-lock msg)))
1435
   (condition-notify (sync-message-condition msg)))
1436
 
1437
 (defmacro with-sync-message (s &body body)
1438
   `(let ((,s (make-sync-message)))
1439
      (with-mutex ((sync-message-lock ,s))
1440
        ,@body
1441
        (condition-wait* (sync-message-condition ,s) (sync-message-lock ,s)))))