From 528e51f50c5dc212f329f1a25d7b3cd859525666 Mon Sep 17 00:00:00 2001
From: Serhij S
Date: Sun, 26 May 2024 22:00:59 +0200
Subject: [PATCH] submap formula support
---
src/submap.rs | 128 +++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 112 insertions(+), 16 deletions(-)
diff --git a/src/submap.rs b/src/submap.rs
index 1129731..ee8efcf 100644
--- a/src/submap.rs
+++ b/src/submap.rs
@@ -1,5 +1,6 @@
use std::str::Split;
+use crate::mkmf::{Formula, MapKeysMatchFormula as _};
#[allow(clippy::wildcard_imports)]
use crate::types::*;
@@ -7,6 +8,7 @@ use crate::types::*;
struct Subscription {
subscribers: Set,
subtopics: Map>,
+ subtopics_by_formula: Map>,
subtopics_any: Option>>, // ?
sub_any: Set, // *
}
@@ -16,6 +18,7 @@ impl Default for Subscription {
Self {
subscribers: <_>::default(),
subtopics: <_>::default(),
+ subtopics_by_formula: <_>::default(),
subtopics_any: None,
sub_any: <_>::default(),
}
@@ -38,6 +41,7 @@ pub struct SubMap {
subscribed_topics: Map>,
subscription_count: usize,
separator: char,
+ formula_prefix: Option,
match_any: Set,
wildcard: Set,
}
@@ -49,6 +53,7 @@ impl Default for SubMap {
subscribed_topics: <_>::default(),
subscription_count: 0,
separator: '/',
+ formula_prefix: None,
match_any: vec!["?".to_owned()].into_iter().collect(),
wildcard: vec!["*".to_owned()].into_iter().collect(),
}
@@ -69,6 +74,11 @@ where
self
}
#[inline]
+ pub fn formula_prefix(mut self, prefix: char) -> Self {
+ self.formula_prefix = Some(prefix);
+ self
+ }
+ #[inline]
pub fn wildcard(mut self, wildcard: &str) -> Self {
self.wildcard = vec![wildcard.to_owned()].into_iter().collect();
self
@@ -121,6 +131,7 @@ where
client,
&self.wildcard,
&self.match_any,
+ self.formula_prefix,
);
self.subscription_count -= 1;
}
@@ -140,6 +151,7 @@ where
client,
&self.wildcard,
&self.match_any,
+ self.formula_prefix,
);
client_topics.insert(topic.to_owned());
self.subscription_count += 1;
@@ -158,6 +170,7 @@ where
client,
&self.wildcard,
&self.match_any,
+ self.formula_prefix,
);
client_topics.remove(topic);
self.subscription_count -= 1;
@@ -174,6 +187,7 @@ where
client,
&self.wildcard,
&self.match_any,
+ self.formula_prefix,
);
self.subscription_count -= 1;
}
@@ -189,13 +203,18 @@ where
get_subscribers_rec(
&self.subscriptions,
topic.split(self.separator),
+ self.formula_prefix,
&mut result,
);
result
}
#[inline]
pub fn is_subscribed(&self, topic: &str) -> bool {
- is_subscribed_rec(&self.subscriptions, topic.split(self.separator))
+ is_subscribed_rec(
+ &self.subscriptions,
+ self.formula_prefix,
+ topic.split(self.separator),
+ )
}
#[inline]
pub fn subscription_count(&self) -> usize {
@@ -213,6 +232,7 @@ fn subscribe_rec(
client: &C,
wildcard: &Set,
match_any: &Set,
+ formula_prefix: Option,
) where
C: Client,
{
@@ -221,17 +241,30 @@ fn subscribe_rec(
subscription.sub_any.insert(client.clone());
} else if match_any.contains(topic) {
if let Some(ref mut sub) = subscription.subtopics_any {
- subscribe_rec(sub, sp, client, wildcard, match_any);
+ subscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
} else {
let mut sub = Subscription::default();
- subscribe_rec(&mut sub, sp, client, wildcard, match_any);
+ subscribe_rec(&mut sub, sp, client, wildcard, match_any, formula_prefix);
subscription.subtopics_any = Some(Box::new(sub));
}
+ } else if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
+ let Ok(formula_parsed) = formula.parse::() else {
+ return;
+ };
+ if let Some(sub) = subscription.subtopics_by_formula.get_mut(&formula_parsed) {
+ subscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
+ } else {
+ let mut sub = Subscription::default();
+ subscribe_rec(&mut sub, sp, client, wildcard, match_any, formula_prefix);
+ subscription
+ .subtopics_by_formula
+ .insert(formula_parsed, sub);
+ }
} else if let Some(sub) = subscription.subtopics.get_mut(topic) {
- subscribe_rec(sub, sp, client, wildcard, match_any);
+ subscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
} else {
let mut sub = Subscription::default();
- subscribe_rec(&mut sub, sp, client, wildcard, match_any);
+ subscribe_rec(&mut sub, sp, client, wildcard, match_any, formula_prefix);
subscription.subtopics.insert(topic.to_owned(), sub);
}
} else {
@@ -245,6 +278,7 @@ fn unsubscribe_rec(
client: &C,
wildcard: &Set,
match_any: &Set,
+ formula_prefix: Option,
) where
C: Client,
{
@@ -253,13 +287,23 @@ fn unsubscribe_rec(
subscription.sub_any.remove(client);
} else if match_any.contains(topic) {
if let Some(ref mut sub) = subscription.subtopics_any {
- unsubscribe_rec(sub, sp, client, wildcard, match_any);
+ unsubscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
if sub.is_empty() {
subscription.subtopics_any = None;
}
}
+ } else if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
+ let Ok(formula_parsed) = formula.parse::() else {
+ return;
+ };
+ if let Some(sub) = subscription.subtopics_by_formula.get_mut(&formula_parsed) {
+ unsubscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
+ if sub.is_empty() {
+ subscription.subtopics_by_formula.remove(&formula_parsed);
+ }
+ }
} else if let Some(sub) = subscription.subtopics.get_mut(topic) {
- unsubscribe_rec(sub, sp, client, wildcard, match_any);
+ unsubscribe_rec(sub, sp, client, wildcard, match_any, formula_prefix);
if sub.is_empty() {
subscription.subtopics.remove(topic);
}
@@ -269,24 +313,43 @@ fn unsubscribe_rec(
}
}
-fn get_subscribers_rec(subscription: &Subscription, mut sp: Split, result: &mut Set)
-where
+fn get_subscribers_rec(
+ subscription: &Subscription,
+ mut sp: Split,
+ formula_prefix: Option,
+ result: &mut Set,
+) where
C: Client,
{
if let Some(topic) = sp.next() {
result.extend(subscription.sub_any.clone());
- if let Some(sub) = subscription.subtopics.get(topic) {
- get_subscribers_rec(sub, sp.clone(), result);
+ if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
+ for sub in subscription.subtopics.values_match_key_formula(formula) {
+ get_subscribers_rec(sub, sp.clone(), formula_prefix, result);
+ }
+ } else if let Some(sub) = subscription.subtopics.get(topic) {
+ get_subscribers_rec(sub, sp.clone(), formula_prefix, result);
+ }
+ if !subscription.subtopics_by_formula.is_empty() {
+ for (formula, sub) in &subscription.subtopics_by_formula {
+ if formula.matches(topic) {
+ get_subscribers_rec(sub, sp.clone(), formula_prefix, result);
+ }
+ }
}
if let Some(ref sub) = subscription.subtopics_any {
- get_subscribers_rec(sub, sp, result);
+ get_subscribers_rec(sub, sp, formula_prefix, result);
}
} else {
result.extend(subscription.subscribers.clone());
}
}
-fn is_subscribed_rec(subscription: &Subscription, mut sp: Split) -> bool
+fn is_subscribed_rec(
+ subscription: &Subscription,
+ formula_prefix: Option,
+ mut sp: Split,
+) -> bool
where
C: Ord + Eq + Clone,
{
@@ -294,13 +357,26 @@ where
if !subscription.sub_any.is_empty() {
return true;
}
- if let Some(sub) = subscription.subtopics.get(topic) {
- if is_subscribed_rec(sub, sp.clone()) {
+ if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
+ for sub in subscription.subtopics.values_match_key_formula(formula) {
+ if is_subscribed_rec(sub, formula_prefix, sp.clone()) {
+ return true;
+ }
+ }
+ } else if let Some(sub) = subscription.subtopics.get(topic) {
+ if is_subscribed_rec(sub, formula_prefix, sp.clone()) {
return true;
}
}
+ if !subscription.subtopics_by_formula.is_empty() {
+ for (formula, sub) in &subscription.subtopics_by_formula {
+ if formula.matches(topic) && is_subscribed_rec(sub, formula_prefix, sp.clone()) {
+ return true;
+ }
+ }
+ }
if let Some(ref sub) = subscription.subtopics_any {
- if is_subscribed_rec(sub, sp) {
+ if is_subscribed_rec(sub, formula_prefix, sp) {
return true;
}
}
@@ -393,4 +469,24 @@ mod test {
smap.subscribe("+/zzz/+/222", &client1);
assert_eq!(smap.get_subscribers("unix/zzz/xxx/222").len(), 1);
}
+ #[test]
+ fn test_match_formula() {
+ let mut smap: SubMap = SubMap::new()
+ .match_any("+")
+ .wildcard("#")
+ .formula_prefix('!');
+ let client1 = "client1".to_owned();
+ smap.register_client(&client1);
+ assert_eq!(smap.get_subscribers("1/xxx").len(), 0);
+ smap.subscribe("!gt(1)/xxx", &client1);
+ assert_eq!(smap.get_subscribers("1/xxx").len(), 0);
+ assert_eq!(smap.get_subscribers("2/xxx").len(), 1);
+ assert_eq!(smap.get_subscribers("unix/zzz/95/222").len(), 0);
+ assert_eq!(smap.get_subscribers("unix/zzz/96/222").len(), 0);
+ assert_eq!(smap.get_subscribers("unix/zzz/97/222").len(), 0);
+ smap.subscribe("+/zzz/!ge(96)/222", &client1);
+ assert_eq!(smap.get_subscribers("unix/zzz/95/222").len(), 0);
+ assert_eq!(smap.get_subscribers("unix/zzz/96/222").len(), 1);
+ assert_eq!(smap.get_subscribers("unix/zzz/97/222").len(), 1);
+ }
}