New threaded-server

db4
Slava Pestov 2008-06-17 00:04:18 -05:00
parent 12b79b287f
commit 0a436e1184
19 changed files with 245 additions and 121 deletions

View File

@ -1,10 +0,0 @@
USING: help help.syntax help.markup io ;
IN: io.server
HELP: with-server
{ $values { "seq" "a sequence of address specifiers" } { "service" "a string or " { $link f } } { "encoding" "an encoding to use for client connections" } { "quot" "a quotation" } }
{ $description "Starts a TCP/IP server. The quotation is called in a new thread for each client connection, with the client connection being both the " { $link input-stream } " and " { $link output-stream } "." } ;
HELP: with-datagrams
{ $values { "seq" "a sequence of address specifiers" } { "service" "a string or " { $link f } } { "quot" "a quotation" } }
{ $description "Starts a UDP/IP server. The quotation is called for each datagram packet received." } ;

View File

@ -1,7 +0,0 @@
IN: io.server.tests
USING: tools.test io.server io.server.private kernel ;
{ 2 0 } [ [ ] server-loop ] must-infer-as
{ 3 0 } [ [ ] with-connection ] must-infer-as
{ 1 0 } [ [ ] swap datagram-loop ] must-infer-as
{ 2 0 } [ [ ] with-datagrams ] must-infer-as

View File

@ -1,76 +0,0 @@
! Copyright (C) 2003, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: io io.sockets io.sockets.secure io.files
io.streams.duplex logging continuations destructors kernel math
math.parser namespaces parser sequences strings prettyprint
debugger quotations calendar threads concurrency.combinators
assocs fry accessors arrays ;
IN: io.server
SYMBOL: servers
SYMBOL: remote-address
<PRIVATE
LOG: accepted-connection NOTICE
: with-connection ( client remote local quot -- )
'[
, ,
[ [ remote-address set ] [ local-address set ] bi* ]
[ 2array accepted-connection ]
2bi
@
] with-stream ; inline
: accept-loop ( server quot -- )
[
[ [ accept ] [ addr>> ] bi ] dip
'[ , , , , with-connection ] "Client" spawn drop
] 2keep accept-loop ; inline
: server-loop ( addrspec encoding quot -- )
>r <server> dup servers get push r>
'[ , accept-loop ] with-disposal ; inline
\ server-loop NOTICE add-error-logging
PRIVATE>
: local-server ( port -- seq )
"localhost" swap t resolve-host ;
: internet-server ( port -- seq )
f swap t resolve-host ;
: secure-server ( port -- seq )
internet-server [ <secure> ] map ;
: with-server ( seq service encoding quot -- )
V{ } clone servers [
'[ , [ , , server-loop ] with-logging ] parallel-each
] with-variable ; inline
: stop-server ( -- )
servers get dispose-each ;
<PRIVATE
LOG: received-datagram NOTICE
: datagram-loop ( quot datagram -- )
[
[ receive dup received-datagram [ swap call ] dip ] keep
pick [ send ] [ 3drop ] if
] 2keep datagram-loop ; inline
: spawn-datagrams ( quot addrspec -- )
<datagram> [ datagram-loop ] with-disposal ; inline
\ spawn-datagrams NOTICE add-input-logging
PRIVATE>
: with-datagrams ( seq service quot -- )
'[ [ , _ spawn-datagrams ] parallel-each ] with-logging ; inline

View File

@ -1 +0,0 @@
TCP/IP and UDP/IP servers

View File

@ -0,0 +1,2 @@
USING: help help.syntax help.markup io ;
IN: io.servers.connection

View File

@ -0,0 +1,47 @@
IN: io.servers.connection
USING: tools.test io.servers.connection io.sockets namespaces
io.servers.connection.private kernel accessors sequences
concurrency.promises io.encodings.ascii io threads calendar ;
[ t ] [ <threaded-server> listen-on empty? ] unit-test
[ f ] [
<threaded-server>
25 internet-server >>insecure
listen-on
empty?
] unit-test
[ t ] [
T{ inet4 "1.2.3.4" 1234 } T{ inet4 "1.2.3.5" 1235 }
[ log-connection ] 2keep
[ remote-address get = ] [ local-address get = ] bi*
and
] unit-test
[ ] [ <threaded-server> init-server drop ] unit-test
[ 10 ] [
<threaded-server>
10 >>max-connections
init-server semaphore>> count>>
] unit-test
[ ] [ <promise> "p" set ] unit-test
[ ] [
[
<threaded-server>
5 >>max-connections
1237 >>insecure
[ "Hello world." write stop-server ] >>handler
start-server
t "p" get fulfill
] in-thread
] unit-test
[ ] [ 100 sleep ] unit-test
[ "Hello world." ] [ "localhost" 1237 <inet> ascii <client> drop contents ] unit-test
[ t ] [ "p" get 2 seconds ?promise-timeout ] unit-test

View File

@ -0,0 +1,131 @@
! Copyright (C) 2003, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: continuations destructors kernel math math.parser
namespaces parser sequences strings prettyprint debugger
quotations combinators combinators.lib logging calendar assocs
fry accessors arrays io io.sockets io.encodings.ascii
io.sockets.secure io.files io.streams.duplex io.timeouts
io.encodings threads concurrency.combinators
concurrency.semaphores ;
IN: io.servers.connection
TUPLE: threaded-server
name
secure insecure
secure-config
sockets
max-connections
semaphore
timeout
encoding
handler ;
: local-server ( port -- addrspec ) "localhost" swap <inet> ;
: internet-server ( port -- addrspec ) f swap <inet> ;
: new-threaded-server ( class -- threaded-server )
new
"server" >>name
ascii >>encoding
1 minutes >>timeout
V{ } clone >>sockets
<secure-config> >>secure-config
[ "No handler quotation" throw ] >>handler ; inline
: <threaded-server> ( -- threaded-server )
threaded-server new-threaded-server ;
SYMBOL: remote-address
GENERIC: handle-client* ( server -- )
<PRIVATE
: >insecure ( addrspec -- addrspec' )
dup { [ integer? ] [ string? ] } 1|| [ internet-server ] when ;
: >secure ( addrspec -- addrspec' )
>insecure
dup { [ secure? ] [ not ] } 1|| [ <secure> ] unless ;
: listen-on ( threaded-server -- addrspecs )
[ secure>> >secure ] [ insecure>> >insecure ] bi
[ resolve-host ] bi@ append ;
LOG: accepted-connection NOTICE
: log-connection ( remote local -- )
[ [ remote-address set ] [ local-address set ] bi* ]
[ 2array accepted-connection ]
2bi ;
M: threaded-server handle-client* handler>> call ;
: handle-client ( client remote local -- )
'[
, , log-connection
threaded-server get
[ timeout>> timeouts ] [ handle-client* ] bi
] with-stream ;
: thread-name ( server-name addrspec -- string )
unparse " connection from " swap 3append ;
: accept-connection ( server -- )
[ accept ] [ addr>> ] bi
[ '[ , , , handle-client ] ]
[ drop threaded-server get name>> swap thread-name ] 2bi
spawn drop ;
: accept-loop ( server -- )
[
threaded-server get semaphore>>
[ [ accept-connection ] with-semaphore ]
[ accept-connection ]
if*
] [ accept-loop ] bi ; inline
\ accept-loop ERROR add-error-logging
: start-accept-loop ( server -- )
threaded-server get encoding>> <server>
[ threaded-server get sockets>> push ]
[ [ accept-loop ] with-disposal ]
bi ;
: init-server ( threaded-server -- threaded-server )
dup semaphore>> [
dup max-connections>> [
<semaphore> >>semaphore
] when*
] unless ;
PRIVATE>
: start-server ( threaded-server -- )
init-server
dup secure-config>> [
dup threaded-server [
dup name>> [
listen-on [
start-accept-loop
] parallel-each
] with-logging
] with-variable
] with-secure-context ;
: stop-server ( -- )
threaded-server get [ f ] change-sockets drop dispose-each ;
GENERIC: port ( addrspec -- n )
M: integer port ;
M: object port port>> ;
: secure-port ( -- n )
threaded-server get dup [ secure>> port ] when ;
: insecure-port ( -- n )
threaded-server get dup [ insecure>> port ] when ;

View File

@ -0,0 +1 @@
Multi-threaded TCP/IP servers

View File

@ -0,0 +1 @@
Slava Pestov

View File

@ -0,0 +1,21 @@
IN: io.servers.datagram
<PRIVATE
LOG: received-datagram NOTICE
: datagram-loop ( quot datagram -- )
[
[ receive dup received-datagram [ swap call ] dip ] keep
pick [ send ] [ 3drop ] if
] 2keep datagram-loop ; inline
: spawn-datagrams ( quot addrspec -- )
<datagram> [ datagram-loop ] with-disposal ; inline
\ spawn-datagrams NOTICE add-input-logging
PRIVATE>
: with-datagrams ( seq service quot -- )
'[ [ , _ spawn-datagrams ] parallel-each ] with-logging ; inline

View File

@ -0,0 +1 @@
Multi-threaded UDP/IP servers

View File

@ -0,0 +1 @@
network

View File

@ -1 +1,4 @@
! No unit tests here, until Windows SSL is implemented
IN: io.sockets.secure.tests
USING: io.sockets.secure tools.test ;
[ "hello" 24 ] [ "hello" 24 <inet> <secure> [ host>> ] [ port>> ] bi ] unit-test

View File

@ -1,7 +1,7 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: accessors kernel symbols namespaces continuations
destructors io.sockets sequences inspector calendar ;
destructors io.sockets sequences inspector calendar delegate ;
IN: io.sockets.secure
SYMBOL: secure-socket-timeout
@ -42,8 +42,10 @@ TUPLE: secure addrspec ;
C: <secure> secure
: resolve-secure-host ( host port passive? -- seq )
resolve-host [ <secure> ] map ;
CONSULT: inet secure addrspec>> ;
M: secure resolve-host ( secure -- seq )
addrspec>> resolve-host [ <secure> ] map ;
HOOK: check-certificate secure-socket-backend ( host handle -- )
@ -53,9 +55,8 @@ PREDICATE: secure-inet < secure addrspec>> inet? ;
M: secure-inet (client)
[
addrspec>>
[ [ host>> ] [ port>> ] bi f resolve-secure-host (client) >r |dispose r> ] keep
host>> pick handle>> check-certificate
[ resolve-host (client) [ |dispose ] dip ] keep
addrspec>> host>> pick handle>> check-certificate
] with-destructors ;
PRIVATE>

View File

@ -27,7 +27,7 @@ $nl
{ { $link inet4 } " - a TCP/IP connection to an IPv4 address and port number; no name lookup is performed" }
{ { $link inet6 } " - a TCP/IP connection to an IPv6 address and port number; no name lookup is performed" }
}
"The " { $vocab-link "io.server" } " library defines a nice high-level wrapper around " { $link <server> } " which makes it easy to listen for IPv4 and IPv6 connections simultaneously, perform logging, and optionally only allow connections from the loopback interface."
"The " { $vocab-link "io.servers.connection" } " library defines high-level wrappers around " { $link <server> } " which makes it easy to listen for IPv4, IPv6 and secure socket connections simultaneously, perform logging, and optionally only allow connections from the loopback interface."
{ $see-also "io.sockets.secure" } ;
ARTICLE: "network-packet" "Packet-oriented networking"
@ -79,7 +79,7 @@ HELP: inet
HELP: inet4
{ $class-description "IPv4 address/port number specifier for TCP/IP and UDP/IP connections. The " { $snippet "host" } " and " { $snippet "port" } " slots hold the IPv4 address and port number, respectively. New instances are created by calling " { $link <inet4> } "." }
{ $notes
"New instances should not be created directly; instead, use " { $link resolve-host } " to look up the address associated to a host name. Also, try to support IPv6 where possible."
"Most applications do not operate on IPv4 addresses directly, and instead should use " { $link resolve-host } " to look up the address associated to a host name. Also, try to support IPv6 where possible."
}
{ $examples
{ $code "\"127.0.0.1\" 8080 <inet4>" }
@ -88,7 +88,7 @@ HELP: inet4
HELP: inet6
{ $class-description "IPv6 address/port number specifier for TCP/IP and UDP/IP connections. The " { $snippet "host" } " and " { $snippet "port" } " slots hold the IPv6 address and port number, respectively. New instances are created by calling " { $link <inet6> } "." }
{ $notes
"New instances should not be created directly; instead, use " { $link resolve-host } " to look up the address associated to a host name." }
"Most applications do not operate on IPv6 addresses directly, and instead should use " { $link resolve-host } " to look up the address associated to a host name." }
{ $examples
{ $code "\"::1\" 8080 <inet6>" }
} ;
@ -118,10 +118,10 @@ HELP: <server>
}
{ $notes
"To start a TCP/IP server which listens for connections from any host, use an address specifier returned by the following code, where 1234 is the desired port number:"
{ $code "f 1234 t resolve-host" }
{ $code "f 1234 <inet> resolve-host" }
"To start a server which listens for connections from the loopback interface only, use an address specifier returned by the following code, where 1234 is the desired port number:"
{ $code "\"localhost\" 1234 t resolve-host" }
"Since " { $link resolve-host } " can return multiple address specifiers, your server code must listen on them all to work properly. The " { $vocab-link "io.server" } " vocabulary can be used to help with this."
{ $code "\"localhost\" 1234 <inet> resolve-host" }
"Since " { $link resolve-host } " can return multiple address specifiers, your server code must listen on them all to work properly. The " { $vocab-link "io.servers.connection" } " vocabulary can be used to help with this."
$nl
"To start a TCP/IP server which listens for connections on a randomly-assigned port, set the port number in the address specifier to 0, and then read the " { $snippet "addr" } " slot of the server instance to obtain the actual port number it is listening on:"
{ $unchecked-example
@ -148,9 +148,9 @@ HELP: <datagram>
}
{ $notes
"To accept UDP/IP packets from any host, use an address specifier returned by the following code, where 1234 is the desired port number:"
{ $code "f 1234 t resolve-host" }
{ $code "f 1234 <inet> resolve-host" }
"To accept UDP/IP packets from the loopback interface only, use an address specifier returned by the following code, where 1234 is the desired port number:"
{ $code "\"localhost\" 1234 t resolve-host" }
{ $code "\"localhost\" 1234 <inet> resolve-host" }
"Since " { $link resolve-host } " can return multiple address specifiers, your code must create a datagram socket for each one and co-ordinate packet sending accordingly."
"Datagrams are low-level binary ports that don't map onto streams, so the constructor does not use an encoding"
}
@ -165,3 +165,7 @@ HELP: send
{ $values { "packet" byte-array } { "addrspec" "an address specifier" } { "datagram" "a datagram socket" } }
{ $description "Sends a packet to the given address." }
{ $errors "Throws an error if the packet could not be sent." } ;
HELP: resolve-host
{ $values { "addrspec" "an address specifier" } { "seq" "a sequence of address specifiers" } }
{ $description "Resolves host names to IP addresses." } ;

View File

@ -45,7 +45,7 @@ concurrency.promises threads io.streams.string ;
[ "1:2:0:0:0:0:3:4" ]
[ B{ 0 1 0 2 0 0 0 0 0 0 0 0 0 3 0 4 } T{ inet6 } inet-ntop ] unit-test
[ t ] [ "localhost" 80 f resolve-host length 1 >= ] unit-test
[ t ] [ "localhost" 80 <inet> resolve-host length 1 >= ] unit-test
! Smoke-test UDP
[ ] [ "127.0.0.1" 0 <inet4> <datagram> "datagram1" set ] unit-test

View File

@ -259,20 +259,26 @@ HOOK: (send) io-backend ( packet addrspec datagram -- )
[ addrinfo>addrspec ] map
sift ;
: prepare-resolve-host ( host serv passive? -- host' serv' flags )
: prepare-resolve-host ( addrspec -- host' serv' flags )
#! If the port is a number, we resolve for 'http' then
#! change it later. This is a workaround for a FreeBSD
#! getaddrinfo() limitation -- on Windows, Linux and Mac,
#! we can convert a number to a string and pass that as the
#! service name, but on FreeBSD this gives us an unknown
#! service error.
>r
dup integer? [ port-override set "http" ] when
r> AI_PASSIVE 0 ? ;
[ host>> ]
[ port>> dup integer? [ port-override set "http" ] when ] bi
over 0 AI_PASSIVE ? ;
HOOK: addrinfo-error io-backend ( n -- )
: resolve-host ( host serv passive? -- seq )
GENERIC: resolve-host ( addrspec -- seq )
TUPLE: inet host port ;
C: <inet> inet
M: inet resolve-host
[
prepare-resolve-host
"addrinfo" <c-object>
@ -284,17 +290,16 @@ HOOK: addrinfo-error io-backend ( n -- )
freeaddrinfo
] with-scope ;
M: f resolve-host drop { } ;
M: object resolve-host 1array ;
: host-name ( -- string )
256 <byte-array> dup dup length gethostname
zero? [ "gethostname failed" throw ] unless
ascii alien>string ;
TUPLE: inet host port ;
C: <inet> inet
M: inet (client)
[ host>> ] [ port>> ] bi f resolve-host (client) ;
M: inet (client) resolve-host (client) ;
ERROR: invalid-inet-server addrspec ;