use roxmltree;
use select;
- use bytes::Buf;
- use http_body_util::BodyExt;
#[cfg(feature = "http")]
- use hyper::header::{HeaderName, HeaderValue};
+ use warp::hyper::header::{HeaderValue, HeaderName};
#[cfg(feature = "http")]
- use hyper::server::conn::http1;
+ use warp::hyper::{HeaderMap, Method};
#[cfg(feature = "http")]
- use hyper::{HeaderMap, Method};
+ use warp::{Buf, Filter};
#[cfg(feature = "http")]
use reqwest::Url;
- use hyper_util::rt::TokioIo;
++//use hyper_util::rt::TokioIo;
+ use futures::future;
#[cfg(feature = "repl")]
pub(crate) fn get_key() -> KeyEvent {
#[inline(always)]
pub(crate) fn http_listen(&mut self) -> CallResult {
let address_sink = self.deref_register(1);
- if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) {
- let address_string = address_str.as_str();
- let addr: SocketAddr = match address_string
- .to_socket_addrs()
- .ok()
- .and_then(|mut s| s.next())
- {
- Some(addr) => addr,
- _ => {
- self.machine_st.fail = true;
- return Ok(());
- }
- };
-
- let (tx, rx) = std::sync::mpsc::sync_channel(1024);
+ let tls_key = self.deref_register(3);
+ let tls_cert = self.deref_register(4);
+ let content_length_limit = self.deref_register(5);
+ const CONTENT_LENGTH_LIMIT_DEFAULT: u64 = 32768;
+ let content_length_limit = match Number::try_from(content_length_limit) {
+ Ok(Number::Fixnum(n)) => if n.get_num() >= 0 {
+ n.get_num() as u64
+ } else {
+ CONTENT_LENGTH_LIMIT_DEFAULT
+ },
+ Ok(Number::Integer(n)) => {
+ let n: Result<u64, _> = (&*n).try_into();
+ match n {
+ Ok(u) => u,
+ Err(_) => CONTENT_LENGTH_LIMIT_DEFAULT,
+ }
+ }
+ _ => CONTENT_LENGTH_LIMIT_DEFAULT,
+ };
+ let ssl_server: Option<(String,String)> = {
+ match self.machine_st.value_to_str_like(tls_key) {
+ Some(key) => {
+ match self.machine_st.value_to_str_like(tls_cert) {
+ Some(cert) => {
+ let key_str = key.as_str();
+ let cert_str = cert.as_str();
+
+ if key_str.is_empty() || cert_str.is_empty() {
+ None
+ } else {
+ Some((key_str.to_string(), cert_str.to_string()))
+ }
+ }
+ None => None
+ }
+ }
+ None => None
+ }
+ };
- let runtime = tokio::runtime::Handle::current();
- let _guard = runtime.enter();
+ if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) {
+ let address_string = address_str.as_str();
+ let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) {
- Some(addr) => addr,
- _ => {
- self.machine_st.fail = true;
- return Ok(());
- }
- };
++ Some(addr) => addr,
++ _ => {
++ self.machine_st.fail = true;
++ return Ok(());
++ }
++ };
- let listener = match runtime
- .block_on(async { tokio::net::TcpListener::bind(addr).await })
- {
- Ok(listener) => listener,
- Err(_) => {
- return Err(self.machine_st.open_permission_error(
- address_sink,
- atom!("http_listen"),
- 2,
- ));
- }
- };
- let (tx, rx) = std::sync::mpsc::sync_channel(1024);
++ let (tx, rx) = std::sync::mpsc::sync_channel(1024);
++
++ let runtime = tokio::runtime::Handle::current();
++ let _guard = runtime.enter();
+
+ fn get_reader(body: impl Buf + Send + 'static) -> Box<dyn BufRead + Send> {
+ Box::new(body.reader())
+ }
+
+ let serve = warp::body::aggregate()
+ .and(warp::header::optional::<u64>(warp::http::header::CONTENT_LENGTH.as_str()))
+ .and(warp::method())
+ .and(warp::header::headers_cloned())
+ .and(warp::path::full())
+ .and(warp::query::raw().or_else(|_| future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),)))))
+ .map(move |body, content_length, method, headers: warp::http::HeaderMap, path: warp::filters::path::FullPath, query| {
+ if let Some(content_length) = content_length {
+ if content_length > content_length_limit {
+ return warp::http::Response::builder()
+ .status(413)
+ .body(warp::hyper::Body::empty())
+ .unwrap();
+ }
+ }
+
+ let http_request_data = HttpRequestData {
+ method,
+ headers,
+ path: path.as_str().to_string(),
+ query,
+ body: get_reader(body),
+ };
+ let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));
+ let http_request = HttpRequest { request_data: http_request_data, response: Arc::clone(&response) };
+ // we send the request to http_accept
+ tx.send(http_request).unwrap();
+
+ // we wait for the Response info from Prolog
+ {
+ let (ready, _response, cvar) = &*response;
+ let mut ready = ready.lock().unwrap();
+ while !*ready {
+ ready = cvar.wait(ready).unwrap();
+ }
+ }
+ {
+ let (_, response, _) = &*response;
+ let response = response.lock().unwrap().take();
+ response.expect("Data race error in HTTP server")
+ }
+ });
+
- self.runtime.spawn(async move {
++ runtime.spawn(async move {
+ match ssl_server {
+ Some((key, cert)) => {
+ warp::serve(serve).tls().key(key).cert(cert).run(addr).await
+ }
+ None => {
+ warp::serve(serve).run(addr).await
+ }
+ }
+ });
+
+ let http_listener = HttpListener { incoming: rx };
+ let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena);
- let addr = self.deref_register(2);
- self.machine_st.bind(
- addr.as_var().unwrap(),
- typed_arena_ptr_as_cell!(http_listener),
- );
+
- runtime.spawn(async move {
- loop {
- let tx = tx.clone();
- let (stream, _) = listener.accept().await.unwrap();
-
- tokio::task::spawn(async move {
- let io = TokioIo::new(stream);
-
- if let Err(err) = http1::Builder::new()
- .serve_connection(io, HttpService {tx})
- .await
- {
- eprintln!("Error serving connection: {:?}", err);
- }
- });
- }
- });
- let http_listener = HttpListener { incoming: rx };
- let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena);
-
- let addr = self.deref_register(2);
- self.machine_st.bind(
- addr.as_var().unwrap(),
- typed_arena_ptr_as_cell!(http_listener),
- );
++ let addr = self.deref_register(2);
++ self.machine_st.bind(
++ addr.as_var().unwrap(),
++ typed_arena_ptr_as_cell!(http_listener),
++ );
}
Ok(())
}