Source code for cmt.base_tasks.preprocessing

# coding: utf-8

"""
Preprocessing tasks.
"""

# Empty export list: ``from <this module> import *`` brings in no names.
__all__ = []


import abc
import contextlib
import itertools
from collections import OrderedDict, defaultdict
import os
import sys
from subprocess import call
import json

import law
import luigi

from analysis_tools.utils import join_root_selection as jrs
from analysis_tools.utils import import_root, create_file_dir

from cmt.base_tasks.base import ( 
    DatasetTaskWithCategory, DatasetWrapperTask, HTCondorWorkflow, SGEWorkflow,
    InputData, ConfigTaskWithCategory, SplittedTask, DatasetTask, RDFModuleTask
)

# Variation directions combined with every requested systematic name when the
# systematic-aware wrapper tasks in this module build their requirements.
directions = ["up", "down"]


class DatasetSuperWrapperTask(DatasetWrapperTask, law.WrapperTask):
    """
    Wrapper task requiring one task per dataset found in ``self.datasets``.

    Subclasses provide :meth:`atomic_requires` to build the task of a single dataset.
    """

    exclude_index = True

    def __init__(self, *args, **kwargs):
        super(DatasetSuperWrapperTask, self).__init__(*args, **kwargs)

    @abc.abstractmethod
    def atomic_requires(self, dataset):
        """Return the requirement for *dataset*; this base version returns ``None``."""
        return None

    def requires(self):
        """Map every dataset name to its requirement, in the order of ``self.datasets``."""
        requirements = OrderedDict()
        for dataset in self.datasets:
            requirements[dataset.name] = self.atomic_requires(dataset)
        return requirements


class DatasetSystWrapperTask(DatasetSuperWrapperTask):
    """
    Wrapper task requiring one task per (dataset, systematic, direction) combination.

    The central configuration, encoded as ``("", "")``, is always part of the requirements.
    """

    systematic_names = law.CSVParameter(default=(), description="names of systematics "
        "to run, default: central only (empty string)")

    exclude_index = True

    @abc.abstractmethod
    def atomic_requires(self, dataset, systematic, direction):
        """Return the requirement for one combination; this base version returns ``None``."""
        return None

    def requires(self):
        """Build the requirements keyed by ``(dataset name, systematic, direction)``."""
        # central entry first, then every systematic in every direction
        variations = [("", "")]
        variations.extend(itertools.product(self.systematic_names, directions))

        requirements = OrderedDict()
        for dataset in self.datasets:
            for syst, d in variations:
                requirements[(dataset.name, syst, d)] = self.atomic_requires(dataset, syst, d)
        return requirements


class DatasetCategoryWrapperTask(DatasetWrapperTask, law.WrapperTask):
    """
    Wrapper task requiring one task per (dataset, category) pair.

    Composite categories are replaced by their subcategories.
    """
    # NOTE(review): a class with this exact name is defined again directly below
    # (there without ``law.WrapperTask`` as a base), so the module-level name ends up
    # bound to that later definition. Confirm which of the two is meant to survive.

    category_names = law.CSVParameter(default=("baseline_even",), description="names of categories "
        "to run, default: (baseline_even,)")

    exclude_index = True

    def __init__(self, *args, **kwargs):
        super(DatasetCategoryWrapperTask, self).__init__(*args, **kwargs)

        # tasks wrapped by this class do not allow composite categories, so split them here
        self.categories = []
        for name in self.category_names:
            category = self.config.categories.get(name)
            if not category.subcategories:
                self.categories.append(category)
                continue
            self.categories.extend(category.subcategories)

    @abc.abstractmethod
    def atomic_requires(self, dataset, category):
        """Return the requirement for one pair; this base version returns ``None``."""
        return None

    def requires(self):
        """Build the requirements keyed by ``(dataset name, category name)``."""
        requirements = OrderedDict()
        for dataset, category in itertools.product(self.datasets, self.categories):
            # categories can veto processes through their "skip_processes" aux entry
            if dataset.process.name in category.get_aux("skip_processes", []):
                continue
            key = (dataset.name, category.name)
            requirements[key] = self.atomic_requires(dataset, category)
        return requirements


class DatasetCategoryWrapperTask(DatasetWrapperTask):
    """
    Base for tasks that fan out over (dataset, category) pairs.

    ``self.categories`` holds the requested categories, with every composite category
    expanded into its subcategories.
    """

    category_names = law.CSVParameter(default=("baseline_even",), description="names of categories "
        "to run, default: (baseline_even,)")

    exclude_index = True

    def __init__(self, *args, **kwargs):
        super(DatasetCategoryWrapperTask, self).__init__(*args, **kwargs)

        # tasks wrapped by this class do not allow composite categories, so split them here
        self.categories = []
        for category_name in self.category_names:
            requested = self.config.categories.get(category_name)
            children = requested.subcategories
            if children:
                self.categories.extend(children)
            else:
                self.categories.append(requested)

    @abc.abstractmethod
    def atomic_requires(self, dataset, category):
        """Return the requirement of a single pair; this base version returns ``None``."""
        return None

    def requires(self):
        """Return an ordered mapping ``(dataset name, category name) -> requirement``."""
        def is_skipped(dataset, category):
            # a category lists the processes it does not apply to in "skip_processes"
            return dataset.process.name in category.get_aux("skip_processes", [])

        return OrderedDict(
            ((dataset.name, category.name), self.atomic_requires(dataset, category))
            for dataset, category in itertools.product(self.datasets, self.categories)
            if not is_skipped(dataset, category)
        )


class DatasetCategorySystWrapperTask(DatasetCategoryWrapperTask, law.WrapperTask):
    """
    Wrapper task requiring one task per (dataset, category, systematic, direction) combination.

    The central configuration, encoded as ``("", "")``, is always part of the requirements.
    """

    systematic_names = law.CSVParameter(default=(), description="names of systematics "
        "to run, default: central only (empty string)")

    exclude_index = True

    @abc.abstractmethod
    def atomic_requires(self, dataset, category, systematic, direction):
        """Return the requirement for one combination; this base version returns ``None``."""
        return None

    def requires(self):
        """Build the requirements keyed by ``(dataset, category, systematic, direction)`` names."""
        # central entry first, followed by each systematic in each direction
        variations = [("", "")] + [
            (syst, d) for syst in self.systematic_names for d in directions
        ]

        requirements = OrderedDict()
        for dataset, category, (syst, d) in itertools.product(
                self.datasets, self.categories, variations):
            if dataset.process.name in category.get_aux("skip_processes", []):
                continue
            key = (dataset.name, category.name, syst, d)
            requirements[key] = self.atomic_requires(dataset, category, syst, d)
        return requirements


class PreCounter(DatasetTask, law.LocalWorkflow, HTCondorWorkflow, SGEWorkflow, SplittedTask,
        RDFModuleTask):
    """
    Performs a counting of the events with and without applying the necessary weights.
    Weights are read from the config file.
    In case they have to be computed, RDF modules can be run.

    Example command:

    ``law run PreCounter --version test --config-name base_config --dataset-name ggf_sm \
--workflow htcondor --weights-file weight_file``

    :param weights_file: filename inside ``cmt/config/`` (w/o extension) with the RDF modules
        to run
    :type weights_file: str

    :param systematic: systematic to use for categorization.
    :type systematic: str

    :param systematic_direction: systematic direction to use for categorization.
    :type systematic_direction: str
    """

    weights_file = luigi.Parameter(description="filename with modules to run RDataFrame",
        default=law.NO_STR)
    systematic = luigi.Parameter(default="", description="systematic to use for categorization, "
        "default: None")
    systematic_direction = luigi.Parameter(default="", description="systematic direction to use "
        "for categorization, default: None")

    # regions not supported
    region_name = None

    default_store = "$CMT_STORE_EOS_CATEGORIZATION"
    default_wlcg_fs = "wlcg_fs_categorization"

    def __init__(self, *args, **kwargs):
        super(PreCounter, self).__init__(*args, **kwargs)
        self.addendum = self.get_addendum()
        self.custom_output_tag = "_%s" % self.addendum
        self.threshold = self.dataset.get_aux("event_threshold", None)
        self.merging_factor = self.dataset.get_aux("preprocess_merging_factor", None)

    def _dataset_lookup_dir(self):
        """Return the expanded ``$CMT_TMP_DIR/<config_name>/`` path passed to dataset lookups."""
        return os.path.expandvars("$CMT_TMP_DIR/%s/" % self.config_name)

    def get_addendum(self):
        """
        :return: ``"<systematic>_<direction>_"`` when the selected systematic is listed by one of
            the features named in ``config.weights.total_events_weights``, else an empty string
        :rtype: str
        """
        if self.systematic:
            for weight in self.config.weights.total_events_weights:
                try:
                    feature = self.config.features.get(weight)
                    if self.systematic in feature.systematics:
                        return f"{self.systematic}_{self.systematic_direction}_"
                except Exception:
                    # weights that cannot be resolved to such a feature are skipped on purpose;
                    # ``Exception`` (not a bare except) keeps KeyboardInterrupt/SystemExit alive
                    continue
        return ""

    def create_branch_map(self):
        """
        :return: number of branches: one per input file, one per file group when the dataset
            sets ``event_threshold``, or one per ``preprocess_merging_factor`` files
        :rtype: int
        :raises ValueError: if the dataset sets both ``event_threshold`` and
            ``preprocess_merging_factor``
        """
        # read again here instead of relying on the values stored by __init__
        self.threshold = self.dataset.get_aux("event_threshold", None)
        self.merging_factor = self.dataset.get_aux("preprocess_merging_factor", None)

        if self.threshold and self.merging_factor:
            raise ValueError("Both event_threshold and preprocess_merging_factor "
                "can't be set at once")

        if self.threshold:
            return len(self.dataset.get_file_groups(
                path_to_look=self._dataset_lookup_dir(), threshold=self.threshold))

        nfiles = len(self.dataset.get_files(
            self._dataset_lookup_dir(), add_prefix=False, check_empty=True))
        if not self.merging_factor:
            return nfiles

        # ceiling division: a trailing partial group still needs its own branch
        nbranches, remainder = divmod(nfiles, self.merging_factor)
        return nbranches + 1 if remainder else nbranches

    def workflow_requires(self):
        """
        :return: the ``InputData`` requirement of the whole workflow under the key ``"data"``
        :rtype: dict
        """
        return {"data": InputData.req(self)}

    def requires(self):
        """
        Each branch requires one input file, or a dict of input files (keyed by the file index
        as string) when files are grouped by ``event_threshold`` or
        ``preprocess_merging_factor``.

        :raises ValueError: if both grouping options are set
        """
        if self.threshold and self.merging_factor:
            raise ValueError("Both event_threshold and preprocess_merging_factor "
                "can't be set at once")

        if not self.threshold and not self.merging_factor:
            return InputData.req(self, file_index=self.branch)

        if self.threshold:
            groups = self.dataset.get_file_groups(
                path_to_look=self._dataset_lookup_dir(), threshold=self.threshold)
            return {str(i): InputData.req(self, file_index=i) for i in groups[self.branch]}

        nfiles = len(self.dataset.get_files(
            self._dataset_lookup_dir(), add_prefix=False, check_empty=True))
        first = self.merging_factor * self.branch
        # the last branch may hold fewer than merging_factor files
        last = min(self.merging_factor * (self.branch + 1), nfiles)
        return {str(i): InputData.req(self, file_index=i) for i in range(first, last)}

    def output(self):
        """
        :return: One file per branch
        :rtype: `.json`
        """
        return self.local_target(f"data_{self.addendum}{self.branch}.json")

    def get_weight(self, weights, syst_name, syst_direction, **kwargs):
        """
        Obtains the product of all weights depending on the category/channel applied.
        Returns "1" if it's a data sample.

        :return: Product of all weights to be applied
        :rtype: str
        """
        if self.config.processes.get(self.dataset.process.name).isData:
            return "1"
        return self.config.get_weights_expression(weights, syst_name, syst_direction)

    @law.decorator.notify
    @law.decorator.localize(input=False)
    def run(self):
        """
        Creates one RDataFrame per branch, runs the desired RDFModules and counts the number of
        events w/ and w/o additional weights. If an RDF module raises, the error is printed
        and the process exits with status 1.
        """
        ROOT = import_root()
        ROOT.ROOT.EnableImplicitMT(self.request_cpus)

        # create RDataFrame
        inp = self.get_input()
        if not self.dataset.friend_datasets:
            df = ROOT.RDataFrame(self.tree_name, self.get_path(inp))
        # friend tree
        else:
            tchain = ROOT.TChain()
            for elem in self.get_path(inp):
                tchain.Add("{}/{}".format(elem, self.tree_name))
            friend_tchain = ROOT.TChain()
            for elem in self.get_path(inp, 1):
                friend_tchain.Add("{}/{}".format(elem, self.tree_name))
            tchain.AddFriend(friend_tchain, "friend")
            df = ROOT.RDataFrame(tchain)

        for module in self.get_feature_modules(self.weights_file):
            try:
                df, _ = module.run(df)
            except Exception as e:
                print("Exception: %s. Exiting" % e)
                # exit with a failure status; a bare sys.exit() would report success (0)
                sys.exit(1)

        weight = self.get_weight(
            self.config.weights.total_events_weights, self.systematic, self.systematic_direction)

        # single-bin histograms filled with a constant, so the integral is the (weighted) count
        hmodel = ("", "", 1, 1, 2)
        counted = df.Define("var", "1.")
        histo_noweight = counted.Histo1D(hmodel, "var")
        if not self.dataset.process.isData:
            histo_weight = counted.Define("weight", weight).Histo1D(hmodel, "var", "weight")
        else:
            histo_weight = counted.Histo1D(hmodel, "var")

        d = {
            "nevents": histo_noweight.Integral(),
            "nweightedevents": histo_weight.Integral(),
            "filenames": [str(self.get_path(inp))]
        }

        with open(create_file_dir(self.output().path), "w+") as f:
            json.dump(d, f, indent=4)
class PreCounterWrapper(DatasetSystWrapperTask):
    """
    Wrapper task to run the PreCounter task over several datasets in parallel.

    Example command:

    ``law run PreCounterWrapper --version test --config-name base_config \
--dataset-names tt_dl,tt_sl --PreCounter-weights-file weight_file --workers 2``
    """

    def atomic_requires(self, dataset, systematic, direction):
        """Return the PreCounter task for one dataset and systematic variation."""
        params = dict(
            dataset_name=dataset.name,
            systematic=systematic,
            systematic_direction=direction,
        )
        return PreCounter.req(self, **params)
class PreprocessRDF(PreCounter, DatasetTaskWithCategory):
    """
    Performs the preprocessing step applying a preselection + running RDF modules

    See requirements in :class:`.PreCounter`.

    Example command:

    ``law run PreprocessRDF --version test --category-name base_selection \
--config-name base_config --dataset-name ggf_sm --workflow htcondor \
--modules-file modulesrdf --workers 10 --max-runtime 12h``

    :param modules_file: filename inside ``cmt/config/`` or "../config/" (w/o extension)
        with the RDF modules to run
    :type modules_file: str

    :param keep_and_drop_file: filename inside ``cmt/config/`` or "../config/" (w/o extension)
        with the RDF columns to save in the output file
    :type keep_and_drop_file: str
    """

    modules_file = luigi.Parameter(description="filename with RDF modules", default=law.NO_STR)
    keep_and_drop_file = luigi.Parameter(description="filename with branches to save, empty: all",
        default="")
    # replaces the ``weights_file`` parameter inherited from PreCounter with a plain attribute
    weights_file = None

    default_store = "$CMT_STORE_EOS_PREPROCESSING"
    default_wlcg_fs = "wlcg_fs_categorization"

    def get_addendum(self):
        """
        :return: ``"<systematic>_<direction>_"`` when the current category is listed in the
            ``affected_categories`` aux entry of the selected systematic, else an empty string
        :rtype: str
        """
        if self.systematic:
            systematic = self.config.systematics.get(self.systematic)
            if self.category.name in systematic.get_aux("affected_categories", []):
                return f"{self.systematic}_{self.systematic_direction}_"
        return ""

    def output(self):
        """
        :return: One file per branch with the tree + additional branches
        :rtype: `.root`
        """
        return self.local_target(f"data_{self.addendum}{self.branch}.root")

    @law.decorator.notify
    @law.decorator.localize(input=False)
    def run(self):
        """
        Creates one RDataFrame per branch, applies a preselection and runs the desired
        RDFModules. If an RDF module raises, the error is printed and the process exits with
        status 1.
        """
        ROOT = import_root()
        ROOT.ROOT.EnableImplicitMT(self.request_cpus)

        # create RDataFrame
        inp = self.get_input()
        if not self.dataset.friend_datasets:
            df = ROOT.RDataFrame(self.tree_name, self.get_path(inp))
        # friend tree
        else:
            tchain = ROOT.TChain()
            for elem in self.get_path(inp):
                tchain.Add("{}/{}".format(elem, self.tree_name))
            friend_tchain = ROOT.TChain()
            for elem in self.get_path(inp, 1):
                friend_tchain.Add("{}/{}".format(elem, self.tree_name))
            tchain.AddFriend(friend_tchain, "friend")
            df = ROOT.RDataFrame(tchain)

        print(self.get_path(inp))
        outp = self.output()

        # an empty category selection means no preselection at all
        selection = self.category.selection
        if selection != "":
            filtered_df = df.Define("selection", selection).Filter("selection")
        else:
            filtered_df = df

        # start from the input columns; modules report the columns they add
        branches = list(df.GetColumnNames())
        for module in self.get_feature_modules(self.modules_file):
            try:
                filtered_df, add_branches = module.run(filtered_df)
            except Exception as e:
                print("Exception: %s. Exiting" % e)
                # exit with a failure status; a bare sys.exit() would report success (0)
                sys.exit(1)
            branches += add_branches

        branches = self.get_branches_to_save(branches, self.keep_and_drop_file)
        branch_list = ROOT.vector('string')()
        for branch_name in branches:
            branch_list.push_back(branch_name)
        filtered_df.Snapshot(self.tree_name, create_file_dir(outp.path), branch_list)
class PreprocessRDFWrapper(DatasetCategorySystWrapperTask):
    """
    Wrapper task to run the PreprocessRDF task over several datasets in parallel.

    Example command:

    ``law run PreprocessRDFWrapper --version test --category-name base_selection \
--config-name ul_2018 --dataset-names tt_dl,tt_sl --PreprocessRDF-workflow htcondor \
--PreprocessRDF-max-runtime 48h --PreprocessRDF-modules-file modulesrdf --workers 10``
    """

    def atomic_requires(self, dataset, category, systematic, direction):
        """Return the PreprocessRDF task for one dataset, category and systematic variation."""
        params = dict(
            dataset_name=dataset.name,
            category_name=category.name,
            systematic=systematic,
            systematic_direction=direction,
        )
        return PreprocessRDF.vreq(self, **params)
class Preprocess(DatasetTaskWithCategory, law.LocalWorkflow, HTCondorWorkflow, SplittedTask):
    """
    Runs the NanoAODTools ``PostProcessor`` over the input files of a dataset, optionally
    splitting every file into chunks of at most ``max_events`` events (one branch per chunk).

    Every branch writes the processed tree (``data``) and a json file with the number of input
    events it covers (``stats``).
    """

    modules = luigi.DictParameter(default=None)
    modules_file = luigi.Parameter(description="filename with modules to run on nanoAOD tools",
        default="")
    max_events = luigi.IntParameter(description="maximum number of input events per file, "
        " -1 for all events", default=50000)
    keep_and_drop_file = luigi.Parameter(description="filename with output branches to "
        "keep and drop", default="$CMT_BASE/cmt/files/keep_and_drop_branches.txt")

    # regions not supported
    region_name = None

    default_store = "$CMT_STORE_EOS_CATEGORIZATION"
    default_wlcg_fs = "wlcg_fs_categorization"

    def __init__(self, *args, **kwargs):
        super(Preprocess, self).__init__(*args, **kwargs)

        if not self.keep_and_drop_file:
            self.keep_and_drop_file = None
        elif "$" in self.keep_and_drop_file:
            self.keep_and_drop_file = os.path.expandvars(self.keep_and_drop_file)

        # a dataset-level "splitting" overrides max_events unless splitting is disabled (-1)
        if self.dataset.get_aux("splitting") and self.max_events != -1:
            self.max_events = self.dataset.get_aux("splitting")

        if self.max_events != -1 and not hasattr(self, "splitted_branches"):
            if self.is_workflow():
                self.splitted_branches = self.build_splitted_branches()
            else:
                # branch tasks read the list through the workflow property defined below
                self.splitted_branches = self.get_splitted_branches

    def get_n_events(self, fil):
        """
        Return the number of entries of ``self.tree_name`` in *fil*, trying up to 10 times.

        :raises RuntimeError: if every trial fails
        """
        ROOT = import_root()
        for trial in range(10):
            f = None
            try:
                f = ROOT.TFile.Open(fil)
                return f.Get(self.tree_name).GetEntries()
            except Exception:
                print("Failed opening %s, %s/10 trials" % (fil, trial + 1))
            finally:
                # close the file on failed trials as well
                if f:
                    f.Close()
        raise RuntimeError("Failed opening %s" % fil)

    def build_splitted_branches(self):
        """
        Return the list of chunks (dicts with ``filenumber``, ``split``, ``initial_event`` and
        ``max_events``, the latter being the exclusive end event of the chunk).

        The list is cached as json under ``$CMT_TMP_DIR`` and reloaded from there when the file
        already exists.
        """
        if self.dataset.get_aux("splitting"):
            self.max_events = self.dataset.get_aux("splitting")

        branches_file = os.path.expandvars("$CMT_TMP_DIR/%s/splitted_branches_%s/%s.json" % (
            self.config_name, self.max_events, self.dataset.name))

        if os.path.exists(branches_file):
            with open(create_file_dir(branches_file)) as f:
                return json.load(f)

        lookup_dir = os.path.expandvars("$CMT_TMP_DIR/%s/" % self.config_name)
        files = self.dataset.get_files(lookup_dir, add_prefix=False, check_empty=True)
        branches = []
        for ifil, _ in enumerate(files):
            # the file is looked up again by index, without add_prefix=False, before opening it
            fil = self.dataset.get_files(lookup_dir, index=ifil)
            print("Analyzing file %s" % fil)
            nevents = self.get_n_events(fil)
            initial_event = 0
            isplit = 0
            while initial_event < nevents:
                max_events = min(initial_event + self.max_events, int(nevents))
                branches.append({
                    "filenumber": ifil,
                    "split": isplit,
                    "initial_event": initial_event,
                    "max_events": max_events,
                })
                initial_event += self.max_events
                isplit += 1

        with open(create_file_dir(branches_file), "w+") as f:
            json.dump(branches, f, indent=4)
        return branches

    @law.workflow_property
    def get_splitted_branches(self):
        return self.splitted_branches

    def create_branch_map(self):
        """Return the number of branches: one per chunk, or one per file when not splitting."""
        if self.max_events != -1:
            return len(self.splitted_branches)
        return len(self.dataset.get_files(
            os.path.expandvars("$CMT_TMP_DIR/%s/" % self.config_name), add_prefix=False))

    def workflow_requires(self):
        return {"data": InputData.req(self)}

    def requires(self):
        """Return the input file of this branch (the file owning the chunk when splitting)."""
        if self.max_events == -1:
            return InputData.req(self, file_index=self.branch)
        return InputData.req(self,
            file_index=self.splitted_branches[self.branch]["filenumber"])

    def output(self):
        return {"data": self.local_target("data_%s.root" % self.branch),
                "stats": self.local_target("data_%s.json" % self.branch)}

    def get_modules(self):
        """
        Instantiate the modules described in ``config/<modules_file>.yaml``.

        Every entry needs a ``name`` and a ``path``; optional ``parameters`` are passed as
        keyword arguments. Returns an empty list without a modules file or with an empty one.
        """
        if not self.modules_file:
            return []

        import yaml
        from cmt.utils.yaml_utils import ordered_load

        with open(self.retrieve_file("config/{}.yaml".format(self.modules_file))) as f:
            module_params = ordered_load(f, yaml.SafeLoader)

        # referenced by name inside the eval below: turns "a = 1, b = 'x', " into kwargs
        def _args(*_nargs, **_kwargs):
            return _nargs, _kwargs

        modules = []
        if not module_params:
            return modules

        for tag, params in module_params.items():
            parameter_str = ""
            assert "name" in params and "path" in params
            name = params["name"]
            if "parameters" in params:
                for param, value in params["parameters"].items():
                    # SECURITY(review): eval() runs text taken from the yaml config file.
                    # Keep these files trusted, or replace this with explicit lookups.
                    # The evaluated expressions need ``self`` in the local scope.
                    if isinstance(value, str) and "self" in value:
                        value = eval(value)
                    if isinstance(value, str):
                        parameter_str += param + " = '{}', ".format(value)
                    else:
                        parameter_str += param + " = {}, ".format(value)
            mod = __import__(params["path"], fromlist=[name])
            # SECURITY(review): second eval over the same config-provided text
            _, kwargs = eval('_args(%s)' % parameter_str)
            modules.append(getattr(mod, name)(**kwargs)())
        return modules

    @law.decorator.notify
    @law.decorator.localize(input=False)
    def run(self):
        """
        Write the stats json of the branch, then run the ``PostProcessor`` on the input file
        and move its output to the ``data`` target.
        """
        from shutil import move
        from PhysicsTools.NanoAODTools.postprocessing.framework.postprocessor import PostProcessor

        # result unused; the call is kept in case import_root has set-up side effects
        # (TODO confirm)
        import_root()

        # prepare inputs and outputs
        inp = self.input().path
        outp = self.output()

        # count events
        d = {}
        if self.max_events == -1:
            d["nevents"] = self.get_n_events(inp)
        else:
            chunk = self.splitted_branches[self.branch]
            d["nevents"] = chunk["max_events"] - chunk["initial_event"]

        with open(outp["stats"].path, "w+") as f:
            json.dump(d, f, indent=4)

        # build the full selection
        selection = self.category.get_aux("nt_selection", self.category.selection)
        modules = self.get_modules()

        input_name = inp.split("/")[-1]
        if self.max_events == -1:
            maxEntries = None
            firstEntry = 0
            postfix = ""
            output_file = input_name
        else:
            chunk = self.splitted_branches[self.branch]
            maxEntries = self.max_events
            firstEntry = chunk["initial_event"]
            postfix = "_%s" % chunk["split"]
            # name of the file expected in the working directory after the run below
            output_file = ("%s." % postfix).join(input_name.split("."))

        p = PostProcessor(".", [inp],
                          cut=selection,
                          modules=modules,
                          postfix=postfix,
                          outputbranchsel=self.keep_and_drop_file,
                          maxEntries=maxEntries,
                          firstEntry=firstEntry)
        p.run()
        move(output_file, outp["data"].path)


class PreprocessWrapper(DatasetCategoryWrapperTask):
    """Wrapper requiring the Preprocess task for every (dataset, category) pair."""

    def atomic_requires(self, dataset, category):
        return Preprocess.req(self, dataset_name=dataset.name, category_name=category.name)
class Categorization(PreprocessRDF):
    """
    Performs the categorization step running RDF modules and applying a post-selection

    Example command:

    ``law run Categorization --version test --category-name etau --config-name base_config \
--dataset-name tt_dl --workflow local --base-category-name base_selection \
--workers 10 --feature-modules-file features``

    :param base_category_name: category name from the PreprocessRDF requirements.
    :type base_category_name: str

    :param feature_modules_file: filename inside ``cmt/config/`` or ``../config/``
        (w/o extension) with the RDF modules to run
    :type feature_modules_file: str

    :param skip_preprocess: whether to skip the PreprocessRDF task
    :type skip_preprocess: bool
    """

    base_category_name = luigi.Parameter(default="base_selection", description="the name of the "
        "base category with the initial selection, default: base")
    feature_modules_file = luigi.Parameter(description="filename with RDataFrame modules to run",
        default=law.NO_STR)
    skip_preprocess = luigi.BoolParameter(default=False, description="whether to skip the "
        " PreprocessRDF task, default: False")

    # regions not supported
    region_name = None

    default_store = "$CMT_STORE_EOS_CATEGORIZATION"
    default_wlcg_fs = "wlcg_fs_categorization"

    def __init__(self, *args, **kwargs):
        super(Categorization, self).__init__(*args, **kwargs)

    def workflow_requires(self):
        """Require PreprocessRDF on the base category, or the raw input when it is skipped."""
        if not self.skip_preprocess:
            return {"data": PreprocessRDF.vreq(self, category_name=self.base_category_name)}
        else:
            return {"data": InputData.req(self)}

    def requires(self):
        """Require the matching PreprocessRDF branch, or the raw input file when it is skipped."""
        if not self.skip_preprocess:
            return PreprocessRDF.vreq(self, category_name=self.base_category_name,
                branch=self.branch)
        else:
            return InputData.req(self, file_index=self.branch)

    def output(self):
        """
        :return: One file per branch
        :rtype: `.root`
        """
        return self.local_target(f"data_{self.addendum}{self.branch}.root")

    @law.decorator.notify
    @law.decorator.localize(input=False)
    def run(self):
        """
        Creates one RDataFrame per branch, runs the desired RDFModules and applies a
        post-selection. If the column names cannot be read, or an ``AttributeError`` is raised
        while processing, the input file is copied unchanged to the output. If an RDF module
        raises, the error is printed and the process exits with status 1.
        """
        from shutil import copy

        ROOT = import_root()
        ROOT.ROOT.EnableImplicitMT(self.request_cpus)

        # prepare inputs and outputs
        outp = self.output()

        try:
            if self.skip_preprocess:
                # create RDataFrame
                inp = self.get_input()
                if not self.dataset.friend_datasets:
                    df = ROOT.RDataFrame(self.tree_name, self.get_path(inp))
                # friend tree
                else:
                    tchain = ROOT.TChain()
                    for elem in self.get_path(inp):
                        tchain.Add("{}/{}".format(elem, self.tree_name))
                    friend_tchain = ROOT.TChain()
                    for elem in self.get_path(inp, 1):
                        friend_tchain.Add("{}/{}".format(elem, self.tree_name))
                    tchain.AddFriend(friend_tchain, "friend")
                    df = ROOT.RDataFrame(tchain)
            else:
                df = ROOT.RDataFrame(self.tree_name, self.input().path)

            selection = self.config.get_object_expression(self.category,
                self.dataset.process.isMC, self.systematic, self.systematic_direction)
            dataset_selection = self.dataset.get_aux("selection")
            if dataset_selection and dataset_selection != "1":
                selection = jrs(dataset_selection, selection, op="and")

            try:
                branches = list(df.GetColumnNames())
            except Exception as err:
                # signal an unreadable (empty) ntuple to the handler at the bottom
                raise ReferenceError from err

            for module in self.get_feature_modules(self.feature_modules_file):
                try:
                    df, add_branches = module.run(df)
                except Exception as e:
                    print("Exception: %s. Exiting" % e)
                    # exit with a failure status; a bare sys.exit() would report success (0)
                    sys.exit(1)
                branches += add_branches

            branches = self.get_branches_to_save(branches, self.keep_and_drop_file)
            branch_list = ROOT.vector('string')()
            for branch_name in branches:
                branch_list.push_back(branch_name)
            filtered_df = df.Define("selection", selection).Filter("selection")
            filtered_df.Snapshot(self.tree_name, create_file_dir(outp.path), branch_list)

        except (ReferenceError, AttributeError):
            # empty ntuple (ReferenceError) or empty input file (AttributeError):
            # pass the input through unchanged
            inp = self.input().path
            copy(inp, outp.path)
#copy(self.input()["stats"].path, outp["stats"].path) # except KeyboardInterrupt: # print("### DEBUG Error")
class CategorizationWrapper(DatasetCategorySystWrapperTask):
    """
    Wrapper task to run the Categorization task over several datasets in parallel.

    Example command:

    ``law run CategorizationWrapper --version test --category-names etau \
--config-name base_config --dataset-names tt_dl,tt_sl --Categorization-workflow htcondor \
--workers 20 --Categorization-base-category-name base_selection``
    """

    def atomic_requires(self, dataset, category, systematic, direction):
        """Return the Categorization task for one dataset, category and systematic variation."""
        params = dict(
            dataset_name=dataset.name,
            category_name=category.name,
            systematic=systematic,
            systematic_direction=direction,
        )
        return Categorization.req(self, **params)
class MergeCategorization(DatasetTaskWithCategory, law.tasks.ForestMerge):
    """
    Merges the output from the Categorization or PreprocessRDF tasks in order to reduce the
    parallelization entering the plotting tasks. By default it merges into one output file,
    although a bigger number can be set with the `merging` parameter inside the dataset
    definition.

    In simulated samples, ``hadd`` is used to perform the merging. In data samples, to avoid
    skipping events due to different branches between them, ``haddnano.py`` (safer but slower)
    is used instead. In any case, the use of one method or the other can be forced by specifying
    the parameters ``--force-hadd`` and ``--force-haddnano`` respectively.

    Example command:

    ``law run MergeCategorization --version test --category-name etau \
--config-name base_config --dataset-name tt_sl --workflow local --workers 4``

    :param from_preprocess: whether it merges the output from the PreprocessRDF task (True)
        or Categorization (False, default)
    :type from_preprocess: bool

    :param force_hadd: whether to force ``hadd`` as tool to do the merging.
    :type force_hadd: bool

    :param force_haddnano: whether to force ``haddnano.py`` as tool to do the merging.
    :type force_haddnano: bool

    :param systematic: systematic to use for categorization.
    :type systematic: str

    :param systematic_direction: systematic direction to use for categorization.
    :type systematic_direction: str
    """

    from_preprocess = luigi.BoolParameter(default=False, description="whether to use as input "
        "PreprocessRDF, default: False")
    force_hadd = luigi.BoolParameter(default=False, description="whether to force hadd "
        "as tool to do the merging, default: False")
    force_haddnano = luigi.BoolParameter(default=False, description="whether to force haddnano.py "
        "as tool to do the merging, default: False")
    systematic = luigi.Parameter(default="", description="systematic to use for categorization, "
        "default: None")
    systematic_direction = luigi.Parameter(default="", description="systematic direction to use "
        "for categorization, default: None")

    # regions not supported
    region_name = None

    merge_factor = 10

    default_store = "$CMT_STORE_EOS_MERGECATEGORIZATION"
    default_wlcg_fs = "wlcg_fs_categorization"

    def merge_workflow_requires(self):
        """Require the whole Categorization (or PreprocessRDF) workflow."""
        if not self.from_preprocess:
            return Categorization.vreq(self, _prefer_cli=["workflow"])
        else:
            return PreprocessRDF.vreq(self, _prefer_cli=["workflow"])

    def merge_requires(self, start_leaf, end_leaf):
        """Require the branches ``(start_leaf, end_leaf)`` of the upstream task, run locally."""
        if not self.from_preprocess:
            return Categorization.vreq(self, workflow="local", branches=((start_leaf, end_leaf),),
                _exclude={"branch"})
        else:
            return PreprocessRDF.vreq(self, workflow="local", branches=((start_leaf, end_leaf),),
                _exclude={"branch"})

    def trace_merge_inputs(self, inputs):
        return [inp for inp in inputs["collection"].targets.values()]

    def merge_output(self):
        """Return one target per merged file, named like the PreprocessRDF outputs."""
        addendum = PreprocessRDF.get_addendum(self)
        return law.SiblingFileCollection([
            self.local_target("data_{}{}.root".format(addendum, i))
            for i in range(self.n_files_after_merging)
        ])

    def merge(self, inputs, output):
        """
        Merge the inputs holding a non-empty tree into *output* with ``hadd`` or
        ``haddnano.py``; when no input qualifies, an empty ROOT file is created instead.

        :raises RuntimeError: if ``haddnano.py`` returns a non-zero exit code
        """
        ROOT = import_root()

        with output.localize("w") as tmp_out:
            # merge only files with a filled tree
            good_inputs = []
            for inp in inputs:
                tf = ROOT.TFile.Open(inp.path)
                try:
                    tree = tf.Get(self.tree_name)
                    if tree.GetEntries() > 0:
                        good_inputs.append(inp)
                except Exception:
                    print("File %s not used" % inp.path)
                finally:
                    # the file was only opened to inspect the tree
                    if tf:
                        tf.Close()

            if len(good_inputs) != 0:
                use_hadd = self.dataset.process.isMC
                assert not(self.force_haddnano and self.force_hadd)
                if self.force_haddnano:
                    use_hadd = False
                elif self.force_hadd:
                    use_hadd = True

                if use_hadd:
                    print("Merging with hadd...")
                    law.root.hadd_task(self, good_inputs, tmp_out, local=True)
                else:
                    print("Merging with haddnano.py...")
                    # argument list instead of a shell string: paths need no quoting
                    cmd = [
                        "python3",
                        "%s/bin/%s/haddnano.py" % (
                            os.environ["CMSSW_BASE"], os.environ["SCRAM_ARCH"]),
                        create_file_dir(tmp_out.path),
                    ] + [f.path for f in good_inputs]
                    rc = call(cmd)
                    if rc != 0:
                        # do not let a failed merge pass as a finished task
                        raise RuntimeError("haddnano.py failed with exit code %s" % rc)
            else:
                # if all input files are empty, create an empty file as output
                tf = ROOT.TFile.Open(create_file_dir(tmp_out.path), "RECREATE")
                tf.Close()
class MergeCategorizationWrapper(DatasetCategorySystWrapperTask):
    """
    Wrapper task to run the MergeCategorization task over several datasets in parallel.

    Example command:

    ``law run MergeCategorizationWrapper --version test --category-names etau \
--config-name base_config --dataset-names tt_dl,tt_sl --workers 10``
    """

    def atomic_requires(self, dataset, category, systematic, direction):
        """Return the MergeCategorization task for one dataset, category and variation."""
        params = dict(
            dataset_name=dataset.name,
            category_name=category.name,
            systematic=systematic,
            systematic_direction=direction,
        )
        return MergeCategorization.vreq(self, **params)
class MergeCategorizationStats(DatasetTask, law.tasks.ForestMerge):
    """
    Merges the output from the PreCounter task in order to reduce the
    parallelization entering the plotting tasks.

    :param systematic: systematic to use for categorization.
    :type systematic: str

    :param systematic_direction: systematic direction to use for categorization.
    :type systematic_direction: str

    Example command:

    ``law run MergeCategorizationStats --version test --config-name base_config \
--dataset-name dy_high --workers 10``
    """

    systematic = luigi.Parameter(default="", description="systematic to use for categorization, "
        "default: None")
    systematic_direction = luigi.Parameter(default="", description="systematic direction to use "
        "for categorization, default: None")

    # regions not supported
    region_name = None

    merge_factor = 16

    default_store = "$CMT_STORE_EOS_CATEGORIZATION"
    default_wlcg_fs = "wlcg_fs_categorization"

    def merge_workflow_requires(self):
        """Require the whole PreCounter workflow."""
        return PreCounter.vreq(self, _prefer_cli=["workflow"])

    def merge_requires(self, start_leaf, end_leaf):
        """Require the PreCounter branches ``(start_leaf, end_leaf)``, run locally."""
        return PreCounter.vreq(self, workflow="local", branches=((start_leaf, end_leaf),),
            _exclude={"branch"})

    def trace_merge_inputs(self, inputs):
        return [inp for inp in inputs["collection"].targets.values()]

    def merge_output(self):
        """Return the single json target holding the merged counts."""
        addendum = PreCounter.get_addendum(self)
        return self.local_target(f"stats{addendum}.json")

    def merge(self, inputs, output):
        """
        Sum ``nevents`` and ``nweightedevents`` over all inputs and concatenate their
        ``filenames``.

        Inputs with "json" in their path are read as json. All others contribute the first two
        bins of their ``histos/events`` histogram, or nothing when that cannot be read.
        """
        # output content
        stats = dict(nevents=0, nweightedevents=0, filenames=[])

        # merge
        for inp in inputs:
            is_json = "json" in inp.path
            _stats = None
            try:
                if is_json:
                    _stats = inp.load(formatter="json")
                elif "root" in inp.path:
                    _stats = inp.load(formatter="root")
            except Exception:
                print("error loading input target {}".format(inp))
                raise

            # add nevents
            if is_json:
                stats["nevents"] += _stats["nevents"]
                stats["nweightedevents"] += _stats["nweightedevents"]
                stats["filenames"] += _stats["filenames"]
                continue

            try:
                histo = _stats.Get("histos/events")
                # read both bins before adding, so a failure cannot leave a partial update
                nevents = histo.GetBinContent(1)
                nweightedevents = histo.GetBinContent(2)
            except Exception:
                # inputs without a readable histogram add nothing to the sums
                continue
            stats["nevents"] += nevents
            stats["nweightedevents"] += nweightedevents

        output.parent.touch()
        output.dump(stats, indent=4, formatter="json")
class MergeCategorizationStatsWrapper(DatasetSystWrapperTask):
    """
    Wrapper task to run the MergeCategorizationStats task over several datasets in parallel.

    Example command:

    ``law run MergeCategorizationStatsWrapper --version test --config-name base_config \
--dataset-names tt_dl,tt_sl --workers 10``
    """

    def atomic_requires(self, dataset, systematic, direction):
        """Return the MergeCategorizationStats task for one dataset and systematic variation."""
        params = dict(
            dataset_name=dataset.name,
            systematic=systematic,
            systematic_direction=direction,
        )
        return MergeCategorizationStats.req(self, **params)
class EventCounterDAS(DatasetTask):
    """
    Obtains the number of events of a dataset from a ``dasgoclient`` summary query and
    stores it in a json file. No weights are applied: ``nweightedevents`` is set to the same
    value as ``nevents``.

    Example command:

    ``law run EventCounterDAS --version test --config-name base_config --dataset-name ggf_sm``

    :param use_secondary_dataset: whether to use the dataset included in the secondary_dataset
        parameter from the dataset instead of the actual dataset
    :type use_secondary_dataset: bool
    """

    use_secondary_dataset = luigi.BoolParameter(default=False, description="whether to use "
        "secondary_dataset instead of the actual dataset or folder, default: False")

    def requires(self):
        """
        No requirements needed
        """
        return {}

    def output(self):
        """
        :return: One file for the whole dataset
        :rtype: `.json`
        """
        return self.local_target("stats.json")

    def run(self):
        """
        Asks for the numbers of events using dasgoclient and stores them in the output json file

        :raises RuntimeError: if the ``dasgoclient`` command returns a non-zero exit code
        """
        from analysis_tools.utils import randomize

        if not self.use_secondary_dataset:
            dataset = self.dataset.dataset
        else:
            dataset = self.dataset.get_aux("secondary_dataset", None)
        assert dataset

        tmpname = randomize("tmp")
        cmd = 'dasgoclient --query=="summary dataset={}" > {}'
        try:
            rc = call(cmd.format(dataset, tmpname), shell=True)
            if rc != 0:
                # fail loudly instead of finishing without an output file
                raise RuntimeError(
                    "dasgoclient failed with exit code %s for dataset %s" % (rc, dataset))
            with open(tmpname) as f:
                d = json.load(f)
        finally:
            # the shell redirection creates the file even when the query fails
            if os.path.exists(tmpname):
                os.remove(tmpname)

        output_d = {
            "nevents": d[0]["nevents"],
            "nweightedevents": d[0]["nevents"],
        }
        with open(create_file_dir(self.output().path), "w+") as f:
            json.dump(output_d, f, indent=4)
class EventCounterDASWrapper(DatasetSuperWrapperTask):
    """
    Wrapper task to run the EventCounterDAS task over several datasets in parallel.

    Example command:

    ``law run EventCounterDASWrapper --version test --config-name base_config \
--dataset-names tt_dl,tt_sl --workers 2``
    """

    def atomic_requires(self, dataset):
        """Return the EventCounterDAS task for one dataset."""
        params = dict(dataset_name=dataset.name)
        return EventCounterDAS.req(self, **params)