! GridFS-style file storage on top of the MongoDB driver.
! A file is one document in the "<bucket>.files" collection plus a
! sequence of numbered chunk documents in "<bucket>.chunks".
USING: accessors arrays assocs base64 bson.constants byte-arrays
byte-vectors calendar combinators combinators.short-circuit
destructors formatting fry hashtables io kernel linked-assocs
locals math math.parser mongodb.cmd mongodb.connection
mongodb.driver mongodb.msg namespaces sequences splitting
strings ;
FROM: mongodb.driver => update ;
IN: mongodb.gridfs

! Chunk size given to new entries: 262144 bytes (256 * 1024).
CONSTANT: default-chunk-size 262144

! A bucket name together with the names of its two collections.
TUPLE: gridfs
    { bucket string }
    { files string }
    { chunks string } ;

<PRIVATE

! The gridfs instance bound by with-gridfs.
: gridfs> ( -- gridfs )
    gridfs get ; inline

: files-collection ( -- str ) gridfs> files>> ; inline

: chunks-collection ( -- str ) gridfs> chunks>> ; inline

! Ensures the "ChunkIdx" index over (files_id, n) exists on the
! chunks collection.
: init-gridfs ( gridfs -- )
    chunks>> "ChunkIdx" H{ { "files_id" 1 } { "n" 1 } }
    <index-spec> ensure-index ; inline

PRIVATE>

! Builds a gridfs for bucket, naming its collections
! "<bucket>.files" and "<bucket>.chunks", and ensures the chunk
! index exists.
: <gridfs> ( bucket -- gridfs )
    [ ]
    [ "files" "%s.%s" sprintf ]
    [ "chunks" "%s.%s" sprintf ] tri
    gridfs boa [ init-gridfs ] keep ;

! Calls quot with the gridfs variable bound to the given instance.
: with-gridfs ( gridfs quot -- * )
    [ gridfs ] dip with-variable ; inline

! In-memory form of a document in the files collection.
TUPLE: entry
    { id oid }
    { filename string }
    { content-type string }
    { length integer }
    { chunk-size integer }
    { created timestamp }
    { aliases array }
    { metadata hashtable }
    { md5 string } ;

<PRIVATE

! Encodes an oid as base64 of "<hex a>#<hex b>".
: id>base64 ( id -- str )
    [ a>> >hex ] [ b>> >hex ] bi
    2array "#" join >base64 >string ; inline

! Inverse of id>base64.
: base64>id ( str -- objid )
    base64> >string "#" split
    [ first ] [ second ] bi
    [ hex> ] bi@ oid boa ; inline

PRIVATE>

! New entry with a fresh oid, zero length, the default chunk size
! and the current time as creation date. Nothing is written to the
! database here.
: <entry> ( name content-type -- entry )
    entry new
    swap >>content-type swap >>filename
    <oid> >>id 0 >>length default-chunk-size >>chunk-size
    now >>created ; inline

<PRIVATE

! In-memory form of a document in the chunks collection. Slots are
! left untyped; they are filled from the document by assoc>chunk.
TUPLE: chunk id fileid n data ;

! at with the arguments swapped, so it can follow an assoc that is
! already on the stack.
: at> ( assoc key -- value/f )
    swap at ; inline

! set-at taking the assoc first.
:: >set-at ( assoc value key -- )
    value key assoc set-at ; inline

! Copies the file-document fields of assoc into entry.
: (update-file) ( entry assoc -- entry )
    {
        [ "_id" at> >>id ]
        [ "filename" at> >>filename ]
        [ "contentType" at> >>content-type ]
        [ "length" at> >>length ]
        [ "chunkSize" at> >>chunk-size ]
        [ "uploadDate" at> >>created ]
        [ "aliases" at> >>aliases ]
        [ "metadata" at> >>metadata ]
        [ "md5" at> >>md5 ]
    } cleave ; inline

: assoc>chunk ( assoc -- chunk )
    [ chunk new ] dip
    {
        [ "_id" at> >>id ]
        [ "files_id" at> >>fileid ]
        [ "n" at> >>n ]
        [ "data" at> >>data ]
    } cleave ;

: assoc>entry ( assoc -- entry )
    [ entry new ] dip (update-file) ;

! Builds the file document for entry. Every quotation receives
! ( assoc entry ); the trailing [ drop ] leaves only the assoc.
: entry>assoc ( entry -- assoc )
    [ H{ } clone ] dip
    {
        [ id>> "_id" >set-at ]
        [ filename>> "filename" >set-at ]
        [ content-type>> "contentType" >set-at ]
        [ length>> "length" >set-at ]
        [ chunk-size>> "chunkSize" >set-at ]
        [ created>> "uploadDate" >set-at ]
        [ aliases>> "aliases" >set-at ]
        [ metadata>> "metadata" >set-at ]
        [ md5>> "md5" >set-at ]
        [ drop ]
    } 2cleave ; inline

! Saves the entry's document into the files collection and returns
! the entry.
: create-entry ( entry -- entry )
    [ [ files-collection ] dip entry>assoc save ] [ ] bi ;

! Running totals while writing chunks: bytes written so far and
! number of chunks stored so far.
TUPLE: state bytes count ;

: <state> ( -- state )
    0 0 state boa ; inline

: get-state ( -- n )
    state get ; inline

! Runs quot with a fresh state bound to the state variable and
! returns that state afterwards.
: with-state ( quot -- state )
    [ <state> state ] dip
    [ get-state ] compose
    with-variable ; inline

! Adds bytes to the byte total and increments the chunk count.
: update-state ( bytes -- )
    [ get-state ] dip
    '[ _ + ] change-bytes
    [ 1 + ] change-count drop ; inline

! Saves one chunk document for entry with sequence number n.
:: store-chunk ( chunk entry n -- )
    entry id>> :> id
    H{ { "files_id" id }
       { "n" n }
       { "data" chunk } }
    [ chunks-collection ] dip save ; inline

! Reads stream in pieces of the entry's chunk size and stores each
! piece as a chunk until stream-read returns f. Returns the total
! number of bytes stored.
:: write-chunks ( stream entry -- length )
    entry chunk-size>> :> chunk-size
    [
        [
            chunk-size stream stream-read dup [
                ! The chunk number is the count of chunks stored
                ! before this one.
                [ entry get-state count>> store-chunk ]
                [ length update-state ] bi
            ] when*
        ] loop
    ] with-state bytes>> ;

: (entry-selector) ( entry -- selector )
    id>> "_id" associate ; inline

! Sends the filemd5 command for the file id against the current
! bucket and returns the "md5" field of the reply.
:: file-md5 ( id -- md5-str )
    filemd5-cmd make-cmd
    id "filemd5" set-cmd-opt
    gridfs> bucket>> "root" set-cmd-opt
    send-cmd "md5" at> ; inline

! Stores the final length and the md5 obtained from file-md5 in the
! entry, then writes each of them to the file document with its own
! $set update.
: update-entry ( bytes entry -- entry )
    [ swap >>length dup id>> file-md5 >>md5 ]
    [
        nip [ (entry-selector) ] [ ] bi
        [
            length>> "length" associate "$set" associate
            [ files-collection ] 2dip <update> update
        ]
        [
            md5>> "md5" associate "$set" associate
            [ files-collection ] 2dip <update> update
        ] 2bi
    ] 2bi ;

! Input stream over the chunks of an entry.
!   chunk  - currently loaded chunk, or f
!   n      - number of the current chunk (-1 before the first load)
!   offset - absolute position within the file
!   cpos   - position within the current chunk's data
TUPLE: gridfs-input-stream entry chunk n offset cpos ;

: <gridfs-input-stream> ( entry -- stream )
    [ gridfs-input-stream new ] dip
    >>entry 0 >>offset 0 >>cpos -1 >>n ;

PRIVATE>

! Creates the file document for entry, stores the contents of
! input-stream as chunks, then records length and md5.
: write-entry ( input-stream entry -- entry )
    create-entry [ write-chunks ] keep update-entry ;

! Looks up the file document with the given _id.
: get-entry ( id -- entry )
    [ files-collection ] dip
    "_id" associate <query> find-one assoc>entry ;

: open-entry ( entry -- input-stream )
    <gridfs-input-stream> ;

: entry-contents ( entry -- bytearray )
    <gridfs-input-stream> stream-contents ;

<PRIVATE

! Fetches the chunk numbered n>> for the stream's entry; f when the
! query finds no document.
: load-chunk ( stream -- chunk/f )
    [ entry>> id>> "files_id" associate ]
    [ n>> "n" associate ] bi assoc-union
    [ chunks-collection ] dip
    <query> find-one dup [ assoc>chunk ] when ;

! True once offset has reached the entry's length.
: exhausted? ( stream -- boolean )
    [ offset>> ] [ entry>> length>> ] bi = ; inline

! True before anything has been read or loaded.
: fresh? ( stream -- boolean )
    [ offset>> 0 = ] [ chunk>> f = ] bi and ; inline

! Number of unread bytes left in the current chunk, or f when there
! are none. Also f when no chunk is loaded (load-chunk may yield f),
! so callers see "nothing available" instead of an error from
! calling data>> on f.
: data-available ( stream -- int/f )
    dup chunk>> [
        [ cpos>> ] [ chunk>> data>> length ] bi
        2dup < [ swap - ] [ 2drop f ] if
    ] [ drop f ] if ; inline

! Advances to the following chunk, loads it and resets cpos.
: next-chunk ( stream -- available chunk/f )
    0 >>cpos [ 1 + ] change-n
    [ ] [ load-chunk ] bi >>chunk
    [ data-available ] [ chunk>> ] bi ; inline

! Returns the chunk to read from and how many bytes it still holds,
! loading the next chunk when the current one is used up. Yields
! 0 f once the stream is exhausted.
: ?chunk ( stream -- available chunk/f )
    dup fresh? [ next-chunk ] [
        dup exhausted? [ drop 0 f ] [
            dup data-available
            [ swap chunk>> ] [ next-chunk ] if*
        ] if
    ] if ; inline

! Positions the stream at absolute offset n: chunk number and
! in-chunk position come from n /mod chunk-size, then that chunk is
! loaded.
: set-stream ( n stream -- )
    swap {
        [ >>offset drop ]
        [ over entry>> chunk-size>> /mod [ >>n ] [ >>cpos ] bi* drop ]
        [ drop dup load-chunk >>chunk drop ]
    } 2cleave ; inline

:: advance-stream ( n stream -- )
    stream [ n + ] change-cpos [ n + ] change-offset drop ; inline

! Slice of n bytes of the chunk's data starting at cpos; advances
! the stream by n afterwards.
: read-part ( n stream chunk -- seq/f )
    [ [ cpos>> swap [ drop ] [ + ] 2bi ] [ data>> ] bi* <slice> ]
    [ drop advance-stream ] 3bi ; inline

! Reads at most n bytes, never crossing a chunk boundary; f when no
! chunk is available.
:: (stream-read-partial) ( n stream -- seq/f )
    stream ?chunk :> chunk :> available
    chunk [
        n available < [ n ] [ available ] if
        stream chunk read-part
    ] [ f ] if ; inline

! Appends up to n bytes to acc, recursing while partial reads come
! back shorter than requested and stopping on f.
:: (stream-read) ( n stream acc -- )
    n stream (stream-read-partial)
    {
        { [ dup not ] [ drop ] }
        { [ dup length n = ] [ acc push-all ] }
        { [ dup length n < ] [
            [ acc push-all ] [ length ] bi
            n swap - stream acc (stream-read)
        ] }
    } cond ; inline recursive

PRIVATE>

M: gridfs-input-stream stream-element-type drop +byte+ ;

! Collects up to n bytes in a byte-vector; f when nothing was read.
M: gridfs-input-stream stream-read ( n stream -- seq/f )
    over <byte-vector> [ (stream-read) ] [ ] bi
    [ f ] [ >byte-array ] if-empty ;

M: gridfs-input-stream stream-read-partial ( n stream -- seq/f )
    (stream-read-partial) ;

M: gridfs-input-stream stream-tell ( stream -- n )
    offset>> ;

! Only seek-absolute is supported; any other seek type throws.
M: gridfs-input-stream stream-seek ( n seek-type stream -- )
    swap seek-absolute =
    [ set-stream ]
    [ "seek-type not supported" throw ] if ;

M: gridfs-input-stream dispose drop ;