当前位置: 首页 > news >正文

大数据Hadoop之——Flume安装与使用(详细)

目录

一、Flume架构

1、Agent 

2、Source

3、Channel

4、Sink

5、Event

二、前期准备

1、查看网卡

2、配置静态IP 

3、设置主机名 

4、配置IP与主机名映射

5、关闭防火墙

6、配置免密登录

三、JDK的安装

四、Flume的安装

1、Flume的下载安装  

2、Flume的使用

2.1. 创建 Flume Agent配置文件

2.2. 编写内容

 2.3. 创建脚本

2.4. 启动HDFS

​2.5. 启动Flume

2.6. 执行脚本

2.7. 结束进程

五、实时监控目录下多个新文件

1、创建监控目录

2、创建 Flume Agent 配置文件

3、创建脚本

4、启动HDFS

5、启动Flume

6、执行脚本

7、结束进程

六、实时监控目录下多个新文件

1、创建监控目录

2、创建 Flume Agent 配置文件

3、创建脚本

4、启动HDFS

5、启动Flume

6、执行脚本

7、结束进程


     (1)Flume官网地址:http://flume.apache.org/
     (2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
     (3)下载地址:http://archive.apache.org/dist/flume/

     Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

       Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

一、Flume架构

1、Agent 

      Agent 是一个运行在 Java 虚拟机(JVM)上的程序 ,它就像是一个小快递员。

      它的任务是:把数据从一个地方搬运到另一个地方

      数据在 Flume 中是以“事件(Event)”的形式存在的。

2、Source

     Source 是 Agent 的入口 ,相当于快递员的起点站。

     它负责接收各种来源的数据,比如:

  • 网络日志(netcat)

  • 文件内容(taildir、spooling directory)

  • 消息队列(如 Kafka、JMS)

  • 或者模拟数据(sequence generator)

3、Channel

     Channel 就像是快递员的背包 ,用来临时存放 Source 接收到的数据。

     它起到一个中转站的作用 ,让 Source 和 Sink 可以各自按照自己的节奏工作。

     Flume 提供了两种常用的 Channel【Memory Channel(内存通道)、File Channel(文件通道)】:

  • Memory Channel(内存通道)

    • 数据存在内存里,速度快。

    • 但如果程序崩溃或服务器断电,数据可能会丢。

  • File Channel(文件通道)

    • 数据写入磁盘,更安全。

    • 即使程序关闭或机器重启,数据也不会丢失。

4、Sink

     Sink 是 Agent 的出口 ,相当于快递员的目的地。

     它不断地从 Channel 中取数据,并把这些数据发送出去或者保存起来。

     常见的目的地有:

  • HDFS(存储大数据)

  • HBase、Solr(数据库/搜索引擎)

  • 日志记录器(Logger)

  • 或者发给下一个 Agent 继续处理

5、Event

      Event 是 Flume 中传输数据的基本单位 ,就像快递员运送的小包裹。

      每个 Event 包含两部分:

  • Header(头信息) :像标签一样,包含一些描述信息(比如数据类型、时间等),是键值对(Key-Value)形式。

  • Body(主体) :真正要传输的数据,是一串字节(byte[]),可以是文本、JSON、图片等。

二、前期准备

1、查看网卡

2、配置静态IP 

vi /etc/sysconfig/network-scripts/ifcfg-ens32  ----  根据自己网卡设置。 

3、设置主机名 

hostnamectl --static set-hostname  主机名

例如:

hostnamectl --static set-hostname  hadoop001

4、配置IP与主机名映射

vi /etc/hosts

5、关闭防火墙

systemctl stop firewalld

systemctl disable firewalld

6、配置免密登录

传送门 

三、JDK的安装

传送门

四、Flume的安装

1、Flume的下载安装  

​1.1. 下载

https://archive.apache.org/dist/flume/flume-1.9.0/

​下载 apache-flume-1.9.0-bin.tar.gz 安装包

1.2 上传
使用xshell上传到指定安装路径

此处是安装路径是 /opt/module

​​

1.3 解压重命名

tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz 

mv apache-flume-1.9.0-bin flume

1.4 配置环境变量

vi  /etc/profile

export JAVA_HOME=/opt/module/java

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export FLUME_HOME=/opt/module/flume

export PATH=$PATH:$JAVA_HOME/bin:$FLUME_HOME/bin

1.5 加载环境变量

source  /etc/profile

验证环境变量是否生效:

env | grep HOME

env | grep PATH

1.6 删除guava

     将lib文件夹下的guava-11.0.2.jar删除,以兼容Hadoop

     rm -rf /opt/module/flume/lib/guava-11.0.2.jar

2、Flume的使用

2.1. 创建 Flume Agent配置文件

       Agent = Source + Channel + Sink

       Flume可以定义多个Agent,如a1,a2,a3,....

1、创建job文件夹

      在flume目录下创建job文件夹并进入job文件夹。

      mkdir /opt/module/flume/job

      cd /opt/module/flume/job

2、创建 Flume Agent配置文件

     在job文件夹下创建Flume Agent配置文件 log-flume-hdfs.conf

     配置六大步骤:

     01、定义一个名为 a1 的 Flume Agent

     02、给 Agent a1 指定使用的组件

     03、配置source

     04、配置sink

     05、配置chanel

     06、连接通道

2.2. 编写内容

cd /opt/module/flume/job

vi log-flume-hdfs.conf

#01、02 给 Agent a1 指定使用的组件

#使用的 Source 是 r1
#使用的 Sink 是 k1
#使用的 Channel 是 c1

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#03 配置source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/module/test.log

# 04配置sink:类型为hdfs、路径指定hdfs

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:9000/flume/%Y%m%d/%H

#上传文件的前缀

a1.sinks.k1.hdfs.filePrefix = logs-

#下面3行表示每小时创建一个文件夹

#是否按照时间滚动文件夹

a1.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹

a1.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位

a1.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳,如果使用%Y%m%d/格式必须设为true否则报错

a1.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个Event才flush到HDFS一次

a1.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a1.sinks.k1.hdfs.fileType = DataStream

#生成新文件的条件:时间60秒,大小约128M、生成环境一般是3600s

#多久生成一个新的文件

a1.sinks.k1.hdfs.rollInterval = 60

#设置每个文件的滚动大小

a1.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与Event数量无关

a1.sinks.k1.hdfs.rollCount = 0

# 05 配置通道:方式、最多缓存Event个数、每次事务最多处理的Event个数

# 作为 Source 和 Sink 之间的“中转站”,缓解两者速度不一致的问题

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# 06 连接通道:a1源的组件r1使用通道 c1,a源的组件k1使用通道 c1,

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 2.3. 创建脚本

touch /opt/module/genarete_log.sh
chmod 777 /opt/module/genarete_log.sh


vi /opt/module/genarete_log.sh

#!/bin/bash
for i in {1..100}; do
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/test.log
done

tail -10 /opt/module/test.log

2.4. 启动HDFS

/opt/module/hadoop/bin/start-dfs.sh

/opt/module/hadoop/sbin/start-yarn.sh

如果hadoop未安装请走这个门:Hadoop安装传送门

​2.5. 启动Flume

    启动flume,使用 ncf,即--name  agent 名称、--conf 目录 --conf-file 配置文件 顺序不固定

cd /opt/module/flume

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/log-flume-hdfs.conf

简写:

bin/flume-ng agent --c conf/ --n a1 -f job/log-flume-hdfs.conf

2.6. 执行脚本

      执行脚本,在hdfs上查看结果

/opt/module/genarete_log.sh

hdfs dfs -ls /flume

2.7. 结束进程

ps -ef | grep flume | grep -v grep | awk '{print $2}' | xargs kill -9 

五、实时监控目录下多个新文件

    在上面的基础上进行增加需求,让 flume 监控 /opt/module/flume/upload 录下的多个文件,并上传至HDFS。

1、创建监控目录

 mkdir /opt/module/flume/upload

2、创建 Flume Agent 配置文件

在job文件夹下创建Flume Agent 配置文件 flume-dir-hdfs.conf

cd /opt/module/flume/job

vi flume-dir-hdfs.conf

#01、02 给 Agent a2 指定使用的组件

#使用的 Source 是 r2
#使用的 Sink 是 k2
#使用的 Channel 是 c2

a2.sources = r2
a2.sinks = k2
a2.channels = c2

#03 配置source

a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume/upload
a2.sources.r2.fileSuffix = .COMPLETED
a2.sources.r2.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a2.sources.r2.ignorePattern = ([^ ]*\.tmp)

# 04配置sink:类型为hdfs、路径指定hdfs

a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop001:9000/flume/upload/%Y%m%d/%H

#上传文件的前缀

a2.sinks.k2.hdfs.filePrefix = upload-

#下面3行表示每小时创建一个文件夹

#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream

#生成新文件的条件:时间60秒,大小约128M、生成环境一般是3600s

#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# 05 配置通道:方式、最多缓存Event个数、每次事务最多处理的Event个数

# 作为 Source 和 Sink 之间的“中转站”,缓解两者速度不一致的问题

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# 06 连接通道:a2源的组件r2使用通道 c2,a2源的组件k2使用通道 c2,

a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3、创建脚本

vi /opt/module/genarete_log.sh

#!/bin/bash
rm -rf /opt/module/flume/upload/*
for i in {1..100}; do
    echo "$i linux生成1-100的数字的for循环 " >> /opt/module/flume/upload/test01.log
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/upload/test02.log
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/upload/test03.log
done

tail -3 /opt/module/flume/upload/test01.log
tail -3 /opt/module/flume/upload/test02.log
tail -3 /opt/module/flume/upload/test03.log
          

4、启动HDFS

如果已经启动,则忽略

/opt/module/hadoop/bin/start-dfs.sh

/opt/module/hadoop/sbin/start-yarn.sh

如果hadoop未安装请走这个门:Hadoop安装传送门

5、启动Flume

    启动flume,使用 cnc,即--conf 目录 --name  agent 名称 --conf-file 配置文件 顺序不固定

cd /opt/module/flume

bin/flume-ng agent --c conf/ --n a2 -f job/flume-dir-hdfs.conf

6、执行脚本

      执行脚本,在hdfs上查看结果

/opt/module/genarete_log.sh

hdfs dfs -ls /flume

7、结束进程

ps -ef | grep flume | grep -v grep | awk '{print $2}' | xargs kill -9 

六、实时监控目录下多个新文件

       Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

     在上面的基础上进行增加需求,让 flume 监控 /opt/module/flume/files 整个目录的实时追加文件,并上传至HDFS。

1、创建监控目录

 mkdir /opt/module/flume/files01

 mkdir /opt/module/flume/files02

2、创建 Flume Agent 配置文件

在job文件夹下创建Flume Agent 配置文件 flume-taildir-hdfs.conf

cd /opt/module/flume/job

vi flume-taildir-hdfs.conf

#01、02 给 Agent a3 指定使用的组件

#使用的 Source 是 r3
#使用的 Sink 是 k3
#使用的 Channel 是 c3

a3.sources = r3
a3.sinks = k3
a3.channels = c3

#03 配置source

a3.sources.r3.type = TAILDIR

a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json

#配置两个文件组变量

a3.sources.r3.filegroups = f1 f2

#监控两个文件目录

a3.sources.r3.filegroups.f1 = /opt/module/flume/files01/.*file.*

a3.sources.r3.filegroups.f2 = /opt/module/flume/files02/.*log.*

# 04配置sink:类型为hdfs、路径指定hdfs

a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop001:9000/flume/upload/%Y%m%d/%H

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = upload-

#下面3行表示每小时创建一个文件夹

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream

#生成新文件的条件:时间60秒,大小约128M、生成环境一般是3600s

#多久生成一个新的文件

a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# 05 配置通道:方式、最多缓存Event个数、每次事务最多处理的Event个数

# 作为 Source 和 Sink 之间的“中转站”,缓解两者速度不一致的问题

a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# 06 连接通道:a3源的组件r3使用通道 c3,a3源的组件k3使用通道 c3,

a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

3、创建脚本

vi /opt/module/genarete_log.sh

#!/bin/bash
rm -rf /opt/module/flume/files01/*
rm -rf /opt/module/flume/files02/*

# 第一个循环:i 从 1 到 100
for i in {1..100}; do
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files01/test01.file.txt
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files01/test02.file.txt
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files02/test01.log
    echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files02/test02.log
done

echo "查看生成数据 ..."
sleep 2

# 查看最后几行日志内容
tail -2 /opt/module/flume/files01/test01.file.txt
tail -2 /opt/module/flume/files01/test02.file.txt
tail -2 /opt/module/flume/files02/test01.log
tail -2 /opt/module/flume/files02/test02.log


echo "生成新数据 ..."
sleep 10

for i in {101..200}; do
    if [ $i -eq 150 ]; then
        echo "$i 等于 150,开始休眠 10 秒..."
        sleep 10
    else
        echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files01/test01.file.txt
        echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files01/test02.file.txt
        echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files02/test01.log
        echo "$i linux生成1-100的数字的for循环" >> /opt/module/flume/files02/test02.log
    fi
done

tail -2 /opt/module/flume/files01/test01.file.txt
tail -2 /opt/module/flume/files01/test02.file.txt
tail -2 /opt/module/flume/files02/test01.log

tail -2 /opt/module/flume/files02/test02.log

4、启动HDFS

如果已经启动,则忽略

/opt/module/hadoop/bin/start-dfs.sh

/opt/module/hadoop/sbin/start-yarn.sh

如果hadoop未安装请走这个门:Hadoop安装传送门

5、启动Flume

    启动flume,使用 cnc,即--conf 目录 --name  agent 名称 --conf-file 配置文件 顺序不固定

cd /opt/module/flume

bin/flume-ng agent --c conf/ --n a3 -f job/flume-dir-hdfs.conf

6、执行脚本

      执行脚本,在hdfs上查看结果

/opt/module/genarete_log.sh

hdfs dfs -ls /flume

7、结束进程

ps -ef | grep flume | grep -v grep | awk '{print $2}' | xargs kill -9 

http://www.lqws.cn/news/573985.html

相关文章:

  • sqlserver函数与过程(二)
  • 【Docker基础】Docker容器管理:docker inspect及其参数详解
  • 使用component封装组件和h函数的用法
  • Win10/Win11电源和电池设置打不开/卡住的解决方案(查看 电池健康度报告)
  • 【无标题】linux系统中无法删除文件后空间没有被释放还在被占用
  • AI智能体|扣子(Coze)搭建【沉浸式历史故事解说视频】工作流
  • 设计模式 | 过滤器模式
  • Springboot 集成 SpringBatch 批处理组件
  • 战略进阶——解读124页战略分析工具【附全文阅读】
  • #华为昇腾#华为计算#昇腾开发者计划2025#
  • Elasticsearch 集群升级实战指引—7.x 升级到 8.x
  • 开发者视角:一键拉起与快速安装的巧妙运用
  • 精通C++包括哪些方面
  • PowerBi 巧用UNICHAR(8203)实现自定义排序
  • Utils系列之内存池(Fixed size)
  • Modbus 报文结构与 CRC 校验实战指南(一)
  • Java面试宝典:基础一
  • 《弦论视角下前端架构:解构、重构与无限延伸的可能》
  • 71. 简化路径 —day94
  • LeetCode 第80题 删除有序数组中的重复项Ⅱ
  • b+和b树
  • AlpineLinux安装部署elasticsearch
  • 前端单点登录
  • 什么是 Event Loop?
  • 【C++】C++中的友元函数和友元类
  • LRU缓存设计与实现详解
  • 现代C++ 文件系统库
  • 重塑视觉叙事:用After Effects AI抠像与Photoshop神经滤镜“导演”你的设计
  • RNN人名分类器案例
  • Anaconda虚拟环境相关的常用命令