Published: 2019-09-26 14:14:41
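In big data projects, Flume is often used to ship data to the Kafka messaging system or to HDFS storage. This post shows how to connect Flume to Kafka (and, in the same agent, to HDFS). Most of the work in this experiment is in the configuration files.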
Configuring the Flume conf file
# Use sinks to write the data to both Kafka and HDFS

# Name the components on this agent
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1 c2

# Describe/configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /root/test.log
agent.sources.r1.shell = /bin/bash -c

## kafka
# Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = kafkatest
agent.sinks.k1.brokerList = master:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 2

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1 c2
agent.sinks.k1.channel = c1

## hdfs
# Describe the sink
agent.sinks.k2.type = hdfs
agent.sinks.k2.hdfs.path = hdfs://master:9000/data/flume/tail
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.hdfs.writeFormat = Text
#agent.sinks.k2.hdfs.rollInterval = 0
#agent.sinks.k2.hdfs.rollSize = 134217728
#agent.sinks.k2.hdfs.rollCount = 1000000
agent.sinks.k2.hdfs.batchSize = 10

# Use a channel which buffers events in memory
agent.channels.c2.type = memory
#agent.channels.c2.capacity = 1000
#agent.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
#agent.sources.r1.channels = c2
agent.sinks.k2.channel = c2
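The key point of this configuration is the wiring: source r1 writes to both channels (c1 and c2), and each sink drains exactly one of them. As a rough sanity check, the wiring lines can be pulled out of the file with a few lines of shell. This is only a sketch: the helper names list_sink_bindings and list_source_channels are made up for illustration and are not part of Flume, and the demo file is a trimmed copy of the relevant lines rather than the full configuration.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not shipped with Flume) that read a Flume properties file.

# Print each "sink -> channel" binding, ignoring commented-out lines.
list_sink_bindings() {
  sed -n -E 's/^[[:space:]]*agent\.sinks\.([^.]+)\.channel[[:space:]]*=[[:space:]]*([^[:space:]]+).*$/\1 -> \2/p' "$1"
}

# Print the channels that source r1 writes to.
list_source_channels() {
  sed -n -E 's/^[[:space:]]*agent\.sources\.r1\.channels[[:space:]]*=[[:space:]]*(.*)$/\1/p' "$1"
}

# Demo on a trimmed copy of the wiring lines from the configuration.
demo_conf="$(mktemp)"
cat > "$demo_conf" <<'EOF'
agent.sources.r1.channels = c1 c2
agent.sinks.k1.channel = c1
#agent.sources.r1.channels = c2
agent.sinks.k2.channel = c2
EOF

list_source_channels "$demo_conf"
list_sink_bindings "$demo_conf"
```

Every channel printed for the source should also appear on the right-hand side of exactly one sink binding. A channel that no sink drains will simply fill up.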
Server (start the agent):
/usr/local/flume/bin/flume-ng agent -f flume-exec-total.conf -n agent -Dflume.root.logger=INFO,console
Client (run in /root, so that the line lands in the tailed file /root/test.log):
echo "wangzai doubi" > test.log
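The client command above overwrites the file with >, which truncates it. Since the exec source follows the file with tail -f, appending with >> is usually the safer way to generate test traffic. Below is a minimal sketch of such a generator. The function name append_test_lines and the message text are illustrative assumptions, and the demo writes to a scratch file rather than to /root/test.log.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append N numbered test lines to a log file.
# Uses >> so the file that `tail -f` follows grows instead of being truncated.
append_test_lines() {
  local file="$1" count="$2" i
  for ((i = 1; i <= count; i++)); do
    printf 'test message %d\n' "$i" >> "$file"
  done
}

# Demo against a scratch file; on the Flume host the target would be /root/test.log.
scratch_log="$(mktemp)"
append_test_lines "$scratch_log" 3
cat "$scratch_log"
```

With the Kafka sink's batchSize of 2 and the HDFS sink's batchSize of 10, sending a handful of lines at once makes it easier to see events arrive on both sides.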
Kafka configuration
Create the Kafka topic
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest
Start a console consumer to check that the events arrive
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning
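The two Kafka commands above address the cluster through ZooKeeper, which matches the Kafka versions in use when this was written. Newer releases dropped the --zookeeper option from these tools in favor of --bootstrap-server. The sketch below shows what the equivalent commands would look like, assuming Kafka 2.2 or later and a broker on master:9092 (the address from the Flume sink's brokerList). It only prints the commands as a dry run, and the variable names are illustrative.

```shell
#!/usr/bin/env bash
# Assumptions: Kafka 2.2 or later, and a broker listening on master:9092
# (the address used in the Flume sink's brokerList).
bootstrap="master:9092"
topic="kafkatest"

# Build the commands as arrays so each argument stays a separate word.
create_cmd=(kafka-topics.sh --create --bootstrap-server "$bootstrap"
            --replication-factor 1 --partitions 1 --topic "$topic")
consume_cmd=(kafka-console-consumer.sh --bootstrap-server "$bootstrap"
             --topic "$topic" --from-beginning)

# Dry run: print the commands instead of executing them.
echo "${create_cmd[*]}"
echo "${consume_cmd[*]}"
```

On a host with Kafka installed, "${create_cmd[@]}" and "${consume_cmd[@]}" would run the commands themselves.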