upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/subscription.rs
blob: c37404f0bf3fec9a0d00a7b8dddfa154b1f4dce2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
//!
//! Manages dynamic subscription updates per connection, including:
//! - Tracking subscribed announcements and events
//! - Adding new subscriptions when announcements/PRs arrive
//! - Consolidating filters when count exceeds threshold
//! - Preventing duplicate subscriptions
//!
//! ## Dynamic Subscription Strategy
//!
//! ```text
//! Initial: Layer 1 (announcements)
//!   ↓ (announcement received)
//! Add: Layer 2 (events for that repo)
//!   ↓ (PR/Issue received)
//! Add: Layer 3 (events for that PR/Issue)
//!   ↓ (filter count > 150)
//! Consolidate: Back to Layer 1 only
//! ```

use std::collections::HashSet;
use std::sync::Arc;

use nostr_sdk::prelude::*;

use super::filter::FilterService;

/// Maximum number of filters before consolidation is triggered.
///
/// `SubscriptionManager::should_consolidate` reports `true` only once the
/// filter count is strictly greater than this value.
const CONSOLIDATION_THRESHOLD: usize = 150;

/// Kind 30617 - Repository Announcement (NIP-34)
const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617;

/// Kind 30618 - treated by this module as an announcement kind alongside 30617.
///
/// NOTE(review): this file labels 30618 "Maintainer List", but NIP-34 appears
/// to define kind 30618 as the "repository state announcement" - confirm the
/// intended meaning before relying on the constant's name.
const KIND_MAINTAINER_LIST: u16 = 30618;

/// Manages subscriptions for a single relay connection
///
/// Tracks which announcements and events have been subscribed to,
/// and handles dynamic subscription updates as new events arrive.
///
/// The manager only computes which filters should be added; it returns them
/// to the caller and performs no relay I/O itself.
#[derive(Debug)]
pub struct SubscriptionManager {
    /// Hex event IDs of announcements we've subscribed to (for Layer 2).
    /// Filled by `add_announcement`, emptied by `consolidate`.
    subscribed_announcements: HashSet<String>,
    /// Hex event IDs of PRs/Issues we've subscribed to (for Layer 3).
    /// Filled by `add_event`, emptied by `consolidate`.
    subscribed_events: HashSet<String>,
    /// Whether we've consolidated back to Layer 1 only.
    /// While true, `add_announcement` and `add_event` return `None`.
    is_consolidated: bool,
    /// FilterService for building filters (used for the Layer 1 filters
    /// returned by `consolidate`)
    filter_service: Arc<FilterService>,
    /// Remote relay domain for Layer 2 filters.
    /// NOTE(review): stored by `new` but not read by any method visible in
    /// this file - confirm whether it is still needed.
    remote_domain: String,
}

impl SubscriptionManager {
    /// Create a new SubscriptionManager
    ///
    /// # Arguments
    /// * `filter_service` - FilterService for building subscription filters
    /// * `remote_domain` - The domain of the remote relay we're syncing from
    pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
        Self {
            subscribed_announcements: HashSet::new(),
            subscribed_events: HashSet::new(),
            is_consolidated: false,
            filter_service,
            remote_domain,
        }
    }

    /// Add an announcement and return new filters to subscribe to
    ///
    /// When a new announcement (kind 30617/30618) arrives, this creates
    /// Layer 2 filters to subscribe to events for that repository.
    ///
    /// The announcement is recorded as subscribed only when a filter could
    /// actually be built for it. An event that yields no filter (no `d` tag,
    /// or not an announcement kind) is left untracked, so it is not reported
    /// by `has_announcement` and does not count toward `get_filter_count` /
    /// the consolidation threshold.
    ///
    /// Returns `Some(filters)` if this is a new announcement for which filters
    /// were built; `None` if the manager is consolidated, the announcement is
    /// already subscribed, or no filter could be built.
    pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
        // After consolidation only Layer 1 is active; no per-repo filters are added.
        if self.is_consolidated {
            return None;
        }

        let event_id = event.id.to_hex();
        if self.subscribed_announcements.contains(&event_id) {
            return None;
        }

        // Build Layer 2 filters for this announcement
        // Layer 2 filters target events with 'a' tags pointing to this repo
        let filters = self.build_layer2_filter_for_announcement(event);
        if filters.is_empty() {
            // Nothing was subscribed, so do not track the id: tracking it would
            // count a filter that does not exist.
            return None;
        }

        // Track only announcements that are backed by a real filter.
        self.subscribed_announcements.insert(event_id);
        Some(filters)
    }

    /// Register a PR/Issue/Patch-style event and return the Layer 3 filter for it.
    ///
    /// Intended for events whose kind is accepted by `is_pr_issue_kind`; this
    /// method does not inspect the kind itself. The returned filter selects
    /// events carrying an `e` tag whose value is this event's hex id.
    ///
    /// Returns `Some(filters)` the first time an event id is seen, and `None`
    /// when the manager is consolidated or the id is already tracked.
    pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
        // Once consolidated, only Layer 1 is active.
        if self.is_consolidated {
            return None;
        }

        let id_hex = event.id.to_hex();

        // `insert` returns false when the id was already in the set.
        if !self.subscribed_events.insert(id_hex.clone()) {
            return None;
        }

        // Layer 3: events with an 'e' tag pointing at this event.
        let e_tag = SingleLetterTag::lowercase(Alphabet::E);
        let layer3 = Filter::new().custom_tag(e_tag, id_hex);

        Some(vec![layer3])
    }

    /// Report whether the subscriptions should be collapsed back to Layer 1.
    ///
    /// Returns true only when the manager is not yet consolidated and the
    /// total filter count is strictly greater than `CONSOLIDATION_THRESHOLD` (150).
    pub fn should_consolidate(&self) -> bool {
        // Already consolidated: there is nothing left to collapse.
        if self.is_consolidated {
            return false;
        }

        self.get_filter_count() > CONSOLIDATION_THRESHOLD
    }

    /// Collapse every dynamic subscription back to Layer 1 only.
    ///
    /// Logs how many announcements and events were being tracked, empties both
    /// tracking sets, marks the manager as consolidated, and returns the
    /// Layer 1 filters (from the FilterService) to re-subscribe with.
    pub fn consolidate(&mut self) -> Vec<Filter> {
        // Capture the counts before the sets are emptied so the log reflects
        // the state that is being discarded.
        let announcements = self.subscribed_announcements.len();
        let events = self.subscribed_events.len();
        tracing::info!(
            "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
            announcements,
            events
        );

        // Drop all Layer 2 / Layer 3 tracking and flip the flag.
        self.is_consolidated = true;
        self.subscribed_events.clear();
        self.subscribed_announcements.clear();

        self.filter_service.get_layer1_filters()
    }

    /// Total number of filters considered active for this connection.
    ///
    /// Layer 1 is always counted as exactly one filter. While not consolidated,
    /// each tracked announcement (Layer 2) and each tracked event (Layer 3)
    /// adds one more.
    ///
    /// NOTE(review): Layer 1 is counted as a single filter here even though
    /// `get_layer1_filters` returns a `Vec` - confirm this matches the real
    /// Layer 1 filter count.
    pub fn get_filter_count(&self) -> usize {
        // Layer 2 + Layer 3 contributions; none remain after consolidation.
        let dynamic_filters = if self.is_consolidated {
            0
        } else {
            self.subscribed_announcements.len() + self.subscribed_events.len()
        };

        1 + dynamic_filters
    }

    /// Check if an announcement has been subscribed to
    ///
    /// `event_id` is compared against the tracked announcement ids, which
    /// `add_announcement` stores as hex strings (`event.id.to_hex()`).
    /// The tracked set is emptied by `consolidate`.
    pub fn has_announcement(&self, event_id: &str) -> bool {
        self.subscribed_announcements.contains(event_id)
    }

    /// Check if an event has been subscribed to
    ///
    /// `event_id` is compared against the tracked event ids, which `add_event`
    /// stores as hex strings (`event.id.to_hex()`). The tracked set is emptied
    /// by `consolidate`.
    pub fn has_event(&self, event_id: &str) -> bool {
        self.subscribed_events.contains(event_id)
    }

    /// Check if subscriptions have been consolidated
    ///
    /// Starts as `false` in `new` and is set to `true` by `consolidate`;
    /// no method shown in this file resets it.
    pub fn is_consolidated(&self) -> bool {
        self.is_consolidated
    }

    /// Get the count of subscribed announcements
    ///
    /// This is the number of entries currently in the tracked announcement
    /// set; it drops to zero after `consolidate`.
    pub fn announcement_count(&self) -> usize {
        self.subscribed_announcements.len()
    }

    /// Get the count of subscribed events
    ///
    /// This is the number of entries currently in the tracked event set;
    /// it drops to zero after `consolidate`.
    pub fn event_count(&self) -> usize {
        self.subscribed_events.len()
    }

    /// Build Layer 2 filter for a specific announcement event
    ///
    /// Creates a filter with an 'a' tag pointing to the announcement's coordinates.
    fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
        // Extract the d tag (identifier) from the event
        let identifier = event.tags.iter().find_map(|tag| {
            let tag_vec = tag.clone().to_vec();
            if tag_vec.len() >= 2 && tag_vec[0] == "d" {
                Some(tag_vec[1].clone())
            } else {
                None
            }
        });

        let identifier = match identifier {
            Some(id) => id,
            None => {
                tracing::warn!(
                    "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
                    event.id.to_hex()
                );
                return Vec::new();
            }
        };

        // Determine the kind for the coordinate
        let kind = event.kind.as_u16();
        if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST {
            tracing::warn!(
                "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
                event.id.to_hex(),
                kind
            );
            return Vec::new();
        }

        // Build the addressable coordinate: kind:pubkey:identifier
        let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier);

        // Create filter with 'a' tag for this coordinate
        let filter = Filter::new().custom_tag(
            SingleLetterTag::lowercase(Alphabet::A),
            coord,
        );

        vec![filter]
    }

    /// Check if an event kind is an announcement kind
    pub fn is_announcement_kind(kind: u16) -> bool {
        kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST
    }

    /// Report whether `kind` is one of the kinds that should trigger a Layer 3
    /// subscription.
    ///
    /// Despite the name, the accepted set is wider than PRs and issues:
    /// 1617 (patch proposal), 1618 (PR), 1619 (PR update), 1621 (issue) and
    /// 1622 (reply).
    pub fn is_pr_issue_kind(kind: u16) -> bool {
        // Kinds whose arrival triggers Layer 3; 1620 is deliberately absent.
        const LAYER3_TRIGGER_KINDS: [u16; 5] = [
            1617, // Patch proposal (NIP-34)
            1618, // PR
            1619, // PR Update
            1621, // Issue
            1622, // Reply
        ];

        LAYER3_TRIGGER_KINDS.contains(&kind)
    }
}

#[cfg(test)]
mod tests {
    use super::SubscriptionManager;

    /// Kinds 30617 and 30618 are announcement kinds; a plain note (1) and a
    /// patch proposal (1617) are not.
    #[test]
    fn test_is_announcement_kind() {
        for kind in [30617, 30618] {
            assert!(SubscriptionManager::is_announcement_kind(kind));
        }
        for kind in [1, 1617] {
            assert!(!SubscriptionManager::is_announcement_kind(kind));
        }
    }

    /// The five Layer 3 trigger kinds are accepted; an announcement kind and a
    /// plain note are rejected.
    #[test]
    fn test_is_pr_issue_kind() {
        for kind in [1617, 1618, 1619, 1621, 1622] {
            assert!(SubscriptionManager::is_pr_issue_kind(kind));
        }
        for kind in [30617, 1] {
            assert!(!SubscriptionManager::is_pr_issue_kind(kind));
        }
    }
}