started complete rewrite... Now there's a low-level driver (mongodb.driver)

implementation. Tuple integration will follow soon.
db4
Sascha Matzke 2009-03-01 22:45:38 +01:00
parent 2e641216f3
commit 02a76d0a3e
12 changed files with 681 additions and 569 deletions

View File

@ -1,11 +1,13 @@
USING: accessors constructors uuid ;
USING: accessors kernel uuid ;
IN: bson.constants
TUPLE: objid id ;
CONSTRUCTOR: objid ( -- objid )
uuid1 >>id ; inline
: <objid> ( -- objid )
objid new uuid1 >>id ; inline
TUPLE: oid { a initial: 0 } { b initial: 0 } ;
TUPLE: objref ns objid ;

View File

@ -1,6 +1,6 @@
USING: accessors assocs bson.constants byte-arrays byte-vectors fry io
io.binary io.encodings.string io.encodings.utf8 kernel math namespaces
sequences serialize ;
sequences serialize arrays ;
IN: bson.reader
@ -176,6 +176,11 @@ M: bson-null element-data-read ( type -- bf )
drop
f ;
M: bson-oid element-data-read ( type -- oid )
drop
read-longlong
read-int32 oid boa ;
M: bson-binary-custom element-binary-read ( size type -- dbref )
2drop
read-cstring

View File

@ -25,6 +25,7 @@ M: string bson-type? ( string -- type ) drop T_String ;
M: integer bson-type? ( integer -- type ) drop T_Integer ;
M: sequence bson-type? ( seq -- type ) drop T_Array ;
M: oid bson-type? ( word -- type ) drop T_OID ;
M: objid bson-type? ( objid -- type ) drop T_Binary ;
M: objref bson-type? ( objref -- type ) drop T_Binary ;
M: quotation bson-type? ( quotation -- type ) drop T_Binary ;
@ -68,6 +69,9 @@ M: quotation bson-write ( quotation -- )
T_Binary_Function write-byte
write ;
M: oid bson-write ( oid -- )
[ a>> write-longlong ] [ b>> write-int32 ] bi ;
M: objid bson-write ( oid -- )
id>> utf8 encode
[ length write-int32 ] keep

View File

@ -1,65 +0,0 @@
USING: accessors assocs fry io.sockets kernel math mongodb.msg
namespaces sequences splitting math.parser io.encodings.binary ;
IN: mongodb.connection
TUPLE: mdb-node master? inet ;
TUPLE: mdb name nodes collections ;
: mdb>> ( -- mdb )
mdb get ; inline
: with-db ( mdb quot -- ... )
'[ _ mdb set _ call ] with-scope ;
: master>> ( mdb -- inet )
nodes>> [ t ] dip at inet>> ;
: slave>> ( mdb -- inet )
nodes>> [ f ] dip at inet>> ;
<PRIVATE
: ismaster-cmd ( node -- result )
binary "admin.$cmd" H{ { "ismaster" 1 } } <mdb-query-msg>
1 >>return# '[ _ write-message read-message ] with-client
objects>> first ;
: split-host-str ( hoststr -- host port )
":" split [ first ] keep
second string>number ; inline
: eval-ismaster-result ( node result -- node result )
[ [ "ismaster" ] dip at
>fixnum 1 =
[ t >>master? ] [ f >>master? ] if ] keep ;
: check-node ( node -- node remote )
dup inet>> ismaster-cmd
eval-ismaster-result
[ "remote" ] dip at ;
: check-nodes ( node -- nodelist )
check-node
[ V{ } clone [ push ] keep ] dip
[ split-host-str <inet> [ f ] dip
mdb-node boa check-node drop
swap tuck push
] when* ;
: verify-nodes ( -- )
mdb>> nodes>> [ t ] dip at
check-nodes
H{ } clone tuck
'[ dup master?>> _ set-at ] each
[ mdb>> ] dip >>nodes drop ;
PRIVATE>
: (<mdb>) ( db host port -- mdb )
[ f ] 2dip <inet> mdb-node boa
check-nodes
H{ } clone tuck
'[ dup master?>> _ set-at ] each
H{ } clone mdb boa ;

View File

@ -0,0 +1,282 @@
USING: accessors assocs fry io.sockets kernel math mongodb.msg formatting linked-assocs destructors continuations
mongodb.operations namespaces sequences splitting math.parser io.encodings.binary combinators io.streams.duplex
arrays io memoize constructors sets strings ;
IN: mongodb.driver
TUPLE: mdb-node master? inet ;
TUPLE: mdb name nodes collections ;
TUPLE: mdb-cursor collection id return# ;
UNION: boolean t POSTPONE: f ;
TUPLE: mdb-collection
{ name string }
{ capped boolean initial: f }
{ size integer initial: -1 }
{ max integer initial: -1 } ;
CONSTRUCTOR: mdb-cursor ( id collection return# -- cursor ) ;
CONSTRUCTOR: mdb-collection ( name -- collection ) ;
CONSTANT: MDB-GENERAL-ERROR 1
CONSTANT: MDB_OID "_id"
CONSTANT: MDB_PROPERTIES "_mdb_"
CONSTANT: PARTIAL? "partial?"
CONSTANT: DIRTY? "dirty?"
ERROR: mdb-error id msg ;
<PRIVATE
SYMBOL: mdb-socket-stream
: mdb-stream>> ( -- stream )
mdb-socket-stream get ; inline
: check-ok ( result -- ? )
[ "ok" ] dip key? ; inline
PRIVATE>
: mdb>> ( -- mdb )
mdb get ; inline
: master>> ( mdb -- inet )
nodes>> [ t ] dip at inet>> ;
: slave>> ( mdb -- inet )
nodes>> [ f ] dip at inet>> ;
: with-db ( mdb quot -- ... )
[ [ '[ _ [ mdb set ] keep master>>
[ remote-address set ] keep
binary <client>
local-address set
mdb-socket-stream set ] ] dip compose
[ mdb-stream>> [ dispose ] when* ] [ ] cleanup
] with-scope ;
<PRIVATE
: index-collection ( -- ns )
mdb>> name>> "%s.system.indexes" sprintf ; inline
: namespaces-collection ( -- ns )
mdb>> name>> "%s.system.namespaces" sprintf ; inline
: cmd-collection ( -- ns )
mdb>> name>> "%s.$cmd" sprintf ; inline
: index-ns ( colname -- index-ns )
[ mdb>> name>> ] dip "%s.%s" sprintf ; inline
: ismaster-cmd ( node -- result )
binary "admin.$cmd" H{ { "ismaster" 1 } } <mdb-query-msg>
1 >>return# '[ _ write-message read-message ] with-client
objects>> first ;
: split-host-str ( hoststr -- host port )
":" split [ first ] keep
second string>number ; inline
: eval-ismaster-result ( node result -- node result )
[ [ "ismaster" ] dip at
>fixnum 1 =
[ t >>master? ] [ f >>master? ] if ] keep ;
: check-node ( node -- node remote )
dup inet>> ismaster-cmd
eval-ismaster-result
[ "remote" ] dip at ;
: check-nodes ( node -- nodelist )
check-node
[ V{ } clone [ push ] keep ] dip
[ split-host-str <inet> [ f ] dip
mdb-node boa check-node drop
swap tuck push
] when* ;
: verify-nodes ( -- )
mdb>> nodes>> [ t ] dip at
check-nodes
H{ } clone tuck
'[ dup master?>> _ set-at ] each
[ mdb>> ] dip >>nodes drop ;
: send-message ( message -- )
[ mdb-stream>> ] dip '[ _ write-message ] with-stream* ;
: send-query-plain ( query-message -- result )
[ mdb-stream>> ] dip
'[ _ write-message read-message ] with-stream* ;
: send-query ( query-message -- cursor result )
[ send-query-plain ] keep
{ [ collection>> >>collection drop ]
[ return#>> >>requested# ]
} 2cleave
[ [ cursor>> 0 > ] keep
'[ _ [ cursor>> ] [ collection>> ] [ requested#>> ] tri <mdb-cursor> ]
[ f ] if
] [ objects>> ] bi ;
PRIVATE>
: <mdb> ( db host port -- mdb )
[ f ] 2dip <inet> mdb-node boa
check-nodes
H{ } clone tuck
'[ dup master?>> _ set-at ] each
H{ } clone mdb boa ;
: create-collection ( name -- )
[ cmd-collection ] dip
"create" H{ } clone [ set-at ] keep
<mdb-query-msg> 1 >>return# send-query-plain objects>> first check-ok
[ "could not create collection" throw ] unless ;
: load-collection-list ( -- collection-list )
namespaces-collection
H{ } clone <mdb-query-msg> send-query-plain objects>> ;
<PRIVATE
: ensure-valid-collection-name ( collection -- )
[ ";$." intersect length 0 > ] keep
'[ _ "%s contains invalid characters ( . $ ; )" sprintf throw ] when ; inline
: (ensure-collection) ( collection -- )
mdb>> collections>> dup keys length 0 =
[ load-collection-list
[ [ "options" ] dip key? ] filter
[ [ "name" ] dip at "." split second <mdb-collection> ] map
over '[ [ ] [ name>> ] bi _ set-at ] each ] [ ] if
[ dup ] dip key? [ drop ]
[ [ ensure-valid-collection-name ] keep create-collection ] if ; inline
MEMO: reserved-namespace? ( name -- ? )
[ "$cmd" = ] [ "system" head? ] bi or ;
PRIVATE>
MEMO: ensure-collection ( collection -- fq-collection )
"." split1 over mdb>> name>> =
[ [ drop ] dip ] [ drop ] if
[ ] [ reserved-namespace? ] bi
[ [ (ensure-collection) ] keep ] unless
[ mdb>> name>> ] dip "%s.%s" sprintf ; inline
: <query> ( collection query -- mdb-query )
[ ensure-collection ] dip
<mdb-query-msg> ; inline
GENERIC# limit 1 ( mdb-query limit# -- mdb-query )
M: mdb-query-msg limit ( query limit# -- mdb-query )
>>return# ; inline
GENERIC# skip 1 ( mdb-query skip# -- mdb-query )
M: mdb-query-msg skip ( query skip# -- mdb-query )
>>skip# ; inline
: asc ( key -- spec ) [ 1 ] dip H{ } clone [ set-at ] keep ; inline
: desc ( key -- spec ) [ -1 ] dip H{ } clone [ set-at ] keep ; inline
GENERIC# sort 1 ( mdb-query quot -- mdb-query )
M: mdb-query-msg sort ( query qout -- mdb-query )
[ { } ] dip with-datastack >>orderby ;
GENERIC# hint 1 ( mdb-query index-hint -- mdb-query )
M: mdb-query-msg hint ( mdb-query index-hint -- mdb-query )
>>hint ;
: find ( mdb-query -- cursor result )
send-query ;
: explain ( mdb-query -- result )
t >>explain find [ drop ] dip ;
GENERIC: get-more ( mdb-cursor -- mdb-cursor objects )
M: mdb-cursor get-more ( mdb-cursor -- mdb-cursor objects )
[ [ collection>> ] [ return#>> ] [ id>> ] tri <mdb-getmore-msg> send-query ]
[ f f ] if* ;
: find-one ( mdb-query -- result )
1 >>return# send-query-plain ;
: count ( collection query -- result )
[ "count" H{ } clone [ set-at ] keep ] dip
[ over [ "query" ] dip set-at ] when*
[ cmd-collection ] dip <mdb-query-msg> find-one objects>> first
[ check-ok ] keep '[ "n" _ at >fixnum ] [ f ] if ;
: lasterror ( -- error )
cmd-collection H{ { "getlasterror" 1 } } <mdb-query-msg>
find-one objects>> [ "err" ] at ;
: validate ( collection -- )
[ cmd-collection ] dip
"validate" H{ } clone [ set-at ] keep
<mdb-query-msg> find-one objects>> first [ check-ok ] keep
'[ "result" _ at print ] when ;
<PRIVATE
: send-message-check-error ( message -- )
send-message lasterror [ [ MDB-GENERAL-ERROR ] dip mdb-error ] when* ;
PRIVATE>
: save ( collection object -- )
[ ensure-collection ] dip
<mdb-insert-msg> send-message-check-error ;
: save-unsafe ( collection object -- )
[ ensure-collection ] dip
<mdb-insert-msg> send-message ;
: ensure-index ( collection name spec -- )
H{ } clone
[ [ "key" ] dip set-at ] keep
[ [ "name" ] dip set-at ] keep
[ [ index-ns "ns" ] dip set-at ] keep
[ index-collection ] dip
save ;
: drop-index ( collection name -- )
H{ } clone
[ [ "index" ] dip set-at ] keep
[ [ "deleteIndexes" ] dip set-at ] keep
[ cmd-collection ] dip <mdb-query-msg> find-one objects>> first
check-ok [ "could not drop index" throw ] unless ;
: update ( collection selector object -- )
[ ensure-collection ] dip
<mdb-update-msg> send-message-check-error ;
: update-unsafe ( collection selector object -- )
[ ensure-collection ] dip
<mdb-update-msg> send-message ;
: delete ( collection selector -- )
[ ensure-collection ] dip
<mdb-delete-msg> send-message-check-error ;
: delete-unsafe ( collection selector -- )
[ ensure-collection ] dip
<mdb-delete-msg> send-message ;
: load-index-list ( -- index-list )
index-collection
H{ } clone <mdb-query-msg> find [ drop ] dip ;
: drop-collection ( name -- )
[ cmd-collection ] dip
"drop" H{ } clone [ set-at ] keep
<mdb-query-msg> find-one objects>> first check-ok
[ "could not drop collection" throw ] unless ;

View File

@ -1,108 +0,0 @@
USING: accessors assocs combinators formatting fry kernel memoize
linked-assocs mongodb.persistent mongodb.msg mongodb.connection
sequences sequences.deep io.encodings.binary mongodb.tuple
io.sockets prettyprint sets tools.walker math ;
IN: mongodb.index
: index-ns ( name -- ns )
"%s.system.indexes" sprintf ; inline
TUPLE: index name ns key ;
SYMBOLS: +fieldindex+ +compoundindex+ +deepindex+ ;
<PRIVATE
: index-type ( type -- name )
{ { +fieldindex+ [ "field" ] }
{ +deepindex+ [ "deep" ] }
{ +compoundindex+ [ "compound" ] } } case ;
: index-name ( slot index-spec -- name )
[ first index-type ] keep
rest "-" join
"%s-%s-%s-Idx" sprintf ;
: build-index ( element slot -- assoc )
swap [ <linked-hash> ] 2dip
[ rest ] keep first ! assoc slot options itype
{ { +fieldindex+ [ drop [ 1 ] dip pick set-at ] }
{ +deepindex+ [ first "%s.%s" sprintf [ 1 ] dip pick set-at ] }
{ +compoundindex+ [
2over swap [ 1 ] 2dip set-at [ drop ] dip ! assoc options
over '[ _ [ 1 ] 2dip set-at ] each ] }
} case ;
: build-index-seq ( slot optlist ns -- index-seq )
[ V{ } clone ] 3dip ! v{} slot optl ns
[ index new ] dip ! v{} slot optl index ns
>>ns
[ pick ] dip swap ! v{} slot optl index v{}
[ swap ] 2dip ! v{} optl slot index v{ }
'[ _ _ ! element slot exemplar
clone 2over swap index-name >>name ! element slot clone
[ build-index ] dip swap >>key _ push
] each ;
: is-index-declaration? ( entry -- ? )
first
{ { +fieldindex+ [ t ] }
{ +compoundindex+ [ t ] }
{ +deepindex+ [ t ] }
[ drop f ] } case ;
: index-assoc ( seq -- assoc )
H{ } clone tuck '[ dup name>> _ set-at ] each ;
: delete-index ( name ns -- )
"Drop index %s - %s" sprintf . ;
: clean-indices ( existing defined -- )
[ index-assoc ] bi@ assoc-diff values
[ [ name>> ] [ ns>> ] bi delete-index ] each ;
PRIVATE>
USE: mongodb.query
: load-indices ( mdb-collection -- indexlist )
[ mdb>> name>> ] dip name>> "%s.%s" sprintf
"ns" H{ } clone [ set-at ] keep [ mdb>> name>> index-ns ] dip <mdb-query-msg>
'[ _ write-message read-message ]
[ mdb>> master>> binary ] dip with-client
objects>> [ [ index new ] dip
[ [ "ns" ] dip at >>ns ]
[ [ "name" ] dip at >>name ]
[ [ "key" ] dip at >>key ] tri
] map ;
: build-indices ( mdb-collection mdb -- seq )
name>>
[ [ mdb-slot-definitions>> ] keep name>> ] dip
swap "%s.%s" sprintf
[ V{ } clone ] 2dip pick
'[ _
[ [ is-index-declaration? ] filter ] dip
build-index-seq _ push
] assoc-each flatten ;
: ensure-indices ( mdb-collection -- )
[ load-indices ] keep mdb>> build-indices
[ clean-indices ] keep
V{ } clone tuck
'[ _ [ <linked-hash> tuple>query ] dip push ] each
dup length 0 >
[ [ mdb>> name>> "%s.system.indexes" sprintf ] dip
<mdb-insert-msg>
[ mdb>> master>> binary ] dip '[ _ write-message ] with-client
]
[ drop ] if ;
: show-indices ( mdb-collection -- )
load-indices . ;
: show-all-indices ( -- )
mdb>> collections>> values
V{ } clone tuck
'[ load-indices _ push ] each flatten . ;

View File

@ -1,7 +1,8 @@
USING: accessors assocs fry io.encodings.binary io.sockets kernel math
math.parser namespaces sequences splitting
mongodb.connection mongodb.persistent mongodb.msg mongodb.query
mongodb.tuple ;
USING: accessors assocs combinators fry io.encodings.binary
io.sockets kernel math math.parser mongodb.driver
mongodb.msg mongodb.operations mongodb.persistent
mongodb.tuple namespaces
sequences splitting ;
IN: mongodb
@ -18,29 +19,32 @@ GENERIC: explain ( object -- object )
[ mdb-collection>> get-collection-fqn ] keep
H{ } tuple>query <mdb-query-msg> ; inline
TUPLE: mdb-result { cursor integer }
{ start# integer }
{ returned# integer }
{ objects sequence } ;
: build-result ( resultmsg -- mdb-result )
[ mdb-result new ] dip
{
[ cursor>> >>cursor ]
[ start#>> >>start# ]
[ returned#>> >>returned# ]
[ objects>> [ assoc>tuple ] map >>objects ]
} cleave ;
PRIVATE>
: <mdb> ( db host port -- mdb )
(<mdb>) ;
M: mdb-persistent store ( tuple -- )
prepare-store ! H { collection { ... values ... }
[ [ get-collection-fqn ] dip
values <mdb-insert-msg>
[ mdb>> master>> binary ] dip '[ _ write-message ] with-client
values <mdb-insert-msg> send-message
] assoc-each ;
M: mdb-persistent find ( example -- result )
prepare-find [ mdb>> master>> ] dip (find)
prepare-find [ mdb>> master>> ] dip send-query
build-result ;
M: mdb-persistent nfind ( example n -- result )
[ prepare-find ] dip >>return#
[ mdb>> master>> ] dip (find)
build-result ;
M: mdb-persistent explain ( example -- result )
prepare-find [ query>> [ t "$explain" ] dip set-at ] keep
[ mdb>> master>> ] dip (find-one)
build-result ;
send-query build-result ;

View File

@ -1,12 +1,8 @@
USING: accessors io.encodings.string assocs bson.reader
bson.writer byte-arrays byte-vectors constructors fry io io.binary
io.encodings.binary io.encodings.utf8 io.streams.byte-array kernel
linked-assocs math namespaces sequences strings ;
USING: accessors assocs constructors kernel linked-assocs math
sequences strings ;
IN: mongodb.msg
<PRIVATE
CONSTANT: OP_Reply 1
CONSTANT: OP_Message 1000
CONSTANT: OP_Update 2001
@ -14,17 +10,7 @@ CONSTANT: OP_Insert 2002
CONSTANT: OP_Query 2004
CONSTANT: OP_GetMore 2005
CONSTANT: OP_Delete 2006
CONSTANT: OP_KillCursors 2007
PREDICATE: mdb-reply-op < integer OP_Reply = ;
PREDICATE: mdb-query-op < integer OP_Query = ;
PREDICATE: mdb-insert-op < integer OP_Insert = ;
PREDICATE: mdb-update-op < integer OP_Update = ;
PREDICATE: mdb-delete-op < integer OP_Delete = ;
PREDICATE: mdb-getmore-op < integer OP_GetMore = ;
PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ;
PRIVATE>
CONSTANT: OP_KillCursors 2007
TUPLE: mdb-msg
{ opcode integer }
@ -39,7 +25,7 @@ TUPLE: mdb-insert-msg < mdb-msg
TUPLE: mdb-update-msg < mdb-msg
{ collection string }
{ upsert? integer initial: 1 }
{ upsert? integer initial: 0 }
{ selector assoc }
{ object assoc } ;
@ -62,16 +48,19 @@ TUPLE: mdb-query-msg < mdb-msg
{ return# integer initial: 0 }
{ query assoc }
{ returnfields assoc }
{ orderby sequence } ;
{ orderby sequence }
explain hint ;
TUPLE: mdb-reply-msg < mdb-msg
{ collection string }
{ cursor integer initial: 0 }
{ start# integer initial: 0 }
{ requested# integer initial: 0 }
{ returned# integer initial: 0 }
{ objects sequence } ;
CONSTRUCTOR: mdb-getmore-msg ( collection return# -- mdb-getmore-msg )
CONSTRUCTOR: mdb-getmore-msg ( collection return# cursor -- mdb-getmore-msg )
OP_GetMore >>opcode ; inline
CONSTRUCTOR: mdb-delete-msg ( collection selector -- mdb-delete-msg )
@ -90,213 +79,22 @@ M: sequence <mdb-killcursors-msg> ( sequences -- mdb-killcursors-msg )
M: integer <mdb-killcursors-msg> ( integer -- mdb-killcursors-msg )
V{ } clone [ push ] keep <mdb-killcursors-msg> ;
GENERIC# <mdb-insert-msg> 1 ( collection objects -- mdb-insert-msg )
M: linked-assoc <mdb-insert-msg> ( collection linked-assoc -- mdb-insert-msg )
[ mdb-insert-msg new ] 2dip
[ >>collection ] dip
V{ } clone tuck push
>>objects OP_Insert >>opcode ;
GENERIC: <mdb-insert-msg> ( collection objects -- mdb-insert-msg )
M: sequence <mdb-insert-msg> ( collection sequence -- mdb-insert-msg )
[ mdb-insert-msg new ] 2dip
[ >>collection ] dip
>>objects OP_Insert >>opcode ;
CONSTRUCTOR: mdb-update-msg ( collection object -- mdb-update-msg )
dup object>> [ "_id" ] dip at "_id" H{ } clone [ set-at ] keep >>selector
OP_Update >>opcode ;
M: assoc <mdb-insert-msg> ( collection assoc -- mdb-insert-msg )
[ mdb-insert-msg new ] 2dip
[ >>collection ] dip
V{ } clone tuck push
>>objects OP_Insert >>opcode ;
CONSTRUCTOR: mdb-update-msg ( collection selector object -- mdb-update-msg )
OP_Update >>opcode ; inline
CONSTRUCTOR: mdb-reply-msg ( -- mdb-reply-msg ) ; inline
GENERIC: write-message ( message -- )
<PRIVATE
CONSTANT: MSG-HEADER-SIZE 16
SYMBOL: msg-bytes-read
: bytes-read> ( -- integer )
msg-bytes-read get ; inline
: >bytes-read ( integer -- )
msg-bytes-read set ; inline
: change-bytes-read ( integer -- )
bytes-read> [ 0 ] unless* + >bytes-read ; inline
: write-byte ( byte -- ) 1 >le write ; inline
: write-int32 ( int -- ) 4 >le write ; inline
: write-double ( real -- ) double>bits 8 >le write ; inline
: write-cstring ( string -- ) utf8 encode B{ 0 } append write ; inline
: write-longlong ( object -- ) 8 >le write ; inline
: read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline
: read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline
: read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline
: read-byte ( -- byte ) read-byte-raw first ; inline
: (read-cstring) ( acc -- )
[ read-byte ] dip ! b acc
2dup push ! b acc
[ 0 = ] dip ! bool acc
'[ _ (read-cstring) ] unless ; inline recursive
: read-cstring ( -- string )
BV{ } clone
[ (read-cstring) ] keep
[ zero? ] trim-tail
>byte-array utf8 decode ; inline
GENERIC: (read-message) ( message opcode -- message )
: copy-header ( message msg-stub -- message )
[ length>> ] keep [ >>length ] dip
[ req-id>> ] keep [ >>req-id ] dip
[ resp-id>> ] keep [ >>resp-id ] dip
[ opcode>> ] keep [ >>opcode ] dip
flags>> >>flags ;
M: mdb-query-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-query-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>skip#
read-int32 >>return#
H{ } stream>assoc change-bytes-read >>query
dup length>> bytes-read> >
[ H{ } stream>assoc change-bytes-read >>returnfields
dup length>> bytes-read> >
[ H{ } stream>assoc drop >>orderby ] when
] when ;
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-insert-msg new ] dip copy-header
read-cstring >>collection
V{ } clone >>objects
[ '[ _ length>> bytes-read> > ] ] keep tuck
'[ H{ } stream>assoc change-bytes-read _ objects>> push ]
[ ] while ;
M: mdb-delete-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-delete-msg new ] dip copy-header
read-cstring >>collection
H{ } stream>assoc change-bytes-read >>selector ;
M: mdb-getmore-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-getmore-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>return#
read-longlong >>cursor ;
M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-killcursors-msg new ] dip copy-header
read-int32 >>cursors#
V{ } clone >>cursors
[ [ cursors#>> ] keep
'[ read-longlong _ cursors>> push ] times ] keep ;
M: mdb-update-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-update-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>upsert?
H{ } stream>assoc change-bytes-read >>selector
H{ } stream>assoc change-bytes-read >>object ;
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
drop
[ <mdb-reply-msg> ] dip copy-header
read-longlong >>cursor
read-int32 >>start#
read-int32 [ >>returned# ] keep
[ H{ } stream>assoc drop ] accumulator [ times ] dip >>objects ;
: read-header ( message -- message )
read-int32 >>length
read-int32 >>req-id
read-int32 >>resp-id
read-int32 >>opcode
read-int32 >>flags ; inline
: write-header ( message length -- )
MSG-HEADER-SIZE + write-int32
[ req-id>> write-int32 ] keep
[ resp-id>> write-int32 ] keep
opcode>> write-int32 ; inline
PRIVATE>
: read-message ( -- message )
mdb-msg new
0 >bytes-read
read-header
[ ] [ opcode>> ] bi (read-message) ;
<PRIVATE
: (write-message) ( message quot -- )
[ binary ] dip with-byte-writer
[ length write-header ] keep
write flush ; inline
PRIVATE>
M: mdb-query-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
[ skip#>> write-int32 ] keep
[ return#>> write-int32 ] keep
query>> assoc>array write
] (write-message) ;
M: mdb-insert-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
objects>> [ assoc>array write ] each
] (write-message) ;
M: mdb-update-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
[ upsert?>> write-int32 ] keep
[ selector>> assoc>array write ] keep
object>> assoc>array write
] (write-message) ;
M: mdb-delete-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
0 write-int32
selector>> assoc>array write
] (write-message) ;
M: mdb-getmore-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
[ return#>> write-int32 ] keep
cursor>> write-longlong
] (write-message) ;
M: mdb-killcursors-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ cursors#>> write-int32 ] keep
cursors>> [ write-longlong ] each
] (write-message) ;

View File

@ -0,0 +1,219 @@
USING: accessors bson.reader bson.writer byte-arrays byte-vectors fry
io io.binary io.encodings.binary io.encodings.string io.encodings.utf8
io.streams.byte-array kernel math mongodb.msg namespaces sequences
locals assocs combinators linked-assocs ;
IN: mongodb.operations
<PRIVATE
PREDICATE: mdb-reply-op < integer OP_Reply = ;
PREDICATE: mdb-query-op < integer OP_Query = ;
PREDICATE: mdb-insert-op < integer OP_Insert = ;
PREDICATE: mdb-update-op < integer OP_Update = ;
PREDICATE: mdb-delete-op < integer OP_Delete = ;
PREDICATE: mdb-getmore-op < integer OP_GetMore = ;
PREDICATE: mdb-killcursors-op < integer OP_KillCursors = ;
PRIVATE>
GENERIC: write-message ( message -- )
<PRIVATE
CONSTANT: MSG-HEADER-SIZE 16
SYMBOL: msg-bytes-read
: bytes-read> ( -- integer )
msg-bytes-read get ; inline
: >bytes-read ( integer -- )
msg-bytes-read set ; inline
: change-bytes-read ( integer -- )
bytes-read> [ 0 ] unless* + >bytes-read ; inline
: write-byte ( byte -- ) 1 >le write ; inline
: write-int32 ( int -- ) 4 >le write ; inline
: write-double ( real -- ) double>bits 8 >le write ; inline
: write-cstring ( string -- ) utf8 encode B{ 0 } append write ; inline
: write-longlong ( object -- ) 8 >le write ; inline
: read-int32 ( -- int32 ) 4 [ read le> ] [ change-bytes-read ] bi ; inline
: read-longlong ( -- longlong ) 8 [ read le> ] [ change-bytes-read ] bi ; inline
: read-byte-raw ( -- byte-raw ) 1 [ read le> ] [ change-bytes-read ] bi ; inline
: read-byte ( -- byte ) read-byte-raw first ; inline
: (read-cstring) ( acc -- )
[ read-byte ] dip ! b acc
2dup push ! b acc
[ 0 = ] dip ! bool acc
'[ _ (read-cstring) ] unless ; inline recursive
: read-cstring ( -- string )
BV{ } clone
[ (read-cstring) ] keep
[ zero? ] trim-tail
>byte-array utf8 decode ; inline
GENERIC: (read-message) ( message opcode -- message )
: copy-header ( message msg-stub -- message )
[ length>> ] keep [ >>length ] dip
[ req-id>> ] keep [ >>req-id ] dip
[ resp-id>> ] keep [ >>resp-id ] dip
[ opcode>> ] keep [ >>opcode ] dip
flags>> >>flags ;
M: mdb-query-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-query-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>skip#
read-int32 >>return#
H{ } stream>assoc change-bytes-read >>query
dup length>> bytes-read> >
[ H{ } stream>assoc change-bytes-read >>returnfields ] when ;
M: mdb-insert-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-insert-msg new ] dip copy-header
read-cstring >>collection
V{ } clone >>objects
[ '[ _ length>> bytes-read> > ] ] keep tuck
'[ H{ } stream>assoc change-bytes-read _ objects>> push ]
while ;
M: mdb-delete-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-delete-msg new ] dip copy-header
read-cstring >>collection
H{ } stream>assoc change-bytes-read >>selector ;
M: mdb-getmore-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-getmore-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>return#
read-longlong >>cursor ;
M: mdb-killcursors-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-killcursors-msg new ] dip copy-header
read-int32 >>cursors#
V{ } clone >>cursors
[ [ cursors#>> ] keep
'[ read-longlong _ cursors>> push ] times ] keep ;
M: mdb-update-op (read-message) ( msg-stub opcode -- message )
drop
[ mdb-update-msg new ] dip copy-header
read-cstring >>collection
read-int32 >>upsert?
H{ } stream>assoc change-bytes-read >>selector
H{ } stream>assoc change-bytes-read >>object ;
M: mdb-reply-op (read-message) ( msg-stub opcode -- message )
drop
[ <mdb-reply-msg> ] dip copy-header
read-longlong >>cursor
read-int32 >>start#
read-int32 [ >>returned# ] keep
[ <linked-hash> stream>assoc drop ] accumulator [ times ] dip >>objects ;
: read-header ( message -- message )
read-int32 >>length
read-int32 >>req-id
read-int32 >>resp-id
read-int32 >>opcode
read-int32 >>flags ; inline
: write-header ( message length -- )
MSG-HEADER-SIZE + write-int32
[ req-id>> write-int32 ] keep
[ resp-id>> write-int32 ] keep
opcode>> write-int32 ; inline
PRIVATE>
: read-message ( -- message )
mdb-msg new
0 >bytes-read
read-header
[ ] [ opcode>> ] bi (read-message) ;
<PRIVATE
: (write-message) ( message quot -- )
[ binary ] dip with-byte-writer
[ length write-header ] keep
write flush ; inline
: build-query-object ( query -- selector )
[let | selector [ <linked-hash> ] |
{ [ orderby>> [ "orderby" selector set-at ] when* ]
[ explain>> [ "$explain" selector set-at ] when* ]
[ hint>> [ "$hint" selector set-at ] when* ]
[ query>> "query" selector set-at ]
} cleave
selector
] ;
PRIVATE>
M: mdb-query-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
[ skip#>> write-int32 ] keep
[ return#>> write-int32 ] keep
[ build-query-object assoc>array write ] keep
returnfields>> [ assoc>array write ] when*
] (write-message) ;
M: mdb-insert-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
objects>> [ assoc>array write ] each
] (write-message) ;
M: mdb-update-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
[ upsert?>> write-int32 ] keep
[ selector>> assoc>array write ] keep
object>> assoc>array write
] (write-message) ;
M: mdb-delete-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
0 write-int32
selector>> assoc>array write
] (write-message) ;
M: mdb-getmore-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ collection>> write-cstring ] keep
[ return#>> write-int32 ] keep
cursor>> write-longlong
] (write-message) ;
M: mdb-killcursors-msg write-message ( message -- )
dup
'[ _
[ flags>> write-int32 ] keep
[ cursors#>> write-int32 ] keep
cursors>> [ write-longlong ] each
] (write-message) ;

View File

@ -1,6 +1,6 @@
USING: accessors assocs classes fry kernel linked-assocs math mirrors
namespaces sequences strings vectors words bson.constants
continuations mongodb.tuple ;
continuations mongodb.driver mongodb.tuple ;
IN: mongodb.persistent
@ -18,10 +18,10 @@ DEFER: create-mdb-command
<PRIVATE
CONSTANT: MDB_INFO "_mdb_info"
CONSTANT: MDB_INFO "_mdb_"
: <objref> ( tuple -- objref )
[ mdb-collection>> ] [ _id>> ] bi objref boa ; inline
[ mdb-collection-prop ] [ _id>> ] bi objref boa ; inline
: mdbinfo>tuple-class ( mdbinfo -- class )
[ first ] keep second lookup ; inline

View File

@ -1,68 +0,0 @@
USING: accessors combinators fry io.encodings.binary io.sockets kernel
mongodb.msg mongodb.persistent mongodb.connection sequences math namespaces assocs
formatting splitting mongodb.tuple mongodb.index ;
IN: mongodb.query
TUPLE: mdb-result { cursor integer }
{ start# integer }
{ returned# integer }
{ objects sequence } ;
: namespaces-ns ( name -- ns )
"%s.system.namespaces" sprintf ; inline
<PRIVATE
: (execute-query) ( inet quot -- result )
[ binary ] dip with-client ; inline
PRIVATE>
: (find) ( inet query -- result )
'[ _ write-message read-message ] (execute-query) ; inline
: (find-one) ( inet query -- result )
1 >>return#
(find) ; inline
: build-result ( resultmsg -- mdb-result )
[ mdb-result new ] dip
{
[ cursor>> >>cursor ]
[ start#>> >>start# ]
[ returned#>> >>returned# ]
[ objects>> [ assoc>tuple ] map >>objects ]
} cleave ;
: load-collections ( -- collections )
mdb>> [ master>> ] [ name>> namespaces-ns ] bi
H{ } clone <mdb-query-msg> (find)
objects>> [ [ "name" ] dip at "." split second <mdb-collection> ] map
H{ } clone tuck
'[ [ ensure-indices ] [ ] [ name>> ] tri _ set-at ] each
[ mdb>> ] dip >>collections collections>> ;
: check-ok ( result -- ? )
[ "ok" ] dip key? ; inline
: create-collection ( mdb-collection -- )
dup name>> "create" H{ } clone [ set-at ] keep
[ mdb>> [ master>> ] [ name>> ] bi "%s.$cmd" sprintf ] dip
<mdb-query-msg> (find-one) objects>> first
check-ok
[ [ ensure-indices ] keep dup name>> mdb>> collections>> set-at ]
[ "could not create collection" throw ] if ;
: get-collection-fqn ( mdb-collection -- fqdn )
mdb>> collections>>
dup keys length 0 =
[ drop load-collections ]
[ ] if
[ dup name>> ] dip
key?
[ ]
[ dup create-collection ] if
name>> [ mdb>> name>> ] dip "%s.%s" sprintf ;

View File

@ -1,131 +1,170 @@
USING: accessors assocs classes classes.mixin classes.tuple vectors math
classes.tuple.parser formatting generalizations kernel sequences fry
prettyprint strings compiler.units slots tools.walker words arrays mongodb.persistent ;
classes.tuple.parser formatting generalizations kernel sequences fry combinators
linked-assocs sequences.deep mongodb.driver continuations memoize
prettyprint strings compiler.units slots tools.walker words arrays ;
IN: mongodb.tuple
MIXIN: mdb-persistent
GENERIC: mdb-slot-definitions>> ( tuple -- string )
GENERIC: mdb-collection>> ( object -- mdb-collection )
CONSTANT: MDB_COLLECTIONS "mdb_collections"
CONSTANT: MDB_COL_PROP "mdb_collection"
CONSTANT: MDB_SLOTOPT_PROP "mdb_slot_options"
SLOT: _id
CONSTANT: MDB_P_SLOTS { "_id" }
CONSTANT: MDB_OID "_id"
SLOT: _mdb_
SYMBOLS: +transient+ +load+ ;
GENERIC: mdb-collection-prop ( object -- mdb-collection )
GENERIC: mdb-slot-list ( tuple -- string )
UNION: boolean t POSTPONE: f ;
CONSTANT: MDB_COLLECTION_MAP "_mdb_col_map"
CONSTANT: MDB_COLLECTION "_mdb_col"
CONSTANT: MDB_SLOTDEF_LIST "_mdb_slot_list"
TUPLE: mdb-collection
{ name string }
{ capped boolean initial: f }
{ size integer initial: -1 }
{ max integer initial: -1 }
{ classes sequence } ;
SYMBOLS: +transient+ +load+ +fieldindex+ +compoundindex+ +deepindex+ ;
TUPLE: mdb-tuple-collection < mdb-collection { classes sequence } ;
TUPLE: mdb-tuple-index name key ;
USE: mongodb.persistent
<PRIVATE
: (mdb-collection>>) ( class -- mdb-collection )
dup props>> [ MDB_COL_PROP ] dip at
[ [ drop ] dip ]
[ superclass [ (mdb-collection>>) ] [ f ] if* ] if* ; inline recursive
: MDB_ADDON_SLOTS ( -- slots )
{ } [ MDB_OID MDB_PROPERTIES ] with-datastack ; inline
: (mdb-slot-definitions>>) ( class -- slot-defs )
superclasses [ MDB_SLOTOPT_PROP word-prop ] map assoc-combine ; inline
: (mdb-collection) ( class -- mdb-collection )
dup MDB_COLLECTION word-prop
[ [ drop ] dip ]
[ superclass [ (mdb-collection) ] [ f ] if* ] if* ; inline recursive
: (mdb-slot-list) ( class -- slot-defs )
superclasses [ MDB_SLOTDEF_LIST word-prop ] map assoc-combine ; inline
: link-class ( class collection -- )
tuck classes>> ! col class v{}
over classes>>
[ 2dup member? [ 2drop ] [ push ] if ]
[ 1vector >>classes ] if* drop ;
[ 1vector >>classes ] if* drop ; inline
: link-collection ( class collection -- )
[ swap link-class ] [ MDB_COLLECTION set-word-prop ] 2bi ; inline
PRIVATE>
M: tuple-class mdb-collection>> ( tuple -- mdb-collection )
(mdb-collection>>) ;
M: tuple-class mdb-collection-prop ( tuple -- mdb-collection )
(mdb-collection) ;
M: mdb-persistent mdb-collection>> ( tuple -- mdb-collection )
class (mdb-collection>>) ;
M: mdb-persistent mdb-collection-prop ( tuple -- mdb-collection )
class (mdb-collection) ;
M: mdb-persistent mdb-slot-definitions>> ( tuple -- string )
class (mdb-slot-definitions>>) ;
M: mdb-persistent mdb-slot-list ( tuple -- string )
class (mdb-slot-list) ;
M: tuple-class mdb-slot-definitions>> ( class -- assoc )
(mdb-slot-definitions>>) ;
M: tuple-class mdb-slot-list ( class -- assoc )
(mdb-slot-list) ;
M: mdb-collection mdb-slot-definitions>> ( collection -- assoc )
classes>> [ mdb-slot-definitions>> ] map assoc-combine ;
: link-collection ( class collection -- )
2dup link-class
swap [ MDB_COL_PROP ] dip props>> set-at ; inline
: declared-collections> ( -- assoc )
MDB_COLLECTIONS mdb-persistent props>> at
[ H{ } clone
[ MDB_COLLECTIONS mdb-persistent props>> set-at ] keep
] unless* ;
: <mdb-collection> ( name -- mdb-collection )
declared-collections> 2dup key?
[ at ]
[ [ mdb-collection new ] 2dip
[ [ >>name dup ] keep ] dip set-at ] if ;
M: mdb-collection mdb-slot-list ( collection -- assoc )
classes>> [ mdb-slot-list ] map assoc-combine ;
: collection-map ( -- assoc )
MDB_COLLECTION_MAP mdb-persistent word-prop
[ mdb-persistent MDB_COLLECTION_MAP H{ } clone
[ set-word-prop ] keep ] unless* ; inline
: <mdb-tuple-collection> ( name -- mdb-tuple-collection )
collection-map [ ] [ key? ] 2bi
[ at ] [ [ mdb-tuple-collection new dup ] 2dip
[ [ >>name ] keep ] dip set-at ] if ;
<PRIVATE
: mdb-check-id-slot ( superclass slots -- superclass slots )
over
all-slots [ name>> ] map [ MDB_OID ] dip memq?
[ ]
[ MDB_P_SLOTS prepend ] if ; inline
: mdb-check-slots ( superclass slots -- superclass slots )
over all-slots [ name>> ] map [ MDB_OID ] dip member?
[ ] [ MDB_ADDON_SLOTS prepend ] if ; inline
PRIVATE>
: show-persistence-info ( class -- )
H{ } clone
[ [ dup mdb-collection>> "collection" ] dip set-at ] keep
[ [ mdb-slot-definitions>> "slots" ] dip set-at ] keep . ;
GENERIC: mdb-persisted? ( tuple -- ? )
M: mdb-persistent mdb-persisted? ( tuple -- ? )
_id>> f = not ;
M: assoc mdb-persisted? ( assoc -- ? )
[ MDB_OID ] dip key? ; inline
[ [ mdb-collection-prop "collection" ] dip set-at ] 2keep
[ [ mdb-slot-list "slots" ] dip set-at ] keep . ;
: MDBTUPLE:
parse-tuple-definition
mdb-check-id-slot
mdb-check-slots
define-tuple-class ; parsing
<PRIVATE
: split-olist ( seq -- key options )
: split-optl ( seq -- key options )
[ first ] [ rest ] bi ; inline
: optl>assoc ( seq -- assoc )
: opt>assoc ( seq -- assoc )
[ dup assoc?
[ 1array { "" } append ] unless
] map ;
[ 1array { "" } append ] unless ] map ;
: optl>map ( seq -- map )
H{ } clone tuck
'[ split-optl opt>assoc swap _ set-at ] each ; inline
: set-slot-options ( class options -- )
'[ MDB_SLOTDEF_LIST _ optl>map set-word-prop ] keep
dup mdb-collection-prop link-collection ; inline
PRIVATE>
: set-slot-options ( class options -- )
H{ } clone tuck '[ _ [ split-olist optl>assoc swap ] dip set-at ] each
over [ MDB_SLOTOPT_PROP ] dip props>> set-at
dup mdb-collection>> link-collection ;
: define-collection ( class collection options -- )
: set-collection ( class collection options -- )
[ [ dup ] dip link-collection ] dip ! cl options
[ dup '[ _ mdb-persistent add-mixin-instance ] with-compilation-unit ] dip
set-slot-options ;
<PRIVATE
: index-type ( type -- name )
{ { +fieldindex+ [ "field" ] }
{ +deepindex+ [ "deep" ] }
{ +compoundindex+ [ "compound" ] } } case ;
: index-name ( slot index-spec -- name )
[ first index-type ] keep
rest "-" join
"%s-%s-%s-Idx" sprintf ;
: build-index ( element slot -- assoc )
swap [ <linked-hash> ] 2dip
[ rest ] keep first ! assoc slot options itype
{ { +fieldindex+ [ drop [ 1 ] dip pick set-at ] }
{ +deepindex+ [ first "%s.%s" sprintf [ 1 ] dip pick set-at ] }
{ +compoundindex+ [
2over swap [ 1 ] 2dip set-at [ drop ] dip ! assoc options
over '[ _ [ 1 ] 2dip set-at ] each ] }
} case ;
: build-index-seq ( slot optlist -- index-seq )
[ V{ } clone ] 2dip pick ! v{} slot optl v{}
[ swap ] dip ! v{} optl slot v{ }
'[ _ mdb-tuple-index new ! element slot exemplar
2over swap index-name >>name ! element slot clone
[ build-index ] dip swap >>key _ push
] each ;
MEMO: is-index-declaration? ( entry -- ? )
first
{ { +fieldindex+ [ t ] }
{ +compoundindex+ [ t ] }
{ +deepindex+ [ t ] }
[ drop f ] } case ;
: build-tuple-index-list ( mdb-collection -- seq )
mdb-slot-list V{ } clone tuck
'[ [ is-index-declaration? ] filter
build-index-seq _ push
] assoc-each flatten ;
PRIVATE>
: clean-indices ( list list2 -- ) 2drop ;
: load-tuple-index-list ( mdb-collection -- indexlist )
[ load-index-list ] dip
'[ [ "ns" ] dip at _ name>> ensure-collection = ] filter ;
: ensure-tuple-index-list ( mdb-collection -- )
[ build-tuple-index-list ] keep
'[ [ _ name>> ] dip [ name>> ] [ key>> ] bi ensure-index ] each ;