From: Nicolas Luck Date: Wed, 13 Sep 2023 16:13:14 +0000 (+0200) Subject: Merge branch 'master' into library-use-case X-Git-Tag: remove^2~23 X-Git-Url: https://git.sagredo.dev/?a=commitdiff_plain;h=136463c92edb89944e5611e3acd56c0c165181a9;p=scryer-prolog.git Merge branch 'master' into library-use-case # Conflicts: # Cargo.toml # src/atom_table.rs # src/bin/scryer-prolog.rs # src/http.rs # src/machine/mock_wam.rs # src/machine/mod.rs # src/machine/system_calls.rs --- 136463c92edb89944e5611e3acd56c0c165181a9 diff --cc Cargo.lock index fd3211c1,5651a7a7..c05e1975 --- a/Cargo.lock +++ b/Cargo.lock @@@ -1607,26 -1574,6 +1606,26 @@@ dependencies = "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", - "syn 2.0.29", ++ "syn 2.0.32", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@@ -2023,9 -1942,9 +2024,10 @@@ dependencies = "libc", "libffi", "libloading", + "maplit", "modular-bitfield", "native-tls", + "num-order", "ordered-float", "phf 0.9.0", "predicates-core", diff --cc Cargo.toml index 7c257008,92c08572..47e26ecc --- a/Cargo.toml +++ b/Cargo.toml @@@ -82,8 -85,20 +87,20 @@@ tokio = { version = "1.28.2", features [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2.10", features = ["js"] } -tokio = { version = "1.28.2", features = ["sync", "macros", "io-util", "rt"] } +tokio = { version = "1.28.2", features = ["sync", "macros", "io-util", "rt", "time"] } + [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] + console_error_panic_hook = "0.1" + console_log = "1.0" + wasm-bindgen = "0.2.87" + wasm-bindgen-futures = "0.4" + serde-wasm-bindgen = "0.5" + web-sys = { version = "0.3", features = [ + "Document", + "Window", + "Element", + ]} + [target.'cfg(target_os = "wasi")'.dependencies] ring-wasi = { version = "0.16.25" } diff --cc src/bin/scryer-prolog.rs index 01291652,09465006..f138024b --- a/src/bin/scryer-prolog.rs +++ b/src/bin/scryer-prolog.rs @@@ -1,27 -1,13 +1,28 @@@ fn main() -> std::process::ExitCode { - use std::sync::atomic::Ordering; use scryer_prolog::*; + use scryer_prolog::atom_table::Atom; + use std::sync::atomic::Ordering; #[cfg(feature = "repl")] ctrlc::set_handler(move || { scryer_prolog::machine::INTERRUPT.store(true, Ordering::Relaxed); - }).unwrap(); + }) + .unwrap(); - let mut wam = machine::Machine::new(); - wam.run_top_level() + #[cfg(target_os = "wasi")] + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + #[cfg(not(target_os = "wasi"))] + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async move { + let mut wam = machine::Machine::new(Default::default()); + wam.run_top_level(atom!("$toplevel"), (atom!("$repl"), 1)) + }) } diff --cc src/http.rs index 8f548235,13d6ff31..6c0d40c8 --- a/src/http.rs +++ b/src/http.rs @@@ -27,28 -27,29 +27,31 @@@ impl Service> fo type Error = hyper::Error; type Future = Pin> + Send>>; - fn call(&mut self, req: Request) -> Self::Future { - // new connection! - // we send the Request info to Prolog - let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new())); - let http_request = HttpRequest { - request: req, - response: Arc::clone(&response), + fn call(self: &HttpService, req: Request) -> Self::Future { + // new connection! + // we send the Request info to Prolog + let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new())); - let http_request = HttpRequest { request: req, response: Arc::clone(&response) }; ++ let http_request = HttpRequest { ++ request: req, ++ response: Arc::clone(&response) + }; - self.tx.send(http_request).unwrap(); + self.tx.send(http_request).unwrap(); - // we wait for the Response info from Prolog - { - let (ready, _response, cvar) = &*response; - let mut ready = ready.lock().unwrap(); - while !*ready { - ready = cvar.wait(ready).unwrap(); - } - } - { - let (_, response, _) = &*response; - let response = response.lock().unwrap().take(); - let res = response.expect("Data race error in HTTP Server"); - Box::pin(async move { Ok(res) }) - } + // we wait for the Response info from Prolog + { + let (ready, _response, cvar) = &*response; + let mut ready = ready.lock().unwrap(); + while !*ready { - ready = cvar.wait(ready).unwrap(); ++ ready = cvar.wait(ready).unwrap(); + } + } + { + let (_, response, _) = &*response; + let response = response.lock().unwrap().take(); + let res = response.expect("Data race error in HTTP Server"); + Box::pin(async move { + Ok(res) + }) + } } } diff --cc src/loader.pl index c7479d50,0ba61117..69897a0c --- a/src/loader.pl +++ b/src/loader.pl @@@ -586,11 -595,9 +595,13 @@@ use_module(Module, Exports, Evacuable) ) ). +consult_stream(Stream, PathFileName) :- + '$push_load_state_payload'(Evacuable), + file_load(Stream, PathFileName, Subevacuable), + '$use_module'(Evacuable, Subevacuable, _). + :- non_counted_backtracking check_predicate_property/5. + check_predicate_property(meta_predicate, Module, Name, Arity, MetaPredicateTerm) :- '$meta_predicate_property'(Module, Name, Arity, MetaPredicateTerm). check_predicate_property(built_in, _, Name, Arity, built_in) :- diff --cc src/machine/lib_machine.rs index eac57668,00000000..c2b8966a mode 100644,000000..100644 --- a/src/machine/lib_machine.rs +++ b/src/machine/lib_machine.rs @@@ -1,379 -1,0 +1,380 @@@ +use std::collections::BTreeSet; + ++use crate::atom_table; +use crate::machine::BREAK_FROM_DISPATCH_LOOP_LOC; +use crate::machine::mock_wam::{CompositeOpDir, Term}; +use crate::parser::parser::{Parser, Tokens}; +use crate::read::write_term_to_heap; + +use super::{ + Machine, MachineConfig, QueryResult, QueryResolutionLine, + Atom, AtomCell, HeapCellValue, HeapCellValueTag, + streams::Stream +}; +use ref_thread_local::__Deref; +fn print_term(term: &Term) { + match term { + Term::Clause(clause, atom, terms) => { + println!("clause: {:?}", clause); + println!("atom: {:?}", atom.as_str()); + println!("terms: {:?}", terms); + + for term in terms { + print_term(term); + } + }, + Term::Cons(cons, term1, term2) => { + println!("constant: {:?}", cons); + println!("term1: {:?}", term1); + println!("term2: {:?}", term2); + }, + Term::Literal(cell, literal) => { + println!("Cell: {:?}", cell); + println!("Literal: {:?}", literal); + }, + Term::Var(var_reg, var_ptr) => { + println!("Var: {:?}, {:?}", var_reg.get(), var_ptr.deref()); + }, + Term::CompleteString(cell, atom) => { + println!("Cell: {:?}", cell); + println!("Atom: {:?}", atom.as_str()); + }, + _ => { + println!("Parsed query: {:?}", term); + } + } + +} + +impl Machine { + pub fn new_lib() -> Self { + Machine::new(MachineConfig::in_memory().with_toplevel(include_str!("../lib_toplevel.pl"))) + } + + pub fn load_module_string(&mut self, module_name: &str, program: String) { + let stream = Stream::from_owned_string(program, &mut self.machine_st.arena); + self.load_file(module_name, stream); + } + + pub fn consult_module_string(&mut self, module_name: &str, program: String) { + let stream = Stream::from_owned_string(program, &mut self.machine_st.arena); + self.machine_st.registers[1] = stream_as_cell!(stream); - self.machine_st.registers[2] = atom_as_cell!(self.machine_st.atom_tbl.build_with(module_name)); ++ self.machine_st.registers[2] = atom_as_cell!(&atom_table::AtomTable::build_with(&self.machine_st.atom_tbl, module_name)); + + self.run_module_predicate(atom!("loader"), (atom!("consult_stream"), 2)); + } + + pub fn run_query(&mut self, query: String) -> QueryResult { + //let input = format!("{}", query); + //println!("Running query: {}", input); + + // Parse the query so we can analyze and then call the term + let mut parser = Parser::new( + Stream::from_owned_string(query, &mut self.machine_st.arena), + &mut self.machine_st + ); + let op_dir = CompositeOpDir::new(&self.indices.op_dir, None); + let term = parser.read_term(&op_dir, Tokens::Default).expect("Failed to parse query"); + + // Write parsed term to heap + let term_write_result = write_term_to_heap(&term, &mut self.machine_st.heap, &mut self.machine_st.atom_tbl).expect("couldn't write term to heap"); + + // Set up registers + self.machine_st.registers[1] = self.machine_st.heap[term_write_result.heap_loc]; + self.machine_st.cp = BREAK_FROM_DISPATCH_LOOP_LOC; + self.machine_st.p = self.indices.code_dir.get(&(atom!("$call"), 1)).expect("couldn't get code index").local().unwrap(); + + // If we don't set this register, we get an error in write_term. + // It seems to be the register that holds max_depth + self.machine_st.registers[7] = 50.into(); + // Not setting this will cause: + // thread 'machine::lib_machine::tests::programatic_query' panicked at 'index out of bounds: the len is 10 but the index is 295', src/machine/machine_state_impl.rs:1479:56 + self.machine_st.registers[6] = 9.into(); + + + println!("running dispatch loop"); + self.dispatch_loop(); + println!("done"); + + let op_dir = &self.indices.op_dir; + let write_term_result = self.machine_st.write_term(op_dir); + println!("write_term_result: {:?}", write_term_result); + // => Err([HeapCellValue { tag: Atom, name: "error", arity: 2, m: false, f: false }, HeapCellValue { tag: Str, value: 13, m: false, f: false }, HeapCellValue { tag: Str, value: 16, m: false, f: false }, HeapCellValue { tag: Atom, name: "type_error", arity: 2, m: false, f: false }, HeapCellValue { tag: Atom, name: "list", arity: 0, m: false, f: false }, HeapCellValue { tag: Cons, ptr: 9, m: false, f: false }, HeapCellValue { tag: Atom, name: "/", arity: 2, m: false, f: false }, HeapCellValue { tag: Atom, name: "write_term", arity: 0, m: false, f: false }, HeapCellValue { tag: Fixnum, value: 2, m: false, f: false }]) + let printer = write_term_result + .expect("Couldn't get printer from write_term") + .expect("Couldn't get printer from write_term"); + + println!("Varnames: {:?}", printer.var_names); + println!("Printer: {:?}", printer); + + Err("not implementend".to_string()) + } + + pub fn parse_output(&self) -> QueryResult { + let output = self.get_user_output().trim().to_string(); + //println!("Output: {}", output); + if output.starts_with("error(") { + Err(output) + } else { + // Remove duplicate lines + Ok(output + // 1. Split into disjunct matches + .split(";") + .map(|s| s.trim()) + // 2. Dedupe through Set + .collect::>() + .iter() + .cloned() + // 3. Back to Vec + .collect::>() + .iter() + // 4. Trim and remove empty lines + .map(|s| s.trim()) + .map(|s| if s.ends_with(".") { s[..s.len()-1].to_string() } else { s.to_string() }) + .filter(|s| !s.is_empty()) + // 5. Parse into QueryResolutionLine + .map(QueryResolutionLine::try_from) + // 6. Remove lines that couldn't be parsed, so we still keep the ones they did + .filter_map(Result::ok) + .collect::>() + .into()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::machine::{QueryMatch, Value, QueryResolution}; + + #[tokio::test] + async fn programatic_query() { + let mut machine = Machine::new_lib(); + + machine.load_module_string( + "facts", + String::from( + r#" + triple("a", "p1", "b"). + triple("a", "p2", "b"). + "#, + ), + ); + + let query = String::from(r#"triple("a",P,"b")."#); + let output = machine.run_query(query); + assert_eq!( + output, + Ok(QueryResolution::Matches(vec![ + QueryMatch::from(btreemap! { + "P" => Value::from("p1"), + }), + QueryMatch::from(btreemap! { + "P" => Value::from("p2"), + }), + ])) + ); + + assert_eq!( + machine.run_query(String::from(r#"triple("a","p1","b")."#)), + Ok(QueryResolution::True) + ); + + assert_eq!( + machine.run_query(String::from(r#"triple("x","y","z")."#)), + Ok(QueryResolution::False) + ); + } + + #[tokio::test] + async fn failing_query() { + let mut machine = Machine::new_lib(); + let query = String::from(r#"triple("a",P,"b")."#); + let output = machine.run_query(query); + assert_eq!( + output, + Err(String::from("error(existence_error(procedure,triple/3),triple/3).")) + ); + } + + #[tokio::test] + async fn complex_results() { + let mut machine = Machine::new_lib(); + machine.load_module_string( + "facts", + r#" + :- discontiguous(subject_class/2). + :- discontiguous(constructor/2). + + subject_class("Todo", c). + constructor(c, '[{action: "addLink", source: "this", predicate: "todo://state", target: "todo://ready"}]'). + + subject_class("Recipe", xyz). + constructor(xyz, '[{action: "addLink", source: "this", predicate: "recipe://title", target: "literal://string:Meta%20Muffins"}]'). + "#.to_string()); + + let result = machine.run_query(String::from("subject_class(\"Todo\", C), constructor(C, Actions).")); + assert_eq!( + result, + Ok(QueryResolution::Matches(vec![ + QueryMatch::from(btreemap! { + "C" => Value::from("c"), + "Actions" => Value::from("[{action: \"addLink\", source: \"this\", predicate: \"todo://state\", target: \"todo://ready\"}]"), + }), + ])) + ); + + let result = machine.run_query(String::from("subject_class(\"Recipe\", C), constructor(C, Actions).")); + assert_eq!( + result, + Ok(QueryResolution::Matches(vec![ + QueryMatch::from(btreemap! { + "C" => Value::from("xyz"), + "Actions" => Value::from("[{action: \"addLink\", source: \"this\", predicate: \"recipe://title\", target: \"literal://string:Meta%20Muffins\"}]"), + }), + ])) + ); + + let result = machine.run_query(String::from("subject_class(Class, _).")); + assert_eq!( + result, + Ok(QueryResolution::Matches(vec![ + QueryMatch::from(btreemap! { + "Class" => Value::from("Recipe") + }), + QueryMatch::from(btreemap! { + "Class" => Value::from("Todo") + }), + ])) + ); + } + + #[tokio::test] + async fn list_results() { + let mut machine = Machine::new_lib(); + machine.load_module_string( + "facts", + r#" + list([1,2,3]). + "#.to_string()); + + let result = machine.run_query(String::from("list(X).")); + assert_eq!( + result, + Ok(QueryResolution::Matches(vec![ + QueryMatch::from(btreemap! { + "X" => Value::List(Vec::from([Value::from("1"), Value::from("2"), Value::from("3")])) + }), + ])) + ); + } + + + #[tokio::test] + async fn consult() { + let mut machine = Machine::new_lib(); + + machine.consult_module_string( + "facts", + String::from( + r#" + triple("a", "p1", "b"). + triple("a", "p2", "b"). + "#, + ), + ); + + let query = String::from(r#"triple("a",P,"b")."#); + let output = machine.run_query(query); + assert_eq!( + output, + Ok(QueryResolution::Matches(vec![ + QueryMatch::from(btreemap! { + "P" => Value::from("p1"), + }), + QueryMatch::from(btreemap! { + "P" => Value::from("p2"), + }), + ])) + ); + + assert_eq!( + machine.run_query(String::from(r#"triple("a","p1","b")."#)), + Ok(QueryResolution::True) + ); + + assert_eq!( + machine.run_query(String::from(r#"triple("x","y","z")."#)), + Ok(QueryResolution::False) + ); + + machine.consult_module_string( + "facts", + String::from( + r#" + triple("a", "new", "b"). + "#, + ), + ); + + assert_eq!( + machine.run_query(String::from(r#"triple("a","p1","b")."#)), + Ok(QueryResolution::False) + ); + + assert_eq!( + machine.run_query(String::from(r#"triple("a","new","b")."#)), + Ok(QueryResolution::True) + ); + + } + + #[ignore = "fails on windows"] + #[tokio::test] + async fn stress_integration_test() { + let mut machine = Machine::new_lib(); + + // File with test commands, i.e. program code to consult and queries to run + let code = include_str!("./lib_integration_test_commands.txt"); + + // Split the code into blocks + let blocks = code.split("====="); + + let mut i = 0; + // Iterate over the blocks + for block in blocks { + // Trim the block to remove any leading or trailing whitespace + let block = block.trim(); + + // Skip empty blocks + if block.is_empty() { + continue; + } + + // Check if the block is a query + if block.starts_with("query") { + // Extract the query from the block + let query = &block[5..]; + + i += 1; + println!("query #{}: {}", i, query); + // Parse and execute the query + let result = machine.run_query(query.to_string()); + + assert!(result.is_ok()); + + // Print the result + println!("{:?}", result); + } else if block.starts_with("consult") { + // Extract the code from the block + let code = &block[7..]; + + println!("load code: {}", code); + + // Load the code into the machine + machine.consult_module_string("facts", code.to_string()); + } + } + + } +} diff --cc src/machine/mock_wam.rs index 35e00eea,cd8f7836..7182d626 --- a/src/machine/mock_wam.rs +++ b/src/machine/mock_wam.rs @@@ -234,6 -332,19 +234,17 @@@ impl Machine self.load_file(file.into(), stream); self.user_output.bytes().map(|b| b.unwrap()).collect() } + + pub fn test_load_string(&mut self, code: &str) -> Vec { - use std::io::Read; - + let stream = Stream::from_owned_string( + code.to_owned(), + &mut self.machine_st.arena, + ); + + self.load_file("".into(), stream); + self.user_output.bytes().map(|b| b.unwrap()).collect() + } + } #[cfg(test)] diff --cc src/machine/mod.rs index e56e72bb,7e456d4e..40a96d4e --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@@ -5,8 -5,8 +5,9 @@@ pub mod code_walker #[macro_use] pub mod loader; pub mod compile; +pub mod config; pub mod copier; + pub mod disjuncts; pub mod dispatch; pub mod gc; pub mod heap; @@@ -17,9 -16,7 +18,8 @@@ pub mod machine_indices pub mod machine_state; pub mod machine_state_impl; pub mod mock_wam; +pub mod parsed_results; pub mod partial_string; - pub mod disjuncts; pub mod preprocessor; pub mod stack; pub mod streams; @@@ -239,7 -242,8 +247,9 @@@ impl Machine path_buf.push("src/toplevel.pl"); let path = path_buf.to_str().unwrap(); - let toplevel_stream = Stream::from_static_string(program, &mut self.machine_st.arena); - let toplevel_stream = - Stream::from_static_string(include_str!("../toplevel.pl"), &mut self.machine_st.arena); ++ let toplevel_stream = ++ Stream::from_static_string(program, &mut self.machine_st.arena); ++ self.load_file(path, toplevel_stream); @@@ -454,8 -469,9 +485,8 @@@ user_output, user_error, load_contexts: vec![], - runtime, #[cfg(feature = "ffi")] - foreign_function_table: Default::default(), + foreign_function_table: Default::default(), }; let mut lib_path = current_dir(); diff --cc src/machine/system_calls.rs index bd38455b,412afca6..97888e8a --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@@ -87,17 -92,16 +92,17 @@@ use base64 use roxmltree; use select; + use bytes::Buf; + use http_body_util::BodyExt; #[cfg(feature = "http")] - use hyper::server::conn::http1; + use hyper::header::{HeaderName, HeaderValue}; #[cfg(feature = "http")] - use hyper::header::{HeaderValue, HeaderName}; + use hyper::server::conn::http1; #[cfg(feature = "http")] use hyper::{HeaderMap, Method}; - use http_body_util::BodyExt; - use bytes::Buf; #[cfg(feature = "http")] use reqwest::Url; +use hyper_util::rt::TokioIo; #[cfg(feature = "repl")] pub(crate) fn get_key() -> KeyEvent { @@@ -4372,130 -4440,133 +4441,139 @@@ impl Machine self.machine_st.fail = true; return Ok(()); } - }; - - let (tx, rx) = std::sync::mpsc::sync_channel(1024); + }; - let runtime = tokio::runtime::Handle::current(); - let _guard = runtime.enter(); - let listener = match runtime.block_on(async { tokio::net::TcpListener::bind(addr).await }) { - Ok(listener) => listener, - Err(_) => { - return Err(self.machine_st.open_permission_error(address_sink, atom!("http_listen"), 2)); - } - }; + let (tx, rx) = std::sync::mpsc::sync_channel(1024); - let _guard = self.runtime.enter(); - let listener = match self - .runtime + - runtime.spawn(async move { - loop { - let tx = tx.clone(); - let (stream, _) = listener.accept().await.unwrap(); ++ let runtime = tokio::runtime::Handle::current(); ++ let _guard = runtime.enter(); + - tokio::task::spawn(async move { - let io = TokioIo::new(stream); ++ let listener = match runtime + .block_on(async { tokio::net::TcpListener::bind(addr).await }) + { + Ok(listener) => listener, + Err(_) => { + return Err(self.machine_st.open_permission_error( + address_sink, + atom!("http_listen"), + 2, + )); + } + }; - if let Err(err) = http1::Builder::new() - .serve_connection(io, HttpService { - tx - }) - .await - { - eprintln!("Error serving connection: {:?}", err); - self.runtime.spawn(async move { ++ runtime.spawn(async move { + loop { + let tx = tx.clone(); + let (stream, _) = listener.accept().await.unwrap(); - ++ + tokio::task::spawn(async move { ++ let io = TokioIo::new(stream); ++ + if let Err(err) = http1::Builder::new() - .serve_connection(stream, HttpService { tx }) ++ .serve_connection(io, HttpService {tx}) + .await + { + eprintln!("Error serving connection: {:?}", err); + } + }); } - }); - } - }); - let http_listener = HttpListener { incoming: rx }; - let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena); + }); + let http_listener = HttpListener { incoming: rx }; + let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena); ++ let addr = self.deref_register(2); - self.machine_st.bind(addr.as_var().unwrap(), typed_arena_ptr_as_cell!(http_listener)); - } - Ok(()) + self.machine_st.bind( + addr.as_var().unwrap(), + typed_arena_ptr_as_cell!(http_listener), + ); + } + Ok(()) } #[cfg(feature = "http")] #[inline(always)] pub(crate) fn http_accept(&mut self) -> CallResult { - let culprit = self.deref_register(1); - let method = self.deref_register(2); - let path = self.deref_register(3); - let query = self.deref_register(5); - let stream_addr = self.deref_register(6); - let handle_addr = self.deref_register(7); - read_heap_cell!(culprit, - (HeapCellValueTag::Cons, cons_ptr) => { - match_untyped_arena_ptr!(cons_ptr, - (ArenaHeaderTag::HttpListener, http_listener) => { - match http_listener.incoming.recv() { - Ok(request) => { - let method_atom = match *request.request.method() { - Method::GET => atom!("get"), - Method::POST => atom!("post"), - Method::PUT => atom!("put"), - Method::DELETE => atom!("delete"), - Method::PATCH => atom!("patch"), - Method::HEAD => atom!("head"), - _ => unreachable!(), - }; - let path_atom = self.machine_st.atom_tbl.build_with(request.request.uri().path()); - let path_cell = atom_as_cstr_cell!(path_atom); - let headers: Vec = request.request.headers().iter().map(|(header_name, header_value)| { - let h = self.machine_st.heap.len(); - - let header_term = functor!( - self.machine_st.atom_tbl.build_with(header_name.as_str()), - [cell(string_as_cstr_cell!(self.machine_st.atom_tbl.build_with(header_value.to_str().unwrap())))] - ); - - self.machine_st.heap.extend(header_term.into_iter()); - str_loc_as_cell!(h) - }).collect(); - - let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); - - let query_str = request.request.uri().query().unwrap_or(""); - let query_atom = self.machine_st.atom_tbl.build_with(query_str); - let query_cell = string_as_cstr_cell!(query_atom); - - let hyper_req = request.request; - let runtime = tokio::runtime::Handle::current(); - let buf = runtime.block_on(async {hyper_req.collect().await.unwrap().aggregate()}); - let reader = buf.reader(); - - let mut stream = Stream::from_http_stream( - path_atom, - Box::new(reader), - &mut self.machine_st.arena - ); - *stream.options_mut() = StreamOptions::default(); - stream.options_mut().set_stream_type(StreamType::Binary); - self.indices.streams.insert(stream); - let stream = stream_as_cell!(stream); - - let handle = arena_alloc!(request.response, &mut self.machine_st.arena); - - self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom)); - self.machine_st.bind(path.as_var().unwrap(), path_cell); - unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]); - self.machine_st.bind(query.as_var().unwrap(), query_cell); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); - self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle)); - } - Err(_) => { - self.machine_st.fail = true; - } - } - } - _ => { - unreachable!(); - } - ); - } - _ => { - unreachable!(); - } - ); - Ok(()) + let culprit = self.deref_register(1); + let method = self.deref_register(2); + let path = self.deref_register(3); + let query = self.deref_register(5); + let stream_addr = self.deref_register(6); + let handle_addr = self.deref_register(7); + read_heap_cell!(culprit, + (HeapCellValueTag::Cons, cons_ptr) => { + match_untyped_arena_ptr!(cons_ptr, + (ArenaHeaderTag::HttpListener, http_listener) => { + match http_listener.incoming.recv() { + Ok(request) => { + let method_atom = match *request.request.method() { + Method::GET => atom!("get"), + Method::POST => atom!("post"), + Method::PUT => atom!("put"), + Method::DELETE => atom!("delete"), + Method::PATCH => atom!("patch"), + Method::HEAD => atom!("head"), + _ => unreachable!(), + }; + let path_atom = AtomTable::build_with(&self.machine_st.atom_tbl, request.request.uri().path()); + let path_cell = atom_as_cstr_cell!(path_atom); + let headers: Vec = request.request.headers().iter().map(|(header_name, header_value)| { + let h = self.machine_st.heap.len(); + + let header_term = functor!( + AtomTable::build_with(&self.machine_st.atom_tbl, header_name.as_str()), + [cell(string_as_cstr_cell!(AtomTable::build_with(&self.machine_st.atom_tbl, header_value.to_str().unwrap())))] + ); + + self.machine_st.heap.extend(header_term.into_iter()); + str_loc_as_cell!(h) + }).collect(); + + let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); + + let query_str = request.request.uri().query().unwrap_or(""); + let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, query_str); + let query_cell = string_as_cstr_cell!(query_atom); + + let hyper_req = request.request; - let buf = self.runtime.block_on(async {hyper_req.collect().await.unwrap().aggregate()}); ++ let runtime = tokio::runtime::Handle::current(); ++ let buf = runtime.block_on(async {hyper_req.collect().await.unwrap().aggregate()}); + let reader = buf.reader(); + + let mut stream = Stream::from_http_stream( + path_atom, + Box::new(reader), + &mut self.machine_st.arena + ); + *stream.options_mut() = StreamOptions::default(); + stream.options_mut().set_stream_type(StreamType::Binary); + self.indices.streams.insert(stream); + let stream = stream_as_cell!(stream); + + let handle = arena_alloc!(request.response, &mut self.machine_st.arena); + + self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom)); + self.machine_st.bind(path.as_var().unwrap(), path_cell); + unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]); + self.machine_st.bind(query.as_var().unwrap(), query_cell); + self.machine_st.bind(stream_addr.as_var().unwrap(), stream); + self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle)); + } + Err(_) => { + self.machine_st.fail = true; + } + } + } + _ => { + unreachable!(); + } + ); + } + _ => { + unreachable!(); + } + ); + Ok(()) } #[cfg(feature = "http")]