upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r-- src/sync/self_subscriber.rs 167
1 files changed, 144 insertions, 23 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 3cc408d..e9505f1 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,6 +16,8 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase;
20
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
20 22
21// ============================================================================= 23// =============================================================================
@@ -98,6 +100,8 @@ pub struct SelfSubscriber {
98 action_tx: mpsc::Sender<AddFilters>, 100 action_tx: mpsc::Sender<AddFilters>,
99 /// Last time we connected - used for since filter on reconnect 101 /// Last time we connected - used for since filter on reconnect
100 last_connected: Option<Timestamp>, 102 last_connected: Option<Timestamp>,
103 /// Database for querying existing events on startup
104 database: SharedDatabase,
101} 105}
102 106
103impl SelfSubscriber { 107impl SelfSubscriber {
@@ -108,11 +112,13 @@ impl SelfSubscriber {
108 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 112 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
109 /// * `repo_sync_index` - Shared index to update with discovered repos 113 /// * `repo_sync_index` - Shared index to update with discovered repos
110 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager 114 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
115 /// * `database` - Database for querying existing events on startup
111 pub fn new( 116 pub fn new(
112 own_relay_url: String, 117 own_relay_url: String,
113 relay_domain: String, 118 relay_domain: String,
114 repo_sync_index: RepoSyncIndex, 119 repo_sync_index: RepoSyncIndex,
115 action_tx: mpsc::Sender<AddFilters>, 120 action_tx: mpsc::Sender<AddFilters>,
121 database: SharedDatabase,
116 ) -> Self { 122 ) -> Self {
117 Self { 123 Self {
118 own_relay_url, 124 own_relay_url,
@@ -120,6 +126,7 @@ impl SelfSubscriber {
120 repo_sync_index, 126 repo_sync_index,
121 action_tx, 127 action_tx,
122 last_connected: None, 128 last_connected: None,
129 database,
123 } 130 }
124 } 131 }
125 132
@@ -135,6 +142,127 @@ impl SelfSubscriber {
135 .unwrap_or(Duration::from_millis(5000)) 142 .unwrap_or(Duration::from_millis(5000))
136 } 143 }
137 144
145 /// Load existing events from database on startup
146 ///
147 /// Queries the database with two separate queries to build the initial
148 /// PendingUpdates state. This ensures all repos get Layer 2/3 filters
149 /// created, not just those returned by the WebSocket subscription
150 /// (which has limits on the number of events returned).
151 ///
152 /// Query order:
153 /// 1. First query: Get announcements (30617) to populate repo_sync_index
154 /// with repos and their relays
155 /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event()
156 /// to add root event IDs for Layer 3 filter creation
157 ///
158 /// Both queries use `.since(last_connected)` if available for incremental
159 /// loading on reconnect.
160 ///
161 /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters.
162 async fn load_existing_events(&self) -> PendingUpdates {
163 let mut pending = PendingUpdates::new();
164
165 // Log whether this is a full or incremental load
166 if let Some(since) = self.last_connected {
167 tracing::info!(
168 since = %since,
169 "Loading events incrementally from database (reconnect)"
170 );
171 } else {
172 tracing::info!("Loading all events from database (first connection)");
173 }
174
175 // First query: Get announcements to populate repo_sync_index
176 let mut announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement);
177 if let Some(timestamp) = self.last_connected {
178 announcement_filter = announcement_filter.since(timestamp);
179 }
180
181 let announcements = match self.database.query(announcement_filter).await {
182 Ok(events) => {
183 tracing::info!(
184 count = events.len(),
185 "Loaded announcements from database"
186 );
187 events
188 }
189 Err(e) => {
190 tracing::error!(
191 error = %e,
192 "Failed to query announcements from database"
193 );
194 return pending;
195 }
196 };
197
198 // Process announcements
199 let mut announcements_loaded = 0;
200 for event in announcements.iter() {
201 if let Some(repo_id) = Self::extract_repo_id(event) {
202 let relays = Self::extract_relay_urls(event);
203 pending.add_repo(repo_id, relays, HashSet::new());
204 announcements_loaded += 1;
205 }
206 }
207
208 // Update repo_sync_index with announcements BEFORE querying root events
209 {
210 let mut index = self.repo_sync_index.write().await;
211 for (repo_id, needs) in &pending.repos {
212 let entry = index
213 .entry(repo_id.clone())
214 .or_insert_with(|| RepoSyncNeeds {
215 relays: HashSet::new(),
216 root_events: HashSet::new(),
217 });
218 entry.relays.extend(needs.relays.clone());
219 }
220 }
221
222 // Second query: Get root events for handle_root_event()
223 let mut root_filter = Filter::new().kinds(vec![
224 Kind::GitPatch,
225 Kind::GitIssue,
226 Kind::GitPullRequest,
227 ]);
228 if let Some(timestamp) = self.last_connected {
229 root_filter = root_filter.since(timestamp);
230 }
231
232 let root_events = match self.database.query(root_filter).await {
233 Ok(events) => {
234 tracing::info!(
235 count = events.len(),
236 "Loaded root events from database"
237 );
238 events
239 }
240 Err(e) => {
241 tracing::error!(
242 error = %e,
243 "Failed to query root events from database"
244 );
245 // Continue with just announcements
246 return pending;
247 }
248 };
249
250 // Process root events
251 let mut root_events_processed = 0;
252 for event in root_events.iter() {
253 self.handle_root_event(event, &mut pending).await;
254 root_events_processed += 1;
255 }
256
257 tracing::info!(
258 announcements_loaded = announcements_loaded,
259 root_events_processed = root_events_processed,
260 "Processed existing events from database"
261 );
262
263 pending
264 }
265
138 /// Process a relay pool notification 266 /// Process a relay pool notification
139 /// 267 ///
140 /// Handles incoming events from the subscription, queueing 30617 announcements 268 /// Handles incoming events from the subscription, queueing 30617 announcements
@@ -276,33 +404,22 @@ impl SelfSubscriber {
276 // Subscribe to announcement and root event kinds 404 // Subscribe to announcement and root event kinds
277 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) 405 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618)
278 // Plus kind 10317 (User Grasp List) for GRASP discovery 406 // Plus kind 10317 (User Grasp List) for GRASP discovery
279 // Check if we have a last_connected time for reconnect filtering 407 let mut filter = Filter::new().kinds(vec![
280 let filter = if let Some(last) = self.last_connected { 408 Kind::GitRepoAnnouncement,
409 Kind::GitPatch,
410 Kind::GitIssue,
411 Kind::GitPullRequest,
412 Kind::GitUserGraspList,
413 ]);
414 if let Some(timestamp) = self.last_connected {
281 // Quick reconnect - use since filter (15 min buffer) 415 // Quick reconnect - use since filter (15 min buffer)
282 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); 416 let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60));
283 tracing::debug!( 417 tracing::debug!(
284 since = %since, 418 since = %since,
285 "Using since filter for reconnect" 419 "Using since filter for reconnect"
286 ); 420 );
287 Filter::new() 421 filter = filter.since(since);
288 .kinds(vec![ 422 }
289 Kind::GitRepoAnnouncement, // Repository Announcements
290 Kind::GitPatch, // Patches
291 Kind::GitIssue, // Issues
292 Kind::GitPullRequest, // Pull Requests
293 Kind::GitUserGraspList, // User Grasp List
294 ])
295 .since(since)
296 } else {
297 // First connection - no since filter
298 Filter::new().kinds(vec![
299 Kind::GitRepoAnnouncement, // Repository Announcements
300 Kind::GitPatch, // Patches
301 Kind::GitIssue, // Issues
302 Kind::GitPullRequest, // Pull Requests
303 Kind::GitUserGraspList, // User Grasp List
304 ])
305 };
306 423
307 // Update last_connected AFTER creating filter but BEFORE subscribing 424 // Update last_connected AFTER creating filter but BEFORE subscribing
308 self.last_connected = Some(Timestamp::now()); 425 self.last_connected = Some(Timestamp::now());
@@ -323,7 +440,11 @@ impl SelfSubscriber {
323 440
324 let mut notifications = client.notifications(); 441 let mut notifications = client.notifications();
325 let batch_window = Self::get_batch_window(); 442 let batch_window = Self::get_batch_window();
326 let mut pending = PendingUpdates::new(); 443
444 // Load existing events from database on startup
445 // This ensures all repos get Layer 2/3 filters created, not just those
446 // returned by the WebSocket subscription (which has limits)
447 let mut pending = self.load_existing_events().await;
327 448
328 // Timer does NOT reset on new events - use interval 449 // Timer does NOT reset on new events - use interval
329 let mut timer = tokio::time::interval(batch_window); 450 let mut timer = tokio::time::interval(batch_window);