Ingestion: Digest Phase

The digest phase supports a key range of functionality that processes, stores, and queries data as it is being ingested. The most critical part of digest is writing to disk the files that LogScale will use. The full range of operations during this phase includes:

  • Run live searches with the incoming data before it is stored to return the newest events matching searches (for Live Queries).

  • Execute Alerts on the incoming data so that an action can be triggered as soon as an event matches.

  • Execute historical searches on the most recent and current data.

  • Build segment files in preparation for storage.

  • Replicate segment files to other LogScale nodes to share load and support high availability.

  • Compress data ready for longer term storage.

The digest phase is critical because data cannot be searched until it has been digested and stored. Digest uses two separate workflows: one for the digest (and storage) of information, and one for the execution of live queries and alerts:

graph LR;
    KQ[Kafka Queue]
    SE[Save Events to Segment Files]
    PE[Push Events to Live Queries]
    L1[Live Query 1]
    L2[Live Query 2]
    L3[Live Query 3]
    L4[Live Query 4]
    KQ-->PE
    KQ-->SE
    PE-->L1
    PE-->L2
    PE-->L3
    PE-->L4

Digest Layout

To handle large volumes of data, the Kafka ingest queue is split into multiple partitions. Kafka uses partitions to scale up a queue by distributing the work across multiple machines, each handling a different partition.

LogScale matches digest partitions one-to-one with the ingest queue partitions in Kafka. LogScale can add as many digest nodes to a cluster as there are digest partitions. LogScale will automatically divide the digest partitions among all the digest nodes in the cluster.

A datasource is mapped to a digest partition. The ingest layer places all arriving data for a datasource on the corresponding partition of the ingest queue, where it is handled by the matching digest worker. This means that the same node handles all incoming data for a datasource. Multiple datasources are multiplexed on the same partitions, as seen in the diagram below.

There will often be thousands of datasources in a LogScale cluster and it is not feasible to have that many partitions on a Kafka queue.

%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%
graph LR
    subgraph KQ ["Kafka Queues"]
        direction LR
        KQ1["Ingest Queue Partition 1"]
        KQ2["Ingest Queue Partition 2"]
        KQ3["Ingest Queue Partition 3"]
        KQ4["Ingest Queue Partition 4"]
    end
    subgraph D1 ["Digest Node 1"]
        direction LR
        DP1["Digest Partition Worker 1"]
        DS1A["Datasource 1A"]
        DS2A["Datasource 2A"]
        DS3A["Datasource 3A"]
        DP2["Digest Partition Worker 2"]
        DS1B["Datasource 1B"]
        DS2B["Datasource 2B"]
        DS3B["Datasource 3B"]
    end
    subgraph D2 ["Digest Node 2"]
        direction LR
        DP3["Digest Partition Worker 3"]
        DS1C["Datasource 1C"]
        DS2C["Datasource 2C"]
        DS3C["Datasource 3C"]
    end
    KQ1 --> DP1
    KQ2 --> DP2
    KQ3 --> DP3
    DP1 --> DS1A
    DP1 --> DS2A
    DP1 --> DS3A
    DP2 --> DS1B
    DP2 --> DS2B
    DP2 --> DS3B
    DP3 --> DS1C
    DP3 --> DS2C
    DP3 --> DS3C
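The layout described in this section can be illustrated with a minimal Python sketch. The hashing scheme and the round-robin assignment are assumptions made for illustration only; the text does not state how LogScale actually picks a partition for a datasource or how it divides partitions among nodes. The function names are hypothetical.

```python
import hashlib


def partition_for(datasource_id: str, partition_count: int) -> int:
    """Map a datasource to one partition using a stable hash (assumed scheme)."""
    digest = hashlib.sha256(datasource_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


def assign_partitions(partition_count: int, nodes: list[str]) -> dict[int, str]:
    """Divide digest partitions among digest nodes round-robin (assumed policy)."""
    if not nodes:
        raise ValueError("at least one digest node is required")
    return {p: nodes[p % len(nodes)] for p in range(partition_count)}


def node_for(datasource_id: str, partition_count: int, nodes: list[str]) -> str:
    """Find the single node that digests every event of a datasource."""
    owners = assign_partitions(partition_count, nodes)
    return owners[partition_for(datasource_id, partition_count)]
```

Because the mapping is deterministic, many datasources share one partition, and every event for a given datasource always reaches the same digest worker.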

For more information on Kafka partitions, see blogpost1 and blogpost2

After data has been digested, the data for a datasource will be spread out across the cluster to improve search performance.

Digest coordination and robustness

Node failures during digest are managed by monitoring the digestion of data on the incoming Kafka partition. If a digest node is having problems, other nodes will take over the digest partitions it has been assigned. Digest work is coordinated so that there is always exactly one digest worker for a given digest partition.

The global database is used to coordinate between the nodes and to choose a digest node for each partition; the assignment is recorded in the global database. The coordinator will notice failures and decide which nodes should take over. Digest workers use conditional writes, so a worker can only write to a given digest partition while it is actually the owner of that partition.
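The idea of a conditional write can be sketched as follows. This is a simplified in-memory model, not LogScale's global database: the `GlobalState` class, the `epoch` counter, and the method names are all hypothetical, and the epoch is just one common way to make ownership checks safe against stale workers.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionOwnership:
    owner: str
    epoch: int = 0


@dataclass
class GlobalState:
    """In-memory stand-in for shared coordination state (hypothetical)."""

    owners: dict[int, PartitionOwnership] = field(default_factory=dict)
    records: list[tuple[int, str, str]] = field(default_factory=list)

    def reassign(self, partition: int, new_owner: str) -> int:
        """Hand a partition to a new owner and return the new epoch."""
        current = self.owners.get(partition)
        epoch = current.epoch + 1 if current else 0
        self.owners[partition] = PartitionOwnership(new_owner, epoch)
        return epoch

    def conditional_write(
        self, partition: int, node: str, epoch: int, payload: str
    ) -> bool:
        """Accept the write only if the caller still owns the partition."""
        current = self.owners.get(partition)
        if current is None or current.owner != node or current.epoch != epoch:
            return False
        self.records.append((partition, node, payload))
        return True
```

In this model, a worker that lost its partition during a failover still holds the old epoch, so its late writes are rejected rather than mixed with those of the new owner.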

If a digest node fails, another digest node can take over without losing data. Any data that was only stored on the failed node can be replayed from the Kafka queue, re-digested, and stored on the replacement node. To avoid having to read far back on the Kafka ingest queue partitions, LogScale creates mini segments to keep the replay window small.
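Replay can be pictured as resuming from the first queue offset that is not yet covered by safely stored data. The sketch below models the queue as a plain list of `(offset, event)` pairs; it does not use a Kafka client, and both function names are hypothetical.

```python
def resume_offset(stored_end_offsets: list[int]) -> int:
    """First offset to re-read: one past the highest offset already stored."""
    return max(stored_end_offsets, default=-1) + 1


def replay(queue: list[tuple[int, str]], start: int) -> list[str]:
    """Return the events that the replacement node has to digest again."""
    return [event for offset, event in queue if offset >= start]
```

The more often data is closed into stored segments, the higher the resume offset and the fewer events need to be re-read after a failure.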

High availability and robustness are achieved by replicating data. See Data Replication and High Availability for more information.

Live Searches During Digest

Live searches are executed within the digest layer so that incoming events matching a live query are identified as soon as the data has been processed, without having to wait for the data to be stored and queried post-storage. For a more detailed description of how live searches are executed within the context of the overall searching mechanism, see Live Searches.

An expanded view of the live search mechanism during digest is shown below:

graph LR;
    subgraph DW ["Digest Worker"]
        KQ[Kafka Queue Events]
        SE[Save Events to Segment Files]
        PE[Push Events to Live Queries]
        L1[Live Query 1]
        L2[Live Query 2]
        L3[Live Query 3]
        L4[Live Query 4]
    end
    QC1[Query Coordinator 1]
    QC2[Query Coordinator 2]
    QC3[Query Coordinator 3]
    QC4[Query Coordinator 4]
    KQ-->PE
    KQ-->SE
    PE-->L1
    PE-->L2
    PE-->L3
    PE-->L4
    L1--Poll-->QC1
    L2--Poll-->QC2
    L3--Poll-->QC3
    L4--Poll-->QC4

Live searches are designed to be lightweight (resource-wise) and to show data in real time. The real-time aspect is achieved by pushing events into the running live searches as they are digested. Typically, it takes less than a second from when an event hits a LogScale cluster until it is available for search and pushed into all live queries. As events are digested, they exist in memory, so it is quick and efficient to push them into live queries for processing.
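The push model can be sketched in a few lines of Python. The class and method names are hypothetical and the event is modeled as a simple dictionary; the sketch only shows the shape of the flow, in which each in-memory event is offered to every live query and also appended to the block being built, while a coordinator polls each query for its matches.

```python
from typing import Callable

Event = dict[str, str]


class LiveQuery:
    """A running live search that receives events pushed by the digest worker."""

    def __init__(self, name: str, predicate: Callable[[Event], bool]) -> None:
        self.name = name
        self.predicate = predicate
        self._pending: list[Event] = []

    def push(self, event: Event) -> None:
        """Keep the event only if it matches this query."""
        if self.predicate(event):
            self._pending.append(event)

    def poll(self) -> list[Event]:
        """Hand over the matches collected since the last poll, then reset."""
        matches, self._pending = self._pending, []
        return matches


class DigestWorker:
    def __init__(self, live_queries: list[LiveQuery]) -> None:
        self.live_queries = live_queries
        self.current_block: list[Event] = []

    def digest(self, event: Event) -> None:
        # Push the in-memory event to every live query first.
        for query in self.live_queries:
            query.push(event)
        # Then add it to the block that will later be written to a segment file.
        self.current_block.append(event)
```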

There are, however, some tradeoffs. Saving data into segment files is tightly coupled with running the live searches. This maximizes efficiency, but also means that the digest process is under pressure to execute the live searches before the data is stored. This can create ingest delays, because until the data is stored it is not available for non-live queries.

To alleviate this problem, LogScale calculates the Query Cost. The cost is calculated by tracking the CPU and memory usage required to process the search. In the event that an ingest delay is identified, a costly search is killed.

Note

LogScale will always prioritize ingest over searches.
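The cost-based protection described above can be sketched as follows. The cost formula, the memory weight, and the delay threshold are assumptions chosen for illustration; the text only states that cost is derived from CPU and memory usage and that a costly search is killed when an ingest delay is identified.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunningSearch:
    name: str
    cpu_seconds: float
    memory_bytes: int


def query_cost(search: RunningSearch, memory_weight: float = 1e-9) -> float:
    """Combine CPU and memory usage into one number (assumed weighting)."""
    return search.cpu_seconds + memory_weight * search.memory_bytes


def relieve_ingest_delay(
    searches: list[RunningSearch],
    ingest_delay_seconds: float,
    max_delay_seconds: float,
) -> Optional[RunningSearch]:
    """Remove and return the costliest search if ingest is falling behind."""
    if ingest_delay_seconds <= max_delay_seconds or not searches:
        return None
    victim = max(searches, key=query_cost)
    searches.remove(victim)
    return victim
```

Killing the single most expensive search frees the most resources for digest while leaving cheaper live queries running, which is consistent with prioritizing ingest over searches.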

Creating Segment files

LogScale stores data in segment files. LogScale is a "write heavy" environment, and so the creation and storage of data on disk needs to be achieved as quickly and efficiently as possible. Because LogScale is not typically a read-heavy environment, LogScale can avoid spending time writing additional metadata like indexes to improve read performance.

Segment files have the following properties:

  • Each segment file is a large chunk of data belonging to a single datasource.

  • Each segment file is immutable and cannot be changed once written.

  • Segment files are replicated between storage nodes to provide resilience within the cluster.

  • Segment files are also designed to be really fast to scan in order to achieve fast searches. See Scanning data for more information on the scanning process.

  • Segment files are compressed on disk to provide efficient storage. The compression factor depends on the data, but compression ratios between 5 and 20 times are not uncommon given the text nature of log files. Compression also provides some advantages when searching; for more information, see Compression.

    The trade-off for compression is the time spent compressing and decompressing the data against the reduction in storage requirements and network transfers when replicating data or storing it on bucket storage solutions.

  • During creation of the segment, LogScale also creates hash filters for the data, one filter for each file. The hash filter speeds up searches significantly, as described in Hash Filters.
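To get a feel for why text logs compress well, the following sketch measures a compression ratio on synthetic log lines. It uses Python's standard `zlib` module purely as a stand-in; it is not the algorithm LogScale uses, and the sample log format is invented for the example, so the resulting ratio says nothing about real workloads.

```python
import zlib


def compression_ratio(raw: bytes, level: int = 6) -> float:
    """Original size divided by compressed size (higher means better)."""
    compressed = zlib.compress(raw, level)
    return len(raw) / len(compressed)


# Synthetic, highly repetitive log lines (invented format for illustration).
sample = b"".join(
    f"2024-01-01T00:00:{i % 60:02d}Z host=web-{i % 4} status=200 path=/api/items/{i}\n".encode()
    for i in range(2000)
)

print(f"ratio: {compression_ratio(sample):.1f}x")
```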

Segment files are composed of data blocks, as shown in the figure below.

%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%
graph LR
    NE["New Events from Kafka"]
    subgraph DW ["Digest Worker"]
        CB["Current Block (in memory)"]
        CH["Current Hashfilter (in memory)"]
    end
    NE-->CB
    NE-->CH
    CB-->OS1B3
    CH-->HS1B3
    subgraph OS ["Current Segment File"]
        OS1I["Index"]
        OS1B0["Block 0"]
        OS1B1["Block 1"]
        OS1B2["Block 2"]
        OS1B3["Block N"]
    end
    subgraph HS ["Hash filter for Current Segment"]
        HS1B0["Hashfilter Block 0"]
        HS1B1["Hashfilter Block 1"]
        HS1B2["Hashfilter Block 2"]
        HS1B3["Hashfilter Block N"]
    end
    OS1I ~~~ OS1B0 ~~~ OS1B1 ~~~ OS1B2 ~~~ OS1B3
    HS1B0 ~~~ HS1B1 ~~~ HS1B2 ~~~ HS1B3

The process of writing data to segments has the following semantics:

  • Segment files are created by adding blocks to the current segment file, with one block added at a time.

  • Once writing to a segment file has been completed, it is closed and considered immutable.

  • Each block is compressed independently, and a corresponding hashfilter block is written to the hashfilter file.

  • A block is the smallest component that can be read, which suits the LogScale search system because it processes one block at a time.

  • Once the block is completed, it can be replicated to other storage nodes.

The segment writing process is as follows:

  1. A digest worker reads the events from a partition from the Kafka ingest queue.

  2. Each datasource will have one open segment file that is being populated with incoming events. Segment files are built one block at a time. The current block being built is kept in memory along with its corresponding hash filter.

  3. When the block is full, the data is written to the current segment file and the hash filter is written to the hash filter file for the segment.

  4. A segment file can be closed under two conditions:

    • The segment file has reached its maximum size.

    • The hashfilter file for the corresponding segment file has reached its maximum capacity.

    When the segment file fills up, it is closed along with its hash filter file. From here on the segment (and hash filter) is immutable and can be replicated to other digest nodes and bucket storage.

  5. When segments are finished, a segment entry is written to the Global Database so the whole cluster knows about the segment and its metadata.
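The block-by-block writing process above can be sketched as a small in-memory model. Everything here is simplified and hypothetical: limits are expressed as a byte count per block and a block count per segment, the hash filter and compression are omitted, and "writing to disk" is modeled as appending to a list.

```python
from dataclasses import dataclass, field


@dataclass
class Segment:
    blocks: list[list[bytes]] = field(default_factory=list)
    closed: bool = False


class SegmentWriter:
    """Builds one segment at a time for a single datasource."""

    def __init__(self, block_limit_bytes: int, max_blocks_per_segment: int) -> None:
        self.block_limit_bytes = block_limit_bytes
        self.max_blocks_per_segment = max_blocks_per_segment
        self.current_block: list[bytes] = []
        self.current_block_bytes = 0
        self.segment = Segment()
        self.closed_segments: list[Segment] = []

    def append(self, event: bytes) -> None:
        """Add an event to the in-memory block and flush when it is full."""
        self.current_block.append(event)
        self.current_block_bytes += len(event)
        if self.current_block_bytes >= self.block_limit_bytes:
            self._flush_block()

    def _flush_block(self) -> None:
        # A full block is appended to the open segment in a single step.
        self.segment.blocks.append(self.current_block)
        self.current_block = []
        self.current_block_bytes = 0
        if len(self.segment.blocks) >= self.max_blocks_per_segment:
            self._close_segment()

    def _close_segment(self) -> None:
        # From here on the segment is treated as immutable.
        self.segment.closed = True
        self.closed_segments.append(self.segment)
        self.segment = Segment()
```

A real writer would also register the closed segment's metadata so that the rest of the cluster learns about it.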

The metadata written to the Global database when a segment file is closed includes:

  • Time span for the data (start time and end time, storing both the event and ingest timestamps).

  • Kafka offsets, so that data can be re-read from the Kafka queue in the event of a digest node failure.

  • The list of LogScale nodes that have a copy of the completed segment, and the list of nodes that should have a copy of the segment (for efficient searching), called the owner hosts.

    A background process, the Segment Mover, continuously moves segments to the owner hosts.
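As an illustration of the kind of record this describes, the sketch below models segment metadata as an immutable Python dataclass. All field names are hypothetical and do not reflect the actual schema of the Global Database; `pending_hosts` simply shows which owner hosts still lack a copy, which is the work a mover process would have left to do.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SegmentMetadata:
    segment_id: str
    # Time span of the data, for both event and ingest timestamps.
    event_time_start_ms: int
    event_time_end_ms: int
    ingest_time_start_ms: int
    ingest_time_end_ms: int
    # Kafka position covered by the segment, used for replay after a failure.
    kafka_partition: int
    kafka_start_offset: int
    kafka_end_offset: int
    # Nodes that hold a copy now, and nodes that should hold one.
    current_hosts: frozenset[str]
    owner_hosts: frozenset[str]

    def pending_hosts(self) -> frozenset[str]:
        """Owner hosts that do not yet have a copy of the segment."""
        return self.owner_hosts - self.current_hosts
```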

A segment is the smallest entity that can be shared and replicated within the LogScale cluster. Closing segments quickly provides some additional benefits:

  • Smaller segments are easier to replicate between nodes.

  • If a node fails, a smaller segment is easier to recreate by replaying the events from the Kafka queue.

The drive to create small segments causes the digest nodes to create mini segments. However, mini segments are not optimal when searching, and having a large number of smaller segments increases the size of the global database required to map the segments to nodes.

To improve performance, digest nodes merge these mini segments into full segments in a separate process before the files are sent to the storage nodes.

%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%
block-beta
    columns 4
    TS1["Mini segment file"] TS2["Mini segment file"] space TS3["Segment file"]
    block:SA
        ABI["Index"] AB0["Block 0"] AB1["Block 1"] AB2["Block 2"]
    end
    block:SB
        BBI["Index"] BB0["Block 0"] BB1["Block 1"] BB2["Block 2"]
    end
    blockArrowId6<["&nbsp;&nbsp;&nbsp;"]>(right)
    block:SL
        LBI["Index"] LB0["Block 0"] LB1["Block 1"] LB2["Block 2"]
    end
    block:HA
        HAB0["Hashfilter Block 0"] HAB1["Hashfilter Block 1"] HAB2["Hashfilter Block 2"]
    end
    block:HSB
        HBB0["Hashfilter Block 0"] HBB1["Hashfilter Block 1"] HBB2["Hashfilter Block 2"]
    end
    blockArrowId7<["&nbsp;&nbsp;&nbsp;"]>(right)
    block:HSL
        HLB0["Hashfilter Block 0"] HLB1["Hashfilter Block 1"] HLB2["Hashfilter Block 2"]
    end
    TH1["Mini segment file"] TH2["Mini segment file"] space TH3["Segment file"]
    style TS1 fill:none,stroke:#fff;
    style TS2 fill:none,stroke:#fff;
    style TS3 fill:none,stroke:#fff;
    style TH1 fill:none,stroke:#fff;
    style TH2 fill:none,stroke:#fff;
    style TH3 fill:none,stroke:#fff;

To improve the overall performance of this process, LogScale uses different compression algorithms for the mini segments and full segments. See Compression for more information.
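The merge step, together with the idea of using one compression algorithm for mini segments and another for full segments, can be sketched with Python's standard library. Here `zlib` at a fast level and `lzma` are stand-ins only; they are not the algorithms LogScale uses. The sketch also ignores block and hash filter structure and assumes non-empty inputs whose events contain no newline bytes. All function names are hypothetical.

```python
import lzma
import zlib


def write_mini_segment(events: list[bytes]) -> bytes:
    """Compress quickly, since mini segments are written on the ingest path."""
    return zlib.compress(b"\n".join(events), 1)


def merge_mini_segments(mini_segments: list[bytes]) -> bytes:
    """Decompress the mini segments in order and recompress them as one unit."""
    events: list[bytes] = []
    for mini in mini_segments:
        events.extend(zlib.decompress(mini).split(b"\n"))
    # A slower, denser algorithm is affordable here because merging
    # happens in a separate process, off the ingest path.
    return lzma.compress(b"\n".join(events))


def read_full_segment(segment: bytes) -> list[bytes]:
    """Return the events stored in a merged segment."""
    return lzma.decompress(segment).split(b"\n")
```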

Writing Segments and Datasources

Writing data to one datasource is single threaded. Only one thread can add data to the current block. Data is also written to disk in the same sequence it is read from Kafka to keep the failure scenarios simpler.
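A common way to get this behavior is to give each datasource a single writer thread fed by a queue, so that events are appended strictly in the order they were submitted. The sketch below shows that pattern with Python's standard `queue` and `threading` modules; it is an illustration of the single-writer idea, not LogScale's implementation, and the class name is hypothetical.

```python
import queue
import threading
from typing import Optional


class DatasourceWriter:
    """One writer thread per datasource; events are appended in arrival order."""

    def __init__(self) -> None:
        self._inbox: "queue.Queue[Optional[tuple[int, bytes]]]" = queue.Queue()
        self.written: list[tuple[int, bytes]] = []
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, offset: int, event: bytes) -> None:
        """Queue an event, tagged with its Kafka offset, for the writer thread."""
        self._inbox.put((offset, event))

    def close(self) -> None:
        """Signal the writer to stop and wait until the queue is drained."""
        self._inbox.put(None)
        self._thread.join()

    def _run(self) -> None:
        while True:
            item = self._inbox.get()
            if item is None:
                return
            # Only this thread ever touches the current block.
            self.written.append(item)
```

Keeping the written order identical to the queue order means that, after a failure, the stored data always corresponds to a contiguous prefix of the offsets read so far.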

Segment File Sizing

The default LogScale configuration is for a block of data to be approximately 1MB of original (uncompressed) data. A block may be closed early if the hash filter fills up. Since a hash filter is a type of Bloom filter, it's possible for its overall size to become too large to be efficient.
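One way to reason about when a Bloom-type filter has "filled up" is the standard false positive approximation for a classic Bloom filter, $p \approx (1 - e^{-kn/m})^k$, with $m$ bits, $k$ hash functions, and $n$ inserted items. The sketch below applies that formula; LogScale's hash filter internals are not described here, so the parameters and the 1% threshold are assumptions for illustration.

```python
import math


def false_positive_rate(bits: int, hashes: int, items: int) -> float:
    """Approximate false positive rate of a classic Bloom filter."""
    return (1.0 - math.exp(-hashes * items / bits)) ** hashes


def should_close_block(
    bits: int, hashes: int, items: int, max_rate: float = 0.01
) -> bool:
    """Close the block early once the filter would reject too little."""
    return false_positive_rate(bits, hashes, items) > max_rate
```

As more distinct values are added to a fixed-size filter, the rate climbs toward 1 and the filter stops excluding blocks from a search, which is why capping the content per filter keeps it useful.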

A mini segment is typically closed after 30 minutes to keep the "replay window" at 30 minutes. Mini segments are merged into full segments after 24 hours.

Segment File Syncing

For efficiency, blocks are constructed in memory and then written to disk less frequently and in larger single writes. Because the queue of incoming data is always available on Kafka as the durable storage, LogScale itself does not need to synchronise to disk so frequently.

If something fails, data can be reread from the Kafka queue. Throughout the ingest/digest pipeline, LogScale will also try to batch events. Often all or some of the events in a request will go to the same digest partition or, even better, to the same datasource.

Sequential Segment Files

LogScale is designed to create large sequential segment files to improve read performance from disks. Although SSDs are better at random reads, there is still a performance benefit to writing and reading data sequentially.