From: Nicolas Luck Date: Thu, 5 Oct 2023 10:06:30 +0000 (+0200) Subject: Merge branch 'master' into library-use-case X-Git-Tag: remove^2~10 X-Git-Url: https://git.sagredo.dev/?a=commitdiff_plain;h=640c637ca81c8fe3112bbff557619e6112df71b4;p=scryer-prolog.git Merge branch 'master' into library-use-case # Conflicts: # Cargo.lock # Cargo.toml # src/http.rs # src/machine/mock_wam.rs # src/machine/mod.rs # src/machine/system_calls.rs --- 640c637ca81c8fe3112bbff557619e6112df71b4 diff --cc Cargo.toml index 47e26ecc,5fb0db69..3f60f4bc --- a/Cargo.toml +++ b/Cargo.toml @@@ -64,12 -64,10 +64,11 @@@ smallvec = "1.8.0 static_assertions = "1.1.0" ryu = "1.0.9" futures = "0.3" +regex = "1.9.1" libloading = "0.7" derive_deref = "1.1.1" - http-body-util = "0.1.0-rc.2" bytes = "1" - dashu = { version = "0.3.1", git = "https://github.com/cmpute/dashu.git" } + dashu = "0.4.0" num-order = { version = "1.2.0" } rand = "0.8.5" @@@ -80,8 -78,7 +79,9 @@@ crossterm = { version = "0.20.0", optio ctrlc = { version = "3.2.2", optional = true } rustyline = { version = "12.0.0", optional = true } native-tls = { version = "0.2.4", optional = true } - hyper = { version = "=1.0.0-rc.4", features = ["full"], optional = true } - hyper-util = { git = "https://github.com/hyperium/hyper-util.git" } ++#hyper = { version = "=1.0.0-rc.4", features = ["full"], optional = true } ++#hyper-util = { git = "https://github.com/hyperium/hyper-util.git" } + warp = { version = "=0.3.5", features = ["tls"], optional = true } reqwest = { version = "0.11.18", features = ["blocking"], optional = true } tokio = { version = "1.28.2", features = ["full"] } diff --cc src/machine/mod.rs index 40a96d4e,665366c3..45688ff4 --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@@ -54,12 -51,11 +54,15 @@@ use ordered_float::OrderedFloat use std::cmp::Ordering; use std::env; +use std::io::Read; use std::path::PathBuf; use std::sync::atomic::AtomicBool; + +use self::config::MachineConfig; +use self::parsed_results::*; + use tokio::runtime::Runtime; + use rand::rngs::StdRng; + use rand::SeedableRng; lazy_static! { pub static ref INTERRUPT: AtomicBool = AtomicBool::new(false); @@@ -74,8 -70,10 +77,9 @@@ pub struct Machine pub(super) user_output: Stream, pub(super) user_error: Stream, pub(super) load_contexts: Vec, - pub(super) runtime: Runtime, #[cfg(feature = "ffi")] pub(super) foreign_function_table: ForeignFunctionTable, + pub(super) rng: StdRng, } #[derive(Debug)] @@@ -485,8 -472,10 +489,9 @@@ impl Machine user_output, user_error, load_contexts: vec![], - runtime, #[cfg(feature = "ffi")] foreign_function_table: Default::default(), + rng: StdRng::from_entropy(), }; let mut lib_path = current_dir(); diff --cc src/machine/system_calls.rs index 97888e8a,40881c54..86fededf --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@@ -92,17 -92,15 +92,16 @@@ use base64 use roxmltree; use select; - use bytes::Buf; - use http_body_util::BodyExt; #[cfg(feature = "http")] - use hyper::header::{HeaderName, HeaderValue}; + use warp::hyper::header::{HeaderValue, HeaderName}; #[cfg(feature = "http")] - use hyper::server::conn::http1; + use warp::hyper::{HeaderMap, Method}; #[cfg(feature = "http")] - use hyper::{HeaderMap, Method}; + use warp::{Buf, Filter}; #[cfg(feature = "http")] use reqwest::Url; - use hyper_util::rt::TokioIo; ++//use hyper_util::rt::TokioIo; + use futures::future; #[cfg(feature = "repl")] pub(crate) fn get_key() -> KeyEvent { @@@ -4429,64 -4411,124 +4412,128 @@@ impl Machine #[inline(always)] pub(crate) fn http_listen(&mut self) -> CallResult { let address_sink = self.deref_register(1); - if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) { - let address_string = address_str.as_str(); - let addr: SocketAddr = match address_string - .to_socket_addrs() - .ok() - .and_then(|mut s| s.next()) - { - Some(addr) => addr, - _ => { - self.machine_st.fail = true; - return Ok(()); - } - }; - - let (tx, rx) = std::sync::mpsc::sync_channel(1024); + let tls_key = self.deref_register(3); + let tls_cert = self.deref_register(4); + let content_length_limit = self.deref_register(5); + const CONTENT_LENGTH_LIMIT_DEFAULT: u64 = 32768; + let content_length_limit = match Number::try_from(content_length_limit) { + Ok(Number::Fixnum(n)) => if n.get_num() >= 0 { + n.get_num() as u64 + } else { + CONTENT_LENGTH_LIMIT_DEFAULT + }, + Ok(Number::Integer(n)) => { + let n: Result = (&*n).try_into(); + match n { + Ok(u) => u, + Err(_) => CONTENT_LENGTH_LIMIT_DEFAULT, + } + } + _ => CONTENT_LENGTH_LIMIT_DEFAULT, + }; + let ssl_server: Option<(String,String)> = { + match self.machine_st.value_to_str_like(tls_key) { + Some(key) => { + match self.machine_st.value_to_str_like(tls_cert) { + Some(cert) => { + let key_str = key.as_str(); + let cert_str = cert.as_str(); + + if key_str.is_empty() || cert_str.is_empty() { + None + } else { + Some((key_str.to_string(), cert_str.to_string())) + } + } + None => None + } + } + None => None + } + }; - let runtime = tokio::runtime::Handle::current(); - let _guard = runtime.enter(); + if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) { + let address_string = address_str.as_str(); + let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) { - Some(addr) => addr, - _ => { - self.machine_st.fail = true; - return Ok(()); - } - }; ++ Some(addr) => addr, ++ _ => { ++ self.machine_st.fail = true; ++ return Ok(()); ++ } ++ }; - let listener = match runtime - .block_on(async { tokio::net::TcpListener::bind(addr).await }) - { - Ok(listener) => listener, - Err(_) => { - return Err(self.machine_st.open_permission_error( - address_sink, - atom!("http_listen"), - 2, - )); - } - }; - let (tx, rx) = std::sync::mpsc::sync_channel(1024); ++ let (tx, rx) = std::sync::mpsc::sync_channel(1024); ++ ++ let runtime = tokio::runtime::Handle::current(); ++ let _guard = runtime.enter(); + + fn get_reader(body: impl Buf + Send + 'static) -> Box { + Box::new(body.reader()) + } + + let serve = warp::body::aggregate() + .and(warp::header::optional::(warp::http::header::CONTENT_LENGTH.as_str())) + .and(warp::method()) + .and(warp::header::headers_cloned()) + .and(warp::path::full()) + .and(warp::query::raw().or_else(|_| future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),))))) + .map(move |body, content_length, method, headers: warp::http::HeaderMap, path: warp::filters::path::FullPath, query| { + if let Some(content_length) = content_length { + if content_length > content_length_limit { + return warp::http::Response::builder() + .status(413) + .body(warp::hyper::Body::empty()) + .unwrap(); + } + } + + let http_request_data = HttpRequestData { + method, + headers, + path: path.as_str().to_string(), + query, + body: get_reader(body), + }; + let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new())); + let http_request = HttpRequest { request_data: http_request_data, response: Arc::clone(&response) }; + // we send the request to http_accept + tx.send(http_request).unwrap(); + + // we wait for the Response info from Prolog + { + let (ready, _response, cvar) = &*response; + let mut ready = ready.lock().unwrap(); + while !*ready { + ready = cvar.wait(ready).unwrap(); + } + } + { + let (_, response, _) = &*response; + let response = response.lock().unwrap().take(); + response.expect("Data race error in HTTP server") + } + }); + - self.runtime.spawn(async move { ++ runtime.spawn(async move { + match ssl_server { + Some((key, cert)) => { + warp::serve(serve).tls().key(key).cert(cert).run(addr).await + } + None => { + warp::serve(serve).run(addr).await + } + } + }); + + let http_listener = HttpListener { incoming: rx }; + let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena); - let addr = self.deref_register(2); - self.machine_st.bind( - addr.as_var().unwrap(), - typed_arena_ptr_as_cell!(http_listener), - ); + - runtime.spawn(async move { - loop { - let tx = tx.clone(); - let (stream, _) = listener.accept().await.unwrap(); - - tokio::task::spawn(async move { - let io = TokioIo::new(stream); - - if let Err(err) = http1::Builder::new() - .serve_connection(io, HttpService {tx}) - .await - { - eprintln!("Error serving connection: {:?}", err); - } - }); - } - }); - let http_listener = HttpListener { incoming: rx }; - let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena); - - let addr = self.deref_register(2); - self.machine_st.bind( - addr.as_var().unwrap(), - typed_arena_ptr_as_cell!(http_listener), - ); ++ let addr = self.deref_register(2); ++ self.machine_st.bind( ++ addr.as_var().unwrap(), ++ typed_arena_ptr_as_cell!(http_listener), ++ ); } Ok(()) }