Coverage report: /home/ellis/comp/core/std/task.lisp

KindCoveredAll%
expression0228 0.0
branch00nil
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; task.lisp --- Standard Task API
2
 
3
 ;; 
4
 
5
 ;;; Code:
6
 (in-package :std/task)
7
 
8
 ;;; Vars
9
 (defvar *tasks*)
10
 (defvar *jobs*)
11
 (defvar *job*)
12
 (defvar *stage*)
13
 (defvar *task*)
14
 (defvar *task-class* 'task)
15
 (defvar *task-priority* nil)
16
 (defvar *result* nil)
17
 
18
 (define-condition task-error (thread-error) ()
19
   (:report (lambda (condition stream)
20
              (format stream "Unhandled task error in thread ~A" 
21
                      (thread-error-thread condition))))
22
   (:documentation "An error which occurs while processing a task."))
23
 
24
 (defun task-error (thread)
25
   "Signal a TASK-ERROR associated with THREAD."
26
   (error 'task-error :thread thread))
27
 
28
 ;;; Kernel
29
 (defmacro make-task-kernel (name args lock queue mailbox timeout &body body &environment env)
30
   (declare (ignorable env))
31
   `(compile ',name
32
             (lambda ,args 
33
               (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout))))
34
               (let ((*task* (dequeue ,queue)))
35
                 (unwind-protect 
36
                      (handler-case (setf *result* (progn ,@body))
37
                        (error () (task-error *current-thread*)))
38
                   (send-message ,mailbox *result*)
39
                   (release-foreground))))))
40
 
41
 (defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
42
   "Define a task kernel.
43
 
44
 The kernel should process all options and return a function - the
45
 'kernel function'.
46
 
47
 The kernel function is installed in worker threads by passing it to
48
 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments
49
 specified by ARGS.
50
 
51
 Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE)
52
 and *result* is bound to the return value of BODY.
53
 
54
 This interface is experimental and subject to change."
55
   `(make-task-kernel ,name ,args 
56
        ,(if lock lock '(sb-thread:make-semaphore))
57
        ,(if queue queue '(make-queue))
58
        ,(if mailbox mailbox '(sb-concurrency:make-mailbox))
59
        ,timeout
60
      ,@body))
61
 
62
 ;;; Proto
63
 (defgeneric task (self)
64
   (:documentation "Return the task associated with SELF."))
65
 (defgeneric result (self)
66
   (:documentation "Return the result associated with SELF."))
67
 
68
 (defgeneric tasks (self)
69
   (:documentation "Return the tasks associated with SELF."))
70
 (defgeneric results (self)
71
   (:documentation "Return the results associated with SELF."))
72
 
73
 (defgeneric jobp (self)
74
   (:method ((self t)) nil)
75
   (:documentation "Return Non-nil if SELF is a job."))
76
 (defgeneric taskp (self)
77
   (:method ((self t)) nil)
78
   (:documentation "Return Non-nil if SELF is a task."))
79
 
80
 ;;; Task Worker
81
 (defclass task-worker (worker)
82
   ((tasks :accessor tasks :initarg :tasks :type spin-queue))
83
   (:documentation "A Worker which stores a queue of TASKS."))
84
 
85
 ;;; Task Pool
86
 (defclass task-pool (thread-pool)
87
   ((tasks :initform (if (boundp '*tasks*) *tasks*) :initarg :tasks :accessor tasks)
88
    ;; TODO: test weak-vector here
89
    (workers :initform (make-array 0 :element-type 'task-worker :adjustable t) :type (vector worker)
90
             :initarg :workers :accessor workers)
91
    (results :initform (sb-concurrency:make-mailbox :name "results") :accessor results :initarg :results))
92
   (:documentation "A thread-pool which maintains a dynamic list of TASKS."))
93
 
94
 (defun task-pool-info (tp)
95
   "Return a plist of info about task-pool TP."
96
   (append
97
    (std/thread::thread-pool-info tp)
98
    (list
99
     :tasks (queue-count (tasks tp))
100
     :results (sb-concurrency:mailbox-count (results tp)))))
101
 
102
 (defmethod print-object ((self task-pool) stream)
103
   (print-unreadable-object (self stream :type t :identity t)
104
     (format stream "~(~A ~^~)~{~s~^ ~}" (name self) (task-pool-info self))))
105
 
106
 (defun kill-workers (pool)
107
   "Call FINISH-THREADS on task-pool's workers."
108
   (dotimes (i (length (workers pool)))
109
     (kill-worker (vector-pop (workers pool)))))
110
 
111
 (defmethod designate-oracle ((self task-pool) (guest thread))
112
   (let ((id (make-oracle guest)))
113
     (setf (gethash id *oracle-table*)
114
           (vector-push-extend (sb-ext:make-weak-pointer self) (gethash id *oracle-table*)))))
115
 
116
 (defmethod designate-oracle ((self task-pool) (guest (eql t)))
117
   (designate-oracle self *current-thread*))
118
 
119
 (declaim (inline push-worker push-workers pop-worker))
120
 (defun push-worker (worker pool)
121
   (vector-push-extend worker (workers pool)))
122
 
123
 (defun push-workers (threads pool)
124
   "Push a list of THREADS to POOL."
125
   (with-slots (workers) pool
126
     (dolist (w threads)
127
       (vector-push-extend w workers))))
128
 
129
 (defmethod pop-worker (pool)
130
   "Pop the next worker from POOL."
131
   (vector-pop (workers pool)))
132
 
133
 (defun start-task-worker (pool index)
134
   "Start the TASK-WORKER at INDEX of POOL."
135
   ;; (with-recursive-lock
136
   (start-worker (aref (workers pool) index)))
137
 
138
 (defun start-task-workers (pool)
139
   "Start all workers in the given task POOL."
140
   (loop for w across (workers pool)
141
         do (start-worker w)))
142
 
143
 ;;; Task
144
 (defclass task ()
145
   ((state :initform nil :initarg :state :accessor task-state))
146
   (:documentation "This object represents a single unit of work to be done by some
147
 worker. Tasks are typically distributed from the task-pool, but workers may
148
 also be granted the ability to create and distribute their own tasks. Once a
149
 task is assigned, the 'owner', i.e. the worker that is assigned this task, may
150
 modify the object. When the work associated with a task is complete, the owner
151
 is responsible for indicating in the state slot the result of the computation."))
152
 
153
 (defmethod print-object ((self task) stream)
154
   (print-unreadable-object (self stream :type t)
155
     (format stream ":state ~A" (task-state self))))
156
 
157
 (defmethod taskp ((self task)) t)
158
 
159
 (defun run-task (worker task)
160
   "Run TASK on WORKER."
161
   (push task (tasks worker))
162
   (run-worker worker))
163
 
164
 (defmethod run-object ((self task) &key worker)
165
   (run-task worker self))
166
 
167
 ;;;; Scheduled Tasks
168
 (defclass scheduled-task (task)
169
   ((schedule :initarg :schedule :initform (get-universal-time) :accessor task-schedule))
170
   (:documentation "A task object with an associated schedule."))
171
 
172
 (defmethod run-object ((self scheduled-task) &key time repeat absolute-p catch-up worker name)
173
   (sb-ext:schedule-timer 
174
    (sb-ext:make-timer (task-state self) :thread worker :name name)
175
    time :repeat-interval repeat :absolute-p absolute-p :catch-up catch-up))
176
 
177
 ;;; Job
178
 (defclass job (task)
179
   ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
180
           :type (array task *)
181
           :initarg :tasks
182
           :accessor tasks)
183
    (lock :initform (make-mutex :name "job") :type mutex
184
          :initarg :lock))
185
   (:documentation "A collection of tasks forming a single unit of work."))
186
 
187
 (defgeneric jobs (self)
188
   (:documentation "Return the jobs associated with SELF."))
189
 (defmethod jobp ((self job)) t)
190
 (defmethod taskp ((self job)) t)
191
   
192
 (declaim (inline make-job))
193
 (defun make-job (&rest tasks)
194
   "Return a new job containing TASKS."
195
   (make-instance 'job
196
     :tasks (make-array (length tasks) 
197
                        :element-type 'task
198
                        :initial-contents tasks)))
199
 
200
 (defmethod print-object ((self job) stream)
201
   (print-unreadable-object (self stream :type t)
202
     (format stream "~A tasks" (length (tasks self)))))
203
 
204
 (defun run-job (worker job)
205
   "Run JOB on WORKER."
206
   (setf (tasks worker) (coerce 'list (tasks job)))
207
   (run-worker worker))
208
 
209
 (defmethod run-object ((self job) &key worker)
210
   (run-job worker self))
211
 
212
 ;;; Work Scope
213
 (defclass work-scope ()
214
   ((tasks  :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
215
            :initarg :tasks
216
            :accessor tasks
217
            :type (vector task))
218
    (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex))
219
   (:documentation "A scope of work containing TASKS and a LOCK."))
220
 
221
 (defmethod print-object ((self work-scope) stream)
222
   (print-unreadable-object (self stream :type t)
223
     (format stream "~A" (tasks self))))
224
 
225
 (defun make-task-pool (worker-count &key (name :default) (kernel *kernel*) 
226
                                          (task-class *task-class*) initial-task
227
                                          tasks
228
                                          alive)
229
   "Make a new TASK-POOL with a worker capacity of WORKER-COUNT."
230
   (let ((*worker-class* 'task-worker))
231
     (let ((tp (make-thread-pool
232
                worker-count 
233
                :class 'task-pool
234
                :alive alive
235
                :name name
236
                :kernel kernel))
237
           (%tasks (or tasks worker-count)))
238
       (declare (task-pool tp))
239
       (setf (tasks tp)
240
             (make-queue
241
              :initial-contents
242
              (make-array %tasks
243
                          :element-type task-class
244
                          :initial-element (or initial-task (make-instance task-class))))
245
             (results tp) (sb-concurrency:make-mailbox :name "results"))
246
       tp)))
247
 
248
 ;;; Macros
249
 (defmacro with-task-pool ((sym &key (tasks (std/alien:num-cpus)) (workers (std/alien:num-cpus)) #+nil start)
250
                           &body body)
251
   "Eval BODY with SYM bound to a new TASK-POOL."
252
   `(let ((,sym (make-task-pool ,workers :tasks ,tasks)))
253
      ;; ,@(when start `((start-task-workers ,sym)))
254
      ,@body))