# -*- coding: utf-8 -*-
from __future__ import division
import logging
import math
from collections import namedtuple
import numpy as np
import scipy.stats
from simoa.stats import von_neumann_ratio_test
logger = logging.getLogger(__name__)
# Guard against integer division: the algorithm relies on true division
# semantics (Python 3, or Python 2 with the __future__ import above).
if not (3 / 2 == 1.5):
    raise RuntimeError('true division semantics required')
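# Coverage probability of a +/- 1 sigma interval of the standard normal
# distribution (approximately 0.6827); used as the default confidence level.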
ONE_SIGMA = scipy.stats.norm().cdf(1) - scipy.stats.norm().cdf(-1)
""" Minimum number of data points """
NSKART_MIN_DATA_POINTS = 1280
NSKART_INITIAL_NUMBER_OF_BATCHES_IN_SPACER = 0
NSKART_MAXIMUM_NUMBER_OF_BATCHES_IN_SPACER = 10
NSKART_MAXIMUM_NUMBER_OF_BATCHES_IN_SPACER_SKEWED = 3
NSKART_INITIAL_BATCH_NUMBER = 1280
NSKART_RANDOMNESS_TEST_SIGNIFICANCE = 0.20
NSKART_RANDOMNESS_TEST_SIGNIFICANCE_KEY = r'\alpha_{\text{ran}}'
NSKART_NONSPACED_RANDOMNESS_TEST_KEY = (
'nonspaced batch means randomness test passed'
)
NSKART_SPACED_RANDOMNESS_TEST_KEY = (
'spaced batch means randomness test passed'
)
NSKART_INSUFFICIENT_DATA_KEY = "insufficient data"
NSKART_GRAND_AVERAGE_KEY = r"\bar{Y}(m,k')"
NSKART_SAMPLE_VAR_KEY = r"S^2_{m,k'}"
NSKART_SAMPLE_LAG1_CORR = r"\hat{\varphi}_{Y(m)}"
NSKART_BATCHED_GRAND_MEAN_KEY = r"\bar{Y}(m,k'',d')"
NSKART_BATCHED_SAMPLE_VAR_KEY = r"S^2_{m,k'',d'}"
NSKART_BATCHED_SKEW_KEY = r"\hat{\mathcal{B}}_{m,k''}"
class NSkartException(BaseException):
    pass
class NSkartInsufficientDataError(NSkartException, RuntimeError):
    pass
class NSkartTooFewValues(NSkartException, ValueError):
    pass
NSkartReturnValue = namedtuple(
'NSkartReturnValue',
['mean', 'ci', 'env']
)
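# Fields of NSkartReturnValue: `mean` is the N-Skart point estimate of the
# steady-state mean, `ci` holds the two confidence-interval endpoints, and
# `env` exposes the final algorithm environment (parameters and variables).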
def compute_nskart_interval(
        data, confidence_level=ONE_SIGMA, continue_insufficient_data=False
):
"""
Compute the confidence interval for the steady-state mean
Implements the N-Skart algorithm, "a nonsequential procedure designed to
deliver a confidence interval for the steady-state mean of a simulation
output process when the user supplies a single simulation-generated time
series of arbitrary size and specifies the required coverage probability
for a confidence interval based on that data set."
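    Parameters
    ----------
    data : array-like
        Simulation output series
    confidence_level : float, optional
        Required coverage probability of the confidence interval
        (defaults to the one-sigma normal coverage, about 0.6827)
    continue_insufficient_data : bool, optional
        If True, continue with the available data instead of raising
        NSkartInsufficientDataError when more data would be needed
    Returns
    -------
    NSkartReturnValue
        Named tuple with fields `mean` (point estimate), `ci`
        (confidence-interval endpoints), and `env` (final algorithm
        environment)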
Notes
-----
"N-Skart is a variant of the method of batch means that exploits separate
adjustments to the half-length of the CI so as to account for the effects
on the distribution of the underlying Student’s t-statistic that arise from
skewness (nonnormality) and autocorrelation of the batch means.
If the sample size is sufficiently large, then N-Skart delivers not only
a CI but also a point estimator for the steady-state mean that
is approximately free of initialization bias."
    References
    ----------
    .. [1] Tafazzoli, A.; Steiger, N. M.; Wilson, J. R.,
       "N-Skart: A Nonsequential Skewness- and Autoregression-Adjusted
       Batch-Means Procedure for Simulation Analysis,"
       IEEE Transactions on Automatic Control, vol. 56, no. 2,
       pp. 254-264, 2011. doi: 10.1109/TAC.2010.2052137
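    Examples
    --------
    A minimal usage sketch; the synthetic input series is an illustrative
    assumption, not data from the paper:
    >>> import numpy as np
    >>> rng = np.random.RandomState(0)
    >>> series = rng.normal(loc=1.0, scale=2.0, size=100000)
    >>> result = compute_nskart_interval(series)  # doctest: +SKIP
    >>> result.mean, result.ci  # doctest: +SKIP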
"""
# STEPS 1 -- 4
env = _get_independent_data(data, continue_insufficient_data)
# STEP 5a
env = _step_5a(env)
# STEP 5b
env = _step_5b(env)
# STEP 6
env = _step_6(env)
# STEP 7
env = _step_7(env, confidence_level)
return NSkartReturnValue(
mean=env[NSKART_GRAND_AVERAGE_KEY],
ci=env['CI'],
env=env,
)
def _get_independent_data(data, continue_insufficient_data=False):
# STEP 1
env = _step_1(data)
while True:
# STEP 2
env = _step_2(env)
# STEP 3
# STEP 3a
env = _step_3a(env)
if env[NSKART_NONSPACED_RANDOMNESS_TEST_KEY]:
# randomness test passed (failed-to-reject)
return env
# randomness test failed (independence hypothesis rejected)
while env["d"] < env["d^*"]:
# STEP 3b/d
env = _step_3bd(env)
# STEP 3c
env = _step_3c(env)
if env[NSKART_SPACED_RANDOMNESS_TEST_KEY]:
# randomness test passed (failed-to-reject)
return env
# Continue with Step 3b/d
continue
# STEP 4
env = _step_4(env, continue_insufficient_data)
if NSKART_INSUFFICIENT_DATA_KEY in env:
return env
# continue with Step 2
continue
    raise RuntimeError('unreachable: the loop above only exits via return')
def _compute_nonspaced_batch_means(env):
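    # View the first k * m observations as a (k, m) array whose k rows are
    # adjacent batches of size m; the row means are the nonspaced batch means.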
return (
env['X_i']
[:env['m'] * env['k']]
.reshape((env['k'], env['m']))
.mean(axis=1)
)
def _step_1(data):
"""
Perform step 1 of the N-Skart algorithm
Employs the `logging` module for output.
Parameters
----------
data: array-like
Simulation output series
"""
logger.info('N-Skart step 1')
# initialize persistent environment for the algorithm
env = dict()
env['X_i'] = data
env['N'] = data.size
logger.debug('N = {}'.format(env['N']))
if env['N'] < NSKART_MIN_DATA_POINTS:
raise NSkartTooFewValues(
'Need {} values, got {}.'.format(NSKART_MIN_DATA_POINTS, env['N'])
)
# From the given sample data set of size N, compute the sample skewness of
# the last 80% of the observations.
# Skewness is defined as in the scipy function here, as
# G_1 = \frac{n^2}{(n-1)(n-2)} \frac{m_3}{s^3}
sample_skewness = scipy.stats.skew(
env['X_i'][math.floor(env['N'] / 5):], bias=False
)
logger.debug(
'Sample skewness of the last 80% of the observations: {:.2f}'
.format(sample_skewness)
)
# set initial batch size
env['m'] = (
1 if np.abs(sample_skewness) <= 4.0 else
min(16, math.floor(env['N'] / 1280))
)
logger.debug(
'Initial batch size: m = {}'.format(env['m'])
)
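    # For example, N = 12800 with sample skewness outside [-4, 4] gives
    # m = min(16, floor(12800 / 1280)) = 10; otherwise m = 1.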
# set current number of batches in a spacer
env['d'] = NSKART_INITIAL_NUMBER_OF_BATCHES_IN_SPACER
# set maximum number of batches allowed in a spacer
env['d^*'] = NSKART_MAXIMUM_NUMBER_OF_BATCHES_IN_SPACER
# set nonspaced (adjacent) batch number
env['k'] = NSKART_INITIAL_BATCH_NUMBER
# set randomness test significance level
env[NSKART_RANDOMNESS_TEST_SIGNIFICANCE_KEY] = (
NSKART_RANDOMNESS_TEST_SIGNIFICANCE
)
# initialize randomness test counter
env['b'] = 0
# initially processed sample size
env['n'] = env['k'] * env['m']
# Having set an appropriate value for the initial batch size,
# N-Skart uses the initial n = 1280m observations of the
# overall sample of size N to compute k = 1280 nonspaced
# (adjacent) batches of size m with an initial spacer consisting of
# d ← 0 ignored batches preceding each “spaced” batch.
env['Y_j(m)'] = _compute_nonspaced_batch_means(env)
logger.debug('Post-step 1 environment: {}'.format(env))
logger.info('Finish step 1')
return env
def _step_2(env):
"""
Perform step 2 of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
    # last 80%: skip the first ceil(0.2 * k) batch means
    yjs_sample_skewness = scipy.stats.skew(
        env['Y_j(m)'][math.ceil(0.2 * env['k']):], bias=False
    )
logger.debug((
'Sample skewness of the last 80% of the current set of nonspaced batch'
' means: {:.2f}'
).format(yjs_sample_skewness)
)
if abs(yjs_sample_skewness) > 0.5:
# reset the maximum number of batches per spacer
env['d^*'] = NSKART_MAXIMUM_NUMBER_OF_BATCHES_IN_SPACER_SKEWED
logger.debug(
'Reset the maximum number of batches per spacer to {}'.format(
env['d^*']
)
)
logger.debug('Post-step 2 environment: {}'.format(env))
logger.info('Finish step 2')
return env
def _step_3a(env):
"""
Perform step 3a of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 3a')
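    # The von Neumann ratio test compares the mean squared successive
    # difference of the batch means to their sample variance; under the
    # independence (randomness) hypothesis the ratio is close to 2, and a
    # significant deviation from 2 rejects randomness at level alpha.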
env[NSKART_NONSPACED_RANDOMNESS_TEST_KEY] = (
von_neumann_ratio_test(
data=env['Y_j(m)'],
alpha=env[NSKART_RANDOMNESS_TEST_SIGNIFICANCE_KEY]
)
)
if env[NSKART_NONSPACED_RANDOMNESS_TEST_KEY]:
env["k'"] = env['k']
logger.debug(
'Randomness test for nonspaced batch means {}ed'.format(
'pass' if env[NSKART_NONSPACED_RANDOMNESS_TEST_KEY]
else 'fail'
)
)
logger.debug('Post-step 3a environment: {}'.format(env))
logger.info('Finish step 3a')
return env
def _step_3bd(env):
"""
Perform step 3b/d of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 3b/d')
# add another ignored batch to each spacer
env['d'] += 1
logger.debug('Add another ignored batch to each spacer: d = {}'.format(
env['d']
))
# number of spaced batches
env["k'"] = math.floor(env['n'] / (env['d'] + 1) / env['m'])
logger.debug("Number of spaced batches: k' = {}".format(env["k'"]))
# compute spaced batch means
env['Y_j(m,d)'] = env['Y_j(m)'][env['d']::env['d'] + 1]
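    # e.g. with d = 2 this keeps the batch means at indices 2, 5, 8, ...,
    # so each retained batch is preceded by d ignored batches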
logger.debug('Post-step 3b/d environment: {}'.format(env))
logger.info('Finish step 3b/d')
return env
def _step_3c(env):
"""
Perform step 3c of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 3c')
env[NSKART_SPACED_RANDOMNESS_TEST_KEY] = (
von_neumann_ratio_test(
data=env['Y_j(m,d)'],
alpha=env[NSKART_RANDOMNESS_TEST_SIGNIFICANCE_KEY]
)
)
logger.debug(
'Randomness test for spaced batch means {}ed'.format(
'pass' if env[NSKART_SPACED_RANDOMNESS_TEST_KEY]
else 'fail'
)
)
logger.debug('Post-step 3c environment: {}'.format(env))
logger.info('Finish step 3c')
return env
def _step_4(env, continue_insufficient_data=False):
"""
Perform step 4 of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 4')
# tentative new batch size
new_m = math.ceil(math.sqrt(2) * env['m'])
# tentative new total batch count
new_k = math.ceil(0.9 * env['k'])
# tentative new processed sample size
new_n = new_k * new_m
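    # For example, from m = 16 and k = 1280: new_m = ceil(16 * sqrt(2)) = 23,
    # new_k = ceil(0.9 * 1280) = 1152, so new_n = 26496 observations would be
    # processed in the next iteration.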
if new_n <= env['N']:
# update batch size
env['m'] = new_m
logger.debug('Update batch size: m = {}'.format(env['m']))
# update total batch count
env['k'] = new_k
logger.debug('Update total batch count: k = {}'.format(env['k']))
# update processed sample size
env['n'] = new_n
logger.debug('Update processed sample size: n = {}'.format(env['n']))
        # reset the number of batches per spacer and its allowed maximum
        env['d'] = NSKART_INITIAL_NUMBER_OF_BATCHES_IN_SPACER
        env['d^*'] = NSKART_MAXIMUM_NUMBER_OF_BATCHES_IN_SPACER
# increase test counter
env['b'] += 1
# recalculate non-spaced batch means
env['Y_j(m)'] = _compute_nonspaced_batch_means(env)
else:
# insufficient data: sample to processed larger than available data
logger.debug("Insufficient data.")
logger.debug("New batch size: m = {}".format(new_m))
logger.debug("New total batch count: k = {}".format(new_k))
logger.debug("New processed sample size: n = {}".format(new_n))
logger.debug("Available data points: N = {}".format(env['N']))
error_msg = ("N = {} data points available, need n = {}".format(
env['N'], new_n
))
if continue_insufficient_data:
logger.error(error_msg)
logger.info("Insufficient data -- user request to continue")
env[NSKART_INSUFFICIENT_DATA_KEY] = True
else:
raise NSkartInsufficientDataError(error_msg)
logger.debug('Post-step 4 environment: {}'.format(env))
logger.info('Finish step 4')
return env
def _step_5a(env, **kwargs):
"""
Perform step 5a of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 5a')
# Skip first w observations in the warm-up periods
env['w'] = env['d'] * env['m']
    # Number of approximately steady-state observations available to build a
    # confidence interval
    env["N'"] = env['N'] - env['w']
# Reinflate batch count to compensate for the total number of times the
# batch count was deflated in successive iterations of the randomness test.
    env["k'"] = min(
        math.ceil(env["k'"] * (1.0 / 0.9) ** env['b']),
        env['k']
    )
# compute a multiplier to use all available observations
env['f'] = math.sqrt(env["N'"] / env["k'"] / env['m'])
# update count of truncated, nonspaced batch means
env["k'"] = min(math.floor(env['f'] * env["k'"]), 1024)
# update batch size
env['m'] = math.floor(
env['f'] * env['m'] if env["k'"] < 1024
else
env["N'"] / 1024.0
)
# update length of warm-up period such that the initial w observations are
# the only unused items in the overall data set of size N
env['w'] += env["N'"] - env["k'"] * env['m']
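    # Bookkeeping: now w + k' * m == N, i.e. the warm-up period absorbs
    # exactly the observations that do not fit into the k' retained batches,
    # so the reshape in step 5b leaves no partial batch.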
logger.debug('Post-step 5a environment: {}'.format(env))
logger.info('Finish step 5a')
return env
def _step_5b(env, **kwargs):
"""
Perform step 5b of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 5b')
# Recompute the truncated, nonspaced batch means so that there is no
# partial batch left at the end of the overall data set of size N
env['Y_j(m)'] = (
env['X_i']
[env['w']:]
.reshape((env["k'"], env['m']))
.mean(axis=1)
)
logger.debug('Post-step 5b environment: {}'.format(env))
logger.info('Finish step 5b')
return env
def _step_6(env, **kwargs):
"""
Perform step 6 of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 6')
# compute the grand average of the current set of truncated, nonspaced
# batch means
env[NSKART_GRAND_AVERAGE_KEY] = env['Y_j(m)'].mean()
# compute the sample variance of the current set of truncated, nonspaced
# batch means
env[NSKART_SAMPLE_VAR_KEY] = env['Y_j(m)'].var(ddof=1)
# compute the sample estimator of the lag-one correlation of the truncated,
# nonspaced batch means
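    # \hat{\varphi}_{Y(m)} = \sum_{j=1}^{k'-1}
    #     (Y_j - \bar{Y})(Y_{j+1} - \bar{Y}) / ((k' - 1) S^2_{m,k'})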
env[NSKART_SAMPLE_LAG1_CORR] = (
(
(env['Y_j(m)'][:-1] - env[NSKART_GRAND_AVERAGE_KEY]) *
(env['Y_j(m)'][1:] - env[NSKART_GRAND_AVERAGE_KEY])
)
.sum() /
env[NSKART_SAMPLE_VAR_KEY] /
(env["k'"] - 1.0)
)
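    # Autocorrelation adjustment A = (1 + phi) / (1 - phi): A > 1 when the
    # batch means are positively correlated, widening the interval in step 7.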
env['A'] = (
(1. + env[NSKART_SAMPLE_LAG1_CORR]) /
(1. - env[NSKART_SAMPLE_LAG1_CORR])
)
logger.debug('Post-step 6 environment: {}'.format(env))
logger.info('Finish step 6')
return env
def _step_7(env, confidence_level=ONE_SIGMA, **kwargs):
"""
Perform step 7 of the N-Skart algorithm
Parameters
----------
env: dict
The persistent algorithm environment (parameters and variables)
Returns
-------
env: dict
The persistent algorithm environment (parameters and variables)
"""
logger.info('N-Skart step 7')
# compute spacer number such that spacer size is the smallest multiple of m
# not less than the final size of the warm-up period
env["d'"] = math.ceil(env['w'] / env['m'])
# number of spaced batches
env["k''"] = 1 + math.floor(
(env["k'"] - 1) / (env["d'"] + 1)
)
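    # e.g. k' = 1024 spaced by d' = 3 gives k'' = 1 + floor(1023 / 4) = 256
    # spaced batch means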
# compute spaced batch means
# FIXME: BUG in paper
# Sum needs to start at: N - (k'' - j) (d' + 1) m - m + 1
# Until: N - (k'' - j) (d' + 1) m
mask = (
np.arange(env['N'] - env['w']) // env['m'] % (env["d'"] + 1) == 0
)[::-1]
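    # The reversed mask anchors the k'' selected batches at the end of the
    # truncated series, consistent with the corrected index range above.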
env["Y_j(m,d')"] = (
env['X_i']
[env['w']:]
[mask]
.reshape(
(env["k''"], env['m'])
)
.mean(axis=1)
)
# compute grand mean
env[NSKART_BATCHED_GRAND_MEAN_KEY] = env["Y_j(m,d')"].mean()
# compute sample variance
env[NSKART_BATCHED_SAMPLE_VAR_KEY] = env["Y_j(m,d')"].var(ddof=1)
# compute skewness
env[NSKART_BATCHED_SKEW_KEY] = scipy.stats.skew(
env["Y_j(m,d')"], bias=False
)
env[r'\beta'] = env[NSKART_BATCHED_SKEW_KEY] / 6 / math.sqrt(env["k''"])
    def skewness_adjust(zeta):
        beta = env[r'\beta']
        # np.cbrt handles a negative radicand, which np.power with a
        # fractional exponent would turn into nan
        return (np.cbrt(1 + 6 * beta * (zeta - beta)) - 1) / 2 / beta
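    # As beta -> 0 (symmetric batch means), G(zeta) -> zeta, so the skewness
    # adjustment leaves the classical t-based interval unchanged.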
env['G(zeta)'] = skewness_adjust
env['L, R'] = np.asarray(
scipy.stats.t(df=env["k''"] - 1)
.interval(confidence_level)
)
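    # CI endpoints: \bar{Y}(m,k') + G(tau) * sqrt(A * S^2_{m,k'} / k') for
    # tau = L, R, combining the autocorrelation adjustment A with the
    # skewness adjustment G.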
env['CI'] = (
env[NSKART_GRAND_AVERAGE_KEY] +
env['G(zeta)'](env['L, R']) *
math.sqrt(env['A'] * env[NSKART_SAMPLE_VAR_KEY] / env["k'"])
)
logger.debug('Post-step 7 environment: {}'.format(env))
logger.info('Finish step 7')
return env