diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:07:50 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:07:50 +0000 |
| commit | 39e782b12fce1776f2ad0b0f5430749533cb80ea (patch) | |
| tree | d050a079a82898848da870d9307a98a83480629b /src/sync/mod.rs | |
| parent | 586fc2a7df1ce256469f0742d23f687ac4b075b1 (diff) | |
sync v4 mvp
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 301 |
1 files changed, 292 insertions, 9 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c1f8bca..fb09896 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -12,6 +12,20 @@ | |||
| 12 | //! | 12 | //! |
| 13 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | 13 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. |
| 14 | 14 | ||
| 15 | pub mod algorithms; | ||
| 16 | pub mod filters; | ||
| 17 | pub mod relay_connection; | ||
| 18 | pub mod self_subscriber; | ||
| 19 | |||
| 20 | // Re-export core algorithm types | ||
| 21 | pub use algorithms::{AddFilters, RelaySyncNeeds}; | ||
| 22 | |||
| 23 | // Re-export relay connection types | ||
| 24 | pub use relay_connection::{RelayConnection, RelayEvent}; | ||
| 25 | |||
| 26 | // Re-export self-subscriber types | ||
| 27 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | ||
| 28 | |||
| 15 | use std::collections::{HashMap, HashSet}; | 29 | use std::collections::{HashMap, HashSet}; |
| 16 | use std::sync::Arc; | 30 | use std::sync::Arc; |
| 17 | 31 | ||
| @@ -355,21 +369,290 @@ impl SyncManager { | |||
| 355 | } | 369 | } |
| 356 | } | 370 | } |
| 357 | 371 | ||
| 358 | /// Run the sync manager (placeholder for Phase 1) | 372 | /// Run the sync manager |
| 359 | /// | 373 | /// |
| 360 | /// This will be implemented in later phases to: | 374 | /// Coordinates all sync components: |
| 361 | /// 1. Subscribe to local relay for 30617 events | 375 | /// 1. Spawns self-subscriber to monitor own relay for announcements |
| 362 | /// 2. Process events to build RepoSyncIndex | 376 | /// 2. Connects to bootstrap relay if configured |
| 363 | /// 3. Compute and execute sync actions | 377 | /// 3. Handles relay actions from self-subscriber |
| 364 | /// 4. Handle reconnection and catch-up logic | ||
| 365 | pub async fn run(self) { | 378 | pub async fn run(self) { |
| 379 | use tokio::sync::mpsc; | ||
| 380 | |||
| 366 | tracing::info!( | 381 | tracing::info!( |
| 367 | bootstrap_relay = ?self.bootstrap_relay_url, | 382 | bootstrap_relay = ?self.bootstrap_relay_url, |
| 368 | service_domain = %self.service_domain, | 383 | service_domain = %self.service_domain, |
| 369 | "SyncManager starting (placeholder - not yet implemented)" | 384 | "SyncManager starting" |
| 385 | ); | ||
| 386 | |||
| 387 | // 1. Create action channel for self-subscriber -> manager communication | ||
| 388 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); | ||
| 389 | |||
| 390 | // 2. Spawn self-subscriber | ||
| 391 | let self_subscriber = SelfSubscriber::new( | ||
| 392 | format!("ws://{}", self.service_domain), | ||
| 393 | self.service_domain.clone(), | ||
| 394 | Arc::clone(&self.repo_sync_index), | ||
| 395 | action_tx, | ||
| 370 | ); | 396 | ); |
| 397 | tokio::spawn(async move { self_subscriber.run().await }); | ||
| 398 | |||
| 399 | // 3. Connect to bootstrap relay if configured | ||
| 400 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | ||
| 401 | self.spawn_relay_connection(bootstrap_url.clone()).await; | ||
| 402 | } | ||
| 371 | 403 | ||
| 372 | // Phase 1: Just log and return | 404 | // 4. Main loop - handle actions from self-subscriber |
| 373 | // Full implementation will be added in subsequent phases | 405 | loop { |
| 406 | tokio::select! { | ||
| 407 | action = action_rx.recv() => { | ||
| 408 | match action { | ||
| 409 | Some(RelayAction::SpawnRelay { relay_url, repos }) => { | ||
| 410 | // Check if relay already exists | ||
| 411 | let relay_index = self.relay_sync_index.read().await; | ||
| 412 | let exists = relay_index.contains_key(&relay_url); | ||
| 413 | drop(relay_index); | ||
| 414 | |||
| 415 | if !exists { | ||
| 416 | tracing::info!(relay = %relay_url, "Spawning new relay connection"); | ||
| 417 | self.spawn_relay_with_layer2(relay_url, repos).await; | ||
| 418 | } else { | ||
| 419 | tracing::debug!( | ||
| 420 | relay = %relay_url, | ||
| 421 | "Relay already exists, considering AddFilters" | ||
| 422 | ); | ||
| 423 | // For MVP, we don't handle AddFilters - just log | ||
| 424 | // Full implementation would call subscribe_filters on existing connection | ||
| 425 | } | ||
| 426 | } | ||
| 427 | Some(RelayAction::AddFilters { relay_url, repos }) => { | ||
| 428 | tracing::debug!( | ||
| 429 | relay = %relay_url, | ||
| 430 | repo_count = repos.len(), | ||
| 431 | "AddFilters action (MVP: not implemented)" | ||
| 432 | ); | ||
| 433 | // For MVP, not implemented - full version would add Layer 2 filters | ||
| 434 | // to existing relay connection | ||
| 435 | } | ||
| 436 | None => break, | ||
| 437 | } | ||
| 438 | } | ||
| 439 | } | ||
| 440 | } | ||
| 441 | } | ||
| 442 | |||
| 443 | /// Spawn relay connection with Layer 2 filters for specific repos | ||
| 444 | /// | ||
| 445 | /// Used when discovering relays from announcements. Connects to the relay, | ||
| 446 | /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the | ||
| 447 | /// specific repos we want to sync. | ||
| 448 | async fn spawn_relay_with_layer2( | ||
| 449 | &self, | ||
| 450 | relay_url: String, | ||
| 451 | repos: HashMap<String, HashSet<EventId>>, | ||
| 452 | ) { | ||
| 453 | use crate::sync::filters::build_layer2_and_layer3_filters; | ||
| 454 | use tokio::sync::mpsc; | ||
| 455 | |||
| 456 | let database = Arc::clone(&self.database); | ||
| 457 | let write_policy = self.write_policy.clone(); | ||
| 458 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | ||
| 459 | |||
| 460 | // Create relay connection | ||
| 461 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 462 | |||
| 463 | // Connect and subscribe to Layer 1 (announcements) | ||
| 464 | if let Err(e) = connection.connect_and_subscribe(None).await { | ||
| 465 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | ||
| 466 | return; | ||
| 467 | } | ||
| 468 | |||
| 469 | // Mark as connected in relay sync index | ||
| 470 | { | ||
| 471 | let mut index = relay_sync_index.write().await; | ||
| 472 | index.insert( | ||
| 473 | relay_url.clone(), | ||
| 474 | RelayState { | ||
| 475 | repos: repos.keys().cloned().collect(), | ||
| 476 | root_events: repos.values().flatten().cloned().collect(), | ||
| 477 | is_bootstrap: false, | ||
| 478 | connection_status: ConnectionStatus::Connected, | ||
| 479 | last_connected: Some(Timestamp::now()), | ||
| 480 | disconnected_at: None, | ||
| 481 | }, | ||
| 482 | ); | ||
| 483 | } | ||
| 484 | |||
| 485 | // Subscribe to Layer 2+3 filters for the repos | ||
| 486 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 487 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 488 | let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None); | ||
| 489 | |||
| 490 | for filter in filters { | ||
| 491 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 492 | tracing::error!( | ||
| 493 | relay = %relay_url, | ||
| 494 | error = %e, | ||
| 495 | "Failed to subscribe to Layer 2 filter" | ||
| 496 | ); | ||
| 497 | } | ||
| 498 | } | ||
| 499 | |||
| 500 | tracing::info!( | ||
| 501 | relay = %relay_url, | ||
| 502 | repo_count = repos.len(), | ||
| 503 | "Connected to discovered relay with Layer 2+3 filters" | ||
| 504 | ); | ||
| 505 | |||
| 506 | // Create event channel | ||
| 507 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 508 | |||
| 509 | // Spawn event loop | ||
| 510 | tokio::spawn(async move { | ||
| 511 | connection.run_event_loop(event_tx).await; | ||
| 512 | }); | ||
| 513 | |||
| 514 | // Spawn event processor | ||
| 515 | let relay_url_clone = relay_url.clone(); | ||
| 516 | tokio::spawn(async move { | ||
| 517 | while let Some(relay_event) = event_rx.recv().await { | ||
| 518 | match relay_event { | ||
| 519 | RelayEvent::Event(event) => { | ||
| 520 | Self::process_event_static( | ||
| 521 | &event, | ||
| 522 | &relay_url_clone, | ||
| 523 | &database, | ||
| 524 | &write_policy, | ||
| 525 | ) | ||
| 526 | .await; | ||
| 527 | } | ||
| 528 | RelayEvent::EndOfStoredEvents(_) => { | ||
| 529 | tracing::debug!(relay = %relay_url_clone, "EOSE received"); | ||
| 530 | } | ||
| 531 | RelayEvent::Closed(_) | RelayEvent::Shutdown => { | ||
| 532 | tracing::info!(relay = %relay_url_clone, "Relay disconnected"); | ||
| 533 | break; | ||
| 534 | } | ||
| 535 | } | ||
| 536 | } | ||
| 537 | }); | ||
| 538 | } | ||
| 539 | |||
| 540 | /// Spawn a relay connection and start its event loop | ||
| 541 | async fn spawn_relay_connection(&self, relay_url: String) { | ||
| 542 | use tokio::sync::mpsc; | ||
| 543 | |||
| 544 | let database = Arc::clone(&self.database); | ||
| 545 | let write_policy = self.write_policy.clone(); | ||
| 546 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | ||
| 547 | |||
| 548 | // Create relay connection | ||
| 549 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 550 | |||
| 551 | // Connect and subscribe to Layer 1 | ||
| 552 | if let Err(e) = connection.connect_and_subscribe(None).await { | ||
| 553 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | ||
| 554 | return; | ||
| 555 | } | ||
| 556 | |||
| 557 | // Mark as connected in relay sync index | ||
| 558 | { | ||
| 559 | let mut index = relay_sync_index.write().await; | ||
| 560 | index.insert( | ||
| 561 | relay_url.clone(), | ||
| 562 | RelayState { | ||
| 563 | repos: HashSet::new(), | ||
| 564 | root_events: HashSet::new(), | ||
| 565 | is_bootstrap: true, | ||
| 566 | connection_status: ConnectionStatus::Connected, | ||
| 567 | last_connected: Some(Timestamp::now()), | ||
| 568 | disconnected_at: None, | ||
| 569 | }, | ||
| 570 | ); | ||
| 571 | } | ||
| 572 | |||
| 573 | // Create event channel | ||
| 574 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 575 | |||
| 576 | // Spawn event loop | ||
| 577 | tokio::spawn(async move { | ||
| 578 | connection.run_event_loop(event_tx).await; | ||
| 579 | }); | ||
| 580 | |||
| 581 | // Spawn event processor | ||
| 582 | let relay_url_clone = relay_url.clone(); | ||
| 583 | tokio::spawn(async move { | ||
| 584 | while let Some(relay_event) = event_rx.recv().await { | ||
| 585 | match relay_event { | ||
| 586 | RelayEvent::Event(event) => { | ||
| 587 | Self::process_event_static(&event, &relay_url_clone, &database, &write_policy) | ||
| 588 | .await; | ||
| 589 | } | ||
| 590 | RelayEvent::EndOfStoredEvents(_) => { | ||
| 591 | tracing::debug!("EOSE from {}", relay_url_clone); | ||
| 592 | } | ||
| 593 | RelayEvent::Closed(_) | RelayEvent::Shutdown => { | ||
| 594 | tracing::info!("Relay {} disconnected", relay_url_clone); | ||
| 595 | break; | ||
| 596 | } | ||
| 597 | } | ||
| 598 | } | ||
| 599 | }); | ||
| 600 | } | ||
| 601 | |||
| 602 | /// Process a single event from a relay (static version for spawned tasks) | ||
| 603 | async fn process_event_static( | ||
| 604 | event: &Event, | ||
| 605 | relay_url: &str, | ||
| 606 | database: &SharedDatabase, | ||
| 607 | write_policy: &Nip34WritePolicy, | ||
| 608 | ) { | ||
| 609 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; | ||
| 610 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
| 611 | |||
| 612 | // Check if event already exists | ||
| 613 | match database.event_by_id(&event.id).await { | ||
| 614 | Ok(Some(_)) => { | ||
| 615 | tracing::trace!(event_id = %event.id, "Event already exists, skipping"); | ||
| 616 | return; | ||
| 617 | } | ||
| 618 | Err(e) => { | ||
| 619 | tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); | ||
| 620 | return; | ||
| 621 | } | ||
| 622 | Ok(None) => {} // Continue processing | ||
| 623 | } | ||
| 624 | |||
| 625 | // Apply write policy using a dummy address (sync events aren't from network clients) | ||
| 626 | let dummy_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); | ||
| 627 | let result = write_policy.admit_event(event, &dummy_addr).await; | ||
| 628 | |||
| 629 | match result { | ||
| 630 | PolicyResult::Accept => { | ||
| 631 | // Save event | ||
| 632 | if let Err(e) = database.save_event(event).await { | ||
| 633 | tracing::error!( | ||
| 634 | event_id = %event.id, | ||
| 635 | relay = %relay_url, | ||
| 636 | error = %e, | ||
| 637 | "Failed to save synced event" | ||
| 638 | ); | ||
| 639 | } else { | ||
| 640 | tracing::debug!( | ||
| 641 | event_id = %event.id, | ||
| 642 | relay = %relay_url, | ||
| 643 | kind = %event.kind.as_u16(), | ||
| 644 | "Saved synced event" | ||
| 645 | ); | ||
| 646 | } | ||
| 647 | } | ||
| 648 | PolicyResult::Reject(reason) => { | ||
| 649 | tracing::debug!( | ||
| 650 | event_id = %event.id, | ||
| 651 | relay = %relay_url, | ||
| 652 | reason = %reason, | ||
| 653 | "Event rejected by write policy" | ||
| 654 | ); | ||
| 655 | } | ||
| 656 | } | ||
| 374 | } | 657 | } |
| 375 | } \ No newline at end of file | 658 | } \ No newline at end of file |