用 Python 编写一个composition函数
Composition 函数(简称函数)是模板化 Crossplane 资源的自定义程序。 当你创建复合资源 (XR) 时,Crossplane 会调用 Composition 函数来决定它应该创建哪些资源。阅读 concepts 页面了解更多有关 Composition 函数的信息。
您可以使用通用编程语言为模板资源编写函数。 使用通用编程语言可以让函数对模板资源使用高级逻辑,如循环和条件。 本指南介绍如何在 Python 中编写 Composition 函数。
了解步骤
本指南介绍为
Composition 资源 (XR) 的composition函数。
1apiVersion: example.crossplane.io/v1
2kind: XBuckets
3metadata:
4 name: example-buckets
5spec:
6 region: us-east-2
7 names:
8 - crossplane-functions-example-a
9 - crossplane-functions-example-b
10 - crossplane-functions-example-c
一个 XBuckets
XR 有一个区域和一个桶名数组。 该函数将为名称数组中的每个条目创建一个亚马逊网络服务(AWS)S3 桶。
用 Python 写一个函数
1.安装编写函数所需的工具 2.从模板初始化函数 3.编辑模板,添加函数逻辑 4.测试端到端函数 5.构建函数并将其推送到软件包注册库
本指南将详细介绍每个步骤。
安装编写函数所需的工具
要在 Python 中编写一个函数,您需要
- Python v3.11。
- Hatch,一个 Python 构建工具。本指南被引用 v1.7。
- Docker Engine。本指南被引用 Engine v24。
- Crossplane CLI](https://docs.crossplane.io/latest/cli) v1.14 或更新版本。本指南被引用的是 Crossplane CLI v1.14。
从模板初始化函数
使用 “crossplane beta xpkg init “命令初始化一个新函数。运行该命令时,它会以一个 GitHub 仓库为模板初始化你的函数。
1crossplane beta xpkg init function-xbuckets https://github.com/crossplane/function-template-python -d function-xbuckets
2Initialized package "function-xbuckets" in directory "/home/negz/control/negz/function-xbuckets" from https://github.com/crossplane/function-template-python/tree/bfed6923ab4c8e7adeed70f41138645fc7d38111 (main)
使用 crossplane beta init xpkg
命令会创建一个名为 function-xbuckets
的目录。 运行该命令后,新目录应如下所示:
1ls function-xbuckets
2Dockerfile example/ function/ LICENSE package/ pyproject.toml README.md renovate.json tests/
您的函数代码位于 function
目录中:
function/fn.py
文件是添加函数代码的地方。 了解模板中的其他一些文件很有用:
function/main.py
运行函数。你不需要编辑main.py
。Dockerfile
运行函数。不需要编辑Dockerfile
。- package` 目录包含被引用用于构建函数包的元数据。
在 Crossplane CLI v1.14 中,“crossplane beta xpkg init “只是克隆了一个模板 GitHub 仓库。 未来的 CLI 发布将自动执行用新函数名称替换模板名称等任务。 详情请参见 Crossplane 问题 #4941。
在开始添加代码之前,编辑 package/crossplane.yaml
更改软件包名称。 将软件包命名为 function-xbuckets
。
package/input 目录定义了函数输入的 OpenAPI 模式。 本指南中的函数不接受输入。 删除 package/input
目录。
composition函数](https://docs.crossplane.io/latest/concepts/composition-functions) 文档解释了composition函数的输入。
如果您正在编写一个被引用的函数,请编辑输入 YAML 文件以满足您的函数要求。
更改输入的种类和 API group。 不要使用 “Input “和 “template.fn.crossplane.io”,而应使用对函数有意义的名称。
编辑模板,添加函数逻辑
您可以在
方法中添加您的函数逻辑。 首次打开该文件时,它包含一个 “hello world “函数。
1async def RunFunction(self, req: fnv1beta1.RunFunctionRequest, _: grpc.aio.ServicerContext) -> fnv1beta1.RunFunctionResponse:
2 log = self.log.bind(tag=req.meta.tag)
3 log.info("Running function")
4
5 rsp = response.to(req)
6
7 example = ""
8 if "example" in req.input:
9 example = req.input["example"]
10
11 # TODO: Add your function logic here!
12 response.normal(rsp, f"I was run with input {example}!")
13 log.info("I was run!", input=example)
14
15 return rsp
所有 Python Composition 函数都有一个 “RunFunction “方法。 crossplane 会在一个
对象中传递函数运行所需的一切。
该函数通过返回一个
对象。
编辑 RunFunction
方法,将其替换为以下代码。
1async def RunFunction(self, req: fnv1beta1.RunFunctionRequest, _: grpc.aio.ServicerContext) -> fnv1beta1.RunFunctionResponse:
2 log = self.log.bind(tag=req.meta.tag)
3 log.info("Running function")
4
5 rsp = response.to(req)
6
7 region = req.observed.composite.resource["spec"]["region"]
8 names = req.observed.composite.resource["spec"]["names"]
9
10 for name in names:
11 rsp.desired.resources[f"xbuckets-{name}"].resource.update(
12 {
13 "apiVersion": "s3.aws.upbound.io/v1beta1",
14 "kind": "Bucket",
15 "metadata": {
16 "annotations": {
17 "crossplane.io/external-name": name,
18 },
19 },
20 "spec": {
21 "forProvider": {
22 "region": region,
23 },
24 },
25 }
26 )
27
28 log.info("Added desired buckets", region=region, count=len(names))
29
30 return rsp
展开下面的代码块以查看完整的 fn.py
,包括导入和解释函数逻辑的注释。
1"""A Crossplane composition function."""
2
3import grpc
4from crossplane.function import logging, response
5from crossplane.function.proto.v1beta1 import run_function_pb2 as fnv1beta1
6from crossplane.function.proto.v1beta1 import run_function_pb2_grpc as grpcv1beta1
7
8class FunctionRunner(grpcv1beta1.FunctionRunnerService):
9 """A FunctionRunner handles gRPC RunFunctionRequests."""
10
11 def __init__(self):
12 """Create a new FunctionRunner."""
13 self.log = logging.get_logger()
14
15 async def RunFunction(
16 self, req: fnv1beta1.RunFunctionRequest, _: grpc.aio.ServicerContext
17 ) -> fnv1beta1.RunFunctionResponse:
18 """Run the function."""
19 # Create a logger for this request.
20 log = self.log.bind(tag=req.meta.tag)
21 log.info("Running function")
22
23 # Create a response to the request. This copies the desired state and
24 # pipeline context from the request to the response.
25 rsp = response.to(req)
26
27 # Get the region and a list of bucket names from the observed composite
28 # resource (XR). Crossplane represents resources using the Struct
29 # well-known protobuf type. The Struct Python object can be accessed
30 # like a dictionary.
31 region = req.observed.composite.resource["spec"]["region"]
32 names = req.observed.composite.resource["spec"]["names"]
33
34 # Add a desired S3 bucket for each name.
35 for name in names:
36 # Crossplane represents desired composed resources using a protobuf
37 # map of messages. This works a little like a Python defaultdict.
38 # Instead of assigning to a new key in the dict-like map, you access
39 # the key and mutate its value as if it did exist.
40 #
41 # The below code works because accessing the xbuckets-{name} key
42 # automatically creates a new, empty fnv1beta1.Resource message. The
43 # Resource message has a resource field containing an empty Struct
44 # object that can be populated from a dictionary by calling update.
45 #
46 # https://protobuf.dev/reference/python/python-generated/#map-fields
47 rsp.desired.resources[f"xbuckets-{name}"].resource.update(
48 {
49 "apiVersion": "s3.aws.upbound.io/v1beta1",
50 "kind": "Bucket",
51 "metadata": {
52 "annotations": {
53 "crossplane.io/external-name": name,
54 },
55 },
56 "spec": {
57 "forProvider": {
58 "region": region,
59 },
60 },
61 }
62 )
63
64 # Log what the function did. This will only appear in the function's pod
65 # logs. A function can use response.normal() and response.warning() to
66 # emit Kubernetes events associated with the XR it's operating on.
67 log.info("Added desired buckets", region=region, count=len(names))
68
69 return rsp
此代码
1.从 RunFunctionRequest
获取观察到的 Composition 资源。
2.从观察到的 Composition 资源中获取区域和水桶名称。
3.为每个桶名添加一个所需的 S3 桶。
4.在 “RunFunctionResponse “中返回所需的 S3 存储桶。
crossplane 提供了一个 软件开发工具包 (SDK),用于用 Python 编写 Composition 函数。本函数被引用了 SDK 中的实用工具。
Python SDK 会根据 Protocol Buffers 模式自动生成 RunFunctionRequest
和 RunFunctionResponse
Python 对象。您可以在 Buf Schema Registry 中查看该模式。
生成的 Python 对象的字段与内置的 Python 类型(如字典和列表)的行为类似。 请注意,它们之间存在一些差异。
值得注意的是,您可以像访问字典一样访问观察到的资源和所需资源的映射,但不能通过为映射键赋值来添加新的所需资源。 相反,访问和 mutation 映射键就好像它已经存在一样。
而不是像这样添加一个新资源:
1resource = {"apiVersion": "example.org/v1", "kind": "Composed", ...}
2rsp.desired.resources["new-resource"] = fnv1beta1.Resource(resource=resource)
假装它已经存在,然后对它进行 mutation,就像这样:
1resource = {"apiVersion": "example.org/v1", "kind": "Composed", ...}
2rsp.desired.resources["new-resource"].resource.update(resource)
更多详情,请参阅协议缓冲区 Python 生成代码指南。
测试端到端功能
通过添加单元测试和被引用 “crossplane beta render “命令来测试你的函数。
当你从模板初始化一个函数时,它会在 tests/test_fn.py
中添加一些单元测试。这些测试使用 Python 标准库中的 unittest
模块。
要添加测试用例,请更新 test_run_function
中的 cases
列表。 展开下面的代码块,查看函数的完整 tests/test_fn.py
文件。
1import dataclasses
2import unittest
3
4from crossplane.function import logging, resource
5from crossplane.function.proto.v1beta1 import run_function_pb2 as fnv1beta1
6from google.protobuf import duration_pb2 as durationpb
7from google.protobuf import json_format
8from google.protobuf import struct_pb2 as structpb
9
10from function import fn
11
12class TestFunctionRunner(unittest.IsolatedAsyncioTestCase):
13 def setUp(self) -> None:
14 logging.configure(level=logging.Level.DISABLED)
15 self.maxDiff = 2000
16
17 async def test_run_function(self) -> None:
18 @dataclasses.dataclass
19 class TestCase:
20 reason: str
21 req: fnv1beta1.RunFunctionRequest
22 want: fnv1beta1.RunFunctionResponse
23
24 cases = [
25 TestCase(
26 reason="The function should compose two S3 buckets.",
27 req=fnv1beta1.RunFunctionRequest(
28 observed=fnv1beta1.State(
29 composite=fnv1beta1.Resource(
30 resource=resource.dict_to_struct(
31 {
32 "apiVersion": "example.crossplane.io/v1alpha1",
33 "kind": "XBuckets",
34 "metadata": {"name": "test"},
35 "spec": {
36 "region": "us-east-2",
37 "names": ["test-bucket-a", "test-bucket-b"],
38 },
39 }
40 )
41 )
42 )
43 ),
44 want=fnv1beta1.RunFunctionResponse(
45 meta=fnv1beta1.ResponseMeta(ttl=durationpb.Duration(seconds=60)),
46 desired=fnv1beta1.State(
47 resources={
48 "xbuckets-test-bucket-a": fnv1beta1.Resource(
49 resource=resource.dict_to_struct(
50 {
51 "apiVersion": "s3.aws.upbound.io/v1beta1",
52 "kind": "Bucket",
53 "metadata": {
54 "annotations": {
55 "crossplane.io/external-name": "test-bucket-a"
56 },
57 },
58 "spec": {
59 "forProvider": {"region": "us-east-2"}
60 },
61 }
62 )
63 ),
64 "xbuckets-test-bucket-b": fnv1beta1.Resource(
65 resource=resource.dict_to_struct(
66 {
67 "apiVersion": "s3.aws.upbound.io/v1beta1",
68 "kind": "Bucket",
69 "metadata": {
70 "annotations": {
71 "crossplane.io/external-name": "test-bucket-b"
72 },
73 },
74 "spec": {
75 "forProvider": {"region": "us-east-2"}
76 },
77 }
78 )
79 ),
80 },
81 ),
82 context=structpb.Struct(),
83 ),
84 ),
85 ]
86
87 runner = fn.FunctionRunner()
88
89 for case in cases:
90 got = await runner.RunFunction(case.req, None)
91 self.assertEqual(
92 json_format.MessageToDict(got),
93 json_format.MessageToDict(case.want),
94 "-want, +got",
95 )
96
97if __name__ == "__main__":
98 unittest.main()
使用 hatch run
运行单元测试:
1hatch run test:unit
2.
3----------------------------------------------------------------------
4Ran 1 test in 0.003s
5
6OK
virtualenv
或 venv
。hatch run
命令会创建一个虚拟环境,并在该环境中运行命令。
您可以使用 Crossplane CLI 预览被引用此功能的 Composition 的 Output,不需要使用 Crossplane 控制平面就能完成此操作。
在 function-xbuckets
下创建名为 example
的目录,并创建 Composite Resource、Composition 和 Function YAML 文件。
展开以下区块,查看示例文件。
您可以使用这些文件,通过运行 crossplane beta render
重现下面的输出结果。
XR.yaml` 文件包含要渲染的 Composition 资源:
1apiVersion: example.crossplane.io/v1
2kind: XBuckets
3metadata:
4 name: example-buckets
5spec:
6 region: us-east-2
7 names:
8 - crossplane-functions-example-a
9 - crossplane-functions-example-b
10 - crossplane-functions-example-c
composition.yaml` 文件包含用于渲染复合资源的 Composition:
1apiVersion: apiextensions.crossplane.io/v1
2kind: Composition
3metadata:
4 name: create-buckets
5spec:
6 compositeTypeRef:
7 apiVersion: example.crossplane.io/v1
8 kind: XBuckets
9 mode: Pipeline
10 pipeline:
11 - step: create-buckets
12 functionRef:
13 name: function-xbuckets
functions.yaml` 文件包含 Composition 在其 Pipelines 步骤中引用的函数:
1apiVersion: pkg.crossplane.io/v1beta1
2kind: Function
3metadata:
4 name: function-xbuckets
5 annotations:
6 render.crossplane.io/runtime: Development
7spec:
8 # The CLI ignores this package when using the Development runtime.
9 # You can set it to any value.
10 package: xpkg.upbound.io/negz/function-xbuckets:v0.1.0
functions.yaml中的函数被引用为
crossplane beta render` 您的函数正在本地运行。 它会连接到您本地运行的函数,而不是被引用 Docker 来拉动和运行函数。
运行时。 这会告诉
1apiVersion: pkg.crossplane.io/v1beta1
2kind: Function
3metadata:
4 name: function-xbuckets
5 annotations:
6 render.crossplane.io/runtime: Development
使用 hatch run development
在本地运行您的函数。
1hatch run development
hatch run development
在不进行加密或身份验证的情况下运行函数。 仅在测试和开发过程中使用。
在另一个终端中,运行 crossplane beta render
。
1crossplane beta render xr.yaml composition.yaml functions.yaml
该命令调用你的函数。 在运行函数的终端中,现在应该可以看到 logging 输出:
1hatch run development
22024-01-11T22:12:58.153572Z [info ] Running function filename=fn.py lineno=22 tag=
32024-01-11T22:12:58.153792Z [info ] Added desired buckets count=3 filename=fn.py lineno=68 region=us-east-2 tag=
crossplane beta render` 命令会打印函数返回的所需资源。
1---
2apiVersion: example.crossplane.io/v1
3kind: XBuckets
4metadata:
5 name: example-buckets
6---
7apiVersion: s3.aws.upbound.io/v1beta1
8kind: Bucket
9metadata:
10 annotations:
11 crossplane.io/composition-resource-name: xbuckets-crossplane-functions-example-b
12 crossplane.io/external-name: crossplane-functions-example-b
13 generateName: example-buckets-
14 labels:
15 crossplane.io/composite: example-buckets
16 ownerReferences:
17 # Omitted for brevity
18spec:
19 forProvider:
20 region: us-east-2
21---
22apiVersion: s3.aws.upbound.io/v1beta1
23kind: Bucket
24metadata:
25 annotations:
26 crossplane.io/composition-resource-name: xbuckets-crossplane-functions-example-c
27 crossplane.io/external-name: crossplane-functions-example-c
28 generateName: example-buckets-
29 labels:
30 crossplane.io/composite: example-buckets
31 ownerReferences:
32 # Omitted for brevity
33spec:
34 forProvider:
35 region: us-east-2
36---
37apiVersion: s3.aws.upbound.io/v1beta1
38kind: Bucket
39metadata:
40 annotations:
41 crossplane.io/composition-resource-name: xbuckets-crossplane-functions-example-a
42 crossplane.io/external-name: crossplane-functions-example-a
43 generateName: example-buckets-
44 labels:
45 crossplane.io/composite: example-buckets
46 ownerReferences:
47 # Omitted for brevity
48spec:
49 forProvider:
50 region: us-east-2
构建函数并将其推送至 packages 注册表
构建函数分为两个阶段: 首先是构建函数的运行时,这是 Crossplane 用来运行函数的开放容器倡议(OCI)镜像。 然后将运行时嵌入软件包,并将其推送到软件包注册中心。 Crossplane CLI 将 xpkg.upbound.io
作为默认的软件包注册中心。
一个函数默认支持单个平台,如 “linux/amd64”,您可以为每个平台构建运行时和软件包,然后将所有软件包推送到注册表中的单个标签,从而支持多个平台。
将您的函数推送到 registry,就可以在 crossplane 控制平面中使用您的函数。请参阅Composition functions documentation。了解如何在控制平面中使用函数。
使用 docker 为每个平台构建运行时。
1docker build . --quiet --platform=linux/amd64 --tag runtime-amd64
2sha256:fdf40374cc6f0b46191499fbc1dbbb05ddb76aca854f69f2912e580cfe624b4b
1docker build . --quiet --platform=linux/arm64 --tag runtime-arm64
2sha256:cb015ceabf46d2a55ccaeebb11db5659a2fb5e93de36713364efcf6d699069af
crossplane xpkg build
嵌入什么运行时。
binfmt
。有关说明,请参阅 Docker 文档。
使用 Crossplane CLI 为每个平台构建一个软件包。 每个软件包都嵌入了一个运行时镜像。
……。
flag 指定了包含 crossplane.yaml
的 package
目录。 其中包括软件包的元数据。
……。
flag 指定了被引用 Docker 构建的运行时镜像标签。
在
标志指定将软件包文件写入磁盘的位置。 crossplane 软件包文件的扩展名为 .xpkg
。
1crossplane xpkg build \
2 --package-root=package \
3 --embed-runtime-image=runtime-amd64 \
4 --package-file=function-amd64.xpkg
1crossplane xpkg build \
2 --package-root=package \
3 --embed-runtime-image=runtime-arm64 \
4 --package-file=function-arm64.xpkg
将两个软件包文件都推送到注册表中。将两个文件都推送到注册表中的一个标签,就能创建一个多平台 软件包,在 linux/arm64
和 linux/amd64
主机上都能运行。
1crossplane xpkg push \
2 --package-files=function-amd64.xpkg,function-arm64.xpkg \
3 negz/function-xbuckets:v0.1.0
如果您将函数推送到 GitHub 仓库,模板会使用 GitHub Actions 自动设置持续集成 (CI)。CI 工作流将对您的函数进行校验、测试和构建。您可以通过阅读 .github/workflows/ci.yaml
,查看模板是如何配置 CI 的。
CI 工作流可以自动将软件包推送到 xpkg.upbound.io
。要做到这一点,您必须在 https://marketplace.upbound.io 创建一个版本库。通过创建一个 API 令牌并将其添加到您的版本库,赋予 CI 工作流向市场推送的权限。将您的 API 令牌访问 ID 保存为名为 XPKG_ACCESS_ID
的secret,并将您的 API 令牌保存为名为 XPKG_TOKEN
的secret。