실시간 광고 데이터 파이프라인 구축기 (1) - Kafka 환경 세팅과 광고 로그 생성기

8주 프로젝트 Day 1: AdTech 데이터 엔지니어를 위한 실전 포트폴리오 만들기


프로젝트 소개

무엇을 만드는가?

실시간 광고 입찰 시스템(RTB, Real-Time Bidding) 데이터 파이프라인을 처음부터 끝까지 구축합니다.

[광고 이벤트 발생] 
    ↓
[Kafka로 실시간 수집] 
    ↓
[Spark Streaming으로 집계]
    ↓
[ClickHouse에 저장]
    ↓
[Superset 대시보드로 시각화]
    ↓
[ML 모델로 CTR 예측]

왜 이 프로젝트인가?

AdTech(광고 기술) 분야는 데이터 엔지니어링의 꽃입니다:

  • 실시간성: 초당 수만 건의 광고 요청 처리
  • 비즈니스 임팩트: 수익과 직결되는 지표 (CTR, eCPM)
  • 기술 스택: Kafka, Spark, ClickHouse 등 최신 기술
  • 복잡도: 스트리밍, 배치, ML 모델 서빙 모두 포함

🎯 Day 1 목표

오늘은 데이터 파이프라인의 출발점을 만듭니다:

  1. ✅ Docker로 Kafka 환경 구축
  2. ✅ 광고 로그 생성기 개발
  3. ✅ 실시간으로 Kafka에 이벤트 전송
  4. ✅ 데이터 구조 검증

🏗 Step 1: 프로젝트 구조 설계

GitHub Repository 구조

adtech-realtime-pipeline/
│
├── README.md                    # 프로젝트 설명
├── docker-compose.yml           # 인프라 설정
├── requirements.txt             # Python 패키지
│
├── data-generator/              # 광고 로그 생성기
│   ├── ad_log_generator.py     # Producer
│   └── ad_log_consumer.py      # Consumer
│
├── streaming/                   # Spark Streaming (Day 2)
├── models/                      # ML 모델 (Day 5)
├── sql/                         # ClickHouse 스키마
└── docs/                        # 문서

왜 이렇게 구성했나?

  • 모듈화: 각 기능을 독립적으로 개발/테스트 가능
  • 확장성: 새로운 기능 추가 시 구조 변경 불필요
  • 가독성: 디렉토리만 봐도 프로젝트 구조 파악 가능

🐳 Step 2: Docker Compose로 인프라 구축

왜 Docker인가?

실무에서는 Kafka, ClickHouse, Redis 등을 각각 설치하고 설정하는 게 복잡합니다. Docker Compose를 사용하면:

  • 일관된 환경: 모든 팀원이 동일한 환경에서 작업
  • 빠른 셋업: docker-compose up 한 줄로 전체 인프라 실행
  • 격리: 로컬 환경 오염 없이 테스트 가능

docker-compose.yml 핵심 구성

services:
  # Kafka를 위한 Zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  # Kafka 브로커
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

  # ClickHouse (OLAP 데이터베이스)
  clickhouse:
    image: clickhouse/clickhouse-server:23.8
    ports:
      - "8123:8123"  # HTTP API
      - "9000:9000"  # Native protocol

  # Redis (캐싱용)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # Superset (대시보드)
  superset:
    image: apache/superset:2.1.0
    ports:
      - "8088:8088"

각 컴포넌트의 역할

컴포넌트 역할 왜 필요한가?
Kafka 메시지 큐 실시간 이벤트 스트리밍
Zookeeper Kafka 관리 Kafka 클러스터 코디네이션
ClickHouse OLAP DB 빠른 집계 쿼리 (컬럼 기반 저장)
Redis 캐시 ML 모델 예측 결과 저장
Superset BI 도구 실시간 대시보드

실행

# 전체 인프라 시작
docker-compose up -d

# 상태 확인
docker-compose ps

# 출력:
# NAME                IMAGE                               STATUS
# adtech-kafka        confluentinc/cp-kafka:7.4.0        Up
# adtech-clickhouse   clickhouse/clickhouse-server:23.8  Up
# adtech-redis        redis:7-alpine                     Up
# adtech-superset     apache/superset:2.1.0              Up
# adtech-zookeeper    confluentinc/cp-zookeeper:7.4.0    Up

📊 Step 3: 광고 데이터 구조 설계

실제 광고 로그는 어떻게 생겼나?

광고 시스템에서는 두 가지 핵심 이벤트가 발생합니다:

  1. Impression (노출): 사용자가 광고를 봄
  2. Click (클릭): 사용자가 광고를 클릭함

우리의 이벤트 스키마

{
  "event_id": "E1765295181994871",
  "timestamp": "2025-12-10T00:46:21.994570",
  "user_id": "U60734",
  "campaign_id": "C003",
  "ad_format": "interstitial",
  "country": "US",
  "device": "Android",
  "event_type": "impression",
  "revenue": 0.0
}

필드 설명

필드 타입 설명
event_id String 고유 이벤트 ID
timestamp ISO String 이벤트 발생 시각
user_id String 사용자 식별자
campaign_id String 광고 캠페인 ID
ad_format String 광고 포맷 (banner, interstitial, rewarded_video)
country String 사용자 국가 (US, KR, JP 등)
device String 디바이스 (iOS, Android)
event_type String 이벤트 타입 (impression, click)
revenue Float 수익 (클릭 시에만 발생)

왜 이렇게 설계했나?

1. 실무와 동일한 구조

  • 실제 AdTech 회사들이 사용하는 필드들
  • 분석에 필요한 최소한의 정보

2. 확장 가능성

  • 나중에 필드 추가 가능 (예: ad_size, placement_id)
  • JSON 형태라 스키마 변경 유연

3. 분석 용이성

  • 국가별, 포맷별, 캠페인별 집계 가능
  • 타임스탬프로 시계열 분석 가능

💡 Step 4: 현실적인 데이터 시뮬레이션

광고 지표 이해하기

CTR (Click-Through Rate): 클릭률

CTR = (클릭 수 / 노출 수) × 100%

eCPM (effective Cost Per Mille): 1,000회 노출당 수익

eCPM = (총 수익 / 총 노출 수) × 1,000

실제 업계 데이터 반영

1. 광고 포맷별 CTR

FORMAT_CTR = {
    'banner': 0.05,          # 5% - 배너 광고는 낮음
    'interstitial': 0.12,    # 12% - 전면 광고는 중간
    'rewarded_video': 0.25   # 25% - 리워드 광고는 높음
}

왜 이런 차이가?

  • 배너: 작고 눈에 안 띔
  • 전면: 크고 강제 노출
  • 리워드: 사용자가 보상 받기 위해 자발적 시청

2. 국가별 eCPM

COUNTRY_ECPM = {
    'US': (0.02, 0.10),   # 미국: $0.02~$0.10
    'KR': (0.01, 0.05),   # 한국: $0.01~$0.05
    'JP': (0.015, 0.08),  # 일본: $0.015~$0.08
}

왜 미국이 제일 높나?

  • 광고주 경쟁이 치열함
  • 구매력이 높음
  • 광고 시장이 성숙함

🔧 Step 5: 광고 로그 생성기 구현

confluent-kafka를 선택한 이유

kafka-python의 문제점:

  • Python 3.12와 호환성 이슈
  • 유지보수가 거의 안 됨
  • 성능이 상대적으로 느림

confluent-kafka의 장점:

  • Confluent 공식 지원 (Kafka 개발사)
  • C 라이브러리 기반으로 빠름
  • Python 3.12 완벽 지원

코드 핵심 로직

from confluent_kafka import Producer
import json
import random
from datetime import datetime
from faker import Faker

fake = Faker()

# Kafka Producer 설정
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

def generate_ad_event():
    """광고 이벤트 하나 생성"""
    
    # 광고 포맷 선택
    ad_format = random.choice(['banner', 'interstitial', 'rewarded_video'])
    
    # 해당 포맷의 CTR 기반으로 클릭 여부 결정
    ctr = FORMAT_CTR[ad_format]
    is_click = random.random() < ctr
    
    # 클릭이면 수익 발생
    revenue = 0
    if is_click:
        country = random.choice(['US', 'KR', 'JP', ...])
        ecpm_range = COUNTRY_ECPM[country]
        revenue = round(random.uniform(*ecpm_range), 4)
    
    return {
        'event_id': f"E{int(time.time()*1000)}{random.randint(100,999)}",
        'timestamp': datetime.now().isoformat(),
        'user_id': f"U{fake.random_int(min=10000, max=99999)}",
        'campaign_id': random.choice(['C001', 'C002', ...]),
        'ad_format': ad_format,
        'country': country,
        'device': random.choice(['iOS', 'Android']),
        'event_type': 'click' if is_click else 'impression',
        'revenue': revenue
    }

핵심 포인트

1. 확률 기반 클릭 생성

is_click = random.random() < ctr
  • random.random()은 0~1 사이 값 반환
  • CTR이 0.05면 5% 확률로 클릭 발생

2. Kafka 비동기 전송

producer.produce(
    'ad-events',
    key=event['event_id'].encode('utf-8'),
    value=json.dumps(event).encode('utf-8')
)
producer.poll(0)  # 비동기 처리
  • poll(0): 백그라운드 전송 처리
  • 성능을 위해 버퍼에 모아서 전송

3. 주기적인 Flush

if event_count % 1000 == 0:
    producer.flush()  # 버퍼 강제 비우기
  • 1,000개마다 확실하게 전송
  • 데이터 손실 방지

🚀 Step 6: Kafka 토픽 생성 및 테스트

Kafka Topic이란?

카프카의 토픽(Topic)은 메시지를 저장하는 논리적 채널입니다.

  • 게시판의 “카테고리”와 비슷
  • 여러 Producer가 같은 토픽에 메시지 전송 가능
  • 여러 Consumer가 같은 토픽에서 메시지 읽기 가능

토픽 생성

docker exec adtech-kafka kafka-topics \
  --create \
  --topic ad-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

파라미터 설명:

  • --partitions 3: 3개의 파티션으로 분산 (병렬 처리)
  • --replication-factor 1: 복제본 1개 (로컬 환경이라 1개)

생성기 실행

python data-generator/ad_log_generator.py

출력:

🚀 Starting Ad Log Generator...
📊 Generating ~100 events/sec
📍 Kafka Topic: ad-events
==================================================
✅ Sent 1,000 events | Rate: 82.9 events/sec
✅ Sent 2,000 events | Rate: 82.7 events/sec

데이터 확인

docker exec -it adtech-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic ad-events \
  --from-beginning \
  --max-messages 3

출력:

{"event_id": "E1765295181994871", "timestamp": "2025-12-10T00:46:21.994570", "user_id": "U60734", "campaign_id": "C003", "ad_format": "interstitial", "country": "US", "device": "Android", "event_type": "impression", "revenue": 0}

{"event_id": "E1765295182007140", "timestamp": "2025-12-10T00:46:22.007029", "user_id": "U73756", "campaign_id": "C004", "ad_format": "banner", "country": "FR", "device": "Android", "event_type": "impression", "revenue": 0}

{"event_id": "E1765295182042305", "timestamp": "2025-12-10T00:46:22.042544", "user_id": "U21745", "campaign_id": "C004", "ad_format": "banner", "country": "KR", "device": "iOS", "event_type": "impression", "revenue": 0}

📈 Step 7: 성과 측정

오늘 생성한 데이터

📊 Final Stats:
   - Total Events: 2,937
   - Duration: 35.5s
   - Avg Rate: 82.6 events/sec

데이터 분포 분석 (예상)

이벤트 타입 분포:

  • Impression: ~2,650건 (90%)
  • Click: ~287건 (10%)

국가 분포 (균등 랜덤):

  • US, KR, JP, UK, DE, FR, CA 각각 ~420건

광고 포맷 분포 (균등 랜덤):

  • Banner: ~979건
  • Interstitial: ~979건
  • Rewarded Video: ~979건

평균 CTR:

  • 전체: ~10%
  • Banner: ~5%
  • Interstitial: ~12%
  • Rewarded Video: ~25%

🎓 배운 것들

1. Kafka 기초 개념

Producer → Topic → Consumer 구조

[Producer]  ─────>  [Topic: ad-events]  ─────>  [Consumer]
(광고 로그 생성)        (메시지 저장)          (데이터 처리)

2. 실시간 스트리밍의 핵심

  • 비동기 처리: 블로킹 없이 빠르게 전송
  • 배치 전송: 개별 전송보다 묶어서 전송이 효율적
  • 백프레셔: Producer가 너무 빠르면 버퍼 관리 필요

3. AdTech 도메인 지식

  • CTR: 광고 효율성 지표
  • eCPM: 수익성 지표
  • Fill Rate: 광고 채움률 (다음 단계에서 다룰 예정)

🐛 트러블슈팅

문제 1: kafka-python 호환성 에러

증상:

ModuleNotFoundError: No module named 'kafka.vendor.six.moves'

원인:

  • Python 3.12에서 kafka-python 라이브러리 오래됨

해결:

pip uninstall kafka-python
pip install confluent-kafka

문제 2: Docker 데몬 미실행

증상:

Cannot connect to the Docker daemon

해결:

# Docker Desktop 실행
open -a Docker

# 30초 대기 후
docker ps  # 정상 작동 확인

문제 3: 포트 충돌

증상:

port 8080 already in use

해결:

# 사용 중인 프로세스 확인
lsof -i :8080

# 기존 컨테이너 중지 또는
# docker-compose.yml에서 포트 변경
ports:
  - "8089:8088"  # 8088 → 8089

📦 완성된 구조

adtech-realtime-pipeline/
│
├── docker-compose.yml          # ✅ 5개 서비스 실행 중
├── requirements.txt            # ✅ confluent-kafka, faker
│
├── data-generator/
│   ├── ad_log_generator.py    # ✅ 83 events/sec
│   └── ad_log_consumer.py     # 준비 완료
│
├── .gitignore                 # ✅ data/, *.jsonl 제외
└── README.md                  # ✅ 프로젝트 문서

🎯 다음 단계 (Day 2-3)

목표: Spark Streaming으로 실시간 집계

할 일:

  1. PySpark 설치 및 환경 구축
  2. Kafka에서 스트림 읽기
  3. 10초 윈도우로 집계
  4. CTR, eCPM 실시간 계산
  5. 콘솔에 결과 출력

예상 결과:

Window: 2025-12-10 00:46:00 ~ 00:46:10
Campaign: C001 | Format: banner
- Impressions: 85
- Clicks: 4
- CTR: 4.7%
- Revenue: $0.23
- eCPM: $2.71

💻 전체 코드

최종 ad_log_generator.py

from confluent_kafka import Producer
import json
import random
import time
from datetime import datetime
from faker import Faker

fake = Faker()

conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

CAMPAIGNS = ['C001', 'C002', 'C003', 'C004', 'C005']
AD_FORMATS = ['banner', 'interstitial', 'rewarded_video']
COUNTRIES = ['US', 'KR', 'JP', 'UK', 'DE', 'FR', 'CA']
DEVICES = ['iOS', 'Android']

COUNTRY_ECPM = {
    'US': (0.02, 0.10),
    'KR': (0.01, 0.05),
    'JP': (0.015, 0.08),
    'UK': (0.015, 0.07),
    'DE': (0.01, 0.06),
    'FR': (0.01, 0.05),
    'CA': (0.015, 0.07)
}

FORMAT_CTR = {
    'banner': 0.05,
    'interstitial': 0.12,
    'rewarded_video': 0.25
}

def delivery_report(err, msg):
    if err is not None:
        print(f'❌ Message delivery failed: {err}')

def generate_ad_event():
    timestamp = datetime.now().isoformat()
    user_id = f"U{fake.random_int(min=10000, max=99999)}"
    campaign_id = random.choice(CAMPAIGNS)
    ad_format = random.choice(AD_FORMATS)
    country = random.choice(COUNTRIES)
    device = random.choice(DEVICES)
    
    ctr = FORMAT_CTR[ad_format]
    is_click = random.random() < ctr
    event_type = 'click' if is_click else 'impression'
    
    revenue = 0
    if is_click:
        ecpm_range = COUNTRY_ECPM[country]
        revenue = round(random.uniform(*ecpm_range), 4)
    
    return {
        'event_id': f"E{int(time.time()*1000)}{random.randint(100,999)}",
        'timestamp': timestamp,
        'user_id': user_id,
        'campaign_id': campaign_id,
        'ad_format': ad_format,
        'country': country,
        'device': device,
        'event_type': event_type,
        'revenue': revenue
    }

def main():
    print("🚀 Starting Ad Log Generator...")
    print("📊 Generating ~100 events/sec")
    print("📍 Kafka Topic: ad-events")
    print("=" * 50)
    
    event_count = 0
    start_time = time.time()
    
    try:
        while True:
            event = generate_ad_event()
            
            producer.produce(
                'ad-events',
                key=event['event_id'].encode('utf-8'),
                value=json.dumps(event).encode('utf-8'),
                callback=delivery_report
            )
            producer.poll(0)
            
            event_count += 1
            
            if event_count % 1000 == 0:
                producer.flush()
                elapsed = time.time() - start_time
                rate = event_count / elapsed
                print(f"✅ Sent {event_count:,} events | Rate: {rate:.1f} events/sec")
            
            time.sleep(0.01)
            
    except KeyboardInterrupt:
        print("\n\n🛑 Stopping generator...")
        producer.flush()
        elapsed = time.time() - start_time
        print(f"📊 Final Stats:")
        print(f"   - Total Events: {event_count:,}")
        print(f"   - Duration: {elapsed:.1f}s")
        print(f"   - Avg Rate: {event_count/elapsed:.1f} events/sec")

if __name__ == "__main__":
    main()

📚 참고 자료

Kafka 공식 문서

AdTech 개념

Docker Compose


🎉 마무리

오늘 우리는:

  • ✅ Kafka 기반 실시간 데이터 파이프라인 기초 구축
  • ✅ 현실적인 광고 데이터 시뮬레이션
  • ✅ 초당 83개 이벤트를 안정적으로 생성

8주 프로젝트 Day 1 완료!

다음 포스팅에서는 Spark Streaming으로 실시간 집계를 구현합니다.


GitHub Repository: adtech-realtime-pipeline

다음 글: 실시간 광고 데이터 파이프라인 구축기 (2) - Spark Streaming 실시간 집계


이 글이 도움이 되었다면 ⭐️ 스타와 👏 공유 부탁드립니다!