Source code for zfinder.finder

"""
Class for finding the redshift of a source in a FITS file. Exports data to csv files and plots figures.
"""

import warnings
from multiprocessing import Pool
from itertools import zip_longest

import numpy as np
from tqdm import tqdm
from astropy.io import fits
from astropy.wcs import WCS
from photutils.aperture import CircularAperture, CircularAnnulus

from zfinder.flux import calc_beam_area, mp_flux_jobs, serial_flux_jobs
from zfinder.template import template_zfind, template_per_pixel, find_lines
from zfinder.fft import fft_zfind, fft, fft_per_pixel
from zfinder.plotter import Plotter
from zfinder.utils import get_eng_exponent, average_zeroes, wcs2pix, generate_square_world_coords, radec2str, gen_random_coords
from zfinder.uncertainty import z_uncert

warnings.filterwarnings("ignore", message="divide by zero encountered in true_divide", category=RuntimeWarning)
warnings.filterwarnings("ignore", message="divide by zero encountered in divide", category=RuntimeWarning)

[docs] class zfinder(): """ zfinder is used to find the redshift of a fits image via two different methods: `template` fits gaussian functions to the data to find redshift and `zfft` performs the fast fourier transform on the flux data to find redshift. These methods will create and save a series of plots and csv files with raw data by default. Can be changed with the `showfig` and `export` parameters. If running in a `.py` file, you may need to add the following to your code. Otherwise, jupyter notebooks should work fine. >>> if __name__ == '__main__': >>> source = zfinder(fitsfile, ra, dec, aperture_radius, transition) Parameters ---------- fitsfile : `.fits` A .fits image file ra : list Right ascension of the target [h, m, s] dec : list Declination of the target [d, m, s, esign] aperture_radius : float Radius of the aperture to use over the source (pixels) transition : float The first transition frequency of the target element or molecule (GHz) bkg_radius : float, optional Radius of the background annulus (pixels). Default=50 beam_tolerance : float, optional The tolerance of the beam area (arcsec). Default=1 showfig : bool, optional If True, show the figures. Default=True savefig : bool, optional If True, save the figures. Default=True export : bool, optional If True, export the data to csv files. Default=True Methods ------- get_freq() Caclulate the frequency axis get_flux() For every frequency channel, find the flux and associated uncertainty at a position get_all_flux() Get the flux values for all ra and dec coordinates template() Performs the template redshift finding method template_pp() Performs the template redshift finding method in a square around the target ra and dec fft() Performs the fft redshift finding method fft_pp() Performs the fft redshift finding method in a square around the target ra and dec fft_uncert() Calculates the uncertainty on the fft flux. Examples -------- >>> from zfinder import zfinder >>> >>> fitsfile = '0856_cube_c0.4_nat_80MHz_taper3.fits' >>> ra = '08:56:14.8' >>> dec = '02:24:00.6' >>> aperture_radius = 3 >>> transition = 115.2712 >>> z_start = 0 >>> dz = 0.001 >>> z_end = 10 >>> >>> source = zfinder(fitsfile, ra, dec, aperture_radius, transition) >>> source.template(z_start, dz, z_end) >>> source.fft(z_start, dz, z_end) >>> >>> # Once redshift is found, narrow down the redshift range and run per pixel methods >>> aperture_radius_pp = 0.5 >>> size = 15 >>> z_start = 5.4 >>> dz = 0.001 >>> z_end = 5.7 >>> source.fft_pp(size, z_start, dz, z_end, aperture_radius_pp) >>> source.template_pp(size, z_start, dz, z_end, aperture_radius_pp) """ def __init__(self, fitsfile, ra, dec, aperture_radius, transition, bkg_radius=50, beam_tolerance=1, showfig=True, savefig=True, export=True): self._fitsfile = fitsfile self._hdr = fits.getheader(fitsfile) self._data = fits.getdata(fitsfile)[0] self._bin_hdu = fits.open(fitsfile)[1] self._ra = ra self._dec = dec self._aperture_radius = aperture_radius self._transition = transition self._bkg_radius = bkg_radius self._beam_tolerance = beam_tolerance self._export = export self._plotter = Plotter(showfig=showfig, savefig=savefig) # Ignore warnings warnings.filterwarnings("ignore", module='astropy.wcs.wcs') warnings.filterwarnings("ignore", message='Metadata was averaged for keywords CHAN,POL', category=UserWarning)
[docs] def get_freq(self): """ Caclulate the frequency axis list (x-axis) of the flux """ # Get frequency axis start = self._hdr['CRVAL3'] increment = self._hdr['CDELT3'] length = self._hdr['NAXIS3'] end = start + length * increment # Create frequency axis frequency = np.linspace(start, end, length) # Normalise to engineering notation self._freq_exponent = get_eng_exponent(frequency[0]) frequency = frequency / 10**self._freq_exponent return frequency
[docs] def get_flux(self, verbose=True, parallel=True): """ For every frequency channel, find the flux and associated uncertainty at a position Paramters --------- verbose : bool, optional If True, print progress. Default=True parallel : bool, optional If True, use multiprocessing. Default=True Returns ------- flux : list A list of flux values from each frequency channel f_uncert : list A list of flux uncertainty values for each flux measurement """ # Calculate area of the beam beam_area = calc_beam_area(self._bin_hdu, self._beam_tolerance) pix2deg = self._hdr['CDELT2'] # Pixel to degree conversion factor # The position to find the flux at position = wcs2pix(self._ra, self._dec, self._hdr) # Setup the apertures inner_radius = 2*self._aperture_radius outter_radius = 3*self._aperture_radius aperture = CircularAperture(position, self._aperture_radius) annulus = CircularAnnulus(position, inner_radius, outter_radius) # Process the flux arrays if verbose: print('Calculating flux values...') if parallel: flux, flux_uncert = mp_flux_jobs(self._data, aperture, annulus, (self._bkg_radius, self._bkg_radius), pix2deg, beam_area, verbose) else: flux, flux_uncert = serial_flux_jobs(self._data, aperture, annulus, (self._bkg_radius, self._bkg_radius), pix2deg, beam_area, verbose) # Average zeroes so there isn't div by zero error later flux_uncert = average_zeroes(flux_uncert) # Normalise to engineering notation self._flux_exponent = get_eng_exponent(np.max(flux)) flux = flux / 10**self._flux_exponent flux_uncert = flux_uncert / 10**self._flux_exponent return flux, flux_uncert
[docs] def get_all_flux(self, all_ra, all_dec, aperture_radius=None): """ Get the flux values for all ra and dec coordinates Paramters --------- all_ra : list A list of all ra coordinates all_dec : list A list of all dec coordinates aperture_radius : float Radius of the aperture to use over the source (pixels). If None, use the aperture_radius """ if aperture_radius is None: aperture_radius = self._aperture_radius print('Calculating all flux values...') with Pool() as pool: jobs = [pool.apply_async(_mp_all_flux, (self._fitsfile, r, d, aperture_radius, \ self._transition, self._bkg_radius, self._beam_tolerance)) for r, d in zip(all_ra, all_dec)] results = [res.get() for res in tqdm(jobs)] all_flux, all_uncert = zip(*results) return all_flux, all_uncert
def _calc_freq_flux(self, verbose, parallel): """ Calculate the frequency and flux """ frequency = self.get_freq() flux, flux_uncert = self.get_flux(verbose, parallel) return frequency, flux, flux_uncert def _z_uncert(self, z, chi2, sigma): """ Caclulate the uncertainty on the best fitting redshift """ peaks, _, _ = find_lines(self._flux) reduced_sigma = sigma**2 / (len(self._flux) - 2*len(peaks) - 1) neg, pos = z_uncert(z, chi2, reduced_sigma) return neg, pos
[docs] def template(self, z_start=0, dz=0.001, z_end=10, sigma=1, verbose=True, parallel=True): """ Perform the template redshift finding method Parameters ---------- z_start : float, optional The starting redshift to search from. Default=0 dz : float, optional The step size of the redshift search. Default=0.01 z_end : float, optional The ending redshift to search to. Default=10 sigma : float, optional The significance level to calculate the redshift uncertainty. Default=1 verbose : bool, optional If True, print progress. Default=True parallel : bool, optional If True, use multiprocessing. Default=True Returns ------- z : float Best fit redshift z_uncert : tuple(float, float) Lower and upper uncertainty on the best fit redshift """ # Calculate the frequency and flux if not hasattr(self, '_frequency'): self._frequency, self._flux, self._flux_uncert = self._calc_freq_flux(verbose, parallel) # Calculate the template chi2 z, chi2 = template_zfind(self._transition, self._frequency, self._flux, self._flux_uncert, z_start, dz, z_end, verbose, parallel) # Plot the template chi2 and flux self._plotter.plot_chi2(z, dz, chi2, title='Template') self._plotter.plot_template_flux(self._transition, self._frequency, self._freq_exponent, self._flux, self._flux_exponent) if self._export: self._plotter._export_template_data(self._fitsfile, self._ra, self._dec, self._aperture_radius, self._transition, filename='template.csv', sigma=sigma, flux_uncert=self._flux_uncert) neg, pos = self._z_uncert(z, chi2, sigma) return z[np.argmin(chi2)], (neg, pos)
[docs] def template_pp(self, size, z_start=0, dz=0.001, z_end=10, aperture_radius_pp=0.5, flux_limit=0.001, contfile=None): """ Performs the template redshift finding method in a square around the target ra and dec. Not recommended for very large redshift search ranges. Remember to narrow down the redshift range before using this method. Recommended to have 100-300 redshifts to check per pixel. z_start = 5.4, dz = 0.001, z_end = 5.7 is a good range for GLEAM J0856. z_start = 4.28, dz = 0.0001, z_end = 4.31 is a good range for SPT 0345-47. Parameters ---------- size : int The size of a square (centred on ra and dec) to calculate redshifts in (pixels). size=15 is 15x15 pixels z_start : float, optional The starting redshift to search from. Default=0 dz : float, optional The step size of the redshift search. Default=0.01 z_end : float, optional The ending redshift to search to. Default=10 aperture_radius_pp : float, optional Radius of the aperture to use over the source (pixels). Default=0.5 flux_limit : float, optional The minimum flux value to calculate redshifts for. Default=0.001 If contfile is specified, this is the minimum continuum flux. contfile : str, optional The fits file containing the continuum data. Default=None """ # If the other pp method has not been run, calculate all fluxes and uncertainties if not hasattr(self, '_all_flux'): all_ra, all_dec = generate_square_world_coords(self._fitsfile, self._ra, self._dec, size, aperture_radius_pp) self._all_flux, self._flux_uncertainty = self.get_all_flux(all_ra, all_dec, aperture_radius_pp) # Calculate the template chi2 for each pixel frequency = self.get_freq() z = template_per_pixel(self._transition, frequency, self._all_flux, self._flux_uncertainty, z_start, dz, z_end, size) # Plot the template pp heatmap if self._export: header = ['fitsfile', 'ra', 'dec', 'aperture_radius', 'transition', 'size', 'flux_limit', 'contfile'] data = [[self._fitsfile, self._ra, self._dec, aperture_radius_pp, self._transition, size, flux_limit, contfile], ['']] self._plotter._write_method_to_csv('template_per_pixel.csv', header, data) self._plotter._export_heatmap_data('template_per_pixel.csv', 'a', z) # export redshifts to csv self._plotter.plot_heatmap(ra=self._ra, dec=self._dec, hdr=self._hdr, data=self._data, size=size, \ z=z, title='Template', aperture_radius=aperture_radius_pp, flux_limit=flux_limit, export=self._export, contfile=contfile) # Plot the template pp heatmap
[docs] def fft(self, z_start=0, dz=0.001, z_end=10, sigma=1, verbose=True, parallel=True, uncertainty=False): """ Perform the fft redshift finding method Parameters ---------- z_start : float, optional The starting redshift to search from. Default=0 dz : float, optional The step size of the redshift search. Default=0.001 z_end : float, optional The ending redshift to search to. Default=10 sigma : float, optional The significance level to calculate the redshift uncertainty. Default=1 verbose : bool, optional If True, print progress. Default=True parallel : bool, optional If True, use multiprocessing. Default=True uncertainty : list, bool, optional A list of the uncertainty values calculated by `zfinder.fft_uncert`. If True, read the uncertainty values from the csv file. If False, no uncertainty values will be used for redshift calculation Returns ------- z : float Best fit redshift z_uncert : tuple(float, float) Lower and upper uncertainty on the best fit redshift """ # Calculate the frequency and flux if not hasattr(self, '_frequency'): self._frequency, self._flux, self._flux_uncert = self._calc_freq_flux(verbose, parallel) # Calculate the fft frequency and flux self._ffreq, self._fflux = fft(self._frequency, self._flux) if uncertainty is False: uncertainty = 1 if uncertainty is True: uncertainty = self.read_fft_uncert() # Calculate the fft chi2 z, chi2 = fft_zfind(self._transition, self._frequency, self._flux, uncertainty, z_start, dz, z_end, verbose, parallel) # Plot the fft chi2 and flux self._plotter.plot_chi2(z, dz, chi2, title='FFT') self._plotter.plot_fft_flux(self._transition, self._frequency, self._ffreq, self._fflux) if self._export: self._plotter._export_fft_data(self._fitsfile, self._ra, self._dec, self._aperture_radius, self._transition, filename='fft.csv', sigma=sigma, frequency=self._frequency, flux=self._flux, flux_uncert=uncertainty) neg, pos = self._z_uncert(z, chi2, sigma) return z[np.argmin(chi2)], (neg, pos)
[docs] def fft_pp(self, size, z_start=0, dz=0.01, z_end=10, aperture_radius_pp=0.5, flux_limit=0.001, contfile=None): """ Performs the fft redshift finding method in a square around the target ra and dec Not recommended for very large redshift search ranges. Remember to narrow down the redshift range before using this method. Recommended to have 100-300 redshifts to check per pixel. z_start = 5.4, dz = 0.001, z_end = 5.7 is a good range for GLEAM J0856. z_start = 4.28, dz = 0.0001, z_end = 4.31 is a good range for SPT 0345-47. Parameters ---------- size : int The size of a square (centred on ra and dec) to calculate redshifts in (pixels). size=15 is 15x15 pixels z_start : float, optional The starting redshift to search from. Default=0 dz : float, optional The step size of the redshift search. Default=0.01 z_end : float, optional The ending redshift to search to. Default=10 aperture_radius_pp : float, optional Radius of the aperture to use over the source (pixels). Default=0.5 flux_limit : float, optional The minimum flux value to calculate redshifts for. Default=0.001 If contfile is specified, this is the minimum continuum flux. contfile : str, optional The fits file containing the continuum data. Default=None """ # If the other pp method has not been run, calculate all fluxes and uncertainties if not hasattr(self, '_all_flux'): all_ra, all_dec = generate_square_world_coords(self._fitsfile, self._ra, self._dec, size, aperture_radius_pp) self._all_flux, self._flux_uncertainty = self.get_all_flux(all_ra, all_dec, aperture_radius_pp) # Calculate the fft chi2 for each pixel frequency = self.get_freq() z = fft_per_pixel(self._transition, frequency, self._all_flux, z_start, dz, z_end, size) # Plot the fft pp heatmap if self._export: headings = ['fitsfile', 'ra', 'dec', 'aperture_radius', 'transition', 'size', 'flux_limit', 'contfile'] data = [[self._fitsfile, self._ra, self._dec, aperture_radius_pp, self._transition, size, flux_limit, contfile], ['']] self._plotter._write_method_to_csv('fft_per_pixel.csv', headings, data) self._plotter._export_heatmap_data('fft_per_pixel.csv', 'a', z) # export redshifts to csv self._plotter.plot_heatmap(ra=self._ra, dec=self._dec, hdr=self._hdr, data=self._data, size=size, \ z=z, title='FFT', aperture_radius=aperture_radius_pp, flux_limit=flux_limit, export=self._export, contfile=contfile) # Plot the template pp heatmap
[docs] def fft_uncert(self, n=100, radius=50, min_spread=1): """ Doc string here """ # Find x,y coordinates of the target wcs = WCS(self._hdr, naxis=2) center_x, center_y = wcs2pix(self._ra, self._dec, self._hdr) x, y = gen_random_coords(n, radius, [center_x, center_y], min_spread) # self._x_coords, self._y_coords = x, y # Convert x, y pix coordinates to world ra and dec ra, dec = wcs.all_pix2world(x, y, 1) all_ra, all_dec = radec2str(ra, dec) # Get the flux values for all ra and dec coordinates all_flux, _ = self.get_all_flux(all_ra, all_dec, self._aperture_radius) freq = self.get_freq() # Calculate the FFT for each flux all_fflux = np.transpose([fft(freq, flux)[1] for flux in all_flux]) # Calculate the standard deviation of all the fflux channels all_std = np.std(all_fflux, axis=1) # column headers field_names = ['std', 'x_centre', 'y_centre', 'x', 'y', 'ra', 'dec', 'n', 'radius', 'min_spread', 'fitsfile'] dtype = [(name, object) for name in field_names] data = np.array(list(zip_longest( all_std, [center_x], [center_y], x, y, all_ra, all_dec, [n], [radius], [min_spread], [self._fitsfile], fillvalue='')), dtype=dtype) # Write the data to the csv if self._export: np.savetxt('.csv', data, delimiter=',', fmt='%s', header=','.join(data.dtype.names)) self._plotter.plot_coords(center_x, center_y, x, y, radius, fitsfile=self._fitsfile) return all_std
[docs] @staticmethod def read_fft_uncert(filename='fft_uncertainty.csv'): """ Read the fft uncertainty csv file """ std = np.genfromtxt(filename, delimiter=',', usecols=(0)).T return std
def _mp_all_flux(fitsfile, ra, dec, aperture_radius, transition, bkg_radius, beam_tolerance): """ Get the flux values for a single ra and dec coordinate """ flux, flux_uncert = zfinder(fitsfile, ra, dec, aperture_radius, transition, bkg_radius, beam_tolerance).get_flux(verbose=False, parallel=False) return flux, flux_uncert