}
fn map_type_ffi(&mut self, source: &Atom) -> *mut ffi_type {
-- unsafe {
-- match source {
-- atom!("sint64") => addr_of_mut!(types::sint64),
-- atom!("sint32") => addr_of_mut!(types::sint32),
-- atom!("sint16") => addr_of_mut!(types::sint16),
-- atom!("sint8") => addr_of_mut!(types::sint8),
-- atom!("uint64") => addr_of_mut!(types::uint64),
-- atom!("uint32") => addr_of_mut!(types::uint32),
-- atom!("uint16") => addr_of_mut!(types::uint16),
-- atom!("uint8") => addr_of_mut!(types::uint8),
-- atom!("bool") => addr_of_mut!(types::sint8),
-- atom!("void") => addr_of_mut!(types::void),
-- atom!("cstr") => addr_of_mut!(types::pointer),
-- atom!("ptr") => addr_of_mut!(types::pointer),
-- atom!("f32") => addr_of_mut!(types::float),
-- atom!("f64") => addr_of_mut!(types::double),
-- struct_name => match self.structs.get_mut(&*struct_name.as_str()) {
-- Some(ref mut struct_type) => &mut struct_type.ffi_type,
-- None => unreachable!(),
-- },
-- }
++ match source {
++ atom!("sint64") => addr_of_mut!(types::sint64),
++ atom!("sint32") => addr_of_mut!(types::sint32),
++ atom!("sint16") => addr_of_mut!(types::sint16),
++ atom!("sint8") => addr_of_mut!(types::sint8),
++ atom!("uint64") => addr_of_mut!(types::uint64),
++ atom!("uint32") => addr_of_mut!(types::uint32),
++ atom!("uint16") => addr_of_mut!(types::uint16),
++ atom!("uint8") => addr_of_mut!(types::uint8),
++ atom!("bool") => addr_of_mut!(types::sint8),
++ atom!("void") => addr_of_mut!(types::void),
++ atom!("cstr") => addr_of_mut!(types::pointer),
++ atom!("ptr") => addr_of_mut!(types::pointer),
++ atom!("f32") => addr_of_mut!(types::float),
++ atom!("f64") => addr_of_mut!(types::double),
++ struct_name => match self.structs.get_mut(&*struct_name.as_str()) {
++ Some(ref mut struct_type) => &mut struct_type.ffi_type,
++ None => unreachable!(),
++ },
}
}
}
}
+ /// Ensures that [`Machine::indices`] properly reflects
+ /// the streams stored in [`Machine::user_input`], [`Machine::user_output`]
+ /// and [`Machine::user_error`].
pub(crate) fn configure_streams(&mut self) {
- self.user_input
- .options_mut()
- .set_alias_to_atom_opt(Some(atom!("user_input")));
-
self.indices
- .stream_aliases
- .insert(atom!("user_input"), self.user_input);
-
- self.indices.streams.insert(self.user_input);
-
- self.user_output
- .options_mut()
- .set_alias_to_atom_opt(Some(atom!("user_output")));
-
+ .set_stream(atom!("user_input"), self.user_input);
self.indices
- .stream_aliases
- .insert(atom!("user_output"), self.user_output);
-
- self.indices.streams.insert(self.user_output);
-
+ .set_stream(atom!("user_output"), self.user_output);
self.indices
- .stream_aliases
- .insert(atom!("user_error"), self.user_error);
-
- self.indices.streams.insert(self.user_error);
+ .set_stream(atom!("user_error"), self.user_error);
+
+ let mut null_options = StreamOptions::default();
+ null_options.set_alias_to_atom_opt(Some(atom!("null_stream")));
+
+ self.indices
- .stream_aliases
- .insert(atom!("null_stream"), Stream::Null(null_options));
++ .set_stream(atom!("null_stream"), Stream::Null(null_options));
}
#[inline(always)]
ErrorKind::PermissionDenied,
StreamError::ReadFromOutputStream,
)),
- | Stream::Null(_)
+ Stream::OutputFile(_)
+ | Stream::StandardError(_)
+ | Stream::StandardOutput(_)
- Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => {
- Err(std::io::Error::new(
- ErrorKind::PermissionDenied,
- StreamError::ReadFromOutputStream,
- ))
- }
+ | Stream::Callback(_) => Err(std::io::Error::new(
+ ErrorKind::PermissionDenied,
+ StreamError::ReadFromOutputStream,
+ )),
+ Stream::Null(_) => Ok(0),
}
}
}
ErrorKind::PermissionDenied,
StreamError::WriteToInputStream,
)),
- Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(..) => Err(
- std::io::Error::new(ErrorKind::PermissionDenied, StreamError::WriteToInputStream),
- ),
+ Stream::Null(_) => Ok(buf.len()),
- | Stream::InputFile(..)
- | Stream::Null(_) => Err(std::io::Error::new(
+ Stream::StaticString(_)
+ | Stream::InputChannel(_)
+ | Stream::Readline(_)
++ | Stream::InputFile(..) => Err(std::io::Error::new(
+ ErrorKind::PermissionDenied,
+ StreamError::WriteToInputStream,
+ )),
}
}
ErrorKind::PermissionDenied,
StreamError::FlushToInputStream,
)),
- Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(_) => Err(
- std::io::Error::new(ErrorKind::PermissionDenied, StreamError::FlushToInputStream),
- ),
+ Stream::Null(_) => Ok(()),
- | Stream::InputFile(_)
- | Stream::Null(_) => Err(std::io::Error::new(
+ Stream::StaticString(_)
+ | Stream::InputChannel(_)
+ | Stream::Readline(_)
++ | Stream::InputFile(_) => Err(std::io::Error::new(
+ ErrorKind::PermissionDenied,
+ StreamError::FlushToInputStream,
+ )),
}
}
}
Stream::HttpRead(..) => true,
Stream::NamedTcp(..)
| Stream::Byte(_)
+ | Stream::InputChannel(_)
| Stream::Readline(_)
| Stream::StaticString(_)
- | Stream::InputFile(..) => true,
+ | Stream::InputFile(..)
+ | Stream::Null(_) => true,
_ => false,
}
}
| Stream::StandardOutput(_)
| Stream::NamedTcp(..)
| Stream::Byte(_)
- | Stream::OutputFile(..) => true,
+ | Stream::OutputFile(..)
+ | Stream::Callback(_)
+ | Stream::Null(_) => true,
_ => false,
}
}
(HeapCellValueTag::Atom, (name, arity)) => {
debug_assert_eq!(arity, 0);
- return match stream_aliases.get(&name) {
- Some(stream) => Ok(*stream),
+ return match indices.get_stream(name) {
- Some(stream) if !stream.is_null_stream() => Ok(stream),
++ Some(stream) => Ok(stream),
_ => {
let stub = functor_stub(caller, arity);
let addr = atom_as_cell!(name);
debug_assert_eq!(arity, 0);
- return match stream_aliases.get(&name) {
- Some(stream) => Ok(*stream),
+ return match indices.get_stream(name) {
- Some(stream) if !stream.is_null_stream() => Ok(stream),
++ Some(stream) => Ok(stream),
_ => {
let stub = functor_stub(caller, arity);
let addr = atom_as_cell!(name);
}
#[cfg(test)]
- mod tests {
+ mod test {
- use super::*;
+ use crate::*;
+ use std::{cell::RefCell, io::Read, io::Write, rc::Rc};
+
- fn succeeded(answer: Vec<Result<LeafAnswer, Term>>) -> bool {
+ use crate::machine::config::*;
+ use crate::LeafAnswer;
+
++ use super::{Stream, StreamOptions};
++
++ fn succeeded<T>(answer: Vec<Result<LeafAnswer, T>>) -> bool {
+ // Ideally this should be a method in QueryState or LeafAnswer.
+ matches!(
+ answer[0].as_ref(),
+ Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. })
+ )
+ }
+
+ fn is_successful<T>(answer: &Result<LeafAnswer, T>) -> bool {
+ matches!(
+ answer,
+ Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. })
+ )
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn user_input_string_stream() {
+ let streams =
+ StreamConfig::default().with_user_input(InputStreamConfig::string("a(1,2,3)."));
+
+ let mut machine = MachineBuilder::default().with_streams(streams).build();
+
+ let complete_answer: Vec<_> = machine
+ .run_query(r#"current_input(_), \+ at_end_of_stream."#)
+ .collect();
+
+ assert!(succeeded(complete_answer));
+
+ let complete_answer: Vec<_> = machine.run_query("read(A).").collect();
+
+ assert_eq!(
+ complete_answer,
+ [Ok(LeafAnswer::from_bindings([(
+ "A",
+ Term::compound("a", [Term::integer(1), Term::integer(2), Term::integer(3),])
+ )]))]
+ );
+
+ let complete_answer: Vec<_> = machine.run_query(r#"at_end_of_stream."#).collect();
+
+ assert!(succeeded(complete_answer));
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn user_input_channel_stream() {
+ let (mut user_input, channel_stream) = InputStreamConfig::channel();
+ let streams = StreamConfig::default().with_user_input(channel_stream);
+ let mut machine = MachineBuilder::default().with_streams(streams).build();
+
+ let complete_answer: Vec<_> = machine
+ .run_query(r#"current_input(_), \+ at_end_of_stream."#)
+ .collect();
+
+ assert!(succeeded(complete_answer));
+
+ write!(user_input, "a(1,2,3).").unwrap();
+
+ let complete_answer: Vec<_> = machine
+ .run_query(r#"\+ at_end_of_stream, read(A)."#)
+ .collect();
+
+ assert_eq!(
+ complete_answer,
+ [Ok(LeafAnswer::from_bindings([(
+ "A",
+ Term::compound("a", [Term::integer(1), Term::integer(2), Term::integer(3),])
+ )]))]
+ );
+
+ // End-of-data but not end-of-stream;
+ let complete_answer: Vec<_> = machine
+ .run_query(
+ r#"
+ use_module(library(charsio)),
+ current_input(In), get_n_chars(In, N, C),
+ N == 0, \+ at_end_of_stream.
+ "#,
+ )
+ .collect();
+
+ assert!(succeeded(complete_answer));
+
+ // Dropping the sender closes the input
+ drop(user_input);
+
+ let complete_answer: Vec<_> = machine
+ .run_query(
+ r#"
+ current_input(In), get_n_chars(In, N, _),
+ N == 0, at_end_of_stream.
+ "#,
+ )
+ .collect();
+
+ assert!(succeeded(complete_answer));
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn user_output_callback_stream() {
+ let test_string = Rc::new(RefCell::new(String::new()));
+
+ let streams =
+ StreamConfig::default().with_user_output(OutputStreamConfig::callback(Box::new({
+ let test_string = test_string.clone();
+ move |x| {
+ x.read_to_string(&mut test_string.borrow_mut()).unwrap();
+ }
+ })));
+
+ let mut machine = MachineBuilder::default().with_streams(streams).build();
+
+ let complete_answer: Vec<_> = machine
+ .run_query(r#"current_output(Out), \+ at_end_of_stream(Out)."#)
+ .collect();
+
+ assert!(succeeded(complete_answer));
+
+ let complete_answer: Vec<_> = machine
+ .run_query(r#"write(asdf), nl, flush_output."#)
+ .collect();
+
+ assert!(succeeded(complete_answer));
+ assert_eq!(test_string.borrow().as_str(), "asdf\n");
+
+ let complete_answer: Vec<_> = machine.run_query(r#"write(abcd), flush_output."#).collect();
+
+ assert!(succeeded(complete_answer));
+ assert_eq!(test_string.borrow().as_str(), "asdf\nabcd");
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn close_memory_user_output_stream() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine
+ .run_query(
+ "\\+ \\+ (current_output(Stream), close(Stream)), write(user_output, hello).",
+ )
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(results[0].is_ok());
+
+ let mut actual = String::new();
+ machine.user_output.read_to_string(&mut actual).unwrap();
+ assert_eq!(actual, "hello");
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn current_input_null_stream() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine.run_query("current_input(S).").collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(is_successful(&results[0]));
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn close_memory_user_output_stream_twice() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine
+ .run_query("\\+ \\+ (current_output(Stream), close(Stream), close(Stream)).")
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(results[0].is_ok());
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn read_null_stream() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine.run_query("get_code(C).").collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(
+ is_successful(&results[0]),
+ "Expected read to succeed, got {:?}",
+ results[0]
+ );
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn close_realiased_stream() {
+ let mut machine = MachineBuilder::new().build();
+
+ let results = machine
+ .run_query(
+ r#"
+ \+ \+ (
+ open("README.md", read, S, [alias(readme)]),
+ open(stream(S), read, _, [alias(another_alias)]),
+ close(S)
+ ),
+ open("README.md", read, _, [alias(readme)]).
+ "#,
+ )
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(results[0].is_ok());
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn current_output_null_stream() {
+ // TODO: switch to a proper solution for configuring the machine with null streams
+ // once `StreamConfig` supports it.
+ let mut machine = MachineBuilder::new().build();
+ machine.user_output = Stream::Null(StreamOptions::default());
+ machine.configure_streams();
+
+ let results = machine.run_query("current_output(S).").collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(is_successful(&results[0]));
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn close_realiased_user_output() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine
+ .run_query(
+ r#"
+ \+ \+ (
+ open("README.md", read, S),
+ open(stream(S), read, _, [alias(user_output)]),
+ close(S)
+ ),
+ write(user_output, hello).
+ "#,
+ )
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(results[0].is_ok());
+ }
++
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn write_null_stream() {
+ // TODO: switch to a proper solution for configuring the machine with null streams
+ // once `StreamConfig` supports it.
+ let mut machine = MachineBuilder::new().build();
+ machine.user_output = Stream::Null(StreamOptions::default());
+ machine.configure_streams();
+
+ let results = machine.run_query("write(hello).").collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(
+ is_successful(&results[0]),
+ "Expected write to succeed, got {:?}",
+ results[0]
+ );
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn put_code_null_stream() {
+ // TODO: switch to a proper solution for configuring the machine with null streams
+ // once `StreamConfig` supports it.
+ let mut machine = MachineBuilder::new().build();
+ machine.user_output = Stream::Null(StreamOptions::default());
+ machine.configure_streams();
+
+ let results = machine
+ .run_query("put_code(user_output, 65).")
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(
+ is_successful(&results[0]),
+ "Expected write to succeed, got {:?}",
+ results[0]
+ );
+ }
+
+ /// A variant of the [`write_null_stream`] that tries to write to a (null) input stream.
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn write_null_input_stream() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine
+ .run_query("current_input(Stream), write(Stream, hello).")
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(
+ is_successful(&results[0]),
+ "Expected write to succeed, got {:?}",
+ results[0]
+ );
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn at_end_of_stream_0_null_stream() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine.run_query("at_end_of_stream.").collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(
+ is_successful(&results[0]),
+ "Expected at_end_of_stream to succeed, got {:?}",
+ results[0]
+ );
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn at_end_of_stream_1_null_stream() {
+ let mut machine = MachineBuilder::new()
+ .with_streams(StreamConfig::in_memory())
+ .build();
+
+ let results = machine
+ .run_query("current_input(Stream), at_end_of_stream(Stream).")
+ .collect::<Vec<_>>();
+
+ assert_eq!(results.len(), 1);
+ assert!(
+ is_successful(&results[0]),
+ "Expected at_end_of_stream to succeed, got {:?}",
+ results[0]
+ );
+ }
}
#[inline(always)]
pub(crate) fn first_stream(&mut self) {
- let mut first_stream = None;
- let mut null_streams = BTreeSet::new();
-
- for stream in self.indices.streams.iter().cloned() {
- if !stream.is_null_stream() {
- first_stream = Some(stream);
- break;
- } else {
- null_streams.insert(stream);
- }
- }
-
- self.indices.streams = self.indices.streams.sub(&null_streams);
+ let first_stream = self
+ .indices
+ .iter_streams(..)
+ .filter(|s| !s.is_null_stream())
+ .next();
if let Some(first_stream) = first_stream {
- let stream = stream_as_cell!(first_stream);
+ let stream = first_stream.into();
let var = self.deref_register(1).as_var().unwrap();
stream.flush().unwrap(); // 8.11.6.1b)
}
- self.indices.streams.remove(&stream);
-
- if stream == self.user_input {
- self.user_input = self
- .indices
- .stream_aliases
- .get(&atom!("user_input"))
- .cloned()
- .unwrap();
-
- self.indices.streams.insert(self.user_input);
- } else if stream == self.user_output {
- self.user_output = self
- .indices
- .stream_aliases
- .get(&atom!("user_output"))
- .cloned()
- .unwrap();
-
- self.indices.streams.insert(self.user_output);
+ if stream == self.user_input || stream == self.user_output || stream.is_stderr() {
+ // stdin, stdout and stderr shouldn't be removed from the store, so return now
+ return Ok(());
}
- if !stream.is_stdin() && !stream.is_stdout() && !stream.is_stderr() {
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.swap_remove(&alias);
- }
-
- let close_result = stream.close();
-
- if close_result.is_err() {
- let stub = functor_stub(atom!("close"), 1);
- let addr = stream.into();
- let err = self
- .machine_st
- .existence_error(ExistenceError::Stream(addr));
+ self.indices.remove_stream(stream);
- stream.close().map_err(|_| {
- return Err(self.machine_st.error_form(err, stub));
- }
- }
++ stream.close().or_else(|_| {
+ let stub = functor_stub(atom!("close"), 1);
- let addr = stream_as_cell!(stream);
++ let addr = stream.into();
+ let err = self
+ .machine_st
+ .existence_error(ExistenceError::Stream(addr));
- self.machine_st.error_form(err, stub)
- Ok(())
++ Err(self.machine_st.error_form(err, stub))
+ })
}
#[inline(always)]
}
// do it!
- match futures::executor::block_on(req.send()) {
- Ok(resp) => {
- // status code
- let status = resp.status().as_u16();
- self.machine_st
- .unify_fixnum(Fixnum::build_with(status as i64), address_status);
- // headers
- let headers: Vec<HeapCellValue> = resp
- .headers()
- .iter()
- .map(|(header_name, header_value)| {
- let h = self.machine_st.heap.len();
-
- let header_term = functor!(
- AtomTable::build_with(
- &self.machine_st.atom_tbl,
- header_name.as_str()
- ),
- [cell(string_as_cstr_cell!(AtomTable::build_with(
- &self.machine_st.atom_tbl,
- header_value.to_str().unwrap()
- )))]
- );
+ task::block_in_place(move || {
+ match Handle::current().block_on(req.send()) {
+ Ok(resp) => {
+ // status code
+ let status = resp.status().as_u16();
+ self.machine_st
+ .unify_fixnum(Fixnum::build_with(status as i64), address_status);
+ // headers
+ let headers: Vec<HeapCellValue> = resp
+ .headers()
+ .iter()
+ .map(|(header_name, header_value)| {
+ let h = self.machine_st.heap.len();
+
+ let header_term = functor!(
+ AtomTable::build_with(
+ &self.machine_st.atom_tbl,
+ header_name.as_str()
+ ),
+ [cell(string_as_cstr_cell!(AtomTable::build_with(
+ &self.machine_st.atom_tbl,
+ header_value.to_str().unwrap()
+ )))]
+ );
- self.machine_st.heap.extend(header_term);
- str_loc_as_cell!(h)
- })
- .collect();
+ self.machine_st.heap.extend(header_term);
+ str_loc_as_cell!(h)
+ })
+ .collect();
- let headers_list =
- iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
- unify!(
- self.machine_st,
- heap_loc_as_cell!(headers_list),
- self.machine_st.registers[6]
- );
- // body
- let reader = futures::executor::block_on(resp.bytes()).unwrap().reader();
+ let headers_list =
+ iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
- let mut stream = Stream::from_http_stream(
- AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
- reader,
- &mut self.machine_st.arena,
- );
- *stream.options_mut() = StreamOptions::default();
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.insert(alias, stream);
- }
+ unify!(
+ self.machine_st,
+ heap_loc_as_cell!(headers_list),
+ self.machine_st.registers[6]
+ );
- self.indices.streams.insert(stream);
+ // body
+ let reader = futures::executor::block_on(resp.bytes()).unwrap().reader();
- let stream_addr = self.deref_register(2);
- self.machine_st
- .bind(stream_addr.as_var().unwrap(), stream.into());
- }
- Err(_) => {
- self.machine_st.fail = true;
+ let mut stream = Stream::from_http_stream(
+ AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
+ reader,
+ &mut self.machine_st.arena,
+ );
+ *stream.options_mut() = StreamOptions::default();
+
+ self.indices
+ .add_stream(stream, atom!("http_open"), 3)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))
+ .unwrap();
+
- let stream = stream_as_cell!(stream);
-
+ let stream_addr = self.deref_register(2);
- self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
++ self.machine_st
++ .bind(stream_addr.as_var().unwrap(), stream.into());
+ }
+ Err(_) => {
+ self.machine_st.fail = true;
+ }
}
- }
+ });
} else {
let err = self
.machine_st
let h = self.machine_st.heap.len();
let header_term = functor!(AtomTable::build_with(&self.machine_st.atom_tbl, header_name.as_str()), [cell(string_as_cstr_cell!(AtomTable::build_with(&self.machine_st.atom_tbl, header_value.to_str().unwrap())))]);
-- self.machine_st.heap.extend(header_term.into_iter());
-- str_loc_as_cell!(h)
-- }).collect();
++ self.machine_st.heap.extend(header_term.into_iter());
++ str_loc_as_cell!(h)
++ }).collect();
-- let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
++ let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
-- let query_str = request.request_data.query;
-- let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, &query_str);
-- let query_cell = string_as_cstr_cell!(query_atom);
++ let query_str = request.request_data.query;
++ let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, &query_str);
++ let query_cell = string_as_cstr_cell!(query_atom);
-- let mut stream = Stream::from_http_stream(
-- path_atom,
-- request.request_data.body,
-- &mut self.machine_st.arena
-- );
-- *stream.options_mut() = StreamOptions::default();
-- stream.options_mut().set_stream_type(StreamType::Binary);
- self.indices.streams.insert(stream);
- let stream: HeapCellValue = stream.into();
-
- let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
-
- self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
- self.machine_st.bind(path.as_var().unwrap(), path_cell);
- unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
- self.machine_st.bind(query.as_var().unwrap(), query_cell);
- self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
- self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
- break
- }
++ let mut stream = Stream::from_http_stream(
++ path_atom,
++ request.request_data.body,
++ &mut self.machine_st.arena
++ );
++ *stream.options_mut() = StreamOptions::default();
++ stream.options_mut().set_stream_type(StreamType::Binary);
+
- self.indices.add_stream(stream, atom!("http_accept"), 7)
- .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
++ self.indices.add_stream(stream, atom!("http_accept"), 7)
++ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+
- let stream = stream_as_cell!(stream);
++ let stream: HeapCellValue = stream.into();
+
- let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
++ let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
+
- self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
- self.machine_st.bind(path.as_var().unwrap(), path_cell);
- unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
- self.machine_st.bind(query.as_var().unwrap(), query_cell);
- self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
- self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
- break
- }
++ self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
++ self.machine_st.bind(path.as_var().unwrap(), path_cell);
++ unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
++ self.machine_st.bind(query.as_var().unwrap(), query_cell);
++ self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
++ self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
++ break
++ }
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
let interrupted = machine::INTERRUPT.load(std::sync::atomic::Ordering::Relaxed);
read_heap_cell!(culprit,
(HeapCellValueTag::Cons, cons_ptr) => {
- match_untyped_arena_ptr!(cons_ptr,
- (ArenaHeaderTag::HttpResponse, http_response) => {
- let mut stream = Stream::from_http_sender(
- http_response,
- status_code,
- headers,
- &mut self.machine_st.arena
- );
- *stream.options_mut() = StreamOptions::default();
- stream.options_mut().set_stream_type(StreamType::Binary);
-
+ match_untyped_arena_ptr!(cons_ptr,
+ (ArenaHeaderTag::HttpResponse, http_response) => {
+ let mut stream = Stream::from_http_sender(
+ http_response,
+ status_code,
+ headers,
+ &mut self.machine_st.arena
+ );
+
- self.indices.add_stream(stream, atom!("http_answer"), 4)
- .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+ *stream.options_mut() = StreamOptions::default();
+ stream.options_mut().set_stream_type(StreamType::Binary);
- self.indices.streams.insert(stream);
+
- let stream = stream_as_cell!(stream);
- self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
- }
- _ => {
- unreachable!();
- }
- );
++ self.indices.add_stream(stream, atom!("http_answer"), 4)
++ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+ self.machine_st.bind(stream_addr.as_var().unwrap(), stream.into());
+ }
+ _ => {
+ unreachable!();
+ }
+ );
}
_ => {
- unreachable!();
+ unreachable!();
}
);
*stream.options_mut() = options;
- if let Some(alias) = stream.options().get_alias() {
- self.indices.stream_aliases.insert(alias, stream);
- }
-
- self.indices.streams.insert(stream);
+ self.indices
+ .add_stream(stream, atom!("socket_client_open"), 7)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
- stream_as_cell!(stream)
+ HeapCellValue::from(stream)
}
Err(ErrorKind::PermissionDenied) => {
return Err(self.machine_st.open_permission_error(
*tcp_stream.options_mut() = options;
- if let Some(alias) = &tcp_stream.options().get_alias() {
- self.indices.stream_aliases.insert(*alias, tcp_stream);
- }
-
- self.indices.streams.insert(tcp_stream);
+ self.indices.add_stream(tcp_stream, atom!("socket_server_accept"), 4)
+ .map_err(|stub_gen| {
+ stub_gen(&mut self.machine_st)
+ })?;
- let tcp_stream = stream_as_cell!(tcp_stream);
let client = atom_as_cell!(client);
let client_addr = self.deref_register(2);
let addr = atom!("TLS");
let stream = Stream::from_tls_stream(addr, stream, &mut self.machine_st.arena);
- self.indices.streams.insert(stream);
+
+ self.indices
+ .add_stream(stream, atom!("tls_client_negotiate"), 3)
+ .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
- self.machine_st.heap.push(stream_as_cell!(stream));
+ // FIXME: why are we pushing a random, unreferenced cell on the heap?
+ self.machine_st.heap.push(stream.into());
let stream_addr = self.deref_register(3);
self.machine_st
- .bind(stream_addr.as_var().unwrap(), stream_as_cell!(stream));
+ .bind(stream_addr.as_var().unwrap(), stream.into());
Ok(())
} else {