Elastic Stack

官网地址:https://www.elastic.co/cn/

包含了数据的整合 => 提取 => 存储 => 使用,一整套

各个组件介绍:

  • beats 套件:从各种不同类型的文件 / 应用中采集数据。比如:a, b, c, d, e, aa, bb, cc
  • Logstash:从多个采集器或数据源来抽取 / 转换数据,向 ES 输送。比如:a, bb, cc
  • elasticsearch:存储、查询数据
  • kibana:可视化 ES 数据

安装 ES

elasticsearch:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/setup.html

安装:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/zip-windows.html

注意:一套技术,版本必须要一致,此处使用 7.17 版本

安装 Kibana

Kibana:https://www.elastic.co/guide/en/kibana/7.17/introduction.html

安装:https://www.elastic.co/guide/en/kibana/7.17/install.html

ElasticSearch 概念

可以理解为 MySQL 一样的数据库去学习和理解

入门学习:

  • Index 索引 => MySQL 里的表(table)
  • 建表、增删改查(查询需要花费的时间较多)
  • 用客户端去调用 ElasticSearch(3种)
  • 语法:SQL、代码的方法(4中种语法)

ES 相比于 MySQL 能够自动帮我们做分词,能够非常高效、灵活地查询内容

索引(倒排索引)

正向索引:理解为书籍的目录,可以快速找到对应的内容(怎么根据页码找到文章)

倒排索引:怎么根据内容找到文章

文章 A:你好,我是 rapper

文章 B:墨枫你好,我是 coder

切词:

你好,我是,rapper

墨枫,你好,我是,coder

构建倒排索引表:

内容 id
你好 文章 A, B
我是 文章 A, B
rapper 文章 A
墨枫 文章 B
coder 文章 B

用户搜:“墨枫 rapper”

ES 先切词:墨枫, rapper

然后去倒排索引对应的文章

ES的几种调用方式

1)restful api 调用(HTTP 请求)

GET请求:http://localhost:9200/

curl 可以模拟发送请求:curl -X GET “localhost:9200/?pretty”

ES 的启动端口

  1. 9200:给外部用户(给客户端调用)的端口
  2. 9300:给 ES 集群内部通信的(外部调用不了)

2)kibana devtools

自由地对 ES 进行操作(本质上也是 restful api)

devtools 不建议生产环境使用

3)客户端调用

java 客户端、go 客户端等

参考文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/_getting_started.html

ES 语法

DSL

json 格式,好理解;额 Http 请求最兼容,应用最广

建表、插入数据

1
2
3
4
5
POST post/_doc/pqGEzYcB9-HYm7AKfgyO
{
"title": "墨枫33333",
"desc": "墨枫的描述33333"
}

查询

DSL 语法:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-dsl.html

(随忘随查)

1
2
3
4
5
6
7
8
9
10
11
GET post/_search
{
"query": {
"match_all": { }
},
"sort": [
{
"@timestamp": "desc"
}
]
}

根据 id 查询:

1
GET post/_doc/pqGEzYcB9-HYm7AKfgyO

修改

1
2
3
4
5
POST post/_doc/pqGEzYcB9-HYm7AKfgyO
{
"title": "墨枫",
"desc": "墨枫的描述"
}

删除

删除普通索引

1
DELETE index_name

删除数据流式索引

1
DELETE _data_stream/post

EQL

专门查询 ECS 文档(标准指标文档)的数据的语法,更加规范,但只适用于特定场景(比如事件流)

文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/eql.html

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST my_event/_doc
{
"title": "墨枫33333",
"@timestamp": "2099-05-06T16:21:15.000Z",
"event": {
"original": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
}
}

GET /my_event/_eql/search
{
"query": """
any where 1 == 1
"""
}

SQL

文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/sql-getting-started.html

学习成本低,但是可能需要插件支持、性能较差

示例:

1
2
3
4
POST /_sql?format=txt
{
"query": "SELECT * FROM post where title like '墨枫%'"
}

Painless Scripting language

编程式取值,更灵活,但是学习成本高

Mapping

文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/explicit-mapping.html

可以理解为数据库的表结构,有哪些字段、字段类型。

ES 支持动态 mapping,表结构可以动态改变,而不像 MySQL 一样必须手动建表,没有的字段就不能插入。

显示创建 mapping:

1
2
3
4
5
6
7
8
9
10
11
GET user/_mapping
PUT user
{
"mappings": {
"properties": {
"age": { "type": "integer" },
"email": { "type": "keyword" },
"name": { "type": "text" }
}
}
}

ElasticStack 概念

ES 索引(Index) => 表

ES field (字段) => 列

倒排索引

调用语法(DSL、EQL、SQL等)

Mapping 表结构

  • 自动生成 mapping
  • 手动指定 mapping

分词器

内置分词器:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/analysis-analyzers.html

空格分词器:whitespace,结果 The、quick、brown、fox等

示例:

1
2
3
4
5
POST _analyze
{
"analyzer": "whitespace",
"text": "The quick is brown fox."
}

标准分词规则,结果:is、this、deja、vu

1
2
3
4
5
6
POST _analyze
{
"tokenizer": "standard",
"filter": [ "lowercase", "asciifolding" ],
"text": "Is this déja vu?"
}

关键词分词器:就是不分词,整句话当作专业术语

1
2
3
4
5
POST _analyze
{
"analyzer": "keyword",
"text": "The quick is brown fox."
}

IK 分词器(ES 插件)

中文友好:https://github.com/medcl/elasticsearch-analysis-ik

下载地址(注意版本一致):https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.17.7

思考:如何让 IK 按照自己的想法分词?

解决方案:自定义词典

ik_smart 和 ik_max_word 的区别:

举例:“小黑子”

ik_smart 是只能分词,尽量选择最像一个词的拆分方式,比如“小”、“黑子”

ik_max_word 尽可能地分词,可以包括组合词,比如“小黑”、“黑子“

打分机制

比如有 3 条内容:

  1. 墨枫是狗
  2. 墨枫是小黑子
  3. 我是小黑子

用户搜索:

  1. 墨枫,第一条分数最高,因为第一条匹配了关键词,而且更短(匹配比例更大)
  2. 墨枫小黑子 => 墨枫、小、黑子,排序结果:2 > 3 > 1

参考文章:https://liyupi.blog.csdn.net/article/details/119176943

官方参考文章:https://www.elastic.co/guide/en/elasticsearch/guide/master/controlling-relevance.html

ES 调用方式

有 3 种:

  1. HTTP Restful 调用
  2. kibana 操作(dev tools)
  3. 客户端操作(Java)

Java 操作 ES

3 种方式:

1)ES 官方的 Java API

https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/introduction.html

快速开始:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/connecting.html

2)ES 以前的官方 Java API, HighLevelRestClient(已经废弃,不建议使用)

3)Spring Data Elasticsearch(推荐)

spring-data 系列:spring 提供的操作数据的框架

spring-data-redis:操作 Redis 的一套方法

spring-data-mongodb:操作 mongodb 的一套方法

spring-data-elasticsearch:操作 elasticsearch 的一套方法

官方文档:https://docs.spring.io/spring-data/elasticsearch/docs/4.4.10/reference/html/

自定义方法:用户可以指定接口的方法名称,框架帮你自动生成查询

用 ES 实现搜索接口

1、建表(建立索引)

数据库表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 帖子表
create table if not exists post
(
id bigint auto_increment comment 'id' primary key,
title varchar(512) null comment '标题',
content text null comment '内容',
tags varchar(1024) null comment '标签列表(json 数组)',
thumbNum int default 0 not null comment '点赞数',
favourNum int default 0 not null comment '收藏数',
userId bigint not null comment '创建用户 id',
createTime datetime default CURRENT_TIMESTAMP not null comment '创建时间',
updateTime datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
isDelete tinyint default 0 not null comment '是否删除',
index idx_userId (userId)
) comment '帖子' collate = utf8mb4_unicode_ci;

ES Mapping:

id(可以不放到字段设置里)

ES 中,尽量存放需要用户筛选(搜索)的数据

  • aliases:别名(为了后续方便数据迁移)

​ 字段类型时 text,这个字段是可被分词的、可模糊查询的;而如果是 keyword,只能完全匹配、精确查询。

  • analyzer (存储时生效的分词器):用 ik_max_word ,拆的更碎、索引更多,更有可能被搜索出来

  • search_analyzer (查询时生效的分词器):用 ik_smart ,更偏向于用户想搜的分词

如果想让 text 类型的分词字段也支持精确查询,可以创建 keyword 类型的子字段:

1
2
3
4
5
6
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}

建表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
PUT post_v1
{
"aliases": {
"post": {}
},
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tags": {
"type": "keyword"
},
"userId": {
"type": "keyword"
},
"createTime": {
"type": "date"
},
"updateTime": {
"type": "date"
},
"isDelete": {
"type": "keyword"
}
}
}
}

2、增删改查

第一种方式:ElasticsearchRepository<PostEsDTO, Long>, 默认提供了简单的增删改查,多用于可预期的、相对诶那么复杂的查询、自定义查询,返回结果相对简单直接。

接口代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface CrudRepository<T, ID> extends Repository<T, ID> {

<S extends T> S save(S entity);

<S extends T> Iterable<S> saveAll(Iterable<S> entities);

Optional<T> findById(ID id);

boolean existsById(ID id);

Iterable<T> findAll();

Iterable<T> findAllById(Iterable<ID> ids);

long count();

void deleteById(ID id);

void delete(T entity);

void deleteAllById(Iterable<? extends ID> ids);

void deleteAll(Iterable<? extends T> entities);

void deleteAll();
}

ES 中,_开头的字段表示系统默认字段,比如 _id,如果系统不指定,会自动生成。但是不会在 _source 字段中补充id 的值,所以建议手动指定。

支持根据方法名自动生成方法,比如:

1
List<PostEsDTO> findByTitle(String title);

第二种方式:Spring 默认给我们提供的操作 ES 的客户端对象 ElasticsearchRestTemplate ,也提供了增删改查,它的增删改查更灵活,适用于更复杂的操作,返回结果更完整,但需要自己解析。

对于复杂的查询,建议使用第二种方式

三个步骤:

  1. 取参数
  2. 把参数组合为 ES 支持的搜索条件
  3. 从返回值中取结果

3、查询 DSL

参考文档:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET post/_search
{
"query": {
"bool": {
"must": [
{ "match": { "title": "墨枫" }},
{ "match": { "content": "博客" }}
],
"filter": [
{ "term": { "status": "published" }},
{ "range": { "publish_date": { "gte": "2023-05-02" }}}
]
}
}
}

wilcard 模糊查询

regexp 正则表达式

查询结果中,score 代码匹配分数

建议先测试 DSL,再翻译成 Java

动静分离设计:先模糊筛选竟然数据,查出数据后,再根据查到的内容 id 去数据库查找到 动态数据

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊查询,但是数据是存放在数据库 MySQL 里的,所以说需要把 MySQL 的数据和 ES 的数据进行同步,保证数据的一致性(以 MySQL为主)

MySQL => ES (单向)

首次安装完 ES ,把 MySQL数据全量同步到 ES 里,写一个单词脚本

4 种方式,全量同步(首次) + 增量同步(新数据):

  1. 定时任务,比如 1 分钟 1 次,找到 MySQL 过去几分钟内(至少定时周期的 2 倍)发生改变的数据,然后更新到 ES
    优点:简单易懂、占用资源少、不用引入第三方中间件
    缺点:有时间差
    应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
  2. 双写:写数据的时候,必须也去写 ES;更新删除数据库同理。(事务:建议先保证 MySQL 写成功,如果 ES 写失败了,可以通过定时任务 + 日志 + 告警进行检测和修复(补偿)
  3. 用 Logstash 数据同步管道(一般要配合 kafka 消息队列 + beats 采集器)
  4. Canal 监听 MySQL Binlog, 实时同步

Logstash

传输处理数据的管道

https://www.elastic.co/guide/en/logstash/7.17/getting-started-with-logstash.html

好处:用起来方便,插件多

缺点:成本更大、一般要配合其他组件使用(比如 kafka)

image-20230503223832634

事件 DEMO:

1
2
cd logstash-7.17.9
.\bin\logstash.bat -e "input { stdin {} } output { stdout{} }"

快速开始文档:https://www.elastic.co/guide/en/logstash/7.17/running-logstash-windows.html

监听 udp 并输出:

1
2
3
4
5
6
7
8
9
10
11
12
input{
udp{
port => 514
type => "syslog"
}
}

output{
stdout{
codec => rubydebug
}
}

要把 MySQL 同步给 Elasticsearch

问题1:找不到 mysql 包

1
2
3
Error:unable to load mysql-connector-java-5.1.36-bin.jar from :jdbc_driver_library,file not readable
(please check user and group permissions for the path)
Exception:LogStash:PluginLoadingError

解决:修改 Logstash 任务配置中的 jdbc_driver_library 为驱动包的绝对路径(驱动包可以从 maven 仓库中拷贝)

增量配置:是不是可以只查最新更新的数据?可以记录上次更新的时间,只查出来 > 该更新时间的数据

小知识:预编译 SQL 的优点

  1. 灵活
  2. 模板好懂
  3. 快(有缓存)
  4. 部分仿注入

sql_last_value 是上次查到额数据的最后一行的指定的字段,如果要全量更新,只要删除掉

D:\javaTools\ElasticStack\logstash-7.17.9\data\plugins\inputs\jdbc\logstash_jdbc_last_run 文件即可(这个文件存储额上次同步到的数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
input {
jdbc {
jdbc_driver_library => "D:\javaTools\ElasticStack\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
jdbc_user => "root"
jdbc_password => "root"
statement => "SELECT * from post where updateTime > :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updatetime"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "*/5 * * * *"
jdbc_default_timezone => "Asia/Shanghai"
}
}

output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "http://localhost:9200"
index => "post_v1"
document_id => "%{id}"

}
}

注意查询语句中要按 updateTime 排序,保证最后一条是最大的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
input {
jdbc {
jdbc_driver_library => "D:\javaTools\ElasticStack\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
jdbc_user => "root"
jdbc_password => "root"
statement => "SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updatetime"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "*/5 * * * *"
jdbc_default_timezone => "Asia/Shanghai"
}
}

output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "http://localhost:9200"
index => "post_v1"
document_id => "%{id}"

}
}

两个问题:

  1. 字段全变成小写了
  2. 多了一些我们不想同步的字段

解决:

编写过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
input {
jdbc {
jdbc_driver_library => "D:\javaTools\ElasticStack\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
jdbc_user => "root"
jdbc_password => "root"
statement => "SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updatetime"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "*/5 * * * *"
jdbc_default_timezone => "Asia/Shanghai"
}
}

filter {
mutate {
rename => {
"updatetime" => "updateTime"
"userid" => "userId"
"createtime" => "createTime"
"isdelete" => "isDelete"
}
remove_field => ["thumbnum", "favournum"]
}
}


output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "http://localhost:9200"
index => "post_v1"
document_id => "%{id}"

}
}

订阅数据库流水的同步方式 Canal

https://github.com/alibaba/canal/

优点:实时同步,实时性非常强

原理:数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间饿到消息并处理

canal:帮你监听 binlog,并解析 binlog 为可以理解的内容

它伪装成了 MySQL 的从节点,获取主节点给的 binlog ,如图:

image-20230503230056433

快速开始:https://github.com/alibaba/canal/wiki/QuickStart

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

如果 java 找不到,修改 startup.bat 脚本为你自己的 java home:

1
2
3
4
set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302
echo %JAVA_HOME%
set PATH=%JAVA_HOME%\bin;%PATH%
echo %PATH%

问题:mysql 无法链接,Caused by: java.io.IOException: caching_sha2_password Auth failed

解决方案:https://github.com/alibaba/canal/issues/3902

1
2
3
4
5
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;

FLUSH PRIVILEGES;

配置 kibama 可视化看板

  1. 创建索引
  2. 导入数据
  3. 创建索引模式
  4. 选择图表、托拉拽
  5. 保存

压力测试

官方文档:https://jmeter.apache.org/

找到 jar 包:D:\javaTools\apache-jmeter-5.5\bin\ApacheJMeter.jar 启动

配置线程组 => 请求头 => 默认请求 => 单个请求头 => 响应断言 => 聚合报告 / 结果树

image-20230503230921181

99% 分位:99% 的用户都在这个响应时间内

吞吐量:每秒处理的请求数 qps

更多的学习

插件:https://jmeter-plugins.org/install/Install/

下载后文件为 plugins-manager.jar 格式,将其放入 jmeter 安装目录下的 lib/ext 目录,然后 jmeter,即可。

参考文章:https://blog.csdn.net/weixin_45189665/article/details/125278218

搜索建议

参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-suggesters.html

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
GET post/_search
{
"query": {
"match": { "content": "墨枫是狗" }
},
"highlight": {
"fields": {
"content": {
"pre_tags" : ["<h1>"],"post_tags" : ["<h1>"]
}
}
},
"suggest" : {
"my-suggestion" : {
"text" : "墨枫是狗",
"term" : {
"field" : "content"
}
}
}
}

搜索高亮

参考官方:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/highlighting.html

1
2
3
4
5
6
7
8
9
10
11
12
13
GET post/_search
{
"query": {
"match": { "content": "鱼皮是狗" }
},
"highlight": {
"fields": {
"content": {
"pre_tags" : ["<h1>"],"post_tags" : ["<h1>"]
}
}
}
}

前端防抖节流

问题:用户频繁搜索、频繁点击欧索按钮怎么办?

解决:使用 lodash 工具库实现防抖和节流

节流:每段时间最多执行 x 次(比如服务器限流)https://www.lodashjs.com/docs/lodash.throttle

防抖:等待一段时间内没有其他操作了,才执行操作(比如输入搜索)

https://www.lodashjs.com/docs/lodash.debounce

接口稳定性优化

问题:调用第三方接口不稳定怎么办?(比如 bing 接口)

使用 guava-retrying 库实现自动重试:https://github.com/rholder/guava-retrying

参考学习文章:https://cloud.tencent.com/developer/article/1752086