flume接收kafka数据存储到hdfs
发布日期:2021-04-30 21:06:29 浏览次数:97 分类:精选文章

本文共 4094 字,大约阅读时间需要 13 分钟。

Flume介绍

Flume是一个分布式、可靠且高可用的海量日志聚合系统,支持定制数据发送方用于收集数据,同时提供数据处理和写入不同数据接受方的能力。

设计目标

可靠性

Flume提供了三种可靠性保障级别,从强到弱分别为:

  • end-to-end:事件发送成功后才删除数据,失败时重新发送。
  • Store on failure:数据发送失败时写入本地存储,待接收方恢复后再发送。
  • Best effort:数据发送后不进行确认,可能丢失。
  • 可扩展性

    Flume采用三层架构:agentcollectorstorage,每一层均可水平扩展。多个master节点通过ZooKeeper管理,避免单点故障。

    可管理性

    所有agent和collector由master统一管理,便于维护。多master情况下,利用ZooKeeper和gossip保证动态配置的一致性。用户可在master上查看数据源情况,支持动态配置和加载,提供Web界面和shell命令进行管理。

    功能可扩展性

    用户可根据需求添加自定义agent、collector或storage,Flume还提供多种组件,如文件、syslog等源,以及HDFS等存储。

    通常实时系统所选组件

  • 数据采集:使用Cloudera Flume。
  • 数据接入:添加消息中间件如Apache Kafka作为缓冲。
  • 流式计算:选用Apache Storm进行实时分析。
  • 数据输出:暂定使用MySQL,模块化后可切换其他存储。
  • Flume核心概念

    Flume由agentcollectorstorage组成,通过三层架构支持数据收集、处理和存储。

    Flume整体构成图

    Flume采用三层架构,agent负责数据采集,collector负责聚合和转发,storage负责持久化存储。

    实例应用

    4.1 Flume接收数据到指定文件

    Flume配置

    # Flume配置示例
    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    a1.channels.c1.type = memory
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /var/flume_log

    启动类

    package com.bigdata.flume;
    public class App {
    public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    client.init("node6.sdp.cn", 41414);
    for (int i = 0; i < 10; i++) {
    client.sendDataToFlume("Hello Flume!");
    }
    client.cleanUp();
    }
    }

    发送数据类

    package com.bigdata.flume;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    public class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;
    public void init(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }
    public void sendDataToFlume(String data) {
    Event event = EventBuilder.withBody(data, "UTF-8");
    try {
    client.append(event);
    } catch (EventDeliveryException e) {
    client.close();
    client = null;
    client = RpcClientFactory.getDefaultInstance(hostname, port);
    }
    }
    public void cleanUp() {
    client.close();
    }
    }

    Maven依赖

    org.apache.logging.log4j
    log4j-core
    2.1
    org.apache.flume
    flume-ng-core
    1.5.2

    创建存储目录

    mkdir flume_log
    chown flume:hadoop ./flume_log
    cd flume_log

    启动App.java后,查看flume_log目录中的文件。

    4.2 Flume接收Kafka数据到HDFS

    Flume配置

    # Flume配置示例
    agent.sources = kafkaSource
    agent.channels = memoryChannel
    agent.sinks = hdfsSink
    agent.sources.kafkaSource.channels = memoryChannel
    agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSource.zookeeperConnect = node3.sdp.cn:2181,node4.sdp.cn:2181,node5.sdp.cn:2181,node6.sdp.cn:2181
    agent.sources.kafkaSource.topic = applog
    agent.sources.kafkaSource.groupId = flume
    agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 10000
    agent.channels.memoryChannel.transactionCapacity = 1000
    agent.sinks.hdfsSink.type = hdfs
    agent.sinks.hdfsSink.hdfs.path = hdfs://node1.sdp.cn:8020/tmp/applogs/%{logApp}-%Y%m%d
    agent.sinks.hdfsSink.hdfs.writeFormat = Text
    agent.sinks.hdfsSink.hdfs.fileType = DataStream
    agent.sinks.hdfsSink.hdfs.rollSize = 0
    agent.sinks.hdfsSink.hdfs.rollCount = 0
    agent.sinks.hdfsSink.hdfs.rollInterval = 600
    agent.sinks.hdfsSink.hdfs.filePrefix = applog
    agent.sinks.hdfsSink.hdfs.fileSuffix = .log
    agent.sinks.hdfsSink.hdfs.inUserPrefix = _
    agent.sinks.hdfsSink.hdfs.inUserSuffix = _
    agent.sinks.hdfsSink.hdfs.interceptors = i1
    agent.sources.kafkaSource.interceptors.i1.type = org.bigdata.flume.LogInterceptor$Builder

    查看HDFS执行结果

    hdfs dfs -ls /tmp/applogs

    日志文件内容示例

    applog_2023-02-11_00-00-00.log
    applog_2023-02-11_00-00-01.log
    ...

    Flume可靠性和扩展性使其适合处理海量日志数据,支持实时流式计算和持久化存储。

    上一篇:JavaWeb学习笔记(6)__Servlet篇
    下一篇:所有文件的路径都在一个配置文件中的处理

    发表评论

    最新留言

    能坚持,总会有不一样的收获!
    [***.219.124.196]2026年06月03日 23时47分39秒