大家好,欢迎回到学习:)。在本文中,我将继续讨论Apache Spark的深度学习。你可以在这里看到第一部分。
在这一部分中,我将全面关注DL流水线库以及如何从头开始使用它。
Apache Spark时间轴
Apache Spark的持续改进使我们现今能够讨论如何使用它进行深度学习。我创建了Apache Spark到目前为止开发的详细时间表,以了解我们如何到达这里。
很快,我将创建一篇文章,介绍这个时间表,但如果你认为有什么缺失,请让我知道:)
深度学习流水线
深度学习流水线是由Databricks创建的开放源代码库,它提供了高级API以便使用Apache Spark在Python中进行可伸缩深度学习。
databricks/spark-deep-learning
spark-deep-learning - Apache Spark 深度学习流水线
这是一项非常棒的工作,不久应该就会合并到官方API,所以值得一看。
与Spark和DL相比,这个库的一些优点是:
- 本着Spark和Spark MLlib的精神,它提供了易于使用的API,可以只用几行代码就可以进行深入学习。
- 它侧重于易用性和集成性,而不牺牲性能。
- 它由Apache Spark(也是主要贡献者)的创建者构建,因此它更有可能被合并为官方API。
- 它是用Python编写的,因此它将与所有着名的库集成在一起,现在它使用TensorFlow和Keras这两个主要的库来执行DL。
Deep Learning Pipelines基于Apache Spark的ML Pipelines进行训练,并使用Spark DataFrame和SQL来部署模型。它包含深度学习常见方面的高级API,因此可以通过几行代码高效完成:
- 图像加载
- 在Spark ML流水线中应用预先训练的模型作为转换器(transformers)
- 转移学习(Transfer learning)
- 大规模应用深度学习模型
- 分布式超参数调整(Distributed hyperparameter tuning)(下一部分)
- 在DataFrames和SQL中部署模型
我将用例子详细描述这些功能。这些例子来自Databricks的官方笔记 。
Apache Spark深度认知(Deep Cognition)
要运行和测试本文中的代码,您需要在Deep Cognition中创建一个帐户。
非常容易,然后你可以访问他们的所有功能。当你登录时,你应该看到:
现在只需点击左侧的笔记按钮:
你将在Jupyter笔记上安装所有的软件包:)。哦!这里需要注意的是:Spark Notebook(DLS SPARK)即将推出,将在下个月的某个时候公开发布,并告诉它它仍处于私人测试阶段(仅适用于本文)。
您可以在这里下载完整的Notebook以查看所有代码:
https://github.com/FavioVazquez/deep-learning-pyspark
图像加载
在图像上应用深度学习的第一步是加载图像的能力。深度学习流水线包括实用功能,可以将数百万张图像加载到DataFrame中并以分布式方式自动解码,从而可以进行大规模操作。Spark(2.3.0)的新版本也具有此功能,但我们将使用sparkdl库。
我们将使用由TensorFlow策划的创作共同许可(creative-commons )花卉照片档案进行测试。要获得一组花卉照片,请从笔记运行这些命令(我们也将创建一个样本文件夹):
https://gist.github.com/FavioVazquez/33350294e31213ff761bf2ff51e25c4a#file-load_photos-py
!curl -O http://download.tensorflow.org/example_images/flower_photos.tgz
!tar xzf flower_photos.tgz
!mkdir flower_photos/sample
让我们从郁金香和雏菊文件夹中复制一些照片来创建一小部分照片。
!cp flower_photos/daisy/100080576_f52e8ee070_n.jpg flower_photos/sample/
!cp flower_photos/daisy/10140303196_b88d3d6cec.jpg flower_photos/sample/
!cp flower_photos/tulips/100930342_92e8746431_n.jpg flower_photos/sample/
要查看笔记上的这些图像,可以运行以下命令:
import IPython.display as dp
# collect all .png files in ssample dir
fs = !ls flower_photos/sample/*.jpg
# create list of image objects
images = []
for ea in fs:
images.append(dp.Image(filename=ea, format='png'))
# display all images
for ea in images:
dp.display_png(ea)
你应该看到这个
现在我们使用Spark将这些图像作为DataFrame加载。spark.readImage
方法可让您以常见格式(jpg,png等)将图像从HDFS存储读取到DataFrame中。每个图像都以imageSchema格式存储为一行。递归选项允许您从子文件夹读取图像,例如正面和负面标记的样本。sampleRatio参数允许您在训练具有完整数据的模型之前尝试更小的图像样本。
from sparkdl import readImages
# Read images using Spark
image_df = readImages("flower_photos/sample/")
如果我们看一下这个数据框,我们就会看到它创建了一个名为“image”的列。
image_df.show()
+--------------------+
| image|
+--------------------+
|[file:/Users/favi...|
|[file:/Users/favi...|
|[file:/Users/favi...|
+--------------------+
图像列包含一个字符串列,其中包含一个图像结构,schema == ImageSchema。
转换学习(Transfer learning)
深度学习流水线提供实用程序来执行图像转换学习( transfer learning) ,这是开始使用深度学习的最快速(代码和运行时)方式之一。使用深度学习流水线,只需几行代码即可完成。
深度学习流水线可以通过Featurizer的概念实现快速转换学习。以下示例将Spark中的InceptionV3模型和逻辑回归相结合,以将InceptionV3调整到我们的特定的领域。DeepImageFeaturizer会自动剥离预先训练好的神经网络的最后一层,并使用前面所有层的输出作为逻辑回归算法的特征 。由于逻辑回归是一种简单而快速的算法,这种转移学习训练可以使用远远少于通常需要从底层训练深度学习模型所需的图像来快速收敛。
首先,我们需要创建用于转移学习的训练和测试数据框。
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit
from sparkdl.image import imageIO
tulips_df = ImageSchema.readImages("flower_photos/tulips").withColumn("label", lit(1))
daisy_df = imageIO.readImagesWithCustomFn("flower_photos/daisy", decode_f=imageIO.PIL_decode).withColumn("label", lit(0))
tulips_train, tulips_test, _ = tulips_df.randomSplit([0.1, 0.05, 0.85]) # use larger training sets (e.g. [0.6, 0.4] for getting more images)
daisy_train, daisy_test, _ = daisy_df.randomSplit([0.1, 0.05, 0.85]) # use larger training sets (e.g. [0.6, 0.4] for getting more images)
train_df = tulips_train.unionAll(daisy_train)
test_df = tulips_test.unionAll(daisy_test)
# Under the hood, each of the partitions is fully loaded in memory, which may be expensive.
# This ensure that each of the paritions has a small size.
train_df = train_df.repartition(100)
test_df = test_df.repartition(100)
现在让我们来训练模型
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=10, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])
p_model = p.fit(train_df)
让我们看看模型的效果如何:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
tested_df = p_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label"))))
Test set accuracy = 0.9753086419753086
对于一个例子来说并不是那么糟糕,完全没有任何调整!
我们可以看看我们犯错的地方:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import expr
from pyspark.sql.functions import *
from pyspark.sql.types import *
def _p1(v):
return float(v.array[1])y
take_one = udf(_p1, DoubleType())
df = tested_df.withColumn("p", take_one(tested_df.probability))
wrong_df = df.orderBy(expr("abs(p - label)"), ascending=False)
wrong_df.select("image.origin", "p", "label").show(10)
大规模应用深度学习模型
深度学习流水线支持使用Spark以分布式方式运行预先训练的模型,可用于批处理和流数据处理 。
它包含一些最流行的模型,使用户无需花费昂贵的训练模型即可开始深入学习。当然,模型的预测与Spark所带来的所有好处并行完成。
除了使用内置模型之外,用户还可以在Spark预测流水线中插入Keras模型和TensorFlow Graphs。这将单节点工具上的任何单节点模型转换为可以以分布式方式应用于大量数据的模型。
下面的代码使用InceptionV3(一种用于图像分类的最新卷积神经网络(CNN)模型)创建Spark预测流水线,并预测我们刚刚加载的图像中有哪些对象。
from sparkdl import DeepImagePredictor
# Read images using Spark
image_df = ImageSchema.readImages("flower_photos/sample/")
predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels", modelName="InceptionV3", decodePredictions=True, topK=10)
predictions_df = predictor.transform(image_df)
我们来看看预测数据框:
predictions_df.select("predicted_labels").show(truncate=False,n=3)
+----------------+
|predicted_labels| | |
+----------------+
|[[n03930313, picket_fence, 0.1424783], [n11939491, daisy, 0.10951301], [n03991062, pot, 0.04505], [n02206856, bee, 0.03734662], [n02280649, cabbage_butterfly, 0.019011213], [n13133613, ear, 0.017185668], [n02219486, ant, 0.014198389], [n02281406, sulphur_butterfly, 0.013113698], [n12620546, hip, 0.012272579], [n03457902, greenhouse, 0.011370744]] |
|[[n11939491, daisy, 0.9532104], [n02219486, ant, 6.175268E-4], [n02206856, bee, 5.1203516E-4], [n02190166, fly, 4.0093894E-4], [n02165456, ladybug, 3.70687E-4], [n02281406, sulphur_butterfly, 3.0587992E-4], [n02112018, Pomeranian, 2.9011074E-4], [n01795545, black_grouse, 2.5667972E-4], [n02177972, weevil, 2.4875381E-4], [n07745940, strawberry, 2.3729511E-4]]|
|[[n11939491, daisy, 0.89181453], [n02219486, ant, 0.0012404523], [n02206856, bee, 8.13047E-4], [n02190166, fly, 6.03804E-4], [n02165456, ladybug, 6.005444E-4], [n02281406, sulphur_butterfly, 5.32096E-4], [n04599235, wool, 4.6653638E-4], [n02112018, Pomeranian, 4.625338E-4], [n07930864, cup, 4.400617E-4], [n02177972, weevil, 4.2434104E-4]] |
+----------------+
only showing top 3 rows
请注意, 由于某种原因,郁金香(tulip)更接近栅栏(picket fence)而不是花朵(可能是因为照片的背景) ,因此predicted_labels
列显示“雏菊”是使用此基本模型的所有样本花朵的高概率类别。
然而,从概率值的差异可以看出,神经网络具有辨别两种花型的信息。因此,我们上面的转换学习示例能够从基本模型开始正确地学习雏菊和郁金香之间的差异。
让我们看看我们的模型如何识别花的类型:
df = p_model.transform(image_df)
# 100930342_92e8746431_n.jpg not a daisy
df.select("image.origin",(1-take_one(df.probability)).alias("p_daisy")).show(truncate=False)
+---------------------------------------------------+--------------------+
|origin |p_daisy |
+---------------------------------------------------+--------------------+
|.../100930342_92e8746431_n.jpg |0.016760347798379538|
|.../10140303196_b88d3d6cec.jpg |0.9704259547739851 |
|.../100080576_f52e8ee070_n.jpg |0.9705190124824862 |
+------------------------------------------------------------------------+
对于Keras用户
为了使用Spark以分布式方式应用Keras模型, KerasImageFileTransformer在TensorFlow支持的Keras模型上工作。它具有:
- 通过将用户指定的图像加载和处理功能应用于包含一列图像URI的输入数据框,在内部创建一个包含图像列的DataFrame
- 从给定的模型文件路径加载Keras模型
- 将模型应用于图像DataFrame
要使用转换器,我们首先需要将Keras模型存储为文件。对于这个笔记,我们只保存Keras内置的InceptionV3模型,而不是训练一个。
from keras.applications import InceptionV3
model = InceptionV3(weights="imagenet")
model.save('model-full.h5') # saves to the local filesystem
现在我们将创建一个Keras变换器,但首先我们将对图像进行预处理以使用它
from keras.applications.inception_v3 import preprocess_input
from keras.preprocessing.image import img_to_array, load_img
import numpy as np
from pyspark.sql.types import StringType
from sparkdl import KerasImageFileTransformer
def loadAndPreprocessKerasInceptionV3(uri):
# this is a typical way to load and prep images in keras
image = img_to_array(load_img(uri, target_size=(299, 299))) # image dimensions for InceptionV3
image = np.expand_dims(image, axis=0)
return preprocess_input(image)
transformer = KerasImageFileTransformer(inputCol="uri", outputCol="predictions",
modelFile='model-full.h5', # local file path for model
imageLoader=loadAndPreprocessKerasInceptionV3,
outputMode="vector")
我们现在将读取图像并将它们加载到Spark Dataframe中,并使用我们的变换器将模型应用到图像中:
fs = !ls flower_photos/sample/*.jpg
uri_df = spark.createDataFrame(fs, StringType()).toDF("uri")
keras_pred_df = transformer.transform(uri_df)
如果我们通过预测来看这个数据框,我们会看到很多信息,这就是InceptionV3模型中每个类的概率。
与通用张量(general tensors)一起工作
深度学习流水线还提供了使用张量输入(最多2维)应用模型的方法,这些模型由流行的深度学习库编写而成:
- 张量流图(TensorFlow graphs)
- Keras模型
在本文中,我们只会关注Keras模型。KerasTransformer
将TensorFlow支持的KerasTransformer
模型应用于最多2维的张量输入。它从给定的模型文件路径加载一个Keras模型,并将该模型应用到一列数组(其中一个数组对应一个张量),输出一列数组。
from sparkdl import KerasTransformer
from keras.models import Sequential
from keras.layers import Dense
import numpy as np
# Generate random input data
num_features = 10
num_examples = 100
input_data = [{"features" : np.random.randn(num_features).astype(float).tolist()} for i in range(num_examples)]
schema = StructType([ StructField("features", ArrayType(FloatType()), True)])
input_df = spark.createDataFrame(input_data, schema)
# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "simple-binary-classification"
model.save(model_path)
# Create transformer and apply it to our input data
transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)
final_df = transformer.transform(input_df)
final_df.show()
+-------------+--------------------+
| predictions| features|
+-------------+--------------------+
| [0.86104786]|[-0.76344526, 0.2...|
| [0.21693115]|[0.41084298, 0.93...|
|[0.057743043]|[0.062970825, 0.3...|
| [0.43409333]|[-0.43408343, -1....|
| [0.43690935]|[-0.89413625, 0.8...|
| [0.49984664]|[-0.82052463, -0....|
| [0.6204273]|[-0.5075533, 0.54...|
| [0.2285336]|[0.016106872, -0....|
| [0.37478408]|[-1.6756374, 0.84...|
| [0.2997861]|[-0.34952268, 1.2...|
| [0.3885377]|[0.1639214, -0.22...|
| [0.5006814]|[0.91551965, -0.3...|
| [0.20518135]|[-1.2620118, -0.4...|
| [0.18882117]|[-0.14812712, 0.8...|
| [0.49993372]|[1.4617485, -0.33...|
| [0.42390883]|[-0.877813, 0.603...|
| [0.5232896]|[-0.031451378, -1...|
| [0.45858437]|[0.9310042, -1.77...|
| [0.49794272]|[-0.37061003, -1....|
| [0.2543479]|[0.41954428, 1.88...|
+-------------+--------------------+
only showing top 20 rows
在SQL中部署模型
产品化模型的一种方法是将其部署为Spark SQL用户定义函数,该函数允许任何知道SQL的人使用它。深度学习流水线提供了采用深度学习模型并注册Spark SQL用户定义函数(UDF)的机制。尤其是,Deep Learning Pipelines 0.2.0增加了对Keras模型中创建SQL UDF的支持,该模型在图像数据上工作。
生成的UDF需要一列(格式化为图像结构“SpImage”)并生成给定Keras模型的输出;例如,对于Inception V3,它会在ImageNet对象类别上生成一个实数评分向量(real valued score vector)。
from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF
registerKerasImageUDF("inceptionV3_udf", InceptionV3(weights="imagenet"))
在处理图像的Keras工作流程中,在将模型应用于图像之前,经常会有预处理步骤。如果我们的工作流程需要预处理,我们可以选择为UDF注册提供预处理功能。预处理器应该接受一个文件路径并返回一个图像数组;下面是一个简单的例子。
from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF
def keras_load_img(fpath):
from keras.preprocessing.image import load_img, img_to_array
import numpy as np
img = load_img(fpath, target_size=(299, 299))
return img_to_array(img).astype(np.uint8)
registerKerasImageUDF("inceptionV3_udf_with_preprocessing", InceptionV3(weights="imagenet"), keras_load_img)
一旦注册了UDF,就可以在SQL查询中使用它:
from pyspark.ml.image import ImageSchema
image_df = ImageSchema.readImages("flower_photos/sample/")
image_df.registerTempTable("sample_images")
df = spark.sql("SELECT inceptionV3_udf_with_preprocessing(image) as predictions from sample_images").show(truncate=False)
这非常强大。一旦数据科学家建立了所需的模型,Deep Learning Pipelines可以很容易地将其作为SQL中的一个函数公开,因此组织中的任何人都可以使用它 - 数据工程师,数据科学家,业务分析师,任何人。
sparkdl.registerKerasUDF("awesome_dl_model", "/mymodels/businessmodel.h5")
接下来,组织中的任何用户都可以在SQL中应用预测:
SELECT image, awesome_dl_model(image) label FROM images WHERE contains(label, “Product”)
在下一部分我将讨论Spark的分布式超参数调优,并将尝试新的模型和示例:)。