upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs142
1 files changed, 119 insertions, 23 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index ab10c49..4d69c9a 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,6 +16,8 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase;
20
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel};
20 22
21// ============================================================================= 23// =============================================================================
@@ -99,6 +101,8 @@ pub struct SelfSubscriber {
99 action_tx: mpsc::Sender<AddFilters>, 101 action_tx: mpsc::Sender<AddFilters>,
100 /// Last time we connected - used for since filter on reconnect 102 /// Last time we connected - used for since filter on reconnect
101 last_connected: Option<Timestamp>, 103 last_connected: Option<Timestamp>,
104 /// Database for querying existing events on startup
105 database: SharedDatabase,
102} 106}
103 107
104impl SelfSubscriber { 108impl SelfSubscriber {
@@ -109,11 +113,13 @@ impl SelfSubscriber {
109 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 113 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
110 /// * `repo_sync_index` - Shared index to update with discovered repos 114 /// * `repo_sync_index` - Shared index to update with discovered repos
111 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager 115 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
116 /// * `database` - Database for querying existing events on startup
112 pub fn new( 117 pub fn new(
113 own_relay_url: String, 118 own_relay_url: String,
114 relay_domain: String, 119 relay_domain: String,
115 repo_sync_index: RepoSyncIndex, 120 repo_sync_index: RepoSyncIndex,
116 action_tx: mpsc::Sender<AddFilters>, 121 action_tx: mpsc::Sender<AddFilters>,
122 database: SharedDatabase,
117 ) -> Self { 123 ) -> Self {
118 Self { 124 Self {
119 own_relay_url, 125 own_relay_url,
@@ -121,6 +127,7 @@ impl SelfSubscriber {
121 repo_sync_index, 127 repo_sync_index,
122 action_tx, 128 action_tx,
123 last_connected: None, 129 last_connected: None,
130 database,
124 } 131 }
125 } 132 }
126 133
@@ -136,6 +143,102 @@ impl SelfSubscriber {
136 } 143 }
137 } 144 }
138 145
146 /// Load existing events from database on startup
147 ///
148 /// Queries the database with two separate queries to build the initial
149 /// PendingUpdates state. This ensures all repos get Layer 2/3 filters
150 /// created, not just those returned by the WebSocket subscription
151 /// (which has limits on the number of events returned).
152 ///
153 /// Query order:
154 /// 1. First query: Get announcements (30617) to populate repo_sync_index
155 /// with repos and their relays
156 /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event()
157 /// to add root event IDs for Layer 3 filter creation
158 ///
159 /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters.
160 async fn load_existing_events(&self) -> PendingUpdates {
161 let mut pending = PendingUpdates::new();
162
163 tracing::info!("Loading all events from database");
164
165 // First query: Get all announcements to populate repo_sync_index
166 let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement);
167
168 let announcements = match self.database.query(announcement_filter).await {
169 Ok(events) => {
170 tracing::info!(count = events.len(), "Loaded announcements from database");
171 events
172 }
173 Err(e) => {
174 tracing::error!(
175 error = %e,
176 "Failed to query announcements from database"
177 );
178 return pending;
179 }
180 };
181
182 // Process announcements
183 let mut announcements_loaded = 0;
184 for event in announcements.iter() {
185 if let Some(repo_id) = Self::extract_repo_id(event) {
186 let relays = Self::extract_relay_urls(event);
187 pending.add_repo(repo_id, relays, HashSet::new());
188 announcements_loaded += 1;
189 }
190 }
191
192 // Update repo_sync_index with announcements BEFORE querying root events
193 {
194 let mut index = self.repo_sync_index.write().await;
195 for (repo_id, needs) in &pending.repos {
196 let entry = index
197 .entry(repo_id.clone())
198 .or_insert_with(|| RepoSyncNeeds {
199 relays: HashSet::new(),
200 root_events: HashSet::new(),
201 sync_level: SyncLevel::StateOnly,
202 });
203 entry.relays.extend(needs.relays.clone());
204 }
205 }
206
207 // Second query: Get all root events for handle_root_event()
208 let root_filter =
209 Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]);
210
211 let root_events = match self.database.query(root_filter).await {
212 Ok(events) => {
213 tracing::info!(count = events.len(), "Loaded root events from database");
214 events
215 }
216 Err(e) => {
217 tracing::error!(
218 error = %e,
219 "Failed to query root events from database"
220 );
221 // Continue with just announcements
222 return pending;
223 }
224 };
225
226 // Process root events
227 let mut root_events_processed = 0;
228 for event in root_events.iter() {
229 self.handle_root_event(event, &mut pending).await;
230 root_events_processed += 1;
231 }
232
233 tracing::info!(
234 announcements_loaded = announcements_loaded,
235 root_events_processed = root_events_processed,
236 "Processed existing events from database"
237 );
238
239 pending
240 }
241
139 /// Process a relay pool notification 242 /// Process a relay pool notification
140 /// 243 ///
141 /// Handles incoming events from the subscription, queueing 30617 announcements 244 /// Handles incoming events from the subscription, queueing 30617 announcements
@@ -277,33 +380,22 @@ impl SelfSubscriber {
277 // Subscribe to announcement and root event kinds 380 // Subscribe to announcement and root event kinds
278 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) 381 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618)
279 // Plus kind 10317 (User Grasp List) for GRASP discovery 382 // Plus kind 10317 (User Grasp List) for GRASP discovery
280 // Check if we have a last_connected time for reconnect filtering 383 let mut filter = Filter::new().kinds(vec![
281 let filter = if let Some(last) = self.last_connected { 384 Kind::GitRepoAnnouncement,
385 Kind::GitPatch,
386 Kind::GitIssue,
387 Kind::GitPullRequest,
388 Kind::GitUserGraspList,
389 ]);
390 if let Some(timestamp) = self.last_connected {
282 // Quick reconnect - use since filter (15 min buffer) 391 // Quick reconnect - use since filter (15 min buffer)
283 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); 392 let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60));
284 tracing::debug!( 393 tracing::debug!(
285 since = %since, 394 since = %since,
286 "Using since filter for reconnect" 395 "Using since filter for reconnect"
287 ); 396 );
288 Filter::new() 397 filter = filter.since(since);
289 .kinds(vec![ 398 }
290 Kind::GitRepoAnnouncement, // Repository Announcements
291 Kind::GitPatch, // Patches
292 Kind::GitIssue, // Issues
293 Kind::GitPullRequest, // Pull Requests
294 Kind::GitUserGraspList, // User Grasp List
295 ])
296 .since(since)
297 } else {
298 // First connection - no since filter
299 Filter::new().kinds(vec![
300 Kind::GitRepoAnnouncement, // Repository Announcements
301 Kind::GitPatch, // Patches
302 Kind::GitIssue, // Issues
303 Kind::GitPullRequest, // Pull Requests
304 Kind::GitUserGraspList, // User Grasp List
305 ])
306 };
307 399
308 // Update last_connected AFTER creating filter but BEFORE subscribing 400 // Update last_connected AFTER creating filter but BEFORE subscribing
309 self.last_connected = Some(Timestamp::now()); 401 self.last_connected = Some(Timestamp::now());
@@ -324,7 +416,11 @@ impl SelfSubscriber {
324 416
325 let mut notifications = client.notifications(); 417 let mut notifications = client.notifications();
326 let batch_window = Self::get_batch_window(); 418 let batch_window = Self::get_batch_window();
327 let mut pending = PendingUpdates::new(); 419
420 // Load existing events from database on startup
421 // This ensures all repos get Layer 2/3 filters created, not just those
422 // returned by the WebSocket subscription (which has limits)
423 let mut pending = self.load_existing_events().await;
328 424
329 // Timer does NOT reset on new events - use interval 425 // Timer does NOT reset on new events - use interval
330 let mut timer = tokio::time::interval(batch_window); 426 let mut timer = tokio::time::interval(batch_window);