Skip to main content

cortex/models/
historical_tasks.rs

1#![allow(clippy::extra_unused_lifetimes)]
2use chrono::prelude::*;
3// use diesel::pg::Pg;
4use diesel::result::Error;
5use diesel::sql_types::{BigInt, Integer, Text, Timestamp};
6use diesel::*;
7use rocket::serde::Serialize;
8// use super::messages::*;
9// use super::tasks::Task;
10// use crate::helpers::TaskStatus;
11use crate::concerns::CortexInsertable;
12use crate::helpers::TaskStatus;
13use crate::models::{Corpus, Service};
14use crate::schema::{historical_tasks, tasks};
15
16#[derive(Identifiable, Queryable, Clone, Debug, PartialEq, Eq, QueryableByName)]
17#[diesel(table_name = historical_tasks)]
18/// Historical `(Corpus, Service)` run records
19pub struct HistoricalTask {
20  /// id of the historical record (task granularity)
21  pub id: i64,
22  /// foreign key in Tasks(id)
23  pub task_id: i64,
24  /// The historical status of the task
25  pub status: i32,
26  /// When was the save request for this historical record made
27  pub saved_at: NaiveDateTime,
28}
29
30#[derive(Insertable, Debug, Clone)]
31#[diesel(table_name = historical_tasks)]
32/// A new task, to be inserted into `CorTeX`
33pub struct NewHistoricalTask {
34  /// id of the task we are tracking
35  pub task_id: i64,
36  /// the historical status of the task
37  pub status: i32,
38}
39
40#[derive(Debug, Clone, Serialize)]
41/// A single report row for the diff-historical feature
42pub struct TaskRunMetadata {
43  /// id of the task we are tracking
44  pub task_id: String,
45  /// The underling entry of the task
46  pub entry: String,
47  /// The previous result for processing this task
48  pub previous_status: String,
49  /// The current result for processing this task
50  pub current_status: String,
51  /// The previous highlight for displaying this task
52  pub previous_highlight: String,
53  /// The current highlight for displaying this task
54  pub current_highlight: String,
55  /// Previous timestamp of a manual save
56  pub previous_saved_at: String,
57  /// Current/latest timestamp of a manual save
58  pub current_saved_at: String,
59}
60
61#[derive(Debug, Clone, Serialize)]
62/// A single report row for the diff-summary feature
63pub struct DiffStatusRow {
64  /// The previous result for processing this task
65  pub previous_status: String,
66  /// The current result for processing this task
67  pub current_status: String,
68  /// The previous highlight for displaying this task
69  pub previous_highlight: String,
70  /// The current highlight for displaying this task
71  pub current_highlight: String,
72  /// Task count
73  pub task_count: usize,
74}
75
76/// One aggregated cell of the status-transition matrix (see
77/// [`HistoricalTask::status_change_matrix`]): how many tasks moved from `previous_status` to
78/// `current_status` between two snapshots.
79#[derive(QueryableByName)]
80struct StatusChangeCell {
81  /// raw signed status at the earlier (previous) snapshot
82  #[diesel(sql_type = Integer)]
83  previous_status: i32,
84  /// raw signed status at the later (current) snapshot
85  #[diesel(sql_type = Integer)]
86  current_status: i32,
87  /// number of tasks making this exact transition
88  #[diesel(sql_type = BigInt)]
89  task_count: i64,
90}
91
/// Output of [`HistoricalTask::status_change_matrix`]: first the labels of every available
/// snapshot date (newest first), then one `(previous_status, current_status, task_count)` tuple
/// per observed transition.
pub type StatusChangeMatrix = (Vec<String>, Vec<(i32, i32, i64)>);
95
96#[derive(Debug, Clone, Default)]
97/// A collection of filters for task status history reports
98pub struct DiffStatusFilter {
99  /// The previous result for processing this task
100  pub previous_status: Option<TaskStatus>,
101  /// The current result for processing this task
102  pub current_status: Option<TaskStatus>,
103  /// The requested previous date for this manual save
104  pub previous_date: Option<NaiveDateTime>,
105  /// The requested current date for this manual save
106  pub current_date: Option<NaiveDateTime>,
107  /// Starting offset
108  pub offset: usize,
109  /// Page size
110  pub page_size: usize,
111}
112
113/// A historical overview contains the list of labels for all dates where a snapshot was taken,
114/// followed by pairs of reports for tasks at the previous date and current date chosen.
115pub type HistoricalReportOverview = (
116  Vec<String>,
117  Vec<(HistoricalTaskReport, HistoricalTaskReport)>,
118);
119
120impl CortexInsertable for NewHistoricalTask {
121  fn create(&self, connection: &mut PgConnection) -> Result<usize, Error> {
122    insert_into(historical_tasks::table)
123      .values(self)
124      .execute(connection)
125  }
126}
127
128#[derive(Debug, Clone, PartialEq, Eq, QueryableByName)]
129#[diesel(check_for_backend(diesel::pg::Pg))]
130#[diesel(table_name = historical_tasks)]
131/// A reportable historical task
132pub struct HistoricalTaskReport {
133  #[diesel(sql_type = BigInt)]
134  /// the id from the task table
135  pub task_id: i64,
136  #[diesel(sql_type = Integer)]
137  /// the recorded status at this timestamp
138  pub status: i32,
139  #[diesel(sql_type = Timestamp)]
140  /// the saved request was at this time
141  pub saved_at: NaiveDateTime,
142  #[diesel(sql_type = Text)]
143  /// the filename on disk for this task
144  pub entry: String,
145}
146
147impl HistoricalTask {
148  /// Retention stats for the per-task snapshot store: total rows and the oldest `saved_at`. This is
149  /// the unbounded-growth table (one row per task per save-snapshot), so the admin "manage
150  /// historical data" screen surfaces these to decide a retention cutoff.
151  pub fn retention_stats(
152    connection: &mut PgConnection,
153  ) -> Result<(i64, Option<NaiveDateTime>), Error> {
154    use crate::schema::historical_tasks::dsl;
155    let total: i64 = dsl::historical_tasks.count().get_result(connection)?;
156    let oldest: Option<NaiveDateTime> = dsl::historical_tasks
157      .select(diesel::dsl::min(dsl::saved_at))
158      .first(connection)?;
159    Ok((total, oldest))
160  }
161
162  /// How many snapshot rows are strictly older than `cutoff` — the **dry-run count** shown before a
163  /// prune, so the admin sees exactly what a prune would remove.
164  pub fn count_before(connection: &mut PgConnection, cutoff: NaiveDateTime) -> Result<i64, Error> {
165    use crate::schema::historical_tasks::dsl;
166    dsl::historical_tasks
167      .filter(dsl::saved_at.lt(cutoff))
168      .count()
169      .get_result(connection)
170  }
171
172  /// Deletes snapshot rows strictly older than `cutoff` (retention prune), returning the number
173  /// removed. The run *summaries* (`historical_runs`) are untouched — only the bulky per-task
174  /// snapshots are pruned, so the run history/charts survive while old per-task diffs age out.
175  pub fn prune_before(
176    connection: &mut PgConnection,
177    cutoff: NaiveDateTime,
178  ) -> Result<usize, Error> {
179    use crate::schema::historical_tasks::dsl;
180    diesel::delete(dsl::historical_tasks.filter(dsl::saved_at.lt(cutoff))).execute(connection)
181  }
182
183  /// Obtain all historical records for a given task id
184  pub fn find_by(needle_id: i64, connection: &mut PgConnection) -> Result<Vec<Self>, Error> {
185    use crate::schema::historical_tasks::dsl::{saved_at, task_id};
186    let runs: Vec<HistoricalTask> = historical_tasks::table
187      .filter(task_id.eq(needle_id))
188      .order(saved_at.desc())
189      .get_results(connection)?;
190    Ok(runs)
191  }
192
193  /// Obtain the most recent historical record for a given taskid
194  pub fn find_most_recent(
195    needle_id: i64,
196    connection: &mut PgConnection,
197  ) -> Result<Option<HistoricalTask>, Error> {
198    use crate::schema::historical_tasks::dsl::{saved_at, task_id};
199    historical_tasks::table
200      .filter(task_id.eq(needle_id))
201      .order(saved_at.desc())
202      .first(connection)
203      .optional()
204  }
205
206  /// Prepare a report for diffing the two most recent historical records of all tasks belonging to
207  /// a `(corpus,service)` pair. We do this 100 tasks at a time, starting from the given offset.
208  /// The return contract is (id, previous, current)
209  pub fn report_for(
210    corpus: &Corpus,
211    service: &Service,
212    filters: Option<DiffStatusFilter>,
213    connection: &mut PgConnection,
214  ) -> Result<HistoricalReportOverview, Error> {
215    use crate::schema::historical_tasks::dsl::{saved_at, task_id};
216    use crate::schema::tasks::dsl::{corpus_id, service_id};
217    // 1. We need to know the cutoff date of the previous status, to only select the relevant
218    //    entries.
219    let tasks_subquery = tasks::table
220      .filter(corpus_id.eq(corpus.id))
221      .filter(service_id.eq(service.id))
222      .order(tasks::id.asc())
223      .select(tasks::id);
224    let mut dates: Vec<NaiveDateTime> = Vec::new();
225    let mut previous_status = None;
226    let mut current_status = None;
227    let mut offset = 0;
228    let mut page_size = 0;
229    if let Some(opts) = filters {
230      previous_status = opts.previous_status;
231      current_status = opts.current_status;
232      offset = opts.offset;
233      page_size = opts.page_size;
234      if let (Some(previous_date_param), Some(current_date_param)) =
235        (opts.previous_date, opts.current_date)
236      {
237        dates.push(current_date_param);
238        dates.push(previous_date_param);
239      }
240    }
241    let all_dates = historical_tasks::table
242      .filter(task_id.eq_any(tasks_subquery))
243      .order(saved_at.desc())
244      .select(saved_at)
245      .distinct()
246      .get_results(connection)?;
247    if dates.is_empty() && all_dates.len() > 1 {
248      dates = vec![all_dates[0], all_dates[1]];
249    }
250    // 2. Next, we extract all historical tasks for those 100 ids, using a single query,
251    //    descendingly sorting by saved_at.
252    if dates.len() < 2 {
253      return Ok((Vec::new(), Vec::new()));
254    }
255    let dates_labels = all_dates
256      .into_iter()
257      .map(|date| date.format("%Y-%m-%d %H:%M:%S%.f").to_string())
258      .collect();
259    let mut reported_tasks = Vec::new();
260    // Always paginated: `page_size` bounds the rows pulled into the application. A `page_size` of 0
261    // yields an empty page (`LIMIT 0`) rather than the former unbounded "load every snapshot row"
262    // branch, which pulled millions of rows for a large corpus (the unbounded-load class fixed in
263    // KNOWN_ISSUES R-8). The diff *summary* counts in SQL via `status_change_matrix` instead.
264    {
265      // A specific transition (both statuses given) uses a tight query that matches the two
266      // (snapshot, status) pairs directly. With a partial or empty status filter we instead list
267      // *every* task whose status changed between the two snapshots — the "an empty filter lists
268      // every change" contract of the run-tasks screen. Either way the work is paginated in SQL.
269      // The unfiltered path previously `.expect()`ed the statuses and **panicked on the request
270      // thread** (a 500 that killed the Rocket worker) whenever the tasks screen was opened without
271      // picking a transition — see docs/KNOWN_ISSUES.md (F-2).
272      let recent_historical_tasks: Vec<HistoricalTaskReport> = if let (Some(prev), Some(cur)) =
273        (previous_status, current_status)
274      {
275        // Optimized query to fetch historical tasks with matching status and saved_at.
276        let matching_tasks_query = r###"
277        WITH matching_tasks AS (
278          SELECT h.task_id
279          FROM historical_tasks h
280          JOIN tasks t ON h.task_id = t.id
281          WHERE t.corpus_id = $1 AND t.service_id = $2
282            AND (
283              (h.saved_at = $3 AND h.status = $4) OR
284              (h.saved_at = $5 AND h.status = $6)
285            )
286          GROUP BY h.task_id
287          HAVING COUNT(DISTINCT h.status) = 2
288        )
289        SELECT h.task_id, h.status, h.saved_at, t.entry
290        FROM historical_tasks h
291        JOIN tasks t ON h.task_id = t.id
292        WHERE h.task_id IN (SELECT task_id FROM matching_tasks)
293          AND h.saved_at IN ($3, $5)
294        ORDER BY t.entry ASC, h.task_id ASC, h.saved_at ASC
295        OFFSET $7
296        LIMIT $8;
297        "###;
298        sql_query(matching_tasks_query)
299          .bind::<Integer, _>(corpus.id)
300          .bind::<Integer, _>(service.id)
301          .bind::<Timestamp, _>(dates[0])
302          .bind::<Integer, _>(cur.raw())
303          .bind::<Timestamp, _>(dates[1])
304          .bind::<Integer, _>(prev.raw())
305          .bind::<Integer, _>(2 * offset as i32)
306          .bind::<Integer, _>(2 * page_size as i32)
307          .get_results::<HistoricalTaskReport>(connection)?
308      } else {
309        // No (or partial) transition selected: every task present in *both* snapshots whose status
310        // differs between them, paginated — two rows per task, ordered so each pair is adjacent.
311        let changed_tasks_query = r###"
312        WITH matching_tasks AS (
313          SELECT h.task_id
314          FROM historical_tasks h
315          JOIN tasks t ON h.task_id = t.id
316          WHERE t.corpus_id = $1 AND t.service_id = $2
317            AND h.saved_at IN ($3, $4)
318          GROUP BY h.task_id
319          HAVING COUNT(DISTINCT h.saved_at) = 2 AND COUNT(DISTINCT h.status) = 2
320        )
321        SELECT h.task_id, h.status, h.saved_at, t.entry
322        FROM historical_tasks h
323        JOIN tasks t ON h.task_id = t.id
324        WHERE h.saved_at IN ($3, $4)
325          AND h.task_id IN (SELECT task_id FROM matching_tasks)
326        ORDER BY t.entry ASC, h.task_id ASC, h.saved_at ASC
327        OFFSET $5
328        LIMIT $6;
329        "###;
330        sql_query(changed_tasks_query)
331          .bind::<Integer, _>(corpus.id)
332          .bind::<Integer, _>(service.id)
333          .bind::<Timestamp, _>(dates[0])
334          .bind::<Timestamp, _>(dates[1])
335          .bind::<Integer, _>(2 * offset as i32)
336          .bind::<Integer, _>(2 * page_size as i32)
337          .get_results::<HistoricalTaskReport>(connection)?
338      };
339      // Iterate in pairs, as applicable
340      let mut iter = recent_historical_tasks.into_iter();
341      while let Some(t1) = iter.next() {
342        if let Some(t2) = iter.next() {
343          if t1.task_id == t2.task_id {
344            reported_tasks.push((t1, t2));
345          }
346        } else {
347          break;
348        }
349      }
350    }
351    Ok((dates_labels, reported_tasks))
352  }
353
354  /// The **status-transition matrix** between two snapshots of a `(corpus, service)`: how many
355  /// tasks moved from each previous status to each current status. Unlike [`report_for`], it
356  /// aggregates **in SQL** (`GROUP BY previous, current`), so the result is bounded to one row per
357  /// status pair (≤ a few dozen) regardless of corpus size — a corpus with two 1.5M-task snapshots
358  /// is summarized without loading millions of `historical_tasks` rows into the application (the
359  /// unbounded-load class of KNOWN_ISSUES R-7/R-8). Returns the available snapshot-date labels
360  /// (newest first) and the matrix cells `(previous_status, current_status, task_count)`. Fewer
361  /// than two snapshots (or an out-of-range date pair) yields an empty matrix — a normal "nothing
362  /// to diff" result, not an error.
363  pub fn status_change_matrix(
364    corpus: &Corpus,
365    service: &Service,
366    previous_date: Option<NaiveDateTime>,
367    current_date: Option<NaiveDateTime>,
368    connection: &mut PgConnection,
369  ) -> Result<StatusChangeMatrix, Error> {
370    use crate::schema::historical_tasks::dsl::{saved_at, task_id};
371    use crate::schema::tasks::dsl::{corpus_id, service_id};
372
373    // All snapshot dates for this (corpus, service): bounded by the number of saved runs, not by
374    // task count (DISTINCT over saved_at). Drives the date picker + the default two-most-recent
375    // diff.
376    let tasks_subquery = tasks::table
377      .filter(corpus_id.eq(corpus.id))
378      .filter(service_id.eq(service.id))
379      .select(tasks::id);
380    let all_dates: Vec<NaiveDateTime> = historical_tasks::table
381      .filter(task_id.eq_any(tasks_subquery))
382      .order(saved_at.desc())
383      .select(saved_at)
384      .distinct()
385      .get_results(connection)?;
386    let dates_labels = all_dates
387      .iter()
388      .map(|date| date.format("%Y-%m-%d %H:%M:%S%.f").to_string())
389      .collect();
390
391    // Choose the newer/older snapshots to diff: the caller's pair, else the two most recent.
392    let (older, newer) = match (previous_date, current_date) {
393      (Some(previous), Some(current)) => (previous, current),
394      _ if all_dates.len() > 1 => (all_dates[1], all_dates[0]),
395      _ => return Ok((dates_labels, Vec::new())),
396    };
397
398    // Count the transitions in the database — one row per (previous, current) status pair, so the
399    // application never materializes more than the matrix itself.
400    let matrix = sql_query(
401      "SELECT prev.status AS previous_status, cur.status AS current_status, \
402       COUNT(*) AS task_count \
403       FROM historical_tasks prev \
404       JOIN historical_tasks cur ON prev.task_id = cur.task_id \
405       JOIN tasks t ON t.id = prev.task_id \
406       WHERE t.corpus_id = $1 AND t.service_id = $2 \
407       AND prev.saved_at = $3 AND cur.saved_at = $4 \
408       GROUP BY prev.status, cur.status",
409    )
410    .bind::<Integer, _>(corpus.id)
411    .bind::<Integer, _>(service.id)
412    .bind::<Timestamp, _>(older)
413    .bind::<Timestamp, _>(newer)
414    .get_results::<StatusChangeCell>(connection)?
415    .into_iter()
416    .map(|cell| (cell.previous_status, cell.current_status, cell.task_count))
417    .collect();
418    Ok((dates_labels, matrix))
419  }
420}