FlumeNG配置说明文档V10版

上传人:xian****hua 文档编号:163982019 上传时间:2022-10-23 格式:DOCX 页数:15 大小:143.32KB
收藏 版权申诉 举报 下载
FlumeNG配置说明文档V10版_第1页
第1页 / 共15页
FlumeNG配置说明文档V10版_第2页
第2页 / 共15页
FlumeNG配置说明文档V10版_第3页
第3页 / 共15页
资源描述:

《FlumeNG配置说明文档V10版》由会员分享,可在线阅读,更多相关《FlumeNG配置说明文档V10版(15页珍藏版)》请在装配图网上搜索。

1、Flume-NG配置说明文档文件状态: 草稿 正式发布 正在修改文件标识:说明文档当前版本:V1.0作 者:审 核:完成日期:Flume介绍Flume是Cloudera提供的日志收集系统,后贡献给了Apache。Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,可有效地收集,汇总和来自许多不同来源的大量日志数据到集中的数据存储系统。Apache的Flume是在Apache软件基金会的顶级项目。目前有两个版本的代码行,版本(也称为Flum

2、e-OG Flumeoriginal generation)版本以及1.x的版本(也称为Flume-NG Flume next generation版本)。Flume NG 1.x 是的重构版本,去掉了master、zookeeper、collector以及Web console,只有source、sink、channel,成为一个数据传输工具。Flume-NG支持多种source、多种channel、多种sink,而且可扩展性好,可以自定义组件,是一个非常优秀的日志传输系统。FlumeNG数据获取Flume提供了各种source的实现,包括Avro Source、Exce Source、Sp

3、ooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。RPC1、在flume中 ,Avro客户端使用AVRO RPC机制可以发送一个给定的文件 Avro 源:2、$ bin/flume-ng avro-client -H localhost -p 41414 F /usr/logs/log.103、上面的命令将发送的/ usr/logs/log.10的内容到 flume源监听端Executing commands1、还

4、有一个exec执行一个给定的命令获得输出的源。一个单一的输出,即“line”。回车( R)或换行符( N),或两者一起的文本。注:Flume不支持tail做为一个源,不过可以通过exec tail。Network streamsFlume支持以下的机制,从流行的日志流类型读取数据1、Avro(数据序列化系统)2、Syslog3、Netcat(使用TCP或UDP协议的网络连接去读写数据)Flume部署种类Flume可以实现多代理、合并、多路复用等多种部署方式。多代理:设置一个多层的流,你需要有一个指向下一跳avro源的第一跳的avro 接收器。这将导致第一Flume代理转发事件到下一个Flume

5、代理。例如,如果您定期发送的文件,每个事件(1文件)AVRO客户端使用本地Flume 代理,那么这个当地的代理可以转发到另一个有存储的代理。如下图:合并:在日志收集的一个非常普遍的情况是大量生产客户日志的数据发送到一些消费者代理连接到存储子系统。举例来说,从数以百计的Web服务器收集的日志发送到十几代理写入HDFS集群。如下图:多路复用流:Flume支持从一个源到多个通道。有两种模式的,分别是复制和复用。在复制的情况下,流的事件被发送到所有的配置通道。在复用的情况下,事件被发送到可用的渠道中的一个子集。如下图:Flume-NG配置Flume-NG安装简单,使用方便。从网上下载flume压缩包解

6、压,配置其中的配置文件,即可使用。定义流Flume事件(event)被定义为一个单位的数据流量有一个字节的有效载荷和一个可选字符串属性。Flume代理是一个承载组件(source源,sink接收器或channel通道)的(JVM)进程,通过事件流从外部源到下一个目的地(跳)。一个web服务器的产生的事件由 Flume源消耗。外部源发送事件发送到Flume中,会带着一个识别的格式。例如: 例如:一个Avro Flume源可以用来接收从Avro clients或其他flume代理从Avro link发送事件。当一个Flume 源接收一个事件,他会存储到一个或多个channel中,这些channel

7、会一直保存着event,直到被Flume sink处理掉,例如JDBC Channel作为一个例子-它使用一个文件系统支持嵌入式数据库,sink从channel中移除事件,同时放入到一个外部的仓库,比如HDFS,或者流转到下一个Flume source 源,source和sink在agent中是以异步运行方式运行事件。Flume代理配置存储在本地配置文件。这是一个文本文件格式如Java属性文件格式。在相同的配置文件中,可以指定一个或多个代理的配置。配置一个代理需要先单独配置每个组件(source、sink、channel),然后连接在一起,形成数据流。在conf目录下的flume.conf(无

8、此文件则创建)中配置。在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。配置单个代理举一个例子,配置文件,描述一个单节点的Flume部署。这种配置可以让用户生成的事件和随后输出到控制台。将以下代码输入flume.conf配置文件。#确定代理上的组件agent1.sources = source1agent1.sinks = sink1agent1.channels = channel1#配置sourceagent1.sources.source1.type= netcatagent1.sources.source

9、1.bind = localhostagent1.sources.source1.port = 44444#配置sinkagent1.sinks.sink1.type= logger#配置channel(内存类型)agent1.channels.channel1.type= memoryagent1.channels.channel1.capacity = 1000agent1.channels.channel1.transactionCapactiy = 100#将组件连接agent1.sources.source1.channels = channel1agent1.sinks.sink1

10、.channel = channel1由以上代码可知,代理的名称为agent1,sink类型为日志,channel类型为内存,source为netcat方式,总体流程为:从netcat获取数据,通过内存输出到日志这个配置定义了一个单一的代理,称为agent1。agent1监听44444端口的数据作为source,通道缓存在内存中事件数据,事件数据记录到控制台和一个接收器上的数据源。配置文件名的各个组成部分,然后介绍了他们的 类型和配置参数。一个给定的配置文件可能会定义多个命名的代理人;一个给定的Flume进程启动时传递一个标志,告诉它的具名代理体现。在flume-ng目录下,结合此配置文件,我

11、们启动Flume按如下参数:1bin/flume-ng agent -f conf/flume.conf -n agent1 c ./conf -=INFO,console其中-n指定代理名称,-c指定配置文件目录,-f指定代理的配置文件。请注意,在完整部署,我们通常会包括一个选项: - CONF=,可用-c代替。 目录将包括一个shell脚本flume-env.sh(没有则创建此文件,其内容可参考flume-)和内置的Log4j属性文件。在这个例子中,我们使用一个Java选项强制flume登录到控制台。我们可以从一个单独的终端,然后telnet端口44444和发送flume事件:telnet

12、 localhost 44444Trying 127.0.0.1.Connected to localhost.Escape character is .1234OK他原来的flume终端输出日志信息的事件。2013-04-17 14:41:53,499 (lifecycleSupervisor-1-3) INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150) Source starting/127.0.0.1:444442013-04-17 14:42:10,898 (SinkRunner-Pollin

13、gRunner-DefaultSinkProcessor) INFO - (LoggerSink.java:70) Event: headers: body: 31 32 33 34 0D 1234. 说明你已经成功地配置和部署了一个flume代理!合并每个Flume代理均需要对其source、sink、channel配置。source是数据源用以指定数据来源,sink是接收器用以指向数据目标流向,channel是通道用以传输数据。以下以合并为例,讲述配置过程:上图中以tongjitest166和tongjitest167作为数据收集代理weblog-agent,tongjitest165作为

14、数据合并代理hdfs-agent,并将数据存入hdfs中。Flume所有配置文件在conf下,需要自己创建两个文件:flume-env.sh(可参考flume-)和flume.conf,其中后者名称可以随意指定。flume-env.sh主要配置:JAVA_HOME(指向本地java安装目录)、JAVA_OPTS(设置JVM相关运行参数)、FLUME_CLASSPATH(指向flume配置文件目录),flume.conf配置source、sink、channel的文件。tongjitest166和tongjitest167配置完全相同(也可以根据需要,配置不同的source端)。一、weblog

15、-agent代理配置此代理配置除了配置source、sink、channel以外。因为是多层代理,所以还要有“一个指向下一跳avro源的第一跳的avro 接收器”。如下,两个weblog-agent的flume.conf配置:# weblog agent config#确定sources, sinks和channels组件weblog-agent.sources = avro-AppSrv-sourceweblog-agent.sinks = avro-forward-sinkweblog-agent.channels = jdbc-channel#定义流weblog-AppSrv-sourc

16、e.channels = jdbc-channelweblog-forward-sink.channel = jdbc-channel#指向下一跳avro源的第一跳的avro接收器weblog-forward-sink.type = avroweblog-forward-sink.hostnameweblog-forward-sink.port = 10000#配置sinkweblog-forward-sink.type=avroweblog-forward-sink.hostnameweblog-forward-sink.port=10000#配置channelweblog-channel.

17、type= memoryweblog-channel.capacity = 1000weblog-channel.transactionCapactiy = 1000#配置sourceweblog-AppSrv-source.type= execweblog-AppSrv-mand= tail -f /home/hadoop/t.txtweblog-AppSrv-source.batchSize = 20source、sink、channel都有多种类型,可以根据需要灵活的进行配置。具体参数配置可参考网络资源。二、hdfs-agent代理配置tongjitest165也就是hdfs-agent

18、会将两个weblog-agent的数据进行汇总,并存入hdfs当中。其配置信息如下:# hdfs-agent config#确定sources, sinks和channels组件hdfs-agent.sources = avro-AppSrv-sourcehdfs-agent.sinks = avro-forward-sink hdfs-agent.channels = jdbc-channel#定义流hdfs-forward-sink.channel = jdbc-channelhdfs=jdbc-channel#配置流向hdfs的sinkhdfs-forward-sink.type=hdf

19、shdfs-forward-= hdfs:/10.32.21.165:8020/flume/webdatahdfs-forward-=DataStreamhdfs-forward-=1000hdfs-forward-=temphdfs-forward-=2000#配置channelhdfs= memoryhdfs = 1000hdfs = 1000#配置source(指向weblog-agent流向的端口)hdfs = avrohdfshdfs = 10000三、启动若是多个代理,需要启动每个代理。如果当前在flume-ng的根目录下,weblog-agent则需要执行bin/flume-ng

20、 agent n weblog-agent c ./conf f conf/flume.conf;hdfs-agent则需要执行bin/flume-ng agent n hdfs-agent c ./conf f conf/flume.conf。多路复制当有多个通道,需要将源发送到所有通道时,可以考虑使用复制。将会把源数据复制发送到所有通道。Flume.conf配置信息如下:# weblog agent config#List sources, sinks and channels in the agentweblog-agent.sources = avro-AppSrv-sourceweb

21、log-agent.sinks = avro-forward-sink file-sink log-sinkweblog-agent.channels = jdbc-channel file-channel log-channel#define the flowweblog-AppSrv-source.channels = jdbc-channel file-channel log-channelweblog-forward-sink.channel = jdbc-channelweblog-sink.channel=file-channelweblog-sink.channel=log-ch

22、annelweblog-agent.sources.avro-AppSrv-source.selector.type=replicating#avro sink propertiesweblog-sink.type = file_rollweblog- = /home/hadoop/flumeweblog- = 0weblog-sink.type=loggerweblog-forward-sink.type=hdfsweblog-forward-= hdfs:/10.32.21.165:8020/flume/webdataweblog-forward-=DataStreamweblog-for

23、ward-=20weblog-forward-=tempweblog-forward-=50#configure other piecesweblog-channel.type= memoryweblog-channel.capacity = 1000weblog-channel.transactionCapactiy = 1000weblog-channel.type=memoryweblog-channel.capacity = 1000weblog-channel.transactionCapactiy = 1000weblog-channel.type=memoryweblog-cha

24、nnel.capacity=1000weblog-channel.transactionCapactiy=1000#读取本地文件/home/hadoop/t.txtweblog-AppSrv-source.type= execweblog-AppSrv-mand = cat /home/hadoop/t.txtweblog-AppSrv-source.batchSize = 20注:使用复制功能需要“.sources.selector.type = replicating”,如果没有这句,则源数据将会只发送到第一个通道,其余通道在flume启动时会报错;如果一个源通过一个通道发往多个sink,

25、则每个sink的内容只是源的一部分数据,综合所有sink数据才会还原源数据。而且即使有上述复制语句但是只有一个通道,数据同样会分散在多个sink中。启动flume语句:bin/flume-ng agent -f conf/flume.conf -n weblog-agent -c ./conf -=INFO,console 以上配置代码有一个source组件(使用exec源读取本地文件),三个sink组件(一个写入本地文件、一个写入日志文件、一个写入HDFS)和三个channel组件(都是memory),这些个组件均需要单独配置,并通过sources的channels连接在一起。多路复用多路复

26、用可以根据设定的信息,进一步分流。在使用多路复用前得先设定header,而这在从源读数据时设定。而根据header的值选择推送的channel。Flume.conf配置信息如下:# weblog agent config#List sources, sinks and channels in the agentweblog-agent.sources = avro-AppSrv-sourceweblog-agent.sinks = avro-forward-sink file-sink log-sinkweblog-agent.channels = jdbc-channel file-chan

27、nel log-channel#define the flowweblog-AppSrv-source.channels = jdbc-channel file-channel log-channelweblog-forward-sink.channel = jdbc-channelweblog-sink.channel=file-channelweblog-sink.channel=log-channelweblog-agent.sources.avro-AppSrv-source.selector.type=multiplexingweblog-AppSrv-source.selector

28、.header=stateweblog-agent.sources.avro-AppSrv-source.selector.mapping.CA=jdbc-channel log-channelweblog-agent.sources.avro-AppSrv-source.selector.mapping.NY=file-channelweblog-agent.sources.avro-AppSrv-source.selector.mapping.AZ=jdbc-channelweblog-agent.sources.avro-AppSrv-source.selector.default=jd

29、bc-channel#avro sink propertiesweblog-sink.type = FILE_ROLLweblog- = /home/hadoop/flumeweblog- = 0weblog-sink.type=loggerweblog-forward-sink.type=hdfsweblog-forward-= hdfs:/10.32.21.165:8020/flume/webdataweblog-forward-=DataStreamweblog-forward-=20weblog-forward-=tempweblog-forward-=50#configure oth

30、er piecesweblog-channel.type= memoryweblog-channel.capacity = 1000weblog-channel.transactionCapactiy = 1000weblog-channel.type=memoryweblog-channel.capacity = 1000weblog-channel.transactionCapactiy = 1000weblog-channel.type=memoryweblog-channel.capacity=1000weblog-channel.transactionCapactiy=1000web

31、log-AppSrv-source.type= com.source.ExecSourceweblog-AppSrv-mand = cat /home/hadoop/t.txtweblog-AppSrv-source.batchSize = 20是自定义源,此源是源代码中ExecSource类的修改,在run()方法中eventList.add(EventBuilder.withBody(line.getBytes()改为:eventList.add(EventBuilder.withBody(line.getBytes(),map)其中map是这样声明的:Map map=new HashMa

32、p();map.put(state, NY);其中state对应于selector.header,NY对应于mapping.NY,通过这样在从源获取数据时对数据做了封装,成为event,并加入header作为分流标记。当event要推送入channel时,会根据header的值将其推送入相应的channel中。上例中,通过增加header信息将所有数据标记为“NY”,则数据只会推送入file-channel,最终存入/home/hadoop/flume目录下,而其他channel则没有数据。Flume拦截器InterceptorsFlume可以修改和删除传输中的数据。Flume可以通过拦截器拦

33、截符合要的event并设置header,而这可能影响到以后汇总时的复用。例如,可以将不同机器收集的数据加上主机名作为header,然后在汇总端可以按照不同的主机名发送到不同的目标去。举例:多级代理,tongjitest166收集的数据均对其加header为主机名的标记,然后发送到tongjitest165上,然后在复用模式下,将根据header的值选择存储在本地文件还是hdfs中。tongjitest166中的flume配置文件如下:weblog-agent.sources = avro-AppSrv-sourceweblog-agent.sinks = avro-forward-sinkwe

34、blog-agent.channels = jdbc-channel#define the flowweblog-AppSrv-source.channels = jdbc-channelweblog-forward-sink.channel = jdbc-channel#avro sink propertiesweblog-forward-sink.type = avroweblog-forward-sink.hostnameweblog-forward-sink.port = 10000#weblog-forward-sink.type=avroweblog-forward-sink.ho

35、stnameweblog-forward-sink.port=10000#weblog-channel.type= memoryweblog-channel.capacity = 1000weblog-channel.transactionCapactiy = 100#weblog-AppSrv-source.type= netcat#weblog-AppSrv-source.bind = localhost#weblog-AppSrv-source.port = 44444#指定源weblog-AppSrv-source.type= execweblog-AppSrv-mand = cat

36、/home/hadoop/t.txtweblog-AppSrv-source.batchSize = 100#使用HostInterceptor拦截器weblog-AppSrv-source.interceptors = i1weblog-agent.sources.avro-AppSrv-source.interceptors.i1.type= host#不允许使用IP作为主机名weblog-agent.sources.avro-AppSrv-source.interceptors.i1.useIP=falseweblog-agent.sources.avro-AppSrv-source.i

37、nterceptors.i1.preserveExisting = false#设置event的headerweblog-agent.sources.avro-AppSrv-source.interceptors.i1.hostHeader= hostname 上述配置文件将使用EXEC源读取本地文件,并将主机名作为header,然后使用avro发送给目标机端口。以下是tongjitest165中的flume配置文件:hdfs-agent.sources = avro-collection-sourcehdfs-agent.sinks = hdfs-sink file-sinkhdfs-age

38、nt.channels = mem-channel file-channel#hdfs-collection-source.channels = mem-channel file-channel = mem-channel = file-channel#指定数据源hdfs-collection-source.type = avrohdfs-collection-source.bindhdfs-collection-source.port = 10000#配置channel = memory = 1000 = 100= memory = 1000 = 100#目标是hdfs=hdfs= hdfs

39、:/10.32.21.165:8020/flume/webdata=DataStream=20=temp=5#目标是写入本地文件 = file_roll = /home/hadoop/flume = 0#复用模式hdfs-collection- = multiplexinghdfs-agent.sources.avro-collection-source.selector.header=hostnamehdfs-agent.sources.avro-collection-source.selector.mapping.tongjitest166 = file-channelhdfs-colle

40、ction- = mem-channel上述代码说明从指定的avro源不断读取数据然后根据header的内容发往不同的sink端,本例是如果是tongjitest166来的数据则存储在本地文件中,其他则存储于hdfs中。拦截器的种类有很多,有支持正则表达式的、时间戳的、host、static等种类,可以根据需要自行选择。Flume Sink ProcessorsSink Processor可以实现容错和负载均衡的目的。有Failover Sink Processor、Load balancing Sink Processor、Custom Sink Processor(目前还不支持)。容错和负

41、载均衡机制是以sinkgroups方式实现,需要有多个sink,在选择机制时在,type中指定就可以:failover、load_balance。比如:a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2a1.sinkgroups.g1.processor.type = failovera1.sinkgroups.g1.processor.priority.k1 = 5a1.sinkgroups.g1.processor.priority.k2 = 10a1.sinkgroups.g1.processor.maxpenalty = 10000以上是容

42、错的一个例子,通过设置优先级,当出现问题时会根据优先级选择sink。在sinks中的只要有一个可用就能保证继续正确的运行下去。Flume自定义业务Flume可以根据实际需要实现自己的业务或者扩展source和sink。做到这些,都必须实现四个方法:stop()、start()、process()、configure(),其中处理过程主要在process方法中。可以对其中的一些source和sink做修改:比如:ExecSource类,发现在273行有读取文件的代码line = reader.readLine(),在此可以对line使用java进行一些操作。然后放入event,转而发送到chan

43、nel中,等待sink消耗event。也可在sink端找到从channel中读取event的代码channel.take(),通过event的getBody()方法获取传输的数据(返回的是字符数组),截取数据对其做一些处理后再转换为event,送入sink端。如,在RollingFileSink类中process()方法中的sinkCounter.incrementEventDrainAttemptCount();语句前加如下代码:String temp=new String(event.getBody(); temp=temp+;event.setBody(temp.getBytes();则

44、在配置flume.conf时,weblog-sink.type = ,等号后面是自己的类,则写入指定文件的数据的每行后面会有+。还可以将sink端指向MongDB、cassandra、Kafk等目标,也可以扩展新的source端。需要注意的是,扩展sink端需要“extends AbstractSink implements Configurable”,而扩展source端需要“extends AbstractSource implements Configurable, PollableSource(或者是EventDrivenSource,)”,再具体实现上述四个方法即可。sink的sta

45、rt()方法是用来初始化sink并且使其处于可以发送events到下一目标的状态;process()方法是核心的处理方法,负责从channel接受数据并推送数据;stop()方法负责做一些清理工作(比如释放资源);configure()方法用来做配置设置。Source的start()、stop()、configure()方法和sink的类似,而process()的方负责法检查新数据并以event的形式存入channel中。例如:public class barSource extends AbstractSource implements Configurable, EventDrivenSo

46、urceOverridepublic void configure(Context context) some_Param = context.get(some_param, String.class);/ 加载一些参数Overridepublic void start() / 初始化/例如多线程初始化.Overridepublic void stop () / 停止 /例如关闭线程等Overridepublic Status process() throws EventDeliveryException try /业务代码/ receive new dataEvent e = get_som

47、e_data();/ store the event to underlying channels(s)getChannelProcessor().processEvent(e) catch (ChannelException ex) return Status.BACKOFF;return Status.READY;使用:将自定义插件打包成jar包,放入flume安装包的lib中,然后在配置文件中的组件类型时,指定实现类(类的完整名称:包名+类名)即可。可在eclipse下面开发自定义业务,具体做法是将flume中lib下的所有jar包引入项目,然后编写source和sink处理业务。简单一点的做法,可将某个类直接拷贝到项目下,然后对其修改,打包上传到flume下的lib目录中,在配置文件中相应位置指定此类就可以。复杂的就需要自己写类去实现4个方法。注:所使用flume版本是;为了方便实际去体会flume,文档中关于flume的配置代码都是flume.conf的全部内容,复制即可使用,且每个例子都是实际通过,测试可用;flume包含内容十分丰富,本文档并没有包含所有相关内容,如无需要的内容还请自行查找相关内容。参考:123 456 7 8

展开阅读全文
温馨提示:
1: 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
2: 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
3.本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
5. 装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
关于我们 - 网站声明 - 网站地图 - 资源地图 - 友情链接 - 网站客服 - 联系我们

copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!