autopush_common/
redis_util.rs

1use redis::{aio::ConnectionLike, cmd, pipe, AsyncCommands, Pipeline, RedisError, ToRedisArgs};
2
/// NOTE(review): presumably the intended attempt limit to pass as the `retries` argument of
/// [transaction]; it is not referenced in the visible code — confirm against callers.
pub const MAX_TRANSACTION_LOOP: usize = 10;
4
5/// Async version of [redis::transaction]
6///
7/// Note that transaction usage is problematic when multiplexing is utilized on the passed in async
8/// Connection, as this would cause the multiplexed commands to interleave with the
9/// transaction's. The transaction's Connection should not be shared.
10pub async fn transaction<
11    C: ConnectionLike + AsyncCommands,
12    K: ToRedisArgs,
13    T,
14    F: AsyncFnMut(&mut C, &mut Pipeline) -> Result<Option<T>, E>,
15    E: From<RedisError>,
16    R: FnOnce() -> E,
17>(
18    con: &mut C,
19    keys: &[K],
20    retries: usize,
21    retry_err: R,
22    func: F,
23) -> Result<T, E> {
24    let mut func = func;
25    for _i in 0..retries {
26        cmd("WATCH").arg(keys).exec_async(con).await?;
27        let mut p = pipe();
28        let response: Option<T> = func(con, p.atomic()).await?;
29        match response {
30            None => {
31                continue;
32            }
33            Some(response) => {
34                // make sure no watch is left in the connection, even if
35                // someone forgot to use the pipeline.
36                cmd("UNWATCH").exec_async(con).await?;
37                return Ok(response);
38            }
39        }
40    }
41
42    // The transaction failed, so return an error.
43    Err(retry_err())
44}