]> Repositorios git - scryer-prolog.git/commitdiff
Merge branch 'null-stream-safety' of https://github.com/adri326/scryer-prolog into... adri326-null-stream-safety origin/adri326-null-stream-safety
authorMark Thom <[email protected]>
Wed, 30 Apr 2025 06:32:48 +0000 (23:32 -0700)
committerMark Thom <[email protected]>
Wed, 30 Apr 2025 06:32:48 +0000 (23:32 -0700)
1  2 
src/ffi.rs
src/lib/builtins.pl
src/machine/lib_machine/mod.rs
src/machine/mod.rs
src/machine/streams.rs
src/machine/system_calls.rs
src/macros.rs

diff --cc src/ffi.rs
index a3e933800adda8fb89fda716aa9a68db1cefa4d5,a3e933800adda8fb89fda716aa9a68db1cefa4d5..2895009b013c4722176fa175b4d409412798942d
@@@ -89,27 -89,27 +89,25 @@@ impl ForeignFunctionTable 
      }
  
      fn map_type_ffi(&mut self, source: &Atom) -> *mut ffi_type {
--        unsafe {
--            match source {
--                atom!("sint64") => addr_of_mut!(types::sint64),
--                atom!("sint32") => addr_of_mut!(types::sint32),
--                atom!("sint16") => addr_of_mut!(types::sint16),
--                atom!("sint8") => addr_of_mut!(types::sint8),
--                atom!("uint64") => addr_of_mut!(types::uint64),
--                atom!("uint32") => addr_of_mut!(types::uint32),
--                atom!("uint16") => addr_of_mut!(types::uint16),
--                atom!("uint8") => addr_of_mut!(types::uint8),
--                atom!("bool") => addr_of_mut!(types::sint8),
--                atom!("void") => addr_of_mut!(types::void),
--                atom!("cstr") => addr_of_mut!(types::pointer),
--                atom!("ptr") => addr_of_mut!(types::pointer),
--                atom!("f32") => addr_of_mut!(types::float),
--                atom!("f64") => addr_of_mut!(types::double),
--                struct_name => match self.structs.get_mut(&*struct_name.as_str()) {
--                    Some(ref mut struct_type) => &mut struct_type.ffi_type,
--                    None => unreachable!(),
--                },
--            }
++        match source {
++            atom!("sint64") => addr_of_mut!(types::sint64),
++            atom!("sint32") => addr_of_mut!(types::sint32),
++            atom!("sint16") => addr_of_mut!(types::sint16),
++            atom!("sint8") => addr_of_mut!(types::sint8),
++            atom!("uint64") => addr_of_mut!(types::uint64),
++            atom!("uint32") => addr_of_mut!(types::uint32),
++            atom!("uint16") => addr_of_mut!(types::uint16),
++            atom!("uint8") => addr_of_mut!(types::uint8),
++            atom!("bool") => addr_of_mut!(types::sint8),
++            atom!("void") => addr_of_mut!(types::void),
++            atom!("cstr") => addr_of_mut!(types::pointer),
++            atom!("ptr") => addr_of_mut!(types::pointer),
++            atom!("f32") => addr_of_mut!(types::float),
++            atom!("f64") => addr_of_mut!(types::double),
++            struct_name => match self.structs.get_mut(&*struct_name.as_str()) {
++                Some(ref mut struct_type) => &mut struct_type.ffi_type,
++                None => unreachable!(),
++            },
          }
      }
  
Simple merge
Simple merge
index 09d9f642363760cfdeb2619ea794bc9aee9e59f8,e2306283b5f7e2bd904262fc58864efa6b4aab0e..da2ab6df53b87c0016e4e863c7207b98fb846891
@@@ -490,16 -483,39 +490,22 @@@ impl Machine 
          }
      }
  
 +    /// Ensures that [`Machine::indices`] properly reflects
 +    /// the streams stored in [`Machine::user_input`], [`Machine::user_output`]
 +    /// and [`Machine::user_error`].
      pub(crate) fn configure_streams(&mut self) {
 -        self.user_input
 -            .options_mut()
 -            .set_alias_to_atom_opt(Some(atom!("user_input")));
 -
          self.indices
 -            .stream_aliases
 -            .insert(atom!("user_input"), self.user_input);
 -
 -        self.indices.streams.insert(self.user_input);
 -
 -        self.user_output
 -            .options_mut()
 -            .set_alias_to_atom_opt(Some(atom!("user_output")));
 -
 +            .set_stream(atom!("user_input"), self.user_input);
          self.indices
 -            .stream_aliases
 -            .insert(atom!("user_output"), self.user_output);
 -
 -        self.indices.streams.insert(self.user_output);
 -
 +            .set_stream(atom!("user_output"), self.user_output);
          self.indices
 -            .stream_aliases
 -            .insert(atom!("user_error"), self.user_error);
 -
 -        self.indices.streams.insert(self.user_error);
 +            .set_stream(atom!("user_error"), self.user_error);
+         let mut null_options = StreamOptions::default();
+         null_options.set_alias_to_atom_opt(Some(atom!("null_stream")));
+         self.indices
 -            .stream_aliases
 -            .insert(atom!("null_stream"), Stream::Null(null_options));
++            .set_stream(atom!("null_stream"), Stream::Null(null_options));
      }
  
      #[inline(always)]
index 500ed67930ac3d32a48ccfb544c1bb6a70380468,54efbd39cb065e185d8e737bd2a453acc85f2b16..678b2297b8e5089b1c90ccf3746da74cfe180845
@@@ -964,14 -836,13 +964,14 @@@ impl Read for Stream 
                  ErrorKind::PermissionDenied,
                  StreamError::ReadFromOutputStream,
              )),
-             | Stream::Null(_)
 +            Stream::OutputFile(_)
 +            | Stream::StandardError(_)
 +            | Stream::StandardOutput(_)
 -            Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => {
 -                Err(std::io::Error::new(
 -                    ErrorKind::PermissionDenied,
 -                    StreamError::ReadFromOutputStream,
 -                ))
 -            }
 +            | Stream::Callback(_) => Err(std::io::Error::new(
 +                ErrorKind::PermissionDenied,
 +                StreamError::ReadFromOutputStream,
 +            )),
+             Stream::Null(_) => Ok(0),
          }
      }
  }
@@@ -994,14 -864,10 +994,14 @@@ impl Write for Stream 
                  ErrorKind::PermissionDenied,
                  StreamError::WriteToInputStream,
              )),
 -            Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(..) => Err(
 -                std::io::Error::new(ErrorKind::PermissionDenied, StreamError::WriteToInputStream),
 -            ),
+             Stream::Null(_) => Ok(buf.len()),
-             | Stream::InputFile(..)
-             | Stream::Null(_) => Err(std::io::Error::new(
 +            Stream::StaticString(_)
 +            | Stream::InputChannel(_)
 +            | Stream::Readline(_)
++            | Stream::InputFile(..) => Err(std::io::Error::new(
 +                ErrorKind::PermissionDenied,
 +                StreamError::WriteToInputStream,
 +            )),
          }
      }
  
                  ErrorKind::PermissionDenied,
                  StreamError::FlushToInputStream,
              )),
 -            Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(_) => Err(
 -                std::io::Error::new(ErrorKind::PermissionDenied, StreamError::FlushToInputStream),
 -            ),
+             Stream::Null(_) => Ok(()),
-             | Stream::InputFile(_)
-             | Stream::Null(_) => Err(std::io::Error::new(
 +            Stream::StaticString(_)
 +            | Stream::InputChannel(_)
 +            | Stream::Readline(_)
++            | Stream::InputFile(_) => Err(std::io::Error::new(
 +                ErrorKind::PermissionDenied,
 +                StreamError::FlushToInputStream,
 +            )),
          }
      }
  }
@@@ -1519,10 -1349,10 +1534,11 @@@ impl Stream 
              Stream::HttpRead(..) => true,
              Stream::NamedTcp(..)
              | Stream::Byte(_)
 +            | Stream::InputChannel(_)
              | Stream::Readline(_)
              | Stream::StaticString(_)
-             | Stream::InputFile(..) => true,
+             | Stream::InputFile(..)
+             | Stream::Null(_) => true,
              _ => false,
          }
      }
              | Stream::StandardOutput(_)
              | Stream::NamedTcp(..)
              | Stream::Byte(_)
-             | Stream::OutputFile(..) => true,
+             | Stream::OutputFile(..)
 +            | Stream::Callback(_)
+             | Stream::Null(_) => true,
              _ => false,
          }
      }
@@@ -1794,8 -1603,8 +1811,8 @@@ impl MachineState 
                          (HeapCellValueTag::Atom, (name, arity)) => {
                              debug_assert_eq!(arity, 0);
  
 -                            return match stream_aliases.get(&name) {
 -                                Some(stream) => Ok(*stream),
 +                            return match indices.get_stream(name) {
-                                 Some(stream) if !stream.is_null_stream() => Ok(stream),
++                                Some(stream) => Ok(stream),
                                  _ => {
                                      let stub = functor_stub(caller, arity);
                                      let addr = atom_as_cell!(name);
  
                              debug_assert_eq!(arity, 0);
  
 -                            return match stream_aliases.get(&name) {
 -                                Some(stream) => Ok(*stream),
 +                            return match indices.get_stream(name) {
-                                 Some(stream) if !stream.is_null_stream() => Ok(stream),
++                                Some(stream) => Ok(stream),
                                  _ => {
                                      let stub = functor_stub(caller, arity);
                                      let addr = atom_as_cell!(name);
  }
  
  #[cfg(test)]
- mod tests {
+ mod test {
 -    use super::*;
 +    use crate::*;
 +    use std::{cell::RefCell, io::Read, io::Write, rc::Rc};
 +
-     fn succeeded(answer: Vec<Result<LeafAnswer, Term>>) -> bool {
+     use crate::machine::config::*;
+     use crate::LeafAnswer;
++    use super::{Stream, StreamOptions};
++
++    fn succeeded<T>(answer: Vec<Result<LeafAnswer, T>>) -> bool {
 +        // Ideally this should be a method in QueryState or LeafAnswer.
 +        matches!(
 +            answer[0].as_ref(),
 +            Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. })
 +        )
 +    }
 +
+     fn is_successful<T>(answer: &Result<LeafAnswer, T>) -> bool {
+         matches!(
+             answer,
+             Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. })
+         )
+     }
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn user_input_string_stream() {
 +        let streams =
 +            StreamConfig::default().with_user_input(InputStreamConfig::string("a(1,2,3)."));
 +
 +        let mut machine = MachineBuilder::default().with_streams(streams).build();
 +
 +        let complete_answer: Vec<_> = machine
 +            .run_query(r#"current_input(_), \+ at_end_of_stream."#)
 +            .collect();
 +
 +        assert!(succeeded(complete_answer));
 +
 +        let complete_answer: Vec<_> = machine.run_query("read(A).").collect();
 +
 +        assert_eq!(
 +            complete_answer,
 +            [Ok(LeafAnswer::from_bindings([(
 +                "A",
 +                Term::compound("a", [Term::integer(1), Term::integer(2), Term::integer(3),])
 +            )]))]
 +        );
 +
 +        let complete_answer: Vec<_> = machine.run_query(r#"at_end_of_stream."#).collect();
 +
 +        assert!(succeeded(complete_answer));
 +    }
 +
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn user_input_channel_stream() {
 +        let (mut user_input, channel_stream) = InputStreamConfig::channel();
 +        let streams = StreamConfig::default().with_user_input(channel_stream);
 +        let mut machine = MachineBuilder::default().with_streams(streams).build();
 +
 +        let complete_answer: Vec<_> = machine
 +            .run_query(r#"current_input(_), \+ at_end_of_stream."#)
 +            .collect();
 +
 +        assert!(succeeded(complete_answer));
 +
 +        write!(user_input, "a(1,2,3).").unwrap();
 +
 +        let complete_answer: Vec<_> = machine
 +            .run_query(r#"\+ at_end_of_stream, read(A)."#)
 +            .collect();
 +
 +        assert_eq!(
 +            complete_answer,
 +            [Ok(LeafAnswer::from_bindings([(
 +                "A",
 +                Term::compound("a", [Term::integer(1), Term::integer(2), Term::integer(3),])
 +            )]))]
 +        );
 +
 +        // End-of-data but not end-of-stream;
 +        let complete_answer: Vec<_> = machine
 +            .run_query(
 +                r#"
 +                use_module(library(charsio)),
 +                current_input(In), get_n_chars(In, N, C),
 +                N == 0, \+ at_end_of_stream.
 +            "#,
 +            )
 +            .collect();
 +
 +        assert!(succeeded(complete_answer));
 +
 +        // Dropping the sender closes the input
 +        drop(user_input);
 +
 +        let complete_answer: Vec<_> = machine
 +            .run_query(
 +                r#"
 +                current_input(In), get_n_chars(In, N, _),
 +                N == 0, at_end_of_stream.
 +            "#,
 +            )
 +            .collect();
 +
 +        assert!(succeeded(complete_answer));
 +    }
 +
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn user_output_callback_stream() {
 +        let test_string = Rc::new(RefCell::new(String::new()));
 +
 +        let streams =
 +            StreamConfig::default().with_user_output(OutputStreamConfig::callback(Box::new({
 +                let test_string = test_string.clone();
 +                move |x| {
 +                    x.read_to_string(&mut test_string.borrow_mut()).unwrap();
 +                }
 +            })));
 +
 +        let mut machine = MachineBuilder::default().with_streams(streams).build();
 +
 +        let complete_answer: Vec<_> = machine
 +            .run_query(r#"current_output(Out), \+ at_end_of_stream(Out)."#)
 +            .collect();
 +
 +        assert!(succeeded(complete_answer));
 +
 +        let complete_answer: Vec<_> = machine
 +            .run_query(r#"write(asdf), nl, flush_output."#)
 +            .collect();
 +
 +        assert!(succeeded(complete_answer));
 +        assert_eq!(test_string.borrow().as_str(), "asdf\n");
 +
 +        let complete_answer: Vec<_> = machine.run_query(r#"write(abcd), flush_output."#).collect();
 +
 +        assert!(succeeded(complete_answer));
 +        assert_eq!(test_string.borrow().as_str(), "asdf\nabcd");
 +    }
 +
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn close_memory_user_output_stream() {
 +        let mut machine = MachineBuilder::new()
 +            .with_streams(StreamConfig::in_memory())
 +            .build();
 +
 +        let results = machine
 +            .run_query(
 +                "\\+ \\+ (current_output(Stream), close(Stream)), write(user_output, hello).",
 +            )
 +            .collect::<Vec<_>>();
 +
 +        assert_eq!(results.len(), 1);
 +        assert!(results[0].is_ok());
 +
 +        let mut actual = String::new();
 +        machine.user_output.read_to_string(&mut actual).unwrap();
 +        assert_eq!(actual, "hello");
 +    }
 +
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn current_input_null_stream() {
+         let mut machine = MachineBuilder::new()
+             .with_streams(StreamConfig::in_memory())
+             .build();
+         let results = machine.run_query("current_input(S).").collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(is_successful(&results[0]));
+     }
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn close_memory_user_output_stream_twice() {
 +        let mut machine = MachineBuilder::new()
 +            .with_streams(StreamConfig::in_memory())
 +            .build();
 +
 +        let results = machine
 +            .run_query("\\+ \\+ (current_output(Stream), close(Stream), close(Stream)).")
 +            .collect::<Vec<_>>();
 +
 +        assert_eq!(results.len(), 1);
 +        assert!(results[0].is_ok());
 +    }
 +
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn read_null_stream() {
+         let mut machine = MachineBuilder::new()
+             .with_streams(StreamConfig::in_memory())
+             .build();
+         let results = machine.run_query("get_code(C).").collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(
+             is_successful(&results[0]),
+             "Expected read to succeed, got {:?}",
+             results[0]
+         );
+     }
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn close_realiased_stream() {
 +        let mut machine = MachineBuilder::new().build();
 +
 +        let results = machine
 +            .run_query(
 +                r#"
 +                \+ \+ (
 +                    open("README.md", read, S, [alias(readme)]),
 +                    open(stream(S), read, _, [alias(another_alias)]),
 +                    close(S)
 +                ),
 +                open("README.md", read, _, [alias(readme)]).
 +            "#,
 +            )
 +            .collect::<Vec<_>>();
 +
 +        assert_eq!(results.len(), 1);
 +        assert!(results[0].is_ok());
 +    }
 +
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn current_output_null_stream() {
+         // TODO: switch to a proper solution for configuring the machine with null streams
+         // once `StreamConfig` supports it.
+         let mut machine = MachineBuilder::new().build();
+         machine.user_output = Stream::Null(StreamOptions::default());
+         machine.configure_streams();
+         let results = machine.run_query("current_output(S).").collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(is_successful(&results[0]));
+     }
 +    #[test]
 +    #[cfg_attr(miri, ignore)]
 +    fn close_realiased_user_output() {
 +        let mut machine = MachineBuilder::new()
 +            .with_streams(StreamConfig::in_memory())
 +            .build();
 +
 +        let results = machine
 +            .run_query(
 +                r#"
 +                \+ \+ (
 +                    open("README.md", read, S),
 +                    open(stream(S), read, _, [alias(user_output)]),
 +                    close(S)
 +                ),
 +                write(user_output, hello).
 +            "#,
 +            )
 +            .collect::<Vec<_>>();
 +
 +        assert_eq!(results.len(), 1);
 +        assert!(results[0].is_ok());
 +    }
++
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn write_null_stream() {
+         // TODO: switch to a proper solution for configuring the machine with null streams
+         // once `StreamConfig` supports it.
+         let mut machine = MachineBuilder::new().build();
+         machine.user_output = Stream::Null(StreamOptions::default());
+         machine.configure_streams();
+         let results = machine.run_query("write(hello).").collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(
+             is_successful(&results[0]),
+             "Expected write to succeed, got {:?}",
+             results[0]
+         );
+     }
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn put_code_null_stream() {
+         // TODO: switch to a proper solution for configuring the machine with null streams
+         // once `StreamConfig` supports it.
+         let mut machine = MachineBuilder::new().build();
+         machine.user_output = Stream::Null(StreamOptions::default());
+         machine.configure_streams();
+         let results = machine
+             .run_query("put_code(user_output, 65).")
+             .collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(
+             is_successful(&results[0]),
+             "Expected write to succeed, got {:?}",
+             results[0]
+         );
+     }
+     /// A variant of the [`write_null_stream`] that tries to write to a (null) input stream.
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn write_null_input_stream() {
+         let mut machine = MachineBuilder::new()
+             .with_streams(StreamConfig::in_memory())
+             .build();
+         let results = machine
+             .run_query("current_input(Stream), write(Stream, hello).")
+             .collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(
+             is_successful(&results[0]),
+             "Expected write to succeed, got {:?}",
+             results[0]
+         );
+     }
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn at_end_of_stream_0_null_stream() {
+         let mut machine = MachineBuilder::new()
+             .with_streams(StreamConfig::in_memory())
+             .build();
+         let results = machine.run_query("at_end_of_stream.").collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(
+             is_successful(&results[0]),
+             "Expected at_end_of_stream to succeed, got {:?}",
+             results[0]
+         );
+     }
+     #[test]
+     #[cfg_attr(miri, ignore)]
+     fn at_end_of_stream_1_null_stream() {
+         let mut machine = MachineBuilder::new()
+             .with_streams(StreamConfig::in_memory())
+             .build();
+         let results = machine
+             .run_query("current_input(Stream), at_end_of_stream(Stream).")
+             .collect::<Vec<_>>();
+         assert_eq!(results.len(), 1);
+         assert!(
+             is_successful(&results[0]),
+             "Expected at_end_of_stream to succeed, got {:?}",
+             results[0]
+         );
+     }
  }
index abc8f0b0ffb66568ef7207ccc325c4991794e4d8,11fe5c5f18735d4abc85fadec3f6a45b61ab7ec8..dc5da32a67eb41f04dfc1862fe1c6541c4c58947
@@@ -3715,14 -3715,22 +3715,14 @@@ impl Machine 
  
      #[inline(always)]
      pub(crate) fn first_stream(&mut self) {
 -        let mut first_stream = None;
 -        let mut null_streams = BTreeSet::new();
 -
 -        for stream in self.indices.streams.iter().cloned() {
 -            if !stream.is_null_stream() {
 -                first_stream = Some(stream);
 -                break;
 -            } else {
 -                null_streams.insert(stream);
 -            }
 -        }
 -
 -        self.indices.streams = self.indices.streams.sub(&null_streams);
 +        let first_stream = self
 +            .indices
 +            .iter_streams(..)
 +            .filter(|s| !s.is_null_stream())
 +            .next();
  
          if let Some(first_stream) = first_stream {
-             let stream = stream_as_cell!(first_stream);
+             let stream = first_stream.into();
  
              let var = self.deref_register(1).as_var().unwrap();
  
              stream.flush().unwrap(); // 8.11.6.1b)
          }
  
 -        self.indices.streams.remove(&stream);
 -
 -        if stream == self.user_input {
 -            self.user_input = self
 -                .indices
 -                .stream_aliases
 -                .get(&atom!("user_input"))
 -                .cloned()
 -                .unwrap();
 -
 -            self.indices.streams.insert(self.user_input);
 -        } else if stream == self.user_output {
 -            self.user_output = self
 -                .indices
 -                .stream_aliases
 -                .get(&atom!("user_output"))
 -                .cloned()
 -                .unwrap();
 -
 -            self.indices.streams.insert(self.user_output);
 +        if stream == self.user_input || stream == self.user_output || stream.is_stderr() {
 +            // stdin, stdout and stderr shouldn't be removed from the store, so return now
 +            return Ok(());
          }
  
 -        if !stream.is_stdin() && !stream.is_stdout() && !stream.is_stderr() {
 -            if let Some(alias) = stream.options().get_alias() {
 -                self.indices.stream_aliases.swap_remove(&alias);
 -            }
 -
 -            let close_result = stream.close();
 -
 -            if close_result.is_err() {
 -                let stub = functor_stub(atom!("close"), 1);
 -                let addr = stream.into();
 -                let err = self
 -                    .machine_st
 -                    .existence_error(ExistenceError::Stream(addr));
 +        self.indices.remove_stream(stream);
  
-         stream.close().map_err(|_| {
 -                return Err(self.machine_st.error_form(err, stub));
 -            }
 -        }
++        stream.close().or_else(|_| {
 +            let stub = functor_stub(atom!("close"), 1);
-             let addr = stream_as_cell!(stream);
++            let addr = stream.into();
 +            let err = self
 +                .machine_st
 +                .existence_error(ExistenceError::Stream(addr));
  
-             self.machine_st.error_form(err, stub)
 -        Ok(())
++            Err(self.machine_st.error_form(err, stub))
 +        })
      }
  
      #[inline(always)]
              }
  
              // do it!
 -            match futures::executor::block_on(req.send()) {
 -                Ok(resp) => {
 -                    // status code
 -                    let status = resp.status().as_u16();
 -                    self.machine_st
 -                        .unify_fixnum(Fixnum::build_with(status as i64), address_status);
 -                    // headers
 -                    let headers: Vec<HeapCellValue> = resp
 -                        .headers()
 -                        .iter()
 -                        .map(|(header_name, header_value)| {
 -                            let h = self.machine_st.heap.len();
 -
 -                            let header_term = functor!(
 -                                AtomTable::build_with(
 -                                    &self.machine_st.atom_tbl,
 -                                    header_name.as_str()
 -                                ),
 -                                [cell(string_as_cstr_cell!(AtomTable::build_with(
 -                                    &self.machine_st.atom_tbl,
 -                                    header_value.to_str().unwrap()
 -                                )))]
 -                            );
 +            task::block_in_place(move || {
 +                match Handle::current().block_on(req.send()) {
 +                    Ok(resp) => {
 +                        // status code
 +                        let status = resp.status().as_u16();
 +                        self.machine_st
 +                            .unify_fixnum(Fixnum::build_with(status as i64), address_status);
 +                        // headers
 +                        let headers: Vec<HeapCellValue> = resp
 +                            .headers()
 +                            .iter()
 +                            .map(|(header_name, header_value)| {
 +                                let h = self.machine_st.heap.len();
 +
 +                                let header_term = functor!(
 +                                    AtomTable::build_with(
 +                                        &self.machine_st.atom_tbl,
 +                                        header_name.as_str()
 +                                    ),
 +                                    [cell(string_as_cstr_cell!(AtomTable::build_with(
 +                                        &self.machine_st.atom_tbl,
 +                                        header_value.to_str().unwrap()
 +                                    )))]
 +                                );
  
 -                            self.machine_st.heap.extend(header_term);
 -                            str_loc_as_cell!(h)
 -                        })
 -                        .collect();
 +                                self.machine_st.heap.extend(header_term);
 +                                str_loc_as_cell!(h)
 +                            })
 +                            .collect();
  
 -                    let headers_list =
 -                        iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
 -                    unify!(
 -                        self.machine_st,
 -                        heap_loc_as_cell!(headers_list),
 -                        self.machine_st.registers[6]
 -                    );
 -                    // body
 -                    let reader = futures::executor::block_on(resp.bytes()).unwrap().reader();
 +                        let headers_list =
 +                            iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
  
 -                    let mut stream = Stream::from_http_stream(
 -                        AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
 -                        reader,
 -                        &mut self.machine_st.arena,
 -                    );
 -                    *stream.options_mut() = StreamOptions::default();
 -                    if let Some(alias) = stream.options().get_alias() {
 -                        self.indices.stream_aliases.insert(alias, stream);
 -                    }
 +                        unify!(
 +                            self.machine_st,
 +                            heap_loc_as_cell!(headers_list),
 +                            self.machine_st.registers[6]
 +                        );
  
 -                    self.indices.streams.insert(stream);
 +                        // body
 +                        let reader = futures::executor::block_on(resp.bytes()).unwrap().reader();
  
 -                    let stream_addr = self.deref_register(2);
 -                    self.machine_st
 -                        .bind(stream_addr.as_var().unwrap(), stream.into());
 -                }
 -                Err(_) => {
 -                    self.machine_st.fail = true;
 +                        let mut stream = Stream::from_http_stream(
 +                            AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
 +                            reader,
 +                            &mut self.machine_st.arena,
 +                        );
 +                        *stream.options_mut() = StreamOptions::default();
 +
 +                        self.indices
 +                            .add_stream(stream, atom!("http_open"), 3)
 +                            .map_err(|stub_gen| stub_gen(&mut self.machine_st))
 +                            .unwrap();
 +
-                         let stream = stream_as_cell!(stream);
 +                        let stream_addr = self.deref_register(2);
-                         self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
++                        self.machine_st
++                            .bind(stream_addr.as_var().unwrap(), stream.into());
 +                    }
 +                    Err(_) => {
 +                        self.machine_st.fail = true;
 +                    }
                  }
 -            }
 +            });
          } else {
              let err = self
                  .machine_st
                      let h = self.machine_st.heap.len();
                      let header_term = functor!(AtomTable::build_with(&self.machine_st.atom_tbl, header_name.as_str()), [cell(string_as_cstr_cell!(AtomTable::build_with(&self.machine_st.atom_tbl, header_value.to_str().unwrap())))]);
  
--                                    self.machine_st.heap.extend(header_term.into_iter());
--                                    str_loc_as_cell!(h)
--                                }).collect();
++                    self.machine_st.heap.extend(header_term.into_iter());
++                    str_loc_as_cell!(h)
++                }).collect();
  
--                                let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
++                    let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter());
  
--                let query_str = request.request_data.query;
--                let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, &query_str);
--                let query_cell = string_as_cstr_cell!(query_atom);
++                    let query_str = request.request_data.query;
++                    let query_atom = AtomTable::build_with(&self.machine_st.atom_tbl, &query_str);
++                    let query_cell = string_as_cstr_cell!(query_atom);
  
--                let mut stream = Stream::from_http_stream(
--                    path_atom,
--                    request.request_data.body,
--                    &mut self.machine_st.arena
--                );
--                *stream.options_mut() = StreamOptions::default();
--                stream.options_mut().set_stream_type(StreamType::Binary);
 -                self.indices.streams.insert(stream);
 -                let stream: HeapCellValue = stream.into();
 -
 -                                let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
 -
 -                                self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
 -                                self.machine_st.bind(path.as_var().unwrap(), path_cell);
 -                                unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
 -                                self.machine_st.bind(query.as_var().unwrap(), query_cell);
 -                                self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
 -                                self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
 -                break
 -                            }
++                    let mut stream = Stream::from_http_stream(
++                        path_atom,
++                        request.request_data.body,
++                        &mut self.machine_st.arena
++                    );
++                    *stream.options_mut() = StreamOptions::default();
++                    stream.options_mut().set_stream_type(StreamType::Binary);
 +
-                 self.indices.add_stream(stream, atom!("http_accept"), 7)
-                     .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
++                    self.indices.add_stream(stream, atom!("http_accept"), 7)
++                        .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
 +
-                 let stream = stream_as_cell!(stream);
++                    let stream: HeapCellValue = stream.into();
 +
-                                 let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
++                    let handle: TypedArenaPtr<HttpResponse> = arena_alloc!(request.response, &mut self.machine_st.arena);
 +
-                                 self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
-                                 self.machine_st.bind(path.as_var().unwrap(), path_cell);
-                                 unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
-                                 self.machine_st.bind(query.as_var().unwrap(), query_cell);
-                                 self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
-                                 self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
-                 break
-                             }
++                    self.machine_st.bind(method.as_var().unwrap(), atom_as_cell!(method_atom));
++                    self.machine_st.bind(path.as_var().unwrap(), path_cell);
++                    unify!(self.machine_st, heap_loc_as_cell!(headers_list), self.machine_st.registers[4]);
++                    self.machine_st.bind(query.as_var().unwrap(), query_cell);
++                    self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
++                    self.machine_st.bind(handle_addr.as_var().unwrap(), typed_arena_ptr_as_cell!(handle));
++                    break
++                }
                      Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                      let interrupted = machine::INTERRUPT.load(std::sync::atomic::Ordering::Relaxed);
  
  
          read_heap_cell!(culprit,
              (HeapCellValueTag::Cons, cons_ptr) => {
-             match_untyped_arena_ptr!(cons_ptr,
-                 (ArenaHeaderTag::HttpResponse, http_response) => {
-                 let mut stream = Stream::from_http_sender(
-                     http_response,
-                     status_code,
-                     headers,
-                     &mut self.machine_st.arena
-                 );
-                 *stream.options_mut() = StreamOptions::default();
-                 stream.options_mut().set_stream_type(StreamType::Binary);
+                 match_untyped_arena_ptr!(cons_ptr,
+                     (ArenaHeaderTag::HttpResponse, http_response) => {
+                         let mut stream = Stream::from_http_sender(
+                             http_response,
+                             status_code,
+                             headers,
+                             &mut self.machine_st.arena
+                         );
 +
-                 self.indices.add_stream(stream, atom!("http_answer"), 4)
-                     .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+                         *stream.options_mut() = StreamOptions::default();
+                         stream.options_mut().set_stream_type(StreamType::Binary);
 -                        self.indices.streams.insert(stream);
 +
-                 let stream = stream_as_cell!(stream);
-                 self.machine_st.bind(stream_addr.as_var().unwrap(), stream);
-                 }
-                 _ => {
-                 unreachable!();
-                 }
-             );
++                        self.indices.add_stream(stream, atom!("http_answer"), 4)
++                            .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
+                         self.machine_st.bind(stream_addr.as_var().unwrap(), stream.into());
+                     }
+                     _ => {
+                         unreachable!();
+                     }
+                 );
              }
              _ => {
-             unreachable!();
+                 unreachable!();
              }
          );
  
  
                  *stream.options_mut() = options;
  
 -                if let Some(alias) = stream.options().get_alias() {
 -                    self.indices.stream_aliases.insert(alias, stream);
 -                }
 -
 -                self.indices.streams.insert(stream);
 +                self.indices
 +                    .add_stream(stream, atom!("socket_client_open"), 7)
 +                    .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
  
-                 stream_as_cell!(stream)
+                 HeapCellValue::from(stream)
              }
              Err(ErrorKind::PermissionDenied) => {
                  return Err(self.machine_st.open_permission_error(
  
                                   *tcp_stream.options_mut() = options;
  
 -                                 if let Some(alias) = &tcp_stream.options().get_alias() {
 -                                     self.indices.stream_aliases.insert(*alias, tcp_stream);
 -                                 }
 -
 -                                 self.indices.streams.insert(tcp_stream);
 +                                 self.indices.add_stream(tcp_stream, atom!("socket_server_accept"), 4)
 +                                    .map_err(|stub_gen| {
 +                                        stub_gen(&mut self.machine_st)
 +                                    })?;
  
-                                  let tcp_stream = stream_as_cell!(tcp_stream);
                                   let client = atom_as_cell!(client);
  
                                   let client_addr = self.deref_register(2);
  
              let addr = atom!("TLS");
              let stream = Stream::from_tls_stream(addr, stream, &mut self.machine_st.arena);
 -            self.indices.streams.insert(stream);
 +
 +            self.indices
 +                .add_stream(stream, atom!("tls_client_negotiate"), 3)
 +                .map_err(|stub_gen| stub_gen(&mut self.machine_st))?;
  
-             self.machine_st.heap.push(stream_as_cell!(stream));
+             // FIXME: why are we pushing a random, unreferenced cell on the heap?
+             self.machine_st.heap.push(stream.into());
              let stream_addr = self.deref_register(3);
              self.machine_st
-                 .bind(stream_addr.as_var().unwrap(), stream_as_cell!(stream));
+                 .bind(stream_addr.as_var().unwrap(), stream.into());
  
              Ok(())
          } else {
diff --cc src/macros.rs
Simple merge