1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use crate::backend::DEFAULT_DB_ADDRESS;
use crate::dispatcher::finalize::Finalize;
use crate::dispatcher::sink::Sink;
use crate::dispatcher::ventilator::Ventilator;
use crate::helpers::{TaskProgress, TaskReport};
use crate::models::Service;
use zmq::Error;
/// Configuration for the task dispatcher: the ZMQ ports, queue/message
/// sizing, and the backend database address shared by the ventilator,
/// sink and finalize threads spawned by `TaskManager::start`.
pub struct TaskManager {
    /// Port the `Ventilator` binds to for handing out jobs.
    pub source_port: usize,
    /// Port the `Sink` binds to for collecting results.
    pub result_port: usize,
    /// Queue size passed to both the ventilator and the sink.
    pub queue_size: usize,
    /// Maximum message size passed to both the ventilator and the sink.
    pub message_size: usize,
    /// Backend (database) address; cloned into each worker thread.
    pub backend_address: String,
}
impl Default for TaskManager {
    /// Default configuration: ventilator on 51695, sink on 51696,
    /// a queue of 100 entries, 100 KB messages, and the crate-wide
    /// default database address.
    fn default() -> Self {
        Self {
            source_port: 51695,
            result_port: 51696,
            queue_size: 100,
            message_size: 100_000,
            backend_address: DEFAULT_DB_ADDRESS.to_string(),
        }
    }
}
impl TaskManager {
    /// Start the dispatcher: spawn the ventilator (job source), finalize
    /// (report persistence) and sink (result collector) threads, then block
    /// until all three have terminated.
    ///
    /// The threads share three pieces of state behind `Arc<Mutex<..>>`:
    /// a service cache keyed by service name, an in-flight progress queue
    /// keyed by task id, and a queue of completed `TaskReport`s consumed by
    /// the finalize thread.
    ///
    /// `job_limit` optionally bounds the number of jobs each component
    /// processes (useful for tests and bounded runs); `None` runs unbounded.
    ///
    /// # Errors
    /// Returns `zmq::Error::ETERM` if any of the worker threads panicked;
    /// otherwise `Ok(())` once all threads have joined cleanly.
    pub fn start(&self, job_limit: Option<usize>) -> Result<(), Error> {
        let services: HashMap<String, Option<Service>> = HashMap::new();
        let progress_queue: HashMap<i64, TaskProgress> = HashMap::new();
        let done_queue: Vec<TaskReport> = Vec::new();

        let services_arc = Arc::new(Mutex::new(services));
        let progress_queue_arc = Arc::new(Mutex::new(progress_queue));
        let done_queue_arc = Arc::new(Mutex::new(done_queue));

        // --- Ventilator thread: hands out jobs on `source_port`.
        let source_port = self.source_port;
        let source_queue_size = self.queue_size;
        let source_message_size = self.message_size;
        let source_backend_address = self.backend_address.clone();
        let vent_services_arc = services_arc.clone();
        let vent_progress_queue_arc = progress_queue_arc.clone();
        let vent_done_queue_arc = done_queue_arc.clone();
        let vent_thread = thread::spawn(move || {
            Ventilator {
                port: source_port,
                queue_size: source_queue_size,
                message_size: source_message_size,
                // Move the owned address in directly: it is not used again
                // inside this closure, so cloning it here was redundant.
                backend_address: source_backend_address,
            }
            .start(
                &vent_services_arc,
                &vent_progress_queue_arc,
                &vent_done_queue_arc,
                job_limit,
            )
            .unwrap_or_else(|e| panic!("Failed in ventilator thread: {:?}", e));
        });

        // --- Finalize thread: drains the done queue into the backend.
        let finalize_backend_address = self.backend_address.clone();
        let finalize_done_queue_arc = done_queue_arc.clone();
        let finalize_thread = thread::spawn(move || {
            Finalize {
                backend_address: finalize_backend_address,
                job_limit,
            }
            .start(&finalize_done_queue_arc)
            .unwrap_or_else(|e| panic!("Failed in finalize thread: {:?}", e));
        });

        // --- Sink thread: collects worker results on `result_port`.
        let result_port = self.result_port;
        let result_queue_size = self.queue_size;
        let result_message_size = self.message_size;
        let result_backend_address = self.backend_address.clone();
        // Last consumers of the shared state: move the Arcs, no clone needed.
        let sink_services_arc = services_arc;
        let sink_progress_queue_arc = progress_queue_arc;
        let sink_done_queue_arc = done_queue_arc;
        let sink_thread = thread::spawn(move || {
            Sink {
                port: result_port,
                queue_size: result_queue_size,
                message_size: result_message_size,
                // Same as the ventilator: move rather than clone (redundant_clone).
                backend_address: result_backend_address,
            }
            .start(
                &sink_services_arc,
                &sink_progress_queue_arc,
                &sink_done_queue_arc,
                job_limit,
            )
            .unwrap_or_else(|e| panic!("Failed in sink thread: {:?}", e));
        });

        // Join all three threads; a panic in any surfaces as ETERM.
        if vent_thread.join().is_err() {
            println!("Ventilator thread died unexpectedly!");
            Err(zmq::Error::ETERM)
        } else if sink_thread.join().is_err() {
            println!("Sink thread died unexpectedly!");
            Err(zmq::Error::ETERM)
        } else if finalize_thread.join().is_err() {
            println!("DB thread died unexpectedly!");
            Err(zmq::Error::ETERM)
        } else {
            println!("Manager successfully terminated!");
            Ok(())
        }
    }
}