diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/self_subscriber.rs | 158 |
1 files changed, 75 insertions, 83 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 8113035..e9e61ff 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -13,11 +13,24 @@ use std::time::Duration; | |||
| 13 | 13 | ||
| 14 | use nostr_sdk::prelude::*; | 14 | use nostr_sdk::prelude::*; |
| 15 | use nostr_sdk::Timestamp; | 15 | use nostr_sdk::Timestamp; |
| 16 | use tokio::sync::broadcast::error::RecvError; | ||
| 16 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 17 | 18 | ||
| 18 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 19 | 20 | ||
| 20 | // ============================================================================= | 21 | // ============================================================================= |
| 22 | // LoopControl - Result of notification processing | ||
| 23 | // ============================================================================= | ||
| 24 | |||
| 25 | /// Control flow result from processing a notification | ||
| 26 | enum LoopControl { | ||
| 27 | /// Continue processing the next notification | ||
| 28 | Continue, | ||
| 29 | /// Break out of the event loop | ||
| 30 | Break, | ||
| 31 | } | ||
| 32 | |||
| 33 | // ============================================================================= | ||
| 21 | // PendingUpdates - Accumulator for batching | 34 | // PendingUpdates - Accumulator for batching |
| 22 | // ============================================================================= | 35 | // ============================================================================= |
| 23 | 36 | ||
| @@ -118,6 +131,64 @@ impl SelfSubscriber { | |||
| 118 | .unwrap_or(Duration::from_millis(5000)) | 131 | .unwrap_or(Duration::from_millis(5000)) |
| 119 | } | 132 | } |
| 120 | 133 | ||
| 134 | /// Process a relay pool notification | ||
| 135 | /// | ||
| 136 | /// Handles incoming events from the subscription, queueing 30617 announcements | ||
| 137 | /// for batch processing and immediately processing root events. | ||
| 138 | /// | ||
| 139 | /// Returns `LoopControl::Break` if the loop should exit, `LoopControl::Continue` otherwise. | ||
| 140 | async fn process_notification( | ||
| 141 | &self, | ||
| 142 | notification: Result<RelayPoolNotification, RecvError>, | ||
| 143 | pending: &mut PendingUpdates, | ||
| 144 | ) -> LoopControl { | ||
| 145 | match notification { | ||
| 146 | Ok(RelayPoolNotification::Event { event, .. }) => { | ||
| 147 | // Only process 30617 events that list our relay | ||
| 148 | if event.kind == Kind::Custom(30617) { | ||
| 149 | if !self.lists_our_relay(&event) { | ||
| 150 | return LoopControl::Continue; | ||
| 151 | } | ||
| 152 | |||
| 153 | // Extract repo ID and relays | ||
| 154 | if let Some(repo_id) = Self::extract_repo_id(&event) { | ||
| 155 | let relays = Self::extract_relay_urls(&event); | ||
| 156 | // 30617 announcements don't contribute to root_events - those are | ||
| 157 | // the 1617/1618/1619/1621 event IDs that get added when we receive | ||
| 158 | // root events via handle_root_event. See mod.rs:71 for details. | ||
| 159 | pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); | ||
| 160 | tracing::info!( | ||
| 161 | event_id = %event.id, | ||
| 162 | repo_id = %repo_id, | ||
| 163 | relay_count = relays.len(), | ||
| 164 | relays = ?relays, | ||
| 165 | "[DIAG] Queued 30617 announcement for batch processing" | ||
| 166 | ); | ||
| 167 | } | ||
| 168 | } else { | ||
| 169 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 170 | // process them to update the RepoSyncIndex | ||
| 171 | tracing::trace!( | ||
| 172 | kind = %event.kind, | ||
| 173 | event_id = %event.id, | ||
| 174 | "Received root event" | ||
| 175 | ); | ||
| 176 | self.handle_root_event(&event).await; | ||
| 177 | } | ||
| 178 | LoopControl::Continue | ||
| 179 | } | ||
| 180 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 181 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 182 | LoopControl::Break | ||
| 183 | } | ||
| 184 | Err(e) => { | ||
| 185 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 186 | LoopControl::Break | ||
| 187 | } | ||
| 188 | _ => LoopControl::Continue, | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 121 | /// Extract relay URLs from event tags | 192 | /// Extract relay URLs from event tags |
| 122 | /// | 193 | /// |
| 123 | /// Extracts URLs from: | 194 | /// Extracts URLs from: |
| @@ -267,49 +338,8 @@ impl SelfSubscriber { | |||
| 267 | if let Some(ref mut rx) = shutdown_rx { | 338 | if let Some(ref mut rx) = shutdown_rx { |
| 268 | tokio::select! { | 339 | tokio::select! { |
| 269 | notification = notifications.recv() => { | 340 | notification = notifications.recv() => { |
| 270 | match notification { | 341 | if let LoopControl::Break = self.process_notification(notification, &mut pending).await { |
| 271 | Ok(RelayPoolNotification::Event { event, .. }) => { | 342 | break; |
| 272 | // Only process 30617 events that list our relay | ||
| 273 | if event.kind == Kind::Custom(30617) { | ||
| 274 | if !self.lists_our_relay(&event) { | ||
| 275 | continue; | ||
| 276 | } | ||
| 277 | |||
| 278 | // Extract repo ID and relays | ||
| 279 | if let Some(repo_id) = Self::extract_repo_id(&event) { | ||
| 280 | let relays = Self::extract_relay_urls(&event); | ||
| 281 | // 30617 announcements don't contribute to root_events - those are | ||
| 282 | // the 1617/1618/1619/1621 event IDs that get added when we receive | ||
| 283 | // root events via handle_root_event. See mod.rs:71 for details. | ||
| 284 | pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); | ||
| 285 | tracing::info!( | ||
| 286 | event_id = %event.id, | ||
| 287 | repo_id = %repo_id, | ||
| 288 | relay_count = relays.len(), | ||
| 289 | relays = ?relays, | ||
| 290 | "[DIAG] Queued 30617 announcement for batch processing" | ||
| 291 | ); | ||
| 292 | } | ||
| 293 | } else { | ||
| 294 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 295 | // process them to update the RepoSyncIndex | ||
| 296 | tracing::trace!( | ||
| 297 | kind = %event.kind, | ||
| 298 | event_id = %event.id, | ||
| 299 | "Received root event" | ||
| 300 | ); | ||
| 301 | self.handle_root_event(&event).await; | ||
| 302 | } | ||
| 303 | } | ||
| 304 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 305 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 306 | break; | ||
| 307 | } | ||
| 308 | Err(e) => { | ||
| 309 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 310 | break; | ||
| 311 | } | ||
| 312 | _ => {} | ||
| 313 | } | 343 | } |
| 314 | } | 344 | } |
| 315 | _ = timer.tick() => { | 345 | _ = timer.tick() => { |
| @@ -326,46 +356,8 @@ impl SelfSubscriber { | |||
| 326 | // No shutdown receiver - original behavior | 356 | // No shutdown receiver - original behavior |
| 327 | tokio::select! { | 357 | tokio::select! { |
| 328 | notification = notifications.recv() => { | 358 | notification = notifications.recv() => { |
| 329 | match notification { | 359 | if let LoopControl::Break = self.process_notification(notification, &mut pending).await { |
| 330 | Ok(RelayPoolNotification::Event { event, .. }) => { | 360 | break; |
| 331 | // Only process 30617 events that list our relay | ||
| 332 | if event.kind == Kind::Custom(30617) { | ||
| 333 | if !self.lists_our_relay(&event) { | ||
| 334 | continue; | ||
| 335 | } | ||
| 336 | |||
| 337 | // Extract repo ID and relays | ||
| 338 | if let Some(repo_id) = Self::extract_repo_id(&event) { | ||
| 339 | let relays = Self::extract_relay_urls(&event); | ||
| 340 | let mut root_events = HashSet::new(); | ||
| 341 | root_events.insert(event.id); | ||
| 342 | |||
| 343 | pending.add_repo(repo_id, relays, root_events); | ||
| 344 | tracing::debug!( | ||
| 345 | event_id = %event.id, | ||
| 346 | "Queued 30617 announcement for batch processing" | ||
| 347 | ); | ||
| 348 | } | ||
| 349 | } else { | ||
| 350 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 351 | // process them to update the RepoSyncIndex | ||
| 352 | tracing::trace!( | ||
| 353 | kind = %event.kind, | ||
| 354 | event_id = %event.id, | ||
| 355 | "Received root event" | ||
| 356 | ); | ||
| 357 | self.handle_root_event(&event).await; | ||
| 358 | } | ||
| 359 | } | ||
| 360 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 361 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 362 | break; | ||
| 363 | } | ||
| 364 | Err(e) => { | ||
| 365 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 366 | break; | ||
| 367 | } | ||
| 368 | _ => {} | ||
| 369 | } | 361 | } |
| 370 | } | 362 | } |
| 371 | _ = timer.tick() => { | 363 | _ = timer.tick() => { |