使用Python解决序列重叠问题

 更新时间:2024年03月29日 11:22:05   作者:终是蝶衣梦晓楼  
这篇文章主要为大家详细介绍了如何使用Python解决序列重叠问题,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

tblastn比对出来候选HSP区段,我们需要根据一定的基因长度范围来进行区域延伸去重叠,然后进行下一步操作。对HSP区域的延伸要考虑基因的长度以及目标基因组scafflod or chromosome长度,不是一件容易的事情。

这里采用了dataclass以及改写slots存储数据方式,减少内存占用以及加快读取速度,attrgetter对列表字典结构排序,biopython里的SeqIO.parse感觉挺慢的,不过懒得重写了,用了据说更快的SimpleFastaParser来解析,实际测试下来速度确实更快,主要是用来存储scaffold或chromosome长度,以防延伸超出边界。

去重叠的原理在于先排序,然后判断前一区间的末尾是否小于后一区间开始,若为假则重叠,根据长度/得分来判断删除前一区间还是后一区间。
以下运行得到的结果仍然是blast的tabular格式(之后可以经过一些简单的shell命令处理,可转成bed格式,结合bedtools批量提取序列)。

注:如果你需要去重的格式不为blast tabular,简单的利用一些工具如awk/sed/perl/python/shell各种改变格式就好,只需要第二列的id,第九列的序列起始,第十列的序列结束,第十二列的得分有意义,作为排序用到的字段,其余字段都可缺省

from dataclasses import dataclass, replace
from operator import attrgetter
from Bio.SeqIO.FastaIO import SimpleFastaParser
import getopt
import sys

"""
用法:
#根据基因组文件,延伸上下游区域
example 1. region_tools.py -u <int> -d <int> -i <blast_tabular> -o extend.txt [-g <genome_file>]
#去除重叠区域,保留最长,并用-t指定筛去小于某长度的结果
example 2. region_tools.py -f -i extend.txt -t <region_length> -o filter.txt
#延伸+去重+根据得分保留
example 3. region_tools.py -u <int> -d <int> -f -s -i <blast_tabular> [-g <genome_file>] -o output.txt
"""

@dataclass
class Rec:
    __slots__ = ("q_id", "s_id", "identity", "alignment_length", "mismatches",
                 "gap_openings", "q_start", "q_end", "s_start", "s_end",
                 "e_value", "bit_score")
    q_id: str
    s_id: str
    identity: str
    alignment_length: str
    mismatches: str
    gap_openings: str
    q_start: str
    q_end: str
    s_start: int
    s_end: int
    e_value: str
    bit_score: float


def filter_overlap(sort_data, score=False):
    i = 0
    if sort_data:
        if sort_data[0].s_start <= sort_data[0].s_end:
            start = "s_start"
            end = "s_end"
        else:
            start = "s_end"
            end = "s_start"
        while i <= len(sort_data) - 2:
            if getattr(sort_data[i+1], start) < getattr(sort_data[i], end) and \
                    sort_data[i+1].s_id == sort_data[i].s_id:
                if not score:
                    if abs(sort_data[i+1].s_start - sort_data[i+1].s_end) > \
                            abs(sort_data[i].s_start - sort_data[i].s_end):
                        del sort_data[i]
                    else:
                        del sort_data[i+1]
                else:
                    if sort_data[i+1].bit_score > sort_data[i].bit_score:
                        del sort_data[i]
                    else:
                        del sort_data[i+1]
            else:
                i += 1


class RegionIO:
    def __init__(self, file_path):
        self.file_path = file_path
        self.sort_rf = []
        self.sort_rv = []
        self.recs_forward = []
        self.recs_reverse = []
        with open(self.file_path, "r") as f:
            for rec in f:
                rec = rec.strip().split("\t")
                bit_score = float(rec[11])
                rec_dict = Rec(
                    *rec[0:8],
                    int(rec[8]),
                    int(rec[9]),
                    rec[10],
                    int(bit_score) if bit_score == int(bit_score) else bit_score)
                if rec_dict.s_start <= rec_dict.s_end:
                    self.recs_forward.append(rec_dict)
                else:
                    self.recs_reverse.append(rec_dict)

    def extend_region(self, up=0, down=0, genome_path=""):
        if genome_path == "":
            for n in range(len(self.recs_forward)):
                start = self.recs_forward[n].s_start - up
                end = self.recs_forward[n].s_end + down
                self.recs_forward[n] = \
                    replace(self.recs_forward[n], s_start=start) \
                    if start >= 1 \
                        else replace(self.recs_forward[n], s_start=1)
                self.recs_forward[n] = \
                    replace(self.recs_forward[n], s_end=end) \
                    if start >= 1 \
                        else replace(self.recs_forward[n],
                                     s_end=abs(start)+end+1)
            for n in range(len(self.recs_reverse)):
                start = self.recs_reverse[n].s_end - up
                end = self.recs_reverse[n].s_start + down
                self.recs_reverse[n] = \
                    replace(self.recs_reverse[n], s_end=start) \
                        if start >= 1 \
                        else replace(self.recs_reverse[n], s_end=1)
                self.recs_reverse[n] = \
                    replace(self.recs_reverse[n], s_start=end) \
                        if start >= 1 \
                        else replace(self.recs_reverse[n],
                                     s_start=abs(start)+end+1)
        else:
            id_len = {}
            with open(genome_path,"r") as in_handle:
                for title, seq in SimpleFastaParser(in_handle):
                    id_len[title.split(" ")[0]] = len(seq)
            for n in range(len(self.recs_forward)):
                start = self.recs_forward[n].s_start - up
                end = self.recs_forward[n].s_end + down
                self.recs_forward[n] = \
                    replace(self.recs_forward[n], s_start=start) \
                        if start >= 1 \
                        else replace(self.recs_forward[n], s_start=1)
                if end <= id_len[self.recs_forward[n].s_id]:
                    self.recs_forward[n] = \
                        replace(self.recs_forward[n], s_end=end) \
                            if start >= 1 \
                            else replace(self.recs_forward[n],
                                         s_end=abs(start) + end + 1 if
                                         abs(start) + end + 1 <=
                                         id_len[self.recs_forward[n].s_id] else
                                         id_len[self.recs_forward[n].s_id])
                else:
                    self.recs_forward[n] = \
                        replace(self.recs_forward[n],
                                s_end=id_len[self.recs_forward[n].s_id])
                    self.recs_forward[n] = \
                        replace(self.recs_forward[n],
                                s_start=self.recs_forward[n].s_start
                                             -(end-id_len[self.recs_forward[n].s_id])
                            if self.recs_forward[n].s_start
                                -(end-id_len[self.recs_forward[n].s_id]) >= 1 else 1)
            for n in range(len(self.recs_reverse)):
                start = self.recs_reverse[n].s_end - up
                end = self.recs_reverse[n].s_start + down
                self.recs_reverse[n] = \
                    replace(self.recs_reverse[n], s_end=start) \
                        if start >= 1 \
                        else replace(self.recs_reverse[n], s_end=1)
                if end <= id_len[self.recs_reverse[n].s_id]:
                    self.recs_reverse[n] = \
                        replace(self.recs_reverse[n], s_start=end) \
                            if start >= 1 \
                            else replace(self.recs_reverse[n],
                                         s_start=abs(start) + end + 1 if
                                         abs(start) + end + 1 <=
                                         id_len[self.recs_forward[n].s_id] else
                                         id_len[self.recs_forward[n].s_id])
                else:
                    self.recs_reverse[n] = \
                        replace(self.recs_reverse[n],
                                s_start=id_len[self.recs_reverse[n].s_id])
                    self.recs_reverse[n] = \
                        replace(self.recs_reverse[n],
                                s_end=self.recs_reverse[n].s_end
                                           -(end - id_len[self.recs_reverse[n].s_id])
                            if (self.recs_reverse[n].s_end
                                -(end - id_len[self.recs_reverse[n].s_id])) >= 1 else 1)

    def region_sort(self):
        self.sort_rf = sorted(self.recs_forward,
                              key=attrgetter("s_id", "s_start", "s_end"))
        self.sort_rv = sorted(self.recs_reverse,
                              key=attrgetter("s_id", "s_end", "s_start"))

    def filter_threshold(self, threshold):
        if int(threshold) > 0:
            self.sort_rf = [record for record in self.sort_rf
                         if abs(record.s_start-record.s_end)+1 >= threshold]
            self.sort_rv = [record for record in self.sort_rv
                         if abs(record.s_start-record.s_end)+1 >= threshold]

    def write(self, output_path):
        with open(output_path, "w") as o:
            for rec in self.sort_rf+self.sort_rv:
                o.write(f"{rec.q_id}\t{rec.s_id}"
                        f"\t{rec.identity}\t{rec.alignment_length}"
                        f"\t{rec.mismatches}\t{rec.gap_openings}"
                        f"\t{rec.q_start}\t{rec.q_end}"
                        f"\t{rec.s_start}\t{rec.s_end}"
                        f"\t{rec.e_value}\t{rec.bit_score}\n")


class StepError(Exception):
    def __init__(self, error):
        self.error = error


if __name__ == "__main__":
    path, up, down, filt, genome, output, threshold, score = "", 0, 0, False, "", "", 0, False
    try:
        opts, args = getopt.getopt(sys.argv[1:], "-i:-g:-o:-u:-d:-f-t:-s")
        for opt_name, opt_value in opts:
            if opt_name == "-i":
                path = opt_value
            if opt_name == "-u":
                up = int(opt_value)
            if opt_name == "-d":
                down = int(opt_value)
            if opt_name == "-f":
                filt = True
            if opt_name == "-t":
                threshold = int(opt_value)
            if opt_name == "-g":
                genome = opt_value
            if opt_name == "-o":
                output = opt_value
            if opt_name == "-s":
                score = True
    except getopt.GetoptError as e:
        for error in e.args:
            print("".join(error))
    results = RegionIO(path)
    if up or down:
        if genome:
            results.extend_region(up=up, down=down, genome_path=genome)
        else:
            results.extend_region(up=up, down=down)
    results.region_sort()
    if filt:
        filter_overlap(results.sort_rf, score)
        filter_overlap(results.sort_rv, score)
    if threshold > 0:
        results.filter_threshold(threshold)
    if output:
        results.write(output)
    else:
        results.write(path+"_region_tools")

到此这篇关于使用Python解决序列重叠问题的文章就介绍到这了,更多相关Python序列重叠内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用Python webdriver图书馆抢座自动预约的正确方法

    使用Python webdriver图书馆抢座自动预约的正确方法

    这篇文章主要介绍了使用Python webdriver图书馆抢座自动预约的正确方法,本文通过图文实例相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • PyCharm出现卡顿问题的解决

    PyCharm出现卡顿问题的解决

    这篇文章主要介绍了PyCharm出现卡顿问题的解决方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • Python数据结构之栈、队列及二叉树定义与用法浅析

    Python数据结构之栈、队列及二叉树定义与用法浅析

    这篇文章主要介绍了Python数据结构之栈、队列及二叉树定义与用法,结合具体实例形式分析了Python数据结构中栈、队列及二叉树的定义与使用相关操作技巧,需要的朋友可以参考下
    2018-12-12
  • python使用requests模块实现爬取电影天堂最新电影信息

    python使用requests模块实现爬取电影天堂最新电影信息

    这篇文章主要介绍了python使用requests模块实现爬取电影天堂最新电影信息,本文通过实例代码给大家介绍了str/list/tuple三者之间怎么相互转换,需要的朋友可以参考下
    2019-04-04
  • 如何通过Python实现RabbitMQ延迟队列

    如何通过Python实现RabbitMQ延迟队列

    这篇文章主要介绍了如何通过Python实现RabbitMQ延迟队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • Python机器学习之PCA降维算法详解

    Python机器学习之PCA降维算法详解

    今天带大家复习python机器学习的知识点,文中对PCA降维算法介绍的非常详细,对正在学习python机器学习的小伙伴们有很好地帮助,需要的朋友可以参考下
    2021-05-05
  • 对python append 与浅拷贝的实例讲解

    对python append 与浅拷贝的实例讲解

    今天小编就为大家分享一篇对python append 与浅拷贝的实例讲解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-05-05
  • baselines示例程序train_cartpole.py的ImportError

    baselines示例程序train_cartpole.py的ImportError

    这篇文章主要为大家介绍了baselines示例程序train_cartpole.py的ImportError引入错误详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • 一文带你掌握Python中的双下划线写法

    一文带你掌握Python中的双下划线写法

    在 Python 中,双下划线--也被称为“dunder”--是一种用于修饰类属性名称或类方法名称的行为,下面小编就来和大家详细讲讲如何在Python中使用双下划线吧
    2023-10-10
  • PyCharm实现本地恢复或查看历史代码

    PyCharm实现本地恢复或查看历史代码

    这篇文章主要介绍了PyCharm实现本地恢复或查看历史代码,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02

最新评论