streamline send_response

This commit is contained in:
panicbit 2020-11-19 19:29:17 +01:00
parent aeeee86aae
commit 46ab84ba04
2 changed files with 49 additions and 74 deletions

View file

@ -21,6 +21,7 @@ use tokio_rustls::{rustls, TlsAcceptor};
use rustls::*;
use anyhow::*;
use lazy_static::lazy_static;
use crate::util::opt_timeout;
pub mod types;
pub mod util;
@ -113,91 +114,46 @@ impl Server {
let maybe_body = response.take_body();
let header = response.header();
// Okay, I know this method looks really complicated, but I promise it's not.
// There's really only three things this method does:
//
// * Send the response header
// * Send the response body
// * Flush the stream
//
// All the other code is doing one of two things. Either it's
//
// * code to add and handle timeouts (that's what all the async blocks and calls
// to timeout are), or
// * logic to decide whether to use the special case timeout handling (seperate
// timeouts for the header and the body) vs the normal timeout handling (header,
// body, and flush all as one timeout)
//
// The split between the two cases happens at this very first if block.
// Everything in this if is for the special case. If any one of the ifs fails,
// the code after the big if block is run, and that's the normal case.
//
// Hope this helps! Emi <3
if header.status == Status::SUCCESS &&
let use_complex_timeout =
header.status.is_success() &&
maybe_body.is_some() &&
header.meta.as_str() != "text/plain" &&
header.meta.as_str() != "text/gemini"
{
if let Some(cplx_timeout) = self.complex_timeout {
header.meta.as_str() != "text/gemini" &&
self.complex_timeout.is_some();
let send_general_timeout;
let send_header_timeout;
let send_body_timeout;
////////////// Use the special case timeout override /////////////////////////////
// Send the header & flush
let fut_send_header = async {
send_response_header(response.header(), stream).await
.context("Failed to write response header")?;
stream.flush()
.await
.context("Failed to flush response header")
};
timeout(self.timeout, fut_send_header)
.await
.context("Timed out while sending response header")??;
// Send the body & flush
let fut_send_body = async {
send_response_body(maybe_body.unwrap(), stream).await
.context("Failed to write response body")?;
stream.flush()
.await
.context("Failed to flush response body")
};
timeout(cplx_timeout, fut_send_body)
.await
.context("Timed out while sending response body")??;
return Ok(())
}
if use_complex_timeout {
send_general_timeout = None;
send_header_timeout = Some(self.timeout);
send_body_timeout = self.complex_timeout;
} else {
send_general_timeout = Some(self.timeout);
send_header_timeout = None;
send_body_timeout = None;
}
///////////// Use the normal timeout /////////////////////////////////////////////
let fut_send_response = async {
send_response_header(response.header(), stream).await
opt_timeout(send_general_timeout, async {
// Send the header
opt_timeout(send_header_timeout, send_response_header(response.header(), stream))
.await
.context("Timed out while sending response header")?
.context("Failed to write response header")?;
if let Some(body) = maybe_body {
send_response_body(body, stream).await
.context("Failed to write response body")?;
}
stream.flush()
// Send the body
opt_timeout(send_body_timeout, maybe_send_response_body(maybe_body, stream))
.await
.context("Failed to flush response data")
};
timeout(self.timeout, fut_send_response)
.await
.context("Timed out while sending response data")??;
.context("Timed out while sending response body")?
.context("Failed to write response body")?;
Ok::<_,Error>(())
})
.await
.context("Timed out while sending response data")??;
Ok(())
//////////////////////////////////////////////////////////////////////////////////
}
}
@ -377,6 +333,15 @@ async fn send_response_header(header: &ResponseHeader, stream: &mut (impl AsyncW
);
stream.write_all(header.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
async fn maybe_send_response_body(maybe_body: Option<Body>, stream: &mut (impl AsyncWrite + Unpin)) -> Result<()> {
if let Some(body) = maybe_body {
send_response_body(body, stream).await?;
}
Ok(())
}
@ -387,6 +352,8 @@ async fn send_response_body(body: Body, stream: &mut (impl AsyncWrite + Unpin))
Body::Reader(mut reader) => { io::copy(&mut reader, stream).await?; },
}
stream.flush().await?;
Ok(())
}

View file

@ -14,6 +14,7 @@ use crate::types::Response;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::task::Poll;
use futures_core::future::Future;
use tokio::time;
#[cfg(feature="serve_dir")]
pub async fn serve_file<P: AsRef<Path>>(path: P, mime: &Mime) -> Result<Response> {
@ -155,3 +156,10 @@ impl Future for HandlerCatchUnwind {
}
}
}
pub(crate) async fn opt_timeout<T>(duration: Option<time::Duration>, future: impl Future<Output = T>) -> Result<T, time::error::Elapsed> {
match duration {
Some(duration) => time::timeout(duration, future).await,
None => Ok(future.await),
}
}