ここではオーダーブック、オーダーポジションのデータの取得を行います。
MongoDBは前回入れたので、今回は、指定した日から現在までのブックを取得するプログラムを作成します。
使用するretryパッケージをインストールします。
(venv) $ pip install retry
import sys
import time
from datetime import datetime, timedelta, timezone

import pymongo
import v20
import yaml
from retry import retry
# --- Settings ---------------------------------------------------------------
# MongoDB server that receives the downloaded books.
MONGO_URL = 'mongodb://localhost:27017/'
# YAML file holding the API credentials. It is read as a mapping with one
# section per account type, each providing "url", "port" and "token".
AUTH_PATH = "auth.yaml"
# Section of the auth file to use; the same string also names the MongoDB
# database further down in this script.
ACCOUNT_TYPE = "live"
# Start of the backfill when the database holds no book yet.
DATETIME_FROM = datetime(2019, 1, 1)

with open(AUTH_PATH, "r") as f:
    auth = yaml.safe_load(f)

# Bind the selected account section once instead of indexing it three times.
_account = auth[ACCOUNT_TYPE]
api = v20.Context(_account["url"], _account["port"], token=_account["token"])
@retry(
    (v20.errors.V20Timeout, v20.errors.V20ConnectionError),
    tries=5,
    delay=1,
    backoff=2,
)
def fetch_orderbook(instrument, time):
    """Request the order book of ``instrument`` from the v20 API.

    Timeouts and connection errors are retried by the ``retry`` decorator
    (``tries=5``, ``delay=1``, ``backoff=2``); any other exception propagates
    immediately.

    Args:
        instrument: Instrument name forwarded to the API (e.g. "USD_JPY").
        time: Value forwarded as the ``time`` request parameter, or ``None``
            to send the request without it.

    Returns:
        The response object returned by ``api.instrument.order_book``.
    """
    # NOTE: the parameter name shadows the imported ``time`` module inside
    # this function; it is kept because callers may pass it by keyword.
    if time is None:
        return api.instrument.order_book(instrument)
    return api.instrument.order_book(instrument, time=time)
def fetch_orderbook_dict(instrument, time=None):
    """Fetch an order book and return it as a plain dict.

    Args:
        instrument: Instrument name forwarded to ``fetch_orderbook``.
        time: Optional ``time`` request parameter; ``None`` omits it.

    Returns:
        ``response.body["orderBook"].dict()`` when the response status is
        200. For any other status a message is printed and ``None`` is
        returned.
    """
    response = fetch_orderbook(instrument, time)
    if response.status != 200:
        # Look the message up defensively: an error body without
        # "errorMessage" (or a body that is not a dict) must not raise here,
        # because the caller treats None as "skip this time slot".
        body = response.body
        message = body.get("errorMessage") if isinstance(body, dict) else None
        if message is None:
            message = "order book request failed: HTTP {}".format(
                response.status)
        print(message)
        return None
    return response.body["orderBook"].dict()
@retry(
    (v20.errors.V20Timeout, v20.errors.V20ConnectionError),
    tries=5,
    delay=1,
    backoff=2,
)
def fetch_positionbook(instrument, time):
    """Request the position book of ``instrument`` from the v20 API.

    Timeouts and connection errors are retried by the ``retry`` decorator
    (``tries=5``, ``delay=1``, ``backoff=2``); any other exception propagates
    immediately.

    Args:
        instrument: Instrument name forwarded to the API (e.g. "USD_JPY").
        time: Value forwarded as the ``time`` request parameter, or ``None``
            to send the request without it.

    Returns:
        The response object returned by ``api.instrument.position_book``.
    """
    # NOTE: the parameter name shadows the imported ``time`` module inside
    # this function; it is kept because callers may pass it by keyword.
    if time is None:
        return api.instrument.position_book(instrument)
    return api.instrument.position_book(instrument, time=time)
def fetch_positionbook_dict(instrument, time=None):
    """Fetch a position book and return it as a plain dict.

    Args:
        instrument: Instrument name forwarded to ``fetch_positionbook``.
        time: Optional ``time`` request parameter; ``None`` omits it.

    Returns:
        ``response.body["positionBook"].dict()`` when the response status is
        200. For any other status a message is printed and ``None`` is
        returned.
    """
    response = fetch_positionbook(instrument, time)
    if response.status != 200:
        # Look the message up defensively: an error body without
        # "errorMessage" (or a body that is not a dict) must not raise here,
        # because the caller treats None as "skip this time slot".
        body = response.body
        message = body.get("errorMessage") if isinstance(body, dict) else None
        if message is None:
            message = "position book request failed: HTTP {}".format(
                response.status)
        print(message)
        return None
    return response.body["positionBook"].dict()
# --- Backfill order/position books of one instrument into MongoDB -----------
#
# Time handling: book times returned by the API carry a "Z" (UTC) suffix, so
# every datetime in this section is a naive datetime holding UTC wall-clock
# time, and "unixtime" is the matching POSIX timestamp computed explicitly in
# UTC. Calling .timestamp() on such a naive value would interpret it in the
# host's local timezone and shift the stored key on non-UTC machines.

# Format of the "time" field of a returned book.
_BOOK_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
# Step between requested snapshots (original note: "min interval 5 minutes").
_STEP = timedelta(minutes=20)


def _utc_epoch(naive_utc):
    """Return the POSIX timestamp of a naive datetime holding UTC time."""
    return naive_utc.replace(tzinfo=timezone.utc).timestamp()


def _book_collection(database, name):
    """Return collection ``name`` with a unique index on "unixtime"."""
    collection = database[name]
    collection.create_index([("unixtime", 1)], unique=True)
    return collection


def _sync_book(collection, fetch_dict, label, instrument, iso_time, unixtime):
    """Make sure the book for one time slot is stored in ``collection``.

    Prints "hit:<label> " when a document with this ``unixtime`` already
    exists, or "<label> " after a successful fetch.

    Returns:
        False only when the book had to be fetched and ``fetch_dict``
        returned None; True otherwise.
    """
    if collection.find_one({"unixtime": unixtime}):
        print("hit:{} ".format(label), end='', flush=True)
        return True

    book = fetch_dict(instrument, iso_time)
    if book is None:
        return False

    print("{} ".format(label), end='', flush=True)
    # Key the document on the time reported by the book itself, not on the
    # requested time.
    book_time = datetime.strptime(book["time"], _BOOK_TIME_FORMAT)
    book["unixtime"] = _utc_epoch(book_time)
    try:
        collection.insert_one(book)
    except pymongo.errors.DuplicateKeyError:
        # Deliberately ignored: a document with this book's own time is
        # already stored (presumably the API answered with a snapshot whose
        # time differs from the requested one).
        pass
    return True


instrument = "USD_JPY"
now = datetime.now(timezone.utc).replace(tzinfo=None)

client = pymongo.MongoClient(MONGO_URL)
database = client[ACCOUNT_TYPE]
o_key = instrument + ".order"
order_collection = _book_collection(database, o_key)
p_key = instrument + ".position"
position_collection = _book_collection(database, p_key)

# Resume after the newest stored order book, or start from DATETIME_FROM when
# the collection is empty.
latest = order_collection.find_one(sort=[("unixtime", pymongo.DESCENDING)])
if latest:
    dt_from = datetime.fromtimestamp(
        latest["unixtime"], tz=timezone.utc).replace(tzinfo=None)
    print(o_key, dt_from)
else:
    dt_from = DATETIME_FROM

# The first requested slot is one step after dt_from; slots later than `now`
# are never requested.
t = dt_from + _STEP
while t <= now:
    print("{} {} ".format(instrument, t), end='', flush=True)
    iso_time = api.datetime_to_str(t)
    unixtime = _utc_epoch(t)
    # When the order book cannot be fetched, the position book of the same
    # slot is skipped as well.
    if _sync_book(order_collection, fetch_orderbook_dict, "order",
                  instrument, iso_time, unixtime):
        _sync_book(position_collection, fetch_positionbook_dict, "position",
                   instrument, iso_time, unixtime)
    print()
    t += _STEP
上記のコードで、2019年初から現在までのオーダーブック・オーダーポジションデータをMongoDBに保存できます。
次にMongoDBからデータを取得します。
import pymongo
from datetime import datetime
# --- Read one stored book back from MongoDB ---------------------------------
MONGO_URL = 'mongodb://localhost:27017/'
ACCOUNT_TYPE = "live"
instrument = "USD_JPY"
book_type = "order"

client = pymongo.MongoClient(MONGO_URL)
database = client[ACCOUNT_TYPE]
collection = database[instrument + "." + book_type]

# NOTE: dt is a naive datetime, so .timestamp() interprets it in the host's
# local timezone.
dt = datetime(2019, 1, 7)

# Earliest stored book at or after dt. find_one() returns a single document
# (or None), so the ordering is passed through its `sort` argument rather
# than chained as a cursor method.
book = collection.find_one(
    {"unixtime": {"$gte": dt.timestamp()}},
    sort=[("unixtime", pymongo.ASCENDING)],
)
print(book)
上記のコードで、ブックデータが表示できます。
ブックデータはローソク足と異なり、ブックデータの時刻はその時点のデータそのままなので、扱いが楽です。
Home > rest-live-v20 > tips > mongo
prev: 取得データをMongoDBに入れる1 ローソク足
next: -