API Reference¶
questdb.ingress¶
API for fast data ingestion into QuestDB.
- class questdb.ingress.Buffer¶
Bases:
objectConstruct QuestDB-flavored InfluxDB Line Protocol (ILP) messages.
The
Buffer.row()method is used to add a row to the buffer.You can call this many times.
from questdb.ingress import Buffer buf = Buffer() buf.row( 'table_name1', symbols={'s1', 'v1', 's2', 'v2'}, columns={'c1': True, 'c2': 0.5}) buf.row( 'table_name2', symbols={'questdb': '❤️'}, columns={'like': 100000}) # Append any additional rows then, once ready, call sender.flush(buffer) # a `Sender` instance. # The sender auto-cleared the buffer, ready for reuse. buf.row( 'table_name1', symbols={'s1', 'v1', 's2', 'v2'}, columns={'c1': True, 'c2': 0.5}) # etc.
- Buffer Constructor Arguments:
init_capacity(int): Initial capacity of the buffer in bytes. Defaults to65536(64KiB).max_name_len(int): Maximum length of a column name. Defaults to127which is the same default value as QuestDB. This should match thecairo.max.file.name.lengthsetting of the QuestDB instance you’re connecting to.
# These two buffer constructions are equivalent. buf1 = Buffer() buf2 = Buffer(init_capacity=65536, max_name_len=127)
To avoid having to manually set these arguments every time, you can call the sender’s
new_buffer()method instead.from questdb.ingress import Sender, Buffer sender = Sender(host='localhost', port=9009, init_capacity=16384, max_name_len=64) buf = sender.new_buffer() assert buf.init_capacity == 16384 assert buf.max_name_len == 64
- __str__()¶
Return the constructed buffer as a string. Use for debugging.
- capacity() int¶
The current buffer capacity.
- clear()¶
Reset the buffer.
Note that flushing a buffer will (unless otherwise specified) also automatically clear it.
This method is designed to be called only in conjunction with
sender.flush(buffer, clear=False).
- init_capacity¶
The initial capacity of the buffer when first created.
This may grow over time, see
capacity().
- max_name_len¶
Maximum length of a table or column name.
- reserve(additional: int)¶
Ensure the buffer has at least additional bytes of future capacity.
- Parameters
additional (int) – Additional bytes to reserve.
- row(table_name: unicode, *, symbols: Optional[Dict[str, str]], columns: Optional[Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]]], at: Union[None, TimestampNanos, datetime])¶
Add a single row (line) to the buffer.
At least one
symbolsorcolumnsmust be specified.# All fields specified. buffer.row( 'table_name', symbols={'sym1': 'abc', 'sym2': 'def'}, columns={ 'col1': True, 'col2': 123, 'col3': 3.14, 'col4': 'xyz', 'col5': TimestampMicros(123456789), 'col6': datetime(2019, 1, 1, 12, 0, 0)}, at=TimestampNanos(123456789)) # Only symbols specified. Designated timestamp assigned by the db. buffer.row( 'table_name', symbols={'sym1': 'abc', 'sym2': 'def'}) # Float columns and timestamp specified as `datetime.datetime`. # Pay special attention to the timezone, which if unspecified is # assumed to be the local timezone (and not UTC). buffer.row( 'sensor data', columns={ 'temperature': 24.5, 'humidity': 0.5}, at=datetime.datetime.utcnow())
Python strings passed as values to
symbolsare going to be encoded as theSYMBOLtype in QuestDB, whilst Python strings passed as values tocolumnsare going to be encoded as theSTRINGtype.Refer to the QuestDB documentation to understand the difference between the
SYMBOLandSTRINGtypes (TL;DR: symbols are interned strings).- Parameters
table_name – The name of the table to which the row belongs.
symbols – A dictionary of symbol column names to
strvalues.columns – A dictionary of column names to
bool,int,float,str,TimestampMicrosordatetimevalues.at – The timestamp of the row. If
None, timestamp is assigned by the server. Ifdatetime, the timestamp is converted to nanoseconds. A nanosecond unix epoch timestamp can be passed explicitly as aTimestampNanosobject.
- exception questdb.ingress.IngressError(code, msg)¶
Bases:
ExceptionAn error whilst using the
Senderor constructing itsBuffer.- __init__(code, msg)¶
- property code: IngressErrorCode¶
Return the error code.
- class questdb.ingress.IngressErrorCode(value)¶
Bases:
EnumCategory of Error.
- AuthError = 6¶
- CouldNotResolveAddr = 0¶
- InvalidApiCall = 1¶
- InvalidName = 4¶
- InvalidTimestamp = 5¶
- InvalidUtf8 = 3¶
- SocketError = 2¶
- TlsError = 7¶
- class questdb.ingress.Sender¶
Bases:
objectA sender is a client that inserts rows into QuestDB via the ILP protocol.
Inserting two rows
In this example, data will be flushed and sent at the end of the
withblock.with Sender('localhost', 9009) as sender: sender.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}) sensor.row( 'weather_sensor', symbols={'id': 'dubai2'}, columns={'temperature': 41.2, 'humidity': 0.34})
The
Senderobject holds an internal buffer. The call to.row()simply forwards all arguments to theBuffer.row()method.Explicit flushing
An explicit call to
Sender.flush()will send any pending data immediately.with Sender('localhost', 9009) as sender: sender.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}) sender.flush() sender.row( 'weather_sensor', symbols={'id': 'dubai2'}, columns={'temperature': 41.2, 'humidity': 0.34}) sender.flush()
Auto-flushing (on by default)
To avoid accumulating very large buffers, the sender will flush the buffer automatically once its buffer reaches a certain byte-size watermark.
You can control this behavior by setting the
auto_flushargument.# Never flushes automatically. sender = Sender('localhost', 9009, auto_flush=False) sender = Sender('localhost', 9009, auto_flush=None) # Ditto. sender = Sender('localhost', 9009, auto_flush=0) # Ditto. # Flushes automatically when the buffer reaches 1KiB. sender = Sender('localhost', 9009, auto_flush=1024) # Flushes automatically after every row. sender = Sender('localhost', 9009, auto_flush=True) sender = Sender('localhost', 9009, auto_flush=1) # Ditto.
Authentication and TLS Encryption
This implementation supports authentication and TLS full-connection encryption.
The
Sender(.., auth=..)argument is a tuple of(kid, d, x, y)as documented on the QuestDB ILP authentication documentation. Authentication is optional and disabled by default.The
Sender(.., tls=..)argument is one of:False: No TLS encryption (default).True: TLS encryption, accepting all common certificates as recognized by the webpki-roots Rust crate which in turn relies on https://mkcert.org/.A
strorpathlib.Path: Path to a PEM-encoded certificate authority file. This is useful for testing with self-signed certificates.A special
'insecure_skip_verify'string: Dangerously disable all TLS certificate verification (do NOT use in production environments).
Positional constructor arguments for the ``Sender(..)``
host: Hostname or IP address of the QuestDB server.port: Port number of the QuestDB server.
Keyword-only constructor arguments for the ``Sender(..)``
interface(str): Network interface to bind to. Set this if you have an accelerated network interface (e.g. Solarflare) and want to use it.auth(tuple): Authentication tuple orNone(default). See above for details.tls(bool,pathlib.Pathorstr): TLS configuration orFalse(default). See above for details.read_timeout(int): How long to wait for messages from the QuestDB server during the TLS handshake or authentication process. This field is expressed in milliseconds. The default is 15 seconds.init_capacity(int): Initial buffer capacity of the internal buffer. See :class:`Buffer`’s constructor for more details.max_name_length(int): Maximum length of a table or column name. See :class:`Buffer`’s constructor for more details.auto_flush(boolorint): Whether to automatically flush the buffer when it reaches a certain byte-size watermark. See above for details.
- __enter__()¶
Call
Sender.connect()at the start of awithblock.
- __exit__(exc_type, _exc_val, _exc_tb)¶
Flush pending and disconnect at the end of a
withblock.If the
withblock raises an exception, any pending data will NOT be flushed.This is implemented by calling
Sender.close().
- __str__()¶
Inspect the contents of the internal buffer.
The
strvalue returned represents the unsent data.Also see
Sender.__len__().
- close(flush)¶
Disconnect.
This method is idempotent and can be called repeatedly.
Once a sender is closed, it can’t be re-used.
- Parameters
flush (bool) – If
True, flush the internal buffer before closing.
- connect()¶
Connect to the QuestDB server.
This method is synchronous and will block until the connection is established.
If the connection is set up with authentication and/or TLS, this method will return only after the handshake(s) is/are complete.
- flush(buffer, clear)¶
If called with no arguments, immediately flushes the internal buffer.
Alternatively you can flush a buffer that was constructed explicitly by passing
buffer.The buffer will be cleared by default, unless
clearis set toFalse.This method does nothing if the provided or internal buffer is empty.
- Parameters
buffer – The buffer to flush. If
None, the internal buffer is flushed.clear – If
True, the flushed buffer is cleared (default). IfFalse, the flushed buffer is left in the internal buffer. Note thatclear=Falseis only supported ifbufferis also specified.
- init_capacity¶
The initial capacity of the sender’s internal buffer.
- max_name_len¶
Maximum length of a table or column name.
- new_buffer()¶
Make a new configured buffer.
The buffer is set up with the configured init_capacity and max_name_len.
- row(table_name: unicode, *, symbols: Optional[Dict[str, str]], columns: Optional[Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]]], at: Union[None, TimestampNanos, datetime])¶
Write a row to the internal buffer.
This may be sent automatically depending on the
auto_flushsetting in the constructor.Refer to the
Buffer.row()documentation for details on arguments.
- class questdb.ingress.TimestampMicros¶
Bases:
objectA timestamp in microseconds since the UNIX epoch.
You may construct a
TimestampMicrosfrom an integer or adatetime.# Can't be negative. TimestampMicros(1657888365426838016) # Careful with the timezeone! TimestampMicros.from_datetime(datetime.datetime.utcnow())
When constructing from a
datetime, you should take extra care to ensure that the timezone is correct.For example,
datetime.now()implies the local timezone which is probably not what you want.When constructing the
datetimeobject explicity, you pass in the timezone to use.TimestampMicros.from_datetime( datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc))
- classmethod from_datetime(dt: datetime)¶
Construct a
TimestampMicrosfrom adatetime.datetimeobject.
- value¶
Number of microseconds.
- class questdb.ingress.TimestampNanos¶
Bases:
objectA timestamp in nanoseconds since the UNIX epoch.
You may construct a
TimestampNanosfrom an integer or adatetime.# Can't be negative. TimestampNanos(1657888365426838016) # Careful with the timezeone! TimestampNanos.from_datetime(datetime.datetime.utcnow())
When constructing from a
datetime, you should take extra care to ensure that the timezone is correct.For example,
datetime.now()implies the local timezone which is probably not what you want.When constructing the
datetimeobject explicity, you pass in the timezone to use.TimestampMicros.from_datetime( datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc))
- classmethod from_datetime(dt: datetime)¶
Construct a
TimestampNanosfrom adatetime.datetimeobject.
- value¶
Number of nanoseconds.