From: Adrián Arroyo Calle Date: Thu, 11 Aug 2022 18:25:19 +0000 (+0200) Subject: HTTP Server 2.0 X-Git-Tag: v0.9.1^2~43 X-Git-Url: https://git.sagredo.dev/?a=commitdiff_plain;h=181be5be3f9426944c1f4475dcb66a120cf4132f;p=scryer-prolog.git HTTP Server 2.0 --- diff --git a/Cargo.lock b/Cargo.lock index e914dcd9..05adcc7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,6 +411,21 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.21" @@ -418,6 +433,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -426,6 +442,34 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2 1.0.36", + "quote 1.0.17", + "syn 1.0.90", +] + [[package]] name = "futures-sink" version = "0.3.21" @@ -444,10 +488,16 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1628,6 +1678,7 @@ dependencies = [ "ctrlc", "dirs-next", "divrem", + "futures", "fxhash", "git-version", "hostname", diff --git a/Cargo.toml b/Cargo.toml index 283081f5..6ccdf314 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ ryu = "1.0.9" hyper = { version = "0.14", features = ["full"] } hyper-tls = "0.5.0" tokio = { version = "1", features = ["full"] } +futures = "0.3" [dev-dependencies] assert_cmd = "1.0.3" diff --git a/build/instructions_template.rs b/build/instructions_template.rs index 4cd660df..9725178c 100644 --- a/build/instructions_template.rs +++ b/build/instructions_template.rs @@ -544,6 +544,12 @@ enum SystemClauseType { DeterministicLengthRundown, #[strum_discriminants(strum(props(Arity = "7", Name = "$http_open")))] HttpOpen, + #[strum_discriminants(strum(props(Arity = "2", Name = "$http_listen")))] + HttpListen, + #[strum_discriminants(strum(props(Arity = "7", Name = "$http_accept")))] + HttpAccept, + #[strum_discriminants(strum(props(Arity = "4", Name = "$http_answer")))] + HttpAnswer, #[strum_discriminants(strum(props(Arity = "3", Name = "$predicate_defined")))] PredicateDefined, #[strum_discriminants(strum(props(Arity = "3", Name = "$strip_module")))] @@ -1686,6 +1692,9 @@ fn generate_instruction_preface() -> TokenStream { &Instruction::CallCpuNow(_) | &Instruction::CallDeterministicLengthRundown(_) | &Instruction::CallHttpOpen(_) | + &Instruction::CallHttpListen(_) | + &Instruction::CallHttpAccept(_) | + &Instruction::CallHttpAnswer(_) | &Instruction::CallPredicateDefined(_) | &Instruction::CallStripModule(_) | &Instruction::CallCurrentTime(_) | @@ -1895,6 +1904,9 @@ fn generate_instruction_preface() -> TokenStream { &Instruction::ExecuteCpuNow(_) | &Instruction::ExecuteDeterministicLengthRundown(_) | &Instruction::ExecuteHttpOpen(_) | + &Instruction::ExecuteHttpListen(_) | + &Instruction::ExecuteHttpAccept(_) | + &Instruction::ExecuteHttpAnswer(_) | &Instruction::ExecutePredicateDefined(_) | &Instruction::ExecuteStripModule(_) | &Instruction::ExecuteCurrentTime(_) | diff --git a/src/arena.rs b/src/arena.rs index bf0cd3ce..3e75e120 100644 --- a/src/arena.rs +++ b/src/arena.rs @@ -1,3 +1,4 @@ +use crate::http::{HttpListener, HttpResponse}; use crate::machine::loader::LiveLoadState; use crate::machine::machine_indices::*; use crate::machine::streams::*; @@ -139,7 +140,8 @@ pub enum ArenaHeaderTag { OutputFileStream = 0b10100, NamedTcpStream = 0b011100, NamedTlsStream = 0b100000, - NamedHttpClientStream = 0b100001, + HttpReadStream = 0b100001, + HttpWriteStream = 0b100010, ReadlineStream = 0b110000, StaticStringStream = 0b110100, ByteStream = 0b111000, @@ -147,6 +149,8 @@ pub enum ArenaHeaderTag { StandardErrorStream = 0b11000, NullStream = 0b111100, TcpListener = 0b1000000, + HttpListener = 0b1000001, + HttpResponse = 0b1000010, Dropped = 0b1000100, IndexPtrDynamicUndefined = 0b1000101, IndexPtrDynamicIndex = 0b1000110, @@ -560,6 +564,50 @@ impl ArenaAllocated for TcpListener { } } +impl ArenaAllocated for HttpListener { + type PtrToAllocated = TypedArenaPtr; + + #[inline] + fn tag() -> ArenaHeaderTag { + ArenaHeaderTag::HttpListener + } + + #[inline] + fn size(&self) -> usize { + mem::size_of::() + } + + #[inline] + fn copy_to_arena(self, dst: *mut Self) -> Self::PtrToAllocated { + unsafe { + ptr::write(dst, self); + TypedArenaPtr::new(dst as *mut Self) + } + } +} + +impl ArenaAllocated for HttpResponse { + type PtrToAllocated = TypedArenaPtr; + + #[inline] + fn tag() -> ArenaHeaderTag { + ArenaHeaderTag::HttpResponse + } + + #[inline] + fn size(&self) -> usize { + mem::size_of::() + } + + #[inline] + fn copy_to_arena(self, dst: *mut Self) -> Self::PtrToAllocated { + unsafe { + ptr::write(dst, self); + TypedArenaPtr::new(dst as *mut Self) + } + } +} + impl ArenaAllocated for IndexPtr { type PtrToAllocated = TypedArenaPtr; @@ -647,9 +695,12 @@ unsafe fn drop_slab_in_place(value: &mut AllocSlab) { ArenaHeaderTag::NamedTlsStream => { ptr::drop_in_place(value.payload_offset::>>()); } - ArenaHeaderTag::NamedHttpClientStream => { - ptr::drop_in_place(value.payload_offset::>>()); + ArenaHeaderTag::HttpReadStream => { + ptr::drop_in_place(value.payload_offset::>>()); } + ArenaHeaderTag::HttpWriteStream => { + ptr::drop_in_place(value.payload_offset::>>()); + } ArenaHeaderTag::ReadlineStream => { ptr::drop_in_place(value.payload_offset::>()); } @@ -670,6 +721,12 @@ unsafe fn drop_slab_in_place(value: &mut AllocSlab) { ArenaHeaderTag::TcpListener => { ptr::drop_in_place(value.payload_offset::()); } + ArenaHeaderTag::HttpListener => { + ptr::drop_in_place(value.payload_offset::()); + } + ArenaHeaderTag::HttpResponse => { + ptr::drop_in_place(value.payload_offset::()); + } ArenaHeaderTag::StandardOutputStream => { ptr::drop_in_place(value.payload_offset::>()); } diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 00000000..887366f8 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,25 @@ +use std::sync::Arc; +use std::convert::Infallible; + +use hyper::{Response, Request, Body}; +use tokio::sync::Mutex; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +pub struct HttpListener { + pub incoming: Receiver +} + +#[derive(Debug)] +pub struct HttpRequest { + pub request: Request, + pub response: HttpResponse, +} + +pub type HttpResponse = Sender>; + +pub async fn serve_req(req: Request, tx: Arc>>) -> Result, Infallible> { + let (response_tx, mut rx) = channel(1); + let http_request = HttpRequest { request: req, response: response_tx }; + tx.lock().await.send(http_request).await.unwrap(); + Ok(rx.recv().await.unwrap()) +} diff --git a/src/lib.rs b/src/lib.rs index fc8d8ca5..2846fd0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ mod fixtures; mod forms; mod heap_iter; pub mod heap_print; +mod http; mod indexing; #[macro_use] pub mod instructions { diff --git a/src/lib/http/http_server.pl b/src/lib/http/http_server.pl index 72657c35..a49969ee 100644 --- a/src/lib/http/http_server.pl +++ b/src/lib/http/http_server.pl @@ -1,11 +1,11 @@ /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Written in December 2020 by Adrián Arroyo (adrian.arroyocalle@gmail.com) + Updated in March 2022 by Adrián Arroyo to use the Hyper backend Part of Scryer Prolog This library provides an starting point to build HTTP server based applications. - It currently implements a subset of HTTP/1.0. It is recommended to put a reverse - proxy like nginx in front of this server to have access to more advanced features - (gzip compression, HTTPS, ...) + It is based on Hyper, which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However, + some advanced features that Hyper provides are still not accesible. Usage ========== @@ -41,37 +41,36 @@ Some things that are still missing: - Read forms in multipart format - HTTP Basic Auth - - Keep-Alive support - Session handling via cookies - HTML Templating I place this code in the public domain. Use it in any way you want. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ + :- module(http_server, [ - http_listen/2, - http_headers/2, - http_status_code/2, - http_body/2, - http_redirect/2, - http_query/3, - url_decode//1 + http_listen/2, + http_headers/2, + http_status_code/2, + http_body/2, + http_redirect/2, + http_query/3 ]). -:- meta_predicate http_listen(?, 2). +:- meta_predicate http_listen(?, :). -:- use_module(library(sockets)). -:- use_module(library(dcgs)). -:- use_module(library(format)). -:- use_module(library(error)). :- use_module(library(charsio)). -:- use_module(library(lists)). +:- use_module(library(crypto)). +:- use_module(library(error)). +:- use_module(library(format)). :- use_module(library(iso_ext)). +:- use_module(library(lists)). +:- use_module(library(pio)). :- use_module(library(time)). -:- use_module(library(crypto)). -% Module prefix workaround with meta_predicate http_listen(Port, Module:Handlers0) :- + must_be(integer, Port), + must_be(list, Handlers0), maplist(module_qualification(Module), Handlers0, Handlers), http_listen_(Port, Handlers). @@ -79,49 +78,75 @@ module_qualification(M, H0, H) :- H0 =.. [Method, Path, Goal], H =.. [Method, Path, M:Goal]. -% Server initialization http_listen_(Port, Handlers) :- - must_be(integer, Port), - must_be(list, Handlers), - once(socket_server_open(Port, Socket)), - format("Listening at port ~d\n", [Port]), - accept_loop(Socket, Handlers). - -% Server loop -accept_loop(Socket, Handlers) :- - setup_call_cleanup(socket_server_accept(Socket, _Client, Stream, [type(binary)]), - ( - read_header_lines(Stream, Lines), - [Request|Headers] = Lines, - ( - (phrase(parse_request(_Version, Method, Path, Queries), Request), maplist(map_parse_header, Headers, HeadersKV)) -> ( - ( - member("content-length"-ContentLength, HeadersKV) -> - (number_chars(ContentLengthN, ContentLength), get_bytes(Stream, ContentLengthN, Body)) - ;true - ), - current_time(Time), - phrase(format_time("%Y-%m-%d (%H:%M:%S)", Time), TimeString), - format("~s ~w ~s\n", [TimeString, Method, Path]), - ( - match_handler(Handlers, Method, Path, Handler) -> - ( - HttpRequest = http_request(HeadersKV, binary(Body), Queries), - HttpResponse = http_response(_, _, _), - (call(Handler, HttpRequest, HttpResponse) -> - send_response(Stream, HttpResponse) - ; format(Stream, "HTTP/1.0 500 Internal Server Error\r\n\r\n", []) - ) - ) - ; format(Stream, "HTTP/1.0 404 Not Found\r\n\r\n", []) - ) - );( - format(Stream, "HTTP/1.0 400 Bad Request\r\n\r\n", []) % bad format - ) - ), - ! % Remove - ), close(Stream)), - accept_loop(Socket, Handlers). + number_chars(Port, CPort), + append("0.0.0.0:", CPort, Addr), + '$http_listen'(Addr, HttpListener),!, + format("Listening at ~s\n", [Addr]), + http_loop(HttpListener, Handlers). + +http_loop(HttpListener, Handlers) :- + '$http_accept'(HttpListener, RequestMethod, RequestPath, RequestHeaders, RequestQuery, RequestStream, ResponseHandle), + current_time(Time), + phrase(format_time("%Y-%m-%d (%H:%M:%S)", Time), TimeString), + format("~s ~w ~s\n", [TimeString, RequestMethod, RequestPath]), + maplist(map_header_kv, RequestHeaders, RequestHeadersKV), + phrase(parse_queries(RequestQueries), RequestQuery), + ( + match_handler(Handlers, RequestMethod, RequestPath, Handler) -> + ( + HttpRequest = http_request(RequestHeadersKV, stream(RequestStream), RequestQueries), + HttpResponse = http_response(_, _, _), + (call(Handler, HttpRequest, HttpResponse) -> + send_response(ResponseHandle, HttpResponse) + ; '$http_answer'(ResponseHandle, 500, [], "Internal Server Error") + ) + ) + ; '$http_answer'(ResponseHandle, 404, [], "Not Found") + ), + http_loop(HttpListener, Handlers). + +send_response(ResponseHandle, http_response(StatusCode0, text(ResponseText), ResponseHeaders0)) :- + default(StatusCode0, 200, StatusCode), + maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0), + '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream), + format(ResponseStream, "~s", [ResponseText]), + close(ResponseStream). + +send_response(ResponseHandle, http_response(StatusCode0, bytes(ResponseBytes), ResponseHeaders0)) :- + default(StatusCode0, 200, StatusCode), + maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0), + '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream), + format(ResponseStream, "~s", [ResponseBytes]), + close(ResponseStream). + +send_response(ResponseHandle, http_response(StatusCode0, file(Filename), ResponseHeaders0)) :- + default(StatusCode0, 200, StatusCode), + maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0), + '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream), + setup_call_cleanup( + open(Filename, read, FileStream, [type(binary)]), + ( + get_n_chars(FileStream, _, FileCs), + format(ResponseStream, "~s", [FileCs]) + ), + close(FileStream) + ), + close(ResponseStream). + + +default(Var, Default, Out) :- + (var(Var) -> Out = Default + ; Var = Out + ). + +map_header_kv(T, K-V) :- + T =.. [K0, V], + atom_chars(K0, K). + +map_header_kv_2(T, K-V) :- + atom_chars(K0, K), + T =.. [K0, V]. match_handler(Handlers, Method, "/", Handler) :- member(H, Handlers), @@ -139,27 +164,6 @@ match_handler(Handlers, Method, Path, Handler) :- var(Var), Var = Path. - -% Helper and recommended predicates - -http_headers(http_request(Headers, _, _), Headers). -http_headers(http_response(_, _, Headers), Headers). - -http_body(http_request(_, binary(ByteBody), _), text(TextBody)) :- chars_utf8bytes(TextBody, ByteBody). -http_body(http_request(Headers, binary(ByteBody), _), form(FormBody)) :- - member("content-type"-"application/x-www-form-urlencoded", Headers), - chars_utf8bytes(TextBody, ByteBody), - phrase(parse_queries(FormBody), TextBody). -http_body(http_request(_, Body, _), Body). -http_body(http_response(_, Body, _), Body). - -http_status_code(http_response(StatusCode, _, _), StatusCode). - -http_redirect(http_response(307, text("Moved Temporarily"), ["Location"-Uri]), Uri). - -http_query(http_request(_, _, Queries), Key, Value) :- member(Key-Value, Queries). - -% Route matching path(Pattern) --> { Pattern =.. Parts, @@ -183,76 +187,31 @@ path(Pattern) --> path([]) --> []. -% Send responses -send_response(Stream, http_response(StatusCode0, file(Filename), Headers)) :- - default(StatusCode0, 200, StatusCode), - format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]), - overwrite_header("connection"-"Close", Headers, Headers0), - write_headers(Stream, Headers0), - format(Stream, "\r\n", []), - setup_call_cleanup( - open(Filename, read, FileStream, [type(binary)]), - pipe_bytes(FileStream, Stream), - close(FileStream) - ). - -send_response(Stream, http_response(StatusCode0, text(TextResponse), Headers)) :- - default(StatusCode0, 200, StatusCode), - format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]), - overwrite_header("content-type"-"text/plain", Headers, Headers0), - overwrite_header("connection"-"Close", Headers0, Headers1), - write_headers(Stream, Headers1), - format(Stream, "\r\n~s", [TextResponse]). +string_without(Not, [Char|String]) --> + [Char], + { + \+ member(Char, Not) + }, + string_without(Not, String). -send_response(Stream, http_response(StatusCode0, binary(BinaryResponse), Headers)) :- - default(StatusCode0, 200, StatusCode), - format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]), - overwrite_header("connection"-"Close", Headers, Headers0), - write_headers(Stream, Headers0), - format(Stream, "\r\n", []), - put_bytes(Stream, BinaryResponse). +string_without(_, []) --> + []. -default(Var, Default, Out) :- - (var(Var) -> Out = Default - ; Var = Out - ). +http_headers(http_request(Headers, _, _), Headers). +http_headers(http_response(_, _, Headers), Headers). -header([]) --> []. -header([Key-Value|Headers]) --> - format_("~s: ~s\r\n", [Key, Value]), - header(Headers). - -write_headers(Stream, Headers) :- - phrase(header(Headers), Cs), - format(Stream, "~s", [Cs]). - -overwrite_header(Key-Value, [], [Key-Value]). -overwrite_header(Key-Value, [Header|Headers], [Header|HeadersOut]) :- - Header = Key0-_, - Key0 \= Key, - overwrite_header(Key-Value, Headers, HeadersOut). -overwrite_header(Key-Value, [Header|Headers], [NewHeader|Headers]) :- - Header = Key-_, - NewHeader = Key-Value. - -parse_request(http_version(Major, Minor), Method, Path, Queries) --> - method(Method), - " ", - parse_path(Path, Queries), - " ", - "HTTP/", - natural(Major), - ".", - natural(Minor), - "\r\n". - -parse_path(Path, Queries) --> - string_without("?", Path), - "?", - parse_queries(Queries). +http_body(http_request(_, stream(StreamBody), _), bytes(BytesBody)) :- get_n_chars(StreamBody, _, BytesBody). +http_body(http_request(_, stream(StreamBody), _), text(TextBody)) :- get_n_chars(StreamBody, _, TextBody). +http_body(http_request(Headers, stream(StreamBody), _), form(FormBody)) :- + member("content-type"-"application/x-www-form-urlencoded", Headers), + get_n_chars(StreamBody, _, TextBody), + phrase(parse_queries(FormBody), TextBody). +http_body(http_request(_, Body, _), Body). +http_body(http_response(_, Body, _), Body). -parse_path(Path, []) --> - string_without(" ", Path). +http_status_code(http_response(StatusCode, _, _), StatusCode). +http_redirect(http_response(307, text("Moved Temporarily"), ["Location"-Uri]), Uri). +http_query(http_request(_, _, Queries), Key, Value) :- member(Key-Value, Queries). parse_queries([Key-Value|Queries]) --> string_without("=", Key0), @@ -278,94 +237,9 @@ parse_queries([Key-Value]) --> phrase(url_decode(Value), Value0) }. -map_parse_header(Header, HeaderKV) :- - phrase(parse_header(HeaderKV), Header). - -parse_header(Key-Value) --> - string_without(":", Key0), - { - chars_lower(Key0, Key) - }, - ": ", - string_without("\r", Value), - "\r\n". - -method(options) --> "OPTIONS". -method(get) --> "GET". -method(head) --> "HEAD". -method(post) --> "POST". -method(put) --> "PUT". -method(delete) --> "DELETE". - -string_without(Not, [Char|String]) --> - [Char], - { - \+ member(Char, Not) - }, - string_without(Not, String). - -string_without(_, []) --> +parse_queries([]) --> []. -natural(Nat) --> - natural_(NatChars), - { - number_chars(Nat, NatChars) - }. - -natural_([Nat|Nats]) --> - [Nat], - { - char_type(Nat, decimal_digit) - }, - natural_(Nats). - -natural_([]) --> - []. - -read_header_lines(Stream, Hs) :- - read_line_to_chars(Stream, Cs, []), - ( Cs == "" -> Hs = [] - ; Cs == "\r\n" -> Hs = [] - ; Hs = [Cs|Rest], - read_header_lines(Stream, Rest) - ). - -get_bytes(Stream, Length, Res) :- get_bytes(Stream, Length, [], Res). -get_bytes(Stream, Length, Acc, Res) :- - (Length > 0 -> ( - get_byte(Stream, B), - B =\= -1, - get_bytes(Stream, Length - 1, [B|Acc], Res) - ); reverse(Acc, Res)). - -put_bytes(_, []). -put_bytes(Stream, [Byte|Bytes]) :- - put_byte(Stream, Byte), - put_bytes(Stream, Bytes). - -pipe_bytes(StreamIn, StreamOut) :- - get_byte(StreamIn, Byte), - ( - Byte =\= -1 -> - ( - put_byte(StreamOut, Byte), - pipe_bytes(StreamIn, StreamOut) - ) - ; true). - -% WARNING: This only works for ASCII chars. This code can be modified to support -% Latin1 characters also but a completely different approach is needed for other -% languages. Since HTTP internals are ASCII, this is fine for this usecase. -chars_lower(Chars, Lower) :- - maplist(char_lower, Chars, Lower). -char_lower(Char, Lower) :- - char_code(Char, Code), - ((Code >= 65,Code =< 90) -> - LowerCode is Code + 32, - char_code(Lower, LowerCode) - ; Char = Lower). - % Decodes a UTF-8 URL Encoded string: RFC-1738 url_decode([Char|Chars]) --> [Char], diff --git a/src/machine/dispatch.rs b/src/machine/dispatch.rs index 20e76b24..82ef4826 100644 --- a/src/machine/dispatch.rs +++ b/src/machine/dispatch.rs @@ -4197,6 +4197,30 @@ impl Machine { try_or_throw!(self.machine_st, self.http_open()); step_or_fail!(self, self.machine_st.p = self.machine_st.cp); } + &Instruction::CallHttpListen(_) => { + try_or_throw!(self.machine_st, self.http_listen()); + step_or_fail!(self, self.machine_st.p += 1); + } + &Instruction::ExecuteHttpListen(_) => { + try_or_throw!(self.machine_st, self.http_listen()); + step_or_fail!(self, self.machine_st.p = self.machine_st.cp); + } + &Instruction::CallHttpAccept(_) => { + try_or_throw!(self.machine_st, self.http_accept()); + step_or_fail!(self, self.machine_st.p += 1); + } + &Instruction::ExecuteHttpAccept(_) => { + try_or_throw!(self.machine_st, self.http_accept()); + step_or_fail!(self, self.machine_st.p = self.machine_st.cp); + } + &Instruction::CallHttpAnswer(_) => { + try_or_throw!(self.machine_st, self.http_answer()); + step_or_fail!(self, self.machine_st.p += 1); + } + &Instruction::ExecuteHttpAnswer(_) => { + try_or_throw!(self.machine_st, self.http_answer()); + step_or_fail!(self, self.machine_st.p = self.machine_st.cp); + } &Instruction::CallCurrentTime(_) => { self.current_time(); step_or_fail!(self, self.machine_st.p += 1); diff --git a/src/machine/mod.rs b/src/machine/mod.rs index f34f38fb..4cd1fce8 100644 --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@ -431,9 +431,7 @@ impl Machine { let user_output = Stream::stdout(&mut machine_st.arena); let user_error = Stream::stderr(&mut machine_st.arena); - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() + let runtime = tokio::runtime::Runtime::new() .unwrap(); let mut wam = Machine { diff --git a/src/machine/streams.rs b/src/machine/streams.rs index 548cf6ac..9e558af5 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -26,6 +26,7 @@ use std::ops::{Deref, DerefMut}; use std::ptr; use native_tls::TlsStream; +use hyper::body::{Bytes, Sender}; #[derive(Debug, BitfieldSpecifier, Clone, Copy, PartialEq, Eq, Hash)] #[bits = 1] @@ -249,24 +250,51 @@ impl Write for NamedTlsStream { } } -pub struct NamedHttpClientStream { +pub struct HttpReadStream { url: Atom, body_reader: Box, } -impl Debug for NamedHttpClientStream { +impl Debug for HttpReadStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Http Client Stream [{}]", self.url.as_str()) + write!(f, "Http Read Stream [{}]", self.url.as_str()) } } -impl Read for NamedHttpClientStream { +impl Read for HttpReadStream { #[inline] fn read(&mut self, buf: &mut [u8]) -> std::io::Result { self.body_reader.read(buf) } } +pub struct HttpWriteStream { + body_writer: Sender, +} + +impl Debug for HttpWriteStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Http Write Stream") + } +} + +impl Write for HttpWriteStream { + #[inline] + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let bytes = Bytes::copy_from_slice(buf); + let len = bytes.len(); + match self.body_writer.try_send_data(bytes) { + Ok(()) => Ok(len), + Err(_) => Err(std::io::Error::from(ErrorKind::Interrupted)) + } + } + + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + #[derive(Debug)] pub struct StandardOutputStream {} @@ -405,7 +433,8 @@ arena_allocated_impl_for_stream!(CharReader, InputFileStream); arena_allocated_impl_for_stream!(OutputFileStream, OutputFileStream); arena_allocated_impl_for_stream!(CharReader, NamedTcpStream); arena_allocated_impl_for_stream!(CharReader, NamedTlsStream); -arena_allocated_impl_for_stream!(CharReader, NamedHttpClientStream); +arena_allocated_impl_for_stream!(CharReader, HttpReadStream); +arena_allocated_impl_for_stream!(CharReader, HttpWriteStream); arena_allocated_impl_for_stream!(ReadlineStream, ReadlineStream); arena_allocated_impl_for_stream!(StaticStringStream, StaticStringStream); arena_allocated_impl_for_stream!(StandardOutputStream, StandardOutputStream); @@ -419,7 +448,8 @@ pub enum Stream { StaticString(TypedArenaPtr>), NamedTcp(TypedArenaPtr>>), NamedTls(TypedArenaPtr>>), - NamedHttpClient(TypedArenaPtr>>), + HttpRead(TypedArenaPtr>>), + HttpWrite(TypedArenaPtr>>), Null(StreamOptions), Readline(TypedArenaPtr>), StandardOutput(TypedArenaPtr>), @@ -474,7 +504,8 @@ impl Stream { } ArenaHeaderTag::NamedTcpStream => Stream::NamedTcp(TypedArenaPtr::new(ptr as *mut _)), ArenaHeaderTag::NamedTlsStream => Stream::NamedTls(TypedArenaPtr::new(ptr as *mut _)), - ArenaHeaderTag::NamedHttpClientStream => Stream::NamedHttpClient(TypedArenaPtr::new(ptr as *mut _)), + ArenaHeaderTag::HttpReadStream => Stream::HttpRead(TypedArenaPtr::new(ptr as *mut _)), + ArenaHeaderTag::HttpWriteStream => Stream::HttpWrite(TypedArenaPtr::new(ptr as *mut _)), ArenaHeaderTag::ReadlineStream => Stream::Readline(TypedArenaPtr::new(ptr as *mut _)), ArenaHeaderTag::StaticStringStream => { Stream::StaticString(TypedArenaPtr::new(ptr as *mut _)) @@ -527,7 +558,8 @@ impl Stream { Stream::StaticString(ptr) => ptr.header_ptr(), Stream::NamedTcp(ptr) => ptr.header_ptr(), Stream::NamedTls(ptr) => ptr.header_ptr(), - Stream::NamedHttpClient(ptr) => ptr.header_ptr(), + Stream::HttpRead(ptr) => ptr.header_ptr(), + Stream::HttpWrite(ptr) => ptr.header_ptr(), Stream::Null(_) => ptr::null(), Stream::Readline(ptr) => ptr.header_ptr(), Stream::StandardOutput(ptr) => ptr.header_ptr(), @@ -543,7 +575,8 @@ impl Stream { Stream::StaticString(ref ptr) => &ptr.options, Stream::NamedTcp(ref ptr) => &ptr.options, Stream::NamedTls(ref ptr) => &ptr.options, - Stream::NamedHttpClient(ref ptr) => &ptr.options, + Stream::HttpRead(ref ptr) => &ptr.options, + Stream::HttpWrite(ref ptr) => &ptr.options, Stream::Null(ref options) => options, Stream::Readline(ref ptr) => &ptr.options, Stream::StandardOutput(ref ptr) => &ptr.options, @@ -559,7 +592,8 @@ impl Stream { Stream::StaticString(ref mut ptr) => &mut ptr.options, Stream::NamedTcp(ref mut ptr) => &mut ptr.options, Stream::NamedTls(ref mut ptr) => &mut ptr.options, - Stream::NamedHttpClient(ref mut ptr) => &mut ptr.options, + Stream::HttpRead(ref mut ptr) => &mut ptr.options, + Stream::HttpWrite(ref mut ptr) => &mut ptr.options, Stream::Null(ref mut options) => options, Stream::Readline(ref mut ptr) => &mut ptr.options, Stream::StandardOutput(ref mut ptr) => &mut ptr.options, @@ -576,7 +610,8 @@ impl Stream { Stream::StaticString(ptr) => ptr.lines_read += incr_num_lines_read, Stream::NamedTcp(ptr) => ptr.lines_read += incr_num_lines_read, Stream::NamedTls(ptr) => ptr.lines_read += incr_num_lines_read, - Stream::NamedHttpClient(ptr) => ptr.lines_read += incr_num_lines_read, + Stream::HttpRead(ptr) => ptr.lines_read += incr_num_lines_read, + Stream::HttpWrite(_) => {} Stream::Null(_) => {} Stream::Readline(ptr) => ptr.lines_read += incr_num_lines_read, Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read, @@ -593,7 +628,8 @@ impl Stream { Stream::StaticString(ptr) => ptr.lines_read = value, Stream::NamedTcp(ptr) => ptr.lines_read = value, Stream::NamedTls(ptr) => ptr.lines_read = value, - Stream::NamedHttpClient(ptr) => ptr.lines_read = value, + Stream::HttpRead(ptr) => ptr.lines_read = value, + Stream::HttpWrite(_) => {} Stream::Null(_) => {} Stream::Readline(ptr) => ptr.lines_read = value, Stream::StandardOutput(ptr) => ptr.lines_read = value, @@ -610,7 +646,8 @@ impl Stream { Stream::StaticString(ptr) => ptr.lines_read, Stream::NamedTcp(ptr) => ptr.lines_read, Stream::NamedTls(ptr) => ptr.lines_read, - Stream::NamedHttpClient(ptr) => ptr.lines_read, + Stream::HttpRead(ptr) => ptr.lines_read, + Stream::HttpWrite(_) => 0, Stream::Null(_) => 0, Stream::Readline(ptr) => ptr.lines_read, Stream::StandardOutput(ptr) => ptr.lines_read, @@ -625,13 +662,14 @@ impl CharRead for Stream { Stream::InputFile(file) => (*file).peek_char(), Stream::NamedTcp(tcp_stream) => (*tcp_stream).peek_char(), Stream::NamedTls(tls_stream) => (*tls_stream).peek_char(), - Stream::NamedHttpClient(http_stream) => (*http_stream).peek_char(), + Stream::HttpRead(http_stream) => (*http_stream).peek_char(), Stream::Readline(rl_stream) => (*rl_stream).peek_char(), Stream::StaticString(src) => (*src).peek_char(), Stream::Byte(cursor) => (*cursor).peek_char(), Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | + Stream::HttpWrite(_) | Stream::Null(_) => Some(Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, @@ -644,13 +682,14 @@ impl CharRead for Stream { Stream::InputFile(file) => (*file).read_char(), Stream::NamedTcp(tcp_stream) => (*tcp_stream).read_char(), Stream::NamedTls(tls_stream) => (*tls_stream).read_char(), - Stream::NamedHttpClient(http_stream) => (*http_stream).read_char(), + Stream::HttpRead(http_stream) => (*http_stream).read_char(), Stream::Readline(rl_stream) => (*rl_stream).read_char(), Stream::StaticString(src) => (*src).read_char(), Stream::Byte(cursor) => (*cursor).read_char(), Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | + Stream::HttpWrite(_) | Stream::Null(_) => Some(Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, @@ -663,13 +702,14 @@ impl CharRead for Stream { Stream::InputFile(file) => file.put_back_char(c), Stream::NamedTcp(tcp_stream) => tcp_stream.put_back_char(c), Stream::NamedTls(tls_stream) => tls_stream.put_back_char(c), - Stream::NamedHttpClient(http_stream) => http_stream.put_back_char(c), + Stream::HttpRead(http_stream) => http_stream.put_back_char(c), Stream::Readline(rl_stream) => rl_stream.put_back_char(c), Stream::StaticString(src) => src.put_back_char(c), Stream::Byte(cursor) => cursor.put_back_char(c), Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | + Stream::HttpWrite(_) | Stream::Null(_) => {} } } @@ -679,13 +719,14 @@ impl CharRead for Stream { Stream::InputFile(ref mut file) => file.consume(nread), Stream::NamedTcp(ref mut tcp_stream) => tcp_stream.consume(nread), Stream::NamedTls(ref mut tls_stream) => tls_stream.consume(nread), - Stream::NamedHttpClient(ref mut http_stream) => http_stream.consume(nread), + Stream::HttpRead(ref mut http_stream) => http_stream.consume(nread), Stream::Readline(ref mut rl_stream) => rl_stream.consume(nread), Stream::StaticString(ref mut src) => src.consume(nread), Stream::Byte(ref mut cursor) => cursor.consume(nread), Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | + Stream::HttpWrite(_) | Stream::Null(_) => {} } } @@ -698,13 +739,14 @@ impl Read for Stream { Stream::InputFile(file) => (*file).read(buf), Stream::NamedTcp(tcp_stream) => (*tcp_stream).read(buf), Stream::NamedTls(tls_stream) => (*tls_stream).read(buf), - Stream::NamedHttpClient(http_stream) => (*http_stream).read(buf), + Stream::HttpRead(http_stream) => (*http_stream).read(buf), Stream::Readline(rl_stream) => (*rl_stream).read(buf), Stream::StaticString(src) => (*src).read(buf), Stream::Byte(cursor) => (*cursor).read(buf), Stream::OutputFile(_) | Stream::StandardError(_) - | Stream::StandardOutput(_) + | Stream::StandardOutput(_) + | Stream::HttpWrite(_) | Stream::Null(_) => Err(std::io::Error::new( ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, @@ -724,7 +766,8 @@ impl Write for Stream { Stream::Byte(ref mut cursor) => cursor.get_mut().write(buf), Stream::StandardOutput(stream) => stream.write(buf), Stream::StandardError(stream) => stream.write(buf), - Stream::NamedHttpClient(_) | + Stream::HttpWrite(ref mut stream) => stream.get_mut().write(buf), + Stream::HttpRead(_) | Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(..) | @@ -743,7 +786,8 @@ impl Write for Stream { Stream::Byte(ref mut cursor) => cursor.stream.get_mut().flush(), Stream::StandardError(stream) => stream.stream.flush(), Stream::StandardOutput(stream) => stream.stream.flush(), - Stream::NamedHttpClient(_) | + Stream::HttpWrite(ref mut stream) => stream.stream.get_mut().flush(), + Stream::HttpRead(_) | Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(_) | @@ -868,7 +912,8 @@ impl Stream { Stream::StaticString(stream) => stream.past_end_of_stream, Stream::NamedTcp(stream) => stream.past_end_of_stream, Stream::NamedTls(stream) => stream.past_end_of_stream, - Stream::NamedHttpClient(stream) => stream.past_end_of_stream, + Stream::HttpRead(stream) => stream.past_end_of_stream, + Stream::HttpWrite(stream) => stream.past_end_of_stream, Stream::Null(_) => false, Stream::Readline(stream) => stream.past_end_of_stream, Stream::StandardOutput(stream) => stream.past_end_of_stream, @@ -890,7 +935,8 @@ impl Stream { Stream::StaticString(stream) => stream.past_end_of_stream = value, Stream::NamedTcp(stream) => stream.past_end_of_stream = value, Stream::NamedTls(stream) => stream.past_end_of_stream = value, - Stream::NamedHttpClient(stream) => stream.past_end_of_stream = value, + Stream::HttpRead(stream) => stream.past_end_of_stream = value, + Stream::HttpWrite(stream) => stream.past_end_of_stream = value, Stream::Null(_) => {} Stream::Readline(stream) => stream.past_end_of_stream = value, Stream::StandardOutput(stream) => stream.past_end_of_stream = value, @@ -956,11 +1002,11 @@ impl Stream { Stream::Byte(_) | Stream::Readline(_) | Stream::StaticString(_) - | Stream::NamedHttpClient(_) + | Stream::HttpRead(_) | Stream::InputFile(..) => atom!("read"), Stream::NamedTcp(..) | Stream::NamedTls(..) => atom!("read_append"), Stream::OutputFile(file) if file.is_append => atom!("append"), - Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => atom!("write"), + Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | Stream::HttpWrite(_) => atom!("write"), Stream::Null(_) => atom!(""), } } @@ -1020,8 +1066,8 @@ impl Stream { http_stream: Box, arena: &mut Arena, ) -> Self { - Stream::NamedHttpClient(arena_alloc!( - StreamLayout::new(CharReader::new(NamedHttpClientStream { + Stream::HttpRead(arena_alloc!( + StreamLayout::new(CharReader::new(HttpReadStream { url, body_reader: http_stream })), @@ -1029,6 +1075,19 @@ impl Stream { )) } + #[inline] + pub(crate) fn from_http_sender( + body_writer: Sender, + arena: &mut Arena, + ) -> Self { + Stream::HttpWrite(arena_alloc!( + StreamLayout::new(CharReader::new(HttpWriteStream { + body_writer + })), + arena + )) + } + #[inline] pub(crate) fn from_file_as_output( file_name: Atom, @@ -1065,7 +1124,7 @@ impl Stream { Stream::NamedTls(ref mut tls_stream) => { tls_stream.inner_mut().tls_stream.shutdown() } - Stream::NamedHttpClient(ref mut http_stream) => { + Stream::HttpRead(ref mut http_stream) => { unsafe { http_stream.set_tag(ArenaHeaderTag::Dropped); std::ptr::drop_in_place(&mut http_stream.inner_mut().body_reader as *mut _); @@ -1073,6 +1132,14 @@ impl Stream { Ok(()) } + Stream::HttpWrite(ref mut http_stream) => { + unsafe { + http_stream.set_tag(ArenaHeaderTag::Dropped); + std::ptr::drop_in_place(&mut http_stream.inner_mut().body_writer as *mut _); + } + + Ok(()) + } Stream::InputFile(mut file_stream) => { // close the stream by dropping the inner File. unsafe { @@ -1109,7 +1176,7 @@ impl Stream { match self { Stream::NamedTcp(..) | Stream::NamedTls(..) - | Stream::NamedHttpClient(..) + | Stream::HttpRead(..) | Stream::Byte(_) | Stream::Readline(_) | Stream::StaticString(_) @@ -1124,7 +1191,8 @@ impl Stream { Stream::StandardError(_) | Stream::StandardOutput(_) | Stream::NamedTcp(..) - | Stream::NamedTls(..) + | Stream::NamedTls(..) + | Stream::HttpWrite(..) | Stream::Byte(_) | Stream::OutputFile(..) => true, _ => false, diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index c044cdb1..46506621 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -8,6 +8,7 @@ use crate::atom_table::*; use crate::forms::*; use crate::heap_iter::*; use crate::heap_print::*; +use crate::http::{self, HttpListener, HttpResponse}; use crate::instructions::*; use crate::machine; use crate::machine::{Machine, VERIFY_ATTR_INTERRUPT_LOC, get_structure_index}; @@ -37,19 +38,20 @@ use ref_thread_local::{RefThreadLocal, ref_thread_local}; use std::cell::Cell; use std::cmp::Ordering; use std::collections::BTreeSet; -use std::convert::TryFrom; +use std::convert::{TryFrom, Infallible}; use std::env; use std::fs; use std::hash::{BuildHasher, BuildHasherDefault}; use std::io::{ErrorKind, Read, Write}; use std::iter::{once, FromIterator}; use std::mem; -use std::net::{TcpListener, TcpStream}; +use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs}; use std::num::NonZeroU32; use std::ops::Sub; use std::process; use std::rc::Rc; use std::str::FromStr; +use std::sync::Arc; use chrono::{offset::Local, DateTime}; use cpu_time::ProcessTime; @@ -79,10 +81,13 @@ use base64; use roxmltree; use select; -use hyper::{Body, Client, HeaderMap, Method, Request, Uri}; +use hyper::{Body, Server, Client, HeaderMap, Method, Request, Response, Uri}; use hyper::header::{HeaderName, HeaderValue}; use hyper::body::Buf; +use hyper::service::{make_service_fn, service_fn}; use hyper_tls::HttpsConnector; +use tokio::sync::Mutex; +use tokio::sync::mpsc::channel; ref_thread_local! { pub(crate) static managed RANDOM_STATE: RandState<'static> = RandState::new(); @@ -3901,6 +3906,203 @@ impl Machine { Ok(()) } + #[inline(always)] + pub(crate) fn http_listen(&mut self) -> CallResult { + let address_sink = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1])); + if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) { + let address_string = address_str.as_str(); + let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) { + Some(addr) => addr, + _ => { + self.machine_st.fail = true; + return Ok(()); + } + }; + + let (tx, rx) = channel(1); + let tx = Arc::new(Mutex::new(tx)); + + let _guard = self.runtime.enter(); + let server = match Server::try_bind(&addr) { + Ok(server) => server, + Err(_) => { + return Err(self.machine_st.open_permission_error(address_sink, atom!("http_listen"), 2)); + } + }; + + self.runtime.spawn(async move { + let make_svc = make_service_fn(move |_conn| { + let tx = tx.clone(); + async move { Ok::<_, Infallible>(service_fn(move |req| http::serve_req(req, tx.clone()))) } + }); + let server = server.serve(make_svc); + + if let Err(_) = server.await { + eprintln!("server error"); + } + }); + let http_listener = HttpListener { incoming: rx }; + let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena); + let addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2])); + self.machine_st.bind(addr.as_var().unwrap(), typed_arena_ptr_as_cell!(http_listener)); + } + Ok(()) + } + + #[inline(always)] + pub(crate) fn http_accept(&mut self) -> CallResult { + let culprit = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1])); + let method = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2])); + let path = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[3])); + let query = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[5])); + let stream_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[6])); + let handle_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[7])); + read_heap_cell!(culprit, + (HeapCellValueTag::Cons, cons_ptr) => { + match_untyped_arena_ptr!(cons_ptr, + (ArenaHeaderTag::HttpListener, http_listener) => { + match http_listener.incoming.blocking_recv() { + Some(request) => { + let method_atom = match *request.request.method() { + Method::GET => atom!("get"), + Method::POST => atom!("post"), + Method::PUT => atom!("put"), + Method::DELETE => atom!("delete"), + Method::PATCH => atom!("patch"), + Method::HEAD => atom!("head"), + _ => unreachable!(), + }; + let path_atom = self.machine_st.atom_tbl.build_with(request.request.uri().path()); + let path_cell = atom_as_cstr_cell!(path_atom); + let headers: Vec = request.request.headers().iter().map(|(header_name, header_value)| { + let h = self.machine_st.heap.len(); + + let header_term = functor!( + self.machine_st.atom_tbl.build_with(header_name.as_str()), + [cell(string_as_cstr_cell!(self.machine_st.atom_tbl.build_with(header_value.to_str().unwrap())))] + ); + + self.machine_st.heap.extend(header_term.into_iter()); + str_loc_as_cell!(h) + }).collect(); + + let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); + + let query_str = request.request.uri().query().unwrap_or(""); + let query_atom = self.machine_st.atom_tbl.build_with(query_str); + let query_cell = string_as_cstr_cell!(query_atom); + + let hyper_req = request.request; + let buf = self.runtime.block_on(async {hyper::body::aggregate(hyper_req).await.unwrap()}); + let reader = buf.reader(); + + let mut stream = Stream::from_http_stream( + path_atom, + Box::new(reader), + &mut self.machine_st.arena + ); + *stream.options_mut() = StreamOptions::default(); + stream.options_mut().set_stream_type(StreamType::Binary); + self.indices.streams.insert(stream); + let stream = stream_as_cell!(stream); + + let handle = arena_alloc!(request.response, &mut self.machine_st.arena); + + self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom)); + self.machine_st.bind(path.as_var().unwrap(), path_cell); + unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]); + self.machine_st.bind(query.as_var().unwrap(), query_cell); + self.machine_st.bind(stream_addr.as_var().unwrap(), stream); + self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle)); + } + None => { + self.machine_st.fail = true; + } + } + } + _ => { + unreachable!(); + } + ); + } + _ => { + unreachable!(); + } + ); + Ok(()) + } + + #[inline(always)] + pub(crate) fn http_answer(&mut self) -> CallResult { + let culprit = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1])); + let status_code = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2])); + let status_code: u16 = match Number::try_from(status_code) { + Ok(Number::Fixnum(n)) => n.get_num() as u16, + Ok(Number::Integer(n)) => match n.to_u16() { + Some(u) => u, + _ => { + self.machine_st.fail = true; + return Ok(()); + } + } + _ => unreachable!() + }; + let stub_gen = || functor_stub(atom!("http_listen"), 2); + let headers = match self.machine_st.try_from_list(self.machine_st.registers[3], stub_gen) { + Ok(addrs) => { + let mut header_map = HeaderMap::new(); + for heap_cell in addrs{ + read_heap_cell!(heap_cell, + (HeapCellValueTag::Str, s) => { + let name = cell_as_atom_cell!(self.machine_st.heap[s]).get_name(); + let value = self.machine_st.value_to_str_like(self.machine_st.heap[s + 1]).unwrap(); + header_map.insert(HeaderName::from_str(name.as_str()).unwrap(), HeaderValue::from_str(value.as_str()).unwrap()); + } + _ => { + unreachable!() + } + ) + } + header_map + }, + Err(e) => return Err(e) + }; + let stream_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[4])); + + read_heap_cell!(culprit, + (HeapCellValueTag::Cons, cons_ptr) => { + match_untyped_arena_ptr!(cons_ptr, + (ArenaHeaderTag::HttpResponse, http_response) => { + let mut response = Response::builder() + .status(status_code); + *response.headers_mut().unwrap() = headers; + let (sender, body) = Body::channel(); + let response = response.body(body).unwrap(); + http_response.blocking_send(response).unwrap(); + + let mut stream = Stream::from_http_sender( + sender, + &mut self.machine_st.arena + ); + *stream.options_mut() = StreamOptions::default(); + stream.options_mut().set_stream_type(StreamType::Binary); + self.indices.streams.insert(stream); + let stream = stream_as_cell!(stream); + self.machine_st.bind(stream_addr.as_var().unwrap(), stream); + } + _ => { + unreachable!(); + } + ); + } + _ => { + unreachable!(); + } + ); + + Ok(()) + } + #[inline(always)] pub(crate) fn current_time(&mut self) { let timestamp = self.systemtime_to_timestamp(SystemTime::now()); diff --git a/src/macros.rs b/src/macros.rs index 8a000b82..85e2e086 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -272,6 +272,20 @@ macro_rules! match_untyped_arena_ptr_pat_body { #[allow(unused_braces)] $code }}; + ($ptr:ident, HttpListener, $listener:ident, $code:expr) => {{ + let payload_ptr = unsafe { std::mem::transmute::<_, *mut HttpListener>($ptr.payload_offset()) }; + #[allow(unused_mut)] + let mut $listener = TypedArenaPtr::new(payload_ptr); + #[allow(unused_braces)] + $code + }}; + ($ptr:ident, HttpResponse, $listener:ident, $code:expr) => {{ + let payload_ptr = unsafe { std::mem::transmute::<_, *mut HttpResponse>($ptr.payload_offset()) }; + #[allow(unused_mut)] + let mut $listener = TypedArenaPtr::new(payload_ptr); + #[allow(unused_braces)] + $code + }}; ($ptr:ident, IndexPtr, $ip:ident, $code:expr) => {{ #[allow(unused_mut)] let mut $ip = TypedArenaPtr::new(unsafe { std::mem::transmute::<_, *mut IndexPtr>($ptr.get_ptr()) }); @@ -291,7 +305,8 @@ macro_rules! match_untyped_arena_ptr_pat { | ArenaHeaderTag::OutputFileStream | ArenaHeaderTag::NamedTcpStream | ArenaHeaderTag::NamedTlsStream - | ArenaHeaderTag::NamedHttpClientStream + | ArenaHeaderTag::HttpReadStream + | ArenaHeaderTag::HttpWriteStream | ArenaHeaderTag::ReadlineStream | ArenaHeaderTag::StaticStringStream | ArenaHeaderTag::ByteStream