Day 3: ClickHouse 실시간 데이터 적재 및 분석 파이프라인 구축

📋 오늘의 목표

Day 2에서 구축한 Spark Streaming 집계 결과를 ClickHouse에 저장하여 완전한 실시간 데이터 파이프라인을 완성한다.


🏗️ 아키텍처 개요

[광고 로그 생성기] 
    ↓ (JSON events)
[Kafka: ad-events Topic]
    ↓ (Stream)
[Spark Streaming: 1분 윈도우 집계]
    ↓ (Aggregated data)
[ClickHouse: OLAP 저장소] ← ✨ Today's Focus
    ↓ (Query)
[분석 & 대시보드]

🚀 Step 1: ClickHouse 테이블 설계

1.1 비즈니스 요구사항 분석

AdTech 실시간 파이프라인에서 저장해야 할 핵심 메트릭:

  • 시간 윈도우: 1분 단위 집계 시작/종료 시간
  • 차원(Dimensions): event_type, country, ad_format
  • 메트릭(Metrics): event_count, avg_revenue, total_revenue

1.2 테이블 스키마 설계

CREATE DATABASE IF NOT EXISTS rtb;

CREATE TABLE IF NOT EXISTS rtb.ad_events_aggregated (
    window_start DateTime,
    window_end DateTime,
    event_type String,
    country String,
    ad_format String,
    event_count UInt64,
    avg_revenue Float64,
    total_revenue Float64,
    created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (window_start, event_type, country, ad_format)
PARTITION BY toYYYYMM(window_start);

설계 포인트:

  1. MergeTree 엔진: ClickHouse의 기본 고성능 OLAP 엔진
    • 삽입 시 자동 정렬 및 병합
    • 범위 쿼리 최적화
  2. ORDER BY 절: 쿼리 패턴에 최적화된 정렬 키
    • window_start: 시계열 쿼리 (최근 데이터 조회)
    • event_type, country, ad_format: 차원별 필터링
  3. PARTITION BY: 월별 파티셔닝
    • 데이터 관리 용이 (오래된 파티션 삭제)
    • 쿼리 성능 향상 (파티션 프루닝)

1.3 초기화 스크립트 작성

sql/create_tables.sql 파일 생성:

-- 데이터베이스 생성 (먼저!)
CREATE DATABASE IF NOT EXISTS rtb;

-- 실시간 이벤트 집계 테이블
CREATE TABLE IF NOT EXISTS rtb.ad_events_aggregated (
    window_start DateTime,
    window_end DateTime,
    event_type String,
    country String,
    ad_format String,
    event_count UInt64,
    avg_revenue Float64,
    total_revenue Float64,
    created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (window_start, event_type, country, ad_format)
PARTITION BY toYYYYMM(window_start);

-- Raw 이벤트 테이블 (향후 사용)
CREATE TABLE IF NOT EXISTS rtb.ad_events_raw (
    timestamp DateTime,
    event_type String,
    user_id String,
    country String,
    ad_format String,
    campaign_id String,
    bid_price Float64,
    revenue Float64,
    created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (timestamp, event_type);

🔧 Step 2: Docker 환경 트러블슈팅

2.1 Kafka 초기화 문제 해결

문제 발생:

NodeExistsException: KeeperErrorCode = NodeExists

Zookeeper에 이전 Kafka 브로커 정보가 남아있어 충돌 발생.

해결 방법:

# 완전 초기화
docker-compose down
sudo rm -rf clickhouse-data/* redis-data/*
docker-compose up -d
sleep 30  # Kafka 완전 시작 대기

2.2 ClickHouse 초기화 스크립트 실행 확인

문제: 초기화 스크립트가 데이터베이스 생성 없이 테이블을 만들려고 시도.

해결: SQL 파일에 CREATE DATABASE IF NOT EXISTS rtb; 추가 후 컨테이너 재시작.

docker-compose restart clickhouse
docker logs adtech-clickhouse --tail 30

2.3 테이블 생성 확인

-- ClickHouse 클라이언트 접속
docker exec -it adtech-clickhouse clickhouse-client

-- 확인
SHOW DATABASES;
USE rtb;
SHOW TABLES;
DESCRIBE ad_events_aggregated;

💻 Step 3: Spark ClickHouse 적재 파이프라인 구현

3.1 새로운 Spark 애플리케이션 작성

src/spark_processor_clickhouse.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Spark Session 생성 (ClickHouse JDBC 드라이버 포함)
spark = SparkSession.builder \
    .appName("RTB-Pipeline-Processor-ClickHouse") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "com.clickhouse:clickhouse-jdbc:0.4.6:all") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session 시작됨!")

# 광고 로그 스키마 정의
ad_log_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("campaign_id", StringType(), True),
    StructField("ad_format", StringType(), True),
    StructField("country", StringType(), True),
    StructField("device", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("revenue", DoubleType(), True),
])

print("📡 Kafka에서 데이터 스트리밍 시작...")

# Kafka에서 스트리밍 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "ad-events") \
    .option("startingOffsets", "earliest") \
    .load()

# JSON 파싱
parsed_df = df.select(
    from_json(col("value").cast("string"), ad_log_schema).alias("data")
).select("data.*")

# 타임스탬프 변환
processed_df = parsed_df.withColumn(
    "event_timestamp",
    to_timestamp(col("timestamp"))
)

print("🔄 데이터 처리 파이프라인 구성 완료!")

# 윈도우 기반 집계 (1분 단위)
windowed_stats = processed_df \
    .withWatermark("event_timestamp", "1 minute") \
    .groupBy(
        window(col("event_timestamp"), "1 minute"),
        col("event_type"),
        col("country"),
        col("ad_format")
    ).agg(
        count("*").alias("event_count"),
        avg("revenue").alias("avg_revenue"),
        sum("revenue").alias("total_revenue")
    )

# 출력 포맷 변환
output_df = windowed_stats.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("event_type"),
    col("country"),
    col("ad_format"),
    col("event_count"),
    round(col("avg_revenue"), 4).alias("avg_revenue"),
    round(col("total_revenue"), 4).alias("total_revenue")
)

# ClickHouse 연결 정보
clickhouse_url = "jdbc:clickhouse://adtech-clickhouse:8123/rtb"
clickhouse_properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": "default",
    "password": ""
}

# ClickHouse에 저장하는 함수
def write_to_clickhouse(batch_df, batch_id):
    print(f"📝 Batch {batch_id} 처리 중... (레코드 수: {batch_df.count()})")
    
    if batch_df.count() > 0:
        batch_df.write \
            .format("jdbc") \
            .option("url", clickhouse_url) \
            .option("dbtable", "ad_events_aggregated") \
            .option("driver", clickhouse_properties["driver"]) \
            .option("user", clickhouse_properties["user"]) \
            .option("password", clickhouse_properties["password"]) \
            .mode("append") \
            .save()
        
        print(f"✅ Batch {batch_id} ClickHouse 적재 완료!")
        
        # 적재된 데이터 샘플 출력
        batch_df.show(5, truncate=False)

print("📊 실시간 집계 시작 - ClickHouse 적재 모드")

# ClickHouse로 스트리밍 (foreachBatch 사용)
query = output_df \
    .writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_clickhouse) \
    .trigger(processingTime="10 seconds") \
    .start()

print("🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)")
query.awaitTermination()

3.2 핵심 구현 포인트

1. foreachBatch 패턴 사용

def write_to_clickhouse(batch_df, batch_id):
    # 각 마이크로 배치마다 호출됨
    batch_df.write.format("jdbc").mode("append").save()

장점:

  • 배치 단위 트랜잭션 제어
  • 에러 핸들링 용이
  • 성능 최적화 가능

2. JDBC 연결 설정

clickhouse_url = "jdbc:clickhouse://adtech-clickhouse:8123/rtb"
  • Docker 네트워크 내부 주소 사용 (adtech-clickhouse)
  • HTTP 포트 8123 사용 (ClickHouse JDBC 기본)

3. outputMode = “update”

  • 윈도우 집계 결과의 업데이트만 전송
  • Complete 모드보다 효율적

🎯 Step 4: Spark Submit 실행 및 트러블슈팅

4.1 Maven Dependency 문제 해결

첫 번째 시도 (실패):

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,com.clickhouse:clickhouse-jdbc:0.4.6:all

에러:

requirement failed: Provided Maven Coordinates must be in the form 'groupId:artifactId:version'

:all classifier를 Spark가 인식하지 못함.

해결 방법:

docker exec -it spark-master /opt/spark/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  --jars https://repo1.maven.org/maven2/com/clickhouse/clickhouse-jdbc/0.4.6/clickhouse-jdbc-0.4.6-all.jar \
  /opt/spark-apps/spark_processor_clickhouse.py

--jars 옵션으로 직접 JAR 다운로드.

4.2 실행 프로세스

터미널 1: 광고 로그 생성기

python data-generator/ad_log_generator.py

터미널 2: Spark Streaming

docker exec -it spark-master /opt/spark/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  --jars https://repo1.maven.org/maven2/com/clickhouse/clickhouse-jdbc/0.4.6/clickhouse-jdbc-0.4.6-all.jar \
  /opt/spark-apps/spark_processor_clickhouse.py

정상 실행 로그:

✅ Spark Session 시작됨!
📡 Kafka에서 데이터 스트리밍 시작...
🔄 데이터 처리 파이프라인 구성 완료!
📊 실시간 집계 시작 - ClickHouse 적재 모드
🚀 Spark Streaming 실행 중...

📝 Batch 1 처리 중... (레코드 수: 42)
✅ Batch 1 ClickHouse 적재 완료!
+-------------------+-------------------+----------+-------+-------------+
|window_start       |window_end         |event_type|country|ad_format    |
+-------------------+-------------------+----------+-------+-------------+
|2025-12-16 23:43:00|2025-12-16 23:44:00|impression|FR     |interstitial |
|2025-12-16 23:43:00|2025-12-16 23:44:00|click     |CA     |interstitial |
+-------------------+-------------------+----------+-------+-------------+

📊 Step 5: 데이터 검증 및 분석

5.1 실시간 데이터 적재 확인

터미널 3: ClickHouse 쿼리

docker exec -it adtech-clickhouse clickhouse-client
USE rtb;

-- 총 레코드 수
SELECT count(*) FROM ad_events_aggregated;
-- 결과: 449 rows (실시간 증가 중)

-- 최근 데이터 확인
SELECT * FROM ad_events_aggregated 
ORDER BY window_start DESC 
LIMIT 10;

결과:

┌────────window_start─┬──────────window_end─┬─event_type─┬─country─┬─ad_format─────┬─event_count─┬─avg_revenue─┬─total_revenue─┐
│ 2025-12-16 23:44:00 │ 2025-12-16 23:45:00 │ click      │ US      │ rewarded_video│          42 │      0.065  │        2.7301 │
│ 2025-12-16 23:44:00 │ 2025-12-16 23:45:00 │ impression │ FR      │ banner        │         103 │      0.0    │        0.0    │
└─────────────────────┴─────────────────────┴────────────┴─────────┴───────────────┴─────────────┴─────────────┴───────────────┘

5.2 비즈니스 메트릭 분석

국가별 총 수익 순위:

SELECT 
    country,
    sum(total_revenue) as total_revenue,
    sum(event_count) as total_events
FROM ad_events_aggregated
WHERE event_type = 'click'
GROUP BY country
ORDER BY total_revenue DESC;

결과:

┌─country─┬──────total_revenue─┬─total_events─┐
│ US      │            56.2412 │          911 │
│ JP      │            45.7469 │          938 │
│ CA      │            40.1562 │          991 │
│ UK      │            34.8419 │          770 │
│ KR      │            25.3185 │          809 │
│ DE      │            25.1154 │          755 │
│ FR      │            23.3569 │          825 │
└─────────┴────────────────────┴──────────────┘

인사이트:

  • 미국(US)이 가장 높은 eCPM → 가장 많은 수익 생성
  • 설정한 국가별 가중치가 정확히 반영됨

광고 포맷별 성과:

SELECT 
    ad_format,
    sum(CASE WHEN event_type = 'impression' THEN event_count ELSE 0 END) as impressions,
    sum(CASE WHEN event_type = 'click' THEN event_count ELSE 0 END) as clicks,
    round(sum(total_revenue), 2) as revenue
FROM ad_events_aggregated
GROUP BY ad_format
ORDER BY revenue DESC;

결과:

┌─ad_format──────┬─impressions─┬─clicks─┬─revenue─┐
│ rewarded_video │       10937 │   3554 │  146.19 │
│ interstitial   │       12427 │   1705 │   73.06 │
│ banner         │       13636 │    814 │   34.92 │
└────────────────┴─────────────┴────────┴─────────┘

CTR 계산:

  • Rewarded Video: 3,554 / 10,937 = 32.5% (설정값 25%)
  • Interstitial: 1,705 / 12,427 = 13.7% (설정값 12%)
  • Banner: 814 / 13,636 = 6.0% (설정값 5%)

실제 광고 산업의 CTR 패턴과 일치!


🔍 Step 6: 성능 및 안정성 검증

6.1 처리 성능

  • 이벤트 생성 속도: ~80 events/sec
  • Spark 처리 속도: 10초마다 배치 처리
  • ClickHouse 적재 지연: < 1초
  • 총 End-to-End 지연: ~10-15초

6.2 데이터 정확성

-- 데이터 무결성 확인
SELECT 
    count(*) as total_records,
    count(DISTINCT window_start) as unique_windows,
    min(window_start) as first_window,
    max(window_start) as last_window
FROM ad_events_aggregated;

결과:

  • 중복 없음 ✅
  • 1분 단위 윈도우 정확 ✅
  • 시간순 정렬 정상 ✅

6.3 WARNING 메시지 분석

실행 중 발생한 WARNING:

WARN HDFSBackedStateStoreProvider: The state for version 2 doesn't exist

이유:

  • Spark Streaming이 처음 시작할 때 상태 저장소 초기화
  • 정상적인 메시지로, 에러가 아님

🎓 핵심 학습 내용

1. ClickHouse 테이블 설계 Best Practices

ORDER BY 선택 기준:

  • 가장 자주 필터링되는 컬럼 우선
  • Cardinality가 높은 컬럼부터 정렬
  • 시계열 데이터는 시간 컬럼 우선

PARTITION BY 전략:

  • 데이터 보관 정책에 따라 선택 (월/일/주)
  • 파티션 크기 10GB~100GB가 적정
  • 너무 많은 파티션은 오히려 성능 저하

2. Spark Streaming ClickHouse 적재 패턴

foreachBatch vs foreachPartition:

  • foreachBatch: 마이크로배치 단위 처리 (권장)
  • foreachPartition: 파티션 단위 처리 (세밀한 제어 필요 시)

JDBC vs Native Protocol:

  • JDBC: 간단하지만 성능 제한적
  • Native (향후 고려): 고성능, 복잡한 설정

3. 실시간 파이프라인 디버깅

체크리스트:

  1. Kafka 정상 실행 확인
  2. Topic 데이터 확인 (kafka-console-consumer)
  3. Spark 로그에서 배치 처리 확인
  4. ClickHouse에 데이터 적재 확인
  5. 쿼리로 데이터 정합성 검증

🚀 다음 단계 (Day 4 Preview)

현재까지 구축한 파이프라인:

[Generator] → [Kafka] → [Spark] → [ClickHouse] ✅

Day 4 계획:

  1. Redis 캐싱 레이어 추가
    • 실시간 조회 성능 최적화
    • Hot data 캐싱
  2. 추가 집계 로직 구현
    • CTR (Click-Through Rate) 계산
    • eCPM (effective Cost Per Mille) 계산
    • 실시간 이상 탐지
  3. Superset 대시보드 구축
    • ClickHouse 연동
    • 실시간 모니터링 차트

💡 트러블슈팅 요약

문제 1: Kafka NodeExistsException

원인: Zookeeper에 남은 브로커 정보 해결: 데이터 완전 초기화 후 재시작

문제 2: ClickHouse 초기화 스크립트 미실행

원인: 데이터베이스 먼저 생성 안 함 해결: SQL 파일에 CREATE DATABASE 추가

문제 3: Maven Dependency 형식 오류

원인: :all classifier 미지원 해결: --jars 옵션으로 직접 JAR 지정


📈 성과

완전한 실시간 데이터 파이프라인 구축

  • Kafka → Spark → ClickHouse
  • 초당 80개 이벤트 안정적 처리
  • 10초 이내 End-to-End 지연

프로덕션 수준의 데이터 모델링

  • 효율적인 파티셔닝 전략
  • 쿼리 최적화된 정렬 키
  • 실제 비즈니스 메트릭 반영

실시간 분석 기능

  • 국가/포맷별 성과 분석
  • CTR, eCPM 등 AdTech 핵심 지표
  • SQL 기반 유연한 분석

🎯 포트폴리오 포인트

이 프로젝트에서 보여줄 수 있는 역량:

  1. 분산 시스템 아키텍처 이해
    • Kafka, Spark, ClickHouse 통합
  2. 실시간 데이터 처리 경험
    • Streaming ETL 파이프라인 구축
  3. OLAP DB 최적화
    • ClickHouse 테이블 설계 및 파티셔닝
  4. 트러블슈팅 능력
    • Docker, Maven, JDBC 이슈 해결
  5. 비즈니스 메트릭 이해
    • AdTech 도메인 지식 적용