Source code for mtspec.multitaper

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Main functions of mtspec.

:copyright:
    Lion Krischer (krischer@geophysik.uni-muenchen.de) and
    Moritz Beyreuther, 2010-2016
:license:
    GNU General Public License, Version 3
    (http://www.gnu.org/copyleft/gpl.html)
"""
import ctypes as C
import numpy as np

from .util import _load_lib


mtspeclib = _load_lib()


[docs]def mtspec(data, delta, time_bandwidth, nfft=None, number_of_tapers=None, quadratic=False, adaptive=True, verbose=False, optional_output=False, statistics=False, rshape=False, fcrit=False): """ Wrapper method for the mtspec subroutine in the library by German A. Prieto. This method estimates the adaptive weighted multitaper spectrum, as in Thomson 1982. This is done by estimating the DPSS (discrete prolate spheroidal sequences), multiplying each of the tapers with the data series, take the FFT, and using the adaptive scheme for a better estimation. It outputs the power spectral density (PSD). :param data: :class:`numpy.ndarray` Array with the data. :param delta: float Sample spacing of the data. :param time_bandwidth: float Time-bandwidth product. Common values are 2, 3, 4 and numbers in between. :param nfft: int Number of points for fft. If nfft == None, no zero padding will be applied before the fft :param number_of_tapers: integer, optional Number of tapers to use. Defaults to int(2*time_bandwidth) - 1. This is maximum senseful amount. More tapers will have no great influence on the final spectrum but increase the calculation time. Use fewer tapers for a faster calculation. :param quadratic: bool, optional Whether or not to caluclate a quadratic multitaper. Will only work if nfft is False or equal to the sample count. The nfft parameter will overwrite the quadratic paramter. Defaults to False. :param adaptive: bool, optional Whether to use adaptive or constant weighting of the eigenspectra. Defaults to True(adaptive). :param verbose: bool, optional Passed to the fortran library. Defaults to False. :param optional_output: bool, optional Calculates and returns additional output parameters. See the notes in the docstring for further details. :param statistics: bool, optional Calculates and returns statistics. See the notes in the docstring for further details. :param rshape: integer/None, optional Determines whether or not to perform the F-test for lines. If rshape is 1 or 2, then don't put the lines back. If rshape is 2 only check around 60 Hz. See the fortran source code for more informations. Defaults to None (do not perform the F-test). :param fcrit: float/None, optional The threshold probability for the F-test. If none is given, the mtspec library calculates a default value. See the fortran source code for details. Defaults to None. :return: Returns a list with :class:`numpy.ndarray`. See the note below. .. note:: This method will at return at least two arrays: The calculated spectrum and the corresponding frequencies. If optional_output is true it will also return (in the given order) (multidimensional) arrays containing the eigenspectra, the corresponding eigencoefficients and an array containing the weights for each eigenspectra normalized so that the sum of squares over the eigenspectra is one. If statistics is True is will also return (in the given order) (multidimensional) arrays containing the jackknife 5% and 95% confidence intervals, the F statistics for single line and the number of degrees of freedom for each frequency bin. If both optional_output and statistics are true, the optional_outputs will be returned before the statistics. """ npts = len(data) # Depending if nfft is specified or not initialte MtspecTytpe # for mtspec_pad_ or mtspec_d_ if nfft is None or nfft == npts: nfft = npts mt = _MtspecType("float64") # mtspec_d_ else: mt = _MtspecType("float32") # mtspec_pad_ quadratic = False # Use the optimal number of tapers in case no number is specified. if number_of_tapers is None: number_of_tapers = int(2 * time_bandwidth) - 1 # Transform the data to work with the library. data = np.require(data, dtype=mt.float, requirements=[mt.order]) # Get some information necessary for the call to the Fortran library. number_of_frequency_bins = int(nfft / 2) + 1 # Create output arrays. spectrum = mt.empty(number_of_frequency_bins) frequency_bins = mt.empty(number_of_frequency_bins) # Create optional outputs. if optional_output is True: eigenspectra = mt.empty((number_of_frequency_bins, number_of_tapers)) eigencoefficients = mt.empty((nfft, number_of_tapers), complex=True) weights = mt.empty((number_of_frequency_bins, number_of_tapers)) else: eigenspectra = eigencoefficients = weights = None # Create statistics. if statistics is True: jackknife_interval = mt.empty((number_of_frequency_bins, 2)) f_statistics = mt.empty(number_of_frequency_bins) degrees_of_freedom = mt.empty(number_of_frequency_bins) else: jackknife_interval = f_statistics = degrees_of_freedom = None # Verbose mode on or off. if verbose is True: verbose = C.byref(C.c_char('y')) else: verbose = None # Determine whether or not to compute the quadratic multitaper. if quadratic is True: quadratic = C.byref(C.c_int(1)) else: quadratic = None # Determine whether to use adaptive or constant weighting of the # eigenspectra. if adaptive is True: adaptive = None else: adaptive = C.byref(C.c_int(1)) # Determines whether or not to perform the F-test for lines. If rshape is 1 # or 2, then don't put the lines back. If rshape is 2 only check around 60 # Hz. See the fortran source code for more informations. if type(rshape) == int: rshape = C.byref(C.c_int(rshape)) else: rshape = None # The threshold probability for the F-test. If none is given, the mtspec # library calculates a default value. See the fortran source code for # details. if type(fcrit) == float: fcrit = C.byref(C.c_float(fcrit)) else: fcrit = None # Call the library. Fortran passes pointers! args = [C.byref(C.c_int(npts)), C.byref(C.c_int(nfft)), C.byref(mt.c_float(delta)), mt.p(data), C.byref(mt.c_float(time_bandwidth)), C.byref(C.c_int(number_of_tapers)), C.byref(C.c_int(number_of_frequency_bins)), mt.p(frequency_bins), mt.p(spectrum), verbose, quadratic, adaptive, mt.p(eigencoefficients), mt.p(weights), mt.p(jackknife_interval), mt.p(degrees_of_freedom), mt.p(eigenspectra), rshape, mt.p(f_statistics), fcrit, None] # diffrent arguments, depending on mtspec_pad_ or mtspec_d_, adapt if npts == nfft: args.pop(1) # finally call the shared library function mt.mtspec(*args) # Figure out what to return. See the docstring of this method for details. return_values = [spectrum, frequency_bins] if optional_output is True: return_values.extend([eigenspectra, eigencoefficients, weights]) if statistics is True: return_values.extend([jackknife_interval, f_statistics, degrees_of_freedom]) return return_values
[docs]def sine_psd(data, delta, number_of_tapers=None, number_of_iterations=2, degree_of_smoothing=1.0, statistics=False, verbose=False): """ Wrapper method for the sine_psd subroutine in the library by German A. Prieto. The subroutine is in charge of estimating the adaptive sine multitaper as in Riedel and Sidorenko (1995). It outputs the power spectral density (PSD). This is done by performing a MSE adaptive estimation. First a pilot spectral estimate is used, and S" is estimated, in order to get te number of tapers to use, using (13) of Riedel and Sidorenko for a min square error spectrum. Unlike the prolate spheroidal multitapers, the sine multitaper adaptive process introduces a variable resolution and error in the frequency domain. Complete error information is contained in the output variables as the corridor of 1-standard-deviation errors, and in the number of tapers used at each frequency. The errors are estimated in the simplest way, from the number of degrees of freedom (two per taper), not by jack-knifing. The frequency resolution is found from K*fN/Nf where fN is the Nyquist frequency and Nf is the number of frequencies estimated. The adaptive process used is as follows. A quadratic fit to the log PSD within an adaptively determined frequency band is used to find an estimate of the local second derivative of the spectrum. This is used in an equation like R & S (13) for the MSE taper number, with the difference that a parabolic weighting is applied with increasing taper order. Because the FFTs of the tapered series can be found by resampling the FFT of the original time series (doubled in length and padded with zeros) only one FFT is required per series, no matter how many tapers are used. This makes the program fast. Compared with the Thomson multitaper programs, this code is not only fast but simple and short. The spectra associated with the sine tapers are weighted before averaging with a parabolically varying weight. The expression for the optimal number of tapers given by R & S must be modified since it gives an unbounded result near points where S" vanishes, which happens at many points in most spectra. This program restricts the rate of growth of the number of tapers so that a neighboring covering interval estimate is never completely contained in the next such interval. This method SHOULD not be used for sharp cutoffs or deep valleys, or small sample sizes. Instead use Thomson multitaper in mtspec in this same library. :param data: :class:`numpy.ndarray` Array with the data. :param delta: float Sample spacing of the data. :param number_of_tapers: integer/None, optional Number of tapers to use. If none is given, the library will perform an adaptive taper estimation with a varying number of tapers for each frequency. Defaults to None. :param number_of_iterations: integer, optional Number of iterations to perform. Values less than 2 will be set to 2. Defaults to 2. :param degree_of_smoothing: float, optional Degree of smoothing. Defaults to 1.0. :param statistics: bool, optional Calculates and returns statistics. See the notes in the docstring for further details. :param verbose: bool, optional Passed to the fortran library. Defaults to False. :return: Returns a list with :class:`numpy.ndarray`. See the note below for details. .. note:: This method will at return at least two arrays: The calculated spectrum and the corresponding frequencies. If statistics is True is will also return (in the given order) (multidimensional) arrays containing the 1-std errors (a simple dof estimate) and the number of tapers used for each frequency point. """ # Verbose mode on or off. if verbose is True: verbose = C.byref(C.c_char('y')) else: verbose = None # Set the number of tapers so it can be read by the library. if number_of_tapers is None: number_of_tapers = 0 # initialize _MtspecType to save some space mt = _MtspecType("float32") # Transform the data to work with the library. data = np.require(data, dtype=mt.float, requirements=[mt.order]) # Some variables necessary to call the library. npts = len(data) number_of_frequency_bins = int(npts / 2) + 1 # Create output arrays. frequency_bins = mt.empty(number_of_frequency_bins) spectrum = mt.empty(number_of_frequency_bins) # Create optional arrays or set to None. if statistics is True: # here an exception, mt sets the type float32, here we need int32 # that is do all the type and POINTER definition once by hand tapers_per_freq_point = np.empty(number_of_frequency_bins, dtype='int32', order=mt.order) tapers_per_freq_point_p = \ tapers_per_freq_point.ctypes.data_as(C.POINTER(C.c_int)) errors = mt.empty((number_of_frequency_bins, 2)) else: tapers_per_freq_point_p = errors = None # Call the library. Fortran passes pointers! mtspeclib.sine_psd_( C.byref(C.c_int(npts)), C.byref(C.c_float(delta)), mt.p(data), C.byref(C.c_int(number_of_tapers)), C.byref(C.c_int(number_of_iterations)), C.byref(C.c_float(degree_of_smoothing)), C.byref(C.c_int(number_of_frequency_bins)), mt.p(frequency_bins), mt.p(spectrum), tapers_per_freq_point_p, mt.p(errors), verbose) # Calculate return values. return_values = [spectrum, frequency_bins] if statistics is True: return_values.extend([errors, tapers_per_freq_point]) return return_values
[docs]def dpss(npts, fw, number_of_tapers, auto_spline=True, npts_max=None): """ Calculates DPSS also known as Slepian sequences or Slepian tapers. Calculation of the DPSS (Discrete Prolate Spheroidal Sequences) and the correspondent eigenvalues. The (1 - eigenvalue) terms are also calculated. Wraps the ``dpss()`` subroutine from the Fortran library. By default this routine will use spline interpolation if sequences with more than 200.000 samples are requested. .. note:: The tapers are the eigenvectors of the tridiagonal matrix sigma(i, j) [see Slepian(1978) eq 14 and 25]. They are also the eigenvectors of the Toeplitz matrix, eq. 18. :param npts: The number of points in the series. :type npts: int :param fw: The time-bandwidth product (number of Rayleigh bins). :type fw: float :param number_of_tapers: The desired number of tapers. :type number_of_tapers: int :param auto_spline: Whether or not to automatically use spline interpolation for ``npts`` > 200000. :type auto_spline: bool :param npts_max: The number of actual points to calculate the DPSS. If this number is smaller than ``npts``, spline interpolation will be performed, regardless of the value of ``auto_spline``. :type npts_max: None or int :returns: ``(v, lambda, theta)`` with ``v(npts, number_of_tapers)`` the eigenvectors (tapers), ``lambda`` the eigenvalues of the ``v``'s and ``theta`` the 1 - ``lambda`` (energy outside the bandwidth) values. .. rubric:: Example This example demonstrates how to calculate and plot the first five DPSS'. >>> import matplotlib.pyplot as plt >>> from mtspec import dpss >>> tapers, lamb, theta = dpss(512, 2.5, 5) >>> for i in range(5): ... plt.plot(tapers[:, i]) .. plot :: # Same as the code snippet in the docstring, just a bit prettier. import matplotlib.pyplot as plt plt.style.use("ggplot") from mtspec import dpss tapers, lamb, theta = dpss(512, 2.5, 5) for i in range(5): plt.plot(tapers[:, i]) plt.xlim(0, 512) plt.ylim(-0.09, 0.09) plt.tight_layout() """ mt = _MtspecType("float64") v = mt.empty((npts, number_of_tapers)) lamb = mt.empty(number_of_tapers) theta = mt.empty(number_of_tapers) # Set auto_spline to True. if npts_max and npts_max < npts: auto_spline = True # Always set npts_max. else: npts_max = 200000 # Call either the spline routine or the normal routine. if auto_spline is True and npts > npts_max: mtspeclib.dpss_spline_( C.byref(C.c_int(npts_max)), C.byref(C.c_int(npts)), C.byref(C.c_double(fw)), C.byref(C.c_int(number_of_tapers)), mt.p(v), mt.p(lamb), mt.p(theta)) else: mtspeclib.dpss_(C.byref(C.c_int(npts)), C.byref(C.c_double(fw)), C.byref(C.c_int(number_of_tapers)), mt.p(v), mt.p(lamb), mt.p(theta)) return (v, lamb, theta)
[docs]def wigner_ville_spectrum(data, delta, time_bandwidth=3.5, number_of_tapers=None, smoothing_filter=None, filter_width=100, frequency_divider=1, verbose=False): """ Function to calculate the Wigner-Ville Distribution or Wigner-Ville Spectrum of a signal using multitaper spectral estimates. In general it gives better temporal and frequency resolution than a spectrogram but introduces many artifacts and possibly negative values which are not physical. This can be alleviated a bit by applying a smoothing kernel which is also known as a reduced interference distribution (RID). Wraps the ``wv_spec()`` and ``wv_spec_to_array()`` subroutines of the Fortran library. It is very slow for large arrays so try with a small one (< 5000 samples) first. :param data: The input signal. :type data: numpy.ndarray :param delta: The sampling interval of the data. :type delta: float :param time_bandwidth: Time bandwidth product for the tapers. :type time_bandwidth: float :param number_of_tapers: Number of tapers to use. If ``None``, the number will be automatically determined from the time bandwidth product which is usually the optimal choice. :type number_of_tapers: int :param smoothing_filter: One of ``"boxcar"``, ``"gauss"`` or just ``None`` :type smoothing_filter: str :param filter_width: Filter width in samples. :type filter_width: int :param frequency_divider: This method will always calculate all frequencies from 0 ... Nyquist frequency. This parameter allows the adjustment of the maximum frequency, so that the frequencies range from 0 .. Nyquist frequency / int(frequency_divider). :type frequency_divider: int :param verbose: Verbose output on/off. :type verbose: bool .. rubric:: Example This example demonstrates how to plot a signal, its multitaper spectral estimate, and its Wigner-Ville time-frequency distribution. The signal is sinusoidal overlaid with two simple linear chirps. >>> import matplotlib.pyplot as plt >>> import numpy as np >>> from mtspec import mtspec, wigner_ville_spectrum >>> from mtspec.util import signal_bursts >>> fig = plt.figure() Get the example signal. >>> data = signal_bursts() Plot the data on the top axes. >>> ax1 = fig.add_axes([0.2,0.75, 0.79, 0.23]) >>> ax1.plot(data, color="0.1") >>> ax1.set_xlim(0, len(data)) Plot its spectral estimate on the side. >>> ax2 = fig.add_axes([0.06,0.02,0.13,0.69]) >>> spec, freq = mtspec(data, 10, 3.5) >>> ax2.plot(spec, freq, color="0.1") >>> ax2.set_xlim(0, spec.max()) >>> ax2.set_ylim(freq[0], freq[-1]) >>> ax2.set_xticks([]) Create and plot the Wigner-Ville distribution. >>> wv = wigner_ville_spectrum(data, 10, 3.5, ... smoothing_filter='gauss') >>> ax3 = fig.add_axes([0.2, 0.02, 0.79, 0.69]) >>> ax3.set_yticks([]) >>> ax3.set_xticks([]) >>> # The square root only serves plotting purposes. >>> ax3.imshow(np.sqrt(abs(wv)), interpolation='lanczos', ... aspect='auto', cmap="magma") .. plot:: # Same as the above code snippet just a bit prettier. import matplotlib.pyplot as plt plt.style.use("ggplot") from mtspec import mtspec, wigner_ville_spectrum from mtspec.util import signal_bursts import numpy as np fig = plt.figure() data = signal_bursts() # Plot the data ax1 = fig.add_axes([0.2,0.75, 0.79, 0.23]) ax1.plot(data, color="0.1") ax1.set_xlim(0, len(data)) # Plot multitaper spectrum ax2 = fig.add_axes([0.06,0.02,0.13,0.69]) spec, freq = mtspec(data, 10, 3.5) ax2.plot(spec, freq, color="0.1") ax2.set_xlim(0, spec.max()) ax2.set_ylim(freq[0], freq[-1]) ax2.set_xticks([]) # Create the wigner ville spectrum wv = wigner_ville_spectrum(data, 10, 3.5, smoothing_filter='gauss') # Plot the WV ax3 = fig.add_axes([0.2, 0.02, 0.79, 0.69]) ax3.set_yticks([]) ax3.set_xticks([]) ax3.imshow(np.sqrt(abs(wv)), interpolation='lanczos', aspect='auto', cmap="magma") """ data = np.require(data, 'float32') mt = _MtspecType("float32") npts = len(data) # Use the optimal number of tapers in case no number is specified. if number_of_tapers is None: number_of_tapers = int(2 * time_bandwidth) - 1 # Determine filter. if not smoothing_filter: smoothing_filter = 0 elif smoothing_filter == 'boxcar': smoothing_filter = 1 elif smoothing_filter == 'gauss': smoothing_filter = 2 else: msg = 'Invalid value for smoothing filter.' raise Exception(msg) # Verbose mode on or off. if verbose: verbose = C.byref(C.c_char('y')) else: verbose = None # Allocate the output array # f90 code internally pads zeros to 2 * npts. That is we only return # every second frequency point, thus decrease the size of the array output = mt.empty((npts // 2 // int(frequency_divider) + 1, npts)) mtspeclib.wv_spec_to_array_(C.byref(C.c_int(npts)), C.byref(C.c_float(delta)), mt.p(data), mt.p(output), C.byref(C.c_float(time_bandwidth)), C.byref(C.c_int(number_of_tapers)), C.byref(C.c_int(smoothing_filter)), C.byref(C.c_float(filter_width)), C.byref(C.c_int(frequency_divider)), verbose) return output
[docs]def mt_coherence(df, xi, xj, tbp, kspec, nf, p, **kwargs): """ Construct the coherence spectrum from the yk's and the weights of the usual multitaper spectrum estimation. Note this code uses the real(4) multitaper code. INPUT :param df: float; sampling rate of time series :param xi: numpy.ndarray; data for first series :param xj: numpy.ndarray; data for second series :param tbp: float; the time-bandwidth product :param kspec: integer; number of tapers to use :param nf: integer; number of freq points in spectrum :param p: float; confidence for null hypothesis test, e.g. .95 OPTIONAL INPUT :param iadapt: integer 0 - adaptive, 1 - constant weights default adapt = 1 OPTIONAL OUTPUTS, the outputs are returned as dictionary, with keys as specified below and values as numpy.ndarrays. In order to activate the output set the corresponding kwarg in the argument list, e.g. ``mt_coherence(df, xi, xj, tbp, kspec, nf, p, freq=True, cohe=True)`` :param freq: the frequency bins :param cohe: coherence of the two series (0 - 1) :param phase: the phase at each frequency :param speci: spectrum of first series :param specj: spectrum of second series :param conf: p confidence value for each freq. :param cohe_ci: 95% bounds on coherence (not larger than 1) :param phase_ci: 95% bounds on phase estimates If confidence intervals are requested, then both phase and cohe variables need to be requested as well. """ npts = len(xi) if len(xj) != npts: raise Exception("Inpurt ndarrays have mismatching length") mt = _MtspecType('float32') # convert type of input arguments if necessary xi = np.require(xi, dtype=mt.float, requirements=[mt.order]) xj = np.require(xj, dtype=mt.float, requirements=[mt.order]) # fill up optional arguments, if not given set them None args = [] for key in ('freq', 'cohe', 'phase', 'speci', 'specj', 'conf', 'cohe_ci', 'phase_ci', 'iadapt'): kwargs.setdefault(key, None) if key in ('cohe_ci', 'phase_ci') and kwargs[key]: kwargs[key] = mt.empty(nf, 2) args.append(mt.p(kwargs[key])) elif key == 'iadapt' and kwargs[key]: args.append(C.byref(C.c_int(kwargs[key]))) elif kwargs[key]: kwargs[key] = mt.empty(nf) args.append(mt.p(kwargs[key])) else: args.append(kwargs[key]) mtspeclib.mt_cohe_(C.byref(C.c_int(int(npts))), C.byref(C.c_float(float(df))), mt.p(xi), mt.p(xj), C.byref(C.c_float(float(tbp))), C.byref(C.c_int(int(kspec))), C.byref(C.c_int(int(nf))), C.byref(C.c_float(float(p))), *args) # remove None values from dictionary return dict([(k, v) for k, v in kwargs.items() if v is not None])
[docs]def mt_deconvolve(data_a, data_b, delta, nfft=None, time_bandwidth=None, number_of_tapers=None, weights="adaptive", demean=True, fmax=0.0): """ Deconvolve two time series using multitapers. This uses the eigencoefficients and the weights from the multitaper spectral estimations and more or less follows this paper: .. |br| raw:: html <br /> **Receiver Functions from Multiple-Taper Spectral Correlation Estimates** *Jeffrey Park, Vadim Levin* |br| Bulletin of the Seismological Society of America Dec 2000, 90 (6) 1507-1520 http://dx.doi.org/10.1785/0119990122 :type data_a: :class:`numpy.ndarray` :param data_a: Data for first time series. :type data_b: :class:`numpy.ndarray` :param data_b: Data for second time series. :type delta: float :param delta: Sample spacing of the data. :type nfft: int :param nfft: Number of points for the FFT. If ``nfft == None``, no zero padding will be applied before the FFT. :type time_bandwidth: float :param time_bandwidth: Time-bandwidth product. Common values are 2, 3, 4, and numbers in between. :type number_of_tapers: int :param number_of_tapers: Number of tapers to use. Defaults to ``int(2*time_bandwidth) - 1``. This is maximum senseful amount. More tapers will have no great influence on the final spectrum but increase the calculation time. Use fewer tapers for a faster calculation. :type weights: str :param weights: ``"adaptive"`` or ``"constant"`` weights. :type deman: bool :param demean: Force the complex TF to be demeaned. :type fmax: float :param fmax: Maximum frequency for lowpass cosine filter. Set this to zero to not have a filter. :return: Returns a dictionary with 5 :class:`numpy.ndarray`'s. See the note below. .. note:: Returns a dictionary with five arrays: * ``"deconvolved"``: Deconvolved time series. * ``"spectrum_a"``: Spectrum of the first time series. * ``"spectrum_b"``: Spectrum of the second time series. * ``"spectral_ratio"``: The ratio of both spectra. * ``"frequencies"``: The used frequency bins for the spectra. """ npts = len(data_a) if len(data_b) != npts: raise ValueError("Input arrays must have the same length!") if nfft is None: nfft = npts elif nfft < npts: raise ValueError("nfft must be larger then the number of samples in " "the array.") # Deconvolution utilizes the 32bit version. mt = _MtspecType("float32") # Use the optimal number of tapers in case no number is specified. if number_of_tapers is None: number_of_tapers = int(2 * time_bandwidth) - 1 # Transform the data to work with the library. data_a = np.require(data_a, mt.float, requirements=[mt.order]) data_b = np.require(data_b, mt.float, requirements=[mt.order]) nf = nfft // 2 + 1 # Internally uses integers if demean: demean = 1 else: demean = 0 # iad = 0 are adaptive, iad = 1 are constant weight - this is # counter intuitive. if weights == "constant": adaptive = 1 elif weights == "adaptive": adaptive = 0 else: raise ValueError('Weights must be either "adaptive" or "constant".') tfun = mt.empty(nfft) freq = mt.empty(nf) spec_ratio = mt.empty(nf) speci = mt.empty(nf) specj = mt.empty(nf) mtspeclib.mt_deconv_( C.byref(C.c_int(int(npts))), C.byref(C.c_int(int(nfft))), C.byref(C.c_float(float(delta))), mt.p(data_a), mt.p(data_b), C.byref(C.c_float(float(time_bandwidth))), C.byref(C.c_int(int(number_of_tapers))), C.byref(C.c_int(int(nf))), C.byref(C.c_int(adaptive)), mt.p(freq), mt.p(tfun), mt.p(spec_ratio), mt.p(speci), mt.p(specj), C.byref(C.c_int(demean)), C.byref(C.c_float(fmax))) return { "frequencies": freq, "deconvolved": tfun, "spectral_ratio": spec_ratio, "spectrum_a": speci, "spectrum_b": specj }
class _MtspecType(object): """ Simple class that stores type definition for interfacing with the fortran code and provides some helper functions. """ struct = {"float32": (C.c_float, mtspeclib.mtspec_pad_), "float64": (C.c_double, mtspeclib.mtspec_d_)} def __init__(self, dtype): """ Depending on dtype initialize different type structures. :param dtype: 'float32' or 'float64' """ if dtype not in self.struct.keys(): raise ValueError("dtype must be either 'float32' or 'float64'") self.float = dtype self.complex = 'complex%d' % (2 * float(dtype[-2:])) self.c_float = self.struct[dtype][0] self.pointer = C.POINTER(self.c_float) self.order = "F" self.mtspec = self.struct[dtype][1] def empty(self, shape, complex=False): """ A wrapper around np.empty which automatically sets the correct type and returns an empty array. :param shape: The shape of the array in np.empty format """ if complex: return np.empty(shape, dtype=self.complex, order=self.order) return np.empty(shape, dtype=self.float, order=self.order) def p(self, ndarray): """ A wrapper around ctypes.data_as which automatically sets the correct type. Returns none if ndarray is None. :param ndarray: numpy input array or None """ # short variable name for passing as argument in function calls if ndarray is None: return None return ndarray.ctypes.data_as(self.pointer)