import logging
import subprocess
from pathlib import Path
from typing import Union
import pandas as pd
# BioPython
from Bio import SeqIO, pairwise2
from Bio.Blast import NCBIWWW, NCBIXML
from pydantic import BaseModel, Field
[docs]
def parse_blast(
results_file: Union[str, Path],
e_val_thresh: float,
user_email: str,
verbose: bool = False,
) -> pd.DataFrame:
"""Parse data from BLAST xml file
Parameters
----------
results_file : str
Path to BLAST results
e_val_thresh : float
Threshold to filter BLAST results
user_email : str
Email to use for the Entrez query
verbose : bool, optional
Whether to print information, by default False
Returns
-------
pd.DataFrame
DataFrame with BLAST entries
"""
# Return DataFrame with BLAST results
dfs = []
for record in NCBIXML.parse(open(results_file)):
if record.alignments:
query = record.query[:100]
if verbose:
logging.debug("\n")
logging.debug("query: %s" % query)
for align in record.alignments:
for hsp in align.hsps:
if hsp.expect < e_val_thresh:
# Print sequence identity, title, and gapless sequence substring that aligns
hsps0 = align.hsps[0]
sequence_to_model = hsps0.sbjct.replace("-", "")
pidentity = round(
100.0
* hsps0.identities
/ (hsps0.query_end - hsps0.query_start + 1),
2,
)
title = align.title
id = "".join(title.split(" ")[0])
description = " ".join(title.split(" ")[1:])
virus_host = ""
virus_organism = ""
if len(user_email) > 0:
virus_host, virus_organism = search_host(
align.hit_id.split("|")[1], user_email
)
if verbose:
logging.debug(
f"Record {virus_organism} of length {hsps0.identities} infecting host {virus_host}, score {pidentity}: {align.title}"
)
data = {
"query": [query],
"ID": [id],
"description": [description],
"sequence": [sequence_to_model],
"host": [virus_host],
"organism": [virus_organism],
"score": [pidentity],
}
df_row = pd.DataFrame(data)
dfs.append(df_row)
df = pd.concat(dfs, axis=0, ignore_index=True, sort=False).dropna(
axis=1, how="all"
)
df.sort_values(by=["score"], ascending=False)
return df
[docs]
def get_blast_seqs(
seq_source: str,
save_folder: Path,
input_type="fasta",
nhits=100,
nalign=500,
e_val_thresh=1e-20,
database="refseq_protein",
xml_file="results.xml",
verbose=True,
save_csv=None,
email="",
pdb_file=None,
) -> pd.DataFrame:
"""Run a BLAST search on a protein sequence.
Parameters
----------
seq_source : str
Source with the sequence.
save_folder : Path
Path to folder to save BLAST results
input_type : str, optional
Type of sequence source ["pre-cal", "fasta", "sequence"], by default "fasta"
nhits : int, optional
Number of hits, hitlist_size parameter in BLAST, by default 100
nalign : int, optional
Number of alignments, alignments parameter in BLAST, by default 500
e_val_thresh : float, optional
Threshold to filter BLAST results, by default 1e-20
database : str, optional
Name of BLAST database, by default "refseq_protein"
xml_file : str, optional
Name to be given to XML with BLAST results, by default "results.xml"
verbose : bool, optional
Whether to print info on BLAST search, by default True
save_csv : Union[str, None], optional
CSV file name to optionally save dataframe, by default None
email : str, optional
Email to use for the Entrez query, by default ""
pdb_file: str, optional
Path to PDB file used to calculate pocket similarity score
Returns
-------
pd.DataFrame
DataFrame with Blast results.
"""
if input_type == "pre-calc":
matches_df = parse_blast(seq_source, e_val_thresh, email, verbose)
else:
if input_type == "fasta":
# Input is file name
sequence = open(seq_source).read()
elif input_type == "sequence":
# Input is sequence
sequence = seq_source
elif input_type == "pdb":
# Input is a PDB file
seq_source = Path(seq_source)
seq, fasta_out = pdb_to_seq(
seq_source, fasta_out=f"{seq_source.stem}.fasta"
)
sequence = open(fasta_out).read()
logging.info(
f"Sequence was extracted from PDB file and saved as {fasta_out}"
)
else: # Another source?
raise ValueError("unknown input type")
logging.info(
f"BLAST search with {nalign} alignments, expect {e_val_thresh}, {nhits} hitlist_size and {nhits} descriptions"
)
# Retrieve blastp results
result_handle = NCBIWWW.qblast(
"blastp",
database,
sequence,
hitlist_size=nhits,
alignments=nalign,
expect=e_val_thresh,
descriptions=nalign,
)
save_file = save_folder / xml_file
with open(save_file, "w") as file:
blast_results = result_handle.read()
file.write(blast_results)
matches_df = parse_blast(save_file, e_val_thresh, email, verbose)
# Add Binding site similarity score if pdb file was provided
if input_type == "pdb" and not pdb_file:
pdb_file = seq_source
if pdb_file:
logging.info("Calculating (approximated) binding pocket similarity score")
matches_df = bsite_similarity(
matches_df, pdb_file, ref_chain="A", lig_resname="LIG"
)
if save_csv:
matches_df.to_csv(save_folder / save_csv, index=False)
return matches_df
[docs]
class PDBRecord(BaseModel):
label: str = Field(description="RefID label of the Blast PDB record")
description: str = Field(description="Description of the Blast record")
seq: str = Field(description="Sequence")
chain: int = Field(description="Chain identifier")
pdb_id: str = Field("", description="The pdb ID of entry")
pdb_file: str = Field("", description="The PDB file Path that was saved")
[docs]
class PDBEntry(BaseModel):
seq: str = Field(description="Input with the protein sequence")
type: str = Field(description="Type of input")
[docs]
def retrieve_pdb(
self,
results_folder: Path,
min_id_match=99,
ref_only=False,
): # ->list[PDBRecord]:
"""Retrieve the PDB record of a given sequence.
Parameters
----------
results_folder : Path
Path store results
min_id_match : int, optional
Minimum match score (in %) to use as criteria to filter PDB blast entries, by default 99
ref_only : bool, optional
Whether to save the PDBs of only the reference seq or for all BLAST selections, by default False
Returns
-------
_type_
_description_
Raises
------
ValueError
_description_
"""
import prody
record_name = "pdb_blast.xml"
# Find pdb matching given sequence
matches_df = get_blast_seqs(
self.seq,
results_folder,
input_type=self.type,
xml_file=record_name,
nalign=50,
database="pdb",
verbose=False,
)
logging.info(f"Saving blast results in {results_folder / record_name}")
# Load original sequence names and descriptors for reference
pdb_file_record = []
for seq_record in SeqIO.parse(self.seq, self.type):
seq_id = seq_record.id
seq_description = seq_record.description
seq_chain = int(seq_record.id.split("|")[1].split(".")[-1])
pdb_file_record.append(
PDBRecord(
label=seq_id,
description=seq_description,
seq=str(seq_record.seq),
chain=seq_chain,
)
)
best_scores = []
for query in matches_df["query"].unique():
best_scores.append(
matches_df.loc[matches_df["query"] == query, "score"].max()
)
match_hits = self.parse_pdb_blast_results(matches_df, min_score=min_id_match)
for i, hits in enumerate(match_hits):
if len(hits) == 0:
logging.warning("The record doesn't have a PDB with high confidence")
if i == 0:
raise ValueError(
"The reference sequence MUST have an existing PDB entry. Maybe check the sequence?"
)
else:
# Return the pdb entry with the max alignment and max resolution (if there are multiple matches)
best_pdb_record = self.choose_best_pdb_entry(hits, best_scores[i])
# pdbl = PDBList()
# filename = pdbl.retrieve_pdb_file(best_pdb_record.lower(), file_format="pdb", pdir=results_folder)
filename = prody.fetchPDB(best_pdb_record, folder=str(results_folder))
subprocess.run(["gunzip", filename])
pdb_file_record[i].pdb_file = filename
pdb_file_record[i].pdb_id = best_pdb_record
# If I'm only requesting the reference PDB, there's no need to generate more files.
if ref_only:
break
return pdb_file_record
[docs]
@staticmethod
def parse_pdb_blast_results(blast_df: pd.DataFrame, min_score: int) -> list[dict]:
"""For a pdb database search extract pdb ids with a minimum score.
Outputs are lists because iterables are needed for functions down the pipeline.
Parameters
----------
blast_df : pd.DataFrame
DataFrame with BLAST results
min_score : int
The minimum match score (in %) that a BLAST entry must have w.r.t the reference
Returns
-------
list[dict]
Blast hits for each query, where data on each hit can be accessed through its PDB ID.
"""
match_hits = []
for q in blast_df["query"].unique(): # Loop over queries
hits = {}
# for e in range(len(blast_ids)): # Loop over BLAST entries
for idx, e in blast_df.loc[blast_df["query"] == q].iterrows():
if e["score"] >= min_score:
raw_id = e["ID"].split("|")
pdb_index = raw_id.index("pdb")
pdb_id = raw_id[pdb_index + 1]
hits[pdb_id] = {}
hits[pdb_id]["percent_identity"] = e["score"]
hits[pdb_id]["sequence"] = e["sequence"]
match_hits.append(hits)
return match_hits
@staticmethod
def request_rcsb_pdbid(pdb_id):
import requests
"""A function to request a protein entry from the rcsb API with a pdb ID"""
requestURL = f"https://data.rcsb.org/rest/v1/core/entry/{pdb_id}"
r = requests.get(requestURL, headers={"Accept": "application/json"})
if not r.ok:
r.raise_for_status()
return r.json()
@classmethod
def check_resolution(self, pdb_id):
fjson = self.request_rcsb_pdbid(pdb_id)
return fjson["rcsb_entry_info"]["resolution_combined"][0]
[docs]
@classmethod
def choose_best_pdb_entry(self, hits, best_match_percent):
"""If a sequence have different PDB files all with the same alignment match, we choose and retrieve the one with the highest resolution"""
max_res = 100 # some unrealistically large number?
best_pdb_record = ""
for h in hits:
id_match = hits[h]["percent_identity"]
if id_match == best_match_percent:
res = self.check_resolution(h)
if res < max_res:
best_pdb_record = h
max_res = res
else: # We're only interested in the entries with the max possible alignment
break
logging.debug(
f"The best PDB entry is {best_pdb_record}, with match {best_match_percent}% and res {max_res}A"
)
return best_pdb_record
[docs]
def pdb_to_seq(pdb_input=Path, chain="A", fasta_out=None):
"""Given a PDB file, extract the protein sequence
Parameters
----------
pdb_input : Path, optional
Path to the input pdb file, by default Path
chaint : str, optional
Chain that will be extracted from the PDB, by default "A"
fasta_out : str, optional
Path to optionally save the output fasta sequence, by default None
"""
from Bio import SeqIO
from Bio.PDB import PDBParser
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqUtils import seq1
# Extract sequence from PDB
pdbparser = PDBParser()
pdb_id = 00000
structure = pdbparser.get_structure(pdb_id, pdb_input)
chains = {
chain.id: seq1("".join(residue.resname for residue in chain))
for chain in structure.get_chains()
}
sequence = chains[chain]
# Save sequence as a SeqRecord
parts = pdb_input.stem.split("_")
entry_name = parts[0]
description = "No description"
if len(parts) > 1:
description = "_".join(parts[1:])
biop_sequence = Seq(sequence)
seq_record = SeqRecord(biop_sequence, id=entry_name, description=description)
# Save as fasta file
if fasta_out is None:
return seq_record
else:
with open(fasta_out, "w") as output_handle:
SeqIO.write(seq_record, output_handle, "fasta")
return seq_record, fasta_out
[docs]
def search_host(hit_id, user_email):
from Bio import Entrez
Entrez.email = user_email
handle = Entrez.efetch(db="protein", id=hit_id, retmode="xml", rettype="gb")
record = Entrez.read(handle)[0]
handle.close()
# The Entrez output is a very illogical list of dictionaries so we have to search through it for the host
# It works. Don't ask.
search_key = "host"
host_value_key = "GBQualifier_value"
host = "Not found"
for d1 in record["GBSeq_feature-table"]:
for d2 in d1["GBFeature_quals"]:
if d2["GBQualifier_name"] == search_key:
host = d2[host_value_key]
break # Stop if the key is found
# Search Organism name
search_key = "organism"
organism = "Not found"
for d1 in record["GBSeq_feature-table"]:
for d2 in d1["GBFeature_quals"]:
if d2["GBQualifier_name"] == search_key:
organism = d2[host_value_key]
break
return host, organism
[docs]
def get_bsite(
pdb_ref: Union[str, Path],
chain_ref: str = "A",
lig_ref: str = "LIG",
bs_dist: Union[float, int] = 4.5,
):
import MDAnalysis as mda
from MDAnalysis.lib.util import convert_aa_code
u_ref = mda.Universe(pdb_ref)
res_ref = u_ref.select_atoms(
f"protein and chainid {chain_ref} and not name H* and around {bs_dist} resname {lig_ref} and not resname ACE and not resname NME and not resname NMA"
).residues
bs_ref = []
bs_ref_num = []
ref_len = len(res_ref)
for i in range(ref_len):
ref = convert_aa_code(res_ref[i].resname) if i < ref_len else "-"
ref_n = res_ref[i].resid if i < ref_len else "-"
bs_ref.append(ref)
bs_ref_num.append(ref_n)
return bs_ref, bs_ref_num
[docs]
def bsite_similarity(
df: pd.DataFrame, pdb_file: str, ref_chain: str = "A", lig_resname: str = "LIG"
) -> pd.DataFrame:
"""Calculate similarity score for the residues in the binding pocket
Parameters
----------
df : pandas.DataFrame
DataFrame generated from Blast search
pdb_file : str
PDB given as a reference that will be used to dtermine the binding site
ref_chain : str, optional
Identifier of chain that contains/interacts with the ligand , by default "A"
lig_resname : str, optional
Identifier for ligand, by default 'LIG'
"""
import numpy as np
df_new = df.copy()
bs_ref, bs_ref_num = get_bsite(pdb_file, ref_chain, lig_resname)
seq_ref = "".join(bs_ref)
df_new["bsite_score"] = None
df_new["bsite_sequence"] = None
for i, row in df_new.iterrows():
# Find matching residues for each alignment
seq = row["sequence"]
alignment_pw = pairwise2.align.globalms(seq, seq_ref, 1, -1, -1, -0.5)[0]
formatted_alignment = pairwise2.format_alignment(*alignment_pw)
seq_ref_aligned, _, _, _, _ = alignment_pw
alignment_lines = formatted_alignment.split("\n")
alignment_symbols = alignment_lines[1]
matched_ref = [
(i, res)
for i, (res, sym) in enumerate(zip(seq_ref_aligned, alignment_symbols))
if sym == "|" and res != "-"
]
try:
matches_mob = []
matches_mob_res = []
for j, res in matched_ref:
matches_mob_res.append(str(seq)[j])
r = min(bs_ref_num, key=lambda x: abs(j - x))
matches_mob.append(str(seq)[j] == res if abs(j - r) < 10 else False)
total_match = np.count_nonzero(matches_mob)
total_res = len(matched_ref)
match_score = round(100 * total_match / total_res, 2)
df_new.loc[i, "bsite_score"] = match_score
df_new.loc[i, "bsite_sequence"] = "".join(matches_mob_res)
except IndexError:
logging.debug(f"The binding site search was not succesful for {row['ID']}")
return df_new