Source code for lib_dd.apps.ccd_single

# jupyter notebook app for ccd_single
import os
import logging
import tempfile
import shutil
import datetime

import ipywidgets
import ipywidgets as widgets
import IPython
from IPython.core.display import display, HTML
import numpy as np

import NDimInv.plot_helper
plt, mpl = NDimInv.plot_helper.setup()
import lib_dd.decomposition.ccd_single as ccd_single
import lib_dd.config.cfg_single as cfg_single
import lib_dd.plot


[docs]class ccd_single_app(object): def __init__(self, frequency_file, data_file, no_logging=False): self._setup_logger(disabled=no_logging) self._check_versions() self.style = {'description_width': 'initial'} self.frequencies = np.loadtxt(frequency_file) self.data = np.loadtxt(data_file) self.widgets = {} # items to be arranged in a VBox later self.items = [] # store containers for parameter groups here self.containers = {} self.vbox = None self.print_data_summary() # we generate links to the online documentation. To simplify things, # use the url-prefix from here self.url = 'https://m-weigand.github.io/ccd_tools/doc_ccd/' self._init_app_widgets() def _check_versions(self): if int(ipywidgets.__version__[0]) < 7: raise Exception( 'ipywidgets version is too old. Need at least major ' + 'version number 7, is: {0}'.format(ipywidgets.__version__) ) def _setup_logger(self, disabled=False): logging.basicConfig(level=logging.INFO) if disabled: logging.disable(100)
[docs] def enable_logger(self): logging.disable(logging.NOTSET)
[docs] def disable_logger(self): logging.disable(100)
[docs] def print_data_summary(self): """Print a summary of the data, e.g., number of frequencies, min/max values """ summary = [] # frequency statistics summary.append( 'Number of frequencies: {0}'.format(len(self.frequencies)) ) summary.append( 'Minimum frequency: {0} Hz'.format(self.frequencies.min()) ) summary.append( 'Maximum frequency: {0} Hz'.format(self.frequencies.max()) ) # data statistics summary.append('') summary.append('number of SIP spectra: {0}'.format( np.atleast_2d(self.data).shape[0]) ) summary.append('<hr />') display(HTML('<h2>Data summary</h2>')) display(HTML('<br />'.join(summary)))
def _help_url(self, postfix): help_str = '<a href="{0}/{1}"'.format( self.url, postfix ) + ' target="_blank">Help</a>' return help_str def _add_dformat_selection(self): w_dformat = widgets.Dropdown( options=[ 'rmag_rpha', 'lnrmag_rpha', 'log10rmag_rpha', 'rmag_rpha', 'rre_rim', 'rre_rmim', 'cmag_cpha', 'cre_cim', 'cre_cmim', ], value='rmag_rpha', description='Input data format:', style=self.style, ) h_dformat = widgets.HTML( value=self._help_url('data_formats.html#input-data-formats') ) hb_dformat = widgets.HBox( children=[w_dformat, h_dformat] ) self.widgets['data_format'] = w_dformat self.items.append(hb_dformat) def _add_condres_selection(self): w_condres = widgets.Dropdown( options={ 'resistivity formulation': '0', 'conductivity formulation': '1', }, value='0', description='Type of kernel:', style=self.style, ) h_condres = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#" ' + 'target="_blank">Help</a>', ) hb_condres = widgets.HBox( children=[w_condres, h_condres] ) self.widgets['type_formulation'] = w_condres self.items.append(hb_condres) def _add_flimits(self): w_fmin = widgets.FloatText( value=self.frequencies.min(), description='Frequency min:', style=self.style, ) w_fmax = widgets.FloatText( value=self.frequencies.max(), description='Frequency max:', style=self.style, ) h_fminmax = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#" ' + 'target="_blank">Help</a>', ) hb_fminmax = widgets.HBox( children=[w_fmin, w_fmax, h_fminmax] ) self.widgets['fmin'] = w_fmin self.widgets['fmax'] = w_fmax self.items.append(hb_fminmax) self.containers['fminmax'] = hb_fminmax def fminmax_change(change): if change['name'] == 'value': fmin = self.widgets['fmin'].value fmax = self.widgets['fmax'].value # hide all frequencies in fignore that are outside the range fign = self.widgets['fignore'] indices = np.where( (fmin <= self.frequencies) & (self.frequencies <= fmax) )[0] for nr, wdg in enumerate(fign): if nr in indices: wdg.layout.visibility = 'visible' else: wdg.layout.visibility = 'hidden' w_fmin.observe(fminmax_change) w_fmax.observe(fminmax_change) def _add_fignore(self): f_ign = [] for i in range(0, self.frequencies.size): wid = widgets.Checkbox( value=True, description='{0:.3f} Hz'.format(self.frequencies[i]), disabled=False, # width=50, ) f_ign.append(wid) box_layout = widgets.Layout( width='100%', flex_flow='row wrap', display='inline-flex', ) vb_fignore = widgets.VBox( children=f_ign, layout=box_layout, ) accordion = widgets.Accordion( children=[vb_fignore, ] ) accordion.set_title(0, 'Ignore frequencies') if ipywidgets.__version__[0] == '6': accordion.selected_index = -1 else: accordion.selected_index = None self.widgets['fignore'] = f_ign self.items.append(accordion) self.containers['fignore'] = accordion def _add_max_its(self): w_max_its = widgets.IntSlider( value=20, min=0, max=40, step=1, description='Maximum nr of iterations:', disabled=False, continuous_update=False, orientation='horizontal', readout=True, readout_format='d', style=self.style, ) h_max_its = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#" ' + 'target="_blank">Help</a>', ) hb_max_its = widgets.HBox( children=[w_max_its, h_max_its] ) self.widgets['max_its'] = w_max_its self.items.append(hb_max_its) self.containers['hb_max_its'] = hb_max_its def _add_nr_terms(self): w_nr_terms = widgets.IntText( value=20, description='Nr of terms per frequency decade:', disabled=False, style=self.style, ) h_nr_terms = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#" ' + 'target="_blank">Help</a>', ) # h_nr_terms = widgets.HTML( # value='<iframe src="https://m-weigand.github.io/ccd_tools/' + # 'doc_ccd/"></iframe>', # ) hb_nr_terms = widgets.HBox( children=[w_nr_terms, h_nr_terms] ) self.widgets['nr_terms'] = w_nr_terms self.items.append(hb_nr_terms) self.containers['nr_terms'] = hb_nr_terms def _add_tausel(self): w_tausel = widgets.Dropdown( options=[ 'data_ext', 'data', 'individual', ], value='data_ext', description='Range of tau values:', style=self.style, ) w_tausel_ind_left = widgets.SelectionSlider( options=[1, 10, 100, 1000], value=100, description='left:', disabled=False, continuous_update=False, orientation='horizontal', readout=True ) w_tausel_ind_left.layout.visibility = 'hidden' w_tausel_ind_right = widgets.SelectionSlider( options=[1, 10, 100, 1000], value=100, description='right:', disabled=False, continuous_update=False, orientation='horizontal', readout=True ) w_tausel_ind_right.layout.visibility = 'hidden' h_tausel = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#" ' + 'target="_blank">Help</a>', ) hb_tausel = widgets.HBox( children=[ w_tausel, w_tausel_ind_left, w_tausel_ind_right, h_tausel ] ) self.widgets['tausel'] = w_tausel self.widgets['tausel_left'] = w_tausel_ind_left self.widgets['tausel_right'] = w_tausel_ind_right self.items.append(hb_tausel) self.containers['tausel'] = hb_tausel def tausel_change(change): if change['name'] == 'value': if change['new'] == 'individual': w_tausel_ind_left.layout.visibility = 'visible' w_tausel_ind_right.layout.visibility = 'visible' else: w_tausel_ind_left.layout.visibility = 'hidden' w_tausel_ind_right.layout.visibility = 'hidden' w_tausel.observe(tausel_change) def _add_plot_options(self): w_generate_plot = widgets.Checkbox( value=True, description='Generate plot', disabled=False ) w_generate_it_plots = widgets.Checkbox( value=False, description='Generate plots for all iterations', disabled=False, ) w_generate_lcurve = widgets.Checkbox( value=False, description='Plot lcurve', disabled=False ) w_generate_reg_strength = widgets.Checkbox( value=False, description='Plot regularization strength ', disabled=False ) w_generate_output = widgets.Checkbox( value=False, description='Generate output for download', disabled=False ) box_layout = widgets.Layout( width='100%', flex_flow='row wrap', display='inline-flex', border='3px solid black', ) hb_output = widgets.VBox( [ w_generate_plot, w_generate_it_plots, w_generate_lcurve, w_generate_reg_strength, w_generate_output, ], layout=box_layout, ) self.widgets['generate_plot'] = w_generate_plot self.widgets['generate_output'] = w_generate_output self.widgets['generate_it_plots'] = w_generate_it_plots self.widgets['generate_lcurve'] = w_generate_lcurve self.widgets['generate_reg_strength'] = w_generate_reg_strength self.items.append(hb_output) def _add_data_weighting(self): w_data_weighting = widgets.Dropdown( options=[ 're_vs_im', 'avg_im', 'avg_im_err', 'one', 'all_to_one', 'rel_abs_error', ], value='re_vs_im', description='Data weighting scheme:', style=self.style, ) h_data_weighting = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#data_weighting" ' + 'target="_blank">Help</a>', ) hb_data_weighting = widgets.HBox( children=[w_data_weighting, h_data_weighting] ) self.widgets['data_weighting'] = w_data_weighting self.items.append(hb_data_weighting) self.containers['data_weighting'] = hb_data_weighting def _add_lambda(self): w_lambda = widgets.IntText( value=10, description='Lambda', disabled=False, ) h_lambda = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'usage_and_implementation.html#data_weighting" ' + 'target="_blank">Help</a>', ) hb_lambda = widgets.HBox( children=[w_lambda, h_lambda] ) self.widgets['lambda'] = w_lambda self.items.append(hb_lambda) def _add_c(self): w_c = widgets.FloatSlider( value=1.0, min=0.1, max=1.0, step=0.1, description='C-value:', disabled=False, continuous_update=False, orientation='horizontal', readout=True, readout_format='.1f', ) h_c = widgets.HTML( value='<a href="https://m-weigand.github.io/ccd_tools/doc_ccd/' + 'theory.html" ' + 'target="_blank">Help</a>', ) hb_c = widgets.HBox( children=[w_c, h_c] ) self.widgets['c'] = w_c self.items.append(hb_c) def _add_normalization(self): w_normalization = widgets.Checkbox( value=False, description='Activate normalization', disabled=False, style=self.style, ) help_normalization = widgets.HTML( value=self._help_url( 'usage_and_implementation.html#normalization' ) ) w_norm_value = widgets.FloatText( value=10, step=1, description='Normalization value:', disabled=True, style=self.style, ) hb_norm = widgets.HBox( children=[ w_normalization, w_norm_value, help_normalization, ], ) self.containers['hb_norm'] = hb_norm def use_norm_change(change): if change['name'] == 'value': if change['new'] is True: w_norm_value.disabled = False else: w_norm_value.disabled = True w_normalization.observe(use_norm_change) self.widgets['use_norm'] = w_normalization self.widgets['norm_value'] = w_norm_value self.items.append(hb_norm) def _add_toggles(self): w_advanced = widgets.ToggleButton( value=True, description='Show advanced options', disabled=False, button_style='', tooltip='', icon='check', style=self.style, layout=widgets.Layout(width='50%'), ) def toggle_advanced(change): if change['name'] == 'value': keys = [ 'hb_norm', 'hb_max_its', 'fignore', 'fminmax', 'nr_terms', 'data_weighting', 'tausel', ] if change['new'] is True: # advanced mode for key in keys: self.containers[key].layout.visibility = 'visible' else: for key in keys: self.containers[key].layout.visibility = 'hidden' w_show_output = widgets.ToggleButton( value=True, description='Show output', disabled=False, button_style='', tooltip='', # icon='check' style=self.style, layout=widgets.Layout(width='50%'), ) hb_toggles = widgets.HBox( children=[w_advanced, w_show_output], layout=widgets.Layout( width='80%', ) ) w_advanced.observe(toggle_advanced) self.widgets['nb_show_output'] = w_show_output self.items.append(hb_toggles) def _init_app_widgets(self): self._add_toggles() self._add_dformat_selection() self._add_condres_selection() self._add_lambda() self._add_c() self._add_flimits() self._add_fignore() self._add_max_its() self._add_nr_terms() self._add_tausel() self._add_data_weighting() self._add_normalization() self._add_plot_options() w_run = widgets.Button( description='Run CCD', disabled=False, tooltip='Run the Cole-Cole decomposition', layout=widgets.Layout(width='50%', height='80px') ) w_run.on_click(self.run_ccd) w_clear = widgets.Button( description='Reset app', layout=widgets.Layout(width='50%', height='80px'), ) def clear_cell(button): IPython.display.clear_output() w_clear.on_click(clear_cell) hb_run = widgets.HBox( children=[ w_run, w_clear, ], layout=widgets.Layout( width='100%', align_items='center' ), ) self.widgets.update({ 'run': w_run, }) header = widgets.HTML( value='<h3>Settings</h3>' ) self.items.insert(0, header) self.items += [ hb_run, ] self.vbox = widgets.VBox(self.items)
[docs] def show(self): display(self.vbox) display(HTML('<hr />'))
[docs] def run_ccd(self, button): """Based on the GUI input, generate a configuration for the CCD and run it """ if self.widgets['nb_show_output'].value is True: self.enable_logger() else: # show at least this note print('Running CCD') self.disable_logger() logging.info('running CCD') # set environment variables os.environ['DD_COND'] = self.widgets['type_formulation'].value os.environ['DD_C'] = '{0:.2f}'.format( self.widgets['c'].value ) # set options using this dict-like object config = cfg_single.cfg_single() config['frequency_file'] = self.frequencies config['data_file'] = self.data config['fixed_lambda'] = int(self.widgets['lambda'].value) # config['plot_it_spectra'] = self.widgets['generate_it_plots'].value config['max_iterations'] = self.widgets['max_its'].value config['nr_terms_decade'] = self.widgets['nr_terms'].value config['data_format'] = self.widgets['data_format'].value config['data_fmin'] = self.widgets['fmin'].value config['data_fmax'] = self.widgets['fmax'].value ignores_raw = self.widgets['fignore'] ignores = ['{0}'.format( nr ) for nr, x in enumerate(ignores_raw) if x.value is False] if len(ignores) == 0: ign_str = None else: ign_str = ','.join(ignores) config['ignore_frequencies'] = ign_str # tausel w_tausel = self.widgets['tausel'] w_tausel_ind_left = self.widgets['tausel_left'] w_tausel_ind_right = self.widgets['tausel_right'] if w_tausel.value == 'individual': tausel_opt = '{0},{1}'.format( w_tausel_ind_left.value, w_tausel_ind_right.value, ) else: tausel_opt = w_tausel.value config['tausel'] = tausel_opt if self.widgets['use_norm'].value is True: config['norm'] = self.widgets['norm_value'].value # generate a ccd object ccd_obj = ccd_single.ccd_single(config) # commence with the actual fitting ccd_obj.fit_data() # extract the last iteration last_it = ccd_obj.results[0].iterations[-1] if self.widgets['generate_plot'].value is True: logging.info('plotting ... this may take a while') # don't show in notebook # plt.ioff() # _ = last_it.plot() # _ for spectrum in ccd_obj.results: last_it = spectrum.iterations[-1] self._plot(ccd_obj, last_it) if self.widgets['generate_lcurve'].value is True: for spectrum in ccd_obj.results: last_it = spectrum.iterations[-1] last_it.plot_lcurve() if self.widgets['generate_reg_strength'].value is True: for spectrum in ccd_obj.results: last_it = spectrum.iterations[-1] last_it.plot_reg_strengths() if self.widgets['generate_it_plots'].value is True: for spectrum in ccd_obj.results: for iteration in spectrum.iterations: self._plot(ccd_obj, iteration) if self.widgets['generate_output'].value is True: with tempfile.TemporaryDirectory() as outdir: ccd_obj.save_to_directory(outdir) outfile = 'sip_results_{0}'.format( datetime.datetime.strftime( datetime.datetime.now(), '%Y%m%d_%H%M', ) ) shutil.make_archive( outfile, format='zip', root_dir=outdir + os.sep, verbose=True ) display(HTML( self._help_url('data_formats.html#ascii-audit-format') )) display(HTML( '<a href="{0}.zip" download>Download results</a>'.format( outfile ) )) print('finished')
def _plot(self, ccd_obj, it): obj = lib_dd.plot.plot_iteration() norm_factors = ccd_obj.data.get('norm_factors', 1) D = it.Data.D / norm_factors M = it.Model.convert_to_M(it.m) # renormalize here? Why do we compute the forward solution again? F = it.Model.F(M) / norm_factors # extra_size = int( # np.sum([x[1][1] for x in it.Data.extra_dims.items()]) # ) # nr_spectra = max(1, extra_size) mpl.rcParams['figure.dpi'] = 250 # iterate over spectra for nr, (d, m) in enumerate(it.Model.DM_iterator()): # plot spectrum fig, axes = plt.subplots(1, 2, figsize=(10 / 2.54, 5 / 2.54)) obj._plot_rre_rim(nr, axes, D[d], F[d], it) # self._plot_rmag_rpha(nr, axes[nr, 2:4], D[d], F[d], it) ax1 = axes[0].twinx() ax2 = axes[1].twinx() obj._plot_cre_cim(nr, [ax1, ax2], D[d], F[d], it) fig.tight_layout() fig.subplots_adjust(top=0.7) # plot spectrum rmag_rpha fig, axes = plt.subplots(1, 2, figsize=(10 / 2.54, 5 / 2.54)) obj._plot_rmag_rpha(nr, axes, D[d], F[d], it) ax1 = axes[0].twinx() ax2 = axes[1].twinx() fig.tight_layout() fig.subplots_adjust(top=0.7) # plot rtd fig, axes = plt.subplots(1, 1, figsize=(10 / 2.54, 5 / 2.54)) obj._plot_rtd(nr, axes, M[m], it) fig.tight_layout()