```js
//@version=5
// ADF Strategy
// Runs an Augmented Dickey-Fuller (ADF) test over a moving window on two price
// series (an external "source" symbol and the chart symbol) and enters a position
// when both test statistics are below their critical value and the price spread
// between the two series is outside a 2-standard-deviation band.
strategy("ADF Strategy", overlay = false, initial_capital = 10000, commission_type = strategy.commission.percent, commission_value = 0.25, pyramiding = 0)

// ---------------------------------------------------------------------------
// Inputs
// ---------------------------------------------------------------------------
start_date = input.time(defval = timestamp("01 Jan 2023 00:00 +0000"), title = "From")
end_date = input.time(defval = timestamp("01 Jan 2024 00:00 +0000"), title = "To")
show_date_range = input.bool(defval = true, title = "Show Date Range")

// True while the current bar's open time lies inside [start_date, end_date].
in_date_range() => time >= start_date and time <= end_date

symbol = input.symbol("KRAKEN:BTCUSD", "Source Symbol")
sample_length = input.int(title='Length', defval=100, minval = 2, tooltip = 'The test is applied in a moving window. Length defines the number of points in the sample.')
max_lag = input.int(title='Maximum lag', defval=0, minval = 0, tooltip = 'Maximum lag which is included in test. Generally, lags allow taking into account serial correlation of price changes.')
confidence_level = input.string(title='Confidence Level', defval="90%", options = ['90%', '95%', '99%'], tooltip = 'Defines at which confidence level the critical value of the ADF test statistic is calculated. If the test statistic is below the critical value, the time series sample is concluded to be mean-reverting.')
invert_color = input.bool (true, 'Invert the Color', tooltip='Occasionally signal color inversion is required')

// ---------------------------------------------------------------------------
// Signal state
// ---------------------------------------------------------------------------
var int BUY_SIGNAL = 1
var int SELL_SIGNAL = -1
var int CLEAR_SIGNAL = 0
var int current_signal = CLEAR_SIGNAL

// Collect the time series data for both source and target series
source_close_price = request.security(symbol, timeframe.period, close)
target_close_price = close

// ---------------------------------------------------------------------------
// Linear algebra helpers.
// Matrices are stored as flat float arrays in column-major order:
// element (i, j) of a matrix with `nrows` rows lives at index i + nrows * j.
// ---------------------------------------------------------------------------

// Returns element (i, j) of the flat matrix A that has `nrows` rows.
matrix_get(A, i, j, nrows) =>
    array.get(A, i + nrows * j)

// Writes `value` into element (i, j) of the flat matrix A and returns A.
matrix_set(A, value, i, j, nrows) =>
    array.set(A, i + nrows * j, value)
    A

// Returns a new (ncolumns x nrows) matrix that is the transpose of A (nrows x ncolumns).
transpose(A, nrows, ncolumns) =>
    float[] AT = array.new_float(nrows * ncolumns, 0)
    for i = 0 to nrows - 1
        for j = 0 to ncolumns - 1
            matrix_set(AT, matrix_get(A, i, j, nrows), j, i, ncolumns)
    AT

// Returns the matrix product C = A * B, where A is (nrowsA x ncolumnsA) and
// B is (ncolumnsA x ncolumnsB). C is (nrowsA x ncolumnsB).
multiply(A, B, nrowsA, ncolumnsA, ncolumnsB) =>
    float[] C = array.new_float(nrowsA * ncolumnsB, 0)
    int nrowsB = ncolumnsA
    float elementC = 0.0
    for i = 0 to nrowsA - 1
        for j = 0 to ncolumnsB - 1
            elementC := 0
            for k = 0 to ncolumnsA - 1
                elementC += matrix_get(A, i, k, nrowsA) * matrix_get(B, k, j, nrowsB)
            matrix_set(C, elementC, i, j, nrowsA)
    C

// Returns the Euclidean norm of the vector X.
vnorm(X) =>
    int n = array.size(X)
    float norm = 0.0
    for i = 0 to n - 1
        norm += math.pow(array.get(X, i), 2)
    math.sqrt(norm)

// QR decomposition of A (nrows x ncolumns) by column-wise orthogonalisation.
// Returns [Q, R]: Q is (nrows x ncolumns), R is (ncolumns x ncolumns) and only
// its diagonal and upper triangle are written.
qr_diag(A, nrows, ncolumns) =>
    float[] Q = array.new_float(nrows * ncolumns, 0)
    float[] R = array.new_float(ncolumns * ncolumns, 0)
    float[] a = array.new_float(nrows, 0)
    float r = 0.0
    float aux = 0.0
    // First column of A and its norm.
    for i = 0 to nrows - 1
        array.set(a, i, matrix_get(A, i, 0, nrows))
    r := vnorm(a)
    // First diagonal element of R and first (normalised) column of Q.
    matrix_set(R, r, 0, 0, ncolumns)
    for i = 0 to nrows - 1
        matrix_set(Q, array.get(a, i) / r, i, 0, nrows)
    // Guard: with a single column, `for k = 1 to 0` would count downwards.
    if ncolumns != 1
        for k = 1 to ncolumns - 1
            for i = 0 to nrows - 1
                array.set(a, i, matrix_get(A, i, k, nrows))
            for j = 0 to k - 1 by 1
                // R(j, k) is the scalar product of column j of Q with the working vector.
                r := 0
                for i = 0 to nrows - 1
                    r += matrix_get(Q, i, j, nrows) * array.get(a, i)
                matrix_set(R, r, j, k, ncolumns)
                // Remove the component along column j of Q from the working vector.
                for i = 0 to nrows - 1
                    aux := array.get(a, i) - r * matrix_get(Q, i, j, nrows)
                    array.set(a, i, aux)
            // Diagonal element R(k, k) and normalised column k of Q.
            r := vnorm(a)
            matrix_set(R, r, k, k, ncolumns)
            for i = 0 to nrows - 1
                matrix_set(Q, array.get(a, i) / r, i, k, nrows)
    [Q, R]

// Returns inverse(R) * transpose(Q) for the QR factors of A (nrows x ncolumns);
// the result is (ncolumns x nrows).
pinv(A, nrows, ncolumns) =>
    [Q, R] = qr_diag(A, nrows, ncolumns)
    float[] QT = transpose(Q, nrows, ncolumns)
    // Inverse of the upper-triangular R. The array is declared with `var`, so it
    // is allocated once and reused on later bars; every upper-triangular entry is
    // rewritten below before it is read, and the lower triangle is never written.
    var Rinv = array.new_float(ncolumns * ncolumns, 0)
    float r = 0.0
    matrix_set(Rinv, 1 / matrix_get(R, 0, 0, ncolumns), 0, 0, ncolumns)
    if ncolumns != 1
        for j = 1 to ncolumns - 1
            for i = 0 to j - 1
                r := 0.0
                for k = i to j - 1
                    r += matrix_get(Rinv, i, k, ncolumns) * matrix_get(R, k, j, ncolumns)
                matrix_set(Rinv, r, i, j, ncolumns)
            for k = 0 to j - 1
                matrix_set(Rinv, -matrix_get(Rinv, k, j, ncolumns) / matrix_get(R, j, j, ncolumns), k, j, ncolumns)
            matrix_set(Rinv, 1 / matrix_get(R, j, j, ncolumns), j, j, ncolumns)
    float[] Ainv = multiply(Rinv, QT, ncolumns, ncolumns, nrows)
    Ainv

// ---------------------------------------------------------------------------
// ADF test
// ---------------------------------------------------------------------------

// Applies the ADF regression to the sample `a`.
//   a     - sample values; differences are taken as a[i] - a[i+1]
//   nLag  - number of lagged-difference regressors to include
//   conf  - "90%", "95%" or "99%"; selects the critical-value formula
// Returns [adf, crit, nobs]: the test statistic, the critical value for `conf`
// (na if `conf` matches none of the three options), and the number of
// observations used in the regression.
// Raises a runtime error when nLag >= array.size(a)/2 - 2.
adftest(a, nLag, conf) =>
    if nLag >= array.size(a)/2 - 2
        runtime.error("ADF: Maximum lag must be less than (Length/2 - 2)")
    int nobs = array.size(a)-nLag-1
    // Response (differences), level regressor and constant regressor.
    float[] y = array.new_float(0)
    float[] x = array.new_float(0)
    float[] x0 = array.new_float(0)
    for i = 0 to nobs-1
        array.push( y, array.get(a,i)-array.get(a,i+1))
        array.push( x, array.get(a,i+1))
        array.push(x0, 1.0)
    // Design matrix X (nobs x M), built column by column: level, constant, lags.
    float[] X = array.copy(x)
    int M = 2
    X := array.concat(X, x0)
    // Guard: with nLag == 0, `for n = 1 to 0` would count downwards.
    if nLag > 0
        for n = 1 to nLag
            float[] xl = array.new_float(0)
            for i = 0 to nobs-1
                array.push(xl, array.get(a,i+n)-array.get(a,i+n+1)) // lag-n difference, predictor
            X := array.concat(X, xl)
            M += 1
    // Regression coefficients: coeff = pinv(X) * y.
    float[] c = pinv(X, nobs, M)
    float[] coeff = multiply(c, y, M, nobs, 1)
    // Standard error of the level coefficient from the fit residuals.
    float[] Yhat = multiply(X,coeff,nobs,M,1)
    float meanX = array.avg(x)
    float sum1 = 0.0
    float sum2 = 0.0
    for i = 0 to nobs-1
        sum1 += math.pow(array.get(y,i) - array.get(Yhat,i), 2)/(nobs-M)
        sum2 += math.pow(array.get(x,i) - meanX, 2)
    float SE = math.sqrt(sum1/sum2)
    float adf = array.get(coeff,0) /SE
    // Critical value as a polynomial in 1/nobs for the chosen confidence level.
    float crit = switch
        conf == "90%" => -2.56677 - 1.5384/nobs - 2.809/nobs/nobs
        conf == "95%" => -2.86154 - 2.8903/nobs - 4.234/nobs/nobs - 40.040/nobs/nobs/nobs
        conf == "99%" => -3.43035 - 6.5393/nobs - 16.786/nobs/nobs - 79.433/nobs/nobs/nobs
    [adf, crit, nobs]

// ---------------------------------------------------------------------------
// Build the moving-window samples (index 0 is the current bar) and test them
// ---------------------------------------------------------------------------
float[] source_close_price_array = array.new_float(0)
for i = 0 to sample_length-1
    array.push(source_close_price_array, source_close_price[i])

float[] target_close_price_array = array.new_float(0)
for i = 0 to sample_length-1
    array.push(target_close_price_array, target_close_price[i])

[source_adf, source_critical_value, source_nobs] = adftest(source_close_price_array, max_lag, confidence_level)
[target_adf, target_critical_value, target_nobs] = adftest(target_close_price_array, max_lag, confidence_level)

// Spread between the two series and its 20-bar, 2-standard-deviation band.
price_spread = source_close_price - target_close_price
mean_spread = ta.sma(price_spread, 20)
std_spread = ta.stdev(price_spread, 20)
upper_band = mean_spread + 2 * std_spread
lower_band = mean_spread - 2 * std_spread

// ---------------------------------------------------------------------------
// Signals
// ---------------------------------------------------------------------------
bool both_below_critical = source_adf < source_critical_value and target_adf < target_critical_value
bool is_long = both_below_critical and price_spread > upper_band
bool is_short = both_below_critical and price_spread < lower_band
// Clear when neither entry condition holds. (The previous form,
// `not(is_long and is_short)`, was always true because the spread cannot be
// above the upper band and below the lower band at the same time.)
bool is_clear = not (is_long or is_short)

current_signal := is_long ? BUY_SIGNAL : is_short ? SELL_SIGNAL : is_clear ? CLEAR_SIGNAL : nz(current_signal[1])

// Explicit boolean: true only on bars where the signal differs from the previous
// bar. nz() maps the na returned on the first bar to 0.
bool signal_changed = nz(ta.change(current_signal)) != 0
bool long_condition = signal_changed and current_signal == BUY_SIGNAL
bool short_condition = signal_changed and current_signal == SELL_SIGNAL
bool clear_condition = signal_changed and current_signal == CLEAR_SIGNAL

// ---------------------------------------------------------------------------
// Visuals
// ---------------------------------------------------------------------------
// Marker colours are the same as before for both settings of `invert_color`;
// only the plot titles were corrected so that the long-entry marker is titled
// 'Buy Signal' and the short-entry marker 'Sell Signal'.
color long_marker_color = invert_color ? color.green : color.red
color short_marker_color = invert_color ? color.red : color.green

plot(long_condition ? price_spread : na, 'Buy Signal', long_marker_color, 2, plot.style_circles)
plot(short_condition ? price_spread : na, 'Sell Signal', short_marker_color, 2, plot.style_circles)
plot(clear_condition ? price_spread : na, 'Clear Position', color.orange, 2, plot.style_cross)
plot(price_spread, color=color.blue)
plot(upper_band, color=color.red)
plot(lower_band, color=color.green)

// Shade the configured date range when "Show Date Range" is enabled.
bool within_date_range = in_date_range()
bgcolor(show_date_range and within_date_range ? color.new(color.gray, 90) : na, title = "Date Range")

// ---------------------------------------------------------------------------
// Orders
// ---------------------------------------------------------------------------
// Entries are taken only when flat and only inside the "From"/"To" range.
bool can_enter = within_date_range and strategy.position_size == 0
if long_condition and can_enter
    strategy.entry("Long", strategy.long)
if short_condition and can_enter
    strategy.entry("Short", strategy.short)

// NOTE(review): no exit logic is currently active, so once a position is opened
// nothing in this script closes it. The exit rules below were left disabled by
// the author; confirm the intended exit before enabling them.
// // Identify the exit conditions
// exit_long_condition = price_spread <= mean_spread
// exit_short_condition = price_spread >= mean_spread
// // Implement the exit strategy
// if exit_long_condition and strategy.position_size > 0
//     strategy.close("Long")
// if exit_short_condition and strategy.position_size < 0
//     strategy.close("Short")
```