286 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Factor
		
	
	
			
		
		
	
	
			286 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Factor
		
	
	
| USING: accessors arrays assocs base64 bson.constants
 | |
| byte-arrays byte-vectors calendar combinators
 | |
| combinators.short-circuit destructors formatting fry hashtables
 | |
| io kernel linked-assocs locals math math.parser mongodb.cmd
 | |
| mongodb.connection mongodb.driver mongodb.msg namespaces
 | |
| sequences splitting strings ;
 | |
| FROM: mongodb.driver => update ;
 | |
| IN: mongodb.gridfs
 | |
| 
 | |
! Chunk size in bytes given to entries built by <entry> (262144 = 256 * 1024).
CONSTANT: default-chunk-size
    262144
 | |
| 
 | |
! One GridFS bucket: the bucket name plus the names of its two collections.
! <gridfs> fills `files` with "<bucket>.files" and `chunks` with
! "<bucket>.chunks".
TUPLE: gridfs
    { bucket string }
    { files string }
    { chunks string } ;
 | |
| 
 | |
| 
 | |
| <PRIVATE
 | |
| 
 | |
! Fetch the value currently bound to the `gridfs` dynamic variable
! (see with-gridfs for how it gets bound).
: gridfs> ( -- gridfs ) gridfs get ; inline
 | |
| 
 | |
! Name of the files collection of the currently bound gridfs.
: files-collection ( -- str )
    gridfs> files>> ; inline
 | |
! Name of the chunks collection of the currently bound gridfs.
: chunks-collection ( -- str )
    gridfs> chunks>> ; inline
 | |
| 
 | |
| 
 | |
! Ensure the chunks collection has an index named "ChunkIdx" whose key
! document lists "files_id" and "n", both with value 1.
: init-gridfs ( gridfs -- )
    chunks>>
    "ChunkIdx"
    H{ { "files_id" 1 } { "n" 1 } }
    <index-spec> ensure-index ; inline
 | |
| 
 | |
| PRIVATE>
 | |
| 
 | |
! Build a gridfs tuple for `bucket`, deriving the collection names
! "<bucket>.files" and "<bucket>.chunks", then run init-gridfs on it.
: <gridfs> ( bucket -- gridfs )
    dup
    [ "files" "%s.%s" sprintf ]
    [ "chunks" "%s.%s" sprintf ] bi
    gridfs boa
    dup init-gridfs ;
 | |
| 
 | |
! Call `quot` with `gridfs` bound to the `gridfs` dynamic variable, so that
! gridfs>, files-collection and chunks-collection resolve to it.
! NOTE(review): the declared output `*` conventionally marks a word that
! never returns; confirm whether a row-polymorphic effect was intended.
: with-gridfs ( gridfs quot -- * )
    gridfs swap with-variable ; inline
 | |
| 
 | |
! In-memory form of one files-collection document.  (update-file) and
! entry>assoc map the slots to the document keys "_id", "filename",
! "contentType", "length", "chunkSize", "uploadDate", "aliases",
! "metadata" and "md5" respectively.
TUPLE: entry
    { id oid }
    { filename string }
    { content-type string }
    { length integer }
    { chunk-size integer }
    { created timestamp }
    { aliases array }
    { metadata hashtable }
    { md5 string } ;
 | |
| 
 | |
| <PRIVATE
 | |
| 
 | |
! Encode an oid as text: hex of its `a` slot, "#", hex of its `b` slot,
! the whole string then base64-encoded.  Inverse of base64>id.
: id>base64 ( id -- str )
    [ a>> ] [ b>> ] bi
    [ >hex ] bi@
    2array "#" join
    >base64 >string ; inline
 | |
| 
 | |
! Decode a string produced by id>base64: base64-decode, split on "#",
! parse both halves as hex and rebuild the oid from them.
: base64>id ( str -- objid )
    base64> >string "#" split
    [ first hex> ] [ second hex> ] bi
    oid boa ; inline
 | |
|     
 | |
| PRIVATE>
 | |
| 
 | |
! Create a new, not yet stored entry: fresh oid, length 0, the default
! chunk size and the current time as creation date.
: <entry> ( name content-type -- entry )
    entry new
    swap >>content-type
    swap >>filename
    <oid> >>id
    0 >>length
    default-chunk-size >>chunk-size
    now >>created ; inline
 | |
| 
 | |
| <PRIVATE 
 | |
| 
 | |
! In-memory form of one chunks-collection document; assoc>chunk fills the
! slots from the keys "_id", "files_id", "n" and "data".
TUPLE: chunk
    { id oid }
    { fileid oid }
    { n integer }
    { data byte-array } ;
 | |
| 
 | |
! `at` with the arguments flipped (assoc first, key second) so it can be
! chained after a tuple under construction.
: at> ( assoc key -- value/f )
    swap at* drop ; inline
 | |
| 
 | |
! `set-at` with the assoc as the first argument: stores `value` under
! `key` in `assoc`.
: >set-at ( assoc value key -- )
    rot set-at ; inline
 | |
| 
 | |
! Copy the fields of a files-collection document (`assoc`) into `entry`.
!
! "_id", "length", "chunkSize" and "uploadDate" are always copied.
! "filename", "contentType", "aliases", "metadata" and "md5" are copied
! only when present: a missing key yields f, and the corresponding entry
! slots are declared with classes (string / array / hashtable) that reject
! f, so storing it unconditionally would throw on documents that omit them.
: (update-file) ( entry assoc -- entry )
    {
        [ "_id" at> >>id ]
        [ "filename" at> [ >>filename ] when* ]
        [ "contentType" at> [ >>content-type ] when* ]
        [ "length" at> >>length ]
        [ "chunkSize" at> >>chunk-size ]
        [ "uploadDate" at> >>created ]
        [ "aliases" at> [ >>aliases ] when* ]
        [ "metadata" at> [ >>metadata ] when* ]
        [ "md5" at> [ >>md5 ] when* ]
    } cleave ; inline
 | |
| 
 | |
! Build a chunk tuple from a chunks-collection document.
: assoc>chunk ( assoc -- chunk )
    chunk new swap
    {
        [ "_id" at> >>id ]
        [ "files_id" at> >>fileid ]
        [ "n" at> >>n ]
        [ "data" at> >>data ]
    } cleave ;
 | |
| 
 | |
! Build an entry tuple from a files-collection document.
: assoc>entry ( assoc -- entry )
    entry new swap (update-file) ;
 | |
|     
 | |
! Serialize an entry into a fresh hashtable keyed the way (update-file)
! reads it back.
: entry>assoc ( entry -- assoc )
    H{ } clone swap
    {
        [ id>> "_id" >set-at ]
        [ filename>> "filename" >set-at ]
        [ content-type>> "contentType" >set-at ]
        [ length>> "length" >set-at ]
        [ chunk-size>> "chunkSize" >set-at ]
        [ created>> "uploadDate" >set-at ]
        [ aliases>> "aliases" >set-at ]
        [ metadata>> "metadata" >set-at ]
        [ md5>> "md5" >set-at ]
        ! Each quotation above consumes both inputs; this last one leaves
        ! the hashtable as the result.
        [ drop ]
    } 2cleave ; inline
 | |
| 
 | |
! Save the entry's document into the files collection and hand the entry
! back to the caller.
: create-entry ( entry -- entry )
    files-collection over entry>assoc save ;
 | |
| 
 | |
! Upload progress kept while write-chunks runs: total bytes stored so far
! and the number of chunks stored so far (see update-state).
TUPLE: state
    bytes
    count ;
 | |
| 
 | |
! A fresh progress state with both counters at zero.
: <state> ( -- state )
    state new
    0 >>bytes
    0 >>count ; inline
 | |
| 
 | |
! Fetch the progress tuple bound to the `state` dynamic variable.
! (The output is the state tuple itself, not a number, so the stack
! effect names it accordingly.)
: get-state ( -- state )
    state get ; inline
 | |
| 
 | |
! Run `quot` with a fresh progress state bound to `state`, then return
! that state (get-state is appended to the quotation so the lookup happens
! while the binding is still in effect).
: with-state ( quot -- state )
    [ get-state ] compose
    <state> state rot
    with-variable ; inline
 | |
| 
 | |
! Record one stored chunk in the current state: add `bytes` to the byte
! total and bump the chunk counter by one.
: update-state ( bytes -- )
    get-state swap
    [ + ] curry change-bytes
    [ 1 + ] change-count
    drop ; inline
 | |
| 
 | |
! Save one chunk document { files_id, n, data } into the chunks collection.
! `chunk` is the chunk's data, `n` its index within the file.
:: store-chunk ( chunk entry n -- )
    entry id>> :> file-id
    H{
        { "files_id" file-id }
        { "n" n }
        { "data" chunk }
    }
    chunks-collection swap save ; inline
 | |
| 
 | |
! Read `stream` in pieces of the entry's chunk size, storing each piece as
! a chunk numbered by the running chunk count, until stream-read returns f.
! Returns the total number of bytes stored.
:: write-chunks ( stream entry -- length )
    entry chunk-size>> :> size
    [
        [
            size stream stream-read
            ! The value read doubles as the loop flag: f ends the loop.
            dup [
                dup
                [ entry get-state count>> store-chunk ]
                [ length update-state ] bi
            ] when
        ] loop
    ] with-state bytes>> ;
 | |
| 
 | |
! Selector document { "_id" => entry id } matching the entry's own
! files-collection document.
: (entry-selector) ( entry -- selector )
    id>>
    "_id" associate ; inline
 | |
| 
 | |
! Send the filemd5 command for file `id`, with the current bucket name as
! its "root" option, and return the "md5" field of the reply.
: file-md5 ( id -- md5-str )
    filemd5-cmd make-cmd
    swap "filemd5" set-cmd-opt
    gridfs> bucket>> "root" set-cmd-opt
    send-cmd
    "md5" at> ; inline
 | |
| 
 | |
! Finish an upload: record the byte count and the md5 reported by file-md5
! on the entry, then send two "$set" updates (length, md5) against the
! entry's document in the files collection.  Returns the updated entry.
: update-entry ( bytes entry -- entry )
    swap >>length
    dup id>> file-md5 >>md5
    ! Leave the entry as the result and feed ( selector entry ) to both
    ! update quotations.
    dup (entry-selector) over
    [
        length>> "length" associate "$set" associate
        [ files-collection ] 2dip <update> update
    ]
    [
        md5>> "md5" associate "$set" associate
        [ files-collection ] 2dip <update> update
    ] 2bi ;
 | |
| 
 | |
! Read cursor over a stored entry.  `chunk` is the currently loaded chunk
! (or f), `n` its index, `offset` the position within the whole file and
! `cpos` the position within the current chunk's data.
TUPLE: gridfs-input-stream
    entry
    chunk
    n
    offset
    cpos ;
 | |
| 
 | |
! New stream positioned at the start of `entry`.  No chunk is loaded yet;
! `n` starts at -1 so that the first next-chunk moves it to chunk 0.
: <gridfs-input-stream> ( entry -- stream )
    gridfs-input-stream new
    swap >>entry
    0 >>offset
    0 >>cpos
    -1 >>n ;
 | |
| 
 | |
| PRIVATE>
 | |
| 
 | |
! Store a file: save the entry document, write the contents of
! `input-stream` as chunks, then update the entry with the resulting
! length and md5.
: write-entry ( input-stream entry -- entry )
    create-entry
    dup [ write-chunks ] dip
    update-entry ;
 | |
| 
 | |
! Look up the files-collection document whose "_id" is `id` and convert it
! to an entry.  Returns f when no such document exists: find-one yields f
! on a miss (load-chunk guards the same way), and passing f on to
! assoc>entry would otherwise fail with an unrelated slot type error.
: get-entry ( id -- entry/f )
    [ files-collection ] dip
    "_id" associate <query> find-one
    dup [ assoc>entry ] when ;
 | |
| 
 | |
! Open a read stream over the stored contents of `entry`.
: open-entry ( entry -- input-stream ) <gridfs-input-stream> ;
 | |
| 
 | |
! Read the complete stored contents of `entry` through a fresh
! gridfs-input-stream.
: entry-contents ( entry -- bytearray )
    <gridfs-input-stream>
    stream-contents ;
 | |
| 
 | |
| <PRIVATE
 | |
| 
 | |
! Fetch the chunk with the stream's current index `n` for the stream's
! entry, querying on { files_id, n }.  Returns f when find-one finds
! nothing.
: load-chunk ( stream -- chunk/f )
    [ entry>> id>> "files_id" associate ]
    [ n>> "n" associate ] bi
    assoc-union
    chunks-collection swap <query> find-one
    [ assoc>chunk ] [ f ] if* ;
 | |
| 
 | |
! True once the stream offset has reached the end of the file.
! `>=` rather than `=`: set-stream stores whatever offset it is given, so
! a seek past the end leaves offset greater than length, and that must
! count as exhausted too.
: exhausted? ( stream -- boolean )
    [ offset>> ] [ entry>> length>> ] bi >= ; inline
 | |
| 
 | |
! True for a stream that is at offset 0 and has no chunk loaded.
: fresh? ( stream -- boolean )
    [ offset>> 0 = ]
    [ chunk>> not ] bi
    and ; inline
 | |
| 
 | |
! Number of unread bytes left in the currently loaded chunk, or f when
! there are none.  A missing chunk (load-chunk returned f) counts as zero
! bytes, so the result is f instead of an error from dereferencing f.
: data-available ( stream -- int/f )
    [ cpos>> ]
    [ chunk>> [ data>> length ] [ 0 ] if* ] bi
    2dup < [ swap - ] [ 2drop f ] if ; inline
 | |
| 
 | |
! Move to the following chunk: reset the in-chunk position, increment the
! chunk index, load that chunk, and report its available byte count
! together with the chunk itself.
: next-chunk ( stream -- available chunk/f )
    0 >>cpos
    [ 1 + ] change-n
    dup load-chunk >>chunk
    [ data-available ] [ chunk>> ] bi ; inline
 | |
| 
 | |
! Produce the chunk to read from next along with its available byte count:
!  - fresh stream: load the first chunk;
!  - exhausted stream: 0 and f;
!  - otherwise: the current chunk if it still has unread data, else the
!    next one.
: ?chunk ( stream -- available chunk/f )
    {
        { [ dup fresh? ] [ next-chunk ] }
        { [ dup exhausted? ] [ drop 0 f ] }
        [ dup data-available [ swap chunk>> ] [ next-chunk ] if* ]
    } cond ; inline
 | |
| 
 | |
! Reposition the stream at absolute offset `n`: store the offset, derive
! the chunk index and in-chunk position from it with /mod by the entry's
! chunk size, then load that chunk.
: set-stream ( n stream -- )
    swap >>offset
    dup [ offset>> ] [ entry>> chunk-size>> ] bi /mod
    [ >>n ] [ >>cpos ] bi*
    ! load-chunk reads the chunk index that was just stored.
    dup load-chunk >>chunk
    drop ; inline
 | |
| 
 | |
! Advance both the in-chunk position and the file offset by `n` bytes.
: advance-stream ( n stream -- )
    swap [ + ] curry
    [ change-cpos ] keep
    change-offset
    drop ; inline
 | |
| 
 | |
! Take a slice of `n` bytes from the chunk's data starting at the stream's
! in-chunk position, then advance the stream by `n`.
: read-part ( n stream chunk -- seq/f )
    [
        ! ( n stream ) becomes ( cpos cpos+n ), ( chunk ) becomes ( data )
        [ cpos>> dup rot + ] [ data>> ] bi*
        <slice>
    ]
    [ drop advance-stream ] 3bi ; inline
 | |
| 
 | |
! Read at most `n` bytes, never crossing a chunk boundary: the amount
! actually read is the smaller of `n` and what ?chunk reports as
! available.  Returns f when ?chunk yields no chunk.
:: (stream-read-partial) ( n stream -- seq/f )
    stream ?chunk :> current :> remaining
    current [
        n remaining 2dup < [ drop ] [ nip ] if
        stream current read-part
    ] [ f ] if ; inline
 | |
| 
 | |
! Append up to `n` bytes from `stream` to `acc`, reading chunk by chunk:
! a short partial read recurses for the remainder, and an f partial read
! stops the process.
:: (stream-read) ( n stream acc -- )
    n stream (stream-read-partial) :> part
    {
        { [ part not ] [ ] }
        { [ part length n = ] [ part acc push-all ] }
        { [ part length n < ] [
            part acc push-all
            n part length - stream acc (stream-read) ]
        }
    } cond ; inline recursive
 | |
| 
 | |
| PRIVATE>
 | |
| 
 | |
! The stream's element type is +byte+.
M: gridfs-input-stream stream-element-type
    drop +byte+ ;
 | |
| 
 | |
! Read up to `n` bytes, gathering them across chunks into a byte-vector.
! Returns a byte-array, or f when nothing was read.
M: gridfs-input-stream stream-read ( n stream -- seq/f )
    over <byte-vector>
    [ (stream-read) ] keep
    [ f ] [ >byte-array ] if-empty ;
 | |
| 
 | |
! Partial read: delegates directly to (stream-read-partial).
M: gridfs-input-stream stream-read-partial ( n stream -- seq/f ) (stream-read-partial) ;
 | |
| 
 | |
! Current position: the stream's `offset` slot.
M: gridfs-input-stream stream-tell ( stream -- n ) offset>> ;
 | |
| 
 | |
! Seek to position `n`.  Only seek-absolute is handled; any other seek
! type throws "seek-type not supported".
M: gridfs-input-stream stream-seek ( n seek-type stream -- )
    swap seek-absolute =
    [ "seek-type not supported" throw ] unless
    set-stream ;
 | |
| 
 | |
! Disposing the stream does nothing beyond dropping it.
M: gridfs-input-stream dispose
    drop ;
 |