File size: 7,324 Bytes
996ff84
 
 
991f3f5
996ff84
991f3f5
996ff84
 
991f3f5
 
996ff84
 
 
 
 
 
 
 
 
 
 
5a8d61f
 
991f3f5
 
996ff84
 
 
 
 
 
 
 
 
 
991f3f5
 
578c7bc
 
 
 
 
996ff84
 
 
5a8d61f
996ff84
 
991f3f5
996ff84
 
991f3f5
 
 
 
 
 
 
 
 
996ff84
 
 
991f3f5
996ff84
 
5a8d61f
991f3f5
996ff84
991f3f5
996ff84
 
 
e704c26
996ff84
 
 
e704c26
578c7bc
 
 
e704c26
578c7bc
e704c26
996ff84
 
 
 
e704c26
996ff84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d33129c
996ff84
 
 
e704c26
996ff84
 
 
d33129c
996ff84
 
 
 
 
 
e704c26
996ff84
 
 
 
 
 
e704c26
578c7bc
 
 
 
 
e704c26
996ff84
c762f9c
 
d33129c
996ff84
 
c762f9c
991f3f5
 
c762f9c
 
991f3f5
 
c762f9c
996ff84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d33129c
996ff84
 
991f3f5
 
c762f9c
 
 
996ff84
 
 
d33129c
996ff84
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//! This module provides the functionality to cache the results fetched and aggregated from the
//! upstream search engines, stored as json strings in a redis server.

use super::error::CacheError;
use error_stack::Report;
use futures::stream::FuturesUnordered;
use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};

/// The initial command capacity with which the reusable redis pipeline of a `RedisCache` is
/// allocated (it is handed to `redis::Pipeline::with_capacity` when the cache is constructed).
const REDIS_PIPELINE_SIZE: usize = 3;

/// A redis backed cache handle. It owns a pool of redis connection managers together with the
/// settings (pool size, key TTL) and the reusable pipeline needed to read and write cached json
/// results.
pub struct RedisCache {
    /// It stores a pool of connections ready to be used.
    connection_pool: Vec<ConnectionManager>,
    /// It stores the size of the connection pool (in other words the number of
    /// connections that should be stored in the pool).
    pool_size: u8,
    /// It stores the index (into `connection_pool`) of the connection that is being used at the
    /// moment. It is reset to the first connection at the start of every cache operation.
    current_connection: u8,
    /// It stores the TTL (time to live) that is applied to every key written to the cache.
    cache_ttl: u16,
    /// It stores the redis pipeline used to batch the write commands; it is created with an
    /// initial capacity of `REDIS_PIPELINE_SIZE` commands.
    pipeline: redis::Pipeline,
}

impl RedisCache {
    /// A function which fetches the cached json results as json string.
    ///
    /// # Arguments
    ///
    /// * `redis_connection_url` - It takes the redis Connection url address.
    /// * `pool_size` - It takes the size of the connection pool (in other words the number of
    /// connections that should be stored in the pool).
    /// * `cache_ttl` - It takes the the time to live for cached results to live in the redis
    /// server.
    ///
    /// # Error
    ///
    /// Returns a newly constructed `RedisCache` struct on success otherwise returns a standard
    /// error type.
    pub async fn new(
        redis_connection_url: &str,
        pool_size: u8,
        cache_ttl: u16,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let client = Client::open(redis_connection_url)?;
        let tasks: FuturesUnordered<_> = FuturesUnordered::new();

        for _ in 0..pool_size {
            let client_partially_cloned = client.clone();
            tasks.push(tokio::spawn(async move {
                client_partially_cloned.get_connection_manager().await
            }));
        }

        let mut outputs = Vec::new();
        for task in tasks {
            outputs.push(task.await??);
        }

        let redis_cache = RedisCache {
            connection_pool: outputs,
            pool_size,
            current_connection: Default::default(),
            cache_ttl,
            pipeline: redis::Pipeline::with_capacity(REDIS_PIPELINE_SIZE),
        };

        Ok(redis_cache)
    }

    /// A function which fetches the cached json as json string from the redis server.
    ///
    /// # Arguments
    ///
    /// * `key` - It takes a string as key.
    ///
    /// # Error
    ///
    /// Returns the json as a String from the cache on success otherwise returns a `CacheError`
    /// on a failure.
    pub async fn cached_json(&mut self, key: &str) -> Result<String, Report<CacheError>> {
        self.current_connection = Default::default();

        let mut result: Result<String, RedisError> = self.connection_pool
            [self.current_connection as usize]
            .get(key)
            .await;

        // Code to check whether the current connection being used is dropped with connection error
        // or not. if it drops with the connection error then the current connection is replaced
        // with a new connection from the pool which is then used to run the redis command then
        // that connection is also checked whether it is dropped or not if it is not then the
        // result is passed as a `Result` or else the same process repeats again and if all of the
        // connections in the pool result in connection drop error then a custom pool error is
        // returned.
        loop {
            match result {
                Err(error) => match error.is_connection_dropped() {
                    true => {
                        self.current_connection += 1;
                        if self.current_connection == self.pool_size {
                            return Err(Report::new(
                                CacheError::PoolExhaustionWithConnectionDropError,
                            ));
                        }
                        result = self.connection_pool[self.current_connection as usize]
                            .get(key)
                            .await;
                        continue;
                    }
                    false => return Err(Report::new(CacheError::RedisError(error))),
                },
                Ok(res) => return Ok(res),
            }
        }
    }

    /// A function which caches the json by using the key and
    /// `json results` as the value and stores it in redis server with ttl(time to live)
    /// set to 60 seconds.
    ///
    /// # Arguments
    ///
    /// * `json_results` - It takes the json results string as an argument.
    /// * `key` - It takes the key as a String.
    ///
    /// # Error
    ///
    /// Returns an unit type if the results are cached succesfully otherwise returns a `CacheError`
    /// on a failure.
    pub async fn cache_json(
        &mut self,
        json_results: impl Iterator<Item = String>,
        keys: impl Iterator<Item = String>,
    ) -> Result<(), Report<CacheError>> {
        self.current_connection = Default::default();

        for (key, json_result) in keys.zip(json_results) {
            self.pipeline
                .set_ex(key, json_result, self.cache_ttl.into());
        }

        let mut result: Result<(), RedisError> = self
            .pipeline
            .query_async(&mut self.connection_pool[self.current_connection as usize])
            .await;

        // Code to check whether the current connection being used is dropped with connection error
        // or not. if it drops with the connection error then the current connection is replaced
        // with a new connection from the pool which is then used to run the redis command then
        // that connection is also checked whether it is dropped or not if it is not then the
        // result is passed as a `Result` or else the same process repeats again and if all of the
        // connections in the pool result in connection drop error then a custom pool error is
        // returned.
        loop {
            match result {
                Err(error) => match error.is_connection_dropped() {
                    true => {
                        self.current_connection += 1;
                        if self.current_connection == self.pool_size {
                            return Err(Report::new(
                                CacheError::PoolExhaustionWithConnectionDropError,
                            ));
                        }
                        result = self
                            .pipeline
                            .query_async(
                                &mut self.connection_pool[self.current_connection as usize],
                            )
                            .await;
                        continue;
                    }
                    false => return Err(Report::new(CacheError::RedisError(error))),
                },
                Ok(_) => return Ok(()),
            }
        }
    }
}