snakemake使用笔记
snakemake是一个用来编写任务流程的工具,用python编写的,因此其执行的流程脚本也比较通俗易懂,易于理解。
# 从一个简单的例子开始
# 1、安装snakemake
安装snakemake的方法有多种,snakemake官方推荐的是conda,安装方法如下:
conda install -c bioconda snakemake
# 2、一个简单的snakemake脚本
虽然snakemake广泛的应用于生物信息方面的流程编写,但是snakemake的应用并不局限于编写生物信息学的流程,这里以一个简单的合并文件的例子开始介绍snakemake的简单使用。
#首先我们建立两个文件
echo "Here is hello." > hello.txt
echo "Here is world." > world.txt
#接下来开始编写我们的Snakefile
rule concat: # 这里的rule可视为snakemake定义的关键字,concat使我们自定义的这一步任务的名称
input: # input同样是snakemake的关键字,定义了在这个任务中的输入文件
expand("{file}.txt", file=["hello", "world"]) #expand是一个snakemake定义的替换命令
output: # output也是snakemake的关键字,定义输出结果的保存文件
"merged.txt"
shell: # 这里表示我们下面的命令将在命令行中执行
"cat {input} > {output}"
#最后就可以在Snakefile的路径执行snakemake命令即可
snakemake
cat merge.txt
# Here is hello.
# Here is world.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
提示
在上面的Snakefile脚本中,rule、input、output、shell、expand均为snakemake中的关键字或者命令。同时Snakefile中的每一个rule其实都可以看作是一个简单的shell脚本,通过Snakefile将多个rule组织在一起并按照我们定义的顺序来执行。另外,在output中的结果文件可以是未存在目录中的文件,这时会自动创建不存在的目录。
# snakemake中的一些命令与规则
# 1、rule
提示
rule是Snakefile中最主要的部分。如上面的例子所说,每一个rule定义了一系列pipe中的一步,每一个rule都可以当作一个shell脚本来处理,一般主要包括input、output、shell3个部分。同时还有许多上面没有列出来的用法:
rule all
。不同于其他的rule,在rule all
里面一般不会去定义要执行的命令,他一般用来定义最后的输出结果文件。除了rule all
中定义的文件外最后输出结果不会保存任何中间文件。例如将上面的脚本改成如下文件则没有输出结果:
rule all:
input:
#"merged.txt" 取消注释后,则能正常输出文件
rule concat:
input:
expand("{file}.txt", file=["hello", "world"])
output:
"merge.txt"
shell:
"cat {input} > {output}"
2
3
4
5
6
7
8
9
10
wildcards
。用来获取通配符匹配到的部分,例如对于通配符"{dataset}/file.{group}.txt"匹配到文件101/file.A.txt,则{wildcards.dataset}就是101,{wildcards.group}就是A。threads
。通过在rule里面指定threads参数来指定分配给程序的线程数,egthreads: 8。resources
。可用来指定程序运行的内存,eg. resources: mem_mb=800。message
。使用message参数可以指定每运行到一个rule时,在终端中给出提示信息,eg.message: "starting mapping ..."。priority
。可用来指定程序运行的优先级,默认为0,eg.priority: 20。log
。用来指定生成的日志文件,eg.log: "logs/concat.log"。params
。指定程序运行的参数,eg.params: cat="-n",调用方法为{params.cat}。run
。在run的缩进区域里面可以输入并执行python代码。scripts
。用来执行指定脚本,eg.scripts: "rm_dup.py"temp
。通过temp方法可以在所有rule运行完后删除指定的中间文件,eg.output: temp("f1.bam")。protected
。用来指定某些中间文件是需要保留的,eg.output: protected("f1.bam")。ancient
。重复运行执行某个Snakefile时,snakemake会通过比较输入文件的时间戳是否更改(比原来的新)来决定是否重新执行程序生成文件,使用ancient方法可以强制使得结果文件一旦生成就不会再次重新生成覆盖,即便输入文件时间戳已经更新,eg.input: ancient("f1.fastq")。Rule Dependencies
。可通过快捷方式指定前一个rule的输出文件为此rule的输入文件:
rule a:
input: "path/to/input"
output: "path/to/output"
shell: ...
rule b:
input: rules.a.output #直接通过rules.a.output 指定rule a的输出
output: "path/to/output/of/b"
shell: ...
2
3
4
5
6
7
8
9
report
。使用snakemake定义的report函数可以方便的将结果嵌入到一个HTML文件中进行查看。
# 2. configuration
每计算一次数据都要重写一次Snakefile有时可能会显得有些繁琐,我们可以将那些改动写入配置文件,使用相同流程计算时,将输入文件的文件名写入配置文件然后通过Snakefile读入即可。 配置文件有两种书写格式——json和yaml。在Snakefile中读入配置文件使用如下方式:
configfile: "path/to/config.json"
configfile: "path/to/config.yaml"
2
# 也可直接在执行snakemake命令时指定配置
snakemake --config yourparam=1.5
2
在shell命令中直接调用config文件中的内容的话,不需要引号,如config[a]
而不是config["a"]
。
# snakemake的执行
一般讲所有的参数配置写入Snakefile后直接在Snakefile所在路径执行snakemake命令即可开始执行流程任务。一些常用的参数:
--snakefile, -s 指定Snakefile,否则是当前目录下的Snakefile
--dryrun, -n 不真正执行,一般用来查看Snakefile是否有错
--printshellcmds, -p 输出要执行的shell命令
--reason, -r 输出每条rule执行的原因,默认FALSE
--cores, --jobs, -j 指定运行的核数,若不指定,则使用最大的核数
--force, -f 重新运行第一条rule或指定的rule
--forceall, -F 重新运行所有的rule,不管是否已经有输出结果
--forcerun, -R 重新执行Snakefile,当更新了rule时候使用此命令
--set-threads 指定特定rule的核数
# 一些可视化命令
$ snakemake --dag | dot -Tpdf > dag.pdf
# 集群设置
--profile 指定集群配置文件所在的文件夹
2
3
4
5
6
7
8
9
10
11
12
13
# Profile集群设置示例(pbs)
# config.yaml文件示例
cluster: "pbs-submit.py --depend \"{dependencies}\""
cluster-status: "pbs-status.py"
jobscript: "pbs-jobscript.sh"
jobs: 5000
immediate-submit: {{cookiecutter.immediate_submit}}
verbose: true
notemp: true
2
3
4
5
6
7
# pbs-jobscript.sh
#!/bin/sh
# properties = {properties}
{exec_job}
2
3
# pbs-status.py
#!/usr/bin/env python3
import sys
import subprocess
import xml.etree.cElementTree as ET
jobid = sys.argv[1]
try:
res = subprocess.run("qstat -f -x {}".format(jobid), check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
xmldoc = ET.ElementTree(ET.fromstring(res.stdout.decode())).getroot()
job_state = xmldoc.findall('.//job_state')[0].text
if job_state == "C":
exit_status = xmldoc.findall('.//exit_status')[0].text
if exit_status == '0':
print("success")
else:
print("failed")
else:
print("running")
except (subprocess.CalledProcessError, IndexError, KeyboardInterrupt) as e:
print("failed")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# pbs-submit.py
#!/usr/bin/env python3
import os
import sys
import argparse
import subprocess
from snakemake.utils import read_job_properties
parser=argparse.ArgumentParser(add_help=False)
parser.add_argument("--depend", help="Space separated list of ids for jobs this job should depend on.")
parser.add_argument("-a", help="Declare the time when the job becomes eligible for execution.")
parser.add_argument("-A", help="Define the account string.")
parser.add_argument("-b", help="PBS Server timeout.")
parser.add_argument("-c", help="Checkpoint options.")
parser.add_argument("-C", help="Directive prefix in script file.")
parser.add_argument("-d", help="Working directory to be used (default: ~). PBS_O_INITDIR")
parser.add_argument("-D", help="Root directory to be used. PBS_O_ROOTDIR")
parser.add_argument("-e", help="standard error path.")
parser.add_argument("-f", help="Fault tolerant.",action="store_true")
parser.add_argument("-h", help="Apply user hold at submission time",action="store_true")
parser.add_argument("-j", help="Merge standard error and standard out. (oe or eo)")
parser.add_argument("-l", help="Resource list.")
parser.add_argument("-m", help="Mail options.")
parser.add_argument("-M", help="Mail users.")
parser.add_argument("-N", help="Name for the job.")
parser.add_argument("-o", help="standard output path.")
parser.add_argument("-p", help="Set job priority.")
parser.add_argument("-P", help="Proxy user for job.")
parser.add_argument("-q", help="Set destination queue.")
parser.add_argument("-t", help="Array request.")
parser.add_argument("-u", help="Set user name for job.")
parser.add_argument("-v", help="Environment variables to export to the job.")
parser.add_argument("-V", help="Export all environment variables.",action="store_true")
parser.add_argument("-w", help="Set working directory. PBS_O_WORKDIR")
parser.add_argument("-W", help="Additional attributes.")
parser.add_argument("--help", help="Display help message.",action="store_true")
parser.add_argument("positional",action="append",nargs="?")
args = parser.parse_args()
if args.help :
parser.print_help()
sys.exit(0)
jobscript = sys.argv[-1]
job_properties = read_job_properties(jobscript)
atime=""
acc_string=""
pbs_time=""
chkpt=""
pref=""
dd=""
rd=""
se=""
ft=""
hold=""
j=""
resource=""
mail=""
mailuser=""
jname=""
so=""
priority=""
proxy=""
q=""
ar=""
user=""
ev=""
eall=""
wd=""
add=""
depend=""
resourceparams=""
extras=""
if args.depend:
for m in args.depend.split(" "):
depend = depend + ":" + m
if depend:
depend = " -W \"depend=afterok" + depend + "\""
if args.positional:
for m in args.positional:
extras = extras + " " + m
if args.a: atime = " -a " + args.a
if args.A: acc_string = " -A " + args.A
if args.b: pbs_time = " -b " + args.b
if args.c: chkpt = " -c " + args.c
if args.C: pref = " -C " + args.C
if args.d: dd = " -d " + args.d
if args.D: rd = " -D " + args.D
if args.e: se = " -e " + args.e
if args.f: ft = " -f"
if args.h: hold = " -h"
if args.j: j = " -j " + args.j
if args.l: resource = " -l " + args.l
if args.m: mail = " -m " + args.m
if args.M: mailuser = " -M " + args.M
if args.N: jname = " -N " + args.N
if args.o: so = " -o " + args.o
if args.p: priority = " -p " + args.p
if args.P: proxy = " -P " + args.P
if args.q: q = " -q " + args.q
if args.t: ar = " -t " + args.ar
if args.u: user = " -u " + args.u
if args.v: ev = " -v " + args.v
if args.V: eall = " -V"
if args.w: wd = " -w " + args.w
if args.W: add= " -W \"" + args.W + "\""
nodes=""
ppn=""
mem=""
walltime=""
if "threads" in job_properties:
ppn = "ppn=" + str(job_properties["threads"])
if "resources" in job_properties:
resources = job_properties["resources"]
if "nodes" in resources: nodes="nodes=" + str(resources["nodes"])
if ppn and not nodes : nodes="nodes=1"
if "mem" in resources: mem="mem=" + str(resources["mem"])
if "walltime" in resources: walltime="walltime=" + str(resources["walltime"])
if nodes or ppn or mem or walltime: resourceparams = " -l \""
if nodes: resourceparams = resourceparams + nodes
if nodes and ppn: resourceparams = resourceparams + ":" + ppn
if nodes and mem: resourceparams = resourceparams + ","
if mem: resourceparams = resourceparams + mem
if walltime and (nodes or mem): resourceparams = resourceparams + ","
if walltime: resourceparams = resourceparams + walltime
if nodes or mem or walltime: resourceparams = resourceparams + "\""
if "cluster" in job_properties:
cluster = job_properties["cluster"]
if "error" in cluster:
os.makedirs(os.path.dirname(cluster["error"]), exist_ok=True)
se = " -e " + cluster["error"]
if "output" in cluster:
os.makedirs(os.path.dirname(cluster["output"]), exist_ok=True)
so = " -o " + cluster["output"]
cmd = "qsub {a}{A}{b}{c}{C}{d}{D}{e}{f}{h}{j}{l}{m}{M}{N}{o}{p}{P}{q}{t}{u}{v}{V}{w}{W}{rp}{dep}{ex}".format(\
a=atime,A=acc_string,b=pbs_time,c=chkpt,C=pref,d=dd,D=rd,e=se,f=ft,h=hold,j=j,l=resource,m=mail,M=mailuser,\
N=jname,o=so,p=priority,P=proxy,q=q,t=ar,u=user,v=ev,V=eall,w=wd,W=add,rp=resourceparams,dep=depend,ex=extras)
try:
res = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise e
res = res.stdout.decode()
print(res.strip())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
参考链接: https://www.jianshu.com/p/14b9eccc0c0e (opens new window)
https://snakemake.readthedocs.io/en/latest/index.html (opens new window)