# Dhan HQ API Data Fetching: A Comprehensive Guide for AI-Driven Development
## Introduction
This document provides a highly detailed, step-by-step guide for an AI model to autonomously generate and validate the code for the Dhan HQ Data Lake project. Each sub-task includes explicit instructions, expected outputs, API call specifics, and validation criteria to minimize human intervention and ensure a robust, production-ready system.
## Prerequisites
A running MongoDB instance is assumed throughout this guide; the examples use a local deployment with `MONGO_URI="mongodb://localhost:27017/"` and a database name of `dhan_data_lake` (`MONGO_DB_NAME="dhan_data_lake"`).
### 1. Environment Setup
**Objective:** Prepare the development environment with all necessary tools and configurations.
**Instructions for AI:**
1. Create a Python virtual environment named `.venv`.
2. Activate the virtual environment.
3. Install the following Python packages using `pip`:
* `requests`: For making HTTP requests to the Dhan HQ REST API.
* `pymongo`: For interacting with the MongoDB database.
* `python-dotenv`: For managing environment variables (API keys, etc.).
* `websocket-client`: For connecting to the Dhan HQ WebSocket API.
* `pytz`: For handling timezones, specifically IST.
* `pandas`: For data manipulation and analysis.
* `pandas_ta`: For calculating technical indicators.
* `py_vollib`: For calculating option Greeks.
4. Create a file named `.env` in the project root and populate it with the following content (placeholders should be replaced with actual values):
```
DHAN_ACCESS_TOKEN_1=your_primary_access_token
DHAN_CLIENT_ID_1=your_primary_client_id
DHAN_ACCESS_TOKEN_2=your_secondary_access_token
DHAN_CLIENT_ID_2=your_secondary_client_id
MONGO_URI=mongodb://localhost:27017/dhan_fno_data
TIMEZONE=Asia/Kolkata
```
5. Ensure a MongoDB instance is running and accessible at the URI specified in `.env`.
**Validation:**
* The virtual environment is active.
* All specified packages are installed.
* The `.env` file exists and is populated.
* A connection to the MongoDB instance can be established.
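These validation steps can be scripted. A minimal check sketch, assuming the variable names from the `.env` template above and a locally reachable MongoDB:
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

load_dotenv()

# Confirm the required environment variables are present (names from the .env template above).
required_vars = ["DHAN_ACCESS_TOKEN_1", "DHAN_CLIENT_ID_1", "MONGO_URI", "TIMEZONE"]
missing = [var for var in required_vars if not os.getenv(var)]
if missing:
    raise SystemExit(f"Missing environment variables: {missing}")

# Confirm the MongoDB instance is reachable.
mongo_client = MongoClient(os.getenv("MONGO_URI"), serverSelectionTimeoutMS=5000)
try:
    mongo_client.admin.command("ping")  # raises if the server cannot be reached
    print("MongoDB connection OK.")
except ConnectionFailure as exc:
    raise SystemExit(f"MongoDB connection failed: {exc}")
finally:
    mongo_client.close()
```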
---
## Phase 1: Foundation Data (Critical - Must Complete Before Proceeding)
### 2. Fetch Instrument Master Data
**Objective:** Acquire and store comprehensive instrument master data from Dhan HQ API, which is foundational for identifying all tradable F&O contracts and their underlying assets.
**Python Script:** `01_fetch_instruments.py`
**Instructions for AI:**
#### Task 2.1: Fetch NSE_EQ instruments (for underlying stocks and indices)
**Purpose:** Obtain `securityId` and other details for all equity instruments listed on NSE, including those that are constituents of Nifty 50, Bank Nifty, and Fin Nifty.
**API Call Details:**
* **Endpoint:** `GET /api/v1/instruments`
* **Parameters:** `exchangeSegment=NSE_EQ`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Expected Response:** A JSON array of instrument objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
import requests
import os
from dotenv import load_dotenv
import pymongo
from pymongo import MongoClient
# Load environment variables
load_dotenv()
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# For now, assume a simple client that uses the first API key.
# The AI should understand that this will be replaced later.
class SimpleDhanAPIClient:
def __init__(self, access_token, client_id):
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.headers = {
"access-token": access_token,
"x-dhan-client-id": client_id,
"Content-Type": "application/json"
}
def get_instruments(self, exchange_segment):
url = f"{self.base_url}api/v1/instruments"
params = {"exchangeSegment": exchange_segment}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database() # Gets the database name from MONGO_URI if specified, else default
# Initialize a simple API client (will be replaced by dual API client later)
api_client = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_1"),
os.getenv("DHAN_CLIENT_ID_1")
)
def fetch_and_store_instruments(exchange_segment, api_client, db_collection):
print(f"Fetching {exchange_segment} instruments...")
try:
instruments_data = api_client.get_instruments(exchange_segment)
if instruments_data:
# Use bulk upsert for efficiency
operations = [
pymongo.UpdateOne(
{"securityId": doc["securityId"]},
{"$set": doc},
upsert=True
)
for doc in instruments_data
]
db_collection.bulk_write(operations, ordered=False)
print(f"Successfully fetched and stored {len(instruments_data)} {exchange_segment} instruments.")
else:
print(f"No instruments found for {exchange_segment}.")
except requests.exceptions.RequestException as e:
print(f"Error fetching {exchange_segment} instruments: {e}")
# Main execution for Task 2.1
# db.instruments_master.drop() # Uncomment to clear collection for fresh start
fetch_and_store_instruments("NSE_EQ", api_client, db.instruments_master)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Fetching NSE_EQ instruments...
Successfully fetched and stored XXXXX NSE_EQ instruments.
```
(Where XXXXX is the number of equity instruments, typically in the thousands).
**Expected Output (MongoDB - `instruments_master` collection):**
* Documents representing NSE equity instruments.
* Example document structure (simplified):
```json
{
"_id": ObjectId("..."),
"securityId": "1000",
"instrument": "EQUITY",
"symbol": "RELIANCE",
"companyName": "Reliance Industries Ltd",
"exchangeSegment": "NSE_EQ",
"tradingSymbol": "RELIANCE",
"isin": "INE002A01018",
"dhanSymbol": "RELIANCE"
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message indicating the number of instruments fetched.
* **MongoDB Check:**
* Connect to MongoDB and query the `instruments_master` collection.
* Run `db.instruments_master.count_documents({})` to ensure a significant number of documents (e.g., > 1000).
* Verify that documents with `exchangeSegment: "NSE_EQ"` are present.
* Check for the presence of known Nifty 50 constituent stocks (e.g., `db.instruments_master.find_one({"symbol": "RELIANCE", "exchangeSegment": "NSE_EQ"})`).
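The MongoDB checks above can also be run programmatically. A minimal sketch, assuming the field names shown in the example document and a `MONGO_URI` whose path names the target database:
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
db = MongoClient(os.getenv("MONGO_URI")).get_database()

# Mirror the manual checks: total count, NSE_EQ presence, and a known constituent stock.
total = db.instruments_master.count_documents({})
nse_eq = db.instruments_master.count_documents({"exchangeSegment": "NSE_EQ"})
reliance = db.instruments_master.find_one({"symbol": "RELIANCE", "exchangeSegment": "NSE_EQ"})

assert total > 1000, f"Expected more than 1000 instruments, found {total}"
assert nse_eq > 0, "No NSE_EQ instruments stored"
assert reliance is not None, "RELIANCE equity instrument not found"
print(f"Validation passed: {total} total documents, {nse_eq} with exchangeSegment=NSE_EQ.")
```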
#### Task 2.2: Fetch NSE_FNO instruments (for F&O contracts)
**Purpose:** Obtain `securityId` and other details for all futures and options contracts traded on NSE F&O segment.
**API Call Details:**
* **Endpoint:** `GET /api/v1/instruments`
* **Parameters:** `exchangeSegment=NSE_FNO`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Expected Response:** A JSON array of instrument objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports and SimpleDhanAPIClient from Task 2.1)
# Main execution for Task 2.2
fetch_and_store_instruments("NSE_FNO", api_client, db.instruments_master)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Fetching NSE_FNO instruments...
Successfully fetched and stored XXXXX NSE_FNO instruments.
```
(Where XXXXX is the number of F&O instruments, typically in the tens of thousands).
**Expected Output (MongoDB - `instruments_master` collection):**
* Documents representing NSE F&O instruments.
* Example document structure (simplified for a NIFTY option):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"instrument": "OPTIDX",
"symbol": "NIFTY",
"tradingSymbol": "NIFTY24AUG22000CE",
"exchangeSegment": "NSE_FNO",
"underlyingSecurityId": "99999", // securityId of NIFTY index
"expiryDate": "2024-08-29",
"strikePrice": 22000.0,
"optionType": "CE",
"lotSize": 50
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message indicating the number of instruments fetched.
* **MongoDB Check:**
* Run `db.instruments_master.count_documents({"exchangeSegment": "NSE_FNO"})` to ensure a significant number of F&O documents (e.g., > 10000).
* Check for the presence of known index futures (e.g., `db.instruments_master.find_one({"symbol": "NIFTY", "instrument": "FUTIDX"})`).
* Check for the presence of known index options (e.g., `db.instruments_master.find_one({"symbol": "BANKNIFTY", "instrument": "OPTIDX", "optionType": "PE"})`).
#### Task 2.3: Create MongoDB indexes
**Purpose:** Optimize database query performance for frequently accessed fields.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports and MongoDB client from Task 2.1)
print("Creating MongoDB indexes...")
# Create unique index on securityId for fast lookups and preventing duplicates
db.instruments_master.create_index([("securityId", 1)], unique=True)
print("Index on securityId created.")
# Create index on underlyingSecurityId for efficient filtering of F&O contracts
db.instruments_master.create_index([("underlyingSecurityId", 1)])
print("Index on underlyingSecurityId created.")
# Create compound index on (instrument, symbol) for common queries
db.instruments_master.create_index([("instrument", 1), ("symbol", 1)])
print("Compound index on (instrument, symbol) created.")
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Creating MongoDB indexes...
Index on securityId created.
Index on underlyingSecurityId created.
Compound index on (instrument, symbol) created.
```
**Validation (for AI):**
* **MongoDB Check:**
* Connect to MongoDB shell or use `pymongo` to list indexes for `instruments_master` collection.
* Verify that `securityId_1` (unique), `underlyingSecurityId_1`, and `instrument_1_symbol_1` indexes exist.
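A short sketch of this check via `pymongo` (index names follow MongoDB's default `<field>_<direction>` naming convention):
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
db = MongoClient(os.getenv("MONGO_URI")).get_database()

# List the indexes currently defined on instruments_master.
index_names = {idx["name"] for idx in db.instruments_master.list_indexes()}
print(f"Indexes found: {sorted(index_names)}")

expected = {"securityId_1", "underlyingSecurityId_1", "instrument_1_symbol_1"}
missing = expected - index_names
assert not missing, f"Missing indexes: {missing}"
print("All expected indexes are present.")
```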
---
### 3. Build Trading Universes (Expanded Scope)
**Objective:** Define specific instrument sets (universes) for F&O trading, covering all specified stocks and indices, which will be used for real-time subscriptions and historical data fetches.
**Python Script:** `02_build_universes.py`
**Instructions for AI:**
#### Task 3.1: Create Nifty 50, Bank Nifty, Fin Nifty Constituent Stock Universe
**Purpose:** Compile a comprehensive list of `securityId`s for all equity stocks that are part of Nifty 50, Bank Nifty, and Fin Nifty indices. This forms the basis for fetching their F&O contracts.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
# --- List of known Nifty 50, Bank Nifty, Fin Nifty constituents (example/partial list) ---
# AI should understand that a comprehensive list might need to be sourced from a reliable financial data provider
# or dynamically updated from a source like NSE website if available via API.
# For this task, assume a pre-defined list of symbols for demonstration.
# Example list (AI should expand this based on latest index constituents)
# This list should be kept up-to-date by the AI by periodically checking reliable sources.
# For a real-world scenario, this list would be dynamically fetched or maintained.
ALL_INDEX_CONSTITUENT_SYMBOLS = [
"RELIANCE", "TCS", "HDFCBANK", "INFY", "ICICIBANK", "HINDUNILVR", "ITC", "SBIN",
"BHARTIARTL", "ASIANPAINT", "MARUTI", "BAJFINANCE", "LT", "HCLTECH", "AXISBANK",
"KOTAKBANK", "TITAN", "NESTLEIND", "ULTRACEMCO", "WIPRO", "ADANIPORTS", "NTPC",
"POWERGRID", "TATAMOTORS", "TECHM", "SUNPHARMA", "ONGC", "TATASTEEL", "COALINDIA",
"BAJAJFINSV", "M&M", "DRREDDY", "GRASIM", "CIPLA", "BRITANNIA", "EICHERMOT",
"HEROMOTOCO", "BPCL", "DIVISLAB", "TATACONSUM", "HINDALCO", "JSWSTEEL", "APOLLOHOSP",
"INDUSINDBK", "ADANIENT", "BAJAJ-AUTO", "SHREECEM", "SBILIFE", "HDFCLIFE", "PIDILITIND",
# Bank Nifty specific (beyond Nifty 50 overlap)
"FEDERALBNK", "IDFCFIRSTB", "PNB", "BANDHANBNK",
# Fin Nifty specific (beyond Nifty 50/Bank Nifty overlap)
"CHOLAFIN", "BAJAJHLDNG", "HDFC", "LICI", "MUTHOOTFIN", "PEL", "SRF", "SBICARD", "SHRIRAMFIN"
# AI should ensure this list is comprehensive and unique.
]
def build_constituent_stock_universe(db_client, symbols_list):
print("Building ALL_INDEX_CONSTITUENT_STOCKS universe...")
security_ids = []
for symbol in symbols_list:
# Find the equity securityId for the symbol
instrument_info = db_client.instruments_master.find_one({
"symbol": symbol,
"exchangeSegment": "NSE_EQ",
"instrument": "EQUITY"
})
if instrument_info:
security_ids.append({"symbol": symbol, "securityId": instrument_info["securityId"]})
else:
print(f"Warning: Equity instrument not found for symbol: {symbol}")
if security_ids:
# Upsert the universe document
db_client.universes.update_one(
{"_id": "ALL_INDEX_CONSTITUENT_STOCKS"},
{"$set": {"members": security_ids, "last_updated": datetime.now()}},
upsert=True
)
print(f"ALL_INDEX_CONSTITUENT_STOCKS universe built with {len(security_ids)} members.")
else:
print("No constituent stocks found to build universe.")
# Main execution for Task 3.1
build_constituent_stock_universe(db, ALL_INDEX_CONSTITUENT_SYMBOLS)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Building ALL_INDEX_CONSTITUENT_STOCKS universe...
ALL_INDEX_CONSTITUENT_STOCKS universe built with XXX members.
```
(Where XXX is the number of unique constituent stocks; with the example list above this is roughly 60-65, since the three indices overlap heavily).
**Expected Output (MongoDB - `universes` collection):**
* A document with `_id: "ALL_INDEX_CONSTITUENT_STOCKS"`.
* Example document structure:
```json
{
"_id": "ALL_INDEX_CONSTITUENT_STOCKS",
"members": [
{"symbol": "RELIANCE", "securityId": "1000"},
{"symbol": "HDFCBANK", "securityId": "1001"},
// ... other constituent stocks
],
"last_updated": ISODate("2025-08-08T10:00:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message and the number of members.
* **MongoDB Check:**
* Query `db.universes.find_one({"_id": "ALL_INDEX_CONSTITUENT_STOCKS"})`.
* Verify that the `members` array is populated and contains `symbol` and `securityId` for known constituent stocks.
* Check the count of members to ensure it aligns with expectations (e.g., `len(doc["members"])` should be > 50 for the example list above).
#### Task 3.2: Create Index Universe
**Purpose:** Compile a list of `securityId`s for the major indices (NIFTY, BANKNIFTY, FINNIFTY) themselves, as these are also traded as F&O.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
MAJOR_INDEX_SYMBOLS = ["NIFTY", "BANKNIFTY", "FINNIFTY"]
def build_major_index_universe(db_client, symbols_list):
print("Building MAJOR_INDICES universe...")
security_ids = []
for symbol in symbols_list:
# Find the index securityId for the symbol
instrument_info = db_client.instruments_master.find_one({
"symbol": symbol,
"instrument": "INDEX" # Ensure it's the index, not an F&O contract
})
if instrument_info:
security_ids.append({"symbol": symbol, "securityId": instrument_info["securityId"]})
else:
print(f"Warning: Index instrument not found for symbol: {symbol}")
if security_ids:
db_client.universes.update_one(
{"_id": "MAJOR_INDICES"},
{"$set": {"members": security_ids, "last_updated": datetime.now()}},
upsert=True
)
print(f"MAJOR_INDICES universe built with {len(security_ids)} members.")
else:
print("No major indices found to build universe.")
# Main execution for Task 3.2
build_major_index_universe(db, MAJOR_INDEX_SYMBOLS)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Building MAJOR_INDICES universe...
MAJOR_INDICES universe built with 3 members.
```
**Expected Output (MongoDB - `universes` collection):**
* A document with `_id: "MAJOR_INDICES"`.
* Example document structure:
```json
{
"_id": "MAJOR_INDICES",
"members": [
{"symbol": "NIFTY", "securityId": "99999"},
{"symbol": "BANKNIFTY", "securityId": "88888"},
{"symbol": "FINNIFTY", "securityId": "77777"}
],
"last_updated": ISODate("2025-08-08T10:05:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message and that 3 members were built.
* **MongoDB Check:**
* Query `db.universes.find_one({"_id": "MAJOR_INDICES"})`.
* Verify that the `members` array contains exactly 3 entries for NIFTY, BANKNIFTY, and FINNIFTY, each with a `securityId`.
---
### 4. Compute Derivative Views (Expanded Scope)
**Objective:** Create mappings of underlying assets to their relevant F&O contracts (futures and options) for current and next expiries. This is crucial for subscribing to the correct instruments for real-time data and for fetching option chains.
**Python Script:** `03_derivative_views.py`
**Instructions for AI:**
#### Task 4.1: Map Index Futures
**Purpose:** Identify and store the `securityId`s for the current month and next month futures contracts for NIFTY, BANKNIFTY, and FINNIFTY indices.
**API Call Details:** No direct API call for this task. It queries the `instruments_master` collection.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime, timedelta
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def get_current_ist_date():
return datetime.now(IST).date()
def map_index_futures(db_client):
print("Mapping Index Futures...")
major_indices_doc = db_client.universes.find_one({"_id": "MAJOR_INDICES"})
if not major_indices_doc:
print("MAJOR_INDICES universe not found. Please complete Task 3.2 first.")
return
index_futures_data = []
today = get_current_ist_date()
for index_info in major_indices_doc["members"]:
symbol = index_info["symbol"]
underlying_security_id = index_info["securityId"]
# Find all FUTIDX contracts for this symbol
futures = list(db_client.instruments_master.find({
"symbol": symbol,
"instrument": "FUTIDX",
"exchangeSegment": "NSE_FNO"
}).sort("expiryDate", 1)) # Sort by expiry date ascending
# Filter out expired futures and get relevant ones
relevant_futures = []
for fut in futures:
expiry_date_str = fut["expiryDate"]
expiry_date = datetime.strptime(expiry_date_str, "%Y-%m-%d").date()
if expiry_date >= today:
relevant_futures.append(fut)
if len(relevant_futures) >= 1:
current_month_fut = relevant_futures[0]
index_futures_data.append({
"underlyingSymbol": symbol,
"underlyingSecurityId": underlying_security_id,
"type": "FUTIDX",
"currentMonthFuture": {
"securityId": current_month_fut["securityId"],
"tradingSymbol": current_month_fut["tradingSymbol"],
"expiryDate": current_month_fut["expiryDate"]
}
})
if len(relevant_futures) >= 2:
next_month_fut = relevant_futures[1]
index_futures_data[-1]["nextMonthFuture"] = {
"securityId": next_month_fut["securityId"],
"tradingSymbol": next_month_fut["tradingSymbol"],
"expiryDate": next_month_fut["expiryDate"]
}
else:
print(f"Warning: No active futures found for {symbol}")
if index_futures_data:
db_client.derivative_views.update_one(
{"_id": "INDEX_FUTURES"},
{"$set": {"data": index_futures_data, "last_updated": datetime.now()}},
upsert=True
)
print(f"Mapped {len(index_futures_data)} index futures.")
else:
print("No index futures mapped.")
# Main execution for Task 4.1
map_index_futures(db)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Mapping Index Futures...
Mapped 3 index futures.
```
**Expected Output (MongoDB - `derivative_views` collection):**
* A document with `_id: "INDEX_FUTURES"`.
* Example document structure (simplified):
```json
{
"_id": "INDEX_FUTURES",
"data": [
{
"underlyingSymbol": "NIFTY",
"underlyingSecurityId": "99999",
"type": "FUTIDX",
"currentMonthFuture": {
"securityId": "12345",
"tradingSymbol": "NIFTY24AUGFUT",
"expiryDate": "2024-08-29"
},
"nextMonthFuture": {
"securityId": "12346",
"tradingSymbol": "NIFTY24SEPFUT",
"expiryDate": "2024-09-26"
}
},
// ... similar for BANKNIFTY and FINNIFTY
],
"last_updated": ISODate("2025-08-08T10:10:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message and that 3 index futures were mapped.
* **MongoDB Check:**
* Query `db.derivative_views.find_one({"_id": "INDEX_FUTURES"})`.
* Verify that the `data` array contains 3 entries, one for each major index.
* For each index, confirm `currentMonthFuture` and `nextMonthFuture` (if available) are present with correct `securityId`, `tradingSymbol`, and `expiryDate`.
* Ensure `expiryDate` is in the future relative to `today`.
#### Task 4.2: Map Index Options
**Purpose:** Identify and store the `securityId`s for the current weekly and next weekly options contracts for NIFTY, BANKNIFTY, and FINNIFTY indices.
**API Call Details:** No direct API call for this task. It queries the `instruments_master` collection.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports and MongoDB client from Task 4.1)
def get_next_weekday(d, weekday): # 0=Monday, 1=Tuesday, ..., 6=Sunday
days_ahead = weekday - d.weekday()
if days_ahead <= 0: # Target day already happened this week
days_ahead += 7
return d + timedelta(days_ahead)
def map_index_options(db_client):
print("Mapping Index Options...")
major_indices_doc = db_client.universes.find_one({"_id": "MAJOR_INDICES"})
if not major_indices_doc:
print("MAJOR_INDICES universe not found. Please complete Task 3.2 first.")
return
index_options_data = []
today = get_current_ist_date()
for index_info in major_indices_doc["members"]:
symbol = index_info["symbol"]
underlying_security_id = index_info["securityId"]
# Find all OPTIDX contracts for this symbol
options = list(db_client.instruments_master.find({
"symbol": symbol,
"instrument": "OPTIDX",
"exchangeSegment": "NSE_FNO"
}).sort("expiryDate", 1)) # Sort by expiry date ascending
# Filter out expired options
relevant_options = []
for opt in options:
expiry_date_str = opt["expiryDate"]
expiry_date = datetime.strptime(expiry_date_str, "%Y-%m-%d").date()
if expiry_date >= today:
relevant_options.append(opt)
# Determine weekly expiries (typically Thursday for Nifty/BankNifty, Tuesday for FinNifty)
# This logic might need refinement based on actual Dhan HQ expiry patterns
weekly_expiries = sorted(list(set([opt["expiryDate"] for opt in relevant_options if datetime.strptime(opt["expiryDate"], "%Y-%m-%d").weekday() == 3]))) # Thursday
if symbol == "FINNIFTY":
weekly_expiries = sorted(list(set([opt["expiryDate"] for opt in relevant_options if datetime.strptime(opt["expiryDate"], "%Y-%m-%d").weekday() == 1]))) # Tuesday
current_weekly_expiry = None
next_weekly_expiry = None
for exp_str in weekly_expiries:
exp_date = datetime.strptime(exp_str, "%Y-%m-%d").date()
if exp_date >= today:
if current_weekly_expiry is None:
current_weekly_expiry = exp_str
elif next_weekly_expiry is None and exp_str != current_weekly_expiry:
next_weekly_expiry = exp_str
break
index_options_entry = {
"underlyingSymbol": symbol,
"underlyingSecurityId": underlying_security_id,
"type": "OPTIDX",
"expiries": {}
}
if current_weekly_expiry:
index_options_entry["expiries"]["currentWeekly"] = current_weekly_expiry
if next_weekly_expiry:
index_options_entry["expiries"]["nextWeekly"] = next_weekly_expiry
if current_weekly_expiry or next_weekly_expiry:
index_options_data.append(index_options_entry)
else:
print(f"Warning: No active weekly options found for {symbol}")
if index_options_data:
db_client.derivative_views.update_one(
{"_id": "INDEX_OPTIONS"},
{"$set": {"data": index_options_data, "last_updated": datetime.now()}},
upsert=True
)
print(f"Mapped {len(index_options_data)} index options.")
else:
print("No index options mapped.")
# Main execution for Task 4.2
map_index_options(db)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Mapping Index Options...
Mapped 3 index options.
```
**Expected Output (MongoDB - `derivative_views` collection):**
* A document with `_id: "INDEX_OPTIONS"`.
* Example document structure (simplified):
```json
{
"_id": "INDEX_OPTIONS",
"data": [
{
"underlyingSymbol": "NIFTY",
"underlyingSecurityId": "99999",
"type": "OPTIDX",
"expiries": {
"currentWeekly": "2024-08-08",
"nextWeekly": "2024-08-15"
}
},
// ... similar for BANKNIFTY and FINNIFTY
],
"last_updated": ISODate("2025-08-08T10:15:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message and that 3 index options were mapped.
* **MongoDB Check:**
* Query `db.derivative_views.find_one({"_id": "INDEX_OPTIONS"})`.
* Verify that the `data` array contains 3 entries, one for each major index.
* For each index, confirm `currentWeekly` and `nextWeekly` expiries are present and are valid future dates.
* Ensure the `expiryDate` corresponds to the correct weekday (Thursday for Nifty/BankNifty, Tuesday for FinNifty).
#### Task 4.3: Map Stock Futures (All Nifty 50, Bank Nifty, Fin Nifty F&O stocks)
**Purpose:** Identify and store the `securityId`s for the current month and next month futures contracts for all relevant F&O stocks that are constituents of Nifty 50, Bank Nifty, and Fin Nifty.
**API Call Details:** No direct API call for this task. It queries the `instruments_master` collection.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports and MongoDB client from Task 4.1)
def map_stock_futures(db_client):
print("Mapping Stock Futures...")
all_constituent_stocks_doc = db_client.universes.find_one({"_id": "ALL_INDEX_CONSTITUENT_STOCKS"})
if not all_constituent_stocks_doc:
print("ALL_INDEX_CONSTITUENT_STOCKS universe not found. Please complete Task 3.1 first.")
return
stock_futures_data = []
today = get_current_ist_date()
for stock_info in all_constituent_stocks_doc["members"]:
symbol = stock_info["symbol"]
underlying_security_id = stock_info["securityId"]
# Find all FUTSTK contracts for this symbol
futures = list(db_client.instruments_master.find({
"symbol": symbol,
"instrument": "FUTSTK",
"exchangeSegment": "NSE_FNO"
}).sort("expiryDate", 1)) # Sort by expiry date ascending
# Filter out expired futures and get relevant ones
relevant_futures = []
for fut in futures:
expiry_date_str = fut["expiryDate"]
expiry_date = datetime.strptime(expiry_date_str, "%Y-%m-%d").date()
if expiry_date >= today:
relevant_futures.append(fut)
if len(relevant_futures) >= 1:
current_month_fut = relevant_futures[0]
stock_futures_entry = {
"underlyingSymbol": symbol,
"underlyingSecurityId": underlying_security_id,
"type": "FUTSTK",
"currentMonthFuture": {
"securityId": current_month_fut["securityId"],
"tradingSymbol": current_month_fut["tradingSymbol"],
"expiryDate": current_month_fut["expiryDate"]
}
}
if len(relevant_futures) >= 2:
next_month_fut = relevant_futures[1]
stock_futures_entry["nextMonthFuture"] = {
"securityId": next_month_fut["securityId"],
"tradingSymbol": next_month_fut["tradingSymbol"],
"expiryDate": next_month_fut["expiryDate"]
}
stock_futures_data.append(stock_futures_entry)
# else: print(f"Warning: No active stock futures found for {symbol}") # Uncomment for debugging
if stock_futures_data:
db_client.derivative_views.update_one(
{"_id": "STOCK_FUTURES"},
{"$set": {"data": stock_futures_data, "last_updated": datetime.now()}},
upsert=True
)
print(f"Mapped {len(stock_futures_data)} stock futures.")
else:
print("No stock futures mapped.")
# Main execution for Task 4.3
map_stock_futures(db)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Mapping Stock Futures...
Mapped XXX stock futures.
```
(Where XXX is the number of F&O stocks for which futures were found; with the example constituent list this is typically 60-65).
**Expected Output (MongoDB - `derivative_views` collection):**
* A document with `_id: "STOCK_FUTURES"`.
* Example document structure (simplified):
```json
{
"_id": "STOCK_FUTURES",
"data": [
{
"underlyingSymbol": "RELIANCE",
"underlyingSecurityId": "1000",
"type": "FUTSTK",
"currentMonthFuture": {
"securityId": "10001",
"tradingSymbol": "RELIANCE24AUGFUT",
"expiryDate": "2024-08-29"
},
"nextMonthFuture": {
"securityId": "10002",
"tradingSymbol": "RELIANCE24SEPFUT",
"expiryDate": "2024-09-26"
}
},
// ... similar for other F&O stocks
],
"last_updated": ISODate("2025-08-08T10:20:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message and the number of stock futures mapped.
* **MongoDB Check:**
* Query `db.derivative_views.find_one({"_id": "STOCK_FUTURES"})`.
* Verify that the `data` array is populated with entries for various F&O stocks.
* For each stock, confirm `currentMonthFuture` and `nextMonthFuture` (if available) are present with correct `securityId`, `tradingSymbol`, and `expiryDate`.
* Ensure `expiryDate` is in the future relative to `today`.
#### Task 4.4: Map Stock Options (All Nifty 50, Bank Nifty, Fin Nifty F&O stocks)
**Purpose:** Identify and store the `securityId`s for the current weekly and next weekly options contracts for all relevant F&O stocks that are constituents of Nifty 50, Bank Nifty, and Fin Nifty.
**API Call Details:** No direct API call for this task. It queries the `instruments_master` collection.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports and MongoDB client from Task 4.1)
def map_stock_options(db_client):
print("Mapping Stock Options...")
all_constituent_stocks_doc = db_client.universes.find_one({"_id": "ALL_INDEX_CONSTITUENT_STOCKS"})
if not all_constituent_stocks_doc:
print("ALL_INDEX_CONSTITUENT_STOCKS universe not found. Please complete Task 3.1 first.")
return
stock_options_data = []
today = get_current_ist_date()
for stock_info in all_constituent_stocks_doc["members"]:
symbol = stock_info["symbol"]
underlying_security_id = stock_info["securityId"]
# Find all OPTSTK contracts for this symbol
options = list(db_client.instruments_master.find({
"symbol": symbol,
"instrument": "OPTSTK",
"exchangeSegment": "NSE_FNO"
}).sort("expiryDate", 1)) # Sort by expiry date ascending
# Filter out expired options
relevant_options = []
for opt in options:
expiry_date_str = opt["expiryDate"]
expiry_date = datetime.strptime(expiry_date_str, "%Y-%m-%d").date()
if expiry_date >= today:
relevant_options.append(opt)
# NOTE: NSE stock options are monthly contracts, typically expiring on the last Thursday of the month (not weekly).
# Verify this against actual Dhan HQ expiry data and refine the filter below as needed.
weekly_expiries = sorted(list(set([opt["expiryDate"] for opt in relevant_options if datetime.strptime(opt["expiryDate"], "%Y-%m-%d").weekday() == 3]))) # Thursday
current_weekly_expiry = None
next_weekly_expiry = None
for exp_str in weekly_expiries:
exp_date = datetime.strptime(exp_str, "%Y-%m-%d").date()
if exp_date >= today:
if current_weekly_expiry is None:
current_weekly_expiry = exp_str
elif next_weekly_expiry is None and exp_str != current_weekly_expiry:
next_weekly_expiry = exp_str
break
stock_options_entry = {
"underlyingSymbol": symbol,
"underlyingSecurityId": underlying_security_id,
"type": "OPTSTK",
"expiries": {}
}
if current_weekly_expiry:
stock_options_entry["expiries"]["currentWeekly"] = current_weekly_expiry
if next_weekly_expiry:
stock_options_entry["expiries"]["nextWeekly"] = next_weekly_expiry
if current_weekly_expiry or next_weekly_expiry:
stock_options_data.append(stock_options_entry)
# else: print(f"Warning: No active stock options found for {symbol}") # Uncomment for debugging
if stock_options_data:
db_client.derivative_views.update_one(
{"_id": "STOCK_OPTIONS"},
{"$set": {"data": stock_options_data, "last_updated": datetime.now()}},
upsert=True
)
print(f"Mapped {len(stock_options_data)} stock options.")
else:
print("No stock options mapped.")
# Main execution for Task 4.4
map_stock_options(db)
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Mapping Stock Options...
Mapped XXX stock options.
```
(Where XXX is the number of F&O stocks for which options were found; with the example constituent list this is typically 60-65).
**Expected Output (MongoDB - `derivative_views` collection):**
* A document with `_id: "STOCK_OPTIONS"`.
* Example document structure (simplified):
```json
{
"_id": "STOCK_OPTIONS",
"data": [
{
"underlyingSymbol": "RELIANCE",
"underlyingSecurityId": "1000",
"type": "OPTSTK",
"expiries": {
"currentWeekly": "2024-08-09",
"nextWeekly": "2024-08-16"
}
},
// ... similar for other F&O stocks
],
"last_updated": ISODate("2025-08-08T10:25:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message and the number of stock options mapped.
* **MongoDB Check:**
* Query `db.derivative_views.find_one({"_id": "STOCK_OPTIONS"})`.
* Verify that the `data` array is populated with entries for various F&O stocks.
* For each stock, confirm `currentWeekly` and `nextWeekly` expiries are present and are valid future dates.
* Ensure the `expiryDate` corresponds to the expected expiry day (NSE stock options are monthly contracts, typically expiring on the last Thursday of the month).
---
## Phase 2: Real-Time Data Ingestion (Core Trading Data)
### 5. Implement WebSocket Client for Live Data (Expanded Scope)
**Objective:** Establish a robust WebSocket connection to receive tick-by-tick market data for all relevant F&O trading instruments (indices, index F&O, stock F&O) and store it efficiently.
**Python Script:** `04_websocket_client.py`
**Instructions for AI:**
#### Task 5.1: Build Comprehensive Subscription List
**Purpose:** Generate a single, comprehensive, and de-duplicated list of `securityId`s for all instruments that need real-time market data. This list will be used to subscribe to the Dhan HQ WebSocket feed.
**API Call Details:** No direct API call. This task queries the `instruments_master`, `universes`, and `derivative_views` collections.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def get_comprehensive_subscription_list(db_client):
print("Building comprehensive subscription list...")
subscribed_security_ids = set() # Use a set to automatically handle duplicates
# 1. Add Major Indices (NIFTY, BANKNIFTY, FINNIFTY)
major_indices_doc = db_client.universes.find_one({"_id": "MAJOR_INDICES"})
if major_indices_doc:
for member in major_indices_doc["members"]:
subscribed_security_ids.add(member["securityId"])
# 2. Add All Index Constituent Stocks (underlying equities)
all_constituent_stocks_doc = db_client.universes.find_one({"_id": "ALL_INDEX_CONSTITUENT_STOCKS"})
if all_constituent_stocks_doc:
for member in all_constituent_stocks_doc["members"]:
subscribed_security_ids.add(member["securityId"])
# 3. Add Index Futures (current and next month)
index_futures_doc = db_client.derivative_views.find_one({"_id": "INDEX_FUTURES"})
if index_futures_doc:
for entry in index_futures_doc["data"]:
if "currentMonthFuture" in entry:
subscribed_security_ids.add(entry["currentMonthFuture"]["securityId"])
if "nextMonthFuture" in entry:
subscribed_security_ids.add(entry["nextMonthFuture"]["securityId"])
# 4. Add Index Options (all strikes for current/next weekly expiries)
index_options_doc = db_client.derivative_views.find_one({"_id": "INDEX_OPTIONS"})
if index_options_doc:
for entry in index_options_doc["data"]:
for expiry_type, expiry_date_str in entry["expiries"].items():
# Find all options for this underlying and expiry
options_for_expiry = db_client.instruments_master.find({
"symbol": entry["underlyingSymbol"],
"instrument": "OPTIDX",
"expiryDate": expiry_date_str
})
for opt in options_for_expiry:
subscribed_security_ids.add(opt["securityId"])
# 5. Add Stock Futures (current and next month for all relevant F&O stocks)
stock_futures_doc = db_client.derivative_views.find_one({"_id": "STOCK_FUTURES"})
if stock_futures_doc:
for entry in stock_futures_doc["data"]:
if "currentMonthFuture" in entry:
subscribed_security_ids.add(entry["currentMonthFuture"]["securityId"])
if "nextMonthFuture" in entry:
subscribed_security_ids.add(entry["nextMonthFuture"]["securityId"])
# 6. Add Stock Options (all strikes for current/next weekly expiries for all relevant F&O stocks)
stock_options_doc = db_client.derivative_views.find_one({"_id": "STOCK_OPTIONS"})
if stock_options_doc:
for entry in stock_options_doc["data"]:
for expiry_type, expiry_date_str in entry["expiries"].items():
# Find all options for this underlying and expiry
options_for_expiry = db_client.instruments_master.find({
"symbol": entry["underlyingSymbol"],
"instrument": "OPTSTK",
"expiryDate": expiry_date_str
})
for opt in options_for_expiry:
subscribed_security_ids.add(opt["securityId"])
print(f"Comprehensive subscription list built with {len(subscribed_security_ids)} unique security IDs.")
return list(subscribed_security_ids)
# Main execution for Task 5.1
# subscription_list = get_comprehensive_subscription_list(db)
# print(f"Final subscription list size: {len(subscription_list)}")
# Close MongoDB connection (important in a full script)
# client.close()
```
**Expected Output (CLI):**
```
Building comprehensive subscription list...
Comprehensive subscription list built with XXXX unique security IDs.
Final subscription list size: XXXX
```
(Where XXXX is the total number of unique instruments, likely in the thousands, depending on the number of strikes for options).
**Validation (for AI):**
* **CLI Check:** Confirm the success message and the final count of `securityId`s.
* **Logic Check:** Manually (or programmatically) verify that the list includes:
* The three major indices (NIFTY, BANKNIFTY, FINNIFTY).
* All their current/next month futures and current/next weekly options (all strikes).
* All constituent stocks from Nifty 50, Bank Nifty, and Fin Nifty that have F&O contracts.
* All their current/next month futures and current/next weekly options (all strikes).
* Ensure there are no duplicate `securityId`s in the final list.
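A minimal programmatic version of these checks, assuming the `db` handle and `get_comprehensive_subscription_list()` function from the snippet above:
```python
# ... (re-use db and get_comprehensive_subscription_list from the snippet above)
subscription_list = get_comprehensive_subscription_list(db)

# Uniqueness check (trivially true when a set is used upstream, but cheap to assert).
assert len(subscription_list) == len(set(subscription_list)), "Duplicate securityIds found"

# Spot-check that the three major indices made it into the list.
major_indices_doc = db.universes.find_one({"_id": "MAJOR_INDICES"})
for member in major_indices_doc["members"]:
    assert member["securityId"] in subscription_list, f"{member['symbol']} missing from subscription list"

print(f"Subscription list validated: {len(subscription_list)} unique security IDs.")
```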
#### Task 5.2: Implement WebSocket connection
**Purpose:** Establish a persistent WebSocket connection to Dhan HQ and send subscription requests for the comprehensive list of instruments.
**API Call Details:**
* **Endpoint:** `wss://api.dhan.co/ws/v1/marketdata`
* **Authentication:** Via `access-token` and `x-dhan-client-id` in the WebSocket URL or headers.
* **Subscription Message:** JSON payload with `action: "subscribe"`, `securityIds`, `exchangeSegment`, `feedType: "full"`.
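The subscription payload carries a single `exchangeSegment`, so if the comprehensive list spans both NSE_EQ (underlying equities and indices) and NSE_FNO (derivatives), the client will likely need one subscribe message per segment. A sketch of such batching; the helper name and the exact payload schema are assumptions to be verified against Dhan HQ's WebSocket documentation:
```python
import json

def build_subscribe_messages(db_client, security_ids, feed_type="full"):
    """Group securityIds by exchangeSegment (looked up from instruments_master)
    and build one subscribe payload per segment. Illustrative helper only."""
    by_segment = {}
    for sec_id in security_ids:
        doc = db_client.instruments_master.find_one({"securityId": sec_id}, {"exchangeSegment": 1})
        if doc:
            by_segment.setdefault(doc["exchangeSegment"], []).append(sec_id)
    return [
        json.dumps({
            "action": "subscribe",
            "securityIds": ids,
            "exchangeSegment": segment,
            "feedType": feed_type,
        })
        for segment, ids in by_segment.items()
    ]

# In on_open(), each message would then be sent individually:
# for msg in build_subscribe_messages(db, subscription_list):
#     ws.send(msg)
```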
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import json
import websocket
import threading
import time
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime, time as dt_time
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# For now, assume a simple client that uses the first API key for WebSocket.
# The AI should understand that the WebSocket connection might be managed by one dedicated API key.
DHAN_ACCESS_TOKEN = os.getenv("DHAN_ACCESS_TOKEN_1")
DHAN_CLIENT_ID = os.getenv("DHAN_CLIENT_ID_1")
WS_URL = f"wss://api.dhan.co/ws/v1/marketdata?access_token={DHAN_ACCESS_TOKEN}&client_id={DHAN_CLIENT_ID}"
# Global variable to hold the WebSocket app instance
ws_app = None
# Function to check if market is open (09:15 to 15:30 IST)
def is_market_open():
now_ist = datetime.now(IST).time()
market_open_time = dt_time(9, 15)
market_close_time = dt_time(15, 30)
return market_open_time <= now_ist <= market_close_time
# WebSocket event handlers
def on_message(ws, message):
# This will be implemented in Task 5.3
pass
def on_error(ws, error):
print(f"WebSocket Error: {error}")
def on_close(ws, close_status_code, close_msg):
print(f"WebSocket Closed: Code={close_status_code}, Message={close_msg}")
def on_open(ws):
print("WebSocket Opened. Sending subscription...")
# Fetch the comprehensive subscription list (from Task 5.1)
# AI should call the function from Task 5.1 here
# For demonstration, let's use a placeholder list
# subscription_list = get_comprehensive_subscription_list(db) # Uncomment in actual implementation
subscription_list = ["1000", "10001", "12345"] # Placeholder for testing
if not subscription_list:
print("No security IDs to subscribe to. Closing WebSocket.")
ws.close()
return
# Dhan HQ API requires exchangeSegment for subscription. This needs careful handling.
# For a comprehensive list, you might need multiple subscription messages per exchangeSegment
# or a generic approach if Dhan HQ supports it.
# Assuming a single subscription for now, AI should refine this.
subscribe_message = {
"action": "subscribe",
"securityIds": subscription_list,
"exchangeSegment": "NSE_FNO", # This needs to be dynamic or multiple subscriptions
"feedType": "full"
}
ws.send(json.dumps(subscribe_message))
print(f"Subscribed to {len(subscription_list)} instruments.")
def start_websocket_client():
global ws_app
if not is_market_open():
print("Market is closed. Not starting WebSocket client.")
return
print("Starting WebSocket client...")
ws_app = websocket.WebSocketApp(
WS_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
# Run WebSocket in a separate thread to not block main execution
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True # Daemonize thread so it exits when main program exits
ws_thread.start()
print("WebSocket client started in background thread.")
# Main execution for Task 5.2
# if __name__ == "__main__":
# start_websocket_client()
# try:
# while True:
# time.sleep(1) # Keep main thread alive
# if not is_market_open() and ws_app and ws_app.sock: # Check if market closed during session
# print("Market closed during session. Closing WebSocket.")
# ws_app.close()
# break
# except KeyboardInterrupt:
# print("Stopping WebSocket client via KeyboardInterrupt.")
# if ws_app: ws_app.close()
# finally:
# client.close()
```
**Expected Output (CLI):**
```
Starting WebSocket client...
WebSocket client started in background thread.
WebSocket Opened. Sending subscription...
Subscribed to XXXX instruments.
```
(Where XXXX is the number of instruments from Task 5.1).
**Validation (for AI):**
* **CLI Check:** Confirm the messages indicating WebSocket client start, connection open, and successful subscription.
* **Network Check (Conceptual):** The AI should understand that in a real environment, it would monitor network traffic to confirm WebSocket frames are being sent and received.
* **Error Handling:** Test with invalid API keys to ensure `on_error` is triggered and connection fails gracefully.
#### Task 5.3: Implement data parsing and storage
**Purpose:** Process incoming WebSocket messages, extract relevant market data, and store it efficiently in MongoDB.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports and WebSocket setup from Task 5.2)
import pymongo  # needed for pymongo.UpdateOne and pymongo.errors below
# MongoDB collection for live ticks
db.live_ticks_full.create_index([("securityId", 1), ("ts_event_ist", 1)], unique=True)
# In-memory buffer for batching writes (for efficiency)
tick_buffer = []
BUFFER_MAX_SIZE = 1000 # Flush after 1000 ticks
BUFFER_FLUSH_INTERVAL_SEC = 5 # Flush every 5 seconds
last_flush_time = time.time()
def flush_buffer():
global tick_buffer, last_flush_time
if tick_buffer:
print(f"Flushing {len(tick_buffer)} ticks to MongoDB...")
operations = []
for tick in tick_buffer:
operations.append(
pymongo.UpdateOne(
{"securityId": tick["securityId"], "ts_event_ist": tick["ts_event_ist"]},
{"$set": tick},
upsert=True
)
)
try:
db.live_ticks_full.bulk_write(operations, ordered=False) # ordered=False for performance
print("Buffer flushed successfully.")
except pymongo.errors.BulkWriteError as bwe:
print(f"Bulk write error: {bwe.details}")
# AI should analyze bwe.details for specific errors like duplicate key errors
except Exception as e:
print(f"Error during bulk write: {e}")
tick_buffer = []
last_flush_time = time.time()
def on_message(ws, message):
global tick_buffer, last_flush_time
try:
data = json.loads(message)
# print(f"Received: {data}") # Uncomment for debugging
# Process only 'full' packets (Code 8)
if data.get("feedDetails", {}).get("feedType") == "full" and data.get("feedDetails", {}).get("packetCode") == 8:
security_id = str(data["securityId"])
# Convert timestamp to IST datetime object
# Dhan HQ timestamp is typically epoch milliseconds
ts_event_ms = data.get("feedDetails", {}).get("exchangeTimeStamp")
if ts_event_ms:
ts_event_ist = datetime.fromtimestamp(ts_event_ms / 1000, tz=IST).replace(tzinfo=None) # Store as naive IST
else:
ts_event_ist = datetime.now(IST).replace(tzinfo=None) # Fallback to current IST
tick_data = {
"securityId": security_id,
"ts_event_ist": ts_event_ist,
"ts_ingest_utc": datetime.utcnow(), # Timestamp of ingestion
"ltp": data.get("ltp"),
"atp": data.get("atp"),
"volume": data.get("volume"),
"dayOpen": data.get("dayOpen"),
"dayHigh": data.get("dayHigh"),
"dayLow": data.get("dayLow"),
"dayClose": data.get("dayClose"),
"openInterest": data.get("openInterest"), # Will be None for equities
"depth": data.get("depth"), # Best 5 bid/ask
# Add other relevant fields from the full packet
}
tick_buffer.append(tick_data)
# Check buffer size or time for flushing
if len(tick_buffer) >= BUFFER_MAX_SIZE or (time.time() - last_flush_time) >= BUFFER_FLUSH_INTERVAL_SEC:
flush_buffer()
except json.JSONDecodeError:
print(f"Received non-JSON message: {message}")
except Exception as e:
print(f"Error processing WebSocket message: {e}\nMessage: {message}")
# Main execution for Task 5.3 (integrated into the main WebSocket loop)
# The main loop from Task 5.2 should call start_websocket_client()
# and then continuously check for market open/close and flush buffer periodically.
# Example of how the main loop would look:
# if __name__ == "__main__":
# start_websocket_client()
# try:
# while True:
# time.sleep(1) # Keep main thread alive
# if not is_market_open() and ws_app and ws_app.sock: # Check if market closed during session
# print("Market closed during session. Closing WebSocket.")
# ws_app.close()
# break
# # Periodically flush buffer even if not full
# if (time.time() - last_flush_time) >= BUFFER_FLUSH_INTERVAL_SEC:
# flush_buffer()
# except KeyboardInterrupt:
# print("Stopping WebSocket client via KeyboardInterrupt.")
# if ws_app: ws_app.close()
# finally:
# client.close()
```
**Expected Output (CLI - during market hours):**
```
Flushing XXX ticks to MongoDB...
Buffer flushed successfully.
```
(This message will appear periodically as the buffer fills or time interval passes).
**Expected Output (MongoDB - `live_ticks_full` collection):**
* Documents representing individual market ticks.
* Example document structure (simplified):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"ts_event_ist": ISODate("2025-08-08T10:30:05.123Z"),
"ts_ingest_utc": ISODate("2025-08-08T05:00:05.500Z"),
"ltp": 22000.50,
"volume": 1234567,
"openInterest": 123456, // Present for F&O, null/absent for equities
"depth": {"buy": [...], "sell": [...]}
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Observe periodic flush messages.
* **MongoDB Check:**
* Query `db.live_ticks_full.count_documents({})` to ensure documents are accumulating.
* Inspect a few documents to verify `securityId`, `ltp`, `ts_event_ist`, and `openInterest` (if applicable) are correctly populated.
* For F&O instruments, verify `openInterest` is present. For equity instruments, verify `openInterest` is `None` or absent.
* Check that `ts_event_ist` is an IST datetime and `ts_ingest_utc` is a UTC datetime.
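A small sketch of these checks, assuming the field names used by `on_message()` above:
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
db = MongoClient(os.getenv("MONGO_URI")).get_database()

count = db.live_ticks_full.count_documents({})
print(f"live_ticks_full contains {count} documents.")

# Inspect the most recent tick and spot-check the key fields populated by on_message().
latest = db.live_ticks_full.find_one(sort=[("ts_event_ist", -1)])
if latest:
    for field in ("securityId", "ltp", "ts_event_ist", "ts_ingest_utc"):
        assert field in latest, f"Field '{field}' missing from latest tick"
    print(f"Latest tick: securityId={latest['securityId']} ltp={latest['ltp']} "
          f"event(IST)={latest['ts_event_ist']} ingest(UTC)={latest['ts_ingest_utc']}")
```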
#### Task 5.4: Implement market hour gating
**Purpose:** Ensure the WebSocket client only runs during Indian market hours (09:15 to 15:30 IST) to avoid unnecessary API calls and errors when markets are closed.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports from Task 5.2)
# Function to check if market is open (09:15 to 15:30 IST)
def is_market_open():
now_ist = datetime.now(IST).time()
market_open_time = dt_time(9, 15)
market_close_time = dt_time(15, 30)
return market_open_time <= now_ist <= market_close_time
# Modified start_websocket_client to include market hour check
def start_websocket_client():
global ws_app
if not is_market_open():
print("Market is closed. Not starting WebSocket client. Next open: 09:15 IST tomorrow.")
return
print("Starting WebSocket client...")
# ... (rest of the WebSocket setup from Task 5.2 on_open and on_message)
ws_app = websocket.WebSocketApp(
WS_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True
ws_thread.start()
print("WebSocket client started in background thread.")
# Main execution loop (to be run as the primary script)
if __name__ == "__main__":
start_websocket_client()
try:
while True:
time.sleep(1) # Keep main thread alive
# Check if market closed during session and close WebSocket
if not is_market_open() and ws_app and ws_app.sock: # ws_app.sock checks if connection is active
print("Market closed during session. Closing WebSocket.")
ws_app.close()
break
# Periodically flush buffer even if not full (from Task 5.3)
if (time.time() - last_flush_time) >= BUFFER_FLUSH_INTERVAL_SEC:
flush_buffer()
except KeyboardInterrupt:
print("Stopping WebSocket client via KeyboardInterrupt.")
if ws_app: ws_app.close()
finally:
client.close()
```
**Expected Output (CLI - outside market hours):**
```
Market is closed. Not starting WebSocket client. Next open: 09:15 IST tomorrow.
```
**Expected Output (CLI - during market hours, then market closes):**
```
Starting WebSocket client...
WebSocket client started in background thread.
WebSocket Opened. Sending subscription...
Subscribed to XXXX instruments.
... (data flushing messages)
Market closed during session. Closing WebSocket.
WebSocket Closed: Code=1000, Message=
```
**Validation (for AI):**
* **Behavioral Test (Outside Market Hours):** Run the script when the market is closed. Verify that it prints the message indicating the market is closed and does not attempt to connect to the WebSocket.
* **Behavioral Test (During Market Hours, then Close):** Run the script during market hours. Verify that it connects, receives data, and then gracefully closes the WebSocket connection when the market close time is reached.
---
### 6. Implement REST Parity Checks
**Objective:** Periodically verify the accuracy and consistency of the real-time WebSocket data by comparing it with snapshots obtained from the Dhan HQ REST API.
**Python Script:** `05_rest_parity.py`
**Instructions for AI:**
#### Task 6.1: Implement REST quote fetching
**Purpose:** Fetch the latest quote (LTP, volume, OI) for a batch of `securityId`s using the REST API and compare it with the data received via WebSocket. This helps identify discrepancies or data gaps in the real-time feed.
**API Call Details:**
* **Endpoint:** `POST /api/v1/marketfeed/quote`
* **Method:** `POST`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Request Body:** JSON object with `securityIds` (list of strings) and `exchangeSegment`.
* **Limits:** Dhan HQ API typically limits `securityIds` to 1000 per request and has rate limits (e.g., 1 request per second).
* **Expected Response:** A JSON array of quote objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import json
import requests
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime
import pytz
import time
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# For now, assume a simple client that uses the first API key.
# This client will be replaced by the robust dual API client in Phase 7.
class SimpleDhanAPIClient:
def __init__(self, access_token, client_id):
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.headers = {
"access-token": access_token,
"x-dhan-client-id": client_id,
"Content-Type": "application/json"
}
def get_market_quote(self, security_ids, exchange_segment):
url = f"{self.base_url}api/v1/marketfeed/quote"
payload = {
"securityIds": security_ids,
"exchangeSegment": exchange_segment
}
response = requests.post(url, headers=self.headers, json=payload)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
# Initialize a simple API client (will be replaced by dual API client later)
api_client = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_1"),
os.getenv("DHAN_CLIENT_ID_1")
)
def perform_rest_parity_check(api_client, db_client, security_ids_to_check, exchange_segment="NSE_FNO"):
print(f"Performing REST parity check for {len(security_ids_to_check)} instruments...")
if not security_ids_to_check:
print("No security IDs provided for parity check.")
return
# Split security_ids into chunks of 1000 to respect API limits
chunk_size = 1000
all_quotes = []
for i in range(0, len(security_ids_to_check), chunk_size):
chunk = security_ids_to_check[i:i + chunk_size]
try:
quotes = api_client.get_market_quote(chunk, exchange_segment)
all_quotes.extend(quotes)
time.sleep(1) # Respect rate limit (1 request per second)
except requests.exceptions.RequestException as e:
print(f"Error fetching quotes for chunk: {e}")
# AI should implement more sophisticated retry logic here
break
if not all_quotes:
print("No quotes fetched from REST API.")
return
# Compare with latest WebSocket data (conceptual)
# In a real scenario, you would fetch the latest data from live_ticks_full for these securityIds
# and compare LTP, OI, etc.
discrepancies = []
for quote in all_quotes:
sec_id = str(quote.get("securityId"))
rest_ltp = quote.get("ltp")
rest_oi = quote.get("openInterest")
# Fetch latest from MongoDB live_ticks_full
latest_ws_data = db_client.live_ticks_full.find_one(
{"securityId": sec_id},
sort=[("ts_event_ist", -1)] # Get the most recent tick
)
if latest_ws_data:
ws_ltp = latest_ws_data.get("ltp")
ws_oi = latest_ws_data.get("openInterest")
# Simple comparison (AI should define acceptable tolerance)
if rest_ltp is not None and ws_ltp is not None and abs(rest_ltp - ws_ltp) > 0.1: # Example tolerance
discrepancies.append(f"LTP mismatch for {sec_id}: REST={rest_ltp}, WS={ws_ltp}")
if rest_oi is not None and ws_oi is not None and abs(rest_oi - ws_oi) > 0: # OI should match exactly
discrepancies.append(f"OI mismatch for {sec_id}: REST={rest_oi}, WS={ws_oi}")
else:
discrepancies.append(f"No WebSocket data found for {sec_id}")
if discrepancies:
print("Discrepancies found:")
for disc in discrepancies:
print(f"- {disc}")
# Store discrepancies in a dedicated collection for analysis
db_client.ws_health.insert_one({
"timestamp": datetime.now(IST).replace(tzinfo=None),
"type": "parity_check",
"discrepancies": discrepancies,
"num_checked": len(security_ids_to_check),
"num_discrepancies": len(discrepancies)
})
else:
print("No significant discrepancies found.")
db_client.ws_health.insert_one({
"timestamp": datetime.now(IST).replace(tzinfo=None),
"type": "parity_check",
"discrepancies": [],
"num_checked": len(security_ids_to_check),
"num_discrepancies": 0
})
# Main execution for Task 6.1
# This would typically be called periodically (e.g., every 5-10 minutes) during market hours
# or specifically after a WebSocket re-connection.
# Example usage:
# if __name__ == "__main__":
# # Get a sample of security IDs to check (e.g., from the comprehensive subscription list)
# # For a real scenario, you'd select a representative subset or all active F&O instruments.
# sample_security_ids = ["12345", "67890"] # Placeholder
# # In a full script, you'd fetch this from your derivative_views or universes
# # e.g., from get_comprehensive_subscription_list(db) and take a subset
# perform_rest_parity_check(api_client, db, sample_security_ids, "NSE_FNO")
# client.close()
```
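The snippet above notes that more sophisticated retry logic belongs around the quote call. A minimal exponential-backoff sketch (the helper name and retry parameters are illustrative and should be tuned to Dhan HQ's actual rate limits):
```python
import time
import requests

def get_quotes_with_retry(api_client, chunk, exchange_segment, max_retries=3, base_delay=2.0):
    """Retry a quote request with exponential backoff on transient failures."""
    for attempt in range(1, max_retries + 1):
        try:
            return api_client.get_market_quote(chunk, exchange_segment)
        except requests.exceptions.RequestException as exc:
            if attempt == max_retries:
                raise  # give up after the final attempt
            delay = base_delay * (2 ** (attempt - 1))
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s...")
            time.sleep(delay)
```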
**Expected Output (CLI):**
```
Performing REST parity check for XXX instruments...
No significant discrepancies found.
```
OR
```
Performing REST parity check for XXX instruments...
Discrepancies found:
- LTP mismatch for 12345: REST=100.0, WS=100.1
- OI mismatch for 67890: REST=5000, WS=5001
```
**Expected Output (MongoDB - `ws_health` collection):**
* Documents recording the results of each parity check.
* Example document structure:
```json
{
"_id": ObjectId("..."),
"timestamp": ISODate("2025-08-08T10:40:00.000Z"),
"type": "parity_check",
"discrepancies": [
"LTP mismatch for 12345: REST=100.0, WS=100.1"
],
"num_checked": 2,
"num_discrepancies": 1
}
```
**Validation (for AI):**
* **CLI Check:** Observe the output indicating whether discrepancies were found.
* **MongoDB Check:**
* Query `db.ws_health.count_documents({})` to ensure parity check results are being logged.
* Inspect documents in `ws_health` to understand the nature and frequency of discrepancies.
* Verify that `num_discrepancies` accurately reflects the `discrepancies` array length.
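The snippets in this guide repeatedly note that retry logic should be added around REST calls. A minimal sketch of a generic retry-with-exponential-backoff wrapper that could be reused across tasks (the helper name and parameters are illustrative, not part of the Dhan HQ SDK):
```python
import time
import requests

def call_with_retries(fn, *args, max_attempts=3, base_delay=2.0, **kwargs):
    """Call `fn`, retrying on request errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(*args, **kwargs)
        except requests.exceptions.RequestException as exc:
            if attempt == max_attempts:
                raise  # Give up after the final attempt
            delay = base_delay * (2 ** (attempt - 1))
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s...")
            time.sleep(delay)

# Example usage (conceptual):
# quotes = call_with_retries(api_client.get_market_quote, chunk, exchange_segment)
```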
---
## Phase 3: Historical Data for Context and Analysis
### 7. Fetch Historical Intraday Data
**Objective:** Acquire and store historical intraday OHLCV (Open, High, Low, Close, Volume) data, including Open Interest (OI) for F&O instruments, to enable technical analysis and backtesting.
**Python Script:** `06_historical_intraday.py`
**Instructions for AI:**
#### Task 7.1: Fetch 1-minute intraday data
**Purpose:** Obtain granular 1-minute historical data for all instruments in the comprehensive subscription list, focusing on recent trading days for detailed analysis.
**API Call Details:**
* **Endpoint:** `GET /api/v1/historical/intraday`
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:**
* `securityId`: The unique identifier of the instrument.
* `fromDate`: Start date for the historical data (YYYY-MM-DD).
* `toDate`: End date for the historical data (YYYY-MM-DD).
* `interval`: `1` (for 1-minute data).
* `includeOI`: `true` (for F&O instruments to get Open Interest).
* **Limits:** Dhan HQ API typically limits intraday historical data to 90 days per request. Pagination is required for longer periods.
* **Expected Response:** A JSON array of OHLCV objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import json
import requests
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from datetime import datetime, timedelta
import pytz
import time
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# This client will be replaced by the robust dual API client in Phase 7.
class SimpleDhanAPIClient:
def __init__(self, access_token, client_id):
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.headers = {
"access-token": access_token,
"x-dhan-client-id": client_id,
"Content-Type": "application/json"
}
def get_intraday_historical(self, security_id, from_date, to_date, interval, include_oi=False):
url = f"{self.base_url}api/v1/historical/intraday"
params = {
"securityId": security_id,
"fromDate": from_date.strftime("%Y-%m-%d"),
"toDate": to_date.strftime("%Y-%m-%d"),
"interval": interval,
"includeOI": str(include_oi).lower() # Convert boolean to string 'true' or 'false'
}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
# Initialize a simple API client (will be replaced by dual API client later)
api_client = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_1"),
os.getenv("DHAN_CLIENT_ID_1")
)
def fetch_and_store_intraday_historical(
security_id,
db_client,
api_client,
interval="1",
lookback_days=5
):
print(f"Fetching {interval}-min intraday historical for {security_id}...")
today = datetime.now(IST).date()
start_date = today - timedelta(days=lookback_days)
# Determine if the instrument is F&O to include OI
instrument_info = db_client.instruments_master.find_one({"securityId": security_id})
is_fno = instrument_info and instrument_info.get("exchangeSegment") == "NSE_FNO"
current_date = today
while current_date >= start_date:
from_date_chunk = max(start_date, current_date - timedelta(days=89)) # Max 90 days per call
to_date_chunk = current_date
print(f" Fetching from {from_date_chunk.strftime('%Y-%m-%d')} to {to_date_chunk.strftime('%Y-%m-%d')}...")
try:
historical_data = api_client.get_intraday_historical(
security_id, from_date_chunk, to_date_chunk, interval, include_oi=is_fno
)
if historical_data:
operations = []
for doc in historical_data:
# Ensure the record carries the securityId used in the upsert filter below
doc.setdefault("securityId", security_id)
# Normalize the timestamp for proper indexing/sorting:
# Dhan HQ may return epoch milliseconds or an ISO-format string
if "timestamp" in doc:
if isinstance(doc["timestamp"], (int, float)):
doc["timestamp"] = datetime.fromtimestamp(doc["timestamp"] / 1000, tz=IST).replace(tzinfo=None)
elif isinstance(doc["timestamp"], str):
doc["timestamp"] = datetime.fromisoformat(doc["timestamp"]).astimezone(IST).replace(tzinfo=None)
operations.append(
UpdateOne(
{"securityId": doc["securityId"], "timestamp": doc["timestamp"]},
{"$set": doc},
upsert=True
)
)
if operations:
db_client.historical_intraday.bulk_write(operations, ordered=False)
print(f" Upserted {len(historical_data)} {interval}-min records for {security_id}.")
else:
print(f" No {interval}-min data for {security_id} from {from_date_chunk} to {to_date_chunk}.")
except requests.exceptions.RequestException as e:
print(f" Error fetching {interval}-min historical for {security_id}: {e}")
# AI should implement retry logic here
current_date = from_date_chunk - timedelta(days=1) # Move to the previous day/chunk
# Main execution for Task 7.1
# if __name__ == "__main__":
# # Example: Fetch 1-min data for NIFTY index
# nifty_info = db.universes.find_one({"_id": "MAJOR_INDICES"})
# if nifty_info:
# nifty_security_id = next((item["securityId"] for item in nifty_info["members"] if item["symbol"] == "NIFTY"), None)
# if nifty_security_id:
# fetch_and_store_intraday_historical(nifty_security_id, db, api_client, interval="1", lookback_days=5)
# client.close()
```
**Expected Output (CLI):**
```
Fetching 1-min intraday historical for XXXXX...
Fetching from YYYY-MM-DD to YYYY-MM-DD...
Upserted ZZZ 1-min records for XXXXX.
Fetching from YYYY-MM-DD to YYYY-MM-DD...
Upserted ZZZ 1-min records for XXXXX.
... (repeats for each 90-day chunk and instrument)
```
**Expected Output (MongoDB - `historical_intraday` collection):**
* Documents representing 1-minute OHLCV data.
* Example document structure (simplified):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"timestamp": ISODate("2025-08-08T10:30:00.000Z"),
"open": 100.0,
"high": 100.5,
"low": 99.8,
"close": 100.2,
"volume": 5000,
"openInterest": 123456 // Present for F&O, null/absent for equities
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Observe the fetching and upsert messages for each instrument and date range.
* **MongoDB Check:**
* Query `db.historical_intraday.count_documents({})` to ensure documents are accumulating.
* Verify that `timestamp` is an IST datetime object.
* For F&O instruments, verify `openInterest` is present and non-null.
* For equity instruments, verify `openInterest` is `None` or absent.
* Check for data continuity and absence of gaps for a few sample instruments.
* Ensure `securityId` and `timestamp` form a unique key (no duplicates).
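To make the `securityId` + `timestamp` uniqueness check above enforceable (and keep the upserts idempotent), a compound unique index can be created once before the first fetch. A minimal sketch:
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient, ASCENDING

load_dotenv()
client = MongoClient(os.getenv("MONGO_URI"))
db = client.get_database()

# Enforce one candle per (securityId, timestamp) and speed up time-range queries
db.historical_intraday.create_index(
    [("securityId", ASCENDING), ("timestamp", ASCENDING)],
    unique=True
)
client.close()
```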
#### Task 7.2: Fetch 5-minute intraday data
**Purpose:** Obtain 5-minute historical data for all instruments in the comprehensive subscription list, covering a broader period for trend analysis and less granular indicator calculations.
**API Call Details:**
* **Endpoint:** `GET /api/v1/historical/intraday`
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:**
* `securityId`: The unique identifier of the instrument.
* `fromDate`: Start date for the historical data (YYYY-MM-DD).
* `toDate`: End date for the historical data (YYYY-MM-DD).
* `interval`: `5` (for 5-minute data).
* `includeOI`: `true` (for F&O instruments to get Open Interest).
* **Limits:** Dhan HQ API typically limits intraday historical data to 90 days per request. Pagination is required for longer periods.
* **Expected Response:** A JSON array of OHLCV objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports, MongoDB client, and SimpleDhanAPIClient from Task 7.1)
# Main execution for Task 7.2
# if __name__ == "__main__":
# # Example: Fetch 5-min data for NIFTY index and a sample stock
# nifty_info = db.universes.find_one({"_id": "MAJOR_INDICES"})
# if nifty_info:
# nifty_security_id = next((item["securityId"] for item in nifty_info["members"] if item["symbol"] == "NIFTY"), None)
# if nifty_security_id:
# fetch_and_store_intraday_historical(nifty_security_id, db, api_client, interval="5", lookback_days=30)
# # Example for a stock (assuming RELIANCE securityId is known)
# reliance_info = db.instruments_master.find_one({"symbol": "RELIANCE", "exchangeSegment": "NSE_EQ"})
# if reliance_info:
# reliance_security_id = reliance_info["securityId"]
# fetch_and_store_intraday_historical(reliance_security_id, db, api_client, interval="5", lookback_days=30)
# client.close()
```
**Expected Output (CLI):**
```
Fetching 5-min intraday historical for XXXXX...
Fetching from YYYY-MM-DD to YYYY-MM-DD...
Upserted ZZZ 5-min records for XXXXX.
... (repeats for each 90-day chunk and instrument)
```
**Expected Output (MongoDB - `historical_intraday` collection):**
* Documents representing 5-minute OHLCV data, upserted into the same `historical_intraday` collection as 1-minute data.
* Example document structure (similar to 1-min, but with 5-min `timestamp` intervals):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"timestamp": ISODate("2025-08-08T10:30:00.000Z"),
"open": 100.0,
"high": 100.5,
"low": 99.8,
"close": 100.2,
"volume": 5000,
"openInterest": 123456 // Present for F&O, null/absent for equities
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Observe the fetching and upsert messages for each instrument and date range.
* **MongoDB Check:**
* Query `db.historical_intraday.count_documents({})` to ensure documents are accumulating.
* Verify that `timestamp` is an IST datetime object and that the intervals are indeed 5 minutes.
* For F&O instruments, verify `openInterest` is present and non-null.
* Ensure `securityId` and `timestamp` form a unique key (no duplicates), confirming idempotent upserts.
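One way to verify the 5-minute spacing mentioned above is to load a sample instrument's candles into pandas and inspect the gaps between consecutive timestamps. A minimal sketch, assuming the sampled instrument was fetched only at the 5-minute interval (1-minute and 5-minute candles share this collection, so a mixed instrument will show both gap sizes):
```python
import os
import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
client = MongoClient(os.getenv("MONGO_URI"))
db = client.get_database()

security_id = "12345"  # Placeholder: any instrument fetched with interval="5"
docs = list(db.historical_intraday.find(
    {"securityId": security_id}, {"timestamp": 1, "_id": 0}
).sort("timestamp", 1))

if docs:
    gaps = pd.DataFrame(docs)["timestamp"].diff().dropna()
    # Within a session, consecutive candles should be 5 minutes apart;
    # larger gaps across session/day boundaries are expected.
    print(gaps.value_counts().head())
client.close()
```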
---
### 8. Fetch Daily Historical Data
**Objective:** Acquire and store daily OHLCV data for all relevant instruments to provide a longer-term historical context for trend analysis and strategy development.
**Python Script:** `07_historical_daily.py`
**Instructions for AI:**
#### Task 8.1: Fetch daily OHLCV data
**Purpose:** Obtain daily historical data for all instruments in the comprehensive subscription list, covering a period of at least 90 days.
**API Call Details:**
* **Endpoint:** `GET /api/v1/historical/daily`
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:**
* `securityId`: The unique identifier of the instrument.
* `fromDate`: Start date for the historical data (YYYY-MM-DD).
* `toDate`: End date for the historical data (YYYY-MM-DD).
* **Limits:** Dhan HQ API might have limits on the date range for daily historical data. Handle pagination if necessary.
* **Expected Response:** A JSON array of OHLCV objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import json
import requests
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from datetime import datetime, timedelta
import pytz
import time
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# This client will be replaced by the robust dual API client in Phase 7.
class SimpleDhanAPIClient:
def __init__(self, access_token, client_id):
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.headers = {
"access-token": access_token,
"x-dhan-client-id": client_id,
"Content-Type": "application/json"
}
def get_daily_historical(self, security_id, from_date, to_date):
url = f"{self.base_url}api/v1/historical/daily"
params = {
"securityId": security_id,
"fromDate": from_date.strftime("%Y-%m-%d"),
"toDate": to_date.strftime("%Y-%m-%d")
}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
# Initialize a simple API client (will be replaced by dual API client later)
api_client = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_1"),
os.getenv("DHAN_CLIENT_ID_1")
)
def fetch_and_store_daily_historical(
security_id,
db_client,
api_client,
lookback_days=90
):
print(f"Fetching daily historical for {security_id}...")
today = datetime.now(IST).date()
start_date = today - timedelta(days=lookback_days)
try:
historical_data = api_client.get_daily_historical(
security_id, start_date, today
)
if historical_data:
operations = []
for doc in historical_data:
# Ensure the record carries the securityId used in the upsert filter below
doc.setdefault("securityId", security_id)
# Convert the date string to a datetime at midnight (PyMongo cannot encode plain date objects)
if "date" in doc and isinstance(doc["date"], str):
doc["date"] = datetime.strptime(doc["date"], "%Y-%m-%d")
operations.append(
UpdateOne(
{"securityId": doc["securityId"], "date": doc["date"]},
{"$set": doc},
upsert=True
)
)
if operations:
db_client.historical_daily.bulk_write(operations, ordered=False)
print(f" Upserted {len(historical_data)} daily records for {security_id}.")
else:
print(f" No daily data for {security_id} from {start_date} to {today}.")
except requests.exceptions.RequestException as e:
print(f" Error fetching daily historical for {security_id}: {e}")
# AI should implement retry logic here
# Main execution for Task 8.1
# if __name__ == "__main__":
# # Example: Fetch daily data for NIFTY index and a sample stock
# nifty_info = db.universes.find_one({"_id": "MAJOR_INDICES"})
# if nifty_info:
# nifty_security_id = next((item["securityId"] for item in nifty_info["members"] if item["symbol"] == "NIFTY"), None)
# if nifty_security_id:
# fetch_and_store_daily_historical(nifty_security_id, db, api_client, lookback_days=90)
# # Example for a stock (assuming RELIANCE securityId is known)
# reliance_info = db.instruments_master.find_one({"symbol": "RELIANCE", "exchangeSegment": "NSE_EQ"})
# if reliance_info:
# reliance_security_id = reliance_info["securityId"]
# fetch_and_store_daily_historical(reliance_security_id, db, api_client, lookback_days=90)
# client.close()
```
**Expected Output (CLI):**
```
Fetching daily historical for XXXXX...
Upserted ZZZ daily records for XXXXX.
... (repeats for each instrument)
```
**Expected Output (MongoDB - `historical_daily` collection):**
* Documents representing daily OHLCV data.
* Example document structure (simplified):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"date": ISODate("2025-08-08T00:00:00.000Z"), // Stored as date object
"open": 100.0,
"high": 100.5,
"low": 99.8,
"close": 100.2,
"volume": 500000,
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Observe the fetching and upsert messages for each instrument.
* **MongoDB Check:**
* Query `db.historical_daily.count_documents({})` to ensure documents are accumulating.
* Verify that `date` is stored as a datetime object with the time set to midnight.
* Check for data continuity and absence of gaps for a few sample instruments over the 90-day period.
* Ensure `securityId` and `date` form a unique key (no duplicates).
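As with the intraday collection, the uniqueness and continuity checks above can be automated. A minimal sketch that creates the `securityId` + `date` unique index and lists business days with no candle for a sample instrument (some of these will simply be NSE holidays):
```python
import os
import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient, ASCENDING

load_dotenv()
client = MongoClient(os.getenv("MONGO_URI"))
db = client.get_database()

# Enforce one row per (securityId, date)
db.historical_daily.create_index([("securityId", ASCENDING), ("date", ASCENDING)], unique=True)

security_id = "12345"  # Placeholder
docs = list(db.historical_daily.find({"securityId": security_id}, {"date": 1, "_id": 0}).sort("date", 1))
if docs:
    dates = pd.to_datetime(pd.DataFrame(docs)["date"])
    expected = pd.bdate_range(dates.min(), dates.max())  # Weekdays only
    missing = expected.difference(dates)
    print(f"{len(missing)} business days without data (may include holidays): {list(missing[:5])}")
client.close()
```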
---
## Phase 4: Option Chain Data (Critical for Options Trading)
### 9. Implement Option Chain Fetching (Expanded Scope)
**Objective:** Periodically fetch comprehensive option chain data, including Greeks, Implied Volatility (IV), and detailed strike-wise information, for all relevant indices and F&O stocks. This data is crucial for options strategy analysis and model training.
**Python Script:** `08_option_chains.py`
**Instructions for AI:**
#### Task 9.1: Discover available expiries
**Purpose:** Identify the relevant expiry dates for NIFTY, BANKNIFTY, FINNIFTY, and all F&O stocks that are constituents of Nifty 50, Bank Nifty, and Fin Nifty. This will typically involve finding current weekly, next weekly, and current monthly expiries.
**API Call Details:** No direct API call for expiry discovery. This task primarily queries the `instruments_master` collection and applies date logic.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime, timedelta
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def get_current_ist_date():
return datetime.now(IST).date()
def discover_expiries(db_client):
print("Discovering relevant expiries...")
expiries_map = {}
today = get_current_ist_date()
# Get all underlying symbols for which we need option chains
underlying_symbols = set()
major_indices_doc = db_client.universes.find_one({"_id": "MAJOR_INDICES"})
if major_indices_doc:
for member in major_indices_doc["members"]:
underlying_symbols.add(member["symbol"])
all_constituent_stocks_doc = db_client.universes.find_one({"_id": "ALL_INDEX_CONSTITUENT_STOCKS"})
if all_constituent_stocks_doc:
for member in all_constituent_stocks_doc["members"]:
underlying_symbols.add(member["symbol"])
for symbol in underlying_symbols:
# Find all OPTIDX/OPTSTK contracts for this symbol
options = list(db_client.instruments_master.find({
"symbol": symbol,
"instrument": {"$in": ["OPTIDX", "OPTSTK"]},
"exchangeSegment": "NSE_FNO"
}).sort("expiryDate", 1)) # Sort by expiry date ascending
relevant_expiries = []
for opt in options:
expiry_date_str = opt["expiryDate"]
expiry_date = datetime.strptime(expiry_date_str, "%Y-%m-%d").date()
if expiry_date >= today: # Only consider future expiries
relevant_expiries.append(expiry_date_str)
# Get unique and sorted expiries
unique_expiries = sorted(list(set(relevant_expiries)))
current_weekly = None
next_weekly = None
current_monthly = None
for exp_str in unique_expiries:
exp_date = datetime.strptime(exp_str, "%Y-%m-%d").date()
# Determine weekly expiries (Nifty/BankNifty: Thursday, FinNifty: Tuesday, Stocks: Friday)
# This logic needs to be robust and handle exceptions
is_weekly_expiry = False
if symbol in ["NIFTY", "BANKNIFTY"] and exp_date.weekday() == 3: # Thursday
is_weekly_expiry = True
elif symbol == "FINNIFTY" and exp_date.weekday() == 1: # Tuesday
is_weekly_expiry = True
elif symbol not in ["NIFTY", "BANKNIFTY", "FINNIFTY"] and exp_date.weekday() == 4: # Friday for stocks
is_weekly_expiry = True
# Determine monthly expiry (last Thursday of the month)
# This is a common rule, but verify with Dhan HQ docs for exactness
is_monthly_expiry = False
if exp_date.weekday() == 3: # Thursday
# Check if it's the last Thursday of the month
next_week = exp_date + timedelta(days=7)
if next_week.month != exp_date.month:
is_monthly_expiry = True
if is_weekly_expiry:
if current_weekly is None:
current_weekly = exp_str
elif next_weekly is None and exp_str != current_weekly:
next_weekly = exp_str
if is_monthly_expiry:
if current_monthly is None:
current_monthly = exp_str
if current_weekly and next_weekly and current_monthly: # Found all relevant expiries
break
expiries_map[symbol] = {
"currentWeekly": current_weekly,
"nextWeekly": next_weekly,
"currentMonthly": current_monthly
}
if not (current_weekly or next_weekly or current_monthly):
print(f"Warning: No relevant expiries found for {symbol}")
# Store discovered expiries in derivative_views for easy access
db_client.derivative_views.update_one(
{"_id": "DISCOVERED_EXPIRES"},
{"$set": {"data": expiries_map, "last_updated": datetime.now()}},
upsert=True
)
print("Discovered expiries stored in derivative_views.")
return expiries_map
# Main execution for Task 9.1
# if __name__ == "__main__":
# discovered_expiries = discover_expiries(db)
# print(json.dumps(discovered_expiries, indent=2))
# client.close()
```
**Expected Output (CLI):**
```
Discovering relevant expiries...
Discovered expiries stored in derivative_views.
```
**Expected Output (MongoDB - `derivative_views` collection):**
* A document with `_id: "DISCOVERED_EXPIRES"`.
* Example document structure (simplified):
```json
{
"_id": "DISCOVERED_EXPIRES",
"data": {
"NIFTY": {
"currentWeekly": "2025-08-08",
"nextWeekly": "2025-08-15",
"currentMonthly": "2025-08-28"
},
"BANKNIFTY": {
"currentWeekly": "2025-08-08",
"nextWeekly": "2025-08-15",
"currentMonthly": "2025-08-28"
},
"RELIANCE": {
"currentWeekly": "2025-08-08",
"nextWeekly": "2025-08-15",
"currentMonthly": "2025-08-28"
}
// ... for all other relevant F&O stocks
},
"last_updated": ISODate("2025-08-08T10:50:00.000Z")
}
```
**Validation (for AI):**
* **CLI Check:** Confirm the success message.
* **MongoDB Check:**
* Query `db.derivative_views.find_one({"_id": "DISCOVERED_EXPIRES"})`.
* Verify that `data` contains entries for NIFTY, BANKNIFTY, FINNIFTY, and a representative sample of F&O stocks.
* For each symbol, confirm `currentWeekly`, `nextWeekly`, and `currentMonthly` expiries are present and are valid future dates. Pay attention to the correct weekday for weekly expiries (Thursday for Nifty/BankNifty, Tuesday for FinNifty, Friday for stocks).
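A quick way to apply the weekday rules above is to read back the stored document and flag any weekly expiry that falls on an unexpected weekday. A minimal sketch (the weekday map simply restates the rules listed above):
```python
import os
from datetime import datetime
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
client = MongoClient(os.getenv("MONGO_URI"))
db = client.get_database()

# Monday=0 ... Sunday=6; stocks default to Friday per the rule above
EXPECTED_WEEKDAY = {"NIFTY": 3, "BANKNIFTY": 3, "FINNIFTY": 1}

doc = db.derivative_views.find_one({"_id": "DISCOVERED_EXPIRES"})
if doc:
    for symbol, expiries in doc["data"].items():
        weekly = expiries.get("currentWeekly")
        if not weekly:
            continue
        weekday = datetime.strptime(weekly, "%Y-%m-%d").weekday()
        if weekday != EXPECTED_WEEKDAY.get(symbol, 4):
            print(f"Unexpected weekly expiry weekday for {symbol}: {weekly} (weekday={weekday})")
client.close()
```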
#### Task 9.2: Fetch option chain snapshots
**Purpose:** Fetch the complete option chain for each identified underlying asset and its relevant expiries. This includes all strike prices, along with their LTP, IV, Greeks, OI, volume, and best bid/ask information.
**API Call Details:**
* **Endpoint:** `GET /api/v1/optionchain`
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:**
* `underlyingSecurityId`: The `securityId` of the underlying index or stock.
* `expiryDate`: The specific expiry date (YYYY-MM-DD).
* **Limits:** This API is typically rate-limited. **This is where the dual API accounts will significantly help.** Implement strict throttling and intelligent API key rotation.
* **Expected Response:** A JSON object containing `callOptions` and `putOptions` arrays.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import json
import requests
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from datetime import datetime, timedelta
import pytz
import time
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# This client will be replaced by the robust dual API client in Phase 7.
class SimpleDhanAPIClient:
def __init__(self, access_token, client_id):
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.headers = {
"access-token": access_token,
"x-dhan-client-id": client_id,
"Content-Type": "application/json"
}
def get_option_chain(self, underlying_security_id, expiry_date):
url = f"{self.base_url}api/v1/optionchain"
params = {
"underlyingSecurityId": underlying_security_id,
"expiryDate": expiry_date
}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
# Initialize a simple API client (will be replaced by dual API client later)
api_client = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_1"),
os.getenv("DHAN_CLIENT_ID_1")
)
def fetch_and_store_option_chain(
underlying_symbol,
underlying_security_id,
expiry_date_str,
db_client,
api_client
):
print(f"Fetching option chain for {underlying_symbol} (ID: {underlying_security_id}) expiring {expiry_date_str}...")
try:
option_chain_data = api_client.get_option_chain(underlying_security_id, expiry_date_str)
snapshot_ts_ist = datetime.now(IST).replace(tzinfo=None)
doc_to_insert = {
"underlyingSymbol": underlying_symbol,
"underlyingSecurityId": underlying_security_id,
"expiry": expiry_date_str,
"snapshot_ts_ist": snapshot_ts_ist,
"legs": []
}
# Process Call Options
for call_leg in option_chain_data.get("callOptions", []):
doc_to_insert["legs"].append({"type": "CE", **call_leg})
# Process Put Options
for put_leg in option_chain_data.get("putOptions", []):
doc_to_insert["legs"].append({"type": "PE", **put_leg})
# Upsert the option chain snapshot
db_client.option_chain_data.update_one(
{
"underlyingSecurityId": underlying_security_id,
"expiry": expiry_date_str,
"snapshot_ts_ist": snapshot_ts_ist # Use snapshot time as part of unique key
},
{"$set": doc_to_insert},
upsert=True
)
print(f" Option chain snapshot for {underlying_symbol} {expiry_date_str} inserted.")
except requests.exceptions.RequestException as e:
print(f" Error fetching option chain for {underlying_symbol} {expiry_date_str}: {e}")
# AI should implement retry logic and rate limit handling here.
def fetch_all_relevant_option_chains(db_client, api_client):
print("Fetching all relevant option chains...")
discovered_expiries_doc = db_client.derivative_views.find_one({"_id": "DISCOVERED_EXPIRES"})
if not discovered_expiries_doc:
print("DISCOVERED_EXPIRES not found. Please complete Task 9.1 first.")
return
expiries_map = discovered_expiries_doc["data"]
for symbol, expiries in expiries_map.items():
# Get underlying securityId for the symbol
underlying_info = db_client.instruments_master.find_one({"symbol": symbol, "instrument": {"$in": ["INDEX", "EQUITY"]}})
if not underlying_info:
print(f"Warning: Underlying instrument info not found for {symbol}. Skipping option chain fetch.")
continue
underlying_security_id = underlying_info["securityId"]
for expiry_type, expiry_date_str in expiries.items():
if expiry_date_str:
fetch_and_store_option_chain(
symbol, underlying_security_id, expiry_date_str, db_client, api_client
)
time.sleep(3) # Implement throttling to avoid rate limits.
# This will be managed by the dual API client in Phase 7.
# Main execution for Task 9.2
# if __name__ == "__main__":
# db.option_chain_data.create_index([("underlyingSecurityId", 1), ("expiry", 1), ("snapshot_ts_ist", 1)], unique=True)
# fetch_all_relevant_option_chains(db, api_client)
# client.close()
```
**Expected Output (CLI):**
```
Fetching all relevant option chains...
Fetching option chain for NIFTY (ID: 99999) expiring 2025-08-08...
Option chain snapshot for NIFTY 2025-08-08 inserted.
Fetching option chain for NIFTY (ID: 99999) expiring 2025-08-15...
Option chain snapshot for NIFTY 2025-08-15 inserted.
... (repeats for all symbols and expiries)
```
**Expected Output (MongoDB - `option_chain_data` collection):**
* Documents representing option chain snapshots.
* Example document structure (simplified):
```json
{
"_id": ObjectId("..."),
"underlyingSymbol": "NIFTY",
"underlyingSecurityId": "99999",
"expiry": "2025-08-08",
"snapshot_ts_ist": ISODate("2025-08-08T11:00:00.000Z"),
"legs": [
{ // Call Option
"type": "CE",
"strikePrice": 22000.0,
"ltp": 150.0,
"iv": 15.5,
"delta": 0.5,
"gamma": 0.001,
"theta": -0.5,
"vega": 0.8,
"volume": 100000,
"openInterest": 500000,
"bestBidPrice": 149.5,
"bestBidQty": 100,
"bestAskPrice": 150.5,
"bestAskQty": 100
},
{ // Put Option
"type": "PE",
"strikePrice": 22000.0,
"ltp": 100.0,
"iv": 16.0,
"delta": -0.5,
"gamma": 0.001,
"theta": -0.4,
"vega": 0.7,
"volume": 80000,
"openInterest": 400000,
"bestBidPrice": 99.5,
"bestBidQty": 100,
"bestAskPrice": 100.5,
"bestAskQty": 100
}
// ... many more strike prices
]
}
```
**Validation (for AI):**
* **CLI Check:** Observe the fetching and insertion messages for each option chain.
* **MongoDB Check:**
* Query `db.option_chain_data.count_documents({})` to ensure documents are accumulating.
* Verify that `underlyingSecurityId`, `expiry`, and `snapshot_ts_ist` form a unique key.
* Inspect a few documents to ensure `legs` array contains both CE and PE options, and key fields like `ltp`, `iv`, `delta`, `openInterest` are populated.
* Confirm that the `snapshot_ts_ist` reflects the time of data capture.
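Since the objective is to capture these snapshots periodically, the fetch above can be wrapped in a simple polling loop that runs only during market hours. A minimal sketch; the 15-minute cadence and the 09:15-15:30 IST session window are assumptions to be tuned:
```python
import time
from datetime import datetime
import pytz

IST = pytz.timezone("Asia/Kolkata")
SNAPSHOT_INTERVAL_SECONDS = 15 * 60  # Assumed cadence: one snapshot every 15 minutes

def within_market_hours(now):
    # NSE regular session: 09:15 to 15:30 IST on weekdays
    return now.weekday() < 5 and (9, 15) <= (now.hour, now.minute) <= (15, 30)

# while True:
#     if within_market_hours(datetime.now(IST)):
#         fetch_all_relevant_option_chains(db, api_client)  # From the snippet above
#     time.sleep(SNAPSHOT_INTERVAL_SECONDS)
```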
---
## Phase 5: Account and Position Management
### 10. Implement Account Data Fetching
**Objective:** Retrieve and store critical account-related information, including current holdings, open positions, order history, and trade history. This data is essential for portfolio tracking, risk management, and performance analysis.
**Python Script:** `09_account_data.py`
**Instructions for AI:**
#### Task 10.1: Fetch current positions
**Purpose:** Obtain a snapshot of all current open positions (intraday and delivery) for both your primary and secondary Dhan accounts. This allows for real-time monitoring of exposure and P&L.
**API Call Details:**
* **Endpoint:** `GET /api/v1/positions` (for open positions) or `GET /api/v1/holdings` (for delivery holdings).
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:** None for positions. For holdings, `segment` might be an optional parameter.
* **Expected Response:** A JSON array of position/holding objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import json
import requests
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from datetime import datetime, timedelta
import pytz
import time
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Dual API Client (Conceptual - will be fully implemented in Phase 7) ---
# This client will be replaced by the robust dual API client in Phase 7.
class SimpleDhanAPIClient:
def __init__(self, access_token, client_id):
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.headers = {
"access-token": access_token,
"x-dhan-client-id": client_id,
"Content-Type": "application/json"
}
def get_positions(self):
url = f"{self.base_url}api/v1/positions"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
def get_holdings(self):
url = f"{self.base_url}api/v1/holdings"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
# Initialize simple API clients for both accounts (will be replaced by dual API client later)
api_client_1 = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_1"),
os.getenv("DHAN_CLIENT_ID_1")
)
api_client_2 = SimpleDhanAPIClient(
os.getenv("DHAN_ACCESS_TOKEN_2"),
os.getenv("DHAN_CLIENT_ID_2")
)
def fetch_and_store_positions(db_client, api_client, account_id):
print(f"Fetching positions for account {account_id}...")
try:
positions_data = api_client.get_positions()
holdings_data = api_client.get_holdings()
snapshot_ts_ist = datetime.now(IST).replace(tzinfo=None)
# Combine positions and holdings, add account_id and timestamp
all_account_data = []
for pos in positions_data:
pos["account_id"] = account_id
pos["snapshot_ts_ist"] = snapshot_ts_ist
pos["type"] = "position"
all_account_data.append(pos)
for holding in holdings_data:
holding["account_id"] = account_id
holding["snapshot_ts_ist"] = snapshot_ts_ist
holding["type"] = "holding"
all_account_data.append(holding)
if all_account_data:
# Use bulk upsert for efficiency, using a combination of fields as unique key
operations = []
for doc in all_account_data:
# For positions, use securityId, productType, buyOrSell, and snapshot_ts_ist as unique key
# For holdings, use securityId and snapshot_ts_ist as unique key
if doc["type"] == "position":
unique_filter = {
"account_id": doc["account_id"],
"type": doc["type"],
"securityId": doc["securityId"],
"productType": doc["productType"],
"buyOrSell": doc["buyOrSell"],
"snapshot_ts_ist": doc["snapshot_ts_ist"]
}
else: # holding
unique_filter = {
"account_id": doc["account_id"],
"type": doc["type"],
"securityId": doc["securityId"],
"snapshot_ts_ist": doc["snapshot_ts_ist"]
}
operations.append(
UpdateOne(
unique_filter,
{"$set": doc},
upsert=True
)
)
db_client.account_positions.bulk_write(operations, ordered=False)
print(f"Successfully fetched and stored {len(all_account_data)} positions/holdings for account {account_id}.")
else:
print(f"No positions or holdings found for account {account_id}.")
except requests.exceptions.RequestException as e:
print(f"Error fetching positions for account {account_id}: {e}")
# AI should implement retry logic here
# Main execution for Task 10.1
# if __name__ == "__main__":
# db.account_positions.create_index([("account_id", 1), ("type", 1), ("securityId", 1), ("snapshot_ts_ist", 1)], unique=True)
# fetch_and_store_positions(db, api_client_1, os.getenv("DHAN_CLIENT_ID_1"))
# fetch_and_store_positions(db, api_client_2, os.getenv("DHAN_CLIENT_ID_2"))
# client.close()
```
**Expected Output (CLI):**
```
Fetching positions for account your_primary_client_id...
Successfully fetched and stored X positions/holdings for account your_primary_client_id.
Fetching positions for account your_secondary_client_id...
Successfully fetched and stored Y positions/holdings for account your_secondary_client_id.
```
**Expected Output (MongoDB - `account_positions` collection):**
* Documents representing positions and holdings, tagged with `account_id`.
* Example position document structure (simplified):
```json
{
"_id": ObjectId("..."),
"account_id": "your_primary_client_id",
"type": "position",
"securityId": "12345",
"tradingSymbol": "NIFTY24AUGFUT",
"productType": "INTRADAY",
"buyOrSell": "BUY",
"quantity": 50,
"averagePrice": 22000.50,
"ltp": 22001.00,
"realizedProfit": 0.0,
"unrealizedProfit": 25.0,
"snapshot_ts_ist": ISODate("2025-08-08T11:10:00.000Z")
// ... other position fields
}
```
* Example holding document structure (simplified):
```json
{
"_id": ObjectId("..."),
"account_id": "your_secondary_client_id",
"type": "holding",
"securityId": "1000",
"tradingSymbol": "RELIANCE",
"quantity": 10,
"averagePrice": 2500.00,
"ltp": 2505.00,
"unrealizedProfit": 50.0,
"snapshot_ts_ist": ISODate("2025-08-08T11:10:00.000Z")
// ... other holding fields
}
```
**Validation (for AI):**
* **CLI Check:** Confirm success messages for both accounts.
* **MongoDB Check:**
* Query `db.account_positions.count_documents({})` to ensure data is present.
* Verify that documents from both `account_id`s are stored.
* Inspect sample documents to ensure `type` is correctly set (`position` or `holding`) and relevant fields like `quantity`, `ltp`, `unrealizedProfit` are populated.
* Ensure `snapshot_ts_ist` is correctly recorded.
#### Task 10.2: Fetch order history
**Purpose:** Retrieve a historical record of all orders placed through both Dhan accounts. This is crucial for auditing, understanding trading activity, and analyzing order execution.
**API Call Details:**
* **Endpoint:** `GET /api/v1/orders`
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:** Optional parameters like `fromDate`, `toDate`, `status` to filter orders.
* **Expected Response:** A JSON array of order objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports, MongoDB client, and SimpleDhanAPIClient from Task 10.1)
def fetch_and_store_order_history(db_client, api_client, account_id, from_date=None, to_date=None):
print(f"Fetching order history for account {account_id}...")
try:
# You might want to fetch orders for a specific period, e.g., last 7 days
if not to_date: to_date = datetime.now(IST).date()
if not from_date: from_date = to_date - timedelta(days=7) # Last 7 days by default
orders_data = api_client.get_orders(from_date, to_date) # get_orders is sketched after this block; confirm date filtering against the Dhan HQ docs
snapshot_ts_ist = datetime.now(IST).replace(tzinfo=None)
if orders_data:
operations = []
for order in orders_data:
order["account_id"] = account_id
order["snapshot_ts_ist"] = snapshot_ts_ist
# Use orderId as the unique key for orders
operations.append(
UpdateOne(
{"account_id": order["account_id"], "orderId": order["orderId"]},
{"$set": order},
upsert=True
)
)
db_client.order_history.bulk_write(operations, ordered=False)
print(f"Successfully fetched and stored {len(orders_data)} orders for account {account_id}.")
else:
print(f"No orders found for account {account_id}.")
except requests.exceptions.RequestException as e:
print(f"Error fetching order history for account {account_id}: {e}")
# Main execution for Task 10.2
# if __name__ == "__main__":
# db.order_history.create_index([("account_id", 1), ("orderId", 1)], unique=True)
# fetch_and_store_order_history(db, api_client_1, os.getenv("DHAN_CLIENT_ID_1"))
# fetch_and_store_order_history(db, api_client_2, os.getenv("DHAN_CLIENT_ID_2"))
# client.close()
```
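The snippets in Tasks 10.2 and 10.3 call `get_orders()` and `get_trades()`, which the `SimpleDhanAPIClient` from Task 10.1 does not yet define. A minimal sketch of a subclass adding both methods, assuming the `GET /api/v1/orders` and `GET /api/v1/trades` endpoints listed in this phase and optional `fromDate`/`toDate` query parameters (the class name and parameter handling are illustrative; verify against the Dhan HQ docs):
```python
import requests

class AccountDataDhanAPIClient(SimpleDhanAPIClient):
    """Extends the Task 10.1 client with order- and trade-history endpoints."""

    def _get_with_dates(self, path, from_date=None, to_date=None):
        params = {}
        if from_date:
            params["fromDate"] = from_date.strftime("%Y-%m-%d")
        if to_date:
            params["toDate"] = to_date.strftime("%Y-%m-%d")
        response = requests.get(f"{self.base_url}{path}", headers=self.headers, params=params)
        response.raise_for_status()
        return response.json()

    def get_orders(self, from_date=None, to_date=None):
        return self._get_with_dates("api/v1/orders", from_date, to_date)

    def get_trades(self, from_date=None, to_date=None):
        return self._get_with_dates("api/v1/trades", from_date, to_date)
```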
**Expected Output (CLI):**
```
Fetching order history for account your_primary_client_id...
Successfully fetched and stored X orders for account your_primary_client_id.
Fetching order history for account your_secondary_client_id...
Successfully fetched and stored Y orders for account your_secondary_client_id.
```
**Expected Output (MongoDB - `order_history` collection):**
* Documents representing individual orders, tagged with `account_id`.
* Example order document structure (simplified):
```json
{
"_id": ObjectId("..."),
"account_id": "your_primary_client_id",
"orderId": "1234567890",
"tradingSymbol": "NIFTY24AUGFUT",
"orderType": "MARKET",
"transactionType": "BUY",
"quantity": 50,
"price": 0.0, // For market orders
"orderStatus": "COMPLETE",
"orderTimestamp": ISODate("2025-08-08T11:05:00.000Z"),
"snapshot_ts_ist": ISODate("2025-08-08T11:15:00.000Z")
// ... other order fields
}
```
**Validation (for AI):**
* **CLI Check:** Confirm success messages for both accounts.
* **MongoDB Check:**
* Query `db.order_history.count_documents({})` to ensure data is present.
* Verify that documents from both `account_id`s are stored.
* Inspect sample documents to ensure `orderId`, `tradingSymbol`, `orderStatus`, and `orderTimestamp` are correctly populated.
* Ensure `orderId` and `account_id` form a unique key.
#### Task 10.3: Fetch trade history
**Purpose:** Retrieve a detailed record of all executed trades for both Dhan accounts. This provides granular information on actual fills, prices, and associated charges, essential for accurate P&L calculation and brokerage analysis.
**API Call Details:**
* **Endpoint:** `GET /api/v1/trades`
* **Method:** `GET`
* **Authentication:** Requires `access-token` and `x-dhan-client-id` headers.
* **Parameters:** Optional parameters like `fromDate`, `toDate`.
* **Expected Response:** A JSON array of trade objects.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports, MongoDB client, and SimpleDhanAPIClient from Task 10.1)
def fetch_and_store_trade_history(db_client, api_client, account_id, from_date=None, to_date=None):
print(f"Fetching trade history for account {account_id}...")
try:
# You might want to fetch trades for a specific period, e.g., last 7 days
if not to_date: to_date = datetime.now(IST).date()
if not from_date: from_date = to_date - timedelta(days=7) # Last 7 days by default
trades_data = api_client.get_trades(from_date, to_date) # get_trades is sketched after Task 10.2; confirm date filtering against the Dhan HQ docs
snapshot_ts_ist = datetime.now(IST).replace(tzinfo=None)
if trades_data:
operations = []
for trade in trades_data:
trade["account_id"] = account_id
trade["snapshot_ts_ist"] = snapshot_ts_ist
# Use tradeId (or a combination if tradeId is not unique across accounts/days) as the unique key
# Assuming tradeId is unique per account for simplicity
operations.append(
UpdateOne(
{"account_id": trade["account_id"], "tradeId": trade["tradeId"]},
{"$set": trade},
upsert=True
)
)
db_client.trade_history.bulk_write(operations, ordered=False)
print(f"Successfully fetched and stored {len(trades_data)} trades for account {account_id}.")
else:
print(f"No trades found for account {account_id}.")
except requests.exceptions.RequestException as e:
print(f"Error fetching trade history for account {account_id}: {e}")
# Main execution for Task 10.3
# if __name__ == "__main__":
# db.trade_history.create_index([("account_id", 1), ("tradeId", 1)], unique=True)
# fetch_and_store_trade_history(db, api_client_1, os.getenv("DHAN_CLIENT_ID_1"))
# fetch_and_store_trade_history(db, api_client_2, os.getenv("DHAN_CLIENT_ID_2"))
# client.close()
```
**Expected Output (CLI):**
```
Fetching trade history for account your_primary_client_id...
Successfully fetched and stored X trades for account your_primary_client_id.
Fetching trade history for account your_secondary_client_id...
Successfully fetched and stored Y trades for account your_secondary_client_id.
```
**Expected Output (MongoDB - `trade_history` collection):**
* Documents representing individual trades, tagged with `account_id`.
* Example trade document structure (simplified):
```json
{
"_id": ObjectId("..."),
"account_id": "your_primary_client_id",
"tradeId": "TRD1234567890",
"orderId": "ORD1234567890",
"tradingSymbol": "NIFTY24AUGFUT",
"transactionType": "BUY",
"quantity": 50,
"price": 22000.50,
"tradeTimestamp": ISODate("2025-08-08T11:05:05.000Z"),
"brokerage": 20.0,
"taxes": 5.0,
"snapshot_ts_ist": ISODate("2025-08-08T11:20:00.000Z")
// ... other trade fields
}
```
**Validation (for AI):**
* **CLI Check:** Confirm success messages for both accounts.
* **MongoDB Check:**
* Query `db.trade_history.count_documents({})` to ensure data is present.
* Verify that documents from both `account_id`s are stored.
* Inspect sample documents to ensure `tradeId`, `tradingSymbol`, `quantity`, `price`, and `tradeTimestamp` are correctly populated.
* Ensure `tradeId` and `account_id` form a unique key.
---
## Phase 6: Derived Data and Analytics
### 11. Calculate Technical Indicators
**Objective:** Compute various technical indicators (e.g., SMA, EMA, RSI, MACD, Bollinger Bands) from the stored historical price data. These indicators are crucial for generating trading signals and informing algorithmic strategies.
**Python Script:** `10_technical_indicators.py`
**Instructions for AI:**
#### Task 11.1: Calculate basic indicators
**Purpose:** Calculate fundamental technical indicators like Simple Moving Average (SMA), Exponential Moving Average (EMA), and Relative Strength Index (RSI) for all instruments based on their historical intraday (1-minute and 5-minute) and daily data. These indicators will be stored back into the respective historical data collections.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
import pandas as pd
import pandas_ta as ta
from datetime import datetime, timedelta
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def calculate_and_store_indicators(db_client, collection_name, security_id, time_field, price_field="close"):
print(f"Calculating indicators for {security_id} in {collection_name}...")
# Fetch data for the specific securityId
cursor = db_client[collection_name].find({"securityId": security_id}).sort(time_field, 1)
data = list(cursor)
if not data:
print(f"No data found for {security_id} in {collection_name}. Skipping indicator calculation.")
return
# Convert to Pandas DataFrame
df = pd.DataFrame(data)
df = df.set_index(time_field)
df.index = pd.to_datetime(df.index) # Ensure datetime index
# Ensure the price column is numeric
df[price_field] = pd.to_numeric(df[price_field], errors='coerce')
df.dropna(subset=[price_field], inplace=True)
if df.empty:
print(f"No valid price data for {security_id} in {collection_name}. Skipping indicator calculation.")
return
# Calculate SMA
df.ta.sma(length=10, append=True)
df.ta.sma(length=20, append=True)
df.ta.sma(length=50, append=True)
# Calculate EMA
df.ta.ema(length=12, append=True)
df.ta.ema(length=26, append=True)
# Calculate RSI
df.ta.rsi(length=14, append=True)
# Prepare updates for MongoDB
operations = []
for index, row in df.iterrows():
update_doc = {
f"SMA_{10}": row[f"SMA_{10}"],
f"SMA_{20}": row[f"SMA_{20}"],
f"SMA_{50}": row[f"SMA_{50}"],
f"EMA_{12}": row[f"EMA_{12}"],
f"EMA_{26}": row[f"EMA_{26}"],
f"RSI_{14}": row[f"RSI_{14}"]
}
# Remove NaN values before storing
update_doc = {k: v for k, v in update_doc.items() if pd.notna(v)}
if update_doc:
operations.append(
UpdateOne(
{"securityId": security_id, time_field: index.to_pydatetime()},
{"$set": update_doc}
)
)
if operations:
db_client[collection_name].bulk_write(operations, ordered=False)
print(f"Successfully calculated and stored basic indicators for {security_id} in {collection_name}.")
else:
print(f"No updates to perform for {security_id} in {collection_name}.")
# Main execution for Task 11.1
# if __name__ == "__main__":
# # Example: Calculate indicators for NIFTY index (1-min, 5-min, daily)
# nifty_info = db.universes.find_one({"_id": "MAJOR_INDICES"})
# if nifty_info:
# nifty_security_id = next((item["securityId"] for item in nifty_info["members"] if item["symbol"] == "NIFTY"), None)
# if nifty_security_id:
# calculate_and_store_indicators(db, "historical_intraday", nifty_security_id, "timestamp")
# calculate_and_store_indicators(db, "historical_daily", nifty_security_id, "date", price_field="close")
# # Example: Iterate through all instruments in the comprehensive subscription list
# # and calculate indicators for them.
# # This would involve fetching the comprehensive list (from Task 5.1) and looping.
# # For now, AI should understand to apply this to all relevant instruments.
# client.close()
```
**Expected Output (CLI):**
```
Calculating indicators for XXXXX in historical_intraday...
Successfully calculated and stored basic indicators for XXXXX in historical_intraday.
Calculating indicators for YYYYY in historical_daily...
Successfully calculated and stored basic indicators for YYYYY in historical_daily.
```
**Expected Output (MongoDB - `historical_intraday` and `historical_daily` collections):**
* Existing documents will be updated with new fields for the calculated indicators.
* Example document structure (simplified, for `historical_intraday`):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"timestamp": ISODate("2025-08-08T10:30:00.000Z"),
"open": 100.0,
"high": 100.5,
"low": 99.8,
"close": 100.2,
"volume": 5000,
"openInterest": 123456,
"SMA_10": 100.15,
"SMA_20": 100.10,
"SMA_50": 100.05,
"EMA_12": 100.18,
"EMA_26": 100.12,
"RSI_14": 65.23
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Observe the success messages for each instrument and collection.
* **MongoDB Check:**
* Query a few documents in `historical_intraday` and `historical_daily` for instruments that should have indicators calculated.
* Verify the presence of new fields like `SMA_10`, `EMA_12`, `RSI_14`.
* Ensure that `NaN` values (where data is insufficient for calculation, e.g., first few data points for an SMA) are not stored or are handled gracefully.
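The commented main block above applies the calculation to NIFTY only. A minimal sketch of one way to cover every instrument already present in the historical collections, by looping over the distinct `securityId` values and reusing `calculate_and_store_indicators` from the snippet above:
```python
# Reuses `db` and `calculate_and_store_indicators` from the snippet above.
for security_id in db.historical_intraday.distinct("securityId"):
    calculate_and_store_indicators(db, "historical_intraday", security_id, "timestamp")
for security_id in db.historical_daily.distinct("securityId"):
    calculate_and_store_indicators(db, "historical_daily", security_id, "date", price_field="close")
```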
#### Task 11.2: Calculate advanced indicators
**Purpose:** Compute more complex technical indicators such as MACD (Moving Average Convergence Divergence), Bollinger Bands, ADX (Average Directional Index), and Stochastic Oscillator. These provide deeper insights into momentum, volatility, and trend strength.
**Core Logic for AI (Python Snippet Guidance):**
```python
# ... (re-use imports, MongoDB client from Task 11.1)
def calculate_and_store_advanced_indicators(db_client, collection_name, security_id, time_field, price_field="close"):
print(f"Calculating advanced indicators for {security_id} in {collection_name}...")
cursor = db_client[collection_name].find({"securityId": security_id}).sort(time_field, 1)
data = list(cursor)
if not data:
print(f"No data found for {security_id} in {collection_name}. Skipping advanced indicator calculation.")
return
df = pd.DataFrame(data)
df = df.set_index(time_field)
df.index = pd.to_datetime(df.index)
df[price_field] = pd.to_numeric(df[price_field], errors='coerce')
df.dropna(subset=[price_field], inplace=True)
if df.empty:
print(f"No valid price data for {security_id} in {collection_name}. Skipping advanced indicator calculation.")
return
# Calculate MACD
df.ta.macd(append=True) # Default lengths 12, 26, 9
# Calculate Bollinger Bands
df.ta.bbands(append=True) # Default lengths 20, 2
# Calculate ADX (requires high, low, close)
if "high" in df.columns and "low" in df.columns:
df.ta.adx(high=df["high"], low=df["low"], close=df[price_field], append=True) # Default length 14
# Calculate Stochastic Oscillator (requires high, low, close)
if "high" in df.columns and "low" in df.columns:
df.ta.stoch(high=df["high"], low=df["low"], close=df[price_field], append=True) # Default lengths 14, 3, 3
operations = []
for index, row in df.iterrows():
update_doc = {}
# MACD
if f"MACD_{12}_{26}_{9}" in row:
update_doc[f"MACD_{12}_{26}_{9}"] = row[f"MACD_{12}_{26}_{9}"]
update_doc[f"MACDH_{12}_{26}_{9}"] = row[f"MACDH_{12}_{26}_{9}"]
update_doc[f"MACDS_{12}_{26}_{9}"] = row[f"MACDS_{12}_{26}_{9}"]
# Bollinger Bands
if f"BBL_{20}_2.0" in row:
update_doc[f"BBL_{20}_2.0"] = row[f"BBL_{20}_2.0"]
update_doc[f"BBM_{20}_2.0"] = row[f"BBM_{20}_2.0"]
update_doc[f"BBU_{20}_2.0"] = row[f"BBU_{20}_2.0"]
update_doc[f"BBB_{20}_2.0"] = row[f"BBB_{20}_2.0"]
update_doc[f"BBP_{20}_2.0"] = row[f"BBP_{20}_2.0"]
# ADX
if f"ADX_{14}" in row:
update_doc[f"ADX_{14}"] = row[f"ADX_{14}"]
update_doc[f"DMP_{14}"] = row[f"DMP_{14}"]
update_doc[f"DMN_{14}"] = row[f"DMN_{14}"]
# Stochastic
if f"STOCHk_{14}_{3}_3" in row:
update_doc[f"STOCHk_{14}_{3}_3"] = row[f"STOCHk_{14}_{3}_3"]
update_doc[f"STOCHd_{14}_{3}_3" ] = row[f"STOCHd_{14}_{3}_3"]
update_doc = {k: v for k, v in update_doc.items() if pd.notna(v)}
if update_doc:
operations.append(
UpdateOne(
{"securityId": security_id, time_field: index.to_pydatetime()},
{"$set": update_doc}
)
)
if operations:
db_client[collection_name].bulk_write(operations, ordered=False)
print(f"Successfully calculated and stored advanced indicators for {security_id} in {collection_name}.")
else:
print(f"No updates to perform for {security_id} in {collection_name}.")
# Main execution for Task 11.2
# if __name__ == "__main__":
# # Example: Calculate advanced indicators for NIFTY index (1-min, 5-min, daily)
# nifty_info = db.universes.find_one({"_id": "MAJOR_INDICES"})
# if nifty_info:
# nifty_security_id = next((item["securityId"] for item in nifty_info["members"] if item["symbol"] == "NIFTY"), None)
# if nifty_security_id:
# calculate_and_store_advanced_indicators(db, "historical_intraday", nifty_security_id, "timestamp")
# calculate_and_store_advanced_indicators(db, "historical_daily", nifty_security_id, "date", price_field="close")
# client.close()
```
**Expected Output (CLI):**
```
Calculating advanced indicators for XXXXX in historical_intraday...
Successfully calculated and stored advanced indicators for XXXXX in historical_intraday.
Calculating advanced indicators for YYYYY in historical_daily...
Successfully calculated and stored advanced indicators for YYYYY in historical_daily.
```
**Expected Output (MongoDB - `historical_intraday` and `historical_daily` collections):**
* Existing documents will be updated with new fields for the calculated advanced indicators.
* Example document structure (simplified, for `historical_intraday`):
```json
{
"_id": ObjectId("..."),
"securityId": "12345",
"timestamp": ISODate("2025-08-08T10:30:00.000Z"),
"close": 100.2,
"MACD_12_26_9": 0.5,
"MACDH_12_26_9": 0.1,
"MACDS_12_26_9": 0.4,
"BBL_20_2.0": 98.0,
"BBM_20_2.0": 100.0,
"BBU_20_2.0": 102.0,
"BBB_20_2.0": 4.0,
"BBP_20_2.0": 0.6,
"ADX_14": 25.0,
"DMP_14": 20.0,
"DMN_14": 15.0,
"STOCHk_14_3_3": 70.0,
"STOCHd_14_3_3": 65.0
// ... other fields
}
```
**Validation (for AI):**
* **CLI Check:** Observe the success messages for each instrument and collection.
* **MongoDB Check:**
* Query a few documents in `historical_intraday` and `historical_daily` for instruments that should have advanced indicators calculated.
* Verify the presence of new fields like `MACD_12_26_9`, `BBL_20_2.0`, `ADX_14`, `STOCHk_14_3_3`.
* Ensure that `NaN` values are handled correctly.
---
### 12. Calculate Option Greeks
**Objective:** Enhance the option chain data by calculating missing Option Greeks (Delta, Gamma, Theta, Vega, Rho) using the Black-Scholes model. This provides a complete set of Greeks for options analysis, even if not directly provided by the API.
**Python Script:** `11_option_greeks.py`
**Instructions for AI:**
#### Task 12.1: Calculate missing Greeks
**Purpose:** For each option leg in the `option_chain_data` collection, calculate or re-calculate Greeks (Delta, Gamma, Theta, Vega, Rho) using the `py_vollib` library and the Black-Scholes model. This ensures consistency and completeness of Greek data, which is vital for options strategy and risk management.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from datetime import datetime, timedelta
import pytz
import pandas as pd
# Import py_vollib for Black-Scholes calculations
from py_vollib.black_scholes import black_scholes as bs
from py_vollib.black_scholes.greeks.analytical import delta, gamma, vega, theta, rho
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def calculate_and_store_greeks(db_client):
print("Calculating and storing Option Greeks...")
# Fetch all option chain snapshots that need Greek calculation
# We will iterate through each option chain document
option_chains_cursor = db_client.option_chain_data.find({})
for oc_doc in option_chains_cursor:
underlying_symbol = oc_doc["underlyingSymbol"]
underlying_security_id = oc_doc["underlyingSecurityId"]
expiry_date_str = oc_doc["expiry"]
snapshot_ts_ist = oc_doc["snapshot_ts_ist"]
# Fetch the latest LTP of the underlying asset at the time of the snapshot
# This is crucial for accurate Greek calculation.
# AI needs to ensure this data is available or fetched.
# For simplicity, we'll assume we can get the underlying LTP from the latest tick
# or from a historical snapshot if the option chain snapshot is historical.
# In a real system, you'd fetch the underlying price at `snapshot_ts_ist`.
underlying_ltp_doc = db_client.live_ticks_full.find_one(
{"securityId": underlying_security_id, "ts_event_ist": {"$lte": snapshot_ts_ist}},
sort=[("ts_event_ist", -1)]
)
if not underlying_ltp_doc or "ltp" not in underlying_ltp_doc:
print(f"Warning: Underlying LTP not found for {underlying_symbol} at {snapshot_ts_ist}. Skipping Greek calculation for this chain.")
continue
underlying_price = underlying_ltp_doc["ltp"]
# Get risk-free rate (e.g., current Indian 10-year G-Sec yield, or a fixed value)
# AI should research and use a reasonable risk-free rate for Indian market.
# For example, 5-7% annually, converted to daily/continuous.
risk_free_rate = 0.07 # Example: 7% annual risk-free rate
updated_legs = []
for leg in oc_doc.get("legs", []):
option_type = "c" if leg["type"] == "CE" else "p" # py_vollib expects 'c' or 'p' flags
strike_price = leg["strikePrice"]
option_price = leg["ltp"]
implied_volatility = leg.get("iv") # Use API provided IV if available
# Calculate time to expiry in years
expiry_dt = datetime.strptime(expiry_date_str, "%Y-%m-%d").date()
time_to_expiry_days = (expiry_dt - snapshot_ts_ist.date()).days
if time_to_expiry_days <= 0: # Handle expired options
time_to_expiry_years = 0.00001 # Small non-zero value to avoid division by zero
else:
time_to_expiry_years = time_to_expiry_days / 365.0
# If IV is not provided by API, or is zero/invalid, calculate it from option price
# This is complex and requires numerical methods (e.g., Newton-Raphson)
# For simplicity, if IV is missing, we might skip Greeks or use a default.
# py_vollib's black_scholes function can calculate IV if option_price is given.
# However, py_vollib.black_scholes.implied_volatility is better for this.
# For this task, we assume IV is either provided or we skip Greeks for that leg (see the IV sketch after this block).
if implied_volatility is None or implied_volatility <= 0:
print(f" Warning: IV missing or invalid for {underlying_symbol} {leg.get('tradingSymbol')}. Skipping Greeks calculation for this leg.")
updated_legs.append(leg)
continue
# Convert IV from percentage to decimal for calculation
sigma = implied_volatility / 100.0
try:
# Calculate Greeks
leg["calculated_delta"] = delta(option_type, underlying_price, strike_price, time_to_expiry_years, risk_free_rate, sigma)
leg["calculated_gamma"] = gamma(option_type, underlying_price, strike_price, time_to_expiry_years, risk_free_rate, sigma)
leg["calculated_vega"] = vega(option_type, underlying_price, strike_price, time_to_expiry_years, risk_free_rate, sigma)
leg["calculated_theta"] = theta(option_type, underlying_price, strike_price, time_to_expiry_years, risk_free_rate, sigma)
leg["calculated_rho"] = rho(option_type, underlying_price, strike_price, time_to_expiry_years, risk_free_rate, sigma)
except Exception as e:
print(f" Error calculating Greeks for {underlying_symbol} {leg.get('tradingSymbol')}: {e}")
# Append the leg regardless of whether the Greeks could be calculated
updated_legs.append(leg)
# Update the document in MongoDB
db_client.option_chain_data.update_one(
{"_id": oc_doc["_id"]},
{"$set": {"legs": updated_legs}}
)
print(f" Greeks calculated and updated for {underlying_symbol} expiring {expiry_date_str}.")
print("Finished calculating and storing Option Greeks.")
# Main execution for Task 12.1
# if __name__ == "__main__":
# calculate_and_store_greeks(db)
# client.close()
```
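Where the API does not supply a usable `iv`, one possible fallback (referenced in the comments above) is to back the volatility out of the option's traded price with `py_vollib`. The sketch below is illustrative only and assumes the same inputs used in the Greeks loop; note that it returns a decimal, so it would feed `sigma` directly with no division by 100.
```python
# Minimal sketch (assumption): derive IV from the option premium when the API omits it.
from py_vollib.black_scholes.implied_volatility import implied_volatility

def derive_iv(option_price, underlying_price, strike_price, time_to_expiry_years, risk_free_rate, option_type):
    """Return implied volatility as a decimal (e.g. 0.155 for 15.5%), or None if it cannot be solved."""
    try:
        return implied_volatility(
            option_price,          # observed option premium (LTP)
            underlying_price,      # spot price of the underlying
            strike_price,
            time_to_expiry_years,  # in years, as computed in the loop above
            risk_free_rate,        # annualized decimal, e.g. 0.07
            option_type            # 'c' or 'p'
        )
    except Exception:
        # Deep ITM/OTM strikes or stale prices can make the solver fail; skip Greeks in that case.
        return None
```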
**Expected Output (CLI):**
```
Calculating and storing Option Greeks...
Greeks calculated and updated for NIFTY expiring 2025-08-08.
Greeks calculated and updated for BANKNIFTY expiring 2025-08-08.
... (repeats for all option chains)
Finished calculating and storing Option Greeks.
```
**Expected Output (MongoDB - `option_chain_data` collection):**
* Existing documents will be updated. Each option leg within the `legs` array will have new fields for calculated Greeks.
* Example document structure (simplified, showing added fields):
```json
{
"_id": ObjectId("..."),
"underlyingSymbol": "NIFTY",
"underlyingSecurityId": "99999",
"expiry": "2025-08-08",
"snapshot_ts_ist": ISODate("2025-08-08T11:00:00.000Z"),
"legs": [
{ // Call Option
"type": "CE",
"strikePrice": 22000.0,
"ltp": 150.0,
"iv": 15.5,
"delta": 0.5,
"gamma": 0.001,
"theta": -0.5,
"vega": 0.8,
"openInterest": 500000,
"calculated_delta": 0.5012,
"calculated_gamma": 0.0010,
"calculated_vega": 0.8050,
"calculated_theta": -0.0500,
"calculated_rho": 0.0120
},
// ... other legs
]
}
```
**Validation (for AI):**
* **CLI Check:** Observe the success messages for each option chain processed.
* **MongoDB Check:**
* Query a few documents in `option_chain_data`.
* Inspect the `legs` array for each document and verify the presence of `calculated_delta`, `calculated_gamma`, `calculated_vega`, `calculated_theta`, and `calculated_rho` fields.
* If the API provides Greeks, compare the `calculated_` values with the API-provided values to ensure they are reasonably close (allowing for minor differences due to model assumptions or input precision).
* Ensure that Greeks are calculated only for options with valid `iv` and `ltp`.
---
## Phase 7: Dual API Implementation and Optimization (Crucial for this setup)
### 13. Implement Dual API Strategy
**Objective:** Develop a robust API client that leverages two Dhan HQ API accounts to increase request throughput, manage rate limits effectively, and provide resilience through automatic failover. This is a critical component for high-frequency data acquisition.
**Python Script:** `12_dual_api_client.py`
**Instructions for AI:**
#### Task 13.1: Create a centralized API client with rotation logic
**Purpose:** Design and implement a Python class that encapsulates the logic for making API calls, automatically rotating between the two Dhan HQ API accounts (the primary and secondary accounts configured in `.env`). It should handle rate limit errors by temporarily blacklisting an account and switching to the other, implementing a backoff and retry mechanism.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import requests
import time
from dotenv import load_dotenv
from collections import deque
from datetime import datetime, timedelta
import threading
# Load environment variables
load_dotenv()
class DhanAPIClient:
def __init__(self):
self.api_configs = [
{
"access_token": os.getenv("DHAN_ACCESS_TOKEN_1"),
"client_id": os.getenv("DHAN_CLIENT_ID_1"),
"last_used": datetime.min, # To track last usage for rotation
"rate_limited_until": datetime.min # To track when an account is rate-limited
},
{
"access_token": os.getenv("DHAN_ACCESS_TOKEN_2"),
"client_id": os.getenv("DHAN_CLIENT_ID_2"),
"last_used": datetime.min,
"rate_limited_until": datetime.min
}
]
self.base_url = "https://api.dhan.co/" # Verify Dhan HQ base URL
self.lock = threading.RLock() # Re-entrant: _get_available_client() may recurse while holding the lock
self.retry_after_seconds = 60 # Default retry after 60 seconds for rate limits
def _get_available_client(self):
with self.lock:
now = datetime.now()
available_clients = [
config for config in self.api_configs
if config["rate_limited_until"] <= now
]
if not available_clients:
# No clients immediately available, wait for the soonest one to become available
soonest_available_time = min(config["rate_limited_until"] for config in self.api_configs)
wait_seconds = (soonest_available_time - now).total_seconds() + 1 # Add a small buffer
print(f"All API clients rate-limited. Waiting for {wait_seconds:.2f} seconds...")
time.sleep(wait_seconds)
return self._get_available_client() # Recurse after waiting
# Simple round-robin based on last used time
available_clients.sort(key=lambda x: x["last_used"])
selected_client = available_clients[0]
selected_client["last_used"] = now
return selected_client
def _make_request(self, method, endpoint, params=None, json_data=None):
client_config = self._get_available_client()
headers = {
"access-token": client_config["access_token"],
"x-dhan-client-id": client_config["client_id"],
"Content-Type": "application/json"
}
url = f"{self.base_url}{endpoint}"
for attempt in range(3): # Retry up to 3 times
try:
if method == "GET":
response = requests.get(url, headers=headers, params=params)
elif method == "POST":
response = requests.post(url, headers=headers, json=json_data)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
if response.status_code == 429: # Rate Limit Exceeded
print(f"Rate limit hit for client {client_config["client_id"]}. Retrying...")
with self.lock:
client_config["rate_limited_until"] = datetime.now() + timedelta(seconds=self.retry_after_seconds)
time.sleep(self.retry_after_seconds) # Wait before next attempt
client_config = self._get_available_client() # Get a new (or same if now available) client
headers["access-token"] = client_config["access_token"]
headers["x-dhan-client-id"] = client_config["client_id"]
continue # Retry with potentially new client
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
return response.json()
except requests.exceptions.RequestException as e:
print(f"Request error for client {client_config["client_id"]}: {e}")
if attempt < 2: # Don't wait on last attempt
time.sleep(2 ** attempt) # Exponential backoff
else:
raise # Re-raise after all retries
raise Exception("Failed to make request after multiple retries.")
# Public methods for specific API calls
def get_instruments(self, exchange_segment):
return self._make_request("GET", "api/v1/instruments", params={"exchangeSegment": exchange_segment})
def get_market_quote(self, security_ids, exchange_segment):
return self._make_request("POST", "api/v1/marketfeed/quote", json_data={
"securityIds": security_ids,
"exchangeSegment": exchange_segment
})
def get_intraday_historical(self, security_id, from_date, to_date, interval, include_oi=False):
params = {
"securityId": security_id,
"fromDate": from_date.strftime("%Y-%m-%d"),
"toDate": to_date.strftime("%Y-%m-%d"),
"interval": interval,
"includeOI": str(include_oi).lower()
}
return self._make_request("GET", "api/v1/historical/intraday", params=params)
def get_daily_historical(self, security_id, from_date, to_date):
params = {
"securityId": security_id,
"fromDate": from_date.strftime("%Y-%m-%d"),
"toDate": to_date.strftime("%Y-%m-%d")
}
return self._make_request("GET", "api/v1/historical/daily", params=params)
def get_option_chain(self, underlying_security_id, expiry_date):
params = {
"underlyingSecurityId": underlying_security_id,
"expiryDate": expiry_date
}
return self._make_request("GET", "api/v1/optionchain", params=params)
def get_positions(self):
return self._make_request("GET", "api/v1/positions")
def get_holdings(self):
return self._make_request("GET", "api/v1/holdings")
def get_orders(self):
return self._make_request("GET", "api/v1/orders")
def get_trades(self):
return self._make_request("GET", "api/v1/trades")
# Main execution for Task 13.1 (for testing the client itself)
# if __name__ == "__main__":
# dhan_client = DhanAPIClient()
# try:
# # Test instrument fetch with the new client
# print("Testing get_instruments...")
# instruments = dhan_client.get_instruments("NSE_EQ")
# print(f"Fetched {len(instruments)} NSE_EQ instruments.")
# # Simulate rate limit by making many requests quickly
# print("Simulating rate limit...")
# for _ in range(5): # Adjust based on actual rate limits
# try:
# quote = dhan_client.get_market_quote(["1000"], "NSE_EQ") # Use a dummy securityId
# print(f"Quote fetched: {quote}")
# except Exception as e:
# print(f"Error during rate limit simulation: {e}")
# time.sleep(0.1) # Make requests fast
# except Exception as e:
# print(f"An error occurred during client testing: {e}")
```
**Expected Output (CLI):**
```
Testing get_instruments...
Fetched XXXXX NSE_EQ instruments.
Simulating rate limit...
Quote fetched: [...]
Rate limit hit for client your_primary_client_id. Retrying...
Quote fetched: [...]
Rate limit hit for client your_secondary_client_id. Retrying...
All API clients rate-limited. Waiting for YY.YY seconds...
Quote fetched: [...]
```
(The exact output will depend on actual rate limits and API responses, but it should demonstrate client rotation and rate limit handling).
**Validation (for AI):**
* **Code Review:** Ensure the `DhanAPIClient` class correctly implements:
* Loading of both API keys from `.env`.
* A mechanism to select an available API key (e.g., round-robin, least recently used, or based on rate limit status).
* Error handling for HTTP 429 (Too Many Requests) responses.
* Temporary blacklisting of rate-limited accounts.
* A backoff/retry mechanism.
* **Behavioral Test:**
* Run the test code that simulates rate limits. Observe the console output to confirm that the client switches between API keys and waits when both are rate-limited.
* Verify that API calls eventually succeed after rate limits are respected.
#### Task 13.2: Integrate dual API client into all API-fetching scripts
**Purpose:** Replace the simple `SimpleDhanAPIClient` instances in all previously developed data fetching scripts with the newly created `DhanAPIClient` to leverage the dual API strategy across the entire data pipeline.
**Core Logic for AI (Python Snippet Guidance):**
**Instructions for AI:**
* **Modify `01_fetch_instruments.py`:**
* Remove `SimpleDhanAPIClient` definition.
* Replace `api_client = SimpleDhanAPIClient(...)` with `dhan_client = DhanAPIClient()`.
* Update calls from `api_client.get_instruments(...)` to `dhan_client.get_instruments(...)`.
* **Modify `05_rest_parity.py`:**
* Remove `SimpleDhanAPIClient` definition.
* Replace `api_client = SimpleDhanAPIClient(...)` with `dhan_client = DhanAPIClient()`.
* Update calls from `api_client.get_market_quote(...)` to `dhan_client.get_market_quote(...)`.
* **Modify `06_historical_intraday.py`:**
* Remove `SimpleDhanAPIClient` definition.
* Replace `api_client = SimpleDhanAPIClient(...)` with `dhan_client = DhanAPIClient()`.
* Update calls from `api_client.get_intraday_historical(...)` to `dhan_client.get_intraday_historical(...)`.
* **Modify `07_historical_daily.py`:**
* Remove `SimpleDhanAPIClient` definition.
* Replace `api_client = SimpleDhanAPIClient(...)` with `dhan_client = DhanAPIClient()`.
* Update calls from `api_client.get_daily_historical(...)` to `dhan_client.get_daily_historical(...)`.
* **Modify `08_option_chains.py`:**
* Remove `SimpleDhanAPIClient` definition.
* Replace `api_client = SimpleDhanAPIClient(...)` with `dhan_client = DhanAPIClient()`.
* Update calls from `api_client.get_option_chain(...)` to `dhan_client.get_option_chain(...)`.
* **Crucially, remove or significantly reduce the `time.sleep(3)` calls** within `fetch_and_store_option_chain` and `fetch_all_relevant_option_chains` as the `DhanAPIClient` now handles throttling and rotation internally.
* **Modify `09_account_data.py`:**
* Remove `SimpleDhanAPIClient` definition and the two separate `api_client_1`, `api_client_2` instances.
* Replace them with a single `dhan_client = DhanAPIClient()`.
* Update calls from `api_client_1.get_positions()`, `api_client_2.get_positions()` etc., to `dhan_client.get_positions()`, `dhan_client.get_holdings()`, `dhan_client.get_orders()`, `dhan_client.get_trades()`.
* The `account_id` parameter passed to `fetch_and_store_positions`, `fetch_and_store_order_history`, and `fetch_and_store_trade_history` must now be managed explicitly. The `DhanAPIClient` exists to abstract away dual-account management for *rate limits*; it does not know which account a given request is logically "for". Account-specific data (positions, holdings, orders, trades) is therefore still fetched with two separate calls, one per account's credentials. In practice this means either passing the specific `access_token` and `client_id` into the relevant methods, or instantiating one client per account while still relying on the shared rotation logic for rate-limit handling. A sketch of this per-account pattern follows this list.
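A minimal sketch of the per-account pattern described above, for `09_account_data.py`. The `AccountPinnedClient` wrapper and the `fetch_and_store_positions` helper are assumptions for illustration; in practice the wrapper could reuse the retry/backoff machinery of `DhanAPIClient` rather than calling `requests` directly.
```python
# Hypothetical sketch: account-specific fetches with explicitly pinned credentials.
import os
import requests
from dotenv import load_dotenv

load_dotenv()

class AccountPinnedClient:
    """Thin wrapper that always sends one fixed credential pair (assumption, not part of DhanAPIClient)."""
    def __init__(self, access_token, client_id):
        self.base_url = "https://api.dhan.co/"  # Verify Dhan HQ base URL
        self.client_id = client_id
        self.headers = {
            "access-token": access_token,
            "x-dhan-client-id": client_id,
            "Content-Type": "application/json"
        }

    def get_positions(self):
        response = requests.get(f"{self.base_url}api/v1/positions", headers=self.headers)
        response.raise_for_status()
        return response.json()

accounts = [
    AccountPinnedClient(os.getenv("DHAN_ACCESS_TOKEN_1"), os.getenv("DHAN_CLIENT_ID_1")),
    AccountPinnedClient(os.getenv("DHAN_ACCESS_TOKEN_2"), os.getenv("DHAN_CLIENT_ID_2")),
]
for account_client in accounts:
    positions = account_client.get_positions()
    # fetch_and_store_positions(positions, account_id=account_client.client_id)  # assumed helper
```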
**Validation (for AI):**
* **Code Review:** Verify that all `SimpleDhanAPIClient` instances are replaced and calls are updated to use the `DhanAPIClient`.
* **Functional Test:** Run all data fetching scripts (`01_fetch_instruments.py`, `05_rest_parity.py`, `06_historical_intraday.py`, `07_historical_daily.py`, `08_option_chains.py`, `09_account_data.py`).
* **Performance Test (Conceptual):** Observe the speed of data fetching, especially for `08_option_chains.py`. It should be noticeably faster than with a single API key due to the parallel request capability (or faster sequential requests if rate limits are hit less often).
* **Error Handling Test:** Simulate rate limits (e.g., by making many rapid calls or temporarily invalidating one API key) and ensure the system continues to function by switching to the other API key.
---
## Phase 8: Monitoring and Quality Assurance
### 14. Implement Data Quality Checks
**Objective:** Ensure the integrity, accuracy, and freshness of the ingested and processed data through automated quality checks. This is crucial for maintaining reliable data for trading decisions and AI models.
**Python Script:** `13_data_quality.py`
**Instructions for AI:**
#### Task 14.1: Implement schema validation
**Purpose:** Verify that the data stored in MongoDB collections conforms to expected schemas, ensuring all required fields are present and data types are correct. This helps catch issues early in the data pipeline.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# Define expected schemas for key collections
# AI should understand that these schemas are simplified and need to be comprehensive
# based on the actual data structure from Dhan HQ API and derived data.
SCHEMAS = {
"instruments_master": {
"securityId": str,
"symbol": str,
"exchangeSegment": str,
"instrument": str
},
"live_ticks_full": {
"securityId": str,
"ts_event_ist": datetime,
"ltp": (int, float),
"volume": (int, float)
},
"historical_intraday": {
"securityId": str,
"timestamp": datetime,
"open": (int, float),
"high": (int, float),
"low": (int, float),
"close": (int, float),
"volume": (int, float)
},
"option_chain_data": {
"underlyingSecurityId": str,
"expiry": str,
"snapshot_ts_ist": datetime,
"legs": list # List of dicts, each with its own schema
}
# Add schemas for other collections like historical_daily, account_positions, etc.
}
def validate_collection_schema(db_client, collection_name, schema):
print(f"Validating schema for collection: {collection_name}...")
issues_found = []
# Limit the number of documents to check for performance in large collections
sample_docs = db_client[collection_name].find().limit(1000)
for doc in sample_docs:
doc_id = doc.get("_id", "N/A")
for field, expected_type in schema.items():
if field not in doc:
issues_found.append(f"Doc ID {doc_id}: Missing required field \'{field}\'")
elif not isinstance(doc[field], expected_type):
issues_found.append(f"Doc ID {doc_id}: Field \'{field}\' has incorrect type. Expected {expected_type}, got {type(doc[field])}")
# Special handling for nested schemas, e.g., option_chain_data.legs
if collection_name == "option_chain_data" and "legs" in doc and isinstance(doc["legs"], list):
for i, leg in enumerate(doc["legs"]):
# Define a sub-schema for option legs if needed
leg_schema = {"type": str, "strikePrice": (int, float), "ltp": (int, float)}
for leg_field, leg_expected_type in leg_schema.items():
if leg_field not in leg:
issues_found.append(f"Doc ID {doc_id}, Leg {i}: Missing required field \'{leg_field}\' in leg")
elif not isinstance(leg[leg_field], leg_expected_type):
issues_found.append(f"Doc ID {doc_id}, Leg {i}: Field \'{leg_field}\' has incorrect type. Expected {leg_expected_type}, got {type(leg[leg_field])}")
if issues_found:
print(f"Schema validation for {collection_name} completed with {len(issues_found)} issues.")
# Store issues in a dedicated collection for review
db_client.data_quality_issues.insert_one({
"timestamp": datetime.now(IST).replace(tzinfo=None),
"check_type": "schema_validation",
"collection": collection_name,
"issues": issues_found
})
else:
print(f"Schema validation for {collection_name} completed. No issues found.")
# Main execution for Task 14.1
# if __name__ == "__main__":
# db.data_quality_issues.create_index([("timestamp", 1)])
# for col_name, schema in SCHEMAS.items():
# validate_collection_schema(db, col_name, schema)
# client.close()
```
**Expected Output (CLI):**
```
Validating schema for collection: instruments_master...
Schema validation for instruments_master completed. No issues found.
Validating schema for collection: live_ticks_full...
Schema validation for live_ticks_full completed with X issues.
Doc ID 123: Missing required field 'ltp'
Doc ID 456: Field 'volume' has incorrect type. Expected (<class 'int'>, <class 'float'>), got <class 'str'>
```
**Expected Output (MongoDB - `data_quality_issues` collection):**
* Documents recording schema validation failures.
* Example document structure:
```json
{
"_id": ObjectId("..."),
"timestamp": ISODate("2025-08-08T11:30:00.000Z"),
"check_type": "schema_validation",
"collection": "live_ticks_full",
"issues": [
"Doc ID 123: Missing required field 'ltp'",
"Doc ID 456: Field 'volume' has incorrect type. Expected (, ), got "
]
}
```
**Validation (for AI):**
* **CLI Check:** Observe the validation messages. If issues are reported, inspect them.
* **MongoDB Check:**
* Query `db.data_quality_issues.count_documents({"check_type": "schema_validation"})`.
* Inspect documents to understand the types of schema violations. The AI should then suggest fixes to the data ingestion or processing logic.
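To support the MongoDB check above, a short aggregation sketch (assuming the `data_quality_issues` document shape produced by the script) can summarize how many schema issues were recorded per collection:
```python
# Minimal sketch: summarize schema-validation issues per collection.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/dhan_fno_data")  # adjust to MONGO_URI from .env
db = client.get_database()

pipeline = [
    {"$match": {"check_type": "schema_validation"}},
    {"$project": {"collection": 1, "issue_count": {"$size": "$issues"}}},
    {"$group": {"_id": "$collection", "total_issues": {"$sum": "$issue_count"}}},
    {"$sort": {"total_issues": -1}},
]
for row in db.data_quality_issues.aggregate(pipeline):
    print(f"{row['_id']}: {row['total_issues']} schema issues recorded")
```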
#### Task 14.2: Implement data freshness checks
**Purpose:** Monitor the recency of data in critical collections (e.g., `live_ticks_full`, `option_chain_data`) to ensure that real-time and frequently updated data is not stale. This is vital for timely trading decisions.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime, timedelta
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# Define freshness thresholds for collections (in seconds)
FRESHNESS_THRESHOLDS = {
"live_ticks_full": 60, # Live ticks should be no older than 60 seconds
"option_chain_data": 300 # Option chain snapshots no older than 5 minutes
# Add thresholds for other collections as needed
}
def check_data_freshness(db_client, collection_name, time_field, threshold_seconds):
print(f"Checking freshness for collection: {collection_name}...")
now_ist = datetime.now(IST).replace(tzinfo=None)
stale_threshold = now_ist - timedelta(seconds=threshold_seconds)
# Find the most recent document in the collection
latest_doc = db_client[collection_name].find().sort(time_field, -1).limit(1)
latest_doc = list(latest_doc)
if not latest_doc:
print(f"Collection {collection_name} is empty. Cannot check freshness.")
db_client.data_quality_issues.insert_one({
"timestamp": now_ist,
"check_type": "data_freshness",
"collection": collection_name,
"status": "empty",
"message": "Collection is empty, no data to check freshness."
})
return
latest_timestamp = latest_doc[0].get(time_field)
if not latest_timestamp:
print(f"Warning: No timestamp field \'{time_field}\' found in latest document of {collection_name}.")
db_client.data_quality_issues.insert_one({
"timestamp": now_ist,
"check_type": "data_freshness",
"collection": collection_name,
"status": "no_timestamp_field",
"message": f"Latest document has no \'{time_field}\' field."
})
return
if latest_timestamp < stale_threshold:
stale_duration = now_ist - latest_timestamp
message = f"Collection {collection_name} is STALE. Latest data is {stale_duration.total_seconds():.2f} seconds old (threshold: {threshold_seconds}s)."
print(message)
db_client.data_quality_issues.insert_one({
"timestamp": now_ist,
"check_type": "data_freshness",
"collection": collection_name,
"status": "stale",
"latest_data_timestamp": latest_timestamp,
"stale_duration_seconds": stale_duration.total_seconds(),
"threshold_seconds": threshold_seconds,
"message": message
})
else:
fresh_duration = now_ist - latest_timestamp
print(f"Collection {collection_name} is FRESH. Latest data is {fresh_duration.total_seconds():.2f} seconds old.")
# Optionally, log success for auditing
# db_client.data_quality_issues.insert_one({
# "timestamp": now_ist,
# "check_type": "data_freshness",
# "collection": collection_name,
# "status": "fresh",
# "latest_data_timestamp": latest_timestamp,
# "fresh_duration_seconds": fresh_duration.total_seconds(),
# "threshold_seconds": threshold_seconds
# })
# Main execution for Task 14.2
# This would typically be run by a cron job or a separate monitoring thread.
# if __name__ == "__main__":
# db.data_quality_issues.create_index([("timestamp", 1)])
# check_data_freshness(db, "live_ticks_full", "ts_event_ist", FRESHNESS_THRESHOLDS["live_ticks_full"])
# check_data_freshness(db, "option_chain_data", "snapshot_ts_ist", FRESHNESS_THRESHOLDS["option_chain_data"])
# client.close()
```
**Expected Output (CLI):**
```
Checking freshness for collection: live_ticks_full...
Collection live_ticks_full is FRESH. Latest data is 15.34 seconds old.
Checking freshness for collection: option_chain_data...
Collection option_chain_data is STALE. Latest data is 350.12 seconds old (threshold: 300s).
```
**Expected Output (MongoDB - `data_quality_issues` collection):**
* Documents recording data freshness status, especially for stale data.
* Example document structure for stale data:
```json
{
"_id": ObjectId("..."),
"timestamp": ISODate("2025-08-08T11:35:00.000Z"),
"check_type": "data_freshness",
"collection": "option_chain_data",
"status": "stale",
"latest_data_timestamp": ISODate("2025-08-08T11:29:10.000Z"),
"stale_duration_seconds": 350.0,
"threshold_seconds": 300,
"message": "Collection option_chain_data is STALE. Latest data is 350.00 seconds old (threshold: 300s)."
}
```
**Validation (for AI):**
* **CLI Check:** Observe the freshness status messages.
* **MongoDB Check:**
* Query `db.data_quality_issues.count_documents({"check_type": "data_freshness"})`.
* Inspect documents to identify stale collections. The AI should then investigate the corresponding data ingestion process (e.g., WebSocket client, option chain fetcher) if data is consistently stale.
---
### 15. Implement Logging and Monitoring
**Objective:** Establish comprehensive logging and monitoring mechanisms to gain operational visibility into the data pipeline, track API calls, errors, and performance metrics, and store them for auditing and debugging.
**Python Script:** `14_monitoring.py`
**Instructions for AI:**
#### Task 15.1: Set up structured logging
**Purpose:** Implement a structured logging system that captures detailed information about API requests, responses, errors, and key events within the data fetching pipeline. This log data will be stored in a dedicated MongoDB collection for easy querying and analysis.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import logging
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
from pymongo import MongoClient
from datetime import datetime
import pytz
# Load environment variables
load_dotenv()
# Initialize MongoDB client
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
# --- Custom MongoDB Handler for Logging ---
class MongoHandler(logging.Handler):
def __init__(self, collection, level=logging.NOTSET):
super().__init__(level)
self.collection = collection
def emit(self, record):
log_entry = self.format(record)
try:
self.collection.insert_one(log_entry)
except Exception as e:
print(f"Error inserting log to MongoDB: {e}")
def format(self, record):
# Convert log record to a dictionary for MongoDB
log_data = {
"timestamp": datetime.fromtimestamp(record.created, tz=IST).replace(tzinfo=None),
"level": record.levelname,
"module": record.name,
"function": record.funcName,
"line": record.lineno,
"message": record.getMessage(),
"process": record.processName,
"thread": record.threadName,
"extra": {}
}
# Add any extra attributes passed to the logger
for key, value in record.__dict__.items():
if key not in ["name", "levelname", "pathname", "filename", "lineno", "funcName",
"created", "asctime", "msecs", "relativeCreated", "thread",
"threadName", "processName", "process", "message", "args",
"exc_info", "exc_text", "stack_info", "levelno", "msg", "_stack",
"_is_free_to_reuse_for_once_at_a_time_log_entry", "_log_record_handler_cache"]:
log_data["extra"][key] = value
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
if record.stack_info:
log_data["stack_info"] = record.stack_info
return log_data
# --- Logger Setup ---
def setup_logging():
logger = logging.getLogger("dhan_hq_pipeline")
logger.setLevel(logging.INFO) # Set base logging level
# Console Handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(console_handler)
# File Handler (for local logs)
log_file = "dhan_hq_pipeline.log"
file_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5) # 10MB, 5 backups
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(file_handler)
# MongoDB Handler
mongo_collection = db.jobs_audit # Dedicated collection for logs
mongo_handler = MongoHandler(mongo_collection)
logger.addHandler(mongo_handler)
print("Logging system initialized.")
return logger
# Example usage:
# logger = setup_logging()
# logger.info("Application started.")
# logger.debug("This is a debug message.") # Won't show with INFO level
# logger.warning("A warning occurred.")
# logger.error("An error occurred!", extra={"api_endpoint": "/instruments", "status_code": 500})
# try:
# 1 / 0
# except ZeroDivisionError:
# logger.exception("Division by zero error!")
# Main execution for Task 15.1
# if __name__ == "__main__":
# logger = setup_logging()
# logger.info("Testing structured logging setup.")
# logger.error("Simulating an error with extra data.", extra={"component": "WebSocket", "reason": "Connection lost"})
# try:
# raise ValueError("Test exception")
# except ValueError:
# logger.exception("Caught a test exception.")
# client.close()
```
**Expected Output (CLI):**
```
Logging system initialized.
2025-08-08 11:40:00,000 - dhan_hq_pipeline - INFO - Testing structured logging setup.
2025-08-08 11:40:00,001 - dhan_hq_pipeline - ERROR - Simulating an error with extra data.
2025-08-08 11:40:00,002 - dhan_hq_pipeline - ERROR - Caught a test exception.
Traceback (most recent call last):
File "", line X, in
File "", line Y, in
ValueError: Test exception
```
(The exact timestamps and line numbers will vary).
**Expected Output (MongoDB - `jobs_audit` collection):**
* Documents representing structured log entries.
* Example log entry for an error with extra data:
```json
{
"_id": ObjectId("..."),
"timestamp": ISODate("2025-08-08T11:40:00.001Z"),
"level": "ERROR",
"module": "dhan_hq_pipeline",
"function": "",
"line": 123,
"message": "Simulating an error with extra data.",
"process": "MainProcess",
"thread": "MainThread",
"extra": {
"component": "WebSocket",
"reason": "Connection lost"
}
}
```
* Example log entry for an exception:
```json
{
"_id": ObjectId("..."),
"timestamp": ISODate("2025-08-08T11:40:00.002Z"),
"level": "ERROR",
"module": "dhan_hq_pipeline",
"function": "",
"line": 125,
"message": "Caught a test exception.",
"process": "MainProcess",
"thread": "MainThread",
"exception": "Traceback (most recent call last):...ValueError: Test exception\n",
"stack_info": "...", // Stack trace if available
"extra": {}
}
```
**Validation (for AI):**
* **CLI Check:** Verify that log messages appear in the console with the correct format and level.
* **File Check:** Confirm that `dhan_hq_pipeline.log` file is created and contains the log messages.
* **MongoDB Check:**
* Query `db.jobs_audit.count_documents({})` to ensure log entries are being inserted.
* Inspect sample documents to verify that `timestamp`, `level`, `module`, `message`, and any `extra` fields are correctly populated.
* Confirm that exceptions are captured with `exception` and `stack_info` fields.
* **Integration:** The AI should understand that this `setup_logging()` function and the `logger` instance should be integrated into all other Python scripts (`01_fetch_instruments.py`, `04_websocket_client.py`, etc.) to log their operations, errors, and key events. For example, in `01_fetch_instruments.py`, after `logger = setup_logging()`, replace `print(...)` statements with `logger.info(...)`, `logger.error(...)`, etc.
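A minimal sketch of that integration for `01_fetch_instruments.py`; because the module file name starts with a digit, `importlib` is used here to load it (alternatively, `setup_logging` could live in a normally named shared module):
```python
# Minimal sketch (assumption): reuse the structured logger inside 01_fetch_instruments.py.
import importlib.util

spec = importlib.util.spec_from_file_location("monitoring", "14_monitoring.py")
monitoring = importlib.util.module_from_spec(spec)
spec.loader.exec_module(monitoring)

logger = monitoring.setup_logging()
logger.info("Starting instrument master fetch", extra={"script": "01_fetch_instruments.py"})
try:
    # instruments = dhan_client.get_instruments("NSE_EQ")  # existing call in the script
    pass
except Exception:
    logger.exception("Instrument fetch failed", extra={"script": "01_fetch_instruments.py"})
```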
---
## Final Integration and Testing
### 16. Create Master Orchestration Script
**Objective:** Develop a central orchestration script that coordinates the execution of all data fetching, processing, and quality assurance tasks. This script will manage the overall data pipeline, ensuring tasks run in the correct sequence and at appropriate times (e.g., daily, during market hours, post-market).
**Python Script:** `15_master_orchestrator.py`
**Instructions for AI:**
#### Task 16.1: Create daily pre-market job
**Purpose:** Automate the execution of tasks that need to run once daily before market open, such as refreshing the instrument master, updating trading universes, and computing derivative views. This ensures that the system is operating with the latest foundational data.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import sys
import subprocess
from datetime import datetime
import pytz
import time
import logging
# Assume setup_logging is available from 14_monitoring.py
# For a real orchestrator, you'd import it or define it here.
# For now, a placeholder:
class TempLogger:
def info(self, msg): print(f"INFO: {msg}")
def error(self, msg): print(f"ERROR: {msg}")
def exception(self, msg): print(f"EXCEPTION: {msg}")
logger = TempLogger()
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def run_script(script_name):
logger.info(f"Running script: {script_name}")
try:
# Assuming scripts are in the same directory or accessible via PATH
# Use subprocess.run to execute Python scripts
result = subprocess.run([sys.executable, script_name], capture_output=True, text=True, check=True)
logger.info(f"Script {script_name} completed successfully.\nOutput:\n{result.stdout}")
except subprocess.CalledProcessError as e:
logger.error(f"Script {script_name} failed with error code {e.returncode}.\nStdout:\n{e.stdout}\nStderr:\n{e.stderr}")
logger.exception(f"Exception during {script_name} execution.")
except Exception as e:
logger.exception(f"Unexpected error running {script_name}.")
def daily_pre_market_job():
logger.info("Starting daily pre-market job...")
# Task 2: Fetch Instrument Master Data
run_script("01_fetch_instruments.py")
# Task 3: Build Trading Universes
run_script("02_build_universes.py")
# Task 4: Compute Derivative Views
run_script("03_derivative_views.py")
logger.info("Daily pre-market job completed.")
# Main execution for Task 16.1
# This job would typically be scheduled to run once daily before market open (e.g., 08:00 IST)
# if __name__ == "__main__":
# # Example of how to run this job
# daily_pre_market_job()
```
**Expected Output (CLI):**
```
INFO: Starting daily pre-market job...
INFO: Running script: 01_fetch_instruments.py
INFO: Script 01_fetch_instruments.py completed successfully.
Output:
Fetching NSE_EQ instruments...
Successfully fetched and stored XXXXX NSE_EQ instruments.
Fetching NSE_FNO instruments...
Successfully fetched and stored YYYYY NSE_FNO instruments.
Creating MongoDB indexes...
Index on securityId created.
Index on underlyingSecurityId created.
Compound index on (instrument, symbol) created.
INFO: Running script: 02_build_universes.py
INFO: Script 02_build_universes.py completed successfully.
Output:
Building ALL_INDEX_CONSTITUENT_STOCKS universe...
ALL_INDEX_CONSTITUENT_STOCKS universe built with XXX members.
Building MAJOR_INDICES universe...
MAJOR_INDICES universe built with 3 members.
INFO: Running script: 03_derivative_views.py
INFO: Script 03_derivative_views.py completed successfully.
Output:
Mapping Index Futures...
Mapped 3 index futures.
Mapping Index Options...
Mapped 3 index options.
Mapping Stock Futures...
Mapped XXX stock futures.
Mapping Stock Options...
Mapped YYY stock options.
INFO: Daily pre-market job completed.
```
**Validation (for AI):**
* **CLI Check:** Verify that all scripts within the pre-market job run successfully and report completion.
* **MongoDB Check:** After running, check the `instruments_master`, `universes`, and `derivative_views` collections to ensure they are populated with the latest data and reflect the expected structure and content as per Tasks 2, 3, and 4.
* **Error Handling:** Simulate a failure in one of the sub-scripts (e.g., by making an API call fail) and verify that the orchestrator logs the error appropriately and potentially stops or continues based on defined error handling strategy.
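The scheduling mechanism is left open. One simple approach, sketched below under the assumption that the orchestrator runs as a long-lived process and that `daily_pre_market_job` and `IST` are defined as above, is a loop that fires the job once per day at a fixed pre-market time (08:00 IST is used as an example); a cron entry invoking `15_master_orchestrator.py` would work equally well.
```python
# Minimal sketch: run daily_pre_market_job once per day at 08:00 IST.
# Equivalent crontab entry (server clock in IST): 0 8 * * 1-5 /path/to/.venv/bin/python 15_master_orchestrator.py
import time
from datetime import datetime

def run_pre_market_daily(run_at_hour=8, run_at_minute=0):
    last_run_date = None
    while True:
        now = datetime.now(IST)
        due = now.replace(hour=run_at_hour, minute=run_at_minute, second=0, microsecond=0)
        if now >= due and last_run_date != now.date():
            daily_pre_market_job()
            last_run_date = now.date()
        time.sleep(30)  # re-check every 30 seconds
```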
#### Task 16.2: Create real-time trading session job
**Purpose:** Manage the execution of real-time data ingestion and periodic updates during market hours. This job will primarily start and monitor the WebSocket client and periodically fetch option chain and account data.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import sys
import subprocess
from datetime import datetime, time as dt_time
import pytz
import time
import logging
# Assume logger and run_script are available from Task 16.1
# Assume is_market_open is available from Task 5.4
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def is_market_open():
now_ist = datetime.now(IST).time()
market_open_time = dt_time(9, 15)
market_close_time = dt_time(15, 30)
return market_open_time <= now_ist <= market_close_time
def real_time_trading_session_job():
logger.info("Starting real-time trading session job...")
if not is_market_open():
logger.info("Market is currently closed. Real-time job will wait for market open.")
# AI should implement a loop here to wait until market opens
while not is_market_open():
time.sleep(60) # Check every minute
logger.info("Market is now open. Resuming real-time job.")
# Start WebSocket client (04_websocket_client.py should be designed to run continuously)
# This script should be run in a separate process or thread to not block the orchestrator
logger.info("Starting WebSocket client (04_websocket_client.py) in background...")
websocket_process = subprocess.Popen([sys.executable, "04_websocket_client.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
logger.info(f"WebSocket client process started with PID: {websocket_process.pid}")
try:
while is_market_open():
logger.info("Market is open. Performing periodic tasks...")
# Periodic option chain updates (e.g., every 5 minutes)
run_script("08_option_chains.py") # This script should handle its own throttling
# Account data refresh (e.g., every 10 minutes)
run_script("09_account_data.py")
time.sleep(300) # Wait for 5 minutes before next cycle
logger.info("Market closed. Stopping real-time trading session job.")
except KeyboardInterrupt:
logger.info("Real-time job interrupted by user.")
finally:
# Terminate WebSocket process if still running
if websocket_process.poll() is None: # Check if process is still running
logger.info("Terminating WebSocket client process...")
websocket_process.terminate()
try:
websocket_process.wait(timeout=10) # Wait for process to terminate
except subprocess.TimeoutExpired:
logger.error("WebSocket client process did not terminate gracefully. Killing it.")
websocket_process.kill()
logger.info("Real-time trading session job completed/terminated.")
# Main execution for Task 16.2
# This job would typically be run as a long-running process during market hours.
# if __name__ == "__main__":
# real_time_trading_session_job()
```
**Expected Output (CLI - during market hours):**
```
INFO: Starting real-time trading session job...
INFO: Starting WebSocket client (04_websocket_client.py) in background...
INFO: WebSocket client process started with PID: XXXXX
INFO: Market is open. Performing periodic tasks...
INFO: Running script: 08_option_chains.py
... (output from 08_option_chains.py)
INFO: Running script: 09_account_data.py
... (output from 09_account_data.py)
INFO: Market is open. Performing periodic tasks...
... (repeats every 5 minutes)
```
**Validation (for AI):**
* **CLI Check:** Verify that the WebSocket client starts in the background and periodic tasks (option chain, account data) run at the specified intervals.
* **MongoDB Check:** Confirm that `live_ticks_full`, `option_chain_data`, `account_positions`, `order_history`, and `trade_history` collections are continuously updated during market hours.
* **Process Monitoring:** Verify that the WebSocket process is running and terminates gracefully when the market closes or the orchestrator is stopped.
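For the process-monitoring point above, one option (a sketch under the assumption that `is_market_open()` and `logger` are defined as in the orchestrator) is to restart the WebSocket client automatically if it exits mid-session:
```python
# Minimal sketch: restart 04_websocket_client.py if it dies while the market is open.
import subprocess
import sys
import time

def supervise_websocket_client(poll_seconds=30):
    process = subprocess.Popen([sys.executable, "04_websocket_client.py"])
    while is_market_open():
        if process.poll() is not None:  # process has exited unexpectedly
            logger.error(f"WebSocket client exited with code {process.returncode}; restarting...")
            process = subprocess.Popen([sys.executable, "04_websocket_client.py"])
        time.sleep(poll_seconds)
    if process.poll() is None:
        process.terminate()
```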
#### Task 16.3: Create post-market analysis job
**Purpose:** Execute tasks that need to run after market close, such as updating historical data, calculating technical indicators, and generating data quality reports. This ensures that all derived data is up-to-date for overnight analysis and next-day preparation.
**Core Logic for AI (Python Snippet Guidance):**
```python
import os
import sys
import subprocess
from datetime import datetime, time as dt_time
import pytz
import time
import logging
# Assume logger and run_script are available from Task 16.1
IST = pytz.timezone(os.getenv("TIMEZONE", "Asia/Kolkata"))
def is_market_closed():
now_ist = datetime.now(IST).time()
# Use a buffer after the 15:30 close so that all real-time data has settled
post_market_start = dt_time(15, 45)
return post_market_start <= now_ist <= dt_time(23, 59) # Runs from 15:45 IST until end of day
def post_market_analysis_job():
logger.info("Starting post-market analysis job...")
if not is_market_closed():
logger.info("Market is not yet closed or it's too late. Post-market job will wait.")
# AI should implement a loop here to wait until market closes and buffer time passes
while not is_market_closed():
time.sleep(60) # Check every minute
logger.info("Market is now closed. Resuming post-market job.")
# Task 7: Fetch Historical Intraday Data
run_script("06_historical_intraday.py")
# Task 8: Fetch Daily Historical Data
run_script("07_historical_daily.py")
# Task 11: Calculate Technical Indicators
run_script("10_technical_indicators.py")
# Task 12: Calculate Option Greeks
run_script("11_option_greeks.py")
# Task 14: Implement Data Quality Checks
run_script("13_data_quality.py")
# Task 15: Implement Logging and Monitoring (already integrated into other scripts)
logger.info("Post-market analysis job completed.")
# Main execution for Task 16.3
# This job would typically be scheduled to run once daily after market close (e.g., 16:00 IST)
# if __name__ == "__main__":
# post_market_analysis_job()
```
**Expected Output (CLI):**
```
INFO: Starting post-market analysis job...
INFO: Running script: 06_historical_intraday.py
... (output from 06_historical_intraday.py)
INFO: Running script: 07_historical_daily.py
... (output from 07_historical_daily.py)
INFO: Running script: 10_technical_indicators.py
... (output from 10_technical_indicators.py)
INFO: Running script: 11_option_greeks.py
... (output from 11_option_greeks.py)
INFO: Running script: 13_data_quality.py
... (output from 13_data_quality.py)
INFO: Post-market analysis job completed.
```
**Validation (for AI):**
* **CLI Check:** Verify that all scripts within the post-market job run successfully and report completion.
* **MongoDB Check:** After running, check the `historical_intraday`, `historical_daily`, `option_chain_data` (for updated Greeks), and `data_quality_issues` collections to ensure they are updated and reflect the expected content.
* **Scheduling:** Ensure the job runs only after market close and completes before the next trading day begins.
---
## Success Criteria for Each Phase
This section outlines the expected outcomes and validation points for each phase. An AI model should use these criteria to determine if a phase is successfully completed before proceeding to the next.
**Phase 1: Foundation Data (Tasks 2, 3, 4) Complete When:**
* **Task 2 (Fetch Instrument Master Data):**
* `instruments_master` collection in MongoDB contains all NSE_EQ and NSE_FNO instruments.
* `db.instruments_master.count_documents({"exchangeSegment": "NSE_EQ"})` returns > 10000 documents.
* `db.instruments_master.count_documents({"exchangeSegment": "NSE_FNO"})` returns > 10000 documents.
* Unique index on `securityId` and other specified indexes are created successfully.
* **Task 3 (Build Trading Universes):**
* `universes` collection contains documents with `_id: "ALL_INDEX_CONSTITUENT_STOCKS"` and `_id: "MAJOR_INDICES"`.
* `ALL_INDEX_CONSTITUENT_STOCKS` universe contains `securityId`s for all unique constituent stocks from Nifty 50, Bank Nifty, and Fin Nifty (typically 100-150 members).
* `MAJOR_INDICES` universe contains `securityId`s for NIFTY, BANKNIFTY, and FINNIFTY (3 members).
* **Task 4 (Compute Derivative Views):**
* `derivative_views` collection contains documents with `_id: "INDEX_FUTURES"`, `_id: "INDEX_OPTIONS"`, `_id: "STOCK_FUTURES"`, and `_id: "STOCK_OPTIONS"`.
* Each derivative view contains mappings for current and next month futures, and current/next weekly options for all relevant indices and F&O stocks.
* All `securityId`s referenced in derivative views exist in `instruments_master`.
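These Phase 1 criteria can be checked mechanically. A minimal sketch follows; the `members` field name on universe documents is an assumption and should match whatever Task 3 actually stores.
```python
# Minimal sketch: Phase 1 acceptance checks against MongoDB.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/dhan_fno_data")  # adjust to MONGO_URI from .env
db = client.get_database()

eq_count = db.instruments_master.count_documents({"exchangeSegment": "NSE_EQ"})
fno_count = db.instruments_master.count_documents({"exchangeSegment": "NSE_FNO"})
assert eq_count > 10000, f"Expected >10000 NSE_EQ instruments, found {eq_count}"
assert fno_count > 10000, f"Expected >10000 NSE_FNO instruments, found {fno_count}"

stocks = db.universes.find_one({"_id": "ALL_INDEX_CONSTITUENT_STOCKS"})
indices = db.universes.find_one({"_id": "MAJOR_INDICES"})
assert stocks is not None and len(stocks.get("members", [])) >= 100  # typically 100-150 constituents
assert indices is not None and len(indices.get("members", [])) == 3

for view_id in ["INDEX_FUTURES", "INDEX_OPTIONS", "STOCK_FUTURES", "STOCK_OPTIONS"]:
    assert db.derivative_views.find_one({"_id": view_id}) is not None, f"Missing derivative view {view_id}"

print("Phase 1 foundation data checks passed.")
```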
**Phase 2: Real-Time Data Ingestion (Tasks 5, 6) Complete When:**
* **Task 5 (Implement WebSocket Client for Live Data):**
* The WebSocket client successfully connects to Dhan HQ API during market hours.
* The comprehensive subscription list (from Task 5.1) is successfully sent to the WebSocket.
* `live_ticks_full` collection in MongoDB accumulates tick-by-tick data for all subscribed instruments.
* `openInterest` is present for F&O instruments and absent/null for equities in `live_ticks_full`.
* Market hour gating correctly prevents WebSocket connection outside 09:15-15:30 IST.
* **Task 6 (Implement REST Parity Checks):**
* Periodic REST quote fetches complete without API errors.
* `ws_health` collection records parity check results.
* No significant discrepancies (LTP, OI) are consistently reported between WebSocket and REST data for key instruments.
**Phase 3: Historical Data for Context and Analysis (Tasks 7, 8) Complete When:**
* **Task 7 (Fetch Historical Intraday Data):**
* `historical_intraday` collection contains 1-minute and 5-minute OHLCV data for all instruments in the comprehensive subscription list.
* Data covers the specified lookback periods (e.g., last 5 days for 1-min, last 30 days for 5-min).
* `openInterest` is present for F&O instruments and absent/null for equities.
* No duplicate records exist for `(securityId, timestamp)` pairs.
* **Task 8 (Fetch Daily Historical Data):**
* `historical_daily` collection contains daily OHLCV data for all instruments in the comprehensive subscription list.
* Data covers the specified lookback period (e.g., last 90 days).
* No duplicate records exist for `(securityId, date)` pairs.
**Phase 4: Option Chain Data (Task 9) Complete When:**
* **Task 9 (Implement Option Chain Fetching):**
* `derivative_views` collection contains `_id: "DISCOVERED_EXPIRES"` with relevant weekly and monthly expiries for all major indices and F&O stocks.
* `option_chain_data` collection contains comprehensive option chain snapshots for all identified underlying assets and their relevant expiries.
* Each option chain snapshot includes all strike prices, along with LTP, IV, Greeks (if provided by API), OI, volume, and best bid/ask.
* The fetching process respects API rate limits, potentially leveraging dual API accounts for increased throughput.
**Phase 5: Account and Position Management (Task 10) Complete When:**
* **Task 10 (Implement Account Data Fetching):**
* `account_positions` collection contains current positions and holdings for both primary and secondary Dhan accounts, tagged with `account_id`.
* `order_history` collection contains historical orders for both accounts, tagged with `account_id`.
* `trade_history` collection contains historical trades for both accounts, tagged with `account_id`.
* Data is correctly parsed and stored with relevant fields (e.g., quantity, price, status, timestamp).
**Phase 6: Derived Data and Analytics (Tasks 11, 12) Complete When:**
* **Task 11 (Calculate Technical Indicators):**
* `historical_intraday` and `historical_daily` collections are updated with calculated basic (SMA, EMA, RSI) and advanced (MACD, Bollinger Bands, ADX, Stochastic) technical indicators for all relevant instruments.
* Indicator fields are correctly named and populated, with `NaN` values handled gracefully.
* **Task 12 (Calculate Option Greeks):**
* `option_chain_data` collection is updated with `calculated_` Greeks (Delta, Gamma, Theta, Vega, Rho) for each option leg.
* Calculated Greeks are consistent with market data and Black-Scholes model assumptions.
* Greeks are calculated only for options with valid IV and LTP.
**Phase 7: Dual API Implementation and Optimization (Task 13) Complete When:**
* **Task 13 (Implement Dual API Strategy):**
* `DhanAPIClient` class is fully implemented, managing two API accounts.
* The client successfully rotates between API keys for requests.
* Rate limit (HTTP 429) errors are handled gracefully, with temporary blacklisting and backoff/retry mechanisms.
* All API-fetching scripts (`01_fetch_instruments.py`, `05_rest_parity.py`, `06_historical_intraday.py`, `07_historical_daily.py`, `08_option_chains.py`, `09_account_data.py`) are successfully refactored to use the `DhanAPIClient`.
* Increased data throughput is observed, especially for `08_option_chains.py`.
**Phase 8: Monitoring and Quality Assurance (Tasks 14, 15) Complete When:**
* **Task 14 (Implement Data Quality Checks):**
* Schema validation runs for key collections (`instruments_master`, `live_ticks_full`, `historical_intraday`, `option_chain_data`, etc.).
* `data_quality_issues` collection records any schema violations or data type mismatches.
* Data freshness checks run for critical collections (`live_ticks_full`, `option_chain_data`).
* `data_quality_issues` collection records any stale data alerts.
* **Task 15 (Implement Logging and Monitoring):**
* A structured logging system is implemented and integrated across all scripts.
* `jobs_audit` collection in MongoDB receives detailed log entries for API calls, errors, and key events.
* Logs contain `timestamp`, `level`, `module`, `message`, and `extra` fields as expected.
**Final Integration and Testing (Task 16) Complete When:**
* **Task 16 (Create Master Orchestration Script):**
* `15_master_orchestrator.py` successfully runs the daily pre-market job, executing `01_fetch_instruments.py`, `02_build_universes.py`, and `03_derivative_views.py`.
* The real-time trading session job successfully starts and monitors `04_websocket_client.py` in the background.
* Periodic execution of `08_option_chains.py` and `09_account_data.py` occurs during market hours.
* The post-market analysis job successfully executes `06_historical_intraday.py`, `07_historical_daily.py`, `10_technical_indicators.py`, `11_option_greeks.py`, and `13_data_quality.py`.
* All jobs complete without unhandled exceptions, and their outputs are as expected in MongoDB.
* The entire data pipeline runs autonomously and reliably for a full trading day cycle.