在上一篇文章中，我们讨论了如何使用 Python 获取 NIFTY 和 BANK NIFTY 数据。那篇文章的反响很好，因此应读者要求，这里带来一个扩展版本。在本文中，我们将学习如何每 30 秒从 NSE 网站获取一次期权链数据。本文内容仅供学习使用。
在 Python 中，我们将使用 asyncio 每 30 秒向 NSE 的 API 发出一次数据请求。
在python中安装所需的库
pip install aiohttp requests（asyncio 属于 Python 标准库，无需单独安装）
代码
import aiohttp
import asyncio
import requests
import json
import math
import time
def _ansi_wrap(code, skk):
    """Return *skk* wrapped in the ANSI escape sequence for *code* plus a reset.

    The sequence must begin with the ESC character (``\\033``); without it a
    terminal prints the digits literally instead of changing the text style.
    """
    return "\033[{}m {}\033[00m".format(code, skk)


def strRed(skk):
    """Return *skk* wrapped in ANSI code 91."""
    return _ansi_wrap(91, skk)


def strGreen(skk):
    """Return *skk* wrapped in ANSI code 92."""
    return _ansi_wrap(92, skk)


def strYellow(skk):
    """Return *skk* wrapped in ANSI code 93."""
    return _ansi_wrap(93, skk)


def strLightPurple(skk):
    """Return *skk* wrapped in ANSI code 94."""
    return _ansi_wrap(94, skk)


def strPurple(skk):
    """Return *skk* wrapped in ANSI code 95."""
    return _ansi_wrap(95, skk)


def strCyan(skk):
    """Return *skk* wrapped in ANSI code 96."""
    return _ansi_wrap(96, skk)


def strLightGray(skk):
    """Return *skk* wrapped in ANSI code 97."""
    return _ansi_wrap(97, skk)


def strBlack(skk):
    """Return *skk* wrapped in ANSI code 98.

    NOTE(review): 98 is outside the usual 90-97 bright-foreground range, so
    terminals may ignore it; the code is kept as in the original.
    """
    return _ansi_wrap(98, skk)


def strBold(skk):
    """Return *skk* wrapped in ANSI code 1 (bold)."""
    return _ansi_wrap(1, skk)
def round_nearest(x, num=50):
    """Round *x* up to a multiple of *num* and return it as an int.

    Despite the name, the value is always rounded *up* (``math.ceil``), so
    22010 with ``num=50`` gives 22050, not 22000.

    Args:
        x: Value convertible with ``float()`` (number or numeric string).
        num: Step size whose multiple is returned.
    """
    multiples = math.ceil(float(x) / num)
    return int(multiples * num)


def nearest_strike_bnf(x):
    """Return *x* rounded up to a multiple of 100 via ``round_nearest``."""
    return round_nearest(x, num=100)


def nearest_strike_nf(x):
    """Return *x* rounded up to a multiple of 50 via ``round_nearest``."""
    return round_nearest(x, num=50)
# Option-chain web page; set_cookie() requests it to collect cookies.
url_oc = "https://www.nseindia.com/option-chain"

# JSON API endpoints fetched by fetch_all_data().
url_bnf = "https://www.nseindia.com/api/option-chain-indices?symbol=BANKNIFTY"
url_nf = "https://www.nseindia.com/api/option-chain-indices?symbol=NIFTY"
url_indices = "https://www.nseindia.com/api/allIndices"

# Headers sent with every request made by set_cookie() and get_data().
headers = {
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"
    ),
    "accept-language": "en,gu;q=0.9,hi;q=0.8",
    "accept-encoding": "gzip, deflate, br",
}

# Cookie cache shared across requests; get_data() replaces it after a 401.
cookies = {}
def set_cookie():
    """Request the option-chain page and return the cookies from its response.

    This is a blocking call (it uses ``requests``), with a 5 second timeout.

    Returns:
        dict: Cookie name -> value taken from the response to ``url_oc``.

    Raises:
        requests.RequestException: Propagated unchanged if the request fails
            or times out.
    """
    # The context manager closes the session's pooled connections; the
    # original left the session open on every cookie refresh.
    with requests.Session() as sess:
        response = sess.get(url_oc, headers=headers, timeout=5)
        return dict(response.cookies)
async def get_data(url, session):
    """Fetch *url* and return the response body, refreshing cookies on a 401.

    The request is sent with the module-level ``headers`` and ``cookies``.
    On a 401 the cookies are re-acquired with ``set_cookie()`` and the request
    is retried once.

    Args:
        url: Address to request.
        session: An ``aiohttp.ClientSession`` (anything exposing a compatible
            ``get`` async context manager).

    Returns:
        str: The response text when the final status is 200, otherwise ``""``.
    """
    global cookies

    # aiohttp expects a ClientTimeout; a bare number is deprecated there.
    timeout = aiohttp.ClientTimeout(total=5)

    async with session.get(url, headers=headers, timeout=timeout, cookies=cookies) as response:
        if response.status == 200:
            return await response.text()
        if response.status != 401:
            return ""

    # set_cookie() is synchronous (requests); run it in the default executor
    # so the event loop is not blocked while the cookies are refreshed.
    loop = asyncio.get_running_loop()
    cookies = await loop.run_in_executor(None, set_cookie)

    async with session.get(url, headers=headers, timeout=timeout, cookies=cookies) as retry:
        # Apply the same status rule as the first attempt, so an error page
        # is never returned as if it were data.
        if retry.status == 200:
            return await retry.text()
    return ""
async def fetch_all_data():
    """Fetch the indices, BANKNIFTY and NIFTY payloads over one session.

    The three requests are awaited one after another, in that order.

    Returns:
        tuple: ``(indices_data, bnf_data, nf_data)`` as returned by
        ``get_data`` for ``url_indices``, ``url_bnf`` and ``url_nf``.
    """
    endpoints = (url_indices, url_bnf, url_nf)
    async with aiohttp.ClientSession() as session:
        # Awaiting inside the comprehension keeps the requests sequential.
        payloads = [await get_data(endpoint, session) for endpoint in endpoints]
    return tuple(payloads)
# Process the fetched data
def process_indices_data(data):
    """Parse the all-indices JSON and update the module-level index values.

    Sets the globals ``nf_ul`` and ``bnf_ul`` from the ``"last"`` field of the
    "NIFTY 50" and "NIFTY BANK" entries, then derives ``bnf_nearest`` and
    ``nf_nearest`` from them with the strike-rounding helpers.

    Args:
        data: JSON text with a top-level ``"data"`` list whose entries carry
            ``"index"`` and ``"last"`` keys.
    """
    global bnf_ul, nf_ul, bnf_nearest, nf_nearest

    payload = json.loads(data)
    for entry in payload["data"]:
        if entry["index"] == "NIFTY 50":
            nf_ul = entry["last"]
        if entry["index"] == "NIFTY BANK":
            bnf_ul = entry["last"]

    # Derived only after the whole list has been scanned.
    bnf_nearest = nearest_strike_bnf(bnf_ul)
    nf_nearest = nearest_strike_nf(nf_ul)
def process_oi_data(data, nearest, step, num):
data = json.loads(data)
currExpiryDate = data["records"]["expiryDates"][0]
oi_data = []
for item in data['records']['data']:
if item["expiryDate"] == currExpiryDate:
if nearest - step*num 0 else strRed(ce_oi)
pe_color = strGreen(pe_oi) if pe_change > 0 else strRed(pe_oi)
print(f"Strike Price: {strike}, Call OI: {ce_color} ({strBold(f'+{ce_change}') if ce_change > 0 else strBold(ce_change) if ce_change 0 else strBold(pe_change) if pe_change 0 else strRed(ce_oi)
pe_color = strGreen(pe_oi) if pe_change > 0 else strRed(pe_oi)
print(f"Strike Price: {strike}, Call OI: {ce_color} ({strBold(f'+{ce_change}') if ce_change > 0 else strBold(ce_change) if ce_change 0 else strBold(pe_change) if pe_change
<h2>
输出:
</h2>
<p><img src="https://img.php.cn/upload/article/000/465/014/172317790728767.png" alt="使用 Python 的 NSE 期权链数据 - 第二部分 |沙阿·斯塔万"></p>
<p><img src="https://img.php.cn/upload/article/000/465/014/172317791760318.png" alt="使用 Python 的 NSE 期权链数据 - 第二部分 |沙阿·斯塔万"></p><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="max-width:90%" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p>
<p>您甚至可以通过此链接观看演示视频</p>
<p>谢谢!!<br>
我们下一篇富有洞察力的博客见。</p>