AI模型部署:Triton+TensorRT部署Bert文本向量化服务实践

关键词:Triton,TensorRT,Bert

前言

本篇介绍以Triton作为推理服务器,TensorRT作为推理后端,部署句嵌入向量模型m3e-base的工程方案和实现,句嵌入模型本质上是Bert结构,本案例可以推广到更一般的深度学习模型部署场景。


内容摘要

  • 推理服务器和推理后端介绍
  • TensorRT+Triton环境搭建
  • Bert模型转化为ONNX中间表示
  • ONNX中间表示编译为TensorRT模型文件
  • Triton服务端参数配置
  • Triton服务端代码实现
  • Triton服务端启动
  • HTTP客户端请求
  • TensorRT前后压测结果对比

推理服务器和推理后端介绍

在Triton+TensorRT的组合中,Triton是推理服务器,TensorRT是推理后端,两者都是NVIDIA推出的推理部署服务组件,Triton原名TensorRT Inference Server,是专供于TensorRT后端的推理服务器,由于TensorRT Inference Server支持的后端越来越多,因此其改名为Triton,Triton+TensorRT一直是NVIDIA主推的部署方式。
模型的推理服务由两块组件构成,分别是推理服务器和推理后端,推理服务器负责处理客户端的请求输送给推理后端,推理后端负责输出推理结果,经过推理服务器返回给客户端,两者的工作示意图如下

推理服务器和推理后端示意图

推理服务的优化需要推理服务器和推理后端的共同优化,其中推理后端的优化主要体现在推理引擎内核的优化使得推理的性能更高,延迟更低,推理服务器的优化主要体现在推理策略、调度策略的优化,通过辅助策略来协助后端更好地推理,提高吞吐量。
推理服务器接受客户端请求,并为后端推理提供必要的前提准备,包括

  • 前处理:对请求的数据进行预先处理,使得服务后端模型的要求,例如NLP领域,对输入的自然语言使用tokenizer进行分词编码
  • 后处理:对后端模型的推理结果做再加工,符合客户端的需求,比如对输出的矩阵进行reshape转换,label映射等
  • 模型预热:在服务完全启动之前,提供一些样例数据给到模型推理,使得模型完全初始化
  • 动态批处理:服务端自动将请求时间接近的所携带的数据进行合并,从而批量推理提高GPU的利用率,等待请求的合并会带来一定的延迟
  • 并发多实例:推理服务器设置多个实例来共同处理请求,分配资源来处理模型的负载,提高并发请求下的服务吞吐量

推理后端又分别推理表示和推理引擎,其内容如下

  • 推理表示:即模型格式,是模型训练后输出的模型文件,例如Tensorflow的冻结图,PyTorch的bin格式
  • 推理引擎:即支持该类模型格式的推理计算组件,Tensorflow和PyTorch这样的训练框架本身自带推理引擎,也有其他的更加优化的推理引擎,比如ONNXRuntime等。

业界常用的推理服务器和推理后端罗列如下

推理服务器和推理后端选型

本文要介绍的是以Triton作为推理服务器,以TensorRT作为推理后端的部署方案,其中Triton中的后端程序由Python实现,模型格式为TensorRT,使用Python后端下的TensorRT包实现对模型推理。


TensorRT+Triton环境搭建

笔者的环境为NVIDIA显卡驱动driver版本为535.154.05,cuda版本为12.2。下载Triton的Docker镜像,到NVIDIA查看符合cuda版本的镜像。 下载23.08版本的Triton镜像,对应的TensorRT版本为8.6.1.6,该镜像提供了推理服务器环境,是模型服务的基础镜像,该镜像的Python3版本为3.10。

docker pull nvcr.io/nvidia/tritonserver:23.08-py3

下载23.08版本的TensorRT镜像,该镜像的作用是使用trtexec将onnx模型格式转化为trt格式

docker pull nvcr.io/nvidia/tensorrt:23.08-py3

手动下载8.6.1.6版本的TensorRT,下载的目的是手动安装TensorRT的Python包,在推理的时候需要TensorRT的Python API实现推理

TensorRT官网下载

解压

tar -xzvf TensorRT-8.6.1.6.Linux.x86_64-gnu.cuda-12.0.tar.gz

下一步搭建基础镜像环境,需要在tritonserver镜像中安装如下Python包依赖,参考版本如下

torch                 2.1.2+cu121
transformers          4.39.3
tensorrt              8.6.1
sentence-transformers 2.7.0
pycuda                2022.2.2

手动下载torch gpu版本和cuda,和TensorRT解压后的文件夹到pip_package下,一起映射到容器

# docker启动Triton镜像
docker run --rm -it -v /home/pip_package/:/home nvcr.io/nvidia/tritonserver:23.08-py3 /bin/bash
 
# 安装tensorrt
cd /home
pip install TensorRT-8.6.1.6/python/tensorrt-8.6.1-cp310-none-linux_x86_64.whl
 
# 安装torch
pip install torch-2.1.2+cu121-cp310-cp310-linux_x86_64.whl -i https://pypi.tuna.tsinghua.edu.cn/simple
 
# 安装transformers
pip install transformers -i https://pypi.tuna.tsinghua.edu.cn/simple
 
# 安装sentence-transformers
pip install sentence-transformers -i https://pypi.tuna.tsinghua.edu.cn/simple
 
# 安装pycuda
pip install pycuda -i https://pypi.tuna.tsinghua.edu.cn/simple

将容器保存为一个新的镜像,至此环境搭建完毕

docker commit xxxxxxx triton_tensorrt_py_23.08:v1

Bert模型转化为ONNX中间表示

使用TensorRT作为后端推理模型必须将模型转化为trt格式,目前众多模型都支持转化为trt,但是支持程度层次不起,其中TensorRT对模型中间表示ONNX支持的最好,因此一般的做法是将tensorflow,pytorch的模型文件先转化为ONNX,再从ONNX转化为trt格式。
首先将m3e-base模型转化为ONNX格式,PyTorch API支持直接转化

from transformers import BertModel
 
model = BertModel.from_pretrained("./m3e-base").eval()
 
 
import torch
onnx_path = "./m3e-base.onnx"
 
input_ids = torch.LongTensor([[1, 2, 3], [2, 3, 4]])
attention_mask = torch.LongTensor([[1, 1, 1], [1, 1, 1]])
 
torch.onnx.export(model,
                  (input_ids, attention_mask),
                  onnx_path,
                  verbose=False,
                  opset_version=11,
                  input_names=['input_ids', 'attention_mask'],
                  output_names=['output'],
                  dynamic_axes={"input_ids": {0: "batch_size", 1: "max_seq_len"},
                                "attention_mask": {0: "batch_size", 1: "max_seq_len"},
                                "output": {0: "batch_size"}
                                })

其中input_names和output_names取名可以自定义,输入的顺序必须和模型forward顺序一致,dynamic_axes代表不定长的动态维度,指定维度索引和一个自定义命名,本例中input_ids,attention_mask的0,1维度都是不定长,output的0维度是不定长


ONNX中间表示编译为TensorRT模型文件

下一步将ONNX文件转化为trt格式,将m3e-base.onnx(/home/model_repository/目录下)映射到tensorrt容器内,使用trtexec进行转换,需要将宿主机的gpu挂入容器内

docker run --gpus=all --rm -it -v /home/model_repository/:/home nvcr.io/nvidia/tensorrt:23.08-py3 /bin/bash
trtexec --onnx=m3e-base.onnx \
--workspace=10000 \
--saveEngine=m3e-base.trt \
--minShapes=input_ids:1x1,attention_mask:1x1 \
--optShapes=input_ids:16x512,attention_mask:16x512 \
--maxShapes=input_ids:64x512,attention_mask:64x512

若日志显示PASSED代表转化成功,若显示空间不足报错请适当增大workspace,其中saveEngine代表模型输出的文件命中,minShapes,optShapes,maxShapes代表支持动态输入,指定最小尺寸和最大尺寸。转化完成后输出trt文件m3e-base.trt,将其映射到triton_tensorrt_py_23.08:v1容器中,测试是否能够正常被tensorrt的Python API读取

docker run --rm -it --gpus=all -v /home/model_repository:/home triton_tensorrt_py_23.08:v1 /bin/bash

用容器内的Python3来读取trt文件

root@a10830d0aeec:/home# python3
Python 3.10.12 (main, Jun 11 2023, 05:26:28) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tensorrt as trt
>>> TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
>>> def get_engine(engine_file_path):
...     print("Reading engine from file {}".format(engine_file_path))
...     with open(engine_file_path, "rb") as f, trt.Runtime(TRT_LOGGER) as runtime:
...         engine = runtime.deserialize_cuda_engine(f.read())
...         return engine
...
>>> engine_model_path = "m3e-base.trt"
>>> engine = get_engine(engine_model_path)
Reading engine from file m3e-base.trt
>>>

若没有报错则表示trt模型转化成功


Triton服务端参数配置

Triton部署模型的服务端有严格的文件目录和文件形式要求,首先创建一个模型目录命名为m3e-base-trt,本例的文件格式如下

(base) [root@localhost m3e-base-trt]# tree
.
├── 1
│   ├── m3e-base-trt
│   │   ├── m3e.trt
│   │   └── vocab.txt
│   ├── model.py
│   ├── __pycache__
│   │   ├── model.cpython-310.pyc
│   │   ├── model.cpython-38.pyc
│   │   ├── trtutils.cpython-310.pyc
│   │   └── trtutils.cpython-38.pyc
│   ├── trtutils.py
│   └── work
│       └── version.txt
└── config.pbtxt

该目录下的1代表模型版本,可以取任意数字id作为文件名代表模型版本,默认情况下Triton以最大的那个模型版本作为服务。config.pbtxt为模型的服务端配置文件,配置如下

(base) [root@localhost m3e-base-trt]# cat config.pbtxt
name: "m3e-base-trt"
backend: "python"
 
max_batch_size: 32
input [
    {
        name: "text"
        dims: [ -1 ]
        data_type: TYPE_STRING
    }
]
output [
    {
        name: "embeddings"
        dims: [ 768 ]
        data_type: TYPE_FP32
    }
]
 
instance_group [
{
  count: 2
  kind: KIND_GPU
  gpus: [ 0 ]
}
]
dynamic_batching {
    max_queue_delay_microseconds: 2000
}

该文件决定了模型的输入输出的维度,服务策略等内容,重点内容如下

  • backend:推理后端,本例采用Python实现的自定义客户端,在Python中使用了tensorrt的API,因此本质上是tensorrt的后端
  • max_batch_size:一次推理的最大批次,超过该值会报错,max_batch_size通常和动态批处理dynamic_batching一起使用,max_batch_size会作为停止合并的一个条件
  • input/output:输入和输入的定义,变量名自定义,但是必须和Python后端脚本一致,dims代表维度,-1代表不定长,data_type代表类型,具体使用请参考Triton的教程
  • instance_group:多实例设置,kind代表设备,KIND_GPU为GPU设备,也可以指定CPU,gpus指定GPU设备号,多个id就是指定多gpu,count代表实例数,具体是每个GPU/CPU下的实例数,本例中代表0号GPU启2个实例
  • dynamic_batching:动态批处理,服务端会自动合并请求,从而尽量以批量推理的方式来代替单条请求推理,提高吞吐量,因此服务端会主动等待max_queue_delay_microseconds时间,在这段时间内服务端会将所有请求合并,合并完成后再输送给推理后端,推理完成后合并的结果会再拆成单条请求的形式,因此对客户端无感。默认情况下如果不设置dynamic_batching,Triton不会进行动态批处理

自定义Python客户端需要在版本号文件夹下设置一个model.py文件,该文件内部实现了后端推理逻辑,work目录为服务运行过程中自动生成,不需要理会。


Triton服务端代码实现

服务端代码实现在model.py中,具体的在其中实现trt文件的读取,客户端数据的获取,模型推理,响应返回,本例如下

import os
 
# 设置显存空闲block最大分割阈值
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:32'
# 设置work目录
 
os.environ['TRANSFORMERS_CACHE'] = os.path.dirname(os.path.abspath(__file__)) + "/work/"
os.environ['HF_MODULES_CACHE'] = os.path.dirname(os.path.abspath(__file__)) + "/work/"
 
import json
import triton_python_backend_utils as pb_utils
import sys
import gc
import time
import logging
from transformers import BertTokenizer
import tensorrt as trt
import numpy as np
import torch
 
import trtutils as trtu
 
gc.collect()
 
logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
 
 
class TritonPythonModel:
    def initialize(self, args):
        # You must parse model_config. JSON string is not parsed here
        self.model_config = json.loads(args['model_config'])
        output_config = pb_utils.get_output_config_by_name(self.model_config, "embeddings")
 
        # Convert Triton types to numpy types
        self.output_response_dtype = pb_utils.triton_string_to_numpy(output_config['data_type'])
 
        # trt engine
        def get_engine(engine_file_path):
            print("Reading engine from file {}".format(engine_file_path))
            with open(engine_file_path, "rb") as f, trt.Runtime(TRT_LOGGER) as runtime:
                engine = runtime.deserialize_cuda_engine(f.read())
                return engine
 
        engine_model_path = os.path.dirname(os.path.abspath(__file__)) + "/m3e-base-trt"
        self.engine = get_engine(engine_model_path + "/m3e-base.trt")
        self.context = self.engine.create_execution_context()
        self.context.active_optimization_profile = 0
        self.tokenizer = BertTokenizer.from_pretrained(engine_model_path + "/vocab.txt")
        logging.info("model init success")
 
    def execute(self, requests):
        responses = []
        # TODO 记录下每个请求的数据和数据batch大小
        batch_text, batch_len = [], []
        for request in requests:
            text = pb_utils.get_input_tensor_by_name(request, "text").as_numpy().astype("S")
            text = np.char.decode(text, "utf-8").squeeze(1).tolist()
            batch_text.extend(text)
            batch_len.append(len(text))
        # 日志输出传入信息
        in_log_info = {
            "text": batch_text,
        }
        logging.info(in_log_info)
 
        # tokenizer
        encode = self.tokenizer.batch_encode_plus(batch_text, max_length=512, truncation=True, padding=True)
        input_ids, attention_mask = np.array(encode["input_ids"]).astype(np.int32), np.array(
            encode["attention_mask"]).astype(
            np.int32)
 
        origin_input_shape = self.context.get_binding_shape(0)
        origin_input_shape[0], origin_input_shape[1] = input_ids.shape
        self.context.set_binding_shape(0, origin_input_shape)
        self.context.set_binding_shape(1, origin_input_shape)
        inputs, outputs, bindings, stream = trtu.allocate_buffers_v2(self.engine, self.context)
        inputs[0].host = input_ids
        inputs[1].host = attention_mask
        trt_outputs = trtu.do_inference_v2(self.context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)
        token_embeddings = trt_outputs[0].reshape(input_ids.shape[0], input_ids.shape[1], 768)
 
        # mean pool
        attention_mask = torch.LongTensor(attention_mask)
        token_embeddings = torch.tensor(token_embeddings)
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
        )
        sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
        sum_mask = input_mask_expanded.sum(1)
        sum_mask = torch.clamp(sum_mask, min=1e-9)
        token_embeddings = sum_embeddings / sum_mask
 
        # 归一化
        token_embeddings = torch.nn.functional.normalize(token_embeddings, p=2, dim=1)
        token_embeddings = token_embeddings.detach().cpu().numpy().tolist()
 
        # 日志输出处理后的信息
        out_log_info = {
            "embeddings": token_embeddings
        }
        #logging.info(out_log_info)
 
        # TODO 响应数要和请求数一致
        start = 0
        for i in range(len(requests)):
            end = start + batch_len[i]
            out_tensor = pb_utils.Tensor("embeddings",
                                         np.array(token_embeddings[start:end]).astype(self.output_response_dtype))
            start += batch_len[i]
            final_inference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])
            responses.append(final_inference_response)
 
        return responses
 
    def finalize(self):
        print('Cleaning up...')

其中tensorrt只支持int32,因此需要手动将numpy数据类型转化为int32否则推理报错,trtu.do_inference_v2完成了模型推理。注意从客户端拿到的text和返回的embeddings命名都是要和config.pbtxt保持一致的。
trtutils为现成的推理工具方法,直接使用即可,代码如下

import argparse
import os
import numpy as np
import pycuda.autoinit
import pycuda.driver as cuda
import tensorrt as trt
 
try:
    # Sometimes python does not understand FileNotFoundError
    FileNotFoundError
except NameError:
    FileNotFoundError = IOError
 
EXPLICIT_BATCH = 1 << (int)(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
 
 
def GiB(val):
    return val * 1 << 30
 
 
def add_help(description):
    parser = argparse.ArgumentParser(description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    args, _ = parser.parse_known_args()
 
 
def find_sample_data(description="Runs a TensorRT Python sample", subfolder="", find_files=[], err_msg=""):
    '''
    Parses sample arguments.
    Args:
        description (str): Description of the sample.
        subfolder (str): The subfolder containing data relevant to this sample
        find_files (str): A list of filenames to find. Each filename will be replaced with an absolute path.
    Returns:
        str: Path of data directory.
    '''
 
    # Standard command-line arguments for all samples.
    kDEFAULT_DATA_ROOT = os.path.join(os.sep, "usr", "src", "tensorrt", "data")
    parser = argparse.ArgumentParser(description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-d", "--datadir",
                        help="Location of the TensorRT sample data directory, and any additional data directories.",
                        action="append", default=[kDEFAULT_DATA_ROOT])
    args, _ = parser.parse_known_args()
 
    def get_data_path(data_dir):
        # If the subfolder exists, append it to the path, otherwise use the provided path as-is.
        data_path = os.path.join(data_dir, subfolder)
        if not os.path.exists(data_path):
            if data_dir != kDEFAULT_DATA_ROOT:
                print("WARNING: " + data_path + " does not exist. Trying " + data_dir + " instead.")
            data_path = data_dir
        # Make sure data directory exists.
        if not (os.path.exists(data_path)) and data_dir != kDEFAULT_DATA_ROOT:
            print("WARNING: {:} does not exist. Please provide the correct data path with the -d option.".format(
                data_path))
        return data_path
 
    data_paths = [get_data_path(data_dir) for data_dir in args.datadir]
    return data_paths, locate_files(data_paths, find_files, err_msg)
 
 
def locate_files(data_paths, filenames, err_msg=""):
    """
    Locates the specified files in the specified data directories.
    If a file exists in multiple data directories, the first directory is used.
    Args:
        data_paths (List[str]): The data directories.
        filename (List[str]): The names of the files to find.
    Returns:
        List[str]: The absolute paths of the files.
    Raises:
        FileNotFoundError if a file could not be located.
    """
    found_files = [None] * len(filenames)
    for data_path in data_paths:
        # Find all requested files.
        for index, (found, filename) in enumerate(zip(found_files, filenames)):
            if not found:
                file_path = os.path.abspath(os.path.join(data_path, filename))
                if os.path.exists(file_path):
                    found_files[index] = file_path
 
    # Check that all files were found
    for f, filename in zip(found_files, filenames):
        if not f or not os.path.exists(f):
            raise FileNotFoundError(
                "Could not find {:}. Searched in data paths: {:}\n{:}".format(filename, data_paths, err_msg))
    return found_files
 
 
# Simple helper data class that's a little nicer to use than a 2-tuple.
class HostDeviceMem(object):
    def __init__(self, host_mem, device_mem):
        self.host = host_mem
        self.device = device_mem
 
    def __str__(self):
        return "Host:\n" + str(self.host) + "\nDevice:\n" + str(self.device)
 
    def __repr__(self):
        return self.__str__()
 
 
# Allocates all buffers required for an engine, i.e. host/device inputs/outputs.
def allocate_buffers(engine):
    inputs = []
    outputs = []
    bindings = []
    stream = cuda.Stream()
    for binding in engine:
        size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size  # max_batch_size=1
        dtype = trt.nptype(engine.get_binding_dtype(binding))
        # Allocate host and device buffers
        host_mem = cuda.pagelocked_empty(size, dtype)
        device_mem = cuda.mem_alloc(host_mem.nbytes)  # # nbytes表示数组中的所有数据消耗掉的字节数
        # Append the device buffer to device bindings.
        bindings.append(int(device_mem))
        # Append to the appropriate list.
        if engine.binding_is_input(binding):
            inputs.append(HostDeviceMem(host_mem, device_mem))
        else:
            outputs.append(HostDeviceMem(host_mem, device_mem))
    return inputs, outputs, bindings, stream
 
 
# Allocates all buffers required for an engine, i.e. host/device inputs/outputs.
def allocate_buffers_v2(engine, context):
    """
    Allocates host and device buffer for TRT engine inference.
    This function is similiar to the one in ../../common.py, but
    converts network outputs (which are np.float32) appropriately
    before writing them to Python buffer. This is needed, since
    TensorRT plugins doesn't support output type description, and
    in our particular case, we use NMS plugin as network output.
    Args:
        engine (trt.ICudaEngine): TensorRT engine
    Returns:
        inputs [HostDeviceMem]: engine input memory
        outputs [HostDeviceMem]: engine output memory
        bindings [int]: buffer to device bindings
        stream (cuda.Stream): cuda stream for engine inference synchronization
    """
    inputs = []
    outputs = []
    bindings = []
    stream = cuda.Stream()
    for i, binding in enumerate(engine):
        # binding:input_ids,input_mask,output
        # print(context.get_binding_shape(i)) # (input_ids,input_mask,output).shape (1,105)
        size = trt.volume(context.get_binding_shape(i))  # 1*105
        # dims = engine.get_binding_shape(binding)
        # if dims[1] < 0:
        # size *= -1
        dtype = trt.nptype(engine.get_binding_dtype(binding))  # DataType.FLOAT
        # print(dtype)  # <class 'numpy.float32'>
        # Allocate host and device buffers
        host_mem = cuda.pagelocked_empty(size, dtype)
        device_mem = cuda.mem_alloc(host_mem.nbytes)
        # Append the device buffer to device bindings.
        bindings.append(int(device_mem))
        # Append to the appropriate list.
        if engine.binding_is_input(binding):
            inputs.append(HostDeviceMem(host_mem, device_mem))
        else:
            outputs.append(HostDeviceMem(host_mem, device_mem))
    return inputs, outputs, bindings, stream
 
 
# This function is generalized for multiple inputs/outputs.
# inputs and outputs are expected to be lists of HostDeviceMem objects.
def do_inference(context, bindings, inputs, outputs, stream, batch_size=1):
    # Transfer input data to the GPU.
    [cuda.memcpy_htod_async(inp.device, inp.host, stream) for inp in inputs]
    # Run inference. batch_size = 1
    context.execute_async(batch_size=batch_size, bindings=bindings, stream_handle=stream.handle)
    # Transfer predictions back from the GPU.
    [cuda.memcpy_dtoh_async(out.host, out.device, stream) for out in outputs]
    # Synchronize the stream
    stream.synchronize()
    # Return only the host outputs.
    return [out.host for out in outputs]
 
 
# This function is generalized for multiple inputs/outputs for full dimension networks.
# inputs and outputs are expected to be lists of HostDeviceMem objects.
def do_inference_v2(context, bindings, inputs, outputs, stream):
    # Transfer input data to the GPU.
    [cuda.memcpy_htod_async(inp.device, inp.host, stream) for inp in inputs]
    # Run inference.
    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
    # Transfer predictions back from the GPU.
    [cuda.memcpy_dtoh_async(out.host, out.device, stream) for out in outputs]
    # Synchronize the stream
    stream.synchronize()
    # Return only the host outputs.
    return [out.host for out in outputs]

由于m3e模型的特殊性,需要对模型推理结果做后处理,本例增加了mean pool和归一化操作,代码摘自sentence_transformers的源码,由于在模型转换阶段只对原生的Bert模型进行了转化,因此后处理需要额外补充进来。


Triton服务端启动

Triton存在三种启动方式none,poll,explicit,详情见官网文档。本例采用explicit模式启动,该模式下启动服务和关闭服务都需要手动指定,启动docker脚本如下

docker run --rm --gpus=all \
-p18999:8000 -p18998:8001 -p18997:8002 \
--shm-size=1g \
-e PYTHONIOENCODING=utf-8 \
-v /home/model_repository/:/models \
triton_tensorrt_py_23.08:v1 \
--model-repository=/models \
--model-control-mode explicit \
--load-model m3e-base-trt

宿主机暴露三个端口承接容器内的8000,8001,8002,其中8000是客户端请求推理的端口。--load-model指定了在Triton启动的时候加载m3e-base-trt模型为服务,该名称和模型文件夹要保持一致。宿主机/home/model_repository下的目录结构如下所示,一个模型一个服务(实际上可以多个服务,即多版本),一个服务一个模型文件

(base) [root@localhost model_repository]# tree
.
├── bert-base-chinese
├── chatglm3-6b
├── m3e-base
├── m3e-base-trt
│   ├── 1
│   │   ├── m3e-base-trt
│   │   │   ├── m3e.trt
│   │   │   └── vocab.txt
│   │   ├── model.py
│   │   ├── model.py.bak
│   │   ├── __pycache__
│   │   │   ├── model.cpython-310.pyc
│   │   │   ├── model.cpython-38.pyc
│   │   │   ├── trtutils.cpython-310.pyc
│   │   │   └── trtutils.cpython-38.pyc
│   │   ├── trtutils.py
│   │   └── work
│   │       └── version.txt
│   └── config.pbtxt
├── Qwen1.5-1.8B-Chat
└── Qwen1.5-4B-Chat

docker启动后显示以下日志代表启动成功

I0422 10:35:54.844803 1 grpc_server.cc:2451] Started GRPCInferenceService at 0.0.0.0:8001
I0422 10:35:54.845428 1 http_server.cc:3558] Started HTTPService at 0.0.0.0:8000
I0422 10:35:54.888419 1 http_server.cc:187] Started Metrics Service at 0.0.0.0:8002

可以使用curl请求来关停模型服务

curl -X POST http://0.0.0.0:18999/v2/repository/models/m3e-base-trt/unload

同样也可以再启动

curl -X POST http://0.0.0.0:18999/v2/repository/models/m3e-base-trt/load

注意可以在模型服务中进行load,此时Triton会检查模型目录下是否有变动,如果有变动此时load相当于reload,如果没有变动则load没有任何反应。
另外可以通过以下HTTP请求查看模型仓库下的所有模型,和已经准备就绪服务的模型

curl -X POST http://10.2.13.11:18999/v2/repository/index
[{"name":"Qwen1.5-1.8B-Chat"},{"name":"Qwen1.5-4B-Chat"},{"name":"bert-base-chinese"},{"name":"chatglm3-6b"},{"name":"m3e-base"},{"name":"m3e-base-trt","version":"1","state":"READY"},{"name":"onnx_trt"}]

其中只有m3e-base-trt为READY状态,代表Triton目前只有一个模型在服务。


HTTP客户端请求

Triton暴露的服务支持HTTP和GRPC,本例采用更加通用的HTTP,请求的无如下

import json
import requests
 
 
if __name__ == '__main__':
    import time
    t1 = time.time()
    data = ["酒店很好", "pip清华源 清华大学的镜像", "源这篇文章主要为大家介绍了如何", "关内容,包含IT学习相关文档"]
    for d in data:
        url = "http://10.2.13.11:18999/v2/models/m3e-base-trt/infer"
        raw_data = {
            "inputs": [
                {
                    "name": "text",
                    "datatype": "BYTES",
                    "shape": [1, 1],
                    "data": [d]
                }
            ],
            "outputs": [
                {
                    "name": "embeddings",
                    "shape": [1, 768],
                }
            ]
        }
        res = requests.post(url, json.dumps(raw_data, ensure_ascii=True), headers={"Content_Type": "application/json"},timeout=2000)
    print(res.text)

返回如下

/usr/bin/python3.8 /home/myproject/bisai/m3e_trt_client.py
{"model_name":"m3e-base-trt","model_version":"1","outputs":[{"name":"embeddings","datatype":"FP32","shape":[1,768],"data":[0.014592116698622704,-0.020276973024010659,0.046606432646512988,-0....]
 
Process finished with exit code 0

注意一个请求带有多个数据,返回的结果会被Flatten为一行,所以需要后处理reshape。
Triton的HTTP采用KServe的协议,更多请求方式请看KServe官网


TensorRT前后压测结果对比

本次压测考察推理服务在一段时间不间断不同并发数的请求下,推理成功的吞吐量和95%响应时间,具体解释如下

  • 并发数:分别取并发数为1, 2, 4, 16, 32
  • 一段时间:取1分钟,1分钟连续不间断请求
  • 吞吐量:单位为每秒能推理成功的请求数,infer / s
  • 95%延迟时间:所有返回请求的响应时间的95%分位数,就是说95%的请求响应时间应该小于这个值

并发测试脚本如下

import os
import time
import json
import threading
from typing import List
from concurrent.futures import ThreadPoolExecutor
 
import requests
 
 
class InterfacePressureTesting:
    def __init__(self, concurrency, percent=0.95, duration=60):
        self.concurrency = concurrency
        self.percent = percent
        self.duration = duration
        self.lock = threading.RLock()
        self.finished = 0
        self.delay = []
 
    def decorator(self, func):
        def wrapper(d: str):
            t = time.time()
            func(d)
            self.lock.acquire()
            self.delay.append(time.time() - t)
            self.finished += 1
            self.lock.release()
        return wrapper
 
    def start(self, job, data: List[str]):
        executor = ThreadPoolExecutor(self.concurrency)
        for d in data * 10000:
            executor.submit(self.decorator(job), d)
        time.sleep(self.duration)
        executor.shutdown(wait=False)
        print(self.finished / self.duration)
        print(sorted(self.delay)[int(len(self.delay) * self.percent)])
        os.kill(os.getpid(), 9)
 
if __name__ == '__main__':
    import sys
    api_test = InterfacePressureTesting(concurrency=int(sys.argv[1]))
    # 定义接口请求逻辑job函数,["xxx", "xxx", "xxx", "xxx"]是造的请求数据文本
    api_test.start(job, ["xxx", "xxx", "xxx", "xxx", ...])

笔者的环境是一块gtx 1080Ti的GPU,推理服务为m3e-base embedding服务,其本质是一个bert-base的推理,分别对比PyTorch作为后端部署和TensorRT作为后端部署的各项压测指标,推理服务器采用Triton。

  • 第一组:一块GPU,一个实例
一块GPU,一个实例性能测试

在没有并发的情况下(并发为1),TensorRT的推理延迟比PyTorch降低48%,吞吐量提高82%将近一倍,随着并发的增大,TensorRT对性能的提升更加明显,基本稳定提升PyTorch一倍。由于只有一个实例,并发高了之后吞吐也基本饱和了。

  • 第二组:一块GPU,两个实例
第二组:一块GPU,两个实例性能测试

因为有两个实例来分摊请求,PyTorch和TensorRT的推理性能差距被缩小,推理服务器的多实例策略微弥补了PyTorch推理性能的不足。在多实例和并发场景下,TensorRT性能稳定超过PyTorch的60%。

  • 第三组:一块GPU,两个实例,允许服务端动态批处理,最大批次32, 合并请求允许最大等待0.002秒,就是说服务端会等待2ms,将2ms以内的所有请求合并进行批量推理,或者请求提前达到最大批次32直接推理。
第三组:一块GPU,两个实例,带有动态批处理

随着服务端批处理策略的加入,吞吐量有巨量的提升,相比于没有动态批处理提升了3倍(152 -> 468),开启动态批处理之后,PyTorch和TensorRT的差距再次缩小,且并发越大,批处理越明显,差距越小,TensorRT推理性能稳定超越Pytorch。虽然推理后端不行,但是合理的推理服务器优化也可以提高吞吐量。

最终结论:在没有任何服务端策略优化的情况下,裸预测性能TensorRT是PyTorch的2倍(gtx 1080ti),如果在推理服务器增加策略优化,比如动态批处理,多实例部署,则在高并发场景下,TensorRT和PyTorch的性能差异会被缩小,仅从后端这个角色上来说TensorRT稳定且可观超越PyTorch。


?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容