Day 2: Spark Streaming 실시간 데이터 처리 구축
Day 2: Spark Streaming 실시간 데이터 처리 구축
목차
- 오늘의 목표
- 아키텍처 개요
- Spark 환경 구축
- Kafka 설정 문제 해결
- Spark Streaming 코드 작성
- 스키마 불일치 문제 해결
- 실행 결과
- 배운 점과 트러블슈팅
- 다음 단계
🎯 오늘의 목표
Day 1에서 구축한 광고 로그 생성기가 Kafka로 데이터를 전송하고 있습니다. 오늘은 이 데이터를 Spark Streaming으로 실시간 처리하여 의미 있는 인사이트를 추출하는 파이프라인을 구축합니다.
핵심 목표:
- Spark Streaming 환경 구축
- Kafka와 Spark 연동
- 실시간 윈도우 기반 집계 구현
- 국가별, 광고 포맷별 성과 분석
🏗️ 아키텍처 개요
┌─────────────────┐
│ Ad Log Generator│ (Python)
└────────┬────────┘
│ ~82 events/sec
↓
┌─────────────────┐
│ Kafka Topic │ (ad-events)
└────────┬────────┘
│ Real-time stream
↓
┌─────────────────┐
│ Spark Streaming │ (1분 윈도우 집계)
└────────┬────────┘
│
↓
┌─────────────────┐
│ Console Output │ (실시간 모니터링)
└─────────────────┘
데이터 흐름:
- 광고 로그 생성기가 초당 82개의 이벤트 생성
- Kafka가 이벤트를 버퍼링하고 분산 저장
- Spark Streaming이 1분 단위로 데이터 집계
- 국가별/포맷별 통계를 콘솔에 출력
🔧 Spark 환경 구축
1. docker-compose.yml에 Spark 서비스 추가
Apache Spark 공식 이미지를 사용하여 Master-Worker 구조를 구성했습니다.
spark-master:
image: apache/spark:3.5.0
container_name: spark-master
user: root
command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
ports:
- "9090:8080" # Spark Web UI
- "7077:7077" # Spark Master Port
volumes:
- ./src:/opt/spark-apps
- ./data:/opt/spark-data
networks:
- rtb-network
spark-worker:
image: apache/spark:3.5.0
container_name: spark-worker
user: root
command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
depends_on:
- spark-master
environment:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=2G
volumes:
- ./src:/opt/spark-apps
- ./data:/opt/spark-data
networks:
- rtb-network
주요 설정:
- Master-Worker 구조: 분산 처리를 위한 클러스터 모드
- 리소스 할당: Worker당 2코어, 2GB 메모리
- 볼륨 마운트: 코드와 데이터를 호스트와 공유
- 네트워크: 다른 서비스들과 동일한 네트워크 사용
2. 서비스 시작 및 확인
# Spark 서비스 시작
docker-compose up -d spark-master spark-worker
# 서비스 상태 확인
docker-compose ps
# Spark Web UI 접속
# http://localhost:9090
Web UI에서 확인할 수 있는 정보:
- Worker 연결 상태
- 사용 가능한 코어 및 메모리
- 실행 중인 애플리케이션
🔥 Kafka 설정 문제 해결
문제 발견
Spark 구축 후 서비스를 시작했을 때 Kafka에서 다음 에러 발생:
java.lang.IllegalArgumentException: requirement failed:
Each listener must have a different port,
listeners: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9092
원인 분석
Kafka 설정에서 두 개의 리스너(PLAINTEXT, PLAINTEXT_HOST)가 모두 같은 포트(9092)를 사용하려고 해서 충돌이 발생했습니다.
Kafka 리스너의 역할:
- PLAINTEXT: Docker 컨테이너 간 통신용
- PLAINTEXT_HOST: 호스트(로컬 머신)에서 접속용
해결 방법
docker-compose.yml의 Kafka 설정을 수정:
kafka:
image: confluentinc/cp-kafka:7.4.0
container_name: adtech-kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092" # 내부 통신용 포트 추가
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
networks:
- rtb-network
핵심 변경사항:
KAFKA_LISTENERS환경변수 추가 (필수!)- 포트 분리: 29092 (컨테이너 간), 9092 (호스트)
- 각 리스너가 고유한 포트 사용
네트워크 설정 추가
모든 서비스가 동일한 네트워크를 사용하도록 설정 필요:
# docker-compose.yml 맨 아래 추가
networks:
rtb-network:
driver: bridge
그리고 모든 서비스에 networks: - rtb-network 추가.
검증
# Kafka 재시작
docker-compose down
docker-compose up -d
# 토픽 생성 테스트
docker-compose exec kafka kafka-topics --create \
--topic ad-events \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
# 토픽 목록 확인
docker-compose exec kafka kafka-topics --list \
--bootstrap-server localhost:9092
결과:
Created topic ad-events.
💻 Spark Streaming 코드 작성
초기 코드 (src/spark_processor.py)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Spark Session 생성
spark = SparkSession.builder \
.appName("RTB-Pipeline-Processor") \
.config("spark.jars.packages",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session 시작됨!")
# 광고 로그 스키마 정의 (초기 - 잘못된 버전)
ad_log_schema = StructType([
StructField("timestamp", StringType(), True),
StructField("event_type", StringType(), True),
StructField("user_id", StringType(), True),
StructField("country", StringType(), True),
StructField("ad_format", StringType(), True),
StructField("campaign_id", StringType(), True),
StructField("publisher_id", StringType(), True),
StructField("bid_price", DoubleType(), True),
StructField("win_price", DoubleType(), True),
])
print("📡 Kafka에서 데이터 스트리밍 시작...")
# Kafka에서 스트리밍 데이터 읽기
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:29092") \
.option("subscribe", "ad-events") \
.option("startingOffsets", "latest") \
.load()
# JSON 파싱
parsed_df = df.select(
from_json(col("value").cast("string"), ad_log_schema).alias("data")
).select("data.*")
# 타임스탬프 변환
processed_df = parsed_df.withColumn(
"event_timestamp",
to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)
print("🔄 데이터 처리 파이프라인 구성 완료!")
# 윈도우 기반 집계 (1분 단위)
windowed_stats = processed_df \
.withWatermark("event_timestamp", "1 minute") \
.groupBy(
window(col("event_timestamp"), "1 minute"),
col("event_type"),
col("country"),
col("ad_format")
).agg(
count("*").alias("event_count"),
avg("bid_price").alias("avg_bid_price"),
sum("win_price").alias("total_revenue")
)
# 출력 포맷 변환
output_df = windowed_stats.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("event_type"),
col("country"),
col("ad_format"),
col("event_count"),
round(col("avg_bid_price"), 4).alias("avg_bid_price"),
round(col("total_revenue"), 4).alias("total_revenue")
)
print("📊 실시간 집계 시작 - 콘솔 출력 모드")
# 콘솔 출력 (실시간 모니터링)
query = output_df \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.trigger(processingTime="10 seconds") \
.start()
print("🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)")
query.awaitTermination()
핵심 개념 설명
1. Watermark (워터마크)
.withWatermark("event_timestamp", "1 minute")
워터마크란?
- 늦게 도착하는 데이터(late data)를 어느 정도까지 기다릴지 설정
- “1 minute” = 이벤트 시간 기준 1분까지 늦은 데이터 허용
- 실시간 스트리밍에서 매우 중요한 개념
예시:
- 현재 시간: 10:05
- 워터마크: 1분
- 10:03 이전의 데이터는 버려짐 (너무 늦음)
- 10:03~10:05 데이터는 처리됨
2. Window Function (윈도우 함수)
window(col("event_timestamp"), "1 minute")
윈도우란?
- 시간을 특정 간격으로 나누어 집계
- “1 minute” = 1분 단위 tumbling window
- 각 윈도우는 겹치지 않음
예시:
10:00:00 ~ 10:01:00 → Window 1
10:01:00 ~ 10:02:00 → Window 2
10:02:00 ~ 10:03:00 → Window 3
3. Output Mode
.outputMode("update")
3가지 출력 모드:
- append: 새로운 행만 출력 (불변 데이터)
- complete: 전체 결과 테이블 출력 (작은 데이터)
- update: 변경된 행만 출력 (집계 작업에 최적)
4. Trigger
.trigger(processingTime="10 seconds")
트리거 간격:
- 10초마다 마이크로 배치 실행
- 더 짧게 하면 실시간성 증가, 오버헤드 증가
- 더 길게 하면 처리량 증가, 지연 증가
🐛 스키마 불일치 문제 해결
문제 발견
Spark Streaming을 실행했지만 데이터가 표시되지 않았습니다. Kafka에 데이터가 있는지 확인:
docker exec -it adtech-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic ad-events \
--from-beginning \
--max-messages 5
실제 Kafka 데이터:
{
"event_id": "E1765818248891151",
"timestamp": "2025-12-16T02:04:08.891156",
"user_id": "U95480",
"campaign_id": "C004",
"ad_format": "rewarded_video",
"country": "UK",
"device": "Android",
"event_type": "click",
"revenue": 0.0165
}
Spark에서 기대한 스키마:
StructType([
StructField("timestamp", StringType(), True),
StructField("event_type", StringType(), True),
StructField("user_id", StringType(), True),
StructField("country", StringType(), True),
StructField("ad_format", StringType(), True),
StructField("campaign_id", StringType(), True),
StructField("publisher_id", StringType(), True), # ❌ 없음!
StructField("bid_price", DoubleType(), True), # ❌ 없음!
StructField("win_price", DoubleType(), True), # ❌ 없음!
])
원인 분석
Day 1 광고 로그 생성기의 실제 구조:
event_id: 이벤트 고유 IDdevice: 디바이스 타입 (iOS/Android)revenue: 실제 수익 (impression=0, click>0)
Spark 코드에서 가정한 구조:
publisher_id: 존재하지 않음bid_price,win_price: RTB 입찰 시스템 개념 (실제 데이터에는 없음)
→ 스키마 불일치로 인해 JSON 파싱 실패!
해결: 스키마 수정
# 수정된 스키마 (실제 데이터 구조에 맞춤)
ad_log_schema = StructType([
StructField("event_id", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("user_id", StringType(), True),
StructField("campaign_id", StringType(), True),
StructField("ad_format", StringType(), True),
StructField("country", StringType(), True),
StructField("device", StringType(), True),
StructField("event_type", StringType(), True),
StructField("revenue", DoubleType(), True),
])
타임스탬프 형식 변경
문제:
- 실제 데이터:
"2025-12-16T02:04:08.891156"(ISO 8601) - 기존 파싱:
"yyyy-MM-dd HH:mm:ss"(다른 형식)
해결:
# 기존
processed_df = parsed_df.withColumn(
"event_timestamp",
to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)
# 수정 (ISO 8601 자동 파싱)
processed_df = parsed_df.withColumn(
"event_timestamp",
to_timestamp(col("timestamp"))
)
집계 로직 변경
# 기존 (존재하지 않는 필드)
.agg(
count("*").alias("event_count"),
avg("bid_price").alias("avg_bid_price"),
sum("win_price").alias("total_revenue")
)
# 수정 (실제 revenue 사용)
.agg(
count("*").alias("event_count"),
avg("revenue").alias("avg_revenue"),
sum("revenue").alias("total_revenue")
)
startingOffsets 설정 변경
# 기존
.option("startingOffsets", "latest") # Spark 시작 후 데이터만
# 수정
.option("startingOffsets", "earliest") # 토픽의 처음부터
왜 변경?
latest: Spark가 시작된 이후 도착한 데이터만 처리earliest: 토픽에 저장된 모든 데이터 처리- 테스트 단계에서는
earliest가 유용
🎯 최종 코드
src/spark_processor.py (완성 버전):
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Spark Session 생성
spark = SparkSession.builder \
.appName("RTB-Pipeline-Processor") \
.config("spark.jars.packages",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session 시작됨!")
# 광고 로그 스키마 정의 (실제 데이터 구조에 맞게 수정)
ad_log_schema = StructType([
StructField("event_id", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("user_id", StringType(), True),
StructField("campaign_id", StringType(), True),
StructField("ad_format", StringType(), True),
StructField("country", StringType(), True),
StructField("device", StringType(), True),
StructField("event_type", StringType(), True),
StructField("revenue", DoubleType(), True),
])
print("📡 Kafka에서 데이터 스트리밍 시작...")
# Kafka에서 스트리밍 데이터 읽기
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:29092") \
.option("subscribe", "ad-events") \
.option("startingOffsets", "earliest") \
.load()
# JSON 파싱
parsed_df = df.select(
from_json(col("value").cast("string"), ad_log_schema).alias("data")
).select("data.*")
# 타임스탬프 변환 (ISO 8601 형식 자동 파싱)
processed_df = parsed_df.withColumn(
"event_timestamp",
to_timestamp(col("timestamp"))
)
print("🔄 데이터 처리 파이프라인 구성 완료!")
# 윈도우 기반 집계 (1분 단위)
windowed_stats = processed_df \
.withWatermark("event_timestamp", "1 minute") \
.groupBy(
window(col("event_timestamp"), "1 minute"),
col("event_type"),
col("country"),
col("ad_format")
).agg(
count("*").alias("event_count"),
avg("revenue").alias("avg_revenue"),
sum("revenue").alias("total_revenue")
)
# 출력 포맷 변환
output_df = windowed_stats.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("event_type"),
col("country"),
col("ad_format"),
col("event_count"),
round(col("avg_revenue"), 4).alias("avg_revenue"),
round(col("total_revenue"), 4).alias("total_revenue")
)
print("📊 실시간 집계 시작 - 콘솔 출력 모드")
# 콘솔 출력 (실시간 모니터링)
query = output_df \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.trigger(processingTime="10 seconds") \
.start()
print("🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)")
query.awaitTermination()
🚀 실행 및 결과
실행 방법
터미널 1: 광고 로그 생성기
cd ~/adtech-realtime-pipeline
python data-generator/ad_log_generator.py
터미널 2: Spark Streaming
docker exec -it spark-master /opt/spark/bin/spark-submit \
--master spark://spark-master:7077 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
/opt/spark-apps/spark_processor.py
실행 결과
✅ Spark Session 시작됨!
📡 Kafka에서 데이터 스트리밍 시작...
🔄 데이터 처리 파이프라인 구성 완료!
📊 실시간 집계 시작 - 콘솔 출력 모드
🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)
-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|window_start |window_end |event_type|country|ad_format |event_count|avg_revenue|total_revenue|
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|2025-12-16 02:06:00|2025-12-16 02:07:00|impression|DE |interstitial |207 |0.0 |0.0 |
|2025-12-16 02:08:00|2025-12-16 02:09:00|impression|CA |interstitial |206 |0.0 |0.0 |
|2025-12-16 02:11:00|2025-12-16 02:12:00|impression|KR |rewarded_video|175 |0.0 |0.0 |
|2025-12-16 02:07:00|2025-12-16 02:08:00|click |CA |banner |13 |0.0426 |0.5538 |
|2025-12-16 02:04:00|2025-12-16 02:05:00|impression|US |banner |204 |0.0 |0.0 |
|2025-12-16 02:14:00|2025-12-16 02:15:00|click |UK |banner |4 |0.0364 |0.1454 |
|2025-12-16 02:15:00|2025-12-16 02:16:00|click |FR |interstitial |25 |0.0325 |0.8127 |
|2025-12-16 02:08:00|2025-12-16 02:09:00|click |CA |rewarded_video|58 |0.0445 |2.5801 |
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
only showing top 20 rows
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|window_start |window_end |event_type|country|ad_format |event_count|avg_revenue|total_revenue|
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|2025-12-16 02:19:00|2025-12-16 02:20:00|impression|US |banner |160 |0.0 |0.0 |
|2025-12-16 02:19:00|2025-12-16 02:20:00|impression|DE |interstitial |148 |0.0 |0.0 |
|2025-12-16 02:19:00|2025-12-16 02:20:00|click |JP |rewarded_video|35 |0.0485 |1.6987 |
|2025-12-16 02:19:00|2025-12-16 02:20:00|click |CA |banner |9 |0.0494 |0.4443 |
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
only showing top 20 rows
결과 분석
1. 이벤트 타입별 특성
Impression (노출) 이벤트:
event_type=impression → avg_revenue=0.0, total_revenue=0.0
- 광고가 화면에 표시되기만 함
- 수익 발생하지 않음
- 대량 발생 (분당 100~250회)
Click (클릭) 이벤트:
event_type=click → avg_revenue=0.03~0.05, total_revenue > 0
- 사용자가 실제로 광고 클릭
- 수익 발생 (클릭당 $0.03~0.05)
- 상대적으로 적게 발생 (분당 10~60회)
2. 광고 포맷별 성과
banner: 낮은 클릭률, 낮은 단가
interstitial: 중간 클릭률, 중간 단가
rewarded_video: 높은 클릭률, 높은 단가
실제 데이터 예시:
CA banner (1분): 13 clicks, $0.0426 avg, $0.55 total
CA rewarded_video (1분): 58 clicks, $0.0445 avg, $2.58 total
→ Rewarded video가 약 5배 더 많은 수익 생성!
3. 국가별 트래픽 차이
주요 국가:
- US, KR, JP: 높은 트래픽 (분당 200+ impressions)
- UK, CA, FR, DE: 중간 트래픽 (분당 150-200 impressions)
4. 시간대별 패턴
1분 윈도우로 집계되어 실시간 트래픽 변화 관찰 가능:
02:04:00~02:05:00 → 02:06:00~02:07:00 → 02:08:00~02:09:00
💡 배운 점과 트러블슈팅
1. Docker 네트워크의 중요성
문제:
- 처음에는 각 서비스가 독립적으로 실행
- 서비스 간 통신 불가
해결:
- 모든 서비스를 동일한 Docker 네트워크에 배치
- 컨테이너 이름으로 서비스 간 통신 가능 (
kafka:29092)
교훈:
networks:
rtb-network:
driver: bridge
이 한 줄이 모든 서비스를 연결!
2. Kafka 리스너 설정의 복잡성
핵심 개념:
LISTENERS: Kafka가 어디서 연결을 받을지ADVERTISED_LISTENERS: 클라이언트에게 어떤 주소를 알려줄지
실전 팁:
# 컨테이너 내부
PLAINTEXT://kafka:29092
# 호스트 (로컬 개발)
PLAINTEXT_HOST://localhost:9092
두 개를 분리하면 유연성 증가!
3. 스키마 정의의 중요성
실수:
- 데이터 구조를 확인하지 않고 코드 작성
- 존재하지 않는 필드 참조
올바른 접근:
- 먼저 Kafka 토픽의 실제 데이터 확인
- 스키마 정의
- 코드 작성
검증 방법:
docker exec -it adtech-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic ad-events \
--from-beginning \
--max-messages 1
4. Watermark와 Window의 이해
Watermark (워터마크):
- 늦게 도착하는 데이터 처리 정책
- 너무 짧으면 → 데이터 손실
- 너무 길면 → 메모리 사용량 증가
Window (윈도우):
- Tumbling Window: 겹치지 않음 (우리가 사용)
- Sliding Window: 겹침 (더 부드러운 분석)
- Session Window: 이벤트 간격 기반
5. startingOffsets 전략
latest (기본값):
- 장점: 최신 데이터만, 빠른 시작
- 단점: 과거 데이터 놓침
earliest:
- 장점: 모든 데이터 처리, 재현 가능
- 단점: 토픽이 크면 느림
프로덕션 권장:
- 처음:
earliest(전체 데이터 분석) - 이후: 체크포인트 사용 (중단된 지점부터 재개)
6. 성능 최적화 고려사항
현재 설정:
.trigger(processingTime="10 seconds")
트레이드오프:
- 짧은 간격 (1-5초): 낮은 지연, 높은 오버헤드
- 긴 간격 (30-60초): 높은 처리량, 긴 지연
우리 선택:
- 10초: 실시간성과 효율의 균형
📊 성능 지표
처리 성능
입력: ~82 events/sec (광고 로그 생성기)
처리: 10초 마이크로배치
지연: < 15초 (end-to-end)
리소스 사용
Spark Worker:
- CPU: 2 cores
- Memory: 2GB
- 실제 사용: ~30% CPU, ~500MB RAM
데이터 처리량
1분 윈도우당:
- Impressions: 100-250 events
- Clicks: 10-60 events
- 총: ~5,000 events/분 (~82 events/초)
🎓 핵심 개념 정리
Spark Streaming 아키텍처
Driver (Master)
↓
Executor (Worker) → Task 1
→ Task 2
→ Task ...
역할:
- Driver: 전체 조율, 스케줄링
- Executor: 실제 데이터 처리
- Task: 파티션별 처리 단위
Structured Streaming vs DStream
Structured Streaming (우리가 사용):
- DataFrame/Dataset API
- SQL과 유사한 문법
- 높은 수준의 추상화
- 최적화 자동
DStream (구버전):
- RDD 기반
- 낮은 수준 제어
- 수동 최적화 필요
선택 이유: Structured Streaming이 현대적이고 사용하기 쉬움!
마이크로배치 vs 연속 처리
마이크로배치 (우리가 사용):
[Batch 0] → [Batch 1] → [Batch 2] → ...
(10초) (10초) (10초)
연속 처리:
→ → → → → (계속)
마이크로배치 장점:
- 장애 복구 용이
- 정확한 처리 보장
- 디버깅 쉬움
🔍 디버깅 팁
1. Kafka 데이터 확인
# 최신 메시지 확인
docker exec -it adtech-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic ad-events \
--max-messages 10
# 특정 파티션 확인
docker exec -it adtech-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic ad-events \
--partition 0 \
--offset 0 \
--max-messages 5
2. Spark UI 활용
http://localhost:9090 → Spark Master UI
http://localhost:4040 → Spark Application UI (실행 중일 때)
확인 사항:
- Streaming 탭: 배치 처리 시간
- Executors 탭: 리소스 사용량
- SQL 탭: 쿼리 실행 계획
3. 로그 레벨 조정
# 더 자세한 로그
spark.sparkContext.setLogLevel("INFO")
# 에러만
spark.sparkContext.setLogLevel("ERROR")
# 경고만 (기본 권장)
spark.sparkContext.setLogLevel("WARN")
4. 데이터 검증
# 스트림 중간 결과 확인
parsed_df.writeStream \
.format("console") \
.outputMode("append") \
.start()
📁 프로젝트 구조 (Day 2 완료 후)
adtech-realtime-pipeline/
├── data-generator/
│ └── ad_log_generator.py # 광고 로그 생성기
├── src/
│ └── spark_processor.py # Spark Streaming (NEW!)
├── sql/
│ └── create_tables.sql # ClickHouse 테이블 정의
├── docker-compose.yml # 전체 서비스 정의 (Updated!)
├── requirements.txt
└── README.md
🚀 다음 단계 (Day 3 Preview)
목표: 데이터 영속화 및 분석 기반 구축
-
ClickHouse 테이블 생성
CREATE TABLE ad_events_aggregated ( window_start DateTime, window_end DateTime, event_type String, country String, ad_format String, event_count UInt64, avg_revenue Float64, total_revenue Float64 ) ENGINE = MergeTree() ORDER BY (window_start, country); -
Spark → ClickHouse 연동
- JDBC 드라이버 추가
- 배치 삽입 최적화
- 에러 처리 및 재시도
-
데이터 쿼리 및 검증
SELECT country, sum(total_revenue) as revenue, sum(event_count) as events FROM ad_events_aggregated WHERE event_type = 'click' GROUP BY country ORDER BY revenue DESC; -
Superset 대시보드 준비
- ClickHouse 데이터소스 연결
- 차트 타입 설계
- 실시간 업데이트 설정
📚 참고 자료
공식 문서
추천 읽을거리
- Watermarking in Spark Streaming
- Kafka Consumer Groups 이해하기
- Docker Networking Best Practices
유용한 명령어 모음
# Spark 로그 실시간 확인
docker logs -f spark-master
# Kafka 토픽 상세 정보
docker exec -it adtech-kafka kafka-topics --describe \
--topic ad-events \
--bootstrap-server localhost:9092
# 실행 중인 Spark 애플리케이션 확인
docker exec -it spark-master ps aux | grep spark
# 리소스 사용량 모니터링
docker stats spark-master spark-worker
💭 회고
잘된 점
✅ Kafka-Spark 파이프라인 구축 성공 ✅ 실시간 윈도우 집계 구현 ✅ 의미 있는 비즈니스 인사이트 도출 ✅ 체계적인 문제 해결 과정
어려웠던 점
🔥 Docker 네트워크 설정의 복잡성 🔥 Kafka 리스너 구성 이해 🔥 스키마 불일치 디버깅 🔥 Spark Streaming 개념 학습
개선할 점
💡 처음부터 데이터 구조 명확히 정의하기 💡 더 체계적인 로깅 및 모니터링 💡 단위 테스트 추가 💡 설정 파일 외부화 (하드코딩 제거)
🎯 Day 2 체크리스트
- Spark Master & Worker 구축
- Kafka 네트워크 설정 문제 해결
- Spark Streaming 코드 작성
- 스키마 불일치 문제 해결
- 실시간 윈도우 집계 구현
- 콘솔 출력으로 데이터 검증
- 성능 및 안정성 확인
다음 목표: ClickHouse에 데이터 저장하고 Superset으로 시각화! 🚀
작성일: 2025년 12월 16일 소요 시간: 약 4시간 난이도: ⭐⭐⭐⭐☆ 만족도: ⭐⭐⭐⭐⭐