This is a comprehensive high-level and low-level design for your MCP-based SaaS B2B platform using the technologies you've specified. Let's break this down with diagrams and detailed explanations.
```mermaid
graph TB
subgraph "Client Layer"
WA[Web Application\nNextJS + TypeScript]
MC[Mobile Clients]
TC[Third-party Clients]
end
subgraph "API Gateway Layer"
AG[API Gateway\nNginx/Kong]
end
subgraph "Application Layer"
API[REST API Service\nFastAPI]
MCPS[MCP Server]
WS[WebSocket Service\nStarlette]
SSE[SSE Handler\nFastAPI]
end
subgraph "Service Layer"
AS[Authentication Service]
BS[Billing Service]
TS[Tenant Management]
LMS[LLM Management Service]
AAS[Analytics Service]
end
subgraph "Data Layer"
KAFKA[Kafka Message Queue]
REDIS[Redis Cache]
DB[(PostgreSQL Database)]
VDB[(Vector Database)]
end
subgraph "LLM Layer"
LLM[Large Language Models]
end
WA --> AG
MC --> AG
TC --> AG
AG --> API
AG --> SSE
AG --> WS
API --> MCPS
SSE --> MCPS
WS --> MCPS
API --> AS
API --> BS
API --> TS
API --> LMS
API --> AAS
MCPS --> LMS
LMS --> LLM
API --> KAFKA
API --> REDIS
API --> DB
LMS --> VDB
KAFKA --> AAS
```
```mermaid
sequenceDiagram
participant Client as Client Application
participant API as API Gateway
participant Auth as Authentication Service
participant MCP as MCP Server
participant LLM as LLM Service
participant Redis as Redis Cache
participant Kafka as Kafka
Client->>API: Request with tenant credentials
API->>Auth: Authenticate request
Auth-->>API: Authentication result
alt is authenticated
API->>MCP: Forward client request
MCP->>Redis: Check for cached responses
alt Cache hit
Redis-->>MCP: Return cached response
MCP-->>API: Forward cached response
API-->>Client: Return response
else Cache miss
MCP->>LLM: Process request via MCP protocol
loop Stream response
LLM-->>MCP: Stream chunks of response
MCP-->>API: Forward response chunks
API-->>Client: Stream response via SSE
MCP->>Kafka: Log interaction events
end
MCP->>Redis: Cache final response
end
else not authenticated
API-->>Client: Return authentication error
end
```
```mermaid
classDiagram
class MCPServer {
-config: ServerConfig
-llmService: LLMService
-redisClient: RedisClient
-kafkaProducer: KafkaProducer
+handleRequest(request: MCPRequest): MCPResponse
+streamResponse(request: MCPRequest): Stream
+initialize()
+shutdown()
}
class APIService {
-mcpServer: MCPServer
-authService: AuthService
-billingService: BillingService
-tenantService: TenantService
+registerRoutes()
+handleAPIRequest(request: Request): Response
+streamSSE(request: Request): EventStream
+handleWebSocket(connection: WebSocket)
}
class AuthService {
-userRepo: UserRepository
-jwtUtil: JWTUtil
+authenticate(credentials: Credentials): AuthResult
+validateToken(token: string): TokenValidation
+generateToken(user: User): string
}
class TenantService {
-tenantRepo: TenantRepository
-configRepo: ConfigRepository
+getTenant(id: string): Tenant
+getAPIKeys(tenantId: string): APIKey[]
+updateConfig(tenantId: string, config: TenantConfig)
+validateAPIKey(key: string): ValidationResult
}
class LLMService {
-modelRepo: ModelRepository
-vectorDB: VectorDatabase
-configManager: ConfigManager
+processQuery(query: string, modelConfig: ModelConfig): Response
+streamResponse(query: string, modelConfig: ModelConfig): Stream
+embedText(text: string): Vector
}
class BillingService {
-billingRepo: BillingRepository
-paymentProcessor: PaymentProcessor
+calculateUsage(tenantId: string, usage: Usage): Cost
+processPayment(tenantId: string, amount: number)
+generateInvoice(tenantId: string, period: Period): Invoice
}
class AnalyticsService {
-eventConsumer: KafkaConsumer
-analyticsRepo: AnalyticsRepository
+processEvents()
+generateReport(tenantId: string, period: Period): Report
+trackUsage(tenantId: string, event: Event)
}
APIService --> MCPServer
APIService --> AuthService
APIService --> BillingService
APIService --> TenantService
MCPServer --> LLMService
AnalyticsService --> MCPServer
```
```mermaid
graph TB
subgraph "Load Balancer"
LB[Nginx/HAProxy Load Balancer]
end
subgraph "Web Tier"
WS1[Web Server 1\nNextJS + TypeScript]
WS2[Web Server 2\nNextJS + TypeScript]
end
subgraph "API Tier"
AS1[API Server 1\nFastAPI]
AS2[API Server 2\nFastAPI]
end
subgraph "MCP Tier"
MCP1[MCP Server 1]
MCP2[MCP Server 2]
end
subgraph "Cache Tier"
RC1[Redis Cache Primary]
RC2[Redis Cache Replica]
end
subgraph "Message Queue"
KF1[Kafka Broker 1]
KF2[Kafka Broker 2]
KF3[Kafka Broker 3]
ZK[ZooKeeper Cluster]
end
subgraph "Database Tier"
PG1[(PostgreSQL Primary)]
PG2[(PostgreSQL Standby)]
VDB[(Vector Database)]
end
subgraph "Monitoring & Logging"
PROM[Prometheus]
GRAF[Grafana]
ELK[ELK Stack]
end
LB --> WS1
LB --> WS2
WS1 --> AS1
WS1 --> AS2
WS2 --> AS1
WS2 --> AS2
AS1 --> MCP1
AS1 --> MCP2
AS2 --> MCP1
AS2 --> MCP2
AS1 --> RC1
AS2 --> RC1
RC1 --- RC2
AS1 --> KF1
AS1 --> KF2
AS1 --> KF3
AS2 --> KF1
AS2 --> KF2
AS2 --> KF3
KF1 --- ZK
KF2 --- ZK
KF3 --- ZK
AS1 --> PG1
AS2 --> PG1
PG1 --- PG2
MCP1 --> VDB
MCP2 --> VDB
AS1 --> PROM
AS2 --> PROM
MCP1 --> PROM
MCP2 --> PROM
PROM --> GRAF
AS1 --> ELK
AS2 --> ELK
MCP1 --> ELK
MCP2 --> ELK
```
Based on your requirements, I've designed a scalable SaaS B2B platform that leverages MCP (Model Context Protocol) to provide LLM services to third-party clients. Here's a detailed breakdown of the architecture:
- **Client Layer**
  - NextJS and TypeScript frontend for your SaaS dashboard
  - Third-party client applications that will connect to your MCP server
- **API Gateway Layer**
  - Handles routing, authentication, and load balancing
  - Supports both HTTP and WebSocket connections
- **Application Layer**
  - FastAPI for REST endpoints
  - MCP Server implementation
  - SSE (Server-Sent Events) handlers for streaming responses
  - WebSocket service for bidirectional communication
- **Service Layer**
  - Authentication and authorization
  - Billing and usage tracking
  - Tenant management (multi-tenancy support)
  - LLM management service
  - Analytics service
- **Data Layer**
  - PostgreSQL for relational data
  - Redis for caching and session management
  - Kafka for event streaming and asynchronous processing
  - Vector database for embeddings and semantic search
- **LLM Layer**
  - Integration with various LLM models
The MCP Server will be implemented in Python using FastAPI as the web framework. It will support:
- **Communication Protocols**
  - HTTP with SSE for streaming responses
  - Standard I/O (stdio) for local communication
  - WebSockets for bidirectional communication
- **Key Components**
  - Request handler
  - Response streamer
  - Context manager
  - Model integrator
  - Caching layer (sketched below)
  - Authentication middleware
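The caching layer listed above (and exercised in the sequence diagram's cache hit/miss branch) can be a thin wrapper over Redis. A minimal sketch, assuming tenant-scoped SHA-256 keys and a fixed TTL; `ResponseCache` and its key scheme are illustrative choices, not part of the MCP protocol:

```python
# Sketch of a Redis-backed response cache; key derivation and TTL are illustrative.
import hashlib
import json
from typing import Any, Dict, Optional

import redis

class ResponseCache:
    def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _key(self, tenant_id: str, query: str, model: str) -> str:
        # Hash the inputs so arbitrary query text yields a fixed-size, tenant-scoped key
        digest = hashlib.sha256(f"{tenant_id}:{model}:{query}".encode()).hexdigest()
        return f"mcp:cache:{digest}"

    def get(self, tenant_id: str, query: str, model: str) -> Optional[Dict[str, Any]]:
        cached = self.redis.get(self._key(tenant_id, query, model))
        return json.loads(cached) if cached else None

    def set(self, tenant_id: str, query: str, model: str, response: Dict[str, Any]) -> None:
        # Expire entries after the TTL so stale completions age out
        self.redis.set(self._key(tenant_id, query, model), json.dumps(response), ex=self.ttl)
```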
The API service will be built with FastAPI and will provide:

- **Endpoints for Client Management**
  - Tenant registration and management
  - API key generation and validation (see the endpoint sketch below)
  - Usage reporting and billing
- **MCP Protocol Endpoints**
  - HTTP endpoints for MCP requests
  - SSE endpoints for streaming responses
  - WebSocket endpoints for bidirectional communication
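API key management can be a pair of simple FastAPI routes. A hedged sketch, reusing the `tenant_service` instance and `authenticate_tenant` dependency that appear in `mcp_server.py` further below; the `store_api_key`/`revoke_api_key` method names are illustrative assumptions, not an existing API:

```python
# Sketch: API-key management routes for the client-management surface.
import secrets

from fastapi import APIRouter, Depends, HTTPException

from .services.auth import authenticate_tenant
from .services.tenant import TenantService

router = APIRouter(prefix="/api/tenant")
tenant_service = TenantService()

@router.post("/keys")
async def create_api_key(tenant = Depends(authenticate_tenant)):
    # Generate a URL-safe random key; hashing/persistence is delegated to the service
    raw_key = secrets.token_urlsafe(32)
    tenant_service.store_api_key(tenant_id=tenant.id, api_key=raw_key)  # illustrative method
    return {"api_key": raw_key}

@router.delete("/keys/{key_id}")
async def revoke_api_key(key_id: str, tenant = Depends(authenticate_tenant)):
    # Revoke a key owned by the authenticated tenant
    if not tenant_service.revoke_api_key(tenant_id=tenant.id, key_id=key_id):  # illustrative method
        raise HTTPException(status_code=404, detail="API key not found")
    return {"revoked": key_id}
```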
The system is designed with multi-tenancy in mind:
- **Tenant Isolation**
  - Separate configurations per tenant
  - Isolated billing and usage tracking
  - Customizable LLM settings per tenant
- **Tenant Management**
  - Self-service tenant registration
  - API key management
  - Configuration management (a sample config model is sketched below)
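Per-tenant configuration maps naturally onto a typed model. A sketch using Pydantic; every field name and default here is an illustrative assumption:

```python
# Sketch: per-tenant configuration as a validated Pydantic model.
from typing import List, Optional

from pydantic import BaseModel, Field

class TenantConfig(BaseModel):
    tenant_id: str
    # LLM settings customizable per tenant
    model: str = "gpt-4o"
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1000, gt=0)
    # Isolation and billing knobs
    monthly_token_quota: int = 1_000_000
    allowed_models: List[str] = ["gpt-4o", "llama-3", "claude-3"]
    webhook_url: Optional[str] = None
```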
The sequence diagram illustrates the typical flow of a request through the system:

1. The client makes a request with tenant credentials.
2. The system authenticates and authorizes the request.
3. The MCP server processes the request, serving cached responses when possible.
4. On a cache miss, the MCP server communicates with the LLM.
5. Responses are streamed back to the client via SSE.
6. Interaction events are logged to Kafka for analytics (producer sketch below).
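Step 6 implies an event producer. A minimal sketch using kafka-python; the `mcp.interactions` topic name and event schema are illustrative assumptions:

```python
# Sketch: publishing interaction events to Kafka for the analytics service.
import json
import time

from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    bootstrap_servers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def log_interaction(tenant_id: str, query: str, tokens_used: int) -> None:
    # Fire-and-forget publish; the analytics service consumes this topic
    producer.send("mcp.interactions", {
        "tenant_id": tenant_id,
        "query": query,
        "tokens_used": tokens_used,
        "timestamp": time.time(),
    })
```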
```tsx
// pages/dashboard.tsx
import { useState, useEffect } from 'react';
import { TenantConfig, APIKey, Usage } from '../types';
const Dashboard = () => {
const [config, setConfig] = useState<TenantConfig | null>(null);
const [apiKeys, setApiKeys] = useState<APIKey[]>([]);
const [usage, setUsage] = useState<Usage | null>(null);
useEffect(() => {
// Fetch tenant configuration, API keys, and usage data
fetchTenantData();
}, []);
const fetchTenantData = async () => {
try {
const [configData, keysData, usageData] = await Promise.all([
fetch('/api/tenant/config').then(res => res.json()),
fetch('/api/tenant/keys').then(res => res.json()),
fetch('/api/tenant/usage').then(res => res.json())
]);
setConfig(configData);
setApiKeys(keysData);
setUsage(usageData);
} catch (error) {
console.error('Failed to fetch tenant data:', error);
}
};
// Rest of the component...
};
export default Dashboard;
```
```python
# mcp_server.py
from fastapi import FastAPI, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import asyncio
import redis
import json
from typing import Dict, Any, AsyncGenerator
from .services.auth import authenticate_tenant
from .services.llm import LLMService
from .services.tenant import TenantService
from .services.billing import BillingService
app = FastAPI(title="MCP Server API")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify actual origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Services initialization
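# redis_client backs the response-caching layer (see the ResponseCache sketch earlier)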
redis_client = redis.Redis(host="redis", port=6379, db=0)
llm_service = LLMService()
tenant_service = TenantService()
billing_service = BillingService()
@app.post("/api/mcp/query")
async def query_mcp(
request: Dict[str, Any],
tenant = Depends(authenticate_tenant)
):
"""
Handle synchronous MCP queries
"""
# Check tenant quotas and billing status
if not billing_service.check_quota(tenant.id):
raise HTTPException(status_code=402, detail="Usage quota exceeded")
# Process the request through MCP
try:
response = await llm_service.process_query(
query=request["query"],
model_config=tenant.model_config,
context=request.get("context", {})
)
# Log usage for billing
billing_service.log_usage(
tenant_id=tenant.id,
tokens_used=response.get("usage", {}).get("total_tokens", 0)
)
return response
except Exception as e:
# Log the error
print(f"Error processing query: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/mcp/stream")
async def stream_mcp(
query: str,
tenant = Depends(authenticate_tenant)
):
"""
Stream MCP responses using SSE
"""
# Check tenant quotas and billing status
if not billing_service.check_quota(tenant.id):
raise HTTPException(status_code=402, detail="Usage quota exceeded")
async def event_generator():
try:
tokens_used = 0
async for chunk in llm_service.stream_response(
query=query,
model_config=tenant.model_config
):
yield json.dumps(chunk)
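                # Approximate token usage by word count; use a real tokenizer for billing accuracy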
tokens_used += len(chunk.get("content", "").split())
# Log usage for billing after streaming completes
billing_service.log_usage(
tenant_id=tenant.id,
tokens_used=tokens_used
)
except Exception as e:
# Log the error
print(f"Error streaming response: {str(e)}")
yield json.dumps({"error": str(e)})
return EventSourceResponse(event_generator())
@app.websocket("/api/mcp/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
Handle WebSocket connections for bidirectional MCP communication
"""
await websocket.accept()
try:
# Authenticate tenant
auth_message = await websocket.receive_json()
tenant = tenant_service.authenticate_ws(auth_message)
if not tenant:
await websocket.send_json({"error": "Authentication failed"})
await websocket.close()
return
# Process messages
while True:
message = await websocket.receive_json()
if not billing_service.check_quota(tenant.id):
await websocket.send_json({"error": "Usage quota exceeded"})
continue
# Process through MCP
try:
if message.get("stream", False):
# Stream response
tokens_used = 0
async for chunk in llm_service.stream_response(
query=message["query"],
model_config=tenant.model_config
):
await websocket.send_json(chunk)
tokens_used += len(chunk.get("content", "").split())
# Log usage
billing_service.log_usage(
tenant_id=tenant.id,
tokens_used=tokens_used
)
else:
# Synchronous response
response = await llm_service.process_query(
query=message["query"],
model_config=tenant.model_config,
context=message.get("context", {})
)
# Log usage
billing_service.log_usage(
tenant_id=tenant.id,
tokens_used=response.get("usage", {}).get("total_tokens", 0)
)
await websocket.send_json(response)
except Exception as e:
# Log the error
print(f"Error processing WebSocket message: {str(e)}")
await websocket.send_json({"error": str(e)})
    except WebSocketDisconnect:
        # Normal client disconnect; nothing further to send
        pass
    except Exception as e:
        # Log the unexpected error and close the connection
        print(f"WebSocket error: {str(e)}")
        await websocket.close()
```
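`mcp_server.py` imports `authenticate_tenant` from `services/auth.py` without showing it. A minimal sketch, assuming API keys arrive as a Bearer token and are resolved through the tenant service; the returned tenant object's shape is illustrative:

```python
# services/auth.py (sketch): resolve a Bearer API key to a tenant.
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from .tenant import TenantService

bearer_scheme = HTTPBearer()
tenant_service = TenantService()

async def authenticate_tenant(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
):
    # Reject unknown or disabled keys; otherwise return the owning tenant
    tenant = tenant_service.validate_api_key(credentials.credentials)
    if tenant is None:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return tenant
```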
```python
# services/llm.py
from typing import Dict, Any, AsyncGenerator, Optional
import os
from mcp.server import MCPServer # Hypothetical MCP library
from mcp.types import MCPRequest, MCPResponse
class LLMService:
def __init__(self):
"""
Initialize the LLM service with MCP server
"""
self.mcp_server = MCPServer(
config={
"models": {
"default": os.getenv("DEFAULT_MODEL", "gpt-4o"),
"available": ["gpt-4o", "llama-3", "claude-3"]
},
"cache_enabled": True,
"timeout": 60
}
)
self.mcp_server.initialize()
async def process_query(
self,
query: str,
model_config: Dict[str, Any],
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Process a query through the MCP server
"""
request = MCPRequest(
query=query,
model=model_config.get("model", "default"),
temperature=model_config.get("temperature", 0.7),
max_tokens=model_config.get("max_tokens", 1000),
context=context or {}
)
response = await self.mcp_server.handle_request(request)
return response.to_dict()
async def stream_response(
self,
query: str,
model_config: Dict[str, Any]
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Stream a response from the MCP server
"""
request = MCPRequest(
query=query,
model=model_config.get("model", "default"),
temperature=model_config.get("temperature", 0.7),
max_tokens=model_config.get("max_tokens", 1000),
stream=True
)
async for chunk in self.mcp_server.stream_response(request):
            yield chunk.to_dict()
```
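The class diagram lists an `embedText` capability on `LLMService` that the file above omits. A hedged sketch of how it might look, reusing the same hypothetical MCP library; the embedding model name and the `embedding` response field are assumptions:

```python
# Sketch: embedding support backing LLMService.embedText from the class diagram.
from typing import List

from mcp.server import MCPServer  # hypothetical, as in services/llm.py
from mcp.types import MCPRequest

class EmbeddingService:
    def __init__(self, mcp_server: MCPServer):
        self.mcp_server = mcp_server

    async def embed_text(self, text: str) -> List[float]:
        # Assumes the hypothetical MCP server can serve embedding requests
        request = MCPRequest(query=text, model="text-embedding-default")
        response = await self.mcp_server.handle_request(request)
        # The vector is assumed to come back in an "embedding" field
        return response.to_dict().get("embedding", [])
```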
The deployment architecture is designed for scalability and reliability:
- **Kubernetes-Based Deployment**
  - Containerized microservices
  - Horizontal scaling for API and MCP servers
  - Auto-scaling based on load
- **High Availability**
  - Multiple replicas of each service
  - Redis cluster for caching
  - PostgreSQL with standby replicas
  - Kafka cluster with multiple brokers
- **Infrastructure as Code**
  - Terraform for infrastructure provisioning
  - Helm charts for Kubernetes deployments
  - CI/CD pipeline integration
The implementation can proceed in four phases:

- **Phase 1: Core MCP Server Implementation**
  - Implement basic MCP protocol support
  - Set up the FastAPI backend
  - Implement authentication and tenant management
- **Phase 2: Frontend Dashboard**
  - Develop the NextJS admin dashboard
  - Implement tenant self-service features
  - Create usage reporting and visualization
- **Phase 3: Advanced Features**
  - Implement billing and subscription management
  - Add analytics and reporting
  - Enhance scalability and performance
- **Phase 4: Third-Party Integrations**
  - Develop SDKs for common programming languages
  - Create documentation and examples
  - Build integration templates
```typescript
// Example TypeScript integration code
import { MCPClient } from '@your-company/mcp-client-sdk';
// Initialize the client
const mcpClient = new MCPClient({
apiKey: 'your-api-key',
serverUrl: 'https://api.your-mcp-service.com',
defaultModel: 'gpt-4o'
});
// Synchronous query
async function askQuestion(question: string) {
try {
const response = await mcpClient.query({
text: question,
context: { previousMessages: [] }
});
console.log('Response:', response.content);
return response;
} catch (error) {
console.error('Error querying MCP server:', error);
throw error;
}
}
// Streaming query
function streamingQuestion(question: string, onChunk: (chunk: string) => void) {
const stream = mcpClient.streamQuery({
text: question,
context: { previousMessages: [] }
});
stream.on('data', (chunk) => {
onChunk(chunk.content);
});
stream.on('error', (error) => {
console.error('Stream error:', error);
});
stream.on('end', () => {
console.log('Stream completed');
});
return {
cancel: () => stream.cancel()
};
}
```
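Clients that don't use the SDK can call the same routes directly. A minimal Python sketch using httpx against the endpoints defined in `mcp_server.py` above, assuming the Bearer-token scheme from the auth sketch:

```python
# Sketch: calling the MCP endpoints directly with httpx.
import httpx

BASE_URL = "https://api.your-mcp-service.com"
HEADERS = {"Authorization": "Bearer your-api-key"}

def ask_question(question: str) -> dict:
    # Synchronous query against POST /api/mcp/query
    response = httpx.post(
        f"{BASE_URL}/api/mcp/query",
        headers=HEADERS,
        json={"query": question, "context": {}},
        timeout=60.0,
    )
    response.raise_for_status()
    return response.json()

def stream_question(question: str) -> None:
    # Streaming query against GET /api/mcp/stream (SSE)
    with httpx.stream(
        "GET",
        f"{BASE_URL}/api/mcp/stream",
        headers=HEADERS,
        params={"query": question},
        timeout=None,
    ) as response:
        for line in response.iter_lines():
            # SSE frames arrive as "data: <json>" lines
            if line.startswith("data:"):
                print(line[len("data:"):].strip())
```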
```mermaid
classDiagram
class ThirdPartyWebsite {
-apiKey: string
-endpointURL: string
+initialize()
+sendQuery(query: string, context: object): Promise~Response~
+streamQuery(query: string, context: object): EventSource
+handleResponse(response: Response)
+handleError(error: Error)
}
class MCPClientSDK {
-config: ClientConfig
-authToken: string
-connectionType: enum
+connect(serverURL: string, apiKey: string): Connection
+sendRequest(request: MCPRequest): Promise~MCPResponse~
+streamRequest(request: MCPRequest): Stream
+disconnect()
}
class APIGateway {
-routeTable: Map
-authService: AuthService
-rateLimit: RateLimitConfig
+routeRequest(request: HTTPRequest): HTTPResponse
+validateAPIKey(apiKey: string): boolean
+enforceRateLimit(clientID: string): boolean
}
class MCPServer {
-serverConfig: ServerConfig
-connectedClients: Map
-supportedProtocols: string[]
+initialize()
+handleConnection(conn: Connection)
+processRequest(request: MCPRequest): MCPResponse
+streamResponse(request: MCPRequest): Stream
+validateRequest(request: MCPRequest): ValidationResult
+shutdown()
}
class TenantManager {
-tenants: Map
-tenantConfigs: Map
+getTenant(id: string): Tenant
+validateTenant(tenantID: string, apiKey: string): boolean
+updateTenantConfig(tenantID: string, config: TenantConfig)
+getTenantUsage(tenantID: string): UsageStats
}
class LLMController {
-modelRegistry: Map
-contextManager: ContextManager
-defaultConfig: ModelConfig
+loadModel(modelID: string): Model
+processPrompt(prompt: string, context: object): Response
+streamResponse(prompt: string, context: object): Stream
+manageContext(sessionID: string, context: object)
}
class Model {
-modelID: string
-parameters: ModelParameters
-capabilities: string[]
+initialize()
+inference(input: string, params: InferenceParams): Response
+streamInference(input: string, params: InferenceParams): Stream
+getEmbedding(text: string): Vector
}
ThirdPartyWebsite --> MCPClientSDK: uses
MCPClientSDK --> APIGateway: connects to
APIGateway --> MCPServer: routes to
MCPServer --> TenantManager: validates with
MCPServer --> LLMController: forwards request to
LLMController --> Model: runs inference on
note for ThirdPartyWebsite "Third-party websites integrate\nusing REST API, WebSockets,\nor SDK libraries"
note for MCPClientSDK "Available in multiple languages:\nTypeScript, Python, Java, etc."
note for MCPServer "Supports HTTP, WebSockets,\nand stdio communication"
note for LLMController "Manages model selection,\ncontext handling,\nand response formatting"
```