reworked connection handling, added mdb-pool connection pooling
parent
8965b31325
commit
c936517c45
|
@ -0,0 +1,82 @@
|
||||||
|
USING: accessors assocs fry io.encodings.binary io.sockets kernel math
|
||||||
|
math.parser mongodb.msg mongodb.operations namespaces destructors
|
||||||
|
constructors sequences splitting ;
|
||||||
|
|
||||||
|
IN: mongodb.connection
|
||||||
|
|
||||||
|
TUPLE: mdb-db name username password nodes collections ;
|
||||||
|
|
||||||
|
TUPLE: mdb-node master? inet ;
|
||||||
|
|
||||||
|
CONSTRUCTOR: mdb-node ( inet master? -- mdb-node ) ;
|
||||||
|
|
||||||
|
TUPLE: mdb-connection instance handle remote local ;
|
||||||
|
|
||||||
|
: (<mdb-db>) ( name nodes -- mdb-db )
|
||||||
|
mdb-db new swap >>nodes swap >>name H{ } clone >>collections ;
|
||||||
|
|
||||||
|
: master-node ( mdb -- inet )
|
||||||
|
nodes>> [ t ] dip at inet>> ;
|
||||||
|
|
||||||
|
: slave-node ( mdb -- inet )
|
||||||
|
nodes>> [ f ] dip at inet>> ;
|
||||||
|
|
||||||
|
: >mdb-connection ( stream -- )
|
||||||
|
mdb-connection set ; inline
|
||||||
|
|
||||||
|
: mdb-connection> ( -- stream )
|
||||||
|
mdb-connection get ; inline
|
||||||
|
|
||||||
|
: mdb-instance ( -- mdb )
|
||||||
|
mdb-connection> instance>> ;
|
||||||
|
|
||||||
|
<PRIVATE
|
||||||
|
|
||||||
|
|
||||||
|
: ismaster-cmd ( node -- result )
|
||||||
|
binary "admin.$cmd" H{ { "ismaster" 1 } } <mdb-query-msg>
|
||||||
|
1 >>return# '[ _ write-message read-message ] with-client
|
||||||
|
objects>> first ;
|
||||||
|
|
||||||
|
: split-host-str ( hoststr -- host port )
|
||||||
|
":" split [ first ] keep
|
||||||
|
second string>number ; inline
|
||||||
|
|
||||||
|
: eval-ismaster-result ( node result -- node result )
|
||||||
|
[ [ "ismaster" ] dip at
|
||||||
|
>fixnum 1 =
|
||||||
|
[ t >>master? ] [ f >>master? ] if ] keep ;
|
||||||
|
|
||||||
|
: check-node ( node -- node remote )
|
||||||
|
dup inet>> ismaster-cmd
|
||||||
|
eval-ismaster-result
|
||||||
|
[ "remote" ] dip at ;
|
||||||
|
|
||||||
|
PRIVATE>
|
||||||
|
|
||||||
|
: check-nodes ( node -- nodelist )
|
||||||
|
check-node
|
||||||
|
[ V{ } clone [ push ] keep ] dip
|
||||||
|
[ split-host-str <inet> [ f ] dip
|
||||||
|
mdb-node boa check-node drop
|
||||||
|
swap tuck push
|
||||||
|
] when* ;
|
||||||
|
|
||||||
|
: verify-nodes ( -- )
|
||||||
|
mdb-instance nodes>> [ t ] dip at
|
||||||
|
check-nodes
|
||||||
|
H{ } clone tuck
|
||||||
|
'[ dup master?>> _ set-at ] each
|
||||||
|
[ mdb-instance ] dip >>nodes drop ;
|
||||||
|
|
||||||
|
: mdb-open ( mdb -- connection )
|
||||||
|
mdb-connection new swap
|
||||||
|
[ >>instance ] keep
|
||||||
|
master-node [ >>remote ] keep
|
||||||
|
binary <client> [ >>handle ] dip >>local ; inline
|
||||||
|
|
||||||
|
: mdb-close ( mdb-connection -- )
|
||||||
|
[ dispose f ] change-handle drop ;
|
||||||
|
|
||||||
|
M: mdb-connection dispose
|
||||||
|
mdb-close ;
|
|
@ -212,29 +212,12 @@ HELP: load-index-list
|
||||||
}
|
}
|
||||||
{ $description "" } ;
|
{ $description "" } ;
|
||||||
|
|
||||||
HELP: master>>
|
|
||||||
{ $values
|
|
||||||
{ "mdb" null }
|
|
||||||
{ "inet" null }
|
|
||||||
}
|
|
||||||
{ $description "" } ;
|
|
||||||
|
|
||||||
HELP: mdb
|
|
||||||
{ $values
|
|
||||||
|
|
||||||
{ "mdb" null }
|
|
||||||
}
|
|
||||||
{ $description "" } ;
|
|
||||||
|
|
||||||
HELP: mdb-collection
|
HELP: mdb-collection
|
||||||
{ $var-description "" } ;
|
{ $var-description "" } ;
|
||||||
|
|
||||||
HELP: mdb-cursor
|
HELP: mdb-cursor
|
||||||
{ $var-description "" } ;
|
{ $var-description "" } ;
|
||||||
|
|
||||||
HELP: mdb-db
|
|
||||||
{ $var-description "" } ;
|
|
||||||
|
|
||||||
HELP: mdb-error
|
HELP: mdb-error
|
||||||
{ $values
|
{ $values
|
||||||
{ "id" null }
|
{ "id" null }
|
||||||
|
@ -242,12 +225,6 @@ HELP: mdb-error
|
||||||
}
|
}
|
||||||
{ $description "" } ;
|
{ $description "" } ;
|
||||||
|
|
||||||
HELP: mdb-instance
|
|
||||||
{ $var-description "" } ;
|
|
||||||
|
|
||||||
HELP: mdb-node
|
|
||||||
{ $var-description "" } ;
|
|
||||||
|
|
||||||
HELP: r/
|
HELP: r/
|
||||||
{ $values
|
{ $values
|
||||||
{ "token" null }
|
{ "token" null }
|
||||||
|
@ -277,13 +254,6 @@ HELP: skip
|
||||||
}
|
}
|
||||||
{ $description "" } ;
|
{ $description "" } ;
|
||||||
|
|
||||||
HELP: slave>>
|
|
||||||
{ $values
|
|
||||||
{ "mdb" null }
|
|
||||||
{ "inet" null }
|
|
||||||
}
|
|
||||||
{ $description "" } ;
|
|
||||||
|
|
||||||
HELP: sort
|
HELP: sort
|
||||||
{ $values
|
{ $values
|
||||||
{ "mdb-query" null }
|
{ "mdb-query" null }
|
||||||
|
|
|
@ -1,14 +1,12 @@
|
||||||
USING: accessors assocs bson.constants bson.writer combinators
|
USING: accessors assocs bson.constants bson.writer combinators
|
||||||
constructors continuations destructors formatting fry io
|
constructors continuations destructors formatting fry io io.pools
|
||||||
io.encodings.binary io.sockets io.streams.duplex kernel linked-assocs
|
io.encodings.binary io.sockets io.streams.duplex kernel linked-assocs hashtables
|
||||||
math math.parser memoize mongodb.msg mongodb.operations namespaces
|
math math.parser memoize mongodb.connection mongodb.msg mongodb.operations namespaces
|
||||||
parser prettyprint sequences sets splitting strings uuid ;
|
parser prettyprint sequences sets splitting strings uuid arrays ;
|
||||||
|
|
||||||
IN: mongodb.driver
|
IN: mongodb.driver
|
||||||
|
|
||||||
TUPLE: mdb-node master? inet ;
|
TUPLE: mdb-pool < pool { mdb mdb-db } ;
|
||||||
|
|
||||||
TUPLE: mdb-db name nodes collections ;
|
|
||||||
|
|
||||||
TUPLE: mdb-cursor collection id return# ;
|
TUPLE: mdb-cursor collection id return# ;
|
||||||
|
|
||||||
|
@ -23,6 +21,11 @@ TUPLE: mdb-collection
|
||||||
: <mdb-collection> ( name -- collection )
|
: <mdb-collection> ( name -- collection )
|
||||||
[ mdb-collection new ] dip >>name ; inline
|
[ mdb-collection new ] dip >>name ; inline
|
||||||
|
|
||||||
|
M: mdb-pool make-connection
|
||||||
|
mdb>> mdb-open ;
|
||||||
|
|
||||||
|
: <mdb-pool> ( mdb -- pool ) mdb-pool <pool> swap >>mdb ;
|
||||||
|
|
||||||
CONSTANT: MDB-GENERAL-ERROR 1
|
CONSTANT: MDB-GENERAL-ERROR 1
|
||||||
|
|
||||||
CONSTANT: PARTIAL? "partial?"
|
CONSTANT: PARTIAL? "partial?"
|
||||||
|
@ -30,49 +33,24 @@ CONSTANT: DIRTY? "dirty?"
|
||||||
|
|
||||||
ERROR: mdb-error id msg ;
|
ERROR: mdb-error id msg ;
|
||||||
|
|
||||||
SYMBOL: mdb-instance
|
|
||||||
|
|
||||||
: mdb ( -- mdb )
|
|
||||||
mdb-instance get ; inline
|
|
||||||
|
|
||||||
: master>> ( mdb -- inet )
|
|
||||||
nodes>> [ t ] dip at inet>> ;
|
|
||||||
|
|
||||||
: slave>> ( mdb -- inet )
|
|
||||||
nodes>> [ f ] dip at inet>> ;
|
|
||||||
|
|
||||||
<PRIVATE
|
<PRIVATE
|
||||||
|
|
||||||
CONSTRUCTOR: mdb-cursor ( id collection return# -- cursor ) ;
|
CONSTRUCTOR: mdb-cursor ( id collection return# -- cursor ) ;
|
||||||
|
|
||||||
SYMBOL: mdb-socket-stream
|
|
||||||
|
|
||||||
: >>mdb-stream ( stream -- )
|
|
||||||
mdb-socket-stream set ; inline
|
|
||||||
|
|
||||||
: mdb-stream>> ( -- stream )
|
|
||||||
mdb-socket-stream get ; inline
|
|
||||||
|
|
||||||
: check-ok ( result -- ? )
|
: check-ok ( result -- ? )
|
||||||
[ "ok" ] dip key? ; inline
|
[ "ok" ] dip key? ; inline
|
||||||
|
|
||||||
: >mdbregexp ( value -- regexp )
|
: >mdbregexp ( value -- regexp )
|
||||||
first <mdbregexp> ; inline
|
first <mdbregexp> ; inline
|
||||||
|
|
||||||
: prepare-mdb-session ( mdb -- stream )
|
|
||||||
[ mdb-instance set ] keep
|
|
||||||
master>> [ remote-address set ] keep
|
|
||||||
binary <client> local-address set ; inline
|
|
||||||
|
|
||||||
PRIVATE>
|
PRIVATE>
|
||||||
|
|
||||||
SYNTAX: r/ ( token -- mdbregexp )
|
SYNTAX: r/ ( token -- mdbregexp )
|
||||||
\ / [ >mdbregexp ] parse-literal ;
|
\ / [ >mdbregexp ] parse-literal ;
|
||||||
|
|
||||||
: with-db ( mdb quot -- ... )
|
: with-db ( mdb quot -- ... )
|
||||||
[ [ prepare-mdb-session ] dip
|
swap [ mdb-open &dispose >mdb-connection ] curry
|
||||||
[ >>mdb-stream ] prepose
|
prepose with-destructors ; inline
|
||||||
with-disposal ] with-scope ; inline
|
|
||||||
|
|
||||||
: build-id-selector ( assoc -- selector )
|
: build-id-selector ( assoc -- selector )
|
||||||
[ MDB_OID_FIELD swap at ] keep
|
[ MDB_OID_FIELD swap at ] keep
|
||||||
|
@ -81,76 +59,41 @@ SYNTAX: r/ ( token -- mdbregexp )
|
||||||
<PRIVATE
|
<PRIVATE
|
||||||
|
|
||||||
: index-collection ( -- ns )
|
: index-collection ( -- ns )
|
||||||
mdb name>> "%s.system.indexes" sprintf ; inline
|
mdb-instance name>> "%s.system.indexes" sprintf ; inline
|
||||||
|
|
||||||
: namespaces-collection ( -- ns )
|
: namespaces-collection ( -- ns )
|
||||||
mdb name>> "%s.system.namespaces" sprintf ; inline
|
mdb-instance name>> "%s.system.namespaces" sprintf ; inline
|
||||||
|
|
||||||
: cmd-collection ( -- ns )
|
: cmd-collection ( -- ns )
|
||||||
mdb name>> "%s.$cmd" sprintf ; inline
|
mdb-instance name>> "%s.$cmd" sprintf ; inline
|
||||||
|
|
||||||
: index-ns ( colname -- index-ns )
|
: index-ns ( colname -- index-ns )
|
||||||
[ mdb name>> ] dip "%s.%s" sprintf ; inline
|
[ mdb-instance name>> ] dip "%s.%s" sprintf ; inline
|
||||||
|
|
||||||
: ismaster-cmd ( node -- result )
|
|
||||||
binary "admin.$cmd" H{ { "ismaster" 1 } } <mdb-query-msg>
|
|
||||||
1 >>return# '[ _ write-message read-message ] with-client
|
|
||||||
objects>> first ;
|
|
||||||
|
|
||||||
: split-host-str ( hoststr -- host port )
|
|
||||||
":" split [ first ] keep
|
|
||||||
second string>number ; inline
|
|
||||||
|
|
||||||
: eval-ismaster-result ( node result -- node result )
|
|
||||||
[ [ "ismaster" ] dip at
|
|
||||||
>fixnum 1 =
|
|
||||||
[ t >>master? ] [ f >>master? ] if ] keep ;
|
|
||||||
|
|
||||||
: check-node ( node -- node remote )
|
|
||||||
dup inet>> ismaster-cmd
|
|
||||||
eval-ismaster-result
|
|
||||||
[ "remote" ] dip at ;
|
|
||||||
|
|
||||||
: check-nodes ( node -- nodelist )
|
|
||||||
check-node
|
|
||||||
[ V{ } clone [ push ] keep ] dip
|
|
||||||
[ split-host-str <inet> [ f ] dip
|
|
||||||
mdb-node boa check-node drop
|
|
||||||
swap tuck push
|
|
||||||
] when* ;
|
|
||||||
|
|
||||||
: verify-nodes ( -- )
|
|
||||||
mdb nodes>> [ t ] dip at
|
|
||||||
check-nodes
|
|
||||||
H{ } clone tuck
|
|
||||||
'[ dup master?>> _ set-at ] each
|
|
||||||
[ mdb ] dip >>nodes drop ;
|
|
||||||
|
|
||||||
: send-message ( message -- )
|
: send-message ( message -- )
|
||||||
[ mdb-stream>> ] dip '[ _ write-message ] with-stream* ;
|
[ mdb-connection> handle>> ] dip '[ _ write-message ] with-stream* ;
|
||||||
|
|
||||||
: send-query-plain ( query-message -- result )
|
: send-query-plain ( query-message -- result )
|
||||||
[ mdb-stream>> ] dip
|
[ mdb-connection> handle>> ] dip
|
||||||
'[ _ write-message read-message ] with-stream* ;
|
'[ _ write-message read-message ] with-stream* ;
|
||||||
|
|
||||||
: send-query ( query-message -- cursor result )
|
: make-cursor ( mdb-result-msg -- cursor/f )
|
||||||
|
dup cursor>> 0 >
|
||||||
|
[ [ cursor>> ] [ collection>> ] [ requested#>> ] tri <mdb-cursor> ]
|
||||||
|
[ drop f ] if ;
|
||||||
|
|
||||||
|
: send-query ( query-message -- cursor/f result )
|
||||||
[ send-query-plain ] keep
|
[ send-query-plain ] keep
|
||||||
{ [ collection>> >>collection drop ]
|
[ collection>> >>collection drop ]
|
||||||
[ return#>> >>requested# ]
|
[ return#>> >>requested# ] 2bi
|
||||||
} 2cleave
|
[ make-cursor ] [ objects>> ] bi ;
|
||||||
[ [ cursor>> 0 > ] keep
|
|
||||||
'[ _ [ cursor>> ] [ collection>> ] [ requested#>> ] tri <mdb-cursor> ]
|
|
||||||
[ f ] if
|
|
||||||
] [ objects>> ] bi ;
|
|
||||||
|
|
||||||
PRIVATE>
|
PRIVATE>
|
||||||
|
|
||||||
: <mdb> ( db host port -- mdb )
|
: <mdb> ( db host port -- mdb )
|
||||||
[ f ] 2dip <inet> mdb-node boa
|
<inet> f <mdb-node>
|
||||||
check-nodes
|
check-nodes [ [ master?>> ] keep 2array ] map
|
||||||
H{ } clone tuck
|
>hashtable (<mdb-db>) ;
|
||||||
'[ dup master?>> _ set-at ] each
|
|
||||||
H{ } clone mdb-db boa ;
|
|
||||||
|
|
||||||
GENERIC: create-collection ( name -- )
|
GENERIC: create-collection ( name -- )
|
||||||
M: string create-collection
|
M: string create-collection
|
||||||
|
@ -181,7 +124,7 @@ M: mdb-collection create-collection ( mdb-collection -- )
|
||||||
'[ _ "%s contains invalid characters ( . $ ; )" sprintf throw ] when ; inline
|
'[ _ "%s contains invalid characters ( . $ ; )" sprintf throw ] when ; inline
|
||||||
|
|
||||||
: (ensure-collection) ( collection -- )
|
: (ensure-collection) ( collection -- )
|
||||||
mdb collections>> dup keys length 0 =
|
mdb-instance collections>> dup keys length 0 =
|
||||||
[ load-collection-list
|
[ load-collection-list
|
||||||
[ [ "options" ] dip key? ] filter
|
[ [ "options" ] dip key? ] filter
|
||||||
[ [ "name" ] dip at "." split second <mdb-collection> ] map
|
[ [ "name" ] dip at "." split second <mdb-collection> ] map
|
||||||
|
@ -196,11 +139,11 @@ PRIVATE>
|
||||||
|
|
||||||
MEMO: ensure-collection ( collection -- fq-collection )
|
MEMO: ensure-collection ( collection -- fq-collection )
|
||||||
dup mdb-collection? [ name>> ] when
|
dup mdb-collection? [ name>> ] when
|
||||||
"." split1 over mdb name>> =
|
"." split1 over mdb-instance name>> =
|
||||||
[ nip ] [ drop ] if
|
[ nip ] [ drop ] if
|
||||||
[ ] [ reserved-namespace? ] bi
|
[ ] [ reserved-namespace? ] bi
|
||||||
[ [ (ensure-collection) ] keep ] unless
|
[ [ (ensure-collection) ] keep ] unless
|
||||||
[ mdb name>> ] dip "%s.%s" sprintf ; inline
|
[ mdb-instance name>> ] dip "%s.%s" sprintf ; inline
|
||||||
|
|
||||||
: <query> ( collection query -- mdb-query )
|
: <query> ( collection query -- mdb-query )
|
||||||
[ ensure-collection ] dip
|
[ ensure-collection ] dip
|
||||||
|
@ -243,7 +186,8 @@ M: mdb-query-msg explain.
|
||||||
|
|
||||||
GENERIC: find-one ( mdb-query -- result/f )
|
GENERIC: find-one ( mdb-query -- result/f )
|
||||||
M: mdb-query-msg find-one
|
M: mdb-query-msg find-one
|
||||||
1 >>return# send-query-plain objects>> [ first ] [ f ] if* ;
|
1 >>return# send-query-plain objects>>
|
||||||
|
dup empty? [ drop f ] [ first ] if ;
|
||||||
|
|
||||||
GENERIC: count ( collection selector -- result )
|
GENERIC: count ( collection selector -- result )
|
||||||
M: assoc count
|
M: assoc count
|
||||||
|
|
|
@ -66,8 +66,7 @@ PRIVATE>
|
||||||
[ ] [ MDB_ADDON_SLOTS prepend ] if ; inline
|
[ ] [ MDB_ADDON_SLOTS prepend ] if ; inline
|
||||||
|
|
||||||
: set-slot-map ( class options -- )
|
: set-slot-map ( class options -- )
|
||||||
'[ _ optl>map MDB_SLOTDEF_LIST set-word-prop ] keep
|
optl>map MDB_SLOTDEF_LIST set-word-prop ; inline
|
||||||
dup tuple-collection link-collection ; inline
|
|
||||||
|
|
||||||
M: tuple-class tuple-collection ( tuple -- mdb-collection )
|
M: tuple-class tuple-collection ( tuple -- mdb-collection )
|
||||||
(mdb-collection) ;
|
(mdb-collection) ;
|
||||||
|
|
|
@ -52,11 +52,9 @@ TUPLE: cond-value value quot ;
|
||||||
CONSTRUCTOR: cond-value ( value quot -- cond-value ) ;
|
CONSTRUCTOR: cond-value ( value quot -- cond-value ) ;
|
||||||
|
|
||||||
: write-mdb-persistent ( value quot: ( tuple -- assoc ) -- value' )
|
: write-mdb-persistent ( value quot: ( tuple -- assoc ) -- value' )
|
||||||
over needs-store? mdb-dirty-handling? get and
|
over [ (( tuple -- assoc )) call-effect ] dip
|
||||||
[ over [ (( tuple -- assoc )) call-effect ] dip
|
|
||||||
[ tuple-collection name>> ] keep
|
[ tuple-collection name>> ] keep
|
||||||
[ add-storable ] dip
|
[ add-storable ] dip
|
||||||
] [ drop ] if
|
|
||||||
[ tuple-collection name>> ] [ _id>> ] bi <objref> ; inline
|
[ tuple-collection name>> ] [ _id>> ] bi <objref> ; inline
|
||||||
|
|
||||||
: write-field ( value quot: ( tuple -- assoc ) -- value' )
|
: write-field ( value quot: ( tuple -- assoc ) -- value' )
|
||||||
|
|
|
@ -11,10 +11,9 @@ SYNTAX: MDBTUPLE:
|
||||||
define-tuple-class ;
|
define-tuple-class ;
|
||||||
|
|
||||||
: define-persistent ( class collection options -- )
|
: define-persistent ( class collection options -- )
|
||||||
[ <mdb-tuple-collection> ] dip
|
[ [ <mdb-tuple-collection> dupd link-collection ] when* ] dip
|
||||||
[ [ dup ] dip link-collection ] dip ! cl options
|
|
||||||
[ dup '[ _ mdb-persistent add-mixin-instance ] with-compilation-unit ] dip
|
[ dup '[ _ mdb-persistent add-mixin-instance ] with-compilation-unit ] dip
|
||||||
[ dup annotate-writers ] dip
|
! [ dup annotate-writers ] dip
|
||||||
set-slot-map ;
|
set-slot-map ;
|
||||||
|
|
||||||
: ensure-table ( class -- )
|
: ensure-table ( class -- )
|
||||||
|
|
Loading…
Reference in New Issue