Version: 1.0
Last Updated: November 2025
Author: Anand Mohan Singh
Repository: https://github.com/anandDev77/stock-sentiment-analysis
Target Audience: ML Engineers, Data Scientists, Software Developers, and ML Enthusiasts
Imagine you’re an investor trying to understand how the market feels about Apple’s stock. You could:
This application does all of that automatically in seconds:
Think of it as having a team of financial analysts working 24/7, reading every news article from multiple sources, remembering everything in a searchable database, and providing you with a comprehensive sentiment report that considers context from related articles.
This application demonstrates:
| Technology | Version | Purpose | Documentation |
|---|---|---|---|
| Python | 3.8+ | Programming language | Python Docs |
| Streamlit | Latest | Web framework for dashboard | Streamlit Docs |
| FastAPI | Latest | REST API framework | FastAPI Docs |
| Azure OpenAI | GPT-4 | Large Language Model for sentiment analysis | Azure OpenAI Docs |
| Azure AI Search | Latest | Vector database and search service (optional) | Azure AI Search Docs |
| Redis | 7.0+ | In-memory cache | Redis Docs |
| yfinance | Latest | Stock market data API (primary source) | yfinance Docs |
| Library | Purpose | Link |
|---|---|---|
| openai | Azure OpenAI SDK | OpenAI Python SDK |
| numpy | Numerical computations (cosine similarity, vector operations) | NumPy Docs |
| azure-search-documents | Azure AI Search SDK | Azure Search SDK |
| Source | Purpose | API/Library | Rate Limits |
|---|---|---|---|
| yfinance | Primary source - Stock prices, company info, news | yfinance library | None (public API) |
| Alpha Vantage | Company news | REST API | 500 calls/day (free tier) |
| Finnhub | Company news | REST API | 60 calls/minute (free tier) |
| Reddit | Social media sentiment | PRAW library | 60 requests/minute |
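The per-source limits in the table above can be respected on the client side. The following is a minimal sketch of a sliding-window limiter, not code from the repository: the class name `SlidingWindowLimiter` and its injectable `clock` parameter are hypothetical, introduced only for illustration.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` in any `period` seconds (hypothetical helper)."""

    def __init__(self, max_calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_calls = max_calls
        self.period = period
        self.clock = clock
        self._calls: Deque[float] = deque()

    def try_acquire(self) -> bool:
        """Record the call and return True if it fits in the current window."""
        now = self.clock()
        # Drop timestamps that have left the window.
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False
```

A limiter such as `SlidingWindowLimiter(60, 60.0)` would match a 60 calls/minute limit; a caller can skip or delay the request when `try_acquire()` returns False.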
Before reading this documentation, you should understand:
Recommended reading before diving deep:
For Complete Beginners:
For Experienced ML Practitioners:
The application follows a layered, modular architecture with clear separation of concerns and an API-driven design pattern.
This comprehensive diagram shows the entire application architecture with all components, data flows, and external dependencies:
Key Architectural Patterns:
The application is built with an API-driven architecture where the Streamlit frontend communicates exclusively with a FastAPI backend, which in turn orchestrates all service interactions.
This diagram shows how components interact during a typical request:
The application uses a clean API-driven architecture where the frontend never directly calls services:
The application can be deployed as separate services:
The application follows a modular, layered architecture with clear separation of concerns. This section documents every file in the codebase with its purpose, key functions, and code snippets.
src/stock_sentiment/
├── app.py # Main Streamlit orchestrator (entry point)
├── presentation/ # Presentation Layer (Frontend)
│ ├── styles.py # Custom CSS styling
│ ├── initialization.py # App setup, service initialization
│ ├── data_loader.py # Data loading orchestration (API-driven)
│ ├── api_client.py # HTTP client for FastAPI
│ ├── components/ # Reusable UI components
│ │ ├── sidebar.py # Sidebar with filters, settings
│ │ └── empty_state.py # Empty state component
│ └── tabs/ # Tab modules
│ ├── overview_tab.py # Overview dashboard
│ ├── price_analysis_tab.py # Price charts and analysis
│ ├── news_sentiment_tab.py # News and sentiment display
│ ├── technical_analysis_tab.py # Technical indicators
│ ├── ai_insights_tab.py # AI-generated insights
│ └── comparison_tab.py # Multi-stock comparison
├── api/ # API Layer (FastAPI Backend)
│ ├── main.py # FastAPI application
│ ├── dependencies.py # Dependency injection
│ ├── routes/ # API route handlers
│ │ ├── sentiment.py # Sentiment endpoints
│ │ ├── price.py # Price history endpoints
│ │ ├── comparison.py # Comparison endpoints
│ │ ├── system.py # System status endpoints
│ │ └── cache.py # Cache management endpoints
│ └── models/ # API response models
│ └── response.py # Pydantic response models
├── services/ # Service Layer (Business Logic)
│ ├── orchestrator.py # Core orchestration logic
│ ├── collector.py # Multi-source data collection
│ ├── sentiment.py # AI sentiment analysis
│ ├── rag.py # RAG service with hybrid search
│ ├── cache.py # Redis caching
│ └── vector_db.py # Azure AI Search integration
├── config/ # Configuration
│ └── settings.py # Pydantic settings management
├── models/ # Data Models
│ ├── sentiment.py # SentimentScores model
│ └── stock.py # StockData model
└── utils/ # Utilities
├── logger.py # Logging configuration
├── retry.py # Retry logic with exponential backoff
├── circuit_breaker.py # Circuit breaker pattern
├── preprocessing.py # Text preprocessing
├── validators.py # Input validation
└── ui_helpers.py # UI helper functions
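The tree lists `utils/retry.py` as "retry logic with exponential backoff", but its contents are not shown here. As a rough illustration of the technique only, a retry helper might look like the sketch below; the function name `retry_with_backoff` and all of its parameters are assumptions, not the repository's API.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    factor: float = 2.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, waiting base_delay, base_delay*factor, ... between failures."""
    for attempt in range(max_attempts):
        try:
            return func()
        except exceptions:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (factor ** attempt))
    raise RuntimeError("max_attempts must be at least 1")
```

Passing `sleep` as a parameter keeps the helper testable without real waiting.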
This diagram shows how different components interact and depend on each other:
Key Relationships:
app.py
Purpose: Thin orchestrator that coordinates the Streamlit application flow. This is the entry point when running streamlit run app.py.
Key Responsibilities:
Key Code:
# Setup application
setup_app()
apply_custom_styles()

# Initialize settings and services
settings = initialize_settings()
api_client, redis_cache, rag_service, collector, analyzer = initialize_services(settings)
initialize_session_state()

# Render sidebar and get selected symbol
symbol = render_sidebar(redis_cache, rag_service, analyzer, settings, api_client)

# Load data if button clicked
if st.session_state.load_data and symbol:
    if api_client and settings.app.api_enabled:
        load_stock_data(symbol, api_client, settings)
    # ... error handling ...

# Render tabs based on data availability
if data is None:
    render_empty_state()
else:
    with tab1:
        render_overview_tab(data, news_sentiments, social_sentiments, current_symbol)
    # ... other tabs ...
Dependencies:
Used By: Streamlit runtime (entry point)
The presentation layer handles all user interface concerns and communicates exclusively with the API layer.
presentation/initialization.py
Purpose: Centralized initialization of all application components, including settings, services, and session state.
Key Responsibilities:
Key Functions/Methods:
initialize_settings()
Purpose: Load and validate application settings from environment variables
Code:
def initialize_settings():
    """Initialize and validate application settings."""
    try:
        settings = get_settings()
        return settings
    except ValueError as e:
        st.error(f"Configuration Error: {e}")
        st.stop()
        return None
Returns: Settings instance or None if validation fails
initialize_services(settings)
Purpose: Initialize all application services with proper dependency injection
Code:
def initialize_services(settings) -> Tuple[
    Optional[SentimentAPIClient],
    Optional[RedisCache],
    Optional[RAGService],
    Optional[StockDataCollector],
    Optional[SentimentAnalyzer]
]:
    """
    Initialize all application services.

    If API mode is enabled, returns API client. Otherwise returns direct services.
    """
    # Initialize API client if API mode is enabled
    api_client = None
    if settings.app.api_enabled:
        api_client = get_api_client(settings)

    # Initialize services for fallback or status display
    redis_cache = get_redis_cache(settings)
    rag_service = get_rag_service(settings, redis_cache)
    collector = get_collector(settings, redis_cache)
    analyzer = get_analyzer(settings, redis_cache, rag_service)
    return api_client, redis_cache, rag_service, collector, analyzer
Parameters:
settings: Application settings instance
Returns: Tuple of (api_client, redis_cache, rag_service, collector, analyzer)
Dependencies:
config.settings
services.* (for service initialization)
presentation.api_client
Used By: app.py
initialize_session_state()
Purpose: Initialize all Streamlit session state variables with default values
Code:
def initialize_session_state():
    """Initialize Streamlit session state variables."""
    # Assumption: settings are resolved here, because the function takes no
    # arguments but the defaults below read settings.data_sources.
    settings = get_settings()
    defaults = {
        'load_data': False,
        'data': None,
        'news_sentiments': [],
        'social_sentiments': [],
        'symbol': "AAPL",
        'title_shown': False,
        'data_errors': {},
        'search_filters': {
            "date_range": None,
            "sources": None,
            "exclude_unknown": True,
            "days_back": None,
            "data_sources": {
                "yfinance": True,
                "alpha_vantage": settings.data_sources.alpha_vantage_enabled,
                "finnhub": settings.data_sources.finnhub_enabled,
                "reddit": settings.data_sources.reddit_enabled
            }
        }
    }
    # Only set keys that are missing, so values survive Streamlit reruns
    for key, default_value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default_value
Dependencies: Streamlit, config.settings
Used By: app.py
presentation/data_loader.py
Purpose: Orchestrates the complete data loading and processing pipeline using the API client. This module ensures the dashboard is fully API-driven.
Key Responsibilities:
Key Functions/Methods:
load_stock_data(symbol, api_client, settings)
Purpose: Main orchestration function that loads stock data via API
Code:
def load_stock_data(
    symbol: str,
    api_client: SentimentAPIClient,
    settings
) -> bool:
    """
    Load and process stock data with sentiment analysis using the API.

    Args:
        symbol: Stock symbol to analyze
        api_client: SentimentAPIClient instance
        settings: Application settings

    Returns:
        True if successful, False otherwise
    """
    # Progress widgets (assumed to be created here; the excerpt does not show their setup)
    progress_bar = st.progress(0.0)
    status_text = st.empty()

    # Step 1: Preparing request
    status_text.text("📊 Preparing analysis request...")
    progress_bar.progress(0.2)

    # Get data source filters from UI (fall back to an empty dict if none are set)
    data_source_filters = st.session_state.search_filters.get('data_sources') or {}
    enabled_sources = [k for k, v in data_source_filters.items() if v]
    sources_param = ','.join(enabled_sources) if enabled_sources else 'yfinance'

    # Step 2: Calling API
    status_text.text("🌐 Calling sentiment analysis API...")
    progress_bar.progress(0.4)
    api_response = api_client.get_sentiment(
        symbol=symbol,
        detailed=True,
        sources=sources_param,
        cache_enabled=settings.app.cache_sentiment_enabled
    )

    # Step 3: Processing response
    status_text.text("📊 Processing API response...")
    progress_bar.progress(0.6)

    # Extract data from API response
    data = {
        'price_data': api_response.get('price_data', {}),
        'news': api_response.get('news', []),
        'social_media': []
    }

    # Store in session state
    st.session_state.data = data
    st.session_state.news_sentiments = api_response.get('news_sentiments', [])
    st.session_state.social_sentiments = api_response.get('social_sentiments', [])
    st.session_state.symbol = symbol
    progress_bar.progress(1.0)
    status_text.empty()
    return True
Parameters:
symbol: Stock symbol to analyze (e.g., “AAPL”)
api_client: SentimentAPIClient instance for API communication
settings: Application settings instance
Returns: bool - True if successful, False otherwise
Dependencies:
presentation.api_client.SentimentAPIClient
utils.logger
utils.ui_helpers
Used By: app.py
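The step that turns the sidebar's per-source flags into the `sources` query value can be isolated and tested on its own. A minimal sketch, assuming a standalone helper named `build_sources_param` (a hypothetical name; the source performs this inline):

```python
from typing import Dict, Optional


def build_sources_param(filters: Optional[Dict[str, bool]],
                        default: str = "yfinance") -> str:
    """Join enabled source names with commas; use `default` if none are enabled."""
    enabled = [name for name, on in (filters or {}).items() if on]
    return ",".join(enabled) if enabled else default
```

Treating a missing filter dict the same as "nothing enabled" means the request always names at least one source.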
presentation/api_client.py
Purpose: HTTP client for communicating with the FastAPI backend. Ensures all frontend interactions go through the API.
Key Responsibilities:
Key Functions/Methods:
get_sentiment(symbol, detailed=False, sources=None, cache_enabled=True)
Purpose: Get sentiment analysis for a stock symbol
Code:
def get_sentiment(
    self,
    symbol: str,
    detailed: bool = False,
    sources: Optional[str] = None,
    cache_enabled: bool = True
) -> Dict[str, Any]:
    """
    Get sentiment analysis for a stock symbol.

    Args:
        symbol: Stock symbol to analyze
        detailed: If True, returns detailed response with individual article sentiments
        sources: Comma-separated list of data sources (yfinance,alpha_vantage,finnhub,reddit)
        cache_enabled: Whether to use sentiment caching

    Returns:
        Dictionary with sentiment scores and optional detailed data
    """
    endpoint = f"/sentiment/{symbol}"
    params = {}
    if detailed:
        params['detailed'] = 'true'
    if sources:
        params['sources'] = sources
    if not cache_enabled:
        params['cache_enabled'] = 'false'
    try:
        response = self.client.get(endpoint, params=params, timeout=self.timeout)
        response.raise_for_status()
        return response.json()
    except httpx.TimeoutException:
        logger.error(f"API request timeout for {symbol}")
        raise
    except httpx.RequestError as e:
        logger.error(f"API request error: {e}")
        raise
Parameters:
symbol: Stock symbol (e.g., “AAPL”)
detailed: If True, includes individual article sentiments
sources: Comma-separated source list
cache_enabled: Whether to use sentiment caching
Returns: Dict[str, Any] - API response with sentiment data
Dependencies: httpx, config.settings, utils.logger
Used By: presentation.data_loader, presentation.tabs.*
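To make the request shape concrete, the sketch below rebuilds the URL that the parameter logic in `get_sentiment` would produce, using only the standard library. The helper name `sentiment_url` and the base URL in the usage note are assumptions for illustration; the actual client sends the request through `httpx`.

```python
from typing import Dict, Optional
from urllib.parse import urlencode


def sentiment_url(base_url: str, symbol: str, detailed: bool = False,
                  sources: Optional[str] = None, cache_enabled: bool = True) -> str:
    """Build the GET URL; only non-default options are added to the query string."""
    params: Dict[str, str] = {}
    if detailed:
        params["detailed"] = "true"
    if sources:
        params["sources"] = sources
    if not cache_enabled:
        params["cache_enabled"] = "false"
    query = urlencode(params)  # percent-encodes the commas in `sources`
    return f"{base_url}/sentiment/{symbol}" + (f"?{query}" if query else "")
```

For example, `sentiment_url("http://localhost:8000", "AAPL", detailed=True)` yields a URL ending in `/sentiment/AAPL?detailed=true`.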
presentation/components/sidebar.py
Purpose: Renders the sidebar UI component with all user controls, system status indicators, and configuration options.
Key Responsibilities:
Key Functions/Methods:
render_sidebar(redis_cache, rag_service, analyzer, settings, api_client)
Purpose: Main function to render the complete sidebar
Code:
def render_sidebar(
    redis_cache: Optional[Any],
    rag_service: Optional[Any],
    analyzer: Optional[Any],
    settings,
    api_client: Optional[SentimentAPIClient] = None
) -> str:
    """
    Render the sidebar with all controls and status indicators.

    Returns:
        Selected stock symbol
    """
    with st.sidebar:
        # Stock symbol input
        symbol = st.text_input(
            "📊 Stock Symbol",
            value=st.session_state.symbol,
            key="stock_symbol"
        ).upper()

        # System status indicators
        _render_system_status(api_client, redis_cache, rag_service, settings)

        # Search filters
        _render_search_filters(settings)

        # Sentiment cache controls
        _render_sentiment_cache_controls(settings)

        # Load button
        if st.button("🚀 Load Data", type="primary"):
            st.session_state.load_data = True
            st.session_state.symbol = symbol

        # Connection details
        _render_connection_details(api_client, redis_cache, rag_service, settings)

        # Summary log
        _render_summary_log()

        # Cache management
        _render_cache_management(api_client, redis_cache, settings)
    return symbol
Parameters:
redis_cache: RedisCache instance (for status display)
rag_service: RAGService instance (for status display)
analyzer: SentimentAnalyzer instance (for status display)
settings: Application settings
api_client: API client instance (for API mode)
Returns: str - Selected stock symbol
Dependencies: Streamlit, presentation.api_client, config.settings
Used By: app.py
The service layer contains all business logic and is independent of the presentation layer. Services are called by the API layer.
services/orchestrator.py
Purpose: Core orchestration service that coordinates the complete sentiment analysis pipeline. It is the main entry point for business logic: the API calls it, and the dashboard can call it as well.
Key Responsibilities:
Key Functions/Methods:
get_aggregated_sentiment(symbol, collector, analyzer, rag_service, redis_cache, settings, data_source_filters, return_detailed)
Purpose: Main orchestration function that runs the complete sentiment analysis pipeline
Code:
def get_aggregated_sentiment(
    symbol: str,
    collector: StockDataCollector,
    analyzer: SentimentAnalyzer,
    rag_service: Optional[RAGService] = None,
    redis_cache: Optional[RedisCache] = None,
    settings: Optional[Settings] = None,
    data_source_filters: Optional[Dict[str, bool]] = None,
    return_detailed: bool = False
) -> Dict[str, Any]:
    """
    Get aggregated sentiment analysis for a stock symbol.

    This function orchestrates the complete sentiment analysis pipeline:
    1. Collect stock data and news from multiple sources
    2. Store articles in RAG for context retrieval
    3. Analyze sentiment for all articles with RAG context
    4. Aggregate sentiment scores

    Returns:
        Dictionary with sentiment scores, operation summary, and optionally detailed data
    """
    # Initialize operation summary
    operation_summary = {
        'redis_used': False,
        'stock_cached': False,
        'news_cached': False,
        'sentiment_cache_hits': 0,
        'sentiment_cache_misses': 0,
        'rag_used': False,
        'rag_queries': 0,
        'rag_articles_found': 0,
        'articles_stored': 0
    }

    # Step 1: Collect stock data and news
    logger.info(f"📊 STEP 1: Collecting stock data and news for {symbol}...")

    # Check cache status BEFORE collection
    if redis_cache:
        cached_stock = redis_cache.get_cached_stock_data(symbol)
        if redis_cache.last_tier_used == "Redis":
            operation_summary['stock_cached'] = True
        cached_news = redis_cache.get_cached_news(symbol)
        if redis_cache.last_tier_used == "Redis":
            operation_summary['news_cached'] = True

    # Collect data with source filters
    data = collector.collect_all_data(symbol, data_source_filters=data_source_filters)

    # Step 2: Store articles in RAG
    logger.info("💾 STEP 2: Storing articles in RAG vector database...")
    if rag_service and data.get('news'):
        total_in_rag = rag_service.store_articles_batch(data['news'], symbol)
        operation_summary['articles_stored'] = total_in_rag

    # Step 3: Analyze sentiment
    logger.info("🤖 STEP 3: Analyzing sentiment with AI...")
    news_texts = [
        article.get('summary', article.get('title', ''))
        for article in data.get('news', [])
    ]

    # Parallel sentiment analysis. `settings` is optional, so fall back to
    # 5 workers (the default of batch_analyze) when it is not provided.
    max_workers = settings.app.analysis_parallel_workers if settings else 5
    sentiment_results = analyzer.batch_analyze(
        news_texts,
        symbol,
        max_workers=max_workers
    )

    # Track cache hits/misses
    for result in sentiment_results:
        if result.get('from_cache'):
            operation_summary['sentiment_cache_hits'] += 1
        else:
            operation_summary['sentiment_cache_misses'] += 1
        if result.get('rag_used'):
            operation_summary['rag_used'] = True
            operation_summary['rag_queries'] += 1
            operation_summary['rag_articles_found'] += result.get('rag_articles_found', 0)

    # Step 4: Aggregate sentiment scores (all zeros when nothing was analyzed)
    if sentiment_results:
        avg_positive = sum(r['positive'] for r in sentiment_results) / len(sentiment_results)
        avg_negative = sum(r['negative'] for r in sentiment_results) / len(sentiment_results)
        avg_neutral = sum(r['neutral'] for r in sentiment_results) / len(sentiment_results)
    else:
        avg_positive = avg_negative = avg_neutral = 0.0

    # Build response
    result = {
        'symbol': symbol,
        'positive': avg_positive,
        'negative': avg_negative,
        'neutral': avg_neutral,
        'net_sentiment': avg_positive - avg_negative,
        'dominant_sentiment': 'positive' if avg_positive > avg_negative and avg_positive > avg_neutral
                              else 'negative' if avg_negative > avg_neutral else 'neutral',
        'sources_analyzed': len(sentiment_results),
        'timestamp': datetime.now().isoformat(),
        'operation_summary': operation_summary
    }

    # Add detailed data if requested
    if return_detailed:
        result['data'] = data
        result['news_sentiments'] = sentiment_results
        result['social_sentiments'] = []  # Can be extended
    return result
Parameters:
symbol: Stock symbol to analyze (e.g., “AAPL”)
collector: StockDataCollector instance
analyzer: SentimentAnalyzer instance
rag_service: RAGService instance (optional)
redis_cache: RedisCache instance (optional)
settings: Application settings (optional)
data_source_filters: Dictionary of data source enable/disable flags
return_detailed: If True, includes raw data and individual sentiment scores
Returns: Dict[str, Any] - Aggregated sentiment with operation summary
Dependencies:
services.collector.StockDataCollector
services.sentiment.SentimentAnalyzer
services.rag.RAGService
services.cache.RedisCache
config.settings.Settings
Used By: api.routes.sentiment (API endpoints)
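The aggregation step of the pipeline (averaging per-article scores, then deriving the net and dominant sentiment) can be read in isolation. The sketch below mirrors the rules in `get_aggregated_sentiment`; the standalone function name `aggregate_scores` is hypothetical.

```python
from typing import Any, Dict, List

LABELS = ("positive", "negative", "neutral")


def aggregate_scores(results: List[Dict[str, float]]) -> Dict[str, Any]:
    """Average the three scores and pick the dominant label."""
    n = len(results)
    if n:
        avg = {k: sum(r[k] for r in results) / n for k in LABELS}
    else:
        avg = {k: 0.0 for k in LABELS}  # nothing analyzed
    # Same precedence as the orchestrator: positive must beat both others,
    # otherwise negative wins only if it beats neutral.
    if avg["positive"] > avg["negative"] and avg["positive"] > avg["neutral"]:
        dominant = "positive"
    elif avg["negative"] > avg["neutral"]:
        dominant = "negative"
    else:
        dominant = "neutral"
    return {
        **avg,
        "net_sentiment": avg["positive"] - avg["negative"],
        "dominant_sentiment": dominant,
        "sources_analyzed": n,
    }
```

Note that under these rules an empty input reports a dominant sentiment of "neutral" with all three averages at 0.0.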
services/collector.py
Purpose: Handles multi-source data collection from various financial APIs and news sources.
Key Responsibilities:
Key Functions/Methods:
collect_all_data(symbol, data_source_filters=None)
Purpose: Collect stock data and news from all enabled sources
Code:
def collect_all_data(
    self,
    symbol: str,
    data_source_filters: Optional[Dict[str, bool]] = None
) -> Dict[str, Any]:
    """
    Collect stock data and news from multiple sources.

    Args:
        symbol: Stock symbol to collect data for
        data_source_filters: Dictionary of source enable/disable flags

    Returns:
        Dictionary with price_data and news articles
    """
    # Get stock price data (always from yfinance)
    price_data = self.get_stock_price(symbol)

    # Collect news from enabled sources
    all_articles = []

    # yfinance news (on by default; skipped only if the filter disables it)
    if not data_source_filters or data_source_filters.get('yfinance', True):
        yfinance_news = self.get_news_headlines(symbol)
        all_articles.extend(yfinance_news)

    # Alpha Vantage (if enabled in both the filter and the settings)
    if (not data_source_filters or data_source_filters.get('alpha_vantage', False)) and \
            self.settings.data_sources.alpha_vantage_enabled:
        try:
            av_news = self.get_alpha_vantage_news(symbol)
            all_articles.extend(av_news)
        except Exception as e:
            logger.warning(f"Alpha Vantage news collection failed: {e}")

    # Finnhub (if enabled in both the filter and the settings)
    if (not data_source_filters or data_source_filters.get('finnhub', False)) and \
            self.settings.data_sources.finnhub_enabled:
        try:
            finnhub_news = self.get_finnhub_news(symbol)
            all_articles.extend(finnhub_news)
        except Exception as e:
            logger.warning(f"Finnhub news collection failed: {e}")

    # Reddit (if enabled in both the filter and the settings)
    if (not data_source_filters or data_source_filters.get('reddit', False)) and \
            self.settings.data_sources.reddit_enabled:
        try:
            reddit_posts = self.get_reddit_sentiment_data(symbol)
            all_articles.extend(reddit_posts)
        except Exception as e:
            logger.warning(f"Reddit data collection failed: {e}")

    # Deduplicate articles
    unique_articles = self._deduplicate_articles(all_articles)

    # Cache results
    if self.cache:
        self.cache.cache_stock_data(symbol, price_data)
        self.cache.cache_news(symbol, unique_articles)
    return {
        'price_data': price_data,
        'news': unique_articles
    }
Parameters:
symbol: Stock symbol (e.g., “AAPL”)
data_source_filters: Optional dict of source enable/disable flags
Returns: Dict[str, Any] - Dictionary with price_data and news keys
Dependencies:
yfinance library
services.cache.RedisCache
Used By: services.orchestrator.get_aggregated_sentiment
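`collect_all_data` calls `_deduplicate_articles`, whose implementation is not shown. One plausible approach, sketched here under stated assumptions, treats two articles as duplicates when they share a URL or a normalized title. The `url` field name and the matching rule are assumptions; only `title` appears in the source.

```python
from typing import Dict, List, Set


def deduplicate_articles(articles: List[Dict]) -> List[Dict]:
    """Keep the first article seen for each URL or normalized title."""
    seen: Set[str] = set()
    unique: List[Dict] = []
    for article in articles:
        url = (article.get("url") or "").strip().lower()
        # Lowercase and collapse whitespace so trivial variations still match.
        title = " ".join((article.get("title") or "").lower().split())
        keys = {key for key in (url, title) if key}
        if keys & seen:
            continue  # already collected from another source
        seen |= keys
        unique.append(article)
    return unique
```

Matching on title as well as URL matters when the same story arrives from several providers under different links.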
The API layer provides RESTful endpoints for the sentiment analysis service.
api/main.py
Purpose: FastAPI application entry point that sets up the API server, middleware, error handling, and routes.
Key Responsibilities:
Key Code:
# Create FastAPI app
app = FastAPI(
    title="Stock Sentiment Analysis API",
    description="REST API for AI-powered stock sentiment analysis.",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# CORS configuration
# Note: wildcard origins combined with credentials is very permissive;
# it suits local development, but origins should be restricted in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log all API requests with timing."""
    start_time = time.time()
    logger.info(f"🌐 API Request: {request.method} {request.url.path}")
    response = await call_next(request)
    process_time = time.time() - start_time
    logger.info(f"✅ API Response: {request.method} {request.url.path} - Time: {process_time:.3f}s")
    return response

# Include routers
app.include_router(sentiment_router)
app.include_router(price_router)
app.include_router(comparison_router)
app.include_router(system_router)
app.include_router(cache_router)

# Health check endpoint
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint."""
    settings, redis_cache, rag_service, collector, analyzer = get_all_services()
    services: Dict[str, str] = {}
    overall_status = "healthy"

    # Check Redis (no entry is reported when Redis is not configured)
    if redis_cache and redis_cache.client:
        try:
            redis_cache.client.ping()
            services["redis"] = "available"
        except Exception:
            services["redis"] = "unavailable"
            overall_status = "degraded"

    # Check RAG service
    if rag_service and hasattr(rag_service, 'vector_db') and rag_service.vector_db:
        services["rag"] = "available"
    else:
        services["rag"] = "not_configured"
        overall_status = "degraded"

    # Check Azure OpenAI
    if analyzer and hasattr(analyzer, 'client') and analyzer.client:
        services["azure_openai"] = "available"
    else:
        services["azure_openai"] = "not_configured"
        overall_status = "degraded"
    return HealthResponse(status=overall_status, services=services)
Dependencies:
api.dependencies
api.models.response
Used By: Uvicorn server (entry point: python -m stock_sentiment.api)
api/routes/sentiment.py
Purpose: Defines sentiment analysis API endpoints.
Key Responsibilities:
/sentiment/{symbol} endpoint
/sentiment/batch endpoint
Key Functions/Methods:
GET /sentiment/{symbol}
Purpose: Get sentiment analysis for a single stock symbol
Code:
@router.get(
    "/{symbol}",
    response_model=SentimentResponse,
    summary="Get sentiment analysis for a stock symbol",
    description="Analyzes sentiment for a stock symbol by collecting news, analyzing sentiment with AI, and returning aggregated scores."
)
async def get_sentiment(
    symbol: str = Path(..., description="Stock ticker symbol (e.g., AAPL)"),
    detailed: bool = Query(False, description="Include detailed data (articles, individual sentiments)"),
    sources: Optional[str] = Query(None, description="Comma-separated data sources (yfinance,alpha_vantage,finnhub,reddit)"),
    cache_enabled: bool = Query(True, description="Enable sentiment caching"),
    settings: Settings = Depends(get_settings),
    services: Tuple = Depends(get_all_services)
):
    """Get sentiment analysis for a stock symbol."""
    _, redis_cache, rag_service, collector, analyzer = services

    # Parse data source filters
    data_source_filters = parse_data_source_filters(sources)

    # Temporarily override cache setting if requested.
    # Note: if `settings` is a shared instance, concurrent requests will see
    # this override until it is restored in the `finally` block below.
    original_cache_setting = settings.app.cache_sentiment_enabled
    if not cache_enabled:
        settings.app.cache_sentiment_enabled = False
    try:
        # Call orchestrator
        result = get_aggregated_sentiment(
            symbol=symbol.upper(),
            collector=collector,
            analyzer=analyzer,
            rag_service=rag_service,
            redis_cache=redis_cache,
            settings=settings,
            data_source_filters=data_source_filters,
            return_detailed=detailed
        )
        return SentimentResponse(**result)
    finally:
        # Restore original cache setting
        settings.app.cache_sentiment_enabled = original_cache_setting
Parameters:
symbol: Stock symbol (path parameter)
detailed: Include detailed data (query parameter)
sources: Comma-separated source list (query parameter)
cache_enabled: Enable sentiment caching (query parameter)
Returns: SentimentResponse - Pydantic model with sentiment scores
Dependencies:
services.orchestrator.get_aggregated_sentiment
api.dependencies.get_all_services
api.models.response.SentimentResponse
Used By: API clients (frontend, external services)
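The endpoint relies on `parse_data_source_filters` to convert the comma-separated `sources` value into the flag dictionary the collector expects. Its implementation is not shown, so the following is only a guess at reasonable behavior. The name `parse_sources` and the handling of unknown names are assumptions.

```python
from typing import Dict, Optional

# Source names taken from the endpoint's parameter description.
KNOWN_SOURCES = ("yfinance", "alpha_vantage", "finnhub", "reddit")


def parse_sources(sources: Optional[str]) -> Optional[Dict[str, bool]]:
    """Turn 'yfinance,reddit' into per-source flags; None means no filter."""
    if not sources:
        return None
    requested = {s.strip().lower() for s in sources.split(",") if s.strip()}
    # Unknown names are ignored rather than rejected (an assumption).
    return {name: name in requested for name in KNOWN_SOURCES}
```

Returning None for a missing parameter lines up with the collector's `not data_source_filters` checks, which treat an absent filter as "use the configured defaults".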
services/sentiment.py
Purpose: Provides AI-powered sentiment analysis using Azure OpenAI GPT-4 with RAG context and Redis caching.
Key Responsibilities:
Key Functions/Methods:
analyze_sentiment(text, symbol, context)
Purpose: Analyze sentiment of text with optional RAG context
Code:
def analyze_sentiment(
    self,
    text: str,
    symbol: Optional[str] = None,
    context: Optional[List[Dict]] = None
) -> Dict[str, float]:
    """
    Analyze sentiment of given text using Azure OpenAI with optional RAG context.

    Args:
        text: Text to analyze
        symbol: Optional stock symbol for RAG context retrieval
        context: Optional additional context items

    Returns:
        Dictionary with sentiment scores: {'positive': float, 'negative': float, 'neutral': float}
    """
    # Check cache first
    if self.cache:
        cached_result = self.cache.get_cached_sentiment(text)
        if cached_result:
            return cached_result

    # Retrieve RAG context if available
    rag_context = None
    if self.rag_service and symbol:
        rag_context = self.rag_service.retrieve_relevant_context(
            query=text,
            symbol=symbol,
            top_k=self.settings.app.rag_top_k
        )

    # Build prompt with context
    prompt = self._build_prompt(text, rag_context, context)

    # Call Azure OpenAI
    try:
        response = self.client.chat.completions.create(
            model=self.deployment_name,
            messages=[
                {"role": "system", "content": self._get_system_prompt()},
                {"role": "user", "content": prompt}
            ],
            temperature=0.2,
            max_tokens=200,
            response_format={"type": "json_object"}
        )
        # Parse response
        result = json.loads(response.choices[0].message.content)
        # Normalize scores
        scores = SentimentScores(**result)
        # Cache result
        if self.cache:
            self.cache.cache_sentiment(text, scores.dict())
        return scores.dict()
    except Exception as e:
        logger.error(f"Azure OpenAI error: {e}")
        # Fallback to TextBlob
        return self._fallback_sentiment(text)
Parameters:
text: Text to analyze
symbol: Optional stock symbol for RAG context
context: Optional additional context
Returns: Dict[str, float] - Sentiment scores (positive, negative, neutral)
Dependencies:
services.rag.RAGService
services.cache.RedisCache
utils.preprocessing
utils.retry
utils.circuit_breaker
Used By: services.orchestrator.get_aggregated_sentiment
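The "Normalize scores" step in `analyze_sentiment` is delegated to the `SentimentScores` model, which is not shown here. As an illustration of what such normalization could involve, the sketch below clamps negative values and rescales the three scores to sum to 1. The function name and the neutral fallback for an all-zero input are assumptions.

```python
from typing import Dict

LABELS = ("positive", "negative", "neutral")


def normalize_scores(raw: Dict[str, float]) -> Dict[str, float]:
    """Clamp negatives to 0 and rescale so the three scores sum to 1."""
    clamped = {k: max(0.0, float(raw.get(k, 0.0))) for k in LABELS}
    total = sum(clamped.values())
    if total == 0:
        # No usable signal: report fully neutral (assumed fallback).
        return {"positive": 0.0, "negative": 0.0, "neutral": 1.0}
    return {k: v / total for k, v in clamped.items()}
```

Normalizing before caching keeps cached and freshly computed results comparable when they are later averaged.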
batch_analyze(texts, symbol, max_workers)
Purpose: Analyze multiple texts in parallel for improved performance
Code:
def batch_analyze(
    self,
    texts: List[str],
    symbol: Optional[str] = None,
    max_workers: int = 5
) -> List[Dict[str, Any]]:
    """
    Analyze multiple texts in parallel using ThreadPoolExecutor.

    Args:
        texts: List of texts to analyze
        symbol: Optional stock symbol for RAG context
        max_workers: Number of parallel workers

    Returns:
        List of sentiment analysis results with metadata
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the text it was submitted for
        futures = {
            executor.submit(self.analyze_sentiment, text, symbol): text
            for text in texts
        }
        # as_completed yields futures in completion order, so `results`
        # is not guaranteed to follow the order of `texts`.
        for future in as_completed(futures):
            text = futures[future]
            try:
                result = future.result(timeout=self.settings.app.analysis_worker_timeout)
                results.append({
                    'positive': result['positive'],
                    'negative': result['negative'],
                    'neutral': result['neutral'],
                    'text': text,
                    'from_cache': False  # Would need to track this
                })
            except Exception as e:
                logger.error(f"Error analyzing text: {e}")
                # Failed items are recorded as fully neutral
                results.append({
                    'positive': 0.0,
                    'negative': 0.0,
                    'neutral': 1.0,
                    'text': text,
                    'error': str(e)
                })
    return results
Parameters:
texts: List of text strings to analyze
symbol: Optional stock symbol
max_workers: Number of parallel workers
Returns: List[Dict[str, Any]] - List of sentiment results with metadata
Dependencies: concurrent.futures.ThreadPoolExecutor
Used By: services.orchestrator.get_aggregated_sentiment
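Because `as_completed` yields futures as they finish, a caller that pairs results with articles by position needs the output in input order. One way to get that, sketched here with a hypothetical `analyze_in_order` helper and a caller-supplied `analyze` function, is to key each future by its index:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Optional

NEUTRAL = {"positive": 0.0, "negative": 0.0, "neutral": 1.0}


def analyze_in_order(
    texts: List[str],
    analyze: Callable[[str], Dict],
    max_workers: int = 5,
) -> List[Optional[Dict]]:
    """Run `analyze` in parallel and return results aligned with `texts`."""
    results: List[Optional[Dict]] = [None] * len(texts)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember the input position of every submitted task.
        futures = {executor.submit(analyze, t): i for i, t in enumerate(texts)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                results[index] = future.result()
            except Exception:
                # Same neutral fallback that batch_analyze uses on errors.
                results[index] = dict(NEUTRAL)
    return results
```

`executor.map` would also preserve order, but it re-raises the first exception during iteration, which makes per-item fallbacks harder to express.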
services/rag.py
Purpose: Provides Retrieval Augmented Generation (RAG) functionality for context-aware sentiment analysis using Azure OpenAI embeddings and Azure AI Search (or Redis fallback).
Key Responsibilities:
Key Functions/Methods:
store_articles_batch(articles, symbol, batch_size)
Purpose: Store multiple articles with batch embedding generation
Code:
def store_articles_batch(
    self,
    articles: List[Dict],
    symbol: str,
    batch_size: Optional[int] = None
) -> int:
    """
    Store multiple articles with embeddings in batch for efficiency.

    Args:
        articles: List of article dictionaries
        symbol: Stock symbol
        batch_size: Number of articles per batch (default: from settings)

    Returns:
        Total number of articles stored (existing + newly stored)
    """
    if not articles:
        return 0

    # Check which articles already exist
    existing_in_vector_db = 0
    articles_to_process = []
    if self.vector_db and self.vector_db.is_available():
        # Check Azure AI Search for existing articles
        vector_ids_to_check = [f"{symbol}_{self._get_article_id(article)}" for article in articles]
        existing_ids = self.vector_db.batch_check_existing(vector_ids_to_check)
        existing_in_vector_db = len(existing_ids)
        # Filter out existing articles
        articles_to_process = [
            article for article in articles
            if f"{symbol}_{self._get_article_id(article)}" not in existing_ids
        ]
    else:
        # Fallback: check Redis
        articles_to_process = articles
    if not articles_to_process:
        return existing_in_vector_db

    # Prepare texts for batch embedding
    article_texts = [
        f"{article.get('title', '')} {article.get('summary', '')}"
        for article in articles_to_process
    ]

    # Generate embeddings in batch
    embeddings = self.get_embeddings_batch(
        article_texts,
        batch_size=batch_size,
        use_cache=True  # Cache embeddings
    )

    # Store in Azure AI Search or Redis
    if self.vector_db and self.vector_db.is_available():
        # Store in Azure AI Search
        stored_count = self._store_in_azure_ai_search(
            articles_to_process,
            embeddings,
            symbol
        )
    else:
        # Fallback to Redis
        stored_count = self._store_in_redis(
            articles_to_process,
            embeddings,
            symbol
        )
    return existing_in_vector_db + stored_count
Parameters:
articles: List of article dictionaries
symbol: Stock symbol
batch_size: Optional batch size override
Returns: int - Total articles stored (existing + newly stored)
Dependencies:
services.vector_db.AzureAISearchVectorDB
services.cache.RedisCache
Used By: services.orchestrator.get_aggregated_sentiment
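The two ideas in `store_articles_batch`, skipping articles that are already indexed and embedding the rest in fixed-size batches, can be sketched without any external service. In the sketch below, `article_id` is a hypothetical stand-in for `_get_article_id` (whose real logic is not shown), and hashing the title with MD5 is purely an assumption.

```python
import hashlib
from typing import Dict, List, Set


def article_id(article: Dict) -> str:
    """Hypothetical stable ID: MD5 hex digest of the article title."""
    return hashlib.md5(article.get("title", "").encode("utf-8")).hexdigest()


def plan_batches(articles: List[Dict], symbol: str,
                 existing_ids: Set[str], batch_size: int) -> List[List[Dict]]:
    """Drop already-stored articles, then split the rest into batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    new_articles = [
        a for a in articles
        if f"{symbol}_{article_id(a)}" not in existing_ids
    ]
    return [
        new_articles[start:start + batch_size]
        for start in range(0, len(new_articles), batch_size)
    ]
```

Filtering before batching avoids paying for embeddings of articles that would be discarded anyway.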
retrieve_relevant_context(query, symbol, top_k, use_hybrid)
Purpose: Retrieve relevant articles using hybrid search for RAG context
Code:
def retrieve_relevant_context(
    self,
    query: str,
    symbol: str,
    top_k: Optional[int] = None,
    use_hybrid: bool = True,
    date_range: Optional[tuple] = None,
    sources: Optional[List[str]] = None,
    exclude_sources: Optional[List[str]] = None,
    days_back: Optional[int] = None,
    expand_query: bool = True
) -> List[Dict]:
    """
    Retrieve relevant articles using hybrid search (semantic + keyword).

    Args:
        query: Search query text
        symbol: Stock symbol to filter by
        top_k: Number of results to return
        use_hybrid: Use hybrid search (semantic + keyword)
        date_range: Optional date range filter
        sources: Optional source filter
        exclude_sources: Optional sources to exclude
        days_back: Optional days back filter
        expand_query: Expand query with synonyms

    Returns:
        List of relevant article dictionaries with similarity scores
    """
    if not self.embeddings_enabled and not use_hybrid:
        return []

    # Use Azure AI Search if available
    if self.vector_db and self.vector_db.is_available():
        # Generate query embedding
        query_embedding = self.get_embedding(query)
        if not query_embedding:
            return []

        # Build filter string
        filter_string = self._build_filter_string(
            symbol=symbol,
            date_range=date_range,
            sources=sources,
            exclude_sources=exclude_sources,
            days_back=days_back
        )

        # Perform hybrid search
        if use_hybrid:
            results = self.vector_db.hybrid_search(
                query_text=query,
                query_vector=query_embedding,
                top_k=top_k or self.settings.app.rag_top_k,
                filter_string=filter_string
            )
        else:
            results = self.vector_db.search_vectors(
                query_vector=query_embedding,
                top_k=top_k or self.settings.app.rag_top_k,
                filter_string=filter_string
            )

        # Apply temporal decay
        results = self._apply_temporal_decay(results)
        return results
    else:
        # Fallback to Redis SCAN (slower but works)
        return self._retrieve_from_redis(query, symbol, top_k)
Parameters:
- query: Search query text
- symbol: Stock symbol
- top_k: Number of results
- use_hybrid: Enable hybrid search
Returns: List[Dict] - List of relevant articles with metadata
Dependencies:
- services.vector_db.AzureAISearchVectorDB
- services.cache.RedisCache (fallback)
Used By: services.sentiment.SentimentAnalyzer.analyze_sentiment
config/settings.py
Purpose: Centralized configuration management using Pydantic for environment variable loading and validation.
Key Responsibilities:
- .env file
Key Classes:
Settings
Purpose: Main settings class that aggregates all configuration
Code:
class Settings(BaseSettings):
"""Main application settings."""
app: AppSettings = Field(default_factory=AppSettings)
azure_openai: AzureOpenAISettings = Field(default_factory=AzureOpenAISettings)
redis: RedisSettings = Field(default_factory=RedisSettings)
azure_ai_search: Optional[AzureAISearchSettings] = Field(default=None)
data_sources: DataSourceSettings = Field(default_factory=DataSourceSettings)
def is_redis_available(self) -> bool:
"""Check if Redis is configured."""
return bool(self.redis.host and self.redis.password)
def is_rag_available(self) -> bool:
"""Check if RAG is available (requires embeddings)."""
return bool(
self.azure_openai.embedding_deployment and
self.azure_openai.endpoint and
self.azure_openai.api_key
)
def is_azure_ai_search_available(self) -> bool:
"""Check if Azure AI Search is configured."""
return bool(
self.azure_ai_search and
self.azure_ai_search.endpoint and
self.azure_ai_search.api_key
)
Key Nested Settings:
- AppSettings: Application-level settings (cache TTLs, parallel workers, etc.)
- AzureOpenAISettings: Azure OpenAI configuration
- RedisSettings: Redis cache configuration
- AzureAISearchSettings: Azure AI Search configuration (optional)
- DataSourceSettings: Data source enable/disable flags
Dependencies:
- pydantic or pydantic-settings
- python-dotenv
Used By: All modules throughout the application
services/cache.py
Purpose: Provides Redis-based caching layer to reduce API calls and improve performance.
Key Responsibilities:
Key Functions/Methods:
get(key)
Purpose: Retrieve cached value by key
Code:
def get(self, key: str) -> Optional[Any]:
"""
Get value from cache.
Args:
key: Cache key
Returns:
Cached value or None if not found
"""
if not self.client:
return None
try:
value = self.client.get(key)
if value:
self.last_tier_used = "Redis"
CacheStats.increment_hit(self.client)
return json.loads(value)
else:
CacheStats.increment_miss(self.client)
return None
except Exception as e:
logger.error(f"Error getting from cache: {e}")
return None
Parameters:
- key: Cache key string
Returns: Optional[Any] - Cached value or None
Dependencies: Redis client
Used By: All services that need caching
get_cached_sentiment(text)
Purpose: Get cached sentiment result for text (respects cache_sentiment_enabled setting)
Code:
def get_cached_sentiment(self, text: str) -> Optional[Dict[str, float]]:
"""
Get cached sentiment result.
Args:
text: Text that was analyzed
Returns:
Cached sentiment scores or None
"""
if not self.settings.app.cache_sentiment_enabled:
return None
key = self._generate_key("sentiment", text)
return self.get(key)
Parameters:
- text: Text that was analyzed
Returns: Optional[Dict[str, float]] - Cached sentiment scores
Dependencies: config.settings (checks cache_sentiment_enabled)
Used By: services.sentiment.SentimentAnalyzer
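The `_generate_key` helper called by `get_cached_sentiment` is not shown in this document. The sketch below shows one plausible approach, assuming the key is a prefix joined to a hash of the text so that long article text maps to a short, fixed-length Redis key. The function name `generate_key`, the `prefix:digest` layout, and the choice of SHA-256 are illustrative assumptions, not the project's confirmed implementation.

```python
import hashlib


def generate_key(prefix: str, text: str) -> str:
    """Build a deterministic cache key of the form '<prefix>:<sha256 hex digest>'.

    Hypothetical stand-in for the undocumented _generate_key helper.
    """
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest}"


# Identical text always maps to the same key; different text maps to a different key.
key_a = generate_key("sentiment", "Apple beats earnings expectations")
key_b = generate_key("sentiment", "Apple beats earnings expectations")
key_c = generate_key("sentiment", "Apple misses earnings expectations")
```

Hashing keeps keys bounded in size regardless of article length, at the cost of keys that are not human-readable.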
services/vector_db.py
Purpose: Provides vector database abstraction with Azure AI Search implementation for high-performance vector search.
Key Responsibilities:
Key Classes:
AzureAISearchVectorDB
Purpose: Azure AI Search implementation of vector database interface
Key Methods:
hybrid_search(query_text, query_vector, top_k, filter_string)
Purpose: Perform hybrid search combining semantic and keyword search
Code:
def hybrid_search(
self,
query_text: str,
query_vector: List[float],
top_k: int = 10,
filter_string: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Perform hybrid search (semantic + keyword) using Azure AI Search.
Azure AI Search automatically combines results using Reciprocal Rank Fusion (RRF).
Args:
query_text: Keyword search query
query_vector: Semantic search query vector
top_k: Number of results
filter_string: Optional OData filter
Returns:
List of results with RRF scores
"""
if not self.is_available():
return []
try:
# Perform hybrid search (Azure AI Search handles RRF internally)
results = self._client.search(
search_text=query_text, # Keyword search
vector_queries=[{
"vector": query_vector, # Semantic search
"k_nearest_neighbors": top_k,
"fields": "contentVector"
}],
filter=filter_string,
top=top_k,
select=["article_id", "title", "summary", "source", "timestamp", "symbol"]
)
# Format results with RRF scores
formatted = []
for result in results:
formatted.append({
"article_id": result.get("article_id", ""),
"title": result.get("title", ""),
"summary": result.get("summary", ""),
"source": result.get("source", ""),
"timestamp": result.get("timestamp", ""),
"similarity": result.get("@search.score", 0.0),
"rrf_score": result.get("@search.reranker_score", result.get("@search.score", 0.0))
})
return formatted
except Exception as e:
logger.error(f"Error in hybrid search: {e}")
return []
Parameters:
- query_text: Keyword search text
- query_vector: Semantic search vector (1536 dimensions)
- top_k: Number of results
- filter_string: Optional OData filter
Returns: List[Dict[str, Any]] - Hybrid search results with RRF scores
Dependencies: Azure AI Search SDK
Used By: services.rag.RAGService.retrieve_relevant_context
api/dependencies.py
Purpose: Provides dependency injection functions for FastAPI, initializing services as singletons without Streamlit dependencies.
Key Functions/Methods:
get_all_services()
Purpose: Get all services initialized (main dependency for API routes)
Code:
def get_all_services() -> Tuple[
Settings,
Optional[RedisCache],
Optional[RAGService],
StockDataCollector,
Optional[SentimentAnalyzer]
]:
"""
Get all services initialized.
Returns:
Tuple of (settings, redis_cache, rag_service, collector, analyzer)
"""
settings = get_settings_cached()
redis_cache = get_redis_cache()
rag_service = get_rag_service()
collector = get_collector()
analyzer = get_analyzer()
return settings, redis_cache, rag_service, collector, analyzer
Returns: Tuple - All service instances
Dependencies: All service modules
Used By: All API route handlers
models/sentiment.py
Purpose: Defines data structures for sentiment analysis results.
Key Classes:
SentimentScores
Purpose: Represents sentiment scores as a normalized probability distribution
Code:
@dataclass
class SentimentScores:
"""
Sentiment scores for a piece of text.
Attributes:
positive: Positive sentiment score (0.0 to 1.0)
negative: Negative sentiment score (0.0 to 1.0)
neutral: Neutral sentiment score (0.0 to 1.0)
"""
positive: float
negative: float
neutral: float
def __post_init__(self):
"""Validate and normalize scores to sum to 1.0."""
# Normalize to ensure they sum to 1.0
total = self.positive + self.negative + self.neutral
if total > 0:
self.positive /= total
self.negative /= total
self.neutral /= total
@property
def net_sentiment(self) -> float:
"""Calculate net sentiment (positive - negative)."""
return self.positive - self.negative
@property
def dominant_sentiment(self) -> str:
"""Get the dominant sentiment label."""
if self.positive > self.negative and self.positive > self.neutral:
return "positive"
elif self.negative > self.positive and self.negative > self.neutral:
return "negative"
else:
return "neutral"
Properties:
- net_sentiment: Calculated as positive - negative (-1.0 to 1.0)
- dominant_sentiment: Returns “positive”, “negative”, or “neutral”
Dependencies: Python dataclasses
Used By: services.sentiment.SentimentAnalyzer, API response models
api/routes/price.py
Purpose: Defines API endpoints for historical price data.
Key Endpoints:
GET /price/{symbol}/history
Purpose: Get historical OHLCV (Open, High, Low, Close, Volume) data
Returns: PriceHistoryResponse - Historical price data with date, OHLCV values
Dependencies: yfinance library
Used By: Frontend price analysis tab
api/routes/comparison.py
Purpose: Defines API endpoints for multi-stock comparison.
Key Endpoints:
POST /comparison/insights
Purpose: Generate AI-powered comparison insights for multiple stocks
Returns: ComparisonInsightsResponse - AI-generated comparison insights
Dependencies: services.sentiment.SentimentAnalyzer
Used By: Frontend comparison tab
api/routes/system.py
Purpose: Defines API endpoints for system status and health checks.
Key Endpoints:
GET /system/status
Purpose: Get overall system status and service health (Redis, RAG, Azure OpenAI)
Returns: HealthResponse - System health status with service details
Dependencies: All services
Used By: Frontend sidebar (system status display)
api/routes/cache.py
Purpose: Defines API endpoints for cache management operations.
Key Endpoints:
- POST /cache/stats/reset - Reset Redis cache statistics
- POST /cache/clear - Clear all data from Redis cache
Dependencies: services.cache.RedisCache
Used By: Frontend sidebar (cache management)
api/models/response.py
Purpose: Defines Pydantic models for API request and response validation.
Key Models:
- SentimentResponse - Response model for sentiment analysis endpoint
- PriceHistoryResponse - Response model for price history endpoint
- ComparisonInsightsResponse - Response model for comparison insights
- HealthResponse - Response model for health checks
- ErrorResponse - Error response model
Dependencies: Pydantic
Used By: All API endpoints
utils/logger.py
Purpose: Provides centralized logging configuration for the application.
Key Functions:
- setup_logger(name, level) - Configure root logger with formatting and handlers
- get_logger(name) - Get logger instance for a module
Dependencies: Python logging module
Used By: All modules throughout the application
utils/retry.py
Purpose: Provides retry logic with exponential backoff for resilient API calls.
Key Functions:
retry_with_exponential_backoff(max_retries, base_delay)
Purpose: Decorator for retrying functions with exponential backoff
Code:
import time
from functools import wraps
def retry_with_exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0
):
    """
    Retry decorator with exponential backoff.
    Args:
        max_retries: Maximum number of attempts, including the first call
        base_delay: Base delay in seconds (doubles with each retry)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Out of attempts: re-raise the last error to the caller
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    time.sleep(delay)
        return wrapper
    return decorator
Dependencies: Python time, functools
Used By: services.rag.RAGService, services.sentiment.SentimentAnalyzer
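To illustrate how the decorator behaves at a call site, the sketch below repeats the decorator so the example is self-contained and applies it to a function that fails twice before succeeding. The `flaky_fetch` function and the `calls` counter are hypothetical, and `base_delay` is set to 0.01 seconds only to keep the example fast.

```python
import time
from functools import wraps


def retry_with_exponential_backoff(max_retries: int = 3, base_delay: float = 1.0):
    """Same logic as the decorator documented above, repeated for a runnable example."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise  # no attempts left
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator


calls = {"count": 0}  # hypothetical counter used to simulate transient failures


@retry_with_exponential_backoff(max_retries=3, base_delay=0.01)
def flaky_fetch() -> str:
    """Fail on the first two calls, succeed on the third."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = flaky_fetch()

# Sleep durations between the three attempts: base_delay * 2**attempt
delays = [0.01 * (2 ** attempt) for attempt in range(2)]
```

With `max_retries=3` the function is called at most three times in total, with sleeps of `base_delay` and `2 * base_delay` between attempts.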
utils/circuit_breaker.py
Purpose: Implements circuit breaker pattern to prevent cascading failures.
Key Classes:
CircuitBreaker
Purpose: Prevents API calls when service is down
Code:
class CircuitBreaker:
"""
Circuit breaker to prevent cascading failures.
Opens circuit after failure_threshold failures,
preventing further calls until timeout expires.
"""
def __init__(self, failure_threshold: int = 5, timeout: int = 60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = "closed" # closed, open, half_open
Dependencies: Python threading, time
Used By: services.sentiment.SentimentAnalyzer
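Only the constructor is shown above, so the state transitions are left implicit. The sketch below is one way the closed, open, and half_open states could interact, not the project's actual implementation. The `call` method, the `CircuitOpenError` exception, and the injectable `clock` parameter are assumptions introduced for illustration, and thread locking is omitted for brevity.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open (hypothetical name)."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: float = 60,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half_open
        self._clock = clock    # injectable so the example can simulate time

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            # Reject immediately until the timeout has elapsed
            if self._clock() - self.last_failure_time < self.timeout:
                raise CircuitOpenError("circuit is open; call rejected")
            self.state = "half_open"  # allow a single trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = self._clock()
            # A failed trial call, or too many failures, opens the circuit
            if self.state == "half_open" or self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise
        # Any success resets the breaker
        self.failure_count = 0
        self.state = "closed"
        return result


def failing() -> None:
    raise ConnectionError("service down")


fake_now = [0.0]  # simulated clock value
breaker = CircuitBreaker(failure_threshold=2, timeout=60, clock=lambda: fake_now[0])

for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
state_after_failures = breaker.state

try:
    breaker.call(lambda: "ok")
    rejected = False
except CircuitOpenError:
    rejected = True

fake_now[0] = 61.0  # timeout has passed, so the next call is a trial
recovered = breaker.call(lambda: "ok")
state_after_recovery = breaker.state
```

While the circuit is open, callers fail fast instead of waiting on a service that is known to be down; a single successful trial call closes it again.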
utils/preprocessing.py
Purpose: Provides text preprocessing utilities for better embedding and analysis quality.
Key Functions:
preprocess_text(text, expand_abbreviations)
Purpose: Clean and normalize text for analysis
Code:
import re
def preprocess_text(text: str, expand_abbreviations: bool = True) -> str:
    """
    Preprocess text for better analysis quality.
    - Removes HTML tags
    - Normalizes whitespace
    - Expands financial abbreviations (Q1, Q2, EPS, etc.)
    """
    # Remove HTML
    text = re.sub(r'<[^>]+>', '', text)
    # Normalize whitespace
    text = ' '.join(text.split())
    # Expand abbreviations on word boundaries, so substrings such as
    # the "EPS" in "STEPS" or the "Q1" in "Q10" are left untouched
    if expand_abbreviations:
        abbreviations = {
            'Q1': 'first quarter', 'Q2': 'second quarter',
            'Q3': 'third quarter', 'Q4': 'fourth quarter',
            'EPS': 'earnings per share', 'P/E': 'price to earnings'
        }
        for abbr, full in abbreviations.items():
            text = re.sub(rf'\b{re.escape(abbr)}\b', full, text)
    return text
Dependencies: Python re
Used By: services.sentiment.SentimentAnalyzer, services.rag.RAGService
presentation/styles.py
Purpose: Provides custom CSS styling for the Streamlit application to enhance visual appearance.
Key Responsibilities:
Key Functions/Methods:
apply_custom_styles()
Purpose: Apply custom CSS styles to the Streamlit app
Code:
def apply_custom_styles():
"""Apply custom CSS styles to the Streamlit app."""
import streamlit as st
st.markdown(CUSTOM_CSS, unsafe_allow_html=True)
Dependencies: Streamlit
Used By: app.py (called at startup)
Key CSS Features:
presentation/components/empty_state.py
Purpose: Renders an empty state component when no data is loaded.
Key Functions/Methods:
render_empty_state()
Purpose: Display empty state with instructions and popular symbols
Code:
def render_empty_state():
    """Render the empty state when no data is loaded."""
    st.markdown(
        """
        <div class='empty-state'>
            <h2>👆 Get Started</h2>
            <p>Enter a stock symbol in the sidebar and click 'Load Data' to begin analysis</p>
            <div>
                <h4>💡 Popular Symbols</h4>
                <div>
                    <span>AAPL</span>
                    <span>MSFT</span>
                    <span>GOOGL</span>
                    <span>TSLA</span>
                    <span>AMZN</span>
                </div>
            </div>
        </div>
        """,
        unsafe_allow_html=True
    )
Dependencies: Streamlit
Used By: app.py (when no data is available)
All tab modules follow a similar pattern: they receive data from session state and render visualizations using Streamlit and Plotly.
presentation/tabs/overview_tab.py
Purpose: Renders the overview dashboard tab with key metrics and sentiment visualizations.
Key Functions/Methods:
render_overview_tab(data, news_sentiments, social_sentiments, current_symbol)
Purpose: Display overview dashboard with metrics and charts
Code:
def render_overview_tab(data, news_sentiments, social_sentiments, current_symbol):
"""Render the overview tab."""
# Aggregate sentiment scores
news_agg = aggregate_sentiments(news_sentiments)
price_data = data.get('price_data', {})
# Hero section with key metrics
st.markdown(f"""
<div style='background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 2rem; border-radius: 12px; color: white;'>
<h2>{price_data.get('company_name', current_symbol)}</h2>
<h1>${price_data.get('price', 0):.2f}</h1>
</div>
""", unsafe_allow_html=True)
# Key metrics in a grid
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Market Cap", f"${market_cap/1e9:.2f}B")
with col2:
st.metric("Net Sentiment", f"{net_sentiment:+.2%}")
with col3:
st.metric("Positive", f"{news_agg['positive']:.1%}")
with col4:
st.metric("Negative", f"{news_agg['negative']:.1%}")
# Sentiment visualization using Plotly
fig = px.bar(sentiment_df, x='Sentiment', y='Score', color='Sentiment')
st.plotly_chart(fig, use_container_width=True)
Parameters:
- data: Stock data dictionary
- news_sentiments: List of sentiment scores for news articles
- social_sentiments: List of sentiment scores for social media
- current_symbol: Current stock symbol
Dependencies: Streamlit, Plotly, Pandas
Used By: app.py
presentation/tabs/price_analysis_tab.py
Purpose: Renders price analysis tab with historical price charts.
Key Functions/Methods:
render_price_analysis_tab(current_symbol, api_client)
Purpose: Display historical price charts using API client
Code:
def render_price_analysis_tab(current_symbol, api_client):
"""Render the price analysis tab."""
period = st.selectbox("📅 Time Period", ["1d", "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y"])
# Get price history from API
price_data = api_client.get_price_history(current_symbol, period=period)
data_points = price_data.get('data', [])
if not data_points:
st.warning("⚠️ No price data available.")
return
# Create DataFrame and plot
hist_df = pd.DataFrame(data_points)
hist_df['date'] = pd.to_datetime(hist_df['date'])
# Create candlestick chart
fig = go.Figure(data=[go.Candlestick(
x=hist_df['date'],
open=hist_df['open'],
high=hist_df['high'],
low=hist_df['low'],
close=hist_df['close']
)])
st.plotly_chart(fig, use_container_width=True)
Parameters:
- current_symbol: Stock symbol
- api_client: API client instance
Dependencies: Streamlit, Plotly, Pandas, presentation.api_client
Used By: app.py
presentation/tabs/news_sentiment_tab.py
Purpose: Renders news and sentiment tab with article list and sentiment scores.
Key Functions/Methods:
render_news_sentiment_tab(data, news_sentiments, current_symbol)
Purpose: Display news articles with sentiment analysis results
Features:
Dependencies: Streamlit, utils.ui_helpers
Used By: app.py
presentation/tabs/technical_analysis_tab.py
Purpose: Renders technical analysis tab with technical indicators.
Key Functions/Methods:
render_technical_analysis_tab(current_symbol, api_client)
Purpose: Display technical indicators (moving averages, RSI, MACD, etc.)
Features:
Dependencies: Streamlit, Plotly, Pandas, presentation.api_client
Used By: app.py
presentation/tabs/ai_insights_tab.py
Purpose: Renders AI-generated insights tab.
Key Functions/Methods:
render_ai_insights_tab(news_sentiments, current_symbol)
Purpose: Display AI-generated insights based on sentiment analysis
Features:
Dependencies: Streamlit, services.sentiment.SentimentAnalyzer (via API)
Used By: app.py
presentation/tabs/comparison_tab.py
Purpose: Renders multi-stock comparison tab.
Key Functions/Methods:
render_comparison_tab(api_client)
Purpose: Display comparison of multiple stocks
Code:
def render_comparison_tab(api_client):
"""Render the comparison tab."""
# Get comparison stocks from session state
compare_stocks = st.session_state.get('comparison_stocks', [])
if not compare_stocks:
st.info("Select stocks to compare in the sidebar")
return
# Collect sentiment for each stock via API
comparison_data = {}
comparison_sentiments = {}
for symbol in compare_stocks:
result = api_client.get_sentiment(symbol=symbol, detailed=True)
comparison_data[symbol] = {
'price_data': result.get('price_data', {}),
'news': result.get('news', [])
}
comparison_sentiments[symbol] = {
'positive': result.get('positive', 0.0),
'negative': result.get('negative', 0.0),
'neutral': result.get('neutral', 0.0)
}
# Generate AI insights via API
insights = api_client.get_comparison_insights(
comparison_data,
comparison_sentiments
)
st.markdown(insights)
Parameters:
- api_client: API client instance
Dependencies: Streamlit, presentation.api_client
Used By: app.py
models/stock.py
Purpose: Defines data models for stock information.
Note: This file may be minimal or use simple dictionaries. Stock data is typically represented as dictionaries with keys like symbol, price, company_name, market_cap, timestamp.
Used By: services.collector.StockDataCollector, API response models
utils/validators.py
Purpose: Provides validation utilities for input data (stock symbols, text content).
Key Functions/Methods:
validate_stock_symbol(symbol)
Purpose: Validate stock ticker symbol format
Code:
import re
def validate_stock_symbol(symbol: str) -> bool:
    """
    Validate stock ticker symbol format.
    Accepts 1-5 letters, optionally followed by a dot and a single
    class letter (e.g., BRK.B). Input is upper-cased before matching,
    so lowercase symbols pass; digits are rejected.
    Returns:
        True if valid, False otherwise
    """
    if not symbol or not isinstance(symbol, str):
        return False
    # fullmatch, unlike match with '$', also rejects a trailing newline
    pattern = r'[A-Z]{1,5}(\.[A-Z])?'
    return bool(re.fullmatch(pattern, symbol.upper()))
Parameters:
- symbol: Stock ticker symbol to validate
Returns: bool - True if valid
Dependencies: Python re
Used By: Input validation throughout the application
validate_text(text, min_length, max_length)
Purpose: Validate text content for sentiment analysis
Parameters:
- text: Text to validate
- min_length: Minimum text length
- max_length: Maximum text length
Returns: bool - True if valid
Used By: Text preprocessing and validation
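The body of `validate_text` is not shown. A minimal sketch of what such a check might look like follows, assuming length is measured after stripping surrounding whitespace. The default bounds of 10 and 10,000 characters are placeholders, not values taken from the project.

```python
def validate_text(text, min_length: int = 10, max_length: int = 10000) -> bool:
    """Return True if text is a string whose stripped length is within bounds.

    The default limits are illustrative assumptions.
    """
    if not isinstance(text, str):
        return False
    stripped = text.strip()
    return min_length <= len(stripped) <= max_length


ok = validate_text("Apple reports record quarterly revenue")
blank = validate_text("     ")
not_a_string = validate_text(None)
too_long = validate_text("x" * 20, max_length=10)
```

Rejecting blank or very short text before calling the LLM avoids spending an API call on input that cannot yield a meaningful sentiment score.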
utils/ui_helpers.py
Purpose: Provides UI helper functions for enhanced user experience.
Key Functions/Methods:
show_toast(message, type, duration)
Purpose: Display a toast notification
Code:
def show_toast(message: str, type: str = "success", duration: int = 3000) -> None:
"""
Display a toast notification.
Args:
message: Message to display
type: Type of toast (success, error, warning, info)
duration: Duration in milliseconds
"""
# Creates animated toast notification with CSS/JavaScript
st.markdown(f"""
<div style="position: fixed; top: 20px; right: 20px;
background: {color}; color: white; padding: 1rem;
border-radius: 8px; z-index: 9999;">
{message}
</div>
""", unsafe_allow_html=True)
Parameters:
- message: Message to display
- type: Toast type (success, error, warning, info)
- duration: Display duration in milliseconds
Dependencies: Streamlit
Used By: Various presentation components
export_to_csv(data, news_sentiments, symbol)
Purpose: Export data to CSV format
Returns: str - CSV string
Used By: News sentiment tab (export functionality)
export_to_json(data, news_sentiments, symbol)
Purpose: Export data to JSON format
Returns: str - JSON string
Used By: News sentiment tab (export functionality)
filter_articles(articles, search_query, sentiment_filter, source_filter, date_range, sentiments)
Purpose: Filter articles based on various criteria
Parameters:
- articles: List of article dictionaries
- search_query: Text to search
- sentiment_filter: Filter by sentiment type
- source_filter: List of sources to include
- date_range: Tuple of (start_date, end_date)
- sentiments: List of sentiment dictionaries
Returns: List[Dict] - Filtered articles
Used By: News sentiment tab (filtering functionality)
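The filtering logic itself is not listed. The sketch below shows how the documented parameters could combine, under several assumptions: articles carry `title`, `summary`, `source`, and `timestamp` keys, `timestamp` is a `datetime`, `sentiments` is parallel to `articles`, and the sentiment filter compares against the dominant label. The `dominant_label` helper is hypothetical and follows the same tie-breaking as the `dominant_sentiment` property described earlier.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple


def dominant_label(scores: Dict[str, float]) -> str:
    """Return the strictly largest of positive/negative, else 'neutral'."""
    pos = scores.get("positive", 0.0)
    neg = scores.get("negative", 0.0)
    neu = scores.get("neutral", 0.0)
    if pos > neg and pos > neu:
        return "positive"
    if neg > pos and neg > neu:
        return "negative"
    return "neutral"


def filter_articles(
    articles: List[Dict],
    search_query: str = "",
    sentiment_filter: Optional[str] = None,
    source_filter: Optional[List[str]] = None,
    date_range: Optional[Tuple[datetime, datetime]] = None,
    sentiments: Optional[List[Dict[str, float]]] = None,
) -> List[Dict]:
    query = search_query.strip().lower()
    kept = []
    for index, article in enumerate(articles):
        text = f"{article.get('title', '')} {article.get('summary', '')}".lower()
        if query and query not in text:
            continue  # search text not found
        if source_filter and article.get("source") not in source_filter:
            continue  # source not selected
        if date_range:
            start, end = date_range
            published = article.get("timestamp")
            if published is None or not (start <= published <= end):
                continue  # outside the date window
        if sentiment_filter and sentiments:
            if dominant_label(sentiments[index]) != sentiment_filter:
                continue  # dominant sentiment does not match
        kept.append(article)
    return kept


articles = [
    {"title": "Apple beats earnings", "summary": "Strong iPhone sales",
     "source": "yfinance", "timestamp": datetime(2024, 12, 10)},
    {"title": "Apple faces lawsuit", "summary": "Regulators file suit",
     "source": "Finnhub", "timestamp": datetime(2024, 12, 5)},
    {"title": "Microsoft cloud growth", "summary": "Azure revenue up",
     "source": "yfinance", "timestamp": datetime(2024, 11, 20)},
]
sentiments = [
    {"positive": 0.8, "negative": 0.1, "neutral": 0.1},
    {"positive": 0.1, "negative": 0.7, "neutral": 0.2},
    {"positive": 0.6, "negative": 0.1, "neutral": 0.3},
]

apple_positive = filter_articles(
    articles, search_query="apple", sentiment_filter="positive", sentiments=sentiments
)
december_yfinance = filter_articles(
    articles,
    source_filter=["yfinance"],
    date_range=(datetime(2024, 12, 1), datetime(2024, 12, 31)),
)
```

Each criterion is optional, so omitting a filter leaves that dimension unrestricted.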
get_error_recovery_ui(error_message, retry_key)
Purpose: Display error message with retry option
Returns: bool - True if retry was clicked
Used By: Error handling in data loader
generate_comparison_insights(comparison_data, comparison_sentiments, analyzer)
Purpose: Generate AI-powered comparison insights
Returns: str - AI-generated insights text
Dependencies: services.sentiment.SentimentAnalyzer
Used By: Comparison tab
This section documents all major flows in the application with both layman’s explanations (for non-technical users) and technical deep dives (for developers). Each flow includes detailed Mermaid diagrams showing the complete process.
Imagine you’re at a restaurant. When you click “Load Data”, it’s like placing an order:
In technical terms: The frontend never directly talks to the “chefs” (services). Everything goes through the “waiter” (API), ensuring a clean separation and making the system easier to maintain and scale.
When a user clicks “Load Data” in the Streamlit dashboard:
- app.py detects the button click and calls load_stock_data()
- presentation/api_client.py sends HTTP GET request to FastAPI /sentiment/{symbol} endpoint
- api/routes/sentiment.py calls services/orchestrator.py
- services/collector.py to fetch data
- services/rag.py to store articles
- services/sentiment.py for each article
Key Design Pattern: API-driven architecture ensures frontend is decoupled from business logic.
Think of the app as a news aggregator that reads multiple newspapers:
The app is smart: it remembers what it read before, so if you ask about the same stock again within a few hours, it doesn’t need to read the newspapers again - it just tells you what it remembers!
The data collection flow is handled by services/collector.py:
Key Features:
Imagine you have a library where you want to store books so you can find them later:
The “summary card” (embedding) is special: it captures the meaning of the article, not just keywords. So if you search for “Apple earnings”, it will find articles about “Apple profits” or “Apple quarterly results” even if they don’t use the exact word “earnings”.
The RAG storage flow is handled by services/rag.py in the store_articles_batch() method:
Key Optimizations:
Think of the AI as a financial analyst reading news articles:
The AI is smart: it doesn’t just read the article in isolation. It considers what it learned from similar articles before, making its analysis more accurate and consistent.
The sentiment analysis flow is handled by services/sentiment.py in the analyze_sentiment() method:
- cache_sentiment_enabled is True)
- rag_service.retrieve_relevant_context() to find similar articles
Parallel Processing: When analyzing multiple articles, batch_analyze() uses ThreadPoolExecutor to process articles concurrently (default: 5 workers).
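The parallel step can be sketched with the standard library alone. This is a simplified illustration rather than the project's `batch_analyze()`: the function takes the per-article analyzer as an argument, and `fake_analyze` is a hypothetical stand-in for the real LLM call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def batch_analyze(
    texts: List[str],
    analyze: Callable[[str], Dict[str, float]],
    max_workers: int = 5,
) -> List[Dict[str, float]]:
    """Run `analyze` on each text concurrently, returning results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of the inputs,
        # regardless of which worker finishes first
        return list(executor.map(analyze, texts))


def fake_analyze(text: str) -> Dict[str, float]:
    """Hypothetical stand-in for an LLM sentiment call."""
    if "beats" in text.lower():
        return {"positive": 0.8, "negative": 0.1, "neutral": 0.1}
    return {"positive": 0.1, "negative": 0.8, "neutral": 0.1}


headlines = [
    "Apple beats earnings estimates",
    "Apple misses revenue targets",
    "Tesla beats delivery forecasts",
]
results = batch_analyze(headlines, fake_analyze)
```

Threads suit this workload because each call spends most of its time waiting on network I/O, so the worker count mainly bounds how many requests are in flight at once.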
Parallel Processing Flow (for multiple articles):
Imagine you’re looking for a book in a huge library:
The “meaning search” is powerful: if you search for “Apple earnings”, it will find articles about “Apple profits” or “Apple quarterly results” because they mean the same thing, even though the words are different.
The RAG retrieval flow is handled by services/rag.py in the retrieve_relevant_context() method:
Fallback Mechanism: If Azure AI Search is not available, falls back to Redis SCAN-based search (slower but works).
Detailed Hybrid Search Combination:
This section explains all mathematical formulas, algorithms, and concepts used in the application. Each concept includes the formula, step-by-step examples, rationale for its use, and visual diagrams.
Cosine similarity measures the angle between two vectors in high-dimensional space, providing a measure of semantic similarity regardless of vector magnitude.
Formula:
cos(θ) = (A · B) / (||A|| × ||B||)
Where:
- A · B = Dot product of vectors A and B
- ||A|| = Euclidean norm (magnitude) of vector A
- ||B|| = Euclidean norm (magnitude) of vector B
- θ = Angle between vectors
Range: -1.0 to 1.0 (typically 0.0 to 1.0 for embeddings, where 1.0 = identical, 0.0 = orthogonal)
Scenario: Compare two article embeddings to find similarity.
Given:
- A = [0.5, 0.3, 0.8, 0.2, 0.9]
- B = [0.4, 0.5, 0.7, 0.3, 0.8]
Step 1: Calculate Dot Product
A · B = (0.5 × 0.4) + (0.3 × 0.5) + (0.8 × 0.7) + (0.2 × 0.3) + (0.9 × 0.8)
= 0.20 + 0.15 + 0.56 + 0.06 + 0.72
= 1.69
Step 2: Calculate Magnitudes
||A|| = √(0.5² + 0.3² + 0.8² + 0.2² + 0.9²)
= √(0.25 + 0.09 + 0.64 + 0.04 + 0.81)
= √1.83
= 1.353
||B|| = √(0.4² + 0.5² + 0.7² + 0.3² + 0.8²)
= √(0.16 + 0.25 + 0.49 + 0.09 + 0.64)
= √1.63
= 1.277
Step 3: Calculate Cosine Similarity
cos(θ) = 1.69 / (1.353 × 1.277)
= 1.69 / 1.727
= 0.979
Result: The articles are 97.9% similar (very similar content).
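The three steps above translate directly into code. The sketch below uses only the standard library so it runs without dependencies; the application itself lists numpy for this computation, so treat this as an equivalent illustration rather than the project's code.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return cos(theta) between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))       # Step 1: dot product
    norm_a = math.sqrt(sum(x * x for x in a))    # Step 2: magnitudes
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0                               # avoid division by zero
    return dot / (norm_a * norm_b)               # Step 3: cosine similarity


vector_a = [0.5, 0.3, 0.8, 0.2, 0.9]
vector_b = [0.4, 0.5, 0.7, 0.3, 0.8]
similarity = cosine_similarity(vector_a, vector_b)  # about 0.979, as in the worked example
```

Because the result depends only on direction, scaling either vector by a positive constant leaves the similarity unchanged.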
Real-World Example:
Reciprocal Rank Fusion combines multiple ranked lists without requiring score normalization, making it ideal for combining semantic and keyword search results.
Formula:
RRF_score(d) = Σ(1 / (k + rank_i(d)))
Where:
- d = Document/article
- rank_i(d) = Rank of document d in list i (1-indexed)
- k = RRF constant (typically 60, standard in literature)
- Σ = Sum over all ranked lists
For our hybrid search:
Scenario: Combine semantic and keyword search results for query “Apple earnings”.
Given:
Step 1: Calculate RRF Scores from Semantic List
Article A: RRF_semantic = 1 / (60 + 1) = 1/61 = 0.0164
Article B: RRF_semantic = 1 / (60 + 2) = 1/62 = 0.0161
Article C: RRF_semantic = 1 / (60 + 3) = 1/63 = 0.0159
Step 2: Calculate RRF Scores from Keyword List
Article D: RRF_keyword = 1 / (60 + 1) = 1/61 = 0.0164
Article A: RRF_keyword = 1 / (60 + 2) = 1/62 = 0.0161
Article E: RRF_keyword = 1 / (60 + 3) = 1/63 = 0.0159
Step 3: Combine RRF Scores
Article A: RRF_total = 0.0164 + 0.0161 = 0.0325 (appears in both!)
Article B: RRF_total = 0.0161 + 0 = 0.0161
Article C: RRF_total = 0.0159 + 0 = 0.0159
Article D: RRF_total = 0 + 0.0164 = 0.0164
Article E: RRF_total = 0 + 0.0159 = 0.0159
Step 4: Final Ranking
Result: Article A ranks highest because it appears in both semantic and keyword results, indicating high relevance.
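Azure AI Search performs this fusion internally, so the application does not need to implement it. The sketch below reproduces the arithmetic from the worked example for illustration only, using the semantic ranking (A, B, C) and keyword ranking (D, A, E) from Steps 1 and 2; the function name is an assumption.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def reciprocal_rank_fusion(
    ranked_lists: List[List[str]], k: int = 60
) -> List[Tuple[str, float]]:
    """Fuse ranked lists of document IDs; return (doc_id, score) sorted by score."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):  # ranks are 1-indexed
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


semantic_ranking = ["A", "B", "C"]
keyword_ranking = ["D", "A", "E"]
fused = reciprocal_rank_fusion([semantic_ranking, keyword_ranking])
top_three = [doc_id for doc_id, _ in fused[:3]]
```

Only ranks enter the formula, which is why the raw semantic and keyword scores never need to be put on a common scale.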
Temporal decay boosts the relevance of recent articles, recognizing that financial news is time-sensitive. Older articles receive a smaller boost, so a score is never reduced below its original value.
Formula:
decay = 1.0 / (1 + age_days / decay_days)
boosted_score = original_score × (1 + decay × boost_factor)
Where:
- age_days = Age of article in days (from current time)
- decay_days = Half-life constant (default: 7 days)
- boost_factor = Maximum boost percentage (default: 0.2 = 20%)
- original_score = RRF score or similarity score before temporal boost
Decay Behavior:
Scenario: Boost three articles with different ages for query “Apple earnings”.
Given:
- decay_days = 7
- boost_factor = 0.2 (20%)
Step 1: Calculate Age in Days
Article A: age_days = (2024-12-10 - 2024-12-10) = 0 days
Article B: age_days = (2024-12-10 - 2024-12-05) = 5 days
Article C: age_days = (2024-12-10 - 2024-11-20) = 20 days
Step 2: Calculate Decay Factor
Article A: decay = 1.0 / (1 + 0/7) = 1.0 / 1 = 1.0
Article B: decay = 1.0 / (1 + 5/7) = 1.0 / 1.714 = 0.583
Article C: decay = 1.0 / (1 + 20/7) = 1.0 / 3.857 = 0.259
Step 3: Calculate Boosted Score
Article A: boosted = 0.0325 × (1 + 1.0 × 0.2) = 0.0325 × 1.2 = 0.0390
Article B: boosted = 0.0320 × (1 + 0.583 × 0.2) = 0.0320 × 1.117 = 0.0357
Article C: boosted = 0.0330 × (1 + 0.259 × 0.2) = 0.0330 × 1.052 = 0.0347
Step 4: Re-rank by Boosted Score
Result: Recent Article A now ranks highest, even though Article C had a slightly higher original score.
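The worked example can be checked with a few lines of code. This sketch applies the two formulas above to the three articles; the function name and signature are illustrative and are not the project's `_apply_temporal_decay` method, which operates on search result dictionaries.

```python
def apply_temporal_decay(
    score: float,
    age_days: float,
    decay_days: float = 7.0,
    boost_factor: float = 0.2,
) -> float:
    """Boost a relevance score according to article age in days."""
    # Negative ages (for example, from clock skew) are treated as zero
    decay = 1.0 / (1.0 + max(age_days, 0.0) / decay_days)
    return score * (1.0 + decay * boost_factor)


# (original_score, age_days) for each article in the example
example_articles = {"A": (0.0325, 0), "B": (0.0320, 5), "C": (0.0330, 20)}
boosted = {
    name: apply_temporal_decay(score, age)
    for name, (score, age) in example_articles.items()
}
ranking = sorted(boosted, key=boosted.get, reverse=True)
```

A fresh article receives the full 20% boost, an article exactly `decay_days` old receives half of it, and the boost approaches zero as age grows.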
decay_days parameter
Real-World Impact:
Sentiment scores must form a probability distribution (sum to 1.0) to represent the likelihood of each sentiment category.
Formula:
normalized_score_i = score_i / Σ(score_j)
Where:
- score_i = Original score for sentiment i (positive, negative, or neutral)
- Σ(score_j) = Sum of all original scores
- normalized_score_i = Normalized score (0.0 to 1.0)
Constraint: Σ(normalized_score_i) = 1.0
Scenario: Normalize sentiment scores from LLM response.
Given: LLM returns raw scores:
- positive = 0.85
- negative = 0.10
- neutral = 0.05
Step 1: Calculate Total
total = 0.85 + 0.10 + 0.05 = 1.00
Step 2: Normalize (divide by total)
normalized_positive = 0.85 / 1.00 = 0.85 (85%)
normalized_negative = 0.10 / 1.00 = 0.10 (10%)
normalized_neutral = 0.05 / 1.00 = 0.05 (5%)
Step 3: Verify Sum
0.85 + 0.10 + 0.05 = 1.00 ✓
Result: Scores are already normalized (sum = 1.0).
Example 2: Scores Don’t Sum to 1.0
Given: LLM returns:
- positive = 0.90
- negative = 0.15
- neutral = 0.05
Step 1: Calculate Total
total = 0.90 + 0.15 + 0.05 = 1.10
Step 2: Normalize
normalized_positive = 0.90 / 1.10 = 0.818 (81.8%)
normalized_negative = 0.15 / 1.10 = 0.136 (13.6%)
normalized_neutral = 0.05 / 1.10 = 0.045 (4.5%)
Step 3: Verify Sum
0.818 + 0.136 + 0.045 = 0.999 ≈ 1.00 ✓
Result: Scores normalized to sum to 1.0.
Example 3: Edge Case - All Zero Scores
Given: LLM returns invalid scores:
- positive = 0.0
- negative = 0.0
- neutral = 0.0
Step 1: Calculate Total
total = 0.0 + 0.0 + 0.0 = 0.0
Step 2: Handle Division by Zero (Fallback)
normalized_positive = 0.0
normalized_negative = 0.0
normalized_neutral = 1.0 (default to neutral)
Result: Default to neutral sentiment when scores are invalid.
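The three examples above reduce to one small function. The sketch below is an assumption about how such a normalizer could look; the function name is hypothetical, and clamping negative raw scores to zero is an extra safeguard that the examples do not mention.

```python
from typing import Dict

LABELS = ("positive", "negative", "neutral")


def normalize_scores(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale raw sentiment scores so they sum to 1.0."""
    # Assumption: missing labels count as 0.0 and negative values are clamped.
    raw = {label: max(0.0, float(scores.get(label, 0.0))) for label in LABELS}
    total = sum(raw.values())
    if total <= 0.0:
        # Division-by-zero fallback from Example 3: default to neutral.
        return {"positive": 0.0, "negative": 0.0, "neutral": 1.0}
    return {label: value / total for label, value in raw.items()}


example_2 = normalize_scores({"positive": 0.90, "negative": 0.15, "neutral": 0.05})
example_3 = normalize_scores({"positive": 0.0, "negative": 0.0, "neutral": 0.0})
```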
Real-World Example:
- {"positive": 0.95, "negative": 0.02, "neutral": 0.03} (sum = 1.0)
- {"positive": 0.95, "negative": 0.02, "neutral": 0.03} (already normalized)
Batch embedding generation processes multiple texts in a single API call, dramatically reducing latency and cost compared to individual calls.
Efficiency Formula:
API_calls_saved = N - ⌈N / batch_size⌉
Latency_reduction = (N × latency_per_call) - (⌈N / batch_size⌉ × batch_latency)
Cost_reduction = (N - ⌈N / batch_size⌉) × cost_per_call
Where:
- N = Number of articles to embed
- batch_size = Articles per batch (default: 100)
- latency_per_call = Time for single embedding call (~50ms)
- batch_latency = Time for batch call (~200ms for 100 articles)
- cost_per_call = Cost per API call (same regardless of batch size)
Scenario: Generate embeddings for 250 articles.
Given:
Method 1: Individual Calls (Inefficient)
API calls = 250
Total latency = 250 × 50ms = 12,500ms = 12.5 seconds
Total cost = 250 × $0.0001 = $0.025
Method 2: Batch Processing (Efficient)
Batches needed = ⌈250 / 100⌉ = 3 batches
- Batch 1: 100 articles
- Batch 2: 100 articles
- Batch 3: 50 articles
API calls = 3
Total latency = 3 × 200ms = 600ms = 0.6 seconds
Total cost = 3 × $0.0001 = $0.0003
Efficiency Gains:
API calls saved = 250 - 3 = 247 calls (98.8% reduction)
Latency reduction = 12.5s - 0.6s = 11.9s (95.2% faster)
Cost reduction = $0.025 - $0.0003 = $0.0247 (98.8% cheaper)
Result: Batch processing is roughly 21× faster (12.5 s vs. 0.6 s) and 83× cheaper ($0.025 vs. $0.0003).
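The efficiency formulas can be checked with a short script. This is a sketch under the document's own assumptions (50 ms per single call, 200 ms per batch call, and a flat $0.0001 per call); the helper names `chunk` and `batch_gains` are hypothetical.

```python
import math
from typing import Dict, Iterator, List, Sequence


def chunk(items: Sequence[str], batch_size: int = 100) -> Iterator[List[str]]:
    """Yield consecutive batches of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


def batch_gains(n: int, batch_size: int = 100,
                latency_per_call_ms: float = 50.0,
                batch_latency_ms: float = 200.0,
                cost_per_call: float = 0.0001) -> Dict[str, float]:
    """Apply the efficiency formulas to n articles."""
    batches = math.ceil(n / batch_size)
    return {
        "batches": batches,
        "calls_saved": n - batches,
        "latency_saved_ms": n * latency_per_call_ms - batches * batch_latency_ms,
        "cost_saved": (n - batches) * cost_per_call,
    }


texts = [f"article {i}" for i in range(250)]
batch_sizes = [len(batch) for batch in chunk(texts)]
gains = batch_gains(len(texts))
```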
Real-World Performance:
Conclusion: Batch processing is essential for production systems handling large volumes of articles.
HNSW (Hierarchical Navigable Small World) is an approximate nearest neighbor search algorithm used by Azure AI Search for fast vector similarity search in high-dimensional spaces.
Key Concepts:
Algorithm Steps:
1. Build multi-layer graph:
- Layer 0: All nodes, many connections (dense)
- Layer 1: Subset of nodes, fewer connections
- Layer 2: Even fewer nodes, even fewer connections
- ... (logarithmic decrease)
2. Search process:
- Start at top layer (sparse, fast navigation)
- Find entry point (closest to query)
- Navigate to nearest neighbor
- Move down to next layer
- Repeat until layer 0
- Return top K nearest neighbors
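To make the search process concrete, here is a deliberately simplified, pure-Python sketch of the layered descent. It is not Azure AI Search's implementation: a real HNSW search keeps a candidate list of size efSearch at layer 0, whereas this version only does a greedy walk and then ranks the final node and its direct neighbors. The graph layout, function names, and toy vectors are all illustrative assumptions.

```python
import math
from typing import Dict, List, Sequence, Tuple

Vector = Sequence[float]
Graph = Dict[int, List[int]]


def cosine_distance(a: Vector, b: Vector) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def greedy_walk(vectors: Dict[int, Vector], graph: Graph, start: int, query: Vector) -> int:
    """Move to the closest neighbor until no neighbor is closer to the query."""
    current = start
    best = cosine_distance(vectors[current], query)
    while True:
        closer = None
        for neighbor in graph.get(current, []):
            dist = cosine_distance(vectors[neighbor], query)
            if dist < best:
                closer, best = neighbor, dist
        if closer is None:
            return current
        current = closer


def layered_search(vectors: Dict[int, Vector], layers: List[Graph],
                   entry: int, query: Vector, k: int) -> List[int]:
    """layers[0] is the dense bottom layer; layers[-1] is the sparse top layer."""
    current = entry
    for graph in reversed(layers[1:]):          # top layer down to layer 1
        current = greedy_walk(vectors, graph, current, query)
    current = greedy_walk(vectors, layers[0], current, query)
    candidates = {current, *layers[0].get(current, [])}
    return sorted(candidates, key=lambda n: cosine_distance(vectors[n], query))[:k]


def unit(degrees: float) -> Tuple[float, float]:
    radians = math.radians(degrees)
    return (math.cos(radians), math.sin(radians))


# Five toy vectors at 0, 20, 40, 60, and 80 degrees
vectors = {i: unit(20 * i) for i in range(5)}
layer0 = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2, 4], 4: [3]}   # all nodes
layer1 = {0: [2], 2: [0, 4], 4: [2]}                          # sparse subset
result = layered_search(vectors, [layer0, layer1], entry=0, query=unit(75), k=2)
```

In this toy graph the sparse layer lets the search jump from node 0 to node 4 in two hops, after which layer 0 only has to refine locally.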
Complexity:
- m: Connections per node (more = more accurate, slower)
- efConstruction: Build quality (higher = better index, slower build)
- efSearch: Search quality (higher = more accurate, slower search)
Search Process Example:
Real-World Performance (Azure AI Search with HNSW):
Configuration in Our System:
- m = 16 (connections per node)
- efConstruction = 200 (build quality)
- efSearch = 50 (search quality)
- metric = COSINE (cosine similarity)
Conclusion: HNSW enables fast, scalable vector search essential for production RAG systems.
This section documents all data sources used by the application, their configuration, rate limits, and data flow.
The application aggregates news and data from multiple sources to provide comprehensive sentiment analysis:
Data Flow:
User Request → Data Collector → Source Filtering → Parallel Collection → Deduplication → Normalization → Caching → Return
The following diagram shows how all data sources integrate into the application:
Integration Points:
Status: Primary source, always enabled
What it provides:
Configuration: No API key required (free)
Rate Limits:
Caching:
Data Format:
{
"title": "Article title",
"summary": "Article summary",
"source": "Publisher name",
"url": "https://...",
"timestamp": datetime
}
Usage Example:
collector = StockDataCollector(settings, redis_cache)
stock_data = collector.get_stock_price("AAPL")
news = collector.get_news_headlines("AAPL", limit=30)
Status: Optional, requires API key
What it provides:
Configuration:
DATA_SOURCE_ALPHA_VANTAGE_ENABLED=true
DATA_SOURCE_ALPHA_VANTAGE_API_KEY=your_api_key_here
Rate Limits:
Caching:
Data Format: Same as yfinance (normalized)
Enabling:
- Add the API key to the .env file
- Set DATA_SOURCE_ALPHA_VANTAGE_ENABLED=true
Status: Optional, requires API key
What it provides:
Configuration:
DATA_SOURCE_FINNHUB_ENABLED=true
DATA_SOURCE_FINNHUB_API_KEY=your_api_key_here
Rate Limits:
Caching:
Data Format: Same as yfinance (normalized)
Enabling:
- Add the API key to the .env file
- Set DATA_SOURCE_FINNHUB_ENABLED=true
Status: Optional, requires Reddit app credentials
What it provides:
Configuration:
DATA_SOURCE_REDDIT_ENABLED=true
DATA_SOURCE_REDDIT_CLIENT_ID=your_client_id
DATA_SOURCE_REDDIT_CLIENT_SECRET=your_client_secret
DATA_SOURCE_REDDIT_USER_AGENT=your_app_name/1.0
Rate Limits:
Caching:
Data Format: Same as news articles (normalized)
Enabling:
- Obtain the client_id and client_secret for your Reddit app
- Add them to the .env file
- Set DATA_SOURCE_REDDIT_ENABLED=true
Subreddits Searched:
- r/stocks
- r/investing
- r/StockMarket
- r/wallstreetbets (if enabled)
Users can enable/disable individual sources via the dashboard sidebar or API parameters.
Dashboard UI: Checkboxes in sidebar to enable/disable sources
API Parameter: sources query parameter (comma-separated)
GET /sentiment/AAPL?sources=yfinance,alpha_vantage
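A comma-separated `sources` value could be parsed and validated along these lines. This is a sketch only: the function name is hypothetical, and both the yfinance-only default and the choice to reject unknown names with an error are assumptions about behavior the text does not spell out.

```python
from typing import List, Optional

KNOWN_SOURCES = ("yfinance", "alpha_vantage", "finnhub", "reddit")


def parse_sources(raw: Optional[str]) -> List[str]:
    """Turn 'yfinance,alpha_vantage' into a validated, de-duplicated list."""
    if not raw:
        return ["yfinance"]  # assumed default: primary source only
    requested = [part.strip().lower() for part in raw.split(",") if part.strip()]
    unknown = [name for name in requested if name not in KNOWN_SOURCES]
    if unknown:
        raise ValueError("Unknown data source(s): " + ", ".join(unknown))
    # dict.fromkeys drops repeats while keeping the requested order
    return list(dict.fromkeys(requested)) or ["yfinance"]


selected = parse_sources("yfinance, alpha_vantage,yfinance")
```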
Default Behavior:
- yfinance enabled
Filtering Flow:
Articles from multiple sources are deduplicated to avoid analyzing the same article multiple times.
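One plausible way to deduplicate across sources is to treat two articles as the same when either their URL or their normalized title matches. The sketch below illustrates that idea; the matching rule and helper names are assumptions, not necessarily what the application's collector does.

```python
from typing import Dict, Iterable, List, Set


def _url_key(article: Dict[str, str]) -> str:
    url = article.get("url", "").strip().rstrip("/")
    return f"url:{url}" if url else ""


def _title_key(article: Dict[str, str]) -> str:
    # Lowercase and collapse whitespace so trivial differences do not matter
    title = " ".join(article.get("title", "").lower().split())
    return f"title:{title}" if title else ""


def deduplicate(articles: Iterable[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep the first article seen for each URL or normalized title."""
    seen: Set[str] = set()
    unique: List[Dict[str, str]] = []
    for article in articles:
        keys = {key for key in (_url_key(article), _title_key(article)) if key}
        if keys & seen:
            continue
        seen |= keys
        unique.append(article)
    return unique


articles = [
    {"title": "Apple reports record earnings", "url": "https://example.com/a", "source": "yfinance"},
    {"title": "Apple Reports  Record Earnings", "url": "https://example.org/b", "source": "finnhub"},
    {"title": "Apple unveils new product", "url": "https://example.com/c", "source": "alpha_vantage"},
]
unique = deduplicate(articles)
```

Keeping the first occurrence means the order in which sources are collected decides which copy survives.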
Deduplication Strategy:
Benefits:
All articles from different sources are normalized to a standard format:
Normalization Rules:
- title: title, headline, or content.title (first available)
- summary: summary, description, content.summary, or selftext
- url: url, canonicalUrl, or clickThroughUrl
- timestamp: timestamp, published_time, datetime, or created_utc (normalize to UTC)
This section explains the Redis caching strategy used to optimize performance and reduce API costs.
The application uses Redis as a single-tier cache (L2) for:
Why No L1 (In-Memory) Cache?
The application uses a single-tier Redis cache architecture optimized for Streamlit’s stateless nature:
Cache Strategy:
Purpose: Persistent caching across app reloads and API calls
What’s Cached:
Cache Key Structure:
stock:{symbol} # Stock price data
news:{symbol} # News articles
sentiment:{hash} # Sentiment analysis result
embedding:{hash} # Article embedding vector
article_hash:{symbol}:{id} # Duplicate marker
Time-To-Live (TTL) Configuration:
| Data Type | Default TTL | Configurable | Rationale |
|---|---|---|---|
| Stock Price | 1 hour | Yes | Prices change frequently, but not second-by-second |
| News Articles | 2 hours | Yes | News is time-sensitive, but doesn’t change every minute |
| Sentiment Results | 24 hours | Yes | Sentiment for same article doesn’t change |
| Article Embeddings | 7 days | Yes | Embeddings are stable, rarely need regeneration |
| Duplicate Markers | 7 days | Yes | Prevents re-storing same articles |
Configuration:
# Cache TTLs (in seconds)
CACHE_TTL_STOCK=3600 # 1 hour
CACHE_TTL_NEWS=7200 # 2 hours
CACHE_TTL_SENTIMENT=86400 # 24 hours
CACHE_TTL_RAG_ARTICLES=604800 # 7 days
Naming Convention: {type}:{identifier}
Examples:
stock:AAPL # Stock data for AAPL
news:AAPL # News articles for AAPL
sentiment:abc123def456 # Sentiment for text hash
embedding:xyz789uvw012 # Embedding for text hash
article_hash:AAPL:ed8c4cd1 # Duplicate marker
Hash Generation (for sentiment/embeddings):
import hashlib
text_hash = hashlib.md5(text.encode()).hexdigest()
cache_key = f"sentiment:{text_hash}"
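Putting the key convention and the TTLs together, a cache-aside lookup might look like the sketch below. To keep it runnable without a Redis server it uses a small in-memory stand-in with the same `get`/`setex` shape as a Redis client; the class, the helper names, and the fake analyzer are all hypothetical.

```python
import hashlib
import json
import time
from typing import Callable, Dict, List, Optional, Tuple


class InMemoryTTLCache:
    """Dict-backed stand-in for Redis, for illustration only."""

    def __init__(self) -> None:
        self._store: Dict[str, Tuple[float, str]] = {}

    def get(self, key: str) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # expired entries behave like misses
            return None
        return value

    def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self._store[key] = (time.monotonic() + ttl_seconds, value)


def sentiment_key(text: str) -> str:
    return "sentiment:" + hashlib.md5(text.encode("utf-8")).hexdigest()


def cached_sentiment(cache: InMemoryTTLCache, text: str,
                     analyze: Callable[[str], Dict[str, float]],
                     ttl_seconds: int = 86400) -> Dict[str, float]:
    """Return the cached result if present, otherwise analyze and store it."""
    key = sentiment_key(text)
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)
    result = analyze(text)
    cache.setex(key, ttl_seconds, json.dumps(result))
    return result


calls: List[str] = []


def fake_analyze(text: str) -> Dict[str, float]:
    calls.append(text)  # record each simulated (expensive) LLM call
    return {"positive": 0.85, "negative": 0.10, "neutral": 0.05}


cache = InMemoryTTLCache()
first = cached_sentiment(cache, "Apple reports record earnings", fake_analyze)
second = cached_sentiment(cache, "Apple reports record earnings", fake_analyze)
```

The second lookup is served from the cache, so the analyzer runs only once for identical text.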
Without Caching:
With Caching:
Real-World Example:
Viewing Cache Stats:
GET /cache/stats
Clearing Cache:
POST /cache/clear
Cache Invalidation:
Complete reference for all API endpoints, request/response formats, and error handling.
Development: http://localhost:8000
Production: Configure based on deployment
Currently, the API does not require authentication. For production deployments, consider adding API keys or OAuth.
The following diagram shows how API requests flow through the system:
Key Flow Points:
GET /health
Check API health and service availability.
Response:
{
"status": "healthy",
"services": {
"redis": "available",
"rag": "available",
"azure_openai": "available"
}
}
Status Codes:
- 200: Healthy
- 503: Degraded (some services unavailable)
GET /sentiment/{symbol}
Analyze sentiment for a stock symbol.
Path Parameters:
- symbol (string, required): Stock ticker symbol (e.g., “AAPL”)
Query Parameters:
- sources (string, optional): Comma-separated data sources (yfinance,alpha_vantage,finnhub,reddit)
- cache_enabled (boolean, optional): Enable sentiment caching (default: true)
- detailed (boolean, optional): Return detailed response (default: false)
Example Request:
GET /sentiment/AAPL?detailed=true&sources=yfinance,alpha_vantage
Response (Simple):
{
"symbol": "AAPL",
"positive": 0.65,
"negative": 0.20,
"neutral": 0.15,
"net_sentiment": 0.45,
"article_count": 30,
"operation_summary": {
"redis_used": true,
"rag_used": true,
"articles_stored": 5
}
}
Response (Detailed):
{
"symbol": "AAPL",
"positive": 0.65,
"negative": 0.20,
"neutral": 0.15,
"net_sentiment": 0.45,
"article_count": 30,
"price_data": {
"symbol": "AAPL",
"price": 175.50,
"company_name": "Apple Inc.",
"market_cap": 2800000000000
},
"news": [
{
"title": "Apple reports record earnings",
"summary": "...",
"source": "Yahoo Finance",
"url": "https://...",
"timestamp": "2024-12-10T10:00:00"
}
],
"news_sentiments": [
{
"positive": 0.85,
"negative": 0.10,
"neutral": 0.05
}
],
"operation_summary": {
"redis_used": true,
"rag_used": true,
"articles_stored": 5
}
}
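In both sample responses, net_sentiment equals positive minus negative (0.65 − 0.20 = 0.45). The sketch below shows how per-article scores could be rolled up into those top-level fields; the simple averaging step and the function name are assumptions, since the aggregation method is not stated here.

```python
from typing import Dict, List

LABELS = ("positive", "negative", "neutral")


def aggregate_sentiment(news_sentiments: List[Dict[str, float]]) -> Dict[str, float]:
    """Average per-article scores and derive net_sentiment = positive - negative."""
    if not news_sentiments:
        # Assumed fallback when there are no articles: fully neutral
        return {"positive": 0.0, "negative": 0.0, "neutral": 1.0, "net_sentiment": 0.0}
    count = len(news_sentiments)
    summary = {label: sum(s[label] for s in news_sentiments) / count for label in LABELS}
    summary["net_sentiment"] = summary["positive"] - summary["negative"]
    return summary


summary = aggregate_sentiment([
    {"positive": 0.85, "negative": 0.10, "neutral": 0.05},
    {"positive": 0.45, "negative": 0.30, "neutral": 0.25},
])
```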
Status Codes:
- 200: Success
- 400: Invalid symbol
- 500: Internal server error
GET /price/{symbol}/history
Get historical price data for a stock symbol.
Path Parameters:
- symbol (string, required): Stock ticker symbol
Query Parameters:
- period (string, optional): Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, or 5y; default: 1y)
Example Request:
GET /price/AAPL/history?period=6mo
Response:
{
"symbol": "AAPL",
"period": "6mo",
"data": [
{
"date": "2024-06-10",
"open": 170.00,
"high": 172.50,
"low": 169.50,
"close": 171.25,
"volume": 50000000
}
]
}
Status Codes:
- 200: Success
- 400: Invalid symbol or period
- 500: Internal server error
POST /comparison/insights
Get AI-generated comparison insights for multiple stocks.
Request Body:
{
"symbols": ["AAPL", "MSFT", "GOOGL"],
"data": {
"AAPL": {
"price_data": {...},
"news": [...]
}
},
"sentiments": {
"AAPL": {
"positive": 0.65,
"negative": 0.20,
"neutral": 0.15
}
}
}
Response:
{
"insights": "Based on the analysis, AAPL shows the strongest positive sentiment...",
"comparison": {
"AAPL": {"rank": 1, "sentiment": "positive"},
"MSFT": {"rank": 2, "sentiment": "neutral"},
"GOOGL": {"rank": 3, "sentiment": "positive"}
}
}
Status Codes:
- 200: Success
- 400: Invalid request
- 500: Internal server error
GET /system/status
Get detailed system status for Redis, RAG, and Azure services.
Response:
{
"redis": {
"connected": true,
"host": "your-redis.redis.cache.windows.net"
},
"rag": {
"embeddings_enabled": true,
"vector_db_available": true
},
"azure_openai": {
"available": true,
"deployment": "gpt-4"
}
}
Status Codes:
- 200: Success
- 500: Internal server error
GET /cache/stats
Get cache statistics (hits, misses, sets).
Response:
{
"cache_hits": 150,
"cache_misses": 50,
"cache_sets": 200,
"hit_rate": 0.75
}
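The hit_rate field in the sample is consistent with hits divided by total lookups (150 / (150 + 50) = 0.75). A minimal sketch of that calculation, assuming this is how the value is derived:

```python
def hit_rate(cache_hits: int, cache_misses: int) -> float:
    """Fraction of lookups served from cache; 0.0 when nothing has been looked up."""
    lookups = cache_hits + cache_misses
    return cache_hits / lookups if lookups else 0.0


sample_rate = hit_rate(150, 50)
```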
POST /cache/clear
Clear all cache or specific cache entries.
Request Body (cache_type may be "all", "sentiment", "news", or "stock"; symbol is optional and limits clearing to one symbol):
{
"cache_type": "all",
"symbol": "AAPL"
}
Response:
{
"message": "Cache cleared successfully",
"cleared_keys": 150
}
Status Codes:
- 200: Success
- 400: Invalid request
- 500: Internal server error
Error Response Format:
{
"error": "Error message",
"detail": "Detailed error information",
"status_code": 400
}
Common Error Codes:
- 400: Bad Request (invalid parameters)
- 404: Not Found (endpoint or resource not found)
- 500: Internal Server Error (server-side error)
- 503: Service Unavailable (dependency unavailable)
Complete guide to configuring the application via environment variables and settings.
The application loads configuration in a specific order to ensure proper initialization:
Configuration Hierarchy:
Loading Order:
- .env file: python-dotenv loads environment variables
- BaseSettings classes parse and validate
- _settings variable
- get_settings() to access configuration
Required Variables:
# Azure OpenAI (Required)
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your_api_key
AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4
AZURE_OPENAI_EMBEDDING_DEPLOYMENT=text-embedding-ada-002
# Redis (Required)
REDIS_HOST=your-redis.redis.cache.windows.net
REDIS_PORT=6380
REDIS_PASSWORD=your_password
REDIS_SSL=true
Optional Variables:
# Azure AI Search (Optional - for vector database)
AZURE_AI_SEARCH_ENDPOINT=https://your-search.search.windows.net
AZURE_AI_SEARCH_API_KEY=your_api_key
AZURE_AI_SEARCH_INDEX_NAME=stock-articles
# Data Sources (Optional)
DATA_SOURCE_ALPHA_VANTAGE_ENABLED=false
DATA_SOURCE_ALPHA_VANTAGE_API_KEY=your_key
DATA_SOURCE_FINNHUB_ENABLED=false
DATA_SOURCE_FINNHUB_API_KEY=your_key
DATA_SOURCE_REDDIT_ENABLED=false
DATA_SOURCE_REDDIT_CLIENT_ID=your_id
DATA_SOURCE_REDDIT_CLIENT_SECRET=your_secret
# Application Settings (Optional)
APP_API_ENABLED=true
APP_API_HOST=0.0.0.0
APP_API_PORT=8000
APP_API_TIMEOUT=180
APP_CACHE_SENTIMENT_ENABLED=true
APP_CACHE_TTL_SENTIMENT=86400
APP_CACHE_TTL_STOCK=3600
APP_CACHE_TTL_NEWS=7200
APP_RAG_TOP_K=3
APP_RAG_BATCH_SIZE=100
APP_SENTIMENT_TEMPERATURE=0.2
APP_SENTIMENT_MAX_TOKENS=500
Purpose: Configure Azure OpenAI for sentiment analysis and embeddings
Variables:
- AZURE_OPENAI_ENDPOINT: Your Azure OpenAI endpoint URL
- AZURE_OPENAI_API_KEY: Your API key
- AZURE_OPENAI_DEPLOYMENT_NAME: GPT-4 deployment name (default: “gpt-4”)
- AZURE_OPENAI_EMBEDDING_DEPLOYMENT: Embedding model deployment (default: None, uses text-embedding-ada-002)
Example:
AZURE_OPENAI_ENDPOINT=https://my-openai.openai.azure.com/
AZURE_OPENAI_API_KEY=abc123...
AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4
AZURE_OPENAI_EMBEDDING_DEPLOYMENT=text-embedding-ada-002
Purpose: Configure Redis cache for data and sentiment caching
Variables:
- REDIS_HOST: Redis hostname
- REDIS_PORT: Redis port (default: 6380 for Azure Redis)
- REDIS_PASSWORD: Redis password
- REDIS_SSL: Enable SSL (default: true for Azure Redis)
Example:
REDIS_HOST=my-redis.redis.cache.windows.net
REDIS_PORT=6380
REDIS_PASSWORD=xyz789...
REDIS_SSL=true
Purpose: Configure vector database for RAG (optional)
Variables:
- AZURE_AI_SEARCH_ENDPOINT: Azure AI Search endpoint
- AZURE_AI_SEARCH_API_KEY: API key
- AZURE_AI_SEARCH_INDEX_NAME: Index name (default: “stock-articles”)
Example:
AZURE_AI_SEARCH_ENDPOINT=https://my-search.search.windows.net
AZURE_AI_SEARCH_API_KEY=def456...
AZURE_AI_SEARCH_INDEX_NAME=stock-articles
Note: If not configured, RAG falls back to Redis SCAN (slower but works).
Purpose: Enable/disable and configure data sources
Alpha Vantage:
DATA_SOURCE_ALPHA_VANTAGE_ENABLED=true
DATA_SOURCE_ALPHA_VANTAGE_API_KEY=your_key
Finnhub:
DATA_SOURCE_FINNHUB_ENABLED=true
DATA_SOURCE_FINNHUB_API_KEY=your_key
Reddit:
DATA_SOURCE_REDDIT_ENABLED=true
DATA_SOURCE_REDDIT_CLIENT_ID=your_id
DATA_SOURCE_REDDIT_CLIENT_SECRET=your_secret
DATA_SOURCE_REDDIT_USER_AGENT=MyApp/1.0
Purpose: Configure application behavior
API Settings:
APP_API_ENABLED=true # Enable FastAPI server
APP_API_HOST=0.0.0.0 # Bind address
APP_API_PORT=8000 # Port number
APP_API_TIMEOUT=180 # Request timeout (seconds)
Cache Settings:
APP_CACHE_SENTIMENT_ENABLED=true # Enable sentiment caching
APP_CACHE_TTL_SENTIMENT=86400 # 24 hours
APP_CACHE_TTL_STOCK=3600 # 1 hour
APP_CACHE_TTL_NEWS=7200 # 2 hours
RAG Settings:
APP_RAG_TOP_K=3 # Number of context articles
APP_RAG_BATCH_SIZE=100 # Embedding batch size
APP_RAG_TEMPORAL_DECAY_DAYS=7 # Temporal decay half-life
Sentiment Settings:
APP_SENTIMENT_TEMPERATURE=0.2 # LLM temperature (lower = more consistent)
APP_SENTIMENT_MAX_TOKENS=500 # Max tokens in response
AZURE_OPENAI_ENDPOINT=https://my-openai.openai.azure.com/
AZURE_OPENAI_API_KEY=abc123...
AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4
REDIS_HOST=my-redis.redis.cache.windows.net
REDIS_PASSWORD=xyz789...
# Azure OpenAI
AZURE_OPENAI_ENDPOINT=https://my-openai.openai.azure.com/
AZURE_OPENAI_API_KEY=abc123...
AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4
AZURE_OPENAI_EMBEDDING_DEPLOYMENT=text-embedding-ada-002
# Redis
REDIS_HOST=my-redis.redis.cache.windows.net
REDIS_PORT=6380
REDIS_PASSWORD=xyz789...
REDIS_SSL=true
# Azure AI Search (Optional)
AZURE_AI_SEARCH_ENDPOINT=https://my-search.search.windows.net
AZURE_AI_SEARCH_API_KEY=def456...
AZURE_AI_SEARCH_INDEX_NAME=stock-articles
# Data Sources
DATA_SOURCE_ALPHA_VANTAGE_ENABLED=true
DATA_SOURCE_ALPHA_VANTAGE_API_KEY=ghi789...
DATA_SOURCE_FINNHUB_ENABLED=true
DATA_SOURCE_FINNHUB_API_KEY=jkl012...
DATA_SOURCE_REDDIT_ENABLED=true
DATA_SOURCE_REDDIT_CLIENT_ID=mno345...
DATA_SOURCE_REDDIT_CLIENT_SECRET=pqr678...
# Application
APP_API_ENABLED=true
APP_CACHE_SENTIMENT_ENABLED=true
APP_RAG_TOP_K=3
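The variables above could be loaded and checked at startup roughly as follows. The document describes pydantic BaseSettings classes behind a get_settings() accessor; this sketch swaps in a standard-library dataclass so it runs without extra dependencies. The field selection, the REQUIRED tuple, and load_settings are illustrative assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Assumed required set, based on the minimal configuration shown earlier
REQUIRED = ("AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY", "REDIS_HOST", "REDIS_PASSWORD")


@dataclass(frozen=True)
class Settings:
    azure_openai_endpoint: str
    azure_openai_api_key: str
    azure_openai_deployment_name: str
    redis_host: str
    redis_port: int
    redis_password: str
    redis_ssl: bool
    rag_top_k: int


def load_settings(env: Mapping[str, str]) -> Settings:
    """Validate required variables and apply the documented defaults."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("Missing required settings: " + ", ".join(missing))
    return Settings(
        azure_openai_endpoint=env["AZURE_OPENAI_ENDPOINT"],
        azure_openai_api_key=env["AZURE_OPENAI_API_KEY"],
        azure_openai_deployment_name=env.get("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4"),
        redis_host=env["REDIS_HOST"],
        redis_port=int(env.get("REDIS_PORT", "6380")),
        redis_password=env["REDIS_PASSWORD"],
        redis_ssl=env.get("REDIS_SSL", "true").lower() == "true",
        rag_top_k=int(env.get("APP_RAG_TOP_K", "3")),
    )


_settings: Optional[Settings] = None


def get_settings() -> Settings:
    """Load settings from the process environment once, then reuse them."""
    global _settings
    if _settings is None:
        _settings = load_settings(os.environ)
    return _settings
```

Failing fast on missing values at startup surfaces configuration mistakes before the first request reaches Azure OpenAI or Redis.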
The application validates all settings on startup:
Common Validation Errors:
Common issues, solutions, and best practices for using the application.
Use this decision tree to quickly identify and resolve common issues:
Symptoms:
Solutions:
.env:
REDIS_HOST=your-redis.redis.cache.windows.net
REDIS_PASSWORD=your_password
REDIS_SSL=true
redis-cli -h your-redis.redis.cache.windows.net -p 6380 -a your_password --tls
Impact: Caching disabled, but application still works (slower)
Symptoms:
Solutions:
.env:
AZURE_OPENAI_API_KEY=your_key
/ or be removed)
Impact: Sentiment analysis fails, fallback to TextBlob (less accurate)
Symptoms:
Solutions:
Impact: No sentiment analysis possible (no data to analyze)
Symptoms:
Solutions:
AZURE_AI_SEARCH_ENDPOINT=https://...
AZURE_AI_SEARCH_API_KEY=...
Impact: Sentiment analysis works but without RAG context (less accurate)
Symptoms:
Solutions:
.env):
APP_API_TIMEOUT=300 # 5 minutes
Impact: Request fails, user must retry
Symptoms:
Explanation: This is expected behavior when sentiment caching is disabled. Each article undergoes:
With 30 articles and caching disabled, that means 30 RAG calls plus 30 LLM calls per request.
Solutions:
APP_CACHE_SENTIMENT_ENABLED=true
Impact: Normal behavior, but slower without caching
Impact: 50-90% reduction in API calls and latency
Configuration:
APP_CACHE_SENTIMENT_ENABLED=true
APP_CACHE_TTL_SENTIMENT=86400 # 24 hours
Impact: 10-100× faster vector search vs. Redis SCAN
Configuration:
AZURE_AI_SEARCH_ENABLED=true
AZURE_AI_SEARCH_ENDPOINT=https://...
AZURE_AI_SEARCH_API_KEY=...
Impact: 3-5× faster sentiment analysis (already enabled)
Configuration:
APP_SENTIMENT_MAX_WORKERS=5 # Default
Impact: Faster data collection, fewer API calls
Configuration: Disable unused sources in dashboard or API
Impact: 100× reduction in embedding API calls (already enabled)
Configuration:
APP_RAG_BATCH_SIZE=100 # Default
Caching provides massive performance improvements with minimal cost:
Redis SCAN works but is slow for large datasets:
Data source APIs have rate limits:
Enable caching to stay within limits.
Balance freshness vs. performance:
Already enabled by default, but verify:
APP_SENTIMENT_MAX_WORKERS=5 # Adjust based on API limits
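A worker limit like this is typically applied through a thread pool. The sketch below shows the general pattern with Python's standard library; it assumes the per-article analysis is I/O-bound (waiting on API calls) and uses a hypothetical `analyze_in_parallel` helper rather than the application's own code.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def analyze_in_parallel(texts: Sequence[str],
                        analyze: Callable[[str], T],
                        max_workers: Optional[int] = None) -> List[T]:
    """Run analyze() over texts concurrently, returning results in input order."""
    if max_workers is None:
        # Falls back to the documented default of 5 workers
        max_workers = int(os.environ.get("APP_SENTIMENT_MAX_WORKERS", "5"))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map preserves the order of the inputs
        return list(pool.map(analyze, texts))


lengths = analyze_in_parallel(["a", "bb", "ccc"], len, max_workers=2)
```

Raising the worker count shortens wall-clock time only until the upstream rate limit is reached, which is why the value should track the API quota.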
Application logs provide detailed insights:
Check logs regularly to identify issues early.
Q: Do I need all data sources enabled?
A: No. yfinance is sufficient for basic functionality. Other sources provide more comprehensive coverage.
Q: Can I use the API without the dashboard?
A: Yes. The API is independent and can be used by any client.
Q: Is Azure AI Search required?
A: No. It’s optional. RAG works with Redis fallback (slower but functional).
Q: How much does it cost to run?
A: Depends on usage:
Q: Can I deploy this in production?
A: Yes, but consider:
Q: How do I scale this?
A:
Q: What’s the difference between simple and detailed API response?
A:
Q: Can I customize sentiment analysis prompts?
A: Yes, edit src/stock_sentiment/services/sentiment.py to modify prompts.
Q: How do I add new data sources?
A:
- settings.py
- collector.py
Version: 1.0
Last Updated: November 2025
Maintainer: Anand Mohan Singh