1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::error::BoxDynError;
use crate::migrate::{Migration, MigrationType};
use futures_core::future::BoxFuture;
use futures_util::TryStreamExt;
use sqlx_rt::fs;
use std::borrow::Cow;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
/// A source that can be resolved into a list of [`Migration`]s.
///
/// The lifetime `'s` bounds the future returned by [`resolve`](MigrationSource::resolve),
/// which lets borrowed sources be implemented alongside owned (`'static`) ones.
pub trait MigrationSource<'s>: Debug {
/// Consumes the source and returns a boxed future that yields either the
/// resolved migrations or a boxed error describing why resolution failed.
fn resolve(self) -> BoxFuture<'s, Result<Vec<Migration>, BoxDynError>>;
}
/// Resolves migrations from the files directly inside a directory.
///
/// Only regular files named `<version>_<description>.sql` are considered;
/// every other directory entry is skipped.
impl<'s> MigrationSource<'s> for &'s Path {
    /// Reads every migration file in this directory and returns the resulting
    /// migrations sorted by ascending version.
    ///
    /// The description is the part of the file name after the first `_`, with
    /// the migration-type suffix removed once and remaining underscores
    /// replaced by spaces.
    ///
    /// # Errors
    ///
    /// Fails if the path cannot be canonicalized, the directory or any entry's
    /// metadata cannot be read, the text before the first `_` of a candidate
    /// file is not a valid `i64`, or a migration file cannot be read as a
    /// string.
    fn resolve(self) -> BoxFuture<'s, Result<Vec<Migration>, BoxDynError>> {
        Box::pin(async move {
            // `mut` goes unused when the binding is immediately moved into the
            // `ReadDirStream` wrapper below, hence the `allow`.
            #[allow(unused_mut)]
            let mut entries = fs::read_dir(self.canonicalize()?).await?;
            let mut migrations = Vec::new();

            #[cfg(any(feature = "_rt-actix", feature = "_rt-tokio"))]
            let mut entries = tokio_stream::wrappers::ReadDirStream::new(entries);

            while let Some(entry) = entries.try_next().await? {
                if !entry.metadata().await?.is_file() {
                    // Not a regular file; nothing to load.
                    continue;
                }

                let file_name = entry.file_name();
                let file_name = file_name.to_string_lossy();

                // Split on the first `_` only: `<version>_<rest>`.
                let mut parts = file_name.splitn(2, '_');
                let (version, rest) = match (parts.next(), parts.next()) {
                    (Some(version), Some(rest)) if rest.ends_with(".sql") => (version, rest),
                    // Not of the form `<version>_<description>.sql`.
                    _ => continue,
                };

                let version: i64 = version.parse()?;
                let migration_type = MigrationType::from_filename(rest);

                // Strip the suffix exactly once. `trim_end_matches` would strip it
                // repeatedly and mangle names such as `dump.sql.sql`.
                let description = rest
                    .strip_suffix(migration_type.suffix())
                    .unwrap_or(rest)
                    .replace('_', " ");

                let sql = fs::read_to_string(entry.path()).await?;

                migrations.push(Migration::new(
                    version,
                    Cow::Owned(description),
                    migration_type,
                    Cow::Owned(sql),
                ));
            }

            // Directory iteration order is unspecified, so order by version here.
            migrations.sort_by_key(|m| m.version);

            Ok(migrations)
        })
    }
}
impl MigrationSource<'static> for PathBuf {
fn resolve(self) -> BoxFuture<'static, Result<Vec<Migration>, BoxDynError>> {
Box::pin(async move { self.as_path().resolve().await })
}
}