Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 54c624bde6 | |||
| 1a473d8b3a | |||
| 348391cd81 | |||
| 2f9d40a681 | |||
| 3368e8862f |
@ -10,5 +10,6 @@ import (
|
|||||||
|
|
||||||
// mcube
|
// mcube
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
cmd.Start()
|
cmd.Start()
|
||||||
}
|
}
|
||||||
|
|||||||
206
devcloud-mini/mflow/README.md
Normal file
206
devcloud-mini/mflow/README.md
Normal file
@ -0,0 +1,206 @@
|
|||||||
|
# 任务管理
|
||||||
|
|
||||||
|
基于K8s Job的流水线设计
|
||||||
|
|
||||||
|
+ 镜像构建:
|
||||||
|
+ [如何在Docker 中使用 Docker](https://www.6hu.cc/archives/78414.html)
|
||||||
|
+ [Build Images In Kubernetes](https://github.com/GoogleContainerTools/kaniko)
|
||||||
|
+ [kubernetes【工具】kaniko【1】【介绍】-无特权构建镜像](https://blog.csdn.net/xixihahalelehehe/article/details/121659254)
|
||||||
|
+ 镜像部署: [kubectl 工具镜像](https://hub.docker.com/r/bitnami/kubectl)
|
||||||
|
+ 虚拟机部署: [在 Docker 容器中配置 Ansible](https://learn.microsoft.com/zh-cn/azure/developer/ansible/configure-in-docker-container?tabs=azure-cli)
|
||||||
|
|
||||||
|
## 镜像构建
|
||||||
|
|
||||||
|
[最新国内镜像](https://liangyuanpeng.com/post/service-lank8s.cn/)
|
||||||
|
+ registry.k8s.io -> registry.lank8s.cn
|
||||||
|
+ gcr.io -> gcr.lank8s.cn
|
||||||
|
|
||||||
|
原生地址:
|
||||||
|
[Google Kaniko 仓库地址](https://console.cloud.google.com/gcr/images/kaniko-project/GLOBAL/executor)
|
||||||
|
|
||||||
|
转化地址:
|
||||||
|
[如何拉取gcr.io的镜像](https://github.com/anjia0532/gcr.io_mirror/search?p=1&q=kaniko&type=issues)
|
||||||
|
|
||||||
|
提前同步好的镜像:
|
||||||
|
+ [gcr.io/kaniko-project/executor:v1.23.2](https://github.com/anjia0532/gcr.io_mirror/issues/4321)
|
||||||
|
+ [gcr.io/kaniko-project/executor:v1.23.2-debug](https://github.com/anjia0532/gcr.io_mirror/issues/4068)
|
||||||
|
|
||||||
|
拉取最新版本的镜像:
|
||||||
|
```
|
||||||
|
docker pull anjia0532/kaniko-project.executor:v1.9.2
|
||||||
|
docker pull anjia0532/kaniko-project.executor:v1.9.2-debug
|
||||||
|
```
|
||||||
|
|
||||||
|
### 手动操作
|
||||||
|
|
||||||
|
启动一个deubg环境, 可以看看里面的工具(二进制可执行文件,工具的用法)
|
||||||
|
```sh
|
||||||
|
docker run -it --rm --entrypoint=/busybox/sh anjia0532/kaniko-project.executor:v1.23.2-debug
|
||||||
|
|
||||||
|
/ # ls -l /kaniko/
|
||||||
|
total 75448
|
||||||
|
-rwxr-xr-x 1 0 0 10900549 Sep 8 18:23 docker-credential-acr-env
|
||||||
|
-rwxr-xr-x 1 0 0 8981984 Sep 8 18:22 docker-credential-ecr-login
|
||||||
|
-rwxr-xr-x 1 0 0 7814415 Sep 8 18:21 docker-credential-gcr
|
||||||
|
-rwxr-xr-x 1 0 0 35250176 Sep 26 19:27 executor
|
||||||
|
drwxr-xr-x 3 0 0 4096 Sep 26 19:27 ssl
|
||||||
|
-rwxr-xr-x 1 0 0 14303232 Sep 26 19:27 warmer
|
||||||
|
/ # /kaniko/executor -h
|
||||||
|
Usage:
|
||||||
|
executor [flags]
|
||||||
|
executor [command]
|
||||||
|
|
||||||
|
Available Commands:
|
||||||
|
completion Generate the autocompletion script for the specified shell
|
||||||
|
help Help about any command
|
||||||
|
version Print the version number of kaniko
|
||||||
|
|
||||||
|
Flags:
|
||||||
|
--build-arg multi-arg type This flag allows you to pass in ARG values at build time. Set it repeatedly for multiple values.
|
||||||
|
--cache Use cache when building image
|
||||||
|
--cache-copy-layers Caches copy layers
|
||||||
|
--cache-dir string Specify a local directory to use as a cache. (default "/cache")
|
||||||
|
--cache-repo string Specify a repository to use as a cache, otherwise one will be inferred from the destination provided
|
||||||
|
--cache-run-layers Caches run layers (default true)
|
||||||
|
--cache-ttl duration Cache timeout in hours. Defaults to two weeks. (default 336h0m0s)
|
||||||
|
--cleanup Clean the filesystem at the end
|
||||||
|
--compressed-caching Compress the cached layers. Decreases build time, but increases memory usage. (default true)
|
||||||
|
-c, --context string Path to the dockerfile build context. (default "/workspace/")
|
||||||
|
--context-sub-path string Sub path within the given context.
|
||||||
|
--custom-platform string Specify the build platform if different from the current host
|
||||||
|
--customPlatform string This flag is deprecated. Please use '--custom-platform'.
|
||||||
|
-d, --destination multi-arg type Registry the final image should be pushed to. Set it repeatedly for multiple destinations.
|
||||||
|
--digest-file string Specify a file to save the digest of the built image to.
|
||||||
|
-f, --dockerfile string Path to the dockerfile to be built. (default "Dockerfile")
|
||||||
|
--force Force building outside of a container
|
||||||
|
--force-build-metadata Force add metadata layers to build image
|
||||||
|
--git gitoptions Branch to clone if build context is a git repository (default branch=,single-branch=false,recurse-submodules=false)
|
||||||
|
-h, --help help for executor
|
||||||
|
--ignore-path multi-arg type Ignore these paths when taking a snapshot. Set it repeatedly for multiple paths.
|
||||||
|
--ignore-var-run Ignore /var/run directory when taking image snapshot. Set it to false to preserve /var/run/ in destination image. (default true)
|
||||||
|
--image-fs-extract-retry int Number of retries for image FS extraction
|
||||||
|
--image-name-tag-with-digest-file string Specify a file to save the image name w/ image tag w/ digest of the built image to.
|
||||||
|
--image-name-with-digest-file string Specify a file to save the image name w/ digest of the built image to.
|
||||||
|
--insecure Push to insecure registry using plain HTTP
|
||||||
|
--insecure-pull Pull from insecure registry using plain HTTP
|
||||||
|
--insecure-registry multi-arg type Insecure registry using plain HTTP to push and pull. Set it repeatedly for multiple registries.
|
||||||
|
--kaniko-dir string Path to the kaniko directory, this takes precedence over the KANIKO_DIR environment variable. (default "/kaniko")
|
||||||
|
--label multi-arg type Set metadata for an image. Set it repeatedly for multiple labels.
|
||||||
|
--log-format string Log format (text, color, json) (default "color")
|
||||||
|
--log-timestamp Timestamp in log output
|
||||||
|
--no-push Do not push the image to the registry
|
||||||
|
--no-push-cache Do not push the cache layers to the registry
|
||||||
|
--oci-layout-path string Path to save the OCI image layout of the built image.
|
||||||
|
--push-retry int Number of retries for the push operation
|
||||||
|
--registry-certificate key-value-arg type Use the provided certificate for TLS communication with the given registry. Expected format is 'my.registry.url=/path/to/the/server/certificate'.
|
||||||
|
--registry-mirror multi-arg type Registry mirror to use as pull-through cache instead of docker.io. Set it repeatedly for multiple mirrors.
|
||||||
|
--reproducible Strip timestamps out of the image to make it reproducible
|
||||||
|
--single-snapshot Take a single snapshot at the end of the build.
|
||||||
|
--skip-tls-verify Push to insecure registry ignoring TLS verify
|
||||||
|
--skip-tls-verify-pull Pull from insecure registry ignoring TLS verify
|
||||||
|
--skip-tls-verify-registry multi-arg type Insecure registry ignoring TLS verify to push and pull. Set it repeatedly for multiple registries.
|
||||||
|
--skip-unused-stages Build only used stages if defined to true. Otherwise it builds by default all stages, even the unnecessaries ones until it reaches the target stage / end of Dockerfile
|
||||||
|
--snapshot-mode string Change the file attributes inspected during snapshotting (default "full")
|
||||||
|
--snapshotMode string This flag is deprecated. Please use '--snapshot-mode'.
|
||||||
|
--tar-path string Path to save the image in as a tarball instead of pushing
|
||||||
|
--tarPath string This flag is deprecated. Please use '--tar-path'.
|
||||||
|
--target string Set the target build stage to build
|
||||||
|
--use-new-run Use the experimental run implementation for detecting changes without requiring file system snapshots.
|
||||||
|
-v, --verbosity string Log level (trace, debug, info, warn, error, fatal, panic) (default "info")
|
||||||
|
|
||||||
|
Use "executor [command] --help" for more information about a command.
|
||||||
|
```
|
||||||
|
|
||||||
|
手动挂载并执行构建:
|
||||||
|
```sh
|
||||||
|
# 挂在项目到workspace目录下, 注意指定工作目录:/workspace
|
||||||
|
docker run -it -v ${PWD}/mflow:/workspace -w /workspace --entrypoint=/busybox/sh docker.io/anjia0532/kaniko-project.executor:v1.9.2-debug
|
||||||
|
# 执行构建
|
||||||
|
/kaniko/executor --no-push
|
||||||
|
```
|
||||||
|
|
||||||
|
### 基于k8s操作
|
||||||
|
|
||||||
|
[使用git工具镜像下载依赖](https://hub.docker.com/r/bitnami/git)
|
||||||
|
```sh
|
||||||
|
docker pull bitnami/git
|
||||||
|
```
|
||||||
|
|
||||||
|
测试下能否正常使用
|
||||||
|
```sh
|
||||||
|
# 挂载secret
|
||||||
|
# docker run -it -v ${HOME}/.ssh/:/root/.ssh -w /workspace bitnami/git
|
||||||
|
docker run --rm -it -v ${HOME}/.ssh/:/root/.ssh -w /workspace registry.cn-hangzhou.aliyuncs.com/godev/git:2.39.2
|
||||||
|
# 测试下载, 关于更多git参数说明请参考看: https://git-scm.com/docs/git-config
|
||||||
|
GIT_SSH_COMMAND='ssh -i ssh -i ./id_rsa.pub -o StrictHostKeyChecking=no' git clone git@github.com:infraboard/mpaas.git src --single-branch --branch=master
|
||||||
|
```
|
||||||
|
|
||||||
|
创建代码拉取的secret, 可以参考: [use-case-pod-with-ssh-keys](https://kubernetes.io/zh-cn/docs/concepts/configuration/secret/#use-case-pod-with-ssh-keys)
|
||||||
|
```
|
||||||
|
kubectl create secret generic git-ssh-key --from-file=id_rsa=${HOME}/.ssh/id_rsa
|
||||||
|
```
|
||||||
|
|
||||||
|
创建镜像推送的secret, [Pushing to Docker Hub](https://github.com/GoogleContainerTools/kaniko#pushing-to-docker-hub) 推送至指定远端镜像仓库须要credential的支持,因此须要将credential以secret的方式挂载到/kaniko/.docker/这个目录下,文件名称为config.json,内容以下:
|
||||||
|
[](./impl/test/kaniko_config_example.json)
|
||||||
|
|
||||||
|
YWRtaW46SGFyYm9yMTIzNDUK是通过registry用户名与密码以下命令获取
|
||||||
|
```sh
|
||||||
|
$ echo -n admin:Harbor12345 | base64
|
||||||
|
YWRtaW46SGFyYm9yMTIzNDUK
|
||||||
|
```
|
||||||
|
|
||||||
|
手动挂载并测试能否推送:
|
||||||
|
```sh
|
||||||
|
# 挂在项目到workspace目录下, 注意指定工作目录:/workspace
|
||||||
|
docker run --rm -it -v ${HOME}/Projects/inforboard/mflow:/workspace -v ${HOME}/Projects/inforboard/mflow/apps/job/impl/test/config.json:/kaniko/.docker/config.json -w /workspace --entrypoint=/busybox/sh registry.cn-hangzhou.aliyuncs.com/godev/kaniko-project.executor:v1.9.2-debug
|
||||||
|
# 执行构建
|
||||||
|
/kaniko/executor --cache=true --cache-repo=registry.cn-hangzhou.aliyuncs.com/build_cache/mpaas --compressed-caching=false --destination=registry.cn-hangzhou.aliyuncs.com/infraboard/mpaas:v0.0.1
|
||||||
|
```
|
||||||
|
|
||||||
|
最后创建secret
|
||||||
|
```sh
|
||||||
|
$ kubectl create secret generic kaniko-secret --from-file=apps/job/impl/test/config.json
|
||||||
|
secret/kaniko-secret created
|
||||||
|
|
||||||
|
$ kubectl get secret kaniko-secret
|
||||||
|
NAME TYPE DATA AGE
|
||||||
|
kaniko-secret Opaque 1 23s
|
||||||
|
```
|
||||||
|
|
||||||
|
共享配置Job共享Workdir:
|
||||||
|
[](./impl/test/build.yml)
|
||||||
|
|
||||||
|
## 镜像部署
|
||||||
|
|
||||||
|
[k8s 应用部署相关文档](https://kubernetes.io/docs/concepts/workloads/controllers/)
|
||||||
|
|
||||||
|
拉取工具镜像
|
||||||
|
```
|
||||||
|
docker pull bitnami/kubectl
|
||||||
|
```
|
||||||
|
|
||||||
|
本地测试
|
||||||
|
```sh
|
||||||
|
docker run -it -v ~/.kube/config:/.kube/config bitnami/kubectl get ns
|
||||||
|
```
|
||||||
|
|
||||||
|
k8s支持远程访问部署配置, 比如:
|
||||||
|
```
|
||||||
|
kubectl apply -f https://k8s.io/examples/controllers/nginx-deployment.yaml
|
||||||
|
kubectl apply -f http://localhost:8080/api/mflow/v1/export/deploys/cfrcv8ea0brnte3v3jc0
|
||||||
|
```
|
||||||
|
|
||||||
|
只更新版本
|
||||||
|
```
|
||||||
|
# 更新镜像版本
|
||||||
|
kubectl set image deployment/nginx busybox=busybox nginx=nginx:1.9.1
|
||||||
|
# 补充任务标签
|
||||||
|
kubectl annotate deployments cmdb-deployment task.mflow.inforboar.io/id="test" --overwrite
|
||||||
|
```
|
||||||
|
|
||||||
|
如果执行失败,也可以收到试试命令
|
||||||
|
```
|
||||||
|
kubectl set image deployment/cmdb-deployment cmdb-deployment=busybox:1.30
|
||||||
|
kubectl annotate deployments cmdb task.mflow.inforboar.io/id=cfthf85s99bgu9olqr50
|
||||||
|
```
|
||||||
|
|
||||||
356
devcloud-mini/mflow/flow.md
Normal file
356
devcloud-mini/mflow/flow.md
Normal file
@ -0,0 +1,356 @@
|
|||||||
|
# 流程解读
|
||||||
|
|
||||||
|
|
||||||
|
## 执行任务
|
||||||
|
|
||||||
|
mflow/apps/task/impl/job_task.go
|
||||||
|
|
||||||
|
1. RunJob
|
||||||
|
```go
|
||||||
|
func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) (
|
||||||
|
*task.JobTask, error) {
|
||||||
|
ins := task.NewJobTask(in)
|
||||||
|
|
||||||
|
// 获取之前任务的状态, 因为里面有当前任务的审核状态
|
||||||
|
err := i.GetJotTaskStatus(ctx, ins)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 开启审核后, 执行任务则 调整审核状态为等待中
|
||||||
|
if ins.Spec.Audit.Enable {
|
||||||
|
auditStatus := ins.AuditStatus()
|
||||||
|
switch auditStatus {
|
||||||
|
case pipeline.AUDIT_STAGE_PENDDING:
|
||||||
|
ins.AuditStatusFlowTo(pipeline.AUDIT_STAGE_WAITING)
|
||||||
|
case pipeline.AUDIT_STAGE_DENY:
|
||||||
|
return nil, fmt.Errorf("任务审核未通过")
|
||||||
|
}
|
||||||
|
i.log.Debug().Msgf("任务: %s 审核状态为, %s", ins.Spec.TaskId, auditStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果不忽略执行, 并且审核通过, 则执行
|
||||||
|
if in.Enabled() && ins.AuditPass() {
|
||||||
|
// 任务状态检查与处理
|
||||||
|
switch ins.Status.Stage {
|
||||||
|
case task.STAGE_PENDDING:
|
||||||
|
ins.Status.Stage = task.STAGE_CREATING
|
||||||
|
case task.STAGE_ACTIVE:
|
||||||
|
return nil, exception.NewConflict("任务: %s 当前处于运行中, 需要等待运行结束后才能执行", in.TaskId)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 查询需要执行的Job: docker_build@v1
|
||||||
|
req := job.NewDescribeJobRequestByName(in.JobName)
|
||||||
|
|
||||||
|
j, err := i.job.DescribeJob(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ins.Job = j
|
||||||
|
ins.Spec.JobId = j.Meta.Id
|
||||||
|
i.log.Info().Msgf("describe job success, %s[%s]", j.Spec.Name, j.Meta.Id)
|
||||||
|
|
||||||
|
// 脱敏参数动态还原
|
||||||
|
in.RunParams.RestoreSensitive(j.Spec.RunParams)
|
||||||
|
|
||||||
|
// 合并允许参数(Job里面有默认值), 并检查参数合法性
|
||||||
|
// 注意Param的合并是有顺序的,也就是参数优先级(低-->高):
|
||||||
|
// 1. 系统变量(默认禁止修改)
|
||||||
|
// 2. job默认变量
|
||||||
|
// 3. job运行变量
|
||||||
|
// 4. pipeline 运行变量
|
||||||
|
// 5. pipeline 运行时变量
|
||||||
|
params := job.NewRunParamSet()
|
||||||
|
params.Add(ins.SystemRunParam()...)
|
||||||
|
params.Add(j.Spec.RunParams.Params...)
|
||||||
|
params.Merge(in.RunParams.Params...)
|
||||||
|
err = i.LoadPipelineRunParam(ctx, in, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 校验参数合法性
|
||||||
|
err = params.Validate()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("校验任务【%s】参数错误, %s", j.Spec.DisplayName, err)
|
||||||
|
}
|
||||||
|
i.log.Info().Msgf("params check ok, %s", params)
|
||||||
|
|
||||||
|
// 获取执行器执行
|
||||||
|
r := runner.GetRunner(j.Spec.RunnerType)
|
||||||
|
runReq := task.NewRunTaskRequest(ins.Spec.TaskId, j.Spec.RunnerSpec, params)
|
||||||
|
runReq.DryRun = in.RunParams.DryRun
|
||||||
|
runReq.Labels = in.Labels
|
||||||
|
runReq.ManualUpdateStatus = j.Spec.ManualUpdateStatus
|
||||||
|
|
||||||
|
i.log.Debug().Msgf("[%s] start run task: %s", ins.Spec.PipelineTask, in.TaskName)
|
||||||
|
status, err := r.Run(ctx, runReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("run job error, %s", err)
|
||||||
|
}
|
||||||
|
status.RunParams = params
|
||||||
|
ins.Status = status
|
||||||
|
|
||||||
|
// 添加搜索标签
|
||||||
|
ins.BuildSearchLabel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 保存任务
|
||||||
|
updateOpt := options.Update()
|
||||||
|
updateOpt.SetUpsert(true)
|
||||||
|
if _, err := i.jcol.UpdateByID(ctx, ins.Spec.TaskId, bson.M{"$set": ins}, updateOpt); err != nil {
|
||||||
|
return nil, exception.NewInternalServerError("upsert a job task document error, %s", err)
|
||||||
|
}
|
||||||
|
return ins, nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 任务状态更新
|
||||||
|
|
||||||
|
```go
|
||||||
|
// 更新Job状态
|
||||||
|
func (i *impl) UpdateJobTaskStatus(ctx context.Context, in *task.UpdateJobTaskStatusRequest) (
|
||||||
|
*task.JobTask, error) {
|
||||||
|
ins, err := i.DescribeJobTask(ctx, task.NewDescribeJobTaskRequest(in.Id))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 校验更新合法性
|
||||||
|
err = i.CheckAllowUpdate(ctx, ins, in.UpdateToken, in.ForceUpdateStatus)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
i.log.Debug().Msgf("更新任务状态: %s,当前状态: %s, 更新状态: %s",
|
||||||
|
ins.Spec.TaskId, ins.Status.Stage, in.Stage)
|
||||||
|
// 状态更新
|
||||||
|
ins.Status.UpdateStatus(in)
|
||||||
|
|
||||||
|
// 更新数据库
|
||||||
|
if err := i.updateJobTaskStatus(ctx, ins); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job Task状态变更回调
|
||||||
|
i.JobTaskStatusChangedCallback(ctx, ins)
|
||||||
|
|
||||||
|
// Pipeline Task 状态变更回调
|
||||||
|
if ins.Spec.PipelineTask != "" {
|
||||||
|
// 如果状态未变化, 不触发流水线更新
|
||||||
|
if !in.ForceTriggerPipeline && !ins.Status.Changed {
|
||||||
|
i.log.Debug().Msgf("task %s status not changed: [%s], skip update pipeline", in.Id, in.Stage)
|
||||||
|
return ins, nil
|
||||||
|
}
|
||||||
|
_, err := i.PipelineTaskStatusChanged(ctx, ins)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ins, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *impl) JobTaskStatusChangedCallback(ctx context.Context, in *task.JobTask) {
|
||||||
|
// 状态未变化不通知
|
||||||
|
if in.Status == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 状态未变化不通知
|
||||||
|
if !in.Status.Changed {
|
||||||
|
i.log.Debug().Msgf("task %s status not changed [%s], skip status callback", in.Spec.TaskId, in.Status.Stage)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
i.log.Debug().Msgf("task %s 执行状态变化回调...", in.Spec.TaskId)
|
||||||
|
|
||||||
|
// 个人通知
|
||||||
|
for index := range in.Spec.MentionUsers {
|
||||||
|
mu := in.Spec.MentionUsers[index]
|
||||||
|
i.TaskMention(ctx, mu, in)
|
||||||
|
}
|
||||||
|
if len(in.Spec.MentionUsers) > 0 {
|
||||||
|
i.updateJobTaskMentionUser(ctx, in.Spec.TaskId, in.Spec.MentionUsers)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 群组通知
|
||||||
|
imRobotHooks := in.Spec.MatchedImRobotNotify(in.Status.Stage.String())
|
||||||
|
i.log.Debug().Msgf("task %s 群组通知: %v", in.Spec.TaskId, imRobotHooks)
|
||||||
|
i.hook.SendTaskStatus(ctx, imRobotHooks, in)
|
||||||
|
if len(imRobotHooks) > 0 {
|
||||||
|
i.updateJobTaskImRobotNotify(ctx, in.Spec.TaskId, imRobotHooks)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WebHook回调
|
||||||
|
webhooks := in.Spec.MatchedWebHooks(in.Status.Stage.String())
|
||||||
|
i.log.Debug().Msgf("task %s WebHook通知: %v", in.Spec.TaskId, webhooks)
|
||||||
|
i.hook.SendTaskStatus(ctx, webhooks, in)
|
||||||
|
if len(webhooks) > 0 {
|
||||||
|
i.updateJobTaskWebHook(ctx, in.Spec.TaskId, webhooks)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Pipeline 事件触发
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Pipeline中任务有变化时,
|
||||||
|
// 如果执行成功则 继续执行, 如果失败则标记Pipeline结束
|
||||||
|
// 当所有任务成功结束时标记Pipeline执行成功
|
||||||
|
func (i *impl) PipelineTaskStatusChanged(ctx context.Context, in *task.JobTask) (
|
||||||
|
*task.PipelineTask, error) {
|
||||||
|
if in == nil || in.Status == nil {
|
||||||
|
return nil, exception.NewBadRequest("job task or job task status is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if in.Spec.PipelineTask == "" {
|
||||||
|
return nil, exception.NewBadRequest("Pipeline Id参数缺失")
|
||||||
|
}
|
||||||
|
|
||||||
|
runErrorJobTasks := []*task.UpdateJobTaskStatusRequest{}
|
||||||
|
// 获取Pipeline Task, 因为Job Task是先保存在触发的回调, 这里获取的Pipeline Task是最新的
|
||||||
|
descReq := task.NewDescribePipelineTaskRequest(in.Spec.PipelineTask)
|
||||||
|
p, err := i.DescribePipelineTask(ctx, descReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// 更新当前任务的pipeline task状态
|
||||||
|
i.mustUpdatePipelineStatus(ctx, p)
|
||||||
|
|
||||||
|
// 如果JobTask正常执行, 则等待回调更新, 如果执行失败 则需要立即更新JobTask状态
|
||||||
|
for index := range runErrorJobTasks {
|
||||||
|
_, err = i.UpdateJobTaskStatus(ctx, runErrorJobTasks[index])
|
||||||
|
if err != nil {
|
||||||
|
p.MarkedFailed(err)
|
||||||
|
i.mustUpdatePipelineStatus(ctx, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 更新Pipeline Task 运行时环境变量
|
||||||
|
p.Status.RuntimeEnvs.Merge(in.RuntimeRunParams()...)
|
||||||
|
|
||||||
|
switch in.Status.Stage {
|
||||||
|
case task.STAGE_PENDDING,
|
||||||
|
task.STAGE_SCHEDULING,
|
||||||
|
task.STAGE_CREATING,
|
||||||
|
task.STAGE_ACTIVE,
|
||||||
|
task.STAGE_CANCELING:
|
||||||
|
// Task状态无变化
|
||||||
|
return p, nil
|
||||||
|
case task.STAGE_CANCELED:
|
||||||
|
// 任务取消, pipeline 取消执行
|
||||||
|
p.MarkedCanceled()
|
||||||
|
return p, nil
|
||||||
|
case task.STAGE_FAILED:
|
||||||
|
// 任务执行结果确认失败
|
||||||
|
if in.IsConfirmEnabled() && in.IsConfirming() {
|
||||||
|
// 状态确认中
|
||||||
|
i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
// 任务执行失败, 更新Pipeline状态为失败
|
||||||
|
if !in.Spec.RunParams.IgnoreFailed {
|
||||||
|
p.MarkedFailed(in.Status.MessageToError())
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
case task.STAGE_SUCCEEDED:
|
||||||
|
// 任务执行结果确认失败
|
||||||
|
if in.IsConfirmEnabled() && in.IsConfirming() {
|
||||||
|
// 状态确认中
|
||||||
|
i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
// 任务运行成功, pipeline继续执行
|
||||||
|
i.log.Info().Msgf("task: %s run successed", in.Spec.TaskId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
task执行成功或者忽略执行失败, 此时pipeline 仍然处于运行中, 需要获取下一个任务执行
|
||||||
|
*/
|
||||||
|
nexts, err := p.NextRun()
|
||||||
|
if err != nil {
|
||||||
|
p.MarkedFailed(err)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果没有需要执行的任务, Pipeline执行结束, 更新Pipeline状态为成功
|
||||||
|
if nexts == nil || nexts.Len() == 0 {
|
||||||
|
p.MarkedSuccess()
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果有需要执行的JobTask, 继续执行
|
||||||
|
for index := range nexts.Items {
|
||||||
|
item := nexts.Items[index]
|
||||||
|
// 如果任务执行成功则等待任务的回调更新任务状态
|
||||||
|
// 如果任务执行失败, 直接更新任务状态
|
||||||
|
t, err := i.RunJob(ctx, item.Spec)
|
||||||
|
if err != nil {
|
||||||
|
updateT := task.NewUpdateJobTaskStatusRequest(item.Spec.TaskId)
|
||||||
|
updateT.UpdateToken = item.Spec.UpdateToken
|
||||||
|
updateT.MarkError(err)
|
||||||
|
runErrorJobTasks = append(runErrorJobTasks, updateT)
|
||||||
|
} else {
|
||||||
|
item.Status = t.Status
|
||||||
|
item.Job = t.Job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
如何获取下一组需要执行的任务: 任务执行调度
|
||||||
|
```go
|
||||||
|
// 返回下个需要执行的JobTask, 允许一次并行执行多个(批量执行)
|
||||||
|
// Task DryRun属性要继承PipelineTask
|
||||||
|
func (p *PipelineTask) NextRun() (*JobTaskSet, error) {
|
||||||
|
set := NewJobTaskSet()
|
||||||
|
var stage *StageStatus
|
||||||
|
|
||||||
|
if p.Status == nil || p.Pipeline == nil {
|
||||||
|
return set, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 需要未执行完成的Job Tasks
|
||||||
|
stages := p.Status.StageStatus
|
||||||
|
for i := range stages {
|
||||||
|
stage = stages[i]
|
||||||
|
|
||||||
|
// 找出Stage中未执行完的Job Task
|
||||||
|
set = stage.UnCompleteJobTask(p.Params.DryRun)
|
||||||
|
set.UpdateFromPipelineTask(p)
|
||||||
|
// 如果找到 直接Break
|
||||||
|
if set.Len() > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果所有Stage寻找完,都没找到, 表示PipelineTask执行完成
|
||||||
|
if set.Len() == 0 {
|
||||||
|
return set, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: 如果这些未执行当中的Job Task 有处于运行中的, 不会执行下个一个任务
|
||||||
|
if set.HasStage(STAGE_ACTIVE) {
|
||||||
|
return set, exception.NewConflict("Stage 还处于运行中")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 当未执行的任务中,没有运行中的时,剩下的就是需要被执行的任务
|
||||||
|
tasks := set.GetJobTaskByStage(STAGE_PENDDING)
|
||||||
|
|
||||||
|
nextTasks := NewJobTaskSet()
|
||||||
|
stageSpec := p.Pipeline.GetStage(stage.Name)
|
||||||
|
if stageSpec.IsParallel {
|
||||||
|
// 并行任务 返回该Stage所有等待执行的job
|
||||||
|
nextTasks.Add(tasks...)
|
||||||
|
} else {
|
||||||
|
// 串行任务取第一个
|
||||||
|
nextTasks.Add(tasks[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
return nextTasks.UpdateFromPipelineTask(p), nil
|
||||||
|
}
|
||||||
|
```
|
||||||
@ -1 +0,0 @@
|
|||||||
# 微服务研发云(发布+流水线)
|
|
||||||
47
skills/interview/README.md
Normal file
47
skills/interview/README.md
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
# 面试
|
||||||
|
|
||||||
|
## 项目准备
|
||||||
|
|
||||||
|
基础课: 能够自己写项目, 不能去问很基础的技术问题, 你需要问题的业务问题,一定要让人知道你有做项目的基础技术
|
||||||
|
|
||||||
|
关于你基础知识的加分项:
|
||||||
|
1. 博客
|
||||||
|
2. 项目: 代码,基础的练习代码 (基本的算法模型, 基本工具)
|
||||||
|
3. 准备一个 吃透的 基本原理
|
||||||
|
1. Go调度模型,其他语言是怎么设计(k8s 调度器), 如果理解后 融合到自己的项目中去,比如Job执行系统
|
||||||
|
2. 内存分配, Go语言的内存具体分配情况, 对比Rust, 内存泄露,如何自己操作内存, 比例都次copy,节省性能开销 "8" -> 8, 描述转化过程,描述内存开销 var a string = "8" 4byte
|
||||||
|
4. 提前涮一些算法, 性能优化: 去掉无用操作, (10 io, 1 io), 避免对象的重复分配
|
||||||
|
|
||||||
|
项目课:
|
||||||
|
1. book api, 脚本项目: 可用使用Go来开发一些 基本工具(运维)
|
||||||
|
2. 接口开发能力, crud项目的开发能力, vblog 项目, 合理组织工程架构
|
||||||
|
3. 微服务开发能力: rpc, grpc, kafka, 缓存,中央化认证, ...
|
||||||
|
4. 业务分析能力: 如何拆分业务模块, 业务定义(需要收集和分析)
|
||||||
|
|
||||||
|
## 面试
|
||||||
|
|
||||||
|
+ 筛选公司, 针对你筛选公司,做简历定制化, 做公司背调, 了解公司的产品
|
||||||
|
+ 把JD 需要的技能加强,不需要的做精简
|
||||||
|
+ 了解公司的产品, 非技术面 共同的话题
|
||||||
|
|
||||||
|
+ 简历的编写
|
||||||
|
+ 后端: 工程体现, 架构, 开发方向
|
||||||
|
+ Web开发(API, 类似单体服务, 初级岗位, RESTful接口)
|
||||||
|
+ 微服务开发(中台, MQ, RPC/GRPC)
|
||||||
|
+ 前端
|
||||||
|
+ 大前端(h5开发): pc/小程序/web
|
||||||
|
+ Web开发(JS体系, vue2/3, react, js/css)
|
||||||
|
+ 专业前端(ios/andriod)
|
||||||
|
+ 全栈: 人员不够, 团队初期
|
||||||
|
+ 一个人 就是一个团队 (运维开发)
|
||||||
|
+ 前端(vue3) + 后端(单体/分布式)
|
||||||
|
|
||||||
|
+ 投递简历:
|
||||||
|
+ 晚上8点过后投递
|
||||||
|
+ 面试 不要提前(5 ~ 10分钟)
|
||||||
|
|
||||||
|
+ 面试: 随机性很大(面试官因数很大)
|
||||||
|
+ 面试前 多做准备(提前1个月刷算法题)
|
||||||
|
+ 多面,多总结(好的面试都能发现自己的不足,然后提升自己)
|
||||||
|
+ 面试官和眼缘(职位的等级匹配)
|
||||||
|
+ 没有合适的机会的话,是可以去外部(转行,确的是实践经验(1~2年,再找下一份工作会非常容易))
|
||||||
12
skills/interview/basic_test.go
Normal file
12
skills/interview/basic_test.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package interview_test
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
// 变长 1 ~ 3
|
||||||
|
func TestBy(t *testing.T) {
|
||||||
|
|
||||||
|
// int64 (1)
|
||||||
|
// int8(1)
|
||||||
|
t.Log([]byte("1024"))
|
||||||
|
// int16 1024
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user