Files
playground/oss.go.bak
XuanLee-HEALER df39693dff commit
2025-11-10 15:30:21 +08:00

192 lines
5.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"log"
mrand "math/rand"
"strconv"
"sync"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)
const (
ACCESS_KEY_ID = "LTAI5tMRYxK6YpEoNZ4U1AdS"
ACCESS_KEY_SECRET = "6qTiPMHmubJ3Y6CRNx8gpg9c2MCtrb"
BUCKET_NAME string = "goalfyagent-data-test"
ENDPOINT = "http://oss-us-west-1.aliyuncs.com"
REGION = "us-west-1"
)
const (
totalFiles = 1000
minFileSizeKB = 15
maxFileSizeKB = 20
// 协程数
numWorkers = 100
objectPrefix = "user/100/goalfymax/UserGlobalData"
)
// 任务结构体,包含文件名和文件内容
type uploadTask struct {
filename string
data []byte
}
// 模拟生成指定大小的随机文件数据
func generateRandomData(minKB, maxKB int) []byte {
sizeKB := minKB + mrand.Intn(maxKB-minKB+1)
sizeBytes := sizeKB * 1024
data := make([]byte, sizeBytes)
_, err := rand.Read(data)
if err != nil {
log.Fatalf("Failed to generate random data: %v", err)
}
return data
}
// 文件生成器,将任务发送到通道
func fileGenerator(tasks chan<- uploadTask, wg *sync.WaitGroup) {
defer wg.Done()
for i := range totalFiles {
// 创建层级目录
dirLevel1 := strconv.Itoa(i / 100) // 每100个文件一个一级目录
dirLevel2 := strconv.Itoa((i % 100) / 10) // 每10个文件一个二级目录
filename := fmt.Sprintf("%s%s/%s/file_%d.txt", objectPrefix, dirLevel1, dirLevel2, i)
task := uploadTask{
filename: filename,
data: generateRandomData(minFileSizeKB, maxFileSizeKB),
}
tasks <- task
}
close(tasks)
}
// 文件上传器,从通道接收任务并上传
func fileUploader(client *oss.Client, tasks <-chan uploadTask, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
reader := bytes.NewReader(task.data)
pr, err := client.PutObject(context.Background(), &oss.PutObjectRequest{
Bucket: oss.Ptr(BUCKET_NAME),
Key: oss.Ptr(task.filename),
Body: reader,
})
if err != nil && pr.StatusCode != 200 {
log.Printf("Failed to upload %s: %v\n", task.filename, err)
}
}
}
func main() {
// 创建OSS客户端
conf := oss.LoadDefaultConfig().WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
ACCESS_KEY_ID,
ACCESS_KEY_SECRET,
"",
)).WithRegion(REGION).WithEndpoint(ENDPOINT)
client := oss.NewClient(conf)
// // 创建一个带缓冲的通道
// tasks := make(chan uploadTask, totalFiles)
// var wg sync.WaitGroup
// // 启动文件生成协程
// wg.Add(1)
// go fileGenerator(tasks, &wg)
// // 启动多个文件上传协程(协程池)
// for range numWorkers {
// wg.Add(1)
// go fileUploader(client, tasks, &wg)
// }
// // 等待所有协程完成
// wg.Wait()
// log.Println("All files have been uploaded.")
// --- 第一次:全量遍历 ---
fmt.Println("--- 开始第一次全量遍历 ---")
fullListResult, err := ListObjectsInDirectory(client, BUCKET_NAME, objectPrefix, "")
if err != nil {
log.Fatalf("全量遍历失败: %v", err)
}
fmt.Printf("\n全量遍历完成共获取 %d 个文件。\n", len(fullListResult.Objects))
fmt.Printf("下一次增量遍历的Marker是: %s\n", fullListResult.NextMarker)
// 模拟将 marker 保存起来,用于下次增量遍历
nextMarker := fullListResult.NextMarker
// --- 第二次:增量遍历 ---
// 假设在两次遍历之间OSS中没有新增文件那么增量遍历应该返回0个文件
fmt.Println("\n--- 开始第二次增量遍历 ---")
incrementalResult, err := ListObjectsInDirectory(client, BUCKET_NAME, objectPrefix, nextMarker)
if err != nil {
log.Fatalf("增量遍历失败: %v", err)
}
fmt.Printf("\n增量遍历完成共获取 %d 个文件。\n", len(incrementalResult.Objects))
if len(incrementalResult.Objects) > 0 {
fmt.Printf("新的Marker是: %s\n", incrementalResult.NextMarker)
} else {
fmt.Println("没有新增文件。")
}
}
// ListObjectsResult 包含遍历到的文件列表和下一次增量遍历的起始点
type ListObjectsResult struct {
Objects []oss.ObjectProperties
NextMarker string
}
// ListObjectsInDirectory 遍历OSS指定目录下的文件
// bucketName: 存储空间名称
// directoryPrefix: 要遍历的目录,例如 "my-folder/"
// startAfterMarker: 上一次遍历结束的位置,用于增量遍历。首次全量遍历时传入空字符串""
func ListObjectsInDirectory(client *oss.Client, bucketName, directoryPrefix, startAfterMarker string) (*ListObjectsResult, error) {
// 2. 创建ListObjectsV2请求
request := &oss.ListObjectsV2Request{
Bucket: oss.Ptr(bucketName),
Prefix: oss.Ptr(directoryPrefix),
}
// 如果传入了marker则设置为增量遍历
if startAfterMarker != "" {
request.StartAfter = oss.Ptr(startAfterMarker)
}
// 3. 创建并使用分页器进行遍历
paginator := client.NewListObjectsV2Paginator(request,
func(po *oss.PaginatorOptions) { po.Limit = 200 })
var allObjects []oss.ObjectProperties
var lastObjectKey string
fmt.Println("开始遍历文件:")
for paginator.HasNext() {
page, err := paginator.NextPage(context.TODO())
if err != nil {
return nil, fmt.Errorf("获取下一页数据失败: %w", err)
}
for _, obj := range page.Contents {
fmt.Printf(" - %s\n", *obj.Key)
allObjects = append(allObjects, obj)
lastObjectKey = *obj.Key
}
}
result := &ListObjectsResult{
Objects: allObjects,
NextMarker: lastObjectKey,
}
return result, nil
}