37#include <microhttpd.h>
40#include "src/jazz_elements/container.h"
43#ifndef INCLUDED_JAZZ_CATCH2
44#define INCLUDED_JAZZ_CATCH2
46#include "src/catch2/catch.hpp"
52#ifndef INCLUDED_JAZZ_ELEMENTS_CHANNEL
53#define INCLUDED_JAZZ_ELEMENTS_CHANNEL
59#define BASE_BASH_10BIT 0x022
60#define BASE_FILE_10BIT 0x126
61#define BASE_HTTP_10BIT 0x288
62#define BASE_0_MQ_10BIT 0x1b0
65#define MAX_FILE_OR_URL_SIZE 1712
69#define APPLY_NOTHING 0
72#define APPLY_FUNCTION 3
73#define APPLY_FUNCT_CONST 4
75#define APPLY_FILT_CONST 6
78#define APPLY_ASSIGN_NOTHING 9
79#define APPLY_ASSIGN_NAME 10
80#define APPLY_ASSIGN_URL 11
81#define APPLY_ASSIGN_FUNCTION 12
82#define APPLY_ASSIGN_FUNCT_CONST 13
83#define APPLY_ASSIGN_FILTER 14
84#define APPLY_ASSIGN_FILT_CONST 15
85#define APPLY_ASSIGN_RAW 16
86#define APPLY_ASSIGN_TEXT 17
87#define APPLY_ASSIGN_CONST 18
88#define APPLY_NEW_ENTITY 19
89#define APPLY_GET_ATTRIBUTE 20
90#define APPLY_SET_ATTRIBUTE 21
91#define APPLY_JAZZ_INFO 22
95typedef std::map<int, std::string>
MapIS;
99typedef std::map<std::string, Index>
ConnMap;
143extern size_t get_callback(
char *ptr,
size_t size,
size_t nmemb,
void *container);
144extern size_t put_callback(
char *ptr,
size_t size,
size_t nmemb,
void *container);
145extern size_t dev_null(
char *_ignore,
size_t size,
size_t nmemb,
void *_ignore_2);
256 int mode = WRITE_AS_BASE_DEFAULT);
269 int mode = WRITE_AS_BASE_DEFAULT);
308 curl = curl_easy_init();
310 return SERVICE_ERROR_NOT_READY;
314 curl_easy_setopt(curl, CURLOPT_URL, url);
315 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
316 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
317 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
get_callback);
318 curl_easy_setopt(curl, CURLOPT_WRITEDATA, (
void *) &buff);
320 if (p_idx !=
nullptr) {
322 if ((it = p_idx->find(
"CURLOPT_USERNAME")) != p_idx->end())
323 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
325 if ((it = p_idx->find(
"CURLOPT_USERPWD")) != p_idx->end())
326 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
328 if ((it = p_idx->find(
"CURLOPT_COOKIEFILE")) != p_idx->end())
329 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
331 if ((it = p_idx->find(
"CURLOPT_COOKIEJAR")) != p_idx->end())
332 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
334 c_ret = curl_easy_perform(curl);
336 uint64_t response_code;
338 if (c_ret == CURLE_OK)
339 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
341 curl_easy_cleanup(curl);
346 case CURLE_REMOTE_ACCESS_DENIED:
347 case CURLE_AUTH_ERROR:
348 return SERVICE_ERROR_READ_FORBIDDEN;
349 case CURLE_REMOTE_FILE_NOT_FOUND:
350 return SERVICE_ERROR_BLOCK_NOT_FOUND;
352 return SERVICE_ERROR_IO_ERROR;
355 switch (response_code) {
357 case MHD_HTTP_CREATED:
358 case MHD_HTTP_ACCEPTED:
360 case MHD_HTTP_NOT_FOUND:
362 return SERVICE_ERROR_BLOCK_NOT_FOUND;
363 case MHD_HTTP_BAD_REQUEST:
364 return SERVICE_ERROR_WRONG_ARGUMENTS;
365 case MHD_HTTP_UNAUTHORIZED:
366 case MHD_HTTP_PAYMENT_REQUIRED:
367 case MHD_HTTP_FORBIDDEN:
368 case MHD_HTTP_METHOD_NOT_ALLOWED:
369 case MHD_HTTP_NOT_ACCEPTABLE:
370 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
371 case MHD_HTTP_TOO_MANY_REQUESTS:
372 return SERVICE_ERROR_READ_FORBIDDEN;
373 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
374 return SERVICE_ERROR_MISC_SERVER;
376 return SERVICE_ERROR_IO_ERROR;
378 size_t buf_size = buff.size();
379 if (buf_size > MAX_BLOCK_SIZE)
380 return SERVICE_ERROR_BLOCK_TOO_BIG;
402 curl = curl_easy_init();
404 return SERVICE_ERROR_NOT_READY;
408 if ((mode & WRITE_AS_ANY_WRITE) == 0)
409 mode = WRITE_AS_STRING | WRITE_AS_FULL_BLOCK;
411 if ((mode & WRITE_AS_STRING) && ( (p_blk->cell_type == CELL_TYPE_STRING && p_blk->size == 1)
412 || (p_blk->cell_type == CELL_TYPE_BYTE && p_blk->rank == 1))) {
413 if (p_blk->cell_type == CELL_TYPE_STRING) {
414 put_buff.
p_base = (uint8_t *) p_blk->get_string(0);
417 put_buff.
p_base = &p_blk->tensor.cell_byte[0];
418 put_buff.
to_send = strnlen((
const char *) put_buff.
p_base, p_blk->size);
420 }
else if ((mode & WRITE_AS_CONTENT) && ((p_blk->cell_type & 0xf0) == 0)) {
421 put_buff.
to_send = p_blk->size*(p_blk->cell_type & 0xff);
422 put_buff.
p_base = &p_blk->tensor.cell_byte[0];
423 }
else if ((mode & WRITE_AS_FULL_BLOCK) && (p_blk->cell_type != CELL_TYPE_INDEX)) {
424 put_buff.
to_send = p_blk->total_bytes;
425 put_buff.
p_base = (uint8_t *) p_blk;
427 return SERVICE_ERROR_WRONG_ARGUMENTS;
429 curl_easy_setopt(curl, CURLOPT_URL, url);
430 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
431 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
432 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
dev_null);
433 curl_easy_setopt(curl, CURLOPT_READFUNCTION,
put_callback);
434 curl_easy_setopt(curl, CURLOPT_READDATA, (
void *) &put_buff);
435 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
436 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t) put_buff.
to_send);
438 if (p_idx !=
nullptr) {
440 if ((it = p_idx->find(
"CURLOPT_USERNAME")) != p_idx->end())
441 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
443 if ((it = p_idx->find(
"CURLOPT_USERPWD")) != p_idx->end())
444 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
446 if ((it = p_idx->find(
"CURLOPT_COOKIEFILE")) != p_idx->end())
447 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
449 if ((it = p_idx->find(
"CURLOPT_COOKIEJAR")) != p_idx->end())
450 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
452 c_ret = curl_easy_perform(curl);
454 uint64_t response_code;
456 if (c_ret == CURLE_OK)
457 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
459 curl_easy_cleanup(curl);
464 case CURLE_REMOTE_ACCESS_DENIED:
465 case CURLE_AUTH_ERROR:
466 return SERVICE_ERROR_WRITE_FORBIDDEN;
467 case CURLE_REMOTE_FILE_NOT_FOUND:
468 return SERVICE_ERROR_BLOCK_NOT_FOUND;
470 return SERVICE_ERROR_IO_ERROR;
473 switch (response_code) {
475 case MHD_HTTP_CREATED:
476 case MHD_HTTP_ACCEPTED:
477 return SERVICE_NO_ERROR;
478 case MHD_HTTP_NOT_FOUND:
480 return SERVICE_ERROR_BLOCK_NOT_FOUND;
481 case MHD_HTTP_BAD_REQUEST:
482 return SERVICE_ERROR_WRONG_ARGUMENTS;
483 case MHD_HTTP_UNAUTHORIZED:
484 case MHD_HTTP_PAYMENT_REQUIRED:
485 case MHD_HTTP_FORBIDDEN:
486 case MHD_HTTP_METHOD_NOT_ALLOWED:
487 case MHD_HTTP_NOT_ACCEPTABLE:
488 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
489 case MHD_HTTP_TOO_MANY_REQUESTS:
490 return SERVICE_ERROR_WRITE_FORBIDDEN;
491 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
492 return SERVICE_ERROR_MISC_SERVER;
494 return SERVICE_ERROR_IO_ERROR;
510 curl = curl_easy_init();
512 return SERVICE_ERROR_NOT_READY;
514 curl_easy_setopt(curl, CURLOPT_URL, url);
515 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0);
516 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
517 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"DELETE");
518 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
dev_null);
520 if (p_idx !=
nullptr) {
522 if ((it = p_idx->find(
"CURLOPT_USERNAME")) != p_idx->end())
523 curl_easy_setopt(curl, CURLOPT_USERNAME, it->second.c_str());
525 if ((it = p_idx->find(
"CURLOPT_USERPWD")) != p_idx->end())
526 curl_easy_setopt(curl, CURLOPT_USERPWD, it->second.c_str());
528 if ((it = p_idx->find(
"CURLOPT_COOKIEFILE")) != p_idx->end())
529 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, it->second.c_str());
531 if ((it = p_idx->find(
"CURLOPT_COOKIEJAR")) != p_idx->end())
532 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, it->second.c_str());
534 c_ret = curl_easy_perform(curl);
536 uint64_t response_code;
538 if (c_ret == CURLE_OK)
539 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
541 curl_easy_cleanup(curl);
546 case CURLE_REMOTE_ACCESS_DENIED:
547 case CURLE_AUTH_ERROR:
548 return SERVICE_ERROR_WRITE_FORBIDDEN;
550 case CURLE_REMOTE_FILE_NOT_FOUND:
551 return SERVICE_ERROR_BLOCK_NOT_FOUND;
553 return SERVICE_ERROR_IO_ERROR;
556 switch (response_code) {
558 case MHD_HTTP_CREATED:
559 case MHD_HTTP_ACCEPTED:
560 return SERVICE_NO_ERROR;
561 case MHD_HTTP_NOT_FOUND:
563 return SERVICE_ERROR_BLOCK_NOT_FOUND;
564 case MHD_HTTP_BAD_REQUEST:
565 return SERVICE_ERROR_WRONG_ARGUMENTS;
566 case MHD_HTTP_UNAUTHORIZED:
567 case MHD_HTTP_PAYMENT_REQUIRED:
568 case MHD_HTTP_FORBIDDEN:
569 case MHD_HTTP_METHOD_NOT_ALLOWED:
570 case MHD_HTTP_NOT_ACCEPTABLE:
571 case MHD_HTTP_PROXY_AUTHENTICATION_REQUIRED:
572 case MHD_HTTP_TOO_MANY_REQUESTS:
573 return SERVICE_ERROR_WRITE_FORBIDDEN;
574 case MHD_HTTP_INTERNAL_SERVER_ERROR ... MHD_HTTP_LOOP_DETECTED:
575 return SERVICE_ERROR_MISC_SERVER;
577 return SERVICE_ERROR_IO_ERROR;
583 char HEX[16] = {
'0',
'1',
'2',
'3',
'4',
'5',
'6',
'7',
'8',
'9',
'A',
'B',
'C',
'D',
'E',
'F'};
602 if (it->second == p_node) {
609 if (ret < 0 || ret >= buff_size)
615 while (buff_size > 0) {
616 u_int8_t cursor = *(p_url++);
635 *(p_dest++) = cursor;
642 *(p_dest++) =
HEX[cursor >> 4];
643 *(p_dest++) =
HEX[cursor & 0x0f];
Channels: A Container doing block transactions across media (files, folders, shell,...
Definition channel.h:227
PipeMap pipes
A map of pipelines (zeroMQ connections)
Definition channel.h:657
MHD_StatusCode forward_del(Name node, pChar p_url)
Definition channel.cpp:1110
virtual StatusCode header(StaticBlockHeader &hea, pChar p_what)
Definition channel.cpp:504
int jazz_node_my_index
The index of the node in the cluster.
Definition channel.h:284
virtual StatusCode put(pChar p_where, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT)
Definition channel.cpp:538
StatusCode curl_remove(const char *url, Index *p_idx=nullptr)
The most low level remove function.
Definition channel.h:506
bool compose_url(pChar p_dest, pChar p_node, pChar p_url, int buff_size)
Compose a url from a node, a base and an entity.
Definition channel.h:595
int can_zmq
If true, the server can use zeroMQ based on configuration key ENABLE_ZEROMQ_CLIENT.
Definition channel.h:652
StatusCode curl_get(pTransaction &p_txn, const char *url, Index *p_idx=nullptr)
The most low level get function.
Definition channel.h:304
StatusCode curl_put(const char *url, pBlock p_blk, int mode=WRITE_AS_STRING|WRITE_AS_FULL_BLOCK, Index *p_idx=nullptr)
The most low level put function.
Definition channel.h:398
virtual StatusCode modify(Locator &function, pTuple p_args)
Definition channel.cpp:911
int file_lev
The level of file operations allowed based on configuration key ENABLE_FILE_LEVEL.
Definition channel.h:655
int can_bash
If true, the server can use bash based on configuration key ENABLE_BASH_EXEC.
Definition channel.h:654
virtual StatusCode remove(pChar p_where)
Definition channel.cpp:754
virtual StatusCode copy(pChar p_where, pChar p_what)
Definition channel.cpp:868
virtual StatusCode get(pTransaction &p_txn, pChar p_what)
Definition channel.cpp:306
void base_names(BaseNames &base_names)
Definition channel.cpp:1046
virtual StatusCode locate(Locator &location, pChar p_what)
Definition channel.cpp:489
MapIS jazz_node_ip
The ip addresses of the nodes in the cluster.
Definition channel.h:280
bool search_my_node_index
If true, the node index is searched in the cluster.
Definition channel.h:283
StatusCode shut_down()
Definition channel.cpp:262
char HEX[16]
Hexadecimal digits.
Definition channel.h:583
int zmq_ok
If true, zeroMQ is ready to be used based on config + zeroMQ initialization.
Definition channel.h:653
MHD_StatusCode forward_put(Name node, pChar p_url, pBlock p_block, int mode=WRITE_AS_BASE_DEFAULT)
Definition channel.cpp:1086
std::string filesystem_root
The root of the filesystem.
Definition channel.h:287
StatusCode start()
Definition channel.cpp:148
int can_curl
If true, the server can use libcurl based on configuration key ENABLE_HTTP_CLIENT.
Definition channel.h:650
MapII jazz_node_port
The ports of the nodes in the cluster.
Definition channel.h:281
void * zmq_context
The zeroMQ context.
Definition channel.h:660
MHD_StatusCode forward_get(pTransaction &p_txn, Name node, pChar p_url)
Definition channel.cpp:1063
virtual pChar const id()
Definition channel.cpp:138
MapIS jazz_node_name
The names of the nodes (other Jazz servers) in the cluster.
Definition channel.h:279
int jazz_node_cluster_size
The number of nodes in the cluster.
Definition channel.h:285
ConnMap connect
A map of http connections.
Definition channel.h:658
int curl_ok
If true, libcurl is ready to be used based on config + libcurl initialization.
Definition channel.h:651
virtual StatusCode new_entity(pChar p_where)
Definition channel.cpp:716
A configuration file as a key/value store.
Definition utils.h:217
Container: A Service to manage Jazz blocks. All Jazz blocks are managed by this or a descendant of th...
Definition container.h:282
StatusCode unwrap_received(pTransaction &p_txn)
Definition container.h:429
A simple logger.
Definition utils.h:245
Tuple: A Jazz Block with multiple Tensors.
Definition tuple.h:94
The namespace for Jazz Utils, Blocks, Kinds, Tuples, Containers, etc.
Definition block.cpp:39
GetBuffer * pGetBuffer
A pointer to a GetBuffer.
Definition channel.h:111
std::map< std::string, std::string > Index
An Index kept in RAM by Volatile implemented as an stdlib map (string, string)
Definition types.h:238
Channels * pChannels
A pointer to a Channels.
Definition channel.h:662
std::map< std::string, Index > ConnMap
A structure holding connections.
Definition channel.h:99
char * pChar
A pointer to a char buffer.
Definition types.h:185
class Block * pBlock
A (forward defined) pointer to a Block.
Definition block.h:66
char Name[NAME_SIZE]
A short identifier used in Blocks, Containers and API.
Definition types.h:183
std::vector< uint8_t > GetBuffer
A structure to share with the libcurl get callback.
Definition channel.h:110
std::map< std::string, Socket > PipeMap
A structure holding pipeline.
Definition channel.h:123
std::map< int, std::string > MapIS
A map for defining http config names.
Definition channel.h:95
std::map< std::string, pContainer > BaseNames
A map of names for the containers (or structure engines like "map" or "tree" inside Volatile).
Definition container.h:152
std::map< int, int > MapII
A map for defining http config ports.
Definition channel.h:127
PutBuffer * pPutBuffer
A pointer to a PutBuffer.
Definition channel.h:119
int StatusCode
Type returned by the Service API.
Definition utils.h:141
size_t get_callback(char *ptr, size_t size, size_t nmemb, void *container)
A callback for libCURL GET.
Definition channel.cpp:59
unsigned int MHD_StatusCode
A proper type for specifying http status codes.
Definition channel.h:141
size_t put_callback(char *ptr, size_t size, size_t nmemb, void *container)
A callback for libCURL PUT.
Definition channel.cpp:105
size_t dev_null(char *_ignore, size_t size, size_t nmemb, void *_ignore_2)
A callback for libCURL GET to ignore all the blocks sent by the server in PUT and DELETE calls.
Definition channel.cpp:87
Locator: A minimal structure to define the location of resources inside a Container.
Definition container.h:184
A structure keep state inside a put callback.
Definition channel.h:115
uint8_t * p_base
The pointer (updated after each call) to the data.
Definition channel.h:117
uint64_t to_send
Number of bytes to be sent.
Definition channel.h:116
A structure to hold a single pipeline.
Definition channel.h:103
char endpoint[120]
The endpoint at which the socket is connected.
Definition channel.h:104
void * requester
The (connected) zmq socket.
Definition channel.h:105
Transaction: A wrapper over a Block that defines the communication of a block with a Container.
Definition container.h:162