Database Source

Overview

The Database Source, introduced in LogScale Collector 1.11, enables direct collection of events from relational database tables. It allows you to ingest data from SQL databases by treating table rows as individual events, connecting your database systems to LogScale.

The Database Source monitors specified tables for new row inserts and automatically ingests them into LogScale. The collector maintains checkpoint offsets to ensure reliable, resumable data collection without duplicating events.

Supported Databases

The Database Source supports the following relational database management systems:

  • PostgreSQL - Open-source relational database

  • MySQL - Open-source relational database

  • Microsoft SQL Server (MSSQL) - Enterprise relational database

  • Oracle Database - Enterprise relational database

All necessary database drivers are included with LogScale Collector; no additional driver installation is required.

How It Works

The Database Source operates by polling the database at regular intervals to check for new rows in the specified table. When new rows are detected, the collector automatically ingests them as individual events into LogScale.

Change Detection and Checkpointing

The collector uses an offset-based approach to track which rows have been ingested. After successfully ingesting a batch of rows, the collector checkpoints the latest offset, creating a resumable state. This mechanism ensures that:

  • Data collection can resume from the last successful checkpoint after restarts

  • Events are not duplicated during normal operation

  • System failures or interruptions don't result in data loss or repeated ingestion

The offset is typically based on an auto-incrementing primary key column or another monotonically increasing field in the database table that allows the collector to identify new rows.

Important: While uniqueness of the offset column is not strictly required, it is strongly recommended. Rows that share the same offset column value may be skipped during collection.

Prerequisites

Before configuring the Database Source, ensure you have:

  • LogScale Collector version 1.11 or later installed

  • Network connectivity between the collector and the database server

  • Database credentials with appropriate read permissions

  • A database table with an appropriate offset column (auto-incrementing primary key strongly encouraged)

  • A "message" column in the table to serve as the event's raw string content

Note: The message column doesn't need to exist as a physical column in your table. You can use custom SQL queries to format a message string from multiple columns, or create a database view that constructs the message field from your source data.
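
For example, here is a minimal sketch of a view that synthesizes a message column from existing fields (PostgreSQL syntax; the radacct table comes from the examples below, and the view name is illustrative):

sql
-- Illustrative view: builds a "message" string from existing columns so the
-- collector's messageColumn can point at a column that does not physically
-- exist in the base table.
CREATE VIEW radacct_events AS
SELECT
  radacctid,
  username,
  nasipaddress,
  acctstatustype,
  'User: ' || username || ' Status: ' || acctstatustype AS message
FROM radacct;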

Configuration

Comprehensive examples of configuring the Database Source for all supported database flavors are provided below.

Defining a Sink

First, define a sink that will receive the collected events:

yaml
sinks:
  logscale:
    type: humio
    token: "${HUMIO_INGEST_TOKEN}"
    url: "https://cloud.humio.com"

PostgreSQL Examples

Example 1: Simple Table Query (Builder Mode)

yaml
sources:
  radius_postgres:
    type: postgres
    hostname: postgres.example.com
    port: 5432
    database: radius
    username: logscale_reader
    password: "${POSTGRES_PASSWORD}"
    sink: logscale
    
    query:
      type: builder
      table: radacct
      keyColumn: radacctid
      messageColumn: message
      additionalColumns:
        username: username
        nas_ip: nasipaddress
        session_id: acctsessionid
        status: acctstatustype
        start_time: acctstarttime

Example 2: Custom SQL Query

yaml
sources:
  radius_postgres_custom:
    type: postgres
    hostname: postgres.example.com
    port: 5432
    database: radius
    username: logscale_reader
    password: "${POSTGRES_PASSWORD}"
    sink: logscale
    
    query:
      type: custom
      selectQuery: |
        SELECT 
          radacctid,
          username,
          nasipaddress,
          acctstatustype,
          message,
          acctstarttime
        FROM radacct 
        WHERE radacctid >= {{CHECKPOINT}}
        ORDER BY radacctid
      keyColumn: radacctid
      messageColumn: message
      additionalColumns:
        user: username
        nas: nasipaddress
        status: acctstatustype
    
    parser: radius-parser

Example 3: With TLS Authentication

yaml
sources:
  secure_postgres:
    type: postgres
    hostname: secure-db.example.com
    port: 5432
    database: application_logs
    username: collector
    password: "${DB_PASSWORD}"
    
    # TLS Configuration
    caFile:
      - /etc/ssl/certs/ca-bundle.crt
    clientCertFile: /etc/ssl/certs/client-cert.pem
    clientKeyFile: /etc/ssl/private/client-key.pem
    
    sink: logscale
    
    query:
      type: builder
      table: app_events
      keyColumn: event_id
      messageColumn: log_message
      additionalColumns:
        severity: severity_level
        component: app_component
        host: hostname

MySQL Examples

Example 1: Simple Application Logs

yaml
sources:
  app_logs_mysql:
    type: mysql
    hostname: mysql.example.com
    port: 3306
    database: application
    username: log_reader
    password: "${MYSQL_PASSWORD}"
    sink: logscale
    
    query:
      type: builder
      table: application_logs
      keyColumn: log_id
      messageColumn: log_text
      additionalColumns:
        level: log_level
        module: module_name
        timestamp: created_at

Example 2: Custom Query

yaml
sources:
  audit_mysql:
    type: mysql
    hostname: mysql.example.com
    port: 3306
    database: audit_db
    username: audit_reader
    password: "${MYSQL_PASSWORD}"
    sink: logscale
    
    query:
      type: custom
      selectQuery: |
        SELECT 
          audit_id,
          action,
          username,
          resource,
          message,
          timestamp,
          ip_address
        FROM audit_log
        WHERE audit_id >= {{CHECKPOINT}}
        ORDER BY audit_id
      keyColumn: audit_id
      messageColumn: message
      additionalColumns:
        action: action
        user: username
        resource: resource
        ip: ip_address
    
    parser: audit-parser

Example 3: With TLS Authentication

yaml
sources:
  secure_mysql:
    type: mysql
    hostname: mysql-prod.example.com
    port: 3306
    database: transactions
    username: readonly_user
    password: "${MYSQL_PASSWORD}"
    
    caFile:
      - /etc/ssl/certs/mysql-ca.pem
    clientCertFile: /etc/ssl/certs/mysql-client-cert.pem
    clientKeyFile: /etc/ssl/private/mysql-client-key.pem
    
    sink: logscale
    
    query:
      type: builder
      table: transaction_log
      keyColumn: transaction_id
      messageColumn: transaction_details
      additionalColumns:
        amount: amount
        currency: currency_code
        status: status
        merchant: merchant_id

Microsoft SQL Server (MSSQL) Examples

Example 1: Basic Alert Log Collection

yaml
sources:
  windows_events_mssql:
    type: mssql
    hostname: sqlserver.example.com
    port: 1433
    database: EventLogs
    username: log_collector
    password: "${MSSQL_PASSWORD}"
    sink: logscale
    
    query:
      type: builder
      table: WindowsEvents
      keyColumn: EventID
      messageColumn: EventMessage
      additionalColumns:
        computer: ComputerName
        event_type: EventType
        source: EventSource
        time_generated: TimeGenerated

Example 2: Custom Query

yaml
sources:
  security_mssql:
    type: mssql
    hostname: sqlserver.example.com
    port: 1433
    database: SecurityDB
    username: security_reader
    password: "${MSSQL_PASSWORD}"
    sink: logscale
    
    query:
      type: custom
      selectQuery: |
        SELECT 
          EventID,
          ComputerName,
          EventType,
          EventSource,
          EventMessage AS message,
          TimeGenerated,
          UserName,
          Domain
        FROM SecurityEvents
        WHERE EventID >= {{CHECKPOINT}}
        ORDER BY EventID
      keyColumn: EventID
      messageColumn: message
      additionalColumns:
        computer: ComputerName
        event_type: EventType
        user: UserName
        domain: Domain
    
    parser: windows-event-parser

Oracle Database Examples

Example 1: Basic Alert Log Collection

yaml
sources:
  oracle_alerts:
    type: oracle
    servers:
      - hostname: oracle-primary.example.com
        port: 1521
    serviceName: ORCL
    database: PROD
    username: log_reader
    password: "${ORACLE_PASSWORD}"
    sink: logscale
    
    query:
      type: builder
      table: ALERT_LOG
      keyColumn: ALERT_ID
      messageColumn: MESSAGE_TEXT
      additionalColumns:
        severity: SEVERITY_LEVEL
        component: COMPONENT_NAME
        timestamp: TIMESTAMP

Example 2: Custom Query in an Oracle Real Application Clusters (RAC) Setup

yaml
sources:
  oracle_rac_audit:
    type: oracle
    servers:
      - hostname: oracle-rac1.example.com
        port: 1521
      - hostname: oracle-rac2.example.com
        port: 1521
      - hostname: oracle-rac3.example.com
        port: 1521
    serviceName: PRODRAC
    database: AUDIT
    username: audit_reader
    password: "${ORACLE_PASSWORD}"
    sink: logscale
    
    query:
      type: custom
      selectQuery: |
        SELECT 
          AUDIT_ID,
          USERNAME,
          ACTION_NAME,
          OBJECT_NAME,
          MESSAGE_TEXT,
          TIMESTAMP,
          OS_USERNAME,
          USERHOST
        FROM DBA_AUDIT_TRAIL
        WHERE AUDIT_ID >= {{CHECKPOINT}}
        ORDER BY AUDIT_ID
      keyColumn: AUDIT_ID
      messageColumn: MESSAGE_TEXT
      additionalColumns:
        db_user: USERNAME
        action: ACTION_NAME
        object: OBJECT_NAME
        os_user: OS_USERNAME
        host: USERHOST
    
    parser: oracle-audit-parser

Example 3: With TLS Authentication

yaml
sources:
  oracle_secure:
    type: oracle
    servers:
      - hostname: oracle-secure.example.com
        port: 2484  # SSL/TLS port
    serviceName: SECUREDB
    database: APPDATA
    username: secure_reader
    password: "${ORACLE_PASSWORD}"
    
    caFile:
      - /etc/oracle/wallet/ca-cert.pem
    clientCertFile: /etc/oracle/wallet/client-cert.pem
    clientKeyFile: /etc/oracle/wallet/client-key.pem
    
    sink: logscale
    
    query:
      type: builder
      table: APPLICATION_EVENTS
      keyColumn: EVENT_ID
      messageColumn: EVENT_MESSAGE
      additionalColumns:
        event_type: EVENT_TYPE
        module: MODULE_NAME
        session_id: SESSION_ID
        timestamp: EVENT_TIMESTAMP

Complete Configuration Example

Here's a complete configuration file with multiple database sources:

yaml
sinks:
  logscale:
    type: humio
    token: "${HUMIO_INGEST_TOKEN}"
    url: "https://cloud.humio.com"

sources:
  # PostgreSQL RADIUS logs
  radius_logs:
    type: postgres
    hostname: postgres.example.com
    port: 5432
    database: radius
    username: reader
    password: "${POSTGRES_PASSWORD}"
    sink: logscale
    query:
      type: builder
      table: radacct
      keyColumn: radacctid
      messageColumn: message
      additionalColumns:
        username: username
        status: acctstatustype

  # MySQL application logs
  app_logs:
    type: mysql
    hostname: mysql.example.com
    port: 3306
    database: application
    username: reader
    password: "${MYSQL_PASSWORD}"
    sink: logscale
    query:
      type: builder
      table: app_logs
      keyColumn: log_id
      messageColumn: log_message
      additionalColumns:
        level: log_level

  # MSSQL security events
  security_events:
    type: mssql
    hostname: sqlserver.example.com
    port: 1433
    database: Security
    username: reader
    password: "${MSSQL_PASSWORD}"
    sink: logscale
    query:
      type: builder
      table: SecurityEvents
      keyColumn: EventID
      messageColumn: EventMessage
      additionalColumns:
        computer: ComputerName

  # Oracle audit logs
  audit_logs:
    type: oracle
    servers:
      - hostname: oracle.example.com
        port: 1521
    serviceName: ORCL
    database: AUDIT
    username: reader
    password: "${ORACLE_PASSWORD}"
    sink: logscale
    query:
      type: builder
      table: AUDIT_LOG
      keyColumn: AUDIT_ID
      messageColumn: MESSAGE
      additionalColumns:
        user: USERNAME

SQL Query Options

The Database Source provides two approaches for retrieving data from your database:

Auto-Generated SQL

Provide the table name and column names, and the collector will automatically generate the appropriate SQL query with the necessary WHERE clause for offset tracking:

  • Specify the table name

  • List the columns to retrieve

  • The collector constructs the query with WHERE offset_column >= {{CHECKPOINT}}
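
For the PostgreSQL builder example shown earlier, the generated query is conceptually equivalent to the following (the exact SQL the collector emits may differ):

sql
-- Conceptual equivalent of the builder-generated query for the radacct example.
SELECT radacctid, message, username, nasipaddress, acctsessionid,
       acctstatustype, acctstarttime
FROM radacct
WHERE radacctid >= {{CHECKPOINT}}
ORDER BY radacctid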

Custom SQL

Alternatively, provide your own custom SQL query with a placeholder for the offset filter:

  • Write a custom SELECT statement tailored to your needs

  • Include the {{CHECKPOINT}} placeholder in the WHERE clause

  • The collector will substitute the checkpoint value when executing the query

Example custom SQL for RADIUS accounting logs:

sql
SELECT 
  radacctid,
  username,
  nasipaddress,
  acctstatustype,
  CONCAT('User: ', username, ' Status: ', acctstatustype, ' Session: ', acctsessionid) AS message,
  acctstarttime,
  acctsessiontime,
  acctinputoctets,
  acctoutputoctets
FROM radacct 
WHERE radacctid >= {{CHECKPOINT}}
  AND acctstatustype IN ('Start', 'Stop')
ORDER BY radacctid

Event Structure

Each database row is transformed into a LogScale event. The collector extracts all column values as strings; any rendering of non-string types, such as dates or numerics, is performed by the database server.
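
Because that rendering happens in the database, you can control the string form of non-text columns with explicit casts or formatting functions in a custom query. A brief sketch, assuming PostgreSQL and the radacct example:

sql
-- Format the timestamp explicitly so the ingested string is predictable,
-- rather than relying on the server's default rendering.
SELECT
  radacctid,
  message,
  TO_CHAR(acctstarttime, 'YYYY-MM-DD HH24:MI:SS.MS') AS start_time
FROM radacct
WHERE radacctid >= {{CHECKPOINT}}
ORDER BY radacctid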

Required Fields

  • message column: A designated column in your table that contains the event message. This is ingested as @rawstring following LogScale conventions

  • offset column: The tracking column (also called the "key") is automatically added to each event as @collect.key

Field Mapping

Column names are mapped to event field names. This mapping is customizable during configuration to match your LogScale schema requirements.

Example Event Mapping

Given a RADIUS accounting database row:

radacctid  username          nasipaddress  acctstatustype  message             acctstarttime
98765      user@example.com  10.1.2.3      Stop            User session ended  2024-01-15 10:30:45

The resulting LogScale event would contain:

Field           Value
@rawstring      User session ended
@collect.key    98765
username        user@example.com
nasipaddress    10.1.2.3
acctstatustype  Stop
acctstarttime   2024-01-15 10:30:45

Additional columns can be included in the event based on your configuration.

Best Practices

Choosing an Offset Column

Select an offset column that is:

  • Auto-incrementing: An auto-incrementing primary key is strongly encouraged

  • Monotonically increasing: Values always increase for new rows

  • Indexed: Ensures efficient querying for new rows

  • Non-nullable: Every row must have a value

  • Immutable: Values should not change after insertion

  • Unique (recommended): Prevents rows from being skipped during collection

For timestamp-based offset columns, ensure the timestamp has sufficient resolution: at least millisecond precision, and preferably microsecond precision, so that rows are unlikely to share the same timestamp value.
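
To illustrate these properties, here is a hypothetical PostgreSQL table definition with a collection-friendly offset column (all names are illustrative):

sql
-- An identity primary key is monotonically increasing, unique, indexed,
-- non-nullable, and immutable. TIMESTAMP(6) keeps microsecond precision
-- if a timestamp must serve as the offset instead.
CREATE TABLE app_events (
  event_id    BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  created_at  TIMESTAMP(6) NOT NULL DEFAULT clock_timestamp(),
  log_message TEXT NOT NULL
);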

Performance Optimization

To ensure optimal performance when collecting from high-volume tables:

  1. Create appropriate indexes on the offset column to speed up queries for new rows (see the index sketch after this list)

  2. Use column filtering to only retrieve necessary fields, reducing data transfer

  3. Monitor collector resource usage and adjust configuration as needed

  4. Ensure the message column contains appropriately formatted event data
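
For item 1, if the offset column is not already the primary key (for example, a timestamp column, reusing names from the MySQL example), an index along these lines helps:

sql
-- Without an index, each poll scans the table; with one, the incremental
-- WHERE clause becomes a range seek.
CREATE INDEX idx_application_logs_created_at ON application_logs (created_at);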

Security Considerations

Follow these security best practices:

  • Use read-only database accounts with minimal necessary privileges (see the example grant after this list)

  • Store credentials in environment variables instead of configuration files

  • Note that TLS encryption is enforced by the Database Source for all connections

  • Regularly rotate database credentials and update collector configuration accordingly
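
As an example of the first point, a minimal read-only account on PostgreSQL might look like this (role, database, and table names are illustrative):

sql
-- A login role that can only read the monitored table.
CREATE ROLE logscale_reader LOGIN PASSWORD 'change-me';
GRANT CONNECT ON DATABASE radius TO logscale_reader;
GRANT USAGE ON SCHEMA public TO logscale_reader;
GRANT SELECT ON radacct TO logscale_reader;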

Starting Offset Configuration

When configuring a new Database Source or reinitializing collection, you can specify a starting offset value. This allows you to:

  • Begin collection from a specific point in the table rather than from the beginning

  • Skip historical data that has already been ingested through other means

  • Resume collection after resetting the checkpoint state

If no starting offset is configured and no checkpoint exists, the collector will begin from the earliest available row based on the offset column.
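
For example, to begin collection from the present and skip existing history, you could look up the current maximum offset and seed the starting offset with it (whether that row itself is re-read depends on the inclusive >= comparison):

sql
-- The highest key currently in the table; seeding the starting offset with
-- this value skips the rows that already exist.
SELECT MAX(radacctid) FROM radacct;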

Monitoring and Troubleshooting

Monitoring Collector Status

The Database Source uses polling to check for new data at regular intervals. Monitor your Database Source using the following approaches:

  • Check collector logs for connection status and ingestion metrics

  • Monitor checkpoint advancement to ensure data is being collected

  • Track lag between database inserts and LogScale ingestion

  • Observe the polling frequency and adjust if necessary

  • Set up alerts for connection failures or ingestion stalls

Common Issues and Solutions

Connection Failures

Symptom: The collector cannot connect to the database. Potential causes and solutions:

  • Verify network connectivity using tools such as nc, psql, mysql, sqlcmd, or sqlplus

  • Confirm database credentials are correct

  • Check firewall rules allow connections from the collector host

  • Ensure the database service is running and accepting connections

  • Verify TLS/SSL certificates are valid if using certificate-based authentication

No Events Being Collected

Symptom: The collector connects, but no events appear in LogScale. Potential causes and solutions:

  • Verify new rows are being inserted into the monitored table

  • Check the offset column is being populated correctly for new rows

  • Ensure the "message" column contains valid data

  • Review any WHERE clause filters in custom SQL that might exclude rows

  • Examine collector logs for errors during query execution

  • Confirm LogScale sink configuration is correct

  • Verify the starting offset is set appropriately

Missing Events

Symptom: Some events are not collected from the database. Potential causes and solutions:

  • Check for duplicate offset column values - rows with the same offset may be skipped

  • Verify the offset column has sufficient resolution (millisecond or microsecond precision for timestamps)

  • Ensure offset column values are truly monotonically increasing

  • Review custom SQL for logic errors that might skip rows

  • Check for gaps in auto-incrementing sequences

Duplicate Events

Symptom: The same events appear multiple times in LogScale. Potential causes:

  • Offset column values are not monotonically increasing

  • Rows are being updated with lower offset values

  • Multiple collectors are monitoring the same table without coordination

  • Checkpoint storage issues preventing offset persistence

Performance Degradation

Symptom: The collector consumes excessive resources or falls behind. Potential causes and solutions:

  • Add or optimize database indexes on the offset column

  • Limit columns retrieved using selective column configuration

  • Consider database query performance and optimization

  • Review custom SQL for inefficient operations

  • Monitor database server load during polling intervals