diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:46:41 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:46:41 +0000 |
| commit | c1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 (patch) | |
| tree | ac960111a09c17e901713d116e167c7fe13ff719 /src/sync | |
| parent | d3c7de6cc1c265d0ea05e59c86543fb03870ae6f (diff) | |
sync: enhance SelfSubscriber with reconnect and root event tracking
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/self_subscriber.rs | 103 |
1 files changed, 91 insertions, 12 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 1dec219..229d2e1 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -12,6 +12,7 @@ use std::collections::{HashMap, HashSet}; | |||
| 12 | use std::time::Duration; | 12 | use std::time::Duration; |
| 13 | 13 | ||
| 14 | use nostr_sdk::prelude::*; | 14 | use nostr_sdk::prelude::*; |
| 15 | use nostr_sdk::Timestamp; | ||
| 15 | use tokio::sync::mpsc; | 16 | use tokio::sync::mpsc; |
| 16 | 17 | ||
| 17 | use super::{RepoSyncIndex, RepoSyncNeeds}; | 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; |
| @@ -101,6 +102,8 @@ pub struct SelfSubscriber { | |||
| 101 | repo_sync_index: RepoSyncIndex, | 102 | repo_sync_index: RepoSyncIndex, |
| 102 | /// Channel to send actions to SyncManager | 103 | /// Channel to send actions to SyncManager |
| 103 | action_tx: mpsc::Sender<RelayAction>, | 104 | action_tx: mpsc::Sender<RelayAction>, |
| 105 | /// Last time we connected - used for since filter on reconnect | ||
| 106 | last_connected: Option<Timestamp>, | ||
| 104 | } | 107 | } |
| 105 | 108 | ||
| 106 | impl SelfSubscriber { | 109 | impl SelfSubscriber { |
| @@ -122,6 +125,7 @@ impl SelfSubscriber { | |||
| 122 | relay_domain, | 125 | relay_domain, |
| 123 | repo_sync_index, | 126 | repo_sync_index, |
| 124 | action_tx, | 127 | action_tx, |
| 128 | last_connected: None, | ||
| 125 | } | 129 | } |
| 126 | } | 130 | } |
| 127 | 131 | ||
| @@ -205,7 +209,7 @@ impl SelfSubscriber { | |||
| 205 | /// | 209 | /// |
| 206 | /// Connects to own relay, subscribes to relevant event kinds, | 210 | /// Connects to own relay, subscribes to relevant event kinds, |
| 207 | /// and batches updates before processing them. | 211 | /// and batches updates before processing them. |
| 208 | pub async fn run(self) { | 212 | pub async fn run(mut self) { |
| 209 | let client = Client::default(); | 213 | let client = Client::default(); |
| 210 | 214 | ||
| 211 | // Add own relay | 215 | // Add own relay |
| @@ -223,13 +227,36 @@ impl SelfSubscriber { | |||
| 223 | 227 | ||
| 224 | // Subscribe to announcement and root event kinds | 228 | // Subscribe to announcement and root event kinds |
| 225 | // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618) | 229 | // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618) |
| 226 | let filter = Filter::new().kinds(vec![ | 230 | // Check if we have a last_connected time for reconnect filtering |
| 227 | Kind::Custom(30617), // Repository Announcements | 231 | let filter = if let Some(last) = self.last_connected { |
| 228 | Kind::Custom(1617), // Patches | 232 | // Quick reconnect - use since filter (15 min buffer) |
| 229 | Kind::Custom(1618), // Issues | 233 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); |
| 230 | Kind::Custom(1619), // Replies/Status | 234 | tracing::debug!( |
| 231 | Kind::Custom(1621), // Pull Requests | 235 | since = %since, |
| 232 | ]); | 236 | "Using since filter for reconnect" |
| 237 | ); | ||
| 238 | Filter::new() | ||
| 239 | .kinds(vec![ | ||
| 240 | Kind::Custom(30617), // Repository Announcements | ||
| 241 | Kind::Custom(1617), // Patches | ||
| 242 | Kind::Custom(1618), // Issues | ||
| 243 | Kind::Custom(1619), // Replies/Status | ||
| 244 | Kind::Custom(1621), // Pull Requests | ||
| 245 | ]) | ||
| 246 | .since(since) | ||
| 247 | } else { | ||
| 248 | // First connection - no since filter | ||
| 249 | Filter::new().kinds(vec![ | ||
| 250 | Kind::Custom(30617), // Repository Announcements | ||
| 251 | Kind::Custom(1617), // Patches | ||
| 252 | Kind::Custom(1618), // Issues | ||
| 253 | Kind::Custom(1619), // Replies/Status | ||
| 254 | Kind::Custom(1621), // Pull Requests | ||
| 255 | ]) | ||
| 256 | }; | ||
| 257 | |||
| 258 | // Update last_connected AFTER creating filter but BEFORE subscribing | ||
| 259 | self.last_connected = Some(Timestamp::now()); | ||
| 233 | 260 | ||
| 234 | if let Err(e) = client.subscribe(filter, None).await { | 261 | if let Err(e) = client.subscribe(filter, None).await { |
| 235 | tracing::error!( | 262 | tracing::error!( |
| @@ -278,14 +305,13 @@ impl SelfSubscriber { | |||
| 278 | } | 305 | } |
| 279 | } else { | 306 | } else { |
| 280 | // For root event kinds (1617, 1618, 1619, 1621), | 307 | // For root event kinds (1617, 1618, 1619, 1621), |
| 281 | // we need to check if they reference repos we care about. | 308 | // process them to update the RepoSyncIndex |
| 282 | // For now, we'll track them in a simpler way. | ||
| 283 | // Full implementation would extract 'a' tag and match to known repos. | ||
| 284 | tracing::trace!( | 309 | tracing::trace!( |
| 285 | kind = %event.kind, | 310 | kind = %event.kind, |
| 286 | event_id = %event.id, | 311 | event_id = %event.id, |
| 287 | "Received root event (processing deferred)" | 312 | "Received root event" |
| 288 | ); | 313 | ); |
| 314 | self.handle_root_event(&event).await; | ||
| 289 | } | 315 | } |
| 290 | } | 316 | } |
| 291 | Ok(RelayPoolNotification::Shutdown) => { | 317 | Ok(RelayPoolNotification::Shutdown) => { |
| @@ -310,6 +336,59 @@ impl SelfSubscriber { | |||
| 310 | tracing::info!("SelfSubscriber stopped"); | 336 | tracing::info!("SelfSubscriber stopped"); |
| 311 | } | 337 | } |
| 312 | 338 | ||
| 339 | /// Handle a root event (1617/1618/1619/1621) | ||
| 340 | /// | ||
| 341 | /// Extracts the 'a' tag to find the repo addressable reference, | ||
| 342 | /// then updates the RepoSyncIndex with the event ID. | ||
| 343 | async fn handle_root_event(&self, event: &Event) { | ||
| 344 | // Extract 'a' tag to find the repo addressable reference | ||
| 345 | let repo_a_tag = event.tags.iter().find(|tag| { | ||
| 346 | let tag_vec = tag.as_slice(); | ||
| 347 | !tag_vec.is_empty() && tag_vec[0] == "a" | ||
| 348 | }); | ||
| 349 | |||
| 350 | let repo_ref = match repo_a_tag { | ||
| 351 | Some(tag) => { | ||
| 352 | let tag_vec = tag.as_slice(); | ||
| 353 | // Get first value from tag (the 'a' tag value at index 1) | ||
| 354 | if tag_vec.len() >= 2 { | ||
| 355 | tag_vec[1].clone() | ||
| 356 | } else { | ||
| 357 | tracing::warn!( | ||
| 358 | event_id = %event.id, | ||
| 359 | "Root event has 'a' tag but no content" | ||
| 360 | ); | ||
| 361 | return; | ||
| 362 | } | ||
| 363 | } | ||
| 364 | None => { | ||
| 365 | tracing::warn!( | ||
| 366 | event_id = %event.id, | ||
| 367 | "Root event missing 'a' tag" | ||
| 368 | ); | ||
| 369 | return; | ||
| 370 | } | ||
| 371 | }; | ||
| 372 | |||
| 373 | // Look up repo in repo_sync_index | ||
| 374 | let mut index = self.repo_sync_index.write().await; | ||
| 375 | if let Some(repo_sync) = index.get_mut(&repo_ref) { | ||
| 376 | // Add event.id to root_events set | ||
| 377 | repo_sync.root_events.insert(event.id); | ||
| 378 | tracing::debug!( | ||
| 379 | event_id = %event.id, | ||
| 380 | repo_ref = %repo_ref, | ||
| 381 | "Added root event to repo" | ||
| 382 | ); | ||
| 383 | } else { | ||
| 384 | tracing::debug!( | ||
| 385 | event_id = %event.id, | ||
| 386 | repo_ref = %repo_ref, | ||
| 387 | "Root event references unknown repo" | ||
| 388 | ); | ||
| 389 | } | ||
| 390 | } | ||
| 391 | |||
| 313 | /// Process accumulated batch | 392 | /// Process accumulated batch |
| 314 | /// | 393 | /// |
| 315 | /// Updates the RepoSyncIndex with discovered repos, then derives per-relay | 394 | /// Updates the RepoSyncIndex with discovered repos, then derives per-relay |