added mongo message monitor in factor... fixed some things, unified read/write message

db4
Sascha Matzke 2009-01-28 21:11:45 +01:00
parent ef9971840d
commit fa8aa747b9
6 changed files with 169 additions and 43 deletions

View File

@ -23,7 +23,7 @@ TUPLE: mdb name nodes collections ;
: ismaster-cmd ( node -- result )
binary "admin.$cmd" H{ { "ismaster" 1 } } <mdb-query-one-msg>
'[ _ write-request read-reply ] with-client
'[ _ write-message read-message ] with-client
objects>> first ;
: -push ( seq elt -- )

View File

@ -25,7 +25,6 @@ SYMBOLS: +fieldindex+ +compoundindex+ +deepindex+ ;
"%s-%s-%s-Idx" sprintf ;
: build-index ( element slot -- assoc )
break
swap [ <linked-hash> ] 2dip
[ rest ] keep first ! assoc slot options itype
{ { +fieldindex+ [ drop [ 1 ] dip pick set-at ] }
@ -70,7 +69,7 @@ USE: mongodb.query
: load-indices ( mdb-collection -- indexlist )
[ mdb>> name>> ] dip name>> "%s.%s" sprintf
"ns" H{ } clone [ set-at ] keep [ mdb>> name>> index-ns ] dip <mdb-query-msg>
'[ _ write-request read-reply ]
'[ _ write-message read-message ]
[ mdb>> master>> binary ] dip with-client
objects>> [ [ index new ] dip
[ [ "ns" ] dip at >>ns ]
@ -96,7 +95,7 @@ USE: mongodb.query
dup length 0 >
[ [ mdb>> name>> "%s.system.indexes" sprintf ] dip
<mdb-insert-msg>
[ mdb>> master>> binary ] dip '[ _ write-request ] with-client
[ mdb>> master>> binary ] dip '[ _ write-message ] with-client
]
[ drop ] if ;

90
mongodb/mmm/mmm.factor Normal file
View File

@ -0,0 +1,90 @@
USING: accessors fry io io.encodings.binary io.servers.connection
io.sockets io.streams.byte-array kernel math mongodb.msg classes formatting
mongodb.msg.private namespaces prettyprint tools.walker calendar calendar.format ;
IN: mongodb.mmm
SYMBOLS: mmm-port mmm-server-ip mmm-server-port mmm-server mmm-dump-output mmm-t-srv ;
GENERIC: dump-message ( message -- )
: check-options ( -- )
mmm-port get [ 27040 mmm-port set ] unless
mmm-server-ip get [ "127.0.0.1" mmm-server-ip set ] unless
mmm-server-port get [ 27017 mmm-server-port set ] unless
mmm-server-ip get mmm-server-port get <inet> mmm-server set ;
: read-msg-binary ( -- )
read-int32
[ write-int32 ] keep
4 - read write ;
: read-request-header ( -- msg-stub )
mdb-msg new
read-int32 MSG-HEADER-SIZE - >>length
read-int32 >>req-id
read-int32 >>resp-id
read-int32 >>opcode ;
: read-request ( -- msg-stub binary )
binary [ read-msg-binary ] with-byte-writer
[ binary [ read-request-header ] with-byte-reader ] keep ; ! msg-stub binary
: dump-request ( msg-stub binary -- )
[ mmm-dump-output get ] 2dip
'[ _ drop _ binary [ read-message dump-message ] with-byte-reader ] with-output-stream ;
: read-reply ( -- binary )
binary [ read-msg-binary ] with-byte-writer ;
: forward-request-read-reply ( msg-stub binary -- binary )
[ mmm-server get binary ] 2dip
'[ _ opcode>> _ write flush
OP_Query =
[ read-reply ]
[ f ] if ] with-client ;
: dump-reply ( binary -- )
[ mmm-dump-output get ] dip
'[ _ binary [ read-message dump-message ] with-byte-reader ] with-output-stream ;
: message-prefix ( message -- tst name message )
[ now timestamp>http-string ] dip
[ class name>> ] keep ; inline
M: mdb-query-msg dump-message ( message -- )
message-prefix
collection>>
"%s: %s -> %s \n" printf ;
M: mdb-insert-msg dump-message ( message -- )
message-prefix
collection>>
"%s: %s -> %s \n" printf ;
M: mdb-msg dump-message ( message -- )
message-prefix drop "%s: %s \n" printf ;
: forward-reply ( binary -- )
write flush ;
: handle-mmm-connection ( -- )
read-request
[ dump-request ] 2keep
forward-request-read-reply
[ dump-reply ] keep
forward-reply ;
: start-mmm-server ( -- )
output-stream get mmm-dump-output set
<threaded-server> [ mmm-t-srv set ] keep
"127.0.0.1" mmm-port get <inet4> >>insecure
binary >>encoding
[ handle-mmm-connection ] >>handler
start-server* ;
: run-mmm ( -- )
check-options
start-mmm-server ;
MAIN: run-mmm

View File

@ -27,7 +27,7 @@ M: mdb-persistent store ( tuple -- )
prepare-store ! H { collection { ... values ... }
[ [ get-collection-fqn ] dip
values <mdb-insert-msg>
[ mdb>> master>> binary ] dip '[ _ write-request ] with-client
[ mdb>> master>> binary ] dip '[ _ write-message ] with-client
] assoc-each ;
M: mdb-persistent find ( example -- result )

View File

@ -15,13 +15,21 @@ CONSTANT: OP_GetMore 2005
CONSTANT: OP_Delete 2006
CONSTANT: OP_KillCursors 2007
PREDICATE: mdb-reply-op < integer OP_Reply = ;
PREDICATE: mdb-query-op < integer OP_Query = ;
PREDICATE: mdb-insert-op < integer OP_Insert = ;
PREDICATE: mdb-delete-op < integer OP_Delete = ;
PREDICATE: mdb-getmore-op < integer OP_GetMore = ;
PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ;
PRIVATE>
TUPLE: mdb-msg
{ opcode integer }
{ req-id integer initial: 0 }
{ resp-id integer initial: 0 }
{ length integer initial: 0 } ;
{ length integer initial: 0 }
{ flags integer initial: 0 } ;
TUPLE: mdb-insert-msg < mdb-msg
{ collection string }
@ -36,7 +44,6 @@ TUPLE: mdb-query-msg < mdb-msg
{ orderby sequence } ;
TUPLE: mdb-reply-msg < mdb-msg
{ flags integer initial: 0 }
{ cursor integer initial: 0 }
{ start# integer initial: 0 }
{ returned# integer initial: 0 }
@ -68,8 +75,7 @@ M: sequence <mdb-insert-msg> ( collection sequence -- mdb-insert-msg )
: <mdb-reply-msg> ( -- mdb-reply-msg )
mdb-reply-msg new ; inline
GENERIC: write-request ( message -- )
GENERIC: write-message ( message -- )
<PRIVATE
@ -96,36 +102,67 @@ CONSTANT: MSG-HEADER-SIZE 16
B{ } clone
(read-cstring) utf8 alien>string ; inline
PRIVATE>
GENERIC: (read-message) ( message opcode -- message )
: read-reply-header ( message -- message )
: copy-header ( message msg-stub -- message )
[ length>> ] keep [ >>length ] dip
[ req-id>> ] keep [ >>req-id ] dip
[ resp-id>> ] keep [ >>resp-id ] dip
[ opcode>> ] keep [ >>opcode ] dip
flags>> >>flags ;
M: mdb-query-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-query-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>skip#
read-int32 >>return#
H{ } stream>assoc >>query ;
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-insert-msg new ] dip copy-header
read-cstring >>collection
H{ } stream>assoc >>objects ;
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
drop
[ <mdb-reply-msg> ] dip copy-header
read-longlong >>cursor
read-int32 >>start#
read-int32 [ >>returned# ] keep
[ H{ } stream>assoc ] accumulator [ times ] dip >>objects ;
: read-header ( message -- message )
read-int32 >>length
read-int32 >>req-id
read-int32 >>resp-id
read-int32 >>opcode ; inline
read-int32 >>opcode
read-int32 >>flags ; inline
: read-reply-message ( message -- message )
read-int32 >>flags read-longlong >>cursor
read-int32 >>start# read-int32 tuck >>returned# swap
[ H{ } stream>assoc ] accumulator [ times ] dip >>objects ; inline
: read-reply ( -- message )
<mdb-reply-msg>
read-reply-header
read-reply-message ; inline
: write-request-header ( message length -- )
: write-header ( message length -- )
MSG-HEADER-SIZE + write-int32
[ req-id>> write-int32 ] keep
[ resp-id>> write-int32 ] keep
opcode>> write-int32 ; inline
PRIVATE>
: read-message ( -- message )
mdb-msg new
read-header
[ ] [ opcode>> ] bi (read-message) ;
<PRIVATE
: (write-message) ( message quot -- )
[ binary ] dip with-byte-writer dup
[ length write-request-header ] dip
[ length write-header ] dip
write flush ; inline
M: mdb-query-msg write-request ( message -- )
PRIVATE>
M: mdb-query-msg write-message ( message -- )
dup
'[ _
[ 4 write-int32 ] dip
@ -135,7 +172,7 @@ M: mdb-query-msg write-request ( message -- )
query>> assoc>array write
] (write-message) ;
M: mdb-insert-msg write-request ( message -- )
M: mdb-insert-msg write-message ( message -- )
dup
'[ _
[ 0 write-int32 ] dip

View File

@ -20,7 +20,7 @@ TUPLE: mdb-result { cursor integer }
PRIVATE>
: (find) ( inet query -- result )
'[ _ write-request read-reply ] (execute-query) ; inline
'[ _ write-message read-message ] (execute-query) ; inline
: (find-one) ( inet query -- result )
(find) objects>> first ; inline