upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 10:33:07 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 10:33:07 +0000
commit586fc2a7df1ce256469f0742d23f687ac4b075b1 (patch)
treedc07dbec88ea1ca2e80b4ced91831256bb68ce4e /src
parent2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (diff)
stub of sync v4
Diffstat (limited to 'src')
-rw-r--r--src/sync/health.rs475
-rw-r--r--src/sync/metrics.rs348
-rw-r--r--src/sync/mod.rs1264
-rw-r--r--src/sync/relay_connection.rs185
-rw-r--r--src/sync/self_subscriber.rs497
5 files changed, 300 insertions, 2469 deletions
diff --git a/src/sync/health.rs b/src/sync/health.rs
deleted file mode 100644
index 51bd5ae..0000000
--- a/src/sync/health.rs
+++ /dev/null
@@ -1,475 +0,0 @@
1//! Relay Health Tracking for GRASP-02 Proactive Sync
2//!
3//! This module implements health tracking for relay connections, including:
4//! - Health state machine (Healthy -> Degraded -> Dead)
5//! - Exponential backoff with configurable max delay
6//! - Dead relay detection after 24h of continuous failures
7//!
8//! ## Health States
9//!
10//! - **Healthy**: Working connection, no recent failures
11//! - **Degraded**: Connection failed, retrying with backoff
12//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day)
13
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18
19use crate::config::Config;
20
21/// Duration threshold before a relay is considered dead (24 hours)
22const DEAD_THRESHOLD_HOURS: u64 = 24;
23
24/// How often dead relays are retried (once per 24 hours)
25const DEAD_RETRY_INTERVAL_HOURS: u64 = 24;
26
27/// Default maximum backoff duration in seconds (1 hour)
28const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600;
29
30/// Base backoff duration in seconds
31const BASE_BACKOFF_SECS: u64 = 5;
32
33/// Health state of a relay connection
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum HealthState {
36 /// Working connection, no recent failures
37 Healthy,
38 /// Connection failed, retrying with exponential backoff
39 Degraded,
40 /// 24h+ of continuous failures, minimal retry
41 Dead,
42}
43
44impl std::fmt::Display for HealthState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 HealthState::Healthy => write!(f, "healthy"),
48 HealthState::Degraded => write!(f, "degraded"),
49 HealthState::Dead => write!(f, "dead"),
50 }
51 }
52}
53
54/// Health information for a single relay
55#[derive(Debug, Clone)]
56pub struct RelayHealth {
57 /// Current health state
58 pub state: HealthState,
59 /// Number of consecutive connection failures
60 pub consecutive_failures: u32,
61 /// Time of the first failure in the current failure streak
62 pub first_failure_time: Option<Instant>,
63 /// Time of the last failure
64 pub last_failure_time: Option<Instant>,
65 /// Time of the last successful connection
66 pub last_success_time: Option<Instant>,
67 /// Next time a connection attempt should be made
68 pub next_retry_at: Option<Instant>,
69}
70
71impl Default for RelayHealth {
72 fn default() -> Self {
73 Self {
74 state: HealthState::Healthy,
75 consecutive_failures: 0,
76 first_failure_time: None,
77 last_failure_time: None,
78 last_success_time: None,
79 next_retry_at: None,
80 }
81 }
82}
83
84impl RelayHealth {
85 /// Create a new RelayHealth with healthy state
86 pub fn new() -> Self {
87 Self::default()
88 }
89}
90
91/// Thread-safe relay health tracker using DashMap
92#[derive(Debug)]
93pub struct RelayHealthTracker {
94 health: DashMap<String, RelayHealth>,
95 max_backoff_secs: u64,
96}
97
98impl RelayHealthTracker {
99 /// Create a new RelayHealthTracker
100 pub fn new(config: &Config) -> Self {
101 Self {
102 health: DashMap::new(),
103 max_backoff_secs: config.sync_max_backoff_secs,
104 }
105 }
106
107 /// Create a new RelayHealthTracker with default settings
108 pub fn with_defaults() -> Self {
109 Self {
110 health: DashMap::new(),
111 max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS,
112 }
113 }
114
115 /// Create a new RelayHealthTracker with custom max backoff
116 pub fn with_max_backoff(max_backoff_secs: u64) -> Self {
117 Self {
118 health: DashMap::new(),
119 max_backoff_secs,
120 }
121 }
122
123 /// Record a successful connection to a relay
124 ///
125 /// Resets the relay to Healthy state and clears failure counters.
126 pub fn record_success(&self, relay_url: &str) {
127 let now = Instant::now();
128 let mut entry = self.health.entry(relay_url.to_string()).or_default();
129 let health = entry.value_mut();
130
131 let old_state = health.state;
132
133 // Reset to healthy state
134 health.state = HealthState::Healthy;
135 health.consecutive_failures = 0;
136 health.first_failure_time = None;
137 health.last_failure_time = None;
138 health.last_success_time = Some(now);
139 health.next_retry_at = None;
140
141 if old_state != HealthState::Healthy {
142 tracing::info!(
143 "Relay {} recovered to healthy (was {:?})",
144 relay_url,
145 old_state
146 );
147 }
148 }
149
150 /// Record a connection failure for a relay
151 ///
152 /// Increments failure counter, updates state, and calculates next retry time.
153 pub fn record_failure(&self, relay_url: &str) {
154 let now = Instant::now();
155 let mut entry = self.health.entry(relay_url.to_string()).or_default();
156 let health = entry.value_mut();
157
158 let old_state = health.state;
159
160 // Set first_failure_time if this is a new failure streak
161 if health.first_failure_time.is_none() {
162 health.first_failure_time = Some(now);
163 }
164
165 health.consecutive_failures = health.consecutive_failures.saturating_add(1);
166 health.last_failure_time = Some(now);
167
168 // Check if we should transition to Dead state
169 if let Some(first_failure) = health.first_failure_time {
170 let failure_duration = now.duration_since(first_failure);
171 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
172
173 if failure_duration >= dead_threshold {
174 health.state = HealthState::Dead;
175 // Dead relays retry once per day
176 health.next_retry_at =
177 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600));
178
179 if old_state != HealthState::Dead {
180 tracing::warn!(
181 "Relay {} marked dead after 24h failures ({} consecutive failures)",
182 relay_url,
183 health.consecutive_failures
184 );
185 }
186 } else {
187 // Degraded state with exponential backoff
188 health.state = HealthState::Degraded;
189 let backoff = Self::get_backoff_duration(
190 health.consecutive_failures,
191 self.max_backoff_secs,
192 );
193 health.next_retry_at = Some(now + backoff);
194
195 if old_state != HealthState::Degraded {
196 tracing::warn!(
197 "Relay {} degraded, backoff {:?}",
198 relay_url,
199 backoff
200 );
201 } else {
202 tracing::debug!(
203 "Relay {} failure #{}, backoff {:?}",
204 relay_url,
205 health.consecutive_failures,
206 backoff
207 );
208 }
209 }
210 }
211 }
212
213 /// Check if a connection attempt should be made to a relay
214 ///
215 /// Returns true if:
216 /// - The relay has no health record (first attempt)
217 /// - The relay is healthy
218 /// - The backoff period has elapsed
219 pub fn should_attempt_connection(&self, relay_url: &str) -> bool {
220 let entry = self.health.get(relay_url);
221
222 match entry {
223 None => true, // No record, allow first attempt
224 Some(entry) => {
225 let health = entry.value();
226
227 match health.state {
228 HealthState::Healthy => true,
229 HealthState::Degraded | HealthState::Dead => {
230 // Check if backoff period has elapsed
231 match health.next_retry_at {
232 None => true,
233 Some(next_retry) => Instant::now() >= next_retry,
234 }
235 }
236 }
237 }
238 }
239 }
240
241 /// Get the current health state of a relay
242 pub fn get_state(&self, relay_url: &str) -> HealthState {
243 self.health
244 .get(relay_url)
245 .map(|entry| entry.value().state)
246 .unwrap_or(HealthState::Healthy)
247 }
248
249 /// Check if a relay is marked as dead
250 pub fn is_dead(&self, relay_url: &str) -> bool {
251 self.get_state(relay_url) == HealthState::Dead
252 }
253
254 /// Get the remaining backoff duration for a relay
255 ///
256 /// Returns None if no backoff is active.
257 pub fn get_remaining_backoff(&self, relay_url: &str) -> Option<Duration> {
258 let entry = self.health.get(relay_url)?;
259 let health = entry.value();
260 let next_retry = health.next_retry_at?;
261 let now = Instant::now();
262
263 if now >= next_retry {
264 None
265 } else {
266 Some(next_retry - now)
267 }
268 }
269
270 /// Get the consecutive failure count for a relay
271 pub fn get_failure_count(&self, relay_url: &str) -> u32 {
272 self.health
273 .get(relay_url)
274 .map(|entry| entry.value().consecutive_failures)
275 .unwrap_or(0)
276 }
277
278 /// Calculate the backoff duration based on failure count
279 ///
280 /// Uses exponential backoff: base * 2^failures, capped at max_backoff
281 pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration {
282 let backoff_secs = BASE_BACKOFF_SECS
283 .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1)));
284 Duration::from_secs(backoff_secs.min(max_backoff_secs))
285 }
286
287 /// Get all tracked relay URLs
288 pub fn get_tracked_relays(&self) -> Vec<String> {
289 self.health.iter().map(|entry| entry.key().clone()).collect()
290 }
291
292 /// Get a clone of the health info for a relay
293 pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> {
294 self.health.get(relay_url).map(|entry| entry.value().clone())
295 }
296}
297
298/// Create a shared RelayHealthTracker wrapped in Arc
299pub fn create_health_tracker(config: &Config) -> Arc<RelayHealthTracker> {
300 Arc::new(RelayHealthTracker::new(config))
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306
307 #[test]
308 fn test_health_state_display() {
309 assert_eq!(HealthState::Healthy.to_string(), "healthy");
310 assert_eq!(HealthState::Degraded.to_string(), "degraded");
311 assert_eq!(HealthState::Dead.to_string(), "dead");
312 }
313
314 #[test]
315 fn test_default_health_is_healthy() {
316 let health = RelayHealth::default();
317 assert_eq!(health.state, HealthState::Healthy);
318 assert_eq!(health.consecutive_failures, 0);
319 assert!(health.first_failure_time.is_none());
320 }
321
322 #[test]
323 fn test_should_attempt_connection_new_relay() {
324 let tracker = RelayHealthTracker::with_defaults();
325 assert!(tracker.should_attempt_connection("wss://new-relay.example.com"));
326 }
327
328 #[test]
329 fn test_record_success_resets_to_healthy() {
330 let tracker = RelayHealthTracker::with_defaults();
331 let url = "wss://test-relay.example.com";
332
333 // Simulate a few failures
334 tracker.record_failure(url);
335 tracker.record_failure(url);
336 assert_eq!(tracker.get_state(url), HealthState::Degraded);
337 assert_eq!(tracker.get_failure_count(url), 2);
338
339 // Record success
340 tracker.record_success(url);
341 assert_eq!(tracker.get_state(url), HealthState::Healthy);
342 assert_eq!(tracker.get_failure_count(url), 0);
343 assert!(tracker.should_attempt_connection(url));
344 }
345
346 #[test]
347 fn test_backoff_increases_exponentially() {
348 // failure 1: 5s
349 assert_eq!(
350 RelayHealthTracker::get_backoff_duration(1, 3600),
351 Duration::from_secs(5)
352 );
353 // failure 2: 10s
354 assert_eq!(
355 RelayHealthTracker::get_backoff_duration(2, 3600),
356 Duration::from_secs(10)
357 );
358 // failure 3: 20s
359 assert_eq!(
360 RelayHealthTracker::get_backoff_duration(3, 3600),
361 Duration::from_secs(20)
362 );
363 // failure 4: 40s
364 assert_eq!(
365 RelayHealthTracker::get_backoff_duration(4, 3600),
366 Duration::from_secs(40)
367 );
368 // failure 5: 80s
369 assert_eq!(
370 RelayHealthTracker::get_backoff_duration(5, 3600),
371 Duration::from_secs(80)
372 );
373 }
374
375 #[test]
376 fn test_backoff_capped_at_max() {
377 let max_backoff = 3600u64;
378 // After many failures, should cap at max_backoff (1 hour)
379 assert_eq!(
380 RelayHealthTracker::get_backoff_duration(20, max_backoff),
381 Duration::from_secs(max_backoff)
382 );
383 }
384
385 #[test]
386 fn test_degraded_state_after_failure() {
387 let tracker = RelayHealthTracker::with_defaults();
388 let url = "wss://test-relay.example.com";
389
390 tracker.record_failure(url);
391 assert_eq!(tracker.get_state(url), HealthState::Degraded);
392 assert_eq!(tracker.get_failure_count(url), 1);
393 }
394
395 #[test]
396 fn test_backoff_blocks_immediate_reconnection() {
397 let tracker = RelayHealthTracker::with_defaults();
398 let url = "wss://test-relay.example.com";
399
400 tracker.record_failure(url);
401
402 // Immediately after failure, should not attempt (backoff active)
403 assert!(!tracker.should_attempt_connection(url));
404
405 // Remaining backoff should be some positive duration
406 let remaining = tracker.get_remaining_backoff(url);
407 assert!(remaining.is_some());
408 assert!(remaining.unwrap() > Duration::ZERO);
409 }
410
411 #[test]
412 fn test_is_dead() {
413 let tracker = RelayHealthTracker::with_defaults();
414 let url = "wss://test-relay.example.com";
415
416 // Initially not dead
417 assert!(!tracker.is_dead(url));
418
419 // After a failure, still not dead (just degraded)
420 tracker.record_failure(url);
421 assert!(!tracker.is_dead(url));
422 assert_eq!(tracker.get_state(url), HealthState::Degraded);
423 }
424
425 #[test]
426 fn test_get_tracked_relays() {
427 let tracker = RelayHealthTracker::with_defaults();
428
429 tracker.record_success("wss://relay1.example.com");
430 tracker.record_failure("wss://relay2.example.com");
431
432 let tracked = tracker.get_tracked_relays();
433 assert_eq!(tracked.len(), 2);
434 assert!(tracked.contains(&"wss://relay1.example.com".to_string()));
435 assert!(tracked.contains(&"wss://relay2.example.com".to_string()));
436 }
437
438 #[test]
439 fn test_custom_max_backoff() {
440 let custom_max = 60u64; // 1 minute max
441 let tracker = RelayHealthTracker::with_max_backoff(custom_max);
442 let url = "wss://test-relay.example.com";
443
444 // Simulate many failures
445 for _ in 0..20 {
446 tracker.record_failure(url);
447 }
448
449 // The remaining backoff should respect the custom max
450 // Note: We can't easily test the internal backoff calculation here,
451 // but we can verify the tracker was created with the custom setting
452 assert_eq!(tracker.max_backoff_secs, custom_max);
453 }
454
455 #[test]
456 fn test_get_health_returns_clone() {
457 let tracker = RelayHealthTracker::with_defaults();
458 let url = "wss://test-relay.example.com";
459
460 tracker.record_success(url);
461 let health = tracker.get_health(url);
462
463 assert!(health.is_some());
464 let health = health.unwrap();
465 assert_eq!(health.state, HealthState::Healthy);
466 assert!(health.last_success_time.is_some());
467 }
468
469 #[test]
470 fn test_get_health_nonexistent() {
471 let tracker = RelayHealthTracker::with_defaults();
472 let health = tracker.get_health("wss://nonexistent.example.com");
473 assert!(health.is_none());
474 }
475} \ No newline at end of file
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
deleted file mode 100644
index c93e583..0000000
--- a/src/sync/metrics.rs
+++ /dev/null
@@ -1,348 +0,0 @@
1//! Prometheus Metrics for Proactive Sync (GRASP-02 Phase 6)
2//!
3//! This module provides comprehensive sync monitoring metrics including:
4//! - Connection status and attempts per relay
5//! - Health state tracking (Healthy/Degraded/Dead)
6//! - Event sync tracking by source (live/startup/reconnect/daily catchup)
7//! - Gap events filled during catchup operations
8//!
9//! All metrics follow the `ngit_sync_` prefix convention.
10
11use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
12
13use super::health::HealthState;
14
15/// Prometheus metrics for the proactive sync system
16#[derive(Clone)]
17pub struct SyncMetrics {
18 // === Connection metrics ===
19 /// Per-relay connection status (1=connected, 0=disconnected)
20 relay_connected: IntGaugeVec,
21 /// Connection attempts by relay and result (success/failure)
22 connection_attempts_total: IntCounterVec,
23
24 // === Health metrics ===
25 /// Per-relay health status (healthy=1, degraded=2, dead=3)
26 relay_status: IntGaugeVec,
27 /// Per-relay consecutive failure count
28 relay_failures: IntGaugeVec,
29
30 // === Event metrics ===
31 /// Events synced by source (live/startup/reconnect/daily)
32 events_total: IntCounterVec,
33 /// Gap events filled during catchup, by relay
34 gap_events_total: IntCounterVec,
35
36 // === Summary metrics ===
37 /// Total relays discovered and tracked
38 relays_tracked_total: IntGauge,
39 /// Currently connected relay count
40 relays_connected_total: IntGauge,
41 /// Relays marked as dead
42 relays_dead_total: IntGauge,
43}
44
45impl SyncMetrics {
46 /// Register all sync metrics with the provided Prometheus registry
47 pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> {
48 // Connection metrics
49 let relay_connected = IntGaugeVec::new(
50 Opts::new(
51 "ngit_sync_relay_connected",
52 "Relay connection status (1=connected, 0=disconnected)",
53 ),
54 &["relay"],
55 )?;
56 registry.register(Box::new(relay_connected.clone()))?;
57
58 let connection_attempts_total = IntCounterVec::new(
59 Opts::new(
60 "ngit_sync_connection_attempts_total",
61 "Total connection attempts by relay and result",
62 ),
63 &["relay", "result"],
64 )?;
65 registry.register(Box::new(connection_attempts_total.clone()))?;
66
67 // Health metrics
68 let relay_status = IntGaugeVec::new(
69 Opts::new(
70 "ngit_sync_relay_status",
71 "Relay health status (1=healthy, 2=degraded, 3=dead)",
72 ),
73 &["relay"],
74 )?;
75 registry.register(Box::new(relay_status.clone()))?;
76
77 let relay_failures = IntGaugeVec::new(
78 Opts::new(
79 "ngit_sync_relay_failures",
80 "Consecutive failure count per relay",
81 ),
82 &["relay"],
83 )?;
84 registry.register(Box::new(relay_failures.clone()))?;
85
86 // Event metrics
87 let events_total = IntCounterVec::new(
88 Opts::new(
89 "ngit_sync_events_total",
90 "Total events synced by source type",
91 ),
92 &["source"],
93 )?;
94 registry.register(Box::new(events_total.clone()))?;
95
96 let gap_events_total = IntCounterVec::new(
97 Opts::new(
98 "ngit_sync_gap_events_total",
99 "Gap events filled during catchup by relay",
100 ),
101 &["relay"],
102 )?;
103 registry.register(Box::new(gap_events_total.clone()))?;
104
105 // Summary metrics
106 let relays_tracked_total = IntGauge::with_opts(Opts::new(
107 "ngit_sync_relays_tracked_total",
108 "Total number of relays discovered and tracked",
109 ))?;
110 registry.register(Box::new(relays_tracked_total.clone()))?;
111
112 let relays_connected_total = IntGauge::with_opts(Opts::new(
113 "ngit_sync_relays_connected_total",
114 "Number of currently connected relays",
115 ))?;
116 registry.register(Box::new(relays_connected_total.clone()))?;
117
118 let relays_dead_total = IntGauge::with_opts(Opts::new(
119 "ngit_sync_relays_dead_total",
120 "Number of relays marked as dead",
121 ))?;
122 registry.register(Box::new(relays_dead_total.clone()))?;
123
124 Ok(Self {
125 relay_connected,
126 connection_attempts_total,
127 relay_status,
128 relay_failures,
129 events_total,
130 gap_events_total,
131 relays_tracked_total,
132 relays_connected_total,
133 relays_dead_total,
134 })
135 }
136
137 // === Connection Recording Methods ===
138
139 /// Record a connection attempt (success or failure)
140 pub fn record_connection_attempt(&self, relay: &str, success: bool) {
141 let result = if success { "success" } else { "failure" };
142 self.connection_attempts_total
143 .with_label_values(&[relay, result])
144 .inc();
145 }
146
147 /// Set relay connection status
148 pub fn set_relay_connected(&self, relay: &str, connected: bool) {
149 self.relay_connected
150 .with_label_values(&[relay])
151 .set(if connected { 1 } else { 0 });
152
153 // Update connected count based on all relay values
154 // This is handled by update_connected_count() for accuracy
155 }
156
157 /// Update the total connected relay count
158 pub fn update_connected_count(&self, count: i64) {
159 self.relays_connected_total.set(count);
160 }
161
162 /// Increment connected count
163 pub fn inc_connected_count(&self) {
164 self.relays_connected_total.inc();
165 }
166
167 /// Decrement connected count
168 pub fn dec_connected_count(&self) {
169 self.relays_connected_total.dec();
170 }
171
172 // === Health Recording Methods ===
173
174 /// Record relay health state change
175 pub fn record_health_state(&self, relay: &str, state: HealthState) {
176 let state_value = match state {
177 HealthState::Healthy => 1,
178 HealthState::Degraded => 2,
179 HealthState::Dead => 3,
180 };
181 self.relay_status.with_label_values(&[relay]).set(state_value);
182 }
183
184 /// Record relay failure count
185 pub fn record_failure_count(&self, relay: &str, count: u32) {
186 self.relay_failures
187 .with_label_values(&[relay])
188 .set(count as i64);
189 }
190
191 /// Update dead relay count
192 pub fn update_dead_count(&self, count: i64) {
193 self.relays_dead_total.set(count);
194 }
195
196 /// Increment dead relay count
197 pub fn inc_dead_count(&self) {
198 self.relays_dead_total.inc();
199 }
200
201 /// Decrement dead relay count
202 pub fn dec_dead_count(&self) {
203 self.relays_dead_total.dec();
204 }
205
206 // === Event Recording Methods ===
207
208 /// Record a synced event by source type
209 ///
210 /// Source types:
211 /// - "live" - Real-time subscription events
212 /// - "startup" - Events from startup catchup
213 /// - "reconnect" - Events from reconnection catchup
214 /// - "daily" - Events from daily catchup
215 pub fn record_event(&self, source: &str) {
216 self.events_total.with_label_values(&[source]).inc();
217 }
218
219 /// Record multiple events synced by source type
220 pub fn record_events(&self, source: &str, count: u64) {
221 self.events_total
222 .with_label_values(&[source])
223 .inc_by(count);
224 }
225
226 /// Record a gap event filled during catchup
227 pub fn record_gap_event(&self, relay: &str) {
228 self.gap_events_total.with_label_values(&[relay]).inc();
229 }
230
231 /// Record multiple gap events filled during catchup
232 pub fn record_gap_events(&self, relay: &str, count: u64) {
233 self.gap_events_total
234 .with_label_values(&[relay])
235 .inc_by(count);
236 }
237
238 // === Summary Recording Methods ===
239
240 /// Set the total tracked relay count
241 pub fn set_tracked_count(&self, count: i64) {
242 self.relays_tracked_total.set(count);
243 }
244
245 /// Increment tracked relay count
246 pub fn inc_tracked_count(&self) {
247 self.relays_tracked_total.inc();
248 }
249
250 /// Get current tracked relay count
251 pub fn get_tracked_count(&self) -> i64 {
252 self.relays_tracked_total.get()
253 }
254
255 /// Get current connected relay count
256 pub fn get_connected_count(&self) -> i64 {
257 self.relays_connected_total.get()
258 }
259
260 /// Get current dead relay count
261 pub fn get_dead_count(&self) -> i64 {
262 self.relays_dead_total.get()
263 }
264}
265
266/// Event source types for metrics tracking
267pub mod event_source {
268 /// Real-time subscription events
269 pub const LIVE: &str = "live";
270 /// Events from startup catchup
271 pub const STARTUP: &str = "startup";
272 /// Events from reconnection catchup
273 pub const RECONNECT: &str = "reconnect";
274 /// Events from daily catchup
275 pub const DAILY: &str = "daily";
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 fn create_test_registry() -> Registry {
283 Registry::new()
284 }
285
286 #[test]
287 fn test_metrics_registration() {
288 let registry = create_test_registry();
289 let metrics = SyncMetrics::register(&registry);
290 assert!(metrics.is_ok());
291 }
292
293 #[test]
294 fn test_connection_metrics() {
295 let registry = create_test_registry();
296 let metrics = SyncMetrics::register(&registry).unwrap();
297
298 metrics.record_connection_attempt("wss://relay1.example.com", true);
299 metrics.record_connection_attempt("wss://relay1.example.com", false);
300 metrics.record_connection_attempt("wss://relay2.example.com", true);
301
302 metrics.set_relay_connected("wss://relay1.example.com", true);
303 metrics.inc_connected_count();
304
305 assert_eq!(metrics.get_connected_count(), 1);
306 }
307
308 #[test]
309 fn test_health_metrics() {
310 let registry = create_test_registry();
311 let metrics = SyncMetrics::register(&registry).unwrap();
312
313 metrics.record_health_state("wss://relay1.example.com", HealthState::Healthy);
314 metrics.record_health_state("wss://relay2.example.com", HealthState::Degraded);
315 metrics.record_health_state("wss://relay3.example.com", HealthState::Dead);
316
317 metrics.record_failure_count("wss://relay2.example.com", 5);
318 metrics.update_dead_count(1);
319
320 assert_eq!(metrics.get_dead_count(), 1);
321 }
322
323 #[test]
324 fn test_event_metrics() {
325 let registry = create_test_registry();
326 let metrics = SyncMetrics::register(&registry).unwrap();
327
328 metrics.record_event(event_source::LIVE);
329 metrics.record_events(event_source::STARTUP, 10);
330 metrics.record_gap_event("wss://relay1.example.com");
331 metrics.record_gap_events("wss://relay2.example.com", 5);
332 }
333
334 #[test]
335 fn test_summary_metrics() {
336 let registry = create_test_registry();
337 let metrics = SyncMetrics::register(&registry).unwrap();
338
339 metrics.set_tracked_count(5);
340 assert_eq!(metrics.get_tracked_count(), 5);
341
342 metrics.inc_tracked_count();
343 assert_eq!(metrics.get_tracked_count(), 6);
344
345 metrics.update_connected_count(3);
346 assert_eq!(metrics.get_connected_count(), 3);
347 }
348} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 9dec982..c1f8bca 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,1039 +1,375 @@
1//! Proactive Sync Module for GRASP-02 1//! Proactive Sync Module - GRASP-02 v4 Implementation
2//! 2//!
3//! This module implements the proactive sync system that ensures data availability 3//! This module implements proactive synchronization of repository data from external
4//! for repositories hosted on this relay by syncing from other relays in the ecosystem. 4//! relays based on relay URLs listed in 30617 repository announcements.
5//! 5//!
6//! ## Architecture Overview 6//! ## Architecture
7//! 7//!
8//! The sync system is built around two core data structures: 8//! The sync system uses three index structures:
9//! - `RepoSyncIndex` - What we WANT to sync (source of truth from self-subscription)
10//! - `RelaySyncIndex` - What we have CONFIRMED syncing + connection state
11//! - `PendingSyncIndex` - In-flight batches awaiting EOSE confirmation
9//! 12//!
10//! - **FollowingRepoRootEvents**: Tracks repository root events we're following 13//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
11//! - **SyncRelays**: Tracks relays we sync from, including their repos and events
12//!
13//! These type aliases are colocated with SyncManager (following the pattern of
14//! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity.
15//!
16//! ## Submodules
17//!
18//! - [`health`]: Relay health tracking with exponential backoff and dead relay detection
19//! - [`metrics`]: Prometheus metrics for sync operations
20//!
21//! ## Memory Estimates (from design doc)
22//!
23//! At target scale (1,000 repos, 100 relays):
24//! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB
25//! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB
26//! - **Total in-memory state**: ~10 MB
27//!
28//! ## Upper Bounds (triggers for redesign)
29//!
30//! - 10,000+ repos: Consider database-backed state
31//! - 500+ sync relays: Consider connection pooling
32//! - 500+ root events per repo: Consider per-repo pagination
33//!
34//! ## Design References
35//!
36//! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md)
37//! for the complete design context.
38 14
39use std::collections::{HashMap, HashSet}; 15use std::collections::{HashMap, HashSet};
40use std::net::SocketAddr;
41use std::sync::Arc; 16use std::sync::Arc;
42 17
43use nostr_relay_builder::prelude::{
44 DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy,
45};
46use nostr_sdk::prelude::*; 18use nostr_sdk::prelude::*;
47use nostr_sdk::EventId; 19use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
48use tokio::sync::{mpsc, RwLock}; 20use tokio::sync::RwLock;
49 21
50use crate::config::Config; 22use crate::config::Config;
51use crate::nostr::builder::Nip34WritePolicy; 23use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
52use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
53use crate::nostr::SharedDatabase;
54
55mod relay_connection;
56mod self_subscriber;
57pub use relay_connection::{RelayConnection, RelayEvent};
58pub use self_subscriber::{RelayAction, SelfSubscriber};
59 24
60// ============================================================================= 25// =============================================================================
61// Type Aliases for Sync State 26// Type Aliases for Index Structures
62// ============================================================================= 27// =============================================================================
63 28
64/// Repository root events we're following. 29/// What we WANT to sync - derived from events received via self-subscription.
65/// 30/// Updated immediately when self-subscriber batch fires.
66/// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) 31/// Key: repo addressable ref - 30617:pubkey:identifier
67/// we need to follow for each repository we host. 32pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
68///
69/// ## Key Format
70///
71/// The key is a repository addressable reference in the format:
72/// `"30617:<pubkey>:<identifier>"`
73///
74/// For example: `"30617:abc123...def:my-project"`
75///
76/// ## Value
77///
78/// A set of event IDs representing root events (PRs, Issues, Patches, Status events)
79/// that reference this repository via an `a` tag.
80///
81/// ## Event Kinds Tracked
82///
83/// - **1617**: Patches (NIP-34)
84/// - **1618**: Issues (NIP-34)
85/// - **1619**: PRs (Pull Requests, NIP-34)
86/// - **1621**: Status events (NIP-34)
87///
88/// ## Invariants
89///
90/// - May include a few extra repo refs that aren't in `SyncRelays`
91/// - This is acceptable - we won't query other relays for them
92/// - Updated incrementally via self-subscription
93///
94/// ## Thread Safety
95///
96/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
97/// async tasks performing sync operations.
98///
99/// ## Example Usage
100///
101/// ```rust,ignore
102/// use ngit_grasp::sync::FollowingRepoRootEvents;
103/// use std::collections::HashSet;
104/// use nostr_sdk::EventId;
105///
106/// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) {
107/// let guard = state.read().await;
108/// if let Some(events) = guard.get(repo_ref) {
109/// println!("Tracking {} root events for {}", events.len(), repo_ref);
110/// }
111/// }
112/// ```
113pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
114
115/// Relays we sync from, including their repos and events.
116///
117/// This structure tracks which relays we need to connect to for syncing,
118/// and for each relay, which repositories and their root events we're interested in.
119///
120/// ## Key Format (Outer HashMap)
121///
122/// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"`
123///
124/// ## Value Format (Inner HashMap)
125///
126/// For each relay, we maintain a map of:
127/// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`)
128/// - Value: Set of event IDs for that repo which should be synced from this relay
129///
130/// ## Relay Selection Criteria
131///
132/// A relay is included if its URL appears in a repository announcement (kind 30617)
133/// that **also** lists our service URL. This ensures we only sync from relays
134/// for repositories that are actually hosted on our relay.
135///
136/// ## Bootstrap Relay
137///
138/// If configured, the bootstrap relay is always present in this map and is
139/// excluded from automatic removal logic. The bootstrap relay is used for
140/// initial sync and discovery even when no repositories explicitly list it.
141///
142/// ## Thread Safety
143///
144/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
145/// async tasks performing sync operations.
146///
147/// ## Example Usage
148///
149/// ```rust,ignore
150/// use ngit_grasp::sync::SyncRelays;
151/// use std::collections::{HashMap, HashSet};
152///
153/// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) {
154/// let guard = state.read().await;
155/// if let Some(repos) = guard.get(relay_url) {
156/// println!("Relay {} tracks {} repos", relay_url, repos.len());
157/// for (repo_ref, events) in repos {
158/// println!(" {} -> {} events", repo_ref, events.len());
159/// }
160/// }
161/// }
162/// ```
163pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
164 33
165/// Creates a new empty `FollowingRepoRootEvents` state. 34/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
166/// 35/// Key: relay URL
167/// Use this to initialize the state before populating from database queries. 36pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
168pub fn new_following_repo_root_events() -> FollowingRepoRootEvents {
169 Arc::new(RwLock::new(HashMap::new()))
170}
171 37
172/// Creates a new empty `SyncRelays` state. 38/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
173/// 39/// Each batch has its own ID and can confirm independently.
174/// Use this to initialize the state before populating from database queries. 40/// Key: relay URL
175pub fn new_sync_relays() -> SyncRelays { 41pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
176 Arc::new(RwLock::new(HashMap::new()))
177}
178 42
179// ============================================================================= 43// =============================================================================
180// SyncManager 44// Supporting Data Structures
181// ============================================================================= 45// =============================================================================
182 46
183/// Manages proactive synchronization with external relays. 47/// What repos and root events need to be synced
184/// 48#[derive(Debug, Clone, Default)]
185/// The SyncManager is responsible for: 49pub struct RepoSyncNeeds {
186/// - Discovering relays from stored repository announcements 50 /// Relay URLs listed in this repo's 30617 announcement
187/// - Maintaining connections to sync relays 51 pub relays: HashSet<String>,
188/// - Subscribing to events at external relays 52 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
189/// - Applying the acceptance policy to synced events 53 pub root_events: HashSet<EventId>,
190/// 54}
191/// ## Lifecycle
192///
193/// 1. `new()` - Creates manager with database and config
194/// 2. `run()` - Main async loop (call in a spawned task)
195///
196/// ## Current Status
197///
198/// Phase 2 implementation supports:
199/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter
200/// - Event processing through write policy
201/// - Storage of accepted events
202///
203/// Core data structures:
204/// - [`FollowingRepoRootEvents`]: Repository root events we're following
205/// - [`SyncRelays`]: Relays we sync from with their repos and events
206pub struct SyncManager {
207 /// Bootstrap relay URL if configured
208 bootstrap_relay_url: Option<String>,
209
210 /// Our service domain for filtering repo announcements
211 #[allow(dead_code)]
212 service_domain: String,
213
214 /// Database for querying/storing events
215 database: SharedDatabase,
216
217 /// Write policy for applying acceptance rules
218 write_policy: Nip34WritePolicy,
219
220 /// Repository root events we're following (Phase 1 data structure)
221 #[allow(dead_code)]
222 following_repo_root_events: FollowingRepoRootEvents,
223
224 /// Relays we sync from (Phase 1 data structure)
225 #[allow(dead_code)]
226 sync_relays: SyncRelays,
227
228 /// Max backoff duration for relay reconnection
229 #[allow(dead_code)]
230 max_backoff_secs: u64,
231 55
232 /// Socket address used for sync source (for write policy) 56/// Connection status for a relay
233 sync_source_addr: SocketAddr, 57#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
58pub enum ConnectionStatus {
59 /// Not currently connected
60 #[default]
61 Disconnected,
62 /// Connection attempt in progress
63 Connecting,
64 /// Successfully connected and subscribed
65 Connected,
234} 66}
235 67
236impl SyncManager { 68/// Complete state for a single relay - combines sync needs with connection lifecycle
237 /// Creates a new SyncManager. 69#[derive(Debug)]
238 /// 70pub struct RelayState {
239 /// # Arguments 71 /// Repos we have confirmed syncing from this relay
240 /// 72 pub repos: HashSet<String>,
241 /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync 73 /// Root events we have confirmed tracking
242 /// * `service_domain` - Our domain for filtering announcements 74 pub root_events: HashSet<EventId>,
243 /// * `database` - Database for event storage/queries 75 /// If true, never disconnect this relay
244 /// * `write_policy` - Policy for accepting events 76 pub is_bootstrap: bool,
245 /// * `config` - Configuration for sync parameters 77 /// Current connection status
246 pub fn new( 78 pub connection_status: ConnectionStatus,
247 bootstrap_relay_url: Option<String>, 79 /// When we last successfully connected - used for since filter on reconnect
248 service_domain: String, 80 pub last_connected: Option<Timestamp>,
249 database: SharedDatabase, 81 /// When we disconnected - for 15-minute state retention rule
250 write_policy: Nip34WritePolicy, 82 pub disconnected_at: Option<Timestamp>,
251 config: &Config, 83 // The active connection - will be added in Phase 4
252 ) -> Self { 84 // pub connection: Option<RelayConnection>,
253 // Create a synthetic SocketAddr for sync source identification 85}
254 // This is used when calling write_policy.admit_event() for synced events
255 let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
256 86
87impl Default for RelayState {
88 fn default() -> Self {
257 Self { 89 Self {
258 bootstrap_relay_url, 90 repos: HashSet::new(),
259 service_domain, 91 root_events: HashSet::new(),
260 database, 92 is_bootstrap: false,
261 write_policy, 93 connection_status: ConnectionStatus::Disconnected,
262 following_repo_root_events: new_following_repo_root_events(), 94 last_connected: None,
263 sync_relays: new_sync_relays(), 95 disconnected_at: None,
264 max_backoff_secs: config.sync_max_backoff_secs,
265 sync_source_addr,
266 }
267 }
268
269 /// Returns a reference to the following repo root events state.
270 ///
271 /// This is the Phase 1 data structure tracking which repository root events
272 /// (kinds 1617, 1618, 1619, 1621) we're following.
273 pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents {
274 &self.following_repo_root_events
275 }
276
277 /// Returns a reference to the sync relays state.
278 ///
279 /// This is the Phase 1 data structure tracking which relays we sync from
280 /// and their associated repositories/events.
281 pub fn sync_relays(&self) -> &SyncRelays {
282 &self.sync_relays
283 }
284
285 // =========================================================================
286 // Phase 2: Database Initialization
287 // =========================================================================
288
289 /// Initialize sync state from database queries at startup.
290 ///
291 /// This method performs two database queries:
292 /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events`
293 /// 2. Query kind 30617 to build `sync_relays`
294 ///
295 /// The bootstrap relay (if configured) is always added to `sync_relays`.
296 ///
297 /// # Errors
298 ///
299 /// Returns an error if database queries fail.
300 pub async fn initialize_from_database(&self) -> Result<(), String> {
301 // Initialize bootstrap relay if configured (never removed)
302 if let Some(bootstrap_url) = &self.bootstrap_relay_url {
303 self.sync_relays.write().await.insert(
304 bootstrap_url.clone(),
305 HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only)
306 );
307 tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url);
308 }
309
310 // Query 1: Build following_repo_root_events
311 // Find all 1617/1618/1619/1621 events and extract their repo references
312 let root_event_kinds = vec![
313 Kind::GitPatch, // 1617
314 Kind::from(KIND_PR), // 1618
315 Kind::from(KIND_PR_UPDATE), // 1619
316 Kind::GitIssue, // 1621
317 ];
318
319 let filter = Filter::new().kinds(root_event_kinds);
320 let root_events = self
321 .database
322 .query(filter)
323 .await
324 .map_err(|e| format!("Failed to query root events: {}", e))?;
325
326 let mut root_events_count = 0;
327 for event in root_events {
328 // An event may have multiple 'a' tags pointing to different repos
329 let repo_refs = Self::extract_all_repo_refs(&event);
330 for repo_ref in repo_refs {
331 self.following_repo_root_events
332 .write()
333 .await
334 .entry(repo_ref)
335 .or_default()
336 .insert(event.id);
337 root_events_count += 1;
338 }
339 }
340 tracing::info!(
341 "Populated following_repo_root_events with {} repo-event mappings",
342 root_events_count
343 );
344
345 // Query 2: Build sync_relays from kind 30617 announcements
346 let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT));
347 let announcements = self
348 .database
349 .query(announcement_filter)
350 .await
351 .map_err(|e| format!("Failed to query announcements: {}", e))?;
352
353 let mut sync_relays_count = 0;
354 for event in announcements {
355 let repo_ref = Self::build_repo_ref(&event);
356 let relay_urls = Self::extract_relay_urls(&event);
357
358 // Only track repos that list BOTH a remote relay AND our service
359 if self.lists_our_service(&event) {
360 for relay_url in relay_urls {
361 if !self.is_own_relay(&relay_url) {
362 // Get events for this repo from following_repo_root_events
363 let events = self
364 .following_repo_root_events
365 .read()
366 .await
367 .get(&repo_ref)
368 .cloned()
369 .unwrap_or_default();
370
371 self.sync_relays
372 .write()
373 .await
374 .entry(relay_url)
375 .or_default()
376 .insert(repo_ref.clone(), events);
377 sync_relays_count += 1;
378 }
379 }
380 }
381 }
382 tracing::info!(
383 "Populated sync_relays with {} relay-repo mappings",
384 sync_relays_count
385 );
386
387 Ok(())
388 }
389
390 // =========================================================================
391 // Helper Methods for Event Extraction
392 // =========================================================================
393
394 /// Extract ALL repo refs from an event (it may tag multiple repos).
395 ///
396 /// Looks for 'a' tags that reference kind 30617 (repository announcements).
397 /// Returns refs in format "30617:pubkey:identifier".
398 pub fn extract_all_repo_refs(event: &Event) -> Vec<String> {
399 event
400 .tags
401 .iter()
402 .filter_map(|tag| {
403 let tag_vec = tag.clone().to_vec();
404 if tag_vec.len() >= 2 && tag_vec[0] == "a" {
405 // Validate it's a 30617 reference
406 if tag_vec[1].starts_with("30617:") {
407 Some(tag_vec[1].clone())
408 } else {
409 None
410 }
411 } else {
412 None
413 }
414 })
415 .collect()
416 }
417
418 /// Build a repo ref string from a 30617 announcement event.
419 ///
420 /// Returns format "30617:pubkey:identifier".
421 pub fn build_repo_ref(event: &Event) -> String {
422 // Extract 'd' tag for identifier
423 let identifier = event
424 .tags
425 .iter()
426 .find(|tag| tag.kind() == TagKind::d())
427 .and_then(|tag| tag.content())
428 .map(|s| s.to_string())
429 .unwrap_or_default();
430
431 format!("30617:{}:{}", event.pubkey.to_hex(), identifier)
432 }
433
434 /// Extract relay URLs from a repository announcement event.
435 ///
436 /// Looks for the 'relays' tag and returns all relay URLs.
437 pub fn extract_relay_urls(event: &Event) -> Vec<String> {
438 event
439 .tags
440 .iter()
441 .filter(|tag| matches!(tag.kind(), TagKind::Relays))
442 .flat_map(|tag| {
443 let vec = tag.clone().to_vec();
444 // Skip first element (tag name), rest are values
445 vec.into_iter().skip(1)
446 })
447 .collect()
448 }
449
450 /// Check if event lists our service in the relays tag.
451 ///
452 /// Compares relay URLs against our service domain.
453 fn lists_our_service(&self, event: &Event) -> bool {
454 let relay_urls = Self::extract_relay_urls(event);
455 relay_urls.iter().any(|url| self.is_own_relay(url))
456 }
457
458 /// Check if a relay URL matches our relay.
459 ///
460 /// Compares the URL against our service domain.
461 fn is_own_relay(&self, relay_url: &str) -> bool {
462 // Normalize comparison: check if URL contains our domain
463 relay_url.contains(&self.service_domain)
464 }
465
466 // =========================================================================
467 // Main Run Loop
468 // =========================================================================
469
470 /// Runs the sync manager main loop.
471 ///
472 /// This method should be called in a spawned task:
473 ///
474 /// ```rust,ignore
475 /// tokio::spawn(async move {
476 /// sync_manager.run().await;
477 /// });
478 /// ```
479 ///
480 /// ## Implementation Status
481 ///
482 /// - Phase 2: Layer 1 sync from bootstrap relay ✓
483 /// - Phase 3: Self-subscription and relay discovery ✓
484 /// - Phase 4-6: Filter building, connection management (TODO)
485 /// - Phase 7: Full sync loop (TODO)
486 pub async fn run(self) {
487 tracing::info!(
488 "SyncManager starting (bootstrap_relay={:?}, domain={})",
489 self.bootstrap_relay_url,
490 self.service_domain
491 );
492
493 // Phase 3: Initialize state from database BEFORE spawning connections
494 if let Err(e) = self.initialize_from_database().await {
495 tracing::error!("Failed to initialize from database: {}", e);
496 // Continue anyway - we can still sync from bootstrap
497 }
498
499 // Create channel for relay actions from self-subscriber
500 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
501
502 // Construct our own relay URL for self-subscription
503 let own_relay_url = format!("ws://{}", self.service_domain);
504
505 // Spawn self-subscriber task
506 let self_subscriber = SelfSubscriber::new(
507 own_relay_url.clone(),
508 self.service_domain.clone(),
509 Arc::clone(&self.following_repo_root_events),
510 Arc::clone(&self.sync_relays),
511 action_tx,
512 );
513
514 tokio::spawn(async move {
515 self_subscriber.run().await;
516 });
517
518 tracing::info!("SelfSubscriber spawned for {}", own_relay_url);
519
520 // Track active relay connections (relay_url -> event_sender)
521 let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new();
522
523 // Phase 2: Connect to bootstrap relay if configured
524 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
525 if let Some(event_tx) = self
526 .spawn_relay_connection(bootstrap_url.clone(), None)
527 .await
528 {
529 active_relays.insert(bootstrap_url.clone(), event_tx);
530 }
531 }
532
533 // Main coordination loop
534 loop {
535 tokio::select! {
536 // Handle relay actions from self-subscriber
537 action = action_rx.recv() => {
538 match action {
539 Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => {
540 tracing::info!("Spawning new relay connection to {}", relay_url);
541 if !active_relays.contains_key(&relay_url) {
542 if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await {
543 active_relays.insert(relay_url, event_tx);
544 }
545 }
546 }
547 Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => {
548 tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len());
549 // TODO: Implement filter updates for existing connections
550 }
551 None => {
552 tracing::info!("Action channel closed, continuing without self-subscriber");
553 }
554 }
555 }
556 // Sleep to prevent busy loop when no events
557 _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
558 // Periodic maintenance could go here
559 }
560 }
561 }
562 }
563
564 /// Spawn a relay connection with optional Layer 2 filters.
565 ///
566 /// Returns the event sender channel if successfully spawned.
567 async fn spawn_relay_connection(
568 &self,
569 relay_url: String,
570 repos: Option<HashMap<String, HashSet<EventId>>>,
571 ) -> Option<mpsc::Sender<RelayEvent>> {
572 // Create channel for receiving events
573 let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100);
574
575 // Create connection
576 let connection = RelayConnection::new(relay_url.clone());
577
578 // Determine if this is bootstrap (no repos) or discovered relay (with repos)
579 let is_bootstrap = repos.is_none();
580
581 match connection.connect_and_subscribe().await {
582 Ok(()) => {
583 if is_bootstrap {
584 tracing::info!("Bootstrap relay connection established: {}", relay_url);
585 } else {
586 tracing::info!(
587 "Discovered relay connection established: {} (with Layer 2 filters)",
588 relay_url
589 );
590
591 // Add Layer 2 subscription for repo events
592 if let Some(ref repos) = repos {
593 if let Err(e) = self.add_layer2_subscription(&connection, repos).await {
594 tracing::warn!("Failed to add Layer 2 subscription: {}", e);
595 }
596 }
597 }
598
599 // Clone refs needed for event processing task
600 let database = Arc::clone(&self.database);
601 let write_policy = self.write_policy.clone();
602 let sync_source_addr = self.sync_source_addr;
603
604 // Clone event_tx for the spawned task
605 let event_tx_clone = event_tx.clone();
606
607 // Spawn event loop task
608 let conn_url = relay_url.clone();
609 tokio::spawn(async move {
610 connection.run_event_loop(event_tx_clone).await;
611 });
612
613 // Spawn event processing task
614 tokio::spawn(async move {
615 Self::process_relay_events(
616 event_rx,
617 database,
618 write_policy,
619 sync_source_addr,
620 conn_url,
621 )
622 .await;
623 });
624
625 Some(event_tx)
626 }
627 Err(e) => {
628 tracing::error!("Failed to connect to relay {}: {}", relay_url, e);
629 None
630 }
631 }
632 }
633
634 /// Add Layer 2 subscription for repo-related events.
635 ///
636 /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track.
637 async fn add_layer2_subscription(
638 &self,
639 connection: &RelayConnection,
640 repos: &HashMap<String, HashSet<EventId>>,
641 ) -> Result<(), String> {
642 if repos.is_empty() {
643 return Ok(());
644 }
645
646 // Build repo refs list for filter
647 let repo_refs: Vec<String> = repos.keys().cloned().collect();
648
649 tracing::debug!(
650 "Adding Layer 2 subscription for {} repos to {}",
651 repo_refs.len(),
652 connection.url()
653 );
654
655 // Chunk repo_refs into groups of 100 (per plan)
656 for chunk in repo_refs.chunks(100) {
657 // Build filter with lowercase 'a' tag for each repo ref
658 let mut filter = Filter::new().kinds([
659 Kind::GitPatch, // 1617
660 Kind::Custom(1618), // PR
661 Kind::Custom(1619), // PR update
662 Kind::GitIssue, // 1621
663 ]);
664
665 // Add each repo ref as a custom tag filter
666 for repo_ref in chunk {
667 filter =
668 filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone());
669 }
670
671 // Subscribe to this filter
672 if let Err(e) = connection.subscribe_filter(filter).await {
673 return Err(format!("Failed to subscribe with Layer 2 filter: {}", e));
674 }
675 } 96 }
676
677 Ok(())
678 } 97 }
98}
679 99
680 /// Process events from a single relay connection. 100impl RelayState {
681 /// 101 /// Check if state should be cleared based on 15-minute rule
682 /// This is a static method that runs in its own task. 102 pub fn should_clear_state(&self) -> bool {
683 async fn process_relay_events( 103 match self.disconnected_at {
684 mut event_rx: mpsc::Receiver<RelayEvent>, 104 Some(disconnected) => {
685 database: SharedDatabase, 105 let now = Timestamp::now();
686 write_policy: Nip34WritePolicy, 106 now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes
687 sync_source_addr: SocketAddr,
688 relay_url: String,
689 ) {
690 tracing::debug!("Starting event processing for relay: {}", relay_url);
691
692 while let Some(relay_event) = event_rx.recv().await {
693 match relay_event {
694 RelayEvent::Event(event) => {
695 Self::process_single_event_static(
696 &event,
697 &database,
698 &write_policy,
699 &sync_source_addr,
700 &relay_url,
701 )
702 .await;
703 }
704 RelayEvent::EndOfStoredEvents => {
705 tracing::debug!("EOSE received from {}", relay_url);
706 }
707 RelayEvent::Closed(reason) => {
708 tracing::warn!("Connection to {} closed: {}", relay_url, reason);
709 break;
710 }
711 } 107 }
108 None => false, // Still connected or never connected
712 } 109 }
713
714 tracing::info!("Event processing ended for relay: {}", relay_url);
715 } 110 }
716 111
717 /// Process a single event (static version for use in spawned tasks). 112 /// Clear repos and root_events - called when reconnect takes > 15 minutes
718 async fn process_single_event_static( 113 pub fn clear_sync_state(&mut self) {
719 event: &Event, 114 self.repos.clear();
720 database: &SharedDatabase, 115 self.root_events.clear();
721 write_policy: &Nip34WritePolicy,
722 sync_source_addr: &SocketAddr,
723 relay_url: &str,
724 ) {
725 let event_id = event.id;
726 let kind = event.kind.as_u16();
727
728 // Check if event already exists in database
729 match database.check_id(&event_id).await {
730 Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => {
731 tracing::trace!("Event {} already exists, skipping", event_id);
732 return;
733 }
734 Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing
735 Err(e) => {
736 tracing::warn!("Failed to check if event {} exists: {}", event_id, e);
737 }
738 }
739
740 // Pass through write policy
741 let policy_result = write_policy.admit_event(event, sync_source_addr).await;
742
743 match policy_result {
744 PolicyResult::Accept => match database.save_event(event).await {
745 Ok(SaveEventStatus::Success) => {
746 tracing::info!(
747 "Synced event {} (kind {}) from {}",
748 event_id,
749 kind,
750 relay_url
751 );
752 }
753 Ok(_) => {
754 tracing::debug!(
755 "Event {} (kind {}) already stored or rejected by database",
756 event_id,
757 kind
758 );
759 }
760 Err(e) => {
761 tracing::error!("Failed to save synced event {}: {}", event_id, e);
762 }
763 },
764 PolicyResult::Reject(reason) => {
765 tracing::debug!(
766 "Rejected synced event {} (kind {}): {}",
767 event_id,
768 kind,
769 reason
770 );
771 }
772 }
773 } 116 }
774} 117}
775 118
776// ============================================================================= 119/// A batch of items pending EOSE confirmation
777// Submodules 120#[derive(Debug, Clone)]
778// ============================================================================= 121pub struct PendingBatch {
779 122 /// Unique ID for this batch - for debugging/logging
780pub mod health; 123 pub batch_id: u64,
781pub mod metrics; 124 /// The items this batch is syncing
125 pub items: PendingItems,
126 /// Subscription IDs that must ALL receive EOSE before confirming
127 pub outstanding_subs: HashSet<SubscriptionId>,
128}
782 129
783// Re-export commonly used types 130/// Items included in a pending batch
784pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; 131#[derive(Debug, Clone, Default)]
785pub use metrics::{event_source, SyncMetrics}; 132pub struct PendingItems {
133 /// Repos being synced in this batch
134 pub repos: HashSet<String>,
135 /// Root events being synced in this batch
136 pub root_events: HashSet<EventId>,
137}
786 138
787// ============================================================================= 139// =============================================================================
788// Tests 140// SyncMetrics - Prometheus Metrics for Sync System
789// ============================================================================= 141// =============================================================================
790 142
791#[cfg(test)] 143/// Prometheus metrics for the proactive sync system.
792mod tests { 144///
793 use super::*; 145/// Tracks relay connections, sync progress, and operational statistics.
794 use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; 146/// Following the comprehensive v3 metrics design.
147#[derive(Clone)]
148pub struct SyncMetrics {
149 // === Connection metrics ===
150 /// Per-relay connection status (1=connected, 0=disconnected)
151 relay_connected: IntGaugeVec,
152 /// Connection attempts by relay and result (success/failure)
153 connection_attempts_total: IntCounterVec,
154
155 // === Event metrics ===
156 /// Events synced by source (live/startup/reconnect/daily)
157 events_total: IntCounterVec,
158
159 // === Summary metrics ===
160 /// Total relays discovered and tracked
161 relays_tracked_total: IntGauge,
162 /// Currently connected relay count
163 relays_connected_total: IntGauge,
164}
795 165
796 /// Helper to create a test event with specific tags 166impl SyncMetrics {
797 fn create_test_event(kind: Kind, tags: Vec<Tag>) -> Event { 167 /// Register sync metrics with a Prometheus registry.
798 let keys = Keys::generate(); 168 ///
799 EventBuilder::new(kind, "test content") 169 /// Returns an error if metrics are already registered (e.g., in tests).
800 .tags(tags) 170 pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> {
801 .sign_with_keys(&keys) 171 // Connection metrics
802 .expect("Failed to sign test event") 172 let relay_connected = IntGaugeVec::new(
173 Opts::new(
174 "ngit_sync_relay_connected",
175 "Relay connection status (1=connected, 0=disconnected)",
176 ),
177 &["relay"],
178 )?;
179 registry.register(Box::new(relay_connected.clone()))?;
180
181 let connection_attempts_total = IntCounterVec::new(
182 Opts::new(
183 "ngit_sync_connection_attempts_total",
184 "Total connection attempts by relay and result",
185 ),
186 &["relay", "result"],
187 )?;
188 registry.register(Box::new(connection_attempts_total.clone()))?;
189
190 // Event metrics
191 let events_total = IntCounterVec::new(
192 Opts::new(
193 "ngit_sync_events_total",
194 "Total events synced by source type",
195 ),
196 &["source"],
197 )?;
198 registry.register(Box::new(events_total.clone()))?;
199
200 // Summary metrics
201 let relays_tracked_total = IntGauge::with_opts(Opts::new(
202 "ngit_sync_relays_tracked_total",
203 "Total number of relays discovered and tracked",
204 ))?;
205 registry.register(Box::new(relays_tracked_total.clone()))?;
206
207 let relays_connected_total = IntGauge::with_opts(Opts::new(
208 "ngit_sync_relays_connected_total",
209 "Number of currently connected relays",
210 ))?;
211 registry.register(Box::new(relays_connected_total.clone()))?;
212
213 Ok(Self {
214 relay_connected,
215 connection_attempts_total,
216 events_total,
217 relays_tracked_total,
218 relays_connected_total,
219 })
803 } 220 }
804 221
805 // ========================================================================= 222 // === Connection Recording Methods ===
806 // Tests for extract_all_repo_refs
807 // =========================================================================
808 223
809 #[test] 224 /// Record a connection attempt (success or failure)
810 fn test_extract_all_repo_refs_single_ref() { 225 pub fn record_connection_attempt(&self, relay: &str, success: bool) {
811 let event = create_test_event( 226 let result = if success { "success" } else { "failure" };
812 Kind::GitPatch, 227 self.connection_attempts_total
813 vec![Tag::custom( 228 .with_label_values(&[relay, result])
814 nostr_relay_builder::prelude::TagKind::custom("a"), 229 .inc();
815 vec!["30617:abc123def456:my-project"],
816 )],
817 );
818
819 let refs = SyncManager::extract_all_repo_refs(&event);
820 assert_eq!(refs.len(), 1);
821 assert_eq!(refs[0], "30617:abc123def456:my-project");
822 } 230 }
823 231
824 #[test] 232 /// Set relay connection status
825 fn test_extract_all_repo_refs_multiple_refs() { 233 pub fn set_relay_connected(&self, relay: &str, connected: bool) {
826 let event = create_test_event( 234 self.relay_connected
827 Kind::GitPatch, 235 .with_label_values(&[relay])
828 vec![ 236 .set(if connected { 1 } else { 0 });
829 Tag::custom(
830 nostr_relay_builder::prelude::TagKind::custom("a"),
831 vec!["30617:abc123:project1"],
832 ),
833 Tag::custom(
834 nostr_relay_builder::prelude::TagKind::custom("a"),
835 vec!["30617:def456:project2"],
836 ),
837 ],
838 );
839
840 let refs = SyncManager::extract_all_repo_refs(&event);
841 assert_eq!(refs.len(), 2);
842 assert!(refs.contains(&"30617:abc123:project1".to_string()));
843 assert!(refs.contains(&"30617:def456:project2".to_string()));
844 } 237 }
845 238
846 #[test] 239 /// Increment connected count
847 fn test_extract_all_repo_refs_ignores_non_30617() { 240 pub fn inc_connected_count(&self) {
848 let event = create_test_event( 241 self.relays_connected_total.inc();
849 Kind::GitPatch,
850 vec![
851 Tag::custom(
852 nostr_relay_builder::prelude::TagKind::custom("a"),
853 vec!["30617:abc123:valid-repo"],
854 ),
855 Tag::custom(
856 nostr_relay_builder::prelude::TagKind::custom("a"),
857 vec!["30618:def456:state-event"], // Not a repo ref
858 ),
859 ],
860 );
861
862 let refs = SyncManager::extract_all_repo_refs(&event);
863 assert_eq!(refs.len(), 1);
864 assert_eq!(refs[0], "30617:abc123:valid-repo");
865 } 242 }
866 243
867 #[test] 244 /// Decrement connected count
868 fn test_extract_all_repo_refs_empty_when_no_a_tags() { 245 pub fn dec_connected_count(&self) {
869 let event = create_test_event( 246 self.relays_connected_total.dec();
870 Kind::GitPatch,
871 vec![Tag::custom(
872 nostr_relay_builder::prelude::TagKind::custom("e"),
873 vec!["some-event-id"],
874 )],
875 );
876
877 let refs = SyncManager::extract_all_repo_refs(&event);
878 assert!(refs.is_empty());
879 } 247 }
880 248
881 // ========================================================================= 249 // === Event Recording Methods ===
882 // Tests for build_repo_ref
883 // =========================================================================
884 250
885 #[test] 251 /// Record a synced event by source type
886 fn test_build_repo_ref() { 252 ///
887 let keys = Keys::generate(); 253 /// Source types:
888 let event = EventBuilder::new(Kind::from(30617_u16), "announcement") 254 /// - "live" - Real-time subscription events
889 .tags(vec![Tag::custom( 255 /// - "startup" - Events from startup catchup
890 nostr_relay_builder::prelude::TagKind::d(), 256 /// - "reconnect" - Events from reconnection catchup
891 vec!["my-identifier"], 257 pub fn record_event(&self, source: &str) {
892 )]) 258 self.events_total.with_label_values(&[source]).inc();
893 .sign_with_keys(&keys)
894 .expect("Failed to sign test event");
895
896 let repo_ref = SyncManager::build_repo_ref(&event);
897 assert!(repo_ref.starts_with("30617:"));
898 assert!(repo_ref.ends_with(":my-identifier"));
899 assert!(repo_ref.contains(&event.pubkey.to_hex()));
900 } 259 }
901 260
902 #[test] 261 /// Record multiple events synced by source type
903 fn test_build_repo_ref_empty_identifier() { 262 pub fn record_events(&self, source: &str, count: u64) {
904 let keys = Keys::generate(); 263 self.events_total
905 let event = EventBuilder::new(Kind::from(30617_u16), "announcement") 264 .with_label_values(&[source])
906 .sign_with_keys(&keys) 265 .inc_by(count);
907 .expect("Failed to sign test event");
908
909 let repo_ref = SyncManager::build_repo_ref(&event);
910 assert!(repo_ref.starts_with("30617:"));
911 assert!(repo_ref.ends_with(":")); // Empty identifier
912 } 266 }
913 267
914 // ========================================================================= 268 // === Summary Recording Methods ===
915 // Tests for extract_relay_urls
916 // =========================================================================
917
918 #[test]
919 fn test_extract_relay_urls_single() {
920 let event = create_test_event(
921 Kind::from(30617_u16),
922 vec![Tag::custom(
923 nostr_relay_builder::prelude::TagKind::Relays,
924 vec!["wss://relay.example.com"],
925 )],
926 );
927 269
928 let urls = SyncManager::extract_relay_urls(&event); 270 /// Set the total tracked relay count
929 assert_eq!(urls.len(), 1); 271 pub fn set_tracked_count(&self, count: i64) {
930 assert_eq!(urls[0], "wss://relay.example.com"); 272 self.relays_tracked_total.set(count);
931 } 273 }
932 274
933 #[test] 275 /// Increment tracked relay count
934 fn test_extract_relay_urls_multiple() { 276 pub fn inc_tracked_count(&self) {
935 let event = create_test_event( 277 self.relays_tracked_total.inc();
936 Kind::from(30617_u16),
937 vec![Tag::custom(
938 nostr_relay_builder::prelude::TagKind::Relays,
939 vec!["wss://relay1.example.com", "wss://relay2.example.com"],
940 )],
941 );
942
943 let urls = SyncManager::extract_relay_urls(&event);
944 assert_eq!(urls.len(), 2);
945 assert!(urls.contains(&"wss://relay1.example.com".to_string()));
946 assert!(urls.contains(&"wss://relay2.example.com".to_string()));
947 } 278 }
948 279
949 #[test] 280 /// Get current tracked relay count
950 fn test_extract_relay_urls_empty_when_no_relays_tag() { 281 pub fn get_tracked_count(&self) -> i64 {
951 let event = create_test_event( 282 self.relays_tracked_total.get()
952 Kind::from(30617_u16),
953 vec![Tag::custom(
954 nostr_relay_builder::prelude::TagKind::custom("d"),
955 vec!["my-project"],
956 )],
957 );
958
959 let urls = SyncManager::extract_relay_urls(&event);
960 assert!(urls.is_empty());
961 } 283 }
962 284
963 // ========================================================================= 285 /// Get current connected relay count
964 // Original data structure tests 286 pub fn get_connected_count(&self) -> i64 {
965 // ========================================================================= 287 self.relays_connected_total.get()
966
967 #[tokio::test]
968 async fn test_following_repo_root_events_basic_operations() {
969 let state = new_following_repo_root_events();
970
971 // Insert some events
972 {
973 let mut guard = state.write().await;
974 let repo_ref = "30617:abc123:my-project".to_string();
975 guard
976 .entry(repo_ref)
977 .or_default()
978 .insert(EventId::all_zeros());
979 }
980
981 // Read back
982 {
983 let guard = state.read().await;
984 assert_eq!(guard.len(), 1);
985 assert!(guard.contains_key("30617:abc123:my-project"));
986 }
987 } 288 }
289}
988 290
989 #[tokio::test] 291/// Event source types for metrics tracking
990 async fn test_sync_relays_basic_operations() { 292pub mod event_source {
991 let state = new_sync_relays(); 293 /// Real-time subscription events
294 pub const LIVE: &str = "live";
295 /// Events from startup catchup
296 pub const STARTUP: &str = "startup";
297 /// Events from reconnection catchup
298 pub const RECONNECT: &str = "reconnect";
299}
992 300
993 // Insert relay with repos 301// =============================================================================
994 { 302// SyncManager - Main Entry Point
995 let mut guard = state.write().await; 303// =============================================================================
996 let relay_url = "wss://relay.example.com".to_string();
997 let repo_ref = "30617:abc123:my-project".to_string();
998 304
999 guard 305/// Manages proactive synchronization with external relays
1000 .entry(relay_url) 306///
1001 .or_default() 307/// The SyncManager runs as a background task, subscribing to repository
1002 .entry(repo_ref) 308/// announcements on the local relay and syncing data from external relays
1003 .or_default() 309/// listed in those announcements.
1004 .insert(EventId::all_zeros()); 310#[allow(dead_code)] // Fields will be used in later phases
1005 } 311pub struct SyncManager {
312 /// Bootstrap relay URL for initial sync (optional)
313 bootstrap_relay_url: Option<String>,
314 /// Our service domain - used for filtering relevant repos
315 service_domain: String,
316 /// Database for event storage and queries
317 database: SharedDatabase,
318 /// Write policy for validating incoming events
319 write_policy: Nip34WritePolicy,
320 /// Configuration reference for sync settings
321 config: Config,
322 /// What we want to sync (source of truth)
323 repo_sync_index: RepoSyncIndex,
324 /// What we've confirmed syncing + connection state
325 relay_sync_index: RelaySyncIndex,
326 /// In-flight subscription batches
327 pending_sync_index: PendingSyncIndex,
328}
1006 329
1007 // Read back 330impl SyncManager {
1008 { 331 /// Create a new SyncManager
1009 let guard = state.read().await; 332 ///
1010 assert_eq!(guard.len(), 1); 333 /// # Arguments
1011 let relay_repos = guard.get("wss://relay.example.com").unwrap(); 334 /// * `bootstrap_relay_url` - Optional relay URL for initial historical sync
1012 assert_eq!(relay_repos.len(), 1); 335 /// * `service_domain` - The domain this relay serves (for filtering repos)
1013 let events = relay_repos.get("30617:abc123:my-project").unwrap(); 336 /// * `database` - Shared database for event storage
1014 assert_eq!(events.len(), 1); 337 /// * `write_policy` - Policy for validating events before storage
338 /// * `config` - Configuration for sync settings
339 pub fn new(
340 bootstrap_relay_url: Option<String>,
341 service_domain: String,
342 database: SharedDatabase,
343 write_policy: Nip34WritePolicy,
344 config: &Config,
345 ) -> Self {
346 Self {
347 bootstrap_relay_url,
348 service_domain,
349 database,
350 write_policy,
351 config: config.clone(),
352 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
353 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
354 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
1015 } 355 }
1016 } 356 }
1017 357
1018 #[tokio::test] 358 /// Run the sync manager (placeholder for Phase 1)
1019 async fn test_concurrent_access() { 359 ///
1020 let state = new_following_repo_root_events(); 360 /// This will be implemented in later phases to:
1021 let state_clone = Arc::clone(&state); 361 /// 1. Subscribe to local relay for 30617 events
1022 362 /// 2. Process events to build RepoSyncIndex
1023 // Writer task 363 /// 3. Compute and execute sync actions
1024 let writer = tokio::spawn(async move { 364 /// 4. Handle reconnection and catch-up logic
1025 let mut guard = state_clone.write().await; 365 pub async fn run(self) {
1026 guard 366 tracing::info!(
1027 .entry("30617:writer:repo".to_string()) 367 bootstrap_relay = ?self.bootstrap_relay_url,
1028 .or_default() 368 service_domain = %self.service_domain,
1029 .insert(EventId::all_zeros()); 369 "SyncManager starting (placeholder - not yet implemented)"
1030 }); 370 );
1031
1032 // Wait for writer
1033 writer.await.unwrap();
1034 371
1035 // Reader should see the change 372 // Phase 1: Just log and return
1036 let guard = state.read().await; 373 // Full implementation will be added in subsequent phases
1037 assert!(guard.contains_key("30617:writer:repo"));
1038 } 374 }
1039} 375} \ No newline at end of file
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
deleted file mode 100644
index 71b5d51..0000000
--- a/src/sync/relay_connection.rs
+++ /dev/null
@@ -1,185 +0,0 @@
1//! Relay Connection for Proactive Sync
2//!
3//! This module handles connecting to external relays and receiving events
4//! for the proactive sync system.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE};
12
13/// Events received from a relay connection
14#[derive(Debug)]
15pub enum RelayEvent {
16 /// A nostr event was received
17 Event(Event),
18 /// End of stored events (EOSE) received
19 EndOfStoredEvents,
20 /// Connection was closed
21 Closed(String),
22}
23
24/// Connection to an external relay for syncing events.
25///
26/// RelayConnection handles:
27/// - Connecting to the relay
28/// - Subscribing with appropriate filters (Layer 1 for bootstrap)
29/// - Receiving events and sending them through a channel
30pub struct RelayConnection {
31 /// The relay URL
32 url: String,
33 /// The nostr-sdk client
34 client: Client,
35}
36
37impl RelayConnection {
38 /// Create a new relay connection.
39 ///
40 /// # Arguments
41 ///
42 /// * `url` - The WebSocket URL of the relay to connect to
43 pub fn new(url: String) -> Self {
44 // Create a client with generated keys (we're just subscribing, not publishing)
45 let keys = Keys::generate();
46 let client = Client::new(keys);
47
48 Self { url, client }
49 }
50
51 /// Connect to the relay and subscribe with Layer 1 filter.
52 ///
53 /// Layer 1 filter syncs announcement events (30617, 30618) which are
54 /// the foundation for discovering repository relationships.
55 ///
56 /// Returns the notification stream for receiving events.
57 pub async fn connect_and_subscribe(&self) -> Result<(), String> {
58 // Add the relay
59 self.client
60 .add_relay(&self.url)
61 .await
62 .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?;
63
64 // Connect to relay
65 self.client.connect().await;
66
67 // Wait for connection to establish
68 let mut connected = false;
69 for _ in 0..30 {
70 tokio::time::sleep(Duration::from_millis(100)).await;
71 let relays = self.client.relays().await;
72 if relays.values().any(|r| r.is_connected()) {
73 connected = true;
74 break;
75 }
76 }
77
78 if !connected {
79 return Err(format!(
80 "Failed to connect to relay {} after 3 seconds",
81 self.url
82 ));
83 }
84
85 tracing::info!("Connected to bootstrap relay: {}", self.url);
86
87 // Layer 1 filter: Repository announcements and state events
88 // These are addressable events that define repositories
89 let filter = Filter::new().kinds([
90 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617
91 Kind::Custom(KIND_REPOSITORY_STATE), // 30618
92 ]);
93
94 // Subscribe to the filter
95 self.client
96 .subscribe(filter, None)
97 .await
98 .map_err(|e| format!("Failed to subscribe: {}", e))?;
99
100 tracing::debug!(
101 "Subscribed to Layer 1 events (kinds 30617, 30618) from {}",
102 self.url
103 );
104
105 Ok(())
106 }
107
108 /// Run the event loop, sending received events through the channel.
109 ///
110 /// This method runs until the connection is closed or an error occurs.
111 ///
112 /// # Arguments
113 ///
114 /// * `event_sender` - Channel to send received events
115 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
116 tracing::debug!("Starting event loop for relay: {}", self.url);
117
118 // Handle notifications
119 self.client
120 .handle_notifications(|notification| async {
121 match notification {
122 RelayPoolNotification::Event { event, .. } => {
123 tracing::debug!(
124 "Received event {} (kind {}) from {}",
125 event.id,
126 event.kind.as_u16(),
127 self.url
128 );
129 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
130 tracing::warn!("Event channel closed, stopping relay connection");
131 return Ok(true); // Stop handling
132 }
133 }
134 RelayPoolNotification::Message { message, .. } => {
135 if let RelayMessage::EndOfStoredEvents(_) = message {
136 tracing::debug!("EOSE received from {}", self.url);
137 if event_sender
138 .send(RelayEvent::EndOfStoredEvents)
139 .await
140 .is_err()
141 {
142 return Ok(true); // Stop handling
143 }
144 }
145 }
146 RelayPoolNotification::Shutdown => {
147 tracing::info!("Relay {} shutting down", self.url);
148 let _ = event_sender
149 .send(RelayEvent::Closed("Shutdown".to_string()))
150 .await;
151 return Ok(true); // Stop handling
152 }
153 }
154 Ok(false) // Continue handling
155 })
156 .await
157 .ok(); // Ignore errors on shutdown
158
159 // Disconnect when done
160 self.client.disconnect().await;
161 tracing::info!("Disconnected from relay: {}", self.url);
162 }
163
164 /// Get the relay URL
165 pub fn url(&self) -> &str {
166 &self.url
167 }
168
169 /// Subscribe to an additional filter.
170 ///
171 /// This is used to add Layer 2 filters for repo-related events after
172 /// the initial connection is established.
173 pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> {
174 self.client
175 .subscribe(filter, None)
176 .await
177 .map_err(|e| format!("Failed to subscribe with filter: {}", e))?;
178 Ok(())
179 }
180
181 /// Get a reference to the client for additional operations.
182 pub fn client(&self) -> &Client {
183 &self.client
184 }
185}
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
deleted file mode 100644
index 0512088..0000000
--- a/src/sync/self_subscriber.rs
+++ /dev/null
@@ -1,497 +0,0 @@
1//! Self-Subscriber for Proactive Sync
2//!
3//! This module handles subscribing to our own relay to detect new events
4//! and trigger relay discovery from announcements.
5
6use std::collections::{HashMap, HashSet};
7use std::time::Duration;
8
9use nostr_sdk::prelude::*;
10use tokio::sync::mpsc;
11use tokio::time::Instant;
12
13use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
14
15use super::{FollowingRepoRootEvents, SyncManager, SyncRelays};
16
17// =============================================================================
18// Types
19// =============================================================================
20
21/// Actions to be taken by the SyncManager based on self-subscription events.
22#[derive(Debug, Clone)]
23pub enum RelayAction {
24 /// Spawn a new relay connection to sync from.
25 /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering.
26 SpawnRelay {
27 relay_url: String,
28 repos_and_root_events: HashMap<String, HashSet<EventId>>,
29 },
30 /// Add filters to an existing relay connection.
31 /// Contains: relay_url, additional repos to add.
32 AddFilters {
33 relay_url: String,
34 repos_and_new_root_event: HashMap<String, HashSet<EventId>>,
35 },
36}
37
38/// Pending updates collected during batch window.
39#[derive(Debug, Default)]
40struct PendingUpdates {
41 /// New announcements (kind 30617) - triggers relay discovery
42 announcements: Vec<Event>,
43 /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set
44 root_events: Vec<Event>,
45}
46
47// =============================================================================
48// SelfSubscriber
49// =============================================================================
50
51/// Subscribes to our own relay to detect new events.
52///
53/// The self-subscriber:
54/// 1. Connects to our own relay
55/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618)
56/// 3. When events arrive, batches them
57/// 4. On batch timer fire, processes updates and sends relay actions
58pub struct SelfSubscriber {
59 /// URL of our own relay to subscribe to
60 own_relay_url: String,
61 /// Our relay domain for checking if announcements list us
62 relay_domain: String,
63 /// Reference to following repo root events (shared with SyncManager)
64 following_repo_root_events: FollowingRepoRootEvents,
65 /// Reference to sync relays (shared with SyncManager)
66 sync_relays: SyncRelays,
67 /// Channel to send relay actions back to manager
68 action_tx: mpsc::Sender<RelayAction>,
69}
70
71impl SelfSubscriber {
72 /// Create a new self-subscriber.
73 pub fn new(
74 own_relay_url: String,
75 relay_domain: String,
76 following_repo_root_events: FollowingRepoRootEvents,
77 sync_relays: SyncRelays,
78 action_tx: mpsc::Sender<RelayAction>,
79 ) -> Self {
80 Self {
81 own_relay_url,
82 relay_domain,
83 following_repo_root_events,
84 sync_relays,
85 action_tx,
86 }
87 }
88
89 /// Get the batch window duration from environment variable.
90 ///
91 /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS
92 /// for faster tests (typically 200ms).
93 fn get_batch_window() -> Duration {
94 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS")
95 .ok()
96 .and_then(|s| s.parse().ok())
97 .map(Duration::from_millis)
98 .unwrap_or(Duration::from_secs(5))
99 }
100
101 /// Run the self-subscriber event loop.
102 ///
103 /// This method:
104 /// 1. Connects to our own relay
105 /// 2. Subscribes to relevant event kinds
106 /// 3. Receives events and batches them
107 /// 4. On batch timer fire, processes and sends relay actions
108 pub async fn run(self) {
109 tracing::info!("SelfSubscriber starting for {}", self.own_relay_url);
110
111 // Create nostr-sdk client
112 let keys = Keys::generate();
113 let client = Client::new(keys);
114
115 // Connect to our own relay
116 if let Err(e) = client.add_relay(&self.own_relay_url).await {
117 tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e);
118 return;
119 }
120
121 client.connect().await;
122
123 // Wait for connection
124 let mut connected = false;
125 for _ in 0..30 {
126 tokio::time::sleep(Duration::from_millis(100)).await;
127 let relays = client.relays().await;
128 if relays.values().any(|r| r.is_connected()) {
129 connected = true;
130 break;
131 }
132 }
133
134 if !connected {
135 tracing::error!(
136 "Failed to connect to own relay {} after 3 seconds",
137 self.own_relay_url
138 );
139 return;
140 }
141
142 tracing::info!("SelfSubscriber connected to {}", self.own_relay_url);
143
144 // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design)
145 let filter = Filter::new()
146 .kinds([
147 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617
148 Kind::GitPatch, // 1617
149 Kind::Custom(KIND_PR), // 1618
150 Kind::Custom(KIND_PR_UPDATE), // 1619
151 Kind::GitIssue, // 1621
152 ])
153 .since(Timestamp::now());
154
155 if let Err(e) = client.subscribe(filter, None).await {
156 tracing::error!("Failed to subscribe to own relay: {}", e);
157 return;
158 }
159
160 tracing::info!("SelfSubscriber subscribed to event kinds on own relay");
161
162 // Batch state
163 let mut pending = PendingUpdates::default();
164 let mut batch_timer_started: Option<Instant> = None;
165 let batch_window = Self::get_batch_window();
166
167 // Main event loop using notifications stream
168 loop {
169 // Calculate timeout for batch processing
170 let timeout = if let Some(started) = batch_timer_started {
171 let elapsed = started.elapsed();
172 if elapsed >= batch_window {
173 Duration::ZERO
174 } else {
175 batch_window - elapsed
176 }
177 } else {
178 Duration::from_secs(60) // Long timeout when no batch pending
179 };
180
181 // Wait for notification with timeout
182 let notification = tokio::time::timeout(timeout, client.notifications().recv()).await;
183
184 match notification {
185 Ok(Ok(notification)) => {
186 match notification {
187 RelayPoolNotification::Event { event, .. } => {
188 let kind = event.kind.as_u16();
189
190 // Start batch timer on first event (does NOT reset)
191 if batch_timer_started.is_none() {
192 batch_timer_started = Some(Instant::now());
193 tracing::debug!("Batch timer started");
194 }
195
196 // Classify and add to pending
197 if kind == KIND_REPOSITORY_ANNOUNCEMENT {
198 tracing::debug!(
199 "SelfSubscriber received announcement {}",
200 event.id
201 );
202 pending.announcements.push(*event);
203 } else {
204 tracing::debug!(
205 "SelfSubscriber received root event {} (kind {})",
206 event.id,
207 kind
208 );
209 pending.root_events.push(*event);
210 }
211 }
212 RelayPoolNotification::Message { message, .. } => {
213 if let RelayMessage::EndOfStoredEvents(_) = message {
214 tracing::debug!("SelfSubscriber EOSE received");
215 // Process any pending events after EOSE
216 if !pending.announcements.is_empty()
217 || !pending.root_events.is_empty()
218 {
219 self.process_batch(&mut pending).await;
220 batch_timer_started = None;
221 }
222 }
223 }
224 RelayPoolNotification::Shutdown => {
225 tracing::info!("SelfSubscriber shutting down");
226 break;
227 }
228 }
229 }
230 Ok(Err(_)) => {
231 // Channel closed
232 tracing::warn!("SelfSubscriber notification channel closed");
233 break;
234 }
235 Err(_) => {
236 // Timeout - check if batch should be processed
237 if let Some(started) = batch_timer_started {
238 if started.elapsed() >= batch_window {
239 if !pending.announcements.is_empty() || !pending.root_events.is_empty()
240 {
241 self.process_batch(&mut pending).await;
242 }
243 batch_timer_started = None;
244 }
245 }
246 }
247 }
248 }
249
250 client.disconnect().await;
251 tracing::info!("SelfSubscriber disconnected");
252 }
253
254 /// Process a batch of pending updates.
255 async fn process_batch(&self, pending: &mut PendingUpdates) {
256 tracing::debug!(
257 "Processing batch: {} announcements, {} root events",
258 pending.announcements.len(),
259 pending.root_events.len()
260 );
261
262 // Process root events first (update following_repo_root_events)
263 for event in pending.root_events.drain(..) {
264 let repo_refs = SyncManager::extract_all_repo_refs(&event);
265 if !repo_refs.is_empty() {
266 let mut guard = self.following_repo_root_events.write().await;
267 for repo_ref in repo_refs {
268 guard.entry(repo_ref).or_default().insert(event.id);
269 }
270 }
271 }
272
273 // Process announcements (relay discovery)
274 for event in pending.announcements.drain(..) {
275 self.process_announcement(&event).await;
276 }
277 }
278
279 /// Process an announcement event for relay discovery.
280 async fn process_announcement(&self, event: &Event) {
281 let repo_ref = SyncManager::build_repo_ref(event);
282 let relay_urls = Self::extract_relay_urls_from_announcement(event);
283
284 // Check if this announcement lists our relay
285 if !self.lists_our_service(event) {
286 tracing::debug!(
287 "Announcement {} does not list our service, skipping relay discovery",
288 event.id
289 );
290 return;
291 }
292
293 tracing::info!(
294 "Processing announcement {} for repo {}, found {} relay URLs",
295 event.id,
296 repo_ref,
297 relay_urls.len()
298 );
299
300 // Get current events for this repo from following_repo_root_events
301 let events = self
302 .following_repo_root_events
303 .read()
304 .await
305 .get(&repo_ref)
306 .cloned()
307 .unwrap_or_default();
308
309 // For each relay URL in the announcement, check if we need to spawn or update
310 for relay_url in relay_urls {
311 if self.is_own_relay(&relay_url) {
312 continue; // Skip our own relay
313 }
314
315 let sync_relays_guard = self.sync_relays.read().await;
316 let exists = sync_relays_guard.contains_key(&relay_url);
317 drop(sync_relays_guard);
318
319 if exists {
320 // Relay already known - check if we need to add this repo
321 let mut guard = self.sync_relays.write().await;
322 let relay_repos = guard.entry(relay_url.clone()).or_default();
323 let is_new_repo = !relay_repos.contains_key(&repo_ref);
324
325 if is_new_repo {
326 relay_repos.insert(repo_ref.clone(), events.clone());
327 drop(guard);
328
329 // Send action to add filters
330 let mut repos_filters = HashMap::new();
331 repos_filters.insert(repo_ref.clone(), events.clone());
332
333 if let Err(e) = self
334 .action_tx
335 .send(RelayAction::AddFilters {
336 relay_url: relay_url.clone(),
337 repos_and_new_root_event: repos_filters,
338 })
339 .await
340 {
341 tracing::warn!("Failed to send AddFilters action: {}", e);
342 }
343 }
344 } else {
345 // New relay - add to sync_relays and spawn
346 let mut guard = self.sync_relays.write().await;
347 let mut repos = HashMap::new();
348 repos.insert(repo_ref.clone(), events.clone());
349 guard.insert(relay_url.clone(), repos.clone());
350 drop(guard);
351
352 tracing::info!("Discovered new relay to sync from: {}", relay_url);
353
354 // Send action to spawn relay
355 if let Err(e) = self
356 .action_tx
357 .send(RelayAction::SpawnRelay {
358 relay_url: relay_url.clone(),
359 repos_and_root_events: repos,
360 })
361 .await
362 {
363 tracing::warn!("Failed to send SpawnRelay action: {}", e);
364 }
365 }
366 }
367 }
368
369 /// Extract relay URLs from an announcement event.
370 ///
371 /// Looks for both 'relays' and 'clone' tags.
372 fn extract_relay_urls_from_announcement(event: &Event) -> Vec<String> {
373 let mut urls = Vec::new();
374
375 // Extract from 'relays' tag
376 for tag in event.tags.iter() {
377 if matches!(tag.kind(), TagKind::Relays) {
378 let vec = tag.clone().to_vec();
379 urls.extend(vec.into_iter().skip(1)); // Skip tag name
380 }
381 }
382
383 // Extract from 'clone' tag - parse URLs to get relay hints
384 // Clone URLs look like: http://domain/repo.git or git://domain/repo.git
385 // We want to construct ws://domain from these
386 for tag in event.tags.iter() {
387 if matches!(tag.kind(), TagKind::Clone) {
388 let vec = tag.clone().to_vec();
389 for url in vec.into_iter().skip(1) {
390 if let Some(relay_url) = Self::clone_url_to_relay_url(&url) {
391 if !urls.contains(&relay_url) {
392 urls.push(relay_url);
393 }
394 }
395 }
396 }
397 }
398
399 urls
400 }
401
402 /// Convert a clone URL to a potential relay URL.
403 ///
404 /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080"
405 fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
406 // Parse the URL to extract host:port
407 if let Ok(url) = url::Url::parse(clone_url) {
408 let host = url.host_str()?;
409 let port = url.port();
410 let scheme = if url.scheme() == "https" { "wss" } else { "ws" };
411
412 if let Some(port) = port {
413 Some(format!("{}://{}:{}", scheme, host, port))
414 } else {
415 Some(format!("{}://{}", scheme, host))
416 }
417 } else {
418 None
419 }
420 }
421
422 /// Check if event lists our service in the relays or clone tags.
423 fn lists_our_service(&self, event: &Event) -> bool {
424 // Check relays tag
425 for tag in event.tags.iter() {
426 if matches!(tag.kind(), TagKind::Relays) {
427 let vec = tag.clone().to_vec();
428 for url in vec.into_iter().skip(1) {
429 if self.is_own_relay(&url) {
430 return true;
431 }
432 }
433 }
434 }
435
436 // Check clone tag
437 for tag in event.tags.iter() {
438 if matches!(tag.kind(), TagKind::Clone) {
439 let vec = tag.clone().to_vec();
440 for url in vec.into_iter().skip(1) {
441 if url.contains(&self.relay_domain) {
442 return true;
443 }
444 }
445 }
446 }
447
448 false
449 }
450
451 /// Check if a relay URL matches our relay.
452 fn is_own_relay(&self, relay_url: &str) -> bool {
453 relay_url.contains(&self.relay_domain)
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 #[test]
462 fn test_clone_url_to_relay_url_http() {
463 let url = "http://127.0.0.1:8080/repo.git";
464 let relay = SelfSubscriber::clone_url_to_relay_url(url);
465 assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string()));
466 }
467
468 #[test]
469 fn test_clone_url_to_relay_url_https() {
470 let url = "https://example.com/repo.git";
471 let relay = SelfSubscriber::clone_url_to_relay_url(url);
472 assert_eq!(relay, Some("wss://example.com".to_string()));
473 }
474
475 #[test]
476 fn test_clone_url_to_relay_url_invalid() {
477 let url = "not-a-valid-url";
478 let relay = SelfSubscriber::clone_url_to_relay_url(url);
479 assert_eq!(relay, None);
480 }
481
482 #[test]
483 fn test_get_batch_window_default() {
484 // Clear env var if set
485 std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS");
486 let window = SelfSubscriber::get_batch_window();
487 assert_eq!(window, Duration::from_secs(5));
488 }
489
490 #[test]
491 fn test_get_batch_window_from_env() {
492 std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200");
493 let window = SelfSubscriber::get_batch_window();
494 assert_eq!(window, Duration::from_millis(200));
495 std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS");
496 }
497}