autopush_common/
redis_util.rs1use redis::{aio::ConnectionLike, cmd, pipe, AsyncCommands, Pipeline, RedisError, ToRedisArgs};
2
3pub const MAX_TRANSACTION_LOOP: usize = 10;
4
5pub async fn transaction<
11 C: ConnectionLike + AsyncCommands,
12 K: ToRedisArgs,
13 T,
14 F: AsyncFnMut(&mut C, &mut Pipeline) -> Result<Option<T>, E>,
15 E: From<RedisError>,
16 R: FnOnce() -> E,
17>(
18 con: &mut C,
19 keys: &[K],
20 retries: usize,
21 retry_err: R,
22 func: F,
23) -> Result<T, E> {
24 let mut func = func;
25 for _i in 0..retries {
26 cmd("WATCH").arg(keys).exec_async(con).await?;
27 let mut p = pipe();
28 let response: Option<T> = func(con, p.atomic()).await?;
29 match response {
30 None => {
31 continue;
32 }
33 Some(response) => {
34 cmd("UNWATCH").exec_async(con).await?;
37 return Ok(response);
38 }
39 }
40 }
41
42 Err(retry_err())
44}