
Sample Page Title


In this tutorial, we design an end-to-end, production-style analytics and modeling pipeline that uses Vaex to operate efficiently on millions of rows without materializing intermediate features in memory. We generate a realistic, large-scale dataset, engineer rich behavioral and city-level features using lazy expressions and approximate statistics, and aggregate insights at scale. We then integrate Vaex with scikit-learn to train and evaluate a predictive model, demonstrating how Vaex can act as the backbone for high-performance exploratory analysis and machine-learning workflows.

!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"


import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score


print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)


rng = np.random.default_rng(7)


n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
# Sample one city per row; the probabilities sum to 1.
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
# Per-city income multiplier; the keys must match the names in `cities` exactly, or reindex() yields NaN.
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
# Income scales with city, age and (capped) tenure, then is clipped to a plausible range.
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)


noise = rng.normal(0, 1, size=n).astype("float64")
# Latent propensity: income, activity, tenure, age and city effects plus noise.
score_latent = (
   0.55*np.log1p(income/1000.0)
   + 0.28*np.log1p(tx)
   + 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
   - 0.012*(age-40)
   + 0.22*(city == "Vancouver").astype("float64")
   + 0.15*(city == "Toronto").astype("float64")
   + 0.10*(city == "Ottawa").astype("float64")
   + 0.65*noise
)
# Logistic link centered on the 70th percentile of the latent score, then a Bernoulli draw.
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")


df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)


# Virtual columns: stored as expressions and evaluated lazily when needed.
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60


print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))

We generate a large, realistic synthetic dataset and wrap it in a Vaex DataFrame so that all subsequent work on its millions of rows is lazy. We engineer the core numerical features directly as expressions (virtual columns), so no intermediate data is materialized. We validate the setup by inspecting the row and column counts and a small sample of computed values.
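The idea behind lazy feature expressions can be illustrated without Vaex: a derived column is kept as a function, and a statistic over it is accumulated one slice at a time so the full derived array never exists at once. The NumPy sketch below is only an illustration of chunked evaluation under our own assumptions (the helper name `chunked_mean` and the 100,000-row chunk size are hypothetical); it is not how Vaex is implemented internally.

```python
import numpy as np

def chunked_mean(values, expr, chunk_size=100_000):
    """Evaluate `expr` on successive slices of `values` and return the mean.

    Only one chunk of the derived feature is materialized at a time;
    the running sum and count are the only state carried between chunks.
    """
    total = 0.0
    count = 0
    for start in range(0, len(values), chunk_size):
        chunk = expr(values[start:start + chunk_size])  # derived values for this slice only
        total += float(chunk.sum())
        count += chunk.size
    return total / count

rng = np.random.default_rng(7)
income = rng.lognormal(mean=10.6, sigma=0.45, size=1_000_000)

# The "virtual column" is just a function; nothing is stored for it.
log_income_expr = np.log1p

lazy = chunked_mean(income, log_income_expr)
eager = float(np.log1p(income).mean())  # materializes the full derived array
print(f"chunked: {lazy:.6f}  eager: {eager:.6f}")
```

Both numbers agree up to floating-point summation order; the chunked version simply trades one large temporary array for many small ones.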

encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v: k for k, v in city_map.items()}
n_cities = len(city_map)


# One unit-width bin per integer city code: limits [-0.5, n_cities-0.5] with shape=n_cities.
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])


p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)


city_table = pd.DataFrame({
   "city_id": np.arange(n_cities),
   "city": [inv_city_map[i] for i in range(n_cities)],
   "n": n_by_city.astype("int64"),
   "avg_income_k": avg_income_k_by_city,
   "p95_income_k": p95_income_k_by_city,
   "median_value_score": p50_value_by_city,
   "target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)


print("\nCity summary:")
print(city_table.to_string(index=False))


df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")


df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score

We label-encode the categorical city column and compute scalable, approximate per-city statistics using binning-based operations. We assemble these aggregates into a city-level table and join them back to the main dataset. We then derive relative features that compare each record against its city's context.
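Two details of the per-city aggregation are worth unpacking. Passing `binby="label_encoded_city"` with `shape=n_cities` and `limits=[-0.5, n_cities-0.5]` gives every integer city code its own unit-width bin, and `percentile_approx` answers from a histogram of the values rather than a full sort, which is why it scales. The NumPy sketch below reproduces that idea under our own assumptions (256 value bins, reading the percentile off the right edge of the bin where the cumulative share crosses q, and the hypothetical helper name `approx_percentile_by_group`); Vaex's own implementation differs in its details.

```python
import numpy as np

def approx_percentile_by_group(codes, values, n_groups, q, n_bins=256):
    """Approximate the q-th percentile (0-100) of `values` within each group.

    `codes` are integer group ids in 0..n_groups-1, so each id gets its own
    row of the histogram. Values are histogrammed on a fixed grid and the
    percentile is read off the cumulative counts, so the answer is only
    accurate to about one bin width.
    """
    lo, hi = float(values.min()), float(values.max())
    edges = np.linspace(lo, hi, n_bins + 1)
    # Bin index of every value, clipped so the maximum lands in the last bin.
    vbin = np.clip(np.searchsorted(edges, values, side="right") - 1, 0, n_bins - 1)
    # Joint (group, value-bin) histogram via one bincount over a flattened index.
    flat = codes.astype(np.int64) * n_bins + vbin
    hist = np.bincount(flat, minlength=n_groups * n_bins).reshape(n_groups, n_bins)
    out = np.full(n_groups, np.nan)
    for g in range(n_groups):
        total = hist[g].sum()
        if total == 0:
            continue  # empty group: leave NaN
        cdf = np.cumsum(hist[g]) / total
        # First bin whose cumulative share reaches q; report its right edge.
        idx = int(np.searchsorted(cdf, q / 100.0))
        out[g] = edges[min(idx, n_bins - 1) + 1]
    return out

rng = np.random.default_rng(0)
n, n_groups = 200_000, 8
codes = rng.integers(0, n_groups, size=n)
values = rng.lognormal(mean=4.0, sigma=0.5, size=n) * (1 + 0.1 * codes)

approx = approx_percentile_by_group(codes, values, n_groups, q=95)
exact = np.array([np.percentile(values[codes == g], 95) for g in range(n_groups)])
bin_width = (values.max() - values.min()) / 256
print(np.round(approx, 1))
print(np.round(exact, 1))
```

The memory cost is one `n_groups x n_bins` count table regardless of row count, and the error is bounded by the bin width, which is the trade-off an approximate percentile makes.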

features_num = [
   "age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
   "p95_income_k","avg_income_k","median_value_score","target_rate",
   "income_vs_city_p95","value_vs_city_median"
]


scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)


features = ["z_"+f for f in features_num] + ["label_encoded_city"]


df_train, df_test = df.split_random([0.80, 0.20], random_state=42)


model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")


t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0


df_test = vaex_model.transform(df_test)


y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()


auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)


print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))

We standardize all numeric features with Vaex's ML utilities and assemble a consistent feature list for modeling. We split the dataset into training and test sets without loading the entire dataset into memory. We train a logistic regression model through Vaex's scikit-learn wrapper and evaluate its predictive performance with ROC AUC and average precision.
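One caveat when scoring: ROC AUC and average precision are ranking metrics, so they need a continuous score. A scikit-learn `predict()` call returns hard 0/1 class labels, and if the `pred` column holds such labels the AUC collapses to a single operating point and the decile cut points used for the lift table become degenerate. Whether `Predictor` stores `predict()` or probability output is worth checking against the vaex-ml documentation for your version; if it is the former, calling `predict_proba(X)[:, 1]` on the fitted scikit-learn estimator gives usable scores. The self-contained NumPy sketch below uses synthetic data and a rank-based (Mann-Whitney) AUC helper, `rank_auc`, which is our own illustration rather than a library function, to show how much a 0.5 threshold throws away.

```python
import numpy as np

def rank_auc(y_true, score):
    """ROC AUC via the Mann-Whitney U statistic, using average ranks for ties."""
    y = np.asarray(y_true).astype(bool)
    s = np.asarray(score, dtype=float)
    # Average 1-based rank for each distinct score value.
    _, inverse, counts = np.unique(s, return_inverse=True, return_counts=True)
    ends = np.cumsum(counts)
    starts = ends - counts + 1
    ranks = ((starts + ends) / 2.0)[inverse]
    n_pos = int(y.sum())
    n_neg = y.size - n_pos
    u = ranks[y].sum() - n_pos * (n_pos + 1) / 2.0
    return u / (n_pos * n_neg)

rng = np.random.default_rng(42)
n = 50_000
y = (rng.random(n) < 0.3).astype(int)
# Synthetic probabilities: positives get a higher latent score on average.
proba = 1.0 / (1.0 + np.exp(-(rng.normal(0.0, 1.0, n) + 1.2 * y - 0.8)))
hard = (proba >= 0.5).astype(int)  # what a predict()-style column would keep

auc_proba = rank_auc(y, proba)
auc_hard = rank_auc(y, hard)
print(f"AUC from probabilities: {auc_proba:.3f}")
print(f"AUC from hard labels:   {auc_hard:.3f}")
```

With hard labels every row falls into one of two tied groups, so the "curve" has a single interior point and the AUC reduces to the mean of the true-positive and true-negative rates at that threshold.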

deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))

We analyze model behavior by segmenting predictions into deciles and computing lift metrics. We calculate the baseline positive rate and compare each score bucket against it to assess ranking quality. We summarize the results in a compact lift table similar to those used in real-world model diagnostics.
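The lift computation reduces to a few array operations: cut the scores at their deciles, count rows and positives per bucket, and divide each bucket's positive rate by the overall rate, so that lift_k = rate_k / baseline. The NumPy sketch below mirrors the `np.quantile` and `np.digitize(..., right=True)` steps above, with `np.bincount` standing in for the Vaex `groupby`; the function name `decile_lift` and the synthetic scores are our own illustration.

```python
import numpy as np

def decile_lift(y_true, score, n_buckets=10):
    """Return per-bucket row counts, positive rates and lift for score quantiles."""
    y_true = np.asarray(y_true, dtype=float)
    score = np.asarray(score, dtype=float)
    cuts = np.quantile(score, np.linspace(0, 1, n_buckets + 1))
    # Interior cut points only; bucket 0 holds the lowest scores.
    bucket = np.digitize(score, cuts[1:-1], right=True)
    n = np.bincount(bucket, minlength=n_buckets)
    positives = np.bincount(bucket, weights=y_true, minlength=n_buckets)
    # Positive rate per bucket; NaN for any empty bucket.
    rate = np.divide(positives, n, out=np.full(n_buckets, np.nan), where=n > 0)
    baseline = y_true.mean()
    return n, rate, rate / baseline

rng = np.random.default_rng(7)
y = (rng.random(100_000) < 0.3).astype(int)
score = rng.normal(0, 1, y.size) + 1.0 * y   # informative synthetic score
n, rate, lift = decile_lift(y, score)
for k in range(len(n)):
    print(f"bucket {k}: n={n[k]:6d} rate={rate[k]:.3f} lift={lift[k]:.2f}")
```

A useful model shows lift well above 1 in the top bucket and well below 1 in the bottom one; the row-weighted average of the bucket rates always equals the baseline.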

out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)


parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")


base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)


if os.path.exists(parquet_path):
   os.remove(parquet_path)
df_export.export_parquet(parquet_path)


pipeline_state = {
   "vaex_version": vaex.__version__,
   "encoder_labels": {k: {str(kk): int(vv) for kk, vv in v.items()} for k, v in encoder.labels_.items()},
   "scaler_mean": [float(x) for x in scaler.mean_],
   "scaler_std": [float(x) for x in scaler.std_],
   "features_num": features_num,
   "export_cols": export_cols,
}
with open(state_path, "w") as f:
   json.dump(pipeline_state, f)


df_reopen = vaex.open(parquet_path)


# Rebuild the engineered features exactly as in the training pipeline.
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)


df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score


with open(state_path, "r") as f:
   st = json.load(f)


labels_city = {k: int(v) for k, v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)


for i, feat in enumerate(st["features_num"]):
   mean_i = st["scaler_mean"][i]
   std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
   df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i


df_reopen = vaex_model.transform(df_reopen)


print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))


print("\nDone.")

We export a large, feature-complete sample to Parquet and persist the full preprocessing state (encoder labels and scaler statistics) to JSON for reproducibility. We reload the data and deterministically rebuild all engineered features from the saved metadata. We run inference on the reloaded dataset to confirm that the pipeline remains stable and deployable end-to-end.
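The save-and-replay pattern used for the scaler can be sanity-checked in isolation: fit per-feature statistics, write them to JSON, read them back, and confirm that standardizing with the restored state reproduces the original values exactly (Python's `json` module writes floats with enough digits to round-trip). The NumPy sketch below does this with hypothetical helpers `fit_standardizer` and `apply_standardizer`, including the same zero-std fallback used when rebuilding the `z_` columns. It does not call Vaex, and it uses NumPy's population standard deviation, which may not match the convention inside `vaex.ml.StandardScaler`.

```python
import json
import numpy as np

def fit_standardizer(X, feature_names):
    """Compute per-feature mean/std and return a JSON-serializable state dict."""
    mean = X.mean(axis=0)
    std = X.std(axis=0)  # population std (ddof=0)
    return {
        "features": list(feature_names),
        "mean": [float(m) for m in mean],
        "std": [float(s) for s in std],
    }

def apply_standardizer(X, state):
    """Standardize columns with a saved state, treating a zero std as 1.0."""
    mean = np.asarray(state["mean"])
    std = np.asarray(state["std"])
    std = np.where(std == 0, 1.0, std)
    return (X - mean) / std

rng = np.random.default_rng(1)
X_train = np.column_stack([
    rng.integers(18, 75, 10_000),        # age-like column
    rng.lognormal(10.6, 0.45, 10_000),   # income-like column
    np.full(10_000, 3.0),                # constant column (std == 0)
]).astype(float)

state = fit_standardizer(X_train, ["age", "income", "constant"])
payload = json.dumps(state)              # what gets written to disk
restored = json.loads(payload)           # what gets read back

Z_before = apply_standardizer(X_train, state)
Z_after = apply_standardizer(X_train, restored)
print(np.abs(Z_before - Z_after).max())
```

Storing the statistics in the same order as the feature list, as the pipeline does with `features_num`, is what lets the replay loop pair each mean and standard deviation with the right column.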

In conclusion, we demonstrated how Vaex enables fast, memory-efficient data processing while still supporting advanced feature engineering, aggregation, and model integration. We showed that approximate statistics, lazy computation, and out-of-core execution allow us to scale cleanly from analysis to deployment-ready artifacts. By exporting reproducible features and persisting the full pipeline state, we closed the loop from raw data to inference, illustrating how Vaex fits naturally into modern large-data Python workflows.



