👍 重难点:
- 全栈项目架构
- Kafka + Clickhouse + Flink 实时数据流经典架构设计
- 性能与异常指标采集
- 【初中级】做过监控设计?那性能与异常监控指标怎么算的?
- 【中高级】有设计过大型实时数据流相关系统吗,具体怎么做?
- 【专家级】详细说明监控平台方案设计与落地,项目实践
【初中级】做过监控设计?那前端性能与异常监控指标怎么算?
性能监控那节,我们详细介绍了性能优化相关指标概念,这次我们来结合大厂监控平台实战,充分理解指标定义、计算与上报逻辑。
前端性能监控通过收集和分析用户端的性能数据来衡量和优化页面加载速度、交互响应时间等关键性能指标的过程。
为什么需要性能监控?
- 提升用户体验:减少页面加载和响应时间,提高用户留存率。
- 发现和优化性能瓶颈:及时发现影响性能的因素并进行优化。
- 数据驱动决策:基于真实数据进行性能优化,而非依赖直觉。
常见的性能监控指标
页面加载时间(Page Load Time)
- 定义:从用户发出请求到页面完全加载完成的时间。
- 计算方式:
代码块
const pageLoadTime = performance.timing.loadEventEnd - performance.timing.navigationStart;
- 意义:页面加载时间是用户体验的核心指标之一。
首次内容绘制时间(First Contentful Paint, FCP)
- 定义:页面首次绘制出任何内容的时间。
- 获取方法:
代码块
new PerformanceObserver((entryList) => {
const entries = entryList.getEntriesByName('first-contentful-paint');
if (entries.length > 0) {
const fcpTime = entries[0].startTime;
console.log(`FCP: ${fcpTime}`);
}
}).observe({ type: 'paint', buffered: true });
- 意义:FCP直接反映页面开始展现内容的速度。
最大内容绘制时间(Largest Contentful Paint, LCP)
- 定义:页面中最大内容元素的渲染时间。
- 获取方法:
代码块
new PerformanceObserver((entryList) => {
const entries = entryList.getEntries();
const lastEntry = entries[entries.length - 1];
const lcpTime = lastEntry.startTime;
console.log(`LCP: ${lcpTime}`);
}).observe({ type: 'largest-contentful-paint', buffered: true });
- 意义:LCP是衡量页面主要内容是否迅速加载的核心指标。
交互时间(Time to Interactive, TTI)
- 定义:页面从加载开始到可以响应用户交互的时间。
- 计算方式:
代码块
const timeToInteractive = performance.timing.domInteractive - performance.timing.navigationStart;
- 意义:TTI直接关系到用户能够开始操作页面的时间。
资源加载时间
- 定义:每个静态资源(如图片、JS文件)的加载时间。
- 获取方法:
代码块
const resources = performance.getEntriesByType('resource');
resources.forEach((resource) => {
const loadTime = resource.responseEnd - resource.startTime;
console.log(`${resource.name}: ${loadTime}`);
});
- 意义:资源加载时间影响整体页面加载性能,是优化的关键。
前端异常监控
前端异常监控指的是捕获并报告用户端发生的错误或异常,帮助开发者及时发现和修复问题。最经典的要数sentry了,有兴趣的同学呢,重点了解sentry的源码实现。
JavaScript错误(JS Errors)
- 定义:运行时JavaScript错误。
- 捕获方法:
代码块
window.onerror = function (message, source, lineno, colno, error) {
console.log(`Error: ${message}, Source: ${source}, Line: ${lineno}, Column: ${colno}, Error Object: ${error}`);
};
Promise未处理拒绝(Unhandled Promise Rejection)
- 定义:未处理的Promise拒绝错误。
- 捕获方法:
代码块
window.addEventListener('unhandledrejection', function (event) {
console.log(`Unhandled Rejection: ${event.reason}`);
});
资源加载错误
- 定义:静态资源加载失败错误。
- 捕获方法:
代码块
window.addEventListener('error', function (event) {
if (event.target!== window) {
console.log(`Resource Load Error: ${event.target.src || event.target.href}`);
}
}, true);
接口请求失败(API Failure)
- 定义:API请求失败或超时错误。
- 捕获方法:
代码块
fetch(url)
.then(response => {
if (!response.ok) {
console.error(`API Failure: ${response.status} ${response.statusText}`);
}
})
.catch(error => console.error(`Fetch Error: ${error}`));
统一上报数据格式
数据格式设计原则
- 一致性:所有监控数据应采用统一的格式,便于后续分析和展示。
- 可扩展性:数据格式设计应考虑未来的扩展需求,允许添加新字段。
- 轻量化:数据尽量精简,减少上报带来的性能影响。
数据格式示例
以下为一个仿照Sentry定义的统一性能与异常监控数据格式:
代码块
{
"event_id": "unique-event-id",
"timestamp": "2024-08-28T14:25:43.511Z",
"platform": "javascript",
"release": "project-release-version",
"environment": "production",
"user": {
"id": "user-id",
"username": "username",
"email": "user@example.com",
"ip_address": "user-ip-address"
},
"contexts": {
"performance": {
"page_load_time": 1234,
"fcp": 678,
"lcp": 1234,
"tti": 2345,
"resources": [
{"name": "resource1.js", "load_time": 123},
{"name": "image1.png", "load_time": 456}
]
}
},
"exception": {
"values": [
{
"type": "TypeError",
"value": "Cannot read property 'x' of undefined",
"stacktrace": {
"frames": [
{
"filename": "https://example.com/script.js",
"lineno": 42,
"colno": 21,
"function": "myFunction",
"in_app": true
}
]
}
}
]
},
"errors": [
{
"message": "Unhandled Promise Rejection: Error: Something went wrong",
"source": "https://example.com/promise.js",
"lineno": 10,
"colno": 15,
"errorObject": {
"name": "Error",
"message": "Something went wrong",
"stack": "Error: Something went wrong\n at promise.js:10:15"
}
},
{
"message": "Resource Load Error: https://example.com/image.png",
"source": "image.png",
"lineno": 0,
"colno": 0
}
],
"breadcrumbs": [
{
"category": "ui.click",
"message": "User clicked button",
"level": "info",
"timestamp": "2024-08-28T14:25:40.511Z"
},
{
"category": "navigation",
"message": "User navigated to /dashboard",
"level": "info",
"timestamp": "2024-08-28T14:25:43.511Z"
}
]
}
数据上报
使用以下代码上报监控数据:
fetch
代码块
fetch('https://your-monitoring-service.com/api/events', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your-api-key'
},
body: JSON.stringify(eventData) // eventData 是 JSON 数据
});
除此以外,还有以下上报方式:
- ajax
- 图片打点image
- navigator.sendBeacon
- WS
【拓展】用户行为埋点与监控
埋点是一种通过在前端代码中添加特定标记或逻辑来记录用户行为、页面事件等数据的技术。埋点数据可以帮助开发者了解用户行为路径、页面功能使用情况等,为产品优化和用户体验提升提供数据支持。
常见埋点指标
- 页面浏览量(Page View, PV)
- 定义:用户每次加载页面时的计数。
- 埋点方式:在页面加载时触发埋点。
代码块
// Example: 埋点触发
sendTrackingData({
event: 'page_view',
url: window.location.href,
timestamp: Date.now()
});
- 独立访客数(Unique Visitor, UV)
- 定义:独立用户访问次数的计数。通常通过用户ID或Cookie进行识别。
- 埋点方式:在用户第一次访问时触发埋点。
代码块
// Example: 获取用户ID并触发UV埋点
const userId = getUserIdFromCookie();
if (!userId) {
setUserIdInCookie(generateNewUserId());
}
sendTrackingData({
event: 'unique_visitor',
userId: userId,
timestamp: Date.now()
});
- 点击事件(Click Event)
- 定义:用户点击页面元素(如按钮、链接)时的记录。
- 埋点方式:在用户点击特定元素时触发埋点。
代码块
document.getElementById('buttonId').addEventListener('click', function() {
sendTrackingData({
event: 'click',
elementId: 'buttonId',
url: window.location.href,
timestamp: Date.now()
});
});
- 表单提交(Form Submit)
- 定义:用户提交表单时的记录。
- 埋点方式:在表单提交时触发埋点。
代码块
document.getElementById('formId').addEventListener('submit', function() {
sendTrackingData({
event: 'form_submit',
formId: 'formId',
url: window.location.href,
timestamp: Date.now()
});
});
- 停留时间(Time on Page)
- 定义:用户在页面上停留的时间。
- 埋点方式:在页面加载和关闭时分别触发埋点,并计算时间差。
代码块
const startTime = Date.now();
window.addEventListener('beforeunload', function() {
const timeSpent = Date.now() - startTime;
sendTrackingData({
event: 'time_on_page',
timeSpent: timeSpent,
url: window.location.href,
timestamp: Date.now()
});
});
综合埋点数据协议设计
在之前的性能与异常监控数据格式的基础上,我们可以加入埋点数据,使数据协议更加完整和灵活。以下是包含埋点信息的综合数据协议设计:
代码块
{
"event_id": "unique-event-id",
"timestamp": "2024-08-28T14:25:43.511Z",
"platform": "javascript",
"release": "project-release-version",
"environment": "production",
"user": {
"id": "user-id",
"username": "username",
"email": "user@example.com",
"ip_address": "user-ip-address"
},
"contexts": {
"performance": {
"page_load_time": 1234,
"fcp": 678,
"lcp": 1234,
"tti": 2345,
"resources": [
{"name": "resource1.js", "load_time": 123},
{"name": "image1.png", "load_time": 456}
]
},
"exceptions": {
"values": [
{
"type": "TypeError",
"value": "Cannot read property 'x' of undefined",
"stacktrace": {
"frames": [
{
"filename": "https://example.com/script.js",
"lineno": 42,
"colno": 21,
"function": "myFunction",
"in_app": true
}
]
}
}
]
}
},
"tracking": {
"events": [
{
"event": "page_view",
"url": "https://example.com",
"timestamp": "2024-08-28T14:25:43.511Z"
},
{
"event": "click",
"elementId": "buttonId",
"url": "https://example.com",
"timestamp": "2024-08-28T14:26:00.511Z"
},
{
"event": "form_submit",
"formId": "formId",
"url": "https://example.com",
"timestamp": "2024-08-28T14:26:30.511Z"
},
{
"event": "time_on_page",
"timeSpent": 120000,
"url": "https://example.com",
"timestamp": "2024-08-28T14:28:00.511Z"
}
]
},
"errors": [
{
"message": "Unhandled Promise Rejection: Error: Something went wrong",
"source": "https://example.com/promise.js",
"lineno": 10,
"colno": 15,
"errorObject": {
"name": "Error",
"message": "Something went wrong",
"stack": "Error: Something went wrong\n at promise.js:10:15"
}
},
{
"message": "Resource Load Error: https://example.com/image.png",
"source": "image.png",
"lineno": 0,
"colno": 0
}
],
"breadcrumbs": [
{
"category": "ui.click",
"message": "User clicked button",
"level": "info",
"timestamp": "2024-08-28T14:25:40.511Z"
},
{
"category": "navigation",
"message": "User navigated to /dashboard",
"level": "info",
"timestamp": "2024-08-28T14:25:43.511Z"
}
]
}
【中高级】有设计过大型实时数据流相关系统吗,具体怎么做?
我设计过前端监控平台数据上报、消息队列处理(Kafka)直至数据落库ClickHouse完整环节。 其中完整细节包括:
- 监控数据采集与上报
- 数据接收与消息队列(Kafka)机制
- 数据预处理(Flink/Spark)
- 数据存储(ClickHouse)
- 前端数据可视化与预警机制
前端监控数据上报
数据收集方式
在前端,我们需要采集用户行为、性能指标、异常日志等监控数据。这些数据可以通过以下方式收集:
- AJAX请求:通过fetch或XMLHttpRequest发送数据至后端。
- sendBeacon:用于在页面卸载或关闭时上报数据,确保数据不丢失。
- WebSocket:用于实时性要求较高的数据上报,保持前端和后端的长连接。
数据结构设计
上报的数据通常包含以下内容:
- 事件类型:如click、error、performance等。
- 时间戳:事件发生的时间,用于排序和分析。
- 用户信息:包括用户ID、会话ID、设备信息等。
- 页面信息:包括页面URL、页面加载时间等性能指标。
示例JSON数据结构
代码块
{
"event_type": "click",
"timestamp": "2024-08-28T14:25:43.511Z",
"user": {
"id": "user-id",
"session_id": "session-id",
"device": "device-info"
},
"page": {
"url": "https://example.com",
"referrer": "https://referrer.com"
},
"data": {
"element_id": "buttonId",
"x_position": 100,
"y_position": 200
}
}
数据上报到后端
前端通过上述方式收集数据后,将这些数据发送到后端的API接口或Kafka主题中。
代码块
fetch('https://your-server.com/api/monitoring', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(eventData)
});
数据接收与队列处理
接收层:Nginx或Node.js
后端API负责接收前端发送的监控数据,可以使用Nginx进行负载均衡和反向代理,将请求路由到多个Node.js服务器,确保系统的高可用性。
- Nginx反向代理:Nginx负责分发请求,支持高并发,适合大规模数据接入。
- Node.js服务器:Node.js提供灵活的API接口,处理接收到的数据。
数据队列:Kafka
接收到的数据通常直接推送到Kafka队列中进行缓冲处理,这样可以解耦数据接收与后续的处理逻辑。
Kafka的作用
- 高吞吐量:Kafka可以处理大量的监控数据,确保数据的实时性。
- 数据持久化:Kafka会将数据持久化到磁盘,确保在消费者消费数据之前数据不会丢失。
- 多消费者处理:Kafka支持多个消费者组,这允许我们对不同类型的数据进行并行处理。
Kafka主题与分区
- 主题:可以根据数据类型(如性能数据、错误数据、用户行为数据等)创建不同的Kafka主题,确保数据分类处理。
- 分区:Kafka的分区机制允许我们将同一主题的数据分布在多个分区上,提升处理性能。
代码块
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
const producer = new kafka.Producer(client);
const payloads = [
{ topic:'monitoring', messages: JSON.stringify(eventData), partition: 0 }
];
producer.send(payloads, function(err, data) {
console.log(data);
});
数据处理与预处理
Kafka Consumer
Kafka的消费者从队列中拉取数据,对数据进行清洗、过滤、聚合等预处理操作。
- 数据清洗:过滤掉无效或重复的数据,确保数据的准确性。
- 数据聚合:在消费过程中,可以对数据进行初步的聚合,如按用户ID聚合访问次数等。
- 实时处理:消费者可以将数据实时处理后推送到ClickHouse,也可以进一步推送到其他处理层(如Spark、Flink)进行更复杂的实时分析。
代码块
const Consumer = kafka.Consumer;
const consumer = new Consumer(client, [{ topic:'monitoring', partition: 0 }], {autoCommit: true});
consumer.on('message', function(message) {
const data = JSON.parse(message.value);
processData(data); // 数据预处理逻辑
});
数据处理层:流式处理
可以使用流式处理引擎(如Apache Flink或Spark Streaming)来处理和分析从Kafka消费的数据流。
- Apache Flink:Flink提供了低延迟的实时流处理,适合处理复杂的事件模式。
- Spark Streaming:适合批处理与流处理结合的场景,能够处理大量数据的同时进行复杂分析。
通过流式处理引擎,系统能够在数据进入存储之前执行复杂的数据分析或事件检测。
数据存储:ClickHouse
数据写入ClickHouse
处理后的数据最终会写入ClickHouse数据库中,以供查询和分析。
为什么选择ClickHouse?
- 高性能查询:ClickHouse采用列式存储和数据压缩技术,能够在处理大规模数据时提供高效的查询性能。
- 实时分析:ClickHouse支持实时数据插入和查询,适合实时监控系统的需求。
- 分布式架构:ClickHouse支持数据的分布式存储和查询,能够轻松扩展以处理更大规模的数据。
表结构设计
ClickHouse的表结构设计需要考虑到数据的查询需求和性能优化。常见的做法是根据数据的维度和指标设计表结构,并创建适当的分区和索引。
示例表结构
代码块
CREATE TABLE monitoring_data (
event_type String,
timestamp DateTime,
user_id String,
session_id String,
device String,
url String,
referrer String,
element_id String,
x_position Int32,
y_position Int32
) ENGINE = MergeTree()
ORDER BY (timestamp, event_type)
PARTITION BY toYYYYMM(timestamp)
- ORDER BY:用于加速查询,特别是在时间范围查询和按事件类型查询时性能提升显著。
- PARTITION BY:按月分区(或其他时间周期),以优化存储和查询性能。
数据插入操作
数据可以通过Kafka消费者或者直接从流式处理引擎推送到ClickHouse,进行批量插入操作,确保系统的写入性能。
代码块
import clickhouse_driver
client = clickhouse_driver.Client('localhost')
client.execute('INSERT INTO monitoring_data (event_type, timestamp, user_id, session_id, device, url, referrer, element_id, x_position, y_position) VALUES', data_batch)
数据查询与分析
存储在ClickHouse的数据可以通过SQL查询进行分析,前端可以通过REST API或者直接使用ClickHouse提供的HTTP接口进行数据获取。
代码块
SELECT
event_type,
COUNT(*) AS event_count
FROM monitoring_data
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY event_count DESC
前端数据展示与报警
前端展示
前端可以通过调用REST API或者直接查询ClickHouse数据库,获取需要展示的数据,并使用图表库(如ECharts或D3.js)进行可视化展示。
- 实时监控面板:通过WebSocket或定时刷新机制,实时展示系统的性能指标、错误率等关键数据。
- 历史数据分析:通过查询ClickHouse的历史数据,生成趋势图、分布图等,帮助分析系统的长期表现。
报警机制
在数据分析的基础上,可以设置阈值或检测异常模式,当数据超出预设范围时,触发报警。
- 定时任务:可以定期查询ClickHouse数据库,检测是否有指标超出预警范围。
- 实时报警:通过流式处理系统(如Flink)实时检测异常,直接触发报警(如发送邮件或消息通知)。
【专家级全栈】详细说明监控平台方案设计与落地,项目实践
整体架构设计
架构概述
监控平台的整体架构分为以下几个主要模块:
- 前端监控数据采集:前端应用采集用户行为、性能指标、错误日志等数据,并通过多种方式上报至后端。
- 服务端数据接收与处理:服务端使用NestJS作为核心框架,接收并处理前端上报的数据,同时将数据推送至Kafka进行进一步处理。
- 数据队列与流处理:通过Kafka进行数据缓冲和流式处理,确保数据处理的实时性和高吞吐量。
- 数据存储与分析:处理后的数据最终存储在ClickHouse中,用于实时查询和分析。
- 前端展示与报警:通过可视化界面展示监控数据,并在必要时触发报警机制。
技术选型
- 前端:使用JavaScript/TypeScript编写的SDK,负责采集和上报监控数据。
- 后端:NestJS框架,使用TypeScript编写,提供API接口和数据处理逻辑。
- 数据队列:Kafka,用于高并发数据的缓冲和传输。
- 数据存储:ClickHouse,用于存储和快速查询大规模监控数据。
- 流处理:Apache Flink或Spark Streaming,用于实时数据处理和分析。
前端接入
前端SDK设计
为了让前端应用能够无缝接入监控平台,需要设计一个前端SDK。这个SDK应该具备以下功能:
- 自动采集性能指标:包括页面加载时间、FCP、LCP、TTI等。
- 捕获JavaScript错误:捕获运行时错误和未处理的Promise拒绝。
- 用户行为追踪:如点击、表单提交、页面停留时间等。
- 数据上报:通过fetch、sendBeacon或WebSocket将数据上报到后端。
SDK实现示例
代码块
class MonitoringSDK {
private endpoint: string;
constructor(endpoint: string) {
this.endpoint = endpoint;
this.init();
}
private init() {
this.collectPerformanceData();
this.setupErrorHandling();
}
private collectPerformanceData() {
window.addEventListener('load', () => {
const performanceData = {
event_type: 'performance',
timestamp: new Date().toISOString(),
data: {
pageLoadTime: performance.timing.loadEventEnd - performance.timing.navigationStart,
fcp: performance.getEntriesByName('first-contentful-paint')[0]?.startTime || null,
lcp: performance.getEntriesByType('largest-contentful-paint')[0]?.startTime || null,
tti: performance.timing.domInteractive - performance.timing.navigationStart
}
};
this.sendData(performanceData);
});
}
private setupErrorHandling() {
window.onerror = (message, source, lineno, colno, error) => {
const errorData = {
event_type: 'error',
timestamp: new Date().toISOString(),
data: {
message,
source,
lineno,
colno,
stack: error?.stack || ''
}
};
this.sendData(errorData);
};
window.addEventListener('unhandledrejection', (event) => {
const rejectionData = {
event_type: 'unhandledrejection',
timestamp: new Date().toISOString(),
data: {
reason: event.reason,
stack: event.reason?.stack || ''
}
};
this.sendData(rejectionData);
});
}
private sendData(data: any) {
navigator.sendBeacon(this.endpoint, JSON.stringify(data));
}
}
// 使用示例
const monitoring = new MonitoringSDK('https://your-server.com/api/monitoring');
前端应用集成
将SDK集成到前端应用中非常简单。开发者只需在应用的入口文件(如index.js或main.ts)中引入并初始化SDK。
代码块
import MonitoringSDK from './monitoring-sdk';
// 初始化SDK
const monitoring = new MonitoringSDK('https://your-server.com/api/monitoring');
集成后,前端应用在用户访问页面时会自动开始采集和上报数据,而开发者几乎不需要做额外的工作。
服务端实现(NestJS)
NestJS服务端架构
NestJS是一个基于Node.js的现代Web框架,采用模块化设计,非常适合构建复杂的企业级应用。在监控平台的服务端实现中,我们将NestJS用于以下几个关键部分:
- API接口:提供数据接收接口,处理前端上报的数据。
- 数据处理与转发:处理并清洗数据,然后推送至Kafka进行异步处理。
- 与数据库的交互:在必要时直接与ClickHouse进行交互,执行数据查询。
实现数据接收与处理
数据接收模块
首先,我们定义一个控制器(Controller),用于接收前端上报的数据。
代码块
import { Controller, Post, Body } from '@nestjs/common';
import { MonitoringService } from './monitoring.service';
@Controller('monitoring')
export class MonitoringController {
constructor(private readonly monitoringService: MonitoringService) {}
@Post()
async receiveData(@Body() data: any) {
// 将数据传递给服务层处理
await this.monitoringService.processData(data);
return { status:'success' };
}
}
数据处理模块
在服务层,我们对接收到的数据进行处理,并将其推送至Kafka。
代码块
import { Injectable } from '@nestjs/common';
import { Kafka } from 'kafkajs';
@Injectable()
export class MonitoringService {
private kafkaProducer;
constructor() {
const kafka = new Kafka({
clientId:'monitoring-service',
brokers: ['kafka:9092']
});
this.kafkaProducer = kafka.producer();
this.kafkaProducer.connect();
}
async processData(data: any) {
// 清洗数据或添加额外信息
const enrichedData = {
...data,
receivedAt: new Date().toISOString(),
environment: process.env.NODE_ENV
};
// 推送数据到Kafka主题
await this.kafkaProducer.send({
topic:'monitoring-events',
messages: [{ value: JSON.stringify(enrichedData) }]
});
}
}
在这个模块中,我们使用了kafkajs来与Kafka交互。数据在处理后会推送到Kafka主题monitoring-events,供后续的流式处理或直接存储到ClickHouse。
与ClickHouse的交互
在某些情况下,我们可能需要服务端直接与ClickHouse交互,执行复杂的查询或插入操作。我们可以使用clickhouse-driver与ClickHouse进行通信。
代码块
import { Injectable } from '@nestjs/common';
import { Client } from 'clickhouse';
@Injectable()
export class ClickhouseService {
private client;
constructor() {
this.client = new Client({
url: 'http://clickhouse-server:8123',
basicAuth: {
username: 'default',
password: ''
},
debug: false
});
}
async insertData(data: any) {
const query = `
INSERT INTO monitoring_data (event_type, timestamp, user_id, session_id, device, url, referrer, element_id, x_position, y_position)
VALUES?
`;
await this.client.insert(query, [data]);
}
async queryData() {
const query = `
SELECT event_type, COUNT(*) AS event_count
FROM monitoring_data
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY event_count DESC
`;
return await this.client.query(query).toPromise();
}
}
这个服务可以根据需要被嵌入到不同的业务逻辑中,实现复杂的数据处理和查询。
数据流处理与分析
Kafka消费者
在数据进入Kafka后,我们需要通过Kafka的消费者从队列中拉取数据,并对其进行实时处理。可以使用Apache Flink或Spark Streaming来处理这些数据。
代码块
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka } from 'kafkajs';
@Injectable()
export class KafkaConsumerService implements OnModuleInit {
private kafkaConsumer;
constructor() {
const kafka = new Kafka({
clientId:'monitoring-consumer',
brokers: ['kafka:9092']
});
this.kafkaConsumer = kafka.consumer({ groupId:'monitoring-group' });
}
async onModuleInit() {
await this.kafkaConsumer.connect();
await this.kafkaConsumer.subscribe({ topic:'monitoring-events', fromBeginning: true });
this.kafkaConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
const eventData = JSON.parse(message.value.toString());
// 在这里处理数据,例如推送到ClickHouse或进行实时分析
await this.processEventData(eventData);
}
});
}
async processEventData(data: any) {
// 在这里进行数据的清洗、聚合等处理
console.log(`Processing event: ${data.event_type}`);
// 将数据推送到ClickHouse
// this.clickhouseService.insertData(data);
}
}
前端展示与报警
实时监控面板
前端可以通过调用REST API或直接查询ClickHouse数据库,获取实时监控数据,并使用图表库(如ECharts)进行可视化展示。
代码块
import axios from 'axios';
import { useEffect, useState } from'react';
function MonitoringDashboard() {
const [data, setData] = useState([]);
useEffect(() => {
const fetchData = async () => {
const response = await axios.get('https://your-server.com/api/monitoring/dashboard');
setData(response.data);
};
fetchData();
}, []);
return (
<div>
{/* 使用ECharts或其他图表库展示数据 */}
</div>
);
}
报警机制
在系统中,可以根据实时监控数据设置报警规则,当某个指标超出预设的阈值时,触发报警机制(如发送邮件或消息通知)。
代码块
// 定期任务或实时检测
function checkForAlerts(data) {
if (data.errorCount > 100) {
sendAlert('High error rate detected!');
}
}
function sendAlert(message) {
axios.post('https://alert-service.com/api/send', {
message,
recipients: ['admin@example.com']
});
}