Source code for pipe.read

# -*- coding: utf-8 -*-
"""
Created on Wed Mar  4 16:00:21 2020

@author: Alexis Brandeker, alexis@astro.su.se

A collection of routines that read data from CHEOPS fits format
files, and typically returns numpy arrays.

Also contains routines that save data in fits or text formats.

"""

import os
import numpy as np
from scipy import interpolate
from astropy.io import fits

fits.Conf.use_memmap = False

[docs]def datacube(filename, frame_range=None): """Read CHEOPS datacube format, either subarray or imagettes. Returns cube as numpy array where the first index is frame number, an array with the mjd for each frame, the header and an associated table with various pre-frame data like e.g. the bias values. """ with fits.open(filename) as hdul: rawcube = np.array(hdul[1].data, dtype='f8') np.nan_to_num(rawcube, copy=False) hdr = hdul[0].header + hdul[1].header mjd = hdul[2].data['MJD_TIME'] tab = hdul[2].data if frame_range is not None: return rawcube[frame_range[0]:frame_range[1]], \ mjd[frame_range[0]:frame_range[1]], \ hdr, \ tab[frame_range[0]:frame_range[1]] return rawcube, mjd, hdr, tab
def lightcurve(filename): """Reads the DRP (or PIPE) lightcurve fits file, returns a numpy dict table """ with fits.open(filename) as hdul: lc = hdul[1].data return lc def fits_cube(filename, level=0): """Reads raw fits cube, returns data (converted to doubles) and header. level is the fits-level of the data, in case of multiple fits layers. """ with fits.open(filename) as hdul: cube = np.array(hdul[level].data, dtype='f8') hdr = hdul[level].header.copy() return cube, hdr def mask(filename): """Reads a predefined mask saved as a fits file, returns numpy array """ with fits.open(filename) as hdul: m = hdul[1].data return m
[docs]def nonlinear(filename): """Reads the non-linear correction from a text file with ADU vs multiplicative correction. The correction should be applied after bias subtraction. Return is an interpolation function that gives correction as a function of ADU. """ nl = np.loadtxt(filename) ifun = interpolate.interp1d(nl[:, 0], nl[:, 1], axis=0, bounds_error=False, fill_value=(nl[0, 1], nl[-1, 1])) return ifun
[docs]def attitude(filename): """Reads the CHEOPS attitude file and puts the data into a N-by-4 array with spacecraft mjd, ra, dev, and roll angle. """ with fits.open(filename) as hdul: outparam = np.zeros((hdul[1].header['NAXIS2'], 4)) outparam[:, 0] = hdul[1].data['MJD_TIME'] outparam[:, 1] = hdul[1].data['SC_RA'] outparam[:, 2] = hdul[1].data['SC_DEC'] outparam[:, 3] = hdul[1].data['SC_ROLL_ANGLE'] return outparam
[docs]def starcat(filename, colstr, entry=0): """Reads star catalogue file and returns value for column string colstr and entry row """ with fits.open(filename) as hdul: val = hdul[1].data[colstr][entry] return val
def raw_param(filename, data_index, param_name): """Reads the specific sensor from the CHEOPS sa raw file. """ with fits.open(filename) as hdul: ret_param = np.asarray(hdul[data_index].data[param_name]) return ret_param def ron(filename): """Reads the readout noise table entry from the CHEOPS calibratared subarray cube file. Returns a single value (in ADU) """ return np.nanmedian(raw_param(filename, data_index=2, param_name='RON'))
[docs]def gain(file_hk, file_gain): """Compute gain using HK parameters and the gain reference table Returns gain in units of e/ADU """ with fits.open(file_hk) as hdul: data = hdul[1].data volt_vod = data['VOLT_FEE_VOD'] volt_vrd = data['VOLT_FEE_VRD'] volt_vog = data['VOLT_FEE_VOG'] volt_vss = data['VOLT_FEE_VSS'] temp_ccd = data['VOLT_FEE_CCD'] with fits.open(file_gain) as hdul: data = hdul[1].data hdr = hdul[1].header vod_off = hdr['VOD_OFF'] vrd_off = hdr['VRD_OFF'] vog_off = hdr['VOG_OFF'] vss_off = hdr['VSS_OFF'] temp_off = hdr['TEMP_OFF'] gain_nom = hdr['GAIN_NOM'] gain_fact = data['FACTOR'] exp_vod = data['EXP_VOD'] exp_vrd = data['EXP_VRD'] exp_vog = data['EXP_VOG'] exp_vss = data['EXP_VSS'] exp_temp = data['EXP_TEMP'] gain_vec = gain_nom * (1 + np.sum(gain_fact[None, :] * (volt_vss[:, None] - vss_off) ** exp_vss[ None, :] * (volt_vod[:, None] - volt_vss[:, None] - vod_off) ** exp_vod[ None, :] * (volt_vrd[:, None] - volt_vss[:, None] - vrd_off) ** exp_vrd[ None, :] * (volt_vog[:, None] - volt_vss[:, None] - vog_off) ** exp_vog[ None, :] * (temp_ccd[:, None] + temp_off) ** exp_temp[None, :], axis=1)) return 1 / gain_vec
[docs]def thermFront_2(filename): """Reads frontTemp_2 sensor data from the CHEOPS raw file. """ return raw_param(filename, data_index=9, param_name='thermFront_2')
[docs]def mjd2bjd(filename): with fits.open(filename) as hdul: mjd = np.asarray(hdul[2].data['MJD_TIME']) bjd = np.asarray(hdul[2].data['BJD_TIME']) ifun = interpolate.interp1d(mjd, bjd, bounds_error=False, fill_value='extrapolate') return ifun
[docs]def sub_image_indices(offset, size): """Helper function that computes index ranges given a 2D offset and a 2D size """ i0 = int(offset[0]) i1 = int(i0 + size[0]) j0 = int(offset[1]) j1 = int(j0 + size[1]) return i0, i1, j0, j1
[docs]def flatfield(filename, Teff, offset, size): """Reads the flatfield cube and interpolates the flatfield temperatures to the given temperature. The part of the detector that is returned is defined by the offset and size (in 2D pixel coordinates). """ with fits.open(filename) as hdul: T = hdul[2].data['T_EFF'] T = T[hdul[2].data['DATA_TYPE'] == 'FLAT FIELD'] idx = np.searchsorted(T, Teff, side="left") a = (Teff - T[idx]) / (T[idx + 1] - T[idx]) i0, i1, j0, j1 = sub_image_indices(offset, size) ff0 = hdul[1].data[idx, j0:j1, i0:i1] ff1 = hdul[1].data[idx + 1, j0:j1, i0:i1] return ff0 * (1 - a) + ff1 * a
def dark(darkpath, mjd, offset, size): """Traverses darkpath directory, looking for all dark current files and picks the one closest in time """ darkfiles = [] mjds = [] for root, dirs, files in os.walk(darkpath): for file in files: if 'MCO_REP_DarkFrameFullArray' in file: filename = os.path.join(root, file) darkfiles.append(filename) with fits.open(filename) as hdul: mjds.append(hdul[1].header['V_STRT_M']) if len(mjds) < 1: raise ValueError('Missing dark frame reference file. You may turn off ' 'this feature by setting `pps.darksub = False`.') ind = np.argmin(np.abs(np.array(mjds) - mjd)) i0, i1, j0, j1 = sub_image_indices(offset, size) with fits.open(darkfiles[ind]) as hdul: dark = hdul[1].data[0, j0:j1, i0:i1] dark_err = hdul[1].data[1, j0:j1, i0:i1] return dark, dark_err # return np.zeros(size), np.zeros(size)
[docs]def imagette_offset(filename, frame_range=None): """Returns the first imagette offset from an imagette fits-file cube; first offset is relative to full array, second offset is relative to subarray """ with fits.open(filename) as hdul: x_off = hdul[2].data['X_OFF_FULL_ARRAY'][0] y_off = hdul[2].data['Y_OFF_FULL_ARRAY'][0] x_sa_off = hdul[2].data['X_OFF_SUB_ARRAY'][0] y_sa_off = hdul[2].data['Y_OFF_SUB_ARRAY'][0] return (x_off, y_off), (x_sa_off, y_sa_off)
# raise Exception('[imagette_offset] Error: {:s} not found'.format(filename))
[docs]def save_eigen_fits(filename, t, bjd, sc, err, bg, roll, xc, yc, flag, w, thermFront_2, header): """Save lightcurve data as defined by arguments to fits table in binary format. Coefficients for the principle components of the PSF eigen analysis are also added, as well as the thermFront_2 values, to be used in de-correlations. """ c = [] c.append(fits.Column(name='MJD_TIME', format='D', unit='day', array=t)) c.append(fits.Column(name='BJD_TIME', format='D', unit='day', array=bjd)) c.append(fits.Column(name='FLUX', format='D', unit='electrons', array=sc)) c.append( fits.Column(name='FLUXERR', format='D', unit='electrons', array=err)) c.append(fits.Column(name='BG', format='D', unit='electrons/pix', array=bg)) c.append(fits.Column(name='ROLL', format='D', unit='deg', array=roll)) c.append(fits.Column(name='XC', format='D', unit='pix', array=xc)) c.append(fits.Column(name='YC', format='D', unit='pix', array=yc)) c.append(fits.Column(name='FLAG', format='I', array=flag)) for n in range(w.shape[1]): c.append(fits.Column(name='U{:d}'.format(n), format='D', array=w[:, n])) c.append(fits.Column(name='thermFront_2', format='D', array=thermFront_2)) tab = fits.BinTableHDU.from_columns(c, header=header) tab.writeto(filename, overwrite=True, checksum=True)
[docs]def save_binary_eigen_fits(filename, t, bjd, sc0, sc1, bg, roll, xc0, yc0, xc1, yc1, flag, w0, w1, thermFront_2, header): """Save lightcurve data from both componentes of a binary, as defined by arguments to fits table in binary format. Coefficients for the principle components of both stars from the PSF eigen analysis are also added, as well as the thermFront_2 values, to be used in de-correlations. """ c = [] c.append(fits.Column(name='MJD_TIME', format='D', unit='day', array=t)) c.append(fits.Column(name='BJD_TIME', format='D', unit='day', array=bjd)) c.append(fits.Column(name='FLUX0', format='D', unit='electrons', array=sc0)) c.append(fits.Column(name='FLUX1', format='D', unit='electrons', array=sc1)) c.append(fits.Column(name='BG', format='D', unit='electrons/pix', array=bg)) c.append(fits.Column(name='ROLL', format='D', unit='deg', array=roll)) c.append(fits.Column(name='XC0', format='D', unit='pix', array=xc0)) c.append(fits.Column(name='YC0', format='D', unit='pix', array=yc0)) c.append(fits.Column(name='XC1', format='D', unit='pix', array=xc1)) c.append(fits.Column(name='YC1', format='D', unit='pix', array=yc1)) c.append(fits.Column(name='FLAG', format='I', array=flag)) for n in range(w0.shape[1]): c.append( fits.Column(name='U{:d}'.format(n), format='D', array=w0[:, n])) for n in range(w1.shape[1]): c.append( fits.Column(name='W{:d}'.format(n), format='D', array=w1[:, n])) c.append(fits.Column(name='thermFront_2', format='D', array=thermFront_2)) tab = fits.BinTableHDU.from_columns(c, header=header) tab.writeto(filename, overwrite=True, checksum=True)
def save_txt(filename, t, flux, err, bg, roll, xc, yc): """Save lightcurve to textfile according to arrays defined by arguments """ X = np.array([t, flux, err, bg, roll, xc, yc]).T fmt = '%26.18e' np.savetxt(filename, X=X, fmt=fmt)