# Endpoint DSL v0.21.0+
The Endpoint DSL lets you create and update endpoints dynamically. It covers the endpoint trigger itself (mqtt, http, ws, schedule, and other endpoint types) as well as its routes and processors.
The Endpoint DSL configuration is a JSON file with the following structure:
Field | Type | Required | Description |
---|---|---|---|
id | string | Yes | Endpoint ID |
name | string | No | Endpoint name |
type | string | Yes | Endpoint type, such as http, mqtt, ws, net, schedule |
configuration | object | No | Endpoint configuration. Each endpoint type has its own configuration fields; refer to the startup configuration of the corresponding Endpoint component for details. |
additionalInfo | object | No | Extended fields, used to save additional information. |
processors | string[] | No | Global endpoint processor list. Processor names must be registered on the platform. |
routers | router[] | Yes | Router list. |
# router: Endpoint routing
Field | Type | Required | Description |
---|---|---|---|
id | string | Yes | Route ID |
params | string[] | No | Routing parameters. For example, for an HTTP endpoint these are the request methods: POST/GET/PUT... |
from | From | Yes | Routing source |
to | To | No | Routing target. If the endpoint is configured inside a rule chain DSL, the routing target is that rule chain's ID and does not need to be specified. |
additionalInfo | object | No | Extended fields, used to save additional information. |
# router.from: Endpoint routing source
Field | Type | Required | Description |
---|---|---|---|
path | string | Yes | Routing source path, for example: /api/msg/:chainId for an http endpoint, or */1 * * * * * for a schedule endpoint |
configuration | object | No | Routing source configuration |
processors | string[] | No | Processor list applied to the data from the routing source. Processor names must be registered on the platform. |
# router.to: Endpoint routing target
Field | Type | Required | Description |
---|---|---|---|
path | string | Yes | Routing target path: the ID of the rule chain to execute, or a variable such as ${chainId} |
configuration | object | No | Routing target configuration, not required |
wait | boolean | No | Whether to wait for message processing to complete, default is false. |
processors | string[] | No | Processor list executed after the To target has finished. Processor names must be registered on the platform. |
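The field tables above can be read as a set of nested JSON objects. The following is a minimal sketch of that shape in Go, using only the standard library. The type names (`EndpointDsl`, `RouterDsl`, `FromDsl`, `ToDsl`) and the `ParseEndpointDsl` helper are hypothetical and written for illustration; they are not the library's own definitions. The `processors` key follows the JSON example in this document, and the check covers only a subset of the fields marked as required.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// EndpointDsl mirrors the top-level fields listed in the tables above.
// Hypothetical type for illustration only, not the library's own definition.
type EndpointDsl struct {
	ID             string                 `json:"id"`
	Name           string                 `json:"name,omitempty"`
	Type           string                 `json:"type"`
	Configuration  map[string]interface{} `json:"configuration,omitempty"`
	AdditionalInfo map[string]interface{} `json:"additionalInfo,omitempty"`
	Processors     []string               `json:"processors,omitempty"`
	Routers        []RouterDsl            `json:"routers"`
}

// RouterDsl mirrors the router table.
type RouterDsl struct {
	ID             string                 `json:"id"`
	Params         []string               `json:"params,omitempty"`
	From           FromDsl                `json:"from"`
	To             *ToDsl                 `json:"to,omitempty"` // optional, hence a pointer
	AdditionalInfo map[string]interface{} `json:"additionalInfo,omitempty"`
}

// FromDsl mirrors the router.from table.
type FromDsl struct {
	Path          string                 `json:"path"`
	Configuration map[string]interface{} `json:"configuration,omitempty"`
	Processors    []string               `json:"processors,omitempty"`
}

// ToDsl mirrors the router.to table.
type ToDsl struct {
	Path          string                 `json:"path"`
	Configuration map[string]interface{} `json:"configuration,omitempty"`
	Wait          bool                   `json:"wait,omitempty"`
	Processors    []string               `json:"processors,omitempty"`
}

// ParseEndpointDsl decodes a DSL document and checks a subset of the
// fields marked as required: id, type, routers, and each from.path.
func ParseEndpointDsl(data []byte) (*EndpointDsl, error) {
	var dsl EndpointDsl
	if err := json.Unmarshal(data, &dsl); err != nil {
		return nil, err
	}
	if dsl.ID == "" || dsl.Type == "" {
		return nil, errors.New("id and type are required")
	}
	if len(dsl.Routers) == 0 {
		return nil, errors.New("at least one router is required")
	}
	for i, r := range dsl.Routers {
		if r.From.Path == "" {
			return nil, fmt.Errorf("routers[%d].from.path is required", i)
		}
	}
	return &dsl, nil
}

func main() {
	raw := []byte(`{
		"id": "e1",
		"type": "http",
		"configuration": {"server": ":9090"},
		"routers": [{
			"id": "r1",
			"params": ["post"],
			"from": {"path": "/api/v1/test/:chainId"},
			"to": {"path": "${chainId}", "wait": true, "processors": ["responseToBody"]}
		}]
	}`)
	dsl, err := ParseEndpointDsl(raw)
	if err != nil {
		fmt.Println("invalid DSL:", err)
		return
	}
	r := dsl.Routers[0]
	fmt.Println(dsl.Type, r.From.Path, "->", r.To.Path, "wait:", r.To.Wait)
}
```

A pre-check like this can catch a malformed file before it is handed to the endpoint, at the cost of keeping the sketch in sync with the real schema.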
# Register Processors
You can build processors in your application and then reference them by name in the DSL to perform operations such as data transformation and validation.
First, register the processor:
// Copy the HTTP headers into the message metadata
Builtins.Register("headersToMetadata", func(router endpoint.Router, exchange *endpoint.Exchange) bool {
	msg := exchange.In.GetMsg()
	headers := exchange.In.Headers()
	for k := range headers {
		msg.Metadata.PutValue(k, headers.Get(k))
	}
	// Return true to continue with the next processor or the To operation.
	// Return false if the logic is complete and processing should not proceed further.
	return true
})
Then reference processors by name in the DSL, for example "processors": ["responseToBody"]. Multiple processors can be specified. If a processor returns false, processing does not proceed further.
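The return-value rule can be illustrated with a small standalone model. This is a sketch under stated assumptions, not the library's implementation: `Exchange`, `Process`, `Register`, and `RunProcessors` are hypothetical stand-ins, and running the processors in list order is an assumption made for the example.

```go
package main

import "fmt"

// Exchange is a hypothetical stand-in for the message exchange being routed.
type Exchange struct {
	Headers  map[string]string
	Metadata map[string]string
}

// Process has the same contract as a processor in the text:
// return true to continue, false to stop.
type Process func(ex *Exchange) bool

// registry maps processor names to implementations (hypothetical).
var registry = map[string]Process{}

// Register stores a processor under a name so a DSL can refer to it.
func Register(name string, p Process) { registry[name] = p }

// RunProcessors runs the named processors in list order (an assumption).
// It returns false as soon as one processor returns false, and an error
// if a name has not been registered.
func RunProcessors(names []string, ex *Exchange) (bool, error) {
	for _, name := range names {
		p, ok := registry[name]
		if !ok {
			return false, fmt.Errorf("processor %q is not registered", name)
		}
		if !p(ex) {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	Register("headersToMetadata", func(ex *Exchange) bool {
		for k, v := range ex.Headers {
			ex.Metadata[k] = v
		}
		return true
	})
	// requireToken is a hypothetical validation processor.
	Register("requireToken", func(ex *Exchange) bool {
		return ex.Headers["Authorization"] != ""
	})

	ex := &Exchange{
		Headers:  map[string]string{"X-Device": "d1"},
		Metadata: map[string]string{},
	}
	proceed, err := RunProcessors([]string{"headersToMetadata", "requireToken"}, ex)
	fmt.Println(proceed, err, ex.Metadata["X-Device"])
}
```

In this run, requireToken stops the chain because the Authorization header is missing, while the metadata copied by the earlier processor is kept.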
# Example
{
  "id": "e1",
  "type": "http",
  "name": "http server",
  "configuration": {
    "server": ":9090"
  },
  "routers": [
    {
      "id": "r1",
      "params": [
        "post"
      ],
      "from": {
        "path": "/api/v1/test/:chainId",
        "configuration": {}
      },
      "to": {
        "path": "${chainId}",
        "wait": true,
        "processors": ["responseToBody"]
      },
      "additionalInfo": {
        "aa": "aa"
      }
    }
  ]
}
To start an Endpoint through DSL:
// Initialize the rule chain
ruleDsl, err := os.ReadFile(testRulesFolder + "/filter_node.json")
if err != nil {
	t.Fatal(err)
}
_, err = engine.New("test01", ruleDsl)
if err != nil {
	t.Fatal(err)
}
// Read the endpoint DSL file
endpointBuf, err := os.ReadFile(testEndpointsFolder + "/http_01.json")
if err != nil {
	t.Fatal(err)
}
// Initialize the endpoint from the DSL
ep, err := endpoint.New("", endpointBuf, endpoint.DynamicEndpointOptions.WithConfig(config))
if err != nil {
	t.Fatal(err)
}
// Start the endpoint
if err = ep.Start(); err != nil {
	t.Fatal(err)
}
This DSL starts an HTTP service on port 9090 and exposes the /api/v1/test/:chainId API. Each request is handed to the rule chain identified by ${chainId}, and the result of the rule chain is returned to the client.
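The relationship between the :chainId path segment and the ${chainId} target can be sketched as follows. This is an illustrative model only: `matchPath` and `resolveTarget` are hypothetical helpers, and the real router may match paths and resolve variables differently.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPath compares a request path with a pattern that may contain
// ":name" segments and returns the captured values.
func matchPath(pattern, path string) (map[string]string, bool) {
	ps := strings.Split(strings.Trim(pattern, "/"), "/")
	ss := strings.Split(strings.Trim(path, "/"), "/")
	if len(ps) != len(ss) {
		return nil, false
	}
	params := map[string]string{}
	for i, p := range ps {
		if strings.HasPrefix(p, ":") {
			params[p[1:]] = ss[i] // capture the segment under its name
		} else if p != ss[i] {
			return nil, false // literal segment does not match
		}
	}
	return params, true
}

// resolveTarget replaces each ${name} placeholder in the to.path
// with the matching captured value.
func resolveTarget(toPath string, vars map[string]string) string {
	for k, v := range vars {
		toPath = strings.ReplaceAll(toPath, "${"+k+"}", v)
	}
	return toPath
}

func main() {
	params, ok := matchPath("/api/v1/test/:chainId", "/api/v1/test/test01")
	if !ok {
		fmt.Println("no route matched")
		return
	}
	// The request is routed to the rule chain whose ID was in the URL.
	fmt.Println(resolveTarget("${chainId}", params))
}
```

With the rule chain initialized as test01 in the snippet above, a POST to /api/v1/test/test01 would resolve the target to test01 under this model.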
Services and routes can be refreshed dynamically:
_ = ep.Reload([]byte(newDsl))
This approach makes it easy to meet requirements such as:
- Quickly and dynamically expose external HTTP APIs and let the rule chain handle the API logic.
- Dynamically subscribe to MQTT, Kafka, or NATS topics and hand the subscribed data to the rule chain for processing.
- Periodically trigger rule chain logic.
- Dynamically provide TCP/UDP, WebSocket, and other services.
- Extend and customize your own Endpoint and manage it through the DSL.
# Other Examples
Subscribe to an MQTT topic and hand it over to the rule chain with ID=default for processing:
{
  "id": "e_mqtt_01",
  "type": "mqtt",
  "name": "mqtt subscription trigger",
  "configuration": {
    "server": "127.0.0.1:1883",
    "username": "admin",
    "password": "admin"
  },
  "routers": [
    {
      "id": "r1",
      "from": {
        "path": "#"
      },
      "to": {
        "path": "default"
      }
    }
  ]
}
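The from.path of "#" is an MQTT topic filter that matches every topic. As a sketch of what other filters would select, the following standalone function applies the standard MQTT wildcard rules (+ for one level, # for all remaining levels). `topicMatches` is a hypothetical helper written for this example; in practice the broker performs the matching.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether an MQTT topic matches a topic filter.
// "+" matches exactly one level; "#" matches the parent level and all
// remaining levels. Simplification: it does not implement the rule that
// wildcards skip topics beginning with "$", and it does not validate
// that "#" is the last level of the filter.
func topicMatches(filter, topic string) bool {
	fs := strings.Split(filter, "/")
	ts := strings.Split(topic, "/")
	for i, f := range fs {
		if f == "#" {
			return true
		}
		if i >= len(ts) {
			return false // filter has more levels than the topic
		}
		if f != "+" && f != ts[i] {
			return false
		}
	}
	return len(fs) == len(ts)
}

func main() {
	topics := []string{"devices/1/temp", "devices/2/hum", "alerts"}
	for _, filter := range []string{"#", "devices/+/temp", "devices/#"} {
		for _, topic := range topics {
			fmt.Printf("%-16s %-16s %v\n", filter, topic, topicMatches(filter, topic))
		}
	}
}
```

Replacing "#" in the DSL with a narrower filter such as devices/+/temp would limit which messages reach the rule chain.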
Trigger the rule chain with ID=default once every second:
{
  "id": "schedule_e1",
  "type": "schedule",
  "name": "schedule",
  "routers": [
    {
      "from": {
        "path": "*/1 * * * * *"
      },
      "to": {
        "path": "default"
      }
    }
  ]
}
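The schedule path has six space-separated fields, and since it fires every second, the first field is the seconds field. The sketch below labels the fields assuming the seconds-first layout used by cron libraries that support seconds (second, minute, hour, day of month, month, day of week). That field order is an assumption, and `describe` and `secondStep` are hypothetical helpers, not part of the library.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// fieldNames is the assumed seconds-first order of the six cron fields.
var fieldNames = []string{"second", "minute", "hour", "day of month", "month", "day of week"}

// describe splits a six-field cron expression and pairs each field
// with its assumed meaning.
func describe(expr string) (map[string]string, error) {
	fields := strings.Fields(expr)
	if len(fields) != len(fieldNames) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(fieldNames), len(fields))
	}
	out := make(map[string]string, len(fields))
	for i, f := range fields {
		out[fieldNames[i]] = f
	}
	return out, nil
}

// secondStep returns N for a seconds field written as "*/N".
func secondStep(field string) (int, bool) {
	if !strings.HasPrefix(field, "*/") {
		return 0, false
	}
	n, err := strconv.Atoi(strings.TrimPrefix(field, "*/"))
	if err != nil || n <= 0 {
		return 0, false
	}
	return n, true
}

func main() {
	fields, err := describe("*/1 * * * * *")
	if err != nil {
		fmt.Println(err)
		return
	}
	if n, ok := secondStep(fields["second"]); ok {
		fmt.Printf("fires every %d second(s)\n", n)
	}
}
```

Under the same assumption, changing the path to */5 * * * * * would trigger the rule chain every five seconds.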