diff --git a/core/src/clocks.rs b/core/src/clocks.rs index 63acee9..ba57c61 100644 --- a/core/src/clocks.rs +++ b/core/src/clocks.rs @@ -46,6 +46,7 @@ impl Clock for SystemClock { pub mod testutils { use super::*; use std::sync::atomic::{AtomicU64, Ordering}; + use std::time::Duration; use time::{Date, Month, Time}; /// A clock that returns a monotonically increasing instant every time it is queried. @@ -68,7 +69,41 @@ pub mod testutils { } } + /// A clock that returns a preconfigured instant and that can be modified at will. + /// + /// Only supports second-level precision. + pub struct SettableClock { + /// Current fake time. + now: AtomicU64, + } + + impl SettableClock { + /// Creates a new clock that returns `now` until reconfigured with `set`. + pub fn new(now: OffsetDateTime) -> Self { + Self { now: AtomicU64::new(now.unix_timestamp() as u64) } + } + + /// Sets the new value of `now` that the clock returns. + pub fn set(&self, now: OffsetDateTime) { + self.now.store(now.unix_timestamp() as u64, Ordering::SeqCst); + } + + /// Advances the current time by `delta`. + pub fn advance(&self, delta: Duration) { + let seconds = delta.as_secs(); + self.now.fetch_add(seconds, Ordering::SeqCst); + } + } + + impl Clock for SettableClock { + fn now_utc(&self) -> OffsetDateTime { + let now = self.now.load(Ordering::SeqCst); + OffsetDateTime::from_unix_timestamp(now as i64).unwrap() + } + } + /// Creates an `OffsetDateTime` with the given values, assuming UTC. + // TODO(jmmv): Remove in favor of the datetime!() macro from the time crate. pub fn utc_datetime( year: i32, month: u8, diff --git a/geo/Cargo.toml b/geo/Cargo.toml index a897ebf..9d78635 100644 --- a/geo/Cargo.toml +++ b/geo/Cargo.toml @@ -15,15 +15,17 @@ async-trait = "0.1" bytes = "1.0" derivative = "2.2" futures = "0.3" +iii-iv-core = { path = "../core" } log = "0.4" lru_time_cache = "0.11" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] } serde = "1" serde_json = "1" -iii-iv-core = { path = "../core" } +time = "0.3" [dev-dependencies] iii-iv-core = { path = "../core", features = ["testutils"] } serde_test = "1" temp-env = "0.3.2" +time = { version = "0.3", features = ["macros"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } diff --git a/geo/src/counter.rs b/geo/src/counter.rs new file mode 100644 index 0000000..7f99c9a --- /dev/null +++ b/geo/src/counter.rs @@ -0,0 +1,122 @@ +// III-IV +// Copyright 2023 Julio Merino +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy +// of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +//! Counter of requests for a period of time. + +use iii_iv_core::clocks::Clock; +use std::{sync::Arc, time::Duration}; + +/// Counts the number of requests over the last minute with second resolution. +pub(crate) struct RequestCounter { + /// Clock to obtain the current time from. + clock: Arc, + + /// Tracker of per-second counts within a minute. + /// + /// Each pair contains the timestamp of the ith second in the array and + /// the counter of requests at that second. + counts: [(i64, u16); 60], +} + +impl RequestCounter { + /// Creates a new request counter backed by `clock`. + pub(crate) fn new(clock: Arc) -> Self { + Self { clock, counts: [(0, 0); 60] } + } + + /// Adds a request to the counter at the current time. + pub(crate) fn account(&mut self) { + let now = self.clock.now_utc(); + let i = usize::from(now.second()) % 60; + let (ts, count) = self.counts[i]; + if ts == now.unix_timestamp() { + self.counts[i] = (ts, count + 1); + } else { + self.counts[i] = (now.unix_timestamp(), 1); + } + } + + /// Counts the number of requests during the last minute. + pub(crate) fn last_minute(&self) -> usize { + let now = self.clock.now_utc(); + let since = (now - Duration::from_secs(60)).unix_timestamp(); + + let mut total = 0; + for (ts, count) in self.counts { + if ts > since { + total += usize::from(count); + } + } + total + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iii_iv_core::clocks::testutils::SettableClock; + use std::time::Duration; + use time::macros::datetime; + + #[test] + fn test_continuous() { + let clock = Arc::from(SettableClock::new(datetime!(2023-09-26 18:20:15 UTC))); + let mut counter = RequestCounter::new(clock.clone()); + + assert_eq!(0, counter.last_minute()); + for i in 0..60 { + clock.advance(Duration::from_secs(1)); + counter.account(); + counter.account(); + assert_eq!((i + 1) * 2, counter.last_minute()); + } + assert_eq!(120, counter.last_minute()); + for i in 0..60 { + clock.advance(Duration::from_secs(1)); + counter.account(); + assert_eq!(120 - (i + 1), counter.last_minute()); + } + assert_eq!(60, counter.last_minute()); + for i in 0..60 { + clock.advance(Duration::from_secs(1)); + assert_eq!(60 - (i + 1), counter.last_minute()); + } + assert_eq!(0, counter.last_minute()); + } + + #[test] + fn test_gaps() { + let clock = Arc::from(SettableClock::new(datetime!(2023-09-26 17:20:56 UTC))); + let mut counter = RequestCounter::new(clock.clone()); + + assert_eq!(0, counter.last_minute()); + for _ in 0..1000 { + counter.account(); + } + assert_eq!(1000, counter.last_minute()); + + clock.advance(Duration::from_secs(30)); + counter.account(); + assert_eq!(1001, counter.last_minute()); + + clock.advance(Duration::from_secs(29)); + counter.account(); + assert_eq!(1002, counter.last_minute()); + + clock.advance(Duration::from_secs(1)); + counter.account(); + assert_eq!(3, counter.last_minute()); + } +} diff --git a/geo/src/ipapi.rs b/geo/src/ipapi.rs new file mode 100644 index 0000000..fb4c96c --- /dev/null +++ b/geo/src/ipapi.rs @@ -0,0 +1,267 @@ +// III-IV +// Copyright 2023 Julio Merino +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy +// of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +//! Geolocation API implementation backed by IP-API. + +use crate::counter::RequestCounter; +use crate::{CountryIsoCode, GeoLocator, GeoResult}; +use async_trait::async_trait; +use bytes::Buf; +use futures::lock::Mutex; +use iii_iv_core::clocks::Clock; +use log::warn; +use reqwest::{Client, Response, StatusCode}; +use serde::Deserialize; +use std::io; +use std::net::IpAddr; +use std::sync::Arc; +use std::time::Duration; +use time::OffsetDateTime; + +/// Delay until retrying queries when we receive a 429 response. +const BACKOFF_SECS: u64 = 30; + +/// Maximum number of requests per minute allowed at the free tier. +const MAX_REQUESTS_PER_MINUTE: usize = 45; + +/// Converts a `reqwest::Error` to an `io::Error`. +fn reqwest_error_to_io_error(e: reqwest::Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, format!("{}", e)) +} + +/// Converts a `reqwest::Response` to an `io::Error`. The response should have a non-OK status. +async fn http_response_to_io_error(response: Response) -> io::Error { + let status = response.status(); + + let kind = match status { + StatusCode::OK => panic!("Should not have been called on a successful request"), + + // Match against the codes we know the server explicitly hands us. + StatusCode::BAD_REQUEST => io::ErrorKind::InvalidInput, + StatusCode::UNAUTHORIZED => io::ErrorKind::PermissionDenied, + StatusCode::FORBIDDEN => io::ErrorKind::PermissionDenied, + StatusCode::NOT_FOUND => io::ErrorKind::NotFound, + StatusCode::INTERNAL_SERVER_ERROR => io::ErrorKind::Other, + + // Special status code handling when exceeding the default free quota. + StatusCode::TOO_MANY_REQUESTS => io::ErrorKind::ConnectionRefused, + + _ => io::ErrorKind::Other, + }; + + match response.text().await { + Ok(text) => io::Error::new( + kind, + format!("HTTP request returned status {} with text '{}'", status, text), + ), + Err(e) => io::Error::new( + kind, + format!("HTTP request returned status {} and failed to get text due to {}", status, e), + ), + } +} + +/// Response from the IP-API service on a successful request. +#[derive(Deserialize)] +struct QueryResponse { + /// Whether the query succeeded (`success`) or failed (`fail`). + status: String, + + /// Error message when the status is `fail`. + message: Option, + + /// Country code when the status is `success`. + #[serde(rename = "countryCode")] + country_code: Option, +} + +/// Geolocator that uses an IP-API account in the free tier. +/// +/// Because the free tier has per-minute limits, this geolocator requires backing by another +/// geolocator to which delegate requests when the limits are reached. +#[derive(Clone)] +pub struct FreeIpApiGeoLocator { + /// Asynchronous HTTP client with which to issue the service requests. + client: Client, + + /// The clock used to query the current time. + clock: Arc, + + /// The time to wait until issuing more requests, in case we are backing off. + backoff_until: Arc>, + + /// The number of requests in the current minute. + counter: Arc>, + + /// The geolocator to delegate to when we have detected overload. + delegee: G, +} + +impl FreeIpApiGeoLocator { + /// Creates a new IP-API-backed geolocator using `opts` for configuration. + pub fn new(clock: Arc, delegee: G) -> Self { + Self { + client: Client::default(), + clock: clock.clone(), + backoff_until: Arc::from(Mutex::from(OffsetDateTime::UNIX_EPOCH)), + counter: Arc::from(Mutex::from(RequestCounter::new(clock))), + delegee, + } + } + + async fn locate_raw(&self, ip: &IpAddr) -> GeoResult> { + let request = format!("http://ip-api.com/json/{}?fields=status,message,countryCode", ip); + + let response = self.client.get(&request).send().await.map_err(reqwest_error_to_io_error)?; + match response.status() { + StatusCode::OK => { + let bytes = response.bytes().await.map_err(reqwest_error_to_io_error)?; + let response: QueryResponse = serde_json::from_reader(bytes.reader())?; + + match response.status.as_ref() { + "success" => match response.country_code { + Some(country_code) => Ok(Some(CountryIsoCode::new(country_code)?)), + None => Ok(None), + }, + + "fail" => match response.message.as_deref() { + Some("private range") | Some("reserved range") => Ok(None), + message => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Query {} returned failure: {}", + request, + message.unwrap_or("No message") + ), + )), + }, + + status => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Query {} returned invalid status {}: {}", + request, + status, + response.message.as_deref().unwrap_or("No message") + ), + )), + } + } + _ => Err(http_response_to_io_error(response).await), + } + } +} + +#[async_trait] +impl GeoLocator for FreeIpApiGeoLocator +where + G: GeoLocator + Send + Sync, +{ + async fn locate(&self, ip: &IpAddr) -> GeoResult> { + { + let now = self.clock.now_utc(); + + let mut backoff_until = self.backoff_until.lock().await; + if *backoff_until < now { + let mut counter = self.counter.lock().await; + if counter.last_minute() < MAX_REQUESTS_PER_MINUTE { + counter.account(); + + match self.locate_raw(ip).await { + Ok(result) => return Ok(result), + Err(e) if e.kind() == io::ErrorKind::ConnectionRefused => { + warn!("IP-API returned 429; falling back to delegee"); + *backoff_until = now + Duration::from_secs(BACKOFF_SECS); + } + Err(e) => return Err(e), + } + } else { + warn!("Out of quota; falling back to delegee"); + } + } + } + + self.delegee.locate(ip).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::MockGeoLocator; + use iii_iv_core::clocks::testutils::SettableClock; + use time::macros::datetime; + + const IP_IN_ES: &str = "212.170.36.79"; + const IP_IN_IE: &str = "185.2.66.42"; + const IP_IN_US: &str = "2001:4898:80e8:3c::"; + const FAKE_DATA: &[(&str, &str)] = &[(IP_IN_ES, "XX"), (IP_IN_IE, "XX"), (IP_IN_US, "XX")]; + + #[tokio::test] + #[ignore = "Talks to an external service"] + async fn test_ok() { + let clock = Arc::from(SettableClock::new(datetime!(2023-09-23 22:00:00 UTC))); + let geolocator = FreeIpApiGeoLocator::new(clock, MockGeoLocator::new(FAKE_DATA)); + assert_eq!( + "ES", + geolocator.locate(&IP_IN_ES.parse().unwrap()).await.unwrap().unwrap().as_str() + ); + assert_eq!( + "IE", + geolocator.locate(&IP_IN_IE.parse().unwrap()).await.unwrap().unwrap().as_str() + ); + assert_eq!( + "US", + geolocator.locate(&IP_IN_US.parse().unwrap()).await.unwrap().unwrap().as_str() + ); + } + + #[tokio::test] + #[ignore = "Talks to an external service"] + async fn test_backoff_by_counter() { + let clock = Arc::from(SettableClock::new(datetime!(2023-09-23 22:00:00 UTC))); + let geolocator = FreeIpApiGeoLocator::new(clock.clone(), MockGeoLocator::new(FAKE_DATA)); + { + let mut counter = geolocator.counter.lock().await; + for _ in 0..MAX_REQUESTS_PER_MINUTE { + counter.account(); + } + } + assert_eq!( + "XX", + geolocator.locate(&IP_IN_ES.parse().unwrap()).await.unwrap().unwrap().as_str() + ); + + clock.advance(Duration::from_secs(50)); + assert_eq!( + "XX", + geolocator.locate(&IP_IN_ES.parse().unwrap()).await.unwrap().unwrap().as_str() + ); + + clock.advance(Duration::from_secs(10)); + assert_eq!( + "ES", + geolocator.locate(&IP_IN_ES.parse().unwrap()).await.unwrap().unwrap().as_str() + ); + } + + #[tokio::test] + #[ignore = "Talks to an external service"] + async fn test_missing() { + let clock = Arc::from(SettableClock::new(datetime!(2023-09-23 22:00:00 UTC))); + let geolocator = FreeIpApiGeoLocator::new(clock, MockGeoLocator::new(FAKE_DATA)); + assert_eq!(None, geolocator.locate(&"198.18.0.1".parse().unwrap()).await.unwrap()); + } +} diff --git a/geo/src/lib.rs b/geo/src/lib.rs index 151a927..b1c4636 100644 --- a/geo/src/lib.rs +++ b/geo/src/lib.rs @@ -30,6 +30,9 @@ mod azure; pub use azure::{AzureGeoLocator, AzureGeoLocatorOptions}; mod caching; pub use caching::{CachingGeoLocator, CachingGeoLocatorOptions}; +pub(crate) mod counter; +mod ipapi; +pub use ipapi::FreeIpApiGeoLocator; #[cfg(any(test, feature = "testutils"))] mod mock; #[cfg(any(test, feature = "testutils"))]