upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/subscription.rs
blob: bbeaa2a1bf2ee230fab468d4e62b8fe9cdcdc9b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
//!
//! Manages dynamic subscription updates per connection, including:
//! - Tracking subscribed announcements and events
//! - Adding new subscriptions when announcements/PRs arrive
//! - Consolidating filters when count exceeds threshold
//! - Preventing duplicate subscriptions
//!
//! ## Dynamic Subscription Strategy
//!
//! ```text
//! Initial: Layer 1 (announcements)
//!   ↓ (announcement received)
//! Add: Layer 2 (events for that repo)
//!   ↓ (PR/Issue received)
//! Add: Layer 3 (events for that PR/Issue)
//!   ↓ (filter count > 150)
//! Consolidate: Back to Layer 1 only
//! ```

use std::collections::HashSet;
use std::sync::Arc;

use nostr_sdk::prelude::*;

use super::filter::FilterService;

/// Maximum number of filters before consolidation is triggered
///
/// `SubscriptionManager::should_consolidate` compares with a strict `>`, so a
/// filter count of exactly 150 does not yet trigger consolidation.
const CONSOLIDATION_THRESHOLD: usize = 150;

/// Tracks the dynamic subscriptions of a single relay connection
///
/// The manager records, by hex-encoded event ID, which announcements and which
/// PR/Issue/Patch events have already been handed to it, so the same event does
/// not produce a second set of filters. Once `consolidate` has run, both sets
/// are emptied and `add_announcement` / `add_event` return `None`; nothing in
/// this file resets that state.
#[derive(Debug)]
pub struct SubscriptionManager {
    /// Hex-encoded event IDs of announcements recorded by `add_announcement` (Layer 2)
    subscribed_announcements: HashSet<String>,
    /// Hex-encoded event IDs of PRs/Issues recorded by `add_event` (Layer 3)
    subscribed_events: HashSet<String>,
    /// Set to `true` by `consolidate`; while set, no new Layer 2/3 filters are produced
    is_consolidated: bool,
    /// FilterService used by `consolidate` to obtain the Layer 1 filters
    filter_service: Arc<FilterService>,
    /// Remote relay domain for Layer 2 filters
    ///
    /// NOTE(review): stored by `new` but not read by any method in this file —
    /// confirm whether it is still needed.
    remote_domain: String,
}

impl SubscriptionManager {
    /// Create a new SubscriptionManager
    ///
    /// # Arguments
    /// * `filter_service` - FilterService for building subscription filters
    /// * `remote_domain` - The domain of the remote relay we're syncing from
    pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
        Self {
            subscribed_announcements: HashSet::new(),
            subscribed_events: HashSet::new(),
            is_consolidated: false,
            filter_service,
            remote_domain,
        }
    }

    /// Track a repository announcement and return the Layer 2 filters for it
    ///
    /// Layer 2 filters match events whose 'a' tag points at the announced
    /// repository's coordinate.
    ///
    /// # Arguments
    /// * `event` - The announcement event (`Kind::GitRepoAnnouncement` or `Kind::RepoState`)
    ///
    /// # Returns
    /// `Some(filters)` the first time a usable announcement is seen. `None` when
    /// subscriptions have been consolidated, when the announcement is already
    /// tracked, or when no filter can be built for it (wrong kind or missing 'd' tag).
    ///
    /// An announcement is recorded only after a filter has been built for it, so
    /// `get_filter_count` and `has_announcement` never account for an
    /// announcement that produced no subscription.
    ///
    /// NOTE(review): tracking is keyed by event ID. A re-published announcement
    /// with the same kind, author and 'd' tag has a new ID and therefore yields a
    /// second filter for the same coordinate — confirm whether keying by
    /// coordinate is wanted.
    pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
        // After consolidation only Layer 1 is active; nothing new is added.
        if self.is_consolidated {
            return None;
        }

        let event_id = event.id.to_hex();
        if self.subscribed_announcements.contains(&event_id) {
            return None;
        }

        // Build first: an event that cannot produce a filter must not be counted
        // as an active subscription.
        let filters = self.build_layer2_filter_for_announcement(event);
        if filters.is_empty() {
            return None;
        }

        self.subscribed_announcements.insert(event_id);
        Some(filters)
    }

    /// Track a PR/Issue/Patch event and return the Layer 3 filter for it
    ///
    /// The returned filter matches events whose 'e' tag equals this event's
    /// hex-encoded ID. This method does not inspect `event.kind`; the caller
    /// decides which events are passed in.
    ///
    /// # Returns
    /// `Some(vec![filter])` the first time an event is seen. `None` when
    /// subscriptions have been consolidated or the event is already tracked.
    pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
        // After consolidation only Layer 1 is active; nothing new is added.
        if self.is_consolidated {
            return None;
        }

        let event_id = event.id.to_hex();

        // `insert` reports `false` when the ID was already present, which
        // doubles as the duplicate check.
        let newly_tracked = self.subscribed_events.insert(event_id.clone());
        if !newly_tracked {
            return None;
        }

        let e_tag = SingleLetterTag::lowercase(Alphabet::E);
        let layer3 = Filter::new().custom_tag(e_tag, event_id);
        Some(vec![layer3])
    }

    /// Report whether the connection should fall back to Layer 1 only
    ///
    /// Returns `true` when consolidation has not happened yet and
    /// `get_filter_count` is strictly greater than `CONSOLIDATION_THRESHOLD`.
    pub fn should_consolidate(&self) -> bool {
        // Consolidation is a one-way step; never ask for it twice.
        if self.is_consolidated {
            return false;
        }

        let active_filters = self.get_filter_count();
        active_filters > CONSOLIDATION_THRESHOLD
    }

    /// Drop all Layer 2/3 tracking and fall back to Layer 1 only
    ///
    /// Logs how many announcements and events were tracked, empties both
    /// tracking sets, marks the manager as consolidated, and returns the
    /// Layer 1 filters from the FilterService for the caller to re-subscribe with.
    pub fn consolidate(&mut self) -> Vec<Filter> {
        // Capture the sizes before the sets are emptied so the log is meaningful.
        let tracked_announcements = self.subscribed_announcements.len();
        let tracked_events = self.subscribed_events.len();
        tracing::info!(
            "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
            tracked_announcements,
            tracked_events
        );

        self.is_consolidated = true;
        self.subscribed_events.clear();
        self.subscribed_announcements.clear();

        self.filter_service.get_layer1_filters()
    }

    /// Count the filters this manager considers active
    ///
    /// Layer 1 contributes one filter; each tracked announcement (Layer 2) and
    /// each tracked event (Layer 3) contributes one more. After consolidation
    /// only the Layer 1 filter is counted.
    ///
    /// NOTE(review): Layer 1 is counted as a single filter regardless of how
    /// many filters `get_layer1_filters` returns — confirm that matches the
    /// FilterService.
    pub fn get_filter_count(&self) -> usize {
        let layer1: usize = 1;

        match self.is_consolidated {
            // Only the base layer remains once consolidated.
            true => layer1,
            false => {
                let layer2 = self.subscribed_announcements.len();
                let layer3 = self.subscribed_events.len();
                layer1 + layer2 + layer3
            }
        }
    }

    /// Check if an announcement has been subscribed to
    ///
    /// `event_id` is compared against the hex-encoded IDs recorded by
    /// `add_announcement`. The set is emptied by `consolidate`, so this returns
    /// `false` for every ID afterwards.
    pub fn has_announcement(&self, event_id: &str) -> bool {
        self.subscribed_announcements.contains(event_id)
    }

    /// Check if an event has been subscribed to
    ///
    /// `event_id` is compared against the hex-encoded IDs recorded by
    /// `add_event`. The set is emptied by `consolidate`, so this returns
    /// `false` for every ID afterwards.
    pub fn has_event(&self, event_id: &str) -> bool {
        self.subscribed_events.contains(event_id)
    }

    /// Check if subscriptions have been consolidated
    ///
    /// Returns `true` once `consolidate` has been called; no method in this
    /// file sets the flag back to `false`.
    pub fn is_consolidated(&self) -> bool {
        self.is_consolidated
    }

    /// Get the count of subscribed announcements
    ///
    /// This is the number of distinct announcement event IDs currently tracked;
    /// it is `0` after `consolidate`.
    pub fn announcement_count(&self) -> usize {
        self.subscribed_announcements.len()
    }

    /// Get the count of subscribed events
    ///
    /// This is the number of distinct PR/Issue event IDs currently tracked;
    /// it is `0` after `consolidate`.
    pub fn event_count(&self) -> usize {
        self.subscribed_events.len()
    }

    /// Build the Layer 2 filters for one announcement event
    ///
    /// The filter matches events carrying an 'a' tag equal to the announcement's
    /// addressable coordinate, `kind:pubkey:identifier`, where `identifier` is
    /// the value of the event's first 'd' tag.
    ///
    /// Returns a single-element vector on success. Returns an empty vector,
    /// after logging a warning, when the event has no 'd' tag or is not an
    /// announcement kind.
    ///
    /// NOTE(review): for `Kind::RepoState` events the coordinate carries the
    /// state kind rather than the repository announcement kind — confirm that
    /// other events actually reference that coordinate in their 'a' tags.
    fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
        // Find the first tag shaped like ["d", <identifier>, ...].
        let mut d_tag_value = None;
        for tag in event.tags.iter() {
            let parts = tag.clone().to_vec();
            if parts.len() >= 2 && parts[0] == "d" {
                d_tag_value = parts.into_iter().nth(1);
                break;
            }
        }

        let identifier = if let Some(value) = d_tag_value {
            value
        } else {
            tracing::warn!(
                "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
                event.id.to_hex()
            );
            return Vec::new();
        };

        // Reject anything that is not an announcement kind.
        match event.kind {
            Kind::GitRepoAnnouncement | Kind::RepoState => {}
            _ => {
                tracing::warn!(
                    "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
                    event.id.to_hex(),
                    event.kind
                );
                return Vec::new();
            }
        }

        // Addressable coordinate: kind:pubkey:identifier
        let kind_number = event.kind.as_u16();
        let author = event.pubkey.to_hex();
        let coord = format!("{}:{}:{}", kind_number, author, identifier);

        let a_tag = SingleLetterTag::lowercase(Alphabet::A);
        vec![Filter::new().custom_tag(a_tag, coord)]
    }
}