upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
commit39e782b12fce1776f2ad0b0f5430749533cb80ea (patch)
treed050a079a82898848da870d9307a98a83480629b /src/sync/self_subscriber.rs
parent586fc2a7df1ce256469f0742d23f687ac4b075b1 (diff)
sync v4 mvp
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs430
1 files changed, 430 insertions, 0 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
new file mode 100644
index 0000000..1dec219
--- /dev/null
+++ b/src/sync/self_subscriber.rs
@@ -0,0 +1,430 @@
1//! Self-Subscriber for Proactive Sync
2//!
3//! Monitors the relay's own database for repository announcements and
4//! updates the RepoSyncIndex when new relevant events are discovered.
5//!
6//! This module subscribes to relevant event kinds on our own relay and
7//! batches updates before sending them to the SyncManager.
8//!
9//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
10
11use std::collections::{HashMap, HashSet};
12use std::time::Duration;
13
14use nostr_sdk::prelude::*;
15use tokio::sync::mpsc;
16
17use super::{RepoSyncIndex, RepoSyncNeeds};
18
19// =============================================================================
20// RelayAction - Actions to send to SyncManager
21// =============================================================================
22
23/// Actions that the SelfSubscriber sends to the SyncManager
24#[derive(Debug)]
25pub enum RelayAction {
26 /// Spawn a new relay connection
27 SpawnRelay {
28 /// The relay URL to connect to
29 relay_url: String,
30 /// Repos to sync, mapped to their root event IDs
31 repos: HashMap<String, HashSet<EventId>>,
32 },
33 /// Add filters to an existing relay connection
34 AddFilters {
35 /// The relay URL to add filters to
36 relay_url: String,
37 /// Repos to sync, mapped to their root event IDs
38 repos: HashMap<String, HashSet<EventId>>,
39 },
40}
41
42// =============================================================================
43// PendingUpdates - Accumulator for batching
44// =============================================================================
45
46/// Accumulates updates between batch timer firings
47struct PendingUpdates {
48 /// Repos discovered since last batch, keyed by repo addressable ref
49 repos: HashMap<String, RepoSyncNeeds>,
50}
51
52impl PendingUpdates {
53 /// Create a new empty pending updates accumulator
54 fn new() -> Self {
55 Self {
56 repos: HashMap::new(),
57 }
58 }
59
60 /// Add or update a repo with its relays and root events
61 fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) {
62 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
63 relays: HashSet::new(),
64 root_events: HashSet::new(),
65 });
66 entry.relays.extend(relays);
67 entry.root_events.extend(root_events);
68 }
69
70 /// Check if there are any pending updates
71 fn is_empty(&self) -> bool {
72 self.repos.is_empty()
73 }
74
75 /// Take all pending updates, leaving empty
76 fn take(&mut self) -> HashMap<String, RepoSyncNeeds> {
77 std::mem::take(&mut self.repos)
78 }
79}
80
81// =============================================================================
82// SelfSubscriber - Main Component
83// =============================================================================
84
/// Subscribes to own relay's events to discover repos needing sync
///
/// The SelfSubscriber connects to our own relay and subscribes to:
/// - 30617 (Repository Announcements) - to discover repos listing our relay
/// - 1617 (Patches) - root events referencing repos
/// - 1618 (Issues) - root events referencing repos
/// - 1619 (Replies/Status) - root events referencing repos
/// - 1621 (PRs) - root events referencing repos
///
/// Only 30617 announcements are currently queued for batch processing; the
/// other kinds are received but only trace-logged by `run`.
///
/// Note: 30618 is NOT subscribed to here (per v4 spec - only synced from remote relays)
pub struct SelfSubscriber {
    /// Our own relay URL (the relay this subscriber connects to)
    own_relay_url: String,
    /// Our service domain (matched against announced relay URLs to decide
    /// whether a repo is relevant)
    relay_domain: String,
    /// Shared index of repos to sync; updated when a batch is processed
    repo_sync_index: RepoSyncIndex,
    /// Channel to send actions to SyncManager
    action_tx: mpsc::Sender<RelayAction>,
}
105
106impl SelfSubscriber {
107 /// Create a new SelfSubscriber
108 ///
109 /// # Arguments
110 /// * `own_relay_url` - The WebSocket URL of our own relay
111 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
112 /// * `repo_sync_index` - Shared index to update with discovered repos
113 /// * `action_tx` - Channel to send RelayActions to the SyncManager
114 pub fn new(
115 own_relay_url: String,
116 relay_domain: String,
117 repo_sync_index: RepoSyncIndex,
118 action_tx: mpsc::Sender<RelayAction>,
119 ) -> Self {
120 Self {
121 own_relay_url,
122 relay_domain,
123 repo_sync_index,
124 action_tx,
125 }
126 }
127
/// Get batch window from environment or use default
///
/// Reads the `NGIT_SYNC_BATCH_WINDOW_MS` environment variable (surrounding
/// whitespace is ignored).
///
/// Falls back to the default of 5000ms (5 seconds) when the variable is
/// unset, is not a valid unsigned integer, or is `0`. Zero is rejected
/// because the result is used as the period of a `tokio::time::interval`,
/// which panics on a zero period.
fn get_batch_window() -> Duration {
    const DEFAULT_BATCH_WINDOW: Duration = Duration::from_millis(5000);

    std::env::var("NGIT_SYNC_BATCH_WINDOW_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // A zero-length window is never valid for an interval timer.
        .filter(|&millis| millis > 0)
        .map(Duration::from_millis)
        .unwrap_or(DEFAULT_BATCH_WINDOW)
}
139
140 /// Extract relay URLs from event tags
141 ///
142 /// Extracts URLs from:
143 /// - `relays` tags: ["relays", "wss://relay1.com", "wss://relay2.com", ...]
144 /// - `clone` tags: ["clone", "https://example.com/repo.git", ...] (converted to ws://)
145 fn extract_relay_urls(event: &Event) -> HashSet<String> {
146 let mut relays = HashSet::new();
147
148 for tag in event.tags.iter() {
149 let tag_vec = tag.as_slice();
150 if tag_vec.is_empty() {
151 continue;
152 }
153
154 match tag_vec[0].as_str() {
155 "relays" => {
156 // All subsequent values are relay URLs
157 for url in tag_vec.iter().skip(1) {
158 relays.insert(url.to_string());
159 }
160 }
161 "clone" if tag_vec.len() >= 2 => {
162 // Convert http(s) clone URL to ws(s) relay URL
163 if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) {
164 relays.insert(relay_url);
165 }
166 }
167 _ => {}
168 }
169 }
170
171 relays
172 }
173
174 /// Extract repo identifier from event
175 ///
176 /// For kind 30617, uses the `d` tag to build the addressable reference
177 /// Format: 30617:pubkey:identifier
178 fn extract_repo_id(event: &Event) -> Option<String> {
179 // For kind 30617, extract d tag and build addressable ref
180 if event.kind == Kind::Custom(30617) {
181 for tag in event.tags.iter() {
182 let tag_vec = tag.as_slice();
183 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
184 return Some(format!("30617:{}:{}", event.pubkey, tag_vec[1]));
185 }
186 }
187 }
188
189 // For other kinds (1617, 1618, 1619, 1621), we'd need to look at
190 // their 'a' tags to find which repo they belong to.
191 // That processing happens in the batch processing, not here.
192 None
193 }
194
195 /// Check if announcement lists our relay
196 ///
197 /// Returns true if any extracted relay URL contains our domain
198 fn lists_our_relay(&self, event: &Event) -> bool {
199 Self::extract_relay_urls(event).iter().any(|url| {
200 url.contains(&self.relay_domain) || url == &self.own_relay_url
201 })
202 }
203
204 /// Main run loop
205 ///
206 /// Connects to own relay, subscribes to relevant event kinds,
207 /// and batches updates before processing them.
208 pub async fn run(self) {
209 let client = Client::default();
210
211 // Add own relay
212 if let Err(e) = client.add_relay(&self.own_relay_url).await {
213 tracing::error!(
214 url = %self.own_relay_url,
215 error = %e,
216 "Failed to add own relay for self-subscription"
217 );
218 return;
219 }
220
221 // Connect
222 client.connect().await;
223
224 // Subscribe to announcement and root event kinds
225 // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618)
226 let filter = Filter::new().kinds(vec![
227 Kind::Custom(30617), // Repository Announcements
228 Kind::Custom(1617), // Patches
229 Kind::Custom(1618), // Issues
230 Kind::Custom(1619), // Replies/Status
231 Kind::Custom(1621), // Pull Requests
232 ]);
233
234 if let Err(e) = client.subscribe(filter, None).await {
235 tracing::error!(
236 error = %e,
237 "Failed to subscribe to own relay for self-subscription"
238 );
239 return;
240 }
241
242 tracing::info!(
243 url = %self.own_relay_url,
244 domain = %self.relay_domain,
245 "SelfSubscriber started"
246 );
247
248 let mut notifications = client.notifications();
249 let batch_window = Self::get_batch_window();
250 let mut pending = PendingUpdates::new();
251
252 // Timer does NOT reset on new events - use interval
253 let mut timer = tokio::time::interval(batch_window);
254 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
255
256 loop {
257 tokio::select! {
258 notification = notifications.recv() => {
259 match notification {
260 Ok(RelayPoolNotification::Event { event, .. }) => {
261 // Only process 30617 events that list our relay
262 if event.kind == Kind::Custom(30617) {
263 if !self.lists_our_relay(&event) {
264 continue;
265 }
266
267 // Extract repo ID and relays
268 if let Some(repo_id) = Self::extract_repo_id(&event) {
269 let relays = Self::extract_relay_urls(&event);
270 let mut root_events = HashSet::new();
271 root_events.insert(event.id);
272
273 pending.add_repo(repo_id, relays, root_events);
274 tracing::debug!(
275 event_id = %event.id,
276 "Queued 30617 announcement for batch processing"
277 );
278 }
279 } else {
280 // For root event kinds (1617, 1618, 1619, 1621),
281 // we need to check if they reference repos we care about.
282 // For now, we'll track them in a simpler way.
283 // Full implementation would extract 'a' tag and match to known repos.
284 tracing::trace!(
285 kind = %event.kind,
286 event_id = %event.id,
287 "Received root event (processing deferred)"
288 );
289 }
290 }
291 Ok(RelayPoolNotification::Shutdown) => {
292 tracing::info!("SelfSubscriber received shutdown notification");
293 break;
294 }
295 Err(e) => {
296 tracing::error!(error = %e, "Error receiving notification");
297 break;
298 }
299 _ => {}
300 }
301 }
302 _ = timer.tick() => {
303 if !pending.is_empty() {
304 self.process_batch(&mut pending).await;
305 }
306 }
307 }
308 }
309
310 tracing::info!("SelfSubscriber stopped");
311 }
312
313 /// Process accumulated batch
314 ///
315 /// Updates the RepoSyncIndex with discovered repos, then derives per-relay
316 /// targets and sends RelayAction messages to the SyncManager.
317 async fn process_batch(&self, pending: &mut PendingUpdates) {
318 use crate::sync::algorithms::derive_relay_targets;
319
320 let updates = pending.take();
321
322 if updates.is_empty() {
323 return;
324 }
325
326 tracing::info!(
327 repo_count = updates.len(),
328 "Processing batch of repo updates"
329 );
330
331 // Update RepoSyncIndex
332 let mut index = self.repo_sync_index.write().await;
333
334 for (repo_id, needs) in updates {
335 // Merge with existing entry or insert new
336 let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds {
337 relays: HashSet::new(),
338 root_events: HashSet::new(),
339 });
340 entry.relays.extend(needs.relays);
341 entry.root_events.extend(needs.root_events);
342
343 tracing::debug!(
344 repo_id = %repo_id,
345 relay_count = entry.relays.len(),
346 event_count = entry.root_events.len(),
347 "Updated repo sync needs"
348 );
349 }
350
351 // Derive per-relay targets from the updated index
352 let targets = derive_relay_targets(&index);
353 drop(index); // Release lock before async operations
354
355 // For each relay, send SpawnRelay action
356 // SyncManager will check if relay already exists
357 for (relay_url, needs) in targets {
358 // Skip our own relay URL (we're subscribed to ourselves via self-subscription)
359 if relay_url.contains(&self.relay_domain) {
360 continue;
361 }
362
363 // Convert needs to HashMap<String, HashSet<EventId>>
364 let mut repos = HashMap::new();
365 for repo_id in needs.repos {
366 repos.insert(repo_id, needs.root_events.clone());
367 }
368
369 let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos };
370
371 if let Err(e) = self.action_tx.send(action).await {
372 tracing::error!(
373 relay = %relay_url,
374 error = %e,
375 "Failed to send SpawnRelay action"
376 );
377 } else {
378 tracing::debug!(
379 relay = %relay_url,
380 "Sent SpawnRelay action to SyncManager"
381 );
382 }
383 }
384 }
385}
386
387// =============================================================================
388// Helper Functions
389// =============================================================================
390
/// Rewrite a git clone URL into the matching WebSocket URL
///
/// Only the scheme is swapped (`https://` becomes `wss://`, `http://` becomes
/// `ws://`); host, port and path are carried over unchanged. The scheme match
/// is case-sensitive. Any other scheme yields `None`.
fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
    if let Some(rest) = clone_url.strip_prefix("https://") {
        return Some(format!("wss://{}", rest));
    }

    clone_url
        .strip_prefix("http://")
        .map(|rest| format!("ws://{}", rest))
}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// `https://` clone URLs map onto `wss://`, keeping host and path.
    #[test]
    fn test_clone_url_to_relay_url_https() {
        let converted = clone_url_to_relay_url("https://example.com/repo.git");
        assert_eq!(converted.as_deref(), Some("wss://example.com/repo.git"));
    }

    /// `http://` clone URLs map onto `ws://`, keeping host, port and path.
    #[test]
    fn test_clone_url_to_relay_url_http() {
        let converted = clone_url_to_relay_url("http://localhost:3000/repo.git");
        assert_eq!(converted.as_deref(), Some("ws://localhost:3000/repo.git"));
    }

    /// Schemes other than http(s) are rejected.
    #[test]
    fn test_clone_url_to_relay_url_unsupported() {
        assert!(clone_url_to_relay_url("git://example.com/repo.git").is_none());
        assert!(clone_url_to_relay_url("ssh://git@example.com/repo.git").is_none());
    }
}