8000 fix: sse new ctx by akitaSummer · Pull Request #323 · eggjs/tegg · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix: sse new ctx #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugin/controller/lib/impl/mcp/MCPControllerRegister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,17 @@ export class MCPControllerRegister implements ControllerRegister {
req.method = 'POST';
req.url = self.mcpConfig.sseInitPath;
req.headers = {
...ctx.req.headers,
accept: 'application/json, text/event-stream',
'content-type': 'application/json',
};
const newCtx = self.app.createContext(req, res) as unknown as Context;
await ctx.app.ctxStorage.run(newCtx, async () => {
if (MCPControllerRegister.hooks.length > 0) {
for (const hook of MCPControllerRegister.hooks) {
await hook.preHandle?.(newCtx);
}
}
await mw(newCtx, async () => {
messageFunc!(...args);
if (isJSONRPCRequest(args[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
ContextProto,
ToolArgsSchema,
} from '@eggjs/tegg';
import z from 'zod';

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-ubuntu (18)

Using exported name 'z' as identifier for default import

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-macos (18)

Using exported name 'z' as identifier for default import

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-macos (20)

Using exported name 'z' as identifier for default import

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-ubuntu (20)

Using exported name 'z' as identifier for default import

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-macos (16)

Using exported name 'z' as identifier for default import

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-ubuntu (22)

Using exported name 'z' as identifier for default import

Check warning on line 17 in plugin/controller/test/fixtures/apps/mcp-app/app/controller/McpController.ts

View workflow job for this annotation

GitHub Actions / Runner-ubuntu (16)

Using exported name 'z' as identifier for default import

export const PromptType = {
name: z.string(),
Expand Down Expand Up @@ -46,6 +46,9 @@
@Inject()
commonService: CommonService;

@Inject()
user: any;

@MCPPrompt()
async foo(@PromptArgsSchema(PromptType) args: PromptArgs<typeof PromptType>): Promise<MCPPromptResponse> {
this.logger.info('hello world');
Expand Down Expand Up @@ -75,6 +78,18 @@
};
}

@MCPTool()
async echoUser(): Promise<MCPToolResponse> {
return {
content: [
{
type: 'text',
text: `hello ${this.user}`,
},
],
};
}


@MCPResource({
template: [
Expand Down
8 changes: 8 additions & 0 deletions plugin/controller/test/fixtures/apps/mcp-app/config/plugin.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const path = require('path');

exports.tracer = {
package: 'egg-tracer',
Expand All @@ -24,3 +26,9 @@ exports.mcpProxy = {
package: '@eggjs/mcp-proxy',
enable: true,
};


exports.hookPlugin = {
path: path.join(__dirname, '../hook-plugin'),
enable: true,
};
15 changes: 15 additions & 0 deletions plugin/controller/test/fixtures/apps/mcp-app/hook-plugin/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { Application } from 'egg';
import { MCPControllerRegister } from '@eggjs/tegg-controller-plugin/lib/impl/mcp/MCPControllerRegister';
import { GetAlipayTeggHook } from './lib/MCPControllerHook';

export default class ControllerAppBootHook {
#app: Application;

constructor(app: Application) {
this.#app = app;
}

configWillLoad() {
MCPControllerRegister.addHook(GetAlipayTeggHook(this.#app));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import type { Application, Context } from 'egg';
import { MCPControllerHook } from '@eggjs/tegg-controller-plugin/lib/impl/mcp/MCPControllerRegister';

export const GetAlipayTeggHook = (app: Application) => {
const setUser = (ctx: Context) => {
ctx.set({
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
'transfer-encoding': 'chunked',
});
try {
const auth = ctx.get('authorization');
const atitString = Buffer.from(
auth.substring('Bearer '.length),
'base64',
).toString('utf8');
ctx.user = atitString;
} catch (e) {
app.logger.warn('get user failed: ', e);
}
};
const AlipayTeggControllerHook: MCPControllerHook = {
async preHandle(ctx) {
setUser(ctx);
},
async preHandleInitHandle(ctx) {
setUser(ctx);
},
async preSSEInitHandle(ctx) {
setUser(ctx);
},
async preProxy(ctx) {
setUser(ctx);
},
};

return AlipayTeggControllerHook;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "@eggjs/hook-plugin",
"eggPlugin": {
"name": "hookPlugin"
}
}
109 changes: 106 additions & 3 deletions plugin/controller/test/mcp/mcp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,29 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
});
const baseUrl = await app.httpRequest()
.get('/mcp/init').url;
const sseTransport = new SSEClientTransport(new URL(baseUrl));
const sseTransport = new SSEClientTransport(
new URL(baseUrl),
{
authProvider: {
get redirectUrl() { return 'http://localhost/callback'; },
get clientMetadata() { return { redirect_uris: [ 'http://localhost/callback' ] }; },
clientInformation: () => ({ client_id: 'test-client-id', client_secret: 'test-client-secret' }),
tokens: () => {
return {
access_token: Buffer.from('akita').toString('base64'),
token_type: 'Bearer',
};
},
// eslint-disable-next-line @typescript-eslint/no-empty-function
saveTokens: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
redirectToAuthorization: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
saveCodeVerifier: () => {},
codeVerifier: () => '',
},
},
);
const sseNotifications: { level: string, data: string }[] = [];
sseClient.setNotificationHandler(LoggingMessageNotificationSchema, notification => {
sseNotifications.push({ level: notification.params.level, data: notification.params.data as string });
Expand All @@ -107,6 +129,10 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
description: undefined,
name: 'bar',
},
{
description: undefined,
name: 'echoUser',
},
]);

const toolRes = await sseClient.callTool({
Expand All @@ -118,6 +144,14 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
assert.deepEqual(toolRes, {
content: [{ type: 'text', text: 'npm package: aaa not found' }],
});

const userRes = await sseClient.callTool({
name: 'echoUser',
arguments: {},
});
assert.deepEqual(userRes, {
content: [{ type: 'text', text: 'hello akita' }],
});
// notification
const notificationResp = await startNotificationTool(sseClient);
await new Promise(resolve => setTimeout(resolve, 5000));
Expand Down Expand Up @@ -181,7 +215,30 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
});
const baseUrl = await app.httpRequest()
.post('/mcp/stream').url;
const streamableTransport = new StreamableHTTPClientTransport(new URL(baseUrl), { requestInit: { headers: { 'custom-session-id': 'custom-session-id' } } });
const streamableTransport = new StreamableHTTPClientTransport(
new URL(baseUrl),
{
authProvider: {
get redirectUrl() { return 'http://localhost/callback'; },
get clientMetadata() { return { redirect_uris: [ 'http://localhost/callback' ] }; },
clientInformation: () => ({ client_id: 'test-client-id', client_secret: 'test-client-secret' }),
tokens: () => {
return {
access_token: Buffer.from('akita').toString('base64'),
token_type: 'Bearer',
};
},
// eslint-disable-next-line @typescript-eslint/no-empty-function
saveTokens: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
redirectToAuthorization: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
saveCodeVerifier: () => {},
codeVerifier: () => '',
},
requestInit: { headers: { 'custom-session-id': 'custom-session-id' } },
},
);
const streamableNotifications: { level: string, data: string }[] = [];
streamableClient.setNotificationHandler(LoggingMessageNotificationSchema, notification => {
streamableNotifications.push({ level: notification.params.level, data: notification.params.data as string });
Expand All @@ -199,6 +256,10 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
description: undefined,
name: 'bar',
},
{
description: undefined,
name: 'echoUser',
},
]);

const toolRes = await streamableClient.callTool({
Expand All @@ -210,6 +271,14 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
assert.deepEqual(toolRes, {
content: [{ type: 'text', text: 'npm package: aaa not found' }],
});

const userRes = await streamableClient.callTool({
name: 'echoUser',
arguments: {},
});
assert.deepEqual(userRes, {
content: [{ type: 'text', text: 'hello akita' }],
});
// notification
const notificationResp = await startNotificationTool(streamableClient);
await new Promise(resolve => setTimeout(resolve, 5000));
Expand Down Expand Up @@ -279,7 +348,29 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
});
const baseUrl = await app.httpRequest()
.post('/mcp/stateless/stream').url;
const streamableTransport = new StreamableHTTPClientTransport(new URL(baseUrl));
const streamableTransport = new StreamableHTTPClientTransport(
new URL(baseUrl),
{
authProvider: {
get redirectUrl() { return 'http://localhost/callback'; },
get clientMetadata() { return { redirect_uris: [ 'http://localhost/callback' ] }; },
clientInformation: () => ({ client_id: 'test-client-id', client_secret: 'test-client-secret' }),
tokens: () => {
return {
access_token: Buffer.from('akita').toString('base64'),
token_type: 'Bearer',
};
},
// eslint-disable-next-line @typescript-eslint/no-empty-function
saveTokens: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
redirectToAuthorization: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
saveCodeVerifier: () => {},
codeVerifier: () => '',
},
},
);
const streamableNotifications: { level: string, data: string }[] = [];
streamableClient.setNotificationHandler(LoggingMessageNotificationSchema, notification => {
streamableNotifications.push({ level: notification.params.level, data: notification.params.data as string });
Expand All @@ -296,6 +387,10 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
description: undefined,
name: 'bar',
},
{
description: undefined,
name: 'echoUser',
},
]);

const toolRes = await streamableClient.callTool({
Expand All @@ -307,6 +402,14 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
assert.deepEqual(toolRes, {
content: [{ type: 'text', text: 'npm package: aaa not found' }],
});

const userRes = await streamableClient.callTool({
name: 'echoUser',
arguments: {},
});
assert.deepEqual(userRes, {
content: [{ type: 'text', text: 'hello akita' }],
});
// notification
const notificationResp = await startNotificationTool(streamableClient);
await new Promise(resolve => setTimeout(resolve, 5000));
Expand Down
Loading
Loading
0