Skip to content

Commit

Permalink
submap formula support
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed May 26, 2024
1 parent 0dcd0a7 commit 528e51f
Showing 1 changed file with 112 additions and 16 deletions.
128 changes: 112 additions & 16 deletions src/submap.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::str::Split;

use crate::mkmf::{Formula, MapKeysMatchFormula as _};
#[allow(clippy::wildcard_imports)]
use crate::types::*;

#[derive(Debug, Clone)]
struct Subscription<C> {
subscribers: Set<C>,
subtopics: Map<String, Subscription<C>>,
subtopics_by_formula: Map<Formula, Subscription<C>>,
subtopics_any: Option<Box<Subscription<C>>>, // ?
sub_any: Set<C>, // *
}
Expand All @@ -16,6 +18,7 @@ impl<C> Default for Subscription<C> {
Self {
subscribers: <_>::default(),
subtopics: <_>::default(),
subtopics_by_formula: <_>::default(),
subtopics_any: None,
sub_any: <_>::default(),
}
Expand All @@ -38,6 +41,7 @@ pub struct SubMap<C> {
subscribed_topics: Map<C, Set<String>>,
subscription_count: usize,
separator: char,
formula_prefix: Option<char>,
match_any: Set<String>,
wildcard: Set<String>,
}
Expand All @@ -49,6 +53,7 @@ impl<C> Default for SubMap<C> {
subscribed_topics: <_>::default(),
subscription_count: 0,
separator: '/',
formula_prefix: None,
match_any: vec!["?".to_owned()].into_iter().collect(),
wildcard: vec!["*".to_owned()].into_iter().collect(),
}
Expand All @@ -69,6 +74,11 @@ where
self
}
#[inline]
pub fn formula_prefix(mut self, prefix: char) -> Self {
self.formula_prefix = Some(prefix);
self
}
#[inline]
pub fn wildcard(mut self, wildcard: &str) -> Self {
self.wildcard = vec![wildcard.to_owned()].into_iter().collect();
self
Expand Down Expand Up @@ -121,6 +131,7 @@ where
client,
&self.wildcard,
&self.match_any,
self.formula_prefix,
);
self.subscription_count -= 1;
}
Expand All @@ -140,6 +151,7 @@ where
client,
&self.wildcard,
&self.match_any,
self.formula_prefix,
);
client_topics.insert(topic.to_owned());
self.subscription_count += 1;
Expand All @@ -158,6 +170,7 @@ where
client,
&self.wildcard,
&self.match_any,
self.formula_prefix,
);
client_topics.remove(topic);
self.subscription_count -= 1;
Expand All @@ -174,6 +187,7 @@ where
client,
&self.wildcard,
&self.match_any,
self.formula_prefix,
);
self.subscription_count -= 1;
}
Expand All @@ -189,13 +203,18 @@ where
get_subscribers_rec(
&self.subscriptions,
topic.split(self.separator),
self.formula_prefix,
&mut result,
);
result
}
#[inline]
pub fn is_subscribed(&self, topic: &str) -> bool {
is_subscribed_rec(&self.subscriptions, topic.split(self.separator))
is_subscribed_rec(
&self.subscriptions,
self.formula_prefix,
topic.split(self.separator),
)
}
#[inline]
pub fn subscription_count(&self) -> usize {
Expand All @@ -213,6 +232,7 @@ fn subscribe_rec<C>(
client: &C,
wildcard: &Set<String>,
match_any: &Set<String>,
formula_prefix: Option<char>,
) where
C: Client,
{
Expand All @@ -221,17 +241,30 @@ fn subscribe_rec<C>(
subscription.sub_any.insert(client.clone());
} else if match_any.contains(topic) {
if let Some(ref mut sub) = subscription.subtopics_any {
subscribe_rec(sub, sp, client, wildcard, match_any);
subscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
} else {
let mut sub = Subscription::default();
subscribe_rec(&mut sub, sp, client, wildcard, match_any);
subscribe_rec(&mut sub, sp, client, wildcard, match_any, formula_prefix);
subscription.subtopics_any = Some(Box::new(sub));
}
} else if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
let Ok(formula_parsed) = formula.parse::<Formula>() else {
return;
};
if let Some(sub) = subscription.subtopics_by_formula.get_mut(&formula_parsed) {
subscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
} else {
let mut sub = Subscription::default();
subscribe_rec(&mut sub, sp, client, wildcard, match_any, formula_prefix);
subscription
.subtopics_by_formula
.insert(formula_parsed, sub);
}
} else if let Some(sub) = subscription.subtopics.get_mut(topic) {
subscribe_rec(sub, sp, client, wildcard, match_any);
subscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
} else {
let mut sub = Subscription::default();
subscribe_rec(&mut sub, sp, client, wildcard, match_any);
subscribe_rec(&mut sub, sp, client, wildcard, match_any, formula_prefix);
subscription.subtopics.insert(topic.to_owned(), sub);
}
} else {
Expand All @@ -245,6 +278,7 @@ fn unsubscribe_rec<C>(
client: &C,
wildcard: &Set<String>,
match_any: &Set<String>,
formula_prefix: Option<char>,
) where
C: Client,
{
Expand All @@ -253,13 +287,23 @@ fn unsubscribe_rec<C>(
subscription.sub_any.remove(client);
} else if match_any.contains(topic) {
if let Some(ref mut sub) = subscription.subtopics_any {
unsubscribe_rec(sub, sp, client, wildcard, match_any);
unsubscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
if sub.is_empty() {
subscription.subtopics_any = None;
}
}
} else if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
let Ok(formula_parsed) = formula.parse::<Formula>() else {
return;
};
if let Some(sub) = subscription.subtopics_by_formula.get_mut(&formula_parsed) {
unsubscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
if sub.is_empty() {
subscription.subtopics_by_formula.remove(&formula_parsed);
}
}
} else if let Some(sub) = subscription.subtopics.get_mut(topic) {
unsubscribe_rec(sub, sp, client, wildcard, match_any);
unsubscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
if sub.is_empty() {
subscription.subtopics.remove(topic);
}
Expand All @@ -269,38 +313,70 @@ fn unsubscribe_rec<C>(
}
}

fn get_subscribers_rec<C>(subscription: &Subscription<C>, mut sp: Split<char>, result: &mut Set<C>)
where
fn get_subscribers_rec<C>(
subscription: &Subscription<C>,
mut sp: Split<char>,
formula_prefix: Option<char>,
result: &mut Set<C>,
) where
C: Client,
{
if let Some(topic) = sp.next() {
result.extend(subscription.sub_any.clone());
if let Some(sub) = subscription.subtopics.get(topic) {
get_subscribers_rec(sub, sp.clone(), result);
if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
for sub in subscription.subtopics.values_match_key_formula(formula) {
get_subscribers_rec(sub, sp.clone(), formula_prefix, result);
}
} else if let Some(sub) = subscription.subtopics.get(topic) {
get_subscribers_rec(sub, sp.clone(), formula_prefix, result);
}
if !subscription.subtopics_by_formula.is_empty() {
for (formula, sub) in &subscription.subtopics_by_formula {
if formula.matches(topic) {
get_subscribers_rec(sub, sp.clone(), formula_prefix, result);
}
}
}
if let Some(ref sub) = subscription.subtopics_any {
get_subscribers_rec(sub, sp, result);
get_subscribers_rec(sub, sp, formula_prefix, result);
}
} else {
result.extend(subscription.subscribers.clone());
}
}

fn is_subscribed_rec<C>(subscription: &Subscription<C>, mut sp: Split<char>) -> bool
fn is_subscribed_rec<C>(
subscription: &Subscription<C>,
formula_prefix: Option<char>,
mut sp: Split<char>,
) -> bool
where
C: Ord + Eq + Clone,
{
if let Some(topic) = sp.next() {
if !subscription.sub_any.is_empty() {
return true;
}
if let Some(sub) = subscription.subtopics.get(topic) {
if is_subscribed_rec(sub, sp.clone()) {
if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
for sub in subscription.subtopics.values_match_key_formula(formula) {
if is_subscribed_rec(sub, formula_prefix, sp.clone()) {
return true;
}
}
} else if let Some(sub) = subscription.subtopics.get(topic) {
if is_subscribed_rec(sub, formula_prefix, sp.clone()) {
return true;
}
}
if !subscription.subtopics_by_formula.is_empty() {
for (formula, sub) in &subscription.subtopics_by_formula {
if formula.matches(topic) && is_subscribed_rec(sub, formula_prefix, sp.clone()) {
return true;
}
}
}
if let Some(ref sub) = subscription.subtopics_any {
if is_subscribed_rec(sub, sp) {
if is_subscribed_rec(sub, formula_prefix, sp) {
return true;
}
}
Expand Down Expand Up @@ -393,4 +469,24 @@ mod test {
smap.subscribe("+/zzz/+/222", &client1);
assert_eq!(smap.get_subscribers("unix/zzz/xxx/222").len(), 1);
}
#[test]
fn test_match_formula() {
let mut smap: SubMap<String> = SubMap::new()
.match_any("+")
.wildcard("#")
.formula_prefix('!');
let client1 = "client1".to_owned();
smap.register_client(&client1);
assert_eq!(smap.get_subscribers("1/xxx").len(), 0);
smap.subscribe("!gt(1)/xxx", &client1);
assert_eq!(smap.get_subscribers("1/xxx").len(), 0);
assert_eq!(smap.get_subscribers("2/xxx").len(), 1);
assert_eq!(smap.get_subscribers("unix/zzz/95/222").len(), 0);
assert_eq!(smap.get_subscribers("unix/zzz/96/222").len(), 0);
assert_eq!(smap.get_subscribers("unix/zzz/97/222").len(), 0);
smap.subscribe("+/zzz/!ge(96)/222", &client1);
assert_eq!(smap.get_subscribers("unix/zzz/95/222").len(), 0);
assert_eq!(smap.get_subscribers("unix/zzz/96/222").len(), 1);
assert_eq!(smap.get_subscribers("unix/zzz/97/222").len(), 1);
}
}

0 comments on commit 528e51f

Please sign in to comment.