The data sources described in the Data Source API section are all finite, in the sense that each has a finite number of rows. SymetryML also supports stream data sources, which can in theory contain an infinite number of rows. Currently, both Kafka and NATS stream data sources are supported.
To use a Kafka stream data source, create the JSON data structure described in DSInfo, containing the fields in the table below. Note that SymetryML reads the data in your topic using the KafkaAvroDeserializer together with the Kafka Registry, so both must be usable for your topic.
Fields Required to Create a Stream DSInfo Data Structure
Type of data source: kafka for Kafka streams or nats for NATS streams.
Path of the file / entity.
Key
Required / Optional
Description
Kafka Stream configuration parameter.
Kafka Stream configuration parameter.
Kafka Stream configuration parameter.
Kafka Stream configuration parameter. Defaults to all partitions. The partitions to use must be given as a comma-separated list of integers, for instance 0,2,4,6,8 or 0,1,2,6,7,10,12.
kafka_stream_time_between_persist
Defaults to 300 seconds (5 minutes). How long to wait between successive persists of the SymetryML project state. Pass -1 to disable persisting.
Kafka Stream configuration parameter. Defaults to true.
Kafka Stream configuration parameter. Defaults to 1000 ms.
Kafka Stream configuration parameter. Defaults to earliest.
Any other Kafka parameter
Any Kafka parameter can also be used by prefixing its name with sml.kafka., e.g. sml.kafka.client.dns.lookup or sml.kafka.fetch.min.bytes.
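The Kafka DSInfo can also be assembled programmatically. The Python sketch below is illustrative only: it assumes that the Kafka fields live in the DSInfo info map (as they do for NATS), that values are passed as strings, and that the topic name goes in path. Only kafka_stream_time_between_persist and the sml.kafka. prefix come from the table above; build_kafka_dsinfo and format_partitions are hypothetical helpers, and because the partition key is not named here, format_partitions only produces its value.

```python
import json


def format_partitions(partitions):
    """Render partition numbers as the comma-separated list described above, e.g. 0,2,4."""
    numbers = [int(p) for p in partitions]
    if not numbers or any(p < 0 for p in numbers):
        raise ValueError("expected a non-empty list of non-negative partition numbers")
    return ",".join(str(p) for p in numbers)


def build_kafka_dsinfo(path, persist_seconds=300, kafka_params=None):
    """Assemble a Kafka stream DSInfo dict (layout assumed, see the lead-in)."""
    if persist_seconds != -1 and persist_seconds <= 0:
        raise ValueError("persist_seconds must be positive, or -1 to disable persisting")
    info = {
        # Documented key: seconds between persists of the project state (-1 disables).
        "kafka_stream_time_between_persist": str(persist_seconds),
    }
    # Any native Kafka client setting is passed through with the sml.kafka. prefix.
    for name, value in (kafka_params or {}).items():
        info["sml.kafka." + name] = str(value)
    return {"type": "kafka", "path": path, "info": info}


if __name__ == "__main__":
    dsinfo = build_kafka_dsinfo(
        "my-topic",  # hypothetical topic name, assumed to go in path
        kafka_params={"fetch.min.bytes": 1024, "client.dns.lookup": "use_all_dns_ips"},
    )
    print(json.dumps(dsinfo, indent=2))
    print(format_partitions([0, 2, 4, 6, 8]))
```

Keeping the prefixing in one helper means native Kafka settings can be passed under their usual names while the DSInfo always carries the sml.kafka. form.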
NATS is a lightweight, high-performance messaging system that provides both publish-subscribe and distributed queueing capabilities. SymetryML can connect to NATS servers to consume streaming data in real-time.
To use a NATS stream data source, create the JSON data structure described in DSInfo with type set to nats, and include the following fields in the info map:
Key
Required / Optional
Description
NATS server URL(s) to connect to. For example: nats://localhost:4222. Multiple hosts can be specified as a comma-separated list.
NATS subject to subscribe to. The subject determines which messages the stream will receive.
Format of the data in the NATS messages. Supported values are json, csv, and protobuf.
Protobuf schema definition. Required only when data.format is set to protobuf.
Protobuf message type name to deserialize. Required only when data.format is set to protobuf.
Username for NATS authentication. Use for user/password authentication.
Password for NATS authentication. Use in combination with nats_sec_user.
Token for NATS authentication. Use for token-based authentication.
NKey seed for NATS authentication. Use for NKey-based authentication.
Request timeout in seconds. Default is 30 seconds.
Maximum memory (in bytes) for the NATS stream buffer. Default is 1073741824 (1 GB).
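The format-related rules in the table above can be checked on the client before the data source is created. This Python sketch uses only the keys named in this section (data.format, protobuf.schema, protobuf.msg.type.name); check_nats_info is a hypothetical helper, and it treats data.format as mandatory, which is an assumption because the Required / Optional column is not preserved here.

```python
SUPPORTED_FORMATS = {"json", "csv", "protobuf"}
PROTOBUF_KEYS = ("protobuf.schema", "protobuf.msg.type.name")


def check_nats_info(info):
    """Return a list of problems found in a NATS info map (empty if none)."""
    problems = []
    fmt = info.get("data.format")
    # Assumption: data.format must always be present and be a supported value.
    if fmt not in SUPPORTED_FORMATS:
        problems.append("data.format must be one of json, csv, protobuf")
    if fmt == "protobuf":
        # Both protobuf fields are required only when the format is protobuf.
        for key in PROTOBUF_KEYS:
            if not info.get(key):
                problems.append(key + " is required when data.format is protobuf")
    return problems
```

Returning a list rather than raising on the first problem lets every missing field be reported at once.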
NATS streams support three data formats:
JSON - Messages are expected to be in JSON format, with attribute names matching the project schema.
CSV - Messages are expected to contain comma-separated values (or values separated by a custom delimiter defined with additional CSV options).
Protobuf - Messages are serialized using Protocol Buffers. Requires both protobuf.schema and protobuf.msg.type.name to be specified
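To make the JSON and CSV formats concrete, the Python sketch below encodes one row as a message payload in each format. It assumes one row per message and CSV lines without a header; the helper names and the attributes age and income are hypothetical, and how the payload is published to NATS is left out.

```python
import csv
import io
import json


def to_json_message(row):
    """Encode one row as a JSON payload; keys must match the project attribute names."""
    return json.dumps(row).encode("utf-8")


def to_csv_message(row, columns, delimiter=","):
    """Encode one row as a single CSV line, with values in a fixed column order."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter=delimiter, lineterminator="\n")
    writer.writerow([row[c] for c in columns])
    return buf.getvalue().encode("utf-8")


if __name__ == "__main__":
    row = {"age": 42, "income": 51000.5}  # hypothetical project attributes
    print(to_json_message(row))
    print(to_csv_message(row, ["age", "income"]))
```

The JSON form is self-describing, while the CSV form relies on the producer and the project agreeing on column order and delimiter.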
Stream Data Source Encryption
Just as for regular data sources, stream data source information needs to be encrypted. See the Data Source Encryption section for details.
Stream Data Source Create
This API function creates a new stream data source and attaches it to a SymetryML project (the owner project). Once created, the stream data source continuously pulls data from the stream server (Kafka or NATS) and pushes each new data tuple into the owner project in a streaming fashion.
Parameter
Required / Optional
Description
If true, start streaming data from the beginning of the stream.
HTTP Status Code
HTTP Status Message
Description
A stream data source with the specified name already exists.
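The create call combines an encrypted DSInfo body with the start-from-beginning flag. The Python sketch below only builds the HTTP request with the standard library and does not send it. Everything about the endpoint is hypothetical: the base URL, the path layout, the HTTP method, and the query parameter name frombeginning are placeholders, since this section does not give them, and authentication headers are omitted.

```python
import json
import urllib.parse
import urllib.request

# HYPOTHETICAL: placeholder base URL, not taken from the SymetryML documentation.
BASE_URL = "http://localhost:8080/symetry/rest"


def create_stream_request(user, project, stream_name, dsinfo, from_beginning=False):
    """Build (but do not send) a create-stream request carrying a DSInfo body."""
    # HYPOTHETICAL query parameter name for the start-from-beginning flag.
    query = urllib.parse.urlencode({"frombeginning": str(from_beginning).lower()})
    # HYPOTHETICAL path layout: user / project / stream name.
    url = "{}/{}/projects/{}/streams/{}?{}".format(
        BASE_URL,
        urllib.parse.quote(user),
        urllib.parse.quote(project),
        urllib.parse.quote(stream_name),
        query,
    )
    body = json.dumps(dsinfo).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST", headers={"Content-Type": "application/json"}
    )
```

Replace the placeholders with the values from your deployment's REST reference before sending the request with urllib.request.urlopen.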
Stream Data Source Browse
This method lets you browse the streams available on your stream server. For Kafka, this means listing the available topics.
HTTP Status Code
HTTP Status Message
Description
HTTP Response Entity
HTTP Response Entity
Description
Contains listing information from the streaming server.
Stream Data Source Preview
This method lets you preview a sample of the data available on a given stream.
HTTP Status Code
HTTP Status Message
Description
HTTP Response Entity
HTTP Response Entity
Description
Dataframe containing a preview of the data
Stream Data Source Metrics
This REST endpoint returns information about a stream, such as the number of rows processed and the number of tuples processed per second. The information returned varies with the type of stream.
HTTP Status Code
HTTP Status Message
Description
HTTP Response Entity
HTTP Response Entity
Description
A map of key/value pairs, where both keys and values are strings
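Because every metric value arrives as a string, a client usually converts numeric values before displaying or comparing them. The Python sketch below does this for any string-to-string map; parse_metrics is a hypothetical helper, and since the actual metric names vary with the stream type, no particular keys are assumed.

```python
def parse_metrics(metrics):
    """Convert a string-to-string metrics map, turning numeric values into numbers."""
    parsed = {}
    for key, value in metrics.items():
        try:
            parsed[key] = int(value)
        except ValueError:
            try:
                parsed[key] = float(value)
            except ValueError:
                parsed[key] = value  # keep non-numeric values as strings
    return parsed
```

Trying int before float keeps counters such as a row count as exact integers while rates become floats.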
Stream Data Source Start
This REST endpoint starts or resumes a stream data source; that is, the stream starts pulling data and pushing it into the owner SymetryML project.
Parameter
Required / Optional
Description
If true, start streaming data from the beginning of the stream.
HTTP Status Code
HTTP Status Message
Description
Stream Data Source Stop
This REST endpoint stops a stream data source. Data is no longer pushed to the owner SymetryML project.
HTTP Status Code
HTTP Status Message
Description
Delete a Stream
Deletes a stream from a project. If the stream is running, it is stopped first.
Canonical URL Parameters
HTTP Status Code
HTTP Status Message
Description
Stream Data Source Error Log
This REST endpoint returns the list of errors for a given stream. Since streams run asynchronously in the background, it lets you check whether a given stream has encountered any problems.
HTTP Status Code
HTTP Status Message
Description
HTTP Response Entity
HTTP Response Entity
Description
A list of errors for that stream
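Since errors accumulate while the stream runs in the background, a client can poll this endpoint and report only entries that are new since the last check. In the Python sketch below, fetch_errors is a caller-supplied function (for example, one that calls the error log endpoint) and watch_stream_errors is a hypothetical helper; it assumes the log is append-only and simply resynchronizes if the list ever shrinks.

```python
import time


def watch_stream_errors(fetch_errors, interval_s=5.0, max_polls=3, sleep=time.sleep):
    """Poll a stream's error log and return the errors that appeared while watching."""
    seen = len(fetch_errors())  # errors already present before watching starts
    new_errors = []
    for _ in range(max_polls):
        sleep(interval_s)
        errors = fetch_errors()
        if len(errors) > seen:
            # Assumes an append-only log: anything past the old length is new.
            new_errors.extend(errors[seen:])
        seen = len(errors)  # also resynchronizes if the log was cleared
    return new_errors
```

Injecting both fetch_errors and sleep keeps the helper independent of any particular HTTP client and easy to exercise without a running server.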
Stream Data Source List
This REST endpoint returns the names of the streams that belong to a given project for a given user.
HTTP Status Code
HTTP Status Message
Description
HTTP Response Entity
HTTP Response Entity
Description
A list of stream data source names