From 586fc2a7df1ce256469f0742d23f687ac4b075b1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 10:33:07 +0000 Subject: stub of sync v4 --- src/sync/health.rs | 475 ---------------- src/sync/metrics.rs | 348 ------------ src/sync/mod.rs | 1264 ++++++++++-------------------------------- src/sync/relay_connection.rs | 185 ------- src/sync/self_subscriber.rs | 497 ----------------- 5 files changed, 300 insertions(+), 2469 deletions(-) delete mode 100644 src/sync/health.rs delete mode 100644 src/sync/metrics.rs delete mode 100644 src/sync/relay_connection.rs delete mode 100644 src/sync/self_subscriber.rs (limited to 'src/sync') diff --git a/src/sync/health.rs b/src/sync/health.rs deleted file mode 100644 index 51bd5ae..0000000 --- a/src/sync/health.rs +++ /dev/null @@ -1,475 +0,0 @@ -//! Relay Health Tracking for GRASP-02 Proactive Sync -//! -//! This module implements health tracking for relay connections, including: -//! - Health state machine (Healthy -> Degraded -> Dead) -//! - Exponential backoff with configurable max delay -//! - Dead relay detection after 24h of continuous failures -//! -//! ## Health States -//! -//! - **Healthy**: Working connection, no recent failures -//! - **Degraded**: Connection failed, retrying with backoff -//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) - -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use dashmap::DashMap; - -use crate::config::Config; - -/// Duration threshold before a relay is considered dead (24 hours) -const DEAD_THRESHOLD_HOURS: u64 = 24; - -/// How often dead relays are retried (once per 24 hours) -const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; - -/// Default maximum backoff duration in seconds (1 hour) -const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; - -/// Base backoff duration in seconds -const BASE_BACKOFF_SECS: u64 = 5; - -/// Health state of a relay connection -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum HealthState { - /// Working connection, no recent failures - Healthy, - /// Connection failed, retrying with exponential backoff - Degraded, - /// 24h+ of continuous failures, minimal retry - Dead, -} - -impl std::fmt::Display for HealthState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - HealthState::Healthy => write!(f, "healthy"), - HealthState::Degraded => write!(f, "degraded"), - HealthState::Dead => write!(f, "dead"), - } - } -} - -/// Health information for a single relay -#[derive(Debug, Clone)] -pub struct RelayHealth { - /// Current health state - pub state: HealthState, - /// Number of consecutive connection failures - pub consecutive_failures: u32, - /// Time of the first failure in the current failure streak - pub first_failure_time: Option, - /// Time of the last failure - pub last_failure_time: Option, - /// Time of the last successful connection - pub last_success_time: Option, - /// Next time a connection attempt should be made - pub next_retry_at: Option, -} - -impl Default for RelayHealth { - fn default() -> Self { - Self { - state: HealthState::Healthy, - consecutive_failures: 0, - first_failure_time: None, - last_failure_time: None, - last_success_time: None, - next_retry_at: None, - } - } -} - -impl RelayHealth { - /// Create a new RelayHealth with healthy state - pub fn new() -> Self { - Self::default() - } -} - -/// Thread-safe relay health tracker using DashMap -#[derive(Debug)] -pub struct RelayHealthTracker { - health: DashMap, - max_backoff_secs: u64, -} - -impl RelayHealthTracker { - /// Create a new RelayHealthTracker - pub fn new(config: &Config) -> Self { - Self { - health: DashMap::new(), - max_backoff_secs: config.sync_max_backoff_secs, - } - } - - /// Create a new RelayHealthTracker with default settings - pub fn with_defaults() -> Self { - Self { - health: DashMap::new(), - max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, - } - } - - /// Create a new RelayHealthTracker with custom max backoff - pub fn with_max_backoff(max_backoff_secs: u64) -> Self { - Self { - health: DashMap::new(), - max_backoff_secs, - } - } - - /// Record a successful connection to a relay - /// - /// Resets the relay to Healthy state and clears failure counters. - pub fn record_success(&self, relay_url: &str) { - let now = Instant::now(); - let mut entry = self.health.entry(relay_url.to_string()).or_default(); - let health = entry.value_mut(); - - let old_state = health.state; - - // Reset to healthy state - health.state = HealthState::Healthy; - health.consecutive_failures = 0; - health.first_failure_time = None; - health.last_failure_time = None; - health.last_success_time = Some(now); - health.next_retry_at = None; - - if old_state != HealthState::Healthy { - tracing::info!( - "Relay {} recovered to healthy (was {:?})", - relay_url, - old_state - ); - } - } - - /// Record a connection failure for a relay - /// - /// Increments failure counter, updates state, and calculates next retry time. - pub fn record_failure(&self, relay_url: &str) { - let now = Instant::now(); - let mut entry = self.health.entry(relay_url.to_string()).or_default(); - let health = entry.value_mut(); - - let old_state = health.state; - - // Set first_failure_time if this is a new failure streak - if health.first_failure_time.is_none() { - health.first_failure_time = Some(now); - } - - health.consecutive_failures = health.consecutive_failures.saturating_add(1); - health.last_failure_time = Some(now); - - // Check if we should transition to Dead state - if let Some(first_failure) = health.first_failure_time { - let failure_duration = now.duration_since(first_failure); - let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); - - if failure_duration >= dead_threshold { - health.state = HealthState::Dead; - // Dead relays retry once per day - health.next_retry_at = - Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); - - if old_state != HealthState::Dead { - tracing::warn!( - "Relay {} marked dead after 24h failures ({} consecutive failures)", - relay_url, - health.consecutive_failures - ); - } - } else { - // Degraded state with exponential backoff - health.state = HealthState::Degraded; - let backoff = Self::get_backoff_duration( - health.consecutive_failures, - self.max_backoff_secs, - ); - health.next_retry_at = Some(now + backoff); - - if old_state != HealthState::Degraded { - tracing::warn!( - "Relay {} degraded, backoff {:?}", - relay_url, - backoff - ); - } else { - tracing::debug!( - "Relay {} failure #{}, backoff {:?}", - relay_url, - health.consecutive_failures, - backoff - ); - } - } - } - } - - /// Check if a connection attempt should be made to a relay - /// - /// Returns true if: - /// - The relay has no health record (first attempt) - /// - The relay is healthy - /// - The backoff period has elapsed - pub fn should_attempt_connection(&self, relay_url: &str) -> bool { - let entry = self.health.get(relay_url); - - match entry { - None => true, // No record, allow first attempt - Some(entry) => { - let health = entry.value(); - - match health.state { - HealthState::Healthy => true, - HealthState::Degraded | HealthState::Dead => { - // Check if backoff period has elapsed - match health.next_retry_at { - None => true, - Some(next_retry) => Instant::now() >= next_retry, - } - } - } - } - } - } - - /// Get the current health state of a relay - pub fn get_state(&self, relay_url: &str) -> HealthState { - self.health - .get(relay_url) - .map(|entry| entry.value().state) - .unwrap_or(HealthState::Healthy) - } - - /// Check if a relay is marked as dead - pub fn is_dead(&self, relay_url: &str) -> bool { - self.get_state(relay_url) == HealthState::Dead - } - - /// Get the remaining backoff duration for a relay - /// - /// Returns None if no backoff is active. - pub fn get_remaining_backoff(&self, relay_url: &str) -> Option { - let entry = self.health.get(relay_url)?; - let health = entry.value(); - let next_retry = health.next_retry_at?; - let now = Instant::now(); - - if now >= next_retry { - None - } else { - Some(next_retry - now) - } - } - - /// Get the consecutive failure count for a relay - pub fn get_failure_count(&self, relay_url: &str) -> u32 { - self.health - .get(relay_url) - .map(|entry| entry.value().consecutive_failures) - .unwrap_or(0) - } - - /// Calculate the backoff duration based on failure count - /// - /// Uses exponential backoff: base * 2^failures, capped at max_backoff - pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { - let backoff_secs = BASE_BACKOFF_SECS - .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); - Duration::from_secs(backoff_secs.min(max_backoff_secs)) - } - - /// Get all tracked relay URLs - pub fn get_tracked_relays(&self) -> Vec { - self.health.iter().map(|entry| entry.key().clone()).collect() - } - - /// Get a clone of the health info for a relay - pub fn get_health(&self, relay_url: &str) -> Option { - self.health.get(relay_url).map(|entry| entry.value().clone()) - } -} - -/// Create a shared RelayHealthTracker wrapped in Arc -pub fn create_health_tracker(config: &Config) -> Arc { - Arc::new(RelayHealthTracker::new(config)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_health_state_display() { - assert_eq!(HealthState::Healthy.to_string(), "healthy"); - assert_eq!(HealthState::Degraded.to_string(), "degraded"); - assert_eq!(HealthState::Dead.to_string(), "dead"); - } - - #[test] - fn test_default_health_is_healthy() { - let health = RelayHealth::default(); - assert_eq!(health.state, HealthState::Healthy); - assert_eq!(health.consecutive_failures, 0); - assert!(health.first_failure_time.is_none()); - } - - #[test] - fn test_should_attempt_connection_new_relay() { - let tracker = RelayHealthTracker::with_defaults(); - assert!(tracker.should_attempt_connection("wss://new-relay.example.com")); - } - - #[test] - fn test_record_success_resets_to_healthy() { - let tracker = RelayHealthTracker::with_defaults(); - let url = "wss://test-relay.example.com"; - - // Simulate a few failures - tracker.record_failure(url); - tracker.record_failure(url); - assert_eq!(tracker.get_state(url), HealthState::Degraded); - assert_eq!(tracker.get_failure_count(url), 2); - - // Record success - tracker.record_success(url); - assert_eq!(tracker.get_state(url), HealthState::Healthy); - assert_eq!(tracker.get_failure_count(url), 0); - assert!(tracker.should_attempt_connection(url)); - } - - #[test] - fn test_backoff_increases_exponentially() { - // failure 1: 5s - assert_eq!( - RelayHealthTracker::get_backoff_duration(1, 3600), - Duration::from_secs(5) - ); - // failure 2: 10s - assert_eq!( - RelayHealthTracker::get_backoff_duration(2, 3600), - Duration::from_secs(10) - ); - // failure 3: 20s - assert_eq!( - RelayHealthTracker::get_backoff_duration(3, 3600), - Duration::from_secs(20) - ); - // failure 4: 40s - assert_eq!( - RelayHealthTracker::get_backoff_duration(4, 3600), - Duration::from_secs(40) - ); - // failure 5: 80s - assert_eq!( - RelayHealthTracker::get_backoff_duration(5, 3600), - Duration::from_secs(80) - ); - } - - #[test] - fn test_backoff_capped_at_max() { - let max_backoff = 3600u64; - // After many failures, should cap at max_backoff (1 hour) - assert_eq!( - RelayHealthTracker::get_backoff_duration(20, max_backoff), - Duration::from_secs(max_backoff) - ); - } - - #[test] - fn test_degraded_state_after_failure() { - let tracker = RelayHealthTracker::with_defaults(); - let url = "wss://test-relay.example.com"; - - tracker.record_failure(url); - assert_eq!(tracker.get_state(url), HealthState::Degraded); - assert_eq!(tracker.get_failure_count(url), 1); - } - - #[test] - fn test_backoff_blocks_immediate_reconnection() { - let tracker = RelayHealthTracker::with_defaults(); - let url = "wss://test-relay.example.com"; - - tracker.record_failure(url); - - // Immediately after failure, should not attempt (backoff active) - assert!(!tracker.should_attempt_connection(url)); - - // Remaining backoff should be some positive duration - let remaining = tracker.get_remaining_backoff(url); - assert!(remaining.is_some()); - assert!(remaining.unwrap() > Duration::ZERO); - } - - #[test] - fn test_is_dead() { - let tracker = RelayHealthTracker::with_defaults(); - let url = "wss://test-relay.example.com"; - - // Initially not dead - assert!(!tracker.is_dead(url)); - - // After a failure, still not dead (just degraded) - tracker.record_failure(url); - assert!(!tracker.is_dead(url)); - assert_eq!(tracker.get_state(url), HealthState::Degraded); - } - - #[test] - fn test_get_tracked_relays() { - let tracker = RelayHealthTracker::with_defaults(); - - tracker.record_success("wss://relay1.example.com"); - tracker.record_failure("wss://relay2.example.com"); - - let tracked = tracker.get_tracked_relays(); - assert_eq!(tracked.len(), 2); - assert!(tracked.contains(&"wss://relay1.example.com".to_string())); - assert!(tracked.contains(&"wss://relay2.example.com".to_string())); - } - - #[test] - fn test_custom_max_backoff() { - let custom_max = 60u64; // 1 minute max - let tracker = RelayHealthTracker::with_max_backoff(custom_max); - let url = "wss://test-relay.example.com"; - - // Simulate many failures - for _ in 0..20 { - tracker.record_failure(url); - } - - // The remaining backoff should respect the custom max - // Note: We can't easily test the internal backoff calculation here, - // but we can verify the tracker was created with the custom setting - assert_eq!(tracker.max_backoff_secs, custom_max); - } - - #[test] - fn test_get_health_returns_clone() { - let tracker = RelayHealthTracker::with_defaults(); - let url = "wss://test-relay.example.com"; - - tracker.record_success(url); - let health = tracker.get_health(url); - - assert!(health.is_some()); - let health = health.unwrap(); - assert_eq!(health.state, HealthState::Healthy); - assert!(health.last_success_time.is_some()); - } - - #[test] - fn test_get_health_nonexistent() { - let tracker = RelayHealthTracker::with_defaults(); - let health = tracker.get_health("wss://nonexistent.example.com"); - assert!(health.is_none()); - } -} \ No newline at end of file diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs deleted file mode 100644 index c93e583..0000000 --- a/src/sync/metrics.rs +++ /dev/null @@ -1,348 +0,0 @@ -//! Prometheus Metrics for Proactive Sync (GRASP-02 Phase 6) -//! -//! This module provides comprehensive sync monitoring metrics including: -//! - Connection status and attempts per relay -//! - Health state tracking (Healthy/Degraded/Dead) -//! - Event sync tracking by source (live/startup/reconnect/daily catchup) -//! - Gap events filled during catchup operations -//! -//! All metrics follow the `ngit_sync_` prefix convention. - -use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; - -use super::health::HealthState; - -/// Prometheus metrics for the proactive sync system -#[derive(Clone)] -pub struct SyncMetrics { - // === Connection metrics === - /// Per-relay connection status (1=connected, 0=disconnected) - relay_connected: IntGaugeVec, - /// Connection attempts by relay and result (success/failure) - connection_attempts_total: IntCounterVec, - - // === Health metrics === - /// Per-relay health status (healthy=1, degraded=2, dead=3) - relay_status: IntGaugeVec, - /// Per-relay consecutive failure count - relay_failures: IntGaugeVec, - - // === Event metrics === - /// Events synced by source (live/startup/reconnect/daily) - events_total: IntCounterVec, - /// Gap events filled during catchup, by relay - gap_events_total: IntCounterVec, - - // === Summary metrics === - /// Total relays discovered and tracked - relays_tracked_total: IntGauge, - /// Currently connected relay count - relays_connected_total: IntGauge, - /// Relays marked as dead - relays_dead_total: IntGauge, -} - -impl SyncMetrics { - /// Register all sync metrics with the provided Prometheus registry - pub fn register(registry: &Registry) -> Result { - // Connection metrics - let relay_connected = IntGaugeVec::new( - Opts::new( - "ngit_sync_relay_connected", - "Relay connection status (1=connected, 0=disconnected)", - ), - &["relay"], - )?; - registry.register(Box::new(relay_connected.clone()))?; - - let connection_attempts_total = IntCounterVec::new( - Opts::new( - "ngit_sync_connection_attempts_total", - "Total connection attempts by relay and result", - ), - &["relay", "result"], - )?; - registry.register(Box::new(connection_attempts_total.clone()))?; - - // Health metrics - let relay_status = IntGaugeVec::new( - Opts::new( - "ngit_sync_relay_status", - "Relay health status (1=healthy, 2=degraded, 3=dead)", - ), - &["relay"], - )?; - registry.register(Box::new(relay_status.clone()))?; - - let relay_failures = IntGaugeVec::new( - Opts::new( - "ngit_sync_relay_failures", - "Consecutive failure count per relay", - ), - &["relay"], - )?; - registry.register(Box::new(relay_failures.clone()))?; - - // Event metrics - let events_total = IntCounterVec::new( - Opts::new( - "ngit_sync_events_total", - "Total events synced by source type", - ), - &["source"], - )?; - registry.register(Box::new(events_total.clone()))?; - - let gap_events_total = IntCounterVec::new( - Opts::new( - "ngit_sync_gap_events_total", - "Gap events filled during catchup by relay", - ), - &["relay"], - )?; - registry.register(Box::new(gap_events_total.clone()))?; - - // Summary metrics - let relays_tracked_total = IntGauge::with_opts(Opts::new( - "ngit_sync_relays_tracked_total", - "Total number of relays discovered and tracked", - ))?; - registry.register(Box::new(relays_tracked_total.clone()))?; - - let relays_connected_total = IntGauge::with_opts(Opts::new( - "ngit_sync_relays_connected_total", - "Number of currently connected relays", - ))?; - registry.register(Box::new(relays_connected_total.clone()))?; - - let relays_dead_total = IntGauge::with_opts(Opts::new( - "ngit_sync_relays_dead_total", - "Number of relays marked as dead", - ))?; - registry.register(Box::new(relays_dead_total.clone()))?; - - Ok(Self { - relay_connected, - connection_attempts_total, - relay_status, - relay_failures, - events_total, - gap_events_total, - relays_tracked_total, - relays_connected_total, - relays_dead_total, - }) - } - - // === Connection Recording Methods === - - /// Record a connection attempt (success or failure) - pub fn record_connection_attempt(&self, relay: &str, success: bool) { - let result = if success { "success" } else { "failure" }; - self.connection_attempts_total - .with_label_values(&[relay, result]) - .inc(); - } - - /// Set relay connection status - pub fn set_relay_connected(&self, relay: &str, connected: bool) { - self.relay_connected - .with_label_values(&[relay]) - .set(if connected { 1 } else { 0 }); - - // Update connected count based on all relay values - // This is handled by update_connected_count() for accuracy - } - - /// Update the total connected relay count - pub fn update_connected_count(&self, count: i64) { - self.relays_connected_total.set(count); - } - - /// Increment connected count - pub fn inc_connected_count(&self) { - self.relays_connected_total.inc(); - } - - /// Decrement connected count - pub fn dec_connected_count(&self) { - self.relays_connected_total.dec(); - } - - // === Health Recording Methods === - - /// Record relay health state change - pub fn record_health_state(&self, relay: &str, state: HealthState) { - let state_value = match state { - HealthState::Healthy => 1, - HealthState::Degraded => 2, - HealthState::Dead => 3, - }; - self.relay_status.with_label_values(&[relay]).set(state_value); - } - - /// Record relay failure count - pub fn record_failure_count(&self, relay: &str, count: u32) { - self.relay_failures - .with_label_values(&[relay]) - .set(count as i64); - } - - /// Update dead relay count - pub fn update_dead_count(&self, count: i64) { - self.relays_dead_total.set(count); - } - - /// Increment dead relay count - pub fn inc_dead_count(&self) { - self.relays_dead_total.inc(); - } - - /// Decrement dead relay count - pub fn dec_dead_count(&self) { - self.relays_dead_total.dec(); - } - - // === Event Recording Methods === - - /// Record a synced event by source type - /// - /// Source types: - /// - "live" - Real-time subscription events - /// - "startup" - Events from startup catchup - /// - "reconnect" - Events from reconnection catchup - /// - "daily" - Events from daily catchup - pub fn record_event(&self, source: &str) { - self.events_total.with_label_values(&[source]).inc(); - } - - /// Record multiple events synced by source type - pub fn record_events(&self, source: &str, count: u64) { - self.events_total - .with_label_values(&[source]) - .inc_by(count); - } - - /// Record a gap event filled during catchup - pub fn record_gap_event(&self, relay: &str) { - self.gap_events_total.with_label_values(&[relay]).inc(); - } - - /// Record multiple gap events filled during catchup - pub fn record_gap_events(&self, relay: &str, count: u64) { - self.gap_events_total - .with_label_values(&[relay]) - .inc_by(count); - } - - // === Summary Recording Methods === - - /// Set the total tracked relay count - pub fn set_tracked_count(&self, count: i64) { - self.relays_tracked_total.set(count); - } - - /// Increment tracked relay count - pub fn inc_tracked_count(&self) { - self.relays_tracked_total.inc(); - } - - /// Get current tracked relay count - pub fn get_tracked_count(&self) -> i64 { - self.relays_tracked_total.get() - } - - /// Get current connected relay count - pub fn get_connected_count(&self) -> i64 { - self.relays_connected_total.get() - } - - /// Get current dead relay count - pub fn get_dead_count(&self) -> i64 { - self.relays_dead_total.get() - } -} - -/// Event source types for metrics tracking -pub mod event_source { - /// Real-time subscription events - pub const LIVE: &str = "live"; - /// Events from startup catchup - pub const STARTUP: &str = "startup"; - /// Events from reconnection catchup - pub const RECONNECT: &str = "reconnect"; - /// Events from daily catchup - pub const DAILY: &str = "daily"; -} - -#[cfg(test)] -mod tests { - use super::*; - - fn create_test_registry() -> Registry { - Registry::new() - } - - #[test] - fn test_metrics_registration() { - let registry = create_test_registry(); - let metrics = SyncMetrics::register(®istry); - assert!(metrics.is_ok()); - } - - #[test] - fn test_connection_metrics() { - let registry = create_test_registry(); - let metrics = SyncMetrics::register(®istry).unwrap(); - - metrics.record_connection_attempt("wss://relay1.example.com", true); - metrics.record_connection_attempt("wss://relay1.example.com", false); - metrics.record_connection_attempt("wss://relay2.example.com", true); - - metrics.set_relay_connected("wss://relay1.example.com", true); - metrics.inc_connected_count(); - - assert_eq!(metrics.get_connected_count(), 1); - } - - #[test] - fn test_health_metrics() { - let registry = create_test_registry(); - let metrics = SyncMetrics::register(®istry).unwrap(); - - metrics.record_health_state("wss://relay1.example.com", HealthState::Healthy); - metrics.record_health_state("wss://relay2.example.com", HealthState::Degraded); - metrics.record_health_state("wss://relay3.example.com", HealthState::Dead); - - metrics.record_failure_count("wss://relay2.example.com", 5); - metrics.update_dead_count(1); - - assert_eq!(metrics.get_dead_count(), 1); - } - - #[test] - fn test_event_metrics() { - let registry = create_test_registry(); - let metrics = SyncMetrics::register(®istry).unwrap(); - - metrics.record_event(event_source::LIVE); - metrics.record_events(event_source::STARTUP, 10); - metrics.record_gap_event("wss://relay1.example.com"); - metrics.record_gap_events("wss://relay2.example.com", 5); - } - - #[test] - fn test_summary_metrics() { - let registry = create_test_registry(); - let metrics = SyncMetrics::register(®istry).unwrap(); - - metrics.set_tracked_count(5); - assert_eq!(metrics.get_tracked_count(), 5); - - metrics.inc_tracked_count(); - assert_eq!(metrics.get_tracked_count(), 6); - - metrics.update_connected_count(3); - assert_eq!(metrics.get_connected_count(), 3); - } -} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 9dec982..c1f8bca 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,1039 +1,375 @@ -//! Proactive Sync Module for GRASP-02 +//! Proactive Sync Module - GRASP-02 v4 Implementation //! -//! This module implements the proactive sync system that ensures data availability -//! for repositories hosted on this relay by syncing from other relays in the ecosystem. +//! This module implements proactive synchronization of repository data from external +//! relays based on relay URLs listed in 30617 repository announcements. //! -//! ## Architecture Overview +//! ## Architecture //! -//! The sync system is built around two core data structures: +//! The sync system uses three index structures: +//! - `RepoSyncIndex` - What we WANT to sync (source of truth from self-subscription) +//! - `RelaySyncIndex` - What we have CONFIRMED syncing + connection state +//! - `PendingSyncIndex` - In-flight batches awaiting EOSE confirmation //! -//! - **FollowingRepoRootEvents**: Tracks repository root events we're following -//! - **SyncRelays**: Tracks relays we sync from, including their repos and events -//! -//! These type aliases are colocated with SyncManager (following the pattern of -//! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity. -//! -//! ## Submodules -//! -//! - [`health`]: Relay health tracking with exponential backoff and dead relay detection -//! - [`metrics`]: Prometheus metrics for sync operations -//! -//! ## Memory Estimates (from design doc) -//! -//! At target scale (1,000 repos, 100 relays): -//! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB -//! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB -//! - **Total in-memory state**: ~10 MB -//! -//! ## Upper Bounds (triggers for redesign) -//! -//! - 10,000+ repos: Consider database-backed state -//! - 500+ sync relays: Consider connection pooling -//! - 500+ root events per repo: Consider per-repo pagination -//! -//! ## Design References -//! -//! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md) -//! for the complete design context. +//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; use std::sync::Arc; -use nostr_relay_builder::prelude::{ - DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, -}; use nostr_sdk::prelude::*; -use nostr_sdk::EventId; -use tokio::sync::{mpsc, RwLock}; +use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; +use tokio::sync::RwLock; use crate::config::Config; -use crate::nostr::builder::Nip34WritePolicy; -use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; -use crate::nostr::SharedDatabase; - -mod relay_connection; -mod self_subscriber; -pub use relay_connection::{RelayConnection, RelayEvent}; -pub use self_subscriber::{RelayAction, SelfSubscriber}; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; // ============================================================================= -// Type Aliases for Sync State +// Type Aliases for Index Structures // ============================================================================= -/// Repository root events we're following. -/// -/// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) -/// we need to follow for each repository we host. -/// -/// ## Key Format -/// -/// The key is a repository addressable reference in the format: -/// `"30617::"` -/// -/// For example: `"30617:abc123...def:my-project"` -/// -/// ## Value -/// -/// A set of event IDs representing root events (PRs, Issues, Patches, Status events) -/// that reference this repository via an `a` tag. -/// -/// ## Event Kinds Tracked -/// -/// - **1617**: Patches (NIP-34) -/// - **1618**: Issues (NIP-34) -/// - **1619**: PRs (Pull Requests, NIP-34) -/// - **1621**: Status events (NIP-34) -/// -/// ## Invariants -/// -/// - May include a few extra repo refs that aren't in `SyncRelays` -/// - This is acceptable - we won't query other relays for them -/// - Updated incrementally via self-subscription -/// -/// ## Thread Safety -/// -/// Wrapped in `Arc>` for safe concurrent access from multiple -/// async tasks performing sync operations. -/// -/// ## Example Usage -/// -/// ```rust,ignore -/// use ngit_grasp::sync::FollowingRepoRootEvents; -/// use std::collections::HashSet; -/// use nostr_sdk::EventId; -/// -/// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) { -/// let guard = state.read().await; -/// if let Some(events) = guard.get(repo_ref) { -/// println!("Tracking {} root events for {}", events.len(), repo_ref); -/// } -/// } -/// ``` -pub type FollowingRepoRootEvents = Arc>>>; - -/// Relays we sync from, including their repos and events. -/// -/// This structure tracks which relays we need to connect to for syncing, -/// and for each relay, which repositories and their root events we're interested in. -/// -/// ## Key Format (Outer HashMap) -/// -/// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"` -/// -/// ## Value Format (Inner HashMap) -/// -/// For each relay, we maintain a map of: -/// - Key: Repository addressable reference (`"30617::"`) -/// - Value: Set of event IDs for that repo which should be synced from this relay -/// -/// ## Relay Selection Criteria -/// -/// A relay is included if its URL appears in a repository announcement (kind 30617) -/// that **also** lists our service URL. This ensures we only sync from relays -/// for repositories that are actually hosted on our relay. -/// -/// ## Bootstrap Relay -/// -/// If configured, the bootstrap relay is always present in this map and is -/// excluded from automatic removal logic. The bootstrap relay is used for -/// initial sync and discovery even when no repositories explicitly list it. -/// -/// ## Thread Safety -/// -/// Wrapped in `Arc>` for safe concurrent access from multiple -/// async tasks performing sync operations. -/// -/// ## Example Usage -/// -/// ```rust,ignore -/// use ngit_grasp::sync::SyncRelays; -/// use std::collections::{HashMap, HashSet}; -/// -/// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) { -/// let guard = state.read().await; -/// if let Some(repos) = guard.get(relay_url) { -/// println!("Relay {} tracks {} repos", relay_url, repos.len()); -/// for (repo_ref, events) in repos { -/// println!(" {} -> {} events", repo_ref, events.len()); -/// } -/// } -/// } -/// ``` -pub type SyncRelays = Arc>>>>; +/// What we WANT to sync - derived from events received via self-subscription. +/// Updated immediately when self-subscriber batch fires. +/// Key: repo addressable ref - 30617:pubkey:identifier +pub type RepoSyncIndex = Arc>>; -/// Creates a new empty `FollowingRepoRootEvents` state. -/// -/// Use this to initialize the state before populating from database queries. -pub fn new_following_repo_root_events() -> FollowingRepoRootEvents { - Arc::new(RwLock::new(HashMap::new())) -} +/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. +/// Key: relay URL +pub type RelaySyncIndex = Arc>>; -/// Creates a new empty `SyncRelays` state. -/// -/// Use this to initialize the state before populating from database queries. -pub fn new_sync_relays() -> SyncRelays { - Arc::new(RwLock::new(HashMap::new())) -} +/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. +/// Each batch has its own ID and can confirm independently. +/// Key: relay URL +pub type PendingSyncIndex = Arc>>>; // ============================================================================= -// SyncManager +// Supporting Data Structures // ============================================================================= -/// Manages proactive synchronization with external relays. -/// -/// The SyncManager is responsible for: -/// - Discovering relays from stored repository announcements -/// - Maintaining connections to sync relays -/// - Subscribing to events at external relays -/// - Applying the acceptance policy to synced events -/// -/// ## Lifecycle -/// -/// 1. `new()` - Creates manager with database and config -/// 2. `run()` - Main async loop (call in a spawned task) -/// -/// ## Current Status -/// -/// Phase 2 implementation supports: -/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter -/// - Event processing through write policy -/// - Storage of accepted events -/// -/// Core data structures: -/// - [`FollowingRepoRootEvents`]: Repository root events we're following -/// - [`SyncRelays`]: Relays we sync from with their repos and events -pub struct SyncManager { - /// Bootstrap relay URL if configured - bootstrap_relay_url: Option, - - /// Our service domain for filtering repo announcements - #[allow(dead_code)] - service_domain: String, - - /// Database for querying/storing events - database: SharedDatabase, - - /// Write policy for applying acceptance rules - write_policy: Nip34WritePolicy, - - /// Repository root events we're following (Phase 1 data structure) - #[allow(dead_code)] - following_repo_root_events: FollowingRepoRootEvents, - - /// Relays we sync from (Phase 1 data structure) - #[allow(dead_code)] - sync_relays: SyncRelays, - - /// Max backoff duration for relay reconnection - #[allow(dead_code)] - max_backoff_secs: u64, +/// What repos and root events need to be synced +#[derive(Debug, Clone, Default)] +pub struct RepoSyncNeeds { + /// Relay URLs listed in this repo's 30617 announcement + pub relays: HashSet, + /// Root event IDs - 1617/1618/1619/1621 - that reference this repo + pub root_events: HashSet, +} - /// Socket address used for sync source (for write policy) - sync_source_addr: SocketAddr, +/// Connection status for a relay +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum ConnectionStatus { + /// Not currently connected + #[default] + Disconnected, + /// Connection attempt in progress + Connecting, + /// Successfully connected and subscribed + Connected, } -impl SyncManager { - /// Creates a new SyncManager. - /// - /// # Arguments - /// - /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync - /// * `service_domain` - Our domain for filtering announcements - /// * `database` - Database for event storage/queries - /// * `write_policy` - Policy for accepting events - /// * `config` - Configuration for sync parameters - pub fn new( - bootstrap_relay_url: Option, - service_domain: String, - database: SharedDatabase, - write_policy: Nip34WritePolicy, - config: &Config, - ) -> Self { - // Create a synthetic SocketAddr for sync source identification - // This is used when calling write_policy.admit_event() for synced events - let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); +/// Complete state for a single relay - combines sync needs with connection lifecycle +#[derive(Debug)] +pub struct RelayState { + /// Repos we have confirmed syncing from this relay + pub repos: HashSet, + /// Root events we have confirmed tracking + pub root_events: HashSet, + /// If true, never disconnect this relay + pub is_bootstrap: bool, + /// Current connection status + pub connection_status: ConnectionStatus, + /// When we last successfully connected - used for since filter on reconnect + pub last_connected: Option, + /// When we disconnected - for 15-minute state retention rule + pub disconnected_at: Option, + // The active connection - will be added in Phase 4 + // pub connection: Option, +} +impl Default for RelayState { + fn default() -> Self { Self { - bootstrap_relay_url, - service_domain, - database, - write_policy, - following_repo_root_events: new_following_repo_root_events(), - sync_relays: new_sync_relays(), - max_backoff_secs: config.sync_max_backoff_secs, - sync_source_addr, - } - } - - /// Returns a reference to the following repo root events state. - /// - /// This is the Phase 1 data structure tracking which repository root events - /// (kinds 1617, 1618, 1619, 1621) we're following. - pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents { - &self.following_repo_root_events - } - - /// Returns a reference to the sync relays state. - /// - /// This is the Phase 1 data structure tracking which relays we sync from - /// and their associated repositories/events. - pub fn sync_relays(&self) -> &SyncRelays { - &self.sync_relays - } - - // ========================================================================= - // Phase 2: Database Initialization - // ========================================================================= - - /// Initialize sync state from database queries at startup. - /// - /// This method performs two database queries: - /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events` - /// 2. Query kind 30617 to build `sync_relays` - /// - /// The bootstrap relay (if configured) is always added to `sync_relays`. - /// - /// # Errors - /// - /// Returns an error if database queries fail. - pub async fn initialize_from_database(&self) -> Result<(), String> { - // Initialize bootstrap relay if configured (never removed) - if let Some(bootstrap_url) = &self.bootstrap_relay_url { - self.sync_relays.write().await.insert( - bootstrap_url.clone(), - HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only) - ); - tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url); - } - - // Query 1: Build following_repo_root_events - // Find all 1617/1618/1619/1621 events and extract their repo references - let root_event_kinds = vec![ - Kind::GitPatch, // 1617 - Kind::from(KIND_PR), // 1618 - Kind::from(KIND_PR_UPDATE), // 1619 - Kind::GitIssue, // 1621 - ]; - - let filter = Filter::new().kinds(root_event_kinds); - let root_events = self - .database - .query(filter) - .await - .map_err(|e| format!("Failed to query root events: {}", e))?; - - let mut root_events_count = 0; - for event in root_events { - // An event may have multiple 'a' tags pointing to different repos - let repo_refs = Self::extract_all_repo_refs(&event); - for repo_ref in repo_refs { - self.following_repo_root_events - .write() - .await - .entry(repo_ref) - .or_default() - .insert(event.id); - root_events_count += 1; - } - } - tracing::info!( - "Populated following_repo_root_events with {} repo-event mappings", - root_events_count - ); - - // Query 2: Build sync_relays from kind 30617 announcements - let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT)); - let announcements = self - .database - .query(announcement_filter) - .await - .map_err(|e| format!("Failed to query announcements: {}", e))?; - - let mut sync_relays_count = 0; - for event in announcements { - let repo_ref = Self::build_repo_ref(&event); - let relay_urls = Self::extract_relay_urls(&event); - - // Only track repos that list BOTH a remote relay AND our service - if self.lists_our_service(&event) { - for relay_url in relay_urls { - if !self.is_own_relay(&relay_url) { - // Get events for this repo from following_repo_root_events - let events = self - .following_repo_root_events - .read() - .await - .get(&repo_ref) - .cloned() - .unwrap_or_default(); - - self.sync_relays - .write() - .await - .entry(relay_url) - .or_default() - .insert(repo_ref.clone(), events); - sync_relays_count += 1; - } - } - } - } - tracing::info!( - "Populated sync_relays with {} relay-repo mappings", - sync_relays_count - ); - - Ok(()) - } - - // ========================================================================= - // Helper Methods for Event Extraction - // ========================================================================= - - /// Extract ALL repo refs from an event (it may tag multiple repos). - /// - /// Looks for 'a' tags that reference kind 30617 (repository announcements). - /// Returns refs in format "30617:pubkey:identifier". - pub fn extract_all_repo_refs(event: &Event) -> Vec { - event - .tags - .iter() - .filter_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "a" { - // Validate it's a 30617 reference - if tag_vec[1].starts_with("30617:") { - Some(tag_vec[1].clone()) - } else { - None - } - } else { - None - } - }) - .collect() - } - - /// Build a repo ref string from a 30617 announcement event. - /// - /// Returns format "30617:pubkey:identifier". - pub fn build_repo_ref(event: &Event) -> String { - // Extract 'd' tag for identifier - let identifier = event - .tags - .iter() - .find(|tag| tag.kind() == TagKind::d()) - .and_then(|tag| tag.content()) - .map(|s| s.to_string()) - .unwrap_or_default(); - - format!("30617:{}:{}", event.pubkey.to_hex(), identifier) - } - - /// Extract relay URLs from a repository announcement event. - /// - /// Looks for the 'relays' tag and returns all relay URLs. - pub fn extract_relay_urls(event: &Event) -> Vec { - event - .tags - .iter() - .filter(|tag| matches!(tag.kind(), TagKind::Relays)) - .flat_map(|tag| { - let vec = tag.clone().to_vec(); - // Skip first element (tag name), rest are values - vec.into_iter().skip(1) - }) - .collect() - } - - /// Check if event lists our service in the relays tag. - /// - /// Compares relay URLs against our service domain. - fn lists_our_service(&self, event: &Event) -> bool { - let relay_urls = Self::extract_relay_urls(event); - relay_urls.iter().any(|url| self.is_own_relay(url)) - } - - /// Check if a relay URL matches our relay. - /// - /// Compares the URL against our service domain. - fn is_own_relay(&self, relay_url: &str) -> bool { - // Normalize comparison: check if URL contains our domain - relay_url.contains(&self.service_domain) - } - - // ========================================================================= - // Main Run Loop - // ========================================================================= - - /// Runs the sync manager main loop. - /// - /// This method should be called in a spawned task: - /// - /// ```rust,ignore - /// tokio::spawn(async move { - /// sync_manager.run().await; - /// }); - /// ``` - /// - /// ## Implementation Status - /// - /// - Phase 2: Layer 1 sync from bootstrap relay ✓ - /// - Phase 3: Self-subscription and relay discovery ✓ - /// - Phase 4-6: Filter building, connection management (TODO) - /// - Phase 7: Full sync loop (TODO) - pub async fn run(self) { - tracing::info!( - "SyncManager starting (bootstrap_relay={:?}, domain={})", - self.bootstrap_relay_url, - self.service_domain - ); - - // Phase 3: Initialize state from database BEFORE spawning connections - if let Err(e) = self.initialize_from_database().await { - tracing::error!("Failed to initialize from database: {}", e); - // Continue anyway - we can still sync from bootstrap - } - - // Create channel for relay actions from self-subscriber - let (action_tx, mut action_rx) = mpsc::channel::(100); - - // Construct our own relay URL for self-subscription - let own_relay_url = format!("ws://{}", self.service_domain); - - // Spawn self-subscriber task - let self_subscriber = SelfSubscriber::new( - own_relay_url.clone(), - self.service_domain.clone(), - Arc::clone(&self.following_repo_root_events), - Arc::clone(&self.sync_relays), - action_tx, - ); - - tokio::spawn(async move { - self_subscriber.run().await; - }); - - tracing::info!("SelfSubscriber spawned for {}", own_relay_url); - - // Track active relay connections (relay_url -> event_sender) - let mut active_relays: HashMap> = HashMap::new(); - - // Phase 2: Connect to bootstrap relay if configured - if let Some(ref bootstrap_url) = self.bootstrap_relay_url { - if let Some(event_tx) = self - .spawn_relay_connection(bootstrap_url.clone(), None) - .await - { - active_relays.insert(bootstrap_url.clone(), event_tx); - } - } - - // Main coordination loop - loop { - tokio::select! { - // Handle relay actions from self-subscriber - action = action_rx.recv() => { - match action { - Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { - tracing::info!("Spawning new relay connection to {}", relay_url); - if !active_relays.contains_key(&relay_url) { - if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { - active_relays.insert(relay_url, event_tx); - } - } - } - Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { - tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); - // TODO: Implement filter updates for existing connections - } - None => { - tracing::info!("Action channel closed, continuing without self-subscriber"); - } - } - } - // Sleep to prevent busy loop when no events - _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { - // Periodic maintenance could go here - } - } - } - } - - /// Spawn a relay connection with optional Layer 2 filters. - /// - /// Returns the event sender channel if successfully spawned. - async fn spawn_relay_connection( - &self, - relay_url: String, - repos: Option>>, - ) -> Option> { - // Create channel for receiving events - let (event_tx, event_rx) = mpsc::channel::(100); - - // Create connection - let connection = RelayConnection::new(relay_url.clone()); - - // Determine if this is bootstrap (no repos) or discovered relay (with repos) - let is_bootstrap = repos.is_none(); - - match connection.connect_and_subscribe().await { - Ok(()) => { - if is_bootstrap { - tracing::info!("Bootstrap relay connection established: {}", relay_url); - } else { - tracing::info!( - "Discovered relay connection established: {} (with Layer 2 filters)", - relay_url - ); - - // Add Layer 2 subscription for repo events - if let Some(ref repos) = repos { - if let Err(e) = self.add_layer2_subscription(&connection, repos).await { - tracing::warn!("Failed to add Layer 2 subscription: {}", e); - } - } - } - - // Clone refs needed for event processing task - let database = Arc::clone(&self.database); - let write_policy = self.write_policy.clone(); - let sync_source_addr = self.sync_source_addr; - - // Clone event_tx for the spawned task - let event_tx_clone = event_tx.clone(); - - // Spawn event loop task - let conn_url = relay_url.clone(); - tokio::spawn(async move { - connection.run_event_loop(event_tx_clone).await; - }); - - // Spawn event processing task - tokio::spawn(async move { - Self::process_relay_events( - event_rx, - database, - write_policy, - sync_source_addr, - conn_url, - ) - .await; - }); - - Some(event_tx) - } - Err(e) => { - tracing::error!("Failed to connect to relay {}: {}", relay_url, e); - None - } - } - } - - /// Add Layer 2 subscription for repo-related events. - /// - /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. - async fn add_layer2_subscription( - &self, - connection: &RelayConnection, - repos: &HashMap>, - ) -> Result<(), String> { - if repos.is_empty() { - return Ok(()); - } - - // Build repo refs list for filter - let repo_refs: Vec = repos.keys().cloned().collect(); - - tracing::debug!( - "Adding Layer 2 subscription for {} repos to {}", - repo_refs.len(), - connection.url() - ); - - // Chunk repo_refs into groups of 100 (per plan) - for chunk in repo_refs.chunks(100) { - // Build filter with lowercase 'a' tag for each repo ref - let mut filter = Filter::new().kinds([ - Kind::GitPatch, // 1617 - Kind::Custom(1618), // PR - Kind::Custom(1619), // PR update - Kind::GitIssue, // 1621 - ]); - - // Add each repo ref as a custom tag filter - for repo_ref in chunk { - filter = - filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); - } - - // Subscribe to this filter - if let Err(e) = connection.subscribe_filter(filter).await { - return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); - } + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Disconnected, + last_connected: None, + disconnected_at: None, } - - Ok(()) } +} - /// Process events from a single relay connection. - /// - /// This is a static method that runs in its own task. - async fn process_relay_events( - mut event_rx: mpsc::Receiver, - database: SharedDatabase, - write_policy: Nip34WritePolicy, - sync_source_addr: SocketAddr, - relay_url: String, - ) { - tracing::debug!("Starting event processing for relay: {}", relay_url); - - while let Some(relay_event) = event_rx.recv().await { - match relay_event { - RelayEvent::Event(event) => { - Self::process_single_event_static( - &event, - &database, - &write_policy, - &sync_source_addr, - &relay_url, - ) - .await; - } - RelayEvent::EndOfStoredEvents => { - tracing::debug!("EOSE received from {}", relay_url); - } - RelayEvent::Closed(reason) => { - tracing::warn!("Connection to {} closed: {}", relay_url, reason); - break; - } +impl RelayState { + /// Check if state should be cleared based on 15-minute rule + pub fn should_clear_state(&self) -> bool { + match self.disconnected_at { + Some(disconnected) => { + let now = Timestamp::now(); + now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes } + None => false, // Still connected or never connected } - - tracing::info!("Event processing ended for relay: {}", relay_url); } - /// Process a single event (static version for use in spawned tasks). - async fn process_single_event_static( - event: &Event, - database: &SharedDatabase, - write_policy: &Nip34WritePolicy, - sync_source_addr: &SocketAddr, - relay_url: &str, - ) { - let event_id = event.id; - let kind = event.kind.as_u16(); - - // Check if event already exists in database - match database.check_id(&event_id).await { - Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => { - tracing::trace!("Event {} already exists, skipping", event_id); - return; - } - Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing - Err(e) => { - tracing::warn!("Failed to check if event {} exists: {}", event_id, e); - } - } - - // Pass through write policy - let policy_result = write_policy.admit_event(event, sync_source_addr).await; - - match policy_result { - PolicyResult::Accept => match database.save_event(event).await { - Ok(SaveEventStatus::Success) => { - tracing::info!( - "Synced event {} (kind {}) from {}", - event_id, - kind, - relay_url - ); - } - Ok(_) => { - tracing::debug!( - "Event {} (kind {}) already stored or rejected by database", - event_id, - kind - ); - } - Err(e) => { - tracing::error!("Failed to save synced event {}: {}", event_id, e); - } - }, - PolicyResult::Reject(reason) => { - tracing::debug!( - "Rejected synced event {} (kind {}): {}", - event_id, - kind, - reason - ); - } - } + /// Clear repos and root_events - called when reconnect takes > 15 minutes + pub fn clear_sync_state(&mut self) { + self.repos.clear(); + self.root_events.clear(); } } -// ============================================================================= -// Submodules -// ============================================================================= - -pub mod health; -pub mod metrics; +/// A batch of items pending EOSE confirmation +#[derive(Debug, Clone)] +pub struct PendingBatch { + /// Unique ID for this batch - for debugging/logging + pub batch_id: u64, + /// The items this batch is syncing + pub items: PendingItems, + /// Subscription IDs that must ALL receive EOSE before confirming + pub outstanding_subs: HashSet, +} -// Re-export commonly used types -pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; -pub use metrics::{event_source, SyncMetrics}; +/// Items included in a pending batch +#[derive(Debug, Clone, Default)] +pub struct PendingItems { + /// Repos being synced in this batch + pub repos: HashSet, + /// Root events being synced in this batch + pub root_events: HashSet, +} // ============================================================================= -// Tests +// SyncMetrics - Prometheus Metrics for Sync System // ============================================================================= -#[cfg(test)] -mod tests { - use super::*; - use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; +/// Prometheus metrics for the proactive sync system. +/// +/// Tracks relay connections, sync progress, and operational statistics. +/// Following the comprehensive v3 metrics design. +#[derive(Clone)] +pub struct SyncMetrics { + // === Connection metrics === + /// Per-relay connection status (1=connected, 0=disconnected) + relay_connected: IntGaugeVec, + /// Connection attempts by relay and result (success/failure) + connection_attempts_total: IntCounterVec, + + // === Event metrics === + /// Events synced by source (live/startup/reconnect/daily) + events_total: IntCounterVec, + + // === Summary metrics === + /// Total relays discovered and tracked + relays_tracked_total: IntGauge, + /// Currently connected relay count + relays_connected_total: IntGauge, +} - /// Helper to create a test event with specific tags - fn create_test_event(kind: Kind, tags: Vec) -> Event { - let keys = Keys::generate(); - EventBuilder::new(kind, "test content") - .tags(tags) - .sign_with_keys(&keys) - .expect("Failed to sign test event") +impl SyncMetrics { + /// Register sync metrics with a Prometheus registry. + /// + /// Returns an error if metrics are already registered (e.g., in tests). + pub fn register(registry: &Registry) -> Result { + // Connection metrics + let relay_connected = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_connected", + "Relay connection status (1=connected, 0=disconnected)", + ), + &["relay"], + )?; + registry.register(Box::new(relay_connected.clone()))?; + + let connection_attempts_total = IntCounterVec::new( + Opts::new( + "ngit_sync_connection_attempts_total", + "Total connection attempts by relay and result", + ), + &["relay", "result"], + )?; + registry.register(Box::new(connection_attempts_total.clone()))?; + + // Event metrics + let events_total = IntCounterVec::new( + Opts::new( + "ngit_sync_events_total", + "Total events synced by source type", + ), + &["source"], + )?; + registry.register(Box::new(events_total.clone()))?; + + // Summary metrics + let relays_tracked_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_tracked_total", + "Total number of relays discovered and tracked", + ))?; + registry.register(Box::new(relays_tracked_total.clone()))?; + + let relays_connected_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_connected_total", + "Number of currently connected relays", + ))?; + registry.register(Box::new(relays_connected_total.clone()))?; + + Ok(Self { + relay_connected, + connection_attempts_total, + events_total, + relays_tracked_total, + relays_connected_total, + }) } - // ========================================================================= - // Tests for extract_all_repo_refs - // ========================================================================= + // === Connection Recording Methods === - #[test] - fn test_extract_all_repo_refs_single_ref() { - let event = create_test_event( - Kind::GitPatch, - vec![Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("a"), - vec!["30617:abc123def456:my-project"], - )], - ); - - let refs = SyncManager::extract_all_repo_refs(&event); - assert_eq!(refs.len(), 1); - assert_eq!(refs[0], "30617:abc123def456:my-project"); + /// Record a connection attempt (success or failure) + pub fn record_connection_attempt(&self, relay: &str, success: bool) { + let result = if success { "success" } else { "failure" }; + self.connection_attempts_total + .with_label_values(&[relay, result]) + .inc(); } - #[test] - fn test_extract_all_repo_refs_multiple_refs() { - let event = create_test_event( - Kind::GitPatch, - vec![ - Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("a"), - vec!["30617:abc123:project1"], - ), - Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("a"), - vec!["30617:def456:project2"], - ), - ], - ); - - let refs = SyncManager::extract_all_repo_refs(&event); - assert_eq!(refs.len(), 2); - assert!(refs.contains(&"30617:abc123:project1".to_string())); - assert!(refs.contains(&"30617:def456:project2".to_string())); + /// Set relay connection status + pub fn set_relay_connected(&self, relay: &str, connected: bool) { + self.relay_connected + .with_label_values(&[relay]) + .set(if connected { 1 } else { 0 }); } - #[test] - fn test_extract_all_repo_refs_ignores_non_30617() { - let event = create_test_event( - Kind::GitPatch, - vec![ - Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("a"), - vec!["30617:abc123:valid-repo"], - ), - Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("a"), - vec!["30618:def456:state-event"], // Not a repo ref - ), - ], - ); - - let refs = SyncManager::extract_all_repo_refs(&event); - assert_eq!(refs.len(), 1); - assert_eq!(refs[0], "30617:abc123:valid-repo"); + /// Increment connected count + pub fn inc_connected_count(&self) { + self.relays_connected_total.inc(); } - #[test] - fn test_extract_all_repo_refs_empty_when_no_a_tags() { - let event = create_test_event( - Kind::GitPatch, - vec![Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("e"), - vec!["some-event-id"], - )], - ); - - let refs = SyncManager::extract_all_repo_refs(&event); - assert!(refs.is_empty()); + /// Decrement connected count + pub fn dec_connected_count(&self) { + self.relays_connected_total.dec(); } - // ========================================================================= - // Tests for build_repo_ref - // ========================================================================= + // === Event Recording Methods === - #[test] - fn test_build_repo_ref() { - let keys = Keys::generate(); - let event = EventBuilder::new(Kind::from(30617_u16), "announcement") - .tags(vec![Tag::custom( - nostr_relay_builder::prelude::TagKind::d(), - vec!["my-identifier"], - )]) - .sign_with_keys(&keys) - .expect("Failed to sign test event"); - - let repo_ref = SyncManager::build_repo_ref(&event); - assert!(repo_ref.starts_with("30617:")); - assert!(repo_ref.ends_with(":my-identifier")); - assert!(repo_ref.contains(&event.pubkey.to_hex())); + /// Record a synced event by source type + /// + /// Source types: + /// - "live" - Real-time subscription events + /// - "startup" - Events from startup catchup + /// - "reconnect" - Events from reconnection catchup + pub fn record_event(&self, source: &str) { + self.events_total.with_label_values(&[source]).inc(); } - #[test] - fn test_build_repo_ref_empty_identifier() { - let keys = Keys::generate(); - let event = EventBuilder::new(Kind::from(30617_u16), "announcement") - .sign_with_keys(&keys) - .expect("Failed to sign test event"); - - let repo_ref = SyncManager::build_repo_ref(&event); - assert!(repo_ref.starts_with("30617:")); - assert!(repo_ref.ends_with(":")); // Empty identifier + /// Record multiple events synced by source type + pub fn record_events(&self, source: &str, count: u64) { + self.events_total + .with_label_values(&[source]) + .inc_by(count); } - // ========================================================================= - // Tests for extract_relay_urls - // ========================================================================= - - #[test] - fn test_extract_relay_urls_single() { - let event = create_test_event( - Kind::from(30617_u16), - vec![Tag::custom( - nostr_relay_builder::prelude::TagKind::Relays, - vec!["wss://relay.example.com"], - )], - ); + // === Summary Recording Methods === - let urls = SyncManager::extract_relay_urls(&event); - assert_eq!(urls.len(), 1); - assert_eq!(urls[0], "wss://relay.example.com"); + /// Set the total tracked relay count + pub fn set_tracked_count(&self, count: i64) { + self.relays_tracked_total.set(count); } - #[test] - fn test_extract_relay_urls_multiple() { - let event = create_test_event( - Kind::from(30617_u16), - vec![Tag::custom( - nostr_relay_builder::prelude::TagKind::Relays, - vec!["wss://relay1.example.com", "wss://relay2.example.com"], - )], - ); - - let urls = SyncManager::extract_relay_urls(&event); - assert_eq!(urls.len(), 2); - assert!(urls.contains(&"wss://relay1.example.com".to_string())); - assert!(urls.contains(&"wss://relay2.example.com".to_string())); + /// Increment tracked relay count + pub fn inc_tracked_count(&self) { + self.relays_tracked_total.inc(); } - #[test] - fn test_extract_relay_urls_empty_when_no_relays_tag() { - let event = create_test_event( - Kind::from(30617_u16), - vec![Tag::custom( - nostr_relay_builder::prelude::TagKind::custom("d"), - vec!["my-project"], - )], - ); - - let urls = SyncManager::extract_relay_urls(&event); - assert!(urls.is_empty()); + /// Get current tracked relay count + pub fn get_tracked_count(&self) -> i64 { + self.relays_tracked_total.get() } - // ========================================================================= - // Original data structure tests - // ========================================================================= - - #[tokio::test] - async fn test_following_repo_root_events_basic_operations() { - let state = new_following_repo_root_events(); - - // Insert some events - { - let mut guard = state.write().await; - let repo_ref = "30617:abc123:my-project".to_string(); - guard - .entry(repo_ref) - .or_default() - .insert(EventId::all_zeros()); - } - - // Read back - { - let guard = state.read().await; - assert_eq!(guard.len(), 1); - assert!(guard.contains_key("30617:abc123:my-project")); - } + /// Get current connected relay count + pub fn get_connected_count(&self) -> i64 { + self.relays_connected_total.get() } +} - #[tokio::test] - async fn test_sync_relays_basic_operations() { - let state = new_sync_relays(); +/// Event source types for metrics tracking +pub mod event_source { + /// Real-time subscription events + pub const LIVE: &str = "live"; + /// Events from startup catchup + pub const STARTUP: &str = "startup"; + /// Events from reconnection catchup + pub const RECONNECT: &str = "reconnect"; +} - // Insert relay with repos - { - let mut guard = state.write().await; - let relay_url = "wss://relay.example.com".to_string(); - let repo_ref = "30617:abc123:my-project".to_string(); +// ============================================================================= +// SyncManager - Main Entry Point +// ============================================================================= - guard - .entry(relay_url) - .or_default() - .entry(repo_ref) - .or_default() - .insert(EventId::all_zeros()); - } +/// Manages proactive synchronization with external relays +/// +/// The SyncManager runs as a background task, subscribing to repository +/// announcements on the local relay and syncing data from external relays +/// listed in those announcements. +#[allow(dead_code)] // Fields will be used in later phases +pub struct SyncManager { + /// Bootstrap relay URL for initial sync (optional) + bootstrap_relay_url: Option, + /// Our service domain - used for filtering relevant repos + service_domain: String, + /// Database for event storage and queries + database: SharedDatabase, + /// Write policy for validating incoming events + write_policy: Nip34WritePolicy, + /// Configuration reference for sync settings + config: Config, + /// What we want to sync (source of truth) + repo_sync_index: RepoSyncIndex, + /// What we've confirmed syncing + connection state + relay_sync_index: RelaySyncIndex, + /// In-flight subscription batches + pending_sync_index: PendingSyncIndex, +} - // Read back - { - let guard = state.read().await; - assert_eq!(guard.len(), 1); - let relay_repos = guard.get("wss://relay.example.com").unwrap(); - assert_eq!(relay_repos.len(), 1); - let events = relay_repos.get("30617:abc123:my-project").unwrap(); - assert_eq!(events.len(), 1); +impl SyncManager { + /// Create a new SyncManager + /// + /// # Arguments + /// * `bootstrap_relay_url` - Optional relay URL for initial historical sync + /// * `service_domain` - The domain this relay serves (for filtering repos) + /// * `database` - Shared database for event storage + /// * `write_policy` - Policy for validating events before storage + /// * `config` - Configuration for sync settings + pub fn new( + bootstrap_relay_url: Option, + service_domain: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + config: &Config, + ) -> Self { + Self { + bootstrap_relay_url, + service_domain, + database, + write_policy, + config: config.clone(), + repo_sync_index: Arc::new(RwLock::new(HashMap::new())), + relay_sync_index: Arc::new(RwLock::new(HashMap::new())), + pending_sync_index: Arc::new(RwLock::new(HashMap::new())), } } - #[tokio::test] - async fn test_concurrent_access() { - let state = new_following_repo_root_events(); - let state_clone = Arc::clone(&state); - - // Writer task - let writer = tokio::spawn(async move { - let mut guard = state_clone.write().await; - guard - .entry("30617:writer:repo".to_string()) - .or_default() - .insert(EventId::all_zeros()); - }); - - // Wait for writer - writer.await.unwrap(); + /// Run the sync manager (placeholder for Phase 1) + /// + /// This will be implemented in later phases to: + /// 1. Subscribe to local relay for 30617 events + /// 2. Process events to build RepoSyncIndex + /// 3. Compute and execute sync actions + /// 4. Handle reconnection and catch-up logic + pub async fn run(self) { + tracing::info!( + bootstrap_relay = ?self.bootstrap_relay_url, + service_domain = %self.service_domain, + "SyncManager starting (placeholder - not yet implemented)" + ); - // Reader should see the change - let guard = state.read().await; - assert!(guard.contains_key("30617:writer:repo")); + // Phase 1: Just log and return + // Full implementation will be added in subsequent phases } -} +} \ No newline at end of file diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs deleted file mode 100644 index 71b5d51..0000000 --- a/src/sync/relay_connection.rs +++ /dev/null @@ -1,185 +0,0 @@ -//! Relay Connection for Proactive Sync -//! -//! This module handles connecting to external relays and receiving events -//! for the proactive sync system. - -use std::time::Duration; - -use nostr_sdk::prelude::*; -use tokio::sync::mpsc; - -use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; - -/// Events received from a relay connection -#[derive(Debug)] -pub enum RelayEvent { - /// A nostr event was received - Event(Event), - /// End of stored events (EOSE) received - EndOfStoredEvents, - /// Connection was closed - Closed(String), -} - -/// Connection to an external relay for syncing events. -/// -/// RelayConnection handles: -/// - Connecting to the relay -/// - Subscribing with appropriate filters (Layer 1 for bootstrap) -/// - Receiving events and sending them through a channel -pub struct RelayConnection { - /// The relay URL - url: String, - /// The nostr-sdk client - client: Client, -} - -impl RelayConnection { - /// Create a new relay connection. - /// - /// # Arguments - /// - /// * `url` - The WebSocket URL of the relay to connect to - pub fn new(url: String) -> Self { - // Create a client with generated keys (we're just subscribing, not publishing) - let keys = Keys::generate(); - let client = Client::new(keys); - - Self { url, client } - } - - /// Connect to the relay and subscribe with Layer 1 filter. - /// - /// Layer 1 filter syncs announcement events (30617, 30618) which are - /// the foundation for discovering repository relationships. - /// - /// Returns the notification stream for receiving events. - pub async fn connect_and_subscribe(&self) -> Result<(), String> { - // Add the relay - self.client - .add_relay(&self.url) - .await - .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; - - // Connect to relay - self.client.connect().await; - - // Wait for connection to establish - let mut connected = false; - for _ in 0..30 { - tokio::time::sleep(Duration::from_millis(100)).await; - let relays = self.client.relays().await; - if relays.values().any(|r| r.is_connected()) { - connected = true; - break; - } - } - - if !connected { - return Err(format!( - "Failed to connect to relay {} after 3 seconds", - self.url - )); - } - - tracing::info!("Connected to bootstrap relay: {}", self.url); - - // Layer 1 filter: Repository announcements and state events - // These are addressable events that define repositories - let filter = Filter::new().kinds([ - Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 - Kind::Custom(KIND_REPOSITORY_STATE), // 30618 - ]); - - // Subscribe to the filter - self.client - .subscribe(filter, None) - .await - .map_err(|e| format!("Failed to subscribe: {}", e))?; - - tracing::debug!( - "Subscribed to Layer 1 events (kinds 30617, 30618) from {}", - self.url - ); - - Ok(()) - } - - /// Run the event loop, sending received events through the channel. - /// - /// This method runs until the connection is closed or an error occurs. - /// - /// # Arguments - /// - /// * `event_sender` - Channel to send received events - pub async fn run_event_loop(self, event_sender: mpsc::Sender) { - tracing::debug!("Starting event loop for relay: {}", self.url); - - // Handle notifications - self.client - .handle_notifications(|notification| async { - match notification { - RelayPoolNotification::Event { event, .. } => { - tracing::debug!( - "Received event {} (kind {}) from {}", - event.id, - event.kind.as_u16(), - self.url - ); - if event_sender.send(RelayEvent::Event(*event)).await.is_err() { - tracing::warn!("Event channel closed, stopping relay connection"); - return Ok(true); // Stop handling - } - } - RelayPoolNotification::Message { message, .. } => { - if let RelayMessage::EndOfStoredEvents(_) = message { - tracing::debug!("EOSE received from {}", self.url); - if event_sender - .send(RelayEvent::EndOfStoredEvents) - .await - .is_err() - { - return Ok(true); // Stop handling - } - } - } - RelayPoolNotification::Shutdown => { - tracing::info!("Relay {} shutting down", self.url); - let _ = event_sender - .send(RelayEvent::Closed("Shutdown".to_string())) - .await; - return Ok(true); // Stop handling - } - } - Ok(false) // Continue handling - }) - .await - .ok(); // Ignore errors on shutdown - - // Disconnect when done - self.client.disconnect().await; - tracing::info!("Disconnected from relay: {}", self.url); - } - - /// Get the relay URL - pub fn url(&self) -> &str { - &self.url - } - - /// Subscribe to an additional filter. - /// - /// This is used to add Layer 2 filters for repo-related events after - /// the initial connection is established. - pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> { - self.client - .subscribe(filter, None) - .await - .map_err(|e| format!("Failed to subscribe with filter: {}", e))?; - Ok(()) - } - - /// Get a reference to the client for additional operations. - pub fn client(&self) -> &Client { - &self.client - } -} diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs deleted file mode 100644 index 0512088..0000000 --- a/src/sync/self_subscriber.rs +++ /dev/null @@ -1,497 +0,0 @@ -//! Self-Subscriber for Proactive Sync -//! -//! This module handles subscribing to our own relay to detect new events -//! and trigger relay discovery from announcements. - -use std::collections::{HashMap, HashSet}; -use std::time::Duration; - -use nostr_sdk::prelude::*; -use tokio::sync::mpsc; -use tokio::time::Instant; - -use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; - -use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; - -// ============================================================================= -// Types -// ============================================================================= - -/// Actions to be taken by the SyncManager based on self-subscription events. -#[derive(Debug, Clone)] -pub enum RelayAction { - /// Spawn a new relay connection to sync from. - /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. - SpawnRelay { - relay_url: String, - repos_and_root_events: HashMap>, - }, - /// Add filters to an existing relay connection. - /// Contains: relay_url, additional repos to add. - AddFilters { - relay_url: String, - repos_and_new_root_event: HashMap>, - }, -} - -/// Pending updates collected during batch window. -#[derive(Debug, Default)] -struct PendingUpdates { - /// New announcements (kind 30617) - triggers relay discovery - announcements: Vec, - /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set - root_events: Vec, -} - -// ============================================================================= -// SelfSubscriber -// ============================================================================= - -/// Subscribes to our own relay to detect new events. -/// -/// The self-subscriber: -/// 1. Connects to our own relay -/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) -/// 3. When events arrive, batches them -/// 4. On batch timer fire, processes updates and sends relay actions -pub struct SelfSubscriber { - /// URL of our own relay to subscribe to - own_relay_url: String, - /// Our relay domain for checking if announcements list us - relay_domain: String, - /// Reference to following repo root events (shared with SyncManager) - following_repo_root_events: FollowingRepoRootEvents, - /// Reference to sync relays (shared with SyncManager) - sync_relays: SyncRelays, - /// Channel to send relay actions back to manager - action_tx: mpsc::Sender, -} - -impl SelfSubscriber { - /// Create a new self-subscriber. - pub fn new( - own_relay_url: String, - relay_domain: String, - following_repo_root_events: FollowingRepoRootEvents, - sync_relays: SyncRelays, - action_tx: mpsc::Sender, - ) -> Self { - Self { - own_relay_url, - relay_domain, - following_repo_root_events, - sync_relays, - action_tx, - } - } - - /// Get the batch window duration from environment variable. - /// - /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS - /// for faster tests (typically 200ms). - fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_secs(5)) - } - - /// Run the self-subscriber event loop. - /// - /// This method: - /// 1. Connects to our own relay - /// 2. Subscribes to relevant event kinds - /// 3. Receives events and batches them - /// 4. On batch timer fire, processes and sends relay actions - pub async fn run(self) { - tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); - - // Create nostr-sdk client - let keys = Keys::generate(); - let client = Client::new(keys); - - // Connect to our own relay - if let Err(e) = client.add_relay(&self.own_relay_url).await { - tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); - return; - } - - client.connect().await; - - // Wait for connection - let mut connected = false; - for _ in 0..30 { - tokio::time::sleep(Duration::from_millis(100)).await; - let relays = client.relays().await; - if relays.values().any(|r| r.is_connected()) { - connected = true; - break; - } - } - - if !connected { - tracing::error!( - "Failed to connect to own relay {} after 3 seconds", - self.own_relay_url - ); - return; - } - - tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); - - // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) - let filter = Filter::new() - .kinds([ - Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 - Kind::GitPatch, // 1617 - Kind::Custom(KIND_PR), // 1618 - Kind::Custom(KIND_PR_UPDATE), // 1619 - Kind::GitIssue, // 1621 - ]) - .since(Timestamp::now()); - - if let Err(e) = client.subscribe(filter, None).await { - tracing::error!("Failed to subscribe to own relay: {}", e); - return; - } - - tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); - - // Batch state - let mut pending = PendingUpdates::default(); - let mut batch_timer_started: Option = None; - let batch_window = Self::get_batch_window(); - - // Main event loop using notifications stream - loop { - // Calculate timeout for batch processing - let timeout = if let Some(started) = batch_timer_started { - let elapsed = started.elapsed(); - if elapsed >= batch_window { - Duration::ZERO - } else { - batch_window - elapsed - } - } else { - Duration::from_secs(60) // Long timeout when no batch pending - }; - - // Wait for notification with timeout - let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; - - match notification { - Ok(Ok(notification)) => { - match notification { - RelayPoolNotification::Event { event, .. } => { - let kind = event.kind.as_u16(); - - // Start batch timer on first event (does NOT reset) - if batch_timer_started.is_none() { - batch_timer_started = Some(Instant::now()); - tracing::debug!("Batch timer started"); - } - - // Classify and add to pending - if kind == KIND_REPOSITORY_ANNOUNCEMENT { - tracing::debug!( - "SelfSubscriber received announcement {}", - event.id - ); - pending.announcements.push(*event); - } else { - tracing::debug!( - "SelfSubscriber received root event {} (kind {})", - event.id, - kind - ); - pending.root_events.push(*event); - } - } - RelayPoolNotification::Message { message, .. } => { - if let RelayMessage::EndOfStoredEvents(_) = message { - tracing::debug!("SelfSubscriber EOSE received"); - // Process any pending events after EOSE - if !pending.announcements.is_empty() - || !pending.root_events.is_empty() - { - self.process_batch(&mut pending).await; - batch_timer_started = None; - } - } - } - RelayPoolNotification::Shutdown => { - tracing::info!("SelfSubscriber shutting down"); - break; - } - } - } - Ok(Err(_)) => { - // Channel closed - tracing::warn!("SelfSubscriber notification channel closed"); - break; - } - Err(_) => { - // Timeout - check if batch should be processed - if let Some(started) = batch_timer_started { - if started.elapsed() >= batch_window { - if !pending.announcements.is_empty() || !pending.root_events.is_empty() - { - self.process_batch(&mut pending).await; - } - batch_timer_started = None; - } - } - } - } - } - - client.disconnect().await; - tracing::info!("SelfSubscriber disconnected"); - } - - /// Process a batch of pending updates. - async fn process_batch(&self, pending: &mut PendingUpdates) { - tracing::debug!( - "Processing batch: {} announcements, {} root events", - pending.announcements.len(), - pending.root_events.len() - ); - - // Process root events first (update following_repo_root_events) - for event in pending.root_events.drain(..) { - let repo_refs = SyncManager::extract_all_repo_refs(&event); - if !repo_refs.is_empty() { - let mut guard = self.following_repo_root_events.write().await; - for repo_ref in repo_refs { - guard.entry(repo_ref).or_default().insert(event.id); - } - } - } - - // Process announcements (relay discovery) - for event in pending.announcements.drain(..) { - self.process_announcement(&event).await; - } - } - - /// Process an announcement event for relay discovery. - async fn process_announcement(&self, event: &Event) { - let repo_ref = SyncManager::build_repo_ref(event); - let relay_urls = Self::extract_relay_urls_from_announcement(event); - - // Check if this announcement lists our relay - if !self.lists_our_service(event) { - tracing::debug!( - "Announcement {} does not list our service, skipping relay discovery", - event.id - ); - return; - } - - tracing::info!( - "Processing announcement {} for repo {}, found {} relay URLs", - event.id, - repo_ref, - relay_urls.len() - ); - - // Get current events for this repo from following_repo_root_events - let events = self - .following_repo_root_events - .read() - .await - .get(&repo_ref) - .cloned() - .unwrap_or_default(); - - // For each relay URL in the announcement, check if we need to spawn or update - for relay_url in relay_urls { - if self.is_own_relay(&relay_url) { - continue; // Skip our own relay - } - - let sync_relays_guard = self.sync_relays.read().await; - let exists = sync_relays_guard.contains_key(&relay_url); - drop(sync_relays_guard); - - if exists { - // Relay already known - check if we need to add this repo - let mut guard = self.sync_relays.write().await; - let relay_repos = guard.entry(relay_url.clone()).or_default(); - let is_new_repo = !relay_repos.contains_key(&repo_ref); - - if is_new_repo { - relay_repos.insert(repo_ref.clone(), events.clone()); - drop(guard); - - // Send action to add filters - let mut repos_filters = HashMap::new(); - repos_filters.insert(repo_ref.clone(), events.clone()); - - if let Err(e) = self - .action_tx - .send(RelayAction::AddFilters { - relay_url: relay_url.clone(), - repos_and_new_root_event: repos_filters, - }) - .await - { - tracing::warn!("Failed to send AddFilters action: {}", e); - } - } - } else { - // New relay - add to sync_relays and spawn - let mut guard = self.sync_relays.write().await; - let mut repos = HashMap::new(); - repos.insert(repo_ref.clone(), events.clone()); - guard.insert(relay_url.clone(), repos.clone()); - drop(guard); - - tracing::info!("Discovered new relay to sync from: {}", relay_url); - - // Send action to spawn relay - if let Err(e) = self - .action_tx - .send(RelayAction::SpawnRelay { - relay_url: relay_url.clone(), - repos_and_root_events: repos, - }) - .await - { - tracing::warn!("Failed to send SpawnRelay action: {}", e); - } - } - } - } - - /// Extract relay URLs from an announcement event. - /// - /// Looks for both 'relays' and 'clone' tags. - fn extract_relay_urls_from_announcement(event: &Event) -> Vec { - let mut urls = Vec::new(); - - // Extract from 'relays' tag - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Relays) { - let vec = tag.clone().to_vec(); - urls.extend(vec.into_iter().skip(1)); // Skip tag name - } - } - - // Extract from 'clone' tag - parse URLs to get relay hints - // Clone URLs look like: http://domain/repo.git or git://domain/repo.git - // We want to construct ws://domain from these - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Clone) { - let vec = tag.clone().to_vec(); - for url in vec.into_iter().skip(1) { - if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { - if !urls.contains(&relay_url) { - urls.push(relay_url); - } - } - } - } - } - - urls - } - - /// Convert a clone URL to a potential relay URL. - /// - /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" - fn clone_url_to_relay_url(clone_url: &str) -> Option { - // Parse the URL to extract host:port - if let Ok(url) = url::Url::parse(clone_url) { - let host = url.host_str()?; - let port = url.port(); - let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; - - if let Some(port) = port { - Some(format!("{}://{}:{}", scheme, host, port)) - } else { - Some(format!("{}://{}", scheme, host)) - } - } else { - None - } - } - - /// Check if event lists our service in the relays or clone tags. - fn lists_our_service(&self, event: &Event) -> bool { - // Check relays tag - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Relays) { - let vec = tag.clone().to_vec(); - for url in vec.into_iter().skip(1) { - if self.is_own_relay(&url) { - return true; - } - } - } - } - - // Check clone tag - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Clone) { - let vec = tag.clone().to_vec(); - for url in vec.into_iter().skip(1) { - if url.contains(&self.relay_domain) { - return true; - } - } - } - } - - false - } - - /// Check if a relay URL matches our relay. - fn is_own_relay(&self, relay_url: &str) -> bool { - relay_url.contains(&self.relay_domain) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_clone_url_to_relay_url_http() { - let url = "http://127.0.0.1:8080/repo.git"; - let relay = SelfSubscriber::clone_url_to_relay_url(url); - assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); - } - - #[test] - fn test_clone_url_to_relay_url_https() { - let url = "https://example.com/repo.git"; - let relay = SelfSubscriber::clone_url_to_relay_url(url); - assert_eq!(relay, Some("wss://example.com".to_string())); - } - - #[test] - fn test_clone_url_to_relay_url_invalid() { - let url = "not-a-valid-url"; - let relay = SelfSubscriber::clone_url_to_relay_url(url); - assert_eq!(relay, None); - } - - #[test] - fn test_get_batch_window_default() { - // Clear env var if set - std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); - let window = SelfSubscriber::get_batch_window(); - assert_eq!(window, Duration::from_secs(5)); - } - - #[test] - fn test_get_batch_window_from_env() { - std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); - let window = SelfSubscriber::get_batch_window(); - assert_eq!(window, Duration::from_millis(200)); - std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); - } -} -- cgit v1.2.3