From e6c4524dd1c15ea6d948c8c5630a9e9de392d989 Mon Sep 17 00:00:00 2001
From: DanConwayDev
Date: Sat, 10 Jan 2026 00:39:57 +0000
Subject: fix: move state events from Layer 1 to identifier-based filters
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Removes kind 30618 (state events) from Layer 1 announcement filter and adds
targeted subscriptions using #d (identifier) tags in Layer 2.

Problem: Layer 1 was receiving ALL state events from all relays, causing
1000+ rejections for repositories we don't host.

Solution:
- Remove Kind::RepoState from build_announcement_filter (Layer 1)
- Add state_event_filters_for_our_repos() function that creates filters
  with kind 30618 and #d tags for only our hosted repo identifiers
- Integrate state filters into build_layer2_and_layer3_filters
- Extract unique identifiers from repo refs and batch by 100 per filter

Benefits:
- Dramatically reduces bandwidth and rejection noise (1000+ → ~0)
- More efficient: one filter with multiple identifiers vs broadcast
- Only receive state events for repositories we actually care about

Resolves: work/active-issues/layer1-state-event-oversubscription.md
---
 src/sync/filters.rs | 138 +++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 130 insertions(+), 8 deletions(-)

(limited to 'src')

diff --git a/src/sync/filters.rs b/src/sync/filters.rs
index c4e20e7..3592489 100644
--- a/src/sync/filters.rs
+++ b/src/sync/filters.rs
@@ -13,15 +13,15 @@ use std::collections::HashSet;
 
 use nostr_sdk::prelude::*;
 
-/// Layer 1: Announcements filter (kinds 30617 + 30618 + 10317)
+/// Layer 1: Announcements filter (kinds 30617 + 10317)
 ///
 /// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
-/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
+/// Note: State events (30618) are now subscribed via identifier-based filters in Layer 2
+/// to avoid receiving state events for repositories we don't host.
 /// Note: 10317 (User Grasp List) is synced for better GRASP discovery.
 pub fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
     let filter = Filter::new().kinds([
         Kind::GitRepoAnnouncement, // Repository announcements
-        Kind::RepoState, // Repository state
         Kind::GitUserGraspList, // User Grasp List
     ]);
 
@@ -31,6 +31,64 @@ pub fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
     }
 }
 
+/// State event filters for our hosted repositories
+///
+/// Subscribes to kind 30618 state events using #d (identifier) tags.
+/// This is more efficient than Layer 1 broadcast subscription because:
+/// - Only receives state events for repos we actually host
+/// - One filter can include multiple identifiers (batched per 100)
+/// - Avoids 1000+ rejections for repos we don't care about
+///
+/// # Arguments
+/// * `repos` - Set of repo addressable refs (format: 30617:pubkey:identifier)
+/// * `since` - Optional timestamp for incremental sync
+///
+/// # Returns
+/// Vec of filters, one filter per 100-identifier chunk
+pub fn state_event_filters_for_our_repos(
+    repos: &HashSet<String>,
+    since: Option<Timestamp>,
+) -> Vec<Filter> {
+    if repos.is_empty() {
+        return vec![];
+    }
+
+    // Extract unique identifiers from addressable refs
+    let mut identifiers: HashSet<String> = HashSet::new();
+    for repo_ref in repos {
+        // Format: 30617:pubkey:identifier
+        if let Some(identifier) = repo_ref.split(':').nth(2) {
+            identifiers.insert(identifier.to_string());
+        }
+    }
+
+    if identifiers.is_empty() {
+        return vec![];
+    }
+
+    let mut filters = Vec::new();
+    let identifier_vec: Vec<_> = identifiers.iter().collect();
+
+    // Batch by 100 identifiers per filter
+    for chunk in identifier_vec.chunks(100) {
+        let mut filter = Filter::new().kind(Kind::RepoState); // kind 30618
+
+        // Add #d tags for all identifiers in this chunk
+        for identifier in chunk {
+            filter =
+                filter.custom_tag(SingleLetterTag::lowercase(Alphabet::D), identifier.as_str());
+        }
+
+        if let Some(ts) = since {
+            filter = filter.since(ts);
+        }
+
+        filters.push(filter);
+    }
+
+    filters
+}
+
 /// Layer 2: Events tagging one of our repos
 ///
 /// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
@@ -166,6 +224,11 @@ pub fn tagged_one_of_our_root_event_filters(
 /// - compute_actions for incremental subscriptions
 /// - consolidation rebuilds (Layer 1 remains active)
 ///
+/// Includes:
+/// - State event filters (kind 30618 with #d tags for our repo identifiers)
+/// - Repo-tagging filters (a/A/q tags)
+/// - Root event filters (e/E/q tags)
+///
 /// # Arguments
 /// * `repos` - Set of repo addressable refs
 /// * `root_events` - Set of root event IDs
@@ -176,6 +239,7 @@ pub fn build_layer2_and_layer3_filters(
     since: Option<Timestamp>,
 ) -> Vec<Filter> {
     let mut filters = Vec::new();
+    filters.extend(state_event_filters_for_our_repos(repos, since));
     filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
     filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
     filters
@@ -315,8 +379,8 @@ mod tests {
 
         let filters = build_layer2_and_layer3_filters(&repos, &root_events, None);
 
-        // Should have 6 filters (3 for repos + 3 for root events)
-        assert_eq!(filters.len(), 6);
+        // Should have 7 filters (1 state + 3 for repos + 3 for root events)
+        assert_eq!(filters.len(), 7);
     }
 
     #[test]
@@ -328,8 +392,8 @@ mod tests {
 
         let filters = build_layer2_and_layer3_filters(&repos, &root_events, None);
 
-        // Should have 3 filters (3 for repos only)
-        assert_eq!(filters.len(), 3);
+        // Should have 4 filters (1 state + 3 for repos only)
+        assert_eq!(filters.len(), 4);
     }
 
     #[test]
@@ -356,6 +420,64 @@ mod tests {
         let since = Timestamp::from(1700000000);
         let filters = build_layer2_and_layer3_filters(&repos, &root_events, Some(since));
 
-        assert_eq!(filters.len(), 6);
+        // Should have 7 filters (1 state + 3 for repos + 3 for root events)
+        assert_eq!(filters.len(), 7);
+    }
+
+    #[test]
+    fn test_state_event_filters_empty() {
+        let repos: HashSet<String> = HashSet::new();
+        let filters = state_event_filters_for_our_repos(&repos, None);
+
+        assert!(filters.is_empty());
+    }
+
+    #[test]
+    fn test_state_event_filters_single_repo() {
+        let mut repos = HashSet::new();
+        repos.insert("30617:abc123:test-repo".to_string());
+
+        let filters = state_event_filters_for_our_repos(&repos, None);
+
+        // Should create 1 filter with kind 30618 and #d tag
+        assert_eq!(filters.len(), 1);
+    }
+
+    #[test]
+    fn test_state_event_filters_batching() {
+        let mut repos = HashSet::new();
+        for i in 0..250 {
+            repos.insert(format!("30617:pubkey{}:repo{}", i, i));
+        }
+
+        let filters = state_event_filters_for_our_repos(&repos, None);
+
+        // Should create 3 filters (250 identifiers = 100 + 100 + 50 = 3 chunks)
+        assert_eq!(filters.len(), 3);
+    }
+
+    #[test]
+    fn test_state_event_filters_deduplicates_identifiers() {
+        let mut repos = HashSet::new();
+        // Same identifier with different pubkeys
+        repos.insert("30617:pubkey1:same-repo".to_string());
+        repos.insert("30617:pubkey2:same-repo".to_string());
+        repos.insert("30617:pubkey3:same-repo".to_string());
+
+        let filters = state_event_filters_for_our_repos(&repos, None);
+
+        // Should create 1 filter with deduplicated identifier
+        assert_eq!(filters.len(), 1);
+    }
+
+    #[test]
+    fn test_state_event_filters_with_since() {
+        let mut repos = HashSet::new();
+        repos.insert("30617:abc123:test-repo".to_string());
+
+        let since = Timestamp::from(1700000000);
+        let filters = state_event_filters_for_our_repos(&repos, Some(since));
+
+        assert_eq!(filters.len(), 1);
     }
 }
-- 
cgit v1.2.3