upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-26 17:20:11 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-27 20:38:16 +0000
commit905ebd838a9ff8cc777cf3b3b6306066e8c177fc (patch)
tree1c46ddded3b1fff935dfa80b44aeff862e624642 /src
parent1ae97cd85aec95f6270f853b28e48774cefc6bf6 (diff)
fix: load existing events from database on startup with two-pass queries
Previously, SelfSubscriber only saw events returned by the WebSocket subscription to the local relay, which has limits on the number of events returned. This caused repos with announcements in the database to never get Layer 2/3 filters created, resulting in missing state events. Now, on startup, we query the database directly with two separate queries: 1. Query announcements (30617) to populate repo_sync_index 2. Query root events (1617/1618/1621) to create Layer 3 filters Both queries use .since(last_connected) if available for incremental loading on reconnect. Filters are created inline and made mutable to support the .since() clause, rather than using a shared create_event_filter() method. Fixes the issue where state events were missing for repos like cashbird and creative-space that had announcements in the database but weren't returned by the WebSocket subscription.
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs1
-rw-r--r--src/sync/self_subscriber.rs167
2 files changed, 145 insertions, 23 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index bc8c428..226e681 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1442,6 +1442,7 @@ impl SyncManager {
1442 self.service_domain.clone(), 1442 self.service_domain.clone(),
1443 Arc::clone(&self.repo_sync_index), 1443 Arc::clone(&self.repo_sync_index),
1444 action_tx, 1444 action_tx,
1445 self.database.clone(),
1445 ); 1446 );
1446 let subscriber_shutdown = shutdown_tx.subscribe(); 1447 let subscriber_shutdown = shutdown_tx.subscribe();
1447 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); 1448 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 3cc408d..e9505f1 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,6 +16,8 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase;
20
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
20 22
21// ============================================================================= 23// =============================================================================
@@ -98,6 +100,8 @@ pub struct SelfSubscriber {
98 action_tx: mpsc::Sender<AddFilters>, 100 action_tx: mpsc::Sender<AddFilters>,
99 /// Last time we connected - used for since filter on reconnect 101 /// Last time we connected - used for since filter on reconnect
100 last_connected: Option<Timestamp>, 102 last_connected: Option<Timestamp>,
103 /// Database for querying existing events on startup
104 database: SharedDatabase,
101} 105}
102 106
103impl SelfSubscriber { 107impl SelfSubscriber {
@@ -108,11 +112,13 @@ impl SelfSubscriber {
108 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 112 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
109 /// * `repo_sync_index` - Shared index to update with discovered repos 113 /// * `repo_sync_index` - Shared index to update with discovered repos
110 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager 114 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
115 /// * `database` - Database for querying existing events on startup
111 pub fn new( 116 pub fn new(
112 own_relay_url: String, 117 own_relay_url: String,
113 relay_domain: String, 118 relay_domain: String,
114 repo_sync_index: RepoSyncIndex, 119 repo_sync_index: RepoSyncIndex,
115 action_tx: mpsc::Sender<AddFilters>, 120 action_tx: mpsc::Sender<AddFilters>,
121 database: SharedDatabase,
116 ) -> Self { 122 ) -> Self {
117 Self { 123 Self {
118 own_relay_url, 124 own_relay_url,
@@ -120,6 +126,7 @@ impl SelfSubscriber {
120 repo_sync_index, 126 repo_sync_index,
121 action_tx, 127 action_tx,
122 last_connected: None, 128 last_connected: None,
129 database,
123 } 130 }
124 } 131 }
125 132
@@ -135,6 +142,127 @@ impl SelfSubscriber {
135 .unwrap_or(Duration::from_millis(5000)) 142 .unwrap_or(Duration::from_millis(5000))
136 } 143 }
137 144
145 /// Load existing events from database on startup
146 ///
147 /// Queries the database with two separate queries to build the initial
148 /// PendingUpdates state. This ensures all repos get Layer 2/3 filters
149 /// created, not just those returned by the WebSocket subscription
150 /// (which has limits on the number of events returned).
151 ///
152 /// Query order:
153 /// 1. First query: Get announcements (30617) to populate repo_sync_index
154 /// with repos and their relays
155 /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event()
156 /// to add root event IDs for Layer 3 filter creation
157 ///
158 /// Both queries use `.since(last_connected)` if available for incremental
159 /// loading on reconnect.
160 ///
161 /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters.
162 async fn load_existing_events(&self) -> PendingUpdates {
163 let mut pending = PendingUpdates::new();
164
165 // Log whether this is a full or incremental load
166 if let Some(since) = self.last_connected {
167 tracing::info!(
168 since = %since,
169 "Loading events incrementally from database (reconnect)"
170 );
171 } else {
172 tracing::info!("Loading all events from database (first connection)");
173 }
174
175 // First query: Get announcements to populate repo_sync_index
176 let mut announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement);
177 if let Some(timestamp) = self.last_connected {
178 announcement_filter = announcement_filter.since(timestamp);
179 }
180
181 let announcements = match self.database.query(announcement_filter).await {
182 Ok(events) => {
183 tracing::info!(
184 count = events.len(),
185 "Loaded announcements from database"
186 );
187 events
188 }
189 Err(e) => {
190 tracing::error!(
191 error = %e,
192 "Failed to query announcements from database"
193 );
194 return pending;
195 }
196 };
197
198 // Process announcements
199 let mut announcements_loaded = 0;
200 for event in announcements.iter() {
201 if let Some(repo_id) = Self::extract_repo_id(event) {
202 let relays = Self::extract_relay_urls(event);
203 pending.add_repo(repo_id, relays, HashSet::new());
204 announcements_loaded += 1;
205 }
206 }
207
208 // Update repo_sync_index with announcements BEFORE querying root events
209 {
210 let mut index = self.repo_sync_index.write().await;
211 for (repo_id, needs) in &pending.repos {
212 let entry = index
213 .entry(repo_id.clone())
214 .or_insert_with(|| RepoSyncNeeds {
215 relays: HashSet::new(),
216 root_events: HashSet::new(),
217 });
218 entry.relays.extend(needs.relays.clone());
219 }
220 }
221
222 // Second query: Get root events for handle_root_event()
223 let mut root_filter = Filter::new().kinds(vec![
224 Kind::GitPatch,
225 Kind::GitIssue,
226 Kind::GitPullRequest,
227 ]);
228 if let Some(timestamp) = self.last_connected {
229 root_filter = root_filter.since(timestamp);
230 }
231
232 let root_events = match self.database.query(root_filter).await {
233 Ok(events) => {
234 tracing::info!(
235 count = events.len(),
236 "Loaded root events from database"
237 );
238 events
239 }
240 Err(e) => {
241 tracing::error!(
242 error = %e,
243 "Failed to query root events from database"
244 );
245 // Continue with just announcements
246 return pending;
247 }
248 };
249
250 // Process root events
251 let mut root_events_processed = 0;
252 for event in root_events.iter() {
253 self.handle_root_event(event, &mut pending).await;
254 root_events_processed += 1;
255 }
256
257 tracing::info!(
258 announcements_loaded = announcements_loaded,
259 root_events_processed = root_events_processed,
260 "Processed existing events from database"
261 );
262
263 pending
264 }
265
138 /// Process a relay pool notification 266 /// Process a relay pool notification
139 /// 267 ///
140 /// Handles incoming events from the subscription, queueing 30617 announcements 268 /// Handles incoming events from the subscription, queueing 30617 announcements
@@ -276,33 +404,22 @@ impl SelfSubscriber {
276 // Subscribe to announcement and root event kinds 404 // Subscribe to announcement and root event kinds
277 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) 405 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618)
278 // Plus kind 10317 (User Grasp List) for GRASP discovery 406 // Plus kind 10317 (User Grasp List) for GRASP discovery
279 // Check if we have a last_connected time for reconnect filtering 407 let mut filter = Filter::new().kinds(vec![
280 let filter = if let Some(last) = self.last_connected { 408 Kind::GitRepoAnnouncement,
409 Kind::GitPatch,
410 Kind::GitIssue,
411 Kind::GitPullRequest,
412 Kind::GitUserGraspList,
413 ]);
414 if let Some(timestamp) = self.last_connected {
281 // Quick reconnect - use since filter (15 min buffer) 415 // Quick reconnect - use since filter (15 min buffer)
282 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); 416 let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60));
283 tracing::debug!( 417 tracing::debug!(
284 since = %since, 418 since = %since,
285 "Using since filter for reconnect" 419 "Using since filter for reconnect"
286 ); 420 );
287 Filter::new() 421 filter = filter.since(since);
288 .kinds(vec![ 422 }
289 Kind::GitRepoAnnouncement, // Repository Announcements
290 Kind::GitPatch, // Patches
291 Kind::GitIssue, // Issues
292 Kind::GitPullRequest, // Pull Requests
293 Kind::GitUserGraspList, // User Grasp List
294 ])
295 .since(since)
296 } else {
297 // First connection - no since filter
298 Filter::new().kinds(vec![
299 Kind::GitRepoAnnouncement, // Repository Announcements
300 Kind::GitPatch, // Patches
301 Kind::GitIssue, // Issues
302 Kind::GitPullRequest, // Pull Requests
303 Kind::GitUserGraspList, // User Grasp List
304 ])
305 };
306 423
307 // Update last_connected AFTER creating filter but BEFORE subscribing 424 // Update last_connected AFTER creating filter but BEFORE subscribing
308 self.last_connected = Some(Timestamp::now()); 425 self.last_connected = Some(Timestamp::now());
@@ -323,7 +440,11 @@ impl SelfSubscriber {
323 440
324 let mut notifications = client.notifications(); 441 let mut notifications = client.notifications();
325 let batch_window = Self::get_batch_window(); 442 let batch_window = Self::get_batch_window();
326 let mut pending = PendingUpdates::new(); 443
444 // Load existing events from database on startup
445 // This ensures all repos get Layer 2/3 filters created, not just those
446 // returned by the WebSocket subscription (which has limits)
447 let mut pending = self.load_existing_events().await;
327 448
328 // Timer does NOT reset on new events - use interval 449 // Timer does NOT reset on new events - use interval
329 let mut timer = tokio::time::interval(batch_window); 450 let mut timer = tokio::time::interval(batch_window);