insider trading agent 2
🚀 Airflow로 SEC 내부자 거래 실시간 모니터링 시스템 구축하기
📌 프로젝트 개요
미국 증권거래위원회(SEC)에 공시되는 내부자 거래(Insider Trading) 정보를 실시간으로 수집하고, 일일 매수/매도 상위 기업을 자동으로 분석하여 Slack으로 알림받는 시스템을 구축했습니다.
Tech Stack:
- Apache Airflow (워크플로우 오케스트레이션)
- MySQL 8.0 (데이터 저장)
- Redis (캐싱)
- Docker Compose (컨테이너 오케스트레이션)
- Python 3.11
- Slack Webhook (알림)
개발 기간: 1일 비용: $0 (로컬 실행)
🎯 왜 이 프로젝트를 만들었나?
내부자 거래는 기업의 임원, 이사 등이 자사 주식을 매수/매도할 때 SEC에 의무적으로 신고해야 하는 정보입니다. 이들은 일반 투자자보다 기업의 내부 상황을 잘 알고 있기 때문에, 내부자 거래 패턴을 분석하면 투자 인사이트를 얻을 수 있습니다.
핵심 아이디어:
- 임원들이 자기 회사 주식을 대량 매수하면? → 주가 상승 신호일 수 있음
- 임원들이 자기 회사 주식을 대량 매도하면? → 주가 하락 신호일 수 있음
🏗️ 시스템 아키텍처
┌─────────────────────────────────────────────────────────┐
│ SEC EDGAR RSS │
│ (Form 4 Insider Trading Filings) │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Airflow DAG #1: Crawler │
│ ┌──────────────────────────────────────────────────┐ │
│ │ fetch_sec_rss (최신 100개 Form 4 수집) │ │
│ └──────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌──────────────▼───────────────────────────────────┐ │
│ │ parse_and_save_filings (XML 파싱 + MySQL 저장) │ │
│ └──────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌──────────────▼───────────────────────────────────┐ │
│ │ send_to_kafka (통계 전송, 향후 구현) │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌──────────┐
│ MySQL │ ← insider_trades 테이블
└──────┬───┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Airflow DAG #2: Daily Ranking │
│ ┌──────────────────────┬──────────────────────────┐ │
│ │ calculate_top_buys │ calculate_top_sells │ │
│ └──────────┬───────────┴───────────┬──────────────┘ │
│ │ │ │
│ └───────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ save_to_redis │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ save_to_mysql │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ generate_summary │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ send_slack_alert │ → 📱 Slack │
│ └──────────────────┘ │
└─────────────────────────────────────────────────────────┘
실행 스케줄:
- Crawler DAG: 30분마다 실행 (
*/30 * * * *) - Ranking DAG: 매일 오후 6시 실행 (
0 18 * * *)
💾 데이터베이스 스키마
insider_trades 테이블
CREATE TABLE insider_trades (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
-- Form 4 정보
accession_number VARCHAR(100) NOT NULL,
filing_date DATE NOT NULL,
-- 기업 정보
ticker VARCHAR(10) NOT NULL,
company_name VARCHAR(255) NOT NULL,
cik VARCHAR(20),
-- 내부자 정보
insider_name VARCHAR(255) NOT NULL,
insider_relationship VARCHAR(100),
is_director BOOLEAN DEFAULT FALSE,
is_officer BOOLEAN DEFAULT FALSE,
-- 거래 정보
transaction_date DATE NOT NULL,
transaction_code VARCHAR(10),
transaction_type ENUM('BUY','SELL','OPTION','GRANT','GIFT','OTHER'),
shares BIGINT NOT NULL,
price_per_share DECIMAL(15,4),
transaction_value DECIMAL(20,2),
shares_owned_after BIGINT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- 인덱스
INDEX idx_accession_number (accession_number),
INDEX idx_ticker (ticker),
INDEX idx_filing_date (filing_date),
INDEX idx_transaction_type (transaction_type)
);
핵심 필드:
ticker: 주식 티커 (예: AAPL, TSLA)insider_name: 내부자 이름transaction_type: BUY(매수), SELL(매도), OPTION(옵션 행사)transaction_value: 거래 금액 (달러)
🔧 핵심 기능 구현
1. SEC Form 4 XML 파서
SEC는 Form 4 데이터를 XML 형식으로 제공합니다. 이를 파싱하여 구조화된 데이터로 변환합니다.
파서 핵심 로직:
class Form4Parser:
def parse_form4(self, index_url: str) -> Dict:
# 1. Index 페이지에서 ownership.xml URL 추출
xml_url = self.get_xml_url_from_index(index_url)
# 2. XML 다운로드
xml_content = self.download_xml(xml_url)
# 3. XML 파싱
root = ET.fromstring(xml_content)
# Issuer (발행사/기업)
issuer = {
'cik': root.find('.//issuer/issuerCik').text,
'name': root.find('.//issuer/issuerName').text,
'ticker': root.find('.//issuer/issuerTradingSymbol').text
}
# Owner (내부자)
owner = {
'cik': root.find('.//reportingOwner/reportingOwnerId/rptOwnerCik').text,
'name': root.find('.//reportingOwner/reportingOwnerId/rptOwnerName').text,
'is_director': self._parse_bool(root.find('.//reportingOwner/reportingOwnerRelationship/isDirector')),
'is_officer': self._parse_bool(root.find('.//reportingOwner/reportingOwnerRelationship/isOfficer'))
}
# Transactions (거래 내역)
transactions = []
for trans in root.findall('.//nonDerivativeTransaction'):
transaction = {
'date': trans.find('.//transactionDate/value').text,
'code': trans.find('.//transactionCoding/transactionCode').text,
'shares': int(float(trans.find('.//transactionAmounts/transactionShares/value').text)),
'price': float(trans.find('.//transactionAmounts/transactionPricePerShare/value').text),
'type': self._get_transaction_type(code)
}
transactions.append(transaction)
return {
'issuer': issuer,
'owner': owner,
'transactions': transactions
}
거래 코드 매핑:
P→ BUY (매수)S→ SELL (매도)M→ OPTION (옵션 행사)A→ GRANT (무상 부여)
2. MySQL 저장 로직
중복 방지 및 트랜잭션 단위 저장:
class InsiderTradesDB:
def insert_filing(self, accession_number: str, filing_data: Dict) -> bool:
with connection.cursor() as cursor:
# 중복 확인
if self._is_duplicate(cursor, accession_number):
return False
# 각 거래별 INSERT
for trans in filing_data['transactions']:
try:
self._insert_transaction(cursor, accession_number, filing_data, trans)
except IntegrityError as e:
if '1062' in str(e): # Duplicate entry
break # 같은 Form 4의 다른 거래는 스킵
raise
connection.commit()
return True
3. 일일 랭킹 계산 SQL
매수 상위 10개 기업:
SELECT
ticker,
company_name,
COUNT(*) as buy_count,
SUM(transaction_value) as total_buy_value,
COUNT(DISTINCT insider_name) as insider_count,
GROUP_CONCAT(DISTINCT insider_name ORDER BY insider_name SEPARATOR ', ') as insiders
FROM insider_trades
WHERE DATE(transaction_date) = CURDATE()
AND transaction_type IN ('BUY', 'OPTION')
AND transaction_value > 0
GROUP BY ticker, company_name
ORDER BY total_buy_value DESC
LIMIT 10;
핵심 포인트:
CURDATE(): 하드코딩 없이 당일 자동 계산IN ('BUY', 'OPTION'): 옵션 행사도 매수로 간주GROUP_CONCAT: 여러 내부자를 하나의 문자열로
4. Slack 알림 포맷
def send_slack_notification(**context):
top_buys = context['task_instance'].xcom_pull(
task_ids='calculate_top_buys',
key='top_buys'
)
message = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "📊 Daily Insider Trading Report"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*🟢 TOP 10 INSIDER BUYS ({today})*\n" +
"\n".join([
f"{i}. *{row['ticker']}* ({row['company_name'][:30]}...)\n"
f" 💰 ${row['total_buy_value']:,.2f} | "
f"📊 {row['buy_count']} txns | "
f"👥 {row['insider_count']} insider(s)"
for i, row in enumerate(top_buys, 1)
])
}
}
]
}
requests.post(webhook_url, json=message)
실제 알림 예시:
📊 Daily Insider Trading Report
🟢 TOP 10 INSIDER BUYS (2025-12-27)
1. IONQ (IonQ, Inc.)
💰 $23,050.00 | 📊 1 txns | 👥 1 insider(s)
2. AAPL (Apple Inc.)
💰 $1,250,000.00 | 📊 5 txns | 👥 3 insider(s)
───────────────────────────
🔴 TOP 10 INSIDER SELLS (2025-12-27)
1. TSLA (Tesla, Inc.)
💰 $5,000,000.00 | 📊 2 txns | 👥 1 insider(s)
🐳 Docker Compose 설정
7개 컨테이너 구성:
services:
mysql: # 데이터 저장
redis: # 캐싱 + Celery 브로커
zookeeper: # Kafka 코디네이터
kafka: # 메시지 큐 (향후 사용)
airflow-init: # DB 초기화
airflow-webserver: # UI (http://localhost:8081)
airflow-scheduler: # DAG 스케줄러
airflow-worker: # Task 실행
환경변수 전달 (중요!):
airflow-worker:
environment:
AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+pymysql://admin:password@mysql:3306/insider_trading
SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL} # .env에서 로드
📊 실행 결과
Crawler DAG 로그
📡 Fetching RSS from: https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&count=100
✅ Found 100 Form 4 filings
[1/100] 🔄 Parsing: urn:tag:sec.gov,2008:accession-number=0001193125-25-331321
📊 IONQ - IonQ, Inc.
👤 Chou Kathryn K. (Director)
💼 2 transaction(s)
✅ Saved to MySQL
- OPTION: 5,000 shares @ $4.61 = $23,050.00
- SELL: 5,000 shares @ $55.00 = $275,000.00
================================================================================
📊 PROCESSING SUMMARY
================================================================================
✅ Success: 92
⏭️ Duplicates: 8
❌ Errors: 0
📋 Total: 100
================================================================================
Ranking DAG 로그
📊 Calculating top 10 BUY transactions for today
✅ Found 15 companies with BUY transactions
1. AAPL (Apple Inc.): $5,000,000.00 (10 transactions, 5 insiders)
2. TSLA (Tesla, Inc.): $3,200,000.00 (8 transactions, 4 insiders)
...
✅ Slack notification sent successfully
🚧 트러블슈팅
문제 1: accession_number VARCHAR(50) → 데이터 길이 초과
증상:
Error: (1406, "Data too long for column 'accession_number' at row 1")
원인:
SEC의 accession_number는 실제로 66자까지 가능 (urn:tag:sec.gov,2008:accession-number=0001193125-25-331321)
해결:
ALTER TABLE insider_trades
MODIFY COLUMN accession_number VARCHAR(100) NOT NULL;
문제 2: 같은 Form 4에서 여러 거래 중복 에러
증상:
IntegrityError: (1062, "Duplicate entry ... for key 'accession_number'")
원인:
한 Form 4에 여러 거래가 있는데, 모두 같은 accession_number 사용
해결:
# UNIQUE 제약조건 제거
ALTER TABLE insider_trades DROP INDEX accession_number;
# 일반 INDEX로 변경
CREATE INDEX idx_accession_number ON insider_trades(accession_number);
문제 3: Slack 알림 안 옴
증상:
WARNING - ⚠️ SLACK_WEBHOOK_URL not set, skipping notification
원인: 환경변수가 Docker 컨테이너에 전달 안 됨
해결:
.env파일 생성:
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
docker-compose.yml에 추가:
airflow-worker:
environment:
SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL}
- 재시작:
docker-compose down
docker-compose up -d
📈 성능 최적화
1. Rate Limiting
SEC는 초당 10 request 제한:
for filing in filings:
time.sleep(0.15) # 6.67 req/sec → 안전
parse_form4(filing)
2. MySQL 인덱스
-- 조회 성능 향상
CREATE INDEX idx_ticker ON insider_trades(ticker);
CREATE INDEX idx_filing_date ON insider_trades(filing_date);
CREATE INDEX idx_transaction_type ON insider_trades(transaction_type);
-- 복합 인덱스 (날짜 + 타입 필터링)
CREATE INDEX idx_date_type ON insider_trades(transaction_date, transaction_type);
3. 중복 체크 최적화
def _is_duplicate(self, cursor, accession_number: str) -> bool:
sql = "SELECT 1 FROM insider_trades WHERE accession_number = %s LIMIT 1"
cursor.execute(sql, (accession_number,))
return cursor.fetchone() is not None # EXISTS 대신 SELECT 1 사용
🎯 향후 개선 사항
1. Reddit 감성 분석 통합
- r/wallstreetbets 크롤링
- FinBERT로 감성 점수 계산
- 내부자 거래 + Reddit 감성 결합
2. Kafka 실시간 스트리밍
SEC RSS → Airflow → Kafka Producer → Kafka Consumer → MySQL/Redis
3. FastAPI 대시보드
- 실시간 차트 (Chart.js)
- 티커별 내부자 거래 히스토리
- 알림 설정 UI
4. 클라우드 배포
- AWS ECS / GCP Cloud Run
- 24/7 자동 실행
- PostgreSQL RDS
💡 배운 점
1. Airflow XCom의 활용
Task 간 데이터 전달을 XCom으로 깔끔하게 처리:
# Producer Task
context['task_instance'].xcom_push(key='top_buys', value=results)
# Consumer Task
top_buys = context['task_instance'].xcom_pull(
task_ids='calculate_top_buys',
key='top_buys'
)
2. Docker Compose 환경변수 관리
.env 파일 + ${VARIABLE} 조합으로 보안 향상
3. SEC EDGAR XML 파싱
- RSS는 index.html URL만 제공
- 실제 데이터는 ownership.xml
- 거래 코드(P, S, M) 매핑 필요
4. MySQL pymysql 트랜잭션 처리
try:
cursor.execute(sql)
connection.commit()
except Exception as e:
connection.rollback()
raise
🔗 참고 자료
📦 프로젝트 구조
insider-trading-agent/
├── airflow/
│ ├── dags/
│ │ ├── sec_form4_crawler.py # 크롤러 DAG
│ │ └── daily_insider_ranking.py # 랭킹 DAG
│ ├── plugins/
│ │ ├── parsers/
│ │ │ └── form4_parser.py # XML 파서
│ │ └── database/
│ │ └── insider_db.py # MySQL 핸들러
│ └── Dockerfile
├── scripts/
│ └── init_db.sql # DB 초기화
├── docker-compose.yml
├── .env
└── README.md
🎬 마무리
하루 만에 SEC 내부자 거래 모니터링 시스템을 구축했습니다. Airflow의 강력한 스케줄링 기능과 Docker Compose의 편리한 컨테이너 관리 덕분에 빠르게 프로토타입을 완성할 수 있었습니다.
핵심 성과:
- ✅ 30분마다 최신 100개 Form 4 자동 수집
- ✅ 일일 매수/매도 TOP 10 자동 계산
- ✅ Slack으로 매일 오후 6시 알림
- ✅ 비용 $0 (로컬 실행)
다음에는 Reddit 감성 분석을 추가해서 소셜 미디어 트렌드와 내부자 거래를 결합한 투자 시그널을 만들어보려고 합니다!
GitHub: [링크 추가] Tech Blog: [링크 추가]
Tags: #Airflow #DataEngineering #Python #Docker #SEC #InsiderTrading #FinTech #Slack #MySQL