From 5d468d3e1989a11e12d6aa00a50ab7df4ba38fb7 Mon Sep 17 00:00:00 2001 From: revue_2_presse Date: Tue, 25 Feb 2025 20:19:14 +0100 Subject: [PATCH] Replace futures::executor::block_on with tokio::block_in_place --- src/machine/system_calls.rs | 111 +++++++++++++++------------- tests-pl/issue-http_open-hanging.pl | 23 ++++++ tests/scryer/helper.rs | 16 ++++ tests/scryer/issues.rs | 13 ++++ 4 files changed, 111 insertions(+), 52 deletions(-) create mode 100644 tests-pl/issue-http_open-hanging.pl diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 71e90182..abc8f0b0 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -91,6 +91,8 @@ use roxmltree; use futures::future; #[cfg(feature = "http")] use reqwest::Url; +use tokio::runtime::Handle; +use tokio::task; #[cfg(feature = "http")] use warp::hyper::header::{HeaderName, HeaderValue}; #[cfg(feature = "http")] @@ -4378,65 +4380,70 @@ impl Machine { } // do it! - match futures::executor::block_on(req.send()) { - Ok(resp) => { - // status code - let status = resp.status().as_u16(); - self.machine_st - .unify_fixnum(Fixnum::build_with(status as i64), address_status); - // headers - let headers: Vec = resp - .headers() - .iter() - .map(|(header_name, header_value)| { - let h = self.machine_st.heap.len(); - - let header_term = functor!( - AtomTable::build_with( - &self.machine_st.atom_tbl, - header_name.as_str() - ), - [cell(string_as_cstr_cell!(AtomTable::build_with( - &self.machine_st.atom_tbl, - header_value.to_str().unwrap() - )))] - ); + task::block_in_place(move || { + match Handle::current().block_on(req.send()) { + Ok(resp) => { + // status code + let status = resp.status().as_u16(); + self.machine_st + .unify_fixnum(Fixnum::build_with(status as i64), address_status); + // headers + let headers: Vec = resp + .headers() + .iter() + .map(|(header_name, header_value)| { + let h = self.machine_st.heap.len(); + + let header_term = functor!( + AtomTable::build_with( + &self.machine_st.atom_tbl, + header_name.as_str() + ), + [cell(string_as_cstr_cell!(AtomTable::build_with( + &self.machine_st.atom_tbl, + header_value.to_str().unwrap() + )))] + ); - self.machine_st.heap.extend(header_term); - str_loc_as_cell!(h) - }) - .collect(); + self.machine_st.heap.extend(header_term); + str_loc_as_cell!(h) + }) + .collect(); - let headers_list = - iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); - unify!( - self.machine_st, - heap_loc_as_cell!(headers_list), - self.machine_st.registers[6] - ); - // body - let reader = futures::executor::block_on(resp.bytes()).unwrap().reader(); + let headers_list = + iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); - let mut stream = Stream::from_http_stream( - AtomTable::build_with(&self.machine_st.atom_tbl, &address_string), - reader, - &mut self.machine_st.arena, - ); - *stream.options_mut() = StreamOptions::default(); + unify!( + self.machine_st, + heap_loc_as_cell!(headers_list), + self.machine_st.registers[6] + ); - self.indices - .add_stream(stream, atom!("http_open"), 3) - .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; + // body + let reader = futures::executor::block_on(resp.bytes()).unwrap().reader(); - let stream = stream_as_cell!(stream); + let mut stream = Stream::from_http_stream( + AtomTable::build_with(&self.machine_st.atom_tbl, &address_string), + reader, + &mut self.machine_st.arena, + ); + *stream.options_mut() = StreamOptions::default(); - let stream_addr = self.deref_register(2); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); - } - Err(_) => { - self.machine_st.fail = true; + self.indices + .add_stream(stream, atom!("http_open"), 3) + .map_err(|stub_gen| stub_gen(&mut self.machine_st)) + .unwrap(); + + let stream = stream_as_cell!(stream); + + let stream_addr = self.deref_register(2); + self.machine_st.bind(stream_addr.as_var().unwrap(), stream); + } + Err(_) => { + self.machine_st.fail = true; + } } - } + }); } else { let err = self .machine_st diff --git a/tests-pl/issue-http_open-hanging.pl b/tests-pl/issue-http_open-hanging.pl new file mode 100644 index 00000000..b26b5d06 --- /dev/null +++ b/tests-pl/issue-http_open-hanging.pl @@ -0,0 +1,23 @@ +:- module(http_open_hanging, [submit_request/0]). + +:- use_module(library(charsio)). +:- use_module(library(http/http_open)). + +send_request :- + Options = [ + method('get'), + status_code(StatusCode), + request_headers([]), + headers(_) + ], + http_open("https://scryer.pl", _Stream, Options), + write_term('received response with status code':StatusCode, []), nl. + +main :- + send_request, + send_request, + send_request, + send_request, + send_request. + +:- initialization(main). \ No newline at end of file diff --git a/tests/scryer/helper.rs b/tests/scryer/helper.rs index bb6402a1..b7c2dd77 100644 --- a/tests/scryer/helper.rs +++ b/tests/scryer/helper.rs @@ -1,3 +1,5 @@ +use scryer_prolog::MachineBuilder; + pub(crate) trait Expectable { #[track_caller] fn assert_eq(self, other: &[u8]); @@ -31,3 +33,17 @@ pub(crate) fn load_module_test(file: &str, expected: T) { let mut wam = MachineBuilder::default().build(); expected.assert_eq(wam.test_load_file(file).as_slice()); } + +/// Same as `load_module_test` with tokio runtime +#[cfg(not(target_arch = "wasm32"))] +pub(crate) fn load_module_test_with_tokio_runtime(file: &str, expected: T) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async move { + let mut wam = MachineBuilder::default().build(); + expected.assert_eq(wam.test_load_file(file).as_slice()) + }); +} diff --git a/tests/scryer/issues.rs b/tests/scryer/issues.rs index 784c20d0..974183a4 100644 --- a/tests/scryer/issues.rs +++ b/tests/scryer/issues.rs @@ -1,4 +1,6 @@ use crate::helper::load_module_test; +#[cfg(not(target_arch = "wasm32"))] +use crate::helper::load_module_test_with_tokio_runtime; use serial_test::serial; // issue #831 @@ -43,3 +45,14 @@ fn load_context_unreachable() { fn issue2725_dcg_without_module() { load_module_test("tests-pl/issue2725.pl", ""); } + +#[test] +#[cfg(feature = "http")] +#[cfg(not(target_arch = "wasm32"))] +#[cfg_attr(miri, ignore = "it takes too long to run")] +fn http_open_hanging() { + load_module_test_with_tokio_runtime( + "tests-pl/issue-http_open-hanging.pl", + "received response with status code:200\nreceived response with status code:200\nreceived response with status code:200\nreceived response with status code:200\nreceived response with status code:200\n" + ); +} -- 2.54.0