Calculator MCP Server
Pricing
Pay per usage
Go to Store
Calculator MCP Server
0.0 (0)
Pricing
Pay per usage
0
Total users
3
Monthly users
2
Runs succeeded
>99%
Last modified
a day ago
.dockerignore
.git.mise.toml.nvim.luastorage
# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore
# Byte-compiled / optimized / DLL files__pycache__/*.py[cod]*$py.class
# C extensions*.so
# Distribution / packaging.Pythonbuild/develop-eggs/dist/downloads/eggs/.eggs/lib/lib64/parts/sdist/var/wheels/share/python-wheels/*.egg-info/.installed.cfg*.eggMANIFEST
# PyInstaller# Usually these files are written by a python script from a template# before PyInstaller builds the exe, so as to inject date/other infos into it.*.manifest*.spec
# Installer logspip-log.txtpip-delete-this-directory.txt
# Unit test / coverage reportshtmlcov/.tox/.nox/.coverage.coverage.*.cachenosetests.xmlcoverage.xml*.cover*.py,cover.hypothesis/.pytest_cache/cover/
# Translations*.mo*.pot
# Django stuff:*.loglocal_settings.pydb.sqlite3db.sqlite3-journal
# Flask stuff:instance/.webassets-cache
# Scrapy stuff:.scrapy
# Sphinx documentationdocs/_build/
# PyBuilder.pybuilder/target/
# Jupyter Notebook.ipynb_checkpoints
# IPythonprofile_default/ipython_config.py
# pyenv# For a library or package, you might want to ignore these files since the code is# intended to run in multiple environments; otherwise, check them in:.python-version
# pdm# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.#pdm.lock# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it# in version control.# https://pdm.fming.dev/latest/usage/project/#working-with-version-control.pdm.toml.pdm-python.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm__pypackages__/
# Celery stuffcelerybeat-schedulecelerybeat.pid
# SageMath parsed files*.sage.py
# Environments.env.venvenv/venv/ENV/env.bak/venv.bak/
# Spyder project settings.spyderproject.spyproject
# Rope project settings.ropeproject
# mkdocs documentation/site
# mypy.mypy_cache/.dmypy.jsondmypy.json
# Pyre type checker.pyre/
# pytype static type analyzer.pytype/
# Cython debug symbolscython_debug/
# PyCharm# JetBrains specific template is maintained in a separate JetBrains.gitignore that can# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore# and can be added to the global gitignore or merged into this file. For a more nuclear# option (not recommended) you can uncomment the following to ignore the entire idea folder..idea/
# Visual Studio Code# Ignores the folder created by VS Code when changing workspace settings, doing debugger# configuration, etc. Can be commented out to share Workspace Settings within a team.vscode
# Zed editor# Ignores the folder created when setting Project Settings in the Zed editor. Can be commented out# to share Project Settings within a team.zed
.gitignore
.mise.toml.nvim.luastorage
# The rest is copied from https://github.com/github/gitignore/blob/main/Python.gitignore
# Byte-compiled / optimized / DLL files__pycache__/*.py[cod]*$py.class
# C extensions*.so
# Distribution / packaging.Pythonbuild/develop-eggs/dist/downloads/eggs/.eggs/lib/lib64/parts/sdist/var/wheels/share/python-wheels/*.egg-info/.installed.cfg*.eggMANIFEST
# PyInstaller# Usually these files are written by a python script from a template# before PyInstaller builds the exe, so as to inject date/other infos into it.*.manifest*.spec
# Installer logspip-log.txtpip-delete-this-directory.txt
# Unit test / coverage reportshtmlcov/.tox/.nox/.coverage.coverage.*.cachenosetests.xmlcoverage.xml*.cover*.py,cover.hypothesis/.pytest_cache/cover/
# Translations*.mo*.pot
# Django stuff:*.loglocal_settings.pydb.sqlite3db.sqlite3-journal
# Flask stuff:instance/.webassets-cache
# Scrapy stuff:.scrapy
# Sphinx documentationdocs/_build/
# PyBuilder.pybuilder/target/
# Jupyter Notebook.ipynb_checkpoints
# IPythonprofile_default/ipython_config.py
# pyenv# For a library or package, you might want to ignore these files since the code is# intended to run in multiple environments; otherwise, check them in:.python-version
# pdm# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.#pdm.lock# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it# in version control.# https://pdm.fming.dev/latest/usage/project/#working-with-version-control.pdm.toml.pdm-python.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm__pypackages__/
# Celery stuffcelerybeat-schedulecelerybeat.pid
# SageMath parsed files*.sage.py
# Environments.env.venvenv/venv/ENV/env.bak/venv.bak/
# Spyder project settings.spyderproject.spyproject
# Rope project settings.ropeproject
# mkdocs documentation/site
# mypy.mypy_cache/.dmypy.jsondmypy.json
# Pyre type checker.pyre/
# pytype static type analyzer.pytype/
# Cython debug symbolscython_debug/
# PyCharm# JetBrains specific template is maintained in a separate JetBrains.gitignore that can# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore# and can be added to the global gitignore or merged into this file. For a more nuclear# option (not recommended) you can uncomment the following to ignore the entire idea folder..idea/
# Visual Studio Code# Ignores the folder created by VS Code when changing workspace settings, doing debugger# configuration, etc. Can be commented out to share Workspace Settings within a team.vscode
# Zed editor# Ignores the folder created when setting Project Settings in the Zed editor. Can be commented out# to share Project Settings within a team.zed
# Added by Apify CLInode_modules
Dockerfile
# First, specify the base Docker image.# You can see the Docker images from Apify at https://hub.docker.com/r/apify/.# You can also use any other image from Docker Hub.FROM apify/actor-python:3.13
# Second, copy just requirements.txt into the Actor image,# since it should be the only file that affects the dependency installation in the next step,# in order to speed up the build.COPY requirements.txt ./
# Install the packages specified in requirements.txt,# print the installed Python version, pip version,# and all installed packages with their versions for debugging.RUN echo "Python version:" \ && python --version \ && echo "Pip version:" \ && pip --version \ && echo "Installing dependencies:" \ && pip install -r requirements.txt \ && echo "All installed Python packages:" \ && pip freeze
# Next, copy the remaining files and directories with the source code.# Since we do this after installing the dependencies, quick builds will be really fast# for most source file changes.COPY . ./
# Use compileall to ensure the runnability of the Actor Python code.RUN python3 -m compileall -q src/
# Create and run as a non-root user.RUN useradd --create-home apify && \ chown -R apify:apify ./USER apify
# Specify how to launch the source code of your Actor.# By default, the "python3 -m ." command is run.CMD ["python3", "-m", "src"]
requirements.txt
1apify<3.0.02apify_client<2.0.03mcp-server-calculator==0.2.04fastapi==0.116.05mcp==1.10.16pydantic>=2.0.07sse-starlette>=1.8.08uv>=0.7.89uvicorn>=0.27.0
.actor/actor.json
{ "actorSpecification": 1, "name": "calculator-MCP-server", "title": "Python MCP server", "description": "A Model Context Protocol server for calculating. This server enables LLMs to use calculator for precise numerical calculations.", "version": "0.0", "buildTag": "latest", "usesStandbyMode": true, "meta": { "templateId": "python-mcp-server" }, "input": { "title": "Actor input schema", "description": "This is Actor input schema", "type": "object", "schemaVersion": 1, "properties": {}, "required": [] }, "dockerfile": "../Dockerfile", "webServerMcpPath": "/sse,/mcp"}
.actor/pay_per_event.json
{ "actor-start": { "eventTitle": "MCP server startup", "eventDescription": "Initial fee for starting the Actor MCP Server", "eventPriceUsd": 0.1 }, "resource-list": { "eventTitle": "MCP resource listing", "eventDescription": "Fee for listing available resources.", "eventPriceUsd": 0.0001 }, "resource-read": { "eventTitle": "MCP resource access", "eventDescription": "Fee for accessing full content or resources.", "eventPriceUsd": 0.001 }, "prompt-list": { "eventTitle": "MCP prompt listing", "eventDescription": "Fee for listing available prompts.", "eventPriceUsd": 0.0001 }, "prompt-get": { "eventTitle": "MCP prompt processing", "eventDescription": "Fee for processing AI prompts.", "eventPriceUsd": 0.001 }, "tool-list": { "eventTitle": "MCP tool listing", "eventDescription": "Fee for listing available tools.", "eventPriceUsd": 0.0001 }, "tool-call": { "eventTitle": "MCP tool call", "eventDescription": "Fee for executing MCP tools.", "eventPriceUsd": 0.05 }}
src/__init__.py
1
src/__main__.py
1import asyncio2
3from .main import main4
5# Execute the Actor entry point.6asyncio.run(main())
src/const.py
1from enum import Enum2
3
4class ChargeEvents(str, Enum):5 """Event types for charging MCP operations.6
7 These events are used to charge users for different types of MCP operations8 when running as an Apify Actor. Each event corresponds to a specific operation9 that can be charged for, such as tool calls, resource access, or prompt operations.10 """11
12 ACTOR_START = 'actor-start'13 RESOURCE_LIST = 'resource-list'14 RESOURCE_READ = 'resource-read'15 PROMPT_LIST = 'prompt-list'16 PROMPT_GET = 'prompt-get'17 TOOL_LIST = 'tool-list'18 TOOL_CALL = 'tool-call'
src/event_store.py
1# Source https://github.com/modelcontextprotocol/python-sdk/blob/3978c6e1b91e8830e82d97ab3c4e3b6559972021/examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/event_store.py2"""In-memory event store for demonstrating resumability functionality.3
4This is a simple implementation intended for examples and testing,5not for production use where a persistent storage solution would be more appropriate.6"""7
8import logging9from collections import deque10from dataclasses import dataclass11from uuid import uuid412
13from mcp.server.streamable_http import (14 EventCallback,15 EventId,16 EventMessage,17 EventStore,18 StreamId,19)20from mcp.types import JSONRPCMessage21
22logger = logging.getLogger(__name__)23
24
25@dataclass26class EventEntry:27 """Represents an event entry in the event store."""28
29 event_id: EventId30 stream_id: StreamId31 message: JSONRPCMessage32
33
34class InMemoryEventStore(EventStore):35 """Simple in-memory implementation of the EventStore interface for resumability.36 This is primarily intended for examples and testing, not for production use37 where a persistent storage solution would be more appropriate.38
39 This implementation keeps only the last N events per stream for memory efficiency.40 """ # noqa: D20541
42 def __init__(self, max_events_per_stream: int = 100): # noqa: ANN20443 """Initialize the event store.44
45 Args:46 max_events_per_stream: Maximum number of events to keep per stream47 """48 self.max_events_per_stream = max_events_per_stream49 # for maintaining last N events per stream50 self.streams: dict[StreamId, deque[EventEntry]] = {}51 # event_id -> EventEntry for quick lookup52 self.event_index: dict[EventId, EventEntry] = {}53
54 async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:55 """Stores an event with a generated event ID.""" # noqa: D40156 event_id = str(uuid4())57 event_entry = EventEntry(event_id=event_id, stream_id=stream_id, message=message)58
59 # Get or create deque for this stream60 if stream_id not in self.streams:61 self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)62
63 # If deque is full, the oldest event will be automatically removed64 # We need to remove it from the event_index as well65 if len(self.streams[stream_id]) == self.max_events_per_stream:66 oldest_event = self.streams[stream_id][0]67 self.event_index.pop(oldest_event.event_id, None)68
69 # Add new event70 self.streams[stream_id].append(event_entry)71 self.event_index[event_id] = event_entry72
73 return event_id74
75 async def replay_events_after(76 self,77 last_event_id: EventId,78 send_callback: EventCallback,79 ) -> StreamId | None:80 """Replays events that occurred after the specified event ID."""81 if last_event_id not in self.event_index:82 logger.warning(f'Event ID {last_event_id} not found in store')83 return None84
85 # Get the stream and find events after the last one86 last_event = self.event_index[last_event_id]87 stream_id = last_event.stream_id88 stream_events = self.streams.get(last_event.stream_id, deque())89
90 # Events in deque are already in chronological order91 found_last = False92 for event in stream_events:93 if found_last:94 await send_callback(EventMessage(event.message, event.event_id))95 elif event.event_id == last_event_id:96 found_last = True97
98 return stream_id
src/main.py
1"""Main entry point for the MCP Server Actor."""2
3import os4
5from apify import Actor6
7from .const import ChargeEvents8from .server import ProxyServer9
10# Actor configuration11STANDBY_MODE = os.environ.get('APIFY_META_ORIGIN') == 'STANDBY'12# Bind to all interfaces (0.0.0.0) as this is running in a containerized environment (Apify Actor)13# The container's network is isolated, so this is safe14HOST = '0.0.0.0' # noqa: S104 - Required for container networking in Apify platform15PORT = (Actor.is_at_home() and int(os.environ.get('ACTOR_STANDBY_PORT'))) or 500116
17# EDIT THIS SECTION ------------------------------------------------------------18# Configuration constants - You need to override these values. You can also pass environment variables if needed.19# 1) For stdio server type, you need to provide the command and args20from mcp.client.stdio import StdioServerParameters # noqa: E40221
22MCP_SERVER_PARAMS = StdioServerParameters(23 command='uv',24 args=['run', 'mcp-server-calculator'],25 env={'YOUR-ENV_VAR': os.getenv('YOUR-ENV-VAR') or ''}, # Optional environment variables26)27
28# 2) For SSE server type, you need to provide the url, you can also specify headers if needed with Authorization29# from .models import SseServerParameters # noqa: ERA00130#31# MCP_SERVER_PARAMS = SseServerParameters( # noqa: ERA001, RUF10032# url='https://actors-mcp-server.apify.actor/sse', # noqa: ERA00133# headers={'Authorization': 'YOUR-API-KEY'}, # Optional headers, e.g., for authentication # noqa: ERA00134# ) # noqa: ERA001, RUF10035# ------------------------------------------------------------------------------36
37
38async def main() -> None:39 """Run the MCP Server Actor.40
41 This function:42 1. Initializes the Actor43 2. Charges for Actor startup44 3. Creates and starts the proxy server45 4. Configures charging for MCP operations using Actor.charge46
47 The proxy server will charge for different MCP operations like:48 - Tool calls49 - Prompt operations50 - Resource access51 - List operations52
53 Charging events are defined in .actor/pay_per_event.json54 """55 async with Actor:56 # Initialize and charge for Actor startup57 Actor.log.info('Starting MCP Server Actor')58 await Actor.charge(ChargeEvents.ACTOR_START.value)59
60 if not STANDBY_MODE:61 msg = 'This Actor is not meant to be run directly. It should be run in standby mode.'62 Actor.log.error(msg)63 await Actor.exit(status_message=msg)64 return65
66 try:67 # Create and start the server with charging enabled68 url = os.environ.get('ACTOR_STANDBY_URL', HOST)69 Actor.log.info('Starting MCP proxy server')70 Actor.log.info(f' - proxy server host: {os.environ.get("ACTOR_STANDBY_URL", HOST)}')71 Actor.log.info(f' - proxy server port: {PORT}')72
73 Actor.log.info('Put this in your client config to use streamable HTTP transport:')74 Actor.log.info(75 f"""76 {{77 "mcpServers": {{78 "arxiv-mcp-server": {{79 "url": "{url}/mcp",80 }}81 }}82 }}83 """84 )85 Actor.log.info('Put this in your client config to use legacy SSE transport:')86 Actor.log.info(87 f"""88 {{89 "mcpServers": {{90 "arxiv-mcp-server": {{91 "url": "{url}/sse",92 }}93 }}94 }}95 """96 )97 # Pass Actor.charge to enable charging for MCP operations98 # The proxy server will use this to charge for different operations99 proxy_server = ProxyServer(MCP_SERVER_PARAMS, HOST, PORT, actor_charge_function=Actor.charge)100 await proxy_server.start()101 except Exception as e:102 Actor.log.exception(f'Server failed to start: {e}')103 await Actor.exit()104 raise
src/models.py
1from enum import Enum2from typing import Any, TypeAlias3
4import httpx5from mcp.client.stdio import StdioServerParameters6from pydantic import BaseModel, ConfigDict7
8
9class ServerType(str, Enum):10 """Type of server to connect."""11
12 STDIO = 'stdio' # Connect to a stdio server13 SSE = 'sse' # Connect to an SSE server14
15
16class SseServerParameters(BaseModel):17 """Parameters for connecting to an SSE-based MCP server.18
19 Attributes:20 url: The URL of the SSE server endpoint21 headers: Optional HTTP headers to include in the connection request22 """23
24 url: str25 headers: dict[str, Any] | None = None26 timeout: float = 5 # Default timeout for SSE connection27 sse_read_timeout: float = 60 * 5 # Default read timeout for SSE connection28 auth: httpx.Auth | None = None29 model_config = ConfigDict(arbitrary_types_allowed=True)30
31
32# Type alias for server parameters33ServerParameters: TypeAlias = StdioServerParameters | SseServerParameters
src/proxy_server.py
1"""Create an MCP server that proxies requests through an MCP client.2
3This server is created independent of any transport mechanism.4Source: https://github.com/sparfenyuk/mcp-proxy5
6The server can optionally charge for MCP operations using a provided charging function.7This is typically used in Apify Actors to charge users for different types of MCP operations8like tool calls, prompt operations, or resource access.9"""10
11from __future__ import annotations12
13import logging14from typing import TYPE_CHECKING, Any15
16from mcp import server, types17
18from .const import ChargeEvents19
20if TYPE_CHECKING:21 from collections.abc import Callable22
23 from mcp.client.session import ClientSession24
25logger = logging.getLogger('apify')26
27
28async def charge_mcp_operation(29 charge_function: Callable[[str], None] | None,30 event_name: ChargeEvents,31) -> None:32 """Charge for an MCP operation.33
34 Args:35 charge_function: Function to call for charging, or None if charging is disabled36 event_name: The type of event to charge for37 """38 if not charge_function:39 return40
41 try:42 await charge_function(event_name.value)43 logger.info(f'Charged for event {event_name.value}')44 except Exception:45 logger.exception(f'Failed to charge for event {event_name.value}')46 # Don't raise the exception - we want the operation to continue even if charging fails47
48
49async def create_proxy_server( # noqa: PLR091550 client_session: ClientSession,51 actor_charge_function: Callable[[str, dict[str, Any] | None], None] | None = None,52) -> server.Server[object]:53 """Create a server instance from a remote app.54
55 Args:56 client_session: The MCP client session to proxy requests through57 actor_charge_function: Optional function to charge for operations.58 Should accept (event_name: str, params: Optional[dict]).59 Typically, Actor.charge in Apify Actors.60 If None, no charging will occur.61 """62 logger.debug('Sending initialization request to remote MCP server...')63 response = await client_session.initialize()64 capabilities: types.ServerCapabilities = response.capabilities65
66 logger.debug('Configuring proxied MCP server...')67 app: server.Server[object] = server.Server(name=response.serverInfo.name, version=response.serverInfo.version)68
69 if capabilities.prompts:70 logger.debug('Capabilities: adding Prompts...')71
72 async def _list_prompts(_: Any) -> types.ServerResult:73 await charge_mcp_operation(actor_charge_function, ChargeEvents.PROMPT_LIST)74 result = await client_session.list_prompts()75 return types.ServerResult(result)76
77 app.request_handlers[types.ListPromptsRequest] = _list_prompts78
79 async def _get_prompt(req: types.GetPromptRequest) -> types.ServerResult:80 await charge_mcp_operation(actor_charge_function, ChargeEvents.PROMPT_GET)81 result = await client_session.get_prompt(req.params.name, req.params.arguments)82 return types.ServerResult(result)83
84 app.request_handlers[types.GetPromptRequest] = _get_prompt85
86 if capabilities.resources:87 logger.debug('Capabilities: adding Resources...')88
89 async def _list_resources(_: Any) -> types.ServerResult:90 await charge_mcp_operation(actor_charge_function, ChargeEvents.RESOURCE_LIST)91 result = await client_session.list_resources()92 return types.ServerResult(result)93
94 app.request_handlers[types.ListResourcesRequest] = _list_resources95
96 async def _list_resource_templates(_: Any) -> types.ServerResult:97 result = await client_session.list_resource_templates()98 return types.ServerResult(result)99
100 app.request_handlers[types.ListResourceTemplatesRequest] = _list_resource_templates101
102 async def _read_resource(req: types.ReadResourceRequest) -> types.ServerResult:103 await charge_mcp_operation(actor_charge_function, ChargeEvents.RESOURCE_READ)104 result = await client_session.read_resource(req.params.uri)105 return types.ServerResult(result)106
107 app.request_handlers[types.ReadResourceRequest] = _read_resource108
109 if capabilities.logging:110 logger.debug('Capabilities: adding Logging...')111
112 async def _set_logging_level(req: types.SetLevelRequest) -> types.ServerResult:113 await client_session.set_logging_level(req.params.level)114 return types.ServerResult(types.EmptyResult())115
116 app.request_handlers[types.SetLevelRequest] = _set_logging_level117
118 if capabilities.resources:119 logger.debug('Capabilities: adding Resources...')120
121 async def _subscribe_resource(req: types.SubscribeRequest) -> types.ServerResult:122 await client_session.subscribe_resource(req.params.uri)123 return types.ServerResult(types.EmptyResult())124
125 app.request_handlers[types.SubscribeRequest] = _subscribe_resource126
127 async def _unsubscribe_resource(req: types.UnsubscribeRequest) -> types.ServerResult:128 await client_session.unsubscribe_resource(req.params.uri)129 return types.ServerResult(types.EmptyResult())130
131 app.request_handlers[types.UnsubscribeRequest] = _unsubscribe_resource132
133 if capabilities.tools:134 logger.debug('Capabilities: adding Tools...')135
136 async def _list_tools(_: Any) -> types.ServerResult:137 await charge_mcp_operation(actor_charge_function, ChargeEvents.TOOL_LIST)138 tools = await client_session.list_tools()139 return types.ServerResult(tools)140
141 app.request_handlers[types.ListToolsRequest] = _list_tools142
143 async def _call_tool(req: types.CallToolRequest) -> types.ServerResult:144 await charge_mcp_operation(actor_charge_function, ChargeEvents.TOOL_CALL)145 try:146 result = await client_session.call_tool(req.params.name, (req.params.arguments or {}))147 return types.ServerResult(result)148 except Exception as e:149 return types.ServerResult(150 types.CallToolResult(content=[types.TextContent(type='text', text=str(e))], isError=True),151 )152
153 app.request_handlers[types.CallToolRequest] = _call_tool154
155 async def _send_progress_notification(req: types.ProgressNotification) -> None:156 await client_session.send_progress_notification(157 req.params.progressToken,158 req.params.progress,159 req.params.total,160 )161
162 app.notification_handlers[types.ProgressNotification] = _send_progress_notification163
164 async def _complete(req: types.CompleteRequest) -> types.ServerResult:165 result = await client_session.complete(166 req.params.ref,167 req.params.argument.model_dump(),168 )169 return types.ServerResult(result)170
171 app.request_handlers[types.CompleteRequest] = _complete172
173 return app
src/server.py
1"""Module implementing an MCP server that can be used to connect to stdio or SSE based MCP servers.2
3Heavily inspired by: https://github.com/sparfenyuk/mcp-proxy4"""5
6from __future__ import annotations7
8import contextlib9import logging10from typing import TYPE_CHECKING, Any11
12import uvicorn13from mcp.client.session import ClientSession14from mcp.client.sse import sse_client15from mcp.client.stdio import StdioServerParameters, stdio_client16from mcp.server.sse import SseServerTransport17from mcp.server.streamable_http_manager import StreamableHTTPSessionManager18from starlette.applications import Starlette19from starlette.requests import Request20from starlette.responses import JSONResponse, Response21from starlette.routing import Mount, Route22
23from .event_store import InMemoryEventStore24from .models import ServerParameters, ServerType, SseServerParameters25from .proxy_server import create_proxy_server26
27if TYPE_CHECKING:28 from collections.abc import AsyncIterator, Callable29
30 from mcp.server import Server31 from starlette import types as st32 from starlette.requests import Request33 from starlette.types import Receive, Scope, Send34
35logger = logging.getLogger('apify')36
37
38class ProxyServer:39 """Main class implementing the proxy functionality using MCP SDK.40
41 This proxy runs a Starlette app that exposes /sse and /messages/ endpoints for legacy SSE transport,42 and a /mcp endpoint for streamable HTTP transport.43 It then connects to stdio or SSE based MCP servers and forwards the messages to the client.44
45 The server can optionally charge for operations using a provided charging function.46 This is typically used in Apify Actors to charge users for MCP operations.47 The charging function should accept an event name and optional parameters.48 """49
50 def __init__(51 self,52 config: ServerParameters,53 host: str,54 port: int,55 actor_charge_function: Callable[[str, int], None] | None = None,56 ) -> None:57 """Initialize the proxy server.58
59 Args:60 config: Server configuration (stdio or SSE parameters)61 host: Host to bind the server to62 port: Port to bind the server to63 actor_charge_function: Optional function to charge for operations.64 Should accept (event_name: str, count: int).65 Typically, Actor.charge in Apify Actors.66 If None, no charging will occur.67 """68 self.server_type = ServerType.STDIO if isinstance(config, StdioServerParameters) else ServerType.SSE69 self.config = self._validate_config(self.server_type, config)70 self.path_sse: str = '/sse'71 self.path_message: str = '/message'72 self.host: str = host73 self.port: int = port74 self.actor_charge_function = actor_charge_function75
76 @staticmethod77 def _validate_config(client_type: ServerType, config: ServerParameters) -> ServerParameters:78 """Validate and return the appropriate server parameters."""79
80 def validate_and_return() -> ServerParameters:81 if client_type == ServerType.STDIO:82 return StdioServerParameters.model_validate(config)83 if client_type == ServerType.SSE:84 return SseServerParameters.model_validate(config)85 raise ValueError(f'Invalid client type: {client_type}')86
87 try:88 return validate_and_return()89 except Exception as e:90 raise ValueError(f'Invalid server configuration: {e}') from e91
92 @staticmethod93 async def create_starlette_app(mcp_server: Server) -> Starlette:94 """Create a Starlette app (SSE server) that exposes /sse and /messages/ endpoints."""95 transport = SseServerTransport('/messages/')96 event_store = InMemoryEventStore()97 session_manager = StreamableHTTPSessionManager(98 app=mcp_server,99 event_store=event_store, # Enable resumability100 json_response=False,101 )102
103 @contextlib.asynccontextmanager104 async def lifespan(_app: Starlette) -> AsyncIterator[None]:105 """Context manager for managing session manager lifecycle."""106 async with session_manager.run():107 logger.info('Application started with StreamableHTTP session manager!')108 try:109 yield110 finally:111 logger.info('Application shutting down...')112
113 async def handle_root(request: Request) -> st.Response:114 """Handle root endpoint."""115 # Handle Apify standby readiness probe116 if 'x-apify-container-server-readiness-probe' in request.headers:117 return Response(118 content=b'ok',119 media_type='text/plain',120 status_code=200,121 )122
123 return JSONResponse(124 {125 'status': 'running',126 'type': 'mcp-server',127 'transport': 'sse+streamable-http',128 'endpoints': {129 'sse': '/sse',130 'messages': '/messages/',131 'streamableHttp': '/mcp',132 },133 }134 )135
136 async def handle_sse(request: st.Request) -> st.Response | None:137 """Handle incoming SSE requests."""138 try:139 async with transport.connect_sse(request.scope, request.receive, request._send) as streams: # noqa: SLF001140 init_options = mcp_server.create_initialization_options()141 await mcp_server.run(streams[0], streams[1], init_options)142 except Exception as e:143 logger.exception('Error in SSE connection')144 return Response(status_code=500, content=str(e))145 finally:146 logger.info('SSE connection closed')147 # Add Response to prevent the None type error148 return Response(status_code=204) # No content response149
150 # ASGI handler for streamable HTTP connections151 async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:152 await session_manager.handle_request(scope, receive, send)153
154 app = Starlette(155 debug=True,156 routes=[157 Route('/', endpoint=handle_root),158 Route('/sse', endpoint=handle_sse, methods=['GET']),159 Mount('/messages/', app=transport.handle_post_message),160 Mount('/mcp/', app=handle_streamable_http),161 ],162 lifespan=lifespan,163 )164
165 # Add middleware to rewrite /mcp to /mcp/ to ensure consistent path handling.166 # This is necessary so that Starlette does not return a 307 Temporary Redirect on the /mcp path,167 # which would otherwise trigger the OAuth flow when the MCP server is deployed on the Apify platform.168 @app.middleware('http')169 async def rewrite_mcp(request: Request, call_next: Callable): # noqa: ANN202170 """Middleware to rewrite /mcp to /mcp/."""171 if request.url.path == '/mcp':172 request.scope['path'] = '/mcp/'173 request.scope['raw_path'] = b'/mcp/'174 return await call_next(request)175
176 return app177
178 async def _run_server(self, app: Starlette) -> None:179 """Run the Starlette app with uvicorn."""180 config_ = uvicorn.Config(181 app,182 host=self.host,183 port=self.port,184 log_level='info',185 access_log=True,186 )187 server = uvicorn.Server(config_)188 await server.serve()189
190 async def _initialize_and_run_server(self, client_session_factory: Any, **client_params: dict) -> None:191 """Initialize and run the server."""192 async with client_session_factory(**client_params) as streams, ClientSession(*streams) as session:193 mcp_server = await create_proxy_server(session, self.actor_charge_function)194 app = await self.create_starlette_app(mcp_server)195 await self._run_server(app)196
197 async def start(self) -> None:198 """Start Starlette app (SSE server) and connect to stdio or SSE based MCP server."""199 logger.info(f'Starting MCP server with client type: {self.server_type} and config {self.config}')200
201 if self.server_type == ServerType.STDIO:202 logger.info(f'Starting and connecting to stdio based MCP server with config {self.config}')203 await self._initialize_and_run_server(stdio_client, server=self.config)204 elif self.server_type == ServerType.SSE:205 logger.info(f'Connecting to SSE based MCP server with config {self.config}')206 params = self.config.model_dump(exclude_unset=True)207 await self._initialize_and_run_server(sse_client, **params)