1 USING: accessors arrays assocs base64 bson.constants
2 byte-arrays byte-vectors calendar combinators
3 combinators.short-circuit destructors formatting fry hashtables
4 io kernel linked-assocs locals math math.parser mongodb.cmd
5 mongodb.connection mongodb.driver mongodb.msg namespaces
6 sequences splitting strings ;
7 FROM: mongodb.driver => update ;
! Default chunk-size slot value for new entries (assigned in <entry>);
! write-chunks reads this many elements from the input per stored chunk.
! 262144 = 256 * 1024.
CONSTANT: default-chunk-size 262144
20 : gridfs> ( -- gridfs )
! Name of the files collection of the current gridfs (the value pushed by
! gridfs>), e.g. "<bucket>.files" for one built by <gridfs>.
: files-collection ( -- str )
    gridfs> files>> ; inline

! Name of the chunks collection of the current gridfs (the value pushed by
! gridfs>), e.g. "<bucket>.chunks" for one built by <gridfs>.
: chunks-collection ( -- str )
    gridfs> chunks>> ; inline
! Ensure an index named "ChunkIdx" over { files_id n } exists on the
! gridfs' chunks collection.
! NOTE(review): H{ } does not guarantee key order; if the BSON encoder emits
! keys in hashtable order, the compound index could be created as
! { n, files_id } -- confirm against the driver.
:: init-gridfs ( gridfs -- )
    gridfs chunks>>
    "ChunkIdx"
    H{ { "files_id" 1 } { "n" 1 } }
    <index-spec> ensure-index ; inline
33 : <gridfs> ( bucket -- gridfs )
35 [ "files" "%s.%s" sprintf ]
36 [ "chunks" "%s.%s" sprintf ] tri
37 gridfs boa [ init-gridfs ] keep ;
! Call quot with the given gridfs bound to the gridfs variable
! (presumably what gridfs> reads -- its body is not visible here).
! NOTE(review): the declared "-- *" output marks a non-returning word, but
! with-variable returns normally; confirm the intended stack effect.
: with-gridfs ( gridfs quot -- * )
    gridfs swap with-variable ; inline
45 { content-type string }
47 { chunk-size integer }
50 { metadata hashtable }
! Encode an object id as a base64 string: the hex forms of its a and b
! slots are joined with "#" and the result is base64-encoded.
: id>base64 ( id -- str )
    [ a>> ] [ b>> ] bi [ >hex ] bi@
    2array "#" join
    >base64 >string ; inline
! Inverse of id>base64: base64-decode, split on "#", and parse the first
! two parts as hex for the a and b slots of a new oid.
: base64>id ( str -- objid )
    base64> >string
    "#" split first2
    [ hex> ] bi@ oid boa ; inline
66 : <entry> ( name content-type -- entry )
68 swap >>content-type swap >>filename
69 <oid> >>id 0 >>length default-chunk-size >>chunk-size
70 now >>created ; inline
80 : at> ( assoc key -- value/f )
! set-at with the assoc taken first: stores value under key in assoc.
: >set-at ( assoc value key -- )
    rot set-at ; inline
86 : (update-file) ( entry assoc -- entry )
89 [ "filename" at> >>filename ]
90 [ "contentType" at> >>content-type ]
91 [ "length" at> >>length ]
92 [ "chunkSize" at> >>chunk-size ]
93 [ "uploadDate" at> >>created ]
94 [ "aliases" at> >>aliases ]
95 [ "metadata" at> >>metadata ]
99 : assoc>chunk ( assoc -- chunk )
103 [ "files_id" at> >>fileid ]
105 [ "data" at> >>data ]
! Build a fresh entry and fill its slots from a files-collection document
! via (update-file).
: assoc>entry ( assoc -- entry )
    entry new swap (update-file) ;
111 : entry>assoc ( entry -- assoc )
114 [ id>> "_id" >set-at ]
115 [ filename>> "filename" >set-at ]
116 [ content-type>> "contentType" >set-at ]
117 [ length>> "length" >set-at ]
118 [ chunk-size>> "chunkSize" >set-at ]
119 [ created>> "uploadDate" >set-at ]
120 [ aliases>> "aliases" >set-at ]
121 [ metadata>> "metadata" >set-at ]
122 [ md5>> "md5" >set-at ]
! Save the entry's assoc form (entry>assoc) into the files collection and
! leave the entry itself on the stack.
: create-entry ( entry -- entry )
    [ files-collection swap entry>assoc save ] keep ;
! Progress accumulator used while writing chunks: bytes is a running total
! and count is bumped once per update-state call.
TUPLE: state bytes count ;

! A state with both counters at zero.
: <state> ( -- state )
    state new 0 >>bytes 0 >>count ; inline
! Call quot with a fresh <state> bound to the state variable, then call
! get-state inside that binding so the final state is left on the stack.
: with-state ( quot -- state )
    <state> state rot '[ @ get-state ] with-variable ; inline
142 : update-state ( bytes -- )
144 '[ _ + ] change-bytes
145 [ 1 + ] change-count drop ; inline
147 :: store-chunk ( chunk entry n -- )
150 { "n" n } { "data" chunk } }
151 [ chunks-collection ] dip save ; inline
153 :: write-chunks ( stream entry -- length )
154 entry chunk-size>> :> chunk-size
157 chunk-size stream stream-read dup [
158 [ entry get-state count>> store-chunk ]
159 [ length update-state ] bi
162 ] with-state bytes>> ;
! Query selector H{ { "_id" id } } matching the entry's own document.
:: (entry-selector) ( entry -- selector )
    entry id>> "_id" associate ; inline
167 :: file-md5 ( id -- md5-str )
169 id "filemd5" set-cmd-opt
170 gridfs> bucket>> "root" set-cmd-opt
171 send-cmd "md5" at> ; inline
173 : update-entry ( bytes entry -- entry )
174 [ swap >>length dup id>> file-md5 >>md5 ]
175 [ nip [ (entry-selector) ] [ ] bi
176 [ length>> "length" associate "$set" associate
177 [ files-collection ] 2dip <update> update ]
178 [ md5>> "md5" associate "$set" associate
179 [ files-collection ] 2dip <update> update ] 2bi
! Read cursor over one entry: chunk is the currently loaded chunk (or f),
! n its index, offset the absolute read position within the entry, and
! cpos the read position inside chunk.
TUPLE: gridfs-input-stream entry chunk n offset cpos ;

! A cursor at offset 0 with no chunk loaded; n starts at -1 so that the
! first next-chunk (which increments n) loads chunk 0.
: <gridfs-input-stream> ( entry -- stream )
    f -1 0 0 gridfs-input-stream boa ;
! Store the contents of input-stream as the given entry: save the entry
! document, write the chunks, then update the stored length and md5.
: write-entry ( input-stream entry -- entry )
    create-entry dup [ write-chunks ] dip update-entry ;
! Look up the files-collection document whose "_id" is id and convert it
! to an entry. find-one yields f when nothing matches (load-chunk guards
! for the same case); return f then instead of handing f to assoc>entry.
: get-entry ( id -- entry/f )
    [ files-collection ] dip
    "_id" associate <query> find-one
    dup [ assoc>entry ] when ;
! Open a gridfs-input-stream positioned at the start of the entry.
: open-entry ( entry -- input-stream )
    <gridfs-input-stream> ;

! Read the entry's whole content through a fresh input stream.
: entry-contents ( entry -- bytearray )
    open-entry stream-contents ;
! Fetch chunk number n of the stream's entry from the chunks collection,
! matching on files_id and n. Leaves f when find-one finds nothing.
:: load-chunk ( stream -- chunk/f )
    H{ } clone :> selector
    stream entry>> id>> "files_id" selector set-at
    stream n>> "n" selector set-at
    chunks-collection selector <query> find-one
    dup [ assoc>chunk ] when ;
! True when the read offset equals the entry's length.
: exhausted? ( stream -- boolean )
    dup entry>> length>> swap offset>> = ; inline
! True for a stream that has not started reading: offset is 0 and no chunk
! has been loaded yet.
: fresh? ( stream -- boolean )
    { [ offset>> 0 = ] [ chunk>> not ] } 1&& ; inline
! Number of elements left in the current chunk's data after cpos, or f
! when cpos is at or beyond the end of that data.
:: data-available ( stream -- int/f )
    stream cpos>> :> pos
    stream chunk>> data>> length :> len
    pos len < [ len pos - ] [ f ] if ; inline
! Move to the next chunk: reset cpos, increment n and load that chunk into
! the stream. Leaves the amount of data available in it and the chunk.
! When load-chunk finds no chunk (presumably the case for a zero-length
! entry, since write-chunks only stores what stream-read returned), leave
! 0 f -- the same values ?chunk gives for an exhausted stream -- instead of
! calling data-available, which would fail calling data>> on f.
: next-chunk ( stream -- available chunk/f )
    0 >>cpos [ 1 + ] change-n
    dup load-chunk >>chunk
    dup chunk>> [ [ data-available ] [ chunk>> ] bi ] [ drop 0 f ] if ; inline
226 : ?chunk ( stream -- available chunk/f )
227 dup fresh? [ next-chunk ] [
228 dup exhausted? [ drop 0 f ] [
229 dup data-available [ swap chunk>> ] [ next-chunk ] if*
233 : set-stream ( n stream -- )
236 [ over entry>> chunk-size>> /mod [ >>n ] [ >>cpos ] bi* drop ]
237 [ drop dup load-chunk >>chunk drop ]
! Move the read position forward by n: both cpos and offset grow by n.
: advance-stream ( n stream -- )
    swap '[ _ + ] [ change-cpos ] keep change-offset drop ; inline
! Slice n elements of chunk's data starting at the stream's cpos (a view,
! not a copy), then advance the stream by n.
:: read-part ( n stream chunk -- seq/f )
    stream cpos>> :> start
    start dup n + chunk data>> <slice>
    n stream advance-stream ; inline
247 :: (stream-read-partial) ( n stream -- seq/f )
248 stream ?chunk :> chunk :> available
251 [ n ] [ available ] if
252 stream chunk read-part
255 :: (stream-read) ( n stream acc -- )
256 n stream (stream-read-partial)
258 { [ dup not ] [ drop ] }
259 { [ dup length n = ] [ acc push-all ] }
260 { [ dup length n < ] [
261 [ acc push-all ] [ length ] bi
262 n swap - stream acc (stream-read) ]
264 } cond ; inline recursive
! The stream yields bytes.
M: gridfs-input-stream stream-element-type drop +byte+ ;

! Accumulate up to n elements (across chunk boundaries) into a byte-vector
! via (stream-read); answer f when nothing was read.
M: gridfs-input-stream stream-read ( n stream -- seq/f )
    over <byte-vector> [ (stream-read) ] keep
    dup empty? [ drop f ] [ >byte-array ] if ;

! Read from the current chunk only, via (stream-read-partial).
M: gridfs-input-stream stream-read-partial ( n stream -- seq/f )
    (stream-read-partial) ;
277 M: gridfs-input-stream stream-tell ( stream -- n )
280 M: gridfs-input-stream stream-seek ( n seek-type stream -- )
283 [ "seek-type not supported" throw ] if ;
! Nothing to release: the stream's slots (entry chunk n offset cpos) hold
! no handle of their own, so dispose simply drops the stream.
M: gridfs-input-stream dispose drop ;