SysFlow Telemetry Pipeline is a framework for monitoring cloud and enterprise workloads. The framework builds the plumbing required for system telemetry so that users can focus on writing and sharing analytics on a scalable, common open-source platform.

The backbone of the telemetry pipeline is a new data format which lifts raw system event information into an abstraction that describes process behaviors, and their relationships with containers, files, and network activity. This object-relational format is highly compact, yet it provides broad visibility into legacy endpoints and container clouds.

The platform is designed as a pluggable edge processing architecture which includes a policy engine that accepts declarative policies that support edge filtering, tagging, and alerting on SysFlow streams. It also offers several APIs that allow users to process SysFlow with their favorite toolkits.

The pipeline can be deployed using Docker, Kubernetes, OpenShift, and bare metal/VMs. The SysFlow agent can be configured as an edge analytics pipeline to stream SysFlow records through rsyslog, or as a batch exporter of raw SysFlow traces to S3-compatible object stores.

An integrated Jupyter environment makes it easy to perform log hunting on collected traces. There are also Apache Avro schema files for SysFlow so that users can generate APIs for other programming languages. C++, Python, and Golang APIs are available, allowing users to interact with SysFlow traces programmatically.

Quick Start

We encourage you to check the documentation first, but here are a few tips for a quick start.

Starting the collection probe

The easiest way to run the SysFlow collector is from a Docker container, with host mount for the output trace files. The following command shows how to run sf-collector with trace files exported to /mnt/data on the host.

docker run -d –privileged –name sf-collector \
-v /var/run/docker.sock:/host/var/run/docker.sock \
-v /dev:/host/dev -v /proc:/host/proc:ro \
-v /boot:/host/boot:ro -v /lib/modules:/host/lib/modules:ro \
-v /usr:/host/usr:ro -v /mnt/data:/mnt/data \
-e INTERVAL=60 \
-e EXPORTER_ID=${HOSTNAME} \
-e OUTPUT=/mnt/data/ \
-e FILTER=”container.name!=sf-collector and container.name!=sf-exporter” \
–rm sysflowtelemetry/sf-collector

where INTERVAL denotes the time in seconds before a new trace file is generated, EXPORTER_ID sets the exporter name, OUTPUT is the directory in which trace files are written, and FILTER is the filter expression used to filter collected events. Note: append container.type!=host to FILTER expression to filter host events.

Deployment options

The SysFlow agent can be deployed in S3 (batch) or rsyslog (edge processing) export configurations. In the batch configuration, SysFlow exports the collected telemetry as trace files (batches of SysFlow records) to any S3-compliant object storage service.

In edge processing configuration, SysFlow exports the collected telemetry as events streamed to a rsyslog collector. This deployment enables the creation of customized edge pipelines, and offers a built-in policy engine to filter, enrich, and alert on SysFlow records.

Instructions for Docker ComposeHelm, and OpenShift deployments of complete SysFlow stacks are available here.

Inspecting Collected Traces

A command line utilitiy is provided for inspecting collected traces or convert traces from SysFlow’s compact binary format into human-readable JSON or CSV formats.

docker run –rm -v /mnt/data:/mnt/data sysflowtelemetry/sysprint /mnt/data/

where trace is the the name of the trace file inside /mnt/data. If empty, all files in /mnt/data are processed. By default, the traces are printed to the standard output with a default set of SysFlow attributes. For a complete list of options, run:

docker run --rm -v /mnt/data:/mnt/data sysflowtelemetry/sysprint  -h

Analyzing Collected Traces

A Jupyter environment is also available for inspecting and implementing analytic notebooks on collected SysFlow data. It includes APIs for data manipulation using Pandas dataframes and a native query language (sfql) with macro support. To start it locally with example notebooks, run:

git clone https://github.com/sysflow-telemetry/sf-apis.git && cd sf-apis
docker run –rm -d –name sfnb –user $(id -u):$(id -g) –group-add users -v $(pwd)/pynb:/home/jovyan/work -p 8888:8888 sysflowtelemetry/sfnb

Then, open a web browser and point it to http://localhost:8888 (alternatively, the remote server name or IP where the notebook is hosted). To obtain the notebook authentication token, run docker logs sfnb.

SysFlow Specification

SysFlow is an open specification for system event-level telemetry. The main goal of SysFlow is to create a standard and extensible data format for both security and performance analytics for compute workloads. An open standard will enable researchers and practitioners to more easily work on a common data format, and focus on analytics using open source software.

The primary objective of SysFlow is to lift raw system call data into more semantic process behaviors which promote significant data reductions for longer term forensic storage of data which is crucial for security analyzes. Through an object relational model of entities, events and flows, we enable SysFlow users to configure the desired granularity of data collection and filtering in order to facilitate most types of analysis in big data frameworks.

Overview

SysFlow is an object relational model of entities, events and flows that describe the behaviors of processes on a system, and encode them into an open format. A SysFlow exporter is designed to monitor system events of a workload, convert them to SysFlow objects, and output them in a binary output file. We envision that one exporter will be deployed per host (or Virtual Machine) and will output one binary file over a particular time period. Figure 1 show a detailed view of the objects that the SysFlow exporter will export.

Entities represent the components on a system that we are interested in monitoring. In this version of SysFlow, we support three types of entities: Containers, Processes, and Files. As shown in Figure 1, Containers contain both Processes and Files, and the three are linked through object identifiers (more on this later).

Entity behaviors are modeled as events or flows. Events represent important individual behaviors of an entity that are broken out on their own due to their importance, their rarity, or because maintaining operation order is important. An example of an event would be a process clone or exec, or the deletion or renaming of a file. By contrast, a Flow represents an aggregation of multiple events that naturally fit together to describe a particular behavior. For example, we can model the network interactions of a process and a remote host as a bidirectional flow that is composed of several events, including connect, read, write, and close.

The idea behind SysFlow is to enable the user to configure the granularity of system-level data desired based on resource limitations and data analytics requirements. In this way, behaviors can be broken out into individual events or combined into smaller aggregated volumetric flows. The current version of the specification describes events and flows in three key behavioral areas: Files, Networks, and Processes. Figure 1 shows these events and flows with their attributes and relationships to entities, which are described in greater details in the following sections

Entities

As mentioned above, entities are the components on a system that we are interested in monitoring. These include containers, processes, and files. We also support a special entity object called a Header, which stores information about the SysFlow version, and a unique ID representing the host or virtual machine monitored by the SysFlow exporter. The header is always the first record appearing in a SysFlow File. All other entities contain a timestamp, an object ID and a state. The timestamp is used to indicate the time at which the entity was exported to the SysFlow file.

Object ID

Object IDs allow events and flows to reference entities without having duplicate information stored in each record. Object IDs are not required to be globally unique across space and time. In fact, the only requirement for uniqueness is that no two objects managed by a SysFlow exporter can have the same ID simultaneously. Entities are always written to the binary output file before any events, and flows associated with them are exported. Since entities are exported first, each event, and flow is matched with the entity (with the same id) that is closest to it in the file. Furthermore, every binary output file must be self-contained, meaning that all entities referenced by flows/events must be present in every SysFlow file generated.

State

The state is an enumeration that indicates why an entity was written to disk. The state can currently be one of three values:

StateDescription
CREATEDIndicates that the entity was recently created on the host/VM. For example, a process clone.
MODIFIEDIndicates that some attributes of the entity were modified since the last time it was exported.
REUPIndicates that the entity already existed, but is being exported again, so that output files can be self-contained.

Each entity is defined below with recommendations on what to use for object identifiers, based on what is used in the current implementation of the SysFlow exporter.

Header

The Header entity is an object which appears at the beginning of each binary SysFlow file. It contains the current version of SysFlow as supported in the file, and the exporter ID.

AttributeTypeDescriptionSince (schema version)
versionlongThe current SysFlow version.1
exporterstringGlobally unique id representing the host monitored by SysFlow.1
ipstringIP address in dot notation representing the monitored host.2

Container

The Container entity represents a system or application container such as docker or LXC. It contains important information about the container including its id, name, and whether it is privileged.

AttributeTypeDescriptionSince (schema version)
idstringUnique string representing the Container Object as provided by docker, LXC, etc.1
stateenumstate of the process (CREATED, MODIFIED, REUP).not implemented
timestamp (ts)int64The timestamp when container object is exported (nanoseconds).not implemented
namestringContainer name as provided by docker, LXC, etc.1
imagestringImage name associated with container as provided by docker, LXC, etc.1
imageIDstringImage ID associated with container as provided by docker, LXC, etc.1
typeenumCan be one of: CT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM1
privilegedbooleanIf true, the container is running with root privileges1

Process

The process entity represents a running process on the system. It contains important information about the process including its host pid, creation time, oid id, as well as references to its parent id. When a process entity is exported to a SysFlow file, all its parent processes should be exported before the process, as well as the process’s Container entity. Processes are only exported to a SysFlow file if an event or flow associated with that process or any of its threads are exported. Threads are not explicitly exported in the process object but are represented in events and flows through a thread id field. Finally, a Process entity only needs to be exported to a file once, unless it’s been modified by an event or flow.

NOTE: In current implementation, the creation timestamp is the time at which the process is cloned. If the process was cloned before capture was started, this value is 0. The current implementation also has problems getting absolute paths for exes when relative paths are used to launch processes.

AttributeTypeDescriptionSince (schema version)
stateenumstate of the process (CREATED, MODIFIED, REUP)1
OID:
host pid
create ts
struct
int64
int64
The Process OID contains the host pid of the project, and creation timestamp.1
POID:
parent host pid
parent create ts
struct
int64
int64
The OID of the parent process can be NULL if not available or if a root process.1
timestamp (ts)int64The timestamp when process object is exported (nanoseconds).1
exestringFull path (if available) of the executable used in the process launch; otherwise, it’s the name of the exe.1
exeArgsstringConcatenated list of args passed on process startup.1
uidint32User ID under which the process is running.1
userNamestringUser name under which the process is running.1
gidint32Group ID under which the process is running1
groupNamestringGroup Name under which the process is running1
ttybooleanIf true, the process is tied to a shell1
containerIdstringUnique string representing the Container Object to which the process resides. It can be NULL if process isn’t in a container.1
entrybooleanIf true, the process is a container or system entrypoint (i.e., virtual pid = 1).2

File

The File entity represents file-based resources on a system including files, directories, unix sockets, and pipes.

NOTE: Current implementation does not have access to inode related values, which would greatly improve object ids. Also, the current implementation has some issues with absolute paths when monitoring operations that use relative paths.

AttributeTypeDescriptionSince (schema version)
stateenumstate of the file (CREATED, MODIFIED, REUP)1
FOID:string (128bit)File Identifier, is a SHA1 hash of the concatenation of the path + container ID1
timestamp (ts)int64The timestamp when file object is exported (nanoseconds).1
restypeenumIndicates the resource type. Currently support: SF_FILE, SF_DIR, SF_UNIX (unix socket), SF_PIPE, SF_UNKNOWN1
pathstringFull path of the file/directory, or unique identifier for pipe, unix socket1
containerIdstringUnique string representing the Container Object to which the file resides. Can be NULL if file isn’t in a container.1

Events

Events represent important individual behaviors of an entity that are broken out on their own due to their importance, their rarity, or because maintaining operation order is important. In order to manage events and their differing attributes, we divide them into three different categories: Process, File, and Network events. These are described more in detail later on.

Each event and flow contains a process object id, a timestamp, a thread id, and a set of operation flags. The process object id represents the Process Entity on which the event occurred, while the thread id indicates which process thread was associated with the event.

Operation Flags

The operation flags describe the actual behavior associated with the event (or flow). The flags are represented in a single bitmap which enables multiple behaviors to be combined easily into a flow. An event will have a single bit active, while a flow could have several. The current supported flags are as follows:

OperationNumeric IDDescriptionSystem CallsEvts/Flows SupportedSince (schema version)
OP_CLONE(1 << 0)Process or thread cloned.clone()ProcessEvent1
OP_EXEC(1 << 1)Execution of a fileexecve()ProcessEvent1
OP_EXIT(1 << 2)Process or thread exit.exit()ProcessEvent1
OP_SETUID(1 << 3)UID of process was changedsetuid(), setresuidProcessEvent1
OP_SETNS(1 << 4)Process entering namespacesetns()FileFlow1
OP_ACCEPT(1 << 5)Process accepting network connectionsaccept(), select()NetworkFlow1
OP_CONNECT(1 << 6)Process connecting to remote host or processconnect()NetworkFlow1
OP_OPEN(1 << 7)Process opening a file/resourceopen(), openat(), create()FileFlow1
OP_READ_RECV(1 << 8)Process reading from file, receiving network dataread(),pread(),recv(),recvfrom(),recvmsg()NetworkFlow, FileFlow1
OP_WRITE_SEND(1 << 9)Process writing to file, sending network datawrite(),pwrite(),send(),sendto(),sendmsg()NetworkFlow, FileFlow1
OP_CLOSE(1 << 10)Process close resourceclose(),socketshutdownNetworkFlow, FileFlow1
OP_TRUNCATE(1 << 11)Premature closing of a flow due to exporter shutdownN/ANetworkFlow, FileFlow1
OP_SHUTDOWN(1 << 12)Shutdown all or part of a full duplex socket connectionshutdown()NetworkFlow1
OP_MMAP(1 << 13)Memory map of a file.mmap()FileFlow1
OP_DIGEST(1 << 14)Summary flow information for long running flowsN/ANetworkFlow, FileFlow1
OP_MKDIR(1 << 15)Make directorymkdir(), mkdirat()FileEvent1
OP_RMDIR(1 << 16)Remove directoryrmdir()FileEvent1
OP_LINK(1 << 17)Process creates hard link to existing filelink(), linkat()FileEvent1
OP_UNLINK(1 << 18)Process deletes fileunlink(), unlinkat()FileEvent1
OP_SYMLINK(1 << 19)Process creates sym link to existing filesymlink(), symlinkat()FileEvent1
OP_RENAME(1 << 20)File renamedrename(), renameat()FileEvent1

Process Event

A Process Event is an event that creates or modifies a process in some way. Currently, we support four Process Events (referred to as operations), and their behavior in SysFlow is described below.

OperationBehavior
OP_CLONEExported when a new process or thread is cloned. A new Process Entity should be exported prior to exporting the clone operation of a new process.
OP_EXECExported when a process calls an exec syscall. This event will modify an existing process, and should be accompanied by a modified Process Entity.
OP_EXITExported on a process or thread exit.
OP_SETUIDExported when a process’s UID is changed. This event will modify an existing process, and should be accompanied by a modified Process Entity.

The list of attributes for the Process Event are as follows:

AttributeTypeDescriptionSince (schema version)
OID:
host pid
create ts
struct
int64
int64
The OID of the process for which the event occurred.1
timestamp (ts)int64The timestamp when the event occurred (nanoseconds).1
tidint64The id of the thread associated with the ProcessEvent. If the running process is single threaded tid == pid1
opFlagsint64The id of the syscall associated with the event. See list of Operation Flags for details.1
argsstring[]An array of arguments encoded as string for the syscall.Sparingly implemented. Only really used with setuid for now.
retint64Syscall return value.1

File Event

A File Event is an event that creates, deletes or modifies a File Entity. Currently, we support six File Events (referred to as operations), and their behavior in SysFlow is described below.

OperationBehavior
OP_MKDIRExported when a new directory is created. Should be accompanied by a new File Entity representing the directory
OP_RMDIRExported when a directory is deleted.
OP_LINKExported when a process creates a hard link to an existing file. Should be accompanied by a new File Entity representing the new link.
OP_UNLINKExported when a process deletes a file.
OP_SYMLINKExported when a process creates a sym link to an existing file. Should be accompanied by a new File Entity representing the new link.
OP_RENAMEExported when a process creates renames an existing file. Should be accompanied by a new File Entity representing the renamed file.

NOTE: We’d like to also support chmod and chown but these two operations are not fully supported in sysdig. We’d also like to support umount and mount but these operations are not implemented. We anticipate supporting these in a future version.

The list of attributes for the File Event are as follows:

AttributeTypeDescriptionSince (schema version)
OID:
host pid
create ts
struct
int64
int64
The OID of the process for which the event occurred.1
timestamp (ts)int64The timestamp when the event occurred (nanoseconds).1
tidint64The id of the thread associated with the FileEvent. If the running process is single threaded tid == pid1
opFlagsint64The id of the syscall associated with the event. See list of Operation Flags for details.1
retint64Syscall return value.1
FOID:string (128bit)The id of the file on which the system call was called. File Identifier, is a SHA1 hash of the concatenation of the path + container ID.1
NewFOID:string (128bit)Some syscalls (link, symlink, etc.) convert one file into another requiring two files. This id is the id of the file secondary or new file on which the system call was called. File Identifier, is a SHA1 hash of the concatenation of the path + container ID. Can be NULL.1

Network Event

Currently, not implemented.

Flows

A Flow represents an aggregation of multiple events that naturally fit together to describe a particular behavior. They are designed to reduce data and collect statistics. Examples of flows include an application reading or writing to a file, or sending and receiving data from another process or host. Flows represent a number of events occurring over a period of time, and as such each flow has a set of operations (encoded in a bitmap), a start and an end time. One can determine the operations in the flow by decoding the operation flags.

A flow can be started by any supported operation and are exported in one of two ways. First, they are exported on an exit, or close event signifying the end of a connection, file interaction, or process. Second, a long running flow is exported after a preconfigured time period. After a long running flow is exported, its counters and flags are reset. However, if there is no activity on the flow over a preconfigured period of time, that flow is no longer exported.

In this section, we describe three categories of Flows: Process, File and Network Flows.

Process Flow

A Process Flow represents a summarization of the number of threads created and destroyed over a time period. Process Flows are partially implemented in the collector and will be fully implemented in a later release. Since schema version 2. Currently we support the following operations:

OperationBehavior
OP_CLONERecorded when a new thread is cloned.
OP_EXITRecorded on a thread exit.

The list of attributes for the Process Flow are as follows:

AttributeTypeDescriptionSince (schema version)
OID:
host pid
create ts
struct
int64
int64
The OID of the process for which the flow occurred.2
timestamp (ts)int64The timestamp when the flow starts (nanoseconds).2
numThreadsClonedint64The number of threads cloned during the duration of the flow.2
opFlagsint64 (bitmap)The id of one or more syscalls associated with the ProcessFlow. See list of Operation Flags for details.2
endTsint64The timestamp when the process flow is exported (nanoseconds).2
numThreadsExitedint64Number of threads exited during the duration of the flow.2
numCloneErrorsint64Number of clone errors occuring during the duration of the flow.2

File Flow

A File Flow represents a collection of operations on a file. Currently we support the following operations:

OperationBehavior
OP_SETNSProcess entering namespace entry in mounted file related to reference File Entity
OP_OPENProcess opening a file/resource.
OP_READ_RECVProcess reading from file/resource.
OP_WRITE_SENDProcess writing to file.
OP_MMAPProcessing memory mapping a file.
OP_CLOSEProcess closing resource. This action will close corresponding FileFlow.
OP_TRUNCATEIndicates Premature closing of a flow due to exporter shutdown.
OP_DIGESTSummary flow information for long running flows (not implemented).

The list of attributes for the File Flow are as follows:

AttributeTypeDescriptionSince (schema version)
OID:
host pid
create ts
struct
int64
int64
The OID of the process for which the flow occurred.1
timestamp (ts)int64The timestamp when the flow starts (nanoseconds).1
tidint64The id of the thread associated with the flow. If the running process is single threaded tid == pid1
opFlagsint64 (bitmap)The id of one or more syscalls associated with the FileFlow. See list of Operation Flags for details.1
openFlagsint64Flags associated with an open syscall if present.1
endTsint64The timestamp when the file flow is exported (nanoseconds).1
FOID:string (128bit)The id of the file on which the system call was called. File Identifier, is a SHA1 hash of the concatenation of the path + container ID.1
fdint32The file descriptor associated with the flow.1
numRRecvOpsint64Number of read operations performed during the duration of the flow.1
numWSendOpsint64Number of write operations performed during the duration of the flow.1
numRRecvBytesint64Number of bytes read during the duration of the flow.1
numWSendBytesint64Number of bytes written during the duration of the flow.1

Network Flow

A Network Flow represents a collection of operations on a network connection. Currently we support the following operations:

OperationBehavior
OP_ACCEPTProcess accepted a new network connection.
OP_CONNECTProcess connected to a remote host or process.
OP_READ_RECVProcess receiving data from a remote host or process.
OP_WRITE_SENDProcess sending data to a remote host or process.
OP_SHUTDOWNProcess shutdown full or single duplex connections.
OP_CLOSEProcess closing network connection. This action will close corresponding NetworkFlow.
OP_TRUNCATEIndicates Premature closing of a flow due to exporter shutdown.
OP_DIGESTSummary flow information for long running flows (not implemented).

The list of attributes for the Network Flow are as follows:

AttributeTypeDescriptionSince (schema version)
OID:
host pid
create ts
struct
int64
int64
The OID of the process for which the flow occurred.1
timestamp (ts)int64The timestamp when the flow starts (nanoseconds).1
tidint64The id of the thread associated with the flow. If the running process is single threaded tid == pid1
opFlagsint64 (bitmap)The id of one or more syscalls associated with the flow. See list of Operation Flags for details.1
endTsint64The timestamp when the flow is exported (nanoseconds).1
sipint32The source IP address.1
sportint16The source port.1
dipint32The destination IP address.1
dportint16The destination port.1
protoenumThe network protocol of the flow. Can be: TCP, UDP, ICMP, RAW1
numRRecvOpsint64Number of receive operations performed during the duration of the flow.1
numWSendOpsint64Number of send operations performed during the duration of the flow.1
numRRecvBytesint64Number of bytes received during the duration of the flow.1
numWSendBytesint64Number of bytes sent during the duration of the flow.

SysFlow Collector (sf-collector repo)

The SysFlow Collector monitors and collects system call and event information from hosts and exports them in the SysFlow format using Apache Avro object serialization. SysFlow lifts system call information into a higher order object relational form that models how containers, processes and files interact with their environment through process control flow, file, and network operations. Learn more about SysFlow in the SysFlow Specification Document.

The SysFlow Collector is currently built upon a Sysdig core and requires the Sysdig probe to passively collect system events and turn them into SysFlow. As a result, the collector supports Sysdig’s powerful filtering capabilities. Please see the build and installation instructions for installing the collector.

Installation And Usage

Installing the collector

Cloning source

The sf-collector project has been tested primarily on Ubuntu 16.04 and 18.04. The project will be tested on other flavors of UNIX in the future. This document describes how to build and run the application both inside a docker container and on a linux host. Building and running the application inside a docker container is the easiest way to start. For convenience, skip the build step and pull pre-built images directly from Docker Hub.

To build the project, first pull down the source code, with submodules:

git clone –recursive https://github.com/sysflow-telemetry/sf-collector.git

To checkout submodules on an already cloned repo:

git submodule update --init --recursive

Building as Docker container

To build as docker container:

cd sf-collector
make -C modules init
docker build –target runtime -t sf-collector .

The container is built in stages to enable caching of the intermediate steps of the build and reduce final image sizes.

Building directly on a host

First, install required dependencies:

apt install patch base-files binutils bzip2 libdpkg-perl perl make xz-utils libncurses5-dev libncursesw5-dev cmake libboost-all-dev g++ flex bison wget libelf-dev liblog4cxx-dev libapr1 libaprutil1 libsparsehash-dev libsnappy-dev libgoogle-glog-dev libjsoncpp-dev

To build the collector:

cd sf-collector
make install

Running the collector

Running the collector from the command line

The collector has the following options:

Usage: sysporter [options] -w
Options:
-h Show this help message and exit
-w file name/dir (required) The file or directory to which sysflow records are written. If a directory is specified (using a trailing slash), file name will be an epoch timestamp. If -G is specified, then the file name specified will have an epoch timestamp appended to it
-e exporterID A globally unique ID representing the host or VM being monitored which is stored in the sysflow dumpfile header. If -e not set, the hostname of the CURRENT machine is used, which may not be accurate for reading offline scap files
-G interval (in secs) Rotates the dumpfile specified in -w every interval seconds and appends epoch timestamp to file name
-r scap file The scap file to be read and dumped as sysflow format at the file specified by -w. If this option is not specified, a live capture is assumed
-s schema file The sysflow avro schema file (.avsc) used for schema validation (default: /usr/local/sysflow/conf/SysFlow.avsc)
-f filter Sysdig style filtering string to filter scap. Must be surrounded by quotes
-c Simple, fast filter to allow only container-related events to be dumped
-p cri-o path The path to the cri-o domain socket
-t cri-o timeout The amount of time in ms to wait for cri-o socket to respond
-u domain socket file Outputs SysFlow to a unix domain socket rather than to a file
-v Print version information and exit

Example usage

Convert Sysdig scap file to SysFlow file with an export id. The output will be written to output.sf. Note that the collector must be run with root privilege:

sysporter -r input.scap -w ./output.sf -e host

Trace a system live, and output SysFlow to files in a directory which are rotated every 30 seconds. The file name will be an epoch timestamp of when the file was initially written. Note that the trailing slash must be present. The example filter ensures that only SysFlow from containers is generated.

sysporter -G 30 -w ./output/ -e host -f “container.type!=host and container.type=docker”

Running the collector from a Docker container

The easiest way to run the SysFlow collector is from a Docker container, with host mount for the output trace files. The following command shows how to run sf-collector with trace files exported to /mnt/data on the host.

docker run -d –privileged –name sf-collector \
-v /var/run/docker.sock:/host/var/run/docker.sock \
-v /dev:/host/dev -v /proc:/host/proc:ro \
-v /boot:/host/boot:ro -v /lib/modules:/host/lib/modules:ro \
-v /usr:/host/usr:ro -v /mnt/data:/mnt/data \
-e INTERVAL=60 \
-e EXPORTER_ID=${HOSTNAME} \
-e OUTPUT=/mnt/data/ \
-e FILTER=”container.name!=sf-collector and container.name!=sf-exporter” \
–rm sysflowtelemetry/sf-collector

where INTERVAL denotes the time in seconds before a new trace file is generated, EXPORTER_ID sets the exporter name, OUTPUT is the directory in which trace files are written, and FILTER is the filter expression used to filter collected events. Note: append container.type!=host to FILTER expression to filter host events.

CRI-O support

The sf-collector project currently supports docker and kubernetes deployments (using the helm charts provided in sf-deployments). Container runtimes based on CRI-O is planned for futures releases of the collector.

SysFlow Processor (sf-processor repo)

The SysFlow processor is a lighweight edge analytics pipeline that can process and enrich SysFlow data. The processor is written in golang, and allows users to build and configure various pipelines using a set of built-in and custom plugins and drivers. Pipeline plugins are producer-consumer objects that follow an interface and pass data to one another through pre-defined channels in a multi-threaded environment. By contrast, a driver represents a data source, which pushes data to the plugins. The processor currently supports two builtin drivers, including one that reads sysflow from a file, and another that reads streaming sysflow over a domain socket. Plugins and drivers are configured using a JSON file.

A core built-in plugin is a policy engine that can apply logical rules to filter, alert, or semantically label sysflow records using a declarative language based on the Falco rules syntax with a few added extensions (more on this later).

Custom plugins and drivers can be implemented as dynamic libraries to tailor analytics to specific user requirements.

The endpoint of a pipeline configuration is an exporter plugin that sends the processed data to a target. The processor supports various types of export plugins for a variety of different targets.

Prerequisites

The processor has been tested on Ubuntu/RHEL distributions, but should work on any Linux system.

  • Golang version 1.14+ and make (if buiding from sources)
  • Docker, docker-compose (if building with docker)

Build

Clone the processor repository

git clone https://github.com/sysflow-telemetry/sf-processor.git

Build locally, from sources

cd sf-processor
make build

Build with docker

cd sf-processor
make docker-build

Usage

For usage information, type:

cd driver/
./sfprocessor -help

This should yield the following usage statement:

Usage: sfprocessor [[-version]|[-driver ] [-log ] [-driverdir ] [-plugdir ] path]
Positional arguments:
path string
Input path
Arguments:
-config string
Path to pipeline configuration file (default “pipeline.json”)
-cpuprofile file
Write cpu profile to fil
e
-driver string
Driver name {file|socket|} (default “file”)
-driverdir string
Dynamic driver directory (default “../resources/drivers”)
-log string
Log level {trace|info|warn|error} (default “info”)
-memprofile file
Write memory profile to file
-plugdir string
Dynamic plugins directory (default “../resources/plugins”)
-test
Test pipeline configuration
-traceprofile file
Write trace profile to file
-version
Output version informa

The four most important flags are configdriverdirplugdir, and driver. The config flag points to a pipeline configuration file, which describes the entire pipeline and settings for the individual settings for the plugins. The driverdir and plugdir flags specify where any dynamic drivers and plugins shared libraries reside that should be loaded by the processor at runtime. The driver flag accepts a label to a pre-configured driver (either built-in or custom) that will be used as the data source to the pipeline. Currently, the pipeline only supports one driver at a time, but we anticipate handling multiple drivers in the future. There are two built-in drivers:

  • file: loads a sysflow file reading driver that reads from path.
  • socket: the processor loads a sysflow streaming driver. The driver creates a domain socket named path and acts as a server waiting for a SysFlow collector to attach and send sysflow data.

Pipeline Configuration

The pipeline configuration below shows how to configure a pipeline that will read a sysflow stream and push records to the policy engine, which will trigger alerts using a set of runtime policies stored in a yaml file. An example pipeline with this configuration looks as follows:

{
“pipeline”:[
{
“processor”: “sysflowreader”,
“handler”: “flattener”,
“in”: “sysflow sysflowchan”,
“out”: “flat flattenerchan”
},
{
“processor”: “policyengine”,
“in”: “flat flattenerchan”,
“out”: “evt eventchan”,
“policies”: “../resources/policies/runtimeintegrity”
},
{
“processor”: “exporter”,
“in”: “evt eventchan”,
“export”: “syslog”,
“proto”: “tcp”,
“tag”: “sysflow”,
“host”: “localhost”,
“port”: “514”
}
]
}tion

Note: This configuration can be found in: sf-collector/resources/pipelines/pipeline.syslog.json

This pipeline specifies three built-in plugins:

  • sysflowreader: is a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the handler interface) for processing. In this case, we are using the flattener handler, but custom handlers are possible.
  • policyengine: is the policy engine, which takes flattened (row-oriented) SysFlow records as input and outputs records, which represent alerts, or filtered sysflow records depending on the policy engine’s mode (more on this later).
  • exporter: takes records from the policy engine, and exports them to ElasticSearch, syslog, file, or terminal, in a JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.

Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are custom to the specific plugins. For more details on the specific attributes in this example, see the pipeline configuration template

The general attributes are as follows:

  • processor (required): the name of the processor plugin to load. Processors must implement the SFProcessor interface; the name is the value that must be returned from the GetName() function as defined in the processor object.
  • handler (optional): the name of the handler object to be used for the processor. Handlers must implement the SFHandler interface.
  • in (required): the input channel (i.e. golang channel) of objects that are passed to the plugin.
  • out (optional): the output channel (i.e. golang channel) for objects that are pushed out of the plugin, and into the next plugin in the pipeline sequence.

Channels are modeled as channel objects that have an In attribute representing some golang channel of objects. See SFChannel for an example. The syntax for a channel in the pipeline is [channel name] [channel type]. Where channel type is the label given to the channel type at plugin registration (more on this later), and channel name is a unique identifier for the current channel instance. The name and type of an output channel in one plugin must match that of the name and type of the input channel of the next plugin in the pipeline sequence.

A plugin has exacly one input channel but it may specify more than one output channels. This allows pipeline definitions that fan out data to more than one receiver plugin similar to a Unix tee command. While there must be always one SysFlow reader acting as the entry point of a pipeline, a pipeline configuration may specify policy engines passing data to different exporters or a SysFlow reader passing data to different policy engines. Generally, pipelines form a tree rather being a linear structure.

Policy engine configuration

The policy engine ("processor": "policyengine") plugin is driven by a set of rules. These rules are specified in a YAML which adopts the same syntax as the rules of the [Falco](https://falco.org/docs/rules] project. A policy engine plugin specification requires the following attributes:

  • policies (required): The path to the YAML rules specification file. More information on rules can be found in the Rules section.
  • mode (optional): The mode of the polcy engine. Allowed values are alert for generating rule-based alerts, filter for rule-based filtering of SysFlow events, and bypasss for unchnanged pass-on of raw syflow events. Default value ist alert. If mode is bypass the policyengine attribute can be omitted.

Exporter configuration

An exporter ("processor": "exporter") plugin consists of two modules, an encoder for converting the data to a suitable format, and a transport module for sending the data to the target. Encoders target specific, i.e. for a particular export target a particular set of encoders may be used. In the exporter configuration the transport module is specified via the export paramater (required). The encoder is selected via the format parameter (optional). The default format is json.

The following table lists the cuurently supported exporter modules and the corresponding encoders. Additional encoders and transport modules can be implemented if need arises. If you plan to contribute or want to get involved in the discussion please join the SysFlow community.

Some of these combinations require additional configuration as described in the following sections. null is used for debugging the processor and doesn’t export any data.

Export to file

If export is set to file, an additional parameter file.path allows the specification of the target file.

Export to syslog

If the export parameter is set to syslog, output to syslog is enabled and the following addtional parameters are used:

  • syslog.proto (optional): The protocol used for communicating with the syslog server. Allows values are tcptls and udp. Default is tcp.
  • syslog.tag (optional): The tag used for each Sysflow record in syslog. Default is SysFlow.
  • syslog.source (optional): If set adds a hostname to the syslog header.
  • syslog.host (optional): The hostname of the sysflow server. Default is localhost.
  • syslog.port (optional): The port pf the syslow server. Default is 514.

Export to ElasticSearch

Export to ElasticSearch is enabled by setting the config parameter export to es. The only supported format for export to ElasticSearch is ecs.

Data export is done via bulk ingestion. The ingestion can be controlled by some additional parameters which are read when the es export target is selected. Required parameters specify the ES target, index and credentials. Optional parameters control some aspects of the behavior of the bulk ingestion and may have an effect on performance. You may need to adapt their valuesfor optimal performance in your environment.

  • es.addresses (required): A comma-separated list of ES endpoints.
  • es.index (required): The name of the ES index to ingest into.
  • es.username (required): The ES username.
  • es.password (required): The password for the specified ES user.
  • buffer (optional) The bulk size as the number of records to be ingested at once. Default is 0 but value of 0 indicates record-by-record ingestion which may be highly inefficient.
  • es.bulk.numWorkers (optional): The number of ingestion workers used in parallel. Default is 0 which means that the exporter uses as many workers as there are cores in the machine.
  • es.bulk.flashBuffer (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in buffer), otherwise the bulk is broken into smaller chunks. Default is 5e+6.
  • es.bulk.flushTimeout (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is 30s.

The Elastic exporter does not make any assumption on the existence or configuration of the index specified in es.index. If the index does not exist, Elastic will automatically create it and apply a default dynamic mapping. It may be beneficial to use an explicit mapping for the ECS data generated by the Elastic exporter. For convinience we provide an explicit mapping for creating a new tailored index in Elastic. For more information refer to the Elastic Mapping reference.

Export fo IBM Findings API (IBM Cloud Security & Compliance Center)

Export to IBM Findings API allows adding custom findings to the IBM Cloud Security & Compliance Center (SCC). The mode is enabled via setting the configuration parameter export to findings. The format parameter must be set to occurence in this case. For export to IBM Findings, the following parameters are used:

  • findings.apikey (required): The API key used for the Advisor service instance.
  • findings.url (required): The URL of the Advisor service instance.
  • findings.accountid (required): The acccount ID used for the Advisor service instance.
  • findings.provider (required): Unique ID of the note provider
  • findings.region (required): The cloud region of Advisor service instance.
  • findings.sqlqueryurl (required):
  • findings.sqlquerycrn (required):
  • findings.s3region (required):
  • findings.s3bucket (required):
  • findings.path (required):
  • findings.pool.capacity (optional): The capacity of the findings pool, Default is 250.
  • findings.pool.maxage (woptional): The maximum age of the security findings in the pool in minutes. Default is 1440.

For more information about inserting custom findings into IBM SCC, refer to Custom Findings section of IBM Cloud Security Advisor.

Override plugin configuration attributes with environment variables

It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes, and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. The environment variables must follow the following structure:

  • Environment variables must follow the naming schema <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>
  • The plugin name inside the pipeline configuration file must be all lower case.

For example, to set the alert mode inside the policy engine, the following environment variable is set:

export POLICYENGINE_MODE=alert

To set the syslog values for the exporter:

export EXPORTER_TYPE=telemetry
export EXPORTER_SOURCE=${HOSTNAME}
export EXPORTER_EXPORT=syslog
export EXPORTER_HOST=192.168.2.10
export EXPORTER_PORT=514

If running as a docker container, environment variables can be passed with the docker run command:

docker run
-e EXPORTER_TYPE=telemetry \
-e EXPORTER_SOURCE=${HOSTNAME} \
-e EXPORTER_EXPORT=syslog \
-e EXPORTER_HOST=192.168.2.10 \
-e EXPORTER_PORT=514

Writing runtime policies

The policy engine adopts and extends the Falco rules definition syntax. Before reading the rest of this section, please go through the Falco Rules documentation to get familiar with rulemacro, and list syntax, all of which are supported in our policy engine. Policies are written in one or more yaml files, and stored in a directory specified in the pipeline configuration file under the policies attribute of the policy engine plugin.

Rules contain the following fields:

  • rule: the name of the rule
  • description: a textual description of the rule
  • condition: a set of logical operations that can reference lists and macros, which when evaluating to true, can trigger an alert (when processor is in alert mode), or filter a sysflow record (when processor is in filter mode)
  • action: a list of actions to take place when the rule evaluates to true. Actions can be any of the following (note: new actions will be added in the future):
    • alert: processor outputs an alert
    • tag: enriches or tags the sysflow record with the labels in the tags field. This can be useful for semantically labeling of records with TTPs for example.
  • priority: label representing the severity of the alert can be: (1) low, medium, or high, or (2) emergency, alert, critical, error, warning, notice, informational, debug.
  • tags (optional): set of labels appended to alert (default: empty).
  • prefilter (optional): list of record types (sf.type) to whitelist before applying rule condition (default: empty).
  • enabled (optional): indicates whether the rule is enabled (default: true).

Macros are named conditions and contain the following fields:

  • macro: the name of the macro
  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

Lists are named collections and contain the following fields:

  • list: the name of the list
  • items: a collection of values or lists

Filters blacklist records matching a condition:

  • filter: the name of the filter
  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

For example, the rule below specifies that matching records are process events (sf.type = PE), denoting EXEC operations (sf.opflags = EXEC) for which the process matches macro package_installers. Additionally, the gloabl filter containers preempitively removes from the processing stream any records for processes not running in a container environment.

# lists
– list: rpm_binaries
items: [dnf, rpm, rpmkey, yum, ‘”75-system-updat”‘, rhsmcertd-worke, subscription-ma,
repoquery, rpmkeys, rpmq, yum-cron, yum-config-mana, yum-debug-dump,
abrt-action-sav, rpmdb_stat, microdnf, rhn_check, yumdb]
– list: deb_binaries
items: [dpkg, dpkg-preconfigu, dpkg-reconfigur, dpkg-divert, apt, apt-get, aptitude,
frontend, preinst, add-apt-reposit, apt-auto-remova, apt-key,
apt-listchanges, unattended-upgr, apt-add-reposit]
– list: package_mgmt_binaries
items: [rpm_binaries, deb_binaries, update-alternat, gem, pip, pip3, sane-utils.post, alternatives, chef-client]
#macros
– macro: package_installers
condition: sf.proc.name pmatch (package_mgmt_binaries)
#global filters (blacklisting)
– filter: containers
condition: sf.container.type = host
#rule definitions
– rule: Package installer detected
desc: Use of package installer detected
condition: sf.opflags = EXEC and package_installers
action: [alert]
priority: medium
tags: [actionable-offense, suspicious-process]
prefilter: [PE] # record types for which this rule should be applied (whitelisting)
enabled: true

The following table shows a detailed list of attribute names supported by the policy engine, as well as their type, and comparative Falco attribute name. Our policy engine supports both SysFlow and Falco attribute naming convention to enable reuse of policies across the two frameworks.

AttributesDescriptionValuesFalco Attribute
sf.typeRecord typePE,PF,NF,FF,FEN/A
sf.opflagsOperation flagsOperation Flags List: remove OP_ prefixevt.type (remapped as falco event types)
sf.retReturn codeintevt.res
sf.tsstart timestamp(ns)int64evt.time
sf.endtsend timestamp(ns)int64N/A
sf.proc.pidProcess PIDint64proc.pid
sf.proc.tidThread PIDint64thread.tid
sf.proc.uidProcess user IDintuser.uid
sf.proc.userProcess user namestringuser.name
sf.proc.gidProcess group IDintgroup.gid
sf.proc.groupProcess group namestringgroup.name
sf.proc.apidProc ancestors PIDs (qo)int64proc.apid
sf.proc.anameProc anctrs names (qo) (exclude path)stringproc.aname
sf.proc.exeProcess command/filename (with path)stringproc.exe
sf.proc.argsProcess command argumentsstringproc.args
sf.proc.nameProcess name (qo) (exclude path)stringproc.name
sf.proc.cmdlineProcess command line (qo)stringproc.cmdline
sf.proc.ttyProcess TTY statusbooleanproc.tty
sf.proc.entryProcess container entrypointboolproc.vpid == 1
sf.proc.createtsProcess creation timestamp (ns)int64N/A
sf.pproc.pidParent process IDint64proc.ppid
sf.pproc.gidParent process group IDint64N/A
sf.pproc.uidParent process user IDint64N/A
sf.pproc.groupParent process group namestringN/A
sf.pproc.ttyParent process TTY statusboolN/A
sf.pproc.entryParent process container entryboolN/A
sf.pproc.userParent process user namestringN/A
sf.pproc.exeParent process command/filenamestringN/A
sf.pproc.argsParent process command argumentsstringN/A
sf.pproc.nameParent process name (qo) (no path)stringproc.pname
sf.pproc.cmdlineParent process command line (qo)stringproc.pcmdline
sf.pproc.createtsParent process creation timestampint64N/A
sf.file.fdFile descriptor numberintfd.num
sf.file.pathFile pathstringfd.name
sf.file.newpathNew file path (used in some FileEvents)stringN/A
sf.file.nameFile name (qo)stringfd.filename
sf.file.directoryFile directory (qo)stringfd.directory
sf.file.typeFile typechar ‘f’: file, 4: IPv4, 6: IPv6, ‘u’: unix socket, ‘p’: pipe, ‘e’: eventfd, ‘s’: signalfd, ‘l’: eventpoll, ‘i’: inotify, ‘o’: unknown.fd.typechar
sf.file.is_open_writeFile open with write flag (qo)boolevt.is_open_write
sf.file.is_open_readFile open with read flag (qo)boolevt.is_open_read
sf.file.openflagsFile open flagsintevt.args
sf.net.protoNetwork protocolintfd.l4proto
sf.net.sportSource portintfd.sport
sf.net.dportDestination portintfd.dport
sf.net.portSrc or Dst port (qo)intfd.port
sf.net.sipSource IPintfd.sip
sf.net.dipDestination IPintfd.dip
sf.net.ipSrc or dst IP (qo)intfd.ip
sf.resFile or network resourcestringfd.name
sf.flow.rbytesFlow bytes read/receivedint64evt.res
sf.flow.ropsFlow operations read/receivedint64N/A
sf.flow.wbytesFlow bytes written/sentint64evt.res
sf.flow.wopsFlow bytes written/sentint64N/A
sf.container.idContainer IDstringcontainer.id
sf.container.nameContainer namestringcontainer.name
sf.container.image.idContainer image IDstringcontainer.image.id
sf.container.imageContainer image namestringcontainer.image
sf.container.typeContainer typeCT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM, CT_CRI, CT_CONTAINERD, CT_CRIO, CT_BPMcontainer.type
sf.container.privilegedContainer privilege statusboolcontainer.privileged
sf.node.idNode identifierstringN/A
sf.node.ipNode IP addressstringN/A
sf.schema.versionSysFlow schema versionstringN/A
sf.versionSysFlow JSON schema versionintN/A

The policy language supports the following operations:

OperationDescriptionExample
A and BReturns true if both statements are truesf.pproc.name=bash and sf.pproc.cmdline contains echo
A or BReturns true if one of the statements are truesf.file.path = “/etc/passwd” or sf.file.path = “/etc/shadow”
not AReturns true if the statement isn’t truenot sf.pproc.exe = /usr/local/sbin/runc
A = BReturns true if A exactly matches B. Note, if B is a list, A only has to exact match one element of the list. If B is a list, it must be explicit. It cannot be a variable. If B is a variable use in instead.sf.file.path = [“/etc/passwd”, “/etc/shadow”]
A != BReturns true if A is not equal to B. Note, if B is a list, A only has to be not equal to one element of the list. If B is a list, it must be explicit. It cannot be a variable.sf.file.path != “/etc/passwd”
A < BReturns true if A is less than B. Note, if B is a list, A only has to be less than one element in the list. If B is a list, it must be explicit. It cannot be a variable.sf.flow.wops < 1000
A <= BReturns true if A is less than or equal to B. Note, if B is a list, A only has to be less than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable.sf.flow.wops <= 1000
A > BReturns true if A is greater than B. Note, if B is a list, A only has to be greater than one element in the list. If B is a list, it must be explicit. It cannot be a variable.sf.flow.wops > 1000
A >= BReturns true if A is greater than or equal to B. Note, if B is a list, A only has to be greater than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable.sf.flow.wops >= 1000
A in BReturns true if value A is an exact match to one of the elements in list B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list.sf.proc.exe in (bin_binaries, usr_bin_binaries)
A startswith BReturns true if string A starts with string Bsf.file.path startswith ‘/home’
A endswith BReturns true if string A ends with string Bsf.file.path endswith ‘.json’
A contains BReturns true if string A contains string Bsf.pproc.name=java and sf.pproc.cmdline contains org.apache.hadoop
A icontains BReturns true if string A contains string B ignoring capitalizationsf.pproc.name=java and sf.pproc.cmdline icontains org.apache.hadooP
A pmatch BReturns true if string A partial matches one of the elements in B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list.sf.proc.name pmatch (modify_passwd_binaries, verify_passwd_binaries, user_util_binaries)
exists AChecks if A is not a zero value (i.e. 0 for int, “” for string)exists sf.file.path

See the resources policies directory in github for examples. Feel free to contribute new and interesting rules through a github pull request.

Write a simple processing plugin

In addition to its core plugins, the processor also supports custom plugins that can be dynamically loaded into the processor via a compiled golang shared library using the golang plugin package. Custom plugins enables easy extension of the processor and the creation of custom pipelines tailored to specific use cases.

A dynamic plugin example is provided in github. The core of the plugin is building an object that implements an SFProcessor interface. Such an implementation looks as follows:

package main
import (
“sync”
“github.com/sysflow-telemetry/sf-apis/go/logger”
“github.com/sysflow-telemetry/sf-apis/go/plugins”
“github.com/sysflow-telemetry/sf-processor/core/flattener”
)
const (
pluginName string = “example”
)
// Plugin exports a symbol for this plugin.
var Plugin Example
// Example defines an example plugin.
type Example struct{}
// NewExample creates a new plugin instance.
func NewExample() plugins.SFProcessor {
return new(Example)
}
// GetName returns the plugin name.
func (s *Example) GetName() string {
return pluginName
}
// Init initializes the plugin with a configuration map.
func (s *Example) Init(conf map[string]string) error {
return nil
}
// Register registers plugin to plugin cache.
func (s *Example) Register(pc plugins.SFPluginCache) {
pc.AddProcessor(pluginName, NewExample)
}
// Process implements the main interface of the plugin.
func (s *Example) Process(ch interface{}, wg sync.WaitGroup) { cha := ch.(flattener.FlatChannel)
record := cha.In
logger.Trace.Println(“Example channel capacity:”, cap(record))
defer wg.Done()
logger.Trace.Println(“Starting Example”)
for {
fc, ok := <-record
if !ok {
logger.Trace.Println(“Channel closed. Shutting down.”)
break
}
logger.Info.Println(fc)
}
logger.Trace.Println(“Exiting Example”)
}
// SetOutChan sets the output channel of the plugin.
func (s *Example) SetOutChan(ch interface{}) {}
// Cleanup tears down plugin resources.
func (s *Example) Cleanup() {}
// This function is not run when module is used as a plugin.
func main() {}

The object must implement the following interface:

  • GetName() – returns a lowercase string representing the plugin’s label. This label is important, because it identifies the plugin in the pipeline.json file, enabling the processor to load the plugin. In the object above, this plugin is called example. Note that the label must be unique.
  • Init(config map[string]string) error – used to initialize the plugin. The configuration map that is passed to the function stores all the configuration information defined in the plugin’s definition inside pipeline.json (more on this later).
  • Register(pc plugins.SFPluginCache) – this registers the plugin with the plugin cache of the processor.
    • pc.AddProcessor(pluginName, <plugin constructor function>) (required) – registers the plugin named example with the processor. You must define a constructor function using the convention New<PluginName> which is used to instantiate the plugin, and returns it as an SFProcessor interface – see NewExample for an example.
    • pc.AddChannel(channelName, <output channel constructor function>) (optional) – if your plugin is using a custom output channel of objects (i.e., the channel used to pass output objects from this plugin to the next in the pipeline), it should be included in this plugin.
      • The channelName should be a lowercase unique label defining the channel type.
      • The constructor function should return a golang interface{} representing an object that as an In attribute of type chan <ObjectToBePassed>. We will call this object, a wrapped channel object going forward. For example, the channel object that passes sysflow objects is called SFChannel, and is defined here
      • For a complete example of defining an output channel, see NewFlattenerChan in the flattener as well as the Register function. The FlatChannel is defined here
  • Process(ch interface{}, wg *sync.WaitGroup) – this function is launched by the processor as a go thread and is where the main plugin processing occurs. It takes a wrapped channel object, which acts as the input data source to the plugin (i.e., this is the channel that is configured as the input channel to the plugin in the pipeline.json). It also takes a sync.WaitGroup object, which is used to signal to the processor when the plugin has completed running (see defer wg.Done() in code). The processor must loop on the input channel, and do its analysis on each input record. In this case, the example plugin is reading flat records and printing them to the screen.
  • SetOutChan(ch interface{}) – sets the wrapped channel that will serve as the output channel for the plugin. The output channel is instantiated by the processor, which is also in charge of stitching the plugins together. If the plugin is the last one in the chain, then this function can be left empty. See the SetOutputChan function in the flattener to see how an output channel is implemented.
  • Cleanup() – Used to cleanup any resources. This function is called by the processor after the plugin Process function exits. One of the key items to close in the Cleanup function is the output channel using the golang close() function. Closing the output channel enables the pipeline to be torn down gracefully and in sequence.
  • main(){} – this main method is not used by the plugin or processor. It’s required by golang in order to be able to compile as a shared object.

To compile the example plugin, use the provided Makefile:

cd plugins/example
make

This will build the plugin and copy it into resources/plugins/.

To use the new plugin, use the configuration provided in github, which defines the following pipeline:

{
“pipeline”:[
{
“processor”: “sysflowreader”,
“handler”: “flattener”,
“in”: “sysflow sysflowchan”,
“out”: “flat flattenerchan”
},
{
“processor”: “example”,
“in”: “flat flattenerchan”
}
]
}

This pipeline contains two plugins:

  • The builtin sysflowReader plugin with flattener handler, which takes raw sysflow objects, and flattens theminto arrays of integers and strings for easier processing in certain plugins like the policy engine.
  • The example plugin, which takes the flattened output from the sysflowreader plugin, and prints it the screen.

The key item to note is that the output channel (i.e., out) of sysflowreader matches the input channel (i.e., in) of the example plugin. This ensures that the plugins will be properly stitched together.

To run the example pipeline:

cd driver
./sfprocessor -config ../plugins/example/pipeline.example.json -plugdir ../resources/plugins/  ../resources/traces/mon.1531776712.sf

Deploy collector and processor using docker

The following docker-compose file illustrates how to deploy the processor and the collector as containers.

version: “3.5”
services:
sf-processor:
container_name: sf-processor
image: sysflowtelemetry/sf-processor:latest
privileged: false
volumes:
– socket-vol:/sock/
environment:
DRIVER: socket
INPUT_PATH: /sock/sysflow.sock
POLICYENGINE_MODE: alert
EXPORTER_TYPE: telemetry
EXPORTER_SOURCE: sysflow
EXPORTER_EXPORT: syslog
EXPORTER_HOST:
EXPORTER_PORT: 514
sf-collector:
container_name: sf-collector
image: sysflowtelemetry/sf-collector:latest
depends_on:
– “sf-processor”
privileged: true
volumes:
– /var/run/docker.sock:/host/var/run/docker.sock
– /dev:/host/dev
– /proc:/host/proc:ro
– /boot:/host/boot:ro
– /lib/modules:/host/lib/modules:ro
– /usr:/host/usr:ro
– /mnt/data:/mnt/data
– socket-vol:/sock/
– ./resources/traces:/tests/traces
environment:
EXPORTER_ID: ${HOSTNAME}
NODE_IP:
FILTER: “container.name!=sf-collector and container.name!=sf-processor”
INTERVAL: 300
SOCK_FILE: /sock/sysflow.sock
volumes:
socket-vol:

Setting up the collector environment

The key setting in the collector portion of the file is the FILTER variable. Since the collector is built atop the sysdig core, it uses the sysdig filtering mechanism described here and can support all the sysdig attributes described there in case you want to filter on specific containers, processes, operations, etc. One of the more powerful filters is the container.type!=host filter, which limits collection only to container monitoring. If you want to monitor the entire host, simply remove the container.type operation from the filter.

Setting up the processor environment

As mentioned in a previous section, all custom plugin attributes can be set using the following: <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> format. Note that the docker compose file sets several attributes including EXPORTER_TYPEEXPORTER_HOST and EXPORTER_PORT. Note that EXPORTER_SOURCE is set to the bash environment variable ${HOSTNAME}HOSTNAME must be explicitly exported before launching docker compose in order to be picked up.

export HOSTNAME
docker-compose up

The following are the default locations of the pipeline configuration and plugins directory:

  • pipeline.json – /usr/local/sysflow/conf/pipeline.json
  • plugins dir – /usr/local/sysflow/resources/plugins

We can overwrite these particular files/dirs in the docker container with those on the host by setting up a virtual mounts mapping the host directories/files into the container using the volumes section of the sf-processor in the docker-compose.yaml.

sf-processor:
container_name: sf-processor
image: sysflowtelemetry/sf-processor:latest
privileged: true
volumes:
– /var/run/docker.sock:/var/run/docker.sock
– socket-vol:/sock/
– ./resources/pipelines/pipeline.runtimeintegrity.json:/usr/local/sysflow/conf/pipeline.json
– ./resources/plugins:/usr/local/sysflow/resources/plugins

If using the policy engine, the policy folder defaults to the following location in the container:

/usr/local/sysflow/resources/policies/

This location can be overwritten by setting the POLICYENGINE_POLICIES environment variable.

The docker container uses a default filter.yaml policy that outputs sysflow records in json. You can use your own policy files from the host by mounting your policy directory into the container as follows:

sf-processor:
container_name: sf-processor
image: sysflowtelemetry/sf-processor:latest
privileged: true
volumes:
– /var/run/docker.sock:/var/run/docker.sock
– socket-vol:/sock/
– ./resources/policies/runtimeintegrity/:/usr/local/sysflow/resources/policies/

SysFlow Exporter (sf-exporter repo)

SysFlow exporter to export SysFlow traces to S3-compliant object stores.

Note: For remote syslogging and other export formats and connectors, check the SysFlow processor project.

Build

This document describes how to build and run the application both inside a docker container and on a Linux host. Building and running the application inside a docker container is the easiest way to start. For convenience, skip the build step and pull pre-built images directly from Docker Hub.

To build the project, first clone the source code, with submodules:

git clone –recursive git@github.com:sysflow-telemetry/sf-exporter.git

To checkout submodules on an already cloned repo:

git submodule update –init –recursive

To build the docker image for the exporter locally, run:

docker build -t sf-exporter .

Docker usage

The easiest way to run the SysFlow exporter is from a Docker container, with host mount for the trace files to export. The following command shows how to run sf-exporter with trace files located in /mnt/data on the host.

docker run -d –rm –name sf-exporter \
-e S3_ENDPOINT= \
-e S3_BUCKET= \
-e S3_ACCESS_KEY= \
-e S3_SECRET_KEY= \
-e NODE_IP=$HOSTNAME \
-e INTERVAL=150
-v /mnt/data:/mnt/data \
sysflowtelemetry/sf-exporter

The exporter is usually executed as a pod or docker-compose service together with the SysFlow collector. The exporter automatically removes exported files from the local filesystem it monitors. See the SysFlow deployments packages for more information.

Development

To build the exporter locally, run:

cd src & pip3 install -r requirements.txt
cd modules/sysflow/py3 & sudo python3 setup.py install

To run the exporter from the command line:

./exporter.py -h
usage: exporter.py [-h] [–exporttype {s3,local}] [–s3endpoint S3ENDPOINT]
[–s3port S3PORT] [–s3accesskey S3ACCESSKEY]
[–s3secretkey S3SECRETKEY] [–s3bucket S3BUCKET]
[–s3location S3LOCATION] [–s3prefix S3PREFIX]
[–secure [SECURE]] [–scaninterval SCANINTERVAL]
[–timeout TIMEOUT] [–agemin AGEMIN] [–dir DIR]
[–todir TODIR] [–nodename NODENAME] [–nodeip NODEIP]
[–podname PODNAME] [–podip PODIP]
[–podservice PODSERVICE] [–podns PODNS]
[–poduuid PODUUID] [–clusterid CLUSTERID]
sf-exporter: service for watching and uploading monitoring files to object
store.
optional arguments:
-h, –help show this help message and exit
–exporttype {s3,local}
export type
–s3endpoint S3ENDPOINT
s3 server address
–s3port S3PORT s3 server port
–s3accesskey S3ACCESSKEY
s3 access key
–s3secretkey S3SECRETKEY
s3 secret key
–s3bucket S3BUCKET target data bucket
–s3location S3LOCATION
target data bucket location
–s3prefix S3PREFIX exporter’s: static prefix directory for s3 bucket
–secure [SECURE] indicates if SSL connection
–scaninterval SCANINTERVAL
interval between scans
–timeout TIMEOUT connection timeout
–agemin AGEMIN number of minutes of traces to preserve in case of
repeated timeouts
–dir DIR data directory
–todir TODIR data directory
–nodename NODENAME exporter’s node name
–nodeip NODEIP exporter’s node IP
–podname PODNAME exporter’s pod name
–podip PODIP exporter’s pod IP
–podservice PODSERVICE
exporter’s pod service
–podns PODNS exporter’s pod namespace
–poduuid PODUUID exporter’s: pod UUID
–clusterid CLUSTERID
exporter’s: cluster ID

LEAVE A REPLY

Please enter your comment!
Please enter your name here