1 USING: accessors assocs bson.reader bson.writer byte-arrays
2 byte-vectors combinators formatting fry io io.binary io.encodings.private
3 io.encodings.binary io.encodings.string io.encodings.utf8 io.encodings.utf8.private io.files
4 kernel locals math mongodb.msg namespaces sequences uuid bson.writer.private ;
8 M: byte-vector byte-length length ;
10 IN: mongodb.operations
14 PREDICATE: mdb-reply-op < integer OP_Reply = ;
15 PREDICATE: mdb-query-op < integer OP_Query = ;
16 PREDICATE: mdb-insert-op < integer OP_Insert = ;
17 PREDICATE: mdb-update-op < integer OP_Update = ;
18 PREDICATE: mdb-delete-op < integer OP_Delete = ;
19 PREDICATE: mdb-getmore-op < integer OP_GetMore = ;
20 PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ;
24 GENERIC: write-message ( message -- )
28 CONSTANT: MSG-HEADER-SIZE 16
30 SYMBOL: msg-bytes-read
32 : bytes-read> ( -- integer )
33 msg-bytes-read get ; inline
35 : >bytes-read ( integer -- )
36 msg-bytes-read set ; inline
38 : change-bytes-read ( integer -- )
39 bytes-read> [ 0 ] unless* + >bytes-read ; inline
41 : read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline
42 : read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline
43 : read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline
44 : read-byte ( -- byte ) read-byte-raw first ; inline
46 : (read-cstring) ( acc -- )
47 [ read-byte ] dip ! b acc
49 [ 0 = ] dip ! bool acc
50 '[ _ (read-cstring) ] unless ; inline recursive
52 : read-cstring ( -- string )
54 [ (read-cstring) ] keep
56 >byte-array utf8 decode ; inline
58 GENERIC: (read-message) ( message opcode -- message )
60 : copy-header ( message msg-stub -- message )
61 [ length>> ] keep [ >>length ] dip
62 [ req-id>> ] keep [ >>req-id ] dip
63 [ resp-id>> ] keep [ >>resp-id ] dip
64 [ opcode>> ] keep [ >>opcode ] dip
67 M: mdb-query-op (read-message) ( msg-stub opcode -- message )
69 [ mdb-query-msg new ] dip copy-header
70 read-cstring >>collection
73 H{ } stream>assoc change-bytes-read >>query
74 dup length>> bytes-read> >
75 [ H{ } stream>assoc change-bytes-read >>returnfields ] when ;
77 M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
79 [ mdb-insert-msg new ] dip copy-header
80 read-cstring >>collection
82 [ '[ _ length>> bytes-read> > ] ] keep tuck
83 '[ H{ } stream>assoc change-bytes-read _ objects>> push ]
86 M: mdb-delete-op (read-message) ( msg-stub opcode -- message )
88 [ mdb-delete-msg new ] dip copy-header
89 read-cstring >>collection
90 H{ } stream>assoc change-bytes-read >>selector ;
92 M: mdb-getmore-op (read-message) ( msg-stub opcode -- message )
94 [ mdb-getmore-msg new ] dip copy-header
95 read-cstring >>collection
97 read-longlong >>cursor ;
99 M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message )
101 [ mdb-killcursors-msg new ] dip copy-header
102 read-int32 >>cursors#
104 [ [ cursors#>> ] keep
105 '[ read-longlong _ cursors>> push ] times ] keep ;
107 M: mdb-update-op (read-message) ( msg-stub opcode -- message )
109 [ mdb-update-msg new ] dip copy-header
110 read-cstring >>collection
112 H{ } stream>assoc change-bytes-read >>selector
113 H{ } stream>assoc change-bytes-read >>object ;
115 M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
117 [ <mdb-reply-msg> ] dip copy-header
118 read-longlong >>cursor
120 read-int32 [ >>returned# ] keep
121 [ H{ } stream>assoc drop ] accumulator [ times ] dip >>objects ;
123 : read-header ( message -- message )
128 read-int32 >>flags ; inline
130 : write-header ( message -- )
131 [ req-id>> write-int32 ] keep
132 [ resp-id>> write-int32 ] keep
133 opcode>> write-int32 ; inline
137 : read-message ( -- message )
141 [ ] [ opcode>> ] bi (read-message) ;
147 : dump-to-file ( array -- )
148 [ uuid1 "/tmp/mfb/%s.dump" sprintf binary ] dip
149 '[ _ write ] with-file-writer ;
151 : (write-message) ( message quot -- )
152 '[ [ [ _ write-header ] dip _ call ] with-length-prefix ] with-buffer
153 ! [ dump-to-file ] keep
156 : build-query-object ( query -- selector )
157 [let | selector [ H{ } clone ] |
158 { [ orderby>> [ "orderby" selector set-at ] when* ]
159 [ explain>> [ "$explain" selector set-at ] when* ]
160 [ hint>> [ "$hint" selector set-at ] when* ]
161 [ query>> "query" selector set-at ]
168 M: mdb-query-msg write-message ( message -- )
171 [ flags>> write-int32 ] keep
172 [ collection>> write-cstring ] keep
173 [ skip#>> write-int32 ] keep
174 [ return#>> write-int32 ] keep
175 [ build-query-object assoc>stream ] keep
176 returnfields>> [ assoc>stream ] when*
179 M: mdb-insert-msg write-message ( message -- )
182 [ flags>> write-int32 ] keep
183 [ collection>> write-cstring ] keep
184 objects>> [ assoc>stream ] each
187 M: mdb-update-msg write-message ( message -- )
190 [ flags>> write-int32 ] keep
191 [ collection>> write-cstring ] keep
192 [ upsert?>> write-int32 ] keep
193 [ selector>> assoc>stream ] keep
194 object>> assoc>stream
197 M: mdb-delete-msg write-message ( message -- )
200 [ flags>> write-int32 ] keep
201 [ collection>> write-cstring ] keep
203 selector>> assoc>stream
206 M: mdb-getmore-msg write-message ( message -- )
209 [ flags>> write-int32 ] keep
210 [ collection>> write-cstring ] keep
211 [ return#>> write-int32 ] keep
212 cursor>> write-longlong
215 M: mdb-killcursors-msg write-message ( message -- )
218 [ flags>> write-int32 ] keep
219 [ cursors#>> write-int32 ] keep
220 cursors>> [ write-longlong ] each