魏长东

weichangdong

Configuring Flume on AWS EC2

For a recent project I need a few AWS pieces, and the plan is to sync data from EC2 to AWS S3 through Flume.

The configuration below was pieced together from quite a few websites, and every step here ran successfully for me.

In the end I got the whole setup working.

 

1) Unpack the downloaded Flume tarball into the Hadoop directory, so you need to install Hadoop first (the commands below assume it ends up under /opt/hadoop-2.7.1/apache-flume-1.6.0-bin).
2) Edit the flume-env.sh configuration file; the main change is setting the JAVA_HOME variable.
 

cp conf/flume-env.sh.template conf/flume-env.sh
# in conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_51
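A quick way to confirm that Flume picks up the Java setting is to print its version (the install path here follows the commands used later in this post):

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng version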

Changes to /etc/profile:
export AWS_CREDENTIAL_FILE=/opt/aws/credential-file-path
export  PATH=$PATH:/opt/apache-maven-3.3.3/bin
export JAVA_HOME=/usr/java/jdk1.8.0_51
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin
export PATH=$PATH:/opt/hadoop-2.7.1/bin:/opt/hadoop-2.7.1/sbin
export HADOOP_MAPRED_HOME=/opt/hadoop-2.7.1/
export HADOOP_HOME=/opt/hadoop-2.7.1/
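To apply the new variables in the current shell and confirm them, a minimal check:

source /etc/profile
echo $JAVA_HOME
hadoop version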

==========【avro】===========
sudo vim conf/avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
  
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
  
# Describe the sink
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng agent -c . -f /opt/hadoop-2.7.1/apache-flume-1.6.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
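The avro-client call below simply replays a local file into the source; the file contents here are only an assumed example:

echo "hello avro from wcd" > /home/ec2-user/flume-wcd.txt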

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng avro-client -c . -H localhost -p 4141 -F /home/ec2-user/flume-wcd.txt


==========【spool】===========
 vi conf/spool.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/ec2-user/
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng agent -c . -f /opt/hadoop-2.7.1/apache-flume-1.6.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
echo "spool test1" > /home/ec2-user/spool_text.log

==========【Exec】===========

sudo vi conf/exec_tail.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/ec2-user/spool_text.log
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng agent -c . -f /opt/hadoop-2.7.1/apache-flume-1.6.0-bin/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

for i in {1..10};do echo "exec tail$i" >> /home/ec2-user/spool_text.log;echo $i;sleep 0.1;done
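One caveat with the exec source: if the tail process dies it is not restarted unless the source is told to. A possible addition to exec_tail.conf (the throttle is in milliseconds and just an assumed value):

a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000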

==========【Syslogtcp】===========

sudo vi conf/syslog_tcp.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng agent -c . -f /opt/hadoop-2.7.1/apache-flume-1.6.0-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
echo "hello wcd syslog" | nc localhost 5140

==========【JSONHandler】===========
sudo vi conf/post_json.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng agent -c . -f /opt/hadoop-2.7.1/apache-flume-1.6.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "swordman wcd"}]' http://localhost:8888
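The JSONHandler accepts a list, so several events can go in a single POST; the headers and bodies below are just made-up examples:

curl -X POST -d '[{"headers":{"a":"a2"},"body":"event one"},{"headers":{"b":"b2"},"body":"event two"}]' http://localhost:8888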

==========【hadoop or aws s3】===========


sudo vi conf/hdfs_sink.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = s3n://mobileapiaccess/wcd
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/opt/hadoop-2.7.1/apache-flume-1.6.0-bin/bin/flume-ng agent -c . -f /opt/hadoop-2.7.1/apache-flume-1.6.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
echo "hello swordman flume -> hadoop and  aws  s3 testing" | nc localhost 5140

Everything above went smoothly except for the Hadoop/S3 part, which ran into quite a few problems.

Writing to S3 needs extra jars, but Hadoop's default classpath does not include them, so that has to be configured by hand.


 

Add the extra jar directories to hadoop-env.sh by appending them to HADOOP_CLASSPATH:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_HOME/share/hadoop/tools/lib/*

Running hadoop classpath afterwards shows the new entries at the end:
hadoop classpath
/opt/hadoop-2.7.1/etc/hadoop:/opt/hadoop-2.7.1/share/hadoop/common/lib/*:/opt/hadoop-2.7.1/share/hadoop/common/*:
/opt/hadoop-2.7.1/share/hadoop/hdfs:
/opt/hadoop-2.7.1/share/hadoop/hdfs/lib/*:/opt/hadoop-2.7.1/share/hadoop/hdfs/*:
/opt/hadoop-2.7.1/share/hadoop/yarn/lib/*:/opt/hadoop-2.7.1/share/hadoop/yarn/*:
/opt/hadoop-2.7.1/share/hadoop/mapreduce/lib/*:
/opt/hadoop-2.7.1/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/share/hadoop/mapreduce/*:
/share/hadoop/mapreduce/lib/*:/share/hadoop/tools/lib/*
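To double-check that the S3-related jars really sit in the directory the classpath now covers (jar names differ between Hadoop builds):

ls /opt/hadoop-2.7.1/share/hadoop/tools/lib/ | grep -iE 'aws|jets3t'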

You also need to configure the access key ID and secret key for S3.

hadoop  fs -ls  s3n://mobileapiaccess/activelog

Add the AWS credentials to core-site.xml:
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>id</value>
</property>
  
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>id</value>
</property>
  
<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>key</value>
</property>
  
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>key</value>
</property>
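With the credentials in place, a final end-to-end check is to push one more event through the syslog source and list the sink path configured above:

echo "s3 sink check" | nc localhost 5140
hadoop fs -ls s3n://mobileapiaccess/wcd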