diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-26 17:20:11 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-27 20:38:16 +0000 |
| commit | 905ebd838a9ff8cc777cf3b3b6306066e8c177fc (patch) | |
| tree | 1c46ddded3b1fff935dfa80b44aeff862e624642 | |
| parent | 1ae97cd85aec95f6270f853b28e48774cefc6bf6 (diff) | |
fix: load existing events from database on startup with two-pass queries
Previously, SelfSubscriber only saw events returned by the WebSocket
subscription to the local relay, which limits the number of events it
returns. As a result, repos whose announcements were in the database but
not in the subscription results never got Layer 2/3 filters created,
resulting in missing state events.
Now, on startup, we query the database directly with two separate queries:
1. Query announcements (30617) to populate repo_sync_index
2. Query root events (1617/1618/1621) to create Layer 3 filters
Both queries use .since(last_connected) if available for incremental
loading on reconnect.
Filters are created inline and made mutable to support the .since()
clause, rather than using a shared create_event_filter() method.
Fixes the issue where state events were missing for repos such as cashbird
and creative-space, whose announcements were in the database but were not
returned by the WebSocket subscription.
| -rw-r--r-- | src/sync/mod.rs | 1 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 167 |
2 files changed, 145 insertions, 23 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index bc8c428..226e681 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1442,6 +1442,7 @@ impl SyncManager { | |||
| 1442 | self.service_domain.clone(), | 1442 | self.service_domain.clone(), |
| 1443 | Arc::clone(&self.repo_sync_index), | 1443 | Arc::clone(&self.repo_sync_index), |
| 1444 | action_tx, | 1444 | action_tx, |
| 1445 | self.database.clone(), | ||
| 1445 | ); | 1446 | ); |
| 1446 | let subscriber_shutdown = shutdown_tx.subscribe(); | 1447 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 1447 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | 1448 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..e9505f1 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -98,6 +100,8 @@ pub struct SelfSubscriber { | |||
| 98 | action_tx: mpsc::Sender<AddFilters>, | 100 | action_tx: mpsc::Sender<AddFilters>, |
| 99 | /// Last time we connected - used for since filter on reconnect | 101 | /// Last time we connected - used for since filter on reconnect |
| 100 | last_connected: Option<Timestamp>, | 102 | last_connected: Option<Timestamp>, |
| 103 | /// Database for querying existing events on startup | ||
| 104 | database: SharedDatabase, | ||
| 101 | } | 105 | } |
| 102 | 106 | ||
| 103 | impl SelfSubscriber { | 107 | impl SelfSubscriber { |
| @@ -108,11 +112,13 @@ impl SelfSubscriber { | |||
| 108 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 112 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 109 | /// * `repo_sync_index` - Shared index to update with discovered repos | 113 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 110 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 114 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 115 | /// * `database` - Database for querying existing events on startup | ||
| 111 | pub fn new( | 116 | pub fn new( |
| 112 | own_relay_url: String, | 117 | own_relay_url: String, |
| 113 | relay_domain: String, | 118 | relay_domain: String, |
| 114 | repo_sync_index: RepoSyncIndex, | 119 | repo_sync_index: RepoSyncIndex, |
| 115 | action_tx: mpsc::Sender<AddFilters>, | 120 | action_tx: mpsc::Sender<AddFilters>, |
| 121 | database: SharedDatabase, | ||
| 116 | ) -> Self { | 122 | ) -> Self { |
| 117 | Self { | 123 | Self { |
| 118 | own_relay_url, | 124 | own_relay_url, |
| @@ -120,6 +126,7 @@ impl SelfSubscriber { | |||
| 120 | repo_sync_index, | 126 | repo_sync_index, |
| 121 | action_tx, | 127 | action_tx, |
| 122 | last_connected: None, | 128 | last_connected: None, |
| 129 | database, | ||
| 123 | } | 130 | } |
| 124 | } | 131 | } |
| 125 | 132 | ||
| @@ -135,6 +142,127 @@ impl SelfSubscriber { | |||
| 135 | .unwrap_or(Duration::from_millis(5000)) | 142 | .unwrap_or(Duration::from_millis(5000)) |
| 136 | } | 143 | } |
| 137 | 144 | ||
| 145 | /// Load existing events from database on startup | ||
| 146 | /// | ||
| 147 | /// Queries the database with two separate queries to build the initial | ||
| 148 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 149 | /// created, not just those returned by the WebSocket subscription | ||
| 150 | /// (which has limits on the number of events returned). | ||
| 151 | /// | ||
| 152 | /// Query order: | ||
| 153 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 154 | /// with repos and their relays | ||
| 155 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 156 | /// to add root event IDs for Layer 3 filter creation | ||
| 157 | /// | ||
| 158 | /// Both queries use `.since(last_connected)` if available for incremental | ||
| 159 | /// loading on reconnect. | ||
| 160 | /// | ||
| 161 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 162 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 163 | let mut pending = PendingUpdates::new(); | ||
| 164 | |||
| 165 | // Log whether this is a full or incremental load | ||
| 166 | if let Some(since) = self.last_connected { | ||
| 167 | tracing::info!( | ||
| 168 | since = %since, | ||
| 169 | "Loading events incrementally from database (reconnect)" | ||
| 170 | ); | ||
| 171 | } else { | ||
| 172 | tracing::info!("Loading all events from database (first connection)"); | ||
| 173 | } | ||
| 174 | |||
| 175 | // First query: Get announcements to populate repo_sync_index | ||
| 176 | let mut announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 177 | if let Some(timestamp) = self.last_connected { | ||
| 178 | announcement_filter = announcement_filter.since(timestamp); | ||
| 179 | } | ||
| 180 | |||
| 181 | let announcements = match self.database.query(announcement_filter).await { | ||
| 182 | Ok(events) => { | ||
| 183 | tracing::info!( | ||
| 184 | count = events.len(), | ||
| 185 | "Loaded announcements from database" | ||
| 186 | ); | ||
| 187 | events | ||
| 188 | } | ||
| 189 | Err(e) => { | ||
| 190 | tracing::error!( | ||
| 191 | error = %e, | ||
| 192 | "Failed to query announcements from database" | ||
| 193 | ); | ||
| 194 | return pending; | ||
| 195 | } | ||
| 196 | }; | ||
| 197 | |||
| 198 | // Process announcements | ||
| 199 | let mut announcements_loaded = 0; | ||
| 200 | for event in announcements.iter() { | ||
| 201 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 202 | let relays = Self::extract_relay_urls(event); | ||
| 203 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 204 | announcements_loaded += 1; | ||
| 205 | } | ||
| 206 | } | ||
| 207 | |||
| 208 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 209 | { | ||
| 210 | let mut index = self.repo_sync_index.write().await; | ||
| 211 | for (repo_id, needs) in &pending.repos { | ||
| 212 | let entry = index | ||
| 213 | .entry(repo_id.clone()) | ||
| 214 | .or_insert_with(|| RepoSyncNeeds { | ||
| 215 | relays: HashSet::new(), | ||
| 216 | root_events: HashSet::new(), | ||
| 217 | }); | ||
| 218 | entry.relays.extend(needs.relays.clone()); | ||
| 219 | } | ||
| 220 | } | ||
| 221 | |||
| 222 | // Second query: Get root events for handle_root_event() | ||
| 223 | let mut root_filter = Filter::new().kinds(vec![ | ||
| 224 | Kind::GitPatch, | ||
| 225 | Kind::GitIssue, | ||
| 226 | Kind::GitPullRequest, | ||
| 227 | ]); | ||
| 228 | if let Some(timestamp) = self.last_connected { | ||
| 229 | root_filter = root_filter.since(timestamp); | ||
| 230 | } | ||
| 231 | |||
| 232 | let root_events = match self.database.query(root_filter).await { | ||
| 233 | Ok(events) => { | ||
| 234 | tracing::info!( | ||
| 235 | count = events.len(), | ||
| 236 | "Loaded root events from database" | ||
| 237 | ); | ||
| 238 | events | ||
| 239 | } | ||
| 240 | Err(e) => { | ||
| 241 | tracing::error!( | ||
| 242 | error = %e, | ||
| 243 | "Failed to query root events from database" | ||
| 244 | ); | ||
| 245 | // Continue with just announcements | ||
| 246 | return pending; | ||
| 247 | } | ||
| 248 | }; | ||
| 249 | |||
| 250 | // Process root events | ||
| 251 | let mut root_events_processed = 0; | ||
| 252 | for event in root_events.iter() { | ||
| 253 | self.handle_root_event(event, &mut pending).await; | ||
| 254 | root_events_processed += 1; | ||
| 255 | } | ||
| 256 | |||
| 257 | tracing::info!( | ||
| 258 | announcements_loaded = announcements_loaded, | ||
| 259 | root_events_processed = root_events_processed, | ||
| 260 | "Processed existing events from database" | ||
| 261 | ); | ||
| 262 | |||
| 263 | pending | ||
| 264 | } | ||
| 265 | |||
| 138 | /// Process a relay pool notification | 266 | /// Process a relay pool notification |
| 139 | /// | 267 | /// |
| 140 | /// Handles incoming events from the subscription, queueing 30617 announcements | 268 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -276,33 +404,22 @@ impl SelfSubscriber { | |||
| 276 | // Subscribe to announcement and root event kinds | 404 | // Subscribe to announcement and root event kinds |
| 277 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 405 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 278 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 406 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 279 | // Check if we have a last_connected time for reconnect filtering | 407 | let mut filter = Filter::new().kinds(vec![ |
| 280 | let filter = if let Some(last) = self.last_connected { | 408 | Kind::GitRepoAnnouncement, |
| 409 | Kind::GitPatch, | ||
| 410 | Kind::GitIssue, | ||
| 411 | Kind::GitPullRequest, | ||
| 412 | Kind::GitUserGraspList, | ||
| 413 | ]); | ||
| 414 | if let Some(timestamp) = self.last_connected { | ||
| 281 | // Quick reconnect - use since filter (15 min buffer) | 415 | // Quick reconnect - use since filter (15 min buffer) |
| 282 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 416 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 283 | tracing::debug!( | 417 | tracing::debug!( |
| 284 | since = %since, | 418 | since = %since, |
| 285 | "Using since filter for reconnect" | 419 | "Using since filter for reconnect" |
| 286 | ); | 420 | ); |
| 287 | Filter::new() | 421 | filter = filter.since(since); |
| 288 | .kinds(vec![ | 422 | } |
| 289 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 290 | Kind::GitPatch, // Patches | ||
| 291 | Kind::GitIssue, // Issues | ||
| 292 | Kind::GitPullRequest, // Pull Requests | ||
| 293 | Kind::GitUserGraspList, // User Grasp List | ||
| 294 | ]) | ||
| 295 | .since(since) | ||
| 296 | } else { | ||
| 297 | // First connection - no since filter | ||
| 298 | Filter::new().kinds(vec![ | ||
| 299 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 300 | Kind::GitPatch, // Patches | ||
| 301 | Kind::GitIssue, // Issues | ||
| 302 | Kind::GitPullRequest, // Pull Requests | ||
| 303 | Kind::GitUserGraspList, // User Grasp List | ||
| 304 | ]) | ||
| 305 | }; | ||
| 306 | 423 | ||
| 307 | // Update last_connected AFTER creating filter but BEFORE subscribing | 424 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 308 | self.last_connected = Some(Timestamp::now()); | 425 | self.last_connected = Some(Timestamp::now()); |
| @@ -323,7 +440,11 @@ impl SelfSubscriber { | |||
| 323 | 440 | ||
| 324 | let mut notifications = client.notifications(); | 441 | let mut notifications = client.notifications(); |
| 325 | let batch_window = Self::get_batch_window(); | 442 | let batch_window = Self::get_batch_window(); |
| 326 | let mut pending = PendingUpdates::new(); | 443 | |
| 444 | // Load existing events from database on startup | ||
| 445 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 446 | // returned by the WebSocket subscription (which has limits) | ||
| 447 | let mut pending = self.load_existing_events().await; | ||
| 327 | 448 | ||
| 328 | // Timer does NOT reset on new events - use interval | 449 | // Timer does NOT reset on new events - use interval |
| 329 | let mut timer = tokio::time::interval(batch_window); | 450 | let mut timer = tokio::time::interval(batch_window); |