Real-Time Big Data Processing Architecture: A Complete Streaming Solution with Flink and Kafka

Kevin179 2026-02-05T12:18:05+08:00

Introduction

In today's data-driven world, real-time processing has become a key part of enterprise competitiveness. Traditional batch processing can no longer satisfy modern business requirements for low latency and high throughput. Apache Flink and Apache Kafka, two core components of the big data stream processing landscape, together provide a complete solution for building efficient real-time data platforms.

This article explores the design of a real-time processing architecture based on Flink and Kafka, from basic concepts to practical applications. It covers the stream processing engine, message queue integration, state management, window computation, and other core techniques, along with a detailed hands-on guide.

1. Fundamentals of Stream Computing

1.1 What Is Stream Computing

Stream computing is a data processing paradigm that treats data as a continuous, unbounded stream and processes it in real time. Unlike traditional batch processing, stream computing handles data with very low latency, typically completing processing and producing a response within milliseconds to seconds.

In stream computing, data is modeled as an infinite, continuously arriving sequence: the system keeps reading from data sources, processes records as they arrive, and emits results. This model is especially well suited to scenarios that demand fast responses, such as real-time recommendations, risk control, and monitoring and alerting.

1.2 Stream Computing vs. Batch Processing

| Aspect | Batch processing | Stream computing |
|---|---|---|
| Processing model | Process data in large batches | Process records continuously |
| Latency | Seconds to minutes | Milliseconds to seconds |
| Data volume | Large bounded batches | Continuous small batches |
| Typical use cases | Periodic reports, offline analysis | Real-time monitoring, real-time recommendations |
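The contrast in the table above can be sketched in plain Java (a toy simulation, not Flink API): batch processing produces one result only after the whole input has been collected, while stream processing emits an updated result for every record, which is where the latency difference comes from.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchVsStream {
    // Batch style: a single result after consuming the entire input.
    static int batchSum(List<Integer> input) {
        int sum = 0;
        for (int v : input) sum += v;
        return sum;
    }

    // Streaming style: an updated result is emitted per record, so
    // downstream consumers see partial results with low latency.
    static List<Integer> streamingSums(List<Integer> input) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int v : input) {
            sum += v;
            emitted.add(sum);  // emit after each record instead of once at the end
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        System.out.println(batchSum(input));       // 10
        System.out.println(streamingSums(input));  // [1, 3, 6, 10]
    }
}
```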

1.3 The Core Value of Flink and Kafka

As a new-generation stream processing engine, Apache Flink offers the following core strengths:

  • Low-latency processing: millisecond-level latency
  • Exactly-once semantics: correct results even in the presence of failures
  • State management: built-in distributed state management
  • Window computation: support for multiple window types and aggregation operations

As a distributed streaming platform, Apache Kafka provides:

  • High-throughput messaging: millions of messages per second
  • Durable storage: messages are persisted to disk
  • Horizontal scalability: supports large-scale cluster deployments
  • Ecosystem integration: integrates well with a wide range of big data tools

2. The Flink Stream Processing Engine in Depth

2.1 Flink Architecture

Flink is a distributed computing system built around the following core components:

// Conceptual sketch of Flink's runtime components (illustrative, not runnable API code)
public class FlinkArchitecture {
    // JobManager - schedules tasks and coordinates the job
    private JobManager jobManager;
    
    // TaskManager - executes the actual tasks
    private TaskManager taskManager;
    
    // ResourceManager - allocates and manages cluster resources
    private ResourceManager resourceManager;
    
    // CheckpointCoordinator - coordinates checkpointing
    private CheckpointCoordinator checkpointCoordinator;
}

2.2 The Dataflow Processing Model

Flink uses a dataflow-based processing model that expresses a computation as a series of transformations:

// Flink dataflow example. EventMapper, EventFilter, KeySelector, AggregationFunction,
// and ResultSink are user-defined classes, and `properties` holds the Kafka
// consumer configuration.
public class StreamProcessingExample {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Read from Kafka
        DataStream<String> kafkaStream = env.addSource(
            new FlinkKafkaConsumer<>("input-topic", 
                new SimpleStringSchema(), 
                properties)
        );
        
        // Transform the stream
        DataStream<ProcessedEvent> processedStream = kafkaStream
            .map(new EventMapper())           // map
            .filter(new EventFilter())        // filter
            .keyBy(new KeySelector())         // key the stream
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))  // 5-minute window
            .aggregate(new AggregationFunction());  // aggregate
        
        // Emit the results
        processedStream.addSink(new ResultSink());
        
        env.execute("Real-time Processing Job");
    }
}

2.3 State Management

Flink ships with a powerful state management mechanism and supports multiple state backends:

// Flink state management example: keeps a per-key running count in keyed state
public class StateManagementExample extends RichFlatMapFunction<String, String> {
    
    // State handles, initialized in open()
    private transient ValueState<Integer> countState;
    private transient MapState<String, Integer> mapState;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        
        // Define the state descriptors
        ValueStateDescriptor<Integer> countDescriptor = 
            new ValueStateDescriptor<>("count", Integer.class);
        countState = getRuntimeContext().getState(countDescriptor);
        
        MapStateDescriptor<String, Integer> mapDescriptor = 
            new MapStateDescriptor<>("map-state", String.class, Integer.class);
        mapState = getRuntimeContext().getMapState(mapDescriptor);
    }
    
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Read and update the keyed state
        Integer currentCount = countState.value();
        if (currentCount == null) {
            currentCount = 0;
        }
        currentCount++;
        countState.update(currentCount);
        
        out.collect("Processed: " + value + ", Count: " + currentCount);
    }
}

3. Kafka Integration and Message Processing

3.1 Kafka Architecture and Core Concepts

Kafka is a distributed publish-subscribe messaging system built around the following core components:

// Basic Kafka client configuration
public class KafkaConfiguration {
    // Producer configuration
    public Properties producerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 3);
        return props;
    }
    
    // Consumer configuration
    public Properties consumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "stream-processing-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        return props;
    }
}

3.2 Integrating Kafka with Flink

Flink provides Kafka connectors for both reading from and writing to Kafka:

// Flink-Kafka integration example
public class KafkaIntegrationExample {
    
    public static void readFromKafka() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-consumer-group");
        
        // Create the Kafka source
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
            "input-topic",
            new SimpleStringSchema(),
            properties
        );
        
        // Attach it to the data stream
        DataStream<String> kafkaStream = env.addSource(kafkaSource);
        
        // Process the records
        kafkaStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "Processed: " + value;
            }
        }).print();
        
        env.execute("Kafka Integration Job");
    }
    
    public static void writeToKafka() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka producer properties. Note: key/value serializer classes are not
        // needed here - FlinkKafkaProducer serializes records with the
        // SerializationSchema passed to its constructor.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        
        // Create a data stream
        DataStream<String> dataStream = env.fromElements("message1", "message2", "message3");
        
        // Write to Kafka
        dataStream.addSink(new FlinkKafkaProducer<>(
            "output-topic",
            new SimpleStringSchema(),
            properties
        ));
        
        env.execute("Kafka Sink Job");
    }
}

3.3 Message Serialization and Deserialization

In stream processing, handling message serialization and deserialization correctly is critical:

// Custom serializer example. CustomEvent is a user-defined POJO.
public class CustomSerializerExample {
    
    // Custom JSON serializer
    public static class JsonSerializationSchema implements SerializationSchema<CustomEvent> {
        private final ObjectMapper objectMapper = new ObjectMapper();
        
        @Override
        public byte[] serialize(CustomEvent element) {
            try {
                return objectMapper.writeValueAsBytes(element);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize event", e);
            }
        }
    }
    
    // Custom JSON deserializer. DeserializationSchema also requires
    // isEndOfStream() and getProducedType() to be implemented.
    public static class JsonDeserializationSchema implements DeserializationSchema<CustomEvent> {
        private final ObjectMapper objectMapper = new ObjectMapper();
        
        @Override
        public CustomEvent deserialize(byte[] message) throws Exception {
            return objectMapper.readValue(message, CustomEvent.class);
        }
        
        @Override
        public boolean isEndOfStream(CustomEvent nextElement) {
            return false;  // an unbounded stream never ends
        }
        
        @Override
        public TypeInformation<CustomEvent> getProducedType() {
            return TypeInformation.of(CustomEvent.class);
        }
    }
    
    // Using the custom deserializer
    public static void useCustomSerializer() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        
        FlinkKafkaConsumer<CustomEvent> kafkaSource = new FlinkKafkaConsumer<>(
            "event-topic",
            new JsonDeserializationSchema(),
            properties
        );
        
        DataStream<CustomEvent> eventStream = env.addSource(kafkaSource);
        
        // Process the events
        eventStream.map(event -> {
            return "Processed: " + event.getEventType() + 
                   ", Timestamp: " + event.getTimestamp();
        }).print();
        
        env.execute("Custom Serializer Job");
    }
}

4. State Management and Fault Tolerance

4.1 State Backend Types

Flink supports several state backends, each with different performance characteristics and use cases:

// State backend configuration. These classes are the legacy API; Flink 1.13+
// replaces them with HashMapStateBackend and EmbeddedRocksDBStateBackend plus
// a separate checkpoint storage setting.
public class StateBackendConfiguration {
    
    public void configureMemoryStateBackend() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // In-memory state backend - suitable for testing
        env.setStateBackend(new MemoryStateBackend());
    }
    
    public void configureFsStateBackend() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Filesystem state backend - suitable for production
        try {
            env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink/checkpoints"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void configureRocksDBStateBackend() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // RocksDB state backend - suitable for very large state
        try {
            env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/flink/checkpoints"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.2 The Checkpoint Mechanism

Flink's checkpoint mechanism provides fault tolerance:

// Checkpoint configuration example
public class CheckpointConfiguration {
    
    public void configureCheckpointing() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Take a checkpoint every 5 seconds
        env.enableCheckpointing(5000);
        
        // CheckpointConfig setters return void, so they cannot be chained
        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once semantics
        config.setMinPauseBetweenCheckpoints(500);                   // minimum pause between checkpoints
        config.setCheckpointTimeout(60000);                          // checkpoint timeout
        config.setMaxConcurrentCheckpoints(1);                       // concurrent checkpoints
        config.enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION     // keep checkpoints when the job is cancelled
        );
    }
}
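Exactly-once checkpointing covers Flink's internal state; end-to-end exactly-once additionally requires a transactional or idempotent sink, because records processed after the last completed checkpoint are replayed on recovery. A minimal sketch of an idempotent sink that tolerates such replays by deduplicating on a record ID (plain Java, not Flink API; the class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentSink {
    private final Set<String> seenIds = new HashSet<>();   // record IDs already written
    private final List<String> output = new ArrayList<>(); // stands in for the external system

    // Writing the same record twice (e.g. after a recovery replay) has no effect.
    public void write(String recordId, String payload) {
        if (seenIds.add(recordId)) {
            output.add(payload);
        }
    }

    public List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("id-1", "a");
        sink.write("id-2", "b");
        sink.write("id-1", "a");  // replayed after recovery: deduplicated
        System.out.println(sink.getOutput());  // [a, b]
    }
}
```

The alternative is a transactional sink such as FlinkKafkaProducer in EXACTLY_ONCE mode, which commits Kafka transactions in step with checkpoints.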

4.3 State Management Best Practices

// State management best practices
public class StateManagementBestPractices {
    
    public static class EfficientStateFunction extends RichMapFunction<String, String> {
        private transient ValueState<Integer> countState;
        private transient ListState<String> listState;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            
            // Declare state with an appropriate type and a default value
            ValueStateDescriptor<Integer> countDescriptor = 
                new ValueStateDescriptor<>("count", Integer.class, 0);
            countState = getRuntimeContext().getState(countDescriptor);
            
            // Use collection state deliberately - it can grow without bound
            ListStateDescriptor<String> listDescriptor = 
                new ListStateDescriptor<>("events", String.class);
            listState = getRuntimeContext().getListState(listDescriptor);
        }
        
        @Override
        public String map(String value) throws Exception {
            // Read-modify-write on the keyed state
            Integer currentCount = countState.value();
            if (currentCount == null) {
                currentCount = 0;
            }
            currentCount++;
            countState.update(currentCount);
            
            // Append to the list state
            listState.add(value);
            
            return "Count: " + currentCount + ", Event: " + value;
        }
    }
    
    // Resource cleanup (extends AbstractRichFunction, since RichFunction is an interface)
    public static class StateCleanupExample extends AbstractRichFunction {
        
        @Override
        public void close() throws Exception {
            super.close();
            // Release non-state resources when the function shuts down;
            // keyed state itself is managed and cleaned up by Flink
        }
    }
}

5. Window Computation and Aggregation

5.1 Window Types

Flink supports several window types, each suited to particular scenarios:

// Window examples. The streams use Tuple2 so that sum(1) is a valid positional
// aggregation. Note that event-time windows also require timestamps and
// watermarks to be assigned upstream (see section 5.3); with fromElements
// alone, these windows would never fire.
public class WindowingExamples {
    
    public static void tumblingWindowExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Tumbling window - fixed size, no overlap
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
            Tuple2.of("key", 1), Tuple2.of("key", 2), Tuple2.of("key", 3));
        
        dataStream.keyBy(value -> value.f0)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))  // 5-minute tumbling window
            .sum(1);  // sum the Integer field
    }
    
    public static void slidingWindowExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Sliding window - fixed size, overlapping
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
            Tuple2.of("key", 1), Tuple2.of("key", 2), Tuple2.of("key", 3));
        
        dataStream.keyBy(value -> value.f0)
            .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))  // 10-minute window, sliding every 5 minutes
            .sum(1);
    }
    
    public static void sessionWindowExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Session window - activity-based, closed by a gap of inactivity
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
            Tuple2.of("key", 1), Tuple2.of("key", 2), Tuple2.of("key", 3));
        
        dataStream.keyBy(value -> value.f0)
            .window(EventTimeSessionWindows.withGap(Time.minutes(5)))  // 5-minute session gap
            .sum(1);
    }
}
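The window a given timestamp falls into can be computed directly. The formula below mirrors what a tumbling window assigner does, as a plain-Java sketch (ignoring window offsets and negative timestamps):

```java
public class WindowAssignment {
    // Start of the tumbling window containing `timestamp`, for a window of
    // `size` milliseconds: the timestamp rounded down to a multiple of size.
    static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000;  // 5-minute windows, in milliseconds
        long ts = 7 * 60 * 1000 + 30 * 1000;  // 00:07:30 after the epoch of the day
        // 00:07:30 falls into the [00:05, 00:10) window
        System.out.println(tumblingWindowStart(ts, size));  // 300000
    }
}
```

A sliding window of size w and slide s assigns each timestamp to w/s windows in the same way, one per slide step.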

5.2 Implementing Aggregate Functions

// Custom aggregate function examples
public class AggregationFunctionExamples {
    
    // Simple aggregate function: counts elements
    public static class SimpleAggregationFunction 
        implements AggregateFunction<String, Integer, Integer> {
        
        @Override
        public Integer createAccumulator() {
            return 0;
        }
        
        @Override
        public Integer add(String value, Integer accumulator) {
            return accumulator + 1;  // simple count
        }
        
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }
        
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }
    
    // Complex aggregate function: count, sum, average, max, and min per window
    public static class ComplexAggregationFunction 
        implements AggregateFunction<TradeEvent, TradeSummary, TradeSummary> {
        
        @Override
        public TradeSummary createAccumulator() {
            // TradeSummary is assumed to initialize minAmount to Double.MAX_VALUE
            // and maxAmount to -Double.MAX_VALUE so the comparisons below work
            return new TradeSummary();
        }
        
        @Override
        public TradeSummary add(TradeEvent value, TradeSummary accumulator) {
            accumulator.setCount(accumulator.getCount() + 1);
            accumulator.setTotalAmount(accumulator.getTotalAmount() + value.getAmount());
            accumulator.setAvgAmount(accumulator.getTotalAmount() / accumulator.getCount());
            
            // Track max and min
            if (value.getAmount() > accumulator.getMaxAmount()) {
                accumulator.setMaxAmount(value.getAmount());
            }
            if (value.getAmount() < accumulator.getMinAmount()) {
                accumulator.setMinAmount(value.getAmount());
            }
            
            return accumulator;
        }
        
        @Override
        public TradeSummary getResult(TradeSummary accumulator) {
            return accumulator;
        }
        
        @Override
        public TradeSummary merge(TradeSummary a, TradeSummary b) {
            TradeSummary merged = new TradeSummary();
            merged.setCount(a.getCount() + b.getCount());
            merged.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
            merged.setAvgAmount(merged.getTotalAmount() / merged.getCount());
            merged.setMaxAmount(Math.max(a.getMaxAmount(), b.getMaxAmount()));
            merged.setMinAmount(Math.min(a.getMinAmount(), b.getMinAmount()));
            return merged;
        }
    }
    
    // Using the aggregate function
    public static void useAggregationFunction() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStream<TradeEvent> tradeStream = env.fromElements(
            new TradeEvent("BTC", 1000.0),
            new TradeEvent("BTC", 2000.0),
            new TradeEvent("ETH", 500.0)
        );
        
        // Apply the custom aggregate function
        DataStream<TradeSummary> summaryStream = tradeStream
            .keyBy(new KeySelector<TradeEvent, String>() {
                @Override
                public String getKey(TradeEvent value) throws Exception {
                    return value.getCurrency();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.hours(1)))  // requires timestamp/watermark assignment upstream
            .aggregate(new ComplexAggregationFunction());
        
        summaryStream.print();
    }
}
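The add/merge contract above can be exercised without a cluster: merging two partial accumulators must give the same result as feeding all records through a single accumulator, because Flink calls merge() when it combines per-pane or per-instance partial results. A plain-Java check of that property for the counting aggregate (not Flink API):

```java
public class MergeContractDemo {
    // Mirrors SimpleAggregationFunction: the accumulator is a running count.
    static int add(String value, int acc) { return acc + 1; }
    static int merge(int a, int b) { return a + b; }

    public static void main(String[] args) {
        String[] records = {"e1", "e2", "e3", "e4", "e5"};

        // One accumulator over all records
        int whole = 0;
        for (String r : records) whole = add(r, whole);

        // Two partial accumulators (as if computed on two parallel instances), then merged
        int left = 0, right = 0;
        for (int i = 0; i < 2; i++) left = add(records[i], left);
        for (int i = 2; i < records.length; i++) right = add(records[i], right);

        System.out.println(whole == merge(left, right));  // true
    }
}
```

The same property should hold for ComplexAggregationFunction; it is why avg is recomputed from the merged sum and count rather than averaged directly.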

5.3 Window Processing and Time Semantics

// Time semantics and windowing example (getKafkaProperties() is a helper that returns the Kafka consumer properties)
public class TimeSemanticExample {
    
    public static void processTimeExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Use event-time semantics (legacy call; event time is the default since Flink 1.12)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        DataStream<String> kafkaStream = env.addSource(
            new FlinkKafkaConsumer<>("input-topic", 
                new SimpleStringSchema(), 
                getKafkaProperties())
        );
        
        // Parse the timestamp out of each record
        SingleOutputStreamOperator<ProcessedEvent> eventStream = kafkaStream
            .map(new MapFunction<String, ProcessedEvent>() {
                @Override
                public ProcessedEvent map(String value) throws Exception {
                    // Parse the JSON and extract the timestamp
                    ObjectMapper mapper = new ObjectMapper();
                    JsonNode node = mapper.readTree(value);
                    long timestamp = node.get("timestamp").asLong();
                    
                    return new ProcessedEvent(
                        node.get("id").asText(),
                        timestamp,
                        node.get("value").asDouble()
                    );
                }
            })
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ProcessedEvent>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(ProcessedEvent element) {
                    return element.getTimestamp();
                }
            });
        
        // Event-time window computation
        eventStream.keyBy(new KeySelector<ProcessedEvent, String>() {
                @Override
                public String getKey(ProcessedEvent value) throws Exception {
                    return value.getId();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .sum("value")
            .print();
        
        env.execute("Event Time Processing Job");
    }
}
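The bounded-out-of-orderness extractor used above can be illustrated in plain Java: the watermark trails the maximum timestamp seen so far by the configured delay, and an event is late if its timestamp does not advance past the current watermark. A simplified sketch (not Flink's implementation; the class name is illustrative):

```java
public class WatermarkDemo {
    private final long maxOutOfOrderness;  // allowed out-of-orderness in ms
    private long maxTimestamp;

    WatermarkDemo(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // start so that the initial watermark is Long.MIN_VALUE (avoids underflow)
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // The watermark trails the highest timestamp seen so far by the allowed delay.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    // Records an event; returns true if it arrived behind the watermark (late).
    boolean observe(long eventTimestamp) {
        boolean late = eventTimestamp <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return late;
    }

    public static void main(String[] args) {
        WatermarkDemo wm = new WatermarkDemo(5000);  // 5 seconds, as in the extractor above
        System.out.println(wm.observe(10_000));  // false - first event is never late
        System.out.println(wm.observe(7_000));   // false - within the 5s bound (watermark is 5000)
        System.out.println(wm.observe(4_000));   // true  - behind the watermark
    }
}
```

A window [start, end) fires once the watermark passes end, so the 5-second bound delays results by up to 5 seconds in exchange for tolerating out-of-order arrival.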

6. Practical Applications and Best Practices

6.1 Designing a Real-Time Monitoring System

// Real-time monitoring system example
public class RealTimeMonitoringSystem {
    
    public static class LogEvent {
        private String userId;
        private String action;
        private long timestamp;
        private double value;
        
        // constructor, getters, and setters
        public LogEvent() {}
        
        public LogEvent(String userId, String action, long timestamp, double value) {
            this.userId = userId;
            this.action = action;
            this.timestamp = timestamp;
            this.value = value;
        }
        
        // getters and setters...
    }
    
    public static class AlertRule {
        private String metricName;
        private double threshold;
        private int windowSizeMinutes;
        
        // constructor, getters, and setters
        public AlertRule() {}
        
        public AlertRule(String metricName, double threshold, int windowSizeMinutes) {
            this.metricName = metricName;
            this.threshold = threshold;
            this.windowSizeMinutes = windowSizeMinutes;
        }
        
        // getters and setters...
    }
    
    public static void buildMonitoringPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configure checkpointing
        env.enableCheckpointing(30000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // Read log events from Kafka. JsonDeserializationSchema is assumed here to
        // be a variant of the earlier schema that produces LogEvent instances, and
        // getKafkaProperties() is an assumed configuration helper.
        DataStream<LogEvent> logStream = env.addSource(
            new FlinkKafkaConsumer<>("log-topic", 
                new JsonDeserializationSchema(), 
                getKafkaProperties())
        );
        
        // Compute per-action metrics over one-minute windows
        DataStream<MetricResult> metricsStream = logStream
            .keyBy(new KeySelector<LogEvent, String>() {
                @Override
                public String getKey(LogEvent value) throws Exception {
                    return value.getAction();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))  // requires timestamps/watermarks assigned upstream
            .aggregate(new AggregateFunction<LogEvent, MetricAccumulator, MetricResult>() {
                
                @Override
                public MetricAccumulator createAccumulator() {
                    return new MetricAccumulator();
                }
                
                @Override
                public MetricAccumulator add(LogEvent value, MetricAccumulator accumulator) {
                    accumulator.setCount(accumulator.getCount() + 1);
                    accumulator.setSum(accumulator.getSum() + value.getValue());
                    accumulator.setAvg(accumulator.getSum() / accumulator.getCount());
                    
                    if (value.getValue() > accumulator.getMax()) {
                        accumulator.setMax(value.getValue());
                    }
                    if (value.getValue() < accumulator.getMin()) {
                        accumulator.setMin(value.getValue());
                    }
                    
                    return accumulator;
                }
                
                @Override
                public MetricResult getResult(MetricAccumulator accumulator) {
                    return new MetricResult(
                        "action_metric",
                        accumulator.getCount(),
                        accumulator.getSum(),
                        accumulator.getAvg(),
                        accumulator.getMax(),
                        accumulator.getMin()
                    );
                }
                
                @Override
                public MetricAccumulator merge(MetricAccumulator a, MetricAccumulator b) {
                    MetricAccumulator merged = new MetricAccumulator();
                    merged.setCount(a.getCount() + b.getCount());
                    merged.setSum(a.getSum() + b.getSum());
                    merged.setAvg(merged.getSum() / merged.getCount());
                    merged.setMax(Math.max(a.getMax(), b.getMax()));
                    merged.setMin(Math.min(a.getMin(), b.getMin()));
                    return merged;
                }
            });
        
        // Rule checks and alerting
        DataStream<AlertEvent> alertStream = metricsStream
            .map(new MapFunction<MetricResult, AlertEvent>() {
                @Override
                public AlertEvent map(MetricResult result) throws Exception {
                    // Check whether an alert should fire
                    if (result.getAvg() > 1000.0) {  // threshold check
                        return new AlertEvent(
                            "HIGH_AVERAGE",
                            result.getMetricName(),
                            result.getAvg(),
                            System.currentTimeMillis()
                        );
                    }
                    return null;
                }
            })
            .filter(Objects::nonNull);
        
        // Write alerts to Kafka
        alertStream.addSink(new FlinkKafkaProducer<>(
            "alert-topic",
            new JsonSerializationSchema(),
            getKafkaProperties()
        ));
        
        env.execute("Real-time Monitoring System");
    }
    
    // Helper classes
    public static class MetricAccumulator {
        private long count = 0;
        private double sum = 0.0;
        private double avg = 0.0;
        private double max = -Double.MAX_VALUE;  // Double.MIN_VALUE is the smallest positive double, not the most negative
        private double min = Double.MAX_VALUE;
        
        // getters and setters...
    }
    
    public static class MetricResult {
        private String metricName;
        private long count;
        private double sum;
        private double avg;
        private double max;
        private double min;
        
        public MetricResult() {}
        
        public MetricResult(String metricName, long count, double sum, double avg, double max, double min) {
            this.metricName = metricName;
            this.count = count;
            this.sum = sum;
            this.avg = avg;
            this.max = max;
            this.min = min;
        }
        
        // getters and setters...
    }
    
    public static class AlertEvent {
        private String alertType;
        private String metricName;
        private double value;
        private long timestamp;
        
        public AlertEvent() {}
        
        public AlertEvent(String alertType, String metricName, double value, long timestamp) {
            this.alertType = alertType;
            this.metricName = metricName;
            this.value = value;
            this.timestamp = timestamp;
        }
        
        // getters and setters...
    }
}

6.2 Designing a Real-Time Recommendation System

// Real-time recommendation system example
public class RealTimeRecommendationSystem {
    
    public static class UserAction {
        private String userId;
        private String itemId;
        private String actionType;  // view, click, purchase
        private long timestamp;
        
        // constructor, getters, and setters
        public UserAction() {}
        
        public UserAction(String userId, String itemId, String actionType, long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.actionType = actionType;
            this.timestamp = timestamp;
        }
        
        // getters and setters...
    }
    
    public static class RecommendationResult {
        private String userId;
        private List<String> recommendedItems;  // illustrative fields
        
        // getters and setters...
    }
}