Jazz 1.26.+
Loading...
Searching...
No Matches
channel.h
1/* Jazz (c) 2018-2026 kaalam.ai (The Authors of Jazz), using (under the same license):
2
3 1. Biomodelling - The AATBlockQueue class (c) Jacques Basaldúa, 2009-2012 licensed
4 exclusively for the use in the Jazz server software.
5
6 Copyright 2009-2012 Jacques Basaldúa
7
8 2. BBVA - Jazz: A lightweight analytical web server for data-driven applications.
9
10 Copyright 2016-2017 Banco Bilbao Vizcaya Argentaria, S.A.
11
12 This product includes software developed at
13
14 BBVA (https://www.bbva.com/)
15
16 3. LMDB, Copyright 2011-2017 Howard Chu, Symas Corp. All rights reserved.
17
18 Licensed under http://www.OpenLDAP.org/license.html
19
20
21 Licensed under the Apache License, Version 2.0 (the "License");
22 you may not use this file except in compliance with the License.
23 You may obtain a copy of the License at
24
25 http://www.apache.org/licenses/LICENSE-2.0
26
27 Unless required by applicable law or agreed to in writing, software
28 distributed under the License is distributed on an "AS IS" BASIS,
29 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30 See the License for the specific language governing permissions and
31 limitations under the License.
32*/
33
34
35#include <map>
36
37#include <microhttpd.h>
38#include <curl/curl.h>
39
40#include "src/jazz_elements/container.h"
41
42#ifdef CATCH_TEST
43#ifndef INCLUDED_JAZZ_CATCH2
44#define INCLUDED_JAZZ_CATCH2
45
46#include "src/catch2/catch.hpp"
47
48#endif
49#endif
50
51
52#ifndef INCLUDED_JAZZ_ELEMENTS_CHANNEL
53#define INCLUDED_JAZZ_ELEMENTS_CHANNEL
54
55namespace jazz_elements
56{
57
58#define BASE_BASH_10BIT 0x022 //< First 10 bits of base "bash"
59#define BASE_FILE_10BIT 0x126 //< First 10 bits of base "file"
60#define BASE_HTTP_10BIT 0x288 //< First 10 bits of base "http"
61#define BASE_0_MQ_10BIT 0x1b0 //< First 10 bits of base "0-mq"
62
63
64#define MAX_FILE_OR_URL_SIZE 1712
65
67
68#define APPLY_NOTHING 0
69#define APPLY_NAME 1
70#define APPLY_URL 2
71#define APPLY_FUNCTION 3
72#define APPLY_FUNCT_CONST 4
73#define APPLY_FILTER 5
74#define APPLY_FILT_CONST 6
75#define APPLY_RAW 7
76#define APPLY_TEXT 8
77#define APPLY_ASSIGN_NOTHING 9
78#define APPLY_ASSIGN_NAME 10
79#define APPLY_ASSIGN_URL 11
80#define APPLY_ASSIGN_FUNCTION 12
81#define APPLY_ASSIGN_FUNCT_CONST 13
82#define APPLY_ASSIGN_FILTER 14
83#define APPLY_ASSIGN_FILT_CONST 15
84#define APPLY_ASSIGN_RAW 16
85#define APPLY_ASSIGN_TEXT 17
86#define APPLY_ASSIGN_CONST 18
87#define APPLY_NEW_ENTITY 19
88#define APPLY_GET_ATTRIBUTE 20
89#define APPLY_SET_ATTRIBUTE 21
90#define APPLY_JAZZ_INFO 22
91
92
93// Bit masks to trigger curl failures in Channel wrappers during tests.
94
95#define CURL_EASY_NO_BYPASS -1
96
97// #define TRIGGER_FAIL_MDB_DROP (1u << 14) This is the highest bit used in Persisted,
98#define TRIGGER_FAIL_CURL_EASY_INIT (1u << 15)
99#define TRIGGER_FAIL_CURL_EASY_PERFORM (1u << 16)
100#define TRIGGER_FAIL_CURL_EASY_GETINFO (1u << 17)
101
102#define TRIGGER_FAIL_GETIFADDRS (1u << 18)
103#define TRIGGER_FAIL_GETNAMEINFO (1u << 19)
104#define TRIGGER_FAIL_MYINDEX_FIND (1u << 20)
105#define TRIGGER_FAIL_FILE_IO (1u << 21)
106#define TRIGGER_FAIL_ZMQ (1u << 22)
107#define TRIGGER_FAIL_BASH (1u << 23)
108
109#define TRIGGER_FAKE_FORWARD_SUCCESS (1u << 24)
110
112typedef std::map<int, String> MapIS;
113
114
116typedef std::map<String, Index> ConnMap;
117
118
/// A structure to hold a single pipeline: the endpoint string together with the socket connected to it.
120struct Socket {
121 char endpoint[120]; ///< The endpoint at which the socket is connected (fixed 120-byte buffer).
122 void *requester; ///< The (connected) zmq socket, kept as an opaque pointer.
123};
124
125
127typedef std::vector<uint8_t> GetBuffer;
129
130
/// A structure to keep state inside a put callback: what is left to upload and where it is.
/// curl_put() fills it and passes its address to libcurl as CURLOPT_READDATA.
132struct PutBuffer {
133 uint64_t to_send; ///< Number of bytes to be sent.
134 uint8_t *p_base; ///< The pointer (updated after each call) to the data.
135};
137
138
140typedef std::map<String, Socket> PipeMap;
141
142
144typedef std::map<int, int> MapII;
145
146
158typedef unsigned int MHD_StatusCode;
159
160extern size_t get_callback(char *ptr, size_t size, size_t nmemb, void *container);
161extern size_t put_callback(char *ptr, size_t size, size_t nmemb, void *container);
162extern size_t dev_null(char *_ignore, size_t size, size_t nmemb, void *_ignore_2);
163
244class Channels : public Container {
245
246 public:
247
248 Channels(pLogger a_logger,
249 pConfigFile a_config);
250 ~Channels();
251
252 virtual pChar const id();
253
254 StatusCode start ();
256
257 virtual StatusCode get (pTransaction &p_txn,
258 pChar p_what);
259 virtual StatusCode get (pTransaction &p_txn,
260 pChar p_what,
261 pBlock p_row_filter);
262 virtual StatusCode get (pTransaction &p_txn,
263 pChar p_what,
264 pChar name);
265 virtual StatusCode locate (Locator &location,
266 pChar p_what);
267 virtual StatusCode header (StaticBlockHeader &hea,
268 pChar p_what);
269 virtual StatusCode header (pTransaction &p_txn,
270 pChar p_what);
271 virtual StatusCode put (pChar p_where,
272 pBlock p_block,
273 int mode = WRITE_AS_BASE_DEFAULT);
274 virtual StatusCode new_entity(pChar p_where);
275 virtual StatusCode remove (pChar p_where);
276 virtual StatusCode copy (pChar p_where,
277 pChar p_what);
278 virtual StatusCode modify (Locator &function,
279 pTuple p_args);
281 Name node,
282 pChar p_url);
284 pChar p_url,
285 pBlock p_block,
286 int mode = WRITE_AS_BASE_DEFAULT);
288 pChar p_url);
289
290 // Support for container names in the BaseAPI .base_names()
291
292 virtual void base_names(BaseNames &base_names);
293
294 // Public config variables
295
299
302
304
305#ifndef CATCH_TEST
306 protected:
307#endif
308
320 inline StatusCode curl_get(pTransaction &p_txn, const char *url, Index *p_idx = nullptr) {
321 CURL *curl;
322 CURLcode c_ret;
323
324 curl = curl_easy_init();
325 if (curl == nullptr) return SERVICE_ERROR_NOT_READY;
326
327 GetBuffer buff = {};
328
329 curl_easy_setopt(curl, CURLOPT_URL, url);
330 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
331 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
332 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, get_callback);
333 curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &buff);
334
335 if (p_idx != nullptr) {
336 Index:: iterator it;
337 if ((it = p_idx->find("CURLOPT_USERNAME")) != p_idx->end())
338 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
339
340 if ((it = p_idx->find("CURLOPT_USERPWD")) != p_idx->end())
341 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
342
343 if ((it = p_idx->find("CURLOPT_COOKIEFILE")) != p_idx->end())
344 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
345
346 if ((it = p_idx->find("CURLOPT_COOKIEJAR")) != p_idx->end())
347 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
348 }
349 c_ret = curl_easy_perform(curl);
350
351 uint64_t response_code;
352
353 if (c_ret == CURLE_OK)
354 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
355
356 curl_easy_cleanup(curl);
357
358 switch (c_ret) {
359 case CURLE_OK:
360 break;
361 case CURLE_REMOTE_ACCESS_DENIED:
362 case CURLE_AUTH_ERROR:
363 return SERVICE_ERROR_READ_FORBIDDEN;
364 case CURLE_REMOTE_FILE_NOT_FOUND:
365 return SERVICE_ERROR_BLOCK_NOT_FOUND;
366 default:
367 return SERVICE_ERROR_IO_ERROR;
368 }
369
370 switch (response_code) {
371 case MHD_HTTP_OK:
372 case MHD_HTTP_CREATED:
373 case MHD_HTTP_ACCEPTED:
374 break;
375 case MHD_HTTP_NOT_FOUND:
376 case MHD_HTTP_GONE:
377 return SERVICE_ERROR_BLOCK_NOT_FOUND;
378 case MHD_HTTP_BAD_REQUEST:
379 return SERVICE_ERROR_WRONG_ARGUMENTS;
380 case MHD_HTTP_UNAUTHORIZED:
381 case MHD_HTTP_PAYMENT_REQUIRED:
382 case MHD_HTTP_FORBIDDEN:
383 case MHD_HTTP_METHOD_NOT_ALLOWED:
384 case MHD_HTTP_NOT_ACCEPTABLE:
385 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
386 case MHD_HTTP_TOO_MANY_REQUESTS:
387 return SERVICE_ERROR_READ_FORBIDDEN;
388 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
389 return SERVICE_ERROR_MISC_SERVER;
390 default:
391 return SERVICE_ERROR_IO_ERROR;
392 }
393 size_t buf_size = buff.size();
394 if (buf_size > MAX_BLOCK_SIZE) return SERVICE_ERROR_BLOCK_TOO_BIG;
395
396 buff.push_back(0);
397
398 return unwrap_received(p_txn, (pBlock) buff.data(), buf_size);
399 }
400
401
412 inline StatusCode curl_put(const char *url, pBlock p_blk, int mode = WRITE_AS_STRING | WRITE_AS_FULL_BLOCK, Index *p_idx = nullptr) {
413 CURL *curl;
414 CURLcode c_ret;
415
416 curl = curl_easy_init();
417 if (curl == nullptr) return SERVICE_ERROR_NOT_READY;
418
419 PutBuffer put_buff;
420
421 if ((mode & WRITE_AS_ANY_WRITE) == 0)
422 mode = WRITE_AS_STRING | WRITE_AS_FULL_BLOCK;
423
424 if ((mode & WRITE_AS_STRING) && ( (p_blk->cell_type == CELL_TYPE_STRING && p_blk->size == 1)
425 || (p_blk->cell_type == CELL_TYPE_BYTE && p_blk->rank == 1))) {
426 if (p_blk->cell_type == CELL_TYPE_STRING) {
427 put_buff.p_base = (uint8_t *) p_blk->get_string(0);
428 put_buff.to_send = strlen((const char *) put_buff.p_base);
429 } else {
430 put_buff.p_base = &p_blk->tensor.cell_byte[0];
431 put_buff.to_send = strnlen((const char *) put_buff.p_base, p_blk->size);
432 }
433 } else if ((mode & WRITE_AS_CONTENT) && ((p_blk->cell_type & 0xf0) == 0)) {
434 put_buff.to_send = p_blk->size*(p_blk->cell_type & 0xff);
435 put_buff.p_base = &p_blk->tensor.cell_byte[0];
436 } else if ((mode & WRITE_AS_FULL_BLOCK) && (p_blk->cell_type != CELL_TYPE_INDEX)) {
437 put_buff.to_send = p_blk->total_bytes;
438 put_buff.p_base = (uint8_t *) p_blk;
439 } else
440 return SERVICE_ERROR_WRONG_ARGUMENTS;
441
442 curl_easy_setopt(curl, CURLOPT_URL, url);
443 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
444 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
445 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dev_null);
446 curl_easy_setopt(curl, CURLOPT_READFUNCTION, put_callback);
447 curl_easy_setopt(curl, CURLOPT_READDATA, (void *) &put_buff);
448 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
449 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t) put_buff.to_send);
450
451 if (p_idx != nullptr) {
452 Index:: iterator it;
453 if ((it = p_idx->find("CURLOPT_USERNAME")) != p_idx->end())
454 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
455
456 if ((it = p_idx->find("CURLOPT_USERPWD")) != p_idx->end())
457 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
458
459 if ((it = p_idx->find("CURLOPT_COOKIEFILE")) != p_idx->end())
460 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
461
462 if ((it = p_idx->find("CURLOPT_COOKIEJAR")) != p_idx->end())
463 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
464 }
465 c_ret = curl_easy_perform(curl);
466
467 uint64_t response_code;
468
469 if (c_ret == CURLE_OK)
470 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
471
472 curl_easy_cleanup(curl);
473
474 switch (c_ret) {
475 case CURLE_OK:
476 break;
477 case CURLE_REMOTE_ACCESS_DENIED:
478 case CURLE_AUTH_ERROR:
479 return SERVICE_ERROR_WRITE_FORBIDDEN;
480 case CURLE_REMOTE_FILE_NOT_FOUND:
481 return SERVICE_ERROR_BLOCK_NOT_FOUND;
482 default:
483 return SERVICE_ERROR_IO_ERROR;
484 }
485
486 switch (response_code) {
487 case MHD_HTTP_OK:
488 case MHD_HTTP_CREATED:
489 case MHD_HTTP_ACCEPTED:
490 return SERVICE_NO_ERROR;
491 case MHD_HTTP_NOT_FOUND:
492 case MHD_HTTP_GONE:
493 return SERVICE_ERROR_BLOCK_NOT_FOUND;
494 case MHD_HTTP_BAD_REQUEST:
495 return SERVICE_ERROR_WRONG_ARGUMENTS;
496 case MHD_HTTP_UNAUTHORIZED:
497 case MHD_HTTP_PAYMENT_REQUIRED:
498 case MHD_HTTP_FORBIDDEN:
499 case MHD_HTTP_METHOD_NOT_ALLOWED:
500 case MHD_HTTP_NOT_ACCEPTABLE:
501 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
502 case MHD_HTTP_TOO_MANY_REQUESTS:
503 return SERVICE_ERROR_WRITE_FORBIDDEN;
504 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
505 return SERVICE_ERROR_MISC_SERVER;
506 }
507 return SERVICE_ERROR_IO_ERROR;
508 }
509
510
519 inline StatusCode curl_remove(const char *url, Index *p_idx = nullptr) {
520 CURL *curl;
521 CURLcode c_ret;
522
523 curl = curl_easy_init();
524 if (curl == nullptr) return SERVICE_ERROR_NOT_READY;
525
526 curl_easy_setopt(curl, CURLOPT_URL, url);
527 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
528 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
529 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
530 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dev_null);
531
532 if (p_idx != nullptr) {
533 Index:: iterator it;
534 if ((it = p_idx->find("CURLOPT_USERNAME")) != p_idx->end())
535 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
536
537 if ((it = p_idx->find("CURLOPT_USERPWD")) != p_idx->end())
538 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
539
540 if ((it = p_idx->find("CURLOPT_COOKIEFILE")) != p_idx->end())
541 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
542
543 if ((it = p_idx->find("CURLOPT_COOKIEJAR")) != p_idx->end())
544 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
545 }
546 c_ret = curl_easy_perform(curl);
547
548 uint64_t response_code;
549
550 if (c_ret == CURLE_OK)
551 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
552
553 curl_easy_cleanup(curl);
554
555 switch (c_ret) {
556 case CURLE_OK:
557 break;
558 case CURLE_REMOTE_ACCESS_DENIED:
559 case CURLE_AUTH_ERROR:
560 return SERVICE_ERROR_WRITE_FORBIDDEN;
561
562 case CURLE_REMOTE_FILE_NOT_FOUND:
563 return SERVICE_ERROR_BLOCK_NOT_FOUND;
564 default:
565 return SERVICE_ERROR_IO_ERROR;
566 }
567
568 switch (response_code) {
569 case MHD_HTTP_OK:
570 case MHD_HTTP_CREATED:
571 case MHD_HTTP_ACCEPTED:
572 return SERVICE_NO_ERROR;
573 case MHD_HTTP_NOT_FOUND:
574 case MHD_HTTP_GONE:
575 return SERVICE_ERROR_BLOCK_NOT_FOUND;
576 case MHD_HTTP_BAD_REQUEST:
577 return SERVICE_ERROR_WRONG_ARGUMENTS;
578 case MHD_HTTP_UNAUTHORIZED:
579 case MHD_HTTP_PAYMENT_REQUIRED:
580 case MHD_HTTP_FORBIDDEN:
581 case MHD_HTTP_METHOD_NOT_ALLOWED:
582 case MHD_HTTP_NOT_ACCEPTABLE:
583 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
584 case MHD_HTTP_TOO_MANY_REQUESTS:
585 return SERVICE_ERROR_WRITE_FORBIDDEN;
586 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
587 return SERVICE_ERROR_MISC_SERVER;
588 }
589 return SERVICE_ERROR_IO_ERROR;
590 }
591
592#ifndef CATCH_TEST
593 private:
594#endif
/// Hexadecimal digits (upper case), indexed by nibble value; compose_url() uses it to write %XX escapes.
595 char HEX[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
596
607 inline bool compose_url(pChar p_dest, pChar p_node, pChar p_url, int buff_size) {
608 int nix;
609
610 for (MapIS::iterator it = jazz_node_name.begin(); true; ++it) {
611 if (it == jazz_node_name.end())
612 return false;
613
614 if (it->second == p_node) {
615 nix = it->first;
616 break;
617 }
618 }
619 int ret = snprintf(p_dest, buff_size, "http://%s:%i", jazz_node_ip[nix].c_str(), jazz_node_port[nix]);
620
621 if (ret < 0 || ret >= buff_size)
622 return false;
623
624 p_dest += ret;
625 buff_size -= ret;
626
627 while (buff_size > 0) {
628 u_int8_t cursor = *(p_url++);
629 switch (cursor) {
630 case 0:
631 *p_dest = 0;
632 return true;
633 case '!' ... '$':
634 case '&' ... '/':
635 case '0' ... '9':
636 case ':':
637 case ';':
638 case '=':
639 case '?':
640 case '@':
641 case 'A' ... 'Z':
642 case '[':
643 case ']':
644 case '_':
645 case 'a' ... 'z':
646 case '~':
647 *(p_dest++) = cursor;
648 buff_size--;
649 break;
650 default:
651 if (buff_size < 4)
652 return false;
653 *(p_dest++) = '%';
654 *(p_dest++) = HEX[cursor >> 4];
655 *(p_dest++) = HEX[cursor & 0x0f];
656 buff_size -= 3;
657 }
658 }
659 return false;
660 }
661
// Feature switches. All default to "disabled" here; NOTE(review): presumably set from configuration in start() -- confirm in channel.cpp.
662 int can_curl = false; ///< If true, the server can use libcurl based on configuration key ENABLE_HTTP_CLIENT.
663 int curl_ok = false; ///< If true, libcurl is ready to be used based on config + libcurl initialization.
664 int can_zmq = false; ///< If true, the server can use zeroMQ based on configuration key ENABLE_ZEROMQ_CLIENT.
665 int zmq_ok = false; ///< If true, zeroMQ is ready to be used based on config + zeroMQ initialization.
666 int can_bash = false; ///< If true, the server can use bash based on configuration key ENABLE_BASH_EXEC.
667 int file_lev = 0; ///< The level of file operations allowed based on configuration key ENABLE_FILE_LEVEL.
668
671
672 void *zmq_context = nullptr;
673
674#ifdef CATCH_TEST
675 CURL * curl_easy_init ();
676 CURLcode curl_easy_perform(CURL *curl);
677 CURLcode curl_easy_getinfo(CURL *curl, CURLINFO info, uint64_t *response_code);
678
679 int curl_easy_return_code = CURL_EASY_NO_BYPASS;
680 int curl_easy_response = CURL_EASY_NO_BYPASS;
681#endif
682
683};
685
686
687#ifdef CATCH_TEST
688
689// Instancing Channels
690// -------------------
691
692extern Channels CHN;
693
694#endif
695
696} // namespace jazz_elements
697
698#endif // ifndef INCLUDED_JAZZ_ELEMENTS_CHANNEL
Channels: A Container doing block transactions across media (files, folders, shell,...
Definition channel.h:244
PipeMap pipes
A map of pipelines (zeroMQ connections)
Definition channel.h:669
MHD_StatusCode forward_del(Name node, pChar p_url)
Definition channel.cpp:1138
virtual StatusCode header(StaticBlockHeader &hea, pChar p_what)
Definition channel.cpp:547
int jazz_node_my_index
The index of the node in the cluster.
Definition channel.h:300
virtual StatusCode put(pChar p_where, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT)
Definition channel.cpp:581
StatusCode curl_remove(const char *url, Index *p_idx=nullptr)
The most low level remove function.
Definition channel.h:519
bool compose_url(pChar p_dest, pChar p_node, pChar p_url, int buff_size)
Compose a url from a node, a base and an entity.
Definition channel.h:607
String filesystem_root
The root of the filesystem.
Definition channel.h:303
int can_zmq
If true, the server can use zeroMQ based on configuration key ENABLE_ZEROMQ_CLIENT.
Definition channel.h:664
StatusCode curl_get(pTransaction &p_txn, const char *url, Index *p_idx=nullptr)
The most low level get function.
Definition channel.h:320
StatusCode curl_put(const char *url, pBlock p_blk, int mode=WRITE_AS_STRING|WRITE_AS_FULL_BLOCK, Index *p_idx=nullptr)
The most low level put function.
Definition channel.h:412
virtual StatusCode modify(Locator &function, pTuple p_args)
Definition channel.cpp:946
int file_lev
The level of file operations allowed based on configuration key ENABLE_FILE_LEVEL.
Definition channel.h:667
int can_bash
If true, the server can use bash based on configuration key ENABLE_BASH_EXEC.
Definition channel.h:666
virtual StatusCode remove(pChar p_where)
Definition channel.cpp:798
virtual StatusCode copy(pChar p_where, pChar p_what)
Definition channel.cpp:902
virtual StatusCode get(pTransaction &p_txn, pChar p_what)
Definition channel.cpp:352
virtual void base_names(BaseNames &base_names)
Definition channel.cpp:1073
virtual StatusCode locate(Locator &location, pChar p_what)
Definition channel.cpp:532
MapIS jazz_node_ip
The ip addresses of the nodes in the cluster.
Definition channel.h:297
StatusCode shut_down()
Definition channel.cpp:310
char HEX[16]
Hexadecimal digits.
Definition channel.h:595
int zmq_ok
If true, zeroMQ is ready to be used based on config + zeroMQ initialization.
Definition channel.h:665
MHD_StatusCode forward_put(Name node, pChar p_url, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT)
Definition channel.cpp:1116
StatusCode start()
Definition channel.cpp:150
int can_curl
If true, the server can use libcurl based on configuration key ENABLE_HTTP_CLIENT.
Definition channel.h:662
MapII jazz_node_port
The ports of the nodes in the cluster.
Definition channel.h:298
void * zmq_context
The zeroMQ context.
Definition channel.h:672
MHD_StatusCode forward_get(pTransaction &p_txn, Name node, pChar p_url)
Definition channel.cpp:1090
virtual pChar const id()
Definition channel.cpp:140
MapIS jazz_node_name
The names of the nodes (other Jazz servers) in the cluster.
Definition channel.h:296
int jazz_node_cluster_size
The number of nodes in the cluster.
Definition channel.h:301
ConnMap connect
A map of http connections.
Definition channel.h:670
int curl_ok
If true, libcurl is ready to be used based on config + libcurl initialization.
Definition channel.h:663
virtual StatusCode new_entity(pChar p_where)
Definition channel.cpp:764
A configuration file as a key/value store.
Definition utils.h:218
Container: A Service to manage Jazz blocks. All Jazz blocks are managed by this or a descendant of th...
Definition container.h:296
StatusCode unwrap_received(pTransaction &p_txn)
Definition container.h:447
A simple logger.
Definition utils.h:248
Tuple: A Jazz Block with multiple Tensors.
Definition tuple.h:94
The namespace for Jazz Utils, Blocks, Kinds, Tuples, Containers, etc.
Definition block.cpp:39
std::string String
A standard string used in many other places in Jazz.
Definition types.h:239
GetBuffer * pGetBuffer
A pointer to a GetBuffer.
Definition channel.h:128
std::map< String, Index > ConnMap
A structure holding connections.
Definition channel.h:116
std::map< String, pContainer > BaseNames
A map of names for the containers (or structure engines like "map" or "tree" inside Volatile).
Definition container.h:166
std::map< String, Socket > PipeMap
A structure holding pipelines.
Definition channel.h:140
Channels * pChannels
A pointer to a Channels.
Definition channel.h:684
char * pChar
A pointer to a char buffer.
Definition types.h:189
class Block * pBlock
A (forward defined) pointer to a Block.
Definition block.h:66
char Name[NAME_SIZE]
A short identifier used in Blocks, Containers and API.
Definition types.h:187
std::vector< uint8_t > GetBuffer
A structure to share with the libcurl get callback.
Definition channel.h:127
std::map< int, int > MapII
A map for defining http config ports.
Definition channel.h:144
PutBuffer * pPutBuffer
A pointer to a PutBuffer.
Definition channel.h:136
int StatusCode
Type returned by the Service API.
Definition utils.h:142
size_t get_callback(char *ptr, size_t size, size_t nmemb, void *container)
A callback for libCURL GET.
Definition channel.cpp:61
unsigned int MHD_StatusCode
A proper type for specifying http status codes.
Definition channel.h:158
size_t put_callback(char *ptr, size_t size, size_t nmemb, void *container)
A callback for libCURL PUT.
Definition channel.cpp:107
size_t dev_null(char *_ignore, size_t size, size_t nmemb, void *_ignore_2)
A callback for libCURL GET to ignore all the blocks sent by the server in PUT and DELETE calls.
Definition channel.cpp:89
std::map< int, String > MapIS
A map for defining http config names.
Definition channel.h:112
std::map< String, String > Index
An Index kept in RAM by Volatile implemented as an stdlib map (string, string)
Definition types.h:243
Locator: A minimal structure to define the location of resources inside a Container.
Definition container.h:198
A structure to keep state inside a put callback.
Definition channel.h:132
uint8_t * p_base
The pointer (updated after each call) to the data.
Definition channel.h:134
uint64_t to_send
Number of bytes to be sent.
Definition channel.h:133
A structure to hold a single pipeline.
Definition channel.h:120
char endpoint[120]
The endpoint at which the socket is connected.
Definition channel.h:121
void * requester
The (connected) zmq socket.
Definition channel.h:122
A Binary Compatible BlockHeader without Index (and therefore constructors/destructors)
Definition types.h:270
Transaction: A wrapper over a Block that defines the communication of a block with a Container.
Definition container.h:176