自定义结构 RandomPool

请设计出一种结构,在该结构中有如下三个功能,要求其时间复杂度为 $O(1)$

  • insert(key):将某个 key 加入到该结构,做到不重复加入

  • delete(key):将原本在结构中的某个 key 移除

  • getRandom(): 等概率随机返回结构中的任何一个key

设计思路

  • 设计两个哈希表 keyIndexMapindexKeyMap

  • keyIndexMap 用来记录 key 到 index 的对应关系

  • indexKeyMap 用来记录 index 到 key 的对应关系

  • 整数size初始化为0,记录Pool的大小

  • 执行 insert(newKey),将(newKey,size)放入keyIndexMap,将(size,newKey)放入indexKeyMap,然后size自增

  • 执行 delete(deleteKey),最新加入的key记为lastKey,对应的index信息记为lastIndex。删除的key记为deleteKey,对应的index记为deleteKey

    • 把记录(lastKey,lastIndex)变为(lastKey,deleteIndex),并在indexMap中把记录(deleteIndex,deleteKey)变为(deleteIndex,lastKey)
    • 在keyIndexMap中删除记录(deleteKey,deleteIndex),在indexKeyMap中把记录(lastIndex,lastKey)删除
    • size减一
  • getRandom 操作时根据size随机得到index,返回对应的key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class RandomPool {

public static class Pool<T> {
// 实现T-->index的映射
private HashMap<T, Integer> keyIndexMap;
// 实现index-->T的映射
private HashMap<Integer, T> indexKeyMap;
// Pool的大小
private int size;

public Pool() {
this.indexKeyMap = new HashMap<>();
this.keyIndexMap = new HashMap<>();
this.size = 0;
}

// 新增记录
public void insert(T key) {
if (!this.keyIndexMap.containsKey(key)) {
this.keyIndexMap.put(key, this.size);
this.indexKeyMap.put(this.size++, key);
}
}

/**
* 删除记录
* keyIndexMap (lastKey,lastIndex)-->(lastKey,deleteIndex)
* indexKeyMap (deleteIndex,deleteKey)-->(deleteIndex,lastKey)
* <p>
* delete keyIndexMap (deleteKey,deleteIndex)
* delete indexKeyMap (lastKey,lastIndex)
*
* @param key
*/
public void delete(T key) {
if (this.keyIndexMap.containsKey(key)) {
// 指向要删除元素的index
int deleteIndex = this.keyIndexMap.get(key);
// 指向最后一个元素的index
int lastIndex = --this.size;
// 指向最后一个元素的key
T lastKey = this.indexKeyMap.get(lastIndex);

this.keyIndexMap.put(lastKey, deleteIndex);
this.indexKeyMap.put(deleteIndex, lastKey);

this.keyIndexMap.remove(key);
this.indexKeyMap.remove(lastIndex);

}

}

// 随机获取记录
public T getRandom() {
if (this.size == 0) {
return null;
}
int randomIndex = (int) (Math.random() * this.size); // 0 ~ size -1
return this.indexKeyMap.get(randomIndex);
}
}
}

bitmap

基本思想:用一个bit位标记某个元素对应的value,key即是该元素

假设32位操作系统,4G内存,在20亿个随机整数中找出某个数m是否存在其中

  • 如果每个数字用int存储,那就是20亿个int,因而占用的空间约为 (20000000004/1024/1024/1024)≈ *7.45 G

  • 如果每个数字用位存储,20亿的数就是20亿位,占空间约为(2000000000/8/1024/1024/1024)≈ 0.23 G

存储数据

用bitmap存储(1, 2 ,4, 6)

1个int占32位,那么我们只需要申请一个int数组长度为 int tmp[1+N/32] 即可存储,其中N表示要存储的这些数中的最大值,于是乎:

  • tmp[0]:可以表示0~31

  • tmp[1]:可以表示32~63

  • tmp[2]:可以表示64~95

添加操作

放入数字5,5/32=0,5%32=5,在temp[0]位置,把1向左移动五位,按位或

b[0] = b[0] | (1<<5)

插入一个数,将1左移带代表该数字的那一位,然后与原数进行按位或操作
公式可以概括为:p + (i/8)|(1<<(i%8)) 其中,p表示现在的值,i表示待插入的数

删除操作

删除6

1左移6位,到达数字6代表的位,然后按位取反,最后与原数按位与

b[0] = b[0] & (~(1<<6))

b[0] = b[0] & (~(1<<(i%8)))

查找操作

判断3在不在,那么只需判断 b[0] & (1<<3) 如果这个值是0,则不存在,如果是1,就表示存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

public static void main(String[] args) {
int a = 0;
// a 32 bit
int[] arr = new int[10]; // 32bit * 10 -> 320 bits
// arr[0] int 0 ~ 31
// arr[1] int 32 ~ 63

int i = 178; // 取得178个bit的状态
// 定位出 下标
int numIndex = i / 32;
// 定位出 位
int bitIndex = i % 32;
// 拿到i位的状态
int s = ((arr[numIndex]>>(bitIndex)) & 1);
// 添加操作
arr[numIndex] = arr[numIndex] | (1<<(bitIndex));
i = 178;
// 删除操作
arr[numIndex] = arr[numIndex] & (~(1<<bitIndex));
}

应用场景

优缺点

优点:

  • 运算效率高,不需要进行比较和移位;
  • 占用内存少,比如N=10000000;只需占用内存为N/8=1250000Byte=1.25M

缺点:

  • 所有的数据不能重复。即不可对重复的数据进行排序和查找。
  • 只有当数据比较密集时才有优势

快速排序

  • 要对0-7内的5个元素 (4,7,2,5,3) 排序(这里假设这些元素没有重复)

  • 要表示8个数,我们就只需要8个Bit(1Bytes),开辟1Byte的空间,将这些空间的所有Bit位都置为0,然后将对应位置为1。

  • 遍历一遍Bit区域,将该位是一的位的编号输出(2,3,4,5,7),这样就达到了排序的目的,时间复杂度 $O(n)$

快速去重

20亿个整数中找出不重复的整数的个数,内存不足以容纳这20亿个整数

  • 一个数字的状态只有三种,分别为不存在,只有一个,有重复。

  • 需要2bit就可以对一个数字的状态进行存储

  • 假设一个数字不存在为00,存在一次01,存在两次及其以上为11。那我们大概需要存储空间2G左右。

  • 把这20亿个数字存储,如果对应的状态位为00,则将其变为01,表示存在一次

  • 如果对应的状态位为01,则将其变为11,表示已经有一个了,即出现多次

  • 如果为11,则对应的状态位保持不变,仍表示出现多次

  • 统计状态位为01的个数,就得到了不重复的数字个数,时间复杂度为 $O(n)$

快速查找

int数组中的一个元素是4字节占32位,那么除以32就知道元素的下标,对32求余数(%32)就知道它在哪一位,如果该位是1,则表示存在

补充:在没有溢出的前提下

  • << 左移,相当于乘以2的n次方,例如:1<<6 相当于1×64=64,3<<4 相当于3×16=48*

  • >> 右移,相当于除以2的n次方,例如:64>>3 相当于64÷8=8

  • ^ 异或,相当于求余数,例如:48^32 相当于 48%32=16

总结回顾

  • Bitmap主要用于快速检索关键字状态,通常要求关键字是一个连续的序列(或者关键字是一个连续序列中的大部分)

  • 最基本的情况,使用1bit表示一个关键字的状态(可标示两种状态),但根据需要也可以使用2bit(表示4种状态),3bit(表示8种状态)。

  • Bitmap的主要应用场合:表示连续(或接近连续,即大部分会出现)的关键字序列的状态(状态数/关键字个数 越小越好)

布隆过滤器

突破口

面试遇到黑名单系统、垃圾邮件过滤系统、爬虫的网址判重系统等。又看到系统容忍一定程度的失误率,但是对空间要求严格,可以考虑布隆过滤器

布隆过滤器是一个很长的二进制向量和一系列随机映射函数。一个布隆过滤器精确的代表一个集合,可以精确判断一个元素是否在一个集合中,缺点是有一定的误识别率和删除困难。

布隆过滤器

布隆过滤器的优点:

  • 时间复杂度低,增加和查询元素的时间复杂为 $O(N)$ ,( $N$ 为哈希函数的个数,通常情况比较小)

  • 保密性强,布隆过滤器不存储元素本身

  • 存储空间小,如果允许存在一定的误判,布隆过滤器是非常节省空间的(相比其他数据结构如Set集合)

布隆过滤器的缺点:

  • 有点一定的误判率,但是可以通过调整参数来降低
  • 无法获取元素本身
  • 很难删除元素

应用场景

布隆过滤器可以告诉我们 “某样东西一定不存在或者可能存在”,也就是说布隆过滤器说这个数不存在则一定不存在,布隆过滤器说这个数存在可能不存在

  • 解决Redis缓存穿透问题

  • 邮件过滤,使用布隆过滤器来做邮件黑名单过滤

  • 对爬虫网址进行过滤,爬过的不再爬

  • 解决新闻推荐过的不再推荐(类似抖音刷过的往下滑动不再刷到)

  • HBase\RocksDB\LevelDB等数据库内置布隆过滤器,用于判断数据是否存在,可以减少数据库的IO请求

实现原理

当一个元素被加入集合时,通过 K 个散列函数将这个元素映射成一个位数组(Bit array)中的 K 个点,把它们置为 1 。

检索时,只要看看这些点是不是都是1就知道元素是否在集合中;如果这些点有任何一个 0,则被检元素一定不在;如果都是1,则被检元素很可能在(之所以说“可能”是误差的存在)。

数据结构

布隆过滤器是一个很长的二进制向量和一系列随机映射函数

以Redis中的布隆过滤器实现为例,Redis中的布隆过滤器底层是 一个大型位数组(二进制数组)+ 多个无偏hash函数

空间计算

在布隆过滤器增加元素之前

  • 需要初始化布隆过滤器的空间,也就是上面说的二进制数组
  • 需要计算无偏hash函数的个数。

布隆过滤器提供了两个参数,分别是预计加入元素的大小n,运行的错误率f

布隆过滤器中有算法根据这两个参数会计算出二进制数组的大小l,以及无偏hash函数的个数k

它们之间的关系比较简单:

  • 错误率越低,位数组越长,控件占用较大
  • 错误率越低,无偏hash函数越多,计算耗时较长

增加元素

往布隆过滤器增加元素,添加的key需要根据k个无偏hash函数计算得到多个hash值,

然后对数组长度进行取模得到数组下标的位置,然后将对应数组下标的位置的值置为1

  • 通过k个无偏hash函数计算得到k个hash值
  • 依次取模数组长度,得到数组索引
  • 将计算得到的数组索引下标位置数据修改为1

查询元素

布隆过滤器最大的用处就在于判断某样东西一定不存在或者可能存在,而这个就是查询元素的结果。其查询元素的过程如下:

  • 通过k个无偏hash函数计算得到k个hash值
  • 依次取模数组长度,得到数组索引
  • 判断索引处的值是否全部为1,如果全部为1则存在==(这种存在可能是误判)==,如果存在一个0则必定不存在

关于误判,其实非常好理解,hash函数在怎么好,也无法完全避免hash冲突,也就是说可能会存在多个元素计算的hash值是相同的,那么它们取模数组长度后的到的数组索引也是相同的,这就是误判的原因。

例如李子捌和李子柒的hash值取模后得到的数组索引都是1,但其实这里只有李子捌,如果此时判断李子柒在不在这里,误判就出现啦!

一致性哈希算法

问题背景

工程师使用服务器集群设计和实现数据缓存

1.无论是添加、查询和删除数据,都先将数据的id通过哈希函数转换成哈希值,记为key

2.如果目前机器有N台,则计算key%N的值,这个值就是该数据所属的机器编号,无论添加、删除还是查询,都只在这台机器上运行

该缓存策略存在的问题

如果增加或删除机器,所有的数据不得不根据id重新算哈希值,并将哈希值对新机器数进行取模操作,然后进行大规模数据迁移

改进方案

分布式系统中对象与节点的映射关系,传统方案是使用对象的哈希值,对节点个数取模,再映射到相应编号的节点,

这种方案在节点个数变动时,绝大多数对象的映射关系会失效而需要迁移

而一致性哈希算法中,当节点个数变动时,映射关系失效的对象非常少,迁移成本也非常小。

算法原理

映射方案

公用哈希函数和哈希环

设计哈希函数 Hash(key),要求取值范围为 $[0, 2^{32})$
各哈希值在上图 Hash 环上的分布:时钟12点位置为0,按顺时针方向递增,临近12点的左侧位置为 $2^{32}-1$ 。

节点(Node)映射至哈希环

如图哈希环上的绿球所示,四个节点 Node A/B/C/D,其 IP 地址或机器名,经过同一个 Hash() 计算的结果,映射到哈希环上。

对象(Object)映射至哈希环

如图哈希环上的黄球所示,四个对象 Object A/B/C/D,其键值,经过同一个 Hash() 计算的结果,映射到哈希环上。

对象(Object)归属节点(Node)

要确定某个对象归属哪个节点,只需从该对象开始,沿着哈希环顺时针方向查找,找到距离最近的节点,Object A/B/C/D 分别映射至 Node A/B/C/D。

删除节点

现实场景:服务器缩容时删除节点,或者有节点宕机。

要删除节点 Node C:只会影响Object C,,调整映射至欲删除节点的下一个节点 Node D。

增加节点

现实场景:服务器扩容时增加节点。比如要在 Node B/C 之间增加节点 Node X:

只会影响Object C,调整映射至新增的节点 Node X。

虚拟节点

节点数越少,越容易出现节点在哈希环上的分布不均匀,导致各节点映射的对象数量严重不均衡(数据倾斜);相反,节点数越多越密集,数据在哈希环上的分布就越均匀。 但实际部署的物理节点有限,我们可以用有限的物理节点,虚拟出足够多的虚拟节点(Virtual Node),最终达到数据在哈希环上均匀分布的效果

如下图,实际只部署了2个节点 Node A/B,每个节点都复制成3倍,结果看上去是部署了6个节点。
可以想象,当复制倍数为 $2^32$ 时,就达到绝对的均匀,通常可取复制倍数为32或更高。

虚拟节点哈希值的计算方法调整为:对“节点的IP(或机器名)+虚拟节点的序号(1~N)”作哈希

案例演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class ConsistentHashing {
// 物理节点
private Set<String> physicalNodes = new TreeSet<String>() {
{
add("192.168.1.101");
add("192.168.1.102");
add("192.168.1.103");
add("192.168.1.104");
}
};

//虚拟节点
private final int VIRTUAL_COPIES = 1048576; // 物理节点至虚拟节点的复制倍数
private TreeMap<Long, String> virtualNodes = new TreeMap<>(); // 哈希值 => 物理节点

// 32位的 Fowler-Noll-Vo 哈希算法
// https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function
private static Long FNVHash(String key) {
final int p = 16777619;
Long hash = 2166136261L;
for (int idx = 0, num = key.length(); idx < num; ++idx) {
hash = (hash ^ key.charAt(idx)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;

if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}

// 根据物理节点,构建虚拟节点映射表
public ConsistentHashing() {
for (String nodeIp : physicalNodes) {
addPhysicalNode(nodeIp);
}
}

// 添加物理节点
public void addPhysicalNode(String nodeIp) {
for (int idx = 0; idx < VIRTUAL_COPIES; ++idx) {
long hash = FNVHash(nodeIp + "#" + idx);
virtualNodes.put(hash, nodeIp);
}
}

// 删除物理节点
public void removePhysicalNode(String nodeIp) {
for (int idx = 0; idx < VIRTUAL_COPIES; ++idx) {
long hash = FNVHash(nodeIp + "#" + idx);
virtualNodes.remove(hash);
}
}

// 查找对象映射的节点
public String getObjectNode(String object) {
long hash = FNVHash(object);
SortedMap<Long, String> tailMap = virtualNodes.tailMap(hash); // 所有大于 hash 的节点
Long key = tailMap.isEmpty() ? virtualNodes.firstKey() : tailMap.firstKey();
return virtualNodes.get(key);
}

// 统计对象与节点的映射关系
public void dumpObjectNodeMap(String label, int objectMin, int objectMax) {
// 统计
Map<String, Integer> objectNodeMap = new TreeMap<>(); // IP => COUNT
for (int object = objectMin; object <= objectMax; ++object) {
String nodeIp = getObjectNode(Integer.toString(object));
Integer count = objectNodeMap.get(nodeIp);
objectNodeMap.put(nodeIp, (count == null ? 0 : count + 1));
}

// 打印
double totalCount = objectMax - objectMin + 1;
System.out.println("======== " + label + " ========");
for (Map.Entry<String, Integer> entry : objectNodeMap.entrySet()) {
long percent = (int) (100 * entry.getValue() / totalCount);
System.out.println("IP=" + entry.getKey() + ": RATE=" + percent + "%");
}
}

public static void main(String[] args) {
ConsistentHashing ch = new ConsistentHashing();

// 初始情况
ch.dumpObjectNodeMap("初始情况", 0, 65536);

// 删除物理节点
ch.removePhysicalNode("192.168.1.103");
ch.dumpObjectNodeMap("删除物理节点", 0, 65536);

// 添加物理节点
ch.addPhysicalNode("192.168.1.108");
ch.dumpObjectNodeMap("添加物理节点", 0, 65536);
}
}

岛问题

单线程环境

思路:

ij列位置出发,向上下左右四个位置依次去感染

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Islands {
/**
* 岛问题

* @param m
* @return
*/
public static int countIslands(int[][] m) {
if (m == null || m[0] == null) {
return 0;
}
int N = m.length;
int M = m[0].length;
int res = 0;
for (int i = 0; i < N; i++) {
for (int j = 0; j < M; j++) {
if (m[i][j] == 1) {
res++;
//从当前(i,j)感染并返回结果
infect(m, i, j, N, M);
}
}
}
return res;
}

/**
* 感染函数

* @param m 矩阵m
* @param i 当前横坐标
* @param j 当前纵坐标
* @param N 矩阵长
* @param M 矩阵宽
*/
private static void infect(int[][] m, int i, int j, int N, int M) {
//违规条件
if (i < 0 || i >= N || j < 0 || j >= M || m[i][j] != 1) {
return;
}
//当前位置感染
m[i][j] = 2;
//向右感染
infect(m, i + 1, j, N, M);
//向左感染
infect(m, i - 1, j, N, M);
//向下感染
infect(m, i, j + 1, N, M);
//向上感染
infect(m, i, j - 1, N, M);
}
}

多线程环境

进阶问题:给你多个CPU,设计出并行算法解决 提示:并查集思想

若是两个CPU情况,分成两部分,记录每个部分的岛屿数量和边界点,对于边界点采用并查集合并,合并同时数量减少。

两个CPU分别处理会得到4个岛,但事实上只有一个岛,原本连通的被割断了,所以需要合并操作

多个CPU:收集四个边界信息,使用并查集

并查集

并查集操作

调用两操作总次数超过 $O(N)$,单词调用操作平均时间复杂度为 $O(1)$

1
2
boolean isSameSet(int a, int b); // 查询a和b是否属于一个集合
void union(int a, int b); // 把a所在的集合与b所在的集合合并在一起

并查集的初始化

  • 并查集由一群集合构成,最开始所有元素单独构成一个集合

  • 当集合中只有一个元素时,其父亲节点指向自身

  • 使用 fatherMap 保存并查集中元素的父亲节点,(key,value)表示key节点的 fathervalue

  • rank 是父亲节点所在集合的秩,表示该集合有多少元素,所有节点的 rank 信息保存在 rankMap

查找父亲节点

要查一个节点属于哪个集合,就是差其代表节点(父亲节点root)

一个节点通过 father 信息逐渐找到最上面的节点,该节点的 father 是自身,则为代表节点

路径压缩

通过 father 找到最上层节点后,会把最上层节点到该节点路径上的所有节点的 father 都设置成最上层节点

合并操作

并查集的合并操作是找到两个集合的代表节点,然后规定将小集合归并到大集合中

算法实现

并查集结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class UnionFind {


// 将集合中的单个元素封装成并查集结构
public static class Element<V> {
public V value;

public Element(V value) {
this.value = value;
}

}

//并查集操作
public static class UnionFindSet<V> {

// 元素 V 所对应的并查集结构
public HashMap<V, Element<V>> elementMap;

// 存储代表节点 key 的代表元素是 value
public HashMap<Element<V>, Element<V>> fatherMap;

// 存储代表节点所在集合的秩,代表节点所在集合的元素个数为value
public HashMap<Element<V>, Integer> rankMap;

// 初始化并查集
public UnionFindSet(List<V> list) {

// 初始化哈希结构
elementMap = new HashMap<>();
fatherMap = new HashMap<>();
rankMap = new HashMap<>();

// 对于集合中的单个元素封装成并查集结构
for (V value : list) {

// 将value封装成并查集节点
Element<V> element = new Element<V>(value);

// value-->对应并查集节点
elementMap.put(value, element);

// key的代表节点为value
fatherMap.put(element, element);

// key,代表节点所在集合元素个数初始化成1
rankMap.put(element, 1);
}
}

// 递归找代表节点
public Element<V> findFather(Element<V> element) {

Stack<Element<V>> path = new Stack<>();

while (element != fatherMap.get(element)) {
// 将不是代表节点的元素压栈
path.push(element);
// 继续向上查找
element = fatherMap.get(element);
}

// 路径压缩:将element到代表节点所在路径的元素的直接父亲设置成代表节点,降低时间复杂度
while (!path.isEmpty()) {
fatherMap.put(path.pop(), element);
}
return element;
}

// 判断两集合是不是属于一个集合
public boolean isSameSet(V a, V b) {

// 如果两个集合的代表节点一样,表示在一个集合中,返回true,否则不一样返回false
if (elementMap.containsKey(a) && elementMap.containsKey(b)) {
return findFather(elementMap.get(a)) == findFather(elementMap.get(b));
}

return false;
}

// 将a、b所在的两个集合合并
public void union(V a, V b) {

if (elementMap.containsKey(a) && elementMap.containsKey(b)) {

Element<V> aF = findFather(elementMap.get(a));
Element<V> bF = findFather(elementMap.get(b));

// 当处于不同集合才能合并
if (aF != bF) {

// 存储较大的集合
Element<V> big = rankMap.get(aF) >= rankMap.get(bF) ? aF : bF;
// 存储看较小的集合
Element<V> small = big == aF ? bF : aF;
// 将较小集合的直接代表节点设置成较大集合的代表节点
fatherMap.put(small, big);
// 更新秩
rankMap.put(big, rankMap.get(aF) + rankMap.get(bF));
// 删除小集合
rankMap.remove(small);
}
}
}
}
}