"new_debug_unreachable",
]
+[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
[[package]]
name = "futures-channel"
version = "0.3.21"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+[[package]]
+name = "futures-executor"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2 1.0.36",
+ "quote 1.0.17",
+ "syn 1.0.90",
+]
+
[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
+ "slab",
]
[[package]]
"ctrlc",
"dirs-next",
"divrem",
+ "futures",
"fxhash",
"git-version",
"hostname",
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5.0"
tokio = { version = "1", features = ["full"] }
+futures = "0.3"
[dev-dependencies]
assert_cmd = "1.0.3"
DeterministicLengthRundown,
#[strum_discriminants(strum(props(Arity = "7", Name = "$http_open")))]
HttpOpen,
+ #[strum_discriminants(strum(props(Arity = "2", Name = "$http_listen")))]
+ HttpListen,
+ #[strum_discriminants(strum(props(Arity = "7", Name = "$http_accept")))]
+ HttpAccept,
+ #[strum_discriminants(strum(props(Arity = "4", Name = "$http_answer")))]
+ HttpAnswer,
#[strum_discriminants(strum(props(Arity = "3", Name = "$predicate_defined")))]
PredicateDefined,
#[strum_discriminants(strum(props(Arity = "3", Name = "$strip_module")))]
&Instruction::CallCpuNow(_) |
&Instruction::CallDeterministicLengthRundown(_) |
&Instruction::CallHttpOpen(_) |
+ &Instruction::CallHttpListen(_) |
+ &Instruction::CallHttpAccept(_) |
+ &Instruction::CallHttpAnswer(_) |
&Instruction::CallPredicateDefined(_) |
&Instruction::CallStripModule(_) |
&Instruction::CallCurrentTime(_) |
&Instruction::ExecuteCpuNow(_) |
&Instruction::ExecuteDeterministicLengthRundown(_) |
&Instruction::ExecuteHttpOpen(_) |
+ &Instruction::ExecuteHttpListen(_) |
+ &Instruction::ExecuteHttpAccept(_) |
+ &Instruction::ExecuteHttpAnswer(_) |
&Instruction::ExecutePredicateDefined(_) |
&Instruction::ExecuteStripModule(_) |
&Instruction::ExecuteCurrentTime(_) |
+use crate::http::{HttpListener, HttpResponse};
use crate::machine::loader::LiveLoadState;
use crate::machine::machine_indices::*;
use crate::machine::streams::*;
OutputFileStream = 0b10100,
NamedTcpStream = 0b011100,
NamedTlsStream = 0b100000,
- NamedHttpClientStream = 0b100001,
+ HttpReadStream = 0b100001,
+ HttpWriteStream = 0b100010,
ReadlineStream = 0b110000,
StaticStringStream = 0b110100,
ByteStream = 0b111000,
StandardErrorStream = 0b11000,
NullStream = 0b111100,
TcpListener = 0b1000000,
+ HttpListener = 0b1000001,
+ HttpResponse = 0b1000010,
Dropped = 0b1000100,
IndexPtrDynamicUndefined = 0b1000101,
IndexPtrDynamicIndex = 0b1000110,
}
}
+impl ArenaAllocated for HttpListener {
+ type PtrToAllocated = TypedArenaPtr<HttpListener>;
+
+ #[inline]
+ fn tag() -> ArenaHeaderTag {
+ ArenaHeaderTag::HttpListener
+ }
+
+ #[inline]
+ fn size(&self) -> usize {
+ mem::size_of::<Self>()
+ }
+
+ #[inline]
+ fn copy_to_arena(self, dst: *mut Self) -> Self::PtrToAllocated {
+ unsafe {
+ ptr::write(dst, self);
+ TypedArenaPtr::new(dst as *mut Self)
+ }
+ }
+}
+
+impl ArenaAllocated for HttpResponse {
+ type PtrToAllocated = TypedArenaPtr<HttpResponse>;
+
+ #[inline]
+ fn tag() -> ArenaHeaderTag {
+ ArenaHeaderTag::HttpResponse
+ }
+
+ #[inline]
+ fn size(&self) -> usize {
+ mem::size_of::<Self>()
+ }
+
+ #[inline]
+ fn copy_to_arena(self, dst: *mut Self) -> Self::PtrToAllocated {
+ unsafe {
+ ptr::write(dst, self);
+ TypedArenaPtr::new(dst as *mut Self)
+ }
+ }
+}
+
impl ArenaAllocated for IndexPtr {
type PtrToAllocated = TypedArenaPtr<IndexPtr>;
ArenaHeaderTag::NamedTlsStream => {
ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<NamedTlsStream>>>());
}
- ArenaHeaderTag::NamedHttpClientStream => {
- ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<NamedHttpClientStream>>>());
+ ArenaHeaderTag::HttpReadStream => {
+ ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<HttpReadStream>>>());
}
+ ArenaHeaderTag::HttpWriteStream => {
+ ptr::drop_in_place(value.payload_offset::<StreamLayout<CharReader<HttpWriteStream>>>());
+ }
ArenaHeaderTag::ReadlineStream => {
ptr::drop_in_place(value.payload_offset::<StreamLayout<ReadlineStream>>());
}
ArenaHeaderTag::TcpListener => {
ptr::drop_in_place(value.payload_offset::<TcpListener>());
}
+ ArenaHeaderTag::HttpListener => {
+ ptr::drop_in_place(value.payload_offset::<HttpListener>());
+ }
+ ArenaHeaderTag::HttpResponse => {
+ ptr::drop_in_place(value.payload_offset::<HttpResponse>());
+ }
ArenaHeaderTag::StandardOutputStream => {
ptr::drop_in_place(value.payload_offset::<StreamLayout<StandardOutputStream>>());
}
--- /dev/null
+use std::sync::Arc;
+use std::convert::Infallible;
+
+use hyper::{Response, Request, Body};
+use tokio::sync::Mutex;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+
+pub struct HttpListener {
+ pub incoming: Receiver<HttpRequest>
+}
+
+#[derive(Debug)]
+pub struct HttpRequest {
+ pub request: Request<Body>,
+ pub response: HttpResponse,
+}
+
+pub type HttpResponse = Sender<Response<Body>>;
+
+pub async fn serve_req(req: Request<Body>, tx: Arc<Mutex<Sender<HttpRequest>>>) -> Result<Response<Body>, Infallible> {
+ let (response_tx, mut rx) = channel(1);
+ let http_request = HttpRequest { request: req, response: response_tx };
+ tx.lock().await.send(http_request).await.unwrap();
+ Ok(rx.recv().await.unwrap())
+}
mod forms;
mod heap_iter;
pub mod heap_print;
+mod http;
mod indexing;
#[macro_use]
pub mod instructions {
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ Updated in March 2022 by Adrián Arroyo to use the Hyper backend
Part of Scryer Prolog
This library provides an starting point to build HTTP server based applications.
- It currently implements a subset of HTTP/1.0. It is recommended to put a reverse
- proxy like nginx in front of this server to have access to more advanced features
- (gzip compression, HTTPS, ...)
+ It is based on Hyper, which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However,
+ some advanced features that Hyper provides are still not accesible.
Usage
==========
Some things that are still missing:
- Read forms in multipart format
- HTTP Basic Auth
- - Keep-Alive support
- Session handling via cookies
- HTML Templating
I place this code in the public domain. Use it in any way you want.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
+
:- module(http_server, [
- http_listen/2,
- http_headers/2,
- http_status_code/2,
- http_body/2,
- http_redirect/2,
- http_query/3,
- url_decode//1
+ http_listen/2,
+ http_headers/2,
+ http_status_code/2,
+ http_body/2,
+ http_redirect/2,
+ http_query/3
]).
-:- meta_predicate http_listen(?, 2).
+:- meta_predicate http_listen(?, :).
-:- use_module(library(sockets)).
-:- use_module(library(dcgs)).
-:- use_module(library(format)).
-:- use_module(library(error)).
:- use_module(library(charsio)).
-:- use_module(library(lists)).
+:- use_module(library(crypto)).
+:- use_module(library(error)).
+:- use_module(library(format)).
:- use_module(library(iso_ext)).
+:- use_module(library(lists)).
+:- use_module(library(pio)).
:- use_module(library(time)).
-:- use_module(library(crypto)).
-% Module prefix workaround with meta_predicate
http_listen(Port, Module:Handlers0) :-
+ must_be(integer, Port),
+ must_be(list, Handlers0),
maplist(module_qualification(Module), Handlers0, Handlers),
http_listen_(Port, Handlers).
H0 =.. [Method, Path, Goal],
H =.. [Method, Path, M:Goal].
-% Server initialization
http_listen_(Port, Handlers) :-
- must_be(integer, Port),
- must_be(list, Handlers),
- once(socket_server_open(Port, Socket)),
- format("Listening at port ~d\n", [Port]),
- accept_loop(Socket, Handlers).
-
-% Server loop
-accept_loop(Socket, Handlers) :-
- setup_call_cleanup(socket_server_accept(Socket, _Client, Stream, [type(binary)]),
- (
- read_header_lines(Stream, Lines),
- [Request|Headers] = Lines,
- (
- (phrase(parse_request(_Version, Method, Path, Queries), Request), maplist(map_parse_header, Headers, HeadersKV)) -> (
- (
- member("content-length"-ContentLength, HeadersKV) ->
- (number_chars(ContentLengthN, ContentLength), get_bytes(Stream, ContentLengthN, Body))
- ;true
- ),
- current_time(Time),
- phrase(format_time("%Y-%m-%d (%H:%M:%S)", Time), TimeString),
- format("~s ~w ~s\n", [TimeString, Method, Path]),
- (
- match_handler(Handlers, Method, Path, Handler) ->
- (
- HttpRequest = http_request(HeadersKV, binary(Body), Queries),
- HttpResponse = http_response(_, _, _),
- (call(Handler, HttpRequest, HttpResponse) ->
- send_response(Stream, HttpResponse)
- ; format(Stream, "HTTP/1.0 500 Internal Server Error\r\n\r\n", [])
- )
- )
- ; format(Stream, "HTTP/1.0 404 Not Found\r\n\r\n", [])
- )
- );(
- format(Stream, "HTTP/1.0 400 Bad Request\r\n\r\n", []) % bad format
- )
- ),
- ! % Remove
- ), close(Stream)),
- accept_loop(Socket, Handlers).
+ number_chars(Port, CPort),
+ append("0.0.0.0:", CPort, Addr),
+ '$http_listen'(Addr, HttpListener),!,
+ format("Listening at ~s\n", [Addr]),
+ http_loop(HttpListener, Handlers).
+
+http_loop(HttpListener, Handlers) :-
+ '$http_accept'(HttpListener, RequestMethod, RequestPath, RequestHeaders, RequestQuery, RequestStream, ResponseHandle),
+ current_time(Time),
+ phrase(format_time("%Y-%m-%d (%H:%M:%S)", Time), TimeString),
+ format("~s ~w ~s\n", [TimeString, RequestMethod, RequestPath]),
+ maplist(map_header_kv, RequestHeaders, RequestHeadersKV),
+ phrase(parse_queries(RequestQueries), RequestQuery),
+ (
+ match_handler(Handlers, RequestMethod, RequestPath, Handler) ->
+ (
+ HttpRequest = http_request(RequestHeadersKV, stream(RequestStream), RequestQueries),
+ HttpResponse = http_response(_, _, _),
+ (call(Handler, HttpRequest, HttpResponse) ->
+ send_response(ResponseHandle, HttpResponse)
+ ; '$http_answer'(ResponseHandle, 500, [], "Internal Server Error")
+ )
+ )
+ ; '$http_answer'(ResponseHandle, 404, [], "Not Found")
+ ),
+ http_loop(HttpListener, Handlers).
+
+send_response(ResponseHandle, http_response(StatusCode0, text(ResponseText), ResponseHeaders0)) :-
+ default(StatusCode0, 200, StatusCode),
+ maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0),
+ '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream),
+ format(ResponseStream, "~s", [ResponseText]),
+ close(ResponseStream).
+
+send_response(ResponseHandle, http_response(StatusCode0, bytes(ResponseBytes), ResponseHeaders0)) :-
+ default(StatusCode0, 200, StatusCode),
+ maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0),
+ '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream),
+ format(ResponseStream, "~s", [ResponseBytes]),
+ close(ResponseStream).
+
+send_response(ResponseHandle, http_response(StatusCode0, file(Filename), ResponseHeaders0)) :-
+ default(StatusCode0, 200, StatusCode),
+ maplist(map_header_kv_2, ResponseHeaders, ResponseHeaders0),
+ '$http_answer'(ResponseHandle, StatusCode, ResponseHeaders, ResponseStream),
+ setup_call_cleanup(
+ open(Filename, read, FileStream, [type(binary)]),
+ (
+ get_n_chars(FileStream, _, FileCs),
+ format(ResponseStream, "~s", [FileCs])
+ ),
+ close(FileStream)
+ ),
+ close(ResponseStream).
+
+
+default(Var, Default, Out) :-
+ (var(Var) -> Out = Default
+ ; Var = Out
+ ).
+
+map_header_kv(T, K-V) :-
+ T =.. [K0, V],
+ atom_chars(K0, K).
+
+map_header_kv_2(T, K-V) :-
+ atom_chars(K0, K),
+ T =.. [K0, V].
match_handler(Handlers, Method, "/", Handler) :-
member(H, Handlers),
var(Var),
Var = Path.
-
-% Helper and recommended predicates
-
-http_headers(http_request(Headers, _, _), Headers).
-http_headers(http_response(_, _, Headers), Headers).
-
-http_body(http_request(_, binary(ByteBody), _), text(TextBody)) :- chars_utf8bytes(TextBody, ByteBody).
-http_body(http_request(Headers, binary(ByteBody), _), form(FormBody)) :-
- member("content-type"-"application/x-www-form-urlencoded", Headers),
- chars_utf8bytes(TextBody, ByteBody),
- phrase(parse_queries(FormBody), TextBody).
-http_body(http_request(_, Body, _), Body).
-http_body(http_response(_, Body, _), Body).
-
-http_status_code(http_response(StatusCode, _, _), StatusCode).
-
-http_redirect(http_response(307, text("Moved Temporarily"), ["Location"-Uri]), Uri).
-
-http_query(http_request(_, _, Queries), Key, Value) :- member(Key-Value, Queries).
-
-% Route matching
path(Pattern) -->
{
Pattern =.. Parts,
path([]) --> [].
-% Send responses
-send_response(Stream, http_response(StatusCode0, file(Filename), Headers)) :-
- default(StatusCode0, 200, StatusCode),
- format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]),
- overwrite_header("connection"-"Close", Headers, Headers0),
- write_headers(Stream, Headers0),
- format(Stream, "\r\n", []),
- setup_call_cleanup(
- open(Filename, read, FileStream, [type(binary)]),
- pipe_bytes(FileStream, Stream),
- close(FileStream)
- ).
-
-send_response(Stream, http_response(StatusCode0, text(TextResponse), Headers)) :-
- default(StatusCode0, 200, StatusCode),
- format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]),
- overwrite_header("content-type"-"text/plain", Headers, Headers0),
- overwrite_header("connection"-"Close", Headers0, Headers1),
- write_headers(Stream, Headers1),
- format(Stream, "\r\n~s", [TextResponse]).
+string_without(Not, [Char|String]) -->
+ [Char],
+ {
+ \+ member(Char, Not)
+ },
+ string_without(Not, String).
-send_response(Stream, http_response(StatusCode0, binary(BinaryResponse), Headers)) :-
- default(StatusCode0, 200, StatusCode),
- format(Stream, "HTTP/1.0 ~d\r\n", [StatusCode]),
- overwrite_header("connection"-"Close", Headers, Headers0),
- write_headers(Stream, Headers0),
- format(Stream, "\r\n", []),
- put_bytes(Stream, BinaryResponse).
+string_without(_, []) -->
+ [].
-default(Var, Default, Out) :-
- (var(Var) -> Out = Default
- ; Var = Out
- ).
+http_headers(http_request(Headers, _, _), Headers).
+http_headers(http_response(_, _, Headers), Headers).
-header([]) --> [].
-header([Key-Value|Headers]) -->
- format_("~s: ~s\r\n", [Key, Value]),
- header(Headers).
-
-write_headers(Stream, Headers) :-
- phrase(header(Headers), Cs),
- format(Stream, "~s", [Cs]).
-
-overwrite_header(Key-Value, [], [Key-Value]).
-overwrite_header(Key-Value, [Header|Headers], [Header|HeadersOut]) :-
- Header = Key0-_,
- Key0 \= Key,
- overwrite_header(Key-Value, Headers, HeadersOut).
-overwrite_header(Key-Value, [Header|Headers], [NewHeader|Headers]) :-
- Header = Key-_,
- NewHeader = Key-Value.
-
-parse_request(http_version(Major, Minor), Method, Path, Queries) -->
- method(Method),
- " ",
- parse_path(Path, Queries),
- " ",
- "HTTP/",
- natural(Major),
- ".",
- natural(Minor),
- "\r\n".
-
-parse_path(Path, Queries) -->
- string_without("?", Path),
- "?",
- parse_queries(Queries).
+http_body(http_request(_, stream(StreamBody), _), bytes(BytesBody)) :- get_n_chars(StreamBody, _, BytesBody).
+http_body(http_request(_, stream(StreamBody), _), text(TextBody)) :- get_n_chars(StreamBody, _, TextBody).
+http_body(http_request(Headers, stream(StreamBody), _), form(FormBody)) :-
+ member("content-type"-"application/x-www-form-urlencoded", Headers),
+ get_n_chars(StreamBody, _, TextBody),
+ phrase(parse_queries(FormBody), TextBody).
+http_body(http_request(_, Body, _), Body).
+http_body(http_response(_, Body, _), Body).
-parse_path(Path, []) -->
- string_without(" ", Path).
+http_status_code(http_response(StatusCode, _, _), StatusCode).
+http_redirect(http_response(307, text("Moved Temporarily"), ["Location"-Uri]), Uri).
+http_query(http_request(_, _, Queries), Key, Value) :- member(Key-Value, Queries).
parse_queries([Key-Value|Queries]) -->
string_without("=", Key0),
phrase(url_decode(Value), Value0)
}.
-map_parse_header(Header, HeaderKV) :-
- phrase(parse_header(HeaderKV), Header).
-
-parse_header(Key-Value) -->
- string_without(":", Key0),
- {
- chars_lower(Key0, Key)
- },
- ": ",
- string_without("\r", Value),
- "\r\n".
-
-method(options) --> "OPTIONS".
-method(get) --> "GET".
-method(head) --> "HEAD".
-method(post) --> "POST".
-method(put) --> "PUT".
-method(delete) --> "DELETE".
-
-string_without(Not, [Char|String]) -->
- [Char],
- {
- \+ member(Char, Not)
- },
- string_without(Not, String).
-
-string_without(_, []) -->
+parse_queries([]) -->
[].
-natural(Nat) -->
- natural_(NatChars),
- {
- number_chars(Nat, NatChars)
- }.
-
-natural_([Nat|Nats]) -->
- [Nat],
- {
- char_type(Nat, decimal_digit)
- },
- natural_(Nats).
-
-natural_([]) -->
- [].
-
-read_header_lines(Stream, Hs) :-
- read_line_to_chars(Stream, Cs, []),
- ( Cs == "" -> Hs = []
- ; Cs == "\r\n" -> Hs = []
- ; Hs = [Cs|Rest],
- read_header_lines(Stream, Rest)
- ).
-
-get_bytes(Stream, Length, Res) :- get_bytes(Stream, Length, [], Res).
-get_bytes(Stream, Length, Acc, Res) :-
- (Length > 0 -> (
- get_byte(Stream, B),
- B =\= -1,
- get_bytes(Stream, Length - 1, [B|Acc], Res)
- ); reverse(Acc, Res)).
-
-put_bytes(_, []).
-put_bytes(Stream, [Byte|Bytes]) :-
- put_byte(Stream, Byte),
- put_bytes(Stream, Bytes).
-
-pipe_bytes(StreamIn, StreamOut) :-
- get_byte(StreamIn, Byte),
- (
- Byte =\= -1 ->
- (
- put_byte(StreamOut, Byte),
- pipe_bytes(StreamIn, StreamOut)
- )
- ; true).
-
-% WARNING: This only works for ASCII chars. This code can be modified to support
-% Latin1 characters also but a completely different approach is needed for other
-% languages. Since HTTP internals are ASCII, this is fine for this usecase.
-chars_lower(Chars, Lower) :-
- maplist(char_lower, Chars, Lower).
-char_lower(Char, Lower) :-
- char_code(Char, Code),
- ((Code >= 65,Code =< 90) ->
- LowerCode is Code + 32,
- char_code(Lower, LowerCode)
- ; Char = Lower).
-
% Decodes a UTF-8 URL Encoded string: RFC-1738
url_decode([Char|Chars]) -->
[Char],
try_or_throw!(self.machine_st, self.http_open());
step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
}
+ &Instruction::CallHttpListen(_) => {
+ try_or_throw!(self.machine_st, self.http_listen());
+ step_or_fail!(self, self.machine_st.p += 1);
+ }
+ &Instruction::ExecuteHttpListen(_) => {
+ try_or_throw!(self.machine_st, self.http_listen());
+ step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
+ }
+ &Instruction::CallHttpAccept(_) => {
+ try_or_throw!(self.machine_st, self.http_accept());
+ step_or_fail!(self, self.machine_st.p += 1);
+ }
+ &Instruction::ExecuteHttpAccept(_) => {
+ try_or_throw!(self.machine_st, self.http_accept());
+ step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
+ }
+ &Instruction::CallHttpAnswer(_) => {
+ try_or_throw!(self.machine_st, self.http_answer());
+ step_or_fail!(self, self.machine_st.p += 1);
+ }
+ &Instruction::ExecuteHttpAnswer(_) => {
+ try_or_throw!(self.machine_st, self.http_answer());
+ step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
+ }
&Instruction::CallCurrentTime(_) => {
self.current_time();
step_or_fail!(self, self.machine_st.p += 1);
let user_output = Stream::stdout(&mut machine_st.arena);
let user_error = Stream::stderr(&mut machine_st.arena);
- let runtime = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
+ let runtime = tokio::runtime::Runtime::new()
.unwrap();
let mut wam = Machine {
use std::ptr;
use native_tls::TlsStream;
+use hyper::body::{Bytes, Sender};
#[derive(Debug, BitfieldSpecifier, Clone, Copy, PartialEq, Eq, Hash)]
#[bits = 1]
}
}
-pub struct NamedHttpClientStream {
+pub struct HttpReadStream {
url: Atom,
body_reader: Box<dyn BufRead>,
}
-impl Debug for NamedHttpClientStream {
+impl Debug for HttpReadStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Http Client Stream [{}]", self.url.as_str())
+ write!(f, "Http Read Stream [{}]", self.url.as_str())
}
}
-impl Read for NamedHttpClientStream {
+impl Read for HttpReadStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.body_reader.read(buf)
}
}
+pub struct HttpWriteStream {
+ body_writer: Sender,
+}
+
+impl Debug for HttpWriteStream {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Http Write Stream")
+ }
+}
+
+impl Write for HttpWriteStream {
+ #[inline]
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let bytes = Bytes::copy_from_slice(buf);
+ let len = bytes.len();
+ match self.body_writer.try_send_data(bytes) {
+ Ok(()) => Ok(len),
+ Err(_) => Err(std::io::Error::from(ErrorKind::Interrupted))
+ }
+ }
+
+ #[inline]
+ fn flush(&mut self) -> std::io::Result<()> {
+ Ok(())
+ }
+}
+
#[derive(Debug)]
pub struct StandardOutputStream {}
arena_allocated_impl_for_stream!(OutputFileStream, OutputFileStream);
arena_allocated_impl_for_stream!(CharReader<NamedTcpStream>, NamedTcpStream);
arena_allocated_impl_for_stream!(CharReader<NamedTlsStream>, NamedTlsStream);
-arena_allocated_impl_for_stream!(CharReader<NamedHttpClientStream>, NamedHttpClientStream);
+arena_allocated_impl_for_stream!(CharReader<HttpReadStream>, HttpReadStream);
+arena_allocated_impl_for_stream!(CharReader<HttpWriteStream>, HttpWriteStream);
arena_allocated_impl_for_stream!(ReadlineStream, ReadlineStream);
arena_allocated_impl_for_stream!(StaticStringStream, StaticStringStream);
arena_allocated_impl_for_stream!(StandardOutputStream, StandardOutputStream);
StaticString(TypedArenaPtr<StreamLayout<StaticStringStream>>),
NamedTcp(TypedArenaPtr<StreamLayout<CharReader<NamedTcpStream>>>),
NamedTls(TypedArenaPtr<StreamLayout<CharReader<NamedTlsStream>>>),
- NamedHttpClient(TypedArenaPtr<StreamLayout<CharReader<NamedHttpClientStream>>>),
+ HttpRead(TypedArenaPtr<StreamLayout<CharReader<HttpReadStream>>>),
+ HttpWrite(TypedArenaPtr<StreamLayout<CharReader<HttpWriteStream>>>),
Null(StreamOptions),
Readline(TypedArenaPtr<StreamLayout<ReadlineStream>>),
StandardOutput(TypedArenaPtr<StreamLayout<StandardOutputStream>>),
}
ArenaHeaderTag::NamedTcpStream => Stream::NamedTcp(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::NamedTlsStream => Stream::NamedTls(TypedArenaPtr::new(ptr as *mut _)),
- ArenaHeaderTag::NamedHttpClientStream => Stream::NamedHttpClient(TypedArenaPtr::new(ptr as *mut _)),
+ ArenaHeaderTag::HttpReadStream => Stream::HttpRead(TypedArenaPtr::new(ptr as *mut _)),
+ ArenaHeaderTag::HttpWriteStream => Stream::HttpWrite(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::ReadlineStream => Stream::Readline(TypedArenaPtr::new(ptr as *mut _)),
ArenaHeaderTag::StaticStringStream => {
Stream::StaticString(TypedArenaPtr::new(ptr as *mut _))
Stream::StaticString(ptr) => ptr.header_ptr(),
Stream::NamedTcp(ptr) => ptr.header_ptr(),
Stream::NamedTls(ptr) => ptr.header_ptr(),
- Stream::NamedHttpClient(ptr) => ptr.header_ptr(),
+ Stream::HttpRead(ptr) => ptr.header_ptr(),
+ Stream::HttpWrite(ptr) => ptr.header_ptr(),
Stream::Null(_) => ptr::null(),
Stream::Readline(ptr) => ptr.header_ptr(),
Stream::StandardOutput(ptr) => ptr.header_ptr(),
Stream::StaticString(ref ptr) => &ptr.options,
Stream::NamedTcp(ref ptr) => &ptr.options,
Stream::NamedTls(ref ptr) => &ptr.options,
- Stream::NamedHttpClient(ref ptr) => &ptr.options,
+ Stream::HttpRead(ref ptr) => &ptr.options,
+ Stream::HttpWrite(ref ptr) => &ptr.options,
Stream::Null(ref options) => options,
Stream::Readline(ref ptr) => &ptr.options,
Stream::StandardOutput(ref ptr) => &ptr.options,
Stream::StaticString(ref mut ptr) => &mut ptr.options,
Stream::NamedTcp(ref mut ptr) => &mut ptr.options,
Stream::NamedTls(ref mut ptr) => &mut ptr.options,
- Stream::NamedHttpClient(ref mut ptr) => &mut ptr.options,
+ Stream::HttpRead(ref mut ptr) => &mut ptr.options,
+ Stream::HttpWrite(ref mut ptr) => &mut ptr.options,
Stream::Null(ref mut options) => options,
Stream::Readline(ref mut ptr) => &mut ptr.options,
Stream::StandardOutput(ref mut ptr) => &mut ptr.options,
Stream::StaticString(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::NamedTcp(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::NamedTls(ptr) => ptr.lines_read += incr_num_lines_read,
- Stream::NamedHttpClient(ptr) => ptr.lines_read += incr_num_lines_read,
+ Stream::HttpRead(ptr) => ptr.lines_read += incr_num_lines_read,
+ Stream::HttpWrite(_) => {}
Stream::Null(_) => {}
Stream::Readline(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::StandardOutput(ptr) => ptr.lines_read += incr_num_lines_read,
Stream::StaticString(ptr) => ptr.lines_read = value,
Stream::NamedTcp(ptr) => ptr.lines_read = value,
Stream::NamedTls(ptr) => ptr.lines_read = value,
- Stream::NamedHttpClient(ptr) => ptr.lines_read = value,
+ Stream::HttpRead(ptr) => ptr.lines_read = value,
+ Stream::HttpWrite(_) => {}
Stream::Null(_) => {}
Stream::Readline(ptr) => ptr.lines_read = value,
Stream::StandardOutput(ptr) => ptr.lines_read = value,
Stream::StaticString(ptr) => ptr.lines_read,
Stream::NamedTcp(ptr) => ptr.lines_read,
Stream::NamedTls(ptr) => ptr.lines_read,
- Stream::NamedHttpClient(ptr) => ptr.lines_read,
+ Stream::HttpRead(ptr) => ptr.lines_read,
+ Stream::HttpWrite(_) => 0,
Stream::Null(_) => 0,
Stream::Readline(ptr) => ptr.lines_read,
Stream::StandardOutput(ptr) => ptr.lines_read,
Stream::InputFile(file) => (*file).peek_char(),
Stream::NamedTcp(tcp_stream) => (*tcp_stream).peek_char(),
Stream::NamedTls(tls_stream) => (*tls_stream).peek_char(),
- Stream::NamedHttpClient(http_stream) => (*http_stream).peek_char(),
+ Stream::HttpRead(http_stream) => (*http_stream).peek_char(),
Stream::Readline(rl_stream) => (*rl_stream).peek_char(),
Stream::StaticString(src) => (*src).peek_char(),
Stream::Byte(cursor) => (*cursor).peek_char(),
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
Stream::InputFile(file) => (*file).read_char(),
Stream::NamedTcp(tcp_stream) => (*tcp_stream).read_char(),
Stream::NamedTls(tls_stream) => (*tls_stream).read_char(),
- Stream::NamedHttpClient(http_stream) => (*http_stream).read_char(),
+ Stream::HttpRead(http_stream) => (*http_stream).read_char(),
Stream::Readline(rl_stream) => (*rl_stream).read_char(),
Stream::StaticString(src) => (*src).read_char(),
Stream::Byte(cursor) => (*cursor).read_char(),
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => Some(Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
Stream::InputFile(file) => file.put_back_char(c),
Stream::NamedTcp(tcp_stream) => tcp_stream.put_back_char(c),
Stream::NamedTls(tls_stream) => tls_stream.put_back_char(c),
- Stream::NamedHttpClient(http_stream) => http_stream.put_back_char(c),
+ Stream::HttpRead(http_stream) => http_stream.put_back_char(c),
Stream::Readline(rl_stream) => rl_stream.put_back_char(c),
Stream::StaticString(src) => src.put_back_char(c),
Stream::Byte(cursor) => cursor.put_back_char(c),
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => {}
}
}
Stream::InputFile(ref mut file) => file.consume(nread),
Stream::NamedTcp(ref mut tcp_stream) => tcp_stream.consume(nread),
Stream::NamedTls(ref mut tls_stream) => tls_stream.consume(nread),
- Stream::NamedHttpClient(ref mut http_stream) => http_stream.consume(nread),
+ Stream::HttpRead(ref mut http_stream) => http_stream.consume(nread),
Stream::Readline(ref mut rl_stream) => rl_stream.consume(nread),
Stream::StaticString(ref mut src) => src.consume(nread),
Stream::Byte(ref mut cursor) => cursor.consume(nread),
Stream::OutputFile(_) |
Stream::StandardError(_) |
Stream::StandardOutput(_) |
+ Stream::HttpWrite(_) |
Stream::Null(_) => {}
}
}
Stream::InputFile(file) => (*file).read(buf),
Stream::NamedTcp(tcp_stream) => (*tcp_stream).read(buf),
Stream::NamedTls(tls_stream) => (*tls_stream).read(buf),
- Stream::NamedHttpClient(http_stream) => (*http_stream).read(buf),
+ Stream::HttpRead(http_stream) => (*http_stream).read(buf),
Stream::Readline(rl_stream) => (*rl_stream).read(buf),
Stream::StaticString(src) => (*src).read(buf),
Stream::Byte(cursor) => (*cursor).read(buf),
Stream::OutputFile(_)
| Stream::StandardError(_)
- | Stream::StandardOutput(_)
+ | Stream::StandardOutput(_)
+ | Stream::HttpWrite(_)
| Stream::Null(_) => Err(std::io::Error::new(
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
Stream::Byte(ref mut cursor) => cursor.get_mut().write(buf),
Stream::StandardOutput(stream) => stream.write(buf),
Stream::StandardError(stream) => stream.write(buf),
- Stream::NamedHttpClient(_) |
+ Stream::HttpWrite(ref mut stream) => stream.get_mut().write(buf),
+ Stream::HttpRead(_) |
Stream::StaticString(_) |
Stream::Readline(_) |
Stream::InputFile(..) |
Stream::Byte(ref mut cursor) => cursor.stream.get_mut().flush(),
Stream::StandardError(stream) => stream.stream.flush(),
Stream::StandardOutput(stream) => stream.stream.flush(),
- Stream::NamedHttpClient(_) |
+ Stream::HttpWrite(ref mut stream) => stream.stream.get_mut().flush(),
+ Stream::HttpRead(_) |
Stream::StaticString(_) |
Stream::Readline(_) |
Stream::InputFile(_) |
Stream::StaticString(stream) => stream.past_end_of_stream,
Stream::NamedTcp(stream) => stream.past_end_of_stream,
Stream::NamedTls(stream) => stream.past_end_of_stream,
- Stream::NamedHttpClient(stream) => stream.past_end_of_stream,
+ Stream::HttpRead(stream) => stream.past_end_of_stream,
+ Stream::HttpWrite(stream) => stream.past_end_of_stream,
Stream::Null(_) => false,
Stream::Readline(stream) => stream.past_end_of_stream,
Stream::StandardOutput(stream) => stream.past_end_of_stream,
Stream::StaticString(stream) => stream.past_end_of_stream = value,
Stream::NamedTcp(stream) => stream.past_end_of_stream = value,
Stream::NamedTls(stream) => stream.past_end_of_stream = value,
- Stream::NamedHttpClient(stream) => stream.past_end_of_stream = value,
+ Stream::HttpRead(stream) => stream.past_end_of_stream = value,
+ Stream::HttpWrite(stream) => stream.past_end_of_stream = value,
Stream::Null(_) => {}
Stream::Readline(stream) => stream.past_end_of_stream = value,
Stream::StandardOutput(stream) => stream.past_end_of_stream = value,
Stream::Byte(_)
| Stream::Readline(_)
| Stream::StaticString(_)
- | Stream::NamedHttpClient(_)
+ | Stream::HttpRead(_)
| Stream::InputFile(..) => atom!("read"),
Stream::NamedTcp(..) | Stream::NamedTls(..) => atom!("read_append"),
Stream::OutputFile(file) if file.is_append => atom!("append"),
- Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => atom!("write"),
+ Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) | Stream::HttpWrite(_) => atom!("write"),
Stream::Null(_) => atom!(""),
}
}
http_stream: Box<dyn BufRead>,
arena: &mut Arena,
) -> Self {
- Stream::NamedHttpClient(arena_alloc!(
- StreamLayout::new(CharReader::new(NamedHttpClientStream {
+ Stream::HttpRead(arena_alloc!(
+ StreamLayout::new(CharReader::new(HttpReadStream {
url,
body_reader: http_stream
})),
))
}
+ #[inline]
+ pub(crate) fn from_http_sender(
+ body_writer: Sender,
+ arena: &mut Arena,
+ ) -> Self {
+ Stream::HttpWrite(arena_alloc!(
+ StreamLayout::new(CharReader::new(HttpWriteStream {
+ body_writer
+ })),
+ arena
+ ))
+ }
+
#[inline]
pub(crate) fn from_file_as_output(
file_name: Atom,
Stream::NamedTls(ref mut tls_stream) => {
tls_stream.inner_mut().tls_stream.shutdown()
}
- Stream::NamedHttpClient(ref mut http_stream) => {
+ Stream::HttpRead(ref mut http_stream) => {
unsafe {
http_stream.set_tag(ArenaHeaderTag::Dropped);
std::ptr::drop_in_place(&mut http_stream.inner_mut().body_reader as *mut _);
Ok(())
}
+ Stream::HttpWrite(ref mut http_stream) => {
+ unsafe {
+ http_stream.set_tag(ArenaHeaderTag::Dropped);
+ std::ptr::drop_in_place(&mut http_stream.inner_mut().body_writer as *mut _);
+ }
+
+ Ok(())
+ }
Stream::InputFile(mut file_stream) => {
// close the stream by dropping the inner File.
unsafe {
match self {
Stream::NamedTcp(..)
| Stream::NamedTls(..)
- | Stream::NamedHttpClient(..)
+ | Stream::HttpRead(..)
| Stream::Byte(_)
| Stream::Readline(_)
| Stream::StaticString(_)
Stream::StandardError(_)
| Stream::StandardOutput(_)
| Stream::NamedTcp(..)
- | Stream::NamedTls(..)
+ | Stream::NamedTls(..)
+ | Stream::HttpWrite(..)
| Stream::Byte(_)
| Stream::OutputFile(..) => true,
_ => false,
use crate::forms::*;
use crate::heap_iter::*;
use crate::heap_print::*;
+use crate::http::{self, HttpListener, HttpResponse};
use crate::instructions::*;
use crate::machine;
use crate::machine::{Machine, VERIFY_ATTR_INTERRUPT_LOC, get_structure_index};
use std::cell::Cell;
use std::cmp::Ordering;
use std::collections::BTreeSet;
-use std::convert::TryFrom;
+use std::convert::{TryFrom, Infallible};
use std::env;
use std::fs;
use std::hash::{BuildHasher, BuildHasherDefault};
use std::io::{ErrorKind, Read, Write};
use std::iter::{once, FromIterator};
use std::mem;
-use std::net::{TcpListener, TcpStream};
+use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs};
use std::num::NonZeroU32;
use std::ops::Sub;
use std::process;
use std::rc::Rc;
use std::str::FromStr;
+use std::sync::Arc;
use chrono::{offset::Local, DateTime};
use cpu_time::ProcessTime;
use roxmltree;
use select;
-use hyper::{Body, Client, HeaderMap, Method, Request, Uri};
+use hyper::{Body, Server, Client, HeaderMap, Method, Request, Response, Uri};
use hyper::header::{HeaderName, HeaderValue};
use hyper::body::Buf;
+use hyper::service::{make_service_fn, service_fn};
use hyper_tls::HttpsConnector;
+use tokio::sync::Mutex;
+use tokio::sync::mpsc::channel;
ref_thread_local! {
pub(crate) static managed RANDOM_STATE: RandState<'static> = RandState::new();
Ok(())
}
+ #[inline(always)]
+ pub(crate) fn http_listen(&mut self) -> CallResult {
+ let address_sink = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1]));
+ if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) {
+ let address_string = address_str.as_str();
+ let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) {
+ Some(addr) => addr,
+ _ => {
+ self.machine_st.fail = true;
+ return Ok(());
+ }
+ };
+
+ let (tx, rx) = channel(1);
+ let tx = Arc::new(Mutex::new(tx));
+
+ let _guard = self.runtime.enter();
+ let server = match Server::try_bind(&addr) {
+ Ok(server) => server,
+ Err(_) => {
+ return Err(self.machine_st.open_permission_error(address_sink, atom!("http_listen"), 2));
+ }
+ };
+
+ self.runtime.spawn(async move {
+ let make_svc = make_service_fn(move |_conn| {
+ let tx = tx.clone();
+ async move { Ok::<_, Infallible>(service_fn(move |req| http::serve_req(req, tx.clone()))) }
+ });
+ let server = server.serve(make_svc);
+
+ if let Err(_) = server.await {
+ eprintln!("server error");
+ }
+ });
+ let http_listener = HttpListener { incoming: rx };
+ let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena);
+ let addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2]));
+ self.machine_st.bind(addr.as_var().unwrap(), typed_arena_ptr_as_cell!(http_listener));
+ }
+ Ok(())
+ }
+
+ #[inline(always)]
+ pub(crate) fn http_accept(&mut self) -> CallResult {
+ let culprit = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1]));
+ let method = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2]));
+ let path = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[3]));
+ let query = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[5]));
+ let stream_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[6]));
+ let handle_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[7]));
+ read_heap_cell!(culprit,
+ (HeapCellValueTag::Cons, cons_ptr) => {
+ match_untyped_arena_ptr!(cons_ptr,
+ (ArenaHeaderTag::HttpListener, http_listener) => {
+ match http_listener.incoming.blocking_recv() {
+ Some(request) => {
+ let method_atom = match *request.request.method() {
+ Method::GET => atom!("get"),
+ Method::POST => atom!("post"),
+ Method::PUT => atom!("put"),
+ Method::DELETE => atom!("delete"),
+ Method::PATCH => atom!("patch"),
+ Method::HEAD => atom!("head"),
+ _ => unreachable!(),
+ };
+ let path_atom = self.machine_st.atom_tbl.build_with(request.request.uri().path());
+ let path_cell = atom_as_cstr_cell!(path_atom);
+ let headers: Vec<HeapCellValue> = request.request.headers().iter().map(|(header_name, header_value)| {
+ let h = self.machine_st.heap.len();
+
+ let header_term = functor!(
+ self.machine_st.atom_tbl.build_with(header_name.as_str()),
+ [cell(string_as_cstr_cell!(self.machine_st.atom_tbl.build_with(header_value.to_str().unwrap())))]
+ );
+
+ self.machine_st.heap.extend(header_term.into_iter());
+ str_loc_as_cell!(h)
+ }).collect();
+
+ let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
+
+ let query_str = request.request.uri().query().unwrap_or("");
+ let query_atom = self.machine_st.atom_tbl.build_with(query_str);
+ let query_cell = string_as_cstr_cell!(query_atom);
+
+ let hyper_req = request.request;
+ let buf = self.runtime.block_on(async {hyper::body::aggregate(hyper_req).await.unwrap()});
+ let reader = buf.reader();
+
+ let mut stream = Stream::from_http_stream(
+ path_atom,
+ Box::new(reader),
+ &mut self.machine_st.arena
+ );
+ *stream.options_mut() = StreamOptions::default();
+ stream.options_mut().set_stream_type(StreamType::Binary);
+ self.indices.streams.insert(stream);
+ let stream = stream_as_cell!(stream);
+
+ let handle = arena_alloc!(request.response, &mut self.machine_st.arena);
+
+ self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
+ self.machine_st.bind(path.as_var().unwrap(), path_cell);
+ unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
+ self.machine_st.bind(query.as_var().unwrap(), query_cell);
+ self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
+ self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
+ }
+ None => {
+ self.machine_st.fail = true;
+ }
+ }
+ }
+ _ => {
+ unreachable!();
+ }
+ );
+ }
+ _ => {
+ unreachable!();
+ }
+ );
+ Ok(())
+ }
+
+ #[inline(always)]
+ pub(crate) fn http_answer(&mut self) -> CallResult {
+ let culprit = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[1]));
+ let status_code = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[2]));
+ let status_code: u16 = match Number::try_from(status_code) {
+ Ok(Number::Fixnum(n)) => n.get_num() as u16,
+ Ok(Number::Integer(n)) => match n.to_u16() {
+ Some(u) => u,
+ _ => {
+ self.machine_st.fail = true;
+ return Ok(());
+ }
+ }
+ _ => unreachable!()
+ };
+ let stub_gen = || functor_stub(atom!("http_listen"), 2);
+ let headers = match self.machine_st.try_from_list(self.machine_st.registers[3], stub_gen) {
+ Ok(addrs) => {
+ let mut header_map = HeaderMap::new();
+ for heap_cell in addrs{
+ read_heap_cell!(heap_cell,
+ (HeapCellValueTag::Str, s) => {
+ let name = cell_as_atom_cell!(self.machine_st.heap[s]).get_name();
+ let value = self.machine_st.value_to_str_like(self.machine_st.heap[s + 1]).unwrap();
+ header_map.insert(HeaderName::from_str(name.as_str()).unwrap(), HeaderValue::from_str(value.as_str()).unwrap());
+ }
+ _ => {
+ unreachable!()
+ }
+ )
+ }
+ header_map
+ },
+ Err(e) => return Err(e)
+ };
+ let stream_addr = self.machine_st.store(self.machine_st.deref(self.machine_st.registers[4]));
+
+ read_heap_cell!(culprit,
+ (HeapCellValueTag::Cons, cons_ptr) => {
+ match_untyped_arena_ptr!(cons_ptr,
+ (ArenaHeaderTag::HttpResponse, http_response) => {
+ let mut response = Response::builder()
+ .status(status_code);
+ *response.headers_mut().unwrap() = headers;
+ let (sender, body) = Body::channel();
+ let response = response.body(body).unwrap();
+ http_response.blocking_send(response).unwrap();
+
+ let mut stream = Stream::from_http_sender(
+ sender,
+ &mut self.machine_st.arena
+ );
+ *stream.options_mut() = StreamOptions::default();
+ stream.options_mut().set_stream_type(StreamType::Binary);
+ self.indices.streams.insert(stream);
+ let stream = stream_as_cell!(stream);
+ self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
+ }
+ _ => {
+ unreachable!();
+ }
+ );
+ }
+ _ => {
+ unreachable!();
+ }
+ );
+
+ Ok(())
+ }
+
#[inline(always)]
pub(crate) fn current_time(&mut self) {
let timestamp = self.systemtime_to_timestamp(SystemTime::now());
#[allow(unused_braces)]
$code
}};
+ ($ptr:ident, HttpListener, $listener:ident, $code:expr) => {{
+ let payload_ptr = unsafe { std::mem::transmute::<_, *mut HttpListener>($ptr.payload_offset()) };
+ #[allow(unused_mut)]
+ let mut $listener = TypedArenaPtr::new(payload_ptr);
+ #[allow(unused_braces)]
+ $code
+ }};
+ ($ptr:ident, HttpResponse, $listener:ident, $code:expr) => {{
+ let payload_ptr = unsafe { std::mem::transmute::<_, *mut HttpResponse>($ptr.payload_offset()) };
+ #[allow(unused_mut)]
+ let mut $listener = TypedArenaPtr::new(payload_ptr);
+ #[allow(unused_braces)]
+ $code
+ }};
($ptr:ident, IndexPtr, $ip:ident, $code:expr) => {{
#[allow(unused_mut)]
let mut $ip = TypedArenaPtr::new(unsafe { std::mem::transmute::<_, *mut IndexPtr>($ptr.get_ptr()) });
| ArenaHeaderTag::OutputFileStream
| ArenaHeaderTag::NamedTcpStream
| ArenaHeaderTag::NamedTlsStream
- | ArenaHeaderTag::NamedHttpClientStream
+ | ArenaHeaderTag::HttpReadStream
+ | ArenaHeaderTag::HttpWriteStream
| ArenaHeaderTag::ReadlineStream
| ArenaHeaderTag::StaticStringStream
| ArenaHeaderTag::ByteStream