0%

企业级应用——ELK(三):filebeat

  提到ELK,就不得不提到EFK,通常意义上说,EFK是指用filebeat代替logstash形成的新组合。(哈,也有是指Fluentd的,这个我们之后再说)
  Filebeat 是基于原先 logstash-forwarder 的源码改造出来的,无需依赖 Java 环境就能运行,安装包10M不到。
  而且如果日志的量很大,Logstash 会遇到资源占用高的问题,为解决这个问题,我们引入了Filebeat。Filebeat 是基于 logstash-forwarder 的源码改造而成,用 Golang 编写,无需依赖 Java 环境,效率高,占用内存和 CPU 比较少,非常适合作为 Agent 跑在服务器上,来实现日志转发的功能。

  还是去官网下载https://www.elastic.co/cn/downloads/beats/filebeat。本次演示还是以最新版的filebeat7.5.1为例(以前版本的filebeat配置文件格式参数上可能有一些改变,不过大同小异)。

1
2
3
cd /usr/local/src/
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.1-amd64.deb
dpkg -i filebeat-7.5.1-amd64.deb

基础配置

  配置文件也很简单,如果想要写入文件,则配置如下

1
2
3
4
5
6
7
8
9
10
11
grep -v "#" /etc/filebeat/filebeat.yml | grep -v "^$"
filebeat.inputs:
- type: log
paths:
- /var/log/syslog
exclude_lines: ["^DBG"]
exclude_files: [".gz$"]
tags: "syslog-filebeat"
output.file:
path: "/tmp"
filename: "filebeat.txt"

  paths路径支持正则通配符写法,exclude是设置不匹配的文件格式。而且filebeat也支持同时从多个路径收集写成如下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
filebeat.inputs:
- type: log
paths:
- /var/log/syslog
exclude_lines: ["^DBG"]
exclude_files: [".gz$"]
tags: "syslog-filebeat"
- type: log
paths:
- /var/log/nginx/access.log
exclude_lines: ["^DBG"]
exclude_files: [".gz$"]
document_type: "nginx-accesslog-filebeat
output.file:
path: "/tmp"
filename: "filebeat.txt"

  同样,filebeat支持写入redis和kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
filebeat.inputs:
- type: log
paths:
- /var/log/syslog
exclude_lines: ["^DBG"]
exclude_files: [".gz$"]
tags: "filebeat-redis-syslog"
output.redis:
hosts: ["192.168.32.31:6379"]
key: "filebeat-system-log" #为了后期日志处理,建议自定义 key 名称
db: 1 #使用第几个库
timeout: 5 #超时时间
password: 123456 #redis 密码

  想要写入kafka则添加output插件,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
filebeat.inputs:
- type: log
paths:
- /var/log/syslog
exclude_lines: ["^DBG"]
exclude_files: [".gz$"]
tags: "filebeat-kafka-syslog"
output.kafka: #写入 kafka
hosts: ["192.168.15.11:9092","192.168.15.12:9092","192.168.15.13:9092"]
topic: "systemlog-1512-filebeat"
partition.round_robin:
reachable_only: true
required_acks: 1 #本地写入完成
compression: gzip #开启压缩
max_message_bytes: 1000000 #消息最大值

配置详解

input

  也就是设置日志收集的来源,需要的属性有type,path,根据官方文档,现在版本常用写法为,

1
2
3
4
5
6
7
8
9
10
11
filebeat.inputs:
- type: log
paths:
- /var/log/system.log
- /var/log/wifi.log
- type: log
paths:
- "/var/log/apache2/*"
fields:
apache: true
fields_under_root: true

  其中type的类型很多

  • Log 日志文件,必须有PATH,官方示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
filebeat.inputs:
- type: log
paths:
- /var/log/system.log
- /var/log/wifi.log
- /var/log/*.log
- type: log
paths:
- "/var/log/apache2/*"
fields:
apache: true
fields_under_root: true
  • Stdin 标准输入,没有PATH,官方示例如下:
1
2
filebeat.inputs:
- type: stdin
  • Container 容器中日志,必须有PATH,官方示例如下:
1
2
3
4
filebeat.inputs:
- type: container
paths:
- '/var/lib/docker/containers/*/*.log'
  • Kafka 从kafka中读取数据,官方示例如下:
1
2
3
4
5
6
7
8
9
filebeat.inputs:
- type: kafka
hosts: [".servicebus.windows.net:9093"]
topics: [""]
group_id: ""

username: "$ConnectionString"
password: ""
ssl.enabled: true
  • Redis 从redis中读取数据,官方示例如下:
1
2
3
4
filebeat.inputs:
- type: redis
hosts: ["localhost:6379"]
password: "${redis_pwd}"
  • UDP 开放UDP端口来接受数据,可设置单条最大数据上限,不定义默认为20MiB。
1
2
3
4
filebeat.inputs:
- type: udp
max_message_size: 10KiB
host: "localhost:8080"
  • Docker 也支持直接从容器中读取数据, containers.ids是必须定义说明,可以用*代表所有容器。
1
2
3
4
filebeat.inputs:
- type: docker
containers.ids:
- '8b6fe7dc9e067b58476dc57d6986dd96d7100430c5de3b109a99cd56ac655347'
  • TCP 用法与UDP相同,设置监听的主机和端口。
1
2
3
4
filebeat.inputs:
- type: tcp
max_message_size: 10MiB
host: "localhost:9000"
  • Syslog 监听系统日志,指定传输协议即可。类似TCP和UDP。
1
2
3
4
filebeat.inputs:
- type: syslog
protocol.udp:
host: "localhost:9000"
1
2
3
4
filebeat.inputs:
- type: syslog
protocol.tcp:
host: "localhost:9000"
  • s3 AWS的对象存储日志,不过多介绍
  • NetFlow Cisco设备网络设备的日志,不过多介绍
  • Google Pub/Sub google云的订阅发布模式协议数据,不过多介绍。

  一般来说,我们都写log,就可以满足我们绝大多数场景的使用了。除了typepath这俩常用的input属性外,还有两个设置属性,我们也经常会用到,就是include_linesexclude_lines,顾名思义,就是包括和排除,配合path中的通配符,可以帮助我们更灵活的指定要收集的日志文件。
  还有一个很常用的属性就是tags,可以写多个,用[]括起来就可以。因为在filebeat中因为有自带type关键字,所以我们在之后筛选日志的时候,无法通过type字段来区分不同的日志源了,所以我们可以通过自定义tags字段,来实现之前在logstash上type的功能,这样在我们收集到的日志中,会自动加入tags 标签属性,然后通过logstash的筛选时,就可以对tags关键字做判断了。

output

  输出选项有ElasticsearchLogstashKafkaRedisFileConsoleElastic Cloud

  • File
    输出到文件中是最简单的设置了,一般用于测试。

    1
    2
    3
    4
    5
    6
    output.file:
    path: "/tmp/filebeat" #输出文件路径
    filename: filebeat #输出日志名称,超过大小限制后会自动添加数字后缀
    #rotate_every_kb: 10000 #每个日志文件大小限制
    #number_of_files: 7 #路径下最大的储存日志文件数量,超过此值后自动删除最早的日志文件,默认为7
    #permissions: 0600 #创建的日志文件的权限
  • Logstash
    filebeat支持直接将数据输出值logstash主机。

    1
    2
    output.logstash:
    hosts: ["127.0.0.1:5044"]

  而logstash主机需要设置输入为beats,才可以顺利接收filebeat的数据。

1
2
3
4
5
6
7
8
9
10
11
12
input {
beats {
port => 5044
}
}

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}"
}
}

  • redis
    输出值redis,上面有说明,这里就不详细介绍了。

    1
    2
    3
    4
    5
    6
    output.redis:
    hosts: ["localhost"]
    password: "my_password"
    key: "filebeat"
    db: 0
    timeout: 5
  • kafka
    输出至kafka。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

    # message topic selection + partitioning
    topic: '%{[fields.log_topic]}'
    partition.round_robin:
    reachable_only: false

    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000

  也可以输出至kafka的不同的topic中

Rule settings:
topic
The topic format string to use. If this string contains field references, such as %{[fields.name]}, the fields must exist, or the rule fails.
mappings
A dictionary that takes the value returned by topic and maps it to a new >name.
default
The default string value to use if mappings does not find a match.
when
A condition that must succeed in order to execute the current rule. All the conditions supported by processors are also supported here.

  官方示例如下:

1
2
3
4
5
6
7
8
9
10
output.kafka:
hosts: ["localhost:9092"]
topic: "logs-%{[beat.version]}"
topics:
- topic: "critical-%{[beat.version]}"
when.contains:
message: "CRITICAL"
- topic: "error-%{[beat.version]}"
when.contains:
message: "ERR"

  • Elaticsearch
    可以直接将数据输出给elaticsearch服务器,不过一般来说我们不会这样做,一般是会经过logstash来筛选之后再传入elaticsearch。官方示例如下:

    1
    2
    3
    4
    output.elasticsearch:
    hosts: ["https://localhost:9200"]
    username: "filebeat_internal"
    password: "YOUR_PASSWORD"
  • Console
    输出至屏幕终端显示。pretty官方的介绍为If pretty is set to true, events written to stdout will be nicely formatted. The default is false,示例如下:

    1
    2
    output.console:
    pretty: true
------------------------------- The End -------------------------------
It's very nice of you to support me.