diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-03 14:50:22 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-03 15:18:23 +0000 |
| commit | 874a8abe1d076cfafd9baf919ec23d7d58200698 (patch) | |
| tree | dce0d0d36bddc496ff32f8555a8790d8dc7be7e4 /src/sync/self_subscriber.rs | |
| parent | 9fd4350c57bbe986ebf65bf3ea4c996572e81884 (diff) | |
| parent | 92a9a3bfe0bc522e8ae411991a366a3a6310d525 (diff) | |
Merge relay.ngit.dev migration: bug fixes and migration tooling
This merge includes critical bug fixes and comprehensive migration tooling
developed during the relay.ngit.dev migration effort.
Bug Fixes:
- Fix git protocol error handling to return HTTP 200 with ERR pkt-line
- Fix naughty list false positives and DNS failure identification
- Fix database query filters in load_existing_events (remove .since())
- Fix OID fetch tracking to distinguish 0 OIDs from successful fetches
- Fix purgatory event source tracking for filtered expiry logging
- Implement OID retry logic for 'not our ref' errors
Migration Tools & Documentation:
- Complete 5-phase migration analysis pipeline with orchestration script
- Phase 1: Event fetching from source relay
- Phase 2: Git sync verification
- Phase 3: Categorization and relay comparison
- Phase 4: Log extraction (parse failures, purgatory expiry)
- Phase 5: Action classification for migration decisions
- Comprehensive migration guide with lessons learned
- Troubleshooting guide for permission and corruption issues
Configuration:
- Add NGIT_LOG_LEVEL configuration option
- Update git throttle limits to 60/minute
- Improve logging throughout for better observability
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 141 |
1 file changed, 118 insertions, 23 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..86e4583 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -98,6 +100,8 @@ pub struct SelfSubscriber { | |||
| 98 | action_tx: mpsc::Sender<AddFilters>, | 100 | action_tx: mpsc::Sender<AddFilters>, |
| 99 | /// Last time we connected - used for since filter on reconnect | 101 | /// Last time we connected - used for since filter on reconnect |
| 100 | last_connected: Option<Timestamp>, | 102 | last_connected: Option<Timestamp>, |
| 103 | /// Database for querying existing events on startup | ||
| 104 | database: SharedDatabase, | ||
| 101 | } | 105 | } |
| 102 | 106 | ||
| 103 | impl SelfSubscriber { | 107 | impl SelfSubscriber { |
| @@ -108,11 +112,13 @@ impl SelfSubscriber { | |||
| 108 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 112 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 109 | /// * `repo_sync_index` - Shared index to update with discovered repos | 113 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 110 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 114 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 115 | /// * `database` - Database for querying existing events on startup | ||
| 111 | pub fn new( | 116 | pub fn new( |
| 112 | own_relay_url: String, | 117 | own_relay_url: String, |
| 113 | relay_domain: String, | 118 | relay_domain: String, |
| 114 | repo_sync_index: RepoSyncIndex, | 119 | repo_sync_index: RepoSyncIndex, |
| 115 | action_tx: mpsc::Sender<AddFilters>, | 120 | action_tx: mpsc::Sender<AddFilters>, |
| 121 | database: SharedDatabase, | ||
| 116 | ) -> Self { | 122 | ) -> Self { |
| 117 | Self { | 123 | Self { |
| 118 | own_relay_url, | 124 | own_relay_url, |
| @@ -120,6 +126,7 @@ impl SelfSubscriber { | |||
| 120 | repo_sync_index, | 126 | repo_sync_index, |
| 121 | action_tx, | 127 | action_tx, |
| 122 | last_connected: None, | 128 | last_connected: None, |
| 129 | database, | ||
| 123 | } | 130 | } |
| 124 | } | 131 | } |
| 125 | 132 | ||
| @@ -135,6 +142,101 @@ impl SelfSubscriber { | |||
| 135 | .unwrap_or(Duration::from_millis(5000)) | 142 | .unwrap_or(Duration::from_millis(5000)) |
| 136 | } | 143 | } |
| 137 | 144 | ||
| 145 | /// Load existing events from database on startup | ||
| 146 | /// | ||
| 147 | /// Queries the database with two separate queries to build the initial | ||
| 148 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 149 | /// created, not just those returned by the WebSocket subscription | ||
| 150 | /// (which has limits on the number of events returned). | ||
| 151 | /// | ||
| 152 | /// Query order: | ||
| 153 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 154 | /// with repos and their relays | ||
| 155 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 156 | /// to add root event IDs for Layer 3 filter creation | ||
| 157 | /// | ||
| 158 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 159 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 160 | let mut pending = PendingUpdates::new(); | ||
| 161 | |||
| 162 | tracing::info!("Loading all events from database"); | ||
| 163 | |||
| 164 | // First query: Get all announcements to populate repo_sync_index | ||
| 165 | let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 166 | |||
| 167 | let announcements = match self.database.query(announcement_filter).await { | ||
| 168 | Ok(events) => { | ||
| 169 | tracing::info!(count = events.len(), "Loaded announcements from database"); | ||
| 170 | events | ||
| 171 | } | ||
| 172 | Err(e) => { | ||
| 173 | tracing::error!( | ||
| 174 | error = %e, | ||
| 175 | "Failed to query announcements from database" | ||
| 176 | ); | ||
| 177 | return pending; | ||
| 178 | } | ||
| 179 | }; | ||
| 180 | |||
| 181 | // Process announcements | ||
| 182 | let mut announcements_loaded = 0; | ||
| 183 | for event in announcements.iter() { | ||
| 184 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 185 | let relays = Self::extract_relay_urls(event); | ||
| 186 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 187 | announcements_loaded += 1; | ||
| 188 | } | ||
| 189 | } | ||
| 190 | |||
| 191 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 192 | { | ||
| 193 | let mut index = self.repo_sync_index.write().await; | ||
| 194 | for (repo_id, needs) in &pending.repos { | ||
| 195 | let entry = index | ||
| 196 | .entry(repo_id.clone()) | ||
| 197 | .or_insert_with(|| RepoSyncNeeds { | ||
| 198 | relays: HashSet::new(), | ||
| 199 | root_events: HashSet::new(), | ||
| 200 | }); | ||
| 201 | entry.relays.extend(needs.relays.clone()); | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | // Second query: Get all root events for handle_root_event() | ||
| 206 | let root_filter = | ||
| 207 | Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]); | ||
| 208 | |||
| 209 | let root_events = match self.database.query(root_filter).await { | ||
| 210 | Ok(events) => { | ||
| 211 | tracing::info!(count = events.len(), "Loaded root events from database"); | ||
| 212 | events | ||
| 213 | } | ||
| 214 | Err(e) => { | ||
| 215 | tracing::error!( | ||
| 216 | error = %e, | ||
| 217 | "Failed to query root events from database" | ||
| 218 | ); | ||
| 219 | // Return early with just the announcements loaded so far | ||
| 220 | return pending; | ||
| 221 | } | ||
| 222 | }; | ||
| 223 | |||
| 224 | // Process root events | ||
| 225 | let mut root_events_processed = 0; | ||
| 226 | for event in root_events.iter() { | ||
| 227 | self.handle_root_event(event, &mut pending).await; | ||
| 228 | root_events_processed += 1; | ||
| 229 | } | ||
| 230 | |||
| 231 | tracing::info!( | ||
| 232 | announcements_loaded = announcements_loaded, | ||
| 233 | root_events_processed = root_events_processed, | ||
| 234 | "Processed existing events from database" | ||
| 235 | ); | ||
| 236 | |||
| 237 | pending | ||
| 238 | } | ||
| 239 | |||
| 138 | /// Process a relay pool notification | 240 | /// Process a relay pool notification |
| 139 | /// | 241 | /// |
| 140 | /// Handles incoming events from the subscription, queueing 30617 announcements | 242 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -276,33 +378,22 @@ impl SelfSubscriber { | |||
| 276 | // Subscribe to announcement and root event kinds | 378 | // Subscribe to announcement and root event kinds |
| 277 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 379 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 278 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 380 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 279 | // Check if we have a last_connected time for reconnect filtering | 381 | let mut filter = Filter::new().kinds(vec![ |
| 280 | let filter = if let Some(last) = self.last_connected { | 382 | Kind::GitRepoAnnouncement, |
| 383 | Kind::GitPatch, | ||
| 384 | Kind::GitIssue, | ||
| 385 | Kind::GitPullRequest, | ||
| 386 | Kind::GitUserGraspList, | ||
| 387 | ]); | ||
| 388 | if let Some(timestamp) = self.last_connected { | ||
| 281 | // Quick reconnect - use since filter (15 min buffer) | 389 | // Quick reconnect - use since filter (15 min buffer) |
| 282 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 390 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 283 | tracing::debug!( | 391 | tracing::debug!( |
| 284 | since = %since, | 392 | since = %since, |
| 285 | "Using since filter for reconnect" | 393 | "Using since filter for reconnect" |
| 286 | ); | 394 | ); |
| 287 | Filter::new() | 395 | filter = filter.since(since); |
| 288 | .kinds(vec![ | 396 | } |
| 289 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 290 | Kind::GitPatch, // Patches | ||
| 291 | Kind::GitIssue, // Issues | ||
| 292 | Kind::GitPullRequest, // Pull Requests | ||
| 293 | Kind::GitUserGraspList, // User Grasp List | ||
| 294 | ]) | ||
| 295 | .since(since) | ||
| 296 | } else { | ||
| 297 | // First connection - no since filter | ||
| 298 | Filter::new().kinds(vec![ | ||
| 299 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 300 | Kind::GitPatch, // Patches | ||
| 301 | Kind::GitIssue, // Issues | ||
| 302 | Kind::GitPullRequest, // Pull Requests | ||
| 303 | Kind::GitUserGraspList, // User Grasp List | ||
| 304 | ]) | ||
| 305 | }; | ||
| 306 | 397 | ||
| 307 | // Update last_connected AFTER creating filter but BEFORE subscribing | 398 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 308 | self.last_connected = Some(Timestamp::now()); | 399 | self.last_connected = Some(Timestamp::now()); |
| @@ -323,7 +414,11 @@ impl SelfSubscriber { | |||
| 323 | 414 | ||
| 324 | let mut notifications = client.notifications(); | 415 | let mut notifications = client.notifications(); |
| 325 | let batch_window = Self::get_batch_window(); | 416 | let batch_window = Self::get_batch_window(); |
| 326 | let mut pending = PendingUpdates::new(); | 417 | |
| 418 | // Load existing events from database on startup | ||
| 419 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 420 | // returned by the WebSocket subscription (which has limits) | ||
| 421 | let mut pending = self.load_existing_events().await; | ||
| 327 | 422 | ||
| 328 | // Timer does NOT reset on new events - use interval | 423 | // Timer does NOT reset on new events - use interval |
| 329 | let mut timer = tokio::time::interval(batch_window); | 424 | let mut timer = tokio::time::interval(batch_window); |