Job Search Agent Langraph
gopalakrishnan/job-search-agent-langraph
This Actor is paid per event.
An autonomous AI agent that helps you find the perfect job match by analyzing your resume and searching across LinkedIn and Indeed. Built with Apify and powered by advanced AI capabilities.
Developer: Maintained by Community

Actor Metrics
1 monthly user
No reviews yet
No bookmarks yet
90% of runs succeeded
Created in Mar 2025
Modified a day ago
.dockerignore
.git
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.gitignore
.mise.toml
.nvim.lua
storage

# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

# Added by Apify CLI
node_modules

# Virtual Environment
venv/
ENV/
env/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Apify specific
apify_storage/
storage/
.env

# Logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# OS specific
.DS_Store
Thumbs.db
Dockerfile
FROM apify/actor-python:3.11

# Copy package files
COPY requirements.txt ./

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy source code
COPY . ./

# Run the actor
CMD ["python", "src/main.py"]
INPUT_SCHEMA.json
{
    "title": "AI Job Search Agent Input",
    "type": "object",
    "schemaVersion": 1,
    "properties": {
        "openAiApiKey": {
            "title": "OpenAI API Key",
            "type": "string",
            "description": "Your OpenAI API key for AI services",
            "editor": "textfield",
            "isSecret": true
        },
        "resumeText": {
            "title": "Resume Text",
            "type": "string",
            "description": "The full text of your resume",
            "editor": "textarea"
        },
        "locationPreference": {
            "title": "Location Preference",
            "type": "string",
            "description": "Your preferred job location (e.g., 'Seattle, WA')",
            "editor": "textfield"
        },
        "workModePreference": {
            "title": "Work Mode Preference",
            "type": "string",
            "description": "Preferred work mode",
            "editor": "select",
            "default": "Any",
            "enum": ["Any", "Remote", "Hybrid", "On-site"]
        },
        "searchRadius": {
            "title": "Search Radius",
            "type": "integer",
            "description": "Search radius in miles",
            "default": 25
        },
        "minSalary": {
            "title": "Minimum Salary",
            "type": "integer",
            "description": "Minimum annual salary",
            "default": 0
        }
    },
    "required": ["openAiApiKey", "resumeText"]
}
project_outline.md
# Job Search Agent - Apify Actor Project Outline

Based on the existing codebase and the Apify templates, I'll outline a project structure for converting the current job search functionality into an Apify actor using LangGraph. This approach maintains the core functionality while leveraging Apify's infrastructure.

## Project Structure

```
job-search-agent/
├── .actor/                      # Apify actor configuration
│   ├── actor.json               # Actor metadata
│   └── input_schema.json        # Input schema definition
├── src/
│   ├── agents/                  # Agent definitions
│   │   ├── __init__.py
│   │   ├── manager.py           # Orchestration agent
│   │   ├── resume_parser.py     # Resume parsing agent
│   │   ├── job_scraper.py       # Job search agent
│   │   ├── relevance_scorer.py  # Scoring and matching agent
│   │   └── output_formatter.py  # Results formatting agent
│   ├── tools/                   # Tool definitions
│   │   ├── __init__.py
│   │   ├── apify_tools.py       # LinkedIn/Indeed search tools
│   │   └── utility_tools.py     # Helper tools
│   ├── models/                  # Data models
│   │   ├── __init__.py
│   │   ├── resume.py            # Resume data structure
│   │   └── job.py               # Job listing data structure
│   ├── utils/                   # Utility functions
│   │   ├── __init__.py
│   │   ├── text_processing.py   # Text cleaning utilities
│   │   └── scoring.py           # Scoring algorithms
│   ├── config.py                # Configuration settings
│   └── main.py                  # Entry point
├── .gitignore
├── README.md
└── requirements.txt             # Dependencies
```

## Workflow Logic

The job search workflow will follow these steps:

1. **Input Processing**
   - Parse user input (resume text, location preferences, work mode)
   - Validate and prepare for processing

2. **Resume Parsing**
   - Extract essential information from resume
   - Categorize skills, experience, education
   - Identify key search terms

3. **Job Search**
   - Use Apify actors for LinkedIn and Indeed searches
   - Apply search parameters from resume
   - Collect raw job listings

4. **Relevance Scoring**
   - Score each job based on match criteria
   - Calculate component scores (skills, experience, location)
   - Generate overall match percentage

5. **Results Formatting**
   - Structure results with match details
   - Sort by relevance score
   - Add match explanations

6. **Output Generation**
   - Save to Apify dataset
   - Generate summary statistics
   - Create downloadable results

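The steps above communicate through a single shared state object that each agent reads and updates. A minimal sketch of that state (assuming pydantic; the full model in this repo lives in `src/models/schema.py`):

```python
from typing import Any, Dict, List
from pydantic import BaseModel

# Minimal sketch of the shared workflow state; each step fills in one field.
class JobSearchState(BaseModel):
    resume_text: str = ""                       # step 1: raw input
    resume_data: Dict[str, Any] = {}            # step 2: parsed resume
    job_listings: List[Dict[str, Any]] = []     # step 3: raw listings
    scored_listings: List[Dict[str, Any]] = []  # step 4: scored matches
    batch_summary: str = ""                     # steps 5-6: formatted output
    error_log: List[str] = []
```
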
## LangGraph Implementation

Using the [python-langgraph template](https://github.com/apify/actor-templates/tree/master/templates/python-langgraph), we'll implement a graph-based workflow:

```python
# Simplified graph structure, using LangGraph's add_node/add_edge API;
# each node is an agent function operating on the shared JobSearchState.
from langgraph.graph import StateGraph, END

graph = StateGraph(JobSearchState)
graph.add_node("resume_parser", resume_parser_agent)
graph.add_node("job_scraper", job_scraper_agent)
graph.add_node("relevance_scorer", relevance_scorer_agent)
graph.add_node("output_formatter", output_formatter_agent)

# Define transitions
graph.set_entry_point("resume_parser")
graph.add_edge("resume_parser", "job_scraper")
graph.add_edge("job_scraper", "relevance_scorer")
graph.add_edge("relevance_scorer", "output_formatter")
graph.add_edge("output_formatter", END)

app = graph.compile()
```

## Scoring Logic

The relevance scoring will use a weighted approach:

```
Overall Score = (Skills Match × 0.40) +
                (Experience Match × 0.25) +
                (Location Match × 0.20) +
                (Company/Role Fit × 0.15)
```

Each component will be scored from 0-100:

1. **Skills Match (40%)**
   - Required skills present in resume
   - Skill level/proficiency match
   - Technology stack alignment

2. **Experience Match (25%)**
   - Years of experience match
   - Role similarity
   - Industry relevance

3. **Location Match (20%)**
   - Exact location match
   - Remote/hybrid preference match
   - Commute distance (if applicable)

4. **Company/Role Fit (15%)**
   - Company size preference
   - Industry alignment
   - Career growth potential

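As a worked example of the weighting, here is a small sketch (component names are illustrative; the scorers are assumed to return values in the 0-100 range):

```python
# Weights from the formula above.
WEIGHTS = {
    "skills": 0.40,
    "experience": 0.25,
    "location": 0.20,
    "company_fit": 0.15,
}

def overall_score(components: dict) -> float:
    """Combine 0-100 component scores into a single 0-100 match score."""
    return sum(components[name] * weight for name, weight in WEIGHTS.items())

# With the component scores from the output example below:
# 90*0.40 + 85*0.25 + 75*0.20 + 95*0.15 = 86.5, i.e. a ~87% match.
print(overall_score({"skills": 90, "experience": 85, "location": 75, "company_fit": 95}))
```
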
## Input Schema

```json
{
    "title": "Job Search Input",
    "type": "object",
    "schemaVersion": 1,
    "properties": {
        "resumeText": {
            "title": "Resume Text",
            "type": "string",
            "description": "Full text of the resume",
            "editor": "textarea"
        },
        "locationPreference": {
            "title": "Location Preference",
            "type": "string",
            "description": "Preferred job location",
            "editor": "textfield"
        },
        "workModePreference": {
            "title": "Work Mode",
            "type": "string",
            "description": "Preferred work mode (Remote, Hybrid, On-site)",
            "editor": "select",
            "enum": ["Remote", "Hybrid", "On-site", "Any"]
        },
        "searchRadius": {
            "title": "Search Radius (miles)",
            "type": "integer",
            "description": "Maximum distance from preferred location",
            "default": 25
        },
        "minSalary": {
            "title": "Minimum Salary",
            "type": "integer",
            "description": "Minimum acceptable salary",
            "default": 0
        }
    },
    "required": ["resumeText"]
}
```

## Output Format

```json
{
    "query": {
        "resumeSummary": "...",
        "searchParameters": {
            "location": "...",
            "workMode": "...",
            "searchRadius": 25
        }
    },
    "results": [
        {
            "position": "Software Engineer",
            "company": "Example Corp",
            "location": "New York, NY",
            "workMode": "Hybrid",
            "salary": "$120,000 - $150,000",
            "matchScore": 87,
            "matchDetails": {
                "skillsMatch": 90,
                "experienceMatch": 85,
                "locationMatch": 75,
                "companyFitMatch": 95
            },
            "matchExplanation": "Strong match on required skills (Python, React). Experience level aligns well with 5+ years requirement. Location is within commuting distance. Company culture emphasizes work-life balance.",
            "keyRequirements": ["Python", "React", "5+ years experience"],
            "applicationUrl": "https://example.com/jobs/123"
        }
    ],
    "statistics": {
        "totalJobsFound": 120,
        "averageMatchScore": 72,
        "topSkillsRequested": ["Python", "JavaScript", "AWS"],
        "salaryRange": {
            "min": 80000,
            "max": 180000,
            "average": 125000
        }
    }
}
```

## Implementation Approach

1. **Start with the python-langgraph template**
   - This provides the LangGraph structure needed for the agent workflow
   - Already includes Apify SDK integration

2. **Implement agents as LangGraph nodes**
   - Each agent will be a node in the graph
   - Define clear input/output contracts

3. **Use Apify actors as tools** (see the sketch after this list)
   - LinkedIn and Indeed scrapers as external tools
   - Integrate with the LangChain tool-calling format

4. **Implement scoring logic in Python**
   - Create utility functions for match calculations
   - Use the weighted scoring approach

5. **Store results in an Apify dataset**
   - Structured job matches with scores
   - Include match explanations and statistics

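For step 3, a hypothetical sketch of exposing one of the Apify scrapers as a LangChain tool (the actor ID and the `limit` field mirror `src/config/settings.py`; the `@tool` decorator comes from `langchain_core`):

```python
from apify_client import ApifyClient
from langchain_core.tools import tool

client = ApifyClient(token="<YOUR_APIFY_TOKEN>")

@tool
def search_linkedin_jobs(keywords: str, location: str) -> list:
    """Search LinkedIn job postings via an Apify actor."""
    run = client.actor("krandiash/linkedin-jobs-scraper").call(
        run_input={"keywords": keywords, "location": location, "limit": 5}
    )
    # Return the scraped listings from the run's default dataset.
    return client.dataset(run["defaultDatasetId"]).list_items().items
```

A tool defined this way can be bound to the LLM or invoked directly from a graph node.
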
## Deployment and Execution

1. **Local Development**
   - Use the Apify CLI for local testing
   - Run with the `apify run` command

2. **Apify Platform Deployment**
   - Push to Apify with `apify push`
   - Configure environment variables

3. **Execution**
   - Run via the Apify UI or API (see the example below)
   - Monitor execution in the Apify console
   - Download results as JSON

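For example, a run can be triggered programmatically with the Apify API client (the token and input values are placeholders):

```python
from apify_client import ApifyClient

client = ApifyClient(token="<YOUR_APIFY_TOKEN>")

# Start a run of the deployed actor and wait for it to finish.
run = client.actor("gopalakrishnan/job-search-agent-langraph").call(
    run_input={
        "apifyApiToken": "<YOUR_APIFY_TOKEN>",
        "openAiApiKey": "<YOUR_OPENAI_API_KEY>",
        "resumeText": "<full resume text>",
        "locationPreference": "Seattle, WA",
        "workModePreference": "Remote",
    }
)

# Fetch the scored job matches from the run's default dataset.
for item in client.dataset(run["defaultDatasetId"]).list_items().items:
    print(item)
```
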
This project structure maintains the core functionality of the existing job search implementation while leveraging Apify's infrastructure and the LangGraph framework for agent orchestration.

[Source: Python LangGraph template](https://github.com/apify/actor-templates/tree/master/templates/python-langgraph)
requirements.txt
# Feel free to add your Python dependencies below. For formatting guidelines, see:
# https://pip.pypa.io/en/latest/reference/requirements-file-format/

# Apify SDK
apify==1.1.3
apify-client==1.4.0

# LangGraph and related packages
langgraph==0.0.20
# The agents import langchain_openai and langchain_core, so those packages
# must be installed as well (loose pins; exact versions are assumptions).
langchain-openai>=0.0.5
langchain-core>=0.1.0

# OpenAI
openai>=1.10.0

# Utilities
python-dotenv==1.0.0
pydantic==1.10.8
tenacity==8.2.3

# Web scraping
beautifulsoup4==4.12.2
httpx==0.24.1
.actor/actor.json
{
    "actorSpecification": 1,
    "name": "job-search-agent-langraph",
    "title": "AI Job Search Agent",
    "description": "An AI-powered job search agent that analyzes resumes and finds matching job opportunities from LinkedIn and Indeed.",
    "version": "0.1",
    "buildTag": "latest",
    "meta": {
        "templateId": "python-langgraph"
    },
    "input": "input_schema.json",
    "dockerfile": "./Dockerfile",
    "storages": {
        "dataset": {
            "actorSpecification": 1,
            "title": "Job Search Results",
            "views": {
                "overview": {
                    "title": "Overview",
                    "transformation": {
                        "fields": [
                            "position",
                            "company",
                            "location",
                            "matchScore",
                            "applicationUrl"
                        ]
                    },
                    "display": {
                        "component": "table",
                        "properties": {
                            "matchScore": {
                                "template": "{{value}}%",
                                "format": "number"
                            }
                        }
                    }
                }
            }
        }
    }
}
.actor/Dockerfile
# First, specify the base Docker image.
# You can see the Docker images from Apify at https://hub.docker.com/r/apify/.
# You can also use any other image from Docker Hub.
FROM apify/actor-python:3.11

# Copy package files
COPY requirements.txt ./

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy source code
COPY . ./

# Run the actor
CMD ["python", "src/main.py"]
.actor/input_schema.json
{
    "title": "Job Search Input",
    "type": "object",
    "schemaVersion": 1,
    "properties": {
        "apifyApiToken": {
            "title": "Apify API Token",
            "type": "string",
            "description": "Your Apify API token for running job search actors",
            "editor": "textfield",
            "isSecret": true
        },
        "openAiApiKey": {
            "title": "OpenAI API Key",
            "type": "string",
            "description": "Your OpenAI API key for AI services",
            "editor": "textfield",
            "isSecret": true
        },
        "resumeText": {
            "title": "Resume Text",
            "type": "string",
            "description": "Full text of the resume",
            "editor": "textarea"
        },
        "locationPreference": {
            "title": "Location Preference",
            "type": "string",
            "description": "Preferred job location",
            "editor": "textfield"
        },
        "workModePreference": {
            "title": "Work Mode",
            "type": "string",
            "description": "Preferred work mode (Remote, Hybrid, On-site)",
            "editor": "select",
            "enum": ["Remote", "Hybrid", "On-site", "Any"]
        },
        "searchRadius": {
            "title": "Search Radius (miles)",
            "type": "integer",
            "description": "Maximum distance from preferred location",
            "default": 25
        },
        "minSalary": {
            "title": "Minimum Salary",
            "type": "integer",
            "description": "Minimum acceptable salary",
            "default": 0
        }
    },
    "required": ["apifyApiToken", "openAiApiKey", "resumeText"]
}
src/main.py
1"""
2AI Job Search Agent - Apify Actor
3
4This actor analyzes a resume and finds matching job opportunities from LinkedIn and Indeed.
5"""
6
7import os
8import json
9import logging
10from typing import Dict, Any
11from datetime import datetime
12import asyncio
13import sys
14
15# Add the src directory to the Python path
16sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
17
18from apify import Actor
19from openai import OpenAI
20from dotenv import load_dotenv
21from apify_client import ApifyClient
22
23# Import settings
24from src.config.settings import (
25 OPENAI_MODEL,
26 ACTOR_IDS,
27 FINAL_SCORING_WEIGHTS
28)
29
30# Import agents
31from src.agents.manager import ManagerAgent
32from src.agents.resume_parser import ResumeParserAgent
33from src.agents.relevance_scorer import RelevanceScorerAgent
34from src.agents.notification import NotificationAgent
35from src.models.schema import JobSearchState
36
37# Configure logging
38logging.basicConfig(
39 level=logging.INFO,
40 format='%(asctime)s [%(levelname)s] [%(name)s] %(message)s',
41 handlers=[logging.StreamHandler()]
42)
43logger = logging.getLogger(__name__)
44
45# Load environment variables
46load_dotenv()
47
48# Event names and prices
49EVENTS = {
50 'resume_parse': 0.10, # Resume parsing
51 'job_score': 0.02, # Per job scoring
52 'results_summary': 0.10 # Final summary
53}
54
55class JobSearchActor:
56 def __init__(self, apify_client: ApifyClient):
57 self.apify_client = apify_client
58 self.manager = ManagerAgent(apify_client)
59 self.resume_parser = ResumeParserAgent(apify_client)
60 self.relevance_scorer = RelevanceScorerAgent(apify_client)
61 self.notification = NotificationAgent(apify_client)
62
63 async def process_step(self, state: Dict[str, Any]) -> Dict[str, Any]:
64 """Process a single step in the workflow based on current phase."""
65 current_phase = state.get("current_phase", "initialize")
66
67 try:
68 if current_phase == "parse_resume":
69 logger.info("Parsing resume...")
70 state = await self.resume_parser.process(state)
71 # Log charge for resume parsing
72 await self._log_charge('resume_parse')
73
74 elif current_phase == "search_jobs":
75 logger.info("Searching for jobs...")
76 # Use Apify's LinkedIn Jobs Scraper
77 run_input = {
78 "keywords": state["resume_data"]["desired_role"],
79 "location": state["resume_data"]["location_preference"],
80 "maxItems": 50,
81 "timeoutSecs": 120,
82 "memoryMbytes": 1024
83 }
84
85 run = await self.apify_client.actor(ACTOR_IDS["linkedin_scraper"]).call(run_input=run_input)
86 dataset_items = await self.apify_client.dataset(run["defaultDatasetId"]).list_items().items()
87
88 if not dataset_items:
89 logger.warning("No jobs found in search results")
90 state["error_log"] = state.get("error_log", []) + ["No jobs found during search"]
91 else:
92 logger.info(f"Found {len(dataset_items)} jobs")
93 state["job_listings"] = dataset_items
94
95 state["scraping_complete"] = True
96
97 elif current_phase == "score_jobs":
98 logger.info("Scoring jobs...")
99 state = await self.relevance_scorer.process(state)
100 # Log charge for job scoring
101 job_count = len(state.get("scored_listings", []))
102 for _ in range(job_count):
103 await self._log_charge('job_score')
104
105 elif current_phase == "send_notifications":
106 logger.info("Generating results summary...")
107 state = await self.notification.process(state)
108 # Log charge for results summary
109 await self._log_charge('results_summary')
110
111 except Exception as e:
112 logger.error(f"Error in {current_phase}: {str(e)}")
113 state["error_log"] = state.get("error_log", []) + [f"Error in {current_phase}: {str(e)}"]
114
115 return state
116
117 async def _log_charge(self, event_type: str):
118 """Log a charge event."""
119 if event_type in EVENTS:
120 await self.apify_client.key_value_store().set_record(
121 f"CHARGE_{datetime.now().isoformat()}",
122 {
123 "event": event_type,
124 "amount": EVENTS[event_type],
125 "timestamp": datetime.now().isoformat()
126 }
127 )
128
129 async def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
130 """Run the job search workflow."""
131 # Initialize state
132 state = JobSearchState(
133 resume_text=input_data.get("resumeText", ""),
134 resume_parsed=False,
135 scraping_complete=False,
136 current_phase="initialize",
137 next_step="parse_resume"
138 ).dict()
139
140 # Process workflow until completion
141 while state.get("status") not in ["completed", "failed"]:
142 # Let manager determine next step
143 state = await self.manager.process(state)
144
145 if state.get("status") in ["completed", "failed"]:
146 break
147
148 # Process the current step
149 state = await self.process_step(state)
150
151 # Add checkpoint to dataset
152 await self.apify_client.key_value_store().set_record(
153 "STATE_CHECKPOINT",
154 state
155 )
156
157 return state
158
159async def main():
160 """Main entry point for the actor."""
161 async with Actor:
162 # Initialize Apify client
163 apify_client = ApifyClient()
164
165 try:
166 # Get input
167 actor_input = await Actor.get_input() or {}
168
169 # Initialize and run job search
170 job_search = JobSearchActor(apify_client)
171 result = await job_search.run(actor_input)
172
173 # Save output
174 await Actor.push_data(result)
175
176 except Exception as e:
177 logger.error(f"Actor failed: {str(e)}")
178 raise
179
180if __name__ == "__main__":
181 asyncio.run(main())
src/py.typed
src/__init__.py
src/__main__.py
import asyncio

from .main import main

# Execute the Actor entry point.
asyncio.run(main())
src/config/settings.py
1"""Configuration settings for the job search actor."""
2
3import os
4from typing import Dict
5from dotenv import load_dotenv
6
7# Load environment variables
8load_dotenv()
9
10# API Keys - these should be set as environment variables
11APIFY_API_TOKEN = os.getenv("APIFY_API_TOKEN")
12OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
13
14# OpenAI Model Configuration
15OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
16
17# Apify Actor IDs
18ACTOR_IDS: Dict[str, str] = {
19 "linkedin_scraper": "krandiash/linkedin-jobs-scraper",
20 "indeed_scraper": "epctex/indeed-scraper"
21}
22
23# Job Search Settings
24JOB_SEARCH_CONFIG = {
25 "initial_results_per_source": 5, # Reduced for testing
26 "detail_fetch_threshold": 0.6, # Minimum initial score to fetch details
27 "max_details_to_fetch": 5, # Reduced for testing
28 "max_days_old": 30 # Maximum age of job postings
29}
30
31# Scoring weights for initial filtering
32INITIAL_SCORING_WEIGHTS = {
33 "title_match": 0.4,
34 "location_match": 0.3,
35 "company_relevance": 0.2,
36 "posting_date": 0.1
37}
38
39# Detailed scoring weights
40FINAL_SCORING_WEIGHTS = {
41 "position_match": 0.25,
42 "skills_experience": 0.40,
43 "location": 0.20,
44 "company": 0.15,
45 "salary": 0.0,
46 "benefits": 0.0
47}
48
49# System settings
50RETRY_ATTEMPTS = 3
51RETRY_DELAY = 5 # seconds
src/config/__init__.py
1"""Configuration module for job search actor."""
2
3# Config package initialization
4from .settings import (
5 APIFY_API_TOKEN,
6 OPENAI_API_KEY,
7 OPENAI_MODEL,
8 ACTOR_IDS,
9 JOB_SEARCH_CONFIG,
10 INITIAL_SCORING_WEIGHTS,
11 FINAL_SCORING_WEIGHTS,
12 RETRY_ATTEMPTS,
13 RETRY_DELAY
14)
15
16__all__ = [
17 'APIFY_API_TOKEN',
18 'OPENAI_API_KEY',
19 'OPENAI_MODEL',
20 'ACTOR_IDS',
21 'JOB_SEARCH_CONFIG',
22 'INITIAL_SCORING_WEIGHTS',
23 'FINAL_SCORING_WEIGHTS',
24 'RETRY_ATTEMPTS',
25 'RETRY_DELAY'
26]
src/agents/base.py
from typing import Any, Dict, List, Optional
from apify_client import ApifyClient
from abc import ABC, abstractmethod
import logging

# Import the retry utility using an absolute import
from src.utils.retry import with_retry

logger = logging.getLogger(__name__)

class BaseAgent(ABC):
    def __init__(self, apify_client: ApifyClient, actor_id: Optional[str] = None):
        self.apify_client = apify_client
        self.actor_id = actor_id
        self.system_prompt = self._get_system_prompt()

    @abstractmethod
    def _get_system_prompt(self) -> str:
        """Return the system prompt for this agent."""
        raise NotImplementedError("Subclasses must implement _get_system_prompt")

    @abstractmethod
    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process the current state and return the updated state."""
        raise NotImplementedError("Subclasses must implement process")

    @with_retry()
    def run_actor(self, actor_id: Optional[str] = None, input_data: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run the associated Apify actor, if one exists, and return its dataset items."""
        actor_id = actor_id or self.actor_id
        if not actor_id:
            raise ValueError("No actor_id specified for this agent")

        try:
            run = self.apify_client.actor(actor_id).call(run_input=input_data)
            dataset_items = self.apify_client.dataset(run["defaultDatasetId"]).list_items().items
            return dataset_items
        except Exception as e:
            # Log the error and re-raise
            logger.error(f"Error running actor {actor_id}: {str(e)}")
            raise

    async def _safe_api_call(self, func, *args, **kwargs):
        """Safely make an API call with error handling."""
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.error(f"API call failed: {str(e)}")
            raise
src/agents/feedback_refiner.py
from typing import Dict, Any, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from .base import BaseAgent
from ..config.settings import OPENAI_API_KEY, OPENAI_MODEL

class FeedbackRefinerAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Feedback Refiner Agent responsible for:
        1. Processing user feedback on job matches
        2. Refining search parameters
        3. Adjusting scoring weights based on preferences
        4. Improving match quality over time
        """

    def _analyze_feedback(self, feedback: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze user feedback to determine parameter adjustments."""
        adjustments = {
            "search_params": {},
            "scoring_weights": {},
            "filters": {}
        }

        # Process location preferences
        if feedback.get("location_feedback"):
            location_pref = feedback["location_feedback"]
            adjustments["search_params"]["location"] = location_pref
            if "remote" in location_pref.lower():
                adjustments["filters"]["remote_only"] = True

        # Process role preferences
        if feedback.get("role_feedback"):
            role_pref = feedback["role_feedback"]
            adjustments["search_params"]["keywords"] = role_pref

        # Process experience level preferences
        if feedback.get("experience_feedback"):
            exp_pref = feedback["experience_feedback"]
            adjustments["search_params"]["experience_level"] = exp_pref

        # Process salary preferences
        if feedback.get("salary_feedback"):
            salary_pref = feedback["salary_feedback"]
            adjustments["filters"]["salary_min"] = salary_pref.get("min")
            adjustments["filters"]["salary_max"] = salary_pref.get("max")

        # Process company preferences
        if feedback.get("company_preferences"):
            company_prefs = feedback["company_preferences"]
            adjustments["filters"]["preferred_companies"] = company_prefs

        return adjustments

    def _refine_search_parameters(self, current_params: Dict[str, Any], adjustments: Dict[str, Any]) -> Dict[str, Any]:
        """Refine search parameters based on feedback analysis."""
        refined_params = current_params.copy()

        # Update search parameters
        if "search_params" in adjustments:
            refined_params.update(adjustments["search_params"])

        # Update filters
        if "filters" in adjustments:
            if "filters" not in refined_params:
                refined_params["filters"] = {}
            refined_params["filters"].update(adjustments["filters"])

        # Update scoring weights if provided
        if "scoring_weights" in adjustments:
            if "scoring_weights" not in refined_params:
                refined_params["scoring_weights"] = {}
            refined_params["scoring_weights"].update(adjustments["scoring_weights"])

        return refined_params

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process user feedback and refine search parameters."""
        feedback = state.get("feedback", {})
        current_params = state.get("search_parameters", {})

        if not feedback:
            return state

        # Analyze feedback
        adjustments = self._analyze_feedback(feedback)

        # Refine parameters
        refined_params = self._refine_search_parameters(current_params, adjustments)

        # Update state
        state["search_parameters"] = refined_params
        state["feedback_processed"] = True
        state["parameter_adjustments"] = adjustments

        return state
src/agents/job_scraper.py
from typing import Any, Dict, List
from datetime import datetime, timedelta
from .base import BaseAgent
from ..config.settings import (
    ACTOR_IDS,
    JOB_SEARCH_CONFIG,
    INITIAL_SCORING_WEIGHTS
)
from ..models.schema import ResumeData

class JobScraperAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        # NOTE: config.settings defines only "linkedin_scraper" and
        # "indeed_scraper", so fall back to the configured LinkedIn scraper
        # to avoid a KeyError here; a "linkedin_job_detail" entry must be
        # added to ACTOR_IDS before detail fetching will work.
        self.linkedin_search_actor = ACTOR_IDS.get("linkedin_jobs_search", ACTOR_IDS["linkedin_scraper"])
        self.linkedin_detail_actor = ACTOR_IDS.get("linkedin_job_detail")
        self.indeed_scraper = ACTOR_IDS["indeed_scraper"]

    def _get_system_prompt(self) -> str:
        return """
        Job Scraping Agent responsible for:
        1. Initial job search from multiple sources
        2. Preliminary filtering and scoring
        3. Detailed information gathering for promising matches
        """

    def _search_linkedin_jobs(self, search_params: Dict[str, Any]) -> List[Dict]:
        """Perform the initial LinkedIn job search."""
        search_input = {
            "keywords": search_params["keywords"],
            "location": search_params["location"],
            "limit": JOB_SEARCH_CONFIG["initial_results_per_source"]
        }

        return self.run_actor(
            actor_id=self.linkedin_search_actor,
            input_data=search_input
        )

    def _search_indeed_jobs(self, search_params: Dict[str, Any]) -> List[Dict]:
        """Perform the initial Indeed job search."""
        search_input = {
            "keyword": search_params["keywords"],
            "location": search_params["location"],
            "maxResults": JOB_SEARCH_CONFIG["initial_results_per_source"]
        }

        return self.run_actor(
            actor_id=self.indeed_scraper,
            input_data=search_input
        )

    def _calculate_initial_score(self, job: Dict, resume_data: ResumeData) -> float:
        """Calculate a preliminary score for a job listing."""
        scores = {
            "title_match": self._score_title_match(job["title"], resume_data.desired_role),
            "location_match": self._score_location_match(job["location"], resume_data.location_preference),
            "company_relevance": self._score_company_relevance(job["company"], resume_data.industry_experience),
            "posting_date": self._score_posting_date(job.get("posted_date", datetime.now()))
        }

        return sum(
            scores[key] * INITIAL_SCORING_WEIGHTS[key]
            for key in INITIAL_SCORING_WEIGHTS
        )

    def _fetch_job_details(self, job_urls: List[str]) -> List[Dict]:
        """Fetch detailed information for promising job listings."""
        detailed_jobs = []
        for url in job_urls:
            if "linkedin.com" in url and self.linkedin_detail_actor:
                job_details = self.run_actor(
                    actor_id=self.linkedin_detail_actor,
                    input_data={"url": url}
                )
                detailed_jobs.extend(job_details)
            # Add similar handling for Indeed URLs if needed

        return detailed_jobs

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process the job search workflow."""
        # resume_data is stored in the state as a plain dict; rehydrate the model
        resume_data = state["resume_data"]
        if isinstance(resume_data, dict):
            resume_data = ResumeData(**resume_data)

        # "feedback" is optional in the state, so read it defensively
        work_mode_preference = state.get("feedback", {}).get("work_mode_preference", "Any")

        # Prepare search parameters
        search_params = {
            "keywords": resume_data.desired_role,
            "location": resume_data.location_preference
        }

        # Add work mode to keywords if specified
        if work_mode_preference and work_mode_preference != "Any":
            search_params["keywords"] += f" {work_mode_preference}"

        # Initial job search from multiple sources
        linkedin_jobs = self._search_linkedin_jobs(search_params)
        indeed_jobs = self._search_indeed_jobs(search_params)

        # Combine and score initial results
        all_jobs = []
        for job in linkedin_jobs + indeed_jobs:
            # Normalize job data structure
            normalized_job = self._normalize_job_data(job)

            # Calculate initial score
            score = self._calculate_initial_score(normalized_job, resume_data)
            normalized_job["initial_score"] = score
            all_jobs.append(normalized_job)

        # Sort by score and filter
        promising_jobs = sorted(
            [j for j in all_jobs if j["initial_score"] >= JOB_SEARCH_CONFIG["detail_fetch_threshold"]],
            key=lambda x: x["initial_score"],
            reverse=True
        )[:JOB_SEARCH_CONFIG["max_details_to_fetch"]]

        # Fetch detailed information for promising jobs
        job_urls = [job["url"] for job in promising_jobs if "url" in job]
        detailed_jobs = self._fetch_job_details(job_urls)

        # If we couldn't get detailed jobs, use the promising jobs
        if not detailed_jobs:
            detailed_jobs = promising_jobs

        # Update state
        state["jobs_scraped"] = True
        state["job_listings"] = detailed_jobs

        return state

    def _normalize_job_data(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize job data from different sources to a common format."""
        normalized = {
            "job_id": job.get("id", job.get("jobId", str(hash(str(job))))),
            "title": job.get("title", job.get("name", "Unknown Position")),
            "company": job.get("company", job.get("companyName", "Unknown Company")),
            "location": job.get("location", job.get("place", "Unknown Location")),
            "url": job.get("url", job.get("link", job.get("applicationLink", ""))),
            "source": "LinkedIn" if "linkedin" in job.get("url", "") else "Indeed"
        }

        # Handle the posted date
        posted_date = job.get("postedDate", job.get("date", job.get("listedAt", None)))
        if isinstance(posted_date, str):
            try:
                normalized["posted_date"] = datetime.fromisoformat(posted_date.replace('Z', '+00:00'))
            except (ValueError, TypeError):
                normalized["posted_date"] = datetime.now() - timedelta(days=7)  # Default to 1 week ago
        else:
            normalized["posted_date"] = datetime.now() - timedelta(days=7)

        return normalized

    def _score_title_match(self, job_title: str, desired_role: str) -> float:
        """Score how well the job title matches the desired role."""
        # Implement fuzzy matching or keyword matching logic
        job_title = job_title.lower()
        desired_role = desired_role.lower()

        if job_title == desired_role:
            return 1.0
        elif desired_role in job_title or job_title in desired_role:
            return 0.8
        # Add more sophisticated matching logic here
        return 0.4

    def _score_location_match(self, job_location: str, preferred_location: str) -> float:
        """Score how well the job location matches preferences."""
        # Implement location matching logic
        if not job_location or not preferred_location:
            return 0.5

        job_location = job_location.lower()
        preferred_location = preferred_location.lower()

        if "remote" in job_location:
            return 1.0
        elif preferred_location in job_location:
            return 1.0
        # Add more sophisticated location matching logic here
        return 0.5

    def _score_company_relevance(self, company: str, industry_experience: List[str]) -> float:
        """Score company relevance based on industry experience."""
        # Implement company/industry matching logic
        # This could be enhanced with company industry data
        return 0.7  # Default score, improve with better matching logic

    def _score_posting_date(self, posted_date: datetime) -> float:
        """Score a job based on how recently it was posted."""
        days_old = (datetime.now() - posted_date).days
        if days_old <= 7:
            return 1.0
        elif days_old <= 14:
            return 0.8
        elif days_old <= 21:
            return 0.6
        else:
            return 0.4
src/agents/manager.py
from typing import Dict, Any, List
from langchain_openai import ChatOpenAI

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL
from src.models.schema import JobSearchState

class ManagerAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Manager Agent responsible for:
        1. Orchestrating the job search workflow
        2. Determining next steps based on current state
        3. Handling errors and retries
        4. Monitoring progress and completion
        """

    def _determine_next_step(self, state: JobSearchState) -> str:
        """Determine the next step in the workflow based on the current state."""
        if not state.resume_data and not state.resume_parsed:
            return "parse_resume"

        if not state.job_listings and not state.scraping_complete:
            return "search_jobs"

        if state.job_listings and not state.scored_listings:
            return "score_jobs"

        if state.scored_listings and not state.notification_complete:
            return "send_notifications"

        return "complete"

    def _check_error_conditions(self, state: JobSearchState) -> List[str]:
        """Check for error conditions in the current state."""
        errors = []

        if state.error_log:
            errors.extend(state.error_log)

        if not state.resume_text:
            errors.append("Missing resume text")

        if state.scraping_complete and not state.job_listings:
            errors.append("No jobs found during search")

        return errors

    def _handle_retry_logic(self, state: Dict[str, Any], errors: List[str]) -> Dict[str, Any]:
        """Handle retry logic for failed operations.

        Takes the raw state dict rather than JobSearchState, because the
        pydantic model does not define a retry_count field.
        """
        retry_actions = {}

        for error in errors:
            if "API rate limit" in error:
                retry_actions["wait_time"] = 60
                retry_actions["retry_count"] = state.get("retry_count", 0) + 1
            elif "No jobs found" in error:
                retry_actions["broaden_search"] = True
                retry_actions["retry_count"] = state.get("retry_count", 0) + 1

        return retry_actions

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process the current state and determine the next actions."""
        # Convert state to JobSearchState for type checking
        job_state = JobSearchState(**state)

        # Check for errors
        errors = self._check_error_conditions(job_state)

        if errors:
            # Handle retries if needed; pass the raw dict so retry_count
            # survives round trips through the pydantic model
            retry_actions = self._handle_retry_logic(state, errors)
            state.update(retry_actions)

            if retry_actions.get("retry_count", 0) > 3:
                state["status"] = "failed"
                state["error_message"] = "Max retries exceeded"
                return state

        # Determine next step
        next_step = self._determine_next_step(job_state)

        # Update state
        state["current_phase"] = next_step
        state["next_step"] = next_step

        if next_step == "complete":
            state["status"] = "completed"

        return state
src/agents/notification.py
from typing import Dict, Any, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL
from src.models.schema import ScoredJobListing

class NotificationAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Notification Agent responsible for:
        1. Generating personalized job match summaries
        2. Creating detailed match explanations
        3. Formatting notifications for delivery
        """

    def _generate_job_summary(self, job: ScoredJobListing) -> str:
        """Generate a human-readable summary of a job match."""
        summary = f"""
🎯 {job.title} at {job.company}
📍 {job.location} {'(Remote)' if job.remote_status else ''}
💼 Match Score: {job.total_score}%

Key Matches:
- Position: {job.match_details['position_match']}
- Skills & Experience: {job.match_details['skills_experience']}
- Location: {job.match_details['location']}
- Company: {job.match_details['company']}
{f"- Salary: {job.salary_info}" if job.salary_info else ""}

Apply here: {job.application_url}
        """
        return summary.strip()

    def _create_batch_summary(self, jobs: List[ScoredJobListing], resume_data: Dict[str, Any]) -> str:
        """Create a summary for a batch of job matches."""
        total_jobs = len(jobs)
        high_matches = len([j for j in jobs if j.total_score >= 80])
        avg_score = sum(j.total_score for j in jobs) / total_jobs if total_jobs > 0 else 0

        summary = f"""
📊 Job Search Results Summary
============================
🎯 Searching for: {resume_data.get('desired_role', 'Not specified')}
📍 Location: {resume_data.get('location_preference', 'Not specified')}

📈 Match Statistics:
- Total Jobs Found: {total_jobs}
- High Match Jobs (80%+): {high_matches}
- Average Match Score: {avg_score:.1f}%

🔝 Top Matches:
"""

        # Add the top 5 matches
        for i, job in enumerate(jobs[:5], 1):
            summary += f"\n{i}. {job.title} at {job.company}"
            summary += f"\n   Match Score: {job.total_score}%"
            summary += f"\n   Location: {job.location}"
            if job.salary_info:
                summary += f"\n   Salary: {job.salary_info}"
            summary += "\n"

        return summary.strip()

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process scored job listings and generate notifications."""
        scored_listings = state.get("scored_listings", [])
        resume_data = state.get("resume_data", {})

        if not scored_listings:
            state["notifications"] = []
            state["batch_summary"] = "No job matches found."
            return state

        # Convert to ScoredJobListing objects
        job_objects = [ScoredJobListing(**job) for job in scored_listings]

        # Generate individual job summaries
        notifications = []
        for job in job_objects:
            summary = self._generate_job_summary(job)
            notifications.append({
                "job_id": job.job_id,
                "summary": summary,
                "score": job.total_score
            })

        # Generate the batch summary
        batch_summary = self._create_batch_summary(job_objects, resume_data)

        # Update state
        state["notifications"] = notifications
        state["batch_summary"] = batch_summary
        state["notification_complete"] = True

        return state
src/agents/relevance_scorer.py
from typing import Dict, Any, List
import logging

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL, FINAL_SCORING_WEIGHTS
from src.models.schema import JobListing, ScoredJobListing

from langchain_openai import ChatOpenAI

logger = logging.getLogger(__name__)

class RelevanceScorerAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Relevance Scorer Agent responsible for:
        1. Analyzing job details against resume
        2. Calculating match scores for skills, experience, location
        3. Ranking job opportunities by relevance
        """

    def _calculate_position_match(self, job_title: str, desired_role: str) -> int:
        """Calculate position match score (0-100)."""
        if not job_title or not desired_role:
            return 0

        job_title = job_title.lower()
        desired_role = desired_role.lower()

        # Exact match
        if job_title == desired_role:
            return 100

        # Partial match
        if desired_role in job_title or job_title in desired_role:
            return 80

        # Check for common keywords
        common_keywords = ["engineer", "developer", "manager", "analyst", "designer", "specialist"]
        job_keywords = [kw for kw in common_keywords if kw in job_title]
        role_keywords = [kw for kw in common_keywords if kw in desired_role]

        common_count = len(set(job_keywords).intersection(set(role_keywords)))
        if common_count > 0:
            return 60

        return 30  # Default low match

    def _calculate_skills_match(self, job_description: str, skills: List[str]) -> int:
        """Calculate skills match score (0-100)."""
        if not job_description or not skills:
            return 0

        job_description = job_description.lower()
        matched_skills = [skill for skill in skills if skill.lower() in job_description]

        match_percentage = (len(matched_skills) / len(skills)) * 100
        return min(int(match_percentage), 100)

    def _calculate_location_match(self, job_location: str, preferred_location: str, remote: bool) -> int:
        """Calculate location match score (0-100)."""
        if not job_location:
            return 0

        # If the job is remote and the candidate prefers remote
        if remote and "remote" in preferred_location.lower():
            return 100

        job_location = job_location.lower()
        preferred_location = preferred_location.lower()

        # Exact location match
        if preferred_location in job_location or job_location in preferred_location:
            return 100

        # Check for city/state match
        location_parts = preferred_location.split(',')
        for part in location_parts:
            part = part.strip()
            if part and part in job_location:
                return 80

        return 40  # Default low match

    def _score_job(self, job: Dict[str, Any], resume_data: Dict[str, Any]) -> Dict[str, Any]:
        """Score a job listing against resume data."""
        # Extract job details
        job_title = job.get("title", "")
        job_description = job.get("description", "")
        job_location = job.get("location", "")
        company = job.get("company", "")
        remote = job.get("remote_status", False)

        # Extract resume details
        desired_role = resume_data.get("desired_role", "")
        skills = resume_data.get("skills", [])
        preferred_location = resume_data.get("location_preference", "")

        # Calculate component scores
        position_score = self._calculate_position_match(job_title, desired_role)
        skills_score = self._calculate_skills_match(job_description, skills)
        location_score = self._calculate_location_match(job_location, preferred_location, remote)
        company_score = 70  # Default company score

        # Apply weights from settings
        weighted_scores = {
            "position_match": position_score * FINAL_SCORING_WEIGHTS["position_match"],
            "skills_experience": skills_score * FINAL_SCORING_WEIGHTS["skills_experience"],
            "location": location_score * FINAL_SCORING_WEIGHTS["location"],
            "company": company_score * FINAL_SCORING_WEIGHTS["company"]
        }

        # Calculate total score (0-100)
        total_score = sum(weighted_scores.values())

        # Create match details
        match_details = {
            "position_match": f"{position_score}% match with desired role '{desired_role}'",
            "skills_experience": f"{skills_score}% of skills match the job requirements",
            "location": f"{location_score}% location match with preference '{preferred_location}'",
            "company": f"{company_score}% company match"
        }

        return {
            "total_score": int(total_score),
            "score_breakdown": weighted_scores,
            "match_details": match_details
        }

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process job listings and score them against resume data."""
        job_listings = state.get("job_listings", [])
        resume_data = state.get("resume_data", {})

        if not job_listings:
            state["scored_listings"] = []
            return state

        if not resume_data:
            state["error_log"] = state.get("error_log", []) + ["Missing resume data for scoring"]
            return state

        # Score each job
        scored_jobs = []
        for job in job_listings:
            try:
                # Standardize the job data format if needed
                standardized_job = self._standardize_job_data(job)

                # Skip jobs with missing essential data
                if not standardized_job.get("title") or not standardized_job.get("company"):
                    continue

                # Score the job
                score_data = self._score_job(standardized_job, resume_data)

                # Create a ScoredJobListing
                scored_job = ScoredJobListing(
                    job_id=standardized_job.get("job_id", ""),
                    title=standardized_job.get("title", ""),
                    company=standardized_job.get("company", ""),
                    location=standardized_job.get("location", ""),
                    remote_status=standardized_job.get("remote_status", False),
                    description=standardized_job.get("description", ""),
                    required_skills=standardized_job.get("required_skills", []),
                    salary_info=standardized_job.get("salary_info", ""),
                    posted_date=standardized_job.get("posted_date", ""),
                    application_url=standardized_job.get("application_url", ""),
                    source=standardized_job.get("source", ""),
                    total_score=score_data["total_score"],
                    score_breakdown=score_data["score_breakdown"],
                    match_details=score_data["match_details"]
                )

                scored_jobs.append(scored_job.dict())

            except Exception as e:
                logger.error(f"Error scoring job: {str(e)}")
                continue

        # Sort by score (descending)
        scored_jobs.sort(key=lambda x: x["total_score"], reverse=True)

        # Take the top 10 matches
        top_matches = scored_jobs[:10]

        # Update state
        state["scored_listings"] = top_matches

        return state

    def _standardize_job_data(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
        """Standardize job data from different sources."""
        # Handle the case where job_data is a list
        if isinstance(job_data, list):
            if not job_data:
                return {}
            job_data = job_data[0]  # Take the first item

        # Check if the job data is in LinkedIn format
        if "job_title" in job_data:
            return {
                "job_id": job_data.get("job_id", ""),
                "title": job_data.get("job_title", ""),
                "company": job_data.get("company_name", ""),
                "location": job_data.get("job_location", ""),
                "remote_status": "remote" in job_data.get("job_location", "").lower(),
                "description": job_data.get("job_description", ""),
                "required_skills": job_data.get("job_skills", []),
                "salary_info": job_data.get("salary_range", ""),
                "posted_date": job_data.get("posted_date", ""),
                "application_url": job_data.get("job_url", ""),
                "source": "LinkedIn"
            }

        # Check if the job data is in Indeed format
        elif "jobTitle" in job_data:
            return {
                "job_id": job_data.get("jobId", ""),
                "title": job_data.get("jobTitle", ""),
                "company": job_data.get("companyName", ""),
                "location": job_data.get("location", ""),
                "remote_status": "remote" in job_data.get("location", "").lower(),
                "description": job_data.get("description", ""),
                "required_skills": [],  # Indeed doesn't provide skills directly
                "salary_info": job_data.get("salary", ""),
                "posted_date": job_data.get("date", ""),
                "application_url": job_data.get("url", ""),
                "source": "Indeed"
            }

        # Default case - return as is
        return job_data
src/agents/resume_parser.py
from typing import Dict, Any
import logging
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
import json

# Use absolute imports
from src.agents.base import BaseAgent
from src.config.settings import OPENAI_API_KEY, OPENAI_MODEL
from src.models.schema import ResumeData

logger = logging.getLogger(__name__)

class ResumeParserAgent(BaseAgent):
    def __init__(self, apify_client):
        super().__init__(apify_client)
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model=OPENAI_MODEL,
            temperature=0.2
        )

    def _get_system_prompt(self) -> str:
        return """
        Resume Parser Agent responsible for:
        1. Extracting structured information from resume text
        2. Identifying key skills and experience
        3. Determining job search parameters
        """

    def _extract_basic_info(self, resume_text: str) -> Dict[str, Any]:
        """Extract basic information from resume text."""
        try:
            system_message = SystemMessage(content="""
            You are a resume parsing expert. Extract the following information from the resume:
            - Desired role (based on most recent experience or stated objective)
            - Location preference (if mentioned)
            - Total years of experience
            - Skills (technical and soft skills)
            - Industry experience
            - Work experience (list of jobs with title, company, duration)
            - Education (list of degrees with institution, year)

            Format your response as a JSON object.
            """)

            human_message = HumanMessage(content=resume_text)

            # Reuse the agent's ChatOpenAI client rather than constructing a new one
            response = self.llm.invoke([system_message, human_message])

            # Extract JSON from the response
            response_text = response.content

            # Find JSON in the response
            json_start = response_text.find('{')
            json_end = response_text.rfind('}') + 1

            if json_start >= 0 and json_end > json_start:
                json_str = response_text[json_start:json_end]
                return json.loads(json_str)
            else:
                logger.warning("Could not extract JSON from OpenAI response")
                return self._basic_extraction_fallback(resume_text)

        except Exception as e:
            logger.error(f"Error extracting resume info: {str(e)}")
            return self._basic_extraction_fallback(resume_text)

    def _basic_extraction_fallback(self, resume_text: str) -> Dict[str, Any]:
        """Basic fallback extraction when OpenAI fails."""
        # Extract some basic info using simple heuristics
        lines = resume_text.split('\n')
        skills = []

        for line in lines:
            if "skills" in line.lower() and ":" in line:
                skills_text = line.split(":", 1)[1].strip()
                skills = [s.strip() for s in skills_text.split(',')]
                break

        return {
            "desired_role": "Software Engineer",  # Default
            "location_preference": "Remote",  # Default
            "total_years_experience": 3,  # Default
            "skills": skills[:10] if skills else ["Python", "JavaScript"],
            "industry_experience": ["Technology"],
            "experience": [
                {
                    "title": "Software Engineer",
                    "company": "Unknown",
                    "duration": "3 years"
                }
            ],
            "education": [
                {
                    "degree": "Bachelor's Degree",
                    "institution": "Unknown University",
                    "year": 2020
                }
            ]
        }

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Process resume text and extract information."""
        resume_text = state.get("resume_text", "")

        if not resume_text:
            state["error_log"] = state.get("error_log", []) + ["Missing resume text"]
            return state

        # Extract resume information
        resume_info = self._extract_basic_info(resume_text)

        # Create a ResumeData instance
        resume_data = ResumeData(
            skills=resume_info.get("skills", []),
            work_experience=resume_info.get("experience", []),
            education=resume_info.get("education", []),
            desired_role=resume_info.get("desired_role", ""),
            location_preference=resume_info.get("location_preference", ""),
            industry_experience=resume_info.get("industry_experience", []),
            total_years_experience=resume_info.get("total_years_experience", 0)
        )

        # Update state
        state["resume_data"] = resume_data.dict()
        state["resume_parsed"] = True

        return state
src/agents/__init__.py
1"""Agent module for job search workflow."""
2
3# Agents package initialization
4from .base import BaseAgent
5from .resume_parser import ResumeParserAgent
6from .job_scraper import JobScraperAgent
7from .relevance_scorer import RelevanceScorerAgent
8from .feedback_refiner import FeedbackRefinerAgent
9from .notification import NotificationAgent
10from .manager import ManagerAgent
11
12__all__ = [
13 'BaseAgent',
14 'ResumeParserAgent',
15 'JobScraperAgent',
16 'RelevanceScorerAgent',
17 'FeedbackRefinerAgent',
18 'NotificationAgent',
19 'ManagerAgent'
20]
src/models/schema.py
from typing import Dict, Any, List, Optional
from pydantic import BaseModel

class WorkExperience(BaseModel):
    """Work experience entry in a resume."""
    title: str
    company: str
    duration: str
    description: Optional[str] = None

class Education(BaseModel):
    """Education entry in a resume."""
    degree: str
    institution: str
    year: int
    area: Optional[str] = None

class ResumeData(BaseModel):
    """Structured data extracted from a resume."""
    skills: List[str] = []
    work_experience: List[Dict[str, Any]] = []
    education: List[Dict[str, Any]] = []
    desired_role: str = ""
    location_preference: str = ""
    industry_experience: List[str] = []
    total_years_experience: int = 0

class JobListing(BaseModel):
    """Job listing data structure."""
    job_id: str = ""
    title: str = ""
    company: str = ""
    location: str = ""
    remote_status: bool = False
    description: str = ""
    required_skills: List[str] = []
    salary_info: str = ""
    posted_date: str = ""
    application_url: str = ""
    source: str = ""

class ScoredJobListing(JobListing):
    """Job listing with relevance score."""
    total_score: int = 0
    score_breakdown: Dict[str, float] = {}
    match_details: Dict[str, str] = {}

class JobSearchState(BaseModel):
    """State for the job search workflow."""
    resume_text: str = ""
    resume_parsed: bool = False
    scraping_complete: bool = False
    current_phase: str = ""
    next_step: str = ""
    resume_data: Dict[str, Any] = {}
    job_listings: List[Dict[str, Any]] = []
    scored_listings: List[Dict[str, Any]] = []
    notifications: List[Dict[str, Any]] = []
    batch_summary: str = ""
    notification_complete: bool = False
    error_log: List[str] = []
src/models/__init__.py
1"""Models module for job search data structures."""
2
3# Models package initialization
4from .schema import (
5 WorkExperience,
6 Education,
7 ResumeData,
8 JobListing,
9 ScoredJobListing,
10 JobSearchState
11)
12
13__all__ = [
14 'WorkExperience',
15 'Education',
16 'ResumeData',
17 'JobListing',
18 'ScoredJobListing',
19 'JobSearchState'
20]
src/utils/retry.py
1"""Retry utility for handling transient errors."""
2
3import time
4import logging
5import functools
6from typing import Callable, Any, Optional
7
8# Use absolute imports
9from src.config.settings import RETRY_ATTEMPTS, RETRY_DELAY
10
11logger = logging.getLogger(__name__)
12
13def with_retry(max_attempts: Optional[int] = None, delay: Optional[int] = None):
14 """
15 Decorator for retrying a function if it raises an exception.
16
17 Args:
18 max_attempts: Maximum number of attempts (default from settings)
19 delay: Delay between attempts in seconds (default from settings)
20 """
21 max_attempts = max_attempts or RETRY_ATTEMPTS
22 delay = delay or RETRY_DELAY
23
24 def decorator(func: Callable) -> Callable:
25 @functools.wraps(func)
26 def wrapper(*args, **kwargs) -> Any:
27 attempts = 0
28 while attempts < max_attempts:
29 try:
30 return func(*args, **kwargs)
31 except Exception as e:
32 attempts += 1
33 if attempts >= max_attempts:
34 logger.error(f"Function {func.__name__} failed after {attempts} attempts: {str(e)}")
35 raise
36
37 logger.warning(f"Attempt {attempts} failed for {func.__name__}: {str(e)}. Retrying in {delay} seconds...")
38 time.sleep(delay)
39
40 # This should never be reached due to the raise in the except block
41 return None
42
43 return wrapper
44
45 return decorator
src/utils/__init__.py
1"""Utility functions for the job search actor."""
2
3# Utils package initialization
4from .retry import with_retry
5
6__all__ = ['with_retry']