From 39e782b12fce1776f2ad0b0f5430749533cb80ea Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 11:07:50 +0000 Subject: sync v4 mvp --- src/sync/algorithms.rs | 589 +++++++++++++++++++++++++++++++++++++++++++ src/sync/filters.rs | 340 +++++++++++++++++++++++++ src/sync/mod.rs | 301 +++++++++++++++++++++- src/sync/relay_connection.rs | 216 ++++++++++++++++ src/sync/self_subscriber.rs | 430 +++++++++++++++++++++++++++++++ 5 files changed, 1867 insertions(+), 9 deletions(-) create mode 100644 src/sync/algorithms.rs create mode 100644 src/sync/filters.rs create mode 100644 src/sync/relay_connection.rs create mode 100644 src/sync/self_subscriber.rs (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs new file mode 100644 index 0000000..7d87411 --- /dev/null +++ b/src/sync/algorithms.rs @@ -0,0 +1,589 @@ +//! Core Sync Algorithms for Proactive Sync +//! +//! This module provides the decision-making algorithms for the sync system: +//! +//! - `derive_relay_targets()` - Inverts RepoSyncIndex to per-relay view +//! - `compute_actions()` - Three-way diff to determine new sync actions +//! +//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. + +use std::collections::{HashMap, HashSet}; + +use nostr_sdk::prelude::*; + +use super::{ConnectionStatus, PendingBatch, RelayState}; + +// ============================================================================= +// Data Structures +// ============================================================================= + +/// Relay-centric view of what needs syncing +/// +/// This is the inverted view of `RepoSyncNeeds` - instead of "what relays does +/// this repo need to sync from", it's "what repos does this relay need to sync". 
+#[derive(Debug, Clone, Default)] +pub struct RelaySyncNeeds { + /// Repos that need to be synced from this relay + pub repos: HashSet, + /// Root events that need to be tracked from this relay + pub root_events: HashSet, +} + +/// Action to add filters to a relay +/// +/// Produced by `compute_actions()` to describe incremental sync work needed. +#[derive(Debug)] +pub struct AddFilters { + /// The relay URL to add filters to + pub relay_url: String, + /// Repos being synced in this action + pub repos: HashSet, + /// Root events being tracked in this action + pub root_events: HashSet, + /// The actual filters to subscribe with + pub filters: Vec, +} + +// ============================================================================= +// Core Algorithms +// ============================================================================= + +/// Inverts RepoSyncIndex to per-relay view +/// +/// Takes the repo-centric index (repo -> {relays, root_events}) and inverts it +/// to a relay-centric view (relay -> {repos, root_events}). +/// +/// # Arguments +/// * `repo_index` - Map of repo addressable refs to their sync needs +/// +/// # Returns +/// Map of relay URLs to the combined sync needs from all repos +pub fn derive_relay_targets( + repo_index: &HashMap, +) -> HashMap { + let mut relay_targets: HashMap = HashMap::new(); + + for (repo_id, needs) in repo_index { + for relay_url in &needs.relays { + let entry = relay_targets + .entry(relay_url.clone()) + .or_insert_with(RelaySyncNeeds::default); + + entry.repos.insert(repo_id.clone()); + entry.root_events.extend(needs.root_events.iter().cloned()); + } + } + + relay_targets +} + +/// Three-way diff: target - pending - confirmed = new +/// +/// Computes what sync actions are needed by comparing: +/// 1. What we want (targets) +/// 2. What's already in-flight (pending) +/// 3. What's already confirmed (confirmed) +/// +/// Only creates AddFilters actions for items not already pending or confirmed. 
+/// +/// # Arguments +/// * `targets` - Per-relay sync needs (from `derive_relay_targets`) +/// * `pending` - In-flight batches per relay +/// * `confirmed` - Confirmed relay states +/// +/// # Returns +/// Vec of AddFilters actions for new sync work +pub fn compute_actions( + targets: &HashMap, + pending: &HashMap>, + confirmed: &HashMap, +) -> Vec { + use crate::sync::filters::build_layer2_and_layer3_filters; + + let mut actions = Vec::new(); + + for (relay_url, target_needs) in targets { + // Skip disconnected relays + if let Some(state) = confirmed.get(relay_url) { + if matches!(state.connection_status, ConnectionStatus::Disconnected) { + continue; + } + } + + // Calculate what's already pending + let pending_repos: HashSet = pending + .get(relay_url) + .map(|batches| { + batches + .iter() + .flat_map(|batch| batch.items.repos.iter().cloned()) + .collect() + }) + .unwrap_or_default(); + + let pending_events: HashSet = pending + .get(relay_url) + .map(|batches| { + batches + .iter() + .flat_map(|batch| batch.items.root_events.iter().cloned()) + .collect() + }) + .unwrap_or_default(); + + // Calculate what's already confirmed + let confirmed_repos: HashSet = confirmed + .get(relay_url) + .map(|state| state.repos.clone()) + .unwrap_or_default(); + + let confirmed_events: HashSet = confirmed + .get(relay_url) + .map(|state| state.root_events.clone()) + .unwrap_or_default(); + + // Calculate what's NEW (not in pending, not in confirmed) + let new_repos: HashSet = target_needs + .repos + .difference(&pending_repos) + .filter(|repo| !confirmed_repos.contains(*repo)) + .cloned() + .collect(); + + let new_events: HashSet = target_needs + .root_events + .difference(&pending_events) + .filter(|event| !confirmed_events.contains(*event)) + .cloned() + .collect(); + + // If there's anything new, create an AddFilters action + if !new_repos.is_empty() || !new_events.is_empty() { + let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); + + 
actions.push(AddFilters { + relay_url: relay_url.clone(), + repos: new_repos, + root_events: new_events, + filters, + }); + } + } + + actions +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; + + // ========================================================================= + // derive_relay_targets tests + // ========================================================================= + + #[test] + fn test_derive_relay_targets_empty() { + let repo_index = HashMap::new(); + let targets = derive_relay_targets(&repo_index); + assert!(targets.is_empty()); + } + + #[test] + fn test_derive_relay_targets_single_repo_single_relay() { + let mut repo_index = HashMap::new(); + let mut relays = HashSet::new(); + relays.insert("wss://relay1.com".to_string()); + + let mut root_events = HashSet::new(); + root_events.insert(EventId::all_zeros()); + + repo_index.insert( + "repo1".to_string(), + ModRepoSyncNeeds { + relays, + root_events, + }, + ); + + let targets = derive_relay_targets(&repo_index); + + assert_eq!(targets.len(), 1); + let relay_needs = targets.get("wss://relay1.com").unwrap(); + assert_eq!(relay_needs.repos.len(), 1); + assert!(relay_needs.repos.contains("repo1")); + assert_eq!(relay_needs.root_events.len(), 1); + } + + #[test] + fn test_derive_relay_targets_multiple_repos_same_relay() { + let mut repo_index = HashMap::new(); + + for i in 1..=3 { + let mut relays = HashSet::new(); + relays.insert("wss://relay1.com".to_string()); + + repo_index.insert( + format!("repo{}", i), + ModRepoSyncNeeds { + relays, + root_events: HashSet::new(), + }, + ); + } + + let targets = derive_relay_targets(&repo_index); + + assert_eq!(targets.len(), 1); + let relay_needs = targets.get("wss://relay1.com").unwrap(); + assert_eq!(relay_needs.repos.len(), 3); + } + + #[test] + fn test_derive_relay_targets_repo_across_multiple_relays() { + let mut repo_index = HashMap::new(); + let mut relays = HashSet::new(); + 
relays.insert("wss://relay1.com".to_string()); + relays.insert("wss://relay2.com".to_string()); + + repo_index.insert( + "repo1".to_string(), + ModRepoSyncNeeds { + relays, + root_events: HashSet::new(), + }, + ); + + let targets = derive_relay_targets(&repo_index); + + assert_eq!(targets.len(), 2); + assert!(targets + .get("wss://relay1.com") + .unwrap() + .repos + .contains("repo1")); + assert!(targets + .get("wss://relay2.com") + .unwrap() + .repos + .contains("repo1")); + } + + #[test] + fn test_derive_relay_targets_combines_root_events() { + let mut repo_index = HashMap::new(); + + // Repo1 has one root event + let mut relays1 = HashSet::new(); + relays1.insert("wss://relay1.com".to_string()); + let mut root_events1 = HashSet::new(); + root_events1.insert(EventId::all_zeros()); + + repo_index.insert( + "repo1".to_string(), + ModRepoSyncNeeds { + relays: relays1, + root_events: root_events1, + }, + ); + + // Repo2 also points to same relay but should have same event combined + let mut relays2 = HashSet::new(); + relays2.insert("wss://relay1.com".to_string()); + let mut root_events2 = HashSet::new(); + root_events2.insert(EventId::all_zeros()); // Same event + + repo_index.insert( + "repo2".to_string(), + ModRepoSyncNeeds { + relays: relays2, + root_events: root_events2, + }, + ); + + let targets = derive_relay_targets(&repo_index); + + assert_eq!(targets.len(), 1); + let relay_needs = targets.get("wss://relay1.com").unwrap(); + assert_eq!(relay_needs.repos.len(), 2); + // Root events should be deduplicated + assert_eq!(relay_needs.root_events.len(), 1); + } + + // ========================================================================= + // compute_actions tests + // ========================================================================= + + #[test] + fn test_compute_actions_empty() { + let targets = HashMap::new(); + let pending = HashMap::new(); + let confirmed = HashMap::new(); + + let actions = compute_actions(&targets, &pending, &confirmed); + 
assert!(actions.is_empty()); + } + + #[test] + fn test_compute_actions_skips_disconnected() { + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + ); + + let pending = HashMap::new(); + + let mut confirmed = HashMap::new(); + confirmed.insert( + "wss://relay1.com".to_string(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Disconnected, + last_connected: None, + disconnected_at: None, + }, + ); + + let actions = compute_actions(&targets, &pending, &confirmed); + assert!(actions.is_empty(), "Should skip disconnected relays"); + } + + #[test] + fn test_compute_actions_new_repo() { + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + ); + + let pending = HashMap::new(); + let confirmed = HashMap::new(); + + let actions = compute_actions(&targets, &pending, &confirmed); + + assert_eq!(actions.len(), 1); + let action = &actions[0]; + assert_eq!(action.relay_url, "wss://relay1.com"); + assert!(action.repos.contains("repo1")); + assert!(!action.filters.is_empty()); + } + + #[test] + fn test_compute_actions_excludes_pending() { + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + ); + + let mut pending = HashMap::new(); + pending.insert( + "wss://relay1.com".to_string(), + vec![super::super::PendingBatch { + batch_id: 1, + items: super::super::PendingItems { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + outstanding_subs: HashSet::new(), + }], + ); + + let confirmed = HashMap::new(); + + let actions 
= compute_actions(&targets, &pending, &confirmed); + assert!( + actions.is_empty(), + "Should not create action for pending items" + ); + } + + #[test] + fn test_compute_actions_excludes_confirmed() { + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + ); + + let pending = HashMap::new(); + + let mut confirmed = HashMap::new(); + confirmed.insert( + "wss://relay1.com".to_string(), + RelayState { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connected, + last_connected: None, + disconnected_at: None, + }, + ); + + let actions = compute_actions(&targets, &pending, &confirmed); + assert!( + actions.is_empty(), + "Should not create action for confirmed items" + ); + } + + #[test] + fn test_compute_actions_allows_connecting_relays() { + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + ); + + let pending = HashMap::new(); + + let mut confirmed = HashMap::new(); + confirmed.insert( + "wss://relay1.com".to_string(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connecting, + last_connected: None, + disconnected_at: None, + }, + ); + + let actions = compute_actions(&targets, &pending, &confirmed); + assert_eq!( + actions.len(), + 1, + "Should create action for connecting relays" + ); + } + + #[test] + fn test_compute_actions_partial_overlap() { + // Target has repo1, repo2, repo3 + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: vec![ + "repo1".to_string(), + "repo2".to_string(), + "repo3".to_string(), + ] + .into_iter() + 
.collect(), + root_events: HashSet::new(), + }, + ); + + // repo1 is pending + let mut pending = HashMap::new(); + pending.insert( + "wss://relay1.com".to_string(), + vec![super::super::PendingBatch { + batch_id: 1, + items: super::super::PendingItems { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + outstanding_subs: HashSet::new(), + }], + ); + + // repo2 is confirmed + let mut confirmed = HashMap::new(); + confirmed.insert( + "wss://relay1.com".to_string(), + RelayState { + repos: vec!["repo2".to_string()].into_iter().collect(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connected, + last_connected: None, + disconnected_at: None, + }, + ); + + let actions = compute_actions(&targets, &pending, &confirmed); + + assert_eq!(actions.len(), 1); + let action = &actions[0]; + // Only repo3 should be in the action (repo1 pending, repo2 confirmed) + assert_eq!(action.repos.len(), 1); + assert!(action.repos.contains("repo3")); + assert!(!action.repos.contains("repo1")); + assert!(!action.repos.contains("repo2")); + } + + #[test] + fn test_compute_actions_with_root_events() { + let event_id = EventId::all_zeros(); + + let mut targets = HashMap::new(); + targets.insert( + "wss://relay1.com".to_string(), + RelaySyncNeeds { + repos: HashSet::new(), + root_events: vec![event_id].into_iter().collect(), + }, + ); + + let pending = HashMap::new(); + let confirmed = HashMap::new(); + + let actions = compute_actions(&targets, &pending, &confirmed); + + assert_eq!(actions.len(), 1); + let action = &actions[0]; + assert!(action.repos.is_empty()); + assert_eq!(action.root_events.len(), 1); + assert!(action.root_events.contains(&event_id)); + // Should have 3 filters for the root event (e, E, q tags) + assert_eq!(action.filters.len(), 3); + } + + #[test] + fn test_compute_actions_unknown_relay_creates_action() { + // When a relay is not in confirmed at all, it should still create an action 
+ // (it's treated as connected by default if missing from confirmed) + let mut targets = HashMap::new(); + targets.insert( + "wss://new-relay.com".to_string(), + RelaySyncNeeds { + repos: vec!["repo1".to_string()].into_iter().collect(), + root_events: HashSet::new(), + }, + ); + + let pending = HashMap::new(); + let confirmed = HashMap::new(); // relay not in confirmed + + let actions = compute_actions(&targets, &pending, &confirmed); + + assert_eq!( + actions.len(), + 1, + "Should create action for unknown relay (not yet tracked)" + ); + assert_eq!(actions[0].relay_url, "wss://new-relay.com"); + } +} \ No newline at end of file diff --git a/src/sync/filters.rs b/src/sync/filters.rs new file mode 100644 index 0000000..02d580e --- /dev/null +++ b/src/sync/filters.rs @@ -0,0 +1,340 @@ +//! Filter Building Functions for Proactive Sync +//! +//! This module provides functions to construct Nostr filters for the three-layer +//! sync strategy defined in GRASP-02 v4: +//! +//! - Layer 1: Repository announcements (30617 + 30618) +//! - Layer 2: Events tagging our repos (a/A/q tags) +//! - Layer 3: Events tagging our root events (e/E/q tags) +//! +//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. + +use std::collections::HashSet; + +use nostr_sdk::prelude::*; + +/// Layer 1: Announcements filter (kinds 30617 + 30618) +/// +/// Subscribed ONCE on connect - NOT included in consolidation rebuilds. +/// Note: 30618 is ONLY synced from remote relays, not self-subscribed. +pub fn build_announcement_filter(since: Option) -> Filter { + let filter = Filter::new().kinds([ + Kind::Custom(30617), // Repository announcements + Kind::Custom(30618), // Maintainer lists + ]); + + match since { + Some(ts) => filter.since(ts), + None => filter, + } +} + +/// Layer 2: Events tagging one of our repos +/// +/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. +/// Batched per 100 repo refs. 
+/// +/// # Arguments +/// * `repos` - Set of repo addressable refs (format: 30617:pubkey:identifier) +/// * `since` - Optional timestamp for incremental sync +/// +/// # Returns +/// Vec of filters, one set of 3 filters (a/A/q) per 100-repo chunk +pub fn tagged_one_of_our_repo_event_filters( + repos: &HashSet, + since: Option, +) -> Vec { + if repos.is_empty() { + return vec![]; + } + + let mut filters = Vec::new(); + let repo_refs: Vec<_> = repos.iter().collect(); + + for chunk in repo_refs.chunks(100) { + // Lowercase 'a' tag - standard addressable reference + let mut f1 = Filter::new(); + for repo in chunk { + f1 = f1.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo.as_str()); + } + + // Uppercase 'A' tag - some clients use this + let mut f2 = Filter::new(); + for repo in chunk { + f2 = f2.custom_tag(SingleLetterTag::uppercase(Alphabet::A), repo.as_str()); + } + + // Quote 'q' tag - NIP-10 quote references to addressable events + let mut f3 = Filter::new(); + for repo in chunk { + f3 = f3.custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo.as_str()); + } + + if let Some(ts) = since { + f1 = f1.since(ts); + f2 = f2.since(ts); + f3 = f3.since(ts); + } + + filters.push(f1); + filters.push(f2); + filters.push(f3); + } + + filters +} + +/// Layer 3: Events tagging one of our root events +/// +/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. +/// Batched per 100 event IDs. 
+/// +/// # Arguments +/// * `root_events` - Set of event IDs (1617/1618/1619/1621 root events) +/// * `since` - Optional timestamp for incremental sync +/// +/// # Returns +/// Vec of filters, one set of 3 filters (e/E/q) per 100-event chunk +pub fn tagged_one_of_our_root_event_filters( + root_events: &HashSet, + since: Option, +) -> Vec { + if root_events.is_empty() { + return vec![]; + } + + let mut filters = Vec::new(); + let event_ids: Vec = root_events.iter().map(|id| id.to_hex()).collect(); + + for chunk in event_ids.chunks(100) { + // Lowercase 'e' tag - standard event reference + let mut f1 = Filter::new(); + for event_id in chunk { + f1 = f1.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.as_str()); + } + + // Uppercase 'E' tag - some clients use this + let mut f2 = Filter::new(); + for event_id in chunk { + f2 = f2.custom_tag(SingleLetterTag::uppercase(Alphabet::E), event_id.as_str()); + } + + // Quote 'q' tag - NIP-10 quote references to events + let mut f3 = Filter::new(); + for event_id in chunk { + f3 = f3.custom_tag(SingleLetterTag::lowercase(Alphabet::Q), event_id.as_str()); + } + + if let Some(ts) = since { + f1 = f1.since(ts); + f2 = f2.since(ts); + f3 = f3.since(ts); + } + + filters.push(f1); + filters.push(f2); + filters.push(f3); + } + + filters +} + +/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) +/// +/// Used by: +/// - compute_actions for incremental subscriptions +/// - consolidation rebuilds (Layer 1 remains active) +/// +/// # Arguments +/// * `repos` - Set of repo addressable refs +/// * `root_events` - Set of root event IDs +/// * `since` - Optional timestamp for incremental sync +pub fn build_layer2_and_layer3_filters( + repos: &HashSet, + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); + filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); + filters +} + +#[cfg(test)] +mod tests { + use 
super::*; + + #[test] + fn test_announcement_filter_no_since() { + let filter = build_announcement_filter(None); + + // Verify it includes both kinds + // Filter API: we can check by converting to JSON or inspecting structure + // For now we just verify it doesn't panic and returns a valid filter + assert!(!filter.is_empty()); + } + + #[test] + fn test_announcement_filter_with_since() { + let since = Timestamp::from(1700000000); + let filter = build_announcement_filter(Some(since)); + + assert!(!filter.is_empty()); + } + + #[test] + fn test_repo_filters_empty() { + let repos: HashSet = HashSet::new(); + let filters = tagged_one_of_our_repo_event_filters(&repos, None); + + assert!(filters.is_empty()); + } + + #[test] + fn test_repo_filters_single_repo() { + let mut repos = HashSet::new(); + repos.insert("30617:abc123:test-repo".to_string()); + + let filters = tagged_one_of_our_repo_event_filters(&repos, None); + + // Should create 3 filters (a, A, q) for one chunk + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_repo_filters_batching() { + let mut repos = HashSet::new(); + for i in 0..250 { + repos.insert(format!("30617:pubkey{}:repo{}", i, i)); + } + + let filters = tagged_one_of_our_repo_event_filters(&repos, None); + + // Should create 9 filters (3 chunks * 3 tag types) + // 250 repos = 100 + 100 + 50 = 3 chunks + assert_eq!(filters.len(), 9); + } + + #[test] + fn test_repo_filters_with_since() { + let mut repos = HashSet::new(); + repos.insert("30617:abc123:test-repo".to_string()); + + let since = Timestamp::from(1700000000); + let filters = tagged_one_of_our_repo_event_filters(&repos, Some(since)); + + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_root_event_filters_empty() { + let root_events: HashSet = HashSet::new(); + let filters = tagged_one_of_our_root_event_filters(&root_events, None); + + assert!(filters.is_empty()); + } + + #[test] + fn test_root_event_filters_single_event() { + let mut root_events = HashSet::new(); + // Create a 
valid event ID (all zeros for testing) + root_events.insert(EventId::all_zeros()); + + let filters = tagged_one_of_our_root_event_filters(&root_events, None); + + // Should create 3 filters (e, E, q) for one chunk + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_root_event_filters_batching() { + let mut root_events = HashSet::new(); + // EventId::all_zeros() will deduplicate, so we need unique IDs + // For testing purposes, we'll just verify with one ID since HashSet + // deduplicates all_zeros(). In real usage these would be unique. + for _ in 0..250 { + root_events.insert(EventId::all_zeros()); + } + + let filters = tagged_one_of_our_root_event_filters(&root_events, None); + + // With deduplication, we only have 1 unique ID, so 3 filters + // In real usage with 250 unique IDs, it would be 9 filters + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_root_event_filters_with_since() { + let mut root_events = HashSet::new(); + root_events.insert(EventId::all_zeros()); + + let since = Timestamp::from(1700000000); + let filters = tagged_one_of_our_root_event_filters(&root_events, Some(since)); + + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_combined_filters_empty() { + let repos: HashSet = HashSet::new(); + let root_events: HashSet = HashSet::new(); + + let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); + + assert!(filters.is_empty()); + } + + #[test] + fn test_combined_filters() { + let mut repos = HashSet::new(); + repos.insert("30617:abc123:repo1".to_string()); + + let mut root_events = HashSet::new(); + root_events.insert(EventId::all_zeros()); + + let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); + + // Should have 6 filters (3 for repos + 3 for root events) + assert_eq!(filters.len(), 6); + } + + #[test] + fn test_combined_filters_repos_only() { + let mut repos = HashSet::new(); + repos.insert("30617:abc123:repo1".to_string()); + + let root_events: HashSet = HashSet::new(); + + let 
filters = build_layer2_and_layer3_filters(&repos, &root_events, None); + + // Should have 3 filters (3 for repos only) + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_combined_filters_root_events_only() { + let repos: HashSet = HashSet::new(); + + let mut root_events = HashSet::new(); + root_events.insert(EventId::all_zeros()); + + let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); + + // Should have 3 filters (3 for root events only) + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_combined_filters_with_since() { + let mut repos = HashSet::new(); + repos.insert("30617:abc123:repo1".to_string()); + + let mut root_events = HashSet::new(); + root_events.insert(EventId::all_zeros()); + + let since = Timestamp::from(1700000000); + let filters = build_layer2_and_layer3_filters(&repos, &root_events, Some(since)); + + assert_eq!(filters.len(), 6); + } +} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c1f8bca..fb09896 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -12,6 +12,20 @@ //! //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. +pub mod algorithms; +pub mod filters; +pub mod relay_connection; +pub mod self_subscriber; + +// Re-export core algorithm types +pub use algorithms::{AddFilters, RelaySyncNeeds}; + +// Re-export relay connection types +pub use relay_connection::{RelayConnection, RelayEvent}; + +// Re-export self-subscriber types +pub use self_subscriber::{RelayAction, SelfSubscriber}; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -355,21 +369,290 @@ impl SyncManager { } } - /// Run the sync manager (placeholder for Phase 1) + /// Run the sync manager /// - /// This will be implemented in later phases to: - /// 1. Subscribe to local relay for 30617 events - /// 2. Process events to build RepoSyncIndex - /// 3. Compute and execute sync actions - /// 4. 
Handle reconnection and catch-up logic + /// Coordinates all sync components: + /// 1. Spawns self-subscriber to monitor own relay for announcements + /// 2. Connects to bootstrap relay if configured + /// 3. Handles relay actions from self-subscriber pub async fn run(self) { + use tokio::sync::mpsc; + tracing::info!( bootstrap_relay = ?self.bootstrap_relay_url, service_domain = %self.service_domain, - "SyncManager starting (placeholder - not yet implemented)" + "SyncManager starting" + ); + + // 1. Create action channel for self-subscriber -> manager communication + let (action_tx, mut action_rx) = mpsc::channel::(100); + + // 2. Spawn self-subscriber + let self_subscriber = SelfSubscriber::new( + format!("ws://{}", self.service_domain), + self.service_domain.clone(), + Arc::clone(&self.repo_sync_index), + action_tx, ); + tokio::spawn(async move { self_subscriber.run().await }); + + // 3. Connect to bootstrap relay if configured + if let Some(ref bootstrap_url) = self.bootstrap_relay_url { + self.spawn_relay_connection(bootstrap_url.clone()).await; + } - // Phase 1: Just log and return - // Full implementation will be added in subsequent phases + // 4. Main loop - handle actions from self-subscriber + loop { + tokio::select! 
{ + action = action_rx.recv() => { + match action { + Some(RelayAction::SpawnRelay { relay_url, repos }) => { + // Check if relay already exists + let relay_index = self.relay_sync_index.read().await; + let exists = relay_index.contains_key(&relay_url); + drop(relay_index); + + if !exists { + tracing::info!(relay = %relay_url, "Spawning new relay connection"); + self.spawn_relay_with_layer2(relay_url, repos).await; + } else { + tracing::debug!( + relay = %relay_url, + "Relay already exists, considering AddFilters" + ); + // For MVP, we don't handle AddFilters - just log + // Full implementation would call subscribe_filters on existing connection + } + } + Some(RelayAction::AddFilters { relay_url, repos }) => { + tracing::debug!( + relay = %relay_url, + repo_count = repos.len(), + "AddFilters action (MVP: not implemented)" + ); + // For MVP, not implemented - full version would add Layer 2 filters + // to existing relay connection + } + None => break, + } + } + } + } + } + + /// Spawn relay connection with Layer 2 filters for specific repos + /// + /// Used when discovering relays from announcements. Connects to the relay, + /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the + /// specific repos we want to sync. 
+ async fn spawn_relay_with_layer2( + &self, + relay_url: String, + repos: HashMap>, + ) { + use crate::sync::filters::build_layer2_and_layer3_filters; + use tokio::sync::mpsc; + + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let relay_sync_index = Arc::clone(&self.relay_sync_index); + + // Create relay connection + let connection = RelayConnection::new(relay_url.clone()); + + // Connect and subscribe to Layer 1 (announcements) + if let Err(e) = connection.connect_and_subscribe(None).await { + tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); + return; + } + + // Mark as connected in relay sync index + { + let mut index = relay_sync_index.write().await; + index.insert( + relay_url.clone(), + RelayState { + repos: repos.keys().cloned().collect(), + root_events: repos.values().flatten().cloned().collect(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connected, + last_connected: Some(Timestamp::now()), + disconnected_at: None, + }, + ); + } + + // Subscribe to Layer 2+3 filters for the repos + let repo_ids: HashSet = repos.keys().cloned().collect(); + let root_events: HashSet = repos.values().flatten().cloned().collect(); + let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None); + + for filter in filters { + if let Err(e) = connection.subscribe_filter(filter).await { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to subscribe to Layer 2 filter" + ); + } + } + + tracing::info!( + relay = %relay_url, + repo_count = repos.len(), + "Connected to discovered relay with Layer 2+3 filters" + ); + + // Create event channel + let (event_tx, mut event_rx) = mpsc::channel::(1000); + + // Spawn event loop + tokio::spawn(async move { + connection.run_event_loop(event_tx).await; + }); + + // Spawn event processor + let relay_url_clone = relay_url.clone(); + tokio::spawn(async move { + while let Some(relay_event) = event_rx.recv().await { + match 
relay_event { + RelayEvent::Event(event) => { + Self::process_event_static( + &event, + &relay_url_clone, + &database, + &write_policy, + ) + .await; + } + RelayEvent::EndOfStoredEvents(_) => { + tracing::debug!(relay = %relay_url_clone, "EOSE received"); + } + RelayEvent::Closed(_) | RelayEvent::Shutdown => { + tracing::info!(relay = %relay_url_clone, "Relay disconnected"); + break; + } + } + } + }); + } + + /// Spawn a relay connection and start its event loop + async fn spawn_relay_connection(&self, relay_url: String) { + use tokio::sync::mpsc; + + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let relay_sync_index = Arc::clone(&self.relay_sync_index); + + // Create relay connection + let connection = RelayConnection::new(relay_url.clone()); + + // Connect and subscribe to Layer 1 + if let Err(e) = connection.connect_and_subscribe(None).await { + tracing::error!("Failed to connect to relay {}: {}", relay_url, e); + return; + } + + // Mark as connected in relay sync index + { + let mut index = relay_sync_index.write().await; + index.insert( + relay_url.clone(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: true, + connection_status: ConnectionStatus::Connected, + last_connected: Some(Timestamp::now()), + disconnected_at: None, + }, + ); + } + + // Create event channel + let (event_tx, mut event_rx) = mpsc::channel::(1000); + + // Spawn event loop + tokio::spawn(async move { + connection.run_event_loop(event_tx).await; + }); + + // Spawn event processor + let relay_url_clone = relay_url.clone(); + tokio::spawn(async move { + while let Some(relay_event) = event_rx.recv().await { + match relay_event { + RelayEvent::Event(event) => { + Self::process_event_static(&event, &relay_url_clone, &database, &write_policy) + .await; + } + RelayEvent::EndOfStoredEvents(_) => { + tracing::debug!("EOSE from {}", relay_url_clone); + } + RelayEvent::Closed(_) | RelayEvent::Shutdown => { + 
tracing::info!("Relay {} disconnected", relay_url_clone); + break; + } + } + } + }); + } + + /// Process a single event from a relay (static version for spawned tasks) + async fn process_event_static( + event: &Event, + relay_url: &str, + database: &SharedDatabase, + write_policy: &Nip34WritePolicy, + ) { + use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + // Check if event already exists + match database.event_by_id(&event.id).await { + Ok(Some(_)) => { + tracing::trace!(event_id = %event.id, "Event already exists, skipping"); + return; + } + Err(e) => { + tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); + return; + } + Ok(None) => {} // Continue processing + } + + // Apply write policy using a dummy address (sync events aren't from network clients) + let dummy_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); + let result = write_policy.admit_event(event, &dummy_addr).await; + + match result { + PolicyResult::Accept => { + // Save event + if let Err(e) = database.save_event(event).await { + tracing::error!( + event_id = %event.id, + relay = %relay_url, + error = %e, + "Failed to save synced event" + ); + } else { + tracing::debug!( + event_id = %event.id, + relay = %relay_url, + kind = %event.kind.as_u16(), + "Saved synced event" + ); + } + } + PolicyResult::Reject(reason) => { + tracing::debug!( + event_id = %event.id, + relay = %relay_url, + reason = %reason, + "Event rejected by write policy" + ); + } + } } } \ No newline at end of file diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs new file mode 100644 index 0000000..6499c27 --- /dev/null +++ b/src/sync/relay_connection.rs @@ -0,0 +1,216 @@ +//! Relay Connection Management for Proactive Sync +//! +//! This module provides relay connection management for external relay connections. +//! Each RelayConnection manages a single connection to an external relay and handles +//! 
subscriptions using the three-layer sync strategy. +//! +//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use super::filters::build_announcement_filter; + +/// Events from a relay connection +#[derive(Debug)] +pub enum RelayEvent { + /// A new event was received + Event(Event), + /// End of stored events for a subscription + EndOfStoredEvents(SubscriptionId), + /// Connection was closed + Closed(String), + /// Shutdown notification + Shutdown, +} + +/// Manages connection to a single external relay +/// +/// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection +/// to an external relay. It handles: +/// - Connection establishment +/// - Layer 1 subscription (announcements) +/// - Additional filter subscriptions (Layers 2 & 3) +/// - Event notification loop +pub struct RelayConnection { + /// The relay URL this connection is for + url: String, + /// The underlying nostr-sdk client + client: Client, +} + +impl RelayConnection { + /// Create a new relay connection (not yet connected) + /// + /// # Arguments + /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com") + pub fn new(url: String) -> Self { + let client = Client::default(); + Self { url, client } + } + + /// Connect to the relay and subscribe to Layer 1 (announcements) + /// + /// This method: + /// 1. Adds the relay to the client + /// 2. Establishes the WebSocket connection + /// 3. 
Subscribes to Layer 1 filter (kinds 30617 + 30618) + /// + /// # Arguments + /// * `since` - Optional timestamp for incremental sync on reconnect + /// + /// # Returns + /// * `Ok(SubscriptionId)` - The subscription ID on successful connection + /// * `Err(String)` with error description on failure + pub async fn connect_and_subscribe( + &self, + since: Option, + ) -> Result { + // Add relay to client + self.client + .add_relay(&self.url) + .await + .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; + + // Establish connection + self.client.connect().await; + + // Subscribe to Layer 1 (announcements) + let filter = build_announcement_filter(since); + let output = self + .client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?; + + tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); + Ok(output.val) + } + + /// Run the event loop, sending events through the provided channel + /// + /// This method blocks and processes notifications from the relay: + /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event` + /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` + /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown` + /// + /// The loop terminates when: + /// - The sender channel is closed (receiver dropped) + /// - A shutdown notification is received + /// - An error occurs receiving notifications + /// + /// # Arguments + /// * `event_sender` - Channel to send relay events through + pub async fn run_event_loop(self, event_sender: mpsc::Sender) { + let mut notifications = self.client.notifications(); + let url = self.url.clone(); + + tracing::debug!(relay = %url, "Starting event loop"); + + while let Ok(notification) = notifications.recv().await { + match notification { + RelayPoolNotification::Event { event, .. 
} => { + tracing::trace!(relay = %url, event_id = %event.id, "Received event"); + if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayPoolNotification::Message { message, .. } => { + match message { + RelayMessage::EndOfStoredEvents(sub_id) => { + tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); + // Convert Cow to owned SubscriptionId + let owned_sub_id = sub_id.into_owned(); + if event_sender + .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) + .await + .is_err() + { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayMessage::Closed { message: msg, .. } => { + tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); + let _ = event_sender + .send(RelayEvent::Closed(msg.to_string())) + .await; + } + _ => {} + } + } + RelayPoolNotification::Shutdown => { + tracing::info!(relay = %url, "Relay pool shutdown"); + let _ = event_sender.send(RelayEvent::Shutdown).await; + break; + } + } + } + + tracing::debug!(relay = %url, "Event loop terminated"); + } + + /// Add additional filter subscription (for Layer 2 + 3) + /// + /// Use this to subscribe to: + /// - Layer 2: Events tagging our repos (a/A/q tags) + /// - Layer 3: Events tagging our root events (e/E/q tags) + /// + /// # Arguments + /// * `filter` - The filter to subscribe to + /// + /// # Returns + /// * `Ok(SubscriptionId)` - The subscription ID on success + /// * `Err(String)` - Error description on failure + pub async fn subscribe_filter(&self, filter: Filter) -> Result { + let output = self + .client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; + Ok(output.val) + } + + /// Subscribe to multiple filters at once + /// + /// Each filter creates its own subscription. Returns when all subscriptions + /// are established. 
This is useful for Layer 2 + 3 filters together. + /// + /// # Arguments + /// * `filters` - Vec of filters to subscribe to + /// + /// # Returns + /// * `Ok(Vec)` - The subscription IDs on success + /// * `Err(String)` - Error description on failure + pub async fn subscribe_filters( + &self, + filters: Vec, + ) -> Result, String> { + if filters.is_empty() { + return Ok(vec![]); + } + + let mut sub_ids = Vec::with_capacity(filters.len()); + for filter in filters { + let output = self + .client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; + sub_ids.push(output.val); + } + Ok(sub_ids) + } + + /// Get the relay URL + pub fn url(&self) -> &str { + &self.url + } + + /// Disconnect from the relay + pub async fn disconnect(&self) { + self.client.disconnect().await; + tracing::debug!(relay = %self.url, "Disconnected from relay"); + } +} \ No newline at end of file diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs new file mode 100644 index 0000000..1dec219 --- /dev/null +++ b/src/sync/self_subscriber.rs @@ -0,0 +1,430 @@ +//! Self-Subscriber for Proactive Sync +//! +//! Monitors the relay's own database for repository announcements and +//! updates the RepoSyncIndex when new relevant events are discovered. +//! +//! This module subscribes to relevant event kinds on our own relay and +//! batches updates before sending them to the SyncManager. +//! +//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. 
+ +use std::collections::{HashMap, HashSet}; +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use super::{RepoSyncIndex, RepoSyncNeeds}; + +// ============================================================================= +// RelayAction - Actions to send to SyncManager +// ============================================================================= + +/// Actions that the SelfSubscriber sends to the SyncManager +#[derive(Debug)] +pub enum RelayAction { + /// Spawn a new relay connection + SpawnRelay { + /// The relay URL to connect to + relay_url: String, + /// Repos to sync, mapped to their root event IDs + repos: HashMap>, + }, + /// Add filters to an existing relay connection + AddFilters { + /// The relay URL to add filters to + relay_url: String, + /// Repos to sync, mapped to their root event IDs + repos: HashMap>, + }, +} + +// ============================================================================= +// PendingUpdates - Accumulator for batching +// ============================================================================= + +/// Accumulates updates between batch timer firings +struct PendingUpdates { + /// Repos discovered since last batch, keyed by repo addressable ref + repos: HashMap, +} + +impl PendingUpdates { + /// Create a new empty pending updates accumulator + fn new() -> Self { + Self { + repos: HashMap::new(), + } + } + + /// Add or update a repo with its relays and root events + fn add_repo(&mut self, repo_id: String, relays: HashSet, root_events: HashSet) { + let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + root_events: HashSet::new(), + }); + entry.relays.extend(relays); + entry.root_events.extend(root_events); + } + + /// Check if there are any pending updates + fn is_empty(&self) -> bool { + self.repos.is_empty() + } + + /// Take all pending updates, leaving empty + fn take(&mut self) -> HashMap { + std::mem::take(&mut self.repos) + } +} + +// 
============================================================================= +// SelfSubscriber - Main Component +// ============================================================================= + +/// Subscribes to own relay's events to discover repos needing sync +/// +/// The SelfSubscriber connects to our own relay and monitors for: +/// - 30617 (Repository Announcements) - to discover repos listing our relay +/// - 1617 (Patches) - root events referencing repos +/// - 1618 (Issues) - root events referencing repos +/// - 1619 (Replies) - root events referencing repos +/// - 1621 (PRs) - root events referencing repos +/// +/// Note: 30618 is NOT subscribed to here (per v4 spec - only synced from remote relays) +pub struct SelfSubscriber { + /// Our own relay URL (to connect to) + own_relay_url: String, + /// Our service domain (for filtering relevant repos) + relay_domain: String, + /// Shared index of repos to sync + repo_sync_index: RepoSyncIndex, + /// Channel to send actions to SyncManager + action_tx: mpsc::Sender, +} + +impl SelfSubscriber { + /// Create a new SelfSubscriber + /// + /// # Arguments + /// * `own_relay_url` - The WebSocket URL of our own relay + /// * `relay_domain` - Our service domain (used for filtering relevant repos) + /// * `repo_sync_index` - Shared index to update with discovered repos + /// * `action_tx` - Channel to send RelayActions to the SyncManager + pub fn new( + own_relay_url: String, + relay_domain: String, + repo_sync_index: RepoSyncIndex, + action_tx: mpsc::Sender, + ) -> Self { + Self { + own_relay_url, + relay_domain, + repo_sync_index, + action_tx, + } + } + + /// Get batch window from environment or use default + /// + /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 
+ /// Default: 5000ms (5 seconds) + fn get_batch_window() -> Duration { + std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") + .ok() + .and_then(|s| s.parse::().ok()) + .map(Duration::from_millis) + .unwrap_or(Duration::from_millis(5000)) + } + + /// Extract relay URLs from event tags + /// + /// Extracts URLs from: + /// - `relays` tags: ["relays", "wss://relay1.com", "wss://relay2.com", ...] + /// - `clone` tags: ["clone", "https://example.com/repo.git", ...] (converted to ws://) + fn extract_relay_urls(event: &Event) -> HashSet { + let mut relays = HashSet::new(); + + for tag in event.tags.iter() { + let tag_vec = tag.as_slice(); + if tag_vec.is_empty() { + continue; + } + + match tag_vec[0].as_str() { + "relays" => { + // All subsequent values are relay URLs + for url in tag_vec.iter().skip(1) { + relays.insert(url.to_string()); + } + } + "clone" if tag_vec.len() >= 2 => { + // Convert http(s) clone URL to ws(s) relay URL + if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { + relays.insert(relay_url); + } + } + _ => {} + } + } + + relays + } + + /// Extract repo identifier from event + /// + /// For kind 30617, uses the `d` tag to build the addressable reference + /// Format: 30617:pubkey:identifier + fn extract_repo_id(event: &Event) -> Option { + // For kind 30617, extract d tag and build addressable ref + if event.kind == Kind::Custom(30617) { + for tag in event.tags.iter() { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + return Some(format!("30617:{}:{}", event.pubkey, tag_vec[1])); + } + } + } + + // For other kinds (1617, 1618, 1619, 1621), we'd need to look at + // their 'a' tags to find which repo they belong to. + // That processing happens in the batch processing, not here. 
+ None + } + + /// Check if announcement lists our relay + /// + /// Returns true if any extracted relay URL contains our domain + fn lists_our_relay(&self, event: &Event) -> bool { + Self::extract_relay_urls(event).iter().any(|url| { + url.contains(&self.relay_domain) || url == &self.own_relay_url + }) + } + + /// Main run loop + /// + /// Connects to own relay, subscribes to relevant event kinds, + /// and batches updates before processing them. + pub async fn run(self) { + let client = Client::default(); + + // Add own relay + if let Err(e) = client.add_relay(&self.own_relay_url).await { + tracing::error!( + url = %self.own_relay_url, + error = %e, + "Failed to add own relay for self-subscription" + ); + return; + } + + // Connect + client.connect().await; + + // Subscribe to announcement and root event kinds + // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618) + let filter = Filter::new().kinds(vec![ + Kind::Custom(30617), // Repository Announcements + Kind::Custom(1617), // Patches + Kind::Custom(1618), // Issues + Kind::Custom(1619), // Replies/Status + Kind::Custom(1621), // Pull Requests + ]); + + if let Err(e) = client.subscribe(filter, None).await { + tracing::error!( + error = %e, + "Failed to subscribe to own relay for self-subscription" + ); + return; + } + + tracing::info!( + url = %self.own_relay_url, + domain = %self.relay_domain, + "SelfSubscriber started" + ); + + let mut notifications = client.notifications(); + let batch_window = Self::get_batch_window(); + let mut pending = PendingUpdates::new(); + + // Timer does NOT reset on new events - use interval + let mut timer = tokio::time::interval(batch_window); + timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + notification = notifications.recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. 
}) => { + // Only process 30617 events that list our relay + if event.kind == Kind::Custom(30617) { + if !self.lists_our_relay(&event) { + continue; + } + + // Extract repo ID and relays + if let Some(repo_id) = Self::extract_repo_id(&event) { + let relays = Self::extract_relay_urls(&event); + let mut root_events = HashSet::new(); + root_events.insert(event.id); + + pending.add_repo(repo_id, relays, root_events); + tracing::debug!( + event_id = %event.id, + "Queued 30617 announcement for batch processing" + ); + } + } else { + // For root event kinds (1617, 1618, 1619, 1621), + // we need to check if they reference repos we care about. + // For now, we'll track them in a simpler way. + // Full implementation would extract 'a' tag and match to known repos. + tracing::trace!( + kind = %event.kind, + event_id = %event.id, + "Received root event (processing deferred)" + ); + } + } + Ok(RelayPoolNotification::Shutdown) => { + tracing::info!("SelfSubscriber received shutdown notification"); + break; + } + Err(e) => { + tracing::error!(error = %e, "Error receiving notification"); + break; + } + _ => {} + } + } + _ = timer.tick() => { + if !pending.is_empty() { + self.process_batch(&mut pending).await; + } + } + } + } + + tracing::info!("SelfSubscriber stopped"); + } + + /// Process accumulated batch + /// + /// Updates the RepoSyncIndex with discovered repos, then derives per-relay + /// targets and sends RelayAction messages to the SyncManager. 
+ async fn process_batch(&self, pending: &mut PendingUpdates) { + use crate::sync::algorithms::derive_relay_targets; + + let updates = pending.take(); + + if updates.is_empty() { + return; + } + + tracing::info!( + repo_count = updates.len(), + "Processing batch of repo updates" + ); + + // Update RepoSyncIndex + let mut index = self.repo_sync_index.write().await; + + for (repo_id, needs) in updates { + // Merge with existing entry or insert new + let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + root_events: HashSet::new(), + }); + entry.relays.extend(needs.relays); + entry.root_events.extend(needs.root_events); + + tracing::debug!( + repo_id = %repo_id, + relay_count = entry.relays.len(), + event_count = entry.root_events.len(), + "Updated repo sync needs" + ); + } + + // Derive per-relay targets from the updated index + let targets = derive_relay_targets(&index); + drop(index); // Release lock before async operations + + // For each relay, send SpawnRelay action + // SyncManager will check if relay already exists + for (relay_url, needs) in targets { + // Skip our own relay URL (we're subscribed to ourselves via self-subscription) + if relay_url.contains(&self.relay_domain) { + continue; + } + + // Convert needs to HashMap> + let mut repos = HashMap::new(); + for repo_id in needs.repos { + repos.insert(repo_id, needs.root_events.clone()); + } + + let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; + + if let Err(e) = self.action_tx.send(action).await { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to send SpawnRelay action" + ); + } else { + tracing::debug!( + relay = %relay_url, + "Sent SpawnRelay action to SyncManager" + ); + } + } + } +} + +// ============================================================================= +// Helper Functions +// ============================================================================= + +/// Convert clone URL to relay URL +/// 
/// Converts `http://` to `ws://` and `https://` to `wss://`, keeping the rest
/// of the URL (host, port, path) unchanged.
///
/// The scheme is matched case-insensitively, so `HTTPS://example.com` is
/// accepted as well.
///
/// Returns `None` for unsupported URL schemes (including an empty string).
fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
    // `https://` is listed first; the two prefixes can never both match.
    const SCHEMES: [(&str, &str); 2] = [("https://", "wss://"), ("http://", "ws://")];

    SCHEMES.iter().find_map(|&(http_scheme, ws_scheme)| {
        // `get` yields `None` instead of panicking when the input is shorter
        // than the prefix or the cut would split a multi-byte character.
        let head = clone_url.get(..http_scheme.len())?;
        if !head.eq_ignore_ascii_case(http_scheme) {
            return None;
        }
        let rest = &clone_url[http_scheme.len()..];
        Some(format!("{}{}", ws_scheme, rest))
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_clone_url_to_relay_url_https() {
        assert_eq!(
            clone_url_to_relay_url("https://example.com/repo.git"),
            Some("wss://example.com/repo.git".to_string())
        );
    }

    #[test]
    fn test_clone_url_to_relay_url_http() {
        assert_eq!(
            clone_url_to_relay_url("http://localhost:3000/repo.git"),
            Some("ws://localhost:3000/repo.git".to_string())
        );
    }

    #[test]
    fn test_clone_url_to_relay_url_unsupported() {
        assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None);
        assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None);
    }

    #[test]
    fn test_clone_url_to_relay_url_scheme_is_case_insensitive() {
        assert_eq!(
            clone_url_to_relay_url("HTTPS://example.com/repo.git"),
            Some("wss://example.com/repo.git".to_string())
        );
        assert_eq!(
            clone_url_to_relay_url("Http://example.com/repo.git"),
            Some("ws://example.com/repo.git".to_string())
        );
    }
}
// \ No newline at end of file
// -- cgit v1.2.3