Source code for swolfpy.Monte_Carlo

# -*- coding: utf-8 -*-
import multiprocessing as mp
import os

import numpy as np
import pandas as pd
from brightway2 import LCA, projects

from .LCA_matrix import LCA_matrix


class Monte_Carlo(LCA_matrix):
    """
    Setups the Monte Carlo simulation. This class is inherited from ``swolfpy.LCA_matrix``.

    The Monte Carlo simulation will be only done for the process models, common data or
    parameters that the class gets by arguments.

    :param functional_unit: ``{flow:amount}``
    :type functional_unit: dict

    :param method: List of methods for MC.
    :type method: list

    :param project: Name of the project.
    :type project: str

    :param process_models: list of the process models.
    :type process_models: list, optional

    :param process_model_names: list of process models' names.
    :type process_model_names: list, optional

    :param common_data: ``CommonData`` object.
    :type common_data: ``swolfpy_inputdata.CommonData.CommonData``, optional

    :param parameters: ``Parameters`` object
    :type parameters: ``swolfpy_inputdata.Parameters.Parameters``, optional

    :param seed: seed for ``stats_arrays.RandomNumberGenerator``
    :type seed: int, optional
    """

    def __init__(
        self,
        functional_unit,
        method,
        project,
        process_models=None,
        process_model_names=None,
        common_data=None,
        parameters=None,
        seed=None,
    ):
        super().__init__(functional_unit, method)
        self.process_models = process_models
        self.process_model_names = process_model_names
        self.parameters = parameters
        self.common_data = common_data
        self.project = project
        # A missing (or otherwise falsy) seed falls back to 0.
        self.seed = seed if seed else 0

    def run(self, nproc, n):
        """
        Runs the Monte Carlo ``n`` times with ``nproc`` processors. Calls and maps the
        ``Monte_Carlo.worker()`` to the processors.

        The ``n`` iterations are split as evenly as possible between the workers: when
        ``n`` is not a multiple of ``nproc``, the first ``n % nproc`` workers each run
        one extra iteration, so that exactly ``n`` iterations are performed in total.
        Worker ``i`` is seeded with ``self.seed + i * 100``. The flattened list of
        per-iteration results is stored in ``self.results``.

        :param nproc: Number of processors allocated to MC
        :type nproc: int

        :param n: Number of iterations in MC
        :type n: int

        :raises ValueError: if ``nproc`` is smaller than 1.
        """
        # Same check (and message) as ``multiprocessing.Pool``; done up front so that the
        # iteration split below never divides by zero.
        if nproc < 1:
            raise ValueError("Number of processes must be at least 1")

        # ``n // nproc`` alone would silently drop the remainder; hand it out one
        # iteration at a time to the first ``extra`` workers instead.
        base, extra = divmod(n, nproc)
        worker_args = [
            (
                self.project,
                self.functional_unit,
                self.method,
                self.parameters,
                self.process_models,
                self.process_model_names,
                self.common_data,
                self.tech_matrix,
                self.bio_matrix,
                self.seed + i * 100,
                base + (1 if i < extra else 0),
            )
            for i in range(nproc)
        ]

        with mp.Pool(processes=nproc) as pool:
            res = pool.map(Monte_Carlo.worker, worker_args)

        # Each worker returns a list of results; flatten them into a single list.
        self.results = [x for lst in res for x in lst]

    # =============================================================================
    # Disabled single-process call of ``Monte_Carlo.worker`` (no pool), kept for reference:
    #         res = Monte_Carlo.worker((self.project, self.functional_unit, self.method, self.parameters, self.process_models, self.process_model_names,
    #                                   self.common_data, self.tech_matrix, self.bio_matrix, self.seed, n//nproc))
    #         self.results = [x for lst in res for x in lst]
    # =============================================================================

    @staticmethod
    def worker(args):
        """
        Setups the Monte Carlo for process models and input data, then creates the ``LCA``
        object and calls ``Monte_Carlo.parallel_mc()`` ``n`` times.

        :param args: ``(project, functional_unit, method, parameters, process_models,
            process_model_names, common_data, tech_matrix, bio_matrix, seed, n)``
        :type args: tuple

        :return: list with one ``Monte_Carlo.parallel_mc()`` result per iteration.
        :rtype: list
        """
        (
            project,
            functional_unit,
            method,
            parameters,
            process_models,
            process_model_names,
            common_data,
            tech_matrix,
            bio_matrix,
            seed,
            n,
        ) = args

        projects.set_current(project, writable=False)

        # Every uncertain source gets its own seed derived from the worker seed:
        # common data -> seed + 100000, process model k -> seed + k, parameters -> seed + 200000.
        if common_data:
            common_data.setup_MC(seed + 100000)
        if process_models:
            for offset, model in enumerate(process_models):
                model.setup_MC(seed + offset)
        if parameters:
            parameters.setup_MC(seed + 200000)

        # Initial LCI/LCIA with the first method; the iterations below rebuild the
        # matrices of this same ``LCA`` object.
        lca = LCA(functional_unit, method[0])
        lca.lci()
        lca.lcia()

        return [
            Monte_Carlo.parallel_mc(
                lca,
                method,
                tech_matrix,
                bio_matrix,
                process_models=process_models,
                process_model_names=process_model_names,
                parameters=parameters,
                common_data=common_data,
                index=x,
            )
            for x in range(n)
        ]

    @staticmethod
    def parallel_mc(
        lca,
        method,
        tech_matrix,
        bio_matrix,
        process_models=None,
        process_model_names=None,
        parameters=None,
        common_data=None,
        index=None,
    ):
        """
        Runs a single Monte Carlo iteration.

        Calls the ``InputData.gen_MC()``, ``ProcessModel.MC_calc()`` and
        ``parameters.MC_calc()`` and then gets the new LCI and updates the ``tech_matrix``
        and ``bio_matrix`` (both are modified in place). Creates new ``bio_param`` and
        ``tech_param`` and then recalculates the LCA for every method in ``method``.

        :param lca: ``LCA`` object on which ``lci()`` and ``lcia()`` were already called.
        :param method: List of methods; ``lca`` is switched back to ``method[0]`` at the end.
        :type method: list
        :param tech_matrix: technosphere exchanges, ``{key: amount}``
        :type tech_matrix: dict
        :param bio_matrix: biosphere exchanges, ``{key: amount}``
        :type bio_matrix: dict
        :param index: iteration number, only used for the progress print.
        :type index: int, optional

        :return: ``(process id, {method: score}, uncertain inputs)``
        :rtype: tuple
        """
        uncertain_inputs = []

        if process_models:
            if common_data:
                uncertain_inputs += common_data.gen_MC()
            for process in process_models:
                if common_data:
                    # Hand the freshly sampled common data to the process model.
                    process.CommonData = common_data
                uncertain_inputs += process.MC_calc()

            # ``process_model_names[k]`` is the name of ``process_models[k]``.
            for position, process_name in enumerate(process_model_names):
                report_dict = process_models[position].report()
                LCA_matrix.update_techmatrix(process_name, report_dict, tech_matrix)
                LCA_matrix.update_biomatrix(process_name, report_dict, bio_matrix)

        if parameters:
            param_exchanges, params = parameters.MC_calc()
            uncertain_inputs += params
            # Only exchanges that already exist in the technosphere are overwritten.
            for key, value in param_exchanges.items():
                if key in tech_matrix:
                    tech_matrix[key] = value

        tech = np.array(list(tech_matrix.values()), dtype=float)
        bio = np.array(list(bio_matrix.values()), dtype=float)

        lca.rebuild_technosphere_matrix(tech)
        lca.rebuild_biosphere_matrix(bio)
        lca.lci_calculation()
        # NOTE(review): ``lca.lcia`` is called as a method in ``worker``; if it is a bound
        # method this check is always true -- confirm the intended condition.
        if lca.lcia:
            lca.lcia_calculation()

        lca_results = {method[0]: lca.score}

        if len(method) > 1:
            for extra_method in method[1:]:
                lca.switch_method(extra_method)
                lca.lcia_calculation()
                lca_results[extra_method] = lca.score
            # Restore the first method so the next iteration starts from the same state.
            lca.switch_method(method[0])

        print(os.getpid(), index)
        return (os.getpid(), lca_results, uncertain_inputs)

    ### Export results
    def result_to_DF(self):
        """
        Returns the results from the Monte Carlo in a ``pandas.DataFrame`` format.

        The frame has one row per iteration, one column per LCIA method followed by one
        column per uncertain input. Column names and order of the inputs are taken from
        the first result. An empty ``DataFrame`` is returned when there are no results.

        :return: Monte Carlo results
        :rtype: ``pandas.DataFrame``
        """
        output = pd.DataFrame()
        if not self.results:
            # Nothing was simulated (e.g. ``n`` = 0); there is no first result to read
            # the column names from.
            return output

        first = self.results[0]

        # Reporting the LCIA results; Create a column for each method
        for method_key in first[1]:
            output[method_key] = [res[1][method_key] for res in self.results]

        # Reporting the input data; each entry is indexed as (name, value)
        for position, entry in enumerate(first[2]):
            output[entry[0]] = [res[2][position][1] for res in self.results]

        return output

    def save_results(self, name):
        """
        Save the results from the Monte Carlo to pickle file.

        :param name: path of the pickle file, passed to ``pandas.DataFrame.to_pickle``.
        :type name: str
        """
        self.result_to_DF().to_pickle(name)