一、Pipelining简介

引用官方的一段介绍


A Request/Response server can be implemented so that it is able to process new requests even if the client didn’t already read the old responses. This way it is possible to send multiple commands to the server without waiting for the replies at all, and finally read the replies in a single step.


我们知道,redis是典型的C/S架构,客户端与服务端通信是通过TCP协议进行。在使用redis进行一次存/取数据的过程中,总的耗时是由以下三部分组成


发送请求到服务器+redis服务器存/取/处理数据+返回结果到客户端


由于redis操作数据是在内存中进行的,所以速度相当快,故此时大部分时间花在网络传输上。如果只是低频存取数据,那么影响不大;但当需要在短时间内做多次存取操作,那么其网络的瓶颈就相当明显了。为了解决此问题,Redis引入了管道技术,管道技术可以将多次操作的指令放在一次传输中进行,然后再将多次操作的结果封装在一起一次性返回,当需要大量存取数据时,使用管道技术可以大大减少网络传输总耗时。


二、如何使用管道

下面我以Java为例,演示如何简单使用管道技术进行存取数据


首先需要在pom.xml引入jedis依赖


<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
   <version>2.9.0</version>
</dependency>

实例化一个jedis对象

Jedis jedis = new Jedis("192.168.195.128",6379);

从jedis中获取Pipelined对象

Pipeline pipeline = jedis.pipelined();


批量存储数据


for(int i = 0;i<1000;i++){
     String content = i + "";
     pipeline.set(content,content);
 }
 pipeline.sync();

批量读取数据

在pipeline中,所有操作的返回结果会封装成response 对象,在执行pipeline.sync()方法前,该response对象内不会有内容,所以需要使用容器先将所有操作的response对象收集起来,在执行完sync方法后,再从此容器中取出所有的response对象,并获取其结果。

Map<String,Response> responses = new LinkedHashMap<String, Response>();
for(int i = 0;i<1000;i++){
     String content = i + "";
     Response<String> response = pipeline.get(content);
     responses.put(content,response);
}
pipeline.sync();
for(String key:responses.keySet()){
    System.out.println("key:"+key + ",value:" + responses.get(key).get());
}

三、性能测试

1、测试代码

package com.czh.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import java.io.IOException;
import java.util.*;
/**
 * 创建人:菜头君
 * <p>
 * 创建时间:2018-03-03 23:18
 * <p>
 * 描述:管道技术测试代码
 */
public class Main {
    static Long start = 0L;
    static Long time = 0L;
    public static void main(String[] args) throws IOException {
        //测试一条数据
        testInsertAndGet(1);
        //测试十条数据
        testInsertAndGet(10);
        //测试百条数据
        testInsertAndGet(100);
        //测试千条数据
        testInsertAndGet(1000);
        //测试万条数据
        testInsertAndGet(10000);
        //测试十万条数据
        testInsertAndGet(100000);
        //测试百万条数据
        testInsertAndGet(1000000);
    }
    /**
     * 测试插入与查询
     * @param count 测试的数据量
     * @throws IOException
     */
    public static void testInsertAndGet(int count) throws IOException {
        Jedis jedis = new Jedis("192.168.195.128",6379);
        Map result = new HashMap<String,String>(count);
        Pipeline pipeline = jedis.pipelined();
        start();
        for(int i = 0;i<count;i++){
            String content = i + "";
            jedis.set(content,content);
        }
        end();
        System.out.println("未使用管道插入数据,耗时:" + time + "ms");
        jedis.flushAll();
        start();
        for(int i = 0;i<count;i++){
            String content = i + "";
            pipeline.set(content,content);
        }
        pipeline.sync();
        end();
        System.out.println("已使用管道插入数据,耗时:" + time + "ms");
        start();
        for(int i = 0;i<count;i++){
            String content = i + "";
            String value = jedis.get(content);
            result.put(content,value);
        }
        end();
        System.out.println("未使用管道查询数据,耗时:" + time + "ms");
        result.clear();
        start();
        Map<String,Response> responses = new LinkedHashMap<String, Response>(count);
        for(int i = 0;i<count;i++){
            String content = i + "";
            Response<String> response = pipeline.get(content);
            responses.put(content,response);
        }
        for(String key:responses.keySet()){
            result.put(key,responses.get(key));
        }
        end();
        System.out.println("已使用管道查询数据,耗时:" + time + "ms");
        pipeline.close();
        jedis.flushAll();
        System.out.println("本轮测试结束,测试数据量为:" + count);
    }
    /**
     * 计时开始
     */
    public static void start(){
        start = System.currentTimeMillis();
    }
    /**
     * 计时结束
     */
    public static void end(){
        time = System.currentTimeMillis() - start;
    }
}

2、测试环境

这次只是简单测试,所以我直接在我的小米笔记本上进行测试了。我建立了一个vm虚拟机作为服务器,再在虚拟机中的docker运行了redis镜像;客户端则直接使用IDEA运行代码。

环境 操作系统 CPU 内存 JDK Jedis/Redis版本
客户端 win10 64位家庭版 四核 8G 1.8 2.9.0
服务端 CentOS7 64位1708 单核 1G 4.0.8

3、测试报告

控制台输出的信息如下:

未使用管道插入数据,耗时:12ms

已使用管道插入数据,耗时:12ms

未使用管道查询数据,耗时:0ms

已使用管道查询数据,耗时:0ms

本轮测试结束,测试数据量为:1

未使用管道插入数据,耗时:4ms

已使用管道插入数据,耗时:0ms

未使用管道查询数据,耗时:4ms

已使用管道查询数据,耗时:4ms

本轮测试结束,测试数据量为:10

未使用管道插入数据,耗时:31ms

已使用管道插入数据,耗时:0ms

未使用管道查询数据,耗时:70ms

已使用管道查询数据,耗时:0ms

本轮测试结束,测试数据量为:100

未使用管道插入数据,耗时:520ms

已使用管道插入数据,耗时:4ms

未使用管道查询数据,耗时:469ms

已使用管道查询数据,耗时:4ms

本轮测试结束,测试数据量为:1000

未使用管道插入数据,耗时:4629ms

已使用管道插入数据,耗时:40ms

未使用管道查询数据,耗时:4542ms

已使用管道查询数据,耗时:23ms

本轮测试结束,测试数据量为:10000

未使用管道插入数据,耗时:45917ms

已使用管道插入数据,耗时:208ms

未使用管道查询数据,耗时:45837ms

已使用管道查询数据,耗时:140ms

本轮测试结束,测试数据量为:100000

未使用管道插入数据,耗时:463291ms

已使用管道插入数据,耗时:2671ms

未使用管道查询数据,耗时:459813ms

已使用管道查询数据,耗时:2226ms

本轮测试结束,测试数据量为:1000000


生成表格信息如下


用例 1 10 100 1000 10000 100000 1000000
插入(未使用管道) 12ms 4ms 31ms 520ms 4629ms 45917ms 463291ms
插入(使用管道) 12ms <1ms 0ms 4ms 40ms 208ms 2671ms
查询(未使用管道) <1ms 4ms 70ms 469ms 4542ms 45837ms 459813ms
查询(使用管道) <1ms 4ms 0ms 4ms 23ms 140ms 2226ms

绘制成图表数据更直观
插入操作耗时对比

查询操作耗时对比

4、结论

当使用了管道后,无论是高频率的存还是取数据,其性能均大大提高,相同条件下,当数据量达到百万级别时,使用管道后性能可提升惊人的200倍以上!当然上面的测试也并非标准严苛的测试,其结果也仅供参考。


来源:https://blog.csdn.net/qq_22152261/article/details/79434206