Added automatic clearing of ratelimit keys
This commit is contained in:
parent
5ae6f578e3
commit
3e07e24e41
23
src/lib.rs
23
src/lib.rs
|
|
@ -7,8 +7,9 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
net::IpAddr,
|
|
||||||
};
|
};
|
||||||
|
#[cfg(feature = "ratelimiting")]
|
||||||
|
use std::net::IpAddr;
|
||||||
use futures_core::future::BoxFuture;
|
use futures_core::future::BoxFuture;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
prelude::*,
|
prelude::*,
|
||||||
|
|
@ -16,6 +17,8 @@ use tokio::{
|
||||||
net::{TcpStream, ToSocketAddrs},
|
net::{TcpStream, ToSocketAddrs},
|
||||||
time::timeout,
|
time::timeout,
|
||||||
};
|
};
|
||||||
|
#[cfg(feature = "ratelimiting")]
|
||||||
|
use tokio::time::interval;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use rustls::ClientCertVerifier;
|
use rustls::ClientCertVerifier;
|
||||||
use rustls::internal::msgs::handshake::DigitallySignedStruct;
|
use rustls::internal::msgs::handshake::DigitallySignedStruct;
|
||||||
|
|
@ -25,6 +28,7 @@ use anyhow::*;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use crate::util::opt_timeout;
|
use crate::util::opt_timeout;
|
||||||
use routing::RoutingNode;
|
use routing::RoutingNode;
|
||||||
|
#[cfg(feature = "ratelimiting")]
|
||||||
use ratelimiting::RateLimiter;
|
use ratelimiting::RateLimiter;
|
||||||
|
|
||||||
pub mod types;
|
pub mod types;
|
||||||
|
|
@ -60,6 +64,9 @@ impl Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn serve(self) -> Result<()> {
|
async fn serve(self) -> Result<()> {
|
||||||
|
#[cfg(feature = "ratelimiting")]
|
||||||
|
tokio::spawn(prune_ratelimit_log(self.rate_limits.clone()));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, _addr) = self.listener.accept().await
|
let (stream, _addr) = self.listener.accept().await
|
||||||
.context("Failed to accept client")?;
|
.context("Failed to accept client")?;
|
||||||
|
|
@ -434,6 +441,17 @@ async fn send_response_body(body: Body, stream: &mut (impl AsyncWrite + Unpin))
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature="ratelimiting")]
|
||||||
|
/// Every 5 minutes, remove excess keys from all ratelimiters
|
||||||
|
async fn prune_ratelimit_log(rate_limits: Arc<RoutingNode<RateLimiter<IpAddr>>>) -> Never {
|
||||||
|
let mut interval = interval(tokio::time::Duration::from_secs(10));
|
||||||
|
let log = rate_limits.as_ref();
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
log.iter().for_each(RateLimiter::trim_keys_verbose);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn tls_config(cert_path: &PathBuf, key_path: &PathBuf) -> Result<Arc<ServerConfig>> {
|
fn tls_config(cert_path: &PathBuf, key_path: &PathBuf) -> Result<Arc<ServerConfig>> {
|
||||||
let mut config = ServerConfig::new(AllowAnonOrSelfsignedClient::new());
|
let mut config = ServerConfig::new(AllowAnonOrSelfsignedClient::new());
|
||||||
|
|
||||||
|
|
@ -549,3 +567,6 @@ mod tests {
|
||||||
let _: &Mime = &GEMINI_MIME;
|
let _: &Mime = &GEMINI_MIME;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "ratelimiting")]
|
||||||
|
enum Never {}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
|
||||||
use std::{hash::Hash, collections::VecDeque, time::{Duration, Instant}};
|
use std::{fmt::Display, collections::VecDeque, hash::Hash, time::{Duration, Instant}};
|
||||||
|
|
||||||
/// A simple struct to manage rate limiting.
|
/// A simple struct to manage rate limiting.
|
||||||
///
|
///
|
||||||
|
|
@ -64,9 +64,43 @@ impl<K: Eq + Hash> RateLimiter<K> {
|
||||||
///
|
///
|
||||||
/// If you have many keys coming from a large set, you should infrequently call this
|
/// If you have many keys coming from a large set, you should infrequently call this
|
||||||
/// to prevent a memory leak.
|
/// to prevent a memory leak.
|
||||||
|
///
|
||||||
|
/// If debug level logging is enabled, this prints an *approximate* number of keys
|
||||||
|
/// removed to the log. For more precise output, use [`trim_keys_verbose()`]
|
||||||
|
///
|
||||||
|
/// [`trim_keys_verbose()`]: RateLimiter::trim_keys_verbose()
|
||||||
pub fn trim_keys(&self) {
|
pub fn trim_keys(&self) {
|
||||||
let count_after = Instant::now() - self.period;
|
let count_after = Instant::now() - self.period;
|
||||||
|
|
||||||
|
let len: isize = self.log.len() as isize;
|
||||||
self.log.retain(|_, conns| conns.back().unwrap() > &count_after);
|
self.log.retain(|_, conns| conns.back().unwrap() > &count_after);
|
||||||
|
let removed = len - self.log.len() as isize;
|
||||||
|
if removed.is_positive() {
|
||||||
|
debug!("Pruned approximately {} expired ratelimit keys", removed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<K: Eq + Hash + Display> RateLimiter<K> {
|
||||||
|
|
||||||
|
/// Remove any expired keys from the ratelimiter
|
||||||
|
///
|
||||||
|
/// This only needs to be called if keys are continuously being added. If keys are
|
||||||
|
/// being reused, or come from a finite set, then you don't need to worry about this.
|
||||||
|
///
|
||||||
|
/// If you have many keys coming from a large set, you should infrequently call this
|
||||||
|
/// to prevent a memory leak.
|
||||||
|
///
|
||||||
|
/// If debug level logging is on, this prints out any removed keys.
|
||||||
|
pub fn trim_keys_verbose(&self) {
|
||||||
|
let count_after = Instant::now() - self.period;
|
||||||
|
|
||||||
|
self.log.retain(|ip, conns| {
|
||||||
|
let should_keep = conns.back().unwrap() > &count_after;
|
||||||
|
if !should_keep {
|
||||||
|
debug!("Pruned expired ratelimit key: {}", ip);
|
||||||
|
}
|
||||||
|
should_keep
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue