! factor/extra/mongodb/gridfs/gridfs.factor
USING: accessors arrays assocs base64 bson.constants
byte-arrays byte-vectors calendar combinators
combinators.short-circuit destructors formatting fry hashtables
io kernel linked-assocs locals math math.parser mongodb.cmd
mongodb.connection mongodb.driver mongodb.msg namespaces
sequences splitting strings ;
FROM: mongodb.driver => update ;
IN: mongodb.gridfs
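! GridFS layer for the MongoDB driver: file metadata lives in the
! bucket's "files" collection, contents in fixed-size documents in
! its "chunks" collection.
! 262144 bytes = 256 KiB, the classic GridFS default chunk size.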
CONSTANT: default-chunk-size 262144
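! Names the bucket and the two collection names derived from it.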
TUPLE: gridfs
    { bucket string }
    { files string }
    { chunks string } ;
<PRIVATE
: gridfs> ( -- gridfs )
    gridfs get ; inline
: files-collection ( -- str ) gridfs> files>> ; inline
: chunks-collection ( -- str ) gridfs> chunks>> ; inline
: init-gridfs ( gridfs -- )
    chunks>> "ChunkIdx" H{ { "files_id" 1 } { "n" 1 } }
    <index-spec> ensure-index ; inline
PRIVATE>
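! Builds a gridfs for the given bucket name and ensures the
! compound { files_id, n } index on the chunks collection.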
: <gridfs> ( bucket -- gridfs )
    [ ]
    [ "files" "%s.%s" sprintf ]
    [ "chunks" "%s.%s" sprintf ] tri
    gridfs boa [ init-gridfs ] keep ;
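! Runs quot with the given gridfs as the current instance. Usage
! sketch, assuming mongodb.driver's <mdb> and with-db:
!     "db" "127.0.0.1" 27017 <mdb> [
!         "fs" <gridfs> [ ... ] with-gridfs
!     ] with-db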
: with-gridfs ( gridfs quot -- * )
    [ gridfs ] dip with-variable ; inline
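! Mirrors a GridFS "files" document: _id, filename, contentType,
! length, chunkSize, uploadDate, aliases, metadata, md5.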
TUPLE: entry
    { id oid }
    { filename string }
    { content-type string }
    { length integer }
    { chunk-size integer }
    { created timestamp }
    { aliases array }
    { metadata hashtable }
    { md5 string } ;
<PRIVATE
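! Round-trip an oid through a printable token: both halves as hex,
! joined with "#", then base64-encoded.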
: id>base64 ( id -- str )
    [ a>> >hex ] [ b>> >hex ] bi
    2array "#" join >base64 >string ; inline
: base64>id ( str -- objid )
    base64> >string "#" split
    [ first ] [ second ] bi
    [ hex> ] bi@ oid boa ; inline
PRIVATE>
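! Fresh entry with a new object id, zero length and the default
! chunk size; length and md5 are filled in later by write-entry.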
: <entry> ( name content-type -- entry )
    entry new
    swap >>content-type swap >>filename
    <oid> >>id 0 >>length default-chunk-size >>chunk-size
    now >>created ; inline
<PRIVATE
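! Mirrors a GridFS "chunks" document: files_id ties a chunk to its
! entry, n is the zero-based chunk index.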
TUPLE: chunk
    { id oid }
    { fileid oid }
    { n integer }
    { data byte-array } ;
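! Argument-reordering helpers over assocs, convenient inside the
! cleave/2cleave quotations below.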
: at> ( assoc key -- value/f )
    swap at ; inline
:: >set-at ( assoc value key -- )
    value key assoc set-at ; inline
: (update-file) ( entry assoc -- entry )
    {
        [ "_id" at> >>id ]
        [ "filename" at> >>filename ]
        [ "contentType" at> >>content-type ]
        [ "length" at> >>length ]
        [ "chunkSize" at> >>chunk-size ]
        [ "uploadDate" at> >>created ]
        [ "aliases" at> >>aliases ]
        [ "metadata" at> >>metadata ]
        [ "md5" at> >>md5 ]
    } cleave ; inline
: assoc>chunk ( assoc -- chunk )
    [ chunk new ] dip
    {
        [ "_id" at> >>id ]
        [ "files_id" at> >>fileid ]
        [ "n" at> >>n ]
        [ "data" at> >>data ]
    } cleave ;
: assoc>entry ( assoc -- entry )
    [ entry new ] dip (update-file) ;
: entry>assoc ( entry -- assoc )
    [ H{ } clone ] dip
    {
        [ id>> "_id" >set-at ]
        [ filename>> "filename" >set-at ]
        [ content-type>> "contentType" >set-at ]
        [ length>> "length" >set-at ]
        [ chunk-size>> "chunkSize" >set-at ]
        [ created>> "uploadDate" >set-at ]
        [ aliases>> "aliases" >set-at ]
        [ metadata>> "metadata" >set-at ]
        [ md5>> "md5" >set-at ]
        [ drop ]
    } 2cleave ; inline
: create-entry ( entry -- entry )
    [ [ files-collection ] dip entry>assoc save ] keep ;
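! Upload bookkeeping: total bytes written and number of chunks
! stored so far, kept in a dynamic variable during write-chunks.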
TUPLE: state bytes count ;
: <state> ( -- state )
    0 0 state boa ; inline
: get-state ( -- state )
    state get ; inline
: with-state ( quot -- state )
    [ <state> state ] dip
    [ get-state ] compose
    with-variable ; inline
: update-state ( bytes -- )
    [ get-state ] dip
    '[ _ + ] change-bytes
    [ 1 + ] change-count drop ; inline
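! store-chunk saves one chunk document; write-chunks reads
! chunk-size'd blocks from the input stream until it is drained,
! using the running chunk count as the index n, and returns the
! total number of bytes written.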
:: store-chunk ( chunk entry n -- )
    entry id>> :> id
    H{ { "files_id" id }
       { "n" n } { "data" chunk } }
    [ chunks-collection ] dip save ; inline
:: write-chunks ( stream entry -- length )
    entry chunk-size>> :> chunk-size
    [
        [
            chunk-size stream stream-read dup [
                [ entry get-state count>> store-chunk ]
                [ length update-state ] bi
            ] when*
        ] loop
    ] with-state bytes>> ;
: (entry-selector) ( entry -- selector )
    id>> "_id" associate ; inline
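! Asks the server for the file's checksum via the filemd5 command
! instead of hashing on the client.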
:: file-md5 ( id -- md5-str )
    filemd5-cmd make-cmd
    id "filemd5" set-cmd-opt
    gridfs> bucket>> "root" set-cmd-opt
    send-cmd "md5" at> ; inline
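! After the chunks are written: set length and md5 on the entry
! tuple and push both fields to the files document with $set.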
: update-entry ( bytes entry -- entry )
    [ swap >>length dup id>> file-md5 >>md5 ]
    [ nip [ (entry-selector) ] [ ] bi
      [ length>> "length" associate "$set" associate
        [ files-collection ] 2dip <update> update ]
      [ md5>> "md5" associate "$set" associate
        [ files-collection ] 2dip <update> update ] 2bi
    ] 2bi ;
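! Read-only stream over an entry's chunks: n is the current chunk
! index (-1 before the first load), cpos the position inside that
! chunk's data, offset the absolute position in the file.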
TUPLE: gridfs-input-stream entry chunk n offset cpos ;
: <gridfs-input-stream> ( entry -- stream )
    [ gridfs-input-stream new ] dip
    >>entry 0 >>offset 0 >>cpos -1 >>n ;
PRIVATE>
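! Public API. A round trip, as a sketch (inside a with-db and
! with-gridfs scope; <file-reader> comes from io.files with
! io.encodings.binary):
!     "report.pdf" binary <file-reader>
!     "report.pdf" "application/pdf" <entry>
!     write-entry id>> get-entry entry-contents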
: write-entry ( input-stream entry -- entry )
    create-entry [ write-chunks ] keep update-entry ;
: get-entry ( id -- entry )
    [ files-collection ] dip
    "_id" associate <query> find-one assoc>entry ;
: open-entry ( entry -- input-stream )
    <gridfs-input-stream> ;
: entry-contents ( entry -- bytearray )
    <gridfs-input-stream> stream-contents ;
<PRIVATE
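! Chunks are fetched lazily, one find-one per chunk, keyed by
! files_id and index n.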
: load-chunk ( stream -- chunk/f )
    [ entry>> id>> "files_id" associate ]
    [ n>> "n" associate ] bi assoc-union
    [ chunks-collection ] dip
    <query> find-one dup [ assoc>chunk ] when ;
: exhausted? ( stream -- ? )
    [ offset>> ] [ entry>> length>> ] bi = ; inline
: fresh? ( stream -- ? )
    [ offset>> 0 = ] [ chunk>> f = ] bi and ; inline
: data-available ( stream -- int/f )
    [ cpos>> ] [ chunk>> data>> length ] bi
    2dup < [ swap - ] [ 2drop f ] if ; inline
: next-chunk ( stream -- available chunk/f )
    0 >>cpos [ 1 + ] change-n
    [ ] [ load-chunk ] bi >>chunk
    [ data-available ] [ chunk>> ] bi ; inline
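! Bytes left in the current chunk plus the chunk itself, loading
! the next chunk on demand; 0/f once the entry is consumed. The
! exhausted? guard runs first so a zero-length entry yields 0/f
! instead of trying to load a chunk that was never written.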
: ?chunk ( stream -- available chunk/f )
    dup exhausted? [ drop 0 f ] [
        dup fresh? [ next-chunk ] [
            dup data-available [ swap chunk>> ] [ next-chunk ] if*
        ] if
    ] if ; inline
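! Absolute seek: split the target offset into a chunk index and an
! intra-chunk position, then load that chunk directly.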
: set-stream ( n stream -- )
    swap {
        [ >>offset drop ]
        [ over entry>> chunk-size>> /mod [ >>n ] [ >>cpos ] bi* drop ]
        [ drop dup load-chunk >>chunk drop ]
    } 2cleave ; inline
:: advance-stream ( n stream -- )
    stream [ n + ] change-cpos [ n + ] change-offset drop ; inline
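! Slices up to n bytes out of the current chunk's data and moves
! the stream cursors forward by the same amount.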
: read-part ( n stream chunk -- seq/f )
    [ [ cpos>> swap [ drop ] [ + ] 2bi ] [ data>> ] bi* <slice> ]
    [ drop advance-stream ] 3bi ; inline
:: (stream-read-partial) ( n stream -- seq/f )
    stream ?chunk :> chunk :> available
    chunk [
        n available <
        [ n ] [ available ] if
        stream chunk read-part
    ] [ f ] if ; inline
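! Accumulates partial reads into acc until n bytes have been
! delivered or the stream runs dry.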
:: (stream-read) ( n stream acc -- )
    n stream (stream-read-partial)
    {
        { [ dup not ] [ drop ] }
        { [ dup length n = ] [ acc push-all ] }
        { [ dup length n < ] [
            [ acc push-all ] [ length ] bi
            n swap - stream acc (stream-read) ]
        }
    } cond ; inline recursive
PRIVATE>
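! Stream protocol: byte element type, reads through the chunk
! cursor, tell/seek over offset; only seek-absolute is supported.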
M: gridfs-input-stream stream-element-type drop +byte+ ;
M: gridfs-input-stream stream-read ( n stream -- seq/f )
    over <byte-vector> [ (stream-read) ] [ ] bi
    [ f ] [ >byte-array ] if-empty ;
M: gridfs-input-stream stream-read-partial ( n stream -- seq/f )
    (stream-read-partial) ;
M: gridfs-input-stream stream-tell ( stream -- n )
    offset>> ;
M: gridfs-input-stream stream-seek ( n seek-type stream -- )
    swap seek-absolute =
    [ set-stream ]
    [ "seek-type not supported" throw ] if ;
M: gridfs-input-stream dispose drop ;