import matplotlib.pyplot as plt import numpy as np from astropy.modeling.fitting import ( Fitter, _validate_model, fitter_to_model_params, model_to_fit_params, _convert_input, ) from astropy.modeling.optimizers import Optimization from astropy.modeling.statistic import leastsquare from astropy import uncertainty as astrounc import emcee import corner all = ["EmceeOpt", "EmceeFitter", "plot_emcee_results"] class EmceeOpt(Optimization): """ Interface to emcee sampler. """ supported_constraints = ["bounds", "fixed", "tied"] def __init__(self): super().__init__(emcee) self.fit_info = {"perparams": None, "samples": None, "sampler": None} @staticmethod def _get_best_fit_params(sampler): """ Determine the best fit parameters given an emcee sampler object """ # very likely a faster way max_lnp = -1e6 nwalkers, nsteps = sampler.lnprobability.shape for k in range(nwalkers): tmax_lnp = np.nanmax(sampler.lnprobability[k]) if tmax_lnp > max_lnp: max_lnp = tmax_lnp (indxs,) = np.where(sampler.lnprobability[k] == tmax_lnp) fit_params_best = sampler.chain[k, indxs[0], :] return fit_params_best def __call__(self, objfunc, initval, fargs, nsteps, save_samples=None, save_backend=None, **kwargs): """ Run the sampler. Parameters ---------- objfunc : callable objection function initval : iterable initial guess for the parameter values fargs : tuple other arguments to be passed to the statistic function kwargs : dict other keyword arguments to be passed to the solver """ # optresult = self.opt_method(objfunc, initval, args=fargs) # fitparams = optresult['x'] ndim = len(initval) nwalkers = 10 * ndim #2 * ndim pos = initval + 1e-4 * np.random.randn(nwalkers, ndim) # ensure all the walkers start within the bounds model = fargs[0] for cp in pos: k = 0 for cname in model.param_names: if not model.fixed[cname]: if model.bounds[cname][0] is not None: if cp[k] < model.bounds[cname][0]: cp[k] = model.bounds[cname][0] if model.bounds[cname][1] is not None: if cp[k] > model.bounds[cname][1]: cp[k] = model.bounds[cname][1] # only non-fixed parameters are in initval # so only increment k when non-fixed k += 1 # Set up the backend if save_samples: # Don't forget to clear it in case the file already exists save_backend = emcee.backends.HDFBackend(save_samples) save_backend.reset(nwalkers, ndim) sampler = self.opt_method.EnsembleSampler( nwalkers, ndim, objfunc, backend=save_backend, args=fargs ) sampler.run_mcmc(pos, nsteps, progress=True) samples = sampler.get_chain() fitparams = self._get_best_fit_params(sampler) self.fit_info["sampler"] = sampler self.fit_info["samples"] = samples return fitparams, self.fit_info class EmceeFitter(Fitter): """ Use emcee and least squares statistic """ def __init__(self, nsteps=100, burnfrac=0.1, save_samples=None): super().__init__(optimizer=EmceeOpt, statistic=leastsquare) self.nsteps = nsteps self.burnfrac = burnfrac self.fit_info = {} self.save_samples = save_samples # add lnlike and lnprior and have log_probability just be the combo of the two def log_prior(self, fps, *args): """ Computes the natural log of the prior. Currently only handles flat priors set using parameter bounds. Parameters ---------- fps : list parameters returned by the fitter args : list [model, [other_args], [input coordinates]] other_args may include weights or any other quantities specific for a statistic Returns ------- log(prior) : float natural log of the prior probability """ # pameter bound priors = flat priors between two limits # EMCEE uses an explicit return of -np.inf to designate such bounds # need to be handled explicitly to get good sampler chains # standard astropy modeling fitting results in chains not accurately # reflecting the bounds model = args[0] k = 0 for cname in model.param_names: if not model.fixed[cname]: if model.bounds[cname][0] is not None: if fps[k] < model.bounds[cname][0]: return -np.inf if model.bounds[cname][1] is not None: if fps[k] > model.bounds[cname][1]: return -np.inf k += 1 # no other priors, so return 0.0 = log(1.0) return 0.0 def log_likelihood(self, fps, *args): """ Computes the natural log of the likelihood. Parameters ---------- fps : list parameters returned by the fitter args : list [model, [other_args], [input coordinates]] other_args may include weights or any other quantities specific for a statistic Returns ------- log(likelihood) : float natural log of the likelihood probability """ # assume the standard leastsquare res = self.objective_function(fps, *args) # convert to a log value - assumes chisqr/Gaussian unc model return -0.5 * res def log_probability(self, fps, *args): """ Compute the natural log of the probability by combining the likelihood and prior probabilties. Parameters ---------- fps : list parameters returned by the fitter args : list [model, [other_args], [input coordinates]] other_args may include weights or any other quantities specific for a statistic Returns ------- log(prob) : float natural log of the probability Notes ----- The list of arguments (args) is set in the `__call__` method. Fitters may overwrite this method, e.g. when statistic functions require other arguments. """ lp = self.log_prior(fps, *args) if not np.isfinite(lp): return -np.inf return lp + self.log_likelihood(fps, *args) def _set_uncs_and_posterior(self, model): """ Set the symmetric and asymmetric Gaussian uncertainties and sets the posteriors to astropy.unc distributions Parameters ---------- model : astropy model model giving the result from the fitting Returns ------- model : astropy model model updated with uncertainties """ sampler = self.fit_info["sampler"] nwalkers, nsteps = sampler.lnprobability.shape # discard the 1st burn_frac (burn in) flat_samples = sampler.get_chain(discard=int(self.burnfrac * nsteps), flat=True) nflatsteps, ndim = flat_samples.shape nparams = len(model.parameters) model.uncs = np.zeros((nparams)) model.uncs_plus = np.zeros((nparams)) model.uncs_minus = np.zeros((nparams)) k = 0 for i, pname in enumerate(model.param_names): if not model.fixed[pname]: mcmc = np.percentile(flat_samples[:, k], [16, 50, 84]) # set the uncertainty arrays - could be done via the parameter objects # but would need an update to the model properties to make this happen # model.parameters[i] = mcmc[1] model.uncs[i] = 0.5 * (mcmc[2] - mcmc[0]) model.uncs_plus[i] = mcmc[2] - mcmc[1] model.uncs_minus[i] = mcmc[1] - mcmc[0] # set the posterior distribution to the samples param = getattr(model, pname) param.value = mcmc[1] param.posterior = astrounc.Distribution(flat_samples[:, k]) k += 1 else: model.uncs[i] = 0.0 model.uncs_plus[i] = 0.0 model.uncs_minus[i] = 0.0 # set the posterior distribution to the samples param = getattr(model, pname) param.posterior = None # now set uncertainties on the parameter objects themselves param = getattr(model, pname) param.unc = model.uncs[i] param.unc_plus = model.uncs_plus[i] param.unc_minus = model.uncs_minus[i] return model def __call__(self, model, x, y, weights=None, **kwargs): """ Fit data to this model. Parameters ---------- model : `~astropy.modeling.FittableModel` model to fit to x, y x : array input coordinates y : array input coordinates weights : array, optional Weights for fitting. For data with Gaussian uncertainties, the weights should be 1/sigma. kwargs : dict optional keyword arguments to be passed to the optimizer or the statistic Returns ------- model_copy : `~astropy.modeling.FittableModel` a copy of the input model with parameters set by the fitter """ model_copy = _validate_model(model, self._opt_method.supported_constraints) farg = _convert_input(x, y) farg = (model_copy, weights) + farg p0, _, _ = model_to_fit_params(model_copy) fitparams, self.fit_info = self._opt_method( self.log_probability, p0, farg, self.nsteps, save_samples=self.save_samples, **kwargs ) # set the output model parameters to the "best fit" parameters fitter_to_model_params(model_copy, fitparams) # get and set the symmetric and asymmetric uncertainties on each parameter model_copy = self._set_uncs_and_posterior(model_copy) return model_copy def plot_emcee_results(self, fitted_model, filebase=""): """ Plot the standard triangle and diagnostic walker plots """ # get the samples to use sampler = self.fit_info["sampler"] # only the non fixed parameters were fit fit_param_names = [] for pname in fitted_model.param_names: if not fitted_model.fixed[pname]: fit_param_names.append(pname) # plot the walker chains for all parameters nwalkers, nsteps, ndim = sampler.chain.shape fig, ax = plt.subplots(ndim, sharex=True, figsize=(13, 13)) walk_val = np.arange(nsteps) for i in range(ndim): for k in range(nwalkers): ax[i].plot(walk_val, sampler.chain[k, :, i], "-") ax[i].set_ylabel(fit_param_names[i]) fig.savefig("%s_walker_param_values.png" % filebase) plt.close(fig) # plot the 1D and 2D likelihood functions in a traditional triangle plot nwalkers, nsteps = sampler.lnprobability.shape # discard the 1st burn_frac (burn in) flat_samples = sampler.get_chain(discard=int(self.burnfrac * nsteps), flat=True) nflatsteps, ndim = flat_samples.shape fig = corner.corner( flat_samples, labels=fit_param_names, show_titles=True, title_fmt=".3f", use_math_text=True, ) fig.savefig("%s_param_triangle.png" % filebase) plt.close(fig)