upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2026-02-03 14:50:22 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-02-03 15:18:23 +0000
commit: 874a8abe1d076cfafd9baf919ec23d7d58200698 (patch)
tree: dce0d0d36bddc496ff32f8555a8790d8dc7be7e4 /src/sync/self_subscriber.rs
parent: 9fd4350c57bbe986ebf65bf3ea4c996572e81884 (diff)
parent: 92a9a3bfe0bc522e8ae411991a366a3a6310d525 (diff)
Merge relay.ngit.dev migration: bug fixes and migration tooling
This merge includes critical bug fixes and comprehensive migration tooling
developed during the relay.ngit.dev migration effort.

Bug Fixes:
- Fix git protocol error handling to return HTTP 200 with ERR pkt-line
- Fix naughty list false positives and DNS failure identification
- Fix database query filters in load_existing_events (remove .since())
- Fix OID fetch tracking to distinguish 0 OIDs from successful fetches
- Fix purgatory event source tracking for filtered expiry logging
- Implement OID retry logic for 'not our ref' errors

Migration Tools & Documentation:
- Complete 5-phase migration analysis pipeline with orchestration script
- Phase 1: Event fetching from source relay
- Phase 2: Git sync verification
- Phase 3: Categorization and relay comparison
- Phase 4: Log extraction (parse failures, purgatory expiry)
- Phase 5: Action classification for migration decisions
- Comprehensive migration guide with lessons learned
- Troubleshooting guide for permission and corruption issues

Configuration:
- Add NGIT_LOG_LEVEL configuration option
- Update git throttle limits to 60/minute
- Improve logging throughout for better observability
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r-- src/sync/self_subscriber.rs 141
1 files changed, 118 insertions, 23 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 3cc408d..86e4583 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,6 +16,8 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase;
20
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
20 22
21// ============================================================================= 23// =============================================================================
@@ -98,6 +100,8 @@ pub struct SelfSubscriber {
98 action_tx: mpsc::Sender<AddFilters>, 100 action_tx: mpsc::Sender<AddFilters>,
99 /// Last time we connected - used for since filter on reconnect 101 /// Last time we connected - used for since filter on reconnect
100 last_connected: Option<Timestamp>, 102 last_connected: Option<Timestamp>,
103 /// Database for querying existing events on startup
104 database: SharedDatabase,
101} 105}
102 106
103impl SelfSubscriber { 107impl SelfSubscriber {
@@ -108,11 +112,13 @@ impl SelfSubscriber {
108 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 112 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
109 /// * `repo_sync_index` - Shared index to update with discovered repos 113 /// * `repo_sync_index` - Shared index to update with discovered repos
110 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager 114 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
115 /// * `database` - Database for querying existing events on startup
111 pub fn new( 116 pub fn new(
112 own_relay_url: String, 117 own_relay_url: String,
113 relay_domain: String, 118 relay_domain: String,
114 repo_sync_index: RepoSyncIndex, 119 repo_sync_index: RepoSyncIndex,
115 action_tx: mpsc::Sender<AddFilters>, 120 action_tx: mpsc::Sender<AddFilters>,
121 database: SharedDatabase,
116 ) -> Self { 122 ) -> Self {
117 Self { 123 Self {
118 own_relay_url, 124 own_relay_url,
@@ -120,6 +126,7 @@ impl SelfSubscriber {
120 repo_sync_index, 126 repo_sync_index,
121 action_tx, 127 action_tx,
122 last_connected: None, 128 last_connected: None,
129 database,
123 } 130 }
124 } 131 }
125 132
@@ -135,6 +142,101 @@ impl SelfSubscriber {
135 .unwrap_or(Duration::from_millis(5000)) 142 .unwrap_or(Duration::from_millis(5000))
136 } 143 }
137 144
145 /// Load existing events from database on startup
146 ///
147 /// Queries the database with two separate queries to build the initial
148 /// PendingUpdates state. This ensures all repos get Layer 2/3 filters
149 /// created, not just those returned by the WebSocket subscription
150 /// (which has limits on the number of events returned).
151 ///
152 /// Query order:
153 /// 1. First query: Get announcements (30617) to populate repo_sync_index
154 /// with repos and their relays
155 /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event()
156 /// to add root event IDs for Layer 3 filter creation
157 ///
158 /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters.
159 async fn load_existing_events(&self) -> PendingUpdates {
160 let mut pending = PendingUpdates::new();
161
162 tracing::info!("Loading all events from database");
163
164 // First query: Get all announcements to populate repo_sync_index
165 let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement);
166
167 let announcements = match self.database.query(announcement_filter).await {
168 Ok(events) => {
169 tracing::info!(count = events.len(), "Loaded announcements from database");
170 events
171 }
172 Err(e) => {
173 tracing::error!(
174 error = %e,
175 "Failed to query announcements from database"
176 );
177 return pending;
178 }
179 };
180
181 // Process announcements
182 let mut announcements_loaded = 0;
183 for event in announcements.iter() {
184 if let Some(repo_id) = Self::extract_repo_id(event) {
185 let relays = Self::extract_relay_urls(event);
186 pending.add_repo(repo_id, relays, HashSet::new());
187 announcements_loaded += 1;
188 }
189 }
190
191 // Update repo_sync_index with announcements BEFORE querying root events
192 {
193 let mut index = self.repo_sync_index.write().await;
194 for (repo_id, needs) in &pending.repos {
195 let entry = index
196 .entry(repo_id.clone())
197 .or_insert_with(|| RepoSyncNeeds {
198 relays: HashSet::new(),
199 root_events: HashSet::new(),
200 });
201 entry.relays.extend(needs.relays.clone());
202 }
203 }
204
205 // Second query: Get all root events for handle_root_event()
206 let root_filter =
207 Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]);
208
209 let root_events = match self.database.query(root_filter).await {
210 Ok(events) => {
211 tracing::info!(count = events.len(), "Loaded root events from database");
212 events
213 }
214 Err(e) => {
215 tracing::error!(
216 error = %e,
217 "Failed to query root events from database"
218 );
219 // Continue with just announcements
220 return pending;
221 }
222 };
223
224 // Process root events
225 let mut root_events_processed = 0;
226 for event in root_events.iter() {
227 self.handle_root_event(event, &mut pending).await;
228 root_events_processed += 1;
229 }
230
231 tracing::info!(
232 announcements_loaded = announcements_loaded,
233 root_events_processed = root_events_processed,
234 "Processed existing events from database"
235 );
236
237 pending
238 }
239
138 /// Process a relay pool notification 240 /// Process a relay pool notification
139 /// 241 ///
140 /// Handles incoming events from the subscription, queueing 30617 announcements 242 /// Handles incoming events from the subscription, queueing 30617 announcements
@@ -276,33 +378,22 @@ impl SelfSubscriber {
276 // Subscribe to announcement and root event kinds 378 // Subscribe to announcement and root event kinds
277 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) 379 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618)
278 // Plus kind 10317 (User Grasp List) for GRASP discovery 380 // Plus kind 10317 (User Grasp List) for GRASP discovery
279 // Check if we have a last_connected time for reconnect filtering 381 let mut filter = Filter::new().kinds(vec![
280 let filter = if let Some(last) = self.last_connected { 382 Kind::GitRepoAnnouncement,
383 Kind::GitPatch,
384 Kind::GitIssue,
385 Kind::GitPullRequest,
386 Kind::GitUserGraspList,
387 ]);
388 if let Some(timestamp) = self.last_connected {
281 // Quick reconnect - use since filter (15 min buffer) 389 // Quick reconnect - use since filter (15 min buffer)
282 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); 390 let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60));
283 tracing::debug!( 391 tracing::debug!(
284 since = %since, 392 since = %since,
285 "Using since filter for reconnect" 393 "Using since filter for reconnect"
286 ); 394 );
287 Filter::new() 395 filter = filter.since(since);
288 .kinds(vec![ 396 }
289 Kind::GitRepoAnnouncement, // Repository Announcements
290 Kind::GitPatch, // Patches
291 Kind::GitIssue, // Issues
292 Kind::GitPullRequest, // Pull Requests
293 Kind::GitUserGraspList, // User Grasp List
294 ])
295 .since(since)
296 } else {
297 // First connection - no since filter
298 Filter::new().kinds(vec![
299 Kind::GitRepoAnnouncement, // Repository Announcements
300 Kind::GitPatch, // Patches
301 Kind::GitIssue, // Issues
302 Kind::GitPullRequest, // Pull Requests
303 Kind::GitUserGraspList, // User Grasp List
304 ])
305 };
306 397
307 // Update last_connected AFTER creating filter but BEFORE subscribing 398 // Update last_connected AFTER creating filter but BEFORE subscribing
308 self.last_connected = Some(Timestamp::now()); 399 self.last_connected = Some(Timestamp::now());
@@ -323,7 +414,11 @@ impl SelfSubscriber {
323 414
324 let mut notifications = client.notifications(); 415 let mut notifications = client.notifications();
325 let batch_window = Self::get_batch_window(); 416 let batch_window = Self::get_batch_window();
326 let mut pending = PendingUpdates::new(); 417
418 // Load existing events from database on startup
419 // This ensures all repos get Layer 2/3 filters created, not just those
420 // returned by the WebSocket subscription (which has limits)
421 let mut pending = self.load_existing_events().await;
327 422
328 // Timer does NOT reset on new events - use interval 423 // Timer does NOT reset on new events - use interval
329 let mut timer = tokio::time::interval(batch_window); 424 let mut timer = tokio::time::interval(batch_window);