Source code for lephare.process

import os
import shutil
import time

import numpy as np

import lephare as lp

__all__ = ["object_types", "process", "table_to_data", "calculate_offsets_from_input", "load_sed_list"]

[docs] object_types = ["STAR", "GAL", "QSO"]
[docs] def process( config, input, col_names=None, standard_names=False, filename=None, write_outputs=False, ): """Run all required steps to produce photometric redshift estimates Parameters ========== config : dict of lephare.keyword The configuration for the run input : astropy.table.Table The input table which must satisfy column name requirements depending on other optional inputs. col_names : list Input catalogue column names. We will use ordering to determine meaning standard_names : bool If true we assume standard names. filename : str Output file name for the output catalogue. write_outputs : bool Whether to write the output spectra, PDF, and ascii file if specified in the config. By default these are not written to save space. Returns ======= output : astropy.table.Table The output table. photozlist : list of lephare.onesource List of lephare onesource objects. """ # ensure that all values in the keymap are keyword objects config = lp.all_types_to_keymap(config) id, flux, flux_err, context, zspec, string_data = table_to_data( config, input, col_names=col_names, standard_names=standard_names ) ng = len(id) n_filters = len(config["FILTER_LIST"].value.split(",")) print(f"Processing {ng} objects with {n_filters} bands") photz = lp.PhotoZ(config) if config["AUTO_ADAPT"].split_bool("", 1): print("AUTO_ADAPT is set to YES. Computing offsets.") # Loop over all ng galaxies! srclist = [] for i in range(ng): one_obj = lp.onesource(i, photz.gridz) one_obj.readsource(str(id[i]), flux[i], flux_err[i], context[i], zspec[i], str(string_data[i])) photz.prep_data(one_obj) srclist.append(one_obj) # compute the offset, depending on the option in the code (AUTO_ADAPT, or APPLY_SYSSHIFT a0 = photz.compute_offsets(srclist) else: a0 = np.zeros(len(config["FILTER_LIST"].value.split(","))) offsets = ",".join(np.array(a0).astype(str)) offsets = "# Offsets added to the modeled magnitudes (or substracted to the observed): " + offsets + "\n" print(offsets) # create the onesource objects photozlist = [] for i in range(ng): one_obj = lp.onesource(i, photz.gridz) one_obj.readsource(str(id[i]), flux[i], flux_err[i], context[i], zspec[i], str(string_data[i])) photz.prep_data(one_obj) photozlist.append(one_obj) # Perform the main run photz.run_photoz(photozlist, a0) # Write outputs if requested if write_outputs: photz.write_outputs(photozlist, int(time.time())) output = photz.build_output_tables(photozlist, para_out=None, filename=filename) # Return the table and all the onesource objects return output, photozlist
[docs] def calculate_offsets_from_input(config, input, col_names=None, standard_names=False): """Calculate the zero point offsets for objects with spectroscopic redshifts We want to have this available as an independent method so that it can be called as a training stage prior to running at scale. Parameters ========== config : dict of lephare.keyword The configuration for the run input : astropy.table.Table The input table which must satisfy column name requirements depending on other optional inputs. col_names : list Input catalogue column names. We will use ordering to determine meaning standard_names : bool If true we assume standard names. Returns ======= a0 : np.array Offsets a0 """ config = lp.all_types_to_keymap(config) id, flux, flux_err, context, zspec, string_data = table_to_data( config, input, col_names=col_names, standard_names=standard_names ) ng = len(id) n_filters = len(config["FILTER_LIST"].value.split(",")) print(f"Processing {ng} objects with {n_filters} bands") photz = lp.PhotoZ(config) # Loop over all ng galaxies! srclist = [] for i in range(ng): one_obj = lp.onesource(i, photz.gridz) one_obj.readsource(str(id[i]), flux[i], flux_err[i], context[i], zspec[i], str(string_data[i])) photz.prep_data(one_obj) srclist.append(one_obj) a0 = photz.compute_offsets(srclist) offsets = ",".join(np.array(a0).astype(str)) offsets = "Offsets from auto-adapt: " + offsets + "\n" print(offsets) return a0
[docs] def table_to_data(config, input, col_names=None, standard_names=False): """Take an astropy table and return the arrays required for a run. We assume that either the columns are in the standard LePHARE order or that the user provides the colnames in the standard order or they use a standard naming convention and set the standard_names=True The column names are id, f_filtername, ferr_filtername (for all filters) and context, zspec, string_input. Parameters ========== config : dict of lephare.keyword The config keymap. We need this to know if we have magnitudes or fluxes and to get the filter names. input : astropy.table.Table The input catalogue. col_names : list of str The column names in the default order. standard_names : bool If this is set we assume a standard column naming convention. Returns ======= id : np.array The object ids flux : np.array The object fluxes or magnitudes flux_err : np.array The flux or magnitude errors context : np.array The context determining flux usage zspec : np.array The spectroscopic redshifts. string_input : np.array Additional notes as a string. """ config = lp.all_types_to_keymap(config) n_filters = len(config["FILTER_LIST"].value.split(",")) if col_names is not None: print("Using user defined column names based on ordering.") assert len(input.colnames) == 2 * n_filters + 4 elif standard_names: print( "Using standard column names id, f_filtername,ferr_filtername,... context, zspec, string_input." ) col_names = ["id"] for f in config["FILTER_LIST"].value.split(","): col_names += [f"f_{f}"] col_names += [f"ferr_{f}"] col_names += ["context"] col_names += ["zspec"] col_names += ["string_input"] else: print("Using user columns from input table assuming they are in the standard order.") assert len(input.colnames) == 2 * n_filters + 4 col_names = list(np.full(2 * n_filters + 4, "")) col_names[0] = input.colnames[0] col_names[1 : 2 * n_filters : 2] = input.colnames[1 : 2 * n_filters : 2] col_names[2 : 2 * n_filters + 1 : 2] = input.colnames[2 : 2 * n_filters + 1 : 2] col_names[-3] = input.colnames[-3] col_names[-2] = input.colnames[-2] col_names[-1] = input.colnames[-1] # print(col_names) assert len(col_names) == 2 * n_filters + 4 id = [str(i) for i in input[col_names[0]]] flux = input[col_names[1 : 2 * n_filters : 2]] flux = np.array([np.array(flux[c]) for c in flux.colnames]) flux = flux.T flux_err = input[col_names[2 : 2 * n_filters + 1 : 2]] flux_err = np.array([np.array(flux_err[c]) for c in flux_err.colnames]) flux_err = flux_err.T context = input[col_names[-3]] zspec = input[col_names[-2]] string_data = input[col_names[-1]] # Perform basic checks on table # Replace nans with -99 mask = np.isnan(flux) mask |= np.isnan(flux_err) flux[mask] = -99.0 flux_err[mask] = -99.0 return id, flux, flux_err, context, zspec, string_data
[docs] def load_sed_list(path_to_list, object_type, absolute_paths=False): """Take a sed list and move it with the sed files to the work directory. We want to be able to load SED lists from other sources. These may use absolute paths or relative to the list file. They must adopt the lepahre standard which is relative to the sed/$TYPE/ directory Parameters ========== path_to_list : str The path to the SED list file. This can be absolute or relative. object_type : {'STAR', 'GAL', 'QSO'} The type of object. Must be one of "STAR", "GAL", or "QSO". absolute_paths : bool Set to True if list of absolute paths and not relative to list file Returns ======= new_config_val : str The location of the new list file with respect to current LEPHAREWORK directory. This is the value to set in the config. """ # The type must by one of the three lephare standards assert object_type in object_types # The original file must exist assert os.path.isfile(path_to_list) # Set folder and file names. name = path_to_list.split("/")[-1].split(".")[0] old_dir = os.path.dirname(path_to_list) new_dir = f"{lp.dm.lephare_dir}/sed/{object_type}/{name}" new_list_file = f"{new_dir}/{name}.list" # Create the config value that needs to be passed to lephare. new_config_val = f"sed/{object_type}/{name}" if os.path.exists(new_list_file): print( f"List file already exists at {new_list_file}\n" "The SEDs may already have been loaded to the work directory." ) return new_config_val # Get the list of sed files. with open(path_to_list, "r") as f: lines = f.readlines() sed_files = [] for line in lines: # Ignore commented files if line.strip()[0] != "#": sed_files.append(line.replace("\n", "")) # Make the folder to hold the seds. assert not os.path.isdir(new_dir) print( f"Creating new sed directory at {new_dir}.\n" "Attempting to copy sed files and list files in place." ) os.makedirs(new_dir) # Make the new list of files with paths from the type folder new_list = "" for sed_file in sed_files: new_file = f"{sed_file.split('/')[-1]}" new_list += f"{name}/{new_file}\n" # Copy files if absolute_paths: shutil.copyfile(sed_file, f"{new_dir}/{new_file}") else: shutil.copyfile(f"{old_dir}/{sed_file.split('/')[-1]}", f"{new_dir}/{new_file}") # Open the list file for writing and add all the seds with open(new_list_file, "w") as file: # Define the data to be written file.write(new_list) return new_config_val