两个Plink文件样本间基因型比对代码实现

本软件对两个plink文件中的样本的共同的SNP位点进行比对,可以应用于同一群体不同处理之后得到的基因型文件的比对(例如比较不同的填充方式得到的plink文件)。

前提条件为,两个map的染色体、SNP名称及物理位置必须保持一致。

使用软件

  • python 3.0以上版本,事先安装 numpy 模块。
  • plink 1.9 , 并将 plink 添加到环境变量中。

输入文件格式

  • plink 文件 :前缀相同的一对文件,后缀分别为.ped.map
  • 映射文件: 两列,所有基因型个体的芯片号与个体号,空格或 tab分隔,无标题。

输出文件说明

  • cmp_two_plinks.txt : 第一列为第一个plink文件某个个体的个体号,第二列为第二个plink文件某个个体的个体号,第三列为两个样本不一致的位点数目,第四列为位点总数,第五列为前两列相除得到的不一致率。

    示例如下:

    202309870171_R01C01 202309870171_R01C01 0 43279 0
    202309870171_R01C02 202309870171_R01C02 0 43289 0
    202309870171_R02C01 202309870171_R02C01 0 43268 0

运行代码

代码文件见:compair_samples.py

将输入文件和本程序放在同一文件夹下,运行命令示范为

1
python compair_samples.py --plink1 plink1 --plink2 plink2

可用参数说明

--plink1: 第一个plink文件前缀,必选。

--plink2: 第二个plink文件前缀,可选。

--chipid1 : 第一个基因型个体映射文件名,芯片和个体号均不能出现重复,可选。如果提供这个文件,那么基因型样本会使用映射文件的个体号。

--chipid2 : 第二个基因型个体映射文件名,芯片和个体号均不能出现重复,可选。如果提供这个文件,那么基因型样本会使用映射文件的个体号。

--cmp-all : 是否需要所有配对两两遍历,可选,只能为yes和no,默认为no。如果该选项设置为yes,则会对两个plink文件中所有样本之间所有可能的组合进行比较,输出所有配对结果;如果设置为no, 则仅针对两个plink文件的共同样本进行基因型比对,输出两个plink文件的同名的样本间的基因型比对结果。

--exclude-miss : 基因型比对是否先剔除缺失值,可选,只能为yes和no,默认为yes。如果该选项设置为no,比对时不会剔除缺失位点,如果存在同一个SNP,一个样本有基因型,一个样本为缺失,该SNP同样会被视为不一致位点;如果该选项设置为 yes ,比对时会剔除至少一个样本为缺失的位点,只比对两个样本均不为缺失的位点。

--save-temp : 是否保留中间文件,可选,只能为yes和no,默认为 no。

--out : 结果文件名称,可选,默认为 cmp_two_plinks.txt

流程说明

先对两个ped文件中的芯片号进行重编码,并获取两个ped文件相同的芯片号信息。之后从两个map中提取相同的SNP位点,使用plink软件提取共同位点并合并两个plink文件,得到012编码的raw文件。最后默认对raw文件中两个plink文件中的共同样本进行基因型比对。

代码说明

检验参数

检验输入的各个参数是否合规

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#检验参数
legal_argument_set = set(["yes", "no"])

def is_number(s):
try:
float(s)
return True
except ValueError:
pass
return False

if not os.path.exists(f"{args.plink1}.ped"):
print(f"Error: not exist {args.plink1}.ped\n\n")
sys.exit(1)

if not os.path.exists(f"{args.plink1}.map"):
print(f"Error: not exist {args.plink1}.map\n\n")
sys.exit(1)

if args.chipid1 is not None:
if not os.path.exists(f"{args.chipid1}"):
print(f"Error: not exist {args.chipid1}\n\n")
sys.exit(1)

if not os.path.exists(f"{args.plink2}.ped"):
print(f"Error: not exist {args.plink2}.ped\n\n")
sys.exit(1)

if not os.path.exists(f"{args.plink2}.map"):
print(f"Error: not exist {args.plink2}.map\n\n")
sys.exit(1)

if args.chipid2 is not None:
if not os.path.exists(f"{args.chipid2}"):
print(f"Error: not exist {args.chipid2}\n\n")
sys.exit(1)

if args.cmp_all not in legal_argument_set:
print("Error: illegal input, --cmp-all must be yes or no\n\n")
sys.exit(1)

if args.exclude_miss not in legal_argument_set:
print("Error: illegal input, --exclude-miss must be yes or no\n\n")
sys.exit(1)

if args.save_temp not in legal_argument_set:
print("Error: illegal input, --save-temp must be yes or no\n\n")
sys.exit(1)

复制文件

这里根据原始plink文件重新生成两套新的plink文件,其中芯片号使用数字代替。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#开始流程

plink_prefix1 = args.plink1
plink_prefix2 = args.plink2

# 复制 map
os.system(f"cp {plink_prefix1}.map {plink_prefix1}_temp.map")
os.system(f"cp {plink_prefix2}.map {plink_prefix2}_temp.map")

# 重新生成plink 文件
ped1 = open(f"{plink_prefix1}.ped",'r')
map1 = open(f"{plink_prefix1}.map",'r')

ped2 = open(f"{plink_prefix2}.ped",'r')
map2 = open(f"{plink_prefix2}.map",'r')

re_ped1 = open(f"{plink_prefix1}_temp.ped",'w')
re_ped2 = open(f"{plink_prefix2}_temp.ped",'w')
rename_chipid_file = open('rename_chipid_temp.txt','w')
common_snp_file = open(f'{plink_prefix1}_{plink_prefix2}_common_snp_temp.txt','w')

提取共同位点

读入两个 map 文件,提取共同的 snp 名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
error_status = False 

#提取共同位点
j = 0
temp_id_set = set()
temp_line_set = set()
map1_dick = {} # SNP名称:染色体_物理位置
for i in map1:
j += 1
f = i.split()
if (len(f) == 4):
if '\t'.join(f) not in temp_line_set:
temp_line_set.add('\t'.join(f))
if f[1] not in temp_id_set:
temp_id_set.add(f[1])
map1_dick[f[1]] = f[0]+"_"+f[3] # SNP名称:“染色体_物理位置”
else:
print(f"Error: duplicated id {f[1]} in different rows in {plink_prefix1}.map\n")
error_status = True
else:
print(f"Error: duplicted row {j} in {plink_prefix1}.map\n")
error_status = True

else:
print(f"Error: {j} row in {plink_prefix1}.map has less or more than 4 coloums\n")
error_status = True

j = 0
temp_id_set = set()
temp_line_set = set()
common_snp_num = 0
for i in map2:
j += 1
f = i.split()
if (len(f) == 4):
if '\t'.join(f) not in temp_line_set:
temp_line_set.add('\t'.join(f))
if f[1] not in temp_id_set:
temp_id_set.add(f[1])
if f[1] in map1_dick:
common_snp_num += 1
common_snp_file.write(f[1]+'\n')
else:
print(f"Error: duplicated id {f[1]} in different rows in {plink_prefix2}.map\n")
error_status = True
else:
print(f"Error: duplicted row {j} in {plink_prefix2}.map\n")
error_status = True

else:
print(f"Error: {j} row in {plink_prefix2}.map has less or more than 4 coloums\n")
error_status = True

print("\nNumber of common SNPs in two plink files: {}\n".format(common_snp_num))

ped文件芯片号重编码

将ped文件中的芯片号改为重编码的数字,如果有映射文件,在此处读取映射文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#创建一个函数,得到芯片号-个体号的字典(chipid_sampleid_dick 提前创建)
def Chipid_Sampleid(chipid_file_name):
chipid_file = open(chipid_file_name,'r')
global error_status
j = 0
temp_id_set = set()
temp_id_set2 = set()
temp_line_set = set()
for i in chipid_file:
j += 1
f = i.split()
if (len(f) >= 2):
if '\t'.join(f) not in temp_line_set:
temp_line_set.add('\t'.join(f))
if f[0] not in temp_id_set:
temp_id_set.add(f[0])
if f[1] not in temp_id_set2:
temp_id_set2.add(f[1])
chipid_sampleid_dick[f[0]] = f[1] #芯片号:个体号
else:
print(f"Error: duplicated sample id {f[1]} in different rows in {chipid_file_name}\n")
error_status = True
else:
print(f"Error: duplicated chip id {f[0]} in different rows in {chipid_file_name}\n")
error_status = True
else:
print(f"Warinning: duplicted row {j} in {chipid_file_name}\n")
else:
print(f"Error: {j} row in {chipid_file_name} has less than 2 coloums\n")
error_status = True
chipid_file.close()



#重命名
newid_dick = {} #chipid/sampleid:newid
sampleid_dick = {} #newid:chipid/sampleid

j = 0

#ped重编码为数字(1-indexed),家系号设定为0。

#查看是否存在chipid1
if args.chipid1 is not None:
chipid_sampleid_dick = {}
Chipid_Sampleid(args.chipid1) #运行函数
for i in ped1:
f = i.split()
if f[1] in chipid_sampleid_dick:
j += 1
newid_dick[chipid_sampleid_dick[f[1]]] = j
sampleid_dick[j] = chipid_sampleid_dick[f[1]]
rename_chipid_file.write(f"{chipid_sampleid_dick[f[1]]}\t{j}\n")
f[1] = str(j)
re_ped1.write('0'+'\t'+'\t'.join(f[1:])+'\n')
else:
print(f"Warning: {f[1]} in {args.plink1}.ped not in chpid file")
else:
for i in ped1:
f = i.split()
j += 1
newid_dick[f[1]] = j
sampleid_dick[j] = f[1]
rename_chipid_file.write(f"{f[1]}\t{j}\n")
f[1] = str(j)
re_ped1.write('0'+'\t'+'\t'.join(f[1:])+'\n')

sample1_num = j
print("Number of records present in plink1 ped file: {}\n".format(sample1_num))

common_sample_list = []
common_sample_num = 0
#查看是否存在chipid2
if args.chipid2 is not None:
chipid_sampleid_dick = {}
Chipid_Sampleid(args.chipid2) #运行函数
for i in ped2:
f = i.split()
if f[1] in chipid_sampleid_dick:
j += 1 #接着上面递增
sampleid_dick[j] = chipid_sampleid_dick[f[1]]
if chipid_sampleid_dick[f[1]] in newid_dick: #提取相同样本
common_sample_num += 1
common_sample_list.append([newid_dick[chipid_sampleid_dick[f[1]]],j]) #两个重编号
rename_chipid_file.write(f"{chipid_sampleid_dick[f[1]]}\t{j}\n")
f[1] = str(j)
re_ped2.write('0'+'\t'+'\t'.join(f[1:])+'\n')
else:
print(f"Warning: {f[1]} in {args.plink2}.ped not in chpid file")
else:
for i in ped2:
f = i.split()
j += 1
sampleid_dick[j] = f[1]
if f[1] in newid_dick:
common_sample_num += 1
common_sample_list.append([newid_dick[f[1]],j]) #两个重编号
rename_chipid_file.write(f"{f[1]}\t{j}\n")
f[1] = str(j)
re_ped2.write('0'+'\t'+'\t'.join(f[1:])+'\n')

sample2_num = j - sample1_num
print("Number of records present in plink2 ped file: {}\n".format(sample2_num))
print("Number of common samples in two plink file: {}\n".format(common_sample_num))

ped1.close()
map1.close()
ped2.close()
map2.close()
re_ped1.close()
re_ped2.close()
rename_chipid_file.close()
common_snp_file.close()

如果上面出现错误,报错,中止程序

1
2
3
4
# 如果输入文件出现错误
if error_status:
print("Please check your file again!")
sys.exit(1)

提取共同位点,合并基因型

提取两个plink文件的共同位点,合并基因型,生成 012 编码的 raw 文件。如果合并的过程中出现报错,程序中止,一般这种报错都是由于非二等位基因造成的,即两个plink文件中某个位点的两个碱基不一样。

1
2
3
4
5
6
7
8
9
#extract merge recodeA

merge_status = os.system(f"plink --allow-extra-chr --chr-set 95 --file {plink_prefix1}_temp --merge {plink_prefix2}_temp --extract {plink_prefix1}_{plink_prefix2}_common_snp_temp.txt --recodeA --out {plink_prefix1}_{plink_prefix2}_merge_temp > /dev/null")

if merge_status != 0:
print("Error in merge two plink data\n")
print("Maybe some common SNP from Complementary strands,check your data again")
sys.exit(1)

将缺失位点替换为3,将合并基因型读取为二维数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#raw文件处理
os.system(f"sed 's/NA/3/g' {plink_prefix1}_{plink_prefix2}_merge_temp.raw > {plink_prefix1}_{plink_prefix2}_merge_temp_sed.raw")
raw_file = open(f"{plink_prefix1}_{plink_prefix2}_merge_temp_sed.raw",'r')
out_file = open(args.out,'w')

gene_list=[]
raw_list=[]
title = raw_file.readline() #剔除标题
for i in raw_file:
f = i.split()
gene_list.append(int(f[1]))
raw_list.append(f[6:])

gene_id_array1 = np.array(gene_list)

array1 = np.array(raw_list, dtype=np.int8)

total_sample_num = array1.shape[0]
snp_num = array1.shape[1]
print("Number of records present in merged ped file: {}\n".format(total_sample_num))
print("Number of SNPs in merged plink files: {}\n".format(snp_num))

比对两个样本的基因型

创建了两个比对函数(是否剔除缺失值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#创建比对函数(是否剔除缺失值)
#这个函数定死了使用 array1 first_index second index
#创建两个函数,这样只要判断一次,而不要每个pair都要判断是否剔除缺失值

def Exclude_Miss_Yes():
#输出变量设置为全局变量
global total_num,no_match_num,no_match_rate
v1 = array1[first_index,:]
v2 = array1[second_index,:]
non_miss_index = np.where((v1!=3) & (v2!=3))
total_num = non_miss_index[0].shape[0]
v1_new = v1[non_miss_index]
v2_new = v2[non_miss_index]
no_match_num = np.count_nonzero(v1_new!=v2_new)
no_match_rate = no_match_num/total_num


def Exclude_Miss_No():
#输出变量设置为全局变量
global total_num,no_match_num,no_match_rate
v1 = array1[first_index,:]
v2 = array1[second_index,:]
total_num = v1.shape[0]
no_match_num = np.count_nonzero(v1!=v2)
no_match_rate = no_match_num/total_num

计算两个样本的不一致,分两种情况,比较所有配对和只比较同名样本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#计算不一致率
out_list = []
if args.cmp_all == 'yes': #计算所有pairs
if args.exclude_miss == 'yes':
for i in range(sample1_num):
for j in range(sample1_num, total_sample_num):
first_index = i
second_index = j
#计算去除缺失后的不一致位点数
Exclude_Miss_Yes()
#newid = index + 1
out_list.append([sampleid_dick[first_index+1],sampleid_dick[second_index+1],f"{no_match_num}",f"{total_num}",f"{no_match_rate:.4f}"])
elif args.exclude_miss == 'no':
for i in range(sample1_num):
for j in range(sample1_num, total_sample_num):
first_index = i
second_index = j
#计算去除缺失后的不一致位点数
Exclude_Miss_No()
#newid = index + 1
out_list.append([sampleid_dick[first_index+1],sampleid_dick[second_index+1],f"{no_match_num}",f"{total_num}",f"{no_match_rate:.4f}"])
elif args.cmp_all == 'no': #只计算个体号相同的pairs
if len(common_sample_list) == 0: #如果没有共同个体,报错
print("Error: two ped file has no same samples, check your data again!")
sys.exit(1)
else:
if args.exclude_miss == 'yes':
for i in common_sample_list:
#index = newid - 1
first_index = i[0] - 1
second_index = i[1] -1
#计算去除缺失后的不一致位点数
Exclude_Miss_Yes()
out_list.append([sampleid_dick[i[0]],sampleid_dick[i[1]],f"{no_match_num}",f"{total_num}",f"{no_match_rate:.4f}"])
elif args.exclude_miss == 'no':
for i in common_sample_list:
#index = newid - 1
first_index = i[0] - 1
second_index = i[1] -1
#计算去除缺失后的不一致位点数
Exclude_Miss_No()
out_list.append([sampleid_dick[i[0]],sampleid_dick[i[1]],f"{no_match_num}",f"{total_num}",f"{no_match_rate:.4f}"])


sorted_out_list = sorted(out_list, key = lambda s:(float(s[4]),s[0])) #不一致率从小到大
for i in sorted_out_list:
out_file.write('\t'.join(i)+'\n')

raw_file.close()
out_file.close()

删除中间文件

1
2
3
4
5
if args.save_temp == "yes":
pass
elif args.save_temp == "no":
os.system(f"rm *temp*")

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2019-2024 Vincere Zhou
  • 访问人数: | 浏览次数:

请我喝杯茶吧~

支付宝
微信