File size: 6,420 Bytes
c170de8
 
 
db93c31
 
c170de8
db93c31
 
 
c170de8
 
 
 
 
 
db93c31
 
 
 
c170de8
db93c31
 
 
c170de8
 
 
 
 
 
 
db93c31
 
 
 
 
 
 
3c7edb8
db93c31
 
 
 
 
 
 
 
 
 
 
3c7edb8
c170de8
 
 
 
 
 
 
db93c31
c170de8
 
 
 
 
 
 
 
db93c31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c170de8
 
 
 
 
 
 
 
 
 
db93c31
3c7edb8
db93c31
3c7edb8
db93c31
 
 
c170de8
db93c31
 
 
 
c170de8
db93c31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c170de8
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! This module provides the functionality to cache the aggregated results fetched from the
//! upstream search engines as a json string in a redis server.

use error_stack::Report;
use futures::future::try_join_all;
use md5::compute;
use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};

use super::error::PoolError;

/// A named struct which stores a pool of redis connections together with the bookkeeping
/// needed to track which connection of the pool is currently being used.
///
/// # Fields
///
/// * `connection_pool` - It stores a pool of connections ready to be used.
/// * `pool_size` - It stores the size of the connection pool (in other words the number of
///   connections that should be stored in the pool).
/// * `current_connection` - It stores the index of which connection is being used at the moment.
pub struct RedisCache {
    // The connections that the redis commands are sent through.
    connection_pool: Vec<ConnectionManager>,
    // The number of connections requested for the pool at construction time.
    pool_size: u8,
    // Index into `connection_pool` of the connection used for the command in flight.
    current_connection: u8,
}

impl RedisCache {
    /// Constructs a new `SearchResult` with the given arguments needed for the struct.
    ///
    /// # Arguments
    ///
    /// * `redis_connection_url` - It takes the redis Connection url address.
    /// * `pool_size` - It takes the size of the connection pool (in other words the number of
    /// connections that should be stored in the pool).
    pub async fn new(
        redis_connection_url: &str,
        pool_size: u8,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let client = Client::open(redis_connection_url)?;
        let mut tasks: Vec<_> = Vec::new();

        for _ in 0..pool_size {
            tasks.push(client.get_tokio_connection_manager());
        }

        let redis_cache = RedisCache {
            connection_pool: try_join_all(tasks).await?,
            pool_size,
            current_connection: Default::default(),
        };
        Ok(redis_cache)
    }

    /// A helper function which computes the hash of the url and formats and returns it as string.
    ///
    /// # Arguments
    ///
    /// * `url` - It takes an url as string.
    fn hash_url(&self, url: &str) -> String {
        format!("{:?}", compute(url))
    }

    /// A function which fetches the cached json results as json string from the redis server.
    ///
    /// # Arguments
    ///
    /// * `url` - It takes an url as a string.
    pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
        self.current_connection = Default::default();
        let hashed_url_string: &str = &self.hash_url(url);

        let mut result: Result<String, RedisError> = self.connection_pool
            [self.current_connection as usize]
            .get(hashed_url_string)
            .await;

        // Code to check whether the current connection being used is dropped with connection error
        // or not. if it drops with the connection error then the current connection is replaced
        // with a new connection from the pool which is then used to run the redis command then
        // that connection is also checked whether it is dropped or not if it is not then the
        // result is passed as a `Result` or else the same process repeats again and if all of the
        // connections in the pool result in connection drop error then a custom pool error is
        // returned.
        loop {
            match result {
                Err(error) => match error.is_connection_dropped() {
                    true => {
                        self.current_connection += 1;
                        if self.current_connection == self.pool_size {
                            return Err(Report::new(
                                PoolError::PoolExhaustionWithConnectionDropError,
                            ));
                        }
                        result = self.connection_pool[self.current_connection as usize]
                            .get(hashed_url_string)
                            .await;
                        continue;
                    }
                    false => return Err(Report::new(PoolError::RedisError(error))),
                },
                Ok(res) => return Ok(res),
            }
        }
    }

    /// A function which caches the results by using the hashed `url` as the key and
    /// `json results` as the value and stores it in redis server with ttl(time to live)
    /// set to 60 seconds.
    ///
    /// # Arguments
    ///
    /// * `json_results` - It takes the json results string as an argument.
    /// * `url` - It takes the url as a String.
    pub async fn cache_results(
        &mut self,
        json_results: &str,
        url: &str,
    ) -> Result<(), Report<PoolError>> {
        self.current_connection = Default::default();
        let hashed_url_string: &str = &self.hash_url(url);

        let mut result: Result<(), RedisError> = self.connection_pool
            [self.current_connection as usize]
            .set_ex(hashed_url_string, json_results, 60)
            .await;

        // Code to check whether the current connection being used is dropped with connection error
        // or not. if it drops with the connection error then the current connection is replaced
        // with a new connection from the pool which is then used to run the redis command then
        // that connection is also checked whether it is dropped or not if it is not then the
        // result is passed as a `Result` or else the same process repeats again and if all of the
        // connections in the pool result in connection drop error then a custom pool error is
        // returned.
        loop {
            match result {
                Err(error) => match error.is_connection_dropped() {
                    true => {
                        self.current_connection += 1;
                        if self.current_connection == self.pool_size {
                            return Err(Report::new(
                                PoolError::PoolExhaustionWithConnectionDropError,
                            ));
                        }
                        result = self.connection_pool[self.current_connection as usize]
                            .set_ex(hashed_url_string, json_results, 60)
                            .await;
                        continue;
                    }
                    false => return Err(Report::new(PoolError::RedisError(error))),
                },
                Ok(_) => return Ok(()),
            }
        }
    }
}