upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-13 19:59:36 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-13 19:59:36 +0000
commite922e14e3ec4b898c111b2100cd63dddbe2fcdb1 (patch)
tree8fa2343b4b12ce97108b1e3461410e97a7af8cce /src/sync
parent8c903c9449d387c9b0edefa5aa283b176a3ed0cb (diff)
feat: add SyncLevel to sync system for purgatory announcement state-only sync
Purgatory announcements need state events (kind 30618) synced from external relays, but not full L2/L3 events (patches, issues, PRs) which would be rejected anyway. This implements the SyncLevel concept from the design doc (decision #6): - Add SyncLevel enum (Full vs StateOnly) to RepoSyncNeeds - When announcement enters purgatory during sync, register in RepoSyncIndex with SyncLevel::StateOnly - Add build_sync_level_aware_filters() that partitions repos by level: StateOnly repos only get state event filters (kind 30618) - Update derive_relay_targets to track state_only_repos separately - Update compute_actions to handle both repo sets - SelfSubscriber always uses SyncLevel::Full (promoted repos)
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/algorithms.rs58
-rw-r--r--src/sync/filters.rs31
-rw-r--r--src/sync/mod.rs59
-rw-r--r--src/sync/self_subscriber.rs17
4 files changed, 150 insertions, 15 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 39788bc..9899abc 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState};
25/// this repo need to sync from", it's "what repos does this relay need to sync". 25/// this repo need to sync from", it's "what repos does this relay need to sync".
26#[derive(Debug, Clone, Default)] 26#[derive(Debug, Clone, Default)]
27pub struct RelaySyncNeeds { 27pub struct RelaySyncNeeds {
28 /// Repos that need to be synced from this relay 28 /// Repos that need full L2+L3 sync from this relay
29 pub repos: HashSet<String>, 29 pub repos: HashSet<String>,
30 /// Repos that only need state event sync (purgatory announcements)
31 pub state_only_repos: HashSet<String>,
30 /// Root events that need to be tracked from this relay 32 /// Root events that need to be tracked from this relay
31 pub root_events: HashSet<EventId>, 33 pub root_events: HashSet<EventId>,
32} 34}
@@ -67,8 +69,15 @@ pub fn derive_relay_targets(
67 for relay_url in &needs.relays { 69 for relay_url in &needs.relays {
68 let entry = relay_targets.entry(relay_url.clone()).or_default(); 70 let entry = relay_targets.entry(relay_url.clone()).or_default();
69 71
70 entry.repos.insert(repo_id.clone()); 72 match needs.sync_level {
71 entry.root_events.extend(needs.root_events.iter().cloned()); 73 super::SyncLevel::Full => {
74 entry.repos.insert(repo_id.clone());
75 entry.root_events.extend(needs.root_events.iter().cloned());
76 }
77 super::SyncLevel::StateOnly => {
78 entry.state_only_repos.insert(repo_id.clone());
79 }
80 }
72 } 81 }
73 } 82 }
74 83
@@ -96,7 +105,7 @@ pub fn compute_actions(
96 pending: &HashMap<String, Vec<PendingBatch>>, 105 pending: &HashMap<String, Vec<PendingBatch>>,
97 confirmed: &HashMap<String, RelayState>, 106 confirmed: &HashMap<String, RelayState>,
98) -> Vec<AddFilters> { 107) -> Vec<AddFilters> {
99 use crate::sync::filters::build_layer2_and_layer3_filters; 108 use crate::sync::filters::build_sync_level_aware_filters;
100 109
101 let mut actions = Vec::new(); 110 let mut actions = Vec::new();
102 111
@@ -140,14 +149,22 @@ pub fn compute_actions(
140 .map(|state| state.root_events.clone()) 149 .map(|state| state.root_events.clone())
141 .unwrap_or_default(); 150 .unwrap_or_default();
142 151
143 // Calculate what's NEW (not in pending, not in confirmed) 152 // Calculate what's NEW for full repos (not in pending, not in confirmed)
144 let new_repos: HashSet<String> = target_needs 153 let new_full_repos: HashSet<String> = target_needs
145 .repos 154 .repos
146 .difference(&pending_repos) 155 .difference(&pending_repos)
147 .filter(|repo| !confirmed_repos.contains(*repo)) 156 .filter(|repo| !confirmed_repos.contains(*repo))
148 .cloned() 157 .cloned()
149 .collect(); 158 .collect();
150 159
160 // Calculate what's NEW for state-only repos
161 let new_state_only_repos: HashSet<String> = target_needs
162 .state_only_repos
163 .difference(&pending_repos)
164 .filter(|repo| !confirmed_repos.contains(*repo))
165 .cloned()
166 .collect();
167
151 let new_events: HashSet<EventId> = target_needs 168 let new_events: HashSet<EventId> = target_needs
152 .root_events 169 .root_events
153 .difference(&pending_events) 170 .difference(&pending_events)
@@ -156,13 +173,23 @@ pub fn compute_actions(
156 .collect(); 173 .collect();
157 174
158 // If there's anything new, create an AddFilters action 175 // If there's anything new, create an AddFilters action
159 if !new_repos.is_empty() || !new_events.is_empty() { 176 if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty()
160 let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); 177 {
178 let filters = build_sync_level_aware_filters(
179 &new_full_repos,
180 &new_state_only_repos,
181 &new_events,
182 None,
183 );
184
185 // Combine all repos into pending items (pending tracking doesn't need sync level)
186 let mut all_new_repos = new_full_repos;
187 all_new_repos.extend(new_state_only_repos);
161 188
162 actions.push(AddFilters { 189 actions.push(AddFilters {
163 relay_url: relay_url.clone(), 190 relay_url: relay_url.clone(),
164 items: PendingItems { 191 items: PendingItems {
165 repos: new_repos, 192 repos: all_new_repos,
166 root_events: new_events, 193 root_events: new_events,
167 }, 194 },
168 filters, 195 filters,
@@ -204,6 +231,7 @@ mod tests {
204 ModRepoSyncNeeds { 231 ModRepoSyncNeeds {
205 relays, 232 relays,
206 root_events, 233 root_events,
234 sync_level: Default::default(),
207 }, 235 },
208 ); 236 );
209 237
@@ -229,6 +257,7 @@ mod tests {
229 ModRepoSyncNeeds { 257 ModRepoSyncNeeds {
230 relays, 258 relays,
231 root_events: HashSet::new(), 259 root_events: HashSet::new(),
260 sync_level: Default::default(),
232 }, 261 },
233 ); 262 );
234 } 263 }
@@ -252,6 +281,7 @@ mod tests {
252 ModRepoSyncNeeds { 281 ModRepoSyncNeeds {
253 relays, 282 relays,
254 root_events: HashSet::new(), 283 root_events: HashSet::new(),
284 sync_level: Default::default(),
255 }, 285 },
256 ); 286 );
257 287
@@ -285,6 +315,7 @@ mod tests {
285 ModRepoSyncNeeds { 315 ModRepoSyncNeeds {
286 relays: relays1, 316 relays: relays1,
287 root_events: root_events1, 317 root_events: root_events1,
318 sync_level: Default::default(),
288 }, 319 },
289 ); 320 );
290 321
@@ -299,6 +330,7 @@ mod tests {
299 ModRepoSyncNeeds { 330 ModRepoSyncNeeds {
300 relays: relays2, 331 relays: relays2,
301 root_events: root_events2, 332 root_events: root_events2,
333 sync_level: Default::default(),
302 }, 334 },
303 ); 335 );
304 336
@@ -332,6 +364,7 @@ mod tests {
332 "wss://relay1.com".to_string(), 364 "wss://relay1.com".to_string(),
333 RelaySyncNeeds { 365 RelaySyncNeeds {
334 repos: vec!["repo1".to_string()].into_iter().collect(), 366 repos: vec!["repo1".to_string()].into_iter().collect(),
367 state_only_repos: HashSet::new(),
335 root_events: HashSet::new(), 368 root_events: HashSet::new(),
336 }, 369 },
337 ); 370 );
@@ -366,6 +399,7 @@ mod tests {
366 "wss://relay1.com".to_string(), 399 "wss://relay1.com".to_string(),
367 RelaySyncNeeds { 400 RelaySyncNeeds {
368 repos: vec!["repo1".to_string()].into_iter().collect(), 401 repos: vec!["repo1".to_string()].into_iter().collect(),
402 state_only_repos: HashSet::new(),
369 root_events: HashSet::new(), 403 root_events: HashSet::new(),
370 }, 404 },
371 ); 405 );
@@ -389,6 +423,7 @@ mod tests {
389 "wss://relay1.com".to_string(), 423 "wss://relay1.com".to_string(),
390 RelaySyncNeeds { 424 RelaySyncNeeds {
391 repos: vec!["repo1".to_string()].into_iter().collect(), 425 repos: vec!["repo1".to_string()].into_iter().collect(),
426 state_only_repos: HashSet::new(),
392 root_events: HashSet::new(), 427 root_events: HashSet::new(),
393 }, 428 },
394 ); 429 );
@@ -428,6 +463,7 @@ mod tests {
428 "wss://relay1.com".to_string(), 463 "wss://relay1.com".to_string(),
429 RelaySyncNeeds { 464 RelaySyncNeeds {
430 repos: vec!["repo1".to_string()].into_iter().collect(), 465 repos: vec!["repo1".to_string()].into_iter().collect(),
466 state_only_repos: HashSet::new(),
431 root_events: HashSet::new(), 467 root_events: HashSet::new(),
432 }, 468 },
433 ); 469 );
@@ -465,6 +501,7 @@ mod tests {
465 "wss://relay1.com".to_string(), 501 "wss://relay1.com".to_string(),
466 RelaySyncNeeds { 502 RelaySyncNeeds {
467 repos: vec!["repo1".to_string()].into_iter().collect(), 503 repos: vec!["repo1".to_string()].into_iter().collect(),
504 state_only_repos: HashSet::new(),
468 root_events: HashSet::new(), 505 root_events: HashSet::new(),
469 }, 506 },
470 ); 507 );
@@ -510,6 +547,7 @@ mod tests {
510 ] 547 ]
511 .into_iter() 548 .into_iter()
512 .collect(), 549 .collect(),
550 state_only_repos: HashSet::new(),
513 root_events: HashSet::new(), 551 root_events: HashSet::new(),
514 }, 552 },
515 ); 553 );
@@ -572,6 +610,7 @@ mod tests {
572 "wss://relay1.com".to_string(), 610 "wss://relay1.com".to_string(),
573 RelaySyncNeeds { 611 RelaySyncNeeds {
574 repos: HashSet::new(), 612 repos: HashSet::new(),
613 state_only_repos: HashSet::new(),
575 root_events: vec![event_id].into_iter().collect(), 614 root_events: vec![event_id].into_iter().collect(),
576 }, 615 },
577 ); 616 );
@@ -599,6 +638,7 @@ mod tests {
599 "wss://new-relay.com".to_string(), 638 "wss://new-relay.com".to_string(),
600 RelaySyncNeeds { 639 RelaySyncNeeds {
601 repos: vec!["repo1".to_string()].into_iter().collect(), 640 repos: vec!["repo1".to_string()].into_iter().collect(),
641 state_only_repos: HashSet::new(),
602 root_events: HashSet::new(), 642 root_events: HashSet::new(),
603 }, 643 },
604 ); 644 );
diff --git a/src/sync/filters.rs b/src/sync/filters.rs
index 3592489..1215e81 100644
--- a/src/sync/filters.rs
+++ b/src/sync/filters.rs
@@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters(
245 filters 245 filters
246} 246}
247 247
248/// Builds filters respecting SyncLevel for each repo
249///
250/// StateOnly repos only get state event filters (kind 30618).
251/// Full repos get all L2/L3 filters (state + repo-tagging + root event).
252///
253/// # Arguments
254/// * `full_repos` - Repos needing full L2+L3 sync
255/// * `state_only_repos` - Repos needing only state event sync (purgatory)
256/// * `root_events` - Root event IDs (only used for Full repos)
257/// * `since` - Optional timestamp for incremental sync
258pub fn build_sync_level_aware_filters(
259 full_repos: &HashSet<String>,
260 state_only_repos: &HashSet<String>,
261 root_events: &HashSet<EventId>,
262 since: Option<Timestamp>,
263) -> Vec<Filter> {
264 let mut filters = Vec::new();
265
266 // All repos (both Full and StateOnly) need state event filters
267 let all_repos: HashSet<String> = full_repos.union(state_only_repos).cloned().collect();
268 filters.extend(state_event_filters_for_our_repos(&all_repos, since));
269
270 // Only Full repos get repo-tagging and root event filters
271 if !full_repos.is_empty() {
272 filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since));
273 }
274 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
275
276 filters
277}
278
248#[cfg(test)] 279#[cfg(test)]
249mod tests { 280mod tests {
250 use super::*; 281 use super::*;
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 1ee1872..519017b 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex;
85// Supporting Data Structures 85// Supporting Data Structures
86// ============================================================================= 86// =============================================================================
87 87
88/// Level of sync needed for a repository
89///
90/// Purgatory announcements only need state events synced (to validate git data).
91/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.).
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
93pub enum SyncLevel {
94 /// Full L2 + L3 sync (promoted repos with git data)
95 #[default]
96 Full,
97 /// Only state events (kind 30618) - for purgatory announcements
98 StateOnly,
99}
100
88/// What repos and root events need to be synced 101/// What repos and root events need to be synced
89#[derive(Debug, Clone, Default)] 102#[derive(Debug, Clone, Default)]
90pub struct RepoSyncNeeds { 103pub struct RepoSyncNeeds {
@@ -92,6 +105,8 @@ pub struct RepoSyncNeeds {
92 pub relays: HashSet<String>, 105 pub relays: HashSet<String>,
93 /// Root event IDs - 1617/1618/1621 - that reference this repo 106 /// Root event IDs - 1617/1618/1621 - that reference this repo
94 pub root_events: HashSet<EventId>, 107 pub root_events: HashSet<EventId>,
108 /// Sync level - StateOnly for purgatory, Full for promoted repos
109 pub sync_level: SyncLevel,
95} 110}
96 111
97/// Connection status for a relay 112/// Connection status for a relay
@@ -1677,6 +1692,7 @@ impl SyncManager {
1677 let eose_tx = self.eose_tx.as_ref().unwrap().clone(); 1692 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
1678 let metrics_clone = self.metrics.clone(); 1693 let metrics_clone = self.metrics.clone();
1679 let pending_sync_index = Arc::clone(&self.pending_sync_index); 1694 let pending_sync_index = Arc::clone(&self.pending_sync_index);
1695 let repo_sync_index = Arc::clone(&self.repo_sync_index);
1680 let health_tracker = Arc::clone(&self.health_tracker); 1696 let health_tracker = Arc::clone(&self.health_tracker);
1681 let rejected_events_index = Arc::clone(&self.rejected_events_index); 1697 let rejected_events_index = Arc::clone(&self.rejected_events_index);
1682 1698
@@ -1719,8 +1735,49 @@ impl SyncManager {
1719 // For sync-triggered events that go to purgatory, trigger immediate sync 1735 // For sync-triggered events that go to purgatory, trigger immediate sync
1720 // (instead of the default 3-minute delay for user-submitted events) 1736 // (instead of the default 3-minute delay for user-submitted events)
1721 if result == ProcessResult::Purgatory { 1737 if result == ProcessResult::Purgatory {
1738 // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly
1739 // so that state events (kind 30618) are synced for this purgatory announcement
1740 if event.kind == Kind::GitRepoAnnouncement {
1741 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1742 let tag_vec = tag.as_slice();
1743 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
1744 Some(tag_vec[1].to_string())
1745 } else {
1746 None
1747 }
1748 }) {
1749 let repo_id = format!("30617:{}:{}", event.pubkey, identifier);
1750
1751 // Extract relay URLs from the purgatory entry
1752 let relays = write_policy
1753 .purgatory()
1754 .find_announcement(&event.pubkey, &identifier)
1755 .map(|entry| entry.relays)
1756 .unwrap_or_default();
1757
1758 tracing::info!(
1759 event_id = %event.id,
1760 repo_id = %repo_id,
1761 relay_count = relays.len(),
1762 "Registering purgatory announcement in RepoSyncIndex with StateOnly level"
1763 );
1764
1765 // Register in RepoSyncIndex with StateOnly level
1766 let mut index = repo_sync_index.write().await;
1767 let entry = index
1768 .entry(repo_id)
1769 .or_insert_with(|| RepoSyncNeeds {
1770 relays: HashSet::new(),
1771 root_events: HashSet::new(),
1772 sync_level: SyncLevel::StateOnly,
1773 });
1774 entry.relays.extend(relays);
1775 // Don't upgrade sync_level if already Full
1776 // (e.g., if announcement was promoted before this runs)
1777 }
1778 }
1722 // State events (kind 30618) - extract identifier and trigger immediate sync 1779 // State events (kind 30618) - extract identifier and trigger immediate sync
1723 if event.kind.as_u16() == 30618 { 1780 else if event.kind.as_u16() == 30618 {
1724 if let Some(identifier) = event.tags.iter().find_map(|tag| { 1781 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1725 let tag_vec = tag.clone().to_vec(); 1782 let tag_vec = tag.clone().to_vec();
1726 if tag_vec.len() >= 2 && tag_vec[0] == "d" { 1783 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 3cc408d..db16c62 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,7 +16,7 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel};
20 20
21// ============================================================================= 21// =============================================================================
22// LoopControl - Result of notification processing 22// LoopControl - Result of notification processing
@@ -58,6 +58,7 @@ impl PendingUpdates {
58 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { 58 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
59 relays: HashSet::new(), 59 relays: HashSet::new(),
60 root_events: HashSet::new(), 60 root_events: HashSet::new(),
61 sync_level: SyncLevel::Full,
61 }); 62 });
62 entry.relays.extend(relays); 63 entry.relays.extend(relays);
63 entry.root_events.extend(root_events); 64 entry.root_events.extend(root_events);
@@ -475,6 +476,7 @@ impl SelfSubscriber {
475 .or_insert_with(|| RepoSyncNeeds { 476 .or_insert_with(|| RepoSyncNeeds {
476 relays: HashSet::new(), 477 relays: HashSet::new(),
477 root_events: HashSet::new(), 478 root_events: HashSet::new(),
479 sync_level: SyncLevel::Full,
478 }); 480 });
479 entry.relays.extend(needs.relays); 481 entry.relays.extend(needs.relays);
480 entry.root_events.extend(needs.root_events); 482 entry.root_events.extend(needs.root_events);
@@ -499,21 +501,26 @@ impl SelfSubscriber {
499 continue; 501 continue;
500 } 502 }
501 503
502 // Build filters for these repos 504 // Build filters for these repos (sync-level-aware)
503 let filters = crate::sync::filters::build_layer2_and_layer3_filters( 505 let filters = crate::sync::filters::build_sync_level_aware_filters(
504 &needs.repos, 506 &needs.repos,
507 &needs.state_only_repos,
505 &needs.root_events, 508 &needs.root_events,
506 None, 509 None,
507 ); 510 );
508 511
509 // Log before moving values 512 // Log before moving values
510 let repo_count = needs.repos.len(); 513 let repo_count = needs.repos.len() + needs.state_only_repos.len();
511 let event_count = needs.root_events.len(); 514 let event_count = needs.root_events.len();
512 515
516 // Combine all repos into pending items
517 let mut all_repos = needs.repos;
518 all_repos.extend(needs.state_only_repos);
519
513 let action = AddFilters { 520 let action = AddFilters {
514 relay_url: relay_url.clone(), 521 relay_url: relay_url.clone(),
515 items: crate::sync::PendingItems { 522 items: crate::sync::PendingItems {
516 repos: needs.repos, 523 repos: all_repos,
517 root_events: needs.root_events, 524 root_events: needs.root_events,
518 }, 525 },
519 filters, 526 filters,