Speed Layer Pattern¶
The speed layer provides low-latency access to frequently accessed data: user accounts, sessions, preferences, and real-time state. Cassandra can serve as both the speed layer and the system of record, or it can sit in front of a legacy system of record. In either case, good performance depends on knowing which data is "hot" and designing schemas and access patterns around it.
Speed Layer Architecture¶
In enterprise environments, Cassandra commonly serves as a speed layer in front of legacy systems. The mainframe or relational database remains the system of record, while Cassandra provides low-latency reads for applications. CDC (Change Data Capture) keeps the speed layer synchronized.
CDC Pipeline¶
When the system of record is a mainframe, Oracle database, or other legacy system, Cassandra provides a high-performance read layer while CDC keeps data synchronized.
CDC Consumer Implementation¶
public class AccountCDCConsumer {
private final CqlSession session;
private final PreparedStatement upsertAccount;
private final PreparedStatement deleteAccount;
// Method-level listener: a class-level @KafkaListener would need @KafkaHandler methods
@KafkaListener(topics = "mainframe.accounts")
public void handleChange(CDCEvent event) {
switch (event.getOperation()) {
case INSERT, UPDATE -> upsertAccount(event.getAfter());
case DELETE -> deleteAccount(event.getBefore().getAccountId());
}
}
private void upsertAccount(AccountRecord record) {
session.execute(upsertAccount.bind(
record.getAccountId(),
record.getCustomerId(),
record.getAccountType(),
record.getBalance(),
record.getStatus(),
record.getLastUpdated()
));
}
private void deleteAccount(String accountId) {
session.execute(deleteAccount.bind(accountId));
}
}
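The consumer above applies each event as it arrives. If the CDC transport can redeliver or reorder events (an assumption here; it depends on topic partitioning and consumer configuration), a guard on the source timestamp keeps an older event from overwriting newer state. The following is a minimal in-memory sketch of that guard. `GuardedCdcApplier`, its methods, and the maps standing in for the Cassandra table are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class GuardedCdcApplier {
    // In-memory stand-ins for the Cassandra table and the last applied source timestamp per key.
    private final Map<String, String> balances = new HashMap<>();
    private final Map<String, Long> lastApplied = new HashMap<>();

    /** Applies an insert/update unless an equal or newer change was already applied. */
    boolean applyUpsert(String accountId, String balance, long sourceTsMillis) {
        if (!isNewer(accountId, sourceTsMillis)) {
            return false; // duplicate or out-of-order event
        }
        balances.put(accountId, balance);
        lastApplied.put(accountId, sourceTsMillis);
        return true;
    }

    /** Applies a delete; the timestamp is kept so a late upsert cannot resurrect the row. */
    boolean applyDelete(String accountId, long sourceTsMillis) {
        if (!isNewer(accountId, sourceTsMillis)) {
            return false;
        }
        balances.remove(accountId);
        lastApplied.put(accountId, sourceTsMillis);
        return true;
    }

    String balanceOf(String accountId) {
        return balances.get(accountId);
    }

    private boolean isNewer(String accountId, long sourceTsMillis) {
        Long seen = lastApplied.get(accountId);
        return seen == null || sourceTsMillis > seen;
    }
}
```

In Cassandra itself, a comparable effect can often be obtained by writing with `USING TIMESTAMP` set from the source change time, so that last-write-wins resolution follows source order rather than arrival order.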
Handling CDC Lag¶
CDC introduces latency between the source system and speed layer. Applications must handle potential staleness:
public class AccountSpeedLayer {
private final CqlSession cassandraSession;
private final LegacyClient mainframeClient;
public Account getAccount(String accountId, StalenessPolicy policy) {
// Read from speed layer
Account cached = readFromCassandra(accountId);
if (cached == null) {
// Not yet replicated - fall back to source
return readFromMainframe(accountId);
}
if (policy == StalenessPolicy.STRICT) {
// For critical operations, verify against source
Instant sourceTimestamp = getSourceTimestamp(accountId);
if (cached.getLastUpdated().isBefore(sourceTimestamp)) {
return readFromMainframe(accountId);
}
}
return cached;
}
}
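The fallback logic above can be exercised without Cassandra or a mainframe by injecting the three reads as functions. The sketch below does that and counts full reads against the source, which makes the cost of the STRICT policy visible in a test. `StalenessAwareReader`, `Policy`, and the nested `Account` class are hypothetical stand-ins, not the types used above.

```java
import java.time.Instant;
import java.util.function.Function;

class StalenessAwareReader {
    enum Policy { RELAXED, STRICT }

    /** Minimal account view: an id and the time of its last change. */
    static final class Account {
        final String id;
        final Instant lastUpdated;

        Account(String id, Instant lastUpdated) {
            this.id = id;
            this.lastUpdated = lastUpdated;
        }
    }

    private final Function<String, Account> speedLayer;      // returns null on a miss
    private final Function<String, Account> source;          // full read from the system of record
    private final Function<String, Instant> sourceTimestamp; // cheaper "last changed" probe
    int sourceReads = 0;                                      // full reads issued against the source

    StalenessAwareReader(Function<String, Account> speedLayer,
                         Function<String, Account> source,
                         Function<String, Instant> sourceTimestamp) {
        this.speedLayer = speedLayer;
        this.source = source;
        this.sourceTimestamp = sourceTimestamp;
    }

    Account get(String id, Policy policy) {
        Account cached = speedLayer.apply(id);
        if (cached == null) {
            return readSource(id); // not replicated yet
        }
        if (policy == Policy.STRICT
                && cached.lastUpdated.isBefore(sourceTimestamp.apply(id))) {
            return readSource(id); // speed layer is behind the source
        }
        return cached;
    }

    private Account readSource(String id) {
        sourceReads++;
        return source.apply(id);
    }
}
```

Note that STRICT still pays for one round trip to the source (the timestamp probe) on every call, so it is best reserved for the few operations that cannot tolerate CDC lag.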
Write-Through to Legacy¶
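Applications that need to write data write to the system of record first and let CDC propagate the change. An optional direct write to Cassandra narrows the window in which readers still see the old value: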
public class AccountService {
public void updateBalance(String accountId, BigDecimal newBalance) {
// Write to system of record (mainframe)
mainframeClient.updateBalance(accountId, newBalance);
// CDC will propagate change to Cassandra
// Optionally, write-through for immediate consistency:
cassandraSession.execute(updateAccountBalance.bind(
newBalance, Instant.now(), accountId
));
}
}
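The optional write-through is a second, non-atomic write: it can fail after the mainframe update has already succeeded. One way to handle that, sketched below under the assumption that CDC will eventually deliver the same change, is to treat the speed layer write as best-effort and record failures instead of failing the operation. `SystemOfRecord`, `SpeedLayerWriter`, and `WriteThroughService` are hypothetical names.

```java
import java.math.BigDecimal;

interface SystemOfRecord {
    void updateBalance(String accountId, BigDecimal balance);
}

interface SpeedLayerWriter {
    void updateBalance(String accountId, BigDecimal balance);
}

class WriteThroughService {
    private final SystemOfRecord systemOfRecord;
    private final SpeedLayerWriter speedLayer;
    private int skippedWriteThroughs = 0;

    WriteThroughService(SystemOfRecord systemOfRecord, SpeedLayerWriter speedLayer) {
        this.systemOfRecord = systemOfRecord;
        this.speedLayer = speedLayer;
    }

    void updateBalance(String accountId, BigDecimal newBalance) {
        // 1. System of record first: if this throws, the speed layer is never touched.
        systemOfRecord.updateBalance(accountId, newBalance);
        // 2. Best-effort write-through; CDC is assumed to repair a miss later.
        try {
            speedLayer.updateBalance(accountId, newBalance);
        } catch (RuntimeException e) {
            skippedWriteThroughs++; // in a real service, log and emit a metric here
        }
    }

    int skippedWriteThroughs() {
        return skippedWriteThroughs;
    }
}
```

The ordering matters: writing to the speed layer first could expose a balance that the system of record later rejects.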
The Hot Data Problem¶
Not all data is accessed equally. In a typical application:
- User profile: Accessed on every authenticated request
- Session data: Read/written multiple times per second during active sessions
- Preferences: Loaded once per session, rarely modified
- Historical data: Accessed occasionally for reporting
Treating all data identically leads to either over-provisioning (expensive) or under-serving hot data (slow). The speed layer pattern optimizes for the hot path while maintaining access to the full dataset.
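To decide which data belongs on the hot path, it helps to quantify the skew. The sketch below computes the share of requests served by the N most-accessed keys. It takes a plain list of accessed keys as input; `AccessSkew` and `topShare` are hypothetical names, and a real system would more likely sample this from request logs or metrics.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AccessSkew {
    /** Fraction of all accesses (0.0 to 1.0) that hit the topN most-accessed keys. */
    static double topShare(List<String> accessLog, int topN) {
        if (accessLog.isEmpty() || topN <= 0) {
            return 0.0;
        }
        // Count accesses per key.
        Map<String, Long> counts = new HashMap<>();
        for (String key : accessLog) {
            counts.merge(key, 1L, Long::sum);
        }
        // Sum the counts of the topN busiest keys.
        long hot = counts.values().stream()
            .sorted(Comparator.reverseOrder())
            .limit(topN)
            .mapToLong(Long::longValue)
            .sum();
        return (double) hot / accessLog.size();
    }
}
```

A high share for a small N suggests that a modest in-memory cache in front of Cassandra will absorb most reads; a flat distribution suggests the opposite.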
User Account Data Architecture¶
User account data is the canonical speed layer use case: accessed constantly, latency-sensitive, and critical to application function.
Schema Design¶
-- The UDT must be created before any table that references it
CREATE TYPE user_profile (
    display_name TEXT,
    avatar_url TEXT,
    bio TEXT,
    location TEXT,
    website TEXT
);
-- Primary user record (optimized for lookup by ID)
CREATE TABLE users (
    user_id UUID,
    email TEXT,
    username TEXT,
    password_hash TEXT,
    status TEXT,
    created_at TIMESTAMP,
    last_login_at TIMESTAMP,
    profile FROZEN<user_profile>,
    preferences MAP<TEXT, TEXT>,
    roles SET<TEXT>,
    PRIMARY KEY (user_id)
);
-- Lookup by email (for login)
CREATE TABLE users_by_email (
    email TEXT,
    user_id UUID,
    password_hash TEXT,
    status TEXT,
    PRIMARY KEY (email)
);
-- Lookup by username (for @mentions, profile URLs)
CREATE TABLE users_by_username (
    username TEXT,
    user_id UUID,
    PRIMARY KEY (username)
);
Design rationale:
- Denormalized lookups: Each access pattern has its own table, so no secondary indexes are required
- Frozen UDT for profile: The profile is read and written as a single value; changing one field rewrites the whole profile
- Map for preferences: Flexible key-value storage without schema changes
- No joins: All data for a common operation lives in a single partition
Optimized Read Path¶
public class UserSpeedLayer {
    private final CqlSession session;
    // Async caches return CompletableFutures, so a miss does not block the calling thread
    private final AsyncLoadingCache<UUID, User> userCache;
    private final AsyncLoadingCache<String, UUID> emailIndex;

    public UserSpeedLayer(CqlSession session) {
        this.session = session;
        // In-memory cache for hottest data
        this.userCache = Caffeine.newBuilder()
            .maximumSize(100_000) // Up to 100K users
            .expireAfterWrite(Duration.ofMinutes(5))
            .refreshAfterWrite(Duration.ofMinutes(1))
            .buildAsync(this::loadUser);
        this.emailIndex = Caffeine.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(Duration.ofMinutes(5))
            .buildAsync(this::lookupUserIdByEmail);
    }

    public CompletableFuture<User> getUser(UUID userId) {
        return userCache.get(userId);
    }

    public CompletableFuture<User> getUserByEmail(String email) {
        return emailIndex.get(email)
            // Unknown email: complete with null instead of passing a null key to the cache
            .thenCompose(userId -> userId == null
                ? CompletableFuture.<User>completedFuture(null)
                : getUser(userId));
    }

    // Called by writers and event listeners after a user changes
    public void invalidateUser(UUID userId) {
        userCache.synchronous().invalidate(userId);
    }

    public void invalidateEmail(String email) {
        emailIndex.synchronous().invalidate(email);
    }

    private User loadUser(UUID userId) {
        Row row = session.execute(
            selectUser.bind(userId)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        ).one();
        return row != null ? mapToUser(row) : null;
    }

    private UUID lookupUserIdByEmail(String email) {
        Row row = session.execute(
            selectUserByEmail.bind(email)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        ).one();
        return row != null ? row.getUuid("user_id") : null;
    }
}
Write-Through Updates¶
Maintain cache consistency on writes:
public class UserService {
private final UserSpeedLayer speedLayer;
private final CqlSession session;
public void updateUser(UUID userId, UserUpdate update) {
// Update Cassandra
BatchStatement batch = BatchStatement.newInstance(BatchType.LOGGED);
batch = batch.add(updateUsers.bind(
update.getProfile(), update.getPreferences(), userId));
// Update lookup tables if relevant fields changed
if (update.getEmailChanged()) {
batch = batch.add(deleteUsersByEmail.bind(update.getOldEmail()));
batch = batch.add(insertUsersByEmail.bind(
update.getNewEmail(), userId, /* ... */));
}
session.execute(batch.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
// Invalidate cache
speedLayer.invalidateUser(userId);
// Invalidate email index if changed
if (update.getEmailChanged()) {
speedLayer.invalidateEmail(update.getOldEmail());
}
}
}
Session Management¶
Sessions require even lower latency than user data and have unique access patterns.
Session Schema¶
CREATE TABLE sessions (
session_id TEXT,
user_id UUID,
created_at TIMESTAMP,
last_accessed_at TIMESTAMP,
expires_at TIMESTAMP,
ip_address TEXT,
user_agent TEXT,
data MAP<TEXT, TEXT>,
PRIMARY KEY (session_id)
) WITH default_time_to_live = 86400; -- 24-hour default TTL
-- Sessions by user (for "logout all devices")
CREATE TABLE sessions_by_user (
user_id UUID,
session_id TEXT,
created_at TIMESTAMP,
last_accessed_at TIMESTAMP,
device_info TEXT,
PRIMARY KEY ((user_id), created_at, session_id)
) WITH CLUSTERING ORDER BY (created_at DESC, session_id ASC)
AND default_time_to_live = 86400;
Session Service¶
public class SessionSpeedLayer {
private final CqlSession session;
private final Cache<String, SessionData> sessionCache;
public SessionSpeedLayer(CqlSession session) {
this.session = session;
// Very aggressive caching for sessions
this.sessionCache = Caffeine.newBuilder()
.maximumSize(500_000) // 500K active sessions
.expireAfterAccess(Duration.ofMinutes(15))
.build();
}
public SessionData getSession(String sessionId) {
// Check cache first
SessionData cached = sessionCache.getIfPresent(sessionId);
if (cached != null) {
if (cached.isExpired()) {
sessionCache.invalidate(sessionId);
return null;
}
return cached;
}
// Load from Cassandra
Row row = session.execute(
selectSession.bind(sessionId)
.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
).one();
if (row == null) {
return null;
}
SessionData sessionData = mapToSession(row);
if (sessionData.isExpired()) {
return null;
}
sessionCache.put(sessionId, sessionData);
return sessionData;
}
public String createSession(UUID userId, SessionRequest request) {
String sessionId = generateSessionId();
Instant now = Instant.now();
Instant expiresAt = now.plus(Duration.ofHours(24));
// Write to both tables
BatchStatement batch = BatchStatement.newInstance(BatchType.LOGGED)
.add(insertSession.bind(
sessionId, userId, now, now, expiresAt,
request.getIpAddress(), request.getUserAgent(),
new HashMap<>()
))
.add(insertSessionByUser.bind(
userId, sessionId, now, now,
request.getDeviceInfo()
));
session.execute(batch.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
SessionData sessionData = new SessionData(
sessionId, userId, now, expiresAt, new HashMap<>());
sessionCache.put(sessionId, sessionData);
return sessionId;
}
public void touchSession(String sessionId) {
Instant now = Instant.now();
// Update last accessed time
session.executeAsync(
updateSessionAccess.bind(now, sessionId)
.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
);
// Update cache
SessionData cached = sessionCache.getIfPresent(sessionId);
if (cached != null) {
cached.setLastAccessedAt(now);
}
}
public void destroySession(String sessionId) {
SessionData sessionData = getSession(sessionId);
if (sessionData != null) {
// Delete from both tables
BatchStatement batch = BatchStatement.newInstance(BatchType.LOGGED)
.add(deleteSession.bind(sessionId))
.add(deleteSessionByUser.bind(
sessionData.getUserId(),
sessionData.getCreatedAt(),
sessionId
));
session.execute(batch);
}
sessionCache.invalidate(sessionId);
}
public void destroyAllUserSessions(UUID userId) {
// Get all sessions for user
List<Row> sessions = session.execute(
selectSessionsByUser.bind(userId)
).all();
// Delete each session
for (Row row : sessions) {
destroySession(row.getString("session_id"));
}
}
}
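`generateSessionId()` is not shown above. Because the session ID is the only credential presented on each request, it should be unguessable. One possible implementation, an assumption rather than the original code, draws 256 bits from `SecureRandom` and encodes them as URL-safe Base64. The class name `SessionIds` is hypothetical.

```java
import java.security.SecureRandom;
import java.util.Base64;

class SessionIds {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final Base64.Encoder ENCODER = Base64.getUrlEncoder().withoutPadding();

    /** Returns a 43-character URL-safe token carrying 256 bits of randomness. */
    static String generateSessionId() {
        byte[] bytes = new byte[32];
        RANDOM.nextBytes(bytes);
        return ENCODER.encodeToString(bytes);
    }
}
```

The resulting string fits the `session_id TEXT` column and is safe to place in a cookie or header without further escaping.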
Real-Time Counters and Aggregates¶
Some speed layer data is derived: counters, aggregates, and computed values that support real-time features.
Counter Tables¶
-- Real-time user statistics
CREATE TABLE user_stats (
user_id UUID,
stat_name TEXT,
stat_value COUNTER,
PRIMARY KEY ((user_id), stat_name)
);
-- Global feature counters
CREATE TABLE feature_counters (
feature_name TEXT,
time_bucket TEXT, -- Hour or day bucket
counter_value COUNTER,
PRIMARY KEY ((feature_name, time_bucket))
);
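The schema leaves the format of `time_bucket` open. A minimal sketch of one option follows, assuming buckets are UTC strings such as `2024-03-09T17` for an hour and `2024-03-09` for a day. The `TimeBuckets` class and both method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TimeBuckets {
    // Formatters are pinned to UTC so every application node derives the same bucket.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Hour-granularity bucket, for example "2024-03-09T17". */
    static String hourBucket(Instant t) {
        return HOUR.format(t);
    }

    /** Day-granularity bucket, for example "2024-03-09". */
    static String dayBucket(Instant t) {
        return DAY.format(t);
    }
}
```

Because the bucket is part of the partition key, each hour or day gets its own counter partition, which bounds partition lifetime and spreads writes for a busy feature across buckets over time.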
Counter Service¶
public class UserStatsService {
public void incrementStat(UUID userId, String statName) {
session.executeAsync(
incrementUserStat.bind(userId, statName)
.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
);
}
public Map<String, Long> getUserStats(UUID userId) {
return session.execute(
selectUserStats.bind(userId)
.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
).all().stream()
.collect(Collectors.toMap(
row -> row.getString("stat_name"),
row -> row.getLong("stat_value")
));
}
}
Materialized Speed Views¶
Pre-compute expensive queries into speed-optimized views:
View Schema¶
-- Pre-computed user dashboard data
CREATE TABLE user_dashboard (
user_id UUID,
last_computed_at TIMESTAMP,
unread_notifications INT,
pending_tasks INT,
recent_activity_summary TEXT,
recommended_items LIST<UUID>,
PRIMARY KEY (user_id)
) WITH default_time_to_live = 300; -- 5-minute TTL forces refresh
Background Computation¶
@Scheduled(fixedDelay = 60000) // Runs again 60 seconds after the previous run completes
public void refreshDashboards() {
// Get users with recent activity
List<UUID> activeUsers = getRecentlyActiveUsers(Duration.ofMinutes(15));
for (UUID userId : activeUsers) {
try {
refreshDashboard(userId);
} catch (Exception e) {
log.warn("Failed to refresh dashboard for {}", userId, e);
}
}
}
private void refreshDashboard(UUID userId) {
// Compute expensive aggregates
int unreadNotifications = countUnreadNotifications(userId);
int pendingTasks = countPendingTasks(userId);
String activitySummary = computeActivitySummary(userId);
List<UUID> recommendations = computeRecommendations(userId);
// Write to speed view
session.execute(insertDashboard.bind(
userId, Instant.now(),
unreadNotifications, pendingTasks,
activitySummary, recommendations
));
}
Reading Speed Views¶
public class DashboardService {
public UserDashboard getDashboard(UUID userId) {
// Try speed view first
Row row = session.execute(
selectDashboard.bind(userId)
.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
).one();
if (row != null && isRecent(row.getInstant("last_computed_at"))) {
return mapToDashboard(row);
}
// Fall back to real-time computation
return computeDashboardRealTime(userId);
}
private boolean isRecent(Instant lastComputed) {
return lastComputed != null &&
lastComputed.isAfter(Instant.now().minus(Duration.ofMinutes(5)));
}
}
Consistency Considerations¶
The speed layer trades consistency for latency. Understanding the trade-offs is essential.
Consistency Level Selection¶
| Data Type | Read CL | Write CL | Rationale |
|---|---|---|---|
| Session validation | LOCAL_ONE | LOCAL_QUORUM | Fast reads, durable writes |
| User profile display | LOCAL_ONE | LOCAL_QUORUM | Stale data acceptable |
| Authentication | LOCAL_QUORUM | LOCAL_QUORUM | Security-critical |
| Counters | LOCAL_ONE | LOCAL_ONE | Approximate is acceptable |
| Dashboard data | LOCAL_ONE | LOCAL_ONE | Pre-computed, non-critical |
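Whether a read is guaranteed to observe the latest acknowledged write follows from simple arithmetic on replica counts: the replicas read plus the replicas written must exceed the replication factor. The sketch below encodes that rule. The replication factor of 3 used in the note is an assumption, since the text does not state one, and `ConsistencyOverlap` is a hypothetical helper.

```java
class ConsistencyOverlap {
    /** Replicas required for a quorum at the given replication factor. */
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    /**
     * True when every read is guaranteed to touch at least one replica
     * that acknowledged the latest write.
     */
    static boolean readsSeeLatestWrite(int readReplicas, int writeReplicas, int replicationFactor) {
        return readReplicas + writeReplicas > replicationFactor;
    }
}
```

With a replication factor of 3 in the local datacenter, LOCAL_ONE reads against LOCAL_QUORUM writes give 1 + 2 = 3, which does not exceed 3, so a read may land on the one replica that has not yet received the write. That is the stale window the table accepts for sessions and profile display. The authentication row closes it with 2 + 2 > 3.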
Stale Read Handling¶
public class StaleDataHandler {
public User getUserWithStaleness(UUID userId, Duration maxStaleness) {
// Get from cache with staleness check
CacheEntry<User> entry = userCache.getEntry(userId);
if (entry != null) {
Duration age = Duration.between(entry.getLoadedAt(), Instant.now());
if (age.compareTo(maxStaleness) <= 0) {
return entry.getValue();
}
// Data too stale - refresh
if (age.compareTo(maxStaleness.multipliedBy(2)) <= 0) {
// Return stale data but trigger async refresh
refreshAsync(userId);
return entry.getValue();
}
}
// Must load fresh
return loadUser(userId);
}
}
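The handler above has three outcomes: serve the cached value, serve it while refreshing in the background, or reload before answering. Isolating that decision as a pure function makes the thresholds easy to unit test. The sketch below mirrors the comparisons in `getUserWithStaleness`; `StalenessDecision` and `Action` are hypothetical names.

```java
import java.time.Duration;

class StalenessDecision {
    enum Action { SERVE, SERVE_AND_REFRESH, RELOAD }

    /** Maps the age of a cache entry to an action, using the same bounds as the handler. */
    static Action decide(Duration age, Duration maxStaleness) {
        if (age.compareTo(maxStaleness) <= 0) {
            return Action.SERVE; // within the staleness budget
        }
        if (age.compareTo(maxStaleness.multipliedBy(2)) <= 0) {
            return Action.SERVE_AND_REFRESH; // slightly stale: answer now, refresh in background
        }
        return Action.RELOAD; // too old to serve at all
    }
}
```

The factor of two is the grace window taken from the original code; a caller with stricter requirements can pass a smaller `maxStaleness` rather than change the factor.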
Cache Invalidation Strategies¶
Event-Driven Invalidation¶
@KafkaListener(topics = "user-events")
public void handleUserEvent(UserEvent event) {
switch (event.getType()) {
case USER_UPDATED:
speedLayer.invalidateUser(event.getUserId());
break;
case USER_DELETED:
speedLayer.invalidateUser(event.getUserId());
speedLayer.invalidateEmail(event.getEmail());
break;
case PASSWORD_CHANGED:
// Invalidate all sessions
speedLayer.invalidateUserSessions(event.getUserId());
break;
}
}
TTL-Based Refresh¶
// Cache configuration with automatic refresh
LoadingCache<UUID, User> userCache = Caffeine.newBuilder()
.maximumSize(100_000)
.expireAfterWrite(Duration.ofMinutes(5))
.refreshAfterWrite(Duration.ofMinutes(1))
.build(userId -> loadUser(userId));
Versioned Invalidation¶
public class VersionedCache {
private final ConcurrentMap<UUID, VersionedEntry<User>> cache;
public void put(UUID userId, User user, long version) {
cache.compute(userId, (id, existing) -> {
if (existing == null || existing.getVersion() < version) {
return new VersionedEntry<>(user, version);
}
return existing; // Keep newer version
});
}
public void invalidateIfOlder(UUID userId, long version) {
cache.computeIfPresent(userId, (id, existing) -> {
if (existing.getVersion() <= version) {
return null; // Remove
}
return existing; // Keep newer version
});
}
}
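A self-contained variant of the versioned cache shows how it behaves when updates and invalidations arrive out of order. The sketch assumes the version is a number that only increases per key, such as a source-system sequence or an update timestamp. `VersionedCacheDemo` and its nested `Entry` class are hypothetical simplifications that use `String` keys and values.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class VersionedCacheDemo {
    /** Cached value together with the version it was written at. */
    static final class Entry {
        final String value;
        final long version;

        Entry(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<>();

    /** Stores the value unless the cache already holds an equal or newer version. */
    void put(String key, String value, long version) {
        cache.merge(key, new Entry(value, version),
            (existing, incoming) -> incoming.version > existing.version ? incoming : existing);
    }

    /** Removes the entry only if it is not newer than the invalidating version. */
    void invalidateIfOlder(String key, long version) {
        cache.computeIfPresent(key,
            (k, existing) -> existing.version <= version ? null : existing);
    }

    String get(String key) {
        Entry entry = cache.get(key);
        return entry == null ? null : entry.value;
    }
}
```

Both operations run atomically per key, so a delayed invalidation for version 1 cannot evict a value already cached at version 2.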
Monitoring Speed Layer Performance¶
Key Metrics¶
| Metric | Description | Alert Threshold |
|---|---|---|
| speed_layer.cache_hit_ratio | Cache hit percentage | < 90% |
| speed_layer.read_latency_p99 | 99th percentile read latency | > 10ms |
| speed_layer.cache_size | Current cache entries | Near maximum |
| speed_layer.eviction_rate | Cache evictions per second | Sustained high |
| speed_layer.stale_reads | Reads returning stale data | > 5% |
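Two of these metrics reduce to short calculations. The sketch below derives a hit ratio from hit and miss counts and a nearest-rank percentile from latency samples. `SpeedLayerMetrics` is a hypothetical helper; production systems would more likely take these values from a metrics library than compute them by hand.

```java
import java.util.Arrays;

class SpeedLayerMetrics {
    /** Hit ratio in [0, 1]; reports 1.0 when there has been no traffic yet. */
    static double hitRatio(long hits, long misses) {
        long total = hits + misses;
        return total == 0 ? 1.0 : (double) hits / total;
    }

    /** Nearest-rank percentile: the smallest sample with at least p percent of samples at or below it. */
    static double percentile(double[] samplesMs, double p) {
        if (samplesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        double[] sorted = samplesMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }
}
```

When the cache is Caffeine, building it with `recordStats()` and reading `cache.stats().hitRate()` yields the hit ratio directly, with the same convention of reporting 1.0 for an idle cache.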
Health Check¶
@Component
public class SpeedLayerHealthIndicator implements HealthIndicator {
@Override
public Health health() {
Map<String, Object> details = new HashMap<>();
// Check cache hit ratio
double hitRatio = getCacheHitRatio();
details.put("cache_hit_ratio", hitRatio);
// Check read latency
double p99Latency = getP99ReadLatency();
details.put("p99_read_latency_ms", p99Latency);
// Check Cassandra connectivity
boolean cassandraHealthy = checkCassandraHealth();
details.put("cassandra_healthy", cassandraHealthy);
if (hitRatio < 0.8 || p99Latency > 50 || !cassandraHealthy) {
return Health.down().withDetails(details).build();
}
return Health.up().withDetails(details).build();
}
}
Summary¶
The speed layer pattern optimizes access to hot data through:
- Schema design for access patterns - Denormalized tables for each lookup path
- In-memory caching - Caffeine or similar for sub-millisecond access
- Appropriate consistency levels - LOCAL_ONE for reads where stale data is acceptable
- Write-through updates - Maintain cache consistency on mutations
- Pre-computed views - Background computation of expensive aggregates
- TTL-based lifecycle - Automatic expiration of transient data
Cassandra provides the durable backing store while in-memory caches serve the hottest data. The combination achieves both the reliability of a distributed database and the performance of in-memory access.
Related Documentation¶
- CQRS Pattern - Separating read and write models
- Multi-Tenant Isolation - Per-tenant speed layers
- Time-Series Data - Hot vs cold data tiering