From fdbc8895e1e9e712882bd854908295a95e7afcb9 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 16:54:38 +0000 Subject: docs: update GRASP-02 proactive sync event sync approach --- docs/explanation/grasp-02-proactive-sync.md | 336 +++++++++++++++++++++------- docs/explanation/monitoring.md | 96 +++++++- 2 files changed, 350 insertions(+), 82 deletions(-) (limited to 'docs/explanation') diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index 250aece..a8af3f4 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md @@ -23,6 +23,7 @@ flowchart TB RH[RelayHealthTracker] DB[(Database)] AP[AcceptancePolicy] + MET[Prometheus Metrics] end subgraph External Relays @@ -40,7 +41,7 @@ flowchart TB SM <-->|WebSocket + NEG| R2 SM <-->|WebSocket + NEG| R3 - RH -->|persists state| DB + RH -->|exposes state| MET ``` ## Connection Management @@ -87,28 +88,79 @@ stateDiagram-v2 | State | Behavior | | ----------- | --------------------------------------------------- | | **Healthy** | Normal operation, immediate reconnect on disconnect | -| **Backoff** | Exponential backoff: 1s → 2s → 4s → ... → 1h max | +| **Backoff** | Exponential backoff: 5s → 10s → 20s → ... → 1h max | | **Dead** | 24h of continuous failures, retry once per day | -Health state is **persisted to database** to survive restarts: +Health state is **kept in-memory** using a `DashMap` for lock-free concurrent access: ```rust +/// In-memory relay health tracking (NOT persisted to database) +/// +/// Design rationale: For <100 relays, persistence adds complexity without +/// significant benefit. Conservative initial backoff on restart avoids +/// thundering herd issues. +struct RelayHealthTracker { + health: DashMap, + metrics: SyncMetrics, // Prometheus metrics for operator visibility +} + struct RelayHealth { url: RelayUrl, status: RelayStatus, // Healthy, Backoff, Dead consecutive_failures: u32, - last_failure_at: Option, - last_success_at: Option, - next_retry_at: Timestamp, + last_failure_at: Option, + last_success_at: Option, + next_retry_at: Instant, } enum RelayStatus { Healthy, - Backoff { attempt: u32 }, // backoff = min(2^attempt, 3600) seconds + Backoff { attempt: u32 }, // backoff = min(5 * 2^attempt, 3600) seconds Dead, // retry in 24h } ``` +### Restart Behavior (Graceful Degradation) + +On restart, all relay health state is reset. To avoid thundering herd: + +1. **Conservative initial backoff**: Start with 5s delay (not immediate) for all relays +2. **Staggered connection attempts**: Add random jitter (0-2s) per relay +3. **Health rebuilds organically**: Relays prove themselves healthy through successful connections + +```rust +impl RelayHealthTracker { + fn new(metrics: SyncMetrics) -> Self { + Self { + health: DashMap::new(), + metrics, + } + } + + /// Called on startup for each discovered relay + fn initialize_relay(&self, url: RelayUrl) { + self.health.insert(url.clone(), RelayHealth { + url, + status: RelayStatus::Backoff { attempt: 0 }, // Start conservative + consecutive_failures: 0, + last_failure_at: None, + last_success_at: None, + next_retry_at: Instant::now() + Self::initial_backoff_with_jitter(), + }); + } + + fn initial_backoff_with_jitter() -> Duration { + Duration::from_secs(5) + Duration::from_millis(rand::random::() % 2000) + } +} +``` + +**Trade-off**: We lose knowledge of chronically failing relays across restarts. This is acceptable because: + +- Scale is small (<100 relays) +- Conservative initial backoff prevents hammering bad relays +- Prometheus metrics preserve historical health data for operators + ## Filter Strategy ### Unified Filters for Live Sync and Negentropy @@ -193,6 +245,23 @@ fn build_filters(tag_values: Vec) -> Vec { **Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch. +### Filter Generation vs. Policy Validation + +The filter strategy and acceptance policies serve **different purposes** even though they share conceptual knowledge: + +| Concern | Filters | Policies | +|---------|---------|----------| +| **Direction** | What to request FROM remote relays | What to accept INTO local database | +| **Input** | Stored events (announcements, PRs, etc.) | Single incoming event | +| **Output** | Filter specification | Accept/Reject decision | + +The modular sub-policies ([`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24), [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25), etc.) encode knowledge about event kinds and tag types, but this knowledge is applied differently: + +- **In filters**: We enumerate **all** addressable refs (`30617:pubkey:id`) from stored announcements +- **In policies**: [`RelatedEventPolicy::check_references()`](../../src/nostr/policy/related.rs:39) checks if incoming event references **any** accepted event + +Because of this fundamental difference, filter generation logic stays in `src/sync/filter.rs` rather than being delegated to policy modules. Both share the understanding of NIP-34 event relationships, but they answer different questions. + ## Subscription Updates ### Dynamic Subscription Management @@ -334,25 +403,59 @@ async fn schedule_daily_catchup(&self) { ### Acceptance Policy -All synced events go through our acceptance policy (same as direct submissions), but **excluding nostr-sdk defaults** like rate limiting: +All synced events go through our acceptance policy, reusing the same [`Nip34WritePolicy`](../../src/nostr/builder.rs:36) validation logic used for direct client submissions. + +#### Design: Reusing admit_event() + +The [`WritePolicy::admit_event()`](../../src/nostr/builder.rs:256-269) trait method takes a `SocketAddr` parameter designed for client connections: ```rust -async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { - // Skip nostr-sdk's built-in rate limiting - we trust our filter strategy - // and don't want to reject valid events just because they arrived quickly +// From nostr-relay-builder WritePolicy trait +fn admit_event<'a>( + &'a self, + event: &'a Event, + _addr: &'a SocketAddr, // Unused in our implementation +) -> BoxedFuture<'a, PolicyResult>; +``` + +For synced events from remote relays, we pass a **synthetic localhost address** since: +1. The `_addr` parameter is currently unused in our [`Nip34WritePolicy`](../../src/nostr/builder.rs:259) +2. All meaningful validation is done by the modular sub-policies (see below) +3. This allows reusing 100% of the existing validation logic + +```rust +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +/// Synthetic address for synced events (not from a direct client connection) +const SYNC_SOURCE_ADDR: SocketAddr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 0 +); - // Apply our custom Nip34WritePolicy +async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { + // Apply our Nip34WritePolicy using synthetic address + // The SocketAddr is unused - all validation is by the modular sub-policies let result = self.acceptance_policy - .admit_event(&event, source_relay) + .admit_event(&event, &SYNC_SOURCE_ADDR) .await; match result { PolicyResult::Accept => { self.database.save_event(&event).await?; + tracing::debug!( + "Accepted synced event {} from {}", + event.id.to_hex(), + source_relay + ); self.trigger_subscription_updates(&event).await; } PolicyResult::Reject(reason) => { - tracing::debug!("Rejected synced event {}: {}", event.id.to_hex(), reason); + tracing::debug!( + "Rejected synced event {} from {}: {}", + event.id.to_hex(), + source_relay, + reason + ); } } @@ -360,6 +463,41 @@ async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> R } ``` +#### Modular Sub-Policies + +The [`Nip34WritePolicy`](../../src/nostr/builder.rs:36-42) delegates to specialized sub-policies in [`src/nostr/policy/`](../../src/nostr/policy/mod.rs:1-41): + +| Sub-Policy | Kinds | Responsibility | +|------------|-------|----------------| +| [`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24-27) | 30617 | Validates service listing, maintainer exception, creates bare repos | +| [`StatePolicy`](../../src/nostr/policy/state.rs:43-46) | 30618 | Validates state structure, aligns git refs with authorized state | +| [`PrEventPolicy`](../../src/nostr/policy/pr_event.rs) | 1618, 1619 | Validates PR/PR Update events, manages refs/nostr/* | +| [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25-29) | All others | Checks forward/backward references to accepted repos/events | + +All sub-policies share a common [`PolicyContext`](../../src/nostr/policy/mod.rs:22-27) containing: +- `domain`: Our service domain for validation +- `database`: For querying existing events +- `git_data_path`: For git operations + +#### Why Not Call Sub-Policies Directly? + +While we could bypass `admit_event()` and call sub-policies directly: + +```rust +// Alternative: Direct sub-policy calls (NOT recommended) +match event.kind.as_u16() { + 30617 => self.announcement_policy.validate(&event).await, + 30618 => self.state_policy.validate(&event), + 1618 | 1619 => self.pr_event_policy.validate_nostr_ref(&event).await, + _ => self.related_event_policy.check_references(&event).await, +} +``` + +This is **not recommended** because: +1. Duplicates the kind-routing logic from [`admit_event()`](../../src/nostr/builder.rs:261-268) +2. Misses important post-validation steps (e.g., `handle_announcement()` also calls `ensure_bare_repository()`) +3. Creates maintenance burden when policy logic changes + ## Module Structure ### New `src/sync/` Module @@ -402,77 +540,78 @@ async fn main() -> Result<()> { ## Metrics & Observability -### Event Source Tracking +All sync metrics are exposed via Prometheus at `/metrics`. For <100 relays, per-relay labels are acceptable cardinality. -Track provenance of every event to measure live sync effectiveness: +### Prometheus Metrics ```rust -enum EventSource { - DirectSubmission, // Sent directly to our relay by a user - LiveSync(RelayUrl), // Received via live subscription - Catchup(RelayUrl), // Discovered during negentropy catchup - DailyCatchup(RelayUrl), // Found during daily reconciliation -} +/// Sync module metrics registered with the global Prometheus registry +pub struct SyncMetrics { + // === Connection Metrics (per relay) === + /// Active outbound connections: ngit_sync_relay_connected{relay="wss://..."} + relay_connected: IntGaugeVec, // labels: [relay] -struct SyncMetrics { - /// Events by source - events_from_direct: Counter, - events_from_live_sync: Counter, - events_from_catchup: Counter, // Indicates live sync failure - events_from_daily_catchup: Counter, // Indicates sustained sync gap + /// Connection attempts: ngit_sync_connection_attempts_total{relay="wss://...", result="success|failure"} + connection_attempts: CounterVec, // labels: [relay, result] - /// Catchup gap tracking - events found that should have been live synced - catchup_gap_total: Counter, - catchup_gap_by_relay: HashMap, -} -``` + // === Relay Health Status === + /// Current status: ngit_sync_relay_status{relay="wss://...", status="healthy|backoff|dead"} + relay_status: IntGaugeVec, // labels: [relay, status] -**Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. + /// Consecutive failures: ngit_sync_relay_failures{relay="wss://..."} + relay_failures: IntGaugeVec, // labels: [relay] -### Peer Reliability Tracking + // === Event Source Tracking === + /// Events received by source: ngit_sync_events_total{source="direct|live_sync|catchup|daily_catchup"} + events_total: CounterVec, // labels: [source] -Track relay health and data completeness: + /// Sync gap events (should have been live synced): ngit_sync_gap_events_total{relay="wss://..."} + sync_gap_events: CounterVec, // labels: [relay] -```rust -struct PeerReliability { - relay_url: RelayUrl, - - // Connection metrics - connection_attempts: u64, - connection_failures: u64, - total_uptime_seconds: u64, - total_downtime_seconds: u64, + // === Aggregate Metrics === + /// Total relays being tracked + relays_tracked_total: IntGauge, - // Event coverage metrics (per repo we track) - repos_tracked: HashSet, - missing_events_detected: HashMap, // Events we have that they dont - events_received_from: u64, + /// Relays currently connected + relays_connected_total: IntGauge, - // Calculated scores - uptime_percentage: f64, // uptime / (uptime + downtime) - event_coverage_score: f64, // ratio of events we have vs what we expect from them + /// Relays in dead state + relays_dead_total: IntGauge, } ``` +### Metric Definitions + +| Metric | Type | Labels | Description | +| ------------------------------------- | ------- | ------------- | ------------------------------------------------------ | +| `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not | +| `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes | +| `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise | +| `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count | +| `ngit_sync_events_total` | Counter | source | Events received by source type | +| `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup that should have been live | +| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered from announcements | +| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count | +| `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead | + +**Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. The `ngit_sync_gap_events_total` metric tracks this per relay. + ### Observability Integration ```rust -// Prometheus-style metrics impl SyncManager { fn record_event_received(&self, event: &Event, source: EventSource) { match source { EventSource::DirectSubmission => { - self.metrics.events_from_direct.inc(); + self.metrics.events_total.with_label_values(&["direct"]).inc(); } EventSource::LiveSync(relay) => { - self.metrics.events_from_live_sync.inc(); - self.peer_metrics.get(&relay).events_received_from += 1; + self.metrics.events_total.with_label_values(&["live_sync"]).inc(); } EventSource::Catchup(relay) => { // This is a sync gap - we should have gotten it via live sync - self.metrics.events_from_catchup.inc(); - self.metrics.catchup_gap_total.inc(); - self.metrics.catchup_gap_by_relay.entry(relay.clone()).or_default().inc(); + self.metrics.events_total.with_label_values(&["catchup"]).inc(); + self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc(); tracing::warn!( relay = %relay, event_id = %event.id.to_hex(), @@ -481,7 +620,8 @@ impl SyncManager { } EventSource::DailyCatchup(relay) => { // Sustained sync gap - missed by both live sync and initial catchup - self.metrics.events_from_daily_catchup.inc(); + self.metrics.events_total.with_label_values(&["daily_catchup"]).inc(); + self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc(); tracing::error!( relay = %relay, event_id = %event.id.to_hex(), @@ -492,15 +632,56 @@ impl SyncManager { } fn record_connection_attempt(&self, relay: &RelayUrl, success: bool) { - let peer = self.peer_metrics.entry(relay.clone()).or_default(); - peer.connection_attempts += 1; - if !success { - peer.connection_failures += 1; + let result = if success { "success" } else { "failure" }; + self.metrics.connection_attempts + .with_label_values(&[relay.as_str(), result]) + .inc(); + } + + fn update_relay_status(&self, relay: &RelayUrl, status: &RelayStatus) { + // Reset all status labels for this relay + for s in ["healthy", "backoff", "dead"] { + self.metrics.relay_status + .with_label_values(&[relay.as_str(), s]) + .set(0); } + // Set current status + let status_label = match status { + RelayStatus::Healthy => "healthy", + RelayStatus::Backoff { .. } => "backoff", + RelayStatus::Dead => "dead", + }; + self.metrics.relay_status + .with_label_values(&[relay.as_str(), status_label]) + .set(1); } } ``` +### Example Grafana Queries + +```promql +# Relay health overview - count by status +sum by (status) (ngit_sync_relay_status == 1) + +# Connection success rate over last hour +sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) +/ sum(rate(ngit_sync_connection_attempts_total[1h])) + +# Sync gap detection - events that should have been live synced +sum(rate(ngit_sync_gap_events_total[1h])) by (relay) + +# Live sync effectiveness (lower is better - fewer gaps) +sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) +/ sum(rate(ngit_sync_events_total[1h])) + +# Relays with high failure counts (potential issues) +topk(10, ngit_sync_relay_failures) + +# Alert: relay stuck in dead state +ngit_sync_relay_status{status="dead"} == 1 +``` + ### Log Levels for Sync Events | Event | Level | Context | @@ -544,22 +725,23 @@ pub struct SyncConfig { ## Summary -| Component | Responsibility | -| ---------------------- | ------------------------------------------------------------ | -| **SyncManager** | Orchestrates connections, triggers catchup, processes events | -| **FilterService** | Builds unified filters from database state | -| **RelayHealthTracker** | Manages backoff, dead relay detection, persistence | -| **ConnectionState** | Per-relay WebSocket + subscription management | +| Component | Responsibility | +| ---------------------- | -------------------------------------------------------------- | +| **SyncManager** | Orchestrates connections, triggers catchup, processes events | +| **FilterService** | Builds unified filters from database state | +| **RelayHealthTracker** | Manages backoff, dead relay detection (in-memory + Prometheus) | +| **ConnectionState** | Per-relay WebSocket + subscription management | +| **SyncMetrics** | Prometheus metrics for operator visibility | ### Key Design Decisions 1. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism 2. **Exclude ourselves** from relay list to prevent loops 3. **One connection per relay** with combined filters for efficiency -4. **Persisted health state** survives restarts -5. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up -6. **Client-side filtering** for 30617/30618, server-side for Layer 2/3 -7. **Dynamic subscription addition** with periodic consolidation -8. **Custom acceptance policy** excluding rate limiting defaults -9. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps -10. **Peer reliability tracking** - monitor uptime and event coverage per relay +4. **In-memory health state** with Prometheus metrics for visibility (no database persistence needed for <100 relays) +5. **Graceful degradation on restart** - conservative initial backoff with jitter avoids thundering herd +6. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up +7. **Client-side filtering** for 30617/30618, server-side for Layer 2/3 +8. **Dynamic subscription addition** with periodic consolidation +9. **Custom acceptance policy** excluding rate limiting defaults +10. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps, tracked in Prometheus diff --git a/docs/explanation/monitoring.md b/docs/explanation/monitoring.md index 3b1b1ac..9368bf4 100644 --- a/docs/explanation/monitoring.md +++ b/docs/explanation/monitoring.md @@ -90,10 +90,96 @@ For detailed per-repository investigation at scale, consider adding **Loki** (lo - Loki queries enable ad-hoc deep dives (e.g., find all transfers > 10MB) - Pairs with Prometheus for long-term trends -## Future: Sync Metrics (GRASP-02) +## Sync Metrics (GRASP-02) -When GRASP-02 proactive sync is implemented, additional metrics will track: +When GRASP-02 proactive sync is implemented, the following metrics will be added to track relay synchronization health. These metrics use in-memory tracking with Prometheus for operator visibility (no database persistence needed for <100 relays). -- Events received from sync (live vs catchup) -- Active outbound relay connections -- Catchup gap (events found during catchup indicating sync failures) \ No newline at end of file +### Sync Metrics Overview + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not | +| `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes | +| `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise | +| `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count | +| `ngit_sync_events_total` | Counter | source | Events received by source type | +| `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup | +| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | +| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count | +| `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead | + +### Event Sources + +The `source` label on `ngit_sync_events_total` tracks how events were received: + +- `direct` - Submitted directly to our relay by a user +- `live_sync` - Received via live WebSocket subscription (expected path) +- `catchup` - Found during negentropy catchup after reconnect +- `daily_catchup` - Found during daily reconciliation + +**Catchup events indicate sync failures** - these should have been received via live sync. High catchup rates suggest connectivity issues or filter mismatches. + +### Relay Health States + +The `status` label on `ngit_sync_relay_status` tracks relay health: + +- `healthy` - Normal operation, connections working +- `backoff` - Exponential backoff after failures (5s → 10s → ... → 1h) +- `dead` - 24h of continuous failures, daily retry only + +### Example Grafana Queries + +```promql +# Relay health overview - count by status +sum by (status) (ngit_sync_relay_status == 1) + +# Connection success rate over last hour +sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) +/ sum(rate(ngit_sync_connection_attempts_total[1h])) + +# Sync gap detection - events that should have been live synced +sum(rate(ngit_sync_gap_events_total[1h])) by (relay) + +# Live sync effectiveness (lower is better - fewer gaps) +sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) +/ sum(rate(ngit_sync_events_total[1h])) + +# Relays with high failure counts (potential issues) +topk(10, ngit_sync_relay_failures) +``` + +### Example Alerts + +```yaml +# Alert if relay stuck in dead state for > 1 day +- alert: SyncRelayDead + expr: ngit_sync_relay_status{status="dead"} == 1 + for: 1d + labels: + severity: warning + annotations: + summary: "Sync relay {{ $labels.relay }} is dead" + +# Alert if sync gap rate is high (>10% of events from catchup) +- alert: SyncGapHigh + expr: > + sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) + / sum(rate(ngit_sync_events_total[1h])) > 0.1 + for: 30m + labels: + severity: warning + annotations: + summary: "High sync gap rate - {{ $value | humanizePercentage }} of events from catchup" +``` + +### Design Rationale + +**In-memory health tracking with Prometheus visibility** was chosen over database persistence because: + +1. **Scale**: <100 relays means per-relay labels have acceptable cardinality +2. **Simplicity**: No database schema, migrations, or cleanup needed +3. **Operator visibility**: Prometheus + Grafana provide better dashboards than custom queries +4. **Restart behavior**: Conservative initial backoff (5s + jitter) avoids thundering herd on restart +5. **Historical data**: Prometheus retains health history; in-memory state only needs current status + +See [GRASP-02 Proactive Sync](grasp-02-proactive-sync.md) for full architecture details. \ No newline at end of file -- cgit v1.2.3