# coding: utf-8
"""
Analysis tasks.
"""
__all__ = []
import os
from shutil import move
from copy import deepcopy as copy
import json
import math
import itertools
from collections import OrderedDict
import tabulate
import numpy as np
from ctypes import c_double
import uproot
import law
import luigi
from analysis_tools.utils import (
import_root, create_file_dir, randomize
)
from cmt.base_tasks.base import ConfigTaskWithCategory, HTCondorWorkflow, SGEWorkflow
from cmt.base_tasks.plotting import FeaturePlot
ROOT = import_root()
directions = ["up", "down"]
[docs]class FitBase():
def convert_parameters(self, d):
for param, val in d.items():
if isinstance(val, str):
if "," not in val:
d[param] = tuple([float(val)])
else:
d[param] = tuple(map(float, val.split(', ')))
else:
d[param] = val
return d
def get_x(self, x_range, blind_range=None, name="x"):
# fit range
x_range = (float(x_range[0]), float(x_range[1]))
x = ROOT.RooRealVar(name, name, x_range[0], x_range[1])
# blinded range
blind = False
if not blind_range:
return x, False
if blind_range[0] != blind_range[1]:
blind = True
blind_range = (float(self.blind_range[0]), float(self.blind_range[1]))
assert(blind_range[0] >= x_range[0] and blind_range[0] < blind_range[1] and
blind_range[1] <= x_range[1])
x.setRange("loSB", x_range[0], blind_range[0])
x.setRange("hiSB", blind_range[1], x_range[1])
x.setRange("full", x_range[0], x_range[1])
fit_range = "loSB,hiSB"
return x, blind
def get_fit(self, name, parameters, x, **kwargs):
fit_parameters = {}
params = OrderedDict()
fit_name = kwargs.pop("fit_name", "model")
if name == "voigtian":
fit_parameters["mean"] = parameters.get("mean", (0, -100, 100))
fit_parameters["gamma"] = parameters.get("gamma", (0.02, 0, 0.1))
fit_parameters["sigma"] = parameters.get("sigma", (0.001, 0, 0.1))
fit_parameters = self.convert_parameters(fit_parameters)
params["mean"] = ROOT.RooRealVar('mean', 'Mean of Voigtian', *fit_parameters["mean"])
params["gamma"] = ROOT.RooRealVar('gamma', 'Gamma of Voigtian', *fit_parameters["gamma"])
params["sigma"] = ROOT.RooRealVar('sigma', 'Sigma of Voigtian', *fit_parameters["sigma"])
fun = ROOT.RooVoigtian(fit_name, fit_name, x,
params["mean"], params["gamma"], params["sigma"])
elif name == "polynomial":
order = int(parameters.get("order", 1))
for i in range(order):
fit_parameters[f"p{i}"] = parameters.get(f"p{i}", (0, -5, 5))
fit_parameters = self.convert_parameters(fit_parameters)
for i in range(order):
params[f"p{i}"] = ROOT.RooRealVar(f'p{i}', f'p{i}', *fit_parameters[f"p{i}"])
fun = ROOT.RooPolynomial(fit_name, fit_name, x, ROOT.RooArgList(*list(params.values())))
elif name == "exponential":
# https://root.cern.ch/doc/master/classRooExponential.html
fit_parameters["c"] = parameters.get("c", (0, -2, 2))
fit_parameters = self.convert_parameters(fit_parameters)
params["c"] = ROOT.RooRealVar('c', 'c', *fit_parameters["c"])
fun = ROOT.RooExponential(fit_name, fit_name, x, params["c"])
elif name == "powerlaw":
order = int(self.fit_parameters.get("order", 1))
for i in range(order):
fit_parameters[f"a{i}"] = parameters.get(f"a{i}", (1, 0, 2))
fit_parameters[f"b{i}"] = parameters.get(f"b{i}", (0, -2, 2))
fit_parameters = self.convert_parameters(fit_parameters)
for i in range(order):
params[f'a{i}'] = ROOT.RooRealVar(f'a{i}', f'a{i}', *fit_parameters[f"a{i}"])
params[f'b{i}'] = ROOT.RooRealVar(f'b{i}', f'b{i}', *fit_parameters[f"b{i}"])
fit_fun = " + ".join([f"@{i + 1} * TMath::Power(@0, @{i + 2})"
for i in range(0, order, 2)])
fun = ROOT.RooGenericPdf(fit_name, fit_fun, ROOT.RooArgList(*list(params.values())))
return fun, params
[docs]class CombineBase(FeaturePlot, FitBase):
"""
Base task for all combine-related tasks
:param pois: Parameters-of-interest to be considered
:type pois: list of `str`
:param higgs_mass: Higgs mass to be considered inside combine
:type higgs_mass: int
:param fit_models: filename with fit models to use in the fit
:type fit_models: str
"""
pois = law.CSVParameter(default=("r",), description="parameters of interest to be considered, "
"default: r")
higgs_mass = luigi.IntParameter(default=125, description="Higgs mass to be used inside "
"combine, default: 125")
fit_models = luigi.Parameter(default="", description="filename with fit models to use "
"in the fit, default: none (binned fit)")
[docs] def get_output_postfix(self, **kwargs):
self.process_group_name = kwargs.pop("process_group_name", self.process_group_name)
process_group_name = "" if self.process_group_name == "default" else "_{}".format(
self.process_group_name)
region_name = "" if not self.region else "_{}".format(self.region.name)
return process_group_name + region_name
[docs]class CombineCategoriesTask(CombineBase):
"""
Base task for all combine-related tasks with multiple categories
:param category_names: names of categories to run
:type category_names: list of `str`
:param combine_categories: whether to run on the combined datacard or per category
:type combine_categories: bool
"""
category_names = law.CSVParameter(default=("base",), description="names of categories "
"to run, default: (base,)")
combine_categories = luigi.BoolParameter(default=False, description="whether to run on the "
"combined datacard or per category, default: False (per category)")
[docs] def get_output_postfix(self, **kwargs):
postfix = super(CombineCategoriesTask, self).get_output_postfix()
if not self.combine_categories:
postfix += "_" + list(self.category_names)[self.branch]
return postfix
def store_parts(self):
parts = super(CombineCategoriesTask, self).store_parts()
# parts["category_name"] = "cat_combined"
del parts["category_name"]
return parts
[docs]class CreateDatacards(CombineBase):
"""
Task that creates datacards for its use inside the combine framework
:param automcstats: Value used for autoMCStats inside the datacard, -1 to avoid using it.
:type automcstats: int
:param additional_lines: Additional lines to write at the end of the datacard.
:type additional_lines: list of `str`
:param propagate_syst_qcd: Whether to propagate systematics to estimated qcd background.
:type propagate_syst_qcd: bool
:param counting: whether the datacard should consider a counting experiment
:type counting: bool
"""
automcstats = luigi.IntParameter(default=10, description="value used for autoMCStats inside "
"the datacard, -1 to avoid using it, default: 10")
additional_lines = law.CSVParameter(default=(), description="addtional lines to write at the "
"end of the datacard")
propagate_syst_qcd = luigi.BoolParameter(default=False, description="whether to propagate"
"systematics to estimated qcd background, default: False")
counting = luigi.BoolParameter(default=False, description="whether the datacard should consider "
"a counting experiment, default: False")
additional_scaling = luigi.DictParameter(description="dict with scalings to be "
"applied to processes in the datacard, ONLY IMPLEMENTED FOR PARAMETRIC FITS, default: None")
additional_scaling = {"dummy": 1} # Temporary fix, the DictParameter fails when empty
norm_syst_threshold = 0.01
norm_syst_threshold_sym = 0.01
def __init__(self, *args, **kwargs):
super(CreateDatacards, self).__init__(*args, **kwargs)
self.data_names = [p.name for p in self.processes_datasets.keys() if p.isData]
if len(self.data_names) > 1:
raise ValueError("Only 1 data process can be provided inside the process group")
self.non_data_names = [p.name for p in self.processes_datasets.keys() if not p.isData]
if self.do_qcd:
self.non_data_names.append("qcd")
if self.fit_models:
import yaml
from cmt.utils.yaml_utils import ordered_load
with open(self.retrieve_file("config/{}.yaml".format(self.fit_models))) as f:
self.models = ordered_load(f, yaml.SafeLoader)
[docs] def requires(self):
"""
Needs as input the root file provided by the FeaturePlot task
"""
if not self.fit_models and not self.counting:
return FeaturePlot.vreq(self, save_root=True, stack=True, hide_data=False, normalize_signals=False)
else: # FIXME allow counting datacards starting from FeaturePlot
reqs = {"fits": {}, "inspections": {}}
self.model_processes = []
for model, fit_params in self.models.items():
self.model_processes.append(fit_params["process_name"])
params = ", ".join([f"{param}='{value}'"
for param, value in fit_params.items() if param != "fit_parameters"])
if "fit_parameters" in fit_params:
params += ", fit_parameters={" + ", ".join([f"'{param}': '{value}'"
for param, value in fit_params["fit_parameters"].items()]) + "}"
reqs["fits"][fit_params["process_name"]] = eval(
f"Fit.vreq(self, {params}, _exclude=['include_fit'])")
reqs["inspections"][fit_params["process_name"]] = eval(
f"InspectFitSyst.vreq(self, {params}, _exclude=['include_fit'])")
# In order to have a workspace with data_obs, we replicate the first fit
# (just in case the x_range is defined) for the available data process
assert(self.data_names)
fit_params = copy(list(self.models.values())[0])
fit_params["process_name"] = self.data_names[0]
params = ", ".join([f"{param}='{value}'"
for param, value in fit_params.items() if param != "fit_parameters"])
if "fit_parameters" in fit_params:
params += ", fit_parameters={" + ", ".join([f"'{param}': '{value}'"
for param, value in fit_params["fit_parameters"].items()]) + "}"
reqs["fits"]["data_obs"] = eval(f"Fit.vreq(self, {params}, _exclude=['include_fit'])")
return reqs
[docs] def output(self):
"""
Returns, per feature, one txt storing the datacard and its corresponding root file
storing the histograms
"""
keys = ["txt"]
if not self.counting:
keys.append("root")
return {
feature.name: {
key: self.local_target("{}{}.{}".format(feature.name, self.get_output_postfix(),
key))
for key in keys
}
for feature in self.features
}
return reqs
def get_norm_systematics(self):
if self.plot_systematics:
return self.config.get_norm_systematics(self.processes_datasets, self.region)
return {}
def get_norm_systematics_from_inspect(self, feature_name):
# structure: systematics[syst_name][process_name] = syst_value
systematics = {}
for name in self.non_data_names:
path = self.input()["inspections"][name][feature_name]["json"].path
with open(path) as f:
d = json.load(f)
for syst_name, values in d.items():
up_value = values["integral"]["up"]
down_value = values["integral"]["down"]
if up_value == -999 or down_value == -999:
continue
if abs(up_value) > self.norm_syst_threshold or \
abs(down_value) > self.norm_syst_threshold:
if syst_name not in systematics:
systematics[syst_name] = {}
# symmetric?
if abs(up_value + down_value) < self.norm_syst_threshold_sym:
systematics[syst_name][name] = "{:.2f}".format(1 + up_value)
else:
systematics[syst_name][name] = "{:.2f}/{:.2f}".format(1 + up_value,
1 + down_value)
return systematics
def get_shape_systematics_from_inspect(self, feature_name):
def round_unc(num):
exp = 0
while True:
if num * 10 ** exp > 1:
return round(num, exp + 1)
exp += 1
# structure: systematics[syst_name][process_name][parameter] = syst_value
systematics = {}
for name in self.non_data_names:
path = self.input()["inspections"][name][feature_name]["json"].path
with open(path) as f:
d = json.load(f)
for syst_name, values in d.items():
for param in values:
if param == "integral":
continue
up_value = round_unc(abs(values[param]["up"]))
down_value = round_unc(abs(values[param]["down"]))
if up_value == -999 or down_value == -999:
continue
if abs(up_value) > self.norm_syst_threshold or \
abs(down_value) > self.norm_syst_threshold:
if name not in systematics:
systematics[name] = {}
if param not in systematics[name]:
systematics[name][param] = {}
systematics[name][param][syst_name] = max(up_value, down_value)
return systematics
def write_datacard(self, yields, feature, norm_systematics, shape_systematics, *args):
n_dashes = 50
region_name = "" if not self.region else "_{}".format(self.region.name)
bin_name = "{}{}".format(feature.name, region_name)
table = []
table.append(["bin", ""] + [bin_name for i in range(len(self.non_data_names))])
table.append(["process", ""] + self.non_data_names)
sig_counter = 0
bkg_counter = 1
line = ["process", ""]
for p_name in self.non_data_names:
try:
if self.config.processes.get(p_name).isSignal:
line.append(sig_counter)
sig_counter -= 1
else:
line.append(bkg_counter)
bkg_counter += 1
except ValueError: # qcd coming from do_qcd
line.append(bkg_counter)
bkg_counter += 1
table.append(line)
table.append(["rate", ""] + [yields[p_name] for p_name in self.non_data_names])
# normalization systematics
for norm_syst in norm_systematics:
line = [norm_syst, "lnN"]
for p_name in self.non_data_names:
if p_name in norm_systematics[norm_syst]:
line.append(norm_systematics[norm_syst][p_name])
else:
line.append("-")
table.append(line)
# shape systematics
for shape_syst in shape_systematics:
line = [shape_syst, "shape"]
for p_name in self.non_data_names:
if p_name in shape_systematics[shape_syst]:
line.append(1)
else:
line.append("-")
table.append(line)
fancy_table = tabulate.tabulate(table, tablefmt="plain").split("\n")
with open(create_file_dir(self.output()[feature.name]["txt"].path), "w+") as f:
for letter in ["i", "j", "k"]:
f.write("%smax *\n" % letter)
f.write(n_dashes * "-" + "\n")
f.write("shapes * {0} {1} $PROCESS $PROCESS_$SYSTEMATIC\n".format(bin_name,
self.output()[feature.name]["root"].path))
f.write(n_dashes * "-" + "\n")
f.write("{:<11} {}\n".format("bin", bin_name))
f.write("observation -1\n")
f.write(n_dashes * "-" + "\n")
for i in range(4):
f.write(fancy_table[i] + "\n")
f.write(n_dashes * "-" + "\n")
for i in range(4, len(fancy_table)):
f.write(fancy_table[i] + "\n")
if self.automcstats != -1:
f.write("* autoMCStats %s \n" % self.automcstats)
for arg in args:
f.write(arg + "\n")
def write_shape_datacard(self, feature, norm_systematics, shape_systematics,
datacard_syst_params, *args):
n_dashes = 50
region_name = "" if not self.region else "_{}".format(self.region.name)
bin_name = "{}{}".format(feature.name, region_name)
shapes_table = []
for name in self.non_data_names:
shapes_table.append(["shapes", name, bin_name, self.output()[feature.name]["root"].path,
"workspace_{0}:model_{0}".format(name)])
for name in self.data_names:
if name in self.model_processes:
# We are extracting the background from the data, so let's label it as background
# but let's be sure background is not in the process_group_name
assert not "background" in self.processes_datasets.keys()
shapes_table.append(["shapes", "background", bin_name, self.output()[feature.name]["root"].path,
"workspace_{0}:model_{0}".format(name)])
# Include shape for the data
shapes_table.append(["shapes", "data_obs", bin_name, self.output()[feature.name]["root"].path,
"workspace_data_obs:data_obs"])
table = []
process_names = self.non_data_names
for name in self.data_names:
if name in self.model_processes:
process_names.append("background")
table.append(["bin", ""] + [bin_name for i in range(len(process_names))])
table.append(["process", ""] + process_names)
sig_counter = 0
bkg_counter = 1
line = ["process", ""]
for p_name in process_names:
try:
if self.config.processes.get(p_name).isSignal:
line.append(sig_counter)
sig_counter -= 1
else:
line.append(bkg_counter)
bkg_counter += 1
except ValueError: # qcd coming from do_qcd or background coming from data
line.append(bkg_counter)
bkg_counter += 1
table.append(line)
rate_line = ["rate", ""]
for p_name in process_names:
if p_name == "background" and self.data_names[0] in self.model_processes:
# assuming it comes from data, may cause problems in certain setups
rate_line.append(1)
else:
filename = self.input()["fits"][p_name][feature.name]["json"].path
with open(filename) as f:
d = json.load(f)
rate = d[""]["integral"]
if self.additional_scaling.get(p_name, False):
rate *= float(self.additional_scaling.get(p_name))
rate_line.append(rate)
table.append(rate_line)
# normalization systematics
for norm_syst in norm_systematics:
line = [norm_syst, "lnN"]
for p_name in self.non_data_names:
if p_name in norm_systematics[norm_syst]:
line.append(norm_systematics[norm_syst][p_name])
else:
line.append("-")
table.append(line)
# shape systematics
for syst_param in datacard_syst_params:
table.append([syst_param, "param", 0.0, 1.0])
fancy_shapes_table = tabulate.tabulate(shapes_table, tablefmt="plain").split("\n")
fancy_table = tabulate.tabulate(table, tablefmt="plain").split("\n")
with open(create_file_dir(self.output()[feature.name]["txt"].path), "w+") as f:
for letter in ["i", "j", "k"]:
f.write("%smax *\n" % letter)
f.write(n_dashes * "-" + "\n")
for line in fancy_shapes_table:
f.write(line + "\n")
f.write(n_dashes * "-" + "\n")
f.write("{:<11} {}\n".format("bin", bin_name))
f.write("observation -1\n")
f.write(n_dashes * "-" + "\n")
for i in range(4):
f.write(fancy_table[i] + "\n")
f.write(n_dashes * "-" + "\n")
for i in range(4, len(fancy_table)):
f.write(fancy_table[i] + "\n")
# if self.automcstats != -1:
# f.write("* autoMCStats %s \n" % self.automcstats)
for arg in args:
f.write(arg + "\n")
def get_process_rate_for_counting(self, p_name, feature):
if p_name == "background" and self.data_names[0] in self.model_processes:
# assuming it comes from data, may cause problems in certain setups
return 1
else:
filename = self.input()["fits"][p_name][feature.name]["json"].path
with open(filename) as f:
d = json.load(f)
rate = d[""]["integral"]
weight = (0 if d[""]["integral_error"] == 0
else rate / (d[""]["integral_error"] * d[""]["integral_error"]))
if self.additional_scaling.get(p_name, False):
rate *= float(self.additional_scaling.get(p_name))
weight /= float(self.additional_scaling.get(p_name))
return rate, weight
def get_stat_unc_lines(self, feature, rates, process_names=None):
def round_unc(num):
exp = 0
while True:
if num * 10 ** exp > 1:
return round(num, exp + 1)
exp += 1
if not process_names:
process_names = self.non_data_names
table = []
for p_name in process_names:
# line = [f"{p_name}_norm", "gmN {:.2f}".format(rates[p_name][0] * rates[p_name][1])]
line = [f"{p_name}_{self.category_name}_norm",
"gmN {}".format(int(round(rates[p_name][0] * rates[p_name][1])))]
append_line = False
for name in self.non_data_names:
if name == p_name or \
p_name in [p.name for p in self.config.get_children_from_process(name)]:
if rates[p_name][1] > 0.001:
append_line = True
# line.append("{:.4f}".format(1. / rates[p_name][1]))
line.append(round_unc(1. / rates[p_name][1]))
else:
line.append("-")
if append_line:
table.append(line)
return table
def write_counting_datacard(self, feature, norm_systematics, shape_systematics, *args):
n_dashes = 50
region_name = "" if not self.region else "_{}".format(self.region.name)
bin_name = "{}".format(region_name)
table = []
process_names = self.non_data_names
for name in self.data_names:
if name in self.model_processes:
process_names.append("background")
table.append(["bin", ""] + [bin_name for i in range(len(process_names))])
table.append(["process", ""] + process_names)
sig_counter = 0
bkg_counter = 1
line = ["process", ""]
for p_name in process_names:
try:
if self.config.processes.get(p_name).isSignal:
line.append(sig_counter)
sig_counter -= 1
else:
line.append(bkg_counter)
bkg_counter += 1
except ValueError: # qcd coming from do_qcd or background coming from data
line.append(bkg_counter)
bkg_counter += 1
table.append(line)
rate_line = ["rate", ""]
rates = {}
for p_name in process_names:
rates[p_name] = self.get_process_rate_for_counting(p_name, feature)
if not self.config.processes.get(p_name).isSignal and rates[p_name][0] < 0.001:
rates[p_name] = (0.001, rates[p_name][1])
rate_line.append(round(rates[p_name][0], 3))
table.append(rate_line)
# normalization systematics
for norm_syst in norm_systematics:
line = [norm_syst, "lnN"]
for p_name in self.non_data_names:
if p_name in norm_systematics[norm_syst]:
line.append(norm_systematics[norm_syst][p_name])
else:
line.append("-")
table.append(line)
# statistical uncertainty
stat_unc_lines = self.get_stat_unc_lines(feature, rates)
table += stat_unc_lines
# # # shape systematics
# # for shape_syst in shape_systematics:
# # line = [shape_syst, "shape"]
# # for p_name in self.non_data_names:
# # if p_name in shape_systematics[shape_syst]:
# # line.append(1)
# # else:
# # line.append("-")
# # table.append(line)
fancy_table = tabulate.tabulate(table, tablefmt="plain").split("\n")
with open(create_file_dir(self.output()[feature.name]["txt"].path), "w+") as f:
for letter in ["i", "j", "k"]:
f.write("%smax *\n" % letter)
f.write(n_dashes * "-" + "\n")
f.write("{:<11} {}\n".format("bin", bin_name))
filename = self.input()["fits"]["data_obs"][feature.name]["json"].path
with open(filename) as f_data:
d = json.load(f_data)
rate = d[""]["integral"]
f.write("observation %s\n" % rate)
f.write(n_dashes * "-" + "\n")
for i in range(4):
f.write(fancy_table[i] + "\n")
f.write(n_dashes * "-" + "\n")
for i in range(4, len(fancy_table)):
f.write(fancy_table[i] + "\n")
for arg in args:
f.write(arg + "\n")
[docs] def run(self):
"""
Splits the processes into data and non-data. Per feature, loads the input histograms,
creates the output histograms and the datacard inside the txt file.
"""
inputs = self.input()
norm_systematics = self.get_norm_systematics()
for feature in self.features:
systs_directions = [("central", "")]
shape_syst_list = self.get_systs(feature, True)
systs_directions += list(itertools.product(shape_syst_list, directions))
# Convert the shape systematics list to a dict with the systs as keys and a list of
# the processes affected by them (all non-data processes except the qcd if computed
# in the code)
shape_systematics = {shape_syst: [p_name for p_name in self.non_data_names]
for shape_syst in shape_syst_list}
if not self.fit_models and not self.counting: # binned fits
histos = {}
tf = ROOT.TFile.Open(inputs["root"].targets[feature.name].path)
for name in self.data_names:
histos[name] = copy(tf.Get("histograms/" + name))
for name in self.non_data_names:
for (syst, d) in systs_directions:
if syst == "central":
name_to_save = name
name_from_featureplot = name
elif self.do_qcd and name == "qcd":
continue
else:
name_to_save = "%s_%s%s" % (name, syst, d.capitalize())
name_from_featureplot = "%s_%s_%s" % (name, syst, d)
histos[name_to_save] = copy(tf.Get("histograms/" + name_from_featureplot))
tf.Close()
yields = {name_central: histos[name_central].Integral()
for name_central in self.non_data_names}
self.write_datacard(yields, feature,
norm_systematics, shape_systematics, *self.additional_lines)
tf = ROOT.TFile.Open(create_file_dir(self.output()[feature.name]["root"].path),
"RECREATE")
for name, histo in histos.items():
if "data" in name:
name = "data_obs"
histo.Write(name)
tf.Close()
elif not self.counting: # unbinned fits
shape_systematics_feature = self.get_shape_systematics_from_inspect(feature.name)
# shape_systematics_feature = {}
datacard_syst_params = []
tf = ROOT.TFile.Open(create_file_dir(self.output()[feature.name]["root"].path),
"RECREATE")
for model, fit_params in self.models.items():
model_tf = ROOT.TFile.Open(
inputs["fits"][fit_params["process_name"]][feature.name]["root"].path)
w = model_tf.Get("workspace_" + fit_params["process_name"])
if fit_params["process_name"] not in shape_systematics_feature:
# directly extract the workspace from the inputs
tf.cd()
w.Write()
model_tf.Close()
else:
# create a new workspace with the dedicated systematics
x_range = fit_params.get("x_range", Fit.x_range._default)
if type(x_range) == str:
x_range = x_range.split(", ")
blind_range = fit_params.get("blind_range", Fit.blind_range._default)
method = fit_params.get("method")
x, blind = self.get_x(x_range, blind_range)
data = w.data("data_obs")
fit_parameters = fit_params.get("fit_parameters", {})
_, params = self.get_fit(method, fit_parameters, x)
# for param in params:
# param.setConstant(True)
# let's build the RooFormulaVar for each param (w/ or w/o systematics)
param_syst = OrderedDict()
systs = OrderedDict()
for param, value in params.items():
if param not in shape_systematics_feature[fit_params["process_name"]]:
# no systematics to add, only need to consider the actual parameter
param_syst[param] = value
else:
systs[param] = []
syst_values = []
for syst, syst_value in \
shape_systematics_feature[fit_params["process_name"]][param].items():
systs[param].append(ROOT.RooRealVar(f"d{param}_{syst}",
f"d{param}_{syst}", 0, -5, 5))
systs[param][-1].setConstant(True)
syst_values.append(syst_value)
datacard_syst_params.append(f"d{param}_{syst}")
param_syst[param] = ROOT.RooFormulaVar(
f"{param}_syst", f"{param}_syst",
"@0*" + "*".join([f"(1+{syst_values[i]}*@{i+1})"
for i in range(len(syst_values))]),
ROOT.RooArgList(value, *systs[param]))
# Create the new fitting function
if method == "voigtian":
fun = ROOT.RooVoigtian("model_%s" % fit_params["process_name"],
"model_%s" % fit_params["process_name"], x,
param_syst["mean"], param_syst["gamma"], param_syst["sigma"])
elif method == "polynomial":
fun = ROOT.RooPolynomial("model_%s" % fit_params["process_name"],
"model_%s" % fit_params["process_name"], x,
ROOT.RooArgList(*list(param_syst.values())))
elif method == "exponential":
fun = ROOT.RooExponential("model_%s" % fit_params["process_name"],
"model_%s" % fit_params["process_name"], x, param_syst["c"])
elif method == "powerlaw":
order = len(params.values())
fit_fun = " + ".join([f"@{i + 1} * TMath::Power(@0, @{i + 2})"
for i in range(0, order, 2)])
fun = ROOT.RooGenericPdf("model_%s" % self.process_name,
fit_fun, ROOT.RooArgList(*list(params.values())))
# Refit
if not blind:
fun.fitTo(data, ROOT.RooFit.SumW2Error(True))
else:
fun.fitTo(data, ROOT.RooFit.Range(
float(self.x_range[0]), float(self.x_range[1])),
ROOT.RooFit.SumW2Error(True))
for value in params.values():
value.setConstant(True)
# save the function inside the workspace
w_name = "workspace_syst"
workspace_syst = ROOT.RooWorkspace(w_name, w_name)
getattr(workspace_syst, "import")(fun)
getattr(workspace_syst, "import")(data)
tf.cd()
workspace_syst.Write("workspace_" + fit_params["process_name"])
model_tf.Close()
# data_obs
model_tf = ROOT.TFile.Open(inputs["fits"]["data_obs"][feature.name]["root"].path)
w = model_tf.Get("workspace_" + self.data_names[0])
tf.cd()
w.Write("workspace_data_obs")
model_tf.Close()
tf.Close()
norm_systematics_feature = self.get_norm_systematics_from_inspect(feature.name)
norm_systematics_feature.update(norm_systematics)
self.write_shape_datacard(feature, norm_systematics_feature, shape_systematics,
datacard_syst_params, *self.additional_lines)
else: # counting experiment
norm_systematics_feature = self.get_norm_systematics_from_inspect(feature.name)
norm_systematics_feature.update(norm_systematics)
self.write_counting_datacard(feature, norm_systematics_feature, shape_systematics,
*self.additional_lines)
[docs]class Fit(FeaturePlot, FitBase):
"""
Task that run fits over FeaturePlot histograms
:param method: Fitting method to consider. To choose between `voigtian`, `polynomial`,
`exponential`, `powerlaw`.
:type method: str
:param process_name: Process name to consider.
:type process_name: str
:param x_range: Range of the x axis to consider in the fitting.
:type x_range: Two comma-separated `float`.
:param blind_range: Range of the x axis to blind in the fitting.
:type blind_range: Two comma-separated `float`.
:param fit_parameters: Initial values for the parameters involved in the fit.
:type fit_parameters: `str` representing a dict (e.g. '{\"mean\": \"(20, -100, 100)\"}').
"""
method = luigi.ChoiceParameter(choices=("voigtian", "polynomial", "exponential", "powerlaw"),
default="voigtian", description="fitting method to consider, default: voigtian")
process_name = luigi.Parameter(default="signal", description="process name to consider, "
"default: signal")
x_range = law.CSVParameter(default=("1.2", "1.5"), description="range of the x axis to consider "
"in the fit, default: 1.2-1.5")
blind_range = law.CSVParameter(default=("-1", "-1"), description="range of the x axis to blind "
"in the fit, default: none")
fit_parameters = luigi.DictParameter(default={}, description="Initial values for the parameters "
"involved in the fit, defaults depend on the method. Should be included as "
"--fit-parameters '{\"mean\": \"(20, -100, 100)\"}'")
normalize_signals = False
def __init__(self, *args, **kwargs):
super(Fit, self).__init__(*args, **kwargs)
[docs] def requires(self):
"""
Needs as input the root file provided by the FeaturePlot task
"""
return FeaturePlot.vreq(self, save_root=True, stack=True, hide_data=False,
normalize_signals=False, save_yields=True)
[docs] def output(self):
"""
Returns, per feature, one json file storing the fit results and one root file
storing the workspace
"""
x0 = str(self.x_range[0]).replace(".", "p")
x1 = str(self.x_range[1]).replace(".", "p")
x = f"{x0}_{x1}".replace(" ", "")
blind = ("" if float(self.blind_range[0]) == -1. and float(self.blind_range[1]) == -1.
else "__blinded")
region_name = "" if not self.region_name else "__{}".format(self.region_name)
return {
feature.name: {
key: self.local_target("{}__{}__{}__{}_{}{}.{}".format(
feature.name, self.process_name, self.method, x, blind, region_name, key
))
for key in ["json", "root"]
}
for feature in self.features
}
[docs] def run(self):
"""
Obtains the fits per feature and systematic.
"""
inputs = self.input()
assert self.process_name in self.config.process_group_names[self.process_group_name]
isMC = self.config.processes.get(self.process_name).isMC
for ifeat, feature in enumerate(self.features):
systs_directions = [("central", "")]
if isMC and self.store_systematics:
systs = self.get_systs(feature, isMC)
systs_directions += list(itertools.product(systs, directions))
# fit range
x, blind = self.get_x(self.x_range, self.blind_range)
l = ROOT.RooArgList(x)
if blind:
x_blind, _ = self.get_x(self.blind_range)
l_blind = ROOT.RooArgList(x_blind)
d = {}
for syst_name, direction in systs_directions:
key = ""
if syst_name != "central":
key = f"_{syst_name}_{direction}"
tf = ROOT.TFile.Open(inputs["root"].targets[feature.name].path)
try:
histo = copy(tf.Get("histograms/" + self.process_name + key))
except:
raise ValueError(f"The histogram has not been created for {self.process_name}")
data = ROOT.RooDataHist("data_obs", "data_obs", l, histo)
if blind:
data_blind = ROOT.RooDataHist("data_obs_blind", "data_obs_blind", l_blind, histo)
frame = x.frame()
data.plotOn(frame)
n_non_zero_bins = 0
for i in range(1, histo.GetNbinsX()):
if histo.GetBinContent(i) > 0:
n_non_zero_bins += 1
# get function to fit and its parameters
fun, params = self.get_fit(self.method, self.fit_parameters, x,
fit_name="model_" + self.process_name)
# fitting
if not blind:
fun.fitTo(data, ROOT.RooFit.SumW2Error(True))
else:
fun.fitTo(data, ROOT.RooFit.Range(
float(self.x_range[0]), float(self.x_range[1])),
ROOT.RooFit.SumW2Error(True))
npar = fun.getParameters(data).selectByAttrib("Constant", False).getSize()
# filling output dict with fitting results
d[key] = {}
for param, value in params.items():
d[key][param] = value.getVal()
# setting the parameter as constant before saving it in the workspace
value.setConstant(True) # needs further studying
if self.method == "voigtian":
gamma_value = params["gamma"].getVal()
sigma_value = params["sigma"].getVal()
G = 2 * sigma_value * np.sqrt(2 * np.log(2))
L = 2 * gamma_value
d[key]["HWHM"] = (0.5346 * L + np.sqrt(0.2166 * L ** 2 + G ** 2)) / 2
histo_new = data.createHistogram("histo_new", x)
error = c_double(0.)
integral = histo_new.IntegralAndError(
0, histo_new.GetNbinsX() + 1, error)
if blind:
histo_blind = data_blind.createHistogram("histo_blind", x_blind)
error_blind = c_double(0.)
integral_blind = histo_blind.IntegralAndError(
0, histo_blind.GetNbinsX() + 1, error_blind)
# Additional results to include in the output dict
fun.plotOn(frame)
d[key]["chi2"] = frame.chiSquare()
d[key]["npar"] = npar
d[key]["chi2/ndf"] = frame.chiSquare(npar)
d[key]["Number of non-zero bins"] = n_non_zero_bins
d[key]["Full chi2"] = frame.chiSquare() * n_non_zero_bins
d[key]["ndf"] = n_non_zero_bins - npar
d[key]["integral"] = data.sumEntries()
d[key]["fit_range"] = self.x_range
d[key]["blind_range"] = "None" if not blind else self.blind_range
d[key]["sum_entries"] = data.sumEntries() - (
0 if not blind else data_blind.sumEntries())
d[key]["integral"] = integral - (0 if not blind else integral_blind)
d[key]["integral_error"] = error.value - (0 if not blind else error_blind.value)
w_name = "workspace_" + self.process_name + key
workspace = ROOT.RooWorkspace(w_name, w_name)
getattr(workspace, "import")(fun)
getattr(workspace, "import")(data)
workspace.Print()
f = ROOT.TFile.Open(create_file_dir(self.output()[feature.name]["root"].path),
"UPDATE")
f.cd()
workspace.Write()
f.Close()
with open(create_file_dir(self.output()[feature.name]["json"].path), "w+") as f:
json.dump(d, f, indent=4)
[docs]class InspectFitSyst(Fit):
"""
Task that extracts systematic effects on the fits.
"""
[docs] def requires(self):
"""
Needs as input the json file provided by the Fit task
"""
return Fit.vreq(self)
[docs] def output(self):
"""
Returns, per feature, one json file and one txt file storing the inspection results
"""
region_name = "" if not self.region else "__{}".format(self.region.name)
return {
feature.name: {
"json": self.local_target("{}__{}__{}{}.json".format(
feature.name, self.process_name, self.method, region_name)),
"txt": self.local_target("{}__{}__{}{}.txt".format(
feature.name, self.process_name, self.method, region_name))
}
for feature in self.features
}
[docs] def run(self):
inputs = self.input()
isMC = self.config.processes.get(self.process_name).isMC
for ifeat, feature in enumerate(self.features):
systs = self.get_unique_systs(self.get_systs(feature, isMC) \
+ self.config.get_weights_systematics(self.config.weights[self.category.name], isMC))
params = ["integral"]
if self.method == "voigtian":
params += ["mean", "sigma", "gamma"]
elif self.method == "exponential":
params += ["c"]
else:
order = int(self.fit_parameters.get("order", 1))
if self.method == "polynomial":
params += [f'p{i}' for i in range(order)]
elif self.method == "powerlaw":
params += [f'a{i}' for i in range(order)] + [f'b{i}' for i in range(order)]
with open(inputs[feature.name]["json"].path) as f:
d = json.load(f)
table = []
out = {}
for syst in systs:
line = [syst]
out[syst] = {}
for param in params:
out[syst][param] = {}
if d[""][param] == 0:
out[syst][param]["up"] = -999
out[syst][param]["down"] = -999
else:
out[syst][param]["up"] = (d[f"_{syst}_up"][param] - d[""][param]) / d[""][param]
out[syst][param]["down"] = (d[f"_{syst}_down"][param] - d[""][param]) /\
d[""][param]
line.append(out[syst][param]["up"])
line.append(out[syst][param]["down"])
table.append(line)
txt = tabulate.tabulate(table, headers=["syst name"] + list(
itertools.product(params, directions)))
print(txt)
with open(create_file_dir(self.output()[feature.name]["txt"].path), "w+") as f:
f.write(txt)
with open(create_file_dir(self.output()[feature.name]["json"].path), "w+") as f:
json.dump(out, f, indent=4)
[docs]class CombineDatacards(CombineCategoriesTask):
"""
Task that combines datacards coming from :class:`.CreateDatacards`.
"""
category_name = "base"
combine_categories = True
[docs] def requires(self):
"""
Needs as input the datacards provided by the CreateDatacards task for each category
"""
return {
category_name: CreateDatacards.vreq(self, category_name=category_name,
_exclude=["category_names"])
for category_name in self.category_names
}
[docs] def output(self):
"""
Outputs a txt file with the merged datacard
"""
return {
feature.name: self.local_target("{}{}.txt".format(
feature.name, self.get_output_postfix()))
for feature in self.features
}
[docs] def run(self):
"""
Combines all datacards using combine's combineCards.py
"""
cmd = "combineCards.py "
inputs = self.input()
for feature in self.features:
force_shape = False
for category_name in self.category_names:
cmd += f"{category_name}={inputs[category_name][feature.name]['txt'].path} "
with open(inputs[category_name][feature.name]['txt'].path) as f:
text = f.read()
if "shapes" in text:
force_shape = True
if force_shape:
cmd += "--force-shape "
cmd += f"> {self.output()[feature.name].path}"
create_file_dir(f"{self.output()[feature.name].path}")
os.system(cmd)
[docs]class CreateWorkspace(CreateDatacards, CombineCategoriesTask,
law.LocalWorkflow, HTCondorWorkflow, SGEWorkflow):
"""
Task that creates the Combine workspace from the datacards obtained from
:class:`.CreateDatacards` or :class:`.CombineDatacards`.
"""
category_name = "base"
[docs] def create_branch_map(self):
"""
Returns one branch if categories are combined or one branch per category
"""
if self.combine_categories:
return 1
return len(self.category_names)
[docs] def requires(self):
"""
Requires the datacard coming from CombineDatacards or one datacard per category obtained by
CreateDatacards.
"""
if self.combine_categories:
return CombineDatacards.vreq(self)
return {
category_name: CreateDatacards.vreq(self, category_name=category_name)
for category_name in self.category_names
}
[docs] def workflow_requires(self):
"""
Requires the datacard coming from CombineDatacards or one datacard per category obtained by
CreateDatacards.
"""
if self.combine_categories:
return {"data": CombineDatacards.vreq(self)}
return {
"data": {
category_name: CreateDatacards.vreq(self, category_name=category_name)
for category_name in self.category_names
}
}
[docs] def output(self):
"""
Outputs one root file with the workspace if categories are combined or one per category.
"""
assert not self.combine_categories or (
self.combine_categories and len(self.category_names) > 1)
return {
feature.name: self.local_target("workspace_{}{}.root".format(
feature.name, self.get_output_postfix() ))
for feature in self.features
}
[docs] def run(self):
"""
Obtains the workspace for each provided datacard.
"""
inputs = self.input()
for feature in self.features:
if not self.combine_categories:
inp = inputs[list(self.category_names)[self.branch]][feature.name]['txt'].path
else:
inp = inputs[feature.name].path
cmd = "text2workspace.py {} -m {} -o {}".format(
inp, self.higgs_mass, create_file_dir(self.output()[feature.name].path))
os.system(cmd)
[docs]class RunCombine(CreateWorkspace):
"""
Task that runs the combine tool over the workspace created by
:class:`.CreateWorkspace`.
:param method: Combine method to consider. Only `limits` (AsymptoticLimits) is implemented for
now.
:type method: str
:param unblind: Whether to run combine unblinded.
:type method: bool
"""
method = luigi.ChoiceParameter(choices=("limits",), default="limits",
description="combine method to be considered, default: False")
unblind = luigi.BoolParameter(default=False, description="whether to run combine unblinded, "
"default: False")
[docs] def workflow_requires(self):
"""
Requires the workspace coming from CreateWorkspace.
"""
return {"data": CreateWorkspace.vreq(self)}
[docs] def requires(self):
"""
Requires the workspace coming from CreateWorkspace.
"""
return CreateWorkspace.vreq(self)
[docs] def output(self):
"""
Outputs one txt file and one root file in total or one of each per category with the results
of running combine
"""
assert not self.combine_categories or (
self.combine_categories and len(self.category_names) > 1)
return {
feature.name: {
key: self.local_target("results_{}{}.{}".format(
feature.name, self.get_output_postfix(), key))
for key in ["txt", "root"]
}
for feature in self.features
}
[docs] def run(self):
"""
Runs combine over the provided workspaces.
"""
if self.method == "limits":
out_file = "higgsCombine{}.AsymptoticLimits.mH{}.root"
inputs = self.input()
for feature in self.features:
test_name = randomize("Test")
cmd = "combine -M "
if self.method == "limits":
cmd += "AsymptoticLimits "
cmd += f"--name {test_name} "
if not self.unblind: # not sure if this is only for AsymptoticLimits
cmd += "--run blind "
cmd += f"-m {self.higgs_mass} "
cmd += inputs[feature.name].path
cmd += f" > {create_file_dir(self.output()[feature.name]['txt'].path)}"
os.system(cmd)
move(out_file.format(test_name, self.higgs_mass),
self.output()[feature.name]["root"].path)
[docs]class PullsAndImpacts(RunCombine):
"""
Task that obtains the pulls and impacts over the workspace created by :class:`.CreateWorkspace`.
Based on https://gitlab.cern.ch/hh/tools/inference/-/blob/master/dhi/tasks/pulls_impacts.py
"""
def get_selected_nuisances(self, nuisances):
return nuisances
def extract_nuisance_names(self, path):
def prepare_prefit_var(var, pdf, epsilon=0.001):
"""
Prepares a RooRealVar *var* for the extraction of its prefit values using the corresponding
*pdf* object. Internally, this is done using a made-up fit with precision *epsilon* following a
recipe in the CombineHarvester (see
https://github.com/cms-analysis/CombineHarvester/blob/f1029e160701140ce3a1c1f44a991315fd272886/CombineTools/python/combine/utils.py#L87-L95). # noqa
*var* is returned.
"""
if var and pdf:
nll = ROOT.RooConstraintSum("NLL", "", ROOT.RooArgSet(pdf), ROOT.RooArgSet(var))
minimizer = ROOT.RooMinimizer(nll)
minimizer.setEps(epsilon)
minimizer.setErrorLevel(0.5)
minimizer.setPrintLevel(-1)
minimizer.setVerbose(False)
minimizer.minimize("Minuit2", "migrad")
minimizer.minos(ROOT.RooArgSet(var))
return var
tf = ROOT.TFile.Open(path)
w = tf.Get("w")
config = w.genobj("ModelConfig")
all_params = config.GetPdf().getParameters(config.GetObservables())
params = {}
for param in all_params:
if not (isinstance(param, ROOT.RooRealVar) and not param.isConstant()
and param.GetName() not in self.pois):
continue
pdf = w.pdf(f"{param.GetName()}_Pdf")
gobs = w.var(f"{param.GetName()}_In")
if pdf and gobs:
prepare_prefit_var(param, pdf)
nom = param.getVal()
prefit = [nom + param.getErrorLo(), nom, nom + param.getErrorHi()]
if isinstance(pdf, ROOT.RooGaussian):
pdf_type = "Gaussian"
elif isinstance(pdf, ROOT.RooPoisson):
pdf_type = "Poisson"
elif isinstance(pdf, ROOT.RooBifurGauss):
pdf_type = "AsymmetricGaussian"
else:
pdf_type = "Unrecognised"
elif not pdf or isinstance(pdf, ROOT.RooUniform):
pdf_type = "Unconstrained"
nom = param.getVal()
prefit = [nom, nom, nom]
# get groups
start = "group_"
groups = [attr.replace(start, "")
for attr in param.attributes() if attr.startswith(start)]
# store it
params[param.GetName()] = {
"name": param.GetName(),
"type": pdf_type,
"groups": groups,
"prefit": prefit,
}
tf.Close()
return params
@law.workflow_property(setter=False, empty_value=law.no_value, cache=True)
def workspace_parameters(self):
ws_input = CreateWorkspace.vreq(self, branch=0).output()[self.features[0].name]
if not ws_input.exists():
return law.no_value
return self.extract_nuisance_names(ws_input.path)
def __init__(self, *args, **kwargs):
super(PullsAndImpacts, self).__init__(*args, **kwargs)
[docs] def create_branch_map(self):
"""
Returns the nuisance parameters considered in the fit
"""
self.nuisance_names = [self.pois[0]]
params = self.get_selected_nuisances(self.workspace_parameters)
if params:
self.nuisance_names.extend(list(params.keys()))
self.cache_branch_map = True
return self.nuisance_names
[docs] def workflow_requires(self):
"""
Requires the workspace coming from CreateWorkspace.
"""
return {"data": CreateWorkspace.vreq(self, _exclude=["branches", "branch"])}
[docs] def requires(self):
"""
Requires the workspace coming from CreateWorkspace.
"""
return CreateWorkspace.vreq(self, _exclude=["branches", "branch"])
[docs] def output(self):
"""
Outputs one log, root file, and json per nuisance paramter.
"""
assert(self.combine_categories or self.category_names == 1)
return {
feature.name: {
key: self.local_target("results_{}{}__{}.{}".format(
feature.name, self.get_output_postfix(), self.branch_data, key))
for key in ["log", "root", "json"]
}
for feature in self.features
}
[docs] def run(self):
"""
Runs the combine commands needed for pulls and impacts computations.
"""
inp = self.input()["collection"].targets[0]
for feature in self.features:
w_path = inp[feature.name].path
if self.branch == 0:
cmd = ("combine -M MultiDimFit -n _initialFit --algo singles "
"--redefineSignalPOIs {poi} {toys} -m {mass} -d {workspace} > {log_file}")
out_file = "higgsCombine_initialFit.MultiDimFit.mH{mass}.root"
else:
cmd = ("combine -M MultiDimFit -n _paramFit_Test_{param} --algo impact "
"--redefineSignalPOIs {poi} -P {param} --floatOtherPOIs 1 --saveInactivePOI 1 "
"{toys} -m {mass} -d {workspace} > {log_file}")
out_file = "higgsCombine_paramFit_Test_{param}.MultiDimFit.mH{mass}.root"
toys = "--toys -1"
if self.unblind:
toys = ""
log_file = randomize("log")
cmd = cmd.format(poi=self.pois[0], toys=toys, mass=self.higgs_mass,
workspace=w_path, param=self.branch_data, log_file=log_file)
print("Running " + cmd)
os.system(cmd)
move(log_file, create_file_dir(self.output()[feature.name]["log"].path))
move(out_file.format(param=self.branch_data, mass=self.higgs_mass),
create_file_dir(self.output()[feature.name]["root"].path))
tree = uproot.open(self.output()[feature.name]["root"].path)["limit"]
results = {
self.branch_data: tree[self.branch_data].array().tolist()
}
if self.branch_data != self.pois[0]:
results[self.pois[0]] = tree[self.pois[0]].array().tolist()
results["prefit"] = self.workspace_parameters[self.branch_data]["prefit"]
results["groups"] = self.workspace_parameters[self.branch_data]["groups"]
results["type"] = self.workspace_parameters[self.branch_data]["type"]
with open(create_file_dir(self.output()[feature.name]["json"].path), "w+") as f:
json.dump(results, f)
[docs]class MergePullsAndImpacts(CombineCategoriesTask):
"""
Task that merges the pulls and impacts for each systematic obtained by :class:`.PullsAndImpacts`.
"""
category_name = "base"
[docs] def requires(self):
"""
Requires the json files coming from PullsAndImpacts.
"""
return PullsAndImpacts.vreq(self)
[docs] def output(self):
"""
Outputs one json file (in CombineHarvester format) with the results for all
nuisance parameters.
"""
assert(self.combine_categories or self.category_names == 1)
return {
feature.name: self.local_target("results_{}{}.json".format(
feature.name, self.get_output_postfix()))
for feature in self.features
}
[docs] def run(self):
"""
Merges the results from the PullsAndImpacts task into one json file
"""
inp = self.input()["collection"].targets
nuisance_names = list(self.requires().get_branch_map().values()) # includes POI
for feature in self.features:
res = {"POIs": [], "params": []}
with open(inp[0][feature.name]["json"].path) as f:
d = json.load(f)
res["POIs"].append({
"name": nuisance_names[0],
"fit": [d[self.pois[0]][1], d[self.pois[0]][0], d[self.pois[0]][2]]
})
for inuis, nuisance_name in enumerate(nuisance_names[1:]):
res["params"].append({"name": nuisance_name})
with open(inp[inuis + 1][feature.name]["json"].path) as f:
d = json.load(f)
res["params"][-1]["fit"] = [
d[nuisance_name][1], d[nuisance_name][0], d[nuisance_name][2]
]
res["params"][-1][self.pois[0]] = [
d[self.pois[0]][1], d[self.pois[0]][0], d[self.pois[0]][2]
]
res["params"][-1]["impact_" + self.pois[0]] = max(
list(map(abs, (res["params"][-1][self.pois[0]][1] -
res["params"][-1][self.pois[0]][i] for i in [0, 2])))
)
res["params"][-1]["prefit"] = d["prefit"]
res["params"][-1]["groups"] = d["groups"]
res["params"][-1]["type"] = d["type"]
with open(create_file_dir(self.output()[feature.name].path), "w+") as f:
json.dump(res, f, indent=4)
[docs]class PlotPullsAndImpacts(MergePullsAndImpacts):
"""
Task that plots pulls and impacts coming from :class:`.MergePullsAndImpacts`.
"""
[docs] def requires(self):
"""
Requires the json file with all pulls and impacts obtained by MergePullsAndImpacts.
"""
return MergePullsAndImpacts.vreq(self)
[docs] def output(self):
"""
Outputs one pdf with the pulls and impacts obtained by CombineHarvester.
"""
assert(self.combine_categories or self.category_names == 1)
return {
feature.name: self.local_target("impacts_{}{}.pdf".format(
feature.name, self.get_output_postfix()))
for feature in self.features
}
[docs] def run(self):
"""
Plots the pulls and impacts using CombineHarvester's plotImpacts.py
"""
inp = self.input()
for feature in self.features:
out = randomize("impacts")
os.system("plotImpacts.py -i {} -o {}".format(
inp[feature.name].path,
out
))
move(out + ".pdf", self.output()[feature.name].path)