This documentation is for an unreleased version of Apache Flink Agents. We recommend you use the latest version.
Overview #
Flink Agents is an Agentic AI framework based on Apache Flink. By integrating agents with Flink DataStream/Table, Flink Agents can leverage Flink's powerful data processing capabilities.
From/To Flink DataStream API #
First, obtain the Flink StreamExecutionEnvironment and the Flink Agents AgentsExecutionEnvironment.
# Set up the Flink streaming environment and the Agents execution environment.
env = StreamExecutionEnvironment.get_execution_environment()
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
// Set up the Flink streaming environment and the Agents execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AgentsExecutionEnvironment agentsEnv =
AgentsExecutionEnvironment.getExecutionEnvironment(env);
Integrate the agent with the input DataStream; the returned output DataStream can be consumed by downstream operators.
# create input datastream
input_stream = env.from_source(...)
# integrate agent with input datastream, and return output datastream
output_stream = (
agents_env.from_datastream(
input=input_stream, key_selector=lambda x: x.id
)
.apply(your_agent)
.to_datastream()
)
# consume agent output datastream
output_stream.print()
// create input datastream
DataStream<YourPojo> inputStream = env.fromSource(...);
// integrate agent with input datastream, and return output datastream
DataStream<Object> outputStream =
agentsEnv
.fromDataStream(inputStream, (KeySelector<YourPojo, String>) YourPojo::getId)
.apply(yourAgent)
.toDataStream();
// consume agent output datastream
outputStream.print();
The input DataStream must be a KeyedStream; otherwise, the user should provide a KeySelector that tells Flink Agents how to convert the input DataStream into a KeyedStream.
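Conceptually, a key selector is just a function from a record to its key, as in the `lambda x: x.id` above. A minimal sketch, assuming a hypothetical record type with an `id` field:

```python
from dataclasses import dataclass


@dataclass
class Review:
    """Hypothetical input record; only the id field matters for keying."""
    id: str
    text: str


def key_of(record: Review) -> str:
    # All records with the same key are routed to the same keyed agent
    # instance, just like records in any keyed Flink operator.
    return record.id
```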
From/To Flink Table API #
First, obtain the Flink StreamExecutionEnvironment, StreamTableEnvironment, and the Flink Agents AgentsExecutionEnvironment.
# Set up the Flink streaming environment and table environment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# Setup flink agents execution environment
agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env)
// Set up the Flink streaming environment and table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Setup flink agents execution environment
AgentsExecutionEnvironment agentsEnv =
AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv);
Integrate the agent with the input Table; the returned output Table can be consumed by downstream operators.
input_table = t_env.from_elements(...)
output_type = ExternalTypeInfo(RowTypeInfo(
[BasicTypeInfo.INT_TYPE_INFO()],
["result"],
))
schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
output_table = (
agents_env.from_table(input=input_table, key_selector=MyKeySelector())
.apply(agent)
.to_table(schema=schema, output_type=output_type)
)
Table inputTable = tableEnv.fromValues(...);
// Here the output schema should always be a nested row, of which
// the f0 column is the expected row.
Schema outputSchema =
Schema.newBuilder()
.column("f0", DataTypes.ROW(DataTypes.FIELD("result", DataTypes.DOUBLE())))
.build();
Table outputTable =
agentsEnv
.fromTable(
inputTable,
myKeySelector)
.apply(agent)
.toTable(outputSchema);
The user should provide a KeySelector in from_table() to tell Flink Agents how to convert the input Table into a KeyedStream internally, and provide a Schema and TypeInformation in to_table() to describe the output Table schema.
Currently, the user must provide both Schema and TypeInformation when calling to_table(); support for providing only one of them will be added in the future.