package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

// main duplicates project 10900 (and its dependent rows) inside the local
// copy_play database and exits non-zero with the underlying error on failure.
func main() {
	// NOTE(review): the DSN, including credentials, is hard-coded; consider
	// reading it from the environment or a flag.
	dsn := "root:123456@tcp(localhost:3306)/copy_play?charset=utf8mb4&parseTime=True&loc=Local"

	if err := runCopy(context.Background(), dsn, 10900); err != nil {
		log.Fatal(err)
	}
}

// runCopy opens the database, verifies the connection and copies projectID.
// It exists so that the deferred db.Close runs before main decides to exit.
func runCopy(ctx context.Context, dsn string, projectID uint) error {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}
	defer db.Close()

	// sql.Open does not connect by itself; Ping verifies the DSN actually works.
	if err := db.PingContext(ctx); err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}
	fmt.Println("Successfully connected to the database!")

	if err := copy(db, ctx, projectID, "_"); err != nil {
		return fmt.Errorf("failed to copy data: %w", err)
	}
	return nil
}

// Project holds the l_projects columns that are read from the source project
// and written to its copy.
type Project struct {
	ID     uint
	UserID uint
	Name   string
	Env    string
	Status string
	State  []byte
}

// Experience holds the l_experiences columns that are copied.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT,
//	`experience_title` varchar(255) NOT NULL COMMENT 'experience title',
//	`version` varchar(50) DEFAULT NULL COMMENT 'current version number',
//	`session_id` bigint NOT NULL COMMENT 'unique ID of the owning session',
//	`created_at` datetime NOT NULL COMMENT 'creation time',
//	`updated_at` datetime NOT NULL COMMENT 'update time',
//	`deleted_at` datetime DEFAULT NULL COMMENT 'deletion time (NULL when not deleted)',
//	`publish_version` varchar(50) DEFAULT NULL COMMENT 'build version',
//	`status` int DEFAULT NULL COMMENT 'experience status 0: offline, 1: online, 2: building',
//	`description` text COMMENT 'experience description',
type Experience struct {
	ID              uint
	ExperienceTitle string
	Version         *string
	PublishVersion  *string
	ExpStatus       sql.NullInt64
	Description     *string
}

// Message holds the l_message columns that are copied.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'message ID, primary key',
//	`user_id` varchar(255) NOT NULL COMMENT 'user ID',
//	`project_id` int NOT NULL COMMENT 'project ID',
//	`chat_id` varchar(255) NOT NULL COMMENT 'chat session ID',
//	`content` text NOT NULL COMMENT 'message content',
//	`tree_type` varchar(50) NOT NULL DEFAULT 'message' COMMENT 'tree type: message or taskpoint',
//	`source` varchar(100) NOT NULL COMMENT 'message source',
//	`type` varchar(50) NOT NULL COMMENT 'message type',
//	`task_id` varchar(255) DEFAULT NULL COMMENT 'task ID',
//	`parent_task_id` varchar(255) DEFAULT NULL COMMENT 'parent task ID',
//	`task_status` varchar(255) DEFAULT NULL COMMENT 'task point status; empty when tree_type is message, runing or finished for taskpoint; a status change never updates this field, it only adds a message',
//	`event_type` varchar(255) DEFAULT NULL COMMENT 'record of the event_type field of the MQ message',
//	`metadata` json DEFAULT NULL COMMENT 'message metadata',
//	`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
//	`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
type Message struct {
	ID           uint
	MsgUserID    string
	ChatID       string
	Content      string
	TreeType     string
	Source       string
	MsgType      string
	TaskID       *string
	ParentTaskID *string
	TaskStatus   *string
	EventType    *string
	Metadata     *string
}

// System holds the l_systems columns that are copied.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT,
//	`system_name` varchar(255) NOT NULL COMMENT 'system name',
//	`system_desc` longtext COMMENT 'system description',
//	`created_at` datetime NOT NULL COMMENT 'creation time',
//	`updated_at` datetime NOT NULL COMMENT 'update time',
//	`system_url` varchar(255) NOT NULL COMMENT 'system domain',
//	`experience_id` int DEFAULT NULL COMMENT 'experience ID',
//	`unique_system_id` varchar(255) DEFAULT NULL COMMENT 'globally unique system ID it belongs to',
//	`system_pages_desc` longtext COMMENT 'description of all pages under the system',
//	`system_summary` longtext COMMENT 'system summary',
//	`matched_labels` json DEFAULT NULL COMMENT 'labels associated with the system',
//	`memo_process_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'whether the memo was processed, 0=no, 1=yes',
type System struct {
	ID                uint
	SystemName        string
	SystemDesc        *string
	SystemURL         string
	ExperienceID      *uint
	UniqueSystemID    *string
	SystemPagesDesc   *string
	SystemSummary     *string
	MatchedLabels     []byte
	MemoProcessStatus int
}

// RelationPlugin holds the l_plugin_relation columns that are copied. Rows
// are looked up by experience ID to find every related plugin.
//
// TODO: the copy should use the new taskpoint_id.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT,
//	`relation_id` varchar(255) NOT NULL,
//	`user_id` varchar(255) NOT NULL,
//	`taskpoint_id` varchar(255) NOT NULL,
//	`experience_id` int NOT NULL COMMENT 'unique ID of the owning experience',
//	`status` tinyint DEFAULT NULL COMMENT '0: closed, 1: open',
//	`version` int DEFAULT NULL,
//	`created_at` datetime NOT NULL COMMENT 'creation time',
//	`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
//	`last_heartbeat_time` datetime DEFAULT NULL COMMENT 'last heartbeat',
//	`clean_status` int NOT NULL DEFAULT '0' COMMENT '0: cleaning finished, 1: cleaning needs to be evaluated, 2: evaluating, 3: needs cleaning',
//	`dig_status` int NOT NULL DEFAULT '0' COMMENT '0=ready to mine, 1=mining',
type RelationPlugin struct {
	UserID            string
	TaskpointID       string
	RelationID        string
	Status            sql.NullInt64
	Version           sql.NullInt64
	LastHeartbeatTime *time.Time
	CleanStatus       int
	DigStatus         int
}

// Video holds the l_videos columns that are copied. Columns declared
// DEFAULT NULL are pointers so that NULL survives the round trip instead of
// failing in Scan.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
//	`video_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'video ID (same as in l_video_events)',
//	`relation_id` varchar(100) NOT NULL COMMENT 'associated experience ID (UUID supported)',
//	`video_path` varchar(500) DEFAULT NULL COMMENT 'video file path',
//	`start_time` bigint DEFAULT NULL COMMENT 'video start timestamp in milliseconds',
//	`end_time` bigint DEFAULT NULL COMMENT 'video end timestamp in milliseconds',
//	`upload_time` datetime DEFAULT NULL COMMENT 'upload time',
//	`created_at` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'record creation time',
//	`action_sequence` json DEFAULT NULL COMMENT 'action sequence',
//	`system_name` varchar(100) DEFAULT NULL COMMENT 'system name',
type Video struct {
	VideoID    string
	RelationID string
	VideoPath  *string
	StartTime  *int64
	EndTime    *int64
	UploadTime *time.Time
	// ActionSeq is the set of l_user_actions IDs (raw JSON, nil when NULL).
	ActionSeq  []byte
	SystemName *string
}

// MonitoringLogParseResult holds the l_monitoring_log_parse_result columns
// that are copied.
//
// Table definition (column comments translated):
//
//	`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment ID',
//	`parse_result` json NOT NULL COMMENT 'parse result (JSON)',
//	`relation_id` varchar(64) NOT NULL COMMENT 'collection order ID (string)',
//	`session_id` bigint NOT NULL COMMENT 'session ID',
//	`clean_plan_id` int DEFAULT NULL COMMENT 'associated cleaning plan ID',
//	`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
//	`status` tinyint NOT NULL DEFAULT '0' COMMENT 'parse task status: 0=pending, 1=processing, 2=done, 3=failed',
type MonitoringLogParseResult struct {
	ParseResult []byte
	RelationID  string
	SessionID   uint
	CleanPlanID *uint
	Status      int
}

// SubjectKnowledge holds the l_extracted_subjective_knowledges columns that
// are copied.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT,
//	`extracted_subjective_knowledge_desc` text COMMENT 'subjective knowledge description',
//	`session_id` bigint NOT NULL COMMENT 'session ID',
//	`dialog_ids` json DEFAULT NULL COMMENT 'list of associated dialog IDs',
//	`experience_id` int NOT NULL COMMENT 'unique ID of the owning experience',
//	`version` varchar(100) DEFAULT NULL COMMENT 'version of the owning experience',
//	`created_at` datetime NOT NULL COMMENT 'creation time',
//	`updated_at` datetime NOT NULL COMMENT 'update time',
//	`status` int DEFAULT NULL COMMENT '0: produced by mining but not practised, 1: passed practice, 2: failed practice',
//	`taskpoint_id` varchar(200) DEFAULT NULL COMMENT 'task point ID',
//	`relation_id` varchar(200) DEFAULT NULL,
//	`system_ids` json DEFAULT NULL COMMENT 'list of system IDs',
//	`page_ids` json DEFAULT NULL COMMENT 'list of page IDs',
//	`category_tags` varchar(255) DEFAULT NULL COMMENT 'category tags',
//	`is_global_associated` int DEFAULT NULL COMMENT 'globally associated (0: no, 1: yes)',
type SubjectKnowledge struct {
	ExtractedSubjectiveKnowledgeDesc *string
	SessionID                        uint
	DialogIDs                        []byte
	ExperienceID                     uint
	Version                          *string
	Status                           *int
	TaskpointID                      *string
	RelationID                       *string
	SystemIDs                        []byte
	PageIDs                          []byte
	CategoryTags                     *string
	IsGlobalAssociated               *int
}

// UserAction holds the l_user_actions columns that are copied. Columns
// declared DEFAULT NULL are pointers so that NULL does not fail in Scan.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT COMMENT 'unique ID of the collected action',
//	`relation_id` varchar(255) NOT NULL COMMENT 'unique ID of the plugin/system communication relation',
//	`created_at` datetime(3) NOT NULL COMMENT 'creation time (when the action was reported)',
//	`updated_at` datetime(3) NOT NULL COMMENT 'update time',
//	`system_name` varchar(255) NOT NULL COMMENT 'name of the system the action happened in',
//	`page_url` varchar(1024) NOT NULL COMMENT 'URL of the page the action happened on',
//	`element_text` text COMMENT 'text of the element acted upon',
//	`event_type` varchar(64) NOT NULL COMMENT 'kind of action performed by the user (e.g. click, input)',
//	`page_html_path` varchar(512) DEFAULT NULL COMMENT 'storage path of the page HTML (OSS path)',
//	`screenshot_path` varchar(512) DEFAULT NULL COMMENT 'storage path of the page screenshot (OSS path)',
//	`element_tag` varchar(64) DEFAULT NULL COMMENT 'tag of the element acted upon (e.g. input, button)',
//	`status` varchar(8) DEFAULT '0' COMMENT 'takes part in objective knowledge generation (0=no, 1=yes)',
//	`extra_data` json DEFAULT NULL COMMENT 'collection of user behaviour data',
//	`user_id` int DEFAULT NULL,
//	`experience_id` int DEFAULT NULL COMMENT 'experience ID',
//	`page_id` int DEFAULT NULL COMMENT 'owning page',
//	`system_id` int DEFAULT NULL COMMENT 'owning system',
//	`log_dig_status` int NOT NULL DEFAULT '0' COMMENT '0=pending mining, 1=mining, 2=mined',
//	`clean_page_desc` int NOT NULL DEFAULT '0' COMMENT '0=not cleaned, 1=cleaned',
//	`clean_system_desc` int NOT NULL DEFAULT '0' COMMENT '0=not cleaned, 1=cleaned',
//	`clean_html_diff` int NOT NULL DEFAULT '0' COMMENT '0=not cleaned, 1=cleaned',
//	`clean_elements_json` int NOT NULL DEFAULT '0' COMMENT '0=not cleaned, 1=cleaned',
//	`clean_page_inputs` int NOT NULL DEFAULT '0' COMMENT '0=not cleaned, 1=cleaned',
//	`clean_page_stats` int NOT NULL DEFAULT '0' COMMENT '0=not cleaned, 1=cleaned',
//	`processing_delay` int DEFAULT NULL,
type UserAction struct {
	ID                uint
	RelationID        string
	SystemName        string
	PageUrl           string
	ElementText       *string
	EventType         string
	PageHtmlPath      *string
	ScreenshotPath    *string
	ElementTag        *string
	Status            *string
	ExtraData         []byte
	UserID            *uint
	ExperienceID      *uint
	PageID            *uint
	SystemID          *uint
	LogDigStatus      int
	CleanPageDesc     int
	CleanSystemDesc   int
	CleanHtmlDiff     int
	CleanElementsJson int
	CleanPageInputs   int
	CleanPageStats    int
	ProcessingDelay   *int
}

// Page holds the l_pages columns that are copied. Columns declared
// DEFAULT NULL are pointers so that NULL does not fail in Scan.
//
// Table definition (column comments translated, charset/collation omitted):
//
//	`id` int NOT NULL AUTO_INCREMENT,
//	`page_url` json NOT NULL DEFAULT (json_array()) COMMENT 'page URL (JSON array)',
//	`page_desc` text COMMENT 'page description',
//	`elements_json` json DEFAULT NULL COMMENT 'page element structure',
//	`statistics` json DEFAULT NULL COMMENT 'page statistics and condition relations',
//	`system_id` int NOT NULL COMMENT 'unique ID of the owning system',
//	`system_name` varchar(255) DEFAULT NULL COMMENT 'name of the owning system',
//	`created_at` datetime NOT NULL COMMENT 'creation time',
//	`updated_at` datetime NOT NULL COMMENT 'update time',
//	`input_json` longtext,
//	`page_summary` longtext COMMENT 'page summary',
//	`clickable` json NOT NULL DEFAULT (json_array()) COMMENT 'list of clickable elements, element format: _, stored as a JSON array',
//	`experience_id` int DEFAULT NULL COMMENT 'experience ID',
//	`page_type` json DEFAULT NULL COMMENT 'page type and analysis result',
//	`unique_page_id` varchar(255) DEFAULT NULL COMMENT 'globally unique page ID it belongs to',
//	`unique_system_id` varchar(255) DEFAULT NULL COMMENT 'globally unique system ID it belongs to',
//	`memo_process_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'whether the memo was processed, 0=no, 1=yes',
type Page struct {
	ID                uint
	PageUrl           []byte
	PageDesc          *string
	ElementsJson      []byte
	Statistics        []byte
	SystemID          uint
	SystemName        *string
	InputJson         *string
	PageSummary       *string
	Clickable         []byte
	PageType          []byte
	UniquePageID      *string
	UniqueSystemID    *string
	MemoProcessStatus int
}

// videoActionSeq is the (id, action_sequence) pair read back from a copied
// l_videos row so its action IDs can be rewritten.
type videoActionSeq struct {
	ID        uint
	ActionSeq []byte
}

// copyState carries the old-to-new identifier mappings built up while one
// project is being duplicated.
type copyState struct {
	now time.Time // single timestamp written to created_at/updated_at of every copied row

	oldProjectID    uint
	newProjectID    int64
	oldExperienceID uint
	newExperienceID int64

	msgIDs      map[uint]uint     // l_message.id
	systemIDs   map[uint]uint     // l_systems.id
	pageIDs     map[uint]uint     // l_pages.id
	actionIDs   map[uint]uint     // l_user_actions.id
	relationIDs map[string]string // l_plugin_relation.relation_id

	newRelationIDs []string // relation IDs created by this run, in insertion order
}

// copy duplicates the project identified by projectID together with its
// experience, messages, systems, plugin relations (with their videos and
// monitoring-log parse results), pages (with their user actions) and
// extracted subjective knowledge. Everything happens in one transaction: on
// any error the transaction is rolled back and the error is returned wrapped
// with the step that failed.
//
// Auto-increment IDs are re-generated and references between the copied rows
// (including the JSON ID arrays) are rewritten to the new IDs. String IDs
// (relation_id, video_id) get the suffix "-<new project id>".
//
// _taskPointID is currently unused. The parameter order (db before ctx) and
// the name, which shadows the builtin copy inside this package, are kept so
// existing callers keep compiling.
func copy(db *sql.DB, ctx context.Context, projectID uint, _taskPointID string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// After a successful Commit, Rollback only reports sql.ErrTxDone, so its
	// result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	s := &copyState{
		now:         time.Now(),
		msgIDs:      make(map[uint]uint),
		systemIDs:   make(map[uint]uint),
		pageIDs:     make(map[uint]uint),
		actionIDs:   make(map[uint]uint),
		relationIDs: make(map[string]string),
	}

	// Order matters: later steps consume the ID mappings built by earlier ones.
	if err := s.copyProject(ctx, tx, projectID); err != nil {
		return err
	}
	steps := []func(context.Context, *sql.Tx) error{
		s.copyExperience,
		s.copyMessages,
		s.copySystems,
		s.copyRelations,
		s.copyPages,
		s.remapVideoActions,
		s.copySubjectKnowledges,
	}
	for _, step := range steps {
		if err := step(ctx, tx); err != nil {
			return err
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit transaction: %w", err)
	}
	return nil
}

// queryAll runs query in tx and scans every row into a T, using dest to
// obtain the Scan destinations. The result set is always closed and
// rows.Err is checked before returning.
func queryAll[T any](ctx context.Context, tx *sql.Tx, table, query string, dest func(*T) []any, args ...any) ([]T, error) {
	rows, err := tx.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("query %s: %w", table, err)
	}
	defer rows.Close()

	var out []T
	for rows.Next() {
		var item T
		if err := rows.Scan(dest(&item)...); err != nil {
			return nil, fmt.Errorf("scan row from %s: %w", table, err)
		}
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate rows from %s: %w", table, err)
	}
	return out, nil
}

// insertRow executes a single-row INSERT and returns the generated ID.
func insertRow(ctx context.Context, tx *sql.Tx, table, query string, args ...any) (int64, error) {
	res, err := tx.ExecContext(ctx, query, args...)
	if err != nil {
		return 0, fmt.Errorf("insert into %s: %w", table, err)
	}
	id, err := res.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("last insert id for %s: %w", table, err)
	}
	return id, nil
}

// insertBatch executes one multi-row INSERT of rowCount rows whose values are
// laid out consecutively in args, and returns the number of affected rows.
// With rowCount == 0 it does nothing, because "INSERT ... VALUES" without any
// tuple is a syntax error.
func insertBatch(ctx context.Context, tx *sql.Tx, table, columns string, rowCount int, args []any) (int64, error) {
	if rowCount == 0 {
		return 0, nil
	}
	perRow := len(args) / rowCount
	tuple := "(" + strings.TrimSuffix(strings.Repeat("?, ", perRow), ", ") + ")"
	tuples := strings.TrimSuffix(strings.Repeat(tuple+",", rowCount), ",")
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s", table, columns, tuples)

	res, err := tx.ExecContext(ctx, query, args...)
	if err != nil {
		return 0, fmt.Errorf("insert into %s: %w", table, err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("rows affected for %s: %w", table, err)
	}
	return n, nil
}

// remapIDArray rewrites a JSON array of old IDs into a JSON array of the
// corresponding new IDs; IDs without a mapping are dropped. A NULL column
// (empty raw) stays NULL, a JSON null stays null, and an empty array stays [].
func remapIDArray(raw []byte, mapping map[uint]uint) ([]byte, error) {
	if len(raw) == 0 {
		return nil, nil
	}
	var oldIDs []uint
	if err := json.Unmarshal(raw, &oldIDs); err != nil {
		return nil, err
	}
	if oldIDs == nil { // the JSON literal null
		return raw, nil
	}
	newIDs := make([]uint, 0, len(oldIDs))
	for _, oldID := range oldIDs {
		if newID, ok := mapping[oldID]; ok {
			newIDs = append(newIDs, newID)
		}
	}
	return json.Marshal(newIDs)
}

// remapOptionalID maps a nullable old ID to its new ID. It returns nil (SQL
// NULL) when the source is NULL or has no copied counterpart, so the copy
// never points at a row outside the new project.
func remapOptionalID(old *uint, mapping map[uint]uint) *uint {
	if old == nil {
		return nil
	}
	if newID, ok := mapping[*old]; ok {
		return &newID
	}
	return nil
}

// suffixed derives the string ID used by the copy: "<old id>-<new project id>".
func (s *copyState) suffixed(id string) string {
	return id + "-" + strconv.FormatInt(s.newProjectID, 10)
}

// copyProject inserts a copy of the source l_projects row and records its ID.
func (s *copyState) copyProject(ctx context.Context, tx *sql.Tx, projectID uint) error {
	const sel = "select id, user_id,name,env,status,state from l_projects lp where lp.id = ?"
	var p Project
	if err := tx.QueryRowContext(ctx, sel, projectID).Scan(&p.ID, &p.UserID, &p.Name, &p.Env, &p.Status, &p.State); err != nil {
		return fmt.Errorf("load project %d: %w", projectID, err)
	}
	s.oldProjectID = p.ID

	const ins = "INSERT INTO l_projects (user_id, name, env, status, state, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)"
	newID, err := insertRow(ctx, tx, "l_projects", ins, p.UserID, p.Name, p.Env, p.Status, p.State, s.now, s.now)
	if err != nil {
		return err
	}
	s.newProjectID = newID
	log.Printf("新项目ID: %d", newID)
	return nil
}

// copyExperience copies the l_experiences row whose session_id is the old
// project and attaches the copy to the new project.
func (s *copyState) copyExperience(ctx context.Context, tx *sql.Tx) error {
	const sel = "select id, experience_title, version, publish_version, status, description from l_experiences le where le.session_id = ?"
	var e Experience
	if err := tx.QueryRowContext(ctx, sel, s.oldProjectID).Scan(&e.ID, &e.ExperienceTitle, &e.Version, &e.PublishVersion, &e.ExpStatus, &e.Description); err != nil {
		return fmt.Errorf("load experience of project %d: %w", s.oldProjectID, err)
	}
	s.oldExperienceID = e.ID

	const ins = "INSERT INTO l_experiences (experience_title, version, session_id, created_at, updated_at, publish_version, status, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
	newID, err := insertRow(ctx, tx, "l_experiences", ins, e.ExperienceTitle, e.Version, s.newProjectID, s.now, s.now, e.PublishVersion, e.ExpStatus, e.Description)
	if err != nil {
		return err
	}
	s.newExperienceID = newID
	log.Printf("新经验ID: %d", newID)
	return nil
}

// copyMessages copies every l_message row of the old project and records the
// old-to-new message ID mapping.
//
// TODO: chat_id should use the updated value rather than the one read here.
// TODO: task_id and parent_task_id need to be updated as well.
func (s *copyState) copyMessages(ctx context.Context, tx *sql.Tx) error {
	const sel = "select id, user_id, chat_id, content, tree_type, source, type, task_id, parent_task_id, task_status, event_type, metadata from l_message lm where lm.project_id = ?"
	msgs, err := queryAll(ctx, tx, "l_message", sel, func(m *Message) []any {
		return []any{&m.ID, &m.MsgUserID, &m.ChatID, &m.Content, &m.TreeType, &m.Source, &m.MsgType, &m.TaskID, &m.ParentTaskID, &m.TaskStatus, &m.EventType, &m.Metadata}
	}, s.oldProjectID)
	if err != nil {
		return err
	}

	// Inserted one by one because every new ID has to be recorded.
	const ins = "INSERT INTO l_message (user_id, project_id, chat_id, content, tree_type, source, type, task_id, parent_task_id, task_status, event_type, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	for i := range msgs {
		m := &msgs[i]
		newID, err := insertRow(ctx, tx, "l_message", ins, m.MsgUserID, s.newProjectID, m.ChatID, m.Content, m.TreeType, m.Source, m.MsgType, m.TaskID, m.ParentTaskID, m.TaskStatus, m.EventType, m.Metadata, s.now, s.now)
		if err != nil {
			return err
		}
		s.msgIDs[m.ID] = uint(newID)
		log.Printf("旧消息ID: %d 新消息ID: %d", m.ID, newID)
	}
	return nil
}

// copySystems copies every l_systems row of the old experience and records
// the old-to-new system ID mapping.
func (s *copyState) copySystems(ctx context.Context, tx *sql.Tx) error {
	const sel = "select id, system_name, system_desc, system_url, unique_system_id, system_pages_desc, system_summary, matched_labels, memo_process_status from l_systems ls where ls.experience_id = ?"
	systems, err := queryAll(ctx, tx, "l_systems", sel, func(y *System) []any {
		return []any{&y.ID, &y.SystemName, &y.SystemDesc, &y.SystemURL, &y.UniqueSystemID, &y.SystemPagesDesc, &y.SystemSummary, &y.MatchedLabels, &y.MemoProcessStatus}
	}, s.oldExperienceID)
	if err != nil {
		return err
	}

	const ins = "INSERT INTO l_systems (system_name, system_desc, system_url, experience_id, unique_system_id, system_pages_desc, system_summary, matched_labels, memo_process_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	for i := range systems {
		y := &systems[i]
		newID, err := insertRow(ctx, tx, "l_systems", ins, y.SystemName, y.SystemDesc, y.SystemURL, s.newExperienceID, y.UniqueSystemID, y.SystemPagesDesc, y.SystemSummary, y.MatchedLabels, y.MemoProcessStatus, s.now, s.now)
		if err != nil {
			return err
		}
		s.systemIDs[y.ID] = uint(newID)
		log.Printf("旧系统ID: %d 新系统ID: %d", y.ID, newID)
	}
	return nil
}

// copyRelations copies every l_plugin_relation row of the old experience
// under a suffixed relation_id, together with the videos and monitoring-log
// parse results that hang off each relation.
func (s *copyState) copyRelations(ctx context.Context, tx *sql.Tx) error {
	const sel = "select relation_id, user_id, taskpoint_id, status, version, last_heartbeat_time, clean_status, dig_status from l_plugin_relation where experience_id = ?"
	rels, err := queryAll(ctx, tx, "l_plugin_relation", sel, func(r *RelationPlugin) []any {
		return []any{&r.RelationID, &r.UserID, &r.TaskpointID, &r.Status, &r.Version, &r.LastHeartbeatTime, &r.CleanStatus, &r.DigStatus}
	}, s.oldExperienceID)
	if err != nil {
		return err
	}

	const ins = "INSERT INTO l_plugin_relation (relation_id, user_id, taskpoint_id, status, version, last_heartbeat_time, experience_id, clean_status, dig_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	for i := range rels {
		r := &rels[i]
		oldRelID, newRelID := r.RelationID, s.suffixed(r.RelationID)
		if _, err := tx.ExecContext(ctx, ins, newRelID, r.UserID, r.TaskpointID, r.Status, r.Version, r.LastHeartbeatTime, s.newExperienceID, r.CleanStatus, r.DigStatus, s.now, s.now); err != nil {
			return fmt.Errorf("insert into l_plugin_relation: %w", err)
		}
		log.Printf("旧关联ID: %s 新关联ID: %s", oldRelID, newRelID)
		s.newRelationIDs = append(s.newRelationIDs, newRelID)
		s.relationIDs[oldRelID] = newRelID

		if err := s.copyVideos(ctx, tx, oldRelID, newRelID); err != nil {
			return err
		}
		if err := s.copyParseResults(ctx, tx, oldRelID, newRelID); err != nil {
			return err
		}
	}
	return nil
}

// copyVideos batch-copies the l_videos rows of one relation. action_sequence
// is copied verbatim here and rewritten later by remapVideoActions, once the
// new user action IDs exist.
func (s *copyState) copyVideos(ctx context.Context, tx *sql.Tx, oldRelID, newRelID string) error {
	const sel = "select video_id, video_path, start_time, end_time, upload_time, action_sequence, system_name from l_videos where relation_id = ?"
	videos, err := queryAll(ctx, tx, "l_videos", sel, func(v *Video) []any {
		return []any{&v.VideoID, &v.VideoPath, &v.StartTime, &v.EndTime, &v.UploadTime, &v.ActionSeq, &v.SystemName}
	}, oldRelID)
	if err != nil {
		return err
	}

	args := make([]any, 0, len(videos)*9) // nine columns per video
	for i := range videos {
		v := &videos[i]
		args = append(args, s.suffixed(v.VideoID), newRelID, v.VideoPath, v.StartTime, v.EndTime, v.UploadTime, v.ActionSeq, v.SystemName, s.now)
	}
	n, err := insertBatch(ctx, tx, "l_videos", "video_id, relation_id, video_path, start_time, end_time, upload_time, action_sequence, system_name, created_at", len(videos), args)
	if err != nil {
		return err
	}
	log.Printf("插入视频: %d", n)
	return nil
}

// copyParseResults batch-copies the l_monitoring_log_parse_result rows of one
// relation, pointing them at the new relation and the new project.
func (s *copyState) copyParseResults(ctx context.Context, tx *sql.Tx, oldRelID, newRelID string) error {
	const sel = "select parse_result, session_id, clean_plan_id, status from l_monitoring_log_parse_result where relation_id = ?"
	parses, err := queryAll(ctx, tx, "l_monitoring_log_parse_result", sel, func(p *MonitoringLogParseResult) []any {
		return []any{&p.ParseResult, &p.SessionID, &p.CleanPlanID, &p.Status}
	}, oldRelID)
	if err != nil {
		return err
	}

	args := make([]any, 0, len(parses)*6) // six columns per row
	for i := range parses {
		p := &parses[i]
		args = append(args, p.ParseResult, newRelID, s.newProjectID, p.CleanPlanID, p.Status, s.now)
	}
	n, err := insertBatch(ctx, tx, "l_monitoring_log_parse_result", "parse_result, relation_id, session_id, clean_plan_id, status, created_at", len(parses), args)
	if err != nil {
		return err
	}
	log.Printf("插入日志解析结果: %d", n)
	return nil
}

// copyPages copies every l_pages row of the old experience, re-pointing it at
// the copied system, and then copies the user actions of each page.
func (s *copyState) copyPages(ctx context.Context, tx *sql.Tx) error {
	const sel = "select id, page_url, page_desc, elements_json, statistics, system_id, system_name, input_json, page_summary, clickable, page_type, unique_page_id, unique_system_id, memo_process_status from l_pages where experience_id = ?"
	pages, err := queryAll(ctx, tx, "l_pages", sel, func(p *Page) []any {
		return []any{&p.ID, &p.PageUrl, &p.PageDesc, &p.ElementsJson, &p.Statistics, &p.SystemID, &p.SystemName, &p.InputJson, &p.PageSummary, &p.Clickable, &p.PageType, &p.UniquePageID, &p.UniqueSystemID, &p.MemoProcessStatus}
	}, s.oldExperienceID)
	if err != nil {
		return err
	}

	const ins = "INSERT INTO l_pages (page_url, page_desc, elements_json, statistics, system_id, system_name, input_json, page_summary, clickable, page_type, experience_id, unique_page_id, unique_system_id, memo_process_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	for i := range pages {
		p := &pages[i]
		// system_id is NOT NULL, so a page whose system was not copied cannot be copied either.
		newSysID, ok := s.systemIDs[p.SystemID]
		if !ok {
			return fmt.Errorf("failed to find new system ID for old system ID: %d", p.SystemID)
		}
		newPageID, err := insertRow(ctx, tx, "l_pages", ins, p.PageUrl, p.PageDesc, p.ElementsJson, p.Statistics, newSysID, p.SystemName, p.InputJson, p.PageSummary, p.Clickable, p.PageType, s.newExperienceID, p.UniquePageID, p.UniqueSystemID, p.MemoProcessStatus, s.now, s.now)
		if err != nil {
			return err
		}
		log.Printf("插入页面: %d", newPageID)
		s.pageIDs[p.ID] = uint(newPageID)

		if err := s.copyActions(ctx, tx, p.ID, newPageID); err != nil {
			return err
		}
	}
	return nil
}

// copyActions copies the l_user_actions rows of one page and records the
// old-to-new action ID mapping. Rows are inserted one by one because every
// new ID is needed to rewrite the video action sequences afterwards.
func (s *copyState) copyActions(ctx context.Context, tx *sql.Tx, oldPageID uint, newPageID int64) error {
	const sel = "select id, relation_id, system_name, page_url, element_text, event_type, page_html_path, screenshot_path, element_tag, status, extra_data, user_id, experience_id, page_id, system_id, log_dig_status, clean_page_desc, clean_system_desc, clean_html_diff, clean_elements_json, clean_page_inputs, clean_page_stats, processing_delay from l_user_actions where page_id = ?"
	actions, err := queryAll(ctx, tx, "l_user_actions", sel, func(a *UserAction) []any {
		return []any{&a.ID, &a.RelationID, &a.SystemName, &a.PageUrl, &a.ElementText, &a.EventType, &a.PageHtmlPath, &a.ScreenshotPath, &a.ElementTag, &a.Status, &a.ExtraData, &a.UserID, &a.ExperienceID, &a.PageID, &a.SystemID, &a.LogDigStatus, &a.CleanPageDesc, &a.CleanSystemDesc, &a.CleanHtmlDiff, &a.CleanElementsJson, &a.CleanPageInputs, &a.CleanPageStats, &a.ProcessingDelay}
	}, oldPageID)
	if err != nil {
		return err
	}

	const ins = "INSERT INTO l_user_actions (relation_id, system_name, page_url, element_text, event_type, page_html_path, screenshot_path, element_tag, status, extra_data, user_id, experience_id, page_id, system_id, log_dig_status, clean_page_desc, clean_system_desc, clean_html_diff, clean_elements_json, clean_page_inputs, clean_page_stats, processing_delay, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	for i := range actions {
		a := &actions[i]
		newID, err := insertRow(ctx, tx, "l_user_actions", ins,
			s.suffixed(a.RelationID), a.SystemName, a.PageUrl, a.ElementText, a.EventType, a.PageHtmlPath, a.ScreenshotPath, a.ElementTag, a.Status, a.ExtraData,
			a.UserID, s.newExperienceID, newPageID, remapOptionalID(a.SystemID, s.systemIDs),
			a.LogDigStatus, a.CleanPageDesc, a.CleanSystemDesc, a.CleanHtmlDiff, a.CleanElementsJson, a.CleanPageInputs, a.CleanPageStats, a.ProcessingDelay,
			s.now, s.now)
		if err != nil {
			return err
		}
		s.actionIDs[a.ID] = uint(newID)
		log.Printf("旧ActionID: %d, 新ActionID: %d", a.ID, newID)
	}
	return nil
}

// remapVideoActions rewrites action_sequence of every copied video so that it
// lists the new user action IDs. Videos with a NULL sequence are left alone.
func (s *copyState) remapVideoActions(ctx context.Context, tx *sql.Tx) error {
	const sel = "SELECT id, action_sequence FROM l_videos WHERE relation_id = ?"
	const upd = "UPDATE l_videos SET action_sequence = ? WHERE id = ?"

	for _, relID := range s.newRelationIDs {
		// The result set is fully read and closed by queryAll before any
		// UPDATE is issued on the same transaction.
		videos, err := queryAll(ctx, tx, "l_videos", sel, func(v *videoActionSeq) []any {
			return []any{&v.ID, &v.ActionSeq}
		}, relID)
		if err != nil {
			return err
		}
		for i := range videos {
			v := &videos[i]
			if len(v.ActionSeq) == 0 {
				continue
			}
			remapped, err := remapIDArray(v.ActionSeq, s.actionIDs)
			if err != nil {
				return fmt.Errorf("remap action_sequence of video %d: %w", v.ID, err)
			}
			if _, err := tx.ExecContext(ctx, upd, remapped, v.ID); err != nil {
				return fmt.Errorf("update l_videos %d: %w", v.ID, err)
			}
			log.Printf("更新视频ID: %d 的用户动作序列", v.ID)
		}
	}
	return nil
}

// copySubjectKnowledges copies the l_extracted_subjective_knowledges rows of
// the old experience, rewriting dialog, system and page ID lists as well as
// the relation ID to their copied counterparts.
func (s *copyState) copySubjectKnowledges(ctx context.Context, tx *sql.Tx) error {
	const sel = "select extracted_subjective_knowledge_desc, session_id, dialog_ids, experience_id, version, status, taskpoint_id, relation_id, system_ids, page_ids, category_tags, is_global_associated from l_extracted_subjective_knowledges where experience_id = ?"
	rows, err := queryAll(ctx, tx, "l_extracted_subjective_knowledges", sel, func(k *SubjectKnowledge) []any {
		return []any{&k.ExtractedSubjectiveKnowledgeDesc, &k.SessionID, &k.DialogIDs, &k.ExperienceID, &k.Version, &k.Status, &k.TaskpointID, &k.RelationID, &k.SystemIDs, &k.PageIDs, &k.CategoryTags, &k.IsGlobalAssociated}
	}, s.oldExperienceID)
	if err != nil {
		return err
	}

	const ins = "INSERT INTO l_extracted_subjective_knowledges (extracted_subjective_knowledge_desc, session_id, dialog_ids, experience_id, version, status, taskpoint_id, relation_id, system_ids, page_ids, category_tags, is_global_associated, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	for i := range rows {
		k := &rows[i]

		dialogIDs, err := remapIDArray(k.DialogIDs, s.msgIDs)
		if err != nil {
			return fmt.Errorf("remap dialog_ids: %w", err)
		}
		systemIDs, err := remapIDArray(k.SystemIDs, s.systemIDs)
		if err != nil {
			return fmt.Errorf("remap system_ids: %w", err)
		}
		pageIDs, err := remapIDArray(k.PageIDs, s.pageIDs)
		if err != nil {
			return fmt.Errorf("remap page_ids: %w", err)
		}

		// relation_id is nullable; NULL (or a relation that was not copied)
		// stays NULL instead of being dereferenced.
		var newRelID *string
		if k.RelationID != nil {
			if mapped, ok := s.relationIDs[*k.RelationID]; ok {
				newRelID = &mapped
			}
		}

		res, err := tx.ExecContext(ctx, ins, k.ExtractedSubjectiveKnowledgeDesc, s.newProjectID, dialogIDs, s.newExperienceID, k.Version, k.Status, k.TaskpointID, newRelID, systemIDs, pageIDs, k.CategoryTags, k.IsGlobalAssociated, s.now, s.now)
		if err != nil {
			return fmt.Errorf("insert into l_extracted_subjective_knowledges: %w", err)
		}
		n, err := res.RowsAffected()
		if err != nil {
			return fmt.Errorf("rows affected for l_extracted_subjective_knowledges: %w", err)
		}
		log.Printf("插入主观知识: %d", n)
	}
	return nil
}