Jazz 1.25.+
Loading...
Searching...
No Matches
channel.h
1/* Jazz (c) 2018-2026 kaalam.ai (The Authors of Jazz), using (under the same license):
2
3 1. Biomodelling - The AATBlockQueue class (c) Jacques Basaldúa, 2009-2012 licensed
4 exclusively for the use in the Jazz server software.
5
6 Copyright 2009-2012 Jacques Basaldúa
7
8 2. BBVA - Jazz: A lightweight analytical web server for data-driven applications.
9
10 Copyright 2016-2017 Banco Bilbao Vizcaya Argentaria, S.A.
11
12 This product includes software developed at
13
14 BBVA (https://www.bbva.com/)
15
16 3. LMDB, Copyright 2011-2017 Howard Chu, Symas Corp. All rights reserved.
17
18 Licensed under http://www.OpenLDAP.org/license.html
19
20
21 Licensed under the Apache License, Version 2.0 (the "License");
22 you may not use this file except in compliance with the License.
23 You may obtain a copy of the License at
24
25 http://www.apache.org/licenses/LICENSE-2.0
26
27 Unless required by applicable law or agreed to in writing, software
28 distributed under the License is distributed on an "AS IS" BASIS,
29 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30 See the License for the specific language governing permissions and
31 limitations under the License.
32*/
33
34
35#include <map>
36
37#include <microhttpd.h>
38#include <curl/curl.h>
39
40#include "src/jazz_elements/container.h"
41
42#ifdef CATCH_TEST
43#ifndef INCLUDED_JAZZ_CATCH2
44#define INCLUDED_JAZZ_CATCH2
45
46#include "src/catch2/catch.hpp"
47
48#endif
49#endif
50
51
52#ifndef INCLUDED_JAZZ_ELEMENTS_CHANNEL
53#define INCLUDED_JAZZ_ELEMENTS_CHANNEL
54
55namespace jazz_elements
56{
57
58#define BASE_BASH_10BIT 0x022 //< First 10 bits of base "bash"
59#define BASE_FILE_10BIT 0x126 //< First 10 bits of base "file"
60#define BASE_HTTP_10BIT 0x288 //< First 10 bits of base "http"
61#define BASE_0_MQ_10BIT 0x1b0 //< First 10 bits of base "0-mq"
62
63
64#define MAX_FILE_OR_URL_SIZE 1712
65
67
68#define APPLY_NOTHING 0
69#define APPLY_NAME 1
70#define APPLY_URL 2
71#define APPLY_FUNCTION 3
72#define APPLY_FUNCT_CONST 4
73#define APPLY_FILTER 5
74#define APPLY_FILT_CONST 6
75#define APPLY_RAW 7
76#define APPLY_TEXT 8
77#define APPLY_ASSIGN_NOTHING 9
78#define APPLY_ASSIGN_NAME 10
79#define APPLY_ASSIGN_URL 11
80#define APPLY_ASSIGN_FUNCTION 12
81#define APPLY_ASSIGN_FUNCT_CONST 13
82#define APPLY_ASSIGN_FILTER 14
83#define APPLY_ASSIGN_FILT_CONST 15
84#define APPLY_ASSIGN_RAW 16
85#define APPLY_ASSIGN_TEXT 17
86#define APPLY_ASSIGN_CONST 18
87#define APPLY_NEW_ENTITY 19
88#define APPLY_GET_ATTRIBUTE 20
89#define APPLY_SET_ATTRIBUTE 21
90#define APPLY_JAZZ_INFO 22
91
92
93// Bit masks to trigger curl failures in Channel wrappers during tests.
94
95#define CURL_EASY_NO_BYPASS -1
96
97// #define TRIGGER_FAIL_MDB_DROP (1u << 14) This is the highest bit used in Persisted,
98#define TRIGGER_FAIL_CURL_EASY_INIT (1u << 15)
99#define TRIGGER_FAIL_CURL_EASY_PERFORM (1u << 16)
100#define TRIGGER_FAIL_CURL_EASY_GETINFO (1u << 17)
101
102#define TRIGGER_FAIL_GETIFADDRS (1u << 18)
103#define TRIGGER_FAIL_GETNAMEINFO (1u << 19)
104#define TRIGGER_FAIL_MYINDEX_FIND (1u << 20)
105#define TRIGGER_FAIL_FILE_IO (1u << 21)
106#define TRIGGER_FAIL_ZMQ (1u << 22)
107#define TRIGGER_FAIL_BASH (1u << 23)
108
110typedef std::map<int, String> MapIS;
111
112
114typedef std::map<String, Index> ConnMap;
115
116
118struct Socket { // A structure to hold a single pipeline.
119 char endpoint[120]; ///< The endpoint at which the socket is connected.
120 void *requester; ///< The (connected) zmq socket.
121};
122
123
125typedef std::vector<uint8_t> GetBuffer;
127
128
130struct PutBuffer { // A structure to keep state inside a put callback.
131 uint64_t to_send; ///< Number of bytes to be sent.
132 uint8_t *p_base; ///< The pointer (updated after each call) to the data.
133};
135
136
138typedef std::map<String, Socket> PipeMap;
139
140
142typedef std::map<int, int> MapII;
143
144
156typedef unsigned int MHD_StatusCode;
157
158extern size_t get_callback(char *ptr, size_t size, size_t nmemb, void *container); // A callback for libCURL GET.
159extern size_t put_callback(char *ptr, size_t size, size_t nmemb, void *container); // A callback for libCURL PUT.
160extern size_t dev_null(char *_ignore, size_t size, size_t nmemb, void *_ignore_2); // A callback that ignores all the blocks sent by the server in PUT and DELETE calls.
161
242class Channels : public Container {
243
244 public:
245
246 Channels(pLogger a_logger,
247 pConfigFile a_config);
248 ~Channels();
249
250 virtual pChar const id();
251
252 StatusCode start ();
254
255 virtual StatusCode get (pTransaction &p_txn,
256 pChar p_what);
257 virtual StatusCode get (pTransaction &p_txn,
258 pChar p_what,
259 pBlock p_row_filter);
260 virtual StatusCode get (pTransaction &p_txn,
261 pChar p_what,
262 pChar name);
263 virtual StatusCode locate (Locator &location,
264 pChar p_what);
265 virtual StatusCode header (StaticBlockHeader &hea,
266 pChar p_what);
267 virtual StatusCode header (pTransaction &p_txn,
268 pChar p_what);
269 virtual StatusCode put (pChar p_where,
270 pBlock p_block,
271 int mode = WRITE_AS_BASE_DEFAULT);
272 virtual StatusCode new_entity(pChar p_where);
273 virtual StatusCode remove (pChar p_where);
274 virtual StatusCode copy (pChar p_where,
275 pChar p_what);
276 virtual StatusCode modify (Locator &function,
277 pTuple p_args);
279 Name node,
280 pChar p_url);
282 pChar p_url,
283 pBlock p_block,
284 int mode = WRITE_AS_BASE_DEFAULT);
286 pChar p_url);
287
288 // Support for container names in the BaseAPI .base_names()
289
291
292 // Public config variables
293
297
300
302
303#ifndef CATCH_TEST
304 protected:
305#endif
306
318 inline StatusCode curl_get(pTransaction &p_txn, const char *url, Index *p_idx = nullptr) {
319 CURL *curl;
320 CURLcode c_ret;
321
322 curl = curl_easy_init();
323 if (curl == nullptr) return SERVICE_ERROR_NOT_READY;
324
325 GetBuffer buff = {};
326
327 curl_easy_setopt(curl, CURLOPT_URL, url);
328 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
329 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
330 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, get_callback);
331 curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &buff);
332
333 if (p_idx != nullptr) {
334 Index:: iterator it;
335 if ((it = p_idx->find("CURLOPT_USERNAME")) != p_idx->end())
336 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
337
338 if ((it = p_idx->find("CURLOPT_USERPWD")) != p_idx->end())
339 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
340
341 if ((it = p_idx->find("CURLOPT_COOKIEFILE")) != p_idx->end())
342 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
343
344 if ((it = p_idx->find("CURLOPT_COOKIEJAR")) != p_idx->end())
345 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
346 }
347 c_ret = curl_easy_perform(curl);
348
349 uint64_t response_code;
350
351 if (c_ret == CURLE_OK)
352 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
353
354 curl_easy_cleanup(curl);
355
356 switch (c_ret) {
357 case CURLE_OK:
358 break;
359 case CURLE_REMOTE_ACCESS_DENIED:
360 case CURLE_AUTH_ERROR:
361 return SERVICE_ERROR_READ_FORBIDDEN;
362 case CURLE_REMOTE_FILE_NOT_FOUND:
363 return SERVICE_ERROR_BLOCK_NOT_FOUND;
364 default:
365 return SERVICE_ERROR_IO_ERROR;
366 }
367
368 switch (response_code) {
369 case MHD_HTTP_OK:
370 case MHD_HTTP_CREATED:
371 case MHD_HTTP_ACCEPTED:
372 break;
373 case MHD_HTTP_NOT_FOUND:
374 case MHD_HTTP_GONE:
375 return SERVICE_ERROR_BLOCK_NOT_FOUND;
376 case MHD_HTTP_BAD_REQUEST:
377 return SERVICE_ERROR_WRONG_ARGUMENTS;
378 case MHD_HTTP_UNAUTHORIZED:
379 case MHD_HTTP_PAYMENT_REQUIRED:
380 case MHD_HTTP_FORBIDDEN:
381 case MHD_HTTP_METHOD_NOT_ALLOWED:
382 case MHD_HTTP_NOT_ACCEPTABLE:
383 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
384 case MHD_HTTP_TOO_MANY_REQUESTS:
385 return SERVICE_ERROR_READ_FORBIDDEN;
386 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
387 return SERVICE_ERROR_MISC_SERVER;
388 default:
389 return SERVICE_ERROR_IO_ERROR;
390 }
391 size_t buf_size = buff.size();
392 if (buf_size > MAX_BLOCK_SIZE) return SERVICE_ERROR_BLOCK_TOO_BIG;
393
394 buff.push_back(0);
395
396 return unwrap_received(p_txn, (pBlock) buff.data(), buf_size);
397 }
398
399
410 inline StatusCode curl_put(const char *url, pBlock p_blk, int mode = WRITE_AS_STRING | WRITE_AS_FULL_BLOCK, Index *p_idx = nullptr) {
411 CURL *curl;
412 CURLcode c_ret;
413
414 curl = curl_easy_init();
415 if (curl == nullptr) return SERVICE_ERROR_NOT_READY;
416
417 PutBuffer put_buff;
418
419 if ((mode & WRITE_AS_ANY_WRITE) == 0)
420 mode = WRITE_AS_STRING | WRITE_AS_FULL_BLOCK;
421
422 if ((mode & WRITE_AS_STRING) && ( (p_blk->cell_type == CELL_TYPE_STRING && p_blk->size == 1)
423 || (p_blk->cell_type == CELL_TYPE_BYTE && p_blk->rank == 1))) {
424 if (p_blk->cell_type == CELL_TYPE_STRING) {
425 put_buff.p_base = (uint8_t *) p_blk->get_string(0);
426 put_buff.to_send = strlen((const char *) put_buff.p_base);
427 } else {
428 put_buff.p_base = &p_blk->tensor.cell_byte[0];
429 put_buff.to_send = strnlen((const char *) put_buff.p_base, p_blk->size);
430 }
431 } else if ((mode & WRITE_AS_CONTENT) && ((p_blk->cell_type & 0xf0) == 0)) {
432 put_buff.to_send = p_blk->size*(p_blk->cell_type & 0xff);
433 put_buff.p_base = &p_blk->tensor.cell_byte[0];
434 } else if ((mode & WRITE_AS_FULL_BLOCK) && (p_blk->cell_type != CELL_TYPE_INDEX)) {
435 put_buff.to_send = p_blk->total_bytes;
436 put_buff.p_base = (uint8_t *) p_blk;
437 } else
438 return SERVICE_ERROR_WRONG_ARGUMENTS;
439
440 curl_easy_setopt(curl, CURLOPT_URL, url);
441 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
442 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
443 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dev_null);
444 curl_easy_setopt(curl, CURLOPT_READFUNCTION, put_callback);
445 curl_easy_setopt(curl, CURLOPT_READDATA, (void *) &put_buff);
446 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
447 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t) put_buff.to_send);
448
449 if (p_idx != nullptr) {
450 Index:: iterator it;
451 if ((it = p_idx->find("CURLOPT_USERNAME")) != p_idx->end())
452 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
453
454 if ((it = p_idx->find("CURLOPT_USERPWD")) != p_idx->end())
455 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
456
457 if ((it = p_idx->find("CURLOPT_COOKIEFILE")) != p_idx->end())
458 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
459
460 if ((it = p_idx->find("CURLOPT_COOKIEJAR")) != p_idx->end())
461 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
462 }
463 c_ret = curl_easy_perform(curl);
464
465 uint64_t response_code;
466
467 if (c_ret == CURLE_OK)
468 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
469
470 curl_easy_cleanup(curl);
471
472 switch (c_ret) {
473 case CURLE_OK:
474 break;
475 case CURLE_REMOTE_ACCESS_DENIED:
476 case CURLE_AUTH_ERROR:
477 return SERVICE_ERROR_WRITE_FORBIDDEN;
478 case CURLE_REMOTE_FILE_NOT_FOUND:
479 return SERVICE_ERROR_BLOCK_NOT_FOUND;
480 default:
481 return SERVICE_ERROR_IO_ERROR;
482 }
483
484 switch (response_code) {
485 case MHD_HTTP_OK:
486 case MHD_HTTP_CREATED:
487 case MHD_HTTP_ACCEPTED:
488 return SERVICE_NO_ERROR;
489 case MHD_HTTP_NOT_FOUND:
490 case MHD_HTTP_GONE:
491 return SERVICE_ERROR_BLOCK_NOT_FOUND;
492 case MHD_HTTP_BAD_REQUEST:
493 return SERVICE_ERROR_WRONG_ARGUMENTS;
494 case MHD_HTTP_UNAUTHORIZED:
495 case MHD_HTTP_PAYMENT_REQUIRED:
496 case MHD_HTTP_FORBIDDEN:
497 case MHD_HTTP_METHOD_NOT_ALLOWED:
498 case MHD_HTTP_NOT_ACCEPTABLE:
499 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
500 case MHD_HTTP_TOO_MANY_REQUESTS:
501 return SERVICE_ERROR_WRITE_FORBIDDEN;
502 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
503 return SERVICE_ERROR_MISC_SERVER;
504 }
505 return SERVICE_ERROR_IO_ERROR;
506 }
507
508
517 inline StatusCode curl_remove(const char *url, Index *p_idx = nullptr) {
518 CURL *curl;
519 CURLcode c_ret;
520
521 curl = curl_easy_init();
522 if (curl == nullptr) return SERVICE_ERROR_NOT_READY;
523
524 curl_easy_setopt(curl, CURLOPT_URL, url);
525 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
526 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
527 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
528 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dev_null);
529
530 if (p_idx != nullptr) {
531 Index:: iterator it;
532 if ((it = p_idx->find("CURLOPT_USERNAME")) != p_idx->end())
533 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
534
535 if ((it = p_idx->find("CURLOPT_USERPWD")) != p_idx->end())
536 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
537
538 if ((it = p_idx->find("CURLOPT_COOKIEFILE")) != p_idx->end())
539 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
540
541 if ((it = p_idx->find("CURLOPT_COOKIEJAR")) != p_idx->end())
542 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
543 }
544 c_ret = curl_easy_perform(curl);
545
546 uint64_t response_code;
547
548 if (c_ret == CURLE_OK)
549 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
550
551 curl_easy_cleanup(curl);
552
553 switch (c_ret) {
554 case CURLE_OK:
555 break;
556 case CURLE_REMOTE_ACCESS_DENIED:
557 case CURLE_AUTH_ERROR:
558 return SERVICE_ERROR_WRITE_FORBIDDEN;
559
560 case CURLE_REMOTE_FILE_NOT_FOUND:
561 return SERVICE_ERROR_BLOCK_NOT_FOUND;
562 default:
563 return SERVICE_ERROR_IO_ERROR;
564 }
565
566 switch (response_code) {
567 case MHD_HTTP_OK:
568 case MHD_HTTP_CREATED:
569 case MHD_HTTP_ACCEPTED:
570 return SERVICE_NO_ERROR;
571 case MHD_HTTP_NOT_FOUND:
572 case MHD_HTTP_GONE:
573 return SERVICE_ERROR_BLOCK_NOT_FOUND;
574 case MHD_HTTP_BAD_REQUEST:
575 return SERVICE_ERROR_WRONG_ARGUMENTS;
576 case MHD_HTTP_UNAUTHORIZED:
577 case MHD_HTTP_PAYMENT_REQUIRED:
578 case MHD_HTTP_FORBIDDEN:
579 case MHD_HTTP_METHOD_NOT_ALLOWED:
580 case MHD_HTTP_NOT_ACCEPTABLE:
581 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
582 case MHD_HTTP_TOO_MANY_REQUESTS:
583 return SERVICE_ERROR_WRITE_FORBIDDEN;
584 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
585 return SERVICE_ERROR_MISC_SERVER;
586 }
587 return SERVICE_ERROR_IO_ERROR;
588 }
589
590#ifndef CATCH_TEST
591 private:
592#endif
593 char HEX[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
594
605 inline bool compose_url(pChar p_dest, pChar p_node, pChar p_url, int buff_size) {
606 int nix;
607
608 for (MapIS::iterator it = jazz_node_name.begin(); true; ++it) {
609 if (it == jazz_node_name.end())
610 return false;
611
612 if (it->second == p_node) {
613 nix = it->first;
614 break;
615 }
616 }
617 int ret = snprintf(p_dest, buff_size, "http://%s:%i", jazz_node_ip[nix].c_str(), jazz_node_port[nix]);
618
619 if (ret < 0 || ret >= buff_size)
620 return false;
621
622 p_dest += ret;
623 buff_size -= ret;
624
625 while (buff_size > 0) {
626 u_int8_t cursor = *(p_url++);
627 switch (cursor) {
628 case 0:
629 *p_dest = 0;
630 return true;
631 case '!' ... '$':
632 case '&' ... '/':
633 case '0' ... '9':
634 case ':':
635 case ';':
636 case '=':
637 case '?':
638 case '@':
639 case 'A' ... 'Z':
640 case '[':
641 case ']':
642 case '_':
643 case 'a' ... 'z':
644 case '~':
645 *(p_dest++) = cursor;
646 buff_size--;
647 break;
648 default:
649 if (buff_size < 4)
650 return false;
651 *(p_dest++) = '%';
652 *(p_dest++) = HEX[cursor >> 4];
653 *(p_dest++) = HEX[cursor & 0x0f];
654 buff_size -= 3;
655 }
656 }
657 return false;
658 }
659
660 int can_curl = false;
661 int curl_ok = false;
662 int can_zmq = false;
663 int zmq_ok = false;
664 int can_bash = false;
665 int file_lev = 0;
666
669
670 void *zmq_context = nullptr;
671
672#ifdef CATCH_TEST
673 CURL * curl_easy_init ();
674 CURLcode curl_easy_perform(CURL *curl);
675 CURLcode curl_easy_getinfo(CURL *curl, CURLINFO info, uint64_t *response_code);
676
677 int curl_easy_return_code = CURL_EASY_NO_BYPASS;
678 int curl_easy_response = CURL_EASY_NO_BYPASS;
679#endif
680
681};
683
684
685#ifdef CATCH_TEST
686
687// Instancing Channels
688// -------------------
689
690extern Channels CHN;
691
692#endif
693
694} // namespace jazz_elements
695
696#endif // ifndef INCLUDED_JAZZ_ELEMENTS_CHANNEL
Channels: A Container doing block transactions across media (files, folders, shell,...
Definition channel.h:242
PipeMap pipes
A map of pipelines (zeroMQ connections)
Definition channel.h:667
MHD_StatusCode forward_del(Name node, pChar p_url)
Definition channel.cpp:1133
virtual StatusCode header(StaticBlockHeader &hea, pChar p_what)
Definition channel.cpp:547
int jazz_node_my_index
The index of the node in the cluster.
Definition channel.h:298
virtual StatusCode put(pChar p_where, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT)
Definition channel.cpp:581
StatusCode curl_remove(const char *url, Index *p_idx=nullptr)
The most low level remove function.
Definition channel.h:517
bool compose_url(pChar p_dest, pChar p_node, pChar p_url, int buff_size)
Compose a url from a node, a base and an entity.
Definition channel.h:605
String filesystem_root
The root of the filesystem.
Definition channel.h:301
int can_zmq
If true, the server can use zeroMQ based on configuration key ENABLE_ZEROMQ_CLIENT.
Definition channel.h:662
StatusCode curl_get(pTransaction &p_txn, const char *url, Index *p_idx=nullptr)
The most low level get function.
Definition channel.h:318
StatusCode curl_put(const char *url, pBlock p_blk, int mode=WRITE_AS_STRING|WRITE_AS_FULL_BLOCK, Index *p_idx=nullptr)
The most low level put function.
Definition channel.h:410
virtual StatusCode modify(Locator &function, pTuple p_args)
Definition channel.cpp:946
int file_lev
The level of file operations allowed based on configuration key ENABLE_FILE_LEVEL.
Definition channel.h:665
int can_bash
If true, the server can use bash based on configuration key ENABLE_BASH_EXEC.
Definition channel.h:664
virtual StatusCode remove(pChar p_where)
Definition channel.cpp:798
virtual StatusCode copy(pChar p_where, pChar p_what)
Definition channel.cpp:902
virtual StatusCode get(pTransaction &p_txn, pChar p_what)
Definition channel.cpp:352
void base_names(BaseNames &base_names)
Definition channel.cpp:1073
virtual StatusCode locate(Locator &location, pChar p_what)
Definition channel.cpp:532
MapIS jazz_node_ip
The ip addresses of the nodes in the cluster.
Definition channel.h:295
StatusCode shut_down()
Definition channel.cpp:310
char HEX[16]
Hexadecimal digits.
Definition channel.h:593
int zmq_ok
If true, zeroMQ is ready to be used based on config + zeroMQ initialization.
Definition channel.h:663
MHD_StatusCode forward_put(Name node, pChar p_url, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT)
Definition channel.cpp:1111
StatusCode start()
Definition channel.cpp:150
int can_curl
If true, the server can use libcurl based on configuration key ENABLE_HTTP_CLIENT.
Definition channel.h:660
MapII jazz_node_port
The ports of the nodes in the cluster.
Definition channel.h:296
void * zmq_context
The zeroMQ context.
Definition channel.h:670
MHD_StatusCode forward_get(pTransaction &p_txn, Name node, pChar p_url)
Definition channel.cpp:1090
virtual pChar const id()
Definition channel.cpp:140
MapIS jazz_node_name
The names of the nodes (other Jazz servers) in the cluster.
Definition channel.h:294
int jazz_node_cluster_size
The number of nodes in the cluster.
Definition channel.h:299
ConnMap connect
A map of http connections.
Definition channel.h:668
int curl_ok
If true, libcurl is ready to be used based on config + libcurl initialization.
Definition channel.h:661
virtual StatusCode new_entity(pChar p_where)
Definition channel.cpp:764
A configuration file as a key/value store.
Definition utils.h:218
Container: A Service to manage Jazz blocks. All Jazz blocks are managed by this or a descendant of th...
Definition container.h:287
StatusCode unwrap_received(pTransaction &p_txn)
Definition container.h:438
A simple logger.
Definition utils.h:248
Tuple: A Jazz Block with multiple Tensors.
Definition tuple.h:94
The namespace for Jazz Utils, Blocks, Kinds, Tuples, Containers, etc.
Definition block.cpp:39
std::string String
A standard string used in many other places in Jazz.
Definition types.h:239
GetBuffer * pGetBuffer
A pointer to a GetBuffer.
Definition channel.h:126
std::map< String, Index > ConnMap
A structure holding connections.
Definition channel.h:114
std::map< String, pContainer > BaseNames
A map of names for the containers (or structure engines like "map" or "tree" inside Volatile).
Definition container.h:157
std::map< String, Socket > PipeMap
A structure holding pipelines.
Definition channel.h:138
Channels * pChannels
A pointer to a Channels.
Definition channel.h:682
char * pChar
A pointer to a char buffer.
Definition types.h:189
class Block * pBlock
A (forward defined) pointer to a Block.
Definition block.h:66
char Name[NAME_SIZE]
A short identifier used in Blocks, Containers and API.
Definition types.h:187
std::vector< uint8_t > GetBuffer
A structure to share with the libcurl get callback.
Definition channel.h:125
std::map< int, int > MapII
A map for defining http config ports.
Definition channel.h:142
PutBuffer * pPutBuffer
A pointer to a PutBuffer.
Definition channel.h:134
int StatusCode
Type returned by the Service API.
Definition utils.h:142
size_t get_callback(char *ptr, size_t size, size_t nmemb, void *container)
A callback for libCURL GET.
Definition channel.cpp:61
unsigned int MHD_StatusCode
A proper type for specifying http status codes.
Definition channel.h:156
size_t put_callback(char *ptr, size_t size, size_t nmemb, void *container)
A callback for libCURL PUT.
Definition channel.cpp:107
size_t dev_null(char *_ignore, size_t size, size_t nmemb, void *_ignore_2)
A callback for libCURL GET to ignore all the blocks sent by the server in PUT and DELETE calls.
Definition channel.cpp:89
std::map< int, String > MapIS
A map for defining http config names.
Definition channel.h:110
std::map< String, String > Index
An Index kept in RAM by Volatile implemented as an stdlib map (string, string)
Definition types.h:243
Locator: A minimal structure to define the location of resources inside a Container.
Definition container.h:189
A structure to keep state inside a put callback.
Definition channel.h:130
uint8_t * p_base
The pointer (updated after each call) to the data.
Definition channel.h:132
uint64_t to_send
Number of bytes to be sent.
Definition channel.h:131
A structure to hold a single pipeline.
Definition channel.h:118
char endpoint[120]
The endpoint at which the socket is connected.
Definition channel.h:119
void * requester
The (connected) zmq socket.
Definition channel.h:120
A Binary Compatible BlockHeader without Index (and therefore constructors/destructors)
Definition types.h:270
Transaction: A wrapper over a Block that defines the communication of a block with a Container.
Definition container.h:167