# -*- coding: utf-8 -*-
import pickle
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from brightway2 import (
LCA,
Database,
Method,
MultiLCA,
calculation_setups,
get_activity,
parameters,
projects,
)
from bw2analyzer import ContributionAnalysis
from bw2data.parameters import ActivityParameter
from .Parameters import Parameters
from .ProcessDB import ProcessDB
from .Technosphere import Technosphere
[docs]
class Project:
"""
Project class creates a new project in Brightway2.
:param project_name: Name for the project
:type project_name: str
:param CommonData: CommonData object
:type CommonData: class: `swolfpy_inputdata.CommonData`
:param Treatment_processes: Dictionary for treatment processes include their input type and model.
:type Treatment_processes: dict
:param Distance: Distance object.
:type Distance: class: `swolfpy.Distance.Distance`
:param Collection_processes: Dictionary for collection processes include their input type and model.
Input type for the collection process is empty list ``[]`` as they don't accept waste from other processes.
:type Collection_processes: dict, optional
.. note:: Treatment processes and distance between them are required for creating a project.
Collection processes are not required unless they are included in the system boundary.
:Create sample project:
Step 1: Create CommonData and Treatment_processes dict with LF and WTE:
>>> from swolfpy_inputdata import CommonData
>>> from swolfpy_processmodels import LF
>>> from swolfpy_processmodels import WTE
>>> common_data = CommonData()
>>> Treatment_processes = {}
>>> Treatment_processes['LF']={'input_type':['RWC','Bottom_Ash','Fly_Ash','Other_Residual'],'model': LF()}
>>> Treatment_processes['WTE']={'input_type':['RWC','Other_Residual'],'model': WTE()}
Step 2: Create :ref:`Distance <Distance>` object:
>>> from swolfpy_processmodels import Distance
>>> import pandas as pd
>>> Processes = ['LF','WTE','SF_COl']
>>> data = Distance.create_distance_table(Processes, ['Heavy Duty Truck'], default_dist=20)
>>> distance = Distance(data)
Step 3: Create Collection_processes dict with one single family sector which only has residual waste collection:
>>> from swolfpy_processmodels import SF_Col
>>> Collection_processes = {}
>>> Collection_scheme_SF_COL={'RWC':{'Contribution':1,'separate_col':{'SSR':0,'DSR':0,'MSR':0,'MSRDO':0,'SSYW':0,'SSYWDO':0}},
>>> 'SSO_DryRes':{'Contribution':0,'separate_col':{'SSR':0,'DSR':0,'MSR':0,'MSRDO':0,'SSYW':0,'SSYWDO':0}},
>>> 'REC_WetRes':{'Contribution':0,'separate_col':{'SSR':0,'DSR':0,'MSR':0,'MSRDO':0,'SSYW':0,'SSYWDO':0}},
>>> 'MRDO':{'Contribution':0,'separate_col':{'SSR':0,'DSR':0,'MSR':0,'MSRDO':0,'SSYW':0,'SSYWDO':0}}}
>>> Collection_processes['SF_COl']={'input_type':[],'model': SF_Col('SF_COl',Collection_scheme_SF_COL,
>>> Treatment_processes=Treatment_processes,Distance=distance)}
Step 4: Create project:
>>> from swolfpy import Project
>>> demo = Project('demo',common_data,Treatment_processes,distance,Collection_processes)
>>> demo.init_project()
>>> demo.write_project()
>>> demo.group_exchanges()
>>> demo.parameters.default_parameters_list()
[{'name': 'frac_of_Bottom_Ash_from_WTE_to_LF', 'amount': 1.0},
{'name': 'frac_of_Fly_Ash_from_WTE_to_LF', 'amount': 1.0},
{'name': 'frac_of_RWC_from_SF_COl_to_LF', 'amount': 0.5},
{'name': 'frac_of_RWC_from_SF_COl_to_WTE', 'amount': 0.5}]
>>> demo.update_parameters(demo.parameters.default_parameters_list())
"""
def __init__(
self,
project_name,
CommonData,
Treatment_processes,
Distance,
Collection_processes=None,
Technosphere_obj=None,
):
if Technosphere_obj:
self.Technosphere = Technosphere_obj
self.project_name = self.Technosphere.project_name
if self.project_name != project_name:
raise Warning(
"The project name should be same with the name selected for creating the technosphere"
)
else:
self.project_name = project_name
self.Technosphere = Technosphere(self.project_name)
self.CommonData = CommonData
self.Treatment_processes = Treatment_processes
self.Collection_processes = Collection_processes
self.Distance = Distance
if self.Collection_processes:
for j in self.Collection_processes.keys():
self.Treatment_processes[j] = self.Collection_processes[j]
self.processes = list(self.Treatment_processes.keys())
self.processTypes = {}
for p in self.processes:
self.processTypes[p] = self.Treatment_processes[p]["model"].Process_Type
projects.set_current(self.project_name)
self.waste_treatment = {}
for i in self.CommonData.All_Waste_Pr_Index:
self.waste_treatment[i] = self._find_destination(i)
self.process_model = {}
# Creating swolfpy parameter class
self.parameters = Parameters(self.Treatment_processes, self.CommonData)
self._progress = 0
[docs]
def _find_destination(self, product):
"""
Find the processes that can treat the `product`. This function check the
``input_type`` in the ``Treatment_processes`` dictionary.
:param product: Waste product e.g., RWC, Fly_Ash, Separated_Organics, Other_Residual
:type product: str
:return: A list of the discovered processes in the ``Treatment_processes`` dictionary that can treat the `product`
:rtype: list
"""
destination = []
for P in self.Treatment_processes:
if product in self.Treatment_processes[P]["input_type"]:
destination.append(P)
return destination
[docs]
def init_project(self, signal=None):
"""
Calls the Create_Technosphere_ method to initialize a project.
This function create an empty database for each process as a placeholder, so
swolfpy can browse these databases in the next step (writing project) and create
exchanges between them.
"""
if signal:
self._progress += 5
signal.emit(self._progress)
self.Technosphere.Create_Technosphere()
if signal:
self._progress += 10
signal.emit(self._progress)
# Initializing the databases
for DB_name in self.Treatment_processes:
if self.Treatment_processes[DB_name]["model"].Process_Type in [
"Treatment",
"Collection",
]:
ProcessDB.init_DB(DB_name, self.CommonData.Index)
elif self.Treatment_processes[DB_name]["model"].Process_Type == "Reprocessing":
ProcessDB.init_DB(DB_name, self.CommonData.Reprocessing_Index)
elif self.Treatment_processes[DB_name]["model"].Process_Type == "RDF":
ProcessDB.init_DB(DB_name, ["RDF"])
Database("waste").register()
self.waste_BD = Database("waste")
if signal:
self._progress += 5
signal.emit(self._progress)
[docs]
def write_project(self, signal=None):
"""
Call the import_database_ for all the process models.
"""
self.parameters_dict = {}
self.parameters_list = []
self.act_include_param = {}
for j in self.Treatment_processes:
(P, G) = self._import_database(j)
self.parameters_dict[j] = P
self.act_include_param[j] = G
self.parameters_list += P
if signal:
self._progress += 10
signal.emit(self._progress)
[docs]
def _import_database(self, name):
"""
.. _import_database:
Instantiate the ProcessDB_ class for the process model and gets the LCI report
from it; then translates the report for Brightway2 and populates the databases by
Write_DB_ method.
:return: Returns a tuple (parameters,act_in_group)
:rtype: tuple
"""
self.process_model[name] = ProcessDB(
name, self.waste_treatment, self.CommonData, self.processTypes, self.Distance
)
self.Treatment_processes[name]["model"].calc()
self.process_model[name].Report = self.Treatment_processes[name]["model"].report()
if self.Treatment_processes[name]["model"].Process_Type in ["Treatment", "Collection"]:
(P, G) = self.process_model[name].Write_DB(
waste_flows=self.CommonData.Index,
parameters=self.parameters,
Process_Type=self.Treatment_processes[name]["model"].Process_Type,
)
elif self.Treatment_processes[name]["model"].Process_Type == "Reprocessing":
(P, G) = self.process_model[name].Write_DB(
waste_flows=self.CommonData.Reprocessing_Index,
parameters=self.parameters,
Process_Type=self.Treatment_processes[name]["model"].Process_Type,
)
elif self.Treatment_processes[name]["model"].Process_Type == "Transfer_Station":
(P, G) = self.process_model[name].Write_DB(
waste_flows=self.Treatment_processes[name]["model"]._Extened_Index,
parameters=self.parameters,
Process_Type=self.Treatment_processes[name]["model"].Process_Type,
)
elif self.Treatment_processes[name]["model"].Process_Type == "RDF":
(P, G) = self.process_model[name].Write_DB(
waste_flows=["RDF"],
parameters=self.parameters,
Process_Type=self.Treatment_processes[name]["model"].Process_Type,
)
return (P, G)
[docs]
def report_parameters(self):
"""
Reports the `parameters` in dictionary format.
:return: dictionary include the processes as key and parameters in each process as
values.
:rtype: dict
"""
return self.parameters_dict
[docs]
def report_parameters_list(self):
"""
Reports the `parameters` in list format.
:return: List of `parameters` (waste fractions) in the project
:rtype: list
"""
return self.parameters_list
[docs]
def group_exchanges(self, signal=None):
"""
Create a group for the `parameters` in each database and add the exchanges that
include these `parameters` to this group.
As a results, model know to update the values in those exchanges when the
`parameter` is updated
"""
for j in self.processes:
print(
"""
Grouping the exchanges with parameters in Database {}
""".format(
j
)
)
if len(self.act_include_param[j]) > 0:
for r in self.act_include_param[j]:
parameters.add_exchanges_to_group(j, r)
if signal:
self._progress += 70 / len(self.processes)
signal.emit(self._progress)
if signal:
signal.emit(100)
[docs]
def update_parameters(self, new_param_data, signal=None):
"""
Updates the `parameters` and their related exchanges based on the
`new_param_data`.
:param new_param_data: List of `parameters` (waste fractions) in the project with new values
:type new_param_data: list
.. note:: `parameters` are waste fractions which show what fraction of waste from one source
go to different destinations, so sum of parameters from each source should be 1. (0<= `parameters` <=1)
"""
progress = 0
if signal:
signal.emit(progress)
for j in new_param_data:
for k in self.parameters_list:
if k["name"] == j["name"]:
self.parameters.update_values(j["name"], j["amount"])
if self.parameters.check_sum():
for j in new_param_data:
for k in self.parameters_list:
if k["name"] == j["name"]:
k["amount"] = j["amount"]
parameters.new_project_parameters(new_param_data)
for j in self.processes:
if len(self.act_include_param[j]) > 0:
ActivityParameter.recalculate_exchanges(j)
progress += 100 / len(self.processes)
if signal:
signal.emit(progress)
else:
for j in new_param_data:
for k in self.parameters_list:
if k["name"] == j["name"]:
self.parameters.update_values(k["name"], k["amount"])
[docs]
def create_scenario(self, input_dict, scenario_name):
"""
Creates a new scenario (activity).
"""
# Calculate Unit
mass = 0
for P in input_dict:
for y in input_dict[P]:
if input_dict[P][y] != 0:
unit_i = get_activity((P, y)).as_dict()["unit"].split(sep=" ")
if len(unit_i) > 1:
mass += float(unit_i[0]) * input_dict[P][y]
if unit_i[0] == "Mg/year":
mass += 1 * input_dict[P][y]
new_act = self.waste_BD.new_activity(
code=scenario_name,
name=scenario_name,
type="process",
unit="{} Mg/year".format(np.round(mass, decimals=2)),
**{"reference product": scenario_name},
)
new_act.save()
new_act.new_exchange(input=new_act, amount=1, type="production").save()
for P in input_dict:
for y in input_dict[P]:
if input_dict[P][y] != 0:
new_act.new_exchange(
input=(P, y), amount=input_dict[P][y], type="technosphere"
).save()
[docs]
@staticmethod
def setup_LCA(name, functional_units, impact_methods):
"""
Perform LCA by instantiating the ``bw2calc.multi_lca`` class from Brightway2.
``bw2calc.multi_lca`` is a wrapper class for performing LCA calculations with many
functional units and LCIA methods.
"""
if len(functional_units) > 0 and len(impact_methods) > 0:
calculation_setups[name] = {"inv": functional_units, "ia": impact_methods}
MultiLca = MultiLCA(name)
index = [str(x) for x in list(MultiLca.all.keys())]
columns = [str(x) for x in impact_methods]
results = pd.DataFrame(MultiLca.results, columns=columns, index=index)
return results
else:
raise ValueError("Check the in inputs")
[docs]
@staticmethod
def contribution_analysis(
functional_unit,
impact_method,
limit,
limit_type="number",
target="emissions",
figsize=(6, 4),
font_scale=1,
):
"""
Perform LCA by instantiating the ``bw2calc.lca.LCA`` class from ``Brightway2`` and
then perform contribution analysis by ``bw2analyzer.ContributionAnalysis`` class.
Check the following functions in ``bw2analyzer`` for more info:
* ``bw2analyzer.ContributionAnalysis.annotated_top_processes``
* ``bw2analyzer.ContributionAnalysis.annotated_top_emissions``
"""
lca = LCA(functional_unit, impact_method)
lca.lci()
lca.lcia()
impacts = []
amounts = []
activities = []
flow_unit = []
compartments = []
f = font_scale * 14
if target == "activities":
data = ContributionAnalysis().annotated_top_processes(
lca, limit=50, limit_type="number"
)
else:
data = ContributionAnalysis().annotated_top_emissions(
lca, limit=50, limit_type="number"
)
for impact, amount, act in data:
impacts.append(impact)
amounts.append(amount)
flow_unit.append(act.as_dict()["unit"])
if target == "activities":
activities.append(act.as_dict()["name"].replace("_", " "))
else:
activities.append(act.as_dict()["name"])
compartments.append(act.as_dict()["categories"])
if target == "activities":
top_df = pd.DataFrame(columns=["Activity", "Flow", "Flow Unit", "Contribution", "Unit"])
top_df["Activity"] = activities
else:
top_df = pd.DataFrame(
columns=["Emission", "Compartment", "Flow", "Flow Unit", "Contribution", "Unit"]
)
top_df["Emission"] = activities
top_df["Compartment"] = compartments
top_df["Flow"] = amounts
top_df["Flow Unit"] = flow_unit
top_df["Contribution"] = impacts
top_df["Unit"] = Method(lca.method).metadata["unit"]
if limit_type == "number":
DF = top_df.loc[0:limit, :]
else:
for i, j in enumerate(top_df["Contribution"]):
if abs(j) <= abs((limit * lca.score)):
break
DF = top_df.loc[0:i, :]
plot_DF = pd.DataFrame(
data=[list(DF["Contribution"])],
columns=DF.iloc[:, 0].values,
index=list(functional_unit.keys()),
)
legend = ["Net"]
if target == "emissions":
for x, y in DF[["Emission", "Compartment"]].values:
y = str(y).replace("'", "")
legend.append("{}\n{}".format(x, y))
else:
for x in DF["Activity"].values:
if len(x) > 20:
x = x[0:15] + x[15:].replace(" ", "\n", 1)
legend.append(x)
fig, ax = plt.subplots(figsize=figsize)
plot_DF.plot(kind="bar", stacked=True, ax=ax, alpha=0.9)
ax.set_title("Contribution to {}".format(str(lca.method).replace("'", "")), fontsize=f)
ax.scatter(0, lca.score, s=50, marker="D", edgecolor="w", facecolor="k")
ax.legend(legend, fontsize=f, bbox_to_anchor=(1, 0, 0.2, 1), loc=2)
ax.tick_params(axis="both", which="major", labelsize=f, rotation="auto")
ax.tick_params(axis="both", which="minor", labelsize=f, rotation="auto")
ax.set_ylabel(Method(lca.method).metadata["unit"], fontsize=f)
return (DF, (fig, ax))
[docs]
def save(self, filename):
"""
Dumps the ``Project`` class to pickle file. User can load the pickle and use it
later.
:param filename:
:type filename: str
"""
with open(filename, "wb") as f:
pickle.dump(self, f)