Last active
February 10, 2017 21:23
-
-
Save ihodes/3b928d494789b3b345af28400f7978ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#use "topfind";; | |
#thread | |
#require "coclobas.ketrew_backend,biokepi,cmdliner,nonstd,sosa,csv";; | |
(* You must have a biokepi_machine.ml file in this directory that has a | |
`biokepi_machine` defined at the top level. *) | |
#use "biokepi_machine.ml" | |
open Nonstd | |
module String = Sosa.Native_string | |
(** We want to extend the compiler to handle a new function, | |
`write_csv_manifest`, so we define the new signature that the compiler must | |
have. *) | |
module type Semantics = sig | |
include Biokepi.EDSL.Semantics | |
val write_csv_manifest : | |
normal:[ `Bam ] repr -> | |
tumor:[ `Bam ] repr -> | |
vcfs:(string * [ `Vcf ] repr) list -> | |
string -> unit | |
end | |
(** Here we add the function itself for the `To_workflow` compiler (the compiler | |
which handles the transformation from the eDSL to actual Ketrew workflow | |
nodes). All we're adding is a function which outputs a CSV locally when the | |
workflow is compiled. *) | |
module To_workflow | |
(Config : Biokepi.EDSL.Compile.To_workflow.Compiler_configuration) = | |
struct | |
include Biokepi.EDSL.Compile.To_workflow.Make(Config) | |
let write_csv_manifest ~normal ~tumor ~vcfs out | |
= | |
let csv = | |
let module F = Biokepi.EDSL.Compile.To_workflow.File_type_specification in | |
let header = ["name"; "path"] in | |
let vcfs = | |
List.map ~f:(fun (name, n) -> [name; (F.get_vcf n)#product#path]) vcfs | |
in | |
[ | |
header; | |
["normal"; (F.get_bam normal)#product#path]; | |
["tumor"; (F.get_bam tumor)#product#path]; | |
] @ vcfs | |
in | |
let outc = Csv.to_channel (open_out out) in | |
Csv.output_all outc csv | |
end | |
(** The actual pipeline that does our work. *) | |
module Pipeline(Bfx: Semantics) = struct | |
module Stdlib = Biokepi.EDSL.Library.Make(Bfx) | |
(** This takes our list of FASTQs (pairs, or single-ended) and aligns them, | |
then merges them all together. *) | |
let to_bam ~reference_build fastqs = | |
Stdlib.bwa_mem_opt_inputs fastqs | |
|> List.map ~f:(Bfx.bwa_mem_opt ~reference_build ?configuration:None) | |
|> Bfx.list | |
|> Bfx.merge_bams | |
|> Bfx.picard_mark_duplicates | |
let pipeline ~normal_fastqs ~tumor_fastqs ~reference_build ~results_manifest = | |
let normal = to_bam ~reference_build normal_fastqs in | |
let tumor = to_bam ~reference_build tumor_fastqs in | |
let strelka_vcf = Bfx.strelka ~normal ~tumor () in | |
let mutect_vcf = Bfx.mutect ~normal ~tumor () in | |
let normal_haplotype_vcf = Bfx.gatk_haplotype_caller normal in | |
let tumor_haplotype_vcf = Bfx.gatk_haplotype_caller tumor in | |
let () = | |
let vcfs = ["strelka", strelka_vcf; "mutect", mutect_vcf; | |
"normal_haplotype", normal_haplotype_vcf; | |
"tumor_haplotype", tumor_haplotype_vcf] in | |
Bfx.write_csv_manifest ~normal ~tumor ~vcfs results_manifest | |
in | |
Bfx.list [Bfx.to_unit normal; | |
Bfx.to_unit tumor; | |
Bfx.to_unit strelka_vcf; | |
Bfx.to_unit mutect_vcf; | |
Bfx.to_unit tumor_haplotype_vcf; | |
Bfx.to_unit normal_haplotype_vcf] | |
let parse_input_fastqs ~sample_name files = | |
let open Biokepi.EDSL.Library.Input in | |
let parse_file file = | |
match (String.split ~on:(`Character ';') file) with | |
| [ pair1; pair2; ] -> pe pair1 pair2 | |
| [ single_end; ] -> se single_end | |
| _ -> failwith "Couldn't parse FASTQ." | |
in | |
fastq_sample ~sample_name (List.map ~f:(fun f -> parse_file f) files) | |
let run ~normal_paths ~tumor_paths ~name ~reference_build ~results_manifest = | |
let normal_fastqs = | |
parse_input_fastqs normal_paths ~sample_name:(sprintf "normal-%s" name) in | |
let tumor_fastqs = | |
parse_input_fastqs tumor_paths ~sample_name:(sprintf "tumor-%s" name) in | |
Bfx.observe (fun () -> | |
pipeline | |
~normal_fastqs ~tumor_fastqs ~reference_build ~results_manifest | |
|> Bfx.to_unit) | |
end | |
(** These describe the CLI interface, and is not specific to Biokepi at all | |
(uses the Cmdliner library). *) | |
let args f = | |
let open Cmdliner in | |
let open Cmdliner.Term in | |
app f begin | |
(* We use these "wrapper" functions adding polymorphic types in order to | |
further type the various returned bools and strings from the CLI. *) | |
pure (fun s -> `Name s) | |
$ Arg.( | |
required & opt (some string) None | |
& info ["name"] | |
~doc:"Name of the run (unique per sample/configuration).") | |
end | |
$ begin | |
pure (fun s -> `Results_dir s) | |
$ Arg.( | |
required & opt (some string) None | |
& info ["results-directory"] | |
~doc:"Directory where all files related to this run will be stored.") | |
end | |
$ begin | |
pure (fun s -> `Results_manifest s) | |
$ Arg.( | |
required & opt (some string) None | |
& info ["local-results-manifest-path"; "R"] | |
~doc:"Local path where CSV with all results/files listed will be created.") | |
end | |
$ begin | |
pure (fun s -> `Dry_run s) | |
$ Arg.( | |
value & flag & info ["dry-run"] | |
~doc:"Don't submit this job to Ketrew (test that it compiles).") | |
end | |
$ begin | |
pure (fun s -> `Normal_fastqs s) | |
$ Arg.( | |
required & opt (some (list string)) None | |
& info ["normal-fastqs"] | |
~doc:"Path to normal FASTQs, comma-separated pairs separated by\ | |
semicolons, e.g. (a.r1.fq.gz;a.r2.fq.gz,b.r1.fq.gz;b.r2.fq.gz)") | |
end | |
$ begin | |
pure (fun s -> `Tumor_fastqs s) | |
$ Arg.( | |
required & opt (some (list string)) None | |
& info ["tumor-fastqs"] | |
~doc:"Path to tumor FASTQs, comma-separated pairs separated by\ | |
semicolons, e.g. (a.r1.fq.gz;a.r2.fq.gz,b.r1.fq.gz;b.r2.fq.gz)") | |
end | |
$ begin | |
let references = | |
["b37decoy", "b37decoy"; "b37", "b37"; "b38", "b38"; "mm10", "mm10"] in | |
pure (fun s -> `Reference_build s) | |
$ Arg.( | |
required & opt (some (enum references)) None | |
& info ["reference-build"; "R"] | |
~doc:"Reference build to use.") | |
end | |
(** Here we compile & submit the pipeline. *) | |
let run () = | |
fun | |
(`Name name) | |
(`Results_dir results_dir) | |
(`Results_manifest results_manifest) | |
(`Dry_run dry_run) | |
(`Normal_fastqs normal_paths) | |
(`Tumor_fastqs tumor_paths) | |
(`Reference_build reference_build) | |
-> | |
(** This Config module is required to pass configuration & the | |
Biokepi.Machine.t (describing the compute infrastructure) to the | |
To_workflow compiler. This is used to compile the above pipeline to | |
a bunch of Ketrew jobs. *) | |
let module Config = struct | |
include Biokepi.EDSL.Compile.To_workflow.Defaults | |
(* This is where all the files generated by the workflow will end up. *) | |
let work_dir = results_dir | |
let machine = biokepi_machine | |
end in | |
let module Workflow_compiler = | |
To_workflow(Config) in | |
let module Compiled_pipeline = Pipeline(Workflow_compiler) in | |
let workflow = | |
Compiled_pipeline.run | |
~normal_paths ~tumor_paths ~name ~reference_build ~results_manifest | |
|> Biokepi.EDSL.Compile.To_workflow.File_type_specification. | |
get_unit_workflow ~name | |
in | |
if dry_run then | |
printf "Dry run, not submitting!\n" | |
else | |
let _ = Ketrew.Client.submit_workflow workflow ~add_tags:[name] in | |
printf "Submitted workflow \"%s\"!\n" name | |
(** Here we join the arguments in `args` with the actual `run` function taking | |
those arguments into the complete CLI. *) | |
let cli () = | |
let open Cmdliner in | |
let info = Term.(info "Tim's pipeline") in | |
let term = Term.(pure (run ())) |> args in | |
(term, info) | |
(** This is the "main" function that's actually called. *) | |
let () = | |
let open Cmdliner in | |
match Term.eval (cli ()) with | |
| `Error _ -> exit 1 | |
| `Ok f -> f | |
| _ -> exit 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment