]> Repositorios git - scryer-prolog.git/commitdiff
Merge branch 'master' into library-use-case
authorNicolas Luck <[email protected]>
Thu, 5 Oct 2023 10:06:30 +0000 (12:06 +0200)
committerNicolas Luck <[email protected]>
Thu, 5 Oct 2023 10:06:47 +0000 (12:06 +0200)
# Conflicts:
# Cargo.lock
# Cargo.toml
# src/http.rs
# src/machine/mock_wam.rs
# src/machine/mod.rs
# src/machine/system_calls.rs

1  2 
Cargo.lock
Cargo.toml
src/loader.pl
src/machine/machine_state.rs
src/machine/mod.rs
src/machine/system_calls.rs

diff --cc Cargo.lock
Simple merge
diff --cc Cargo.toml
index 47e26eccc9f4d55c22578637835f2205a6bb958f,5fb0db694918627df88a21b96cffacb8fb456918..3f60f4bcd0daa592288e6293c3bc4fb3692c5b71
@@@ -64,12 -64,10 +64,11 @@@ smallvec = "1.8.0
  static_assertions = "1.1.0"
  ryu = "1.0.9"
  futures = "0.3"
 +regex = "1.9.1"
  libloading = "0.7"
  derive_deref = "1.1.1"
- http-body-util = "0.1.0-rc.2"
  bytes = "1"
- dashu = { version = "0.3.1", git = "https://github.com/cmpute/dashu.git"  }
+ dashu = "0.4.0"
  num-order = { version = "1.2.0" }
  rand = "0.8.5"
  
@@@ -80,8 -78,7 +79,9 @@@ crossterm = { version = "0.20.0", optio
  ctrlc = { version = "3.2.2", optional = true }
  rustyline = { version = "12.0.0", optional = true }
  native-tls = { version = "0.2.4", optional = true }
- hyper = { version = "=1.0.0-rc.4", features = ["full"], optional = true }
- hyper-util = { git = "https://github.com/hyperium/hyper-util.git" }
++#hyper = { version = "=1.0.0-rc.4", features = ["full"], optional = true }
++#hyper-util = { git = "https://github.com/hyperium/hyper-util.git" }
+ warp = { version = "=0.3.5", features = ["tls"], optional = true }
  reqwest = { version = "0.11.18", features = ["blocking"], optional = true }
  tokio = { version = "1.28.2", features = ["full"] }
  
diff --cc src/loader.pl
Simple merge
Simple merge
index 40a96d4e4a910b4d35136be4d04dbdc953c4a047,665366c331723fa9a9a87d803097bc018a4bd0df..45688ff4bbbafc0778eb93e742e765b3dcc5f3d8
@@@ -54,12 -51,11 +54,15 @@@ use ordered_float::OrderedFloat
  
  use std::cmp::Ordering;
  use std::env;
 +use std::io::Read;
  use std::path::PathBuf;
  use std::sync::atomic::AtomicBool;
 +
 +use self::config::MachineConfig;
 +use self::parsed_results::*;
+ use tokio::runtime::Runtime;
+ use rand::rngs::StdRng;
+ use rand::SeedableRng;
  
  lazy_static! {
      pub static ref INTERRUPT: AtomicBool = AtomicBool::new(false);
@@@ -74,8 -70,10 +77,9 @@@ pub struct Machine 
      pub(super) user_output: Stream,
      pub(super) user_error: Stream,
      pub(super) load_contexts: Vec<LoadContext>,
 -    pub(super) runtime: Runtime,
      #[cfg(feature = "ffi")]
      pub(super) foreign_function_table: ForeignFunctionTable,
+     pub(super) rng: StdRng,
  }
  
  #[derive(Debug)]
@@@ -485,8 -472,10 +489,9 @@@ impl Machine 
              user_output,
              user_error,
              load_contexts: vec![],
 -            runtime,
              #[cfg(feature = "ffi")]
              foreign_function_table: Default::default(),
+             rng: StdRng::from_entropy(),
          };
  
          let mut lib_path = current_dir();
index 97888e8a7ff334d71143fcc36121b440c9dd596d,40881c54df02105bed72384d3fa35a0705dc9996..86fededf7238c8343a970dabf24ff9ea16591038
@@@ -92,17 -92,15 +92,16 @@@ use base64
  use roxmltree;
  use select;
  
- use bytes::Buf;
- use http_body_util::BodyExt;
  #[cfg(feature = "http")]
- use hyper::header::{HeaderName, HeaderValue};
+ use warp::hyper::header::{HeaderValue, HeaderName};
  #[cfg(feature = "http")]
- use hyper::server::conn::http1;
+ use warp::hyper::{HeaderMap, Method};
  #[cfg(feature = "http")]
- use hyper::{HeaderMap, Method};
+ use warp::{Buf, Filter};
  #[cfg(feature = "http")]
  use reqwest::Url;
- use hyper_util::rt::TokioIo;
++//use hyper_util::rt::TokioIo;
+ use futures::future;
  
  #[cfg(feature = "repl")]
  pub(crate) fn get_key() -> KeyEvent {
@@@ -4429,64 -4411,124 +4412,128 @@@ impl Machine 
      #[inline(always)]
      pub(crate) fn http_listen(&mut self) -> CallResult {
          let address_sink = self.deref_register(1);
-         if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) {
-             let address_string = address_str.as_str();
-             let addr: SocketAddr = match address_string
-                 .to_socket_addrs()
-                 .ok()
-                 .and_then(|mut s| s.next())
-             {
-                 Some(addr) => addr,
-                 _ => {
-                     self.machine_st.fail = true;
-                     return Ok(());
-                 }
-             };
-             let (tx, rx) = std::sync::mpsc::sync_channel(1024);
+       let tls_key = self.deref_register(3);
+       let tls_cert = self.deref_register(4);
+       let content_length_limit = self.deref_register(5);
+       const CONTENT_LENGTH_LIMIT_DEFAULT: u64 = 32768;
+       let content_length_limit = match Number::try_from(content_length_limit) {
+           Ok(Number::Fixnum(n)) => if n.get_num() >= 0 {
+               n.get_num() as u64
+           } else {
+               CONTENT_LENGTH_LIMIT_DEFAULT
+           },
+           Ok(Number::Integer(n)) => {
+                 let n: Result<u64, _> = (&*n).try_into();
+               match n {
+                   Ok(u) => u,
+                   Err(_) => CONTENT_LENGTH_LIMIT_DEFAULT,
+               }
+           }
+           _ => CONTENT_LENGTH_LIMIT_DEFAULT,
+       };
  
+       let ssl_server: Option<(String,String)> = {
+           match self.machine_st.value_to_str_like(tls_key) {
+               Some(key) => {
+                   match self.machine_st.value_to_str_like(tls_cert) {
+                       Some(cert) => {
+                           let key_str = key.as_str();
+                           let cert_str = cert.as_str();
+                           if key_str.is_empty() || cert_str.is_empty() {
+                               None
+                           } else {
+                               Some((key_str.to_string(), cert_str.to_string()))
+                           }
+                       }
+                       None => None
+                   }
+               }
+               None => None
+           }
+       };
  
-             let runtime = tokio::runtime::Handle::current();
-               let _guard = runtime.enter();
+       if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) {
+           let address_string = address_str.as_str();
+           let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) {
 -              Some(addr) => addr,
 -                _ => {
 -                    self.machine_st.fail = true;
 -                    return Ok(());
 -                }
 -            };
++                  Some(addr) => addr,
++            _ => {
++                self.machine_st.fail = true;
++                return Ok(());
++            }
++        };
  
-             let listener = match runtime
-                 .block_on(async { tokio::net::TcpListener::bind(addr).await })
-             {
-                 Ok(listener) => listener,
-                 Err(_) => {
-                     return Err(self.machine_st.open_permission_error(
-                         address_sink,
-                         atom!("http_listen"),
-                         2,
-                     ));
-                 }
-             };
 -          let (tx, rx) = std::sync::mpsc::sync_channel(1024);
++        let (tx, rx) = std::sync::mpsc::sync_channel(1024);
++
++        let runtime = tokio::runtime::Handle::current();
++        let _guard = runtime.enter();
+           fn get_reader(body: impl Buf + Send + 'static) -> Box<dyn BufRead + Send> {
+               Box::new(body.reader())
+           }
+           let serve = warp::body::aggregate()
+               .and(warp::header::optional::<u64>(warp::http::header::CONTENT_LENGTH.as_str()))
+               .and(warp::method())
+               .and(warp::header::headers_cloned())
+               .and(warp::path::full())
+               .and(warp::query::raw().or_else(|_| future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),)))))
+               .map(move |body, content_length, method, headers: warp::http::HeaderMap, path: warp::filters::path::FullPath, query| {
+                   if let Some(content_length) = content_length {
+                       if content_length > content_length_limit {
+                           return warp::http::Response::builder()
+                               .status(413)
+                               .body(warp::hyper::Body::empty())
+                               .unwrap();
+                       }
+                   }
+                   
+                   let http_request_data = HttpRequestData {
+                       method,
+                       headers,
+                       path: path.as_str().to_string(),
+                       query,
+                       body: get_reader(body),
+                   };
+                   let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));
+                   let http_request = HttpRequest { request_data: http_request_data, response: Arc::clone(&response) };
+                   // we send the request to http_accept
+                   tx.send(http_request).unwrap();
+                   // we wait for the Response info from Prolog
+                   {
+                       let (ready, _response, cvar) = &*response;
+                       let mut ready = ready.lock().unwrap();
+                       while !*ready {
+                           ready = cvar.wait(ready).unwrap();
+                       }
+                   }
+                   {
+                       let (_, response, _) = &*response;
+                       let response = response.lock().unwrap().take();
+                       response.expect("Data race error in HTTP server")
+                   }
+               });
 -          self.runtime.spawn(async move {
++          runtime.spawn(async move {
+               match ssl_server {
+                   Some((key, cert)) => {
+                       warp::serve(serve).tls().key(key).cert(cert).run(addr).await
+                   }
+                   None => {
+                       warp::serve(serve).run(addr).await
+                   }
+               }
+           });
+           let http_listener = HttpListener { incoming: rx };
+           let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena);
 -            let addr = self.deref_register(2);
 -            self.machine_st.bind(
 -                addr.as_var().unwrap(),
 -                typed_arena_ptr_as_cell!(http_listener),
 -            );
 +
-             runtime.spawn(async move {
-                 loop {
-                     let tx = tx.clone();
-                     let (stream, _) = listener.accept().await.unwrap();
-         
-                     tokio::task::spawn(async move {
-                         let io = TokioIo::new(stream);
-         
-                         if let Err(err) = http1::Builder::new()
-                             .serve_connection(io, HttpService {tx})
-                             .await
-                         {
-                             eprintln!("Error serving connection: {:?}", err);
-                         }
-                     });
-                 }
-             });
-             let http_listener = HttpListener { incoming: rx };
-             let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena);
-             let addr = self.deref_register(2);
-             self.machine_st.bind(
-                 addr.as_var().unwrap(),
-                 typed_arena_ptr_as_cell!(http_listener),
-             );
++        let addr = self.deref_register(2);
++        self.machine_st.bind(
++            addr.as_var().unwrap(),
++            typed_arena_ptr_as_cell!(http_listener),
++        );
          }
          Ok(())
      }