DSL
# Endpoint DSL v0.21.0+
Allows dynamic creation and updating of different types of endpoints (Endpoint) through DSL, such as configuring: mqtt, http, ws, schedule, and other endpoint triggers, as well as configuring routes and processors.
The DSL configuration file for Endpoint
is a JSON-formatted file, structured as follows:
Field | Type | Required | Description |
---|---|---|---|
id | string | Yes | Endpoint ID |
name | string | No | Route |
type | string | Yes | Endpoint type, such as http, mqtt, ws, net, scheduler |
configuration | object | No | Endpoint configuration, different endpoint types have different configuration fields, please refer to the Endpoint component startup configuration for details. |
additionalInfo | object | No | Extended fields, used to save additional information. |
processor | string[] | No | Global Endpoint Processor list, interceptor names must be registered on the platform. |
routers | string[] | Yes | Router list. |
# router: Endpoint routing
Field | Type | Required | Description |
---|---|---|---|
id | string | Yes | Route ID |
params | string[] | No | Routing parameters, for example: HTTP Endpoint routing parameters are POST/GET/PUT... |
from | From | Yes | Routing source |
to | To | No | Routing target, if configured in the rule chain DSL, the routing target is the rule chain ID itself, no additional specification needed |
additionalInfo | object | No | Extended fields, used to save additional information. |
# router.from: Endpoint routing source
Field | Type | Required | Description |
---|---|---|---|
path | string | Yes | Routing source path, for example: http Endpoint /api/msg/:chainId scheduler Endpoint */1 * * * * * |
configuration | object | No | Routing source configuration, not required |
processor | string[] | Yes | Processor list,which processes the data from the routing source, interceptor names must be registered on the platform. |
# router.to: Endpoint routing target
Field | Type | Required | Description |
---|---|---|---|
path | string | Yes | Routing target path, can be an executing rule chain ID or a variable such as: ${chainId} |
configuration | object | No | Routing target configuration, not required |
wait | boolean | No | Whether to wait for message processing to complete, default is false. |
processor | From | Yes | Processor list, which is executed after the To executor has completed, interceptor names must be registered on the platform. |
router.to.path
supports specifying a node in a rule chain as the starting point, with the format:chainId:nodeId
, for example:b9ab7ada-97d0-4b73-8887-0cfca516a871:node_12
.router.to.path
supports the use ofmetadata
variables, for example:${chainId}
, which will be replaced with the corresponding value, such as:b9ab7ada-97d0-4b73-8887-0cfca516a871
.- If
router.to.path
is configured within the rule chain DSL, it can be left blank, which means it will route to the default start node of the current rule chain.
# Register Processors
You can build some processors internally and then call them by their name
in the DSL to perform operations such as data transformation and validation.
- Register the
From
processor throughprocessor.InBuiltins.Register
. - Register the
To
processor throughprocessor.OutBuiltins.Register
.
First, register the processor:
// Put http header into message metadata
processor.InBuiltins.Register("headersToMetadata", func(router endpoint.Router, exchange *endpoint.Exchange) bool {
msg := exchange.In.GetMsg()
headers := exchange.In.Headers()
for k := range headers {
msg.Metadata.PutValue(k, headers.Get(k))
}
// Return true to execute a processor or To operation
// Return true if the logic is completed and do not proceed further
return true
})
2
3
4
5
6
7
8
9
10
11
Then: Specify through "processors": ["responseToBody"], multiple can be specified, if the processor returns false, it will not proceed further.
# Example
{
"id": "e1",
"type": "http",
"name": "http server",
"configuration": {
"server": ":9090"
},
"routers": [
{
"id":"r1",
"params": [
"post"
],
"from": {
"path": "/api/v1/test/:chainId",
"configuration": {
}
},
"to": {
"path": "${chainId}",
"wait": true,
"processors": ["responseToBody"]
},
"additionalInfo": {
"aa":"aa"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
To start an Endpoint through DSL:
// Initialize the rule chain
ruleDsl, err := os.ReadFile(testRulesFolder + "/filter_node.json")
_, err = engine.New("test01", ruleDsl)
if err != nil {
t.Fatal(err)
}
// Read the endpoint DSL file
endpointBuf, err := os.ReadFile(testEndpointsFolder + "/http_01.json")
if err != nil {
t.Fatal(err)
}
// Initialize the endpoint
ep, err = endpoint.New("", endpointBuf, endpoint.DynamicEndpointOptions.WithConfig(config))
if err != nil {
t.Fatal(err)
}
// Start the endpoint
err = ep.Start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
This DSL starts an HTTP service on port 9090 and provides the /api/v1/test/:chainId API interface. The processing logic is handed over to the ${chainId} rule chain for processing, and the results of the rule chain processing are returned to the client.
Supports dynamic refresh of services and routes:
_ = ep.Reload([]byte(newDsl))
Through this method, you can easily implement the following requirements, for example:
- Quickly and dynamically provide external HTTP APIs and let the rule chain handle the API logic.
- Dynamically subscribe to MQTT, Kafka, Nats topics, and hand over the subscription data to the rule chain for processing.
- Periodically trigger rule chain logic.
- Dynamically provide TCP/UDP, websocket, and other services.
- You can easily extend and customize your Endpoint and manage it in a DSL way.
# Other Examples
Subscribe to an MQTT topic and hand it over to the rule chain with ID=default for processing:
{
"id": "e_mqtt_01",
"type": "mqtt",
"name": "mqtt subscription trigger",
"configuration": {
"server": "127.0.0.1:1883",
"username": "admin",
"password": "admin"
},
"routers": [
{
"id": "r1",
"from": {
"path": "#"
},
"to": {
"path": "default"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Trigger the execution of the rule chain with ID=default every 1 second:
{
"id": "schedule_e1",
"type": "schedule",
"name": "schedule",
"routers": [
{
"from": {
"path": "*/1 * * * * *"
},
"to": {
"path": "default"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15