Coverage for stdpopsim/genomes.py : 52%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Infrastructure for defining information about species' genomes and genomic
3regions to be simulated.
4"""
5import logging
6import warnings
7import attr
9import numpy as np
10import msprime
12import stdpopsim
14logger = logging.getLogger(__name__)
17@attr.s
18class Genome:
19 """
20 Class representing the genome for a species.
22 :ivar chromosomes: A list of :class:`.Chromosome` objects.
23 :vartype chromosomes: list
24 :ivar citations: A list of :class:`.Citation` objects
25 providing the source for the genome assembly,
26 mutation rate and recombination rate estimates.
27 :vartype citations: list
28 :ivar length: The total length of the genome.
29 :vartype length: int
30 """
32 # TODO document the assembly_name and accession
34 chromosomes = attr.ib(factory=list)
35 assembly_name = attr.ib(default=None, kw_only=True)
36 assembly_accession = attr.ib(default=None, kw_only=True)
37 citations = attr.ib(factory=list, kw_only=True)
39 @staticmethod
40 def from_data(genome_data, *, recombination_rate, mutation_rate, citations):
41 """
42 Construct a Genome object from the specified dictionary of
43 genome information from Ensembl, recombination_rate and
44 mutation_rate dictionaries.
46 This method is for internal use only.
47 """
48 chr_names = set(genome_data["chromosomes"].keys())
49 assert set(recombination_rate.keys()) == chr_names
50 assert set(mutation_rate.keys()) == chr_names
51 chromosomes = []
52 for name, data in genome_data["chromosomes"].items():
53 chromosomes.append(
54 Chromosome(
55 id=name,
56 length=data["length"],
57 synonyms=data["synonyms"],
58 mutation_rate=mutation_rate[name],
59 recombination_rate=recombination_rate[name],
60 )
61 )
62 return Genome(
63 chromosomes=chromosomes,
64 assembly_name=genome_data["assembly_name"],
65 assembly_accession=genome_data["assembly_accession"],
66 citations=citations,
67 )
69 @property
70 def length(self):
71 return sum(chrom.length for chrom in self.chromosomes)
73 def __str__(self):
74 s = "Chromosomes:\n"
75 length_sorted = sorted(self.chromosomes, key=lambda x: -x.length)
76 for chrom in length_sorted:
77 s += "\t{}\n".format(chrom)
78 return s
80 def get_chromosome(self, id):
81 """
82 Returns the chromosome with the specified ``id``.
84 :param str id: The string ID of the chromosome.
85 A complete list of chromosome IDs for each species can be found in the
86 "Genome" subsection for the species in the :ref:`sec_catalog`.
87 :rtype: :class:`Chromosome`
88 :return: A :class:`Chromosome` that defines properties of the chromosome
89 such as length, mutation rate, and recombination rate.
90 """
91 for chrom in self.chromosomes:
92 if chrom.id == id or id in chrom.synonyms:
93 return chrom
94 raise ValueError("Chromosome not found")
96 @property
97 def mean_recombination_rate(self):
98 """
99 The length-weighted mean recombination rate across all chromosomes.
100 """
101 length = self.length
102 mean_recombination_rate = 0
103 for chrom in self.chromosomes:
104 normalized_weight = chrom.length / length
105 cont = chrom.recombination_rate * normalized_weight
106 mean_recombination_rate += cont
107 return mean_recombination_rate
109 @property
110 def mean_mutation_rate(self):
111 """
112 The length-weighted mean mutation rate across all chromosomes.
113 """
114 length = self.length
115 mean_mutation_rate = 0
116 for chrom in self.chromosomes:
117 normalized_weight = chrom.length / length
118 cont = chrom.mutation_rate * normalized_weight
119 mean_mutation_rate += cont
120 return mean_mutation_rate
123@attr.s
124class Chromosome:
125 """
126 Class representing a single chromosome for a species.
128 :ivar str ~.id: The string identifier for the chromosome.
129 :ivar int length: The length of the chromosome.
130 :ivar float mutation_rate: The mutation rate used when simulating this
131 chromosome.
132 :ivar float recombination_rate: The recombination rate used when simulating
133 this chromosome (if not using a genetic map).
134 :ivar synonyms: List of synonyms that may be used when requesting this
135 chromosome by ID, e.g. from the command line interface.
136 :vartype synonyms: list of str
137 """
139 id = attr.ib(type=str, kw_only=True)
140 length = attr.ib(kw_only=True)
141 recombination_rate = attr.ib(type=float, kw_only=True)
142 mutation_rate = attr.ib(type=float, kw_only=True)
143 synonyms = attr.ib(factory=list, kw_only=True)
146@attr.s(kw_only=True)
147class Contig:
148 """
149 Class representing a contiguous region of genome that is to be simulated.
150 This contains the information about mutation rates, distributions of
151 fitness effects (DFEs), and recombination rates that are needed to simulate
152 this region.
154 Information about targets of selection are contained in the ``dfe_list``
155 and ``interval_list``. These must be of the same length, and the k-th DFE
156 applies to the k-th interval; see :meth:`.add_dfe` for more information.
158 :ivar mutation_rate: The rate of mutation per base per generation.
159 :vartype mutation_rate: float
160 :ivar recombination_map: The recombination map for the region. See the
161 :class:`msprime.RateMap` for details.
162 :vartype recombination_map: msprime.RateMap
163 :ivar mask_intervals: Intervals to keep in simulated tree sequence, as a list
164 of (left_position, right_position), such that intervals are non-overlapping
165 and in ascending order. Should have shape Nx2, where N is the number of
166 intervals.
167 :vartype mask_intervals: array-like (?)
168 :ivar exclude: If True, ``mask_intervals`` specify regions to exclude. If False,
169 ``mask_intervals`` specify regions in keep.
170 :vartype exclude: bool
171 :ivar dfe_list: a list of :class:`.DFE` objects.
172 By default, the only DFE is completely neutral.
173 :vartype dfe_list: list
174 :ivar interval_list: A list of :class:`np.array` objects containing integers.
175 By default, the inital interval list spans the whole chromosome with the
176 neutral DFE.
178 .. note::
179 To run stdpopsim simulations with alternative, user-specified mutation
180 or recombination rates, a new contig can be created based on an existing
181 one. For instance, the following will create a ``new_contig`` that,
182 when simulated, will have double the mutation rate of the ``old_contig``:
184 .. code-block:: python
186 new_contig = stdpopsim.Contig(
187 mutation_rate=old_contig.mutation_rate * 2,
188 recombination_map=old_contig.recombination_map,
189 genetic_map=old_contig.genetic_map,
190 )
191 """
193 recombination_map = attr.ib()
194 mutation_rate = attr.ib(type=float)
195 genetic_map = attr.ib(default=None)
196 inclusion_mask = attr.ib(default=None)
197 exclusion_mask = attr.ib(default=None)
198 dfe_list = attr.ib(factory=list)
199 interval_list = attr.ib(factory=list)
201 def __attrs_post_init__(self):
202 self.add_dfe(
203 np.array([[0, int(self.length)]]),
204 stdpopsim.dfe.neutral_dfe(),
205 )
207 @staticmethod
208 def basic_contig(*, length, mutation_rate=0, recombination_rate=0):
209 recomb_map = msprime.RateMap.uniform(length, recombination_rate)
210 return Contig(recombination_map=recomb_map, mutation_rate=mutation_rate)
212 @staticmethod
213 def species_contig(
214 *,
215 species,
216 chromosome=None,
217 genetic_map=None,
218 length_multiplier=1,
219 length=None,
220 mutation_rate=None,
221 inclusion_mask=None,
222 exclusion_mask=None,
223 ):
224 """
225 Build a Contig for a species.
226 """
227 # TODO: add non-autosomal support
228 non_autosomal_lower = ["x", "y", "m", "mt", "chrx", "chry", "chrm"]
229 if chromosome is not None and chromosome.lower() in non_autosomal_lower:
230 warnings.warn(
231 stdpopsim.NonAutosomalWarning(
232 "Non-autosomal simulations are not yet supported. See "
233 "https://github.com/popsim-consortium/stdpopsim/issues/383 and "
234 "https://github.com/popsim-consortium/stdpopsim/issues/406"
235 )
236 )
237 if chromosome is None:
238 if genetic_map is not None:
239 raise ValueError("Cannot use genetic map with generic contig")
240 if length_multiplier != 1:
241 raise ValueError("Cannot use length multiplier for generic contig")
242 if inclusion_mask is not None or exclusion_mask is not None:
243 raise ValueError("Cannot use mask with generic contig")
244 if length is None:
245 raise ValueError("Must specify sequence length of generic contig")
246 L_tot = 0
247 r_tot = 0
248 u_tot = 0
249 for chrom_data in species.genome.chromosomes:
250 if chrom_data.id.lower() not in non_autosomal_lower:
251 L_tot += chrom_data.length
252 r_tot += chrom_data.length * chrom_data.recombination_rate
253 u_tot += chrom_data.length * chrom_data.mutation_rate
254 if mutation_rate is None:
255 mutation_rate = u_tot / L_tot
256 r = r_tot / L_tot
257 contig = Contig.basic_contig(
258 length=length,
259 mutation_rate=mutation_rate,
260 recombination_rate=r,
261 )
262 else:
263 if length is not None:
264 raise ValueError("Cannot specify sequence length for named contig")
265 if inclusion_mask is not None and exclusion_mask is not None:
266 raise ValueError("Cannot specify both inclusion and exclusion masks")
267 chrom = species.genome.get_chromosome(chromosome)
268 if genetic_map is None:
269 logger.debug(f"Making flat chromosome {length_multiplier} * {chrom.id}")
270 gm = None
271 recomb_map = msprime.RateMap.uniform(
272 round(chrom.length * length_multiplier), chrom.recombination_rate
273 )
274 else:
275 if length_multiplier != 1:
276 raise ValueError("Cannot use length multiplier with empirical maps")
277 logger.debug(f"Getting map for {chrom.id} from {genetic_map}")
278 gm = species.get_genetic_map(genetic_map)
279 recomb_map = gm.get_chromosome_map(chrom.id)
281 inclusion_intervals = None
282 exclusion_intervals = None
283 if inclusion_mask is not None:
284 if length_multiplier != 1:
285 raise ValueError("Cannot use length multiplier with mask")
286 if isinstance(inclusion_mask, str):
287 inclusion_intervals = stdpopsim.utils.read_bed(
288 inclusion_mask, chromosome
289 )
290 else:
291 inclusion_intervals = inclusion_mask
292 if exclusion_mask is not None:
293 if length_multiplier != 1:
294 raise ValueError("Cannot use length multiplier with mask")
295 if isinstance(exclusion_mask, str):
296 exclusion_intervals = stdpopsim.utils.read_bed(
297 exclusion_mask, chromosome
298 )
299 else:
300 exclusion_intervals = exclusion_mask
302 if mutation_rate is None:
303 mutation_rate = chrom.mutation_rate
305 contig = stdpopsim.Contig(
306 recombination_map=recomb_map,
307 mutation_rate=mutation_rate,
308 genetic_map=gm,
309 inclusion_mask=inclusion_intervals,
310 exclusion_mask=exclusion_intervals,
311 )
313 return contig
315 @property
316 def length(self):
317 return self.recombination_map.sequence_length
319 def dfe_breakpoints(self):
320 """
321 Returns two things: the sorted vector of endpoints of all intervals across
322 all DFEs in the contig, and a vector of integer labels for these intervals,
323 saying which DFE goes with which interval.
324 This provides a complementary method to tell which bit of the contig
325 has which DFE attached, which may be more convenient than the list of two-column
326 arrays provided by interval_list.
328 Suppose there are n+1 unique interval endpoints across all the DFE intervals
329 in :attr:`.interval_list`. (If the intervals in that list
330 cover the whole genome, the number of intervals is n.)
331 This method returns a tuple of two things: ``breaks, dfe_labels``.
332 "breaks" is the array containing those n+1 unique endpoints, in increasing order,
333 and "dfe" is the array of length n containing the index of the DFE
334 the applies to that interval. So, ``breaks[0]`` is always 0, and
335 ``breaks[n+1]`` is always the length of the contig, and
336 ``dfe_labels[k] = j`` if
337 ``[breaks[k], breaks[k+1]]`` is an interval in ``contig.interval_list[j]``,
338 i.e., if ``contig.dfe_list[j]`` applies to the interval starting at
339 ``breaks[k]``. Some intervals may not be covered by a DFE, in which
340 case they will have the label ``-1`` (beware of python indexing!).
342 :return: A tuple (breaks, dfe_labels).
343 """
344 breaks = np.unique(
345 np.vstack(self.interval_list + [[[0, int(self.length)]]]) # also sorted
346 )
347 dfe_labels = np.full(len(breaks) - 1, -1, dtype="int")
348 for j, intervals in enumerate(self.interval_list):
349 dfe_labels[np.isin(breaks[:-1], intervals[:, 0], assume_unique=True)] = j
350 return breaks, dfe_labels
352 def clear_dfes(self):
353 """
354 Removes all DFEs from the contig (as well as the corresponding list of
355 intervals).
356 """
357 self.dfe_list = []
358 self.interval_list = []
360 def add_dfe(self, intervals, DFE):
361 """
362 Adds the provided DFE to the contig, applying to the regions of the
363 contig specified in ``intervals``. These intervals are also *removed*
364 from the intervals of any previously-present DFEs - in other words,
365 more recently-added DFEs take precedence. Each DFE added in this way
366 carries its own MutationTypes: no mutation types are shared between
367 DFEs added by different calls to ``add_dfe( )``, even if the same DFE
368 object is added more than once.
370 For instance, if we do
371 ```
372 a1 = np.array([[0, 100]])
373 a2 = np.array([[50, 120]])
374 contig.add_dfe(a1, dfe1)
375 contig.add_dfe(a2, dfe2)
376 ```
377 then ``dfe1`` applies to the region from 0 to 50 and ``dfe2`` applies to the
378 region from 50 to 120.
380 :param array intervals: A valid set of intervals.
381 :param DFE dfe: A DFE object.
382 """
383 stdpopsim.utils._check_intervals_validity(intervals)
384 for j, ints in enumerate(self.interval_list):
385 self.interval_list[j] = stdpopsim.utils.mask_intervals(ints, intervals)
386 self.dfe_list.append(DFE)
387 self.interval_list.append(intervals)
389 @property
390 def is_neutral(self):
391 """
392 Returns True if the contig has no non-neutral mutation types.
393 """
394 return all(mt.is_neutral for d in self.dfe_list for mt in d.mutation_types)
396 def mutation_types(self):
397 """
398 Provides information about the MutationTypes assigned to this Contig,
399 along with information about which DFE they correspond to. This is
400 useful because when simulating with SLiM, the mutation types are
401 assigned numeric IDs in the order provided here (e.g., in the order
402 encountered when iterating over the DFEs); this method provides an easy
403 way to map back from their numeric ID to the DFE that each MutationType
404 corresponds to.
406 This method returns a list of dictionaries of length equal to the
407 number of MutationTypes in all DFEs of the contig, each dictionary
408 containing three things: ``"dfe_id"``: the ID of the DFE this
409 MutationType comes from; ``mutation_type``: the mutation type; and
410 ``id``: the index in the list.
412 For instance, if ``muts`` is a list of mutation objects in a SLiM tree
413 sequence, then the following code will print the IDs of the DFEs that
414 each comes from:
416 .. code-block:: python
418 mut_types = contig.mutation_types()
419 for m in muts:
420 dfe_ids = [mut_types[k]["dfe_id"] for md in m.metadata["muation_list"]]
421 print(f"Mutation {m.id} has mutations from DFE(s) {','.join(dfe_ids)")
423 """
424 id = 0
425 mut_types = []
426 for d in self.dfe_list:
427 for mt in d.mutation_types:
428 mut_types.append(
429 {
430 "dfe_id": d.id,
431 "mutation_type": mt,
432 "id": id,
433 }
434 )
435 id += 1
436 return mut_types
438 def __str__(self):
439 gmap = "None" if self.genetic_map is None else self.genetic_map.id
440 s = (
441 "Contig(length={:.2G}, recombination_rate={:.2G}, "
442 "mutation_rate={:.2G}, genetic_map={}, dfe_list={}, "
443 "interval_list={})"
444 ).format(
445 self.length,
446 self.recombination_map.mean_rate,
447 self.mutation_rate,
448 gmap,
449 self.dfe_list,
450 self.interval_list,
451 )
452 return s