Coverage report: /home/ellis/comp/core/std/task.lisp
Kind | Covered | All | % |
expression | 0 | 228 | 0.0 |
branch | 0 | 0 | nil |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; task.lisp --- Standard Task API
14
(defvar *task-class* 'task)
15
(defvar *task-priority* nil)
18
(define-condition task-error (thread-error) ()
19
(:report (lambda (condition stream)
20
(format stream "Unhandled task error in thread ~A"
21
(thread-error-thread condition))))
22
(:documentation "An error which occurs while processing a task."))
24
(defun task-error (thread)
25
"Signal a TASK-ERROR associated with THREAD."
26
(error 'task-error :thread thread))
29
(defmacro make-task-kernel (name args lock queue mailbox timeout &body body &environment env)
30
(declare (ignorable env))
33
(wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout))))
34
(let ((*task* (dequeue ,queue)))
36
(handler-case (setf *result* (progn ,@body))
37
(error () (task-error *current-thread*)))
38
(send-message ,mailbox *result*)
39
(release-foreground))))))
41
(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
42
"Define a task kernel.
44
The kernel should process all options and return a function - the
47
The kernel function is installed in worker threads by passing it to
48
SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments
51
Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE)
52
and *result* is bound to the return value of BODY.
54
This interface is experimental and subject to change."
55
`(make-task-kernel ,name ,args
56
,(if lock lock '(sb-thread:make-semaphore))
57
,(if queue queue '(make-queue))
58
,(if mailbox mailbox '(sb-concurrency:make-mailbox))
63
(defgeneric task (self)
64
(:documentation "Return the task associated with SELF."))
65
(defgeneric result (self)
66
(:documentation "Return the result associated with SELF."))
68
(defgeneric tasks (self)
69
(:documentation "Return the tasks associated with SELF."))
70
(defgeneric results (self)
71
(:documentation "Return the results associated with SELF."))
73
(defgeneric jobp (self)
74
(:method ((self t)) nil)
75
(:documentation "Return Non-nil if SELF is a job."))
76
(defgeneric taskp (self)
77
(:method ((self t)) nil)
78
(:documentation "Return Non-nil if SELF is a task."))
81
(defclass task-worker (worker)
82
((tasks :accessor tasks :initarg :tasks :type spin-queue))
83
(:documentation "A Worker which stores a queue of TASKS."))
86
(defclass task-pool (thread-pool)
87
((tasks :initform (if (boundp '*tasks*) *tasks*) :initarg :tasks :accessor tasks)
88
;; TODO: test weak-vector here
89
(workers :initform (make-array 0 :element-type 'task-worker :adjustable t) :type (vector worker)
90
:initarg :workers :accessor workers)
91
(results :initform (sb-concurrency:make-mailbox :name "results") :accessor results :initarg :results))
92
(:documentation "A thread-pool which maintains a dynamic list of TASKS."))
94
(defun task-pool-info (tp)
95
"Return a plist of info about task-pool TP."
97
(std/thread::thread-pool-info tp)
99
:tasks (queue-count (tasks tp))
100
:results (sb-concurrency:mailbox-count (results tp)))))
102
(defmethod print-object ((self task-pool) stream)
103
(print-unreadable-object (self stream :type t :identity t)
104
(format stream "~(~A ~^~)~{~s~^ ~}" (name self) (task-pool-info self))))
106
(defun kill-workers (pool)
107
"Call FINISH-THREADS on task-pool's workers."
108
(dotimes (i (length (workers pool)))
109
(kill-worker (vector-pop (workers pool)))))
111
(defmethod designate-oracle ((self task-pool) (guest thread))
112
(let ((id (make-oracle guest)))
113
(setf (gethash id *oracle-table*)
114
(vector-push-extend (sb-ext:make-weak-pointer self) (gethash id *oracle-table*)))))
116
(defmethod designate-oracle ((self task-pool) (guest (eql t)))
117
(designate-oracle self *current-thread*))
119
(declaim (inline push-worker push-workers pop-worker))
120
(defun push-worker (worker pool)
121
(vector-push-extend worker (workers pool)))
123
(defun push-workers (threads pool)
124
"Push a list of THREADS to POOL."
125
(with-slots (workers) pool
127
(vector-push-extend w workers))))
129
(defmethod pop-worker (pool)
130
"Pop the next worker from POOL."
131
(vector-pop (workers pool)))
133
(defun start-task-worker (pool index)
134
"Start the TASK-WORKER at INDEX of POOL."
135
;; (with-recursive-lock
136
(start-worker (aref (workers pool) index)))
138
(defun start-task-workers (pool)
139
"Start all workers in the given task POOL."
140
(loop for w across (workers pool)
141
do (start-worker w)))
145
((state :initform nil :initarg :state :accessor task-state))
146
(:documentation "This object represents a single unit of work to be done by some
147
worker. Tasks are typically distributed from the task-pool, but workers may
148
also be granted the ability to create and distribute their own tasks. Once a
149
task is assigned, the 'owner', i.e. the worker that is assigned this task, may
150
modify the object. When the work associated with a task is complete, the owner
151
is responsible for indicating in the state slot the result of the computation."))
153
(defmethod print-object ((self task) stream)
154
(print-unreadable-object (self stream :type t)
155
(format stream ":state ~A" (task-state self))))
157
(defmethod taskp ((self task)) t)
159
(defun run-task (worker task)
160
"Run TASK on WORKER."
161
(push task (tasks worker))
164
(defmethod run-object ((self task) &key worker)
165
(run-task worker self))
168
(defclass scheduled-task (task)
169
((schedule :initarg :schedule :initform (get-universal-time) :accessor task-schedule))
170
(:documentation "A task object with an associated schedule."))
172
(defmethod run-object ((self scheduled-task) &key time repeat absolute-p catch-up worker name)
173
(sb-ext:schedule-timer
174
(sb-ext:make-timer (task-state self) :thread worker :name name)
175
time :repeat-interval repeat :absolute-p absolute-p :catch-up catch-up))
179
((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
183
(lock :initform (make-mutex :name "job") :type mutex
185
(:documentation "A collection of tasks forming a single unit of work."))
187
(defgeneric jobs (self)
188
(:documentation "Return the jobs associated with SELF."))
189
(defmethod jobp ((self job)) t)
190
(defmethod taskp ((self job)) t)
192
(declaim (inline make-job))
193
(defun make-job (&rest tasks)
194
"Return a new job containing TASKS."
196
:tasks (make-array (length tasks)
198
:initial-contents tasks)))
200
(defmethod print-object ((self job) stream)
201
(print-unreadable-object (self stream :type t)
202
(format stream "~A tasks" (length (tasks self)))))
204
(defun run-job (worker job)
206
(setf (tasks worker) (coerce 'list (tasks job)))
209
(defmethod run-object ((self job) &key worker)
210
(run-job worker self))
213
(defclass work-scope ()
214
((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
218
(lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex))
219
(:documentation "A scope of work containing TASKS and a LOCK."))
221
(defmethod print-object ((self work-scope) stream)
222
(print-unreadable-object (self stream :type t)
223
(format stream "~A" (tasks self))))
225
(defun make-task-pool (worker-count &key (name :default) (kernel *kernel*)
226
(task-class *task-class*) initial-task
229
"Make a new TASK-POOL with a worker capacity of WORKER-COUNT."
230
(let ((*worker-class* 'task-worker))
231
(let ((tp (make-thread-pool
237
(%tasks (or tasks worker-count)))
238
(declare (task-pool tp))
243
:element-type task-class
244
:initial-element (or initial-task (make-instance task-class))))
245
(results tp) (sb-concurrency:make-mailbox :name "results"))
249
(defmacro with-task-pool ((sym &key (tasks (std/alien:num-cpus)) (workers (std/alien:num-cpus)) #+nil start)
251
"Eval BODY with SYM bound to a new TASK-POOL."
252
`(let ((,sym (make-task-pool ,workers :tasks ,tasks)))
253
;; ,@(when start `((start-task-workers ,sym)))