数据, 术→技巧

Scikit-Learn学习之管道Pipeline

钱魏Way · · 0 次浏览

Scikit-Learn的Pipeline是一个工具,可以将多个数据预处理和建模步骤连接起来,形成一个完整的机器学习工作流。它允许用户通过链式执行多个转换步骤并最终拟合一个模型,从而使代码更加简洁。下面我们将详细介绍Pipeline的各个组件及其使用方法。

流水线的基本结构

在scikit-learn中,Pipeline由多个步骤组成,每个步骤都是一个元组(name, transform),其中name为步骤名称,transform为要执行的转换对象。这些元组按照顺序组成了流水线,最后一个元组的transform对象是一个机器学习模型。

例如我们需要做如下操作,可以看出有很多重复代码:

vect = CountVectorizer()
tfidf = TfidfTransformer()
clf = SGDClassifier()
vX = vect.fit_transform(Xtrain)
tfidfX = tfidf.fit_transform(vX)
predicted = clf.fit_predict(tfidfX)
# Now evaluate all steps on test set
vX = vect.fit_transform(Xtest)
tfidfX = tfidf.fit_transform(vX)
predicted = clf.fit_predict(tfidfX)

利用pipeline,上面代码可以抽象为:

pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier()),
])
predicted = pipeline.fit(Xtrain).predict(Xtrain)
predicted = pipeline.predict(Xtest)

注意,pipeline最后一步如果有predict()方法我们才可以对pipeline使用fit_predict(),同理,最后一步如果有transform()方法我们才可以对pipeline使用fit_transform()方法。

Pipeline实例讲解

以下代码创建了一个简单的Pipeline,该流水线首先对输入数据进行特征缩放,然后使用支持向量机算法进行分类:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('svm', SVC())
])

在上面的代码中,StandardScaler和SVC均为scikit-learn提供的预定义类。第一个步骤将输入数据进行标准化处理,第二个步骤则调用支持向量机算法进行分类。注意,每个步骤的名称 ‘scaler’ 和 ‘svm’ 可以随意指定,但必须唯一。

步骤:Transformers 和 Estimators

在scikit-learn中,流水线中的每个步骤都必须是一个Transformer或Estimator对象。

  • Transformer对象接受输入,并对其进行某种形式的转换,生成输出;例如,StandardScaler用于将数据集的每个特征缩放到零均值和单位方差。其他常见的变换包括对数据进行编码(例如one-hot编码)和填充缺失值。
  • Estimator对象通过拟合数据集来生成一个预测模型,即机器学习模型;例如,SVC就是一个Estimator,可以通过训练数据来生成支持向量机分类器。

流水线的参数

对于每个步骤,可以使用以下语法设置参数:

(name, Transformer(**params))

其中params是一个字典,包含要传递给Transformer的超参数。例如:

from sklearn.impute import SimpleImputer

pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('svm', SVC(C=1.0))
])

在上述代码中,SimpleImputer被添加为流水线的第一个步骤,并且带有一个名为strategy的超参数,它的值为’median’。这告诉SimpleImputer使用中位数填充缺失值。

将流水线应用于数据

流水线的主要目的是使模型开发者能够轻松地将多个步骤组合在一起,以便进行数据预处理、特征提取和模型拟合等操作。当流水线构建好后,可以调用其fit()方法将其应用于数据,如下所示:

pipeline.fit(X_train, y_train)

在上述代码中,X_train是输入特征变量的训练集,y_train是相应的训练标签。调用fit()方法将沿着流水线执行每个步骤,直到最后一个步骤完成模型拟合。

对于测试数据,可以使用predict()方法进行预测:

y_pred = pipeline.predict(X_test)

流水线的交叉验证

Pipeline结合cross_val_score和GridSearchCV等函数,可实现快速有效的模型选择和调参。例如,以下代码展示了如何使用交叉验证选择最佳正则化超参数C:

from sklearn.model_selection import GridSearchCV

param_grid = {
    'svm__C': [0.1, 1, 10],
}

grid_search = GridSearchCV(pipeline, param_grid=param_grid, cv=5)
grid_search.fit(X_train, y_train)

print(grid_search.best_params_)

在上述代码中,网格搜索通过遍历不同的正则化超参数值[0.1, 1, 10],选择在交叉验证期间得分最高的超参数值。此处需要注意的是,由于正则化参数C定义在流水线的svm步骤中,因此在param_grid中需要将超参数命名为’svm__C’。

自定义transformer

我们可以如下自定义transformer(来自Using Pipelines and FeatureUnions in scikit-learn – Michelle Fullwood

from sklearn.base import BaseEstimator, TransformerMixin
class SampleExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, vars):
        self.vars = vars  # e.g. pass in a column name to extract
    def transform(self, X, y=None):
        return do_something_to(X, self.vars)  # where the actual feature extraction happens
    def fit(self, X, y=None):
        return self  # generally does nothing

另外,我们也可以对每个feature单独处理,例如下面的这个比较大的流水线(来自Using scikit-learn Pipelines and FeatureUnions (zacstewart.com)),我们可以发现作者的pipeline中,首先是一个叫做features的FeatureUnion,其中,每个特征分别以一个pipeline来处理,这个pipeline首先是一个ColumnExtractor提取出这个特征,后续进行一系列处理转换,最终这些pipeline组合为特征组合,再喂给一系列ModelTransformer包装的模型来predict,最终使用KNeighborsRegressor预测(相当于两层stacking)。

pipeline = Pipeline([
    ('features', FeatureUnion([
        ('continuous', Pipeline([
            ('extract', ColumnExtractor(CONTINUOUS_FIELDS)),
            ('scale', Normalizer())
        ])),
        ('factors', Pipeline([
            ('extract', ColumnExtractor(FACTOR_FIELDS)),
            ('one_hot', OneHotEncoder(n_values=5)),
            ('to_dense', DenseTransformer())
        ])),
        ('weekday', Pipeline([
            ('extract', DayOfWeekTransformer()),
            ('one_hot', OneHotEncoder()),
            ('to_dense', DenseTransformer())
        ])),
        ('hour_of_day', HourOfDayTransformer()),
        ('month', Pipeline([
            ('extract', ColumnExtractor(['datetime'])),
            ('to_month', DateTransformer()),
            ('one_hot', OneHotEncoder()),
            ('to_dense', DenseTransformer())
        ])),
        ('growth', Pipeline([
            ('datetime', ColumnExtractor(['datetime'])),
            ('to_numeric', MatrixConversion(int)),
            ('regression', ModelTransformer(LinearRegression()))
        ]))
    ])),
    ('estimators', FeatureUnion([
        ('knn', ModelTransformer(KNeighborsRegressor(n_neighbors=5))),
        ('gbr', ModelTransformer(GradientBoostingRegressor())),
        ('dtr', ModelTransformer(DecisionTreeRegressor())),
        ('etr', ModelTransformer(ExtraTreesRegressor())),
        ('rfr', ModelTransformer(RandomForestRegressor())),
        ('par', ModelTransformer(PassiveAggressiveRegressor())),
        ('en', ModelTransformer(ElasticNet())),
        ('cluster', ModelTransformer(KMeans(n_clusters=2)))
    ])),
    ('estimator', KNeighborsRegressor())
])

class HourOfDayTransformer(TransformerMixin):
    def transform(self, X, **transform_params):
        hours = DataFrame(X['datetime'].apply(lambda x: x.hour))
        return hours
    def fit(self, X, y=None, **fit_params):
        return self

class ModelTransformer(TransformerMixin):
    def __init__(self, model):
        self.model = model
    def fit(self, *args, **kwargs):
        self.model.fit(*args, **kwargs)
        return self
    def transform(self, X, **transform_params):
        return DataFrame(self.model.predict(X))

FeatureUnion

sklearn.pipeline.FeatureUnion — scikit-learn 0.19.1 documentation 和pipeline的序列执行不同,FeatureUnion指的是并行地应用许多transformer在input上,再将结果合并,所以自然地适合特征工程中的增加特征,而FeatureUnion与pipeline组合可以方便的完成许多复杂的操作,例如:

pipeline = Pipeline([
  ('extract_essays', EssayExractor()),
  ('features', FeatureUnion([
    ('ngram_tf_idf', Pipeline([
      ('counts', CountVectorizer()),
      ('tf_idf', TfidfTransformer())
    ])),
    ('essay_length', LengthTransformer()),
    ('misspellings', MispellingCountTransformer())
  ])),
  ('classifier', MultinomialNB())
])

整个features是一个FeatureUnion,而其中的ngram_tf_idf又是一个包括两步的pipeline。

下面的例子中,使用FeatureUnion结合PCA降维后特征以及选择原特征中的几个作为特征组合再喂给SVM分类,最后用grid_search 做了 pca的n_components、SelectKBest的k以及SVM的C的CV。

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
iris = load_iris()
X, y = iris.data, iris.target
print(X.shape, y.shape)
# This dataset is way too high-dimensional. Better do PCA:
pca = PCA()
# Maybe some original features where good, too?
selection = SelectKBest()
# Build estimator from PCA and Univariate selection:
svm = SVC(kernel="linear")
# Do grid search over k, n_components and C:
pipeline = Pipeline([("features",
                      FeatureUnion([("pca", pca), ("univ_select",selection)])), ("svm",svm)])
param_grid = dict(
    features__pca__n_components=[1, 2, 3],
    features__univ_select__k=[1, 2],
    svm__C=[0.1, 1, 10])
grid_search = GridSearchCV(pipeline, param_grid=param_grid, verbose=10)
grid_search.fit(X, y)
grid_search.best_estimator_
grid_search.best_params_
grid_search.best_score_

类别型和数值型数据的处理

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier

df = pd.dataframe()
numeric_features = ['salary', 'zone_count', 'staff_count']
categorical_features = ['rank', 'district']

categorical_feature_mask = df.dtypes == object
categorical_features = df.columns[categorical_feature_mask].tolist()
numeric_feature_mask = df.dtypes != object
numeric_features = df.columns[numeric_feature_mask].tolist()

categorical_transformer = Pipeline(steps=[
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])

numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

clf = Pipeline([
     ('preprocessor', preprocessor),
     ('clf', RandomForestClassifier())
])

参考链接:

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注