COSXMLCopyTask.cs 19 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using COSXML.Model.Tag;
  5. using COSXML.Model.Object;
  6. using COSXML.Utils;
  7. using COSXML.Model;
  8. using COSXML.CosException;
  9. using COSXML.Log;
  10. using COSXML.Common;
  11. namespace COSXML.Transfer
  12. {
  13. public sealed class COSXMLCopyTask : COSXMLTask, IOnMultipartUploadStateListener
  14. {
  15. private long divisionSize;
  16. private long sliceSize;
  17. private CopySourceStruct copySource;
  18. private HeadObjectRequest headObjectRequest;
  19. private DeleteObjectRequest deleteObjectRequest;
  20. private long sourceSize;
  21. private CopyObjectRequest copyObjectRequest;
  22. private Object syncExit = new Object();
  23. private bool isExit = false;
  24. private InitMultipartUploadRequest initMultiUploadRequest;
  25. private string uploadId;
  26. private ListPartsRequest listPartsRequest;
  27. private List<UploadPartCopyRequest> uploadCopyCopyRequestList;
  28. private List<SliceStruct> sliceList;
  29. private Object syncPartCopyCount = new object();
  30. private int sliceCount;
  31. private CompleteMultipartUploadRequest completeMultiUploadRequest;
  32. private AbortMultipartUploadRequest abortMultiUploadRequest;
  33. public bool CompleteOnAllPartsCopyed { get; set; } = true;
  34. public COSXMLCopyTask(string bucket, string key, CopySourceStruct copySource)
  35. : base(bucket, key)
  36. {
  37. this.copySource = copySource;
  38. }
  39. internal void SetDivision(long divisionSize, long sliceSize)
  40. {
  41. this.divisionSize = divisionSize;
  42. this.sliceSize = sliceSize;
  43. }
  44. internal void Copy()
  45. {
  46. UpdateTaskState(TaskState.Waiting);
  47. //源对象是否存在
  48. if (copySource == null)
  49. {
  50. lock (syncExit)
  51. {
  52. if (isExit)
  53. {
  54. return;
  55. }
  56. }
  57. if (UpdateTaskState(TaskState.Failed))
  58. {
  59. if (failCallback != null)
  60. {
  61. failCallback(new CosClientException((int)CosClientError.InvalidArgument, "copySource = null"), null);
  62. }
  63. }
  64. //error
  65. return;
  66. }
  67. headObjectRequest = new HeadObjectRequest(copySource.bucket, copySource.key);
  68. headObjectRequest.Region = copySource.region;
  69. cosXmlServer.HeadObject(headObjectRequest, delegate (CosResult cosResult)
  70. {
  71. lock (syncExit)
  72. {
  73. if (isExit)
  74. {
  75. return;
  76. }
  77. }
  78. if (UpdateTaskState(TaskState.Running))
  79. {
  80. HeadObjectResult result = cosResult as HeadObjectResult;
  81. //源对象的长度
  82. sourceSize = result.size;
  83. if (sourceSize > divisionSize)
  84. {
  85. MultiPartCopy();
  86. }
  87. else
  88. {
  89. SimpleCopy();
  90. }
  91. }
  92. },
  93. delegate (CosClientException clientEx, CosServerException serverEx)
  94. {
  95. lock (syncExit)
  96. {
  97. if (isExit)
  98. {
  99. return;
  100. }
  101. }
  102. if (UpdateTaskState(TaskState.Failed))
  103. {
  104. if (failCallback != null)
  105. {
  106. failCallback(clientEx, serverEx);
  107. }
  108. }
  109. });
  110. }
  111. private void SimpleCopy()
  112. {
  113. copyObjectRequest = new CopyObjectRequest(bucket, key);
  114. copyObjectRequest.SetCopyMetaDataDirective(Common.CosMetaDataDirective.Copy);
  115. copyObjectRequest.SetCopySource(copySource);
  116. cosXmlServer.CopyObject(copyObjectRequest,
  117. delegate (CosResult cosResult)
  118. {
  119. lock (syncExit)
  120. {
  121. if (isExit)
  122. {
  123. if (taskState == TaskState.Cancel)
  124. {
  125. DeleteObject();
  126. }
  127. return;
  128. }
  129. }
  130. if (UpdateTaskState(TaskState.Completed))
  131. {
  132. CopyObjectResult result = cosResult as CopyObjectResult;
  133. CopyTaskResult copyTaskResult = new CopyTaskResult();
  134. copyTaskResult.SetResult(result);
  135. if (successCallback != null)
  136. {
  137. successCallback(copyTaskResult);
  138. }
  139. }
  140. },
  141. delegate (CosClientException clientEx, CosServerException serverEx)
  142. {
  143. lock (syncExit)
  144. {
  145. if (isExit)
  146. {
  147. return;
  148. }
  149. }
  150. if (UpdateTaskState(TaskState.Failed))
  151. {
  152. if (failCallback != null)
  153. {
  154. failCallback(clientEx, serverEx);
  155. }
  156. }
  157. }
  158. );
  159. }
  160. private void MultiPartCopy()
  161. {
  162. if (uploadId != null)
  163. {
  164. //list -> partCopy -> complete
  165. ListParts();
  166. }
  167. else
  168. {
  169. // init -> partCopy - > complete
  170. InitMultiUploadPart();
  171. }
  172. }
  173. private void InitMultiUploadPart()
  174. {
  175. initMultiUploadRequest = new InitMultipartUploadRequest(bucket, key);
  176. cosXmlServer.InitMultipartUpload(initMultiUploadRequest, delegate (CosResult cosResult)
  177. {
  178. lock (syncExit)
  179. {
  180. if (isExit)
  181. {
  182. return;
  183. }
  184. }
  185. InitMultipartUploadResult result = cosResult as InitMultipartUploadResult;
  186. uploadId = result.initMultipartUpload.uploadId;
  187. //计算分片块
  188. ComputeSliceNums();
  189. //通知执行PartCopy
  190. OnInit();
  191. },
  192. delegate (CosClientException clientEx, CosServerException serverEx)
  193. {
  194. lock (syncExit)
  195. {
  196. if (isExit)
  197. {
  198. return;
  199. }
  200. }
  201. if (UpdateTaskState(TaskState.Failed))
  202. {
  203. OnFailed(clientEx, serverEx);
  204. }
  205. });
  206. }
  207. private void ListParts()
  208. {
  209. listPartsRequest = new ListPartsRequest(bucket, key, uploadId);
  210. cosXmlServer.ListParts(listPartsRequest, delegate (CosResult cosResult)
  211. {
  212. lock (syncExit)
  213. {
  214. if (isExit)
  215. {
  216. return;
  217. }
  218. }
  219. ListPartsResult result = cosResult as ListPartsResult;
  220. //更新分块
  221. UpdateSliceNums(result);
  222. //通知执行PartCopy
  223. OnInit();
  224. },
  225. delegate (CosClientException clientEx, CosServerException serverEx)
  226. {
  227. lock (syncExit)
  228. {
  229. if (isExit)
  230. {
  231. return;
  232. }
  233. }
  234. if (UpdateTaskState(TaskState.Failed))
  235. {
  236. OnFailed(clientEx, serverEx);
  237. }
  238. });
  239. }
  240. private void PartCopy()
  241. {
  242. int size = sliceList.Count;
  243. sliceCount = size;
  244. uploadCopyCopyRequestList = new List<UploadPartCopyRequest>(size);
  245. for (int i = 0; i < size; i++)
  246. {
  247. if (isExit)
  248. {
  249. return;
  250. }
  251. SliceStruct sliceStruct = sliceList[i];
  252. if (!sliceStruct.isAlreadyUpload)
  253. {
  254. UploadPartCopyRequest uploadPartCopyRequest = new UploadPartCopyRequest(bucket, key, sliceStruct.partNumber, uploadId);
  255. uploadPartCopyRequest.SetCopySource(copySource);
  256. uploadPartCopyRequest.SetCopyRange(sliceStruct.sliceStart, sliceStruct.sliceEnd);
  257. uploadCopyCopyRequestList.Add(uploadPartCopyRequest);
  258. cosXmlServer.PartCopy(uploadPartCopyRequest, delegate (CosResult result)
  259. {
  260. lock (syncExit)
  261. {
  262. if (isExit)
  263. {
  264. return;
  265. }
  266. }
  267. UploadPartCopyResult uploadPartCopyResult = result as UploadPartCopyResult;
  268. sliceStruct.eTag = uploadPartCopyResult.copyPart.eTag;
  269. lock (syncPartCopyCount)
  270. {
  271. sliceCount--;
  272. if (sliceCount == 0)
  273. {
  274. OnPart();
  275. return;
  276. }
  277. }
  278. }, delegate (CosClientException clientEx, CosServerException serverEx)
  279. {
  280. lock (syncExit)
  281. {
  282. if (isExit)
  283. {
  284. return;
  285. }
  286. }
  287. if (UpdateTaskState(TaskState.Failed))
  288. {
  289. OnFailed(clientEx, serverEx);
  290. }
  291. return;
  292. });
  293. }
  294. else
  295. {
  296. lock (syncPartCopyCount)
  297. {
  298. sliceCount--;
  299. if (sliceCount == 0)
  300. {
  301. OnPart();
  302. return;
  303. }
  304. }
  305. }
  306. }
  307. }
  308. private void ComputeSliceNums()
  309. {
  310. int count = (int)(sourceSize / sliceSize);
  311. sliceList = new List<SliceStruct>(count > 0 ? count : 1);
  312. int i = 1;
  313. for (; i < count; i++)
  314. {
  315. SliceStruct sliceStruct = new SliceStruct();
  316. sliceStruct.partNumber = i;
  317. sliceStruct.isAlreadyUpload = false;
  318. sliceStruct.sliceStart = (i - 1) * sliceSize;
  319. sliceStruct.sliceEnd = i * sliceSize - 1;
  320. sliceList.Add(sliceStruct);
  321. }
  322. SliceStruct lastSliceStruct = new SliceStruct();
  323. lastSliceStruct.partNumber = i;
  324. lastSliceStruct.isAlreadyUpload = false;
  325. lastSliceStruct.sliceStart = (i - 1) * sliceSize;
  326. lastSliceStruct.sliceEnd = sourceSize - 1;
  327. sliceList.Add(lastSliceStruct);
  328. }
  329. private void UpdateSliceNums(ListPartsResult listPartsResult)
  330. {
  331. try
  332. {
  333. if (listPartsResult.listParts.parts != null)
  334. {
  335. //获取原来的parts并提取partNumber
  336. Dictionary<int, SliceStruct> sourceParts = new Dictionary<int, SliceStruct>(sliceList.Count);
  337. foreach (SliceStruct sliceStruct in sliceList)
  338. {
  339. sourceParts.Add(sliceStruct.partNumber, sliceStruct);
  340. }
  341. foreach (ListParts.Part part in listPartsResult.listParts.parts)
  342. {
  343. int partNumber = -1;
  344. bool parse = int.TryParse(part.partNumber, out partNumber);
  345. if (!parse)
  346. {
  347. throw new ArgumentException("ListParts.Part parse error");
  348. }
  349. SliceStruct sliceStruct = sourceParts[partNumber];
  350. sliceStruct.isAlreadyUpload = true;
  351. sliceStruct.eTag = part.eTag;
  352. }
  353. }
  354. }
  355. catch (Exception ex)
  356. {
  357. lock (syncExit)
  358. {
  359. if (isExit)
  360. {
  361. return;
  362. }
  363. }
  364. if (UpdateTaskState(TaskState.Failed))
  365. {
  366. OnFailed(new CosClientException((int)CosClientError.InternalError, ex.Message, ex), null);
  367. }
  368. }
  369. }
  370. private void CompleteMultipartUpload()
  371. {
  372. completeMultiUploadRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId);
  373. foreach (SliceStruct sliceStruct in sliceList)
  374. {
  375. completeMultiUploadRequest.SetPartNumberAndETag(sliceStruct.partNumber, sliceStruct.eTag);
  376. }
  377. cosXmlServer.CompleteMultiUpload(completeMultiUploadRequest, delegate (CosResult result)
  378. {
  379. lock (syncExit)
  380. {
  381. if (isExit)
  382. {
  383. return;
  384. }
  385. }
  386. if (UpdateTaskState(TaskState.Completed))
  387. {
  388. CompleteMultipartUploadResult completeMultiUploadResult = result as CompleteMultipartUploadResult;
  389. OnCompleted(completeMultiUploadResult);
  390. }
  391. }, delegate (CosClientException clientEx, CosServerException serverEx)
  392. {
  393. lock (syncExit)
  394. {
  395. if (isExit)
  396. {
  397. return;
  398. }
  399. }
  400. if (UpdateTaskState(TaskState.Failed))
  401. {
  402. OnFailed(clientEx, serverEx);
  403. }
  404. });
  405. }
  406. public void OnInit()
  407. {
  408. //获取了uploadId
  409. PartCopy();
  410. }
  411. public void OnPart()
  412. {
  413. if (CompleteOnAllPartsCopyed)
  414. {
  415. //获取了 part ETag
  416. CompleteMultipartUpload();
  417. }
  418. }
  419. public void OnCompleted(CompleteMultipartUploadResult result)
  420. {
  421. uploadId = null;
  422. //lock (syncExit)
  423. //{
  424. // isExit = true;
  425. //}
  426. //success
  427. if (successCallback != null)
  428. {
  429. CopyTaskResult copyTaskResult = new CopyTaskResult();
  430. copyTaskResult.SetResult(result);
  431. successCallback(copyTaskResult);
  432. }
  433. }
  434. public void OnFailed(CosClientException clientEx, CosServerException serverEx)
  435. {
  436. if (!isExit)
  437. {
  438. lock (syncExit)
  439. {
  440. isExit = true;
  441. }
  442. }
  443. if (failCallback != null)
  444. {
  445. failCallback(clientEx, serverEx);
  446. }
  447. }
  448. private void Abort()
  449. {
  450. abortMultiUploadRequest = new AbortMultipartUploadRequest(bucket, key, uploadId);
  451. cosXmlServer.AbortMultiUpload(abortMultiUploadRequest,
  452. delegate (CosResult cosResult)
  453. {
  454. },
  455. delegate (CosClientException cosClientException, CosServerException cosServerException)
  456. {
  457. DeleteObject();
  458. }
  459. );
  460. }
  461. private void DeleteObject()
  462. {
  463. deleteObjectRequest = new DeleteObjectRequest(bucket, key);
  464. cosXmlServer.DeleteObject(deleteObjectRequest,
  465. delegate (CosResult cosResult)
  466. {
  467. },
  468. delegate (CosClientException cosClientException, CosServerException cosServerException)
  469. {
  470. }
  471. );
  472. }
  473. private void RealCancle()
  474. {
  475. //cancle request
  476. cosXmlServer.Cancel(headObjectRequest);
  477. cosXmlServer.Cancel(copyObjectRequest);
  478. cosXmlServer.Cancel(initMultiUploadRequest);
  479. cosXmlServer.Cancel(completeMultiUploadRequest);
  480. if (uploadCopyCopyRequestList != null)
  481. {
  482. foreach (UploadPartCopyRequest uploadPartCopyRequest in uploadCopyCopyRequestList)
  483. {
  484. cosXmlServer.Cancel(uploadPartCopyRequest);
  485. }
  486. }
  487. }
  488. public override void Pause()
  489. {
  490. if (UpdateTaskState(TaskState.Pause))
  491. {
  492. //exit copy
  493. lock (syncExit)
  494. {
  495. isExit = true;
  496. }
  497. //cancle request
  498. RealCancle();
  499. }
  500. }
  501. public override void Cancel()
  502. {
  503. if (UpdateTaskState(TaskState.Cancel))
  504. {
  505. //exit copy
  506. lock (syncExit)
  507. {
  508. isExit = true;
  509. }
  510. //cancle request
  511. RealCancle();
  512. //abort
  513. Abort();
  514. uploadId = null;
  515. // throw exception if requested
  516. if (throwExceptionIfCancelled) {
  517. throw new CosClientException((int)CosClientError.UserCancelled, "Copy Task Cancelled by user");
  518. }
  519. }
  520. }
  521. public override void Resume()
  522. {
  523. if (UpdateTaskState(TaskState.Resume))
  524. {
  525. lock (syncExit)
  526. {
  527. isExit = false;
  528. }
  529. Copy();
  530. }
  531. }
  532. public class CopyTaskResult : CosResult
  533. {
  534. public string eTag;
  535. public void SetResult(CopyObjectResult result)
  536. {
  537. this.eTag = result.copyObject.eTag;
  538. this.httpCode = result.httpCode;
  539. this.httpMessage = result.httpMessage;
  540. this.responseHeaders = result.responseHeaders;
  541. }
  542. public void SetResult(CompleteMultipartUploadResult result)
  543. {
  544. this.eTag = result.completeResult.eTag;
  545. this.httpCode = result.httpCode;
  546. this.httpMessage = result.httpMessage;
  547. this.responseHeaders = result.responseHeaders;
  548. }
  549. public override string GetResultInfo()
  550. {
  551. return base.GetResultInfo() + ("\n : ETag: " + eTag);
  552. }
  553. }
  554. }
  555. }