Source code for pipe.pipe_control

# -*- coding: utf-8 -*-
"""
Created on Tue Jun  2 22:57:13 2020

@author: Alexis Brandeker, alexis@astro.su.se

Top level routines for using PIPE, to be called by scripts etc.
"""
import os
import pickle
import numpy as np
from .psf_phot import PsfPhot
from .read import lightcurve
from .make_multi_psf import MultiPSFMaker
from .spline_pca import SplinePCA


[docs]class PipeControl(): def __init__(self, pipe_params): self.pps = pipe_params # Set of parameters to be used self.pp = None # PsfPhot object initialised by class self.sa_psf_cube0 = None # Parameter used for binary PSF derivation self.sa_psf_cube1 = None # Parameter used for binary PSF derivation self.im_psf_cube0 = None # Parameter used for binary PSF derivation self.im_psf_cube1 = None # Parameter used for binary PSF derivation
[docs] def pre_proc(self, pproc=True): """Initialise PsfPhot class and do pre-processing on data (i.e. basic processing before photometric PSF extraction). If pproc is False, the pre-processing step is skipped. """ os.makedirs(self.pps.outdir, exist_ok=True) self.pp = PsfPhot(self.pps) self.pp.mess(f'--- {self.pps.name}/{self.pps.visit} (version {self.pps.version})') if pproc: self.pp.pre_process()
[docs] def process_eigen(self): """Process PSF photometry as defined by the pipe_param parameters, on both subarrays and imagettes (if they exist). klip is how many principal components should be used in PSF fit. sigma_clip is the clipping factor to mask bad pixels. For poor PSF fits, sigma_clip should be high in order to not mask perfectly valid but poorly fit pixels. """ self.process_eigen_sa() if self.pps.file_im is not None: self.process_eigen_im()
[docs] def process_eigen_sa(self): """Process PSF photometry as defined by the pipe_param parameters, on subarrays. klip is how many principal components should be used in PSF fit. sigma_clip is the clipping factor to mask bad pixels. For poor PSF fits, sigma_clip should be high in order to not mask perfectly valid but poorly fit pixels. """ if self.pp is None: self.pre_proc() self.sa_scale, self.sa_bg, self.sa_flux, self.sa_err, self.sa_sel, self.sa_w = \ self.pp.process_eigen_sa()
[docs] def process_eigen_im(self): """Process PSF photometry as defined by the pipe_param parameters, on imagettes. klip is how many principal components should be used in PSF fit. sigma_clip is the clipping factor to mask bad pixels. For poor PSF fits, sigma_clip should be high in order to not mask perfectly valid but poorly fit pixels. """ if self.pp is None: self.pre_proc() self.im_scale, self.im_bg, self.im_flux, self.im_err, self.im_sel, self.im_w = \ self.pp.process_eigen_im()
[docs] def make_psf_lib(self, lib_num=None, klip=None, phase=None, sub_orbits=1): """Produces a library of PSFs. The data series is divided into orbits, with one PSF determined per orbit. If phase is defined, the orbits are selected to start at phase rather than the first data point. sub_orbits can be defined to divide the orbits into smaller data sets, to e.g. check if there are systematic PSF differences with roll angle. """ if self.pp is None: self.pre_proc() pm = MultiPSFMaker(self.pp, max_threads=self.pps.nthreads) sa_ranges = pm.find_ranges(phase=phase, sub_orbits=sub_orbits) if klip is None: klip = len(sa_ranges) if lib_num is None: lib_num = self.pp.find_next_lib_num(self.pp.psf_name) return pm.prod_psf(sa_ranges[:klip], lib_num)
[docs] def combine_psf_lists(self, visit_list, lib_num=0): """From a list of visits, this method reads the corresponding PSF library files and merges them, with the merged list of PSFs returned. This list can then be used to derive an eigen library of principal PSF components. """ if self.pp is None: self.pre_proc(pproc=False) psf_lib = [] for visit in visit_list: self.pp.pps.visit = visit file_name = self.pp.psf_name(lib_num) with open(file_name, 'rb') as fp: psf_lib.extend(pickle.load(fp)) return psf_lib
[docs] def make_eigen(self, psf_lib, out_num): """From a list of PSFs, produce a list of principal component functions and save in a file ("eigen library"), enumerated by out_num. The eigen library list is also returned. """ if self.pp is None: self.pre_proc(pproc=False) sp = SplinePCA(psf_lib) eigen_lib = sp.get_eigen_spline_lib() filename = self.pp.eigen_name(out_num) with open(filename, 'wb') as fp: pickle.dump(eigen_lib, fp) self.pp.mess('--- {:d} PSF eigen functions produced, saved to \'{:s}\''.format( len(eigen_lib), filename)) return eigen_lib
[docs] def load_drp(self, desc='DEFAULT'): """Reads lightcurve extracted by the CHEOPS Data Reduction Pipeline. Returns DRP dict. """ if desc == 'DEFAULT': return lightcurve(self.pps.file_lc_default) if desc == 'OPTIMAL': return lightcurve(self.pps.file_lc_optimal) if desc == 'RINF': return lightcurve(self.pps.file_lc_rinf) if desc == 'RSUP': return lightcurve(self.pps.file_lc_rsup) raise Exception(f'[load_drp] Error: {desc} not defined')
[docs] def load_lc(self, postfix='_sa'): """Load lightcurve, returns dict data structure (if it exists) """ filename = os.path.join(self.pps.outdir, f'{self.pps.name}_{self.pps.visit}{postfix}.fits') return lightcurve(filename)
[docs] def load_sa(self): """Load subarray lightcurve, returns dict data structure (if it exists) """ return self.load_lc(postfix='_sa')
[docs] def load_im(self): """Load imagette lightcurve, returns dict data structure (if it exists) """ return self.load_lc(postfix='_im')
#----------- Methods for binary extractions below
[docs] def pre_binary(self): """Initialise PsfPhot class and do pre-processing on data (i.e. basic processing before photometric PSF extraction). """ self.pps.binary = True os.makedirs(self.pps.outdir, exist_ok=True) self.pp = PsfPhot(self.pps) self.pp.mess(f'--- Binary {self.pps.name}/{self.pps.visit} (version {self.pps.version})') self.pp.pre_binary()
[docs] def process_binary(self): """Extract photometry of a binary from the subarrays and the imagettes, if they exist. Saves PSF models for the two components to class variables. """ self.pps.binary = True if self.pp is None: self.pre_binary() self.sa_psf_cube0, self.sa_psf_cube1, self.sa_bg = \ self.pp.process_binary_sa() if self.pps.file_im is not None: self.im_psf_cube0, self.im_psf_cube1, self.im_bg = \ self.pp.process_binary_im()
[docs] def make_binary_psfs(self, lib_num, klip=None, phase=None, sub_orbits=1): """Produces one library of PSFs for each component of a binary. The data series is divided into orbits, with one PSF determined per orbit. if phase is defined, the orbits are selected to start at phase rather than the first data point. sub_orbits can be defined to divide the orbits into smaller data sets, to e.g. check if there are systematic PSF differences with roll angle. """ # An implementation trick used to derive PSFs for the primary and # secondary separately, is to produce PSF models for both # components and first subtract the secondary from the data # cube, derive the primary PSF, and vice versa subtract the # primary from the data cube to derive the secondary PSF. # To have a good initial PSF estimate for the first iteration # is therefore important for proper subtraction without inducing # too much PSF noise. if self.sa_psf_cube0 is None: self.process_binary() pm = MultiPSFMaker(self.pp, max_threads=self.pps.nthreads) sa_ranges = pm.find_ranges(phase=phase, sub_orbits=sub_orbits) if klip is None: klip = len(sa_ranges) def add_noise_sa(mc): if self.pps.bgstars: mc += 2*self.pp.sa_bgstars if self.pps.smear_corr: mc += np.abs(self.pp.sa_smear[:, None, :]) if self.pps.darksub: mc += self.pp.sa_dark if self.pps.remove_static: mc += np.abs(self.pp.sa_stat_res) return mc def add_noise_im(mc): if self.pps.bgstars: mc += 2*self.pp.im_bgstars if self.pps.smear_corr: mc += np.abs(self.pp.im_smear[:, None, :]) if self.pps.darksub: mc += self.pp.im_dark if self.pps.remove_static: mc += np.abs(self.pp.im_stat_res) return mc # Prepare for producing PSF of primary self.pp.sa_xc = self.pp.sa_xc0 self.pp.sa_yc = self.pp.sa_yc0 model_cube = (np.abs(self.sa_psf_cube0) + 2*np.abs(self.sa_psf_cube1) + self.sa_bg[:, None, None]) self.pp.sa_noise = self.pp.psf_noise_sa(add_noise_sa(model_cube)) self.pp.sa_sub -= self.sa_psf_cube1 if self.pps.file_im is not None: self.pp.im_xc, self.pp.im_yc = self.pp.im_xc0, self.pp.im_yc0 model_cube = (np.abs(self.im_psf_cube0) + 2*np.abs(self.im_psf_cube1) + self.im_bg[:, None, None]) self.pp.im_noise = self.pp.psf_noise_im(add_noise_sa(model_cube)) self.pp.im_sub -= self.im_psf_cube1 pm = MultiPSFMaker(self.pp, max_threads=self.pps.nthreads) # Produce primary PSF list pm.prod_psf(sa_ranges[:klip], lib_num=lib_num) # Prepare for producing PSF of secondary self.pp.sa_sub += self.sa_psf_cube1 self.pp.sa_xc, self.pp.sa_yc = self.pp.sa_xc1, self.pp.sa_yc1 model_cube = (2*np.abs(self.sa_psf_cube0) + np.abs(self.sa_psf_cube1) + self.sa_bg[:, None, None]) self.pp.sa_noise = self.pp.psf_noise_sa(add_noise_sa(model_cube)) self.pp.sa_sub -= self.sa_psf_cube0 pm = MultiPSFMaker(self.pp, max_threads=self.pps.nthreads) # Don't use imagettes for secondary pm.im = False # Produce secondary PSF list pm.prod_psf(sa_ranges[:klip], lib_num=lib_num) # Restate self.pp.sa_xc, self.pp.sa_yc = self.pp.sa_xc0, self.pp.sa_yc0 self.pp.sa_sub += self.sa_psf_cube0 model_cube = (np.abs(self.sa_psf_cube0) + np.abs(self.sa_psf_cube1) + self.sa_bg[:, None, None]) self.pp.sa_noise = self.pp.psf_noise_sa(add_noise_sa(model_cube)) if self.pps.file_im is not None: self.pp.im_sub += self.im_psf_cube1 model_cube = (np.abs(self.im_psf_cube0) + np.abs(self.im_psf_cube1) + self.im_bg[:, None, None]) self.pp.im_noise = self.pp.psf_noise_im(add_noise_im(model_cube))
[docs] def load_binary_sa(self): """Load binary subarray lightcurve, returns dict data structure (if it exists) """ return self.load_lc(postfix='_binary_sa')
[docs] def load_binary_im(self): """Load binary imagette lightcurve, returns dict data structure (if it exists) """ return self.load_lc(postfix='_binary_im')