Custom Operator Plugins #
The operator provides a customizable platform for Flink resource management. Users can develop plugins to tailor the operator behaviour to their own needs.
Custom Flink Resource Validators #
FlinkResourceValidator is an interface for validating FlinkDeployment and FlinkSessionJob resources. It is a pluggable component based on the Plugins mechanism: you can provide your own implementation, as long as the packaged JAR retains its service definition in META-INF/services.
The following steps demonstrate how to develop and use a custom validator.
- Implement the FlinkResourceValidator interface:

      package org.apache.flink.kubernetes.operator.validation;

      // In older operator releases these two classes lived under
      // org.apache.flink.kubernetes.operator.crd.
      import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
      import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;

      import java.util.Optional;

      /** Custom validator implementation of {@link FlinkResourceValidator}. */
      public class CustomValidator implements FlinkResourceValidator {

          @Override
          public Optional<String> validateDeployment(FlinkDeployment deployment) {
              // Returning a message rejects the resource; an empty Optional accepts it.
              if (deployment.getSpec().getFlinkVersion() == null) {
                  return Optional.of("Flink Version must be defined.");
              }
              return Optional.empty();
          }

          @Override
          public Optional<String> validateSessionJob(
                  FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
              if (sessionJob.getSpec().getJob() == null) {
                  return Optional.of("The job spec should not be empty");
              }
              return Optional.empty();
          }
      }

- Create the service definition file org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator in META-INF/services. For the custom FlinkResourceValidator implementation above, the file contains:

      org.apache.flink.kubernetes.operator.validation.CustomValidator

- Use Maven to package the project and generate the custom validator JAR.
- Create a Dockerfile that builds a custom image from the official apache/flink-kubernetes-operator image and copies the generated JAR to the custom validator plugin directory. /opt/flink/plugins is the value of the FLINK_PLUGINS_DIR environment variable in the flink-kubernetes-operator helm chart. The custom validator directory under /opt/flink/plugins is structured as follows:

      /opt/flink/plugins
      ├── custom-validator
      │   ├── custom-validator.jar
      └── ...

  Given this directory layout, the Dockerfile is defined as follows:

      FROM apache/flink-kubernetes-operator
      ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
      ENV CUSTOM_VALIDATOR_DIR=custom-validator
      RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR
      COPY custom-validator.jar $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR/

- Install the flink-kubernetes-operator helm chart with the custom image and verify that the deploy/flink-kubernetes-operator log contains:

      2022-05-04 14:01:46,551 o.a.f.k.o.u.FlinkUtils [INFO ] Discovered resource validator from plugin directory[/opt/flink/plugins]: org.apache.flink.kubernetes.operator.validation.CustomValidator.
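The service definition file described above follows the standard Java service provider convention: the file is named after the interface and lists the implementing class. The sketch below shows how such a file is resolved with java.util.ServiceLoader, which may help when debugging a validator that is not discovered. It assumes the operator's plugin loading relies on this convention. ResourceValidator, VersionRequiredValidator, and the temporary directory are hypothetical stand-ins so the example runs on its own; they are not operator classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

/** Sketch of SPI-based discovery; all types here are stand-ins, not operator classes. */
final class SpiDiscoverySketch {

    /** Hypothetical stand-in for a validator-style plugin interface. */
    public interface ResourceValidator {
        Optional<String> validate(String flinkVersion);
    }

    /** Hypothetical implementation; ServiceLoader needs a public no-arg constructor. */
    public static class VersionRequiredValidator implements ResourceValidator {
        @Override
        public Optional<String> validate(String flinkVersion) {
            return flinkVersion == null
                    ? Optional.of("Flink Version must be defined.")
                    : Optional.empty();
        }
    }

    /** Writes a service definition into a temporary "plugin" directory and loads from it. */
    static List<ResourceValidator> discover() {
        try {
            // The temporary directory is left behind for brevity.
            Path pluginDir = Files.createTempDirectory("custom-validator");
            Path services = Files.createDirectories(pluginDir.resolve("META-INF/services"));
            // File name = interface name, file content = implementation class name.
            Files.write(
                    services.resolve(ResourceValidator.class.getName()),
                    (VersionRequiredValidator.class.getName() + "\n")
                            .getBytes(StandardCharsets.UTF_8));

            List<ResourceValidator> found = new ArrayList<>();
            try (URLClassLoader loader =
                    new URLClassLoader(
                            new URL[] {pluginDir.toUri().toURL()},
                            SpiDiscoverySketch.class.getClassLoader())) {
                // ServiceLoader is lazy, so iterate while the class loader is still open.
                for (ResourceValidator v : ServiceLoader.load(ResourceValidator.class, loader)) {
                    found.add(v);
                }
            }
            return found;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (ResourceValidator v : discover()) {
            System.out.println(v.getClass().getName() + " -> " + v.validate(null));
        }
    }
}
```

If the service file is missing, misnamed, or lists the wrong class name, the loop simply finds nothing, which matches the symptom of a plugin JAR that is present but never reported in the operator log.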
Custom Flink Resource Listeners #
The Flink Kubernetes Operator allows users to listen to events and status updates triggered for the Flink Resources managed by the operator. This feature enables tighter integration with the user’s own data platform.
By implementing the FlinkResourceListener interface, users can listen to both events and status updates per resource type (FlinkDeployment / FlinkSessionJob). The listener methods are called after the respective events have been triggered by the system.
Using the context provided to each listener method, users can also access the related Flink resource and the KubernetesClient itself, for example to trigger further events on demand.
Similar to custom validator implementations, resource listeners are loaded via the Flink Plugins mechanism.
In order to enable your custom FlinkResourceListener you need to:
- Implement the interface
- Add your listener class to org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener in META-INF/services
- Package your JAR and add it to the plugins directory of your operator image (/opt/flink/plugins)
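As a rough illustration of the callback pattern described above, the following self-contained sketch models a listener that records events and status updates for both resource types. Every type in it (Context, ResourceListener, RecordingListener) is a simplified, hypothetical stand-in so that the example compiles on its own. The real FlinkResourceListener interface has its own method names and context types, which should be taken from the operator API of the version in use.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch only: every type here is a hypothetical stand-in, not the operator's API. */
final class ListenerSketch {

    /** Stand-in for the context passed to each listener callback. */
    static final class Context {
        final String kind;   // "FlinkDeployment" or "FlinkSessionJob"
        final String name;   // name of the affected resource
        final String detail; // event message or new status

        Context(String kind, String name, String detail) {
            this.kind = kind;
            this.name = name;
            this.detail = detail;
        }
    }

    /** Stand-in listener contract: one callback for events, one for status updates. */
    interface ResourceListener {
        void onEvent(Context ctx);

        void onStatusUpdate(Context ctx);
    }

    /** Records every notification, e.g. as a buffer before forwarding to another system. */
    static final class RecordingListener implements ResourceListener {
        private final List<String> records = new ArrayList<>();

        @Override
        public void onEvent(Context ctx) {
            records.add(describe("EVENT", ctx));
        }

        @Override
        public void onStatusUpdate(Context ctx) {
            records.add(describe("STATUS", ctx));
        }

        List<String> records() {
            return Collections.unmodifiableList(records);
        }

        private static String describe(String type, Context ctx) {
            return type + " " + ctx.kind + "/" + ctx.name + ": " + ctx.detail;
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        // The caller plays the operator's role: it notifies after the change happened.
        listener.onEvent(new Context("FlinkDeployment", "basic-example", "Submitting job"));
        listener.onStatusUpdate(new Context("FlinkSessionJob", "session-job", "RUNNING"));
        listener.records().forEach(System.out::println);
    }
}
```

Because the callbacks run after the change has already happened, a listener of this shape is best suited to observing and forwarding information rather than to vetoing changes, which is the role of a validator.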
Additional Dependencies #
In some cases, users may need to add required dependencies onto the operator classpath.
When building the custom image, copy the additional dependencies to /opt/flink/operator-lib, the directory referenced by the OPERATOR_LIB environment variable.
That folder is added to the classpath upon initialization.
FROM apache/flink-kubernetes-operator
ARG ARTIFACT1=custom-artifact1.jar
ARG ARTIFACT2=custom-artifact2.jar
COPY target/$ARTIFACT1 $OPERATOR_LIB
COPY target/$ARTIFACT2 $OPERATOR_LIB
Custom Flink Resource Mutators #
FlinkResourceMutator is an interface for mutating FlinkDeployment and FlinkSessionJob resources. It is a pluggable component based on the Plugins mechanism: you can provide your own implementation, as long as the packaged JAR retains its service definition in META-INF/services.
The following steps demonstrate how to develop and use a custom mutator.
- Implement the FlinkResourceMutator interface:

      package org.apache.flink.mutator;

      import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
      import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
      import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;

      import java.util.Optional;

      /** Custom Flink Mutator. */
      public class CustomFlinkMutator implements FlinkResourceMutator {

          @Override
          public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
              return deployment;
          }

          @Override
          public FlinkSessionJob mutateSessionJob(
                  FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
              return sessionJob;
          }
      }

- Create the service definition file org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator in META-INF/services. For the custom FlinkResourceMutator implementation above, the file contains:

      org.apache.flink.mutator.CustomFlinkMutator

- Use Maven to package the project and generate the custom mutator JAR.
- Create a Dockerfile that builds a custom image from the official apache/flink-kubernetes-operator image and copies the generated JAR to the custom mutator plugin directory. /opt/flink/plugins is the value of the FLINK_PLUGINS_DIR environment variable in the flink-kubernetes-operator helm chart. The custom mutator directory under /opt/flink/plugins is structured as follows:

      /opt/flink/plugins
      ├── custom-mutator
      │   ├── custom-mutator.jar
      └── ...

  Given this directory layout, the Dockerfile is defined as follows:

      FROM apache/flink-kubernetes-operator
      ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
      ENV CUSTOM_MUTATOR_DIR=custom-mutator
      RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_MUTATOR_DIR
      COPY custom-mutator.jar $FLINK_PLUGINS_DIR/$CUSTOM_MUTATOR_DIR/

- Install the flink-kubernetes-operator helm chart with the custom image and verify that the deploy/flink-kubernetes-operator log contains:

      2023-12-12 06:26:56,667 o.a.f.k.o.u.MutatorUtils [INFO ] Discovered mutator from plugin directory[/opt/flink/plugins]: org.apache.flink.mutator.CustomFlinkMutator.
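The CustomFlinkMutator shown earlier returns each resource unchanged. To illustrate what an actual mutation could look like, the sketch below adds a default label when the user has not set one. Resource, Mutator, DefaultTeamLabelMutator, and the "team" label are all hypothetical stand-ins chosen so the example is self-contained. A real mutator would apply the same idea to a FlinkDeployment or FlinkSessionJob inside mutateDeployment or mutateSessionJob.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: Resource and Mutator are hypothetical stand-ins, not operator classes. */
final class MutatorSketch {

    /** Stand-in for a Flink resource, reduced to its metadata labels. */
    static final class Resource {
        final Map<String, String> labels = new HashMap<>();
    }

    /** Stand-in mutator contract: receive a resource, return the possibly changed resource. */
    interface Mutator {
        Resource mutate(Resource resource);
    }

    /** Adds a default "team" label when the user did not set one. */
    static final class DefaultTeamLabelMutator implements Mutator {
        @Override
        public Resource mutate(Resource resource) {
            // putIfAbsent keeps any value the user already provided.
            resource.labels.putIfAbsent("team", "unassigned");
            return resource;
        }
    }

    public static void main(String[] args) {
        Mutator mutator = new DefaultTeamLabelMutator();

        Resource withoutLabel = new Resource();
        System.out.println(mutator.mutate(withoutLabel).labels);

        Resource withLabel = new Resource();
        withLabel.labels.put("team", "payments");
        System.out.println(mutator.mutate(withLabel).labels);
    }
}
```

Filling in defaults without overwriting user-provided values keeps the mutation idempotent, so applying the mutator more than once leaves the resource in the same state.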