Monitor Data Sources for Stale Data

Identify log sources that have stopped sending data using the selectLast() function

Query

Flowchart: Events → Aggregate → Function → Expression → Expression → Expression → Expression → Conditional → Result Set
logscale
selectLast([@ingesttimestamp]) | lasteventseentime := @ingesttimestamp
| currenttime := now()
| timediff := (currenttime - lasteventseentime)
| timeinmins := timediff/60000 | timeinmins := format(format="%d", field=timeinmins)
| timeinhours := timediff/3600000 | timeinhours := format(format="%d", field=timeinhours)
| timeindays := timediff/86400000 | timeindays := format(format="%d", field=timeindays)
| case { 
        timeinmins > 30 | timeinmins < 61 | format("No new data in the last %s minutes", field=["timeinmins"], as=Alert.Name) ;
        timeinmins > 60 | timeinhours < 24 | format("No new data in the last %s hours", field=["timeinhours"], as=Alert.Name) ;
        timeinhours >= 24 | format("No new data in the last %s days", field=["timeindays"], as=Alert.Name)
        }

Introduction

The selectLast() function returns the most recent value of each specified field, which makes it useful for monitoring data freshness and detecting data sources that have gone stale.

In this example, the selectLast() function is used to identify when data sources have stopped sending events by comparing the timestamp of the most recent event with the current time. The query calculates the elapsed time and formats an appropriate message based on whether the delay is in minutes, hours, or days.

A time window of 30 days is recommended to effectively identify stale data sources and calculate the appropriate time elapsed since the last event.

Example incoming data might look like this:

@timestamp                @ingesttimestamp          source        message
2025-11-05T10:00:00.000Z  2025-11-05T10:00:01.000Z  webserver-01  User login successful
2025-11-05T10:01:00.000Z  2025-11-05T10:01:01.000Z  webserver-01  Connection established
2025-11-05T10:02:00.000Z  2025-11-05T10:02:01.000Z  webserver-01  Session timeout
2025-11-05T10:03:00.000Z  2025-11-05T10:03:01.000Z  webserver-02  User login successful
2025-10-15T09:00:00.000Z  2025-10-15T09:00:01.000Z  webserver-03  Last event received
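As a rough illustration of what the query computes, the following Python sketch (not LogScale) picks the most recent event from the sample rows above and works out how long ago it was ingested. The `events` list, the `parse` helper, and the fixed `now` value are assumptions made up for this example; the real query uses selectLast() and now().

```python
from datetime import datetime, timedelta, timezone

def parse(ts: str) -> datetime:
    # Parse an ISO-8601 timestamp with a trailing "Z" as UTC.
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

# Sample rows from the table above: (@timestamp, @ingesttimestamp, source).
events = [
    ("2025-11-05T10:00:00.000Z", "2025-11-05T10:00:01.000Z", "webserver-01"),
    ("2025-11-05T10:01:00.000Z", "2025-11-05T10:01:01.000Z", "webserver-01"),
    ("2025-11-05T10:02:00.000Z", "2025-11-05T10:02:01.000Z", "webserver-01"),
    ("2025-11-05T10:03:00.000Z", "2025-11-05T10:03:01.000Z", "webserver-02"),
    ("2025-10-15T09:00:00.000Z", "2025-10-15T09:00:01.000Z", "webserver-03"),
]

# Hypothetical "current time", chosen only for this example.
now = parse("2025-11-05T10:48:01.000Z")

# Rough stand-in for selectLast([@ingesttimestamp]): take the event with the
# latest @timestamp and keep its @ingesttimestamp.
latest = max(events, key=lambda e: parse(e[0]))
last_event_seen = parse(latest[1])

# Elapsed time in whole milliseconds, the unit the query works in.
timediff_ms = (now - last_event_seen) // timedelta(milliseconds=1)

print(latest[2], timediff_ms)
```

With these assumed values the most recent event comes from webserver-02 and the elapsed time is 45 minutes expressed in milliseconds.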

Step-by-Step

  1. Starting with the source repository events.

  2.
    logscale
    selectLast([@ingesttimestamp]) | lasteventseentime := @ingesttimestamp

    Identifies the most recent @ingesttimestamp value within the 30-day window and returns the result in a new field named lasteventseentime. The selectLast() function will find the most recent event even if it is from several days ago, making it essential for identifying stale data sources.

  3.
    logscale
    currenttime := now()

    Gets the current time and returns the result in a field named currenttime. This timestamp will be used as the reference point for calculating elapsed time.

  4.
    logscale
    timediff := (currenttime - lasteventseentime)

    Calculates the time difference between current time and last event time in milliseconds, returning the result in timediff.

  5.
    logscale
    timeinmins := timediff/60000 | timeinmins := format(format="%d", field=timeinmins)

    Converts the time difference to minutes and formats it as an integer. The division by 60000 converts milliseconds to minutes.

  6.
    logscale
    timeinhours := timediff/3600000 | timeinhours := format(format="%d", field=timeinhours)

    Converts the time difference to hours and formats it as an integer. The division by 3600000 converts milliseconds to hours.

  7.
    logscale
    timeindays := timediff/86400000 | timeindays := format(format="%d", field=timeindays)

    Converts the time difference to days and formats it as an integer. The division by 86400000 converts milliseconds to days.

  8.
    logscale
    case { 
            timeinmins > 30 | timeinmins < 61 | format("No new data in the last %s minutes", field=["timeinmins"], as=Alert.Name) ;
            timeinmins > 60 | timeinhours < 24 | format("No new data in the last %s hours", field=["timeinhours"], as=Alert.Name) ;
            timeinhours >= 24 | format("No new data in the last %s days", field=["timeindays"], as=Alert.Name)
            }

    Uses a case statement to create an alert message based on the time elapsed since the last event:

    • For more than 30 and up to 60 minutes: Creates an alert message showing the delay in minutes.

    • For more than 60 minutes and less than 24 hours: Creates an alert message showing the delay in hours.

    • For more than 24 hours: Creates an alert message showing the delay in days.

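    The formatted message is returned in the Alert.Name field. If the last event is 30 minutes old or less, no clause matches and the case statement returns no result, so no alert is produced.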

  9. Event Result set.
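The unit conversions and the case logic in the steps above can be sketched in Python as follows. This is an illustration only, not LogScale: the function name `stale_alert` is hypothetical, it assumes that the "%d" formatting yields a truncated whole number, and it treats exactly 24 hours as belonging to the days branch.

```python
from typing import Optional

def stale_alert(timediff_ms: int) -> Optional[str]:
    """Return an alert message for the elapsed time, or None if data is fresh."""
    # Whole-number conversions from milliseconds (assumed to match "%d").
    mins = timediff_ms // 60_000
    hours = timediff_ms // 3_600_000
    days = timediff_ms // 86_400_000

    # Same order and thresholds as the case statement.
    if 30 < mins < 61:
        return f"No new data in the last {mins} minutes"
    if mins > 60 and hours < 24:
        return f"No new data in the last {hours} hours"
    if hours >= 24:
        return f"No new data in the last {days} days"
    # No branch matched: 30 minutes or less since the last event.
    return None

print(stale_alert(45 * 60_000))       # delay reported in minutes
print(stale_alert(5 * 3_600_000))     # delay reported in hours
print(stale_alert(21 * 86_400_000))   # delay reported in days
print(stale_alert(10 * 60_000))       # None, data is recent
```

Returning None for recent data mirrors the way an event that matches no case clause produces no alert.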

Summary and Results

The query is used to monitor data freshness by calculating the time elapsed since the last event was received, providing appropriately formatted alerts based on the duration of inactivity.

This query is useful, for example, to create alerts for stale data sources, monitor data pipeline health, ensure continuous data ingestion, and identify potential issues with data collection or transmission.

Sample output from the incoming example data:

@timestamp                @ingesttimestamp          source        Alert.Name
2025-11-05T10:03:00.000Z  2025-11-05T10:03:01.000Z  webserver-02  No new data in the last 45 minutes
2025-10-15T09:00:00.000Z  2025-10-15T09:00:01.000Z  webserver-03  No new data in the last 21 days

Set the time window for this search to 30 days so that stale data sources are detected reliably. The extended window lets the query find and report on data sources whose most recent event is up to 30 days old.
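To see why the window length matters, here is a small Python sketch (not LogScale) of which sources remain visible for a given search window. It is a simplification under stated assumptions: the `sources_in_window` helper, the fixed `now` value, and the `webserver-04` entry are invented for illustration, and the window is applied directly to each source's last-seen time.

```python
from datetime import datetime, timedelta, timezone

def sources_in_window(last_seen, now, window):
    """Return the sources whose most recent event falls inside the window."""
    start = now - window
    return sorted(src for src, ts in last_seen.items() if ts >= start)

# Hypothetical current time and per-source last-seen times.
now = datetime(2025, 11, 5, 10, 48, 1, tzinfo=timezone.utc)
last_seen = {
    "webserver-02": datetime(2025, 11, 5, 10, 3, 1, tzinfo=timezone.utc),
    "webserver-03": datetime(2025, 10, 15, 9, 0, 1, tzinfo=timezone.utc),
    # Made-up source that has been silent for more than 30 days.
    "webserver-04": datetime(2025, 9, 1, 0, 0, 0, tzinfo=timezone.utc),
}

print(sources_in_window(last_seen, now, timedelta(days=30)))
print(sources_in_window(last_seen, now, timedelta(days=7)))
```

In this sketch a 7-day window no longer sees webserver-03, and even the 30-day window cannot report on a source that has been silent for longer than 30 days.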