From 7e68b71558c8f6d3f2aa1d3bf18e77eec335343d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 9 Dec 2025 09:59:01 +0000 Subject: sync initalize from db --- src/sync/mod.rs | 386 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 379 insertions(+), 7 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index aa34490..71f91e2 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -39,11 +39,13 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; use nostr_sdk::EventId; use tokio::sync::RwLock; use crate::config::Config; use crate::nostr::builder::Nip34WritePolicy; +use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; use crate::nostr::SharedDatabase; // ============================================================================= @@ -263,6 +265,191 @@ impl SyncManager { &self.sync_relays } + // ========================================================================= + // Phase 2: Database Initialization + // ========================================================================= + + /// Initialize sync state from database queries at startup. + /// + /// This method performs two database queries: + /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events` + /// 2. Query kind 30617 to build `sync_relays` + /// + /// The bootstrap relay (if configured) is always added to `sync_relays`. + /// + /// # Errors + /// + /// Returns an error if database queries fail. + pub async fn initialize_from_database(&self) -> Result<(), String> { + // Initialize bootstrap relay if configured (never removed) + if let Some(bootstrap_url) = &self.bootstrap_relay_url { + self.sync_relays.write().await.insert( + bootstrap_url.clone(), + HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only) + ); + tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url); + } + + // Query 1: Build following_repo_root_events + // Find all 1617/1618/1619/1621 events and extract their repo references + let root_event_kinds = vec![ + Kind::GitPatch, // 1617 + Kind::from(KIND_PR), // 1618 + Kind::from(KIND_PR_UPDATE), // 1619 + Kind::GitIssue, // 1621 + ]; + + let filter = Filter::new().kinds(root_event_kinds); + let root_events = self + .database + .query(filter) + .await + .map_err(|e| format!("Failed to query root events: {}", e))?; + + let mut root_events_count = 0; + for event in root_events { + // An event may have multiple 'a' tags pointing to different repos + let repo_refs = Self::extract_all_repo_refs(&event); + for repo_ref in repo_refs { + self.following_repo_root_events + .write() + .await + .entry(repo_ref) + .or_default() + .insert(event.id); + root_events_count += 1; + } + } + tracing::info!( + "Populated following_repo_root_events with {} repo-event mappings", + root_events_count + ); + + // Query 2: Build sync_relays from kind 30617 announcements + let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT)); + let announcements = self + .database + .query(announcement_filter) + .await + .map_err(|e| format!("Failed to query announcements: {}", e))?; + + let mut sync_relays_count = 0; + for event in announcements { + let repo_ref = Self::build_repo_ref(&event); + let relay_urls = Self::extract_relay_urls(&event); + + // Only track repos that list BOTH a remote relay AND our service + if self.lists_our_service(&event) { + for relay_url in relay_urls { + if !self.is_own_relay(&relay_url) { + // Get events for this repo from following_repo_root_events + let events = self + .following_repo_root_events + .read() + .await + .get(&repo_ref) + .cloned() + .unwrap_or_default(); + + self.sync_relays + .write() + .await + .entry(relay_url) + .or_default() + .insert(repo_ref.clone(), events); + sync_relays_count += 1; + } + } + } + } + tracing::info!( + "Populated sync_relays with {} relay-repo mappings", + sync_relays_count + ); + + Ok(()) + } + + // ========================================================================= + // Helper Methods for Event Extraction + // ========================================================================= + + /// Extract ALL repo refs from an event (it may tag multiple repos). + /// + /// Looks for 'a' tags that reference kind 30617 (repository announcements). + /// Returns refs in format "30617:pubkey:identifier". + pub fn extract_all_repo_refs(event: &Event) -> Vec { + event + .tags + .iter() + .filter_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "a" { + // Validate it's a 30617 reference + if tag_vec[1].starts_with("30617:") { + Some(tag_vec[1].clone()) + } else { + None + } + } else { + None + } + }) + .collect() + } + + /// Build a repo ref string from a 30617 announcement event. + /// + /// Returns format "30617:pubkey:identifier". + pub fn build_repo_ref(event: &Event) -> String { + // Extract 'd' tag for identifier + let identifier = event + .tags + .iter() + .find(|tag| tag.kind() == TagKind::d()) + .and_then(|tag| tag.content()) + .map(|s| s.to_string()) + .unwrap_or_default(); + + format!("30617:{}:{}", event.pubkey.to_hex(), identifier) + } + + /// Extract relay URLs from a repository announcement event. + /// + /// Looks for the 'relays' tag and returns all relay URLs. + pub fn extract_relay_urls(event: &Event) -> Vec { + event + .tags + .iter() + .filter(|tag| matches!(tag.kind(), TagKind::Relays)) + .flat_map(|tag| { + let vec = tag.clone().to_vec(); + // Skip first element (tag name), rest are values + vec.into_iter().skip(1) + }) + .collect() + } + + /// Check if event lists our service in the relays tag. + /// + /// Compares relay URLs against our service domain. + fn lists_our_service(&self, event: &Event) -> bool { + let relay_urls = Self::extract_relay_urls(event); + relay_urls.iter().any(|url| self.is_own_relay(url)) + } + + /// Check if a relay URL matches our relay. + /// + /// Compares the URL against our service domain. + fn is_own_relay(&self, relay_url: &str) -> bool { + // Normalize comparison: check if URL contains our domain + relay_url.contains(&self.service_domain) + } + + // ========================================================================= + // Main Run Loop + // ========================================================================= + /// Runs the sync manager main loop. /// /// This method should be called in a spawned task: @@ -277,22 +464,35 @@ impl SyncManager { /// /// This is a stub that logs and then waits indefinitely. /// Full implementation includes: - /// - Phase 2: Database initialization queries + /// - Phase 2: Database initialization queries ✓ /// - Phase 3: Self-subscription for incremental updates /// - Phase 4-6: Filter building, connection management /// - Phase 7: Full sync loop pub async fn run(self) { tracing::info!( - "SyncManager stub started (bootstrap_relay={:?}, domain={})", + "SyncManager starting (bootstrap_relay={:?}, domain={})", self.bootstrap_relay_url, self.service_domain ); - tracing::info!( - "Phase 1 data structures initialized: following_repo_root_events, sync_relays" - ); + // Phase 2: Initialize from database + if let Err(e) = self.initialize_from_database().await { + tracing::error!("Failed to initialize sync state from database: {}", e); + // Continue anyway - we can still receive events via self-subscription + } + + // Log initialization results + { + let following_count = self.following_repo_root_events.read().await.len(); + let sync_relays_count = self.sync_relays.read().await.len(); + tracing::info!( + "Sync state initialized: {} repos tracked, {} sync relays", + following_count, + sync_relays_count + ); + } - // Stub: just wait indefinitely until full implementation + // Stub: wait indefinitely until full implementation (Phases 3-7) // This prevents the spawned task from immediately completing loop { tokio::time::sleep(std::time::Duration::from_secs(3600)).await; @@ -318,6 +518,178 @@ pub use metrics::{event_source, SyncMetrics}; #[cfg(test)] mod tests { use super::*; + use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; + + /// Helper to create a test event with specific tags + fn create_test_event(kind: Kind, tags: Vec) -> Event { + let keys = Keys::generate(); + EventBuilder::new(kind, "test content") + .tags(tags) + .sign_with_keys(&keys) + .expect("Failed to sign test event") + } + + // ========================================================================= + // Tests for extract_all_repo_refs + // ========================================================================= + + #[test] + fn test_extract_all_repo_refs_single_ref() { + let event = create_test_event( + Kind::GitPatch, + vec![Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("a"), + vec!["30617:abc123def456:my-project"], + )], + ); + + let refs = SyncManager::extract_all_repo_refs(&event); + assert_eq!(refs.len(), 1); + assert_eq!(refs[0], "30617:abc123def456:my-project"); + } + + #[test] + fn test_extract_all_repo_refs_multiple_refs() { + let event = create_test_event( + Kind::GitPatch, + vec![ + Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("a"), + vec!["30617:abc123:project1"], + ), + Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("a"), + vec!["30617:def456:project2"], + ), + ], + ); + + let refs = SyncManager::extract_all_repo_refs(&event); + assert_eq!(refs.len(), 2); + assert!(refs.contains(&"30617:abc123:project1".to_string())); + assert!(refs.contains(&"30617:def456:project2".to_string())); + } + + #[test] + fn test_extract_all_repo_refs_ignores_non_30617() { + let event = create_test_event( + Kind::GitPatch, + vec![ + Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("a"), + vec!["30617:abc123:valid-repo"], + ), + Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("a"), + vec!["30618:def456:state-event"], // Not a repo ref + ), + ], + ); + + let refs = SyncManager::extract_all_repo_refs(&event); + assert_eq!(refs.len(), 1); + assert_eq!(refs[0], "30617:abc123:valid-repo"); + } + + #[test] + fn test_extract_all_repo_refs_empty_when_no_a_tags() { + let event = create_test_event( + Kind::GitPatch, + vec![Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("e"), + vec!["some-event-id"], + )], + ); + + let refs = SyncManager::extract_all_repo_refs(&event); + assert!(refs.is_empty()); + } + + // ========================================================================= + // Tests for build_repo_ref + // ========================================================================= + + #[test] + fn test_build_repo_ref() { + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::from(30617_u16), "announcement") + .tags(vec![Tag::custom( + nostr_relay_builder::prelude::TagKind::d(), + vec!["my-identifier"], + )]) + .sign_with_keys(&keys) + .expect("Failed to sign test event"); + + let repo_ref = SyncManager::build_repo_ref(&event); + assert!(repo_ref.starts_with("30617:")); + assert!(repo_ref.ends_with(":my-identifier")); + assert!(repo_ref.contains(&event.pubkey.to_hex())); + } + + #[test] + fn test_build_repo_ref_empty_identifier() { + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::from(30617_u16), "announcement") + .sign_with_keys(&keys) + .expect("Failed to sign test event"); + + let repo_ref = SyncManager::build_repo_ref(&event); + assert!(repo_ref.starts_with("30617:")); + assert!(repo_ref.ends_with(":")); // Empty identifier + } + + // ========================================================================= + // Tests for extract_relay_urls + // ========================================================================= + + #[test] + fn test_extract_relay_urls_single() { + let event = create_test_event( + Kind::from(30617_u16), + vec![Tag::custom( + nostr_relay_builder::prelude::TagKind::Relays, + vec!["wss://relay.example.com"], + )], + ); + + let urls = SyncManager::extract_relay_urls(&event); + assert_eq!(urls.len(), 1); + assert_eq!(urls[0], "wss://relay.example.com"); + } + + #[test] + fn test_extract_relay_urls_multiple() { + let event = create_test_event( + Kind::from(30617_u16), + vec![Tag::custom( + nostr_relay_builder::prelude::TagKind::Relays, + vec!["wss://relay1.example.com", "wss://relay2.example.com"], + )], + ); + + let urls = SyncManager::extract_relay_urls(&event); + assert_eq!(urls.len(), 2); + assert!(urls.contains(&"wss://relay1.example.com".to_string())); + assert!(urls.contains(&"wss://relay2.example.com".to_string())); + } + + #[test] + fn test_extract_relay_urls_empty_when_no_relays_tag() { + let event = create_test_event( + Kind::from(30617_u16), + vec![Tag::custom( + nostr_relay_builder::prelude::TagKind::custom("d"), + vec!["my-project"], + )], + ); + + let urls = SyncManager::extract_relay_urls(&event); + assert!(urls.is_empty()); + } + + // ========================================================================= + // Original data structure tests + // ========================================================================= #[tokio::test] async fn test_following_repo_root_events_basic_operations() { @@ -391,4 +763,4 @@ mod tests { let guard = state.read().await; assert!(guard.contains_key("30617:writer:repo")); } -} \ No newline at end of file +} -- cgit v1.2.3