Source code for bitraider.strategy
import sys
import pytz
import xml.utils.iso8601
import time
import numpy
from datetime import date, datetime, timedelta
from matplotlib import pyplot as plt
from exchange import cb_exchange as cb_exchange
from exchange import CoinbaseExchangeAuth
from abc import ABCMeta, abstractmethod
[docs]class strategy(object):
"""`strategy` defines an abstract base strategy class. Minimum required to create a strategy is a file with a class which inherits from strategy containing a backtest_strategy function. As a bonus, strategy includes utility functions like calculate_historic_data.
"""
__metaclass__ = ABCMeta
def __init__(name="default name", interval=5):
"""Constructor for an abstract strategy. You can modify it as needed.
\n`interval`: a.k.a timeslice the amount of time in seconds for each 'tick' default is 5
\n`name`: a string name for the strategy
"""
self.name = name
self.interval = interval
self.times_recalculated = 0
@abstractmethod
[docs] def trade(self, timeslice):
"""Perform operations on a timeslice.
\n`timeslice`: a section of trade data with time length equal to the strategy's interval, formatted as follows:
\n[time, low, high, open, close, volume]
"""
return
[docs] def backtest_strategy(self, historic_data):
"""Returns performance of a strategy vs market performance.
"""
# Reverse the data since Coinbase returns it in reverse chronological
# now historic_data strarts with the oldest entry
historic_data = list(reversed(historic_data))
earliest_time = float(historic_data[0][0])
latest_time = float(historic_data[-1][0])
start_price = float(historic_data[0][4])
end_price = float(historic_data[-1][4])
market_performance = ((end_price-start_price)/start_price)*100
print("Running simulation on historic data. This may take some time....")
for timeslice in historic_data:
# Display what percent through the data we are
idx = historic_data.index(timeslice)
percent = (float(idx)/float(len(historic_data)))*100 + 1
sys.stdout.write("\r%d%%" % percent)
sys.stdout.flush()
self.trade(timeslice)
# Calculate performance
end_amt_no_trades = (float(self.exchange.start_usd)/float(end_price)) + float(self.exchange.start_btc)
end_amt = (float(self.exchange.usd_bal)/float(end_price)) + float(self.exchange.btc_bal)
start_amt = (float(self.exchange.start_usd)/float(start_price)) + float(self.exchange.start_btc)
strategy_performance = ((end_amt-start_amt)/start_amt)*100
print("\n")
print("Times recalculated: "+str(self.times_recalculated))
print("Times bought: "+str(self.exchange.times_bought))
print("Times sold: "+str(self.exchange.times_sold))
print("The Market's performance: "+str(market_performance)+" %")
print("Strategy's performance: "+str(strategy_performance)+" %")
print("Account's ending value if no trades were made: "+str(end_amt_no_trades)+" BTC")
print("Account's ending value with this strategy: "+str(end_amt)+" BTC")
strategy_performance_vs_market = strategy_performance - market_performance
if strategy_performance > market_performance:
print("Congratulations! This strategy has beat the market by: "+str(strategy_performance_vs_market)+" %")
elif strategy_performance < market_performance:
print("This strategy has preformed: "+str(strategy_performance_vs_market)+" % worse than market.")
return strategy_performance_vs_market, strategy_performance, market_performance
@staticmethod
[docs] def calculate_historic_data(data, pivot):
"""Returns average price weighted according to volume, and the number of bitcoins traded
above and below a price point, called a pivot.\n
\npivot: the price used for returning volume above and below
\ndata: a list of lists formated as follows [time, low, high, open, close]
\n[
\n\t["2014-11-07 22:19:28.578544+00", "0.32", "4.2", "0.35", "4.2", "12.3"],
\n\t\t...
\n]
"""
price_list = []
weights = []
if data is None:
pass
min_price = float(data[0][1])
max_price = float(data[0][2])
discrete_prices = {}
for timeslice in data:
timeslice = [float(i) for i in timeslice]
if max_price < timeslice[2]:
max_prie = timeslice[2]
if min_price > timeslice[1]:
min_price = timeslice[1]
closing_price = timeslice[4]
volume = timeslice[5]
if closing_price not in discrete_prices.keys():
discrete_prices[str(closing_price)] = volume
else:
discrete[str(closing_price)] += volume
idx = data.index(timeslice)
price_list.append(closing_price)
weights.append(volume)
fltprices = [float(i) for i in discrete_prices.keys()]
fltvolumes = [float(i) for i in discrete_prices.values()]
np_discrete_prices = numpy.array(fltprices)
np_volume_per_price = numpy.array(fltvolumes)
weighted_avg = numpy.average(np_discrete_prices, weights=np_volume_per_price)
num_above = 0
num_below = 0
num_at = 0
for key in discrete_prices.keys():
value = discrete_prices[key]
if float(key) > pivot:
num_above+=value
elif float(key) < pivot:
num_below+=value
elif float(key) == pivot:
num_at+=value
total_volume = 0.0
for volume in fltvolumes:
total_volume+=volume
fltprops = []
for volume in fltvolumes:
fltprops.append((volume/total_volume))
#print("num_below: "+str(num_below))
#print("num_above: "+str(num_above))
#print("num_at: "+str(num_at))
#print("weighted_average: "+str(weighted_avg))
#plt.title("Price distribution")
#plt.xlabel("Price (USD)")
#plt.ylabel("Volume")
#plt.bar(fltprices, fltprops)
#plt.show()
return weighted_avg, num_above, num_below