diff --git a/mongodb/connection/connection.factor b/mongodb/connection/connection.factor index 2a7e04f504..569a68aa3b 100644 --- a/mongodb/connection/connection.factor +++ b/mongodb/connection/connection.factor @@ -23,7 +23,7 @@ TUPLE: mdb name nodes collections ; : ismaster-cmd ( node -- result ) binary "admin.$cmd" H{ { "ismaster" 1 } } - '[ _ write-request read-reply ] with-client + '[ _ write-message read-message ] with-client objects>> first ; : -push ( seq elt -- ) diff --git a/mongodb/index/index.factor b/mongodb/index/index.factor index bb930e02d2..487251c27f 100644 --- a/mongodb/index/index.factor +++ b/mongodb/index/index.factor @@ -25,7 +25,6 @@ SYMBOLS: +fieldindex+ +compoundindex+ +deepindex+ ; "%s-%s-%s-Idx" sprintf ; : build-index ( element slot -- assoc ) - break swap [ ] 2dip [ rest ] keep first ! assoc slot options itype { { +fieldindex+ [ drop [ 1 ] dip pick set-at ] } @@ -70,7 +69,7 @@ USE: mongodb.query : load-indices ( mdb-collection -- indexlist ) [ mdb>> name>> ] dip name>> "%s.%s" sprintf "ns" H{ } clone [ set-at ] keep [ mdb>> name>> index-ns ] dip - '[ _ write-request read-reply ] + '[ _ write-message read-message ] [ mdb>> master>> binary ] dip with-client objects>> [ [ index new ] dip [ [ "ns" ] dip at >>ns ] @@ -96,7 +95,7 @@ USE: mongodb.query dup length 0 > [ [ mdb>> name>> "%s.system.indexes" sprintf ] dip - [ mdb>> master>> binary ] dip '[ _ write-request ] with-client + [ mdb>> master>> binary ] dip '[ _ write-message ] with-client ] [ drop ] if ; diff --git a/mongodb/mmm/mmm.factor b/mongodb/mmm/mmm.factor new file mode 100644 index 0000000000..93281f4134 --- /dev/null +++ b/mongodb/mmm/mmm.factor @@ -0,0 +1,90 @@ +USING: accessors fry io io.encodings.binary io.servers.connection +io.sockets io.streams.byte-array kernel math mongodb.msg classes formatting +mongodb.msg.private namespaces prettyprint tools.walker calendar calendar.format ; + +IN: mongodb.mmm + +SYMBOLS: mmm-port mmm-server-ip mmm-server-port mmm-server mmm-dump-output mmm-t-srv ; + +GENERIC: dump-message ( message -- ) + +: check-options ( -- ) + mmm-port get [ 27040 mmm-port set ] unless + mmm-server-ip get [ "127.0.0.1" mmm-server-ip set ] unless + mmm-server-port get [ 27017 mmm-server-port set ] unless + mmm-server-ip get mmm-server-port get mmm-server set ; + +: read-msg-binary ( -- ) + read-int32 + [ write-int32 ] keep + 4 - read write ; + +: read-request-header ( -- msg-stub ) + mdb-msg new + read-int32 MSG-HEADER-SIZE - >>length + read-int32 >>req-id + read-int32 >>resp-id + read-int32 >>opcode ; + +: read-request ( -- msg-stub binary ) + binary [ read-msg-binary ] with-byte-writer + [ binary [ read-request-header ] with-byte-reader ] keep ; ! msg-stub binary + +: dump-request ( msg-stub binary -- ) + [ mmm-dump-output get ] 2dip + '[ _ drop _ binary [ read-message dump-message ] with-byte-reader ] with-output-stream ; + +: read-reply ( -- binary ) + binary [ read-msg-binary ] with-byte-writer ; + +: forward-request-read-reply ( msg-stub binary -- binary ) + [ mmm-server get binary ] 2dip + '[ _ opcode>> _ write flush + OP_Query = + [ read-reply ] + [ f ] if ] with-client ; + +: dump-reply ( binary -- ) + [ mmm-dump-output get ] dip + '[ _ binary [ read-message dump-message ] with-byte-reader ] with-output-stream ; + +: message-prefix ( message -- tst name message ) + [ now timestamp>http-string ] dip + [ class name>> ] keep ; inline + +M: mdb-query-msg dump-message ( message -- ) + message-prefix + collection>> + "%s: %s -> %s \n" printf ; + +M: mdb-insert-msg dump-message ( message -- ) + message-prefix + collection>> + "%s: %s -> %s \n" printf ; + +M: mdb-msg dump-message ( message -- ) + message-prefix drop "%s: %s \n" printf ; + +: forward-reply ( binary -- ) + write flush ; + +: handle-mmm-connection ( -- ) + read-request + [ dump-request ] 2keep + forward-request-read-reply + [ dump-reply ] keep + forward-reply ; + +: start-mmm-server ( -- ) + output-stream get mmm-dump-output set + [ mmm-t-srv set ] keep + "127.0.0.1" mmm-port get >>insecure + binary >>encoding + [ handle-mmm-connection ] >>handler + start-server* ; + +: run-mmm ( -- ) + check-options + start-mmm-server ; + +MAIN: run-mmm \ No newline at end of file diff --git a/mongodb/mongodb.factor b/mongodb/mongodb.factor index a1cd3d7aff..96800d3d87 100644 --- a/mongodb/mongodb.factor +++ b/mongodb/mongodb.factor @@ -27,7 +27,7 @@ M: mdb-persistent store ( tuple -- ) prepare-store ! H { collection { ... values ... } [ [ get-collection-fqn ] dip values - [ mdb>> master>> binary ] dip '[ _ write-request ] with-client + [ mdb>> master>> binary ] dip '[ _ write-message ] with-client ] assoc-each ; M: mdb-persistent find ( example -- result ) diff --git a/mongodb/msg/msg.factor b/mongodb/msg/msg.factor index e61006e01b..88d2421ce3 100644 --- a/mongodb/msg/msg.factor +++ b/mongodb/msg/msg.factor @@ -15,32 +15,39 @@ CONSTANT: OP_GetMore 2005 CONSTANT: OP_Delete 2006 CONSTANT: OP_KillCursors 2007 +PREDICATE: mdb-reply-op < integer OP_Reply = ; +PREDICATE: mdb-query-op < integer OP_Query = ; +PREDICATE: mdb-insert-op < integer OP_Insert = ; +PREDICATE: mdb-delete-op < integer OP_Delete = ; +PREDICATE: mdb-getmore-op < integer OP_GetMore = ; +PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ; + PRIVATE> TUPLE: mdb-msg - { opcode integer } - { req-id integer initial: 0 } - { resp-id integer initial: 0 } - { length integer initial: 0 } ; +{ opcode integer } +{ req-id integer initial: 0 } +{ resp-id integer initial: 0 } +{ length integer initial: 0 } +{ flags integer initial: 0 } ; TUPLE: mdb-insert-msg < mdb-msg - { collection string } - { objects sequence } ; +{ collection string } +{ objects sequence } ; TUPLE: mdb-query-msg < mdb-msg - { collection string } - { skip# integer initial: 0 } - { return# integer initial: 0 } - { query assoc } - { returnfields assoc } - { orderby sequence } ; +{ collection string } +{ skip# integer initial: 0 } +{ return# integer initial: 0 } +{ query assoc } +{ returnfields assoc } +{ orderby sequence } ; TUPLE: mdb-reply-msg < mdb-msg - { flags integer initial: 0 } - { cursor integer initial: 0 } - { start# integer initial: 0 } - { returned# integer initial: 0 } - { objects sequence } ; +{ cursor integer initial: 0 } +{ start# integer initial: 0 } +{ returned# integer initial: 0 } +{ objects sequence } ; : ( collection assoc -- mdb-query-msg ) @@ -68,8 +75,7 @@ M: sequence ( collection sequence -- mdb-insert-msg ) : ( -- mdb-reply-msg ) mdb-reply-msg new ; inline - -GENERIC: write-request ( message -- ) +GENERIC: write-message ( message -- ) string ; inline -PRIVATE> +GENERIC: (read-message) ( message opcode -- message ) -: read-reply-header ( message -- message ) +: copy-header ( message msg-stub -- message ) + [ length>> ] keep [ >>length ] dip + [ req-id>> ] keep [ >>req-id ] dip + [ resp-id>> ] keep [ >>resp-id ] dip + [ opcode>> ] keep [ >>opcode ] dip + flags>> >>flags ; + +M: mdb-query-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-query-msg new ] dip copy-header + read-cstring >>collection + read-int32 >>skip# + read-int32 >>return# + H{ } stream>assoc >>query ; + +M: mdb-insert-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-insert-msg new ] dip copy-header + read-cstring >>collection + H{ } stream>assoc >>objects ; + +M: mdb-reply-op (read-message) ( msg-stub opcode -- message ) + drop + [ ] dip copy-header + read-longlong >>cursor + read-int32 >>start# + read-int32 [ >>returned# ] keep + [ H{ } stream>assoc ] accumulator [ times ] dip >>objects ; + +: read-header ( message -- message ) read-int32 >>length read-int32 >>req-id read-int32 >>resp-id - read-int32 >>opcode ; inline + read-int32 >>opcode + read-int32 >>flags ; inline -: read-reply-message ( message -- message ) - read-int32 >>flags read-longlong >>cursor - read-int32 >>start# read-int32 tuck >>returned# swap - [ H{ } stream>assoc ] accumulator [ times ] dip >>objects ; inline - -: read-reply ( -- message ) - - read-reply-header - read-reply-message ; inline - -: write-request-header ( message length -- ) +: write-header ( message length -- ) MSG-HEADER-SIZE + write-int32 [ req-id>> write-int32 ] keep [ resp-id>> write-int32 ] keep opcode>> write-int32 ; inline +PRIVATE> + +: read-message ( -- message ) + mdb-msg new + read-header + [ ] [ opcode>> ] bi (read-message) ; + + + +M: mdb-query-msg write-message ( message -- ) dup '[ _ [ 4 write-int32 ] dip @@ -134,8 +171,8 @@ M: mdb-query-msg write-request ( message -- ) [ return#>> write-int32 ] keep query>> assoc>array write ] (write-message) ; - -M: mdb-insert-msg write-request ( message -- ) + +M: mdb-insert-msg write-message ( message -- ) dup '[ _ [ 0 write-int32 ] dip diff --git a/mongodb/query/query.factor b/mongodb/query/query.factor index aede6a267f..c3477d2678 100644 --- a/mongodb/query/query.factor +++ b/mongodb/query/query.factor @@ -20,7 +20,7 @@ TUPLE: mdb-result { cursor integer } PRIVATE> : (find) ( inet query -- result ) - '[ _ write-request read-reply ] (execute-query) ; inline + '[ _ write-message read-message ] (execute-query) ; inline : (find-one) ( inet query -- result ) (find) objects>> first ; inline