1#![allow(clippy::extra_unused_lifetimes)]
2use chrono::prelude::*;
3use diesel::result::Error;
5use diesel::sql_types::{BigInt, Integer, Text, Timestamp};
6use diesel::*;
7use rocket::serde::Serialize;
8use crate::concerns::CortexInsertable;
12use crate::helpers::TaskStatus;
13use crate::models::{Corpus, Service};
14use crate::schema::{historical_tasks, tasks};
15
/// One saved snapshot of a task's status, as stored in the `historical_tasks` table.
// `Queryable` maps result columns by position: keep field order in sync with the
// `historical_tasks` column order in `crate::schema`.
#[derive(Identifiable, Queryable, Clone, Debug, PartialEq, Eq, QueryableByName)]
#[diesel(table_name = historical_tasks)]
pub struct HistoricalTask {
    /// Primary key of the snapshot row (used by `Identifiable`).
    pub id: i64,
    /// Id of the `tasks` row this snapshot belongs to (joined as `h.task_id = t.id`).
    pub task_id: i64,
    /// Raw status code at snapshot time; the report queries compare it against
    /// `TaskStatus::raw()` values.
    pub status: i32,
    /// Timestamp at which the snapshot was recorded.
    pub saved_at: NaiveDateTime,
}
29
/// Insertable form of a [`HistoricalTask`] snapshot.
///
/// `id` and `saved_at` are not supplied here; presumably the database fills them in
/// (serial key / default timestamp) — NOTE(review): confirm against the migration.
#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = historical_tasks)]
pub struct NewHistoricalTask {
    /// Id of the `tasks` row being snapshotted.
    pub task_id: i64,
    /// Raw status code to record for the task.
    pub status: i32,
}
39
/// Serializable description of one task across two historical runs.
///
/// Every field is a pre-formatted `String`; presumably this is handed to a template or
/// frontend as-is — NOTE(review): confirm with the caller. Field order is also the
/// serialized field order.
#[derive(Debug, Clone, Serialize)]
pub struct TaskRunMetadata {
    /// Task id, as a string.
    pub task_id: String,
    /// The task's entry.
    pub entry: String,
    /// Status in the previous run.
    pub previous_status: String,
    /// Status in the current run.
    pub current_status: String,
    /// Highlight for the previous status (presumably a display/CSS hint — confirm).
    pub previous_highlight: String,
    /// Highlight for the current status (presumably a display/CSS hint — confirm).
    pub current_highlight: String,
    /// Formatted timestamp of the previous run.
    pub previous_saved_at: String,
    /// Formatted timestamp of the current run.
    pub current_saved_at: String,
}
60
/// Serializable summary row: how many tasks moved from one status to another between
/// two runs. Field order is also the serialized field order.
#[derive(Debug, Clone, Serialize)]
pub struct DiffStatusRow {
    /// Status in the previous run.
    pub previous_status: String,
    /// Status in the current run.
    pub current_status: String,
    /// Highlight for the previous status (presumably a display/CSS hint — confirm).
    pub previous_highlight: String,
    /// Highlight for the current status (presumably a display/CSS hint — confirm).
    pub current_highlight: String,
    /// Number of tasks with this (previous, current) status combination.
    pub task_count: usize,
}
75
/// Raw result row of the `status_change_matrix` SQL query; fields are matched by the
/// column aliases `previous_status`, `current_status` and `task_count`.
#[derive(QueryableByName)]
struct StatusChangeCell {
    /// Status of the task at the older of the two compared dates.
    #[diesel(sql_type = Integer)]
    previous_status: i32,
    /// Status of the task at the newer of the two compared dates.
    #[diesel(sql_type = Integer)]
    current_status: i32,
    /// `COUNT(*)` of tasks with this transition (Postgres returns `bigint`).
    #[diesel(sql_type = BigInt)]
    task_count: i64,
}
91
/// Result of `HistoricalTask::status_change_matrix`: formatted labels of every distinct
/// `saved_at` date (newest first), plus one `(previous_status, current_status, task_count)`
/// triple per status transition observed between the two compared dates.
pub type StatusChangeMatrix = (Vec<String>, Vec<(i32, i32, i64)>);
95
/// Filtering and pagination options for `HistoricalTask::report_for`.
///
/// The default value applies no status/date filters and has `page_size == 0`.
#[derive(Debug, Clone, Default)]
pub struct DiffStatusFilter {
    /// Required status at the previous run; only applied together with `current_status`.
    pub previous_status: Option<TaskStatus>,
    /// Required status at the current run; only applied together with `previous_status`.
    pub current_status: Option<TaskStatus>,
    /// Previous run to compare; only used when `current_date` is also set.
    pub previous_date: Option<NaiveDateTime>,
    /// Current run to compare; only used when `previous_date` is also set.
    pub current_date: Option<NaiveDateTime>,
    /// Number of tasks to skip (each task contributes two SQL rows).
    pub offset: usize,
    /// Maximum number of tasks to return (each task contributes two SQL rows).
    pub page_size: usize,
}
112
/// Result of `HistoricalTask::report_for`: formatted labels of every distinct `saved_at`
/// date (newest first), plus one pair of snapshots per reported task, holding that task's
/// rows at the two compared runs (previous run first in the usual case where the previous
/// date precedes the current one).
pub type HistoricalReportOverview = (
    Vec<String>,
    Vec<(HistoricalTaskReport, HistoricalTaskReport)>,
);
119
120impl CortexInsertable for NewHistoricalTask {
121 fn create(&self, connection: &mut PgConnection) -> Result<usize, Error> {
122 insert_into(historical_tasks::table)
123 .values(self)
124 .execute(connection)
125 }
126}
127
/// One row of the `report_for` SQL queries: a historical snapshot joined with its task's
/// entry. Fields are matched by column name (`task_id`, `status`, `saved_at`, `entry`).
#[derive(Debug, Clone, PartialEq, Eq, QueryableByName)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(table_name = historical_tasks)]
pub struct HistoricalTaskReport {
    /// Id of the task the snapshot belongs to.
    #[diesel(sql_type = BigInt)]
    pub task_id: i64,
    /// Raw status code recorded in the snapshot.
    #[diesel(sql_type = Integer)]
    pub status: i32,
    /// When the snapshot was recorded.
    #[diesel(sql_type = Timestamp)]
    pub saved_at: NaiveDateTime,
    /// The task's entry, selected from the joined `tasks` row (`t.entry`).
    #[diesel(sql_type = Text)]
    pub entry: String,
}
146
147impl HistoricalTask {
148 pub fn retention_stats(
152 connection: &mut PgConnection,
153 ) -> Result<(i64, Option<NaiveDateTime>), Error> {
154 use crate::schema::historical_tasks::dsl;
155 let total: i64 = dsl::historical_tasks.count().get_result(connection)?;
156 let oldest: Option<NaiveDateTime> = dsl::historical_tasks
157 .select(diesel::dsl::min(dsl::saved_at))
158 .first(connection)?;
159 Ok((total, oldest))
160 }
161
162 pub fn count_before(connection: &mut PgConnection, cutoff: NaiveDateTime) -> Result<i64, Error> {
165 use crate::schema::historical_tasks::dsl;
166 dsl::historical_tasks
167 .filter(dsl::saved_at.lt(cutoff))
168 .count()
169 .get_result(connection)
170 }
171
172 pub fn prune_before(
176 connection: &mut PgConnection,
177 cutoff: NaiveDateTime,
178 ) -> Result<usize, Error> {
179 use crate::schema::historical_tasks::dsl;
180 diesel::delete(dsl::historical_tasks.filter(dsl::saved_at.lt(cutoff))).execute(connection)
181 }
182
183 pub fn find_by(needle_id: i64, connection: &mut PgConnection) -> Result<Vec<Self>, Error> {
185 use crate::schema::historical_tasks::dsl::{saved_at, task_id};
186 let runs: Vec<HistoricalTask> = historical_tasks::table
187 .filter(task_id.eq(needle_id))
188 .order(saved_at.desc())
189 .get_results(connection)?;
190 Ok(runs)
191 }
192
193 pub fn find_most_recent(
195 needle_id: i64,
196 connection: &mut PgConnection,
197 ) -> Result<Option<HistoricalTask>, Error> {
198 use crate::schema::historical_tasks::dsl::{saved_at, task_id};
199 historical_tasks::table
200 .filter(task_id.eq(needle_id))
201 .order(saved_at.desc())
202 .first(connection)
203 .optional()
204 }
205
206 pub fn report_for(
210 corpus: &Corpus,
211 service: &Service,
212 filters: Option<DiffStatusFilter>,
213 connection: &mut PgConnection,
214 ) -> Result<HistoricalReportOverview, Error> {
215 use crate::schema::historical_tasks::dsl::{saved_at, task_id};
216 use crate::schema::tasks::dsl::{corpus_id, service_id};
217 let tasks_subquery = tasks::table
220 .filter(corpus_id.eq(corpus.id))
221 .filter(service_id.eq(service.id))
222 .order(tasks::id.asc())
223 .select(tasks::id);
224 let mut dates: Vec<NaiveDateTime> = Vec::new();
225 let mut previous_status = None;
226 let mut current_status = None;
227 let mut offset = 0;
228 let mut page_size = 0;
229 if let Some(opts) = filters {
230 previous_status = opts.previous_status;
231 current_status = opts.current_status;
232 offset = opts.offset;
233 page_size = opts.page_size;
234 if let (Some(previous_date_param), Some(current_date_param)) =
235 (opts.previous_date, opts.current_date)
236 {
237 dates.push(current_date_param);
238 dates.push(previous_date_param);
239 }
240 }
241 let all_dates = historical_tasks::table
242 .filter(task_id.eq_any(tasks_subquery))
243 .order(saved_at.desc())
244 .select(saved_at)
245 .distinct()
246 .get_results(connection)?;
247 if dates.is_empty() && all_dates.len() > 1 {
248 dates = vec![all_dates[0], all_dates[1]];
249 }
250 if dates.len() < 2 {
253 return Ok((Vec::new(), Vec::new()));
254 }
255 let dates_labels = all_dates
256 .into_iter()
257 .map(|date| date.format("%Y-%m-%d %H:%M:%S%.f").to_string())
258 .collect();
259 let mut reported_tasks = Vec::new();
260 {
265 let recent_historical_tasks: Vec<HistoricalTaskReport> = if let (Some(prev), Some(cur)) =
273 (previous_status, current_status)
274 {
275 let matching_tasks_query = r###"
277 WITH matching_tasks AS (
278 SELECT h.task_id
279 FROM historical_tasks h
280 JOIN tasks t ON h.task_id = t.id
281 WHERE t.corpus_id = $1 AND t.service_id = $2
282 AND (
283 (h.saved_at = $3 AND h.status = $4) OR
284 (h.saved_at = $5 AND h.status = $6)
285 )
286 GROUP BY h.task_id
287 HAVING COUNT(DISTINCT h.status) = 2
288 )
289 SELECT h.task_id, h.status, h.saved_at, t.entry
290 FROM historical_tasks h
291 JOIN tasks t ON h.task_id = t.id
292 WHERE h.task_id IN (SELECT task_id FROM matching_tasks)
293 AND h.saved_at IN ($3, $5)
294 ORDER BY t.entry ASC, h.task_id ASC, h.saved_at ASC
295 OFFSET $7
296 LIMIT $8;
297 "###;
298 sql_query(matching_tasks_query)
299 .bind::<Integer, _>(corpus.id)
300 .bind::<Integer, _>(service.id)
301 .bind::<Timestamp, _>(dates[0])
302 .bind::<Integer, _>(cur.raw())
303 .bind::<Timestamp, _>(dates[1])
304 .bind::<Integer, _>(prev.raw())
305 .bind::<Integer, _>(2 * offset as i32)
306 .bind::<Integer, _>(2 * page_size as i32)
307 .get_results::<HistoricalTaskReport>(connection)?
308 } else {
309 let changed_tasks_query = r###"
312 WITH matching_tasks AS (
313 SELECT h.task_id
314 FROM historical_tasks h
315 JOIN tasks t ON h.task_id = t.id
316 WHERE t.corpus_id = $1 AND t.service_id = $2
317 AND h.saved_at IN ($3, $4)
318 GROUP BY h.task_id
319 HAVING COUNT(DISTINCT h.saved_at) = 2 AND COUNT(DISTINCT h.status) = 2
320 )
321 SELECT h.task_id, h.status, h.saved_at, t.entry
322 FROM historical_tasks h
323 JOIN tasks t ON h.task_id = t.id
324 WHERE h.saved_at IN ($3, $4)
325 AND h.task_id IN (SELECT task_id FROM matching_tasks)
326 ORDER BY t.entry ASC, h.task_id ASC, h.saved_at ASC
327 OFFSET $5
328 LIMIT $6;
329 "###;
330 sql_query(changed_tasks_query)
331 .bind::<Integer, _>(corpus.id)
332 .bind::<Integer, _>(service.id)
333 .bind::<Timestamp, _>(dates[0])
334 .bind::<Timestamp, _>(dates[1])
335 .bind::<Integer, _>(2 * offset as i32)
336 .bind::<Integer, _>(2 * page_size as i32)
337 .get_results::<HistoricalTaskReport>(connection)?
338 };
339 let mut iter = recent_historical_tasks.into_iter();
341 while let Some(t1) = iter.next() {
342 if let Some(t2) = iter.next() {
343 if t1.task_id == t2.task_id {
344 reported_tasks.push((t1, t2));
345 }
346 } else {
347 break;
348 }
349 }
350 }
351 Ok((dates_labels, reported_tasks))
352 }
353
354 pub fn status_change_matrix(
364 corpus: &Corpus,
365 service: &Service,
366 previous_date: Option<NaiveDateTime>,
367 current_date: Option<NaiveDateTime>,
368 connection: &mut PgConnection,
369 ) -> Result<StatusChangeMatrix, Error> {
370 use crate::schema::historical_tasks::dsl::{saved_at, task_id};
371 use crate::schema::tasks::dsl::{corpus_id, service_id};
372
373 let tasks_subquery = tasks::table
377 .filter(corpus_id.eq(corpus.id))
378 .filter(service_id.eq(service.id))
379 .select(tasks::id);
380 let all_dates: Vec<NaiveDateTime> = historical_tasks::table
381 .filter(task_id.eq_any(tasks_subquery))
382 .order(saved_at.desc())
383 .select(saved_at)
384 .distinct()
385 .get_results(connection)?;
386 let dates_labels = all_dates
387 .iter()
388 .map(|date| date.format("%Y-%m-%d %H:%M:%S%.f").to_string())
389 .collect();
390
391 let (older, newer) = match (previous_date, current_date) {
393 (Some(previous), Some(current)) => (previous, current),
394 _ if all_dates.len() > 1 => (all_dates[1], all_dates[0]),
395 _ => return Ok((dates_labels, Vec::new())),
396 };
397
398 let matrix = sql_query(
401 "SELECT prev.status AS previous_status, cur.status AS current_status, \
402 COUNT(*) AS task_count \
403 FROM historical_tasks prev \
404 JOIN historical_tasks cur ON prev.task_id = cur.task_id \
405 JOIN tasks t ON t.id = prev.task_id \
406 WHERE t.corpus_id = $1 AND t.service_id = $2 \
407 AND prev.saved_at = $3 AND cur.saved_at = $4 \
408 GROUP BY prev.status, cur.status",
409 )
410 .bind::<Integer, _>(corpus.id)
411 .bind::<Integer, _>(service.id)
412 .bind::<Timestamp, _>(older)
413 .bind::<Timestamp, _>(newer)
414 .get_results::<StatusChangeCell>(connection)?
415 .into_iter()
416 .map(|cell| (cell.previous_status, cell.current_status, cell.task_count))
417 .collect();
418 Ok((dates_labels, matrix))
419 }
420}