Monitoring

Metric #

Built-in Metrics #

Flink Agents exposes built-in metrics covering events, actions, and token usage.

Event and Action Metrics #

| Scope | Metrics | Description | Type |
|-------|---------|-------------|------|
| Agent | numOfEventProcessed | The total number of Events this operator has processed. | Count |
| Agent | numOfEventProcessedPerSec | The number of Events this operator has processed per second. | Meter |
| Agent | numOfActionsExecuted | The total number of actions this operator has executed. | Count |
| Agent | numOfActionsExecutedPerSec | The number of actions this operator has executed per second. | Meter |
| Action | <action_name>.numOfActionsExecuted | The total number of actions this operator has executed for a specific action name. | Count |
| Action | <action_name>.numOfActionsExecutedPerSec | The number of actions this operator has executed per second for a specific action name. | Meter |
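
The Count and Meter types in the table are related: a count is a running total, and a meter describes how fast that total grows. The following is a minimal illustrative sketch of that relationship, not the Flink implementation. The function name rate_per_sec and the sample readings are hypothetical.

```python
def rate_per_sec(count_start: int, count_end: int, elapsed_sec: float) -> float:
    """Average events per second between two readings of a running count."""
    if elapsed_sec <= 0:
        raise ValueError("elapsed_sec must be positive")
    return (count_end - count_start) / elapsed_sec


# Hypothetical readings of numOfEventProcessed taken 60 seconds apart.
rate = rate_per_sec(count_start=1200, count_end=1500, elapsed_sec=60.0)
print(rate)
```

A count is the natural choice for totals and billing-style questions, while a meter is better suited to spotting throughput drops.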

Token Usage Metrics #

Token usage metrics are automatically recorded when chat models are invoked through ChatModelConnection. These metrics help track LLM API usage and costs.

| Scope | Metrics | Description | Type |
|-------|---------|-------------|------|
| Model | <action_name>.<model_name>.promptTokens | The total number of prompt tokens consumed by the model within an action. | Count |
| Model | <action_name>.<model_name>.completionTokens | The total number of completion tokens generated by the model within an action. | Count |
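
Because prompt and completion tokens are counted separately, the two counters can be combined with a provider's price list to estimate spend. This is a minimal sketch, assuming the counter values have already been read from your metrics backend. The function name, the per-1,000-token prices, and the sample values are all hypothetical.

```python
def estimate_cost(prompt_tokens: int, completion_tokens: int,
                  prompt_price_per_1k: float,
                  completion_price_per_1k: float) -> float:
    """Estimate spend from the two token counters and per-1,000-token prices."""
    prompt_cost = (prompt_tokens / 1000) * prompt_price_per_1k
    completion_cost = (completion_tokens / 1000) * completion_price_per_1k
    return prompt_cost + completion_cost


# Hypothetical readings of <action_name>.<model_name>.promptTokens and
# <action_name>.<model_name>.completionTokens, with made-up prices.
cost = estimate_cost(
    prompt_tokens=20_000,
    completion_tokens=4_000,
    prompt_price_per_1k=0.5,
    completion_price_per_1k=1.5,
)
print(f"{cost:.2f}")
```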

How to add custom metrics #

In Flink Agents, users implement their logic by defining custom Actions that respond to various Events throughout the Agent lifecycle. To support user-defined metrics, RunnerContext provides two properties: agent_metric_group and action_metric_group. The first creates or updates agent-wide (global) metrics; the second does the same for metrics scoped to an individual action. For an introduction to metric types, please refer to the Metric types documentation.

Here is a usage example in Python, followed by the same agent in Java:

import time

class MyAgent(Agent):
    @action(InputEvent)
    @staticmethod
    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
        start_time = time.time_ns()

        # the action logic
        ...

        # Access the main agent metric group
        metrics = ctx.agent_metric_group

        # Update global metrics
        metrics.get_counter("numInputEvent").inc()
        metrics.get_meter("numInputEventPerSec").mark()

        # Access the per-action metric group
        action_metrics = ctx.action_metric_group
        action_metrics.get_histogram("actionLatencyMs") \
            .update(int(time.time_ns() - start_time) // 1000000)

The equivalent in Java:

public class MyAgent extends Agent {

    @Action(listenEvents = {InputEvent.class})
    public static void firstAction(InputEvent event, RunnerContext ctx) throws Exception {
        long startTime = System.currentTimeMillis();
        
        // the action logic
        ...
        
        FlinkAgentsMetricGroup metrics = ctx.getAgentMetricGroup();

        metrics.getCounter("numInputEvent").inc();
        metrics.getMeter("numInputEventPerSec").markEvent();

        FlinkAgentsMetricGroup actionMetrics = ctx.getActionMetricGroup();
        actionMetrics
                .getHistogram("actionLatencyMs")
                .update(System.currentTimeMillis() - startTime);
    }
}

Flink Agents reports metrics to external systems under a metric identifier prefix of the form <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>. Please refer to Flink Metric Reporters for more details.

Additionally, we can check the metric results in the Flink Job WebUI using the metric identifier prefix <subtask_index>.<operator_name>.

[Figure: Metric Web UI]
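
The two prefixes above can be illustrated with a small helper that assembles full metric identifiers. This is a sketch under assumptions: that the metric name is appended to the prefix with a dot, and that no further group names or character escaping are applied. The function names and the sample host, TaskManager ID, job, and operator names are hypothetical.

```python
def reporter_identifier(host: str, tm_id: str, job_name: str,
                        operator_name: str, subtask_index: int,
                        metric_name: str) -> str:
    """Build the identifier an external reporter would see (assumed dot-joined)."""
    prefix = f"{host}.taskmanager.{tm_id}.{job_name}.{operator_name}.{subtask_index}"
    return f"{prefix}.{metric_name}"


def webui_identifier(operator_name: str, subtask_index: int,
                     metric_name: str) -> str:
    """Build the identifier shown in the WebUI: <subtask_index>.<operator_name> plus the name."""
    return f"{subtask_index}.{operator_name}.{metric_name}"


# Hypothetical deployment values.
print(reporter_identifier("localhost", "tm-1", "my_job", "my_agent", 0,
                          "numOfEventProcessed"))
print(webui_identifier("my_agent", 0, "numOfEventProcessed"))
```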

Log #

Flink Agents' logging is built on Flink’s logging framework. For more details, please refer to the Flink log system documentation.

To add logs in Java code, refer to the Flink documentation. In Python, use the standard logging module. Here is a specific example:

import logging

# Defined inside an Agent subclass.
@action(InputEvent)
@staticmethod
def process_input(event: InputEvent, ctx: RunnerContext) -> None:
    logging.info("Processing input event: %s", event)
    # the action logic

The log output can be viewed in the Flink Job WebUI:

[Figure: Log Web UI]

Event Log #

Currently, the system supports File-based Event Log as the default implementation. Future releases will introduce support for additional types of event logs and provide configuration options to let users choose their preferred logging mechanism.

File Event Log #

The File Event Log is a file-based event logging system that stores events in structured files within a flat directory.

By default, each event is recorded in JSON Lines (JSONL) format, with one JSON object per line. When prettyPrint is enabled, each event is written as formatted multi-line JSON instead, and the log file is no longer in valid JSONL format.
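
The difference between the two formats matters when reading a log back. A line-by-line JSONL reader fails on pretty-printed output, whereas decoding value by value handles both. The following is a minimal sketch, assuming the pretty-printed file is a sequence of JSON objects written one after another. The function name iter_events and the "id" field are hypothetical; the actual event schema is not described here.

```python
import json
from typing import Any, Iterator


def iter_events(text: str) -> Iterator[Any]:
    """Yield each JSON value from text holding one or more concatenated JSON documents."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so step over it here.
        if text[pos].isspace():
            pos += 1
            continue
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Hypothetical records in both layouts.
jsonl = '{"id": 1}\n{"id": 2}\n'
pretty = '{\n  "id": 1\n}\n{\n  "id": 2\n}\n'

print(list(iter_events(jsonl)))
print(list(iter_events(pretty)))
```

For large files, a streaming parser would avoid loading the whole file into memory; this sketch reads from a string for brevity.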

File Structure #

The log files follow a naming convention consistent with Flink’s logging standards and are stored in a flat directory structure:

{baseLogDir}/
├── events-{jobId}-{taskName}-{subtaskId}.log
├── events-{jobId}-{taskName}-{subtaskId}.log
└── events-{jobId}-{taskName}-{subtaskId}.log

By default, all File-based Event Logs are stored in the flink-agents subdirectory under the system temporary directory (java.io.tmpdir). You can override the base log directory with the agent.baseLogDir setting in Flink config.yaml.
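
The directory and naming rules above can be sketched as a small locator for event log files. The assumptions are labeled in the code: Python's tempfile.gettempdir() is used as a stand-in for java.io.tmpdir, which usually but not necessarily points to the same place; jobId is assumed to contain no hyphen; and subtaskId is assumed to be numeric. All function names are hypothetical.

```python
import re
import tempfile
from pathlib import Path
from typing import Optional

# Matches events-{jobId}-{taskName}-{subtaskId}.log.
# Assumption: jobId has no hyphen and subtaskId is numeric; taskName may contain hyphens.
EVENT_LOG_NAME = re.compile(
    r"^events-(?P<job_id>[^-]+)-(?P<task_name>.+)-(?P<subtask_id>\d+)\.log$"
)


def resolve_base_log_dir(configured: Optional[str] = None) -> Path:
    """Return the configured directory, or <system temp dir>/flink-agents if none is given."""
    if configured:
        return Path(configured)
    # Assumption: Python's temp dir matches the JVM's java.io.tmpdir on this host.
    return Path(tempfile.gettempdir()) / "flink-agents"


def parse_event_log_name(name: str) -> Optional[dict]:
    """Split a file name into its parts; return None if it does not match the convention."""
    match = EVENT_LOG_NAME.match(name)
    return match.groupdict() if match else None


def list_event_logs(base_dir: Path) -> list:
    """List event log files directly under base_dir (the layout is flat, so no recursion)."""
    if not base_dir.is_dir():
        return []
    return sorted(p for p in base_dir.glob("events-*.log")
                  if parse_event_log_name(p.name))


print(parse_event_log_name("events-abc123-my-task-0.log"))
print(list_event_logs(resolve_base_log_dir()))
```

Passing the value of agent.baseLogDir as the configured argument mirrors the override described above.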