import { reactive } from 'vue' import type { IEventBuilder, IEventMap } from '@/events/IEventBuilder' import { v4 as uuidv4 } from 'uuid' import type { WindowState } from './WindowService' import type { ResourceType } from './ResourceService' /** * 消息类型枚举 */ export enum MessageType { SYSTEM = 'system', APPLICATION = 'application', USER_INTERACTION = 'user_interaction', CROSS_APP = 'cross_app', BROADCAST = 'broadcast', } /** * 消息优先级枚举 */ export enum MessagePriority { LOW = 0, NORMAL = 1, HIGH = 2, CRITICAL = 3, } /** * 消息状态枚举 */ export enum MessageStatus { PENDING = 'pending', SENT = 'sent', DELIVERED = 'delivered', FAILED = 'failed', EXPIRED = 'expired', } /** * 事件消息接口 */ export interface EventMessage { id: string type: MessageType priority: MessagePriority senderId: string receiverId?: string // undefined表示广播消息 channel: string payload: any timestamp: Date expiresAt?: Date status: MessageStatus retryCount: number maxRetries: number } /** * 事件订阅者接口 */ export interface EventSubscriber { id: string appId: string channel: string handler: (message: EventMessage) => void | Promise filter?: (message: EventMessage) => boolean priority: MessagePriority createdAt: Date active: boolean } /** * 通信通道接口 */ export interface CommunicationChannel { name: string description: string restricted: boolean // 是否需要权限 allowedApps: string[] // 允许访问的应用ID列表 maxMessageSize: number // 最大消息大小(字节) messageRetention: number // 消息保留时间(毫秒) } /** * 事件统计信息 */ export interface EventStatistics { totalMessagesSent: number totalMessagesReceived: number totalBroadcasts: number failedMessages: number activeSubscribers: number channelUsage: Map } /** * 窗体数据更新参数 */ export interface WindowFormDataUpdateParams { /** 窗口id */ id: string /** 窗口状态 */ state: WindowState /** 窗口宽度 */ width: number /** 窗口高度 */ height: number /** 窗口x坐标(左上角) */ x: number /** 窗口y坐标(左上角) */ y: number } /** * 事件通信服务事件接口 */ export interface EventCommunicationEvents extends IEventMap { // 系统就绪事件 onSystemReady: (data: { timestamp: Date; services: string[] }) => void // 窗体相关事件 onWindowStateChanged: (windowId: string, newState: string, oldState: string) => void onWindowFormDataUpdate: (data: WindowFormDataUpdateParams) => void onWindowFormResizeStart: (windowId: string) => void onWindowFormResizing: (windowId: string, width: number, height: number) => void onWindowFormResizeEnd: (windowId: string) => void onWindowClose: (windowId: string) => void // 应用生命周期事件 onAppLifecycle: (data: { appId: string; event: string; timestamp: Date }) => void // 资源相关事件 onResourceQuotaExceeded: (appId: string, resourceType: ResourceType) => void onPerformanceAlert: (data: { type: 'memory' | 'cpu'; usage: number; limit: number }) => void // 消息处理事件 onMessageProcessed: (message: EventMessage) => void onMessageFailed: (message: EventMessage, error: any) => void } /** * 事件通信服务类 */ export class EventCommunicationService { private subscribers = reactive(new Map()) private messageQueue = reactive(new Map()) // 按应用分组的消息队列 private messageHistory = reactive(new Map()) // 消息历史记录 private channels = reactive(new Map()) private statistics = reactive({ totalMessagesSent: 0, totalMessagesReceived: 0, totalBroadcasts: 0, failedMessages: 0, activeSubscribers: 0, channelUsage: new Map(), }) private processingInterval: number | null = null private eventBus: IEventBuilder constructor(eventBus: IEventBuilder) { this.eventBus = eventBus this.initializeDefaultChannels() this.startMessageProcessing() } /** * 订阅事件频道 */ subscribe( appId: string, channel: string, handler: (message: EventMessage) => void | Promise, options: { filter?: (message: EventMessage) => boolean priority?: MessagePriority } = {}, ): string { // 检查通道权限 if (!this.canAccessChannel(appId, channel)) { throw new Error(`应用 ${appId} 无权访问频道 ${channel}`) } const subscriberId = uuidv4() const subscriber: EventSubscriber = { id: subscriberId, appId, channel, handler, filter: options.filter, priority: options.priority || MessagePriority.NORMAL, createdAt: new Date(), active: true, } this.subscribers.set(subscriberId, subscriber) this.updateActiveSubscribersCount() console.log(`应用 ${appId} 订阅了频道 ${channel}`) return subscriberId } /** * 取消订阅 */ unsubscribe(subscriberId: string): boolean { const result = this.subscribers.delete(subscriberId) if (result) { this.updateActiveSubscribersCount() console.log(`取消订阅: ${subscriberId}`) } return result } /** * 发送消息 */ async sendMessage( senderId: string, channel: string, payload: any, options: { receiverId?: string priority?: MessagePriority type?: MessageType expiresIn?: number // 过期时间(毫秒) maxRetries?: number } = {}, ): Promise { // 检查发送者权限 if (!this.canAccessChannel(senderId, channel)) { throw new Error(`应用 ${senderId} 无权向频道 ${channel} 发送消息`) } // 检查消息大小 const messageSize = JSON.stringify(payload).length const channelConfig = this.channels.get(channel) if (channelConfig && messageSize > channelConfig.maxMessageSize) { throw new Error(`消息大小超出限制: ${messageSize} > ${channelConfig.maxMessageSize}`) } const messageId = uuidv4() const now = new Date() const message: EventMessage = { id: messageId, type: options.type || MessageType.APPLICATION, priority: options.priority || MessagePriority.NORMAL, senderId, receiverId: options.receiverId, channel, payload, timestamp: now, expiresAt: options.expiresIn ? new Date(now.getTime() + options.expiresIn) : undefined, status: MessageStatus.PENDING, retryCount: 0, maxRetries: options.maxRetries || 3, } // 如果是点对点消息,直接发送 if (options.receiverId) { await this.deliverMessage(message) } else { // 广播消息,加入队列处理 this.addToQueue(message) } // 更新统计信息 this.statistics.totalMessagesSent++ if (!options.receiverId) { this.statistics.totalBroadcasts++ } const channelUsage = this.statistics.channelUsage.get(channel) || 0 this.statistics.channelUsage.set(channel, channelUsage + 1) // 记录消息历史 this.recordMessage(message) console.log( `[EventCommunication] 消息 ${messageId} 已发送到频道 ${channel}[发送者: ${senderId}]`, ) return messageId } /** * 广播消息到所有订阅者 */ async broadcast( senderId: string, channel: string, payload: any, options: { priority?: MessagePriority expiresIn?: number } = {}, ): Promise { return this.sendMessage(senderId, channel, payload, { ...options, type: MessageType.BROADCAST, }) } /** * 发送跨应用消息 */ async sendCrossAppMessage( senderId: string, receiverId: string, payload: any, options: { priority?: MessagePriority expiresIn?: number } = {}, ): Promise { const channel = 'cross-app' return this.sendMessage(senderId, channel, payload, { ...options, receiverId, type: MessageType.CROSS_APP, }) } /** * 获取消息历史 */ getMessageHistory( appId: string, options: { channel?: string limit?: number since?: Date } = {}, ): EventMessage[] { const history = this.messageHistory.get(appId) || [] let filtered = history.filter((msg) => msg.senderId === appId || msg.receiverId === appId) if (options.channel) { filtered = filtered.filter((msg) => msg.channel === options.channel) } if (options.since) { filtered = filtered.filter((msg) => msg.timestamp >= options.since!) } if (options.limit) { filtered = filtered.slice(-options.limit) } return filtered.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime()) } /** * 获取应用的订阅列表 */ getAppSubscriptions(appId: string): EventSubscriber[] { return Array.from(this.subscribers.values()).filter((sub) => sub.appId === appId) } /** * 获取频道订阅者数量 */ getChannelSubscriberCount(channel: string): number { return Array.from(this.subscribers.values()).filter( (sub) => sub.channel === channel && sub.active, ).length } /** * 创建通信频道 */ createChannel(channel: string, config: Omit): boolean { if (this.channels.has(channel)) { return false } this.channels.set(channel, { name: channel, ...config, }) console.log(`创建通信频道: ${channel}`) return true } /** * 删除通信频道 */ deleteChannel(channel: string): boolean { // 移除所有相关订阅 const subscribersToRemove = Array.from(this.subscribers.entries()) .filter(([, sub]) => sub.channel === channel) .map(([id]) => id) subscribersToRemove.forEach((id) => this.unsubscribe(id)) // 删除频道 const result = this.channels.delete(channel) if (result) { console.log(`删除通信频道: ${channel}`) } return result } /** * 获取统计信息 */ getStatistics(): EventStatistics { return { ...this.statistics } } /** * 清理过期消息和订阅 */ cleanup(): void { const now = new Date() // 清理过期消息 for (const [appId, messages] of this.messageQueue.entries()) { const validMessages = messages.filter((msg) => !msg.expiresAt || msg.expiresAt > now) if (validMessages.length !== messages.length) { this.messageQueue.set(appId, validMessages) } } // 清理消息历史(保留最近7天) const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000) for (const [appId, history] of this.messageHistory.entries()) { const recentHistory = history.filter((msg) => msg.timestamp > sevenDaysAgo) this.messageHistory.set(appId, recentHistory) } console.log('事件通信服务清理完成') } /** * 销毁服务 */ destroy(): void { if (this.processingInterval) { clearInterval(this.processingInterval) this.processingInterval = null } this.subscribers.clear() this.messageQueue.clear() this.messageHistory.clear() this.channels.clear() console.log('事件通信服务已销毁') } // 私有方法 /** * 初始化默认频道 */ private initializeDefaultChannels(): void { // 系统事件频道 this.createChannel('system', { description: '系统级事件通信', restricted: true, allowedApps: ['system'], maxMessageSize: 1024 * 10, // 10KB messageRetention: 24 * 60 * 60 * 1000, // 24小时 }) // 应用间通信频道 this.createChannel('cross-app', { description: '应用间通信', restricted: false, allowedApps: [], maxMessageSize: 1024 * 100, // 100KB messageRetention: 7 * 24 * 60 * 60 * 1000, // 7天 }) // 用户交互频道 this.createChannel('user-interaction', { description: '用户交互事件', restricted: false, allowedApps: [], maxMessageSize: 1024 * 5, // 5KB messageRetention: 60 * 60 * 1000, // 1小时 }) // 广播频道 this.createChannel('broadcast', { description: '系统广播', restricted: true, allowedApps: ['system'], maxMessageSize: 1024 * 50, // 50KB messageRetention: 24 * 60 * 60 * 1000, // 24小时 }) } /** * 检查应用是否可以访问频道 */ private canAccessChannel(appId: string, channel: string): boolean { const channelConfig = this.channels.get(channel) if (!channelConfig) { // 频道不存在,默认允许 return true } if (!channelConfig.restricted) { return true } // 系统应用总是有权限 if (appId === 'system') { return true } return channelConfig.allowedApps.includes(appId) } /** * 添加消息到队列 */ private addToQueue(message: EventMessage): void { const queueKey = message.receiverId || 'broadcast' if (!this.messageQueue.has(queueKey)) { this.messageQueue.set(queueKey, []) } const queue = this.messageQueue.get(queueKey)! // 按优先级插入 const insertIndex = queue.findIndex((msg) => msg.priority < message.priority) if (insertIndex === -1) { queue.push(message) } else { queue.splice(insertIndex, 0, message) } } /** * 直接投递消息 */ private async deliverMessage(message: EventMessage): Promise { try { const subscribers = this.getRelevantSubscribers(message) if (subscribers.length === 0) { message.status = MessageStatus.FAILED // 只对非系统频道显示警告信息 if (message.channel !== 'system') { console.warn( `[EventCommunication] 没有找到频道 ${message.channel} 的订阅者[消息 ID: ${message.id}]`, ) } return } // 并行发送给所有订阅者 const deliveryPromises = subscribers.map(async (subscriber) => { try { // 应用过滤器 if (subscriber.filter && !subscriber.filter(message)) { return } await subscriber.handler(message) this.statistics.totalMessagesReceived++ console.log( `[EventCommunication] 消息 ${message.id} 已投递给订阅者 ${subscriber.id}[频道: ${message.channel}]`, ) } catch (error) { console.error(`向订阅者 ${subscriber.id} 发送消息失败:`, error) throw error } }) await Promise.allSettled(deliveryPromises) message.status = MessageStatus.DELIVERED } catch (error) { message.status = MessageStatus.FAILED this.statistics.failedMessages++ console.error('消息投递失败:', error) // 重试机制 if (message.retryCount < message.maxRetries) { message.retryCount++ message.status = MessageStatus.PENDING setTimeout(() => this.deliverMessage(message), 1000 * message.retryCount) } } } /** * 获取相关订阅者 */ private getRelevantSubscribers(message: EventMessage): EventSubscriber[] { return Array.from(this.subscribers.values()).filter((subscriber) => { if (!subscriber.active) return false if (subscriber.channel !== message.channel) return false // 点对点消息检查接收者 if (message.receiverId && subscriber.appId !== message.receiverId) { return false } return true }) } /** * 开始消息处理循环 */ private startMessageProcessing(): void { this.processingInterval = setInterval(() => { this.processMessageQueue() this.cleanupExpiredMessages() }, 100) // 每100ms处理一次 } /** * 处理消息队列 */ private processMessageQueue(): void { for (const [queueKey, messages] of this.messageQueue.entries()) { if (messages.length === 0) continue // 处理优先级最高的消息 const message = messages.shift()! // 检查消息是否过期 if (message.expiresAt && message.expiresAt <= new Date()) { message.status = MessageStatus.EXPIRED continue } this.deliverMessage(message) } } /** * 清理过期消息 */ private cleanupExpiredMessages(): void { const now = new Date() for (const [queueKey, messages] of this.messageQueue.entries()) { const validMessages = messages.filter((msg) => !msg.expiresAt || msg.expiresAt > now) if (validMessages.length !== messages.length) { this.messageQueue.set(queueKey, validMessages) } } } /** * 记录消息历史 */ private recordMessage(message: EventMessage): void { // 记录发送者历史 if (!this.messageHistory.has(message.senderId)) { this.messageHistory.set(message.senderId, []) } this.messageHistory.get(message.senderId)!.push(message) // 记录接收者历史 if (message.receiverId && message.receiverId !== message.senderId) { if (!this.messageHistory.has(message.receiverId)) { this.messageHistory.set(message.receiverId, []) } this.messageHistory.get(message.receiverId)!.push(message) } } /** * 更新活跃订阅者数量 */ private updateActiveSubscribersCount(): void { this.statistics.activeSubscribers = Array.from(this.subscribers.values()).filter( (sub) => sub.active, ).length } }