diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 10:33:07 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 10:33:07 +0000 |
| commit | 586fc2a7df1ce256469f0742d23f687ac4b075b1 (patch) | |
| tree | dc07dbec88ea1ca2e80b4ced91831256bb68ce4e /src/sync/self_subscriber.rs | |
| parent | 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (diff) | |
stub of sync v4
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 497 |
1 files changed, 0 insertions, 497 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs deleted file mode 100644 index 0512088..0000000 --- a/src/sync/self_subscriber.rs +++ /dev/null | |||
| @@ -1,497 +0,0 @@ | |||
| 1 | //! Self-Subscriber for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module handles subscribing to our own relay to detect new events | ||
| 4 | //! and trigger relay discovery from announcements. | ||
| 5 | |||
| 6 | use std::collections::{HashMap, HashSet}; | ||
| 7 | use std::time::Duration; | ||
| 8 | |||
| 9 | use nostr_sdk::prelude::*; | ||
| 10 | use tokio::sync::mpsc; | ||
| 11 | use tokio::time::Instant; | ||
| 12 | |||
| 13 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | ||
| 14 | |||
| 15 | use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; | ||
| 16 | |||
| 17 | // ============================================================================= | ||
| 18 | // Types | ||
| 19 | // ============================================================================= | ||
| 20 | |||
| 21 | /// Actions to be taken by the SyncManager based on self-subscription events. | ||
| 22 | #[derive(Debug, Clone)] | ||
| 23 | pub enum RelayAction { | ||
| 24 | /// Spawn a new relay connection to sync from. | ||
| 25 | /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. | ||
| 26 | SpawnRelay { | ||
| 27 | relay_url: String, | ||
| 28 | repos_and_root_events: HashMap<String, HashSet<EventId>>, | ||
| 29 | }, | ||
| 30 | /// Add filters to an existing relay connection. | ||
| 31 | /// Contains: relay_url, additional repos to add. | ||
| 32 | AddFilters { | ||
| 33 | relay_url: String, | ||
| 34 | repos_and_new_root_event: HashMap<String, HashSet<EventId>>, | ||
| 35 | }, | ||
| 36 | } | ||
| 37 | |||
| 38 | /// Pending updates collected during batch window. | ||
| 39 | #[derive(Debug, Default)] | ||
| 40 | struct PendingUpdates { | ||
| 41 | /// New announcements (kind 30617) - triggers relay discovery | ||
| 42 | announcements: Vec<Event>, | ||
| 43 | /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set | ||
| 44 | root_events: Vec<Event>, | ||
| 45 | } | ||
| 46 | |||
| 47 | // ============================================================================= | ||
| 48 | // SelfSubscriber | ||
| 49 | // ============================================================================= | ||
| 50 | |||
| 51 | /// Subscribes to our own relay to detect new events. | ||
| 52 | /// | ||
| 53 | /// The self-subscriber: | ||
| 54 | /// 1. Connects to our own relay | ||
| 55 | /// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) | ||
| 56 | /// 3. When events arrive, batches them | ||
| 57 | /// 4. On batch timer fire, processes updates and sends relay actions | ||
| 58 | pub struct SelfSubscriber { | ||
| 59 | /// URL of our own relay to subscribe to | ||
| 60 | own_relay_url: String, | ||
| 61 | /// Our relay domain for checking if announcements list us | ||
| 62 | relay_domain: String, | ||
| 63 | /// Reference to following repo root events (shared with SyncManager) | ||
| 64 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 65 | /// Reference to sync relays (shared with SyncManager) | ||
| 66 | sync_relays: SyncRelays, | ||
| 67 | /// Channel to send relay actions back to manager | ||
| 68 | action_tx: mpsc::Sender<RelayAction>, | ||
| 69 | } | ||
| 70 | |||
| 71 | impl SelfSubscriber { | ||
| 72 | /// Create a new self-subscriber. | ||
| 73 | pub fn new( | ||
| 74 | own_relay_url: String, | ||
| 75 | relay_domain: String, | ||
| 76 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 77 | sync_relays: SyncRelays, | ||
| 78 | action_tx: mpsc::Sender<RelayAction>, | ||
| 79 | ) -> Self { | ||
| 80 | Self { | ||
| 81 | own_relay_url, | ||
| 82 | relay_domain, | ||
| 83 | following_repo_root_events, | ||
| 84 | sync_relays, | ||
| 85 | action_tx, | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Get the batch window duration from environment variable. | ||
| 90 | /// | ||
| 91 | /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS | ||
| 92 | /// for faster tests (typically 200ms). | ||
| 93 | fn get_batch_window() -> Duration { | ||
| 94 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | ||
| 95 | .ok() | ||
| 96 | .and_then(|s| s.parse().ok()) | ||
| 97 | .map(Duration::from_millis) | ||
| 98 | .unwrap_or(Duration::from_secs(5)) | ||
| 99 | } | ||
| 100 | |||
| 101 | /// Run the self-subscriber event loop. | ||
| 102 | /// | ||
| 103 | /// This method: | ||
| 104 | /// 1. Connects to our own relay | ||
| 105 | /// 2. Subscribes to relevant event kinds | ||
| 106 | /// 3. Receives events and batches them | ||
| 107 | /// 4. On batch timer fire, processes and sends relay actions | ||
| 108 | pub async fn run(self) { | ||
| 109 | tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); | ||
| 110 | |||
| 111 | // Create nostr-sdk client | ||
| 112 | let keys = Keys::generate(); | ||
| 113 | let client = Client::new(keys); | ||
| 114 | |||
| 115 | // Connect to our own relay | ||
| 116 | if let Err(e) = client.add_relay(&self.own_relay_url).await { | ||
| 117 | tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); | ||
| 118 | return; | ||
| 119 | } | ||
| 120 | |||
| 121 | client.connect().await; | ||
| 122 | |||
| 123 | // Wait for connection | ||
| 124 | let mut connected = false; | ||
| 125 | for _ in 0..30 { | ||
| 126 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 127 | let relays = client.relays().await; | ||
| 128 | if relays.values().any(|r| r.is_connected()) { | ||
| 129 | connected = true; | ||
| 130 | break; | ||
| 131 | } | ||
| 132 | } | ||
| 133 | |||
| 134 | if !connected { | ||
| 135 | tracing::error!( | ||
| 136 | "Failed to connect to own relay {} after 3 seconds", | ||
| 137 | self.own_relay_url | ||
| 138 | ); | ||
| 139 | return; | ||
| 140 | } | ||
| 141 | |||
| 142 | tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); | ||
| 143 | |||
| 144 | // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) | ||
| 145 | let filter = Filter::new() | ||
| 146 | .kinds([ | ||
| 147 | Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 | ||
| 148 | Kind::GitPatch, // 1617 | ||
| 149 | Kind::Custom(KIND_PR), // 1618 | ||
| 150 | Kind::Custom(KIND_PR_UPDATE), // 1619 | ||
| 151 | Kind::GitIssue, // 1621 | ||
| 152 | ]) | ||
| 153 | .since(Timestamp::now()); | ||
| 154 | |||
| 155 | if let Err(e) = client.subscribe(filter, None).await { | ||
| 156 | tracing::error!("Failed to subscribe to own relay: {}", e); | ||
| 157 | return; | ||
| 158 | } | ||
| 159 | |||
| 160 | tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); | ||
| 161 | |||
| 162 | // Batch state | ||
| 163 | let mut pending = PendingUpdates::default(); | ||
| 164 | let mut batch_timer_started: Option<Instant> = None; | ||
| 165 | let batch_window = Self::get_batch_window(); | ||
| 166 | |||
| 167 | // Main event loop using notifications stream | ||
| 168 | loop { | ||
| 169 | // Calculate timeout for batch processing | ||
| 170 | let timeout = if let Some(started) = batch_timer_started { | ||
| 171 | let elapsed = started.elapsed(); | ||
| 172 | if elapsed >= batch_window { | ||
| 173 | Duration::ZERO | ||
| 174 | } else { | ||
| 175 | batch_window - elapsed | ||
| 176 | } | ||
| 177 | } else { | ||
| 178 | Duration::from_secs(60) // Long timeout when no batch pending | ||
| 179 | }; | ||
| 180 | |||
| 181 | // Wait for notification with timeout | ||
| 182 | let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; | ||
| 183 | |||
| 184 | match notification { | ||
| 185 | Ok(Ok(notification)) => { | ||
| 186 | match notification { | ||
| 187 | RelayPoolNotification::Event { event, .. } => { | ||
| 188 | let kind = event.kind.as_u16(); | ||
| 189 | |||
| 190 | // Start batch timer on first event (does NOT reset) | ||
| 191 | if batch_timer_started.is_none() { | ||
| 192 | batch_timer_started = Some(Instant::now()); | ||
| 193 | tracing::debug!("Batch timer started"); | ||
| 194 | } | ||
| 195 | |||
| 196 | // Classify and add to pending | ||
| 197 | if kind == KIND_REPOSITORY_ANNOUNCEMENT { | ||
| 198 | tracing::debug!( | ||
| 199 | "SelfSubscriber received announcement {}", | ||
| 200 | event.id | ||
| 201 | ); | ||
| 202 | pending.announcements.push(*event); | ||
| 203 | } else { | ||
| 204 | tracing::debug!( | ||
| 205 | "SelfSubscriber received root event {} (kind {})", | ||
| 206 | event.id, | ||
| 207 | kind | ||
| 208 | ); | ||
| 209 | pending.root_events.push(*event); | ||
| 210 | } | ||
| 211 | } | ||
| 212 | RelayPoolNotification::Message { message, .. } => { | ||
| 213 | if let RelayMessage::EndOfStoredEvents(_) = message { | ||
| 214 | tracing::debug!("SelfSubscriber EOSE received"); | ||
| 215 | // Process any pending events after EOSE | ||
| 216 | if !pending.announcements.is_empty() | ||
| 217 | || !pending.root_events.is_empty() | ||
| 218 | { | ||
| 219 | self.process_batch(&mut pending).await; | ||
| 220 | batch_timer_started = None; | ||
| 221 | } | ||
| 222 | } | ||
| 223 | } | ||
| 224 | RelayPoolNotification::Shutdown => { | ||
| 225 | tracing::info!("SelfSubscriber shutting down"); | ||
| 226 | break; | ||
| 227 | } | ||
| 228 | } | ||
| 229 | } | ||
| 230 | Ok(Err(_)) => { | ||
| 231 | // Channel closed | ||
| 232 | tracing::warn!("SelfSubscriber notification channel closed"); | ||
| 233 | break; | ||
| 234 | } | ||
| 235 | Err(_) => { | ||
| 236 | // Timeout - check if batch should be processed | ||
| 237 | if let Some(started) = batch_timer_started { | ||
| 238 | if started.elapsed() >= batch_window { | ||
| 239 | if !pending.announcements.is_empty() || !pending.root_events.is_empty() | ||
| 240 | { | ||
| 241 | self.process_batch(&mut pending).await; | ||
| 242 | } | ||
| 243 | batch_timer_started = None; | ||
| 244 | } | ||
| 245 | } | ||
| 246 | } | ||
| 247 | } | ||
| 248 | } | ||
| 249 | |||
| 250 | client.disconnect().await; | ||
| 251 | tracing::info!("SelfSubscriber disconnected"); | ||
| 252 | } | ||
| 253 | |||
| 254 | /// Process a batch of pending updates. | ||
| 255 | async fn process_batch(&self, pending: &mut PendingUpdates) { | ||
| 256 | tracing::debug!( | ||
| 257 | "Processing batch: {} announcements, {} root events", | ||
| 258 | pending.announcements.len(), | ||
| 259 | pending.root_events.len() | ||
| 260 | ); | ||
| 261 | |||
| 262 | // Process root events first (update following_repo_root_events) | ||
| 263 | for event in pending.root_events.drain(..) { | ||
| 264 | let repo_refs = SyncManager::extract_all_repo_refs(&event); | ||
| 265 | if !repo_refs.is_empty() { | ||
| 266 | let mut guard = self.following_repo_root_events.write().await; | ||
| 267 | for repo_ref in repo_refs { | ||
| 268 | guard.entry(repo_ref).or_default().insert(event.id); | ||
| 269 | } | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | // Process announcements (relay discovery) | ||
| 274 | for event in pending.announcements.drain(..) { | ||
| 275 | self.process_announcement(&event).await; | ||
| 276 | } | ||
| 277 | } | ||
| 278 | |||
| 279 | /// Process an announcement event for relay discovery. | ||
| 280 | async fn process_announcement(&self, event: &Event) { | ||
| 281 | let repo_ref = SyncManager::build_repo_ref(event); | ||
| 282 | let relay_urls = Self::extract_relay_urls_from_announcement(event); | ||
| 283 | |||
| 284 | // Check if this announcement lists our relay | ||
| 285 | if !self.lists_our_service(event) { | ||
| 286 | tracing::debug!( | ||
| 287 | "Announcement {} does not list our service, skipping relay discovery", | ||
| 288 | event.id | ||
| 289 | ); | ||
| 290 | return; | ||
| 291 | } | ||
| 292 | |||
| 293 | tracing::info!( | ||
| 294 | "Processing announcement {} for repo {}, found {} relay URLs", | ||
| 295 | event.id, | ||
| 296 | repo_ref, | ||
| 297 | relay_urls.len() | ||
| 298 | ); | ||
| 299 | |||
| 300 | // Get current events for this repo from following_repo_root_events | ||
| 301 | let events = self | ||
| 302 | .following_repo_root_events | ||
| 303 | .read() | ||
| 304 | .await | ||
| 305 | .get(&repo_ref) | ||
| 306 | .cloned() | ||
| 307 | .unwrap_or_default(); | ||
| 308 | |||
| 309 | // For each relay URL in the announcement, check if we need to spawn or update | ||
| 310 | for relay_url in relay_urls { | ||
| 311 | if self.is_own_relay(&relay_url) { | ||
| 312 | continue; // Skip our own relay | ||
| 313 | } | ||
| 314 | |||
| 315 | let sync_relays_guard = self.sync_relays.read().await; | ||
| 316 | let exists = sync_relays_guard.contains_key(&relay_url); | ||
| 317 | drop(sync_relays_guard); | ||
| 318 | |||
| 319 | if exists { | ||
| 320 | // Relay already known - check if we need to add this repo | ||
| 321 | let mut guard = self.sync_relays.write().await; | ||
| 322 | let relay_repos = guard.entry(relay_url.clone()).or_default(); | ||
| 323 | let is_new_repo = !relay_repos.contains_key(&repo_ref); | ||
| 324 | |||
| 325 | if is_new_repo { | ||
| 326 | relay_repos.insert(repo_ref.clone(), events.clone()); | ||
| 327 | drop(guard); | ||
| 328 | |||
| 329 | // Send action to add filters | ||
| 330 | let mut repos_filters = HashMap::new(); | ||
| 331 | repos_filters.insert(repo_ref.clone(), events.clone()); | ||
| 332 | |||
| 333 | if let Err(e) = self | ||
| 334 | .action_tx | ||
| 335 | .send(RelayAction::AddFilters { | ||
| 336 | relay_url: relay_url.clone(), | ||
| 337 | repos_and_new_root_event: repos_filters, | ||
| 338 | }) | ||
| 339 | .await | ||
| 340 | { | ||
| 341 | tracing::warn!("Failed to send AddFilters action: {}", e); | ||
| 342 | } | ||
| 343 | } | ||
| 344 | } else { | ||
| 345 | // New relay - add to sync_relays and spawn | ||
| 346 | let mut guard = self.sync_relays.write().await; | ||
| 347 | let mut repos = HashMap::new(); | ||
| 348 | repos.insert(repo_ref.clone(), events.clone()); | ||
| 349 | guard.insert(relay_url.clone(), repos.clone()); | ||
| 350 | drop(guard); | ||
| 351 | |||
| 352 | tracing::info!("Discovered new relay to sync from: {}", relay_url); | ||
| 353 | |||
| 354 | // Send action to spawn relay | ||
| 355 | if let Err(e) = self | ||
| 356 | .action_tx | ||
| 357 | .send(RelayAction::SpawnRelay { | ||
| 358 | relay_url: relay_url.clone(), | ||
| 359 | repos_and_root_events: repos, | ||
| 360 | }) | ||
| 361 | .await | ||
| 362 | { | ||
| 363 | tracing::warn!("Failed to send SpawnRelay action: {}", e); | ||
| 364 | } | ||
| 365 | } | ||
| 366 | } | ||
| 367 | } | ||
| 368 | |||
| 369 | /// Extract relay URLs from an announcement event. | ||
| 370 | /// | ||
| 371 | /// Looks for both 'relays' and 'clone' tags. | ||
| 372 | fn extract_relay_urls_from_announcement(event: &Event) -> Vec<String> { | ||
| 373 | let mut urls = Vec::new(); | ||
| 374 | |||
| 375 | // Extract from 'relays' tag | ||
| 376 | for tag in event.tags.iter() { | ||
| 377 | if matches!(tag.kind(), TagKind::Relays) { | ||
| 378 | let vec = tag.clone().to_vec(); | ||
| 379 | urls.extend(vec.into_iter().skip(1)); // Skip tag name | ||
| 380 | } | ||
| 381 | } | ||
| 382 | |||
| 383 | // Extract from 'clone' tag - parse URLs to get relay hints | ||
| 384 | // Clone URLs look like: http://domain/repo.git or git://domain/repo.git | ||
| 385 | // We want to construct ws://domain from these | ||
| 386 | for tag in event.tags.iter() { | ||
| 387 | if matches!(tag.kind(), TagKind::Clone) { | ||
| 388 | let vec = tag.clone().to_vec(); | ||
| 389 | for url in vec.into_iter().skip(1) { | ||
| 390 | if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { | ||
| 391 | if !urls.contains(&relay_url) { | ||
| 392 | urls.push(relay_url); | ||
| 393 | } | ||
| 394 | } | ||
| 395 | } | ||
| 396 | } | ||
| 397 | } | ||
| 398 | |||
| 399 | urls | ||
| 400 | } | ||
| 401 | |||
| 402 | /// Convert a clone URL to a potential relay URL. | ||
| 403 | /// | ||
| 404 | /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" | ||
| 405 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | ||
| 406 | // Parse the URL to extract host:port | ||
| 407 | if let Ok(url) = url::Url::parse(clone_url) { | ||
| 408 | let host = url.host_str()?; | ||
| 409 | let port = url.port(); | ||
| 410 | let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; | ||
| 411 | |||
| 412 | if let Some(port) = port { | ||
| 413 | Some(format!("{}://{}:{}", scheme, host, port)) | ||
| 414 | } else { | ||
| 415 | Some(format!("{}://{}", scheme, host)) | ||
| 416 | } | ||
| 417 | } else { | ||
| 418 | None | ||
| 419 | } | ||
| 420 | } | ||
| 421 | |||
| 422 | /// Check if event lists our service in the relays or clone tags. | ||
| 423 | fn lists_our_service(&self, event: &Event) -> bool { | ||
| 424 | // Check relays tag | ||
| 425 | for tag in event.tags.iter() { | ||
| 426 | if matches!(tag.kind(), TagKind::Relays) { | ||
| 427 | let vec = tag.clone().to_vec(); | ||
| 428 | for url in vec.into_iter().skip(1) { | ||
| 429 | if self.is_own_relay(&url) { | ||
| 430 | return true; | ||
| 431 | } | ||
| 432 | } | ||
| 433 | } | ||
| 434 | } | ||
| 435 | |||
| 436 | // Check clone tag | ||
| 437 | for tag in event.tags.iter() { | ||
| 438 | if matches!(tag.kind(), TagKind::Clone) { | ||
| 439 | let vec = tag.clone().to_vec(); | ||
| 440 | for url in vec.into_iter().skip(1) { | ||
| 441 | if url.contains(&self.relay_domain) { | ||
| 442 | return true; | ||
| 443 | } | ||
| 444 | } | ||
| 445 | } | ||
| 446 | } | ||
| 447 | |||
| 448 | false | ||
| 449 | } | ||
| 450 | |||
| 451 | /// Check if a relay URL matches our relay. | ||
| 452 | fn is_own_relay(&self, relay_url: &str) -> bool { | ||
| 453 | relay_url.contains(&self.relay_domain) | ||
| 454 | } | ||
| 455 | } | ||
| 456 | |||
| 457 | #[cfg(test)] | ||
| 458 | mod tests { | ||
| 459 | use super::*; | ||
| 460 | |||
| 461 | #[test] | ||
| 462 | fn test_clone_url_to_relay_url_http() { | ||
| 463 | let url = "http://127.0.0.1:8080/repo.git"; | ||
| 464 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 465 | assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); | ||
| 466 | } | ||
| 467 | |||
| 468 | #[test] | ||
| 469 | fn test_clone_url_to_relay_url_https() { | ||
| 470 | let url = "https://example.com/repo.git"; | ||
| 471 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 472 | assert_eq!(relay, Some("wss://example.com".to_string())); | ||
| 473 | } | ||
| 474 | |||
| 475 | #[test] | ||
| 476 | fn test_clone_url_to_relay_url_invalid() { | ||
| 477 | let url = "not-a-valid-url"; | ||
| 478 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 479 | assert_eq!(relay, None); | ||
| 480 | } | ||
| 481 | |||
| 482 | #[test] | ||
| 483 | fn test_get_batch_window_default() { | ||
| 484 | // Clear env var if set | ||
| 485 | std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); | ||
| 486 | let window = SelfSubscriber::get_batch_window(); | ||
| 487 | assert_eq!(window, Duration::from_secs(5)); | ||
| 488 | } | ||
| 489 | |||
| 490 | #[test] | ||
| 491 | fn test_get_batch_window_from_env() { | ||
| 492 | std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); | ||
| 493 | let window = SelfSubscriber::get_batch_window(); | ||
| 494 | assert_eq!(window, Duration::from_millis(200)); | ||
| 495 | std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); | ||
| 496 | } | ||
| 497 | } | ||