优化
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { reactive, ref } from 'vue'
|
||||
import { reactive } from 'vue'
|
||||
import type { IEventBuilder } from '@/events/IEventBuilder'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
|
||||
@@ -10,7 +10,7 @@ export enum MessageType {
|
||||
APPLICATION = 'application',
|
||||
USER_INTERACTION = 'user_interaction',
|
||||
CROSS_APP = 'cross_app',
|
||||
BROADCAST = 'broadcast'
|
||||
BROADCAST = 'broadcast',
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -20,7 +20,7 @@ export enum MessagePriority {
|
||||
LOW = 0,
|
||||
NORMAL = 1,
|
||||
HIGH = 2,
|
||||
CRITICAL = 3
|
||||
CRITICAL = 3,
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -31,7 +31,7 @@ export enum MessageStatus {
|
||||
SENT = 'sent',
|
||||
DELIVERED = 'delivered',
|
||||
FAILED = 'failed',
|
||||
EXPIRED = 'expired'
|
||||
EXPIRED = 'expired',
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -104,7 +104,7 @@ export class EventCommunicationService {
|
||||
totalBroadcasts: 0,
|
||||
failedMessages: 0,
|
||||
activeSubscribers: 0,
|
||||
channelUsage: new Map()
|
||||
channelUsage: new Map(),
|
||||
})
|
||||
|
||||
private processingInterval: number | null = null
|
||||
@@ -126,7 +126,7 @@ export class EventCommunicationService {
|
||||
options: {
|
||||
filter?: (message: EventMessage) => boolean
|
||||
priority?: MessagePriority
|
||||
} = {}
|
||||
} = {},
|
||||
): string {
|
||||
// 检查通道权限
|
||||
if (!this.canAccessChannel(appId, channel)) {
|
||||
@@ -142,7 +142,7 @@ export class EventCommunicationService {
|
||||
filter: options.filter,
|
||||
priority: options.priority || MessagePriority.NORMAL,
|
||||
createdAt: new Date(),
|
||||
active: true
|
||||
active: true,
|
||||
}
|
||||
|
||||
this.subscribers.set(subscriberId, subscriber)
|
||||
@@ -177,7 +177,7 @@ export class EventCommunicationService {
|
||||
type?: MessageType
|
||||
expiresIn?: number // 过期时间(毫秒)
|
||||
maxRetries?: number
|
||||
} = {}
|
||||
} = {},
|
||||
): Promise<string> {
|
||||
// 检查发送者权限
|
||||
if (!this.canAccessChannel(senderId, channel)) {
|
||||
@@ -193,7 +193,7 @@ export class EventCommunicationService {
|
||||
|
||||
const messageId = uuidv4()
|
||||
const now = new Date()
|
||||
|
||||
|
||||
const message: EventMessage = {
|
||||
id: messageId,
|
||||
type: options.type || MessageType.APPLICATION,
|
||||
@@ -206,7 +206,7 @@ export class EventCommunicationService {
|
||||
expiresAt: options.expiresIn ? new Date(now.getTime() + options.expiresIn) : undefined,
|
||||
status: MessageStatus.PENDING,
|
||||
retryCount: 0,
|
||||
maxRetries: options.maxRetries || 3
|
||||
maxRetries: options.maxRetries || 3,
|
||||
}
|
||||
|
||||
// 如果是点对点消息,直接发送
|
||||
@@ -222,14 +222,16 @@ export class EventCommunicationService {
|
||||
if (!options.receiverId) {
|
||||
this.statistics.totalBroadcasts++
|
||||
}
|
||||
|
||||
|
||||
const channelUsage = this.statistics.channelUsage.get(channel) || 0
|
||||
this.statistics.channelUsage.set(channel, channelUsage + 1)
|
||||
|
||||
// 记录消息历史
|
||||
this.recordMessage(message)
|
||||
|
||||
console.log(`[EventCommunication] 消息 ${messageId} 已发送到频道 ${channel}[发送者: ${senderId}]`)
|
||||
console.log(
|
||||
`[EventCommunication] 消息 ${messageId} 已发送到频道 ${channel}[发送者: ${senderId}]`,
|
||||
)
|
||||
return messageId
|
||||
}
|
||||
|
||||
@@ -243,11 +245,11 @@ export class EventCommunicationService {
|
||||
options: {
|
||||
priority?: MessagePriority
|
||||
expiresIn?: number
|
||||
} = {}
|
||||
} = {},
|
||||
): Promise<string> {
|
||||
return this.sendMessage(senderId, channel, payload, {
|
||||
...options,
|
||||
type: MessageType.BROADCAST
|
||||
type: MessageType.BROADCAST,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -261,14 +263,14 @@ export class EventCommunicationService {
|
||||
options: {
|
||||
priority?: MessagePriority
|
||||
expiresIn?: number
|
||||
} = {}
|
||||
} = {},
|
||||
): Promise<string> {
|
||||
const channel = 'cross-app'
|
||||
|
||||
|
||||
return this.sendMessage(senderId, channel, payload, {
|
||||
...options,
|
||||
receiverId,
|
||||
type: MessageType.CROSS_APP
|
||||
type: MessageType.CROSS_APP,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -281,20 +283,18 @@ export class EventCommunicationService {
|
||||
channel?: string
|
||||
limit?: number
|
||||
since?: Date
|
||||
} = {}
|
||||
} = {},
|
||||
): EventMessage[] {
|
||||
const history = this.messageHistory.get(appId) || []
|
||||
|
||||
let filtered = history.filter(msg =>
|
||||
msg.senderId === appId || msg.receiverId === appId
|
||||
)
|
||||
|
||||
let filtered = history.filter((msg) => msg.senderId === appId || msg.receiverId === appId)
|
||||
|
||||
if (options.channel) {
|
||||
filtered = filtered.filter(msg => msg.channel === options.channel)
|
||||
filtered = filtered.filter((msg) => msg.channel === options.channel)
|
||||
}
|
||||
|
||||
if (options.since) {
|
||||
filtered = filtered.filter(msg => msg.timestamp >= options.since!)
|
||||
filtered = filtered.filter((msg) => msg.timestamp >= options.since!)
|
||||
}
|
||||
|
||||
if (options.limit) {
|
||||
@@ -308,7 +308,7 @@ export class EventCommunicationService {
|
||||
* 获取应用的订阅列表
|
||||
*/
|
||||
getAppSubscriptions(appId: string): EventSubscriber[] {
|
||||
return Array.from(this.subscribers.values()).filter(sub => sub.appId === appId)
|
||||
return Array.from(this.subscribers.values()).filter((sub) => sub.appId === appId)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -316,24 +316,21 @@ export class EventCommunicationService {
|
||||
*/
|
||||
getChannelSubscriberCount(channel: string): number {
|
||||
return Array.from(this.subscribers.values()).filter(
|
||||
sub => sub.channel === channel && sub.active
|
||||
(sub) => sub.channel === channel && sub.active,
|
||||
).length
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建通信频道
|
||||
*/
|
||||
createChannel(
|
||||
channel: string,
|
||||
config: Omit<CommunicationChannel, 'name'>
|
||||
): boolean {
|
||||
createChannel(channel: string, config: Omit<CommunicationChannel, 'name'>): boolean {
|
||||
if (this.channels.has(channel)) {
|
||||
return false
|
||||
}
|
||||
|
||||
this.channels.set(channel, {
|
||||
name: channel,
|
||||
...config
|
||||
...config,
|
||||
})
|
||||
|
||||
console.log(`创建通信频道: ${channel}`)
|
||||
@@ -349,15 +346,15 @@ export class EventCommunicationService {
|
||||
.filter(([, sub]) => sub.channel === channel)
|
||||
.map(([id]) => id)
|
||||
|
||||
subscribersToRemove.forEach(id => this.unsubscribe(id))
|
||||
subscribersToRemove.forEach((id) => this.unsubscribe(id))
|
||||
|
||||
// 删除频道
|
||||
const result = this.channels.delete(channel)
|
||||
|
||||
|
||||
if (result) {
|
||||
console.log(`删除通信频道: ${channel}`)
|
||||
}
|
||||
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -376,10 +373,8 @@ export class EventCommunicationService {
|
||||
|
||||
// 清理过期消息
|
||||
for (const [appId, messages] of this.messageQueue.entries()) {
|
||||
const validMessages = messages.filter(msg =>
|
||||
!msg.expiresAt || msg.expiresAt > now
|
||||
)
|
||||
|
||||
const validMessages = messages.filter((msg) => !msg.expiresAt || msg.expiresAt > now)
|
||||
|
||||
if (validMessages.length !== messages.length) {
|
||||
this.messageQueue.set(appId, validMessages)
|
||||
}
|
||||
@@ -388,7 +383,7 @@ export class EventCommunicationService {
|
||||
// 清理消息历史(保留最近7天)
|
||||
const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000)
|
||||
for (const [appId, history] of this.messageHistory.entries()) {
|
||||
const recentHistory = history.filter(msg => msg.timestamp > sevenDaysAgo)
|
||||
const recentHistory = history.filter((msg) => msg.timestamp > sevenDaysAgo)
|
||||
this.messageHistory.set(appId, recentHistory)
|
||||
}
|
||||
|
||||
@@ -424,7 +419,7 @@ export class EventCommunicationService {
|
||||
restricted: true,
|
||||
allowedApps: ['system'],
|
||||
maxMessageSize: 1024 * 10, // 10KB
|
||||
messageRetention: 24 * 60 * 60 * 1000 // 24小时
|
||||
messageRetention: 24 * 60 * 60 * 1000, // 24小时
|
||||
})
|
||||
|
||||
// 应用间通信频道
|
||||
@@ -433,7 +428,7 @@ export class EventCommunicationService {
|
||||
restricted: false,
|
||||
allowedApps: [],
|
||||
maxMessageSize: 1024 * 100, // 100KB
|
||||
messageRetention: 7 * 24 * 60 * 60 * 1000 // 7天
|
||||
messageRetention: 7 * 24 * 60 * 60 * 1000, // 7天
|
||||
})
|
||||
|
||||
// 用户交互频道
|
||||
@@ -442,7 +437,7 @@ export class EventCommunicationService {
|
||||
restricted: false,
|
||||
allowedApps: [],
|
||||
maxMessageSize: 1024 * 5, // 5KB
|
||||
messageRetention: 60 * 60 * 1000 // 1小时
|
||||
messageRetention: 60 * 60 * 1000, // 1小时
|
||||
})
|
||||
|
||||
// 广播频道
|
||||
@@ -451,7 +446,7 @@ export class EventCommunicationService {
|
||||
restricted: true,
|
||||
allowedApps: ['system'],
|
||||
maxMessageSize: 1024 * 50, // 50KB
|
||||
messageRetention: 24 * 60 * 60 * 1000 // 24小时
|
||||
messageRetention: 24 * 60 * 60 * 1000, // 24小时
|
||||
})
|
||||
}
|
||||
|
||||
@@ -460,7 +455,7 @@ export class EventCommunicationService {
|
||||
*/
|
||||
private canAccessChannel(appId: string, channel: string): boolean {
|
||||
const channelConfig = this.channels.get(channel)
|
||||
|
||||
|
||||
if (!channelConfig) {
|
||||
// 频道不存在,默认允许
|
||||
return true
|
||||
@@ -483,15 +478,15 @@ export class EventCommunicationService {
|
||||
*/
|
||||
private addToQueue(message: EventMessage): void {
|
||||
const queueKey = message.receiverId || 'broadcast'
|
||||
|
||||
|
||||
if (!this.messageQueue.has(queueKey)) {
|
||||
this.messageQueue.set(queueKey, [])
|
||||
}
|
||||
|
||||
const queue = this.messageQueue.get(queueKey)!
|
||||
|
||||
|
||||
// 按优先级插入
|
||||
const insertIndex = queue.findIndex(msg => msg.priority < message.priority)
|
||||
const insertIndex = queue.findIndex((msg) => msg.priority < message.priority)
|
||||
if (insertIndex === -1) {
|
||||
queue.push(message)
|
||||
} else {
|
||||
@@ -505,12 +500,14 @@ export class EventCommunicationService {
|
||||
private async deliverMessage(message: EventMessage): Promise<void> {
|
||||
try {
|
||||
const subscribers = this.getRelevantSubscribers(message)
|
||||
|
||||
|
||||
if (subscribers.length === 0) {
|
||||
message.status = MessageStatus.FAILED
|
||||
// 只对非系统频道显示警告信息
|
||||
if (message.channel !== 'system') {
|
||||
console.warn(`[EventCommunication] 没有找到频道 ${message.channel} 的订阅者[消息 ID: ${message.id}]`)
|
||||
console.warn(
|
||||
`[EventCommunication] 没有找到频道 ${message.channel} 的订阅者[消息 ID: ${message.id}]`,
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -525,7 +522,9 @@ export class EventCommunicationService {
|
||||
|
||||
await subscriber.handler(message)
|
||||
this.statistics.totalMessagesReceived++
|
||||
console.log(`[EventCommunication] 消息 ${message.id} 已投递给订阅者 ${subscriber.id}[频道: ${message.channel}]`)
|
||||
console.log(
|
||||
`[EventCommunication] 消息 ${message.id} 已投递给订阅者 ${subscriber.id}[频道: ${message.channel}]`,
|
||||
)
|
||||
} catch (error) {
|
||||
console.error(`向订阅者 ${subscriber.id} 发送消息失败:`, error)
|
||||
throw error
|
||||
@@ -534,12 +533,11 @@ export class EventCommunicationService {
|
||||
|
||||
await Promise.allSettled(deliveryPromises)
|
||||
message.status = MessageStatus.DELIVERED
|
||||
|
||||
} catch (error) {
|
||||
message.status = MessageStatus.FAILED
|
||||
this.statistics.failedMessages++
|
||||
console.error('消息投递失败:', error)
|
||||
|
||||
|
||||
// 重试机制
|
||||
if (message.retryCount < message.maxRetries) {
|
||||
message.retryCount++
|
||||
@@ -553,10 +551,10 @@ export class EventCommunicationService {
|
||||
* 获取相关订阅者
|
||||
*/
|
||||
private getRelevantSubscribers(message: EventMessage): EventSubscriber[] {
|
||||
return Array.from(this.subscribers.values()).filter(subscriber => {
|
||||
return Array.from(this.subscribers.values()).filter((subscriber) => {
|
||||
if (!subscriber.active) return false
|
||||
if (subscriber.channel !== message.channel) return false
|
||||
|
||||
|
||||
// 点对点消息检查接收者
|
||||
if (message.receiverId && subscriber.appId !== message.receiverId) {
|
||||
return false
|
||||
@@ -585,7 +583,7 @@ export class EventCommunicationService {
|
||||
|
||||
// 处理优先级最高的消息
|
||||
const message = messages.shift()!
|
||||
|
||||
|
||||
// 检查消息是否过期
|
||||
if (message.expiresAt && message.expiresAt <= new Date()) {
|
||||
message.status = MessageStatus.EXPIRED
|
||||
@@ -601,12 +599,10 @@ export class EventCommunicationService {
|
||||
*/
|
||||
private cleanupExpiredMessages(): void {
|
||||
const now = new Date()
|
||||
|
||||
|
||||
for (const [queueKey, messages] of this.messageQueue.entries()) {
|
||||
const validMessages = messages.filter(msg =>
|
||||
!msg.expiresAt || msg.expiresAt > now
|
||||
)
|
||||
|
||||
const validMessages = messages.filter((msg) => !msg.expiresAt || msg.expiresAt > now)
|
||||
|
||||
if (validMessages.length !== messages.length) {
|
||||
this.messageQueue.set(queueKey, validMessages)
|
||||
}
|
||||
@@ -636,7 +632,8 @@ export class EventCommunicationService {
|
||||
* 更新活跃订阅者数量
|
||||
*/
|
||||
private updateActiveSubscribersCount(): void {
|
||||
this.statistics.activeSubscribers = Array.from(this.subscribers.values())
|
||||
.filter(sub => sub.active).length
|
||||
this.statistics.activeSubscribers = Array.from(this.subscribers.values()).filter(
|
||||
(sub) => sub.active,
|
||||
).length
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user