Source code for omadb.OMARestAPI

'''
    PyOMADB -- client to the OMA browser, using the REST API.

    (C) 2018 Alex Warwick Vesztrocy <alex@warwickvesztrocy.co.uk>

    This file is part of PyOMADB.

    PyOMADB is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PyOMADB is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with PyOMADB.  If not, see <http://www.gnu.org/licenses/>.
'''
from collections import defaultdict, deque
from functools import lru_cache
from io import StringIO
from pprint import pformat
from property_manager import lazy_property
from requests_cache import CachedSession
from tempfile import NamedTemporaryFile, TemporaryDirectory
from urllib.request import quote as uri_quote
from tqdm import tqdm
import appdirs
import itertools
import json
import logging
import numbers
import os
import pandas as pd
import random
import requests
import shutil
import warnings


logging.basicConfig(format="%(levelname)s: %(message)s")
LOG = logging.getLogger(__name__)


def set_log_level(x):
    LOG.setLevel(getattr(logging, x.upper()))


[docs]class AttrDict(object): def __init__(self, d): self.__dictionary__ = d self.get = self.__dictionary__.get self.items = self.__dictionary__.items self.keys = self.__dictionary__.keys self.pop = self.__dictionary__.pop self.values = self.__dictionary__.values self._setup() def _setup(self): for (k, v) in self.__dictionary__.items(): if isinstance(v, dict): self.__dictionary__[k] = self._recurse_create(v) elif isinstance(v, list): self.__dictionary__[k] = [(x if not isinstance(x, dict) else self._recurse_create(x)) for x in v] if hasattr(self, '_setup_extra'): self._setup_extra(k, v) def __len__(self): return len(self.__dictionary__) def __getitem__(self, i): return self.__dictionary__[i] def __setitem__(self, i, d): self.__dictionary__[i] = d def __dir__(self): return self.__dictionary__.keys() def __getattr__(self, attr): try: return self.__dictionary__[attr] except KeyError: return self.__getattribute__(attr) def _recurse_create(self, *args, **kwargs): return type(self)(*args, **kwargs) def __repr__(self): return pformat(self.__dictionary__) def __str__(self): return repr(self.__dictionary__) def _undo_attrdict(self): # This is for strange scenarios where there isn't too much data, # as it creates a copy. y = {} for (k, v) in self.items(): if isinstance(v, AttrDict): y[k] = v._undo_attrdict() else: y[k] = v return y
[docs]class ClientResponse(AttrDict): def __init__(self, response, client=None, _is_paginated=None): self.client = client super().__init__(response) def _recurse_create(self, *args, **kwargs): return type(self)(*args, client=self.client, **kwargs) def __getattr__(self, attr): r = super().__getattr__(attr) return r if not isinstance(r, ClientRequest) else r() def _setup_extra(self, k, v): if isinstance(v, str) and v.startswith(self.client.endpoint): self.__dictionary__[k] = ClientRequest(self.client, v) elif isinstance(v, list): if all(isinstance(x, str) and x.startswith(self.client.endpoint) for x in v): self.__dictionary__[k] = ClientRequestList(self.client, v) def as_dataframe(self, k): ''' Get a dataframe for the list stored in a particular key / attribute. Note: any nested attribute-dictionaries shall have to be converted to dictionaries for compatibility reasons with pandas. :return: data frame for list of entries :rtype: pd.DataFrame ''' if not isinstance(self[k], list): raise TypeError('{} is not a list, cannot convert to ' 'dataframe.'.format(k)) if not isinstance(self[k][0], ClientResponse): raise TypeError('{} is not a list of dictionary-like elements, ' 'cannot convert to dataframe.'.format(k)) return pd.DataFrame.from_records(map(lambda x: x._undo_attrdict(), self[k]))
[docs]class ClientPagedResponse(object): def __init__(self, client, response, progress_desc=None): self.client = client self.response = response self.progress_desc = ('' if progress_desc is None else progress_desc) def _yield_elts(self, f, x, pbar): for e in map(f, x): yield e pbar.update()
[docs] def __iter__(self): ''' Iterates over all entries, silently lazily loading the next page when required. ''' r = self.response x = json.loads(r.content) pbar = tqdm(desc=self.progress_desc, unit=' entries', total=int(r.headers.get('X-Total-Count', len(x))), disable=(len(self.progress_desc) == 0)) yield from self._yield_elts(lambda e: ClientResponse(e, client=self.client), x, pbar) while 'next' in r.links: r = self.client._request(uri=r.links['next']['url'], raw=True) yield from self._yield_elts(lambda e: ClientResponse(e, client=self.client), json.loads(r.content), pbar) pbar.close()
[docs] def as_dataframe(self): ''' Retrieves all entries, from all pages. Returns as pandas data frame. Note: in general, it would be better to use iter_dataframes to deal with the returned entries in chunks (if possible). :return: data frame containing all entries in response :rtype: pd.DataFrame ''' return pd.concat(self.iter_dataframes())
[docs] def iter_dataframes(self): ''' Yields dataframes for each page of response. That is, entries are yielded in chunks. ''' r = self.response x = json.loads(r.content) pbar = tqdm(desc=self.progress_desc, unit=' entries', total=int(r.headers.get('X-Total-Count', len(x))), disable=(len(self.progress_desc) == 0)) yield pd.DataFrame.from_records( self._yield_elts(self.client._add_lazy_calls, x, pbar)) while 'next' in r.links: r = self.client._request(uri=r.links['next']['url'], raw=True) yield pd.DataFrame.from_records( self._yield_elts(self.client._add_lazy_calls, json.loads(r.content), pbar)) pbar.close()
class ClientRequest(object): def __init__(self, client, uri): self.client = client self.uri = uri def __call__(self): return self.client._request(uri=self.uri) def __str__(self): return repr(self) def __repr__(self): return '<API Request {}>'.format(self.uri[len(self.client.endpoint):]) class ClientRequestList(ClientRequest): def __init__(self, client, uris): self.client = client self.uris = uris def __call__(self): return list(map( lambda x: self.client._request(uri=x), self.uris)) def __repr__(self): return '<API Multiple Request>' class ClientException(Exception): pass class ClientTimeout(Exception): pass class NotFound(Exception): pass
[docs]class Client(object): ''' Client for the OMA browser REST API. Initialisation example:: from omadb import Client c = Client() :raises ClientException: for 400, 404, 500 errors. :raises ClientTimeout: for timeout when interacting with REST endpoint. ''' HEADERS = {'Content-type': 'application/json', 'Accept': 'application/json'} TIMEOUT = 60 PER_PAGE = 10000 RAMCACHE_SIZE = 10000
[docs] def __init__(self, endpoint='omabrowser.org/api', persistent_cached=False, persistent_cache_path=None): ''' :param str endpoint: OMA REST API endpoint (default omabrowser.org/api) :param bool persistent_cached: whether to cache queries on disk in SQLite DB. :param persistent_cache_path: location for persistent cache, optional :type persistent_cache_path: str or None ''' self.endpoint = ('https://' + endpoint if not endpoint.startswith('http') else endpoint) from . import __version__ as client_version self.HEADERS['User-agent'] = 'pyomadb/'+client_version self.persistent_cached = persistent_cached if self.persistent_cached: if persistent_cache_path is None: self.CACHE_PATH = appdirs.user_cache_dir('py' + __package__) else: self.CACHE_PATH = os.path.abspath(persistent_cache_path) self._version_check() self._setup_cache() self._setup() self._temp_path = TemporaryDirectory()
[docs] def clear_cache(self): ''' Clear both RAM and persistent cache. ''' self._request_get.cache_clear() if hasattr(self, 'session'): self.session.close() del self.session shutil.rmtree(self.CACHE_PATH) if self.persistent_cached: self._version_check() self._setup_cache()
def _setup_cache(self): os.makedirs(self.CACHE_PATH, exist_ok=True) self.session = CachedSession(cache_name=os.path.join(self.CACHE_PATH, 'api-cache'), backend='sqlite') def _setup(self): self.genomes = Genomes(self) self.entries = self.proteins = Entries(self) self.hogs = HOGs(self) self.groups = OMAGroups(self) self.function = Function(self) self.taxonomy = Taxonomy(self) self.pairwise = PairwiseRelations(self) self.synteny = Synteny(self) self.xrefs = self.external_references = ExternalReferences(self) @lazy_property def version(self): return self._request(action='version') @lazy_property def oma_release(self): return self.version['oma_version'] @lazy_property def api_version(self): return self.version['api_version'] def _version_check(self): db_fn = os.path.join(self.CACHE_PATH, 'oma-version') api_fn = os.path.join(self.CACHE_PATH, 'api-version') if os.path.isdir(self.CACHE_PATH): with open(db_fn, 'rt') as fp: oma_release = next(fp).rstrip() with open(api_fn, 'rt') as fp: api_version = next(fp).rstrip() if ((self.oma_release != oma_release) or (self.api_version != api_version)): warnings.warn('OMA database and / or API updated. ' 'Clearing cache.') shutil.rmtree(self.CACHE_PATH) os.makedirs(self.CACHE_PATH, exist_ok=True) if not os.path.isfile(db_fn): with open(db_fn, 'wt') as fp: fp.write(self.oma_release + '\n') if not os.path.isfile(api_fn): with open(api_fn, 'wt') as fp: fp.write(self.api_version + '\n') def _get_request_uri(self, uri=None, action=None, subject=None, params=None, **kwargs): if uri is None: if action is None or (isinstance(action, list) and len(action) == 0): raise Exception('No action declared.') if isinstance(action, list): uri = '/{}'.format(action[0]) uri += '/{}'.format(subject) if subject is not None else '' if len(action) > 1 and action[1] != 'list': uri += '/{}'.format(action[1]) else: uri = '/{}'.format(action) uri += '/{}'.format(subject) if subject is not None else '' uri = uri_quote(uri) uri = uri + '/' if (not uri.endswith('/') and '?' not in uri) else uri # Parse params params = {} if params is None else params paginated = kwargs.pop('paginated', False) if paginated: params['per_page'] = self.PER_PAGE # Add the rest of kwargs to params params.update(kwargs) return (str(uri), params) def _get_request_data(self, data=None, **kwargs): if not isinstance(data, dict): raise ValueError('Data is not defined.') return json.dumps(data) @lru_cache(RAMCACHE_SIZE) def _request_get(self, url, **params): get = getattr(self, 'session', requests).get LOG.debug(f'Calling GET {url}') return get(url, headers=self.HEADERS, params=params, timeout=self.TIMEOUT) def _request_post(self, url, data): LOG.debug(f'Calling POST {url}') return requests.post(url, data=data, headers=self.HEADERS, timeout=self.TIMEOUT) def _request(self, request_type='get', url=None, **kwargs): raw = kwargs.pop('raw', False) progress_desc = kwargs.pop('progress_desc', '') as_dataframe = kwargs.pop('as_dataframe', False) # Get URI and params if url is None: (uri, params) = self._get_request_uri(**kwargs) url = ((self.endpoint + uri) if not uri.startswith(self.endpoint) else uri) else: params = {} try: if request_type == 'get': r = self._request_get(url, **params) elif request_type == 'post': data = self._get_request_data(**kwargs) r = self._request_post(url, data) else: raise ValueError('Unsure how to deal with request type' '{}'.format(request_type)) except requests.exceptions.Timeout: raise ClientTimeout('API timed out after' '{}s.'.format(self.TIMEOUT)) # Check if response OK response_status = True try: r.raise_for_status() except requests.HTTPError as e: response_status = str(e).split('for url:')[0] if response_status is True: if raw: return r elif kwargs.get('paginated', False) or self._is_paginated(r): return ClientPagedResponse(self, r, progress_desc=progress_desc) else: content = json.loads(r.content) if isinstance(content, list): if as_dataframe: return pd.DataFrame.from_records( map(self._add_lazy_calls, content)) else: return list(map(lambda x: ClientResponse(x, client=self), content)) else: return ClientResponse(content, client=self) else: if r.status_code in {400, 404}: try: content = json.loads(r.content) except json.JSONDecodeError: content = {} z = ClientResponse(content, client=self) if set(z.keys()) == {'detail'}: response_status += '["' + z.detail + '"]' raise ClientException(response_status) def _is_paginated(self, r): return len(set(r.links.keys()) & {'next', 'last'}) == 2 def _add_lazy_calls(self, e): for (k, v) in e.items(): if isinstance(v, str) and v.startswith(self.endpoint): e[k] = ClientRequest(self, v) elif isinstance(v, dict): e[k] = self._add_lazy_calls(v) return e
class CoronaClient(Client): ''' Client for the Corona OMA browser REST API. Initialisation example:: from omadb import CoronaClient c = CoronaClient() :raises ClientException: for 400, 404, 500 errors. :raises ClientTimeout: for timeout when interacting with REST endpoint. ''' def __init__(self, endpoint='corona.omabrowser.org/api', persistent_cached=False, persistent_cache_path=None): ''' :param str endpoint: OMA REST API endpoint (default corona.omabrowser.org/api) :param bool persistent_cached: whether to cache queries on disk in SQLite DB. :param persistent_cache_path: location for persistent cache, optional :type persistent_cache_path: str or None ''' super().__init__(endpoint=endpoint, persistent_cached=persistent_cached, persistent_cache_path=persistent_cache_path) class OMAStageClient(Client): ''' Client for the OMA browser REST API on the oma-stage server. Initialisation example:: from omadb import OMAStageClient c = OMAStageClient() :raises ClientException: for 400, 404, 500 errors. :raises ClientTimeout: for timeout when interacting with REST endpoint. ''' def __init__(self, endpoint='oma-stage.vital-it.ch/api', persistent_cached=False, persistent_cache_path=None): ''' :param str endpoint: OMA REST API endpoint (default oma-stage.vital-it.ch/api) :param bool persistent_cached: whether to cache queries on disk in SQLite DB. :param persistent_cache_path: location for persistent cache, optional :type persistent_cache_path: str or None ''' super().__init__(endpoint=endpoint, persistent_cached=persistent_cached, persistent_cache_path=persistent_cache_path) def _setup(self): # for adding additional functionality that is only on the oma-stage server # e.g., previously was used to add synteny api calls super()._setup() class ClientFunctionSet(object): def __init__(self, client): self._client = client if hasattr(self, '_setup'): self._setup()
[docs]class Genomes(ClientFunctionSet): ''' API functionality for genome information. Access indirectly, via the client. Example:: from omadb import Client c = Client() wheat = c.genomes.genome('WHEAT') '''
[docs] def __getitem__(self, genome_id): ''' Retrieve information on a genome in OMA. :param genome_id: unique identifier for genome, NCBI taxonomic ID or UniProt species code :type genome_id: str or int :return: genome information :rtype: ClientResponse ''' return self.genome(genome_id)
[docs] def __iter__(self): '''Iterate over all genomes.''' yield from self.genomes.items()
@property def list(self): ''' Synonym for `genomes`. Retrieve information on all genomes in OMA. :return: information on all genomes :rtype: ClientResponse ''' return self.genomes
[docs] @lazy_property def genomes(self): ''' Retrieve information on all genomes in OMA. :return: information on all genomes :rtype: ClientResponse ''' r = {sp.pop('code'): sp for sp in self._client._request(action='genome', paginated=True)} return ClientResponse(r, client=self._client)
[docs] def as_dataframe(self): ''' Retrieve information on all genomes in OMA, return as pandas data frame. :return: information on all genomes :rtype: pd.DataFrame ''' return self._client._request(action='genome', paginated=True).as_dataframe()
[docs] def genome(self, genome_id): ''' Retrieve information on a genome in OMA. :param genome_id: unique identifier for genome, NCBI taxonomic ID or UniProt species code :type genome_id: str or int :return: genome information :rtype: ClientResponse ''' return self._client._request(action='genome', subject=genome_id)
[docs] def proteins(self, genome_id, progress=False): ''' Retrive all proteins for a particular genome. :param genome_id: unique identifier for genome, NCBI taxonomic ID or UniProt species code :type genome_id: str or int :param bool progress: whether to show progress bar :return: proteins in genome :rtype: ClientPagedResponse ''' desc = 'Retrieving proteins for {}'.format(genome_id) if progress else '' return self._client._request(action=['genome', 'proteins'], subject=genome_id, paginated=True, progress_desc=desc)
[docs]class HOGs(ClientFunctionSet): ''' API functionality for HOG information. Access indirectly, via the client. Example:: from omadb import Client c = Client() entry = c.hogs['WHEAT00001'] ''' def _ensure_hog_id(self, hog_id): return 'HOG:{:07d}'.format(hog_id) if isinstance(hog_id, numbers.Number) else hog_id
[docs] def __getitem__(self, hog_id): ''' Retrieve the detail available for a given HOG. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :return: HOG information :rtype: ClientResponse ''' return self.info(hog_id)
[docs] def get_orthoxml(self, hog_id, augmented=False): ''' Retrieve OrthoXML (from browser) for a particular HOG. :param augmented: whether or not to use the augmented version of the orthoxml, that contains more information but is not fully according to the spec. Essentially it also contains orthologGroup nodes that have only one children node. :type augmented: bool :return: OrthoXML :rtype: str ''' url = 'https://omabrowser.org/oma/hog/{}/orthoxml/'.format(self._ensure_hog_id(hog_id)) if augmented: url += "augmented/" return self._client._request(url=url, raw=True).content.decode('ascii')
[docs] def list(self): ''' Retrieve information about all HOGs. :return: all HOGs :rtype: ClientPagedResponse ''' return self.at_level(level=None)
[docs] def at_level(self, level, compare=None, as_dataframe=None): ''' Retrieve list of HOGs at a particular level :param str level: level of interest :param compare: if set to None, or False, returns all HOGs defined at a particular level. If set to a parental taxonomic level, all hogs will be annotated with evolutionary events that occurred between the two points in time. If set to True, will compare with the direct parent (default None) :type compare: None or bool or str :param bool as_dataframe: whether to return as pandas data frame, optional :return: all hogs at a particular level :rtype: ClientPagedResponse ''' if compare is None or not compare: return self._client._request(action='hog', params={'level': level}, paginated=True) else: if not isinstance(compare, str): assert compare == True, 'Do not recognise {} level'.format(compare) # find the parental level t = self._client.taxonomy.dendropy_tree() n = t.find_node_with_taxon_label(level) if n is None: # possibly leaf node, attempt to translate to code n = t.find_node_with_taxon_label(self._client.genomes[level].code) if n is not None: # go up tree until there are two children p = n.parent_node while len(p.child_nodes()) < 2: p = p.parent_node compare = p.taxon.label else: raise ValueError("Unknown level {}".format(compare)) z = self._client._request(action='hog', params={'level': level, 'compare_with': compare}, paginated=True) return z.as_dataframe() if as_dataframe else z
[docs] def info(self, hog_id): ''' Retrieve the detail available for a given HOG. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :return: HOG information :rtype: ClientResponse ''' return self._client._request(action='hog', subject=self._ensure_hog_id(hog_id))[0]
[docs] def members(self, hog_id, level=None, as_dataframe=None): ''' Retrieve list of protein entries in a given HOG. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :param str level: level of interest :param bool as_dataframe: whether to return as pandas data frame, optional :return: list of members :rtype: list or pd.DataFrame ''' z = self._client._request(action=['hog', 'members'], subject=self._ensure_hog_id(hog_id), level=level) if as_dataframe: return z.as_dataframe('members') else: return z.members
[docs] def external_references(self, hog_id, type=None): ''' Retrieve external references for all members of a particular HOG. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :param str level: level of interest :return: external references :rtype: dict ''' return self.xrefs(hog_id, type=type)
[docs] def xrefs(self, hog_id, level=None, type=None, as_dataframe=None): ''' Retrieve external references for all members of a particular HOG. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :param str level: level of interest :param bool as_dataframe: whether to return as pandas data frame, optional :return: external references :rtype: dict or pd.DataFrame ''' if as_dataframe: if type is not None: return pd.DataFrame.from_records({'omaid': m['omaid'], 'xref': x} for m in self.members(hog_id, level=level) for x in self._client.entries.xrefs(m['entry_nr'], type)) else: return pd.DataFrame.from_records({'omaid': m['omaid'], 'type': xtype, 'xref': x} for m in self.members(hog_id, level=level) for (xtype, xs) in self._client.entries.xrefs(m['entry_nr']).items() for x in xs) else: return {m['omaid']: self._client.entries.xrefs(m['entry_nr'], type) for m in self.members(hog_id, level=level)}
[docs] def analyze(self, hog_id): ''' Use the PyHAM package to analyse a particular hierarchical orthologous group. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :return: analysis object :rtype: pyham.Ham ''' return self.analyse(hog_id)
[docs] def analyse(self, hog_id): ''' Use the PyHAM package to analyse a particular hierarchical orthologous group. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int :return: analysis object :rtype: pyham.Ham ''' try: from pyham import Ham except ImportError: raise ImportError('For HOG analysis, the pyham package is ' 'required.') z = self[self[hog_id].roothog_id] m = z.members_url.members[0].omaid root_level = z.level try: r = requests.get('https://omabrowser.org/oma/hogs/{}/orthoxml'.format(m), timeout=self._client.TIMEOUT) except requests.exceptions.Timeout: raise ClientTimeout('OrthoXML request timed out after' '{}s.'.format(self.TIMEOUT)) xml = self._client.taxonomy.get(members=[root_level], format='phyloxml').decode('ascii') t = StringIO(xml) return Ham(tree_file=t, hog_file=r.content.decode('utf-8'), orthoXML_as_string=True, tree_format='phyloxml', use_internal_name=True)
[docs] def iham(self, hog_id): ''' Create an iHam page and print path to temporary file. :param hog_id: unique identifier for a HOG, either HOG ID or one of its member proteins :type hog_id: str or int ''' z = self.analyse(hog_id) iham = z.create_iHam(list(z.top_level_hogs.values())[0]) while True: fn = os.path.join(self._client._temp_path.name, 'iham{:06d}.html'.format(random.randint(0,999999))) if not os.path.isfile(fn): break with open(fn, 'wt') as fp: fp.write(iham.HTML) print('{}'.format(fn))
#def gene_ontology(self, hog_id, level=None, aspect=None, as_dataframe=None, # progress=False, **kwargs): # ''' # Retrieve any associations to Gene Ontology terms for a protein. # :param hog_id: a unique identifier for a hog_group - either its hog id starting with "HOG:" or one of its member proteins in which case the specific HOG ID of that protein is used. (Example: HOG:0001221.1a, P12345) # :type hog_id: str or list # :param level: taxonomic level of reference for a HOG - default is its deepest level for a given HOG ID. (Example: Mammalia) # :type level: str or list # :param str aspect: GO aspect - biological process (BP), cellular component (CC), molecular function (MF) # :param bool as_dataframe: whether to return as pandas data frame, optional # :param bool progress: whether to show a progress bar during load (default False) # :return: gene ontology associations # :rtype: list or pd.DataFrame or goatools.go_enrichment.GOEnrichmentStudy # ''' # if not isinstance(hog_id, list): # # get individual # z = self._client._request(action=['hog', 'gene_ontology'], # subject=hog_id, # params={'level': level}, # as_dataframe=as_dataframe) # if aspect is None or len(z) == 0: # return z # # Translate # aspects = {'bp': 'biological_process', # 'cc': 'cellular_component', # 'mf': 'molecular_function'} # valid_aspects = set(aspects.values()) # aspect = aspects.get(aspect.lower(), aspect) # assert (aspect in valid_aspects), 'Unknown aspect: {}'.format(aspect) # if as_dataframe: # return z[z.aspect == aspect] # else: # return list(filter(lambda x: x.aspect == aspect, z)) # else: # # get multiple # if not isinstance(level, list): # level = itertools.repeat(level, len(hog_id)) # else: # assert (len(hog_id) == len(level)), 'HOG IDs and level must be same length if given as list' # # # now load multiple # if as_dataframe: # dfs = [] # for x in tqdm(zip(hog_id, level), desc='Retrieving GO', # disable=(not progress)): # df = self.gene_ontology(hog_id=x[0], # level=x[1], # aspect=aspect, # as_dataframe=True) # df['hog_id'] = x[0] # df['level'] = x[1] # df = df.set_index(['hog_id', 'level']) # dfs.append(df) # return pd.concat(dfs) # else: # return {x: self.gene_ontology(hog_id=x[0], # level=x[1], # aspect=aspect) # for x in tqdm(zip(hog_id, level), # desc='Retrieving GO', # disable=(not progress))}
[docs]class Entries(ClientFunctionSet): ''' API functionality for protein entries. Access indirectly, via the client. Example:: from omadb import Client c = Client() entry = c.entries['WHEAT00001'] '''
[docs] def __getitem__(self, entry_id): ''' Retrieve the information available for a protein entry. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: entry information :rtype: ClientResponse ''' return self.info(entry_id)
[docs] def info(self, entry_id): ''' Retrieve the information available for a protein entry. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: entry information :rtype: ClientResponse ''' if isinstance(entry_id, numbers.Number) or isinstance(entry_id, str): return self._client._request(action='protein', subject=entry_id) else: is_list = isinstance(entry_id, list) entry_ids = list(entry_id) z = self._client._request(action='protein', subject='bulk_retrieve', data={'ids': entry_ids}, request_type='post') if not is_list: # set, dictionary keys, etc. anything that list(x) works on return {x.query_id: x.target for x in z} else: # possibly out of order e2i = dict(map(lambda x: (x[1], x[0]), enumerate(entry_ids))) res = [None] * len(entry_ids) for x in z: res[e2i[x.query_id]] = x.target return res
[docs] def domains(self, entry_id): ''' Retrieve the domains present in a protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: domain information :rtype: ClientResponse ''' return self._client._request(action=['protein', 'domains'], subject=entry_id)
@lazy_property def _gene_ontology(self): from goatools.obo_parser import GODag url = 'http://purl.obolibrary.org/obo/go/go-basic.obo' try: r = requests.get(url, timeout=self._client.TIMEOUT) except requests.exceptions.Timeout: raise ClientTimeout('Gene Ontology download request ' 'timed out after ' '{}s.'.format(self.TIMEOUT)) with NamedTemporaryFile(suffix='.obo') as fp: fp.write(r.content) return GODag(fp.name)
[docs] def gene_ontology(self, entry_id, aspect=None, as_dataframe=None, as_goatools=None, as_goea=None, progress=False, **kwargs): ''' Retrieve any associations to Gene Ontology terms for a protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int or list :param str aspect: GO aspect - biological process (BP), cellular component (CC), molecular function (MF) :param bool as_dataframe: whether to return as pandas data frame, optional :param bool as_goea: whether to return a GOEnrichmentAnalysis object, optional :param bool as_goatools: whether to return as GOATOOLS GOEA object, optional (deprecated) :param bool progress: whether to show a progress bar during load (default False) :return: gene ontology associations :rtype: list or pd.DataFrame or goatools.go_enrichment.GOEnrichmentStudy ''' if isinstance(entry_id, list): assert (not ((as_goea or as_goatools) and as_dataframe)), 'Cannot load both GOATOOLS and data frame!' if as_dataframe: dfs = [] for x in tqdm(entry_id, desc='Retrieving GO', disable=(not progress)): df = self.gene_ontology(x, aspect=aspect, as_dataframe=True) df['query_id'] = x df = df.set_index('query_id') dfs.append(df) return pd.concat(dfs) else: z = {x: self.gene_ontology(x, aspect=aspect) for x in tqdm(entry_id, desc='Retrieving GO', disable=(not progress))} if as_goea: # new wrapper to goatools to give df as res methods = kwargs.get('methods', ['fdr_bh']) return GOEnrichmentAnalysis(z, self._gene_ontology, methods) elif as_goatools: # for backwards compatibility keep this from goatools.go_enrichment import GOEnrichmentStudy methods = kwargs.get('methods', ['fdr_bh']) goea = GOEnrichmentStudy(z.keys(), {k: {x.GO_term for x in v} for (k, v) in z.items()}, self._gene_ontology, methods=methods) return goea else: return z else: if as_goatools: raise ValueError('Not possible to load GOEA object for single ' 'entry.') z = self._client._request(action=['protein', 'gene_ontology'], subject=entry_id, as_dataframe=as_dataframe) if aspect is None: return z # Translate aspects = {'bp': 'biological_process', 'cc': 'cellular_component', 'mf': 'molecular_function'} valid_aspects = set(aspects.values()) aspect = aspects.get(aspect.lower(), aspect) assert (aspect in valid_aspects), 'Unknown aspect: {}'.format(aspect) if as_dataframe: return z[z.aspect == aspect] else: return list(filter(lambda x: x.aspect == aspect, z))
[docs] def homoeologs(self, entry_id): ''' Retrieve all homoeologs for a given protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: list of homoeologs :rtype: ClientResponse ''' return self.homoeologues(entry_id)
[docs] def homoeologues(self, entry_id): ''' Retrieve all homoeologues for a given protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: list of homoeologues :rtype: ClientResponse ''' return self._client._request(action=['protein', 'homoeologs'], subject=entry_id)
[docs] def orthologs(self, entry_id, rel_type=None): ''' Retrieve list of all identified orthologs of a protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :param rel_type: relationship type to filter to ('1:1', '1:many', 'many:1', or 'many:many') :type rel_type: str or None :return: list of orthologs :rtype: ClientResponse ''' return self.orthologues(entry_id, rel_type=rel_type)
[docs] def orthologues(self, entry_id, rel_type=None): ''' Retrieve list of all identified orthologues of a protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :param rel_type: relationship type ('1:1', '1:many', 'many:1', or 'many:many'), optional :type rel_type: str or None :return: list of orthologues :rtype: ClientResponse ''' return self._client._request(action=['protein', 'orthologs'], subject=entry_id, params=({'rel_type': rel_type} if rel_type is not None else None))
[docs] def hog_derived_orthologs(self, entry_id): ''' Retrieve list of all orthologs derived from the HOG for a given protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: list of orthologs :rtype: ClientResponse ''' return self.hog_derived_orthologues(entry_id)
[docs] def hog_derived_orthologues(self, entry_id): ''' Retrieve list of all orthologues derived from the HOG for a given protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :return: list of orthologues :rtype: ClientResponse ''' return self._client._request(action=['protein', 'hog_derived_orthologs'], subject=entry_id)
[docs] def cross_references(self, entry_id, type=None): ''' Retrieve all cross-references for a protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :param type: specify type of cross-references to retain :type type: str or None :return: cross references :rtype: dict or set ''' return self.xrefs(entry_id, type=type)
[docs] def xrefs(self, entry_id, type=None): ''' Retrieve all cross-references for a protein. :param entry_id: a unique identifier for a protein :type entry_id: str or int :param type: specify type of cross-references to retain :type type: str or None :return: cross references :rtype: dict or set ''' def reformat(r): z = defaultdict(list) for x in r: z[x['source']].append(x['xref']) return z r = self._client._request(action=['protein', 'xref'], subject=entry_id) if type is not None: return set(map(lambda x: x['xref'], filter(lambda x: x['source'] == type, r))) else: return reformat(r)
[docs] def search(self, sequence, search=None, full_length=None): ''' Search for closest sequence in OMA database. :param str query: query sequence :param search: search strategy ('exact, 'approximate', 'mixed' [Default]) :type search: str or None :param bool full_length: indicates if exact matches have to be full length (by default, not) :return: closest entries :rtype: ClientResponse ''' params = {'query': sequence, 'full_length': bool(full_length)} search_valid = {'exact', 'approximate', 'mixed'} search = search.lower() if search is not None else None if (search not in search_valid and search is not None): raise ValueError('{} is not a valid search method. Choose from {}' .format(search, ', '.join(search_valid))) elif search is not None: params['search'] = search return self._client._request(action='sequence', params=params)
[docs]class Function(ClientFunctionSet): ''' API functionality for retrieving functional annotations for sequences. Access indirectly, via the client. Example:: from omadb import Client c = Client() gos = c.function('ATCATATCAT') '''
[docs] def __call__(self, seq, as_dataframe=None): ''' Annotate a sequence with GO terms based on annotations stored in the OMA database. :param str query: query sequence :param bool as_dataframe: whether to return as pandas data frame, optional :return: results of fast function prediction :rtype: list or pd.DataFrame ''' return self._client._request(action='function', params={'query': seq}, as_dataframe=as_dataframe)
[docs]class OMAGroups(ClientFunctionSet): ''' API functionality for retrieving information on OMA groups. Access indirectly, via the client. Example:: from omadb import Client c = Client() og = c.groups['WHEAT00001'] '''
[docs] def __getitem__(self, group_id): ''' Retrieve information available for a given OMA group. :param group_id: unique identifier of a group - either group number, fingerprint or entry ID of a member. :type group_id: int or str :return: group information :rtype: ClientResponse ''' return self.info(group_id)
[docs] def __iter__(self): ''' Iterate over all OMA groups in the current release. :yields: groups ''' yield from self._client._request(action='group', paginated=True)
[docs] def info(self, group_id): ''' Retrieve information available for a given OMA group. :param group_id: unique identifier of a group - either group number, fingerprint or entry ID of a member. :type group_id: int or str :return: group information :rtype: ClientResponse ''' return self._client._request(action='group', subject=group_id)
[docs] def close_groups(self, group_id, as_dataframe=None): ''' Retrieve the sorted list of closely related groups for a given OMA group. :param group_id: unique identifier of a group - either group number, fingerprint or entry ID of a member. :type group_id: int or str :param bool as_dataframe: whether to return as pandas data frame, optional :return: sorted list of closely related groups :rtype: list or pd.DataFrame ''' return self._client._request(action=['group', 'close_groups'], subject=group_id, as_dataframe=as_dataframe)
[docs]class Taxonomy(ClientFunctionSet):
[docs] def get(self, members=None, format=None, collapse=None): ''' Retrieve taxonomy in a particular format and return as string. :param list members: list of members to get the induced taxonomy for, optional :param str format: format of the taxonomy (dictionary [default], newick or phyloxml) :param bool collapse: whether or not to collapse levels with single child, optional (default yes) :return: taxonomy :rtype: str ''' members = (','.join(members) if members is not None else None) x = self._client._request(action='taxonomy', params={'members': members, 'type': format, 'collapse': collapse}, raw=((format is not None) and (format != 'dictionary'))) if format is None or format == 'dictionary': return x elif format == 'newick': return json.loads(x.content)['newick'] elif format == 'phyloxml': return x.content else: # This shouldn't really happen (Dec 2018) return x
[docs] def read(self, root, format=None, collapse=None): ''' Retrieve taxonomy in a particular format and return as string. :param root: taxon ID, species name or UniProt species code for root taxonomic level, optional :type root: str or int :param str format: format of the taxonomy (dictionary [default], newick or phyloxml) :param bool collapse: whether or not to collapse levels with single child, optional (default yes) :return: taxonomy :rtype: str ''' x = self._client._request(action=['taxonomy', root], params={'type': format, 'collapse': collapse}, raw=((format is not None) and (format != 'dictionary'))) if format is None or format == 'dictionary': return x elif format == 'newick': return json.loads(x.content)['newick'] elif format == 'phyloxml': return x.content else: # This shouldn't really happen (Dec 2018) return x
[docs] def dendropy_tree(self, members=None, root=None, with_names=None): ''' Retrieve taxonomy and load as dendropy tree. :param list members: list of members to get the induced taxonomy for, optional :param root: taxon ID, species name or UniProt species code for root taxonomic level, optional :type root: str or int or None :param bool with_names: whether to use species code (False, default) or species names (True), optional :return: taxonomy loaded as dendropy tree object :rtype: dendropy.Tree ''' try: import dendropy except ImportError: raise ImportError('Optional dependency of dendropy not installed.') def get_names(tree, with_names): internal_names = [tree.name] leaf_names = [] nodes = tree.children while len(nodes) > 0: nodes1 = [] for n in nodes: if hasattr(n, 'children'): nodes1 += n.children internal_names.append(n.name) elif with_names: leaf_names.append(n.name) else: leaf_names.append(n.code) nodes = nodes1 return (internal_names + leaf_names) if ((members is not None) and (root is not None)): raise ValueError('Taxonomy undefined in API when members and ' 'root are both set.') elif members is not None: struct = self.get(members=members) elif root is not None: struct = self.read(root) else: struct = self.get() taxon_namespace = dendropy.TaxonNamespace(get_names(struct, with_names)) tree = dendropy.Tree(taxon_namespace=taxon_namespace) tree.seed_node.taxon = taxon_namespace.get_taxon(struct.name) nodes = [(tree.seed_node, c) for c in struct.children] while len(nodes) > 0: nodes1 = [] for (parent, child) in nodes: ch = parent.new_child(edge_length=1) if hasattr(child, 'children'): ch.taxon = taxon_namespace.get_taxon(child.name) nodes1 += [(ch, c) for c in child.children] elif with_names: ch.taxon = taxon_namespace.get_taxon(child.name) else: ch.taxon = taxon_namespace.get_taxon(child.code) nodes = nodes1 return tree
[docs] def ete_tree(self, members=None, root=None, with_names=None): ''' Retrieve taxonomy and load as ete3 tree. :param list members: list of members to get the induced taxonomy for, optional :param root: taxon ID, species name or UniProt species code for root taxonomic level, optional :type root: str or int or None :param bool with_names: whether to use species code (False, default) or species names (True), optional :return: taxonomy loaded as ete tree object :rtype: ete.Tree ''' try: import ete3 except ImportError: raise ImportError('Optional dependency of ete3 not installed.') def get_names(tree, with_names): internal_names = [tree.name] leaf_names = [] nodes = tree.children while len(nodes) > 0: nodes1 = [] for n in nodes: if hasattr(n, 'children'): nodes1 += n.children internal_names.append(n.name) elif with_names: leaf_names.append(n.name) else: leaf_names.append(n.code) nodes = nodes1 return (internal_names + leaf_names) if ((members is not None) and (root is not None)): raise ValueError('Taxonomy undefined in API when members and ' 'root are both set.') elif members is not None: struct = self.get(members=members) elif root is not None: struct = self.read(root) else: struct = self.get() tree = ete3.Tree() tree.name = struct.name nodes = [(tree, c) for c in struct.children] while len(nodes) > 0: nodes1 = [] for (parent, child) in nodes: ch = parent.add_child(dist=1) if hasattr(child, 'children'): ch.name = child.name nodes1 += [(ch, c) for c in child.children] elif with_names: ch.name = child.name else: ch.name = child.code nodes = nodes1 return tree
[docs]class ExternalReferences(ClientFunctionSet): ''' API functionality for external references from a query sequence. Access indirectly, via the client. Example:: from omadb import Client c = Client() xrefs = c.xrefs('AAA') '''
[docs] def __call__(self, sequence): ''' Retrieve external references for a particular sequence. :param str sequence: query sequence or pattern :param bool as_dataframe: whether to return as pandas data frame, optional :return: list of cross references and match information :rtype: ClientResponse ''' return self._client._request(action='xref', params={'search': sequence})
[docs]class PairwiseRelations(ClientFunctionSet): ''' API functionality for pairwise relations.. Access indirectly, via the client. Example:: from omadb import Client c = Client() arath_wheat_pairs = list(c.pairwise('ARATH', 'WHEAT', progress=True)) '''
[docs] def __call__(self, genome_id1, genome_id2, chr1=None, chr2=None, rel_type=None, progress=False): ''' List the pairwise relations among two genomes. If genome_id1 == genome_id2, relations are close paralogues and homoeologues. If different, the relations are orthologues. By using the paramaters `chr1` and `chr2`, it is possible to limit the relations to a certain chromosome for one or both genomes. The ID of the chromosome corresponds to the IDs in, for example:: from omadb import Client c = Client() r = c.genomes.genome('HUMAN') human_inparalogues = list(c.pairwise('HUMAN', 'HUMAN', chr1=r.chromosomes[0].id, chr2=r.chromosomes[3].id, progress=True)) :param genome_id1: unique identifier for first genome - either NCBI taxonomic identifier or UniProt species code. :type genome_id1: int or str :param genome_id2: unique identifier for second genome - either NCBI taxonomic identifier or UniProt species code. :type genome_id2: int or str :param chr1: ID of chromosome of interest in first genome :type chr1: str or None :param chr2: ID of chromosome of interest in second genome :type chr2: str or None :param rel_type: relationship type ('1:1', '1:many', 'many:1', or 'many:many'), optional :param rel_type: str or None :param bool progress: whether to show a progress bar during load (default False) :return: generator of pairwise relations. :rtype: ClientPagedResponse ''' return self._client._request(action=['pairs', genome_id2], subject=genome_id1, params={'chr1': chr1, 'chr2': chr2, 'rel_type': rel_type}, paginated=True, progress_desc=('Loading pairs' if progress else None))
[docs]class GOEnrichmentAnalysis(object): ''' Wrapper to GOATOOLs to make it more user-friendly with the API. ''' def __init__(self, annots, go, methods): from goatools.go_enrichment import GOEnrichmentStudy self._goea = GOEnrichmentStudy(annots.keys(), {k: {x.GO_term for x in v} for (k, v) in annots.items()}, go, methods=methods)
[docs] def run_study(self, foreground): ''' Runs the GOEA study and returns a dataframe of results. :param foreground: unique identifiers for proteins in foreground set :type foreground: list :return: enrichment analysis results :rtype: pd.DataFrame ''' res = self._goea.run_study(foreground) return pd.DataFrame.from_records((dict(zip(z.get_prtflds_default(), z.get_field_values(z.get_prtflds_default()))) for z in res))
[docs]class Synteny(ClientFunctionSet): ''' API functionality for loading synteny data. Access indirectly, via the client. Example:: from omadb import Client c = Client() xrefs = c.synteny.xrefs('AAA') ''' EVIDENCE_TYPES = {'linearized', 'parsimonious', 'any'} def _parse_graph(self, z): import networkx as nx if len(z.nodes) > 1: # all nodes should be connected when we have more than 1 node in the list G = nx.from_pandas_edgelist( pd.DataFrame(dict(x) for x in z.links), source='source', target='target', edge_attr=['weight', 'evidence'] ) else: # add a single node graph G = nx.Graph() G.add_node(z.nodes[0].id) attributes = set(z.nodes[0].keys()) - {'id'} for k in attributes: nx.set_node_attributes(G, values={x['id']: x[k] for x in z.nodes}, name=k) # add ability to lazy call for information about the hog nx.set_node_attributes( G, values={x['id']: ClientRequest( self._client, ( self._client.endpoint + self._client._get_request_uri(action='hog', subject=x['id'])[0]) ) for x in z.nodes }, name='info' ) return G
[docs] def at_level(self, level, evidence='linearized', break_circular_contigs=True): ''' Retrieve list of HOGs at a particular level :param level: taxonomic level of interest. Ancestral levels accept scientific name or numeric TaxID. Extant genomes also accept UniProt 5-letter species codes. :type level: str or int :param str evidence: evidence value for the ancestral synteny graph, used for filtering. ('linearized', 'parsimonious', 'any'). (default 'linearized') :param bool break_circular_contigs: whether to break ancestral contigs on the weakest edge, when ancestral contigs end up being circular. This has no effect if evidence is not set to linearized. (default True) :return: networkx graph of synteny :rtype: ClientResponse ''' assert evidence in self.EVIDENCE_TYPES, 'Evidence must be one of: {}'.format(', '.join(self.EVIDENCE_TYPES)) z = self._client._request(action='synteny', params={'level': level, 'evidence': evidence, 'break_circular_contigs': break_circular_contigs}) # return a composition of all contigs import networkx as nx return nx.compose_all(list(map(self._parse_graph, z)))
[docs] def neighborhood(self, id, level, evidence='linearized', context=2, break_circular_contigs=True): ''' Retrieve neighborhood around a HOG or protein at a particular level. :param str id: unique identifier for either a HOG (for ancestral synteny), or a protein (for extant synteny) :param level: taxonomic level of interest. Ancestral levels accept scientific name or numeric TaxID. Extant genomes also accept UniProt 5-letter species codes. :type level: str or int :param str evidence: evidence value for the ancestral synteny graph, used for filtering. ('linearized', 'parsimonious', 'any'). (default 'linearized') :param int context: size of the graph around the query HOG, in terms of number of edges. (default 2) :param bool break_circular_contigs: whether to break ancestral contigs on the weakest edge, when ancestral contigs end up being circular. This has no effect if evidence is not set to linearized. (default True) :return: networkx graph of synteny :rtype: ClientResponse ''' return self.neighbourhood(id, level, evidence, context, break_circular_contigs)
[docs] def neighbourhood(self, id, level, evidence='linearized', context=2, break_circular_contigs=True): ''' Retrieve neighbourhood around a HOG or protein at a particular level. :param str id: unique identifier for either a HOG (for ancestral synteny), or a protein (for extant synteny) :param level: taxonomic level of interest. Ancestral levels accept scientific name or numeric TaxID. Extant genomes also accept UniProt 5-letter species codes. :type level: str or int :param str evidence: evidence value for the ancestral synteny graph, used for filtering. ('linearized', 'parsimonious', 'any'). (default 'linearized') :param int context: size of the graph around the query HOG, in terms of number of edges. (default 2) :param bool break_circular_contigs: whether to break ancestral contigs on the weakest edge, when ancestral contigs end up being circular. This has no effect if evidence is not set to linearized. (default True) :return: networkx graph of synteny :rtype: ClientResponse ''' assert evidence in self.EVIDENCE_TYPES, 'Evidence must be one of: {}'.format(', '.join(self.EVIDENCE_TYPES)) z = self._client._request(action='synteny', subject=id, params={'level': level, 'evidence': evidence, 'context': context, 'break_circular_contigs': break_circular_contigs}) import networkx as nx return self._parse_graph(z)
[docs] def window(self, id, level, n=2): ''' Retrieve window around a HOG or protein at a particular level. :param str id: unique identifier for either a HOG (for ancestral synteny), or a protein (for extant synteny) :param level: taxonomic level of interest. Ancestral levels accept scientific name or numeric TaxID. Extant genomes also accept UniProt 5-letter species codes. :type level: str or int :param int n: size of the window +-n around the query HOG. (default 2) :return: list of HOGs :rtype: ClientResponse ''' G = self.neighbourhood(id, level, context=n) hog_order = deque([id]) # get the direct neighbours direct_neighbours = list(G.adj[id].keys()) if len(direct_neighbours) == 2: (before, after) = direct_neighbours elif len(direct_neighbours) == 1: before = None after = direct_neighbours[0] else: before = after = None before1 = after1 = id seen = {id} i = 0 while i < n: # find next if before is not None: hog_order.appendleft(before) # find next before2 = set(G.adj[before].keys()) - {before1} before1 = before before = before2.pop() if len(before2) > 0 else None if after is not None: hog_order.append(after) # find next after2 = set(G.adj[after].keys()) - {after1} after1 = after after = after2.pop() if len(after2) > 0 else None i += 1 return list(map(self._client.hogs.__getitem__, hog_order))