upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 10:33:07 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 10:33:07 +0000
commit586fc2a7df1ce256469f0742d23f687ac4b075b1 (patch)
treedc07dbec88ea1ca2e80b4ced91831256bb68ce4e /src/sync/self_subscriber.rs
parent2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (diff)
stub of sync v4
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs497
1 files changed, 0 insertions, 497 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
deleted file mode 100644
index 0512088..0000000
--- a/src/sync/self_subscriber.rs
+++ /dev/null
@@ -1,497 +0,0 @@
1//! Self-Subscriber for Proactive Sync
2//!
3//! This module handles subscribing to our own relay to detect new events
4//! and trigger relay discovery from announcements.
5
6use std::collections::{HashMap, HashSet};
7use std::time::Duration;
8
9use nostr_sdk::prelude::*;
10use tokio::sync::mpsc;
11use tokio::time::Instant;
12
13use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
14
15use super::{FollowingRepoRootEvents, SyncManager, SyncRelays};
16
17// =============================================================================
18// Types
19// =============================================================================
20
21/// Actions to be taken by the SyncManager based on self-subscription events.
22#[derive(Debug, Clone)]
23pub enum RelayAction {
24 /// Spawn a new relay connection to sync from.
25 /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering.
26 SpawnRelay {
27 relay_url: String,
28 repos_and_root_events: HashMap<String, HashSet<EventId>>,
29 },
30 /// Add filters to an existing relay connection.
31 /// Contains: relay_url, additional repos to add.
32 AddFilters {
33 relay_url: String,
34 repos_and_new_root_event: HashMap<String, HashSet<EventId>>,
35 },
36}
37
38/// Pending updates collected during batch window.
39#[derive(Debug, Default)]
40struct PendingUpdates {
41 /// New announcements (kind 30617) - triggers relay discovery
42 announcements: Vec<Event>,
43 /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set
44 root_events: Vec<Event>,
45}
46
47// =============================================================================
48// SelfSubscriber
49// =============================================================================
50
51/// Subscribes to our own relay to detect new events.
52///
53/// The self-subscriber:
54/// 1. Connects to our own relay
55/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618)
56/// 3. When events arrive, batches them
57/// 4. On batch timer fire, processes updates and sends relay actions
58pub struct SelfSubscriber {
59 /// URL of our own relay to subscribe to
60 own_relay_url: String,
61 /// Our relay domain for checking if announcements list us
62 relay_domain: String,
63 /// Reference to following repo root events (shared with SyncManager)
64 following_repo_root_events: FollowingRepoRootEvents,
65 /// Reference to sync relays (shared with SyncManager)
66 sync_relays: SyncRelays,
67 /// Channel to send relay actions back to manager
68 action_tx: mpsc::Sender<RelayAction>,
69}
70
71impl SelfSubscriber {
72 /// Create a new self-subscriber.
73 pub fn new(
74 own_relay_url: String,
75 relay_domain: String,
76 following_repo_root_events: FollowingRepoRootEvents,
77 sync_relays: SyncRelays,
78 action_tx: mpsc::Sender<RelayAction>,
79 ) -> Self {
80 Self {
81 own_relay_url,
82 relay_domain,
83 following_repo_root_events,
84 sync_relays,
85 action_tx,
86 }
87 }
88
89 /// Get the batch window duration from environment variable.
90 ///
91 /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS
92 /// for faster tests (typically 200ms).
93 fn get_batch_window() -> Duration {
94 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS")
95 .ok()
96 .and_then(|s| s.parse().ok())
97 .map(Duration::from_millis)
98 .unwrap_or(Duration::from_secs(5))
99 }
100
101 /// Run the self-subscriber event loop.
102 ///
103 /// This method:
104 /// 1. Connects to our own relay
105 /// 2. Subscribes to relevant event kinds
106 /// 3. Receives events and batches them
107 /// 4. On batch timer fire, processes and sends relay actions
108 pub async fn run(self) {
109 tracing::info!("SelfSubscriber starting for {}", self.own_relay_url);
110
111 // Create nostr-sdk client
112 let keys = Keys::generate();
113 let client = Client::new(keys);
114
115 // Connect to our own relay
116 if let Err(e) = client.add_relay(&self.own_relay_url).await {
117 tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e);
118 return;
119 }
120
121 client.connect().await;
122
123 // Wait for connection
124 let mut connected = false;
125 for _ in 0..30 {
126 tokio::time::sleep(Duration::from_millis(100)).await;
127 let relays = client.relays().await;
128 if relays.values().any(|r| r.is_connected()) {
129 connected = true;
130 break;
131 }
132 }
133
134 if !connected {
135 tracing::error!(
136 "Failed to connect to own relay {} after 3 seconds",
137 self.own_relay_url
138 );
139 return;
140 }
141
142 tracing::info!("SelfSubscriber connected to {}", self.own_relay_url);
143
144 // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design)
145 let filter = Filter::new()
146 .kinds([
147 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617
148 Kind::GitPatch, // 1617
149 Kind::Custom(KIND_PR), // 1618
150 Kind::Custom(KIND_PR_UPDATE), // 1619
151 Kind::GitIssue, // 1621
152 ])
153 .since(Timestamp::now());
154
155 if let Err(e) = client.subscribe(filter, None).await {
156 tracing::error!("Failed to subscribe to own relay: {}", e);
157 return;
158 }
159
160 tracing::info!("SelfSubscriber subscribed to event kinds on own relay");
161
162 // Batch state
163 let mut pending = PendingUpdates::default();
164 let mut batch_timer_started: Option<Instant> = None;
165 let batch_window = Self::get_batch_window();
166
167 // Main event loop using notifications stream
168 loop {
169 // Calculate timeout for batch processing
170 let timeout = if let Some(started) = batch_timer_started {
171 let elapsed = started.elapsed();
172 if elapsed >= batch_window {
173 Duration::ZERO
174 } else {
175 batch_window - elapsed
176 }
177 } else {
178 Duration::from_secs(60) // Long timeout when no batch pending
179 };
180
181 // Wait for notification with timeout
182 let notification = tokio::time::timeout(timeout, client.notifications().recv()).await;
183
184 match notification {
185 Ok(Ok(notification)) => {
186 match notification {
187 RelayPoolNotification::Event { event, .. } => {
188 let kind = event.kind.as_u16();
189
190 // Start batch timer on first event (does NOT reset)
191 if batch_timer_started.is_none() {
192 batch_timer_started = Some(Instant::now());
193 tracing::debug!("Batch timer started");
194 }
195
196 // Classify and add to pending
197 if kind == KIND_REPOSITORY_ANNOUNCEMENT {
198 tracing::debug!(
199 "SelfSubscriber received announcement {}",
200 event.id
201 );
202 pending.announcements.push(*event);
203 } else {
204 tracing::debug!(
205 "SelfSubscriber received root event {} (kind {})",
206 event.id,
207 kind
208 );
209 pending.root_events.push(*event);
210 }
211 }
212 RelayPoolNotification::Message { message, .. } => {
213 if let RelayMessage::EndOfStoredEvents(_) = message {
214 tracing::debug!("SelfSubscriber EOSE received");
215 // Process any pending events after EOSE
216 if !pending.announcements.is_empty()
217 || !pending.root_events.is_empty()
218 {
219 self.process_batch(&mut pending).await;
220 batch_timer_started = None;
221 }
222 }
223 }
224 RelayPoolNotification::Shutdown => {
225 tracing::info!("SelfSubscriber shutting down");
226 break;
227 }
228 }
229 }
230 Ok(Err(_)) => {
231 // Channel closed
232 tracing::warn!("SelfSubscriber notification channel closed");
233 break;
234 }
235 Err(_) => {
236 // Timeout - check if batch should be processed
237 if let Some(started) = batch_timer_started {
238 if started.elapsed() >= batch_window {
239 if !pending.announcements.is_empty() || !pending.root_events.is_empty()
240 {
241 self.process_batch(&mut pending).await;
242 }
243 batch_timer_started = None;
244 }
245 }
246 }
247 }
248 }
249
250 client.disconnect().await;
251 tracing::info!("SelfSubscriber disconnected");
252 }
253
254 /// Process a batch of pending updates.
255 async fn process_batch(&self, pending: &mut PendingUpdates) {
256 tracing::debug!(
257 "Processing batch: {} announcements, {} root events",
258 pending.announcements.len(),
259 pending.root_events.len()
260 );
261
262 // Process root events first (update following_repo_root_events)
263 for event in pending.root_events.drain(..) {
264 let repo_refs = SyncManager::extract_all_repo_refs(&event);
265 if !repo_refs.is_empty() {
266 let mut guard = self.following_repo_root_events.write().await;
267 for repo_ref in repo_refs {
268 guard.entry(repo_ref).or_default().insert(event.id);
269 }
270 }
271 }
272
273 // Process announcements (relay discovery)
274 for event in pending.announcements.drain(..) {
275 self.process_announcement(&event).await;
276 }
277 }
278
279 /// Process an announcement event for relay discovery.
280 async fn process_announcement(&self, event: &Event) {
281 let repo_ref = SyncManager::build_repo_ref(event);
282 let relay_urls = Self::extract_relay_urls_from_announcement(event);
283
284 // Check if this announcement lists our relay
285 if !self.lists_our_service(event) {
286 tracing::debug!(
287 "Announcement {} does not list our service, skipping relay discovery",
288 event.id
289 );
290 return;
291 }
292
293 tracing::info!(
294 "Processing announcement {} for repo {}, found {} relay URLs",
295 event.id,
296 repo_ref,
297 relay_urls.len()
298 );
299
300 // Get current events for this repo from following_repo_root_events
301 let events = self
302 .following_repo_root_events
303 .read()
304 .await
305 .get(&repo_ref)
306 .cloned()
307 .unwrap_or_default();
308
309 // For each relay URL in the announcement, check if we need to spawn or update
310 for relay_url in relay_urls {
311 if self.is_own_relay(&relay_url) {
312 continue; // Skip our own relay
313 }
314
315 let sync_relays_guard = self.sync_relays.read().await;
316 let exists = sync_relays_guard.contains_key(&relay_url);
317 drop(sync_relays_guard);
318
319 if exists {
320 // Relay already known - check if we need to add this repo
321 let mut guard = self.sync_relays.write().await;
322 let relay_repos = guard.entry(relay_url.clone()).or_default();
323 let is_new_repo = !relay_repos.contains_key(&repo_ref);
324
325 if is_new_repo {
326 relay_repos.insert(repo_ref.clone(), events.clone());
327 drop(guard);
328
329 // Send action to add filters
330 let mut repos_filters = HashMap::new();
331 repos_filters.insert(repo_ref.clone(), events.clone());
332
333 if let Err(e) = self
334 .action_tx
335 .send(RelayAction::AddFilters {
336 relay_url: relay_url.clone(),
337 repos_and_new_root_event: repos_filters,
338 })
339 .await
340 {
341 tracing::warn!("Failed to send AddFilters action: {}", e);
342 }
343 }
344 } else {
345 // New relay - add to sync_relays and spawn
346 let mut guard = self.sync_relays.write().await;
347 let mut repos = HashMap::new();
348 repos.insert(repo_ref.clone(), events.clone());
349 guard.insert(relay_url.clone(), repos.clone());
350 drop(guard);
351
352 tracing::info!("Discovered new relay to sync from: {}", relay_url);
353
354 // Send action to spawn relay
355 if let Err(e) = self
356 .action_tx
357 .send(RelayAction::SpawnRelay {
358 relay_url: relay_url.clone(),
359 repos_and_root_events: repos,
360 })
361 .await
362 {
363 tracing::warn!("Failed to send SpawnRelay action: {}", e);
364 }
365 }
366 }
367 }
368
369 /// Extract relay URLs from an announcement event.
370 ///
371 /// Looks for both 'relays' and 'clone' tags.
372 fn extract_relay_urls_from_announcement(event: &Event) -> Vec<String> {
373 let mut urls = Vec::new();
374
375 // Extract from 'relays' tag
376 for tag in event.tags.iter() {
377 if matches!(tag.kind(), TagKind::Relays) {
378 let vec = tag.clone().to_vec();
379 urls.extend(vec.into_iter().skip(1)); // Skip tag name
380 }
381 }
382
383 // Extract from 'clone' tag - parse URLs to get relay hints
384 // Clone URLs look like: http://domain/repo.git or git://domain/repo.git
385 // We want to construct ws://domain from these
386 for tag in event.tags.iter() {
387 if matches!(tag.kind(), TagKind::Clone) {
388 let vec = tag.clone().to_vec();
389 for url in vec.into_iter().skip(1) {
390 if let Some(relay_url) = Self::clone_url_to_relay_url(&url) {
391 if !urls.contains(&relay_url) {
392 urls.push(relay_url);
393 }
394 }
395 }
396 }
397 }
398
399 urls
400 }
401
402 /// Convert a clone URL to a potential relay URL.
403 ///
404 /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080"
405 fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
406 // Parse the URL to extract host:port
407 if let Ok(url) = url::Url::parse(clone_url) {
408 let host = url.host_str()?;
409 let port = url.port();
410 let scheme = if url.scheme() == "https" { "wss" } else { "ws" };
411
412 if let Some(port) = port {
413 Some(format!("{}://{}:{}", scheme, host, port))
414 } else {
415 Some(format!("{}://{}", scheme, host))
416 }
417 } else {
418 None
419 }
420 }
421
422 /// Check if event lists our service in the relays or clone tags.
423 fn lists_our_service(&self, event: &Event) -> bool {
424 // Check relays tag
425 for tag in event.tags.iter() {
426 if matches!(tag.kind(), TagKind::Relays) {
427 let vec = tag.clone().to_vec();
428 for url in vec.into_iter().skip(1) {
429 if self.is_own_relay(&url) {
430 return true;
431 }
432 }
433 }
434 }
435
436 // Check clone tag
437 for tag in event.tags.iter() {
438 if matches!(tag.kind(), TagKind::Clone) {
439 let vec = tag.clone().to_vec();
440 for url in vec.into_iter().skip(1) {
441 if url.contains(&self.relay_domain) {
442 return true;
443 }
444 }
445 }
446 }
447
448 false
449 }
450
451 /// Check if a relay URL matches our relay.
452 fn is_own_relay(&self, relay_url: &str) -> bool {
453 relay_url.contains(&self.relay_domain)
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 #[test]
462 fn test_clone_url_to_relay_url_http() {
463 let url = "http://127.0.0.1:8080/repo.git";
464 let relay = SelfSubscriber::clone_url_to_relay_url(url);
465 assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string()));
466 }
467
468 #[test]
469 fn test_clone_url_to_relay_url_https() {
470 let url = "https://example.com/repo.git";
471 let relay = SelfSubscriber::clone_url_to_relay_url(url);
472 assert_eq!(relay, Some("wss://example.com".to_string()));
473 }
474
475 #[test]
476 fn test_clone_url_to_relay_url_invalid() {
477 let url = "not-a-valid-url";
478 let relay = SelfSubscriber::clone_url_to_relay_url(url);
479 assert_eq!(relay, None);
480 }
481
482 #[test]
483 fn test_get_batch_window_default() {
484 // Clear env var if set
485 std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS");
486 let window = SelfSubscriber::get_batch_window();
487 assert_eq!(window, Duration::from_secs(5));
488 }
489
490 #[test]
491 fn test_get_batch_window_from_env() {
492 std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200");
493 let window = SelfSubscriber::get_batch_window();
494 assert_eq!(window, Duration::from_millis(200));
495 std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS");
496 }
497}