]> Repositorios git - scryer-prolog.git/commitdiff
HTTP Server 2.0
authorAdrián Arroyo Calle <[email protected]>
Thu, 11 Aug 2022 18:25:19 +0000 (20:25 +0200)
committerMark Thom <[email protected]>
Thu, 27 Oct 2022 05:36:07 +0000 (23:36 -0600)
12 files changed:
Cargo.lock
Cargo.toml
build/instructions_template.rs
src/arena.rs
src/http.rs [new file with mode: 0644]
src/lib.rs
src/lib/http/http_server.pl
src/machine/dispatch.rs
src/machine/mod.rs
src/machine/streams.rs
src/machine/system_calls.rs
src/macros.rs

index e914dcd9d3343600297d476dd0e4801ff93fc306..05adcc7e85074ad811187079781e45c5be5da55a 100644 (file)
@@ -411,6 +411,21 @@ dependencies = [
  "new_debug_unreachable",
 ]
 
+[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
 [[package]]
 name = "futures-channel"
 version = "0.3.21"
@@ -418,6 +433,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
 dependencies = [
  "futures-core",
+ "futures-sink",
 ]
 
 [[package]]
@@ -426,6 +442,34 @@ version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
 
+[[package]]
+name = "futures-executor"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2 1.0.36",
+ "quote 1.0.17",
+ "syn 1.0.90",
+]
+
 [[package]]
 name = "futures-sink"
 version = "0.3.21"
@@ -444,10 +488,16 @@ version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
 dependencies = [
+ "futures-channel",
  "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
+ "slab",
 ]
 
 [[package]]
@@ -1628,6 +1678,7 @@ dependencies = [
  "ctrlc",
  "dirs-next",
  "divrem",
+ "futures",
  "fxhash",
  "git-version",
  "hostname",
index 283081f5f3499517629723e1e75e817103f96213..6ccdf31461a88dd55022fd73bcb16013ef94e61c 100644 (file)
@@ -67,6 +67,7 @@ ryu = "1.0.9"
 hyper = { version = "0.14", features = ["full"] }
 hyper-tls = "0.5.0"
 tokio = { version = "1", features = ["full"] }
+futures = "0.3"
 
 [dev-dependencies]
 assert_cmd = "1.0.3"
index 4cd660dfd9ff67b17233eaa4f58a66474954a694..9725178cc21f967c04dddf172892b2d957d309c2 100644 (file)
@@ -544,6 +544,12 @@ enum SystemClauseType {
     DeterministicLengthRundown,
     #[strum_discriminants(strum(props(Arity = "7", Name = "$http_open")))]
     HttpOpen,
+    #[strum_discriminants(strum(props(Arity = "2", Name = "$http_listen")))]
+    HttpListen,
+    #[strum_discriminants(strum(props(Arity = "7", Name = "$http_accept")))]
+    HttpAccept,
+    #[strum_discriminants(strum(props(Arity = "4", Name = "$http_answer")))]
+    HttpAnswer,
     #[strum_discriminants(strum(props(Arity = "3", Name = "$predicate_defined")))]
     PredicateDefined,
     #[strum_discriminants(strum(props(Arity = "3", Name = "$strip_module")))]
@@ -1686,6 +1692,9 @@ fn generate_instruction_preface() -> TokenStream {
                     &Instruction::CallCpuNow(_) |
                     &Instruction::CallDeterministicLengthRundown(_) |
                     &Instruction::CallHttpOpen(_) |
+                    &Instruction::CallHttpListen(_) |
+                    &Instruction::CallHttpAccept(_) |
+                    &Instruction::CallHttpAnswer(_) |
                     &Instruction::CallPredicateDefined(_) |
                     &Instruction::CallStripModule(_) |
                     &Instruction::CallCurrentTime(_) |
@@ -1895,6 +1904,9 @@ fn generate_instruction_preface() -> TokenStream {
                     &Instruction::ExecuteCpuNow(_) |
                     &Instruction::ExecuteDeterministicLengthRundown(_) |
                     &Instruction::ExecuteHttpOpen(_) |
+                    &Instruction::ExecuteHttpListen(_) |
+                    &Instruction::ExecuteHttpAccept(_) |
+                    &Instruction::ExecuteHttpAnswer(_) |
                     &Instruction::ExecutePredicateDefined(_) |
                     &Instruction::ExecuteStripModule(_) |
                     &Instruction::ExecuteCurrentTime(_) |
index bf0cd3ce8c9b2613b17162a6136d9ea703058435..3e75e12020bb5ea6e6c0cfe3414230ba1e4f1332 100644 (file)
@@ -1,3 +1,4 @@
+use crate::http::{HttpListener, HttpResponse};
 use crate::machine::loader::LiveLoadState;
 use crate::machine::machine_indices::*;
 use crate::machine::streams::*;
@@ -139,7 +140,8 @@ pub enum ArenaHeaderTag {
     OutputFileStream = 0b10100,
     NamedTcpStream = 0b011100,
     NamedTlsStream = 0b100000,
-    NamedHttpClientStream =  0b100001,
+    HttpReadStream =  0b100001,
+    HttpWriteStream = 0b100010,
     ReadlineStream = 0b110000,
     StaticStringStream = 0b110100,
     ByteStream = 0b111000,
@@ -147,6 +149,8 @@ pub enum ArenaHeaderTag {
     StandardErrorStream = 0b11000,
     NullStream = 0b111100,
     TcpListener = 0b1000000,
+    HttpListener = 0b1000001,
+    HttpResponse = 0b1000010,
     Dropped = 0b1000100,
     IndexPtrDynamicUndefined = 0b1000101,
     IndexPtrDynamicIndex = 0b1000110,
@@ -560,6 +564,50 @@ impl ArenaAllocated for TcpListener {
     }
 }
 
+impl ArenaAllocated for HttpListener {
+    type PtrToAllocated = TypedArenaPtr<HttpListener>;
+
+    #[inline]
+    fn tag() -> ArenaHeaderTag {
+       ArenaHeaderTag::HttpListener
+    }
+
+    #[inline]
+    fn size(&self) -> usize {
+       mem::size_of::<Self>()
+    }
+
+    #[inline]
+    fn copy_to_arena(self, dst: *mut Self) -> Self::PtrToAllocated {
+       unsafe {
+           ptr::write(dst, self);
+           TypedArenaPtr::new(dst as *mut Self)
+       }
+    }
+}
+
+impl ArenaAllocated for HttpResponse {
+    type PtrToAllocated = TypedArenaPtr<HttpResponse>;
+
+    #[inline]
+    fn tag() -> ArenaHeaderTag {
+       ArenaHeaderTag::HttpResponse
+    }
+
+    #[inline]
+    fn size(&self) -> usize {
+       mem::size_of::<Self>()
+    }
+
+    #[inline]
+    fn copy_to_arena(self, dst: *mut Self) -> Self::PtrToAllocated {
+       unsafe {
+           ptr::write(dst, self);
+           TypedArenaPtr::new(dst as *mut Self)
+       }
+    }
+}
+
 impl ArenaAllocated for IndexPtr {
     type PtrToAllocated = TypedArenaPtr<IndexPtr>;
 
@@ -647,9 +695,12 @@ unsafe fn drop_slab_in_place(value: &mut AllocSlab) {
         ArenaHeaderTag::NamedTlsStream => {
             ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<NamedTlsStream>>>());
         }
-        ArenaHeaderTag::NamedHttpClientStream => {
-            ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<NamedHttpClientStream>>>());
+        ArenaHeaderTag::HttpReadStream => {
+            ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<HttpReadStream>>>());
         }
+       ArenaHeaderTag::HttpWriteStream => {
+           ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<HttpWriteStream>>>());
+       }
         ArenaHeaderTag::ReadlineStream => {
             ptr::drop_in_place(value.payload_offset::<StreamLayout<ReadlineStream>>());
         }
@@ -670,6 +721,12 @@ unsafe fn drop_slab_in_place(value: &mut AllocSlab) {
         ArenaHeaderTag::TcpListener => {
             ptr::drop_in_place(value.payload_offset::<TcpListener>());
         }
+       ArenaHeaderTag::HttpListener => {
+           ptr::drop_in_place(value.payload_offset::<HttpListener>());
+       }
+       ArenaHeaderTag::HttpResponse => {
+           ptr::drop_in_place(value.payload_offset::<HttpResponse>());
+       }
         ArenaHeaderTag::StandardOutputStream => {
             ptr::drop_in_place(value.payload_offset::<StreamLayout<StandardOutputStream>>());
         }
diff --git a/src/http.rs b/src/http.rs
new file mode 100644 (file)
index 0000000..887366f
--- /dev/null
@@ -0,0 +1,25 @@
+use std::sync::Arc;
+use std::convert::Infallible;
+
+use hyper::{Response, Request, Body};
+use tokio::sync::Mutex;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+
+pub struct HttpListener {
+    pub incoming: Receiver<HttpRequest>
+}
+
+#[derive(Debug)]
+pub struct HttpRequest {
+    pub request: Request<Body>,
+    pub response: HttpResponse,
+}
+
+pub type HttpResponse = Sender<Response<Body>>;
+
+pub async fn serve_req(req: Request<Body>, tx: Arc<Mutex<Sender<HttpRequest>>>) -> Result<Response<Body>, Infallible> {
+    let (response_tx, mut rx) = channel(1);
+    let http_request = HttpRequest { request: req, response: response_tx };
+    tx.lock().await.send(http_request).await.unwrap();
+    Ok(rx.recv().await.unwrap())
+}
index fc8d8ca539e5bbf8ffd909decf3a7aeb1e65e74a..2846fd0e64448dbfe4ad778e2cb26cf801153a2e 100644 (file)
@@ -19,6 +19,7 @@ mod fixtures;
 mod forms;
 mod heap_iter;
 pub mod heap_print;
+mod http;
 mod indexing;
 #[macro_use]
 pub mod instructions {
index 72657c35d301964873ef68297695b54449e4b6c1..a49969eee25e2582e2a1ddc7f2fb28f4a94d2234 100644 (file)
@@ -1,11 +1,11 @@
 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    Written in December 2020 by Adrián Arroyo ([email protected])
+   Updated in March 2022 by Adrián Arroyo to use the Hyper backend
    Part of Scryer Prolog
 
    This library provides an starting point to build HTTP server based applications.
-   It currently implements a subset of HTTP/1.0. It is recommended to put a reverse
-   proxy like nginx in front of this server to have access to more advanced features
-   (gzip compression, HTTPS, ...)
+   It is based on Hyper, which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However,
+   some advanced features that Hyper provides are still not accesible.
 
    Usage
    ==========
    Some things that are still missing:
    - Read forms in multipart format
    - HTTP Basic Auth
-   - Keep-Alive support
    - Session handling via cookies
    - HTML Templating
 
    I place this code in the public domain. Use it in any way you want.
 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
 
+
 :- module(http_server, [
-    http_listen/2,
-    http_headers/2,
-    http_status_code/2,
-    http_body/2,
-    http_redirect/2,
-    http_query/3,
-    url_decode//1
+             http_listen/2,
+             http_headers/2,
+             http_status_code/2,
+             http_body/2,
+             http_redirect/2,
+             http_query/3
 ]).
 
-:- meta_predicate http_listen(?, 2).
+:- meta_predicate http_listen(?, :).
 
-:- use_module(library(sockets)).
-:- use_module(library(dcgs)).
-:- use_module(library(format)).
-:- use_module(library(error)).
 :- use_module(library(charsio)).
-:- use_module(library(lists)).
+:- use_module(library(crypto)).
+:- use_module(library(error)).
+:- use_module(library(format)).
 :- use_module(library(iso_ext)).
+:- use_module(library(lists)).
+:- use_module(library(pio)).
 :- use_module(library(time)).
-:- use_module(library(crypto)).
 
-% Module prefix workaround with meta_predicate
 http_listen(Port, Module:Handlers0) :-
+    must_be(integer, Port),
+    must_be(list, Handlers0),
     maplist(module_qualification(Module), Handlers0, Handlers),
     http_listen_(Port, Handlers).
 
@@ -79,49 +78,75 @@ module_qualification(M, H0, H) :-
     H0 =.. [Method, Path, Goal],
     H =.. [Method, Path, M:Goal].
 
-% Server initialization
 http_listen_(Port, Handlers) :-
-    must_be(integer, Port),
-    must_be(list, Handlers),
-    once(socket_server_open(Port, Socket)),
-    format("Listening at port ~d\n", [Port]),
-    accept_loop(Socket, Handlers).
-
-% Server loop
-accept_loop(Socket, Handlers) :-
-    setup_call_cleanup(socket_server_accept(Socket, _Client, Stream, [type(binary)]),
-        (
-            read_header_lines(Stream, Lines),
-            [Request|Headers] = Lines,
-            (
-                (phrase(parse_request(_Version, Method, Path, Queries), Request), maplist(map_parse_header, Headers, HeadersKV)) -> (
-                        (
-                            member("content-length"-ContentLength, HeadersKV) ->
-                                (number_chars(ContentLengthN, ContentLength), get_bytes(Stream, ContentLengthN, Body))
-                                ;true
-                        ),
-                        current_time(Time),
-                        phrase(format_time("%Y-%m-%d (%H:%M:%S)", Time), TimeString),
-                        format("~s ~w ~s\n", [TimeString, Method, Path]),
-                        (
-                            match_handler(Handlers, Method, Path, Handler) ->
-                            (
-                                HttpRequest = http_request(HeadersKV, binary(Body), Queries),
-                                HttpResponse = http_response(_, _, _),
-                                (call(Handler, HttpRequest, HttpResponse) ->
-                                    send_response(Stream, HttpResponse)
-                                ;   format(Stream, "HTTP/1.0 500 Internal Server Error\r\n\r\n", [])
-                                )
-                            )
-                        ;   format(Stream, "HTTP/1.0 404 Not Found\r\n\r\n", [])
-                        )
-                    );(
-                        format(Stream, "HTTP/1.0 400 Bad Request\r\n\r\n", []) % bad format
-                    )
-            ),
-            ! % Remove
-        ), close(Stream)),
-    accept_loop(Socket, Handlers).
+    number_chars(Port, CPort),
+    append("0.0.0.0:", CPort, Addr),
+    '$http_listen'(Addr, HttpListener),!,
+    format("Listening at ~s\n", [Addr]),
+    http_loop(HttpListener, Handlers).
+
+http_loop(HttpListener, Handlers) :-
+    '$http_accept'(HttpListener, RequestMethod, RequestPath, RequestHeaders, RequestQuery, RequestStream, ResponseHandle),
+    current_time(Time),
+    phrase(format_time("%Y-%m-%d (%H:%M:%S)", Time), TimeString),
+    format("~s ~w ~s\n", [TimeString, RequestMethod, RequestPath]),
+    maplist(map_header_kv, RequestHeaders, RequestHeadersKV),
+    phrase(parse_queries(RequestQueries), RequestQuery),
+    (
+       match_handler(Handlers, RequestMethod, RequestPath, Handler) ->
+       (
+           HttpRequest = http_request(RequestHeadersKV, stream(RequestStream), RequestQueries),
+           HttpResponse = http_response(_, _, _),
+           (call(Handler, HttpRequest, HttpResponse) ->
+                send_response(ResponseHandle, HttpResponse)
+           ;    '$http_answer'(ResponseHandle, 500, [], "Internal Server Error")
+           )
+       )
+    ; '$http_answer'(ResponseHandle, 404, [], "Not Found")
+    ),
+    http_loop(HttpListener, Handlers).
+
+send_response(ResponseHandle, http_response(StatusCode0, text(ResponseText), ResponseHeaders0)) :-
+    default(StatusCode0, 200, StatusCode),
+    maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0),
+    '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream),
+    format(ResponseStream, "~s", [ResponseText]),
+    close(ResponseStream).
+
+send_response(ResponseHandle, http_response(StatusCode0, bytes(ResponseBytes), ResponseHeaders0)) :-
+    default(StatusCode0, 200, StatusCode),
+    maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0),
+    '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream),
+    format(ResponseStream, "~s", [ResponseBytes]),
+    close(ResponseStream).
+
+send_response(ResponseHandle, http_response(StatusCode0, file(Filename), ResponseHeaders0)) :-
+    default(StatusCode0, 200, StatusCode),
+    maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0),
+    '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream),
+    setup_call_cleanup(
+       open(Filename, read, FileStream, [type(binary)]),
+       (
+           get_n_chars(FileStream, _, FileCs),
+           format(ResponseStream, "~s", [FileCs])
+       ),
+       close(FileStream)
+    ),
+    close(ResponseStream).
+    
+
+default(Var, Default, Out) :-
+    (var(Var) -> Out = Default
+    ;   Var = Out  
+    ).
+
+map_header_kv(T, K-V) :-
+    T =.. [K0, V],
+    atom_chars(K0, K).
+
+map_header_kv_2(T, K-V) :-
+    atom_chars(K0, K),
+    T =.. [K0, V].
 
 match_handler(Handlers, Method, "/", Handler) :-
     member(H, Handlers),
@@ -139,27 +164,6 @@ match_handler(Handlers, Method, Path, Handler) :-
     var(Var),
     Var = Path.
 
-
-% Helper and recommended predicates
-
-http_headers(http_request(Headers, _, _), Headers).
-http_headers(http_response(_, _, Headers), Headers).
-
-http_body(http_request(_, binary(ByteBody), _), text(TextBody)) :- chars_utf8bytes(TextBody, ByteBody).
-http_body(http_request(Headers, binary(ByteBody), _), form(FormBody)) :- 
-    member("content-type"-"application/x-www-form-urlencoded", Headers),
-    chars_utf8bytes(TextBody, ByteBody),
-    phrase(parse_queries(FormBody), TextBody).
-http_body(http_request(_, Body, _), Body).
-http_body(http_response(_, Body, _), Body).
-
-http_status_code(http_response(StatusCode, _, _), StatusCode).
-
-http_redirect(http_response(307, text("Moved Temporarily"), ["Location"-Uri]), Uri).
-
-http_query(http_request(_, _, Queries), Key, Value) :- member(Key-Value, Queries).
-
-% Route matching
 path(Pattern) -->
     {
         Pattern =.. Parts,
@@ -183,76 +187,31 @@ path(Pattern) -->
 
 path([]) --> [].
 
-% Send responses
-send_response(Stream, http_response(StatusCode0, file(Filename), Headers)) :-
-    default(StatusCode0, 200, StatusCode),
-    format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]),
-    overwrite_header("connection"-"Close", Headers, Headers0),
-    write_headers(Stream, Headers0),
-    format(Stream, "\r\n", []),
-    setup_call_cleanup(
-        open(Filename, read, FileStream, [type(binary)]),
-        pipe_bytes(FileStream, Stream),
-        close(FileStream)
-    ).
-
-send_response(Stream, http_response(StatusCode0, text(TextResponse), Headers)) :-
-    default(StatusCode0, 200, StatusCode),
-    format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]),
-    overwrite_header("content-type"-"text/plain", Headers, Headers0),
-    overwrite_header("connection"-"Close", Headers0, Headers1),
-    write_headers(Stream, Headers1),
-    format(Stream, "\r\n~s", [TextResponse]).
+string_without(Not, [Char|String]) -->
+    [Char],
+    {
+        \+ member(Char, Not)
+    },
+    string_without(Not, String).
 
-send_response(Stream, http_response(StatusCode0, binary(BinaryResponse), Headers)) :-
-    default(StatusCode0, 200, StatusCode),
-    format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]),
-    overwrite_header("connection"-"Close", Headers, Headers0),
-    write_headers(Stream, Headers0),
-    format(Stream, "\r\n", []),
-    put_bytes(Stream, BinaryResponse).
+string_without(_, []) -->
+    [].
 
-default(Var, Default, Out) :-
-    (var(Var) -> Out = Default
-    ;   Var = Out  
-    ).
+http_headers(http_request(Headers, _, _), Headers).
+http_headers(http_response(_, _, Headers), Headers).
 
-header([]) --> [].
-header([Key-Value|Headers]) -->
-    format_("~s: ~s\r\n", [Key, Value]),
-    header(Headers).
-
-write_headers(Stream, Headers) :-
-    phrase(header(Headers), Cs),
-    format(Stream, "~s", [Cs]).
-
-overwrite_header(Key-Value, [], [Key-Value]).
-overwrite_header(Key-Value, [Header|Headers], [Header|HeadersOut]) :-
-    Header = Key0-_,
-    Key0 \= Key,
-    overwrite_header(Key-Value, Headers, HeadersOut).
-overwrite_header(Key-Value, [Header|Headers], [NewHeader|Headers]) :-
-    Header = Key-_,
-    NewHeader = Key-Value.
-
-parse_request(http_version(Major, Minor), Method, Path, Queries) -->
-    method(Method),
-    " ",
-    parse_path(Path, Queries),
-    " ",
-    "HTTP/",
-    natural(Major),
-    ".",
-    natural(Minor),
-    "\r\n".
-
-parse_path(Path, Queries) -->
-    string_without("?", Path),
-    "?",
-    parse_queries(Queries).
+http_body(http_request(_, stream(StreamBody), _), bytes(BytesBody)) :- get_n_chars(StreamBody, _, BytesBody).
+http_body(http_request(_, stream(StreamBody), _), text(TextBody)) :- get_n_chars(StreamBody, _, TextBody).
+http_body(http_request(Headers, stream(StreamBody), _), form(FormBody)) :- 
+    member("content-type"-"application/x-www-form-urlencoded", Headers),
+    get_n_chars(StreamBody, _, TextBody),
+    phrase(parse_queries(FormBody), TextBody).
+http_body(http_request(_, Body, _), Body).
+http_body(http_response(_, Body, _), Body).
 
-parse_path(Path, []) -->
-    string_without(" ", Path).
+http_status_code(http_response(StatusCode, _, _), StatusCode).
+http_redirect(http_response(307, text("Moved Temporarily"), ["Location"-Uri]), Uri).
+http_query(http_request(_, _, Queries), Key, Value) :- member(Key-Value, Queries).
 
 parse_queries([Key-Value|Queries]) -->
     string_without("=", Key0),
@@ -278,94 +237,9 @@ parse_queries([Key-Value]) -->
         phrase(url_decode(Value), Value0)  
     }.
 
-map_parse_header(Header, HeaderKV) :-
-    phrase(parse_header(HeaderKV), Header).
-
-parse_header(Key-Value) -->
-    string_without(":", Key0),
-    {
-        chars_lower(Key0, Key)
-    },
-    ": ",
-    string_without("\r", Value),
-    "\r\n".
-
-method(options) --> "OPTIONS".
-method(get) --> "GET".
-method(head) --> "HEAD".
-method(post) --> "POST".
-method(put) --> "PUT".
-method(delete) --> "DELETE".
-
-string_without(Not, [Char|String]) -->
-    [Char],
-    {
-        \+ member(Char, Not)
-    },
-    string_without(Not, String).
-
-string_without(_, []) -->
+parse_queries([]) -->
     [].
 
-natural(Nat) -->
-    natural_(NatChars),
-    {
-        number_chars(Nat, NatChars)
-    }.
-
-natural_([Nat|Nats]) -->
-    [Nat],
-    {
-        char_type(Nat, decimal_digit)
-    },
-    natural_(Nats).
-
-natural_([]) -->
-    [].
-
-read_header_lines(Stream, Hs) :-
-        read_line_to_chars(Stream, Cs, []),
-        (   Cs == "" -> Hs = []
-        ;   Cs == "\r\n" -> Hs = []
-        ;   Hs = [Cs|Rest],
-            read_header_lines(Stream, Rest)
-        ).
-
-get_bytes(Stream, Length, Res) :- get_bytes(Stream, Length, [], Res).
-get_bytes(Stream, Length, Acc, Res) :-
-  (Length > 0 -> (
-    get_byte(Stream, B),
-    B =\= -1,
-    get_bytes(Stream, Length - 1, [B|Acc], Res)
-  ); reverse(Acc, Res)).
-
-put_bytes(_, []).
-put_bytes(Stream, [Byte|Bytes]) :-
-    put_byte(Stream, Byte),
-    put_bytes(Stream, Bytes).
-
-pipe_bytes(StreamIn, StreamOut) :-
-    get_byte(StreamIn, Byte),
-    (
-        Byte =\= -1 ->
-        (
-            put_byte(StreamOut, Byte),
-            pipe_bytes(StreamIn, StreamOut)
-        )
-    ;   true).
-
-% WARNING: This only works for ASCII chars. This code can be modified to support
-% Latin1 characters also but a completely different approach is needed for other
-% languages. Since HTTP internals are ASCII, this is fine for this usecase.
-chars_lower(Chars, Lower) :-
-    maplist(char_lower, Chars, Lower).
-char_lower(Char, Lower) :-
-    char_code(Char, Code),
-    ((Code >= 65,Code =< 90) ->
-        LowerCode is Code + 32,
-        char_code(Lower, LowerCode)
-    ;   Char = Lower).
-
 % Decodes a UTF-8 URL Encoded string: RFC-1738
 url_decode([Char|Chars]) -->
     [Char],
index 20e76b24b51d4fdc44f2c5f631cd45ca5c7f7846..82ef48264e95e523226423c91cfb354e4fc09233 100644 (file)
@@ -4197,6 +4197,30 @@ impl Machine {
                     try_or_throw!(self.machine_st, self.http_open());
                     step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
                 }
+               &Instruction::CallHttpListen(_) => {
+                   try_or_throw!(self.machine_st, self.http_listen());
+                   step_or_fail!(self, self.machine_st.p += 1);
+               }
+               &Instruction::ExecuteHttpListen(_) => {
+                   try_or_throw!(self.machine_st, self.http_listen());
+                   step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
+               }
+               &Instruction::CallHttpAccept(_) => {
+                   try_or_throw!(self.machine_st, self.http_accept());
+                   step_or_fail!(self, self.machine_st.p += 1);
+               }
+               &Instruction::ExecuteHttpAccept(_) => {
+                   try_or_throw!(self.machine_st, self.http_accept());
+                   step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
+               }
+               &Instruction::CallHttpAnswer(_) => {
+                   try_or_throw!(self.machine_st, self.http_answer());
+                   step_or_fail!(self, self.machine_st.p += 1);
+               }
+               &Instruction::ExecuteHttpAnswer(_) => {
+                   try_or_throw!(self.machine_st, self.http_answer());
+                   step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
+               }
                 &Instruction::CallCurrentTime(_) => {
                     self.current_time();
                     step_or_fail!(self, self.machine_st.p += 1);
index f34f38fbe70df737cbb361315eb29f2928bbf16f..4cd1fce83d84abdc2194699a28900a2a95ad9895 100644 (file)
@@ -431,9 +431,7 @@ impl Machine {
         let user_output = Stream::stdout(&mut machine_st.arena);
         let user_error = Stream::stderr(&mut machine_st.arena);
 
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
+        let runtime = tokio::runtime::Runtime::new()
             .unwrap();
 
         let mut wam = Machine {
index 548cf6ac377cc4ed83eb2e604266f0e27c96e952..9e558af59540dd172d1da401bd09271b52af2252 100644 (file)
@@ -26,6 +26,7 @@ use std::ops::{Deref, DerefMut};
 use std::ptr;
 
 use native_tls::TlsStream;
+use hyper::body::{Bytes, Sender};
 
 #[derive(Debug, BitfieldSpecifier, Clone, Copy, PartialEq, Eq, Hash)]
 #[bits = 1]
@@ -249,24 +250,51 @@ impl Write for NamedTlsStream {
     }
 }
 
-pub struct NamedHttpClientStream {
+pub struct HttpReadStream {
     url: Atom,
     body_reader: Box<dyn BufRead>,
 }
 
-impl Debug for NamedHttpClientStream {
+impl Debug for HttpReadStream {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "Http Client Stream [{}]", self.url.as_str())
+        write!(f, "Http Read Stream [{}]", self.url.as_str())
     }
 }
 
-impl Read for NamedHttpClientStream {
+impl Read for HttpReadStream {
     #[inline]
     fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
         self.body_reader.read(buf)
     }
 }
 
+pub struct HttpWriteStream {
+    body_writer: Sender,
+}
+
+impl Debug for HttpWriteStream {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+       write!(f, "Http Write Stream")
+    }
+}
+
+impl Write for HttpWriteStream {
+    #[inline]
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+       let bytes = Bytes::copy_from_slice(buf);
+       let len = bytes.len();
+       match self.body_writer.try_send_data(bytes) {
+           Ok(()) => Ok(len),
+           Err(_) => Err(std::io::Error::from(ErrorKind::Interrupted))
+       }
+    }
+
+    #[inline]
+    fn flush(&mut self) -> std::io::Result<()> {
+       Ok(())
+    }
+}
+
 #[derive(Debug)]
 pub struct StandardOutputStream {}
 
@@ -405,7 +433,8 @@ arena_allocated_impl_for_stream!(CharReader<InputFileStream>, InputFileStream);
 arena_allocated_impl_for_stream!(OutputFileStream, OutputFileStream);
 arena_allocated_impl_for_stream!(CharReader<NamedTcpStream>, NamedTcpStream);
 arena_allocated_impl_for_stream!(CharReader<NamedTlsStream>, NamedTlsStream);
-arena_allocated_impl_for_stream!(CharReader<NamedHttpClientStream>, NamedHttpClientStream);
+arena_allocated_impl_for_stream!(CharReader<HttpReadStream>, HttpReadStream);
+arena_allocated_impl_for_stream!(CharReader<HttpWriteStream>, HttpWriteStream);
 arena_allocated_impl_for_stream!(ReadlineStream, ReadlineStream);
 arena_allocated_impl_for_stream!(StaticStringStream, StaticStringStream);
 arena_allocated_impl_for_stream!(StandardOutputStream, StandardOutputStream);
@@ -419,7 +448,8 @@ pub enum Stream {
     StaticString(TypedArenaPtr<StreamLayout<StaticStringStream>>),
     NamedTcp(TypedArenaPtr<StreamLayout<CharReader<NamedTcpStream>>>),
     NamedTls(TypedArenaPtr<StreamLayout<CharReader<NamedTlsStream>>>),
-    NamedHttpClient(TypedArenaPtr<StreamLayout<CharReader<NamedHttpClientStream>>>),
+    HttpRead(TypedArenaPtr<StreamLayout<CharReader<HttpReadStream>>>),
+    HttpWrite(TypedArenaPtr<StreamLayout<CharReader<HttpWriteStream>>>),
     Null(StreamOptions),
     Readline(TypedArenaPtr<StreamLayout<ReadlineStream>>),
     StandardOutput(TypedArenaPtr<StreamLayout<StandardOutputStream>>),
@@ -474,7 +504,8 @@ impl Stream {
             }
             ArenaHeaderTag::NamedTcpStream => Stream::NamedTcp(TypedArenaPtr::new(ptr as *mut _)),
             ArenaHeaderTag::NamedTlsStream => Stream::NamedTls(TypedArenaPtr::new(ptr as *mut _)),
-            ArenaHeaderTag::NamedHttpClientStream => Stream::NamedHttpClient(TypedArenaPtr::new(ptr as *mut _)),
+            ArenaHeaderTag::HttpReadStream => Stream::HttpRead(TypedArenaPtr::new(ptr as *mut _)),
+           ArenaHeaderTag::HttpWriteStream => Stream::HttpWrite(TypedArenaPtr::new(ptr as *mut _)),
             ArenaHeaderTag::ReadlineStream => Stream::Readline(TypedArenaPtr::new(ptr as *mut _)),
             ArenaHeaderTag::StaticStringStream => {
                 Stream::StaticString(TypedArenaPtr::new(ptr as *mut _))
@@ -527,7 +558,8 @@ impl Stream {
             Stream::StaticString(ptr) => ptr.header_ptr(),
             Stream::NamedTcp(ptr) => ptr.header_ptr(),
             Stream::NamedTls(ptr) => ptr.header_ptr(),
-            Stream::NamedHttpClient(ptr) => ptr.header_ptr(),
+            Stream::HttpRead(ptr) => ptr.header_ptr(),
+           Stream::HttpWrite(ptr) => ptr.header_ptr(),
             Stream::Null(_) => ptr::null(),
             Stream::Readline(ptr) => ptr.header_ptr(),
             Stream::StandardOutput(ptr) => ptr.header_ptr(),
@@ -543,7 +575,8 @@ impl Stream {
             Stream::StaticString(ref ptr) => &ptr.options,
             Stream::NamedTcp(ref ptr) => &ptr.options,
             Stream::NamedTls(ref ptr) => &ptr.options,
-            Stream::NamedHttpClient(ref ptr) => &ptr.options,
+            Stream::HttpRead(ref ptr) => &ptr.options,
+           Stream::HttpWrite(ref ptr) => &ptr.options,
             Stream::Null(ref options) => options,
             Stream::Readline(ref ptr) => &ptr.options,
             Stream::StandardOutput(ref ptr) => &ptr.options,
@@ -559,7 +592,8 @@ impl Stream {
             Stream::StaticString(ref mut ptr) => &mut ptr.options,
             Stream::NamedTcp(ref mut ptr) => &mut ptr.options,
             Stream::NamedTls(ref mut ptr) => &mut ptr.options,
-            Stream::NamedHttpClient(ref mut ptr) => &mut ptr.options,
+            Stream::HttpRead(ref mut ptr) => &mut ptr.options,
+            Stream::HttpWrite(ref mut ptr) => &mut ptr.options,            
             Stream::Null(ref mut options) => options,
             Stream::Readline(ref mut ptr) => &mut ptr.options,
             Stream::StandardOutput(ref mut ptr) => &mut ptr.options,
@@ -576,7 +610,8 @@ impl Stream {
             Stream::StaticString(ptr) => ptr.lines_read += incr_num_lines_read,
             Stream::NamedTcp(ptr) => ptr.lines_read += incr_num_lines_read,
             Stream::NamedTls(ptr) => ptr.lines_read += incr_num_lines_read,
-            Stream::NamedHttpClient(ptr) => ptr.lines_read += incr_num_lines_read,
+            Stream::HttpRead(ptr) => ptr.lines_read += incr_num_lines_read,
+            Stream::HttpWrite(_) => {}  
             Stream::Null(_) => {}
             Stream::Readline(ptr) => ptr.lines_read += incr_num_lines_read,
             Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read,
@@ -593,7 +628,8 @@ impl Stream {
             Stream::StaticString(ptr) => ptr.lines_read = value,
             Stream::NamedTcp(ptr) => ptr.lines_read = value,
             Stream::NamedTls(ptr) => ptr.lines_read = value,
-            Stream::NamedHttpClient(ptr) => ptr.lines_read = value,
+            Stream::HttpRead(ptr) => ptr.lines_read = value,
+            Stream::HttpWrite(_) => {}     
             Stream::Null(_) => {}
             Stream::Readline(ptr) => ptr.lines_read = value,
             Stream::StandardOutput(ptr) => ptr.lines_read = value,
@@ -610,7 +646,8 @@ impl Stream {
             Stream::StaticString(ptr) => ptr.lines_read,
             Stream::NamedTcp(ptr) => ptr.lines_read,
             Stream::NamedTls(ptr) => ptr.lines_read,
-            Stream::NamedHttpClient(ptr) => ptr.lines_read,
+            Stream::HttpRead(ptr) => ptr.lines_read,
+            Stream::HttpWrite(_) => 0,   
             Stream::Null(_) => 0,
             Stream::Readline(ptr) => ptr.lines_read,
             Stream::StandardOutput(ptr) => ptr.lines_read,
@@ -625,13 +662,14 @@ impl CharRead for Stream {
             Stream::InputFile(file) => (*file).peek_char(),
             Stream::NamedTcp(tcp_stream) => (*tcp_stream).peek_char(),
             Stream::NamedTls(tls_stream) => (*tls_stream).peek_char(),
-            Stream::NamedHttpClient(http_stream) => (*http_stream).peek_char(),
+            Stream::HttpRead(http_stream) => (*http_stream).peek_char(),
             Stream::Readline(rl_stream) => (*rl_stream).peek_char(),
             Stream::StaticString(src) => (*src).peek_char(),
             Stream::Byte(cursor) => (*cursor).peek_char(),
             Stream::OutputFile(_) |
             Stream::StandardError(_) |
             Stream::StandardOutput(_) |
+           Stream::HttpWrite(_) |
             Stream::Null(_) => Some(Err(std::io::Error::new(
                 ErrorKind::PermissionDenied,
                 StreamError::ReadFromOutputStream,
@@ -644,13 +682,14 @@ impl CharRead for Stream {
             Stream::InputFile(file) => (*file).read_char(),
             Stream::NamedTcp(tcp_stream) => (*tcp_stream).read_char(),
             Stream::NamedTls(tls_stream) => (*tls_stream).read_char(),
-            Stream::NamedHttpClient(http_stream) => (*http_stream).read_char(),
+            Stream::HttpRead(http_stream) => (*http_stream).read_char(),
             Stream::Readline(rl_stream) => (*rl_stream).read_char(),
             Stream::StaticString(src) => (*src).read_char(),
             Stream::Byte(cursor) => (*cursor).read_char(),
             Stream::OutputFile(_) |
             Stream::StandardError(_) |
             Stream::StandardOutput(_) |
+           Stream::HttpWrite(_) |
             Stream::Null(_) => Some(Err(std::io::Error::new(
                 ErrorKind::PermissionDenied,
                 StreamError::ReadFromOutputStream,
@@ -663,13 +702,14 @@ impl CharRead for Stream {
             Stream::InputFile(file) => file.put_back_char(c),
             Stream::NamedTcp(tcp_stream) => tcp_stream.put_back_char(c),
             Stream::NamedTls(tls_stream) => tls_stream.put_back_char(c),
-            Stream::NamedHttpClient(http_stream) => http_stream.put_back_char(c),
+            Stream::HttpRead(http_stream) => http_stream.put_back_char(c),
             Stream::Readline(rl_stream) => rl_stream.put_back_char(c),
             Stream::StaticString(src) => src.put_back_char(c),
             Stream::Byte(cursor) => cursor.put_back_char(c),
             Stream::OutputFile(_) |
             Stream::StandardError(_) |
             Stream::StandardOutput(_) |
+           Stream::HttpWrite(_) |
             Stream::Null(_) => {}
         }
     }
@@ -679,13 +719,14 @@ impl CharRead for Stream {
             Stream::InputFile(ref mut file) => file.consume(nread),
             Stream::NamedTcp(ref mut tcp_stream) => tcp_stream.consume(nread),
             Stream::NamedTls(ref mut tls_stream) => tls_stream.consume(nread),
-            Stream::NamedHttpClient(ref mut http_stream) => http_stream.consume(nread),
+            Stream::HttpRead(ref mut http_stream) => http_stream.consume(nread),
             Stream::Readline(ref mut rl_stream) => rl_stream.consume(nread),
             Stream::StaticString(ref mut src) => src.consume(nread),
             Stream::Byte(ref mut cursor) => cursor.consume(nread),
             Stream::OutputFile(_) |
             Stream::StandardError(_) |
             Stream::StandardOutput(_) |
+           Stream::HttpWrite(_) |
             Stream::Null(_) => {}
         }
     }
@@ -698,13 +739,14 @@ impl Read for Stream {
             Stream::InputFile(file) => (*file).read(buf),
             Stream::NamedTcp(tcp_stream) => (*tcp_stream).read(buf),
             Stream::NamedTls(tls_stream) => (*tls_stream).read(buf),
-            Stream::NamedHttpClient(http_stream) => (*http_stream).read(buf),
+            Stream::HttpRead(http_stream) => (*http_stream).read(buf),
             Stream::Readline(rl_stream) => (*rl_stream).read(buf),
             Stream::StaticString(src) => (*src).read(buf),
             Stream::Byte(cursor) => (*cursor).read(buf),
             Stream::OutputFile(_)
             | Stream::StandardError(_)
-            | Stream::StandardOutput(_)
+           | Stream::StandardOutput(_)
+           | Stream::HttpWrite(_)
             | Stream::Null(_) => Err(std::io::Error::new(
                 ErrorKind::PermissionDenied,
                 StreamError::ReadFromOutputStream,
@@ -724,7 +766,8 @@ impl Write for Stream {
             Stream::Byte(ref mut cursor) => cursor.get_mut().write(buf),
             Stream::StandardOutput(stream) => stream.write(buf),
             Stream::StandardError(stream) => stream.write(buf),
-            Stream::NamedHttpClient(_) |
+           Stream::HttpWrite(ref mut stream) => stream.get_mut().write(buf),
+            Stream::HttpRead(_) |
             Stream::StaticString(_) |
             Stream::Readline(_) |
             Stream::InputFile(..) |
@@ -743,7 +786,8 @@ impl Write for Stream {
             Stream::Byte(ref mut cursor) => cursor.stream.get_mut().flush(),
             Stream::StandardError(stream) => stream.stream.flush(),
             Stream::StandardOutput(stream) => stream.stream.flush(),
-            Stream::NamedHttpClient(_) |
+           Stream::HttpWrite(ref mut stream) => stream.stream.get_mut().flush(),
+            Stream::HttpRead(_) |
             Stream::StaticString(_) |
             Stream::Readline(_) |
             Stream::InputFile(_) |
@@ -868,7 +912,8 @@ impl Stream {
             Stream::StaticString(stream) => stream.past_end_of_stream,
             Stream::NamedTcp(stream) => stream.past_end_of_stream,
             Stream::NamedTls(stream) => stream.past_end_of_stream,
-            Stream::NamedHttpClient(stream) => stream.past_end_of_stream,
+            Stream::HttpRead(stream) => stream.past_end_of_stream,
+            Stream::HttpWrite(stream) => stream.past_end_of_stream,        
             Stream::Null(_) => false,
             Stream::Readline(stream) => stream.past_end_of_stream,
             Stream::StandardOutput(stream) => stream.past_end_of_stream,
@@ -890,7 +935,8 @@ impl Stream {
             Stream::StaticString(stream) => stream.past_end_of_stream = value,
             Stream::NamedTcp(stream) => stream.past_end_of_stream = value,
             Stream::NamedTls(stream) => stream.past_end_of_stream = value,
-            Stream::NamedHttpClient(stream) => stream.past_end_of_stream = value,
+            Stream::HttpRead(stream) => stream.past_end_of_stream = value,
+            Stream::HttpWrite(stream) => stream.past_end_of_stream = value,        
             Stream::Null(_) => {}
             Stream::Readline(stream) => stream.past_end_of_stream = value,
             Stream::StandardOutput(stream) => stream.past_end_of_stream = value,
@@ -956,11 +1002,11 @@ impl Stream {
             Stream::Byte(_)
             | Stream::Readline(_)
             | Stream::StaticString(_)
-            | Stream::NamedHttpClient(_)
+            | Stream::HttpRead(_)
             | Stream::InputFile(..) => atom!("read"),
             Stream::NamedTcp(..) | Stream::NamedTls(..) => atom!("read_append"),
             Stream::OutputFile(file) if file.is_append => atom!("append"),
-            Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => atom!("write"),
+            Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | Stream::HttpWrite(_) => atom!("write"),
             Stream::Null(_) => atom!(""),
         }
     }
@@ -1020,8 +1066,8 @@ impl Stream {
         http_stream: Box<dyn BufRead>,
         arena: &mut Arena,
     ) -> Self {
-        Stream::NamedHttpClient(arena_alloc!(
-            StreamLayout::new(CharReader::new(NamedHttpClientStream {
+        Stream::HttpRead(arena_alloc!(
+            StreamLayout::new(CharReader::new(HttpReadStream {
                 url,
                 body_reader: http_stream
             })),
@@ -1029,6 +1075,19 @@ impl Stream {
         ))
     }
 
+    #[inline]
+    pub(crate) fn from_http_sender(
+       body_writer: Sender,
+       arena: &mut Arena,
+    ) -> Self {
+       Stream::HttpWrite(arena_alloc!(
+           StreamLayout::new(CharReader::new(HttpWriteStream {
+               body_writer
+           })),
+           arena
+       ))
+    }
+
     #[inline]
     pub(crate) fn from_file_as_output(
         file_name: Atom,
@@ -1065,7 +1124,7 @@ impl Stream {
             Stream::NamedTls(ref mut tls_stream) => {
                 tls_stream.inner_mut().tls_stream.shutdown()
             }
-            Stream::NamedHttpClient(ref mut http_stream) => {
+            Stream::HttpRead(ref mut http_stream) => {
                 unsafe {
                     http_stream.set_tag(ArenaHeaderTag::Dropped);
                     std::ptr::drop_in_place(&mut http_stream.inner_mut().body_reader as *mut _);
@@ -1073,6 +1132,14 @@ impl Stream {
 
                 Ok(())
             }
+           Stream::HttpWrite(ref mut http_stream) => {
+                unsafe {
+                    http_stream.set_tag(ArenaHeaderTag::Dropped);
+                    std::ptr::drop_in_place(&mut http_stream.inner_mut().body_writer as *mut _);
+                }
+
+                Ok(())
+           }
             Stream::InputFile(mut file_stream) => {
                 // close the stream by dropping the inner File.
                 unsafe {
@@ -1109,7 +1176,7 @@ impl Stream {
         match self {
             Stream::NamedTcp(..)
             | Stream::NamedTls(..)
-            | Stream::NamedHttpClient(..)
+            | Stream::HttpRead(..)
             | Stream::Byte(_)
             | Stream::Readline(_)
             | Stream::StaticString(_)
@@ -1124,7 +1191,8 @@ impl Stream {
             Stream::StandardError(_)
             | Stream::StandardOutput(_)
             | Stream::NamedTcp(..)
-            | Stream::NamedTls(..)
+           | Stream::NamedTls(..)
+           | Stream::HttpWrite(..)     
             | Stream::Byte(_)
             | Stream::OutputFile(..) => true,
             _ => false,
index c044cdb1ce0ac7f43ab433450541b3efa2fe8b46..46506621c68e4d9583570837b9939bd5f79b8b8d 100644 (file)
@@ -8,6 +8,7 @@ use crate::atom_table::*;
 use crate::forms::*;
 use crate::heap_iter::*;
 use crate::heap_print::*;
+use crate::http::{self, HttpListener, HttpResponse};
 use crate::instructions::*;
 use crate::machine;
 use crate::machine::{Machine, VERIFY_ATTR_INTERRUPT_LOC, get_structure_index};
@@ -37,19 +38,20 @@ use ref_thread_local::{RefThreadLocal, ref_thread_local};
 use std::cell::Cell;
 use std::cmp::Ordering;
 use std::collections::BTreeSet;
-use std::convert::TryFrom;
+use std::convert::{TryFrom, Infallible};
 use std::env;
 use std::fs;
 use std::hash::{BuildHasher, BuildHasherDefault};
 use std::io::{ErrorKind, Read, Write};
 use std::iter::{once, FromIterator};
 use std::mem;
-use std::net::{TcpListener, TcpStream};
+use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs};
 use std::num::NonZeroU32;
 use std::ops::Sub;
 use std::process;
 use std::rc::Rc;
 use std::str::FromStr;
+use std::sync::Arc;
 
 use chrono::{offset::Local, DateTime};
 use cpu_time::ProcessTime;
@@ -79,10 +81,13 @@ use base64;
 use roxmltree;
 use select;
 
-use hyper::{Body, Client, HeaderMap, Method, Request, Uri};
+use hyper::{Body, Server, Client, HeaderMap, Method, Request, Response, Uri};
 use hyper::header::{HeaderName, HeaderValue};
 use hyper::body::Buf;
+use hyper::service::{make_service_fn, service_fn};
 use hyper_tls::HttpsConnector;
+use tokio::sync::Mutex;
+use tokio::sync::mpsc::channel;
 
 ref_thread_local! {
     pub(crate) static managed RANDOM_STATE: RandState<'static> = RandState::new();
@@ -3901,6 +3906,203 @@ impl Machine {
         Ok(())
     }
 
+    #[inline(always)]
+    pub(crate) fn http_listen(&mut self) -> CallResult {
+       let address_sink = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1]));
+       if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) {
+           let address_string = address_str.as_str();
+           let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) {
+               Some(addr) => addr,
+                _ => {
+                    self.machine_st.fail = true;
+                    return Ok(());
+                }
+           };
+
+           let (tx, rx) = channel(1);
+           let tx = Arc::new(Mutex::new(tx));
+
+           let _guard = self.runtime.enter();
+           let server = match Server::try_bind(&addr) {
+               Ok(server) => server,
+               Err(_) => {
+                   return Err(self.machine_st.open_permission_error(address_sink, atom!("http_listen"), 2));
+               }
+           };
+
+           self.runtime.spawn(async move {
+               let make_svc = make_service_fn(move |_conn| {
+                   let tx = tx.clone();
+                   async move { Ok::<_, Infallible>(service_fn(move |req| http::serve_req(req, tx.clone()))) }
+               });
+               let server = server.serve(make_svc);
+
+               if let Err(_) = server.await {
+                   eprintln!("server error");
+               }
+           });
+           let http_listener = HttpListener { incoming: rx };
+           let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena);
+           let addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2]));
+           self.machine_st.bind(addr.as_var().unwrap(), typed_arena_ptr_as_cell!(http_listener));
+       }
+       Ok(())
+    }
+
+    #[inline(always)]
+    pub(crate) fn http_accept(&mut self) -> CallResult {
+       let culprit = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1]));
+       let method = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2]));
+       let path = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[3]));
+       let query = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[5]));
+       let stream_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[6]));
+       let handle_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[7]));
+       read_heap_cell!(culprit,
+           (HeapCellValueTag::Cons, cons_ptr) => {
+               match_untyped_arena_ptr!(cons_ptr,
+                   (ArenaHeaderTag::HttpListener, http_listener) => {
+                       match http_listener.incoming.blocking_recv() {
+                           Some(request) => {
+                               let method_atom = match *request.request.method() {
+                                   Method::GET => atom!("get"),
+                                   Method::POST => atom!("post"),
+                                   Method::PUT => atom!("put"),
+                                   Method::DELETE => atom!("delete"),
+                                   Method::PATCH => atom!("patch"),
+                                   Method::HEAD => atom!("head"),
+                                   _ => unreachable!(),
+                               };
+                               let path_atom = self.machine_st.atom_tbl.build_with(request.request.uri().path());
+                               let path_cell = atom_as_cstr_cell!(path_atom);
+                               let headers: Vec<HeapCellValue> = request.request.headers().iter().map(|(header_name, header_value)| {
+                                   let h = self.machine_st.heap.len();
+
+                                   let header_term = functor!(
+                                       self.machine_st.atom_tbl.build_with(header_name.as_str()),
+                                       [cell(string_as_cstr_cell!(self.machine_st.atom_tbl.build_with(header_value.to_str().unwrap())))]
+                                   );
+
+                                   self.machine_st.heap.extend(header_term.into_iter());
+                                   str_loc_as_cell!(h)
+                               }).collect();
+
+                               let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
+
+                               let query_str = request.request.uri().query().unwrap_or("");
+                               let query_atom = self.machine_st.atom_tbl.build_with(query_str);
+                               let query_cell = string_as_cstr_cell!(query_atom);
+                               
+                               let hyper_req = request.request;
+                               let buf = self.runtime.block_on(async {hyper::body::aggregate(hyper_req).await.unwrap()});
+                               let reader = buf.reader();
+
+                               let mut stream = Stream::from_http_stream(
+                                   path_atom,
+                                   Box::new(reader),
+                                   &mut self.machine_st.arena
+                               );
+                               *stream.options_mut() = StreamOptions::default();
+                               stream.options_mut().set_stream_type(StreamType::Binary);
+                               self.indices.streams.insert(stream);
+                               let stream = stream_as_cell!(stream);
+
+                               let handle = arena_alloc!(request.response, &mut self.machine_st.arena);
+
+                               self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
+                               self.machine_st.bind(path.as_var().unwrap(), path_cell);
+                               unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
+                               self.machine_st.bind(query.as_var().unwrap(), query_cell);
+                               self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
+                               self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
+                               }
+                           None => {
+                               self.machine_st.fail = true;
+                           }
+                       }
+                   }
+                   _ => {
+                       unreachable!();
+                   }
+               );
+           }
+           _ => {
+               unreachable!();
+           }
+       );
+       Ok(())
+    }
+
+    #[inline(always)]
+    pub(crate) fn http_answer(&mut self) -> CallResult {
+       let culprit = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1]));
+       let status_code = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2]));
+       let status_code: u16 = match Number::try_from(status_code) {
+           Ok(Number::Fixnum(n)) => n.get_num() as u16,
+           Ok(Number::Integer(n)) => match n.to_u16() {
+               Some(u) => u,
+               _ => {
+                   self.machine_st.fail = true;
+                   return Ok(());
+               }
+           }
+           _ => unreachable!()
+       };
+       let stub_gen = || functor_stub(atom!("http_listen"), 2);
+       let headers = match self.machine_st.try_from_list(self.machine_st.registers[3], stub_gen) {
+            Ok(addrs) => {
+                let mut header_map = HeaderMap::new();
+                for heap_cell in addrs{
+                   read_heap_cell!(heap_cell,
+                       (HeapCellValueTag::Str, s) => {
+                           let name = cell_as_atom_cell!(self.machine_st.heap[s]).get_name();
+                           let value = self.machine_st.value_to_str_like(self.machine_st.heap[s + 1]).unwrap();
+                           header_map.insert(HeaderName::from_str(name.as_str()).unwrap(), HeaderValue::from_str(value.as_str()).unwrap());
+                       }
+                       _ => {
+                           unreachable!()
+                       }
+                   )
+                }
+                header_map
+            },
+            Err(e) => return Err(e)
+        };
+       let stream_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[4]));
+
+       read_heap_cell!(culprit,
+           (HeapCellValueTag::Cons, cons_ptr) => {
+               match_untyped_arena_ptr!(cons_ptr,
+                   (ArenaHeaderTag::HttpResponse, http_response) => {
+                       let mut response = Response::builder()
+                           .status(status_code);
+                       *response.headers_mut().unwrap() = headers;
+                       let (sender, body) = Body::channel();
+                       let response = response.body(body).unwrap();
+                       http_response.blocking_send(response).unwrap();
+
+                       let mut stream = Stream::from_http_sender(
+                           sender,
+                           &mut self.machine_st.arena
+                       );
+                       *stream.options_mut() = StreamOptions::default();
+                       stream.options_mut().set_stream_type(StreamType::Binary);
+                       self.indices.streams.insert(stream);
+                       let stream = stream_as_cell!(stream);
+                       self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
+                   }
+                   _ => {
+                       unreachable!();
+                   }
+               );
+           }
+           _ => {
+               unreachable!();
+           }
+       );
+
+       Ok(())
+    }
+
     #[inline(always)]
     pub(crate) fn current_time(&mut self) {
         let timestamp = self.systemtime_to_timestamp(SystemTime::now());
index 8a000b8213b3c6d9576ba7017f151ed3e91ad6fa..85e2e08671488b4472c4c12a4545fdf9d7c718cc 100644 (file)
@@ -272,6 +272,20 @@ macro_rules! match_untyped_arena_ptr_pat_body {
         #[allow(unused_braces)]
         $code
     }};
+    ($ptr:ident, HttpListener, $listener:ident, $code:expr) => {{
+        let payload_ptr = unsafe { std::mem::transmute::<_, *mut HttpListener>($ptr.payload_offset()) };
+        #[allow(unused_mut)]
+        let mut $listener = TypedArenaPtr::new(payload_ptr);
+        #[allow(unused_braces)]
+        $code
+    }};
+    ($ptr:ident, HttpResponse, $listener:ident, $code:expr) => {{
+        let payload_ptr = unsafe { std::mem::transmute::<_, *mut HttpResponse>($ptr.payload_offset()) };
+        #[allow(unused_mut)]
+        let mut $listener = TypedArenaPtr::new(payload_ptr);
+        #[allow(unused_braces)]
+        $code
+    }};
     ($ptr:ident, IndexPtr, $ip:ident, $code:expr) => {{
         #[allow(unused_mut)]
         let mut $ip = TypedArenaPtr::new(unsafe { std::mem::transmute::<_, *mut IndexPtr>($ptr.get_ptr()) });
@@ -291,7 +305,8 @@ macro_rules! match_untyped_arena_ptr_pat {
             | ArenaHeaderTag::OutputFileStream
             | ArenaHeaderTag::NamedTcpStream
             | ArenaHeaderTag::NamedTlsStream
-            | ArenaHeaderTag::NamedHttpClientStream
+            | ArenaHeaderTag::HttpReadStream
+           | ArenaHeaderTag::HttpWriteStream
             | ArenaHeaderTag::ReadlineStream
             | ArenaHeaderTag::StaticStringStream
             | ArenaHeaderTag::ByteStream