COSXMLUploadTask.cs 27 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using COSXML.Model;
  4. using COSXML.CosException;
  5. using COSXML.Model.Object;
  6. using System.IO;
  7. using COSXML.Common;
  8. using COSXML.Utils;
  9. using COSXML.Model.Tag;
  10. using COSXML.Log;
  11. using COSXML.Model.Bucket;
  12. using System.Threading;
  13. namespace COSXML.Transfer
  14. {
  15. public sealed class COSXMLUploadTask : COSXMLTask, IOnMultipartUploadStateListener
  16. {
  // Payload length above which Upload() takes the multipart path instead of a single PUT.
  private long divisionSize;
  // Length of each multipart slice produced by ComputeSliceNums().
  private long sliceSize;
  private const int MAX_ACTIVIE_TASKS = 2;
  // Number of part-upload requests currently in flight; changed from request callbacks.
  private volatile int activieTasks = 0;
  // Offset into the source file at which the payload starts.
  private long sendOffset = 0L;
  // Total number of bytes to send (analogous to Content-Length); -1 means "not specified",
  // in which case Upload() sends everything from sendOffset to the end of the file.
  private long sendContentLength = -1L;
  private string srcPath;
  private PutObjectRequest putObjectRequest;
  private DeleteObjectRequest deleteObjectRequest;
  // Guards isExit. Declared readonly so the lock target can never be swapped out.
  private readonly Object syncExit = new Object();
  // Set when the task is paused, cancelled or failed; callbacks check it and bail out.
  private bool isExit = false;
  private ListPartsRequest listPartsRequest;
  private InitMultipartUploadRequest initMultiUploadRequest;
  private string uploadId;
  // Bytes already reported by the progress callback of each part request.
  private Dictionary<UploadPartRequest, long> uploadPartRequestMap;
  private List<UploadPartRequest> uploadPartRequestList;
  private List<SliceStruct> sliceList;
  // Guards sliceCount.
  private readonly Object syncPartCopyCount = new object();
  // Number of slices not yet accounted for; OnPart() runs when it reaches zero.
  private int sliceCount;
  // Accumulated number of bytes transferred so far (guarded by syncProgress).
  private long hasReceiveDataLength = 0;
  private readonly object syncProgress = new Object();
  private CompleteMultipartUploadRequest completeMultiUploadRequest;
  private AbortMultipartUploadRequest abortMultiUploadRequest;
  private ListMultiUploadsRequest listMultiUploadsRequest;

  /// <summary>Upper bound on simultaneously running part uploads.</summary>
  public int MaxConcurrent { private get; set; } = MAX_ACTIVIE_TASKS;

  /// <summary>
  /// When true and no upload id was supplied, the task looks for an existing multipart
  /// upload of the same key before starting a new one.
  /// </summary>
  public bool UseResumableUpload { private get; set; } = true;

  /// <summary>Storage class applied to the upload request when not null.</summary>
  public string StorageClass { private get; set; }
  /// <summary>
  /// Creates an upload task that targets the given bucket and object key.
  /// </summary>
  /// <param name="bucket">Bucket name, forwarded to the base task.</param>
  /// <param name="key">Object key, forwarded to the base task.</param>
  public COSXMLUploadTask(string bucket, string key) : base(bucket, key)
  {
  }
  /// <summary>
  /// Creates an upload task from an existing request, taking over its bucket,
  /// region, key and request headers.
  /// </summary>
  /// <param name="request">Request whose target and headers are copied.</param>
  public COSXMLUploadTask(PutObjectRequest request)
      : base(request.Bucket, request.Region, request.Key)
  {
      var requestHeaders = request.GetRequestHeaders();
      SetHeaders(requestHeaders);
  }
  /// <summary>
  /// Stores the multipart threshold and the slice length used by later uploads.
  /// </summary>
  /// <param name="divisionSize">Payload length above which a multipart upload is used.</param>
  /// <param name="sliceSize">Length of each multipart slice.</param>
  internal void SetDivision(long divisionSize, long sliceSize)
  {
      this.sliceSize = sliceSize;
      this.divisionSize = divisionSize;
  }
  /// <summary>
  /// Selects the local file to upload; offset and length are left unspecified.
  /// </summary>
  /// <param name="srcPath">Path of the local source file.</param>
  public void SetSrcPath(string srcPath) => SetSrcPath(srcPath, -1L, -1L);
  /// <summary>
  /// Selects the local file to upload together with the byte range to send.
  /// </summary>
  /// <param name="srcPath">Path of the local source file.</param>
  /// <param name="fileOffset">Start offset in the file; negative values are treated as 0.</param>
  /// <param name="contentLength">Number of bytes to send; negative values are stored as -1.</param>
  public void SetSrcPath(string srcPath, long fileOffset, long contentLength)
  {
      this.srcPath = srcPath;
      // Negative offsets collapse to the start of the file.
      sendOffset = Math.Max(fileOffset, 0L);
      // Any negative length collapses to the -1 "unspecified" marker.
      sendContentLength = Math.Max(contentLength, -1L);
  }
  /// <summary>
  /// Supplies the id of an existing multipart upload to continue.
  /// </summary>
  /// <param name="uploadId">Multipart upload id.</param>
  public void SetUploadId(string uploadId) => this.uploadId = uploadId;
  /// <summary>
  /// Returns the multipart upload id currently held by this task (may be null).
  /// </summary>
  public string GetUploadId() => uploadId;
  /// <summary>
  /// Starts the upload: reads the source file length, resolves the number of bytes
  /// to send, then dispatches to a multipart or single-request upload depending on
  /// <c>divisionSize</c>. If the file cannot be inspected the task is failed with an
  /// InvalidArgument client exception.
  /// </summary>
  internal void Upload()
  {
      taskState = TaskState.Waiting;
      hasReceiveDataLength = 0;

      long fileLength;
      try
      {
          fileLength = new FileInfo(srcPath).Length;
      }
      catch (Exception ex)
      {
          bool alreadyExited;
          lock (syncExit)
          {
              alreadyExited = isExit;
          }

          // Report the failure only when the task has not exited and the state change is accepted.
          if (!alreadyExited && UpdateTaskState(TaskState.Failed) && failCallback != null)
          {
              failCallback(new CosClientException((int)CosClientError.InvalidArgument, ex.Message, ex), null);
          }
          return;
      }

      // Unspecified length, or a range running past the end of the file: send up to EOF.
      bool lengthUnspecified = sendContentLength == -1L;
      if (lengthUnspecified || sendContentLength + sendOffset > fileLength)
      {
          sendContentLength = fileLength - sendOffset;
      }

      taskState = TaskState.Running;

      if (sendContentLength <= divisionSize)
      {
          SimpleUpload();
      }
      else
      {
          MultiUpload();
      }
  }
  /// <summary>
  /// Uploads the payload with a single PutObject request, forwarding custom headers,
  /// the progress callback and the storage class when they are set.
  /// </summary>
  private void SimpleUpload()
  {
      putObjectRequest = new PutObjectRequest(bucket, key, srcPath, sendOffset, sendContentLength);

      if (customHeaders != null)
      {
          putObjectRequest.SetRequestHeaders(customHeaders);
      }
      if (progressCallback != null)
      {
          putObjectRequest.SetCosProgressCallback(progressCallback);
      }
      if (StorageClass != null)
      {
          putObjectRequest.SetCosStorageClass(StorageClass);
      }

      cosXmlServer.PutObject(
          putObjectRequest,
          cosResult =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      // The object was stored although the task was cancelled: remove it again.
                      if (taskState == TaskState.Cancel)
                      {
                          DeleteObject();
                      }
                      return;
                  }
              }

              if (!UpdateTaskState(TaskState.Completed))
              {
                  return;
              }

              PutObjectResult putResult = cosResult as PutObjectResult;
              UploadTaskResult taskResult = new UploadTaskResult();
              taskResult.SetResult(putResult);
              successCallback?.Invoke(taskResult);
          },
          (clientEx, serverEx) =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              if (UpdateTaskState(TaskState.Failed))
              {
                  failCallback?.Invoke(clientEx, serverEx);
              }
          });
  }
  /// <summary>
  /// Entry point of the multipart path: computes the slices, then either continues
  /// the supplied upload id, searches for a resumable upload, or starts a new one.
  /// </summary>
  private void MultiUpload()
  {
      ComputeSliceNums();

      // An explicitly supplied upload id always wins.
      if (uploadId != null)
      {
          ListMultiParts();
          return;
      }

      if (!UseResumableUpload)
      {
          InitMultiUploadPart();
          return;
      }

      CheckResumeblaUpload();
  }
  /// <summary>
  /// Starts a new multipart upload and, once the server returns an upload id,
  /// continues with the part uploads via OnInit().
  /// </summary>
  private void InitMultiUploadPart()
  {
      initMultiUploadRequest = new InitMultipartUploadRequest(bucket, key);

      if (customHeaders != null)
      {
          initMultiUploadRequest.SetRequestHeaders(customHeaders);
      }
      if (StorageClass != null)
      {
          initMultiUploadRequest.SetCosStorageClass(StorageClass);
      }

      cosXmlServer.InitMultipartUpload(
          initMultiUploadRequest,
          cosResult =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              InitMultipartUploadResult initResult = cosResult as InitMultipartUploadResult;
              uploadId = initResult.initMultipartUpload.uploadId;
              // The upload id is known now; move on to uploading the parts.
              OnInit();
          },
          (clientEx, serverEx) =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              if (UpdateTaskState(TaskState.Failed))
              {
                  OnFailed(clientEx, serverEx);
              }
          });
  }
  /// <summary>
  /// Looks for an unfinished multipart upload of this key. The newest listed upload
  /// whose key matches exactly is validated with CheckAllUploadParts(); when there is
  /// no such upload a fresh multipart upload is started.
  /// </summary>
  private void CheckResumeblaUpload()
  {
      listMultiUploadsRequest = new ListMultiUploadsRequest(bucket);
      // The listing is filtered by prefix, so it may also contain other keys that merely start with this key.
      listMultiUploadsRequest.SetPrefix(key);
      cosXmlServer.ListMultiUploads(listMultiUploadsRequest, delegate (CosResult cosResult)
      {
          // Do not continue the upload if the task was paused, cancelled or failed meanwhile.
          lock (syncExit)
          {
              if (isExit)
              {
                  return;
              }
          }

          ListMultiUploadsResult result = cosResult as ListMultiUploadsResult;
          var uploads = result.listMultipartUploads;
          if (uploads.uploads != null)
          {
              // Walk backwards to pick the most recent matching upload id.
              for (int i = uploads.uploads.Count - 1; i >= 0; i--)
              {
                  var upload = uploads.uploads[i];
                  if (upload.key == key)
                  {
                      CheckAllUploadParts(upload.uploadID);
                      return;
                  }
              }
          }

          // Nothing listed, or only uploads of other keys sharing the prefix:
          // start from scratch instead of leaving the task stuck.
          InitMultiUploadPart();
      },
      delegate (CosClientException clientEx, CosServerException serverEx)
      {
          lock (syncExit)
          {
              if (isExit)
              {
                  return;
              }
          }
          if (UpdateTaskState(TaskState.Failed))
          {
              OnFailed(clientEx, serverEx);
          }
      });
  }
  /// <summary>
  /// Validates an existing multipart upload against the local file. Every part already
  /// on the server must map to a locally computed slice and carry the same MD5 ETag.
  /// If all parts match, the upload id is adopted and the upload is resumed; otherwise
  /// a new multipart upload is started.
  /// </summary>
  /// <param name="uploadId">Id of the candidate multipart upload.</param>
  private void CheckAllUploadParts(string uploadId)
  {
      listPartsRequest = new ListPartsRequest(bucket, key, uploadId);
      cosXmlServer.ListParts(listPartsRequest, delegate (CosResult cosResult)
      {
          lock (syncExit)
          {
              if (isExit)
              {
                  return;
              }
          }

          ListPartsResult result = cosResult as ListPartsResult;
          bool checkSucc = true;
          var uploadedParts = result.listParts.parts;

          // A null part list means nothing has been uploaded yet, which is trivially consistent.
          if (uploadedParts != null)
          {
              Dictionary<int, SliceStruct> sourceParts = new Dictionary<int, SliceStruct>(sliceList.Count);
              foreach (SliceStruct sliceStruct in sliceList)
              {
                  sourceParts.Add(sliceStruct.partNumber, sliceStruct);
              }

              // Compare the ETag of each uploaded part with the MD5 of the matching local slice.
              foreach (ListParts.Part part in uploadedParts)
              {
                  int partNumber;
                  SliceStruct localSlice;
                  // An unparsable or unknown part number, or a digest mismatch, means the
                  // remote upload cannot be resumed from this file; stop at the first such
                  // part rather than throwing inside the callback or hashing further slices.
                  if (!int.TryParse(part.partNumber, out partNumber)
                      || !sourceParts.TryGetValue(partNumber, out localSlice)
                      || !CompareSliceMD5(srcPath, localSlice.sliceStart, localSlice.sliceLength, part.eTag))
                  {
                      checkSucc = false;
                      break;
                  }
              }
          }

          if (checkSucc)
          {
              this.uploadId = uploadId;
              UpdateSliceNums(result);
              OnInit();
          }
          else
          {
              InitMultiUploadPart();
          }
      },
      delegate (CosClientException clientEx, CosServerException serverEx)
      {
          lock (syncExit)
          {
              if (isExit)
              {
                  return;
              }
          }
          if (UpdateTaskState(TaskState.Failed))
          {
              OnFailed(clientEx, serverEx);
          }
      });
  }
  /// <summary>
  /// Continues a caller-supplied upload id: lists the parts already on the server,
  /// marks the matching slices as uploaded and proceeds with the remaining parts.
  /// </summary>
  private void ListMultiParts()
  {
      listPartsRequest = new ListPartsRequest(bucket, key, uploadId);

      cosXmlServer.ListParts(
          listPartsRequest,
          cosResult =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              // Parts reported by the server are recorded as already uploaded.
              UpdateSliceNums(cosResult as ListPartsResult);
              // The init step is skipped because the upload id is already known.
              OnInit();
          },
          (clientEx, serverEx) =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              if (UpdateTaskState(TaskState.Failed))
              {
                  OnFailed(clientEx, serverEx);
              }
          });
  }
  /// <summary>
  /// Uploads every slice that is not yet on the server, keeping at most
  /// <see cref="MaxConcurrent"/> part requests in flight, and calls OnPart() once
  /// all slices are accounted for.
  /// </summary>
  private void UploadPart()
  {
      activieTasks = 0;
      int size = sliceList.Count;
      sliceCount = size;
      uploadPartRequestMap = new Dictionary<UploadPartRequest, long>(size);
      uploadPartRequestList = new List<UploadPartRequest>(size);
      // Deliberately not disposed: callbacks of in-flight requests may still call Set()
      // after this method has returned.
      AutoResetEvent resetEvent = new AutoResetEvent(false);
      // A non-positive limit would make the throttle below wait forever, so allow at least one request.
      int maxConcurrent = Math.Max(1, MaxConcurrent);
      for (int i = 0; i < size; i++)
      {
          // Re-check after every wake-up: the event latches, so a signal left over from an
          // earlier completion must not be mistaken for a free slot.
          while (activieTasks >= maxConcurrent)
          {
              resetEvent.WaitOne();
          }
          lock (syncExit)
          {
              if (isExit)
              {
                  return;
              }
          }
          SliceStruct sliceStruct = sliceList[i];
          if (!sliceStruct.isAlreadyUpload)
          {
              UploadPartRequest uploadPartRequest = new UploadPartRequest(bucket, key, sliceStruct.partNumber, uploadId, srcPath,
                  sliceStruct.sliceStart, sliceStruct.sliceLength);
              if (customHeaders != null && customHeaders.ContainsKey(CosRequestHeaderKey.X_COS_TRAFFIC_LIMIT))
              {
                  string trafficLimit = customHeaders[CosRequestHeaderKey.X_COS_TRAFFIC_LIMIT];
                  uploadPartRequest.LimitTraffic(Convert.ToInt64(trafficLimit));
              }
              // Aggregate the per-part progress into an overall progress report.
              uploadPartRequest.SetCosProgressCallback(
                  delegate (long completed, long total)
                  {
                      lock (syncProgress)
                      {
                          // Add only the delta since this part's previous report.
                          long dataLen = hasReceiveDataLength + completed - uploadPartRequestMap[uploadPartRequest];
                          UpdateProgress(dataLen, sendContentLength, false);
                          hasReceiveDataLength = dataLen;
                          uploadPartRequestMap[uploadPartRequest] = completed;
                      }
                  }
              );
              // Progress callbacks of earlier parts access the map concurrently under
              // syncProgress, so registering a new part must take the same lock.
              lock (syncProgress)
              {
                  uploadPartRequestMap.Add(uploadPartRequest, 0);
              }
              uploadPartRequestList.Add(uploadPartRequest);
              Interlocked.Increment(ref activieTasks);
              cosXmlServer.UploadPart(uploadPartRequest, delegate (CosResult result)
              {
                  Interlocked.Decrement(ref activieTasks);
                  UploadPartResult uploadPartResult = result as UploadPartResult;
                  sliceStruct.eTag = uploadPartResult.eTag;
                  lock (syncPartCopyCount)
                  {
                      sliceCount--;
                      if (sliceCount == 0)
                      {
                          OnPart();
                      }
                  }
                  // Wake the scheduling loop: a slot has been freed.
                  resetEvent.Set();
              }, delegate (CosClientException clientEx, CosServerException serverEx)
              {
                  Interlocked.Decrement(ref activieTasks);
                  if (UpdateTaskState(TaskState.Failed))
                  {
                      OnFailed(clientEx, serverEx);
                  }
                  // Wake the scheduling loop so that it can observe isExit and stop.
                  resetEvent.Set();
              });
          }
          else
          {
              // Slice is already on the server: just count it as done.
              lock (syncPartCopyCount)
              {
                  sliceCount--;
                  if (sliceCount == 0)
                  {
                      OnPart();
                      return;
                  }
              }
          }
      }
  }
  /// <summary>
  /// Forwards a progress report to <c>progressCallback</c> unless the task has exited.
  /// While the upload is not flagged as completed, a report that would reach the total
  /// is capped at <c>total - 1</c>.
  /// </summary>
  /// <param name="complete">Number of bytes transferred so far.</param>
  /// <param name="total">Total number of bytes to transfer.</param>
  /// <param name="isCompleted">True only for the final report after the upload finished.</param>
  private void UpdateProgress(long complete, long total, bool isCompleted)
  {
      lock (syncExit)
      {
          if (isExit)
          {
              return;
          }
      }

      if (progressCallback == null)
      {
          return;
      }

      // Only the final report may show the full total.
      bool reportAsIs = complete < total || isCompleted;
      long reported = reportAsIs ? complete : total - 1;
      progressCallback(reported, total);
  }
  /// <summary>
  /// Sends the CompleteMultipartUpload request listing every slice's part number and
  /// ETag, then reports completion or failure of the task.
  /// </summary>
  private void CompleteMultipartUpload()
  {
      completeMultiUploadRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId);

      // Parts are added in slice order, i.e. ordered by part number.
      for (int index = 0; index < sliceList.Count; index++)
      {
          SliceStruct slice = sliceList[index];
          completeMultiUploadRequest.SetPartNumberAndETag(slice.partNumber, slice.eTag);
      }

      cosXmlServer.CompleteMultiUpload(
          completeMultiUploadRequest,
          result =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              if (!UpdateTaskState(TaskState.Completed))
              {
                  return;
              }

              OnCompleted(result as CompleteMultipartUploadResult);
          },
          (clientEx, serverEx) =>
          {
              lock (syncExit)
              {
                  if (isExit)
                  {
                      return;
                  }
              }

              if (UpdateTaskState(TaskState.Failed))
              {
                  OnFailed(clientEx, serverEx);
              }
          });
  }
  /// <summary>
  /// Splits the payload into slices of <c>sliceSize</c> bytes and stores them in
  /// <c>sliceList</c>. Part numbers start at 1; the last slice takes whatever remains
  /// after the preceding full-size slices.
  /// </summary>
  private void ComputeSliceNums()
  {
      int wholeSlices = (int)(sendContentLength / sliceSize);
      sliceList = new List<SliceStruct>(Math.Max(wholeSlices, 1));

      // Part numbers are 1-based.
      int partNumber = 1;
      while (partNumber < wholeSlices)
      {
          sliceList.Add(new SliceStruct
          {
              partNumber = partNumber,
              isAlreadyUpload = false,
              sliceStart = sendOffset + (partNumber - 1) * sliceSize,
              sliceLength = sliceSize,
              sliceEnd = sendOffset + partNumber * sliceSize - 1
          });
          partNumber++;
      }

      // Final slice: everything from the end of the previous slice to the end of the payload.
      sliceList.Add(new SliceStruct
      {
          partNumber = partNumber,
          isAlreadyUpload = false,
          sliceStart = sendOffset + (partNumber - 1) * sliceSize,
          sliceLength = sendContentLength - (partNumber - 1) * sliceSize,
          sliceEnd = sendOffset + sendContentLength - 1
      });
  }
  /// <summary>
  /// Marks every slice that the server already holds as uploaded, records its ETag and
  /// adds its size to the transferred-bytes counter. Any exception raised while doing
  /// so fails the task with an InternalError client exception.
  /// </summary>
  /// <param name="listPartsResult">Result of the ListParts request for the current upload id.</param>
  private void UpdateSliceNums(ListPartsResult listPartsResult)
  {
      try
      {
          var remoteParts = listPartsResult.listParts.parts;
          if (remoteParts == null)
          {
              return;
          }

          // Index the local slices by part number.
          var slicesByNumber = new Dictionary<int, SliceStruct>(sliceList.Count);
          foreach (SliceStruct localSlice in sliceList)
          {
              slicesByNumber.Add(localSlice.partNumber, localSlice);
          }

          foreach (ListParts.Part remotePart in remoteParts)
          {
              int number = -1;
              if (!int.TryParse(remotePart.partNumber, out number))
              {
                  throw new ArgumentException("ListParts.Part parse error");
              }

              SliceStruct matched = slicesByNumber[number];
              matched.isAlreadyUpload = true;
              matched.eTag = remotePart.eTag;

              lock (syncProgress)
              {
                  // An unparsable size leaves partBytes at 0.
                  long partBytes = 0L;
                  long.TryParse(remotePart.size, out partBytes);
                  hasReceiveDataLength += partBytes;
              }
          }
      }
      catch (Exception ex)
      {
          bool alreadyExited;
          lock (syncExit)
          {
              alreadyExited = isExit;
          }

          if (!alreadyExited && UpdateTaskState(TaskState.Failed))
          {
              OnFailed(new CosClientException((int)CosClientError.InternalError, ex.Message, ex), null);
          }
      }
  }
  /// <summary>
  /// Checks whether the MD5 of a byte range of a local file equals the given ETag.
  /// </summary>
  /// <param name="localFile">Path of the local file.</param>
  /// <param name="offset">Start offset of the range.</param>
  /// <param name="length">Number of bytes to hash.</param>
  /// <param name="crc64ecma">
  /// Despite the name, the value compared here is the hex MD5 ETag of the uploaded
  /// part; surrounding double quotes are ignored.
  /// </param>
  /// <returns>
  /// True when the digests are equal; false on mismatch, on a null ETag, or when the
  /// file cannot be read.
  /// </returns>
  private bool CompareSliceMD5(string localFile, long offset, long length, string crc64ecma)
  {
      // NOTE(review): this call looks unrelated to the MD5 comparison below; it is kept to
      // preserve any side effect it may have — confirm whether it can be removed.
      Crc64.InitECMA();

      if (crc64ecma == null)
      {
          return false;
      }

      try
      {
          using (FileStream fs = File.OpenRead(localFile))
          {
              fs.Seek(offset, SeekOrigin.Begin);
              string md5 = DigestUtils.GetMD5HexString(fs, length);
              // Hex digests carry no case information, so compare case-insensitively.
              return string.Equals(md5, crc64ecma.Trim('"'), StringComparison.OrdinalIgnoreCase);
          }
      }
      catch (Exception)
      {
          // Best effort: any failure to read or hash the file counts as "not matching".
          return false;
      }
  }
  /// <summary>
  /// Called once the upload id is available; starts uploading the parts.
  /// </summary>
  public void OnInit() => UploadPart();
  /// <summary>
  /// Called once every part has an ETag; sends the completion request.
  /// </summary>
  public void OnPart() => CompleteMultipartUpload();
  /// <summary>
  /// Reports the final progress and hands the completion result to <c>successCallback</c>.
  /// </summary>
  /// <param name="result">Result of the CompleteMultipartUpload request.</param>
  public void OnCompleted(CompleteMultipartUploadResult result)
  {
      // Final report: the whole payload has been transferred.
      UpdateProgress(sendContentLength, sendContentLength, true);

      if (successCallback == null)
      {
          return;
      }

      var taskResult = new UploadTaskResult();
      taskResult.SetResult(result);
      successCallback(taskResult);
  }
  /// <summary>
  /// Marks the task as exited and passes the error on to <c>failCallback</c>.
  /// </summary>
  /// <param name="clientEx">Client-side exception, if any.</param>
  /// <param name="serverEx">Server-side exception, if any.</param>
  public void OnFailed(CosClientException clientEx, CosServerException serverEx)
  {
      // From now on pending callbacks see isExit and return early.
      lock (syncExit)
      {
          isExit = true;
      }

      failCallback?.Invoke(clientEx, serverEx);
  }
  /// <summary>
  /// Sends an AbortMultipartUpload request for the current upload id. When that
  /// request fails, a DeleteObject request for the key is issued instead.
  /// </summary>
  private void Abort()
  {
      abortMultiUploadRequest = new AbortMultipartUploadRequest(bucket, key, uploadId);

      cosXmlServer.AbortMultiUpload(
          abortMultiUploadRequest,
          cosResult =>
          {
              // Nothing to do on success.
          },
          (cosClientException, cosServerException) =>
          {
              DeleteObject();
          });
  }
  /// <summary>
  /// Issues a DeleteObject request for the task's key; the outcome is ignored.
  /// </summary>
  private void DeleteObject()
  {
      deleteObjectRequest = new DeleteObjectRequest(bucket, key);

      cosXmlServer.DeleteObject(
          deleteObjectRequest,
          cosResult =>
          {
              // Outcome intentionally ignored.
          },
          (cosClientException, cosServerException) =>
          {
              // Outcome intentionally ignored.
          });
  }
  /// <summary>
  /// Cancels every request this task may currently have in flight. Request fields that
  /// were never created are passed as null, exactly as before.
  /// </summary>
  private void RealCancle()
  {
      cosXmlServer.Cancel(putObjectRequest);
      cosXmlServer.Cancel(initMultiUploadRequest);
      cosXmlServer.Cancel(completeMultiUploadRequest);
      cosXmlServer.Cancel(listPartsRequest);
      // The resumable-upload lookup is a request of this task as well and must not keep
      // running after Pause()/Cancel().
      cosXmlServer.Cancel(listMultiUploadsRequest);
      if (uploadPartRequestList != null)
      {
          foreach (UploadPartRequest uploadPartRequest in uploadPartRequestList)
          {
              cosXmlServer.Cancel(uploadPartRequest);
          }
      }
  }
  /// <summary>
  /// Pauses the upload: flags the task as exited and cancels in-flight requests.
  /// Does nothing when the state change to Pause is rejected.
  /// </summary>
  public override void Pause()
  {
      if (!UpdateTaskState(TaskState.Pause))
      {
          return;
      }

      // Make pending callbacks stop.
      lock (syncExit)
      {
          isExit = true;
      }

      RealCancle();
  }
  /// <summary>
  /// Cancels the upload: flags the task as exited, cancels in-flight requests, aborts
  /// the multipart upload and forgets the upload id. Does nothing when the state change
  /// to Cancel is rejected.
  /// </summary>
  /// <exception cref="CosClientException">
  /// Thrown with the UserCancelled error code when <c>throwExceptionIfCancelled</c> is set.
  /// </exception>
  public override void Cancel()
  {
      if (!UpdateTaskState(TaskState.Cancel))
      {
          return;
      }

      // Make pending callbacks stop.
      lock (syncExit)
      {
          isExit = true;
      }

      RealCancle();
      // Abort() captures the upload id in its request before the id is cleared below.
      Abort();
      uploadId = null;

      if (throwExceptionIfCancelled)
      {
          throw new CosClientException((int)CosClientError.UserCancelled, "Upload Task Cancelled by user");
      }
  }
  /// <summary>
  /// Resumes the upload by clearing the exit flag and starting Upload() again.
  /// Does nothing when the state change to Resume is rejected.
  /// </summary>
  public override void Resume()
  {
      if (!UpdateTaskState(TaskState.Resume))
      {
          return;
      }

      // Allow callbacks to run again.
      lock (syncExit)
      {
          isExit = false;
      }

      Upload();
  }
  /// <summary>
  /// Result handed to the success callback of an upload task; carries the ETag and the
  /// HTTP response information of the underlying request.
  /// </summary>
  public class UploadTaskResult : CosResult
  {
      public string eTag;

      /// <summary>Copies ETag and HTTP response information from a PutObject result.</summary>
      /// <param name="result">Result of the single-request upload.</param>
      public void SetResult(PutObjectResult result)
      {
          eTag = result.eTag;
          CopyResponseInfo(result);
      }

      /// <summary>Copies ETag and HTTP response information from a multipart completion result.</summary>
      /// <param name="result">Result of the CompleteMultipartUpload request.</param>
      public void SetResult(CompleteMultipartUploadResult result)
      {
          eTag = result.completeResult.eTag;
          CopyResponseInfo(result);
      }

      /// <summary>Returns the base result description followed by the ETag.</summary>
      public override string GetResultInfo()
      {
          return string.Concat(base.GetResultInfo(), "\n : ETag: ", eTag);
      }

      // Takes over status code, status message and response headers from another result.
      private void CopyResponseInfo(CosResult source)
      {
          httpCode = source.httpCode;
          httpMessage = source.httpMessage;
          responseHeaders = source.responseHeaders;
      }
  }
  766. }
  767. }