ELK8.12.2(elasticsearch+logstash+kibana)集群部署
ELK8.12.2(elasticsearch+logstash+kibana)集群部署
ELK8.12.2(elasticsearch+logstash+kibana)集群部署
elasticsearch下载:https://www.elastic.co/downloads/past-releases#elasticsearchlogstash下载:http://elastic.co/downloads/past-releases#logstashkibana下载:https://www.elastic.co/downloads/past-releases#kibanaik分词器下载1:https://release.infinilabs.com/analysis-ik/stable/ik分词器下载2:https://github.com/infinilabs/analysis-ik/releases
一、elasticsearch部署
1. 系统配置
参考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/_file_descriptors_and_mmap.html
● 不配置虚拟内存,可能会报错:[error]max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]
● 不配置打开文件数,可能会报错:[error] max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536] elasticsearch
● 不配置允许最大进程数为4096,可能会报错:[error]max number of threads [1024] for user [judy2] likely too low, increase to at least [4096]
● 需要创建ES用户,ES不支持root用户启动
# 设置虚拟内存
sysctl -w vm.max_map_count=262144 #临时方式
vi /etc/sysctl.conf #永久方式
vm.max_map_count=262144 #新增
# 设置打开文件数和进程数
# 查看
ulimit -n
# 设置打开文件数和进程数为65535
vi /etc/security/limits.conf
* soft nofile 65535
* hard nofile 65535
* soft nproc 65535
* hard nproc 65535
# 禁用swap交换分区
swapoff -a #临时禁用
vi /etc/fstab #永久禁用
找到swap这一行前面使用#符号禁用掉
# 重载配置
sysctl -p
# 设置主机名(单机部署不需要)
hostnamectl set-hostname es1
# 配置hosts
vim /etc/hosts
192.168.1.107 es1
192.168.1.108 es2
192.168.1.109 es3
# 创建用户和密码(ES不能使用root用户启动)
useradd es
passwd es #输入密码es123456
2. 部署
# 创建es目录
mkdir /opt/es
# 解压
tar -zxvf elasticsearch-8.12.2-linux-x86_64.tar.gz -C /opt/es
# 修改文件目录属性
chown es:es -R /opt/es
# 切换用户
su - es
# 解决控制台乱码(可不执行)
vi /opt/es/elasticsearch-8.12.2/config/jvm.options
-Dfile.encoding=GBK #新增
# 设置内存(默认是4g)
vi /opt/es/elasticsearch-8.12.2/config/jvm.options.d/heap.options
# 新增,生产环境最好不要超过32g
-Xms512m
-Xmx512m
3. ik分词器安装
# 解压
unzip elasticsearch-analysis-ik-8.12.2.zip -d /opt/es/elasticsearch-8.12.2/plugins/elasticsearch-analysis-ik-8.12.2
# 查看插件情况
/opt/es/elasticsearch-8.12.2/bin/elasticsearch-plugin list
# 显示
elasticsearch-analysis-ik-8.12.2
# ik_smart:最少切分 ik_max_word:最细粒度划分(穷尽词库的可能)
# kibana调试工具测试
POST /_analyze
{
"analyzer": "ik_smart",
"text":"中华人民共和国"
}
# 扩展词库配置,需要重启ES
vi /opt/es/elasticsearch-8.12.2/plugins/elasticsearch-analysis-ik-8.12.2/config/IKAnalyzer.cfg.xml
<!--扩展字典,和IKAnalyzer.cfg.xml在同级目录下 -->
<entry key="ext_dict">exi.dic</entry>
<!--扩展停止词字典,和IKAnalyzer.cfg.xml在同级目录下-->
<entry key="ext_stopwords">stopword.dic</entry>
4. 证书生成
集群使用http访问,可忽略此步骤
集群使用https访问,就需要生成证书,用于ES节点之间进行安全数据传输其中一个ES节点执行,生成证书后拷贝到其他服务器的ES节点上
4.1. ca证书
# 生成ca证书
cd /opt/es/elasticsearch-8.12.2
# 生成CA证书(输入证书名称、设置密码,一直回车即可)
./bin/elasticsearch-certutil ca
# 用ca证书签发节点证书(上一步生成证书的密码、第二个ca证书名称以及密码,一直回车即可)
./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
# 将es的elastic-certificates.p12证书转成PEM格式的CA证书,用于logstash连接es使用(ES使用ca证书时需要,http证书不需要)
openssl pkcs12 -in elastic-certificates.p12 -cacerts -nokeys -out ca.crt
# 将证书移动到ES的config目录的certs目录之下
mkdir /opt/es/elasticsearch-8.12.2/config/certs
mv elastic-stack-ca.p12 config/certs/
mv elastic-certificates.p12 config/certs/
mv ca.crt config/certs/
# 将证书拷贝到另外两个节点
cd /opt/es/elasticsearch-8.12.2/config/certs
scp elastic-stack-ca.p12 es@192.168.1.108:/opt/es/elasticsearch-8.12.2/config/certs/
scp elastic-certificates.p12 es@192.168.1.108:/opt/es/elasticsearch-8.12.2/config/certs/
scp ca.crt es@192.168.1.108:/opt/es/elasticsearch-8.12.2/config/certs/
4.2. http证书
使用https访问接口,最好生成http证书,方便kibana和logstash连接
把生成的http.p12和elasticsearch-ca.pem证书移动到config/certs目录下
把这两个证书发送到ES的其他节点的config/certs目录下
● 使用ca证书生成http证书
参考地址:https://blog.csdn.net/h952520296/article/details/134371985?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7Ebaidujs_baidulandingword%7ECtr-3-134371985-blog-135743571.235%5Ev43%5Epc_blog_bottom_relevance_base5&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7Ebaidujs_baidulandingword%7ECtr-3-134371985-blog-135743571.235%5Ev43%5Epc_blog_bottom_relevance_base5&utm_relevant_index=5
● 直接生成ca证书以及http证书
参数地址:https://cloud.tencent.com/developer/article/2457640
# 生成http证书--这里采用ca证书的方法生成http证书
cd /opt/es/elasticsearch-8.12.2
./bin/elasticsearch-certutil http
# 这里我们需要的是生成自签名证书,所以这里输入N。
Generate a CSR? [y/N] 选择N
# 选择是否使用现有证书,选y
Use an existing CA? [y/N] 选择y
# 输入ca证书的路径
CA Path: /opt/es/elasticsearch-8.12.2/config/certs/elastic-stack-ca.p12 输入ca证书的路径
# 输入密码,没有密码直接回车
Password for elastic-stack-ca.p12: 回车
# 是否需要对证书进行修改。不对证书需要进行修改,直接选择N
Do you wish to change any of these options? [y/N] 选择N
# 设置证书有效时长,设置了100年
For how long should your certificate be valid? [5y] 100y
# 选择节点单独的证书还是集群统一的证书,选择N,进入集群证书生成步骤
Generate a certificate per node? [y/N] 选择N
# 配置集群的主机名
Enter all the hostnames that you need, one per line.
When you are done, press <ENTER> once more to move on to the next step.
输入
es1 回车
es2 回车
es3 回车
回车
# 确认集群信息
Is this correct [Y/n] 选择Y
配置集群的IP地址
Enter all the IP addresses that you need, one per line.
When you are done, press <ENTER> once more to move on to the next step.
输入
192.168.1.107 回车
192.168.1.108 回车
192.168.1.109 回车
回车
# 确认集群信息
Is this correct [Y/n] 选择Y
# 是否还需要进行修改。如果不需要修改,输入N即可
Do you wish to change any of these options? [y/N] 选择N
# 对私钥进行密码的配置。该私钥将被应用与我们生成的http.p12证书中
Provide a password for the "http.p12" file: [<ENTER> for none]回车
# 输出的zip的文件名
What filename should be used for the output zip file? [/opt/es/elasticsearch-8.12.2/elasticsearch-ssl-http.zip]
回车
# 生成elasticsearch-ssl-http.zip
unzip elasticsearch-ssl-http.zip -d elasticsearch-ssl-http
elasticsearch包:http证书--http.p12
kibana包:kibana和logstash连接es的证书--elasticsearch-ca.pem
5. 配置文件
使用ca证书情况下,xpack.security.http.ssl.enabled先设置false不开启https访问,启动后修改elastic、kibana_system、logstash_system等用户密码,再改成true
使用http证书没有此问题
原因:使用ca自签名证书,在没有添加该证书到受信任的证书列表之前,SSL 验证会失败
# elasticsearch配置文件详解
vi /opt/es/elasticsearch-8.12.2/config/elasticsearch.yml
# 集群名称,集群部署所有节点名称保持一致
#cluster.name: elasticsearch_cluster
# 节点名称,集群部署每个节点名称配置不同
node.name: node1
# 节点角色,注意至少有两个具有选举master资格的节点
#node.roles: [master,data]
# 数据存储路径,不配置默认安装目录的data下面(逗号分隔多个目录)
path.data: /opt/es/data1,/opt/es/data2
# 日志存储路径,不配置默认安装目录的logs下面
path.logs: /opt/es/logs
# 启用内存锁定功能,避免es使用swap交换分区提高性能
#bootstrap.memory_lock: true
# 监听地址,0.0.0.0代表所有地址
#network.host: 0.0.0.0
network.host: 本机ip
# 用于HTTP客户端通信的端口。从9200开始往后找第一个可用端口
http.port: 9200
# 用于集群内节点间互相发现。提供了集群内符合主节点条件的其他节点列表,可以是IP地址或者主机名。可以是数组或序列形式
#discovery.seed_hosts: ["集群ip1:9300", "集群ip2:9300", "集群ip3:9300"]
# 指定集群的初始主节点列表,负责选举出主节点,并承担集群的管理和协调工作
#cluster.initial_master_nodes: ["节点名称1", "节点名称2", "节点名称3"]
# 是否开启密码验证(true:需要输入密码)
xpack.security.enabled: true
# 用于启用集群注册功能(enrollment)(true:启用集群注册的功能,允许节点通过安全的方式加入到集群中)
xpack.security.enrollment.enabled: true
# 是否开启https访问(true:开启)
xpack.security.http.ssl:
# true:开启 false:关闭
enabled: true
# 使用ca证书
#keystore.path: certs/elastic-certificates.p12
# 使用http证书(优先使用)
keystore.path: certs/http.p12
xpack.security.transport.ssl:
enabled: true
# certificate:验证对等方证书 none:不验证证书
verification_mode: certificate
# 使用ca证书
keystore.path: certs/elastic-certificates.p12
truststore.path: certs/elastic-certificates.p12
# 开启跨域支持
http.cors.enabled: true
# 允许所有人跨域访问
http.cors.allow-origin: "*"
6. 启动
# 切换目录
cd /opt/es/elasticsearch-8.12.2/bin
# 后台启动
./elasticsearch -d
浏览器访问 https://ip:9200/
默认账户:elastic 密码:看SSH窗口里显示的密码
# 重置密码
cd /opt/es/elasticsearch-8.12.2/bin
./elasticsearch-reset-password -u elastic -i
./elasticsearch-reset-password -u kibana_system -i
./elasticsearch-reset-password -u logstash_system -i
7. 常用命令
# 查看系统用户列表
https://ip:9200/_security/user/
# 查看ES节点
https://ip:9200/_cat/nodes
# 查看ES集群的健康状态
https://ip:9200/_cat/health?v
https://ip:9200/_cluster/health?pretty
# 查看ES索引
https://ip:9200/_cat/indices?v
二、logstash部署
1. 部署
# 解压
tar -zxvf logstash-8.12.2-linux-x86_64.tar.gz -C /opt/es
# 设置内存
vi /opt/es/logstash-8.12.2/config/jvm.options
# 默认1g,报错java.lang.OutOfMemoryError: Java heap space,调整内存设置,例如改成5g
-Xms1g
-Xmx1g
# 测试安装是否成功(从标准输入流 输出到 标准输出流)
cd /opt/es/logstash-8.12.2/bin
./logstash -e 'input { stdin { } } output { stdout {} }'
2. 执行单个配置文件
在bin目录下使用./logstash -f <配置文件>执行
2.1. 生成ES的PEM格式的CA证书
ES使用ca证书时需要执行,使用http证书就省略该步骤
其中一个ES节点执行,生成证书后拷贝到其他服务器的ES节点上
# 将es的elastic-certificates.p12证书转成PEM格式的CA证书
# logstash连接es使用
cd /opt/es/elasticsearch-8.12.2/config/certs
openssl pkcs12 -in elastic-certificates.p12 -cacerts -nokeys -out ca.crt
没有密码直接回车生成ca.crt
# 将ca.crt证书拷贝到另外两个节点
scp ca.crt es@192.168.1.108:/opt/es/elasticsearch-8.12.2/config/certs/
scp ca.crt es@192.168.1.108:/opt/es/elasticsearch-8.12.2/config/certs/
2.2. 数据抽取准备
● 创建目录
● 创建索引
● 创建模板
----● 模板未生效问题:Logstash默认会上传一个logstash的模板到ES里。如果你在使用上面这个配置之前,曾经运行过Logstash(一般来说都会),那么ES里就已经存在模板了。你可以curl -XGET ‘http://127.0.0.1:9200/_template/logstash’ 验证。ES里就有两个模板,logstash和(自定义模板名称)都匹配logstash-*索引名,ES会按照一定的规则来尝试自动merge多个都匹配上的模板规则,最终运用到索引上。
解决方法:自定义模板template是可以设置order参数的,不设置默认order值是0。order值越大,在merge规则的时候优先级越高
----● 模板字段类型和数据对不上问题:模板里设置的字段类型为text,但是导入数据的时候该字段的数据格式是date类型,导致导入数据后该字段自动转换成date类型
解决方法:在导入数据前先创建索引,这样子导入的时候字段类型就不会变化
● 创建sql
# 创建存储最后抽取信息的目录
mkdir /opt/es/logstash-8.12.2/config/lastid
# 创建抽取sql的目录
mkdir /opt/es/logstash-8.12.2/config/sql
# 创建抽取配置文件的目录
mkdir /opt/es/logstash-8.12.2/config/config
# 创建模板配置的目录
mkdir /opt/es/logstash-8.12.2/config/template
# 创建索引,如下所示
PUT /template_index
{
"settings": {
"number_of_shards": 9,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"xl": {
"type": "long"
},
"sfzh": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"jyrq": {
"type": "date",
"format": "yyyy-MM-dd"
},
"jysj": {
"type": "date",
"format": "HH:mm:ss"
},
"rkrq": {
"type": "date",
"format": "yyyy-MM-dd"
},
"rksj": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"gxsj": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
# 创建模板,如下所示
vi /opt/es/logstash-8.12.2/config/template/template_index.json
{
// 按照名字匹配
"template": "t_dk_ga_yh_fk_bhlc",
// 排序
"order": 1,
"settings": {
// 主分片
"number_of_shards": 9,
// 副本
"number_of_replicas": 1
},
"settings": {
"number_of_shards": 9,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"xl": {
"type": "long"
},
"sfzh": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"jyrq": {
"type": "date",
"format": "yyyy-MM-dd"
},
"jysj": {
"type": "date",
"format": "HH:mm:ss"
},
"rkrq": {
"type": "date",
"format": "yyyy-MM-dd"
},
"rksj": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"gxsj": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
# 创建增量抽取的sql
vi /opt/es/logstash-8.12.2/config/sql/template_index.sql
select xl, sfzh, to_char(jyrq,'yyyy-MM-dd') jyrq, to_char(jysj,'HH24:mi:ss') jysj, to_char(rkrq,'yyyy-MM-dd') rkrq, to_char(rksj,'yyyy-MM-dd HH24:mi:ss') rksj, to_char(gxsj,'yyyy-MM-dd HH24:mi:ss') gxsj from template_index where gxsj > :sql_last_value order by gxsj
2.3. MySQL数据抽取
先创建es索引
上传mysql数据库驱动jar
jar放到/opt/es/logstash-8.12.2/jdk/lib目录下(可自定义目录)
bin目录下执行:./logstash -f config/logstash-mysql.conf
vi /opt/es/logstash-8.12.2/config/logstash-mysql.conf
input {
jdbc {
# 设置 MySql 数据库url以及数据库名称
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/ruoyi?characterEncoding=utf8&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 数据库驱动mysql-connector-java-8.0.28.jar所在位置,可以是绝对路径或者相对路径
jdbc_driver_library => "/opt/es/logstash-8.12.2/jdk/lib/mysql-connector-java-8.0.28.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 是否开启分页,默认false,ture为开启
jdbc_paging_enabled => true
# 分页每页数量
jdbc_page_size => "2000"
# 设置时区
#jdbc_default_timezone =>"Asia/Shanghai"
# 执行的sql文件路径
# statement_filepath => "/opt/es/logstash-8.12.2/config/sql/sys_dept.sql"
# 使用这个可以直接写sql语句,但是复杂的语句最好是写在文件内
statement =>"SELECT * FROM sys_dept where dept_id > :sql_last_value order by dept_id"
# 是否需要记录某个字段值,如果为true,我们可以自定义要记录的数据库某个字段值,例如id或date字段。如果为false,记录的是上次执行的标记,默认是一个timestamp
use_column_value => true
# 如果use_column_value为true,需配置此参数. 指定增量更新的字段名。当然该字段必须是递增的,比如id或date字段。
tracking_column => "dept_id"
# tracking_column 对应字段的类型,只能选择timestamp或者numeric(数字类型),默认numeric。
tracking_column_type => "numeric"
#如果为true,每次会记录所更新的字段的值,并保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 是否将字段名称转小写。默认是true。这里注意Elasticsearch是区分大小写的
#lowercase_column_names => true
# 是否清除 last_run_metadata_path 的记录。默认false,true则每次都从头开始查询所有的数据库记录,
#clean_run => false
# 记录上次执行字段值路径。我们可以在sql语句中这么写:WHERE ID > :sql_last_value。其中 :sql_last_value 取得就是该文件中的值,这个last_time会以文件形式存在
last_run_metadata_path => "/opt/es/logstash-8.12.2/config/lastid/sys_dept_lastid.txt"
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
schedule => "*/5 * * * *"
}
}
# 输出配置
output {
# 输出控制台
stdout {
codec => json_lines
}
# 输出到es
elasticsearch {
hosts => ["https://192.168.1.107:9200", "https://192.168.1.108:9200", "https://192.168.1.109:9200"]
user => "elastic"
password => "elastic"
# 启用 SSL/TLS 加密连接
ssl => true
# es使用http证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
ssl_certificate_verification => true
# es的PEM格式证书文件地址
ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/elasticsearch-ca.pem"
# es使用ca证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
#ssl_certificate_verification => false
# es的PEM格式证书文件地址
#ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/ca.crt"
# ES索引名称(自己定义的)
index => "sys_dept"
# 自定义模板配置文件
#template => "/opt/es/logstash-8.12.2/config/template/sys_dept.json"
# 是否覆盖模板
#template_overwrite => true
# 自定义模板名称
#template_name => "sys_dep"
# 是否由Logstash来管理模板。false:Logstash不会管理或创建索引模板,而是由用户手动配置和管理模板
#manage_template => false
#索引ID
document_id => "%{dept_id}"
}
}
2.4. PostgreSQL数据抽取
先创建es索引
上传postgresql数据库驱动jar
jar放到/opt/es/logstash-8.12.2/jdk/lib目录下(可自定义目录)
bin目录下执行:./logstash -f config/logstash-postgresql.conf
vi /opt/es/logstash-8.12.2/config/logstash-postgresql.conf
input {
jdbc {
# 设置 postgresql 数据库url以及数据库名称
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/gadb?currentSchema=ga,sys"
# 用户名和密码
jdbc_user => "pgadmin"
jdbc_password => "pgadmin"
# 数据库驱动postgresql-42.2.24.jar所在位置,可以是绝对路径或者相对路径
jdbc_driver_library => "/opt/es/logstash-8.12.2/jdk/lib/postgresql-42.2.24.jar"
# 驱动类名
jdbc_driver_class => "org.postgresql.Driver"
# 是否开启分页,默认false,ture为开启
jdbc_paging_enabled => true
# 分页每页数量
jdbc_page_size => "2000"
# 设置时区
#jdbc_default_timezone =>"Asia/Shanghai"
# 执行的sql文件路径
# statement_filepath => "/opt/es/logstash-8.12.2/config/sql/sys_dept.sql"
# 使用这个可以直接写sql语句,但是复杂的语句最好是写在文件内
statement =>"SELECT * FROM public.sys_dept where gxsj > :sql_last_value order by gxsj"
# 是否需要记录某个字段值,如果为true,我们可以自定义要记录的数据库某个字段值,例如id或date字段。如果为false,记录的是上次执行的标记,默认是一个timestamp
use_column_value => true
# 如果use_column_value为true,需配置此参数. 指定增量更新的字段名。当然该字段必须是递增的,比如id或date字段。
tracking_column => "gxsj"
# tracking_column 对应字段的类型,只能选择timestamp或者numeric(数字类型),默认numeric。
tracking_column_type => "timestamp"
#如果为true,每次会记录所更新的字段的值,并保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 是否将字段名称转小写。默认是true。这里注意Elasticsearch是区分大小写的
#lowercase_column_names => true
# 是否清除 last_run_metadata_path 的记录。默认false,true则每次都从头开始查询所有的数据库记录,
#clean_run => false
# 记录上次执行字段值路径。我们可以在sql语句中这么写:WHERE ID > :sql_last_value。其中 :sql_last_value 取得就是该文件中的值,这个last_time会以文件形式存在
last_run_metadata_path => "/opt/es/logstash-8.12.2/config/lastid/t_dk_ga_yh_fk_bhlc_lastid.txt"
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
schedule => "*/5 * * * *"
}
}
# 输出配置
output {
# 输出控制台
stdout {
codec => json_lines
}
# 输出到es
elasticsearch {
hosts => ["https://192.168.1.107:9200", "https://192.168.1.108:9200", "https://192.168.1.109:9200"]
user => "elastic"
password => "elastic"
# 启用 SSL/TLS 加密连接
ssl => true
# es使用http证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
ssl_certificate_verification => true
# es的PEM格式证书文件地址
ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/elasticsearch-ca.pem"
# es使用ca证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
#ssl_certificate_verification => false
# es的PEM格式证书文件地址
#ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/ca.crt"
# ES索引名称(自己定义的)
index => "t_dk_ga_yh_fk_bhlc"
# 自定义模板配置文件
#template => "/opt/es/logstash-8.12.2/config/template/t_dk_ga_yh_fk_bhlc.json"
# 是否覆盖模板
#template_overwrite => true
# 自定义模板名称
#template_name => "t_dk_ga_yh_fk_bhlc"
# 是否由Logstash来管理模板。false:Logstash不会管理或创建索引模板,而是由用户手动配置和管理模板
#manage_template => false
#索引ID
document_id => "%{xl}"
}
}
2.5 日志文件抽取
bin目录下执行./logstash -f config/logstash-log.conf
vi /opt/es/logstash-8.12.2/config/logstash-log.conf
input {
file{
# 设置log文件路径,多个文件路径可设置成数组[],模糊匹配用*
# 指定单一文件
path => ["/opt/es/logstash-8.12.2/logs/logstash-plain.log"]
# 指定数组文件
#path => ["/data/es/logstash-5.6.1/files/test-1.log","/data/es/logstash-5.6.1/files/test-2.log"]
# 指定同级目录模糊匹配
#path => "/data/es/logstash-5.6.1/files/test*.log"
# 指定多级目录模糊匹配
#path => "/data/es/logstash-5.6.1/files/**/test*.log"
# begining表示从头开始读取文件,end表示读取最新数据,可和ignore_older一起使用
# begining只针对首次启动是否需要读取所有的历史数据,而当文件修改了之后,同样会自动增量更新新数据
start_position =>"beginning"
# 设置输入规则
#codec => multiline {
# 利用正则匹配规则,匹配每一行开始的位置,这里匹配每一行开始的位置为数字
#pattern => "^[0-9]"
# true:不匹配正则表达式,false:匹配正则表达式,默认false
# 如果不匹配,则会结合what参数,进行合并操作
#negate => true
# what可设置previous和next,previous则表示将所有不匹配的数据都合并到上一个正则事件,而next则相反,将所有的不匹配的数据都合并到下一个正则事件
#what => "previous"
# 表示当多长时间没有新的数据,最后一个正则匹配积累的多行数据都归属为最后一个事件,这里的10表示10秒
#auto_flush_interval => 10
#}
}
}
# 输出配置
output {
elasticsearch {
hosts => ["https://192.168.1.107:9200", "https://192.168.1.108:9200", "https://192.168.1.109:9200"]
user => "elastic"
password => "elastic"
# 启用 SSL/TLS 加密连接
ssl => true
# es使用http证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
ssl_certificate_verification => true
# es的PEM格式证书文件地址
ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/elasticsearch-ca.pem"
# es使用ca证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
#ssl_certificate_verification => false
# es的PEM格式证书文件地址
#ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/ca.crt"
# ES索引名称(自己定义的)
index => "logstash-%{+YYYY.MM.dd}"
}
}
2.6. 联合抽取
mysql和postgresql一起抽取
先创建es索引
上传mysql和postgresql数据库驱动jar
jar放到/opt/es/logstash-8.12.2/jdk/lib目录下(可自定义目录)
bin目录下执行:./logstash -f config/logstash.conf
vi /opt/es/logstash-8.12.2/config/logstash.config
input {
jdbc {
# mysql配置参考数据抽取
...
# 注意:在Elasticsearch 7.x及更高版本中,'type'字段已被弃用
# 你可以在这里添加一个自定义的字段来区分数据类型,而不是使用'type'
add_field => { "source" => "mysql_sys_dept" }
}
jdbc {
# postgresql配置参考数据抽取
...
# 注意:在Elasticsearch 7.x及更高版本中,'type'字段已被弃用
# 你可以在这里添加一个自定义的字段来区分数据类型,而不是使用'type'
add_field => { "source" => "postgresql_sys_dept" }
}
}
output {
# 输出控制台
stdout {
codec => json_lines
}
if [source] == "mysql_sys_dept" {
# 输出到es
elasticsearch {
# es地址 集群数组hosts => ["127.0.0.1:9200"]
hosts => ["127.0.0.1:9200"]
# ES索引名称(自己定义的)
index => "mysql_sys_dept"
# 连接es的用户名
user => "elastic"
# 连接es的密码
password => "elastic"
# 启用 SSL/TLS 加密连接
ssl => true
# es使用http证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
ssl_certificate_verification => true
# es的PEM格式证书文件地址
ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/elasticsearch-ca.pem"
# es使用ca证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
#ssl_certificate_verification => false
# es的PEM格式证书文件地址
#ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/ca.crt"
# 设置数据的id为数据库中的字段 id需要换成mysql中的id 名必须为小写不然无法带入
document_id => "%{dept_id}"
}
}
if [source] == "postgresql_sys_dept" {
# 输出到es
elasticsearch {
# es地址 集群数组hosts => ["127.0.0.1:9200"]
hosts => ["127.0.0.1:9200"]
# ES索引名称(自己定义的)
index => "postgresql_sys_dept"
# 连接es的用户名
user => "elastic"
# 连接es的密码
password => "elastic"
# 启用 SSL/TLS 加密连接
ssl => true
# es使用http证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
ssl_certificate_verification => true
# es的PEM格式证书文件地址
ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/elasticsearch-ca.pem"
# es使用ca证书配置如下:
# 是否启用 SSL/TLS 证书验证(true:启用 false:禁用)
#ssl_certificate_verification => false
# es的PEM格式证书文件地址
#ssl_certificate_authorities => "/opt/es/elasticsearch-8.12.2/config/certs/ca.crt"
# 设置数据的id为数据库中的字段 id需要换成mysql中的id 名必须为小写不然无法带入
document_id => "%{dict_code}"
}
}
}
3. 执行多个配置文件
3.1. 使用config配置文件
cd /opt/es/logstash-8.12.2/bin
# 第一种方式:使用逗号分隔
./logstash -f /path/to/config1.conf,/path/to/config2.conf
# 第二种方式:使用空格分隔
./logstash -f /path/to/config1.conf /path/to/config2.conf
# 第三种方式:使用通配符
./logstash -f /path/to/config/*.conf
# 第四种方式:指定目录
./logstash -f /path/to/config/
3.2. 使用pipelines配置文件
logstash抽数据出现连接超时:https://www.cnblogs.com/liang1101/p/8509978.html
执行数据导入命令
./logstash
vi /opt/es/logstash-8.12.2/config/pipelines.yml
# pipeline.id:唯一标识
- pipeline.id: dept
path.config: "/opt/es/logstash-8.12.2/sql_config/dept.conf"
- pipeline.id: person
path.config: "/opt/es/logstash-8.12.2/sql_config/person.conf"
# 下面是全局配置,也可以写在每个pipeline中
pipeline:
# 工作的线程数
workers: 16
batch:
# 每个批次包含的事件数量
size: 500
# 批处理的最大等待时间(毫秒)
# 等待期事件数量未达到批次数量,但超过等待时间,批次也会立即处理
delay: 200
output:
# 输出阶段的线程数,负责将处理后的事件发送到目的地(如Elasticsearch)
workers: 12
# false:Logstash关闭时能够安全地完成所有正在进行的任务
# true:关闭时可能会丢失未处理的事件
unsafe_shutdown: false
queue:
# 使用持久化队列(事件会被存储到磁盘上而非仅仅在内存中,有助于提高数据可靠性)
type: persisted
# 每个页面的大小限制为250MB
page_capacity: 250mb
# 队列中最多允许有10000个事件
max_events: 10000
# 队列的最大容量为1GB
max_bytes: 1gb
checkpoint:
# 每处理完10000个事件后进行一次确认
acks: 10000
# 每当写入10000个事件时,进行一次确认
writes: 10000
# 每1000毫秒(1秒)进行一次确认操作
interval: 1000
三、kibana部署
1. 部署
kibana配置参考官网:https://www.elastic.co/guide/cn/kibana/current/settings.html
# 解压
tar -zxvf kibana-8.12.2-linux-x86_64.tar.gz -C /opt/es
vi /opt/es/kibana-8.12.2/config/kibana.yml
# kibana端口
server.port: 5601
# 所有主机都能访问,或者也可以指定一个ip
server.host: "0.0.0.0"
# 配置es的访问地址
elasticsearch.hosts: ["https://ip1:9200", "https://ip2:9200", "https://ip3:9200"]
# 账户密码
elasticsearch.username: "kibana_system"
elasticsearch.password: "elastic"
# es证书(http证书可以配置,ca证书不需要配置)
elasticsearch.ssl.certificateAuthorities: [ "/opt/es/elasticsearch-8.12.2/config/certs/elasticsearch-ca.pem" ]
# ES证书认证。full(默认):验证所有(主机名和证书等),certificate:仅验证证书,不验证主机名,none:不校验
# es使用http证书,配置成full/certificate
# es使用ca证书,配置成none
elasticsearch.ssl.verificationMode: full
# 语言设置
i18n.locale: "zh-CN"
# 后台启动
cd /opt/es/kibana-8.12.2/bin
nohup ./kibana & # 默认日志输出到nohup.out
nohup ./kibana > kibana_log.txt & # 日志输出到kibana.txt
# 查看kibana进程
ss -tulnp | grep 5601 或者 netstat -tulnp | grep 5601
# 网页访问
http://ip:5601
2. 控制台操作
2.1. 索引操作
# 创建索引
PUT index
# 创建索引以及映射
PUT /index
{
"settings": {
"number_of_shards": 9,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"xl": {
"type": "long"
},
"sfzh": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"jyrq": {
"type": "date",
"format": "yyyy-MM-dd"
},
"jysj": {
"type": "date",
"format": "HH:mm:ss"
},
"rkrq": {
"type": "date",
"format": "yyyy-MM-dd"
},
"rksj": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"gxsj": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
# 给现有索引添加映射
PUT /index/_mapping
{
"properties": {
"xm": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"xb": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
# 查询索引
GET index
# 查询所有索引,_cat 表示查看的意思
GET _cat/indices
# 删除索引
DELETE index
2.2. 文档操作
# 创建/修改文档
POST index/_doc/1
{
"id" : 1001,
"name" : "zhangsan",
"age" : 30
}
# 批量插入(id相同则覆盖)
POST _bulk
{"index":{"_index":"test","_id":"1"}}
{"title":"hello","xm":"张三"}
{"index":{"_index":"test","_id":"2"}}
{"title":"world","xm":"李四"}
# 按条件更新文档
POST /index/_update_by_query
{
"query": {
"bool": {
"must": [
{
"term": {
"czzt.keyword": {
"value": "1"
}
}
},
{
"range": {
"jyrq": {
"gte": "2025-07-21",
"lte": "2025-08-05"
}
}
}
]
}
},
"script": {
"params": {
"czsj": null,
"czczyh": null,
"czzt": "0",
"czsm": null
},
"lang": "painless",
"source": "ctx._source.czzt = params.czzt;ctx._source.czsm = params.czsm;ctx._source.czczyh = params.czczyh;ctx._source.czsj = params.czsj;"
}
}
# 查询文档
GET index/_doc/1
# 查询所有文档(查表所有记录)
GET index/_search
GET index/_search
{
"query":{
"match_all":{
}
}
}
# 删除文档
DELETE index/_doc/1
# 按条件删除文档(conflicts=proceed 遇到版本冲突时,直接跳过继续处理下一条文档)
POST /index/_delete_by_query?conflicts=proceed
{
"query": {
"range": {
"rkrq": {
"gte": "2025-07-06",
"lte": "2025-08-05"
}
}
}
}
2.3. 查询操作
● 全量、分页、指定显示字段、排序查询
get index/_search
{
"query":{
"match_all":{} //全量查询
},
"from":0, //当前页的起始位置
"size":20, //每页查询的条数
"_source":["xm", "rksj"], //指定字段查询(返回的字段)
"sort":{ //表示排序
"rksj":{ //选择排序字段
"order":"desc" //选择排序方式 desc,asc
}
}
}
● match全文搜索:对查询字符串进行分词,文档中包含任意一个分词词汇就匹配成功
● match_phrase全文搜索:按照分词顺序匹配完整的短语
get index/_search
{
"query":{
"match":{
"bz":"转账信息" //会进行分词,匹配任意一个分词
},
"match_phrase":{
"xm":"张三" //会把张三当成一个词语去查询
}
}
}
● term精准匹配:适用于精确匹配场景,比如数值、日期或不需要分词的keyword字段
● terms多值精准匹配:匹配到任意一个值就行
get index/_search
{
"query": {
"term": {
"rkrq": "2022-05-04"
}
}
}
// rkrq=2025-06-03 or rkrq=2025-06-04
get t_dk_ga_sayhk/_search
{
"query": {
"terms": {
"rkrq": [
"2025-06-03",
"2025-06-04"
]
}
}
}
● range查询
get index/_search
{
"query": {
"range": {
"age": {
"gte": 10, // 大于等于
"lte": 50, // 小于等于
"gt": 20, // 大于
"lt": 30 // 小于
}
}
}
}
● bool组合查询:must(必须匹配)、should(至少匹配一个)、must_not(必须不匹配)和filter(过滤)
get index/_search
{
"query":{
"bool":{ //条件
"must":[{ //多个条件必须成立(sql中and)
"term":{ //第一个条件,
"rkrq":"2024-07-15"
}
},
{
"match":{ //第二个条件
"sfzh":"330821199901115917"
}
}],
"should":[{ //多个条件成立其中一个(sql中or)
"match":{
"dfxm":"张"
}
},
{
"match":{
"dfxm":"王"
}
}],
"must_not":[{ //必须不满足(sql中!=)
"term":{
"color":"yellow"
}
}],
"filter":{ //过滤
"range":{ //范围
"age":{ //选择的字段
"gt":10, //大于
"lt":50, //小于
"gte":20, //大于等于
"lte":30 //小于等于
}
}
}
}
}
}
● highlight高亮显示
get index/_search
{
"query" : {
"match_phrase" : {
"bz" : "转账"
}
},
"highlight": {
"fields" : {
"bz" : {}
}
}
}
● 聚合查询:https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggregations.html
○ 分组
get index/_search
{
"size": 0, //设置成0,不展示原始数据,只展示分组结果数据
"aggs": { //聚合操作
"rkrq_group": { //统计结果名称,随意起名
"terms": { //分组
"field": "rkrq", //分组字段
"size": 10000 //允许最多返回10000个不同的rkrq值
}
}
}
}
// 返回结果
{
...
"hits": {
"total": {
"value": 179,
"relation": "eq"
},
"max_score": null,
"hits": [] // 设置了size参数,所以不会有 hits 搜索结果返回
},
"aggregations": {
"rkrq_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 3,
"buckets": [
{
"key": 1658880000000,
"key_as_string": "2022-07-27",
"doc_count": 107
},
{
"key": 1643846400000,
"key_as_string": "2022-02-03",
"doc_count": 15
},
{
"key": 1644019200000,
"key_as_string": "2022-02-05",
"doc_count": 1
}
]
}
}
}
○ 平均值
get index/_search
{
"size": 0, //设置成0,不展示原始数据,只展示分组结果数据
"aggs": {
"ajrq_avg": { //统计结果名称,随意起名
"avg": { //分组
"field": "rkrq" //分组字段
}
}
}
}
// 返回结果
{
...
"aggregations": {
"rkrq_avg": {
"value": 1654376782978.7234,
"value_as_string": "2022-06-04"
}
}
}
○ 分组嵌套平均值:rkrq分组后,每组rkrq的je平均值
get index/_search
{
"size" : 0,
"aggs": {
"rkrq_group": {
"terms": {
"field": "rkrq"
},
"aggs": {
"avg_je": {
"avg": {
"field": "je"
}
}
}
}
}
}
// 返回结果
{
...
"aggregations": {
"rkrq_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 3,
"buckets": [
{
"key": 1659916800000,
"key_as_string": "2022-08-08",
"doc_count": 2280,
"avg_xl": {
"value": 1146.5
}
},
{
"key": 1721260800000,
"key_as_string": "2024-07-18",
"doc_count": 8,
"avg_xl": {
"value": 2316.5
}
},
{
"key": 1701129600000,
"key_as_string": "2023-11-28",
"doc_count": 4,
"avg_xl": {
"value": 2301.5
}
}
]
}
}
}
○ 分组嵌套分组:按rkrq进行分组后,再按khwd进行分组
get index/_search
{
"size" : 0,
"aggs": {
"rkrq_group": {
"terms": {
"field": "rkrq"
},
"aggs": {
"khwd_group":{
"terms": {
"field":"khwd.keyword"
}
}
}
}
}
}
// 返回结果
{
...
"aggregations": {
"rkrq_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": 1721260800000,
"key_as_string": "2024-07-18",
"doc_count": 8,
"khwd_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "浙江中信银行",
"doc_count": 4
},
{
"key": "建设银行-中国建设银行股份",
"doc_count": 2
},
{
"key": "衢州中信银行",
"doc_count": 2
}
]
}
},
{
"key": 1721347200000,
"key_as_string": "2024-07-19",
"doc_count": 4,
"khwd_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "交通银行-交通银行股份有限",
"doc_count": 2
},
{
"key": "建设银行-中国建设银行股份",
"doc_count": 2
}
]
}
},
{
"key": 1721606400000,
"key_as_string": "2024-07-22",
"doc_count": 2,
"khwd_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "交通银行-交通银行股份有限",
"doc_count": 2
}
]
}
}
]
}
}
}
○ 分组求最大最小值:按rkrq进行分组,每组rkrq求最大xl和最小xl
get index/_search
{
"size" : 0,
"aggs": {
"rkrq_group": {
"terms": {
"field": "rkrq"
},
"aggs": {
"xl_min": {
"min": {
"field":"xl"
}
},
"xl_max": {
"max": {
"field":"xl"
}
}
}
}
}
}
// 返回结果
{
...
"aggregations": {
"rkrq_group": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": 1721260800000,
"key_as_string": "2024-07-18",
"doc_count": 8,
"xl_min": {
"value": 2313
},
"xl_max": {
"value": 2320
}
},
{
"key": 1721347200000,
"key_as_string": "2024-07-19",
"doc_count": 4,
"xl_min": {
"value": 2321
},
"xl_max": {
"value": 2324
}
},
{
"key": 1721606400000,
"key_as_string": "2024-07-22",
"doc_count": 2,
"xl_min": {
"value": 2325
},
"xl_max": {
"value": 2326
}
}
]
}
}
}
2.4. sql操作
# sql查询
POST /_sql?format=txt
{
"query": "select * from sys_dept"
}
POST /_sql?format=json
{
"query": "select * from sys_dept"
}
更多推荐



所有评论(0)