# -*- coding: utf-8 -*-
import json
import multiprocessing as mp
import os
from copy import deepcopy
from functools import partial
from multiprocessing import Pool, cpu_count
from multiprocessing.dummy import Pool as ThreadPool
from time import time
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pyDOE
from brightway2 import projects
from plotly.offline import plot
from scipy.optimize import minimize
from .LCA_matrix import LCA_matrix
[docs]
class Optimization(LCA_matrix):
"""
:param functional_unit:
:type functional_unit: dict
:param method:
:type method: list
:param project:
:type project: ``swolfpy.Project.Project``
"""
def __init__(self, functional_unit, method, project):
super().__init__(functional_unit, method)
self.project = project
self.Treatment_processes = deepcopy(self.project.Treatment_processes)
self.Collection_processes = deepcopy(self.project.Collection_processes)
self.N_param = len(self.project.parameters_list)
self.n_scheme_vars = 0
[docs]
@staticmethod
def get_config(project):
columns = []
schemes = {}
for col in project.Collection_processes:
columns.append(col)
columns.append(col + " mode")
schemes[col] = project.Collection_processes[col]["model"].col_schm
index = [
("RWC", "N/A", "N/A"),
("RWC", "N/A", "SSR"),
("RWC", "SSYW", "N/A"),
("RWC", "SSYW", "SSR"),
("RWC", "SSO", "N/A"),
("RWC", "SSO", "SSR"),
("RWC", "SSO_AnF", "N/A"),
("RWC", "SSO_AnF", "SSR"),
("REC_WetRes", "N/A", "REC_WetRes"),
("REC_WetRes", "SSYW", "REC_WetRes"),
("REC_WetRes", "SSO", "REC_WetRes"),
("REC_WetRes", "SSO_AnF", "REC_WetRes"),
("ORG_DryRes", "ORG_DryRes", "N/A"),
("ORG_DryRes", "ORG_DryRes", "SSR"),
]
config_pd = pd.DataFrame(index=index, columns=columns)
if len(config_pd.columns) > 0:
for c in columns[1::2]:
config_pd[c] = [
"Optimize",
"Optimize",
"Optimize",
"Optimize",
"Optimize",
"Optimize",
"Fix",
"Fix",
"Fix",
"Fix",
"Fix",
"Fix",
"Fix",
"Fix",
]
for col, sch in schemes.items():
for k, v in sch.items():
if k in index:
config_pd[col][k] = v
return config_pd.fillna(0.0)
[docs]
def set_config(self, config):
self.config = config
self.scheme_vars_index = self.N_param
self.scheme_vars_dict = {}
self.x0_col = []
for c in self.config.columns[1::2]:
for i in self.config.index:
if self.config[c][i] == "Optimize":
self.scheme_vars_dict[self.scheme_vars_index] = (c.split(" mode")[0], i)
self.x0_col.append(self.config[c.split(" mode")[0]][i])
self.scheme_vars_index += 1
self.n_scheme_vars += 1
for process in self.config.columns[0::2]:
for schm in self.config.index:
if schm in self.Treatment_processes[process]["model"].col_schm:
self.Treatment_processes[process]["model"].col_schm[schm] = self.config.loc[
[schm], process
].values[0]
[docs]
def update_col_scheme(self, x):
process_set = set()
if self.n_scheme_vars:
for k, v in self.scheme_vars_dict.items():
process = v[0]
process_set.add(process)
self.Treatment_processes[process]["model"].col_schm[v[1]] = x[k]
for process in process_set:
self.Treatment_processes[process]["model"]._normalize_scheme(
DropOff=False, warn=False
)
### Objective function
[docs]
def _objective_function(self, x):
"""
Use the new parameters (Waste fractions) to update the ``tech_matrix``
(``tech_param``) and recalculate the LCA score.
"""
if self.oldx != list(x): # Calculations are done only when the function get new x.
if self.oldx[0 : self.N_param] != list(x)[0 : self.N_param]:
param_exchanges = self.project.parameters.Param_exchanges(x[0 : self.N_param])
for key, value in param_exchanges.items():
if key in self.tech_matrix:
self.tech_matrix[key] = value
if self.collection and self.oldx[self.N_param :] != list(x)[self.N_param :]:
self.update_col_scheme(x)
for col in self.Collection_processes:
model = self.Treatment_processes[col]["model"]
model.calc()
report_dict = model.report()
process_name = model.process_name
LCA_matrix.update_techmatrix(process_name, report_dict, self.tech_matrix)
LCA_matrix.update_biomatrix(process_name, report_dict, self.bio_matrix)
tech = np.array(list(self.tech_matrix.values()), dtype=float)
bio = np.array(list(self.bio_matrix.values()), dtype=float)
self.rebuild_technosphere_matrix(tech)
self.rebuild_biosphere_matrix(bio)
self.lci_calculation()
if self.lcia: # pylint: disable=using-constant-test
self.lcia_calculation()
self.oldx = list(x)
return self.score / 10**self.magnitude
### Mass to process
[docs]
def get_mass_flow_from_supply_array(self, key, KeyType, x):
"""
Calculate the mass to the process from the `supply_array` matrix.
"""
self._objective_function(x)
mass_flow = 0
if KeyType == "WasteToProcess":
for i in range(len(self.supply_array)):
if key == self.activities_dict[i]:
mass_flow += self.supply_array[i]
elif KeyType == "Process":
for i in range(len(self.supply_array)):
if key == self.activities_dict[i][0]:
mass_flow += self.supply_array[i]
else:
raise ValueError(
""" KeyType for the get_mass_flow_from_supply_array function is not defined correct."""
)
return mass_flow
### Emission flow in LCI
[docs]
def get_emission_amount(self, emission, x):
"""
Calculate the mass of the `emission` to biosphere from the `inventory`.
"""
self._objective_function(x)
inventory = self.biosphere_matrix * self.supply_array
emission_amount = 0
for i in range(len(inventory)):
if emission == self.biosphere_dict[i]:
emission_amount += inventory[i]
return emission_amount
### Calculates impacts other than objective
[docs]
def get_impact_amount(self, impact, x):
"""
Calculates impacts other than objective.
"""
self._objective_function(x)
self.switch_method(impact)
self.lcia()
score = self.score
self.switch_method(self._base_method)
self.lcia()
return score
[docs]
def _create_equality(self, N_param_Ingroup):
"""
Check that the sum of parameters in each group should be one.
"""
local_index = self.Param_index
f = lambda x: sum([x[i] for i in range(local_index, local_index + N_param_Ingroup)]) - 1
self.Param_index += N_param_Ingroup
return f
[docs]
def _create_inequality(self, key, limit, KeyType, ConstType):
"""
:param key: process name, key for activity in process or key for activity in biosphere
:type key: str or tuple
:param limit:
:type limit: float
:param KeyType: ``"Process"``, ``"WasteToProcess"``, ``"Emission"``
:type KeyType: str
:param ConstType: ``"<="`` , ``">="``
:type ConstType: str
"""
if ConstType not in ["<=", ">="]:
raise ValueError(" Constraint Type is not defined correct ")
if KeyType == "Process":
if ConstType == "<=":
f = lambda x: limit - self.get_mass_flow_from_supply_array(key, KeyType, x)
else:
f = lambda x: self.get_mass_flow_from_supply_array(key, KeyType, x) - limit
return f
elif KeyType == "WasteToProcess":
if ConstType == "<=":
f = lambda x: limit - self.get_mass_flow_from_supply_array(key, KeyType, x)
else:
f = lambda x: self.get_mass_flow_from_supply_array(key, KeyType, x) - limit
return f
elif KeyType == "Emission":
if ConstType == "<=":
f = lambda x: limit - self.get_emission_amount(key, x)
else:
f = lambda x: self.get_emission_amount(key, x) - limit
return f
elif KeyType == "Impact":
if ConstType == "<=":
f = lambda x: limit - self.get_impact_amount(key, x)
else:
f = lambda x: self.get_impact_amount(key, x) - limit
return f
return None
[docs]
def _create_collection_constraints(self, cons):
const_dict = {}
if self.n_scheme_vars:
for k in self.scheme_vars_dict:
process = self.scheme_vars_dict[k][0]
if process not in const_dict:
const_dict[process] = []
const_dict[process].append(k)
print("\n\n collection constraints dict: \n", const_dict, "\n\n")
for k in const_dict:
self._col_const_helper(const_dict, k, cons)
[docs]
def _col_const_helper(self, const_dict, k, cons):
def helper_sum(x, index):
return sum([x[i] for i in index])
fix = 0
for i in self.config.index:
if self.config[k + " mode"][i] == "Fix":
fix += self.config[k][i]
cons.append(
{
"type": "eq",
"fun": (lambda x: helper_sum(x, const_dict[k]) + fix - 1),
"Name": "{} main const".format(k),
}
)
[docs]
def _create_constraints(self):
cons = list()
group = dict()
# Index for the parameters
self.Param_index = 0
# Number of parameters in each group (from one source to different destinations)
for key in self.project.parameters.param_uncertainty_dict.keys():
group[key] = len(self.project.parameters.param_uncertainty_dict[key])
# Equal constraint (sum of the parameters in each group should be one)
for vals in group.values():
cons.append({"type": "eq", "fun": self._create_equality(N_param_Ingroup=vals)})
if self.collection and self.n_scheme_vars:
self._create_collection_constraints(cons)
if self.constraints:
for key in self.constraints.keys():
cons.append(
{
"type": "ineq",
"fun": self._create_inequality(
key,
self.constraints[key]["limit"],
self.constraints[key]["KeyType"],
self.constraints[key]["ConstType"],
),
}
)
return cons
[docs]
@staticmethod
def multi_start_optimization(
optObject,
constraints=None,
collection=False,
n_iter=30,
nproc=None,
timeout=None,
initialize_guess="random",
):
"""
Call the ``scipy.optimize.minimize()`` to minimize the LCA score.
Notes
-----
- ``constraints`` is python dictionary.
- Constraint type can be ``'<='`` or ``'>='``.
- Three kind of constraints are defined as below:
* **Process:** Constraint on the total mass to the process. The ``'KeyType'`` should be ``'Process'``
(e.g., The capacity of the WTE). Example:
>>> constraints = {}
>>> # Use name the the process as key in dict
>>> constraints['WTE'] = {'limit':100, 'KeyType':'Process','ConstType':"<="}
* **WasteToProcess:** Constraint on the total mass of waste fraction to the process. The ``'KeyType'`` should
be ``'WasteToProcess'`` (e.g., Ban food waste from landfill). Example:
>>> constraints = {}
>>> # Use database key as key in dict
>>> constraints[('LF','Food_Waste_Vegetable')] = {'limit':0, 'KeyType':'WasteToProcess','ConstType':"<="}
* **Emission:** Constraint on the emissions. The ``'KeyType'`` should be ``'Emission'`` (e.g., CO2 emissions Cap). Example:
>>> constraints = {}
>>> # Use database key as key in dict
>>> constraints[('biosphere3', 'eba59fd6-f37e-41dc-9ca3-c7ea22d602c7')] = {'limit':100,'KeyType':'Emission','ConstType':"<="}
"""
optObject.constraints = constraints
optObject.collection = collection
optObject.magnitude = len(str(int(abs(optObject.score))))
global_min = 1e100
if optObject.collection:
n_dec_vars = len(optObject.project.parameters_list) + optObject.n_scheme_vars
else:
n_dec_vars = len(optObject.project.parameters_list)
bnds = tuple([(0, 1) for _ in range(n_dec_vars)])
args = []
if initialize_guess == "LHS":
all_x0 = pyDOE.lhs(n_dec_vars, samples=n_iter)
elif initialize_guess == "random":
all_x0 = np.random.rand(n_iter, n_dec_vars)
elif initialize_guess == "binary":
all_x0 = np.random.randint(low=0, high=2, size=(n_iter, n_dec_vars))
else:
raise ValueError(
f"The {initialize_guess} method for generating the initial guess is not correct!"
)
for j in range(n_iter):
args.append((optObject, bnds, all_x0[j], j))
if not nproc:
nproc = cpu_count()
all_results = []
with Pool(processes=nproc, maxtasksperchild=1) as pool:
for arg in args:
abortable_func = partial(
Optimization.abortable_worker, Optimization.worker, timeout=timeout
)
all_results.append(pool.apply_async(abortable_func, args=arg))
results = [res.get() for res in all_results]
optObject.all_results = results
optObject.res_global = False
for i, res in enumerate(optObject.all_results):
if res:
if res.success:
if res.fun < global_min:
optObject.res_global = res
global_min = res.fun
print(
"""\n
Iteration: {}
Status: {}, Message: {}
Objective function: {}
Global min: {} \n
""".format(
i,
res.success,
res.message,
res.fun * 10**optObject.magnitude,
global_min * 10**optObject.magnitude,
)
)
else:
print(
"""\n
Iteration: {}
Status: {}, Message: {}
Objective function: {}
Global min: {} \n
""".format(
i,
False,
"Aborting due to timeout!",
"NAN",
global_min * 10**optObject.magnitude,
)
)
if not optObject.res_global:
optObject.success = False
print("None of the iterations were successful!")
return None
if optObject.res_global.success:
optObject.oldx = [0 for i in range(n_dec_vars)]
optObject.success = True
optObject.optimized_x = list()
optObject.res_global.x = optObject.res_global.x.round(decimals=4)
optObject._objective_function(optObject.res_global.x)
for i in range(len(optObject.project.parameters_list)):
optObject.optimized_x.append(
{
"name": optObject.project.parameters_list[i]["name"],
"amount": optObject.res_global.x[i],
}
)
if optObject.collection:
for k, v in optObject.scheme_vars_dict.items():
optObject.optimized_x.append({"name": v, "amount": optObject.res_global.x[k]})
return optObject.res_global
else:
optObject.success = False
print(optObject.res_global.message)
return optObject.res_global
[docs]
@staticmethod
def abortable_worker(func, *args, **kwargs):
iteration = args[3]
timeout = kwargs.get("timeout", None)
p = ThreadPool(1)
res = p.apply_async(func, args)
try:
return res.get(timeout) # Wait timeout seconds for func to complete.
except mp.TimeoutError:
print("(Iteration:{}, PID:{}): Aborting due to timeout!".format(iteration, os.getpid()))
return None
[docs]
@staticmethod
def worker(optObject, bnds, x0, iteration):
start = time()
projects.set_current(optObject.project.project_name, writable=False)
print("Iteration: {} PID: {}\n".format(iteration, os.getpid()))
optObject.oldx = [0 for i in range(len(x0))]
optObject.cons = optObject._create_constraints()
res = minimize(
optObject._objective_function,
x0,
method="SLSQP",
bounds=bnds,
constraints=optObject.cons,
)
time_ = round(time() - start)
print(
"Iteration: {} PID: {} time:{} sec, Success:{} \n".format(
iteration, os.getpid(), time_, res.success
)
)
res["time"] = time_
res["PID"] = os.getpid()
return res
[docs]
def set_optimized_parameters_to_project(self):
assert hasattr(self, "project"), "Must run optimize_parameters first"
assert self.success, "Optimization has to be successful first"
self.project.update_parameters(self.optimized_x)
[docs]
def plot_sankey(self, optimized_flow=True, show=True, fileName=None, params=None):
"""
Plots a sankey diagram for the waste mass flows. Calls the
``plotly.graph_objs.Sankey`` to plot sankey. Calculates the mass flows by calling
``self.get_mass_flow_from_supply_array()``.
:param optimized_flow: If ``True``, it plots the sankey based on the optimized waste fractions.
If ``False``, it plots the sankey based on the current waste fractions by calling ``self.project.parameters_list``.
:type optimized_flow: bool
:param show: If ``True``, it will show the figure
:type show: bool
"""
if optimized_flow:
params = [i["amount"] for i in self.optimized_x]
else:
params = params or [i["amount"] for i in self.project.parameters_list]
self.oldx = [0 for i in range(len(params))]
self.magnitude = len(str(int(abs(self.score))))
self.N_param = len(self.project.parameters_list)
self.col_model = []
product = []
index = 0
for _, i in self.project.parameters.param_uncertainty_dict.items():
for j in i:
product.append((j[3], params[index]))
index += 1
for _, i in self.project.parameters.static_param_dict.items():
for j in i:
product.append((j[3], 1))
label = self.project.parameters.nodes
source = []
target = []
value = []
label_link = []
color = []
# Color & shape for plotting the SWM Network
# https://www.rapidtables.com/web/color/RGB_Color.html
edge_color = {
"RWC": (160, 82, 45), # sienna #A0522D
"SSR": (0, 0, 255), # blue #0000FF
"DSR": (0, 0, 205), # medium blue #0000CD
"MSR": (65, 105, 225), # royal blue #4169E1
"LV": (0, 255, 0), # lime #00FF00
"SSYW": (0, 100, 0), # dark green #006400
"SSO": (0, 255, 127), # spring green #00FF7F
"SSO_HC": (128, 128, 0), # olive #808000
"SSO_AnF": (127, 255, 0), # chartreuse #7FFF00
"ORG": (46, 139, 87), # sea green #2E8B57
"DryRes": (222, 184, 135), # burly wood #DEB887
"REC": (0, 191, 255), # deep sky blue #00BFFF
"WetRes": (210, 105, 30), # chocolate #D2691E
"MRDO": (205, 133, 63), # peru #CD853F
"SSYWDO": (107, 142, 35), # olive drab #6B8E23
"MSRDO": (106, 90, 205), # slate blue #6A5ACD
"Bottom_Ash": (128, 128, 128), # Gray #808080
"Fly_Ash": (0, 0, 0), # black #000000
"Unreacted_Ash": (128, 128, 128), # Gray #808080
"Separated_Organics": (50, 205, 50), # lime green #32CD32
"Separated_Recyclables": (0, 128, 128), # teal #008080
"Other_Residual": (139, 69, 19), # saddle brown #8B4513
"RDF": (255, 0, 0),
} # Red #FF0000
for i in self.project.CommonData.Reprocessing_Index:
edge_color[i] = (0, 0, 139) # dark blue #00008B
for x in product:
key, frac = x
source.append(label.index(key[0]))
target.append(label.index(key[1]))
label_link.append(key[2])
# color.append('rgba({},{},{}, 0.8)'.format(*np.random.randint(256, size=3)))
color.append("rgba({}, {}, {}, 0.8)".format(*edge_color[key[2]]))
mass = 0.0
for m in self.project.CommonData.Index + ["RDF"]:
mass += self.get_mass_flow_from_supply_array(
(key[0] + "_product", m + "_" + key[2]), "WasteToProcess", params
)
mass += self.get_mass_flow_from_supply_array(
(key[0] + "_product", key[2]), "WasteToProcess", params
)
value.append(np.round(mass * frac, 3))
print(
"""
# Sankey Mass flows
label = {}
source = {}
target = {}
label_link = {}
value = {}""".format(
label, source, target, label_link, value
)
)
node = dict(
pad=20,
thickness=20,
line=dict(color="black", width=0.5),
label=label,
color="rgba({}, {}, {}, 0.8)".format(*(176, 196, 222)),
) # light steel blue #B0C4DE
link = dict(source=source, target=target, value=value, label=label_link, color=color)
# The other good option for the value format is ".3f". Yes
score = self._objective_function(params) * 10**self.magnitude
if score >= 1000 or score <= -1000:
score = "{:,.0f}".format(score)
elif -0.1 <= score <= 0.1:
score = "{:,.4f}".format(score)
elif score - 1 <= score <= 1:
score = "{:,.3f}".format(score)
else:
score = "{:,.2f}".format(score)
layout = go.Layout(
title_text="LCIA: " + str(self.method[0]) + f"= {score}",
font_size=16,
hoverlabel=dict(font_size=14),
)
data = go.Sankey(valueformat=".3s", valuesuffix="Mg", node=node, link=link)
fig = go.Figure(data=[data], layout=layout)
plot(fig, filename=fileName if fileName else "plot.html", auto_open=show)
# Store data for ploting the sankey
store_data = {}
store_data["title_text"] = "Impact " + str(self.method[0]) + f": {score}"
store_data["font_size"] = 16
store_data["hoverlabel"] = dict(font_size=14)
store_data["valueformat"] = ".3s"
store_data["valuesuffix"] = "Mg"
store_data["node"] = node
store_data["link"] = link
filename = fileName.split(".")[0] + ".JSON" if fileName else "Sankey_Data.JSON"
with open(filename, mode="w", encoding="utf-8") as outfile:
json.dump(store_data, outfile, indent=4)