Source code for pipe.analyse

# -*- coding: utf-8 -*-
"""
Created on Wed Mar 25 11:00:44 2020

@author: Alexis Brandeker, alexis@astro.su.se

A collection of routines and tools that analyse both extracted
and non-extracted data in various ways, including rudimentary aperture
photometry.

"""
import numpy as np
import os
from .read import lightcurve


[docs]def mad(series): """Compute normalised mean absolute deviation (MAD) for an array, return in ppm """ return np.nanmedian(np.abs(np.diff(series/np.nanmedian(series))))*1e6
def find_orbits(roll, phase=None, sub_orbits=1): """Finds the indices for each orbit repeat, starting with the first roll angle. Assumes that roll angles decrease, and that part of the orbit has roll angles higher than the first one. Orbits can start at a different phase than the first element, if defined. sub_orbits defines how many suborbits each orbit should be divided to. """ orbit_index = [] if phase is None: orbit_index.append(0) phase0 = roll[0] else: phase0 = phase switch = False for n in range(1,len(roll)): if switch and roll[n] <= phase0: orbit_index.append(n) switch = False switch = roll[n] > roll[n-1] or switch if len(orbit_index) <= 2: orbit_index = (0, len(roll)-1) if sub_orbits > 1: suborb_list = [] dorb = np.array(np.diff(orbit_index)/sub_orbits, dtype='int') for n in range(len(orbit_index)-1): for m in range(sub_orbits): suborb_list.append(orbit_index[n]+m*dorb[n]) suborb_list.append(orbit_index[-1]) orbit_index = np.array(suborb_list) return orbit_index
[docs]def psf_phot_cube(data_cube, noise_cube, psf_cube, bg, apt): """Use defined PSF to integrate flux and derive error. Only region inside apt is used (can be cube with apt defined for each frame, or single image for the same region in every frame to be used). """ N = len(data_cube) f = np.zeros(N) e = np.zeros(N) if apt.ndim == 2: sapt = apt for n in range(N): if apt.ndim > 2: sapt = apt[n] frame = data_cube[n]-bg[n] f[n], e[n] = phot(frame, noise_cube[n], psf_cube[n], sapt) return f, e
def phot(frame, noise, psf, apt): """Use defined PSF to integrate flux and derive error. Only region inside apt is used. """ npsf = np.abs(psf[apt]) s1 = np.sum(npsf*frame[apt]/noise[apt]**2) s2 = np.sum(npsf**2/noise[apt]**2) if s2 > 0: return s1/s2, (1/s2)**0.5 print('WARNING [analyse/phot]: Empty flux integration encountered') return 0, -1 def apt_phot_cube(data_cube, noise_cube, radius, xc, yc): """Do aperture photometry on data cube, assuming the background has already been subtracted. A circular aperture of radius centered on xc, yc is used with a linearly interpolated edge. Returns the flux and its std noise as arrays. """ flux = np.zeros(len(data_cube)) noise = flux.copy() apt_coo = np.linspace(-radius,radius,2*radius+1) xmat, ymat = np.meshgrid(apt_coo, apt_coo) apt = xmat**2+ymat**2 <= radius**2 full_shape = data_cube[0].shape apt_list = {} for n in range(len(data_cube)): frame = data_cube[n] std = noise_cube[n]**2 x0 = int(xc[n]) y0 = int(yc[n]) tx = xc[n]-x0 ty = yc[n]-y0 for i in range(2): ind0 = x0 + i for j in range(2): ind1 = y0 + j if (ind0, ind1) not in apt_list: apt_list[(ind0, ind1)] = make_full_apt(full_shape, apt, ind0, ind1) # Do integer aperture photometry centered on pixels surrounding # the float value centre, and then bi-linearly interpolate. ind00 = apt_list[(x0, y0)] ind01 = apt_list[(x0, y0+1)] ind10 = apt_list[(x0+1, y0)] ind11 = apt_list[(x0+1, y0+1)] flux[n] = ((1-tx) * (1-ty) * np.sum(frame[ind00]) + tx * (1-ty) * np.sum(frame[ind10]) + (1-tx) * ty * np.sum(frame[ind01]) + tx * ty * np.sum(frame[ind11])) noise[n] = ((1-tx) * (1-ty) * np.sum(std[ind00]**2) + tx * (1-ty) * np.sum(std[ind10]**2) + (1-tx) * ty * np.sum(std[ind01]**2) + tx * ty * np.sum(std[ind11]**2))**0.5 return flux, noise def make_full_apt(full_shape, apt, ixc, iyc): """Help function to apt_phot_cube, produces an aperture mask of shape full_shape by placing the aperture apt centered on integer pixel coordinates according to ixc, iyc. """ full_apt = np.zeros(full_shape, dtype='?') radius = int((apt.shape[0]-1)/2) full_apt[(iyc-radius):(iyc+radius+1), (ixc-radius):(ixc+radius+1)] = apt return full_apt
[docs]def sigma_clip(data, clip=5, niter=5): """A simple 1D sigma-clipping routine that returns a boolean array of indices to non-clipped data """ ind = np.ones(data.shape, dtype='?') for _n in range(niter): sigma = np.std(data[ind]) m = np.median(data[ind]) ind = np.abs(data-m) < clip*sigma return ind
def load_lc(name, visit, version=0, postfix=''): """Load lightcurve, returns dict data structure """ from .config import conf filename = os.path.join(conf.data_root, name, visit, 'Outdata', '{:05d}'.format(version), f'{name}_{visit}{postfix}.fits') return lightcurve(filename) def load_sa(name, visit, version=0, postfix=''): """Load subarray lightcurve, returns dict data structure (if it exists) """ return load_lc(name, visit, version, postfix=f'{postfix}_sa') def load_im(name, visit, version=0, postfix=''): """Load imagette lightcurve, returns dict data structure (if it exists) """ return load_lc(name, visit, version, postfix=f'{postfix}_im') def load_binary_sa(name, visit, version=0, postfix=''): """Load binary subarray lightcurve, returns dict data structure (if it exists) """ return load_sa(name, visit, version, postfix='_binary') def load_binary_im(name, visit, version=0, postfix=''): """Load binary imagette lightcurve, returns dict data structure (if it exists) """ return load_im(name, visit, version, postfix='_binary') def load_drp(name, visit, desc='DEFAULT'): """Reads lightcurve extracted by the CHEOPS Data Reduction Pipeline. Returns DRP dict, if found. """ from .config import conf datapath = os.path.join(conf.data_root, name, visit) def find_file(substring): for file in os.listdir(datapath): if substring in file: return os.path.join(datapath, file) raise Exception(f'[load_drp] Error: \"{substring}\" not found') filename = find_file('SCI_COR_Lightcurve-{:s}'.format(desc)) return lightcurve(filename)