基于芯片数据的亲子鉴定分析代码

本软件通过利用SNP信息根据孟德尔错误位点比例,对于系谱中均具有基因型的亲子对进行亲子鉴定,判断系谱是否正确。这里的孟德尔错误位点指个体和亲本之间具有相反的纯和子的位点(例如个体的基因型为AA,亲本的基因型为GG),依据孟德尔定律亲子间不应存在这种情况,但由于突变和分型错误等原因,真实的亲子对之间仍存在少量的孟德尔错误位点。当使用的位点数目较多时,通过设定一个合理的阈值可以明确判定系谱中正确的亲子关系和错误的亲子关系。

使用软件

软件

python 3.0 以上版本,事先安装 numpy 模块

输入文件格式

  • raw 文件:通过 plink 软件对原始芯片数据(未填充)采用 --recodeA 选项生成的文件,后缀为 .raw

  • 映射文件: 两列,所有基因型个体的芯片号与个体号,空格或 tab分隔,无标题。

  • 系谱文件:三列,个体号-父号-母号,空格或 tab分隔,无标题。

输出文件说明

  • check_pedigree.csv: 五列,个体 - 原系谱中的父本 - 原系谱中的母本 - 父本判定信息 - 母本判定信息( 三种判定信息,par_nogeno : 亲本没有基因型信息,Match : 亲本符合阈值要求, No-Match : 亲本不符合阈值要求 )
  • check_paternity_prob.csv: 七列,亲子对类型 - 个体 - 亲本 - 错误位点数 - 匹配的位点总数(剔除缺失位点)- 错误位点概率 - 判定信息

运行代码

代码文件见:paternity_check_1.0.py

将输入文件和本程序放在同一文件夹下,运行命令示范为

1
python paternity_check_1.0.py --raw test.raw --chipid test_id.txt --pedigree pedigree.txt --check-prob 0.005

可用参数说明:

--raw : 基因型文件名,必选。

--chipid : 基因型个体映射文件名,必选。

--pedigree : 系谱文件名,必选

--list : 需要进行亲子鉴定的个体文件名,可选。如果提供该参数,则只对这个文件中的个体进行亲子鉴定;如果不提供该参数,则默认对所有基因型个体进行亲子鉴定。

--check-prob : 亲子鉴定孟德尔错误率的阈值,可选,默认为0.005

--miss-snp : SNP位点的基因分型缺失率阈值,可选,默认为0.2

--maf : SNP 位点的最小等位基因频率阈值,可选,默认为0.01

--miss-sample : 样本的基因分型缺失率阈值,可选,默认为0.5

代码说明

读取文件

这里首先打印了一下 raw 文件中的样本数目,实质采用 linux 命令 wc -l

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#打印raw文件行数
raw_lines = os.popen("wc -l {}".format(args.raw)).read().split()[0]
true_raw_lines = int(raw_lines) -1
print("Number of records in raw file: {}\n".format(true_raw_lines))

#读取文件
raw_file = open(args.raw,'r')
chipid_file = open(args.chipid,'r')
pedigree_file = open(args.pedigree,'r')

#new_raw = open("newid.raw",'w+')

check_prob = open("chek_paternity_prob.txt",'w')
check_pedigree = open("check_pedigree.txt",'w')

映射与raw文件处理

这里先对映射文件进行处理,过滤条件有两个:

  1. 每一行的列数必须大于等于2
  2. 重复个体号会保留第一个
1
2
3
4
5
6
7
8
9
10
11
set1=set();dick={}
j=0
for i in chipid_file:
f = i.split()
j+=1
if len(f) >= 2:
if f[1] not in set1: #去除重复个体号
set1.add(f[1])
dick[f[0]] = f[1]
else:
print("Warning: {} row in chipid file has less than 2 coloum\n".format(j))

再对 raw 文件进行处理,提取在映射文件中的个体的基因型,生成基因型的二维数组(array1)。这里将缺失值NA替换为3。gene_id_array 是相应的个体号一维数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
gene_list=[]
raw_list=[]
raw_file.readline() #剔除标题
for i in raw_file:
f = i.split()
if f[1] in dick:
gene_list.append(dick[f[1]])
raw_list.append('\t'.join(f[6:]))
#new_raw.write('\t'.join(f[6:])+'\n')
else:
print("Warning: {} in raw file not in chpid file or is duplicated samples\n".format(f[1]))

gene_id_array = np.array(gene_list)

#new_raw.seek(0)

# snp 缺失值过滤,结果文件是 array2
#numpy 0/1 结果会显示为 nan
array1 = np.genfromtxt(raw_list, dtype=np.int8, missing_values="NA", filling_values=3)
#array2 = array1.copy()

sample_num = array1.shape[0]
snp_num = array1.shape[1]


print("Number of SNPs in raw file: {}\n".format(snp_num))
print("Number of records present in chipid file: {}\n".format(sample_num))

位点和样本质控

位点缺失率

这里统计每一列(SNP)中等于3(表示缺失)的比例,剔除缺失率高于阈值的SNP。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool_array1 = array1==3

total_miss = bool_array1.sum()/(bool_array1.shape[0] * bool_array1.shape[1])
toatl_callrate = 1 - total_miss
print("Total genotyping rate is {:.3f}\n".format(toatl_callrate))

snp_missing_count = bool_array1.sum(axis=0)


snp_missing_prob = snp_missing_count/sample_num

snp_missing_min = snp_missing_prob.min()
snp_missing_mean = snp_missing_prob.mean()
snp_missing_max = snp_missing_prob.max()

#打印snp缺失值情况
print("SNP missing\n")
print(" Min missing rate: {:.3f}\n".format(snp_missing_min))
print(" AVG missing rate: {:.3f}\n".format(snp_missing_mean))
print(" Max missing rate: {:.3f}\n".format(snp_missing_max))
#np.savetxt("snp_missing.txt",snp_missing_prob, fmt='%.4f')

snp_missing_pick = snp_missing_prob < float(args.miss_snp)
array2 = array1[:,snp_missing_pick]
snp_miss_remove = array1.shape[1] - array2.shape[1]
print("{} snps removed due to missing genotype data\n".format(snp_miss_remove))
del array1
del bool_array1

MAF 过滤

统计每个SNP的最小等位基因频率(MAF),剔除低于阈值的位点。

这里的做法是先将缺失值替换为0,统计每一列的和作为分子,分母为该列非缺失样本数的两倍。这么做的原理是 plink 软件 在生成 raw 文件时是按照最小等位基因计数的(raw 文件中的 0 1 2 表示最小等位基因的数目),因此将缺失值替换为0,再求每列的和,便得到最小等位基因的总数,除以所有等位基因总数,便得到了最小等位基因频率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# maf 过滤,结果文件是 array3

#计算array2每列的缺失值
bool_array2 = array2==3
snp_missing_count2 = bool_array2.sum(axis=0)

#将array2 中的3替换为0,提取备份为 array2_copy
array2_copy = array2.copy()
array2[array2==3] = 0

maf_sum = array2.sum(axis=0)
#print("maf_sum[1:5] are {}".format(maf_sum[1:5]))
maf_denominator = 2*(sample_num-snp_missing_count2)
#print("sample_num are {}".format(sample_num))
#print("maf_denominator[1:5] are {}".format(maf_denominator[1:5]))


maf_prob = maf_sum/maf_denominator
#print("maf_prob[1:5] are {}".format(maf_prob[1:5]))

#maf_out = np.column_stack((maf_sum,maf_denominator,maf_prob))
#np.savetxt("maf_all.txt",maf_out)
#np.savetxt("maf.txt",maf_prob, fmt='%.4f')

maf_min = maf_prob.min()
maf_mean = maf_prob.mean()
maf_max = maf_prob.max()

#打印MAF情况
print("Minor alleles frequencies\n")
print(" Min alleles frequencies: {:.3f}\n".format(maf_min))
print(" AVG alleles frequencies: {:.3f}\n".format(maf_mean))
print(" Max alleles frequencies: {:.3f}\n".format(maf_max))

snp_maf_pick = maf_prob > float(args.maf)
array3 = array2_copy[:,snp_maf_pick]

snp_maf_remove = array2_copy.shape[1] - array3.shape[1]
print("{} snps removed due to minor allele threshold\n".format(snp_maf_remove))
del array2
del array2_copy

样本缺失率过滤

先过滤SNP,再过滤样本,这和 plink 软件的逻辑一致,也符合实际需求。毕竟每个样本都是花了钱的,能不剔除就不剔除。

原理同SNP过滤,只不过列换成了行(每一列表示一个SNP,每一行表示一个样本)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# sample_miss 过滤,结果为 array4

sample_num3 = array3.shape[0]
snp_num3 = array3.shape[1]

bool_array3 = array3==3

sample_missing_count = bool_array3.sum(axis=1)
sample_missing_prob = sample_missing_count/snp_num3

sample_missing_min = sample_missing_prob.min()
sample_missing_mean = sample_missing_prob.mean()
sample_missing_max = sample_missing_prob.max()

#打印sample缺失值情况
print("Sample missing\n")
print(" Min missing rate: {:.3f}\n".format(sample_missing_min))
print(" AVG missing rate: {:.3f}\n".format(sample_missing_mean))
print(" Max missing rate: {:.3f}\n".format(sample_missing_max))

#np.savetxt("sample_missing.txt",sample_missing_prob, fmt='%.4f')

sample_missing_pick = sample_missing_prob < float(args.miss_sample)
array4 = array3[sample_missing_pick,:]

gene_id_array_new = gene_id_array[sample_missing_pick] #样本过滤后的基因个体号

sample_miss_remove = array3.shape[0] - array4.shape[0]

print("{} samples removed due to missing genotype data\n".format(sample_miss_remove))

print("{} samples and {} snps pass QC finally\n".format(array4.shape[0],array4.shape[1]))

如果剔除了样本,将样本和缺失率写入一个文件中

1
2
3
4
5
6
7
8
9
10
11
12
#如果删除了样本,将删除的样本id和缺失率写入一个新文件
if sample_miss_remove > 0 :
filter_sample_file = open('sample_filted.txt','w')
sample_missing_fail = sample_missing_prob >= float(args.miss_sample)
gene_id_fail_array = gene_id_array[sample_missing_fail]
sample_missing_prob_array = sample_missing_prob[sample_missing_fail]
for i in range(gene_id_fail_array.shape[0]):
filter_sample_file.write(str(gene_id_fail_array[i])+'\t'+str("{:.4f}".format(sample_missing_prob_array[i]))+'\n')
filter_sample_file.close()

del array3
del bool_array3

亲子鉴定

首先将基因型数组改为 1 2 3 0 ,对应之前的 0 1 2 3 。改完以后,0 就表示缺失,1 2 3 表示三种基因型。这样改方便之处在于:

  1. 缺失值是0,0乘任何数还是0。
  2. 1,2,3 都是质数,这三个数之间任选两个数进行相乘得到的积是唯一的,比如如果乘积为2,那么只能是1和2相乘,3只能是1和3相乘……

因此,这样我们在比对两个样本的基因型时,只需要将两个个体的基因型进行相乘,如果某个位点乘积为0,表示存在缺失(至少一个个体的基因型为缺失),无法比较;如果某个位点乘积为 3,表示在这个位点上存在孟德尔错误(两个个体为相反纯合子)。这样,我们统计两个个体的缺失位点和孟德尔错误位点非常方便。

1
2
3
4
5
6
7
8
9
#开始进行亲子鉴定
#array4 改为 1 2 3 0(缺失)
#这样两个array 的乘积的array为3的是错误位点,是0的是缺失位点。

array4 += 1
array4[array4==4] = 0

snp_num4 = array4.shape[1]

清洗系谱

这里剔除了第一列个体没有基因型的行,因为个体没有基因型就不可能进行亲子鉴定 (如果提过了 --list 选项,那么只会提取需要亲子鉴定的个体的系谱)。

这里得到个体和亲本均有基因型的亲子对列表:offspring_sire_listoffspring_dam_list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#清洗系谱,剔除第一列个体没有基因型的行

offspring_sire_list = []
offspring_dam_list = []
pedigree_list = []

#判断 args.list 是否存在
id_set = set()
if args.list is None:
for i in pedigree_file:
f = i.split()
id_set.add(f[0])
else:
list_file = open(args.list,'r')
for i in list_file:
f = i.split()
id_set.add(f[0])

test_records=0
sire_pairs=0
dam_pairs=0
pedigree_file.seek(0)
for i in pedigree_file:
f = i.split()
if (f[0] in gene_id_array_new) and (f[0] in id_set):
pedigree_list.append(f[0:3])
test_records+=1
if f[1] in gene_id_array_new:
sire_pairs+=1
offspring_sire_list.append([f[0],f[1]])
if f[2] in gene_id_array_new:
dam_pairs+=1
offspring_dam_list.append([f[0],f[2]])

亲子鉴定

这里的思路很简单,就是上面提到了,计算两个个体基因型的乘积,统计乘积等于3的位点数目,除以位点总数(剔除缺失个体后),便得到了孟德尔错误率,再与给定的阈值进行比对。如果小于阈值,则认为是正确的亲子对,反之则认为错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
check_dick = {}

#np.savetxt("gene_id_array_new.txt",gene_id_array_new, fmt="%s")

#np.where 结果是一个元组,元组里是一个索引构成的数组,如(array([1, 2]),)
conflict_sire_pairs=0
conflict_dam_pairs=0
for i in offspring_sire_list:
first_index = np.where(gene_id_array_new == i[0])
second_index = np.where(gene_id_array_new == i[1])
first_array = array4[first_index]
second_array = array4[second_index]
product_array = first_array * second_array
error_snp_nums = (product_array==3).sum()
miss_snp_nums = (product_array==0).sum()
total_snp_nums = snp_num4 - miss_snp_nums
error_snp_prob = error_snp_nums/total_snp_nums
if error_snp_prob < float(args.check_prob):
match_status = "Match"
else:
match_status = "No-Match"
conflict_sire_pairs+=1
check_dick[tuple(i)] = match_status
check_prob.write('Offspring-Sire'+'\t'+i[0]+'\t'+i[1]+'\t'+str(error_snp_nums)+'\t'+str(total_snp_nums)+'\t'+str(error_snp_prob)+'\t'+match_status+'\n')

for i in offspring_dam_list:
first_index = np.where(gene_id_array_new == i[0])
second_index = np.where(gene_id_array_new == i[1])
first_array = array4[first_index]
second_array = array4[second_index]
product_array = first_array * second_array
error_snp_nums = (product_array==3).sum()
miss_snp_nums = (product_array==0).sum()
total_snp_nums = snp_num4 - miss_snp_nums
error_snp_prob = error_snp_nums/total_snp_nums
if error_snp_prob < float(args.check_prob):
match_status = "Match"
else:
match_status = "No-Match"
conflict_dam_pairs+=1
check_dick[tuple(i)] = match_status
check_prob.write('Offspring-Dam'+'\t'+i[0]+'\t'+i[1]+'\t'+str(error_snp_nums)+'\t'+str(total_snp_nums)+'\t'+str("{:.4f}".format(error_snp_prob))+'\t'+match_status+'\n')

for i in pedigree_list:
offspring_sire = (i[0],i[1])
offspring_dam = (i[0],i[2])
if offspring_sire in check_dick:
sire_info = check_dick[offspring_sire]
else:
sire_info = "par_nogeno"
if offspring_dam in check_dick:
dam_info = check_dick[offspring_dam]
else:
dam_info = "par_nogeno"
check_pedigree.write('\t'.join(i)+'\t'+sire_info+'\t'+dam_info+'\n')


raw_file.close()
chipid_file.close()
pedigree_file.close()
#new_raw.close()
check_prob.close()
check_pedigree.close()

print("Records tested: {}\n".format(test_records))
print(" Pair parent/progeny tested: {}\n".format(sire_pairs+dam_pairs))
print(" Pair with conflicts: {}\n\n".format(conflict_sire_pairs+conflict_dam_pairs))

print("Sire-progeny tested: {}\n".format(sire_pairs))
print("Sire-progeny with conflicts: {}\n\n".format(conflict_sire_pairs))

print("Dam-progeny tested: {}\n".format(dam_pairs))
print("Dam-progeny with conflicts: {}\n\n".format(conflict_dam_pairs))

print("End paternity check\n")

end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("\nFinish time: {}\n".format(end_time))

不足

  1. 质控过程缺少了重复样本检验(基因型相同的样本)

  2. 亲子鉴定阈值需要通过经验给定,如果阈值设定不好,可能无法准确地将正确的亲子对和错误的亲子对分开,可能会造成错误鉴定。

  3. 后续可以对存在错误的个体查找真实亲本,方法就是对该个体遍历所有的候选亲本。

  4. 因为实际生产中可能存在采样错误(采的样本/基因型与标记的个体号不一致),因此本流程找到的错误系谱也有可能是由于采样错误导致的(例如这头个体本身系谱可能没错,但是可能采样采错了,不是采的这个个体,那么亲子鉴定肯定显示为错误)。也就是说,亲子鉴定找的错误实际由采样错误+系谱错误两部分组成。

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2019-2024 Vincere Zhou
  • 访问人数: | 浏览次数:

请我喝杯茶吧~

支付宝
微信