Coverage report: /home/ellis/comp/core/std/thread.lisp
Kind | Covered | All | % |
expression | 174 | 1490 | 11.7 |
branch | 2 | 52 | 3.8 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; threads.lisp --- Multi-thread utilities
7
;; mostly yoinked from sb-thread and lparallel
12
;; (multiple-value-list (sb-unix:unix-getrusage 0))
13
;; (setf sb-unix::*on-dangerous-wait* :error)
15
;; TODO 2024-10-03: with-cas-lock?
18
(in-package :std/thread)
20
;; (sb-thread:thread-os-tid sb-thread:*current-thread*)
21
;; sb-thread:interrupt-thread
23
(defclass kernel-class ()
25
(:metaclass funcallable-standard-class)
26
(:documentation "Standard kernel class."))
28
(defclass kernel-object (funcallable-standard-object)
30
(:metaclass funcallable-standard-class)
31
(:documentation "Standard kernel object."))
33
(definline make-kernel (fn)
34
"Return a new KERNEL-OBJECT and set the instance function to FN."
35
(declare (function fn))
36
(let ((fin (make-instance 'kernel-object)))
37
(set-funcallable-instance-function fin (compile nil fn))
40
;; (defmethod :before initialize-instance ((self kernel-object) &key)
41
;; (when *kernel* (set-funcallable-instance-function self *kernel*)))
43
;; make-instance (set-funcallable-instance-function kernel)
47
"A type which specifies kernels. A kernel may be a list which is interpreted as
48
a lambda expression, a symbol which names a function, or a compiled-function."
49
'(or cons symbol compiled-function kernel-object))
51
(deftype kernel-function ()
  "The type of functions which take one required argument of any type,
followed by any number of further arguments, and return no values."
  ;; In a FUNCTION type specifier, &REST is followed by the *type* of the
  ;; remaining arguments, not by a parameter name.  The previous `&rest args'
  ;; therefore referred to a type called ARGS; T accepts any rest argument.
  '(function (t &rest t) (values)))
55
(deftype worker-kernel-function (&optional (kind 'worker))
  "Function type for kernels that drive KIND workers: three required
arguments (a KIND followed by two of any type) and no return values.
KIND defaults to WORKER."
  (let ((parameters (list kind 't 't)))
    (list 'function parameters '(values))))
58
(deftype channel-kernel-function (&optional (kind 'channel))
  "Function type for kernels that drive KIND channels: a single required
argument of type KIND and no return values.  KIND defaults to CHANNEL."
  (let ((parameters (list kind)))
    (list 'function parameters '(values))))
61
(deftype pool-kernel-function (&optional (kind 'thread-pool))
  "Function type for kernels that drive KIND thread-pools: a KIND, a
SCHEDULER, two further required arguments of any type, then any number of
additional arguments; no return values.  KIND defaults to THREAD-POOL."
  ;; &REST in a FUNCTION type specifier takes a type, not a parameter name;
  ;; the previous `&rest args' named a type called ARGS.
  `(function (,kind scheduler t t &rest t) (values)))
66
(defvar *default-special-bindings* nil
67
"This variable holds an alist associating special variable symbols
68
to forms to evaluate. Special variables named in this list will
69
be locally bound in the new thread before it begins executing user code.
71
This variable may be rebound around calls to MAKE-THREAD to
72
add/alter default bindings. The effect of mutating this list is
73
undefined, but earlier forms take precedence over later forms for
74
the same symbol, so defaults may be overridden by consing to the
77
(defvar *worker-class* 'worker
78
"The default WORKER class used to initialize THREAD-POOLs.")
80
"The current WORKER or nil.")
81
(defvar *work-priority* :default
82
"The default priority assigned to new work.")
83
(defvar *scheduler-class* 'biased-scheduler
84
"The default class of the scheduler used in THREAD-POOLs.")
85
(defvar *thread-pool* nil
86
"The current THREAD-POOL or nil.")
87
;; on core-i7 3.4ghz, a single spin takes ~ 2.5 microseconds.
88
(defvar *default-spin-count* 2000
89
"Default value of the 'spin-count' argument to MAKE-THREAD-POOL.")
91
(defvar *debug-threads-p* t
92
"When non-nil the debugger is invoked when an error goes unhandled in a
95
(defvar *lisp-exiting-p* nil
96
"True if the Lisp process is exiting - used for skipping auto-replacement of
97
killed workers during shutdown.")
101
"An empty pool kernel."
102
(declare (ignore state)))
104
(defvar *pool-kernel* (make-kernel #'%pool)
105
"A function which drives THREAD-POOLs.")
109
"An empty worker kernel."
110
(let ((work (or work (pop-spin-queue (work *worker*)))))
112
(function (funcall work))
113
(cons (apply (car work) (cdr work)))
117
"The current thread's kernel, or nil.")
119
(defvar *worker-kernel* (make-kernel #'%work)
120
"A kernel which drives WORKERs.")
123
;; Global (never dynamically rebound) registries shared by all threads.
(sb-ext:defglobal *worker-threads* nil
  "List of worker threads.")
(sb-ext:defglobal *super-threads* nil
  "List of threads with supervisor privileges.")
(sb-ext:defglobal *oracle-table* (make-hash-table)
  "Hashtable containing (ID . ORACLE-SCOPE).")
;; NOTE(review): MAKE-HASH-TABLE defaults to an EQL test, so string names
;; would not be found again by an equal-but-distinct string -- confirm that
;; pool names are symbols or otherwise EQL-comparable.
(sb-ext:defglobal *thread-pool-table* (make-hash-table)
  "Hashtable containing (NAME . THREAD-POOL).")
133
(defvar *error-workers* nil
134
"Track debugger popups in order to kill them.")
136
(defvar *error-workers-lock* (make-mutex :name "error workers")
137
"Lock for *ERROR-WORKERS*.")
139
(defun invoke-transfer-error (error)
  "Invoke the TRANSFER-ERROR restart, passing ERROR as its only argument."
  (let ((restart-name 'transfer-error))
    (invoke-restart restart-name error)))
143
(defun transfer-error-report (stream)
  "Write the human-readable description of the TRANSFER-ERROR restart to STREAM."
  (let ((description "Transfer this error to a dependent thread, if one exists."))
    (format stream description)))
146
(defun condition-handler (condition)
147
"Mimic the CL handling mechanism, calling handlers until one assumes
149
(loop for ((condition-type . handler) . rest) on *handlers*
150
do (when (typep condition condition-type)
151
(let ((*handlers* rest))
152
(handler-bind ((condition #'condition-handler))
153
(funcall handler condition)))))
154
(when (typep condition 'error)
155
(invoke-transfer-error condition)))
157
(defconstant +work-tag+ 'my-work)
159
(defvar *debugger-error* nil
160
"Track the error inside the debugger for the `transfer-error' restart.")
162
(defvar *handler-active-p* nil
163
"Non-nil when handlers have been established via `call-with-work-handler'.")
165
(defun unwrap-result (result)
166
"In `receive-result', this is called on the stored work result. The
167
user receives the return value of this function."
170
;; A `wrapped-error' signals an error upon being unwrapped.
171
(error (wrapped-condition-value result)))
173
;; Most objects unwrap to themselves.
176
(defmacro work-handler-bind (clauses &body body)
177
"Like `handler-bind' but handles conditions signaled inside work
178
that was created in `body'."
179
(let ((forms (loop for clause in clauses
180
for (name fn . more) = clause
181
do (unless (and name (symbolp name) fn (not more))
182
(error "Ill-formed binding in `work-handler-bind': ~a"
184
collect `(cons ',name ,fn))))
185
`(let ((*handlers* (list* ,@forms *handlers*)))
188
(defun transfer-error-restart (&optional (err *debugger-error*))
190
(throw '#.+work-tag+ (wrap-error err))))
192
(defun call-with-tracked-error (condition body-fn)
194
(with-mutex (*error-workers-lock*)
195
(push *worker* *error-workers*)))
197
(let ((*debugger-error* condition))
200
(with-mutex (*error-workers-lock*)
201
(setf *error-workers*
202
(delete *worker* *error-workers*))))))
204
(defmacro with-tracked-error (condition &body body)
  "Wrap BODY in a thunk and hand it, together with CONDITION, to
`call-with-tracked-error'."
  (let ((thunk `(lambda () ,@body)))
    `(call-with-tracked-error ,condition ,thunk)))
207
(defun make-debugger-hook ()
208
"Record `*debugger-error*' for the `transfer-error' restart."
210
(let ((previous-hook *debugger-hook*))
211
(lambda (condition self)
212
(with-tracked-error condition
213
(funcall previous-hook condition self))))
214
(lambda (condition self)
215
(declare (ignore self))
216
(with-tracked-error condition
217
(invoke-debugger condition)))))
219
(defmacro with-work-context (&body body)
  "Evaluate BODY inside a CATCH whose tag is the value of +WORK-TAG+, so
that a THROW to that tag from within BODY is caught here."
  (list* 'catch '+work-tag+ body))
223
(defun %call-with-work-handler (fn)
224
"Call FN with worker conditions handled."
225
(declare (function fn))
226
(let ((*handler-active-p* t)
227
(*debugger-hook* (make-debugger-hook)))
228
(handler-bind ((condition #'condition-handler))
229
(restart-bind ((transfer-error #'transfer-error-restart
230
:report-function #'transfer-error-report))
233
(defun call-with-work-handler (fn)
234
"Call FN in a worker context with conditions handled."
235
(declare (function fn))
237
(if *handler-active-p*
239
(%call-with-work-handler fn))))
241
(define-condition worker-killed-error (error) ()
243
"The worker was killed.")
245
"Error signaled when attempting to obtain a result from a killed worker."))
247
(define-condition kernel-init-error (error) ()
249
"The kernel failed to initialize.")
251
"Error signaled when a kernel object fails to initialize."))
253
(define-condition no-kernel-error (error) ()
254
(:report "invalid *KERNEL*")
256
"Error signaled when a kernel object is invalid."))
258
(define-condition no-thread-pool-error () ()
260
"invalid *THREAD-POOL*")
262
"Error signaled when a kernel object is invalid."))
265
(defmacro mod-inc (k n)
  "Expand to (K + 1) mod N, with K, N, the incremented value and the
result all declared to be of type ARRAY-INDEX."
  (flet ((as-index (form)
           ;; Wrap FORM in a (THE ARRAY-INDEX ...) declaration.
           (list 'the 'array-index form)))
    (as-index
     (list 'mod
           (as-index (list '1+ (as-index k)))
           (as-index n)))))
269
(defmacro mod-dec (k n)
  "Expand to (K - 1) mod N.  K, N and the result are declared ARRAY-INDEX;
the decremented intermediate is only declared FIXNUM."
  (flet ((as-index (form)
           ;; Wrap FORM in a (THE ARRAY-INDEX ...) declaration.
           (list 'the 'array-index form)))
    ;; The intermediate K-1 is declared FIXNUM rather than ARRAY-INDEX --
    ;; presumably because it is -1 when K is 0; confirm ARRAY-INDEX is
    ;; a non-negative type.
    (let ((decremented (list 'the 'fixnum (list '1- (as-index k)))))
      (as-index (list 'mod decremented (as-index n))))))
273
;; Previously written as `(setf ,place (mod-inc ,place ,n)), which evaluates
;; the subforms of PLACE twice (once to read, once to write).
;; DEFINE-MODIFY-MACRO goes through the setf expansion of PLACE, so each
;; subform is evaluated exactly once.  MOD-INC already declares its result
;; to be an ARRAY-INDEX, so no outer THE wrapper is needed.
(define-modify-macro mod-incf (n) mod-inc
  "Set PLACE to (PLACE + 1) mod N and return the new value.")
276
;; Previously written as `(setf ,place (mod-dec ,place ,n)), which evaluates
;; the subforms of PLACE twice (once to read, once to write).
;; DEFINE-MODIFY-MACRO goes through the setf expansion of PLACE, so each
;; subform is evaluated exactly once.  MOD-DEC already declares its result
;; to be an ARRAY-INDEX, so no outer THE wrapper is needed.
(define-modify-macro mod-decf (n) mod-dec
  "Set PLACE to (PLACE - 1) mod N and return the new value.")
279
(defun thread-support-p ()
  "Return non-nil when the :THREAD-SUPPORT feature is present in *FEATURES*.
The value is the tail of *FEATURES* starting at that keyword, or NIL."
  ;; NOTE(review): something else must push :THREAD-SUPPORT onto *FEATURES*
  ;; for this to be true -- confirm where that happens.
  (let ((feature :thread-support))
    (member feature *features*)))
284
(defun print-top-level (msg)
285
"Print MSG to the top-level *STANDARD-OUTPUT*."
286
(let ((*standard-output* *standard-output*))
287
(sb-thread:make-thread
289
(format *standard-output* "~A" msg)))
292
(defun println-top-level (msg)
293
"Print MSG to the top-level *STANDARD-OUTPUT* followed by a newline."
294
(let ((*standard-output* *standard-output*))
295
(sb-thread:make-thread
297
(format *standard-output* "~A~%" msg)))
300
(defun find-thread-by-id (id)
  "Return the thread whose THREAD-OS-TID is ID, or NIL if no thread in
the list returned by LIST-ALL-THREADS matches.  ID is expected to be an
integer (u64)."
  ;; Compare with EQL rather than =.  EQL gives the same answer for integer
  ;; ids but does not signal a type error when a key is not a number.
  ;; NOTE(review): presumably THREAD-OS-TID can yield NIL for a thread that
  ;; has no live OS thread -- confirm against the SBCL sources.
  (find id (sb-thread::list-all-threads) :test 'eql :key 'thread-os-tid))
304
(defun find-thread (name)
  "Return the first thread from LIST-ALL-THREADS whose THREAD-NAME is EQUAL
to NAME, or NIL when there is none."
  (flet ((named-p (candidate)
           (equal name (thread-name candidate))))
    (find-if #'named-p (sb-thread::list-all-threads))))
308
(defun thread-key-list ()
  "Run AVLTREE-FILTER with AVLNODE-KEY over SB-THREAD's *ALL-THREADS* tree
and return its result (the keys associated with the threads)."
  (let ((all-threads sb-thread::*all-threads*))
    (sb-thread::avltree-filter #'sb-thread::avlnode-key all-threads)))
312
(defun thread-id-list ()
  "Run AVLTREE-FILTER over SB-THREAD's *ALL-THREADS* tree, mapping each node
to the THREAD-OS-TID of its AVLNODE-DATA, and return the result."
  (flet ((node-tid (node)
           (thread-os-tid (sb-thread::avlnode-data node))))
    (sb-thread::avltree-filter #'node-tid sb-thread::*all-threads*)))
316
(defun thread-count ()
  "Return AVL-COUNT of SB-THREAD's *ALL-THREADS* tree, i.e. the current
number of threads recorded there."
  (let ((all-threads sb-thread::*all-threads*))
    (sb-thread::avl-count all-threads)))
320
(defun make-threads (n thunk &key (name "thread"))
321
"Make N number of threads which each eval THUNK."
322
(declare (type fixnum n))
324
collect (make-thread thunk :name (format nil "~A-~D" name i))))
326
(defun make-ephemeral-thread (name)
  "Make a new 'ephemeral' thread called NAME.
The thread object is built with SB-THREAD::%MAKE-THREAD, which receives
NAME, T and a fresh semaphore that is also named NAME."
  (let ((semaphore (make-semaphore :name name)))
    (sb-thread::%make-thread name t semaphore)))
330
(defgeneric designate-oracle (host guest)
331
(:documentation "Designate an oracle GUEST for HOST."))
332
(defgeneric assign-supervisor (worker supervisor)
333
(:documentation "Assign a SUPERVISOR for WORKER."))
336
(defmacro with-thread ((&key bindings name) &body body)
  "Run BODY in a new thread created by MAKE-THREAD.
BINDINGS is passed to `with-default-special-bindings' around the
MAKE-THREAD call.  When NAME is supplied it is passed as the thread's
:NAME; otherwise no :NAME argument is given."
  (let ((name-arguments (and name (list :name name))))
    `(with-default-special-bindings ,bindings
       (make-thread (lambda () ,@body) ,@name-arguments))))
342
(declaim (inline parse-lambda-list-names))
343
(defun parse-lambda-list-names (ll)
344
(multiple-value-bind (idx _ args) (sb-int:parse-lambda-list ll)
345
(declare (ignore idx _))
352
(defmacro with-threads ((i n &key return bindings args name) &body body)
  "Spawn one thread per iteration of a DOTIMES loop in which the variable I
counts from 0 below N.  Each thread runs a LAMBDA whose lambda list is ARGS
and whose body is BODY.

Keywords:
  RETURN   -- when supplied, used as the result form of the DOTIMES.
  BINDINGS -- passed to `with-default-special-bindings' around the loop.
  ARGS     -- lambda list of the thread function.
  NAME     -- when supplied, each thread is named (symbolicate NAME I)."
  ;; NOTE(review): nothing passes arguments to the thread function, so a
  ;; non-empty ARGS with required parameters looks unusable -- confirm.
  ;; NOTE(review): BODY closes over the DOTIMES variable; whether every
  ;; thread observes its own value of I depends on how DOTIMES binds it.
  `(with-default-special-bindings ,bindings
     (dotimes (,i ,n ,@(when return (list return)))
       (make-thread (lambda (,@args) ,@body)
                    ;; The loop variable must be unquoted.  A literal `i'
                    ;; here referred to the symbol I of this macro's own
                    ;; package instead of the caller's variable.
                    ,@(when name `(:name (symbolicate ,name ,i)))))))
360
(defun finish-threads (&rest threads)
361
"Finish THREADS, attempting to join them, else calling TERMINATE-THREAD."
362
(let ((threads (flatten threads)))
364
(mapc #'join-thread threads)
365
(dolist (thread threads)
366
(when (thread-alive-p thread)
367
(terminate-thread thread))))))
369
(defun timed-join-thread (thread timeout)
370
"Join THREAD waiting at most TIMEOUT seconds."
371
(declare (type thread thread) (type float timeout))
372
(handler-case (sb-sys:with-deadline (:seconds timeout)
373
(join-thread thread :default :aborted))
378
"Attempt to join the current thread, causing it to hang. You should never call this."
379
(join-thread *current-thread*))
381
(defun kill-thread (thread)
382
"Kill THREAD, ignoring all errors which may occur."
383
(when (thread-alive-p thread)
385
(terminate-thread thread))))
387
;; (sb-vm::primitive-object-slots (sb-vm::primitive-object 'sb-vm::thread))
388
;; (defun init-session (&optional (thread *current-thread*)) (sb-thread::new-session thread))
390
;; (sb-thread::with-progressive-timeout (timet :seconds 4) (dotimes (i 4000) (print (timet))))
392
;; (describe sb-thread::*session*)
394
;; make-listener-thread
396
;; with-progressive-timeout
398
;; (definline all-threads-sap ()
399
;; (sb-vm::extern-alien "all_threads" sb-vm::system-area-pointer))
402
(defun dump-thread ()
403
"Dump the contents of THREAD."
404
(let* ((slots (sb-vm::primitive-object-slots #1=(sb-vm::primitive-object 'sb-vm::thread)))
405
(sap (current-thread-sap))
406
(thread-obj-len (sb-vm::primitive-object-length #1#))
407
(names (make-array thread-obj-len :initial-element "")))
408
(loop for slot across slots
410
(setf (aref names (sb-vm::slot-offset slot)) (sb-vm::slot-name slot)))
411
(flet ((safely-read (sap offset &aux (bits (sb-vm::sap-ref-word sap offset)))
412
(cond ((eql bits sb-vm:no-tls-value-marker) :no-tls-value)
413
((eql (logand bits sb-vm:widetag-mask) sb-vm:unbound-marker-widetag) :unbound)
414
(t (sb-vm::sap-ref-lispobj sap offset))))
416
(declare (type fixnum sym))
417
(let ((*print-right-margin* 128)
419
(format t " ~3d ~30a : ~s~%"
420
#+sb-thread (ash sym (- sb-vm:word-shift))
422
#+sb-thread (sb-vm:symbol-from-tls-index sym)
425
(format t "~&TLS: (base=~x)~%" (sb-vm::sap-int sap))
426
(loop for tlsindex from sb-vm:n-word-bytes below
427
#+sb-thread (ash sb-vm::*free-tls-index* sb-vm:n-fixnum-tag-bits)
428
#-sb-thread (ash thread-obj-len sb-vm:word-shift)
429
by sb-vm:n-word-bytes
431
(unless (<= sb-vm::thread-allocator-histogram-slot
432
(ash tlsindex (- sb-vm:word-shift))
433
(1- sb-vm::thread-lisp-thread-slot))
434
(let ((thread-slot-name
435
(if (< tlsindex (ash thread-obj-len sb-vm:word-shift))
436
(aref names (ash tlsindex (- sb-vm:word-shift))))))
437
(if (and thread-slot-name (sb-vm::neq thread-slot-name 'sb-vm::lisp-thread))
438
(format t " ~3d ~30a : #x~x~%" (ash tlsindex (- sb-vm:word-shift))
439
thread-slot-name (sb-vm::sap-ref-word sap tlsindex))
440
(let ((val (safely-read sap tlsindex)))
441
(unless (eq val :no-tls-value)
442
(show tlsindex val)))))))
443
(let ((from (sb-vm::descriptor-sap sb-vm:*binding-stack-start*))
444
(to (sb-vm::binding-stack-pointer-sap)))
445
(format t "~%Binding stack: (depth ~d)~%"
446
(/ (sb-vm::sap- to from) (* sb-vm:binding-size sb-vm:n-word-bytes)))
448
(when (sb-vm::sap>= from to) (return))
449
(let ((val (safely-read from 0))
450
(sym #+sb-thread (sb-vm::sap-ref-word from sb-vm:n-word-bytes) ; a TLS index
451
#-sb-thread (sb-vm::sap-ref-lispobj from sb-vm:n-word-bytes)))
453
(setq from (sb-vm::sap+ from (* sb-vm:binding-size sb-vm:n-word-bytes))))))))
455
(definline wait-for-threads (threads)
  "Join every thread in the sequence THREADS and return the join results as
a list.  A thread that cannot be joined normally contributes NIL, the
:DEFAULT passed to JOIN-THREAD."
  (flet ((join-or-nil (thread)
           (sb-thread:join-thread thread :default nil)))
    (map 'list #'join-or-nil threads)))
458
(defun process-all-interrupts (&optional (thread sb-thread:*current-thread*))
  "Block in SB-EXT:WAIT-FOR until THREAD's list of pending interruptions is
empty.  THREAD defaults to the current thread."
  (sb-ext:wait-for
   (not (sb-thread::thread-interruptions thread))))
462
;; BORDEAUX-THREADS version
463
(defun condition-wait* (cvar lock &key timeout)
464
(let ((success (condition-wait cvar lock :timeout timeout)))
469
(sb-ext:defglobal .known-threads-lock. (make-mutex :name "known-threads-lock"))
470
(sb-ext:defglobal .known-threads. (make-hash-table :weakness :key))
472
(defun %get-thread-wrapper (native-thread)
473
(multiple-value-bind (thread presentp)
474
(with-mutex (.known-threads-lock.)
475
(gethash native-thread .known-threads.))
478
(error "Thread wrapper is supposed to exist for ~S"
481
(defun (setf thread-wrapper) (thread native-thread)
  "Record THREAD as the wrapper of NATIVE-THREAD in the known-threads table
while holding the known-threads lock.  Returns THREAD."
  (with-mutex (.known-threads-lock.)
    (let ((table .known-threads.))
      (setf (gethash native-thread table) thread))))
485
(defun remove-thread-wrapper (native-thread)
  "Drop the known-threads table entry for NATIVE-THREAD while holding the
known-threads lock.  Returns what REMHASH returns."
  (with-mutex (.known-threads-lock.)
    (let ((table .known-threads.))
      (remhash native-thread table))))
489
;; Forms are evaluated in the new thread or in the calling thread?
492
((defbindings (name docstring &body initforms)
493
(check-type docstring string)
494
`(std/macs:define-constant ,name
496
,@(loop for (special form) in initforms
497
collect `(cons ',special ',form)))
499
:documentation ,docstring)))
500
(defbindings +standard-io-bindings+
501
"Standard bindings of printer/reader control variables as per
502
CL:WITH-STANDARD-IO-SYNTAX. Forms are evaluated in the calling thread."
503
(*package* (find-package :common-lisp-user))
506
(*print-case* :upcase)
513
(*print-miser-width* nil)
514
(*print-pprint-dispatch* (copy-pprint-dispatch nil))
518
(*print-right-margin* nil)
519
(*random-state* (make-random-state t))
521
(*read-default-float-format* 'double-float)
523
(*read-suppress* nil)
524
(*readtable* (copy-readtable nil))))
526
(defun compute-special-bindings (bindings)
  "Return BINDINGS followed by +STANDARD-IO-BINDINGS+ with duplicate
variables removed.  Entries are compared by their CAR and the earliest
entry for a variable wins, so BINDINGS override the standard ones."
  (let ((merged (append bindings +standard-io-bindings+)))
    (remove-duplicates merged :key #'car :from-end t)))
530
(defvar *%current-thread*)
532
(defun establish-dynamic-env (thread function special-bindings trap-conditions)
533
"Return a closure that binds the symbols in SPECIAL-BINDINGS and calls
535
(let* ((bindings (compute-special-bindings special-bindings))
536
(specials (mapcar #'car bindings))
537
(values (mapcar (lambda (f) (eval (cdr f))) bindings)))
538
(std/macs:named-lambda %establish-dynamic-env-wrapper ()
539
(progv specials values
540
(with-slots (%lock %return-values %exit-condition)
542
(flet ((record-condition (c)
544
(setf %exit-condition c)))
546
(let ((*%current-thread* nil))
547
;; Wait until the thread creator has finished creating
550
(setf *%current-thread* (%get-thread-wrapper *%current-thread*)))
552
(multiple-value-list (funcall function))))
554
(setf %return-values retval))
559
(values-list (run-function))
561
(record-condition c)))
563
((condition #'record-condition))
564
(values-list (run-function)))))))))))
568
(defstruct (biased-queue (:conc-name queue-))
569
(lock (make-mutex :name "queue-lock"))
570
(cvar (make-waitqueue :name "queue-cvar"))
571
(high (make-queue) :type queue)
572
(low (make-queue) :type queue))
574
(defun push-biased-queue (obj queue)
575
(declare (biased-queue queue))
576
(push-queue obj (queue-high queue))
577
(condition-notify (queue-cvar queue))
580
(defun push-biased-queue-low (obj queue)
581
(declare (biased-queue queue))
582
(push-queue obj (queue-low queue))
583
(condition-notify (queue-cvar queue))
587
(defclass channel (funcallable-standard-object)
588
((queue :initform (make-queue) :type queue :initarg :queue :accessor channel-queue)
589
(pool :initform *thread-pool* :type kernel :initarg :kernel :accessor channel-pool))
590
(:metaclass funcallable-standard-class))
593
(defclass thread-limiter ()
594
((accept-work-p :accessor accept-work-p :type boolean :initarg :accept-work-p)
595
(limiter-lock :accessor limiter-lock :initarg :limiter-lock)
596
(limiter-count :accessor limiter-count :initarg :limiter-count :type fixnum)))
598
(defun initial-limiter-count (thread-count) (+ thread-count 1))
601
(defconstant +worker-suicide-tag+ 'worker-suicide-tag)
605
(let ((kill-count 0))
606
(with-slots (lock workers) pool
608
(sb-sequence:dosequence (worker workers)
609
(when (not (eq (worker-thread worker) *current-thread*))
610
;; (eql category (running-category worker))
611
(terminate-thread (worker-thread worker))
614
(assert (eq (worker-thread *worker*) *current-thread*))
615
;; (when (eql category (running-category *worker*))
616
(throw '#.+worker-suicide-tag+ nil))))
619
(defun kill-errors ()
621
(with-mutex (*error-workers-lock*)
622
(dolist (worker *error-workers*)
623
(if (and *worker* (eq worker *worker*))
625
;; user could possibly (though unlikely) destroy the
626
;; thread simultaneously, so ignore double-destroy error
627
(ignore-errors (terminate-thread (worker-thread worker)))))
629
(assert (eq (worker-thread *worker*) *current-thread*))
630
(throw '#.+worker-suicide-tag+ nil)))))
632
(defun kill-errors-report (stream)
  "Write the human-readable description of the KILL-ERRORS restart to STREAM."
  (let ((description "Kill errors in workers (remove debugger instances)."))
    (format stream description)))
636
(defvar *worker-restarts* '((kill-errors #'kill-errors :report-function #'kill-errors-report))
637
"A list of restarts available in the body of a WITH-WORKER-RESTARTS form."))
639
(defmacro with-worker-restarts (&body body)
640
"Eval BODY in a worker context with restarts and a catch for
641
+WORKER-SUICIDE-TAG+. See variable *WORKER-RESTARTS*."
642
`(catch +worker-suicide-tag+
643
(restart-bind ,*worker-restarts*
647
(defvar *default-worker-tx-capacity* 8)
648
(defclass worker-status ()
649
((%rx :initform (sb-concurrency:make-gate))
650
(%tx :initform (make-queue :capacity *default-worker-tx-capacity*))))
652
(defclass worker (worker-status)
653
((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
654
:accessor worker-thread
656
(kernel :initform *worker-kernel* :accessor kernel)
657
(work :accessor work :type spin-queue :initarg :work)
658
(index :reader worker-index :type array-index :initarg :index :accessor index)
659
(bind :type list :accessor worker-bind :initarg :bind :initform *default-special-bindings* :accessor bind)))
661
(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
  "After a WORKER is initialized, add its thread to the front of
*WORKER-THREADS*."
  (let ((thread (worker-thread self)))
    (setf *worker-threads* (cons thread *worker-threads*))))
664
(defun make-worker* (&key thread kernel bind index)
  "Instantiate *WORKER-CLASS*.  Only the keyword arguments that are non-nil
are forwarded to MAKE-INSTANCE, in the order :THREAD, :INDEX, :KERNEL, :BIND;
the rest fall back to the class defaults."
  (let ((initargs '()))
    ;; Built back to front so the final order matches the one documented above.
    (when bind   (setf initargs (list* :bind bind initargs)))
    (when kernel (setf initargs (list* :kernel kernel initargs)))
    (when index  (setf initargs (list* :index index initargs)))
    (when thread (setf initargs (list* :thread thread initargs)))
    (apply #'make-instance *worker-class* initargs)))
671
(defmacro with-default-special-bindings (bindings &body body)
672
`(let ((*default-special-bindings* ,bindings))
675
;; TODO 2024-10-03: pause/resume
676
(declaim (inline kill-worker join-worker start-worker run-worker))
677
(defun start-worker (worker &rest args)
  "Start WORKER's thread running WORKER's kernel on ARGS.
The call to SB-THREAD::START-THREAD happens inside
`with-default-special-bindings' applied to WORKER's own bindings."
  (with-default-special-bindings (worker-bind worker)
    (let ((thread (worker-thread worker))
          (entry (kernel worker)))
      (sb-thread::start-thread thread entry args))))
681
(defun run-worker (worker &key bind wait)
683
(setf (worker-bind worker) bind))
684
(start-worker worker)
685
(if wait (join-worker worker)
688
(defmethod run-object ((self worker) &key)
691
(defun run-with-worker (worker object &key wait)
  "Call RUN-WORKER on WORKER with OBJECT as its :BIND argument, forwarding
WAIT unchanged."
  (run-worker worker
              :wait wait
              :bind object))
694
(definline thread= (a b)
  "True when A and B are both non-nil and their THREAD-OS-TIDs are `='."
  (when (and a b)
    (= (thread-os-tid a) (thread-os-tid b))))
695
(definline worker= (a b)
698
(= (index a) (index b))
699
(thread= (worker-thread a) (worker-thread b)))))
701
(defun kill-worker (worker)
702
(declare (worker worker))
703
(let ((th (worker-thread worker)))
705
(remove th *worker-threads* :test 'thread=)
708
(defun join-worker (worker)
709
(declare (worker worker))
710
(let ((th (worker-thread worker)))
711
(remove th *worker-threads* :test 'thread=)
714
(defun send-worker-start (worker)
  "Open WORKER's %RX gate and assert that opening it returned true."
  (with-slots ((gate %rx)) worker
    (assert (sb-concurrency:open-gate gate)
            nil
            "Failed to start worker ~A" worker)))
717
(defun receive-worker-start (worker)
  "Print a start-up message for WORKER via PRINT-TOP-LEVEL, then assert
that WORKER's %RX gate is open."
  (let ((announcement (format nil "worker ~A starting...~%" (worker-index worker))))
    (print-top-level announcement))
  (with-slots ((gate %rx)) worker
    (assert (sb-concurrency:gate-open-p gate)
            nil
            "Worker hijacked? ~A" worker)))
721
(defun receive-worker-status (worker)
  "Pop one status from WORKER's %TX queue and act on it.
OK prints a confirmation through PRINT-TOP-LEVEL; ERROR signals a
`kernel-init-error'.  Any other value fails the ECASE."
  (let ((status (pop-queue (slot-value worker '%tx))))
    (ecase status
      (ok
       (let ((message (format nil "worker ~A OK.~%" (worker-index worker))))
         (print-top-level message)))
      (error
       (error 'kernel-init-error)))))
726
(defun send-worker-status (worker status)
  "Report STATUS, which must be the symbol OK or ERROR, through
PRINT-TOP-LEVEL and then push it onto WORKER's %TX queue."
  (check-type status (member ok error))
  (let ((message (format nil "worker ~A status: ~A~%" (worker-index worker) status)))
    (print-top-level message))
  (with-slots ((outbox %tx)) worker
    (push-queue status outbox)))
731
(defun notify-exit (worker)
  "Close WORKER's %RX gate."
  (with-slots ((gate %rx)) worker
    (sb-concurrency:close-gate gate)))
734
(defun wait-for-worker (worker &optional timeout)
  "Wait on WORKER's %RX gate, passing TIMEOUT to WAIT-ON-GATE, and assert
that the wait returned true."
  ;; The asserted form is left exactly as written because it is quoted in
  ;; the default assertion-failure message.
  (assert (sb-concurrency:wait-on-gate (slot-value worker '%rx) :timeout timeout)))
738
(defgeneric workers (self))
739
(defgeneric work (self))
740
(defgeneric run-thread (self thunk &key name &allow-other-keys))
742
(defun make-workers (count &key thread kernel bind (return-type 'vector))
745
(push (make-worker* :thread thread :kernel kernel :bind bind) ret))
746
(if return-type (coerce ret return-type) ret)))
749
(defclass scheduler ()
750
((workers :type simple-vector :accessor workers :initarg :workers)
751
(wait-cvar :initform (make-waitqueue :name "wait-cvar"))
752
(wait-lock :initform (make-mutex :name "wait-lock"))
753
(wait-count :initform (make-counter) :type counter)
754
(notify-count :initform 0 :type (integer 0))
755
(spin-count :type array-index :initarg :spin-count :initform *default-spin-count*)
757
(index :initform 0 :type array-index :initarg :index :accessor scheduler-index))
759
"A scheduler is responsible for finding and sequencing work to be executed by
762
(defclass biased-scheduler (scheduler)
764
:initform (make-spin-queue)
766
:accessor low-priority-work
767
:initarg :low-priority-work))
768
(:documentation "A 'biased' scheduler with an additional spin-queue for 'low priority' work."))
770
(defun make-scheduler (workers spin-count)
  "Instantiate *SCHEDULER-CLASS* with the given WORKERS and SPIN-COUNT."
  (let ((scheduler-class *scheduler-class*))
    (make-instance scheduler-class
                   :workers workers
                   :spin-count spin-count)))
773
(defmacro with-pop-success (sym queue &body body)
774
(with-gensyms (presentp)
775
`(multiple-value-bind (,sym ,presentp) (pop-spin-queue ,queue)
779
(defun push-to-random-worker (work sched)
780
(declare (scheduler sched))
781
(with-slots (workers) sched
782
(push-spin-queue work (work (svref workers (mod-decf (scheduler-index sched) (length workers))))))
785
(defmacro with-mutex-p ((mutex predicate &key (wait-p t) timeout) &body body)
786
;; intentionally eval PREDICATE twice
788
(with-mutex (,mutex :wait-p ,wait-p :timeout ,timeout)
792
(defun maybe-wake-a-worker (sched)
793
(declare (scheduler sched))
794
(with-slots (wait-lock wait-cvar wait-count notify-count) sched
795
(with-mutex-p (wait-lock (plusp (counter-value wait-count)))
797
(condition-notify wait-cvar)))
800
(defun schedule-work (sched work &optional priority)
801
(declare (scheduler sched))
803
(:low (with-slots (low-priority-work) sched (push-spin-queue work low-priority-work)))
804
(:default (push-to-random-worker work sched))
805
(t (push-to-random-worker work sched)))
806
(maybe-wake-a-worker sched)
809
(defmacro %repeat (count &body body)
811
`(let ((,left (the fixnum ,count)))
812
(declare (type fixnum ,left))
819
(defmacro do-indexes ((ivar size hindex from-hindex-p) &body body)
821
(with-gensyms (svar hivar)
822
`(let ((,ivar (the array-index ,hindex))
823
(,svar (the array-index ,size))
824
(,hivar (the array-index ,hindex)))
825
(declare (type array-index ,ivar ,svar ,hivar))
827
,(let ((next `(mod-incf ,ivar ,svar)))
829
`(progn ,@body ,next)
830
`(progn ,next ,@body)))
831
(when (= ,ivar ,hivar)
832
(return (values)))))))
834
(defmacro do-workers ((wvar workers hindex &optional from-hindex-p) &body body)
835
(with-gensyms (wsvar ivar)
836
`(let ((,wsvar ,workers))
837
(declare (simple-vector ,wsvar))
838
(do-indexes (,ivar (length (the simple-vector ,wsvar)) ,hindex ,from-hindex-p)
839
(let ((,wvar (svref (the simple-vector ,wsvar) ,ivar)))
840
(declare (worker ,wvar))
843
(defun find-work (sched w)
844
(declare (scheduler sched) (worker w))
845
(labels ((try-pop (queue)
846
(declare (type spin-queue queue))
847
(with-pop-success work queue
848
(return-from find-work work))
851
(with-slots (workers) sched
852
(do-workers (w workers (worker-index w) nil)
856
(with-slots (wait-cvar wait-lock wait-count
857
notify-count low-priority-work) sched
858
(inc-counter wait-count)
860
(with-mutex (wait-lock)
862
(try-pop low-priority-work)
863
(loop until (plusp notify-count)
864
do (condition-wait wait-cvar wait-lock)
865
finally (decf notify-count)))
866
(dec-counter wait-count)))
868
(declare (dynamic-extent #'try-pop #'try-pop-all #'maybe-sleep))
869
(with-slots (spin-count) sched
877
(defun steal-work (scheduler)
878
(declare (scheduler scheduler))
879
(with-slots (workers index low-priority-work) scheduler
880
(let ((low-priority-work low-priority-work))
881
(flet ((try-pop (work)
882
(declare (spin-queue work low-priority-work))
883
(with-pop-success w work
884
(when w ; don't steal nil, the end condition flag
885
(return-from steal-work w))
886
(push-spin-queue w low-priority-work))
888
(declare (dynamic-extent #'try-pop))
889
;; Start with the worker that has the most recently submitted
890
;; work (approximately) and advance rightward.
891
(do-workers (worker workers index t)
893
(try-pop (work worker)))
894
(try-pop low-priority-work))))
897
(defun steal-work* (pool worker)
898
(when-let ((w (steal-work (scheduler pool))))
900
(exec-with-worker w worker)
901
(exec-without-worker w))
904
(defgeneric schedule (self &key &allow-other-keys))
905
(defgeneric (setf schedule) (new self &key &allow-other-keys))
908
(defclass supervisor ()
909
((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread)
912
(:documentation "Supervisors are threads which are responsible for a set of worker threads
913
within their DOMAIN and SCOPE."))
915
(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
  "After a SUPERVISOR is initialized, add its thread to the front of
*SUPER-THREADS*."
  (let ((thread (supervisor-thread self)))
    (setf *super-threads* (cons thread *super-threads*))))
919
(defstruct (oracle (:constructor %make-oracle (id thread)))
920
"Oracles provide a tagged view into some threaded scope of work."
921
(id 0 :type (unsigned-byte 32) :read-only t)
922
(thread *current-thread* :read-only t))
924
(defun oracle-of-id (id)
  "Look ID up in *ORACLE-TABLE*.  Returns both GETHASH values: the stored
entry (or NIL) and whether an entry was present."
  (let ((table *oracle-table*))
    (gethash id table)))
927
(defun make-oracle (&optional (thread *current-thread*))
928
(let ((id (thread-os-tid thread)))
929
(if-let ((found (oracle-of-id id)))
931
(let ((orc (%make-oracle id thread)))
932
(setf (gethash id *oracle-table*) (make-array 0 :adjustable t :fill-pointer 0))
936
(defclass thread-pool-context ()
937
((bind :initarg :bind :initform *default-special-bindings* :type list :accessor bind)
938
(name :accessor name :initarg :name)))
940
(defclass thread-pool (thread-limiter thread-pool-context)
941
((kernel :initform *pool-kernel* :type kernel :accessor kernel :initarg :kernel)
942
(scheduler :initarg :scheduler :accessor scheduler)
943
(workers :initarg :workers :accessor workers :type (simple-array worker))
944
(lock :initarg :lock :initform (make-mutex :name "workers") :type mutex :accessor lock)
945
(alive :initform t :reader alive :type boolean :initarg :alive))
946
(:documentation "Thread pools are similar to LPARALLEL kernels - they encompass the scheduling
947
and execution of concurrent work using a pool of 'worker' threads."))
949
(definline register-thread-pool (name pool)
  "Store POOL under NAME in *THREAD-POOL-TABLE* and return POOL."
  (declare (thread-pool pool))
  (let ((table *thread-pool-table*))
    (setf (gethash name table) pool)))
953
(defun find-thread-pool (name)
  "Look NAME up in *THREAD-POOL-TABLE*.  Returns both GETHASH values: the
registered pool (or NIL) and whether an entry was present."
  (let ((table *thread-pool-table*))
    (gethash name table)))
955
(defmethod initialize-instance :after ((self thread-pool) &key name &allow-other-keys)
  "After a THREAD-POOL is initialized, register it under NAME when a
non-nil NAME was supplied."
  (unless (null name)
    (register-thread-pool name self)))
958
(defmacro ensure-working-p (pool)
  "Expand to a read of ACCEPT-WORK-P on POOL, which is declared to be a
THREAD-POOL, compiled with (speed 3) (safety 0)."
  (let ((typed-pool `(the thread-pool ,pool)))
    `(locally (declare (optimize (speed 3) (safety 0)))
       (accept-work-p ,typed-pool))))
962
(defun update-limiter-count* (pool delta)
963
(declare (thread-pool pool) (fixnum delta)
964
(optimize (speed 3) (safety 0)))
965
(incf (the fixnum (limiter-count pool)) delta)
966
(setf (accept-work-p pool)
967
(plusp (the fixnum (limiter-count pool))))
970
;; REVIEW 2025-04-27: may need to add more to std/spin
971
(defun update-limiter-count (pool delta)
972
(declare (thread-pool pool) (fixnum delta)
973
(optimize (speed 3) (safety 0)))
974
(with-mutex ((limiter-lock pool))
975
(update-limiter-count* pool delta))
979
(defun exec-with-worker (work worker)
  "Call WORKER's kernel with WORK as its single argument and return
whatever the kernel returns."
  (declare (worker worker))
  (let ((worker-kernel (kernel worker)))
    (funcall worker-kernel work)))
983
(defun exec-without-worker (work)
985
(call-with-work-handler (funcall *kernel* work)))
987
(defun replace-worker (pool worker &optional (kernel *worker-kernel*))
988
(with-slots (workers lock) pool
990
(let ((i (position worker workers :test #'worker=)))
992
(assert (eql i (worker-index worker)))
993
(warn "Replacing lost or dead worker")
995
(make-instance 'worker
998
:bind (worker-bind worker)
999
:work (make-spin-queue))))
1000
(setf (svref workers i) new-worker)
1001
(send-worker-start new-worker)
1002
(receive-worker-start new-worker))))))
1004
(defun worker-loop (pool worker)
1005
(declare (thread-pool pool) (worker worker))
1006
(let ((sched (scheduler pool)))
1008
(loop (let ((work (find-work sched worker)))
1010
(exec-with-worker work worker)
1012
(unless *lisp-exiting-p*
1013
(replace-worker pool worker)))))
1015
(defun call-with-worker-context (fn context pool worker)
1016
(receive-worker-start worker)
1018
(funcall (print context)
1020
(let ((*worker* (find *current-thread* (workers pool)
1021
:key #'worker-thread)))
1023
(send-worker-status worker 'ok)
1024
(with-worker-restarts
1025
(%call-with-work-handler fn)))))
1026
;; This error notification is seen when `worker-context' does not
1027
;; call its worker-loop parameter, otherwise it's ignored.
1028
(send-worker-status worker 'error)))
1030
(defun enter-worker-loop (pool worker)
1031
(call-with-worker-context
1032
(lambda () (worker-loop pool worker))
1037
(defun make-all-bindings (kernel bindings)
  "Return BINDINGS with a trailing (*KERNEL* . KERNEL) entry appended.
BINDINGS itself is not modified."
  (let ((kernel-entry (cons '*kernel* kernel)))
    (append bindings (list kernel-entry))))
1040
(defun %make-worker (index class)
  "Instantiate CLASS with :INDEX set to INDEX and :THREAD set to NIL."
  (make-instance class
                 :thread nil
                 :index index))
1043
(defun make-worker-thread (pool worker &optional bind)
  "Run ENTER-WORKER-LOOP for WORKER of POOL inside WITH-THREAD.
The thread bindings are BIND when non-NIL, otherwise (WORKER-BIND WORKER).
NOTIFY-EXIT is called on WORKER whenever the loop unwinds, normally or not.
Returns the value of the WITH-THREAD form."
  (with-thread (:bindings (if bind bind (worker-bind worker)))
    (unwind-protect
         (enter-worker-loop pool worker)
      ;; Cleanup form: runs on every exit path from the worker loop.
      (notify-exit worker))))
1048
(defun make-worker (pool index &optional work (class *worker-class*))
1049
(let* ((worker (%make-worker index class))
1050
(bind (make-all-bindings *worker-kernel* (bind pool)))
1051
(worker-thread (make-worker-thread pool worker bind)))
1052
(setf (worker-thread worker) worker-thread
1053
(worker-bind worker) bind)
1054
(when work (setf (work worker) work))
1057
(defmacro with-fill-workers-handler (workers &body body)
1058
`(unwind-protect (progn ,@body)
1061
(when (typep w 'worker)
1062
(terminate-thread (worker-thread w))))
1065
(defun %fill-workers (workers pool)
  "Fill every cell of the array WORKERS with a new worker for POOL.
Cell I receives (MAKE-WORKER POOL I <fresh spin queue>). Returns NIL."
  (loop for index from 0 below (length workers)
        do (let ((queue (make-spin-queue)))
             (setf (aref workers index)
                   (make-worker pool index queue)))))
1069
(defun fill-workers (workers pool)
  "Populate WORKERS for POOL, then send each worker its start signal.
Both steps run inside WITH-FILL-WORKERS-HANDLER."
  (with-fill-workers-handler workers
    ;; Step 1: create one worker per cell.
    (%fill-workers workers pool)
    ;; Step 2: signal every worker to start; MAP with a NIL result type returns NIL.
    (map nil #'send-worker-start workers)))
1074
;; (map nil #'receive-worker-start workers)))
1075
;; (map nil #'receive-worker-start workers)))
1077
(defun make-thread-pool (worker-count &key (name :default)
1078
(bind `((*standard-output* . ,*standard-output*)
1079
(*error-output* . ,*error-output*)))
1080
(worker-kernel *worker-kernel*)
1081
(spin-count *default-spin-count*)
1083
(kernel *pool-kernel*)
1084
(class 'thread-pool))
1085
"Create a THREAD-POOL with WORKER-COUNT number of available worker threads.
1087
NAME when non-nil is an EQL-unique identifier associated with the thread-pool
1088
in *THREAD-POOL-TABLE*.
1090
BIND is an alist for establishing thread-local dynamic bindings inside worker
1093
WORKER-KERNEL which begins the worker loop and returns when the worker exits.
1095
KERNEL is a function which drives the THREAD-POOL.
1097
CLASS is the designated class of the returned THREAD-POOL object.
1099
SPIN-COUNT is the number of work-searching iterations done by the worker
1100
before going to sleep."
1101
(check-type worker-count positive-fixnum)
1102
(check-type spin-count array-index)
1103
(let ((*worker-kernel* worker-kernel)
1104
(*pool-kernel* kernel))
1105
(let* ((workers (make-array worker-count))
1106
(pool (make-instance class
1109
:kernel *pool-kernel*
1110
:accept-work-p alive
1113
:scheduler (make-scheduler workers spin-count)
1114
:limiter-count (initial-limiter-count worker-count)
1115
:limiter-lock (make-spin-lock))))
1116
(fill-workers workers pool)
1119
(defun check-kernel ()
1120
"Check the current value of *KERNEL*, ensuring it is bound appropriately
1121
according to the current thread (worker, pool, super). STORE-VALUE
1122
restarts is provided. *KERNEL* is returned."
1124
(or (and *worker* (eql *kernel* *worker-kernel*))
1125
(and (not *worker*) (not *kernel*))
1126
(and (eql *pool-kernel* *kernel*))
1127
(restart-case (error 'no-kernel-error)
1128
(store-value (value)
1129
:report "Assign a value to *KERNEL*."
1130
:interactive (lambda () (print "Value: ") (read t ))
1131
(check-type value kernel)
1132
(setf *kernel* value))))
1135
(defun check-thread-pool ()
1136
"Check the current value of *THREAD-POOL*, ensuring it is bound to a
1137
THREAD-POOL object. STORE-VALUE and MAKE-THREAD-POOL restarts are
1138
provided. *THREAD-POOL* is returned."
1140
(restart-case (error 'no-thread-pool-error)
1141
(make-thread-pool (worker-count)
1142
:report "Make a thread-pool now, prompting for number of workers."
1143
:interactive (lambda () (princ "Worker count: ") (list (read)))
1144
(setf *thread-pool* (make-thread-pool worker-count)))
1145
(store-value (value)
1146
:report "Assigne a value to *THREAD-POOL*."
1147
:interactive (lambda () (print "Value for *THREAD-POOL*: ") (read))
1148
(check-type value thread-pool)
1149
(setf *thread-pool* value)))))
1151
(defun worker-count (pool)
  "Return the number of entries in the WORKERS sequence of POOL."
  (let ((workers (workers pool)))
    (length workers)))
1155
(defun worker-count* ()
  "Return the worker count of the pool currently bound to *THREAD-POOL*."
  (let ((pool *thread-pool*))
    (worker-count pool)))
1159
(defun worker-index* ()
  "Return the WORKER-INDEX of *WORKER*, or NIL when *WORKER* is NIL."
  (let ((current *worker*))
    (and current
         (worker-index current))))
1164
(defun workers* ()
  "Return the WORKERS of the pool currently bound to *THREAD-POOL*."
  (let ((pool *thread-pool*))
    (workers pool)))
1166
(defun scheduler* ()
  "Return the SCHEDULER of the pool currently bound to *THREAD-POOL*."
  (let ((pool *thread-pool*))
    (scheduler pool)))
1168
(defmacro work-lambda (&body body)
1169
"Generate a 'work-lambda' with BODY. *HANDLERS* will be bound for the duration
1170
of the returned lambda."
1171
(with-gensyms (body-fn handlers)
1172
`(flet ((,body-fn () ,@body))
1173
(declare (optimize (speed 3) (safety 0)))
1174
(let ((,handlers *handlers*))
1177
(let ((*handlers* ,handlers))
1182
(defmacro pool-lambda (state &body body)
1183
"Generate a 'pool-lambda' with provided BODY. *THREAD-POOL* and *HANDLERS* are
1184
bound for the duration of the returned lambda and STATE is the name of the
1185
single required argument of the lambda. The lambda should run all code
1186
assigned to the input state and then return two values."
1187
(with-gensyms (body-fn handlers pool)
1188
`(labels ((,body-fn (,state) ,@body))
1189
(declare (optimize (speed 3) (safety 0)))
1190
(let ((,handlers *handlers*)
1191
(,pool *thread-pool*))
1194
(let ((*handlers* ,handlers)
1195
(*thread-pool* ,pool))
1198
(let ((*thread-pool* ,pool))
1199
(,body-fn ,state))))))))
1201
;; (defmacro super-lambda (&body body))
1203
;; (defmacro channel-lambda (ch &body body))
1205
(defun make-channeled-work (channel fn args)
1206
(declare (channel channel) (function fn) (list args))
1207
(let ((queue (channel-queue channel)))
1209
(unwind-protect (push-queue (with-work-context (apply fn args)) queue)
1210
(push-queue (wrap-error 'worker-killed-error) queue)))))
1213
(defun submit-raw-work (work pool &optional (priority *work-priority*))
  "Hand WORK to the scheduler of POOL at PRIORITY.
PRIORITY defaults to *WORK-PRIORITY*. Signals an error when POOL is no
longer alive; otherwise returns the value of SCHEDULE-WORK."
  (if (alive pool)
      (schedule-work (scheduler pool) work priority)
      (error "attempted to submit work to a dead thread-pool")))
1218
(defun submit-work (ch fn &rest args)
1219
(check-type ch channel)
1221
(make-channeled-work ch
1222
(std/curry:ensure-function fn)
1226
(defun receive-result (channel)
  "Pop the next result off the queue of CHANNEL and return it unwrapped.
Blocks until a result is available when the queue is empty."
  (let* ((queue (channel-queue channel))
         (wrapped (pop-queue queue)))
    (unwrap-result wrapped)))
1231
(defun try-receive-result (channel &key timeout)
1232
"Attempt to remove a result from CHANNEL and return (values RESULT t).
1234
By default if the channel is empty return (values nil nil)
1235
immediately. TIMEOUT, if non-nil is the number of seconds to wait for a result
1236
to appear on the queue."
1237
(multiple-value-bind (result presentp)
1238
(try-pop-queue (channel-queue channel) :timeout timeout)
1240
(values (unwrap-result result) t)
1243
(defmacro! do-fast-receives ((ret o!ch o!n) &body body)
1244
"Receive N results from channel CH, executing BODY each iteration with results
1246
`(loop for i below ,g!n
1247
do (let ((,ret (receive-result ,g!ch)))
1250
(defun steal-until-receive-result (channel worker fn)
1252
(multiple-value-bind (result presentp) (try-receive-result channel)
1255
(locally (declare (type function fn))
1256
(funcall fn result)))
1258
(steal-work* (channel-pool channel) worker)))
1260
(defun receive-results (channel count fn)
1261
(let ((worker *worker*))
1264
(steal-until-receive-result channel worker fn))
1266
(do-fast-receives (result channel count)
1267
(locally (declare (type function fn))
1268
(funcall fn result)))
1269
(do-fast-receives (result channel count)
1270
(declare (ignore result)))))))
1272
(defmacro with-submit-counted (&body body)
1273
(with-gensyms (count channel)
1275
(,channel (make-channel)))
1276
(declare (fixnum ,count))
1277
(flet ((submit-counted (&rest args)
1278
(declare (dynamic-extent args))
1279
(apply #'submit-work ,channel args)
1282
(receive-results ,channel ,count nil)))
1283
(declare (inline submit-counted receive-counted))
1286
(defun shutdown-channel (channel pool)
1287
(let ((*work-priority* :low))
1288
(submit-work channel (lambda ())))
1289
(receive-result channel)
1290
(with-slots (scheduler workers alive) pool
1291
(loop for i below (length workers)
1292
do (schedule-work scheduler nil :low))
1293
(map nil #'wait-for-worker workers)
1296
(defun end-thread-pool (&key wait)
1297
(when-let ((pool *thread-pool*))
1298
(let ((name (when (slot-boundp pool 'name) (name pool))))
1299
(when name (remhash name *thread-pool-table*))
1300
(setf *thread-pool* nil
1301
*worker-threads* (flatten *worker-threads*))
1303
(let ((channel (let ((*thread-pool* pool)) (make-instance 'channel)))
1304
(threads (map 'list #'worker-thread (workers pool))))
1306
(shutdown-channel channel pool)
1309
(cons (with-thread (:name (format nil "%shutdown-~A" (or name "thread-pool")))
1310
(shutdown-channel channel pool))
1313
(defun thread-pool-info (pool)
1314
(list :workers (worker-count pool)
1316
:spin-count (slot-value (scheduler pool) 'spin-count)
1317
:limiter-count (limiter-count pool)))
1319
(defmethod print-object ((pool thread-pool) stream)
  ;; Prints the pool unreadably as: TYPE name key value ... {identity}.
  ;; The NAME slot has no initform, so it can be unbound; reading it
  ;; unguarded would signal UNBOUND-SLOT from inside the printer. An unbound
  ;; name is printed as NIL instead. Output for named pools is unchanged.
  (print-unreadable-object (pool stream :type t :identity t)
    (let ((name (when (slot-boundp pool 'name)
                  (name pool))))
      (format stream "~(~s ~^~)~{~s~^ ~}" name (thread-pool-info pool)))))
1323
(defun broadcast-work (function &rest args)
1324
"Wait for current and pending work to complete, if any, then
1325
simultaneously execute the given work inside each worker. Wait until
1326
this work is complete, then return the results in a vector.
1328
Calling `broadcast-work' from inside a worker is an error."
1330
(error "Cannot call `broadcast-work' from inside a worker."))
1331
(let* ((function (ensure-function function))
1332
(*thread-pool* (check-thread-pool))
1333
(worker-count (worker-count*))
1334
(channel (make-instance 'channel))
1335
(from-workers (make-semaphore :name "from-workers"))
1336
(to-workers (make-semaphore :name "to-workers")))
1337
(loop repeat worker-count
1338
do (submit-work channel (lambda ()
1339
(signal-semaphore from-workers)
1340
(wait-on-semaphore to-workers)
1341
(apply function args))))
1342
(loop repeat worker-count
1343
do (wait-on-semaphore from-workers))
1344
(loop repeat worker-count
1345
do (signal-semaphore to-workers))
1346
(map-into (make-array worker-count) (lambda () (receive-result channel)))))
1348
(defun %exit-threads ()
  "Exit hook: set *LISP-EXITING-P* to T and return T."
  (setq *lisp-exiting-p* t))

;; Register the hook by name; PUSHNEW keeps the entry unique across reloads.
(pushnew '%exit-threads sb-ext:*exit-hooks*)
1355
(defun indexing-wrapper (array index function args)
  "Apply FUNCTION to the list ARGS and store the result in ARRAY at INDEX.
Returns the stored value."
  (let ((result (apply function args)))
    (setf (aref array index) result)))
1358
(defmacro! with-submit-indexed (o!count o!array &body body)
1359
(with-gensyms (channel)
1360
`(let ((,channel (make-instance 'channel)))
1361
(flet ((submit-indexed (index function &rest args)
1363
,channel #'indexing-wrapper ,g!array index function args))
1365
(receive-results ,channel ,g!count nil)
1367
(declare (inline submit-indexed receive-indexed))
1370
(defmacro with-submit-cancelable (&body body)
1371
(with-gensyms (canceledp channel count)
1372
`(let ((,canceledp nil)
1374
(,channel (make-channel)))
1375
(flet ((submit-cancelable (fn &rest args)
1376
(submit-work ,channel
1382
(macrolet ((receive-cancelables (result &body body)
1384
,',channel ,',count (lambda (,result) ,@body))))
1385
(unwind-protect (progn ,@body)
1386
(setf ,canceledp t)))))))
1388
(defun call-with-temp-pool (fn &rest args)
1389
;; ensure that we end the same pool we create
1390
(let ((pool (apply #'make-thread-pool args)))
1392
(let ((*thread-pool* pool))
1394
(let ((*thread-pool* pool))
1395
(end-thread-pool :wait t)))))
1397
(defmacro with-temp-pool ((&rest make-pool-args) &body body)
  "Run BODY with a temporary thread pool.

Expands into a call to `call-with-temp-pool', passing BODY wrapped in a
zero-argument lambda followed by MAKE-POOL-ARGS, which are the arguments
given to `make-thread-pool'. `end-thread-pool' is eventually called for
the pool that was created.

**NOTE**: Use this only if you understand its implications. Because
`*thread-pool*' is unaffected outside BODY, the REPL is useless with
respect to the temporary pool; for instance, calling `kill' from the REPL
will not affect tasks running in the temporary pool.

Using `with-temp-pool' in several places of one application tends to
defeat the purpose and benefits of having a thread pool. This is a
particular risk when it appears inside a library, which is likely to be a
suboptimal situation.

Although generally a bad idea, there are a few valid uses, such as
testing, where the code is non-critical or where convenience trumps other
concerns."
  (let ((thunk `(lambda () ,@body)))
    `(call-with-temp-pool ,thunk ,@make-pool-args)))
1418
(defmacro defkernel (name-and-opts (&rest args) &body body)
  "Define a kernel function via DEFINLINE with lambda list ARGS and BODY.
NAME-AND-OPTS is either the function name or a list whose first element
is the name; any remaining list elements are currently ignored."
  (let ((name (if (consp name-and-opts)
                  (car name-and-opts)
                  name-and-opts)))
    `(definline ,name ,args ,@body)))
1425
;; From Shinmera's VERBOSE
1426
(defstruct sync-message
  "A waitqueue paired with the mutex that guards it."
  ;; Waitqueue that `with-sync-message' waits on and `msg' notifies.
  (condition (make-waitqueue))
  ;; Mutex held around the wait; a fresh one is made per instance.
  (lock (make-mutex)))
1430
(defmethod lock ((self sync-message))
  "Return the mutex stored in the LOCK slot of SELF."
  (sync-message-lock self))
1432
(defmethod msg ((vector vector) (msg sync-message))
  ;; Wake the thread waiting on MSG's waitqueue; VECTOR is only dispatched on.
  ;;
  ;; `with-sync-message' holds this mutex until it reaches its wait, so
  ;; acquiring the mutex here ensures the waiter is already waiting on the
  ;; condition (assumes CONDITION-WAIT* releases the mutex while waiting --
  ;; TODO confirm).
  ;;
  ;; The notify is issued while the mutex is still held: SBCL requires the
  ;; mutex used by the corresponding wait to be held during CONDITION-NOTIFY.
  ;; Returns the value of CONDITION-NOTIFY.
  (with-mutex ((sync-message-lock msg))
    (condition-notify (sync-message-condition msg))))
1437
(defmacro with-sync-message (s &body body)
1438
`(let ((,s (make-sync-message)))
1439
(with-mutex ((sync-message-lock ,s))
1441
(condition-wait* (sync-message-condition ,s) (sync-message-lock ,s)))))