Realestate Newsletter Agent Langgraph
gopalakrishnan/realestate-newsletter-agent-langgraph
This Actor is paid per event.
An autonomous Apify actor that generates comprehensive real estate market research reports by analyzing data from multiple authoritative sources.
Developer
Maintained by Community
Actor Metrics
1 monthly user
No reviews yet
No bookmarks yet
Created in Mar 2025
Modified 11 hours ago
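A minimal sketch of running this Actor from Python with the apify-client package; the token and key values below are placeholders, and the input fields follow .actor/input_schema.json shown further down.

from apify_client import ApifyClient

client = ApifyClient("apify_api_...")  # placeholder Apify API token

# Start the Actor and wait for the run to finish.
run = client.actor("gopalakrishnan/realestate-newsletter-agent-langgraph").call(
    run_input={
        "location": "San Jose, CA",
        "openaiApiKey": "sk-...",        # placeholder OpenAI API key
        "apifyApiKey": "apify_api_...",  # placeholder Apify API key
        "debug": False,
    }
)

# Each successful run pushes one dataset record containing the newsletter.
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    print(item["newsletter"])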
.dockerignore
.git
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.gitignore
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

# Added by Apify CLI
node_modules
src/data_analysis_logic.py
src/data_parsing_logic.py
src/data_validation_logic.py
src/data_validation_logic1.py
requirements.txt
apify<3.0.0
langchain-openai==0.3.6
langgraph==0.2.73
# Imported directly in src/ (also installed transitively by the packages above):
apify-client
openai
.actor/actor.json
{
    "actorSpecification": 1,
    "name": "realestate-newsletter-agent-langgraph",
    "title": "Realestate Newsletter Agent LangGraph",
    "description": "Autonomous LangGraph agent that generates real estate market research newsletters from multiple sources.",
    "version": "0.0",
    "buildTag": "latest",
    "input": "./input_schema.json",
    "storages": {
        "dataset": "./dataset_schema.json"
    },
    "meta": {
        "templateId": "python-langgraph"
    },
    "dockerfile": "./Dockerfile"
}
.actor/dataset_schema.json
{
    "actorSpecification": 1,
    "views": {
        "overview": {
            "title": "Overview",
            "transformation": {
                "fields": ["location", "newsletter", "analysis"]
            },
            "display": {
                "component": "table",
                "properties": {
                    "location": {
                        "label": "Location",
                        "format": "text"
                    },
                    "newsletter": {
                        "label": "Newsletter",
                        "format": "text"
                    },
                    "analysis": {
                        "label": "Analysis",
                        "format": "object"
                    }
                }
            }
        }
    }
}
.actor/Dockerfile
# First, specify the base Docker image.
# You can see the Docker images from Apify at https://hub.docker.com/r/apify/.
# You can also use any other image from Docker Hub.
FROM apify/actor-python:3.13

# Second, copy just requirements.txt into the Actor image,
# since it should be the only file that affects the dependency installation in the next step,
# in order to speed up the build.
COPY requirements.txt ./

# Install the packages specified in requirements.txt,
# print the installed Python version, pip version,
# and all installed packages with their versions for debugging.
RUN echo "Python version:" \
    && python --version \
    && echo "Pip version:" \
    && pip --version \
    && echo "Installing dependencies:" \
    && pip install -r requirements.txt \
    && echo "All installed Python packages:" \
    && pip freeze

# Next, copy the remaining files and directories with the source code.
# Since we do this after installing the dependencies, quick builds will be really fast
# for most source file changes.
COPY . ./

# Use compileall to ensure the runnability of the Actor Python code.
RUN python3 -m compileall -q .

# Create and run as a non-root user.
RUN useradd --create-home apify && \
    chown -R apify:apify ./
USER apify

# Specify how to launch the source code of your Actor.
# By default, the "python3 -m src" command is run.
CMD ["python3", "-m", "src"]
.actor/input_schema.json
{
    "title": "Real Estate Newsletter Agent",
    "type": "object",
    "schemaVersion": 1,
    "properties": {
        "location": {
            "title": "Location",
            "type": "string",
            "description": "City and State (e.g. 'San Jose, CA')",
            "editor": "textfield"
        },
        "openaiApiKey": {
            "title": "OpenAI API Key",
            "type": "string",
            "description": "Your OpenAI API key",
            "editor": "textfield",
            "isSecret": true
        },
        "apifyApiKey": {
            "title": "Apify API Key",
            "type": "string",
            "description": "Your Apify API key",
            "editor": "textfield",
            "isSecret": true
        },
        "debug": {
            "title": "Debug Mode",
            "type": "boolean",
            "description": "Enable debug logging",
            "default": false
        }
    },
    "required": ["location", "openaiApiKey", "apifyApiKey"]
}
.actor/pay_per_event.json
{
    "actor-start": {
        "eventTitle": "Price for Actor start",
        "eventDescription": "Flat fee for starting an Actor run.",
        "eventPriceUsd": 0.1
    },
    "task-completed": {
        "eventTitle": "Price for completing the task",
        "eventDescription": "Flat fee for completing the task.",
        "eventPriceUsd": 0.4
    },
    "search-init": {
        "eventTitle": "Search Initialization",
        "eventDescription": "Charged when a new market search is initiated for a location",
        "eventPriceUsd": 0.02
    },
    "url-processed": {
        "eventTitle": "URL Processing",
        "eventDescription": "Charged per validated URL from real estate sources (Zillow, Redfin, Realtor, Rocket)",
        "eventPriceUsd": 0.02
    },
    "data-extracted": {
        "eventTitle": "Data Extraction",
        "eventDescription": "Charged per source when market data is successfully extracted from the webpage",
        "eventPriceUsd": 0.02
    },
    "market-analyzed": {
        "eventTitle": "Market Analysis",
        "eventDescription": "Charged per source when market data is successfully analyzed and validated",
        "eventPriceUsd": 0.02
    },
    "newsletter-generated": {
        "eventTitle": "Newsletter Generation",
        "eventDescription": "Charged for generating the final market analysis newsletter with compiled insights",
        "eventPriceUsd": 0.50
    }
}
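Putting the schema together, a rough per-run cost can be sketched as below. This assumes the platform charges the flat actor-start fee at run start, all four sources validate and extract successfully, and only events the source code actually fires are counted (the task-completed event is defined above but never charged in src/, so it is left out); real runs may charge less.

# Rough cost estimate for one fully successful run under the schema above.
PRICES_USD = {
    "actor-start": 0.10,
    "search-init": 0.02,
    "url-processed": 0.02,
    "data-extracted": 0.02,
    "market-analyzed": 0.02,
    "newsletter-generated": 0.50,
}

TYPICAL_COUNTS = {
    "actor-start": 1,           # flat fee per run
    "search-init": 1,           # one search per location
    "url-processed": 4,         # one validated URL per source
    "data-extracted": 4,        # one extraction per source
    "market-analyzed": 4,       # one analysis per source
    "newsletter-generated": 1,  # final newsletter
}

total = sum(PRICES_USD[event] * count for event, count in TYPICAL_COUNTS.items())
print(f"Estimated cost per run: ${total:.2f}")  # -> $0.86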
src/main.py
1"""
2Main entry point for the Apify Actor.
3Orchestrates the autonomous real estate market research process.
4"""
5
6from __future__ import annotations
7
8import logging
9import os
10from apify import Actor
11from apify_client import ApifyClient
12from openai import AsyncOpenAI
13
14from .agents.search_agent import SearchAgent
15from .agents.extraction_agent import ExtractionAgent
16from .agents.analysis_agent import AnalysisAgent
17from .agents.newsletter_agent import NewsletterAgent
18from .models.schemas import AgentState
19
20# Configure logging
21logging.basicConfig(level=logging.INFO)
22logger = logging.getLogger(__name__)
23
24async def main() -> None:
25 """Main entry point for the Apify Actor."""
26 async with Actor:
27 logger.info("Starting real estate market analysis actor")
28
29 # Get input
30 actor_input = await Actor.get_input() or {}
31 logger.info(f"Received input with keys: {', '.join(actor_input.keys())}")
32
33 # Set API keys from input
34 openai_api_key = actor_input.get("openaiApiKey")
35 apify_api_key = actor_input.get("apifyApiKey")
36
37 if not openai_api_key:
38 logger.error("OpenAI API key is required in input")
39 return
40 logger.info("OpenAI API key validated")
41
42 if not apify_api_key:
43 logger.error("Apify API key is required in input")
44 return
45 logger.info("Apify API key validated")
46
47 # Get location
48 location = actor_input.get("location")
49 if not location:
50 logger.error("Location is required")
51 return
52 logger.info(f"Processing location: {location}")
53
54 # Set environment variables for API keys
55 os.environ["OPENAI_API_KEY"] = openai_api_key
56 os.environ["APIFY_TOKEN"] = apify_api_key
57 logger.info("Environment variables set")
58
59 # Initialize OpenAI client
60 try:
61 openai_client = AsyncOpenAI(api_key=openai_api_key)
62 logger.info("OpenAI client initialized")
63 except Exception as e:
64 logger.error(f"Failed to initialize OpenAI client: {str(e)}")
65 return
66
67 # Initialize Apify client
68 try:
69 apify_client = ApifyClient(token=apify_api_key)
70 logger.info("Apify client initialized")
71 except Exception as e:
72 logger.error(f"Failed to initialize Apify client: {str(e)}")
73 return
74
75 # Initialize agents with shared OpenAI client
76 try:
77 newsletter_agent = NewsletterAgent(client=openai_client)
78 search_agent = SearchAgent(client=openai_client)
79 extraction_agent = ExtractionAgent(client=openai_client)
80 analysis_agent = AnalysisAgent(client=openai_client)
81 logger.info("All agents initialized successfully")
82 except Exception as e:
83 logger.error(f"Failed to initialize agents: {str(e)}")
84 return
85
86 try:
87 # Execute workflow
88 logger.info("Starting source search...")
89 urls = await search_agent.find_sources(location)
90 if not urls:
91 logger.error("No valid URLs found for the location")
92 return
93 logger.info(f"Found {len(urls)} valid URLs")
94
95 logger.info("Starting data extraction...")
96 market_data = await extraction_agent.extract_data(urls)
97 if not market_data:
98 logger.error("No market data could be extracted from URLs")
99 return
100 logger.info(f"Extracted market data from {len(market_data) if isinstance(market_data, list) else len(market_data.keys())} sources")
101
102 logger.info("Starting market analysis...")
103 analysis = await analysis_agent.analyze_market(market_data)
104 if not analysis:
105 logger.error("Market analysis failed to produce results")
106 return
107 logger.info("Market analysis completed successfully")
108
109 logger.info("Generating newsletter...")
110 newsletter = await newsletter_agent.generate_newsletter(location, market_data, analysis)
111 if not newsletter:
112 logger.error("Newsletter generation failed")
113 return
114 logger.info("Newsletter generated successfully")
115
116 # Save output
117 logger.info("Saving results...")
118 await Actor.push_data({
119 "location": location,
120 "filtered_urls": urls,
121 "market_data": market_data,
122 "analysis": analysis,
123 "newsletter": newsletter
124 })
125 logger.info("Results saved successfully")
126
127 except Exception as e:
128 logger.error(f"Actor failed with error: {str(e)}")
129 logger.exception("Detailed error traceback:")
130 raise
131
132if __name__ == "__main__":
133 Actor.main(main)
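For reference, each successful run pushes a single dataset record shaped roughly as follows; every value here is illustrative (the URLs and figures are made up):

example_record = {
    "location": "San Jose, CA",
    "filtered_urls": {
        "zillow": "https://www.zillow.com/home-values/54296/san-jose-ca/",  # hypothetical URL
        "redfin": "https://www.redfin.com/city/17420/CA/San-Jose/housing-market",
    },
    "market_data": [
        {"median_price": 1250000.0, "price_change": 3.4, "inventory": 812,
         "days_on_market": 28, "source": "zillow"},
    ],
    "analysis": {
        "metrics": {"zillow": {"median_price": 1250000.0, "price_change": 3.4}},
        "source_urls": {},
    },
    "newsletter": "# San Jose, CA Real Estate Market Update - March 2025\n...",
}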
src/models.py
1"""This module defines Pydantic models for this project.
2
3These models are used mainly for the structured tool and LLM outputs.
4Resources:
5- https://docs.pydantic.dev/latest/concepts/models/
6"""
7
8from __future__ import annotations
9
10from pydantic import BaseModel
11
12
13class InstagramPost(BaseModel):
14 """Instagram Post Pydantic model.
15
16 Returned as a structured output by the `tool_scrape_instagram_profile_posts` tool.
17
18 url: The URL of the post.
19 likes: The number of likes on the post.
20 comments: The number of comments on the post.
21 timestamp: The timestamp when the post was published.
22 caption: The post caption.
23 alt: The post alt text.
24 """
25
26 url: str
27 likes: int
28 comments: int
29 timestamp: str
30 caption: str | None = None
31 alt: str | None = None
32
33
34class AgentStructuredOutput(BaseModel):
35 """Structured output for the ReAct agent.
36
37 Returned as a structured output by the ReAct agent.
38
39 total_likes: The total number of likes on the most popular posts.
40 total_comments: The total number of comments on the most popular posts.
41 most_popular_posts: A list of the most popular posts.
42 """
43
44 total_likes: int
45 total_comments: int
46 most_popular_posts: list[InstagramPost]
src/tools.py
1"""This module defines the tools used by the agent.
2
3Feel free to modify or add new tools to suit your specific needs.
4
5To learn how to create a new tool, see:
6- https://python.langchain.com/docs/concepts/tools/
7- https://python.langchain.com/docs/how_to/#tools
8"""
9
10from __future__ import annotations
11
12from apify import Actor
13from langchain_core.tools import tool
14
15from src.models import InstagramPost
16
17
18@tool
19def tool_calculator_sum(numbers: list[int]) -> int:
20 """Tool to calculate the sum of a list of numbers.
21
22 Args:
23 numbers (list[int]): List of numbers to sum.
24
25 Returns:
26 int: Sum of the numbers.
27 """
28 return sum(numbers)
29
30
31@tool
32async def tool_scrape_instagram_profile_posts(handle: str, max_posts: int = 30) -> list[InstagramPost]:
33 """Tool to scrape Instagram profile posts.
34
35 Args:
36 handle (str): Instagram handle of the profile to scrape (without the '@' symbol).
37 max_posts (int, optional): Maximum number of posts to scrape. Defaults to 30.
38
39 Returns:
40 list[InstagramPost]: List of Instagram posts scraped from the profile.
41
42 Raises:
43 RuntimeError: If the Actor fails to start.
44 """
45 run_input = {
46 'directUrls': [f'https://www.instagram.com/{handle}/'],
47 'resultsLimit': max_posts,
48 'resultsType': 'posts',
49 'searchLimit': 1,
50 }
51 if not (run := await Actor.apify_client.actor('apify/instagram-scraper').call(run_input=run_input)):
52 msg = 'Failed to start the Actor apify/instagram-scraper'
53 raise RuntimeError(msg)
54
55 dataset_id = run['defaultDatasetId']
56 dataset_items: list[dict] = (await Actor.apify_client.dataset(dataset_id).list_items()).items
57 posts: list[InstagramPost] = []
58 for item in dataset_items:
59 url: str | None = item.get('url')
60 caption: str | None = item.get('caption')
61 alt: str | None = item.get('alt')
62 likes: int | None = item.get('likesCount')
63 comments: int | None = item.get('commentsCount')
64 timestamp: str | None = item.get('timestamp')
65
66 # only include posts with all required fields
67 if not url or not likes or not comments or not timestamp:
68 Actor.log.warning('Skipping post with missing fields: %s', item)
69 continue
70
71 posts.append(
72 InstagramPost(
73 url=url,
74 likes=likes,
75 comments=comments,
76 timestamp=timestamp,
77 caption=caption,
78 alt=alt,
79 )
80 )
81
82 return posts
src/utils.py
from apify import Actor
from langchain_core.messages import ToolMessage


def log_state(state: dict) -> None:
    """Logs the state of the graph.

    Uses the `Actor.log.debug` method to log the state of the graph.

    Args:
        state (dict): The state of the graph.
    """
    message = state['messages'][-1]
    # Traverse all tool messages and print them
    # if multiple tools are called in parallel
    if isinstance(message, ToolMessage):
        # Until the analyst message with tool_calls
        for _message in state['messages'][::-1]:
            if hasattr(_message, 'tool_calls'):
                break
        Actor.log.debug('-------- Tool Result --------')
        Actor.log.debug('Tool: %s', _message.name)
        Actor.log.debug('Result: %s', _message.content)

    Actor.log.debug('-------- Message --------')
    Actor.log.debug('Message: %s', message)

    # Print all tool calls
    if hasattr(message, 'tool_calls'):
        for tool_call in getattr(message, 'tool_calls', []):
            Actor.log.debug('-------- Tool Call --------')
            Actor.log.debug('Tool: %s', tool_call['name'])
            Actor.log.debug('Args: %s', tool_call['args'])
src/__init__.py
1"""Real Estate Newsletter Agent package."""
src/__main__.py
import asyncio

from .main import main

# Execute the Actor entry point.
asyncio.run(main())
src/agents/analysis_agent.py
1"""
2Analysis Agent Module - Handles market data analysis and metrics calculation
3"""
4
5import logging
6import re
7from typing import Dict, List
8from decimal import Decimal
9from apify import Actor
10from openai import AsyncOpenAI
11
12from .extraction_agent import MarketData
13
14logger = logging.getLogger(__name__)
15
16class AnalysisAgent:
17 def __init__(self, client: AsyncOpenAI):
18 """Initialize the AnalysisAgent with an OpenAI client.
19
20 Args:
21 client (AsyncOpenAI): The OpenAI client instance to use for API calls
22 """
23 self.client = client
24 self.high_cost_markets = ["new york", "san francisco", "los angeles", "seattle", "boston", "miami"]
25
26 async def analyze_market(self, market_data: List[MarketData], location: str = "") -> Dict:
27 """Analyze the extracted market data and extract key metrics"""
28 metrics = {
29 "zillow": {},
30 "redfin": {},
31 "realtor": {},
32 "rocket": {}
33 }
34
35 is_high_cost = any(market.lower() in location.lower() for market in self.high_cost_markets)
36
37 min_price = 10000
38 max_price = 10000000 if is_high_cost else 2000000
39 min_valid_price = 100000
40 max_valid_price = 5000000 if is_high_cost else 1000000
41
42 try:
43 for data in market_data:
44 source = data.source
45 metrics[source] = {
46 "median_price": data.median_price,
47 "price_change": data.price_change,
48 "inventory": data.inventory,
49 "days_on_market": data.days_on_market,
50 "source": source
51 }
52
53 # Validate metrics
54 if min_price <= data.median_price <= max_price:
55 await Actor.charge('market-analyzed')
56 logger.info(f"Successfully analyzed data from {source}")
57
58 except Exception as e:
59 logger.error(f"Error analyzing market data: {str(e)}")
60
61 metrics["_meta"] = {
62 "min_valid_price": min_valid_price,
63 "max_valid_price": max_valid_price
64 }
65
66 return metrics
67
68 async def analyze_market_data(self, market_data: Dict, location: str = "") -> Dict:
69 """Analyze the extracted market data and extract key metrics"""
70 metrics = {
71 "zillow": {},
72 "redfin": {},
73 "realtor": {},
74 "rapid": {}
75 }
76
77 await Actor.charge('market-analyzed')
78
79 is_high_cost = any(market.lower() in location.lower() for market in self.high_cost_markets)
80
81 min_price = 10000
82 max_price = 10000000 if is_high_cost else 2000000
83 min_valid_price = 100000
84 max_valid_price = 5000000 if is_high_cost else 1000000
85
86 try:
87 for source, data in market_data.items():
88 text = data.get("text", "").lower()
89
90 if not text:
91 continue
92
93 metrics_found = False
94
95 # Extract and validate metrics
96 metrics[source].update(self._extract_price_metrics(text, min_price, max_price))
97 metrics[source].update(self._extract_price_change(text))
98 metrics[source].update(self._extract_market_metrics(text))
99
100 if metrics[source]:
101 metrics_found = True
102 await Actor.charge('market-analyzed')
103
104 metrics[source]["source_date"] = data.get("metadata", {}).get("loadedTime", "")
105
106 except Exception as e:
107 logger.error(f"Error analyzing market data: {str(e)}")
108
109 metrics["_meta"] = {
110 "min_valid_price": min_valid_price,
111 "max_valid_price": max_valid_price,
112 "is_high_cost": is_high_cost
113 }
114
115 source_urls = {
116 source: data.get("metadata", {}).get("canonicalUrl") or data.get("metadata", {}).get("loadedUrl", "")
117 for source, data in market_data.items()
118 }
119
120 return {"metrics": metrics, "source_urls": source_urls}
121
122 def _extract_price_metrics(self, text: str, min_price: int, max_price: int) -> Dict:
123 """Extract and validate price metrics"""
124 metrics = {}
125 price_patterns = [
126 r"median (?:sale )?price.*?\$([0-9,.]+)[MK]?",
127 r"average.*?home value.*?\$([0-9,.]+)[MK]?",
128 r"median.*?home value.*?\$([0-9,.]+)[MK]?",
129 r"\$([0-9,.]+)[MK]?(?=.*median)",
130 ]
131
132 for pattern in price_patterns:
133 price_match = re.search(pattern, text)
134 if price_match:
135 try:
136 price = float(price_match.group(1).replace(",", ""))
137 if min_price <= price <= max_price:
138 if "m" in text[price_match.end():price_match.end()+2].lower():
139 metrics["median_price"] = price * 1000000
140 elif "k" in text[price_match.end():price_match.end()+2].lower():
141 metrics["median_price"] = price * 1000
142 else:
143 metrics["median_price"] = price
144 break
145 except ValueError:
146 continue
147
148 return metrics
149
150 def _extract_price_change(self, text: str) -> Dict:
151 """Extract and validate price change percentage"""
152 metrics = {}
153 change_patterns = [
154 r"(up|down)\s+([0-9.]+)%\s+(?:since|compared to|over|in the) last year",
155 r"([0-9.]+)%\s+(increase|decrease)\s+(?:since|compared to|over|in the) last year",
156 r"([+-]?[0-9.]+)%\s+1-yr"
157 ]
158
159 for pattern in change_patterns:
160 change_match = re.search(pattern, text)
161 if change_match:
162 try:
163 if len(change_match.groups()) == 2:
164 change = float(change_match.group(2))
165 if "down" in change_match.group(1).lower() or "decrease" in change_match.group(2).lower():
166 change = -change
167 else:
168 change = float(change_match.group(1))
169 if abs(change) <= 50:
170 metrics["price_change"] = change
171 break
172 except ValueError:
173 continue
174
175 return metrics
176
177 def _extract_market_metrics(self, text: str) -> Dict:
178 """Extract and validate market metrics (days on market, price per sqft, inventory)"""
179 metrics = {}
180
181 # Days on market
182 dom_patterns = [
183 r"(?:sell|sold) (?:in|after) (?:around )?([0-9]+) days",
184 r"(?:average|median) (?:of )?([0-9]+) days on (?:the )?market",
185 r"([0-9]+) days on (?:the )?market",
186 r"pending in (?:around )?([0-9]+) days"
187 ]
188
189 for pattern in dom_patterns:
190 dom_match = re.search(pattern, text)
191 if dom_match:
192 try:
193 days = int(dom_match.group(1))
194 if 0 <= days <= 365:
195 metrics["days_on_market"] = days
196 break
197 except ValueError:
198 continue
199
200 # Price per sqft
201 sqft_patterns = [
202 r"\$([0-9,.]+) per square (?:foot|feet|ft)",
203 r"price per (?:square )?(?:foot|feet|ft).*?\$([0-9,.]+)"
204 ]
205
206 for pattern in sqft_patterns:
207 sqft_match = re.search(pattern, text)
208 if sqft_match:
209 try:
210 price_sqft = float(sqft_match.group(1).replace(",", ""))
211 if 50 <= price_sqft <= 2000:
212 metrics["price_per_sqft"] = price_sqft
213 break
214 except ValueError:
215 continue
216
217 # Inventory
218 inv_patterns = [
219 r"([0-9,]+) homes? (?:for sale|available|active)",
220 r"inventory of ([0-9,]+) homes",
221 r"([0-9,]+) properties? (?:for sale|available|active)"
222 ]
223
224 for pattern in inv_patterns:
225 inv_match = re.search(pattern, text)
226 if inv_match:
227 try:
228 inventory = int(inv_match.group(1).replace(",", ""))
229 if 0 <= inventory <= 10000:
230 metrics["inventory"] = inventory
231 break
232 except ValueError:
233 continue
234
235 return metrics
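As a quick sanity check of the regex helpers above, here is a small, self-contained snippet; the sample sentence is made up and stands in for lowercased crawler output, and the API key is a placeholder (the client is never called here):

from openai import AsyncOpenAI
from src.agents.analysis_agent import AnalysisAgent

agent = AnalysisAgent(client=AsyncOpenAI(api_key="sk-..."))  # placeholder key

text = "the median sale price was $1,250,000, up 3.4% since last year"
print(agent._extract_price_metrics(text, 10_000, 2_000_000))  # {'median_price': 1250000.0}
print(agent._extract_price_change(text))                      # {'price_change': 3.4}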
src/agents/extraction_agent.py
1"""
2Extraction Agent Module - Handles data extraction from validated sources
3"""
4
5import logging
6from typing import Dict, List
7from dataclasses import dataclass
8from decimal import Decimal
9from apify import Actor
10from apify_client import ApifyClient
11from openai import AsyncOpenAI
12import os
13import re
14
15logger = logging.getLogger(__name__)
16
17@dataclass
18class MarketData:
19 median_price: float
20 price_change: float
21 inventory: int
22 days_on_market: int
23 source: str
24
25class ExtractionAgent:
26 def __init__(self, client: AsyncOpenAI):
27 """Initialize the ExtractionAgent with OpenAI client"""
28 self.client = client
29 self.apify_client = ApifyClient(token=os.environ["APIFY_TOKEN"])
30
31 def _extract_price_metrics(self, text: str) -> Dict:
32 """Extract price metrics from text"""
33 metrics = {}
34 price_patterns = [
35 r"(?:median|average).*?(?:home value|sale price|sold price)[^\$]*\$([0-9,.]+)([KM]?)",
36 r"\$([0-9,.]+)([KM]?)(?=.*median)",
37 r"typical home value.*?\$([0-9,.]+)([KM]?)",
38 r"median list price.*?\$([0-9,.]+)([KM]?)"
39 ]
40
41 for pattern in price_patterns:
42 price_match = re.search(pattern, text.lower())
43 if price_match:
44 try:
45 price_str = price_match.group(1).replace(",", "")
46 price = float(price_str)
47 suffix = price_match.group(2).upper() if len(price_match.groups()) > 1 else ""
48
49 # Handle price multipliers
50 if suffix == "M":
51 price *= 1000000
52 elif suffix == "K":
53 price *= 1000
54
55 # Validate price range (100k to 2M for normal markets)
56 if 100000 <= price <= 2000000:
57 metrics["median_price"] = price
58 logger.info(f"Extracted price: ${price:,.2f}")
59 break
60 else:
61 logger.warning(f"Price {price} outside valid range")
62 except ValueError:
63 continue
64
65 return metrics
66
67 def _extract_price_change(self, text: str) -> Dict:
68 """Extract price change percentage from text"""
69 metrics = {}
70 change_patterns = [
71 r"(?:down|decreased)\s+([0-9.]+)%",
72 r"([0-9.]+)%\s+(?:decrease)",
73 r"-([0-9.]+)%",
74 r"(?:up|increased)\s+([0-9.]+)%",
75 r"([0-9.]+)%\s+(?:increase)",
76 r"\+([0-9.]+)%"
77 ]
78
79 for pattern in change_patterns:
80 change_match = re.search(pattern, text.lower())
81 if change_match:
82 try:
83 change = float(change_match.group(1))
84 # Make negative if pattern indicates decrease
85 if any(word in pattern.lower() for word in ["down", "decrease", "-"]):
86 change = -change
87
88 # Validate change range (-20% to +20% is reasonable)
89 if abs(change) <= 20:
90 metrics["price_change"] = change
91 logger.info(f"Extracted price change: {change}%")
92 break
93 else:
94 logger.warning(f"Price change {change}% outside valid range")
95 except ValueError:
96 continue
97
98 return metrics
99
100 def _extract_market_metrics(self, text: str) -> Dict:
101 """Extract market metrics from text"""
102 metrics = {}
103
104 # Days on market
105 dom_patterns = [
106 r"(?:median |average )?(?:days on market|dom)[:\s]+([0-9]+)",
107 r"(?:sell|sold|pending) (?:in|after) (?:around )?([0-9]+) days",
108 r"(?:average|median) (?:of )?([0-9]+) days on (?:the )?market",
109 r"([0-9]+) days? on (?:the )?market",
110 r"average listing age of ([0-9]+) days"
111 ]
112
113 for pattern in dom_patterns:
114 dom_match = re.search(pattern, text.lower())
115 if dom_match:
116 try:
117 days = int(dom_match.group(1))
118 if 0 <= days <= 180: # Most markets are under 180 days
119 metrics["days_on_market"] = days
120 logger.info(f"Extracted days on market: {days}")
121 break
122 else:
123 logger.warning(f"Days on market {days} outside valid range")
124 except ValueError:
125 continue
126
127 # Inventory
128 inv_patterns = [
129 r"([0-9,]+)\s+(?:homes?|properties?|listings?|houses?)?\s+(?:for sale|available|active)",
130 r"inventory of ([0-9,]+)",
131 r"([0-9,]+) total listings",
132 r"([0-9,]+) properties found",
133 r"([0-9,]+) homes sold"
134 ]
135
136 for pattern in inv_patterns:
137 inv_match = re.search(pattern, text.lower())
138 if inv_match:
139 try:
140 inventory = int(inv_match.group(1).replace(",", ""))
141 if 0 <= inventory <= 50000: # Increased max for larger markets
142 metrics["inventory"] = inventory
143 logger.info(f"Extracted inventory: {inventory}")
144 break
145 else:
146 logger.warning(f"Inventory {inventory} outside valid range")
147 except ValueError:
148 continue
149
150 return metrics
151
152 async def extract_data(self, urls: Dict[str, str]) -> List[MarketData]:
153 """Extract market data from the filtered URLs"""
154 market_data = []
155
156 try:
157 # Convert URLs dict to list of URLData for crawler
158 url_list = [{"url": url, "method": "GET"} for url in urls.values()]
159
160 crawler_input = {
161 "startUrls": url_list,
162 "crawlerType": "playwright", # Changed to playwright for better JS support
163 "maxCrawlPages": len(urls),
164 "maxCrawlDepth": 0,
165 "saveMarkdown": True,
166 "maxRequestRetries": 3,
167 "maxConcurrency": 1,
168 "proxyConfiguration": {
169 "useApifyProxy": True,
170 "apifyProxyGroups": ["RESIDENTIAL"]
171 },
172 "removeElementsCssSelector": "nav, footer, script, style, noscript, svg, img[src^='data:']",
173 "htmlTransformer": "readableText",
174 "waitUntil": "networkidle", # Wait for network to be idle
175 "browserPoolOptions": {
176 "maxOpenPagesPerBrowser": 1,
177 "retireInstanceAfterRequestCount": 5
178 }
179 }
180
181 logger.info("Starting Website Content Crawler...")
182 run = self.apify_client.actor("apify/website-content-crawler").call(
183 run_input=crawler_input,
184 memory_mbytes=4096
185 )
186
187 dataset_id = run["defaultDatasetId"]
188 items = self.apify_client.dataset(dataset_id).list_items().items
189
190 for source, url in urls.items():
191 logger.info(f"Processing {source} URL: {url}")
192 extracted = False
193
194 for item in items:
195 if item.get("url") == url:
196 text_content = item.get("text", "")
197
198 if not text_content:
199 logger.warning(f"No content extracted from {source}")
200 continue
201
202 # Log first 500 chars of content for debugging
203 logger.debug(f"{source} content preview: {text_content[:500]}")
204
205 # Extract metrics using parsing logic
206 price_metrics = self._extract_price_metrics(text_content)
207 price_change = self._extract_price_change(text_content)
208 market_metrics = self._extract_market_metrics(text_content)
209
210 if price_metrics or price_change or market_metrics:
211 await Actor.charge('data-extracted')
212 logger.info(f"Successfully extracted data from {source}")
213
214 market_data.append(MarketData(
215 median_price=price_metrics.get("median_price", 0),
216 price_change=price_change.get("price_change", 0),
217 inventory=market_metrics.get("inventory", 0),
218 days_on_market=market_metrics.get("days_on_market", 0),
219 source=source
220 ))
221 extracted = True
222 break
223
224 if not extracted:
225 logger.error(f"Failed to extract valid data from {source}")
226
227 except Exception as e:
228 logger.error(f"Error extracting market data: {str(e)}")
229
230 return market_data
src/agents/newsletter_agent.py
1"""
2Newsletter Agent Module - Handles report generation using OpenAI
3"""
4
5import logging
6from datetime import datetime
7from typing import Dict, Any
8from openai import AsyncOpenAI
9from apify import Actor
10
11logger = logging.getLogger(__name__)
12
13class NewsletterAgent:
14 def __init__(self, client: AsyncOpenAI):
15 """Initialize the NewsletterAgent with an OpenAI client.
16
17 Args:
18 client (AsyncOpenAI): The OpenAI client instance to use for API calls
19 """
20 self.client = client
21
22 async def generate_newsletter(self, location: str, market_data: Dict, analysis: Dict) -> str:
23 """Generate a real estate market newsletter using OpenAI"""
24 try:
25 current_date = datetime.now().strftime("%B %Y")
26
27 metrics = analysis.get("metrics", {})
28 source_urls = analysis.get("source_urls", {})
29 meta = metrics.get("_meta", {})
30 min_valid_price = meta.get("min_valid_price", 100000)
31 max_valid_price = meta.get("max_valid_price", 1000000)
32
33 formatted_data = self._format_source_data(metrics)
34 formatted_urls = self._format_source_urls(source_urls)
35 avg_metrics = self._calculate_averages(metrics, min_valid_price, max_valid_price)
36
37 system_content = self._get_system_prompt()
38 user_content = self._get_user_prompt(location, current_date, formatted_data, avg_metrics, formatted_urls)
39
40 response = await self.client.chat.completions.create(
41 model="gpt-4o-mini",
42 messages=[
43 {"role": "system", "content": system_content},
44 {"role": "user", "content": user_content}
45 ],
46 temperature=0.7,
47 max_tokens=2000
48 )
49
50 newsletter = response.choices[0].message.content
51
52 # Charge for newsletter generation
53 await Actor.charge('newsletter-generated')
54
55 return newsletter
56
57 except Exception as e:
58 logger.error(f"Error generating newsletter: {str(e)}")
59 return f"Error generating newsletter: {str(e)}"
60
61 def _format_price(self, price):
62 """Format price with proper formatting"""
63 if price and isinstance(price, (int, float)):
64 return f"${price:,.0f}"
65 return "N/A"
66
67 def _format_percent(self, value):
68 """Format percentage with proper formatting"""
69 if value is not None:
70 return f"{value:+.1f}%" if value >= 0 else f"{value:.1f}%"
71 return "N/A"
72
73 def _format_source_data(self, metrics: Dict) -> str:
74 """Format market data from each source"""
75 formatted_data = ""
76 for source in ["zillow", "redfin", "realtor", "rapid"]:
77 source_data = metrics.get(source, {})
78 if source_data:
79 formatted_data += f"""
80{source.capitalize()}:
81- Median Price: {self._format_price(source_data.get('median_price'))}
82- Price Change: {self._format_percent(source_data.get('price_change'))}
83- Days on Market: {source_data.get('days_on_market', 'N/A')}
84- Price Per SqFt: {self._format_price(source_data.get('price_per_sqft'))}
85- Inventory: {source_data.get('inventory', 'N/A')}
86"""
87 return formatted_data
88
89 def _format_source_urls(self, source_urls: Dict) -> str:
90 """Format source URLs"""
91 return "\n".join(f"- {source.capitalize()}: {url}" for source, url in source_urls.items() if url)
92
93 def _calculate_averages(self, metrics: Dict, min_valid_price: int, max_valid_price: int) -> Dict:
94 """Calculate average metrics across sources"""
95 def calculate_average(metric_name):
96 values = []
97 for source, source_data in metrics.items():
98 if source == "_meta":
99 continue
100 value = source_data.get(metric_name)
101 if value and isinstance(value, (int, float)):
102 if metric_name == "median_price" and (value < min_valid_price or value > max_valid_price):
103 continue
104 if metric_name == "price_change" and abs(value) > 20:
105 continue
106 values.append(value)
107 return sum(values) / len(values) if values else None
108
109 return {
110 "avg_price": calculate_average("median_price"),
111 "avg_price_change": calculate_average("price_change"),
112 "avg_dom": calculate_average("days_on_market")
113 }
114
115 def _get_system_prompt(self) -> str:
116 """Get system prompt for newsletter generation"""
117 return """You are an expert real estate newsletter writer. Create a professional, polished, and well-structured newsletter using the provided market data.
118
119Format the newsletter in Markdown with the following requirements:
1201. Begin with a main heading (#) that includes the location name and "Real Estate Market Update - [Month Year]"
1212. Add a "Last Updated" line right after the title
1223. Use subheadings (##) for different sections with proper spacing
1234. Include a well-formatted Markdown table comparing data sources
1245. Use emoji icons to highlight key points
1256. Format all prices with dollar signs and commas
1267. Include percentages with % symbol and +/- signs
1278. Use proper spacing between sections
1289. Use **bold** for critical information and *italic* for secondary emphasis
12910. If data is limited or inconsistent, acknowledge this and focus on reliable metrics
130
131Include these sections:
132- Executive Summary (3-4 sentences)
133- Market Overview (with validated average metrics)
134- Market Data Comparison (table)
135- Price Analysis
136- Market Activity
137- Market Forecast
138- Recommendations for Buyers and Sellers
139- Additional Resources"""
140
141 def _get_user_prompt(self, location: str, current_date: str, formatted_data: str, avg_metrics: Dict, formatted_urls: str) -> str:
142 """Get user prompt for newsletter generation"""
143 return f"""Create a real estate market newsletter for {location} for {current_date}.
144
145MARKET DATA:
146{formatted_data}
147
148AVERAGE METRICS (excluding outliers):
149- Average Price: {self._format_price(avg_metrics['avg_price'])}
150- Average Price Change: {self._format_percent(avg_metrics['avg_price_change'])}
151- Average Days on Market: {int(avg_metrics['avg_dom']) if avg_metrics['avg_dom'] else 'N/A'}
152
153SOURCE URLS:
154{formatted_urls}
155
156Please generate a comprehensive newsletter following the format in the system instructions. Make sure to:
1571. Focus on the most reliable data points
1582. Acknowledge any data inconsistencies
1593. Provide specific insights based on validated metrics
1604. Include actionable recommendations
1615. Format all numbers properly
1626. Use appropriate spacing and visual elements
1637. Bold important findings
1648. If certain metrics are missing or unreliable, explain the limitations"""
src/agents/search_agent.py
1"""
2Search Agent Module - Handles finding relevant real estate market sources
3"""
4
5import logging
6import re
7from typing import Dict, List, Optional
8from dataclasses import dataclass
9from decimal import Decimal
10from apify import Actor
11from apify_client import ApifyClient
12from openai import AsyncOpenAI
13import os
14
15logger = logging.getLogger(__name__)
16
17@dataclass
18class URLData:
19 url: str
20 source: str
21
22class SearchAgent:
23 # URL validation patterns focusing on essential subdirectories and formats
24 URL_PATTERNS = {
25 "zillow": r"zillow\.com/home-values/\d+/[a-zA-Z0-9-]+(?:/)?$",
26 "redfin": r"redfin\.com/city/\d+/[A-Z]{2}/[A-Za-z-]+/housing-market(?:/)?$",
27 "realtor": r"realtor\.com/realestateandhomes-search/[A-Za-z-]+_[A-Z]{2}/overview(?:/)?$",
28 "rocket": r"rocket\.com/homes/market-reports/[a-z]{2}/[a-z-]+(?:/)?$"
29 }
30
31 def __init__(self, client: AsyncOpenAI):
32 """Initialize the SearchAgent with an OpenAI client.
33
34 Args:
35 client (AsyncOpenAI): The OpenAI client instance to use for API calls
36 """
37 self.client = client
38 self.apify_client = ApifyClient(token=os.environ["APIFY_TOKEN"])
39
40 def _normalize_location(self, location: str) -> Optional[str]:
41 """Normalize location input to a standardized format."""
42 try:
43 # Remove extra whitespace and convert to lowercase
44 location = " ".join(location.strip().lower().split())
45
46 # Extract state code (assuming 2-letter state code)
47 state_match = re.search(r'[,\s]+([a-zA-Z]{2})$', location)
48 if not state_match:
49 logger.warning(f"No valid state code found in location: {location}")
50 return None
51
52 state = state_match.group(1).upper()
53
54 # Remove state code and clean up remaining location
55 base_location = location[:state_match.start()].strip()
56
57 # Remove only non-essential location words and special characters
58 base_location = re.sub(r'\b(town|village|township|metropolitan|area)\b', '', base_location)
59 base_location = re.sub(r'[^\w\s-]', '', base_location).strip()
60
61 # Convert spaces to hyphens and remove multiple hyphens
62 normalized = f"{'-'.join(base_location.split())}-{state}"
63 normalized = re.sub(r'-+', '-', normalized)
64
65 logger.info(f"Normalized location '{location}' to '{normalized}'")
66 return normalized
67
68 except Exception as e:
69 logger.error(f"Error normalizing location '{location}': {str(e)}")
70 return None
71
72 async def find_sources(self, location: str) -> Dict[str, str]:
73 """Find relevant real estate market sources for the given location.
74
75 Args:
76 location (str): The city and state to search for
77
78 Returns:
79 Dict[str, str]: Dictionary of source names to URLs
80 """
81 try:
82 # Charge for search initialization
83 await Actor.charge('search-init')
84
85 # Get normalized location
86 normalized_location = self._normalize_location(location)
87 if not normalized_location:
88 raise ValueError(f"Could not normalize location: {location}")
89
90 # Search for URLs using Google Search
91 all_urls = await self.search_urls(location)
92
93 # Filter and validate URLs
94 filtered_urls = await self.filter_urls(all_urls)
95 if not filtered_urls:
96 raise ValueError("No valid URLs found after filtering")
97
98 # Convert to dictionary format
99 return {url_data.source: url_data.url for url_data in filtered_urls}
100
101 except Exception as e:
102 logger.error(f"Error finding sources: {str(e)}")
103 raise # Re-raise the error instead of returning empty dict
104
105 async def search_urls(self, location: str) -> List[str]:
106 """Search for market research URLs using Apify Google Search Scraper"""
107 all_urls = []
108
109 try:
110 normalized_location = self._normalize_location(location)
111 if not normalized_location:
112 raise ValueError(f"Could not normalize location: {location}")
113
114 # Simple search query focusing on main domains
115 search_query = f"{normalized_location} real estate market site:zillow.com OR site:redfin.com OR site:realtor.com OR site:rocket.com"
116 logger.info(f"Searching with query: {search_query}")
117
118 # Run Google Search scraper
119 run = self.apify_client.actor("apify/google-search-scraper").call(
120 run_input={
121 "queries": search_query,
122 "maxPagesPerQuery": 2,
123 "resultsPerPage": 10,
124 "languageCode": "en",
125 "countryCode": "us",
126 "mobileResults": False
127 }
128 )
129
130 # Get results from dataset
131 dataset_id = run["defaultDatasetId"]
132 items = self.apify_client.dataset(dataset_id).list_items().items
133
134 if items and len(items) > 0:
135 for item in items:
136 for result in item.get("organicResults", []):
137 url = result.get("url", "").strip()
138 if url:
139 all_urls.append(url)
140 logger.info(f"Found URL: {url}")
141 await Actor.charge('url-processed')
142
143 except Exception as e:
144 logger.error(f"Error searching URLs: {str(e)}")
145 raise # Raise the error instead of falling back to templates
146
147 if not all_urls:
148 logger.warning("No URLs found in search")
149 raise ValueError("No URLs found in search") # Raise error instead of falling back
150
151 logger.info(f"Found {len(all_urls)} URLs in total")
152 return all_urls
153
154 async def filter_urls(self, urls: List[str]) -> List[URLData]:
155 """Filter and validate URLs by source"""
156 filtered_urls = []
157 source_counts = {source: 0 for source in self.URL_PATTERNS.keys()}
158
159 for url in urls:
160 for source, pattern in self.URL_PATTERNS.items():
161 if re.search(pattern, url, re.IGNORECASE):
162 if source_counts[source] == 0: # Only take first valid URL per source
163 filtered_urls.append(URLData(url=url, source=source))
164 source_counts[source] += 1
165 logger.info(f"Found valid {source} URL: {url}")
166 await Actor.charge('url-processed')
167 break
168
169 if not filtered_urls:
170 logger.warning("No valid URLs found after filtering")
171 else:
172 logger.info(f"Found {len(filtered_urls)} valid URLs")
173
174 return filtered_urls
175
176 def _get_template_urls(self, normalized_location: str) -> Dict[str, str]:
177 """Get template URLs as fallback"""
178 return {
179 "zillow": f"https://www.zillow.com/homes/{normalized_location}_rb/",
180 "redfin": f"https://www.redfin.com/city/{normalized_location}",
181 "realtor": f"https://www.realtor.com/realestateandhomes-search/{normalized_location}"
182 }
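For a feel of the normalization step, a small illustrative snippet; the token and key are placeholders that only satisfy the constructors, and no API call is made:

import os
os.environ.setdefault("APIFY_TOKEN", "apify_api_...")  # placeholder; required by the constructor

from openai import AsyncOpenAI
from src.agents.search_agent import SearchAgent

agent = SearchAgent(client=AsyncOpenAI(api_key="sk-..."))  # placeholder key
print(agent._normalize_location("San Jose, CA"))    # -> "san-jose-CA"
print(agent._normalize_location("Los Angeles CA"))  # -> "los-angeles-CA"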
src/agents/writer_agent.py
import re


class NewsletterWriter:
    def __init__(self, openai_client):
        self.client = openai_client
        self.required_metrics = ['median_price', 'price_change', 'days_on_market']

    def _validate_source_data(self, market_data):
        """Validate and consolidate data from different sources."""
        valid_sources = {}

        for source, data in market_data.items():
            if not data or not isinstance(data, dict):
                continue

            metrics = {}
            # Extract core metrics if they exist
            if 'median_price' in data and data['median_price']:
                metrics['median_price'] = data['median_price']
            if 'price_change' in data and data['price_change']:
                metrics['price_change'] = data['price_change']
            if 'days_on_market' in data and data['days_on_market']:
                metrics['days_on_market'] = data['days_on_market']
            if 'price_per_sqft' in data and data['price_per_sqft']:
                metrics['price_per_sqft'] = data['price_per_sqft']

            # Extract additional metrics from Rocket data
            if source == 'rocket' and isinstance(data.get('text'), str):
                text = data['text']
                if "Neutral Market" in text:
                    metrics['market_type'] = "Neutral Market"
                elif "Seller's Market" in text:
                    metrics['market_type'] = "Seller's Market"
                elif "Buyer's Market" in text:
                    metrics['market_type'] = "Buyer's Market"

                # Extract inventory and sales data if available
                # (only keep values that were actually found, so later
                # formatting never hits a None)
                if "homes for sale" in text:
                    inventory = self._extract_inventory(text)
                    if inventory is not None:
                        metrics['inventory'] = inventory
                if "homes sold" in text:
                    sales_volume = self._extract_sales(text)
                    if sales_volume is not None:
                        metrics['sales_volume'] = sales_volume

            # Only include sources with actual data
            if metrics:
                valid_sources[source] = metrics

        return valid_sources

    def _extract_inventory(self, text):
        """Extract inventory numbers from text."""
        try:
            match = re.search(r"([0-9,]+)\s+homes for sale", text.lower())
            return int(match.group(1).replace(",", "")) if match else None
        except (ValueError, AttributeError):
            return None

    def _extract_sales(self, text):
        """Extract sales volume from text."""
        try:
            match = re.search(r"([0-9,]+)\s+homes sold", text.lower())
            return int(match.group(1).replace(",", "")) if match else None
        except (ValueError, AttributeError):
            return None

    def _format_market_data(self, market_data):
        """Format market data into sections for the newsletter."""
        valid_sources = self._validate_source_data(market_data)

        if not valid_sources:
            return "Error: No valid market data available"

        # Calculate averages across sources
        avg_metrics = {
            'median_price': [],
            'price_change': [],
            'days_on_market': [],
            'price_per_sqft': []
        }

        for source_data in valid_sources.values():
            for metric, values in avg_metrics.items():
                if metric in source_data:
                    values.append(source_data[metric])

        # Format market insights
        insights = []

        # Add price insights
        if avg_metrics['median_price']:
            median_price = sum(avg_metrics['median_price']) / len(avg_metrics['median_price'])
            insights.append(f"The median home price is ${median_price:,.0f}")

        if avg_metrics['price_change']:
            avg_change = sum(avg_metrics['price_change']) / len(avg_metrics['price_change'])
            insights.append(f"Prices have changed by {avg_change:.1f}% over the past year")

        if avg_metrics['days_on_market']:
            avg_dom = sum(avg_metrics['days_on_market']) / len(avg_metrics['days_on_market'])
            insights.append(f"Homes are selling in an average of {avg_dom:.0f} days")

        # Add market type if available from Rocket
        rocket_data = valid_sources.get('rocket', {})
        if 'market_type' in rocket_data:
            insights.append(f"The area is currently a {rocket_data['market_type']}")

        # Add inventory insights if available
        if 'inventory' in rocket_data:
            insights.append(f"There are currently {rocket_data['inventory']:,} homes for sale")

        return {
            'insights': insights,
            'averages': {
                'median_price': sum(avg_metrics['median_price']) / len(avg_metrics['median_price']) if avg_metrics['median_price'] else None,
                'price_change': sum(avg_metrics['price_change']) / len(avg_metrics['price_change']) if avg_metrics['price_change'] else None,
                'days_on_market': sum(avg_metrics['days_on_market']) / len(avg_metrics['days_on_market']) if avg_metrics['days_on_market'] else None
            },
            'sources': list(valid_sources.keys())
        }

    def write_newsletter(self, location, market_data):
        """Generate a real estate market newsletter."""
        try:
            formatted_data = self._format_market_data(market_data)

            if isinstance(formatted_data, str) and formatted_data.startswith("Error"):
                return formatted_data

            # Create system prompt for the model
            system_prompt = """You are a professional real estate market analyst writing a newsletter.
            IMPORTANT FORMATTING RULES:
            1. DO NOT include any tables or grid-like data presentations
            2. Present all data in a narrative, paragraph format
            3. Use bullet points sparingly and only for recommendations
            4. Write in a clear, flowing style that connects insights naturally
            5. Keep the tone professional and avoid emojis
            6. Focus on telling the market story rather than listing data points
            7. Keep sections concise and impactful
            8. When presenting numbers, integrate them smoothly into sentences
            9. Avoid markdown formatting except for section headers
            10. Do not include comparison grids or charts"""

            # Create user prompt with formatted data
            user_prompt = f"""Write a real estate market newsletter for {location} that weaves these insights into a cohesive narrative:

            Available Market Insights:
            {chr(10).join('- ' + insight for insight in formatted_data['insights'])}

            Based on data from: {', '.join(formatted_data['sources']).title()}

            Structure the newsletter as follows:
            1. Title and Date
            2. Executive Summary (2-3 sentences on key trends)
            3. Current Market Conditions (integrate price and market type insights)
            4. Market Activity and Trends (blend sales pace and price trends)
            5. Future Outlook (brief forecast based on current trends)
            6. Buyer and Seller Recommendations (3-4 actionable points each)

            IMPORTANT:
            - DO NOT include any tables or data grids
            - Present all metrics within flowing paragraphs
            - Focus on telling a coherent market story
            - Keep the writing style professional and straightforward
            - Integrate numbers naturally into sentences
            - Use minimal formatting - only use ## for section headers"""

            # Generate newsletter using OpenAI
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.7,
                max_tokens=1500
            )

            return response.choices[0].message.content

        except Exception as e:
            return f"Error generating newsletter: {str(e)}"
src/models/schemas.py
from typing import List, Dict, Optional
from pydantic import BaseModel, Field

class URLData(BaseModel):
    """Data structure for URLs with their source information"""
    url: str
    source: str

class MarketMetrics(BaseModel):
    """Structure for real estate market metrics"""
    median_price: Optional[float] = None
    price_change: Optional[float] = None
    days_on_market: Optional[int] = None
    inventory: Optional[int] = None
    price_per_sqft: Optional[float] = None
    source_date: Optional[str] = None

class AgentState(BaseModel):
    """State management for the real estate newsletter agent"""
    location: str = Field(..., description="Target location for market analysis")
    search_urls: List[str] = Field(default_factory=list, description="Initial search results")
    filtered_urls: List[URLData] = Field(default_factory=list, description="Filtered and validated URLs")
    final_urls: List[URLData] = Field(default_factory=list, description="Final URLs for data extraction")
    market_data: Dict[str, MarketMetrics] = Field(default_factory=dict, description="Extracted market data")
    errors: List[str] = Field(default_factory=list, description="Error messages during processing")
    location_valid: bool = Field(default=False, description="Location validation status")
    analysis_complete: bool = Field(default=False, description="Analysis completion status")
    newsletter: Optional[str] = None
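A hypothetical illustration of how AgentState carries workflow state between steps, assuming Pydantic v2's model_dump(); the URL is made up:

from src.models.schemas import AgentState, URLData

state = AgentState(location="San Jose, CA")
state.filtered_urls.append(
    URLData(url="https://www.zillow.com/home-values/54296/san-jose-ca/", source="zillow")  # hypothetical URL
)
state.location_valid = True
print(state.model_dump()["filtered_urls"])  # [{'url': '...', 'source': 'zillow'}]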
src/models/__init__.py
1"""Models package for the Real Estate Newsletter Agent."""
src/utils/charging.py
1"""
2Shared utilities for pay-per-event charging
3"""
4
5import logging
6from decimal import Decimal
7from typing import Dict
8from apify import Actor
9
10logger = logging.getLogger(__name__)
11
12# Define all chargeable events and their prices
13EVENTS = {
14 'search-initialized': '0.02',
15 'url-processed': '0.02',
16 'data-extracted': '0.02',
17 'market-analyzed': '0.02',
18 'newsletter-generated': '0.50'
19}
20
21def register_events():
22 """Register all chargeable events with their prices"""
23 try:
24 charging_manager = Actor.get_charging_manager()
25 for event_name, price in EVENTS.items():
26 charging_manager.register_event(event_name, price)
27 logger.info("Successfully registered all chargeable events")
28 except Exception as e:
29 logger.error(f"Error registering events: {str(e)}")
30
31async def charge_event(event_name: str, count: int = 1) -> bool:
32 """Charge for an event using predefined prices
33
34 Args:
35 event_name: Name of the event to charge for
36 count: Number of events to charge for (default: 1)
37
38 Returns:
39 bool: True if charging was successful, False otherwise
40 """
41 try:
42 if event_name not in EVENTS:
43 logger.warning(f"Unknown event: {event_name}")
44 return False
45
46 await Actor.charge(event_name, count)
47 logger.info(f"Successfully charged for {count} {event_name} event(s)")
48 return True
49 except Exception as e:
50 logger.warning(f"Failed to charge for {event_name}: {str(e)}")
51 return False
src/utils/url_patterns.py
1"""URL patterns for real estate market data sources"""
2
3# Regular expression patterns for validating real estate market URLs
4URL_PATTERNS = {
5 "zillow": r"zillow\.com/(?:home-values/\d+/[^/]+(?:-[a-z]{2})?|[^/]+-[a-z]{2}/home-values|[^/]+/home-values)/?$",
6 "redfin": r"redfin\.com/(?:city/\d+/[A-Z]{2}/[^/]+/housing-market|[^/]+/housing-market)/?$",
7 "realtor": r"realtor\.com/(?:realestateandhomes-search/[^/]+(?:_[A-Z]{2})?/overview|market-trends/[^/]+)/?$",
8 "rapid": r"(?:rocket|rapid)\.com/(?:homes/market-reports|market-trends)/(?:[a-z]{2}/)?[^/]+/?$"
9}
10
11# Search query templates for each source
12SEARCH_QUERIES = {
13 "zillow": "{location} real estate market home values site:zillow.com",
14 "redfin": "{location} housing market trends site:redfin.com",
15 "realtor": "{location} real estate market overview site:realtor.com",
16 "rapid": "{location} housing market report site:rocket.com"
17}
18
19# Maximum number of URLs to process per source
20MAX_URLS_PER_SOURCE = 1
21
22# Required sources for complete analysis
23REQUIRED_SOURCES = ["zillow", "redfin", "realtor", "rapid"]
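A short sketch of applying these patterns, e.g. for ad-hoc testing; the sample URL is illustrative:

import re

from src.utils.url_patterns import URL_PATTERNS


def match_source(url: str) -> str | None:
    """Return the first source whose pattern matches the URL, if any."""
    for source, pattern in URL_PATTERNS.items():
        if re.search(pattern, url, re.IGNORECASE):
            return source
    return None


print(match_source("https://www.redfin.com/city/17420/CA/San-Jose/housing-market"))  # -> "redfin"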
src/utils/__init__.py
1"""Utilities package for the Real Estate Newsletter Agent."""