diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 10:33:07 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 10:33:07 +0000 |
| commit | 586fc2a7df1ce256469f0742d23f687ac4b075b1 (patch) | |
| tree | dc07dbec88ea1ca2e80b4ced91831256bb68ce4e /src | |
| parent | 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (diff) | |
stub of sync v4
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/health.rs | 475 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 348 | ||||
| -rw-r--r-- | src/sync/mod.rs | 1264 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 185 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 497 |
5 files changed, 300 insertions, 2469 deletions
diff --git a/src/sync/health.rs b/src/sync/health.rs deleted file mode 100644 index 51bd5ae..0000000 --- a/src/sync/health.rs +++ /dev/null | |||
| @@ -1,475 +0,0 @@ | |||
| 1 | //! Relay Health Tracking for GRASP-02 Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module implements health tracking for relay connections, including: | ||
| 4 | //! - Health state machine (Healthy -> Degraded -> Dead) | ||
| 5 | //! - Exponential backoff with configurable max delay | ||
| 6 | //! - Dead relay detection after 24h of continuous failures | ||
| 7 | //! | ||
| 8 | //! ## Health States | ||
| 9 | //! | ||
| 10 | //! - **Healthy**: Working connection, no recent failures | ||
| 11 | //! - **Degraded**: Connection failed, retrying with backoff | ||
| 12 | //! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) | ||
| 13 | |||
| 14 | use std::sync::Arc; | ||
| 15 | use std::time::{Duration, Instant}; | ||
| 16 | |||
| 17 | use dashmap::DashMap; | ||
| 18 | |||
| 19 | use crate::config::Config; | ||
| 20 | |||
| 21 | /// Duration threshold before a relay is considered dead (24 hours) | ||
| 22 | const DEAD_THRESHOLD_HOURS: u64 = 24; | ||
| 23 | |||
| 24 | /// How often dead relays are retried (once per 24 hours) | ||
| 25 | const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; | ||
| 26 | |||
| 27 | /// Default maximum backoff duration in seconds (1 hour) | ||
| 28 | const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; | ||
| 29 | |||
| 30 | /// Base backoff duration in seconds | ||
| 31 | const BASE_BACKOFF_SECS: u64 = 5; | ||
| 32 | |||
| 33 | /// Health state of a relay connection | ||
| 34 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 35 | pub enum HealthState { | ||
| 36 | /// Working connection, no recent failures | ||
| 37 | Healthy, | ||
| 38 | /// Connection failed, retrying with exponential backoff | ||
| 39 | Degraded, | ||
| 40 | /// 24h+ of continuous failures, minimal retry | ||
| 41 | Dead, | ||
| 42 | } | ||
| 43 | |||
| 44 | impl std::fmt::Display for HealthState { | ||
| 45 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 46 | match self { | ||
| 47 | HealthState::Healthy => write!(f, "healthy"), | ||
| 48 | HealthState::Degraded => write!(f, "degraded"), | ||
| 49 | HealthState::Dead => write!(f, "dead"), | ||
| 50 | } | ||
| 51 | } | ||
| 52 | } | ||
| 53 | |||
| 54 | /// Health information for a single relay | ||
| 55 | #[derive(Debug, Clone)] | ||
| 56 | pub struct RelayHealth { | ||
| 57 | /// Current health state | ||
| 58 | pub state: HealthState, | ||
| 59 | /// Number of consecutive connection failures | ||
| 60 | pub consecutive_failures: u32, | ||
| 61 | /// Time of the first failure in the current failure streak | ||
| 62 | pub first_failure_time: Option<Instant>, | ||
| 63 | /// Time of the last failure | ||
| 64 | pub last_failure_time: Option<Instant>, | ||
| 65 | /// Time of the last successful connection | ||
| 66 | pub last_success_time: Option<Instant>, | ||
| 67 | /// Next time a connection attempt should be made | ||
| 68 | pub next_retry_at: Option<Instant>, | ||
| 69 | } | ||
| 70 | |||
| 71 | impl Default for RelayHealth { | ||
| 72 | fn default() -> Self { | ||
| 73 | Self { | ||
| 74 | state: HealthState::Healthy, | ||
| 75 | consecutive_failures: 0, | ||
| 76 | first_failure_time: None, | ||
| 77 | last_failure_time: None, | ||
| 78 | last_success_time: None, | ||
| 79 | next_retry_at: None, | ||
| 80 | } | ||
| 81 | } | ||
| 82 | } | ||
| 83 | |||
| 84 | impl RelayHealth { | ||
| 85 | /// Create a new RelayHealth with healthy state | ||
| 86 | pub fn new() -> Self { | ||
| 87 | Self::default() | ||
| 88 | } | ||
| 89 | } | ||
| 90 | |||
| 91 | /// Thread-safe relay health tracker using DashMap | ||
| 92 | #[derive(Debug)] | ||
| 93 | pub struct RelayHealthTracker { | ||
| 94 | health: DashMap<String, RelayHealth>, | ||
| 95 | max_backoff_secs: u64, | ||
| 96 | } | ||
| 97 | |||
| 98 | impl RelayHealthTracker { | ||
| 99 | /// Create a new RelayHealthTracker | ||
| 100 | pub fn new(config: &Config) -> Self { | ||
| 101 | Self { | ||
| 102 | health: DashMap::new(), | ||
| 103 | max_backoff_secs: config.sync_max_backoff_secs, | ||
| 104 | } | ||
| 105 | } | ||
| 106 | |||
| 107 | /// Create a new RelayHealthTracker with default settings | ||
| 108 | pub fn with_defaults() -> Self { | ||
| 109 | Self { | ||
| 110 | health: DashMap::new(), | ||
| 111 | max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, | ||
| 112 | } | ||
| 113 | } | ||
| 114 | |||
| 115 | /// Create a new RelayHealthTracker with custom max backoff | ||
| 116 | pub fn with_max_backoff(max_backoff_secs: u64) -> Self { | ||
| 117 | Self { | ||
| 118 | health: DashMap::new(), | ||
| 119 | max_backoff_secs, | ||
| 120 | } | ||
| 121 | } | ||
| 122 | |||
| 123 | /// Record a successful connection to a relay | ||
| 124 | /// | ||
| 125 | /// Resets the relay to Healthy state and clears failure counters. | ||
| 126 | pub fn record_success(&self, relay_url: &str) { | ||
| 127 | let now = Instant::now(); | ||
| 128 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); | ||
| 129 | let health = entry.value_mut(); | ||
| 130 | |||
| 131 | let old_state = health.state; | ||
| 132 | |||
| 133 | // Reset to healthy state | ||
| 134 | health.state = HealthState::Healthy; | ||
| 135 | health.consecutive_failures = 0; | ||
| 136 | health.first_failure_time = None; | ||
| 137 | health.last_failure_time = None; | ||
| 138 | health.last_success_time = Some(now); | ||
| 139 | health.next_retry_at = None; | ||
| 140 | |||
| 141 | if old_state != HealthState::Healthy { | ||
| 142 | tracing::info!( | ||
| 143 | "Relay {} recovered to healthy (was {:?})", | ||
| 144 | relay_url, | ||
| 145 | old_state | ||
| 146 | ); | ||
| 147 | } | ||
| 148 | } | ||
| 149 | |||
| 150 | /// Record a connection failure for a relay | ||
| 151 | /// | ||
| 152 | /// Increments failure counter, updates state, and calculates next retry time. | ||
| 153 | pub fn record_failure(&self, relay_url: &str) { | ||
| 154 | let now = Instant::now(); | ||
| 155 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); | ||
| 156 | let health = entry.value_mut(); | ||
| 157 | |||
| 158 | let old_state = health.state; | ||
| 159 | |||
| 160 | // Set first_failure_time if this is a new failure streak | ||
| 161 | if health.first_failure_time.is_none() { | ||
| 162 | health.first_failure_time = Some(now); | ||
| 163 | } | ||
| 164 | |||
| 165 | health.consecutive_failures = health.consecutive_failures.saturating_add(1); | ||
| 166 | health.last_failure_time = Some(now); | ||
| 167 | |||
| 168 | // Check if we should transition to Dead state | ||
| 169 | if let Some(first_failure) = health.first_failure_time { | ||
| 170 | let failure_duration = now.duration_since(first_failure); | ||
| 171 | let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); | ||
| 172 | |||
| 173 | if failure_duration >= dead_threshold { | ||
| 174 | health.state = HealthState::Dead; | ||
| 175 | // Dead relays retry once per day | ||
| 176 | health.next_retry_at = | ||
| 177 | Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); | ||
| 178 | |||
| 179 | if old_state != HealthState::Dead { | ||
| 180 | tracing::warn!( | ||
| 181 | "Relay {} marked dead after 24h failures ({} consecutive failures)", | ||
| 182 | relay_url, | ||
| 183 | health.consecutive_failures | ||
| 184 | ); | ||
| 185 | } | ||
| 186 | } else { | ||
| 187 | // Degraded state with exponential backoff | ||
| 188 | health.state = HealthState::Degraded; | ||
| 189 | let backoff = Self::get_backoff_duration( | ||
| 190 | health.consecutive_failures, | ||
| 191 | self.max_backoff_secs, | ||
| 192 | ); | ||
| 193 | health.next_retry_at = Some(now + backoff); | ||
| 194 | |||
| 195 | if old_state != HealthState::Degraded { | ||
| 196 | tracing::warn!( | ||
| 197 | "Relay {} degraded, backoff {:?}", | ||
| 198 | relay_url, | ||
| 199 | backoff | ||
| 200 | ); | ||
| 201 | } else { | ||
| 202 | tracing::debug!( | ||
| 203 | "Relay {} failure #{}, backoff {:?}", | ||
| 204 | relay_url, | ||
| 205 | health.consecutive_failures, | ||
| 206 | backoff | ||
| 207 | ); | ||
| 208 | } | ||
| 209 | } | ||
| 210 | } | ||
| 211 | } | ||
| 212 | |||
| 213 | /// Check if a connection attempt should be made to a relay | ||
| 214 | /// | ||
| 215 | /// Returns true if: | ||
| 216 | /// - The relay has no health record (first attempt) | ||
| 217 | /// - The relay is healthy | ||
| 218 | /// - The backoff period has elapsed | ||
| 219 | pub fn should_attempt_connection(&self, relay_url: &str) -> bool { | ||
| 220 | let entry = self.health.get(relay_url); | ||
| 221 | |||
| 222 | match entry { | ||
| 223 | None => true, // No record, allow first attempt | ||
| 224 | Some(entry) => { | ||
| 225 | let health = entry.value(); | ||
| 226 | |||
| 227 | match health.state { | ||
| 228 | HealthState::Healthy => true, | ||
| 229 | HealthState::Degraded | HealthState::Dead => { | ||
| 230 | // Check if backoff period has elapsed | ||
| 231 | match health.next_retry_at { | ||
| 232 | None => true, | ||
| 233 | Some(next_retry) => Instant::now() >= next_retry, | ||
| 234 | } | ||
| 235 | } | ||
| 236 | } | ||
| 237 | } | ||
| 238 | } | ||
| 239 | } | ||
| 240 | |||
| 241 | /// Get the current health state of a relay | ||
| 242 | pub fn get_state(&self, relay_url: &str) -> HealthState { | ||
| 243 | self.health | ||
| 244 | .get(relay_url) | ||
| 245 | .map(|entry| entry.value().state) | ||
| 246 | .unwrap_or(HealthState::Healthy) | ||
| 247 | } | ||
| 248 | |||
| 249 | /// Check if a relay is marked as dead | ||
| 250 | pub fn is_dead(&self, relay_url: &str) -> bool { | ||
| 251 | self.get_state(relay_url) == HealthState::Dead | ||
| 252 | } | ||
| 253 | |||
| 254 | /// Get the remaining backoff duration for a relay | ||
| 255 | /// | ||
| 256 | /// Returns None if no backoff is active. | ||
| 257 | pub fn get_remaining_backoff(&self, relay_url: &str) -> Option<Duration> { | ||
| 258 | let entry = self.health.get(relay_url)?; | ||
| 259 | let health = entry.value(); | ||
| 260 | let next_retry = health.next_retry_at?; | ||
| 261 | let now = Instant::now(); | ||
| 262 | |||
| 263 | if now >= next_retry { | ||
| 264 | None | ||
| 265 | } else { | ||
| 266 | Some(next_retry - now) | ||
| 267 | } | ||
| 268 | } | ||
| 269 | |||
| 270 | /// Get the consecutive failure count for a relay | ||
| 271 | pub fn get_failure_count(&self, relay_url: &str) -> u32 { | ||
| 272 | self.health | ||
| 273 | .get(relay_url) | ||
| 274 | .map(|entry| entry.value().consecutive_failures) | ||
| 275 | .unwrap_or(0) | ||
| 276 | } | ||
| 277 | |||
| 278 | /// Calculate the backoff duration based on failure count | ||
| 279 | /// | ||
| 280 | /// Uses exponential backoff: base * 2^failures, capped at max_backoff | ||
| 281 | pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { | ||
| 282 | let backoff_secs = BASE_BACKOFF_SECS | ||
| 283 | .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); | ||
| 284 | Duration::from_secs(backoff_secs.min(max_backoff_secs)) | ||
| 285 | } | ||
| 286 | |||
| 287 | /// Get all tracked relay URLs | ||
| 288 | pub fn get_tracked_relays(&self) -> Vec<String> { | ||
| 289 | self.health.iter().map(|entry| entry.key().clone()).collect() | ||
| 290 | } | ||
| 291 | |||
| 292 | /// Get a clone of the health info for a relay | ||
| 293 | pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> { | ||
| 294 | self.health.get(relay_url).map(|entry| entry.value().clone()) | ||
| 295 | } | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Create a shared RelayHealthTracker wrapped in Arc | ||
| 299 | pub fn create_health_tracker(config: &Config) -> Arc<RelayHealthTracker> { | ||
| 300 | Arc::new(RelayHealthTracker::new(config)) | ||
| 301 | } | ||
| 302 | |||
| 303 | #[cfg(test)] | ||
| 304 | mod tests { | ||
| 305 | use super::*; | ||
| 306 | |||
| 307 | #[test] | ||
| 308 | fn test_health_state_display() { | ||
| 309 | assert_eq!(HealthState::Healthy.to_string(), "healthy"); | ||
| 310 | assert_eq!(HealthState::Degraded.to_string(), "degraded"); | ||
| 311 | assert_eq!(HealthState::Dead.to_string(), "dead"); | ||
| 312 | } | ||
| 313 | |||
| 314 | #[test] | ||
| 315 | fn test_default_health_is_healthy() { | ||
| 316 | let health = RelayHealth::default(); | ||
| 317 | assert_eq!(health.state, HealthState::Healthy); | ||
| 318 | assert_eq!(health.consecutive_failures, 0); | ||
| 319 | assert!(health.first_failure_time.is_none()); | ||
| 320 | } | ||
| 321 | |||
| 322 | #[test] | ||
| 323 | fn test_should_attempt_connection_new_relay() { | ||
| 324 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 325 | assert!(tracker.should_attempt_connection("wss://new-relay.example.com")); | ||
| 326 | } | ||
| 327 | |||
| 328 | #[test] | ||
| 329 | fn test_record_success_resets_to_healthy() { | ||
| 330 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 331 | let url = "wss://test-relay.example.com"; | ||
| 332 | |||
| 333 | // Simulate a few failures | ||
| 334 | tracker.record_failure(url); | ||
| 335 | tracker.record_failure(url); | ||
| 336 | assert_eq!(tracker.get_state(url), HealthState::Degraded); | ||
| 337 | assert_eq!(tracker.get_failure_count(url), 2); | ||
| 338 | |||
| 339 | // Record success | ||
| 340 | tracker.record_success(url); | ||
| 341 | assert_eq!(tracker.get_state(url), HealthState::Healthy); | ||
| 342 | assert_eq!(tracker.get_failure_count(url), 0); | ||
| 343 | assert!(tracker.should_attempt_connection(url)); | ||
| 344 | } | ||
| 345 | |||
| 346 | #[test] | ||
| 347 | fn test_backoff_increases_exponentially() { | ||
| 348 | // failure 1: 5s | ||
| 349 | assert_eq!( | ||
| 350 | RelayHealthTracker::get_backoff_duration(1, 3600), | ||
| 351 | Duration::from_secs(5) | ||
| 352 | ); | ||
| 353 | // failure 2: 10s | ||
| 354 | assert_eq!( | ||
| 355 | RelayHealthTracker::get_backoff_duration(2, 3600), | ||
| 356 | Duration::from_secs(10) | ||
| 357 | ); | ||
| 358 | // failure 3: 20s | ||
| 359 | assert_eq!( | ||
| 360 | RelayHealthTracker::get_backoff_duration(3, 3600), | ||
| 361 | Duration::from_secs(20) | ||
| 362 | ); | ||
| 363 | // failure 4: 40s | ||
| 364 | assert_eq!( | ||
| 365 | RelayHealthTracker::get_backoff_duration(4, 3600), | ||
| 366 | Duration::from_secs(40) | ||
| 367 | ); | ||
| 368 | // failure 5: 80s | ||
| 369 | assert_eq!( | ||
| 370 | RelayHealthTracker::get_backoff_duration(5, 3600), | ||
| 371 | Duration::from_secs(80) | ||
| 372 | ); | ||
| 373 | } | ||
| 374 | |||
| 375 | #[test] | ||
| 376 | fn test_backoff_capped_at_max() { | ||
| 377 | let max_backoff = 3600u64; | ||
| 378 | // After many failures, should cap at max_backoff (1 hour) | ||
| 379 | assert_eq!( | ||
| 380 | RelayHealthTracker::get_backoff_duration(20, max_backoff), | ||
| 381 | Duration::from_secs(max_backoff) | ||
| 382 | ); | ||
| 383 | } | ||
| 384 | |||
| 385 | #[test] | ||
| 386 | fn test_degraded_state_after_failure() { | ||
| 387 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 388 | let url = "wss://test-relay.example.com"; | ||
| 389 | |||
| 390 | tracker.record_failure(url); | ||
| 391 | assert_eq!(tracker.get_state(url), HealthState::Degraded); | ||
| 392 | assert_eq!(tracker.get_failure_count(url), 1); | ||
| 393 | } | ||
| 394 | |||
| 395 | #[test] | ||
| 396 | fn test_backoff_blocks_immediate_reconnection() { | ||
| 397 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 398 | let url = "wss://test-relay.example.com"; | ||
| 399 | |||
| 400 | tracker.record_failure(url); | ||
| 401 | |||
| 402 | // Immediately after failure, should not attempt (backoff active) | ||
| 403 | assert!(!tracker.should_attempt_connection(url)); | ||
| 404 | |||
| 405 | // Remaining backoff should be some positive duration | ||
| 406 | let remaining = tracker.get_remaining_backoff(url); | ||
| 407 | assert!(remaining.is_some()); | ||
| 408 | assert!(remaining.unwrap() > Duration::ZERO); | ||
| 409 | } | ||
| 410 | |||
| 411 | #[test] | ||
| 412 | fn test_is_dead() { | ||
| 413 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 414 | let url = "wss://test-relay.example.com"; | ||
| 415 | |||
| 416 | // Initially not dead | ||
| 417 | assert!(!tracker.is_dead(url)); | ||
| 418 | |||
| 419 | // After a failure, still not dead (just degraded) | ||
| 420 | tracker.record_failure(url); | ||
| 421 | assert!(!tracker.is_dead(url)); | ||
| 422 | assert_eq!(tracker.get_state(url), HealthState::Degraded); | ||
| 423 | } | ||
| 424 | |||
| 425 | #[test] | ||
| 426 | fn test_get_tracked_relays() { | ||
| 427 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 428 | |||
| 429 | tracker.record_success("wss://relay1.example.com"); | ||
| 430 | tracker.record_failure("wss://relay2.example.com"); | ||
| 431 | |||
| 432 | let tracked = tracker.get_tracked_relays(); | ||
| 433 | assert_eq!(tracked.len(), 2); | ||
| 434 | assert!(tracked.contains(&"wss://relay1.example.com".to_string())); | ||
| 435 | assert!(tracked.contains(&"wss://relay2.example.com".to_string())); | ||
| 436 | } | ||
| 437 | |||
| 438 | #[test] | ||
| 439 | fn test_custom_max_backoff() { | ||
| 440 | let custom_max = 60u64; // 1 minute max | ||
| 441 | let tracker = RelayHealthTracker::with_max_backoff(custom_max); | ||
| 442 | let url = "wss://test-relay.example.com"; | ||
| 443 | |||
| 444 | // Simulate many failures | ||
| 445 | for _ in 0..20 { | ||
| 446 | tracker.record_failure(url); | ||
| 447 | } | ||
| 448 | |||
| 449 | // The remaining backoff should respect the custom max | ||
| 450 | // Note: We can't easily test the internal backoff calculation here, | ||
| 451 | // but we can verify the tracker was created with the custom setting | ||
| 452 | assert_eq!(tracker.max_backoff_secs, custom_max); | ||
| 453 | } | ||
| 454 | |||
| 455 | #[test] | ||
| 456 | fn test_get_health_returns_clone() { | ||
| 457 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 458 | let url = "wss://test-relay.example.com"; | ||
| 459 | |||
| 460 | tracker.record_success(url); | ||
| 461 | let health = tracker.get_health(url); | ||
| 462 | |||
| 463 | assert!(health.is_some()); | ||
| 464 | let health = health.unwrap(); | ||
| 465 | assert_eq!(health.state, HealthState::Healthy); | ||
| 466 | assert!(health.last_success_time.is_some()); | ||
| 467 | } | ||
| 468 | |||
| 469 | #[test] | ||
| 470 | fn test_get_health_nonexistent() { | ||
| 471 | let tracker = RelayHealthTracker::with_defaults(); | ||
| 472 | let health = tracker.get_health("wss://nonexistent.example.com"); | ||
| 473 | assert!(health.is_none()); | ||
| 474 | } | ||
| 475 | } \ No newline at end of file | ||
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs deleted file mode 100644 index c93e583..0000000 --- a/src/sync/metrics.rs +++ /dev/null | |||
| @@ -1,348 +0,0 @@ | |||
| 1 | //! Prometheus Metrics for Proactive Sync (GRASP-02 Phase 6) | ||
| 2 | //! | ||
| 3 | //! This module provides comprehensive sync monitoring metrics including: | ||
| 4 | //! - Connection status and attempts per relay | ||
| 5 | //! - Health state tracking (Healthy/Degraded/Dead) | ||
| 6 | //! - Event sync tracking by source (live/startup/reconnect/daily catchup) | ||
| 7 | //! - Gap events filled during catchup operations | ||
| 8 | //! | ||
| 9 | //! All metrics follow the `ngit_sync_` prefix convention. | ||
| 10 | |||
| 11 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; | ||
| 12 | |||
| 13 | use super::health::HealthState; | ||
| 14 | |||
| 15 | /// Prometheus metrics for the proactive sync system | ||
| 16 | #[derive(Clone)] | ||
| 17 | pub struct SyncMetrics { | ||
| 18 | // === Connection metrics === | ||
| 19 | /// Per-relay connection status (1=connected, 0=disconnected) | ||
| 20 | relay_connected: IntGaugeVec, | ||
| 21 | /// Connection attempts by relay and result (success/failure) | ||
| 22 | connection_attempts_total: IntCounterVec, | ||
| 23 | |||
| 24 | // === Health metrics === | ||
| 25 | /// Per-relay health status (healthy=1, degraded=2, dead=3) | ||
| 26 | relay_status: IntGaugeVec, | ||
| 27 | /// Per-relay consecutive failure count | ||
| 28 | relay_failures: IntGaugeVec, | ||
| 29 | |||
| 30 | // === Event metrics === | ||
| 31 | /// Events synced by source (live/startup/reconnect/daily) | ||
| 32 | events_total: IntCounterVec, | ||
| 33 | /// Gap events filled during catchup, by relay | ||
| 34 | gap_events_total: IntCounterVec, | ||
| 35 | |||
| 36 | // === Summary metrics === | ||
| 37 | /// Total relays discovered and tracked | ||
| 38 | relays_tracked_total: IntGauge, | ||
| 39 | /// Currently connected relay count | ||
| 40 | relays_connected_total: IntGauge, | ||
| 41 | /// Relays marked as dead | ||
| 42 | relays_dead_total: IntGauge, | ||
| 43 | } | ||
| 44 | |||
| 45 | impl SyncMetrics { | ||
| 46 | /// Register all sync metrics with the provided Prometheus registry | ||
| 47 | pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> { | ||
| 48 | // Connection metrics | ||
| 49 | let relay_connected = IntGaugeVec::new( | ||
| 50 | Opts::new( | ||
| 51 | "ngit_sync_relay_connected", | ||
| 52 | "Relay connection status (1=connected, 0=disconnected)", | ||
| 53 | ), | ||
| 54 | &["relay"], | ||
| 55 | )?; | ||
| 56 | registry.register(Box::new(relay_connected.clone()))?; | ||
| 57 | |||
| 58 | let connection_attempts_total = IntCounterVec::new( | ||
| 59 | Opts::new( | ||
| 60 | "ngit_sync_connection_attempts_total", | ||
| 61 | "Total connection attempts by relay and result", | ||
| 62 | ), | ||
| 63 | &["relay", "result"], | ||
| 64 | )?; | ||
| 65 | registry.register(Box::new(connection_attempts_total.clone()))?; | ||
| 66 | |||
| 67 | // Health metrics | ||
| 68 | let relay_status = IntGaugeVec::new( | ||
| 69 | Opts::new( | ||
| 70 | "ngit_sync_relay_status", | ||
| 71 | "Relay health status (1=healthy, 2=degraded, 3=dead)", | ||
| 72 | ), | ||
| 73 | &["relay"], | ||
| 74 | )?; | ||
| 75 | registry.register(Box::new(relay_status.clone()))?; | ||
| 76 | |||
| 77 | let relay_failures = IntGaugeVec::new( | ||
| 78 | Opts::new( | ||
| 79 | "ngit_sync_relay_failures", | ||
| 80 | "Consecutive failure count per relay", | ||
| 81 | ), | ||
| 82 | &["relay"], | ||
| 83 | )?; | ||
| 84 | registry.register(Box::new(relay_failures.clone()))?; | ||
| 85 | |||
| 86 | // Event metrics | ||
| 87 | let events_total = IntCounterVec::new( | ||
| 88 | Opts::new( | ||
| 89 | "ngit_sync_events_total", | ||
| 90 | "Total events synced by source type", | ||
| 91 | ), | ||
| 92 | &["source"], | ||
| 93 | )?; | ||
| 94 | registry.register(Box::new(events_total.clone()))?; | ||
| 95 | |||
| 96 | let gap_events_total = IntCounterVec::new( | ||
| 97 | Opts::new( | ||
| 98 | "ngit_sync_gap_events_total", | ||
| 99 | "Gap events filled during catchup by relay", | ||
| 100 | ), | ||
| 101 | &["relay"], | ||
| 102 | )?; | ||
| 103 | registry.register(Box::new(gap_events_total.clone()))?; | ||
| 104 | |||
| 105 | // Summary metrics | ||
| 106 | let relays_tracked_total = IntGauge::with_opts(Opts::new( | ||
| 107 | "ngit_sync_relays_tracked_total", | ||
| 108 | "Total number of relays discovered and tracked", | ||
| 109 | ))?; | ||
| 110 | registry.register(Box::new(relays_tracked_total.clone()))?; | ||
| 111 | |||
| 112 | let relays_connected_total = IntGauge::with_opts(Opts::new( | ||
| 113 | "ngit_sync_relays_connected_total", | ||
| 114 | "Number of currently connected relays", | ||
| 115 | ))?; | ||
| 116 | registry.register(Box::new(relays_connected_total.clone()))?; | ||
| 117 | |||
| 118 | let relays_dead_total = IntGauge::with_opts(Opts::new( | ||
| 119 | "ngit_sync_relays_dead_total", | ||
| 120 | "Number of relays marked as dead", | ||
| 121 | ))?; | ||
| 122 | registry.register(Box::new(relays_dead_total.clone()))?; | ||
| 123 | |||
| 124 | Ok(Self { | ||
| 125 | relay_connected, | ||
| 126 | connection_attempts_total, | ||
| 127 | relay_status, | ||
| 128 | relay_failures, | ||
| 129 | events_total, | ||
| 130 | gap_events_total, | ||
| 131 | relays_tracked_total, | ||
| 132 | relays_connected_total, | ||
| 133 | relays_dead_total, | ||
| 134 | }) | ||
| 135 | } | ||
| 136 | |||
| 137 | // === Connection Recording Methods === | ||
| 138 | |||
| 139 | /// Record a connection attempt (success or failure) | ||
| 140 | pub fn record_connection_attempt(&self, relay: &str, success: bool) { | ||
| 141 | let result = if success { "success" } else { "failure" }; | ||
| 142 | self.connection_attempts_total | ||
| 143 | .with_label_values(&[relay, result]) | ||
| 144 | .inc(); | ||
| 145 | } | ||
| 146 | |||
| 147 | /// Set relay connection status | ||
| 148 | pub fn set_relay_connected(&self, relay: &str, connected: bool) { | ||
| 149 | self.relay_connected | ||
| 150 | .with_label_values(&[relay]) | ||
| 151 | .set(if connected { 1 } else { 0 }); | ||
| 152 | |||
| 153 | // Update connected count based on all relay values | ||
| 154 | // This is handled by update_connected_count() for accuracy | ||
| 155 | } | ||
| 156 | |||
| 157 | /// Update the total connected relay count | ||
| 158 | pub fn update_connected_count(&self, count: i64) { | ||
| 159 | self.relays_connected_total.set(count); | ||
| 160 | } | ||
| 161 | |||
| 162 | /// Increment connected count | ||
| 163 | pub fn inc_connected_count(&self) { | ||
| 164 | self.relays_connected_total.inc(); | ||
| 165 | } | ||
| 166 | |||
| 167 | /// Decrement connected count | ||
| 168 | pub fn dec_connected_count(&self) { | ||
| 169 | self.relays_connected_total.dec(); | ||
| 170 | } | ||
| 171 | |||
| 172 | // === Health Recording Methods === | ||
| 173 | |||
| 174 | /// Record relay health state change | ||
| 175 | pub fn record_health_state(&self, relay: &str, state: HealthState) { | ||
| 176 | let state_value = match state { | ||
| 177 | HealthState::Healthy => 1, | ||
| 178 | HealthState::Degraded => 2, | ||
| 179 | HealthState::Dead => 3, | ||
| 180 | }; | ||
| 181 | self.relay_status.with_label_values(&[relay]).set(state_value); | ||
| 182 | } | ||
| 183 | |||
| 184 | /// Record relay failure count | ||
| 185 | pub fn record_failure_count(&self, relay: &str, count: u32) { | ||
| 186 | self.relay_failures | ||
| 187 | .with_label_values(&[relay]) | ||
| 188 | .set(count as i64); | ||
| 189 | } | ||
| 190 | |||
| 191 | /// Update dead relay count | ||
| 192 | pub fn update_dead_count(&self, count: i64) { | ||
| 193 | self.relays_dead_total.set(count); | ||
| 194 | } | ||
| 195 | |||
| 196 | /// Increment dead relay count | ||
| 197 | pub fn inc_dead_count(&self) { | ||
| 198 | self.relays_dead_total.inc(); | ||
| 199 | } | ||
| 200 | |||
| 201 | /// Decrement dead relay count | ||
| 202 | pub fn dec_dead_count(&self) { | ||
| 203 | self.relays_dead_total.dec(); | ||
| 204 | } | ||
| 205 | |||
| 206 | // === Event Recording Methods === | ||
| 207 | |||
| 208 | /// Record a synced event by source type | ||
| 209 | /// | ||
| 210 | /// Source types: | ||
| 211 | /// - "live" - Real-time subscription events | ||
| 212 | /// - "startup" - Events from startup catchup | ||
| 213 | /// - "reconnect" - Events from reconnection catchup | ||
| 214 | /// - "daily" - Events from daily catchup | ||
| 215 | pub fn record_event(&self, source: &str) { | ||
| 216 | self.events_total.with_label_values(&[source]).inc(); | ||
| 217 | } | ||
| 218 | |||
| 219 | /// Record multiple events synced by source type | ||
| 220 | pub fn record_events(&self, source: &str, count: u64) { | ||
| 221 | self.events_total | ||
| 222 | .with_label_values(&[source]) | ||
| 223 | .inc_by(count); | ||
| 224 | } | ||
| 225 | |||
| 226 | /// Record a gap event filled during catchup | ||
| 227 | pub fn record_gap_event(&self, relay: &str) { | ||
| 228 | self.gap_events_total.with_label_values(&[relay]).inc(); | ||
| 229 | } | ||
| 230 | |||
| 231 | /// Record multiple gap events filled during catchup | ||
| 232 | pub fn record_gap_events(&self, relay: &str, count: u64) { | ||
| 233 | self.gap_events_total | ||
| 234 | .with_label_values(&[relay]) | ||
| 235 | .inc_by(count); | ||
| 236 | } | ||
| 237 | |||
| 238 | // === Summary Recording Methods === | ||
| 239 | |||
| 240 | /// Set the total tracked relay count | ||
| 241 | pub fn set_tracked_count(&self, count: i64) { | ||
| 242 | self.relays_tracked_total.set(count); | ||
| 243 | } | ||
| 244 | |||
| 245 | /// Increment tracked relay count | ||
| 246 | pub fn inc_tracked_count(&self) { | ||
| 247 | self.relays_tracked_total.inc(); | ||
| 248 | } | ||
| 249 | |||
| 250 | /// Get current tracked relay count | ||
| 251 | pub fn get_tracked_count(&self) -> i64 { | ||
| 252 | self.relays_tracked_total.get() | ||
| 253 | } | ||
| 254 | |||
| 255 | /// Get current connected relay count | ||
| 256 | pub fn get_connected_count(&self) -> i64 { | ||
| 257 | self.relays_connected_total.get() | ||
| 258 | } | ||
| 259 | |||
| 260 | /// Get current dead relay count | ||
| 261 | pub fn get_dead_count(&self) -> i64 { | ||
| 262 | self.relays_dead_total.get() | ||
| 263 | } | ||
| 264 | } | ||
| 265 | |||
| 266 | /// Event source types for metrics tracking | ||
| 267 | pub mod event_source { | ||
| 268 | /// Real-time subscription events | ||
| 269 | pub const LIVE: &str = "live"; | ||
| 270 | /// Events from startup catchup | ||
| 271 | pub const STARTUP: &str = "startup"; | ||
| 272 | /// Events from reconnection catchup | ||
| 273 | pub const RECONNECT: &str = "reconnect"; | ||
| 274 | /// Events from daily catchup | ||
| 275 | pub const DAILY: &str = "daily"; | ||
| 276 | } | ||
| 277 | |||
| 278 | #[cfg(test)] | ||
| 279 | mod tests { | ||
| 280 | use super::*; | ||
| 281 | |||
| 282 | fn create_test_registry() -> Registry { | ||
| 283 | Registry::new() | ||
| 284 | } | ||
| 285 | |||
| 286 | #[test] | ||
| 287 | fn test_metrics_registration() { | ||
| 288 | let registry = create_test_registry(); | ||
| 289 | let metrics = SyncMetrics::register(®istry); | ||
| 290 | assert!(metrics.is_ok()); | ||
| 291 | } | ||
| 292 | |||
| 293 | #[test] | ||
| 294 | fn test_connection_metrics() { | ||
| 295 | let registry = create_test_registry(); | ||
| 296 | let metrics = SyncMetrics::register(®istry).unwrap(); | ||
| 297 | |||
| 298 | metrics.record_connection_attempt("wss://relay1.example.com", true); | ||
| 299 | metrics.record_connection_attempt("wss://relay1.example.com", false); | ||
| 300 | metrics.record_connection_attempt("wss://relay2.example.com", true); | ||
| 301 | |||
| 302 | metrics.set_relay_connected("wss://relay1.example.com", true); | ||
| 303 | metrics.inc_connected_count(); | ||
| 304 | |||
| 305 | assert_eq!(metrics.get_connected_count(), 1); | ||
| 306 | } | ||
| 307 | |||
| 308 | #[test] | ||
| 309 | fn test_health_metrics() { | ||
| 310 | let registry = create_test_registry(); | ||
| 311 | let metrics = SyncMetrics::register(®istry).unwrap(); | ||
| 312 | |||
| 313 | metrics.record_health_state("wss://relay1.example.com", HealthState::Healthy); | ||
| 314 | metrics.record_health_state("wss://relay2.example.com", HealthState::Degraded); | ||
| 315 | metrics.record_health_state("wss://relay3.example.com", HealthState::Dead); | ||
| 316 | |||
| 317 | metrics.record_failure_count("wss://relay2.example.com", 5); | ||
| 318 | metrics.update_dead_count(1); | ||
| 319 | |||
| 320 | assert_eq!(metrics.get_dead_count(), 1); | ||
| 321 | } | ||
| 322 | |||
| 323 | #[test] | ||
| 324 | fn test_event_metrics() { | ||
| 325 | let registry = create_test_registry(); | ||
| 326 | let metrics = SyncMetrics::register(®istry).unwrap(); | ||
| 327 | |||
| 328 | metrics.record_event(event_source::LIVE); | ||
| 329 | metrics.record_events(event_source::STARTUP, 10); | ||
| 330 | metrics.record_gap_event("wss://relay1.example.com"); | ||
| 331 | metrics.record_gap_events("wss://relay2.example.com", 5); | ||
| 332 | } | ||
| 333 | |||
| 334 | #[test] | ||
| 335 | fn test_summary_metrics() { | ||
| 336 | let registry = create_test_registry(); | ||
| 337 | let metrics = SyncMetrics::register(®istry).unwrap(); | ||
| 338 | |||
| 339 | metrics.set_tracked_count(5); | ||
| 340 | assert_eq!(metrics.get_tracked_count(), 5); | ||
| 341 | |||
| 342 | metrics.inc_tracked_count(); | ||
| 343 | assert_eq!(metrics.get_tracked_count(), 6); | ||
| 344 | |||
| 345 | metrics.update_connected_count(3); | ||
| 346 | assert_eq!(metrics.get_connected_count(), 3); | ||
| 347 | } | ||
| 348 | } \ No newline at end of file | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 9dec982..c1f8bca 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1,1039 +1,375 @@ | |||
| 1 | //! Proactive Sync Module for GRASP-02 | 1 | //! Proactive Sync Module - GRASP-02 v4 Implementation |
| 2 | //! | 2 | //! |
| 3 | //! This module implements the proactive sync system that ensures data availability | 3 | //! This module implements proactive synchronization of repository data from external |
| 4 | //! for repositories hosted on this relay by syncing from other relays in the ecosystem. | 4 | //! relays based on relay URLs listed in 30617 repository announcements. |
| 5 | //! | 5 | //! |
| 6 | //! ## Architecture Overview | 6 | //! ## Architecture |
| 7 | //! | 7 | //! |
| 8 | //! The sync system is built around two core data structures: | 8 | //! The sync system uses three index structures: |
| 9 | //! - `RepoSyncIndex` - What we WANT to sync (source of truth from self-subscription) | ||
| 10 | //! - `RelaySyncIndex` - What we have CONFIRMED syncing + connection state | ||
| 11 | //! - `PendingSyncIndex` - In-flight batches awaiting EOSE confirmation | ||
| 9 | //! | 12 | //! |
| 10 | //! - **FollowingRepoRootEvents**: Tracks repository root events we're following | 13 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. |
| 11 | //! - **SyncRelays**: Tracks relays we sync from, including their repos and events | ||
| 12 | //! | ||
| 13 | //! These type aliases are colocated with SyncManager (following the pattern of | ||
| 14 | //! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity. | ||
| 15 | //! | ||
| 16 | //! ## Submodules | ||
| 17 | //! | ||
| 18 | //! - [`health`]: Relay health tracking with exponential backoff and dead relay detection | ||
| 19 | //! - [`metrics`]: Prometheus metrics for sync operations | ||
| 20 | //! | ||
| 21 | //! ## Memory Estimates (from design doc) | ||
| 22 | //! | ||
| 23 | //! At target scale (1,000 repos, 100 relays): | ||
| 24 | //! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB | ||
| 25 | //! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB | ||
| 26 | //! - **Total in-memory state**: ~10 MB | ||
| 27 | //! | ||
| 28 | //! ## Upper Bounds (triggers for redesign) | ||
| 29 | //! | ||
| 30 | //! - 10,000+ repos: Consider database-backed state | ||
| 31 | //! - 500+ sync relays: Consider connection pooling | ||
| 32 | //! - 500+ root events per repo: Consider per-repo pagination | ||
| 33 | //! | ||
| 34 | //! ## Design References | ||
| 35 | //! | ||
| 36 | //! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md) | ||
| 37 | //! for the complete design context. | ||
| 38 | 14 | ||
| 39 | use std::collections::{HashMap, HashSet}; | 15 | use std::collections::{HashMap, HashSet}; |
| 40 | use std::net::SocketAddr; | ||
| 41 | use std::sync::Arc; | 16 | use std::sync::Arc; |
| 42 | 17 | ||
| 43 | use nostr_relay_builder::prelude::{ | ||
| 44 | DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, | ||
| 45 | }; | ||
| 46 | use nostr_sdk::prelude::*; | 18 | use nostr_sdk::prelude::*; |
| 47 | use nostr_sdk::EventId; | 19 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; |
| 48 | use tokio::sync::{mpsc, RwLock}; | 20 | use tokio::sync::RwLock; |
| 49 | 21 | ||
| 50 | use crate::config::Config; | 22 | use crate::config::Config; |
| 51 | use crate::nostr::builder::Nip34WritePolicy; | 23 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 52 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | ||
| 53 | use crate::nostr::SharedDatabase; | ||
| 54 | |||
| 55 | mod relay_connection; | ||
| 56 | mod self_subscriber; | ||
| 57 | pub use relay_connection::{RelayConnection, RelayEvent}; | ||
| 58 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | ||
| 59 | 24 | ||
| 60 | // ============================================================================= | 25 | // ============================================================================= |
| 61 | // Type Aliases for Sync State | 26 | // Type Aliases for Index Structures |
| 62 | // ============================================================================= | 27 | // ============================================================================= |
| 63 | 28 | ||
| 64 | /// Repository root events we're following. | 29 | /// What we WANT to sync - derived from events received via self-subscription. |
| 65 | /// | 30 | /// Updated immediately when self-subscriber batch fires. |
| 66 | /// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) | 31 | /// Key: repo addressable ref - 30617:pubkey:identifier |
| 67 | /// we need to follow for each repository we host. | 32 | pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>; |
| 68 | /// | ||
| 69 | /// ## Key Format | ||
| 70 | /// | ||
| 71 | /// The key is a repository addressable reference in the format: | ||
| 72 | /// `"30617:<pubkey>:<identifier>"` | ||
| 73 | /// | ||
| 74 | /// For example: `"30617:abc123...def:my-project"` | ||
| 75 | /// | ||
| 76 | /// ## Value | ||
| 77 | /// | ||
| 78 | /// A set of event IDs representing root events (PRs, Issues, Patches, Status events) | ||
| 79 | /// that reference this repository via an `a` tag. | ||
| 80 | /// | ||
| 81 | /// ## Event Kinds Tracked | ||
| 82 | /// | ||
| 83 | /// - **1617**: Patches (NIP-34) | ||
| 84 | /// - **1618**: Issues (NIP-34) | ||
| 85 | /// - **1619**: PRs (Pull Requests, NIP-34) | ||
| 86 | /// - **1621**: Status events (NIP-34) | ||
| 87 | /// | ||
| 88 | /// ## Invariants | ||
| 89 | /// | ||
| 90 | /// - May include a few extra repo refs that aren't in `SyncRelays` | ||
| 91 | /// - This is acceptable - we won't query other relays for them | ||
| 92 | /// - Updated incrementally via self-subscription | ||
| 93 | /// | ||
| 94 | /// ## Thread Safety | ||
| 95 | /// | ||
| 96 | /// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple | ||
| 97 | /// async tasks performing sync operations. | ||
| 98 | /// | ||
| 99 | /// ## Example Usage | ||
| 100 | /// | ||
| 101 | /// ```rust,ignore | ||
| 102 | /// use ngit_grasp::sync::FollowingRepoRootEvents; | ||
| 103 | /// use std::collections::HashSet; | ||
| 104 | /// use nostr_sdk::EventId; | ||
| 105 | /// | ||
| 106 | /// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) { | ||
| 107 | /// let guard = state.read().await; | ||
| 108 | /// if let Some(events) = guard.get(repo_ref) { | ||
| 109 | /// println!("Tracking {} root events for {}", events.len(), repo_ref); | ||
| 110 | /// } | ||
| 111 | /// } | ||
| 112 | /// ``` | ||
| 113 | pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>; | ||
| 114 | |||
| 115 | /// Relays we sync from, including their repos and events. | ||
| 116 | /// | ||
| 117 | /// This structure tracks which relays we need to connect to for syncing, | ||
| 118 | /// and for each relay, which repositories and their root events we're interested in. | ||
| 119 | /// | ||
| 120 | /// ## Key Format (Outer HashMap) | ||
| 121 | /// | ||
| 122 | /// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"` | ||
| 123 | /// | ||
| 124 | /// ## Value Format (Inner HashMap) | ||
| 125 | /// | ||
| 126 | /// For each relay, we maintain a map of: | ||
| 127 | /// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`) | ||
| 128 | /// - Value: Set of event IDs for that repo which should be synced from this relay | ||
| 129 | /// | ||
| 130 | /// ## Relay Selection Criteria | ||
| 131 | /// | ||
| 132 | /// A relay is included if its URL appears in a repository announcement (kind 30617) | ||
| 133 | /// that **also** lists our service URL. This ensures we only sync from relays | ||
| 134 | /// for repositories that are actually hosted on our relay. | ||
| 135 | /// | ||
| 136 | /// ## Bootstrap Relay | ||
| 137 | /// | ||
| 138 | /// If configured, the bootstrap relay is always present in this map and is | ||
| 139 | /// excluded from automatic removal logic. The bootstrap relay is used for | ||
| 140 | /// initial sync and discovery even when no repositories explicitly list it. | ||
| 141 | /// | ||
| 142 | /// ## Thread Safety | ||
| 143 | /// | ||
| 144 | /// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple | ||
| 145 | /// async tasks performing sync operations. | ||
| 146 | /// | ||
| 147 | /// ## Example Usage | ||
| 148 | /// | ||
| 149 | /// ```rust,ignore | ||
| 150 | /// use ngit_grasp::sync::SyncRelays; | ||
| 151 | /// use std::collections::{HashMap, HashSet}; | ||
| 152 | /// | ||
| 153 | /// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) { | ||
| 154 | /// let guard = state.read().await; | ||
| 155 | /// if let Some(repos) = guard.get(relay_url) { | ||
| 156 | /// println!("Relay {} tracks {} repos", relay_url, repos.len()); | ||
| 157 | /// for (repo_ref, events) in repos { | ||
| 158 | /// println!(" {} -> {} events", repo_ref, events.len()); | ||
| 159 | /// } | ||
| 160 | /// } | ||
| 161 | /// } | ||
| 162 | /// ``` | ||
| 163 | pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>; | ||
| 164 | 33 | ||
| 165 | /// Creates a new empty `FollowingRepoRootEvents` state. | 34 | /// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. |
| 166 | /// | 35 | /// Key: relay URL |
| 167 | /// Use this to initialize the state before populating from database queries. | 36 | pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>; |
| 168 | pub fn new_following_repo_root_events() -> FollowingRepoRootEvents { | ||
| 169 | Arc::new(RwLock::new(HashMap::new())) | ||
| 170 | } | ||
| 171 | 37 | ||
| 172 | /// Creates a new empty `SyncRelays` state. | 38 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. |
| 173 | /// | 39 | /// Each batch has its own ID and can confirm independently. |
| 174 | /// Use this to initialize the state before populating from database queries. | 40 | /// Key: relay URL |
| 175 | pub fn new_sync_relays() -> SyncRelays { | 41 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; |
| 176 | Arc::new(RwLock::new(HashMap::new())) | ||
| 177 | } | ||
| 178 | 42 | ||
| 179 | // ============================================================================= | 43 | // ============================================================================= |
| 180 | // SyncManager | 44 | // Supporting Data Structures |
| 181 | // ============================================================================= | 45 | // ============================================================================= |
| 182 | 46 | ||
| 183 | /// Manages proactive synchronization with external relays. | 47 | /// What repos and root events need to be synced |
| 184 | /// | 48 | #[derive(Debug, Clone, Default)] |
| 185 | /// The SyncManager is responsible for: | 49 | pub struct RepoSyncNeeds { |
| 186 | /// - Discovering relays from stored repository announcements | 50 | /// Relay URLs listed in this repo's 30617 announcement |
| 187 | /// - Maintaining connections to sync relays | 51 | pub relays: HashSet<String>, |
| 188 | /// - Subscribing to events at external relays | 52 | /// Root event IDs - 1617/1618/1619/1621 - that reference this repo |
| 189 | /// - Applying the acceptance policy to synced events | 53 | pub root_events: HashSet<EventId>, |
| 190 | /// | 54 | } |
| 191 | /// ## Lifecycle | ||
| 192 | /// | ||
| 193 | /// 1. `new()` - Creates manager with database and config | ||
| 194 | /// 2. `run()` - Main async loop (call in a spawned task) | ||
| 195 | /// | ||
| 196 | /// ## Current Status | ||
| 197 | /// | ||
| 198 | /// Phase 2 implementation supports: | ||
| 199 | /// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter | ||
| 200 | /// - Event processing through write policy | ||
| 201 | /// - Storage of accepted events | ||
| 202 | /// | ||
| 203 | /// Core data structures: | ||
| 204 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following | ||
| 205 | /// - [`SyncRelays`]: Relays we sync from with their repos and events | ||
| 206 | pub struct SyncManager { | ||
| 207 | /// Bootstrap relay URL if configured | ||
| 208 | bootstrap_relay_url: Option<String>, | ||
| 209 | |||
| 210 | /// Our service domain for filtering repo announcements | ||
| 211 | #[allow(dead_code)] | ||
| 212 | service_domain: String, | ||
| 213 | |||
| 214 | /// Database for querying/storing events | ||
| 215 | database: SharedDatabase, | ||
| 216 | |||
| 217 | /// Write policy for applying acceptance rules | ||
| 218 | write_policy: Nip34WritePolicy, | ||
| 219 | |||
| 220 | /// Repository root events we're following (Phase 1 data structure) | ||
| 221 | #[allow(dead_code)] | ||
| 222 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 223 | |||
| 224 | /// Relays we sync from (Phase 1 data structure) | ||
| 225 | #[allow(dead_code)] | ||
| 226 | sync_relays: SyncRelays, | ||
| 227 | |||
| 228 | /// Max backoff duration for relay reconnection | ||
| 229 | #[allow(dead_code)] | ||
| 230 | max_backoff_secs: u64, | ||
| 231 | 55 | ||
| 232 | /// Socket address used for sync source (for write policy) | 56 | /// Connection status for a relay |
| 233 | sync_source_addr: SocketAddr, | 57 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] |
| 58 | pub enum ConnectionStatus { | ||
| 59 | /// Not currently connected | ||
| 60 | #[default] | ||
| 61 | Disconnected, | ||
| 62 | /// Connection attempt in progress | ||
| 63 | Connecting, | ||
| 64 | /// Successfully connected and subscribed | ||
| 65 | Connected, | ||
| 234 | } | 66 | } |
| 235 | 67 | ||
| 236 | impl SyncManager { | 68 | /// Complete state for a single relay - combines sync needs with connection lifecycle |
| 237 | /// Creates a new SyncManager. | 69 | #[derive(Debug)] |
| 238 | /// | 70 | pub struct RelayState { |
| 239 | /// # Arguments | 71 | /// Repos we have confirmed syncing from this relay |
| 240 | /// | 72 | pub repos: HashSet<String>, |
| 241 | /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync | 73 | /// Root events we have confirmed tracking |
| 242 | /// * `service_domain` - Our domain for filtering announcements | 74 | pub root_events: HashSet<EventId>, |
| 243 | /// * `database` - Database for event storage/queries | 75 | /// If true, never disconnect this relay |
| 244 | /// * `write_policy` - Policy for accepting events | 76 | pub is_bootstrap: bool, |
| 245 | /// * `config` - Configuration for sync parameters | 77 | /// Current connection status |
| 246 | pub fn new( | 78 | pub connection_status: ConnectionStatus, |
| 247 | bootstrap_relay_url: Option<String>, | 79 | /// When we last successfully connected - used for since filter on reconnect |
| 248 | service_domain: String, | 80 | pub last_connected: Option<Timestamp>, |
| 249 | database: SharedDatabase, | 81 | /// When we disconnected - for 15-minute state retention rule |
| 250 | write_policy: Nip34WritePolicy, | 82 | pub disconnected_at: Option<Timestamp>, |
| 251 | config: &Config, | 83 | // The active connection - will be added in Phase 4 |
| 252 | ) -> Self { | 84 | // pub connection: Option<RelayConnection>, |
| 253 | // Create a synthetic SocketAddr for sync source identification | 85 | } |
| 254 | // This is used when calling write_policy.admit_event() for synced events | ||
| 255 | let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); | ||
| 256 | 86 | ||
| 87 | impl Default for RelayState { | ||
| 88 | fn default() -> Self { | ||
| 257 | Self { | 89 | Self { |
| 258 | bootstrap_relay_url, | 90 | repos: HashSet::new(), |
| 259 | service_domain, | 91 | root_events: HashSet::new(), |
| 260 | database, | 92 | is_bootstrap: false, |
| 261 | write_policy, | 93 | connection_status: ConnectionStatus::Disconnected, |
| 262 | following_repo_root_events: new_following_repo_root_events(), | 94 | last_connected: None, |
| 263 | sync_relays: new_sync_relays(), | 95 | disconnected_at: None, |
| 264 | max_backoff_secs: config.sync_max_backoff_secs, | ||
| 265 | sync_source_addr, | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | /// Returns a reference to the following repo root events state. | ||
| 270 | /// | ||
| 271 | /// This is the Phase 1 data structure tracking which repository root events | ||
| 272 | /// (kinds 1617, 1618, 1619, 1621) we're following. | ||
| 273 | pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents { | ||
| 274 | &self.following_repo_root_events | ||
| 275 | } | ||
| 276 | |||
| 277 | /// Returns a reference to the sync relays state. | ||
| 278 | /// | ||
| 279 | /// This is the Phase 1 data structure tracking which relays we sync from | ||
| 280 | /// and their associated repositories/events. | ||
| 281 | pub fn sync_relays(&self) -> &SyncRelays { | ||
| 282 | &self.sync_relays | ||
| 283 | } | ||
| 284 | |||
| 285 | // ========================================================================= | ||
| 286 | // Phase 2: Database Initialization | ||
| 287 | // ========================================================================= | ||
| 288 | |||
| 289 | /// Initialize sync state from database queries at startup. | ||
| 290 | /// | ||
| 291 | /// This method performs two database queries: | ||
| 292 | /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events` | ||
| 293 | /// 2. Query kind 30617 to build `sync_relays` | ||
| 294 | /// | ||
| 295 | /// The bootstrap relay (if configured) is always added to `sync_relays`. | ||
| 296 | /// | ||
| 297 | /// # Errors | ||
| 298 | /// | ||
| 299 | /// Returns an error if database queries fail. | ||
| 300 | pub async fn initialize_from_database(&self) -> Result<(), String> { | ||
| 301 | // Initialize bootstrap relay if configured (never removed) | ||
| 302 | if let Some(bootstrap_url) = &self.bootstrap_relay_url { | ||
| 303 | self.sync_relays.write().await.insert( | ||
| 304 | bootstrap_url.clone(), | ||
| 305 | HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only) | ||
| 306 | ); | ||
| 307 | tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url); | ||
| 308 | } | ||
| 309 | |||
| 310 | // Query 1: Build following_repo_root_events | ||
| 311 | // Find all 1617/1618/1619/1621 events and extract their repo references | ||
| 312 | let root_event_kinds = vec![ | ||
| 313 | Kind::GitPatch, // 1617 | ||
| 314 | Kind::from(KIND_PR), // 1618 | ||
| 315 | Kind::from(KIND_PR_UPDATE), // 1619 | ||
| 316 | Kind::GitIssue, // 1621 | ||
| 317 | ]; | ||
| 318 | |||
| 319 | let filter = Filter::new().kinds(root_event_kinds); | ||
| 320 | let root_events = self | ||
| 321 | .database | ||
| 322 | .query(filter) | ||
| 323 | .await | ||
| 324 | .map_err(|e| format!("Failed to query root events: {}", e))?; | ||
| 325 | |||
| 326 | let mut root_events_count = 0; | ||
| 327 | for event in root_events { | ||
| 328 | // An event may have multiple 'a' tags pointing to different repos | ||
| 329 | let repo_refs = Self::extract_all_repo_refs(&event); | ||
| 330 | for repo_ref in repo_refs { | ||
| 331 | self.following_repo_root_events | ||
| 332 | .write() | ||
| 333 | .await | ||
| 334 | .entry(repo_ref) | ||
| 335 | .or_default() | ||
| 336 | .insert(event.id); | ||
| 337 | root_events_count += 1; | ||
| 338 | } | ||
| 339 | } | ||
| 340 | tracing::info!( | ||
| 341 | "Populated following_repo_root_events with {} repo-event mappings", | ||
| 342 | root_events_count | ||
| 343 | ); | ||
| 344 | |||
| 345 | // Query 2: Build sync_relays from kind 30617 announcements | ||
| 346 | let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT)); | ||
| 347 | let announcements = self | ||
| 348 | .database | ||
| 349 | .query(announcement_filter) | ||
| 350 | .await | ||
| 351 | .map_err(|e| format!("Failed to query announcements: {}", e))?; | ||
| 352 | |||
| 353 | let mut sync_relays_count = 0; | ||
| 354 | for event in announcements { | ||
| 355 | let repo_ref = Self::build_repo_ref(&event); | ||
| 356 | let relay_urls = Self::extract_relay_urls(&event); | ||
| 357 | |||
| 358 | // Only track repos that list BOTH a remote relay AND our service | ||
| 359 | if self.lists_our_service(&event) { | ||
| 360 | for relay_url in relay_urls { | ||
| 361 | if !self.is_own_relay(&relay_url) { | ||
| 362 | // Get events for this repo from following_repo_root_events | ||
| 363 | let events = self | ||
| 364 | .following_repo_root_events | ||
| 365 | .read() | ||
| 366 | .await | ||
| 367 | .get(&repo_ref) | ||
| 368 | .cloned() | ||
| 369 | .unwrap_or_default(); | ||
| 370 | |||
| 371 | self.sync_relays | ||
| 372 | .write() | ||
| 373 | .await | ||
| 374 | .entry(relay_url) | ||
| 375 | .or_default() | ||
| 376 | .insert(repo_ref.clone(), events); | ||
| 377 | sync_relays_count += 1; | ||
| 378 | } | ||
| 379 | } | ||
| 380 | } | ||
| 381 | } | ||
| 382 | tracing::info!( | ||
| 383 | "Populated sync_relays with {} relay-repo mappings", | ||
| 384 | sync_relays_count | ||
| 385 | ); | ||
| 386 | |||
| 387 | Ok(()) | ||
| 388 | } | ||
| 389 | |||
| 390 | // ========================================================================= | ||
| 391 | // Helper Methods for Event Extraction | ||
| 392 | // ========================================================================= | ||
| 393 | |||
| 394 | /// Extract ALL repo refs from an event (it may tag multiple repos). | ||
| 395 | /// | ||
| 396 | /// Looks for 'a' tags that reference kind 30617 (repository announcements). | ||
| 397 | /// Returns refs in format "30617:pubkey:identifier". | ||
| 398 | pub fn extract_all_repo_refs(event: &Event) -> Vec<String> { | ||
| 399 | event | ||
| 400 | .tags | ||
| 401 | .iter() | ||
| 402 | .filter_map(|tag| { | ||
| 403 | let tag_vec = tag.clone().to_vec(); | ||
| 404 | if tag_vec.len() >= 2 && tag_vec[0] == "a" { | ||
| 405 | // Validate it's a 30617 reference | ||
| 406 | if tag_vec[1].starts_with("30617:") { | ||
| 407 | Some(tag_vec[1].clone()) | ||
| 408 | } else { | ||
| 409 | None | ||
| 410 | } | ||
| 411 | } else { | ||
| 412 | None | ||
| 413 | } | ||
| 414 | }) | ||
| 415 | .collect() | ||
| 416 | } | ||
| 417 | |||
| 418 | /// Build a repo ref string from a 30617 announcement event. | ||
| 419 | /// | ||
| 420 | /// Returns format "30617:pubkey:identifier". | ||
| 421 | pub fn build_repo_ref(event: &Event) -> String { | ||
| 422 | // Extract 'd' tag for identifier | ||
| 423 | let identifier = event | ||
| 424 | .tags | ||
| 425 | .iter() | ||
| 426 | .find(|tag| tag.kind() == TagKind::d()) | ||
| 427 | .and_then(|tag| tag.content()) | ||
| 428 | .map(|s| s.to_string()) | ||
| 429 | .unwrap_or_default(); | ||
| 430 | |||
| 431 | format!("30617:{}:{}", event.pubkey.to_hex(), identifier) | ||
| 432 | } | ||
| 433 | |||
| 434 | /// Extract relay URLs from a repository announcement event. | ||
| 435 | /// | ||
| 436 | /// Looks for the 'relays' tag and returns all relay URLs. | ||
| 437 | pub fn extract_relay_urls(event: &Event) -> Vec<String> { | ||
| 438 | event | ||
| 439 | .tags | ||
| 440 | .iter() | ||
| 441 | .filter(|tag| matches!(tag.kind(), TagKind::Relays)) | ||
| 442 | .flat_map(|tag| { | ||
| 443 | let vec = tag.clone().to_vec(); | ||
| 444 | // Skip first element (tag name), rest are values | ||
| 445 | vec.into_iter().skip(1) | ||
| 446 | }) | ||
| 447 | .collect() | ||
| 448 | } | ||
| 449 | |||
| 450 | /// Check if event lists our service in the relays tag. | ||
| 451 | /// | ||
| 452 | /// Compares relay URLs against our service domain. | ||
| 453 | fn lists_our_service(&self, event: &Event) -> bool { | ||
| 454 | let relay_urls = Self::extract_relay_urls(event); | ||
| 455 | relay_urls.iter().any(|url| self.is_own_relay(url)) | ||
| 456 | } | ||
| 457 | |||
| 458 | /// Check if a relay URL matches our relay. | ||
| 459 | /// | ||
| 460 | /// Compares the URL against our service domain. | ||
| 461 | fn is_own_relay(&self, relay_url: &str) -> bool { | ||
| 462 | // Normalize comparison: check if URL contains our domain | ||
| 463 | relay_url.contains(&self.service_domain) | ||
| 464 | } | ||
| 465 | |||
| 466 | // ========================================================================= | ||
| 467 | // Main Run Loop | ||
| 468 | // ========================================================================= | ||
| 469 | |||
| 470 | /// Runs the sync manager main loop. | ||
| 471 | /// | ||
| 472 | /// This method should be called in a spawned task: | ||
| 473 | /// | ||
| 474 | /// ```rust,ignore | ||
| 475 | /// tokio::spawn(async move { | ||
| 476 | /// sync_manager.run().await; | ||
| 477 | /// }); | ||
| 478 | /// ``` | ||
| 479 | /// | ||
| 480 | /// ## Implementation Status | ||
| 481 | /// | ||
| 482 | /// - Phase 2: Layer 1 sync from bootstrap relay ✓ | ||
| 483 | /// - Phase 3: Self-subscription and relay discovery ✓ | ||
| 484 | /// - Phase 4-6: Filter building, connection management (TODO) | ||
| 485 | /// - Phase 7: Full sync loop (TODO) | ||
| 486 | pub async fn run(self) { | ||
| 487 | tracing::info!( | ||
| 488 | "SyncManager starting (bootstrap_relay={:?}, domain={})", | ||
| 489 | self.bootstrap_relay_url, | ||
| 490 | self.service_domain | ||
| 491 | ); | ||
| 492 | |||
| 493 | // Phase 3: Initialize state from database BEFORE spawning connections | ||
| 494 | if let Err(e) = self.initialize_from_database().await { | ||
| 495 | tracing::error!("Failed to initialize from database: {}", e); | ||
| 496 | // Continue anyway - we can still sync from bootstrap | ||
| 497 | } | ||
| 498 | |||
| 499 | // Create channel for relay actions from self-subscriber | ||
| 500 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); | ||
| 501 | |||
| 502 | // Construct our own relay URL for self-subscription | ||
| 503 | let own_relay_url = format!("ws://{}", self.service_domain); | ||
| 504 | |||
| 505 | // Spawn self-subscriber task | ||
| 506 | let self_subscriber = SelfSubscriber::new( | ||
| 507 | own_relay_url.clone(), | ||
| 508 | self.service_domain.clone(), | ||
| 509 | Arc::clone(&self.following_repo_root_events), | ||
| 510 | Arc::clone(&self.sync_relays), | ||
| 511 | action_tx, | ||
| 512 | ); | ||
| 513 | |||
| 514 | tokio::spawn(async move { | ||
| 515 | self_subscriber.run().await; | ||
| 516 | }); | ||
| 517 | |||
| 518 | tracing::info!("SelfSubscriber spawned for {}", own_relay_url); | ||
| 519 | |||
| 520 | // Track active relay connections (relay_url -> event_sender) | ||
| 521 | let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new(); | ||
| 522 | |||
| 523 | // Phase 2: Connect to bootstrap relay if configured | ||
| 524 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | ||
| 525 | if let Some(event_tx) = self | ||
| 526 | .spawn_relay_connection(bootstrap_url.clone(), None) | ||
| 527 | .await | ||
| 528 | { | ||
| 529 | active_relays.insert(bootstrap_url.clone(), event_tx); | ||
| 530 | } | ||
| 531 | } | ||
| 532 | |||
| 533 | // Main coordination loop | ||
| 534 | loop { | ||
| 535 | tokio::select! { | ||
| 536 | // Handle relay actions from self-subscriber | ||
| 537 | action = action_rx.recv() => { | ||
| 538 | match action { | ||
| 539 | Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { | ||
| 540 | tracing::info!("Spawning new relay connection to {}", relay_url); | ||
| 541 | if !active_relays.contains_key(&relay_url) { | ||
| 542 | if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { | ||
| 543 | active_relays.insert(relay_url, event_tx); | ||
| 544 | } | ||
| 545 | } | ||
| 546 | } | ||
| 547 | Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { | ||
| 548 | tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); | ||
| 549 | // TODO: Implement filter updates for existing connections | ||
| 550 | } | ||
| 551 | None => { | ||
| 552 | tracing::info!("Action channel closed, continuing without self-subscriber"); | ||
| 553 | } | ||
| 554 | } | ||
| 555 | } | ||
| 556 | // Sleep to prevent busy loop when no events | ||
| 557 | _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { | ||
| 558 | // Periodic maintenance could go here | ||
| 559 | } | ||
| 560 | } | ||
| 561 | } | ||
| 562 | } | ||
| 563 | |||
| 564 | /// Spawn a relay connection with optional Layer 2 filters. | ||
| 565 | /// | ||
| 566 | /// Returns the event sender channel if successfully spawned. | ||
| 567 | async fn spawn_relay_connection( | ||
| 568 | &self, | ||
| 569 | relay_url: String, | ||
| 570 | repos: Option<HashMap<String, HashSet<EventId>>>, | ||
| 571 | ) -> Option<mpsc::Sender<RelayEvent>> { | ||
| 572 | // Create channel for receiving events | ||
| 573 | let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100); | ||
| 574 | |||
| 575 | // Create connection | ||
| 576 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 577 | |||
| 578 | // Determine if this is bootstrap (no repos) or discovered relay (with repos) | ||
| 579 | let is_bootstrap = repos.is_none(); | ||
| 580 | |||
| 581 | match connection.connect_and_subscribe().await { | ||
| 582 | Ok(()) => { | ||
| 583 | if is_bootstrap { | ||
| 584 | tracing::info!("Bootstrap relay connection established: {}", relay_url); | ||
| 585 | } else { | ||
| 586 | tracing::info!( | ||
| 587 | "Discovered relay connection established: {} (with Layer 2 filters)", | ||
| 588 | relay_url | ||
| 589 | ); | ||
| 590 | |||
| 591 | // Add Layer 2 subscription for repo events | ||
| 592 | if let Some(ref repos) = repos { | ||
| 593 | if let Err(e) = self.add_layer2_subscription(&connection, repos).await { | ||
| 594 | tracing::warn!("Failed to add Layer 2 subscription: {}", e); | ||
| 595 | } | ||
| 596 | } | ||
| 597 | } | ||
| 598 | |||
| 599 | // Clone refs needed for event processing task | ||
| 600 | let database = Arc::clone(&self.database); | ||
| 601 | let write_policy = self.write_policy.clone(); | ||
| 602 | let sync_source_addr = self.sync_source_addr; | ||
| 603 | |||
| 604 | // Clone event_tx for the spawned task | ||
| 605 | let event_tx_clone = event_tx.clone(); | ||
| 606 | |||
| 607 | // Spawn event loop task | ||
| 608 | let conn_url = relay_url.clone(); | ||
| 609 | tokio::spawn(async move { | ||
| 610 | connection.run_event_loop(event_tx_clone).await; | ||
| 611 | }); | ||
| 612 | |||
| 613 | // Spawn event processing task | ||
| 614 | tokio::spawn(async move { | ||
| 615 | Self::process_relay_events( | ||
| 616 | event_rx, | ||
| 617 | database, | ||
| 618 | write_policy, | ||
| 619 | sync_source_addr, | ||
| 620 | conn_url, | ||
| 621 | ) | ||
| 622 | .await; | ||
| 623 | }); | ||
| 624 | |||
| 625 | Some(event_tx) | ||
| 626 | } | ||
| 627 | Err(e) => { | ||
| 628 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | ||
| 629 | None | ||
| 630 | } | ||
| 631 | } | ||
| 632 | } | ||
| 633 | |||
| 634 | /// Add Layer 2 subscription for repo-related events. | ||
| 635 | /// | ||
| 636 | /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. | ||
| 637 | async fn add_layer2_subscription( | ||
| 638 | &self, | ||
| 639 | connection: &RelayConnection, | ||
| 640 | repos: &HashMap<String, HashSet<EventId>>, | ||
| 641 | ) -> Result<(), String> { | ||
| 642 | if repos.is_empty() { | ||
| 643 | return Ok(()); | ||
| 644 | } | ||
| 645 | |||
| 646 | // Build repo refs list for filter | ||
| 647 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 648 | |||
| 649 | tracing::debug!( | ||
| 650 | "Adding Layer 2 subscription for {} repos to {}", | ||
| 651 | repo_refs.len(), | ||
| 652 | connection.url() | ||
| 653 | ); | ||
| 654 | |||
| 655 | // Chunk repo_refs into groups of 100 (per plan) | ||
| 656 | for chunk in repo_refs.chunks(100) { | ||
| 657 | // Build filter with lowercase 'a' tag for each repo ref | ||
| 658 | let mut filter = Filter::new().kinds([ | ||
| 659 | Kind::GitPatch, // 1617 | ||
| 660 | Kind::Custom(1618), // PR | ||
| 661 | Kind::Custom(1619), // PR update | ||
| 662 | Kind::GitIssue, // 1621 | ||
| 663 | ]); | ||
| 664 | |||
| 665 | // Add each repo ref as a custom tag filter | ||
| 666 | for repo_ref in chunk { | ||
| 667 | filter = | ||
| 668 | filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); | ||
| 669 | } | ||
| 670 | |||
| 671 | // Subscribe to this filter | ||
| 672 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 673 | return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); | ||
| 674 | } | ||
| 675 | } | 96 | } |
| 676 | |||
| 677 | Ok(()) | ||
| 678 | } | 97 | } |
| 98 | } | ||
| 679 | 99 | ||
| 680 | /// Process events from a single relay connection. | 100 | impl RelayState { |
| 681 | /// | 101 | /// Check if state should be cleared based on 15-minute rule |
| 682 | /// This is a static method that runs in its own task. | 102 | pub fn should_clear_state(&self) -> bool { |
| 683 | async fn process_relay_events( | 103 | match self.disconnected_at { |
| 684 | mut event_rx: mpsc::Receiver<RelayEvent>, | 104 | Some(disconnected) => { |
| 685 | database: SharedDatabase, | 105 | let now = Timestamp::now(); |
| 686 | write_policy: Nip34WritePolicy, | 106 | now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes |
| 687 | sync_source_addr: SocketAddr, | ||
| 688 | relay_url: String, | ||
| 689 | ) { | ||
| 690 | tracing::debug!("Starting event processing for relay: {}", relay_url); | ||
| 691 | |||
| 692 | while let Some(relay_event) = event_rx.recv().await { | ||
| 693 | match relay_event { | ||
| 694 | RelayEvent::Event(event) => { | ||
| 695 | Self::process_single_event_static( | ||
| 696 | &event, | ||
| 697 | &database, | ||
| 698 | &write_policy, | ||
| 699 | &sync_source_addr, | ||
| 700 | &relay_url, | ||
| 701 | ) | ||
| 702 | .await; | ||
| 703 | } | ||
| 704 | RelayEvent::EndOfStoredEvents => { | ||
| 705 | tracing::debug!("EOSE received from {}", relay_url); | ||
| 706 | } | ||
| 707 | RelayEvent::Closed(reason) => { | ||
| 708 | tracing::warn!("Connection to {} closed: {}", relay_url, reason); | ||
| 709 | break; | ||
| 710 | } | ||
| 711 | } | 107 | } |
| 108 | None => false, // Still connected or never connected | ||
| 712 | } | 109 | } |
| 713 | |||
| 714 | tracing::info!("Event processing ended for relay: {}", relay_url); | ||
| 715 | } | 110 | } |
| 716 | 111 | ||
| 717 | /// Process a single event (static version for use in spawned tasks). | 112 | /// Clear repos and root_events - called when reconnect takes > 15 minutes |
| 718 | async fn process_single_event_static( | 113 | pub fn clear_sync_state(&mut self) { |
| 719 | event: &Event, | 114 | self.repos.clear(); |
| 720 | database: &SharedDatabase, | 115 | self.root_events.clear(); |
| 721 | write_policy: &Nip34WritePolicy, | ||
| 722 | sync_source_addr: &SocketAddr, | ||
| 723 | relay_url: &str, | ||
| 724 | ) { | ||
| 725 | let event_id = event.id; | ||
| 726 | let kind = event.kind.as_u16(); | ||
| 727 | |||
| 728 | // Check if event already exists in database | ||
| 729 | match database.check_id(&event_id).await { | ||
| 730 | Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => { | ||
| 731 | tracing::trace!("Event {} already exists, skipping", event_id); | ||
| 732 | return; | ||
| 733 | } | ||
| 734 | Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing | ||
| 735 | Err(e) => { | ||
| 736 | tracing::warn!("Failed to check if event {} exists: {}", event_id, e); | ||
| 737 | } | ||
| 738 | } | ||
| 739 | |||
| 740 | // Pass through write policy | ||
| 741 | let policy_result = write_policy.admit_event(event, sync_source_addr).await; | ||
| 742 | |||
| 743 | match policy_result { | ||
| 744 | PolicyResult::Accept => match database.save_event(event).await { | ||
| 745 | Ok(SaveEventStatus::Success) => { | ||
| 746 | tracing::info!( | ||
| 747 | "Synced event {} (kind {}) from {}", | ||
| 748 | event_id, | ||
| 749 | kind, | ||
| 750 | relay_url | ||
| 751 | ); | ||
| 752 | } | ||
| 753 | Ok(_) => { | ||
| 754 | tracing::debug!( | ||
| 755 | "Event {} (kind {}) already stored or rejected by database", | ||
| 756 | event_id, | ||
| 757 | kind | ||
| 758 | ); | ||
| 759 | } | ||
| 760 | Err(e) => { | ||
| 761 | tracing::error!("Failed to save synced event {}: {}", event_id, e); | ||
| 762 | } | ||
| 763 | }, | ||
| 764 | PolicyResult::Reject(reason) => { | ||
| 765 | tracing::debug!( | ||
| 766 | "Rejected synced event {} (kind {}): {}", | ||
| 767 | event_id, | ||
| 768 | kind, | ||
| 769 | reason | ||
| 770 | ); | ||
| 771 | } | ||
| 772 | } | ||
| 773 | } | 116 | } |
| 774 | } | 117 | } |
| 775 | 118 | ||
| 776 | // ============================================================================= | 119 | /// A batch of items pending EOSE confirmation |
| 777 | // Submodules | 120 | #[derive(Debug, Clone)] |
| 778 | // ============================================================================= | 121 | pub struct PendingBatch { |
| 779 | 122 | /// Unique ID for this batch - for debugging/logging | |
| 780 | pub mod health; | 123 | pub batch_id: u64, |
| 781 | pub mod metrics; | 124 | /// The items this batch is syncing |
| 125 | pub items: PendingItems, | ||
| 126 | /// Subscription IDs that must ALL receive EOSE before confirming | ||
| 127 | pub outstanding_subs: HashSet<SubscriptionId>, | ||
| 128 | } | ||
| 782 | 129 | ||
| 783 | // Re-export commonly used types | 130 | /// Items included in a pending batch |
| 784 | pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; | 131 | #[derive(Debug, Clone, Default)] |
| 785 | pub use metrics::{event_source, SyncMetrics}; | 132 | pub struct PendingItems { |
| 133 | /// Repos being synced in this batch | ||
| 134 | pub repos: HashSet<String>, | ||
| 135 | /// Root events being synced in this batch | ||
| 136 | pub root_events: HashSet<EventId>, | ||
| 137 | } | ||
| 786 | 138 | ||
| 787 | // ============================================================================= | 139 | // ============================================================================= |
| 788 | // Tests | 140 | // SyncMetrics - Prometheus Metrics for Sync System |
| 789 | // ============================================================================= | 141 | // ============================================================================= |
| 790 | 142 | ||
| 791 | #[cfg(test)] | 143 | /// Prometheus metrics for the proactive sync system. |
| 792 | mod tests { | 144 | /// |
| 793 | use super::*; | 145 | /// Tracks relay connections, sync progress, and operational statistics. |
| 794 | use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; | 146 | /// Following the comprehensive v3 metrics design. |
| 147 | #[derive(Clone)] | ||
| 148 | pub struct SyncMetrics { | ||
| 149 | // === Connection metrics === | ||
| 150 | /// Per-relay connection status (1=connected, 0=disconnected) | ||
| 151 | relay_connected: IntGaugeVec, | ||
| 152 | /// Connection attempts by relay and result (success/failure) | ||
| 153 | connection_attempts_total: IntCounterVec, | ||
| 154 | |||
| 155 | // === Event metrics === | ||
| 156 | /// Events synced by source (live/startup/reconnect/daily) | ||
| 157 | events_total: IntCounterVec, | ||
| 158 | |||
| 159 | // === Summary metrics === | ||
| 160 | /// Total relays discovered and tracked | ||
| 161 | relays_tracked_total: IntGauge, | ||
| 162 | /// Currently connected relay count | ||
| 163 | relays_connected_total: IntGauge, | ||
| 164 | } | ||
| 795 | 165 | ||
| 796 | /// Helper to create a test event with specific tags | 166 | impl SyncMetrics { |
| 797 | fn create_test_event(kind: Kind, tags: Vec<Tag>) -> Event { | 167 | /// Register sync metrics with a Prometheus registry. |
| 798 | let keys = Keys::generate(); | 168 | /// |
| 799 | EventBuilder::new(kind, "test content") | 169 | /// Returns an error if metrics are already registered (e.g., in tests). |
| 800 | .tags(tags) | 170 | pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> { |
| 801 | .sign_with_keys(&keys) | 171 | // Connection metrics |
| 802 | .expect("Failed to sign test event") | 172 | let relay_connected = IntGaugeVec::new( |
| 173 | Opts::new( | ||
| 174 | "ngit_sync_relay_connected", | ||
| 175 | "Relay connection status (1=connected, 0=disconnected)", | ||
| 176 | ), | ||
| 177 | &["relay"], | ||
| 178 | )?; | ||
| 179 | registry.register(Box::new(relay_connected.clone()))?; | ||
| 180 | |||
| 181 | let connection_attempts_total = IntCounterVec::new( | ||
| 182 | Opts::new( | ||
| 183 | "ngit_sync_connection_attempts_total", | ||
| 184 | "Total connection attempts by relay and result", | ||
| 185 | ), | ||
| 186 | &["relay", "result"], | ||
| 187 | )?; | ||
| 188 | registry.register(Box::new(connection_attempts_total.clone()))?; | ||
| 189 | |||
| 190 | // Event metrics | ||
| 191 | let events_total = IntCounterVec::new( | ||
| 192 | Opts::new( | ||
| 193 | "ngit_sync_events_total", | ||
| 194 | "Total events synced by source type", | ||
| 195 | ), | ||
| 196 | &["source"], | ||
| 197 | )?; | ||
| 198 | registry.register(Box::new(events_total.clone()))?; | ||
| 199 | |||
| 200 | // Summary metrics | ||
| 201 | let relays_tracked_total = IntGauge::with_opts(Opts::new( | ||
| 202 | "ngit_sync_relays_tracked_total", | ||
| 203 | "Total number of relays discovered and tracked", | ||
| 204 | ))?; | ||
| 205 | registry.register(Box::new(relays_tracked_total.clone()))?; | ||
| 206 | |||
| 207 | let relays_connected_total = IntGauge::with_opts(Opts::new( | ||
| 208 | "ngit_sync_relays_connected_total", | ||
| 209 | "Number of currently connected relays", | ||
| 210 | ))?; | ||
| 211 | registry.register(Box::new(relays_connected_total.clone()))?; | ||
| 212 | |||
| 213 | Ok(Self { | ||
| 214 | relay_connected, | ||
| 215 | connection_attempts_total, | ||
| 216 | events_total, | ||
| 217 | relays_tracked_total, | ||
| 218 | relays_connected_total, | ||
| 219 | }) | ||
| 803 | } | 220 | } |
| 804 | 221 | ||
| 805 | // ========================================================================= | 222 | // === Connection Recording Methods === |
| 806 | // Tests for extract_all_repo_refs | ||
| 807 | // ========================================================================= | ||
| 808 | 223 | ||
| 809 | #[test] | 224 | /// Record a connection attempt (success or failure) |
| 810 | fn test_extract_all_repo_refs_single_ref() { | 225 | pub fn record_connection_attempt(&self, relay: &str, success: bool) { |
| 811 | let event = create_test_event( | 226 | let result = if success { "success" } else { "failure" }; |
| 812 | Kind::GitPatch, | 227 | self.connection_attempts_total |
| 813 | vec![Tag::custom( | 228 | .with_label_values(&[relay, result]) |
| 814 | nostr_relay_builder::prelude::TagKind::custom("a"), | 229 | .inc(); |
| 815 | vec!["30617:abc123def456:my-project"], | ||
| 816 | )], | ||
| 817 | ); | ||
| 818 | |||
| 819 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 820 | assert_eq!(refs.len(), 1); | ||
| 821 | assert_eq!(refs[0], "30617:abc123def456:my-project"); | ||
| 822 | } | 230 | } |
| 823 | 231 | ||
| 824 | #[test] | 232 | /// Set relay connection status |
| 825 | fn test_extract_all_repo_refs_multiple_refs() { | 233 | pub fn set_relay_connected(&self, relay: &str, connected: bool) { |
| 826 | let event = create_test_event( | 234 | self.relay_connected |
| 827 | Kind::GitPatch, | 235 | .with_label_values(&[relay]) |
| 828 | vec![ | 236 | .set(if connected { 1 } else { 0 }); |
| 829 | Tag::custom( | ||
| 830 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 831 | vec!["30617:abc123:project1"], | ||
| 832 | ), | ||
| 833 | Tag::custom( | ||
| 834 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 835 | vec!["30617:def456:project2"], | ||
| 836 | ), | ||
| 837 | ], | ||
| 838 | ); | ||
| 839 | |||
| 840 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 841 | assert_eq!(refs.len(), 2); | ||
| 842 | assert!(refs.contains(&"30617:abc123:project1".to_string())); | ||
| 843 | assert!(refs.contains(&"30617:def456:project2".to_string())); | ||
| 844 | } | 237 | } |
| 845 | 238 | ||
| 846 | #[test] | 239 | /// Increment connected count |
| 847 | fn test_extract_all_repo_refs_ignores_non_30617() { | 240 | pub fn inc_connected_count(&self) { |
| 848 | let event = create_test_event( | 241 | self.relays_connected_total.inc(); |
| 849 | Kind::GitPatch, | ||
| 850 | vec![ | ||
| 851 | Tag::custom( | ||
| 852 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 853 | vec!["30617:abc123:valid-repo"], | ||
| 854 | ), | ||
| 855 | Tag::custom( | ||
| 856 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 857 | vec!["30618:def456:state-event"], // Not a repo ref | ||
| 858 | ), | ||
| 859 | ], | ||
| 860 | ); | ||
| 861 | |||
| 862 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 863 | assert_eq!(refs.len(), 1); | ||
| 864 | assert_eq!(refs[0], "30617:abc123:valid-repo"); | ||
| 865 | } | 242 | } |
| 866 | 243 | ||
| 867 | #[test] | 244 | /// Decrement connected count |
| 868 | fn test_extract_all_repo_refs_empty_when_no_a_tags() { | 245 | pub fn dec_connected_count(&self) { |
| 869 | let event = create_test_event( | 246 | self.relays_connected_total.dec(); |
| 870 | Kind::GitPatch, | ||
| 871 | vec![Tag::custom( | ||
| 872 | nostr_relay_builder::prelude::TagKind::custom("e"), | ||
| 873 | vec!["some-event-id"], | ||
| 874 | )], | ||
| 875 | ); | ||
| 876 | |||
| 877 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 878 | assert!(refs.is_empty()); | ||
| 879 | } | 247 | } |
| 880 | 248 | ||
| 881 | // ========================================================================= | 249 | // === Event Recording Methods === |
| 882 | // Tests for build_repo_ref | ||
| 883 | // ========================================================================= | ||
| 884 | 250 | ||
| 885 | #[test] | 251 | /// Record a synced event by source type |
| 886 | fn test_build_repo_ref() { | 252 | /// |
| 887 | let keys = Keys::generate(); | 253 | /// Source types: |
| 888 | let event = EventBuilder::new(Kind::from(30617_u16), "announcement") | 254 | /// - "live" - Real-time subscription events |
| 889 | .tags(vec![Tag::custom( | 255 | /// - "startup" - Events from startup catchup |
| 890 | nostr_relay_builder::prelude::TagKind::d(), | 256 | /// - "reconnect" - Events from reconnection catchup |
| 891 | vec!["my-identifier"], | 257 | pub fn record_event(&self, source: &str) { |
| 892 | )]) | 258 | self.events_total.with_label_values(&[source]).inc(); |
| 893 | .sign_with_keys(&keys) | ||
| 894 | .expect("Failed to sign test event"); | ||
| 895 | |||
| 896 | let repo_ref = SyncManager::build_repo_ref(&event); | ||
| 897 | assert!(repo_ref.starts_with("30617:")); | ||
| 898 | assert!(repo_ref.ends_with(":my-identifier")); | ||
| 899 | assert!(repo_ref.contains(&event.pubkey.to_hex())); | ||
| 900 | } | 259 | } |
| 901 | 260 | ||
| 902 | #[test] | 261 | /// Record multiple events synced by source type |
| 903 | fn test_build_repo_ref_empty_identifier() { | 262 | pub fn record_events(&self, source: &str, count: u64) { |
| 904 | let keys = Keys::generate(); | 263 | self.events_total |
| 905 | let event = EventBuilder::new(Kind::from(30617_u16), "announcement") | 264 | .with_label_values(&[source]) |
| 906 | .sign_with_keys(&keys) | 265 | .inc_by(count); |
| 907 | .expect("Failed to sign test event"); | ||
| 908 | |||
| 909 | let repo_ref = SyncManager::build_repo_ref(&event); | ||
| 910 | assert!(repo_ref.starts_with("30617:")); | ||
| 911 | assert!(repo_ref.ends_with(":")); // Empty identifier | ||
| 912 | } | 266 | } |
| 913 | 267 | ||
| 914 | // ========================================================================= | 268 | // === Summary Recording Methods === |
| 915 | // Tests for extract_relay_urls | ||
| 916 | // ========================================================================= | ||
| 917 | |||
| 918 | #[test] | ||
| 919 | fn test_extract_relay_urls_single() { | ||
| 920 | let event = create_test_event( | ||
| 921 | Kind::from(30617_u16), | ||
| 922 | vec![Tag::custom( | ||
| 923 | nostr_relay_builder::prelude::TagKind::Relays, | ||
| 924 | vec!["wss://relay.example.com"], | ||
| 925 | )], | ||
| 926 | ); | ||
| 927 | 269 | ||
| 928 | let urls = SyncManager::extract_relay_urls(&event); | 270 | /// Set the total tracked relay count |
| 929 | assert_eq!(urls.len(), 1); | 271 | pub fn set_tracked_count(&self, count: i64) { |
| 930 | assert_eq!(urls[0], "wss://relay.example.com"); | 272 | self.relays_tracked_total.set(count); |
| 931 | } | 273 | } |
| 932 | 274 | ||
| 933 | #[test] | 275 | /// Increment tracked relay count |
| 934 | fn test_extract_relay_urls_multiple() { | 276 | pub fn inc_tracked_count(&self) { |
| 935 | let event = create_test_event( | 277 | self.relays_tracked_total.inc(); |
| 936 | Kind::from(30617_u16), | ||
| 937 | vec![Tag::custom( | ||
| 938 | nostr_relay_builder::prelude::TagKind::Relays, | ||
| 939 | vec!["wss://relay1.example.com", "wss://relay2.example.com"], | ||
| 940 | )], | ||
| 941 | ); | ||
| 942 | |||
| 943 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 944 | assert_eq!(urls.len(), 2); | ||
| 945 | assert!(urls.contains(&"wss://relay1.example.com".to_string())); | ||
| 946 | assert!(urls.contains(&"wss://relay2.example.com".to_string())); | ||
| 947 | } | 278 | } |
| 948 | 279 | ||
| 949 | #[test] | 280 | /// Get current tracked relay count |
| 950 | fn test_extract_relay_urls_empty_when_no_relays_tag() { | 281 | pub fn get_tracked_count(&self) -> i64 { |
| 951 | let event = create_test_event( | 282 | self.relays_tracked_total.get() |
| 952 | Kind::from(30617_u16), | ||
| 953 | vec![Tag::custom( | ||
| 954 | nostr_relay_builder::prelude::TagKind::custom("d"), | ||
| 955 | vec!["my-project"], | ||
| 956 | )], | ||
| 957 | ); | ||
| 958 | |||
| 959 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 960 | assert!(urls.is_empty()); | ||
| 961 | } | 283 | } |
| 962 | 284 | ||
| 963 | // ========================================================================= | 285 | /// Get current connected relay count |
| 964 | // Original data structure tests | 286 | pub fn get_connected_count(&self) -> i64 { |
| 965 | // ========================================================================= | 287 | self.relays_connected_total.get() |
| 966 | |||
| 967 | #[tokio::test] | ||
| 968 | async fn test_following_repo_root_events_basic_operations() { | ||
| 969 | let state = new_following_repo_root_events(); | ||
| 970 | |||
| 971 | // Insert some events | ||
| 972 | { | ||
| 973 | let mut guard = state.write().await; | ||
| 974 | let repo_ref = "30617:abc123:my-project".to_string(); | ||
| 975 | guard | ||
| 976 | .entry(repo_ref) | ||
| 977 | .or_default() | ||
| 978 | .insert(EventId::all_zeros()); | ||
| 979 | } | ||
| 980 | |||
| 981 | // Read back | ||
| 982 | { | ||
| 983 | let guard = state.read().await; | ||
| 984 | assert_eq!(guard.len(), 1); | ||
| 985 | assert!(guard.contains_key("30617:abc123:my-project")); | ||
| 986 | } | ||
| 987 | } | 288 | } |
| 289 | } | ||
| 988 | 290 | ||
| 989 | #[tokio::test] | 291 | /// Event source types for metrics tracking |
| 990 | async fn test_sync_relays_basic_operations() { | 292 | pub mod event_source { |
| 991 | let state = new_sync_relays(); | 293 | /// Real-time subscription events |
| 294 | pub const LIVE: &str = "live"; | ||
| 295 | /// Events from startup catchup | ||
| 296 | pub const STARTUP: &str = "startup"; | ||
| 297 | /// Events from reconnection catchup | ||
| 298 | pub const RECONNECT: &str = "reconnect"; | ||
| 299 | } | ||
| 992 | 300 | ||
| 993 | // Insert relay with repos | 301 | // ============================================================================= |
| 994 | { | 302 | // SyncManager - Main Entry Point |
| 995 | let mut guard = state.write().await; | 303 | // ============================================================================= |
| 996 | let relay_url = "wss://relay.example.com".to_string(); | ||
| 997 | let repo_ref = "30617:abc123:my-project".to_string(); | ||
| 998 | 304 | ||
| 999 | guard | 305 | /// Manages proactive synchronization with external relays |
| 1000 | .entry(relay_url) | 306 | /// |
| 1001 | .or_default() | 307 | /// The SyncManager runs as a background task, subscribing to repository |
| 1002 | .entry(repo_ref) | 308 | /// announcements on the local relay and syncing data from external relays |
| 1003 | .or_default() | 309 | /// listed in those announcements. |
| 1004 | .insert(EventId::all_zeros()); | 310 | #[allow(dead_code)] // Fields will be used in later phases |
| 1005 | } | 311 | pub struct SyncManager { |
| 312 | /// Bootstrap relay URL for initial sync (optional) | ||
| 313 | bootstrap_relay_url: Option<String>, | ||
| 314 | /// Our service domain - used for filtering relevant repos | ||
| 315 | service_domain: String, | ||
| 316 | /// Database for event storage and queries | ||
| 317 | database: SharedDatabase, | ||
| 318 | /// Write policy for validating incoming events | ||
| 319 | write_policy: Nip34WritePolicy, | ||
| 320 | /// Configuration reference for sync settings | ||
| 321 | config: Config, | ||
| 322 | /// What we want to sync (source of truth) | ||
| 323 | repo_sync_index: RepoSyncIndex, | ||
| 324 | /// What we've confirmed syncing + connection state | ||
| 325 | relay_sync_index: RelaySyncIndex, | ||
| 326 | /// In-flight subscription batches | ||
| 327 | pending_sync_index: PendingSyncIndex, | ||
| 328 | } | ||
| 1006 | 329 | ||
| 1007 | // Read back | 330 | impl SyncManager { |
| 1008 | { | 331 | /// Create a new SyncManager |
| 1009 | let guard = state.read().await; | 332 | /// |
| 1010 | assert_eq!(guard.len(), 1); | 333 | /// # Arguments |
| 1011 | let relay_repos = guard.get("wss://relay.example.com").unwrap(); | 334 | /// * `bootstrap_relay_url` - Optional relay URL for initial historical sync |
| 1012 | assert_eq!(relay_repos.len(), 1); | 335 | /// * `service_domain` - The domain this relay serves (for filtering repos) |
| 1013 | let events = relay_repos.get("30617:abc123:my-project").unwrap(); | 336 | /// * `database` - Shared database for event storage |
| 1014 | assert_eq!(events.len(), 1); | 337 | /// * `write_policy` - Policy for validating events before storage |
| 338 | /// * `config` - Configuration for sync settings | ||
| 339 | pub fn new( | ||
| 340 | bootstrap_relay_url: Option<String>, | ||
| 341 | service_domain: String, | ||
| 342 | database: SharedDatabase, | ||
| 343 | write_policy: Nip34WritePolicy, | ||
| 344 | config: &Config, | ||
| 345 | ) -> Self { | ||
| 346 | Self { | ||
| 347 | bootstrap_relay_url, | ||
| 348 | service_domain, | ||
| 349 | database, | ||
| 350 | write_policy, | ||
| 351 | config: config.clone(), | ||
| 352 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | ||
| 353 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | ||
| 354 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | ||
| 1015 | } | 355 | } |
| 1016 | } | 356 | } |
| 1017 | 357 | ||
| 1018 | #[tokio::test] | 358 | /// Run the sync manager (placeholder for Phase 1) |
| 1019 | async fn test_concurrent_access() { | 359 | /// |
| 1020 | let state = new_following_repo_root_events(); | 360 | /// This will be implemented in later phases to: |
| 1021 | let state_clone = Arc::clone(&state); | 361 | /// 1. Subscribe to local relay for 30617 events |
| 1022 | 362 | /// 2. Process events to build RepoSyncIndex | |
| 1023 | // Writer task | 363 | /// 3. Compute and execute sync actions |
| 1024 | let writer = tokio::spawn(async move { | 364 | /// 4. Handle reconnection and catch-up logic |
| 1025 | let mut guard = state_clone.write().await; | 365 | pub async fn run(self) { |
| 1026 | guard | 366 | tracing::info!( |
| 1027 | .entry("30617:writer:repo".to_string()) | 367 | bootstrap_relay = ?self.bootstrap_relay_url, |
| 1028 | .or_default() | 368 | service_domain = %self.service_domain, |
| 1029 | .insert(EventId::all_zeros()); | 369 | "SyncManager starting (placeholder - not yet implemented)" |
| 1030 | }); | 370 | ); |
| 1031 | |||
| 1032 | // Wait for writer | ||
| 1033 | writer.await.unwrap(); | ||
| 1034 | 371 | ||
| 1035 | // Reader should see the change | 372 | // Phase 1: Just log and return |
| 1036 | let guard = state.read().await; | 373 | // Full implementation will be added in subsequent phases |
| 1037 | assert!(guard.contains_key("30617:writer:repo")); | ||
| 1038 | } | 374 | } |
| 1039 | } | 375 | } \ No newline at end of file |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs deleted file mode 100644 index 71b5d51..0000000 --- a/src/sync/relay_connection.rs +++ /dev/null | |||
| @@ -1,185 +0,0 @@ | |||
| 1 | //! Relay Connection for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module handles connecting to external relays and receiving events | ||
| 4 | //! for the proactive sync system. | ||
| 5 | |||
| 6 | use std::time::Duration; | ||
| 7 | |||
| 8 | use nostr_sdk::prelude::*; | ||
| 9 | use tokio::sync::mpsc; | ||
| 10 | |||
| 11 | use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; | ||
| 12 | |||
| 13 | /// Events received from a relay connection | ||
| 14 | #[derive(Debug)] | ||
| 15 | pub enum RelayEvent { | ||
| 16 | /// A nostr event was received | ||
| 17 | Event(Event), | ||
| 18 | /// End of stored events (EOSE) received | ||
| 19 | EndOfStoredEvents, | ||
| 20 | /// Connection was closed | ||
| 21 | Closed(String), | ||
| 22 | } | ||
| 23 | |||
| 24 | /// Connection to an external relay for syncing events. | ||
| 25 | /// | ||
| 26 | /// RelayConnection handles: | ||
| 27 | /// - Connecting to the relay | ||
| 28 | /// - Subscribing with appropriate filters (Layer 1 for bootstrap) | ||
| 29 | /// - Receiving events and sending them through a channel | ||
| 30 | pub struct RelayConnection { | ||
| 31 | /// The relay URL | ||
| 32 | url: String, | ||
| 33 | /// The nostr-sdk client | ||
| 34 | client: Client, | ||
| 35 | } | ||
| 36 | |||
| 37 | impl RelayConnection { | ||
| 38 | /// Create a new relay connection. | ||
| 39 | /// | ||
| 40 | /// # Arguments | ||
| 41 | /// | ||
| 42 | /// * `url` - The WebSocket URL of the relay to connect to | ||
| 43 | pub fn new(url: String) -> Self { | ||
| 44 | // Create a client with generated keys (we're just subscribing, not publishing) | ||
| 45 | let keys = Keys::generate(); | ||
| 46 | let client = Client::new(keys); | ||
| 47 | |||
| 48 | Self { url, client } | ||
| 49 | } | ||
| 50 | |||
| 51 | /// Connect to the relay and subscribe with Layer 1 filter. | ||
| 52 | /// | ||
| 53 | /// Layer 1 filter syncs announcement events (30617, 30618) which are | ||
| 54 | /// the foundation for discovering repository relationships. | ||
| 55 | /// | ||
| 56 | /// Returns the notification stream for receiving events. | ||
| 57 | pub async fn connect_and_subscribe(&self) -> Result<(), String> { | ||
| 58 | // Add the relay | ||
| 59 | self.client | ||
| 60 | .add_relay(&self.url) | ||
| 61 | .await | ||
| 62 | .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; | ||
| 63 | |||
| 64 | // Connect to relay | ||
| 65 | self.client.connect().await; | ||
| 66 | |||
| 67 | // Wait for connection to establish | ||
| 68 | let mut connected = false; | ||
| 69 | for _ in 0..30 { | ||
| 70 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 71 | let relays = self.client.relays().await; | ||
| 72 | if relays.values().any(|r| r.is_connected()) { | ||
| 73 | connected = true; | ||
| 74 | break; | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 78 | if !connected { | ||
| 79 | return Err(format!( | ||
| 80 | "Failed to connect to relay {} after 3 seconds", | ||
| 81 | self.url | ||
| 82 | )); | ||
| 83 | } | ||
| 84 | |||
| 85 | tracing::info!("Connected to bootstrap relay: {}", self.url); | ||
| 86 | |||
| 87 | // Layer 1 filter: Repository announcements and state events | ||
| 88 | // These are addressable events that define repositories | ||
| 89 | let filter = Filter::new().kinds([ | ||
| 90 | Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 | ||
| 91 | Kind::Custom(KIND_REPOSITORY_STATE), // 30618 | ||
| 92 | ]); | ||
| 93 | |||
| 94 | // Subscribe to the filter | ||
| 95 | self.client | ||
| 96 | .subscribe(filter, None) | ||
| 97 | .await | ||
| 98 | .map_err(|e| format!("Failed to subscribe: {}", e))?; | ||
| 99 | |||
| 100 | tracing::debug!( | ||
| 101 | "Subscribed to Layer 1 events (kinds 30617, 30618) from {}", | ||
| 102 | self.url | ||
| 103 | ); | ||
| 104 | |||
| 105 | Ok(()) | ||
| 106 | } | ||
| 107 | |||
| 108 | /// Run the event loop, sending received events through the channel. | ||
| 109 | /// | ||
| 110 | /// This method runs until the connection is closed or an error occurs. | ||
| 111 | /// | ||
| 112 | /// # Arguments | ||
| 113 | /// | ||
| 114 | /// * `event_sender` - Channel to send received events | ||
| 115 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { | ||
| 116 | tracing::debug!("Starting event loop for relay: {}", self.url); | ||
| 117 | |||
| 118 | // Handle notifications | ||
| 119 | self.client | ||
| 120 | .handle_notifications(|notification| async { | ||
| 121 | match notification { | ||
| 122 | RelayPoolNotification::Event { event, .. } => { | ||
| 123 | tracing::debug!( | ||
| 124 | "Received event {} (kind {}) from {}", | ||
| 125 | event.id, | ||
| 126 | event.kind.as_u16(), | ||
| 127 | self.url | ||
| 128 | ); | ||
| 129 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | ||
| 130 | tracing::warn!("Event channel closed, stopping relay connection"); | ||
| 131 | return Ok(true); // Stop handling | ||
| 132 | } | ||
| 133 | } | ||
| 134 | RelayPoolNotification::Message { message, .. } => { | ||
| 135 | if let RelayMessage::EndOfStoredEvents(_) = message { | ||
| 136 | tracing::debug!("EOSE received from {}", self.url); | ||
| 137 | if event_sender | ||
| 138 | .send(RelayEvent::EndOfStoredEvents) | ||
| 139 | .await | ||
| 140 | .is_err() | ||
| 141 | { | ||
| 142 | return Ok(true); // Stop handling | ||
| 143 | } | ||
| 144 | } | ||
| 145 | } | ||
| 146 | RelayPoolNotification::Shutdown => { | ||
| 147 | tracing::info!("Relay {} shutting down", self.url); | ||
| 148 | let _ = event_sender | ||
| 149 | .send(RelayEvent::Closed("Shutdown".to_string())) | ||
| 150 | .await; | ||
| 151 | return Ok(true); // Stop handling | ||
| 152 | } | ||
| 153 | } | ||
| 154 | Ok(false) // Continue handling | ||
| 155 | }) | ||
| 156 | .await | ||
| 157 | .ok(); // Ignore errors on shutdown | ||
| 158 | |||
| 159 | // Disconnect when done | ||
| 160 | self.client.disconnect().await; | ||
| 161 | tracing::info!("Disconnected from relay: {}", self.url); | ||
| 162 | } | ||
| 163 | |||
| 164 | /// Get the relay URL | ||
| 165 | pub fn url(&self) -> &str { | ||
| 166 | &self.url | ||
| 167 | } | ||
| 168 | |||
| 169 | /// Subscribe to an additional filter. | ||
| 170 | /// | ||
| 171 | /// This is used to add Layer 2 filters for repo-related events after | ||
| 172 | /// the initial connection is established. | ||
| 173 | pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> { | ||
| 174 | self.client | ||
| 175 | .subscribe(filter, None) | ||
| 176 | .await | ||
| 177 | .map_err(|e| format!("Failed to subscribe with filter: {}", e))?; | ||
| 178 | Ok(()) | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Get a reference to the client for additional operations. | ||
| 182 | pub fn client(&self) -> &Client { | ||
| 183 | &self.client | ||
| 184 | } | ||
| 185 | } | ||
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs deleted file mode 100644 index 0512088..0000000 --- a/src/sync/self_subscriber.rs +++ /dev/null | |||
| @@ -1,497 +0,0 @@ | |||
| 1 | //! Self-Subscriber for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module handles subscribing to our own relay to detect new events | ||
| 4 | //! and trigger relay discovery from announcements. | ||
| 5 | |||
| 6 | use std::collections::{HashMap, HashSet}; | ||
| 7 | use std::time::Duration; | ||
| 8 | |||
| 9 | use nostr_sdk::prelude::*; | ||
| 10 | use tokio::sync::mpsc; | ||
| 11 | use tokio::time::Instant; | ||
| 12 | |||
| 13 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | ||
| 14 | |||
| 15 | use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; | ||
| 16 | |||
| 17 | // ============================================================================= | ||
| 18 | // Types | ||
| 19 | // ============================================================================= | ||
| 20 | |||
| 21 | /// Actions to be taken by the SyncManager based on self-subscription events. | ||
| 22 | #[derive(Debug, Clone)] | ||
| 23 | pub enum RelayAction { | ||
| 24 | /// Spawn a new relay connection to sync from. | ||
| 25 | /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. | ||
| 26 | SpawnRelay { | ||
| 27 | relay_url: String, | ||
| 28 | repos_and_root_events: HashMap<String, HashSet<EventId>>, | ||
| 29 | }, | ||
| 30 | /// Add filters to an existing relay connection. | ||
| 31 | /// Contains: relay_url, additional repos to add. | ||
| 32 | AddFilters { | ||
| 33 | relay_url: String, | ||
| 34 | repos_and_new_root_event: HashMap<String, HashSet<EventId>>, | ||
| 35 | }, | ||
| 36 | } | ||
| 37 | |||
| 38 | /// Pending updates collected during batch window. | ||
| 39 | #[derive(Debug, Default)] | ||
| 40 | struct PendingUpdates { | ||
| 41 | /// New announcements (kind 30617) - triggers relay discovery | ||
| 42 | announcements: Vec<Event>, | ||
| 43 | /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set | ||
| 44 | root_events: Vec<Event>, | ||
| 45 | } | ||
| 46 | |||
| 47 | // ============================================================================= | ||
| 48 | // SelfSubscriber | ||
| 49 | // ============================================================================= | ||
| 50 | |||
| 51 | /// Subscribes to our own relay to detect new events. | ||
| 52 | /// | ||
| 53 | /// The self-subscriber: | ||
| 54 | /// 1. Connects to our own relay | ||
| 55 | /// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) | ||
| 56 | /// 3. When events arrive, batches them | ||
| 57 | /// 4. On batch timer fire, processes updates and sends relay actions | ||
| 58 | pub struct SelfSubscriber { | ||
| 59 | /// URL of our own relay to subscribe to | ||
| 60 | own_relay_url: String, | ||
| 61 | /// Our relay domain for checking if announcements list us | ||
| 62 | relay_domain: String, | ||
| 63 | /// Reference to following repo root events (shared with SyncManager) | ||
| 64 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 65 | /// Reference to sync relays (shared with SyncManager) | ||
| 66 | sync_relays: SyncRelays, | ||
| 67 | /// Channel to send relay actions back to manager | ||
| 68 | action_tx: mpsc::Sender<RelayAction>, | ||
| 69 | } | ||
| 70 | |||
| 71 | impl SelfSubscriber { | ||
| 72 | /// Create a new self-subscriber. | ||
| 73 | pub fn new( | ||
| 74 | own_relay_url: String, | ||
| 75 | relay_domain: String, | ||
| 76 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 77 | sync_relays: SyncRelays, | ||
| 78 | action_tx: mpsc::Sender<RelayAction>, | ||
| 79 | ) -> Self { | ||
| 80 | Self { | ||
| 81 | own_relay_url, | ||
| 82 | relay_domain, | ||
| 83 | following_repo_root_events, | ||
| 84 | sync_relays, | ||
| 85 | action_tx, | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Get the batch window duration from environment variable. | ||
| 90 | /// | ||
| 91 | /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS | ||
| 92 | /// for faster tests (typically 200ms). | ||
| 93 | fn get_batch_window() -> Duration { | ||
| 94 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | ||
| 95 | .ok() | ||
| 96 | .and_then(|s| s.parse().ok()) | ||
| 97 | .map(Duration::from_millis) | ||
| 98 | .unwrap_or(Duration::from_secs(5)) | ||
| 99 | } | ||
| 100 | |||
| 101 | /// Run the self-subscriber event loop. | ||
| 102 | /// | ||
| 103 | /// This method: | ||
| 104 | /// 1. Connects to our own relay | ||
| 105 | /// 2. Subscribes to relevant event kinds | ||
| 106 | /// 3. Receives events and batches them | ||
| 107 | /// 4. On batch timer fire, processes and sends relay actions | ||
| 108 | pub async fn run(self) { | ||
| 109 | tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); | ||
| 110 | |||
| 111 | // Create nostr-sdk client | ||
| 112 | let keys = Keys::generate(); | ||
| 113 | let client = Client::new(keys); | ||
| 114 | |||
| 115 | // Connect to our own relay | ||
| 116 | if let Err(e) = client.add_relay(&self.own_relay_url).await { | ||
| 117 | tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); | ||
| 118 | return; | ||
| 119 | } | ||
| 120 | |||
| 121 | client.connect().await; | ||
| 122 | |||
| 123 | // Wait for connection | ||
| 124 | let mut connected = false; | ||
| 125 | for _ in 0..30 { | ||
| 126 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 127 | let relays = client.relays().await; | ||
| 128 | if relays.values().any(|r| r.is_connected()) { | ||
| 129 | connected = true; | ||
| 130 | break; | ||
| 131 | } | ||
| 132 | } | ||
| 133 | |||
| 134 | if !connected { | ||
| 135 | tracing::error!( | ||
| 136 | "Failed to connect to own relay {} after 3 seconds", | ||
| 137 | self.own_relay_url | ||
| 138 | ); | ||
| 139 | return; | ||
| 140 | } | ||
| 141 | |||
| 142 | tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); | ||
| 143 | |||
| 144 | // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) | ||
| 145 | let filter = Filter::new() | ||
| 146 | .kinds([ | ||
| 147 | Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 | ||
| 148 | Kind::GitPatch, // 1617 | ||
| 149 | Kind::Custom(KIND_PR), // 1618 | ||
| 150 | Kind::Custom(KIND_PR_UPDATE), // 1619 | ||
| 151 | Kind::GitIssue, // 1621 | ||
| 152 | ]) | ||
| 153 | .since(Timestamp::now()); | ||
| 154 | |||
| 155 | if let Err(e) = client.subscribe(filter, None).await { | ||
| 156 | tracing::error!("Failed to subscribe to own relay: {}", e); | ||
| 157 | return; | ||
| 158 | } | ||
| 159 | |||
| 160 | tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); | ||
| 161 | |||
| 162 | // Batch state | ||
| 163 | let mut pending = PendingUpdates::default(); | ||
| 164 | let mut batch_timer_started: Option<Instant> = None; | ||
| 165 | let batch_window = Self::get_batch_window(); | ||
| 166 | |||
| 167 | // Main event loop using notifications stream | ||
| 168 | loop { | ||
| 169 | // Calculate timeout for batch processing | ||
| 170 | let timeout = if let Some(started) = batch_timer_started { | ||
| 171 | let elapsed = started.elapsed(); | ||
| 172 | if elapsed >= batch_window { | ||
| 173 | Duration::ZERO | ||
| 174 | } else { | ||
| 175 | batch_window - elapsed | ||
| 176 | } | ||
| 177 | } else { | ||
| 178 | Duration::from_secs(60) // Long timeout when no batch pending | ||
| 179 | }; | ||
| 180 | |||
| 181 | // Wait for notification with timeout | ||
| 182 | let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; | ||
| 183 | |||
| 184 | match notification { | ||
| 185 | Ok(Ok(notification)) => { | ||
| 186 | match notification { | ||
| 187 | RelayPoolNotification::Event { event, .. } => { | ||
| 188 | let kind = event.kind.as_u16(); | ||
| 189 | |||
| 190 | // Start batch timer on first event (does NOT reset) | ||
| 191 | if batch_timer_started.is_none() { | ||
| 192 | batch_timer_started = Some(Instant::now()); | ||
| 193 | tracing::debug!("Batch timer started"); | ||
| 194 | } | ||
| 195 | |||
| 196 | // Classify and add to pending | ||
| 197 | if kind == KIND_REPOSITORY_ANNOUNCEMENT { | ||
| 198 | tracing::debug!( | ||
| 199 | "SelfSubscriber received announcement {}", | ||
| 200 | event.id | ||
| 201 | ); | ||
| 202 | pending.announcements.push(*event); | ||
| 203 | } else { | ||
| 204 | tracing::debug!( | ||
| 205 | "SelfSubscriber received root event {} (kind {})", | ||
| 206 | event.id, | ||
| 207 | kind | ||
| 208 | ); | ||
| 209 | pending.root_events.push(*event); | ||
| 210 | } | ||
| 211 | } | ||
| 212 | RelayPoolNotification::Message { message, .. } => { | ||
| 213 | if let RelayMessage::EndOfStoredEvents(_) = message { | ||
| 214 | tracing::debug!("SelfSubscriber EOSE received"); | ||
| 215 | // Process any pending events after EOSE | ||
| 216 | if !pending.announcements.is_empty() | ||
| 217 | || !pending.root_events.is_empty() | ||
| 218 | { | ||
| 219 | self.process_batch(&mut pending).await; | ||
| 220 | batch_timer_started = None; | ||
| 221 | } | ||
| 222 | } | ||
| 223 | } | ||
| 224 | RelayPoolNotification::Shutdown => { | ||
| 225 | tracing::info!("SelfSubscriber shutting down"); | ||
| 226 | break; | ||
| 227 | } | ||
| 228 | } | ||
| 229 | } | ||
| 230 | Ok(Err(_)) => { | ||
| 231 | // Channel closed | ||
| 232 | tracing::warn!("SelfSubscriber notification channel closed"); | ||
| 233 | break; | ||
| 234 | } | ||
| 235 | Err(_) => { | ||
| 236 | // Timeout - check if batch should be processed | ||
| 237 | if let Some(started) = batch_timer_started { | ||
| 238 | if started.elapsed() >= batch_window { | ||
| 239 | if !pending.announcements.is_empty() || !pending.root_events.is_empty() | ||
| 240 | { | ||
| 241 | self.process_batch(&mut pending).await; | ||
| 242 | } | ||
| 243 | batch_timer_started = None; | ||
| 244 | } | ||
| 245 | } | ||
| 246 | } | ||
| 247 | } | ||
| 248 | } | ||
| 249 | |||
| 250 | client.disconnect().await; | ||
| 251 | tracing::info!("SelfSubscriber disconnected"); | ||
| 252 | } | ||
| 253 | |||
| 254 | /// Process a batch of pending updates. | ||
| 255 | async fn process_batch(&self, pending: &mut PendingUpdates) { | ||
| 256 | tracing::debug!( | ||
| 257 | "Processing batch: {} announcements, {} root events", | ||
| 258 | pending.announcements.len(), | ||
| 259 | pending.root_events.len() | ||
| 260 | ); | ||
| 261 | |||
| 262 | // Process root events first (update following_repo_root_events) | ||
| 263 | for event in pending.root_events.drain(..) { | ||
| 264 | let repo_refs = SyncManager::extract_all_repo_refs(&event); | ||
| 265 | if !repo_refs.is_empty() { | ||
| 266 | let mut guard = self.following_repo_root_events.write().await; | ||
| 267 | for repo_ref in repo_refs { | ||
| 268 | guard.entry(repo_ref).or_default().insert(event.id); | ||
| 269 | } | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | // Process announcements (relay discovery) | ||
| 274 | for event in pending.announcements.drain(..) { | ||
| 275 | self.process_announcement(&event).await; | ||
| 276 | } | ||
| 277 | } | ||
| 278 | |||
| 279 | /// Process an announcement event for relay discovery. | ||
| 280 | async fn process_announcement(&self, event: &Event) { | ||
| 281 | let repo_ref = SyncManager::build_repo_ref(event); | ||
| 282 | let relay_urls = Self::extract_relay_urls_from_announcement(event); | ||
| 283 | |||
| 284 | // Check if this announcement lists our relay | ||
| 285 | if !self.lists_our_service(event) { | ||
| 286 | tracing::debug!( | ||
| 287 | "Announcement {} does not list our service, skipping relay discovery", | ||
| 288 | event.id | ||
| 289 | ); | ||
| 290 | return; | ||
| 291 | } | ||
| 292 | |||
| 293 | tracing::info!( | ||
| 294 | "Processing announcement {} for repo {}, found {} relay URLs", | ||
| 295 | event.id, | ||
| 296 | repo_ref, | ||
| 297 | relay_urls.len() | ||
| 298 | ); | ||
| 299 | |||
| 300 | // Get current events for this repo from following_repo_root_events | ||
| 301 | let events = self | ||
| 302 | .following_repo_root_events | ||
| 303 | .read() | ||
| 304 | .await | ||
| 305 | .get(&repo_ref) | ||
| 306 | .cloned() | ||
| 307 | .unwrap_or_default(); | ||
| 308 | |||
| 309 | // For each relay URL in the announcement, check if we need to spawn or update | ||
| 310 | for relay_url in relay_urls { | ||
| 311 | if self.is_own_relay(&relay_url) { | ||
| 312 | continue; // Skip our own relay | ||
| 313 | } | ||
| 314 | |||
| 315 | let sync_relays_guard = self.sync_relays.read().await; | ||
| 316 | let exists = sync_relays_guard.contains_key(&relay_url); | ||
| 317 | drop(sync_relays_guard); | ||
| 318 | |||
| 319 | if exists { | ||
| 320 | // Relay already known - check if we need to add this repo | ||
| 321 | let mut guard = self.sync_relays.write().await; | ||
| 322 | let relay_repos = guard.entry(relay_url.clone()).or_default(); | ||
| 323 | let is_new_repo = !relay_repos.contains_key(&repo_ref); | ||
| 324 | |||
| 325 | if is_new_repo { | ||
| 326 | relay_repos.insert(repo_ref.clone(), events.clone()); | ||
| 327 | drop(guard); | ||
| 328 | |||
| 329 | // Send action to add filters | ||
| 330 | let mut repos_filters = HashMap::new(); | ||
| 331 | repos_filters.insert(repo_ref.clone(), events.clone()); | ||
| 332 | |||
| 333 | if let Err(e) = self | ||
| 334 | .action_tx | ||
| 335 | .send(RelayAction::AddFilters { | ||
| 336 | relay_url: relay_url.clone(), | ||
| 337 | repos_and_new_root_event: repos_filters, | ||
| 338 | }) | ||
| 339 | .await | ||
| 340 | { | ||
| 341 | tracing::warn!("Failed to send AddFilters action: {}", e); | ||
| 342 | } | ||
| 343 | } | ||
| 344 | } else { | ||
| 345 | // New relay - add to sync_relays and spawn | ||
| 346 | let mut guard = self.sync_relays.write().await; | ||
| 347 | let mut repos = HashMap::new(); | ||
| 348 | repos.insert(repo_ref.clone(), events.clone()); | ||
| 349 | guard.insert(relay_url.clone(), repos.clone()); | ||
| 350 | drop(guard); | ||
| 351 | |||
| 352 | tracing::info!("Discovered new relay to sync from: {}", relay_url); | ||
| 353 | |||
| 354 | // Send action to spawn relay | ||
| 355 | if let Err(e) = self | ||
| 356 | .action_tx | ||
| 357 | .send(RelayAction::SpawnRelay { | ||
| 358 | relay_url: relay_url.clone(), | ||
| 359 | repos_and_root_events: repos, | ||
| 360 | }) | ||
| 361 | .await | ||
| 362 | { | ||
| 363 | tracing::warn!("Failed to send SpawnRelay action: {}", e); | ||
| 364 | } | ||
| 365 | } | ||
| 366 | } | ||
| 367 | } | ||
| 368 | |||
| 369 | /// Extract relay URLs from an announcement event. | ||
| 370 | /// | ||
| 371 | /// Looks for both 'relays' and 'clone' tags. | ||
| 372 | fn extract_relay_urls_from_announcement(event: &Event) -> Vec<String> { | ||
| 373 | let mut urls = Vec::new(); | ||
| 374 | |||
| 375 | // Extract from 'relays' tag | ||
| 376 | for tag in event.tags.iter() { | ||
| 377 | if matches!(tag.kind(), TagKind::Relays) { | ||
| 378 | let vec = tag.clone().to_vec(); | ||
| 379 | urls.extend(vec.into_iter().skip(1)); // Skip tag name | ||
| 380 | } | ||
| 381 | } | ||
| 382 | |||
| 383 | // Extract from 'clone' tag - parse URLs to get relay hints | ||
| 384 | // Clone URLs look like: http://domain/repo.git or git://domain/repo.git | ||
| 385 | // We want to construct ws://domain from these | ||
| 386 | for tag in event.tags.iter() { | ||
| 387 | if matches!(tag.kind(), TagKind::Clone) { | ||
| 388 | let vec = tag.clone().to_vec(); | ||
| 389 | for url in vec.into_iter().skip(1) { | ||
| 390 | if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { | ||
| 391 | if !urls.contains(&relay_url) { | ||
| 392 | urls.push(relay_url); | ||
| 393 | } | ||
| 394 | } | ||
| 395 | } | ||
| 396 | } | ||
| 397 | } | ||
| 398 | |||
| 399 | urls | ||
| 400 | } | ||
| 401 | |||
| 402 | /// Convert a clone URL to a potential relay URL. | ||
| 403 | /// | ||
| 404 | /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" | ||
| 405 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | ||
| 406 | // Parse the URL to extract host:port | ||
| 407 | if let Ok(url) = url::Url::parse(clone_url) { | ||
| 408 | let host = url.host_str()?; | ||
| 409 | let port = url.port(); | ||
| 410 | let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; | ||
| 411 | |||
| 412 | if let Some(port) = port { | ||
| 413 | Some(format!("{}://{}:{}", scheme, host, port)) | ||
| 414 | } else { | ||
| 415 | Some(format!("{}://{}", scheme, host)) | ||
| 416 | } | ||
| 417 | } else { | ||
| 418 | None | ||
| 419 | } | ||
| 420 | } | ||
| 421 | |||
| 422 | /// Check if event lists our service in the relays or clone tags. | ||
| 423 | fn lists_our_service(&self, event: &Event) -> bool { | ||
| 424 | // Check relays tag | ||
| 425 | for tag in event.tags.iter() { | ||
| 426 | if matches!(tag.kind(), TagKind::Relays) { | ||
| 427 | let vec = tag.clone().to_vec(); | ||
| 428 | for url in vec.into_iter().skip(1) { | ||
| 429 | if self.is_own_relay(&url) { | ||
| 430 | return true; | ||
| 431 | } | ||
| 432 | } | ||
| 433 | } | ||
| 434 | } | ||
| 435 | |||
| 436 | // Check clone tag | ||
| 437 | for tag in event.tags.iter() { | ||
| 438 | if matches!(tag.kind(), TagKind::Clone) { | ||
| 439 | let vec = tag.clone().to_vec(); | ||
| 440 | for url in vec.into_iter().skip(1) { | ||
| 441 | if url.contains(&self.relay_domain) { | ||
| 442 | return true; | ||
| 443 | } | ||
| 444 | } | ||
| 445 | } | ||
| 446 | } | ||
| 447 | |||
| 448 | false | ||
| 449 | } | ||
| 450 | |||
| 451 | /// Check if a relay URL matches our relay. | ||
| 452 | fn is_own_relay(&self, relay_url: &str) -> bool { | ||
| 453 | relay_url.contains(&self.relay_domain) | ||
| 454 | } | ||
| 455 | } | ||
| 456 | |||
| 457 | #[cfg(test)] | ||
| 458 | mod tests { | ||
| 459 | use super::*; | ||
| 460 | |||
| 461 | #[test] | ||
| 462 | fn test_clone_url_to_relay_url_http() { | ||
| 463 | let url = "http://127.0.0.1:8080/repo.git"; | ||
| 464 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 465 | assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); | ||
| 466 | } | ||
| 467 | |||
| 468 | #[test] | ||
| 469 | fn test_clone_url_to_relay_url_https() { | ||
| 470 | let url = "https://example.com/repo.git"; | ||
| 471 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 472 | assert_eq!(relay, Some("wss://example.com".to_string())); | ||
| 473 | } | ||
| 474 | |||
| 475 | #[test] | ||
| 476 | fn test_clone_url_to_relay_url_invalid() { | ||
| 477 | let url = "not-a-valid-url"; | ||
| 478 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 479 | assert_eq!(relay, None); | ||
| 480 | } | ||
| 481 | |||
| 482 | #[test] | ||
| 483 | fn test_get_batch_window_default() { | ||
| 484 | // Clear env var if set | ||
| 485 | std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); | ||
| 486 | let window = SelfSubscriber::get_batch_window(); | ||
| 487 | assert_eq!(window, Duration::from_secs(5)); | ||
| 488 | } | ||
| 489 | |||
| 490 | #[test] | ||
| 491 | fn test_get_batch_window_from_env() { | ||
| 492 | std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); | ||
| 493 | let window = SelfSubscriber::get_batch_window(); | ||
| 494 | assert_eq!(window, Duration::from_millis(200)); | ||
| 495 | std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); | ||
| 496 | } | ||
| 497 | } | ||