mcfadyeni commited on
Commit
5d5056e
·
1 Parent(s): 1669c07

rpc : nicer error messages for RPC server crash (llama/14076)

Browse files
Files changed (1) hide show
  1. ggml/src/ggml-rpc/ggml-rpc.cpp +18 -15
ggml/src/ggml-rpc/ggml-rpc.cpp CHANGED
@@ -53,6 +53,9 @@ struct socket_t {
53
  }
54
  };
55
 
 
 
 
56
  // all RPC structures must be packed
57
  #pragma pack(push, 1)
58
  // ggml_tensor is serialized into rpc_tensor
@@ -425,7 +428,7 @@ static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cm
425
  static bool check_server_version(const std::shared_ptr<socket_t> & sock) {
426
  rpc_msg_hello_rsp response;
427
  bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
428
- GGML_ASSERT(status);
429
  if (response.major != RPC_PROTO_MAJOR_VERSION || response.minor > RPC_PROTO_MINOR_VERSION) {
430
  fprintf(stderr, "RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
431
  return false;
@@ -481,7 +484,7 @@ static void ggml_backend_rpc_buffer_free_buffer(ggml_backend_buffer_t buffer) {
481
  ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context;
482
  rpc_msg_free_buffer_req request = {ctx->remote_ptr};
483
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_FREE_BUFFER, &request, sizeof(request), nullptr, 0);
484
- GGML_ASSERT(status);
485
  delete ctx;
486
  }
487
 
@@ -493,7 +496,7 @@ static void * ggml_backend_rpc_buffer_get_base(ggml_backend_buffer_t buffer) {
493
  rpc_msg_buffer_get_base_req request = {ctx->remote_ptr};
494
  rpc_msg_buffer_get_base_rsp response;
495
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_BUFFER_GET_BASE, &request, sizeof(request), &response, sizeof(response));
496
- GGML_ASSERT(status);
497
  ctx->base_ptr = reinterpret_cast<void *>(response.base_ptr);
498
  return ctx->base_ptr;
499
  }
@@ -545,7 +548,7 @@ static enum ggml_status ggml_backend_rpc_buffer_init_tensor(ggml_backend_buffer_
545
  request.tensor = serialize_tensor(tensor);
546
 
547
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_INIT_TENSOR, &request, sizeof(request), nullptr, 0);
548
- GGML_ASSERT(status);
549
  }
550
  return GGML_STATUS_SUCCESS;
551
  }
@@ -560,7 +563,7 @@ static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggm
560
  request.hash = fnv_hash((const uint8_t*)data, size);
561
  rpc_msg_set_tensor_hash_rsp response;
562
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR_HASH, &request, sizeof(request), &response, sizeof(response));
563
- GGML_ASSERT(status);
564
  if (response.result) {
565
  // the server has the same data, no need to send it
566
  return;
@@ -573,7 +576,7 @@ static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggm
573
  memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset));
574
  memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size);
575
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR, input.data(), input.size());
576
- GGML_ASSERT(status);
577
  }
578
 
579
  static void ggml_backend_rpc_buffer_get_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * tensor, void * data, size_t offset, size_t size) {
@@ -583,7 +586,7 @@ static void ggml_backend_rpc_buffer_get_tensor(ggml_backend_buffer_t buffer, con
583
  request.offset = offset;
584
  request.size = size;
585
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_GET_TENSOR, &request, sizeof(request), data, size);
586
- GGML_ASSERT(status);
587
  }
588
 
589
  static bool ggml_backend_rpc_buffer_cpy_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * src, ggml_tensor * dst) {
@@ -601,7 +604,7 @@ static bool ggml_backend_rpc_buffer_cpy_tensor(ggml_backend_buffer_t buffer, con
601
  request.dst = serialize_tensor(dst);
602
  rpc_msg_copy_tensor_rsp response;
603
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_COPY_TENSOR, &request, sizeof(request), &response, sizeof(response));
604
- GGML_ASSERT(status);
605
  return response.result;
606
  }
607
 
@@ -609,7 +612,7 @@ static void ggml_backend_rpc_buffer_clear(ggml_backend_buffer_t buffer, uint8_t
609
  ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context;
610
  rpc_msg_buffer_clear_req request = {ctx->remote_ptr, value};
611
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_BUFFER_CLEAR, &request, sizeof(request), nullptr, 0);
612
- GGML_ASSERT(status);
613
  }
614
 
615
  static ggml_backend_buffer_i ggml_backend_rpc_buffer_interface = {
@@ -635,7 +638,7 @@ static ggml_backend_buffer_t ggml_backend_rpc_buffer_type_alloc_buffer(ggml_back
635
  rpc_msg_alloc_buffer_rsp response;
636
  auto sock = get_socket(buft_ctx->endpoint);
637
  bool status = send_rpc_cmd(sock, RPC_CMD_ALLOC_BUFFER, &request, sizeof(request), &response, sizeof(response));
638
- GGML_ASSERT(status);
639
  if (response.remote_ptr != 0) {
640
  ggml_backend_buffer_t buffer = ggml_backend_buffer_init(buft,
641
  ggml_backend_rpc_buffer_interface,
@@ -650,7 +653,7 @@ static ggml_backend_buffer_t ggml_backend_rpc_buffer_type_alloc_buffer(ggml_back
650
  static size_t get_alignment(const std::shared_ptr<socket_t> & sock) {
651
  rpc_msg_get_alignment_rsp response;
652
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_ALIGNMENT, nullptr, 0, &response, sizeof(response));
653
- GGML_ASSERT(status);
654
  return response.alignment;
655
  }
656
 
@@ -662,7 +665,7 @@ static size_t ggml_backend_rpc_buffer_type_get_alignment(ggml_backend_buffer_typ
662
  static size_t get_max_size(const std::shared_ptr<socket_t> & sock) {
663
  rpc_msg_get_max_size_rsp response;
664
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_MAX_SIZE, nullptr, 0, &response, sizeof(response));
665
- GGML_ASSERT(status);
666
  return response.max_size;
667
  }
668
 
@@ -683,7 +686,7 @@ static size_t ggml_backend_rpc_buffer_type_get_alloc_size(ggml_backend_buffer_ty
683
 
684
  rpc_msg_get_alloc_size_rsp response;
685
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_ALLOC_SIZE, &request, sizeof(request), &response, sizeof(response));
686
- GGML_ASSERT(status);
687
 
688
  return response.alloc_size;
689
  } else {
@@ -761,7 +764,7 @@ static enum ggml_status ggml_backend_rpc_graph_compute(ggml_backend_t backend, g
761
  rpc_msg_graph_compute_rsp response;
762
  auto sock = get_socket(rpc_ctx->endpoint);
763
  bool status = send_rpc_cmd(sock, RPC_CMD_GRAPH_COMPUTE, input.data(), input.size(), &response, sizeof(response));
764
- GGML_ASSERT(status);
765
  return (enum ggml_status)response.result;
766
  }
767
 
@@ -835,7 +838,7 @@ bool ggml_backend_is_rpc(ggml_backend_t backend) {
835
  static void get_device_memory(const std::shared_ptr<socket_t> & sock, size_t * free, size_t * total) {
836
  rpc_msg_get_device_memory_rsp response;
837
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_DEVICE_MEMORY, nullptr, 0, &response, sizeof(response));
838
- GGML_ASSERT(status);
839
  *free = response.free_mem;
840
  *total = response.total_mem;
841
  }
 
53
  }
54
  };
55
 
56
+ // macro for nicer error messages on server crash
57
+ #define RPC_STATUS_ASSERT(x) if (!(x)) GGML_ABORT("Remote RPC server crashed or returned malformed response")
58
+
59
  // all RPC structures must be packed
60
  #pragma pack(push, 1)
61
  // ggml_tensor is serialized into rpc_tensor
 
428
  static bool check_server_version(const std::shared_ptr<socket_t> & sock) {
429
  rpc_msg_hello_rsp response;
430
  bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
431
+ RPC_STATUS_ASSERT(status);
432
  if (response.major != RPC_PROTO_MAJOR_VERSION || response.minor > RPC_PROTO_MINOR_VERSION) {
433
  fprintf(stderr, "RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
434
  return false;
 
484
  ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context;
485
  rpc_msg_free_buffer_req request = {ctx->remote_ptr};
486
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_FREE_BUFFER, &request, sizeof(request), nullptr, 0);
487
+ RPC_STATUS_ASSERT(status);
488
  delete ctx;
489
  }
490
 
 
496
  rpc_msg_buffer_get_base_req request = {ctx->remote_ptr};
497
  rpc_msg_buffer_get_base_rsp response;
498
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_BUFFER_GET_BASE, &request, sizeof(request), &response, sizeof(response));
499
+ RPC_STATUS_ASSERT(status);
500
  ctx->base_ptr = reinterpret_cast<void *>(response.base_ptr);
501
  return ctx->base_ptr;
502
  }
 
548
  request.tensor = serialize_tensor(tensor);
549
 
550
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_INIT_TENSOR, &request, sizeof(request), nullptr, 0);
551
+ RPC_STATUS_ASSERT(status);
552
  }
553
  return GGML_STATUS_SUCCESS;
554
  }
 
563
  request.hash = fnv_hash((const uint8_t*)data, size);
564
  rpc_msg_set_tensor_hash_rsp response;
565
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR_HASH, &request, sizeof(request), &response, sizeof(response));
566
+ RPC_STATUS_ASSERT(status);
567
  if (response.result) {
568
  // the server has the same data, no need to send it
569
  return;
 
576
  memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset));
577
  memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size);
578
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR, input.data(), input.size());
579
+ RPC_STATUS_ASSERT(status);
580
  }
581
 
582
  static void ggml_backend_rpc_buffer_get_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * tensor, void * data, size_t offset, size_t size) {
 
586
  request.offset = offset;
587
  request.size = size;
588
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_GET_TENSOR, &request, sizeof(request), data, size);
589
+ RPC_STATUS_ASSERT(status);
590
  }
591
 
592
  static bool ggml_backend_rpc_buffer_cpy_tensor(ggml_backend_buffer_t buffer, const ggml_tensor * src, ggml_tensor * dst) {
 
604
  request.dst = serialize_tensor(dst);
605
  rpc_msg_copy_tensor_rsp response;
606
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_COPY_TENSOR, &request, sizeof(request), &response, sizeof(response));
607
+ RPC_STATUS_ASSERT(status);
608
  return response.result;
609
  }
610
 
 
612
  ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context;
613
  rpc_msg_buffer_clear_req request = {ctx->remote_ptr, value};
614
  bool status = send_rpc_cmd(ctx->sock, RPC_CMD_BUFFER_CLEAR, &request, sizeof(request), nullptr, 0);
615
+ RPC_STATUS_ASSERT(status);
616
  }
617
 
618
  static ggml_backend_buffer_i ggml_backend_rpc_buffer_interface = {
 
638
  rpc_msg_alloc_buffer_rsp response;
639
  auto sock = get_socket(buft_ctx->endpoint);
640
  bool status = send_rpc_cmd(sock, RPC_CMD_ALLOC_BUFFER, &request, sizeof(request), &response, sizeof(response));
641
+ RPC_STATUS_ASSERT(status);
642
  if (response.remote_ptr != 0) {
643
  ggml_backend_buffer_t buffer = ggml_backend_buffer_init(buft,
644
  ggml_backend_rpc_buffer_interface,
 
653
  static size_t get_alignment(const std::shared_ptr<socket_t> & sock) {
654
  rpc_msg_get_alignment_rsp response;
655
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_ALIGNMENT, nullptr, 0, &response, sizeof(response));
656
+ RPC_STATUS_ASSERT(status);
657
  return response.alignment;
658
  }
659
 
 
665
  static size_t get_max_size(const std::shared_ptr<socket_t> & sock) {
666
  rpc_msg_get_max_size_rsp response;
667
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_MAX_SIZE, nullptr, 0, &response, sizeof(response));
668
+ RPC_STATUS_ASSERT(status);
669
  return response.max_size;
670
  }
671
 
 
686
 
687
  rpc_msg_get_alloc_size_rsp response;
688
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_ALLOC_SIZE, &request, sizeof(request), &response, sizeof(response));
689
+ RPC_STATUS_ASSERT(status);
690
 
691
  return response.alloc_size;
692
  } else {
 
764
  rpc_msg_graph_compute_rsp response;
765
  auto sock = get_socket(rpc_ctx->endpoint);
766
  bool status = send_rpc_cmd(sock, RPC_CMD_GRAPH_COMPUTE, input.data(), input.size(), &response, sizeof(response));
767
+ RPC_STATUS_ASSERT(status);
768
  return (enum ggml_status)response.result;
769
  }
770
 
 
838
  static void get_device_memory(const std::shared_ptr<socket_t> & sock, size_t * free, size_t * total) {
839
  rpc_msg_get_device_memory_rsp response;
840
  bool status = send_rpc_cmd(sock, RPC_CMD_GET_DEVICE_MEMORY, nullptr, 0, &response, sizeof(response));
841
+ RPC_STATUS_ASSERT(status);
842
  *free = response.free_mem;
843
  *total = response.total_mem;
844
  }