Day 6: 실시간 이상 탐지 시스템 구축

오늘의 목표

Day 5에서 구축한 Redis 캐싱 시스템에 실시간 이상 탐지(Anomaly Detection)Slack 알림 기능을 추가한다.

핵심 질문: “CTR이 갑자기 50% 떨어지면 어떻게 즉시 알 수 있을까?”


왜 이 시스템이 필요한가?

실제 AdTech에서 발생하는 문제들

문제 상황 원인 결과
CTR이 5% → 2%로 급락 광고 소재 오류 시간당 $500 손실
CTR이 5% → 15%로 급등 봇(Bot) 공격 광고주 예산 낭비
트래픽 3배 증가 DDoS 공격 서버 다운
수익(eCPM) 50% 감소 입찰가 오류 매출 급락

문제: 이런 이상을 수동으로 모니터링? → 불가능!

해결: 실시간 자동 탐지 + 30초 내 Slack 알림 🔔


🏗️ 전체 시스템 구조 (한눈에 보기)

┌─────────────────┐
│ 터미널 1        │  광고 로그 생성 (초당 80개)
│ ad_log_generator│─────┐
└─────────────────┘     │
                        ↓
                  ┌──────────┐
                  │  Kafka   │  이벤트 보관함
                  │ad-events │
                  └──────────┘
                        ↓
┌─────────────────┐     │
│ 터미널 2        │←────┘
│ Spark Streaming │
│                 │
│ 1. 집계         │──→ Redis (캐시)
│ 2. 이상 탐지 ⚠️ │──→ ClickHouse (분석)
│                 │
│ 3. 알림 전송    │──→ Kafka alerts 토픽
└─────────────────┘            ↓
                               │
┌─────────────────┐            │
│ 터미널 3        │←───────────┘
│ Slack Consumer  │
│                 │
│ 중복 체크       │──→ Redis (cooldown)
│ 알림 전송       │──→ 🔔 Slack
└─────────────────┘

┌─────────────────┐
│ 터미널 4        │  테스트용 (이상 패턴 생성)
│ Test Generator  │──→ Kafka ad-events
└─────────────────┘

🚀 Step 1: 4개 터미널의 역할 이해하기

터미널 1: 광고 로그 생성기 (데이터 소스)

역할: 가짜 광고 이벤트를 계속 만들어냅니다.

# 실행 명령
python data-generator/ad_log_generator.py

하는 일:

매초마다:
    "미국 사용자가 배너 광고를 클릭했어요!"
    "한국 사용자가 리워드 비디오를 봤어요!"
     Kafka ad-events 토픽으로 전송 (초당 80)

출력 예시:

✅ Sent 1,000 events | Rate: 83 events/sec
📊 Total sent: 5,000 events

비유: 공장에서 제품(광고 이벤트)을 계속 생산하는 기계

image-20251218024508095


터미널 2: Spark Streaming (두뇌)

역할: 실시간으로 데이터를 분석하고 이상을 탐지합니다.

# 실행 명령
docker exec spark-master /opt/spark/bin/spark-submit \
  --master "local[2]" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  /opt/spark-apps/spark_processor_anomaly_detection.py

하는 일 (3가지):

① 실시간 집계 (1분 윈도우)

# 1분마다 계산
- 미국 배너: impression 1,234, click 45
- CTR = 45 / 1,234 = 3.64%
- eCPM = $2.50

 Redis 저장 (실시간 조회용)
 ClickHouse 저장 (장기 분석용)

② 이상 탐지 (4가지 패턴)

# 과거 1시간 평균과 비교
if 현재 CTR < 평균 CTR × 0.5:
    🚨 알림: "CTR 급락!"
    
if 현재 CTR > 평균 CTR × 2:
    🤖 알림: "CTR 급등 (봇 의심)!"
    
if 현재 트래픽 > 평균 트래픽 × 3:
    📈 알림: "트래픽 폭증!"
    
if 현재 eCPM < 평균 eCPM × 0.5:
    💸 알림: "수익 급락!"

③ 알림 전송

# 이상 감지 시
alert = {
    "alert_type": "ctr_drop",
    "country": "US",
    "ad_format": "banner",
    "current_value": 2.0,
    "expected_value": 5.0,
    "deviation_percent": -60.0,
    "message": "🚨 CTR 급락! US banner: 5.00% → 2.00% (-60.0%)"
}

 Kafka alerts 토픽으로 전송

출력 예시:

📦 Processing Batch 5...
✅ Batch 5: Redis에 21개 메트릭 저장됨
✅ Batch 5: ClickHouse에 21개 메트릭 저장됨

⚠️  🚨 CTR 급락! US banner: 5.00% → 2.00% (-60.0%)
⚠️  📈 트래픽 급증! JP interstitial: 31 → 114 (+267.7%)
⚠️  🤖 CTR 급등 (봇 의심)! UK banner: 0.06% → 5.77% (+8914.4%)
✅ 3개 알림을 Kafka로 전송 완료
🔔 Batch 5: 3개 알림 감지!

비유: 공장에서 불량품(이상)을 검사하고, 발견하면 경보를 울리는 품질 검사원

image-20251218025922864


터미널 3: Slack Consumer (알림 전달자)

역할: Kafka alerts 토픽을 구독하고 Slack으로 알림을 전송합니다.

# 실행 명령
python src/slack_alert_consumer.py

하는 일:

① Kafka에서 알림 수신

consumer = KafkaConsumer('alerts', ...)

for message in consumer:
    alert = message.value
    # {"alert_type": "ctr_drop", "message": "🚨 CTR 급락!", ...}

② 중복 체크 (Redis)

# 같은 알림이 10분 내에 발생했나?
key = f"alert_sent:US:banner:ctr_drop"

if redis.exists(key):
    print("⏭️  중복 알림 (10분 내 동일 알림 발생)")
    continue

# 10분 동안 중복 방지
redis.setex(key, 600, "1")

③ Slack으로 전송

slack_message = {
    "text": "🚨 📉 AdTech 이상 탐지",
    "blocks": [
        {"type": "header", "text": "🚨 CTR 급락! US banner: 5.00% → 2.00%"},
        {"type": "section", "fields": [
            {"type": "mrkdwn", "text": "*국가:*\nUS"},
            {"type": "mrkdwn", "text": "*광고 포맷:*\nbanner"},
            ...
        ]}
    ]
}

requests.post(SLACK_WEBHOOK_URL, json=slack_message)

출력 예시:

🔔 Slack Alert Consumer 시작!
   - Kafka Topic: alerts
   - Redis 중복 방지: 10분 cooldown
   - Slack Webhook: 설정됨

⏳ 알림 대기 중... (Ctrl+C로 종료)

🔔 알림 수신: 🚨 CTR 급락! US banner: 5.00% → 2.00% (-60.0%)
✅ Slack 전송 성공!
   📊 Severity: high | Type: ctr_drop

비유: 경보를 듣고 담당자(Slack 채널)에게 전화하는 보안 요원

image-20251218030034796


터미널 4: 테스트 생성기 (시뮬레이터)

역할: 의도적으로 이상 패턴을 만들어서 시스템을 테스트합니다.

# 실행 명령
python src/anomaly_test_generator.py

하는 일:

시나리오 1: CTR 급락 시뮬레이션

# 1단계: 정상 데이터 전송 (30초)
for i in range(1000):
    send_event(type="impression")  # 1,000개
for i in range(50):
    send_event(type="click")        # 50개 (CTR 5%)

time.sleep(30)  # Spark가 평균 계산

# 2단계: 이상 데이터 전송
for i in range(1000):
    send_event(type="impression")  # 1,000개
for i in range(20):
    send_event(type="click")        # 20개 (CTR 2% ← 급락!)

출력 예시:

🧪 이상 패턴 테스트 생성기
==================================================
선택 (1-5): 1

📉 테스트 1: CTR 급락 시뮬레이션
   - 정상: impression 1000개, click 50개 (CTR 5%)
   - 이상: impression 1000개, click 20개 (CTR 2%)

1️⃣ 정상 데이터 전송 중... (30초)
✅ 정상 데이터 전송 완료! 30초 대기 (Spark가 집계하도록)...

2️⃣ 이상 데이터 전송 중... (CTR 급락)
✅ 이상 데이터 전송 완료!
🔍 Spark 로그에서 '🚨 CTR 급락!' 알림 확인하세요!

비유: 소방훈련을 위해 일부러 화재 경보를 울리는 것

image-20251218024455778


🔄 데이터 흐름 (타임라인)

실제로 이상이 탐지되는 과정을 시간 순서대로 봅시다.

00:00초 - 정상 데이터 생성

터미널 1 (생성기):

✅ Sent impression event (US, banner)
✅ Sent click event (US, banner)
... (초당 80개)

터미널 2 (Spark):

📦 Processing Batch 0...
✅ Batch 0: Redis에 21개 메트릭 저장됨
   - US banner: CTR 5.0%

Redis 저장:

# 메트릭 저장
metrics:US:banner:1m = {"ctr": 5.0, "impressions": 100, "clicks": 5}

# 히스토리 저장 (평균 계산용)
history:US:banner:1h = {"avg_ctr": 5.0, "avg_impressions": 100}

00:30초 - 이상 데이터 발생!

터미널 4 (테스트):

2️⃣ 이상 데이터 전송 중... (CTR 급락)
   - impression 1000개, click 20개 (CTR 2%)

01:00초 - Spark가 이상 탐지!

터미널 2 (Spark):

# 1분 윈도우 집계
current_ctr = 2.0%
avg_ctr = 5.0%  # Redis history에서 조회

# 이상 탐지 로직
if current_ctr < avg_ctr * 0.5:  # 2.0 < 2.5 → True!
    alert = {
        "message": "🚨 CTR 급락! US banner: 5.00% → 2.00% (-60.0%)",
        "severity": "high",
        ...
    }
    send_to_kafka(alert)

출력:

⚠️  🚨 CTR 급락! US banner: 5.00% → 2.00% (-60.0%)
✅ 1개 알림을 Kafka로 전송 완료
🔔 Batch 1: 1개 알림 감지!

01:01초 - Slack Consumer가 수신!

터미널 3 (Consumer):

# Kafka에서 메시지 수신
message = consumer.poll()
alert = json.loads(message.value)

# 중복 체크
if not is_duplicate(alert):
    send_to_slack(alert)

출력:

🔔 알림 수신: 🚨 CTR 급락! US banner: 5.00% → 2.00% (-60.0%)
✅ Slack 전송 성공!
   📊 Severity: high | Type: ctr_drop

01:02초 - Slack 앱에 알림 도착!

Slack 화면:

🚨 📉 AdTech 이상 탐지
🚨 CTR 급락! UK banner: 5.70% → 0.00% (-100.0%)

국가:           광고 포맷:
UK              banner

메트릭:         심각도:
CTR             HIGH

현재 값:        예상 값:
0.00            5.70

변화율:         발생 시간:
-100.0%         2025-12-18T02:58:00

Alert ID: 712b0e22-0b84-4e86-b21c-77789d187cc6 | Type: ctr_drop

총 소요 시간: 약 30초~1분 (실시간!)

image-20251218025951339


🧠 핵심 기술: 지수 이동 평균 (EMA)

왜 단순 평균이 아닌 EMA를 쓸까?

문제 상황:

09시: CTR 5% (정상)
10시: CTR 5% (정상)
11시: CTR 5% (정상)
12시: CTR 20% (점심시간 트래픽 증가, 정상!)

단순 평균:

avg = (5 + 5 + 5 + 20) / 4 = 8.75%

# 다음 시간
13: CTR 10%
if 10% < 8.75% × 0.5:  # False (정상으로 판단)
    alert("CTR 급락!")

→ 점심시간 데이터가 평균을 왜곡!

EMA (지수 이동 평균):

# 최근 데이터에 더 큰 가중치
new_avg = old_avg × 0.8 + current_value × 0.2

09: avg = 5.0
10: avg = 5.0 × 0.8 + 5.0 × 0.2 = 5.0
11: avg = 5.0 × 0.8 + 5.0 × 0.2 = 5.0
12: avg = 5.0 × 0.8 + 20.0 × 0.2 = 8.0  # 천천히 증가

13: current = 10%
avg = 8.0 × 0.8 + 10.0 × 0.2 = 8.4
if 10% < 8.4% × 0.5:  # False (정상으로 판단)

→ 점진적 변화는 정상, 급격한 변화만 탐지!

실제 코드:

# Redis에서 히스토리 조회
history = redis.get(f"history:{country}:{ad_format}:1h")
avg_ctr = history['avg_ctr']

# 이상 탐지
if current_ctr < avg_ctr * 0.5:
    send_alert("CTR 급락!")

# EMA로 히스토리 업데이트
new_avg_ctr = avg_ctr * 0.8 + current_ctr * 0.2
redis.setex(f"history:{country}:{ad_format}:1h", 3600, {
    "avg_ctr": new_avg_ctr
})

🔍 중복 알림 방지 (Redis Cooldown)

왜 중복 방지가 필요한가?

문제 상황:

12:00 - CTR 급락 감지 → Slack 알림 전송
12:01 - CTR 여전히 낮음 → Slack 알림 전송
12:02 - CTR 여전히 낮음 → Slack 알림 전송
...
12:10 - Slack에 10개 알림 (스팸!)

해결: Redis Cooldown

def is_duplicate_alert(alert):
    # 알림 키 생성 (국가 + 포맷 + 타입)
    key = f"alert_sent:{alert['country']}:{alert['ad_format']}:{alert['alert_type']}"
    # 예: "alert_sent:US:banner:ctr_drop"
    
    # 이미 보냈나?
    if redis.exists(key):
        return True  # 중복!
    
    # 10분 동안 이 알림 금지
    redis.setex(key, 600, "1")
    return False

효과:

12:00 - CTR 급락 감지 → Slack 알림 전송 ✅
         Redis: alert_sent:US:banner:ctr_drop = "1" (TTL 10분)
12:01 - CTR 여전히 낮음 → 중복 체크 → ⏭️  스킵
12:02 - CTR 여전히 낮음 → 중복 체크 → ⏭️  스킵
...
12:10 - Redis 키 만료
12:11 - CTR 여전히 낮음 → Slack 알림 전송 ✅ (재발송)

💻 실전 코드 분석

1. Spark에서 이상 탐지 로직

def detect_anomalies(batch_df, batch_id):
    """
    배치 데이터에서 이상 패턴 탐지
    """
    import redis
    r = redis.Redis(host='redis', port=6379, db=0)
    alerts = []
    
    for row in batch_df.collect():
        # 현재 메트릭
        current_ctr = float(row.ctr) if row.ctr else 0.0
        
        # 과거 평균 조회
        history_key = f"history:{row.country}:{row.ad_format}:1h"
        history_data = r.get(history_key)
        
        if history_data:
            history = json.loads(history_data)
            avg_ctr = history.get('avg_ctr', current_ctr)
        else:
            avg_ctr = current_ctr  # 히스토리 없으면 현재값 사용
        
        # 🚨 이상 1: CTR 급락 (50% 이상 하락)
        if avg_ctr > 0 and current_ctr < avg_ctr * 0.5:
            deviation = ((current_ctr - avg_ctr) / avg_ctr) * 100
            alert = {
                "alert_id": str(uuid.uuid4()),
                "alert_time": row.window_end.isoformat(),
                "alert_type": "ctr_drop",
                "severity": "high" if deviation < -70 else "medium",
                "country": row.country,
                "ad_format": row.ad_format,
                "metric_name": "CTR",
                "current_value": current_ctr,
                "expected_value": avg_ctr,
                "deviation_percent": round(deviation, 2),
                "message": f"🚨 CTR 급락! {row.country} {row.ad_format}: {avg_ctr:.2f}% → {current_ctr:.2f}% ({deviation:.1f}%)"
            }
            alerts.append(alert)
            print(f"⚠️  {alert['message']}")
        
        # EMA로 히스토리 업데이트
        new_avg_ctr = avg_ctr * 0.8 + current_ctr * 0.2
        history_data = {
            "avg_ctr": round(new_avg_ctr, 4),
            "last_updated": row.last_updated.isoformat()
        }
        r.setex(history_key, 3600, json.dumps(history_data))
    
    # Kafka로 알림 전송
    if alerts:
        send_alerts_to_kafka(alerts)

핵심 포인트:

  • ✅ Redis에서 과거 평균 조회
  • ✅ 현재값과 비교 (50% 하락 체크)
  • ✅ 이상 감지 시 alert 생성
  • ✅ EMA로 히스토리 업데이트
  • ✅ Kafka로 알림 전송

2. Kafka로 알림 전송

from kafka import KafkaProducer
import json

def send_alerts_to_kafka(alerts):
    """
    탐지된 알림을 Kafka "alerts" 토픽으로 전송
    """
    try:
        producer = KafkaProducer(
            bootstrap_servers=['kafka:29092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        for alert in alerts:
            producer.send('alerts', value=alert)
        
        producer.flush()
        print(f"✅ {len(alerts)}개 알림을 Kafka로 전송 완료")
    except Exception as e:
        print(f"❌ Kafka 전송 실패: {str(e)}")

핵심 포인트:

  • ✅ JSON 직렬화 (datetime → 문자열)
  • alerts 토픽으로 전송
  • flush()로 즉시 전송 보장

3. Slack Consumer (kafka-python 사용)

from kafka import KafkaConsumer
import json
import redis
import requests

def consume_alerts():
    """
    Kafka alerts 토픽을 구독하여 알림 처리
    """
    consumer = KafkaConsumer(
        'alerts',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        enable_auto_commit=True,
        group_id='slack-alert-consumer',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    r = redis.Redis(host='localhost', port=6379, db=1)
    
    for message in consumer:
        alert = message.value
        
        print(f"\n🔔 알림 수신: {alert['message']}")
        
        # 중복 체크
        key = f"alert_sent:{alert['country']}:{alert['ad_format']}:{alert['alert_type']}"
        if r.exists(key):
            print(f"   ⏭️  중복 알림 (10분 내 동일 알림 발생)")
            continue
        
        # Slack 전송
        send_to_slack(alert)
        
        # 10분 cooldown
        r.setex(key, 600, "1")

핵심 포인트:

  • value_deserializer로 자동 JSON 파싱
  • ✅ Redis로 중복 체크
  • ✅ Slack Webhook 호출
  • ✅ 10분 cooldown 설정

4. Slack Webhook 전송

def send_to_slack(alert):
    """
    Slack Webhook으로 알림 전송
    """
    severity_emoji = {
        "low": "ℹ️",
        "medium": "⚠️",
        "high": "🚨",
        "critical": "🔥"
    }
    
    emoji = severity_emoji.get(alert['severity'], "⚠️")
    
    message = {
        "text": f"{emoji} AdTech 이상 탐지",
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} {alert['message']}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*국가:*\n{alert['country']}"},
                    {"type": "mrkdwn", "text": f"*광고 포맷:*\n{alert['ad_format']}"},
                    {"type": "mrkdwn", "text": f"*메트릭:*\n{alert['metric_name']}"},
                    {"type": "mrkdwn", "text": f"*심각도:*\n{alert['severity'].upper()}"},
                    {"type": "mrkdwn", "text": f"*현재 값:*\n{alert['current_value']:.2f}"},
                    {"type": "mrkdwn", "text": f"*예상 값:*\n{alert['expected_value']:.2f}"},
                    {"type": "mrkdwn", "text": f"*변화율:*\n{alert['deviation_percent']:.1f}%"},
                    {"type": "mrkdwn", "text": f"*발생 시간:*\n{alert['alert_time']}"}
                ]
            }
        ]
    }
    
    response = requests.post(SLACK_WEBHOOK_URL, json=message)
    
    if response.status_code == 200:
        print(f"✅ Slack 전송 성공!")
    else:
        print(f"❌ Slack 전송 실패: {response.status_code}")

핵심 포인트:

  • ✅ Slack Block Kit 사용 (예쁜 포맷팅)
  • ✅ 심각도별 이모지
  • ✅ 구조화된 필드 (국가, 포맷, 메트릭 등)

🐛 트러블슈팅 과정 (배운 점)

문제 1: confluent-kafka 컴파일 에러

에러:

fatal error: librdkafka/rdkafka.h: No such file or directory

원인: confluent-kafka는 C 라이브러리(librdkafka)가 필요한데 Spark 컨테이너에 없음

해결: kafka-python 사용 (순수 Python, 컴파일 불필요)

pip install kafka-python

교훈: 컨테이너 환경에서는 의존성이 적은 라이브러리 선택!


문제 2: datetime 객체 JSON 직렬화 실패

에러:

❌ Kafka 전송 실패: Object of type datetime is not JSON serializable

원인:

alert = {
    "alert_time": row.window_end,  # datetime 객체
    ...
}
json.dumps(alert)  # 에러!

해결:

alert = {
    "alert_time": row.window_end.isoformat(),  # 문자열 "2025-12-18T02:58:00"
    ...
}

교훈: JSON으로 전송할 데이터는 반드시 JSON 호환 타입으로 변환!


문제 3: Spark round() 함수 충돌

에러:

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got float.

원인: Python 내장 round()와 Spark의 round() 함수 충돌

# Spark는 이걸 Spark 함수로 해석
round((avg_ctr * 0.8 + current_ctr * 0.2), 4)

해결:

import builtins
builtins.round((avg_ctr * 0.8 + current_ctr * 0.2), 4)

교훈: Spark 컨텍스트에서는 Python 내장 함수도 조심!


📊 성능 측정

실제 측정 결과

메트릭
이벤트 생성 속도 초당 80개
Spark 처리 주기 30초
이상 탐지 지연 30~60초
Slack 알림 지연 1~2초
Total End-to-End ~1분

분석:

  • ✅ 실시간 광고 입찰(100ms)보다는 느림
  • ✅ 하지만 모니터링 목적으로는 충분히 빠름
  • ✅ 사람이 대시보드 보는 것보다 훨씬 빠름!

동시 알림 처리

테스트 결과:

📦 Processing Batch 1...
⚠️  46개 알림 감지!
✅ 46개 알림을 Kafka로 전송 완료 (1초 이내)

Slack Consumer:
🔔 알림 수신 1/46
🔔 알림 수신 2/46
...
🔔 알림 수신 46/46
✅ 총 소요 시간: 8초

분석:

  • ✅ 대량 알림도 안정적으로 처리
  • ✅ Kafka 버퍼링 덕분에 유실 없음
  • ✅ 중복 방지 로직 정상 작동

🎓 핵심 학습 내용

1. 이벤트 기반 아키텍처 (Event-Driven Architecture)

장점:

  • 비동기 처리: Spark와 Slack Consumer가 독립적으로 실행
  • 확장 가능: Consumer를 여러 개 띄워도 OK (Email, PagerDuty 등)
  • 장애 격리: Consumer가 죽어도 Spark는 계속 실행
  • 재처리 가능: Kafka에 메시지가 보관되어 있어서 재처리 가능

단점:

  • ❌ 복잡도 증가 (여러 컴포넌트 관리)
  • ❌ 디버깅 어려움 (분산 시스템)
  • ❌ 메시지 순서 보장 어려움

2. 통계 기반 vs ML 기반 이상 탐지

통계 기반 (우리가 구현한 것):

if current_value < average * 0.5:
    alert("이상!")

장점:

  • ✅ 구현 간단
  • ✅ 빠른 응답 (< 1초)
  • ✅ 해석 가능 (“평균보다 50% 낮음”)
  • ✅ 학습 데이터 불필요

단점:

  • ❌ Threshold 수동 설정
  • ❌ 복잡한 패턴 감지 어려움
  • ❌ 계절성/트렌드 반영 어려움

ML 기반 (Isolation Forest, LSTM Autoencoder 등):

장점:

  • ✅ 복잡한 패턴 자동 학습
  • ✅ 계절성/트렌드 반영
  • ✅ Threshold 자동 조정

단점:

  • ❌ 학습 데이터 필요 (최소 수주~수개월)
  • ❌ 느린 응답 (수초~수분)
  • ❌ 해석 어려움 (“왜 이상인가?”)
  • ❌ False Positive 많음

결론: AdTech 실시간 모니터링에서는 통계 기반이 더 적합!


3. Kafka의 역할 (메시지 큐)

왜 Redis Pub/Sub가 아니라 Kafka를 쓸까?

특징 Kafka Redis Pub/Sub
메시지 보관 ✅ 디스크에 저장 (재처리 가능) ❌ 메모리에만 (휘발성)
Consumer 추가 ✅ 여러 Consumer 독립적 소비 ❌ 구독자가 죽으면 유실
처리량 ✅ 초당 수백만 건 ⚠️ 초당 수만 건
장애 복구 ✅ Offset 관리로 재시작 가능 ❌ 메시지 유실

결론: 중요한 알림은 Kafka 사용!


🎯 포트폴리오 어필 포인트

면접에서 이렇게 말하세요

Q: “실시간 이상 탐지 시스템을 어떻게 구축했나요?”

A:

“AdTech에서 CTR이 50% 떨어지면 시간당 수백 달러 손실이 발생합니다. 이를 실시간으로 탐지하기 위해 이벤트 기반 아키텍처를 구축했습니다.

1. 실시간 집계 (Spark Streaming)

  • 1분 윈도우로 CTR, eCPM 등 핵심 메트릭 계산
  • 지수 이동 평균(EMA)으로 과거 패턴 학습

2. 이상 탐지 (통계 기반)

  • 4가지 패턴 자동 탐지 (CTR 급락/급등, 트래픽 폭증, 수익 급락)
  • Threshold: 평균 대비 ±50%

3. 알림 시스템 (Kafka + Slack)

  • Kafka로 알림을 비동기 전송 (확장성)
  • Redis로 중복 알림 방지 (10분 cooldown)
  • Slack Webhook으로 즉시 알림 (1분 이내)

성과:

  • 이상 탐지 지연: 평균 30초
  • 대량 알림 처리: 46개 알림 8초 내 전송
  • 중복 알림 99% 감소”

Q: “왜 ML이 아니라 통계 기반으로 했나요?”

A:

“AdTech 모니터링에서는 빠른 응답이 가장 중요합니다. ML 모델은 학습에 수주~수개월이 걸리고, 추론도 수초가 필요합니다. 반면 통계 기반 방법은:

  1. 즉시 사용 가능 (학습 데이터 불필요)
  2. 밀리초 단위 응답 (실시간 처리)
  3. 해석 가능 (팀원들이 이해 쉬움)

실제로 대부분의 AdTech 회사(Google Ads, Facebook Ads)도 실시간 모니터링은 통계 기반, 장기 예측은 ML을 사용합니다.”


Q: “4개 터미널이 어떻게 통신하나요?”

A:

“각 컴포넌트가 Kafka를 중심으로 느슨하게 결합되어 있습니다:

  1. 생성기 → Kafka (ad-events 토픽)
  2. Spark → Kafka (alerts 토픽)
  3. Consumer ← Kafka (alerts 구독)

이렇게 하면:

  • ✅ 각 컴포넌트가 독립적으로 실행
  • ✅ Consumer를 추가해도 생성기/Spark 수정 불필요
  • ✅ 장애 시 Kafka가 메시지 보관 (유실 방지)

또한 Redis를 2가지 용도로 사용:

  • 메트릭 캐싱 (실시간 조회)
  • 중복 알림 방지 (cooldown)”

🚀 다음 단계 (Day 7)

Day 6에서 구축한 이상 탐지 시스템이 완벽하게 작동합니다! 이제 선택할 수 있어요:

Option 1: Superset 대시보드 완성 ⭐ (추천!)

  • 알림 히스토리 차트 추가
  • 실시간 알림 패널
  • 국가별 이상 빈도 히트맵

Option 2: ClickHouse 알림 저장 활성화

  • ad_alerts 테이블 데이터 저장
  • SQL로 알림 분석
  • 주간/월간 리포트 생성

Option 3: 프로젝트 문서화

  • README 작성
  • 아키텍처 다이어그램
  • GitHub 업로드

📚 참고 자료


🎉 마무리

Day 6에서는 실시간 이상 탐지 시스템을 완성했습니다!

핵심 성과:

  • ✅ 4가지 이상 패턴 자동 탐지
  • ✅ 30초 내 Slack 알림
  • ✅ 이벤트 기반 아키텍처
  • ✅ 확장 가능한 구조

다음 단계:

  • 대시보드로 시각화
  • 장기 알림 분석
  • 프로젝트 문서화

질문 있으면 댓글로! 🚀