]> gitweb.factorcode.org Git - factor.git/blob - unmaintained/mongodb/gridfs/gridfs/gridfs.factor
tools.test: Make the flag public. Finish porting tester changes to fuzzer.
[factor.git] / unmaintained / mongodb / gridfs / gridfs / gridfs.factor
1 USING: accessors arrays assocs base64 bson.constants
2 byte-arrays byte-vectors calendar combinators
3 combinators.short-circuit destructors formatting fry hashtables
4 io kernel linked-assocs locals math math.parser mongodb.cmd
5 mongodb.connection mongodb.driver mongodb.msg namespaces
6 sequences splitting strings ;
7 FROM: mongodb.driver => update ;
8 IN: mongodb.gridfs
9
10 CONSTANT: default-chunk-size 262144
11
12 TUPLE: gridfs 
13     { bucket string } 
14     { files string }
15     { chunks string } ;
16
17
18 <PRIVATE
19
20 : gridfs> ( -- gridfs )
21     gridfs get ; inline
22
23 : files-collection ( -- str ) gridfs> files>> ; inline
24 : chunks-collection ( -- str ) gridfs> chunks>> ; inline
25
26
27 : init-gridfs ( gridfs -- )
28     chunks>> "ChunkIdx" H{ { "files_id" 1 } { "n" 1 } } 
29     <index-spec> ensure-index ; inline
30
31 PRIVATE>
32
33 : <gridfs> ( bucket -- gridfs )
34     [  ] 
35     [ "files" "%s.%s" sprintf  ] 
36     [ "chunks" "%s.%s" sprintf ] tri
37     gridfs boa [ init-gridfs ] keep ;
38
39 : with-gridfs ( gridfs quot -- * )
40     [ gridfs ] dip with-variable ; inline
41
42 TUPLE: entry 
43     { id oid }
44     { filename string }
45     { content-type string }
46     { length integer }
47     { chunk-size integer }
48     { created timestamp }
49     { aliases array }
50     { metadata hashtable }
51     { md5 string } ;
52
53 <PRIVATE
54
55 : id>base64 ( id -- str )
56     [ a>> >hex ] [ b>> >hex ] bi 
57     2array "#" join >base64 >string ; inline
58
59 : base64>id ( str -- objid )
60     base64> >string "#" split 
61     [ first ] [ second ] bi 
62     [ hex> ] bi@ oid boa ; inline
63     
64 PRIVATE>
65
66 : <entry> ( name content-type -- entry )
67     entry new 
68     swap >>content-type swap >>filename 
69     <oid> >>id 0 >>length default-chunk-size >>chunk-size 
70     now >>created ; inline
71
72 <PRIVATE 
73
74 TUPLE: chunk 
75     { id oid }
76     { fileid oid }
77     { n integer }
78     { data byte-array } ;
79
80 : at> ( assoc key -- value/f )
81     swap at ; inline
82
83 :: >set-at ( assoc value key -- )
84     value key assoc set-at ; inline
85
86 : (update-file) ( entry assoc -- entry )
87     { 
88         [ "_id" at> >>id ]
89         [ "filename" at> >>filename ]
90         [ "contentType" at> >>content-type ]
91         [ "length" at> >>length ]
92         [ "chunkSize" at> >>chunk-size ]
93         [ "uploadDate" at> >>created ]
94         [ "aliases" at> >>aliases ]
95         [ "metadata" at> >>metadata ]
96         [ "md5" at> >>md5 ]
97     } cleave ; inline
98
99 : assoc>chunk ( assoc -- chunk )
100     [ chunk new ] dip
101     {  
102         [ "_id" at> >>id ]
103         [ "files_id" at> >>fileid ]
104         [ "n" at> >>n ]
105         [ "data" at> >>data ]
106     } cleave ;
107
108 : assoc>entry ( assoc -- entry )
109     [ entry new ] dip (update-file) ;
110     
111 : entry>assoc ( entry -- assoc )
112     [ H{  } clone ] dip
113     {
114         [ id>> "_id" >set-at ]
115         [ filename>> "filename" >set-at ]
116         [ content-type>> "contentType" >set-at ]
117         [ length>> "length" >set-at ]
118         [ chunk-size>> "chunkSize" >set-at ]
119         [ created>> "uploadDate" >set-at ]
120         [ aliases>> "aliases" >set-at ]
121         [ metadata>> "metadata" >set-at ]
122         [ md5>> "md5" >set-at ]
123         [ drop ]
124     } 2cleave ; inline
125
126 : create-entry ( entry -- entry )
127     [ [ files-collection ] dip entry>assoc save ] [ ] bi ;
128
129 TUPLE: state bytes count ;
130
131 : <state> ( -- state )
132     0 0 state boa ; inline
133
134 : get-state ( -- n )
135     state get ; inline
136
137 : with-state ( quot -- state )
138     [ <state> state ] dip 
139     [ get-state ] compose 
140     with-variable ; inline
141
142 : update-state ( bytes -- )
143     [ get-state ] dip
144     '[ _ + ] change-bytes 
145     [ 1 + ] change-count drop ; inline
146
147 :: store-chunk ( chunk entry n -- ) 
148     entry id>> :> id
149     H{ { "files_id" id }
150        { "n" n } { "data" chunk } }
151     [ chunks-collection ] dip save ; inline
152
153 :: write-chunks ( stream entry -- length )
154     entry chunk-size>> :> chunk-size
155     [
156         [ 
157             chunk-size stream stream-read dup [
158                 [ entry get-state count>> store-chunk ]
159                 [ length update-state ] bi 
160             ] when*
161         ] loop
162     ] with-state bytes>> ;
163
164 : (entry-selector) ( entry -- selector )
165     id>> "_id" associate ; inline
166
167 :: file-md5 ( id -- md5-str )
168     filemd5-cmd make-cmd
169     id "filemd5" set-cmd-opt
170     gridfs> bucket>> "root" set-cmd-opt
171     send-cmd "md5" at> ; inline
172
173 : update-entry ( bytes entry -- entry )
174     [ swap >>length dup id>> file-md5 >>md5  ]
175     [ nip [ (entry-selector) ] [  ] bi
176         [ length>> "length" associate "$set" associate 
177           [ files-collection ] 2dip <update> update ]
178         [ md5>> "md5" associate "$set" associate 
179           [ files-collection ] 2dip <update> update ] 2bi 
180     ] 2bi ;
181
182 TUPLE: gridfs-input-stream entry chunk n offset cpos ;
183
184 : <gridfs-input-stream> ( entry -- stream )
185     [ gridfs-input-stream new ] dip
186     >>entry 0 >>offset 0 >>cpos -1 >>n ;
187
188 PRIVATE>
189
190 : write-entry ( input-stream entry -- entry )
191     create-entry [ write-chunks ] keep update-entry  ;
192
193 : get-entry ( id -- entry )
194     [ files-collection ] dip
195     "_id" associate <query> find-one assoc>entry ;
196
197 : open-entry ( entry -- input-stream )
198     <gridfs-input-stream> ;
199
200 : entry-contents ( entry -- bytearray )
201     <gridfs-input-stream> stream-contents ;
202
203 <PRIVATE
204
205 : load-chunk ( stream -- chunk/f )
206     [ entry>> id>> "files_id" associate ]
207     [ n>> "n" associate ] bi assoc-union
208     [ chunks-collection ] dip 
209     <query> find-one dup [ assoc>chunk ] when ;
210
211 : exhausted? ( stream -- boolean )
212     [ offset>> ] [ entry>> length>> ] bi = ; inline
213
214 : fresh? ( stream -- boolean )
215     [ offset>> 0 = ] [ chunk>> f = ] bi and ; inline
216
217 : data-available ( stream -- int/f )
218     [ cpos>> ] [ chunk>> data>> length ] bi 
219     2dup < [ swap - ] [ 2drop f ] if ; inline
220
221 : next-chunk ( stream -- available chunk/f )
222     0 >>cpos [ 1 + ] change-n
223     [  ] [ load-chunk ] bi >>chunk
224     [ data-available ] [ chunk>> ] bi ; inline
225
226 : ?chunk ( stream -- available chunk/f )
227     dup fresh? [ next-chunk ] [ 
228         dup exhausted? [ drop 0 f ] [  
229             dup data-available [ swap chunk>> ] [ next-chunk ] if*
230         ] if
231     ] if ; inline
232
233 : set-stream ( n stream -- )
234     swap { 
235         [ >>offset drop ]
236         [ over entry>> chunk-size>> /mod [ >>n ] [ >>cpos ] bi* drop ]
237         [ drop dup load-chunk >>chunk drop ]
238     } 2cleave ; inline
239
240 :: advance-stream ( n stream -- )
241     stream [ n + ] change-cpos [ n + ] change-offset drop ; inline
242
243 : read-part ( n stream chunk -- seq/f )
244     [ [ cpos>> swap [ drop ] [ + ] 2bi ] [ data>> ] bi* <slice> ]
245     [ drop advance-stream ] 3bi ; inline
246
247 :: (stream-read-partial) ( n stream -- seq/f )
248     stream ?chunk :> chunk :> available
249     chunk [
250         n available < 
251         [ n ] [ available ] if 
252         stream chunk read-part 
253     ] [ f ] if ; inline
254
255 :: (stream-read) ( n stream acc -- )
256     n stream (stream-read-partial)
257     {
258         { [ dup not ] [ drop ] }
259         { [ dup length n = ] [ acc push-all ] }
260         { [ dup length n < ] [
261             [ acc push-all ] [ length ] bi
262             n swap - stream acc (stream-read) ]
263         }
264     } cond ; inline recursive 
265
266 PRIVATE>
267
268 M: gridfs-input-stream stream-element-type drop +byte+ ;
269
270 M: gridfs-input-stream stream-read ( n stream -- seq/f )
271     over <byte-vector> [ (stream-read) ] [ ] bi
272     [ f ] [ >byte-array ] if-empty ;
273
274 M: gridfs-input-stream stream-read-partial ( n stream -- seq/f )
275     (stream-read-partial) ;
276
277 M: gridfs-input-stream stream-tell ( stream -- n ) 
278     offset>> ;
279
280 M: gridfs-input-stream stream-seek ( n seek-type stream -- )
281     swap seek-absolute = 
282     [ set-stream ] 
283     [ "seek-type not supported" throw ] if ;
284
285 M: gridfs-input-stream dispose drop ;