```js
//@version=5
// Strategy shell: drawn in its own pane (overlay off), 10000 starting capital,
// 0.25 percent commission per order, pyramiding disabled.
strategy(title = "ADF Strategy", overlay = false, pyramiding = 0, initial_capital = 10000, commission_type = strategy.commission.percent, commission_value = 0.25)

// --- Date window inputs ------------------------------------------------------
start_date = input.time(title = "From", defval = timestamp("01 Jan 2023 00:00 +0000"))
end_date = input.time(title = "To", defval = timestamp("01 Jan 2024 00:00 +0000"))
show_date_range = input(title = "Show Date Range", defval = true)

// Returns true when the current bar's `time` lies within [start_date, end_date] (both ends inclusive).
in_date_range() =>
    start_date <= time and time <= end_date

// --- Test configuration inputs -----------------------------------------------
symbol = input.symbol(defval = "KRAKEN:BTCUSD", title = "Source Symbol")
sample_length = input.int(defval = 100, title = 'Length', minval = 2, tooltip = 'The test is applied in a moving window. Length defines the number of points in the sample.')
max_lag = input.int(defval = 0, title = 'Maximum lag', minval = 0, tooltip = 'Maximum lag which is included in test. Generally, lags allow taking into account serial correlation of price changes.')
confidence_level = input.string(defval = "90%", title = 'Confidence Level', options = ['90%', '95%', '99%'], tooltip = 'Defines at which confidence level the critical value of the ADF test statistic is calculated. If the test statistic is below the critical value, the time series sample is concluded to be mean-reverting.')
invert_color = input.bool(defval = true, title = 'Invert the Color', tooltip = 'Occasionally signal color inversion is required')

// --- Signal codes and the persistent signal state -----------------------------
var int BUY_SIGNAL = 1
var int SELL_SIGNAL = -1
var int CLEAR_SIGNAL = 0
// Starts at CLEAR_SIGNAL; reassigned further down once the bar's signal is known.
var int current_signal = CLEAR_SIGNAL

// --- Price series --------------------------------------------------------------
// "Source" is the close of the symbol chosen in the inputs, requested on the chart's
// timeframe; "target" is the close of the chart symbol itself.
source_close_price = request.security(symbol, timeframe = timeframe.period, expression = close)
target_close_price = close
// Reads element (i, j) of a matrix kept in the flat array A.
// Storage is column by column: element (i, j) lives at flat index i + nrows * j,
// where nrows is the number of rows of the matrix.
matrix_get(A, i, j, nrows) =>
    flat_index = j * nrows + i
    array.get(A, flat_index)
// Writes `value` into element (i, j) of the matrix kept in the flat array A
// (flat index i + nrows * j) and returns the same array A, modified in place.
matrix_set(A, value, i, j, nrows) =>
    flat_index = j * nrows + i
    array.set(A, flat_index, value)
    A
// Builds the transpose of the nrows x ncolumns matrix A (flat, column-by-column storage).
// Returns a new flat array holding the ncolumns x nrows result; A is not modified.
transpose(A, nrows, ncolumns) =>
    float[] result = array.new_float(ncolumns * nrows, 0)
    for col = 0 to ncolumns - 1
        for row = 0 to nrows - 1
            // Element (row, col) of A becomes element (col, row) of the result,
            // whose row count is ncolumns.
            float element = matrix_get(A, row, col, nrows)
            matrix_set(result, element, col, row, ncolumns)
    result
// Matrix product C = A * B for flat, column-by-column stored matrices.
//   A is nrowsA x ncolumnsA, B is ncolumnsA x ncolumnsB.
// Returns a new flat array holding the nrowsA x ncolumnsB product.
multiply(A, B, nrowsA, ncolumnsA, ncolumnsB) =>
    // The shared inner dimension is also the row count of B.
    int inner = ncolumnsA
    float[] product = array.new_float(nrowsA * ncolumnsB, 0)
    for row = 0 to nrowsA - 1
        for col = 0 to ncolumnsB - 1
            // Dot product of row `row` of A with column `col` of B.
            float dot = 0.0
            for k = 0 to inner - 1
                dot += matrix_get(A, row, k, nrowsA) * matrix_get(B, k, col, inner)
            matrix_set(product, dot, row, col, nrowsA)
    product
// Euclidean norm of the float array X: the square root of the sum of squared elements.
// Returns 0 for an empty array.
vnorm(X) =>
    float sum_sq = 0.0
    // Iterate over the values directly. A counted loop `for i = 0 to size - 1` would
    // run downward (0, -1) on an empty array and index out of bounds.
    for v in X
        sum_sq += math.pow(v, 2)
    math.sqrt(sum_sq)
// QR decomposition of the nrows x ncolumns matrix A (flat, column-by-column storage)
// by Gram-Schmidt orthogonalisation: each column of A has its projections onto the
// already-built columns of Q removed one after another, then is normalised.
// Returns the tuple [Q, R]:
//   Q - nrows x ncolumns, the normalised columns
//   R - ncolumns x ncolumns, upper triangular (entries below the diagonal stay 0)
// A column whose remaining norm is 0 leads to a division by that zero norm here;
// no special handling is applied.
qr_diag(A, nrows, ncolumns) =>
    float[] Q = array.new_float(nrows * ncolumns, 0)
    float[] R = array.new_float(ncolumns * ncolumns, 0)
    // Working copy of the column currently being orthogonalised.
    float[] a = array.new_float(nrows, 0)
    float r = 0.0
    // First column: nothing to project out, just normalise it.
    for i = 0 to nrows - 1
        array.set(a, i, matrix_get(A, i, 0, nrows))
    r := vnorm(a)
    matrix_set(R, r, 0, 0, ncolumns)
    for i = 0 to nrows - 1
        matrix_set(Q, array.get(a, i) / r, i, 0, nrows)
    // Guard: with a single column, `for k = 1 to 0` would count downward.
    if ncolumns != 1
        for k = 1 to ncolumns - 1
            // Load column k of A into the working vector.
            for i = 0 to nrows - 1
                array.set(a, i, matrix_get(A, i, k, nrows))
            for j = 0 to k - 1 by 1
                // R(j, k) = scalar product of column j of Q with the working vector.
                r := 0
                for i = 0 to nrows - 1
                    r += matrix_get(Q, i, j, nrows) * array.get(a, i)
                matrix_set(R, r, j, k, ncolumns)
                // Remove that component from the working vector.
                for i = 0 to nrows - 1
                    array.set(a, i, array.get(a, i) - r * matrix_get(Q, i, j, nrows))
            // What remains gives the diagonal entry R(k, k) and, normalised, column k of Q.
            r := vnorm(a)
            matrix_set(R, r, k, k, ncolumns)
            for i = 0 to nrows - 1
                matrix_set(Q, array.get(a, i) / r, i, k, nrows)
    [Q, R]
// Pseudoinverse of the nrows x ncolumns matrix A (flat, column-by-column storage),
// obtained from its QR decomposition as inverse(R) * transpose(Q).
// Returns a new flat array holding the ncolumns x nrows result.
pinv(A, nrows, ncolumns) =>
    [Q, R] = qr_diag(A, nrows, ncolumns)
    float[] QT = transpose(Q, nrows, ncolumns)
    // Inverse of the upper-triangular R. Declared with `var`, so the array is created
    // once and reused on later bars; every entry on or above the diagonal is rewritten
    // below on each call, and entries below the diagonal keep their initial 0.
    var Rinv = array.new_float(ncolumns * ncolumns, 0)
    matrix_set(Rinv, 1 / matrix_get(R, 0, 0, ncolumns), 0, 0, ncolumns)
    // Guard: with a single column, `for col = 1 to 0` would count downward.
    if ncolumns != 1
        for col = 1 to ncolumns - 1
            float pivot = matrix_get(R, col, col, ncolumns)
            for row = 0 to col - 1
                // Sum over k of Rinv(row, k) * R(k, col); only columns left of `col`
                // of Rinv are read, and those are already final.
                float acc = 0.0
                for k = row to col - 1
                    acc += matrix_get(Rinv, row, k, ncolumns) * matrix_get(R, k, col, ncolumns)
                matrix_set(Rinv, -acc / pivot, row, col, ncolumns)
            matrix_set(Rinv, 1 / pivot, col, col, ncolumns)
    multiply(Rinv, QT, ncolumns, ncolumns, nrows)
// Augmented Dickey-Fuller style test on the sample `a`.
//   a    - float array of prices; a[i] is differenced against a[i + 1]
//   nLag - number of lagged-difference regressors to include
//   conf - confidence level string: "90%", "95%" or "99%"
// Regresses the differences a[i] - a[i + 1] on the level a[i + 1], a constant and
// nLag lagged differences, then divides the level coefficient by its standard error.
// Returns the tuple [adf, crit, nobs]: the test statistic, the critical value for
// `conf` (na when `conf` is none of the three strings) and the number of observations.
// Raises a runtime error when nLag >= size(a) / 2 - 2.
adftest(a, nLag, conf) =>
    int sample_size = array.size(a)
    if nLag >= sample_size/2 - 2
        runtime.error("ADF: Maximum lag must be less than (Length/2 - 2)")
    int nobs = sample_size - nLag - 1
    // Response (differences) and the level regressor.
    float[] delta = array.new_float(0)
    float[] level = array.new_float(0)
    for i = 0 to nobs - 1
        float first = array.get(a, i)
        float second = array.get(a, i + 1)
        array.push(delta, first - second)
        array.push(level, second)
    // Design matrix, stored column by column: level, constant, then one column per lag.
    // It starts from a copy so that `level` itself stays untouched for the mean below.
    float[] design = array.copy(level)
    design := array.concat(design, array.new_float(nobs, 1.0))
    int n_params = 2
    // Guard: with nLag == 0, `for lag = 1 to 0` would count downward.
    if nLag > 0
        for lag = 1 to nLag
            // Lag-`lag` difference, used as a predictor.
            float[] lagged_delta = array.new_float(0)
            for i = 0 to nobs - 1
                array.push(lagged_delta, array.get(a, i + lag) - array.get(a, i + lag + 1))
            design := array.concat(design, lagged_delta)
            n_params += 1
    // Least-squares coefficients via the pseudoinverse, then the fitted values.
    float[] pseudo_inverse = pinv(design, nobs, n_params)
    float[] coeff = multiply(pseudo_inverse, delta, n_params, nobs, 1)
    float[] fitted = multiply(design, coeff, nobs, n_params, 1)
    float level_mean = array.avg(level)
    float resid_var = 0.0
    float level_ss = 0.0
    for i = 0 to nobs - 1
        float resid = array.get(delta, i) - array.get(fitted, i)
        float deviation = array.get(level, i) - level_mean
        // Each squared residual is scaled by the degrees of freedom as it is added.
        resid_var += math.pow(resid, 2)/(nobs - n_params)
        level_ss += math.pow(deviation, 2)
    float std_err = math.sqrt(resid_var/level_ss)
    // Test statistic: coefficient of the level regressor over its standard error.
    float adf = array.get(coeff, 0) / std_err
    // Critical value as a polynomial in 1 / nobs for the requested confidence level.
    float crit = switch conf
        "90%" => -2.56677 - 1.5384/nobs - 2.809/nobs/nobs
        "95%" => -2.86154 - 2.8903/nobs - 4.234/nobs/nobs - 40.040/nobs/nobs/nobs
        "99%" => -3.43035 - 6.5393/nobs - 16.786/nobs/nobs - 79.433/nobs/nobs/nobs
    [adf, crit, nobs]
// --- Rolling samples -----------------------------------------------------------
// Index 0 holds the current bar's close, index i the close i bars back.
float[] source_close_price_array = array.new_float(0)
float[] target_close_price_array = array.new_float(0)
for i = 0 to sample_length - 1
    array.push(source_close_price_array, source_close_price[i])
    array.push(target_close_price_array, target_close_price[i])

// --- ADF test on each series -----------------------------------------------------
[source_adf, source_critical_value, source_nobs] = adftest(source_close_price_array, max_lag, confidence_level)
[target_adf, target_critical_value, target_nobs] = adftest(target_close_price_array, max_lag, confidence_level)

// --- Spread and its 20-bar, 2-standard-deviation bands ----------------------------
price_spread = source_close_price - target_close_price
mean_spread = ta.sma(price_spread, 20)
std_spread = ta.stdev(price_spread, 20)
upper_band = mean_spread + 2 * std_spread
lower_band = mean_spread - 2 * std_spread

// --- Signals -----------------------------------------------------------------------
// A signal needs both series to test below their critical values and the spread to be
// outside a band. Comparisons against na are not true, so bars without enough history
// produce no signal.
bool both_below_critical = source_adf < source_critical_value and target_adf < target_critical_value
bool is_long = both_below_critical and price_spread > upper_band
bool is_short = both_below_critical and price_spread < lower_band
// Clear means neither entry condition holds. This was `not (is_long and is_short)`,
// which is always true because the spread cannot be above the upper band and below
// the lower band on the same bar.
bool is_clear = not (is_long or is_short)
current_signal := is_long ? BUY_SIGNAL : is_short ? SELL_SIGNAL : CLEAR_SIGNAL
int signal_change = ta.change(current_signal)
// Explicit test instead of using the int (na on the first bar) as a boolean.
bool signal_changed = not na(signal_change) and signal_change != 0
bool long_condition = signal_changed and current_signal == BUY_SIGNAL
bool short_condition = signal_changed and current_signal == SELL_SIGNAL
bool clear_condition = signal_changed and current_signal == CLEAR_SIGNAL

// --- Visuals -----------------------------------------------------------------------
color buy_color = invert_color ? color.red : color.green
color sell_color = invert_color ? color.green : color.red
// NOTE(review): the long marker is titled 'Sell Signal' and the short marker
// 'Buy Signal'. Presumably the titles describe the spread (spread above the upper
// band) rather than the chart symbol - confirm before renaming.
plot(long_condition ? price_spread : na, 'Sell Signal', sell_color, 2, plot.style_circles)
plot(short_condition ? price_spread : na, 'Buy Signal', buy_color, 2, plot.style_circles)
plot(clear_condition ? price_spread : na, 'Clear Position', color.orange, 2, plot.style_cross)
plot(price_spread, color=color.blue)
plot(upper_band, color=color.red)
plot(lower_band, color=color.green)

// --- Orders ------------------------------------------------------------------------
// Entries are only taken while flat and while the bar is inside the "From"/"To" window;
// previously the date inputs were never consulted.
// NOTE(review): no active statement closes a position (the exits below are commented
// out), and show_date_range is not used by any visual - confirm the intended behavior.
bool can_enter = in_date_range() and strategy.position_size == 0
if long_condition and can_enter
    strategy.entry("Long", strategy.long)
if short_condition and can_enter
    strategy.entry("Short", strategy.short)
// // Identify the exit conditions
// exit_long_condition = price_spread <= mean_spread
// exit_short_condition = price_spread >= mean_spread
// // Implement the exit strategy
// if exit_long_condition and strategy.position_size > 0
//     strategy.close("Long")
// if exit_short_condition and strategy.position_size < 0
//     strategy.close("Short")
```