"""
A mix of functions to make querying the database and analyzing the results
faster.
Primary query functions:
`get_studies()`:
Allows querying the Study table by a combination of population and
phenotype variables.
`get_snps()`:
Take a study list (possibly from `get_studies`) and return a SNP list
or dataframe.
Helpful additional functions:
`intersecting_phenos()`:
Return a list of phenotypes or phenotype categories present in all
queried populations.
`write_study_dict()`:
Write the dictionary returned from `get_studies(dictionary=True)` to
a tab delimited file with extra data from the database.
Lookup functions:
`lookup_rsid()`, `lookup_location()` and `lookup_studies()` allow the
querying of the database for specific SNPs and can return customized
information on them.
MyVariant:
`get_variant_info()`:
Use myvariant to get variant info for a list of SNPs.
DataFrame Manipulation:
`collapse_dataframe()`:
Collapse a dataframe (such as that returned by `get_snps()`) to include
only a single entry per SNP (collapsing multiple studies into one).
`intersect_overlapping_series()`:
Merge two sets of pvalues (such as those from `collapse_dataframe()`)
into a single merged dataframe with the original index and one column
for each pvalue. Good for plotting.
"""
from time import sleep as _sleep
import pandas as _pd
import myvariant as _mv
try:
import numpy as _np
from matplotlib import pyplot as _plt
except ImportError:
pass
from . import db as _db
from . import ref as _ref
from . import tables as t
__all__ = ["get_studies", "get_snps", "get_variant_info",
"intersecting_phenos", "lookup_rsid", "lookup_location",
"lookup_studies", "write_study_dict", "collapse_dataframe",
"intersect_overlapping_series"]
###############################################################################
# Retrieve SNPs and Study Data #
###############################################################################
[docs]def get_studies(primary_phenotype=None, pheno_cats=None, pheno_cats_alias=None,
primary_pop=None, has_pop=None, only_pop=None,
has_disc_pop=None, has_rep_pop=None, only_disc_pop=None,
only_rep_pop=None, query=False, count=False, dictionary=False,
pandas=False):
"""Return a list of studies filtered by phenotype and population.
There are two ways to query both phenotype and population.
Phenotype:
GRASP provides a 'primary phenotype' for each study, which are fairly
poorly curated. They also provide a list of phenotype categories, which
are well curated. The problem with the categories is that there are
multiple per study and some are to general to be useful. If using
categories be sure to post filter the study list.
Note: I have made a list of aliases for the phenotype categories to
make them easier to type. Use pheno_cats_alias for that.
Population:
Each study has a primary population (list available with
'get_populations') but some studies also have other populations in the
cohort. GRASP indexes all population counts, so those can be used to
query also. To query these use `has_` or `only_` (exclusive) parameters,
you can query either discovery populations or replication populations.
Note that you cannot provide both `has_` and `only_` parameters for the
same population type.
For doing population specific analyses most of the time you will want
the excl_disc_pop query.
Argument Description:
Phenotype Arguments are 'primary_phenotype', 'pheno_cats', and
'pheno_cats_alias'.
Only provide one of pheno_cats or pheno_cats_alias
Population Arguments are `primary_pop`, `has_pop`, `only_pop`,
`has_disc_pop`, `has_rep_pop`, `only_disc_pop`, `only_rep_pop`.
`primary_pop` is a simple argument, the others use bitwise flags for
lookup.
`has_pop` and `only_pop` simpply combine both the discovery and
replication population lookups.
The easiest way to use the `has_` and `only_` parameters is with the
PopFlag object. It uses `py-flags`. For example::
pops = PopFlag.eur | PopFlag.afr
In addition you can provide a list of strings corresponding to PopFlag
attributes.
Note: the `only_` parameters work as ANDs, not ORs. So
only_disc_pop='eur|afr' will return those studies that have BOTH
european and african discovery populations, but no other discovery
populations. On the other hand, `has_` works as an OR, and will return
any study with any of the specified populations.
Args:
primary_phenotype: Phenotype of interest, string or list of strings.
pheno_cats: Phenotype category of interest.
pheno_cats_alias: Phenotype category of interest.
primary_pop: Query the primary population, string or list of
strings.
has_pop: Return all studies with these populations
only_pop: Return all studies with these populations
has_disc_pop: Return all studies with these discovery populations
has_rep_pop: Return all studies with these replication
populations
only_disc_pop: Return all studies with ONLY these discovery
populations
only_rep_pop: Return all studies with ONLY these replication
populations
query: Return the query instead of the list of study
objects.
count: Return a count of the number of studies.
dictionary: Return a dictionary of title->id for filtering.
pandas: Return a dataframe of study information instead
of the list.
Returns:
A list of study objects, a query, or a dataframe.
"""
s, e = _db.get_session()
if (has_disc_pop and only_disc_pop) or (has_rep_pop and only_rep_pop):
raise KeywordError('Cannot use both has_ and only_ args')
# Create a query, we will build it iteratively
q = s.query(t.Study)
# Phenotype queries
if primary_phenotype:
if isinstance(primary_phenotype, (list, tuple)):
q = q.filter(
t.Study.phenotype.has(
t.Phenotype.phenotype.in_(primary_phenotype)
)
)
else:
q = q.filter(t.Study.phenotype.has(phenotype=primary_phenotype))
if pheno_cats:
if isinstance(pheno_cats, (list, tuple)):
q = q.filter(
t.Study.phenotype_cats.any(
t.PhenoCats.category.in_(pheno_cats)
)
)
else:
q = q.filter(t.Study.phenotype_cats.any(category=pheno_cats))
if pheno_cats_alias:
if isinstance(pheno_cats, (list, tuple)):
q = q.filter(
t.Study.phenotype_cats.any(
t.PhenoCats.alias.in_(pheno_cats)
)
)
else:
q = q.filter(t.Study.phenotype_cats.any(alias=pheno_cats_alias))
# Population queries
if primary_pop:
if isinstance(primary_pop, (list, tuple)):
q = q.filter(
t.Study.population.has(
t.Population.population.in_(primary_pop)
)
)
else:
q = q.filter(t.Study.population.has(population=primary_pop))
# Bitwise population queries
if has_pop:
pop_flags = get_pop_flags(has_pop)
q = q.filter(t.Study.disc_pop_flag.op('&')(int(pop_flags)))
elif only_pop:
pop_flags = get_pop_flags(only_pop)
q = q.filter(
t.Study.disc_pop_flag.is_(int(pop_flags))
)
if has_disc_pop:
pop_flags = get_pop_flags(has_disc_pop)
q = q.filter(t.Study.disc_pop_flag.op('&')(int(pop_flags)))
elif only_disc_pop:
pop_flags = get_pop_flags(only_disc_pop)
q = q.filter(
t.Study.disc_pop_flag.is_(int(pop_flags))
)
if has_rep_pop:
pop_flags = get_pop_flags(has_rep_pop)
q = q.filter(t.Study.disc_pop_flag.op('&')(int(pop_flags)))
elif only_rep_pop:
pop_flags = get_pop_flags(only_rep_pop)
q = q.filter(
t.Study.disc_pop_flag.is_(int(pop_flags))
)
# Query is built, now we decide how to return it.
if query:
return q
elif count:
return q.count()
elif dictionary:
return {i.title: i.id for i in q.all()}
elif pandas:
return _pd.read_sql(q.statement, e)
else:
return q.all()
[docs]def get_snps(studies, pandas=True):
"""Return a list of SNPs in a single population in a single phenotype.
:studies: A list of studies.
:pandas: Return a dataframe instead of a list of SNP objects.
:returns: Either a DataFrame or list of SNP objects.
"""
s, e = _db.get_session()
if isinstance(studies, dict):
studies = list(studies.values())
if not isinstance(studies, (list, tuple)):
studies = [studies]
if isinstance(studies[0], t.Study):
studies = [i.id for i in studies]
studies = list(studies)
if pandas:
dfs = []
for small_studies in _chunks(studies, 999):
dfs.append(_pd.read_sql(
s.query(
t.SNP.id, t.SNP.chrom, t.SNP.pos, t.SNP.snpid,
t.SNP.study_snpid, t.SNP.pval, t.SNP.study_id,
t.SNP.InGene, t.SNP.InMiRNA, t.SNP.InLincRNA, t.SNP.LSSNP,
t.SNP.phenotype_desc
).filter(
t.SNP.study_id.in_(small_studies)
).statement, e, index_col='id'))
return _pd.concat(dfs)
else:
snps = []
for small_studies in _chunks(studies, 999):
snps += s.query(t.SNP).filter(
t.SNP.study_id.in_(small_studies)
).all()
return snps
def intersecting_phenos(check, primary_pops=None, pop_flags=None,
exclusive=False, pop_type='all',list_only=False):
"""Return a list of phenotypes that are present in all populations.
Can only provide one of primary_pops or pop_flags. pop_flags does a
bitwise lookup, primary_pops queries the primary string only.
By default this function returns a list of phenotype categories, if you
want to check primary phenotypes instead, provide check='primary'.
Args:
check: cat/primary either check categories or primary phenos.
primary_pops: A string or list of strings corresponding to the
`tables.Study.phenotype` column
pop_flags: A `ref.PopFlag` object or list of objects.
pop_type: all/disc/rep Use with pop_flags only, check either
discovery or replication populations or both.
exclusive: Use with pop_flags only, do an exclusive rather than
inclusion population search
list_only: Return a list of names only, rather than a list of
objects
Returns:
A list of `table.Phenotype` or `table.PhenoCat` objects, or a list of
names if `list_only` is specified.
"""
# Check arguments
if primary_pops and pop_flags:
raise KeywordError("Cannot specify both 'primary_pops' and " +
"'pop_flags'")
if not primary_pops and not pop_flags:
raise KeywordError("Must provide at least one of 'primary_pops' or " +
"'pop_flags'")
if check not in ['cat', 'primary']:
raise KeywordError("'check' must be one of ['cat', 'primary']")
# Pick query type
if pop_flags:
if pop_type not in ['all', 'disc', 'rep']:
raise KeywordError("pop_type must be one of ['all','disc','rep']")
l = 'only' if exclusive else 'has'
key = '{}_pop'.format(l) if pop_type == 'all' else\
'{}_{}_pop'.format(l, pop_type)
qpops = [get_pop_flags(i) for i in pop_flags]
else:
key = 'primary_pop'
qpops = primary_pops
# Check that we have an iterable
if not isinstance(qpops, (list, tuple)):
raise KeywordError('Population query must be a list or tuple')
# Get phenotype lists and intersect to form a final set
# We use IDs here because it makes the set intersection more robust
final_set = set()
for pop in qpops:
p = []
for i in get_studies(**{key: pop}):
if check == 'cat':
p += i.phenotype_cats
else:
p.append(i.phenotype)
out = set([i.id for i in p])
if not final_set:
final_set = out
else:
final_set &= out
# Get the final phenotype list
s, _ = _db.get_session()
phenos = []
for id_list in _chunks(list(final_set), 999):
table = t.Phenotype if check == 'primary' else t.PhenoCats
phenos += s.query(table).filter(table.id.in_(id_list)).all()
# Return the list
if list_only:
return [i.phenotype for i in phenos] if check == 'primary' \
else [i.category for i in phenos]
else:
return phenos
###############################################################################
# Lookup Functions #
###############################################################################
[docs]def lookup_rsid(rsid, study=False, columns=None, pandas=False):
"""Query database by rsID.
Args:
rsID (str): An rsID or list of rsIDs
study (bool): Include study info in the output.
columns (list): A list of columns to include in the query. Default is
all. List must be made up of column objects, e.g.
[t.SNP.snpid, t.Study.id]
pandas (bool): Return a dataframe instead of a list of SNPs
Returns:
list: List of SNP objects
"""
s, e = _db.get_session()
if not columns:
columns = [t.SNP, t.Study] if study else [t.SNP]
q = s.query(*columns)
if study:
q = q.filter(
t.SNP.study_id == t.Study.id
)
if isinstance(rsid, str):
q = q.filter(t.SNP.study_snpid == rsid)
elif isinstance(rsid, (list, tuple)):
q = q.filter(t.SNP.study_snpid.in_(rsid))
else:
raise ValueError('rsid must be either list or string')
if pandas:
return _pd.read_sql(q.statement, e, index_col='study_snpid')
else:
q.all()
[docs]def lookup_location(chrom, position, study=False, columns=None, pandas=False):
"""Query database by location.
Args:
chrom (str): The chromosome as an int or string (e.g. 1 or chr1)
position (int): Either one location, a list of locations, or a range
of locations, range can be expressed as a tuple of two
ints, a range object, or a string of 'int-int'
study (bool): Include study info in the output.
columns (list): A list of columns to include in the query. Default is
all. List must be made up of column objects, e.g.
[t.SNP.snpid, t.Study.id]
pandas (bool): Return a dataframe instead of a list of SNPs
Returns:
list: List of SNP objects
"""
s, e = _db.get_session()
if isinstance(chrom, str) and chrom.startswith('chr'):
chrom = chrom[3:]
chrom = str(chrom)
if not columns:
columns = [t.SNP, t.Study] if study else [t.SNP]
if study:
q = s.query(*columns).filter(
t.SNP.chrom == chrom
).filter(
t.SNP.study_id == t.Study.id
)
else:
q = s.query(*columns).filter(
t.SNP.chrom == chrom
)
if isinstance(position, int):
q = q.filter(t.SNP.pos == position)
elif isinstance(position, list):
q = q.filter(t.SNP.pos.in_(position))
elif isinstance(position, str):
if position.isdigit():
q = q.filter(t.SNP.pos == int(position))
else:
start, end = position.split('-')
q = q.filter(t.SNP.pos.between(int(start), int(end)))
elif isinstance(position, range):
print(position.start, position.stop)
q = q.filter(t.SNP.pos.between(position.start, position.stop))
elif isinstance(position, tuple):
assert len(position) == 2
q = q.filter(t.SNP.pos.between(int(position[0]), int(position[1])))
else:
raise ValueError("'position' must be an int, str, range, list, or " +
"tuple. Is {}".format(type(position)))
if pandas:
return _pd.read_sql(q.statement, e, index_col='study_snpid')
else:
return q.all()
[docs]def lookup_studies(title=None, study_id=None, columns=None, pandas=False):
"""Find all studies matching either title or id.
Args:
title (str): The study title, string or list of strings.
study_id (int): The row ID, usually the PMID, int or list of ints.
columns (list): A list of columns to include in the query. Default is
all. List must be made up of column objects, e.g.
[t.SNP.snpid, t.Study.id]
pandas (bool): Return a dataframe instead of a list of SNPs
Returns:
DataFrame or list: All matching studies as either a dataframe or a list
"""
s, e = _db.get_session()
if not columns:
columns = [t.Study]
if not isinstance(columns, (list, tuple)):
columns = [columns]
q = s.query(*columns)
if title:
if isinstance(title, dict):
title = list(title.keys())
if isinstance(title, str):
title = [title]
if not isinstance(title, (tuple, list)):
raise KeywordError('title argument must be a string or list')
q = q.filter(t.Study.title.in_(title))
if study_id:
if isinstance(study_id, int):
study_id = [study_id]
elif isinstance(study_id, str):
study_id = [int(study_id)]
if not isinstance(study_id, (tuple, list)):
raise KeywordError('title argument must be an int or list')
study_id = [int(i) for i in study_id]
q = q.filter(t.Study.title.in_(study_id))
if pandas:
return _pd.read_sql(q.statement, e, index_col='id')
else:
return q.all()
###############################################################################
# MyVariant Queries #
###############################################################################
[docs]def get_variant_info(snp_list, fields='dbsnp', pandas=True):
"""Get variant info for a list of SNPs.
Args:
snp_list: A list of SNP objects or SNP rsIDs
fields: Choose fields to display from:
`<docs.myvariant.info/en/latest/doc/data.html#available-fields>`_
Good choices are 'dbsnp', 'clinvar', or 'gwassnps'
Can also use 'grasp' to get a different version of this
info.
pandas: Return a dataframe instead of dictionary.
Returns:
A dictionary or a dataframe.
"""
mv = _mv.MyVariantInfo()
if isinstance(snp_list, _pd.DataFrame):
try:
snps = list(snp_list.study_snpid)
except AttributeError:
snps = list(snp_list.index)
elif isinstance(snp_list[0], t.SNP):
snps = [i.study_snpid for i in snp_list]
else:
snps = snp_list
assert isinstance(snps, (list, tuple))
dfs = []
for q in _chunks(snps, 999):
dfs.append(mv.querymany(q, scopes='dbsnp.rsid', fields=fields,
as_dataframe=pandas, df_index=True))
if len(snps) > 999:
_sleep(2)
if pandas:
return _pd.concat(dfs)
else:
if len(dfs) > 1:
return dfs
else:
return dfs[0]
###############################################################################
# DataFrame Manipulations #
###############################################################################
[docs]def collapse_dataframe(df, mechanism='median', pvalue_filter=None,
protected_columns=None):
"""Collapse a dataframe by chrom:location from get_snps.
Will use the mechanism defined by 'mechanism' to collapse a dataframe
to one indexed by 'chrom:location' with pvalue and count only.
This function is agnostic to all dataframe columns other than::
['chrom', 'pos', 'snpid', 'pval']
All other columns are collapsed into a comma separated list, a string.
'chrom' and 'pos' are merged to become the new colon-separated index,
snpid is maintained, and pval is merged using the function in 'mechanism'.
Args:
df: A pandas dataframe, must have 'chrom', 'pos',
'snpid', and 'pval' columns.
mechanism: A numpy statistical function to use to collapse the
pvalue, median or mean are the common ones.
pvalue_filter: After collapsing the dataframe, filter to only
include pvalues less than this cutoff.
protected_columns: A list of column names that will be maintained as is,
although all duplicates will be dropped (randomly).
Only makes sense for columns that are identical
for all studies of the same SNP.
Returns:
DataFrame: Indexed by chr:pos, contains flattened pvalue column, and
all original columns as a comma-separated list. Additionally
contains a count and stddev (of pvalues) column. stddev is
nan if count is 1.
"""
# Copy the dataframe to avoid clobbering it
df = df.sort_values(['chrom', 'pos'])
# Add the location column, which will become the index
df['location'] = df.apply(_create_loc, axis=1)
# Move all of the columns we want to maintain to a separate df and drop
# dups
if protected_columns:
prot_cols = []
for i in protected_columns:
if i not in ['location', 'snpid', 'chrom', 'pos', 'pval']:
prot_cols.append(i)
df_prot = df[prot_cols]
df_prot = df_prot.drop_duplicates('location')
df_prot.set_index('location', inplace=True)
df.drop(prot_cols, axis=1, inplace=True)
# Protect SNPID, chrom, and location
df_snp = df[['location', 'snpid', 'chrom', 'pos']]
df_snp = df_snp.drop_duplicates('location')
df_snp.set_index('location', inplace=True)
df.drop(['snpid', 'chrom', 'pos'], axis=1, inplace=True)
# Flatten the pvalue
df_p = df[['location', 'pval']]
df.drop('pval', axis=1, inplace=True)
df_p = df_p.groupby('location').aggregate([mechanism, 'std', 'count'])
df_p.columns = ['pval', 'std', 'count']
# Flatten the remainder
df = df.groupby('location').aggregate(_aggregate_strings)
# Recombine
comb_list = [df_snp, df_p, df]
if protected_columns:
comb_list.insert(2, df_prot)
df = _pd.concat(comb_list, axis=1)
# Filter
if pvalue_filter:
df = df[df.pval < pvalue_filter]
# Done
return df
[docs]def intersect_overlapping_series(series1, series2, names=None,
stats=True, plot=None, name=None):
"""Plot all SNPs that overlap between two pvalue series.
Args:
series{1,2} (Series): A pandas series object
names (list): A list of two names to use for the resultant
dataframes
stats (bool): Print some stats on the intersection
plot (str): Plot the resulting intersection, path to save
figure to (must end in .pdf/.png). Numpy and
Matplotlib are required.
name (str): A name for the plot, optional.
Returns:
DataFrame: with the two series as columns
"""
if isinstance(series1, _pd.DataFrame):
series1 = series1.pval
if isinstance(series2, _pd.DataFrame):
series2 = series2.pval
assert isinstance(series1, _pd.Series)
assert isinstance(series2, _pd.Series)
df = _pd.concat([series1, series2], join='inner', axis=1)
names = names if names else ['series1', 'series2']
assert isinstance(names, list)
df.columns = names
if stats:
print("Series 1 len: {}".format(len(series1)))
print("Series 2 len: {}".format(len(series2)))
print("Intersected df len: {}".format(len(df)))
if plot:
name = name if name else ''
pdf = -_np.log10(df)
lmax = pdf.max().max() + 2
f, a = _plt.subplots(figsize=(10,10))
pdf.plot(x=names[0], y=names[1], ax=a, kind='scatter')
a.set_title(name)
a.set_xlabel("{} | -log10 pval".format(names[0]))
a.set_ylabel("{} | -log10 pval".format(names[1]))
a.set_xlim(0, lmax)
a.set_ylim(0, lmax)
_plt.grid()
f.savefig(plot)
_plt.show()
return df
###############################################################################
# Helper Functions #
###############################################################################
[docs]def write_study_dict(study_dict, outfile):
"""Write a study dictionary from `get_studies(dictionary=True)` to file.
Looks up studies in the Study table first.
Args:
study_dict (dict): A dictionary of title=>id from the Study table.
outfile: A valid path to a file with write permission.
Outputs:
A tab delimited file of ID, Title, Author, Journal, Discovery
Population, Replication Population, SNP_Count
Returns:
None
"""
columns = [t.Study.id, t.Study.title, t.Study.author, t.Study.journal,
t.Study.sample_size, t.Study.replication_size,
t.Study.snp_count]
studies = lookup_studies(study_id=[i for i in study_dict.values()],
columns=columns, pandas=True)
studies.to_csv(outfile, sep='\t')
def get_pop_flags(pop_flags):
"""Merge a list, string, int, or PopFlag series."""
if isinstance(pop_flags, _ref.PopFlag):
return pop_flags
if not isinstance(pop_flags, (list, tuple)):
pop_flags = [pop_flags]
final_flag = _ref.PopFlag(0)
for pflag in pop_flags:
if isinstance(pflag, int):
pflag = _ref.PopFlag(pflag)
elif isinstance(pflag, str):
pflag = _ref.PopFlag().from_simple_str(pflag)
if isinstance(pflag, _ref.PopFlag):
final_flag |= pflag
else:
raise TypeError('Could not convert population flag into PopFlag')
return final_flag
def _aggregate_strings(x):
"""Converts records to comma separated string."""
return ','.join(set([str(i) for i in x]))
def _create_loc(x):
"""Merge chrom and pos columns into 'chrom:pos'."""
return '{}:{}'.format(x.chrom, x.pos)
def _chunks(l, n):
n = max(1, n)
return (l[i:i+n] for i in range(0, len(l), n))
###############################################################################
# Exception Definition #
###############################################################################
class KeywordError(Exception):
"""For incorrect combinations of arguments."""
pass