diff --git a/mongodb/collection/collection.factor b/mongodb/collection/collection.factor new file mode 100644 index 0000000000..c7c72d8fad --- /dev/null +++ b/mongodb/collection/collection.factor @@ -0,0 +1,105 @@ +USING: accessors assocs formatting kernel math classes sequences splitting strings + words classes.tuple vectors ; + +IN: mongodb.collection + +GENERIC: mdb-slot-definitions>> ( tuple -- string ) +GENERIC: mdb-collection>> ( object -- mdb-collection ) + +CONSTANT: MDB_COLLECTIONS "mdb_collections" + +SYMBOLS: +transient+ +load+ ; + +UNION: boolean t POSTPONE: f ; + +TUPLE: mdb-collection + { name string } + { capped boolean initial: f } + { size integer initial: -1 } + { max integer initial: -1 } + { classes sequence } ; + +USING: mongodb.persistent mongodb.msg mongodb.tuple +mongodb.connection mongodb.query mongodb.index ; + +>) ( class -- mdb-collection ) + dup props>> [ MDB_COL_PROP ] dip at + [ [ drop ] dip ] + [ superclass [ (mdb-collection>>) ] [ f ] if* ] if* ; inline recursive + +: (mdb-slot-definitions>>) ( class -- slot-defs ) + superclasses [ MDB_SLOTOPT_PROP word-prop ] map assoc-combine ; inline + +: link-class ( class collection -- ) + tuck classes>> ! col class v{} + [ 2dup member? [ 2drop ] [ push ] if ] + [ 1vector >>classes ] if* drop ; + + +PRIVATE> + +M: tuple-class mdb-collection>> ( tuple -- mdb-collection ) + (mdb-collection>>) ; + +M: mdb-persistent mdb-collection>> ( tuple -- mdb-collection ) + class (mdb-collection>>) ; + +M: mdb-persistent mdb-slot-definitions>> ( tuple -- string ) + class (mdb-slot-definitions>>) ; + +M: tuple-class mdb-slot-definitions>> ( class -- assoc ) + (mdb-slot-definitions>>) ; + +M: mdb-collection mdb-slot-definitions>> ( collection -- assoc ) + classes>> [ mdb-slot-definitions>> ] map assoc-combine ; + +: link-collection ( class collection -- ) + 2dup link-class + swap [ MDB_COL_PROP ] dip props>> set-at ; inline + +: declared-collections> ( -- assoc ) + MDB_COLLECTIONS mdb-persistent props>> at + [ H{ } clone + [ MDB_COLLECTIONS mdb-persistent props>> set-at ] keep + ] unless* ; + +: ( name -- mdb-collection ) + declared-collections> 2dup key? + [ at ] + [ [ mdb-collection new ] 2dip + [ [ >>name dup ] keep ] dip set-at ] if ; + +: load-collections ( -- collections ) + namespaces-ns + H{ } clone (find) + objects>> [ [ "name" ] dip at "." split second ] map + dup [ ensure-indices ] each + [ mdb>> ] dip >>collections collections>> ; + +: check-ok ( result -- ? ) + [ "ok" ] dip key? ; inline + +: create-collection ( mdb-collection -- ) + dup name>> "create" H{ } clone [ set-at ] keep + [ + mdb>> [ master>> ] keep name>> "%s.$cmd" sprintf + ] dip (find-one) + check-ok + [ [ ensure-indices ] keep dup name>> mdb>> collections>> set-at ] + [ "could not create collection" throw ] if ; + +: get-collection-fqn ( mdb-collection -- fqdn ) + mdb>> collections>> + dup keys length 0 = + [ drop load-collections ] + [ ] if + [ dup name>> ] dip + key? + [ ] + [ dup create-collection ] if + name>> [ mdb>> name>> ] dip "%s.%s" sprintf ; diff --git a/mongodb/connection/connection.factor b/mongodb/connection/connection.factor new file mode 100644 index 0000000000..c870ed7875 --- /dev/null +++ b/mongodb/connection/connection.factor @@ -0,0 +1,61 @@ +USING: accessors assocs fry io.sockets kernel math mongodb.msg +mongodb.query namespaces sequences splitting math.parser ; + +IN: mongodb.connection + +TUPLE: mdb-node master? inet ; + +TUPLE: mdb name nodes collections ; + +: mdb>> ( -- mdb ) + mdb get ; inline + +: with-db ( mdb quot -- ... ) + '[ _ mdb set _ call ] with-scope ; + +: master>> ( mdb -- inet ) + nodes>> [ t ] dip at inet>> ; + +: slave>> ( mdb -- inet ) + nodes>> [ f ] dip at inet>> ; + + + (find-one-raw) ; inline + +: -push ( seq elt -- ) + swap push ; inline + +: split-host-str ( hoststr -- host port ) + ":" split [ first ] keep + second string>number ; inline + +: check-nodes ( node -- nodelist ) + [ V{ } clone ] dip + [ -push ] 2keep + dup inet>> ismaster-cmd ! vec node result + dup [ "ismaster" ] dip at + >fixnum 1 = ! vec node result + [ [ t >>master? drop ] dip f ] + [ [ f >>master? drop ] dip t ] if + [ "remote" ] 2dip [ at split-host-str ] dip + swap mdb-node boa swap + [ push ] keep ; + +: verify-nodes ( -- ) + mdb>> nodes>> [ t ] dip at + check-nodes + H{ } clone tuck + '[ dup master?>> _ set-at ] each + [ mdb>> ] dip >>nodes drop ; + +PRIVATE> + +: () ( db host port -- mdb ) + [ f ] 2dip mdb-node boa + check-nodes + H{ } clone tuck + '[ dup master?>> _ set-at ] each + V{ } mdb boa ; diff --git a/mongodb/index/index.factor b/mongodb/index/index.factor new file mode 100644 index 0000000000..407abe5b48 --- /dev/null +++ b/mongodb/index/index.factor @@ -0,0 +1,99 @@ +USING: accessors assocs combinators formatting fry kernel memoize +linked-assocs mongodb.persistent mongodb.msg +sequences sequences.deep io.encodings.binary +io.sockets prettyprint sets ; + +IN: mongodb.index + +DEFER: mdb-slot-definitions>> + +TUPLE: index name ns key ; + +SYMBOLS: +fieldindex+ +compoundindex+ +deepindex+ ; + + ] 2dip + [ rest ] keep first ! assoc slot options itype + { { +fieldindex+ [ drop [ 1 ] dip pick set-at ] } + { +deepindex+ [ first "%s.%s" sprintf [ 1 ] dip pick set-at ] } + { +compoundindex+ [ + 2over swap [ 1 ] 2dip set-at [ drop ] dip ! assoc options + over '[ _ [ 1 ] 2dip set-at ] each ] } + } case ; + +: build-index-seq ( slot optlist ns -- index-seq ) + [ V{ } clone ] 3dip ! v{} slot optl ns + [ index new ] dip ! v{} slot optl index ns + >>ns + [ pick ] dip swap ! v{} slot optl index v{} + [ swap ] 2dip ! v{} optl slot index v{ } + '[ _ _ ! element slot exemplar + clone 2over swap index-name >>name ! element slot clone + [ build-index ] dip swap >>key _ push + ] each ; + +: is-index-declaration? ( entry -- ? ) + first + { { +fieldindex+ [ t ] } + { +compoundindex+ [ t ] } + { +deepindex+ [ t ] } + [ drop f ] } case ; + +: index-assoc ( seq -- assoc ) + H{ } clone tuck '[ dup name>> _ set-at ] each ; + +: delete-index ( name ns -- ) + "Drop index %s - %s" sprintf . ; + +: clean-indices ( existing defined -- ) + [ index-assoc ] bi@ assoc-diff values + [ [ name>> ] [ ns>> ] bi delete-index ] each ; + +PRIVATE> + +USE: mongodb.query + +: load-indices ( mdb-collection -- indexlist ) + [ mdb>> name>> ] dip name>> "%s.%s" sprintf + "ns" H{ } clone [ set-at ] keep [ index-ns ] dip + '[ _ write-request read-reply ] + [ mdb>> master>> binary ] dip with-client + objects>> [ [ index new ] dip + [ [ "ns" ] dip at >>ns ] + [ [ "name" ] dip at >>name ] + [ [ "key" ] dip at >>key ] tri + ] map ; + +: build-indices ( mdb-collection mdb -- seq ) + name>> + [ [ mdb-slot-definitions>> ] keep name>> ] dip + swap "%s.%s" sprintf + [ V{ } clone ] 2dip pick + '[ _ + [ [ is-index-declaration? ] filter ] dip + build-index-seq _ push + ] assoc-each flatten ; + +: ensure-indices ( mdb-collection -- ) + [ load-indices ] keep mdb>> build-indices + [ clean-indices ] keep + V{ } clone tuck + '[ _ [ tuple>query ] dip push ] each + mdb>> name>> "%s.system.indexes" sprintf >>collection + [ mdb>> master>> binary ] dip '[ _ write-request ] with-client ; + + +: show-indices ( mdb-collection -- ) + load-indices . ; diff --git a/mongodb/mongodb.factor b/mongodb/mongodb.factor index b9c15c0317..4c258eeb98 100644 --- a/mongodb/mongodb.factor +++ b/mongodb/mongodb.factor @@ -1,14 +1,19 @@ USING: accessors assocs fry io.encodings.binary io.sockets kernel math -math.parser mongodb.msg mongodb.persistent mongodb.query mongodb.tuple -namespaces sequences splitting ; +math.parser namespaces sequences splitting ; IN: mongodb -INTERSECTION: storable mdb-persistent ; +! generic methods +GENERIC: store ( tuple/ht -- ) +GENERIC: find ( example -- tuple/ht ) +GENERIC: findOne ( exampe -- tuple/ht ) +GENERIC: load ( object -- object ) + +USING: mongodb.msg mongodb.persistent mongodb.query mongodb.tuple +mongodb.collection mongodb.connection ; > get-collection-fqn ] keep H{ } tuple>query ; inline @@ -20,16 +25,8 @@ PRIVATE> () ; -GENERIC: store ( tuple/ht -- ) -GENERIC: find ( example -- tuple/ht ) - -GENERIC: findOne ( exampe -- tuple/ht ) - -GENERIC: load ( object -- object ) - - -M: storable store ( tuple -- ) +M: mdb-persistent store ( tuple -- ) prepare-store ! H { collection { ... values ... } [ [ ] 2dip [ get-collection-fqn >>collection ] dip @@ -37,11 +34,11 @@ M: storable store ( tuple -- ) [ mdb>> master>> binary ] dip '[ _ write-request ] with-client ] assoc-each ; -M: storable find ( example -- result ) +M: mdb-persistent find ( example -- result ) prepare-find (find) build-result ; -M: storable findOne ( example -- result ) +M: mdb-persistent findOne ( example -- result ) prepare-find (find-one) dup returned#>> 1 = [ objects>> first ] diff --git a/mongodb/msg/msg.factor b/mongodb/msg/msg.factor new file mode 100644 index 0000000000..faafaf4b7b --- /dev/null +++ b/mongodb/msg/msg.factor @@ -0,0 +1,144 @@ +USING: io io.encodings.utf8 io.encodings.binary alien.c-types alien.strings math +bson.writer sequences kernel accessors io.streams.byte-array fry generalizations +combinators bson.reader sequences tools.walker assocs strings mongodb.persistent ; + +IN: mongodb.msg + +DEFER: tuple>linked-assoc + + + +TUPLE: mdb-msg + { opcode integer } + { req-id integer initial: 0 } + { resp-id integer initial: 0 } + { length integer initial: 0 } ; + +TUPLE: mdb-insert-msg < mdb-msg + { collection string } + { objects sequence } ; + +TUPLE: mdb-query-msg < mdb-msg + { collection string } + { skip# integer initial: 0 } + { return# integer initial: 0 } + { query assoc } + { returnfields assoc } + { orderby sequence } ; + +TUPLE: mdb-reply-msg < mdb-msg + { flags integer initial: 0 } + { cursor integer initial: 0 } + { start# integer initial: 0 } + { returned# integer initial: 0 } + { objects sequence } ; + + +: ( collection assoc -- mdb-query-msg ) + [ mdb-query-msg new ] 2dip + [ >>collection ] dip + >>query OP_Query >>opcode ; inline + +: ( collection assoc -- mdb-query-msg ) + 1 >>return# ; inline + +GENERIC: ( sequence -- mdb-insert-msg ) + +M: tuple ( tuple -- mdb-insert-msg ) + [ mdb-insert-msg new ] dip + tuple>linked-assoc V{ } clone tuck push + >>objects OP_Insert >>opcode ; + +M: sequence ( sequence -- mdb-insert-msg ) + [ mdb-insert-msg new ] dip >>objects OP_Insert >>opcode ; + + +: ( -- mdb-reply-msg ) + mdb-reply-msg new ; inline + + +GENERIC: write-request ( message -- ) + + write ; inline +: write-int32 ( int -- ) write ; inline +: write-double ( real -- ) write ; inline +: write-cstring ( string -- ) utf8 string>alien write ; inline +: write-longlong ( object -- ) write ; inline + +: read-int32 ( -- int32 ) 4 read *int ; inline +: read-longlong ( -- longlong ) 8 read *longlong ; inline +: read-byte-raw ( -- byte-raw ) 1 read ; inline +: read-byte ( -- byte ) read-byte-raw *char ; inline + +: (read-cstring) ( acc -- acc ) + read-byte-raw dup + B{ 0 } = + [ append ] + [ append (read-cstring) ] if ; recursive inline + +: read-cstring ( -- string ) + B{ } clone + (read-cstring) utf8 alien>string ; inline + +PRIVATE> + +: read-reply-header ( message -- message ) + read-int32 >>length + read-int32 >>req-id + read-int32 >>resp-id + read-int32 >>opcode ; inline + +: read-reply-message ( message -- message ) + read-int32 >>flags read-longlong >>cursor + read-int32 >>start# read-int32 tuck >>returned# swap + [ H{ } stream>assoc ] accumulator [ times ] dip >>objects ; inline + +: read-reply ( -- message ) + + read-reply-header + read-reply-message ; inline + +: write-request-header ( message length -- ) + MSG-HEADER-SIZE + write-int32 + [ req-id>> write-int32 ] keep + [ resp-id>> write-int32 ] keep + opcode>> write-int32 ; inline + +: (write-message) ( message quot -- ) + [ binary ] dip with-byte-writer dup + [ length write-request-header ] dip + write flush ; inline + +M: mdb-query-msg write-request ( message -- ) + dup + '[ _ + [ 4 write-int32 ] dip + [ collection>> write-cstring ] keep + [ skip#>> write-int32 ] keep + [ return#>> write-int32 ] keep + query>> assoc>array write + ] (write-message) ; + +M: mdb-insert-msg write-request ( message -- ) + dup + '[ _ + [ 0 write-int32 ] dip + [ collection>> write-cstring ] keep + objects>> [ assoc>array write ] each + ] (write-message) ; + diff --git a/mongodb/persistent/persistent.factor b/mongodb/persistent/persistent.factor index c7c3fcf134..7967fd129c 100644 --- a/mongodb/persistent/persistent.factor +++ b/mongodb/persistent/persistent.factor @@ -20,13 +20,9 @@ GENERIC# tuple>assoc 1 ( tuple exemplar -- assoc ) GENERIC# tuple>query 1 ( tuple examplar -- query-assoc ) -GENERIC: mdb-collection>> ( tuple -- string ) - -GENERIC: mdb-slot-definitions>> ( tuple -- string ) - - DEFER: assoc>tuple DEFER: create-mdb-command +DEFER: mdb-collection>> > ( -- mdb ) + mdb get ; inline + +: with-db ( mdb quot -- * ) + '[ _ mdb set _ call ] with-scope ; inline + +: master>> ( mdb -- inet ) + nodes>> [ t ] dip at ; + +: slave>> ( mdb -- inet ) + nodes>> [ f ] dip at ; + +TUPLE: mdb-result { cursor integer } +{ start# integer } +{ returned# integer } +{ objects sequence } ; + +: index-ns ( -- ns ) + mdb>> name>> "%s.system.indexes" sprintf ; inline + +: namespaces-ns ( -- ns ) + mdb>> name>> "%s.system.namespaces" sprintf ; inline + + + +: (find-raw) ( inet query -- result ) + '[ _ write-request read-reply ] (execute-query) ; inline + +: (find-one-raw) ( inet query -- result ) + (find-raw) objects>> first ; inline + +: (find) ( query -- result ) + [ mdb>> master>> ] dip (find-raw) ; + +: (find-one) ( query -- result ) + [ mdb>> master>> ] dip (find-one-raw) ; + +: build-result ( resultmsg -- mdb-result ) + [ mdb-result new ] dip + { + [ cursor>> >>cursor ] + [ start#>> >>start# ] + [ returned#>> >>returned# ] + [ objects>> [ assoc>tuple ] map >>objects ] + } cleave ; + +: query-collections ( -- result ) + namespaces-ns H{ } clone (find) ; + diff --git a/mongodb/tuple/tuple.factor b/mongodb/tuple/tuple.factor new file mode 100644 index 0000000000..3a8cb09292 --- /dev/null +++ b/mongodb/tuple/tuple.factor @@ -0,0 +1,66 @@ +USING: accessors assocs classes classes.mixin classes.tuple vectors math +classes.tuple.parser formatting generalizations kernel sequences fry +prettyprint strings compiler.units slots tools.walker words arrays +mongodb.collection mongodb.persistent ; + +IN: mongodb.tuple + +> ] map [ MDB_OID ] dip memq? + [ ] + [ MDB_P_SLOTS prepend ] if ; inline + +PRIVATE> + +: show-persistence-info ( class -- ) + H{ } clone + [ [ dup mdb-collection>> "collection" ] dip set-at ] keep + [ [ mdb-slot-definitions>> "slots" ] dip set-at ] keep . ; + +GENERIC: mdb-persisted? ( tuple -- ? ) + +M: mdb-persistent mdb-persisted? ( tuple -- ? ) + _id>> f = not ; + +M: assoc mdb-persisted? ( assoc -- ? ) + [ MDB_OID ] dip key? ; inline + +: MDBTUPLE: + parse-tuple-definition + mdb-check-id-slot + define-tuple-class ; parsing + +> ! col class v{} + [ 2dup member? [ 2drop ] [ push ] if ] + [ 1vector >>classes ] if* drop ; + +: optl>assoc ( seq -- assoc ) + [ dup assoc? + [ 1array { "" } append ] unless + ] map ; + +PRIVATE> + +: set-slot-options ( class options -- ) + H{ } clone tuck '[ _ [ split-olist optl>assoc swap ] dip set-at ] each + over [ MDB_SLOTOPT_PROP ] dip props>> set-at + dup mdb-collection>> link-collection ; + +: define-collection ( class collection options -- ) + [ [ dup ] dip link-collection ] dip ! cl options + [ dup '[ _ mdb-persistent add-mixin-instance ] with-compilation-unit ] dip + set-slot-options ; +