From 905ebd838a9ff8cc777cf3b3b6306066e8c177fc Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 26 Jan 2026 17:20:11 +0000 Subject: fix: load existing events from database on startup with two-pass queries Previously, SelfSubscriber only saw events returned by the WebSocket subscription to the local relay, which has limits on the number of events returned. This caused repos with announcements in the database to never get Layer 2/3 filters created, resulting in missing state events. Now, on startup, we query the database directly with two separate queries: 1. Query announcements (30617) to populate repo_sync_index 2. Query root events (1617/1618/1621) to create Layer 3 filters Both queries use .since(last_connected) if available for incremental loading on reconnect. Filters are created inline and made mutable to support the .since() clause, rather than using a shared create_event_filter() method. Fixes the issue where state events were missing for repos like cashbird and creative-space that had announcements in the database but weren't returned by the WebSocket subscription. 
--- src/sync/self_subscriber.rs | 167 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 144 insertions(+), 23 deletions(-) (limited to 'src/sync/self_subscriber.rs') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..e9505f1 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; +use crate::nostr::builder::SharedDatabase; + use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; // ============================================================================= @@ -98,6 +100,8 @@ pub struct SelfSubscriber { action_tx: mpsc::Sender, /// Last time we connected - used for since filter on reconnect last_connected: Option, + /// Database for querying existing events on startup + database: SharedDatabase, } impl SelfSubscriber { @@ -108,11 +112,13 @@ impl SelfSubscriber { /// * `relay_domain` - Our service domain (used for filtering relevant repos) /// * `repo_sync_index` - Shared index to update with discovered repos /// * `action_tx` - Channel to send AddFilters actions to the SyncManager + /// * `database` - Database for querying existing events on startup pub fn new( own_relay_url: String, relay_domain: String, repo_sync_index: RepoSyncIndex, action_tx: mpsc::Sender, + database: SharedDatabase, ) -> Self { Self { own_relay_url, @@ -120,6 +126,7 @@ impl SelfSubscriber { repo_sync_index, action_tx, last_connected: None, + database, } } @@ -135,6 +142,127 @@ impl SelfSubscriber { .unwrap_or(Duration::from_millis(5000)) } + /// Load existing events from database on startup + /// + /// Queries the database with two separate queries to build the initial + /// PendingUpdates state. This ensures all repos get Layer 2/3 filters + /// created, not just those returned by the WebSocket subscription + /// (which has limits on the number of events returned). + /// + /// Query order: + /// 1. 
First query: Get announcements (30617) to populate repo_sync_index + /// with repos and their relays + /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() + /// to add root event IDs for Layer 3 filter creation + /// + /// Both queries use `.since(last_connected)` if available for incremental + /// loading on reconnect. + /// + /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. + async fn load_existing_events(&self) -> PendingUpdates { + let mut pending = PendingUpdates::new(); + + // Log whether this is a full or incremental load + if let Some(since) = self.last_connected { + tracing::info!( + since = %since, + "Loading events incrementally from database (reconnect)" + ); + } else { + tracing::info!("Loading all events from database (first connection)"); + } + + // First query: Get announcements to populate repo_sync_index + let mut announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); + if let Some(timestamp) = self.last_connected { + announcement_filter = announcement_filter.since(timestamp); + } + + let announcements = match self.database.query(announcement_filter).await { + Ok(events) => { + tracing::info!( + count = events.len(), + "Loaded announcements from database" + ); + events + } + Err(e) => { + tracing::error!( + error = %e, + "Failed to query announcements from database" + ); + return pending; + } + }; + + // Process announcements + let mut announcements_loaded = 0; + for event in announcements.iter() { + if let Some(repo_id) = Self::extract_repo_id(event) { + let relays = Self::extract_relay_urls(event); + pending.add_repo(repo_id, relays, HashSet::new()); + announcements_loaded += 1; + } + } + + // Update repo_sync_index with announcements BEFORE querying root events + { + let mut index = self.repo_sync_index.write().await; + for (repo_id, needs) in &pending.repos { + let entry = index + .entry(repo_id.clone()) + .or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + 
root_events: HashSet::new(), + }); + entry.relays.extend(needs.relays.clone()); + } + } + + // Second query: Get root events for handle_root_event() + let mut root_filter = Filter::new().kinds(vec![ + Kind::GitPatch, + Kind::GitIssue, + Kind::GitPullRequest, + ]); + if let Some(timestamp) = self.last_connected { + root_filter = root_filter.since(timestamp); + } + + let root_events = match self.database.query(root_filter).await { + Ok(events) => { + tracing::info!( + count = events.len(), + "Loaded root events from database" + ); + events + } + Err(e) => { + tracing::error!( + error = %e, + "Failed to query root events from database" + ); + // Continue with just announcements + return pending; + } + }; + + // Process root events + let mut root_events_processed = 0; + for event in root_events.iter() { + self.handle_root_event(event, &mut pending).await; + root_events_processed += 1; + } + + tracing::info!( + announcements_loaded = announcements_loaded, + root_events_processed = root_events_processed, + "Processed existing events from database" + ); + + pending + } + /// Process a relay pool notification /// /// Handles incoming events from the subscription, queueing 30617 announcements @@ -276,33 +404,22 @@ impl SelfSubscriber { // Subscribe to announcement and root event kinds // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) // Plus kind 10317 (User Grasp List) for GRASP discovery - // Check if we have a last_connected time for reconnect filtering - let filter = if let Some(last) = self.last_connected { + let mut filter = Filter::new().kinds(vec![ + Kind::GitRepoAnnouncement, + Kind::GitPatch, + Kind::GitIssue, + Kind::GitPullRequest, + Kind::GitUserGraspList, + ]); + if let Some(timestamp) = self.last_connected { // Quick reconnect - use since filter (15 min buffer) - let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); + let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); tracing::debug!( since = %since, "Using since filter 
for reconnect" ); - Filter::new() - .kinds(vec![ - Kind::GitRepoAnnouncement, // Repository Announcements - Kind::GitPatch, // Patches - Kind::GitIssue, // Issues - Kind::GitPullRequest, // Pull Requests - Kind::GitUserGraspList, // User Grasp List - ]) - .since(since) - } else { - // First connection - no since filter - Filter::new().kinds(vec![ - Kind::GitRepoAnnouncement, // Repository Announcements - Kind::GitPatch, // Patches - Kind::GitIssue, // Issues - Kind::GitPullRequest, // Pull Requests - Kind::GitUserGraspList, // User Grasp List - ]) - }; + filter = filter.since(since); + } // Update last_connected AFTER creating filter but BEFORE subscribing self.last_connected = Some(Timestamp::now()); @@ -323,7 +440,11 @@ impl SelfSubscriber { let mut notifications = client.notifications(); let batch_window = Self::get_batch_window(); - let mut pending = PendingUpdates::new(); + + // Load existing events from database on startup + // This ensures all repos get Layer 2/3 filters created, not just those + // returned by the WebSocket subscription (which has limits) + let mut pending = self.load_existing_events().await; // Timer does NOT reset on new events - use interval let mut timer = tokio::time::interval(batch_window); -- cgit v1.2.3 From 6e5b7eb84b3ca8a902ac4bcbab9c2a9f9ecdee51 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 27 Jan 2026 09:16:41 +0000 Subject: fix(sync): Remove .since() filter from database queries in load_existing_events() Root cause: `last_connected` was set to Timestamp::now() BEFORE load_existing_events() was called (line 425), causing the database query to filter out all existing events with .since(current_time). The query became: SELECT * FROM events WHERE created_at >= <current_time> Result: 0 events returned (nothing has created_at in the future) Solution: Remove .since() filter from database queries entirely. 
The `last_connected` field is now only used for WebSocket subscription filters to avoid re-fetching events from remote relays on reconnect. Rationale for this approach over reordering operations: - Database queries are fast (indexed by kind and created_at) - Loading all events on startup ensures consistency - Eliminates subtle ordering dependency that could break in refactoring - Cleaner mental model: database = full load, WebSocket = incremental This fixes the issue where ~190 state events weren't being fetched after deploying the database query fix (commit 4162c90). Evidence: Production logs showed "Loaded announcements from database count=0" when there should have been hundreds of announcements. --- src/sync/self_subscriber.rs | 42 ++++++++---------------------------------- 1 file changed, 8 insertions(+), 34 deletions(-) (limited to 'src/sync/self_subscriber.rs') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index e9505f1..86e4583 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -155,35 +155,18 @@ impl SelfSubscriber { /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() /// to add root event IDs for Layer 3 filter creation /// - /// Both queries use `.since(last_connected)` if available for incremental - /// loading on reconnect. - /// /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. 
async fn load_existing_events(&self) -> PendingUpdates { let mut pending = PendingUpdates::new(); - // Log whether this is a full or incremental load - if let Some(since) = self.last_connected { - tracing::info!( - since = %since, - "Loading events incrementally from database (reconnect)" - ); - } else { - tracing::info!("Loading all events from database (first connection)"); - } + tracing::info!("Loading all events from database"); - // First query: Get announcements to populate repo_sync_index - let mut announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); - if let Some(timestamp) = self.last_connected { - announcement_filter = announcement_filter.since(timestamp); - } + // First query: Get all announcements to populate repo_sync_index + let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); let announcements = match self.database.query(announcement_filter).await { Ok(events) => { - tracing::info!( - count = events.len(), - "Loaded announcements from database" - ); + tracing::info!(count = events.len(), "Loaded announcements from database"); events } Err(e) => { @@ -219,22 +202,13 @@ impl SelfSubscriber { } } - // Second query: Get root events for handle_root_event() - let mut root_filter = Filter::new().kinds(vec![ - Kind::GitPatch, - Kind::GitIssue, - Kind::GitPullRequest, - ]); - if let Some(timestamp) = self.last_connected { - root_filter = root_filter.since(timestamp); - } + // Second query: Get all root events for handle_root_event() + let root_filter = + Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]); let root_events = match self.database.query(root_filter).await { Ok(events) => { - tracing::info!( - count = events.len(), - "Loaded root events from database" - ); + tracing::info!(count = events.len(), "Loaded root events from database"); events } Err(e) => { -- cgit v1.2.3