diff options
Diffstat (limited to 'src/sync/algorithms.rs')
| -rw-r--r-- | src/sync/algorithms.rs | 58 |
1 files changed, 49 insertions, 9 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 39788bc..9899abc 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState}; | |||
| 25 | /// this repo need to sync from", it's "what repos does this relay need to sync". | 25 | /// this repo need to sync from", it's "what repos does this relay need to sync". |
| 26 | #[derive(Debug, Clone, Default)] | 26 | #[derive(Debug, Clone, Default)] |
| 27 | pub struct RelaySyncNeeds { | 27 | pub struct RelaySyncNeeds { |
| 28 | /// Repos that need to be synced from this relay | 28 | /// Repos that need full L2+L3 sync from this relay |
| 29 | pub repos: HashSet<String>, | 29 | pub repos: HashSet<String>, |
| 30 | /// Repos that only need state event sync (purgatory announcements) | ||
| 31 | pub state_only_repos: HashSet<String>, | ||
| 30 | /// Root events that need to be tracked from this relay | 32 | /// Root events that need to be tracked from this relay |
| 31 | pub root_events: HashSet<EventId>, | 33 | pub root_events: HashSet<EventId>, |
| 32 | } | 34 | } |
| @@ -67,8 +69,15 @@ pub fn derive_relay_targets( | |||
| 67 | for relay_url in &needs.relays { | 69 | for relay_url in &needs.relays { |
| 68 | let entry = relay_targets.entry(relay_url.clone()).or_default(); | 70 | let entry = relay_targets.entry(relay_url.clone()).or_default(); |
| 69 | 71 | ||
| 70 | entry.repos.insert(repo_id.clone()); | 72 | match needs.sync_level { |
| 71 | entry.root_events.extend(needs.root_events.iter().cloned()); | 73 | super::SyncLevel::Full => { |
| 74 | entry.repos.insert(repo_id.clone()); | ||
| 75 | entry.root_events.extend(needs.root_events.iter().cloned()); | ||
| 76 | } | ||
| 77 | super::SyncLevel::StateOnly => { | ||
| 78 | entry.state_only_repos.insert(repo_id.clone()); | ||
| 79 | } | ||
| 80 | } | ||
| 72 | } | 81 | } |
| 73 | } | 82 | } |
| 74 | 83 | ||
| @@ -96,7 +105,7 @@ pub fn compute_actions( | |||
| 96 | pending: &HashMap<String, Vec<PendingBatch>>, | 105 | pending: &HashMap<String, Vec<PendingBatch>>, |
| 97 | confirmed: &HashMap<String, RelayState>, | 106 | confirmed: &HashMap<String, RelayState>, |
| 98 | ) -> Vec<AddFilters> { | 107 | ) -> Vec<AddFilters> { |
| 99 | use crate::sync::filters::build_layer2_and_layer3_filters; | 108 | use crate::sync::filters::build_sync_level_aware_filters; |
| 100 | 109 | ||
| 101 | let mut actions = Vec::new(); | 110 | let mut actions = Vec::new(); |
| 102 | 111 | ||
| @@ -140,14 +149,22 @@ pub fn compute_actions( | |||
| 140 | .map(|state| state.root_events.clone()) | 149 | .map(|state| state.root_events.clone()) |
| 141 | .unwrap_or_default(); | 150 | .unwrap_or_default(); |
| 142 | 151 | ||
| 143 | // Calculate what's NEW (not in pending, not in confirmed) | 152 | // Calculate what's NEW for full repos (not in pending, not in confirmed) |
| 144 | let new_repos: HashSet<String> = target_needs | 153 | let new_full_repos: HashSet<String> = target_needs |
| 145 | .repos | 154 | .repos |
| 146 | .difference(&pending_repos) | 155 | .difference(&pending_repos) |
| 147 | .filter(|repo| !confirmed_repos.contains(*repo)) | 156 | .filter(|repo| !confirmed_repos.contains(*repo)) |
| 148 | .cloned() | 157 | .cloned() |
| 149 | .collect(); | 158 | .collect(); |
| 150 | 159 | ||
| 160 | // Calculate what's NEW for state-only repos | ||
| 161 | let new_state_only_repos: HashSet<String> = target_needs | ||
| 162 | .state_only_repos | ||
| 163 | .difference(&pending_repos) | ||
| 164 | .filter(|repo| !confirmed_repos.contains(*repo)) | ||
| 165 | .cloned() | ||
| 166 | .collect(); | ||
| 167 | |||
| 151 | let new_events: HashSet<EventId> = target_needs | 168 | let new_events: HashSet<EventId> = target_needs |
| 152 | .root_events | 169 | .root_events |
| 153 | .difference(&pending_events) | 170 | .difference(&pending_events) |
| @@ -156,13 +173,23 @@ pub fn compute_actions( | |||
| 156 | .collect(); | 173 | .collect(); |
| 157 | 174 | ||
| 158 | // If there's anything new, create an AddFilters action | 175 | // If there's anything new, create an AddFilters action |
| 159 | if !new_repos.is_empty() || !new_events.is_empty() { | 176 | if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty() |
| 160 | let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); | 177 | { |
| 178 | let filters = build_sync_level_aware_filters( | ||
| 179 | &new_full_repos, | ||
| 180 | &new_state_only_repos, | ||
| 181 | &new_events, | ||
| 182 | None, | ||
| 183 | ); | ||
| 184 | |||
| 185 | // Combine all repos into pending items (pending tracking doesn't need sync level) | ||
| 186 | let mut all_new_repos = new_full_repos; | ||
| 187 | all_new_repos.extend(new_state_only_repos); | ||
| 161 | 188 | ||
| 162 | actions.push(AddFilters { | 189 | actions.push(AddFilters { |
| 163 | relay_url: relay_url.clone(), | 190 | relay_url: relay_url.clone(), |
| 164 | items: PendingItems { | 191 | items: PendingItems { |
| 165 | repos: new_repos, | 192 | repos: all_new_repos, |
| 166 | root_events: new_events, | 193 | root_events: new_events, |
| 167 | }, | 194 | }, |
| 168 | filters, | 195 | filters, |
| @@ -204,6 +231,7 @@ mod tests { | |||
| 204 | ModRepoSyncNeeds { | 231 | ModRepoSyncNeeds { |
| 205 | relays, | 232 | relays, |
| 206 | root_events, | 233 | root_events, |
| 234 | sync_level: Default::default(), | ||
| 207 | }, | 235 | }, |
| 208 | ); | 236 | ); |
| 209 | 237 | ||
| @@ -229,6 +257,7 @@ mod tests { | |||
| 229 | ModRepoSyncNeeds { | 257 | ModRepoSyncNeeds { |
| 230 | relays, | 258 | relays, |
| 231 | root_events: HashSet::new(), | 259 | root_events: HashSet::new(), |
| 260 | sync_level: Default::default(), | ||
| 232 | }, | 261 | }, |
| 233 | ); | 262 | ); |
| 234 | } | 263 | } |
| @@ -252,6 +281,7 @@ mod tests { | |||
| 252 | ModRepoSyncNeeds { | 281 | ModRepoSyncNeeds { |
| 253 | relays, | 282 | relays, |
| 254 | root_events: HashSet::new(), | 283 | root_events: HashSet::new(), |
| 284 | sync_level: Default::default(), | ||
| 255 | }, | 285 | }, |
| 256 | ); | 286 | ); |
| 257 | 287 | ||
| @@ -285,6 +315,7 @@ mod tests { | |||
| 285 | ModRepoSyncNeeds { | 315 | ModRepoSyncNeeds { |
| 286 | relays: relays1, | 316 | relays: relays1, |
| 287 | root_events: root_events1, | 317 | root_events: root_events1, |
| 318 | sync_level: Default::default(), | ||
| 288 | }, | 319 | }, |
| 289 | ); | 320 | ); |
| 290 | 321 | ||
| @@ -299,6 +330,7 @@ mod tests { | |||
| 299 | ModRepoSyncNeeds { | 330 | ModRepoSyncNeeds { |
| 300 | relays: relays2, | 331 | relays: relays2, |
| 301 | root_events: root_events2, | 332 | root_events: root_events2, |
| 333 | sync_level: Default::default(), | ||
| 302 | }, | 334 | }, |
| 303 | ); | 335 | ); |
| 304 | 336 | ||
| @@ -332,6 +364,7 @@ mod tests { | |||
| 332 | "wss://relay1.com".to_string(), | 364 | "wss://relay1.com".to_string(), |
| 333 | RelaySyncNeeds { | 365 | RelaySyncNeeds { |
| 334 | repos: vec!["repo1".to_string()].into_iter().collect(), | 366 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 367 | state_only_repos: HashSet::new(), | ||
| 335 | root_events: HashSet::new(), | 368 | root_events: HashSet::new(), |
| 336 | }, | 369 | }, |
| 337 | ); | 370 | ); |
| @@ -366,6 +399,7 @@ mod tests { | |||
| 366 | "wss://relay1.com".to_string(), | 399 | "wss://relay1.com".to_string(), |
| 367 | RelaySyncNeeds { | 400 | RelaySyncNeeds { |
| 368 | repos: vec!["repo1".to_string()].into_iter().collect(), | 401 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 402 | state_only_repos: HashSet::new(), | ||
| 369 | root_events: HashSet::new(), | 403 | root_events: HashSet::new(), |
| 370 | }, | 404 | }, |
| 371 | ); | 405 | ); |
| @@ -389,6 +423,7 @@ mod tests { | |||
| 389 | "wss://relay1.com".to_string(), | 423 | "wss://relay1.com".to_string(), |
| 390 | RelaySyncNeeds { | 424 | RelaySyncNeeds { |
| 391 | repos: vec!["repo1".to_string()].into_iter().collect(), | 425 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 426 | state_only_repos: HashSet::new(), | ||
| 392 | root_events: HashSet::new(), | 427 | root_events: HashSet::new(), |
| 393 | }, | 428 | }, |
| 394 | ); | 429 | ); |
| @@ -428,6 +463,7 @@ mod tests { | |||
| 428 | "wss://relay1.com".to_string(), | 463 | "wss://relay1.com".to_string(), |
| 429 | RelaySyncNeeds { | 464 | RelaySyncNeeds { |
| 430 | repos: vec!["repo1".to_string()].into_iter().collect(), | 465 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 466 | state_only_repos: HashSet::new(), | ||
| 431 | root_events: HashSet::new(), | 467 | root_events: HashSet::new(), |
| 432 | }, | 468 | }, |
| 433 | ); | 469 | ); |
| @@ -465,6 +501,7 @@ mod tests { | |||
| 465 | "wss://relay1.com".to_string(), | 501 | "wss://relay1.com".to_string(), |
| 466 | RelaySyncNeeds { | 502 | RelaySyncNeeds { |
| 467 | repos: vec!["repo1".to_string()].into_iter().collect(), | 503 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 504 | state_only_repos: HashSet::new(), | ||
| 468 | root_events: HashSet::new(), | 505 | root_events: HashSet::new(), |
| 469 | }, | 506 | }, |
| 470 | ); | 507 | ); |
| @@ -510,6 +547,7 @@ mod tests { | |||
| 510 | ] | 547 | ] |
| 511 | .into_iter() | 548 | .into_iter() |
| 512 | .collect(), | 549 | .collect(), |
| 550 | state_only_repos: HashSet::new(), | ||
| 513 | root_events: HashSet::new(), | 551 | root_events: HashSet::new(), |
| 514 | }, | 552 | }, |
| 515 | ); | 553 | ); |
| @@ -572,6 +610,7 @@ mod tests { | |||
| 572 | "wss://relay1.com".to_string(), | 610 | "wss://relay1.com".to_string(), |
| 573 | RelaySyncNeeds { | 611 | RelaySyncNeeds { |
| 574 | repos: HashSet::new(), | 612 | repos: HashSet::new(), |
| 613 | state_only_repos: HashSet::new(), | ||
| 575 | root_events: vec![event_id].into_iter().collect(), | 614 | root_events: vec![event_id].into_iter().collect(), |
| 576 | }, | 615 | }, |
| 577 | ); | 616 | ); |
| @@ -599,6 +638,7 @@ mod tests { | |||
| 599 | "wss://new-relay.com".to_string(), | 638 | "wss://new-relay.com".to_string(), |
| 600 | RelaySyncNeeds { | 639 | RelaySyncNeeds { |
| 601 | repos: vec!["repo1".to_string()].into_iter().collect(), | 640 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 641 | state_only_repos: HashSet::new(), | ||
| 602 | root_events: HashSet::new(), | 642 | root_events: HashSet::new(), |
| 603 | }, | 643 | }, |
| 604 | ); | 644 | ); |