Python Data Science: Pandas, NumPy & Machine Learning Mastery 2025

This article was written entirely by AI (Claude Sonnet 4) and reviewed by Muhammad Aji Sukma.

Python has become the #1 programming language for data science thanks to its rich ecosystem and powerful libraries such as Pandas, NumPy, and Scikit-learn. In this article, we take an in-depth look at how to use Python for production-ready data analysis, data manipulation, and machine learning.

🎯 Why Does Python Dominate Data Science?

Python's Advantages for Data Science

1. Rich Ecosystem & Libraries. Python offers a very complete ecosystem that covers the entire data science workflow.

# Essential data science imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

# Configure plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

2. Readable & Intuitive Syntax. Python code is easy to read and understand, which is essential for collaborative data science projects.

# Example of an intuitive data analysis workflow
def analyze_sales_data(data):
    """
    Comprehensive sales data analysis
    """
    print("📊 Sales Data Analysis Report")
    print("=" * 50)

    # Basic statistics
    print(f"📈 Dataset Shape: {data.shape}")
    print(f"📅 Date Range: {data['date'].min()} to {data['date'].max()}")
    print(f"💰 Total Revenue: ${data['revenue'].sum():,.2f}")
    print(f"📦 Total Orders: {data['order_id'].nunique():,}")

    # Monthly trends
    monthly_revenue = data.groupby(data['date'].dt.to_period('M'))['revenue'].sum()
    print(f"📈 Average Monthly Revenue: ${monthly_revenue.mean():,.2f}")
    print(f"📊 Revenue Growth Rate: {((monthly_revenue.iloc[-1] / monthly_revenue.iloc[0]) - 1) * 100:.1f}%")

    return {
        'monthly_trends': monthly_revenue,
        'top_products': data.groupby('product')['revenue'].sum().sort_values(ascending=False).head(10),
        'customer_segments': data.groupby('customer_segment').agg({
            'revenue': ['sum', 'mean', 'count']
        }).round(2)
    }

# Usage with synthetic data
np.random.seed(42)
dates = pd.date_range('2024-01-01', '2024-12-31', freq='D')
sample_data = pd.DataFrame({
    'date': np.random.choice(dates, 10000),
    'order_id': range(10000),
    'product': np.random.choice(['Laptop', 'Phone', 'Tablet', 'Watch', 'Headphones'], 10000),
    'customer_segment': np.random.choice(['Premium', 'Standard', 'Basic'], 10000, p=[0.2, 0.5, 0.3]),
    'revenue': np.random.lognormal(mean=5, sigma=1, size=10000)
})

analysis_results = analyze_sales_data(sample_data)

📊 Pandas: Data Manipulation & Analysis

Advanced DataFrame Operations

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class DataProcessor:
    """
    Advanced data processing class with best practices
    """

    def __init__(self, data_path=None, data=None):
        if data_path:
            self.df = self.load_data(data_path)
        elif data is not None:
            self.df = data.copy()
        else:
            self.df = pd.DataFrame()

        self.original_shape = self.df.shape
        self.processing_log = []

    def load_data(self, file_path, **kwargs):
        """
        Smart data loading with format auto-detection
        """
        file_extension = file_path.split('.')[-1].lower()

        loaders = {
            'csv': pd.read_csv,
            'xlsx': pd.read_excel,
            'json': pd.read_json,
            'parquet': pd.read_parquet,
            'pkl': pd.read_pickle
        }

        if file_extension in loaders:
            try:
                df = loaders[file_extension](file_path, **kwargs)
                self.log_operation(f"Loaded {df.shape[0]:,} rows from {file_path}")
                return df
            except Exception as e:
                raise Exception(f"Error loading {file_path}: {str(e)}")
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")

    def profile_data(self):
        """
        Comprehensive data profiling
        """
        profile = {
            'basic_info': {
                'shape': self.df.shape,
                'memory_usage': f"{self.df.memory_usage(deep=True).sum() / 1024**2:.2f} MB",
                'dtypes': dict(self.df.dtypes.value_counts()),
            },
            'missing_data': {
                col: {
                    'count': self.df[col].isnull().sum(),
                    'percentage': (self.df[col].isnull().sum() / len(self.df)) * 100
                }
                for col in self.df.columns if self.df[col].isnull().sum() > 0
            },
            'duplicates': {
                'count': self.df.duplicated().sum(),
                'percentage': (self.df.duplicated().sum() / len(self.df)) * 100
            },
            'numeric_summary': self.df.describe().to_dict(),
            'categorical_summary': {
                col: {
                    'unique_values': self.df[col].nunique(),
                    'top_values': self.df[col].value_counts().head().to_dict()
                }
                for col in self.df.select_dtypes(include=['object']).columns
            }
        }

        return profile

    def clean_data(self, strategy='smart'):
        """
        Intelligent data cleaning
        """
        original_rows = len(self.df)

        if strategy == 'smart':
            # Remove columns with > 70% missing values
            high_missing_cols = [
                col for col in self.df.columns
                if (self.df[col].isnull().sum() / len(self.df)) > 0.7
            ]
            if high_missing_cols:
                self.df = self.df.drop(columns=high_missing_cols)
                self.log_operation(f"Dropped {len(high_missing_cols)} high-missing columns")

            # Fill missing values intelligently
            for col in self.df.columns:
                if self.df[col].isnull().sum() > 0:
                    if pd.api.types.is_numeric_dtype(self.df[col]):
                        # Numeric: fill with the median
                        self.df[col] = self.df[col].fillna(self.df[col].median())
                    else:
                        # Categorical: fill with the mode or 'Unknown'
                        mode_value = self.df[col].mode()
                        fill_value = mode_value[0] if len(mode_value) > 0 else 'Unknown'
                        self.df[col] = self.df[col].fillna(fill_value)

            # Remove exact duplicates
            before_dedup = len(self.df)
            self.df = self.df.drop_duplicates()
            duplicates_removed = before_dedup - len(self.df)

            if duplicates_removed > 0:
                self.log_operation(f"Removed {duplicates_removed} duplicate rows")

        self.log_operation(f"Data cleaning completed: {original_rows:,} → {len(self.df):,} rows")
        return self

    def feature_engineering(self):
        """
        Advanced feature engineering
        """
        # Detect date columns (object columns parseable as dates, plus existing
        # datetime columns) and create date-based features
        for col in self.df.columns:
            if self.df[col].dtype == 'object':
                try:
                    # errors='ignore' and infer_datetime_format are deprecated
                    # in recent pandas, so parse strictly and skip on failure
                    dt_series = pd.to_datetime(self.df[col], errors='raise')
                except (ValueError, TypeError):
                    continue
                self.df[col] = dt_series
            elif pd.api.types.is_datetime64_any_dtype(self.df[col]):
                dt_series = self.df[col]
            else:
                continue

            # Create date-based features
            self.df[f'{col}_year'] = dt_series.dt.year
            self.df[f'{col}_month'] = dt_series.dt.month
            self.df[f'{col}_quarter'] = dt_series.dt.quarter
            self.df[f'{col}_dayofweek'] = dt_series.dt.dayofweek
            self.df[f'{col}_is_weekend'] = dt_series.dt.dayofweek.isin([5, 6])

            self.log_operation(f"Created date features for {col}")

        # Create interaction features for numeric columns
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns.tolist()
        if len(numeric_cols) >= 2:
            # Create ratios for first few numeric columns
            for i in range(min(3, len(numeric_cols))):
                for j in range(i + 1, min(3, len(numeric_cols))):
                    col1, col2 = numeric_cols[i], numeric_cols[j]
                    if (self.df[col2] != 0).all():  # Avoid division by zero
                        self.df[f'{col1}_{col2}_ratio'] = self.df[col1] / self.df[col2]
                        self.log_operation(f"Created ratio feature: {col1}/{col2}")

        return self

    def detect_outliers(self, method='iqr', factor=1.5):
        """
        Intelligent outlier detection
        """
        outliers_info = {}
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - factor * IQR
                upper_bound = Q3 + factor * IQR

                outliers_mask = (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
                outliers_count = outliers_mask.sum()

                if outliers_count > 0:
                    outliers_info[col] = {
                        'count': outliers_count,
                        'percentage': (outliers_count / len(self.df)) * 100,
                        'bounds': (lower_bound, upper_bound)
                    }

            elif method == 'zscore':
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                outliers_mask = z_scores > factor
                outliers_count = outliers_mask.sum()

                if outliers_count > 0:
                    outliers_info[col] = {
                        'count': outliers_count,
                        'percentage': (outliers_count / len(self.df)) * 100,
                        'threshold': factor
                    }

        return outliers_info

    def advanced_aggregations(self, group_cols, agg_col, custom_funcs=None):
        """
        Advanced aggregation with custom functions
        """
        default_funcs = ['mean', 'median', 'std', 'min', 'max', 'count']

        # Custom aggregation functions
        def coefficient_of_variation(x):
            return x.std() / x.mean() if x.mean() != 0 else 0

        def percentile_90(x):
            return np.percentile(x, 90)

        def percentile_10(x):
            return np.percentile(x, 10)

        custom_aggs = {
            'cv': coefficient_of_variation,
            'p90': percentile_90,
            'p10': percentile_10,
            'range': lambda x: x.max() - x.min(),
            'skewness': lambda x: x.skew(),
        }

        if custom_funcs:
            custom_aggs.update(custom_funcs)

        # Perform aggregation
        agg_funcs = default_funcs + list(custom_aggs.keys())

        try:
            # Pandas named aggregation with custom functions
            result = self.df.groupby(group_cols)[agg_col].agg(
                mean='mean',
                median='median',
                std='std',
                min='min',
                max='max',
                count='count',
                **custom_aggs
            ).round(3)

            self.log_operation(f"Aggregated {agg_col} by {group_cols} with {len(agg_funcs)} functions")
            return result

        except Exception as e:
            print(f"Aggregation error: {e}")
            return None

    def time_series_analysis(self, date_col, value_col, freq='D'):
        """
        Time series analysis and forecasting preparation
        """
        if date_col not in self.df.columns:
            raise ValueError(f"Date column '{date_col}' not found")

        # Ensure datetime type
        self.df[date_col] = pd.to_datetime(self.df[date_col])

        # Create time series
        ts_data = self.df.groupby(pd.Grouper(key=date_col, freq=freq))[value_col].sum()

        # Basic time series features
        analysis = {
            'trend': {
                'slope': np.polyfit(range(len(ts_data)), ts_data.values, 1)[0],
                'correlation': np.corrcoef(range(len(ts_data)), ts_data.values)[0, 1]
            },
            'seasonality': {
                'monthly': ts_data.groupby(ts_data.index.month).mean().to_dict(),
                'weekday': ts_data.groupby(ts_data.index.dayofweek).mean().to_dict() if freq == 'D' else {}
            },
            'statistics': {
                'mean': ts_data.mean(),
                'std': ts_data.std(),
                'min': ts_data.min(),
                'max': ts_data.max(),
                'growth_rate': (ts_data.iloc[-1] / ts_data.iloc[0] - 1) * 100 if len(ts_data) > 1 else 0
            }
        }

        return ts_data, analysis

    def log_operation(self, operation):
        """Log processing operations"""
        self.processing_log.append({
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'operation': operation
        })

    def get_processing_summary(self):
        """Get summary of all operations"""
        return {
            'original_shape': self.original_shape,
            'current_shape': self.df.shape,
            'operations': self.processing_log
        }

# Example usage with realistic synthetic data
def demonstrate_advanced_pandas():
    """
    Demonstrate advanced pandas operations
    """
    print("🐼 Advanced Pandas Operations Demo")
    print("=" * 50)

    # Create sample e-commerce data
    np.random.seed(42)
    n_records = 50000

    data = {
        'order_date': pd.date_range('2023-01-01', '2024-12-31', periods=n_records),
        'customer_id': np.random.randint(1000, 10000, n_records),
        'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home', 'Sports'], n_records),
        'product_name': [f"Product_{i}" for i in np.random.randint(1, 1000, n_records)],
        'quantity': np.random.randint(1, 10, n_records),
        'unit_price': np.random.lognormal(3, 1, n_records),
        'discount_pct': np.random.choice([0, 5, 10, 15, 20], n_records, p=[0.4, 0.3, 0.2, 0.08, 0.02]),
        'customer_age': np.random.randint(18, 80, n_records),
        'customer_city': np.random.choice(['Jakarta', 'Surabaya', 'Bandung', 'Medan', 'Makassar'], n_records)
    }

    # Add some missing values
    df = pd.DataFrame(data)
    df.loc[np.random.choice(df.index, 1000), 'customer_age'] = np.nan
    df.loc[np.random.choice(df.index, 500), 'discount_pct'] = np.nan

    # Initialize processor
    processor = DataProcessor(data=df)

    # Profile data
    print("📊 Data Profiling Results:")
    profile = processor.profile_data()
    print(f"Shape: {profile['basic_info']['shape']}")
    print(f"Memory: {profile['basic_info']['memory_usage']}")
    print(f"Missing data columns: {len(profile['missing_data'])}")

    # Clean data
    processor.clean_data()

    # Feature engineering
    processor.feature_engineering()

    # Calculate total amount
    processor.df['total_amount'] = (processor.df['quantity'] * processor.df['unit_price'] *
                                   (1 - processor.df['discount_pct'] / 100))

    # Advanced aggregations
    print("\n📈 Advanced Aggregations:")
    agg_result = processor.advanced_aggregations(
        group_cols=['product_category'],
        agg_col='total_amount'
    )
    print(agg_result.head())

    # Time series analysis
    print("\n📅 Time Series Analysis:")
    ts_data, ts_analysis = processor.time_series_analysis('order_date', 'total_amount', 'M')
    print(f"Monthly sales trend slope: {ts_analysis['trend']['slope']:.2f}")
    print(f"Average monthly sales: ${ts_analysis['statistics']['mean']:,.2f}")

    # Outlier detection
    print("\n🔍 Outlier Detection:")
    outliers = processor.detect_outliers(method='iqr')
    for col, info in outliers.items():
        print(f"{col}: {info['count']} outliers ({info['percentage']:.1f}%)")

    # Processing summary
    summary = processor.get_processing_summary()
    print(f"\n✅ Processing completed: {summary['original_shape']} → {summary['current_shape']}")
    print(f"Operations performed: {len(summary['operations'])}")

    return processor.df

# Run demonstration
processed_data = demonstrate_advanced_pandas()

🔢 NumPy: High-Performance Computing

Advanced Array Operations & Vectorization

import numpy as np
from numba import jit
import time

class NumPyOptimizer:
    """
    Advanced NumPy operations with performance optimization
    """

    @staticmethod
    def demonstrate_vectorization():
        """
        Show power of vectorization vs loops
        """
        print("⚡ Vectorization Performance Comparison")
        print("=" * 50)

        # Large arrays for testing
        n = 1_000_000
        arr1 = np.random.rand(n)
        arr2 = np.random.rand(n)

        # Method 1: Pure Python loop (SLOW)
        def python_loop_multiply(a, b):
            result = []
            for i in range(len(a)):
                result.append(a[i] * b[i])
            return result

        # Method 2: List comprehension (BETTER)
        def list_comp_multiply(a, b):
            return [a[i] * b[i] for i in range(len(a))]

        # Method 3: NumPy vectorization (BEST)
        def numpy_multiply(a, b):
            return a * b

        # Performance comparison: use the same 100,000-element slice for all
        # three methods so the timings are directly comparable
        sample = 100_000
        a, b = arr1[:sample], arr2[:sample]
        methods = [
            ("Python Loop", lambda: python_loop_multiply(a, b)),
            ("List Comprehension", lambda: list_comp_multiply(a, b)),
            ("NumPy Vectorization", lambda: numpy_multiply(a, b))
        ]

        for name, func in methods:
            start_time = time.time()
            result = func()
            end_time = time.time()
            print(f"{name:20}: {(end_time - start_time)*1000:.3f} ms")

        return arr1, arr2

    @staticmethod
    def advanced_array_operations():
        """
        Demonstrate advanced NumPy operations
        """
        print("\n🔢 Advanced NumPy Operations")
        print("=" * 50)

        # Create sample data: sales by region, month, and product
        np.random.seed(42)
        regions = 5
        months = 12
        products = 20

        # 3D array: regions × months × products
        sales_data = np.random.lognormal(mean=8, sigma=1, size=(regions, months, products))

        print(f"Sales data shape: {sales_data.shape}")
        print(f"Total sales: ${sales_data.sum():,.2f}")

        # Advanced indexing and slicing
        print("\n📊 Advanced Indexing Examples:")

        # Boolean indexing: find high-performing products
        high_sales_mask = sales_data > np.percentile(sales_data, 90)
        high_sales_count = high_sales_mask.sum()
        print(f"High-performing sales (>90th percentile): {high_sales_count}")

        # Fancy indexing: select specific regions and months
        selected_regions = [0, 2, 4]  # First, third, fifth region
        selected_months = [0, 5, 11]  # January, June, December
        subset = sales_data[np.ix_(selected_regions, selected_months)]
        print(f"Subset shape: {subset.shape}")

        # Aggregations across different axes
        print("\n📈 Aggregation Results:")
        total_by_region = sales_data.sum(axis=(1, 2))  # Sum across months and products
        total_by_month = sales_data.sum(axis=(0, 2))   # Sum across regions and products
        total_by_product = sales_data.sum(axis=(0, 1)) # Sum across regions and months

        print(f"Best region (total sales): Region {total_by_region.argmax()} - ${total_by_region.max():,.2f}")
        print(f"Best month (total sales): Month {total_by_month.argmax() + 1} - ${total_by_month.max():,.2f}")
        print(f"Best product (total sales): Product {total_by_product.argmax() + 1} - ${total_by_product.max():,.2f}")

        return sales_data, total_by_region, total_by_month, total_by_product

    @staticmethod
    def statistical_operations():
        """
        Advanced statistical operations with NumPy
        """
        print("\n📊 Statistical Analysis with NumPy")
        print("=" * 50)

        # Generate sample data: customer transaction amounts
        np.random.seed(42)

        # Different customer segments with different spending patterns
        premium_customers = np.random.lognormal(mean=6, sigma=0.5, size=1000)  # High spenders
        standard_customers = np.random.lognormal(mean=4.5, sigma=0.7, size=5000)  # Medium spenders
        basic_customers = np.random.lognormal(mean=3, sigma=0.8, size=10000)  # Low spenders

        all_customers = np.concatenate([premium_customers, standard_customers, basic_customers])

        # Comprehensive statistical analysis
        stats = {
            'mean': np.mean(all_customers),
            'median': np.median(all_customers),
            'std': np.std(all_customers),
            'var': np.var(all_customers),
            'min': np.min(all_customers),
            'max': np.max(all_customers),
            'q25': np.percentile(all_customers, 25),
            'q75': np.percentile(all_customers, 75),
            'iqr': np.percentile(all_customers, 75) - np.percentile(all_customers, 25),
            'skewness': None,  # Will calculate manually
            'kurtosis': None   # Will calculate manually
        }

        # Manual skewness calculation
        mean_val = stats['mean']
        std_val = stats['std']
        skewness = np.mean(((all_customers - mean_val) / std_val) ** 3)
        kurtosis = np.mean(((all_customers - mean_val) / std_val) ** 4) - 3

        stats['skewness'] = skewness
        stats['kurtosis'] = kurtosis

        print("Customer Spending Statistics:")
        for key, value in stats.items():
            # Skewness and kurtosis are dimensionless; all other stats are in dollars
            if key in ('skewness', 'kurtosis'):
                print(f"{key.upper():12}: {value:8.3f}")
            else:
                print(f"{key.upper():12}: ${value:8.2f}")

        # Advanced operations: correlation and covariance
        print("\n🔗 Correlation Analysis:")

        # Create correlated variables
        spending = all_customers
        age = 25 + 40 * np.random.random(len(spending)) + 0.3 * spending + np.random.normal(0, 5, len(spending))
        income = 30000 + 2 * spending + np.random.normal(0, 10000, len(spending))

        # Correlation matrix
        data_matrix = np.column_stack([spending, age, income])
        correlation_matrix = np.corrcoef(data_matrix.T)

        variables = ['Spending', 'Age', 'Income']
        print("Correlation Matrix:")
        print(f"{'':12}", end='')
        for var in variables:
            print(f"{var:>12}", end='')
        print()

        for i, var in enumerate(variables):
            print(f"{var:12}", end='')
            for j in range(len(variables)):
                print(f"{correlation_matrix[i, j]:12.3f}", end='')
            print()

        return all_customers, data_matrix, correlation_matrix

    @staticmethod
    @jit(nopython=True)  # Numba JIT compilation for speed
    def monte_carlo_pi(n_points):
        """
        Monte Carlo estimation of π using Numba for speed
        """
        count_inside = 0
        for i in range(n_points):
            x = np.random.random()
            y = np.random.random()
            if x*x + y*y <= 1:
                count_inside += 1
        return 4.0 * count_inside / n_points

    @staticmethod
    def performance_optimization_demo():
        """
        Demonstrate performance optimization techniques
        """
        print("\n🚀 Performance Optimization Demo")
        print("=" * 50)

        # Compare different approaches to matrix multiplication
        n = 1000
        A = np.random.rand(n, n)
        B = np.random.rand(n, n)

        print(f"Matrix size: {n}×{n}")

        # Method 1: Pure NumPy
        start_time = time.time()
        C1 = np.dot(A, B)
        numpy_time = time.time() - start_time
        print(f"NumPy dot product:     {numpy_time:.4f} seconds")

        # Method 2: @ operator (Python 3.5+)
        start_time = time.time()
        C2 = A @ B
        at_operator_time = time.time() - start_time
        print(f"@ operator:            {at_operator_time:.4f} seconds")

        # Method 3: Using BLAS directly (if available)
        try:
            from scipy.linalg.blas import dgemm
            start_time = time.time()
            C3 = dgemm(1.0, A, B)
            blas_time = time.time() - start_time
            print(f"Direct BLAS:           {blas_time:.4f} seconds")
        except ImportError:
            print("BLAS not available")

        # Memory-efficient operations
        print("\n💾 Memory Efficiency Demo:")

        # In-place operations
        large_array = np.random.rand(10000, 1000)
        original_memory = large_array.nbytes / (1024**2)  # MB
        print(f"Original array size: {original_memory:.2f} MB")

        # Non-in-place (creates copy)
        start_time = time.time()
        result1 = large_array * 2 + 1
        copy_time = time.time() - start_time
        copy_memory = result1.nbytes / (1024**2)

        # In-place operations (more memory efficient)
        large_array_copy = large_array.copy()
        start_time = time.time()
        large_array_copy *= 2
        large_array_copy += 1
        inplace_time = time.time() - start_time

        print(f"Copy operation:   {copy_time:.4f} seconds, {copy_memory:.2f} MB extra")
        print(f"In-place operation: {inplace_time:.4f} seconds, 0 MB extra")

        # Monte Carlo π estimation with Numba
        print("\n🎲 Monte Carlo π Estimation (with Numba JIT):")
        for n_points in [10_000, 100_000, 1_000_000]:
            start_time = time.time()
            pi_estimate = NumPyOptimizer.monte_carlo_pi(n_points)
            end_time = time.time()
            error = abs(pi_estimate - np.pi) / np.pi * 100
            print(f"n={n_points:8,}: π≈{pi_estimate:.6f}, error={error:.3f}%, time={end_time-start_time:.4f}s")

# Run NumPy demonstrations
numpy_optimizer = NumPyOptimizer()

# Vectorization demo
arr1, arr2 = numpy_optimizer.demonstrate_vectorization()

# Advanced operations
sales_data, region_totals, month_totals, product_totals = numpy_optimizer.advanced_array_operations()

# Statistical analysis
customer_data, data_matrix, corr_matrix = numpy_optimizer.statistical_operations()

# Performance optimization
numpy_optimizer.performance_optimization_demo()

🤖 Machine Learning Implementation

Complete ML Pipeline with Scikit-learn

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
import matplotlib.pyplot as plt
import seaborn as sns

class MLPipeline:
    """
    Production-ready Machine Learning Pipeline
    """

    def __init__(self, problem_type='classification'):
        self.problem_type = problem_type
        self.models = {}
        self.best_model = None
        self.preprocessor = None
        self.feature_importance = None
        self.performance_metrics = {}

    def load_and_prepare_data(self, data_path=None, data=None, target_column=None):
        """
        Load and prepare data for ML
        """
        if data_path:
            self.df = pd.read_csv(data_path)
        elif data is not None:
            self.df = data.copy()
        else:
            # Create sample data for demonstration
            self.df = self._create_sample_data()

        if target_column:
            self.target_column = target_column
        else:
            # Auto-detect target (assuming last column)
            self.target_column = self.df.columns[-1]

        print(f"📊 Dataset loaded: {self.df.shape}")
        print(f"🎯 Target column: {self.target_column}")
        print(f"📈 Target distribution:\n{self.df[self.target_column].value_counts()}")

        return self.df

    def _create_sample_data(self):
        """
        Create sample customer churn dataset
        """
        np.random.seed(42)
        n_customers = 10000

        # Customer features
        data = {
            'customer_id': range(1, n_customers + 1),
            'age': np.random.randint(18, 80, n_customers),
            'tenure_months': np.random.randint(1, 72, n_customers),
            'monthly_charges': np.random.normal(65, 20, n_customers),
            'total_charges': np.random.normal(2000, 1500, n_customers),
            'contract_type': np.random.choice(['Month-to-month', '1-year', '2-year'], n_customers, p=[0.5, 0.3, 0.2]),
            'payment_method': np.random.choice(['Electronic check', 'Mailed check', 'Bank transfer', 'Credit card'], n_customers),
            'internet_service': np.random.choice(['DSL', 'Fiber optic', 'No'], n_customers, p=[0.4, 0.4, 0.2]),
            'online_security': np.random.choice(['Yes', 'No', 'No internet service'], n_customers, p=[0.3, 0.5, 0.2]),
            'tech_support': np.random.choice(['Yes', 'No', 'No internet service'], n_customers, p=[0.3, 0.5, 0.2]),
            'streaming_tv': np.random.choice(['Yes', 'No', 'No internet service'], n_customers, p=[0.3, 0.5, 0.2]),
            'paperless_billing': np.random.choice(['Yes', 'No'], n_customers, p=[0.6, 0.4]),
            'senior_citizen': np.random.choice([0, 1], n_customers, p=[0.8, 0.2])
        }

        df = pd.DataFrame(data)

        # Create the target variable (churn) with realistic logic
        churn_prob = (
            0.1 +  # Base churn rate
            0.3 * (df['contract_type'] == 'Month-to-month') +  # Higher churn for month-to-month contracts
            0.2 * (df['monthly_charges'] > 80) +  # Higher churn for expensive plans
            0.15 * (df['tenure_months'] < 12) +  # Higher churn for new customers
            0.1 * (df['tech_support'] == 'No') +  # Higher churn without tech support
            0.05 * df['senior_citizen'] +  # Slightly higher churn for seniors
            np.random.normal(0, 0.1, n_customers)  # Random noise
        )

        # Clip probabilities and convert to binary outcomes
        churn_prob = np.clip(churn_prob, 0, 1)
        df['churn'] = np.random.binomial(1, churn_prob, n_customers)

        # Drop customer_id (not a feature)
        df = df.drop('customer_id', axis=1)

        print("✅ Sample customer churn dataset created")
        return df

    def exploratory_data_analysis(self):
        """
        Comprehensive EDA
        """
        print("\n📊 Exploratory Data Analysis")
        print("=" * 50)

        # Basic info
        print(f"Dataset shape: {self.df.shape}")
        print(f"Missing values: {self.df.isnull().sum().sum()}")
        print(f"Duplicate rows: {self.df.duplicated().sum()}")

        # Target distribution
        target_dist = self.df[self.target_column].value_counts()
        print(f"\nTarget distribution:")
        for value, count in target_dist.items():
            percentage = (count / len(self.df)) * 100
            print(f"  {value}: {count} ({percentage:.1f}%)")

        # Correlation analysis for numeric features
        numeric_features = self.df.select_dtypes(include=[np.number]).columns.tolist()
        if self.target_column in numeric_features:
            numeric_features.remove(self.target_column)

        if len(numeric_features) > 1:
            correlation_with_target = self.df[numeric_features + [self.target_column]].corr()[self.target_column].abs().sort_values(ascending=False)
            print(f"\nTop correlations with target:")
            for feature in correlation_with_target.index[1:6]:  # Skip target itself, show top 5
                corr_val = correlation_with_target[feature]
                print(f"  {feature}: {corr_val:.3f}")

        # Categorical features analysis
        categorical_features = self.df.select_dtypes(include=['object']).columns.tolist()
        if self.target_column in categorical_features:
            categorical_features.remove(self.target_column)

        if len(categorical_features) > 0:
            print(f"\nCategorical features summary:")
            for feature in categorical_features[:5]:  # Show first 5
                unique_count = self.df[feature].nunique()
                print(f"  {feature}: {unique_count} unique values")

        return {
            'numeric_features': numeric_features,
            'categorical_features': categorical_features,
            'target_distribution': target_dist
        }

    def feature_engineering(self, numeric_features, categorical_features):
        """
        Advanced feature engineering
        """
        print("\n🔧 Feature Engineering")
        print("=" * 50)

        # Create feature engineering pipeline
        numeric_transformer = Pipeline(steps=[
            ('scaler', StandardScaler())
        ])

        categorical_transformer = Pipeline(steps=[
            ('onehot', OneHotEncoder(drop='first', sparse_output=False))
        ])

        # The ColumnTransformer is built after feature engineering (see below)
        # so that the newly created features are included in the model

        # Additional feature engineering
        df_engineered = self.df.copy()

        # Create interaction features
        if 'monthly_charges' in df_engineered.columns and 'tenure_months' in df_engineered.columns:
            df_engineered['total_charges_estimated'] = df_engineered['monthly_charges'] * df_engineered['tenure_months']
            df_engineered['charges_per_month'] = df_engineered['total_charges'] / (df_engineered['tenure_months'] + 1)

        if 'age' in df_engineered.columns and 'tenure_months' in df_engineered.columns:
            df_engineered['customer_lifetime_ratio'] = df_engineered['tenure_months'] / df_engineered['age']

        # Binning numerical features
        if 'age' in df_engineered.columns:
            df_engineered['age_group'] = pd.cut(df_engineered['age'],
                                               bins=[0, 30, 50, 70, 100],
                                               labels=['Young', 'Middle', 'Senior', 'Elder'])

        if 'monthly_charges' in df_engineered.columns:
            df_engineered['charge_tier'] = pd.qcut(df_engineered['monthly_charges'],
                                                  q=4,
                                                  labels=['Low', 'Medium', 'High', 'Premium'])

        self.df_engineered = df_engineered

        # Update feature lists based on the engineered dataframe
        new_numeric = [col for col in df_engineered.select_dtypes(include=[np.number]).columns
                      if col != self.target_column]
        new_categorical = [col for col in df_engineered.select_dtypes(include=['object', 'category']).columns
                          if col != self.target_column]

        # Combine preprocessing steps over the final (engineered) feature lists
        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, new_numeric),
                ('cat', categorical_transformer, new_categorical)
            ]
        )

        print(f"Original features: {len(numeric_features + categorical_features)}")
        print(f"Engineered features: {len(new_numeric + new_categorical)}")

        return new_numeric, new_categorical

    def prepare_model_data(self, test_size=0.2, random_state=42):
        """
        Prepare data for modeling
        """
        # Separate features and target
        X = self.df_engineered.drop(self.target_column, axis=1)
        y = self.df_engineered[self.target_column]

        # Train-test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )

        print(f"Training set: {X_train.shape}")
        print(f"Test set: {X_test.shape}")

        self.X_train, self.X_test = X_train, X_test
        self.y_train, self.y_test = y_train, y_test

        return X_train, X_test, y_train, y_test

    def train_models(self):
        """
        Train multiple models and compare their performance
        """
        print("\n🤖 Training Multiple Models")
        print("=" * 50)

        # Define models
        models = {
            'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
            'Random Forest': RandomForestClassifier(random_state=42, n_estimators=100),
            'Gradient Boosting': GradientBoostingClassifier(random_state=42, n_estimators=100),
            'SVM': SVC(random_state=42, probability=True)
        }

        # Train and evaluate each model
        for name, model in models.items():
            # Create a pipeline with preprocessing
            pipeline = Pipeline([
                ('preprocessor', self.preprocessor),
                ('classifier', model)
            ])

            # Cross-validation
            cv_scores = cross_val_score(pipeline, self.X_train, self.y_train, cv=5, scoring='roc_auc')

            # Train on the full training set
            pipeline.fit(self.X_train, self.y_train)

            # Predictions
            y_pred = pipeline.predict(self.X_test)
            y_pred_proba = pipeline.predict_proba(self.X_test)[:, 1]

            # Metrics
            roc_auc = roc_auc_score(self.y_test, y_pred_proba)

            # Store model and metrics
            self.models[name] = {
                'pipeline': pipeline,
                'cv_scores': cv_scores,
                'roc_auc': roc_auc,
                'predictions': y_pred,
                'probabilities': y_pred_proba
            }

            print(f"{name:20}: CV AUC = {cv_scores.mean():.3f} (±{cv_scores.std()*2:.3f}), Test AUC = {roc_auc:.3f}")

        # Select best model
        best_model_name = max(self.models.keys(), key=lambda k: self.models[k]['roc_auc'])
        self.best_model = self.models[best_model_name]

        print(f"\n🏆 Best model: {best_model_name} (AUC: {self.best_model['roc_auc']:.3f})")

        return self.models

    def hyperparameter_tuning(self, model_name='Random Forest'):
        """
        Hyperparameter tuning for the selected model
        """
        print(f"\n🔍 Hyperparameter Tuning for {model_name}")
        print("=" * 50)

        # Parameter grids
        param_grids = {
            'Random Forest': {
                'classifier__n_estimators': [50, 100, 200],
                'classifier__max_depth': [10, 20, None],
                'classifier__min_samples_split': [2, 5, 10],
                'classifier__min_samples_leaf': [1, 2, 4]
            },
            'Gradient Boosting': {
                'classifier__n_estimators': [50, 100, 200],
                'classifier__learning_rate': [0.01, 0.1, 0.2],
                'classifier__max_depth': [3, 5, 7],
                'classifier__min_samples_split': [2, 5, 10]
            },
            'Logistic Regression': {
                'classifier__C': [0.01, 0.1, 1, 10, 100],
                'classifier__penalty': ['l1', 'l2'],
                'classifier__solver': ['liblinear', 'saga']
            }
        }

        if model_name in param_grids:
            # Base model
            if model_name == 'Random Forest':
                base_model = RandomForestClassifier(random_state=42)
            elif model_name == 'Gradient Boosting':
                base_model = GradientBoostingClassifier(random_state=42)
            elif model_name == 'Logistic Regression':
                base_model = LogisticRegression(random_state=42, max_iter=1000)

            # Create pipeline
            pipeline = Pipeline([
                ('preprocessor', self.preprocessor),
                ('classifier', base_model)
            ])

            # Grid search
            grid_search = GridSearchCV(
                pipeline,
                param_grids[model_name],
                cv=5,
                scoring='roc_auc',
                n_jobs=-1,
                verbose=1
            )

            grid_search.fit(self.X_train, self.y_train)

            # Best model results
            best_score = grid_search.best_score_
            best_params = grid_search.best_params_

            print(f"Best CV AUC: {best_score:.3f}")
            print("Best parameters:")
            for param, value in best_params.items():
                print(f"  {param}: {value}")

            # Update best model
            self.best_model = {
                'pipeline': grid_search.best_estimator_,
                'cv_score': best_score,
                'best_params': best_params
            }

            return grid_search
        else:
            print(f"Parameter grid not defined for {model_name}")
            return None

    def evaluate_model(self):
        """
        Comprehensive model evaluation
        """
        print("\n📈 Model Evaluation")
        print("=" * 50)

        pipeline = self.best_model['pipeline']
        y_pred = pipeline.predict(self.X_test)
        y_pred_proba = pipeline.predict_proba(self.X_test)[:, 1]

        # Classification report
        print("Classification Report:")
        print(classification_report(self.y_test, y_pred))

        # Confusion matrix
        cm = confusion_matrix(self.y_test, y_pred)
        print(f"\nConfusion Matrix:")
        print(cm)

        # Feature importance (if available)
        if hasattr(pipeline.named_steps['classifier'], 'feature_importances_'):
            # Get feature names after preprocessing
            feature_names = self._get_feature_names()
            importance_scores = pipeline.named_steps['classifier'].feature_importances_

            # Create feature importance dataframe
            feature_importance = pd.DataFrame({
                'feature': feature_names,
                'importance': importance_scores
            }).sort_values('importance', ascending=False)

            print(f"\nTop 10 Most Important Features:")
            for idx, row in feature_importance.head(10).iterrows():
                print(f"  {row['feature']:30}: {row['importance']:.3f}")

            self.feature_importance = feature_importance

        return {
            'classification_report': classification_report(self.y_test, y_pred, output_dict=True),
            'confusion_matrix': cm,
            'roc_auc': roc_auc_score(self.y_test, y_pred_proba),
            'feature_importance': getattr(self, 'feature_importance', None)
        }

    def _get_feature_names(self):
        """
        Get feature names after preprocessing
        """
        # Use the fitted preprocessor inside the best pipeline; the
        # ColumnTransformer exposes the scaled and one-hot encoded names directly
        fitted_preprocessor = self.best_model['pipeline'].named_steps['preprocessor']
        return list(fitted_preprocessor.get_feature_names_out())

    def predict_new_data(self, new_data):
        """
        Make predictions on new data
        """
        if self.best_model is None:
            raise ValueError("No trained model available. Please train a model first.")

        pipeline = self.best_model['pipeline']
        predictions = pipeline.predict(new_data)
        probabilities = pipeline.predict_proba(new_data)[:, 1]

        return predictions, probabilities

# Demonstrate complete ML pipeline
def demonstrate_ml_pipeline():
    """
    Complete demonstration of ML pipeline
    """
    print("🤖 Machine Learning Pipeline Demonstration")
    print("=" * 60)

    # Initialize pipeline
    ml_pipeline = MLPipeline(problem_type='classification')

    # Load data (using sample data)
    df = ml_pipeline.load_and_prepare_data()

    # EDA
    eda_results = ml_pipeline.exploratory_data_analysis()

    # Feature engineering
    numeric_features, categorical_features = ml_pipeline.feature_engineering(
        eda_results['numeric_features'],
        eda_results['categorical_features']
    )

    # Prepare model data
    X_train, X_test, y_train, y_test = ml_pipeline.prepare_model_data()

    # Train multiple models
    models = ml_pipeline.train_models()

    # Hyperparameter tuning (optional; skipped here for speed)
    # ml_pipeline.hyperparameter_tuning('Random Forest')

    # Evaluate best model
    evaluation_results = ml_pipeline.evaluate_model()

    print("\n✅ ML Pipeline completed successfully!")
    print(f"Best model AUC: {evaluation_results['roc_auc']:.3f}")

    return ml_pipeline, evaluation_results

# Run ML pipeline demonstration
ml_pipeline, results = demonstrate_ml_pipeline()

🌟 Conclusion

Python for data science in 2025 offers a very mature and powerful ecosystem. Success comes from mastering:

Core Libraries:

  • Pandas for data manipulation and analysis
  • NumPy for high-performance computing
  • Matplotlib/Seaborn for data visualization (a small sketch follows this list)
  • Scikit-learn for machine learning
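
Since the article configures Matplotlib and Seaborn styles but never renders an actual chart, here is a minimal visualization sketch. It assumes the synthetic sample_data DataFrame from the Pandas section is still in scope; the date and revenue column names come from that example.

# Minimal visualization sketch (assumes sample_data from the Pandas section)
import matplotlib.pyplot as plt
import seaborn as sns

# Aggregate daily records into monthly revenue
monthly = (
    sample_data
    .assign(month=sample_data['date'].dt.to_period('M').dt.to_timestamp())
    .groupby('month', as_index=False)['revenue']
    .sum()
)

fig, ax = plt.subplots(figsize=(10, 4))
sns.lineplot(data=monthly, x='month', y='revenue', marker='o', ax=ax)
ax.set_title('Monthly Revenue (synthetic data)')
ax.set_xlabel('Month')
ax.set_ylabel('Revenue ($)')
plt.tight_layout()
plt.show()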

Advanced Techniques:

  • Vectorization for performance optimization
  • Sophisticated feature engineering
  • Pipeline design for reproducible workflows
  • Comprehensive model evaluation

Best Practices:

  • Data profiling before analysis
  • Proper preprocessing pipelines
  • Cross-validation for model reliability
  • Feature importance analysis

Production Readiness:

  • Robust error handling
  • Performance monitoring and optimization
  • Comprehensive code documentation
  • Modular design for maintainability

The Python data science ecosystem keeps evolving with new tools such as Polars, DuckDB, and more advanced ML libraries. Still, a solid foundation in Pandas, NumPy, and Scikit-learn remains essential for success in this field; a quick taste of the newer tools is sketched below.
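
As a hedged illustration only (not part of the pipelines above), both tools interoperate smoothly with pandas. The snippet assumes polars and duckdb are installed and that the sample_data DataFrame from the Pandas section exists; the group_by API reflects recent Polars releases (older versions used groupby).

# Optional: the same aggregation in Polars and DuckDB (assumes sample_data exists)
import duckdb
import polars as pl

# Polars: multi-threaded, columnar DataFrame engine
pl_df = pl.from_pandas(sample_data)
revenue_by_product_pl = (
    pl_df.group_by('product')
         .agg(pl.col('revenue').sum().alias('total_revenue'))
         .sort('total_revenue', descending=True)
)
print(revenue_by_product_pl)

# DuckDB: in-process SQL directly over the pandas DataFrame
revenue_by_product_sql = duckdb.sql(
    "SELECT product, SUM(revenue) AS total_revenue "
    "FROM sample_data GROUP BY product ORDER BY total_revenue DESC"
).df()
print(revenue_by_product_sql)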

Pro Tips:

  • Always profile your data before analysis
  • Use vectorization for better performance
  • Implement proper cross-validation strategies (see the sketch after this list)
  • Document your feature engineering decisions
  • Keep your models simple and interpretable when possible
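
To make the cross-validation tip concrete, here is a minimal sketch of an explicit StratifiedKFold strategy instead of the bare cv=5 used earlier. It assumes the ml_pipeline object from the machine learning section has already been prepared, so its preprocessor, X_train, and y_train exist.

# Explicit cross-validation strategy sketch (assumes ml_pipeline from above)
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

cv_strategy = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# A simple, interpretable baseline wrapped with the same preprocessing
simple_model = Pipeline([
    ('preprocessor', ml_pipeline.preprocessor),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])

scores = cross_val_score(
    simple_model, ml_pipeline.X_train, ml_pipeline.y_train,
    cv=cv_strategy, scoring='roc_auc'
)
print(f"Stratified 5-fold AUC: {scores.mean():.3f} (±{scores.std() * 2:.3f})")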

Happy data science with Python! 🐍📊

About the Author

Muhammad Aji Sukma

IT enthusiast

An IT enthusiast who enjoys sharing knowledge about technology, programming, and web development, and is always eager to learn new things.

Tags

#Python, #Data Science, #Pandas, #NumPy, #Machine Learning