杭州手机网站制作,男女做爰视频网站,app开发和网站开发的区别,网站开发公司 上海背景
在数据量较大的业务场景中#xff0c;spark在数据处理、传统机器学习训练、 深度学习相关业务#xff0c;能取得较明显的效率提升。 本篇围绕spark大数据背景下的推理#xff0c;介绍一些优雅的使用方式。
spark适用场景
大数据量自定义方法处理、类sql处理传统机器…背景
在数据量较大的业务场景中spark在数据处理、传统机器学习训练、 深度学习相关业务能取得较明显的效率提升。 本篇围绕spark大数据背景下的推理介绍一些优雅的使用方式。
spark适用场景
大数据量自定义方法处理、类sql处理传统机器学习方法k-means、xgboost、lr…分布式深度学习推理
目前在10亿数据量的推理场景中使用需要用户自己实现批数据准备基于RDD的方法完成模型推理输出。 业务使用中的问题
模型文件重复导入加载自定义批数据准备脱离深度学习dataloader框架操作略显麻烦有性能和内存oom等问题。
实践
spark加速深度学习推理
spark加速深度学习推理基本思路为
开启不定量worker并行执行cpu或gpu推理任务所有worker共享同一份模型参数依赖spark pandas udf功能方便并行处理 dataframe 数据依赖深度学习框架方便实现最优批数据划分 下面以pytorch resnet 为实践demo
加载广播模型参数
广播模型参数不仅能减少模型重复加载带来的流量和io而且能加速推理前模型加载的速度。 driver广播模型参数
# Load ResNet50 on driver node and broadcast its state.
model_state models.resnet50(pretrainedTrue).state_dict()
bc_model_state sc.broadcast(model_state)worker读取模型参数
def get_model_for_eval():Gets the broadcasted model.model models.resnet50(pretrainedTrue)model.load_state_dict(bc_model_state.value)model.eval()return model实现基于dataframe的dataset
目前主流的深度学习框架dataset的实现大多基于本地存储在读取分布式存储的场景 需要用户自定义实现。 自定义实现有2个方法
使用分布式存储的api接口读取文件内容dataset读取dataframe二进制文件内容
方法一迭代与使用的存储类型会保持同步且每次使用前需要明确使用的分布式存储虽然实现方法容易但是使用流程略显麻烦。 方法二不需要关心分布式存储类型只要需要获取并解析spark dataframe列传入内容即可。
本文采用方法二实现dataset:
# 从二进制流中解析图片信息
def pil_loader(binary_file):# open path as file to avoid ResourceWarning (https://github.com/python-pillow/Pillow/issues/835)image_io io.BytesIO(binary_file)img Image.open(image_io)return img.convert(RGB)# Create a custom PyTorch dataset class.
class ImageDataset(Dataset):def __init__(self, data, transformNone):self.data dataself.transform transformdef __len__(self):return len(self.data)def __getitem__(self, index):image pil_loader(self.data[index])if self.transform is not None:image self.transform(image)return image实现批量推理的pandas udf
Pandas udf是基于RDD的一个低门槛高性能的实现方法pandas udf能自定义处理逻辑以列的方式操作datafrme内容。 这是社区目前推荐的自定义处理方式。
# Define the function for model inference.
# PyArrow 1.0.0 must be installed;
pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(binaray_data: pd.Series) - pd.Series:transform transforms.Compose([transforms.Resize(224),transforms.CenterCrop(224),transforms.ToTensor(),transforms.Normalize(mean[0.485, 0.456, 0.406],std[0.229, 0.224, 0.225])])images ImageDataset(binaray_data, transformtransform)loader torch.utils.data.DataLoader(images, batch_size500, num_workers8)model get_model_for_eval()model.to(device)all_predictions []with torch.no_grad():for batch in loader:predictions list(model(batch.to(device)).cpu().numpy())for prediction in predictions:all_predictions.append(prediction)return pd.Series(all_predictions)# 调用pandas udf
predictions_df df. \select(col(filename), predict_batch_udf(col(data)).alias(prediction))更多代码细节 https://github.com/Crazybean-lwb/deeplearning-pyspark/blob/master/examples/pytorch-inference.py
模型仓加速推理
打通到模型仓mlflow功能
模型存储和版本管理便捷取用适用spark datarame更高阶的pandas udf实现 # Create the PySpark UDF
import mlflow.pyfunc
pyfunc_udf mlflow.pyfunc.spark_udf(spark, model_urimodel_uri)# 调用pandas udf
df spark_df.withColumn(prediction, pyfunc_udf(struct([...])))参考信息
pytorch分布式批量推理tensorflow分布式批量推理模型仓mlflow协助分布式批量推理