From: Mark Thom Date: Wed, 30 Apr 2025 06:32:48 +0000 (-0700) Subject: Merge branch 'null-stream-safety' of https://github.com/adri326/scryer-prolog into... X-Git-Tag: v0.10.0~49^2 X-Git-Url: https://git.sagredo.dev/?a=commitdiff_plain;h=90f4716cc3a2271404baaf980a50ef6cc5116c3a;p=scryer-prolog.git Merge branch 'null-stream-safety' of https://github.com/adri326/scryer-prolog into adri326-null-stream-safety --- 90f4716cc3a2271404baaf980a50ef6cc5116c3a diff --cc src/ffi.rs index a3e93380,a3e93380..2895009b --- a/src/ffi.rs +++ b/src/ffi.rs @@@ -89,27 -89,27 +89,25 @@@ impl ForeignFunctionTable } fn map_type_ffi(&mut self, source: &Atom) -> *mut ffi_type { -- unsafe { -- match source { -- atom!("sint64") => addr_of_mut!(types::sint64), -- atom!("sint32") => addr_of_mut!(types::sint32), -- atom!("sint16") => addr_of_mut!(types::sint16), -- atom!("sint8") => addr_of_mut!(types::sint8), -- atom!("uint64") => addr_of_mut!(types::uint64), -- atom!("uint32") => addr_of_mut!(types::uint32), -- atom!("uint16") => addr_of_mut!(types::uint16), -- atom!("uint8") => addr_of_mut!(types::uint8), -- atom!("bool") => addr_of_mut!(types::sint8), -- atom!("void") => addr_of_mut!(types::void), -- atom!("cstr") => addr_of_mut!(types::pointer), -- atom!("ptr") => addr_of_mut!(types::pointer), -- atom!("f32") => addr_of_mut!(types::float), -- atom!("f64") => addr_of_mut!(types::double), -- struct_name => match self.structs.get_mut(&*struct_name.as_str()) { -- Some(ref mut struct_type) => &mut struct_type.ffi_type, -- None => unreachable!(), -- }, -- } ++ match source { ++ atom!("sint64") => addr_of_mut!(types::sint64), ++ atom!("sint32") => addr_of_mut!(types::sint32), ++ atom!("sint16") => addr_of_mut!(types::sint16), ++ atom!("sint8") => addr_of_mut!(types::sint8), ++ atom!("uint64") => addr_of_mut!(types::uint64), ++ atom!("uint32") => addr_of_mut!(types::uint32), ++ atom!("uint16") => addr_of_mut!(types::uint16), ++ atom!("uint8") => addr_of_mut!(types::uint8), ++ atom!("bool") => addr_of_mut!(types::sint8), ++ atom!("void") => addr_of_mut!(types::void), ++ atom!("cstr") => addr_of_mut!(types::pointer), ++ atom!("ptr") => addr_of_mut!(types::pointer), ++ atom!("f32") => addr_of_mut!(types::float), ++ atom!("f64") => addr_of_mut!(types::double), ++ struct_name => match self.structs.get_mut(&*struct_name.as_str()) { ++ Some(ref mut struct_type) => &mut struct_type.ffi_type, ++ None => unreachable!(), ++ }, } } diff --cc src/machine/mod.rs index 09d9f642,e2306283..da2ab6df --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@@ -490,16 -483,39 +490,22 @@@ impl Machine } } + /// Ensures that [`Machine::indices`] properly reflects + /// the streams stored in [`Machine::user_input`], [`Machine::user_output`] + /// and [`Machine::user_error`]. pub(crate) fn configure_streams(&mut self) { - self.user_input - .options_mut() - .set_alias_to_atom_opt(Some(atom!("user_input"))); - self.indices - .stream_aliases - .insert(atom!("user_input"), self.user_input); - - self.indices.streams.insert(self.user_input); - - self.user_output - .options_mut() - .set_alias_to_atom_opt(Some(atom!("user_output"))); - + .set_stream(atom!("user_input"), self.user_input); self.indices - .stream_aliases - .insert(atom!("user_output"), self.user_output); - - self.indices.streams.insert(self.user_output); - + .set_stream(atom!("user_output"), self.user_output); self.indices - .stream_aliases - .insert(atom!("user_error"), self.user_error); - - self.indices.streams.insert(self.user_error); + .set_stream(atom!("user_error"), self.user_error); + + let mut null_options = StreamOptions::default(); + null_options.set_alias_to_atom_opt(Some(atom!("null_stream"))); + + self.indices - .stream_aliases - .insert(atom!("null_stream"), Stream::Null(null_options)); ++ .set_stream(atom!("null_stream"), Stream::Null(null_options)); } #[inline(always)] diff --cc src/machine/streams.rs index 500ed679,54efbd39..678b2297 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@@ -964,14 -836,13 +964,14 @@@ impl Read for Stream ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, )), + Stream::OutputFile(_) + | Stream::StandardError(_) + | Stream::StandardOutput(_) - | Stream::Null(_) + | Stream::Callback(_) => Err(std::io::Error::new( + ErrorKind::PermissionDenied, + StreamError::ReadFromOutputStream, + )), + Stream::Null(_) => Ok(0), - Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => { - Err(std::io::Error::new( - ErrorKind::PermissionDenied, - StreamError::ReadFromOutputStream, - )) - } } } } @@@ -994,14 -864,10 +994,14 @@@ impl Write for Stream ErrorKind::PermissionDenied, StreamError::WriteToInputStream, )), + Stream::Null(_) => Ok(buf.len()), - Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(..) => Err( - std::io::Error::new(ErrorKind::PermissionDenied, StreamError::WriteToInputStream), - ), + Stream::StaticString(_) + | Stream::InputChannel(_) + | Stream::Readline(_) - | Stream::InputFile(..) - | Stream::Null(_) => Err(std::io::Error::new( ++ | Stream::InputFile(..) => Err(std::io::Error::new( + ErrorKind::PermissionDenied, + StreamError::WriteToInputStream, + )), } } @@@ -1022,14 -887,10 +1022,14 @@@ ErrorKind::PermissionDenied, StreamError::FlushToInputStream, )), + Stream::Null(_) => Ok(()), - Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(_) => Err( - std::io::Error::new(ErrorKind::PermissionDenied, StreamError::FlushToInputStream), - ), + Stream::StaticString(_) + | Stream::InputChannel(_) + | Stream::Readline(_) - | Stream::InputFile(_) - | Stream::Null(_) => Err(std::io::Error::new( ++ | Stream::InputFile(_) => Err(std::io::Error::new( + ErrorKind::PermissionDenied, + StreamError::FlushToInputStream, + )), } } } @@@ -1519,10 -1349,10 +1534,11 @@@ impl Stream Stream::HttpRead(..) => true, Stream::NamedTcp(..) | Stream::Byte(_) + | Stream::InputChannel(_) | Stream::Readline(_) | Stream::StaticString(_) - | Stream::InputFile(..) => true, + | Stream::InputFile(..) + | Stream::Null(_) => true, _ => false, } } @@@ -1538,8 -1368,8 +1554,9 @@@ | Stream::StandardOutput(_) | Stream::NamedTcp(..) | Stream::Byte(_) + | Stream::OutputFile(..) + | Stream::Callback(_) - | Stream::OutputFile(..) => true, + | Stream::Null(_) => true, _ => false, } } @@@ -1794,8 -1603,8 +1811,8 @@@ impl MachineState (HeapCellValueTag::Atom, (name, arity)) => { debug_assert_eq!(arity, 0); - return match stream_aliases.get(&name) { - Some(stream) => Ok(*stream), + return match indices.get_stream(name) { - Some(stream) if !stream.is_null_stream() => Ok(stream), ++ Some(stream) => Ok(stream), _ => { let stub = functor_stub(caller, arity); let addr = atom_as_cell!(name); @@@ -1812,8 -1621,8 +1829,8 @@@ debug_assert_eq!(arity, 0); - return match stream_aliases.get(&name) { - Some(stream) => Ok(*stream), + return match indices.get_stream(name) { - Some(stream) if !stream.is_null_stream() => Ok(stream), ++ Some(stream) => Ok(stream), _ => { let stub = functor_stub(caller, arity); let addr = atom_as_cell!(name); @@@ -2097,215 -1905,156 +2113,368 @@@ } #[cfg(test)] - mod tests { + mod test { - use super::*; + use crate::*; + use std::{cell::RefCell, io::Read, io::Write, rc::Rc}; + - fn succeeded(answer: Vec>) -> bool { + use crate::machine::config::*; + use crate::LeafAnswer; + ++ use super::{Stream, StreamOptions}; ++ ++ fn succeeded(answer: Vec>) -> bool { + // Ideally this should be a method in QueryState or LeafAnswer. + matches!( + answer[0].as_ref(), + Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. }) + ) + } + + fn is_successful(answer: &Result) -> bool { + matches!( + answer, + Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. }) + ) + } + + #[test] + #[cfg_attr(miri, ignore)] + fn user_input_string_stream() { + let streams = + StreamConfig::default().with_user_input(InputStreamConfig::string("a(1,2,3).")); + + let mut machine = MachineBuilder::default().with_streams(streams).build(); + + let complete_answer: Vec<_> = machine + .run_query(r#"current_input(_), \+ at_end_of_stream."#) + .collect(); + + assert!(succeeded(complete_answer)); + + let complete_answer: Vec<_> = machine.run_query("read(A).").collect(); + + assert_eq!( + complete_answer, + [Ok(LeafAnswer::from_bindings([( + "A", + Term::compound("a", [Term::integer(1), Term::integer(2), Term::integer(3),]) + )]))] + ); + + let complete_answer: Vec<_> = machine.run_query(r#"at_end_of_stream."#).collect(); + + assert!(succeeded(complete_answer)); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn user_input_channel_stream() { + let (mut user_input, channel_stream) = InputStreamConfig::channel(); + let streams = StreamConfig::default().with_user_input(channel_stream); + let mut machine = MachineBuilder::default().with_streams(streams).build(); + + let complete_answer: Vec<_> = machine + .run_query(r#"current_input(_), \+ at_end_of_stream."#) + .collect(); + + assert!(succeeded(complete_answer)); + + write!(user_input, "a(1,2,3).").unwrap(); + + let complete_answer: Vec<_> = machine + .run_query(r#"\+ at_end_of_stream, read(A)."#) + .collect(); + + assert_eq!( + complete_answer, + [Ok(LeafAnswer::from_bindings([( + "A", + Term::compound("a", [Term::integer(1), Term::integer(2), Term::integer(3),]) + )]))] + ); + + // End-of-data but not end-of-stream; + let complete_answer: Vec<_> = machine + .run_query( + r#" + use_module(library(charsio)), + current_input(In), get_n_chars(In, N, C), + N == 0, \+ at_end_of_stream. + "#, + ) + .collect(); + + assert!(succeeded(complete_answer)); + + // Dropping the sender closes the input + drop(user_input); + + let complete_answer: Vec<_> = machine + .run_query( + r#" + current_input(In), get_n_chars(In, N, _), + N == 0, at_end_of_stream. + "#, + ) + .collect(); + + assert!(succeeded(complete_answer)); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn user_output_callback_stream() { + let test_string = Rc::new(RefCell::new(String::new())); + + let streams = + StreamConfig::default().with_user_output(OutputStreamConfig::callback(Box::new({ + let test_string = test_string.clone(); + move |x| { + x.read_to_string(&mut test_string.borrow_mut()).unwrap(); + } + }))); + + let mut machine = MachineBuilder::default().with_streams(streams).build(); + + let complete_answer: Vec<_> = machine + .run_query(r#"current_output(Out), \+ at_end_of_stream(Out)."#) + .collect(); + + assert!(succeeded(complete_answer)); + + let complete_answer: Vec<_> = machine + .run_query(r#"write(asdf), nl, flush_output."#) + .collect(); + + assert!(succeeded(complete_answer)); + assert_eq!(test_string.borrow().as_str(), "asdf\n"); + + let complete_answer: Vec<_> = machine.run_query(r#"write(abcd), flush_output."#).collect(); + + assert!(succeeded(complete_answer)); + assert_eq!(test_string.borrow().as_str(), "asdf\nabcd"); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_memory_user_output_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query( + "\\+ \\+ (current_output(Stream), close(Stream)), write(user_output, hello).", + ) + .collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + + let mut actual = String::new(); + machine.user_output.read_to_string(&mut actual).unwrap(); + assert_eq!(actual, "hello"); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn current_input_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine.run_query("current_input(S).").collect::>(); + + assert_eq!(results.len(), 1); + assert!(is_successful(&results[0])); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_memory_user_output_stream_twice() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query("\\+ \\+ (current_output(Stream), close(Stream), close(Stream)).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn read_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine.run_query("get_code(C).").collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected read to succeed, got {:?}", + results[0] + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_realiased_stream() { + let mut machine = MachineBuilder::new().build(); + + let results = machine + .run_query( + r#" + \+ \+ ( + open("README.md", read, S, [alias(readme)]), + open(stream(S), read, _, [alias(another_alias)]), + close(S) + ), + open("README.md", read, _, [alias(readme)]). + "#, + ) + .collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn current_output_null_stream() { + // TODO: switch to a proper solution for configuring the machine with null streams + // once `StreamConfig` supports it. + let mut machine = MachineBuilder::new().build(); + machine.user_output = Stream::Null(StreamOptions::default()); + machine.configure_streams(); + + let results = machine.run_query("current_output(S).").collect::>(); + + assert_eq!(results.len(), 1); + assert!(is_successful(&results[0])); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_realiased_user_output() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query( + r#" + \+ \+ ( + open("README.md", read, S), + open(stream(S), read, _, [alias(user_output)]), + close(S) + ), + write(user_output, hello). + "#, + ) + .collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } ++ + #[test] + #[cfg_attr(miri, ignore)] + fn write_null_stream() { + // TODO: switch to a proper solution for configuring the machine with null streams + // once `StreamConfig` supports it. + let mut machine = MachineBuilder::new().build(); + machine.user_output = Stream::Null(StreamOptions::default()); + machine.configure_streams(); + + let results = machine.run_query("write(hello).").collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected write to succeed, got {:?}", + results[0] + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn put_code_null_stream() { + // TODO: switch to a proper solution for configuring the machine with null streams + // once `StreamConfig` supports it. + let mut machine = MachineBuilder::new().build(); + machine.user_output = Stream::Null(StreamOptions::default()); + machine.configure_streams(); + + let results = machine + .run_query("put_code(user_output, 65).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected write to succeed, got {:?}", + results[0] + ); + } + + /// A variant of the [`write_null_stream`] that tries to write to a (null) input stream. + #[test] + #[cfg_attr(miri, ignore)] + fn write_null_input_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query("current_input(Stream), write(Stream, hello).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected write to succeed, got {:?}", + results[0] + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn at_end_of_stream_0_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine.run_query("at_end_of_stream.").collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected at_end_of_stream to succeed, got {:?}", + results[0] + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn at_end_of_stream_1_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query("current_input(Stream), at_end_of_stream(Stream).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected at_end_of_stream to succeed, got {:?}", + results[0] + ); + } } diff --cc src/machine/system_calls.rs index abc8f0b0,11fe5c5f..dc5da32a --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@@ -3715,14 -3715,22 +3715,14 @@@ impl Machine #[inline(always)] pub(crate) fn first_stream(&mut self) { - let mut first_stream = None; - let mut null_streams = BTreeSet::new(); - - for stream in self.indices.streams.iter().cloned() { - if !stream.is_null_stream() { - first_stream = Some(stream); - break; - } else { - null_streams.insert(stream); - } - } - - self.indices.streams = self.indices.streams.sub(&null_streams); + let first_stream = self + .indices + .iter_streams(..) + .filter(|s| !s.is_null_stream()) + .next(); if let Some(first_stream) = first_stream { - let stream = stream_as_cell!(first_stream); + let stream = first_stream.into(); let var = self.deref_register(1).as_var().unwrap(); @@@ -3852,22 -3867,47 +3851,22 @@@ stream.flush().unwrap(); // 8.11.6.1b) } - self.indices.streams.remove(&stream); - - if stream == self.user_input { - self.user_input = self - .indices - .stream_aliases - .get(&atom!("user_input")) - .cloned() - .unwrap(); - - self.indices.streams.insert(self.user_input); - } else if stream == self.user_output { - self.user_output = self - .indices - .stream_aliases - .get(&atom!("user_output")) - .cloned() - .unwrap(); - - self.indices.streams.insert(self.user_output); + if stream == self.user_input || stream == self.user_output || stream.is_stderr() { + // stdin, stdout and stderr shouldn't be removed from the store, so return now + return Ok(()); } - if !stream.is_stdin() && !stream.is_stdout() && !stream.is_stderr() { - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.swap_remove(&alias); - } - - let close_result = stream.close(); - - if close_result.is_err() { - let stub = functor_stub(atom!("close"), 1); - let addr = stream.into(); - let err = self - .machine_st - .existence_error(ExistenceError::Stream(addr)); + self.indices.remove_stream(stream); - stream.close().map_err(|_| { - return Err(self.machine_st.error_form(err, stub)); - } - } ++ stream.close().or_else(|_| { + let stub = functor_stub(atom!("close"), 1); - let addr = stream_as_cell!(stream); ++ let addr = stream.into(); + let err = self + .machine_st + .existence_error(ExistenceError::Stream(addr)); - self.machine_st.error_form(err, stub) - Ok(()) ++ Err(self.machine_st.error_form(err, stub)) + }) } #[inline(always)] @@@ -4380,70 -4420,65 +4379,69 @@@ } // do it! - match futures::executor::block_on(req.send()) { - Ok(resp) => { - // status code - let status = resp.status().as_u16(); - self.machine_st - .unify_fixnum(Fixnum::build_with(status as i64), address_status); - // headers - let headers: Vec = resp - .headers() - .iter() - .map(|(header_name, header_value)| { - let h = self.machine_st.heap.len(); - - let header_term = functor!( - AtomTable::build_with( - &self.machine_st.atom_tbl, - header_name.as_str() - ), - [cell(string_as_cstr_cell!(AtomTable::build_with( - &self.machine_st.atom_tbl, - header_value.to_str().unwrap() - )))] - ); + task::block_in_place(move || { + match Handle::current().block_on(req.send()) { + Ok(resp) => { + // status code + let status = resp.status().as_u16(); + self.machine_st + .unify_fixnum(Fixnum::build_with(status as i64), address_status); + // headers + let headers: Vec = resp + .headers() + .iter() + .map(|(header_name, header_value)| { + let h = self.machine_st.heap.len(); + + let header_term = functor!( + AtomTable::build_with( + &self.machine_st.atom_tbl, + header_name.as_str() + ), + [cell(string_as_cstr_cell!(AtomTable::build_with( + &self.machine_st.atom_tbl, + header_value.to_str().unwrap() + )))] + ); - self.machine_st.heap.extend(header_term); - str_loc_as_cell!(h) - }) - .collect(); + self.machine_st.heap.extend(header_term); + str_loc_as_cell!(h) + }) + .collect(); - let headers_list = - iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); - unify!( - self.machine_st, - heap_loc_as_cell!(headers_list), - self.machine_st.registers[6] - ); - // body - let reader = futures::executor::block_on(resp.bytes()).unwrap().reader(); + let headers_list = + iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); - let mut stream = Stream::from_http_stream( - AtomTable::build_with(&self.machine_st.atom_tbl, &address_string), - reader, - &mut self.machine_st.arena, - ); - *stream.options_mut() = StreamOptions::default(); - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.insert(alias, stream); - } + unify!( + self.machine_st, + heap_loc_as_cell!(headers_list), + self.machine_st.registers[6] + ); - self.indices.streams.insert(stream); + // body + let reader = futures::executor::block_on(resp.bytes()).unwrap().reader(); - let stream_addr = self.deref_register(2); - self.machine_st - .bind(stream_addr.as_var().unwrap(), stream.into()); - } - Err(_) => { - self.machine_st.fail = true; + let mut stream = Stream::from_http_stream( + AtomTable::build_with(&self.machine_st.atom_tbl, &address_string), + reader, + &mut self.machine_st.arena, + ); + *stream.options_mut() = StreamOptions::default(); + + self.indices + .add_stream(stream, atom!("http_open"), 3) + .map_err(|stub_gen| stub_gen(&mut self.machine_st)) + .unwrap(); + - let stream = stream_as_cell!(stream); - + let stream_addr = self.deref_register(2); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); ++ self.machine_st ++ .bind(stream_addr.as_var().unwrap(), stream.into()); + } + Err(_) => { + self.machine_st.fail = true; + } } - } + }); } else { let err = self .machine_st @@@ -4634,39 -4669,36 +4632,39 @@@ let h = self.machine_st.heap.len(); let header_term = functor!(AtomTable::build_with(&self.machine_st.atom_tbl, header_name.as_str()), [cell(string_as_cstr_cell!(AtomTable::build_with(&self.machine_st.atom_tbl, header_value.to_str().unwrap())))]); -- self.machine_st.heap.extend(header_term.into_iter()); -- str_loc_as_cell!(h) -- }).collect(); ++ self.machine_st.heap.extend(header_term.into_iter()); ++ str_loc_as_cell!(h) ++ }).collect(); -- let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); ++ let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); -- let query_str = request.request_data.query; -- let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, &query_str); -- let query_cell = string_as_cstr_cell!(query_atom); ++ let query_str = request.request_data.query; ++ let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, &query_str); ++ let query_cell = string_as_cstr_cell!(query_atom); -- let mut stream = Stream::from_http_stream( -- path_atom, -- request.request_data.body, -- &mut self.machine_st.arena -- ); -- *stream.options_mut() = StreamOptions::default(); -- stream.options_mut().set_stream_type(StreamType::Binary); - self.indices.streams.insert(stream); - let stream: HeapCellValue = stream.into(); - - let handle: TypedArenaPtr = arena_alloc!(request.response, &mut self.machine_st.arena); - - self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom)); - self.machine_st.bind(path.as_var().unwrap(), path_cell); - unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]); - self.machine_st.bind(query.as_var().unwrap(), query_cell); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); - self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle)); - break - } ++ let mut stream = Stream::from_http_stream( ++ path_atom, ++ request.request_data.body, ++ &mut self.machine_st.arena ++ ); ++ *stream.options_mut() = StreamOptions::default(); ++ stream.options_mut().set_stream_type(StreamType::Binary); + - self.indices.add_stream(stream, atom!("http_accept"), 7) - .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; ++ self.indices.add_stream(stream, atom!("http_accept"), 7) ++ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; + - let stream = stream_as_cell!(stream); ++ let stream: HeapCellValue = stream.into(); + - let handle: TypedArenaPtr = arena_alloc!(request.response, &mut self.machine_st.arena); ++ let handle: TypedArenaPtr = arena_alloc!(request.response, &mut self.machine_st.arena); + - self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom)); - self.machine_st.bind(path.as_var().unwrap(), path_cell); - unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]); - self.machine_st.bind(query.as_var().unwrap(), query_cell); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); - self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle)); - break - } ++ self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom)); ++ self.machine_st.bind(path.as_var().unwrap(), path_cell); ++ unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]); ++ self.machine_st.bind(query.as_var().unwrap(), query_cell); ++ self.machine_st.bind(stream_addr.as_var().unwrap(), stream); ++ self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle)); ++ break ++ } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { let interrupted = machine::INTERRUPT.load(std::sync::atomic::Ordering::Relaxed); @@@ -4758,31 -4790,26 +4756,29 @@@ read_heap_cell!(culprit, (HeapCellValueTag::Cons, cons_ptr) => { - match_untyped_arena_ptr!(cons_ptr, - (ArenaHeaderTag::HttpResponse, http_response) => { - let mut stream = Stream::from_http_sender( - http_response, - status_code, - headers, - &mut self.machine_st.arena - ); - *stream.options_mut() = StreamOptions::default(); - stream.options_mut().set_stream_type(StreamType::Binary); - + match_untyped_arena_ptr!(cons_ptr, + (ArenaHeaderTag::HttpResponse, http_response) => { + let mut stream = Stream::from_http_sender( + http_response, + status_code, + headers, + &mut self.machine_st.arena + ); + - self.indices.add_stream(stream, atom!("http_answer"), 4) - .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; + *stream.options_mut() = StreamOptions::default(); + stream.options_mut().set_stream_type(StreamType::Binary); - self.indices.streams.insert(stream); + - let stream = stream_as_cell!(stream); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); - } - _ => { - unreachable!(); - } - ); ++ self.indices.add_stream(stream, atom!("http_answer"), 4) ++ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; + self.machine_st.bind(stream_addr.as_var().unwrap(), stream.into()); + } + _ => { + unreachable!(); + } + ); } _ => { - unreachable!(); + unreachable!(); } ); @@@ -6520,11 -6550,13 +6516,11 @@@ *stream.options_mut() = options; - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.insert(alias, stream); - } - - self.indices.streams.insert(stream); + self.indices + .add_stream(stream, atom!("socket_client_open"), 7) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; - stream_as_cell!(stream) + HeapCellValue::from(stream) } Err(ErrorKind::PermissionDenied) => { return Err(self.machine_st.open_permission_error( @@@ -6674,12 -6706,12 +6670,11 @@@ *tcp_stream.options_mut() = options; - if let Some(alias) = &tcp_stream.options().get_alias() { - self.indices.stream_aliases.insert(*alias, tcp_stream); - } - - self.indices.streams.insert(tcp_stream); + self.indices.add_stream(tcp_stream, atom!("socket_server_accept"), 4) + .map_err(|stub_gen| { + stub_gen(&mut self.machine_st) + })?; - let tcp_stream = stream_as_cell!(tcp_stream); let client = atom_as_cell!(client); let client_addr = self.deref_register(2); @@@ -6732,15 -6764,13 +6727,16 @@@ let addr = atom!("TLS"); let stream = Stream::from_tls_stream(addr, stream, &mut self.machine_st.arena); - self.indices.streams.insert(stream); + + self.indices + .add_stream(stream, atom!("tls_client_negotiate"), 3) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; - self.machine_st.heap.push(stream_as_cell!(stream)); + // FIXME: why are we pushing a random, unreferenced cell on the heap? + self.machine_st.heap.push(stream.into()); let stream_addr = self.deref_register(3); self.machine_st - .bind(stream_addr.as_var().unwrap(), stream_as_cell!(stream)); + .bind(stream_addr.as_var().unwrap(), stream.into()); Ok(()) } else {