]> gitweb.factorcode.org Git - factor.git/blob - extra/mongodb/operations/operations.factor
7aaea3d6af37c48ce00d3dc30a1823f05e012dfb
[factor.git] / extra / mongodb / operations / operations.factor
1 USING: accessors assocs bson.reader bson.writer byte-arrays
2 byte-vectors combinators formatting fry io io.binary io.encodings.private
3 io.encodings.binary io.encodings.string io.encodings.utf8 io.encodings.utf8.private io.files
4 kernel locals math mongodb.msg namespaces sequences uuid bson.writer.private ;
5
6 FROM: mongodb.connection => connection-buffer ;
7 FROM: alien => byte-length ;
8
9 IN: mongodb.operations
10
11 M: byte-vector byte-length length ;
12
13 <PRIVATE
14
15 PREDICATE: mdb-reply-op < integer OP_Reply = ;
16 PREDICATE: mdb-query-op < integer OP_Query = ;
17 PREDICATE: mdb-insert-op < integer OP_Insert = ;
18 PREDICATE: mdb-update-op < integer OP_Update = ;
19 PREDICATE: mdb-delete-op < integer OP_Delete = ;
20 PREDICATE: mdb-getmore-op < integer OP_GetMore = ;
21 PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ;
22
23 CONSTANT: MSG-HEADER-SIZE 16
24
25 SYMBOL: msg-bytes-read
26
27 : bytes-read> ( -- integer )
28     msg-bytes-read get ; inline
29
30 : >bytes-read ( integer -- )
31     msg-bytes-read set ; inline
32
33 : change-bytes-read ( integer -- )
34     bytes-read> [ 0 ] unless* + >bytes-read ; inline
35
36 : read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline
37 : read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline
38 : read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline
39 : read-byte ( -- byte ) read-byte-raw first ; inline
40
41 : copy-header ( message msg-stub -- message )
42     {
43         [ length>> >>length ]
44         [ req-id>> >>req-id ]
45         [ resp-id>> >>resp-id ]
46         [ opcode>> >>opcode ]
47         [ flags>> >>flags ]
48     } cleave ; inline
49
50 : reply-read-message ( msg-stub -- message )
51     [ <mdb-reply-msg> ] dip copy-header
52     read-longlong >>cursor
53     read-int32 >>start#
54     read-int32 [ >>returned# ] keep
55     [ H{ } clone stream>assoc ] collector [ times ] dip >>objects ;
56
57 : (read-message) ( message opcode -- message )
58     OP_Reply =
59     [ reply-read-message ]
60     [ "unknown message type" throw ] if ; inline
61
62 : read-header ( message -- message )
63     read-int32 >>length
64     read-int32 >>req-id
65     read-int32 >>resp-id
66     read-int32 >>opcode
67     read-int32 >>flags ; inline
68
69 : write-header ( message -- )
70     [ req-id>> write-int32 ]
71     [ resp-id>> write-int32 ]
72     [ opcode>> write-int32 ] tri ; inline
73
74 PRIVATE>
75
76 : read-message ( -- message )
77     [
78         mdb-msg new 0 >bytes-read read-header
79         [ ] [ opcode>> ] bi (read-message)
80     ] with-scope ;
81
82 <PRIVATE
83
84 : (write-message) ( message quot -- )
85     [ connection-buffer dup ] 2dip
86     '[
87         [ _ [ write-header ] _ bi ] with-length-prefix
88     ] with-output-stream* write flush ; inline
89
90 :: build-query-object ( query -- selector )
91     H{ } clone :> selector
92     query {
93         [ orderby>> [ "$orderby" selector set-at ] when* ]
94         [ explain>> [ "$explain" selector set-at ] when* ]
95         [ hint>> [ "$hint" selector set-at ] when* ]
96         [ query>> "query" selector set-at ]
97     } cleave selector ; inline
98
99 : write-query-message ( message -- )
100     [
101         {
102             [ flags>> write-int32 ]
103             [ collection>> write-cstring ]
104             [ skip#>> write-int32 ]
105             [ return#>> write-int32 ]
106             [ build-query-object assoc>stream ]
107             [ returnfields>> [ assoc>stream ] when* ]
108         } cleave
109     ] (write-message) ; inline
110
111 : write-insert-message ( message -- )
112     [
113        [ flags>> write-int32 ]
114        [ collection>> write-cstring ]
115        [ objects>> [ assoc>stream ] each ] tri
116     ] (write-message) ; inline
117
118 : write-update-message ( message -- )
119     [
120         {
121             [ flags>> write-int32 ]
122             [ collection>> write-cstring ]
123             [ update-flags>> write-int32 ]
124             [ selector>> assoc>stream ]
125             [ object>> assoc>stream ]
126         } cleave
127     ] (write-message) ; inline
128
129 : write-delete-message ( message -- )
130     [
131         {
132             [ flags>> write-int32 ]
133             [ collection>> write-cstring ]
134             [ delete-flags>> write-int32 ]
135             [ selector>> assoc>stream ]
136         } cleave
137     ] (write-message) ; inline
138
139 : write-getmore-message ( message -- )
140     [
141         {
142            [ flags>> write-int32 ]
143            [ collection>> write-cstring ]
144            [ return#>> write-int32 ]
145            [ cursor>> write-longlong ]
146         } cleave
147     ] (write-message) ; inline
148
149 : write-killcursors-message ( message -- )
150     [
151        [ flags>> write-int32 ]
152        [ cursors#>> write-int32 ]
153        [ cursors>> [ write-longlong ] each ] tri
154     ] (write-message) ; inline
155
156 PRIVATE>
157
158 : write-message ( message -- )
159     {
160         { [ dup mdb-query-msg? ] [ write-query-message ] }
161         { [ dup mdb-insert-msg? ] [ write-insert-message ] }
162         { [ dup mdb-update-msg? ] [ write-update-message ] }
163         { [ dup mdb-delete-msg? ] [ write-delete-message ] }
164         { [ dup mdb-getmore-msg? ] [ write-getmore-message ] }
165         { [ dup mdb-killcursors-msg? ] [ write-killcursors-message ] }
166     } cond ;