|
use async_once::AsyncOnce; |
|
use aws_sdk_s3::{ |
|
config::{Credentials, Region}, |
|
Client, |
|
}; |
|
use ctor::{ctor, dtor}; |
|
use lazy_static::lazy_static; |
|
use std::{future::Future, thread}; |
|
use testcontainers::{clients::Cli, images::minio::MinIO}; |
|
use tokio::{ |
|
runtime, |
|
sync::{ |
|
mpsc::{self, UnboundedReceiver, UnboundedSender}, |
|
Mutex, |
|
}, |
|
}; |
|
|
|
lazy_static! { |
|
static ref MINIO_IN: Channel<ContainerCommands> = channel(); |
|
static ref MINIO_PORT: Channel<u16> = channel(); |
|
static ref MINIO_STOPPED: Channel<()> = channel(); |
|
static ref S3_CLIENT: AsyncOnce<Client> = AsyncOnce::new(create_s3_client()); |
|
} |
|
|
|
#[tokio::test] |
|
async fn should_create_bucket() { |
|
let s3_client = S3_CLIENT.get().await; |
|
let bucket = "test-s3-bucket"; |
|
s3_client |
|
.create_bucket() |
|
.bucket(bucket) |
|
.send() |
|
.await |
|
.unwrap(); |
|
println!("Bucket '{}' created.", bucket); |
|
} |
|
|
|
#[ctor] |
|
fn on_startup() { |
|
thread::spawn(|| execute_blocking(start_minio())); |
|
} |
|
|
|
#[dtor] |
|
fn on_shutdown() { |
|
execute_blocking(clean_up()); |
|
} |
|
|
|
fn execute_blocking<F: Future>(f: F) { |
|
runtime::Builder::new_current_thread() |
|
.build() |
|
.unwrap() |
|
.block_on(f); |
|
} |
|
|
|
async fn clean_up() { |
|
MINIO_IN.tx.send(ContainerCommands::Stop).unwrap(); |
|
MINIO_STOPPED.rx.lock().await.recv().await; |
|
println!("Minio stopped.") |
|
} |
|
|
|
#[derive(Debug)] |
|
enum ContainerCommands { |
|
FetchPort, |
|
Stop, |
|
} |
|
|
|
struct Channel<T> { |
|
tx: UnboundedSender<T>, |
|
rx: Mutex<UnboundedReceiver<T>>, |
|
} |
|
|
|
fn channel<T>() -> Channel<T> { |
|
let (tx, rx) = mpsc::unbounded_channel(); |
|
Channel { |
|
tx, |
|
rx: Mutex::new(rx), |
|
} |
|
} |
|
|
|
async fn start_minio() { |
|
let docker = Cli::default(); |
|
let minio = docker.run(MinIO::default()); |
|
let port = minio.get_host_port_ipv4(9000); |
|
println!("MinIO started on port {}", port); |
|
let mut rx = MINIO_IN.rx.lock().await; |
|
while let Some(command) = rx.recv().await { |
|
println!("Received container command: {:?}", command); |
|
match command { |
|
ContainerCommands::FetchPort => MINIO_PORT.tx.send(port).unwrap(), |
|
ContainerCommands::Stop => { |
|
minio.stop(); |
|
MINIO_STOPPED.tx.send(()).unwrap(); |
|
rx.close(); |
|
} |
|
} |
|
} |
|
} |
|
|
|
async fn create_s3_client() -> Client { |
|
MINIO_IN.tx.send(ContainerCommands::FetchPort).unwrap(); |
|
let minio_port = MINIO_PORT.rx.lock().await.recv().await.unwrap(); |
|
println!("Connecting s3 client to port {}", minio_port); |
|
Client::from_conf( |
|
aws_sdk_s3::Config::builder() |
|
.credentials_provider(Credentials::new( |
|
"minioadmin", |
|
"minioadmin", |
|
None, |
|
None, |
|
"test-provider", |
|
)) |
|
.region(Region::new("us-east-1")) |
|
.endpoint_url(format!("http://127.0.0.1:{}/", minio_port)) |
|
.build(), |
|
) |
|
} |