Shipping Logs with Vector
Vector is a lightweight, high-performance observability pipeline that can collect, transform, and route logs to ClickHouse. Since Logchef reads directly from ClickHouse, Vector is the recommended way to ingest logs.
Quick Start
If you’re using the Docker quick start, Vector is already included and configured; demo syslog data flows into ClickHouse automatically. Skip to Adapting for Real Sources when you’re ready to ship your own logs.
For a standalone setup, you need three things:
- A ClickHouse table with the right schema
- A Vector config that transforms and ships logs
- Logchef pointed at the table
Prerequisites
- ClickHouse server running and accessible
- Vector installed (v0.30+)
- A Logchef instance (for querying)
Create the ClickHouse Table
Logchef works with any ClickHouse table that has a timestamp column, but the built-in OpenTelemetry schema gives you the best experience: severity filtering, service grouping, and flexible log_attributes all work out of the box.
Create the table:
```sql
CREATE TABLE IF NOT EXISTS default.logs
(
    timestamp DateTime64(3) CODEC(DoubleDelta, LZ4),
    severity_text LowCardinality(String) CODEC(ZSTD(1)),
    severity_number Int32 CODEC(ZSTD(1)),
    service_name LowCardinality(String) CODEC(ZSTD(1)),
    namespace LowCardinality(String) CODEC(ZSTD(1)),
    body String CODEC(ZSTD(1)),
    log_attributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    INDEX idx_severity_text severity_text TYPE set(100) GRANULARITY 4,
    INDEX idx_log_attributes_keys mapKeys(log_attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_log_attributes_values mapValues(log_attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_body body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toDate(timestamp)
ORDER BY (namespace, service_name, timestamp)
TTL toDateTime(timestamp) + INTERVAL 30 DAY
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
```

See Schema Design for details on field types, codecs, and indexing choices.
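Before wiring up Vector, it’s worth confirming the table looks right; for example, from clickhouse-client:

```sql
-- Verify the schema matches what you created
DESCRIBE TABLE default.logs;

-- Later, once Vector is shipping, confirm rows are arriving
SELECT count() FROM default.logs;
```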
Configure Vector
A minimal Vector config that collects syslog and ships to ClickHouse:
```toml
[api]
enabled = true

# Source: collect syslog
[sources.syslog_input]
type = "syslog"
address = "0.0.0.0:514"
mode = "udp"

# Transform: map to OTEL schema
[transforms.remap_logs]
inputs = ["syslog_input"]
type = "remap"
source = '''
  structured = parse_syslog!(.message)

  .timestamp = format_timestamp!(structured.timestamp, format: "%Y-%m-%d %H:%M:%S.%f")
  .body = structured.message
  .service_name = structured.appname
  .namespace = "syslog"

  # Map severity to standardized values
  .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
    "ERROR"
  } else if structured.severity == "warning" {
    "WARN"
  } else if structured.severity == "debug" {
    "DEBUG"
  } else if includes(["info", "notice"], structured.severity) {
    "INFO"
  } else {
    structured.severity
  }

  # OTEL severity numbers
  # https://opentelemetry.io/docs/specs/otel/logs/data-model/#severity-fields
  .severity_number = if .severity_text == "ERROR" {
    17
  } else if .severity_text == "WARN" {
    13
  } else if .severity_text == "DEBUG" {
    5
  } else {
    9
  }

  # Store source-specific fields in log_attributes
  .log_attributes = {
    "syslog.procid": structured.procid,
    "syslog.facility": structured.facility,
    "syslog.version": structured.version,
    "syslog.hostname": structured.hostname
  }

  del(.message)
  del(.source_type)
'''

# Sink: ship to ClickHouse
[sinks.clickhouse]
type = "clickhouse"
inputs = ["remap_logs"]
endpoint = "http://localhost:8123"
database = "default"
table = "logs"
compression = "gzip"
healthcheck.enabled = false
skip_unknown_fields = true
```

Run Vector:

```bash
vector --config vector.toml
```
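The [api] block at the top of the config enables Vector’s local API, which gives you two quick ways to check the pipeline is healthy (the API listens on 127.0.0.1:8686 by default):

```bash
# Liveness check against Vector's API
curl http://127.0.0.1:8686/health

# Live per-component view of events flowing through the pipeline
vector top
```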
Adapting for Real Sources
Replace the [sources] and [transforms] sections for your log source. The sink stays the same.
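If you run several of the sources below side by side, you don’t need one sink per source; a single ClickHouse sink can fan them all in, for example:

```toml
[sinks.clickhouse]
type = "clickhouse"
inputs = ["remap_logs", "remap_file_logs", "remap_docker_logs"]
# ... connection settings as above
```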
File-based Logs
```toml
[sources.file_logs]
type = "file"
include = ["/var/log/**/*.log"]
read_from = "beginning"

[transforms.remap_file_logs]
inputs = ["file_logs"]
type = "remap"
source = '''
  .timestamp = now()
  .service_name = "file_service"
  .namespace = "files"
  .body = .message

  # Extract severity if present in the log line
  severity_match = parse_regex(.message, r'^(?P<time>\S+)?\s+(?P<level>INFO|DEBUG|WARN|ERROR)') ?? {}
  .severity_text = severity_match.level || "INFO"
  .severity_number = if .severity_text == "ERROR" {
    17
  } else if .severity_text == "WARN" {
    13
  } else if .severity_text == "DEBUG" {
    5
  } else {
    9
  }

  .log_attributes = {
    "file.path": .file,
    "host.name": get_hostname!()
  }

  del(.message)
  del(.source_type)
'''
```
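One limitation of the config above: stack traces and other multi-line events arrive as one row per line. The file source can stitch continuations together with its multiline options; a sketch assuming continuation lines are indented (typical for stack traces), so adjust the patterns to your format:

```toml
[sources.file_logs]
type = "file"
include = ["/var/log/**/*.log"]
read_from = "beginning"
# A new event starts at a non-whitespace character;
# indented lines continue the previous event.
multiline.start_pattern = '^[^\s]'
multiline.mode = "continue_through"
multiline.condition_pattern = '^[\s]+'
multiline.timeout_ms = 1000
```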
Docker Containers

```toml
[sources.docker_logs]
type = "docker_logs"
include_containers = ["*"]

[transforms.remap_docker_logs]
inputs = ["docker_logs"]
type = "remap"
source = '''
  .timestamp = now()
  .service_name = .container_name
  .namespace = "containers"
  .body = .message

  # Infer severity from keywords in the message
  msg = string!(.message)
  .severity_text = if match(msg, r'(?i)error|exception|fail|critical') {
    "ERROR"
  } else if match(msg, r'(?i)warn|warning') {
    "WARN"
  } else if match(msg, r'(?i)debug') {
    "DEBUG"
  } else {
    "INFO"
  }

  .severity_number = if .severity_text == "ERROR" {
    17
  } else if .severity_text == "WARN" {
    13
  } else if .severity_text == "DEBUG" {
    5
  } else {
    9
  }

  .log_attributes = {
    "container.name": .container_name,
    "container.image": .container_image,
    "container.id": .container_id
  }

  del(.message)
  del(.source_type)
'''
```
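Container output is often dominated by health checks and similar noise. One option is a filter transform between the source and the remap; a sketch, where the matched path is just an assumption about your traffic:

```toml
[transforms.drop_healthchecks]
type = "filter"
inputs = ["docker_logs"]
# Drop events whose message contains a health-check request (assumed path)
condition = '!contains(string!(.message), "GET /healthz")'
```

Then set inputs = ["drop_healthchecks"] on remap_docker_logs instead of ["docker_logs"].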
Journald (systemd)

```toml
[sources.journald_logs]
type = "journald"
include_units = ["nginx", "sshd", "my-app"]

[transforms.remap_journald_logs]
inputs = ["journald_logs"]
type = "remap"
source = '''
  .timestamp = now()
  .service_name = ._SYSTEMD_UNIT || "unknown"
  .namespace = "systemd"
  .body = .message

  # journald uses numeric priority (0-7)
  priority = to_int(._PRIORITY) ?? 6
  .severity_text = if priority <= 3 {
    "ERROR"
  } else if priority == 4 {
    "WARN"
  } else if priority == 7 {
    "DEBUG"
  } else {
    "INFO"
  }

  .severity_number = if .severity_text == "ERROR" {
    17
  } else if .severity_text == "WARN" {
    13
  } else if .severity_text == "DEBUG" {
    5
  } else {
    9
  }

  .log_attributes = {
    "systemd.unit": ._SYSTEMD_UNIT || "",
    "host.name": ._HOSTNAME || "",
    "process.pid": to_string(._PID) ?? "",
    "process.uid": to_string(._UID) ?? ""
  }

  del(.message)
  del(.source_type)
'''
```
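If nothing arrives, first confirm the units you listed are actually writing to the journal; the JSON output also shows the _SYSTEMD_UNIT and _PRIORITY fields the transform reads:

```bash
journalctl -u nginx -n 5 --output=json
```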
Enriching Logs
Add a second transform stage to attach environment metadata:
```toml
[transforms.enrich_logs]
inputs = ["remap_logs"]  # chain after your remap transform
type = "remap"
source = '''
  .log_attributes.environment = get_env_var!("ENVIRONMENT")
  .log_attributes.version = get_env_var!("APP_VERSION")
  .log_attributes.region = get_env_var!("REGION")
'''
```
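Note that get_env_var! errors when a variable is unset. If any of these variables might be missing, coalesce a default instead, for example:

```toml
source = '''
  # Fall back to a default instead of erroring on unset variables
  .log_attributes.environment = get_env_var("ENVIRONMENT") ?? "unknown"
'''
```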
Then update the sink to read from enrich_logs instead:

```toml
[sinks.clickhouse]
inputs = ["enrich_logs"]
# ... rest stays the same
```

Connect Logchef
Once logs are flowing into ClickHouse:
- Log in to Logchef
- Go to Sources > Add Source
- Enter the ClickHouse connection details (host, port, database, table)
- Create or select a team and assign the source
Start querying:
namespace="syslog" and severity_text="ERROR"service_name="my-app" and body~"connection refused"log_attributes.container.name="api-server"See Search Syntax and Query Examples for more.