// autopush_common/db/bigtable/bigtable_client/merge.rs

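//! Merge Bigtable `ReadRowsResponse` cell chunks back into complete rows.
//!
//! Bigtable returns rows as a stream of `CellChunk`s, and a single cell's
//! value may itself be split across several chunks. [`RowMerger`] is a small
//! state machine that reassembles those chunks into finished [`Row`]s.
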
use std::collections::{BTreeMap, HashMap};
use std::mem;
use std::time::{Duration, SystemTime};

use futures::StreamExt;
use google_cloud_rust_raw::bigtable::v2::bigtable::{ReadRowsResponse, ReadRowsResponse_CellChunk};
use grpcio::ClientSStreamReceiver;

use super::{cell::Cell, error::BigTableError, row::Row, FamilyId, Qualifier, RowKey};

/// The potential states as we read each value from the returned stream
/// and compose a "row".
#[derive(Debug, Default, Clone, Eq, PartialEq)]
enum ReadState {
    #[default]
    RowStart,
    CellStart,
    CellInProgress,
    CellComplete,
    RowComplete,
}
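
// A single chunk may advance the merger through several of these states in
// one pass: a typical one-cell row runs RowStart -> CellStart -> CellComplete
// -> RowComplete, while a value split across chunks loops through
// CellInProgress until its final piece arrives (see `process_chunks` below).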

/// An in-progress Cell data struct.
#[derive(Debug, Clone)]
pub(crate) struct PartialCell {
    pub(crate) family: FamilyId,
    /// A "qualifier" is a column name
    pub(crate) qualifier: Qualifier,
    /// The cell's timestamp. Bigtable returns timestamps as microseconds,
    /// but only honors millisecond granularity, so timestamps written to
    /// Bigtable should be millisecond values multiplied by 1,000 (even
    /// though the field asks for microseconds).
    pub(crate) timestamp: SystemTime,
    /// Not sure if or how these are used
    pub(crate) labels: Vec<String>,
    /// The data buffer.
    pub(crate) value: Vec<u8>,
    /// The number of value bytes received so far; non-zero while a
    /// chunked value is still being reassembled.
    pub(crate) value_index: usize,
}

impl Default for PartialCell {
    fn default() -> Self {
        Self {
            family: String::default(),
            qualifier: String::default(),
            timestamp: SystemTime::now(),
            labels: Vec::new(),
            value: Vec::new(),
            value_index: 0,
        }
    }
}

/// An in-progress Row structure
#[derive(Debug, Default, Clone)]
pub(crate) struct PartialRow {
    /// The table's unique row key.
    row_key: RowKey,
    /// Map of cells per family identifier.
    cells: HashMap<FamilyId, Vec<PartialCell>>,
    /// The last family ID we encountered.
    last_family: FamilyId,
    /// The working set of completed cells, keyed by qualifier.
    /// *NOTE* Bigtable groups cells by family ID, but that grouping isn't
    /// important for how we process the data, so we cheat a bit and return
    /// a row structure keyed by the cell qualifier instead, allowing us to
    /// index the returned data faster.
    last_collected_cells: HashMap<Qualifier, Vec<Cell>>,
    /// The last column name (qualifier) we encountered.
    last_qualifier: Qualifier,
    /// Any cell currently in progress (its value may be chunked across
    /// multiple responses).
    cell_in_progress: PartialCell,
}
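
// For example (hypothetical names): once a row with cells under qualifiers
// "data" and "headers" completes, `last_collected_cells` looks like
// `{"data": [Cell {..}], "headers": [Cell {..}]}`, regardless of which
// column family each cell came from.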

/// The workhorse struct: gathers cell data from the stream and builds rows.
#[derive(Debug, Default)]
pub struct RowMerger {
    /// The row's current state. State progresses while processing a single chunk.
    state: ReadState,
    /// The last row key we've encountered. This should be consistent across the chunks.
    last_seen_row_key: Option<RowKey>,
    /// The last cell family. This may change, indicating a new cell group.
    last_seen_cell_family: Option<FamilyId>,
    /// The row that is currently being compiled.
    row_in_progress: PartialRow,
}

impl RowMerger {
    /// discard data so far and return to a neutral state.
    fn reset_row(&mut self, chunk: ReadRowsResponse_CellChunk) -> Result<&mut Self, BigTableError> {
        if self.state == ReadState::RowStart {
            return Err(BigTableError::InvalidChunk("Bare Reset".to_owned()));
        };
        if !chunk.row_key.is_empty() {
            return Err(BigTableError::InvalidChunk(
                "Reset chunk has a row key".to_owned(),
            ));
        };
        if chunk.has_family_name() {
            return Err(BigTableError::InvalidChunk(
                "Reset chunk has a family_name".to_owned(),
            ));
        }
        if chunk.has_qualifier() {
            return Err(BigTableError::InvalidChunk(
                "Reset chunk has a qualifier".to_owned(),
            ));
        }
        if chunk.timestamp_micros > 0 {
            return Err(BigTableError::InvalidChunk(
                "Reset chunk has a timestamp".to_owned(),
            ));
        }
        if !chunk.get_labels().is_empty() {
            return Err(BigTableError::InvalidChunk(
                "Reset chunk has a labels".to_owned(),
            ));
        }
        if !chunk.value.is_empty() {
            return Err(BigTableError::InvalidChunk(
                "Reset chunk has value".to_owned(),
            ));
        }

        self.state = ReadState::RowStart;
        self.row_in_progress = PartialRow::default();
        Ok(self)
    }

    /// The initial chunk of a row contains the first cell's data, plus
    /// fields (like the family name) that later chunks may omit, so capture
    /// those for later use as well.
    fn row_start(
        &mut self,
        chunk: &mut ReadRowsResponse_CellChunk,
    ) -> Result<&Self, BigTableError> {
        if chunk.row_key.is_empty() {
            return Err(BigTableError::InvalidChunk(
                "New row is missing a row key".to_owned(),
            ));
        }
        if chunk.has_family_name() {
            info!(
                "👪Family name: {}: {:?}",
                String::from_utf8(chunk.row_key.clone()).unwrap_or_default(),
                &chunk.get_family_name()
            );
            self.last_seen_cell_family = Some(chunk.get_family_name().get_value().to_owned());
        }
        if let Some(last_key) = self.last_seen_row_key.clone() {
            if last_key.as_bytes().to_vec() >= chunk.row_key {
                return Err(BigTableError::InvalidChunk(
                    "Out of order row keys".to_owned(),
                ));
            }
        }

        let row = &mut self.row_in_progress;

        row.row_key = String::from_utf8(chunk.row_key.clone()).unwrap_or_default();
        row.cell_in_progress = PartialCell::default();

        self.state = ReadState::CellStart;
        Ok(self)
    }

    /// cell_start does most of the work: it starts a new cell value (rows
    /// contain cells, which can have multiple versions).
    fn cell_start(
        &mut self,
        chunk: &mut ReadRowsResponse_CellChunk,
    ) -> Result<&Self, BigTableError> {
        // A chunk with no qualifier can't start a new cell; rather than
        // erroring out, treat the current cell as complete.
        if !chunk.has_qualifier() {
            self.state = ReadState::CellComplete;
            return Ok(self);
        }
        let qualifier = chunk.take_qualifier().get_value().to_vec();
        let row = &mut self.row_in_progress;

        if !row.cells.is_empty()
            && !chunk.row_key.is_empty()
            && chunk.row_key != row.row_key.as_bytes()
        {
            return Err(BigTableError::InvalidChunk(
                "Row key changed mid row".to_owned(),
            ));
        }

        let cell = &mut row.cell_in_progress;
        if chunk.has_family_name() {
            chunk
                .take_family_name()
                .get_value()
                .clone_into(&mut cell.family);
        } else {
            // No family on this chunk; fall back to the last one we saw.
            cell.family = self.last_seen_cell_family.clone().ok_or_else(|| {
                BigTableError::InvalidChunk("Cell missing family for new cell".to_owned())
            })?;
        }

        // A qualifier is the name of the cell. (I don't know why it's called that either.)
        cell.qualifier = String::from_utf8(qualifier).unwrap_or_default();

        // Record the timestamp for this cell. (Note: this is not the clock
        // time at which it was created, but the timestamp that was supplied
        // at its creation. It is used by the garbage collector.)
        cell.timestamp =
            SystemTime::UNIX_EPOCH + Duration::from_micros(chunk.timestamp_micros as u64);

        // If there are additional labels for this cell, record them in the
        // order they were returned.
        cell.labels = chunk.take_labels().into_vec();

        // Pre-allocate space for this cell version's data, which will be
        // delivered in multiple chunks. (Not strictly necessary, but it can
        // save us some future allocations.)
        if chunk.value_size > 0 {
            cell.value = Vec::with_capacity(chunk.value_size as usize);
            self.state = ReadState::CellInProgress;
        } else {
            // Add the data to what we've got.
            cell.value.extend(chunk.value.clone());
            self.state = ReadState::CellComplete;
        }

        Ok(self)
    }

    /// Continue adding data to the cell version. Cell data may exceed a
    /// chunk's maximum size, so we continue feeding data into it.
    fn cell_in_progress(
        &mut self,
        chunk: &mut ReadRowsResponse_CellChunk,
    ) -> Result<&Self, BigTableError> {
        let row = &mut self.row_in_progress;
        let cell = &mut row.cell_in_progress;

        // Quick gauntlet to ensure that we have a cell continuation.
        if cell.value_index > 0 {
            if !chunk.row_key.is_empty() {
                return Err(BigTableError::InvalidChunk(
                    "Found row key mid cell".to_owned(),
                ));
            }
            if chunk.has_family_name() {
                return Err(BigTableError::InvalidChunk(
                    "Found family name mid cell".to_owned(),
                ));
            }
            if chunk.has_qualifier() {
                return Err(BigTableError::InvalidChunk(
                    "Found qualifier mid cell".to_owned(),
                ));
            }
            if chunk.get_timestamp_micros() > 0 {
                return Err(BigTableError::InvalidChunk(
                    "Found timestamp mid cell".to_owned(),
                ));
            }
            if !chunk.get_labels().is_empty() {
                return Err(BigTableError::InvalidChunk(
                    "Found labels mid cell".to_owned(),
                ));
            }
        }

        let mut val = chunk.take_value();
        cell.value_index += val.len();
        cell.value.append(&mut val);

        self.state = if chunk.value_size > 0 {
            ReadState::CellInProgress
        } else {
            ReadState::CellComplete
        };

        Ok(self)
    }

    /// Wrap up a cell that's been in progress.
    fn cell_complete(
        &mut self,
        chunk: &mut ReadRowsResponse_CellChunk,
    ) -> Result<&Self, BigTableError> {
        let row_in_progress = &mut self.row_in_progress;
        let cell_in_progress = &mut row_in_progress.cell_in_progress;

        let mut family_changed = false;
        if row_in_progress.last_family != cell_in_progress.family {
            family_changed = true;
            let cip_family = cell_in_progress.family.clone();
            row_in_progress.last_family.clone_from(&cip_family);

            // Append the cell in progress to the completed cells for this
            // family in the row.
            row_in_progress
                .cells
                .entry(cip_family)
                .or_default()
                .push(cell_in_progress.clone());
        }

        // If the family changed, or the cell name changed
        if family_changed || row_in_progress.last_qualifier != cell_in_progress.qualifier {
            let qualifier = cell_in_progress.qualifier.clone();
            let qualifier_cells = vec![Cell {
                family: cell_in_progress.family.clone(),
                timestamp: cell_in_progress.timestamp,
                labels: cell_in_progress.labels.clone(),
                qualifier: cell_in_progress.qualifier.clone(),
                value: cell_in_progress.value.clone(),
                ..Default::default()
            }];
            // The family ID only really matters to us for garbage collection;
            // the rest of the time the data is essentially flat and each
            // qualifier holds a single cell.
            row_in_progress
                .last_collected_cells
                .insert(qualifier.clone(), qualifier_cells);
            row_in_progress.last_qualifier = qualifier;
        }

        // reset the cell in progress
        cell_in_progress.timestamp = SystemTime::now();
        cell_in_progress.value.clear();
        cell_in_progress.value_index = 0;

        // If this isn't the last item in the row, keep going.
        self.state = if !chunk.has_commit_row() {
            ReadState::CellStart
        } else {
            ReadState::RowComplete
        };

        Ok(self)
    }

    /// wrap up a row, reinitialize our state to read the next row.
    fn row_complete(
        &mut self,
        _chunk: &mut ReadRowsResponse_CellChunk,
    ) -> Result<Row, BigTableError> {
        // Take the finished row, leaving a fresh one in its place.
        let row = mem::take(&mut self.row_in_progress);
        self.state = ReadState::RowStart;
        self.last_seen_row_key = Some(row.row_key.clone());

        Ok(Row {
            row_key: row.row_key,
            cells: row.last_collected_cells,
        })
    }

    /// Wrap up; we're done reading data. Errors if a row was left only
    /// partially read.
    async fn finalize(&mut self) -> Result<&Self, BigTableError> {
        if self.state != ReadState::RowStart {
            return Err(BigTableError::InvalidChunk(
                "The row remains partial / is not committed.".to_owned(),
            ));
        }
        Ok(self)
    }

    /// Iterate through all the returned chunks and compile them into a map
    /// of finished rows indexed by row_key.
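    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `client` is an already-connected Bigtable
    /// gRPC client and `req` is a prepared `ReadRowsRequest` (the names here
    /// are illustrative):
    ///
    /// ```ignore
    /// let stream = client.read_rows(&req)?;
    /// let rows = RowMerger::process_chunks(stream).await?;
    /// for (row_key, row) in &rows {
    ///     debug!("row {} has {} qualifiers", row_key, row.cells.len());
    /// }
    /// ```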
    pub async fn process_chunks(
        mut stream: ClientSStreamReceiver<ReadRowsResponse>,
    ) -> Result<BTreeMap<RowKey, Row>, BigTableError> {
        // Work object
        let mut merger = Self::default();

        // finished collection
        let mut rows = BTreeMap::<RowKey, Row>::new();

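        // `StreamExt::into_future` consumes the stream and yields
        // `(next_item, remaining_stream)`, so we re-bind the stream on every
        // iteration.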
        while let (Some(row_resp_res), s) = stream.into_future().await {
            stream = s;

            // Bigtable's responses are not reliable.
            // A read can fail for any number of reasons and return either 500 class errors or a corrupted data block,
            // depending on what system failed on the Bigtable side.
            // We want to retry the read in those cases and pick up from where we left off.
            let row = match row_resp_res {
                Ok(v) => v,
                Err(e) => return Err(BigTableError::InvalidRowResponse(e)),
            };
            /*
            ReadRowsResponse:
                pub chunks: ::protobuf::RepeatedField<ReadRowsResponse_CellChunk>,
                pub last_scanned_row_key: ::std::vec::Vec<u8>,
                // special fields
                pub unknown_fields: ::protobuf::UnknownFields,
                pub cached_size: ::protobuf::CachedSize,
            */
            if !row.last_scanned_row_key.is_empty() {
                let row_key = String::from_utf8(row.last_scanned_row_key).unwrap_or_default();
                if merger.last_seen_row_key.clone().unwrap_or_default() >= row_key {
                    return Err(BigTableError::InvalidChunk(
                        "Last scanned row key is out of order".to_owned(),
                    ));
                }
                merger.last_seen_row_key = Some(row_key);
            }

            for mut chunk in row.chunks {
                debug!("🧩 Chunk >> {:?}", &chunk);
                if chunk.get_reset_row() {
                    debug!("‼ resetting row");
                    merger.reset_row(chunk)?;
                    continue;
                }
                // Each handler feeds the next: a single chunk may advance the
                // merger through several states in one pass, hence sequential
                // `if`s rather than an `if/else` chain.
                if merger.state == ReadState::RowStart {
                    debug!("🟧 new row");
                    merger.row_start(&mut chunk)?;
                }
                if merger.state == ReadState::CellStart {
                    debug!("🟡   cell start {:?}", chunk.get_qualifier());
                    merger.cell_start(&mut chunk)?;
                }
                if merger.state == ReadState::CellInProgress {
                    debug!("🟡   cell in progress");
                    merger.cell_in_progress(&mut chunk)?;
                }
                if merger.state == ReadState::CellComplete {
                    debug!("🟨   cell complete");
                    merger.cell_complete(&mut chunk)?;
                }
                if merger.state == ReadState::RowComplete {
                    debug! {"🟧 row complete"};
                    let finished_row = merger.row_complete(&mut chunk)?;
                    rows.insert(finished_row.row_key.clone(), finished_row);
                } else if chunk.has_commit_row() {
                    return Err(BigTableError::InvalidChunk(format!(
                        "Chunk tried to commit in row in wrong state {:?}",
                        merger.state
                    )));
                }

                debug!("🧩 Chunk end {:?}", merger.state);
            }
        }
        merger.finalize().await?;
        debug!("🚣 Rows: {}", &rows.len());
        Ok(rows)
    }
}
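
#[cfg(test)]
mod tests {
    //! A minimal sketch of the merge flow (an illustration, not an exhaustive
    //! test): drive the state handlers by hand with a single chunk that opens
    //! and commits a one-cell row. `StringValue` and `BytesValue` are
    //! rust-protobuf's well-known wrapper types used by the generated
    //! Bigtable v2 bindings; the row key, family, and qualifier names here
    //! are arbitrary test values.
    use protobuf::well_known_types::{BytesValue, StringValue};

    use super::*;

    #[test]
    fn merges_single_chunk_row() -> Result<(), BigTableError> {
        let mut chunk = ReadRowsResponse_CellChunk::default();
        chunk.row_key = b"test_row".to_vec();
        let mut family = StringValue::new();
        family.set_value("fam".to_owned());
        chunk.set_family_name(family);
        let mut qualifier = BytesValue::new();
        qualifier.set_value(b"data".to_vec());
        chunk.set_qualifier(qualifier);
        chunk.value = b"hello".to_vec();
        chunk.set_commit_row(true);

        // Walk the same state sequence that process_chunks drives.
        let mut merger = RowMerger::default();
        merger.row_start(&mut chunk)?;
        merger.cell_start(&mut chunk)?;
        merger.cell_complete(&mut chunk)?;
        assert_eq!(merger.state, ReadState::RowComplete);

        let row = merger.row_complete(&mut chunk)?;
        assert_eq!(row.row_key, "test_row");
        // Completed rows are indexed by qualifier, not family.
        assert_eq!(row.cells["data"][0].value, b"hello".to_vec());
        Ok(())
    }
}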