redis

NoSQL数据库简介

主要解决性能问题

Web1.0 的时代,数据访问量很有限,用一夫当关的高性能的单点服务器可以解决大部分问题。

随着 Web2.0 的时代的到来,用户访问量大幅度提升,同时产生了大量的用户数据。加上后来的智能移动设备的普及,所有的互联网平台都面临了巨大的性能挑战。

image-20220607112049192

NoSQL的出现解决了解决 CPU 及内存压力 和 IO 压力

解决 CPU 及内存压力

将服务器进行 集群 或 分布式 部署,也就是多台服务器。通过nginx(负载均衡)进行分配服务器。

但存在一个问题,就是当用户访问服务器时,例如,分配了第一个服务器,将session存储到服务器1中,但当第二次访问时,分配的是第二台服务器,此时第二台服务器没有session对象,怎么办?

image-20220607112137349

NoSql可以减少cpu和io的压力,直接通过内存进行读取,nosql可以缓存数据库,提高访问速度,减少io操作

解决 IO 压力

image-20220607124939134

NoSQL 数据库概述

非关系型的数据库 。NoSQL 不依赖业务逻辑方式存储,而以简单的 key-value 模式存储。因此大大的增加了数据库的扩展能力。

  • 不遵循 SQL 标准。
  • 不支持 ACID。(原子性,一致性,隔离性,持久性 )
  • 远超于 SQL 的性能。

NoSQL 适用场景

  • 数据高并发的读写
  • 海量数据的读写
  • 对数据高可扩展性的

NoSQL 不适用场景

  • 需要事务支持
  • 基于 sql 的结构化查询存储,处理复杂的关系,需要即席查询。
  • (用不着 sql 的和用了 sql 也不行的情况,请考虑用 NoSql)

NoSQL具体的数据库:

(1)Memcache

  • 很早出现的 NoSql 数据库
  • 数据都在内存中, 一般不持久化
  • 支持简单的 key-value 模式, 支持类型单一
  • 一般是作为缓存数据库辅助持久化的数据库

(2) Redis

  • 几乎覆盖了 Memcached 的绝大部分功能
  • 数据都在内存中, 支持持久化, 主要用作备份恢复
  • 除了支持简单的 key-value 模式, 还支持多种数据结构 的存储, 比如 list、 set、 hash、 zset 等。
  • 一般是作为缓存数据库辅助持久化的数据库

(3) MongoDB

  • 数据都在内存中, 如果内存不足, 把不常用的数据保 存到硬盘
  • 虽 然 是 key-value 模 式 , 但 是 对 value ( 尤 其 是 json) 提供了丰富的查询功能  支持二进制数据及大型对象 
  • 可以根据数据的特点替代 RDBMS , 成为独立的数据 库。 或者配合 RDBMS, 存储特定的数据。

Redis安装

(1)在官网下载稳定版本

(2)在linux环境下进行安装,eg:redis-6.2.1.tar.gz

(3)在linux环境 安装 C 语言的编译环境

  • yum install centos-release-scl scl-utils-build
  • yum install -y devtoolset-8-toolchain
  • scl enable devtoolset-8 bash

如果在Centos7下,直接安装gcc就可以yum install gcc

测试gcc版本 : gcc --version (GCC是一个用于linux系统下编程的编译器)

image-20220607125734946

(4)下载 redis-6.2.1.tar.gz 放/opt/redis 目录 中(运用文件传输软件,例如Xftp)

(5)解压命令tar -zxvf redis-6.2.1.tar.gz

(6)解压完成后进入目录: cd redis-6.2.1

(7)在 redis-6.2.1 目录下执行 make 命令

(8)如果没有准备好 C 语言编译环境, make 会报错—Jemalloc/jemalloc.h: 没有那个文件

  • 解决方案: 运行 make distclean 后再次执行make

(9)跳过 make test 继续执行: make install

(10)默认的安装目录为/usr/local/bin

查看默认安装目录:

  • redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何
  • redis-check-aof:修复有问题的 AOF 文件,rdb 和 aof 后面讲
  • redis-check-dump:修复有问题的 dump.rdb 文件
  • redis-sentinel:Redis 集群使用
  • redis-server:Redis 服务器启动命令
  • redis-cli:客户端,操作入口

(11)后台启动

  • 备份redis.conf : cp /opt/redis-3.2.5/redis.conf /etc/ redis.conf

(12)后台启动设置 daemonize no 改成 yes :修改 redis.conf(128 行)文件将里面的 daemonize no 改成 yes,让服务在后台启动

(13)Redis 启动redis-server /etc/redis.conf

(14)用客户端访问: redis-cli

(15)测试验证: ping

image-20220607131718774

(16)Redis 关闭:

  • 单实例关闭:redis-cli shutdown
  • 也可以通过关闭进程进行关闭

image-20220607131847688

(17)给redis设置密码:

修改redis.conf配置文件中的requirepass,修改为如下:

1
2
# requirepass foobared
requirepass Zlw199805

Redis 介绍相关知识

redis的底层使用的是:单线程+多路IO服用 :

Redis支持多线程主要有两个原因:

  • 可以充分利用服务器CPU资源,目前主线程只能利用一个核。
  • 多线程任务可以分摊Redis同步IO读写负荷。

image-20220607133051004

黄牛还没有买到票的时候,123个人都在做自己的事情,而不是一直等待着票。

Redis中常用的五大数据类型

Redis键(Key)

(1)keys * :查看当前库中所有的key

image-20220607134236942

(2)exists key :判断某个 key 是否存在

(3)type key:查看你的 key 是什么类型

(4)del key :删除指定的 key 数据 / unlink key 根据 value 选择非阻塞删除

(5)expire key 10 :10 秒钟:为给定的 key 设置过期时间

(6)ttl key :查看还有多少秒过期,-1 表示永不过期,-2 表示已过期

(7)select :命令切换数据库

(8)dbsize :查看当前数据库的 key 的数量

(9) flushdb :清空当前库 / flushall :通杀全部库

Redis 字符串(String)

String 类型是二进制安全的(内容能用字符串表示的 )。意味着 Redis 的 string 可以包含任何数据。比如 jpg 图片或者序列化的对象。

String 类型是 Redis 最基本的数据类型,一个 Redis 中字符串 value 最多可以是 512M

常用命令

(1)set <key><value>:添加键值对(内容能用字符串表示的)如果设置相同的key,则前面的value被覆盖

(2)get <key>:查询对应键值

(3)append <key><value>:将给定的 追加到原值的末尾

(4)strlen <key>:获得值的长度

(5)setnx <key><value>:只有在 key 不存在时 设置 key 的值

(6)incr <key> :将 key 中储存的数字值增 1 (只能对数字操作)

(7)decr <key> :将 key 中储存的数字值减 1 (只能对数字操作)

(8) incrby / decrby <key><步长>:将 key 中储存的数字值增减。自定义步长。

  • 是原子性的:所谓原子操作是指不会被线程调度机制打断的操作;
  • (1)在单线程中, 能够在单条指令中完成的操作都可以认为是”原子操作”,因为中断只能发生于指令之间。
  • (2)在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。
  • Redis 单命令的原子性主要得益于 Redis 的单线程。

(9)mset <key1><value1><key2><value2> .....同时设置一个或多个 key-value 对

(10)mget <key1><key2><key3> .....同时获取一个或多个 value

(11)msetnx <key1><value1><key2><value2> ..... :同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在

(12)getrange <key><起始位置><结束位置>: 获得值的范围

(13)setrange <key><起始位置><value> :用 覆写所储存的字符串值,从<起始位置>开始(索引从 0 开始)。

(14)setex <key><过期时间><value>:设置键值的同时,设置过期时间,单位秒

(15)getset <key><value>:以新换旧,设置了新值同时获得旧值。

数据结构:

String 的数据结构为简单动态字符串(Simple Dynamic String,缩写 SDS)。是可以修改的字符串,内部结构实现上类似于 Java 的 ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配.

image-20220607151814346

内部为当前字符串实际分配的空间 capacity 一般要高于实际字符串长度len。如果字符串小于1M,扩容加倍现有的空间,如果超过1M,扩容时多扩1M空间。 需要注意的是字符串最大长度为 512M 。

Redis 列表(List)

单键多值 按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边) 它的底层实际是个双向链表,

常用命令

(1)lpush/rpush <key><value1><value2><value3> ....: 从左边/右边插入一个或多个值。
(2)lpop/rpop <key>从左边/右边吐出一个值。值在键在,值光键亡。
(3)rpoplpush <key1><key2>列表右边吐出一个值,插到列表左边。
(4)lrange <key><start><stop>按照索引下标获得元素(从左到右)
(5)lrange mylist 0 -1 0 左边第一个,-1 右边第一个,(0-1 表示获取所有)
(6)lindex <key><index>按照索引下标获得元素(从左到右)
(7)llen <key>获得列表长度
(8)linsert <key> before <value><newvalue>的后面插入插入值
(9)lrem <key><n><value>从左边删除 n 个 value(从左到右)
(10)lset<key><index><value>将列表 key 下标为 index 的值替换成 value

数据结构

List 的数据结构为快速链表 quickList。

当列表元素比较少的情况下,会使用一块连续的内存存储,这个结构是 ziplist,也即是压缩列表 。因为普通的链表需要的附加指针空间太大,会比较浪费空间。

当数据量比较多的时候才会改成 quicklist

Redis 将链表和 ziplist 结合起来组成了 quicklist。也就是将多个 ziplist 使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。

Redis 集合(Set)

特殊之处在于 set 是可以自动排重的

常用命令

sadd <key><value1><value2> .....将一个或多个 member 元素加入到集合 key 中,已经存在的 member 元素将被忽略
smembers <key>取出该集合的所有值。
sismember <key><value>判断集合是否为含有该值,有 1,没有 0
scard<key>返回该集合的元素个数。

srem <key><value1><value2> .... 删除集合中的某个元素。
spop <key>随机从该集合中吐出一个值。
srandmember <key><n>随机从该集合中取出 n 个值。不会从集合中删除 。
smove <source><destination>value 把集合中一个值从一个集合移动到另一个集合
sinter <key1><key2>返回两个集合的交集元素。
sunion <key1><key2>返回两个集合的并集元素。
sdiff <key1><key2>返回两个集合的差集元素(key1 中的,不包含 key2 中的)

数据结构

Set 数据结构是 dict 字典,字典是用哈希表实现的

Redis 哈希(Hash)

Redis hash 是一个键值对集合。Redis hash 是一个 string 类型的 field 和 value 的映射表,hash 特别适合用于存储对象。

image-20220607155709655

常用命令

hset <key><field><value>:给集合中的 键赋值
hget <key1><field>:从集合取出 value
hmset <key1><field1><value1><field2><value2>... :批量设置 hash 的值
hexists<key1><field>:查看哈希表 key 中, 给定域 field 是否存在。
hkeys <key>:列出该 hash 集合的所有 field
hvals <key>:列出该 hash 集合的所有 value
hincrby <key><field><increment>:为哈希表 key 中的域 field 的值加上增量 1 -1
hsetnx <key><field><value>:将哈希表 key 中的域 field 的值设置为 value , 当且仅当域field 不存在

数据结构

Hash 类型对应的数据结构是两种: ziplist(压缩列表), hashtable(哈希表)。 当field-value 长度较短且个数较少时, 使用 ziplist, 否则使用 hashtable。

Redis 有序集合 Zset(sorted set)

Redis 有序集合 zset 与普通集合 set 非常相似,是一个没有重复元素的字符串集合。
不同之处是有序集合的每个成员都关联了一个评分(score),这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了 。

常用命令

zadd <key><score1><value1><score2><value2>…将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
zrange <key><start><stop> [WITHSCORES]返回有序集 key 中,下标在之间的元素带 WITHSCORES,可以让分数一起和值返回到结果集。
zrangebyscore key minmax [withscores] [limit offset count]返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
zrevrangebyscore key maxmin [withscores] [limit offset count]同上,改为从大到小排列。

zincrby <key><increment><value> 为元素的 score 加上增量
zrem <key><value>删除该集合下,指定值的元素
zcount <key><min><max>统计该集合,分数区间内的元素个数
zrank <key><value>返回该值在集合中的排名,从 0 开始。

数据结构

zset 底层使用了两个数据结构
(1)hash,hash 的作用就是关联元素 value 和权重 score,保障元素 value 的唯一性,可以通过元素 value 找到相应的 score 值。
(2)跳跃表,跳跃表的目的在于给元素 value 排序,根据 score 的范围获取元素列表。

image-20220607161721235

从此可以看出跳跃表比有序链表效率要高

Redis 的发布和订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者(sub) 接收消息

当订阅者订阅A频道时,A频道发布消息,则订阅者就能收到,如果B频道也发布消息,但订阅者没有订阅,则不会收到。

image-20220607221305094

命令行实现:

(1)打开客户端的订阅 channel1 :SUBSCRIBE channel1

(2)打开另一个客户端,给 channel1 发布消息 hello :publish channel1 hello

(3)打开第一个客户端,可以看到发送的消息

image-20220607221433147

Redis 新数据类型

Bitmaps

在我们平时的开发过程中,会有一些bool类型数据需要存取,比如用户一年的签到记录,签了是1,没签是0,要记录365天。如果使用普通的key/value,每个用户要记录365个,当用户数上亿的时候,需要的存储空间是惊人的。

为了解决这个问题,Redis提供了位图数据结构,这样每天的签到记录只占据了一个位,365天就是365个位,46个字节(一个稍长一点的字符串)就可以完全容纳下来,这就大大节约了存储空间。位图的最小单位是bit,每个bit的取值只能是0或1。

Redis 提供了 Bitmaps 这个“数据类型”可以实现对位的操作:

(1) Bitmaps 本身不是一种数据类型, 实际上它就是字符串(key-value) ,但是它可以对字符串的位进行操作。
(2) Bitmaps 单独提供了一套命令, 所以在 Redis 中使用 Bitmaps 和使用字符串的方法不太相同。 可以把 Bitmaps 想象成一个以位为单位的数组,数组的每个单元只能存储 0 和 1, 数组的下标在 Bitmaps 中叫做偏移量。

常用命令:

(1)setbit

setbit<key><offset><value>设置 Bitmaps 中某个偏移量的值(0 或 1)

实例:
每个独立用户是否访问过网站存放在 Bitmaps 中, 将访问的用户记做 1, 没有访问的用户记做 0, 用偏移量作为用户的 id。

​ 设置键的第 offset 个位的值(从 0 算起) , 假设现在有 20 个用户,userid=1,6, 11, 15, 19 的用户对网站进行了访问

image-20220608094714290

注意:

​ 由于很多用户id 以一个指定数字(例如 10000) 开头, 直接将用户 id 和Bitmaps 的偏移量对应势必会造成一定的浪费, 通常的做法是每次做 setbit 操作时将用户 id 减去这个指定数字 。

(2)getbit

getbit<key><offset>获取 Bitmaps 中某个偏移量的值

获取用户id为,15,18,19的用户是否被访问过,如果被访问过值为1,否则为0.

image-20220608100244411

(3)bitcount

统计字符串被设置为 1 的 bit 数 一般情况下,给定的整个字符串都会被进行计数,通过指定额外的 start 或 end 参数,

start 和 end 参数的设置,都可以使用负数值:比如 -1 表示最后一个位,而 -2 表示倒数第二个位,start、end 是指 bit 组的字节的下标数,二者皆包含 。

bitcount<key>[start end] 统计字符串从 start 字节到 end 字节比特值为 1 的数量

实例:

计算用户 id 在第 1 个字节到第 3 个字

image-20220608100603141

举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】

image-20220608101336433

(4)bitop

bitop and(or/not/xor) <destkey> [key…] : bitop 是一个复合操作, 它可以做多个 Bitmaps 的 and(交集) 、 or(并集) 、 not
(非) 、 xor(异或) 操作并将结果保存在 destkey 中 。

**实例 :计算出两天都访问过网站的用户数量 **(使用and)

​ 2020-11-04 日访问网站的 userid=1,2,5,9。

​ 2020-11-03 日访问网站的 userid=0,1,4,9 。

image-20220608101808071

image-20220608101833339

Bitmaps 与 set 对比

​ 在存储活跃用户时,bitmap要比set要大大节省空间。

image-20220608101952307

但如果该网站每天的独立访问用户很少, 例如只有 10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps 就不太合适了, 因为基本上大部分位都是 0 。

HyperLogLog

在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站 PV(PageView 页面访问量),可以使用 Redis 的 incr、incrby 轻松实现

用于解决UV(独立访客 )独立 IP 数、搜索记录数等需要去重和计数的问题

主要用于基数计算的操作,帮助去重复 ,解决去重问题,有很多方案,为什么要新提出一个这个,例如:

(1)数据存储在 MySQL 表中,使用 distinct count 计算不重复个数
(2)使用 Redis 提供的 hash、set、bitmaps 等数据结构来处理

以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,对于非常大的数据集是不切实际的。

HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的

因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

什么是基数? 比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8},基数(不重复元素)为 5。 基数估计就是在误差可接受的范围内,快速计算基数。

命令:

(1)pfadd

pfadd <key>< element> [element ...] 添加指定元素到 HyperLogLog 中

image-20220608103326649

(2)pfcount

pfcount<key> [key ...] 计算 HLL 的近似基数 ,可以计算多个

image-20220608103423389

(3)pfmerge

pfmerge<destkey><sourcekey> [sourcekey ...] 将一个或多个 HLL 合并后的结果存储在另一个 HLL 中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得

image-20220608103630010

Geospatial

该类型,就是元素的 2 维坐标,在地图上就是经纬度。redis 基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度 Hash 等常见操作。

命令

(1)getadd

geoadd<key>< longitude><latitude><member> [longitude latitude member...] 添加地理位置(经度,纬度,名称)

image-20220608104301235

(2)geopos

geopos <key><member> [member...] 获得指定地区的坐标值

image-20220608104401677

(3)geodist

geodist<key><member1><member2> [m|km|ft|mi ] 获取两个位置之间的直线距离

image-20220608104503203

  • m 表示单位为米[默认值]。
  • km 表示单位为千米。
  • mi 表示单位为英里。
  • ft 表示单位为英尺。
  • 如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位

(4)georadius

georadius<key>< longitude><latitude>radius m|km|ft|mi 以给定的经纬度为中心,找出某一半径内的元素

image-20220608104720055

Redis_Jedis_测试

Jedis 所需要的 jar 包

引入依赖:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>

连接 Redis 注意事项

  • 禁用 Linux 的防火墙:Linux(CentOS7)里执行命令
  • systemctl stop/disable firewalld.service
  • redis.conf 中注释掉 bind 127.0.0.1 ,然后 protected-mode no

Jedis 常用操作

(1)创建测试程序

1
2
3
4
5
6
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
String ping = jedis.ping();
System.out.println(ping);
}

(2)测试:Jedis-API: Key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 测试:key
*/
@Test
public void testKey(){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
jedis.set("k1","v1");
jedis.set("k2","v2");
jedis.set("k3","v3");
String k2 = jedis.get("k2");
System.out.println(k2);
Set<String> keys = jedis.keys("*");
for (String key:keys){
System.out.print(key+" ");
}
System.out.println();
System.out.println(jedis.exists("k3"));
System.out.println(jedis.ttl("k2"));//-1 表示永不过期
}

image-20220608114625329

(3)测试 :Jedis-API: String

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 测试String
*/
@Test
public void StringTest(){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
jedis.flushDB();
jedis.set("name","zlw");
jedis.mset("name1","jack","name2","mary");
String name = jedis.get("name");
System.out.println(name);
List<String> name1 = jedis.mget("name1","name2");
System.out.println(name1);
Long len = jedis.strlen("name");
System.out.println(len);

}

image-20220608134027626

(4)测试:Jedis-API: List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 测试list
*/
@Test
public void ListTest(){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
jedis.flushDB();
jedis.lpush("name", "zlw", "hhh", "aaa");
String name = jedis.lpop("name");
System.out.println(name);
List<String> name1 = jedis.lrange("name", 0, -1);
System.out.println(name1);
Long len = jedis.llen("name");
System.out.println(len);
}

image-20220608133540388

(5)测试:Jedis-API: set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 测试set
*/
@Test
public void setTest(){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
jedis.flushDB();
jedis.sadd("name","jack","mary","zlw");
Set<String> name = jedis.smembers("name");
System.out.println(name);
//判断集合中是否有:zlw
System.out.println(jedis.sismember("name", "zlw"));
//返回集合中元素的数量
Long len = jedis.scard("name");
System.out.println(len);
//删除集合中的元素
jedis.srem("name", "zlw");
Set<String> name1 = jedis.smembers("name");
System.out.println(name1);
}

image-20220608134454960

(6)测试:Jedis-API: hash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 测试hash
*/
@Test
public void hashTest(){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
jedis.flushDB();
jedis.hset("name","zlw","20");
System.out.println(jedis.hget("name", "zlw"));
Map<String,String> map = new HashMap<>();
map.put("jack","22");
map.put("mary","21");
jedis.hset("names",map);
System.out.println(jedis.hmget("names","jack","mary"));
//查看是否有元素
System.out.println(jedis.hexists("name", "zlw"));
//列出key中所有的filed
System.out.println(jedis.hkeys("names"));
//列出key中所有的value
System.out.println(jedis.hvals("names"));
}

image-20220608135235012

(7)测试:Jedis-API: zset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 测试zset
*/
@Test
public void zsetTest(){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
jedis.flushDB();
jedis.zadd("name1",100d,"zlw");
jedis.zadd("name1",99d,"jack");
jedis.zadd("name1",98d,"mary");
System.out.println(jedis.zrange("name1", 0, -1));
System.out.println(jedis.zrangeByScore("name1", 99, 100));
jedis.zincrby("name1",100,"mary");
System.out.println(jedis.zrange("name1", 0, -1));
}

image-20220608140213830

Redis_Jedis_实例 (完成手机验证)

完成一个手机验证码功能

要求:

(1)输入手机号,点击发送后随机生成 6 位数字码,2 分钟有效

(2)输入验证码,点击验证,返回成功或失败

(3)每个手机号每天只能输入 3 次

对要求的分析:

image-20220608140726491

代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.zlw.redis;

import redis.clients.jedis.Jedis;

import java.util.Random;

public class PhoneCode {
public static void main(String[] args) {
String phone = "18840613021";
PhoneCode p = new PhoneCode();
String code = p.getCode();
// String code = "399675";
// p.VerCode(phone,code);
p.setCodeToRedis(phone,code);

}
//1.生成随机6位数字验证
public String getCode(){
String code = "";
Random random = new Random();
for (int i = 0;i<6;i++){
code +=random.nextInt(10);
}
return code;
}

//将发送验证码的数量放入到redis中,设置每个手机一天之内只能发送三次,放入redis中,设置验证码在2分钟内有效,
public void setCodeToRedis(String phone, String code){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
//验证码数量的key
String codeCount = phone + ":count";
//验证码两分钟有效的key
String codeVer = phone + ":ver";

if (jedis.get(codeCount)==null){
jedis.setex(codeCount,24*60*60,"1");
}else if (Integer.parseInt(jedis.get(codeCount) )<=2){
jedis.incr(codeCount);
}else if (Integer.parseInt(jedis.get(codeCount) )>2){
System.out.println("今日发送已经超过三日,明日再来吧~~");
return;//如果发送失败了,就不会将验证码存入到redis中了,因此结束return;
}

jedis.setex(codeVer,120,code);
jedis.close();
}

//验证 验证码和输入的是否相等
public void VerCode(String phone,String code){
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
String codeVer = phone + ":ver";
String code1 = jedis.get(codeVer);
if (code1.equals(code)){
System.out.println("成功");
}else {
System.out.println("失败");
}
jedis.close();
}

}

Redis 与 Spring Boot 整合

整合步骤:

(1)在 pom.xml 文件中引入 redis 相关依赖

1
2
3
4
5
6
7
8
9
10
11
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X 集成 redis 所需 common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>

(2)application.properties 配置 redis 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#Redis 服务器地址
spring.redis.host=192.168.140.136
#Redis 服务器连接端口
spring.redis.port=6379
#Redis 数据库索引( 默认为 0
spring.redis.database= 0
#连接超时时间( 毫秒)
spring.redis.timeout=1800000
#连接池最大连接数( 使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

(3)添加 redis 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key 序列化方式
template.setKeySerializer(redisSerializer);
//value 序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap 序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化( 解决乱码的问题) ,过期时间 600 秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().
entryTtl(Duration.ofSeconds(600)).
serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer)).
serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory).cacheDefaults(config).build();
return cacheManager;
}
}

瑞吉外卖中的配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Redis配置类
*/

@Configuration
public class RedisConfig extends CachingConfigurerSupport {

@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {

RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();

//默认的Key序列化器为:JdkSerializationRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());

redisTemplate.setConnectionFactory(connectionFactory);

return redisTemplate;
}

}

(4)测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping
public String testRedis() {
//设置值到 redis
redisTemplate.opsForValue().set("name","lucy");
//从 redis 获取值
String name = (String)redisTemplate.opsForValue().get("name");
return name;
}
}

瑞吉外卖中的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.*;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringDataRedisTest {

@Autowired
private RedisTemplate redisTemplate;

/**
* 操作String类型数据
*/
@Test
public void testString(){
redisTemplate.opsForValue().set("city123","beijing");

String value = (String) redisTemplate.opsForValue().get("city123");
System.out.println(value);

redisTemplate.opsForValue().set("key1","value1",10l, TimeUnit.SECONDS);

Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent("city1234", "nanjing");
System.out.println(aBoolean);
}

/**
* 操作Hash类型数据
*/
@Test
public void testHash(){
HashOperations hashOperations = redisTemplate.opsForHash();

//存值
hashOperations.put("002","name","xiaoming");
hashOperations.put("002","age","20");
hashOperations.put("002","address","bj");

//取值
String age = (String) hashOperations.get("002", "age");
System.out.println(age);

//获得hash结构中的所有字段
Set keys = hashOperations.keys("002");
for (Object key : keys) {
System.out.println(key);
}

//获得hash结构中的所有值
List values = hashOperations.values("002");
for (Object value : values) {
System.out.println(value);
}
}

/**
* 操作List类型的数据
*/
@Test
public void testList(){
ListOperations listOperations = redisTemplate.opsForList();

//存值
listOperations.leftPush("mylist","a");
listOperations.leftPushAll("mylist","b","c","d");

//取值
List<String> mylist = listOperations.range("mylist", 0, -1);
for (String value : mylist) {
System.out.println(value);
}

//获得列表长度 llen
Long size = listOperations.size("mylist");
int lSize = size.intValue();
for (int i = 0; i < lSize; i++) {
//出队列
String element = (String) listOperations.rightPop("mylist");
System.out.println(element);
}
}

/**
* 操作Set类型的数据
*/
@Test
public void testSet(){
SetOperations setOperations = redisTemplate.opsForSet();

//存值
setOperations.add("myset","a","b","c","a");

//取值
Set<String> myset = setOperations.members("myset");
for (String o : myset) {
System.out.println(o);
}

//删除成员
setOperations.remove("myset","a","b");

//取值
myset = setOperations.members("myset");
for (String o : myset) {
System.out.println(o);
}

}

/**
* 操作ZSet类型的数据
*/
@Test
public void testZset(){
ZSetOperations zSetOperations = redisTemplate.opsForZSet();

//存值
zSetOperations.add("myZset","a",10.0);
zSetOperations.add("myZset","b",11.0);
zSetOperations.add("myZset","c",12.0);
zSetOperations.add("myZset","a",13.0);

//取值
Set<String> myZset = zSetOperations.range("myZset", 0, -1);
for (String s : myZset) {
System.out.println(s);
}

//修改分数
zSetOperations.incrementScore("myZset","b",20.0);

//取值
myZset = zSetOperations.range("myZset", 0, -1);
for (String s : myZset) {
System.out.println(s);
}

//删除成员
zSetOperations.remove("myZset","a","b");

//取值
myZset = zSetOperations.range("myZset", 0, -1);
for (String s : myZset) {
System.out.println(s);
}
}

/**
* 通用操作,针对不同的数据类型都可以操作
*/
@Test
public void testCommon(){
//获取Redis中所有的key
Set<String> keys = redisTemplate.keys("*");
for (String key : keys) {
System.out.println(key);
}

//判断某个key是否存在
Boolean itcast = redisTemplate.hasKey("itcast");
System.out.println(itcast);

//删除指定key
redisTemplate.delete("myZset");

//获取指定key对应的value的数据类型
DataType dataType = redisTemplate.type("myset");
System.out.println(dataType.name());

}
}

Redis_事务

Redis 事务的主要作用就是串联多个命令防止别的命令插队

从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec 后,Redis 会将之前的命令队列中的命令依次执行。组队的过程中可以通过 discard 来放弃组队。

image-20220609132109343

事务的错误处理

  • 组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消
  • 如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚 。因此Redis不具有原子性。

悲观锁和乐观锁

悲观锁

​ 就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会 block 直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

image-20220609132539677

乐观锁

就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。 乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set 机制实现事务的。

WATCH key [key …]

在执行 multi 之前,先执行 watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断

unwatch :取消 WATCH 命令对所有 key 的监视

Redis事务的相关指令

(1)WATCH命令:乐观锁,可以为Redis事务提供check-and-set(CAS)行为。可以监控一个或多个键,一旦有一个被修改和删除,之后的事务不会执行。
(2)MULTI:开启一个事务,MULTI执行后,客户端可以向服务器发送多条命令,这些命令不会立即执行,而时被放入到一个队列中,当EXEC命令被调用时,所有的队列命令才会执行。

(3)EXEC:执行事务块内的命令,返回事务块内所有命令的返回值,按命令执行先后顺序排列,操作被打断时,返回nil。

(4)UNWATCH:可以取消watch对所有key的监控。

Redis 事务三特性

(1)单独的隔离操作

  • 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

(2)没有隔离级别的概念

  • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

(3)不保证原子性

  • 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

Redis-事务-秒杀案例

最基本的案例:最基本的代码:

代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1.判断uid和proid是否为空,如果为空则不能秒杀
if (uid==null || prodid==null) return false;
//设置key,产品的key和用户的key
String proKey = "sk:" + prodid + ":pt";
String usrKey = "sk:" + uid + ":pt";

//判断库存是否为null,null证明秒杀没有开始
Jedis jedis = new Jedis("192.168.86.129",6379);
jedis.auth("Zlw199805");
if (jedis.get(proKey)==null){
System.out.println("秒杀还没有开始");
jedis.close();
return false;
}
//判断库存是否够用,小于1 证明秒杀完成
if (Integer.parseInt(jedis.get(proKey) )<=0){
System.out.println("秒杀结束");
jedis.close();
return false;
}

//判断用户是否已经秒杀过一次,同一个用户只能秒杀一次
if (jedis.sismember(usrKey,uid)){
System.out.println("该用户已经秒杀完成,不可以重复秒杀");
jedis.close();
return false;
}

//进行秒杀商品数量减1
jedis.decr(proKey);
//加入用户
jedis.sadd(usrKey,uid);
System.out.println("秒杀成功!");
jedis.close();
return true;

}

但在以上的这段代码中,会发生连接超时和超卖问题(库存存在负数了)。

(1)修改连接超时问题:

​ 通过数据库连接池实现:

redis数据库连接池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class JedisPoolUtil {
private static volatile JedisPool jedisPool = null;

private JedisPoolUtil() {
}

public static JedisPool getJedisPoolInstance() {
if (null == jedisPool) {
synchronized (JedisPoolUtil.class) {
if (null == jedisPool) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(100*1000);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true); // ping PONG

jedisPool = new JedisPool(poolConfig, "192.168.86.129", 6379, 60000 );
}
}
}
return jedisPool;
}

public static void release(JedisPool jedisPool, Jedis jedis) {
if (null != jedis) {
jedisPool.returnResource(jedis);
}
}

}

将秒杀系统中的jedis对象,通过数据库连接池创建。

1
2
3
4
5
6
//		Jedis jedis = new Jedis("192.168.86.129",6379);
// jedis.auth("Zlw199805");
//通过连接池实现
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();
jedis.auth("Zlw199805");

(2)超卖问题

使用事务中的乐观锁监视即可。

代码为:

  • 第一步监视商品的数量:

    1
    2
    //监视商品的数量
    jedis.watch(proKey);
  • 第二步:增加事务,商品数量减1 ,增加用户,执行事务,判断事务执行是否失败

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    //增加事务
    Transaction multi = jedis.multi();
    //进行秒杀商品数量减1
    // jedis.decr(proKey);
    //减少库存
    multi.decr(proKey);
    //加入用户
    // jedis.sadd(usrKey,uid);
    //增加成功的用户
    multi.sadd(usrKey,uid);

    //执行事务
    List<Object> exec = multi.exec();
    //判断事务提交是否失败
    if (exec==null ||exec.size()==0){
    System.out.println("秒杀结束了");
    jedis.close();
    return false;
    }

(3)但使用乐观锁会产生数据遗留问题。因为乐观锁,当数据修改时,会改变版本号,其他的用户不能秒杀,所以出现库存遗留

使用LUA脚本解决库存遗留问题
(LUA:是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。)

代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local userid=KEYS[1];
local prodid=KEYS[2];
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid.":usr';
local userExists=redis.call("sismember",usersKey,userid);
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,qtkey);
if tonumber(num)<=0 then
return 0;
else
redis.call("decr",qtkey);
redis.call("sadd",usersKey,userid);
end
return 1;

Redis 持久化之 RDB

为什么要持久化?

为了能够重用Redis数据,或者防止系统故障,我们需要将Redis中的数据写入到磁盘中,即持久化。

持久化:redis可以写到硬盘中。。

RDB是持久化方式,按照一定的时间间隔将内存的数据以快照的形式保存到硬盘,恢复时将快照读取到内存,

RDB是什么?

指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的 Snapshot 快照,它恢复时是将快照文件直接读到内存里

image-20220609193324763

备份是如何进行的?

Redis 会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何 IO 操作的,

image-20220609160158010

Fork 的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程

RDB的优缺点

优点:

  • 适合大规模的数据恢复
  • 对数据完整性和一致性要求不高更适合使用
  • 节省磁盘空间
  • 恢复速度快

缺点:

  • Fork 的时候,内存中的数据被克隆了一份,大致 2 倍的膨胀性需要考虑
  • 虽然 Redis 在 fork 时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
  • 备份周期在一定间隔时间做一次备份,所以如果 Redis 意外 down 掉的话,就会丢失最后一次快照后的所有修改

rbd的备份

(1)先通过 config get dir 查询 rdb 文件的目录

(2)将*.rdb 的文件拷贝到别的地方 eg:cp dump.rdb d.rdb

(3)rdb的恢复

  • 关闭 Redis ,删除dump.rdb

  • 先把备份的文件拷贝到工作目录下 cp d.rdb dump.rdb

  • 启动 Redis, 备份数据会直接加载

  • image-20220609163844921

    image-20220609163802517

image-20220609163732378

Redis 持久化之 AOF

AOF 持久化以日志的形式记录服务器所处理的每一个写、删除操作,但查询操作不会记录,以文本的方式记录,可以打开文件看到详细的操作。

AOF采用文件追加方式,文件越来越大,为避免出现这种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值,Redis就会启动AOF文件的内存压缩,只保留可以恢复数据的最小指令集。

image-20220609193655926

AOF 持久化流程

(1)客户端的请求写命令会被 append 追加到 AOF 缓冲区内;
(2)AOF 缓冲区根据 AOF 持久化策略[always,everysec,no]将操作 sync 同步到磁盘的AOF 文件中;
(3)AOF 文件大小超过重写策略或手动重写时,会对 AOF 文件 rewrite 重写,压缩AOF 文件容量;

(4)Redis 服务重启时,会重新 load 加载 AOF 文件中的写操作达到数据恢复的目的;

AOF 启动/修复/恢复

  • AOF 的备份机制和性能虽然和 RDB 不同, 但是备份和恢复的操作同 RDB 一样,都是拷贝备份文件,需要恢复时再拷贝到 Redis 工作目录下,启动系统即加载

  • 正常恢复

    • 修改默认的 appendonly no,改为 yes
    • 将有数据的 aof 文件复制一份保存到对应目录(查看目录:config get dir)
    • 恢复:重启 redis 然后重新加载
  • 异常恢复

    • 修改默认的 appendonly no,改为 yes
    • 如遇到 AOF 文件损坏,通过/usr/local/bin/redis-check-aof–fix appendonly.aof 进行恢复
    • 备份被写坏的 AOF 文件
    • 恢复:重启 redis,然后重新加载

AOF的优缺点

优势

  • 备份机制更稳健,丢失数据概率更低
  • 可读的日志文本,通过操作 AOF 稳健,可以处理误操作。

劣势

  • 比起 RDB 占用更多的磁盘空间。
  • 恢复备份速度要慢。
  • 每次读写都同步的话,有一定的性能压力
  • 存在个别 Bug,造成恢复不能。

对于持久化方式官网的建议

  • 官方推荐两个都启用。
  • 如果对数据不敏感,可以选单独用 RDB。
  • 不建议单独用 AOF,因为可能会出现 Bug。
  • 如果只是做纯内存缓存,可以都不用。

主从复制

主机数据更新后根据配置和策略, 自动同步到备机的 master/slaver 机制,Master 以写为主,Slave 以读为主

能干嘛

  • 读写分离,性能扩展
  • 容灾快速恢复 (当一台从机挂掉之后,其他从机能快速提供服务 )
  • image-20220610102855173

步骤

image-20220610102915397

image-20220610104030188

info replication
打印主从复制的相关信息

image-20220610111351081

配从(库)不配主(库)

slaveof <ip><port> :主服务器的ip和端口。

在主机上写,在从机上可以读取数据

主机挂掉,重启就行,一切如初

从机重启需重设:slaveof 127.0.0.1 6379

一主二仆

从机不可以写数据,

主机shutdown后,从机还是原地待命,不会上位变为主机。

主机又回来之后,主机新增记录,从机还能顺利复制,

其中一台从机down后,从新开启,重设slaveof 127.0.0.1 6379 ,依旧可以复制主机中的全部数据,。

薪火相传

其中一个从机可以作为其他其他从机的主机,。同样可以接受其他从机的连接和请求,可以有效减轻master’写的压力。也就是一态主机只写到一台从机,而一台从机下面右好多从机(缺点就是,唯一的这台与主机交互的从机一旦down掉,则后面的那些从机都没办法复制)

**用 slaveof <ip><port>**

中途变更转向:会清除之前的数据,重新建立拷贝最新的

风险是一旦某个 slave 宕机,后面的 slave 都没法备份

主机挂了,从机还是从机,无法写数据了

image-20220610113123157

反客为主

当一个 master 宕机后,后面的 slave 可以立刻升为 master,其后面的 slave 不用做任何修改。

slaveof no one 将从机变为主机。

但这个过程是手动设置的,但在真正的使用中,如果主机挂掉,工作人员需要时间去指定下一个上位的从机,但这会花费时间,有没有一种可以自动去执行的这个过程?

搭:哨兵模式。在下面

复制原理

  • Slave 启动成功连接到 master 后会发送一个 sync(内存中的数据写入磁盘中) 命令
  • Master 接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,master 将传送整个数据文件到 slave,以完成一次完全同步
  • 全量复制:而 slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制:Master 继续将新的所有收集到的修改命令依次传给 slave,完成同步
  • 但是只要是重新连接 master,一次完全同步(全量复制)将被自动执行

image-20220610111839270

哨兵模式(sentinel)

是什么:

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库

image-20220610115550445

步骤:

(1)调整为一主二仆模式, 6379 带着 6380、 6381

(2)自定义的/myredis 目录下新建 sentinel.conf 文件, 名字绝不能错

(3)配置哨兵,填写内容 :sentinel monitor mymaster 127.0.0.1 6379 1 (其中 mymaster 为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。 )

(4)启动哨兵 :执行 redis-sentinel /myredis/sentinel.conf

image-20220610115726460

当主机挂掉, 从机选举中产生新的主机

应该选择哪个从机作为主机呢?

(1)选择优先级靠前的。—redis.conf配置文件中,replica-priority 默认值为100,值越小优先级越高

(2)选择偏移量最大的,也就是从机中包含主机中数据最多的。

(3)选择runid最小的从服务::每个 redis 实例启动后都会随机生成一个 40 位的 runid (这个就是随机选取了)

主从复制设置哨兵的工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static JedisSentinelPool jedisSentinelPool=null;
public static Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
Set<String> sentinelSet=new HashSet<>();
sentinelSet.add("192.168.11.103:26379");
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); //最大可用连接数
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
return jedisSentinelPool.getResource();
replica-priority}
else{
return jedisSentinelPool.getResource();
}
}

Redis 集群

容量不够,redis 如何进行扩容? 可以使用多台redis

并发写操作, redis 如何分摊? 多个redis

之前通过代理主机来解决,但是 redis3.0 中提供了解决方案。就是无中心化集群配置。

之前的代理主机:

image-20220610121734297

现在的无中心化集群配置:

任何一台服务器都可以作为集群的入口。可以把请求转交给任何一台服务器

image-20220610121748075

什么是集群

Redis 集群实现了对 Redis 的水平扩容,即启动 N 个 redis 节点,将整个数据库分布存储在这 N 个节点中,每个节点存储总数据的 1/N

Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。 (提供了从机)

配置集群步骤:

(1)将 rdb,aof 文件都删除掉

(2)制作 6 个实例, 6379,6380,6381,6389,6390,6391.conf

(3)redis cluster 配置修改 ,在原来的主从复制中的配置添加:

  • cluster-enabled yes 打开集群模式
  • cluster-config-file nodes-6379.conf 设定节点配置文件名
  • cluster-node-timeout 15000 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。
1
2
3
4
5
6
7
8
9
include /home/bigdata/redis.conf
port 6379
pidfile "/var/run/redis_6379.pid"
dbfilename "dump6379.rdb"
dir "/home/bigdata/redis_cluster"
logfile "/home/bigdata/redis_cluster/redis_err_6379.log"
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000

(4)修 改 好 redis6379.conf 文 件 , 拷 贝 多 个redis.conf 文件 ,分别对应( 6379,6380,6381,6389,6390,6391.conf)

(5)使用查找替换修改另外 5 个文件 :%s/6379/6380

(6)启动 6 个 redis 服务

(7)将六个节点合成一个集群 :组合之前,请确保所有 redis 实例启动后,nodes-xxxx.conf 文件都生成正常

  • 合体: cd /opt/redis-6.2.1/src redis文件的位置中的src
1
2
3
redis-cli --cluster create --cluster-replicas 1 192.168.11.101:6379
192.168.11.101:6380 192.168.11.101:6381 192.168.11.101:6389
192.168.11.101:6390 192.168.11.101:6391

此处不要用 127.0.0.1, 请用真实 IP 地址
–replicas 1 :采用最简单的方式配置集群,一台主机,一台从机,正好三组。

(8)采用集群的方式连接

image-20220610124129675

(9)通过 cluster nodes 命令查看集群信息

redis cluster 如何分配这六个节点?

(1)一个集群至少要有三个主节点

(2)选项 –cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点

(3)分配原则尽量保证每个主数据库运行在不同的 IP 地址,每个从库和主库不在一个 IP 地址上。 (因为当一台主机挂掉之后,从机可以变为主机,如果都在一台服务器上时,则从机也挂掉了)

什么是 slots

一个 Redis 集群包含 16384 个插槽(hash slot)(为了集群平均分配), 数据库中的每个键都属于这 16384个插槽的其中一个

image-20220610131018008

集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。

集群中的每个节点负责处理一部分插槽

在集群中录入值:自动分配到键所对应的插槽中,

image-20220610130004189

不能进行 mget 和 mset 操作,可以通过{}来定义组的概念,从而使 key 中{}内相同内容的键值对放到一个 slot 中去。

image-20220610130140398

查询集群中的值 (计算插槽值):

image-20220610130257347

CLUSTER GETKEYSINSLOT <slot><count> 返回 count 个 slot 槽中的键。

故障恢复

如果主节点下线,则从节点自动上位为主节点,

当主节点恢复回来后,变为从节点,

如果所有某一段插槽的主从节点都宕掉,redis 服务是否还能继续?

  • 如果某一段插槽的主从都挂掉,而redis.conf中的 cluster-require-full-coverage 为 yes ,那么 ,整个集群都挂掉
  • 如果某一段插槽的主从都挂掉,而 redis.conf中的cluster-require-full-coverage 为 no ,那么,该插槽数据全都不能使用,也无法存储。但其他插槽还可以继续使用

集群的 Jedis 开发

image-20220610132151742

Redis集群的好处和不足

好处:

  • 实现扩容 (用集群实现扩容 )
  • 分摊压力 (用多台机器分担某一台机器(插槽) )
  • 无中心配置相对简单 (任何一个节点都能进入到集群,之间能够互相切换 )

不足”:

  • 多键操作是不被支持的
  • 多键的 Redis 事务是不被支持的。lua 脚本不被支持
  • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至 redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

Redis应用问题的解决

缓存穿透

问题描述

key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库

image-20220610143457717

解决方案

最终解决的方式就是,不安全的连接不能进入数据库,拦到外面

(1)对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟 .

(2)设置可访问的名单(白名单) :使用 bitmaps 类型定义一个可以访问的名单,名单 id 作为 bitmaps 的偏移量,每次访问和 bitmap 里面的 id 进行比较,如果访问 id 不在 bitmaps 里面,进行拦截,不允许访问。

(3)采用布隆过滤器 :将所有可能存在的数据哈希到一个足够大的 bitmaps 中,一个一定不存在的数据会被 这个 bitmaps 拦截掉,从而避免了对底层存储系统的查询压力。

(4)进行实时监控:当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

缓存击穿

key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮 。

image-20220610144458611

解决方案:

(1)预先设置热门数据:在 redis 高峰访问之前,把一些热门数据提前存入到redis 里面,加大这些热门数据 key 的时长
(2)实时调整:现场监控哪些数据热门,实时调整 key 的过期时长
(3)使用锁:(效率低)

  • 就是在缓存失效的时候(判断拿出来的值为空),不是立即去 load db。
  • 先使用缓存工具的某些带成功操作返回值的操作(比如 Redis 的 SETNX)去 set 一个 mutex key
  • 当操作返回成功时,再进行 load db 的操作,并回设缓存,最后删除 mutexkey;
  • 当操作返回失败,证明有线程在 load db,当前线程睡眠一段时间再重试整个 get 缓存的方法
  • image-20220610144742863

缓存雪崩

key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。
缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key

image-20220610145205724

解决方案:“

(1) 构建多级缓存架构:nginx 缓存 + redis 缓存 +其他缓存(ehcache 等)
(2) 使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
(3) 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存。
(4) 将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

分布式锁

原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

image-20220610150150521

image-20220610150026377

编写代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@GetMapping("testLock")
public void testLock(){
//1 获取锁, setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
//2 获取锁成功、 查询 num 的值if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1 判断 num 为空 return
if(StringUtils.isEmpty(value)){
return;
}
//2.2 有值就转成成 int
int num = Integer.parseInt(value+"");
//2.3 把 redis 的 num 加 1
redisTemplate.opsForValue().set("num", ++num);
//2.4 释放锁, del
redisTemplate.delete("lock");
}else{
//3 获取锁失败、 每隔 0.1 秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

问题1: setnx 刚好获取到锁, 业务逻辑出现异常, 导致锁无法释放
解决: 设置过期时间, 自动释放锁

**优化之设置锁的过期时间 **

两种方式:

(1)首先想到通过 expire 设置过期时间(缺乏原子性: 如果在 setnx 和 expire 之
间出现异常, 锁也无法释放)
(2) 在 set 时指定过期时间(推荐)

image-20220610152715911

问题2: 可能会释放其他服务器的锁。

场景:

image-20220610152828761

代码:

image-20220610152957380

image-20220610153011261

但是使用了UUID之后不具有原子性:

image-20220610153713549

使用LUA脚本保证删除的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个 uuid ,将做为一个 value 放入我们的 key 所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁: lua 脚本可以使用同一把锁, 来实现删除!
String skuId = "25"; // 访问 skuId 为 25 号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3,
TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果 true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的 num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么 delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使 num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用 lua 脚本来锁*/
// 定义 lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return
redis.call('del', KEYS[1]) else return 0 end";
// 使用 redis 执行 lua 执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为 Long
// 因为删除判断的时候, 返回的 0,给其封装为数据类型。 如果不封装那么默认返回 String 类型,
// 那么返回字符串与 0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是 script 脚本 , 第二个需要判断的 key, 第三个就是 key 所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后, 调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

LUA脚本详解

image-20220610154032749

image-20220610154103532


redis
http://example.com/2022/06/07/redis/
作者
zlw
发布于
2022年6月7日
许可协议