upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs103
1 files changed, 91 insertions, 12 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 1dec219..229d2e1 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -12,6 +12,7 @@ use std::collections::{HashMap, HashSet};
12use std::time::Duration; 12use std::time::Duration;
13 13
14use nostr_sdk::prelude::*; 14use nostr_sdk::prelude::*;
15use nostr_sdk::Timestamp;
15use tokio::sync::mpsc; 16use tokio::sync::mpsc;
16 17
17use super::{RepoSyncIndex, RepoSyncNeeds}; 18use super::{RepoSyncIndex, RepoSyncNeeds};
@@ -101,6 +102,8 @@ pub struct SelfSubscriber {
101 repo_sync_index: RepoSyncIndex, 102 repo_sync_index: RepoSyncIndex,
102 /// Channel to send actions to SyncManager 103 /// Channel to send actions to SyncManager
103 action_tx: mpsc::Sender<RelayAction>, 104 action_tx: mpsc::Sender<RelayAction>,
105 /// Last time we connected - used for since filter on reconnect
106 last_connected: Option<Timestamp>,
104} 107}
105 108
106impl SelfSubscriber { 109impl SelfSubscriber {
@@ -122,6 +125,7 @@ impl SelfSubscriber {
122 relay_domain, 125 relay_domain,
123 repo_sync_index, 126 repo_sync_index,
124 action_tx, 127 action_tx,
128 last_connected: None,
125 } 129 }
126 } 130 }
127 131
@@ -205,7 +209,7 @@ impl SelfSubscriber {
205 /// 209 ///
206 /// Connects to own relay, subscribes to relevant event kinds, 210 /// Connects to own relay, subscribes to relevant event kinds,
207 /// and batches updates before processing them. 211 /// and batches updates before processing them.
208 pub async fn run(self) { 212 pub async fn run(mut self) {
209 let client = Client::default(); 213 let client = Client::default();
210 214
211 // Add own relay 215 // Add own relay
@@ -223,13 +227,36 @@ impl SelfSubscriber {
223 227
224 // Subscribe to announcement and root event kinds 228 // Subscribe to announcement and root event kinds
225 // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618) 229 // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618)
226 let filter = Filter::new().kinds(vec![ 230 // Check if we have a last_connected time for reconnect filtering
227 Kind::Custom(30617), // Repository Announcements 231 let filter = if let Some(last) = self.last_connected {
228 Kind::Custom(1617), // Patches 232 // Quick reconnect - use since filter (15 min buffer)
229 Kind::Custom(1618), // Issues 233 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60));
230 Kind::Custom(1619), // Replies/Status 234 tracing::debug!(
231 Kind::Custom(1621), // Pull Requests 235 since = %since,
232 ]); 236 "Using since filter for reconnect"
237 );
238 Filter::new()
239 .kinds(vec![
240 Kind::Custom(30617), // Repository Announcements
241 Kind::Custom(1617), // Patches
242 Kind::Custom(1618), // Issues
243 Kind::Custom(1619), // Replies/Status
244 Kind::Custom(1621), // Pull Requests
245 ])
246 .since(since)
247 } else {
248 // First connection - no since filter
249 Filter::new().kinds(vec![
250 Kind::Custom(30617), // Repository Announcements
251 Kind::Custom(1617), // Patches
252 Kind::Custom(1618), // Issues
253 Kind::Custom(1619), // Replies/Status
254 Kind::Custom(1621), // Pull Requests
255 ])
256 };
257
258 // Update last_connected AFTER creating filter but BEFORE subscribing
259 self.last_connected = Some(Timestamp::now());
233 260
234 if let Err(e) = client.subscribe(filter, None).await { 261 if let Err(e) = client.subscribe(filter, None).await {
235 tracing::error!( 262 tracing::error!(
@@ -278,14 +305,13 @@ impl SelfSubscriber {
278 } 305 }
279 } else { 306 } else {
280 // For root event kinds (1617, 1618, 1619, 1621), 307 // For root event kinds (1617, 1618, 1619, 1621),
281 // we need to check if they reference repos we care about. 308 // process them to update the RepoSyncIndex
282 // For now, we'll track them in a simpler way.
283 // Full implementation would extract 'a' tag and match to known repos.
284 tracing::trace!( 309 tracing::trace!(
285 kind = %event.kind, 310 kind = %event.kind,
286 event_id = %event.id, 311 event_id = %event.id,
287 "Received root event (processing deferred)" 312 "Received root event"
288 ); 313 );
314 self.handle_root_event(&event).await;
289 } 315 }
290 } 316 }
291 Ok(RelayPoolNotification::Shutdown) => { 317 Ok(RelayPoolNotification::Shutdown) => {
@@ -310,6 +336,59 @@ impl SelfSubscriber {
310 tracing::info!("SelfSubscriber stopped"); 336 tracing::info!("SelfSubscriber stopped");
311 } 337 }
312 338
339 /// Handle a root event (1617/1618/1619/1621)
340 ///
341 /// Extracts the 'a' tag to find the repo addressable reference,
342 /// then updates the RepoSyncIndex with the event ID.
343 async fn handle_root_event(&self, event: &Event) {
344 // Extract 'a' tag to find the repo addressable reference
345 let repo_a_tag = event.tags.iter().find(|tag| {
346 let tag_vec = tag.as_slice();
347 !tag_vec.is_empty() && tag_vec[0] == "a"
348 });
349
350 let repo_ref = match repo_a_tag {
351 Some(tag) => {
352 let tag_vec = tag.as_slice();
353 // Get first value from tag (the 'a' tag value at index 1)
354 if tag_vec.len() >= 2 {
355 tag_vec[1].clone()
356 } else {
357 tracing::warn!(
358 event_id = %event.id,
359 "Root event has 'a' tag but no content"
360 );
361 return;
362 }
363 }
364 None => {
365 tracing::warn!(
366 event_id = %event.id,
367 "Root event missing 'a' tag"
368 );
369 return;
370 }
371 };
372
373 // Look up repo in repo_sync_index
374 let mut index = self.repo_sync_index.write().await;
375 if let Some(repo_sync) = index.get_mut(&repo_ref) {
376 // Add event.id to root_events set
377 repo_sync.root_events.insert(event.id);
378 tracing::debug!(
379 event_id = %event.id,
380 repo_ref = %repo_ref,
381 "Added root event to repo"
382 );
383 } else {
384 tracing::debug!(
385 event_id = %event.id,
386 repo_ref = %repo_ref,
387 "Root event references unknown repo"
388 );
389 }
390 }
391
313 /// Process accumulated batch 392 /// Process accumulated batch
314 /// 393 ///
315 /// Updates the RepoSyncIndex with discovered repos, then derives per-relay 394 /// Updates the RepoSyncIndex with discovered repos, then derives per-relay