fixed reading of multiple bson objects in one message (tracking bytes read and comparing with overall message size)
parent
399838b960
commit
e5ba1d2509
|
@ -1,6 +1,6 @@
|
||||||
USING: io io.encodings.utf8 io.encodings.binary alien.c-types alien.strings math
|
USING: io io.encodings.utf8 io.encodings.binary alien.c-types alien.strings math
|
||||||
bson.writer sequences kernel accessors io.streams.byte-array fry generalizations
|
bson.writer sequences kernel accessors io.streams.byte-array fry generalizations
|
||||||
combinators bson.reader sequences tools.walker assocs strings linked-assocs ;
|
combinators bson.reader sequences tools.walker assocs strings linked-assocs namespaces ;
|
||||||
|
|
||||||
IN: mongodb.msg
|
IN: mongodb.msg
|
||||||
|
|
||||||
|
@ -81,15 +81,26 @@ GENERIC: write-message ( message -- )
|
||||||
|
|
||||||
CONSTANT: MSG-HEADER-SIZE 16
|
CONSTANT: MSG-HEADER-SIZE 16
|
||||||
|
|
||||||
|
SYMBOL: msg-bytes-read
|
||||||
|
|
||||||
|
: bytes-read> ( -- integer )
|
||||||
|
msg-bytes-read get ; inline
|
||||||
|
|
||||||
|
: >bytes-read ( integer -- )
|
||||||
|
msg-bytes-read set ; inline
|
||||||
|
|
||||||
|
: change-bytes-read ( integer -- )
|
||||||
|
bytes-read> [ 0 ] unless* + >bytes-read ; inline
|
||||||
|
|
||||||
: write-byte ( byte -- ) <char> write ; inline
|
: write-byte ( byte -- ) <char> write ; inline
|
||||||
: write-int32 ( int -- ) <int> write ; inline
|
: write-int32 ( int -- ) <int> write ; inline
|
||||||
: write-double ( real -- ) <double> write ; inline
|
: write-double ( real -- ) <double> write ; inline
|
||||||
: write-cstring ( string -- ) utf8 string>alien write ; inline
|
: write-cstring ( string -- ) utf8 string>alien write ; inline
|
||||||
: write-longlong ( object -- ) <longlong> write ; inline
|
: write-longlong ( object -- ) <longlong> write ; inline
|
||||||
|
|
||||||
: read-int32 ( -- int32 ) 4 read *int ; inline
|
: read-int32 ( -- int32 ) 4 [ read *int ] [ change-bytes-read ] bi ; inline
|
||||||
: read-longlong ( -- longlong ) 8 read *longlong ; inline
|
: read-longlong ( -- longlong ) 8 [ read *longlong ] [ change-bytes-read ] bi ; inline
|
||||||
: read-byte-raw ( -- byte-raw ) 1 read ; inline
|
: read-byte-raw ( -- byte-raw ) 1 [ read ] [ change-bytes-read ] bi ; inline
|
||||||
: read-byte ( -- byte ) read-byte-raw *char ; inline
|
: read-byte ( -- byte ) read-byte-raw *char ; inline
|
||||||
|
|
||||||
: (read-cstring) ( acc -- acc )
|
: (read-cstring) ( acc -- acc )
|
||||||
|
@ -117,13 +128,21 @@ M: mdb-query-op (read-message) ( msg-stub opcode -- message )
|
||||||
read-cstring >>collection
|
read-cstring >>collection
|
||||||
read-int32 >>skip#
|
read-int32 >>skip#
|
||||||
read-int32 >>return#
|
read-int32 >>return#
|
||||||
H{ } stream>assoc >>query ;
|
H{ } stream>assoc change-bytes-read >>query ! message length
|
||||||
|
dup length>> bytes-read> >
|
||||||
|
[ H{ } stream>assoc change-bytes-read >>returnfields
|
||||||
|
dup length>> bytes-read> >
|
||||||
|
[ H{ } stream>assoc drop >>orderby ] when
|
||||||
|
] when ;
|
||||||
|
|
||||||
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
|
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
|
||||||
drop
|
drop
|
||||||
[ mdb-insert-msg new ] dip copy-header
|
[ mdb-insert-msg new ] dip copy-header
|
||||||
read-cstring >>collection
|
read-cstring >>collection
|
||||||
H{ } stream>assoc >>objects ;
|
V{ } clone >>objects
|
||||||
|
[ '[ _ length>> bytes-read> > ] ] keep tuck
|
||||||
|
'[ H{ } stream>assoc change-bytes-read _ objects>> push ]
|
||||||
|
[ ] while ;
|
||||||
|
|
||||||
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
|
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
|
||||||
drop
|
drop
|
||||||
|
@ -131,7 +150,7 @@ M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
|
||||||
read-longlong >>cursor
|
read-longlong >>cursor
|
||||||
read-int32 >>start#
|
read-int32 >>start#
|
||||||
read-int32 [ >>returned# ] keep
|
read-int32 [ >>returned# ] keep
|
||||||
[ H{ } stream>assoc ] accumulator [ times ] dip >>objects ;
|
[ H{ } stream>assoc drop ] accumulator [ times ] dip >>objects ;
|
||||||
|
|
||||||
: read-header ( message -- message )
|
: read-header ( message -- message )
|
||||||
read-int32 >>length
|
read-int32 >>length
|
||||||
|
@ -150,6 +169,7 @@ PRIVATE>
|
||||||
|
|
||||||
: read-message ( -- message )
|
: read-message ( -- message )
|
||||||
mdb-msg new
|
mdb-msg new
|
||||||
|
0 >bytes-read
|
||||||
read-header
|
read-header
|
||||||
[ ] [ opcode>> ] bi (read-message) ;
|
[ ] [ opcode>> ] bi (read-message) ;
|
||||||
|
|
||||||
|
@ -173,10 +193,10 @@ M: mdb-query-msg write-message ( message -- )
|
||||||
] (write-message) ;
|
] (write-message) ;
|
||||||
|
|
||||||
M: mdb-insert-msg write-message ( message -- )
|
M: mdb-insert-msg write-message ( message -- )
|
||||||
dup
|
dup
|
||||||
'[ _
|
'[ _
|
||||||
[ 0 write-int32 ] dip
|
[ 0 write-int32 ] dip
|
||||||
[ collection>> write-cstring ] keep
|
[ collection>> write-cstring ] keep
|
||||||
objects>> [ assoc>array write ] each
|
objects>> [ assoc>array write ] each
|
||||||
] (write-message) ;
|
] (write-message) ;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue