Source code for lasif.window_selection

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Window selection algorithm.

This module aims to provide a window selection algorithm suitable for
calculating phase misfits between two seismic waveforms.

The main function is the select_windows() function. The selection process is a
multi-stage process. Initially all time steps are considered to be valid in
the sense as being suitable for window selection. Then a number of selectors
is applied, progressively excluding more and more time steps.

:copyright:
    Lion Krischer (krischer@geophysik.uni-muenchen.de), 2013

:license:
    GNU General Public License, Version 3
    (http://www.gnu.org/copyleft/gpl.html)
"""
import itertools
import math

import numpy as np
from obspy import geodetics
import obspy.signal.filter
from scipy.signal import argrelextrema


[docs]def flatnotmasked_contiguous(time_windows): """ Helper function enabling to loop over empty time windows. """ fc = np.ma.flatnotmasked_contiguous(time_windows) # If nothing could be found, set the mask to true (which should already # be the case). if fc is None: return [] else: return fc
[docs]def find_local_extrema(data): """ Function finding local extrema. It can also deal with flat extrema, e.g. a flat top or bottom. In that case the first index of all flat values will be returned. Returns a tuple of maxima and minima indices. """ length = len(data) - 1 diff = np.diff(data) flats = np.argwhere(diff == 0) # Discard neighbouring flat points. new_flats = list(flats[0:1]) for i, j in zip(flats[:-1], flats[1:]): if j - i == 1: continue new_flats.append(j) flats = new_flats maxima = [] minima = [] # Go over each flats position and check if its a maxima/minima. for idx in flats: l_type = "left" r_type = "right" for i in itertools.count(): this_idx = idx - i - 1 if diff[this_idx] < 0: l_type = "minima" break elif diff[this_idx] > 0: l_type = "maxima" break for i in itertools.count(): this_idx = idx + i + 1 if this_idx >= len(diff): break if diff[this_idx] < 0: r_type = "maxima" break elif diff[this_idx] > 0: r_type = "minima" break if r_type != l_type: continue if r_type == "maxima": maxima.append(int(idx)) else: minima.append(int(idx)) maxs = set(list(argrelextrema(data, np.greater)[0])) mins = set(list(argrelextrema(data, np.less)[0])) peaks, troughs = ( sorted(list(maxs.union(set(maxima)))), sorted(list(mins.union(set(minima))))) # Special case handling for missing one or the other. if not peaks and not troughs: return np.array([], dtype=np.int32), np.array([], dtype=np.int32) elif not peaks: if 0 not in troughs: peaks.insert(0, 0) if length not in troughs: peaks.append(length) return (np.array(peaks, dtype=np.int32), np.array(troughs, dtype=np.int32)) elif not troughs: if 0 not in peaks: troughs.insert(0, 0) if length not in peaks: troughs.append(length) return (np.array(peaks, dtype=np.int32), np.array(troughs, dtype=np.int32)) # Mark the first and last values as well to facilitate the peak and # trough marching algorithm if 0 not in peaks and 0 not in troughs: if peaks[0] < troughs[0]: troughs.insert(0, 0) else: peaks.insert(0, 0) if length not in peaks and length not in troughs: if peaks[-1] < troughs[-1]: peaks.append(length) else: troughs.append(length) return (np.array(peaks, dtype=np.int32), np.array(troughs, dtype=np.int32))
[docs]def find_closest(ref_array, target): """ For every value in target, find the index of ref_array to which the value is closest. from http://stackoverflow.com/a/8929827/1657047 :param ref_array: The reference array. Must be sorted! :type ref_array: :class:`numpy.ndarray` :param target: The target array. :type target: :class:`numpy.ndarray` >>> ref_array = np.arange(0, 20.) >>> target = np.array([-2, 100., 2., 2.4, 2.5, 2.6]) >>> find_closest(ref_array, target) array([ 0, 19, 2, 2, 3, 3]) """ # A must be sorted idx = ref_array.searchsorted(target) idx = np.clip(idx, 1, len(ref_array) - 1) left = ref_array[idx - 1] right = ref_array[idx] idx -= target - left < right - target return idx
def _plot_mask(new_mask, old_mask, name=None): """ Helper function plotting the remaining time segments after an elimination stage. Useful to figure out which stage is responsible for a certain window being picked/rejected. :param new_mask: The mask after the elimination stage. :param old_mask: The mask before the elimination stage. :param name: The name of the elimination stage. :return: """ # Lazy imports as not needed by default. import matplotlib.pylab as plt # NOQA import matplotlib.patheffects as PathEffects # NOQA old_mask = old_mask.copy() new_mask = new_mask.copy() new_mask.mask = np.bitwise_xor(old_mask.mask, new_mask.mask) old_mask.mask = np.invert(old_mask.mask) for i in flatnotmasked_contiguous(old_mask): plt.fill_between((i.start, i.stop), (-1.0, -1.0), (2.0, 2.0), color="gray", alpha=0.3, lw=0) new_mask.mask = np.invert(new_mask.mask) for i in flatnotmasked_contiguous(new_mask): plt.fill_between((i.start, i.stop), (-1.0, -1.0), (2.0, 2.0), color="#fb9a99", lw=0) if name: plt.text(len(new_mask) - 1 - 20, 0.5, name, verticalalignment="center", horizontalalignment="right", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")], fontweight=500) plt.xlim(0, len(new_mask) - 1) plt.ylim(0, 1) plt.yticks([]) plt.gca().xaxis.set_ticklabels([]) def _window_generator(data_length, window_width): """ Simple generator yielding start and stop indices for sliding windows. :param data_length: The complete length of the data series over which to slide the window. :param window_width: The desired window width. """ window_start = 0 while True: window_end = window_start + window_width if window_end > data_length: break yield (window_start, window_end, window_start + window_width // 2) window_start += 1 def _log_window_selection(tr_id, msg): """ Helper function for consistent output during the window selection. :param tr_id: The id of the current trace. :param msg: The message to be printed. """ print "[Window selection for %s] %s" % (tr_id, msg) # Dictionary to cache the TauPyModel so there is no need to reinitialize it # each time which is a fairly expensive operation. TAUPY_MODEL_CACHE = {}
[docs]def select_windows(data_trace, synthetic_trace, event_latitude, event_longitude, event_depth_in_km, station_latitude, station_longitude, minimum_period, maximum_period, min_cc=0.10, max_noise=0.10, max_noise_window=0.4, min_velocity=2.4, threshold_shift=0.30, threshold_correlation=0.75, min_length_period=1.5, min_peaks_troughs=2, max_energy_ratio=10.0, min_envelope_similarity=0.2, verbose=False, plot=False): """ Window selection algorithm for picking windows suitable for misfit calculation based on phase differences. Returns a list of windows which might be empty due to various reasons. This function is really long and a lot of things. For a more detailed description, please see the LASIF paper. :param data_trace: The data trace. :type data_trace: :class:`~obspy.core.trace.Trace` :param synthetic_trace: The synthetic trace. :type synthetic_trace: :class:`~obspy.core.trace.Trace` :param event_latitude: The event latitude. :type event_latitude: float :param event_longitude: The event longitude. :type event_longitude: float :param event_depth_in_km: The event depth in km. :type event_depth_in_km: float :param station_latitude: The station latitude. :type station_latitude: float :param station_longitude: The station longitude. :type station_longitude: float :param minimum_period: The minimum period of the data in seconds. :type minimum_period: float :param maximum_period: The maximum period of the data in seconds. :type maximum_period: float :param min_cc: Minimum normalised correlation coefficient of the complete traces. :type min_cc: float :param max_noise: Maximum relative noise level for the whole trace. Measured from maximum amplitudes before and after the first arrival. :type max_noise: float :param max_noise_window: Maximum relative noise level for individual windows. :type max_noise_window: float :param min_velocity: All arrivals later than those corresponding to the threshold velocity [km/s] will be excluded. :type min_velocity: float :param threshold_shift: Maximum allowable time shift within a window, as a fraction of the minimum period. :type threshold_shift: float :param threshold_correlation: Minimum normalised correlation coeeficient within a window. :type threshold_correlation: float :param min_length_period: Minimum length of the time windows relative to the minimum period. :type min_length_period: float :param min_peaks_troughs: Minimum number of extrema in an individual time window (excluding the edges). :type min_peaks_troughs: float :param max_energy_ratio: Maximum energy ratio between data and synthetics within a time window. Don't make this too small! :type max_energy_ratio: float :param min_envelope_similarity: The minimum similarity of the envelopes of both data and synthetics. This essentially assures that the amplitudes of data and synthetics can not diverge too much within a window. It is a bit like the inverse of the ratio of both envelopes so a value of 0.2 makes sure neither amplitude can be more then 5 times larger than the other. :type min_envelope_similarity: float :param verbose: No output by default. :type verbose: bool :param plot: Create a plot of the algortihm while it does its work. :type plot: bool """ # Shortcuts to frequently accessed variables. data_starttime = data_trace.stats.starttime data_delta = data_trace.stats.delta dt = data_trace.stats.delta npts = data_trace.stats.npts synth = synthetic_trace.data data = data_trace.data times = data_trace.times() # Fill cache if necessary. if not TAUPY_MODEL_CACHE: from obspy.taup import TauPyModel # NOQA TAUPY_MODEL_CACHE["model"] = TauPyModel("AK135") model = TAUPY_MODEL_CACHE["model"] # ------------------------------------------------------------------------- # Geographical calculations and the time of the first arrival. # ------------------------------------------------------------------------- dist_in_deg = geodetics.locations2degrees(station_latitude, station_longitude, event_latitude, event_longitude) dist_in_km = geodetics.calc_vincenty_inverse( station_latitude, station_longitude, event_latitude, event_longitude)[0] / 1000.0 # Get only a couple of P phases which should be the first arrival # for every epicentral distance. Its quite a bit faster than calculating # the arrival times for every phase. # Assumes the first sample is the centroid time of the event. tts = model.get_travel_times(source_depth_in_km=event_depth_in_km, distance_in_degree=dist_in_deg, phase_list=["ttp"]) # Sort just as a safety measure. tts = sorted(tts, key=lambda x: x.time) first_tt_arrival = tts[0].time # ------------------------------------------------------------------------- # Window settings # ------------------------------------------------------------------------- # Number of samples in the sliding window. Currently, the length of the # window is set to a multiple of the dominant period of the synthetics. # Make sure it is an uneven number; just to have a trivial midpoint # definition and one sample does not matter much in any case. window_length = int(round(float(2 * minimum_period) / dt)) if not window_length % 2: window_length += 1 # Use a Hanning window. No particular reason for it but its a well-behaved # window and has nice spectral properties. taper = np.hanning(window_length) # ========================================================================= # check if whole seismograms are sufficiently correlated and estimate # noise level # ========================================================================= # Overall Correlation coefficient. norm = np.sqrt(np.sum(data ** 2)) * np.sqrt(np.sum(synth ** 2)) cc = np.sum(data * synth) / norm if verbose: _log_window_selection(data_trace.id, "Correlation Coefficient: %.4f" % cc) # Estimate noise level from waveforms prior to the first arrival. idx_end = int(np.ceil((first_tt_arrival - 0.5 * minimum_period) / dt)) idx_end = max(10, idx_end) idx_start = int(np.ceil((first_tt_arrival - 2.5 * minimum_period) / dt)) idx_start = max(10, idx_start) if idx_start >= idx_end: idx_start = max(0, idx_end - 10) abs_data = np.abs(data) noise_absolute = abs_data[idx_start:idx_end].max() noise_relative = noise_absolute / abs_data.max() if verbose: _log_window_selection(data_trace.id, "Absolute Noise Level: %e" % noise_absolute) _log_window_selection(data_trace.id, "Relative Noise Level: %e" % noise_relative) # Basic global rejection criteria. accept_traces = True if (cc < min_cc) and (noise_relative > max_noise / 3.0): msg = "Correlation %.4f is below threshold of %.4f" % (cc, min_cc) if verbose: _log_window_selection(data_trace.id, msg) accept_traces = msg if noise_relative > max_noise: msg = "Noise level %.3f is above threshold of %.3f" % ( noise_relative, max_noise) if verbose: _log_window_selection( data_trace.id, msg) accept_traces = msg # Calculate the envelope of both data and synthetics. This is to make sure # that the amplitude of both is not too different over time and is # used as another selector. Only calculated if the trace is generally # accepted as it is fairly slow. if accept_traces is True: data_env = obspy.signal.filter.envelope(data) synth_env = obspy.signal.filter.envelope(synth) # ------------------------------------------------------------------------- # Initial Plot setup. # ------------------------------------------------------------------------- # All the plot calls are interleaved. I realize this is really ugly but # the alternative would be to either have two functions (one with plots, # one without) or split the plotting function in various subfunctions, # neither of which are acceptable in my opinion. The impact on # performance is minimal if plotting is turned off: all imports are lazy # and a couple of conditionals are cheap. if plot: import matplotlib.pylab as plt # NOQA import matplotlib.patheffects as PathEffects # NOQA if accept_traces is True: plt.figure(figsize=(18, 12)) plt.subplots_adjust(left=0.05, bottom=0.05, right=0.98, top=0.95, wspace=None, hspace=0.0) grid = (31, 1) # Axes showing the data. data_plot = plt.subplot2grid(grid, (0, 0), rowspan=8) else: # Only show one axes it the traces are not accepted. plt.figure(figsize=(18, 3)) # Plot envelopes if needed. if accept_traces is True: plt.plot(times, data_env, color="black", alpha=0.5, lw=0.4, label="data envelope") plt.plot(synthetic_trace.times(), synth_env, color="#e41a1c", alpha=0.4, lw=0.5, label="synthetics envelope") plt.plot(times, data, color="black", label="data", lw=1.5) plt.plot(synthetic_trace.times(), synth, color="#e41a1c", label="synthetics", lw=1.5) # Symmetric around y axis. middle = data.mean() d_max, d_min = data.max(), data.min() r = max(d_max - middle, middle - d_min) * 1.1 ylim = (middle - r, middle + r) xlim = (times[0], times[-1]) plt.ylim(*ylim) plt.xlim(*xlim) offset = (xlim[1] - xlim[0]) * 0.005 plt.vlines(first_tt_arrival, ylim[0], ylim[1], colors="#ff7f00", lw=2) plt.text(first_tt_arrival + offset, ylim[1] - (ylim[1] - ylim[0]) * 0.02, "first arrival", verticalalignment="top", horizontalalignment="left", color="#ee6e00", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) plt.vlines(first_tt_arrival - minimum_period / 2.0, ylim[0], ylim[1], colors="#ff7f00", lw=2) plt.text(first_tt_arrival - minimum_period / 2.0 - offset, ylim[0] + (ylim[1] - ylim[0]) * 0.02, "first arrival - min period / 2", verticalalignment="bottom", horizontalalignment="right", color="#ee6e00", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) for velocity in [6, 5, 4, 3, min_velocity]: tt = dist_in_km / velocity plt.vlines(tt, ylim[0], ylim[1], colors="gray", lw=2) if velocity == min_velocity: hal = "right" o_s = -1.0 * offset else: hal = "left" o_s = offset plt.text(tt + o_s, ylim[0] + (ylim[1] - ylim[0]) * 0.02, str(velocity) + " km/s", verticalalignment="bottom", horizontalalignment=hal, color="0.15") plt.vlines(dist_in_km / min_velocity + minimum_period / 2.0, ylim[0], ylim[1], colors="gray", lw=2) plt.text(dist_in_km / min_velocity + minimum_period / 2.0 - offset, ylim[1] - (ylim[1] - ylim[0]) * 0.02, "min surface velocity + min period / 2", verticalalignment="top", horizontalalignment="right", color="0.15", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) plt.hlines(noise_absolute, xlim[0], xlim[1], linestyle="--", color="gray") plt.hlines(-noise_absolute, xlim[0], xlim[1], linestyle="--", color="gray") plt.text(offset, noise_absolute + (ylim[1] - ylim[0]) * 0.01, "noise level", verticalalignment="bottom", horizontalalignment="left", color="0.15", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) plt.legend(loc="lower right", fancybox=True, framealpha=0.5, fontsize="small") plt.gca().xaxis.set_ticklabels([]) # Plot the basic global information. ax = plt.gca() txt = ( "Total CC Coeff: %.4f\nAbsolute Noise: %e\nRelative Noise: %.3f" % (cc, noise_absolute, noise_relative)) ax.text(0.01, 0.95, txt, transform=ax.transAxes, fontdict=dict(fontsize="small", ha='left', va='top'), bbox=dict(boxstyle="round", fc="w", alpha=0.8)) plt.suptitle("Channel %s" % data_trace.id, fontsize="larger") # Show plot and return if not accepted. if accept_traces is not True: txt = "Rejected: %s" % (accept_traces) ax.text(0.99, 0.95, txt, transform=ax.transAxes, fontdict=dict(fontsize="small", ha='right', va='top'), bbox=dict(boxstyle="round", fc="red", alpha=1.0)) plt.show() if accept_traces is not True: return [] # Initialise masked arrays. The mask will be set to True where no # windows are chosen. time_windows = np.ma.ones(npts) time_windows.mask = False if plot: old_time_windows = time_windows.copy() # Elimination Stage 1: Eliminate everything half a period before or # after the minimum and maximum travel times, respectively. # theoretical arrival as positive. min_idx = int((first_tt_arrival - (minimum_period / 2.0)) / dt) max_idx = int(math.ceil(( dist_in_km / min_velocity + minimum_period / 2.0) / dt)) time_windows.mask[:min_idx + 1] = True time_windows.mask[max_idx:] = True if plot: plt.subplot2grid(grid, (8, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="TRAVELTIME ELIMINATION") old_time_windows = time_windows.copy() # ------------------------------------------------------------------------- # Compute sliding time shifts and correlation coefficients for time # frames that passed the traveltime elimination stage. # ------------------------------------------------------------------------- # Allocate arrays to collect the time dependent values. sliding_time_shift = np.ma.zeros(npts, dtype="float32") sliding_time_shift.mask = True max_cc_coeff = np.ma.zeros(npts, dtype="float32") max_cc_coeff.mask = True for start_idx, end_idx, midpoint_idx in _window_generator(npts, window_length): if not min_idx < midpoint_idx < max_idx: continue # Slice windows. Create a copy to be able to taper without affecting # the original time series. data_window = data[start_idx: end_idx].copy() * taper synthetic_window = \ synth[start_idx: end_idx].copy() * taper # Elimination Stage 2: Skip windows that have essentially no energy # to avoid instabilities. No windows can be picked in these. if synthetic_window.ptp() < synth.ptp() * 0.001: time_windows.mask[midpoint_idx] = True continue # Calculate the time shift. Here this is defined as the shift of the # synthetics relative to the data. So a value of 2, for instance, means # that the synthetics are 2 timesteps later then the data. cc = np.correlate(data_window, synthetic_window, mode="full") time_shift = cc.argmax() - window_length + 1 # Express the time shift in fraction of the minimum period. sliding_time_shift[midpoint_idx] = (time_shift * dt) / minimum_period # Normalized cross correlation. max_cc_value = cc.max() / np.sqrt((synthetic_window ** 2).sum() * (data_window ** 2).sum()) max_cc_coeff[midpoint_idx] = max_cc_value if plot: plt.subplot2grid(grid, (9, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="NO ENERGY IN CC WINDOW") # Axes with the CC coeffs plt.subplot2grid(grid, (15, 0), rowspan=4) plt.hlines(0, xlim[0], xlim[1], color="lightgray") plt.hlines(-threshold_shift, xlim[0], xlim[1], color="gray", linestyle="--") plt.hlines(threshold_shift, xlim[0], xlim[1], color="gray", linestyle="--") plt.text(5, -threshold_shift - (2) * 0.03, "threshold", verticalalignment="top", horizontalalignment="left", color="0.15", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) plt.plot(times, sliding_time_shift, color="#377eb8", label="Time shift in fraction of minimum period", lw=1.5) ylim = plt.ylim() plt.yticks([-0.75, 0, 0.75]) plt.xticks([300, 600, 900, 1200, 1500, 1800]) plt.ylim(ylim[0], ylim[1] + ylim[1] - ylim[0]) plt.ylim(-1.0, 1.0) plt.xlim(xlim) plt.gca().xaxis.set_ticklabels([]) plt.legend(loc="lower right", fancybox=True, framealpha=0.5, fontsize="small") plt.subplot2grid(grid, (10, 0), rowspan=4) plt.hlines(threshold_correlation, xlim[0], xlim[1], color="0.15", linestyle="--") plt.hlines(1, xlim[0], xlim[1], color="lightgray") plt.hlines(0, xlim[0], xlim[1], color="lightgray") plt.text(5, threshold_correlation + (1.4) * 0.01, "threshold", verticalalignment="bottom", horizontalalignment="left", color="0.15", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) plt.plot(times, max_cc_coeff, color="#4daf4a", label="Maximum CC coefficient", lw=1.5) plt.ylim(-0.2, 1.2) plt.yticks([0, 0.5, 1]) plt.xticks([300, 600, 900, 1200, 1500, 1800]) plt.xlim(xlim) plt.gca().xaxis.set_ticklabels([]) plt.legend(loc="lower right", fancybox=True, framealpha=0.5, fontsize="small") # Elimination Stage 3: Mark all areas where the normalized cross # correlation coefficient is under threshold_correlation as negative if plot: old_time_windows = time_windows.copy() time_windows.mask[max_cc_coeff < threshold_correlation] = True if plot: plt.subplot2grid(grid, (14, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="CORRELATION COEFF THRESHOLD ELIMINATION") # Elimination Stage 4: Mark everything with an absolute travel time # shift of more than # threshold_shift times the dominant period as # negative if plot: old_time_windows = time_windows.copy() time_windows.mask[np.ma.abs(sliding_time_shift) > threshold_shift] = True if plot: plt.subplot2grid(grid, (19, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="TIME SHIFT THRESHOLD ELIMINATION") # Elimination Stage 5: Mark the area around every "travel time shift # jump" (based on the traveltime time difference) negative. The width of # the area is currently chosen to be a tenth of a dominant period to # each side. if plot: old_time_windows = time_windows.copy() sample_buffer = int(np.ceil(minimum_period / dt * 0.1)) indices = np.ma.where(np.ma.abs(np.ma.diff(sliding_time_shift)) > 0.1)[0] for index in indices: time_windows.mask[index - sample_buffer: index + sample_buffer] = True if plot: plt.subplot2grid(grid, (20, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="TIME SHIFT JUMPS ELIMINATION") # Clip both to avoid large numbers by division. stacked = np.vstack([ np.ma.clip(synth_env, synth_env.max() * min_envelope_similarity * 0.5, synth_env.max()), np.ma.clip(data_env, data_env.max() * min_envelope_similarity * 0.5, data_env.max())]) # Ratio. ratio = stacked.min(axis=0) / stacked.max(axis=0) # Elimination Stage 6: Make sure the amplitudes of both don't vary too # much. if plot: old_time_windows = time_windows.copy() time_windows.mask[ratio < min_envelope_similarity] = True if plot: plt.subplot2grid(grid, (25, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="ENVELOPE AMPLITUDE SIMILARITY ELIMINATION") if plot: plt.subplot2grid(grid, (21, 0), rowspan=4) plt.hlines(min_envelope_similarity, xlim[0], xlim[1], color="gray", linestyle="--") plt.text(5, min_envelope_similarity + (2) * 0.03, "threshold", verticalalignment="bottom", horizontalalignment="left", color="0.15", path_effects=[ PathEffects.withStroke(linewidth=3, foreground="white")]) plt.plot(times, ratio, color="#9B59B6", label="Envelope amplitude similarity", lw=1.5) plt.yticks([0, 0.2, 0.4, 0.6, 0.8, 1.0]) plt.ylim(0.05, 1.05) plt.xticks([300, 600, 900, 1200, 1500, 1800]) plt.xlim(xlim) plt.gca().xaxis.set_ticklabels([]) plt.legend(loc="lower right", fancybox=True, framealpha=0.5, fontsize="small") # First minimum window length elimination stage. This is cheap and if # not done it can easily destabilize the peak-and-trough marching stage # which would then have to deal with way more edge cases. if plot: old_time_windows = time_windows.copy() min_length = \ min(minimum_period / dt * min_length_period, maximum_period / dt) for i in flatnotmasked_contiguous(time_windows): # Step 7: Throw away all windows with a length of less then # min_length_period the dominant period. if (i.stop - i.start) < min_length: time_windows.mask[i.start: i.stop] = True if plot: plt.subplot2grid(grid, (26, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="MINIMUM WINDOW LENGTH ELIMINATION 1") # ------------------------------------------------------------------------- # Peak and trough marching algorithm # ------------------------------------------------------------------------- final_windows = [] for i in flatnotmasked_contiguous(time_windows): # Cut respective windows. window_npts = i.stop - i.start synthetic_window = synth[i.start: i.stop] data_window = data[i.start: i.stop] # Find extrema in the data and the synthetics. data_p, data_t = find_local_extrema(data_window) synth_p, synth_t = find_local_extrema(synthetic_window) window_mask = np.ones(window_npts, dtype="bool") closest_peaks = find_closest(data_p, synth_p) diffs = np.diff(closest_peaks) for idx in np.where(diffs == 1)[0]: if idx > 0: start = synth_p[idx - 1] else: start = 0 if idx < (len(synth_p) - 1): end = synth_p[idx + 1] else: end = -1 window_mask[start: end] = False closest_troughs = find_closest(data_t, synth_t) diffs = np.diff(closest_troughs) for idx in np.where(diffs == 1)[0]: if idx > 0: start = synth_t[idx - 1] else: start = 0 if idx < (len(synth_t) - 1): end = synth_t[idx + 1] else: end = -1 window_mask[start: end] = False window_mask = np.ma.masked_array(window_mask, mask=window_mask) if window_mask.mask.all(): continue for j in flatnotmasked_contiguous(window_mask): final_windows.append((i.start + j.start, i.start + j.stop)) if plot: old_time_windows = time_windows.copy() time_windows.mask[:] = True for start, stop in final_windows: time_windows.mask[start:stop] = False if plot: plt.subplot2grid(grid, (27, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="PEAK AND TROUGH MARCHING ELIMINATION") # Loop through all the time windows, remove windows not satisfying the # minimum number of peaks and troughs per window. Acts mainly as a # safety guard. old_time_windows = time_windows.copy() for i in flatnotmasked_contiguous(old_time_windows): synthetic_window = synth[i.start: i.stop] data_window = data[i.start: i.stop] data_p, data_t = find_local_extrema(data_window) synth_p, synth_t = find_local_extrema(synthetic_window) if np.min([len(synth_p), len(synth_t), len(data_p), len(data_t)]) < \ min_peaks_troughs: time_windows.mask[i.start: i.stop] = True if plot: plt.subplot2grid(grid, (28, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="PEAK/TROUGH COUNT ELIMINATION") # Second minimum window length elimination stage. if plot: old_time_windows = time_windows.copy() min_length = \ min(minimum_period / dt * min_length_period, maximum_period / dt) for i in flatnotmasked_contiguous(time_windows): # Step 7: Throw away all windows with a length of less then # min_length_period the dominant period. if (i.stop - i.start) < min_length: time_windows.mask[i.start: i.stop] = True if plot: plt.subplot2grid(grid, (29, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="MINIMUM WINDOW LENGTH ELIMINATION 2") # Final step, eliminating windows with little energy. final_windows = [] for j in flatnotmasked_contiguous(time_windows): # Again assert a certain minimal length. if (j.stop - j.start) < min_length: continue # Compare the energy in the data window and the synthetic window. data_energy = (data[j.start: j.stop] ** 2).sum() synth_energy = (synth[j.start: j.stop] ** 2).sum() energies = sorted([data_energy, synth_energy]) if energies[1] > max_energy_ratio * energies[0]: if verbose: _log_window_selection( data_trace.id, "Deselecting window due to energy ratio between " "data and synthetics.") continue # Check that amplitudes in the data are above the noise if noise_absolute / data[j.start: j.stop].ptp() > \ max_noise_window: if verbose: _log_window_selection( data_trace.id, "Deselecting window due having no amplitude above the " "signal to noise ratio.") final_windows.append((j.start, j.stop)) if plot: old_time_windows = time_windows.copy() time_windows.mask[:] = True for start, stop in final_windows: time_windows.mask[start:stop] = False if plot: plt.subplot2grid(grid, (30, 0), rowspan=1) _plot_mask(time_windows, old_time_windows, name="LITTLE ENERGY ELIMINATION") if verbose: _log_window_selection( data_trace.id, "Done, Selected %i window(s)" % len(final_windows)) # Final step is to convert the index value windows to actual times. windows = [] for start, stop in final_windows: start = data_starttime + start * data_delta stop = data_starttime + stop * data_delta windows.append((start, stop)) if plot: # Plot the final windows to the data axes. import matplotlib.transforms as mtransforms # NOQA ax = data_plot trans = mtransforms.blended_transform_factory(ax.transData, ax.transAxes) for start, stop in final_windows: ax.fill_between([start * data_delta, stop * data_delta], 0, 1, facecolor="#CDDC39", alpha=0.5, transform=trans) plt.show() return windows
if __name__ == '__main__': import doctest doctest.testmod()