ADR-022: Data Pipeline & Analytics Strategy

Last updated: 2026-02-01
Status

Revised — Updated for Knowledge Graph (ADR-019), Database Consolidation (ADR-005), and Hybrid Messaging (ADR-021)

| Version | Status | Date | Notes |
|---------|--------|------|-------|
| v1 | Proposed | 2026-02-01 | Original Airbyte reconfiguration plan |
| v2 | Revised | 2026-02-01 | Integrated with Neo4j KG, Kafka streaming, updated consolidation numbers |

Context

The platform’s data pipeline strategy must evolve to support three major architectural changes:

  1. ADR-019: Neo4j Knowledge Graph replaces Elasticsearch — adds graph-based analytics and RAG capabilities
  2. ADR-005: Database consolidation from 26 → 8 databases — changes CDC source structure
  3. ADR-021: Hybrid RabbitMQ + Kafka — enables real-time streaming analytics alongside batch CDC

Current State

| Component | Current | Status |
|-----------|---------|--------|
| CDC tool | Airbyte (self-managed) | Working — ~80 DB connections (4 tenants × 20) |
| Warehouse | Snowflake | Working — analytics queries run here |
| BI tool | Superset | Working — dashboards for business metrics |
| Data freshness | Near-real-time (Airbyte incremental sync) | Adequate for batch |
| Real-time | None | Gap — no streaming analytics |
| Graph analytics | None | Gap — being addressed by Knowledge Graph |
| Data catalog | None | Gap — no metadata or lineage |
| Data quality | None | Gap — no automated checks |

New Data Sources (Post-ADR Changes)

| Source | Type | Analytics Value |
|--------|------|-----------------|
| Neo4j Knowledge Graph | Graph + Vector | Recommendations, semantic search metrics, RAG usage |
| Kafka Event Stream | Real-time | Live engagement, trending content, streaming metrics |
| Consolidated PostgreSQL | CDC | Transactional data (8 databases vs 26) |

Decision

Evolve to a hybrid batch + streaming analytics architecture:

  1. Batch CDC (existing): Airbyte → Snowflake for historical/aggregate analytics
  2. Real-time streaming (new): Kafka → Snowflake Snowpipe for live dashboards
  3. Graph analytics (new): Neo4j → Snowflake for recommendation/search metrics
  4. Add quality layer: dbt + Great Expectations for transformation and validation

Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                         DATA PIPELINE ARCHITECTURE                           │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐                   │
│  │ PostgreSQL  │     │   Kafka     │     │   Neo4j     │                   │
│  │ (8 DBs)     │     │  (Events)   │     │ (Knowledge  │                   │
│  │             │     │             │     │   Graph)    │                   │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘                   │
│         │                   │                   │                          │
│         │ CDC               │ Stream            │ Graph                    │
│         ▼                   ▼                   ▼                          │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐                   │
│  │   Airbyte   │     │  Snowpipe   │     │  Neo4j →    │                   │
│  │ (Batch CDC) │     │ (Streaming) │     │  Connector  │                   │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘                   │
│         │                   │                   │                          │
│         └───────────────────┴───────────────────┘                          │
│                             │                                              │
│                             ▼                                              │
│                      ┌─────────────┐                                       │
│                      │  Snowflake  │                                       │
│                      │ (Warehouse) │                                       │
│                      └──────┬──────┘                                       │
│                             │                                              │
│                             ▼                                              │
│                      ┌─────────────┐                                       │
│                      │     dbt     │                                       │
│                      │ (Transform) │                                       │
│                      └──────┬──────┘                                       │
│                             │                                              │
│                             ▼                                              │
│                      ┌─────────────┐                                       │
│                      │  Superset   │                                       │
│                      │    (BI)     │                                       │
│                      └─────────────┘                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Data Flow by Type

| Data Type | Source | Ingestion | Latency | Use Case |
|-----------|--------|-----------|---------|----------|
| Transactional | PostgreSQL | Airbyte CDC | Minutes | Historical analysis, compliance |
| Events | Kafka | Snowpipe | Seconds | Live dashboards, trending |
| Graph metrics | Neo4j | Scheduled export | Hours | Recommendation performance |
| Search analytics | Knowledge Graph | Direct API | Real-time | Query analysis, zero-results |

ADR-005: Database Consolidation (26 → 8 Databases)

Airbyte Connection Reduction:

| Current | Target | Reduction |
|---------|--------|-----------|
| 26 databases × 4 tenants = 104 connections | 8 databases × 4 tenants = 32 connections | 69% |

New Source Mapping:

| Target DB | Source DBs (Merged) | Key Tables |
|-----------|---------------------|------------|
| peeq_users | celebrity-db, fan-db, group-profile-db | experts, fans, profiles |
| peeq_media | content-db, media-db, shoutout-db, mux-livestream-db | content, videos, shoutouts |
| peeq_commerce | inventory-db, stripe-db, subscriptions-db, wallet-db | transactions, subscriptions |
| peeq_events | broadcast-db, stream-db, onsite-event-db, class-catalog-db | broadcasts, events, classes |
| peeq_platform | message-board-db, sse-db, tags-db, notification-service-db | messages, notifications |

ADR-019: Neo4j Knowledge Graph

New Analytics Capabilities:

| Metric | Source | Value |
|--------|--------|-------|
| Recommendation CTR | Neo4j + Kafka events | Measure recommendation effectiveness |
| Search relevance | Knowledge Graph analytics | Zero-result rate, query patterns |
| Expert discovery paths | Graph traversals | How fans find experts |
| Content engagement velocity | Neo4j + engagement events | Trending detection |
| RAG usage | RagService metrics | Q&A adoption, source attribution |
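As a sketch of how one of these metrics could be computed once both sources land in Snowflake — the table and column names below are illustrative assumptions, not from this ADR:

```sql
-- Hypothetical recommendation-CTR query joining the daily Neo4j
-- recommendation export against click events streamed in via Kafka/Snowpipe.
-- Table and column names are illustrative.
SELECT
    r.recommendation_date,
    COUNT(DISTINCT r.recommendation_id) AS impressions,
    COUNT(DISTINCT c.recommendation_id) AS clicks,
    ROUND(COUNT(DISTINCT c.recommendation_id)
          / NULLIF(COUNT(DISTINCT r.recommendation_id), 0), 4) AS ctr
FROM neo4j_recommendations r
LEFT JOIN recommendation_click_events c
       ON c.recommendation_id = r.recommendation_id
GROUP BY r.recommendation_date
ORDER BY r.recommendation_date;
```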

Neo4j → Snowflake Integration:

-- Scheduled export from Neo4j to Snowflake (daily)
-- Option 1: Neo4j APOC export → S3 → Snowpipe
-- Option 2: Custom connector using Neo4j Java Driver → Snowflake JDBC
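For Option 1, the Snowflake side of the landing path could look like the following; stage, bucket, and table names are assumptions, and the external stage would additionally need credentials or a storage integration:

```sql
-- Hypothetical landing setup for the daily APOC export (Option 1).
-- The Neo4j side would run something along the lines of:
--   CALL apoc.export.csv.query('MATCH ...', 's3://.../graph_metrics.csv', {})
CREATE STAGE IF NOT EXISTS neo4j_export_stage
  URL = 's3://peeq-analytics/neo4j-exports/'
  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

-- Daily load of the exported graph metrics into the warehouse.
COPY INTO graph_metrics
FROM @neo4j_export_stage
PATTERN = '.*graph_metrics.*[.]csv';
```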

ADR-021: Hybrid Kafka + RabbitMQ

Streaming Analytics via Kafka:

| Event Type | Kafka Topic | Analytics Use |
|------------|-------------|---------------|
| content.viewed | engagement.views | Real-time view counts |
| subscription.created | commerce.subscriptions | Live conversion tracking |
| search.executed | analytics.searches | Query pattern analysis |
| recommendation.clicked | analytics.recommendations | Recommendation performance |
| broadcast.attended | engagement.broadcasts | Live attendance tracking |

Snowpipe Integration:

-- Kafka → S3 → Snowpipe (continuous ingestion)
CREATE PIPE engagement_pipe AS
COPY INTO engagement_events
FROM @kafka_stage/engagement/
FILE_FORMAT = (TYPE = 'JSON');
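A fuller sketch of the same setup, with the external stage and event-driven ingestion spelled out — bucket paths and the `raw` VARIANT column are assumptions:

```sql
-- Hypothetical end-to-end Snowpipe setup for the engagement topics.
CREATE STAGE IF NOT EXISTS kafka_stage
  URL = 's3://peeq-analytics/kafka/'   -- written by the Kafka Connect S3 sink
  FILE_FORMAT = (TYPE = 'JSON');

CREATE PIPE IF NOT EXISTS engagement_pipe
  AUTO_INGEST = TRUE                   -- trigger loads from S3 event notifications
AS
COPY INTO engagement_events
FROM @kafka_stage/engagement/
FILE_FORMAT = (TYPE = 'JSON');

-- Flatten the raw JSON variant (assumed column: raw) into queryable columns.
CREATE OR REPLACE VIEW engagement_events_flat AS
SELECT
    raw:event_type::STRING         AS event_type,
    raw:content_id::STRING         AS content_id,
    raw:occurred_at::TIMESTAMP_NTZ AS occurred_at
FROM engagement_events;
```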

Implementation Plan

Phase 1: Foundation (Weeks 1-4)

| Task | Owner | Effort |
|------|-------|--------|
| Document current Airbyte connections | Data Eng | 1 week |
| Set up Kafka → S3 → Snowpipe pipeline | Data Eng | 2 weeks |
| Create Neo4j export scheduled job | KG Team | 1 week |
| Define dbt project structure | Data Eng | 1 week |

Phase 2: Database Consolidation Sync (Weeks 5-8)

| Task | Owner | Effort |
|------|-------|--------|
| Create new Airbyte connections (8 DBs) | Data Eng | 1 week |
| Run dual-sync (old + new) | Data Eng | 2 weeks |
| Validate row counts and data integrity | Data Eng | 1 week |
| Migrate Superset dashboards | BI Team | 1 week |
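The row-count validation step during dual-sync could be sketched as a comparison query against both sync targets; the schema names here are illustrative assumptions:

```sql
-- Hypothetical dual-sync validation: compare row counts between the
-- old (26-DB) and new (8-DB) Airbyte targets for one merged table.
SELECT 'old' AS source, COUNT(*) AS row_count FROM old_sync.celebrity_db.experts
UNION ALL
SELECT 'new' AS source, COUNT(*) AS row_count FROM new_sync.peeq_users.experts;
-- Counts should match before the old connections are decommissioned.
```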

Phase 3: Streaming Analytics (Weeks 9-12)

| Task | Owner | Effort |
|------|-------|--------|
| Deploy Kafka Connect S3 sink | Platform | 1 week |
| Configure Snowpipe for event topics | Data Eng | 1 week |
| Build real-time dashboards in Superset | BI Team | 2 weeks |
| Add Knowledge Graph metrics export | KG Team | 1 week |

Phase 4: Quality & Governance (Weeks 13-16)

| Task | Owner | Effort |
|------|-------|--------|
| Implement dbt transformations | Data Eng | 2 weeks |
| Set up Great Expectations checks | Data Eng | 1 week |
| Create data catalog (dbt docs) | Data Eng | 1 week |
| Decommission old Airbyte connections | Data Eng | 1 week |
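A dbt transformation for this warehouse might look like the following model; the model and source names are illustrative, and dbt compiles the `ref()` call into the upstream table it depends on:

```sql
-- models/marts/daily_engagement.sql (hypothetical dbt model)
-- Aggregates streamed engagement events into a daily mart for Superset.
SELECT
    DATE_TRUNC('day', occurred_at) AS event_day,
    event_type,
    COUNT(*)                       AS event_count
FROM {{ ref('stg_engagement_events') }}
GROUP BY 1, 2
```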

Data Retention Policy

| Data Category | Retention | Storage Tier | Rationale |
|---------------|-----------|--------------|-----------|
| Transaction/payment | 7 years | Standard | Financial compliance (SOX, PCI) |
| User profiles | Active + 2 years | Standard | GDPR/CCPA |
| Content metadata | Indefinite | Standard | Business analytics |
| Event streams | 90 days raw | Standard → Archive | Real-time analysis window |
| Event aggregates | 2 years | Standard | Historical trends |
| Graph snapshots | 30 days | Archive | Point-in-time recovery |
| Search queries | 1 year | Standard | Query optimization |
| Logs | 30 days | Archive | Operational (ADR-014) |
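The 90-day raw-event window could be enforced with a scheduled Snowflake task; the warehouse and table names are assumptions, and a newly created task must be resumed before it runs:

```sql
-- Hypothetical scheduled purge implementing the 90-day raw-event window.
CREATE TASK IF NOT EXISTS purge_raw_events
  WAREHOUSE = analytics_wh
  SCHEDULE  = 'USING CRON 0 3 * * * UTC'  -- daily at 03:00 UTC
AS
  DELETE FROM engagement_events
  WHERE occurred_at < DATEADD(day, -90, CURRENT_TIMESTAMP());

ALTER TASK purge_raw_events RESUME;  -- tasks are created suspended
```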

Hypothesis Background

Primary Hypothesis

A hybrid batch + streaming architecture provides the optimal balance of historical analytics (Airbyte CDC) and real-time insights (Kafka streaming), while Neo4j enables graph-based analytics not possible with relational data alone.

Evidence Chain

| Claim | Evidence | Assurance |
|-------|----------|-----------|
| Airbyte working for batch CDC | Running with 80+ connections | L1 |
| Database consolidation reduces complexity | 26 → 8 DBs documented (ADR-005 analysis) | L2 |
| Kafka provides streaming capability | ADR-021 hybrid architecture approved | L1 |
| Neo4j enables graph analytics | Knowledge Graph implemented | L2 |
| Snowpipe handles streaming ingestion | Snowflake documentation | L1 |
| dbt improves transformation reliability | Industry standard practice | L1 |

Overall Confidence: L1 (capped by the weakest link: the untested Snowpipe integration)

Alternative Hypotheses

A1: Replace Snowflake with BigQuery - Rejected: Migration cost exceeds benefit; Snowflake working well

A2: Replace Airbyte with Fivetran - Deferred: Evaluate if Airbyte operational overhead is excessive post-consolidation

A3: Use Kafka Streams instead of Snowflake for real-time - Rejected: Adds complexity; Snowpipe provides adequate real-time latency

Falsifiability Criteria

| Condition | Threshold | Action |
|-----------|-----------|--------|
| Airbyte reconfiguration time | >2 weeks per tenant | Evaluate Fivetran |
| Snowpipe latency | >5 minutes | Investigate S3 staging |
| Data quality failures | >5% of checks | Review migration integrity |
| Neo4j export performance | >1 hour for full export | Implement incremental export |
| Snowflake cost increase | >40% after streaming | Optimize retention/aggregation |

Bounded Validity

Scope

Expiry Conditions

Review Triggers

Monitoring

| Metric | Target | Alert |
|--------|--------|-------|
| Airbyte sync success rate | >99% | <95% |
| Snowpipe latency | <2 minutes | >5 minutes |
| Data freshness (batch) | <15 minutes | >30 minutes |
| dbt model success rate | 100% | Any failure |
| Data quality check pass rate | >98% | <95% |
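The Snowpipe latency metric above could be checked against Snowflake's `COPY_HISTORY` table function; the table name is an assumption, and the 5-minute threshold mirrors the alert column:

```sql
-- Hypothetical freshness check: minutes since the last file was loaded
-- into engagement_events via Snowpipe. Alert if the result exceeds 5.
SELECT
    DATEDIFF('minute', MAX(last_load_time), CURRENT_TIMESTAMP())
        AS minutes_since_last_load
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
       TABLE_NAME => 'ENGAGEMENT_EVENTS',
       START_TIME => DATEADD(hour, -4, CURRENT_TIMESTAMP())));
```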

Consequences

Positive

Negative

Neutral



Original decision date: 2026-02-01 | Revision date: 2026-02-01 | Review by: 2026-08-01