]> Repositorios git - scryer-prolog.git/commitdiff
Fix at_end_of_stream/1 for http read stream
authorAdrián Arroyo Calle <[email protected]>
Sat, 6 Jul 2024 14:00:50 +0000 (16:00 +0200)
committerAdrián Arroyo Calle <[email protected]>
Sat, 6 Jul 2024 14:00:50 +0000 (16:00 +0200)
src/http.rs
src/machine/streams.rs
src/machine/system_calls.rs

index cb1f2d9c7e0930dc915c6289ab34f15a11a2ca9c..f301ef645e2b06c9cdcac4f0fb4385085739fa92 100644 (file)
@@ -1,4 +1,4 @@
-use std::io::BufRead;
+use bytes::{Bytes, buf::Reader};
 use std::sync::{Arc, Condvar, Mutex};
 
 use warp::http;
@@ -19,5 +19,5 @@ pub struct HttpRequestData {
     pub headers: http::HeaderMap,
     pub path: String,
     pub query: String,
-    pub body: Box<dyn BufRead + Send>,
+    pub body: Reader<Bytes>,
 }
index be5591cf166607b2b1e54ca72b8e6015ec86320c..ae1fcf3090c94a3fe3c1a2789c193ae47fbe9bba 100644 (file)
@@ -12,6 +12,7 @@ use crate::machine::machine_indices::*;
 use crate::machine::machine_state::*;
 use crate::types::*;
 
+use bytes::Buf;
 pub use scryer_modular_bitfield::prelude::*;
 
 use std::cmp::Ordering;
@@ -22,7 +23,7 @@ use std::fs::{File, OpenOptions};
 use std::hash::Hash;
 use std::io;
 #[cfg(feature = "http")]
-use std::io::BufRead;
+use bytes::{buf::Reader as BufReader, Bytes};
 use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
 use std::net::{Shutdown, TcpStream};
 use std::ops::{Deref, DerefMut};
@@ -274,7 +275,7 @@ impl Write for NamedTlsStream {
 #[cfg(feature = "http")]
 pub struct HttpReadStream {
     url: Atom,
-    body_reader: Box<dyn BufRead>,
+    body_reader: BufReader<Bytes>,
 }
 
 #[cfg(feature = "http")]
@@ -1115,6 +1116,13 @@ impl Stream {
                     }
                 }
             }
+           Stream::HttpRead(stream_layout) => {
+               if stream_layout.stream.get_ref().body_reader.get_ref().has_remaining() {
+                   AtEndOfStream::Not
+               } else {
+                   AtEndOfStream::Past
+               }
+           }
             _ => AtEndOfStream::Not,
         }
     }
@@ -1203,7 +1211,7 @@ impl Stream {
     #[inline]
     pub(crate) fn from_http_stream(
         url: Atom,
-        http_stream: Box<dyn BufRead>,
+        http_stream: BufReader<Bytes>,
         arena: &mut Arena,
     ) -> Self {
         Stream::HttpRead(arena_alloc!(
index 80b579e7918ee0c2d14359f030e50cd7f3055c55..9b11a6e337651d169f2bf931836fe069a9603593 100644 (file)
@@ -50,8 +50,6 @@ use std::env;
 use std::ffi::CString;
 use std::fs;
 use std::hash::{BuildHasher, BuildHasherDefault};
-#[cfg(feature = "http")]
-use std::io::BufRead;
 use std::io::{ErrorKind, Read, Write};
 use std::iter::{once, FromIterator};
 use std::mem;
@@ -4352,7 +4350,7 @@ impl Machine {
 
                     let mut stream = Stream::from_http_stream(
                         AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
-                        Box::new(reader),
+                        reader,
                         &mut self.machine_st.arena,
                     );
                     *stream.options_mut() = StreamOptions::default();
@@ -4447,11 +4445,7 @@ impl Machine {
             let runtime = tokio::runtime::Handle::current();
             let _guard = runtime.enter();
 
-            fn get_reader(body: impl Buf + Send + 'static) -> Box<dyn BufRead + Send> {
-                Box::new(body.reader())
-            }
-
-            let serve = warp::body::aggregate()
+            let serve = warp::body::bytes()
                 .and(warp::header::optional::<u64>(
                     warp::http::header::CONTENT_LENGTH.as_str(),
                 ))
@@ -4462,7 +4456,7 @@ impl Machine {
                     future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),)))
                 }))
                 .map(
-                    move |body,
+                    move |body: bytes::Bytes,
                           content_length,
                           method,
                           headers: warp::http::HeaderMap,
@@ -4482,7 +4476,7 @@ impl Machine {
                             headers,
                             path: path.as_str().to_string(),
                             query,
-                            body: get_reader(body),
+                            body: body.reader(),
                         };
                         let response =
                             Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));