API Reference
questdb.ingress
- class questdb.ingress.Sender
Bases: object
Ingest data into QuestDB.
See the Sending Data over ILP documentation for more information.
- __enter__() → Sender
Call Sender.establish() at the start of a with block.
- __exit__(exc_type, _exc_val, _exc_tb)
Flush pending data and disconnect at the end of a with block.
If the with block raises an exception, any pending data will NOT be flushed.
This is implemented by calling Sender.close().
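The with-block behaviour above can be pictured with a small stand-in. This is a minimal sketch using a hypothetical SketchSender class, not the real Sender; it assumes that __exit__ calls close(flush=False) when the block raises, which matches the documented outcome (pending data is not flushed) but is not taken from the library source.

```python
class SketchSender:
    """Hypothetical stand-in that records lifecycle calls (not questdb.ingress.Sender)."""

    def __init__(self):
        self.calls = []
        self.pending = []

    def establish(self):
        self.calls.append("establish")

    def row(self, value):
        self.pending.append(value)

    def close(self, flush=True):
        if flush and self.pending:
            self.calls.append(f"flush {len(self.pending)} row(s)")
        self.pending.clear()  # unflushed rows are dropped
        self.calls.append("close")

    def __enter__(self):
        self.establish()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Flush only when the with block finished without raising.
        self.close(flush=exc_type is None)
        return False  # let any exception propagate


ok = SketchSender()
with ok as s:
    s.row(1)
    s.row(2)

failed = SketchSender()
try:
    with failed as s:
        s.row(1)
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

After the first block, ok.calls records establish, a flush of two rows and close; the failed block records only establish and close.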
- __init__(*args, **kwargs)
- auto_flush
Whether auto-flushing is enabled.
Consult the .auto_flush_rows, .auto_flush_bytes and .auto_flush_interval properties for the currently active thresholds.
- auto_flush_bytes
Byte-count threshold for the auto-flush logic, or None if disabled.
- auto_flush_interval
Time interval threshold for the auto-flush logic, or None if disabled.
- auto_flush_rows
Row count threshold for the auto-flush logic, or None if disabled.
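One way to read these three thresholds together is sketched below. AutoFlushThresholds and should_auto_flush are hypothetical names, and the rule that reaching any single enabled threshold triggers a flush is an assumption of this sketch rather than a statement about the library's internals; None disables a threshold, as described above.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class AutoFlushThresholds:
    """Hypothetical mirror of the three properties; None means disabled."""
    rows: Optional[int] = None            # like auto_flush_rows
    size_bytes: Optional[int] = None      # like auto_flush_bytes
    interval: Optional[timedelta] = None  # like auto_flush_interval


def should_auto_flush(t: AutoFlushThresholds, pending_rows: int,
                      pending_bytes: int, since_last_flush: timedelta) -> bool:
    # Assumption: reaching any single enabled threshold triggers a flush.
    if t.rows is not None and pending_rows >= t.rows:
        return True
    if t.size_bytes is not None and pending_bytes >= t.size_bytes:
        return True
    if t.interval is not None and since_last_flush >= t.interval:
        return True
    return False


thresholds = AutoFlushThresholds(rows=1000, interval=timedelta(seconds=1))
```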
- close(flush=True)
Disconnect.
This method is idempotent and can be called repeatedly.
Once a sender is closed, it can’t be re-used.
- Parameters:
flush (bool) – If True, flush the internal buffer before closing.
- dataframe(df, *, table_name: str | None = None, table_name_col: None | int | str = None, symbols: str | bool | List[int] | List[str] = 'auto', at: ServerTimestampType | int | str | TimestampNanos | datetime)
Write a Pandas DataFrame to the internal buffer.
Example:
    import pandas as pd
    import questdb.ingress as qi

    df = pd.DataFrame({
        'car': pd.Categorical(['Nic 42', 'Eddi', 'Nic 42', 'Eddi']),
        'position': [1, 2, 1, 2],
        'speed': [89.3, 98.2, 3, 4],
        'lat_gforce': [0.1, -0.2, -0.6, 0.4],
        'acceleration': [0.1, -0.2, 0.6, 4.4],
        'tyre_pressure': [2.6, 2.5, 2.6, 2.5],
        'ts': [
            pd.Timestamp('2022-08-09 13:56:00'),
            pd.Timestamp('2022-08-09 13:56:01'),
            pd.Timestamp('2022-08-09 13:56:02'),
            pd.Timestamp('2022-08-09 13:56:03')]})

    with qi.Sender.from_env() as sender:
        sender.dataframe(df, table_name='race_metrics', at='ts')
This method builds on top of the Buffer.dataframe() method. See its documentation for details on arguments.
Additionally, this method supports auto-flushing the buffer as specified in the Sender’s auto_flush constructor argument. Auto-flushing is implemented incrementally, meaning that when calling sender.dataframe(df) with a large df, the sender may already have sent some of the rows to the server, whilst the rest of the rows are sent at the next auto-flush or the next explicit call to Sender.flush().
In case of data errors with auto-flushing enabled, some of the rows may have been transmitted to the server already.
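The consequence of incremental auto-flushing (a data error can surface after earlier rows were already sent) is easy to reproduce with a toy model. send_incrementally is a hypothetical helper, a None row stands in for a data error, and a fixed row-count threshold stands in for the real auto-flush settings.

```python
def send_incrementally(rows, flush_every, transmit):
    """Hypothetical model: buffer rows and hand each full batch to transmit()."""
    pending = []
    for row in rows:
        if row is None:  # stand-in for a data error found part-way through
            raise ValueError("invalid row")
        pending.append(row)
        if len(pending) >= flush_every:  # auto-flush threshold reached
            transmit(list(pending))
            pending.clear()
    return pending  # left for the next auto-flush or an explicit flush()


sent = []
try:
    send_incrementally([1, 2, 3, 4, 5, None, 7], flush_every=2, transmit=sent.append)
except ValueError:
    pass
# Rows 1 to 4 were already "transmitted" before the error was hit.
```

If full atomicity matters, the text above points at HTTP transactions as the way to avoid mid-dataframe flushes.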
- establish()
Prepare the sender for use.
If using ILP/HTTP, this will initialize the HTTP connection pool.
If using ILP/TCP, this will connect to the server and block until the connection is established.
If the TCP connection is set up with authentication and/or TLS, this method will return only after the handshake(s) complete.
- flush(buffer=None, clear=True, transactional=False)
If called with no arguments, immediately flushes the internal buffer.
Alternatively, you can flush a buffer that was constructed explicitly by passing buffer.
The buffer will be cleared by default, unless clear is set to False.
This method does nothing if the provided or internal buffer is empty.
- Parameters:
buffer – The buffer to flush. If None, the internal buffer is flushed.
clear – If True, the flushed buffer is cleared (default). If False, the flushed data is left in the buffer. Note that clear=False is only supported if buffer is also specified.
transactional – If True, ensures that the flushed buffer contains rows for a single table, so that all data can be written transactionally. This feature requires ILP/HTTP and is not available when connecting over TCP. Default: False.
The Python GIL is released during the network IO operation.
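The rules for the three flush() parameters can be summarised in a small model. SketchFlusher is hypothetical and models a buffer as a list of (table, line) pairs; raising ValueError for an unsupported clear=False call or a multi-table transactional flush is an assumption about how the "only supported" and "ensures" wording is enforced, not the library's actual exception type.

```python
class SketchFlusher:
    """Hypothetical model of flush(buffer=None, clear=True, transactional=False)."""

    def __init__(self):
        self.internal = []  # the sender's own buffer: (table, line) pairs
        self.sent = []      # lines that reached the "server"

    def flush(self, buffer=None, clear=True, transactional=False):
        if not clear and buffer is None:
            raise ValueError("clear=False requires an explicit buffer")
        target = self.internal if buffer is None else buffer
        if not target:
            return  # empty buffer: nothing to do
        if transactional and len({table for table, _ in target}) > 1:
            raise ValueError("a transactional flush needs rows for a single table")
        self.sent.extend(line for _, line in target)
        if clear:
            target.clear()


f = SketchFlusher()
buf = [("t1", "a"), ("t1", "b")]
f.flush(buf, clear=False)  # sent, but buf keeps its rows
f.flush(buf)               # sent again, then cleared
f.flush()                  # internal buffer is empty: no-op
```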
- static from_conf(conf_str, *, bind_interface=None, username=None, password=None, token=None, token_x=None, token_y=None, auth_timeout=None, tls_verify=None, tls_ca=None, tls_roots=None, max_buf_size=None, retry_timeout=None, request_min_throughput=None, request_timeout=None, auto_flush=None, auto_flush_rows=None, auto_flush_bytes=None, auto_flush_interval=None, protocol_version=None, init_buf_size=None, max_name_len=None)
Construct a sender from a configuration string.
The additional arguments are used to specify additional parameters which are not present in the configuration string.
Note that any parameters already present in the configuration string cannot be overridden.
- static from_env(*, bind_interface=None, username=None, password=None, token=None, token_x=None, token_y=None, auth_timeout=None, tls_verify=None, tls_ca=None, tls_roots=None, max_buf_size=None, retry_timeout=None, request_min_throughput=None, request_timeout=None, auto_flush=None, auto_flush_rows=None, auto_flush_bytes=None, auto_flush_interval=None, protocol_version=None, init_buf_size=None, max_name_len=None)
Construct a sender from the QDB_CLIENT_CONF environment variable.
The environment variable must be set to a valid configuration string.
The additional arguments are used to specify additional parameters which are not present in the configuration string.
Note that any parameters already present in the configuration string cannot be overridden.
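To show how a configuration string combines with the extra keyword arguments, here is a rough sketch assuming the "protocol::key=value;key=value;" shape. parse_conf, merge_overrides and conf_from_env are hypothetical helpers; the parser ignores escaping of ';' inside values, which real configuration strings may need, and choosing to raise on a conflicting keyword is this sketch's reading of "cannot be overridden", not necessarily what the client does.

```python
import os


def parse_conf(conf_str):
    """Split 'http::addr=localhost:9000;key=value;' into (protocol, params)."""
    schema, sep, rest = conf_str.partition("::")
    if not sep:
        raise ValueError("expected '<protocol>::key=value;...'")
    params = {}
    for pair in rest.split(";"):
        if not pair:
            continue  # tolerate the trailing ';'
        key, eq, value = pair.partition("=")
        if not eq:
            raise ValueError(f"malformed pair: {pair!r}")
        params[key] = value
    return schema, params


def merge_overrides(params, **overrides):
    """Add keyword arguments that the config string does not already set."""
    merged = dict(params)
    for key, value in overrides.items():
        if value is None:
            continue  # argument not supplied
        if key in merged:
            raise ValueError(f"{key!r} is already set in the configuration string")
        merged[key] = str(value)
    return merged


def conf_from_env():
    # Mirrors the from_env() idea: read the string from QDB_CLIENT_CONF.
    return parse_conf(os.environ["QDB_CLIENT_CONF"])


schema, params = parse_conf("http::addr=localhost:9000;auto_flush_rows=100;")
merged = merge_overrides(params, username="admin", password=None)
```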
- init_buf_size
The initial capacity of the sender’s internal buffer.
- max_name_len
Maximum length of a table or column name.
- new_buffer()
Make a new configured buffer.
The buffer is set up with the configured init_buf_size and max_name_len.
- protocol_version
The protocol version used by the sender.
Protocol version 1 is retained for backwards compatibility with older QuestDB versions.
Protocol version 2 introduces binary floating point support and the array datatype.
- row(table_name: str, *, symbols: Dict[str, str] | None = None, columns: Dict[str, bool | int | float | str | TimestampMicros | datetime | np.ndarray] | None = None, at: TimestampNanos | datetime | ServerTimestampType)
Write a row to the internal buffer.
This may be sent automatically depending on the auto_flush setting in the constructor.
Refer to the Buffer.row() documentation for details on arguments.
- transaction(table_name: str)
Start an HTTP transaction block.
- class questdb.ingress.Buffer
Bases: object
Construct QuestDB InfluxDB Line Protocol (ILP) messages. Version 1 is compatible with the InfluxDB Line Protocol.
The Buffer.row() method is used to add a row to the buffer. You can call it many times.
    from questdb.ingress import Buffer, TimestampNanos

    buf = Buffer()
    buf.row(
        'table_name1',
        symbols={'s1': 'v1', 's2': 'v2'},
        columns={'c1': True, 'c2': 0.5},
        at=TimestampNanos.now())
    buf.row(
        'table_name2',
        symbols={'questdb': '❤️'},
        columns={'like': 100000},
        at=TimestampNanos.now())

    # Append any additional rows then, once ready, call
    sender.flush(buf)  # `sender` is a `Sender` instance.

    # The sender auto-cleared the buffer, ready for reuse.
    buf.row(
        'table_name1',
        symbols={'s1': 'v1', 's2': 'v2'},
        columns={'c1': True, 'c2': 0.5},
        at=TimestampNanos.now())
    # etc.
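To make the row-to-line relationship concrete, here is a rough sketch of how a row could be rendered in the text form of the line protocol (protocol version 1): symbols follow the table name, columns follow a space, and an optional nanosecond timestamp ends the line. escape_name, format_value and format_line are hypothetical helpers, not part of questdb.ingress; the escaping is simplified, and since protocol version 2 encodes floats in binary, this is not byte-for-byte what Buffer produces.

```python
def escape_name(text: str) -> str:
    """Backslash-escape characters that delimit ILP tokens (simplified)."""
    for ch in ("\\", ",", " ", "="):  # backslash first so added escapes stay single
        text = text.replace(ch, "\\" + ch)
    return text


def format_value(value) -> str:
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "t" if value else "f"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, str):
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
    raise TypeError(f"unsupported value: {value!r}")


def format_line(table, symbols=None, columns=None, at_nanos=None) -> str:
    line = escape_name(table)
    for key, value in (symbols or {}).items():
        line += f",{escape_name(key)}={escape_name(value)}"
    fields = ",".join(
        f"{escape_name(key)}={format_value(value)}"
        for key, value in (columns or {}).items())
    if fields:
        line += " " + fields
    if at_nanos is not None:  # None: leave the timestamp to the server
        line += f" {at_nanos}"
    return line + "\n"


example = format_line("table_name1", {"s1": "v1", "s2": "v2"},
                      {"c1": True, "c2": 0.5}, at_nanos=1000)
```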
- Buffer Constructor Arguments:
init_buf_size (int): Initial capacity of the buffer in bytes. Defaults to 65536 (64KiB).
max_name_len (int): Maximum length of a table or column name. Defaults to 127, which is the same default value as QuestDB. This should match the cairo.max.file.name.length setting of the QuestDB instance you’re connecting to.
    # These two buffer constructions are equivalent.
    buf1 = Buffer()
    buf2 = Buffer(init_buf_size=65536, max_name_len=127)
To avoid having to manually set these arguments every time, you can call the sender’s new_buffer() method instead.
    from questdb.ingress import Sender, Buffer

    sender = Sender('http', 'localhost', 9000, init_buf_size=16384, max_name_len=64)
    buf = sender.new_buffer()
    assert buf.init_buf_size == 16384
    assert buf.max_name_len == 64
- clear()
Reset the buffer.
Note that flushing a buffer will (unless otherwise specified) also automatically clear it.
This method is designed to be called only in conjunction with sender.flush(buffer, clear=False).
- dataframe(df, *, table_name: str | None = None, table_name_col: None | int | str = None, symbols: str | bool | List[int] | List[str] = 'auto', at: ServerTimestampType | int | str | TimestampNanos | datetime)
Add a pandas DataFrame to the buffer.
Also see the Sender.dataframe() method if you’re not using the buffer explicitly. It supports the same parameters and also supports auto-flushing.
This feature requires the pandas, numpy and pyarrow packages to be installed.
Adding a dataframe can trigger auto-flushing behaviour, even between rows of the same dataframe. To avoid this, you can use HTTP and transactions (see Sender.transaction()).
- Parameters:
df (pandas.DataFrame) – The pandas DataFrame to serialize to the buffer.
table_name (str or None) –
The name of the table to which the rows belong.
If None, the table name is taken from the table_name_col parameter. If both table_name and table_name_col are None, the table name is taken from the DataFrame’s index name (df.index.name attribute).
table_name_col (str or int or None) –
The name or index of the column in the DataFrame that contains the table name.
If None, the table name is taken from the table_name parameter. If both table_name and table_name_col are None, the table name is taken from the DataFrame’s index name (df.index.name attribute).
If table_name_col is an integer, it is interpreted as the index of the column starting from 0. The index of the column can be negative, in which case it is interpreted as an offset from the end of the DataFrame. E.g. -1 is the last column.
symbols (str or bool or list of str or list of int) –
The columns to be serialized as symbols.
If 'auto' (default), all columns of dtype 'category' are serialized as symbols. If True, all str columns are serialized as symbols. If False, no columns are serialized as symbols.
The list of symbols can also be specified explicitly as a list of column names (str) or indices (int). Integer indices start at 0 and can be negative, offset from the end of the DataFrame. E.g. -1 is the last column.
Only columns containing strings can be serialized as symbols.
at (TimestampNanos, datetime.datetime, int or str or None) –
The designated timestamp of the rows.
You can specify a single value for all rows, or a column name or index. If ServerTimestamp, the timestamp is assigned by the server for all rows. To pass in a timestamp explicitly as an integer, use the TimestampNanos wrapper type. To get the current timestamp, use TimestampNanos.now(). When passing a datetime.datetime object, the timestamp is converted to nanoseconds. A datetime object is assumed to be in the local timezone unless one is specified explicitly (so call datetime.datetime.now(tz=datetime.timezone.utc) instead of datetime.datetime.utcnow() for the current timestamp to avoid bugs).
To specify a different timestamp for each row, pass in a column name (str) or index (int, 0-based, negative index supported). In this case, the column needs to be of dtype datetime64[ns] (assumed to be in the UTC timezone and not local, due to differences in Pandas and Python datetime handling) or datetime64[ns, tz]. When a timezone is specified in the column, it is converted to UTC automatically.
A timestamp column can also contain None values. The server will assign the current timestamp to those rows.
Note: All timestamps are always converted to nanoseconds in the UTC timezone. Timezone information is dropped before sending and QuestDB will not store any timezone information.
Note: It is an error to specify both table_name and table_name_col.
Note: The “index” column of the DataFrame is never serialized, even if it is named.
Example:
    import pandas as pd
    import questdb.ingress as qi

    buf = qi.Buffer(protocol_version=2)
    # ...
    df = pd.DataFrame({
        'location': ['London', 'Managua', 'London'],
        'temperature': [24.5, 35.0, 25.5],
        'humidity': [0.5, 0.6, 0.45],
        'ts': pd.date_range('2021-07-01', periods=3)})
    buf.dataframe(
        df,
        table_name='weather',
        at='ts',
        symbols=['location'])
    # ...
    sender.flush(buf)  # `sender` is a `Sender` instance.
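The column-selection rules for symbols (and the negative indices also accepted by table_name_col and at) can be sketched without pandas. resolve_index and resolve_symbols are hypothetical helpers, and each column's dtype is described with a simplified label: "str" stands for any string dtype from the mapping table that follows ('string', 'string[pyarrow]', 'object' holding str), and "category" for a string categorical. Treating category columns as string columns when symbols=True is an assumption.

```python
STRING_KINDS = {"str", "category"}


def resolve_index(columns, selector):
    """Map a column name or (possibly negative) 0-based index to an index."""
    if isinstance(selector, int):
        n = len(columns)
        if not -n <= selector < n:
            raise IndexError(f"column index {selector} out of range")
        return selector % n  # -1 -> last column
    return columns.index(selector)  # ValueError if the name is unknown


def resolve_symbols(columns, kinds, symbols="auto"):
    """Return sorted indices of the columns to send as SYMBOL (sketch)."""
    if symbols is True:
        chosen = [i for i, k in enumerate(kinds) if k in STRING_KINDS]
    elif symbols is False:
        chosen = []
    elif symbols == "auto":
        chosen = [i for i, k in enumerate(kinds) if k == "category"]
    else:
        chosen = sorted({resolve_index(columns, s) for s in symbols})
        for i in chosen:
            if kinds[i] not in STRING_KINDS:
                raise ValueError(f"column {columns[i]!r} does not contain strings")
    return chosen


columns = ["location", "temperature", "humidity", "ts"]
kinds = ["str", "float64", "float64", "datetime64[ns]"]
```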
Pandas to ILP datatype mappings
Pandas Mappings
Pandas dtype               | Nulls   | ILP Datatype
'bool'                     | N       | BOOLEAN
'boolean'                  | N α     | BOOLEAN
'object' (bool objects)    | N α     | BOOLEAN
'uint8'                    | N       | INTEGER
'int8'                     | N       | INTEGER
'uint16'                   | N       | INTEGER
'int16'                    | N       | INTEGER
'uint32'                   | N       | INTEGER
'int32'                    | N       | INTEGER
'uint64'                   | N       | INTEGER β
'int64'                    | N       | INTEGER
'UInt8'                    | Y       | INTEGER
'Int8'                     | Y       | INTEGER
'UInt16'                   | Y       | INTEGER
'Int16'                    | Y       | INTEGER
'UInt32'                   | Y       | INTEGER
'Int32'                    | Y       | INTEGER
'UInt64'                   | Y       | INTEGER β
'Int64'                    | Y       | INTEGER
'object' (int objects)     | Y       | INTEGER β
'float32' γ                | Y (NaN) | FLOAT
'float64'                  | Y (NaN) | FLOAT
'object' (float objects)   | Y (NaN) | FLOAT
'string' (str objects)     | Y       | STRING (default), SYMBOL via symbols arg. δ
'string[pyarrow]'          | Y       | STRING (default), SYMBOL via symbols arg. δ
'category' (str objects) ε | Y       | SYMBOL (default), STRING via symbols arg. δ
'object' (str objects)     | Y       | STRING (default), SYMBOL via symbols arg. δ
'datetime64[ns]'           | Y       | TIMESTAMP ζ
'datetime64[ns, tz]'       | Y       | TIMESTAMP ζ
Note
α: Some pandas dtypes allow nulls (e.g. 'boolean'), whereas the QuestDB database does not.
β: The valid range for integer values is -2^63 to 2^63-1. Any 'uint64', 'UInt64' or Python int object values outside this range will raise an error during serialization.
γ: Upcast to 64-bit float during serialization.
δ: Columns containing strings can also be used to specify the table name. See table_name_col.
ε: We only support categories containing strings. If the category contains non-string values, an error will be raised.
ζ: The .dataframe() method only supports datetimes with nanosecond precision. The designated timestamp column (see the at parameter) maintains nanosecond precision, whilst values stored as columns have their precision truncated to microseconds. All dates are sent as UTC and any additional timezone information is dropped. If no timezone is specified, we follow the pandas convention of assuming the timezone is UTC. Datetimes before 1970-01-01 00:00:00 UTC are not supported. If a datetime value is specified as None (NaT), it is interpreted as the current QuestDB server time set on receipt of the message.
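Footnote ζ combines several rules (naive means UTC, aware values are converted to UTC, pre-1970 values are rejected, NaT defers to the server, non-designated columns drop to microseconds). The sketch below models them with the standard library only; column_datetime_to_nanos and encode_timestamp are hypothetical helpers. Because Python's datetime stops at microseconds, the truncation step takes integer nanoseconds rather than a datetime.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def column_datetime_to_nanos(dt):
    """Sketch of footnote ζ for one datetime value taken from a dataframe column."""
    if dt is None:
        return None  # NaT: QuestDB uses its own time on receipt
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # naive: pandas convention, UTC
    delta = dt - EPOCH  # aware values are normalised to UTC by the subtraction
    if delta < timedelta(0):
        raise ValueError("datetimes before 1970-01-01 00:00:00 UTC are not supported")
    # Python datetimes stop at microseconds, so scale up to nanoseconds.
    return delta // timedelta(microseconds=1) * 1000


def encode_timestamp(nanos, designated):
    """The designated timestamp keeps nanoseconds; other columns drop to micros."""
    return nanos if designated else nanos // 1000
```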
Error Handling and Recovery
In case an exception is raised during dataframe serialization, the buffer is left in its previous state. The buffer remains in a valid state and can be used for further calls even after an error.
For example, if an invalid None value appears at the 3rd row for a bool column, neither the 3rd nor the preceding rows are added to the buffer.
Note: This differs from the Sender.dataframe() method, which weakens this guarantee because of its auto_flush logic.
Performance Considerations
The Python GIL is released during serialization if it is not needed. If any column requires the GIL, the entire serialization is done whilst holding the GIL.
Column types that require the GIL are:
Columns of str, float or int Python objects.
The 'string[python]' dtype.
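The all-or-nothing guarantee from "Error Handling and Recovery" can be provided by staging rows and committing them only once the whole batch has serialized. The sketch uses a hypothetical SketchBuffer that only handles a single bool column; the real Buffer may instead roll back to a saved position, so treat this as one possible approach rather than its implementation.

```python
class SketchBuffer:
    """Hypothetical buffer that appends a batch only if every row serializes."""

    def __init__(self):
        self._lines = []

    def add_bool_rows(self, table, column, values):
        staged = []  # rows serialized so far in this call
        for i, value in enumerate(values):
            if value is None:
                raise ValueError(f"row {i}: None is not valid for a bool column")
            staged.append(f"{table} {column}={'t' if value else 'f'}\n")
        self._lines.extend(staged)  # reached only when all rows succeeded

    def __len__(self):
        return len(self._lines)

    def __str__(self):
        return "".join(self._lines)


buf = SketchBuffer()
buf.add_bool_rows("t", "ok", [True])
try:
    buf.add_bool_rows("t", "ok", [True, False, None])  # fails at the 3rd row
except ValueError:
    pass
# buf still holds only the first, successful call.
```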
- init_buf_size
The initial capacity of the buffer when first created.
This may grow over time; see capacity().
- max_name_len
Maximum length of a table or column name.
- reserve(additional: int)
Ensure the buffer has at least additional bytes of future capacity.
- Parameters:
additional (int) – Additional bytes to reserve.
- row(table_name: str, *, symbols: Dict[str, str | None] | None = None, columns: Dict[str, None | bool | int | float | str | TimestampMicros | datetime | np.ndarray] | None = None, at: ServerTimestampType | TimestampNanos | datetime)
Add a single row (line) to the buffer.
    import datetime
    import numpy as np
    from questdb.ingress import ServerTimestamp, TimestampMicros, TimestampNanos

    # `buffer` is a Buffer instance, e.g. from sender.new_buffer().

    # All fields specified.
    buffer.row(
        'table_name',
        symbols={'sym1': 'abc', 'sym2': 'def', 'sym3': None},
        columns={
            'col1': True,
            'col2': 123,
            'col3': 3.14,
            'col4': 'xyz',
            'col5': TimestampMicros(123456789),
            'col6': datetime.datetime(2019, 1, 1, 12, 0, 0),
            'col7': np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]),
            'col8': None},
        at=TimestampNanos(123456789))

    # Only symbols specified. Designated timestamp assigned by the db.
    buffer.row(
        'table_name',
        symbols={'sym1': 'abc', 'sym2': 'def'},
        at=ServerTimestamp)

    # Float columns and timestamp specified as `datetime.datetime`.
    # Pay special attention to the timezone, which if unspecified is
    # assumed to be the local timezone (and not UTC).
    buffer.row(
        'sensor data',
        columns={
            'temperature': 24.5,
            'humidity': 0.5},
        at=datetime.datetime.now(tz=datetime.timezone.utc))
Python strings passed as values to symbols are encoded as the SYMBOL type in QuestDB, whilst Python strings passed as values to columns are encoded as the STRING type.
Refer to the QuestDB documentation to understand the difference between the SYMBOL and STRING types (TL;DR: symbols are interned strings).
Column values can be specified with Python types directly and map as follows:
Python type                           | Serialized as ILP type
bool                                  | BOOLEAN
int                                   | INTEGER
float                                 | FLOAT
str                                   | STRING
np.ndarray                            | ARRAY
datetime.datetime and TimestampMicros | TIMESTAMP
None                                  | Column is skipped and not serialized.
If the destination table was already created, then the column types will be cast to the types of the existing columns whenever possible (refer to the QuestDB documentation).
Adding a row can trigger auto-flushing behaviour.
- Parameters:
table_name – The name of the table to which the row belongs.
symbols – A dictionary of symbol column names to str values. As a convenience, you can also pass a None value, which has the same effect as skipping the key: if the column already exists, it will be recorded as NULL, otherwise it will not be created.
columns – A dictionary of column names to bool, int, float, str, TimestampMicros, datetime or np.ndarray values. As a convenience, you can also pass a None value, which has the same effect as skipping the key: if the column already exists, it will be recorded as NULL, otherwise it will not be created.
at – The timestamp of the row. This is required! If ServerTimestamp, the timestamp is assigned by QuestDB. If datetime, the timestamp is converted to nanoseconds. A nanosecond Unix epoch timestamp can be passed explicitly as a TimestampNanos object.
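The Python-type mapping above has one classic trap: bool is a subclass of int, so a type dispatch must test bool first. This sketch of the mapping uses hypothetical helpers (ilp_type, column_types); it skips None as the table describes, applies the 64-bit range from footnote β to Python ints (an assumption for row()), and leaves out np.ndarray and TimestampMicros so it runs without numpy or questdb installed.

```python
import datetime


def ilp_type(value):
    """Return the ILP type name for a column value, or None if it is skipped."""
    if value is None:
        return None  # column is skipped and not serialized
    if isinstance(value, bool):  # must come before int: bool subclasses int
        return "BOOLEAN"
    if isinstance(value, int):
        if not -2**63 <= value <= 2**63 - 1:
            raise OverflowError(f"{value} does not fit in a 64-bit integer")
        return "INTEGER"
    if isinstance(value, float):
        return "FLOAT"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, datetime.datetime):
        return "TIMESTAMP"
    raise TypeError(f"unsupported column type: {type(value).__name__}")


def column_types(columns):
    """Map each column name to its ILP type, dropping None values."""
    result = {}
    for name, value in columns.items():
        kind = ilp_type(value)
        if kind is not None:
            result[name] = kind
    return result
```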
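- class questdb.ingress.SenderTransaction
Bases: object
A transaction for a specific table.
Transactions are not supported with ILP/TCP, only ILP/HTTP.
The sender API can only operate on one transaction at a time.
To create a transaction, call Sender.transaction() and use the result as a context manager:
    with sender.transaction('table_name') as txn:
        txn.row(
            symbols={'sym1': 'abc'},
            columns={'col1': 1.5},
            at=TimestampNanos.now())
        txn.dataframe(df, at='ts')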
- __enter__()
- __exit__(exc_type, _exc_value, _traceback)
- commit()
Commit the transaction.
A commit is also automatic at the end of a successful with block.
This will flush the buffer.
- dataframe(df, *, symbols: str | bool | List[int] | List[str] = 'auto', at: ServerTimestampType | int | str | TimestampNanos | datetime)
Write a dataframe for the table in the transaction.
The table name is taken from the transaction.
- rollback()
Roll back the transaction.
A rollback is also automatic at the end of a failed with block.
This will clear the buffer.
- row(*, symbols: Dict[str, str | None] | None = None, columns: Dict[str, None | bool | int | float | str | TimestampMicros | datetime | np.ndarray] | None = None, at: ServerTimestampType | TimestampNanos | datetime)
Write a row for the table in the transaction.
The table name is taken from the transaction.
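The transaction semantics (table name fixed by the transaction, commit at the end of a successful with block, rollback on failure, one transaction at a time) can be modelled with two hypothetical classes, TxnSketchSender and TxnSketch. Where the real client enforces the one-at-a-time rule is not stated above; checking it when transaction() is called is an assumption of this sketch.

```python
class TxnSketchSender:
    """Hypothetical sender that allows only one open transaction."""

    def __init__(self):
        self.committed = []  # rows that reached the "server"
        self._active = None

    def transaction(self, table_name):
        if self._active is not None:
            raise RuntimeError("only one transaction at a time")
        self._active = TxnSketch(self, table_name)
        return self._active


class TxnSketch:
    def __init__(self, sender, table_name):
        self._sender = sender
        self.table_name = table_name
        self._rows = []

    def row(self, **columns):
        # The table name comes from the transaction, not from each call.
        self._rows.append((self.table_name, columns))

    def commit(self):
        self._sender.committed.extend(self._rows)  # "flush the buffer"
        self._finish()

    def rollback(self):
        self._finish()  # "clear the buffer"

    def _finish(self):
        self._rows.clear()
        self._sender._active = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.commit()
        else:
            self.rollback()
        return False


s = TxnSketchSender()
with s.transaction("trades") as txn:
    txn.row(price=1.0)
    txn.row(price=2.0)

try:
    with s.transaction("trades") as txn:
        txn.row(price=3.0)
        raise RuntimeError("abort")
except RuntimeError:
    pass  # rolled back: price=3.0 never committed
```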
- class questdb.ingress.IngressError(code, msg)
Bases: Exception
An error whilst using the Sender or constructing its Buffer.
- __init__(code, msg)
- property code: IngressErrorCode
Return the error code.
- class questdb.ingress.IngressErrorCode(*values)
Bases: Enum
Category of error.
- ArrayInternalError = 12
- ArrayLargeDimError = 11
- ArrayWriteToBufferError = 13
- AuthError = 6
- BadDataFrame = 15
- ConfigError = 10
- CouldNotResolveAddr = 0
- HttpNotSupported = 8
- InvalidApiCall = 1
- InvalidName = 4
- InvalidTimestamp = 5
- InvalidUtf8 = 3
- ProtocolVersionError = 14
- ServerFlushError = 9
- SocketError = 2
- TlsError = 7
- class questdb.ingress.Protocol(*values)
Bases: TaggedEnum
Protocol to use for sending data to QuestDB.
See ILP/TCP or ILP/HTTP for more information.
- Http = ('http', 2)
- Https = ('https', 3)
- Tcp = ('tcp', 0)
- Tcps = ('tcps', 1)
- property tls_enabled
- class questdb.ingress.TimestampMicros
Bases: object
A timestamp in microseconds since the UNIX epoch (UTC).
You may construct a TimestampMicros from an integer or a datetime.datetime, or simply call the TimestampMicros.now() method.
    import datetime
    import time
    from questdb.ingress import TimestampMicros

    # Recommended way to get the current timestamp.
    TimestampMicros.now()

    # The above is equivalent to:
    TimestampMicros(time.time_ns() // 1000)

    # You can provide a numeric timestamp too. It can't be negative.
    TimestampMicros(1657888365426838)
TimestampMicros can also be constructed from a datetime.datetime object.
    TimestampMicros.from_datetime(
        datetime.datetime.now(tz=datetime.timezone.utc))
We recommend that when using datetime objects, you explicitly pass in the timezone to use. This is because datetime objects without an associated timezone are assumed to be in the local timezone and it is easy to make mistakes (e.g. passing datetime.datetime.utcnow() is a likely bug).
- classmethod from_datetime(dt: datetime)
Construct a TimestampMicros from a datetime.datetime object.
- classmethod now()
Construct a TimestampMicros from the current time as UTC.
- value
Number of microseconds (Unix epoch timestamp, UTC).
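The local-timezone rule for naive datetimes is where utcnow() goes wrong: it returns a naive value holding UTC wall-clock time, which is then reinterpreted as local time and shifted by the local UTC offset. The hypothetical micros_from_datetime below models the documented rule with integer arithmetic; it is a sketch of the behaviour described above, not the library's from_datetime implementation.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def micros_from_datetime(dt: datetime.datetime) -> int:
    """Sketch: integer microseconds since the Unix epoch (UTC)."""
    if dt.tzinfo is None:
        # Naive values are read as *local* time, matching the documented rule.
        dt = dt.astimezone()
    return (dt - EPOCH) // datetime.timedelta(microseconds=1)


utc = datetime.timezone.utc
plus2 = datetime.timezone(datetime.timedelta(hours=2))
```

With an explicit tz, the result no longer depends on the machine's timezone setting.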
- class questdb.ingress.TimestampNanos
Bases: object
A timestamp in nanoseconds since the UNIX epoch (UTC).
You may construct a TimestampNanos from an integer or a datetime.datetime, or simply call the TimestampNanos.now() method.
    import datetime
    import time
    from questdb.ingress import TimestampNanos

    # Recommended way to get the current timestamp.
    TimestampNanos.now()

    # The above is equivalent to:
    TimestampNanos(time.time_ns())

    # You can provide a numeric timestamp too. It can't be negative.
    TimestampNanos(1657888365426838016)
TimestampNanos can also be constructed from a datetime object.
    TimestampNanos.from_datetime(
        datetime.datetime.now(tz=datetime.timezone.utc))
We recommend that when using datetime objects, you explicitly pass in the timezone to use. This is because datetime objects without an associated timezone are assumed to be in the local timezone and it is easy to make mistakes (e.g. passing datetime.datetime.utcnow() is a likely bug).
- classmethod from_datetime(dt: datetime)
Construct a TimestampNanos from a datetime.datetime object.
- classmethod now()
Construct a TimestampNanos from the current time as UTC.
- value
Number of nanoseconds (Unix epoch timestamp, UTC).
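A datetime carries at most microsecond precision, so a nanosecond value derived from it always ends in three zeros. By contrast, the integer example above (1657888365426838016, as time.time_ns() might return) carries sub-microsecond digits. The hypothetical nanos_from_datetime below sketches the conversion with integer timedelta arithmetic, which avoids the rounding that multiplying the float from dt.timestamp() can introduce; rejecting negative results mirrors the "can't be negative" rule, but the helper itself is an assumption, not the library code.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def nanos_from_datetime(dt: datetime.datetime) -> int:
    """Sketch: integer nanoseconds since the Unix epoch (UTC)."""
    if dt.tzinfo is None:
        dt = dt.astimezone()  # naive values are read as local time
    # Integer timedelta arithmetic avoids the rounding of float dt.timestamp().
    micros = (dt - EPOCH) // datetime.timedelta(microseconds=1)
    nanos = micros * 1000  # datetime has no sub-microsecond digits
    if nanos < 0:
        raise ValueError("timestamp can't be negative")
    return nanos


utc = datetime.timezone.utc
sample = datetime.datetime(2022, 7, 15, 12, 32, 45, 426838, tzinfo=utc)
```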
- class questdb.ingress.TlsCa(*values)
Bases: TaggedEnum
Verification mechanism for the server’s certificate.
Here webpki refers to the WebPKI library and os refers to the operating system’s certificate store.
See TLS for more information.
- OsRoots = ('os_roots', 1)
- PemFile = ('pem_file', 3)
- WebpkiAndOsRoots = ('webpki_and_os_roots', 2)
- WebpkiRoots = ('webpki_roots', 0)
- class questdb.ingress.ServerTimestampType
Bases: object
A placeholder value to indicate that the data should be inserted using a server-generated timestamp.
Don’t instantiate this class directly; use the singleton ServerTimestamp instead.
This feature is mostly provided for legacy compatibility. We recommend always specifying an explicit timestamp.
Using ServerTimestamp will prevent QuestDB’s deduplication feature from working, as it would generate unique rows on resubmission.
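Why server-assigned timestamps defeat deduplication can be shown with a toy upsert keyed on (symbol, timestamp). The ingest helper, the fake server clock and the choice of upsert keys are all hypothetical; the point is only that a resubmitted row with an explicit timestamp lands on the same key, whilst a server-assigned one gets a fresh timestamp and becomes a new row.

```python
import itertools


def ingest(table, rows, server_clock):
    """Hypothetical upsert keyed on (symbol, timestamp), mimicking deduplication."""
    for symbol, price, ts in rows:
        if ts is None:
            ts = next(server_clock)  # server-assigned: new value on every receipt
        table[(symbol, ts)] = price


clock = itertools.count(1_000)  # fake server clock

explicit = {}
resend = [("ETH-USD", 2615.54, 1)]  # explicit designated timestamp
ingest(explicit, resend, clock)
ingest(explicit, resend, clock)  # resubmission hits the same key

server_side = {}
resend_server = [("ETH-USD", 2615.54, None)]  # like at=ServerTimestamp
ingest(server_side, resend_server, clock)
ingest(server_side, resend_server, clock)  # resubmission creates a new row
```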
- questdb.ingress.ServerTimestamp
The singleton ServerTimestampType instance. Pass it as the at argument to have QuestDB assign the designated timestamp; the caveats listed under ServerTimestampType (legacy use, no deduplication on resubmission) apply.