From 02a76d0a3e0be7d8ba97b1c6a60dfb3542feb0c4 Mon Sep 17 00:00:00 2001 From: Sascha Matzke Date: Sun, 1 Mar 2009 22:45:38 +0100 Subject: [PATCH] started complete rewrite... Now there's a low-level driver (mongodb.driver) implementation. Tuple integration will follow soon. --- bson/constants/constants.factor | 8 +- bson/reader/reader.factor | 7 +- bson/writer/writer.factor | 4 + mongodb/connection/connection.factor | 65 ------ mongodb/driver/driver.factor | 282 +++++++++++++++++++++++++++ mongodb/index/index.factor | 108 ---------- mongodb/mongodb.factor | 40 ++-- mongodb/msg/msg.factor | 240 ++--------------------- mongodb/operations/operations.factor | 219 +++++++++++++++++++++ mongodb/persistent/persistent.factor | 6 +- mongodb/query/query.factor | 68 ------- mongodb/tuple/tuple.factor | 203 +++++++++++-------- 12 files changed, 681 insertions(+), 569 deletions(-) delete mode 100644 mongodb/connection/connection.factor create mode 100644 mongodb/driver/driver.factor delete mode 100644 mongodb/index/index.factor create mode 100644 mongodb/operations/operations.factor delete mode 100644 mongodb/query/query.factor diff --git a/bson/constants/constants.factor b/bson/constants/constants.factor index 29144ded86..039ea18089 100644 --- a/bson/constants/constants.factor +++ b/bson/constants/constants.factor @@ -1,11 +1,13 @@ -USING: accessors constructors uuid ; +USING: accessors kernel uuid ; IN: bson.constants TUPLE: objid id ; -CONSTRUCTOR: objid ( -- objid ) - uuid1 >>id ; inline +: ( -- objid ) + objid new uuid1 >>id ; inline + +TUPLE: oid { a initial: 0 } { b initial: 0 } ; TUPLE: objref ns objid ; diff --git a/bson/reader/reader.factor b/bson/reader/reader.factor index 0f699ca499..44eadef973 100644 --- a/bson/reader/reader.factor +++ b/bson/reader/reader.factor @@ -1,6 +1,6 @@ USING: accessors assocs bson.constants byte-arrays byte-vectors fry io io.binary io.encodings.string io.encodings.utf8 kernel math namespaces -sequences serialize ; +sequences serialize arrays ; IN: bson.reader @@ -176,6 +176,11 @@ M: bson-null element-data-read ( type -- bf ) drop f ; +M: bson-oid element-data-read ( type -- oid ) + drop + read-longlong + read-int32 oid boa ; + M: bson-binary-custom element-binary-read ( size type -- dbref ) 2drop read-cstring diff --git a/bson/writer/writer.factor b/bson/writer/writer.factor index a850c86e32..439cfb7372 100644 --- a/bson/writer/writer.factor +++ b/bson/writer/writer.factor @@ -25,6 +25,7 @@ M: string bson-type? ( string -- type ) drop T_String ; M: integer bson-type? ( integer -- type ) drop T_Integer ; M: sequence bson-type? ( seq -- type ) drop T_Array ; +M: oid bson-type? ( word -- type ) drop T_OID ; M: objid bson-type? ( objid -- type ) drop T_Binary ; M: objref bson-type? ( objref -- type ) drop T_Binary ; M: quotation bson-type? ( quotation -- type ) drop T_Binary ; @@ -68,6 +69,9 @@ M: quotation bson-write ( quotation -- ) T_Binary_Function write-byte write ; +M: oid bson-write ( oid -- ) + [ a>> write-longlong ] [ b>> write-int32 ] bi ; + M: objid bson-write ( oid -- ) id>> utf8 encode [ length write-int32 ] keep diff --git a/mongodb/connection/connection.factor b/mongodb/connection/connection.factor deleted file mode 100644 index 6e608dcb63..0000000000 --- a/mongodb/connection/connection.factor +++ /dev/null @@ -1,65 +0,0 @@ -USING: accessors assocs fry io.sockets kernel math mongodb.msg -namespaces sequences splitting math.parser io.encodings.binary ; - -IN: mongodb.connection - -TUPLE: mdb-node master? inet ; - -TUPLE: mdb name nodes collections ; - -: mdb>> ( -- mdb ) - mdb get ; inline - -: with-db ( mdb quot -- ... ) - '[ _ mdb set _ call ] with-scope ; - -: master>> ( mdb -- inet ) - nodes>> [ t ] dip at inet>> ; - -: slave>> ( mdb -- inet ) - nodes>> [ f ] dip at inet>> ; - - - 1 >>return# '[ _ write-message read-message ] with-client - objects>> first ; - -: split-host-str ( hoststr -- host port ) - ":" split [ first ] keep - second string>number ; inline - -: eval-ismaster-result ( node result -- node result ) - [ [ "ismaster" ] dip at - >fixnum 1 = - [ t >>master? ] [ f >>master? ] if ] keep ; - -: check-node ( node -- node remote ) - dup inet>> ismaster-cmd - eval-ismaster-result - [ "remote" ] dip at ; - -: check-nodes ( node -- nodelist ) - check-node - [ V{ } clone [ push ] keep ] dip - [ split-host-str [ f ] dip - mdb-node boa check-node drop - swap tuck push - ] when* ; - -: verify-nodes ( -- ) - mdb>> nodes>> [ t ] dip at - check-nodes - H{ } clone tuck - '[ dup master?>> _ set-at ] each - [ mdb>> ] dip >>nodes drop ; - -PRIVATE> - -: () ( db host port -- mdb ) - [ f ] 2dip mdb-node boa - check-nodes - H{ } clone tuck - '[ dup master?>> _ set-at ] each - H{ } clone mdb boa ; diff --git a/mongodb/driver/driver.factor b/mongodb/driver/driver.factor new file mode 100644 index 0000000000..6337452174 --- /dev/null +++ b/mongodb/driver/driver.factor @@ -0,0 +1,282 @@ +USING: accessors assocs fry io.sockets kernel math mongodb.msg formatting linked-assocs destructors continuations +mongodb.operations namespaces sequences splitting math.parser io.encodings.binary combinators io.streams.duplex +arrays io memoize constructors sets strings ; + +IN: mongodb.driver + +TUPLE: mdb-node master? inet ; + +TUPLE: mdb name nodes collections ; + +TUPLE: mdb-cursor collection id return# ; + +UNION: boolean t POSTPONE: f ; + +TUPLE: mdb-collection +{ name string } +{ capped boolean initial: f } +{ size integer initial: -1 } +{ max integer initial: -1 } ; + +CONSTRUCTOR: mdb-cursor ( id collection return# -- cursor ) ; +CONSTRUCTOR: mdb-collection ( name -- collection ) ; + +CONSTANT: MDB-GENERAL-ERROR 1 + +CONSTANT: MDB_OID "_id" +CONSTANT: MDB_PROPERTIES "_mdb_" + +CONSTANT: PARTIAL? "partial?" +CONSTANT: DIRTY? "dirty?" + +ERROR: mdb-error id msg ; + +> ( -- stream ) + mdb-socket-stream get ; inline + +: check-ok ( result -- ? ) + [ "ok" ] dip key? ; inline + +PRIVATE> + +: mdb>> ( -- mdb ) + mdb get ; inline + +: master>> ( mdb -- inet ) + nodes>> [ t ] dip at inet>> ; + +: slave>> ( mdb -- inet ) + nodes>> [ f ] dip at inet>> ; + +: with-db ( mdb quot -- ... ) + [ [ '[ _ [ mdb set ] keep master>> + [ remote-address set ] keep + binary + local-address set + mdb-socket-stream set ] ] dip compose + [ mdb-stream>> [ dispose ] when* ] [ ] cleanup + ] with-scope ; + +> name>> "%s.system.indexes" sprintf ; inline + +: namespaces-collection ( -- ns ) + mdb>> name>> "%s.system.namespaces" sprintf ; inline + +: cmd-collection ( -- ns ) + mdb>> name>> "%s.$cmd" sprintf ; inline + +: index-ns ( colname -- index-ns ) + [ mdb>> name>> ] dip "%s.%s" sprintf ; inline + +: ismaster-cmd ( node -- result ) + binary "admin.$cmd" H{ { "ismaster" 1 } } + 1 >>return# '[ _ write-message read-message ] with-client + objects>> first ; + +: split-host-str ( hoststr -- host port ) + ":" split [ first ] keep + second string>number ; inline + +: eval-ismaster-result ( node result -- node result ) + [ [ "ismaster" ] dip at + >fixnum 1 = + [ t >>master? ] [ f >>master? ] if ] keep ; + +: check-node ( node -- node remote ) + dup inet>> ismaster-cmd + eval-ismaster-result + [ "remote" ] dip at ; + +: check-nodes ( node -- nodelist ) + check-node + [ V{ } clone [ push ] keep ] dip + [ split-host-str [ f ] dip + mdb-node boa check-node drop + swap tuck push + ] when* ; + +: verify-nodes ( -- ) + mdb>> nodes>> [ t ] dip at + check-nodes + H{ } clone tuck + '[ dup master?>> _ set-at ] each + [ mdb>> ] dip >>nodes drop ; + +: send-message ( message -- ) + [ mdb-stream>> ] dip '[ _ write-message ] with-stream* ; + +: send-query-plain ( query-message -- result ) + [ mdb-stream>> ] dip + '[ _ write-message read-message ] with-stream* ; + +: send-query ( query-message -- cursor result ) + [ send-query-plain ] keep + { [ collection>> >>collection drop ] + [ return#>> >>requested# ] + } 2cleave + [ [ cursor>> 0 > ] keep + '[ _ [ cursor>> ] [ collection>> ] [ requested#>> ] tri ] + [ f ] if + ] [ objects>> ] bi ; + +PRIVATE> + +: ( db host port -- mdb ) + [ f ] 2dip mdb-node boa + check-nodes + H{ } clone tuck + '[ dup master?>> _ set-at ] each + H{ } clone mdb boa ; + +: create-collection ( name -- ) + [ cmd-collection ] dip + "create" H{ } clone [ set-at ] keep + 1 >>return# send-query-plain objects>> first check-ok + [ "could not create collection" throw ] unless ; + +: load-collection-list ( -- collection-list ) + namespaces-collection + H{ } clone send-query-plain objects>> ; + + ] keep + '[ _ "%s contains invalid characters ( . $ ; )" sprintf throw ] when ; inline + +: (ensure-collection) ( collection -- ) + mdb>> collections>> dup keys length 0 = + [ load-collection-list + [ [ "options" ] dip key? ] filter + [ [ "name" ] dip at "." split second ] map + over '[ [ ] [ name>> ] bi _ set-at ] each ] [ ] if + [ dup ] dip key? [ drop ] + [ [ ensure-valid-collection-name ] keep create-collection ] if ; inline + +MEMO: reserved-namespace? ( name -- ? ) + [ "$cmd" = ] [ "system" head? ] bi or ; + +PRIVATE> + +MEMO: ensure-collection ( collection -- fq-collection ) + "." split1 over mdb>> name>> = + [ [ drop ] dip ] [ drop ] if + [ ] [ reserved-namespace? ] bi + [ [ (ensure-collection) ] keep ] unless + [ mdb>> name>> ] dip "%s.%s" sprintf ; inline + +: ( collection query -- mdb-query ) + [ ensure-collection ] dip + ; inline + +GENERIC# limit 1 ( mdb-query limit# -- mdb-query ) +M: mdb-query-msg limit ( query limit# -- mdb-query ) + >>return# ; inline + +GENERIC# skip 1 ( mdb-query skip# -- mdb-query ) +M: mdb-query-msg skip ( query skip# -- mdb-query ) + >>skip# ; inline + +: asc ( key -- spec ) [ 1 ] dip H{ } clone [ set-at ] keep ; inline +: desc ( key -- spec ) [ -1 ] dip H{ } clone [ set-at ] keep ; inline + +GENERIC# sort 1 ( mdb-query quot -- mdb-query ) +M: mdb-query-msg sort ( query qout -- mdb-query ) + [ { } ] dip with-datastack >>orderby ; + +GENERIC# hint 1 ( mdb-query index-hint -- mdb-query ) +M: mdb-query-msg hint ( mdb-query index-hint -- mdb-query ) + >>hint ; + +: find ( mdb-query -- cursor result ) + send-query ; + +: explain ( mdb-query -- result ) + t >>explain find [ drop ] dip ; + +GENERIC: get-more ( mdb-cursor -- mdb-cursor objects ) +M: mdb-cursor get-more ( mdb-cursor -- mdb-cursor objects ) + [ [ collection>> ] [ return#>> ] [ id>> ] tri send-query ] + [ f f ] if* ; + +: find-one ( mdb-query -- result ) + 1 >>return# send-query-plain ; + +: count ( collection query -- result ) + [ "count" H{ } clone [ set-at ] keep ] dip + [ over [ "query" ] dip set-at ] when* + [ cmd-collection ] dip find-one objects>> first + [ check-ok ] keep '[ "n" _ at >fixnum ] [ f ] if ; + +: lasterror ( -- error ) + cmd-collection H{ { "getlasterror" 1 } } + find-one objects>> [ "err" ] at ; + +: validate ( collection -- ) + [ cmd-collection ] dip + "validate" H{ } clone [ set-at ] keep + find-one objects>> first [ check-ok ] keep + '[ "result" _ at print ] when ; + + + +: save ( collection object -- ) + [ ensure-collection ] dip + send-message-check-error ; + +: save-unsafe ( collection object -- ) + [ ensure-collection ] dip + send-message ; + +: ensure-index ( collection name spec -- ) + H{ } clone + [ [ "key" ] dip set-at ] keep + [ [ "name" ] dip set-at ] keep + [ [ index-ns "ns" ] dip set-at ] keep + [ index-collection ] dip + save ; + +: drop-index ( collection name -- ) + H{ } clone + [ [ "index" ] dip set-at ] keep + [ [ "deleteIndexes" ] dip set-at ] keep + [ cmd-collection ] dip find-one objects>> first + check-ok [ "could not drop index" throw ] unless ; + +: update ( collection selector object -- ) + [ ensure-collection ] dip + send-message-check-error ; + +: update-unsafe ( collection selector object -- ) + [ ensure-collection ] dip + send-message ; + +: delete ( collection selector -- ) + [ ensure-collection ] dip + send-message-check-error ; + +: delete-unsafe ( collection selector -- ) + [ ensure-collection ] dip + send-message ; + +: load-index-list ( -- index-list ) + index-collection + H{ } clone find [ drop ] dip ; + +: drop-collection ( name -- ) + [ cmd-collection ] dip + "drop" H{ } clone [ set-at ] keep + find-one objects>> first check-ok + [ "could not drop collection" throw ] unless ; diff --git a/mongodb/index/index.factor b/mongodb/index/index.factor deleted file mode 100644 index 487251c27f..0000000000 --- a/mongodb/index/index.factor +++ /dev/null @@ -1,108 +0,0 @@ -USING: accessors assocs combinators formatting fry kernel memoize -linked-assocs mongodb.persistent mongodb.msg mongodb.connection -sequences sequences.deep io.encodings.binary mongodb.tuple -io.sockets prettyprint sets tools.walker math ; - -IN: mongodb.index - -: index-ns ( name -- ns ) - "%s.system.indexes" sprintf ; inline - -TUPLE: index name ns key ; - -SYMBOLS: +fieldindex+ +compoundindex+ +deepindex+ ; - - ] 2dip - [ rest ] keep first ! assoc slot options itype - { { +fieldindex+ [ drop [ 1 ] dip pick set-at ] } - { +deepindex+ [ first "%s.%s" sprintf [ 1 ] dip pick set-at ] } - { +compoundindex+ [ - 2over swap [ 1 ] 2dip set-at [ drop ] dip ! assoc options - over '[ _ [ 1 ] 2dip set-at ] each ] } - } case ; - -: build-index-seq ( slot optlist ns -- index-seq ) - [ V{ } clone ] 3dip ! v{} slot optl ns - [ index new ] dip ! v{} slot optl index ns - >>ns - [ pick ] dip swap ! v{} slot optl index v{} - [ swap ] 2dip ! v{} optl slot index v{ } - '[ _ _ ! element slot exemplar - clone 2over swap index-name >>name ! element slot clone - [ build-index ] dip swap >>key _ push - ] each ; - -: is-index-declaration? ( entry -- ? ) - first - { { +fieldindex+ [ t ] } - { +compoundindex+ [ t ] } - { +deepindex+ [ t ] } - [ drop f ] } case ; - -: index-assoc ( seq -- assoc ) - H{ } clone tuck '[ dup name>> _ set-at ] each ; - -: delete-index ( name ns -- ) - "Drop index %s - %s" sprintf . ; - -: clean-indices ( existing defined -- ) - [ index-assoc ] bi@ assoc-diff values - [ [ name>> ] [ ns>> ] bi delete-index ] each ; - -PRIVATE> - -USE: mongodb.query - -: load-indices ( mdb-collection -- indexlist ) - [ mdb>> name>> ] dip name>> "%s.%s" sprintf - "ns" H{ } clone [ set-at ] keep [ mdb>> name>> index-ns ] dip - '[ _ write-message read-message ] - [ mdb>> master>> binary ] dip with-client - objects>> [ [ index new ] dip - [ [ "ns" ] dip at >>ns ] - [ [ "name" ] dip at >>name ] - [ [ "key" ] dip at >>key ] tri - ] map ; - -: build-indices ( mdb-collection mdb -- seq ) - name>> - [ [ mdb-slot-definitions>> ] keep name>> ] dip - swap "%s.%s" sprintf - [ V{ } clone ] 2dip pick - '[ _ - [ [ is-index-declaration? ] filter ] dip - build-index-seq _ push - ] assoc-each flatten ; - -: ensure-indices ( mdb-collection -- ) - [ load-indices ] keep mdb>> build-indices - [ clean-indices ] keep - V{ } clone tuck - '[ _ [ tuple>query ] dip push ] each - dup length 0 > - [ [ mdb>> name>> "%s.system.indexes" sprintf ] dip - - [ mdb>> master>> binary ] dip '[ _ write-message ] with-client - ] - [ drop ] if ; - -: show-indices ( mdb-collection -- ) - load-indices . ; - -: show-all-indices ( -- ) - mdb>> collections>> values - V{ } clone tuck - '[ load-indices _ push ] each flatten . ; \ No newline at end of file diff --git a/mongodb/mongodb.factor b/mongodb/mongodb.factor index 69c2809a1e..28ca6acc25 100644 --- a/mongodb/mongodb.factor +++ b/mongodb/mongodb.factor @@ -1,7 +1,8 @@ -USING: accessors assocs fry io.encodings.binary io.sockets kernel math -math.parser namespaces sequences splitting -mongodb.connection mongodb.persistent mongodb.msg mongodb.query -mongodb.tuple ; +USING: accessors assocs combinators fry io.encodings.binary +io.sockets kernel math math.parser mongodb.driver +mongodb.msg mongodb.operations mongodb.persistent +mongodb.tuple namespaces +sequences splitting ; IN: mongodb @@ -18,29 +19,32 @@ GENERIC: explain ( object -- object ) [ mdb-collection>> get-collection-fqn ] keep H{ } tuple>query ; inline +TUPLE: mdb-result { cursor integer } +{ start# integer } +{ returned# integer } +{ objects sequence } ; + +: build-result ( resultmsg -- mdb-result ) + [ mdb-result new ] dip + { + [ cursor>> >>cursor ] + [ start#>> >>start# ] + [ returned#>> >>returned# ] + [ objects>> [ assoc>tuple ] map >>objects ] + } cleave ; + PRIVATE> - -: ( db host port -- mdb ) - () ; - M: mdb-persistent store ( tuple -- ) prepare-store ! H { collection { ... values ... } [ [ get-collection-fqn ] dip - values - [ mdb>> master>> binary ] dip '[ _ write-message ] with-client + values send-message ] assoc-each ; M: mdb-persistent find ( example -- result ) - prepare-find [ mdb>> master>> ] dip (find) + prepare-find [ mdb>> master>> ] dip send-query build-result ; M: mdb-persistent nfind ( example n -- result ) [ prepare-find ] dip >>return# - [ mdb>> master>> ] dip (find) - build-result ; - -M: mdb-persistent explain ( example -- result ) - prepare-find [ query>> [ t "$explain" ] dip set-at ] keep - [ mdb>> master>> ] dip (find-one) - build-result ; + send-query build-result ; diff --git a/mongodb/msg/msg.factor b/mongodb/msg/msg.factor index 1df971b229..636e5e6755 100644 --- a/mongodb/msg/msg.factor +++ b/mongodb/msg/msg.factor @@ -1,12 +1,8 @@ -USING: accessors io.encodings.string assocs bson.reader -bson.writer byte-arrays byte-vectors constructors fry io io.binary -io.encodings.binary io.encodings.utf8 io.streams.byte-array kernel -linked-assocs math namespaces sequences strings ; +USING: accessors assocs constructors kernel linked-assocs math +sequences strings ; IN: mongodb.msg - +CONSTANT: OP_KillCursors 2007 TUPLE: mdb-msg { opcode integer } @@ -39,7 +25,7 @@ TUPLE: mdb-insert-msg < mdb-msg TUPLE: mdb-update-msg < mdb-msg { collection string } -{ upsert? integer initial: 1 } +{ upsert? integer initial: 0 } { selector assoc } { object assoc } ; @@ -62,16 +48,19 @@ TUPLE: mdb-query-msg < mdb-msg { return# integer initial: 0 } { query assoc } { returnfields assoc } -{ orderby sequence } ; +{ orderby sequence } +explain hint ; TUPLE: mdb-reply-msg < mdb-msg +{ collection string } { cursor integer initial: 0 } { start# integer initial: 0 } +{ requested# integer initial: 0 } { returned# integer initial: 0 } { objects sequence } ; -CONSTRUCTOR: mdb-getmore-msg ( collection return# -- mdb-getmore-msg ) +CONSTRUCTOR: mdb-getmore-msg ( collection return# cursor -- mdb-getmore-msg ) OP_GetMore >>opcode ; inline CONSTRUCTOR: mdb-delete-msg ( collection selector -- mdb-delete-msg ) @@ -90,213 +79,22 @@ M: sequence ( sequences -- mdb-killcursors-msg ) M: integer ( integer -- mdb-killcursors-msg ) V{ } clone [ push ] keep ; -GENERIC# 1 ( collection objects -- mdb-insert-msg ) - -M: linked-assoc ( collection linked-assoc -- mdb-insert-msg ) - [ mdb-insert-msg new ] 2dip - [ >>collection ] dip - V{ } clone tuck push - >>objects OP_Insert >>opcode ; +GENERIC: ( collection objects -- mdb-insert-msg ) M: sequence ( collection sequence -- mdb-insert-msg ) [ mdb-insert-msg new ] 2dip [ >>collection ] dip >>objects OP_Insert >>opcode ; -CONSTRUCTOR: mdb-update-msg ( collection object -- mdb-update-msg ) - dup object>> [ "_id" ] dip at "_id" H{ } clone [ set-at ] keep >>selector - OP_Update >>opcode ; +M: assoc ( collection assoc -- mdb-insert-msg ) + [ mdb-insert-msg new ] 2dip + [ >>collection ] dip + V{ } clone tuck push + >>objects OP_Insert >>opcode ; + + +CONSTRUCTOR: mdb-update-msg ( collection selector object -- mdb-update-msg ) + OP_Update >>opcode ; inline CONSTRUCTOR: mdb-reply-msg ( -- mdb-reply-msg ) ; inline -GENERIC: write-message ( message -- ) - - ( -- integer ) - msg-bytes-read get ; inline - -: >bytes-read ( integer -- ) - msg-bytes-read set ; inline - -: change-bytes-read ( integer -- ) - bytes-read> [ 0 ] unless* + >bytes-read ; inline - -: write-byte ( byte -- ) 1 >le write ; inline -: write-int32 ( int -- ) 4 >le write ; inline -: write-double ( real -- ) double>bits 8 >le write ; inline -: write-cstring ( string -- ) utf8 encode B{ 0 } append write ; inline -: write-longlong ( object -- ) 8 >le write ; inline - -: read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline -: read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline -: read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline -: read-byte ( -- byte ) read-byte-raw first ; inline - -: (read-cstring) ( acc -- ) - [ read-byte ] dip ! b acc - 2dup push ! b acc - [ 0 = ] dip ! bool acc - '[ _ (read-cstring) ] unless ; inline recursive - -: read-cstring ( -- string ) - BV{ } clone - [ (read-cstring) ] keep - [ zero? ] trim-tail - >byte-array utf8 decode ; inline - -GENERIC: (read-message) ( message opcode -- message ) - -: copy-header ( message msg-stub -- message ) - [ length>> ] keep [ >>length ] dip - [ req-id>> ] keep [ >>req-id ] dip - [ resp-id>> ] keep [ >>resp-id ] dip - [ opcode>> ] keep [ >>opcode ] dip - flags>> >>flags ; - -M: mdb-query-op (read-message) ( msg-stub opcode -- message ) - drop - [ mdb-query-msg new ] dip copy-header - read-cstring >>collection - read-int32 >>skip# - read-int32 >>return# - H{ } stream>assoc change-bytes-read >>query - dup length>> bytes-read> > - [ H{ } stream>assoc change-bytes-read >>returnfields - dup length>> bytes-read> > - [ H{ } stream>assoc drop >>orderby ] when - ] when ; - -M: mdb-insert-op (read-message) ( msg-stub opcode -- message ) - drop - [ mdb-insert-msg new ] dip copy-header - read-cstring >>collection - V{ } clone >>objects - [ '[ _ length>> bytes-read> > ] ] keep tuck - '[ H{ } stream>assoc change-bytes-read _ objects>> push ] - [ ] while ; - -M: mdb-delete-op (read-message) ( msg-stub opcode -- message ) - drop - [ mdb-delete-msg new ] dip copy-header - read-cstring >>collection - H{ } stream>assoc change-bytes-read >>selector ; - -M: mdb-getmore-op (read-message) ( msg-stub opcode -- message ) - drop - [ mdb-getmore-msg new ] dip copy-header - read-cstring >>collection - read-int32 >>return# - read-longlong >>cursor ; - -M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message ) - drop - [ mdb-killcursors-msg new ] dip copy-header - read-int32 >>cursors# - V{ } clone >>cursors - [ [ cursors#>> ] keep - '[ read-longlong _ cursors>> push ] times ] keep ; - -M: mdb-update-op (read-message) ( msg-stub opcode -- message ) - drop - [ mdb-update-msg new ] dip copy-header - read-cstring >>collection - read-int32 >>upsert? - H{ } stream>assoc change-bytes-read >>selector - H{ } stream>assoc change-bytes-read >>object ; - -M: mdb-reply-op (read-message) ( msg-stub opcode -- message ) - drop - [ ] dip copy-header - read-longlong >>cursor - read-int32 >>start# - read-int32 [ >>returned# ] keep - [ H{ } stream>assoc drop ] accumulator [ times ] dip >>objects ; - -: read-header ( message -- message ) - read-int32 >>length - read-int32 >>req-id - read-int32 >>resp-id - read-int32 >>opcode - read-int32 >>flags ; inline - -: write-header ( message length -- ) - MSG-HEADER-SIZE + write-int32 - [ req-id>> write-int32 ] keep - [ resp-id>> write-int32 ] keep - opcode>> write-int32 ; inline - -PRIVATE> - -: read-message ( -- message ) - mdb-msg new - 0 >bytes-read - read-header - [ ] [ opcode>> ] bi (read-message) ; - - - -M: mdb-query-msg write-message ( message -- ) - dup - '[ _ - [ flags>> write-int32 ] keep - [ collection>> write-cstring ] keep - [ skip#>> write-int32 ] keep - [ return#>> write-int32 ] keep - query>> assoc>array write - ] (write-message) ; - -M: mdb-insert-msg write-message ( message -- ) - dup - '[ _ - [ flags>> write-int32 ] keep - [ collection>> write-cstring ] keep - objects>> [ assoc>array write ] each - ] (write-message) ; - -M: mdb-update-msg write-message ( message -- ) - dup - '[ _ - [ flags>> write-int32 ] keep - [ collection>> write-cstring ] keep - [ upsert?>> write-int32 ] keep - [ selector>> assoc>array write ] keep - object>> assoc>array write - ] (write-message) ; - -M: mdb-delete-msg write-message ( message -- ) - dup - '[ _ - [ flags>> write-int32 ] keep - [ collection>> write-cstring ] keep - 0 write-int32 - selector>> assoc>array write - ] (write-message) ; - -M: mdb-getmore-msg write-message ( message -- ) - dup - '[ _ - [ flags>> write-int32 ] keep - [ collection>> write-cstring ] keep - [ return#>> write-int32 ] keep - cursor>> write-longlong - ] (write-message) ; - -M: mdb-killcursors-msg write-message ( message -- ) - dup - '[ _ - [ flags>> write-int32 ] keep - [ cursors#>> write-int32 ] keep - cursors>> [ write-longlong ] each - ] (write-message) ; \ No newline at end of file diff --git a/mongodb/operations/operations.factor b/mongodb/operations/operations.factor new file mode 100644 index 0000000000..e628251103 --- /dev/null +++ b/mongodb/operations/operations.factor @@ -0,0 +1,219 @@ +USING: accessors bson.reader bson.writer byte-arrays byte-vectors fry +io io.binary io.encodings.binary io.encodings.string io.encodings.utf8 +io.streams.byte-array kernel math mongodb.msg namespaces sequences +locals assocs combinators linked-assocs ; + +IN: mongodb.operations + + + +GENERIC: write-message ( message -- ) + + ( -- integer ) + msg-bytes-read get ; inline + +: >bytes-read ( integer -- ) + msg-bytes-read set ; inline + +: change-bytes-read ( integer -- ) + bytes-read> [ 0 ] unless* + >bytes-read ; inline + +: write-byte ( byte -- ) 1 >le write ; inline +: write-int32 ( int -- ) 4 >le write ; inline +: write-double ( real -- ) double>bits 8 >le write ; inline +: write-cstring ( string -- ) utf8 encode B{ 0 } append write ; inline +: write-longlong ( object -- ) 8 >le write ; inline + +: read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline +: read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline +: read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline +: read-byte ( -- byte ) read-byte-raw first ; inline + +: (read-cstring) ( acc -- ) + [ read-byte ] dip ! b acc + 2dup push ! b acc + [ 0 = ] dip ! bool acc + '[ _ (read-cstring) ] unless ; inline recursive + +: read-cstring ( -- string ) + BV{ } clone + [ (read-cstring) ] keep + [ zero? ] trim-tail + >byte-array utf8 decode ; inline + +GENERIC: (read-message) ( message opcode -- message ) + +: copy-header ( message msg-stub -- message ) + [ length>> ] keep [ >>length ] dip + [ req-id>> ] keep [ >>req-id ] dip + [ resp-id>> ] keep [ >>resp-id ] dip + [ opcode>> ] keep [ >>opcode ] dip + flags>> >>flags ; + +M: mdb-query-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-query-msg new ] dip copy-header + read-cstring >>collection + read-int32 >>skip# + read-int32 >>return# + H{ } stream>assoc change-bytes-read >>query + dup length>> bytes-read> > + [ H{ } stream>assoc change-bytes-read >>returnfields ] when ; + +M: mdb-insert-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-insert-msg new ] dip copy-header + read-cstring >>collection + V{ } clone >>objects + [ '[ _ length>> bytes-read> > ] ] keep tuck + '[ H{ } stream>assoc change-bytes-read _ objects>> push ] + while ; + +M: mdb-delete-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-delete-msg new ] dip copy-header + read-cstring >>collection + H{ } stream>assoc change-bytes-read >>selector ; + +M: mdb-getmore-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-getmore-msg new ] dip copy-header + read-cstring >>collection + read-int32 >>return# + read-longlong >>cursor ; + +M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-killcursors-msg new ] dip copy-header + read-int32 >>cursors# + V{ } clone >>cursors + [ [ cursors#>> ] keep + '[ read-longlong _ cursors>> push ] times ] keep ; + +M: mdb-update-op (read-message) ( msg-stub opcode -- message ) + drop + [ mdb-update-msg new ] dip copy-header + read-cstring >>collection + read-int32 >>upsert? + H{ } stream>assoc change-bytes-read >>selector + H{ } stream>assoc change-bytes-read >>object ; + +M: mdb-reply-op (read-message) ( msg-stub opcode -- message ) + drop + [ ] dip copy-header + read-longlong >>cursor + read-int32 >>start# + read-int32 [ >>returned# ] keep + [ stream>assoc drop ] accumulator [ times ] dip >>objects ; + +: read-header ( message -- message ) + read-int32 >>length + read-int32 >>req-id + read-int32 >>resp-id + read-int32 >>opcode + read-int32 >>flags ; inline + +: write-header ( message length -- ) + MSG-HEADER-SIZE + write-int32 + [ req-id>> write-int32 ] keep + [ resp-id>> write-int32 ] keep + opcode>> write-int32 ; inline + +PRIVATE> + +: read-message ( -- message ) + mdb-msg new + 0 >bytes-read + read-header + [ ] [ opcode>> ] bi (read-message) ; + + ] | + { [ orderby>> [ "orderby" selector set-at ] when* ] + [ explain>> [ "$explain" selector set-at ] when* ] + [ hint>> [ "$hint" selector set-at ] when* ] + [ query>> "query" selector set-at ] + } cleave + selector + ] ; + +PRIVATE> + +M: mdb-query-msg write-message ( message -- ) + dup + '[ _ + [ flags>> write-int32 ] keep + [ collection>> write-cstring ] keep + [ skip#>> write-int32 ] keep + [ return#>> write-int32 ] keep + [ build-query-object assoc>array write ] keep + returnfields>> [ assoc>array write ] when* + ] (write-message) ; + +M: mdb-insert-msg write-message ( message -- ) + dup + '[ _ + [ flags>> write-int32 ] keep + [ collection>> write-cstring ] keep + objects>> [ assoc>array write ] each + ] (write-message) ; + +M: mdb-update-msg write-message ( message -- ) + dup + '[ _ + [ flags>> write-int32 ] keep + [ collection>> write-cstring ] keep + [ upsert?>> write-int32 ] keep + [ selector>> assoc>array write ] keep + object>> assoc>array write + ] (write-message) ; + +M: mdb-delete-msg write-message ( message -- ) + dup + '[ _ + [ flags>> write-int32 ] keep + [ collection>> write-cstring ] keep + 0 write-int32 + selector>> assoc>array write + ] (write-message) ; + +M: mdb-getmore-msg write-message ( message -- ) + dup + '[ _ + [ flags>> write-int32 ] keep + [ collection>> write-cstring ] keep + [ return#>> write-int32 ] keep + cursor>> write-longlong + ] (write-message) ; + +M: mdb-killcursors-msg write-message ( message -- ) + dup + '[ _ + [ flags>> write-int32 ] keep + [ cursors#>> write-int32 ] keep + cursors>> [ write-longlong ] each + ] (write-message) ; + diff --git a/mongodb/persistent/persistent.factor b/mongodb/persistent/persistent.factor index f83d06905c..dc5ddb614b 100644 --- a/mongodb/persistent/persistent.factor +++ b/mongodb/persistent/persistent.factor @@ -1,6 +1,6 @@ USING: accessors assocs classes fry kernel linked-assocs math mirrors namespaces sequences strings vectors words bson.constants -continuations mongodb.tuple ; +continuations mongodb.driver mongodb.tuple ; IN: mongodb.persistent @@ -18,10 +18,10 @@ DEFER: create-mdb-command ( tuple -- objref ) - [ mdb-collection>> ] [ _id>> ] bi objref boa ; inline + [ mdb-collection-prop ] [ _id>> ] bi objref boa ; inline : mdbinfo>tuple-class ( mdbinfo -- class ) [ first ] keep second lookup ; inline diff --git a/mongodb/query/query.factor b/mongodb/query/query.factor deleted file mode 100644 index ca3b059537..0000000000 --- a/mongodb/query/query.factor +++ /dev/null @@ -1,68 +0,0 @@ -USING: accessors combinators fry io.encodings.binary io.sockets kernel -mongodb.msg mongodb.persistent mongodb.connection sequences math namespaces assocs -formatting splitting mongodb.tuple mongodb.index ; - -IN: mongodb.query - -TUPLE: mdb-result { cursor integer } -{ start# integer } -{ returned# integer } -{ objects sequence } ; - -: namespaces-ns ( name -- ns ) - "%s.system.namespaces" sprintf ; inline - - - -: (find) ( inet query -- result ) - '[ _ write-message read-message ] (execute-query) ; inline - -: (find-one) ( inet query -- result ) - 1 >>return# - (find) ; inline - -: build-result ( resultmsg -- mdb-result ) - [ mdb-result new ] dip - { - [ cursor>> >>cursor ] - [ start#>> >>start# ] - [ returned#>> >>returned# ] - [ objects>> [ assoc>tuple ] map >>objects ] - } cleave ; - -: load-collections ( -- collections ) - mdb>> [ master>> ] [ name>> namespaces-ns ] bi - H{ } clone (find) - objects>> [ [ "name" ] dip at "." split second ] map - H{ } clone tuck - '[ [ ensure-indices ] [ ] [ name>> ] tri _ set-at ] each - [ mdb>> ] dip >>collections collections>> ; - -: check-ok ( result -- ? ) - [ "ok" ] dip key? ; inline - -: create-collection ( mdb-collection -- ) - dup name>> "create" H{ } clone [ set-at ] keep - [ mdb>> [ master>> ] [ name>> ] bi "%s.$cmd" sprintf ] dip - (find-one) objects>> first - check-ok - [ [ ensure-indices ] keep dup name>> mdb>> collections>> set-at ] - [ "could not create collection" throw ] if ; - -: get-collection-fqn ( mdb-collection -- fqdn ) - mdb>> collections>> - dup keys length 0 = - [ drop load-collections ] - [ ] if - [ dup name>> ] dip - key? - [ ] - [ dup create-collection ] if - name>> [ mdb>> name>> ] dip "%s.%s" sprintf ; - - \ No newline at end of file diff --git a/mongodb/tuple/tuple.factor b/mongodb/tuple/tuple.factor index 16e408d78e..34591a5d4a 100644 --- a/mongodb/tuple/tuple.factor +++ b/mongodb/tuple/tuple.factor @@ -1,131 +1,170 @@ USING: accessors assocs classes classes.mixin classes.tuple vectors math -classes.tuple.parser formatting generalizations kernel sequences fry -prettyprint strings compiler.units slots tools.walker words arrays mongodb.persistent ; +classes.tuple.parser formatting generalizations kernel sequences fry combinators +linked-assocs sequences.deep mongodb.driver continuations memoize +prettyprint strings compiler.units slots tools.walker words arrays ; IN: mongodb.tuple MIXIN: mdb-persistent - -GENERIC: mdb-slot-definitions>> ( tuple -- string ) -GENERIC: mdb-collection>> ( object -- mdb-collection ) - -CONSTANT: MDB_COLLECTIONS "mdb_collections" -CONSTANT: MDB_COL_PROP "mdb_collection" -CONSTANT: MDB_SLOTOPT_PROP "mdb_slot_options" - SLOT: _id -CONSTANT: MDB_P_SLOTS { "_id" } -CONSTANT: MDB_OID "_id" +SLOT: _mdb_ -SYMBOLS: +transient+ +load+ ; +GENERIC: mdb-collection-prop ( object -- mdb-collection ) +GENERIC: mdb-slot-list ( tuple -- string ) -UNION: boolean t POSTPONE: f ; +CONSTANT: MDB_COLLECTION_MAP "_mdb_col_map" +CONSTANT: MDB_COLLECTION "_mdb_col" +CONSTANT: MDB_SLOTDEF_LIST "_mdb_slot_list" -TUPLE: mdb-collection - { name string } - { capped boolean initial: f } - { size integer initial: -1 } - { max integer initial: -1 } - { classes sequence } ; +SYMBOLS: +transient+ +load+ +fieldindex+ +compoundindex+ +deepindex+ ; + +TUPLE: mdb-tuple-collection < mdb-collection { classes sequence } ; +TUPLE: mdb-tuple-index name key ; + +USE: mongodb.persistent >) ( class -- mdb-collection ) - dup props>> [ MDB_COL_PROP ] dip at - [ [ drop ] dip ] - [ superclass [ (mdb-collection>>) ] [ f ] if* ] if* ; inline recursive +: MDB_ADDON_SLOTS ( -- slots ) + { } [ MDB_OID MDB_PROPERTIES ] with-datastack ; inline -: (mdb-slot-definitions>>) ( class -- slot-defs ) - superclasses [ MDB_SLOTOPT_PROP word-prop ] map assoc-combine ; inline +: (mdb-collection) ( class -- mdb-collection ) + dup MDB_COLLECTION word-prop + [ [ drop ] dip ] + [ superclass [ (mdb-collection) ] [ f ] if* ] if* ; inline recursive + +: (mdb-slot-list) ( class -- slot-defs ) + superclasses [ MDB_SLOTDEF_LIST word-prop ] map assoc-combine ; inline : link-class ( class collection -- ) - tuck classes>> ! col class v{} + over classes>> [ 2dup member? [ 2drop ] [ push ] if ] - [ 1vector >>classes ] if* drop ; + [ 1vector >>classes ] if* drop ; inline + +: link-collection ( class collection -- ) + [ swap link-class ] [ MDB_COLLECTION set-word-prop ] 2bi ; inline PRIVATE> -M: tuple-class mdb-collection>> ( tuple -- mdb-collection ) - (mdb-collection>>) ; +M: tuple-class mdb-collection-prop ( tuple -- mdb-collection ) + (mdb-collection) ; -M: mdb-persistent mdb-collection>> ( tuple -- mdb-collection ) - class (mdb-collection>>) ; +M: mdb-persistent mdb-collection-prop ( tuple -- mdb-collection ) + class (mdb-collection) ; -M: mdb-persistent mdb-slot-definitions>> ( tuple -- string ) - class (mdb-slot-definitions>>) ; +M: mdb-persistent mdb-slot-list ( tuple -- string ) + class (mdb-slot-list) ; -M: tuple-class mdb-slot-definitions>> ( class -- assoc ) - (mdb-slot-definitions>>) ; +M: tuple-class mdb-slot-list ( class -- assoc ) + (mdb-slot-list) ; -M: mdb-collection mdb-slot-definitions>> ( collection -- assoc ) - classes>> [ mdb-slot-definitions>> ] map assoc-combine ; - -: link-collection ( class collection -- ) - 2dup link-class - swap [ MDB_COL_PROP ] dip props>> set-at ; inline - -: declared-collections> ( -- assoc ) - MDB_COLLECTIONS mdb-persistent props>> at - [ H{ } clone - [ MDB_COLLECTIONS mdb-persistent props>> set-at ] keep - ] unless* ; - -: ( name -- mdb-collection ) - declared-collections> 2dup key? - [ at ] - [ [ mdb-collection new ] 2dip - [ [ >>name dup ] keep ] dip set-at ] if ; +M: mdb-collection mdb-slot-list ( collection -- assoc ) + classes>> [ mdb-slot-list ] map assoc-combine ; +: collection-map ( -- assoc ) + MDB_COLLECTION_MAP mdb-persistent word-prop + [ mdb-persistent MDB_COLLECTION_MAP H{ } clone + [ set-word-prop ] keep ] unless* ; inline + +: ( name -- mdb-tuple-collection ) + collection-map [ ] [ key? ] 2bi + [ at ] [ [ mdb-tuple-collection new dup ] 2dip + [ [ >>name ] keep ] dip set-at ] if ; + > ] map [ MDB_OID ] dip memq? - [ ] - [ MDB_P_SLOTS prepend ] if ; inline +: mdb-check-slots ( superclass slots -- superclass slots ) + over all-slots [ name>> ] map [ MDB_OID ] dip member? + [ ] [ MDB_ADDON_SLOTS prepend ] if ; inline PRIVATE> : show-persistence-info ( class -- ) H{ } clone - [ [ dup mdb-collection>> "collection" ] dip set-at ] keep - [ [ mdb-slot-definitions>> "slots" ] dip set-at ] keep . ; - -GENERIC: mdb-persisted? ( tuple -- ? ) - -M: mdb-persistent mdb-persisted? ( tuple -- ? ) - _id>> f = not ; - -M: assoc mdb-persisted? ( assoc -- ? ) - [ MDB_OID ] dip key? ; inline + [ [ mdb-collection-prop "collection" ] dip set-at ] 2keep + [ [ mdb-slot-list "slots" ] dip set-at ] keep . ; : MDBTUPLE: parse-tuple-definition - mdb-check-id-slot + mdb-check-slots define-tuple-class ; parsing assoc ( seq -- assoc ) +: opt>assoc ( seq -- assoc ) [ dup assoc? - [ 1array { "" } append ] unless - ] map ; + [ 1array { "" } append ] unless ] map ; + +: optl>map ( seq -- map ) + H{ } clone tuck + '[ split-optl opt>assoc swap _ set-at ] each ; inline + +: set-slot-options ( class options -- ) + '[ MDB_SLOTDEF_LIST _ optl>map set-word-prop ] keep + dup mdb-collection-prop link-collection ; inline PRIVATE> -: set-slot-options ( class options -- ) - H{ } clone tuck '[ _ [ split-olist optl>assoc swap ] dip set-at ] each - over [ MDB_SLOTOPT_PROP ] dip props>> set-at - dup mdb-collection>> link-collection ; - -: define-collection ( class collection options -- ) +: set-collection ( class collection options -- ) [ [ dup ] dip link-collection ] dip ! cl options [ dup '[ _ mdb-persistent add-mixin-instance ] with-compilation-unit ] dip set-slot-options ; + ] 2dip + [ rest ] keep first ! assoc slot options itype + { { +fieldindex+ [ drop [ 1 ] dip pick set-at ] } + { +deepindex+ [ first "%s.%s" sprintf [ 1 ] dip pick set-at ] } + { +compoundindex+ [ + 2over swap [ 1 ] 2dip set-at [ drop ] dip ! assoc options + over '[ _ [ 1 ] 2dip set-at ] each ] } + } case ; + +: build-index-seq ( slot optlist -- index-seq ) + [ V{ } clone ] 2dip pick ! v{} slot optl v{} + [ swap ] dip ! v{} optl slot v{ } + '[ _ mdb-tuple-index new ! element slot exemplar + 2over swap index-name >>name ! element slot clone + [ build-index ] dip swap >>key _ push + ] each ; + +MEMO: is-index-declaration? ( entry -- ? ) + first + { { +fieldindex+ [ t ] } + { +compoundindex+ [ t ] } + { +deepindex+ [ t ] } + [ drop f ] } case ; + +: build-tuple-index-list ( mdb-collection -- seq ) + mdb-slot-list V{ } clone tuck + '[ [ is-index-declaration? ] filter + build-index-seq _ push + ] assoc-each flatten ; + +PRIVATE> + +: clean-indices ( list list2 -- ) 2drop ; + +: load-tuple-index-list ( mdb-collection -- indexlist ) + [ load-index-list ] dip + '[ [ "ns" ] dip at _ name>> ensure-collection = ] filter ; + +: ensure-tuple-index-list ( mdb-collection -- ) + [ build-tuple-index-list ] keep + '[ [ _ name>> ] dip [ name>> ] [ key>> ] bi ensure-index ] each ;