Aggregate Commands¶
User-Defined Aggregates (UDAs) process multiple rows and produce a single result value. UDAs combine a state function, optional final function, and initial state to implement custom aggregation logic like weighted averages, custom statistics, or domain-specific calculations.
Overview¶
What are User-Defined Aggregates?¶
UDAs extend CQL's aggregation capabilities beyond the built-in functions (COUNT, SUM, AVG, MIN, MAX). They enable custom aggregation logic implemented in Java or JavaScript, executed on the coordinator node during query processing.
-- Built-in aggregate
SELECT AVG(price) FROM products WHERE category = 'electronics';
-- User-defined aggregate for weighted average
SELECT weighted_avg(price, quantity) FROM order_items WHERE order_id = ?;
Version Support¶
| Feature | Cassandra Version | Notes |
|---|---|---|
| User-Defined Aggregates | 2.2+ | Initial UDA support |
| OR REPLACE syntax | 3.0+ | Replace existing aggregates |
| Java UDFs | 2.2+ | Always available |
| JavaScript UDFs | 2.2+ | Requires configuration |
| Improved null handling | 3.0+ | Better INITCOND behavior |
Configuration Requirements¶
UDAs require User-Defined Functions (UDFs) to be enabled in cassandra.yaml:
# Enable Java UDFs (required for UDAs)
enable_user_defined_functions: true
# Enable JavaScript UDFs (optional)
enable_scripted_user_defined_functions: true
# Timeout settings
user_defined_function_warn_timeout: 500ms
user_defined_function_fail_timeout: 10000ms
Security Consideration
Enabling UDFs allows execution of user-provided code on cluster nodes. In multi-tenant environments, restrict UDF creation permissions to trusted roles only.
Built-in vs User-Defined Aggregates¶
| Aspect | Built-in Aggregates | User-Defined Aggregates |
|---|---|---|
| Performance | Optimized native code | JVM overhead |
| Availability | Always available | Requires configuration |
| Customization | Fixed behavior | Fully customizable |
| Maintenance | Automatic | User managed |
| Examples | COUNT, SUM, AVG, MIN, MAX | Weighted avg, percentiles, custom stats |
When to Use UDAs
Use built-in aggregates when possible—they are faster and require no setup. Use UDAs when:
- Built-in aggregates don't meet requirements
- Custom business logic is needed
- Multiple values must be combined (e.g., weighted calculations)
- Domain-specific statistics are required
Behavioral Guarantees¶
What UDA Operations Guarantee¶
- CREATE AGGREGATE creates schema metadata that propagates to all nodes via gossip
- Aggregate state function is called once per row in the result set
- Final function (if specified) is called once after all rows are processed
- INITCOND provides the initial state value for the aggregation
- OR REPLACE atomically updates an existing aggregate definition
- IF NOT EXISTS provides idempotent aggregate creation
What UDA Operations Do NOT Guarantee¶
Undefined Behavior
The following behaviors are undefined and must not be relied upon:
- Execution order: The order in which rows are processed is not guaranteed (unless ORDER BY is used)
- Partial aggregation: Aggregation is not distributed; all matching rows are sent to coordinator
- State persistence: Aggregate state is not persisted between queries
- Memory limits: Large result sets may cause coordinator memory exhaustion
- Timeout during aggregation: Partial state is discarded on timeout
Execution Contract¶
| Property | Guarantee |
|---|---|
| Execution location | Coordinator node only |
| State function calls | Once per row in result set |
| Final function calls | Once after all rows processed |
| Initial state | INITCOND value, or null if INITCOND is not specified |
| Null row handling | Controlled by underlying state function's null handling |
Aggregation Order Contract¶
| Query | Row Processing Order |
|---|---|
| Without ORDER BY | Undefined (implementation-dependent) |
| With ORDER BY | Specified order |
| Multiple partitions | Undefined between partitions |
Failure Semantics¶
| Failure Mode | Outcome | Client Action |
|---|---|---|
| State function throws exception | Query fails, partial state discarded | Fix state function |
| Final function throws exception | Query fails, aggregation result lost | Fix final function |
| Timeout during aggregation | Query fails, partial state discarded | Reduce result set or increase timeout |
| Referenced function dropped | Query fails with InvalidRequestException | Recreate function or aggregate |
Version-Specific Behavior¶
| Version | Behavior |
|---|---|
| 2.2+ | User-defined aggregates introduced (CASSANDRA-6890) |
| 3.0+ | OR REPLACE syntax, improved null handling |
| 4.0+ | Better timeout handling during aggregation |
Aggregate Architecture¶
How UDAs Work¶
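Aggregates process rows through a state accumulation pattern: a state value is initialized, updated once per row by the state function, and optionally transformed by a final function.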
Components¶
| Component | Required | Purpose | Example |
|---|---|---|---|
| SFUNC | Yes | State function called for each row | sum_state(state, val) |
| STYPE | Yes | Data type of the state variable | BIGINT, TUPLE<...> |
| FINALFUNC | No | Transforms final state to result | avg_final(state) → DOUBLE |
| INITCOND | No | Initial state value (defaults to null) | 0, (0, 0) |
Execution Flow¶
- State initialized to INITCOND (or null if not specified)
- For each row matching the query:
  - State function called: new_state = SFUNC(current_state, input_values)
  - State is updated with return value
- After all rows processed:
  - If FINALFUNC defined: result = FINALFUNC(final_state)
  - Otherwise: result = final_state
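The flow above can be modeled as a plain fold. The following is a minimal sketch in standalone Java, not Cassandra's implementation; the class `UdaFlowSketch` and its methods are hypothetical names introduced only for illustration, and the identity function stands in for "no FINALFUNC".

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical model of the UDA execution flow; not Cassandra's implementation. */
final class UdaFlowSketch {

    /** Starts from initCond, calls sfunc once per row, then calls finalFunc once. */
    static <S, T, R> R aggregate(S initCond, BiFunction<S, T, S> sfunc,
                                 Function<S, R> finalFunc, List<T> rows) {
        S state = initCond;                    // state initialized to INITCOND (may be null)
        for (T row : rows) {
            state = sfunc.apply(state, row);   // new_state = SFUNC(current_state, input_values)
        }
        return finalFunc.apply(state);         // result = FINALFUNC(final_state)
    }

    /** Sum: INITCOND 0 and no FINALFUNC, modeled here with the identity function. */
    static Long sum(List<Integer> rows) {
        return aggregate(0L, (Long state, Integer val) -> state + val,
                         Function.<Long>identity(), rows);
    }

    /** Average: state is {sum, count}; the final function divides, or yields null for zero rows. */
    static Double average(List<Integer> rows) {
        BiFunction<long[], Integer, long[]> sfunc = (s, v) -> new long[] {s[0] + v, s[1] + 1};
        Function<long[], Double> finalFunc = s -> s[1] == 0 ? null : (double) s[0] / s[1];
        return aggregate(new long[] {0L, 0L}, sfunc, finalFunc, rows);
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(10, 20, 60);
        System.out.println(sum(rows));      // 90
        System.out.println(average(rows));  // 30.0
    }
}
```

With an empty row list, `sum` returns the initial state (0) and `average` returns null, which mirrors the FINALFUNC(INITCOND) behavior described for empty result sets.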
Execution Location¶
UDAs execute entirely on the coordinator node.
Performance Implications
- All matching rows are streamed to the coordinator
- UDA computation happens in coordinator's JVM
- Large result sets can cause memory pressure
- Consider filtering with WHERE clause to limit rows
Null Handling¶
Understanding null behavior is critical for correct UDA implementation:
| Scenario | Behavior |
|---|---|
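| Input value is NULL | If SFUNC is declared RETURNS NULL ON NULL INPUT, it is not called and the state remains unchanged; if it is declared CALLED ON NULL INPUT, it is called and receives the NULL value |
| State function returns NULL | State becomes NULL; subsequent calls receive a NULL state unless SFUNC replaces it |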
| INITCOND is NULL or unspecified | First SFUNC call receives NULL as initial state |
| All input values are NULL | Result is FINALFUNC(INITCOND) or INITCOND if no FINALFUNC |
Null Propagation
If the state function returns NULL at any point, every subsequent call receives a NULL state unless SFUNC handles it explicitly. This is particularly dangerous with arithmetic operations:
// Dangerous: in a Java UDF, unboxing a null state throws NullPointerException
return state + val;
// Safe: handle null state explicitly
if (state == null) return (long) val;
return state + val;
Best Practices for Null Safety
- Always specify INITCOND to avoid null initial state
- Use CALLED ON NULL INPUT for state functions
- Explicitly check for null state AND null input values in SFUNC
- Test aggregation with datasets containing NULL values
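To make the practices above concrete, here is a small sketch in standalone Java that contrasts an unsafe sum state function with a null-safe one. It only imitates the shape of a CALLED ON NULL INPUT state function; `NullHandlingSketch` and its method names are hypothetical and nothing here runs inside Cassandra.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of null handling in a sum state function; not Cassandra code. */
final class NullHandlingSketch {

    /** Unsafe: unboxing a null state or value throws NullPointerException. */
    static Long unsafeSumState(Long state, Integer val) {
        return state + val;
    }

    /** Null-safe, in the style of a CALLED ON NULL INPUT state function. */
    static Long safeSumState(Long state, Integer val) {
        if (val == null) return state;              // skip null inputs, keep the state
        if (state == null) return val.longValue();  // no INITCOND: first value seeds the state
        return state + val;
    }

    /** Folds rows with the null-safe state function, starting from initCond. */
    static Long fold(Long initCond, List<Integer> rows) {
        Long state = initCond;
        for (Integer val : rows) {
            state = safeSumState(state, val);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, null, 2);
        System.out.println(fold(0L, rows));    // 3 (with an initial state of 0)
        System.out.println(fold(null, rows));  // 3 (null initial state handled explicitly)
    }
}
```

If every input is null and no initial state is given, `fold` returns null, which is the case worth covering in tests.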
Non-Deterministic Functions¶
Avoid Non-Deterministic Functions in UDAs
UDAs can reference UDFs that use non-deterministic operations like now() or uuid(). This leads to unpredictable results because:
- Aggregation assumes deterministic behavior for consistency
- The same input data may produce different results on each execution
- Replayed queries (e.g., a client retry after a timeout) may yield inconsistent values
Recommendation: Use only deterministic functions in SFUNC and FINALFUNC. If timestamps or UUIDs are needed, pass them as input parameters rather than generating them inside the function.
Type Constraints¶
Strict type matching is enforced between aggregate components:
| Constraint | Requirement | Error if Violated |
|---|---|---|
| SFUNC return type | Must exactly match STYPE | InvalidRequestException at creation |
| SFUNC first parameter | Must exactly match STYPE | InvalidRequestException at creation |
| FINALFUNC input type | Must exactly match STYPE | InvalidRequestException at creation |
| INITCOND type | Must be literal of STYPE | Compile-time error |
INITCOND Parsing Rules:
-- Scalar types: use literal values
INITCOND 0 -- for INT, BIGINT
INITCOND 0.0 -- for DOUBLE, FLOAT
INITCOND '' -- for TEXT
-- Tuple types: use parentheses
INITCOND (0, 0) -- for TUPLE<BIGINT, BIGINT>
INITCOND (0, 0.0, '') -- for TUPLE<INT, DOUBLE, TEXT>
-- Collection types: use brackets/braces
INITCOND [] -- for LIST
INITCOND {} -- for SET or empty MAP
INITCOND {'key': 0} -- for MAP<TEXT, INT>
-- UDT types: use named fields
INITCOND {field1: 0, field2: ''}
Sandbox Restrictions¶
UDFs (and therefore UDAs) execute in a sandboxed JVM environment with significant restrictions:
| Restriction | Description |
|---|---|
| No file system access | Cannot read or write files |
| No network access | Cannot open sockets or make HTTP calls |
| Limited Java classes | Only whitelisted JDK classes available |
| No reflection | java.lang.reflect package blocked |
| No threading | Cannot create threads or use concurrency utilities |
| Execution time limits | Bounded by the user_defined_function_warn_timeout and user_defined_function_fail_timeout settings |
Allowed Classes
The sandbox permits basic Java classes: primitives, String, Math, collections (List, Set, Map), and Cassandra driver types (TupleValue, UDTValue). Attempting to use restricted classes throws SecurityException.
Distributed Consistency¶
UDA execution has implications for distributed consistency:
| Aspect | Behavior |
|---|---|
| Aggregation location | Coordinator collects all rows, then aggregates |
| Partition ordering | Rows from different partitions have undefined order |
| Global ordering | Not guaranteed unless single partition with ORDER BY |
| Result merging | No distributed reduce phase; all data flows to coordinator |
No Global Ordering Guarantees
If a UDA relies on processing rows in a specific global order or expects uniqueness across partitions, results may differ from expectations. Cassandra does not guarantee ordering between partitions.
-- Order guaranteed within partition
SELECT my_agg(value) FROM table WHERE partition_key = ?
ORDER BY clustering_col;
-- Order NOT guaranteed across partitions
SELECT my_agg(value) FROM table; -- undefined row order
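Why ordering matters depends on the state function. As a rough illustration in standalone Java (hypothetical class and method names, not Cassandra code), a commutative accumulation such as a sum is unaffected by row order, while an accumulation that keeps the most recently processed value is not:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of order sensitivity; not Cassandra code. */
final class OrderSensitivitySketch {

    /** Commutative accumulation: the result does not depend on row order. */
    static long sum(List<Integer> rows) {
        long state = 0L;
        for (int val : rows) {
            state += val;
        }
        return state;
    }

    /** Order-sensitive accumulation: keeps the most recently processed value. */
    static Integer lastSeen(List<Integer> rows) {
        Integer state = null;
        for (Integer val : rows) {
            state = val;
        }
        return state;
    }

    public static void main(String[] args) {
        List<Integer> oneOrder = Arrays.asList(1, 2, 3);
        List<Integer> anotherOrder = Arrays.asList(3, 1, 2);
        System.out.println(sum(oneOrder) == sum(anotherOrder));                // true
        System.out.println(lastSeen(oneOrder).equals(lastSeen(anotherOrder))); // false
    }
}
```

Aggregates shaped like `lastSeen` are the ones to restrict to a single partition with ORDER BY.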
Error Handling¶
UDA error behavior follows fail-fast semantics:
| Error Source | Behavior | Recovery |
|---|---|---|
| SFUNC throws exception | Query fails immediately | No partial results returned |
| FINALFUNC throws exception | Query fails after aggregation | Aggregated state is lost |
| Timeout during SFUNC | Query fails | Partial state discarded |
| Type conversion error | Query fails | Fix function or input data |
No Partial Aggregation Recovery
When any error occurs during UDA execution:
- The entire query fails
- No partial aggregation results are returned
- All accumulated state is discarded
- The error propagates to the client
Mitigation: Implement defensive programming in SFUNC/FINALFUNC with try-catch blocks that return safe default values rather than throwing exceptions.
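One possible shape for such a defensive state function is sketched below in standalone Java. The scenario (summing numeric strings) and the names `DefensiveSfuncSketch` and `parseSumState` are assumptions made for illustration; the point is only that malformed input leaves the state unchanged instead of failing the whole aggregation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical defensive state function; not Cassandra code. */
final class DefensiveSfuncSketch {

    /** Adds a numeric string to the state; null, malformed, or overflowing input keeps the state. */
    static long parseSumState(long state, String raw) {
        if (raw == null) {
            return state;                     // null input: nothing to add
        }
        try {
            return Math.addExact(state, Long.parseLong(raw.trim()));
        } catch (NumberFormatException | ArithmeticException e) {
            return state;                     // safe default instead of failing the query
        }
    }

    /** Folds all rows starting from 0, the stand-in for INITCOND 0. */
    static long fold(List<String> rows) {
        long state = 0L;
        for (String raw : rows) {
            state = parseSumState(state, raw);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(fold(Arrays.asList("1", "x", null, " 2 ")));  // 3
    }
}
```

The trade-off is that swallowed errors hide bad data, so this pattern fits best where skipping a row is an acceptable outcome.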
CREATE AGGREGATE¶
Create a user-defined aggregate function.
Synopsis¶
CREATE [ OR REPLACE ] AGGREGATE [ IF NOT EXISTS ]
[ *keyspace_name*. ] *aggregate_name*
( [ *arg_type* [, *arg_type* ... ] ] )
SFUNC *state_function*
STYPE *state_type*
[ FINALFUNC *final_function* ]
[ INITCOND *initial_condition* ]
Description¶
CREATE AGGREGATE defines a UDA composed of user-defined functions. The state function and optional final function must be created before the aggregate.
Parameters¶
OR REPLACE¶
Replace existing aggregate with same signature.
IF NOT EXISTS¶
Prevent error if aggregate already exists.
aggregate_name¶
Identifier for the aggregate. Aggregates can be overloaded like functions.
Argument Types¶
Input types matching the state function's input parameters (after the state parameter):
-- Aggregate taking INT values
CREATE AGGREGATE my_sum(INT) ...
-- Aggregate taking multiple arguments
CREATE AGGREGATE weighted_avg(DOUBLE, DOUBLE) ...
SFUNC¶
The state function called for each row. Must:
- Accept state type as first parameter
- Accept aggregate argument types as subsequent parameters
- Return the state type
-- State function signature for my_sum(INT)
CREATE FUNCTION sum_state(state INT, val INT)
RETURNS INT ...
STYPE¶
Data type for the accumulated state. Can be:
- Native types (INT, BIGINT, DOUBLE, etc.)
- Tuples (for multi-value state)
- User-defined types
- Collections
-- Simple state
STYPE INT
-- Tuple state (for average: sum and count)
STYPE TUPLE<BIGINT, BIGINT>
-- UDT state
STYPE FROZEN<stats_accumulator>
FINALFUNC¶
Optional function to transform final state into result:
- Takes state type as input
- Returns the aggregate's result type
-- Final function for average
CREATE FUNCTION avg_final(state TUPLE<BIGINT, BIGINT>)
RETURNS DOUBLE ...
If not specified, the final state is returned directly.
INITCOND¶
Initial value for the state. Format depends on state type:
-- Scalar initial condition
INITCOND 0
-- Tuple initial condition
INITCOND (0, 0)
-- Collection initial condition
INITCOND []
-- UDT initial condition
INITCOND {field1: 0, field2: 0}
INITCOND Importance
Without INITCOND:
- State starts as null
- First row must handle null state
- Empty result sets return null
With INITCOND:
- State has defined starting value
- Empty result sets return FINALFUNC(INITCOND) or INITCOND
Examples¶
Simple Sum Aggregate¶
-- State function
CREATE FUNCTION sum_state(state BIGINT, val INT)
CALLED ON NULL INPUT
RETURNS BIGINT
LANGUAGE java
AS '
if (val == null) return state;
if (state == null) return (long) val;
return state + val;
';
-- Aggregate
CREATE AGGREGATE my_sum(INT)
SFUNC sum_state
STYPE BIGINT
INITCOND 0;
-- Usage
SELECT my_sum(quantity) FROM orders WHERE customer_id = ?;
Average Aggregate¶
-- State function (accumulates sum and count)
CREATE FUNCTION avg_state(state TUPLE<DOUBLE, BIGINT>, val DOUBLE)
CALLED ON NULL INPUT
RETURNS TUPLE<DOUBLE, BIGINT>
LANGUAGE java
AS '
if (val == null) return state;
// Keep the running sum as a double so fractional values are not truncated
state.setDouble(0, state.getDouble(0) + val);
// Increment the row count
state.setLong(1, state.getLong(1) + 1);
return state;
';
-- Final function (computes average)
CREATE FUNCTION avg_final(state TUPLE<DOUBLE, BIGINT>)
RETURNS NULL ON NULL INPUT
RETURNS DOUBLE
LANGUAGE java
AS '
long count = state.getLong(1);
if (count == 0) return null;
return state.getDouble(0) / count;
';
-- Aggregate
CREATE AGGREGATE my_avg(DOUBLE)
SFUNC avg_state
STYPE TUPLE<DOUBLE, BIGINT>
FINALFUNC avg_final
INITCOND (0.0, 0);
-- Usage
SELECT my_avg(price) FROM products WHERE category = ?;
Weighted Average¶
-- State function
CREATE FUNCTION weighted_avg_state(
state TUPLE<DOUBLE, DOUBLE>,
value DOUBLE,
weight DOUBLE
)
CALLED ON NULL INPUT
RETURNS TUPLE<DOUBLE, DOUBLE>
LANGUAGE java
AS '
if (value == null || weight == null) return state;
double sum = state.getDouble(0) + (value * weight);
double totalWeight = state.getDouble(1) + weight;
return state.getType().newValue()
.setDouble(0, sum)
.setDouble(1, totalWeight);
';
-- Final function
CREATE FUNCTION weighted_avg_final(state TUPLE<DOUBLE, DOUBLE>)
RETURNS NULL ON NULL INPUT
RETURNS DOUBLE
LANGUAGE java
AS '
double totalWeight = state.getDouble(1);
if (totalWeight == 0.0) return null;
return state.getDouble(0) / totalWeight;
';
-- Aggregate
CREATE AGGREGATE weighted_avg(DOUBLE, DOUBLE)
SFUNC weighted_avg_state
STYPE TUPLE<DOUBLE, DOUBLE>
FINALFUNC weighted_avg_final
INITCOND (0.0, 0.0);
-- Usage
SELECT weighted_avg(score, credit_hours) FROM grades
WHERE student_id = ?;
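As a worked check of the arithmetic, the same accumulation can be traced in standalone Java. This is only a sketch: `WeightedAvgSketch` and the sample grades are hypothetical, and a `double[]` stands in for the TUPLE<DOUBLE, DOUBLE> state.

```java
/** Hypothetical trace of the weighted average accumulation; not Cassandra code. */
final class WeightedAvgSketch {

    /** State is {sum of value * weight, total weight}; rows with a null field are skipped. */
    static double[] accumulate(double[] state, Double value, Double weight) {
        if (value == null || weight == null) return state;
        return new double[] {state[0] + value * weight, state[1] + weight};
    }

    /** Divides the weighted sum by the total weight, or yields null when no weight was seen. */
    static Double finalFunc(double[] state) {
        if (state[1] == 0.0) return null;
        return state[0] / state[1];
    }

    /** Each row is {value, weight}. */
    static Double weightedAvg(Double[][] rows) {
        double[] state = {0.0, 0.0};  // stand-in for INITCOND (0.0, 0.0)
        for (Double[] row : rows) {
            state = accumulate(state, row[0], row[1]);
        }
        return finalFunc(state);
    }

    public static void main(String[] args) {
        // (90 * 3 + 80 * 4 + 70 * 3) / (3 + 4 + 3) = 800 / 10
        Double[][] grades = {{90.0, 3.0}, {80.0, 4.0}, {null, 2.0}, {70.0, 3.0}};
        System.out.println(weightedAvg(grades));  // 80.0
    }
}
```

The row with a null score contributes neither to the sum nor to the total weight, matching the null check in the state function.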
String Concatenation Aggregate¶
-- State function
CREATE FUNCTION concat_state(state TEXT, val TEXT, delimiter TEXT)
CALLED ON NULL INPUT
RETURNS TEXT
LANGUAGE java
AS '
if (val == null) return state;
if (state == null || state.isEmpty()) return val;
return state + delimiter + val;
';
-- Aggregate
CREATE AGGREGATE group_concat(TEXT, TEXT)
SFUNC concat_state
STYPE TEXT
INITCOND '';
-- Usage
SELECT group_concat(tag, ', ') FROM items WHERE category = ?;
Min/Max with Metadata¶
-- State type to track min value and its metadata
CREATE TYPE min_with_id (
min_value DOUBLE,
id UUID
);
-- State function
CREATE FUNCTION min_state(state FROZEN<min_with_id>, value DOUBLE, id UUID)
CALLED ON NULL INPUT
RETURNS FROZEN<min_with_id>
LANGUAGE java
AS '
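if (value == null) return state;
// No INITCOND is declared, so state is null until the first non-null value arrives.
// Assumption: udfContext is available (it is provided to Java UDFs in newer Cassandra releases).
if (state == null) {
return udfContext.newReturnUDTValue()
.setDouble("min_value", value)
.setUUID("id", id);
}
// getDouble returns a primitive double, so use isNull to detect an unset field
if (state.isNull("min_value") ||
value < state.getDouble("min_value")) {
return state.getType().newValue()
.setDouble("min_value", value)
.setUUID("id", id);
}
return state;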
';
-- Aggregate returns the UDT
CREATE AGGREGATE min_with_metadata(DOUBLE, UUID)
SFUNC min_state
STYPE FROZEN<min_with_id>;
-- Usage
SELECT min_with_metadata(price, product_id) FROM products;
Count Distinct Approximation¶
-- Using a set to track unique values (limited scalability)
CREATE FUNCTION count_distinct_state(state SET<TEXT>, val TEXT)
CALLED ON NULL INPUT
RETURNS SET<TEXT>
LANGUAGE java
AS '
if (val == null) return state;
java.util.Set<String> result = new java.util.HashSet<>(state);
result.add(val);
return result;
';
CREATE FUNCTION count_distinct_final(state SET<TEXT>)
RETURNS NULL ON NULL INPUT
RETURNS INT
LANGUAGE java
AS 'return state.size();';
CREATE AGGREGATE count_distinct(TEXT)
SFUNC count_distinct_state
STYPE SET<TEXT>
FINALFUNC count_distinct_final
INITCOND {};
-- Usage (caution: memory intensive for high cardinality)
SELECT count_distinct(category) FROM products;
Restrictions¶
Component Requirements:
- SFUNC must exist before creating aggregate
- SFUNC must accept STYPE as first parameter
- SFUNC must return STYPE
- FINALFUNC (if specified) must accept STYPE
Type Restrictions:
- Counter columns not supported
- State type must be serializable
Execution Restrictions:
- Aggregates execute on coordinator only
- Memory bounded by function limits
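- Aggregating across partitions reads every matching partition; ALLOW FILTERING is additionally required only when the WHERE clause itself needs it (for example, filtering on a non-key column)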
Performance Considerations
- Aggregates process all matching rows
- Large result sets consume coordinator memory
- Collection state types grow with data volume
- Consider pre-aggregating for large datasets
Notes¶
- Aggregates stored in system_schema.aggregates
- View with DESCRIBE AGGREGATE
- Built-in aggregates (COUNT, SUM, AVG) are more efficient than UDAs
- Test with representative data volumes before production use
DROP AGGREGATE¶
Remove a user-defined aggregate.
Synopsis¶
DROP AGGREGATE [ IF EXISTS ] [ *keyspace_name*. ] *aggregate_name*
[ ( [ *arg_type* [, *arg_type* ... ] ] ) ]
Description¶
DROP AGGREGATE removes a UDA. Specify argument types if multiple overloads exist.
Parameters¶
IF EXISTS¶
Prevent error if aggregate doesn't exist.
Argument Types¶
Identify specific overload when multiple exist:
-- If aggregate has overloads
DROP AGGREGATE my_agg(INT);
DROP AGGREGATE my_agg(DOUBLE);
Examples¶
-- Drop simple aggregate
DROP AGGREGATE my_sum;
-- Drop with keyspace
DROP AGGREGATE my_keyspace.weighted_avg;
-- Drop specific overload
DROP AGGREGATE my_avg(DOUBLE);
-- Safe drop
DROP AGGREGATE IF EXISTS temp_aggregate;
Restrictions¶
- Cannot drop aggregate while queries using it are running
- Dropping aggregate does not drop underlying functions
- Requires DROP permission
Finding Aggregates¶
-- List aggregates in keyspace
SELECT aggregate_name, argument_types, state_type, state_func
FROM system_schema.aggregates
WHERE keyspace_name = 'my_keyspace';
-- Describe aggregate
DESCRIBE AGGREGATE my_keyspace.my_sum;
Best Practices¶
When to Use UDAs¶
Good Use Cases
- Custom statistical calculations
- Domain-specific aggregations
- Multi-value aggregations (returning tuples/UDTs)
- Aggregations with complex accumulation logic
When to Avoid UDAs¶
Avoid When
- Built-in aggregates suffice (COUNT, SUM, AVG, MIN, MAX)
- Aggregating across entire table (too many rows)
- High-frequency queries (UDA overhead significant)
- Memory-intensive state types on large datasets
Design Guidelines¶
- Choose appropriate state type
  - Simple types for simple aggregations
  - Tuples for multi-value accumulation
  - Avoid unbounded collections
- Handle null state correctly
  - Use INITCOND when possible
  - Make SFUNC null-safe
- Test with realistic volumes
  - Profile memory usage
  - Test with expected result set sizes
- Consider pre-aggregation
  - Maintain aggregated tables for common queries
  - Update aggregates via application logic
Performance and Safety¶
UDAs have significant performance implications that must be considered:
| Concern | Impact | Mitigation |
|---|---|---|
| Coordinator memory | All rows streamed to single node | Limit result sets with WHERE clauses |
| State size | Large state consumes heap | Keep state minimal; avoid unbounded collections |
| SFUNC complexity | Called once per row | Keep operations O(1); avoid heavy computation |
| FINALFUNC complexity | Called once at end | Acceptable to be more complex than SFUNC |
| Network transfer | All data flows to coordinator | Pre-filter data; consider materialized views |
Performance Best Practices
Keep state small:
- Prefer primitive types over collections
- Use fixed-size tuples instead of growing lists
- Accumulate only what's necessary for final computation
Keep operations lightweight:
- Avoid object allocation in SFUNC when possible
- No I/O operations (blocked by sandbox anyway)
- No complex string manipulation per row
- Pre-compute values that don't change
Example - Efficient vs Inefficient:
// INEFFICIENT: Creates new ArrayList every call
List<Integer> result = new ArrayList<>(state);
result.add(val);
return result;
// EFFICIENT: Accumulate only sum and count
return state + val; // For simple sum
Capacity Planning
Estimate coordinator memory requirements:
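Memory ≈ (rows × state_growth_per_row) + (rows × row_size_during_transfer)
The first term applies when the state grows with each row (for example, a collection state); a fixed-size state is held only once. For 1 million rows that each add 100 bytes to the state: ~100 MB minimum coordinator heap required.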
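The estimate is plain arithmetic and can be scripted when comparing scenarios. The sketch below, in standalone Java, uses hypothetical names (`CapacitySketch`, `estimateBytes`) and treats both per-row sizes as inputs you supply; the 200-byte row size in the second call is an assumed figure, not a measurement.

```java
/** Hypothetical helper for the rough coordinator memory estimate; not Cassandra code. */
final class CapacitySketch {

    /** rows * per-row state bytes + rows * per-row transfer bytes, failing loudly on overflow. */
    static long estimateBytes(long rows, long stateBytesPerRow, long rowBytesInTransfer) {
        long stateBytes = Math.multiplyExact(rows, stateBytesPerRow);
        long transferBytes = Math.multiplyExact(rows, rowBytesInTransfer);
        return Math.addExact(stateBytes, transferBytes);
    }

    public static void main(String[] args) {
        // State term only: 1 million rows at 100 bytes each
        System.out.println(estimateBytes(1_000_000L, 100L, 0L) / 1_000_000L + " MB");    // 100 MB
        // Adding an assumed 200 bytes per row in transfer
        System.out.println(estimateBytes(1_000_000L, 100L, 200L) / 1_000_000L + " MB");  // 300 MB
    }
}
```

The figures use decimal megabytes (1 MB = 1,000,000 bytes) and ignore JVM object overhead, so treat them as a lower bound.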
Cleanup: Dropping Aggregate and Functions¶
-- Order matters: drop aggregate before its functions
DROP AGGREGATE IF EXISTS my_avg;
DROP FUNCTION IF EXISTS avg_final;
DROP FUNCTION IF EXISTS avg_state;
Related Documentation¶
- CREATE FUNCTION - User-defined functions for aggregates
- Functions Reference - Built-in aggregate functions
- SELECT - Using aggregates in queries