neon_arch commited on
Commit
db93c31
1 Parent(s): 4afc0d1

⚙️ refactor: implement async pooling for redis connections (#180)(#178)

Browse files
Files changed (1) hide show
  1. src/cache/cacher.rs +103 -23
src/cache/cacher.rs CHANGED
@@ -1,17 +1,26 @@
1
  //! This module provides the functionality to cache the aggregated results fetched and aggregated
2
  //! from the upstream search engines in a json format.
3
 
 
 
4
  use md5::compute;
5
- use redis::{Client, Commands, Connection};
 
 
6
 
7
  /// A named struct which stores the redis Connection url address to which the client will
8
  /// connect to.
9
  ///
10
  /// # Fields
11
  ///
12
- /// * `redis_connection_url` - It stores the redis Connection url address.
 
 
 
13
  pub struct RedisCache {
14
- connection: Connection,
 
 
15
  }
16
 
17
  impl RedisCache {
@@ -19,11 +28,25 @@ impl RedisCache {
19
  ///
20
  /// # Arguments
21
  ///
22
- /// * `redis_connection_url` - It stores the redis Connection url address.
23
- pub fn new(redis_connection_url: String) -> Result<Self, Box<dyn std::error::Error>> {
 
 
 
 
 
24
  let client = Client::open(redis_connection_url)?;
25
- let connection = client.get_connection()?;
26
- let redis_cache = RedisCache { connection };
 
 
 
 
 
 
 
 
 
27
  Ok(redis_cache)
28
  }
29
 
@@ -32,7 +55,7 @@ impl RedisCache {
32
  /// # Arguments
33
  ///
34
  /// * `url` - It takes an url as string.
35
- fn hash_url(url: &str) -> String {
36
  format!("{:?}", compute(url))
37
  }
38
 
@@ -41,9 +64,42 @@ impl RedisCache {
41
  /// # Arguments
42
  ///
43
  /// * `url` - It takes an url as a string.
44
- pub fn cached_json(&mut self, url: &str) -> Result<String, Box<dyn std::error::Error>> {
45
- let hashed_url_string = Self::hash_url(url);
46
- Ok(self.connection.get(hashed_url_string)?)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  }
48
 
49
  /// A function which caches the results by using the hashed `url` as the key and
@@ -54,21 +110,45 @@ impl RedisCache {
54
  ///
55
  /// * `json_results` - It takes the json results string as an argument.
56
  /// * `url` - It takes the url as a String.
57
- pub fn cache_results(
58
  &mut self,
59
- json_results: String,
60
  url: &str,
61
- ) -> Result<(), Box<dyn std::error::Error>> {
62
- let hashed_url_string = Self::hash_url(url);
63
-
64
- // put results_json into cache
65
- self.connection.set(&hashed_url_string, json_results)?;
66
 
67
- // Set the TTL for the key to 60 seconds
68
- self.connection
69
- .expire::<String, u32>(hashed_url_string, 60)
70
- .unwrap();
71
 
72
- Ok(())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  }
74
  }
 
1
  //! This module provides the functionality to cache the aggregated results fetched and aggregated
2
  //! from the upstream search engines in a json format.
3
 
4
+ use error_stack::Report;
5
+ use futures::future::try_join_all;
6
  use md5::compute;
7
+ use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
8
+
9
+ use super::error::PoolError;
10
 
11
  /// A named struct which stores the redis Connection url address to which the client will
12
  /// connect to.
13
  ///
14
  /// # Fields
15
  ///
16
+ /// * `connection_pool` - It stores a pool of connections ready to be used.
17
+ /// * `pool_size` - It stores the size of the connection pool (in other words the number of
18
+ /// connections that should be stored in the pool).
19
+ /// * `current_connection` - It stores the index of which connection is being used at the moment.
20
  pub struct RedisCache {
21
+ connection_pool: Vec<ConnectionManager>,
22
+ pool_size: u8,
23
+ current_connection: u8,
24
  }
25
 
26
  impl RedisCache {
 
28
  ///
29
  /// # Arguments
30
  ///
31
+ /// * `redis_connection_url` - It takes the redis Connection url address.
32
+ /// * `pool_size` - It takes the size of the connection pool (in other words the number of
33
+ /// connections that should be stored in the pool).
34
+ pub async fn new(
35
+ redis_connection_url: &str,
36
+ pool_size: u8,
37
+ ) -> Result<Self, Box<dyn std::error::Error>> {
38
  let client = Client::open(redis_connection_url)?;
39
+ let mut tasks: Vec<_> = Vec::new();
40
+
41
+ for _ in 0..pool_size {
42
+ tasks.push(client.get_tokio_connection_manager());
43
+ }
44
+
45
+ let redis_cache = RedisCache {
46
+ connection_pool: try_join_all(tasks).await?,
47
+ pool_size,
48
+ current_connection: Default::default(),
49
+ };
50
  Ok(redis_cache)
51
  }
52
 
 
55
  /// # Arguments
56
  ///
57
  /// * `url` - It takes an url as string.
58
+ fn hash_url(&self, url: &str) -> String {
59
  format!("{:?}", compute(url))
60
  }
61
 
 
64
  /// # Arguments
65
  ///
66
  /// * `url` - It takes an url as a string.
67
+ pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
68
+ self.current_connection = Default::default();
69
+ let hashed_url_string: &str = &self.hash_url(url);
70
+
71
+ let mut result: Result<String, RedisError> = self.connection_pool
72
+ [self.current_connection as usize]
73
+ .get(hashed_url_string)
74
+ .await;
75
+
76
+ // Code to check whether the current connection being used is dropped with connection error
77
+ // or not. if it drops with the connection error then the current connection is replaced
78
+ // with a new connection from the pool which is then used to run the redis command then
79
+ // that connection is also checked whether it is dropped or not if it is not then the
80
+ // result is passed as a `Result` or else the same process repeats again and if all of the
81
+ // connections in the pool result in connection drop error then a custom pool error is
82
+ // returned.
83
+ loop {
84
+ match result {
85
+ Err(error) => match error.is_connection_dropped() {
86
+ true => {
87
+ self.current_connection += 1;
88
+ if self.current_connection == self.pool_size {
89
+ return Err(Report::new(
90
+ PoolError::PoolExhaustionWithConnectionDropError,
91
+ ));
92
+ }
93
+ result = self.connection_pool[self.current_connection as usize]
94
+ .get(hashed_url_string)
95
+ .await;
96
+ continue;
97
+ }
98
+ false => return Err(Report::new(PoolError::RedisError(error))),
99
+ },
100
+ Ok(res) => return Ok(res),
101
+ }
102
+ }
103
  }
104
 
105
  /// A function which caches the results by using the hashed `url` as the key and
 
110
  ///
111
  /// * `json_results` - It takes the json results string as an argument.
112
  /// * `url` - It takes the url as a String.
113
+ pub async fn cache_results(
114
  &mut self,
115
+ json_results: &str,
116
  url: &str,
117
+ ) -> Result<(), Report<PoolError>> {
118
+ self.current_connection = Default::default();
119
+ let hashed_url_string: &str = &self.hash_url(url);
 
 
120
 
121
+ let mut result: Result<(), RedisError> = self.connection_pool
122
+ [self.current_connection as usize]
123
+ .set_ex(hashed_url_string, json_results, 60)
124
+ .await;
125
 
126
+ // Code to check whether the current connection being used is dropped with connection error
127
+ // or not. if it drops with the connection error then the current connection is replaced
128
+ // with a new connection from the pool which is then used to run the redis command then
129
+ // that connection is also checked whether it is dropped or not if it is not then the
130
+ // result is passed as a `Result` or else the same process repeats again and if all of the
131
+ // connections in the pool result in connection drop error then a custom pool error is
132
+ // returned.
133
+ loop {
134
+ match result {
135
+ Err(error) => match error.is_connection_dropped() {
136
+ true => {
137
+ self.current_connection += 1;
138
+ if self.current_connection == self.pool_size {
139
+ return Err(Report::new(
140
+ PoolError::PoolExhaustionWithConnectionDropError,
141
+ ));
142
+ }
143
+ result = self.connection_pool[self.current_connection as usize]
144
+ .set_ex(hashed_url_string, json_results, 60)
145
+ .await;
146
+ continue;
147
+ }
148
+ false => return Err(Report::new(PoolError::RedisError(error))),
149
+ },
150
+ Ok(_) => return Ok(()),
151
+ }
152
+ }
153
  }
154
  }