1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use super::task_report::task_report;
use crate::backend::Backend;
use redis::Commands;
use std::collections::HashMap;
use std::thread;
use std::time::Duration;
/// Runs forever, invalidating cached report entries in Redis whenever the amount of pending
/// work for a corpus/service pair changes.
///
/// Each pass:
/// 1. creates a fresh `Backend::default()` and walks every corpus and its services
///    (the service named `"import"` is skipped);
/// 2. sums the `"queued"` and `"todo"` entries of `Backend::progress_report` and compares the
///    result with the value remembered from the previous pass;
/// 3. if the value differs (or the pair has not been seen yet), deletes the Redis keys
///    `<corpus id>_<service id>_<severity>[_<category>]` together with their `_all_messages`
///    variants and calls `task_report` again for every severity and category
///    (presumably to regenerate the cached reports — confirm against `task_report`).
///
/// The worker sleeps 120 seconds between passes. Failed `DEL` commands are reported on stderr
/// and otherwise ignored, so a transient Redis error does not stop the worker.
///
/// # Panics
///
/// Panics at start-up if the Redis client cannot be created or the connection to
/// `redis://127.0.0.1/` cannot be established.
pub fn cache_worker() {
    // Severity levels whose cache entries are invalidated, in processing order.
    const SEVERITIES: [&str; 6] = [
        "invalid",
        "fatal",
        "error",
        "warning",
        "no_problem",
        "info",
    ];

    let redis_client = match redis::Client::open("redis://127.0.0.1/") {
        Ok(client) => client,
        Err(_) => panic!("Redis connection failed, please boot up redis and restart the frontend!"),
    };
    let mut redis_connection = match redis_client.get_connection() {
        Ok(conn) => conn,
        Err(_) => panic!("Redis connection failed, please boot up redis and restart the frontend!"),
    };

    // Logs and issues a best-effort DEL for a single key; a failure is reported, never fatal.
    let mut delete_key = |key: &str| {
        println!("[cache worker] DEL {:?}", key);
        let outcome: Result<(), _> = redis_connection.del(key);
        if let Err(err) = outcome {
            eprintln!("[cache worker] DEL {:?} failed: {:?}", key, err);
        }
    };

    // Last observed pending-task count per "<corpus id>_<service id>" key.
    let mut queued_cache: HashMap<String, usize> = HashMap::new();

    loop {
        let backend = Backend::default();
        let mut global_stub: HashMap<String, String> = HashMap::new();

        for corpus in &backend.corpora() {
            let services = match corpus.select_services(&backend.connection) {
                Ok(services) => services,
                Err(_) => continue,
            };
            for service in &services {
                if service.name == "import" {
                    continue;
                }
                println!(
                    "[cache worker] Examining corpus {:?}, service {:?}",
                    corpus.name, service.name
                );

                let report = backend.progress_report(corpus, service);
                let pending: f64 =
                    *report.get("queued").unwrap_or(&0.0) + *report.get("todo").unwrap_or(&0.0);
                // The report carries floats; only the integral part is compared between passes.
                let queued_count = pending as usize;

                let key_base = format!("{}_{}", corpus.id, service.id);
                // Comparing against `Option` makes a never-seen pair always count as changed,
                // whatever its first observed value is (no magic sentinel to collide with).
                if queued_cache.get(&key_base) == Some(&queued_count) {
                    continue;
                }
                println!("[cache worker] state changed, invalidating ...");
                queued_cache.insert(key_base.clone(), queued_count);

                for &severity in &SEVERITIES {
                    let key_severity = format!("{}_{}", key_base, severity);
                    delete_key(&key_severity);
                    delete_key(&format!("{}_all_messages", key_severity));

                    // "no_problem" entries are only dropped; no reports are requested for them.
                    if severity == "no_problem" {
                        continue;
                    }
                    // Pause between severities (presumably to throttle load — confirm intent).
                    thread::sleep(Duration::from_secs(1));

                    let category_report = task_report(
                        &mut global_stub,
                        corpus,
                        service,
                        Some(severity.to_string()),
                        None,
                        None,
                        &None,
                    );
                    for cat_hash in &category_report {
                        let category = match cat_hash.get("name") {
                            Some(name) => name,
                            None => continue,
                        };
                        // Rows without a name and the "total" row do not get category keys.
                        if category.is_empty() || category == "total" {
                            continue;
                        }
                        let key_category = format!("{}_{}", key_severity, category);
                        delete_key(&key_category);
                        delete_key(&format!("{}_all_messages", key_category));

                        // Result intentionally discarded; the call is made for its side effects.
                        let _ = task_report(
                            &mut global_stub,
                            corpus,
                            service,
                            Some(severity.to_string()),
                            Some(category.to_string()),
                            None,
                            &None,
                        );
                    }
                }
            }
        }

        thread::sleep(Duration::from_secs(120));
    }
}