Realestate Newsletter Agent Langgraph

gopalakrishnan/realestate-newsletter-agent-langgraph

This Actor is paid per event

An autonomous Apify Actor that generates comprehensive real estate market research reports by analyzing data from multiple authoritative sources.

Developer
Maintained by Community

Actor Metrics

  • 1 monthly user

  • No reviews yet

  • No bookmarks yet

  • Created in Mar 2025

  • Modified 11 hours ago
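
The Actor can be started programmatically through the Apify API. The following is a minimal sketch using the apify-client Python package, assuming you replace the placeholder token and keys with your own; the input fields come from .actor/input_schema.json shown below.

from apify_client import ApifyClient

# Placeholder token - replace with your own Apify API token.
client = ApifyClient("<YOUR_APIFY_TOKEN>")

# Input fields mirror .actor/input_schema.json below.
run_input = {
    "location": "San Jose, CA",
    "openaiApiKey": "<YOUR_OPENAI_API_KEY>",
    "apifyApiKey": "<YOUR_APIFY_TOKEN>",
    "debug": False,
}

# Start the Actor and wait for the run to finish.
run = client.actor("gopalakrishnan/realestate-newsletter-agent-langgraph").call(run_input=run_input)

# The run pushes its results (including the generated newsletter) to the default dataset.
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    print(item.get("newsletter"))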

.dockerignore

.git
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

.gitignore

.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

# Added by Apify CLI
node_modules
src/data_analysis_logic.py
src/data_parsing_logic.py
src/data_validation_logic.py
src/data_validation_logic1.py

requirements.txt

apify<3.0.0
langchain-openai==0.3.6
langgraph==0.2.73

.actor/actor.json

{
	"actorSpecification": 1,
	"name": "realestate-newsletter-agent-langgraph",
	"title": "Python LangGraph Agent",
	"description": "LangGraph agent in python",
	"version": "0.0",
	"buildTag": "latest",
	"input": "./input_schema.json",
	"storages": {
		"dataset": "./dataset_schema.json"
	},
	"meta": {
		"templateId": "python-langgraph"
	},
	"dockerfile": "./Dockerfile"
}

.actor/dataset_schema.json

{
  "actorSpecification": 1,
  "views": {
    "overview": {
      "title": "Overview",
      "transformation": {
        "fields": ["response", "structured_response"]
      },
      "display": {
        "component": "table",
        "properties": {
          "response": {
            "label": "Response",
            "format": "text"
          },
          "structured_response": {
            "label": "Structured Response",
            "format": "object"
          }
        }
      }
    }
  }
}

.actor/Dockerfile

# First, specify the base Docker image.
# You can see the Docker images from Apify at https://hub.docker.com/r/apify/.
# You can also use any other image from Docker Hub.
FROM apify/actor-python:3.13

# Second, copy just requirements.txt into the Actor image,
# since it should be the only file that affects the dependency installation in the next step,
# in order to speed up the build.
COPY requirements.txt ./

# Install the packages specified in requirements.txt,
# print the installed Python version, pip version,
# and all installed packages with their versions for debugging.
RUN echo "Python version:" \
 && python --version \
 && echo "Pip version:" \
 && pip --version \
 && echo "Installing dependencies:" \
 && pip install -r requirements.txt \
 && echo "All installed Python packages:" \
 && pip freeze

# Next, copy the remaining files and directories with the source code.
# Since we do this after installing the dependencies, quick builds will be really fast
# for most source file changes.
COPY . ./

# Use compileall to ensure the runnability of the Actor Python code.
RUN python3 -m compileall -q .

# Create and run as a non-root user.
RUN useradd --create-home apify && \
    chown -R apify:apify ./
USER apify

# Specify how to launch the source code of your Actor.
# By default, the "python3 -m src" command is run.
CMD ["python3", "-m", "src"]

.actor/input_schema.json

{
  "title": "Real Estate Newsletter Agent",
  "type": "object",
  "schemaVersion": 1,
  "properties": {
    "location": {
      "title": "Location",
      "type": "string",
      "description": "City and State (e.g. 'San Jose, CA')",
      "editor": "textfield"
    },
    "openaiApiKey": {
      "title": "OpenAI API Key",
      "type": "string",
      "description": "Your OpenAI API key",
      "editor": "textfield",
      "isSecret": true
    },
    "apifyApiKey": {
      "title": "Apify API Key",
      "type": "string",
      "description": "Your Apify API key",
      "editor": "textfield",
      "isSecret": true
    },
    "debug": {
      "title": "Debug Mode",
      "type": "boolean",
      "description": "Enable debug logging",
      "default": false
    }
  },
  "required": ["location", "openaiApiKey", "apifyApiKey"]
}
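
For reference, an input object matching this schema might look like the following; the location is the schema's own example and the key values are placeholders:

{
  "location": "San Jose, CA",
  "openaiApiKey": "<YOUR_OPENAI_API_KEY>",
  "apifyApiKey": "<YOUR_APIFY_API_KEY>",
  "debug": false
}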

.actor/pay_per_event.json

{
    "actor-start": {
        "eventTitle": "Price for Actor start",
        "eventDescription": "Flat fee for starting an Actor run.",
        "eventPriceUsd": 0.1
    },
    "task-completed": {
        "eventTitle": "Price for completing the task",
        "eventDescription": "Flat fee for completing the task.",
        "eventPriceUsd": 0.4
    },
    "search-init": {
        "eventTitle": "Search Initialization",
        "eventDescription": "Charged when a new market search is initiated for a location",
        "eventPriceUsd": 0.02
    },
    "url-processed": {
        "eventTitle": "URL Processing",
        "eventDescription": "Charged per validated URL from real estate sources (Zillow, Redfin, Realtor, Rocket)",
        "eventPriceUsd": 0.02
    },
    "data-extracted": {
        "eventTitle": "Data Extraction",
        "eventDescription": "Charged per source when market data is successfully extracted from the webpage",
        "eventPriceUsd": 0.02
    },
    "market-analyzed": {
        "eventTitle": "Market Analysis",
        "eventDescription": "Charged per source when market data is successfully analyzed and validated",
        "eventPriceUsd": 0.02
    },
    "newsletter-generated": {
        "eventTitle": "Newsletter Generation",
        "eventDescription": "Charged for generating the final market analysis newsletter with compiled insights",
        "eventPriceUsd": 0.50
    }
}
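
As a rough cost illustration (an assumption about a clean run, not a guarantee): if all four sources (Zillow, Redfin, Realtor, Rocket) each trigger one url-processed, one data-extracted, and one market-analyzed event, a run costs about $0.10 (actor-start) + $0.02 (search-init) + 4 × $0.02 + 4 × $0.02 + 4 × $0.02 + $0.50 (newsletter-generated) + $0.40 (task-completed) = $1.26. Actual totals vary with how many URLs the search returns and how many sources validate.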

src/main.py

1"""
2Main entry point for the Apify Actor.
3Orchestrates the autonomous real estate market research process.
4"""
5
6from __future__ import annotations
7
8import logging
9import os
10from apify import Actor
11from apify_client import ApifyClient
12from openai import AsyncOpenAI
13
14from .agents.search_agent import SearchAgent
15from .agents.extraction_agent import ExtractionAgent
16from .agents.analysis_agent import AnalysisAgent
17from .agents.newsletter_agent import NewsletterAgent
18from .models.schemas import AgentState
19
20# Configure logging
21logging.basicConfig(level=logging.INFO)
22logger = logging.getLogger(__name__)
23
24async def main() -> None:
25    """Main entry point for the Apify Actor."""
26    async with Actor:
27        logger.info("Starting real estate market analysis actor")
28        
29        # Get input
30        actor_input = await Actor.get_input() or {}
31        logger.info(f"Received input with keys: {', '.join(actor_input.keys())}")
32        
33        # Set API keys from input
34        openai_api_key = actor_input.get("openaiApiKey")
35        apify_api_key = actor_input.get("apifyApiKey")
36        
37        if not openai_api_key:
38            logger.error("OpenAI API key is required in input")
39            return
40        logger.info("OpenAI API key validated")
41            
42        if not apify_api_key:
43            logger.error("Apify API key is required in input")
44            return
45        logger.info("Apify API key validated")
46        
47        # Get location
48        location = actor_input.get("location")
49        if not location:
50            logger.error("Location is required")
51            return
52        logger.info(f"Processing location: {location}")
53            
54        # Set environment variables for API keys
55        os.environ["OPENAI_API_KEY"] = openai_api_key
56        os.environ["APIFY_TOKEN"] = apify_api_key
57        logger.info("Environment variables set")
58            
59        # Initialize OpenAI client
60        try:
61            openai_client = AsyncOpenAI(api_key=openai_api_key)
62            logger.info("OpenAI client initialized")
63        except Exception as e:
64            logger.error(f"Failed to initialize OpenAI client: {str(e)}")
65            return
66        
67        # Initialize Apify client
68        try:
69            apify_client = ApifyClient(token=apify_api_key)
70            logger.info("Apify client initialized")
71        except Exception as e:
72            logger.error(f"Failed to initialize Apify client: {str(e)}")
73            return
74            
75        # Initialize agents with shared OpenAI client
76        try:
77            newsletter_agent = NewsletterAgent(client=openai_client)
78            search_agent = SearchAgent(client=openai_client)
79            extraction_agent = ExtractionAgent(client=openai_client)
80            analysis_agent = AnalysisAgent(client=openai_client)
81            logger.info("All agents initialized successfully")
82        except Exception as e:
83            logger.error(f"Failed to initialize agents: {str(e)}")
84            return
85        
86        try:
87            # Execute workflow
88            logger.info("Starting source search...")
89            urls = await search_agent.find_sources(location)
90            if not urls:
91                logger.error("No valid URLs found for the location")
92                return
93            logger.info(f"Found {len(urls)} valid URLs")
94                
95            logger.info("Starting data extraction...")
96            market_data = await extraction_agent.extract_data(urls)
97            if not market_data:
98                logger.error("No market data could be extracted from URLs")
99                return
100            logger.info(f"Extracted market data from {len(market_data) if isinstance(market_data, list) else len(market_data.keys())} sources")
101                
102            logger.info("Starting market analysis...")
103            analysis = await analysis_agent.analyze_market(market_data)
104            if not analysis:
105                logger.error("Market analysis failed to produce results")
106                return
107            logger.info("Market analysis completed successfully")
108                
109            logger.info("Generating newsletter...")
110            newsletter = await newsletter_agent.generate_newsletter(location, market_data, analysis)
111            if not newsletter:
112                logger.error("Newsletter generation failed")
113                return
114            logger.info("Newsletter generated successfully")
115            
116            # Save output
117            logger.info("Saving results...")
118            await Actor.push_data({
119                "location": location,
120                "filtered_urls": urls,
121                "market_data": market_data,
122                "analysis": analysis,
123                "newsletter": newsletter
124            })
125            logger.info("Results saved successfully")
126            
127        except Exception as e:
128            logger.error(f"Actor failed with error: {str(e)}")
129            logger.exception("Detailed error traceback:")
130            raise
131
132if __name__ == "__main__":
133    Actor.main(main)

src/models.py

1"""This module defines Pydantic models for this project.
2
3These models are used mainly for the structured tool and LLM outputs.
4Resources:
5- https://docs.pydantic.dev/latest/concepts/models/
6"""
7
8from __future__ import annotations
9
10from pydantic import BaseModel
11
12
13class InstagramPost(BaseModel):
14    """Instagram Post Pydantic model.
15
16    Returned as a structured output by the `tool_scrape_instagram_profile_posts` tool.
17
18    url: The URL of the post.
19    likes: The number of likes on the post.
20    comments: The number of comments on the post.
21    timestamp: The timestamp when the post was published.
22    caption: The post caption.
23    alt: The post alt text.
24    """
25
26    url: str
27    likes: int
28    comments: int
29    timestamp: str
30    caption: str | None = None
31    alt: str | None = None
32
33
34class AgentStructuredOutput(BaseModel):
35    """Structured output for the ReAct agent.
36
37    Returned as a structured output by the ReAct agent.
38
39    total_likes: The total number of likes on the most popular posts.
40    total_comments: The total number of comments on the most popular posts.
41    most_popular_posts: A list of the most popular posts.
42    """
43
44    total_likes: int
45    total_comments: int
46    most_popular_posts: list[InstagramPost]

src/tools.py

1"""This module defines the tools used by the agent.
2
3Feel free to modify or add new tools to suit your specific needs.
4
5To learn how to create a new tool, see:
6- https://python.langchain.com/docs/concepts/tools/
7- https://python.langchain.com/docs/how_to/#tools
8"""
9
10from __future__ import annotations
11
12from apify import Actor
13from langchain_core.tools import tool
14
15from src.models import InstagramPost
16
17
18@tool
19def tool_calculator_sum(numbers: list[int]) -> int:
20    """Tool to calculate the sum of a list of numbers.
21
22    Args:
23        numbers (list[int]): List of numbers to sum.
24
25    Returns:
26        int: Sum of the numbers.
27    """
28    return sum(numbers)
29
30
31@tool
32async def tool_scrape_instagram_profile_posts(handle: str, max_posts: int = 30) -> list[InstagramPost]:
33    """Tool to scrape Instagram profile posts.
34
35    Args:
36        handle (str): Instagram handle of the profile to scrape (without the '@' symbol).
37        max_posts (int, optional): Maximum number of posts to scrape. Defaults to 30.
38
39    Returns:
40        list[InstagramPost]: List of Instagram posts scraped from the profile.
41
42    Raises:
43        RuntimeError: If the Actor fails to start.
44    """
45    run_input = {
46        'directUrls': [f'https://www.instagram.com/{handle}/'],
47        'resultsLimit': max_posts,
48        'resultsType': 'posts',
49        'searchLimit': 1,
50    }
51    if not (run := await Actor.apify_client.actor('apify/instagram-scraper').call(run_input=run_input)):
52        msg = 'Failed to start the Actor apify/instagram-scraper'
53        raise RuntimeError(msg)
54
55    dataset_id = run['defaultDatasetId']
56    dataset_items: list[dict] = (await Actor.apify_client.dataset(dataset_id).list_items()).items
57    posts: list[InstagramPost] = []
58    for item in dataset_items:
59        url: str | None = item.get('url')
60        caption: str | None = item.get('caption')
61        alt: str | None = item.get('alt')
62        likes: int | None = item.get('likesCount')
63        comments: int | None = item.get('commentsCount')
64        timestamp: str | None = item.get('timestamp')
65
66        # only include posts with all required fields
67        if not url or not likes or not comments or not timestamp:
68            Actor.log.warning('Skipping post with missing fields: %s', item)
69            continue
70
71        posts.append(
72            InstagramPost(
73                url=url,
74                likes=likes,
75                comments=comments,
76                timestamp=timestamp,
77                caption=caption,
78                alt=alt,
79            )
80        )
81
82    return posts

src/utils.py

from apify import Actor
from langchain_core.messages import ToolMessage


def log_state(state: dict) -> None:
    """Logs the state of the graph.

    Uses the `Actor.log.debug` method to log the state of the graph.

    Args:
        state (dict): The state of the graph.
    """
    message = state['messages'][-1]
    # Traverse all tool messages and print them
    # if multiple tools are called in parallel
    if isinstance(message, ToolMessage):
        # Walk backwards until the assistant message with tool_calls
        for _message in state['messages'][::-1]:
            if hasattr(_message, 'tool_calls'):
                break
            Actor.log.debug('-------- Tool Result --------')
            Actor.log.debug('Tool: %s', _message.name)
            Actor.log.debug('Result: %s', _message.content)

    Actor.log.debug('-------- Message --------')
    Actor.log.debug('Message: %s', message)

    # Print all tool calls
    if hasattr(message, 'tool_calls'):
        for tool_call in getattr(message, 'tool_calls', []):
            Actor.log.debug('-------- Tool Call --------')
            Actor.log.debug('Tool: %s', tool_call['name'])
            Actor.log.debug('Args: %s', tool_call['args'])

src/__init__.py

1"""Real Estate Newsletter Agent package."""

src/__main__.py

import asyncio

from .main import main

# Execute the Actor entry point.
asyncio.run(main())

src/agents/analysis_agent.py

1"""
2Analysis Agent Module - Handles market data analysis and metrics calculation
3"""
4
5import logging
6import re
7from typing import Dict, List
8from decimal import Decimal
9from apify import Actor
10from openai import AsyncOpenAI
11
12from .extraction_agent import MarketData
13
14logger = logging.getLogger(__name__)
15
16class AnalysisAgent:
17    def __init__(self, client: AsyncOpenAI):
18        """Initialize the AnalysisAgent with an OpenAI client.
19        
20        Args:
21            client (AsyncOpenAI): The OpenAI client instance to use for API calls
22        """
23        self.client = client
24        self.high_cost_markets = ["new york", "san francisco", "los angeles", "seattle", "boston", "miami"]
25
26    async def analyze_market(self, market_data: List[MarketData], location: str = "") -> Dict:
27        """Analyze the extracted market data and extract key metrics"""
28        metrics = {
29            "zillow": {},
30            "redfin": {},
31            "realtor": {},
32            "rocket": {}
33        }
34        
35        is_high_cost = any(market.lower() in location.lower() for market in self.high_cost_markets)
36        
37        min_price = 10000
38        max_price = 10000000 if is_high_cost else 2000000
39        min_valid_price = 100000
40        max_valid_price = 5000000 if is_high_cost else 1000000
41        
42        try:
43            for data in market_data:
44                source = data.source
45                metrics[source] = {
46                    "median_price": data.median_price,
47                    "price_change": data.price_change,
48                    "inventory": data.inventory,
49                    "days_on_market": data.days_on_market,
50                    "source": source
51                }
52                
53                # Validate metrics
54                if min_price <= data.median_price <= max_price:
55                    await Actor.charge('market-analyzed')
56                    logger.info(f"Successfully analyzed data from {source}")
57                
58        except Exception as e:
59            logger.error(f"Error analyzing market data: {str(e)}")
60        
61        metrics["_meta"] = {
62            "min_valid_price": min_valid_price,
63            "max_valid_price": max_valid_price
64        }
65        
66        return metrics
67
68    async def analyze_market_data(self, market_data: Dict, location: str = "") -> Dict:
69        """Analyze the extracted market data and extract key metrics"""
70        metrics = {
71            "zillow": {},
72            "redfin": {},
73            "realtor": {},
74            "rapid": {}
75        }
76        
77        await Actor.charge('market-analyzed')
78        
79        is_high_cost = any(market.lower() in location.lower() for market in self.high_cost_markets)
80        
81        min_price = 10000
82        max_price = 10000000 if is_high_cost else 2000000
83        min_valid_price = 100000
84        max_valid_price = 5000000 if is_high_cost else 1000000
85        
86        try:
87            for source, data in market_data.items():
88                text = data.get("text", "").lower()
89                
90                if not text:
91                    continue
92                    
93                metrics_found = False
94                
95                # Extract and validate metrics
96                metrics[source].update(self._extract_price_metrics(text, min_price, max_price))
97                metrics[source].update(self._extract_price_change(text))
98                metrics[source].update(self._extract_market_metrics(text))
99                
100                if metrics[source]:
101                    metrics_found = True
102                    await Actor.charge('market-analyzed')
103                
104                metrics[source]["source_date"] = data.get("metadata", {}).get("loadedTime", "")
105                
106        except Exception as e:
107            logger.error(f"Error analyzing market data: {str(e)}")
108        
109        metrics["_meta"] = {
110            "min_valid_price": min_valid_price,
111            "max_valid_price": max_valid_price,
112            "is_high_cost": is_high_cost
113        }
114        
115        source_urls = {
116            source: data.get("metadata", {}).get("canonicalUrl") or data.get("metadata", {}).get("loadedUrl", "")
117            for source, data in market_data.items()
118        }
119        
120        return {"metrics": metrics, "source_urls": source_urls}
121
122    def _extract_price_metrics(self, text: str, min_price: int, max_price: int) -> Dict:
123        """Extract and validate price metrics"""
124        metrics = {}
125        price_patterns = [
126            r"median (?:sale )?price.*?\$([0-9,.]+)[MK]?",
127            r"average.*?home value.*?\$([0-9,.]+)[MK]?",
128            r"median.*?home value.*?\$([0-9,.]+)[MK]?",
129            r"\$([0-9,.]+)[MK]?(?=.*median)",
130        ]
131        
132        for pattern in price_patterns:
133            price_match = re.search(pattern, text)
134            if price_match:
135                try:
136                    price = float(price_match.group(1).replace(",", ""))
137                    if min_price <= price <= max_price:
138                        if "m" in text[price_match.end():price_match.end()+2].lower():
139                            metrics["median_price"] = price * 1000000
140                        elif "k" in text[price_match.end():price_match.end()+2].lower():
141                            metrics["median_price"] = price * 1000
142                        else:
143                            metrics["median_price"] = price
144                        break
145                except ValueError:
146                    continue
147        
148        return metrics
149
150    def _extract_price_change(self, text: str) -> Dict:
151        """Extract and validate price change percentage"""
152        metrics = {}
153        change_patterns = [
154            r"(up|down)\s+([0-9.]+)%\s+(?:since|compared to|over|in the) last year",
155            r"([0-9.]+)%\s+(increase|decrease)\s+(?:since|compared to|over|in the) last year",
156            r"([+-]?[0-9.]+)%\s+1-yr"
157        ]
158        
159        for pattern in change_patterns:
160            change_match = re.search(pattern, text)
161            if change_match:
162                try:
163                    if len(change_match.groups()) == 2:
164                        change = float(change_match.group(2))
165                        if "down" in change_match.group(1).lower() or "decrease" in change_match.group(2).lower():
166                            change = -change
167                    else:
168                        change = float(change_match.group(1))
169                    if abs(change) <= 50:
170                        metrics["price_change"] = change
171                        break
172                except ValueError:
173                    continue
174        
175        return metrics
176
177    def _extract_market_metrics(self, text: str) -> Dict:
178        """Extract and validate market metrics (days on market, price per sqft, inventory)"""
179        metrics = {}
180        
181        # Days on market
182        dom_patterns = [
183            r"(?:sell|sold) (?:in|after) (?:around )?([0-9]+) days",
184            r"(?:average|median) (?:of )?([0-9]+) days on (?:the )?market",
185            r"([0-9]+) days on (?:the )?market",
186            r"pending in (?:around )?([0-9]+) days"
187        ]
188        
189        for pattern in dom_patterns:
190            dom_match = re.search(pattern, text)
191            if dom_match:
192                try:
193                    days = int(dom_match.group(1))
194                    if 0 <= days <= 365:
195                        metrics["days_on_market"] = days
196                        break
197                except ValueError:
198                    continue
199        
200        # Price per sqft
201        sqft_patterns = [
202            r"\$([0-9,.]+) per square (?:foot|feet|ft)",
203            r"price per (?:square )?(?:foot|feet|ft).*?\$([0-9,.]+)"
204        ]
205        
206        for pattern in sqft_patterns:
207            sqft_match = re.search(pattern, text)
208            if sqft_match:
209                try:
210                    price_sqft = float(sqft_match.group(1).replace(",", ""))
211                    if 50 <= price_sqft <= 2000:
212                        metrics["price_per_sqft"] = price_sqft
213                        break
214                except ValueError:
215                    continue
216        
217        # Inventory
218        inv_patterns = [
219            r"([0-9,]+) homes? (?:for sale|available|active)",
220            r"inventory of ([0-9,]+) homes",
221            r"([0-9,]+) properties? (?:for sale|available|active)"
222        ]
223        
224        for pattern in inv_patterns:
225            inv_match = re.search(pattern, text)
226            if inv_match:
227                try:
228                    inventory = int(inv_match.group(1).replace(",", ""))
229                    if 0 <= inventory <= 10000:
230                        metrics["inventory"] = inventory
231                        break
232                except ValueError:
233                    continue
234        
235        return metrics

src/agents/extraction_agent.py

1"""
2Extraction Agent Module - Handles data extraction from validated sources
3"""
4
5import logging
6from typing import Dict, List
7from dataclasses import dataclass
8from decimal import Decimal
9from apify import Actor
10from apify_client import ApifyClient
11from openai import AsyncOpenAI
12import os
13import re
14
15logger = logging.getLogger(__name__)
16
17@dataclass
18class MarketData:
19    median_price: float
20    price_change: float
21    inventory: int
22    days_on_market: int
23    source: str
24
25class ExtractionAgent:
26    def __init__(self, client: AsyncOpenAI):
27        """Initialize the ExtractionAgent with OpenAI client"""
28        self.client = client
29        self.apify_client = ApifyClient(token=os.environ["APIFY_TOKEN"])
30        
31    def _extract_price_metrics(self, text: str) -> Dict:
32        """Extract price metrics from text"""
33        metrics = {}
34        price_patterns = [
35            r"(?:median|average).*?(?:home value|sale price|sold price)[^\$]*\$([0-9,.]+)([KM]?)",
36            r"\$([0-9,.]+)([KM]?)(?=.*median)",
37            r"typical home value.*?\$([0-9,.]+)([KM]?)",
38            r"median list price.*?\$([0-9,.]+)([KM]?)"
39        ]
40        
41        for pattern in price_patterns:
42            price_match = re.search(pattern, text.lower())
43            if price_match:
44                try:
45                    price_str = price_match.group(1).replace(",", "")
46                    price = float(price_str)
47                    suffix = price_match.group(2).upper() if len(price_match.groups()) > 1 else ""
48                    
49                    # Handle price multipliers
50                    if suffix == "M":
51                        price *= 1000000
52                    elif suffix == "K":
53                        price *= 1000
54                    
55                    # Validate price range (100k to 2M for normal markets)
56                    if 100000 <= price <= 2000000:
57                        metrics["median_price"] = price
58                        logger.info(f"Extracted price: ${price:,.2f}")
59                        break
60                    else:
61                        logger.warning(f"Price {price} outside valid range")
62                except ValueError:
63                    continue
64        
65        return metrics
66
67    def _extract_price_change(self, text: str) -> Dict:
68        """Extract price change percentage from text"""
69        metrics = {}
70        change_patterns = [
71            r"(?:down|decreased)\s+([0-9.]+)%",
72            r"([0-9.]+)%\s+(?:decrease)",
73            r"-([0-9.]+)%",
74            r"(?:up|increased)\s+([0-9.]+)%",
75            r"([0-9.]+)%\s+(?:increase)",
76            r"\+([0-9.]+)%"
77        ]
78        
79        for pattern in change_patterns:
80            change_match = re.search(pattern, text.lower())
81            if change_match:
82                try:
83                    change = float(change_match.group(1))
84                    # Make negative if pattern indicates decrease
85                    if any(word in pattern.lower() for word in ["down", "decrease", "-"]):
86                        change = -change
87                    
88                    # Validate change range (-20% to +20% is reasonable)
89                    if abs(change) <= 20:
90                        metrics["price_change"] = change
91                        logger.info(f"Extracted price change: {change}%")
92                        break
93                    else:
94                        logger.warning(f"Price change {change}% outside valid range")
95                except ValueError:
96                    continue
97        
98        return metrics
99
100    def _extract_market_metrics(self, text: str) -> Dict:
101        """Extract market metrics from text"""
102        metrics = {}
103        
104        # Days on market
105        dom_patterns = [
106            r"(?:median |average )?(?:days on market|dom)[:\s]+([0-9]+)",
107            r"(?:sell|sold|pending) (?:in|after) (?:around )?([0-9]+) days",
108            r"(?:average|median) (?:of )?([0-9]+) days on (?:the )?market",
109            r"([0-9]+) days? on (?:the )?market",
110            r"average listing age of ([0-9]+) days"
111        ]
112        
113        for pattern in dom_patterns:
114            dom_match = re.search(pattern, text.lower())
115            if dom_match:
116                try:
117                    days = int(dom_match.group(1))
118                    if 0 <= days <= 180:  # Most markets are under 180 days
119                        metrics["days_on_market"] = days
120                        logger.info(f"Extracted days on market: {days}")
121                        break
122                    else:
123                        logger.warning(f"Days on market {days} outside valid range")
124                except ValueError:
125                    continue
126        
127        # Inventory
128        inv_patterns = [
129            r"([0-9,]+)\s+(?:homes?|properties?|listings?|houses?)?\s+(?:for sale|available|active)",
130            r"inventory of ([0-9,]+)",
131            r"([0-9,]+) total listings",
132            r"([0-9,]+) properties found",
133            r"([0-9,]+) homes sold"
134        ]
135        
136        for pattern in inv_patterns:
137            inv_match = re.search(pattern, text.lower())
138            if inv_match:
139                try:
140                    inventory = int(inv_match.group(1).replace(",", ""))
141                    if 0 <= inventory <= 50000:  # Increased max for larger markets
142                        metrics["inventory"] = inventory
143                        logger.info(f"Extracted inventory: {inventory}")
144                        break
145                    else:
146                        logger.warning(f"Inventory {inventory} outside valid range")
147                except ValueError:
148                    continue
149        
150        return metrics
151
152    async def extract_data(self, urls: Dict[str, str]) -> List[MarketData]:
153        """Extract market data from the filtered URLs"""
154        market_data = []
155        
156        try:
157            # Convert URLs dict to list of URLData for crawler
158            url_list = [{"url": url, "method": "GET"} for url in urls.values()]
159            
160            crawler_input = {
161                "startUrls": url_list,
162                "crawlerType": "playwright",  # Changed to playwright for better JS support
163                "maxCrawlPages": len(urls),
164                "maxCrawlDepth": 0,
165                "saveMarkdown": True,
166                "maxRequestRetries": 3,
167                "maxConcurrency": 1,
168                "proxyConfiguration": {
169                    "useApifyProxy": True,
170                    "apifyProxyGroups": ["RESIDENTIAL"]
171                },
172                "removeElementsCssSelector": "nav, footer, script, style, noscript, svg, img[src^='data:']",
173                "htmlTransformer": "readableText",
174                "waitUntil": "networkidle",  # Wait for network to be idle
175                "browserPoolOptions": {
176                    "maxOpenPagesPerBrowser": 1,
177                    "retireInstanceAfterRequestCount": 5
178                }
179            }
180            
181            logger.info("Starting Website Content Crawler...")
182            run = self.apify_client.actor("apify/website-content-crawler").call(
183                run_input=crawler_input,
184                memory_mbytes=4096
185            )
186            
187            dataset_id = run["defaultDatasetId"]
188            items = self.apify_client.dataset(dataset_id).list_items().items
189            
190            for source, url in urls.items():
191                logger.info(f"Processing {source} URL: {url}")
192                extracted = False
193                
194                for item in items:
195                    if item.get("url") == url:
196                        text_content = item.get("text", "")
197                        
198                        if not text_content:
199                            logger.warning(f"No content extracted from {source}")
200                            continue
201                        
202                        # Log first 500 chars of content for debugging
203                        logger.debug(f"{source} content preview: {text_content[:500]}")
204                            
205                        # Extract metrics using parsing logic
206                        price_metrics = self._extract_price_metrics(text_content)
207                        price_change = self._extract_price_change(text_content)
208                        market_metrics = self._extract_market_metrics(text_content)
209                        
210                        if price_metrics or price_change or market_metrics:
211                            await Actor.charge('data-extracted')
212                            logger.info(f"Successfully extracted data from {source}")
213                            
214                            market_data.append(MarketData(
215                                median_price=price_metrics.get("median_price", 0),
216                                price_change=price_change.get("price_change", 0),
217                                inventory=market_metrics.get("inventory", 0),
218                                days_on_market=market_metrics.get("days_on_market", 0),
219                                source=source
220                            ))
221                            extracted = True
222                            break
223                
224                if not extracted:
225                    logger.error(f"Failed to extract valid data from {source}")
226                        
227        except Exception as e:
228            logger.error(f"Error extracting market data: {str(e)}")
229            
230        return market_data

src/agents/newsletter_agent.py

1"""
2Newsletter Agent Module - Handles report generation using OpenAI
3"""
4
5import logging
6from datetime import datetime
7from typing import Dict, Any
8from openai import AsyncOpenAI
9from apify import Actor
10
11logger = logging.getLogger(__name__)
12
13class NewsletterAgent:
14    def __init__(self, client: AsyncOpenAI):
15        """Initialize the NewsletterAgent with an OpenAI client.
16        
17        Args:
18            client (AsyncOpenAI): The OpenAI client instance to use for API calls
19        """
20        self.client = client
21
22    async def generate_newsletter(self, location: str, market_data: Dict, analysis: Dict) -> str:
23        """Generate a real estate market newsletter using OpenAI"""
24        try:
25            current_date = datetime.now().strftime("%B %Y")
26            
27            metrics = analysis.get("metrics", {})
28            source_urls = analysis.get("source_urls", {})
29            meta = metrics.get("_meta", {})
30            min_valid_price = meta.get("min_valid_price", 100000)
31            max_valid_price = meta.get("max_valid_price", 1000000)
32            
33            formatted_data = self._format_source_data(metrics)
34            formatted_urls = self._format_source_urls(source_urls)
35            avg_metrics = self._calculate_averages(metrics, min_valid_price, max_valid_price)
36            
37            system_content = self._get_system_prompt()
38            user_content = self._get_user_prompt(location, current_date, formatted_data, avg_metrics, formatted_urls)
39            
40            response = await self.client.chat.completions.create(
41                model="gpt-4o-mini",
42                messages=[
43                    {"role": "system", "content": system_content},
44                    {"role": "user", "content": user_content}
45                ],
46                temperature=0.7,
47                max_tokens=2000
48            )
49            
50            newsletter = response.choices[0].message.content
51            
52            # Charge for newsletter generation
53            await Actor.charge('newsletter-generated')
54            
55            return newsletter
56            
57        except Exception as e:
58            logger.error(f"Error generating newsletter: {str(e)}")
59            return f"Error generating newsletter: {str(e)}"
60
61    def _format_price(self, price):
62        """Format price with proper formatting"""
63        if price and isinstance(price, (int, float)):
64            return f"${price:,.0f}"
65        return "N/A"
66
67    def _format_percent(self, value):
68        """Format percentage with proper formatting"""
69        if value is not None:
70            return f"{value:+.1f}%" if value >= 0 else f"{value:.1f}%"
71        return "N/A"
72
73    def _format_source_data(self, metrics: Dict) -> str:
74        """Format market data from each source"""
75        formatted_data = ""
76        for source in ["zillow", "redfin", "realtor", "rapid"]:
77            source_data = metrics.get(source, {})
78            if source_data:
79                formatted_data += f"""
80{source.capitalize()}:
81- Median Price: {self._format_price(source_data.get('median_price'))}
82- Price Change: {self._format_percent(source_data.get('price_change'))}
83- Days on Market: {source_data.get('days_on_market', 'N/A')}
84- Price Per SqFt: {self._format_price(source_data.get('price_per_sqft'))}
85- Inventory: {source_data.get('inventory', 'N/A')}
86"""
87        return formatted_data
88
89    def _format_source_urls(self, source_urls: Dict) -> str:
90        """Format source URLs"""
91        return "\n".join(f"- {source.capitalize()}: {url}" for source, url in source_urls.items() if url)
92
93    def _calculate_averages(self, metrics: Dict, min_valid_price: int, max_valid_price: int) -> Dict:
94        """Calculate average metrics across sources"""
95        def calculate_average(metric_name):
96            values = []
97            for source, source_data in metrics.items():
98                if source == "_meta":
99                    continue
100                value = source_data.get(metric_name)
101                if value and isinstance(value, (int, float)):
102                    if metric_name == "median_price" and (value < min_valid_price or value > max_valid_price):
103                        continue
104                    if metric_name == "price_change" and abs(value) > 20:
105                        continue
106                    values.append(value)
107            return sum(values) / len(values) if values else None
108        
109        return {
110            "avg_price": calculate_average("median_price"),
111            "avg_price_change": calculate_average("price_change"),
112            "avg_dom": calculate_average("days_on_market")
113        }
114
115    def _get_system_prompt(self) -> str:
116        """Get system prompt for newsletter generation"""
117        return """You are an expert real estate newsletter writer. Create a professional, polished, and well-structured newsletter using the provided market data.
118
119Format the newsletter in Markdown with the following requirements:
1201. Begin with a main heading (#) that includes the location name and "Real Estate Market Update - [Month Year]"
1212. Add a "Last Updated" line right after the title
1223. Use subheadings (##) for different sections with proper spacing
1234. Include a well-formatted Markdown table comparing data sources
1245. Use emoji icons to highlight key points
1256. Format all prices with dollar signs and commas
1267. Include percentages with % symbol and +/- signs
1278. Use proper spacing between sections
1289. Use **bold** for critical information and *italic* for secondary emphasis
12910. If data is limited or inconsistent, acknowledge this and focus on reliable metrics
130
131Include these sections:
132- Executive Summary (3-4 sentences)
133- Market Overview (with validated average metrics)
134- Market Data Comparison (table)
135- Price Analysis
136- Market Activity
137- Market Forecast
138- Recommendations for Buyers and Sellers
139- Additional Resources"""
140
141    def _get_user_prompt(self, location: str, current_date: str, formatted_data: str, avg_metrics: Dict, formatted_urls: str) -> str:
142        """Get user prompt for newsletter generation"""
143        return f"""Create a real estate market newsletter for {location} for {current_date}.
144
145MARKET DATA:
146{formatted_data}
147
148AVERAGE METRICS (excluding outliers):
149- Average Price: {self._format_price(avg_metrics['avg_price'])}
150- Average Price Change: {self._format_percent(avg_metrics['avg_price_change'])}
151- Average Days on Market: {int(avg_metrics['avg_dom']) if avg_metrics['avg_dom'] else 'N/A'}
152
153SOURCE URLS:
154{formatted_urls}
155
156Please generate a comprehensive newsletter following the format in the system instructions. Make sure to:
1571. Focus on the most reliable data points
1582. Acknowledge any data inconsistencies
1593. Provide specific insights based on validated metrics
1604. Include actionable recommendations
1615. Format all numbers properly
1626. Use appropriate spacing and visual elements
1637. Bold important findings
1648. If certain metrics are missing or unreliable, explain the limitations"""

src/agents/search_agent.py

1"""
2Search Agent Module - Handles finding relevant real estate market sources
3"""
4
5import logging
6import re
7from typing import Dict, List, Optional
8from dataclasses import dataclass
9from decimal import Decimal
10from apify import Actor
11from apify_client import ApifyClient
12from openai import AsyncOpenAI
13import os
14
15logger = logging.getLogger(__name__)
16
17@dataclass
18class URLData:
19    url: str
20    source: str
21
22class SearchAgent:
23    # URL validation patterns focusing on essential subdirectories and formats
24    URL_PATTERNS = {
25        "zillow": r"zillow\.com/home-values/\d+/[a-zA-Z0-9-]+(?:/)?$",
26        "redfin": r"redfin\.com/city/\d+/[A-Z]{2}/[A-Za-z-]+/housing-market(?:/)?$",
27        "realtor": r"realtor\.com/realestateandhomes-search/[A-Za-z-]+_[A-Z]{2}/overview(?:/)?$",
28        "rocket": r"rocket\.com/homes/market-reports/[a-z]{2}/[a-z-]+(?:/)?$"
29    }
30
31    def __init__(self, client: AsyncOpenAI):
32        """Initialize the SearchAgent with an OpenAI client.
33        
34        Args:
35            client (AsyncOpenAI): The OpenAI client instance to use for API calls
36        """
37        self.client = client
38        self.apify_client = ApifyClient(token=os.environ["APIFY_TOKEN"])
39
40    def _normalize_location(self, location: str) -> Optional[str]:
41        """Normalize location input to a standardized format."""
42        try:
43            # Remove extra whitespace and convert to lowercase
44            location = " ".join(location.strip().lower().split())
45            
46            # Extract state code (assuming 2-letter state code)
47            state_match = re.search(r'[,\s]+([a-zA-Z]{2})$', location)
48            if not state_match:
49                logger.warning(f"No valid state code found in location: {location}")
50                return None
51                
52            state = state_match.group(1).upper()
53            
54            # Remove state code and clean up remaining location
55            base_location = location[:state_match.start()].strip()
56            
57            # Remove only non-essential location words and special characters
58            base_location = re.sub(r'\b(town|village|township|metropolitan|area)\b', '', base_location)
59            base_location = re.sub(r'[^\w\s-]', '', base_location).strip()
60            
61            # Convert spaces to hyphens and remove multiple hyphens
62            normalized = f"{'-'.join(base_location.split())}-{state}"
63            normalized = re.sub(r'-+', '-', normalized)
64            
65            logger.info(f"Normalized location '{location}' to '{normalized}'")
66            return normalized
67            
68        except Exception as e:
69            logger.error(f"Error normalizing location '{location}': {str(e)}")
70            return None
71
72    async def find_sources(self, location: str) -> Dict[str, str]:
73        """Find relevant real estate market sources for the given location.
74        
75        Args:
76            location (str): The city and state to search for
77            
78        Returns:
79            Dict[str, str]: Dictionary of source names to URLs
80        """
81        try:
82            # Charge for search initialization
83            await Actor.charge('search-init')
84            
85            # Get normalized location
86            normalized_location = self._normalize_location(location)
87            if not normalized_location:
88                raise ValueError(f"Could not normalize location: {location}")
89            
90            # Search for URLs using Google Search
91            all_urls = await self.search_urls(location)
92            
93            # Filter and validate URLs
94            filtered_urls = await self.filter_urls(all_urls)
95            if not filtered_urls:
96                raise ValueError("No valid URLs found after filtering")
97            
            # Convert to dictionary format
            return {url_data.source: url_data.url for url_data in filtered_urls}

        except Exception as e:
            logger.error(f"Error finding sources: {str(e)}")
            raise  # Re-raise the error instead of returning an empty dict

    async def search_urls(self, location: str) -> List[str]:
        """Search for market research URLs using the Apify Google Search Scraper."""
        all_urls = []

        try:
            normalized_location = self._normalize_location(location)
            if not normalized_location:
                raise ValueError(f"Could not normalize location: {location}")

            # Simple search query focusing on the main domains
            search_query = f"{normalized_location} real estate market site:zillow.com OR site:redfin.com OR site:realtor.com OR site:rocket.com"
            logger.info(f"Searching with query: {search_query}")

            # Run the Google Search Scraper actor
            run = self.apify_client.actor("apify/google-search-scraper").call(
                run_input={
                    "queries": search_query,
                    "maxPagesPerQuery": 2,
                    "resultsPerPage": 10,
                    "languageCode": "en",
                    "countryCode": "us",
                    "mobileResults": False
                }
            )

            # Collect organic results from the run's default dataset
            dataset_id = run["defaultDatasetId"]
            items = self.apify_client.dataset(dataset_id).list_items().items

            for item in items or []:
                for result in item.get("organicResults", []):
                    url = result.get("url", "").strip()
                    if url:
                        all_urls.append(url)
                        logger.info(f"Found URL: {url}")
                        await Actor.charge('url-processed')

        except Exception as e:
            logger.error(f"Error searching URLs: {str(e)}")
            raise  # Raise the error instead of falling back to template URLs

        if not all_urls:
            logger.warning("No URLs found in search")
            raise ValueError("No URLs found in search")  # Raise instead of falling back

        logger.info(f"Found {len(all_urls)} URLs in total")
        return all_urls

    async def filter_urls(self, urls: List[str]) -> List[URLData]:
        """Filter and validate URLs by source."""
        filtered_urls = []
        source_counts = {source: 0 for source in self.URL_PATTERNS.keys()}

        for url in urls:
            for source, pattern in self.URL_PATTERNS.items():
                if re.search(pattern, url, re.IGNORECASE):
                    if source_counts[source] == 0:  # Only keep the first valid URL per source
                        filtered_urls.append(URLData(url=url, source=source))
                        source_counts[source] += 1
                        logger.info(f"Found valid {source} URL: {url}")
                        await Actor.charge('url-processed')
                    break

        if not filtered_urls:
            logger.warning("No valid URLs found after filtering")
        else:
            logger.info(f"Found {len(filtered_urls)} valid URLs")

        return filtered_urls

    def _get_template_urls(self, normalized_location: str) -> Dict[str, str]:
        """Build template URLs as a fallback (currently unused: the search
        and filter steps above raise on failure rather than falling back)."""
        return {
            "zillow": f"https://www.zillow.com/homes/{normalized_location}_rb/",
            "redfin": f"https://www.redfin.com/city/{normalized_location}",
            "realtor": f"https://www.realtor.com/realestateandhomes-search/{normalized_location}"
        }
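
The filtering step keeps at most one URL per source, matched against the regular expressions in src/utils/url_patterns.py. A minimal standalone sketch of that matching; the sample URLs are made up for illustration:

import re

# Two of the patterns from src/utils/url_patterns.py
URL_PATTERNS = {
    "zillow": r"zillow\.com/(?:home-values/\d+/[^/]+(?:-[a-z]{2})?|[^/]+-[a-z]{2}/home-values|[^/]+/home-values)/?$",
    "redfin": r"redfin\.com/(?:city/\d+/[A-Z]{2}/[^/]+/housing-market|[^/]+/housing-market)/?$",
}

# Hypothetical search results, for illustration only
urls = [
    "https://www.zillow.com/austin-tx/home-values/",
    "https://www.redfin.com/city/30818/TX/Austin/housing-market",
    "https://example.com/not-a-market-page",
]

seen = set()
for url in urls:
    for source, pattern in URL_PATTERNS.items():
        if re.search(pattern, url, re.IGNORECASE) and source not in seen:
            seen.add(source)  # only the first match per source is kept
            print(f"{source}: {url}")
            break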

src/agents/writer_agent.py

import re

class NewsletterWriter:
    def __init__(self, openai_client):
        self.client = openai_client
        self.required_metrics = ['median_price', 'price_change', 'days_on_market']

    def _validate_source_data(self, market_data):
        """Validate and consolidate data from the different sources."""
        valid_sources = {}

        for source, data in market_data.items():
            if not data or not isinstance(data, dict):
                continue

            # Extract the core metrics where present and non-empty
            metrics = {}
            for key in ('median_price', 'price_change', 'days_on_market', 'price_per_sqft'):
                if data.get(key):
                    metrics[key] = data[key]

            # Extract additional metrics from Rocket data; the source may be
            # keyed "rapid" (see src/utils/url_patterns.py), so accept both
            if source in ('rocket', 'rapid') and isinstance(data.get('text'), str):
                text = data['text']
                for market_type in ("Neutral Market", "Seller's Market", "Buyer's Market"):
                    if market_type in text:
                        metrics['market_type'] = market_type
                        break

                # Extract inventory and sales data if available
                if "homes for sale" in text:
                    inventory = self._extract_inventory(text)
                    if inventory is not None:
                        metrics['inventory'] = inventory
                if "homes sold" in text:
                    sales = self._extract_sales(text)
                    if sales is not None:
                        metrics['sales_volume'] = sales

            # Only include sources that yielded actual data
            if metrics:
                valid_sources[source] = metrics

        return valid_sources

    def _extract_inventory(self, text):
        """Extract the for-sale inventory count from text (best-effort regex heuristic)."""
        match = re.search(r"([\d,]+)\s+homes for sale", text)
        try:
            return int(match.group(1).replace(",", "")) if match else None
        except ValueError:
            return None

    def _extract_sales(self, text):
        """Extract the sales volume from text (best-effort regex heuristic)."""
        match = re.search(r"([\d,]+)\s+homes sold", text)
        try:
            return int(match.group(1).replace(",", "")) if match else None
        except ValueError:
            return None

    def _format_market_data(self, market_data):
        """Format market data into sections for the newsletter.

        Returns an error string when no source produced usable data."""
        valid_sources = self._validate_source_data(market_data)

        if not valid_sources:
            return "Error: No valid market data available"

        # Collect each metric's values across sources so they can be averaged
        avg_metrics = {
            'median_price': [],
            'price_change': [],
            'days_on_market': [],
            'price_per_sqft': []
        }

        for source_data in valid_sources.values():
            for metric, values in avg_metrics.items():
                if metric in source_data:
                    values.append(source_data[metric])

        def _mean(values):
            """Average of a list, or None when the list is empty."""
            return sum(values) / len(values) if values else None

        # Format market insights
        insights = []

        # Add price insights
        median_price = _mean(avg_metrics['median_price'])
        if median_price is not None:
            insights.append(f"The median home price is ${median_price:,.0f}")

        avg_change = _mean(avg_metrics['price_change'])
        if avg_change is not None:
            insights.append(f"Prices have changed by {avg_change:.1f}% over the past year")

        avg_dom = _mean(avg_metrics['days_on_market'])
        if avg_dom is not None:
            insights.append(f"Homes are selling in an average of {avg_dom:.0f} days")

        # Add market type if available from Rocket (keyed "rocket" or "rapid")
        rocket_data = valid_sources.get('rocket') or valid_sources.get('rapid') or {}
        if 'market_type' in rocket_data:
            insights.append(f"The area is currently a {rocket_data['market_type']}")

        # Add inventory insights if available
        if 'inventory' in rocket_data:
            insights.append(f"There are currently {rocket_data['inventory']:,} homes for sale")

        return {
            'insights': insights,
            'averages': {
                'median_price': median_price,
                'price_change': avg_change,
                'days_on_market': avg_dom
            },
            'sources': list(valid_sources.keys())
        }

    def write_newsletter(self, location, market_data):
        """Generate a real estate market newsletter."""
        try:
            formatted_data = self._format_market_data(market_data)

            if isinstance(formatted_data, str) and formatted_data.startswith("Error"):
                return formatted_data

            # Create system prompt for the model
            system_prompt = """You are a professional real estate market analyst writing a newsletter.
            IMPORTANT FORMATTING RULES:
            1. DO NOT include any tables or grid-like data presentations
            2. Present all data in a narrative, paragraph format
            3. Use bullet points sparingly and only for recommendations
            4. Write in a clear, flowing style that connects insights naturally
            5. Keep the tone professional and avoid emojis
            6. Focus on telling the market story rather than listing data points
            7. Keep sections concise and impactful
            8. When presenting numbers, integrate them smoothly into sentences
            9. Avoid markdown formatting except for section headers
            10. Do not include comparison grids or charts"""

            # Create user prompt with the formatted data
            user_prompt = f"""Write a real estate market newsletter for {location} that weaves these insights into a cohesive narrative:

            Available Market Insights:
            {chr(10).join('- ' + insight for insight in formatted_data['insights'])}

            Based on data from: {', '.join(formatted_data['sources']).title()}

            Structure the newsletter as follows:
            1. Title and Date
            2. Executive Summary (2-3 sentences on key trends)
            3. Current Market Conditions (integrate price and market type insights)
            4. Market Activity and Trends (blend sales pace and price trends)
            5. Future Outlook (brief forecast based on current trends)
            6. Buyer and Seller Recommendations (3-4 actionable points each)

            IMPORTANT:
            - DO NOT include any tables or data grids
            - Present all metrics within flowing paragraphs
            - Focus on telling a coherent market story
            - Keep the writing style professional and straightforward
            - Integrate numbers naturally into sentences
            - Use minimal formatting - only use ## for section headers"""

            # Generate the newsletter with OpenAI
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.7,
                max_tokens=1500
            )

            return response.choices[0].message.content

        except Exception as e:
            return f"Error generating newsletter: {str(e)}"
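
A hypothetical invocation, showing the input shape write_newsletter expects; the figures are illustrative only and OPENAI_API_KEY is assumed to be set in the environment:

from openai import OpenAI

writer = NewsletterWriter(OpenAI())
sample_data = {
    "zillow": {"median_price": 550000, "price_change": 3.2, "days_on_market": 28},
    "redfin": {"median_price": 540000, "price_change": 2.8, "days_on_market": 31},
}
print(writer.write_newsletter("Austin, TX", sample_data))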

src/models/schemas.py

from typing import List, Dict, Optional
from pydantic import BaseModel, Field

class URLData(BaseModel):
    """Data structure for URLs with their source information"""
    url: str
    source: str

class MarketMetrics(BaseModel):
    """Structure for real estate market metrics"""
    median_price: Optional[float] = None
    price_change: Optional[float] = None
    days_on_market: Optional[int] = None
    inventory: Optional[int] = None
    price_per_sqft: Optional[float] = None
    source_date: Optional[str] = None

class AgentState(BaseModel):
    """State management for the real estate newsletter agent"""
    location: str = Field(..., description="Target location for market analysis")
    search_urls: List[str] = Field(default_factory=list, description="Initial search results")
    filtered_urls: List[URLData] = Field(default_factory=list, description="Filtered and validated URLs")
    final_urls: List[URLData] = Field(default_factory=list, description="Final URLs for data extraction")
    market_data: Dict[str, MarketMetrics] = Field(default_factory=dict, description="Extracted market data")
    errors: List[str] = Field(default_factory=list, description="Error messages during processing")
    location_valid: bool = Field(default=False, description="Location validation status")
    analysis_complete: bool = Field(default=False, description="Analysis completion status")
    newsletter: Optional[str] = None
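
A quick sketch of how these models fit together during a run; the values are illustrative:

state = AgentState(location="Austin, TX")
state.filtered_urls.append(URLData(url="https://www.zillow.com/austin-tx/home-values/", source="zillow"))
state.market_data["zillow"] = MarketMetrics(median_price=550000.0, days_on_market=28)
print(state.location_valid)  # False until the agent validates the location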

src/models/__init__.py

1"""Models package for the Real Estate Newsletter Agent."""

src/utils/charging.py

1"""
2Shared utilities for pay-per-event charging
3"""
4
5import logging
6from decimal import Decimal
7from typing import Dict
8from apify import Actor
9
10logger = logging.getLogger(__name__)
11
12# Define all chargeable events and their prices
13EVENTS = {
14    'search-initialized': '0.02',
15    'url-processed': '0.02',
16    'data-extracted': '0.02',
17    'market-analyzed': '0.02',
18    'newsletter-generated': '0.50'
19}
20
21def register_events():
22    """Register all chargeable events with their prices"""
23    try:
24        charging_manager = Actor.get_charging_manager()
25        for event_name, price in EVENTS.items():
26            charging_manager.register_event(event_name, price)
27        logger.info("Successfully registered all chargeable events")
28    except Exception as e:
29        logger.error(f"Error registering events: {str(e)}")
30
31async def charge_event(event_name: str, count: int = 1) -> bool:
32    """Charge for an event using predefined prices
33    
34    Args:
35        event_name: Name of the event to charge for
36        count: Number of events to charge for (default: 1)
37        
38    Returns:
39        bool: True if charging was successful, False otherwise
40    """
41    try:
42        if event_name not in EVENTS:
43            logger.warning(f"Unknown event: {event_name}")
44            return False
45            
46        await Actor.charge(event_name, count)
47        logger.info(f"Successfully charged for {count} {event_name} event(s)")
48        return True
49    except Exception as e:
50        logger.warning(f"Failed to charge for {event_name}: {str(e)}")
51        return False
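
A sketch of how a pipeline step might use charge_event; process_listing_page is a hypothetical coroutine, not part of this module:

async def process_listing_page(url: str) -> None:
    # ... scrape and parse the page here ...
    if not await charge_event('url-processed'):
        logger.warning(f"Charge for {url} was not recorded")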

src/utils/url_patterns.py

1"""URL patterns for real estate market data sources"""
2
3# Regular expression patterns for validating real estate market URLs
4URL_PATTERNS = {
5    "zillow": r"zillow\.com/(?:home-values/\d+/[^/]+(?:-[a-z]{2})?|[^/]+-[a-z]{2}/home-values|[^/]+/home-values)/?$",
6    "redfin": r"redfin\.com/(?:city/\d+/[A-Z]{2}/[^/]+/housing-market|[^/]+/housing-market)/?$",
7    "realtor": r"realtor\.com/(?:realestateandhomes-search/[^/]+(?:_[A-Z]{2})?/overview|market-trends/[^/]+)/?$",
8    "rapid": r"(?:rocket|rapid)\.com/(?:homes/market-reports|market-trends)/(?:[a-z]{2}/)?[^/]+/?$"
9}
10
11# Search query templates for each source
12SEARCH_QUERIES = {
13    "zillow": "{location} real estate market home values site:zillow.com",
14    "redfin": "{location} housing market trends site:redfin.com",
15    "realtor": "{location} real estate market overview site:realtor.com",
16    "rapid": "{location} housing market report site:rocket.com"
17}
18
19# Maximum number of URLs to process per source
20MAX_URLS_PER_SOURCE = 1
21
22# Required sources for complete analysis
23REQUIRED_SOURCES = ["zillow", "redfin", "realtor", "rapid"]
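
A small sketch of a completeness check built on these constants; market_data is a hypothetical dict keyed by source name:

def missing_sources(market_data: dict) -> list:
    """Return the required sources that have not produced data yet."""
    return [source for source in REQUIRED_SOURCES if source not in market_data]

# missing_sources({"zillow": {}, "redfin": {}}) -> ["realtor", "rapid"]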

src/utils/__init__.py

1"""Utilities package for the Real Estate Newsletter Agent."""