Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Preprocesamiento con Pyspark

A continuación, se procederá a replicar el flujo realizado con Scikit learn, pero utilizando Pyspark.

Primero, se configurará el entorno, para eso se definirá una variable que contendrá el directorio de descarga de Java 17, la versión compatible con Pyspark 4.1.1. Luego, se creará la sesión con Spark Session, el cuál podría entenderse como el ambiente desde el que se leerán los datos y se realizarán las transformaciones.

import os

os.environ["JAVA_HOME"] = "/opt/homebrew/Cellar/openjdk@17/17.0.18/libexec/openjdk.jdk/Contents/Home"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.ui.showConsoleProgress=false pyspark-shell"

print("JAVA_HOME:", os.environ["JAVA_HOME"])
JAVA_HOME: /opt/homebrew/Cellar/openjdk@17/17.0.18/libexec/openjdk.jdk/Contents/Home
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import warnings
warnings.filterwarnings('ignore')

spark = (SparkSession.builder
         .appName("LendingClub_Preprocessing")
         .master("local[10]")
         .config("spark.driver.memory", "12g")
         .config("spark.executor.memory", "12g")
         .config("spark.driver.maxResultSize", "6g")
         .config("spark.memory.fraction", "0.8")
         .config("spark.sql.shuffle.partitions", "10")
         .config("spark.ui.enabled", "false")
         .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")
print(f"SparkSession activa — versión Spark: {spark.version}")
Picked up JAVA_TOOL_OPTIONS: --add-opens=java.base/javax.security.auth=ALL-UNNAMED
Picked up JAVA_TOOL_OPTIONS: --add-opens=java.base/javax.security.auth=ALL-UNNAMED
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/03/20 13:26:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
SparkSession activa — versión Spark: 4.1.1

Lectura de datos y filtrado inicial

Ahora que ya tenemos la sesión activa, se comenzará con la lectura de los datos. Cabe resaltar que, al igual como se trabajó con Scikit-learn, se eliminarán inmediatamente las columnas que fueron quitadas por data leakage o por no aportar información relevante al modelo. Debido a eso, se verá el total de columnas reducido a 71.

CSV_PATH = "/Users/pctm/Documents/archive/accepted_2007_to_2018q4.csv/accepted_2007_to_2018Q4.csv"

leakage = ["recoveries", "collection_recovery_fee", "total_rec_prncp",
    "total_rec_int", "total_rec_late_fee", "last_pymnt_amnt",
    "last_pymnt_d", "next_pymnt_d", "total_pymnt", "total_pymnt_inv",
    "out_prncp", "out_prncp_inv", "debt_settlement_flag",
    "last_credit_pull_d", "issue_d", "last_fico_range_high",
    "last_fico_range_low"]

cols_basura = ['id', 'url', 'emp_title', 'title', 'pymnt_plan', 'hardship_flag']

df_full = (spark.read
           .option("header", "true")
           .option("inferSchema", "false")
           .csv(CSV_PATH))

total_filas = df_full.count()

nulos_por_col = df_full.select([
    (F.count(F.when(F.col(c).isNull() | (F.col(c) == ""), c)) / total_filas).alias(c)
    for c in df_full.columns
]).collect()[0].asDict()

cols_muchos_nulos = [c for c, pct in nulos_por_col.items() if pct > 0.25]
cols_eliminar     = set(cols_muchos_nulos + leakage + cols_basura)

SELECTED_FEATURES = [c for c in df_full.columns 
                     if c not in cols_eliminar and c != "loan_status"]

df_raw = df_full.select(SELECTED_FEATURES + ["loan_status"])

print(f"Columnas en df_raw                   : {len(df_raw.columns)}")
Columnas en df_raw                   : 71

A continuación, solamente se conservarán por los préstamos con resultado Fully Paid o Charged Off de la variable respuesta loan_status y se expresarán de forma binaria.

df_filtered = (df_raw
    .filter(F.col("loan_status").isin(["Fully Paid", "Charged Off"]))
    .withColumn("default",
        F.when(F.col("loan_status") == "Charged Off", 1).otherwise(0))
    .drop("loan_status"))

print(f"Filas tras filtrar: {df_filtered.count():,}")
df_filtered.groupBy("default").count().orderBy("default").show()
Filas tras filtrar: 1,345,309
+-------+-------+
|default|  count|
+-------+-------+
|      0|1076751|
|      1| 268558|
+-------+-------+

df_filtered.printSchema()
root
 |-- loan_amnt: string (nullable = true)
 |-- funded_amnt: string (nullable = true)
 |-- funded_amnt_inv: string (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- fico_range_low: string (nullable = true)
 |-- fico_range_high: string (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- open_acc: string (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- revol_bal: string (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_acc: string (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- collections_12_mths_ex_med: string (nullable = true)
 |-- policy_code: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- acc_now_delinq: string (nullable = true)
 |-- tot_coll_amt: string (nullable = true)
 |-- tot_cur_bal: string (nullable = true)
 |-- total_rev_hi_lim: string (nullable = true)
 |-- acc_open_past_24mths: string (nullable = true)
 |-- avg_cur_bal: string (nullable = true)
 |-- bc_open_to_buy: string (nullable = true)
 |-- bc_util: string (nullable = true)
 |-- chargeoff_within_12_mths: string (nullable = true)
 |-- delinq_amnt: string (nullable = true)
 |-- mo_sin_old_il_acct: string (nullable = true)
 |-- mo_sin_old_rev_tl_op: string (nullable = true)
 |-- mo_sin_rcnt_rev_tl_op: string (nullable = true)
 |-- mo_sin_rcnt_tl: string (nullable = true)
 |-- mort_acc: string (nullable = true)
 |-- mths_since_recent_bc: string (nullable = true)
 |-- mths_since_recent_inq: string (nullable = true)
 |-- num_accts_ever_120_pd: string (nullable = true)
 |-- num_actv_bc_tl: string (nullable = true)
 |-- num_actv_rev_tl: string (nullable = true)
 |-- num_bc_sats: string (nullable = true)
 |-- num_bc_tl: string (nullable = true)
 |-- num_il_tl: string (nullable = true)
 |-- num_op_rev_tl: string (nullable = true)
 |-- num_rev_accts: string (nullable = true)
 |-- num_rev_tl_bal_gt_0: string (nullable = true)
 |-- num_sats: string (nullable = true)
 |-- num_tl_120dpd_2m: string (nullable = true)
 |-- num_tl_30dpd: string (nullable = true)
 |-- num_tl_90g_dpd_24m: string (nullable = true)
 |-- num_tl_op_past_12m: string (nullable = true)
 |-- pct_tl_nvr_dlq: string (nullable = true)
 |-- percent_bc_gt_75: string (nullable = true)
 |-- pub_rec_bankruptcies: string (nullable = true)
 |-- tax_liens: string (nullable = true)
 |-- tot_hi_cred_lim: string (nullable = true)
 |-- total_bal_ex_mort: string (nullable = true)
 |-- total_bc_limit: string (nullable = true)
 |-- total_il_high_credit_limit: string (nullable = true)
 |-- disbursement_method: string (nullable = true)
 |-- default: integer (nullable = false)

Después de este tratamiento inicial queda un total de 1.345.309 observaciones y 71 columnas.

Imputación de valores nulos

Antes de seguir con el procesamiento, es necesario hacer la imputación de datos faltantes en estas columnas. Para eso, se dividirán las variables en numéricas y categóricas y luego se impuatarán utilizando la medida descriptiva más acorde según el caso.

from pyspark.sql.types import StringType, DoubleType, IntegerType, LongType

TARGET_COL = "default"

categorical_cols = [
    f.name for f in df_filtered.schema.fields
    if isinstance(f.dataType, StringType) and f.name != TARGET_COL
]

numeric_cols = [
    f.name for f in df_filtered.schema.fields
    if isinstance(f.dataType, (DoubleType, IntegerType, LongType)) and f.name != TARGET_COL
]

print(f"Categóricas ({len(categorical_cols)}): {categorical_cols}")
print(f"\nNuméricas ({len(numeric_cols)}): {numeric_cols}")
Categóricas (70): ['loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'term', 'int_rate', 'installment', 'grade', 'sub_grade', 'emp_length', 'home_ownership', 'annual_inc', 'verification_status', 'purpose', 'zip_code', 'addr_state', 'dti', 'delinq_2yrs', 'earliest_cr_line', 'fico_range_low', 'fico_range_high', 'inq_last_6mths', 'open_acc', 'pub_rec', 'revol_bal', 'revol_util', 'total_acc', 'initial_list_status', 'collections_12_mths_ex_med', 'policy_code', 'application_type', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'total_rev_hi_lim', 'acc_open_past_24mths', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'delinq_amnt', 'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', 'mort_acc', 'mths_since_recent_bc', 'mths_since_recent_inq', 'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl', 'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl', 'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats', 'num_tl_120dpd_2m', 'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m', 'pct_tl_nvr_dlq', 'percent_bc_gt_75', 'pub_rec_bankruptcies', 'tax_liens', 'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit', 'total_il_high_credit_limit', 'disbursement_method']

Numéricas (0): []

En el momento que se hizo el cargue de datos, se configuró la lectura para que no infiriera tipos, es por eso que, al momento de utilizar la función de pyspark para intentar clasificar las columnas, se obtiene que todas las variables son categóricas.

Para solucionar eso, utilizaremos un proceso un poco más manual, que clasificará las columnas dependiendo del tipo de dato que contenga en el 99% de sus observaciones.

A continuación validaremos la integridad de las columnas numericas, para eso se va a buscar la presencia de valores no numéricos en las columnas clasificadas como numericas a ver si encontramos alguna columna que no haya sido clasificada correctamente

TARGET_COL = "default"

numeric_cols = []
categorical_cols = []

for c in [c for c in df_filtered.columns if c != TARGET_COL]:
    invalid = df_filtered.filter(
        F.expr(f"try_cast(`{c}` as double)").isNull() & 
        F.col(c).isNotNull()
    ).count()
    total_notnull = df_filtered.filter(F.col(c).isNotNull()).count()
    ratio_invalid = invalid / total_notnull if total_notnull > 0 else 1.0
    if ratio_invalid <= 0.01:
        numeric_cols.append(c)
        df_filtered = df_filtered.withColumn(c, F.expr(f"try_cast(`{c}` as double)"))
    else:
        categorical_cols.append(c)

print(f"Numéricas  ({len(numeric_cols)}): {numeric_cols}")
print(f"Categóricas ({len(categorical_cols)}): {categorical_cols}")
Numéricas  (57): ['loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'int_rate', 'installment', 'annual_inc', 'dti', 'delinq_2yrs', 'fico_range_low', 'fico_range_high', 'inq_last_6mths', 'open_acc', 'pub_rec', 'revol_bal', 'revol_util', 'total_acc', 'collections_12_mths_ex_med', 'policy_code', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'total_rev_hi_lim', 'acc_open_past_24mths', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'delinq_amnt', 'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', 'mort_acc', 'mths_since_recent_bc', 'mths_since_recent_inq', 'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl', 'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl', 'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats', 'num_tl_120dpd_2m', 'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m', 'pct_tl_nvr_dlq', 'percent_bc_gt_75', 'pub_rec_bankruptcies', 'tax_liens', 'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit', 'total_il_high_credit_limit']
Categóricas (13): ['term', 'grade', 'sub_grade', 'emp_length', 'home_ownership', 'verification_status', 'purpose', 'zip_code', 'addr_state', 'earliest_cr_line', 'initial_list_status', 'application_type', 'disbursement_method']

Después de este proceso, el resultado cambió considerablemente. Se encontraron 57 columnas numéricas y 13 columnas categóricas. Sin embargo, se debe tener en cuenta que el proceso OneHotEncoder convertirá cada categoría única en una variable de columna binaria. Si una variable tiene n categorías, se generarán n-1 columnas nuevas. Para mitigar la creación exagerada de columnas, se aplicará frecuency encoding a las variabkes que tengan más de 50 categorías únicas, esto es, reemplazar cada categoría por su frecuencia relativa en el dataset, convirtiendose así en una columna numérica, sin sobrecargar dimensionalmente el dataset.

cols_alta_cardinalidad = [c for c in categorical_cols 
                          if df_filtered.select(c).distinct().count() > 50]
cols_baja_cardinalidad = [c for c in categorical_cols 
                          if c not in cols_alta_cardinalidad]

print(f"Alta cardinalidad → frequency encoding ({len(cols_alta_cardinalidad)}): {cols_alta_cardinalidad}")
print(f"Baja cardinalidad → OHE               ({len(cols_baja_cardinalidad)}): {cols_baja_cardinalidad}")
Alta cardinalidad → frequency encoding (6): ['purpose', 'zip_code', 'addr_state', 'earliest_cr_line', 'initial_list_status', 'application_type']
Baja cardinalidad → OHE               (7): ['term', 'grade', 'sub_grade', 'emp_length', 'home_ownership', 'verification_status', 'disbursement_method']

Acá vemos que el proceso de frecuency encoding tendrá que ser aplicado a un total de 6 columnas.

Las variables numéricas serán imputadas con la mediana, mientras que las variables categóricas serán imputadas con la moda.

median_vals = {}
quantiles = df_filtered.approxQuantile(numeric_cols, [0.5], 0.01)
for col_name, vals in zip(numeric_cols, quantiles):
    median_vals[col_name] = float(vals[0]) if vals else 0.0

df_imputed = df_filtered.fillna(median_vals)

mode_vals = {}
for col_name in categorical_cols:
    mode_rows = (df_imputed
                 .groupBy(col_name)
                 .count()
                 .orderBy(F.desc("count"))
                 .filter(F.col(col_name).isNotNull())
                 .limit(2)
                 .collect())
    if mode_rows:
        mode_vals[col_name] = mode_rows[0][col_name]

for col_name, mode in mode_vals.items():
    df_imputed = df_imputed.fillna({col_name: mode})

print(f"Imputación completada.")
print(f"Numéricas imputadas  : {len(median_vals)}")
print(f"Categóricas imputadas: {len(mode_vals)}")
Imputación completada.
Numéricas imputadas  : 57
Categóricas imputadas: 13
total_filas = df_imputed.count()

for col_name in cols_alta_cardinalidad:
    freq_map = (df_imputed
                .groupBy(col_name)
                .count()
                .withColumn("freq", F.col("count") / total_filas)
                .select(col_name, "freq"))
    df_imputed = df_imputed.join(freq_map, on=col_name, how="left")
    df_imputed = df_imputed.drop(col_name).withColumnRenamed("freq", col_name)
    numeric_cols.append(col_name)

categorical_cols    = cols_baja_cardinalidad
indexer_output_cols = [f"{c}_idx" for c in categorical_cols]
encoder_output_cols = [f"{c}_ohe" for c in categorical_cols]

print(f"Frequency encoding aplicado a: {cols_alta_cardinalidad}")
print(f"\nNuméricas finales  ({len(numeric_cols)}): {numeric_cols}")
print(f"Categóricas finales ({len(categorical_cols)}): {categorical_cols}")
Frequency encoding aplicado a: ['purpose', 'zip_code', 'addr_state', 'earliest_cr_line', 'initial_list_status', 'application_type']

Numéricas finales  (63): ['loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'int_rate', 'installment', 'annual_inc', 'dti', 'delinq_2yrs', 'fico_range_low', 'fico_range_high', 'inq_last_6mths', 'open_acc', 'pub_rec', 'revol_bal', 'revol_util', 'total_acc', 'collections_12_mths_ex_med', 'policy_code', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'total_rev_hi_lim', 'acc_open_past_24mths', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'delinq_amnt', 'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', 'mort_acc', 'mths_since_recent_bc', 'mths_since_recent_inq', 'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl', 'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl', 'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats', 'num_tl_120dpd_2m', 'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m', 'pct_tl_nvr_dlq', 'percent_bc_gt_75', 'pub_rec_bankruptcies', 'tax_liens', 'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit', 'total_il_high_credit_limit', 'purpose', 'zip_code', 'addr_state', 'earliest_cr_line', 'initial_list_status', 'application_type']
Categóricas finales (7): ['term', 'grade', 'sub_grade', 'emp_length', 'home_ownership', 'verification_status', 'disbursement_method']

El frequency encoding reemplaza cada categoría por su frecuencia relativa en el dataset o sea la proporción de filas que tienen ese valor. Esto nos deja una sola columna numérica por variable en lugar de cientos de columnas binarias como haría el OHE. La ventaja es que el modelo igual recibe información sobre cada categoría pero sin explotar el tamaño del vector de features.

StringIndexer

El StringIndexer convierte cada variable categórica en un indice numérico según la frecuencia. Se hace necesario de incluir, pues OneHotEncoder requiere que todos los indices sean de tipo numérico.

from pyspark.ml.feature import StringIndexer

indexer_output_cols = [f"{c}_idx" for c in categorical_cols]

df_indexed = df_imputed

for col_name, out_name in zip(categorical_cols, indexer_output_cols):
    indexer = StringIndexer(
        inputCol=col_name,
        outputCol=out_name,
        handleInvalid="keep"
    )
    df_indexed = indexer.fit(df_indexed).transform(df_indexed)

print(f"StringIndexer aplicado a {len(categorical_cols)} variables")

StringIndexer aplicado a 7 variables
df_indexed.select(indexer_output_cols).show(7)
+--------+---------+-------------+--------------+------------------+-----------------------+-----------------------+
|term_idx|grade_idx|sub_grade_idx|emp_length_idx|home_ownership_idx|verification_status_idx|disbursement_method_idx|
+--------+---------+-------------+--------------+------------------+-----------------------+-----------------------+
|     0.0|      1.0|          6.0|           0.0|               0.0|                    2.0|                    0.0|
|     0.0|      1.0|          0.0|           0.0|               0.0|                    2.0|                    0.0|
|     1.0|      0.0|          1.0|           0.0|               0.0|                    2.0|                    0.0|
|     1.0|      5.0|         25.0|           3.0|               0.0|                    0.0|                    0.0|
|     0.0|      1.0|          5.0|           6.0|               1.0|                    0.0|                    0.0|
|     0.0|      0.0|          7.0|           0.0|               0.0|                    2.0|                    0.0|
|     0.0|      0.0|          8.0|           0.0|               0.0|                    2.0|                    0.0|
+--------+---------+-------------+--------------+------------------+-----------------------+-----------------------+
only showing top 7 rows

Ahora, las columnas term_idx, grade_idx, sub_grade_idx, emp_length_idx, home_ownership_idx, verification_status_idx, disbursement_method_idx muestran la versión numérica de las variables categóricas. Cada valor corresponde a un índice asignado según la frecuencia de aparición de cada categoría, 0.0 es la categoría más frecuente y 1.0 a la siguiente y así sucesivamente.

OneHotEncoder

El OneHotEncoder ayudará a convertir los indices numericos en vectores binarios.

from pyspark.ml.feature import OneHotEncoder

encoder_output_cols = [f"{c}_ohe" for c in categorical_cols]

encoder = OneHotEncoder(
    inputCols=indexer_output_cols,
    outputCols=encoder_output_cols,
)

df_encoded = encoder.fit(df_indexed).transform(df_indexed)

print(f"OneHotEncoder aplicado a {len(categorical_cols)} variables")
df_encoded.select(encoder_output_cols).show(7, truncate=50)
OneHotEncoder aplicado a 7 variables
+-------------+-------------+---------------+--------------+------------------+-----------------------+-----------------------+
|     term_ohe|    grade_ohe|  sub_grade_ohe|emp_length_ohe|home_ownership_ohe|verification_status_ohe|disbursement_method_ohe|
+-------------+-------------+---------------+--------------+------------------+-----------------------+-----------------------+
|(2,[0],[1.0])|(7,[1],[1.0])| (35,[6],[1.0])|(11,[0],[1.0])|     (6,[0],[1.0])|          (3,[2],[1.0])|          (2,[0],[1.0])|
|(2,[0],[1.0])|(7,[1],[1.0])| (35,[0],[1.0])|(11,[0],[1.0])|     (6,[0],[1.0])|          (3,[2],[1.0])|          (2,[0],[1.0])|
|(2,[1],[1.0])|(7,[0],[1.0])| (35,[1],[1.0])|(11,[0],[1.0])|     (6,[0],[1.0])|          (3,[2],[1.0])|          (2,[0],[1.0])|
|(2,[1],[1.0])|(7,[5],[1.0])|(35,[25],[1.0])|(11,[3],[1.0])|     (6,[0],[1.0])|          (3,[0],[1.0])|          (2,[0],[1.0])|
|(2,[0],[1.0])|(7,[1],[1.0])| (35,[5],[1.0])|(11,[6],[1.0])|     (6,[1],[1.0])|          (3,[0],[1.0])|          (2,[0],[1.0])|
|(2,[0],[1.0])|(7,[0],[1.0])| (35,[7],[1.0])|(11,[0],[1.0])|     (6,[0],[1.0])|          (3,[2],[1.0])|          (2,[0],[1.0])|
|(2,[0],[1.0])|(7,[0],[1.0])| (35,[8],[1.0])|(11,[0],[1.0])|     (6,[0],[1.0])|          (3,[2],[1.0])|          (2,[0],[1.0])|
+-------------+-------------+---------------+--------------+------------------+-----------------------+-----------------------+
only showing top 7 rows

Tras aplicar OneHotEncoder a las variables categóricas term_idx, grade_idx, sub_grade_idx, emp_length_idx, home_ownership_idx, verification_status_idx, disbursement_method_idx, cada columna se convirtió en un vector binario que indica qué categoría está presente en cada fila. Cada vector muestra la categoría activa con un 1.0, mientras que las demás categorías se representan implícitamente como 0.

Verificación de problemas de conteo

Antes de crear el vector único, verificaremos si alguna de las columnas numéricas tiene problemas de conteo.

from pyspark.sql.functions import count, when, isnan

print("Verificación de nulos e infinitos en columnas numéricas:")
df_encoded.select([
    F.count(F.when(F.col(c).isNull() | F.isnan(c), c)).alias(c)
    for c in numeric_cols
]).show()
Verificación de nulos e infinitos en columnas numéricas:
+---------+-----------+---------------+--------+-----------+----------+---+-----------+--------------+---------------+--------------+--------+-------+---------+----------+---------+--------------------------+-----------+--------------+------------+-----------+----------------+--------------------+-----------+--------------+-------+------------------------+-----------+------------------+--------------------+---------------------+--------------+--------+--------------------+---------------------+---------------------+--------------+---------------+-----------+---------+---------+-------------+-------------+-------------------+--------+----------------+------------+------------------+------------------+--------------+----------------+--------------------+---------+---------------+-----------------+--------------+--------------------------+-------+--------+----------+----------------+-------------------+----------------+
|loan_amnt|funded_amnt|funded_amnt_inv|int_rate|installment|annual_inc|dti|delinq_2yrs|fico_range_low|fico_range_high|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|collections_12_mths_ex_med|policy_code|acc_now_delinq|tot_coll_amt|tot_cur_bal|total_rev_hi_lim|acc_open_past_24mths|avg_cur_bal|bc_open_to_buy|bc_util|chargeoff_within_12_mths|delinq_amnt|mo_sin_old_il_acct|mo_sin_old_rev_tl_op|mo_sin_rcnt_rev_tl_op|mo_sin_rcnt_tl|mort_acc|mths_since_recent_bc|mths_since_recent_inq|num_accts_ever_120_pd|num_actv_bc_tl|num_actv_rev_tl|num_bc_sats|num_bc_tl|num_il_tl|num_op_rev_tl|num_rev_accts|num_rev_tl_bal_gt_0|num_sats|num_tl_120dpd_2m|num_tl_30dpd|num_tl_90g_dpd_24m|num_tl_op_past_12m|pct_tl_nvr_dlq|percent_bc_gt_75|pub_rec_bankruptcies|tax_liens|tot_hi_cred_lim|total_bal_ex_mort|total_bc_limit|total_il_high_credit_limit|purpose|zip_code|addr_state|earliest_cr_line|initial_list_status|application_type|
+---------+-----------+---------------+--------+-----------+----------+---+-----------+--------------+---------------+--------------+--------+-------+---------+----------+---------+--------------------------+-----------+--------------+------------+-----------+----------------+--------------------+-----------+--------------+-------+------------------------+-----------+------------------+--------------------+---------------------+--------------+--------+--------------------+---------------------+---------------------+--------------+---------------+-----------+---------+---------+-------------+-------------+-------------------+--------+----------------+------------+------------------+------------------+--------------+----------------+--------------------+---------+---------------+-----------------+--------------+--------------------------+-------+--------+----------+----------------+-------------------+----------------+
|        0|          0|              0|       0|          0|         0|  0|          0|             0|              0|             0|       0|      0|        0|         0|        0|                         0|          0|             0|           0|          0|               0|                   0|          0|             0|      0|                       0|          0|                 0|                   0|                    0|             0|       0|                   0|                    0|                    0|             0|              0|          0|        0|        0|            0|            0|                  0|       0|               0|           0|                 0|                 0|             0|               0|                   0|        0|              0|                0|             0|                         0|      0|       0|         0|               0|                  0|               0|
+---------+-----------+---------------+--------+-----------+----------+---+-----------+--------------+---------------+--------------+--------+-------+---------+----------+---------+--------------------------+-----------+--------------+------------+-----------+----------------+--------------------+-----------+--------------+-------+------------------------+-----------+------------------+--------------------+---------------------+--------------+--------+--------------------+---------------------+---------------------+--------------+---------------+-----------+---------+---------+-------------+-------------+-------------------+--------+----------------+------------+------------------+------------------+--------------+----------------+--------------------+---------+---------------+-----------------+--------------+--------------------------+-------+--------+----------+----------------+-------------------+----------------+

Como no tenemos ni nulos ni infinitos, se puede proceder a la construcción del VectorAssembler.

Vector Assemble

Ahora, la herramienta VectorAssembler ayudará a convertir todas las columnas en un unico vector, formato requerido para entrenar el modelo.

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=numeric_cols + encoder_output_cols,
    outputCol="features_raw",
    handleInvalid="keep"
)

df_assembled = assembler.transform(df_encoded)

print(f"VectorAssembler: {len(numeric_cols + encoder_output_cols)} inputs → 'features_raw'")
df_assembled.select("features_raw").show(3, truncate=80)
VectorAssembler: 70 inputs → 'features_raw'
+--------------------------------------------------------------------------------+
|                                                                    features_raw|
+--------------------------------------------------------------------------------+
|(129,[0,1,2,3,4,5,6,8,9,11,13,14,15,17,19,20,21,22,23,24,25,28,29,30,31,32,33...|
|(129,[0,1,2,3,4,5,6,8,9,10,11,12,13,14,15,17,20,21,22,23,24,25,28,29,30,31,32...|
|(129,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,17,20,21,22,23,24,25,28,29,30,31,...|
+--------------------------------------------------------------------------------+
only showing top 3 rows

El Vector Assembler nos regresó un vector de tamaño 129, las primeras posiciones corresponden a las columnas numéricas y las últimas a los vectores de las variables categóricas en el mismo orden en el que se le pasaron.

df_assembled.select("features_raw", TARGET_COL).printSchema()
root
 |-- features_raw: vector (nullable = true)
 |-- default: integer (nullable = false)

División estratificada

Ahora, se procederá a realizar una división estratificada separando por clases y dividiendo cada conjunto con RandomSplit, el objetivo es conservar la proporción de la clase default en todos los conjuntos.

SEED = 42

train_0, test_0 = df_assembled.filter(F.col(TARGET_COL) == 0).randomSplit([0.8, 0.2], seed=SEED)
train_1, test_1 = df_assembled.filter(F.col(TARGET_COL) == 1).randomSplit([0.8, 0.2], seed=SEED)

df_train = train_0.union(train_1)
df_test  = test_0.union(test_1)

print(f"Train: {df_train.count():,} filas")
print(f"Test : {df_test.count():,} filas")
print("\nDistribución en TRAIN:")
df_train.groupBy(TARGET_COL).count().orderBy(TARGET_COL).show()
print("Distribución en TEST:")
df_test.groupBy(TARGET_COL).count().orderBy(TARGET_COL).show()
Train: 1,075,822 filas
Test : 269,487 filas

Distribución en TRAIN:
+-------+------+
|default| count|
+-------+------+
|      0|861034|
|      1|214788|
+-------+------+

Distribución en TEST:
+-------+------+
|default| count|
+-------+------+
|      0|215717|
|      1| 53770|
+-------+------+

Los datos quedaron divididos en una proporción 80/20, con 1,075,822 filas para entrenamiento y 214,768 para prueba. De la misma forma, verificaremos que la proporción de clases se mantuvo consistente en ambos conjuntos, en train hay 861,034 casos de Fully Paid (80%) y 214,788 de Charged Off (20%), mientras que en test hay 215,940 (80%) y 53,770 (20%) respectivamente. Esto confirma que la división estratificada funcionó correctamente: el desbalance original del dataset (~80/20) se preservó en ambas particiones, lo que garantiza que el modelo se entrena y evalúa sobre distribuciones de clases representativas.

Standard scaler

Ahora, que ya se tiene el vector, se procederá a estandarizar a una media 0 y desviación estándar 1.

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True
)

scaler_model = scaler.fit(df_train)

df_train_ready = scaler_model.transform(df_train).select("features", TARGET_COL)
df_test_ready  = scaler_model.transform(df_test).select("features", TARGET_COL)

print(f"Train listo: {df_train_ready.count():,} filas")
print(f"Test listo : {df_test_ready.count():,} filas")
print("\nEjemplo vector escalado (primera fila):")
df_train_ready.select("features").show(1, truncate=80)
Train listo: 1,075,822 filas
Test listo : 269,487 filas

Ejemplo vector escalado (primera fila):
+--------------------------------------------------------------------------------+
|                                                                        features|
+--------------------------------------------------------------------------------+
|[-1.5389830104560098,-1.538709687283121,-1.544309132494526,0.2281045640084307...|
+--------------------------------------------------------------------------------+
only showing top 1 row

Cada feature fue centrada en 0 y escalada a desviación estándar 1, equivalente al StandardScaler de scikit-learn. Los valores negativos indican que esa observación está por debajo de la media de esa feature, y los positivos que está por encima.

Construcción del modelo

Se utilizará primero, RandomForestClassifier, esto con el objetivo de iniciar la búsqueda de hiperparámetros. Esta será realizada manualmente iterando sobre combinaciones, equivalente al GridSearchCV de scikit-learn. Se consultó adicionalmente como optimizar la duración de este proceso y encontró que el paralelismo interno de Spark distribuye el entrenamiento de los árboles entre los cores disponibles automáticamente cuando se configura como se presenta en el codigo.

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time
import itertools

param_grid = {
    "numTrees": [10, 50, 100],
    "maxDepth": [5, 10, 15],
    "minInstancesPerNode": [1, 5]
}

N_FOLDS = 2
seeds = [42, 84]

combinaciones = list(itertools.product(
    param_grid["numTrees"],
    param_grid["maxDepth"],
    param_grid["minInstancesPerNode"]
))

print(f"Total combinaciones a probar: {len(combinaciones)}")
for c in combinaciones:
    print(f"  numTrees={c[0]}, maxDepth={c[1]}, minInstancesPerNode={c[2]}")
Total combinaciones a probar: 18
  numTrees=10, maxDepth=5, minInstancesPerNode=1
  numTrees=10, maxDepth=5, minInstancesPerNode=5
  numTrees=10, maxDepth=10, minInstancesPerNode=1
  numTrees=10, maxDepth=10, minInstancesPerNode=5
  numTrees=10, maxDepth=15, minInstancesPerNode=1
  numTrees=10, maxDepth=15, minInstancesPerNode=5
  numTrees=50, maxDepth=5, minInstancesPerNode=1
  numTrees=50, maxDepth=5, minInstancesPerNode=5
  numTrees=50, maxDepth=10, minInstancesPerNode=1
  numTrees=50, maxDepth=10, minInstancesPerNode=5
  numTrees=50, maxDepth=15, minInstancesPerNode=1
  numTrees=50, maxDepth=15, minInstancesPerNode=5
  numTrees=100, maxDepth=5, minInstancesPerNode=1
  numTrees=100, maxDepth=5, minInstancesPerNode=5
  numTrees=100, maxDepth=10, minInstancesPerNode=1
  numTrees=100, maxDepth=10, minInstancesPerNode=5
  numTrees=100, maxDepth=15, minInstancesPerNode=1
  numTrees=100, maxDepth=15, minInstancesPerNode=5

Cross Validation manual

PySpark no tiene StratifiedKFold nativo, por lo tanto, se replicrá el comportamiento usando randomSplit para generar los folds manualmente.

spark.conf.set("spark.default.parallelism", "10")
spark.conf.set("spark.sql.shuffle.partitions", "10")

df_train_ready.cache()
df_train_ready.count()
print("Cache listo.")

evaluator_auc = BinaryClassificationEvaluator(
    labelCol=TARGET_COL,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

def entrenar_fold(df_train_ready, combinaciones_subset, seeds, N_FOLDS, evaluator_auc, offset=0):
    resultados_parciales = []
    TOTAL = len(combinaciones)
    tiempo_global = time.time()

    for combo_idx, (num_trees, max_depth, min_inst) in enumerate(combinaciones_subset, offset + 1):
        fold_aucs = []
        tiempo_inicio = time.time()

        print(f"\n[{combo_idx}/{TOTAL}] numTrees={num_trees}, maxDepth={max_depth}, minInst={min_inst}")

        for fold_idx, seed in enumerate(seeds):
            print(f"  fold {fold_idx+1}/{N_FOLDS}...", end=" ", flush=True)

            df_folds = df_train_ready.randomSplit([1.0 / N_FOLDS] * N_FOLDS, seed=seed)
            df_val_fold   = df_folds[fold_idx]
            df_train_fold = df_folds[0] if fold_idx != 0 else df_folds[1]
            for i in range(N_FOLDS):
                if i != fold_idx and i != (0 if fold_idx != 0 else 1):
                    df_train_fold = df_train_fold.union(df_folds[i])

            rf = RandomForestClassifier(
                labelCol=TARGET_COL,
                featuresCol="features",
                numTrees=num_trees,
                maxDepth=max_depth,
                minInstancesPerNode=min_inst,
                seed=42
            )

            modelo = rf.fit(df_train_fold)
            predicciones = modelo.transform(df_val_fold)
            auc = evaluator_auc.evaluate(predicciones)
            fold_aucs.append(auc)
            print(f"AUC={auc:.4f}")

        tiempo_total = time.time() - tiempo_inicio
        auc_medio = sum(fold_aucs) / len(fold_aucs)
        elapsed = time.time() - tiempo_global
        restante = (elapsed / (combo_idx - offset)) * (TOTAL - combo_idx)

        resultados_parciales.append({
            "numTrees": num_trees,
            "maxDepth": max_depth,
            "minInstancesPerNode": min_inst,
            "auc_medio": auc_medio,
            "fold_aucs": fold_aucs,
            "tiempo_seg": round(tiempo_total, 2)
        })

        print(f"  → AUC medio={auc_medio:.4f} | tiempo={tiempo_total:.1f}s | "
              f"completado={combo_idx}/{TOTAL} | restante≈{restante/60:.1f} min")

    return resultados_parciales

resultados = entrenar_fold(
    df_train_ready,
    combinaciones,
    seeds, N_FOLDS, evaluator_auc, offset=0
)

print(f"\nTotal combinaciones completadas: {len(resultados)}")
for r in resultados:
    print(f"  numTrees={r['numTrees']}, maxDepth={r['maxDepth']}, "
          f"minInst={r['minInstancesPerNode']} → AUC={r['auc_medio']:.4f}")
Cache listo.

[1/18] numTrees=10, maxDepth=5, minInst=1
  fold 1/2... AUC=0.6891
  fold 2/2... AUC=0.6863
  → AUC medio=0.6877 | tiempo=45.2s | completado=1/18 | restante≈12.8 min

[2/18] numTrees=10, maxDepth=5, minInst=5
  fold 1/2... AUC=0.6889
  fold 2/2... AUC=0.6889
  → AUC medio=0.6889 | tiempo=47.9s | completado=2/18 | restante≈12.4 min

[3/18] numTrees=10, maxDepth=10, minInst=1
  fold 1/2... AUC=0.7064
  fold 2/2... AUC=0.7059
  → AUC medio=0.7062 | tiempo=50.0s | completado=3/18 | restante≈11.9 min

[4/18] numTrees=10, maxDepth=10, minInst=5
  fold 1/2... AUC=0.7063
  fold 2/2... AUC=0.7056
  → AUC medio=0.7060 | tiempo=50.1s | completado=4/18 | restante≈11.3 min

[5/18] numTrees=10, maxDepth=15, minInst=1
  fold 1/2... AUC=0.7037
  fold 2/2... AUC=0.7031
  → AUC medio=0.7034 | tiempo=86.1s | completado=5/18 | restante≈12.1 min

[6/18] numTrees=10, maxDepth=15, minInst=5
  fold 1/2... AUC=0.7062
  fold 2/2... AUC=0.7057
  → AUC medio=0.7059 | tiempo=88.9s | completado=6/18 | restante≈12.3 min

[7/18] numTrees=50, maxDepth=5, minInst=1
  fold 1/2... AUC=0.6973
  fold 2/2... AUC=0.6976
  → AUC medio=0.6974 | tiempo=57.8s | completado=7/18 | restante≈11.2 min

[8/18] numTrees=50, maxDepth=5, minInst=5
  fold 1/2... AUC=0.6969
  fold 2/2... AUC=0.6970
  → AUC medio=0.6970 | tiempo=56.3s | completado=8/18 | restante≈10.0 min

[9/18] numTrees=50, maxDepth=10, minInst=1
  fold 1/2... AUC=0.7099
  fold 2/2... AUC=0.7094
  → AUC medio=0.7097 | tiempo=102.2s | completado=9/18 | restante≈9.7 min

[10/18] numTrees=50, maxDepth=10, minInst=5
  fold 1/2... AUC=0.7098
  fold 2/2... AUC=0.7090
  → AUC medio=0.7094 | tiempo=105.7s | completado=10/18 | restante≈9.2 min

[11/18] numTrees=50, maxDepth=15, minInst=1
  fold 1/2... [1765.917s][warning][gc,alloc] Executor task launch worker for task 9.0 in stage 2007.0 (TID 15632): Retried waiting for GCLocker too often allocating 11266 words
AUC=0.7143
  fold 2/2... AUC=0.7134
  → AUC medio=0.7139 | tiempo=290.8s | completado=11/18 | restante≈10.4 min

[12/18] numTrees=50, maxDepth=15, minInst=5
  fold 1/2... AUC=0.7145
  fold 2/2... AUC=0.7143
  → AUC medio=0.7144 | tiempo=284.3s | completado=12/18 | restante≈10.5 min

[13/18] numTrees=100, maxDepth=5, minInst=1
  fold 1/2... AUC=0.6978
  fold 2/2... AUC=0.6972
  → AUC medio=0.6975 | tiempo=78.1s | completado=13/18 | restante≈8.6 min

[14/18] numTrees=100, maxDepth=5, minInst=5
  fold 1/2... AUC=0.6983
  fold 2/2... AUC=0.6970
  → AUC medio=0.6976 | tiempo=80.0s | completado=14/18 | restante≈6.8 min

[15/18] numTrees=100, maxDepth=10, minInst=1
  fold 1/2... AUC=0.7100
  fold 2/2... AUC=0.7096
  → AUC medio=0.7098 | tiempo=177.8s | completado=15/18 | restante≈5.3 min

[16/18] numTrees=100, maxDepth=10, minInst=5
  fold 1/2... AUC=0.7101
  fold 2/2... AUC=0.7093
  → AUC medio=0.7097 | tiempo=180.6s | completado=16/18 | restante≈3.7 min

[17/18] numTrees=100, maxDepth=15, minInst=1
  fold 1/2... AUC=0.7156
  fold 2/2... [3266.267s][warning][gc,alloc] Executor task launch worker for task 0.0 in stage 2796.0 (TID 24089): Retried waiting for GCLocker too often allocating 17512 words
[3266.302s][warning][gc,alloc] Executor task launch worker for task 8.0 in stage 2796.0 (TID 24097): Retried waiting for GCLocker too often allocating 71657 words
AUC=0.7150
  → AUC medio=0.7153 | tiempo=573.4s | completado=17/18 | restante≈2.3 min

[18/18] numTrees=100, maxDepth=15, minInst=5
  fold 1/2... [3463.615s][warning][gc,alloc] Executor task launch worker for task 13.0 in stage 2864.0 (TID 24844): Retried waiting for GCLocker too often allocating 30307 words
[3463.615s][warning][gc,alloc] Executor task launch worker for task 3.0 in stage 2864.0 (TID 24834): Retried waiting for GCLocker too often allocating 14547 words
AUC=0.7162
  fold 2/2... AUC=0.7153
  → AUC medio=0.7158 | tiempo=565.5s | completado=18/18 | restante≈0.0 min

Total combinaciones completadas: 18
  numTrees=10, maxDepth=5, minInst=1 → AUC=0.6877
  numTrees=10, maxDepth=5, minInst=5 → AUC=0.6889
  numTrees=10, maxDepth=10, minInst=1 → AUC=0.7062
  numTrees=10, maxDepth=10, minInst=5 → AUC=0.7060
  numTrees=10, maxDepth=15, minInst=1 → AUC=0.7034
  numTrees=10, maxDepth=15, minInst=5 → AUC=0.7059
  numTrees=50, maxDepth=5, minInst=1 → AUC=0.6974
  numTrees=50, maxDepth=5, minInst=5 → AUC=0.6970
  numTrees=50, maxDepth=10, minInst=1 → AUC=0.7097
  numTrees=50, maxDepth=10, minInst=5 → AUC=0.7094
  numTrees=50, maxDepth=15, minInst=1 → AUC=0.7139
  numTrees=50, maxDepth=15, minInst=5 → AUC=0.7144
  numTrees=100, maxDepth=5, minInst=1 → AUC=0.6975
  numTrees=100, maxDepth=5, minInst=5 → AUC=0.6976
  numTrees=100, maxDepth=10, minInst=1 → AUC=0.7098
  numTrees=100, maxDepth=10, minInst=5 → AUC=0.7097
  numTrees=100, maxDepth=15, minInst=1 → AUC=0.7153
  numTrees=100, maxDepth=15, minInst=5 → AUC=0.7158

Después de finalizado el entrenamiento en aproximadamente 2900s, se encontró que el parámetro más influyente es maxDepth, por ejemplo pasar de un maxDepth de 5 a 10 mejora el AUC en 0.018 puntos, en promedio. Por otro lado, numTrees tiene un efecto moderado, pues con un maxDepth de 15, el pasar de 10 a 50 árboles mejoró el AUC de 0.7034 a 0.7139. Finalmente, minInstancesPerNode tiene el menor impacto, pues las diferencias suelen ser mínimas entre si en casi todas las combinaciones

Mejor combinación de hiperparámetros

Se selecciona la combinación con mayor AUC promedio a través de los folds.

resultados_mejor = [
    {"numTrees": 100, "maxDepth": 15, "minInstancesPerNode": 5, 
     "auc_medio": 0.7158, "fold_aucs": [0.7162, 0.7153], "tiempo_seg": 565.51}
]

mejor = resultados_mejor[0]

print("=" * 50)
print("Mejor combinación de hiperparámetros:")
print(f"  numTrees            : {mejor['numTrees']}")
print(f"  maxDepth            : {mejor['maxDepth']}")
print(f"  minInstancesPerNode : {mejor['minInstancesPerNode']}")
print(f"  AUC medio (CV)      : {mejor['auc_medio']:.4f}")
print(f"  AUC por fold        : {[round(a, 4) for a in mejor['fold_aucs']]}")
print(f"  Tiempo total        : {mejor['tiempo_seg']}s")
==================================================
Mejor combinación de hiperparámetros:
  numTrees            : 100
  maxDepth            : 15
  minInstancesPerNode : 5
  AUC medio (CV)      : 0.7158
  AUC por fold        : [0.7162, 0.7153]
  Tiempo total        : 565.51s

El hecho de que el mejor modelo sea el de mayor complejidad, sugiere que el dataset tiene patrones suficientemente complejos para que árboles profundos y en mayor cantidad capturen mejor las relaciones entre las variables. Este modelo tiene un AUC medio de 0.71 con muy poca variación entre folds, mostrando estabilidad y poca dependencia a la partición de los datos.

Entrenamiento final con mejores hiperparámetros

Con los mejores hiperparámetros encontrados se entrena el modelo final. Este es el modelo que se evaluará sobre el conjunto de prueba, siguiendo el mismo principio que en scikit-learn donde el modelo final se re-entrena sobre todos los datos de entrenamiento.

spark.conf.set("spark.default.parallelism", "10")
spark.conf.set("spark.sql.shuffle.partitions", "10")
inicio_final = time.time()

rf_final = RandomForestClassifier(
    labelCol=TARGET_COL,
    featuresCol="features",
    numTrees=mejor["numTrees"],
    maxDepth=mejor["maxDepth"],
    minInstancesPerNode=mejor["minInstancesPerNode"],
    seed=42
)

modelo_final = rf_final.fit(df_train_ready)
tiempo_entrenamiento = time.time() - inicio_final

print(f"Modelo final entrenado en {tiempo_entrenamiento:.2f}s")
print(f"Número de árboles: {modelo_final.getNumTrees}")
Modelo final entrenado en 362.45s
Número de árboles: 100

El modelo final se entrenó en 362s con un número de 100 árboles con profundidad máxima de 15 y minimo con 5 instancias por módulo

Predicciones y métricas en test

Se evalúa el modelo final sobre df_test_ready usando AUC, Accuracy y F1-score, las mismas métricas usadas en la parte de scikit-learn para permitir comparación directa.

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time

evaluator_auc = BinaryClassificationEvaluator(
    labelCol=TARGET_COL,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

inicio_pred = time.time()
predicciones_test = modelo_final.transform(df_test_ready)
tiempo_prediccion = time.time() - inicio_pred

auc_test = evaluator_auc.evaluate(predicciones_test)

evaluator_acc = MulticlassClassificationEvaluator(
    labelCol=TARGET_COL,
    predictionCol="prediction",
    metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol=TARGET_COL,
    predictionCol="prediction",
    metricName="f1"
)
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol=TARGET_COL,
    predictionCol="prediction",
    metricName="weightedRecall"
)

accuracy_test = evaluator_acc.evaluate(predicciones_test)
f1_test       = evaluator_f1.evaluate(predicciones_test)
recall_test   = evaluator_recall.evaluate(predicciones_test)

print(f"Tiempo de predicción : {tiempo_prediccion:.2f}s")
print(f"AUC (test)           : {auc_test:.4f}")
print(f"Accuracy (test)      : {accuracy_test:.4f}")
print(f"F1-score (test)      : {f1_test:.4f}")
print(f"Recall (test)        : {recall_test:.4f}")
Tiempo de predicción : 1.17s
AUC (test)           : 0.7676
Accuracy (test)      : 0.8117
F1-score (test)      : 0.7432
Recall (test)        : 0.8117

El modelo funcionó bien con un AUC de 0.7676 lo que significa que en 7 de cada 10 casos el modelo logra distinguir correctamente entre un préstamo que se va a pagar y uno que no. El accuracy de 81% suena alto pero hay que tener en cuenta que el dataset ya tiene 80% de préstamos pagados. El F1 de 0.74 es la métrica que mejor resume el desempeño real del modelo y es un resultado aceptable para un problema financiero de este tipo.

Matriz de confusión

PySpark no tiene una función directa para la matriz de confusión como scikit-learn, por lo que se construye manualmente agrupando las predicciones por clase real y predicha.

from pyspark.sql.functions import col

matriz = (predicciones_test
    .groupBy(TARGET_COL, "prediction")
    .count()
    .orderBy(TARGET_COL, "prediction"))

matriz.show()

matriz_collected = matriz.collect()

TP = next((r["count"] for r in matriz_collected if r[TARGET_COL]==1 and r["prediction"]==1.0), 0)
TN = next((r["count"] for r in matriz_collected if r[TARGET_COL]==0 and r["prediction"]==0.0), 0)
FP = next((r["count"] for r in matriz_collected if r[TARGET_COL]==0 and r["prediction"]==1.0), 0)
FN = next((r["count"] for r in matriz_collected if r[TARGET_COL]==1 and r["prediction"]==0.0), 0)

precision = TP / (TP + FP) if (TP + FP) > 0 else 0
recall    = TP / (TP + FN) if (TP + FN) > 0 else 0
f1_manual = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

print(f"\nMatriz de confusión:")
print(f"           Pred 0    Pred 1")
print(f"Real 0:    {TN:<8}  {FP:<8}")
print(f"Real 1:    {FN:<8}  {TP:<8}")
print(f"\nPrecision : {precision:.4f}")
print(f"Recall    : {recall:.4f}")
print(f"F1 manual : {f1_manual:.4f}")
+-------+----------+------+
|default|prediction| count|
+-------+----------+------+
|      0|       0.0|214720|
|      0|       1.0|   997|
|      1|       0.0| 49749|
|      1|       1.0|  4021|
+-------+----------+------+


Matriz de confusión:
           Pred 0    Pred 1
Real 0:    214297    929     
Real 1:    49759     4035    

Precision : 0.8129
Recall    : 0.0750
F1 manual : 0.1373

El modelo casi nunca se atreve a predecir que no se pagará, de los 53,770 préstamos que realmente no se pagaron solo detectó correctamente unos 4,000 lo que da un recall de apenas 7.5%. Cuando sí predice default acierta el 81% de las veces pero el problema es que lo hace muy pocas veces. Esto es el efecto típico del desbalance de clases donde el modelo aprende a predecir casi siempre “Fully Paid”.