]> gitweb.factorcode.org Git - factor.git/blob - extra/mongodb/operations/operations.factor
Merge commit 'mongo-factor-driver/master' into mongo-factor-driver
[factor.git] / extra / mongodb / operations / operations.factor
1 USING: accessors assocs bson.reader bson.writer byte-arrays
2 byte-vectors combinators formatting fry io io.binary io.encodings.private
3 io.encodings.binary io.encodings.string io.encodings.utf8 io.encodings.utf8.private io.files
4 kernel locals math mongodb.msg namespaces sequences uuid bson.writer.private ;
5
6 IN: alien.c-types
7
! A byte-vector's byte-length is simply its element count.
M: byte-vector byte-length
    length ;
9
10 IN: mongodb.operations
11
12 <PRIVATE
13
! One predicate class per opcode constant. Each class matches an
! integer equal to the corresponding OP_* value, which lets
! (read-message) dispatch on the opcode found in a message header.
PREDICATE: mdb-reply-op < integer
    OP_Reply = ;

PREDICATE: mdb-query-op < integer
    OP_Query = ;

PREDICATE: mdb-insert-op < integer
    OP_Insert = ;

PREDICATE: mdb-update-op < integer
    OP_Update = ;

PREDICATE: mdb-delete-op < integer
    OP_Delete = ;

PREDICATE: mdb-getmore-op < integer
    OP_GetMore = ;

PREDICATE: mdb-killcursors-op < integer
    OP_KillCursors = ;
21
22 PRIVATE>
23
! Writes a message in its wire format. Methods are defined per
! message class further down in this vocabulary.
GENERIC: write-message ( message -- )
25
26 <PRIVATE
27
! NOTE(review): presumably the byte size of the fixed message header;
! not referenced by the words visible in this part of the file.
CONSTANT: MSG-HEADER-SIZE 16

! Variable holding the running count of bytes consumed while a
! message is being read (see bytes-read> / >bytes-read).
SYMBOL: msg-bytes-read
31
! Fetches the current value of the msg-bytes-read variable.
: bytes-read> ( -- integer ) msg-bytes-read get ; inline
34
! Stores a new value into the msg-bytes-read variable.
: >bytes-read ( integer -- ) msg-bytes-read set ; inline
37
! Adds the given amount to the byte counter. A counter that has not
! been set yet (f) is treated as zero.
: change-bytes-read ( integer -- )
    bytes-read> 0 or
    + >bytes-read ; inline
40
! Reads 4 bytes from the input stream, decodes them as a
! little-endian integer and adds 4 to the byte counter.
: read-int32 ( -- int32 )
    4 [ read le> ] keep change-bytes-read ; inline
! Reads 8 bytes from the input stream, decodes them as a
! little-endian integer and adds 8 to the byte counter.
: read-longlong ( -- longlong )
    8 [ read le> ] keep change-bytes-read ; inline
! Reads a single byte from the input stream and adds 1 to the byte
! counter. The result is the raw one-element byte sequence returned
! by `read`, NOT a decoded integer: read-byte extracts the element
! with `first`, which would fail on the integer that `le>` produces.
: read-byte-raw ( -- byte-raw )
    1 [ read ] [ change-bytes-read ] bi ; inline
! Yields the first element of whatever read-byte-raw returns.
: read-byte ( -- byte )
    read-byte-raw first ; inline
45
! Appends bytes from the input stream to acc, one at a time, until a
! zero byte has been read. The zero byte itself is pushed as well.
: (read-cstring) ( acc -- )
    read-byte swap              ! b acc
    [ push ] 2keep              ! b acc
    swap 0 =                    ! acc bool
    [ drop ] [ (read-cstring) ] if ; inline recursive
51
! Reads a zero-terminated byte string from the input stream, strips
! the trailing zero byte(s) and decodes the remainder as UTF-8.
: read-cstring ( -- string )
    BV{ } clone dup (read-cstring)
    [ zero? ] trim-tail
    >byte-array
    utf8 decode ; inline
57
! Reads the opcode-specific remainder of a message. Takes the header
! stub and the opcode (the top of the stack, on which the generic
! dispatches) and returns the fully populated message.
GENERIC: (read-message) ( message opcode -- message )
59
! Copies the five header slots (length, req-id, resp-id, opcode,
! flags) from msg-stub into message and returns message.
: copy-header ( message msg-stub -- message )
    {
        [ length>> >>length ]
        [ req-id>> >>req-id ]
        [ resp-id>> >>resp-id ]
        [ opcode>> >>opcode ]
        [ flags>> >>flags ]
    } cleave ;
66
! Reads the body of a query message: collection name, skip count,
! return count and the query document. A second document is read
! into returnfields only if the declared message length exceeds the
! number of bytes consumed so far.
M: mdb-query-op (read-message) ( msg-stub opcode -- message )
    drop
    mdb-query-msg new swap copy-header
    read-cstring >>collection
    read-int32 >>skip#
    read-int32 >>return#
    H{ } stream>assoc change-bytes-read >>query
    dup length>> bytes-read> > [
        H{ } stream>assoc change-bytes-read >>returnfields
    ] when ;
76
! Reads the body of an insert message: the collection name followed
! by documents, which are collected into the objects vector for as
! long as the declared message length exceeds the bytes consumed.
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
    drop
    mdb-insert-msg new swap copy-header
    read-cstring >>collection
    V{ } clone >>objects
    dup '[ _ length>> bytes-read> > ]                               ! msg pred
    over '[ H{ } stream>assoc change-bytes-read _ objects>> push ]  ! msg pred body
    while ;
85
! Reads the body of a delete message: collection name, one int32 and
! the selector document.
!
! The delete method of write-message emits an int32 (always 0)
! between the collection name and the selector. That int32 has to be
! consumed here too, otherwise the selector would be parsed starting
! four bytes early. The value is discarded.
M: mdb-delete-op (read-message) ( msg-stub opcode -- message )
    drop
    [ mdb-delete-msg new ] dip copy-header
    read-cstring >>collection
    read-int32 drop
    H{ } stream>assoc change-bytes-read >>selector ;
91
! Reads the body of a get-more message: collection name, return
! count (int32) and cursor (8-byte integer).
M: mdb-getmore-op (read-message) ( msg-stub opcode -- message )
    drop
    mdb-getmore-msg new swap copy-header
    read-cstring  >>collection
    read-int32    >>return#
    read-longlong >>cursor ;
98
! Reads the body of a kill-cursors message: a cursor count followed
! by that many 8-byte cursor values, collected into the cursors
! vector.
M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message )
    drop
    mdb-killcursors-msg new swap copy-header
    read-int32 >>cursors#
    V{ } clone >>cursors
    dup [ cursors#>> ] [ cursors>> ] bi     ! msg n vector
    '[ read-longlong _ push ] times ;
106
! Reads the body of an update message: collection name, an int32
! stored in the upsert? slot, the selector document and the object
! document.
M: mdb-update-op (read-message) ( msg-stub opcode -- message )
    drop
    mdb-update-msg new swap copy-header
    read-cstring >>collection
    read-int32 >>upsert?
    H{ } stream>assoc change-bytes-read >>selector
    H{ } stream>assoc change-bytes-read >>object ;
114
! Reads the body of a reply message: cursor, start index, the number
! of returned documents, and then that many documents. The byte
! count reported by stream>assoc is discarded here.
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
    drop
    <mdb-reply-msg> swap copy-header
    read-longlong >>cursor
    read-int32 >>start#
    read-int32 dup [ >>returned# ] dip          ! msg n
    [ H{ } stream>assoc drop ] accumulator      ! msg n quot vector
    [ times ] dip
    >>objects ;
122
! Fills the header slots of message from five consecutive int32
! values on the input stream, in this order: length, req-id, resp-id,
! opcode, flags.
: read-header ( message -- message )
    read-int32 >>length
    read-int32 >>req-id   read-int32 >>resp-id
    read-int32 >>opcode
    read-int32 >>flags ; inline
129
! Writes req-id, resp-id and opcode of message as int32 values, in
! that order. The length and flags slots are not written here.
: write-header ( message -- )
    [ req-id>> write-int32 ]
    [ resp-id>> write-int32 ]
    [ opcode>> write-int32 ] tri ; inline
134
135 PRIVATE>
136
! Reads one complete message from the input stream: resets the byte
! counter, reads the header into a fresh mdb-msg and then dispatches
! on the opcode to read the opcode-specific body.
: read-message ( -- message )
    0 >bytes-read
    mdb-msg new read-header
    dup opcode>> (read-message) ;
142
143 <PRIVATE
144
145 USE: tools.walker
146
! Debug helper: writes array to a binary file under /tmp/mfb/ whose
! name is built from a freshly generated uuid1.
: dump-to-file ( array -- )
    '[ _ write ]
    [ uuid1 "/tmp/mfb/%s.dump" sprintf binary ] dip
    with-file-writer ;
150
! Shared driver for the write-message methods. Inside with-buffer and
! with-length-prefix it writes the message header and then runs quot
! to emit the opcode-specific body; the resulting buffer is written
! to the output stream, which is then flushed.
! NOTE(review): with-buffer / with-length-prefix come from
! bson.writer.private; their exact stack contract (which the `dip`
! below relies on) is not visible here - confirm before changing.
: (write-message) ( message quot -- )
    '[
        [ [ _ write-header ] dip @ ] with-length-prefix
    ] with-buffer
    ! [ dump-to-file ] keep
    write flush ; inline
155
! Builds the hashtable that is sent as the query document. The
! message's query is always stored under "query"; orderby, explain
! and hint are added under "orderby", "$explain" and "$hint" only
! when the corresponding slot is set.
: build-query-object ( query -- selector )
    [let | wrapper [ H{ } clone ] |
        {
            [ orderby>> [ "orderby" wrapper set-at ] when* ]
            [ explain>> [ "$explain" wrapper set-at ] when* ]
            [ hint>> [ "$hint" wrapper set-at ] when* ]
            [ query>> "query" wrapper set-at ]
        } cleave
        wrapper
    ] ;
165
166 PRIVATE>
167
! Writes a query message: flags, collection name, skip count, return
! count, the wrapped query document and, when set, the returnfields
! document.
M: mdb-query-msg write-message ( message -- )
    dup
    '[ _ {
           [ flags>> write-int32 ]
           [ collection>> write-cstring ]
           [ skip#>> write-int32 ]
           [ return#>> write-int32 ]
           [ build-query-object assoc>stream ]
           [ returnfields>> [ assoc>stream ] when* ]
       } cleave
    ] (write-message) ;
178  
! Writes an insert message: flags, collection name, then every
! document in objects.
M: mdb-insert-msg write-message ( message -- )
    dup
    '[ _ {
           [ flags>> write-int32 ]
           [ collection>> write-cstring ]
           [ objects>> [ assoc>stream ] each ]
       } cleave
    ] (write-message) ;
186
! Writes an update message: flags, collection name, the upsert? slot
! as an int32, the selector document and the object document.
M: mdb-update-msg write-message ( message -- )
    dup
    '[ _ {
           [ flags>> write-int32 ]
           [ collection>> write-cstring ]
           [ upsert?>> write-int32 ]
           [ selector>> assoc>stream ]
           [ object>> assoc>stream ]
       } cleave
    ] (write-message) ;
196
! Writes a delete message: flags, collection name, a constant int32
! zero and the selector document.
M: mdb-delete-msg write-message ( message -- )
    dup
    '[ _ {
           [ flags>> write-int32 ]
           [ collection>> write-cstring ]
           [ drop 0 write-int32 ]
           [ selector>> assoc>stream ]
       } cleave
    ] (write-message) ;
205
! Writes a get-more message: flags, collection name, return count
! and the cursor as an 8-byte value.
M: mdb-getmore-msg write-message ( message -- )
    dup
    '[ _ {
           [ flags>> write-int32 ]
           [ collection>> write-cstring ]
           [ return#>> write-int32 ]
           [ cursor>> write-longlong ]
       } cleave
    ] (write-message) ;
214
! Writes a kill-cursors message: flags, the cursor count and then
! every cursor as an 8-byte value.
M: mdb-killcursors-msg write-message ( message -- )
    dup
    '[ _ {
           [ flags>> write-int32 ]
           [ cursors#>> write-int32 ]
           [ cursors>> [ write-longlong ] each ]
       } cleave
    ] (write-message) ;
222