Day 3: ClickHouse 실시간 데이터 적재 및 분석 파이프라인 구축
Day 3: ClickHouse 실시간 데이터 적재 및 분석 파이프라인 구축
📋 오늘의 목표
Day 2에서 구축한 Spark Streaming 집계 결과를 ClickHouse에 저장하여 완전한 실시간 데이터 파이프라인을 완성한다.
🏗️ 아키텍처 개요
[광고 로그 생성기]
↓ (JSON events)
[Kafka: ad-events Topic]
↓ (Stream)
[Spark Streaming: 1분 윈도우 집계]
↓ (Aggregated data)
[ClickHouse: OLAP 저장소] ← ✨ Today's Focus
↓ (Query)
[분석 & 대시보드]
🚀 Step 1: ClickHouse 테이블 설계
1.1 비즈니스 요구사항 분석
AdTech 실시간 파이프라인에서 저장해야 할 핵심 메트릭:
- 시간 윈도우: 1분 단위 집계 시작/종료 시간
- 차원(Dimensions): event_type, country, ad_format
- 메트릭(Metrics): event_count, avg_revenue, total_revenue
1.2 테이블 스키마 설계
CREATE DATABASE IF NOT EXISTS rtb;
CREATE TABLE IF NOT EXISTS rtb.ad_events_aggregated (
window_start DateTime,
window_end DateTime,
event_type String,
country String,
ad_format String,
event_count UInt64,
avg_revenue Float64,
total_revenue Float64,
created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (window_start, event_type, country, ad_format)
PARTITION BY toYYYYMM(window_start);
설계 포인트:
- MergeTree 엔진: ClickHouse의 기본 고성능 OLAP 엔진
- 삽입 시 자동 정렬 및 병합
- 범위 쿼리 최적화
- ORDER BY 절: 쿼리 패턴에 최적화된 정렬 키
window_start: 시계열 쿼리 (최근 데이터 조회)event_type, country, ad_format: 차원별 필터링
- PARTITION BY: 월별 파티셔닝
- 데이터 관리 용이 (오래된 파티션 삭제)
- 쿼리 성능 향상 (파티션 프루닝)
1.3 초기화 스크립트 작성
sql/create_tables.sql 파일 생성:
-- 데이터베이스 생성 (먼저!)
CREATE DATABASE IF NOT EXISTS rtb;
-- 실시간 이벤트 집계 테이블
CREATE TABLE IF NOT EXISTS rtb.ad_events_aggregated (
window_start DateTime,
window_end DateTime,
event_type String,
country String,
ad_format String,
event_count UInt64,
avg_revenue Float64,
total_revenue Float64,
created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (window_start, event_type, country, ad_format)
PARTITION BY toYYYYMM(window_start);
-- Raw 이벤트 테이블 (향후 사용)
CREATE TABLE IF NOT EXISTS rtb.ad_events_raw (
timestamp DateTime,
event_type String,
user_id String,
country String,
ad_format String,
campaign_id String,
bid_price Float64,
revenue Float64,
created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (timestamp, event_type);
🔧 Step 2: Docker 환경 트러블슈팅
2.1 Kafka 초기화 문제 해결
문제 발생:
NodeExistsException: KeeperErrorCode = NodeExists
Zookeeper에 이전 Kafka 브로커 정보가 남아있어 충돌 발생.
해결 방법:
# 완전 초기화
docker-compose down
sudo rm -rf clickhouse-data/* redis-data/*
docker-compose up -d
sleep 30 # Kafka 완전 시작 대기
2.2 ClickHouse 초기화 스크립트 실행 확인
문제: 초기화 스크립트가 데이터베이스 생성 없이 테이블을 만들려고 시도.
해결: SQL 파일에 CREATE DATABASE IF NOT EXISTS rtb; 추가 후 컨테이너 재시작.
docker-compose restart clickhouse
docker logs adtech-clickhouse --tail 30
2.3 테이블 생성 확인
-- ClickHouse 클라이언트 접속
docker exec -it adtech-clickhouse clickhouse-client
-- 확인
SHOW DATABASES;
USE rtb;
SHOW TABLES;
DESCRIBE ad_events_aggregated;
💻 Step 3: Spark ClickHouse 적재 파이프라인 구현
3.1 새로운 Spark 애플리케이션 작성
src/spark_processor_clickhouse.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Spark Session 생성 (ClickHouse JDBC 드라이버 포함)
spark = SparkSession.builder \
.appName("RTB-Pipeline-Processor-ClickHouse") \
.config("spark.jars.packages",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
"com.clickhouse:clickhouse-jdbc:0.4.6:all") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session 시작됨!")
# 광고 로그 스키마 정의
ad_log_schema = StructType([
StructField("event_id", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("user_id", StringType(), True),
StructField("campaign_id", StringType(), True),
StructField("ad_format", StringType(), True),
StructField("country", StringType(), True),
StructField("device", StringType(), True),
StructField("event_type", StringType(), True),
StructField("revenue", DoubleType(), True),
])
print("📡 Kafka에서 데이터 스트리밍 시작...")
# Kafka에서 스트리밍 데이터 읽기
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:29092") \
.option("subscribe", "ad-events") \
.option("startingOffsets", "earliest") \
.load()
# JSON 파싱
parsed_df = df.select(
from_json(col("value").cast("string"), ad_log_schema).alias("data")
).select("data.*")
# 타임스탬프 변환
processed_df = parsed_df.withColumn(
"event_timestamp",
to_timestamp(col("timestamp"))
)
print("🔄 데이터 처리 파이프라인 구성 완료!")
# 윈도우 기반 집계 (1분 단위)
windowed_stats = processed_df \
.withWatermark("event_timestamp", "1 minute") \
.groupBy(
window(col("event_timestamp"), "1 minute"),
col("event_type"),
col("country"),
col("ad_format")
).agg(
count("*").alias("event_count"),
avg("revenue").alias("avg_revenue"),
sum("revenue").alias("total_revenue")
)
# 출력 포맷 변환
output_df = windowed_stats.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("event_type"),
col("country"),
col("ad_format"),
col("event_count"),
round(col("avg_revenue"), 4).alias("avg_revenue"),
round(col("total_revenue"), 4).alias("total_revenue")
)
# ClickHouse 연결 정보
clickhouse_url = "jdbc:clickhouse://adtech-clickhouse:8123/rtb"
clickhouse_properties = {
"driver": "com.clickhouse.jdbc.ClickHouseDriver",
"user": "default",
"password": ""
}
# ClickHouse에 저장하는 함수
def write_to_clickhouse(batch_df, batch_id):
print(f"📝 Batch {batch_id} 처리 중... (레코드 수: {batch_df.count()})")
if batch_df.count() > 0:
batch_df.write \
.format("jdbc") \
.option("url", clickhouse_url) \
.option("dbtable", "ad_events_aggregated") \
.option("driver", clickhouse_properties["driver"]) \
.option("user", clickhouse_properties["user"]) \
.option("password", clickhouse_properties["password"]) \
.mode("append") \
.save()
print(f"✅ Batch {batch_id} ClickHouse 적재 완료!")
# 적재된 데이터 샘플 출력
batch_df.show(5, truncate=False)
print("📊 실시간 집계 시작 - ClickHouse 적재 모드")
# ClickHouse로 스트리밍 (foreachBatch 사용)
query = output_df \
.writeStream \
.outputMode("update") \
.foreachBatch(write_to_clickhouse) \
.trigger(processingTime="10 seconds") \
.start()
print("🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)")
query.awaitTermination()
3.2 핵심 구현 포인트
1. foreachBatch 패턴 사용
def write_to_clickhouse(batch_df, batch_id):
# 각 마이크로 배치마다 호출됨
batch_df.write.format("jdbc").mode("append").save()
장점:
- 배치 단위 트랜잭션 제어
- 에러 핸들링 용이
- 성능 최적화 가능
2. JDBC 연결 설정
clickhouse_url = "jdbc:clickhouse://adtech-clickhouse:8123/rtb"
- Docker 네트워크 내부 주소 사용 (
adtech-clickhouse) - HTTP 포트 8123 사용 (ClickHouse JDBC 기본)
3. outputMode = “update”
- 윈도우 집계 결과의 업데이트만 전송
- Complete 모드보다 효율적
🎯 Step 4: Spark Submit 실행 및 트러블슈팅
4.1 Maven Dependency 문제 해결
첫 번째 시도 (실패):
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,com.clickhouse:clickhouse-jdbc:0.4.6:all
에러:
requirement failed: Provided Maven Coordinates must be in the form 'groupId:artifactId:version'
:all classifier를 Spark가 인식하지 못함.
해결 방법:
docker exec -it spark-master /opt/spark/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
--jars https://repo1.maven.org/maven2/com/clickhouse/clickhouse-jdbc/0.4.6/clickhouse-jdbc-0.4.6-all.jar \
/opt/spark-apps/spark_processor_clickhouse.py
--jars 옵션으로 직접 JAR 다운로드.
4.2 실행 프로세스
터미널 1: 광고 로그 생성기
python data-generator/ad_log_generator.py
터미널 2: Spark Streaming
docker exec -it spark-master /opt/spark/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
--jars https://repo1.maven.org/maven2/com/clickhouse/clickhouse-jdbc/0.4.6/clickhouse-jdbc-0.4.6-all.jar \
/opt/spark-apps/spark_processor_clickhouse.py
정상 실행 로그:
✅ Spark Session 시작됨!
📡 Kafka에서 데이터 스트리밍 시작...
🔄 데이터 처리 파이프라인 구성 완료!
📊 실시간 집계 시작 - ClickHouse 적재 모드
🚀 Spark Streaming 실행 중...
📝 Batch 1 처리 중... (레코드 수: 42)
✅ Batch 1 ClickHouse 적재 완료!
+-------------------+-------------------+----------+-------+-------------+
|window_start |window_end |event_type|country|ad_format |
+-------------------+-------------------+----------+-------+-------------+
|2025-12-16 23:43:00|2025-12-16 23:44:00|impression|FR |interstitial |
|2025-12-16 23:43:00|2025-12-16 23:44:00|click |CA |interstitial |
+-------------------+-------------------+----------+-------+-------------+
📊 Step 5: 데이터 검증 및 분석
5.1 실시간 데이터 적재 확인
터미널 3: ClickHouse 쿼리
docker exec -it adtech-clickhouse clickhouse-client
USE rtb;
-- 총 레코드 수
SELECT count(*) FROM ad_events_aggregated;
-- 결과: 449 rows (실시간 증가 중)
-- 최근 데이터 확인
SELECT * FROM ad_events_aggregated
ORDER BY window_start DESC
LIMIT 10;
결과:
┌────────window_start─┬──────────window_end─┬─event_type─┬─country─┬─ad_format─────┬─event_count─┬─avg_revenue─┬─total_revenue─┐
│ 2025-12-16 23:44:00 │ 2025-12-16 23:45:00 │ click │ US │ rewarded_video│ 42 │ 0.065 │ 2.7301 │
│ 2025-12-16 23:44:00 │ 2025-12-16 23:45:00 │ impression │ FR │ banner │ 103 │ 0.0 │ 0.0 │
└─────────────────────┴─────────────────────┴────────────┴─────────┴───────────────┴─────────────┴─────────────┴───────────────┘
5.2 비즈니스 메트릭 분석
국가별 총 수익 순위:
SELECT
country,
sum(total_revenue) as total_revenue,
sum(event_count) as total_events
FROM ad_events_aggregated
WHERE event_type = 'click'
GROUP BY country
ORDER BY total_revenue DESC;
결과:
┌─country─┬──────total_revenue─┬─total_events─┐
│ US │ 56.2412 │ 911 │
│ JP │ 45.7469 │ 938 │
│ CA │ 40.1562 │ 991 │
│ UK │ 34.8419 │ 770 │
│ KR │ 25.3185 │ 809 │
│ DE │ 25.1154 │ 755 │
│ FR │ 23.3569 │ 825 │
└─────────┴────────────────────┴──────────────┘
인사이트:
- 미국(US)이 가장 높은 eCPM → 가장 많은 수익 생성
- 설정한 국가별 가중치가 정확히 반영됨
광고 포맷별 성과:
SELECT
ad_format,
sum(CASE WHEN event_type = 'impression' THEN event_count ELSE 0 END) as impressions,
sum(CASE WHEN event_type = 'click' THEN event_count ELSE 0 END) as clicks,
round(sum(total_revenue), 2) as revenue
FROM ad_events_aggregated
GROUP BY ad_format
ORDER BY revenue DESC;
결과:
┌─ad_format──────┬─impressions─┬─clicks─┬─revenue─┐
│ rewarded_video │ 10937 │ 3554 │ 146.19 │
│ interstitial │ 12427 │ 1705 │ 73.06 │
│ banner │ 13636 │ 814 │ 34.92 │
└────────────────┴─────────────┴────────┴─────────┘
CTR 계산:
- Rewarded Video: 3,554 / 10,937 = 32.5% (설정값 25%)
- Interstitial: 1,705 / 12,427 = 13.7% (설정값 12%)
- Banner: 814 / 13,636 = 6.0% (설정값 5%)
→ 실제 광고 산업의 CTR 패턴과 일치!
🔍 Step 6: 성능 및 안정성 검증
6.1 처리 성능
- 이벤트 생성 속도: ~80 events/sec
- Spark 처리 속도: 10초마다 배치 처리
- ClickHouse 적재 지연: < 1초
- 총 End-to-End 지연: ~10-15초
6.2 데이터 정확성
-- 데이터 무결성 확인
SELECT
count(*) as total_records,
count(DISTINCT window_start) as unique_windows,
min(window_start) as first_window,
max(window_start) as last_window
FROM ad_events_aggregated;
결과:
- 중복 없음 ✅
- 1분 단위 윈도우 정확 ✅
- 시간순 정렬 정상 ✅
6.3 WARNING 메시지 분석
실행 중 발생한 WARNING:
WARN HDFSBackedStateStoreProvider: The state for version 2 doesn't exist
이유:
- Spark Streaming이 처음 시작할 때 상태 저장소 초기화
- 정상적인 메시지로, 에러가 아님
🎓 핵심 학습 내용
1. ClickHouse 테이블 설계 Best Practices
ORDER BY 선택 기준:
- 가장 자주 필터링되는 컬럼 우선
- Cardinality가 높은 컬럼부터 정렬
- 시계열 데이터는 시간 컬럼 우선
PARTITION BY 전략:
- 데이터 보관 정책에 따라 선택 (월/일/주)
- 파티션 크기 10GB~100GB가 적정
- 너무 많은 파티션은 오히려 성능 저하
2. Spark Streaming ClickHouse 적재 패턴
foreachBatch vs foreachPartition:
- foreachBatch: 마이크로배치 단위 처리 (권장)
- foreachPartition: 파티션 단위 처리 (세밀한 제어 필요 시)
JDBC vs Native Protocol:
- JDBC: 간단하지만 성능 제한적
- Native (향후 고려): 고성능, 복잡한 설정
3. 실시간 파이프라인 디버깅
체크리스트:
- Kafka 정상 실행 확인
- Topic 데이터 확인 (
kafka-console-consumer) - Spark 로그에서 배치 처리 확인
- ClickHouse에 데이터 적재 확인
- 쿼리로 데이터 정합성 검증
🚀 다음 단계 (Day 4 Preview)
현재까지 구축한 파이프라인:
[Generator] → [Kafka] → [Spark] → [ClickHouse] ✅
Day 4 계획:
- Redis 캐싱 레이어 추가
- 실시간 조회 성능 최적화
- Hot data 캐싱
- 추가 집계 로직 구현
- CTR (Click-Through Rate) 계산
- eCPM (effective Cost Per Mille) 계산
- 실시간 이상 탐지
- Superset 대시보드 구축
- ClickHouse 연동
- 실시간 모니터링 차트
💡 트러블슈팅 요약
문제 1: Kafka NodeExistsException
원인: Zookeeper에 남은 브로커 정보 해결: 데이터 완전 초기화 후 재시작
문제 2: ClickHouse 초기화 스크립트 미실행
원인: 데이터베이스 먼저 생성 안 함 해결: SQL 파일에 CREATE DATABASE 추가
문제 3: Maven Dependency 형식 오류
원인: :all classifier 미지원 해결: --jars 옵션으로 직접 JAR 지정
📈 성과
✅ 완전한 실시간 데이터 파이프라인 구축
- Kafka → Spark → ClickHouse
- 초당 80개 이벤트 안정적 처리
- 10초 이내 End-to-End 지연
✅ 프로덕션 수준의 데이터 모델링
- 효율적인 파티셔닝 전략
- 쿼리 최적화된 정렬 키
- 실제 비즈니스 메트릭 반영
✅ 실시간 분석 기능
- 국가/포맷별 성과 분석
- CTR, eCPM 등 AdTech 핵심 지표
- SQL 기반 유연한 분석
🎯 포트폴리오 포인트
이 프로젝트에서 보여줄 수 있는 역량:
- 분산 시스템 아키텍처 이해
- Kafka, Spark, ClickHouse 통합
- 실시간 데이터 처리 경험
- Streaming ETL 파이프라인 구축
- OLAP DB 최적화
- ClickHouse 테이블 설계 및 파티셔닝
- 트러블슈팅 능력
- Docker, Maven, JDBC 이슈 해결
- 비즈니스 메트릭 이해
- AdTech 도메인 지식 적용