flume不同的source可以写到一个flume filechannell么

博客访问: 3198
博文数量: 1
注册时间:
ITPUB论坛APP
ITPUB论坛APP
APP发帖 享双倍积分
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: Hadoop
一. flume是什么
flume是apache的一个数据收集框架。定义了一个数据流的模型。
二. 设计核心思想
对日志收集的三个核心要素进行了抽象:
1. Source:日志从哪里产生
2. Sink:日志会被推送到什么地方
3. Channel:Source通过什么样的渠道,送到Sink
# 可以把Channel理解为一个queue,Source adds the events and Sink removes
三. flume的价值 1
得益于设计上的抽象,使得要素隔离得很清晰。使用上变得非常灵活。我们可以根据实际需要,对于这三个核心要素,选择合适的具体实现,完成我们工作场景下,日志收集的目的!
apache官方已经对这三个具体要素,提供了很多现成的实现:
Avro Source:
Thrift Source:
#上面这两个很像,都是通过序列化的流,接收外部向它提交的数据,这两个都是apache自己实现的序列化。它们最常见的使用场景,都是用来构建log
chain的,下面会进一步介绍。Avro是hadoop自己实现的序列化方案,官方比较推荐这个
Source:收集外部Linux命令产生的标准输出(标准错误默认被丢弃,但有必要的话,我们都会做重定向)。对外部命令本身有一定要求,例如tail
-F这种持续产生输出的命令,就比较适合收集目的。
& & &NetCat
Source:使用简单的nc命令,打开一个监听端口。可以接收从nc发送过来的数据。功能简单,主要用于调试。
JMS Source:从消息队列获取数据。
& & Syslog
Source:直接从syslog(RFC 3164 - The BSD syslog Protocol, RFC 5424
-&The Syslog Protocol)获取数据
Sink:推送到hdfs上,这个模块应该算是flume的主打模块了。
& & Avro Sink/Thrift
Sink:使用在级联的场景下,配合后端agent对应的Avro Source和Thrift Source构成log
chain。后面会详细介绍。
& & File Roll
Sink:存放在外部文件系统中。
& & HBase Sink:
& & Logger Sink:debug用途
Channels:
& & Memory
Channel:优点,高吞吐量。缺点:1. 较容易受到容量限制 2. 无法持久化,在flume进程意外结束后,丢失数据。
Channels:使用内置的数据库Derby作为数据暂存区
Channels:使用外部的文件系统。可以设置文件、目录加密。不过速度很慢。。
四. flume的配置
为了方便后续的讲解,先打断插入介绍一下flume的配置。
整体而言,flume的配置简单到令人发指的程度
# 我们已经知道了,一个flume agent,主要就是由Source,Channel,Sink三部分构成的。所以先定义它们。
.sources =
.channels =
# 指定某个Source收集的数据,送到哪些下游的Channel
.sources..channels =
# 指定某个Sink,从哪个Channel获取数据
.sinks..channel =
注意:一个Source可以向多个Channels发送数据。但一个Sink只能指定一个Channel。
各个模块设置
# Source的设置
.sources.. =
# Channel的设置
.channel.. =
# Sink的设置
.sources.. =
# Interceptor的设置
.sources.. =
五. flume的价值2:
得益于flume模块化的设计,使得flume可以互相组合嵌套,从而产生出很多种灵活的使用方式!
方式一:较简单的绑定
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
方式二:汇流
Consolidation
配置和方式一相似,只有概念上的差别。此处省去配置。
方式三:分流 Fan out
对于分流场景,有两种策略可以设置。缺省是replicating,这种模式比较简单,意为从一个source收集来的数据,无差别地向所有下游绑定的channels进行发送。
a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating
a1.source.r1.channels = c1 c2 c3
还有一种模式叫multiplexing。可以根据接收到的数据(在flume设计实现中,抽象为一个event)中的head信息,进行选择发送。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
六. 拦截器(Source
Interceptor)
Flume事件抽象——Event
body承载的就是需要被收集的日志数据。
而Interceptor的作用就是对header进行设定
Interceptor配置:
backendAgent.sources.thriftSource.interceptors
= hostInterceptor timeStampInterceptor
staticInterceptor
backendAgent.sources.thriftSource.interceptors.hostInterceptor.type
backendAgent.sources.thriftSource.interceptors.hostInterceptor.hostHeader
backendAgent.sources.thriftSource.interceptors.timeStampInterceptor.type
= timestamp
backendAgent.sources.thriftSource.interceptors.staticInterceptor.type
backendAgent.sources.thriftSource.interceptors.staticInterceptor.key
backendAgent.sources.thriftSource.interceptors.staticInterceptor.value
= tc_flume
七. 高级设置
Sink Processor
通过引入sinkgroup的概念,来为多个sinks如何协同工作设置策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = default | failover | load_balance
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Event Serializer
八. 使用问题杂记:
0. protobuf
如果需要自己编译flume源码,先要自行安装protobuf包。
关于依赖包
对于flume项目的依赖包组织,用一句话来简
单总结——就是坨屎!flume自带的脚本(flume-ng),可以通过配置在HADOOP_HOME变量的设定,直接使用hadoop项目中的jar
包(特别是hdfs和hbase的)。这对于使用hdfs的工作场景中是有意义的。因为事实上,flume自己只带了flume-hdfs-sink一层
很薄的接口,真实的HDFS存取访问操作,是需要外部的hadoop项目包提供的。不过,真要这么做的话,会立即会碰到很多版本不一致的冲突。。
所以,我最后为了隔离性和依赖的可控性,还是在flume的启动脚本中(flume-ng)中,重设了hadoop环境变量:
HADOOP_HOME=""
然后手工复制所需要的依赖(以下我列举的依赖包,在使用hdfs sink的场景中是必须的)
hadoop-common
hadoop-hdfs
hadoop-auth
commons-configuration
另外,对于guava包,不同的模块需要版本号也会有差异,非常头痛。只能分离多份flume的拷贝,根据不同使用配置的agent,采用不同的包。
2. 在HDFS Sink下的复制因子
们知道,在HDFS配置文件中,可以通过dfs.replication变量,设置hdfs的复制因子。而flume作为客户端,也会对这个复制因子有个
3的初始预设值。如果服务器的真实配置的复制因子小于这个数目,就会引起flume
rotate存放在hdfs上的文件。并在标准输出打出一行警告:
Block Under-replication detected. Rotating
这会使得hdfs
sink的一些设定,比如根据文件大小,根据事件数量,根据时间间隔翻滚文件的设置完全无效化,最后的结果就是在hdfs留下一堆十几kb的小文件。这对于本身就是为大文件设计的hdfs而言,无异于一种扼杀!
所以,如果确实不能配置3个以上datanode的话,一个简单的作法是在flume的配置目录下,复制一份hdfs-site.xml文件,写上:
dfs.replication
[flume-ng-core]
org.apache.flume.source.ThriftSource
logger.warn("Thrift source % could not append events to the "
"channel.", getName());
logger.warn("Thrift source {} could not append events to the "
"channel.", getName());
阅读(2457) | 评论(0) | 转发(0) |
上一篇:没有了
下一篇:没有了
相关热门文章
给主人留下些什么吧!~~
请登录后评论。中国领先的IT技术网站
51CTO旗下网站
Flume 的无数据丢失保证,Channel 和事务
《Flume:构建高可用、可扩展的海量日志采集系统》本书从Flume 的基本概念和设计原理开始讲解,分别介绍了不同种类的组件、如何配置组件、如何运行Flume Agent 等。同时,分别讨论Source、Channel 和Sink 三种核心组件,不仅仅阐述每个组件的基本概念,而且结合实际的编程案例,深入、全面地介绍每个组件的详细用法,并且这部分内容也是整个Flume 框架的重中之重。本节为大家介绍Flume 的无数据丢失保证,Channel 和事务。
作者:马延辉/史东杰 译来源:电子工业出版社| 15:47
Flume 的无数据丢失保证,Channel 和事务
如果配置正确,Flume 提供了无数据丢失的保证。当然,一旦管道中所有Flume Agent的容量之和被使用完,Flume 将不再接受来自客户端的数据。此时,客户端需要缓冲数据,否则数据可能会丢失。因此,配置管道能够处理最大预期的停机时间是非常重要的。我们将在第8 章讨论Flume 管道的配置。
Flume 的持久性保证依赖于使用的持久性Channel 的保证。Flume 自带两类Channel :Memory Channel 和File Channel。Memory Channel 是一个内存缓冲区,因此如果Java23 虚拟机(JVM)或机器重新启动,任何缓冲区中的数据将丢失。另一方面,File Channel是在磁盘上的。即使JVM 或机器重新启动,File Channel 也不丢失数据,只要磁盘上存储的数据仍然是起作用的和可访问的。机器和Agent 一旦开始运行,任何存储在FileChannel 中的数据将最终被访问。
Channel 本质上是事务性的。此处的事务不同于数据库事务。每个Flume 事务代表一批自动写入到Channel 或从Channel 删除的事件。无论是当Source 将事件写入Channel 时,或Sink 从Channel 读取事件时,它必须在事务的范围之内进行操作。
Flume 保证事件至少一次被送到它们的目的地。Flume 只有一次倾力写数据,且不存在任何类型的故障事件只被写一次。但是像网络超时或部分写入存储系统的错误,可能导致事件不止被写一次,因为Flume 将重试写操作直到它们完全成功。网络超时可能表示写操作的失败,或者只是机器运行缓慢。如果是机器运行缓慢,当Flume 重试这将导致重复。因此,确保每个事件都有某种形式的唯一标识符通常是一个好主意,如果需要,最终可以用来删除事件数据。
喜欢的朋友可以添加我们的微信账号:
51CTO读书频道二维码
51CTO读书频道活动讨论群:【责任编辑: TEL:(010)】&&&&&&
大家都在看猜你喜欢
热点热点头条头条热点
24H热文一周话题本月最赞
讲师:51174人学习过
讲师:41370人学习过
讲师:125189人学习过
精选博文论坛热帖下载排行
J2EE是目前企业级软件开发的首选平台。本书从架构的角度讲解了一个完整的J2EE系统的搭建。内容包括:正则表达式、JSP、Swing、XML等技术在...
订阅51CTO邮刊用户名:workming
文章数:126
评论数:252
访问量:1503666
注册日期:
阅读量:1297
阅读量:3317
阅读量:581466
阅读量:465990
51CTO推荐博文
一、简介、概述是&提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。最早是&提供的日志收集系统,目前是&下的一个孵化项目,支持在日志系统中定制各类数据发送方,用于收集数据;同时,提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力,提供了从(控制台)、()、(文件)、()、(日志系统,支持&和&等种模式),(命令执行)等数据源上收集数据的能力。、数据流通传输数据的基本单位是&,如果是文本文件,通常是一行记录,这也是事务的基本单位。F运行的核心是&。它是一个完整的数据收集工具,含有三个核心组件,分别是&、、。从&,流向&,再到&,本身为一个&数组,并可携带&信息。代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。:完成对日志数据的收集,分成&和&打入到&之中。:主要提供一个队列的功能,对&提供中的数据进行简单的缓存。:取出&中的数据,进行相应的存储文件系统、数据库、或者提交到远程服务器。通过这些组件,可以从一个地方流向另一个地方,如下图所示:消费从外部流进的&,如&接收外部客户端传来的或是从别的流出来的&。可以把&送往一个或多个&。是一个队列,持有&等待&来消费,一种&的实现:使用本地文件系统来作为它的存储。的作用是把&从&里移除,送往外部数据仓库或给下一站&的&,如&送往&。同个&下的&和&是异步的。下面再举几个数据流通的例子,说明不同的使用方式。、agent模式、/Collector场景、一对多路输出模型二、部署与测试、安装、安装、环境变量设置、配置、服务启动、相关测试、本地绑定&、采用提交测试数据、效果展示、日志定期清理附录:本文出自 “” 博客,请务必保留此出处
了这篇文章
附件下载:      
类别:┆阅读(0)┆评论(0)
23:48:59 14:02:26
请输入验证码:Flume 入门--几种不同的Sources - 波比12 - 博客园
随笔 - 86, 文章 - 0, 评论 - 5, 引用 - 0
1.flume概念
flume是分布式的,可靠的,高可用的,用于对不同来源的大量的日志数据进行有效收集、聚集和移动,并以集中式的数据存储的系统。
flume目前是apache的一个顶级项目。
flume需要java运行环境,要求java1.6以上,推荐java1.7.
将下载好的flume安装包解压到指定目录即可。
2.flume中的重要模型
2.1.1.flume Event:
flume 事件,被定义为一个具有有效荷载的字节数据流和可选的字符串属性集。
2.1.2.flume Agent:
flume 代理,是一个进程承载从外部源事件流到下一个目的地的过程。包含source channel 和 sink。
2.1.3.Source
数据源,消耗外部传递给他的事件,外部源将数据按照flume Source 能识别的格式将Flume 事件发送给flume Source。
2.1.4.Channel
数据通道,是一个被动的存储,用来保持事件,直到由一个flume Sink消耗。
2.1.5.Sink
数据汇聚点,代表外部数据存放位置。发送flume event到指定的外部目标。
2.2.flume流动模型
2.3.flume的特点
2.3.1.复杂流动性
Flume允许用户进行多级流动到最终目的地,也允许扇出流(一到多)、扇入流(多到一)的、故障转移和失败处理。
2.3.2.可靠性
事务性的数据传递,保证了数据的可靠性。
2.3.3.可恢复性
通道可以以内存或文件的方式实现,内存更快,但是不可恢复,而文件虽然比较慢但提供了可恢复性。
1.首先编写一个配置文件:
#example.conf:单节点Flume配置
#命名Agent a1的组件
a1.sources
a1.channels
#描述/配置Source
a1.sources.r1.type
a1.sources.r1.bind
a1.sources.r1.port
#描述Sink
a1.sinks.k1.type
#描述内存Channel
a1.channels.c1.type
a1.channels.c1.capacity
a1.channels.c1.transactionCapacity
#为Channle绑定Source和Sink
a1.sources.r1.channels
a1.sinks.k1.channel
&2.通过flume的工具启动agent
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
&3、发送数据
在windows中通过telnet命令连接flume所在机器的44444端口发送数据。
4.Source详解
现在介绍几种比较重要的Source
4.1.Avro Source
监听AVRO端口来接受来自外部AVRO客户端的事件流。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。
4.1.1.&Avro Source属性说明
!channels& –&&
!type& –&& 类型名称,"AVRO"
!bind& –&& 需要监听的主机名或IP
!port& –&& 要监听的端口
threads&&& –&& 工作线程最大线程数
selector.type && &
selector.* && &
interceptors& –&& 空格分隔的拦截器列表
interceptors.*&&&
compression-type& none&& 压缩类型,可以是“none”或“default”,这个值必须和AvroSource的压缩格式匹配
sslfalse& 是否启用ssl加密,如果启用还需要配置一个“keystore”和一个“keystore-password”。
keystore&& –&& 为SSL提供的java密钥文件所在路径。
keystore-password–&& 为SSL提供的java密钥文件 密码。
keystore-typeJKS密钥库类型可以是“JKS”或“PKCS12”。
exclude-protocolsSSLv3& 空格分隔开的列表,用来指定在SSL / TLS协议中排除。SSLv3将总是被排除除了所指定的协议。
ipFilter&& false& 如果需要为netty开启ip过滤,将此项设置为true
ipFilterRules–&& 定义netty的ip过滤设置表达式规则
编写配置文件& 修改上面给出的配置文件,除了Source部分配置不同,其余部分都一样。不同的地方如下:
#描述/配置Source
a1.sources.r1.type
a1.sources.r1.bind
a1.sources.r1.port
&启动flume:
&&& ./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
&通过flume提供的avro客户端向指定机器指定端口发送日志信息:
&&& ./flume-ng avro-client --conf ../conf --host 0.0.0.0 --port 44444 --filename ../mydata/log1.txt
会发现确实收集到日志
4.2.Spooling Directory Source
这个Source允许你将将要收集的数据放置到"自动搜集"目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入通道,它会被重命名或可选的直接删除。
要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。
属性说明:(由于比较长 这里只给出了必须给出的属性,全部属性请参考官方文档):
!channels& –&&
!type& –&& 类型,需要指定为"spooldir"
!spoolDir& –&& 读取文件的路径,即"搜集目录"
PLETED对处理完成的文件追加的后缀
编写配置文件& 修改上面给出的配置文件,除了Source部分配置不同,其余部分都一样。不同的地方如下:
#描述/配置Source
a1.sources.r1.type
= spooldir
a1.sources.r1.spoolDir=/home/park/work/apache-flume-1.6.0-bin/mydata
&启动flume:
&&& ./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
&向指定目录中传输文件,发现flume收集到了该文件,将文件中的每一行都作为日志来处理
4.3.NetCat Source
一个NetCat Source用来监听一个指定端口,并将接收到的数据的每一行转换为一个事件。
4.3.1.&NetCat Source属性说明
!channels–&&
!type–&& 类型名称,需要被设置为"netcat"
!bind–&& 指定要绑定到的ip或主机名。
!port–&& 指定要绑定到的端口号
max-line-length&& 512单行最大字节数
案例:上面完整的例子即是
4.4.HTTP Source
HTTP Source接受HTTP的GET和POST请求作为Flume的事件,其中GET方式应该只用于试验。
该Source需要提供一个可插拔的"处理器"来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口,该处理器接受一个
HttpServletRequest对象,并返回一个Flume Envent对象集合。
从一个HTTP请求中得到的事件将在一个事务中提交到通道中。因此允许像文件通道那样对通道提高效率。
如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。
如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。
4.4.1.&HTTP Source属性说明
!type && 类型,必须为"HTTP"
!port–&& 监听的端口
bind&& 0.0.0.0&&& 监听的主机名或ip
handler&&& &
org.apache.flume.source.http.JSONHandler处理器类,需要实现HTTPSourceHandler接口
handler.*& –&& 处理器的配置参数
selector.type
selector.* &&
interceptors& –&&
interceptors.*&&&
enableSSL& false& 是否开启SSL,如果需要设置为true。注意,HTTP不支持SSLv3。
excludeProtocols& SSLv3& 空格分隔的要排除的SSL/TLS协议。SSLv3总是被排除的。
keystore&&
&& 密钥库文件所在位置。
keystorePassword
Keystore 密钥库密码
编写配置文件& 修改上面给出的配置文件,除了Source部分配置不同,其余部分都一样。不同的地方如下:
#描述/配置Source
a1.sources.r1.type
a1.sources.r1.port
&启动flume:
&&& ./flume-ng agent --conf ../conf --conf-file ../conf/template6.conf --name a1 -Dflume.root.logger=INFO,console
&通过命令发送HTTP请求到指定端口:
&&& curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://0.0.0.0:6666flume介绍 - J_Shine的博客 - CSDN博客
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
   flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。
可恢复性:
还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。
使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
生产数据,运行在一个独立的线程。
从Client收集数据,传递给Channel。
从Channel收集数据,运行在一个独立线程。
连接 sources 和 sinks ,这个有点像一个队列。
可以是日志记录、 avro 对象等。
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:
Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如下图所示:
我的热门文章

我要回帖

更多关于 flume kafkachannel 的文章

 

随机推荐