Redis
是一个高性能的Nosql
内存数据库。代码精简,性能和扩展性强,被广泛用于互联网应用之中。许多语言也都支持redis,并实现了其客户端驱动。Python的redis驱动写得非常好(以下简称redis.py),通过阅读redis.py可以学习redis的通信协议,网络客户端的编程以及连接池管理等技术,我们也将通过对这三部分的逐一解析来学习redis.py。
目录:
- Redispy 源码学习(一) --- 概览
- Redispy 源码学习(二) --- RESP协议简介
- Redispy 源码学习(三) --- RESP协议实现--编码
- Redispy 源码学习(四) --- 创建连接
- Redispy 源码学习(五) --- RESP协议实现--解码
- Redispy 源码学习(六) --- 连接池
- Redispy 源码学习(七) --- 客户端接口
- Redispy 源码学习(八) --- 多线程和阻塞连接池
Redis 通信协议
redis设计了一个非常简单高效的通信协议RESP
---REdis Serialization Protocol,该协议基于TCP的应用层协议。在编码RESP协议的时候,我们需要学习字符的编码与解码。由于TCP是流(stream)模式,并没有边界,因此关于抽象出来的包
的边界,将会是RESP
协议的重点处理方式。同时也会发现RESP
设计比较巧妙。
网络通信
redis.py是redis的python客户端驱动,因此我们只需要实现客户端的逻辑,服务端当然就是redis服务器本身。简而言之,就是我们需要使用python实现一个redis-cli
。虽然客户端的socket编码不比服务端的复杂,可是要是处理不当,同样也会带来诸多问题。构建一个健壮的客户端是写好服务端的基础。
学习了RESP
的编码与解码之后,我们就需要借助socket把网络数据发送给redis服务器,同时介绍服务器的应答,完成客户端对数据库的操作。
连接池
redis.py是一个redis的客户端,主要任务是发送命令到redis服务器。对于客户端而言,与服务器的通信基于tcp的socket,客户端的生命周期,自然而然就需要创建连接,发送数据,关闭连接等基本操作。
连接的频繁创建与销毁也会消耗资源,引入连接池管理连接将会是一种比较好的解决方式。redis.py的连接池写得很不错,我们也会从中受益良多。
redis.py的软件架构
一个大型的系统需要一个良好的设计和架构。小的软件或者脚本也离不开好的设计结构。redis.py作为python的客户端,封装了很多redis命令的接口。因此在python中使用redis将非常方便和优雅。
分布式
redis提供分布式功能,我们也会针对其分布式实现和使用解释其原理。
阅读方式
在阅读redis.py源码的时候,尝试自己实现一个驱动将会对学习理解提供莫大帮助,同时也能带来成就感。因此我们将使用Python3的编码环境,以单文件为基础,实现一个简易的redis-like.py。
redis.py 的架构概览
软件结构
下面我们就先对redis.py做一个简单地概览。redis.py已经2.10
版本。其文件结构如下:
? redis tree
.
├── __init__.py
├── _compat.py
├── client.py
├── connection.py
├── exceptions.py
├── lock.py
├── sentinel.py
└── utils.py
- _compat.py 用于处理python2和python3不兼容的函数,封装并提供统一的接口。
- client.py 该文件提供接口给python代码使用。
- connection.py 该文件非常重要,实现了对redis服务器的连接创建销毁和socket收发过程。
- exceptions.py 自定义异常
- lock.py sentinel.py 用于分布式相关的操作
- utils.py 工具函数库
redis.py的作者把软件设计很清晰。不过我们可以先忽略这些结构,基于一个文件实现上面的功能。把核心功能实现之后,再拆分和组织代码结构。
创建客户端,初始化连接池
下面一段简单的使用代码:
import redis
rc = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
print(rc.ping())
print(rc.get('hello'))
StrictRedis创建一个客户端 rc(redis_cli),其内部创建一个连接池,当调用ping
方法的时候,rc才会创建连接。再次调用get
方法的时候,rc会从连接池中读取连接,执行命令。当连接池没有可用连接,rc又会自动创建连接。总之,redis在执行命令的时候,一旦连接坏了,就会清理释放连接,然后重建新连接,并重新执行命令。
与服务器通信入口
无论ping
还是get
方法,调用的都是StrictRedis
的execute_command
方法。
def execute_command(self, *args, **options):
pool = self.connection_pool
command_name = args[0]
connection = pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
pool.release(connection)
execute_command
方法中,先从连接池中get一条连接,然后调用send_command
发送命令给redis服务器,接着调用parse_response
方法读取redis的返回结果。
如果执行发送命令或者读取结果的时候发生异常,将会主动disconnect,即释放客户端的连接资源(如果连接已经断开,就清理对象)。然后再重新发送。等到通信完毕之后,再把连接释放回连接池。
之所以要清理连接对象,是因为在python代码上下文中,逻辑连接还是正常,只不过实际上的tcp连接已经close了。此时要同步逻辑连接和实际的连接。
发送命令
send_command
是连接对象的方法,执行该方法之前将会把命令参数按照RESP协议编码。并调用send_packed_comand
方法,后者会检查连接是是否存在,如果不存在,将会创建连接。这一步就是rc的惰性创建连接入口。
def send_packed_command(self, command):
if not self._sock:
self.connect()
try:
if isinstance(command, str):
command = [command]
for item in command:
self._sock.sendall(item)
except socket.timeout:
self.disconnect()
raise TimeoutError("Timeout writing to socket")
except socket.error:
e = sys.exc_info()[1]
self.disconnect()
if len(e.args) == 1:
errno, errmsg = 'UNKNOWN', e.args[0]
else:
errno = e.args[0]
errmsg = e.args[1]
raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg))
except:
self.disconnect()
raise
该方法会调用self._sock.sendall(item)
将redis命令发送到服务器。
连接与连接池
调用send_packed_command
之前就从连接池中读取连接。
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection
可以该方法会从可用的连接池对象中pop一个连接,如果连接不存在,那么就调用make_connection
创建连接并返回。然后才能使用send_packed_command
发送数据。
读取响应
发送的过程并不复杂,接收的过程则比较讲究。后面的我们会详细分析,在此只需要有个大概认识就行了。
parse_response
方法会调用连接对象的read_response
方法,后者会调用self._parser.read_response()
。这个 _parser
对象为了兼容Hiredis
而做的一个适配器。主要功能就是封装hiredis,提供统一的处理连接管理和数据缓冲的接口。默认使用PythonParse
类。
_parser
对象有一个_buffer
属性,后者是一个SocketBuffer
类,主要封装了对socket的接收功能,即从socket的缓冲区读取数据,通过BytesIO
写入到内存,然后从内存中读取数据。通过计算对比内存中的数据和读取的数据,控制从socket中读取的数据。这个精妙的设计我们后面会详细介绍。
再看_parse
对象read_response
方法:
def read_response(self):
response = self._buffer.readline()
if not response:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
byte, response = byte_to_chr(response[0]), response[1:]
if byte not in ('-', '+', ':', '$', '*'):
raise InvalidResponse("Protocol Error: %s, %s" %
(str(byte), str(response)))
if byte == '-':
response = nativestr(response)
error = self.parse_error(response)
if isinstance(error, ConnectionError):
raise error
return error
# single value
elif byte == '+':
pass
# int value
elif byte == ':':
response = long(response)
# bulk response
elif byte == '$':
length = int(response)
if length == -1:
return None
response = self._buffer.read(length)
# multi-bulk response
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self._buffer.read(length)() for i in xrange(length)]
if isinstance(response, bytes) and self.encoding:
response = response.decode(self.encoding)
return response
调用self._buffer.readline()
方法从socket读取数据。然后根据RESP协议处理redis的回复类型,接着逐一解析返回的数据,并返回。注意,遇到RESP中的批量回复(bulk response)和多批量回复(multi-bulk response),还需要调用self._buffer.read()
和递归调用self._buffer.read(length)
解析。
根据self._buffer.readline()
只能读取返回一行的数据,因为redis是用\r\n
区分数据,因此调用readline的时候,在批量回复和多批量回复的情况下,只能读取最前面的参数,后面的socket数据还在socket的缓冲区,所以需要继续调用read方法读取解析。后面的分析我们将会了解,多批量回复可以分解为多个批量回复,因此就与了迭代response的递归调用response = [self.read_response() for i in xrange(length)]
。这也是经典的tcp流无边界问题的处理方式。
断开连接
读取响应之后,交互就完成了使命。redis维护的是一个长
连接,也就是根据redis的timeout参数来决定连接的空闲时间。默认配置是0,即如果redis不主动close这个连接,连接将会一直存在。所以客户端不会主动disconnect连接,而是释放其回到连接池pool.release(connection)
。前面我们已经提到,只要交互过程中发生了异常,客户端才会主动调用disconnect方法释放与连接相关的资源和对象。
如果设置了redis连接的最大空闲时间: CONFIG SET TIMEOUT 30。那么每个redis的连接在30s之后,服务器都会主动close。此时的客户端还认为连接是正常的,执行收发数据的时候将会抛异常。这时就需要同步客户端和服务器的连接状态。
上面的过程可以用下面的流程图简要的说明:
+-----------------------+
+------------+ | pool |
| | |-----------------------|
| client | 1 | |
| ++-------------> | connection_pool |
+-----+------+ | |
| 2 | |
+---------------------> | execute_command |
+----------+------------+
|
|3
|
|
v
+-----------------------+ +---------------------+ +--------------------+
| pool | | pool | | connection |
|-----------------------| 4 |---------------------| |--------------------|
| get_conncetion ++-------->| | 6 | |
| | | pop connection | <-----------+| init a conn |
| send_command +-----+ | | 5 | |
16 | | | | make_connection |+------------>| conect |
+-------------------+ parse_response | | | | | |
| | | | +---------------------+ +------------+-------+
| | release | | ^ |
| +-----------------------+ +7 | |
| | | |10
| | | |
| | | |
+--------------------------+ v | v
| connection | +-------------------+ +---------------------------------+ | +-------------------+ +---------------+
|--------------------------| | | | connection | | | connection | | connection |
| | | | |---------------------------------| | |-------------------| |---------------|
| _parse.read_response | | | 8 | | | | | | |
| | | pack command |<------+| pack_command | | | | 11 | |
+-----------+--------------+ | |------->| | | | create socket +--------| on_connect |
| +-------------------+ +----------------+----------------+ | | | | |
| | | | | | |
|17 v | +--------+----------+ +------+--------+
v +---------------------------------+ | | +
+--------------------------+ | connection | | | |
| pythonparse | |---------------------------------| | | |
|--------------------------| | | 9 | | |12
| | 18 | check sock connect |+--------+ | |
| _buffer.readline +------------+ | |<---------------------+ |
| | | + sock sendall | v
| | | | |<--------------------------------+ +----------------+
| handle response | | +-----------------+---------------+ | | PythonParse |
| | | | | |----------------|
| | | |15 | | |
+----------+---------------+ | | | | on_connect |
| | v | | |
| | +-----------------+----------------+ | | |
| | | | | +-------+--------+
| | | end | | |
|19 | | | |14 |
| | +----------------------------------+ | |
v v | |13
+---------------------------+ +----------------------------+ +-------------------------------+ | |
| pythonparse | | ScoketBuffer | | SocketBuffer | | v
|---------------------------| |----------------------------| |-------------------------------| | +------------------+
| | | | | | | | |
| _buffer.read | | readline | 21 | | | |------------------|
| | 20 | |+---------------->| read from socket | | | |
| |------>| read | | | | |
| | | | | | +-------+ init SocketBuffer|
+---------------------------+ +----------------------------+ +-------------------------------+ | |
+------------------+
- StrictRedis创建客户端对象,并初始化连接池
- 执行ping命令
- 调用execute_command 从使用 get_connection 从连接池读取连接,然后发送命令,接着解析返回的响应,最后释放连接。
- get_connection 调用,要不重连接池中pop一个连接,如果连接不存在,则调用make_connection 方法创建连接。
- 初始化连接对象。
- 返回连接对象
- 执行send_command 函数,打包编码resp协议的命令。
- 编码resp过程。
- 检查连接是否存在,如果存在则发送socket数据。如果不存在,则调用connect方法创建连接对象。
- 创建socket,用于网络通信。
- 连接创建之后,调用连接的on_connect方法。
- 调用 pythonparse的on connect方法。初始化socketbuffer对象,用于接受数据时候的socket通信。
- 初始化 socketbuffer对象。
- 逐步返回连接对象,直到可以sendall数据到服务器。
- 结束发送过程。
- 调用parse_response 方法,用于读取服务器返回的响应数据
- 逐步回溯调用pythonparse封装的方法读取一行数据。
- 通过socketbuffer读取一行数据
- 遇到批量回复或多批量回复,调用read读取除token之后的数据。
- 与19类似,递归处理多批量回复。
- 从socket读取数据。
总结
经过上面的简述,我们对redis.py的大致框架和功能有了初步的了解,接下来就是针对上面所提及的三个方面深入解析。
RESP协议的学习比较简单,连接池的设计也不会很难,比较核心的关键是网络通信相关的处理。收发数据是我们的核心重点,连接管理也是举足轻重。一个经典的问题就是客户端的代码逻辑上的连接还存在,可是实际的tcp连接已经close,此时的收发数据该如何处理和管理呢?这将成为我们接下来阅读redispy的关键。
接下来将会在分别介绍redispy源码的时,提供文中使用的客户端测试代码,于文末的gist提供。