Counter

A thread-safe counter for tracking numeric values such as message counts, totals, or any metric that is incremented or decremented. Obtain one through metrics.getCounter().

Counters support method chaining for fluent updates.


At a Glance

// Get or create a counter
const processed = metrics.getCounter('orders.processed');
const failed = metrics.getCounter('orders.failed');

// Increment
processed.increment();
processed.increment(5);

// Decrement
failed.decrement();

// Read value
stream.logInfo(`Processed: ${processed.count}`);

Properties

| Property | Type | Description |
| --- | --- | --- |
| count | number | Current counter value |

const counter = metrics.getCounter('my.counter');
stream.logInfo(counter.count); // 0 (initially)

Methods

increment(value?)

Increases the counter by 1 or the specified amount.

| Parameter | Type | Description |
| --- | --- | --- |
| value | number (optional) | Amount to add; defaults to 1 |

Returns: Counter (supports chaining)

const c = metrics.getCounter('events');

c.increment(); // +1
c.increment(5); // +5
c.increment(2).increment(3); // chained: +5 total

decrement(value?)

Decreases the counter by 1 or the specified amount.

| Parameter | Type | Description |
| --- | --- | --- |
| value | number (optional) | Amount to subtract; defaults to 1 |

Returns: Counter (supports chaining)

const c = metrics.getCounter('pending');

c.decrement(); // -1
c.decrement(3); // -3
c.decrement(2).decrement(1); // chained: -3 total
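
Chaining works because each call returns the counter itself, so increment and decrement calls can be mixed in one expression. The sketch below uses a hypothetical in-memory stand-in (`SketchCounter`, which is not part of the API) to show how mixed calls net out. It makes no attempt to model the real Counter's internals or its thread safety.

```javascript
// Hypothetical stand-in for illustration only; not the platform's Counter.
class SketchCounter {
  #value = 0;

  // Mirrors the documented `count` property.
  get count() {
    return this.#value;
  }

  // Adds `value` (default 1) and returns the counter so calls can chain.
  increment(value = 1) {
    this.#value += value;
    return this;
  }

  // Subtracts `value` (default 1) and returns the counter so calls can chain.
  decrement(value = 1) {
    this.#value -= value;
    return this;
  }
}

const pending = new SketchCounter();
pending.increment(10).decrement(3).decrement(); // 10 - 3 - 1
console.log(pending.count); // 6
```

Assuming the real Counter behaves as documented above, the same mixed chain on a counter from metrics.getCounter() should leave it at the same net value.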

getCount()

Returns the current count. Equivalent to reading the count property.

Returns: number

const total = counter.getCount();

Complete Example

let OUTPUT_PORT;
let ERROR_PORT;

// Resolve the output ports once, before any messages arrive.
export function onInit() {
  OUTPUT_PORT = processor.getOutputPort('Output');
  ERROR_PORT = processor.getOutputPort('Error');
}

export function onMessage() {
  const processed = metrics.getCounter('stream.records.processed');
  const errors = metrics.getCounter('stream.records.errors');

  try {
    // validateRecord is assumed to be defined elsewhere and to throw on invalid input.
    validateRecord(message);
    processed.increment();
    stream.emit(message, OUTPUT_PORT);
  } catch (err) {
    // Count the failure, tag the message, and route it to the error port.
    errors.increment();
    message.addStatus(Severity.ERROR, Status.create(VENDOR, 'VALIDATION_FAILED'));
    stream.emit(message, ERROR_PORT);
  }
}

// Log the final totals once the stream has ended.
export function onStreamEnd() {
  const processed = metrics.getCounter('stream.records.processed');
  const errors = metrics.getCounter('stream.records.errors');

  stream.logInfo(`Stream complete: ${processed.count} processed, ${errors.count} errors`);
}
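
The example looks up the same counter names in onMessage and onStreamEnd, which relies on getCounter returning the existing counter for a name it has already seen (the "get or create" behavior noted in At a Glance). A minimal sketch of that lookup pattern follows, using a hypothetical `SketchMetrics` registry backed by a Map. The class name and its internals are assumptions for illustration, not the platform's implementation.

```javascript
// Hypothetical get-or-create registry; illustrates the lookup pattern only.
class SketchMetrics {
  #counters = new Map();

  getCounter(name) {
    // Create the counter on first use, then reuse it for later lookups.
    if (!this.#counters.has(name)) {
      this.#counters.set(name, {
        count: 0,
        increment(value = 1) {
          this.count += value;
          return this;
        },
      });
    }
    return this.#counters.get(name);
  }
}

const sketchMetrics = new SketchMetrics();

// Simulates three onMessage calls, each looking the counter up by name.
for (let i = 0; i < 3; i++) {
  sketchMetrics.getCounter('stream.records.processed').increment();
}

// A later lookup (as in onStreamEnd) sees the accumulated value.
console.log(sketchMetrics.getCounter('stream.records.processed').count); // 3
```

Under this pattern, fetching the counter inside each handler costs only a lookup by name, so the handlers do not need to share counter references through module-level variables.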

See Also

  • Metrics — Create and manage counters