Day 2: Spark Streaming 실시간 데이터 처리 구축

목차

  1. 오늘의 목표
  2. 아키텍처 개요
  3. Spark 환경 구축
  4. Kafka 설정 문제 해결
  5. Spark Streaming 코드 작성
  6. 스키마 불일치 문제 해결
  7. 실행 결과
  8. 배운 점과 트러블슈팅
  9. 다음 단계

🎯 오늘의 목표

Day 1에서 구축한 광고 로그 생성기가 Kafka로 데이터를 전송하고 있습니다. 오늘은 이 데이터를 Spark Streaming으로 실시간 처리하여 의미 있는 인사이트를 추출하는 파이프라인을 구축합니다.

핵심 목표:

  • Spark Streaming 환경 구축
  • Kafka와 Spark 연동
  • 실시간 윈도우 기반 집계 구현
  • 국가별, 광고 포맷별 성과 분석

🏗️ 아키텍처 개요

┌─────────────────┐
│ Ad Log Generator│ (Python)
└────────┬────────┘
         │ ~82 events/sec
         ↓
┌─────────────────┐
│   Kafka Topic   │ (ad-events)
└────────┬────────┘
         │ Real-time stream
         ↓
┌─────────────────┐
│ Spark Streaming │ (1분 윈도우 집계)
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  Console Output │ (실시간 모니터링)
└─────────────────┘

데이터 흐름:

  1. 광고 로그 생성기가 초당 82개의 이벤트 생성
  2. Kafka가 이벤트를 버퍼링하고 분산 저장
  3. Spark Streaming이 1분 단위로 데이터 집계
  4. 국가별/포맷별 통계를 콘솔에 출력

🔧 Spark 환경 구축

1. docker-compose.yml에 Spark 서비스 추가

Apache Spark 공식 이미지를 사용하여 Master-Worker 구조를 구성했습니다.

spark-master:
  image: apache/spark:3.5.0
  container_name: spark-master
  user: root
  command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
  ports:
    - "9090:8080"  # Spark Web UI
    - "7077:7077"  # Spark Master Port
  volumes:
    - ./src:/opt/spark-apps
    - ./data:/opt/spark-data
  networks:
    - rtb-network

spark-worker:
  image: apache/spark:3.5.0
  container_name: spark-worker
  user: root
  command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
    - spark-master
  environment:
    - SPARK_WORKER_CORES=2
    - SPARK_WORKER_MEMORY=2G
  volumes:
    - ./src:/opt/spark-apps
    - ./data:/opt/spark-data
  networks:
    - rtb-network

주요 설정:

  • Master-Worker 구조: 분산 처리를 위한 클러스터 모드
  • 리소스 할당: Worker당 2코어, 2GB 메모리
  • 볼륨 마운트: 코드와 데이터를 호스트와 공유
  • 네트워크: 다른 서비스들과 동일한 네트워크 사용

2. 서비스 시작 및 확인

# Spark 서비스 시작
docker-compose up -d spark-master spark-worker

# 서비스 상태 확인
docker-compose ps

# Spark Web UI 접속
# http://localhost:9090

Web UI에서 확인할 수 있는 정보:

  • Worker 연결 상태
  • 사용 가능한 코어 및 메모리
  • 실행 중인 애플리케이션

🔥 Kafka 설정 문제 해결

문제 발견

Spark 구축 후 서비스를 시작했을 때 Kafka에서 다음 에러 발생:

java.lang.IllegalArgumentException: requirement failed: 
Each listener must have a different port, 
listeners: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9092

원인 분석

Kafka 설정에서 두 개의 리스너(PLAINTEXT, PLAINTEXT_HOST)가 모두 같은 포트(9092)를 사용하려고 해서 충돌이 발생했습니다.

Kafka 리스너의 역할:

  • PLAINTEXT: Docker 컨테이너 간 통신용
  • PLAINTEXT_HOST: 호스트(로컬 머신)에서 접속용

해결 방법

docker-compose.yml의 Kafka 설정을 수정:

kafka:
  image: confluentinc/cp-kafka:7.4.0
  container_name: adtech-kafka
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
    - "29092:29092"  # 내부 통신용 포트 추가
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  networks:
    - rtb-network

핵심 변경사항:

  1. KAFKA_LISTENERS 환경변수 추가 (필수!)
  2. 포트 분리: 29092 (컨테이너 간), 9092 (호스트)
  3. 각 리스너가 고유한 포트 사용

네트워크 설정 추가

모든 서비스가 동일한 네트워크를 사용하도록 설정 필요:

# docker-compose.yml 맨 아래 추가
networks:
  rtb-network:
    driver: bridge

그리고 모든 서비스에 networks: - rtb-network 추가.

검증

# Kafka 재시작
docker-compose down
docker-compose up -d

# 토픽 생성 테스트
docker-compose exec kafka kafka-topics --create \
  --topic ad-events \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

# 토픽 목록 확인
docker-compose exec kafka kafka-topics --list \
  --bootstrap-server localhost:9092

결과:

Created topic ad-events.

💻 Spark Streaming 코드 작성

초기 코드 (src/spark_processor.py)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Spark Session 생성
spark = SparkSession.builder \
    .appName("RTB-Pipeline-Processor") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session 시작됨!")

# 광고 로그 스키마 정의 (초기 - 잘못된 버전)
ad_log_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("ad_format", StringType(), True),
    StructField("campaign_id", StringType(), True),
    StructField("publisher_id", StringType(), True),
    StructField("bid_price", DoubleType(), True),
    StructField("win_price", DoubleType(), True),
])

print("📡 Kafka에서 데이터 스트리밍 시작...")

# Kafka에서 스트리밍 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "ad-events") \
    .option("startingOffsets", "latest") \
    .load()

# JSON 파싱
parsed_df = df.select(
    from_json(col("value").cast("string"), ad_log_schema).alias("data")
).select("data.*")

# 타임스탬프 변환
processed_df = parsed_df.withColumn(
    "event_timestamp",
    to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)

print("🔄 데이터 처리 파이프라인 구성 완료!")

# 윈도우 기반 집계 (1분 단위)
windowed_stats = processed_df \
    .withWatermark("event_timestamp", "1 minute") \
    .groupBy(
        window(col("event_timestamp"), "1 minute"),
        col("event_type"),
        col("country"),
        col("ad_format")
    ).agg(
        count("*").alias("event_count"),
        avg("bid_price").alias("avg_bid_price"),
        sum("win_price").alias("total_revenue")
    )

# 출력 포맷 변환
output_df = windowed_stats.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("event_type"),
    col("country"),
    col("ad_format"),
    col("event_count"),
    round(col("avg_bid_price"), 4).alias("avg_bid_price"),
    round(col("total_revenue"), 4).alias("total_revenue")
)

print("📊 실시간 집계 시작 - 콘솔 출력 모드")

# 콘솔 출력 (실시간 모니터링)
query = output_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="10 seconds") \
    .start()

print("🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)")

query.awaitTermination()

핵심 개념 설명

1. Watermark (워터마크)

.withWatermark("event_timestamp", "1 minute")

워터마크란?

  • 늦게 도착하는 데이터(late data)를 어느 정도까지 기다릴지 설정
  • “1 minute” = 이벤트 시간 기준 1분까지 늦은 데이터 허용
  • 실시간 스트리밍에서 매우 중요한 개념

예시:

  • 현재 시간: 10:05
  • 워터마크: 1분
  • 10:03 이전의 데이터는 버려짐 (너무 늦음)
  • 10:03~10:05 데이터는 처리됨

2. Window Function (윈도우 함수)

window(col("event_timestamp"), "1 minute")

윈도우란?

  • 시간을 특정 간격으로 나누어 집계
  • “1 minute” = 1분 단위 tumbling window
  • 각 윈도우는 겹치지 않음

예시:

10:00:00 ~ 10:01:00 → Window 1
10:01:00 ~ 10:02:00 → Window 2
10:02:00 ~ 10:03:00 → Window 3

3. Output Mode

.outputMode("update")

3가지 출력 모드:

  • append: 새로운 행만 출력 (불변 데이터)
  • complete: 전체 결과 테이블 출력 (작은 데이터)
  • update: 변경된 행만 출력 (집계 작업에 최적)

4. Trigger

.trigger(processingTime="10 seconds")

트리거 간격:

  • 10초마다 마이크로 배치 실행
  • 더 짧게 하면 실시간성 증가, 오버헤드 증가
  • 더 길게 하면 처리량 증가, 지연 증가

🐛 스키마 불일치 문제 해결

문제 발견

Spark Streaming을 실행했지만 데이터가 표시되지 않았습니다. Kafka에 데이터가 있는지 확인:

docker exec -it adtech-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic ad-events \
  --from-beginning \
  --max-messages 5

실제 Kafka 데이터:

{
  "event_id": "E1765818248891151",
  "timestamp": "2025-12-16T02:04:08.891156",
  "user_id": "U95480",
  "campaign_id": "C004",
  "ad_format": "rewarded_video",
  "country": "UK",
  "device": "Android",
  "event_type": "click",
  "revenue": 0.0165
}

Spark에서 기대한 스키마:

StructType([
    StructField("timestamp", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("ad_format", StringType(), True),
    StructField("campaign_id", StringType(), True),
    StructField("publisher_id", StringType(), True),  # ❌ 없음!
    StructField("bid_price", DoubleType(), True),      # ❌ 없음!
    StructField("win_price", DoubleType(), True),      # ❌ 없음!
])

원인 분석

Day 1 광고 로그 생성기의 실제 구조:

  • event_id: 이벤트 고유 ID
  • device: 디바이스 타입 (iOS/Android)
  • revenue: 실제 수익 (impression=0, click>0)

Spark 코드에서 가정한 구조:

  • publisher_id: 존재하지 않음
  • bid_price, win_price: RTB 입찰 시스템 개념 (실제 데이터에는 없음)

스키마 불일치로 인해 JSON 파싱 실패!

해결: 스키마 수정

# 수정된 스키마 (실제 데이터 구조에 맞춤)
ad_log_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("campaign_id", StringType(), True),
    StructField("ad_format", StringType(), True),
    StructField("country", StringType(), True),
    StructField("device", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("revenue", DoubleType(), True),
])

타임스탬프 형식 변경

문제:

  • 실제 데이터: "2025-12-16T02:04:08.891156" (ISO 8601)
  • 기존 파싱: "yyyy-MM-dd HH:mm:ss" (다른 형식)

해결:

# 기존
processed_df = parsed_df.withColumn(
    "event_timestamp",
    to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)

# 수정 (ISO 8601 자동 파싱)
processed_df = parsed_df.withColumn(
    "event_timestamp",
    to_timestamp(col("timestamp"))
)

집계 로직 변경

# 기존 (존재하지 않는 필드)
.agg(
    count("*").alias("event_count"),
    avg("bid_price").alias("avg_bid_price"),
    sum("win_price").alias("total_revenue")
)

# 수정 (실제 revenue 사용)
.agg(
    count("*").alias("event_count"),
    avg("revenue").alias("avg_revenue"),
    sum("revenue").alias("total_revenue")
)

startingOffsets 설정 변경

# 기존
.option("startingOffsets", "latest")  # Spark 시작 후 데이터만

# 수정
.option("startingOffsets", "earliest")  # 토픽의 처음부터

왜 변경?

  • latest: Spark가 시작된 이후 도착한 데이터만 처리
  • earliest: 토픽에 저장된 모든 데이터 처리
  • 테스트 단계에서는 earliest가 유용

🎯 최종 코드

src/spark_processor.py (완성 버전):

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Spark Session 생성
spark = SparkSession.builder \
    .appName("RTB-Pipeline-Processor") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session 시작됨!")

# 광고 로그 스키마 정의 (실제 데이터 구조에 맞게 수정)
ad_log_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("campaign_id", StringType(), True),
    StructField("ad_format", StringType(), True),
    StructField("country", StringType(), True),
    StructField("device", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("revenue", DoubleType(), True),
])

print("📡 Kafka에서 데이터 스트리밍 시작...")

# Kafka에서 스트리밍 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "ad-events") \
    .option("startingOffsets", "earliest") \
    .load()

# JSON 파싱
parsed_df = df.select(
    from_json(col("value").cast("string"), ad_log_schema).alias("data")
).select("data.*")

# 타임스탬프 변환 (ISO 8601 형식 자동 파싱)
processed_df = parsed_df.withColumn(
    "event_timestamp",
    to_timestamp(col("timestamp"))
)

print("🔄 데이터 처리 파이프라인 구성 완료!")

# 윈도우 기반 집계 (1분 단위)
windowed_stats = processed_df \
    .withWatermark("event_timestamp", "1 minute") \
    .groupBy(
        window(col("event_timestamp"), "1 minute"),
        col("event_type"),
        col("country"),
        col("ad_format")
    ).agg(
        count("*").alias("event_count"),
        avg("revenue").alias("avg_revenue"),
        sum("revenue").alias("total_revenue")
    )

# 출력 포맷 변환
output_df = windowed_stats.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("event_type"),
    col("country"),
    col("ad_format"),
    col("event_count"),
    round(col("avg_revenue"), 4).alias("avg_revenue"),
    round(col("total_revenue"), 4).alias("total_revenue")
)

print("📊 실시간 집계 시작 - 콘솔 출력 모드")

# 콘솔 출력 (실시간 모니터링)
query = output_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="10 seconds") \
    .start()

print("🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)")

query.awaitTermination()

🚀 실행 및 결과

실행 방법

터미널 1: 광고 로그 생성기

cd ~/adtech-realtime-pipeline
python data-generator/ad_log_generator.py

터미널 2: Spark Streaming

docker exec -it spark-master /opt/spark/bin/spark-submit \
  --master spark://spark-master:7077 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  /opt/spark-apps/spark_processor.py

실행 결과

✅ Spark Session 시작됨!
📡 Kafka에서 데이터 스트리밍 시작...
🔄 데이터 처리 파이프라인 구성 완료!
📊 실시간 집계 시작 - 콘솔 출력 모드
🚀 Spark Streaming 실행 중... (Ctrl+C로 종료)

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|window_start       |window_end         |event_type|country|ad_format     |event_count|avg_revenue|total_revenue|
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|2025-12-16 02:06:00|2025-12-16 02:07:00|impression|DE     |interstitial  |207        |0.0        |0.0          |
|2025-12-16 02:08:00|2025-12-16 02:09:00|impression|CA     |interstitial  |206        |0.0        |0.0          |
|2025-12-16 02:11:00|2025-12-16 02:12:00|impression|KR     |rewarded_video|175        |0.0        |0.0          |
|2025-12-16 02:07:00|2025-12-16 02:08:00|click     |CA     |banner        |13         |0.0426     |0.5538       |
|2025-12-16 02:04:00|2025-12-16 02:05:00|impression|US     |banner        |204        |0.0        |0.0          |
|2025-12-16 02:14:00|2025-12-16 02:15:00|click     |UK     |banner        |4          |0.0364     |0.1454       |
|2025-12-16 02:15:00|2025-12-16 02:16:00|click     |FR     |interstitial  |25         |0.0325     |0.8127       |
|2025-12-16 02:08:00|2025-12-16 02:09:00|click     |CA     |rewarded_video|58         |0.0445     |2.5801       |
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|window_start       |window_end         |event_type|country|ad_format     |event_count|avg_revenue|total_revenue|
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
|2025-12-16 02:19:00|2025-12-16 02:20:00|impression|US     |banner        |160        |0.0        |0.0          |
|2025-12-16 02:19:00|2025-12-16 02:20:00|impression|DE     |interstitial  |148        |0.0        |0.0          |
|2025-12-16 02:19:00|2025-12-16 02:20:00|click     |JP     |rewarded_video|35         |0.0485     |1.6987       |
|2025-12-16 02:19:00|2025-12-16 02:20:00|click     |CA     |banner        |9          |0.0494     |0.4443       |
+-------------------+-------------------+----------+-------+--------------+-----------+-----------+-------------+
only showing top 20 rows

결과 분석

1. 이벤트 타입별 특성

Impression (노출) 이벤트:

event_type=impression → avg_revenue=0.0, total_revenue=0.0
  • 광고가 화면에 표시되기만 함
  • 수익 발생하지 않음
  • 대량 발생 (분당 100~250회)

Click (클릭) 이벤트:

event_type=click → avg_revenue=0.03~0.05, total_revenue > 0
  • 사용자가 실제로 광고 클릭
  • 수익 발생 (클릭당 $0.03~0.05)
  • 상대적으로 적게 발생 (분당 10~60회)

2. 광고 포맷별 성과

banner:          낮은 클릭률, 낮은 단가
interstitial:    중간 클릭률, 중간 단가
rewarded_video:  높은 클릭률, 높은 단가

실제 데이터 예시:

CA banner (1분):         13 clicks, $0.0426 avg, $0.55 total
CA rewarded_video (1분): 58 clicks, $0.0445 avg, $2.58 total

→ Rewarded video가 약 5배 더 많은 수익 생성!

3. 국가별 트래픽 차이

주요 국가:

  • US, KR, JP: 높은 트래픽 (분당 200+ impressions)
  • UK, CA, FR, DE: 중간 트래픽 (분당 150-200 impressions)

4. 시간대별 패턴

1분 윈도우로 집계되어 실시간 트래픽 변화 관찰 가능:

02:04:00~02:05:00 → 02:06:00~02:07:00 → 02:08:00~02:09:00

💡 배운 점과 트러블슈팅

1. Docker 네트워크의 중요성

문제:

  • 처음에는 각 서비스가 독립적으로 실행
  • 서비스 간 통신 불가

해결:

  • 모든 서비스를 동일한 Docker 네트워크에 배치
  • 컨테이너 이름으로 서비스 간 통신 가능 (kafka:29092)

교훈:

networks:
  rtb-network:
    driver: bridge

이 한 줄이 모든 서비스를 연결!

2. Kafka 리스너 설정의 복잡성

핵심 개념:

  • LISTENERS: Kafka가 어디서 연결을 받을지
  • ADVERTISED_LISTENERS: 클라이언트에게 어떤 주소를 알려줄지

실전 팁:

# 컨테이너 내부
PLAINTEXT://kafka:29092

# 호스트 (로컬 개발)
PLAINTEXT_HOST://localhost:9092

두 개를 분리하면 유연성 증가!

3. 스키마 정의의 중요성

실수:

  • 데이터 구조를 확인하지 않고 코드 작성
  • 존재하지 않는 필드 참조

올바른 접근:

  1. 먼저 Kafka 토픽의 실제 데이터 확인
  2. 스키마 정의
  3. 코드 작성

검증 방법:

docker exec -it adtech-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic ad-events \
  --from-beginning \
  --max-messages 1

4. Watermark와 Window의 이해

Watermark (워터마크):

  • 늦게 도착하는 데이터 처리 정책
  • 너무 짧으면 → 데이터 손실
  • 너무 길면 → 메모리 사용량 증가

Window (윈도우):

  • Tumbling Window: 겹치지 않음 (우리가 사용)
  • Sliding Window: 겹침 (더 부드러운 분석)
  • Session Window: 이벤트 간격 기반

5. startingOffsets 전략

latest (기본값):

  • 장점: 최신 데이터만, 빠른 시작
  • 단점: 과거 데이터 놓침

earliest:

  • 장점: 모든 데이터 처리, 재현 가능
  • 단점: 토픽이 크면 느림

프로덕션 권장:

  • 처음: earliest (전체 데이터 분석)
  • 이후: 체크포인트 사용 (중단된 지점부터 재개)

6. 성능 최적화 고려사항

현재 설정:

.trigger(processingTime="10 seconds")

트레이드오프:

  • 짧은 간격 (1-5초): 낮은 지연, 높은 오버헤드
  • 긴 간격 (30-60초): 높은 처리량, 긴 지연

우리 선택:

  • 10초: 실시간성과 효율의 균형

📊 성능 지표

처리 성능

입력: ~82 events/sec (광고 로그 생성기)
처리: 10초 마이크로배치
지연: < 15초 (end-to-end)

리소스 사용

Spark Worker:
- CPU: 2 cores
- Memory: 2GB
- 실제 사용: ~30% CPU, ~500MB RAM

데이터 처리량

1분 윈도우당:
- Impressions: 100-250 events
- Clicks: 10-60 events
- 총: ~5,000 events/분 (~82 events/초)

🎓 핵심 개념 정리

Spark Streaming 아키텍처

Driver (Master)
    ↓
Executor (Worker) → Task 1
                  → Task 2
                  → Task ...

역할:

  • Driver: 전체 조율, 스케줄링
  • Executor: 실제 데이터 처리
  • Task: 파티션별 처리 단위

Structured Streaming vs DStream

Structured Streaming (우리가 사용):

  • DataFrame/Dataset API
  • SQL과 유사한 문법
  • 높은 수준의 추상화
  • 최적화 자동

DStream (구버전):

  • RDD 기반
  • 낮은 수준 제어
  • 수동 최적화 필요

선택 이유: Structured Streaming이 현대적이고 사용하기 쉬움!

마이크로배치 vs 연속 처리

마이크로배치 (우리가 사용):

[Batch 0] → [Batch 1] → [Batch 2] → ...
  (10초)      (10초)      (10초)

연속 처리:

→ → → → → (계속)

마이크로배치 장점:

  • 장애 복구 용이
  • 정확한 처리 보장
  • 디버깅 쉬움

🔍 디버깅 팁

1. Kafka 데이터 확인

# 최신 메시지 확인
docker exec -it adtech-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic ad-events \
  --max-messages 10

# 특정 파티션 확인
docker exec -it adtech-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic ad-events \
  --partition 0 \
  --offset 0 \
  --max-messages 5

2. Spark UI 활용

http://localhost:9090 → Spark Master UI
http://localhost:4040 → Spark Application UI (실행 중일 때)

확인 사항:

  • Streaming 탭: 배치 처리 시간
  • Executors 탭: 리소스 사용량
  • SQL 탭: 쿼리 실행 계획

3. 로그 레벨 조정

# 더 자세한 로그
spark.sparkContext.setLogLevel("INFO")

# 에러만
spark.sparkContext.setLogLevel("ERROR")

# 경고만 (기본 권장)
spark.sparkContext.setLogLevel("WARN")

4. 데이터 검증

# 스트림 중간 결과 확인
parsed_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

📁 프로젝트 구조 (Day 2 완료 후)

adtech-realtime-pipeline/
├── data-generator/
│   └── ad_log_generator.py          # 광고 로그 생성기
├── src/
│   └── spark_processor.py           # Spark Streaming (NEW!)
├── sql/
│   └── create_tables.sql            # ClickHouse 테이블 정의
├── docker-compose.yml               # 전체 서비스 정의 (Updated!)
├── requirements.txt
└── README.md

🚀 다음 단계 (Day 3 Preview)

목표: 데이터 영속화 및 분석 기반 구축

  1. ClickHouse 테이블 생성

    CREATE TABLE ad_events_aggregated (
        window_start DateTime,
        window_end DateTime,
        event_type String,
        country String,
        ad_format String,
        event_count UInt64,
        avg_revenue Float64,
        total_revenue Float64
    ) ENGINE = MergeTree()
    ORDER BY (window_start, country);
    
  2. Spark → ClickHouse 연동

    • JDBC 드라이버 추가
    • 배치 삽입 최적화
    • 에러 처리 및 재시도
  3. 데이터 쿼리 및 검증

    SELECT 
        country,
        sum(total_revenue) as revenue,
        sum(event_count) as events
    FROM ad_events_aggregated
    WHERE event_type = 'click'
    GROUP BY country
    ORDER BY revenue DESC;
    
  4. Superset 대시보드 준비

    • ClickHouse 데이터소스 연결
    • 차트 타입 설계
    • 실시간 업데이트 설정

📚 참고 자료

공식 문서

추천 읽을거리

  • Watermarking in Spark Streaming
  • Kafka Consumer Groups 이해하기
  • Docker Networking Best Practices

유용한 명령어 모음

# Spark 로그 실시간 확인
docker logs -f spark-master

# Kafka 토픽 상세 정보
docker exec -it adtech-kafka kafka-topics --describe \
  --topic ad-events \
  --bootstrap-server localhost:9092

# 실행 중인 Spark 애플리케이션 확인
docker exec -it spark-master ps aux | grep spark

# 리소스 사용량 모니터링
docker stats spark-master spark-worker

💭 회고

잘된 점

✅ Kafka-Spark 파이프라인 구축 성공 ✅ 실시간 윈도우 집계 구현 ✅ 의미 있는 비즈니스 인사이트 도출 ✅ 체계적인 문제 해결 과정

어려웠던 점

🔥 Docker 네트워크 설정의 복잡성 🔥 Kafka 리스너 구성 이해 🔥 스키마 불일치 디버깅 🔥 Spark Streaming 개념 학습

개선할 점

💡 처음부터 데이터 구조 명확히 정의하기 💡 더 체계적인 로깅 및 모니터링 💡 단위 테스트 추가 💡 설정 파일 외부화 (하드코딩 제거)


🎯 Day 2 체크리스트

  • Spark Master & Worker 구축
  • Kafka 네트워크 설정 문제 해결
  • Spark Streaming 코드 작성
  • 스키마 불일치 문제 해결
  • 실시간 윈도우 집계 구현
  • 콘솔 출력으로 데이터 검증
  • 성능 및 안정성 확인

다음 목표: ClickHouse에 데이터 저장하고 Superset으로 시각화! 🚀


작성일: 2025년 12월 16일 소요 시간: 약 4시간 난이도: ⭐⭐⭐⭐☆ 만족도: ⭐⭐⭐⭐⭐