Source code for cellmaps_ppidownloader.gene

import re
import csv
import logging
import mygene
from tqdm import tqdm

from cellmaps_ppidownloader.exceptions import CellMapsPPIDownloaderError

logger = logging.getLogger(__name__)


class GeneQuery(object):
    """
    Gets information about genes from mygene
    """

    def __init__(self, mygeneinfo=None):
        """
        Constructor

        :param mygeneinfo: Object exposing a ``querymany()`` method that
                           accepts ``scopes``, ``fields`` and ``species``
                           keyword arguments. If ``None``, a new
                           ``mygene.MyGeneInfo()`` is created for this
                           instance
        """
        # Create the client here rather than in the signature: a default
        # argument is evaluated once when the class is defined, which
        # would build the client at import time and share that single
        # object between every GeneQuery instance.
        if mygeneinfo is None:
            mygeneinfo = mygene.MyGeneInfo()
        self._mg = mygeneinfo

    def querymany(self, queries, species=None, scopes=None, fields=None):
        """
        Simple wrapper that calls ``querymany()`` on the wrapped MyGene
        client and returns the result unchanged

        :param queries: list of gene ids/symbols to query
        :type queries: list
        :param species: passed through as ``species``
        :type species: str
        :param scopes: passed through as ``scopes``
        :type scopes: str
        :param fields: passed through as ``fields``
        :type fields: list
        :return: whatever the wrapped client returns
        :rtype: list
        """
        return self._mg.querymany(queries,
                                  scopes=scopes,
                                  fields=fields,
                                  species=species)

    def get_symbols_for_genes(self, genelist=None, scopes='_id'):
        """
        Queries for symbols and ensembl gene ids of the genes in
        **genelist**, restricting the search to ``human``

        :param genelist: genes to query for valid symbols and ensembl ids
        :type genelist: list
        :param scopes: field to query on, ``_id`` for gene id,
                       ``ensembl.gene`` for ENSEMBL IDs
        :type scopes: str
        :return: result from mygene which is a list of dict objects
                 where each dict is of format:

                 .. code-block::

                     { 'query': 'ID',
                       '_id': 'ID',
                       '_score': #.##,
                       'ensembl': { 'gene': 'ENSEMBLEID' },
                       'symbol': 'GENESYMBOL' }
        :rtype: list
        """
        return self.querymany(genelist,
                              species='human',
                              scopes=scopes,
                              fields=['ensembl.gene', 'symbol'])
class GeneNodeAttributeGenerator(object):
    """
    Base class for GeneNodeAttribute Generator
    """

    def __init__(self):
        """
        Constructor
        """
        pass

    @staticmethod
    def add_geneids_to_set(gene_set=None,
                           ambiguous_gene_dict=None,
                           geneid=None):
        """
        Examines **geneid** passed in and if a comma exists
        in value split by comma and assume multiple genes.
        Adds those genes into **gene_set** and add entry
        to **ambiguous_gene_dict** with key set to each gene
        name and value set to original **geneid** value

        :param gene_set: unique set of genes, updated in place
        :type gene_set: set
        :param ambiguous_gene_dict: updated in place with one entry per
                                    gene when **geneid** holds more than
                                    one gene; ignored if ``None``
        :type ambiguous_gene_dict: dict
        :param geneid: name of gene or comma delimited string of genes
        :type geneid: str
        :return: genes found in **geneid** or None if **gene_set**
                 or **geneid** is ``None``
        :rtype: list
        """
        if gene_set is None or geneid is None:
            return None

        # Raw string so ``\W`` reaches the regex engine instead of being
        # treated as an (invalid) string escape. Non-word characters on
        # either side of each comma are consumed along with the comma.
        split_str = re.split(r'\W*,\W*', geneid)
        gene_set.update(split_str)

        # Only a value naming several genes is considered ambiguous
        if ambiguous_gene_dict is not None and len(split_str) > 1:
            for entry in split_str:
                ambiguous_gene_dict[entry] = geneid
        return split_str

    def get_gene_node_attributes(self):
        """
        Should be implemented by subclasses

        :raises NotImplementedError: Always
        """
        raise NotImplementedError('Subclasses should implement')
class APMSGeneNodeAttributeGenerator(GeneNodeAttributeGenerator):
    """
    Creates APMS Gene Node Attributes table
    """

    GENEID_COL1 = 'GeneID1'
    GENEID_COL2 = 'GeneID2'
    SYMBOL_COL1 = 'Symbol1'
    SYMBOL_COL2 = 'Symbol2'

    BAITLIST_GENE_SYMBOL = 'GeneSymbol'
    BAITLIST_GENE_ID = 'GeneID'
    BAITLIST_NUM_INTERACTORS = '# Interactors'

    def __init__(self, apms_edgelist=None, apms_baitlist=None,
                 genequery=None):
        """
        Constructor

        :param apms_edgelist: list of dict elements where each
                              dict is of format:

                              .. code-block::

                                  {'GeneID1': VAL,
                                   'Symbol1': VAL,
                                   'GeneID2': VAL,
                                   'Symbol2': VAL}
        :type apms_edgelist: list
        :param apms_baitlist: list of dict elements where each
                              dict is of format:

                              .. code-block::

                                  { 'GeneSymbol': VAL,
                                    'GeneID': VAL,
                                    'NumInteractors': VAL }
        :type apms_baitlist: list
        :param genequery: object used to look up symbols and ensembl ids
                          via ``get_symbols_for_genes()``. If ``None`` a
                          new :py:class:`GeneQuery` is created
        """
        super().__init__()
        self._apms_edgelist = apms_edgelist
        self._apms_baitlist = apms_baitlist
        # Built here instead of in the signature so no query object is
        # created at import time and shared between instances
        if genequery is None:
            genequery = GeneQuery()
        self._genequery = genequery

    @staticmethod
    def get_apms_edgelist_from_tsvfile(tsvfile=None,
                                       geneid_one_col=GENEID_COL1,
                                       symbol_one_col=SYMBOL_COL1,
                                       geneid_two_col=GENEID_COL2,
                                       symbol_two_col=SYMBOL_COL2):
        """
        Generates list of dicts by parsing tab delimited file specified
        by **tsvfile**. The file must have a header row; by default the
        columns read are ``GeneID1``, ``Symbol1``, ``GeneID2`` and
        ``Symbol2``

        :param tsvfile: Path to TSV file with above format
        :type tsvfile: str
        :param geneid_one_col: Name of column holding first gene id
        :type geneid_one_col: str
        :param symbol_one_col: Name of column holding first gene symbol
        :type symbol_one_col: str
        :param geneid_two_col: Name of column holding second gene id
        :type geneid_two_col: str
        :param symbol_two_col: Name of column holding second gene symbol
        :type symbol_two_col: str
        :return: list of dicts, with each dict of format:

                 .. code-block::

                     {'GeneID1': VAL,
                      'Symbol1': VAL,
                      'GeneID2': VAL,
                      'Symbol2': VAL}
        :rtype: list
        """
        edgelist = []
        # newline='' lets the csv module do its own newline handling
        with open(tsvfile, 'r', newline='') as f:
            reader = csv.DictReader(f, delimiter='\t')
            for row in reader:
                # Output keys are fixed regardless of input column names
                edgelist.append({'GeneID1': row[geneid_one_col],
                                 'Symbol1': row[symbol_one_col],
                                 'GeneID2': row[geneid_two_col],
                                 'Symbol2': row[symbol_two_col]})
        return edgelist

    @staticmethod
    def get_apms_baitlist_from_tsvfile(tsvfile=None,
                                       symbol_col=BAITLIST_GENE_SYMBOL,
                                       geneid_col=BAITLIST_GENE_ID,
                                       numinteractors_col=BAITLIST_NUM_INTERACTORS):
        """
        Generates list of dicts by parsing tab delimited file specified
        by **tsvfile**. The file must have a header row; by default the
        columns read are ``GeneSymbol``, ``GeneID`` and ``# Interactors``

        :param tsvfile: Path to TSV file with above format. If ``None``
                        an empty list is returned
        :type tsvfile: str
        :param symbol_col: Name of column holding gene symbol
        :type symbol_col: str
        :param geneid_col: Name of column holding gene id
        :type geneid_col: str
        :param numinteractors_col: Name of column holding number of
                                   interactors
        :type numinteractors_col: str
        :return: list of dicts, with each dict of format:

                 .. code-block::

                     { 'GeneSymbol': VAL,
                       'GeneID': VAL,
                       'NumInteractors': VAL }
        :rtype: list
        """
        baitlist = []
        if tsvfile is None:
            return baitlist
        with open(tsvfile, 'r', newline='') as f:
            reader = csv.DictReader(f, delimiter='\t')
            for row in reader:
                baitlist.append({'GeneSymbol': row[symbol_col],
                                 'GeneID': row[geneid_col],
                                 'NumInteractors': row[numinteractors_col]})
        return baitlist

    def get_apms_edgelist(self):
        """
        Gets apms edgelist passed in via constructor

        :return: apms edgelist exactly as passed to the constructor
        :rtype: list
        """
        return self._apms_edgelist

    def _get_unique_genelist_from_edgelist(self):
        """
        Gets unique list of genes from edge list along with a dict
        for ambiguous genes which have multiple names. For the
        ambiguous genes the dict is of format:

        ``{'GENEID': 'AMBIGUOUS ID aka x,y,z'}``

        :return: (list of genes, dict of ambiguous genes)
        :rtype: tuple
        """
        gene_set = set()
        ambiguous_gene_dict = {}

        for row in self._apms_edgelist:
            for colname in ('GeneID1', 'GeneID2'):
                GeneNodeAttributeGenerator.add_geneids_to_set(
                    gene_set=gene_set,
                    ambiguous_gene_dict=ambiguous_gene_dict,
                    geneid=row[colname])
        return list(gene_set), ambiguous_gene_dict

    def _get_apms_bait_set(self):
        """
        Gets unique set of bait gene ids from the bait list passed in
        via the constructor

        :return: ``GeneID`` values of all baits; empty set if no bait
                 list was given
        :rtype: set
        """
        # The bait list is an optional constructor argument; without it
        # no gene is flagged as a bait
        if self._apms_baitlist is None:
            return set()
        return {entry['GeneID'] for entry in self._apms_baitlist}

    def _process_query_results(self, query_res):
        """
        Processes the results from a gene symbol query, organizing the
        data into mappings between queries, symbols, and Ensembl IDs,
        while capturing errors for any entries that lack necessary
        information.

        Three dictionaries are built:

        - query string => gene name (the query itself is used as name
          if the result has no ``symbol``)
        - gene name => set of queries that resolved to that name
        - gene name => set of associated Ensembl gene IDs

        Entries without an ``ensembl`` field are skipped, and an error
        is logged and recorded for each skipped entry. If the same query
        shows up more than once only the first result is used.

        :param query_res: A list of dictionaries, each representing a
                          query result.
        :type query_res: list
        :return: A tuple containing mappings of query to symbol, symbol
                 to queries, symbol to Ensembl IDs, and a list of errors.
        :rtype: (dict, dict, dict, list)
        """
        errors = []
        query_symbol_dict = {}
        symbol_query_dict = {}
        symbol_ensembl_dict = {}

        for x in query_res:
            query = x['query']
            symbol = x['symbol'] if 'symbol' in x else query

            if 'ensembl' not in x:
                errors.append('Skipping ' + str(x) +
                              ' no ensembl in query result: ' + str(x))
                logger.error(errors[-1])
                continue

            if query in query_symbol_dict:
                continue  # duplicate query, just take first result

            query_symbol_dict[query] = symbol
            symbol_query_dict.setdefault(symbol, set()).add(query)

            # 'ensembl' is a list of dicts when a gene maps to several
            # Ensembl genes and a single dict otherwise. Check the type
            # rather than the length: a one-element list or a dict with
            # extra keys would otherwise take the wrong branch.
            ensembl = x['ensembl']
            ensembl_ids = symbol_ensembl_dict.setdefault(symbol, set())
            if isinstance(ensembl, list):
                ensembl_ids.update(g['gene'] for g in ensembl)
            else:
                ensembl_ids.add(ensembl['gene'])

        return query_symbol_dict, symbol_query_dict, symbol_ensembl_dict, errors

    def _create_gene_node_attributes_dict(self, symbol_query_dict,
                                          symbol_ensembl_dict,
                                          bait_set, ambiguous_gene_dict):
        """
        Compiles gene node attributes into a dictionary keyed by query
        (gene id). Each value holds the gene symbol, a comma delimited
        sorted string of ensembl ids, the ambiguous gene string (empty
        if none) and whether the gene is a bait.

        :param symbol_query_dict: Mapping of gene symbols to their queries.
        :type symbol_query_dict: dict
        :param symbol_ensembl_dict: Mapping of gene symbols to Ensembl IDs.
        :type symbol_ensembl_dict: dict
        :param bait_set: Gene ids of bait proteins
        :type bait_set: set
        :param ambiguous_gene_dict: Mapping of ambiguous genes.
        :type ambiguous_gene_dict: dict
        :return: A dictionary of gene node attributes.
        :rtype: dict
        """
        gene_node_attrs = {}
        for symbol, queries in symbol_query_dict.items():
            # sorted so the output string is deterministic
            ensemble_str = ','.join(sorted(symbol_ensembl_dict[symbol]))
            for query in queries:
                gene_node_attrs[query] = {
                    'name': symbol,
                    'represents': ensemble_str,
                    'ambiguous': ambiguous_gene_dict.get(query, ''),
                    'bait': query in bait_set}
        return gene_node_attrs

    def get_gene_node_attributes(self):
        """
        Gets gene node attributes which is output as a dict in this
        format:

        .. code-block::

            { 'GENEID': { 'name': 'GENESYMBOL',
                          'represents': 'ENSEMBLID1,ENSEMBLID2..',
                          'ambiguous': 'ALTERNATE GENEs',
                          'bait': True or False }
            }

        :return: (dict containing gene node attributes,
                  list of str describing any errors encountered)
        :rtype: tuple
        """
        t = tqdm(total=2, desc='Get updated gene symbols', unit='steps')
        try:
            t.update()
            genelist, ambiguous_gene_dict = self._get_unique_genelist_from_edgelist()

            t.update()
            query_res = self._genequery.get_symbols_for_genes(genelist=genelist)
            bait_set = self._get_apms_bait_set()

            (_, symbol_query_dict,
             symbol_ensembl_dict, errors) = self._process_query_results(query_res)

            gene_node_attrs = self._create_gene_node_attributes_dict(
                symbol_query_dict, symbol_ensembl_dict,
                bait_set, ambiguous_gene_dict)
            return gene_node_attrs, errors
        finally:
            # always release the progress bar, even on failure
            t.close()
class CM4AIGeneNodeAttributeGenerator(GeneNodeAttributeGenerator):
    """
    Creates APMS Gene Node Attributes table from CM4AI data
    """

    def __init__(self, apms_edgelist=None, genequery=None):
        """
        Constructor

        :param apms_edgelist: list of dict elements where each
                              dict is of format:

                              .. code-block::

                                  {'Bait': VAL,
                                   'Prey': VAL,
                                   'logOddsScore': VAL,
                                   'FoldChange.x': VAL,
                                   'BFDR.x': VAL}

                              Only ``Bait`` and ``Prey`` are read by
                              this class
        :type apms_edgelist: list
        :param genequery: object used to look up symbols and ensembl ids
                          via ``get_symbols_for_genes()``. If ``None`` a
                          new :py:class:`GeneQuery` is created
        """
        super().__init__()
        self._raw_apms_edgelist = apms_edgelist
        # Filled in lazily by get_apms_edgelist()
        self._apms_edgelist = None
        # Built here instead of in the signature so no query object is
        # created at import time and shared between instances
        if genequery is None:
            genequery = GeneQuery()
        self._genequery = genequery

    @staticmethod
    def get_apms_edgelist_from_tsvfile(tsvfile=None,
                                       bait_col='Bait',
                                       prey_col='Prey',
                                       bfdr_col=None,
                                       foldchange_col=None,
                                       foldchange_cutoff=0.0,
                                       bfdr_maxcutoff=0.05):
        """
        Generates list of dicts by parsing tab delimited file specified
        by **tsvfile**. The file must have a header row containing at
        least the bait and prey columns and optionally BFDR and
        FoldChange columns

        .. note::

            If the BFDR column does not exist, no BFDR filtering will
            occur. Same goes if the FoldChange column does not exist

        :param tsvfile: Path to TSV file with above format
        :type tsvfile: str
        :param bait_col: Name of bait column
        :type bait_col: str
        :param prey_col: Name of prey column
        :type prey_col: str
        :param bfdr_col: Name of BFDR aka false discovery rate column
                         If ``None`` no BFDR filtering will occur
        :type bfdr_col: str
        :param foldchange_col: Name of FoldChange column
                               If ``None`` no FoldChange filtering
                               will occur
        :type foldchange_col: str
        :param foldchange_cutoff: Foldchange cutoff. Only keep rows with
                                  values greater than this value. If
                                  this value is ``None`` no filtering
                                  will occur
        :type foldchange_cutoff: float
        :param bfdr_maxcutoff: BFDR cutoff. Only keep rows with BFDR
                               less than or equal to this value. If this
                               value is ``None`` no filtering will occur
        :type bfdr_maxcutoff: float
        :raises ValueError: If a value in a filtered column is not a
                            number
        :return: list of dicts, with each dict of format:

                 .. code-block::

                     {'Bait': VAL,
                      'Prey': VAL}
        :rtype: list
        """
        # A filter is active only when both its column name and its
        # cutoff were supplied
        filter_bfdr = bfdr_col is not None and bfdr_maxcutoff is not None
        filter_foldchange = (foldchange_col is not None and
                             foldchange_cutoff is not None)

        edgelist = []
        # newline='' lets the csv module do its own newline handling
        with open(tsvfile, 'r', newline='') as f:
            reader = csv.DictReader(f, delimiter='\t')
            for row in reader:
                # csv yields every value as str, so convert to float
                # before comparing against the numeric cutoffs
                if filter_bfdr and bfdr_col in row \
                        and float(row[bfdr_col]) > bfdr_maxcutoff:
                    continue
                if filter_foldchange and foldchange_col in row \
                        and float(row[foldchange_col]) <= foldchange_cutoff:
                    continue
                edgelist.append({'Bait': row[bait_col],
                                 'Prey': row[prey_col]})
        return edgelist

    def _get_unique_set_from_raw_edgelist(self, colname=None):
        """
        Given a column name **colname** extract unique set of values
        from raw apms edgelist passed in via constructor

        :param colname: key to read from every edge
        :type colname: str
        :return: unique values found under **colname**
        :rtype: set
        """
        return {entry[colname] for entry in self._raw_apms_edgelist}

    @staticmethod
    def _ensembl_to_str(ensembl):
        """
        Converts the ``ensembl`` value of a query result to a string

        :param ensembl: single dict with a ``gene`` key, or list of
                        such dicts
        :type ensembl: dict or list
        :return: ensembl gene id, or ``;`` delimited ensembl gene ids
                 when **ensembl** is a list
        :rtype: str
        """
        if isinstance(ensembl, list):
            return ';'.join(g['gene'] for g in ensembl)
        return ensembl['gene']

    def _query_ensemblsymbolmap(self, colname, scopes):
        """
        Queries for symbols and ensembl gene ids of all unique values
        under **colname** of the raw apms edgelist

        Results lacking an ``ensembl`` field are logged and left out of
        the map. If a result has no ``symbol`` the query string is used
        in its place.

        :param colname: column of raw apms edgelist to look up
        :type colname: str
        :param scopes: scope passed to the gene query
        :type scopes: str
        :return: original name mapped to tuple
                 (id, symbol, ensembl gene id(s))
        :rtype: dict
        """
        names = self._get_unique_set_from_raw_edgelist(colname)
        res = self._genequery.get_symbols_for_genes(list(names),
                                                    scopes=scopes)
        name_to_id = {}
        for entry in res:
            if 'ensembl' not in entry:
                logger.error(str(entry) + ' no ensembl found')
                continue
            name_to_id[entry['query']] = (
                entry['_id'],
                entry.get('symbol', entry['query']),
                self._ensembl_to_str(entry['ensembl']))
        return name_to_id

    def _get_baits_to_ensemblsymbolmap(self):
        """
        Get unique set of bait names from raw apms edgelist and
        query by ``symbol`` to get symbols and ensembl gene ids

        :return: original bait name mapped to tuple
                 (id, symbol, ensembl gene id(s))
        :rtype: dict
        """
        return self._query_ensemblsymbolmap('Bait', 'symbol')

    def _get_prey_to_ensemblsymbolmap(self):
        """
        Get unique set of prey names from raw apms edgelist and
        query by ``uniprot`` to get symbols and ensembl gene ids

        :return: original prey name mapped to tuple
                 (id, symbol, ensembl gene id(s))
        :rtype: dict
        """
        return self._query_ensemblsymbolmap('Prey', 'uniprot')

    def get_apms_edgelist(self):
        """
        Gets apms edgelist, building and caching it on first call by
        mapping every bait and prey of the raw apms edgelist to its
        gene id, symbol and ensembl id(s). Edges whose bait or prey
        could not be mapped are skipped with a warning

        :return: list of dicts, with each dict of format:

                 .. code-block::

                     {'GeneID1': VAL, 'Symbol1': VAL, 'Ensembl1': VAL,
                      'GeneID2': VAL, 'Symbol2': VAL, 'Ensembl2': VAL}

                 where the ``1`` keys describe the bait and the ``2``
                 keys the prey
        :rtype: list
        """
        if self._apms_edgelist is not None:
            return self._apms_edgelist

        # we need to generate this list
        baits_to_idmap = self._get_baits_to_ensemblsymbolmap()
        prey_to_idmap = self._get_prey_to_ensemblsymbolmap()

        edgelist = []
        for row in self._raw_apms_edgelist:
            if row['Bait'] not in baits_to_idmap:
                logger.warning('Bait ' + str(row['Bait']) +
                               ' not in map. Skipping')
                continue
            if row['Prey'] not in prey_to_idmap:
                logger.warning('Prey ' + str(row['Prey']) +
                               ' not in map. Skipping')
                continue
            bait_tuple = baits_to_idmap[row['Bait']]
            prey_tuple = prey_to_idmap[row['Prey']]
            edgelist.append({'GeneID1': bait_tuple[0],
                             'Symbol1': bait_tuple[1],
                             'Ensembl1': bait_tuple[2],
                             'GeneID2': prey_tuple[0],
                             'Symbol2': prey_tuple[1],
                             'Ensembl2': prey_tuple[2]})
        self._apms_edgelist = edgelist
        return self._apms_edgelist

    def _get_apms_bait_set(self):
        """
        Gets unique set of baits

        This class is not given a separate bait list, so the baits are
        taken from the ``GeneID1`` column of the apms edgelist, the
        same column :py:meth:`get_gene_node_attributes` flags as bait

        :return: gene ids of all baits
        :rtype: set
        """
        return {edge['GeneID1'] for edge in self.get_apms_edgelist()}

    def get_gene_node_attributes(self):
        """
        Gets gene node attributes which is output as a dict in this
        format:

        .. code-block::

            { 'GENEID': { 'name': 'GENESYMBOL',
                          'represents': 'ensembl:ENSEMBLID1;ENSEMBLID2..',
                          'ambiguous': '',
                          'bait': True or False }
            }

        :return: (dict containing gene node attributes,
                  list of str describing any errors encountered,
                  currently always empty)
        :rtype: tuple
        """
        edgelist = self.get_apms_edgelist()

        errors = []
        gene_node_attrs = {}
        # Baits (column suffix 1) are handled before prey (suffix 2) and
        # the first entry for a gene id wins, so a gene seen as both
        # bait and prey is reported as a bait
        for suffix, is_bait in (('1', True), ('2', False)):
            for edge in edgelist:
                geneid = edge['GeneID' + suffix]
                if geneid in gene_node_attrs:
                    continue
                gene_node_attrs[geneid] = {
                    'name': edge['Symbol' + suffix],
                    'represents': 'ensembl:' + edge['Ensembl' + suffix],
                    'ambiguous': '',
                    'bait': is_bait}

        return gene_node_attrs, errors