flume接收kafka数据存储到hdfs
end-to-end:事件发送成功后才删除数据,失败时重新发送。 Store on failure:数据发送失败时写入本地存储,待接收方恢复后再发送。 Best effort:数据发送后不进行确认,可能丢失。 数据采集:使用Cloudera Flume。 数据接入:添加消息中间件如Apache Kafka作为缓冲。 流式计算:选用Apache Storm进行实时分析。 数据输出:暂定使用MySQL,模块化后可切换其他存储。
发布日期:2021-04-30 21:06:29
浏览次数:97
分类:精选文章
本文共 4094 字,大约阅读时间需要 13 分钟。
Flume介绍
Flume是一个分布式、可靠且高可用的海量日志聚合系统,支持定制数据发送方用于收集数据,同时提供数据处理和写入不同数据接受方的能力。
设计目标
可靠性
Flume提供了三种可靠性保障级别,从强到弱分别为:
可扩展性
Flume采用三层架构:agent、collector和storage,每一层均可水平扩展。多个master节点通过ZooKeeper管理,避免单点故障。
可管理性
所有agent和collector由master统一管理,便于维护。多master情况下,利用ZooKeeper和gossip保证动态配置的一致性。用户可在master上查看数据源情况,支持动态配置和加载,提供Web界面和shell命令进行管理。
功能可扩展性
用户可根据需求添加自定义agent、collector或storage,Flume还提供多种组件,如文件、syslog等源,以及HDFS等存储。
通常实时系统所选组件
Flume核心概念
Flume由agent、collector和storage组成,通过三层架构支持数据收集、处理和存储。
Flume整体构成图
Flume采用三层架构,agent负责数据采集,collector负责聚合和转发,storage负责持久化存储。
实例应用
4.1 Flume接收数据到指定文件
Flume配置
# Flume配置示例a1.channels = c1a1.sources = r1a1.sinks = k1a1.channels.c1.type = memorya1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 41414a1.sinks.k1.channel = c1a1.sinks.k1.type = file_rolla1.sinks.k1.sink.directory = /var/flume_log
启动类
package com.bigdata.flume;public class App { public static void main(String[] args) { MyRpcClientFacade client = new MyRpcClientFacade(); client.init("node6.sdp.cn", 41414); for (int i = 0; i < 10; i++) { client.sendDataToFlume("Hello Flume!"); } client.cleanUp(); }} 发送数据类
package com.bigdata.flume;import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.api.RpcClient;import org.apache.flume.api.RpcClientFactory;import org.apache.flume.event.EventBuilder;public class MyRpcClientFacade { private RpcClient client; private String hostname; private int port; public void init(String hostname, int port) { this.hostname = hostname; this.port = port; this.client = RpcClientFactory.getDefaultInstance(hostname, port); } public void sendDataToFlume(String data) { Event event = EventBuilder.withBody(data, "UTF-8"); try { client.append(event); } catch (EventDeliveryException e) { client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); } } public void cleanUp() { client.close(); }} Maven依赖
org.apache.logging.log4j log4j-core 2.1 org.apache.flume flume-ng-core 1.5.2
创建存储目录
mkdir flume_logchown flume:hadoop ./flume_logcd flume_log
启动App.java后,查看flume_log目录中的文件。
4.2 Flume接收Kafka数据到HDFS
Flume配置
# Flume配置示例agent.sources = kafkaSourceagent.channels = memoryChannelagent.sinks = hdfsSinkagent.sources.kafkaSource.channels = memoryChannelagent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSourceagent.sources.kafkaSource.zookeeperConnect = node3.sdp.cn:2181,node4.sdp.cn:2181,node5.sdp.cn:2181,node6.sdp.cn:2181agent.sources.kafkaSource.topic = applogagent.sources.kafkaSource.groupId = flumeagent.sources.kafkaSource.kafka.consumer.timeout.ms = 100agent.channels.memoryChannel.type = memoryagent.channels.memoryChannel.capacity = 10000agent.channels.memoryChannel.transactionCapacity = 1000agent.sinks.hdfsSink.type = hdfsagent.sinks.hdfsSink.hdfs.path = hdfs://node1.sdp.cn:8020/tmp/applogs/%{logApp}-%Y%m%dagent.sinks.hdfsSink.hdfs.writeFormat = Textagent.sinks.hdfsSink.hdfs.fileType = DataStreamagent.sinks.hdfsSink.hdfs.rollSize = 0agent.sinks.hdfsSink.hdfs.rollCount = 0agent.sinks.hdfsSink.hdfs.rollInterval = 600agent.sinks.hdfsSink.hdfs.filePrefix = applogagent.sinks.hdfsSink.hdfs.fileSuffix = .logagent.sinks.hdfsSink.hdfs.inUserPrefix = _agent.sinks.hdfsSink.hdfs.inUserSuffix = _agent.sinks.hdfsSink.hdfs.interceptors = i1agent.sources.kafkaSource.interceptors.i1.type = org.bigdata.flume.LogInterceptor$Builder 查看HDFS执行结果
hdfs dfs -ls /tmp/applogs
日志文件内容示例
applog_2023-02-11_00-00-00.logapplog_2023-02-11_00-00-01.log...
Flume可靠性和扩展性使其适合处理海量日志数据,支持实时流式计算和持久化存储。
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2026年06月03日 23时47分39秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo
2023-03-01
php模板引擎smarty
2023-03-01
php正则表达式模式
2023-03-01
php正则表达式的特殊字符含义
2023-03-01
PHP正则表达式获取武汉市的实时pm2.5数据并邮件发送phpmailer
2023-03-01
RabbitMQ + JMeter组合,优化你的中间件处理方式!
2023-03-01
PHP水仙花问题解法之一
2023-03-01
php没有解析是怎么回事,linux下php文件没有被剖析怎么办?_后端开发
2023-03-01
php注册页面实现注册后跳转页面
2023-03-01
PHP消息队列的实现方式与详解,值得一看
2023-03-01
PHP混合Go协程并发
2023-03-01
php源码中如何添加滚动公告,给WordPress网站添加滚动公告的方法
2023-03-01
PHP源码安装后如何新增模块
2023-03-01
php源码详细安装步骤,linux下php源码安装步骤
2023-03-01
php漏洞tips
2023-03-01
php版Zencoding之 phpstorm
2023-03-01
PHP版本升级5.4手记
2023-03-01
php版本升级总结
2023-03-01
php版本微信公众号开发
2023-03-01
php版的微信公众号开发演示
2023-03-01