API Reference

questdb.ingress

API for fast data ingestion into QuestDB.

class questdb.ingress.Buffer

Bases: object

Construct QuestDB-flavored InfluxDB Line Protocol (ILP) messages.

The Buffer.row() method is used to add a row to the buffer.

You can call this many times.

from questdb.ingress import Buffer

buf = Buffer()
buf.row(
    'table_name1',
    symbols={'s1': 'v1', 's2': 'v2'},
    columns={'c1': True, 'c2': 0.5})

buf.row(
    'table_name2',
    symbols={'questdb': '❤️'},
    columns={'like': 100000})

# Append any additional rows, then, once ready, flush them
# through a connected `Sender` instance (here named `sender`).
sender.flush(buf)

# The sender auto-cleared the buffer, ready for reuse.

buf.row(
    'table_name1',
    symbols={'s1': 'v1', 's2': 'v2'},
    columns={'c1': True, 'c2': 0.5})

# etc.
Buffer Constructor Arguments:
  • init_capacity (int): Initial capacity of the buffer in bytes. Defaults to 65536 (64KiB).

  • max_name_len (int): Maximum length of a table or column name. Defaults to 127, which is the same default value as QuestDB. This should match the cairo.max.file.name.length setting of the QuestDB instance you’re connecting to.

# These two buffer constructions are equivalent.
buf1 = Buffer()
buf2 = Buffer(init_capacity=65536, max_name_len=127)

To avoid having to manually set these arguments every time, you can call the sender’s new_buffer() method instead.

from questdb.ingress import Sender, Buffer

sender = Sender(host='localhost', port=9009,
    init_capacity=16384, max_name_len=64)
buf = sender.new_buffer()
assert buf.init_capacity == 16384
assert buf.max_name_len == 64
__str__()

Return the constructed buffer as a string. Use for debugging.

capacity() int

The current buffer capacity.

clear()

Reset the buffer.

Note that flushing a buffer will (unless otherwise specified) also automatically clear it.

This method is designed to be called only in conjunction with sender.flush(buffer, clear=False).

init_capacity

The initial capacity of the buffer when first created.

The buffer's capacity may grow over time; see capacity().

max_name_len

Maximum length of a table or column name.

reserve(additional: int)

Ensure the buffer has at least additional bytes of future capacity.

Parameters

additional (int) – Additional bytes to reserve.

row(table_name: unicode, *, symbols: Optional[Dict[str, str]], columns: Optional[Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]]], at: Union[None, TimestampNanos, datetime])

Add a single row (line) to the buffer.

At least one of symbols or columns must be specified.

# All fields specified.
buffer.row(
    'table_name',
    symbols={'sym1': 'abc', 'sym2': 'def'},
    columns={
        'col1': True,
        'col2': 123,
        'col3': 3.14,
        'col4': 'xyz',
        'col5': TimestampMicros(123456789),
        'col6': datetime(2019, 1, 1, 12, 0, 0)},
    at=TimestampNanos(123456789))

# Only symbols specified. Designated timestamp assigned by the db.
buffer.row(
    'table_name',
    symbols={'sym1': 'abc', 'sym2': 'def'})

# Float columns and timestamp specified as `datetime.datetime`.
# Pay special attention to the timezone, which if unspecified is
# assumed to be the local timezone (and not UTC). A timezone-aware
# value is used here to avoid that ambiguity (requires `import datetime`).
buffer.row(
    'sensor data',
    columns={
        'temperature': 24.5,
        'humidity': 0.5},
    at=datetime.datetime.now(datetime.timezone.utc))

Python strings passed as values to symbols are encoded as the SYMBOL type in QuestDB, whilst Python strings passed as values to columns are encoded as the STRING type.

Refer to the QuestDB documentation to understand the difference between the SYMBOL and STRING types (TL;DR: symbols are interned strings).

Parameters
  • table_name – The name of the table to which the row belongs.

  • symbols – A dictionary of symbol column names to str values.

  • columns – A dictionary of column names to bool, int, float, str, TimestampMicros or datetime values.

  • at – The timestamp of the row. If None, timestamp is assigned by the server. If datetime, the timestamp is converted to nanoseconds. A nanosecond unix epoch timestamp can be passed explicitly as a TimestampNanos object.
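To make the SYMBOL/STRING distinction more concrete, here is a simplified, pure-Python sketch of how a row might be rendered as an ILP line. It is not the library's implementation: the helper `ilp_line` is hypothetical, it skips the escaping and name validation that a real buffer would need, and it leaves out timestamp-typed columns.

```python
def ilp_line(table, symbols=None, columns=None, at_nanos=None):
    """Render one row as a simplified ILP line (no escaping, no validation)."""
    symbols = symbols or {}
    columns = columns or {}
    if not symbols and not columns:
        raise ValueError('at least one of symbols or columns is required')

    def field(value):
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            return 't' if value else 'f'
        if isinstance(value, int):
            return f'{value}i'      # integers carry an `i` suffix
        if isinstance(value, float):
            return repr(value)
        if isinstance(value, str):
            return f'"{value}"'     # STRING values are double-quoted
        raise TypeError(f'unsupported column type: {type(value).__name__}')

    line = table
    # Symbols follow the table name, comma-separated and unquoted.
    line += ''.join(f',{k}={v}' for k, v in symbols.items())
    if columns:
        line += ' ' + ','.join(f'{k}={field(v)}' for k, v in columns.items())
    if at_nanos is not None:
        line += f' {at_nanos}'      # designated timestamp, in nanoseconds
    return line + '\n'


print(ilp_line('table_name', {'sym1': 'abc'}, {'col4': 'xyz'}), end='')
```

Note how the same Python `str` ends up unquoted when passed as a symbol and double-quoted when passed as a column.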

exception questdb.ingress.IngressError(code, msg)

Bases: Exception

An error whilst using the Sender or constructing its Buffer.

__init__(code, msg)
property code: IngressErrorCode

Return the error code.

class questdb.ingress.IngressErrorCode(value)

Bases: Enum

Category of Error.

AuthError = 6
CouldNotResolveAddr = 0
InvalidApiCall = 1
InvalidName = 4
InvalidTimestamp = 5
InvalidUtf8 = 3
SocketError = 2
TlsError = 7
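As an illustration of how the `code` property might be used, the sketch below groups error codes into "connection-level" and "everything else". The grouping itself is an assumption made for this example, not a classification the library defines, and `FakeCode` / `FakeIngressError` are hypothetical stand-ins so the snippet runs without the library installed; real code would catch `questdb.ingress.IngressError` instead.

```python
from enum import Enum


# Hypothetical stand-ins for IngressErrorCode and IngressError.
class FakeCode(Enum):
    CouldNotResolveAddr = 0
    InvalidName = 4


class FakeIngressError(Exception):
    def __init__(self, code, msg):
        super().__init__(msg)
        self.code = code


# Code names treated as connection-level problems (an assumption of
# this sketch; adjust the set to suit your own retry policy).
CONNECTION_CODES = frozenset(
    {'CouldNotResolveAddr', 'SocketError', 'AuthError', 'TlsError'})


def is_connection_error(err):
    """Return True if the error's code belongs to the connection group."""
    return err.code.name in CONNECTION_CODES


try:
    raise FakeIngressError(FakeCode.InvalidName, 'bad column name')
except FakeIngressError as e:
    print(e.code.name, is_connection_error(e))  # prints "InvalidName False"
```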
class questdb.ingress.Sender

Bases: object

A sender is a client that inserts rows into QuestDB via the ILP protocol.

Inserting two rows

In this example, data will be flushed and sent at the end of the with block.

with Sender('localhost', 9009) as sender:
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5, 'humidity': 0.49})
    sender.row(
        'weather_sensor',
        symbols={'id': 'dubai2'},
        columns={'temperature': 41.2, 'humidity': 0.34})

The Sender object holds an internal buffer. The call to .row() simply forwards all arguments to the Buffer.row() method.

Explicit flushing

An explicit call to Sender.flush() will send any pending data immediately.

with Sender('localhost', 9009) as sender:
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5, 'humidity': 0.49})
    sender.flush()
    sender.row(
        'weather_sensor',
        symbols={'id': 'dubai2'},
        columns={'temperature': 41.2, 'humidity': 0.34})
    sender.flush()

Auto-flushing (on by default)

To avoid accumulating very large buffers, the sender will flush the buffer automatically once its buffer reaches a certain byte-size watermark.

You can control this behavior by setting the auto_flush argument.

# Never flushes automatically.
sender = Sender('localhost', 9009, auto_flush=False)
sender = Sender('localhost', 9009, auto_flush=None) # Ditto.
sender = Sender('localhost', 9009, auto_flush=0)  # Ditto.

# Flushes automatically when the buffer reaches 1KiB.
sender = Sender('localhost', 9009, auto_flush=1024)

# Flushes automatically after every row.
sender = Sender('localhost', 9009, auto_flush=True)
sender = Sender('localhost', 9009, auto_flush=1)  # Ditto.
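The accepted `auto_flush` values can be summarised with a small pure-Python model. This is only a sketch of the semantics described above, not the library's code: `flush_watermark` and `should_flush` are hypothetical helpers, and rejecting negative values is an assumption of the sketch.

```python
def flush_watermark(auto_flush):
    """Map an auto_flush argument to a byte watermark, or None for 'never'."""
    if auto_flush is None or auto_flush is False or auto_flush == 0:
        return None
    if auto_flush is True:
        return 1  # any non-empty buffer reaches a 1-byte watermark
    if isinstance(auto_flush, int) and auto_flush > 0:
        return auto_flush
    raise ValueError(f'invalid auto_flush value: {auto_flush!r}')


def should_flush(buffered_bytes, auto_flush):
    """True once the buffered size has reached the watermark."""
    watermark = flush_watermark(auto_flush)
    return watermark is not None and buffered_bytes >= watermark


print(should_flush(1000, 1024), should_flush(1024, 1024))  # prints "False True"
```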

Authentication and TLS Encryption

This implementation supports authentication and TLS full-connection encryption.

The Sender(.., auth=..) argument is a tuple of (kid, d, x, y), as described in the QuestDB ILP authentication documentation. Authentication is optional and disabled by default.

The Sender(.., tls=..) argument is one of:

  • False: No TLS encryption (default).

  • True: TLS encryption, accepting all common certificates as recognized by the webpki-roots Rust crate which in turn relies on https://mkcert.org/.

  • A str or pathlib.Path: Path to a PEM-encoded certificate authority file. This is useful for testing with self-signed certificates.

  • A special 'insecure_skip_verify' string: Dangerously disable all TLS certificate verification (do NOT use in production environments).

Positional constructor arguments for Sender(..)

  • host: Hostname or IP address of the QuestDB server.

  • port: Port number of the QuestDB server.

Keyword-only constructor arguments for Sender(..)

  • interface (str): Network interface to bind to. Set this if you have an accelerated network interface (e.g. Solarflare) and want to use it.

  • auth (tuple): Authentication tuple or None (default). See above for details.

  • tls (bool, pathlib.Path or str): TLS configuration or False (default). See above for details.

  • read_timeout (int): How long to wait for messages from the QuestDB server during the TLS handshake or authentication process. This field is expressed in milliseconds. The default is 15 seconds.

  • init_capacity (int): Initial capacity of the internal buffer. See Buffer’s constructor for more details.

  • max_name_len (int): Maximum length of a table or column name. See Buffer’s constructor for more details.

  • auto_flush (bool or int): Whether to automatically flush the buffer when it reaches a certain byte-size watermark. See above for details.

__enter__()

Call Sender.connect() at the start of a with block.

__exit__(exc_type, _exc_val, _exc_tb)

Flush pending data and disconnect at the end of a with block.

If the with block raises an exception, any pending data will NOT be flushed.

This is implemented by calling Sender.close().
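The with-block behavior described here can be modelled with a small stand-in class. `SketchSender` is hypothetical and keeps rows in plain lists instead of talking to a server; it only mirrors the documented rule that pending data is flushed on a clean exit and dropped when the block raises.

```python
class SketchSender:
    """Hypothetical stand-in that mimics the documented with-block behavior."""

    def __init__(self):
        self.pending = []   # rows not yet "sent"
        self.sent = []      # rows that were flushed
        self.connected = False

    def connect(self):
        self.connected = True

    def row(self, line):
        self.pending.append(line)

    def close(self, flush=True):
        if flush and self.connected:
            self.sent.extend(self.pending)
        self.pending.clear()
        self.connected = False

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, _exc_val, _exc_tb):
        # Flush only when the block finished without an exception.
        self.close(flush=exc_type is None)
        return False  # never swallow the exception


with SketchSender() as ok:
    ok.row('a')
print(ok.sent)  # prints "['a']"
```

If the block raises, `sent` stays empty and the exception propagates to the caller.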

__str__()

Inspect the contents of the internal buffer.

The str value returned represents the unsent data.

Also see Sender.__len__().

close(flush)

Disconnect.

This method is idempotent and can be called repeatedly.

Once a sender is closed, it can’t be re-used.

Parameters

flush (bool) – If True, flush the internal buffer before closing.

connect()

Connect to the QuestDB server.

This method is synchronous and will block until the connection is established.

If the connection is set up with authentication and/or TLS, this method will return only after the handshake(s) is/are complete.

flush(buffer, clear)

If called with no arguments, immediately flushes the internal buffer.

Alternatively you can flush a buffer that was constructed explicitly by passing buffer.

The buffer will be cleared by default, unless clear is set to False.

This method does nothing if the provided or internal buffer is empty.

Parameters
  • buffer – The buffer to flush. If None, the internal buffer is flushed.

  • clear – If True, the flushed buffer is cleared (default). If False, the buffer's contents are left intact after flushing. Note that clear=False is only supported if buffer is also specified.
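One situation where `clear=False` could be useful is sending the same explicitly constructed buffer through more than one sender. The helper below is a hypothetical sketch of that idea; it relies only on the `flush(buffer, clear=False)` and `Buffer.clear()` calls documented in this reference, and it assumes every sender passed in is already connected.

```python
def flush_to_all(buf, senders):
    """Send the same buffer through every sender, then clear it once."""
    for sender in senders:
        # clear=False keeps the buffer's contents for the next sender.
        sender.flush(buf, clear=False)
    # Only reset the buffer after every sender has had its turn.
    buf.clear()
```

If any `flush` call raises, the buffer is left untouched, so the caller can decide whether to retry or discard it.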

init_capacity

The initial capacity of the sender’s internal buffer.

max_name_len

Maximum length of a table or column name.

new_buffer()

Make a new configured buffer.

The buffer is set up with the configured init_capacity and max_name_len.

row(table_name: unicode, *, symbols: Optional[Dict[str, str]], columns: Optional[Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]]], at: Union[None, TimestampNanos, datetime])

Write a row to the internal buffer.

This may be sent automatically depending on the auto_flush setting in the constructor.

Refer to the Buffer.row() documentation for details on arguments.

class questdb.ingress.TimestampMicros

Bases: object

A timestamp in microseconds since the UNIX epoch.

You may construct a TimestampMicros from an integer or a datetime.

# Can't be negative.
TimestampMicros(1657888365426838)

# Careful with the timezone!
TimestampMicros.from_datetime(datetime.datetime.now(datetime.timezone.utc))

When constructing from a datetime, you should take extra care to ensure that the timezone is correct.

For example, datetime.now() implies the local timezone which is probably not what you want.
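The difference between a naive and a timezone-aware `datetime` can be checked with the standard library alone. The helper `is_aware` below is a hypothetical name used for this sketch; it follows the definition of "aware" given in the Python `datetime` documentation.

```python
from datetime import datetime, timezone

naive = datetime.now()               # no tzinfo attached
aware = datetime.now(timezone.utc)   # explicitly UTC


def is_aware(dt):
    """True if dt carries usable timezone information."""
    return dt.tzinfo is not None and dt.utcoffset() is not None


print(is_aware(naive), is_aware(aware))  # prints "False True"
```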

When constructing the datetime object explicitly, you can pass in the timezone to use.

TimestampMicros.from_datetime(
    datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc))
classmethod from_datetime(dt: datetime)

Construct a TimestampMicros from a datetime.datetime object.

value

Number of microseconds.

class questdb.ingress.TimestampNanos

Bases: object

A timestamp in nanoseconds since the UNIX epoch.

You may construct a TimestampNanos from an integer or a datetime.

# Can't be negative.
TimestampNanos(1657888365426838016)

# Careful with the timezone!
TimestampNanos.from_datetime(datetime.datetime.now(datetime.timezone.utc))

When constructing from a datetime, you should take extra care to ensure that the timezone is correct.

For example, datetime.now() implies the local timezone which is probably not what you want.

When constructing the datetime object explicitly, you can pass in the timezone to use.

TimestampNanos.from_datetime(
    datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc))
classmethod from_datetime(dt: datetime)

Construct a TimestampNanos from a datetime.datetime object.
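For intuition about the value such a conversion produces, the following standard-library sketch turns a timezone-aware `datetime` into integer nanoseconds since the UNIX epoch. It is not how `from_datetime` is implemented: `to_epoch_nanos` is a hypothetical helper, and rejecting naive datetimes (rather than assuming the local timezone) is a choice made for this sketch.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_nanos(dt):
    """Convert a timezone-aware datetime to integer nanoseconds since the epoch."""
    if dt.tzinfo is None:
        raise ValueError('expected a timezone-aware datetime')
    # Floor-dividing timedeltas yields an int and avoids float rounding.
    micros = (dt - EPOCH) // timedelta(microseconds=1)
    # datetime has microsecond resolution, so the last three digits are zero.
    return micros * 1000


print(to_epoch_nanos(datetime(2000, 1, 1, tzinfo=timezone.utc)))
# prints "946684800000000000"
```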

value

Number of nanoseconds.