1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
#![warn(missing_docs)]

//! A data structure that can efficiently store many keys that map to a
//! relatively small number of values.

use dashmap::{mapref::entry::Entry, DashMap};
use std::collections::hash_map::DefaultHasher;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};

/// A hashmap that assumes a large number of keys will map to a relatively smaller number of values.
///
/// If a value is stored in the map that is already stored by another key, the
/// refcount of the value will be incremented instead of duplicating it.
///
/// Stored values are located purely by the `u64` hash of their content, so keys
/// share a storage item whenever their values produce the same hash.
///
/// Uses [`DashMap`] internally for both layers.
#[derive(Debug)]
pub struct DedupedMap<K, M, V>
where
    K: Eq + Hash,
    M: Debug,
    V: Debug,
{
    /// First layer of the map. A mapping from each key to its per-key metadata
    /// and the u64 hash that locates the key's value in `storage`.
    pointers: DashMap<K, MapPointer<M>>,

    /// Second layer of the map. The reference-counted items stored in the
    /// cache, keyed by their hash.
    storage: DashMap<u64, MapValue<V>>,
}

/// The first layer of the map: per-key metadata plus the hash that locates the
/// key's value in the second layer.
///
/// `Clone` is derived, so a pointer can be cloned whenever its metadata type
/// is `Clone`; the hash is copied as-is.
#[derive(Debug, Clone)]
struct MapPointer<M: Debug> {
    /// The metadata associated with this pointer.
    meta: M,
    /// The hash of the content to retrieve from the storage.
    hash: u64,
}

/// The second layer of the map, a reference counted value.
///
/// `Clone` and `PartialEq` are derived: cloning copies the value and its
/// current reference count, and two items are equal only when both the value
/// and the reference count match.
#[derive(Debug, Clone, PartialEq)]
struct MapValue<V> {
    /// The stored value.
    value: V,
    /// The number of pointers that are referring to this storage item.
    refcount: usize,
}

impl<K, M, V> DedupedMap<K, M, V>
where
    K: Eq + Hash + Debug,
    M: Debug + Clone,
    V: Hash + Debug + Clone,
{
    /// Construct a map that holds no keys and no stored values.
    #[must_use]
    pub fn new() -> Self {
        let pointers = DashMap::new();
        let storage = DashMap::new();
        Self { pointers, storage }
    }

    /// Insert `value` into the map at `key` with metadata `meta`.
    ///
    /// If `value` is already in the map under a different key, its refcount will
    /// be incremented instead of storing another copy of `value`. The metadata
    /// data `meta` will be attached to this specific key, and not refcounted.
    pub fn insert(&self, key: K, meta: M, value: V) {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let hash = hasher.finish();

        // This order reduces the chance of seeing a dangling pointer, since we
        // wont have a time where there is a pointer pointing to nothing. It
        // increases the chances of orphaned storage entries. An orphan is safer
        // than a dangling pointer, as an orphan is similar to a memory leak,
        // instead of allowing for potential race conditions.
        match self.storage.entry(hash) {
            Entry::Occupied(mut occupied_entry) => {
                occupied_entry.get_mut().refcount += 1;
            }
            Entry::Vacant(vacant_entry) => {
                vacant_entry.insert(MapValue { value, refcount: 1 });
            }
        }

        self.pointers.insert(key, MapPointer { meta, hash });
    }

    /// Remove the item associated with a key from the map.
    ///
    /// The metadata associated with `key` will always be removed. The reference
    /// count of the storage item it points to will be decremented. If no more
    /// keys refer to the storage item, it will also be removed.
    ///
    /// Does nothing if `key` is not in the map. If the key's pointer refers to
    /// a storage item that no longer exists, an error is logged.
    pub fn remove(&self, key: K) {
        // Removing by reference keeps `key` available for the error log below,
        // so it only gets formatted if that log is actually emitted.
        if let Some((_, pointer)) = self.pointers.remove(&key) {
            match self.storage.entry(pointer.hash) {
                Entry::Occupied(mut occupied_storage_entry) => {
                    let item = occupied_storage_entry.get_mut();
                    if item.refcount > 1 {
                        item.refcount -= 1;
                    } else {
                        // This was the last pointer referring to the item.
                        occupied_storage_entry.remove();
                    }
                }
                Entry::Vacant(_) => {
                    tracing::error!(
                        r#type = "deduped-dashmap.remove.dangling-entry",
                        ?key,
                        "Dangling storage entry"
                    );
                }
            }
        }
    }

    /// Look up `key` and return clones of its metadata and stored value.
    ///
    /// Returns `None` when `key` has no pointer in the map. Also returns `None`,
    /// after logging an error, when the pointer exists but the storage item it
    /// refers to is missing.
    pub fn get(&self, key: &K) -> Option<(M, V)> {
        let pointer_ref = self.pointers.get(key)?;

        let storage_ref = match self.storage.get(&pointer_ref.hash) {
            Some(found) => found,
            None => {
                tracing::error!(
                    r#type = "deduped-dashmap.get.dangling-entry",
                    ?key,
                    "Dangling storage entry"
                );
                return None;
            }
        };

        Some((pointer_ref.meta.clone(), storage_ref.value.clone()))
    }

    /// Fetches the total number of storage items in the map.
    ///
    /// This will be at most `self.len_pointers()`.
    #[must_use]
    pub fn len_storage(&self) -> usize {
        self.storage.len()
    }

    /// Fetches the total number of pointers stored in the map.
    ///
    /// This will be at least `self.len_storage()`.
    #[must_use]
    pub fn len_pointers(&self) -> usize {
        self.pointers.len()
    }

    /// Keep or drop each entry according to a predicate.
    ///
    /// The predicate receives the key, its metadata and its stored value, and
    /// its return value decides what happens next:
    ///
    /// - `ControlFlow::Continue(true)`: The item is kept and iteration continues.
    /// - `ControlFlow::Continue(false)`: The item is removed (releasing its
    ///   reference on the storage item) and iteration continues.
    /// - `ControlFlow::Break`: This item, and all further items in the map,
    ///   are kept, and the predicate won't be called again.
    ///
    /// A pointer whose storage item is missing is logged and removed without
    /// consulting the predicate, unless iteration has already been stopped by
    /// `ControlFlow::Break`.
    pub fn retain<F>(&self, mut pred: F)
    where
        F: FnMut(&K, &M, &V) -> ControlFlow<bool>,
    {
        let mut stopped = false;

        self.pointers.retain(|key, pointer| -> bool {
            // `dashmap` offers no way to abort the traversal early, so once the
            // predicate asked to stop we just keep everything that is left.
            if stopped {
                return true;
            }

            let mut slot = match self.storage.entry(pointer.hash) {
                Entry::Occupied(slot) => slot,
                Entry::Vacant(_) => {
                    tracing::error!(
                        r#type = "deduped-dashmap.retain.missing-entry",
                        "missing storage entry in memory cache"
                    );
                    return false;
                }
            };

            match pred(key, &pointer.meta, &slot.get().value) {
                ControlFlow::Continue(true) => true,
                ControlFlow::Continue(false) => {
                    // Release this pointer's reference; drop the storage item
                    // entirely when it was the last one.
                    if slot.get().refcount > 1 {
                        slot.get_mut().refcount -= 1;
                    } else {
                        slot.remove();
                    }
                    false
                }
                ControlFlow::Break => {
                    stopped = true;
                    true
                }
            }
        });
    }

    /// Checks if the map contains a specific key.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn contains_key(&self, key: &K) -> bool {
        self.pointers.contains_key(key)
    }

    /// Clears the map, both the pointer and the storage maps.
    ///
    /// It behaves as `Self::retain(|_, _, _| ControlFlow::Continue(false))`
    /// except being faster as the predicate checking step is skipped.
    ///
    /// Note that the two underlying maps are cleared sequentially without
    /// locking, calling this function while other map mutations are done
    /// concurrently could result in dangling pointers. As a mitigation,
    /// it clears the pointer map first, followed by the storage map to
    /// reduce the chance of dangling pointers.
    pub fn clear(&self) {
        self.pointers.clear();
        self.storage.clear();
    }
}

impl<K, M, V> Default for DedupedMap<K, M, V>
where
    K: Eq + Hash,
    M: Debug,
    V: Debug,
{
    /// Builds an empty map from two default-constructed `DashMap`s.
    fn default() -> Self {
        let pointers = DashMap::default();
        let storage = DashMap::default();
        Self { pointers, storage }
    }
}

impl<K, M, V> FromIterator<(K, M, V)> for DedupedMap<K, M, V>
where
    K: Debug + Eq + Hash,
    M: Clone + Debug,
    V: Clone + Debug + Eq + Hash,
{
    /// Builds a map by inserting every `(key, meta, value)` triple in the
    /// order the iterator yields them.
    fn from_iter<T: IntoIterator<Item = (K, M, V)>>(iter: T) -> Self {
        let map = Self::new();
        iter.into_iter()
            .for_each(|(key, meta, value)| map.insert(key, meta, value));
        map
    }
}

impl<K, V> FromIterator<(K, V)> for DedupedMap<K, (), V>
where
    K: Debug + Eq + Hash,
    V: Clone + Debug + Eq + Hash,
{
    /// Builds a metadata-less map by inserting every `(key, value)` pair with
    /// `()` as its metadata, in the order the iterator yields them.
    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
        let map = Self::new();
        for (key, value) in iter {
            map.insert(key, (), value);
        }
        map
    }
}

/// This mimics the `std::ops::ControlFlow` API (described as unstable when this
/// was written), except the `Break` variant here doesn't carry a value.
///
/// It is the return type of the predicate passed to [`DedupedMap::retain`],
/// where the payload of `Continue` says whether the current item is kept.
#[derive(Debug)]
pub enum ControlFlow<C> {
    /// Continue to the next iteration.
    Continue(C),
    /// Stop after this iteration.
    Break,
}

#[cfg(test)]
mod tests {
    use crate::MapValue;

    use super::{ControlFlow, DedupedMap};
    use std::collections::{HashMap, HashSet};

    #[test]
    fn test_simple() {
        // Populate a map with the days of the week, tagged by the kind of day.
        let map = DedupedMap::<&str, usize, &str>::new();
        let week = [
            ("monday", 0, "week day"),
            ("tuesday", 1, "week day"),
            ("wednesday", 2, "week day"),
            ("thursday", 3, "week day"),
            ("friday", 4, "week day"),
            ("saturday", 5, "week end"),
            ("sunday", 6, "week end"),
        ];
        for &(day, day_num, day_type) in week.iter() {
            map.insert(day, day_num, day_type);
        }

        // Snapshots both layers and checks them against the expected
        // `(key, meta, value, refcount)` rows, listed in key order.
        let assert_layout = |expected: &[(&'static str, usize, &'static str, usize)]| {
            let mut pointers: Vec<_> = map.pointers.clone().into_iter().collect();
            pointers.sort_by_key(|entry| entry.0);
            let storage: HashMap<_, _> = map.storage.clone().into_iter().collect();

            for (idx, &(day, day_num, day_type, refcount)) in expected.iter().enumerate() {
                let (key, pointer) = &pointers[idx];
                assert_eq!(*key, day);
                assert_eq!(pointer.meta, day_num);
                assert_eq!(
                    storage[&pointer.hash],
                    MapValue {
                        value: day_type,
                        refcount,
                    }
                );
            }
        };

        // Initial state: seven keys sharing two stored values.
        assert_eq!(map.len_pointers(), 7);
        assert_eq!(map.len_storage(), 2);
        assert_layout(&[
            ("friday", 4, "week day", 5),
            ("monday", 0, "week day", 5),
            ("saturday", 5, "week end", 2),
            ("sunday", 6, "week end", 2),
            ("thursday", 3, "week day", 5),
            ("tuesday", 1, "week day", 5),
            ("wednesday", 2, "week day", 5),
        ]);

        // Drop a few keys, checking the sizes of both layers after each one.
        let removals = [("sunday", 6, 2), ("saturday", 5, 1), ("monday", 4, 1)];
        for &(day, pointers_left, storage_left) in removals.iter() {
            map.remove(day);
            assert_eq!(map.len_pointers(), pointers_left);
            assert_eq!(map.len_storage(), storage_left);
        }

        // Only week days remain, and their shared refcount dropped by one.
        assert_layout(&[
            ("friday", 4, "week day", 4),
            ("thursday", 3, "week day", 4),
            ("tuesday", 1, "week day", 4),
            ("wednesday", 2, "week day", 4),
        ]);

        // Drop everything that is left.
        for &day in ["tuesday", "wednesday", "thursday", "friday"].iter() {
            map.remove(day);
        }

        // Both layers must now be completely empty.
        assert_eq!(map.len_pointers(), 0);
        assert_eq!(map.len_storage(), 0);
        assert!(map.pointers.iter().next().is_none());
        assert!(map.storage.iter().next().is_none());
    }

    #[test]
    fn test_retain() {
        // Six keys, two per colour, so three distinct stored values.
        let map = DedupedMap::<&str, &str, &str>::new();
        let entries = [
            ("a", "red", "#f00"),
            ("b", "red", "#f00"),
            ("c", "green", "#0f0"),
            ("d", "green", "#0f0"),
            ("e", "blue", "#00f"),
            ("f", "blue", "#00f"),
        ];
        for &(key, colour, hex) in entries.iter() {
            map.insert(key, colour, hex);
        }

        assert_eq!(map.len_pointers(), 6);
        assert_eq!(map.len_storage(), 3);

        // Keep nothing but the red entries.
        map.retain(|_key, meta, _value| {
            let is_red = *meta == "red";
            ControlFlow::Continue(is_red)
        });

        assert_eq!(map.len_pointers(), 2);
        assert_eq!(map.len_storage(), 1);

        for key in ["a", "b"].iter() {
            assert!(map.contains_key(key));
        }
    }

    #[test]
    fn test_retain_control_flow() {
        // Ten keys that all share the single `()` value.
        let map = DedupedMap::<u32, (), ()>::new();
        (0..10).for_each(|i| map.insert(i, (), ()));
        assert_eq!(map.len_pointers(), 10);
        assert_eq!(map.len_storage(), 1);

        // Drop keys until five shows up, then stop. Iteration order is
        // arbitrary, so record which keys were actually dropped.
        let mut removed_items = HashSet::new();
        map.retain(|key, _, _| match *key {
            5 => ControlFlow::Break,
            other => {
                removed_items.insert(other);
                ControlFlow::Continue(false)
            }
        });

        // Five itself survives; every other key is present exactly when it was
        // not visited (and therefore not dropped) before the break.
        assert!(map.contains_key(&5));
        for i in 0..10 {
            let was_removed = removed_items.contains(&i);
            assert_eq!(map.contains_key(&i), !was_removed);
        }
    }

    #[test]
    fn test_remove_everything() {
        // 26 letters mapped pairwise onto 13 distinct values.
        let map = DedupedMap::<char, (), usize>::new();
        ('a'..='z')
            .enumerate()
            .for_each(|(idx, letter)| map.insert(letter, (), idx / 2));

        assert_eq!(map.len_pointers(), 26);
        assert_eq!(map.len_storage(), 13);

        // Rejecting every entry must empty both layers.
        map.retain(|_, _, _| ControlFlow::Continue(false));

        assert_eq!(map.len_pointers(), 0);
        assert_eq!(map.len_storage(), 0);
    }

    #[test]
    fn test_clear() {
        // 26 letters mapped pairwise onto 13 distinct values.
        let map = DedupedMap::<char, (), usize>::new();
        ('a'..='z')
            .enumerate()
            .for_each(|(idx, letter)| map.insert(letter, (), idx / 2));

        assert_eq!(map.len_pointers(), 26);
        assert_eq!(map.len_storage(), 13);

        // Clearing must leave both layers empty.
        map.clear();

        assert_eq!(map.len_pointers(), 0);
        assert_eq!(map.len_storage(), 0);
    }
}