Skip to content

Shipping Logs with Vector

Vector is a lightweight, high-performance observability pipeline that can collect, transform, and route logs to ClickHouse. Since Logchef reads directly from ClickHouse, Vector is the recommended way to ingest logs.

If you’re using the Docker quick start, Vector is already included and configured — demo syslog data flows into ClickHouse automatically. Skip to Adapting for Real Sources when you’re ready to ship your own logs.

For a standalone setup, you need three things:

  1. A ClickHouse table with the right schema
  2. A Vector config that transforms and ships logs
  3. Logchef pointed at the table

Prerequisites:

  • ClickHouse server running and accessible
  • Vector installed (v0.30+)
  • A Logchef instance (for querying)

Logchef works with any ClickHouse table that has a timestamp column, but the built-in OpenTelemetry schema gives you the best experience — severity filtering, service grouping, and flexible log_attributes all work out of the box.

Create the table:

-- OpenTelemetry-style log table that Logchef queries directly.
CREATE TABLE IF NOT EXISTS default.logs
(
-- Millisecond-precision event time; DoubleDelta compresses well for near-monotonic timestamps.
timestamp DateTime64(3) CODEC(DoubleDelta, LZ4),
-- Low-cardinality dictionary encoding for the handful of severity labels.
severity_text LowCardinality(String) CODEC(ZSTD(1)),
severity_number Int32 CODEC(ZSTD(1)),
service_name LowCardinality(String) CODEC(ZSTD(1)),
namespace LowCardinality(String) CODEC(ZSTD(1)),
-- Raw log line / message body.
body String CODEC(ZSTD(1)),
-- Flexible per-source metadata; queried as log_attributes.<key> in Logchef.
log_attributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
-- Skip-index for severity filters (small fixed value set).
INDEX idx_severity_text severity_text TYPE set(100) GRANULARITY 4,
-- Bloom filters let ClickHouse skip granules when filtering on map keys/values.
INDEX idx_log_attributes_keys mapKeys(log_attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attributes_values mapValues(log_attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
-- Token bloom filter speeds up substring/word searches over body.
INDEX idx_body body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
)
ENGINE = MergeTree()
-- Daily partitions so TTL expiry can drop whole parts cheaply.
PARTITION BY toDate(timestamp)
-- Sort key matches the most common filter path: namespace -> service -> time.
ORDER BY (namespace, service_name, timestamp)
-- Retain 30 days of logs.
TTL toDateTime(timestamp) + INTERVAL 30 DAY
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

See Schema Design for details on field types, codecs, and indexing choices.

A minimal Vector config that collects syslog and ships to ClickHouse:

[api]
enabled = true

# Source: collect syslog over UDP on the standard syslog port.
[sources.syslog_input]
type = "syslog"
address = "0.0.0.0:514"
mode = "udp"

# Transform: map raw syslog fields onto the OTEL-style table schema.
[transforms.remap_logs]
inputs = ["syslog_input"]
type = "remap"
source = '''
structured = parse_syslog!(.message)
# The table column is DateTime64(3), so emit exactly three fractional
# digits: chrono's "%.3f" renders "." plus milliseconds. The previous
# ".%f" form emitted nine digits (nanoseconds), which ClickHouse's
# default parsing for DateTime64(3) does not accept.
.timestamp = format_timestamp!(structured.timestamp, format: "%Y-%m-%d %H:%M:%S%.3f")
.body = structured.message
.service_name = structured.appname
.namespace = "syslog"
# Map syslog severity keywords to standardized values
.severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
"ERROR"
} else if structured.severity == "warning" {
"WARN"
} else if structured.severity == "debug" {
"DEBUG"
} else if includes(["info", "notice"], structured.severity) {
"INFO"
} else {
structured.severity
}
# OTEL severity numbers
# https://opentelemetry.io/docs/specs/otel/logs/data-model/#severity-fields
.severity_number = if .severity_text == "ERROR" {
17
} else if .severity_text == "WARN" {
13
} else if .severity_text == "DEBUG" {
5
} else {
9
}
# Store source-specific fields in log_attributes. The column is
# Map(String, String), so coerce values that may be non-string
# (procid / version) explicitly.
.log_attributes = {
"syslog.procid": to_string(structured.procid) ?? "",
"syslog.facility": structured.facility,
"syslog.version": to_string(structured.version) ?? "",
"syslog.hostname": structured.hostname
}
del(.message)
del(.source_type)
'''

# Sink: ship to ClickHouse over its HTTP interface.
[sinks.clickhouse]
type = "clickhouse"
inputs = ["remap_logs"]
endpoint = "http://localhost:8123"
database = "default"
table = "logs"
compression = "gzip"
healthcheck.enabled = false
# Ignore event fields that have no matching table column instead of erroring.
skip_unknown_fields = true

Run Vector:

vector --config vector.toml

Replace the [sources] and [transforms] sections for your log source. The sink stays the same.

# Source: tail log files on disk.
[sources.file_logs]
type = "file"
include = ["/var/log/**/*.log"]
read_from = "beginning"
# Transform: map file events onto the OTEL-style table schema.
[transforms.remap_file_logs]
inputs = ["file_logs"]
type = "remap"
source = '''
# No timestamp is parsed from the line itself; use ingestion time.
.timestamp = now()
.service_name = "file_service"
.namespace = "files"
.body = .message
# Extract severity if present in the log line; fall back to an empty
# object when the regex does not match so .level resolves to null below.
severity_match = parse_regex(.message, r'^(?P<time>\S+)?\s+(?P<level>INFO|DEBUG|WARN|ERROR)') ?? {}
.severity_text = severity_match.level ?? "INFO"
# OTEL severity numbers:
# https://opentelemetry.io/docs/specs/otel/logs/data-model/#severity-fields
.severity_number = if .severity_text == "ERROR" {
17
} else if .severity_text == "WARN" {
13
} else if .severity_text == "DEBUG" {
5
} else {
9
}
# Source-specific metadata goes into the flexible log_attributes map.
.log_attributes = {
"file.path": .file,
"host.name": get_hostname()
}
# Drop raw Vector fields that have no table column.
del(.message)
del(.source_type)
'''
# Source: collect logs from all running Docker containers.
[sources.docker_logs]
type = "docker_logs"
include_containers = ["*"]
# Transform: map container events onto the OTEL-style table schema.
[transforms.remap_docker_logs]
inputs = ["docker_logs"]
type = "remap"
source = '''
# Use ingestion time; container lines are not parsed for a timestamp here.
.timestamp = now()
.service_name = .container_name
.namespace = "containers"
.body = .message
# Heuristic severity from keywords in the message (case-insensitive);
# defaults to INFO when nothing matches.
.severity_text = if match(.message, r"(?i)error|exception|fail|critical") {
"ERROR"
} else if match(.message, r"(?i)warn|warning") {
"WARN"
} else if match(.message, r"(?i)debug") {
"DEBUG"
} else {
"INFO"
}
# OTEL severity numbers:
# https://opentelemetry.io/docs/specs/otel/logs/data-model/#severity-fields
.severity_number = if .severity_text == "ERROR" {
17
} else if .severity_text == "WARN" {
13
} else if .severity_text == "DEBUG" {
5
} else {
9
}
# Container identity goes into the flexible log_attributes map.
.log_attributes = {
"container.name": .container_name,
"container.image": .container_image,
"container.id": .container_id
}
# Drop raw Vector fields that have no table column.
del(.message)
del(.source_type)
'''
# Source: read from the systemd journal for selected units.
[sources.journald_logs]
type = "journald"
include_units = ["nginx", "sshd", "my-app"]
# Transform: map journal fields onto the OTEL-style table schema.
[transforms.remap_journald_logs]
inputs = ["journald_logs"]
type = "remap"
source = '''
# Use ingestion time rather than the journal's own timestamp fields.
.timestamp = now()
.service_name = ._SYSTEMD_UNIT ?? "unknown"
.namespace = "systemd"
.body = .message
# journald uses numeric priority (0-7): 0-3 = emerg..err, 4 = warning,
# 5 = notice, 6 = info (also the fallback), 7 = debug.
priority = to_int(._PRIORITY) ?? 6
.severity_text = if priority <= 3 {
"ERROR"
} else if priority == 4 {
"WARN"
} else if priority == 7 {
"DEBUG"
} else {
"INFO"
}
# OTEL severity numbers:
# https://opentelemetry.io/docs/specs/otel/logs/data-model/#severity-fields
.severity_number = if .severity_text == "ERROR" {
17
} else if .severity_text == "WARN" {
13
} else if .severity_text == "DEBUG" {
5
} else {
9
}
# Journal metadata goes into log_attributes; values are coerced to
# strings because the column is Map(String, String).
.log_attributes = {
"systemd.unit": ._SYSTEMD_UNIT ?? "",
"host.name": ._HOSTNAME ?? "",
"process.pid": to_string(._PID) ?? "",
"process.uid": to_string(._UID) ?? ""
}
# Drop raw Vector fields that have no table column.
del(.message)
del(.source_type)
'''

Add a second transform stage to attach environment metadata:

# Second transform stage: attach environment metadata to every event.
[transforms.enrich_logs]
inputs = ["remap_logs"] # chain after your remap transform
type = "remap"
source = '''
# Use the infallible get_env_var(...) with a fallback value. The
# abort-on-error form get_env_var!(...) fails the VRL program whenever
# a variable is unset, which drops the event instead of enriching it.
.log_attributes.environment = get_env_var("ENVIRONMENT") ?? "unknown"
.log_attributes.version = get_env_var("APP_VERSION") ?? "unknown"
.log_attributes.region = get_env_var("REGION") ?? "unknown"
'''

Then update the sink to read from enrich_logs instead:

[sinks.clickhouse]
inputs = ["enrich_logs"]
# ...all other sink options (type, endpoint, database, table, etc.) stay the same

Once logs are flowing into ClickHouse:

  1. Log in to Logchef
  2. Go to Sources > Add Source
  3. Enter the ClickHouse connection details (host, port, database, table)
  4. Create or select a team and assign the source

Start querying:

namespace="syslog" and severity_text="ERROR"
service_name="my-app" and body~"connection refused"
log_attributes.container.name="api-server"

See Search Syntax and Query Examples for more.