upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:12 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:18 +0000
commitefaad1e2857914b87307cf78903a957a604697a8 (patch)
treedadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/mod.rs
parent91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff)
basic sync stub
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs416
1 files changed, 385 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 17418d0..aa34490 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,40 +1,394 @@
1//! Proactive Sync Module for GRASP-02 1//! Proactive Sync Module for GRASP-02
2//! 2//!
3//! This module implements proactive synchronization of kind 30617 (repository state) 3//! This module implements the proactive sync system that ensures data availability
4//! events from configured relay(s). Events are validated through the same write policy 4//! for repositories hosted on this relay by syncing from other relays in the ecosystem.
5//! as directly-submitted events.
6//! 5//!
7//! ## Three-Layer Filter Strategy (Phase 2) 6//! ## Architecture Overview
8//! 7//!
9//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) 8//! The sync system is built around two core data structures:
10//! - **Layer 2**: Repository events (A/a tags for shared repos)
11//! - **Layer 3**: Related events (E/e tags for discussions, reviews)
12//! 9//!
13//! ## Resilience & Health Tracking (Phase 3) 10//! - **FollowingRepoRootEvents**: Tracks repository root events we're following
11//! - **SyncRelays**: Tracks relays we sync from, including their repos and events
14//! 12//!
15//! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead) 13//! These type aliases are colocated with SyncManager (following the pattern of
16//! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max) 14//! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity.
17//! - **Dead relay handling**: Minimal retry for 24h+ failed relays 15//!
18//! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay) 16//! ## Submodules
17//!
18//! - [`health`]: Relay health tracking with exponential backoff and dead relay detection
19//! - [`metrics`]: Prometheus metrics for sync operations
20//!
21//! ## Memory Estimates (from design doc)
22//!
23//! At target scale (1,000 repos, 100 relays):
24//! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB
25//! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB
26//! - **Total in-memory state**: ~10 MB
27//!
28//! ## Upper Bounds (triggers for redesign)
29//!
30//! - 10,000+ repos: Consider database-backed state
31//! - 500+ sync relays: Consider connection pooling
32//! - 500+ root events per repo: Consider per-repo pagination
33//!
34//! ## Design References
35//!
36//! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md)
37//! for the complete design context.
38
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41
42use nostr_sdk::EventId;
43use tokio::sync::RwLock;
44
45use crate::config::Config;
46use crate::nostr::builder::Nip34WritePolicy;
47use crate::nostr::SharedDatabase;
48
49// =============================================================================
50// Type Aliases for Sync State
51// =============================================================================
52
53/// Repository root events we're following.
54///
55/// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621)
56/// we need to follow for each repository we host.
57///
58/// ## Key Format
59///
60/// The key is a repository addressable reference in the format:
61/// `"30617:<pubkey>:<identifier>"`
62///
63/// For example: `"30617:abc123...def:my-project"`
64///
65/// ## Value
66///
67/// A set of event IDs representing root events (PRs, Issues, Patches, Status events)
68/// that reference this repository via an `a` tag.
69///
70/// ## Event Kinds Tracked
71///
72/// - **1617**: Patches (NIP-34)
73/// - **1618**: Issues (NIP-34)
74/// - **1619**: PRs (Pull Requests, NIP-34)
75/// - **1621**: Status events (NIP-34)
76///
77/// ## Invariants
78///
79/// - May include a few extra repo refs that aren't in `SyncRelays`
80/// - This is acceptable - we won't query other relays for them
81/// - Updated incrementally via self-subscription
82///
83/// ## Thread Safety
84///
85/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
86/// async tasks performing sync operations.
87///
88/// ## Example Usage
89///
90/// ```rust,ignore
91/// use ngit_grasp::sync::FollowingRepoRootEvents;
92/// use std::collections::HashSet;
93/// use nostr_sdk::EventId;
94///
95/// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) {
96/// let guard = state.read().await;
97/// if let Some(events) = guard.get(repo_ref) {
98/// println!("Tracking {} root events for {}", events.len(), repo_ref);
99/// }
100/// }
101/// ```
102pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
103
104/// Relays we sync from, including their repos and events.
105///
106/// This structure tracks which relays we need to connect to for syncing,
107/// and for each relay, which repositories and their root events we're interested in.
108///
109/// ## Key Format (Outer HashMap)
110///
111/// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"`
112///
113/// ## Value Format (Inner HashMap)
114///
115/// For each relay, we maintain a map of:
116/// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`)
117/// - Value: Set of event IDs for that repo which should be synced from this relay
118///
119/// ## Relay Selection Criteria
120///
121/// A relay is included if its URL appears in a repository announcement (kind 30617)
122/// that **also** lists our service URL. This ensures we only sync from relays
123/// for repositories that are actually hosted on our relay.
124///
125/// ## Bootstrap Relay
126///
127/// If configured, the bootstrap relay is always present in this map and is
128/// excluded from automatic removal logic. The bootstrap relay is used for
129/// initial sync and discovery even when no repositories explicitly list it.
130///
131/// ## Thread Safety
132///
133/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
134/// async tasks performing sync operations.
135///
136/// ## Example Usage
137///
138/// ```rust,ignore
139/// use ngit_grasp::sync::SyncRelays;
140/// use std::collections::{HashMap, HashSet};
141///
142/// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) {
143/// let guard = state.read().await;
144/// if let Some(repos) = guard.get(relay_url) {
145/// println!("Relay {} tracks {} repos", relay_url, repos.len());
146/// for (repo_ref, events) in repos {
147/// println!(" {} -> {} events", repo_ref, events.len());
148/// }
149/// }
150/// }
151/// ```
152pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
153
154/// Creates a new empty `FollowingRepoRootEvents` state.
155///
156/// Use this to initialize the state before populating from database queries.
157pub fn new_following_repo_root_events() -> FollowingRepoRootEvents {
158 Arc::new(RwLock::new(HashMap::new()))
159}
160
161/// Creates a new empty `SyncRelays` state.
162///
163/// Use this to initialize the state before populating from database queries.
164pub fn new_sync_relays() -> SyncRelays {
165 Arc::new(RwLock::new(HashMap::new()))
166}
167
168// =============================================================================
169// SyncManager
170// =============================================================================
171
172/// Manages proactive synchronization with external relays.
173///
174/// The SyncManager is responsible for:
175/// - Discovering relays from stored repository announcements
176/// - Maintaining connections to sync relays
177/// - Subscribing to events at external relays
178/// - Applying the acceptance policy to synced events
179///
180/// ## Lifecycle
181///
182/// 1. `new()` - Creates manager with database and config
183/// 2. `run()` - Main async loop (call in a spawned task)
184///
185/// ## Current Status
186///
187/// This is a stub implementation. The core data structures are:
188/// - [`FollowingRepoRootEvents`]: Repository root events we're following
189/// - [`SyncRelays`]: Relays we sync from with their repos and events
190///
191/// Full implementation will come in later phases.
192pub struct SyncManager {
193 /// Bootstrap relay URL if configured
194 #[allow(dead_code)]
195 bootstrap_relay_url: Option<String>,
196
197 /// Our service domain for filtering repo announcements
198 #[allow(dead_code)]
199 service_domain: String,
200
201 /// Database for querying/storing events
202 #[allow(dead_code)]
203 database: SharedDatabase,
204
205 /// Write policy for applying acceptance rules
206 #[allow(dead_code)]
207 write_policy: Nip34WritePolicy,
208
209 /// Repository root events we're following (Phase 1 data structure)
210 #[allow(dead_code)]
211 following_repo_root_events: FollowingRepoRootEvents,
212
213 /// Relays we sync from (Phase 1 data structure)
214 #[allow(dead_code)]
215 sync_relays: SyncRelays,
216
217 /// Max backoff duration for relay reconnection
218 #[allow(dead_code)]
219 max_backoff_secs: u64,
220}
221
222impl SyncManager {
223 /// Creates a new SyncManager.
224 ///
225 /// # Arguments
226 ///
227 /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync
228 /// * `service_domain` - Our domain for filtering announcements
229 /// * `database` - Database for event storage/queries
230 /// * `write_policy` - Policy for accepting events
231 /// * `config` - Configuration for sync parameters
232 pub fn new(
233 bootstrap_relay_url: Option<String>,
234 service_domain: String,
235 database: SharedDatabase,
236 write_policy: Nip34WritePolicy,
237 config: &Config,
238 ) -> Self {
239 Self {
240 bootstrap_relay_url,
241 service_domain,
242 database,
243 write_policy,
244 following_repo_root_events: new_following_repo_root_events(),
245 sync_relays: new_sync_relays(),
246 max_backoff_secs: config.sync_max_backoff_secs,
247 }
248 }
249
250 /// Returns a reference to the following repo root events state.
251 ///
252 /// This is the Phase 1 data structure tracking which repository root events
253 /// (kinds 1617, 1618, 1619, 1621) we're following.
254 pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents {
255 &self.following_repo_root_events
256 }
257
258 /// Returns a reference to the sync relays state.
259 ///
260 /// This is the Phase 1 data structure tracking which relays we sync from
261 /// and their associated repositories/events.
262 pub fn sync_relays(&self) -> &SyncRelays {
263 &self.sync_relays
264 }
265
266 /// Runs the sync manager main loop.
267 ///
268 /// This method should be called in a spawned task:
269 ///
270 /// ```rust,ignore
271 /// tokio::spawn(async move {
272 /// sync_manager.run().await;
273 /// });
274 /// ```
275 ///
276 /// ## Current Status
277 ///
278 /// This is a stub that logs and then waits indefinitely.
279 /// Full implementation includes:
280 /// - Phase 2: Database initialization queries
281 /// - Phase 3: Self-subscription for incremental updates
282 /// - Phase 4-6: Filter building, connection management
283 /// - Phase 7: Full sync loop
284 pub async fn run(self) {
285 tracing::info!(
286 "SyncManager stub started (bootstrap_relay={:?}, domain={})",
287 self.bootstrap_relay_url,
288 self.service_domain
289 );
290
291 tracing::info!(
292 "Phase 1 data structures initialized: following_repo_root_events, sync_relays"
293 );
294
295 // Stub: just wait indefinitely until full implementation
296 // This prevents the spawned task from immediately completing
297 loop {
298 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
299 }
300 }
301}
302
303// =============================================================================
304// Submodules
305// =============================================================================
19 306
20mod connection;
21mod filter;
22pub mod health; 307pub mod health;
23mod manager;
24pub mod metrics; 308pub mod metrics;
25pub mod negentropy; 309
26mod subscription; 310// Re-export commonly used types
27 311pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker};
28pub use filter::FilterService; 312pub use metrics::{event_source, SyncMetrics};
29pub use health::{HealthState, RelayHealth, RelayHealthTracker}; 313
30pub use manager::SyncManager; 314// =============================================================================
31pub use metrics::SyncMetrics; 315// Tests
32pub use negentropy::NegentropyService; 316// =============================================================================
33pub use subscription::SubscriptionManager; 317
34 318#[cfg(test)]
35// Re-export default sync source address for backward compatibility with modules like negentropy.rs 319mod tests {
36// Manager.rs derives sync_source_addr from config.bind_address at runtime 320 use super::*;
37pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR; 321
38 322 #[tokio::test]
39/// Kind for repository state events (NIP-34) 323 async fn test_following_repo_root_events_basic_operations() {
40pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file 324 let state = new_following_repo_root_events();
325
326 // Insert some events
327 {
328 let mut guard = state.write().await;
329 let repo_ref = "30617:abc123:my-project".to_string();
330 guard
331 .entry(repo_ref)
332 .or_default()
333 .insert(EventId::all_zeros());
334 }
335
336 // Read back
337 {
338 let guard = state.read().await;
339 assert_eq!(guard.len(), 1);
340 assert!(guard.contains_key("30617:abc123:my-project"));
341 }
342 }
343
344 #[tokio::test]
345 async fn test_sync_relays_basic_operations() {
346 let state = new_sync_relays();
347
348 // Insert relay with repos
349 {
350 let mut guard = state.write().await;
351 let relay_url = "wss://relay.example.com".to_string();
352 let repo_ref = "30617:abc123:my-project".to_string();
353
354 guard
355 .entry(relay_url)
356 .or_default()
357 .entry(repo_ref)
358 .or_default()
359 .insert(EventId::all_zeros());
360 }
361
362 // Read back
363 {
364 let guard = state.read().await;
365 assert_eq!(guard.len(), 1);
366 let relay_repos = guard.get("wss://relay.example.com").unwrap();
367 assert_eq!(relay_repos.len(), 1);
368 let events = relay_repos.get("30617:abc123:my-project").unwrap();
369 assert_eq!(events.len(), 1);
370 }
371 }
372
373 #[tokio::test]
374 async fn test_concurrent_access() {
375 let state = new_following_repo_root_events();
376 let state_clone = Arc::clone(&state);
377
378 // Writer task
379 let writer = tokio::spawn(async move {
380 let mut guard = state_clone.write().await;
381 guard
382 .entry("30617:writer:repo".to_string())
383 .or_default()
384 .insert(EventId::all_zeros());
385 });
386
387 // Wait for writer
388 writer.await.unwrap();
389
390 // Reader should see the change
391 let guard = state.read().await;
392 assert!(guard.contains_key("30617:writer:repo"));
393 }
394} \ No newline at end of file