upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
commit39e782b12fce1776f2ad0b0f5430749533cb80ea (patch)
treed050a079a82898848da870d9307a98a83480629b /src/sync/mod.rs
parent586fc2a7df1ce256469f0742d23f687ac4b075b1 (diff)
sync v4 mvp
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs301
1 files changed, 292 insertions, 9 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index c1f8bca..fb09896 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -12,6 +12,20 @@
12//! 12//!
13//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. 13//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
14 14
15pub mod algorithms;
16pub mod filters;
17pub mod relay_connection;
18pub mod self_subscriber;
19
20// Re-export core algorithm types
21pub use algorithms::{AddFilters, RelaySyncNeeds};
22
23// Re-export relay connection types
24pub use relay_connection::{RelayConnection, RelayEvent};
25
26// Re-export self-subscriber types
27pub use self_subscriber::{RelayAction, SelfSubscriber};
28
15use std::collections::{HashMap, HashSet}; 29use std::collections::{HashMap, HashSet};
16use std::sync::Arc; 30use std::sync::Arc;
17 31
@@ -355,21 +369,290 @@ impl SyncManager {
355 } 369 }
356 } 370 }
357 371
358 /// Run the sync manager (placeholder for Phase 1) 372 /// Run the sync manager
359 /// 373 ///
360 /// This will be implemented in later phases to: 374 /// Coordinates all sync components:
361 /// 1. Subscribe to local relay for 30617 events 375 /// 1. Spawns self-subscriber to monitor own relay for announcements
362 /// 2. Process events to build RepoSyncIndex 376 /// 2. Connects to bootstrap relay if configured
363 /// 3. Compute and execute sync actions 377 /// 3. Handles relay actions from self-subscriber
364 /// 4. Handle reconnection and catch-up logic
365 pub async fn run(self) { 378 pub async fn run(self) {
379 use tokio::sync::mpsc;
380
366 tracing::info!( 381 tracing::info!(
367 bootstrap_relay = ?self.bootstrap_relay_url, 382 bootstrap_relay = ?self.bootstrap_relay_url,
368 service_domain = %self.service_domain, 383 service_domain = %self.service_domain,
369 "SyncManager starting (placeholder - not yet implemented)" 384 "SyncManager starting"
385 );
386
387 // 1. Create action channel for self-subscriber -> manager communication
388 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
389
390 // 2. Spawn self-subscriber
391 let self_subscriber = SelfSubscriber::new(
392 format!("ws://{}", self.service_domain),
393 self.service_domain.clone(),
394 Arc::clone(&self.repo_sync_index),
395 action_tx,
370 ); 396 );
397 tokio::spawn(async move { self_subscriber.run().await });
398
399 // 3. Connect to bootstrap relay if configured
400 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
401 self.spawn_relay_connection(bootstrap_url.clone()).await;
402 }
371 403
372 // Phase 1: Just log and return 404 // 4. Main loop - handle actions from self-subscriber
373 // Full implementation will be added in subsequent phases 405 loop {
406 tokio::select! {
407 action = action_rx.recv() => {
408 match action {
409 Some(RelayAction::SpawnRelay { relay_url, repos }) => {
410 // Check if relay already exists
411 let relay_index = self.relay_sync_index.read().await;
412 let exists = relay_index.contains_key(&relay_url);
413 drop(relay_index);
414
415 if !exists {
416 tracing::info!(relay = %relay_url, "Spawning new relay connection");
417 self.spawn_relay_with_layer2(relay_url, repos).await;
418 } else {
419 tracing::debug!(
420 relay = %relay_url,
421 "Relay already exists, considering AddFilters"
422 );
423 // For MVP, we don't handle AddFilters - just log
424 // Full implementation would call subscribe_filters on existing connection
425 }
426 }
427 Some(RelayAction::AddFilters { relay_url, repos }) => {
428 tracing::debug!(
429 relay = %relay_url,
430 repo_count = repos.len(),
431 "AddFilters action (MVP: not implemented)"
432 );
433 // For MVP, not implemented - full version would add Layer 2 filters
434 // to existing relay connection
435 }
436 None => break,
437 }
438 }
439 }
440 }
441 }
442
443 /// Spawn relay connection with Layer 2 filters for specific repos
444 ///
445 /// Used when discovering relays from announcements. Connects to the relay,
446 /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the
447 /// specific repos we want to sync.
448 async fn spawn_relay_with_layer2(
449 &self,
450 relay_url: String,
451 repos: HashMap<String, HashSet<EventId>>,
452 ) {
453 use crate::sync::filters::build_layer2_and_layer3_filters;
454 use tokio::sync::mpsc;
455
456 let database = Arc::clone(&self.database);
457 let write_policy = self.write_policy.clone();
458 let relay_sync_index = Arc::clone(&self.relay_sync_index);
459
460 // Create relay connection
461 let connection = RelayConnection::new(relay_url.clone());
462
463 // Connect and subscribe to Layer 1 (announcements)
464 if let Err(e) = connection.connect_and_subscribe(None).await {
465 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
466 return;
467 }
468
469 // Mark as connected in relay sync index
470 {
471 let mut index = relay_sync_index.write().await;
472 index.insert(
473 relay_url.clone(),
474 RelayState {
475 repos: repos.keys().cloned().collect(),
476 root_events: repos.values().flatten().cloned().collect(),
477 is_bootstrap: false,
478 connection_status: ConnectionStatus::Connected,
479 last_connected: Some(Timestamp::now()),
480 disconnected_at: None,
481 },
482 );
483 }
484
485 // Subscribe to Layer 2+3 filters for the repos
486 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
487 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
488 let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None);
489
490 for filter in filters {
491 if let Err(e) = connection.subscribe_filter(filter).await {
492 tracing::error!(
493 relay = %relay_url,
494 error = %e,
495 "Failed to subscribe to Layer 2 filter"
496 );
497 }
498 }
499
500 tracing::info!(
501 relay = %relay_url,
502 repo_count = repos.len(),
503 "Connected to discovered relay with Layer 2+3 filters"
504 );
505
506 // Create event channel
507 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
508
509 // Spawn event loop
510 tokio::spawn(async move {
511 connection.run_event_loop(event_tx).await;
512 });
513
514 // Spawn event processor
515 let relay_url_clone = relay_url.clone();
516 tokio::spawn(async move {
517 while let Some(relay_event) = event_rx.recv().await {
518 match relay_event {
519 RelayEvent::Event(event) => {
520 Self::process_event_static(
521 &event,
522 &relay_url_clone,
523 &database,
524 &write_policy,
525 )
526 .await;
527 }
528 RelayEvent::EndOfStoredEvents(_) => {
529 tracing::debug!(relay = %relay_url_clone, "EOSE received");
530 }
531 RelayEvent::Closed(_) | RelayEvent::Shutdown => {
532 tracing::info!(relay = %relay_url_clone, "Relay disconnected");
533 break;
534 }
535 }
536 }
537 });
538 }
539
540 /// Spawn a relay connection and start its event loop
541 async fn spawn_relay_connection(&self, relay_url: String) {
542 use tokio::sync::mpsc;
543
544 let database = Arc::clone(&self.database);
545 let write_policy = self.write_policy.clone();
546 let relay_sync_index = Arc::clone(&self.relay_sync_index);
547
548 // Create relay connection
549 let connection = RelayConnection::new(relay_url.clone());
550
551 // Connect and subscribe to Layer 1
552 if let Err(e) = connection.connect_and_subscribe(None).await {
553 tracing::error!("Failed to connect to relay {}: {}", relay_url, e);
554 return;
555 }
556
557 // Mark as connected in relay sync index
558 {
559 let mut index = relay_sync_index.write().await;
560 index.insert(
561 relay_url.clone(),
562 RelayState {
563 repos: HashSet::new(),
564 root_events: HashSet::new(),
565 is_bootstrap: true,
566 connection_status: ConnectionStatus::Connected,
567 last_connected: Some(Timestamp::now()),
568 disconnected_at: None,
569 },
570 );
571 }
572
573 // Create event channel
574 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
575
576 // Spawn event loop
577 tokio::spawn(async move {
578 connection.run_event_loop(event_tx).await;
579 });
580
581 // Spawn event processor
582 let relay_url_clone = relay_url.clone();
583 tokio::spawn(async move {
584 while let Some(relay_event) = event_rx.recv().await {
585 match relay_event {
586 RelayEvent::Event(event) => {
587 Self::process_event_static(&event, &relay_url_clone, &database, &write_policy)
588 .await;
589 }
590 RelayEvent::EndOfStoredEvents(_) => {
591 tracing::debug!("EOSE from {}", relay_url_clone);
592 }
593 RelayEvent::Closed(_) | RelayEvent::Shutdown => {
594 tracing::info!("Relay {} disconnected", relay_url_clone);
595 break;
596 }
597 }
598 }
599 });
600 }
601
602 /// Process a single event from a relay (static version for spawned tasks)
603 async fn process_event_static(
604 event: &Event,
605 relay_url: &str,
606 database: &SharedDatabase,
607 write_policy: &Nip34WritePolicy,
608 ) {
609 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy};
610 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
611
612 // Check if event already exists
613 match database.event_by_id(&event.id).await {
614 Ok(Some(_)) => {
615 tracing::trace!(event_id = %event.id, "Event already exists, skipping");
616 return;
617 }
618 Err(e) => {
619 tracing::warn!(event_id = %event.id, error = %e, "Database error checking event");
620 return;
621 }
622 Ok(None) => {} // Continue processing
623 }
624
625 // Apply write policy using a dummy address (sync events aren't from network clients)
626 let dummy_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
627 let result = write_policy.admit_event(event, &dummy_addr).await;
628
629 match result {
630 PolicyResult::Accept => {
631 // Save event
632 if let Err(e) = database.save_event(event).await {
633 tracing::error!(
634 event_id = %event.id,
635 relay = %relay_url,
636 error = %e,
637 "Failed to save synced event"
638 );
639 } else {
640 tracing::debug!(
641 event_id = %event.id,
642 relay = %relay_url,
643 kind = %event.kind.as_u16(),
644 "Saved synced event"
645 );
646 }
647 }
648 PolicyResult::Reject(reason) => {
649 tracing::debug!(
650 event_id = %event.id,
651 relay = %relay_url,
652 reason = %reason,
653 "Event rejected by write policy"
654 );
655 }
656 }
374 } 657 }
375} \ No newline at end of file 658} \ No newline at end of file