Coverage report: /home/ellis/comp/core/lib/q/query.lisp

KindCoveredAll%
expression0612 0.0
branch014 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; obj/query/pkg.lisp --- Query Objects
2
 
3
 ;; Lisp primitive Query objects for DIY query engines.
4
 
5
 ;;; Commentary:
6
 
7
 ;; This package provides the base set of classes and methods for implementing
8
 ;; query engines.
9
 
10
 ;; The intention is to use these objects in several high-level packages where
11
 ;; we need the ability to ask complex questions about some arbitrary data
12
 ;; source.
13
 
14
 ;; The type of high-level packages can loosely be categorized as:
15
 
16
 ;; - Frontends :: The interface exposed to the user - SQL, Prolog, etc.
17
 
18
 ;; - Middleware :: interfaces which are used internally and exposed publicly -
19
 ;;   query planners/optimizers/ast
20
 
21
 ;; - Backends :: The interface exposed to the underlying data sources -
22
 ;;   RocksDB, SQLite, etc.
23
 
24
 ;;;; Refs
25
 
26
 ;; https://gist.github.com/twitu/221c8349887cec0a83b395e4cbb492a7
27
 
28
 ;; https://www1.columbia.edu/sec/acis/db2/db2d0/db2d0103.htm
29
 
30
 ;; https://howqueryengineswork.com/
31
 
32
 ;;; Code:
33
 (in-package :q/proto)
34
 (declaim (optimize (debug 3)))
35
 (defvar *query* nil)
36
 
37
 ;;; Proto
38
 (defgeneric select (self names)
39
   (:method ((self schema) (names list))
40
     (let* ((fields (fields self))
41
            (ret (make-array (length fields) :element-type 'field :fill-pointer 0
42
                                             :initial-element (make-field))))
43
       (make-instance 'schema
44
         :fields (dolist (n names ret)
45
                   (if-let ((found (find n fields :test 'equal :key 'field-name)))
46
                     (vector-push found ret)
47
                     (error 'invalid-argument :item n :reason "Invalid column name"))))))
48
   (:method ((self schema) (names vector))
49
     (let* ((fields (fields self))
50
            (ret (make-array (length fields) :element-type 'field :fill-pointer 0
51
                                             :initial-element (make-field))))
52
       (make-instance 'schema
53
         :fields (loop for n across names
54
                       do (if-let ((found (find n fields :test 'equal :key 'field-name)))
55
                            (vector-push found ret)
56
                            (error 'invalid-argument :item n :reason "Invalid column name"))
57
                       finally (return ret))))))
58
 
59
 (defgeneric project (self indices)
60
   (:method ((self schema) (indices list))
61
     (make-instance 'schema
62
       :fields (coerce (mapcar (lambda (i) (aref (fields self) i)) indices) 'field-vector)))
63
   (:method ((self schema) (indices vector))
64
     (make-instance 'schema
65
       :fields (coerce
66
                (loop for i across indices
67
                      collect (aref (fields self) i))
68
                'field-vector))))
69
 
70
 ;;; Expressions
71
 (defclass query-expr (expr) ())
72
 
73
 (defclass query-plan (ast)
74
   ((schema :type schema :accessor schema :initarg :schema)
75
    (ast :type (vector query-plan))))
76
 
77
 (defclass logical-query-plan (query-plan logical-plan)
78
   ((ast :type (vector logical-query-plan) :accessor ast :initarg :ast)))
79
 
80
 (defclass physical-query-plan (query-plan physical-plan)
81
   ((ast :type (vector physical-query-plan))))
82
 
83
 ;;; Logical Expressions
84
 (defgeneric to-field (self input)
85
   (:method ((self string) (input logical-query-plan))
86
     (declare (ignore input))
87
     (make-field :name self :type 'string))
88
   (:method ((self number) (input logical-query-plan))
89
     (declare (ignore input))
90
     (make-field :name (princ-to-string self) :type 'number)))
91
 
92
 (defclass column-expression (logical-expr query-expr)
93
   ((name :type string :initarg :name :accessor name)))
94
 
95
 (defmethod to-field ((self column-expression) (input logical-query-plan))
96
   (or (find (name self) (fields (schema input)) :test 'equal :key 'field-name)
97
       (error 'invalid-argument :item (name self) :reason "Invalid column name")))
98
 
99
 (defmethod df-col ((self string))
100
   (make-instance 'column-expression :name self))
101
 
102
 ;;;;; Alias
103
 (defclass alias-expression (logical-expr)
104
   ((expr :type logical-expr :initarg :expr :accessor expr)
105
    (alias :type string :initarg :alias)))
106
 
107
 (defclass cast-expression (logical-expr)
108
   ((expr :type logical-expr :initarg :expr :accessor expr)
109
    (data-type :type form :initarg :data-type)))
110
 
111
 (defmethod to-field ((self cast-expression) (input logical-query-plan))
112
   (make-field :name (field-name (to-field (expr self) input)) :type (slot-value self 'data-type)))
113
 
114
 ;;;;; Unary
115
 (defclass unary-expression (logical-expr unary-expr)
116
   ((expr :type logical-expr :accessor expr)))
117
 
118
 ;;;;; Binary
119
 (defclass binary-expression (logical-expr binary-expr) ())
120
 
121
 (defclass boolean-binary-expression (binary-expression)
122
   ((name :initarg :name :type string :accessor name)
123
    (op :initarg :op :type symbol :accessor expr-op)))
124
 
125
 (defmethod to-field ((self boolean-binary-expression) (input logical-query-plan))
126
   (declare (ignore input))
127
   (make-field :name (name self) :type 'boolean))
128
 
129
 ;; Equiv Expr
130
 (defclass eq-expression (boolean-binary-expression) ()
131
   (:default-initargs
132
    :name "eq"
133
    :op 'eq))
134
 
135
 (defclass neq-expression (boolean-binary-expression) ()
136
   (:default-initargs
137
    :name "neq"
138
    :op 'neq))
139
 
140
 (defclass gt-expression (boolean-binary-expression) ()
141
   (:default-initargs
142
    :name "gt"
143
    :op '>))
144
 
145
 (defclass lt-expression (boolean-binary-expression) ()
146
   (:default-initargs
147
    :name "lt"
148
    :op '<))
149
 
150
 (defclass gteq-expression (boolean-binary-expression) ()
151
   (:default-initargs
152
    :name "gteq"
153
    :op '>=))
154
 
155
 (defclass lteq-expression (boolean-binary-expression) ()
156
   (:default-initargs
157
    :name "lteq"
158
    :op '<=))
159
 
160
 ;; Bool Expr
161
 (defclass and-expression (boolean-binary-expression) ()
162
   (:default-initargs
163
    :name "and"
164
    :op 'and))
165
 
166
 (defclass or-expression (boolean-binary-expression) ()
167
   (:default-initargs
168
    :name "or"
169
    :op 'or))
170
 
171
 ;; Math Expr
172
 (defclass math-expression (binary-expression)
173
   ((name :initarg :name :type string :accessor name)
174
    (op :initarg :op :type symbol :accessor expr-op)))
175
 
176
 ;; ;; TODO 2024-08-03: ???
177
 ;; (defmethod to-field ((self math-expression) (input logical-query-plan))
178
 ;;   (declare (ignorable input))
179
 ;;   (make-field :name "*" :type (field-type (to-field (lhs self) input))))
180
 
181
 (defclass add-expression (math-expression) ()
182
   (:default-initargs
183
    :name "add"
184
    :op '+))
185
 
186
 (defclass sub-expression (math-expression) ()
187
   (:default-initargs
188
    :name "sub"
189
    :op '-))
190
 
191
 (defclass mult-expression (math-expression) ()
192
   (:default-initargs
193
    :name "mult"
194
    :op '*))
195
 
196
 (defclass div-expression (math-expression) ()
197
   (:default-initargs
198
    :name "div"
199
    :op '/))
200
 
201
 (defclass mod-expression (math-expression) ()
202
   (:default-initargs
203
    :name "mod"
204
    :op 'mod))
205
 
206
 ;;;;; Agg Expr
207
 (deftype aggregate-function () `(function ((input logical-expr)) query-expr))
208
 
209
 (deftype aggregate-function-designator () `(or aggregate-function symbol))
210
 
211
 (defclass aggregate-expression (logical-expr)
212
   ((name :type string)
213
    (expr :type logical-expr :accessor expr)))
214
 
215
 (defgeneric aggregate-expression-p (self)
216
   (:method ((self aggregate-expression)) t)
217
   (:method ((self alias-expression)) (aggregate-expression-p (expr self)))
218
   (:method ((self t)) nil))
219
 
220
 (defmethod to-field ((self aggregate-expression) (input logical-query-plan))
221
   (declare (ignorable input))
222
   (make-field :name (slot-value self 'name) :type (field-type (to-field (slot-value self 'expr) input))))
223
 
224
 (defclass sum-expression (aggregate-expression) ()
225
   (:default-initargs
226
    :name "SUM"))
227
 
228
 (defclass min-expression (aggregate-expression) ()
229
   (:default-initargs
230
    :name "MIN"))
231
 
232
 (defclass max-expression (aggregate-expression) ()
233
   (:default-initargs
234
    :name "MAX"))
235
 
236
 (defclass avg-expression (aggregate-expression) ()
237
   (:default-initargs
238
    :name "AVG"))
239
 
240
 (defclass count-expression (aggregate-expression) ()
241
   (:default-initargs
242
    :name "COUNT"))
243
 
244
 (defmethod to-field ((self count-expression) (input logical-query-plan))
245
   (declare (ignore input))
246
   (make-field :name "COUNT" :type 'number))
247
 
248
 ;;; Logical Plan
249
 
250
 ;;;;; Scan
251
 (defclass scan-data (logical-query-plan)
252
   ((path :type string :initarg :path)
253
    (data-source :type data-source :initarg :data-source)
254
    (projection :type (vector string) :initarg :projection)))
255
 
256
 ;; (defmethod derive-schema ((self scan-data))
257
 ;;   (let ((proj (slot-value self 'projection)))
258
 ;;     (if (= 0 (length proj))
259
 ;;         (schema self)
260
 ;;         (select (slot-value self 'schema) proj))))
261
 
262
 (defmethod schema ((self scan-data))
263
   (derive-schema self))
264
 
265
 ;;;;; Projection
266
 (defclass projection (logical-query-plan)
267
   ((input :type logical-query-plan :initarg :input)
268
    (expr :type (vector logical-expr) :initarg :expr)))
269
 
270
 ;; (defmethod schema ((self projection))
271
 ;;   (schema (slot-value self 'input)))
272
 
273
 ;;;;; Selection
274
 (defclass selection (logical-query-plan)
275
   ((input :type logical-query-plan :initarg :input)
276
    (expr :type logical-expr :initarg :expr)))
277
 
278
 ;; (defmethod schema ((self selection))
279
 ;;   (schema (slot-value self 'input)))
280
 
281
 ;;;;; Aggregate
282
 (defclass aggregate (logical-query-plan)
283
   ((input :type logical-query-plan :initarg :input)
284
    (group-expr :type (vector logical-expr) :initarg :group-expr)
285
    (agg-expr :type (vector aggregate-expression) :initarg :agg-expr)))
286
 
287
 ;; (defmethod schema ((self aggregate))
288
 ;;   (let ((input (slot-value self 'input))
289
 ;;         (ret))
290
 ;;     (loop for g across (slot-value self 'group-expr)
291
 ;;           do (push (to-field g input) ret))
292
 ;;     (loop for a across (slot-value self 'agg-expr)
293
 ;;           do (push (to-field a input) ret))
294
 ;;     (apply 'make-simple-schema ret)))
295
 
296
 ;;;;; Limit
297
 (defclass limit (logical-query-plan)
298
   ((input :type logical-query-plan :initarg :input)
299
    (limit :type integer)))
300
 
301
 ;; (defmethod schema ((self limit))
302
 ;;   (setf (slot-value self 'schema)
303
 ;;         (schema (slot-value self 'input))))
304
 
305
 ;; (defmethod ast ((self limit))
306
 ;;   (setf (slot-value self 'ast)
307
 ;;         (ast (slot-value self 'input))))
308
 
309
 ;;;;; Joins
310
 (defclass join (logical-query-plan)
311
   ((left :accessor lhs)
312
    (right :accessor rhs)
313
    (on :accessor join-on)))
314
 
315
 (defclass inner-join (join) ())
316
 (defclass outer-join (join) ())
317
 (defclass left-join (join) ())
318
 (defclass right-join (join) ())
319
 ;; left-outer-join
320
 ;; right-outer-join
321
 ;; semi-join
322
 ;; anti-join
323
 ;; cross-join
324
 
325
 (defmethod schema ((self join))
326
   ;; TODO 2024-08-04: test better dupe impl
327
   (let ((dupes (mapcon #'(lambda (l) (when (eq (car l) (second l)) (list (car l))))
328
                        (coerce (join-on self) 'cons)))
329
         (schema (make-instance 'schema)))
330
     (setf (fields schema)
331
           (typecase self
332
             (right-join
333
              (let ((l (remove-if (lambda (x) (member x dupes :test 'string-equal)) (fields (schema (lhs self)))))
334
                    (r (fields (schema (rhs self)))))
335
                (merge 'vector l r (lambda (x y) (declare (ignore y)) x))))
336
             (inner-join
337
              (let ((l (fields (schema (lhs self))))
338
                    (r (remove-if (lambda (x) (member x dupes :test 'string-equal)) (fields (schema (rhs self))))))
339
                (merge 'vector l r (lambda (x y) (declare (ignore y)) x))))))
340
     schema))
341
 
342
 (defmethod ast ((self join))
343
   (vector (lhs self) (rhs self))) 
344
 
345
 ;;; Subqueries
346
 
347
 ;;  TODO 2024-08-02: 
348
 
349
 ;; subquery
350
 
351
 ;; correlated-subquery
352
 
353
 ;; SELECT id, name, (SELECT count(*) FROM orders WHERE customer_id = customer.id) AS num_orders FROM customers
354
 
355
 ;; uncorrelated-subquery
356
 
357
 ;; scalar-subquery
358
 
359
 ;; SELECT * FROM orders WHERE total > (SELECT avg(total) FROM sales WHERE customer_state = 'CA')
360
 
361
 ;; NOTE 2024-08-02: EXISTS, IN, NOT EXISTS, and NOT IN are also subqueries
362
 
363
 ;;; Dataframes
364
 (defgeneric df-project (df exprs)
365
   (:method ((df data-frame) (expr list))
366
     (df-project df (coerce expr 'vector)))
367
   (:method ((df data-frame) (expr vector))
368
     (setf (df-plan df)
369
           (make-instance 'projection
370
             :input (df-plan df)
371
             :expr expr))
372
     df))
373
 
374
 (defgeneric df-filter (df expr)
375
   (:method ((df data-frame) (expr logical-expr))
376
     (setf (df-plan df)
377
           (make-instance 'selection :input (df-plan df) :expr expr))
378
     df))
379
 
380
 (defgeneric df-aggregate (df group-by agg-expr)
381
   (:method ((df data-frame) (group-by vector) (agg-expr vector))
382
     (setf (df-plan df)
383
           (make-instance 'aggregate :input (df-plan df)
384
                          :group-expr group-by
385
                          :agg-expr agg-expr))
386
     df)
387
   (:method ((df data-frame) (group-by list) (agg-expr list))
388
     (df-aggregate df (coerce group-by 'vector) (coerce agg-expr 'vector))))
389
 
390
 ;;; Physical Expression
391
 (defclass literal-physical-expression (physical-expr literal-expr) ())
392
 
393
 (defgeneric evaluate (self input)
394
   (:documentation "Evaluate the expression SELF with INPUT and return a COLUMN-VECTOR result.")
395
   (:method ((self string) (input record-batch))
396
     (make-instance 'literal-value-vector
397
       :size (row-count input)
398
       :type 'string
399
       :data (sb-ext:string-to-octets self)))
400
   (:method ((self number) (input record-batch))
401
     (make-instance 'literal-value-vector :size (row-count input) :type 'number :data self)))
402
 
403
 (defclass column-physical-expression (physical-expr)
404
   ((val :type array-index :initarg :val)))
405
 
406
 (defmethod evaluate ((self column-physical-expression) (input record-batch))
407
   (field input (slot-value self 'val)))
408
 
409
 (defclass binary-physical-expression (physical-expr)
410
   ((lhs :type physical-expr :accessor lhs :initarg :lhs)
411
    (rhs :type physical-expr :accessor rhs :initarg :rhs)))
412
 
413
 (defgeneric evaluate2 (self lhs rhs))
414
 
415
 (defmethod evaluate ((self binary-physical-expression) (input record-batch))
416
   (let ((ll (evaluate (lhs self) input))
417
         (rr (evaluate (rhs self) input)))
418
     (assert (= (length ll) (length rr)))
419
     (if (eql (column-type ll) (column-type rr))
420
         (evaluate2 self ll rr)
421
         (error "invalid state: lhs != rhs"))))
422
 
423
 (defclass eq-physical-expression (binary-physical-expression) ())
424
 
425
 (defmethod evaluate2 ((self eq-physical-expression) lhs rhs)
426
   (declare (ignore self))
427
   (equal lhs rhs))
428
 
429
 (defclass neq-physical-expression (binary-physical-expression) ())
430
 
431
 (defmethod evaluate2 ((self neq-physical-expression) lhs rhs)
432
   (declare (ignore self))
433
   (equal lhs rhs))
434
 
435
 (defclass lt-physical-expression (binary-physical-expression) ())
436
 
437
 (defclass gt-physical-expression (binary-physical-expression) ())
438
 
439
 (defclass lteq-physical-expression (binary-physical-expression) ())
440
 
441
 (defclass gteq-physical-expression (binary-physical-expression) ())
442
 
443
 (defclass and-physical-expression (binary-physical-expression) ())
444
 
445
 (defclass or-physical-expression (binary-physical-expression) ())
446
 
447
 (defclass math-physical-expression (binary-physical-expression) ())
448
 
449
 (defmethod evaluate2 ((self math-physical-expression) (lhs column-vector) (rhs column-vector))
450
   (coerce (loop for i below (column-size lhs)
451
                 collect (evaluate2 self (column-value lhs i) (column-value rhs i)))
452
           'field-vector))
453
 
454
 (defclass add-physical-expresion (math-expression) ())
455
 
456
 (defmethod evaluate2 ((self add-physical-expresion) lhs rhs)
457
   (declare (ignore self))
458
   (+ lhs rhs))
459
 
460
 (defclass sub-physical-expression (math-expression) ())
461
 
462
 (defmethod evaluate2 ((self sub-physical-expression) lhs rhs)
463
   (declare (ignore self))
464
   (- lhs rhs))
465
 
466
 (defclass mult-physical-expression (math-expression) ())
467
 
468
 (defmethod evaluate2 ((self mult-physical-expression) lhs rhs)
469
   (declare (ignore self))
470
   (* lhs rhs))
471
 
472
 (defclass div-physical-expression (math-expression) ())
473
 
474
 (defmethod evaluate2 ((self div-physical-expression) lhs rhs)
475
   (declare (ignore self))
476
   (/ lhs rhs))
477
 
478
 (defclass aggregate-physical-expression (physical-expr)
479
   ((input :type physical-expression)))
480
 
481
 (defclass max-physical-expression (aggregate-physical-expression) ())
482
 
483
 (defmethod make-accumulator ((self max-physical-expression))
484
   (make-instance 'max-accumulator))
485
 
486
 ;;; Physical Plan
487
 (defgeneric execute (self)
488
   (:documentation "Execute the PHYSICAL-QUERY-PLAN represented by object SELF.")
489
   (:method ((self data-frame))
490
     (execute (df-plan self))))
491
 
492
 (defclass scan-exec (physical-query-plan)
493
   ((data-source :type data-source :initarg :data-source)
494
    (projection :type (vector string) :initarg :projection)))
495
 
496
 ;; (defmethod schema ((self scan-exec))
497
 ;;   (select (schema (slot-value self 'data-source)) (slot-value self 'projection)))
498
 
499
 ;; (defmethod execute ((self scan-exec))
500
 ;;   (scan-data (slot-value self 'data-source) (slot-value self 'projection)))
501
 
502
 (defclass projection-exec (physical-query-plan)
503
   ((input :type physical-query-plan :initarg :input)
504
    (expr :type (vector physical-expr) :initarg :expr)))
505
 
506
 ;; (defmethod execute ((self projection-exec))
507
 ;;   (coerce
508
 ;;    (loop for batch across (fields (execute (slot-value self 'input)))
509
 ;;          collect (make-record-batch :schema (slot-value self 'schema)
510
 ;;                                     :fields (coerce
511
 ;;                                              (loop for e across (slot-value self 'expr)
512
 ;;                                                    collect (evaluate e batch))
513
 ;;                                              'field-vector)))
514
 ;;    '(vector record-batch)))
515
                                                  
516
 
517
 (defclass selection-exec (physical-query-plan)
518
   ((input :type physical-query-plan :initarg :input)
519
    (expr :type physical-expr :initarg :expr)))
520
 
521
 ;; (defmethod schema ((self selection-exec))
522
 ;;   (schema (slot-value self 'input)))
523
 
524
 ;; (defmethod execute ((self selection-exec))
525
 ;;   (coerce
526
 ;;    (loop for batch across (execute (slot-value self 'input))
527
 ;;          with res = (coerce (evaluate (slot-value self 'expr) batch) 'bit-vector)
528
 ;;          with schema = (schema batch)
529
 ;;          with count = (column-count (fields (schema batch)))
530
 ;;          with filtered = (loop for i from 0 below count
531
 ;;                                collect (filter self (field batch i) res))
532
 ;;          collect (make-record-batch :schema schema :fields (coerce filtered 'field-vector)))
533
 ;;    '(vector record-batch)))
534
 
535
 (defgeneric filter (self columns selection)
536
   (:method ((self selection-exec) (columns column-vector) (selection simple-bit-vector))
537
     (coerce
538
      (loop for i from 0 below (length selection)
539
            unless (zerop (bit selection i))
540
            collect (column-value columns i))
541
      'field-vector)))
542
 
543
 (defclass hash-aggregate-exec (physical-query-plan)
544
   ((input :type physical-query-plan :initarg :input)
545
    (group-expr :type (vector physical-query-plan) :initarg :group-expr)
546
    (agg-expr :type (vector aggregate-physical-expression) :initarg :agg-expr)))
547
 
548
 ;; (defmethod execute ((self hash-aggregate-exec))
549
 ;;   (coerce 
550
 ;;    (loop for batch across (execute (slot-value self 'input))
551
 ;;          with map = (make-hash-table :test 'equal)
552
 ;;          with groupkeys = (map 'vector  (lambda (x) (evaluate x batch)) (slot-value self 'group-expr))
553
 ;;          with aggr-inputs = (map 'vector (lambda (x) (evaluate (slot-value x 'input) batch))
554
 ;;                                  (slot-value self 'agg-expr))
555
 ;;          do (loop for row-idx from 0 below (row-count batch)
556
 ;;                   with row-key = (map 'vector
557
 ;;                                       (lambda (x)
558
 ;;                                         (when-let ((val (column-value x row-idx)))
559
 ;;                                           (typecase val
560
 ;;                                             (octet-vector (sb-ext:octets-to-string val))
561
 ;;                                             (t val))))
562
 ;;                                       groupkeys)
563
 ;;                   with accs = (if-let ((val (gethash row-key map)))
564
 ;;                                 val
565
 ;;                                 (setf
566
 ;;                                  (gethash row-key map)
567
 ;;                                  (map 'vector
568
 ;;                                       #'make-accumulator
569
 ;;                                       (slot-value self 'agg-expr))))
570
 ;;                   ;; start accumulating
571
 ;;                   do (loop for i from 0 below (length accs)
572
 ;;                            for accum across accs
573
 ;;                            with val = (column-value (aref aggr-inputs i) row-idx)
574
 ;;                            return (accumulate accum val))
575
 ;;                      ;; collect results in array
576
 ;;                   with ret = (make-record-batch :schema (slot-value self 'schema)
577
 ;;                                                 :fields (make-array (hash-table-size map)
578
 ;;                                                                     :element-type 'field
579
 ;;                                                                     :initial-element (make-field)))
580
 ;;                   do (loop for row-idx from 0 below (hash-table-size map)
581
 ;;                            for gkey being the hash-keys of map
582
 ;;                            using (hash-value accums)
583
 ;;                            with glen = (length (slot-value self 'group-expr))
584
 ;;                            do (loop for i from 0 below glen
585
 ;;                                     do (setf (aref (aref (fields ret) i) row-idx)
586
 ;;                                              (aref gkey i)))
587
 ;;                            do (loop for i from 0 below (length (slot-value self 'agg-expr))
588
 ;;                                     do (setf (aref (aref (fields ret) (+ i glen)) row-idx)
589
 ;;                                              (accumulator-value (aref accums i)))))
590
 ;;                   collect ret))
591
 ;;    '(vector record-batch)))
592
 
593
 ;;; Planner
594
 
595
 ;; The Query Planner is effectively a compiler which translates logical
596
 ;; expressions and plans into their physical counterparts.
597
 
598
 (defclass query-planner (planner) ())
599
 
600
 (defgeneric make-physical-expression (expr input)
601
   (:documentation "Translate logical expression EXPR and logical plan INPUT
602
   into a physical expression.")
603
   (:method ((expr string) (input logical-query-plan))
604
     (declare (ignore input))
605
     expr)
606
   (:method ((expr number) (input logical-query-plan))
607
     (declare (ignore input))
608
     expr)
609
   (:method ((expr column-expression) (input logical-query-plan))
610
     (let ((i (position (name expr) (fields (schema input)) :key 'field-name :test 'equal)))
611
       (make-instance 'column-physical-expression :val i)))
612
   (:method ((expr binary-expression) (input logical-query-plan))
613
     (let ((l (make-physical-expression (lhs expr) input))
614
           (r (make-physical-expression (rhs expr) input)))
615
       (etypecase expr
616
         (eq-expression (make-instance 'eq-physical-expression :lhs l :rhs r))
617
         (neq-expression (make-instance 'neq-physical-expression :lhs l :rhs r))
618
         (gt-expression (make-instance 'gt-physical-expression :lhs l :rhs r))
619
         (gteq-expression (make-instance 'gteq-physical-expression :lhs l :rhs r))
620
         (lt-expression (make-instance 'lt-physical-expression :lhs l :rhs r))
621
         (lteq-expression (make-instance 'lteq-physical-expression :lhs l :rhs r))
622
         (and-expression (make-instance 'and-physical-expression :lhs l :rhs r))
623
         (or-expression (make-instance 'or-physical-expression :lhs l :rhs r))
624
         (add-expression (make-instance 'add-physical-expresion :lhs l :rhs r))
625
         (sub-expression (make-instance 'sub-physical-expression :lhs l :rhs r))
626
         (mult-expression (make-instance 'mult-physical-expression :lhs l :rhs r))
627
         (div-expression (make-instance 'div-physical-expression :lhs l :rhs r))))))
628
 
629
 ;; ;; Control Stack dies here?
630
 ;; (defmethod make-physical-plan ((plan logical-query-plan))
631
 ;;   (etypecase plan
632
 ;;     (scan-data (make-instance 'scan-exec
633
 ;;                  :data-source (slot-value plan 'data-source)
634
 ;;                  :projection (slot-value plan 'projection)))
635
 ;;     (projection (make-instance 'projection-exec
636
 ;;                   :schema (make-instance 'schema
637
 ;;                             :fields
638
 ;;                             (map 'field-vector
639
 ;;                                  (lambda (x) (to-field x (slot-value plan 'input)))
640
 ;;                                  (slot-value plan 'expr)))
641
 ;;                   :input (make-physical-plan (slot-value plan 'input))
642
 ;;                   :expr (map 'vector (lambda (x) (make-physical-expression x (slot-value plan 'input)))
643
 ;;                              (slot-value plan 'expr))))
644
 ;;     (selection (make-instance 'selection-exec
645
 ;;                  :input (make-physical-plan (slot-value plan 'input))
646
 ;;                  :expr (make-physical-expression (slot-value plan 'expr) (slot-value plan 'input))))
647
 ;;     (aggregate (make-instance 'hash-aggregate-exec
648
 ;;                  :input (make-physical-plan (slot-value plan 'input))
649
 ;;                  :group-expr (make-physical-expression (slot-value plan 'group-expr) (slot-value plan 'input))
650
 ;;                  :agg-expr (make-physical-expression (slot-value plan 'agg-expr) (slot-value plan 'input))))))
651
 
652
 ;;; Optimizer
653
 
654
 ;; The Query Optimizer is responsible for walking a QUERY-PLAN and returning a
655
 ;; modified version of the same object. Usually we want to run optimization on
656
 ;; LOGICAL-QUERY-PLANs but we also support specializing on PHYSICAL-QUERY-PLAN.
657
 
658
 ;; Rule-based Optimizers: projection/predicate push-down, sub-expr elim
659
 
660
 ;; Lowerings: hdsl -> ldsl
661
 
662
 ;; Extensibility principle - A low level DSL should have greater than or equal
663
 ;; to expressiveness of a high level DSL
664
 
665
 ;; Transformation cohesion principle - There should be a unique path lowering
666
 ;; a high-level DSL to a low-level DSL. This also prevents loops between high
667
 ;; and low level DSLs.
668
 
669
 ;; TBD: Cost-based optimizers
670
 ;; TODO 2024-07-10: 
671
 (defclass query-optimizer () ())
672
 
673
 (defstruct (query-vop (:constructor make-query-vop (info)))
674
   "A virtual query operation available to query compilers."
675
   (info nil))
676
 
677
 (defgeneric optimize-query (self plan)
678
   (:documentation "Optimize the query expressed by PLAN using the optimizer SELF."))
679
 
680
 ;; Projection Pushdown
681
 (defun extract-columns (expr input &optional accum)
682
   "Recursively check an expression for field indicators and add the to an
683
 accumulator."
684
   (etypecase expr
685
     (array-index (accumulate accum (field (fields (schema input)) expr)))
686
     (column-expression (accumulate accum (name expr)))
687
     (binary-expression
688
      (extract-columns (lhs expr) input accum)
689
      (extract-columns (rhs expr) input accum))
690
     (alias-expression (extract-columns (expr expr) input accum))
691
     (cast-expression (extract-columns (expr expr) input accum))
692
     (literal-expr nil)))
693
 
694
 (defun extract-columns* (exprs input &optional accum)
695
   (mapcar (lambda (x) (extract-columns x input accum)) exprs))
696
 
697
 (defclass projection-pushdown-optimizer (query-optimizer) ())
698
 
699
 (defun %pushdown (plan &optional column-names)
700
   (declare (logical-query-plan plan))
701
   (etypecase plan
702
     (projection
703
      (extract-columns (slot-value plan 'expr) column-names)
704
      (let ((input (%pushdown (slot-value plan 'input) column-names)))
705
        (make-instance 'projection :input input :expr (slot-value plan 'expr))))
706
     (selection
707
      (extract-columns (slot-value plan 'expr) column-names)
708
      (let ((input (%pushdown (slot-value plan 'input) column-names)))
709
        (make-instance 'selection :input input :expr (slot-value plan 'expr))))
710
     (aggregate
711
      (extract-columns (slot-value plan 'group-expr) column-names)
712
      (extract-columns*
713
       (loop for x across (slot-value plan 'agg-expr) collect (slot-value x 'input))
714
       column-names)
715
      (let ((input (%pushdown (slot-value plan 'input) column-names)))
716
        (make-instance 'aggregate
717
          :input input
718
          :group-expr (slot-value plan 'group-expr)
719
          :agg-expr (slot-value plan 'agg-expr))))
720
     (scan-data (make-instance 'scan-data
721
                  :path (slot-value plan 'name)
722
                  :data-source (slot-value plan 'data-source)
723
                  :projection column-names)))) ;; maybe sort here?
724
 
725
 (defmethod optimize-query ((self projection-pushdown-optimizer) (plan logical-query-plan))
726
   (%pushdown plan))
727
 
728
 ;;; Query
729
 (defclass query () ())
730
 
731
 (defclass simple-query (query ast id) ())
732
 
733
 (defgeneric make-query (self &rest initargs &key &allow-other-keys)
734
   (:documentation "Make a new QUERY object.")
735
   (:method ((self t) &rest initargs)
736
     (apply 'make-instance 'query initargs))
737
   (:method ((self (eql :simple)) &rest initargs &key &allow-other-keys)
738
     (apply 'make-instance 'simple-query initargs)))
739
 
740
 ;;; Execution Context
741
 (defclass execution-context () ())
742
 
743
 (defgeneric register-df (self name df)
744
   (:documentation "Register a DATA-FRAME with an EXECUTION-CONTEXT."))
745
 
746
 (defgeneric register-data-source (self name source)
747
   (:documentation "Register a DATA-SOURCE with an EXECUTION-CONTEXT."))
748
 
749
 (defgeneric register-file (self name path &key type &allow-other-keys)
750
   (:documentation "Register a DATA-SOURCE contained in a file of type TYPE at PATH."))
751
 
752
 (defgeneric execute* (self df)
753
   (:documentation "Execute the DATA-FRAME DF in CONTEXT. This is the stateful version of EXECUTE.")
754
   (:method ((self execution-context) (df data-frame))
755
     (declare (ignore self))
756
     (execute df)))
757
 
758
 (defmethod execute ((self logical-query-plan))
759
   (execute
760
    (make-physical-plan
761
     (optimize-query (make-instance 'projection-pushdown-optimizer) self))))