Merge branch 'master' of git://factorcode.org/git/factor
commit
d9cda4188a
|
@ -2,13 +2,16 @@ USING: accessors assocs bson.constants byte-arrays byte-vectors fry io
|
|||
io.binary io.encodings.string io.encodings.utf8 kernel math namespaces
|
||||
sequences serialize arrays calendar io.encodings ;
|
||||
|
||||
FROM: kernel.private => declare ;
|
||||
FROM: io.encodings.private => (read-until) ;
|
||||
|
||||
IN: bson.reader
|
||||
|
||||
<PRIVATE
|
||||
|
||||
TUPLE: element { type integer } name ;
|
||||
TUPLE: state
|
||||
{ size initial: -1 } { read initial: 0 } exemplar
|
||||
{ size initial: -1 } exemplar
|
||||
result scope element ;
|
||||
|
||||
: <state> ( exemplar -- state )
|
||||
|
@ -17,25 +20,25 @@ TUPLE: state
|
|||
clone [ >>result ] [ V{ } clone [ push ] keep >>scope ] bi
|
||||
V{ } clone [ T_Object "" element boa swap push ] keep >>element ;
|
||||
|
||||
PREDICATE: bson-eoo < integer T_EOO = ;
|
||||
PREDICATE: bson-not-eoo < integer T_EOO > ;
|
||||
PREDICATE: bson-eoo < integer T_EOO = ;
|
||||
|
||||
PREDICATE: bson-double < integer T_Double = ;
|
||||
PREDICATE: bson-integer < integer T_Integer = ;
|
||||
PREDICATE: bson-string < integer T_String = ;
|
||||
PREDICATE: bson-object < integer T_Object = ;
|
||||
PREDICATE: bson-oid < integer T_OID = ;
|
||||
PREDICATE: bson-array < integer T_Array = ;
|
||||
PREDICATE: bson-integer < integer T_Integer = ;
|
||||
PREDICATE: bson-double < integer T_Double = ;
|
||||
PREDICATE: bson-date < integer T_Date = ;
|
||||
PREDICATE: bson-binary < integer T_Binary = ;
|
||||
PREDICATE: bson-boolean < integer T_Boolean = ;
|
||||
PREDICATE: bson-regexp < integer T_Regexp = ;
|
||||
PREDICATE: bson-null < integer T_NULL = ;
|
||||
PREDICATE: bson-ref < integer T_DBRef = ;
|
||||
PREDICATE: bson-binary-bytes < integer T_Binary_Bytes = ;
|
||||
PREDICATE: bson-binary-function < integer T_Binary_Function = ;
|
||||
PREDICATE: bson-binary-uuid < integer T_Binary_UUID = ;
|
||||
PREDICATE: bson-binary-custom < integer T_Binary_Custom = ;
|
||||
PREDICATE: bson-oid < integer T_OID = ;
|
||||
PREDICATE: bson-boolean < integer T_Boolean = ;
|
||||
PREDICATE: bson-date < integer T_Date = ;
|
||||
PREDICATE: bson-null < integer T_NULL = ;
|
||||
PREDICATE: bson-ref < integer T_DBRef = ;
|
||||
|
||||
GENERIC: element-read ( type -- cont? )
|
||||
GENERIC: element-data-read ( type -- object )
|
||||
|
@ -47,27 +50,27 @@ GENERIC: element-binary-read ( length type -- object )
|
|||
: get-state ( -- state )
|
||||
state get ; inline
|
||||
|
||||
: count-bytes ( count -- )
|
||||
[ get-state ] dip '[ _ + ] change-read drop ; inline
|
||||
|
||||
: read-int32 ( -- int32 )
|
||||
4 [ read byte-array>number ] [ count-bytes ] bi ; inline
|
||||
4 read byte-array>number ; inline
|
||||
|
||||
: read-longlong ( -- longlong )
|
||||
8 [ read byte-array>number ] [ count-bytes ] bi ; inline
|
||||
8 read byte-array>number ; inline
|
||||
|
||||
: read-double ( -- double )
|
||||
8 [ read byte-array>number bits>double ] [ count-bytes ] bi ; inline
|
||||
8 read byte-array>number bits>double ; inline
|
||||
|
||||
: read-byte-raw ( -- byte-raw )
|
||||
1 [ read ] [ count-bytes ] bi ; inline
|
||||
1 read ; inline
|
||||
|
||||
: read-byte ( -- byte )
|
||||
read-byte-raw first ; inline
|
||||
|
||||
: utf8-read-until ( seps stream encoding -- string/f sep/f )
|
||||
[ { utf8 } declare decode-char dup [ dup rot member? ] [ 2drop f t ] if ]
|
||||
3curry (read-until) ;
|
||||
|
||||
: read-cstring ( -- string )
|
||||
input-stream get utf8 <decoder>
|
||||
"\0" swap stream-read-until drop ; inline
|
||||
"\0" input-stream get utf8 utf8-read-until drop ; inline
|
||||
|
||||
: read-sized-string ( length -- string )
|
||||
drop read-cstring ; inline
|
||||
|
@ -141,13 +144,13 @@ M: bson-not-eoo element-read ( type -- cont? )
|
|||
M: bson-object element-data-read ( type -- object )
|
||||
(object-data-read) ;
|
||||
|
||||
M: bson-array element-data-read ( type -- object )
|
||||
(object-data-read) ;
|
||||
|
||||
M: bson-string element-data-read ( type -- object )
|
||||
drop
|
||||
read-int32 read-sized-string ;
|
||||
|
||||
M: bson-array element-data-read ( type -- object )
|
||||
(object-data-read) ;
|
||||
|
||||
M: bson-integer element-data-read ( type -- object )
|
||||
drop
|
||||
read-int32 ;
|
||||
|
@ -191,7 +194,7 @@ PRIVATE>
|
|||
|
||||
USE: tools.continuations
|
||||
|
||||
: stream>assoc ( exemplar -- assoc bytes-read )
|
||||
: stream>assoc ( exemplar -- assoc )
|
||||
<state> dup state
|
||||
[ read-int32 >>size read-elements ] with-variable
|
||||
[ result>> ] [ read>> ] bi ;
|
||||
result>> ;
|
||||
|
|
|
@ -6,25 +6,24 @@ io.encodings.utf8 io.streams.byte-array kernel math math.parser
|
|||
namespaces quotations sequences sequences.private serialize strings
|
||||
words combinators.short-circuit literals ;
|
||||
|
||||
FROM: io.encodings.utf8.private => char>utf8 ;
|
||||
FROM: kernel.private => declare ;
|
||||
|
||||
IN: bson.writer
|
||||
|
||||
<PRIVATE
|
||||
|
||||
SYMBOL: shared-buffer
|
||||
|
||||
CONSTANT: CHAR-SIZE 1
|
||||
CONSTANT: INT32-SIZE 4
|
||||
CONSTANT: CHAR-SIZE 1
|
||||
CONSTANT: INT64-SIZE 8
|
||||
|
||||
: (buffer) ( -- buffer )
|
||||
shared-buffer get
|
||||
[ 8192 <byte-vector> [ shared-buffer set ] keep ] unless* ; inline
|
||||
|
||||
: >le-stream ( x n -- )
|
||||
swap
|
||||
'[ _ swap nth-byte 0 B{ 0 }
|
||||
[ set-nth-unsafe ] keep write ] each ; inline
|
||||
|
||||
[ BV{ } clone [ shared-buffer set ] keep ] unless*
|
||||
{ byte-vector } declare ; inline
|
||||
|
||||
PRIVATE>
|
||||
|
||||
: reset-buffer ( buffer -- )
|
||||
|
@ -33,40 +32,38 @@ PRIVATE>
|
|||
: ensure-buffer ( -- )
|
||||
(buffer) drop ; inline
|
||||
|
||||
: with-buffer ( quot -- byte-vector )
|
||||
: with-buffer ( quot: ( -- ) -- byte-vector )
|
||||
[ (buffer) [ reset-buffer ] keep dup ] dip
|
||||
with-output-stream* dup encoder? [ stream>> ] when ; inline
|
||||
with-output-stream* ; inline
|
||||
|
||||
: with-length ( quot: ( -- ) -- bytes-written start-index )
|
||||
[ (buffer) [ length ] keep ] dip call
|
||||
length swap [ - ] keep ; inline
|
||||
[ (buffer) [ length ] keep ] dip
|
||||
call length swap [ - ] keep ; inline
|
||||
|
||||
: (with-length-prefix) ( quot: ( -- ) length-quot: ( bytes-written -- length ) -- )
|
||||
[ [ B{ 0 0 0 0 } write ] prepose with-length ] dip swap
|
||||
[ call ] dip (buffer) copy ; inline
|
||||
|
||||
: with-length-prefix ( quot: ( -- ) -- )
|
||||
[ B{ 0 0 0 0 } write ] prepose with-length
|
||||
[ INT32-SIZE >le ] dip (buffer)
|
||||
'[ _ over [ nth-unsafe ] dip _ + _ set-nth-unsafe ]
|
||||
[ INT32-SIZE ] dip each-integer ; inline
|
||||
|
||||
[ INT32-SIZE >le ] (with-length-prefix) ; inline
|
||||
|
||||
: with-length-prefix-excl ( quot: ( -- ) -- )
|
||||
[ B{ 0 0 0 0 } write ] prepose with-length
|
||||
[ INT32-SIZE - INT32-SIZE >le ] dip (buffer)
|
||||
'[ _ over [ nth-unsafe ] dip _ + _ set-nth-unsafe ]
|
||||
[ INT32-SIZE ] dip each-integer ; inline
|
||||
[ INT32-SIZE [ - ] keep >le ] (with-length-prefix) ; inline
|
||||
|
||||
<PRIVATE
|
||||
|
||||
GENERIC: bson-type? ( obj -- type ) foldable flushable
|
||||
GENERIC: bson-write ( obj -- )
|
||||
GENERIC: bson-type? ( obj -- type )
|
||||
GENERIC: bson-write ( obj -- )
|
||||
|
||||
M: t bson-type? ( boolean -- type ) drop T_Boolean ;
|
||||
M: f bson-type? ( boolean -- type ) drop T_Boolean ;
|
||||
|
||||
M: real bson-type? ( real -- type ) drop T_Double ;
|
||||
M: tuple bson-type? ( tuple -- type ) drop T_Object ;
|
||||
M: sequence bson-type? ( seq -- type ) drop T_Array ;
|
||||
M: string bson-type? ( string -- type ) drop T_String ;
|
||||
M: integer bson-type? ( integer -- type ) drop T_Integer ;
|
||||
M: assoc bson-type? ( assoc -- type ) drop T_Object ;
|
||||
M: real bson-type? ( real -- type ) drop T_Double ;
|
||||
M: tuple bson-type? ( tuple -- type ) drop T_Object ;
|
||||
M: sequence bson-type? ( seq -- type ) drop T_Array ;
|
||||
M: timestamp bson-type? ( timestamp -- type ) drop T_Date ;
|
||||
M: mdbregexp bson-type? ( regexp -- type ) drop T_Regexp ;
|
||||
|
||||
|
@ -76,28 +73,27 @@ M: word bson-type? ( word -- type ) drop T_Binary ;
|
|||
M: quotation bson-type? ( quotation -- type ) drop T_Binary ;
|
||||
M: byte-array bson-type? ( byte-array -- type ) drop T_Binary ;
|
||||
|
||||
: write-utf8-string ( string -- )
|
||||
output-stream get utf8 <encoder> stream-write ; inline
|
||||
: write-utf8-string ( string -- ) output-stream get '[ _ swap char>utf8 ] each ; inline
|
||||
|
||||
: write-byte ( byte -- ) CHAR-SIZE >le-stream ; inline
|
||||
: write-int32 ( int -- ) INT32-SIZE >le-stream ; inline
|
||||
: write-double ( real -- ) double>bits INT64-SIZE >le-stream ; inline
|
||||
: write-byte ( byte -- ) CHAR-SIZE >le write ; inline
|
||||
: write-int32 ( int -- ) INT32-SIZE >le write ; inline
|
||||
: write-double ( real -- ) double>bits INT64-SIZE >le write ; inline
|
||||
: write-cstring ( string -- ) write-utf8-string 0 write-byte ; inline
|
||||
: write-longlong ( object -- ) INT64-SIZE >le-stream ; inline
|
||||
: write-longlong ( object -- ) INT64-SIZE >le write ; inline
|
||||
|
||||
: write-eoo ( -- ) T_EOO write-byte ; inline
|
||||
: write-type ( obj -- obj ) [ bson-type? write-byte ] keep ; inline
|
||||
: write-pair ( name object -- ) write-type [ write-cstring ] dip bson-write ; inline
|
||||
|
||||
M: string bson-write ( obj -- )
|
||||
'[ _ write-cstring ] with-length-prefix-excl ;
|
||||
|
||||
M: f bson-write ( f -- )
|
||||
drop 0 write-byte ;
|
||||
|
||||
M: t bson-write ( t -- )
|
||||
drop 1 write-byte ;
|
||||
|
||||
M: string bson-write ( obj -- )
|
||||
'[ _ write-cstring ] with-length-prefix-excl ;
|
||||
|
||||
M: integer bson-write ( num -- )
|
||||
write-int32 ;
|
||||
|
||||
|
@ -153,8 +149,8 @@ PRIVATE>
|
|||
[ '[ _ bson-write ] with-buffer ] with-scope ; inline
|
||||
|
||||
: assoc>stream ( assoc -- )
|
||||
bson-write ; inline
|
||||
{ assoc } declare bson-write ; inline
|
||||
|
||||
: mdb-special-value? ( value -- ? )
|
||||
{ [ timestamp? ] [ quotation? ] [ mdbregexp? ]
|
||||
[ oid? ] [ byte-array? ] } 1|| ;
|
||||
[ oid? ] [ byte-array? ] } 1|| ; inline
|
|
@ -2,6 +2,7 @@ USING: calendar math fry kernel assocs math.ranges bson.reader io.streams.byte-a
|
|||
sequences formatting combinators namespaces io tools.time prettyprint io.encodings.binary
|
||||
accessors words mongodb.driver strings math.parser bson.writer ;
|
||||
FROM: mongodb.driver => find ;
|
||||
FROM: memory => gc ;
|
||||
|
||||
IN: mongodb.benchmark
|
||||
|
||||
|
@ -175,7 +176,7 @@ CONSTANT: DOC-LARGE H{ { "base_url" "http://www.example.com/test-me" }
|
|||
|
||||
: deserialize ( doc-quot: ( i -- doc ) -- quot: ( -- ) )
|
||||
[ 0 ] dip call( i -- doc ) assoc>bv
|
||||
'[ trial-size [ _ binary [ H{ } stream>assoc 2drop ] with-byte-reader ] times ] ;
|
||||
'[ trial-size [ _ binary [ H{ } stream>assoc drop ] with-byte-reader ] times ] ;
|
||||
|
||||
: check-for-key ( assoc key -- )
|
||||
CHECK-KEY [ swap key? [ "ups... where's the key" throw ] unless ] [ 2drop ] if ;
|
||||
|
@ -246,7 +247,7 @@ CONSTANT: DOC-LARGE H{ { "base_url" "http://www.example.com/test-me" }
|
|||
: [bench-quot] ( feat-seq op-word -- quot: ( doc-word -- ) )
|
||||
'[ _ swap _
|
||||
'[ [ [ _ execute( -- quot ) ] dip
|
||||
[ execute( -- ) ] each _ execute( quot -- quot ) benchmark ] with-result ] each
|
||||
[ execute( -- ) ] each _ execute( quot -- quot ) gc benchmark ] with-result ] each
|
||||
print-separator ] ;
|
||||
|
||||
: run-serialization-bench ( doc-word-seq feat-seq -- )
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
Sascha Matzke
|
|
@ -1,101 +0,0 @@
|
|||
USING: accessors fry io io.encodings.binary io.servers.connection
|
||||
io.sockets io.streams.byte-array kernel math mongodb.msg classes formatting
|
||||
namespaces prettyprint tools.walker calendar calendar.format bson.writer.private
|
||||
json.writer mongodb.operations.private mongodb.operations ;
|
||||
|
||||
IN: mongodb.mmm
|
||||
|
||||
SYMBOLS: mmm-port mmm-server-ip mmm-server-port mmm-server mmm-dump-output mmm-t-srv ;
|
||||
|
||||
GENERIC: dump-message ( message -- )
|
||||
|
||||
: check-options ( -- )
|
||||
mmm-port get [ 27040 mmm-port set ] unless
|
||||
mmm-server-ip get [ "127.0.0.1" mmm-server-ip set ] unless
|
||||
mmm-server-port get [ 27017 mmm-server-port set ] unless
|
||||
mmm-server-ip get mmm-server-port get <inet> mmm-server set ;
|
||||
|
||||
: read-msg-binary ( -- )
|
||||
read-int32
|
||||
[ write-int32 ] keep
|
||||
4 - read write ;
|
||||
|
||||
: read-request-header ( -- msg-stub )
|
||||
mdb-msg new
|
||||
read-int32 MSG-HEADER-SIZE - >>length
|
||||
read-int32 >>req-id
|
||||
read-int32 >>resp-id
|
||||
read-int32 >>opcode ;
|
||||
|
||||
: read-request ( -- msg-stub binary )
|
||||
binary [ read-msg-binary ] with-byte-writer
|
||||
[ binary [ read-request-header ] with-byte-reader ] keep ; ! msg-stub binary
|
||||
|
||||
: dump-request ( msg-stub binary -- )
|
||||
[ mmm-dump-output get ] 2dip
|
||||
'[ _ drop _ binary [ read-message dump-message ] with-byte-reader ] with-output-stream ;
|
||||
|
||||
: read-reply ( -- binary )
|
||||
binary [ read-msg-binary ] with-byte-writer ;
|
||||
|
||||
: forward-request-read-reply ( msg-stub binary -- binary )
|
||||
[ mmm-server get binary ] 2dip
|
||||
'[ _ opcode>> _ write flush
|
||||
OP_Query =
|
||||
[ read-reply ]
|
||||
[ f ] if ] with-client ;
|
||||
|
||||
: dump-reply ( binary -- )
|
||||
[ mmm-dump-output get ] dip
|
||||
'[ _ binary [ read-message dump-message ] with-byte-reader ] with-output-stream ;
|
||||
|
||||
: message-prefix ( message -- prefix message )
|
||||
[ now timestamp>http-string ] dip
|
||||
[ class name>> ] keep
|
||||
[ "%s: %s" sprintf ] dip ; inline
|
||||
|
||||
M: mdb-query-msg dump-message ( message -- )
|
||||
message-prefix
|
||||
[ collection>> ] keep
|
||||
query>> >json
|
||||
"%s -> %s: %s \n" printf ;
|
||||
|
||||
M: mdb-insert-msg dump-message ( message -- )
|
||||
message-prefix
|
||||
[ collection>> ] keep
|
||||
objects>> >json
|
||||
"%s -> %s : %s \n" printf ;
|
||||
|
||||
M: mdb-reply-msg dump-message ( message -- )
|
||||
message-prefix
|
||||
[ cursor>> ] keep
|
||||
[ start#>> ] keep
|
||||
[ returned#>> ] keep
|
||||
objects>> >json
|
||||
"%s -> cursor: %d, start: %d, returned#: %d, -> %s \n" printf ;
|
||||
|
||||
M: mdb-msg dump-message ( message -- )
|
||||
message-prefix drop "%s \n" printf ;
|
||||
|
||||
: forward-reply ( binary -- )
|
||||
write flush ;
|
||||
|
||||
: handle-mmm-connection ( -- )
|
||||
read-request
|
||||
[ dump-request ] 2keep
|
||||
forward-request-read-reply
|
||||
[ dump-reply ] keep
|
||||
forward-reply ;
|
||||
|
||||
: start-mmm-server ( -- )
|
||||
output-stream get mmm-dump-output set
|
||||
binary <threaded-server> [ mmm-t-srv set ] keep
|
||||
"127.0.0.1" mmm-port get <inet4> >>insecure
|
||||
[ handle-mmm-connection ] >>handler
|
||||
start-server* ;
|
||||
|
||||
: run-mmm ( -- )
|
||||
check-options
|
||||
start-mmm-server ;
|
||||
|
||||
MAIN: run-mmm
|
|
@ -1 +0,0 @@
|
|||
mongo-message-monitor - a small proxy to introspect messages send to MongoDB
|
|
@ -64,61 +64,13 @@ GENERIC: (read-message) ( message opcode -- message )
|
|||
[ opcode>> ] keep [ >>opcode ] dip
|
||||
flags>> >>flags ;
|
||||
|
||||
M: mdb-query-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ mdb-query-msg new ] dip copy-header
|
||||
read-cstring >>collection
|
||||
read-int32 >>skip#
|
||||
read-int32 >>return#
|
||||
H{ } stream>assoc change-bytes-read >>query
|
||||
dup length>> bytes-read> >
|
||||
[ H{ } stream>assoc change-bytes-read >>returnfields ] when ;
|
||||
|
||||
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ mdb-insert-msg new ] dip copy-header
|
||||
read-cstring >>collection
|
||||
V{ } clone >>objects
|
||||
[ '[ _ length>> bytes-read> > ] ] keep tuck
|
||||
'[ H{ } stream>assoc change-bytes-read _ objects>> push ]
|
||||
while ;
|
||||
|
||||
M: mdb-delete-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ mdb-delete-msg new ] dip copy-header
|
||||
read-cstring >>collection
|
||||
H{ } stream>assoc change-bytes-read >>selector ;
|
||||
|
||||
M: mdb-getmore-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ mdb-getmore-msg new ] dip copy-header
|
||||
read-cstring >>collection
|
||||
read-int32 >>return#
|
||||
read-longlong >>cursor ;
|
||||
|
||||
M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ mdb-killcursors-msg new ] dip copy-header
|
||||
read-int32 >>cursors#
|
||||
V{ } clone >>cursors
|
||||
[ [ cursors#>> ] keep
|
||||
'[ read-longlong _ cursors>> push ] times ] keep ;
|
||||
|
||||
M: mdb-update-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ mdb-update-msg new ] dip copy-header
|
||||
read-cstring >>collection
|
||||
read-int32 >>upsert?
|
||||
H{ } stream>assoc change-bytes-read >>selector
|
||||
H{ } stream>assoc change-bytes-read >>object ;
|
||||
|
||||
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
|
||||
drop
|
||||
[ <mdb-reply-msg> ] dip copy-header
|
||||
read-longlong >>cursor
|
||||
read-int32 >>start#
|
||||
read-int32 [ >>returned# ] keep
|
||||
[ H{ } stream>assoc drop ] accumulator [ times ] dip >>objects ;
|
||||
[ H{ } stream>assoc ] accumulator [ times ] dip >>objects ;
|
||||
|
||||
: read-header ( message -- message )
|
||||
read-int32 >>length
|
||||
|
|
Loading…
Reference in New Issue