Spring-data-redis 2.0.7.RELEASE

executePipelined方法说明

executePipelined方法基于Redis的pipelining。关于pipelining官方解释如下:

A Request/Response server can be implemented so that it is able to process new requests even if the client didn’t already read the old responses. This way it is possible to send multiple commands to the server without waiting for the replies at all, and finally read the replies in a single step.
—摘自Redis Piplining

上面的意思大概是Redis服务器可以实现即使没有读取旧响应的情况下也可以发送新的请求,以这种方式可以发送多个命令到服务器而不用等待回复,最后一次获取全部的回复。这就是Redis Pipelining。

executePipelined的官方注释:

Executes the given action object on a pipelined connection, returning the results. Note that the callback cannot return a non-null value as it gets overwritten by the pipeline. This method will use the default serializers to deserialize results.

上面这句话的意思在一个管道连接中执行给定的动作对象,并返回结果。但是需要注意的是callback不能返回一个非null的值,callback的值将被pipeline覆盖。这个方法将使用默认的序列化和反序列化方式处理结果集。

例举executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {
Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(session, "Callback object must not be null");
RedisConnectionFactory factory = this.getRequiredConnectionFactory();
// 是否开启事务管理,将当前连接注册到事务管理器
RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);

List var4;
try {
// this.execute其实就是execute,所以本质上来说,两个方法的区别在于executePipeline方法开启了pipeline
var4 = (List)this.execute((connection) -> {
// 开启管道
connection.openPipeline();
boolean pipelinedClosed = false;

List var7;
try {
// 在连接中执行SessionCallback中的动作,并获取结果集
Object result = this.executeSession(session);
// 如果结果集不为空,抛出InvalidDataAccessApiUsageException
if (result != null) {
throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the pipeline");
}

List<Object> closePipeline = connection.closePipeline();
pipelinedClosed = true;
// 获取管道返回的结果集并序列化
var7 = this.deserializeMixedResults(closePipeline, resultSerializer, this.hashKeySerializer, this.hashValueSerializer);
} finally {
if (!pipelinedClosed) {
connection.closePipeline();
}

}
// 返回管道的结果集
return var7;
});
} finally {
RedisConnectionUtils.unbindConnection(factory);
}

return var4;
}

上述代码中this.execute((connection)...其实就是execute(RedisCallback<?> session)方法。所以executePipelined方法只是在execute内中开启了pipeline而已。

execute方法说明

execute相对于executePipelined(SessionCallback<?> session)比较简单,没有过多的处理,而是直接执行SessionCallback中的动作,官方注释如下:

Executes a Redis session. Allows multiple operations to be executed in the same session enabling ‘transactional’ capabilities through RedisOperations.multi() and RedisOperations.watch(Collection) operations.

大概意思是执行一个Redis会话。允许在同一会话中执行多个操作,通过RedisOperations.multi()和RedisOperations.watch(Collection)操作启用“事务”功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

public <T> T execute(SessionCallback<T> session) {
Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(session, "Callback object must not be null");
RedisConnectionFactory factory = this.getRequiredConnectionFactory();
RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);

Object var3;
try {
// 执行SessionCallback并获取执行SessionCallback返回的结果集
var3 = session.execute(this);
} finally {
RedisConnectionUtils.unbindConnection(factory);
}
// 直接返回结果集
return var3;
}

execute和executePipelined区别

从上面两段源码示例可以看出,execute和executePipelined的最主要区别是executePipelined开启了pipeline。pipline与execute正常的请求/响应的区别主要在于请求/响应模式上,execute是串行化的命令请求,executePipelined请求与响应则是穿插进行。两者区别如图所示:
串行:
redis-execute

execute方法是串行的,命令请求发出后,必须得到响应数据,才能发送下一条命令请求。所以在一次Redis会话中,一次会话可能包含多次请求,即多次RTT。

穿插:

redis-executepipeline.png

executePipelined是穿插的,可以批量发送命令到服务器,也可以批量获取响应数据。即可能使用一次RTT就能完成批量操作。

除了网络协议上的区别外,execute和executePipelined都支持事务管理器,支持multi,watch,exec,discard等事务操作,不过execute与executePipelined的这些操作还是有些区别的。

例如 JedisConnection开启multi:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

public void multi() {
if (!this.isQueueing()) {
try {
if (this.isPipelined()) {
// 如果开启了pipeline,则使用Pipeline内部的multi
// Pipeline绑定的是client
this.getRequiredPipeline().multi();
} else {
// 没有开启pipeline使用jedis的multi
// jedis的multi绑定的是connection
this.transaction = this.jedis.multi();
}
} catch (Exception var2) {
throw this.convertJedisAccessException(var2);
}
}
}

jedis与Pipeline的命令绑定的对象不一样,前者绑定的是connection,后者绑定client。这和上面提到的请求/响应模式有关。
watch,exec,discard等操作也是如此。

除此之外,executePipelined的SessionCallback是不能有返回值的,executePipelined需要返回Pipleline的返回值。

参考资料

Redis客户端与服务端交互详解

pipelining