]> gitweb.factorcode.org Git - factor.git/blob - extra/mongodb/operations/operations.factor
endian: replaces io.binary and io.binary.fast.
[factor.git] / extra / mongodb / operations / operations.factor
1 USING: accessors assocs bson.reader bson.writer byte-arrays
2 byte-vectors combinators formatting endian fry io
3 io.encodings.private io.encodings.binary io.encodings.string
4 io.encodings.utf8 io.encodings.utf8.private io.files kernel
5 locals math mongodb.msg namespaces sequences uuid
6 bson.writer.private ;
7
8 FROM: mongodb.connection => connection-buffer ;
9 FROM: alien => byte-length ;
10
11 IN: mongodb.operations
12
13 M: byte-vector byte-length length ;
14
15 <PRIVATE
16
17 PREDICATE: mdb-reply-op < integer OP_Reply = ;
18 PREDICATE: mdb-query-op < integer OP_Query = ;
19 PREDICATE: mdb-insert-op < integer OP_Insert = ;
20 PREDICATE: mdb-update-op < integer OP_Update = ;
21 PREDICATE: mdb-delete-op < integer OP_Delete = ;
22 PREDICATE: mdb-getmore-op < integer OP_GetMore = ;
23 PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ;
24
25 CONSTANT: MSG-HEADER-SIZE 16
26
27 SYMBOL: msg-bytes-read
28
29 : bytes-read> ( -- integer )
30     msg-bytes-read get ; inline
31
32 : >bytes-read ( integer -- )
33     msg-bytes-read set ; inline
34
35 : change-bytes-read ( integer -- )
36     bytes-read> [ 0 ] unless* + >bytes-read ; inline
37
38 : read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline
39 : read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline
40 : read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline
41 : read-byte ( -- byte ) read-byte-raw first ; inline
42
43 : copy-header ( message msg-stub -- message )
44     {
45         [ length>> >>length ]
46         [ req-id>> >>req-id ]
47         [ resp-id>> >>resp-id ]
48         [ opcode>> >>opcode ]
49         [ flags>> >>flags ]
50     } cleave ; inline
51
52 : reply-read-message ( msg-stub -- message )
53     [ <mdb-reply-msg> ] dip copy-header
54     read-longlong >>cursor
55     read-int32 >>start#
56     read-int32 [ >>returned# ] keep
57     [ H{ } clone stream>assoc ] collector [ times ] dip >>objects ;
58
59 : (read-message) ( message opcode -- message )
60     OP_Reply =
61     [ reply-read-message ]
62     [ "unknown message type" throw ] if ; inline
63
64 : read-header ( message -- message )
65     read-int32 >>length
66     read-int32 >>req-id
67     read-int32 >>resp-id
68     read-int32 >>opcode
69     read-int32 >>flags ; inline
70
71 : write-header ( message -- )
72     [ req-id>> write-int32 ]
73     [ resp-id>> write-int32 ]
74     [ opcode>> write-int32 ] tri ; inline
75
76 PRIVATE>
77
78 : read-message ( -- message )
79     [
80         mdb-msg new 0 >bytes-read read-header
81         [ ] [ opcode>> ] bi (read-message)
82     ] with-scope ;
83
84 <PRIVATE
85
86 : (write-message) ( message quot -- )
87     [ connection-buffer dup ] 2dip
88     '[
89         [ _ [ write-header ] _ bi ] with-length-prefix
90     ] with-output-stream* write flush ; inline
91
92 :: build-query-object ( query -- selector )
93     H{ } clone :> selector
94     query {
95         [ orderby>> [ "$orderby" selector set-at ] when* ]
96         [ explain>> [ "$explain" selector set-at ] when* ]
97         [ hint>> [ "$hint" selector set-at ] when* ]
98         [ query>> "query" selector set-at ]
99     } cleave selector ; inline
100
101 : write-query-message ( message -- )
102     [
103         {
104             [ flags>> write-int32 ]
105             [ collection>> write-cstring ]
106             [ skip#>> write-int32 ]
107             [ return#>> write-int32 ]
108             [ build-query-object assoc>stream ]
109             [ returnfields>> [ assoc>stream ] when* ]
110         } cleave
111     ] (write-message) ; inline
112
113 : write-insert-message ( message -- )
114     [
115        [ flags>> write-int32 ]
116        [ collection>> write-cstring ]
117        [ objects>> [ assoc>stream ] each ] tri
118     ] (write-message) ; inline
119
120 : write-update-message ( message -- )
121     [
122         {
123             [ flags>> write-int32 ]
124             [ collection>> write-cstring ]
125             [ update-flags>> write-int32 ]
126             [ selector>> assoc>stream ]
127             [ object>> assoc>stream ]
128         } cleave
129     ] (write-message) ; inline
130
131 : write-delete-message ( message -- )
132     [
133         {
134             [ flags>> write-int32 ]
135             [ collection>> write-cstring ]
136             [ delete-flags>> write-int32 ]
137             [ selector>> assoc>stream ]
138         } cleave
139     ] (write-message) ; inline
140
141 : write-getmore-message ( message -- )
142     [
143         {
144            [ flags>> write-int32 ]
145            [ collection>> write-cstring ]
146            [ return#>> write-int32 ]
147            [ cursor>> write-longlong ]
148         } cleave
149     ] (write-message) ; inline
150
151 : write-killcursors-message ( message -- )
152     [
153        [ flags>> write-int32 ]
154        [ cursors#>> write-int32 ]
155        [ cursors>> [ write-longlong ] each ] tri
156     ] (write-message) ; inline
157
158 PRIVATE>
159
160 : write-message ( message -- )
161     {
162         { [ dup mdb-query-msg? ] [ write-query-message ] }
163         { [ dup mdb-insert-msg? ] [ write-insert-message ] }
164         { [ dup mdb-update-msg? ] [ write-update-message ] }
165         { [ dup mdb-delete-msg? ] [ write-delete-message ] }
166         { [ dup mdb-getmore-msg? ] [ write-getmore-message ] }
167         { [ dup mdb-killcursors-msg? ] [ write-killcursors-message ] }
168     } cond ;