SysFlow Telemetry Pipeline is a framework for monitoring cloud and enterprise workloads. The framework builds the plumbing required for system telemetry so that users can focus on writing and sharing analytics on a scalable, common open-source platform.
The backbone of the telemetry pipeline is a new data format which lifts raw system event information into an abstraction that describes process behaviors, and their relationships with containers, files, and network activity. This object-relational format is highly compact, yet it provides broad visibility into legacy endpoints and container clouds.
The platform is designed as a pluggable edge processing architecture which includes a policy engine that accepts declarative policies that support edge filtering, tagging, and alerting on SysFlow streams. It also offers several APIs that allow users to process SysFlow with their favorite toolkits.
The pipeline can be deployed using Docker, Kubernetes, OpenShift, and bare metal/VMs. The SysFlow agent can be configured as an edge analytics pipeline to stream SysFlow records through rsyslog, or as a batch exporter of raw SysFlow traces to S3-compatible object stores.
An integrated Jupyter environment makes it easy to perform log hunting on collected traces. There are also Apache Avro schema files for SysFlow so that users can generate APIs for other programming languages. C++, Python, and Golang APIs are available, allowing users to interact with SysFlow traces programmatically.
We encourage you to check the documentation first, but here are a few tips for a quick start.
The easiest way to run the SysFlow collector is from a Docker container, with a host mount for the output trace files. The following command shows how to run sf-collector with trace files exported to /mnt/data on the host.
docker run -d --privileged --name sf-collector \
-v /var/run/docker.sock:/host/var/run/docker.sock \
-v /dev:/host/dev -v /proc:/host/proc:ro \
-v /boot:/host/boot:ro -v /lib/modules:/host/lib/modules:ro \
-v /usr:/host/usr:ro -v /mnt/data:/mnt/data \
-e INTERVAL=60 \
-e EXPORTER_ID=${HOSTNAME} \
-e OUTPUT=/mnt/data/ \
-e FILTER="container.name!=sf-collector and container.name!=sf-exporter" \
--rm sysflowtelemetry/sf-collector
where INTERVAL denotes the time in seconds before a new trace file is generated, EXPORTER_ID sets the exporter name, OUTPUT is the directory in which trace files are written, and FILTER is the filter expression used to filter collected events. Note: append container.type!=host to the FILTER expression to filter out host events.
The SysFlow agent can be deployed in S3 (batch) or rsyslog (edge processing) export configurations. In the batch configuration, SysFlow exports the collected telemetry as trace files (batches of SysFlow records) to any S3-compliant object storage service.
In edge processing configuration, SysFlow exports the collected telemetry as events streamed to a rsyslog collector. This deployment enables the creation of customized edge pipelines, and offers a built-in policy engine to filter, enrich, and alert on SysFlow records.
Instructions for Docker Compose, Helm, and OpenShift deployments of complete SysFlow stacks are available here.
A command-line utility is provided for inspecting collected traces or converting traces from SysFlow's compact binary format into human-readable JSON or CSV formats.
docker run --rm -v /mnt/data:/mnt/data sysflowtelemetry/sysprint /mnt/data/trace
where trace is the name of the trace file inside /mnt/data. If omitted, all files in /mnt/data are processed. By default, the traces are printed to the standard output with a default set of SysFlow attributes. For a complete list of options, run:
docker run --rm -v /mnt/data:/mnt/data sysflowtelemetry/sysprint -h
A Jupyter environment is also available for inspecting and implementing analytic notebooks on collected SysFlow data. It includes APIs for data manipulation using Pandas dataframes and a native query language (sfql) with macro support. To start it locally with example notebooks, run:
git clone https://github.com/sysflow-telemetry/sf-apis.git && cd sf-apis
docker run --rm -d --name sfnb --user $(id -u):$(id -g) --group-add users -v $(pwd)/pynb:/home/jovyan/work -p 8888:8888 sysflowtelemetry/sfnb
Then, open a web browser and point it to http://localhost:8888 (alternatively, the remote server name or IP where the notebook is hosted). To obtain the notebook authentication token, run docker logs sfnb.
SysFlow is an open specification for system event-level telemetry. The main goal of SysFlow is to create a standard and extensible data format for both security and performance analytics for compute workloads. An open standard will enable researchers and practitioners to more easily work on a common data format, and focus on analytics using open source software.
The primary objective of SysFlow is to lift raw system call data into more semantic process behaviors, which promotes significant data reduction for the longer-term forensic storage that is crucial for security analyses. Through an object-relational model of entities, events, and flows, SysFlow enables users to configure the desired granularity of data collection and filtering in order to facilitate most types of analysis in big data frameworks.
SysFlow is an object-relational model of entities, events, and flows that describe the behaviors of processes on a system and encode them into an open format. A SysFlow exporter is designed to monitor system events of a workload, convert them to SysFlow objects, and write them to a binary output file. We envision that one exporter will be deployed per host (or virtual machine) and will output one binary file over a particular time period. Figure 1 shows a detailed view of the objects that the SysFlow exporter will export.
Entities represent the components on a system that we are interested in monitoring. In this version of SysFlow, we support three types of entities: Containers, Processes, and Files. As shown in Figure 1, Containers contain both Processes and Files, and the three are linked through object identifiers (more on this later).
Entity behaviors are modeled as events or flows. Events represent important individual behaviors of an entity that are broken out on their own due to their importance, their rarity, or because maintaining operation order is important. An example of an event would be a process clone or exec, or the deletion or renaming of a file. By contrast, a Flow represents an aggregation of multiple events that naturally fit together to describe a particular behavior. For example, we can model the network interactions of a process and a remote host as a bidirectional flow that is composed of several events, including connect, read, write, and close.
The idea behind SysFlow is to enable the user to configure the granularity of system-level data desired based on resource limitations and data analytics requirements. In this way, behaviors can be broken out into individual events or combined into smaller aggregated volumetric flows. The current version of the specification describes events and flows in three key behavioral areas: Files, Networks, and Processes. Figure 1 shows these events and flows with their attributes and relationships to entities, which are described in greater detail in the following sections.
As mentioned above, entities are the components on a system that we are interested in monitoring. These include containers, processes, and files. We also support a special entity object called a Header, which stores information about the SysFlow version, and a unique ID representing the host or virtual machine monitored by the SysFlow exporter. The header is always the first record appearing in a SysFlow File. All other entities contain a timestamp, an object ID and a state. The timestamp is used to indicate the time at which the entity was exported to the SysFlow file.
Object IDs allow events and flows to reference entities without duplicating information in each record. Object IDs are not required to be globally unique across space and time. In fact, the only uniqueness requirement is that no two objects managed by a SysFlow exporter can have the same ID simultaneously. Entities are always written to the binary output file before any events and flows associated with them are exported. Since entities are exported first, each event and flow is matched with the entity (with the same ID) that is closest to it in the file. Furthermore, every binary output file must be self-contained, meaning that all entities referenced by flows/events must be present in every SysFlow file generated.
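To make the matching rule concrete, here is a minimal Go sketch (hypothetical types and field names, not the actual sf-apis reader) of how a trace consumer could cache the most recently seen Process entity per OID, so that subsequent events and flows resolve to the closest preceding entity in the file:

package main

import "fmt"

// ProcessOID mirrors the process object identifier described in the spec:
// the host pid plus the creation timestamp (illustrative struct only).
type ProcessOID struct {
	HostPID  int64
	CreateTS int64
}

// ProcessEntity is a trimmed-down stand-in for a SysFlow Process record.
type ProcessEntity struct {
	OID ProcessOID
	Exe string
}

// EntityCache keeps the most recently exported entity per OID. Because
// entities always precede the events/flows that reference them, replacing
// the cache entry on every new Process record yields the "closest preceding
// entity with the same id" semantics described above.
type EntityCache struct {
	procs map[ProcessOID]ProcessEntity
}

func NewEntityCache() *EntityCache {
	return &EntityCache{procs: make(map[ProcessOID]ProcessEntity)}
}

// OnProcess is called whenever a Process entity record is read from the file.
func (c *EntityCache) OnProcess(p ProcessEntity) { c.procs[p.OID] = p }

// Resolve looks up the entity referenced by an event or flow.
func (c *EntityCache) Resolve(oid ProcessOID) (ProcessEntity, bool) {
	p, ok := c.procs[oid]
	return p, ok
}

func main() {
	cache := NewEntityCache()
	cache.OnProcess(ProcessEntity{OID: ProcessOID{HostPID: 101, CreateTS: 1}, Exe: "/bin/bash"})
	if p, ok := cache.Resolve(ProcessOID{HostPID: 101, CreateTS: 1}); ok {
		fmt.Println("event belongs to", p.Exe)
	}
}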
The state is an enumeration that indicates why an entity was written to disk. The state can currently be one of three values:
State | Description |
---|---|
CREATED | Indicates that the entity was recently created on the host/VM. For example, a process clone. |
MODIFIED | Indicates that some attributes of the entity were modified since the last time it was exported. |
REUP | Indicates that the entity already existed, but is being exported again, so that output files can be self-contained. |
Each entity is defined below with recommendations on what to use for object identifiers, based on what is used in the current implementation of the SysFlow exporter.
The Header entity is an object which appears at the beginning of each binary SysFlow file. It contains the current version of SysFlow as supported in the file, and the exporter ID.
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
version | long | The current SysFlow version. | 1 |
exporter | string | Globally unique id representing the host monitored by SysFlow. | 1 |
ip | string | IP address in dot notation representing the monitored host. | 2 |
The Container entity represents a system or application container such as docker or LXC. It contains important information about the container including its id, name, and whether it is privileged.
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
id | string | Unique string representing the Container Object as provided by docker, LXC, etc. | 1 |
state | enum | State of the container (CREATED, MODIFIED, REUP). | not implemented |
timestamp (ts) | int64 | The timestamp when the container object is exported (nanoseconds). | not implemented |
name | string | Container name as provided by docker, LXC, etc. | 1 |
image | string | Image name associated with container as provided by docker, LXC, etc. | 1 |
imageID | string | Image ID associated with container as provided by docker, LXC, etc. | 1 |
type | enum | Can be one of: CT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM | 1 |
privileged | boolean | If true, the container is running with root privileges | 1 |
The Process entity represents a running process on the system. It contains important information about the process including its host pid, creation time, and object ID (OID), as well as a reference to its parent's OID. When a process entity is exported to a SysFlow file, all its parent processes should be exported before the process, as well as the process's Container entity. Processes are only exported to a SysFlow file if an event or flow associated with that process or any of its threads is exported. Threads are not explicitly exported in the process object but are represented in events and flows through a thread id field. Finally, a Process entity only needs to be exported to a file once, unless it has been modified by an event or flow.
NOTE: In the current implementation, the creation timestamp is the time at which the process is cloned. If the process was cloned before capture started, this value is 0. The current implementation also has problems obtaining absolute paths for executables when relative paths are used to launch processes.
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
state | enum | state of the process (CREATED, MODIFIED, REUP) | 1 |
OID: host pid create ts | struct int64 int64 | The Process OID contains the host pid of the process and its creation timestamp. | 1 |
POID: parent host pid parent create ts | struct int64 int64 | The OID of the parent process. Can be NULL if not available or if the process is a root process. | 1 |
timestamp (ts) | int64 | The timestamp when process object is exported (nanoseconds). | 1 |
exe | string | Full path (if available) of the executable used in the process launch; otherwise, it’s the name of the exe. | 1 |
exeArgs | string | Concatenated list of args passed on process startup. | 1 |
uid | int32 | User ID under which the process is running. | 1 |
userName | string | User name under which the process is running. | 1 |
gid | int32 | Group ID under which the process is running | 1 |
groupName | string | Group Name under which the process is running | 1 |
tty | boolean | If true, the process is tied to a shell | 1 |
containerId | string | Unique string representing the Container Object in which the process resides. Can be NULL if the process isn't running in a container. | 1 |
entry | boolean | If true, the process is a container or system entrypoint (i.e., virtual pid = 1). | 2 |
The File entity represents file-based resources on a system including files, directories, unix sockets, and pipes.
NOTE: Current implementation does not have access to inode related values, which would greatly improve object ids. Also, the current implementation has some issues with absolute paths when monitoring operations that use relative paths.
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
state | enum | state of the file (CREATED, MODIFIED, REUP) | 1 |
FOID: | string (128bit) | File identifier: a SHA1 hash of the concatenation of the path and the container ID. | 1 |
timestamp (ts) | int64 | The timestamp when file object is exported (nanoseconds). | 1 |
restype | enum | Indicates the resource type. Currently support: SF_FILE, SF_DIR, SF_UNIX (unix socket), SF_PIPE, SF_UNKNOWN | 1 |
path | string | Full path of the file/directory, or unique identifier for pipe, unix socket | 1 |
containerId | string | Unique string representing the Container Object in which the file resides. Can be NULL if the file isn't in a container. | 1 |
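As an illustration of the FOID definition above, the Go sketch below computes a SHA1-based file identifier from a path and a container ID. The helper name and the concatenation order are assumptions made for illustration, not the collector's actual implementation; note also that SHA1 yields 160 bits, while the schema lists the FOID as a 128-bit string, and how the implementation reconciles this is not specified here.

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// fileObjectID derives a file identifier from the file path and the ID of the
// container the file resides in, mirroring the FOID description above.
// The concatenation order is an assumption for illustration only.
func fileObjectID(path, containerID string) string {
	sum := sha1.Sum([]byte(path + containerID))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(fileObjectID("/etc/passwd", "f2d0a1f4c3b2"))
}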
Events represent important individual behaviors of an entity that are broken out on their own due to their importance, their rarity, or because maintaining operation order is important. In order to manage events and their differing attributes, we divide them into three different categories: Process, File, and Network events. These are described more in detail later on.
Each event and flow contains a process object id, a timestamp, a thread id, and a set of operation flags. The process object id represents the Process Entity on which the event occurred, while the thread id indicates which process thread was associated with the event.
The operation flags describe the actual behavior associated with the event (or flow). The flags are represented in a single bitmap, which enables multiple behaviors to be combined easily into a flow. An event will have a single bit active, while a flow could have several. The currently supported flags are as follows (a short decoding sketch follows the table):
Operation | Numeric ID | Description | System Calls | Evts/Flows Supported | Since (schema version) |
---|---|---|---|---|---|
OP_CLONE | (1 << 0) | Process or thread cloned. | clone() | ProcessEvent | 1 |
OP_EXEC | (1 << 1) | Execution of a file | execve() | ProcessEvent | 1 |
OP_EXIT | (1 << 2) | Process or thread exit. | exit() | ProcessEvent | 1 |
OP_SETUID | (1 << 3) | UID of process was changed | setuid(), setresuid | ProcessEvent | 1 |
OP_SETNS | (1 << 4) | Process entering namespace | setns() | FileFlow | 1 |
OP_ACCEPT | (1 << 5) | Process accepting network connections | accept(), select() | NetworkFlow | 1 |
OP_CONNECT | (1 << 6) | Process connecting to remote host or process | connect() | NetworkFlow | 1 |
OP_OPEN | (1 << 7) | Process opening a file/resource | open(), openat(), create() | FileFlow | 1 |
OP_READ_RECV | (1 << 8) | Process reading from file, receiving network data | read(),pread(),recv(),recvfrom(),recvmsg() | NetworkFlow, FileFlow | 1 |
OP_WRITE_SEND | (1 << 9) | Process writing to file, sending network data | write(),pwrite(),send(),sendto(),sendmsg() | NetworkFlow, FileFlow | 1 |
OP_CLOSE | (1 << 10) | Process close resource | close(), socket shutdown | NetworkFlow, FileFlow | 1 |
OP_TRUNCATE | (1 << 11) | Premature closing of a flow due to exporter shutdown | N/A | NetworkFlow, FileFlow | 1 |
OP_SHUTDOWN | (1 << 12) | Shutdown all or part of a full duplex socket connection | shutdown() | NetworkFlow | 1 |
OP_MMAP | (1 << 13) | Memory map of a file. | mmap() | FileFlow | 1 |
OP_DIGEST | (1 << 14) | Summary flow information for long running flows | N/A | NetworkFlow, FileFlow | 1 |
OP_MKDIR | (1 << 15) | Make directory | mkdir(), mkdirat() | FileEvent | 1 |
OP_RMDIR | (1 << 16) | Remove directory | rmdir() | FileEvent | 1 |
OP_LINK | (1 << 17) | Process creates hard link to existing file | link(), linkat() | FileEvent | 1 |
OP_UNLINK | (1 << 18) | Process deletes file | unlink(), unlinkat() | FileEvent | 1 |
OP_SYMLINK | (1 << 19) | Process creates sym link to existing file | symlink(), symlinkat() | FileEvent | 1 |
OP_RENAME | (1 << 20) | File renamed | rename(), renameat() | FileEvent | 1 |
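Because an event carries a single active bit while a flow may combine several, consumers can decode the bitmap by testing each bit. The following Go sketch uses constants mirroring the numeric IDs in the table; it is illustrative only and not part of the official SysFlow APIs.

package main

import "fmt"

// Operation flag constants mirroring a subset of the numeric IDs above.
const (
	OpClone     = 1 << 0
	OpExec      = 1 << 1
	OpExit      = 1 << 2
	OpAccept    = 1 << 5
	OpConnect   = 1 << 6
	OpOpen      = 1 << 7
	OpReadRecv  = 1 << 8
	OpWriteSend = 1 << 9
	OpClose     = 1 << 10
)

var opNames = map[int64]string{
	OpClone: "CLONE", OpExec: "EXEC", OpExit: "EXIT",
	OpAccept: "ACCEPT", OpConnect: "CONNECT", OpOpen: "OPEN",
	OpReadRecv: "READ_RECV", OpWriteSend: "WRITE_SEND", OpClose: "CLOSE",
}

// decodeOpFlags returns the names of all operations encoded in a bitmap.
// An event yields a single name; a flow may yield several (order not guaranteed).
func decodeOpFlags(flags int64) []string {
	var ops []string
	for bit, name := range opNames {
		if flags&bit != 0 {
			ops = append(ops, name)
		}
	}
	return ops
}

func main() {
	// A file flow that opened, read from, and closed a file.
	fmt.Println(decodeOpFlags(OpOpen | OpReadRecv | OpClose))
}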
A Process Event is an event that creates or modifies a process in some way. Currently, we support four Process Events (referred to as operations), and their behavior in SysFlow is described below.
Operation | Behavior |
---|---|
OP_CLONE | Exported when a new process or thread is cloned. A new Process Entity should be exported prior to exporting the clone operation of a new process. |
OP_EXEC | Exported when a process calls an exec syscall. This event will modify an existing process, and should be accompanied by a modified Process Entity. |
OP_EXIT | Exported on a process or thread exit. |
OP_SETUID | Exported when a process’s UID is changed. This event will modify an existing process, and should be accompanied by a modified Process Entity. |
The list of attributes for the Process Event are as follows:
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
OID: host pid create ts | struct int64 int64 | The OID of the process for which the event occurred. | 1 |
timestamp (ts) | int64 | The timestamp when the event occurred (nanoseconds). | 1 |
tid | int64 | The id of the thread associated with the ProcessEvent. If the running process is single threaded tid == pid | 1 |
opFlags | int64 | The id of the syscall associated with the event. See list of Operation Flags for details. | 1 |
args | string[] | An array of arguments encoded as string for the syscall. | Sparingly implemented. Only really used with setuid for now. |
ret | int64 | Syscall return value. | 1 |
A File Event is an event that creates, deletes or modifies a File Entity. Currently, we support six File Events (referred to as operations), and their behavior in SysFlow is described below.
Operation | Behavior |
---|---|
OP_MKDIR | Exported when a new directory is created. Should be accompanied by a new File Entity representing the directory |
OP_RMDIR | Exported when a directory is deleted. |
OP_LINK | Exported when a process creates a hard link to an existing file. Should be accompanied by a new File Entity representing the new link. |
OP_UNLINK | Exported when a process deletes a file. |
OP_SYMLINK | Exported when a process creates a sym link to an existing file. Should be accompanied by a new File Entity representing the new link. |
OP_RENAME | Exported when a process renames an existing file. Should be accompanied by a new File Entity representing the renamed file. |
NOTE: We’d like to also support chmod and chown but these two operations are not fully supported in sysdig. We’d also like to support umount and mount but these operations are not implemented. We anticipate supporting these in a future version.
The list of attributes for the File Event are as follows:
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
OID: host pid create ts | struct int64 int64 | The OID of the process for which the event occurred. | 1 |
timestamp (ts) | int64 | The timestamp when the event occurred (nanoseconds). | 1 |
tid | int64 | The id of the thread associated with the FileEvent. If the running process is single threaded tid == pid | 1 |
opFlags | int64 | The id of the syscall associated with the event. See list of Operation Flags for details. | 1 |
ret | int64 | Syscall return value. | 1 |
FOID: | string (128bit) | The id of the file on which the system call was called. The file identifier is a SHA1 hash of the concatenation of the path and the container ID. | 1 |
NewFOID: | string (128bit) | Some syscalls (link, symlink, etc.) convert one file into another, requiring two files. This is the id of the secondary or new file on which the system call was called, computed in the same way as the FOID. Can be NULL. | 1 |
Network Events are currently not implemented.
A Flow represents an aggregation of multiple events that naturally fit together to describe a particular behavior. They are designed to reduce data and collect statistics. Examples of flows include an application reading or writing to a file, or sending and receiving data from another process or host. Flows represent a number of events occurring over a period of time, and as such each flow has a set of operations (encoded in a bitmap), a start and an end time. One can determine the operations in the flow by decoding the operation flags.
A flow can be started by any supported operation and is exported in one of two ways. First, a flow is exported on an exit or close event, signifying the end of a connection, file interaction, or process. Second, a long-running flow is exported after a preconfigured time period. After a long-running flow is exported, its counters and flags are reset. However, if there is no activity on the flow over a preconfigured period of time, that flow is no longer exported.
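The Go sketch below illustrates this aggregation-and-reset behavior for a long-running file flow; the type, counters, and intervals are hypothetical and only meant to convey the mechanics described above.

package main

import (
	"fmt"
	"time"
)

// fileFlow accumulates counters for a single open file until it is exported.
type fileFlow struct {
	opFlags    int64
	numReads   int64
	bytesRead  int64
	lastExport time.Time
	lastActive time.Time
}

// onRead folds a single read event into the flow.
func (f *fileFlow) onRead(n int64) {
	f.opFlags |= 1 << 8 // OP_READ_RECV
	f.numReads++
	f.bytesRead += n
	f.lastActive = time.Now()
}

// maybeExport exports a long-running flow after exportEvery and resets its
// counters, or stops exporting an idle flow after idleAfter, mirroring the
// behavior described in the text (the intervals here are arbitrary examples).
func (f *fileFlow) maybeExport(exportEvery, idleAfter time.Duration) {
	now := time.Now()
	switch {
	case now.Sub(f.lastActive) > idleAfter:
		// No recent activity: the flow is no longer exported.
	case now.Sub(f.lastExport) >= exportEvery:
		fmt.Printf("export flow: ops=%d reads=%d bytes=%d\n", f.opFlags, f.numReads, f.bytesRead)
		f.opFlags, f.numReads, f.bytesRead = 0, 0, 0
		f.lastExport = now
	}
}

func main() {
	f := &fileFlow{lastExport: time.Now(), lastActive: time.Now()}
	f.onRead(4096)
	f.maybeExport(0, time.Minute) // exportEvery=0 forces an immediate export for the demo
}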
In this section, we describe three categories of Flows: Process, File and Network Flows.
A Process Flow represents a summarization of the number of threads created and destroyed over a time period. Process Flows are partially implemented in the collector and will be fully implemented in a later release (available since schema version 2). Currently we support the following operations:
Operation | Behavior |
---|---|
OP_CLONE | Recorded when a new thread is cloned. |
OP_EXIT | Recorded on a thread exit. |
The list of attributes for the Process Flow are as follows:
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
OID: host pid create ts | struct int64 int64 | The OID of the process for which the flow occurred. | 2 |
timestamp (ts) | int64 | The timestamp when the flow starts (nanoseconds). | 2 |
numThreadsCloned | int64 | The number of threads cloned during the duration of the flow. | 2 |
opFlags | int64 (bitmap) | The id of one or more syscalls associated with the ProcessFlow. See list of Operation Flags for details. | 2 |
endTs | int64 | The timestamp when the process flow is exported (nanoseconds). | 2 |
numThreadsExited | int64 | Number of threads exited during the duration of the flow. | 2 |
numCloneErrors | int64 | Number of clone errors occurring during the duration of the flow. | 2 |
A File Flow represents a collection of operations on a file. Currently we support the following operations:
Operation | Behavior |
---|---|
OP_SETNS | Process entering a namespace via a mounted file associated with the referenced File Entity. |
OP_OPEN | Process opening a file/resource. |
OP_READ_RECV | Process reading from a file/resource. |
OP_WRITE_SEND | Process writing to a file. |
OP_MMAP | Process memory-mapping a file. |
OP_CLOSE | Process closing a resource. This action closes the corresponding FileFlow. |
OP_TRUNCATE | Indicates premature closing of a flow due to exporter shutdown. |
OP_DIGEST | Summary flow information for long running flows (not implemented). |
The list of attributes for the File Flow are as follows:
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
OID: host pid create ts | struct int64 int64 | The OID of the process for which the flow occurred. | 1 |
timestamp (ts) | int64 | The timestamp when the flow starts (nanoseconds). | 1 |
tid | int64 | The id of the thread associated with the flow. If the running process is single threaded tid == pid | 1 |
opFlags | int64 (bitmap) | The id of one or more syscalls associated with the FileFlow. See list of Operation Flags for details. | 1 |
openFlags | int64 | Flags associated with an open syscall if present. | 1 |
endTs | int64 | The timestamp when the file flow is exported (nanoseconds). | 1 |
FOID: | string (128bit) | The id of the file on which the system call was called. The file identifier is a SHA1 hash of the concatenation of the path and the container ID. | 1 |
fd | int32 | The file descriptor associated with the flow. | 1 |
numRRecvOps | int64 | Number of read operations performed during the duration of the flow. | 1 |
numWSendOps | int64 | Number of write operations performed during the duration of the flow. | 1 |
numRRecvBytes | int64 | Number of bytes read during the duration of the flow. | 1 |
numWSendBytes | int64 | Number of bytes written during the duration of the flow. | 1 |
A Network Flow represents a collection of operations on a network connection. Currently we support the following operations:
Operation | Behavior |
---|---|
OP_ACCEPT | Process accepted a new network connection. |
OP_CONNECT | Process connected to a remote host or process. |
OP_READ_RECV | Process receiving data from a remote host or process. |
OP_WRITE_SEND | Process sending data to a remote host or process. |
OP_SHUTDOWN | Process shut down all or part of a full duplex socket connection. |
OP_CLOSE | Process closing a network connection. This action closes the corresponding NetworkFlow. |
OP_TRUNCATE | Indicates premature closing of a flow due to exporter shutdown. |
OP_DIGEST | Summary flow information for long running flows (not implemented). |
The list of attributes for the Network Flow are as follows:
Attribute | Type | Description | Since (schema version) |
---|---|---|---|
OID: host pid create ts | struct int64 int64 | The OID of the process for which the flow occurred. | 1 |
timestamp (ts) | int64 | The timestamp when the flow starts (nanoseconds). | 1 |
tid | int64 | The id of the thread associated with the flow. If the running process is single threaded tid == pid | 1 |
opFlags | int64 (bitmap) | The id of one or more syscalls associated with the flow. See list of Operation Flags for details. | 1 |
endTs | int64 | The timestamp when the flow is exported (nanoseconds). | 1 |
sip | int32 | The source IP address. | 1 |
sport | int16 | The source port. | 1 |
dip | int32 | The destination IP address. | 1 |
dport | int16 | The destination port. | 1 |
proto | enum | The network protocol of the flow. Can be: TCP, UDP, ICMP, RAW | 1 |
numRRecvOps | int64 | Number of receive operations performed during the duration of the flow. | 1 |
numWSendOps | int64 | Number of send operations performed during the duration of the flow. | 1 |
numRRecvBytes | int64 | Number of bytes received during the duration of the flow. | 1 |
numWSendBytes | int64 | Number of bytes sent during the duration of the flow. | 1 |
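Since sip and dip are stored as 32-bit integers, consumers typically convert them back to dotted notation. The Go sketch below assumes a little-endian encoding for illustration; the actual byte order is defined by the collector and schema, not by this example.

package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// ipToString converts a NetworkFlow sip/dip value into dotted notation.
// The little-endian byte order used here is an assumption for illustration;
// check the schema/implementation for the actual encoding.
func ipToString(ip uint32) string {
	b := make([]byte, 4)
	binary.LittleEndian.PutUint32(b, ip)
	return net.IP(b).String()
}

func main() {
	// 0x0100007f is 127.0.0.1 when interpreted little-endian.
	fmt.Println(ipToString(0x0100007f))
}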
SysFlow Collector (sf-collector repo)
The SysFlow Collector monitors and collects system call and event information from hosts and exports them in the SysFlow format using Apache Avro object serialization. SysFlow lifts system call information into a higher order object relational form that models how containers, processes and files interact with their environment through process control flow, file, and network operations. Learn more about SysFlow in the SysFlow Specification Document.
The SysFlow Collector is currently built upon a Sysdig core and requires the Sysdig probe to passively collect system events and turn them into SysFlow. As a result, the collector supports Sysdig’s powerful filtering capabilities. Please see the build and installation instructions for installing the collector.
The sf-collector project has been tested primarily on Ubuntu 16.04 and 18.04. The project will be tested on other flavors of UNIX in the future. This document describes how to build and run the application both inside a docker container and on a Linux host. Building and running the application inside a docker container is the easiest way to start. For convenience, you can skip the build step and pull pre-built images directly from Docker Hub.
To build the project, first pull down the source code, with submodules:
git clone --recursive https://github.com/sysflow-telemetry/sf-collector.git
To checkout submodules on an already cloned repo:
git submodule update --init --recursive
To build as docker container:
cd sf-collector
make -C modules init
docker build --target runtime -t sf-collector .
The container is built in stages to enable caching of the intermediate steps of the build and reduce final image sizes.
First, install required dependencies:
apt install patch base-files binutils bzip2 libdpkg-perl perl make xz-utils libncurses5-dev libncursesw5-dev cmake libboost-all-dev g++ flex bison wget libelf-dev liblog4cxx-dev libapr1 libaprutil1 libsparsehash-dev libsnappy-dev libgoogle-glog-dev libjsoncpp-dev
To build the collector:
cd sf-collector
make install
Running the collector from the command line
The collector has the following options:
Usage: sysporter [options] -w <file name/dir>
Options:
-h Show this help message and exit
-w file name/dir (required) The file or directory to which sysflow records are written. If a directory is specified (using a trailing slash), file name will be an epoch timestamp. If -G is specified, then the file name specified will have an epoch timestamp appended to it
-e exporterID A globally unique ID representing the host or VM being monitored, which is stored in the sysflow dumpfile header. If -e is not set, the hostname of the current machine is used, which may not be accurate for reading offline scap files
-G interval (in secs) Rotates the dumpfile specified in -w every interval seconds and appends epoch timestamp to file name
-r scap file The scap file to be read and dumped as sysflow format at the file specified by -w. If this option is not specified, a live capture is assumed
-s schema file The sysflow avro schema file (.avsc) used for schema validation (default: /usr/local/sysflow/conf/SysFlow.avsc)
-f filter Sysdig style filtering string to filter scap. Must be surrounded by quotes
-c Simple, fast filter to allow only container-related events to be dumped
-p cri-o path The path to the cri-o domain socket
-t cri-o timeout The amount of time in ms to wait for cri-o socket to respond
-u domain socket file Outputs SysFlow to a unix domain socket rather than to a file
-v Print version information and exit
Example usage
Convert a Sysdig scap file to a SysFlow file with an exporter id. The output will be written to output.sf. Note that the collector must be run with root privilege:
sysporter -r input.scap -w ./output.sf -e host
Trace a system live, and output SysFlow to files in a directory which are rotated every 30 seconds. The file name will be an epoch timestamp of when the file was initially written. Note that the trailing slash must be present. The example filter ensures that only SysFlow from containers is generated.
sysporter -G 30 -w ./output/ -e host -f "container.type!=host and container.type=docker"
Running the collector from a Docker container
The easiest way to run the SysFlow collector is from a Docker container, with a host mount for the output trace files. The following command shows how to run sf-collector with trace files exported to /mnt/data on the host.
docker run -d --privileged --name sf-collector \
-v /var/run/docker.sock:/host/var/run/docker.sock \
-v /dev:/host/dev -v /proc:/host/proc:ro \
-v /boot:/host/boot:ro -v /lib/modules:/host/lib/modules:ro \
-v /usr:/host/usr:ro -v /mnt/data:/mnt/data \
-e INTERVAL=60 \
-e EXPORTER_ID=${HOSTNAME} \
-e OUTPUT=/mnt/data/ \
-e FILTER="container.name!=sf-collector and container.name!=sf-exporter" \
--rm sysflowtelemetry/sf-collector
where INTERVAL denotes the time in seconds before a new trace file is generated, EXPORTER_ID sets the exporter name, OUTPUT is the directory in which trace files are written, and FILTER is the filter expression used to filter collected events. Note: append container.type!=host to the FILTER expression to filter out host events.
CRI-O support
The sf-collector project currently supports docker and kubernetes deployments (using the helm charts provided in sf-deployments). Support for container runtimes based on CRI-O is planned for future releases of the collector.
SysFlow Processor (sf-processor repo)
The SysFlow processor is a lightweight edge analytics pipeline that can process and enrich SysFlow data. The processor is written in golang, and allows users to build and configure various pipelines using a set of built-in and custom plugins and drivers. Pipeline plugins are producer-consumer objects that follow an interface and pass data to one another through pre-defined channels in a multi-threaded environment. By contrast, a driver represents a data source, which pushes data to the plugins. The processor currently supports two built-in drivers, one that reads sysflow from a file, and another that reads streaming sysflow over a domain socket. Plugins and drivers are configured using a JSON file.
A core built-in plugin is a policy engine that can apply logical rules to filter, alert, or semantically label sysflow records using a declarative language based on the Falco rules syntax with a few added extensions (more on this later).
Custom plugins and drivers can be implemented as dynamic libraries to tailor analytics to specific user requirements.
The endpoint of a pipeline configuration is an exporter plugin that sends the processed data to a target. The processor supports various types of export plugins for a variety of different targets.
Prerequisites
The processor has been tested on Ubuntu/RHEL distributions, but should work on any Linux system.
- Golang version 1.14+ and make (if building from sources)
- Docker, docker-compose (if building with docker)
Build
Clone the processor repository
git clone https://github.com/sysflow-telemetry/sf-processor.git
Build locally, from sources
cd sf-processor
make build
Build with docker
cd sf-processor
make docker-build
Usage
For usage information, type:
cd driver/
./sfprocessor -help
This should yield the following usage statement:
Usage: sfprocessor [[-version]|[-driver <value>] [-log <value>] [-driverdir <value>] [-plugdir <value>] path]
Positional arguments:
  path string
        Input path
Arguments:
  -config string
        Path to pipeline configuration file (default "pipeline.json")
  -cpuprofile file
        Write cpu profile to file
  -driver string
        Driver name {file|socket|} (default "file")
  -driverdir string
        Dynamic driver directory (default "../resources/drivers")
  -log string
        Log level {trace|info|warn|error} (default "info")
  -memprofile file
        Write memory profile to file
  -plugdir string
        Dynamic plugins directory (default "../resources/plugins")
  -test
        Test pipeline configuration
  -traceprofile file
        Write trace profile to file
  -version
        Output version information
The four most important flags are config, driverdir, plugdir, and driver. The config flag points to a pipeline configuration file, which describes the entire pipeline and the settings for the individual plugins. The driverdir and plugdir flags specify where dynamic driver and plugin shared libraries reside, so they can be loaded by the processor at runtime. The driver flag accepts a label for a pre-configured driver (either built-in or custom) that will be used as the data source to the pipeline. Currently, the pipeline only supports one driver at a time, but we anticipate handling multiple drivers in the future. There are two built-in drivers:
- file: loads a sysflow file reading driver that reads from path.
- socket: loads a sysflow streaming driver. The driver creates a domain socket named path and acts as a server waiting for a SysFlow collector to attach and send sysflow data.
Pipeline Configuration
The pipeline configuration below shows how to configure a pipeline that will read a sysflow stream and push records to the policy engine, which will trigger alerts using a set of runtime policies stored in a yaml file. An example pipeline with this configuration looks as follows:
{
  "pipeline":[
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "policyengine",
      "in": "flat flattenerchan",
      "out": "evt eventchan",
      "policies": "../resources/policies/runtimeintegrity"
    },
    {
      "processor": "exporter",
      "in": "evt eventchan",
      "export": "syslog",
      "proto": "tcp",
      "tag": "sysflow",
      "host": "localhost",
      "port": "514"
    }
  ]
}
Note: This configuration can be found in: sf-collector/resources/pipelines/pipeline.syslog.json
This pipeline specifies three built-in plugins:
- sysflowreader: is a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the handler interface) for processing. In this case, we are using the flattener handler, but custom handlers are possible.
- policyengine: is the policy engine, which takes flattened (row-oriented) SysFlow records as input and outputs records that represent alerts or filtered sysflow records, depending on the policy engine's mode (more on this later).
- exporter: takes records from the policy engine, and exports them to ElasticSearch, syslog, file, or terminal, in a JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.
Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are custom to the specific plugins. For more details on the specific attributes in this example, see the pipeline configuration template.
The general attributes are as follows:
- processor (required): the name of the processor plugin to load. Processors must implement the SFProcessor interface; the name is the value that must be returned from the GetName() function as defined in the processor object.
- handler (optional): the name of the handler object to be used for the processor. Handlers must implement the SFHandler interface.
- in (required): the input channel (i.e., golang channel) of objects that are passed to the plugin.
- out (optional): the output channel (i.e., golang channel) for objects that are pushed out of the plugin, and into the next plugin in the pipeline sequence.
Channels are modeled as channel objects that have an In attribute representing some golang channel of objects. See SFChannel for an example. The syntax for a channel in the pipeline is [channel name] [channel type], where channel type is the label given to the channel type at plugin registration (more on this later), and channel name is a unique identifier for the current channel instance. The name and type of an output channel in one plugin must match the name and type of the input channel of the next plugin in the pipeline sequence.
A plugin has exactly one input channel but it may specify more than one output channel. This allows pipeline definitions that fan out data to more than one receiver plugin, similar to a Unix tee command. While there must always be one SysFlow reader acting as the entry point of a pipeline, a pipeline configuration may specify policy engines passing data to different exporters, or a SysFlow reader passing data to different policy engines. Generally, pipelines form a tree rather than a linear structure.
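The fragment below sketches what a channel object with an In attribute might look like, and how the output channel of one plugin doubles as the input channel of the next; the type and field names are illustrative stand-ins rather than the exact sf-apis definitions.

package main

import "fmt"

// flatRecord stands in for a flattened SysFlow record.
type flatRecord struct{ exe string }

// FlatChannel is a sketch of a pipeline channel object: a struct wrapping a
// golang channel in its In attribute, as described above.
type FlatChannel struct {
	In chan *flatRecord
}

func main() {
	// "flat flattenerchan": one channel instance shared by two plugins, used
	// as the "out" of the reader and the "in" of the policy engine.
	flat := FlatChannel{In: make(chan *flatRecord, 1)}

	// Producer side (e.g., the sysflowreader/flattener handler).
	flat.In <- &flatRecord{exe: "/bin/bash"}
	close(flat.In)

	// Consumer side (e.g., the policy engine) reads from the same channel.
	for r := range flat.In {
		fmt.Println("received record for", r.exe)
	}
}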
Policy engine configuration
The policy engine ("processor": "policyengine") plugin is driven by a set of rules. These rules are specified in a YAML file which adopts the same syntax as the rules of the [Falco](https://falco.org/docs/rules) project. A policy engine plugin specification requires the following attributes:
- policies (required): The path to the YAML rules specification file. More information on rules can be found in the Rules section.
- mode (optional): The mode of the policy engine. Allowed values are alert for generating rule-based alerts, filter for rule-based filtering of SysFlow events, and bypass for unchanged pass-through of raw sysflow events. The default value is alert. If mode is bypass, the policies attribute can be omitted.
Exporter configuration
An exporter ("processor": "exporter") plugin consists of two modules: an encoder for converting the data to a suitable format, and a transport module for sending the data to the target. Encoders are target-specific, i.e., for a particular export target a particular set of encoders may be used. In the exporter configuration, the transport module is specified via the export parameter (required). The encoder is selected via the format parameter (optional). The default format is json.
The following table lists the currently supported exporter modules and the corresponding encoders. Additional encoders and transport modules can be implemented if the need arises. If you plan to contribute or want to get involved in the discussion, please join the SysFlow community.
Some of these combinations require additional configuration as described in the following sections. null is used for debugging the processor and doesn't export any data.
Export to file
If export is set to file, an additional parameter, file.path, allows the specification of the target file.
Export to syslog
If the export parameter is set to syslog, output to syslog is enabled and the following additional parameters are used:
- syslog.proto (optional): The protocol used for communicating with the syslog server. Allowed values are tcp, tls and udp. Default is tcp.
- syslog.tag (optional): The tag used for each SysFlow record in syslog. Default is SysFlow.
- syslog.source (optional): If set, adds a hostname to the syslog header.
- syslog.host (optional): The hostname of the syslog server. Default is localhost.
- syslog.port (optional): The port of the syslog server. Default is 514.
Export to ElasticSearch
Export to ElasticSearch is enabled by setting the config parameter export to es. The only supported format for export to ElasticSearch is ecs.
Data export is done via bulk ingestion. The ingestion can be controlled by some additional parameters which are read when the es export target is selected. Required parameters specify the ES target, index and credentials. Optional parameters control some aspects of the behavior of the bulk ingestion and may have an effect on performance. You may need to adapt their values for optimal performance in your environment.
- es.addresses (required): A comma-separated list of ES endpoints.
- es.index (required): The name of the ES index to ingest into.
- es.username (required): The ES username.
- es.password (required): The password for the specified ES user.
- buffer (optional): The bulk size as the number of records to be ingested at once. Default is 0, but a value of 0 indicates record-by-record ingestion, which may be highly inefficient.
- es.bulk.numWorkers (optional): The number of ingestion workers used in parallel. Default is 0, which means that the exporter uses as many workers as there are cores in the machine.
- es.bulk.flashBuffer (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in buffer), otherwise the bulk is broken into smaller chunks. Default is 5e+6.
- es.bulk.flushTimeout (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is 30s.
The Elastic exporter does not make any assumption on the existence or configuration of the index specified in es.index. If the index does not exist, Elastic will automatically create it and apply a default dynamic mapping. It may be beneficial to use an explicit mapping for the ECS data generated by the Elastic exporter. For convenience, we provide an explicit mapping for creating a new tailored index in Elastic. For more information, refer to the Elastic Mapping reference.
Export to IBM Findings API (IBM Cloud Security & Compliance Center)
Export to the IBM Findings API allows adding custom findings to the IBM Cloud Security & Compliance Center (SCC). The mode is enabled by setting the configuration parameter export to findings. The format parameter must be set to occurence in this case. For export to IBM Findings, the following parameters are used:
- findings.apikey (required): The API key used for the Advisor service instance.
- findings.url (required): The URL of the Advisor service instance.
- findings.accountid (required): The account ID used for the Advisor service instance.
- findings.provider (required): Unique ID of the note provider.
- findings.region (required): The cloud region of the Advisor service instance.
- findings.sqlqueryurl (required):
- findings.sqlquerycrn (required):
- findings.s3region (required):
- findings.s3bucket (required):
- findings.path (required):
- findings.pool.capacity (optional): The capacity of the findings pool. Default is 250.
- findings.pool.maxage (optional): The maximum age of the security findings in the pool, in minutes. Default is 1440.
For more information about inserting custom findings into IBM SCC, refer to Custom Findings section of IBM Cloud Security Advisor.
Override plugin configuration attributes with environment variables
It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. The environment variables must adhere to the following structure:
- Environment variables must follow the naming schema <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>.
- The plugin name inside the pipeline configuration file must be all lower case.
For example, to set the alert mode inside the policy engine, the following environment variable is set:
export POLICYENGINE_MODE=alert
To set the syslog values for the exporter:
export EXPORTER_TYPE=telemetry
export EXPORTER_SOURCE=${HOSTNAME}
export EXPORTER_EXPORT=syslog
export EXPORTER_HOST=192.168.2.10
export EXPORTER_PORT=514
If running as a docker container, environment variables can be passed with the docker run command:
docker run \
  -e EXPORTER_TYPE=telemetry \
  -e EXPORTER_SOURCE=${HOSTNAME} \
  -e EXPORTER_EXPORT=syslog \
  -e EXPORTER_HOST=192.168.2.10 \
  -e EXPORTER_PORT=514 \
…
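The sketch below shows one way such an override could be resolved at plugin initialization time; it is a simplified illustration of the naming schema above, not the processor's actual implementation.

package main

import (
	"fmt"
	"os"
	"strings"
)

// resolveAttr returns the value of a plugin configuration attribute,
// preferring an environment variable named <PLUGIN NAME>_<ATTRIBUTE NAME>
// (upper-cased) over the value from the pipeline configuration file.
func resolveAttr(pluginName, attr string, fileConf map[string]string) string {
	envKey := strings.ToUpper(pluginName + "_" + attr)
	if v, ok := os.LookupEnv(envKey); ok {
		return v
	}
	return fileConf[attr]
}

func main() {
	os.Setenv("POLICYENGINE_MODE", "alert")
	conf := map[string]string{"mode": "filter"}            // value from pipeline.json
	fmt.Println(resolveAttr("policyengine", "mode", conf)) // prints "alert"
}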
Writing runtime policies
The policy engine adopts and extends the Falco rules definition syntax. Before reading the rest of this section, please go through the Falco Rules documentation to get familiar with rule, macro, and list syntax, all of which are supported in our policy engine. Policies are written in one or more yaml files, and stored in a directory specified in the pipeline configuration file under the policies attribute of the policy engine plugin.
Rules contain the following fields:
- rule: the name of the rule
- description: a textual description of the rule
- condition: a set of logical operations that can reference lists and macros, which when evaluating to true, can trigger an alert (when processor is in alert mode), or filter a sysflow record (when processor is in filter mode)
- action: a list of actions to take place when the rule evaluates to true. Actions can be any of the following (note: new actions will be added in the future):
  - alert: the processor outputs an alert
  - tag: enriches or tags the sysflow record with the labels in the tags field. This can be useful for semantic labeling of records with TTPs, for example.
- priority: label representing the severity of the alert; can be: (1) low, medium, or high, or (2) emergency, alert, critical, error, warning, notice, informational, debug.
- tags (optional): set of labels appended to the alert (default: empty).
- prefilter (optional): list of record types (sf.type) to whitelist before applying the rule condition (default: empty).
- enabled (optional): indicates whether the rule is enabled (default: true).
Macros are named conditions and contain the following fields:
- macro: the name of the macro
- condition: a set of logical operations that can reference lists and macros, which evaluate to true or false
Lists are named collections and contain the following fields:
- list: the name of the list
- items: a collection of values or lists
Filters blacklist records matching a condition:
- filter: the name of the filter
- condition: a set of logical operations that can reference lists and macros, which evaluate to true or false
For example, the rule below specifies that matching records are process events (sf.type = PE) denoting EXEC operations (sf.opflags = EXEC) for which the process matches the macro package_installers. Additionally, the global filter containers preemptively removes from the processing stream any records for processes not running in a container environment.
# lists
- list: rpm_binaries
  items: [dnf, rpm, rpmkey, yum, '"75-system-updat"', rhsmcertd-worke, subscription-ma,
          repoquery, rpmkeys, rpmq, yum-cron, yum-config-mana, yum-debug-dump,
          abrt-action-sav, rpmdb_stat, microdnf, rhn_check, yumdb]

- list: deb_binaries
  items: [dpkg, dpkg-preconfigu, dpkg-reconfigur, dpkg-divert, apt, apt-get, aptitude,
          frontend, preinst, add-apt-reposit, apt-auto-remova, apt-key,
          apt-listchanges, unattended-upgr, apt-add-reposit]

- list: package_mgmt_binaries
  items: [rpm_binaries, deb_binaries, update-alternat, gem, pip, pip3, sane-utils.post, alternatives, chef-client]

# macros
- macro: package_installers
  condition: sf.proc.name pmatch (package_mgmt_binaries)

# global filters (blacklisting)
- filter: containers
  condition: sf.container.type = host

# rule definitions
- rule: Package installer detected
  desc: Use of package installer detected
  condition: sf.opflags = EXEC and package_installers
  action: [alert]
  priority: medium
  tags: [actionable-offense, suspicious-process]
  prefilter: [PE] # record types for which this rule should be applied (whitelisting)
  enabled: true
The following table shows a detailed list of attribute names supported by the policy engine, as well as their type, and comparative Falco attribute name. Our policy engine supports both SysFlow and Falco attribute naming convention to enable reuse of policies across the two frameworks.
Attributes | Description | Values | Falco Attribute |
---|---|---|---|
sf.type | Record type | PE,PF,NF,FF,FE | N/A |
sf.opflags | Operation flags | Operation Flags List: remove OP_ prefix | evt.type (remapped as falco event types) |
sf.ret | Return code | int | evt.res |
sf.ts | start timestamp(ns) | int64 | evt.time |
sf.endts | end timestamp(ns) | int64 | N/A |
sf.proc.pid | Process PID | int64 | proc.pid |
sf.proc.tid | Thread PID | int64 | thread.tid |
sf.proc.uid | Process user ID | int | user.uid |
sf.proc.user | Process user name | string | user.name |
sf.proc.gid | Process group ID | int | group.gid |
sf.proc.group | Process group name | string | group.name |
sf.proc.apid | Proc ancestors PIDs (qo) | int64 | proc.apid |
sf.proc.aname | Proc anctrs names (qo) (exclude path) | string | proc.aname |
sf.proc.exe | Process command/filename (with path) | string | proc.exe |
sf.proc.args | Process command arguments | string | proc.args |
sf.proc.name | Process name (qo) (exclude path) | string | proc.name |
sf.proc.cmdline | Process command line (qo) | string | proc.cmdline |
sf.proc.tty | Process TTY status | boolean | proc.tty |
sf.proc.entry | Process container entrypoint | bool | proc.vpid == 1 |
sf.proc.createts | Process creation timestamp (ns) | int64 | N/A |
sf.pproc.pid | Parent process ID | int64 | proc.ppid |
sf.pproc.gid | Parent process group ID | int64 | N/A |
sf.pproc.uid | Parent process user ID | int64 | N/A |
sf.pproc.group | Parent process group name | string | N/A |
sf.pproc.tty | Parent process TTY status | bool | N/A |
sf.pproc.entry | Parent process container entry | bool | N/A |
sf.pproc.user | Parent process user name | string | N/A |
sf.pproc.exe | Parent process command/filename | string | N/A |
sf.pproc.args | Parent process command arguments | string | N/A |
sf.pproc.name | Parent process name (qo) (no path) | string | proc.pname |
sf.pproc.cmdline | Parent process command line (qo) | string | proc.pcmdline |
sf.pproc.createts | Parent process creation timestamp | int64 | N/A |
sf.file.fd | File descriptor number | int | fd.num |
sf.file.path | File path | string | fd.name |
sf.file.newpath | New file path (used in some FileEvents) | string | N/A |
sf.file.name | File name (qo) | string | fd.filename |
sf.file.directory | File directory (qo) | string | fd.directory |
sf.file.type | File type | char ‘f’: file, 4: IPv4, 6: IPv6, ‘u’: unix socket, ‘p’: pipe, ‘e’: eventfd, ‘s’: signalfd, ‘l’: eventpoll, ‘i’: inotify, ‘o’: unknown. | fd.typechar |
sf.file.is_open_write | File open with write flag (qo) | bool | evt.is_open_write |
sf.file.is_open_read | File open with read flag (qo) | bool | evt.is_open_read |
sf.file.openflags | File open flags | int | evt.args |
sf.net.proto | Network protocol | int | fd.l4proto |
sf.net.sport | Source port | int | fd.sport |
sf.net.dport | Destination port | int | fd.dport |
sf.net.port | Src or Dst port (qo) | int | fd.port |
sf.net.sip | Source IP | int | fd.sip |
sf.net.dip | Destination IP | int | fd.dip |
sf.net.ip | Src or dst IP (qo) | int | fd.ip |
sf.res | File or network resource | string | fd.name |
sf.flow.rbytes | Flow bytes read/received | int64 | evt.res |
sf.flow.rops | Flow operations read/received | int64 | N/A |
sf.flow.wbytes | Flow bytes written/sent | int64 | evt.res |
sf.flow.wops | Flow operations written/sent | int64 | N/A |
sf.container.id | Container ID | string | container.id |
sf.container.name | Container name | string | container.name |
sf.container.image.id | Container image ID | string | container.image.id |
sf.container.image | Container image name | string | container.image |
sf.container.type | Container type | CT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM, CT_CRI, CT_CONTAINERD, CT_CRIO, CT_BPM | container.type |
sf.container.privileged | Container privilege status | bool | container.privileged |
sf.node.id | Node identifier | string | N/A |
sf.node.ip | Node IP address | string | N/A |
sf.schema.version | SysFlow schema version | string | N/A |
sf.version | SysFlow JSON schema version | int | N/A |
The policy language supports the following operations:
Operation | Description | Example |
---|---|---|
A and B | Returns true if both statements are true | sf.pproc.name=bash and sf.pproc.cmdline contains echo |
A or B | Returns true if one of the statements is true | sf.file.path = "/etc/passwd" or sf.file.path = "/etc/shadow" |
not A | Returns true if the statement isn’t true | not sf.pproc.exe = /usr/local/sbin/runc |
A = B | Returns true if A exactly matches B. Note, if B is a list, A only has to exactly match one element of the list. If B is a list, it must be explicit. It cannot be a variable. If B is a variable, use in instead. | sf.file.path = ["/etc/passwd", "/etc/shadow"] |
A != B | Returns true if A is not equal to B. Note, if B is a list, A only has to be not equal to one element of the list. If B is a list, it must be explicit. It cannot be a variable. | sf.file.path != "/etc/passwd" |
A < B | Returns true if A is less than B. Note, if B is a list, A only has to be less than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops < 1000 |
A <= B | Returns true if A is less than or equal to B. Note, if B is a list, A only has to be less than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops <= 1000 |
A > B | Returns true if A is greater than B. Note, if B is a list, A only has to be greater than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops > 1000 |
A >= B | Returns true if A is greater than or equal to B. Note, if B is a list, A only has to be greater than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops >= 1000 |
A in B | Returns true if value A is an exact match to one of the elements in list B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.exe in (bin_binaries, usr_bin_binaries) |
A startswith B | Returns true if string A starts with string B | sf.file.path startswith '/home' |
A endswith B | Returns true if string A ends with string B | sf.file.path endswith '.json' |
A contains B | Returns true if string A contains string B | sf.pproc.name=java and sf.pproc.cmdline contains org.apache.hadoop |
A icontains B | Returns true if string A contains string B ignoring capitalization | sf.pproc.name=java and sf.pproc.cmdline icontains org.apache.hadooP |
A pmatch B | Returns true if string A partial matches one of the elements in B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.name pmatch (modify_passwd_binaries, verify_passwd_binaries, user_util_binaries) |
exists A | Checks if A is not a zero value (i.e. 0 for int, "" for string) | exists sf.file.path |
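As an illustration of how these operations compose, the following condition (hypothetical, built only from attributes and operators listed above, and not a shipped rule) matches shell-spawned processes that write to credential files:

sf.pproc.name = bash and sf.file.path = ["/etc/passwd", "/etc/shadow"] and sf.flow.wbytes > 0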
See the resources/policies directory on GitHub for examples. Feel free to contribute new and interesting rules through a GitHub pull request.
Write a simple processing plugin
In addition to its core plugins, the processor also supports custom plugins that can be dynamically loaded into the processor via a compiled golang shared library using the golang plugin package. Custom plugins enable easy extension of the processor and the creation of custom pipelines tailored to specific use cases.
A dynamic plugin example is provided on GitHub. The core of the plugin is an object that implements the SFProcessor interface. Such an implementation looks as follows:
package main

import (
	"sync"

	"github.com/sysflow-telemetry/sf-apis/go/logger"
	"github.com/sysflow-telemetry/sf-apis/go/plugins"
	"github.com/sysflow-telemetry/sf-processor/core/flattener"
)

const (
	pluginName string = "example"
)

// Plugin exports a symbol for this plugin.
var Plugin Example

// Example defines an example plugin.
type Example struct{}

// NewExample creates a new plugin instance.
func NewExample() plugins.SFProcessor {
	return new(Example)
}

// GetName returns the plugin name.
func (s *Example) GetName() string {
	return pluginName
}

// Init initializes the plugin with a configuration map.
func (s *Example) Init(conf map[string]string) error {
	return nil
}

// Register registers plugin to plugin cache.
func (s *Example) Register(pc plugins.SFPluginCache) {
	pc.AddProcessor(pluginName, NewExample)
}

// Process implements the main interface of the plugin.
func (s *Example) Process(ch interface{}, wg *sync.WaitGroup) {
	cha := ch.(flattener.FlatChannel)
	record := cha.In
	logger.Trace.Println("Example channel capacity:", cap(record))
	defer wg.Done()
	logger.Trace.Println("Starting Example")
	for {
		fc, ok := <-record
		if !ok {
			logger.Trace.Println("Channel closed. Shutting down.")
			break
		}
		logger.Info.Println(fc)
	}
	logger.Trace.Println("Exiting Example")
}

// SetOutChan sets the output channel of the plugin.
func (s *Example) SetOutChan(ch interface{}) {}

// Cleanup tears down plugin resources.
func (s *Example) Cleanup() {}

// This function is not run when module is used as a plugin.
func main() {}
The object must implement the following interface:

- GetName() – returns a lowercase string representing the plugin's label. This label is important, because it identifies the plugin in the pipeline.json file, enabling the processor to load the plugin. In the object above, this plugin is called example. Note that the label must be unique.
- Init(config map[string]string) error – used to initialize the plugin. The configuration map that is passed to the function stores all the configuration information defined in the plugin's definition inside pipeline.json (more on this later).
- Register(pc plugins.SFPluginCache) – registers the plugin with the plugin cache of the processor.
  - pc.AddProcessor(pluginName, <plugin constructor function>) (required) – registers the plugin named example with the processor. You must define a constructor function using the convention New<PluginName>, which is used to instantiate the plugin and return it as an SFProcessor interface – see NewExample for an example.
  - pc.AddChannel(channelName, <output channel constructor function>) (optional) – if your plugin uses a custom output channel of objects (i.e., the channel used to pass output objects from this plugin to the next plugin in the pipeline), it should be registered here.
    - The channelName should be a lowercase, unique label defining the channel type.
    - The constructor function should return a golang interface{} representing an object that has an In attribute of type chan <ObjectToBePassed>. We will call this object a wrapped channel object going forward. For example, the channel object that passes sysflow objects is called SFChannel, and is defined here.
    - For a complete example of defining an output channel, see NewFlattenerChan in the flattener as well as its Register function. The FlatChannel is defined here.
- Process(ch interface{}, wg *sync.WaitGroup) – this function is launched by the processor as a go thread and is where the main plugin processing occurs. It takes a wrapped channel object, which acts as the input data source to the plugin (i.e., this is the channel that is configured as the input channel to the plugin in pipeline.json). It also takes a sync.WaitGroup object, which is used to signal to the processor when the plugin has completed running (see defer wg.Done() in the code). The plugin must loop on the input channel and do its analysis on each input record. In this case, the example plugin reads flat records and prints them to the screen.
- SetOutChan(ch interface{}) – sets the wrapped channel that will serve as the output channel for the plugin. The output channel is instantiated by the processor, which is also in charge of stitching the plugins together. If the plugin is the last one in the chain, this function can be left empty. See the SetOutputChan function in the flattener to see how an output channel is implemented.
- Cleanup() – used to clean up any resources. This function is called by the processor after the plugin's Process function exits. One of the key items to close in the Cleanup function is the output channel, using the golang close() function. Closing the output channel enables the pipeline to be torn down gracefully and in sequence.
- main() {} – this main method is not used by the plugin or the processor. It is required by golang in order to compile the plugin as a shared object.
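If your plugin produces output for a downstream plugin, it also needs to register its own wrapped channel. The fragment below is a minimal sketch, not code from the repository: ExampleRecord, ExampleChannel, the examplechan label, and the constructor signature are hypothetical, and the authoritative pattern is NewFlattenerChan and the flattener's Register function referenced above. It would replace the Register method shown in the example plugin.

// ExampleRecord is a hypothetical record type emitted by this plugin.
type ExampleRecord struct {
	Data string
}

// ExampleChannel is a hypothetical wrapped channel object; as described
// above, it only needs an In attribute of type chan <ObjectToBePassed>.
type ExampleChannel struct {
	In chan *ExampleRecord
}

// NewExampleChannel builds the wrapped channel and returns it as an
// interface{}; the (size int) signature is an assumption, so check
// NewFlattenerChan for the exact constructor signature the processor expects.
func NewExampleChannel(size int) interface{} {
	return &ExampleChannel{In: make(chan *ExampleRecord, size)}
}

// Register registers both the processor and its output channel type.
func (s *Example) Register(pc plugins.SFPluginCache) {
	pc.AddProcessor(pluginName, NewExample)
	pc.AddChannel("examplechan", NewExampleChannel)
}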
To compile the example plugin, use the provided Makefile:
cd plugins/example
make
This will build the plugin and copy it into resources/plugins/.
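Under the hood, the Makefile is expected to invoke golang's plugin build mode, roughly along the lines of the command below; the exact flags and output path may differ, so treat the Makefile as authoritative:

go build -buildmode=plugin -o ../../resources/plugins/example.so example.go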
To use the new plugin, apply the configuration provided on GitHub, which defines the following pipeline:
{
  "pipeline":[
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "example",
      "in": "flat flattenerchan"
    }
  ]
}
This pipeline contains two plugins:
- The builtin sysflowreader plugin with the flattener handler, which takes raw sysflow objects and flattens them into arrays of integers and strings for easier processing in certain plugins like the policy engine.
- The example plugin, which takes the flattened output from the sysflowreader plugin and prints it to the screen.

The key item to note is that the output channel (i.e., out) of sysflowreader matches the input channel (i.e., in) of the example plugin. This ensures that the plugins are properly stitched together.
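To make the stitching rule concrete, here is a hypothetical extension of the pipeline above that chains a third plugin behind example. The mydownstreamplugin name and the examplerecords examplechan channel are illustrative only (the channel would need to be registered as sketched earlier); the point is that every out label reappears verbatim as the next plugin's in:

{
  "pipeline":[
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "example",
      "in": "flat flattenerchan",
      "out": "examplerecords examplechan"
    },
    {
      "processor": "mydownstreamplugin",
      "in": "examplerecords examplechan"
    }
  ]
}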
To run the example pipeline:
cd driver
./sfprocessor -config ../plugins/example/pipeline.example.json -plugdir ../resources/plugins/ ../resources/traces/mon.1531776712.sf
Deploy collector and processor using Docker
The following docker-compose file illustrates how to deploy the processor and the collector as containers.
version: "3.5"
services:
  sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: false
    volumes:
      - socket-vol:/sock/
    environment:
      DRIVER: socket
      INPUT_PATH: /sock/sysflow.sock
      POLICYENGINE_MODE: alert
      EXPORTER_TYPE: telemetry
      EXPORTER_SOURCE: sysflow
      EXPORTER_EXPORT: syslog
      EXPORTER_HOST:
      EXPORTER_PORT: 514
  sf-collector:
    container_name: sf-collector
    image: sysflowtelemetry/sf-collector:latest
    depends_on:
      - "sf-processor"
    privileged: true
    volumes:
      - /var/run/docker.sock:/host/var/run/docker.sock
      - /dev:/host/dev
      - /proc:/host/proc:ro
      - /boot:/host/boot:ro
      - /lib/modules:/host/lib/modules:ro
      - /usr:/host/usr:ro
      - /mnt/data:/mnt/data
      - socket-vol:/sock/
      - ./resources/traces:/tests/traces
    environment:
      EXPORTER_ID: ${HOSTNAME}
      NODE_IP:
      FILTER: "container.name!=sf-collector and container.name!=sf-processor"
      INTERVAL: 300
      SOCK_FILE: /sock/sysflow.sock
volumes:
  socket-vol:
Setting up the collector environment
The key setting in the collector portion of the file is the FILTER variable. Since the collector is built atop the sysdig core, it uses the sysdig filtering mechanism described here and can support all the sysdig attributes described there, in case you want to filter on specific containers, processes, operations, etc. One of the more powerful filters is the container.type!=host filter, which limits collection only to container monitoring. If you want to monitor the entire host, simply remove the container.type operation from the filter.
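For example, to limit collection to containers only while still excluding the SysFlow services themselves, the FILTER variable in the compose file above could be set as follows (shown as an illustration; adjust the container names to your deployment):

FILTER: "container.type!=host and container.name!=sf-collector and container.name!=sf-processor"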
Setting up the processor environment
As mentioned in a previous section, all custom plugin attributes can be set using the <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> environment variable format. Note that the docker-compose file sets several such attributes, including EXPORTER_TYPE, EXPORTER_HOST, and EXPORTER_PORT. Also note that the collector's EXPORTER_ID is set to the shell environment variable ${HOSTNAME}. HOSTNAME must be explicitly exported before launching docker-compose in order to be picked up:

export HOSTNAME
docker-compose up
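To make the naming convention concrete: POLICYENGINE_MODE in the compose file sets the mode attribute of the plugin named policyengine in pipeline.json. The fragment below is an illustration only; the plugin and attribute names assume the default pipeline shipped with the processor:

{
  "processor": "policyengine",
  "in": "flat flattenerchan",
  "mode": "alert"
}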
The following are the default locations of the pipeline configuration and plugins directory:
- pipeline.json – /usr/local/sysflow/conf/pipeline.json
- plugins dir – /usr/local/sysflow/resources/plugins
We can overwrite these particular files/dirs in the docker container with those on the host by setting up virtual mounts that map the host directories/files into the container, using the volumes section of the sf-processor service in the docker-compose.yaml.
sf-processor:
  container_name: sf-processor
  image: sysflowtelemetry/sf-processor:latest
  privileged: true
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
    - socket-vol:/sock/
    - ./resources/pipelines/pipeline.runtimeintegrity.json:/usr/local/sysflow/conf/pipeline.json
    - ./resources/plugins:/usr/local/sysflow/resources/plugins
If using the policy engine, the policy folder defaults to the following location in the container: /usr/local/sysflow/resources/policies/. This location can be overridden by setting the POLICYENGINE_POLICIES environment variable.
The docker container uses a default filter.yaml policy that outputs sysflow records in JSON. You can use your own policy files from the host by mounting your policy directory into the container as follows:
sf-processor:
  container_name: sf-processor
  image: sysflowtelemetry/sf-processor:latest
  privileged: true
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
    - socket-vol:/sock/
    - ./resources/policies/runtimeintegrity/:/usr/local/sysflow/resources/policies/
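Alternatively, policies can be mounted at a custom path inside the container, with POLICYENGINE_POLICIES pointing at that path. The fragment below is an illustrative sketch; the /etc/sysflow/policies/ location is arbitrary:

sf-processor:
  # other sf-processor settings as in the previous snippets
  volumes:
    - ./resources/policies/runtimeintegrity/:/etc/sysflow/policies/
  environment:
    POLICYENGINE_POLICIES: /etc/sysflow/policies/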
SysFlow Exporter (sf-exporter repo)
The SysFlow exporter exports SysFlow traces to S3-compliant object stores.
Note: For remote syslogging and other export formats and connectors, check the SysFlow processor project.
Build
This section describes how to build and run the exporter both inside a docker container and on a Linux host. Building and running the application inside a docker container is the easiest way to start. For convenience, you can skip the build step and pull pre-built images directly from Docker Hub.
To build the project, first clone the source code, with submodules:
git clone --recursive git@github.com:sysflow-telemetry/sf-exporter.git
To checkout submodules on an already cloned repo:
git submodule update --init --recursive
To build the docker image for the exporter locally, run:
docker build -t sf-exporter .
Docker usage
The easiest way to run the SysFlow exporter is from a Docker container, with host mount for the trace files to export. The following command shows how to run sf-exporter with trace files located in /mnt/data on the host.
docker run -d --rm --name sf-exporter \
  -e S3_ENDPOINT= \
  -e S3_BUCKET= \
  -e S3_ACCESS_KEY= \
  -e S3_SECRET_KEY= \
  -e NODE_IP=$HOSTNAME \
  -e INTERVAL=150 \
  -v /mnt/data:/mnt/data \
  sysflowtelemetry/sf-exporter
The exporter is usually executed as a pod or docker-compose service together with the SysFlow collector. The exporter automatically removes exported files from the local filesystem it monitors. See the SysFlow deployments packages for more information.
Development
To build the exporter locally, run:
cd src && pip3 install -r requirements.txt
cd modules/sysflow/py3 && sudo python3 setup.py install
To run the exporter from the command line:
./exporter.py -h
usage: exporter.py [-h] [--exporttype {s3,local}] [--s3endpoint S3ENDPOINT]
                   [--s3port S3PORT] [--s3accesskey S3ACCESSKEY]
                   [--s3secretkey S3SECRETKEY] [--s3bucket S3BUCKET]
                   [--s3location S3LOCATION] [--s3prefix S3PREFIX]
                   [--secure [SECURE]] [--scaninterval SCANINTERVAL]
                   [--timeout TIMEOUT] [--agemin AGEMIN] [--dir DIR]
                   [--todir TODIR] [--nodename NODENAME] [--nodeip NODEIP]
                   [--podname PODNAME] [--podip PODIP]
                   [--podservice PODSERVICE] [--podns PODNS]
                   [--poduuid PODUUID] [--clusterid CLUSTERID]

sf-exporter: service for watching and uploading monitoring files to object
store.

optional arguments:
  -h, --help            show this help message and exit
  --exporttype {s3,local}
                        export type
  --s3endpoint S3ENDPOINT
                        s3 server address
  --s3port S3PORT       s3 server port
  --s3accesskey S3ACCESSKEY
                        s3 access key
  --s3secretkey S3SECRETKEY
                        s3 secret key
  --s3bucket S3BUCKET   target data bucket
  --s3location S3LOCATION
                        target data bucket location
  --s3prefix S3PREFIX   exporter's static prefix directory for s3 bucket
  --secure [SECURE]     indicates if SSL connection
  --scaninterval SCANINTERVAL
                        interval between scans
  --timeout TIMEOUT     connection timeout
  --agemin AGEMIN       number of minutes of traces to preserve in case of
                        repeated timeouts
  --dir DIR             data directory
  --todir TODIR         data directory
  --nodename NODENAME   exporter's node name
  --nodeip NODEIP       exporter's node IP
  --podname PODNAME     exporter's pod name
  --podip PODIP         exporter's pod IP
  --podservice PODSERVICE
                        exporter's pod service
  --podns PODNS         exporter's pod namespace
  --poduuid PODUUID     exporter's pod UUID
  --clusterid CLUSTERID
                        exporter's cluster ID
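As an illustration, a run that watches /mnt/data and uploads traces to an S3-compatible endpoint might look like the following; every value is a placeholder to adapt to your environment:

./exporter.py --exporttype s3 \
    --s3endpoint <s3 server address> --s3port 9000 \
    --s3accesskey <access key> --s3secretkey <secret key> \
    --s3bucket <bucket name> \
    --dir /mnt/data --scaninterval 30 --nodename ${HOSTNAME}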