Skip to main content

Counter


id: py-Counter

Counter

A thread-safe counter for tracking numeric values — message counts, totals, or any incrementing/decrementing metric. Obtained through metrics.getCounter().

Counters support method chaining for fluent updates.


At a Glance

# Get or create a counter
processed = metrics.getCounter('orders.processed')
failed = metrics.getCounter('orders.failed')

# Increment
processed.increment()
processed.increment(5)

# Decrement
failed.decrement()

# Read value
stream.logInfo(f"Processed: {processed.count}")

Properties

PropertyTypeDescription
countintCurrent counter value
counter = metrics.getCounter('my.counter')
stream.logInfo(counter.count) # 0 (initially)

Methods

increment(value)

Increases the counter by 1 or the specified amount.

ParameterTypeDescription
valueint (optional)Amount to add — defaults to 1

Returns: Counter (supports chaining)

c = metrics.getCounter('events')

c.increment() # +1
c.increment(5) # +5
c.increment(2).increment(3) # chained: +5 total

decrement(value)

Decreases the counter by 1 or the specified amount.

ParameterTypeDescription
valueint (optional)Amount to subtract — defaults to 1

Returns: Counter (supports chaining)

c = metrics.getCounter('pending')

c.decrement() # -1
c.decrement(3) # -3
c.decrement(2).decrement(1) # chained: -3 total

getCount()

Returns the current count. Same as reading count.

Returns: int

total = counter.getCount()

Complete Example

OUTPUT_PORT = None
ERROR_PORT = None

def on_init():
global OUTPUT_PORT, ERROR_PORT
OUTPUT_PORT = processor.getOutputPort('Output')
ERROR_PORT = processor.getOutputPort('Error')

def on_message():
processed = metrics.getCounter('stream.records.processed')
errors = metrics.getCounter('stream.records.errors')

try:
validate_record(message)
processed.increment()
stream.emit(message, OUTPUT_PORT)
except Exception as err:
errors.increment()
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'VALIDATION_FAILED'))
stream.emit(message, ERROR_PORT)

def on_stream_end():
processed = metrics.getCounter('stream.records.processed')
errors = metrics.getCounter('stream.records.errors')

stream.logInfo(f"Stream complete: {processed.count} processed, {errors.count} errors")

See Also

  • Metrics — Create and manage counters