This commit is contained in:
2025-09-24 16:43:10 +08:00
parent 12f46e6f8e
commit 9dbc054483
130 changed files with 16474 additions and 4660 deletions

View File

@@ -0,0 +1,639 @@
import { reactive, ref } from 'vue'
import type { IEventBuilder } from '@/events/IEventBuilder'
import { v4 as uuidv4 } from 'uuid'
/**
* 消息类型枚举
*/
export enum MessageType {
SYSTEM = 'system',
APPLICATION = 'application',
USER_INTERACTION = 'user_interaction',
CROSS_APP = 'cross_app',
BROADCAST = 'broadcast'
}
/**
* 消息优先级枚举
*/
export enum MessagePriority {
LOW = 0,
NORMAL = 1,
HIGH = 2,
CRITICAL = 3
}
/**
* 消息状态枚举
*/
export enum MessageStatus {
PENDING = 'pending',
SENT = 'sent',
DELIVERED = 'delivered',
FAILED = 'failed',
EXPIRED = 'expired'
}
/**
* 事件消息接口
*/
export interface EventMessage {
id: string
type: MessageType
priority: MessagePriority
senderId: string
receiverId?: string // undefined表示广播消息
channel: string
payload: any
timestamp: Date
expiresAt?: Date
status: MessageStatus
retryCount: number
maxRetries: number
}
/**
* 事件订阅者接口
*/
export interface EventSubscriber {
id: string
appId: string
channel: string
handler: (message: EventMessage) => void | Promise<void>
filter?: (message: EventMessage) => boolean
priority: MessagePriority
createdAt: Date
active: boolean
}
/**
* 通信通道接口
*/
export interface CommunicationChannel {
name: string
description: string
restricted: boolean // 是否需要权限
allowedApps: string[] // 允许访问的应用ID列表
maxMessageSize: number // 最大消息大小(字节)
messageRetention: number // 消息保留时间(毫秒)
}
/**
* 事件统计信息
*/
export interface EventStatistics {
totalMessagesSent: number
totalMessagesReceived: number
totalBroadcasts: number
failedMessages: number
activeSubscribers: number
channelUsage: Map<string, number>
}
/**
* 事件通信服务类
*/
export class EventCommunicationService {
private subscribers = reactive(new Map<string, EventSubscriber>())
private messageQueue = reactive(new Map<string, EventMessage[]>()) // 按应用分组的消息队列
private messageHistory = reactive(new Map<string, EventMessage[]>()) // 消息历史记录
private channels = reactive(new Map<string, CommunicationChannel>())
private statistics = reactive<EventStatistics>({
totalMessagesSent: 0,
totalMessagesReceived: 0,
totalBroadcasts: 0,
failedMessages: 0,
activeSubscribers: 0,
channelUsage: new Map()
})
private processingInterval: number | null = null
private eventBus: IEventBuilder<any>
constructor(eventBus: IEventBuilder<any>) {
this.eventBus = eventBus
this.initializeDefaultChannels()
this.startMessageProcessing()
}
/**
* 订阅事件频道
*/
subscribe(
appId: string,
channel: string,
handler: (message: EventMessage) => void | Promise<void>,
options: {
filter?: (message: EventMessage) => boolean
priority?: MessagePriority
} = {}
): string {
// 检查通道权限
if (!this.canAccessChannel(appId, channel)) {
throw new Error(`应用 ${appId} 无权访问频道 ${channel}`)
}
const subscriberId = uuidv4()
const subscriber: EventSubscriber = {
id: subscriberId,
appId,
channel,
handler,
filter: options.filter,
priority: options.priority || MessagePriority.NORMAL,
createdAt: new Date(),
active: true
}
this.subscribers.set(subscriberId, subscriber)
this.updateActiveSubscribersCount()
console.log(`应用 ${appId} 订阅了频道 ${channel}`)
return subscriberId
}
/**
* 取消订阅
*/
unsubscribe(subscriberId: string): boolean {
const result = this.subscribers.delete(subscriberId)
if (result) {
this.updateActiveSubscribersCount()
console.log(`取消订阅: ${subscriberId}`)
}
return result
}
/**
* 发送消息
*/
async sendMessage(
senderId: string,
channel: string,
payload: any,
options: {
receiverId?: string
priority?: MessagePriority
type?: MessageType
expiresIn?: number // 过期时间(毫秒)
maxRetries?: number
} = {}
): Promise<string> {
// 检查发送者权限
if (!this.canAccessChannel(senderId, channel)) {
throw new Error(`应用 ${senderId} 无权向频道 ${channel} 发送消息`)
}
// 检查消息大小
const messageSize = JSON.stringify(payload).length
const channelConfig = this.channels.get(channel)
if (channelConfig && messageSize > channelConfig.maxMessageSize) {
throw new Error(`消息大小超出限制: ${messageSize} > ${channelConfig.maxMessageSize}`)
}
const messageId = uuidv4()
const now = new Date()
const message: EventMessage = {
id: messageId,
type: options.type || MessageType.APPLICATION,
priority: options.priority || MessagePriority.NORMAL,
senderId,
receiverId: options.receiverId,
channel,
payload,
timestamp: now,
expiresAt: options.expiresIn ? new Date(now.getTime() + options.expiresIn) : undefined,
status: MessageStatus.PENDING,
retryCount: 0,
maxRetries: options.maxRetries || 3
}
// 如果是点对点消息,直接发送
if (options.receiverId) {
await this.deliverMessage(message)
} else {
// 广播消息,加入队列处理
this.addToQueue(message)
}
// 更新统计信息
this.statistics.totalMessagesSent++
if (!options.receiverId) {
this.statistics.totalBroadcasts++
}
const channelUsage = this.statistics.channelUsage.get(channel) || 0
this.statistics.channelUsage.set(channel, channelUsage + 1)
// 记录消息历史
this.recordMessage(message)
console.log(`[EventCommunication] 消息 ${messageId} 已发送到频道 ${channel}[发送者: ${senderId}]`)
return messageId
}
/**
* 广播消息到所有订阅者
*/
async broadcast(
senderId: string,
channel: string,
payload: any,
options: {
priority?: MessagePriority
expiresIn?: number
} = {}
): Promise<string> {
return this.sendMessage(senderId, channel, payload, {
...options,
type: MessageType.BROADCAST
})
}
/**
* 发送跨应用消息
*/
async sendCrossAppMessage(
senderId: string,
receiverId: string,
payload: any,
options: {
priority?: MessagePriority
expiresIn?: number
} = {}
): Promise<string> {
const channel = 'cross-app'
return this.sendMessage(senderId, channel, payload, {
...options,
receiverId,
type: MessageType.CROSS_APP
})
}
/**
* 获取消息历史
*/
getMessageHistory(
appId: string,
options: {
channel?: string
limit?: number
since?: Date
} = {}
): EventMessage[] {
const history = this.messageHistory.get(appId) || []
let filtered = history.filter(msg =>
msg.senderId === appId || msg.receiverId === appId
)
if (options.channel) {
filtered = filtered.filter(msg => msg.channel === options.channel)
}
if (options.since) {
filtered = filtered.filter(msg => msg.timestamp >= options.since!)
}
if (options.limit) {
filtered = filtered.slice(-options.limit)
}
return filtered.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime())
}
/**
* 获取应用的订阅列表
*/
getAppSubscriptions(appId: string): EventSubscriber[] {
return Array.from(this.subscribers.values()).filter(sub => sub.appId === appId)
}
/**
* 获取频道订阅者数量
*/
getChannelSubscriberCount(channel: string): number {
return Array.from(this.subscribers.values()).filter(
sub => sub.channel === channel && sub.active
).length
}
/**
* 创建通信频道
*/
createChannel(
channel: string,
config: Omit<CommunicationChannel, 'name'>
): boolean {
if (this.channels.has(channel)) {
return false
}
this.channels.set(channel, {
name: channel,
...config
})
console.log(`创建通信频道: ${channel}`)
return true
}
/**
* 删除通信频道
*/
deleteChannel(channel: string): boolean {
// 移除所有相关订阅
const subscribersToRemove = Array.from(this.subscribers.entries())
.filter(([, sub]) => sub.channel === channel)
.map(([id]) => id)
subscribersToRemove.forEach(id => this.unsubscribe(id))
// 删除频道
const result = this.channels.delete(channel)
if (result) {
console.log(`删除通信频道: ${channel}`)
}
return result
}
/**
* 获取统计信息
*/
getStatistics(): EventStatistics {
return { ...this.statistics }
}
/**
* 清理过期消息和订阅
*/
cleanup(): void {
const now = new Date()
// 清理过期消息
for (const [appId, messages] of this.messageQueue.entries()) {
const validMessages = messages.filter(msg =>
!msg.expiresAt || msg.expiresAt > now
)
if (validMessages.length !== messages.length) {
this.messageQueue.set(appId, validMessages)
}
}
// 清理消息历史(保留最近7天)
const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000)
for (const [appId, history] of this.messageHistory.entries()) {
const recentHistory = history.filter(msg => msg.timestamp > sevenDaysAgo)
this.messageHistory.set(appId, recentHistory)
}
console.log('事件通信服务清理完成')
}
/**
* 销毁服务
*/
destroy(): void {
if (this.processingInterval) {
clearInterval(this.processingInterval)
this.processingInterval = null
}
this.subscribers.clear()
this.messageQueue.clear()
this.messageHistory.clear()
this.channels.clear()
console.log('事件通信服务已销毁')
}
// 私有方法
/**
* 初始化默认频道
*/
private initializeDefaultChannels(): void {
// 系统事件频道
this.createChannel('system', {
description: '系统级事件通信',
restricted: true,
allowedApps: ['system'],
maxMessageSize: 1024 * 10, // 10KB
messageRetention: 24 * 60 * 60 * 1000 // 24小时
})
// 应用间通信频道
this.createChannel('cross-app', {
description: '应用间通信',
restricted: false,
allowedApps: [],
maxMessageSize: 1024 * 100, // 100KB
messageRetention: 7 * 24 * 60 * 60 * 1000 // 7天
})
// 用户交互频道
this.createChannel('user-interaction', {
description: '用户交互事件',
restricted: false,
allowedApps: [],
maxMessageSize: 1024 * 5, // 5KB
messageRetention: 60 * 60 * 1000 // 1小时
})
// 广播频道
this.createChannel('broadcast', {
description: '系统广播',
restricted: true,
allowedApps: ['system'],
maxMessageSize: 1024 * 50, // 50KB
messageRetention: 24 * 60 * 60 * 1000 // 24小时
})
}
/**
* 检查应用是否可以访问频道
*/
private canAccessChannel(appId: string, channel: string): boolean {
const channelConfig = this.channels.get(channel)
if (!channelConfig) {
// 频道不存在,默认允许
return true
}
if (!channelConfig.restricted) {
return true
}
// 系统应用总是有权限
if (appId === 'system') {
return true
}
return channelConfig.allowedApps.includes(appId)
}
/**
* 添加消息到队列
*/
private addToQueue(message: EventMessage): void {
const queueKey = message.receiverId || 'broadcast'
if (!this.messageQueue.has(queueKey)) {
this.messageQueue.set(queueKey, [])
}
const queue = this.messageQueue.get(queueKey)!
// 按优先级插入
const insertIndex = queue.findIndex(msg => msg.priority < message.priority)
if (insertIndex === -1) {
queue.push(message)
} else {
queue.splice(insertIndex, 0, message)
}
}
/**
* 直接投递消息
*/
private async deliverMessage(message: EventMessage): Promise<void> {
try {
const subscribers = this.getRelevantSubscribers(message)
if (subscribers.length === 0) {
message.status = MessageStatus.FAILED
console.warn(`[EventCommunication] 没有找到频道 ${message.channel} 的订阅者[消息 ID: ${message.id}]`)
return
}
// 并行发送给所有订阅者
const deliveryPromises = subscribers.map(async (subscriber) => {
try {
// 应用过滤器
if (subscriber.filter && !subscriber.filter(message)) {
return
}
await subscriber.handler(message)
this.statistics.totalMessagesReceived++
console.log(`[EventCommunication] 消息 ${message.id} 已投递给订阅者 ${subscriber.id}[频道: ${message.channel}]`)
} catch (error) {
console.error(`向订阅者 ${subscriber.id} 发送消息失败:`, error)
throw error
}
})
await Promise.allSettled(deliveryPromises)
message.status = MessageStatus.DELIVERED
} catch (error) {
message.status = MessageStatus.FAILED
this.statistics.failedMessages++
console.error('消息投递失败:', error)
// 重试机制
if (message.retryCount < message.maxRetries) {
message.retryCount++
message.status = MessageStatus.PENDING
setTimeout(() => this.deliverMessage(message), 1000 * message.retryCount)
}
}
}
/**
* 获取相关订阅者
*/
private getRelevantSubscribers(message: EventMessage): EventSubscriber[] {
return Array.from(this.subscribers.values()).filter(subscriber => {
if (!subscriber.active) return false
if (subscriber.channel !== message.channel) return false
// 点对点消息检查接收者
if (message.receiverId && subscriber.appId !== message.receiverId) {
return false
}
return true
})
}
/**
* 开始消息处理循环
*/
private startMessageProcessing(): void {
this.processingInterval = setInterval(() => {
this.processMessageQueue()
this.cleanupExpiredMessages()
}, 100) // 每100ms处理一次
}
/**
* 处理消息队列
*/
private processMessageQueue(): void {
for (const [queueKey, messages] of this.messageQueue.entries()) {
if (messages.length === 0) continue
// 处理优先级最高的消息
const message = messages.shift()!
// 检查消息是否过期
if (message.expiresAt && message.expiresAt <= new Date()) {
message.status = MessageStatus.EXPIRED
continue
}
this.deliverMessage(message)
}
}
/**
* 清理过期消息
*/
private cleanupExpiredMessages(): void {
const now = new Date()
for (const [queueKey, messages] of this.messageQueue.entries()) {
const validMessages = messages.filter(msg =>
!msg.expiresAt || msg.expiresAt > now
)
if (validMessages.length !== messages.length) {
this.messageQueue.set(queueKey, validMessages)
}
}
}
/**
* 记录消息历史
*/
private recordMessage(message: EventMessage): void {
// 记录发送者历史
if (!this.messageHistory.has(message.senderId)) {
this.messageHistory.set(message.senderId, [])
}
this.messageHistory.get(message.senderId)!.push(message)
// 记录接收者历史
if (message.receiverId && message.receiverId !== message.senderId) {
if (!this.messageHistory.has(message.receiverId)) {
this.messageHistory.set(message.receiverId, [])
}
this.messageHistory.get(message.receiverId)!.push(message)
}
}
/**
* 更新活跃订阅者数量
*/
private updateActiveSubscribersCount(): void {
this.statistics.activeSubscribers = Array.from(this.subscribers.values())
.filter(sub => sub.active).length
}
}