# ADR-022: Data Pipeline & Analytics Strategy
## Status

Revised — Updated for Knowledge Graph (ADR-019), Database Consolidation (ADR-005), and Hybrid Messaging (ADR-021)
| Version | Status | Date | Notes |
|---------|--------|------|-------|
| v1 | Proposed | 2026-02-01 | Original Airbyte reconfiguration plan |
| v2 | Revised | 2026-02-01 | Integrated with Neo4j KG, Kafka streaming, updated consolidation numbers |
## Context

The platform’s data pipeline strategy must evolve to support three major architectural changes:

- ADR-019: Neo4j Knowledge Graph replaces Elasticsearch — adds graph-based analytics and RAG capabilities
- ADR-005: Database consolidation from 26 → 8 databases — changes CDC source structure
- ADR-021: Hybrid RabbitMQ + Kafka — enables real-time streaming analytics alongside batch CDC
## Current State

| Component | Current | Status |
|-----------|---------|--------|
| CDC tool | Airbyte (self-managed) | Working — ~80 DB connections (4 tenants × 20) |
| Warehouse | Snowflake | Working — analytics queries run here |
| BI tool | Superset | Working — dashboards for business metrics |
| Data freshness | Near-real-time (Airbyte incremental sync) | Adequate for batch |
| Real-time | None | Gap — no streaming analytics |
| Graph analytics | None | Gap — being addressed by Knowledge Graph |
| Data catalog | None | Gap — no metadata or lineage |
| Data quality | None | Gap — no automated checks |
## New Data Sources (Post-ADR Changes)

| Source | Type | Analytics Value |
|--------|------|-----------------|
| Neo4j Knowledge Graph | Graph + Vector | Recommendations, semantic search metrics, RAG usage |
| Kafka Event Stream | Real-time | Live engagement, trending content, streaming metrics |
| Consolidated PostgreSQL | CDC | Transactional data (8 databases vs 26) |
## Decision

Evolve to a hybrid batch + streaming analytics architecture:

- Batch CDC (existing): Airbyte → Snowflake for historical/aggregate analytics
- Real-time streaming (new): Kafka → Snowflake Snowpipe for live dashboards
- Graph analytics (new): Neo4j → Snowflake for recommendation/search metrics
- Quality layer (new): dbt + Great Expectations for transformation and validation
## Architecture Overview

```
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATA PIPELINE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PostgreSQL │ │ Kafka │ │ Neo4j │ │
│ │ (8 DBs) │ │ (Events) │ │ (Knowledge │ │
│ │ │ │ │ │ Graph) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ │ CDC │ Stream │ Graph │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Airbyte │ │ Snowpipe │ │ Neo4j → │ │
│ │ (Batch CDC) │ │ (Streaming) │ │ Connector │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┴───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Snowflake │ │
│ │ (Warehouse) │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ dbt │ │
│ │ (Transform) │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Superset │ │
│ │ (BI) │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
## Data Flow by Type

| Data Type | Source | Ingestion | Latency | Use Case |
|-----------|--------|-----------|---------|----------|
| Transactional | PostgreSQL | Airbyte CDC | Minutes | Historical analysis, compliance |
| Events | Kafka | Snowpipe | Seconds | Live dashboards, trending |
| Graph metrics | Neo4j | Scheduled export | Hours | Recommendation performance |
| Search analytics | Knowledge Graph | Direct API | Real-time | Query analysis, zero-results |
## ADR-005: Database Consolidation (26 → 8 Databases)

Airbyte Connection Reduction:

| Current | Target | Reduction |
|---------|--------|-----------|
| 26 databases × 4 tenants = 104 connections | 8 databases × 4 tenants = 32 connections | 69% |
New Source Mapping:

| Target DB | Source DBs (Merged) | Key Tables |
|-----------|---------------------|------------|
| peeq_users | celebrity-db, fan-db, group-profile-db | experts, fans, profiles |
| peeq_media | content-db, media-db, shoutout-db, mux-livestream-db | content, videos, shoutouts |
| peeq_commerce | inventory-db, stripe-db, subscriptions-db, wallet-db | transactions, subscriptions |
| peeq_events | broadcast-db, stream-db, onsite-event-db, class-catalog-db | broadcasts, events, classes |
| peeq_platform | message-board-db, sse-db, tags-db, notification-service-db | messages, notifications |
## ADR-019: Neo4j Knowledge Graph

New Analytics Capabilities:

| Metric | Source | Value |
|--------|--------|-------|
| Recommendation CTR | Neo4j + Kafka events | Measure recommendation effectiveness |
| Search relevance | Knowledge Graph analytics | Zero-result rate, query patterns |
| Expert discovery paths | Graph traversals | How fans find experts |
| Content engagement velocity | Neo4j + engagement events | Trending detection |
| RAG usage | RagService metrics | Q&A adoption, source attribution |
Neo4j → Snowflake Integration:
-- Scheduled export from Neo4j to Snowflake (daily)
-- Option 1: Neo4j APOC export → S3 → Snowpipe
-- Option 2: Custom connector using Neo4j Java Driver → Snowflake JDBC
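A minimal sketch of the Snowflake side of Option 1, assuming the APOC job drops daily CSV files into a hypothetical s3://peeq-analytics/neo4j/ prefix; the stage name, storage integration, target table, and columns are all illustrative, not decided here:

```sql
-- Hypothetical external stage over the bucket the APOC export writes to
CREATE OR REPLACE STAGE neo4j_stage
  URL = 's3://peeq-analytics/neo4j/'              -- illustrative bucket/prefix
  STORAGE_INTEGRATION = s3_int                    -- assumes an existing storage integration
  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

-- Daily load of exported graph metrics; table and columns are illustrative
COPY INTO graph_metrics (entity_id, metric_name, metric_value, exported_at)
  FROM @neo4j_stage
  PATTERN = '.*graph_metrics.*[.]csv';
```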
## ADR-021: Hybrid Kafka + RabbitMQ

Streaming Analytics via Kafka:

| Event Type | Kafka Topic | Analytics Use |
|------------|-------------|---------------|
| content.viewed | engagement.views | Real-time view counts |
| subscription.created | commerce.subscriptions | Live conversion tracking |
| search.executed | analytics.searches | Query pattern analysis |
| recommendation.clicked | analytics.recommendations | Recommendation performance |
| broadcast.attended | engagement.broadcasts | Live attendance tracking |
Snowpipe Integration:

```sql
-- Kafka → S3 → Snowpipe (continuous ingestion)
CREATE PIPE engagement_pipe
  AUTO_INGEST = TRUE  -- load on S3 event notifications, so ingestion is continuous
AS
COPY INTO engagement_events
FROM @kafka_stage/engagement/
FILE_FORMAT = (TYPE = 'JSON');
```
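The pipe above presumes a landing table and a stage over the S3 prefix the Kafka Connect sink writes to. A sketch of those supporting objects with illustrative names; a single VARIANT column keeps the plain COPY valid for raw JSON, with flattening deferred to dbt:

```sql
-- Landing table for raw events; JSON loads target a single VARIANT column
CREATE TABLE IF NOT EXISTS engagement_events (
  payload VARIANT
);

-- Hypothetical external stage over the prefix the Kafka Connect S3 sink writes to
CREATE OR REPLACE STAGE kafka_stage
  URL = 's3://peeq-analytics/kafka/'    -- illustrative bucket/prefix
  STORAGE_INTEGRATION = s3_int;         -- assumes an existing storage integration
```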
## Implementation Plan

### Phase 1: Foundation (Weeks 1-4)

| Task | Owner | Effort |
|------|-------|--------|
| Document current Airbyte connections | Data Eng | 1 week |
| Set up Kafka → S3 → Snowpipe pipeline | Data Eng | 2 weeks |
| Create Neo4j export scheduled job | KG Team | 1 week |
| Define dbt project structure | Data Eng | 1 week |
### Phase 2: Database Consolidation Sync (Weeks 5-8)

| Task | Owner | Effort |
|------|-------|--------|
| Create new Airbyte connections (8 DBs) | Data Eng | 1 week |
| Run dual-sync (old + new) | Data Eng | 2 weeks |
| Validate row counts and data integrity | Data Eng | 1 week |
| Migrate Superset dashboards | BI Team | 1 week |
### Phase 3: Streaming Analytics (Weeks 9-12)

| Task | Owner | Effort |
|------|-------|--------|
| Deploy Kafka Connect S3 sink | Platform | 1 week |
| Configure Snowpipe for event topics | Data Eng | 1 week |
| Build real-time dashboards in Superset | BI Team | 2 weeks |
| Add Knowledge Graph metrics export | KG Team | 1 week |
### Phase 4: Quality & Governance (Weeks 13-16)

| Task | Owner | Effort |
|------|-------|--------|
| Implement dbt transformations (see model sketch below) | Data Eng | 2 weeks |
| Set up Great Expectations checks | Data Eng | 1 week |
| Create data catalog (dbt docs) | Data Eng | 1 week |
| Decommission old Airbyte connections | Data Eng | 1 week |
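As a starting point for the dbt transformation work above, a minimal incremental model over the raw event stream; the model path, source name, and JSON fields are illustrative, assuming the single-VARIANT-column landing table sketched earlier:

```sql
-- models/marts/engagement_daily.sql (illustrative dbt model)
{{ config(materialized='incremental', unique_key=['event_date', 'event_type']) }}

SELECT
    payload:event_type::STRING           AS event_type,
    payload:occurred_at::TIMESTAMP::DATE AS event_date,
    COUNT(*)                             AS event_count
FROM {{ source('raw', 'engagement_events') }}
{% if is_incremental() %}
-- Reprocess a short trailing window to absorb late-arriving events
WHERE payload:occurred_at::TIMESTAMP >= DATEADD('day', -3, CURRENT_DATE())
{% endif %}
GROUP BY 1, 2
```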
## Data Retention Policy

| Data Category | Retention | Storage Tier | Rationale |
|---------------|-----------|--------------|-----------|
| Transaction/payment | 7 years | Standard | Financial compliance (SOX, PCI) |
| User profiles | Active + 2 years | Standard | GDPR/CCPA |
| Content metadata | Indefinite | Standard | Business analytics |
| Event streams | 90 days raw | Standard → Archive | Real-time analysis window |
| Event aggregates | 2 years | Standard | Historical trends |
| Graph snapshots | 30 days | Archive | Point-in-time recovery |
| Search queries | 1 year | Standard | Query optimization |
| Logs | 30 days | Archive | Operational (ADR-014) |
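The 90-day raw event window could be enforced with a scheduled Snowflake task along these lines, assuming expired rows have already moved to the archive tier; the task name, warehouse, schedule, and timestamp field are illustrative:

```sql
-- Nightly purge of raw events older than the 90-day analysis window
CREATE OR REPLACE TASK purge_raw_events
  WAREHOUSE = analytics_wh                 -- illustrative warehouse
  SCHEDULE = 'USING CRON 0 3 * * * UTC'
AS
  DELETE FROM engagement_events
  WHERE payload:occurred_at::TIMESTAMP < DATEADD('day', -90, CURRENT_TIMESTAMP());

-- Tasks are created suspended and must be resumed explicitly
ALTER TASK purge_raw_events RESUME;
```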
## Hypothesis Background

### Primary Hypothesis

A hybrid batch + streaming architecture provides the optimal balance of historical analytics (Airbyte CDC) and real-time insights (Kafka streaming), while Neo4j enables graph-based analytics not possible with relational data alone.
### Evidence Chain

| Claim | Evidence | Assurance |
|-------|----------|-----------|
| Airbyte working for batch CDC | Running with 80+ connections | L1 |
| Database consolidation reduces complexity | 26 → 8 DBs documented (ADR-005 analysis) | L2 |
| Kafka provides streaming capability | ADR-021 hybrid architecture approved | L1 |
| Neo4j enables graph analytics | Knowledge Graph implemented | L2 |
| Snowpipe handles streaming ingestion | Snowflake documentation | L1 |
| dbt improves transformation reliability | Industry standard practice | L1 |

Overall Confidence: L1 (WLNK capped by untested Snowpipe integration)
### Alternative Hypotheses

- A1: Replace Snowflake with BigQuery - Rejected: Migration cost exceeds benefit; Snowflake working well
- A2: Replace Airbyte with Fivetran - Deferred: Evaluate if Airbyte operational overhead is excessive post-consolidation
- A3: Use Kafka Streams instead of Snowflake for real-time - Rejected: Adds complexity; Snowpipe provides adequate real-time latency
### Falsifiability Criteria

| Condition | Threshold | Action |
|-----------|-----------|--------|
| Airbyte reconfiguration time | >2 weeks per tenant | Evaluate Fivetran |
| Snowpipe latency | >5 minutes | Investigate S3 staging |
| Data quality failures | >5% of checks | Review migration integrity |
| Neo4j export performance | >1 hour for full export | Implement incremental export |
| Snowflake cost increase | >40% after streaming | Optimize retention/aggregation |
## Bounded Validity

### Scope

- Applies to: All analytics data flows from PostgreSQL, Kafka, and Neo4j to Snowflake
- Does not apply to: ML training pipelines, real-time personalization (handled by the Knowledge Graph)

### Expiry Conditions

- Time-based: Re-evaluate after 6 months of streaming analytics operation
- Scale-based: If event volume exceeds 10M events/day
- Cost-based: If Snowflake costs exceed budget by >50%

### Review Triggers

- Kafka streaming proving insufficient for real-time needs
- Snowflake performance degradation
- New analytics requirements (ML, advanced graph)
## Monitoring

| Metric | Target | Alert |
|--------|--------|-------|
| Airbyte sync success rate | >99% | <95% |
| Snowpipe latency | <2 minutes | >5 minutes |
| Data freshness (batch) | <15 minutes | >30 minutes |
| dbt model success rate | 100% | Any failure |
| Data quality check pass rate | >98% | <95% |
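The Snowpipe latency and batch freshness targets above could be spot-checked with queries like these; the table and JSON field names are illustrative, matching the earlier sketches:

```sql
-- Snowpipe health: returns JSON including executionState and pendingFileCount
SELECT SYSTEM$PIPE_STATUS('engagement_pipe');

-- Staleness of the raw event stream in minutes, from the event timestamp
SELECT DATEDIFF('minute', MAX(payload:occurred_at::TIMESTAMP), CURRENT_TIMESTAMP())
         AS minutes_stale
FROM engagement_events;
```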
## Consequences

### Positive

- Unified analytics: Batch + streaming + graph in one warehouse
- Reduced connections: 104 → 32 Airbyte connections (69% reduction)
- Real-time capability: Live dashboards via Kafka streaming
- Graph insights: Recommendation/search analytics from Neo4j
- Data quality: dbt + Great Expectations catch issues early
- Lower complexity: Fewer databases to sync

### Negative

- New infrastructure: Kafka → S3 → Snowpipe pipeline to manage
- Neo4j export: Custom connector development required
- Migration effort: Airbyte reconfiguration during consolidation
- Learning curve: dbt and streaming concepts for the team
- Temporary dual-sync: Higher resource usage during migration

### Neutral

- Snowflake remains the single warehouse (no change)
- Superset remains the BI tool (no change)
- Data retention policies unchanged
## Related Decisions

- ADR-005: Database Consolidation (source schema changes)
- ADR-019: Knowledge Graph (new analytics data source)
- ADR-021: Messaging Architecture (Kafka streaming source)
- ADR-014: Observability (log retention)
Original decision date: 2026-02-01
Revision date: 2026-02-01
Review by: 2026-08-01