223 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Factor
		
	
	
		
		
			
		
	
	
			223 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Factor
		
	
	
|  | USING: accessors assocs bson.reader bson.writer byte-arrays | ||
|  | byte-vectors combinators formatting fry io io.binary io.encodings.private | ||
|  | io.encodings.binary io.encodings.string io.encodings.utf8 io.encodings.utf8.private io.files | ||
|  | kernel locals math mongodb.msg namespaces sequences uuid bson.writer.private ;
 | ||
|  | 
 | ||
|  | IN: alien.c-types | ||
|  | 
 | ||
! Extends the byte-length generic (vocabulary alien.c-types) to
! byte-vectors: the answer is simply the vector's element count.
M: byte-vector byte-length
    length ;
 | ||
|  | 
 | ||
|  | IN: mongodb.operations | ||
|  | 
 | ||
|  | <PRIVATE
 | ||
|  | 
 | ||
! One predicate class per opcode constant. An integer belongs to a
! class when it equals the corresponding OP_* constant; the
! (read-message) methods in this file dispatch on these classes.
PREDICATE: mdb-reply-op < integer
    OP_Reply = ;

PREDICATE: mdb-query-op < integer
    OP_Query = ;

PREDICATE: mdb-insert-op < integer
    OP_Insert = ;

PREDICATE: mdb-update-op < integer
    OP_Update = ;

PREDICATE: mdb-delete-op < integer
    OP_Delete = ;

PREDICATE: mdb-getmore-op < integer
    OP_GetMore = ;

PREDICATE: mdb-killcursors-op < integer
    OP_KillCursors = ;
 | ||
|  | 
 | ||
|  | PRIVATE>
 | ||
|  | 
 | ||
! Generic entry point for serializing a message; takes the message
! and leaves nothing on the stack.
GENERIC: write-message ( message -- )
 | ||
|  | 
 | ||
|  | <PRIVATE
 | ||
|  | 
 | ||
! Nominal header size in bytes.
! NOTE(review): read-header in this file consumes five int32 fields
! (20 bytes, flags included) -- confirm what this constant is meant
! to count; it is not referenced in the code visible here.
CONSTANT: MSG-HEADER-SIZE 16

! Variable holding the running count of bytes consumed while parsing
! the current message.
SYMBOL: msg-bytes-read
|  | 
 | ||
! Fetches the current value of the msg-bytes-read variable.
: bytes-read> ( -- integer )
    msg-bytes-read get ; inline

! Stores a new value into the msg-bytes-read variable.
: >bytes-read ( integer -- )
    msg-bytes-read set ; inline
 | ||
|  | 
 | ||
! Adds `integer` to the msg-bytes-read counter. A counter that has
! not been set yet (f) is treated as 0.
: change-bytes-read ( integer -- )
    msg-bytes-read get 0 or + msg-bytes-read set ; inline
 | ||
|  | 
 | ||
! Reads 4 bytes from the input stream, decodes them little-endian and
! adds 4 to the bytes-read counter.
! NOTE(review): le> is presumably an unsigned decode -- confirm that
! negative field values are not expected here.
: read-int32 ( -- int32 )
    4 read le> 4 change-bytes-read ; inline

! Reads 8 bytes from the input stream, decodes them little-endian and
! adds 8 to the bytes-read counter.
: read-longlong ( -- longlong )
    8 read le> 8 change-bytes-read ; inline
 | ||
! Reads a single byte from the input stream and returns it still
! wrapped in the 1-element sequence produced by `read`; adds 1 to the
! bytes-read counter.
! Deliberately no le> here: converting to an integer at this point
! would make read-byte apply `first` to a number instead of to the
! byte sequence.
: read-byte-raw ( -- byte-raw )
    1 [ read ] [ change-bytes-read ] bi ; inline

! Reads a single byte and returns its integer value.
: read-byte ( -- byte )
    read-byte-raw first ; inline
 | ||
|  | 
 | ||
! Appends bytes from the input stream to `acc` until a 0 byte has
! been read. The terminating 0 is pushed as well.
: (read-cstring) ( acc -- )
    '[ read-byte dup _ push 0 = not ] loop ; inline
 | ||
|  | 
 | ||
! Reads a zero-terminated string from the input stream: collects the
! bytes into a fresh byte-vector, strips the trailing zero byte(s) and
! decodes the remainder as UTF-8.
: read-cstring ( -- string )
    BV{ } clone dup (read-cstring)
    [ zero? ] trim-tail
    >byte-array utf8 decode ; inline
 | ||
|  | 
 | ||
! Reads the body of a message. Takes the message stub and the opcode
! (top of stack) and leaves the resulting message on the stack.
GENERIC: (read-message) ( message opcode -- message )
 | ||
|  | 
 | ||
! Copies the five header slots (length, req-id, resp-id, opcode,
! flags) from msg-stub into message and returns message.
: copy-header ( message msg-stub -- message )
    [ length>> >>length ] keep
    [ req-id>> >>req-id ] keep
    [ resp-id>> >>resp-id ] keep
    [ opcode>> >>opcode ] keep
    flags>> >>flags ;
 | ||
|  | 
 | ||
! Reads the body of a query message: collection name, skip count,
! return count and the query document. A second document is read into
! returnfields only when the header length says bytes remain.
M: mdb-query-op (read-message) ( msg-stub opcode -- message )
    drop mdb-query-msg new swap copy-header
    read-cstring >>collection
    read-int32 >>skip#
    read-int32 >>return#
    H{ } stream>assoc change-bytes-read >>query
    ! Skip the optional document once everything has been consumed.
    dup length>> bytes-read> <=
    [ H{ } stream>assoc change-bytes-read >>returnfields ] unless ;
 | ||
|  | 
 | ||
! Reads the body of an insert message: collection name, then
! documents until the byte counter reaches the header length. Each
! document is pushed onto the objects vector.
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
    drop mdb-insert-msg new swap copy-header
    read-cstring >>collection
    V{ } clone >>objects
    ! Loop condition: bytes are still outstanding for this message.
    dup '[ _ length>> bytes-read> > ]
    ! Loop body: read one document and collect it.
    over '[ H{ } stream>assoc change-bytes-read _ objects>> push ]
    while ;
 | ||
|  | 
 | ||
! Reads the body of a delete message: collection name, one int32 that
! is discarded, then the selector document.
M: mdb-delete-op (read-message) ( msg-stub opcode -- message )
    drop
    [ mdb-delete-msg new ] dip copy-header
    read-cstring >>collection
    ! The delete writer in this file emits an int32 (always 0) between
    ! the collection name and the selector; consume it so the selector
    ! document is parsed from the right offset.
    read-int32 drop
    H{ } stream>assoc change-bytes-read >>selector ;
 | ||
|  | 
 | ||
! Reads the body of a getmore message: collection name, return count
! (int32) and cursor (8 bytes).
M: mdb-getmore-op (read-message) ( msg-stub opcode -- message )
    drop mdb-getmore-msg new swap copy-header
    read-cstring >>collection
    read-int32 >>return#
    read-longlong >>cursor ;
 | ||
|  | 
 | ||
! Reads the body of a killcursors message: the cursor count, then
! that many 8-byte cursor values, collected into the cursors vector.
M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message )
    drop mdb-killcursors-msg new swap copy-header
    read-int32 >>cursors#
    V{ } clone >>cursors
    dup [ cursors#>> ] [ cursors>> ] bi
    '[ read-longlong _ push ] times ;
 | ||
|  | 
 | ||
! Reads the body of an update message: collection name, an int32
! stored in upsert?, then the selector and object documents.
M: mdb-update-op (read-message) ( msg-stub opcode -- message )
    drop mdb-update-msg new swap copy-header
    read-cstring >>collection
    read-int32 >>upsert?
    H{ } stream>assoc change-bytes-read >>selector
    H{ } stream>assoc change-bytes-read >>object ;
 | ||
|  | 
 | ||
! Reads the body of a reply message: cursor (8 bytes), start index,
! number of returned documents, then that many documents collected
! into a fresh vector stored in objects. The per-document byte counts
! are dropped, so the bytes-read counter is not advanced for them.
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
    drop <mdb-reply-msg> swap copy-header
    read-longlong >>cursor
    read-int32 >>start#
    read-int32 [ >>returned# ] keep
    V{ } clone [ '[ H{ } stream>assoc drop _ push ] times ] keep
    >>objects ;
|  | 
 | ||
! Fills the header slots of `message` from the input stream. Five
! int32 values are consumed, in this order: length, req-id, resp-id,
! opcode, flags.
: read-header ( message -- message )
    read-int32 >>length
    read-int32 >>req-id
    read-int32 >>resp-id
    read-int32 >>opcode
    read-int32 >>flags ; inline
 | ||
|  | 
 | ||
! Writes req-id, resp-id and opcode, in that order, as int32 values.
! The length field is not written here.
: write-header ( message -- )
    [ req-id>> ] [ resp-id>> ] [ opcode>> ] tri
    [ write-int32 ] tri@ ; inline
 | ||
|  | 
 | ||
|  | PRIVATE>
 | ||
|  | 
 | ||
! Reads one complete message from the input stream: resets the byte
! counter, reads the header into a fresh mdb-msg, then dispatches on
! the header's opcode to read the body.
: read-message ( -- message )
    0 >bytes-read
    mdb-msg new read-header
    dup opcode>> (read-message) ;
 | ||
|  | 
 | ||
|  | <PRIVATE
 | ||
|  | 
 | ||
|  | USE: tools.walker | ||
|  | 
 | ||
! Debugging helper: writes `array` in binary mode to a file named
! after a fresh uuid1 under /tmp/mfb/.
: dump-to-file ( array -- )
    '[ _ write ]
    [ uuid1 "/tmp/mfb/%s.dump" sprintf binary ] dip
    with-file-writer ;
 | ||
|  | 
 | ||
! Shared tail of every write-message method. Inside with-buffer and
! with-length-prefix it writes the header of `message` and then calls
! `quot`; whatever with-buffer leaves on the stack is written to the
! output stream, which is then flushed.
: (write-message) ( message quot -- )
    '[
        [ [ _ write-header ] dip _ call ] with-length-prefix
    ] with-buffer
    ! Debugging aid, disabled: dump each outgoing message to a file.
    ! [ dump-to-file ] keep
    write flush ; inline
 | ||
|  | 
 | ||
! Builds the hashtable sent as the query document. orderby, explain
! and hint are stored under "orderby", "$explain" and "$hint" only
! when they are set; the query itself is always stored under "query".
: build-query-object ( query -- selector )
    [let | selector [ H{ } clone ] |
        [ orderby>> [ "orderby" selector set-at ] when* ] keep
        [ explain>> [ "$explain" selector set-at ] when* ] keep
        [ hint>> [ "$hint" selector set-at ] when* ] keep
        query>> "query" selector set-at
        selector
    ] ;
|  | 
 | ||
|  | PRIVATE>
 | ||
|  | 
 | ||
! Serializes a query message: flags, collection name, skip count,
! return count, the document from build-query-object and, when set,
! the returnfields document.
M: mdb-query-msg write-message ( message -- )
    dup '[
        _ {
            [ flags>> write-int32 ]
            [ collection>> write-cstring ]
            [ skip#>> write-int32 ]
            [ return#>> write-int32 ]
            [ build-query-object assoc>stream ]
            [ returnfields>> [ assoc>stream ] when* ]
        } cleave
    ] (write-message) ;
 | ||
|  |   | ||
! Serializes an insert message: flags, collection name, then every
! document in objects.
M: mdb-insert-msg write-message ( message -- )
    dup '[
        _
        [ flags>> write-int32 ]
        [ collection>> write-cstring ]
        [ objects>> [ assoc>stream ] each ]
        tri
    ] (write-message) ;
 | ||
|  | 
 | ||
! Serializes an update message: flags, collection name, the upsert?
! value as an int32, then the selector and object documents.
M: mdb-update-msg write-message ( message -- )
    dup '[
        _ {
            [ flags>> write-int32 ]
            [ collection>> write-cstring ]
            [ upsert?>> write-int32 ]
            [ selector>> assoc>stream ]
            [ object>> assoc>stream ]
        } cleave
    ] (write-message) ;
 | ||
|  | 
 | ||
! Serializes a delete message: flags, collection name, a literal 0 as
! an int32, then the selector document.
M: mdb-delete-msg write-message ( message -- )
    dup '[
        _ {
            [ flags>> write-int32 ]
            [ collection>> write-cstring ]
            [ drop 0 write-int32 ]
            [ selector>> assoc>stream ]
        } cleave
    ] (write-message) ;
 | ||
|  | 
 | ||
! Serializes a getmore message: flags, collection name, return count
! and the cursor (written with write-longlong).
M: mdb-getmore-msg write-message ( message -- )
    dup '[
        _ {
            [ flags>> write-int32 ]
            [ collection>> write-cstring ]
            [ return#>> write-int32 ]
            [ cursor>> write-longlong ]
        } cleave
    ] (write-message) ;
 | ||
|  | 
 | ||
! Serializes a killcursors message: flags, the cursor count, then
! each cursor written with write-longlong.
M: mdb-killcursors-msg write-message ( message -- )
    dup '[
        _
        [ flags>> write-int32 ]
        [ cursors#>> write-int32 ]
        [ cursors>> [ write-longlong ] each ]
        tri
    ] (write-message) ;
 | ||
|  | 
 |