1"""Search agent for real estate properties."""
2
3import os
4import json
5from typing import List, Dict, Any, Optional, Tuple
6from datetime import datetime
7from crewai import Agent, Task, Crew
8from langchain.tools import BaseTool
9from langchain_openai import ChatOpenAI
10from apify import Actor
11
12from .models.property import PropertyListing, SearchCriteria, SearchResults
13from .scrapers.zillow import ZillowScraper
14from .scrapers.realtor import RealtorScraper
15from .scrapers.apartments import ApartmentsScraper
16from .utils.llm import filter_properties_with_llm, summarize_property
17from .utils.storage import (
18 load_previous_results,
19 mark_new_listings,
20 save_search_results,
21 push_results_to_dataset
22)
23
24
class SearchTool(BaseTool):
    """Tool that scrapes the configured listing sources and stores the outcome.

    One run instantiates a scraper for each recognised entry in
    ``search_criteria.sources``, merges the scraped listings, passes them
    through ``mark_new_listings`` together with the previously stored results,
    then saves the resulting ``SearchResults`` and pushes it to the dataset.
    """

    # Field overrides are annotated explicitly: pydantic-v2-based versions of
    # BaseTool refuse to build a subclass that overrides an inherited field
    # with a bare (un-annotated) assignment.
    name: str = "search_real_estate"
    description: str = "Search for real estate listings based on search criteria"
    search_criteria: Optional[SearchCriteria] = None

    def __init__(self, search_criteria: SearchCriteria):
        """Initialize the search tool.

        Args:
            search_criteria: Search criteria
        """
        super().__init__()
        self.search_criteria = search_criteria

    def _run(self, query: str) -> Dict[str, Any]:
        """Run the search tool.

        Args:
            query: Search query (not used, but required by BaseTool)

        Returns:
            A dict with ``total_results``, ``new_results``,
            ``sources_searched`` and the ISO-formatted ``search_date`` taken
            from the assembled ``SearchResults`` object.
        """
        # Source key -> scraper class, in the order the sources are scraped.
        scraper_classes = (
            ("zillow", ZillowScraper),
            ("realtor", RealtorScraper),
            ("apartments", ApartmentsScraper),
        )
        scrapers = [
            scraper_cls(self.search_criteria)
            for source_key, scraper_cls in scraper_classes
            if source_key in self.search_criteria.sources
        ]

        all_listings = []
        sources_searched = []

        for scraper in scrapers:
            # A failing source is logged and skipped so the remaining sources
            # still contribute; only successful sources are reported back.
            try:
                listings = scraper.scrape()
                all_listings.extend(listings)
                sources_searched.append(scraper.source_name)
            except Exception as e:
                Actor.log.exception(f"Error scraping {scraper.source_name}: {e}")

        previous_results = load_previous_results(self.search_criteria)
        marked_listings = mark_new_listings(all_listings, previous_results)

        results = SearchResults(
            search_criteria=self.search_criteria,
            results=marked_listings,
            total_results=len(marked_listings),
            new_results=sum(1 for listing in marked_listings if listing.is_new),
            sources_searched=sources_searched,
        )

        save_search_results(results)
        push_results_to_dataset(results)

        # search_date is not set above; presumably SearchResults supplies a
        # default for it -- verify against the model definition.
        return {
            "total_results": results.total_results,
            "new_results": results.new_results,
            "sources_searched": results.sources_searched,
            "search_date": results.search_date.isoformat(),
        }

    async def _arun(self, query: str) -> Dict[str, Any]:
        """Async version of _run.

        Delegates directly to the synchronous ``_run``; no work is offloaded.

        Args:
            query: Search query

        Returns:
            Search results
        """
        return self._run(query)
108
109
class FilterTool(BaseTool):
    """Tool that narrows the stored search results with an LLM.

    It reads the ``search_results`` payload written by an earlier search,
    hands the listings to ``filter_properties_with_llm`` and saves the reduced
    result set back through ``save_search_results``.
    """

    # Field overrides are annotated explicitly: pydantic-v2-based versions of
    # BaseTool refuse to build a subclass that overrides an inherited field
    # with a bare (un-annotated) assignment.
    name: str = "filter_properties"
    description: str = "Filter property listings based on search criteria using LLM"
    search_criteria: Optional[SearchCriteria] = None

    def __init__(self, search_criteria: SearchCriteria):
        """Initialize the filter tool.

        Args:
            search_criteria: Search criteria
        """
        super().__init__()
        self.search_criteria = search_criteria

    def _load_saved_results(self) -> Any:
        """Fetch the raw ``search_results`` payload from storage.

        Returns:
            Whatever the storage holds under ``search_results`` (expected to
            be a mapping usable as ``SearchResults(**payload)``), or None when
            neither storage location is available.
        """
        local_path = "storage/key_value_stores/search_results.json"

        # NOTE(review): the local file is only consulted when Actor exposes no
        # ``main_kv_store`` attribute; an empty store does not fall through.
        # Also confirm that ``get_value`` is synchronous on this object.
        if hasattr(Actor, 'main_kv_store'):
            return Actor.main_kv_store.get_value("search_results")

        if os.path.exists(local_path):
            with open(local_path, "r") as f:
                return json.load(f)

        return None

    def _run(self, query: str) -> Dict[str, Any]:
        """Run the filter tool.

        Args:
            query: Filter query (not used, but required by BaseTool)

        Returns:
            On success a dict with ``total_results_after_filtering`` and
            ``filter_date``; otherwise a dict with a single ``error`` key
            (nothing stored, nothing to filter, no LLM token, or the text of
            any exception raised along the way).
        """
        try:
            results_dict = self._load_saved_results()
            if not results_dict:
                return {"error": "No search results found"}

            search_results = SearchResults(**results_dict)
            if not search_results.results:
                return {"error": "No results to filter"}

            api_token = self.search_criteria.llm_api_token
            if not api_token:
                return {"error": "No LLM API token provided for filtering"}

            filtered_listings = filter_properties_with_llm(
                search_results.results,
                self.search_criteria,
                api_token,
            )

            search_results.results = filtered_listings
            search_results.total_results = len(filtered_listings)
            # Keep the "new" counter in step with the filtered list; leaving
            # it at the pre-filter value would make the saved object
            # inconsistent with its own results.
            search_results.new_results = sum(
                1 for listing in filtered_listings if listing.is_new
            )

            save_search_results(search_results)

            return {
                "total_results_after_filtering": len(filtered_listings),
                "filter_date": datetime.now().isoformat(),
            }
        except Exception as e:
            # Tool failures are reported to the calling agent as data instead
            # of propagating.
            Actor.log.exception(f"Error filtering properties: {e}")
            return {"error": str(e)}

    async def _arun(self, query: str) -> Dict[str, Any]:
        """Async version of _run.

        Delegates directly to the synchronous ``_run``; no work is offloaded.

        Args:
            query: Filter query

        Returns:
            Filtered search results
        """
        return self._run(query)
192
193
class SummarizeTool(BaseTool):
    """Tool that produces one short summary per stored listing.

    With an LLM token in the search criteria each listing is summarised by
    ``summarize_property``; without one, a fixed-format one-line description
    is built from the listing's own fields.
    """

    # Field overrides are annotated explicitly: pydantic-v2-based versions of
    # BaseTool refuse to build a subclass that overrides an inherited field
    # with a bare (un-annotated) assignment.
    name: str = "summarize_properties"
    description: str = "Generate summaries of property listings"
    search_criteria: Optional[SearchCriteria] = None

    def __init__(self, search_criteria: SearchCriteria):
        """Initialize the summarize tool.

        Args:
            search_criteria: Search criteria
        """
        super().__init__()
        self.search_criteria = search_criteria

    def _load_saved_results(self) -> Any:
        """Fetch the raw ``search_results`` payload from storage.

        Returns:
            Whatever the storage holds under ``search_results`` (expected to
            be a mapping usable as ``SearchResults(**payload)``), or None when
            neither storage location is available.
        """
        local_path = "storage/key_value_stores/search_results.json"

        # NOTE(review): the local file is only consulted when Actor exposes no
        # ``main_kv_store`` attribute; an empty store does not fall through.
        # Also confirm that ``get_value`` is synchronous on this object.
        if hasattr(Actor, 'main_kv_store'):
            return Actor.main_kv_store.get_value("search_results")

        if os.path.exists(local_path):
            with open(local_path, "r") as f:
                return json.load(f)

        return None

    def _basic_summary(self, listing: Any) -> str:
        """Build the non-LLM one-line description of a listing.

        Args:
            listing: A listing exposing title, bedrooms, bathrooms,
                property_type, price and address.city / address.state.

        Returns:
            The formatted description; a falsy ``bathrooms`` value is shown
            as ``unknown``.
        """
        return (
            f"{listing.title}: {listing.bedrooms} bed, "
            f"{listing.bathrooms or 'unknown'} bath {listing.property_type} "
            f"for ${listing.price:,.2f} in {listing.address.city}, "
            f"{listing.address.state}."
        )

    def _run(self, query: str) -> Dict[str, Any]:
        """Run the summarize tool.

        Args:
            query: Summarize query (not used, but required by BaseTool)

        Returns:
            On success a dict with ``summaries`` (one ``id`` / ``summary`` /
            ``is_new`` entry per listing), ``total_summaries`` and
            ``summarize_date``; otherwise a dict with a single ``error`` key.
        """
        try:
            results_dict = self._load_saved_results()
            if not results_dict:
                return {"error": "No search results found"}

            search_results = SearchResults(**results_dict)
            if not search_results.results:
                return {"error": "No results to summarize"}

            api_token = self.search_criteria.llm_api_token

            summaries = []
            for listing in search_results.results:
                # The LLM is used only when a token is configured; otherwise
                # fall back to the template-based description.
                if api_token:
                    summary = summarize_property(listing, api_token)
                else:
                    summary = self._basic_summary(listing)

                summaries.append({
                    "id": listing.id,
                    "summary": summary,
                    "is_new": listing.is_new,
                })

            return {
                "summaries": summaries,
                "total_summaries": len(summaries),
                "summarize_date": datetime.now().isoformat(),
            }
        except Exception as e:
            # Tool failures are reported to the calling agent as data instead
            # of propagating.
            Actor.log.exception(f"Error summarizing properties: {e}")
            return {"error": str(e)}

    async def _arun(self, query: str) -> Dict[str, Any]:
        """Async version of _run.

        Delegates directly to the synchronous ``_run``; no work is offloaded.

        Args:
            query: Summarize query

        Returns:
            Summarized search results
        """
        return self._run(query)
295
296
class SearchAgentCrew:
    """Crew of agents for property search.

    When the search criteria carry an LLM API token, a three-agent crew
    (search, filter, summarize) is run; without a token only the plain search
    tool is executed. In both cases the stored ``search_results`` payload is
    read back and returned.
    """

    def __init__(self, search_criteria: SearchCriteria):
        """Initialize the search agent crew.

        Args:
            search_criteria: Search criteria
        """
        self.search_criteria = search_criteria
        self.llm = None

        # The chat model is only created when a token is available; ``run``
        # uses its absence to select the agent-free path.
        if search_criteria.llm_api_token:
            self.llm = ChatOpenAI(
                api_key=search_criteria.llm_api_token,
                temperature=0,
                model="gpt-3.5-turbo"
            )

    def _empty_results(self) -> SearchResults:
        """Return a ``SearchResults`` with no listings for the current criteria."""
        return SearchResults(
            search_criteria=self.search_criteria,
            results=[],
            total_results=0,
            new_results=0,
            sources_searched=[]
        )

    def _load_saved_results(self) -> Optional[SearchResults]:
        """Read the stored ``search_results`` payload back into a model.

        Returns:
            The reconstructed ``SearchResults``, or None when nothing is
            stored or loading/parsing fails (the failure is logged).
        """
        local_path = "storage/key_value_stores/search_results.json"

        try:
            # NOTE(review): the local file is only consulted when Actor
            # exposes no ``main_kv_store`` attribute. Also confirm that
            # ``get_value`` is synchronous on this object.
            if hasattr(Actor, 'main_kv_store'):
                results_dict = Actor.main_kv_store.get_value("search_results")
            elif os.path.exists(local_path):
                with open(local_path, "r") as f:
                    results_dict = json.load(f)
            else:
                results_dict = None

            if results_dict:
                return SearchResults(**results_dict)
        except Exception as e:
            Actor.log.exception(f"Error loading search results: {e}")

        return None

    def _build_crew(self) -> Crew:
        """Assemble the search, filter and summarize agents with their tasks."""
        criteria = self.search_criteria

        def make_agent(role: str, goal: str, backstory: str, tool: BaseTool) -> Agent:
            # All three agents share the same LLM and runtime flags.
            return Agent(
                role=role,
                goal=goal,
                backstory=backstory,
                verbose=True,
                allow_delegation=True,
                tools=[tool],
                llm=self.llm
            )

        search_agent = make_agent(
            "Real Estate Search Specialist",
            "Find properties that match the search criteria",
            "You are an expert in finding real estate listings across multiple platforms.",
            SearchTool(criteria),
        )
        filter_agent = make_agent(
            "Property Filter Specialist",
            "Filter properties to find the best matches for the user",
            "You are an expert in analyzing property details and matching them with user preferences.",
            FilterTool(criteria),
        )
        summarize_agent = make_agent(
            "Property Summarizer",
            "Create concise, informative summaries of properties",
            "You are skilled at creating appealing property descriptions that highlight key features.",
            SummarizeTool(criteria),
        )

        search_task = Task(
            description=(
                f"Search for properties in {criteria.location} "
                f"with {criteria.min_bedrooms}+ bedrooms, "
                f"maximum price of ${criteria.max_price or 'any'}, "
                f"property type: {criteria.property_type}. "
                f"Search sources: {', '.join(criteria.sources)}."
            ),
            agent=search_agent,
            expected_output="A report of the total number of properties found"
        )

        # Tolerate a missing amenities list: this join runs outside any
        # try/except, so a None value would otherwise abort the whole run.
        amenities_text = ', '.join(criteria.amenities or [])
        filter_task = Task(
            description=(
                "Filter the search results to find properties that best match "
                f"the user's criteria, especially regarding amenities: {amenities_text}"
            ),
            agent=filter_agent,
            expected_output="A report of how many properties passed the filtering"
        )

        summarize_task = Task(
            description=(
                "Create summaries for each property highlighting key features. "
                "Mark new listings that weren't found in previous searches."
            ),
            agent=summarize_agent,
            expected_output="Summaries of each property"
        )

        return Crew(
            agents=[search_agent, filter_agent, summarize_agent],
            tasks=[search_task, filter_task, summarize_task],
            verbose=True
        )

    def run(self) -> SearchResults:
        """Run the search agent crew.

        Returns:
            The stored search results after the run, or an empty
            ``SearchResults`` when the crew fails or nothing could be loaded.
        """
        if not self.llm:
            Actor.log.info("No LLM API token provided, running basic search without agents")
            # Errors from the plain search are not caught here and propagate
            # to the caller.
            SearchTool(self.search_criteria)._run("")
        else:
            crew = self._build_crew()
            try:
                # The crew's textual output is not used; the tools persist
                # their results to storage, which is read back below.
                crew.kickoff()
            except Exception as e:
                Actor.log.exception(f"Error running crew: {e}")
                return self._empty_results()

        loaded = self._load_saved_results()
        if loaded is not None:
            return loaded
        return self._empty_results()