![]() |
Jazz 1.25.+
|
Channels: A Container doing block transactions across media (files, folders, shell, http urls and zeroMQ servers) More...
#include <channel.h>
Public Member Functions | |
Channels (pLogger a_logger, pConfigFile a_config) | |
virtual pChar const | id () |
StatusCode | start () |
StatusCode | shut_down () |
virtual StatusCode | get (pTransaction &p_txn, pChar p_what) |
virtual StatusCode | get (pTransaction &p_txn, pChar p_what, pBlock p_row_filter) |
virtual StatusCode | get (pTransaction &p_txn, pChar p_what, pChar name) |
virtual StatusCode | locate (Locator &location, pChar p_what) |
virtual StatusCode | header (StaticBlockHeader &hea, pChar p_what) |
virtual StatusCode | header (pTransaction &p_txn, pChar p_what) |
virtual StatusCode | put (pChar p_where, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT) |
virtual StatusCode | new_entity (pChar p_where) |
virtual StatusCode | remove (pChar p_where) |
virtual StatusCode | copy (pChar p_where, pChar p_what) |
virtual StatusCode | modify (Locator &function, pTuple p_args) |
MHD_StatusCode | forward_get (pTransaction &p_txn, Name node, pChar p_url) |
MHD_StatusCode | forward_put (Name node, pChar p_url, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT) |
MHD_StatusCode | forward_del (Name node, pChar p_url) |
void | base_names (BaseNames &base_names) |
![]() | |
Container (pLogger a_logger, pConfigFile a_config) | |
StatusCode | start () |
StatusCode | shut_down () |
void | enter_read (pTransaction p_txn) |
void | enter_write (pTransaction p_txn) |
void | leave_read (pTransaction p_txn) |
void | leave_write (pTransaction p_txn) |
StatusCode | new_block (pTransaction &p_txn, int cell_type, int *dim, int fill_tensor=FILL_NEW_DONT_FILL, int stringbuff_size=0, const char *p_text=nullptr, char eol='\n', AttributeMap *att=nullptr) |
StatusCode | new_block (pTransaction &p_txn, int num_items, StaticBlockHeader p_hea[], Name p_names[], pBlock p_block[], AttributeMap *dims=nullptr, AttributeMap *att=nullptr) |
StatusCode | new_block (pTransaction &p_txn, pBlock p_from, pBlock p_row_filter, AttributeMap *att=nullptr) |
StatusCode | new_block (pTransaction &p_txn, pTuple p_from, pChar name, AttributeMap *att=nullptr) |
StatusCode | new_block (pTransaction &p_txn, pBlock p_from_text, int cell_type, pKind p_as_kind=nullptr, AttributeMap *att=nullptr) |
StatusCode | new_block (pTransaction &p_txn, pBlock p_from_raw, pChar p_fmt=nullptr, bool ret_as_string=false, AttributeMap *att=nullptr) |
StatusCode | new_block (pTransaction &p_txn, int cell_type) |
StatusCode | new_block (pTransaction &p_txn, Index &index) |
virtual StatusCode | new_transaction (pTransaction &p_txn) |
virtual void | destroy_transaction (pTransaction &p_txn) |
virtual StatusCode | exec (pTransaction &p_txn, Locator &function, pTuple p_args) |
virtual StatusCode | as_locator (Locator &result, pChar p_what) |
virtual StatusCode | get (pTransaction &p_txn, Locator &what) |
virtual StatusCode | get (pTransaction &p_txn, Locator &what, pBlock p_row_filter) |
virtual StatusCode | get (pTransaction &p_txn, Locator &what, pChar name) |
virtual StatusCode | locate (Locator &location, Locator &what) |
virtual StatusCode | header (StaticBlockHeader &hea, Locator &what) |
virtual StatusCode | header (pTransaction &p_txn, Locator &what) |
virtual StatusCode | put (Locator &where, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT) |
virtual StatusCode | new_entity (Locator &where) |
virtual StatusCode | remove (Locator &where) |
virtual StatusCode | copy (Locator &where, Locator &what) |
StatusCode | unwrap_received (pTransaction &p_txn) |
StatusCode | unwrap_received (pTransaction &p_txn, pBlock p_maybe_block, int rec_size) |
void | base_names (BaseNames &base_names) |
![]() | |
Service (pLogger a_logger, pConfigFile a_config) | |
void | log (int loglevel, const char *message) |
void | log_printf (int loglevel, const char *fmt,...) |
bool | get_conf_key (const char *key, int &value) |
bool | get_conf_key (const char *key, double &value) |
bool | get_conf_key (const char *key, std::string &value) |
Data Fields | |
MapIS | jazz_node_name = {} |
The names of the nodes (other Jazz servers) in the cluster. | |
MapIS | jazz_node_ip = {} |
The ip addresses of the nodes in the cluster. | |
MapII | jazz_node_port = {} |
The ports of the nodes in the cluster. | |
bool | search_my_node_index = false |
If true, the node index is searched in the cluster. | |
int | jazz_node_my_index = -1 |
The index of the node in the cluster. | |
int | jazz_node_cluster_size = 0 |
The number of nodes in the cluster. | |
std::string | filesystem_root = {} |
The root of the filesystem. | |
![]() | |
pLogger | p_log |
The logger. | |
pConfigFile | p_conf |
The configuration file. | |
Protected Member Functions | |
StatusCode | curl_get (pTransaction &p_txn, const char *url, Index *p_idx=nullptr) |
The most low level get function. | |
StatusCode | curl_put (const char *url, pBlock p_blk, int mode=WRITE_AS_STRING|WRITE_AS_FULL_BLOCK, Index *p_idx=nullptr) |
The most low level put function. | |
StatusCode | curl_remove (const char *url, Index *p_idx=nullptr) |
The most low level remove function. | |
![]() | |
void * | malloc (size_t size) |
pBlock | block_malloc (size_t size) |
void | lock_container () |
void | unlock_container () |
StatusCode | destroy_container () |
int | from_hex (char c) |
Private Member Functions | |
bool | compose_url (pChar p_dest, pChar p_node, pChar p_url, int buff_size) |
Compose a url from a node, a base and an entity. | |
Private Attributes | |
char | HEX [16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'} |
Hexadecimal digits. | |
int | can_curl = false |
If true, the server can use libcurl based on configuration key ENABLE_HTTP_CLIENT. | |
int | curl_ok = false |
If true, libcurl is ready to be used based on config + libcurl initialization. | |
int | can_zmq = false |
If true, the server can use zeroMQ based on configuration key ENABLE_ZEROMQ_CLIENT. | |
int | zmq_ok = false |
If true, zeroMQ is ready to be used based on config + zeroMQ initialization. | |
int | can_bash = false |
If true, the server can use bash based on configuration key ENABLE_BASH_EXEC. | |
int | file_lev = 0 |
The level of file operations allowed based on configuration key ENABLE_FILE_LEVEL. | |
PipeMap | pipes = {} |
A map of pipelines (zeroMQ connections) | |
ConnMap | connect = {} |
A map of http connections. | |
void * | zmq_context = nullptr |
The zeroMQ context. | |
Additional Inherited Members | |
![]() | |
int | max_transactions |
The configured ONE_SHOT_MAX_TRANSACTIONS. | |
uint64_t | warn_alloc_bytes |
Taken from ONE_SHOT_WARN_BLOCK_KBYTES. | |
uint64_t | fail_alloc_bytes |
Taken from ONE_SHOT_ERROR_BLOCK_KBYTES. | |
uint64_t | alloc_bytes |
The current allocation in bytes. | |
pTransaction | p_buffer |
The buffer for the transactions. | |
pTransaction | p_free |
The free list of transactions. | |
bool | alloc_warning_issued |
True if a warning was issued for over-allocation. | |
Lock32 | _lock_ |
A lock for the deque of transactions. | |
Channels: A Container doing block transactions across media (files, folders, shell, http urls and zeroMQ servers)
NOTES: 1. This is the only container that does not have a native interface. Since urls and file names can be very long the easy interface is all you need, there is no as_locator() parsing.
The 0-mq implementation in jazz_elements is only a zeroMQ client, not a server. The only operation it supports is a translate() call. For a translate() call to succeed, you must first have created a pipeline using a put call with a string. E.g., put("//0-mq/pipeline/speech2text") with a block containing a string like "tcp://localhost:5555" creates a pipeline named speech2text. Outside Jazz, you set up a zeroMQ server that listens to tcp://localhost:5555 and does speech to text, expecting, say, 32-bit int vectors as input and returning a buffer of char as output. In your translate call you must provide a tuple with two Tensors with item names "input" and "result". The translate() call will send the raw tensor of the item "input" to the server and write whatever the server answers into the tensor named "result", just overriding the tensor without any dimension change. This operation expects the tensors to be binary (i.e., no variable length strings) and their shapes and types to be known by both parties.
In terms of the Jazz server API, this is a function call: either GET "//0-mq/speech2text/(//lmdb/stuff/my_tensor)" or GET "//0-mq/speech2text/(&[1,2,3];)" the argument can be anything in Persisted, Volatile, even a file or an //http get or a (%-encoded) constant as in the second example.
When using translate() as the method of Channel, you should omit the "pipeline" part, just translate(p_tuple, "//0-mq/speech2text");
Besides this, get("//0-mq/pipeline/speech2text") will return just a block with "tcp://localhost:5555" and remove("//0-mq/pipeline/speech2text") will destroy the pipeline. Any other call using "0-mq" returns SERVICE_ERROR_NOT_APPLICABLE.
"0-mq" operation must be enabled via configuration by setting ENABLE_ZEROMQ_CLIENT to something non-zero.
This is also a translate() call, the difference is you don't create the pipeline, it always exists and is called "//bash/exec". The Tuple is an array of byte, both ways "input" and "result". If the size of the "result" buffer is too small for the answer it will be filled up to the available size and something will be lost. The answer includes whatever a popen("bash script.sh") writes to stdout / stderr (where script.sh is the content of the "input" tensor). "bash" operation must be enabled via configuration by setting ENABLE_BASH_EXEC to something non-zero. There is no security check: it can be used for pushing AI creations to github or kill the server with //bash/exec(& jazz%20stop ;)
This reads/writes/deletes to the filesystem. Since the API does not use locators, there is no hardcoded name restriction. Via the http server, just use the URL (&...;). Remember to %-encode whatever http expects to be encoded. E.g., get("//file/&whatever%20you%20want;"). Note that "//file/" is a mandatory prefix, therefore "//file/aa" is "aa" and "//file//aa" is "/aa". get() gets files as arrays of byte and folders as an Index serialized as a Tuple (the keys are file names and the values either "file" or "folder"). put() writes either Jazz blocks with all the metadata (if mode == WRITE_EVERYTHING) or just the content of the tensor (if mode == WRITE_TENSOR_DATA). WRITE_ONLY_IF_EXISTS and WRITE_ONLY_IF_NOT_EXISTS work as expected. remove() deletes whatever matches the path either a file or a folder (with anything inside it). new_entity() creates a new folder. "file" operation must be enabled via configuration by setting ENABLE_FILE_LEVEL to 0 (disable everything), 1 (just read), 2 (cannot override == no remove and WRITE_ONLY_IF_NOT_EXISTS) or 3 (enable everything).
"http" Reference
Besides being an http server, Jazz is also an http client. The simplest mode of operation is just forwarding get(), put() and delete() calls that are intended for other nodes in a Jazz cluster. This is done at the top API level by just adding a node name. E.g., get("///node_x//lmdb/things/this") will forward the call to the node_x (if everything is well configured see JAZZ_NODE_NAME_.., etc.) and return the result just as if it were a local call. At the class level, this is done by forward_get(), forward_put() and forward_del().
You can also send simple GET, PUT and DELETE http calls to random urls by either using the get(), put() and remove() or using the Jazz http server API GET "//http&https://google.com;" The most advanced way to do it is creating a connection (similar to a "0-mq" pipeline) by put()-ing a Tuple to: //http/connection/a_name The tuple has two items named "key" and "value" that are vectors of string of the same size (like the ones returned by new_block(8)). The key must have the mandatory "URL" and optionally: "CURLOPT_USERNAME", "CURLOPT_USERPWD", "CURLOPT_COOKIEFILE" and "CURLOPT_COOKIEJAR". See https://curl.se/libcurl/c/CURLOPT_USERNAME.html and https://everything.curl.dev/libcurl-http/cookies. Once the connection exists, you can get(), put() and remove() to just its name (without the word connection). I.e., get(txn, "//http/a_name") or get(txn, "//http/a_name/args") will send the http GET to connection[URL] + "args". Same for put() and remove(). If you remove("//http/connection/a_name"), you destroy the connection. get("//http/connection/a_name") returns an Index serialized as a Tuple with all the connection parameters. "http" operation must be enabled via configuration by setting ENABLE_HTTP_CLIENT to something non-zero.
jazz_elements::Channels::Channels | ( | pLogger | a_logger, |
pConfigFile | a_config | ||
) |
Initialize the Channels Container without starting it.
a_logger | A pointer to a Logger object. |
a_config | A pointer to a ConfigFile object. |
|
virtual |
Return object ID.
Reimplemented from jazz_elements::Service.
|
virtual |
Reads config variables and sets jazz_node_* public variables.
Reimplemented from jazz_elements::Service.
|
virtual |
Shuts down the Channels Service.
Reimplemented from jazz_elements::Service.
|
virtual |
Native (Channels) interface complete Block retrieval.
p_txn | A pointer to a Transaction passed by reference. If successful, the Container will return a pointer to a Transaction inside the Container. The data read from the endpoint will be stored as a rank == 1 CELL_TYPE_BYTE for all bases except "bash", whose shell output is returned as a rank == 1 CELL_TYPE_STRING. |
p_what | The endpoint. |
Usage-wise, this is equivalent to a new_block() call. On success, it will return a Transaction that belongs to the Container and must be destroy_transaction()-ed when the caller is done.
NOTE: See the description of Channels for reference.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface get(2) is not applicable.
p_txn | A pointer to a Transaction passed by reference. If successful, the Container will return a pointer to a Transaction inside the Container. |
p_what | Some string that as_locator() can parse into a Locator. E.g. //base/entity/key |
p_row_filter | The block we want to use as a filter. This is either a tensor of boolean or integer that can_filter(p_from). |
NOTE: This always returns SERVICE_ERROR_NOT_APPLICABLE. Channels do not contain anything, just use get(1) instead.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface get(3) is not applicable.
p_txn | A pointer to a Transaction passed by reference. If successful, the Container will return a pointer to a Transaction inside the Container. |
p_what | Some string that as_locator() can parse into a Locator. E.g. //base/entity/key |
name | The name of the item to be selected. |
NOTE: This always returns SERVICE_ERROR_NOT_APPLICABLE. Channels do not contain anything, just use get(1) instead.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface locate is not applicable.
location | The solved location of the block. |
p_what | A valid reference to a block. E.g. //deque/ent/~first, //tree/ent/key~parent, //queue/ent/~highest |
NOTE: This always returns SERVICE_ERROR_NOT_APPLICABLE. Channels do not contain anything, just use get() instead.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface header is not applicable.
hea | A StaticBlockHeader structure that will receive the metadata. |
p_what | Some string that as_locator() can parse into a Locator. E.g. //base/entity/key |
NOTE: This always returns SERVICE_ERROR_NOT_APPLICABLE. Channels do not contain anything, just use get() instead.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface header is not applicable.
p_txn | A pointer to a Transaction passed by reference. If successful, the Container will return a pointer to a Transaction inside the Container. |
p_what | Some string that as_locator() can parse into a Locator. E.g. //base/entity/key |
NOTE: This always returns SERVICE_ERROR_NOT_APPLICABLE. Channels do not contain anything, just use get() instead.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface for Block writing
p_where | The destination endpoint. |
p_block | The Block to be written/sent by Channels. |
mode | Some writing restriction that depends on the base. WRITE_ONLY_IF_EXISTS and WRITE_ONLY_IF_NOT_EXISTS can only be used on file. WRITE_TENSOR_DATA should be used as default, otherwise the metadata will also be written/sent. |
NOTE: See the description of Channels for reference.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface for creating folders
p_where | The path to the folder to be created via mkdir() |
NOTE: This is only used with //file.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface for deleting:
p_where | Some endpoint to be deleted. |
NOTE: See the description of Channels for reference.
Reimplemented from jazz_elements::Container.
|
virtual |
Easy Channels interface for Block copying (possibly across bases but inside the Channels).
p_where | Some destination endpoint. |
p_what | Some source endpoint. |
NOTE: This is just a get() and a put(). See the description of Channels for reference.
Reimplemented from jazz_elements::Container.
|
virtual |
The function call interface for modify: In jazz_elements, this is only implemented in Channels.
function | Some description of a service. In general base/entity/key. In Channels the key must be empty and the entity is the pipeline. |
p_args | In Channels: A Tuple with two items, "input" with the data passed to the service and "result" with the data returned. The result will be overridden in-place without any allocation. |
This is what most frameworks would call predict(), something that takes any tensor as an input and returns another tensor. In channels, it just gives support to some other service doing that connected via zeroMQ or bash. Outside jazz_elements, the services use this to run their own models.
Reimplemented from jazz_elements::Container.
MHD_StatusCode jazz_elements::Channels::forward_get | ( | pTransaction & | p_txn, |
Name | node, | ||
pChar | p_url | ||
) |
Forwards an HTTP_GET call to another node in the Jazz cluster.
p_txn | A pTransaction owned by Channels. It must be destroy_transaction()-ed after successful use. |
node | The name of the endpoint node. It must be found in the cluster config. |
p_url | The unparsed url (server excluded) the remote Jazz server can serve. |
MHD_StatusCode jazz_elements::Channels::forward_put | ( | Name | node, |
pChar | p_url, | ||
pBlock | p_block, | ||
int | mode = WRITE_AS_BASE_DEFAULT |
||
) |
Forwards an HTTP_PUT call to another node in the Jazz cluster.
node | The name of the endpoint node. It must be found in the cluster config. |
p_url | The unparsed url (server excluded) the remote Jazz server can serve. |
p_block | A block to be put (owned by the caller). |
mode | WRITE_TENSOR_DATA, WRITE_C_STR or WRITE_EVERYTHING (see curl_put). |
MHD_StatusCode jazz_elements::Channels::forward_del | ( | Name | node, |
pChar | p_url | ||
) |
Forwards an HTTP_DELETE call to another node in the Jazz cluster.
node | The name of the endpoint node. It must be found in the cluster config. |
p_url | The unparsed url (server excluded) the remote Jazz server can serve. |
void jazz_elements::Channels::base_names | ( | BaseNames & | base_names | ) |
Add the base names for this Channels.
base_names | A BaseNames map passed by reference to which the base names of this object are added by this call. |
|
inlineprotected |
The most low level get function.
p_txn | A pointer to a Transaction passed by reference. If successful, the Container will return a pointer to a Transaction inside the Container. The caller can only use it read-only and must destroy_transaction() it when done. |
url | The url to be got. |
p_idx | Additional curl_easy_setopt() options passed in an Index. |
|
inlineprotected |
The most low level put function.
url | The url to put to. |
p_blk | The Block to be sent by Channels. |
mode | WRITE_TENSOR_DATA, WRITE_C_STR or WRITE_EVERYTHING |
p_idx | Additional curl_easy_setopt() options passed in an Index. |
|
inlineprotected |
The most low level remove function.
url | The url to be removed. |
p_idx | Additional curl_easy_setopt() options passed in an Index. |
|
inlineprivate |
Compose a url from a node, a base and an entity.
p_dest | The destination buffer. |
p_node | The node name that has to be resolved to its ip and port. |
p_url | The url that has to be urlencoded. |
buff_size | The size of the destination buffer. |