upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs158
1 files changed, 75 insertions, 83 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 8113035..e9e61ff 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -13,11 +13,24 @@ use std::time::Duration;
13 13
14use nostr_sdk::prelude::*; 14use nostr_sdk::prelude::*;
15use nostr_sdk::Timestamp; 15use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError;
16use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
17 18
18use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
19 20
20// ============================================================================= 21// =============================================================================
22// LoopControl - Result of notification processing
23// =============================================================================
24
25/// Control flow result from processing a notification
26enum LoopControl {
27 /// Continue processing the next notification
28 Continue,
29 /// Break out of the event loop
30 Break,
31}
32
33// =============================================================================
21// PendingUpdates - Accumulator for batching 34// PendingUpdates - Accumulator for batching
22// ============================================================================= 35// =============================================================================
23 36
@@ -118,6 +131,64 @@ impl SelfSubscriber {
118 .unwrap_or(Duration::from_millis(5000)) 131 .unwrap_or(Duration::from_millis(5000))
119 } 132 }
120 133
134 /// Process a relay pool notification
135 ///
136 /// Handles incoming events from the subscription, queueing 30617 announcements
137 /// for batch processing and immediately processing root events.
138 ///
139 /// Returns `LoopControl::Break` if the loop should exit, `LoopControl::Continue` otherwise.
140 async fn process_notification(
141 &self,
142 notification: Result<RelayPoolNotification, RecvError>,
143 pending: &mut PendingUpdates,
144 ) -> LoopControl {
145 match notification {
146 Ok(RelayPoolNotification::Event { event, .. }) => {
147 // Only process 30617 events that list our relay
148 if event.kind == Kind::Custom(30617) {
149 if !self.lists_our_relay(&event) {
150 return LoopControl::Continue;
151 }
152
153 // Extract repo ID and relays
154 if let Some(repo_id) = Self::extract_repo_id(&event) {
155 let relays = Self::extract_relay_urls(&event);
156 // 30617 announcements don't contribute to root_events - those are
157 // the 1617/1618/1619/1621 event IDs that get added when we receive
158 // root events via handle_root_event. See mod.rs:71 for details.
159 pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new());
160 tracing::info!(
161 event_id = %event.id,
162 repo_id = %repo_id,
163 relay_count = relays.len(),
164 relays = ?relays,
165 "[DIAG] Queued 30617 announcement for batch processing"
166 );
167 }
168 } else {
169 // For root event kinds (1617, 1618, 1619, 1621),
170 // process them to update the RepoSyncIndex
171 tracing::trace!(
172 kind = %event.kind,
173 event_id = %event.id,
174 "Received root event"
175 );
176 self.handle_root_event(&event).await;
177 }
178 LoopControl::Continue
179 }
180 Ok(RelayPoolNotification::Shutdown) => {
181 tracing::info!("SelfSubscriber received shutdown notification");
182 LoopControl::Break
183 }
184 Err(e) => {
185 tracing::error!(error = %e, "Error receiving notification");
186 LoopControl::Break
187 }
188 _ => LoopControl::Continue,
189 }
190 }
191
121 /// Extract relay URLs from event tags 192 /// Extract relay URLs from event tags
122 /// 193 ///
123 /// Extracts URLs from: 194 /// Extracts URLs from:
@@ -267,49 +338,8 @@ impl SelfSubscriber {
267 if let Some(ref mut rx) = shutdown_rx { 338 if let Some(ref mut rx) = shutdown_rx {
268 tokio::select! { 339 tokio::select! {
269 notification = notifications.recv() => { 340 notification = notifications.recv() => {
270 match notification { 341 if let LoopControl::Break = self.process_notification(notification, &mut pending).await {
271 Ok(RelayPoolNotification::Event { event, .. }) => { 342 break;
272 // Only process 30617 events that list our relay
273 if event.kind == Kind::Custom(30617) {
274 if !self.lists_our_relay(&event) {
275 continue;
276 }
277
278 // Extract repo ID and relays
279 if let Some(repo_id) = Self::extract_repo_id(&event) {
280 let relays = Self::extract_relay_urls(&event);
281 // 30617 announcements don't contribute to root_events - those are
282 // the 1617/1618/1619/1621 event IDs that get added when we receive
283 // root events via handle_root_event. See mod.rs:71 for details.
284 pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new());
285 tracing::info!(
286 event_id = %event.id,
287 repo_id = %repo_id,
288 relay_count = relays.len(),
289 relays = ?relays,
290 "[DIAG] Queued 30617 announcement for batch processing"
291 );
292 }
293 } else {
294 // For root event kinds (1617, 1618, 1619, 1621),
295 // process them to update the RepoSyncIndex
296 tracing::trace!(
297 kind = %event.kind,
298 event_id = %event.id,
299 "Received root event"
300 );
301 self.handle_root_event(&event).await;
302 }
303 }
304 Ok(RelayPoolNotification::Shutdown) => {
305 tracing::info!("SelfSubscriber received shutdown notification");
306 break;
307 }
308 Err(e) => {
309 tracing::error!(error = %e, "Error receiving notification");
310 break;
311 }
312 _ => {}
313 } 343 }
314 } 344 }
315 _ = timer.tick() => { 345 _ = timer.tick() => {
@@ -326,46 +356,8 @@ impl SelfSubscriber {
326 // No shutdown receiver - original behavior 356 // No shutdown receiver - original behavior
327 tokio::select! { 357 tokio::select! {
328 notification = notifications.recv() => { 358 notification = notifications.recv() => {
329 match notification { 359 if let LoopControl::Break = self.process_notification(notification, &mut pending).await {
330 Ok(RelayPoolNotification::Event { event, .. }) => { 360 break;
331 // Only process 30617 events that list our relay
332 if event.kind == Kind::Custom(30617) {
333 if !self.lists_our_relay(&event) {
334 continue;
335 }
336
337 // Extract repo ID and relays
338 if let Some(repo_id) = Self::extract_repo_id(&event) {
339 let relays = Self::extract_relay_urls(&event);
340 let mut root_events = HashSet::new();
341 root_events.insert(event.id);
342
343 pending.add_repo(repo_id, relays, root_events);
344 tracing::debug!(
345 event_id = %event.id,
346 "Queued 30617 announcement for batch processing"
347 );
348 }
349 } else {
350 // For root event kinds (1617, 1618, 1619, 1621),
351 // process them to update the RepoSyncIndex
352 tracing::trace!(
353 kind = %event.kind,
354 event_id = %event.id,
355 "Received root event"
356 );
357 self.handle_root_event(&event).await;
358 }
359 }
360 Ok(RelayPoolNotification::Shutdown) => {
361 tracing::info!("SelfSubscriber received shutdown notification");
362 break;
363 }
364 Err(e) => {
365 tracing::error!(error = %e, "Error receiving notification");
366 break;
367 }
368 _ => {}
369 } 361 }
370 } 362 }
371 _ = timer.tick() => { 363 _ = timer.tick() => {