Job Search Agent Langraph (Deprecated)

An autonomous AI agent that helps you find the perfect job match by analyzing your resume and searching across LinkedIn and Indeed. Built with Apify and powered by advanced AI capabilities.

Rating: 0.0 (0)
Pricing: Pay per event
Total users: 1
Monthly users: 1
Runs succeeded: 33%
Last modified: 2 months ago
.dockerignore
.git
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.gitignore
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

# Added by Apify CLI
node_modules

# Virtual Environment
venv/
ENV/
env/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Apify specific
apify_storage/
storage/
.env

# Logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# OS specific
.DS_Store
Thumbs.db
Dockerfile
FROM apify/actor-python:3.11
# Copy package files
COPY requirements.txt ./

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy source code
COPY . ./

# Run the actor as a package (src/__main__.py) so the absolute `src.*` imports resolve
CMD ["python", "-m", "src"]
INPUT_SCHEMA.json
{ "title": "AI Job Search Agent Input", "type": "object", "schemaVersion": 1, "properties": { "openAiApiKey": { "title": "OpenAI API Key", "type": "string", "description": "Your OpenAI API key for AI services", "editor": "textfield", "isSecret": true }, "resumeText": { "title": "Resume Text", "type": "string", "description": "The full text of your resume", "editor": "textarea" }, "locationPreference": { "title": "Location Preference", "type": "string", "description": "Your preferred job location (e.g., 'Seattle, WA')", "editor": "textfield" }, "workModePreference": { "title": "Work Mode Preference", "type": "string", "description": "Preferred work mode", "default": "Any", "enum": ["Any", "Remote", "Hybrid", "On-site"] }, "searchRadius": { "title": "Search Radius", "type": "integer", "description": "Search radius in miles", "default": 25 }, "minSalary": { "title": "Minimum Salary", "type": "integer", "description": "Minimum annual salary", "default": 0 } }, "required": ["openAiApiKey", "resumeText"]}
project_outline.md
# Job Search Agent - Apify Actor Project Outline

Based on the existing codebase and the Apify templates, I'll outline a project structure for converting the current job search functionality into an Apify actor using LangGraph. This approach maintains the core functionality while leveraging Apify's infrastructure.

## Project Structure

```
job-search-agent/
├── .actor/                   # Apify actor configuration
│   ├── actor.json            # Actor metadata
│   └── input_schema.json     # Input schema definition
├── src/
│   ├── agents/               # Agent definitions
│   │   ├── __init__.py
│   │   ├── manager.py        # Orchestration agent
│   │   ├── resume_parser.py  # Resume parsing agent
│   │   ├── job_scraper.py    # Job search agent
│   │   ├── relevance_scorer.py  # Scoring and matching agent
│   │   └── output_formatter.py  # Results formatting agent
│   ├── tools/                # Tool definitions
│   │   ├── __init__.py
│   │   ├── apify_tools.py    # LinkedIn/Indeed search tools
│   │   └── utility_tools.py  # Helper tools
│   ├── models/               # Data models
│   │   ├── __init__.py
│   │   ├── resume.py         # Resume data structure
│   │   └── job.py            # Job listing data structure
│   ├── utils/                # Utility functions
│   │   ├── __init__.py
│   │   ├── text_processing.py  # Text cleaning utilities
│   │   └── scoring.py        # Scoring algorithms
│   ├── config.py             # Configuration settings
│   └── main.py               # Entry point
├── .gitignore
├── README.md
└── requirements.txt          # Dependencies
```
## Workflow Logic

The job search workflow will follow these steps:

1. **Input Processing**
   - Parse user input (resume text, location preferences, work mode)
   - Validate and prepare for processing

2. **Resume Parsing**
   - Extract essential information from resume
   - Categorize skills, experience, education
   - Identify key search terms

3. **Job Search**
   - Use Apify actors for LinkedIn and Indeed searches
   - Apply search parameters from resume
   - Collect raw job listings

4. **Relevance Scoring**
   - Score each job based on match criteria
   - Calculate component scores (skills, experience, location)
   - Generate overall match percentage

5. **Results Formatting**
   - Structure results with match details
   - Sort by relevance score
   - Add match explanations

6. **Output Generation**
   - Save to Apify dataset
   - Generate summary statistics
   - Create downloadable results
## LangGraph Implementation

Using the [python-langgraph template](https://github.com/apify/actor-templates/tree/master/templates/python-langgraph), we'll implement a graph-based workflow:

```python
# Simplified graph structure (StateGraph takes a state schema;
# nodes are added individually rather than passed to the constructor)
graph = StateGraph(JobSearchState)
graph.add_node("resume_parser", resume_parser_agent)
graph.add_node("job_scraper", job_scraper_agent)
graph.add_node("relevance_scorer", relevance_scorer_agent)
graph.add_node("output_formatter", output_formatter_agent)

# Define transitions
graph.add_edge("resume_parser", "job_scraper")
graph.add_edge("job_scraper", "relevance_scorer")
graph.add_edge("relevance_scorer", "output_formatter")
```
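To make the sketch executable, the graph also needs an entry point, a terminal edge, and a compile step. Below is a self-contained toy version; the stub functions stand in for the real agent classes in `src/agents/`, and each node returns only the state keys it updates:

```python
from typing import Any, Dict, List, TypedDict

from langgraph.graph import END, StateGraph


class State(TypedDict, total=False):
    resume_text: str
    resume_data: Dict[str, Any]
    job_listings: List[Dict[str, Any]]
    scored_listings: List[Dict[str, Any]]
    summary: str


# Stub agents for illustration only.
def resume_parser_agent(state: State) -> dict:
    return {"resume_data": {"desired_role": "Software Engineer"}}


def job_scraper_agent(state: State) -> dict:
    return {"job_listings": [{"title": "Software Engineer", "company": "Example Corp"}]}


def relevance_scorer_agent(state: State) -> dict:
    return {"scored_listings": [{**job, "score": 87} for job in state["job_listings"]]}


def output_formatter_agent(state: State) -> dict:
    return {"summary": f"{len(state['scored_listings'])} match(es) found"}


graph = StateGraph(State)
graph.add_node("resume_parser", resume_parser_agent)
graph.add_node("job_scraper", job_scraper_agent)
graph.add_node("relevance_scorer", relevance_scorer_agent)
graph.add_node("output_formatter", output_formatter_agent)

graph.set_entry_point("resume_parser")
graph.add_edge("resume_parser", "job_scraper")
graph.add_edge("job_scraper", "relevance_scorer")
graph.add_edge("relevance_scorer", "output_formatter")
graph.add_edge("output_formatter", END)

app = graph.compile()
print(app.invoke({"resume_text": "Jane Doe, Software Engineer..."})["summary"])
```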
## Scoring Logic

The relevance scoring will use a weighted approach:

```
Overall Score = (Skills Match × 0.40) +
                (Experience Match × 0.25) +
                (Location Match × 0.20) +
                (Company/Role Fit × 0.15)
```

Each component will be scored from 0-100:

1. **Skills Match (40%)**
   - Required skills present in resume
   - Skill level/proficiency match
   - Technology stack alignment

2. **Experience Match (25%)**
   - Years of experience match
   - Role similarity
   - Industry relevance

3. **Location Match (20%)**
   - Exact location match
   - Remote/hybrid preference match
   - Commute distance (if applicable)

4. **Company/Role Fit (15%)**
   - Company size preference
   - Industry alignment
   - Career growth potential
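A minimal sketch of the weighted calculation (the component scores are assumed to be computed elsewhere on a 0-100 scale):

```python
WEIGHTS = {
    "skills": 0.40,
    "experience": 0.25,
    "location": 0.20,
    "company_role_fit": 0.15,
}


def overall_score(components: dict[str, float]) -> float:
    """Weighted sum of 0-100 component scores; the result is also 0-100."""
    return sum(components[name] * weight for name, weight in WEIGHTS.items())


# Example: strong skills/experience, weaker location fit.
print(overall_score({"skills": 90, "experience": 85, "location": 60, "company_role_fit": 75}))
# -> 80.5
```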
## Input Schema

```json
{
  "title": "Job Search Input",
  "type": "object",
  "schemaVersion": 1,
  "properties": {
    "resumeText": {
      "title": "Resume Text",
      "type": "string",
      "description": "Full text of the resume",
      "editor": "textarea"
    },
    "locationPreference": {
      "title": "Location Preference",
      "type": "string",
      "description": "Preferred job location",
      "editor": "textfield"
    },
    "workModePreference": {
      "title": "Work Mode",
      "type": "string",
      "description": "Preferred work mode (Remote, Hybrid, On-site)",
      "editor": "select",
      "enum": ["Remote", "Hybrid", "On-site", "Any"]
    },
    "searchRadius": {
      "title": "Search Radius (miles)",
      "type": "integer",
      "description": "Maximum distance from preferred location",
      "default": 25
    },
    "minSalary": {
      "title": "Minimum Salary",
      "type": "integer",
      "description": "Minimum acceptable salary",
      "default": 0
    }
  },
  "required": ["resumeText"]
}
```
## Output Format

```json
{
  "query": {
    "resumeSummary": "...",
    "searchParameters": {
      "location": "...",
      "workMode": "...",
      "searchRadius": 25
    }
  },
  "results": [
    {
      "position": "Software Engineer",
      "company": "Example Corp",
      "location": "New York, NY",
      "workMode": "Hybrid",
      "salary": "$120,000 - $150,000",
      "matchScore": 87,
      "matchDetails": {
        "skillsMatch": 90,
        "experienceMatch": 85,
        "locationMatch": 75,
        "companyFitMatch": 95
      },
      "matchExplanation": "Strong match on required skills (Python, React). Experience level aligns well with 5+ years requirement. Location is within commuting distance. Company culture emphasizes work-life balance.",
      "keyRequirements": ["Python", "React", "5+ years experience"],
      "applicationUrl": "https://example.com/jobs/123"
    }
  ],
  "statistics": {
    "totalJobsFound": 120,
    "averageMatchScore": 72,
    "topSkillsRequested": ["Python", "JavaScript", "AWS"],
    "salaryRange": {
      "min": 80000,
      "max": 180000,
      "average": 125000
    }
  }
}
```
## Implementation Approach

1. **Start with the python-langgraph template**
   - This provides the LangGraph structure needed for agent workflow
   - Already includes Apify SDK integration

2. **Implement agents as LangGraph nodes**
   - Each agent will be a node in the graph
   - Define clear input/output contracts

3. **Use Apify actors as tools**
   - LinkedIn and Indeed scrapers as external tools
   - Integrate with LangChain tool calling format (see the sketch after this list)

4. **Implement scoring logic in Python**
   - Create utility functions for match calculations
   - Use weighted scoring approach

5. **Store results in Apify dataset**
   - Structured job matches with scores
   - Include match explanations and statistics
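For point 3, wrapping an Apify scraper as a LangChain tool could look roughly like this. A sketch, not a definitive implementation: the `run_input` fields depend on the target actor's input schema, and the actor ID is the LinkedIn scraper referenced in `src/config/settings.py`:

```python
import os

from apify_client import ApifyClient
from langchain_core.tools import tool

apify_client = ApifyClient(token=os.environ["APIFY_API_TOKEN"])


@tool
def search_linkedin_jobs(keywords: str, location: str) -> list:
    """Search LinkedIn job postings via an Apify scraper actor."""
    run = apify_client.actor("krandiash/linkedin-jobs-scraper").call(
        run_input={"keywords": keywords, "location": location, "limit": 25}
    )
    return apify_client.dataset(run["defaultDatasetId"]).list_items().items
```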
## Deployment and Execution

1. **Local Development**
   - Use Apify CLI for local testing
   - Run with `apify run` command

2. **Apify Platform Deployment**
   - Push to Apify with `apify push`
   - Configure environment variables

3. **Execution**
   - Run via Apify UI or API (see the example below)
   - Monitor execution in Apify console
   - Download results as JSON
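For step 3, a deployed run can also be triggered programmatically with the Apify Python client. A sketch: `YOUR_USERNAME` is a placeholder, and the two secret inputs mirror the required fields in `.actor/input_schema.json`:

```python
import os

from apify_client import ApifyClient

client = ApifyClient(token=os.environ["APIFY_API_TOKEN"])

# Start the actor and block until the run finishes.
run = client.actor("YOUR_USERNAME/job-search-agent-langraph").call(
    run_input={
        "apifyApiToken": os.environ["APIFY_API_TOKEN"],
        "openAiApiKey": os.environ["OPENAI_API_KEY"],
        "resumeText": open("resume.txt").read(),
        "locationPreference": "Seattle, WA",
        "workModePreference": "Remote",
    }
)

# Stream the scored matches from the run's default dataset.
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    print(item)
```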
This project structure maintains the core functionality of the existing job search implementation while leveraging Apify's infrastructure and the LangGraph framework for agent orchestration.

[Source: Python LangGraph template](https://github.com/apify/actor-templates/tree/master/templates/python-langgraph)
requirements.txt
# Feel free to add your Python dependencies below. For formatting guidelines, see:
# https://pip.pypa.io/en/latest/reference/requirements-file-format/

# Apify SDK
apify==1.1.3
apify-client==1.4.0

# LangGraph and related packages
langgraph==0.0.20
# The agents import ChatOpenAI and message types from these packages.
langchain-openai
langchain-core

# OpenAI
openai>=1.10.0

# Utilities
python-dotenv==1.0.0
pydantic==1.10.8
tenacity==8.2.3

# Web scraping
beautifulsoup4==4.12.2
httpx==0.24.1
.actor/actor.json
{ "actorSpecification": 1, "name": "job-search-agent-langraph", "title": "AI Job Search Agent", "description": "An AI-powered job search agent that analyzes resumes and finds matching job opportunities from LinkedIn and Indeed.", "version": "0.1", "buildTag": "latest", "meta": { "templateId": "python-langgraph" }, "input": "input_schema.json", "dockerfile": "./Dockerfile", "storages": { "dataset": { "actorSpecification": 1, "title": "Job Search Results", "views": { "overview": { "title": "Overview", "transformation": { "fields": [ "position", "company", "location", "matchScore", "applicationUrl" ] }, "display": { "component": "table", "properties": { "matchScore": { "template": "{{value}}%", "format": "number" } } } } } } }}
.actor/Dockerfile
# First, specify the base Docker image.
# You can see the Docker images from Apify at https://hub.docker.com/r/apify/.
# You can also use any other image from Docker Hub.
FROM apify/actor-python:3.11

# Copy package files
COPY requirements.txt ./

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy source code
COPY . ./

# Run the actor as a package (src/__main__.py) so the absolute `src.*` imports resolve
CMD ["python", "-m", "src"]
.actor/input_schema.json
{ "title": "Job Search Input", "type": "object", "schemaVersion": 1, "properties": { "apifyApiToken": { "title": "Apify API Token", "type": "string", "description": "Your Apify API token for running job search actors", "editor": "textfield", "isSecret": true }, "openAiApiKey": { "title": "OpenAI API Key", "type": "string", "description": "Your OpenAI API key for AI services", "editor": "textfield", "isSecret": true }, "resumeText": { "title": "Resume Text", "type": "string", "description": "Full text of the resume", "editor": "textarea" }, "locationPreference": { "title": "Location Preference", "type": "string", "description": "Preferred job location", "editor": "textfield" }, "workModePreference": { "title": "Work Mode", "type": "string", "description": "Preferred work mode (Remote, Hybrid, On-site)", "editor": "select", "enum": ["Remote", "Hybrid", "On-site", "Any"] }, "searchRadius": { "title": "Search Radius (miles)", "type": "integer", "description": "Maximum distance from preferred location", "default": 25 }, "minSalary": { "title": "Minimum Salary", "type": "integer", "description": "Minimum acceptable salary", "default": 0 } }, "required": ["apifyApiToken", "openAiApiKey", "resumeText"]}
src/main.py
1"""2AI Job Search Agent - Apify Actor3
4This actor analyzes a resume and finds matching job opportunities from LinkedIn and Indeed.5"""6
7import os8import json9import logging10from typing import Dict, Any11from datetime import datetime12import asyncio13import sys14
15# Add the src directory to the Python path16sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))17
18from apify import Actor19from openai import OpenAI20from dotenv import load_dotenv21from apify_client import ApifyClient22
23# Import settings24from src.config.settings import (25 OPENAI_MODEL,26 ACTOR_IDS,27 FINAL_SCORING_WEIGHTS28)29
30# Import agents31from src.agents.manager import ManagerAgent32from src.agents.resume_parser import ResumeParserAgent33from src.agents.relevance_scorer import RelevanceScorerAgent34from src.agents.notification import NotificationAgent35from src.models.schema import JobSearchState36
37# Configure logging38logging.basicConfig(39 level=logging.INFO,40 format='%(asctime)s [%(levelname)s] [%(name)s] %(message)s',41 handlers=[logging.StreamHandler()]42)43logger = logging.getLogger(__name__)44
45# Load environment variables46load_dotenv()47
48# Event names and prices49EVENTS = {50 'resume_parse': 0.10, # Resume parsing51 'job_score': 0.02, # Per job scoring52 'results_summary': 0.10 # Final summary53}54
55class JobSearchActor:56 def __init__(self, apify_client: ApifyClient):57 self.apify_client = apify_client58 self.manager = ManagerAgent(apify_client)59 self.resume_parser = ResumeParserAgent(apify_client)60 self.relevance_scorer = RelevanceScorerAgent(apify_client)61 self.notification = NotificationAgent(apify_client)62 63 async def process_step(self, state: Dict[str, Any]) -> Dict[str, Any]:64 """Process a single step in the workflow based on current phase."""65 current_phase = state.get("current_phase", "initialize")66 67 try:68 if current_phase == "parse_resume":69 logger.info("Parsing resume...")70 state = await self.resume_parser.process(state)71 # Log charge for resume parsing72 await self._log_charge('resume_parse')73 74 elif current_phase == "search_jobs":75 logger.info("Searching for jobs...")76 # Use Apify's LinkedIn Jobs Scraper77 run_input = {78 "keywords": state["resume_data"]["desired_role"],79 "location": state["resume_data"]["location_preference"],80 "maxItems": 50,81 "timeoutSecs": 120,82 "memoryMbytes": 102483 }84 85 run = await self.apify_client.actor(ACTOR_IDS["linkedin_scraper"]).call(run_input=run_input)86 dataset_items = await self.apify_client.dataset(run["defaultDatasetId"]).list_items().items()87 88 if not dataset_items:89 logger.warning("No jobs found in search results")90 state["error_log"] = state.get("error_log", []) + ["No jobs found during search"]91 else:92 logger.info(f"Found {len(dataset_items)} jobs")93 state["job_listings"] = dataset_items94 95 state["scraping_complete"] = True96 97 elif current_phase == "score_jobs":98 logger.info("Scoring jobs...")99 state = await self.relevance_scorer.process(state)100 # Log charge for job scoring101 job_count = len(state.get("scored_listings", []))102 for _ in range(job_count):103 await self._log_charge('job_score')104 105 elif current_phase == "send_notifications":106 logger.info("Generating results summary...")107 state = await self.notification.process(state)108 # Log charge for results summary109 await self._log_charge('results_summary')110 111 except Exception as e:112 logger.error(f"Error in {current_phase}: {str(e)}")113 state["error_log"] = state.get("error_log", []) + [f"Error in {current_phase}: {str(e)}"]114 115 return state116
117 async def _log_charge(self, event_type: str):118 """Log a charge event."""119 if event_type in EVENTS:120 await self.apify_client.key_value_store().set_record(121 f"CHARGE_{datetime.now().isoformat()}",122 {123 "event": event_type,124 "amount": EVENTS[event_type],125 "timestamp": datetime.now().isoformat()126 }127 )128
129 async def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:130 """Run the job search workflow."""131 # Initialize state132 state = JobSearchState(133 resume_text=input_data.get("resumeText", ""),134 resume_parsed=False,135 scraping_complete=False,136 current_phase="initialize",137 next_step="parse_resume"138 ).dict()139 140 # Process workflow until completion141 while state.get("status") not in ["completed", "failed"]:142 # Let manager determine next step143 state = await self.manager.process(state)144 145 if state.get("status") in ["completed", "failed"]:146 break147 148 # Process the current step149 state = await self.process_step(state)150 151 # Add checkpoint to dataset152 await self.apify_client.key_value_store().set_record(153 "STATE_CHECKPOINT",154 state155 )156 157 return state158
159async def main():160 """Main entry point for the actor."""161 async with Actor:162 # Initialize Apify client163 apify_client = ApifyClient()164 165 try:166 # Get input167 actor_input = await Actor.get_input() or {}168 169 # Initialize and run job search170 job_search = JobSearchActor(apify_client)171 result = await job_search.run(actor_input)172 173 # Save output174 await Actor.push_data(result)175 176 except Exception as e:177 logger.error(f"Actor failed: {str(e)}")178 raise179
180if __name__ == "__main__":181 asyncio.run(main())
src/py.typed
src/__init__.py
src/__main__.py
import asyncio

from .main import main

# Execute the Actor entry point.
asyncio.run(main())
src/config/settings.py
1"""Configuration settings for the job search actor."""2
3import os4from typing import Dict5from dotenv import load_dotenv6
7# Load environment variables8load_dotenv()9
10# API Keys - these should be set as environment variables11APIFY_API_TOKEN = os.getenv("APIFY_API_TOKEN")12OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")13
14# OpenAI Model Configuration15OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")16
17# Apify Actor IDs18ACTOR_IDS: Dict[str, str] = {19 "linkedin_scraper": "krandiash/linkedin-jobs-scraper",20 "indeed_scraper": "epctex/indeed-scraper"21}22
23# Job Search Settings24JOB_SEARCH_CONFIG = {25 "initial_results_per_source": 5, # Reduced for testing26 "detail_fetch_threshold": 0.6, # Minimum initial score to fetch details27 "max_details_to_fetch": 5, # Reduced for testing28 "max_days_old": 30 # Maximum age of job postings29}30
31# Scoring weights for initial filtering32INITIAL_SCORING_WEIGHTS = {33 "title_match": 0.4,34 "location_match": 0.3,35 "company_relevance": 0.2,36 "posting_date": 0.137}38
39# Detailed scoring weights40FINAL_SCORING_WEIGHTS = {41 "position_match": 0.25,42 "skills_experience": 0.40,43 "location": 0.20,44 "company": 0.15,45 "salary": 0.0,46 "benefits": 0.047}48
49# System settings50RETRY_ATTEMPTS = 351RETRY_DELAY = 5 # seconds
src/config/__init__.py
1"""Configuration module for job search actor."""2
3# Config package initialization4from .settings import (5 APIFY_API_TOKEN,6 OPENAI_API_KEY,7 OPENAI_MODEL,8 ACTOR_IDS,9 JOB_SEARCH_CONFIG,10 INITIAL_SCORING_WEIGHTS,11 FINAL_SCORING_WEIGHTS,12 RETRY_ATTEMPTS,13 RETRY_DELAY14)15
16__all__ = [17 'APIFY_API_TOKEN',18 'OPENAI_API_KEY',19 'OPENAI_MODEL',20 'ACTOR_IDS',21 'JOB_SEARCH_CONFIG',22 'INITIAL_SCORING_WEIGHTS',23 'FINAL_SCORING_WEIGHTS',24 'RETRY_ATTEMPTS',25 'RETRY_DELAY'26]
src/agents/base.py
from abc import ABC, abstractmethod
import logging
from typing import Any, Dict, List, Optional

from apify_client import ApifyClient

# Import the retry utility using absolute import
from src.utils.retry import with_retry

logger = logging.getLogger(__name__)


class BaseAgent(ABC):
    def __init__(self, apify_client: ApifyClient, actor_id: Optional[str] = None):
        self.apify_client = apify_client
        self.actor_id = actor_id
        self.system_prompt = self._get_system_prompt()

    @abstractmethod
    def _get_system_prompt(self) -> str:
        """Return the system prompt for this agent."""
        raise NotImplementedError("Subclasses must implement _get_system_prompt")

    @abstractmethod
    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process the current state and return updated state."""
        raise NotImplementedError("Subclasses must implement process")

    @with_retry()
    def run_actor(self, actor_id: Optional[str] = None, input_data: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Run the associated Apify actor if one exists and return its dataset items."""
        actor_id = actor_id or self.actor_id
        if not actor_id:
            raise ValueError("No actor_id specified for this agent")

        try:
            run = self.apify_client.actor(actor_id).call(run_input=input_data)
            dataset_items = self.apify_client.dataset(run["defaultDatasetId"]).list_items().items
            return dataset_items
        except Exception as e:
            # Log the error and re-raise
            error_msg = f"Error running actor {actor_id}: {str(e)}"
            logger.error(error_msg)
            raise

    async def _safe_api_call(self, func, *args, **kwargs):
        """Safely make an API call with error handling."""
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.error(f"API call failed: {str(e)}")
            raise
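For reference, a minimal (hypothetical) concrete agent that satisfies the `BaseAgent` contract, showing the two methods every subclass must implement:

```python
import asyncio
from typing import Any, Dict

from src.agents.base import BaseAgent


class EchoAgent(BaseAgent):
    """Toy agent that records which phase touched the state."""

    def _get_system_prompt(self) -> str:
        return "Echo Agent: returns the state unchanged, tagging it on the way through."

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        state["visited_by"] = state.get("visited_by", []) + ["echo"]
        return state


agent = EchoAgent(apify_client=None)  # no Apify calls needed for this toy agent
print(asyncio.run(agent.process({"resume_text": "..."}))["visited_by"])  # ['echo']
```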
src/agents/feedback_refiner.py
from typing import Dict, Any, List

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

from .base import BaseAgent
from ..config.settings import OPENAI_API_KEY, OPENAI_MODEL


class FeedbackRefinerAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Feedback Refiner Agent responsible for:
        1. Processing user feedback on job matches
        2. Refining search parameters
        3. Adjusting scoring weights based on preferences
        4. Improving match quality over time
        """

    def _analyze_feedback(self, feedback: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze user feedback to determine parameter adjustments."""
        adjustments = {
            "search_params": {},
            "scoring_weights": {},
            "filters": {}
        }

        # Process location preferences
        if feedback.get("location_feedback"):
            location_pref = feedback["location_feedback"]
            adjustments["search_params"]["location"] = location_pref
            if "remote" in location_pref.lower():
                adjustments["filters"]["remote_only"] = True

        # Process role preferences
        if feedback.get("role_feedback"):
            role_pref = feedback["role_feedback"]
            adjustments["search_params"]["keywords"] = role_pref

        # Process experience level preferences
        if feedback.get("experience_feedback"):
            exp_pref = feedback["experience_feedback"]
            adjustments["search_params"]["experience_level"] = exp_pref

        # Process salary preferences
        if feedback.get("salary_feedback"):
            salary_pref = feedback["salary_feedback"]
            adjustments["filters"]["salary_min"] = salary_pref.get("min")
            adjustments["filters"]["salary_max"] = salary_pref.get("max")

        # Process company preferences
        if feedback.get("company_preferences"):
            company_prefs = feedback["company_preferences"]
            adjustments["filters"]["preferred_companies"] = company_prefs

        return adjustments

    def _refine_search_parameters(self, current_params: Dict[str, Any], adjustments: Dict[str, Any]) -> Dict[str, Any]:
        """Refine search parameters based on feedback analysis."""
        refined_params = current_params.copy()

        # Update search parameters
        if "search_params" in adjustments:
            refined_params.update(adjustments["search_params"])

        # Update filters
        if "filters" in adjustments:
            if "filters" not in refined_params:
                refined_params["filters"] = {}
            refined_params["filters"].update(adjustments["filters"])

        # Update scoring weights if provided
        if "scoring_weights" in adjustments:
            if "scoring_weights" not in refined_params:
                refined_params["scoring_weights"] = {}
            refined_params["scoring_weights"].update(adjustments["scoring_weights"])

        return refined_params

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process user feedback and refine search parameters."""
        feedback = state.get("feedback", {})
        current_params = state.get("search_parameters", {})

        if not feedback:
            return state

        # Analyze feedback
        adjustments = self._analyze_feedback(feedback)

        # Refine parameters
        refined_params = self._refine_search_parameters(current_params, adjustments)

        # Update state
        state["search_parameters"] = refined_params
        state["feedback_processed"] = True
        state["parameter_adjustments"] = adjustments

        return state
src/agents/job_scraper.py
from typing import Any, Dict, List
from datetime import datetime, timedelta

from .base import BaseAgent
from ..config.settings import (
    ACTOR_IDS,
    JOB_SEARCH_CONFIG,
    INITIAL_SCORING_WEIGHTS
)
from ..models.schema import ResumeData


class JobScraperAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.linkedin_search_actor = ACTOR_IDS["linkedin_jobs_search"]
        self.linkedin_detail_actor = ACTOR_IDS["linkedin_job_detail"]
        self.indeed_scraper = ACTOR_IDS["indeed_scraper"]

    def _get_system_prompt(self) -> str:
        return """
        Job Scraping Agent responsible for:
        1. Initial job search from multiple sources
        2. Preliminary filtering and scoring
        3. Detailed information gathering for promising matches
        """

    def _search_linkedin_jobs(self, search_params: Dict[str, Any]) -> List[Dict]:
        """Perform initial LinkedIn job search."""
        search_input = {
            "keywords": search_params["keywords"],
            "location": search_params["location"],
            "limit": JOB_SEARCH_CONFIG["initial_results_per_source"]
        }

        return self.run_actor(
            actor_id=self.linkedin_search_actor,
            input_data=search_input
        )

    def _search_indeed_jobs(self, search_params: Dict[str, Any]) -> List[Dict]:
        """Perform initial Indeed job search."""
        search_input = {
            "keyword": search_params["keywords"],
            "location": search_params["location"],
            "maxResults": JOB_SEARCH_CONFIG["initial_results_per_source"]
        }

        return self.run_actor(
            actor_id=self.indeed_scraper,
            input_data=search_input
        )

    def _calculate_initial_score(self, job: Dict, resume_data: ResumeData) -> float:
        """Calculate preliminary score for a job listing."""
        scores = {
            "title_match": self._score_title_match(job["title"], resume_data.desired_role),
            "location_match": self._score_location_match(job["location"], resume_data.location_preference),
            "company_relevance": self._score_company_relevance(job["company"], resume_data.industry_experience),
            "posting_date": self._score_posting_date(job.get("posted_date", datetime.now()))
        }

        return sum(
            scores[key] * INITIAL_SCORING_WEIGHTS[key]
            for key in INITIAL_SCORING_WEIGHTS
        )

    def _fetch_job_details(self, job_urls: List[str]) -> List[Dict]:
        """Fetch detailed information for promising job listings."""
        detailed_jobs = []
        for url in job_urls:
            if "linkedin.com" in url:
                job_details = self.run_actor(
                    actor_id=self.linkedin_detail_actor,
                    input_data={"url": url}
                )
                detailed_jobs.extend(job_details)
            # Add similar handling for Indeed URLs if needed

        return detailed_jobs

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process the job search workflow."""
        # The state carries resume data as a plain dict; re-validate it into the
        # model so the attribute access below works.
        resume_data = ResumeData(**state["resume_data"])
        work_mode_preference = state.get("feedback", {}).get("work_mode_preference", "Any")

        # Prepare search parameters
        search_params = {
            "keywords": resume_data.desired_role,
            "location": resume_data.location_preference
        }

        # Add work mode to keywords if specified
        if work_mode_preference and work_mode_preference != "Any":
            search_params["keywords"] += f" {work_mode_preference}"

        # Initial job search from multiple sources
        linkedin_jobs = self._search_linkedin_jobs(search_params)
        indeed_jobs = self._search_indeed_jobs(search_params)

        # Combine and score initial results
        all_jobs = []
        for job in linkedin_jobs + indeed_jobs:
            # Normalize job data structure
            normalized_job = self._normalize_job_data(job)

            # Calculate initial score
            score = self._calculate_initial_score(normalized_job, resume_data)
            normalized_job["initial_score"] = score
            all_jobs.append(normalized_job)

        # Sort by score and filter
        promising_jobs = sorted(
            [j for j in all_jobs if j["initial_score"] >= JOB_SEARCH_CONFIG["detail_fetch_threshold"]],
            key=lambda x: x["initial_score"],
            reverse=True
        )[:JOB_SEARCH_CONFIG["max_details_to_fetch"]]

        # Fetch detailed information for promising jobs
        job_urls = [job["url"] for job in promising_jobs if "url" in job]
        detailed_jobs = self._fetch_job_details(job_urls)

        # If we couldn't get detailed jobs, use the promising jobs
        if not detailed_jobs:
            detailed_jobs = promising_jobs

        # Update state
        state["jobs_scraped"] = True
        state["job_listings"] = detailed_jobs

        return state

    def _normalize_job_data(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize job data from different sources to a common format."""
        normalized = {
            "job_id": job.get("id", job.get("jobId", str(hash(str(job))))),
            "title": job.get("title", job.get("name", "Unknown Position")),
            "company": job.get("company", job.get("companyName", "Unknown Company")),
            "location": job.get("location", job.get("place", "Unknown Location")),
            "url": job.get("url", job.get("link", job.get("applicationLink", ""))),
            "source": "LinkedIn" if "linkedin" in job.get("url", "") else "Indeed"
        }

        # Handle posted date
        posted_date = job.get("postedDate", job.get("date", job.get("listedAt", None)))
        if isinstance(posted_date, str):
            try:
                normalized["posted_date"] = datetime.fromisoformat(posted_date.replace('Z', '+00:00'))
            except (ValueError, TypeError):
                normalized["posted_date"] = datetime.now() - timedelta(days=7)  # Default to 1 week ago
        else:
            normalized["posted_date"] = datetime.now() - timedelta(days=7)

        return normalized

    def _score_title_match(self, job_title: str, desired_role: str) -> float:
        """Score how well the job title matches the desired role."""
        # Implement fuzzy matching or keyword matching logic
        job_title = job_title.lower()
        desired_role = desired_role.lower()

        if job_title == desired_role:
            return 1.0
        elif desired_role in job_title or job_title in desired_role:
            return 0.8
        # Add more sophisticated matching logic here
        return 0.4

    def _score_location_match(self, job_location: str, preferred_location: str) -> float:
        """Score how well the job location matches preferences."""
        # Implement location matching logic
        if not job_location or not preferred_location:
            return 0.5

        job_location = job_location.lower()
        preferred_location = preferred_location.lower()

        if "remote" in job_location:
            return 1.0
        elif preferred_location in job_location:
            return 1.0
        # Add more sophisticated location matching logic here
        return 0.5

    def _score_company_relevance(self, company: str, industry_experience: List[str]) -> float:
        """Score company relevance based on industry experience."""
        # Implement company/industry matching logic
        # This could be enhanced with company industry data
        return 0.7  # Default score, improve with better matching logic

    def _score_posting_date(self, posted_date: datetime) -> float:
        """Score job based on how recently it was posted."""
        days_old = (datetime.now() - posted_date).days
        if days_old <= 7:
            return 1.0
        elif days_old <= 14:
            return 0.8
        elif days_old <= 21:
            return 0.6
        else:
            return 0.4
src/agents/manager.py
from typing import Dict, Any, List

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL
from src.models.schema import JobSearchState


class ManagerAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Manager Agent responsible for:
        1. Orchestrating the job search workflow
        2. Determining next steps based on current state
        3. Handling errors and retries
        4. Monitoring progress and completion
        """

    def _determine_next_step(self, state: JobSearchState) -> str:
        """Determine the next step in the workflow based on current state."""
        if not state.resume_data and not state.resume_parsed:
            return "parse_resume"

        if not state.job_listings and not state.scraping_complete:
            return "search_jobs"

        if state.job_listings and not state.scored_listings:
            return "score_jobs"

        if state.scored_listings and not state.notification_complete:
            return "send_notifications"

        return "complete"

    def _check_error_conditions(self, state: JobSearchState) -> List[str]:
        """Check for error conditions in the current state."""
        errors = []

        if state.error_log:
            errors.extend(state.error_log)

        if not state.resume_text:
            errors.append("Missing resume text")

        if state.scraping_complete and not state.job_listings:
            errors.append("No jobs found during search")

        return errors

    def _handle_retry_logic(self, state: Dict[str, Any], errors: List[str]) -> Dict[str, Any]:
        """Handle retry logic for failed operations.

        Takes the raw state dict (not the pydantic model) so the untyped
        retry_count bookkeeping key can be read with .get().
        """
        retry_actions = {}

        for error in errors:
            if "API rate limit" in error:
                retry_actions["wait_time"] = 60
                retry_actions["retry_count"] = state.get("retry_count", 0) + 1
            elif "No jobs found" in error:
                retry_actions["broaden_search"] = True
                retry_actions["retry_count"] = state.get("retry_count", 0) + 1

        return retry_actions

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process the current state and determine next actions."""
        # Convert state to JobSearchState for type checking
        job_state = JobSearchState(**state)

        # Check for errors
        errors = self._check_error_conditions(job_state)

        if errors:
            # Handle retries if needed (pass the raw dict; see _handle_retry_logic)
            retry_actions = self._handle_retry_logic(state, errors)
            state.update(retry_actions)

            if retry_actions.get("retry_count", 0) > 3:
                state["status"] = "failed"
                state["error_message"] = "Max retries exceeded"
                return state

        # Determine next step
        next_step = self._determine_next_step(job_state)

        # Update state
        state["current_phase"] = next_step
        state["next_step"] = next_step

        if next_step == "complete":
            state["status"] = "completed"

        return state
src/agents/notification.py
from typing import Dict, Any, List

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL
from src.models.schema import ScoredJobListing


class NotificationAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Notification Agent responsible for:
        1. Generating personalized job match summaries
        2. Creating detailed match explanations
        3. Formatting notifications for delivery
        """

    def _generate_job_summary(self, job: ScoredJobListing) -> str:
        """Generate a human-readable summary of a job match."""
        summary = f"""
🎯 {job.title} at {job.company}
📍 {job.location} {'(Remote)' if job.remote_status else ''}
💼 Match Score: {job.total_score}%

Key Matches:
- Position: {job.match_details['position_match']}
- Skills & Experience: {job.match_details['skills_experience']}
- Location: {job.match_details['location']}
- Company: {job.match_details['company']}
{f"- Salary: {job.salary_info}" if job.salary_info else ""}

Apply here: {job.application_url}
        """
        return summary.strip()

    def _create_batch_summary(self, jobs: List[ScoredJobListing], resume_data: Dict[str, Any]) -> str:
        """Create a summary for a batch of job matches."""
        total_jobs = len(jobs)
        high_matches = len([j for j in jobs if j.total_score >= 80])
        avg_score = sum(j.total_score for j in jobs) / total_jobs if total_jobs > 0 else 0

        summary = f"""
📊 Job Search Results Summary
============================
🎯 Searching for: {resume_data.get('desired_role', 'Not specified')}
📍 Location: {resume_data.get('location_preference', 'Not specified')}

📈 Match Statistics:
- Total Jobs Found: {total_jobs}
- High Match Jobs (80%+): {high_matches}
- Average Match Score: {avg_score:.1f}%

🔝 Top Matches:
"""

        # Add top 5 matches
        for i, job in enumerate(jobs[:5], 1):
            summary += f"\n{i}. {job.title} at {job.company}"
            summary += f"\n   Match Score: {job.total_score}%"
            summary += f"\n   Location: {job.location}"
            if job.salary_info:
                summary += f"\n   Salary: {job.salary_info}"
            summary += "\n"

        return summary.strip()

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process scored job listings and generate notifications."""
        scored_listings = state.get("scored_listings", [])
        resume_data = state.get("resume_data", {})

        if not scored_listings:
            state["notifications"] = []
            state["batch_summary"] = "No job matches found."
            return state

        # Convert to ScoredJobListing objects
        job_objects = [ScoredJobListing(**job) for job in scored_listings]

        # Generate individual job summaries
        notifications = []
        for job in job_objects:
            summary = self._generate_job_summary(job)
            notifications.append({
                "job_id": job.job_id,
                "summary": summary,
                "score": job.total_score
            })

        # Generate batch summary
        batch_summary = self._create_batch_summary(job_objects, resume_data)

        # Update state
        state["notifications"] = notifications
        state["batch_summary"] = batch_summary
        state["notification_complete"] = True

        return state
src/agents/relevance_scorer.py
from typing import Dict, Any, List
import logging
import json

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL, FINAL_SCORING_WEIGHTS
from src.models.schema import JobListing, ScoredJobListing

logger = logging.getLogger(__name__)


class RelevanceScorerAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Relevance Scorer Agent responsible for:
        1. Analyzing job details against resume
        2. Calculating match scores for skills, experience, location
        3. Ranking job opportunities by relevance
        """

    def _calculate_position_match(self, job_title: str, desired_role: str) -> int:
        """Calculate position match score (0-100)."""
        if not job_title or not desired_role:
            return 0

        job_title = job_title.lower()
        desired_role = desired_role.lower()

        # Exact match
        if job_title == desired_role:
            return 100

        # Partial match
        if desired_role in job_title or job_title in desired_role:
            return 80

        # Check for common keywords
        common_keywords = ["engineer", "developer", "manager", "analyst", "designer", "specialist"]
        job_keywords = [kw for kw in common_keywords if kw in job_title]
        role_keywords = [kw for kw in common_keywords if kw in desired_role]

        common_count = len(set(job_keywords).intersection(set(role_keywords)))
        if common_count > 0:
            return 60

        return 30  # Default low match

    def _calculate_skills_match(self, job_description: str, skills: List[str]) -> int:
        """Calculate skills match score (0-100)."""
        if not job_description or not skills:
            return 0

        job_description = job_description.lower()
        matched_skills = [skill for skill in skills if skill.lower() in job_description]

        match_percentage = (len(matched_skills) / len(skills)) * 100
        return min(int(match_percentage), 100)

    def _calculate_location_match(self, job_location: str, preferred_location: str, remote: bool) -> int:
        """Calculate location match score (0-100)."""
        if not job_location:
            return 0

        # If remote job and candidate prefers remote
        if remote and "remote" in preferred_location.lower():
            return 100

        job_location = job_location.lower()
        preferred_location = preferred_location.lower()

        # Exact location match
        if preferred_location in job_location or job_location in preferred_location:
            return 100

        # Check for city/state match
        location_parts = preferred_location.split(',')
        for part in location_parts:
            part = part.strip()
            if part and part in job_location:
                return 80

        return 40  # Default low match

    def _score_job(self, job: Dict[str, Any], resume_data: Dict[str, Any]) -> Dict[str, Any]:
        """Score a job listing against resume data."""
        # Extract job details
        job_title = job.get("title", "")
        job_description = job.get("description", "")
        job_location = job.get("location", "")
        company = job.get("company", "")
        remote = job.get("remote_status", False)

        # Extract resume details
        desired_role = resume_data.get("desired_role", "")
        skills = resume_data.get("skills", [])
        preferred_location = resume_data.get("location_preference", "")

        # Calculate component scores
        position_score = self._calculate_position_match(job_title, desired_role)
        skills_score = self._calculate_skills_match(job_description, skills)
        location_score = self._calculate_location_match(job_location, preferred_location, remote)
        company_score = 70  # Default company score

        # Apply weights from settings
        weighted_scores = {
            "position_match": position_score * FINAL_SCORING_WEIGHTS["position_match"],
            "skills_experience": skills_score * FINAL_SCORING_WEIGHTS["skills_experience"],
            "location": location_score * FINAL_SCORING_WEIGHTS["location"],
            "company": company_score * FINAL_SCORING_WEIGHTS["company"]
        }

        # Calculate total score (0-100)
        total_score = sum(weighted_scores.values())

        # Create match details
        match_details = {
            "position_match": f"{position_score}% match with desired role '{desired_role}'",
            "skills_experience": f"{skills_score}% of skills match the job requirements",
            "location": f"{location_score}% location match with preference '{preferred_location}'",
            "company": f"{company_score}% company match"
        }

        return {
            "total_score": int(total_score),
            "score_breakdown": weighted_scores,
            "match_details": match_details
        }

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process job listings and score them against resume data."""
        job_listings = state.get("job_listings", [])
        resume_data = state.get("resume_data", {})

        if not job_listings:
            state["scored_listings"] = []
            return state

        if not resume_data:
            state["error_log"] = state.get("error_log", []) + ["Missing resume data for scoring"]
            return state

        # Score each job
        scored_jobs = []
        for job in job_listings:
            try:
                # Standardize job data format if needed
                standardized_job = self._standardize_job_data(job)

                # Skip jobs with missing essential data
                if not standardized_job.get("title") or not standardized_job.get("company"):
                    continue

                # Score the job
                score_data = self._score_job(standardized_job, resume_data)

                # Create ScoredJobListing
                scored_job = ScoredJobListing(
                    job_id=standardized_job.get("job_id", ""),
                    title=standardized_job.get("title", ""),
                    company=standardized_job.get("company", ""),
                    location=standardized_job.get("location", ""),
                    remote_status=standardized_job.get("remote_status", False),
                    description=standardized_job.get("description", ""),
                    required_skills=standardized_job.get("required_skills", []),
                    salary_info=standardized_job.get("salary_info", ""),
                    posted_date=standardized_job.get("posted_date", ""),
                    application_url=standardized_job.get("application_url", ""),
                    source=standardized_job.get("source", ""),
                    total_score=score_data["total_score"],
                    score_breakdown=score_data["score_breakdown"],
                    match_details=score_data["match_details"]
                )

                scored_jobs.append(scored_job.dict())

            except Exception as e:
                logger.error(f"Error scoring job: {str(e)}")
                continue

        # Sort by score (descending)
        scored_jobs.sort(key=lambda x: x["total_score"], reverse=True)

        # Take top 10 matches
        top_matches = scored_jobs[:10]

        # Update state
        state["scored_listings"] = top_matches

        return state

    def _standardize_job_data(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
        """Standardize job data from different sources."""
        # Handle case where job_data is a list
        if isinstance(job_data, list):
            if not job_data:
                return {}
            job_data = job_data[0]  # Take the first item

        # Check if job data is in LinkedIn format
        if "job_title" in job_data:
            return {
                "job_id": job_data.get("job_id", ""),
                "title": job_data.get("job_title", ""),
                "company": job_data.get("company_name", ""),
                "location": job_data.get("job_location", ""),
                "remote_status": "remote" in job_data.get("job_location", "").lower(),
                "description": job_data.get("job_description", ""),
                "required_skills": job_data.get("job_skills", []),
                "salary_info": job_data.get("salary_range", ""),
                "posted_date": job_data.get("posted_date", ""),
                "application_url": job_data.get("job_url", ""),
                "source": "LinkedIn"
            }

        # Check if job data is in Indeed format
        elif "jobTitle" in job_data:
            return {
                "job_id": job_data.get("jobId", ""),
                "title": job_data.get("jobTitle", ""),
                "company": job_data.get("companyName", ""),
                "location": job_data.get("location", ""),
                "remote_status": "remote" in job_data.get("location", "").lower(),
                "description": job_data.get("description", ""),
                "required_skills": [],  # Indeed doesn't provide skills directly
                "salary_info": job_data.get("salary", ""),
                "posted_date": job_data.get("date", ""),
                "application_url": job_data.get("url", ""),
                "source": "Indeed"
            }

        # Default case - return as is
        return job_data
src/agents/resume_parser.py
from typing import Dict, Any
import logging
import json

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL
from src.models.schema import ResumeData

logger = logging.getLogger(__name__)


class ResumeParserAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Resume Parser Agent responsible for:
        1. Extracting structured information from resume text
        2. Identifying key skills and experience
        3. Determining job search parameters
        """

    def _extract_basic_info(self, resume_text: str) -> Dict[str, Any]:
        """Extract basic information from resume text."""
        try:
            system_message = SystemMessage(content="""
            You are a resume parsing expert. Extract the following information from the resume:
            - Desired role (based on most recent experience or stated objective)
            - Location preference (if mentioned)
            - Total years of experience
            - Skills (technical and soft skills)
            - Industry experience
            - Work experience (list of jobs with title, company, duration)
            - Education (list of degrees with institution, year)

            Format your response as a JSON object.
            """)

            human_message = HumanMessage(content=resume_text)

            # Reuse the client configured in __init__
            response = self.llm.invoke([system_message, human_message])

            # Extract JSON from response
            response_text = response.content

            # Find JSON in the response
            json_start = response_text.find('{')
            json_end = response_text.rfind('}') + 1

            if json_start >= 0 and json_end > json_start:
                json_str = response_text[json_start:json_end]
                return json.loads(json_str)
            else:
                logger.warning("Could not extract JSON from OpenAI response")
                return self._basic_extraction_fallback(resume_text)

        except Exception as e:
            logger.error(f"Error extracting resume info: {str(e)}")
            return self._basic_extraction_fallback(resume_text)

    def _basic_extraction_fallback(self, resume_text: str) -> Dict[str, Any]:
        """Basic fallback extraction when OpenAI fails."""
        # Extract some basic info using simple heuristics
        lines = resume_text.split('\n')
        skills = []

        for line in lines:
            if "skills" in line.lower() and ":" in line:
                skills_text = line.split(":", 1)[1].strip()
                skills = [s.strip() for s in skills_text.split(',')]
                break

        return {
            "desired_role": "Software Engineer",  # Default
            "location_preference": "Remote",      # Default
            "total_years_experience": 3,          # Default
            "skills": skills[:10] if skills else ["Python", "JavaScript"],
            "industry_experience": ["Technology"],
            "experience": [
                {
                    "title": "Software Engineer",
                    "company": "Unknown",
                    "duration": "3 years"
                }
            ],
            "education": [
                {
                    "degree": "Bachelor's Degree",
                    "institution": "Unknown University",
                    "year": 2020
                }
            ]
        }

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process resume text and extract information."""
        resume_text = state.get("resume_text", "")

        if not resume_text:
            state["error_log"] = state.get("error_log", []) + ["Missing resume text"]
            return state

        # Extract resume information
        resume_info = self._extract_basic_info(resume_text)

        # Create ResumeData instance
        resume_data = ResumeData(
            skills=resume_info.get("skills", []),
            work_experience=resume_info.get("experience", []),
            education=resume_info.get("education", []),
            desired_role=resume_info.get("desired_role", ""),
            location_preference=resume_info.get("location_preference", ""),
            industry_experience=resume_info.get("industry_experience", []),
            total_years_experience=resume_info.get("total_years_experience", 0)
        )

        # Update state
        state["resume_data"] = resume_data.dict()
        state["resume_parsed"] = True

        return state
src/agents/__init__.py
1"""Agent module for job search workflow."""2
3# Agents package initialization4from .base import BaseAgent5from .resume_parser import ResumeParserAgent6from .job_scraper import JobScraperAgent7from .relevance_scorer import RelevanceScorerAgent8from .feedback_refiner import FeedbackRefinerAgent9from .notification import NotificationAgent10from .manager import ManagerAgent11
12__all__ = [13 'BaseAgent',14 'ResumeParserAgent',15 'JobScraperAgent',16 'RelevanceScorerAgent',17 'FeedbackRefinerAgent',18 'NotificationAgent',19 'ManagerAgent'20]
src/models/schema.py
from typing import Dict, Any, List, Optional
from pydantic import BaseModel


class WorkExperience(BaseModel):
    """Work experience entry in a resume."""
    title: str
    company: str
    duration: str
    description: Optional[str] = None


class Education(BaseModel):
    """Education entry in a resume."""
    degree: str
    institution: str
    year: int
    area: Optional[str] = None


class ResumeData(BaseModel):
    """Structured data extracted from a resume."""
    skills: List[str] = []
    work_experience: List[Dict[str, Any]] = []
    education: List[Dict[str, Any]] = []
    desired_role: str = ""
    location_preference: str = ""
    industry_experience: List[str] = []
    total_years_experience: int = 0


class JobListing(BaseModel):
    """Job listing data structure."""
    job_id: str = ""
    title: str = ""
    company: str = ""
    location: str = ""
    remote_status: bool = False
    description: str = ""
    required_skills: List[str] = []
    salary_info: str = ""
    posted_date: str = ""
    application_url: str = ""
    source: str = ""


class ScoredJobListing(JobListing):
    """Job listing with relevance score."""
    total_score: int = 0
    score_breakdown: Dict[str, float] = {}
    match_details: Dict[str, str] = {}


class JobSearchState(BaseModel):
    """State for the job search workflow."""
    resume_text: str = ""
    resume_parsed: bool = False
    scraping_complete: bool = False
    current_phase: str = ""
    next_step: str = ""
    resume_data: Dict[str, Any] = {}
    job_listings: List[Dict[str, Any]] = []
    scored_listings: List[Dict[str, Any]] = []
    notifications: List[Dict[str, Any]] = []
    batch_summary: str = ""
    notification_complete: bool = False
    error_log: List[str] = []
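A quick illustration of how the workflow round-trips these models as plain dicts (pydantic v1 `.dict()` semantics, matching the `pydantic==1.10.8` pin in requirements.txt):

```python
from src.models.schema import JobSearchState, ScoredJobListing

# Agents receive and return plain dicts; the models validate at the boundaries.
state = JobSearchState(resume_text="...", current_phase="initialize").dict()
state["scored_listings"] = [
    ScoredJobListing(title="Software Engineer", company="Example Corp", total_score=87).dict()
]

# Re-validating restores typed access after any dict-based mutation.
restored = JobSearchState(**state)
print(restored.scored_listings[0]["total_score"])  # 87
```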
src/models/__init__.py
1"""Models module for job search data structures."""2
3# Models package initialization4from .schema import (5 WorkExperience, 6 Education, 7 ResumeData, 8 JobListing, 9 ScoredJobListing,10 JobSearchState11)12
13__all__ = [14 'WorkExperience',15 'Education',16 'ResumeData',17 'JobListing',18 'ScoredJobListing',19 'JobSearchState'20]
src/utils/retry.py
1"""Retry utility for handling transient errors."""2
3import time4import logging5import functools6from typing import Callable, Any, Optional7
8# Use absolute imports9from src.config.settings import RETRY_ATTEMPTS, RETRY_DELAY10
11logger = logging.getLogger(__name__)12
13def with_retry(max_attempts: Optional[int] = None, delay: Optional[int] = None):14 """15 Decorator for retrying a function if it raises an exception.16 17 Args:18 max_attempts: Maximum number of attempts (default from settings)19 delay: Delay between attempts in seconds (default from settings)20 """21 max_attempts = max_attempts or RETRY_ATTEMPTS22 delay = delay or RETRY_DELAY23 24 def decorator(func: Callable) -> Callable:25 @functools.wraps(func)26 def wrapper(*args, **kwargs) -> Any:27 attempts = 028 while attempts < max_attempts:29 try:30 return func(*args, **kwargs)31 except Exception as e:32 attempts += 133 if attempts >= max_attempts:34 logger.error(f"Function {func.__name__} failed after {attempts} attempts: {str(e)}")35 raise36 37 logger.warning(f"Attempt {attempts} failed for {func.__name__}: {str(e)}. Retrying in {delay} seconds...")38 time.sleep(delay)39 40 # This should never be reached due to the raise in the except block41 return None42 43 return wrapper44 45 return decorator
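A usage sketch for the decorator (the flaky function here is hypothetical):

```python
import random

from src.utils.retry import with_retry


@with_retry(max_attempts=3, delay=1)
def flaky_fetch() -> str:
    """Simulates a transient failure that may succeed on a later attempt."""
    if random.random() < 0.5:
        raise ConnectionError("transient network error")
    return "ok"


print(flaky_fetch())  # retried up to 3 times, 1 second apart
```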
src/utils/__init__.py
1"""Utility functions for the job search actor."""2
3# Utils package initialization4from .retry import with_retry5
6__all__ = ['with_retry']